AWS Machine Learning Blog
New features for Amazon SageMaker Pipelines and the Amazon SageMaker SDK
Amazon SageMaker Pipelines allows data scientists and machine learning (ML) engineers to automate training workflows, which helps you create a repeatable process to orchestrate model development steps for rapid experimentation and model retraining. You can automate the entire model build workflow, including data preparation, feature engineering, model training, model tuning, and model validation, and catalog it in the model registry. You can configure pipelines to run automatically at regular intervals or when certain events are triggered, or you can run them manually as needed.
In this post, we highlight some of the enhancements to the Amazon SageMaker SDK and introduce new features of Amazon SageMaker Pipelines that make it easier for ML practitioners to build and train ML models.
Pipelines continues to innovate its developer experience, and with these recent releases, you can now use the service in a more customized way:
- 2.99.0, 2.101.1, 2.102.0, 2.104.0 – Updated documentation on PipelineVariable usage for estimator, processor, tuner, transformer, and model base classes, Amazon models, and framework models. There will be additional changes coming with newer versions of the SDK to support all subclasses of estimators and processors.
- 2.90.0 – Availability of ModelStep for integrated model resource creation and registration tasks.
- 2.88.2 – Availability of PipelineSession for managed interaction with SageMaker entities and resources.
- 2.88.2 – Subclass compatibility for workflow pipeline job steps so you can build job abstractions and configure and run processing, training, transform, and tuning jobs as you would without a pipeline.
- 2.76.0 – Availability of FailStep to conditionally stop a pipeline with a failure status.
In this post, we walk you through a workflow using a sample dataset with a focus on model building and deployment to demonstrate how to implement the new Pipelines features. By the end, you should have enough information to use these newer features and simplify your ML workloads.
Features overview
Pipelines offers the following new features:
- Pipeline variable annotation – Certain method parameters accept multiple input types, including PipelineVariables, and additional documentation has been added to clarify where PipelineVariables are supported in both the latest stable version of SageMaker SDK documentation and the init signature of the functions. For example, in the following TensorFlow estimator, the init signature now shows that model_dir and image_uri support PipelineVariables, whereas the other parameters do not. For more information, refer to TensorFlow Estimator.
  - Before:
  - After:
- Pipeline session – PipelineSession is a new concept introduced to bring unity across the SageMaker SDK. It introduces lazy initialization of the pipeline resources (the run calls are captured but not run until the pipeline is created and run). The PipelineSession context inherits the SageMakerSession and implements convenient methods for you to interact with other SageMaker entities and resources, such as training jobs, endpoints, and input datasets stored in Amazon Simple Storage Service (Amazon S3).
- Subclass compatibility with workflow pipeline job steps – You can now build job abstractions and configure and run processing, training, transform, and tuning jobs as you would without a pipeline.
  - For example, creating a processing step with SKLearnProcessor previously required the following:
  - As we see in the preceding code, ProcessingStep needs to do basically the same preprocessing logic as .run, just without initiating the API call to start the job. But with subclass compatibility now enabled with workflow pipeline job steps, we declare the step_args argument that takes the preprocessing logic with .run so you can build a job abstraction and configure it as you would use it without Pipelines. We also pass in the pipeline_session, which is a PipelineSession object, instead of sagemaker_session to make sure the run calls are captured but not called until the pipeline is created and run. See the following code:
- Model step (a streamlined approach with model creation and registration steps) – Pipelines offers two step types to integrate with SageMaker models: CreateModelStep and RegisterModel. You can now achieve both using only the ModelStep type. Note that a PipelineSession is required to achieve this. This brings similarity between the pipeline steps and the SDK.
  - Before:
  - After:
- Fail step (conditional stop of the pipeline run) – FailStep allows a pipeline to be stopped with a failure status if a condition is met, such as if the model score is below a certain threshold.
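The lazy initialization described for the pipeline session can be illustrated with a small, self-contained sketch. This is not the SageMaker SDK: RecordingSession, DeferredCall, and start_processing_job are hypothetical names used only to show the idea that a call is captured first and run later.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class DeferredCall:
    """A captured job request: which function to call and with what arguments."""
    func: Callable[..., Any]
    kwargs: Dict[str, Any]


@dataclass
class RecordingSession:
    """Hypothetical stand-in for a session that records calls instead of running them."""
    captured: List[DeferredCall] = field(default_factory=list)

    def submit(self, func: Callable[..., Any], **kwargs: Any) -> DeferredCall:
        # Capture the request; nothing is executed at this point.
        call = DeferredCall(func=func, kwargs=kwargs)
        self.captured.append(call)
        return call

    def run_all(self) -> List[Any]:
        # Execute the captured requests in the order they were recorded.
        return [call.func(**call.kwargs) for call in self.captured]


executed: List[str] = []


def start_processing_job(code: str) -> str:
    # Hypothetical job launcher; it only records that it ran.
    executed.append(code)
    return f"started:{code}"


session = RecordingSession()
step_args = session.submit(start_processing_job, code="preprocess.py")
calls_before_run = len(executed)  # still 0: the call was captured, not run
results = session.run_all()       # the captured call runs only now
```

In this toy version, step_args plays the role of the captured arguments that a step would hold until the pipeline itself is run.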
Solution overview
In this solution, your entry point is the Amazon SageMaker Studio integrated development environment (IDE) for rapid experimentation. Studio offers an environment to manage the end-to-end Pipelines experience. With Studio, you can bypass the AWS Management Console for your entire workflow management. For more information on managing Pipelines from within Studio, refer to View, Track, and Execute SageMaker Pipelines in SageMaker Studio.
The following diagram illustrates the high-level architecture of the ML workflow with the different steps to train and generate inferences using the new features.
The pipeline includes the following steps:
- Preprocess data to build features required and split data into train, validation, and test datasets.
- Create a training job with the SageMaker XGBoost framework.
- Evaluate the trained model using the test dataset.
- Check if the AUC score is above a predefined threshold.
- If the AUC score is less than the threshold, stop the pipeline run and mark it as failed.
- If the AUC score is greater than the threshold, create a SageMaker model and register it in the SageMaker model registry.
- Apply batch transform on the given dataset using the model created in the previous step.
Prerequisites
To follow along with this post, you need an AWS account with a Studio domain.
Pipelines is integrated directly with SageMaker entities and resources, so you don’t need to interact with any other AWS services. You also don’t need to manage any resources because it’s a fully managed service, which means that it creates and manages resources for you. For more information on the various SageMaker components that are available both as standalone Python APIs and as integrated components of Studio, see the SageMaker product page.
Before getting started, install SageMaker SDK version >= 2.104.0 and xlrd >=1.0.0 within the Studio notebook using the following code snippet:
ML workflow
For this post, you use the following components:
- Data preparation
  - SageMaker Processing – SageMaker Processing is a fully managed service allowing you to run custom data transformations and feature engineering for ML workloads.
- Model building
  - Studio notebooks – One-click notebooks with elastic compute.
  - SageMaker built-in algorithms – XGBoost as a built-in algorithm.
- Model training and evaluation
  - One-click training – The SageMaker distributed training feature. SageMaker provides distributed training libraries for data parallelism and model parallelism. The libraries are optimized for the SageMaker training environment, help adapt your distributed training jobs to SageMaker, and improve training speed and throughput.
  - SageMaker Experiments – Experiments is a capability of SageMaker that lets you organize, track, compare, and evaluate your ML iterations.
  - SageMaker batch transform – Batch transform or offline scoring is a managed service in SageMaker that lets you predict on a larger dataset using your ML models.
- Workflow orchestration
  - SageMaker Pipelines – ML workflow orchestration and automation
A SageMaker pipeline is a series of interconnected steps defined by a JSON pipeline definition. It encodes a pipeline using a directed acyclic graph (DAG). The DAG gives information on the requirements for and relationships between each step of the pipeline, and its structure is determined by the data dependencies between steps. These dependencies are created when the properties of a step’s output are passed as the input to another step.
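To make the idea of an order inferred from data dependencies concrete, here is a minimal sketch using Python's standard graphlib module. It is only an illustration of topological ordering on a DAG, not how the service is implemented, and the step names are hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical step names. Each step maps to the set of steps whose
# outputs it consumes, which is what creates the edges of the DAG.
dependencies = {
    "Preprocess": set(),
    "Train": {"Preprocess"},
    "Evaluate": {"Train", "Preprocess"},
    "CheckAUC": {"Evaluate"},
}

# static_order() yields every step after all of the steps it depends on.
run_order = list(TopologicalSorter(dependencies).static_order())
```

Because each step here depends on the one before it, only one valid order exists. With independent branches, several orders would be valid, and TopologicalSorter raises CycleError if the dependencies are not acyclic.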
The following diagram illustrates the different steps in the SageMaker pipeline (for a churn prediction use case) where the connections between the steps are inferred by SageMaker based on the inputs and outputs defined by the step definitions.
The next sections walk through creating each step of the pipeline and running the entire pipeline once created.
Project structure
Let’s start with the project structure:
- /sm-pipelines-end-to-end-example – The project name
  - /data – The datasets
  - /pipelines – The code files for pipeline components
    - /customerchurn
      - preprocess.py
      - evaluate.py
  - sagemaker-pipelines-project.ipynb – A notebook walking through the modeling workflow using the new Pipelines features
Download the dataset
To follow along with this post, you need to download and save the sample dataset under the data folder within the project home directory, which saves the file in Amazon Elastic File System (Amazon EFS) within the Studio environment.
Build the pipeline components
Now you’re ready to build the pipeline components.
Import statements and declare parameters and constants
Create a Studio notebook called sagemaker-pipelines-project.ipynb within the project home directory. Enter the following code block in a cell, and run the cell to set up SageMaker and S3 client objects, create PipelineSession, and set up the S3 bucket location using the default bucket that comes with a SageMaker session:
Pipelines supports parameterization, which allows you to specify input parameters at runtime without changing your pipeline code. You can use the classes available under the sagemaker.workflow.parameters module, such as ParameterInteger, ParameterFloat, and ParameterString, to specify pipeline parameters of various data types. Run the following code to set up multiple input parameters:
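The original parameter definitions are not reproduced here. As a rough, SDK-free illustration of what parameterization means (typed parameters with defaults that can be overridden when a run starts), consider the following sketch. Param, resolve, and the parameter names and default values are all hypothetical and are not part of sagemaker.workflow.parameters.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class Param:
    """Hypothetical typed pipeline parameter with a default value."""
    name: str
    kind: type
    default: Any


def resolve(params: List[Param], overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Combine defaults with run-time overrides, checking names and types."""
    unknown = set(overrides) - {p.name for p in params}
    if unknown:
        raise KeyError(f"Unknown parameters: {sorted(unknown)}")
    resolved = {}
    for p in params:
        value = overrides.get(p.name, p.default)
        if not isinstance(value, p.kind):
            raise TypeError(f"{p.name} expects {p.kind.__name__}")
        resolved[p.name] = value
    return resolved


# Illustrative names and defaults only.
params = [
    Param("TrainingInstanceCount", int, 1),
    Param("AucThreshold", float, 0.7),
    Param("ModelApprovalStatus", str, "PendingManualApproval"),
]

# Only the threshold is overridden for this run; the rest keep their defaults.
resolved = resolve(params, {"AucThreshold": 0.8})
```

The pipeline definition stays the same between runs; only the override dictionary changes.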
Generate a batch dataset
Generate the batch dataset, which you use later in the batch transform step:
Upload data to an S3 bucket
Upload the datasets to Amazon S3:
Define a processing script and processing step
In this step, you prepare a Python script that performs feature engineering and one-hot encoding, and creates the training, validation, and test splits used for model building. Run the following code to build your processing script:
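The original preprocess.py is not reproduced here. As a minimal standard-library sketch of the two operations named above, one-hot encoding and a three-way split, the following may help. The column names, the 70/20/10 proportions, and the helper names one_hot and split_rows are assumptions for illustration; a real script would more likely use pandas.

```python
import random
from typing import Dict, List, Tuple

Row = Dict[str, object]


def one_hot(rows: List[Row], column: str) -> List[Row]:
    """Replace a categorical column with one 0/1 indicator column per category."""
    categories = sorted({str(row[column]) for row in rows})
    encoded = []
    for row in rows:
        new_row = {k: v for k, v in row.items() if k != column}
        for category in categories:
            new_row[f"{column}_{category}"] = int(str(row[column]) == category)
        encoded.append(new_row)
    return encoded


def split_rows(
    rows: List[Row], train_pct: int = 70, val_pct: int = 20, seed: int = 0
) -> Tuple[List[Row], List[Row], List[Row]]:
    """Shuffle rows and split them into train, validation, and test lists."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    train_end = len(shuffled) * train_pct // 100
    val_end = train_end + len(shuffled) * val_pct // 100
    return shuffled[:train_end], shuffled[train_end:val_end], shuffled[val_end:]


# Tiny made-up dataset with one categorical and one numeric column.
rows = [{"plan": "basic" if i % 2 else "premium", "minutes": i} for i in range(10)]
encoded = one_hot(rows, "plan")
train, validation, test = split_rows(encoded)
```

Integer percentages are used instead of float fractions so the split sizes do not depend on floating-point rounding.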
Next, run the following code block to instantiate the processor and the Pipelines step to run the processing script. Because the processing script is written in Pandas, you use a SKLearnProcessor. The Pipelines ProcessingStep function takes the following arguments: the processor, the input S3 locations for raw datasets, and the output S3 locations to save processed datasets.
Define a training step
Set up model training using a SageMaker XGBoost estimator and the Pipelines TrainingStep function:
Define the evaluation script and model evaluation step
Run the following code block to evaluate the model once trained. This script encapsulates the logic to check if the AUC score meets the specified threshold.
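The original evaluate.py is not reproduced here. The sketch below shows one way an AUC score can be computed and written to a JSON report using only the standard library. The function name roc_auc, the sample labels and scores, and the report keys are assumptions for illustration; a real evaluation script would typically score the test set with the trained XGBoost model and might use a library metric instead.

```python
import json
from typing import Sequence


def roc_auc(labels: Sequence[int], scores: Sequence[float]) -> float:
    """AUC as the fraction of (positive, negative) pairs ranked correctly.

    Ties count as half a correct pair. This pairwise version is quadratic
    in the number of examples, which is fine for a small illustration.
    """
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Made-up labels and predicted scores.
labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
auc = roc_auc(labels, scores)

# Hypothetical report layout that a later step could read the score from.
report = json.dumps({"classification_metrics": {"auc_score": {"value": auc}}})
```

Here three of the four positive/negative pairs are ranked correctly, so the score is 0.75.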
Next, run the following code block to instantiate the processor and the Pipelines step to run the evaluation script. Because the evaluation script uses the XGBoost package, you use a ScriptProcessor along with the XGBoost image. The Pipelines ProcessingStep function takes the following arguments: the processor, the input S3 locations for raw datasets, and the output S3 locations to save processed datasets.
Define a create model step
Run the following code block to create a SageMaker model using the Pipelines model step. This step utilizes the output of the training step to package the model for deployment. Note that the value for the instance type argument is passed using the Pipelines parameter you defined earlier in the post.
Define a batch transform step
Run the following code block to run batch transformation using the trained model with the batch input created in the first step:
Define a register model step
The following code registers the model within the SageMaker model registry using the Pipelines model step:
Define a fail step to stop the pipeline
The following code defines the Pipelines fail step to stop the pipeline run with an error message if the AUC score doesn’t meet the defined threshold:
Define a condition step to check AUC score
The following code defines a condition step to check the AUC score and conditionally create a model and run a batch transformation and register a model in the model registry, or stop the pipeline run in a failed state:
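The branching behavior of the condition and fail steps can be sketched in plain Python as follows. This is only a model of the control flow, not the SDK's ConditionStep or FailStep API. PipelineFailed, next_steps, and the step names are hypothetical, and treating a score exactly equal to the threshold as a failure is an assumption.

```python
from typing import List


class PipelineFailed(Exception):
    """Raised to end the run with a failure status and an error message."""


def next_steps(auc_score: float, threshold: float) -> List[str]:
    """Return the steps to run when the score passes, or fail the run."""
    if auc_score > threshold:
        # Passing branch: create the model, run batch transform, register the model.
        return ["CreateModel", "BatchTransform", "RegisterModel"]
    # Failing branch: stop the run and surface the reason.
    raise PipelineFailed(
        f"AUC {auc_score:.2f} did not exceed threshold {threshold:.2f}"
    )


passed_branch = next_steps(0.82, 0.7)

try:
    next_steps(0.55, 0.7)
    failure_message = None
except PipelineFailed as err:
    failure_message = str(err)
```

Raising an exception with a descriptive message mirrors the idea of ending the run in a failed state rather than silently skipping the remaining steps.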
Build and run the pipeline
After defining all of the component steps, you can assemble them into a Pipeline object. You don’t need to specify the order of the steps because Pipelines automatically infers the sequence from the dependencies between them.
Run the following code in a cell in your notebook. If the pipeline already exists, the code updates the pipeline. If the pipeline doesn’t exist, it creates a new one.
Conclusion
In this post, we introduced some of the new features now available with Pipelines along with other built-in SageMaker features and the XGBoost algorithm to develop, iterate, and deploy a model for churn prediction. The solution can be extended with additional data sources to implement your own ML workflow. For more details on the steps available in the Pipelines workflow, refer to Amazon SageMaker Model Building Pipeline and SageMaker Workflows. The AWS SageMaker Examples GitHub repo has more examples around various use cases using Pipelines.
About the Authors
Jerry Peng is a software development engineer with AWS SageMaker. He focuses on building end-to-end large-scale MLOps systems, from training to model monitoring in production. He is also passionate about bringing the concept of MLOps to a broader audience.
Dewen Qi is a Software Development Engineer in AWS. She currently focuses on developing and improving SageMaker Pipelines. Outside of work, she enjoys practicing cello.
Gayatri Ghanakota is a Sr. Machine Learning Engineer with AWS Professional Services. She is passionate about developing, deploying, and explaining AI/ML solutions across various domains. Prior to this role, she led multiple initiatives as a data scientist and ML engineer with top global firms in the financial and retail space. She holds a master’s degree in Computer Science specialized in Data Science from the University of Colorado, Boulder.
Rupinder Grewal is a Sr. AI/ML Specialist Solutions Architect with AWS. He currently focuses on serving of models and MLOps on SageMaker. Prior to this role, he worked as a Machine Learning Engineer building and hosting models. Outside of work, he enjoys playing tennis and biking on mountain trails.
Ray Li is a Sr. Data Scientist with AWS Professional Services. His specialty focuses on building and operationalizing AI/ML solutions for customers of varying sizes, ranging from startups to enterprise organizations. Outside of work, Ray enjoys fitness and traveling.