Amazon SageMaker Pipelines allows data scientists and machine learning (ML) engineers to automate training workflows, which helps you create a repeatable process to orchestrate model development steps for rapid experimentation and model retraining. You can automate the entire model build workflow, including data preparation, feature engineering, model training, model tuning, and model validation, and catalog it in the model registry. You can configure pipelines to run automatically at regular intervals or when certain events are triggered, or you can run them manually as needed.
In this post, we highlight some of the enhancements to the Amazon SageMaker SDK and introduce new features of Amazon SageMaker Pipelines that make it easier for ML practitioners to build and train ML models.
Pipelines continues to innovate its developer experience, and with these recent releases, you can now use the service in a more customized way:
-
2.99.0, 2.101.1, 2.102.0, 2.104.0 – Updated documentation on
PipelineVariable
usage for estimator, processor, tuner, transformer, and model base classes, Amazon models, and framework models. There will be additional changes coming with newer versions of the SDK to support all subclasses of estimators and processors.
-
2.90.0 – Availability of ModelStep for integrated model resource creation and registration tasks.
-
2.88.2 – Availability of PipelineSession for managed interaction with SageMaker entities and resources.
-
2.88.2 – Subclass compatibility for workflow pipeline job steps so you can build job abstractions and configure and run processing, training, transform, and tuning jobs as you would without a pipeline.
-
2.76.0 – Availability of FailStep to conditionally stop a pipeline with a failure status.
In this post, we walk you through a workflow using a sample dataset with a focus on model building and deployment to demonstrate how to implement Pipelines’s new features. By the end, you should have enough information to successfully use these newer features and simplify your ML workloads.
Features overview
Pipelines offers the following new features:
-
Pipeline variable annotation – Certain method parameters accept multiple input types, including
PipelineVariables
, and additional documentation has been added to clarify where PipelineVariables
are supported in both the latest stable version of SageMaker SDK documentation and the init signature of the functions. For example, in the following TensorFlow estimator, the init signature now shows that model_dir
and image_uri
support PipelineVariables
, whereas the other parameters do not. For more information, refer to TensorFlow Estimator.
- Before:
TensorFlow(
py_version=None,
framework_version=None,
model_dir=None,
image_uri=None,
distribution=None,
**kwargs,
)
- After:
TensorFlow(
py_version: Union[str, NoneType] = None,
framework_version: Union[str, NoneType] = None,
model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
distribution: Union[Dict[str, str], NoneType] = None,
compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
**kwargs,
)
-
Pipeline session – PipelineSession is a new concept introduced to bring unity across the SageMaker SDK and introduces lazy initialization of the pipeline resources (the run calls are captured but not run until the pipeline is created and run). The
PipelineSession
context inherits the SageMakerSession
and implements convenient methods for you to interact with other SageMaker entities and resources, such as training jobs, endpoints, and input datasets stored in Amazon Simple Storage Service (Amazon S3).
-
Subclass compatibility with workflow pipeline job steps – You can now build job abstractions and configure and run processing, training, transform, and tuning jobs as you would without a pipeline.
- For example, creating a processing step with
SKLearnProcessor
previously required the following:
sklearn_processor = SKLearnProcessor(
framework_version=framework_version,
instance_type=processing_instance_type,
instance_count=processing_instance_count,
sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
role=role,
)
step_process = ProcessingStep(
name="{pipeline-name}-process",
processor=sklearn_processor,
inputs=[
ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
],
outputs=[
ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
],
code=f"code/preprocess.py",
)
- As we see in the preceding code,
ProcessingStep
needs to do basically the same preprocessing logic as .run
, just without initiating the API call to start the job. But with subclass compatibility now enabled with workflow pipeline job steps, we declare the step_args
argument that takes the preprocessing logic with .run so you can build a job abstraction and configure it as you would use it without Pipelines. We also pass in the pipeline_session
, which is a PipelineSession
object, instead of sagemaker_session
to make sure the run calls are captured but not called until the pipeline is created and run. See the following code:
sklearn_processor = SKLearnProcessor(
framework_version=framework_version,
instance_type=processing_instance_type,
instance_count=processing_instance_count,
sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
role=role,
)
processor_args = sklearn_processor.run(
inputs=[
ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
],
outputs=[
ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
],
code=f"code/preprocess.py",
)
step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)
-
Model step (a streamlined approach with model creation and registration steps) –Pipelines offers two step types to integrate with SageMaker models:
CreateModelStep
and RegisterModel
. You can now achieve both using only the ModelStep
type. Note that a PipelineSession
is required to achieve this. This brings similarity between the pipeline steps and the SDK.
- Before:
step_register = RegisterModel(
name="ChurnRegisterModel",
estimator=xgb_custom_estimator,
model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
content_types=["text/csv"],
response_types=["text/csv"],
inference_instances=["ml.t2.medium", "ml.m5.large"],
transform_instances=["ml.m5.large"],
model_package_group_name=model_package_group_name,
approval_status=model_approval_status,
model_metrics=model_metrics,
)
-
- After:
register_args = model.register(
content_types=["text/csv"],
response_types=["text/csv"],
inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
transform_instances=["ml.m5.xlarge"],
model_package_group_name=model_package_group_name,
approval_status=model_approval_status,
model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)
-
Fail step (conditional stop of the pipeline run) –
FailStep
allows a pipeline to be stopped with a failure status if a condition is met, such as if the model score is below a certain threshold.
Solution overview
In this solution, your entry point is the Amazon SageMaker Studio integrated development environment (IDE) for rapid experimentation. Studio offers an environment to manage the end-to-end Pipelines experience. With Studio, you can bypass the AWS Management Console for your entire workflow management. For more information on managing Pipelines from within Studio, refer to View, Track, and Execute SageMaker Pipelines in SageMaker Studio.
The following diagram illustrates the high-level architecture of the ML workflow with the different steps to train and generate inferences using the new features.
The pipeline includes the following steps:
- Preprocess data to build features required and split data into train, validation, and test datasets.
- Create a training job with the SageMaker XGBoost framework.
- Evaluate the trained model using the test dataset.
- Check if the AUC score is above a predefined threshold.
- If the AUC score is less than the threshold, stop the pipeline run and mark it as failed.
- If the AUC score is greater than the threshold, create a SageMaker model and register it in the SageMaker model registry.
- Apply batch transform on the given dataset using the model created in the previous step.
Prerequisites
To follow along with this post, you need an AWS account with a Studio domain.
Pipelines is integrated directly with SageMaker entities and resources, so you don’t need to interact with any other AWS services. You also don’t need to manage any resources because it’s a fully managed service, which means that it creates and manages resources for you. For more information on the various SageMaker components that are both standalone Python APIs along with integrated components of Studio, see the SageMaker product page.
Before getting started, install SageMaker SDK version >= 2.104.0 and xlrd >=1.0.0 within the Studio notebook using the following code snippet:
print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
import sagemaker
ML workflow
For this post, you use the following components:
-
Data preparation
-
SageMaker Processing – SageMaker Processing is a fully managed service allowing you to run custom data transformations and feature engineering for ML workloads.
-
Model building
-
Model training and evaluation
-
One-click training – The SageMaker distributed training feature. SageMaker provides distributed training libraries for data parallelism and model parallelism. The libraries are optimized for the SageMaker training environment, help adapt your distributed training jobs to SageMaker, and improve training speed and throughput.
-
SageMaker Experiments – Experiments is a capability of SageMaker that lets you organize, track, compare, and evaluate your ML iterations.
-
SageMaker batch transform – Batch transform or offline scoring is a managed service in SageMaker that lets you predict on a larger dataset using your ML models.
-
Workflow orchestration
A SageMaker pipeline is a series of interconnected steps defined by a JSON pipeline definition. It encodes a pipeline using a directed acyclic graph (DAG). The DAG gives information on the requirements for and relationships between each step of the pipeline, and its structure is determined by the data dependencies between steps. These dependencies are created when the properties of a step’s output are passed as the input to another step.
The following diagram illustrates the different steps in the SageMaker pipeline (for a churn prediction use case) where the connections between the steps are inferred by SageMaker based on the inputs and outputs defined by the step definitions.
The next sections walk through creating each step of the pipeline and running the entire pipeline once created.
Project structure
Let’s start with the project structure:
-
/sm-pipelines-end-to-end-example – The project name
-
/data – The datasets
-
/pipelines – The code files for pipeline components
- /customerchurn
- preprocess.py
- evaluate.py
-
sagemaker-pipelines-project.ipynb – A notebook walking through the modeling workflow using Pipelines’s new features
Download the dataset
To follow along with this post, you need to download and save the sample dataset under the data folder within the project home directory, which saves the file in Amazon Elastic File System (Amazon EFS) within the Studio environment.
Build the pipeline components
Now you’re ready to build the pipeline components.
Import statements and declare parameters and constants
Create a Studio notebook called sagemaker-pipelines-project.ipynb
within the project home directory. Enter the following code block in a cell, and run the cell to set up SageMaker and S3 client objects, create PipelineSession
, and set up the S3 bucket location using the default bucket that comes with a SageMaker session:
import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"
Pipelines supports parameterization, which allows you to specify input parameters at runtime without changing your pipeline code. You can use the modules available under the sagemaker.workflow.parameters
module, such as ParameterInteger
, ParameterFloat
, and ParameterString
, to specify pipeline parameters of various data types. Run the following code to set up multiple input parameters:
from sagemaker.workflow.parameters import (
ParameterInteger,
ParameterString,
ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)
processing_instance_count = ParameterInteger(
name="ProcessingInstanceCount",
default_value=1
)
processing_instance_type = ParameterString(
name="ProcessingInstanceType",
default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
name="TrainingInstanceType",
default_value="ml.m5.xlarge"
)
input_data = ParameterString(
name="InputData",
default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)
model_approval_status = ParameterString(
name="ModelApprovalStatus", default_value="PendingManualApproval"
)
Generate a batch dataset
Generate the batch dataset, which you use later in the batch transform step:
def preprocess_batch_data(file_path):
df = pd.read_csv(file_path)
## Convert to datetime columns
df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
## Drop Rows with null values
df = df.dropna()
## Create Column which gives the days between the last order and the first order
df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
## Create Column which gives the days between when the customer record was created and the first order
df['created'] = pd.to_datetime(df['created'])
df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
## Drop Columns
df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
## Apply one hot encoding on favday and city columns
df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
return df
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)
Upload data to an S3 bucket
Upload the datasets to Amazon S3:
s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")
Define a processing script and processing step
In this step, you prepare a Python script to do feature engineering, one hot encoding, and curate the training, validation, and test splits to be used for model building. Run the following code to build your processing script:
%%writefile pipelines/customerchurn/preprocess.py
import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
base_dir = "/opt/ml/processing"
#Read Data
df = pd.read_csv(
f"{base_dir}/input/storedata_total.csv"
)
# convert created column to datetime
df["created"] = pd.to_datetime(df["created"])
#Convert firstorder and lastorder to datetime datatype
df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
#Drop Rows with Null Values
df = df.dropna()
#Create column which gives the days between the last order and the first order
df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
#Create column which gives the days between the customer record was created and the first order
df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
#Drop columns
df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
#Apply one hot encoding on favday and city columns
df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
# Split into train, validation and test datasets
y = df.pop("retained")
X_pre = df
y_pre = y.to_numpy().reshape(len(y), 1)
X = np.concatenate((y_pre, X_pre), axis=1)
np.random.shuffle(X)
# Split in Train, Test and Validation Datasets
train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
train_rows = np.shape(train)[0]
validation_rows = np.shape(validation)[0]
test_rows = np.shape(test)[0]
train = pd.DataFrame(train)
test = pd.DataFrame(test)
validation = pd.DataFrame(validation)
# Convert the label column to integer
train[0] = train[0].astype(int)
test[0] = test[0].astype(int)
validation[0] = validation[0].astype(int)
# Save the Dataframes as csv files
train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)
Next, run the following code block to instantiate the processor and the Pipelines step to run the processing script. Because the processing script is written in Pandas, you use a SKLearnProcessor. The Pipelines ProcessingStep
function takes the following arguments: the processor, the input S3 locations for raw datasets, and the output S3 locations to save processed datasets.
# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")
# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
framework_version=framework_version,
instance_type="ml.m5.xlarge",
instance_count=processing_instance_count,
base_job_name="sklearn-churn-process",
role=role,
sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
inputs=[
ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
],
outputs=[
ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
destination=f"s3://{default_bucket}/output/train" ),
ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
destination=f"s3://{default_bucket}/output/validation"),
ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
destination=f"s3://{default_bucket}/output/test")
],
code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)
Define a training step
Set up model training using a SageMaker XGBoost estimator and the Pipelines TrainingStep
function:
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
framework="xgboost",
region=region,
version="1.0-1",
py_version="py3",
instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
image_uri=image_uri,
instance_type=training_instance_type,
instance_count=1,
output_path=model_path,
role=role,
sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
objective="reg:linear",
num_round=50,
max_depth=5,
eta=0.2,
gamma=4,
min_child_weight=6,
subsample=0.7,
)
train_args = xgb_train.fit(
inputs={
"train": TrainingInput(
s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
"train"
].S3Output.S3Uri,
content_type="text/csv",
),
"validation": TrainingInput(
s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
"validation"
].S3Output.S3Uri,
content_type="text/csv",
),
},
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
name="ChurnModelTrain",
step_args=train_args,
)
Define the evaluation script and model evaluation step
Run the following code block to evaluate the model once trained. This script encapsulates the logic to check if the AUC score meets the specified threshold.
%%writefile pipelines/customerchurn/evaluate.py
import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":
#Read Model Tar File
model_path = f"/opt/ml/processing/model/model.tar.gz"
with tarfile.open(model_path) as tar:
tar.extractall(path=".")
model = pickle.load(open("xgboost-model", "rb"))
#Read Test Data using which we evaluate the model
test_path = "/opt/ml/processing/test/test.csv"
df = pd.read_csv(test_path, header=None)
y_test = df.iloc[:, 0].to_numpy()
df.drop(df.columns[0], axis=1, inplace=True)
X_test = xgboost.DMatrix(df.values)
#Run Predictions
predictions = model.predict(X_test)
#Evaluate Predictions
fpr, tpr, thresholds = roc_curve(y_test, predictions)
auc_score = auc(fpr, tpr)
report_dict = {
"classification_metrics": {
"auc_score": {
"value": auc_score,
},
},
}
#Save Evaluation Report
output_dir = "/opt/ml/processing/evaluation"
pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
evaluation_path = f"{output_dir}/evaluation.json"
with open(evaluation_path, "w") as f:
f.write(json.dumps(report_dict))
Next, run the following code block to instantiate the processor and the Pipelines step to run the evaluation script. Because the evaluation script uses the XGBoost package, you use a ScriptProcessor
along with the XGBoost image. The Pipelines ProcessingStep
function takes the following arguments: the processor, the input S3 locations for raw datasets, and the output S3 locations to save processed datasets.
#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
image_uri=image_uri,
command=["python3"],
instance_type=processing_instance_type,
instance_count=1,
base_job_name="script-churn-eval",
role=role,
sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
inputs=[
ProcessingInput(
source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
destination="/opt/ml/processing/model",
),
ProcessingInput(
source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
destination="/opt/ml/processing/test",
),
],
outputs=[
ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
destination=f"s3://{default_bucket}/output/evaluation"),
],
code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
name="ChurnEvalModel",
step_args=eval_args,
property_files=[evaluation_report],
)
Define a create model step
Run the following code block to create a SageMaker model using the Pipelines model step. This step utilizes the output of the training step to package the model for deployment. Note that the value for the instance type argument is passed using the Pipelines parameter you defined earlier in the post.
from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model
model = Model(
image_uri=image_uri,
model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
sagemaker_session=pipeline_session,
role=role,
)
step_create_model = ModelStep(
name="ChurnCreateModel",
step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)
Define a batch transform step
Run the following code block to run batch transformation using the trained model with the batch input created in the first step:
from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep
transformer = Transformer(
model_name=step_create_model.properties.ModelName,
instance_type="ml.m5.xlarge",
instance_count=1,
output_path=f"s3://{default_bucket}/ChurnTransform",
sagemaker_session=pipeline_session
)
step_transform = TransformStep(
name="ChurnTransform",
step_args=transformer.transform(
data=batch_data,
content_type="text/csv"
)
)
Define a register model step
The following code registers the model within the SageMaker model registry using the Pipelines model step:
model = Model(
image_uri=image_uri,
model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
sagemaker_session=pipeline_session,
role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics
model_metrics = ModelMetrics(
model_statistics=MetricsSource(
s3_uri="{}/evaluation.json".format(
step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
),
content_type="application/json",
)
)
register_args = model.register(
content_types=["text/csv"],
response_types=["text/csv"],
inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
transform_instances=["ml.m5.xlarge"],
model_package_group_name=model_package_group_name,
approval_status=model_approval_status,
model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)
Define a fail step to stop the pipeline
The following code defines the Pipelines fail step to stop the pipeline run with an error message if the AUC score doesn’t meet the defined threshold:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
name="ChurnAUCScoreFail",
error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
)
Define a condition step to check AUC score
The following code defines a condition step to check the AUC score and conditionally create a model and run a batch transformation and register a model in the model registry, or stop the pipeline run in a failed state:
from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
left=JsonGet(
step_name=step_eval.name,
property_file=evaluation_report,
json_path="classification_metrics.auc_score.value",
),
right=auc_score_threshold,
)
step_cond = ConditionStep(
name="CheckAUCScoreChurnEvaluation",
conditions=[cond_lte],
if_steps=[step_register, step_create_model, step_transform],
else_steps=[step_fail],
)
Build and run the pipeline
After defining all of the component steps, you can assemble them into a Pipelines object. You don’t need to specify the order of pipeline because Pipelines automatically infers the order sequence based on the dependencies between the steps.
import json
from sagemaker.workflow.pipeline import Pipeline
pipeline = Pipeline(
name=pipeline_name,
parameters=[
processing_instance_count,
processing_instance_type,
training_instance_type,
model_approval_status,
input_data,
batch_data,
auc_score_threshold,
],
steps=[step_process, step_train, step_eval, step_cond],
)
definition = json.loads(pipeline.definition())
print(definition)
Run the following code in a cell in your notebook. If the pipeline already exists, the code updates the pipeline. If the pipeline doesn’t exist, it creates a new one.
pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution
Conclusion
In this post, we introduced some of the new features now available with Pipelines along with other built-in SageMaker features and the XGBoost algorithm to develop, iterate, and deploy a model for churn prediction. The solution can be extended with additional data sources
to implement your own ML workflow. For more details on the steps available in the Pipelines workflow, refer to Amazon SageMaker Model Building Pipeline and SageMaker Workflows. The AWS SageMaker Examples GitHub repo has more examples around various use cases using Pipelines.
About the Authors
Jerry Peng is a software development engineer with AWS SageMaker. He focuses on building end-to-end large-scale MLOps system from training to model monitoring in production. He is also passionate about bringing the concept of MLOps to broader audience.
Dewen Qi is a Software Development Engineer in AWS. She currently focuses on developing and improving SageMaker Pipelines. Outside of work, she enjoys practicing Cello.
Gayatri Ghanakota is a Sr. Machine Learning Engineer with AWS Professional Services. She is passionate about developing, deploying, and explaining AI/ ML solutions across various domains. Prior to this role, she led multiple initiatives as a data scientist and ML engineer with top global firms in the financial and retail space. She holds a master’s degree in Computer Science specialized in Data Science from the University of Colorado, Boulder.
Rupinder Grewal is a Sr Ai/ML Specialist Solutions Architect with AWS. He currently focuses on serving of models and MLOps on SageMaker. Prior to this role he has worked as Machine Learning Engineer building and hosting models. Outside of work he enjoys playing tennis and biking on mountain trails.
Ray Li is a Sr. Data Scientist with AWS Professional Services. His specialty focuses on building and operationalizing AI/ML solutions for customers of varying sizes, ranging from startups to enterprise organizations. Outside of work, Ray enjoys fitness and traveling.
Read More