Build a CI/CD pipeline for deploying custom machine learning models using AWS services

Amazon SageMaker is a fully managed service that provides every developer and data scientist with the ability to build, train, and deploy machine learning (ML) models quickly. SageMaker removes the heavy lifting from each step of the ML process to make it easier to develop high-quality ML artifacts. AWS Serverless Application Model (AWS SAM) is an open-source framework for building serverless applications. It provides shorthand syntax to express functions, APIs, databases, event source mappings, steps in AWS Step Functions, and more.

Generally, ML workflows orchestrate and automate sequences of ML tasks. A workflow includes data collection, training, testing, human evaluation of the ML model, and deployment of the models for inference.

For continuous integration and continuous delivery (CI/CD) pipelines, AWS recently released Amazon SageMaker Pipelines, the first purpose-built, easy-to-use CI/CD service for ML. Pipelines is a native workflow orchestration tool for building ML pipelines that takes advantage of direct SageMaker integration. For more information, see Building, automating, managing, and scaling ML workflows using Amazon SageMaker Pipelines.

In this post, I show you an extensible way to automate and deploy custom ML models with a CI/CD pipeline, using service integrations between Amazon SageMaker, AWS Step Functions, and AWS SAM.

To build this pipeline, you also need to be familiar with the following AWS services:

  • AWS CodeBuild – A fully managed continuous integration service that compiles source code, runs tests, and produces software packages that are ready to deploy
  • AWS CodePipeline – A fully managed continuous delivery service that helps you automate your release pipelines
  • Amazon Elastic Container Registry (Amazon ECR) – A fully managed container registry for storing, managing, and deploying container images
  • AWS Lambda – A service that lets you run code without provisioning or managing servers. You pay only for the compute time you consume
  • Amazon Simple Storage Service (Amazon S3) – An object storage service that offers industry-leading scalability, data availability, security, and performance
  • AWS Step Functions – A serverless function orchestrator that makes it easy to sequence AWS Lambda functions and multiple AWS services

Solution overview

The solution has two main sections:

  • Use AWS SAM to create a Step Functions workflow with SageMaker – Step Functions recently announced native service integrations with SageMaker. You can use this feature to train ML models, deploy ML models, test results, and expose an inference endpoint. This feature also provides a way to wait for human approval before the state transitions can progress towards the final ML model inference endpoint’s configuration and deployment.
  • Deploy the model with a CI/CD pipeline – One of the requirements of SageMaker is that the source code of custom models needs to be stored as a Docker image in an image registry such as Amazon ECR. SageMaker then references this Docker image for training and inference. For this post, we create a CI/CD pipeline using CodePipeline and CodeBuild to build, tag, and upload the Docker image to Amazon ECR and then start the Step Functions workflow to train and deploy the custom ML model on SageMaker, which references this tagged Docker image.

The following diagram shows a general overview of the MLOps CI/CD pipeline.

The workflow includes the following steps:

  1. The data scientist works on developing custom ML model code using their local notebook or a SageMaker notebook. They commit and push changes to a source code repository.
  2. A webhook on the code repository triggers a CodePipeline build in the AWS Cloud.
  3. CodePipeline downloads the source code and starts the build process.
  4. CodeBuild downloads the necessary source files and starts running commands to build and tag a local Docker container image.
  5. CodeBuild pushes the container image to Amazon ECR. The container image is tagged with a unique label derived from the repository commit hash.
  6. CodePipeline invokes Step Functions and passes the container image URI and the unique container image tag as parameters to Step Functions.
  7. Step Functions starts a workflow by initially calling the SageMaker training job and passing the necessary parameters.
  8. SageMaker downloads the necessary container image and starts the training job. When the job is complete, Step Functions directs SageMaker to create a model and store the model in the S3 bucket.
  9. Step Functions starts a SageMaker batch transform job on the test data provided in the S3 bucket.
  10. When the batch transform job is complete, Step Functions sends an email to the user using Amazon Simple Notification Service (Amazon SNS). This email includes the details of the batch transform job and links to the test data prediction outcomes stored in the S3 bucket. After sending the email, Step Functions enters a manual wait phase.
  11. The email sent by Amazon SNS has links to either accept or reject the test results. The recipient can manually look at the test data prediction outcomes in the S3 bucket. If they’re not satisfied with the results, they can reject the changes to cancel the Step Functions workflow.
  12. If the recipient accepts the changes, the accept link calls an Amazon API Gateway endpoint, which invokes a Lambda function with the embedded token that references the waiting Step Functions step.
  13. The Lambda function calls Step Functions to continue the workflow.
  14. Step Functions resumes the workflow.
  15. Step Functions creates a SageMaker endpoint config and a SageMaker inference endpoint.
  16. When the workflow is successful, Step Functions sends an email with a link to the final SageMaker inference endpoint.

Use AWS SAM to create a Step Functions workflow with SageMaker

In this first section, you visualize the Step Functions ML workflow in Visual Studio Code and deploy it to the AWS environment using AWS SAM. You use some of the newer features and service integrations, such as support in AWS SAM for AWS Step Functions, native support in Step Functions for SageMaker integrations, and the ability to visualize Step Functions workflows directly in VS Code through the AWS Toolkit.

Prerequisites

Before getting started, make sure you complete the following prerequisites:

Deploy the application template

To get started, follow the instructions on GitHub to complete the application setup. Alternatively, you can switch to the terminal and enter the following command:

git clone https://github.com/aws-samples/sam-sf-sagemaker-workflow.git

The directory structure should be as follows:

. sam-sf-sagemaker-workflow
|-- cfn
|---- sam-template.yaml
|-- functions
|---- api_sagemaker_endpoint
|---- create_and_email_accept_reject_links
|---- respond_to_links
|---- update_sagemakerEndpoint_API
|-- scripts
|-- statemachine
|---- mlops.asl.json

The code is organized into subfolders, with the main AWS SAM template residing at cfn/sam-template.yaml.

The Step Functions workflow definition is stored in statemachine/mlops.asl.json, and the Lambda functions it uses are stored in the functions folder.

To start with the AWS SAM template, run the following bash scripts from the root folder:

# Create the S3 buckets if required before running the commands.
S3_BUCKET=bucket-mlops                 # Bucket to store the AWS SAM template
S3_BUCKET_MODEL=ml-models              # Bucket to store ML models
STACK_NAME=sam-sf-sagemaker-workflow   # Name of the AWS SAM stack
sam build -t cfn/sam-template.yaml     # AWS SAM build
sam deploy --template-file .aws-sam/build/template.yaml \
  --stack-name ${STACK_NAME} --force-upload \
  --s3-bucket ${S3_BUCKET} --s3-prefix sam \
  --parameter-overrides S3ModelBucket=${S3_BUCKET_MODEL} \
  --capabilities CAPABILITY_IAM

The sam build command builds all the functions and creates the final AWS CloudFormation template. The sam deploy command uploads the necessary files to the S3 bucket and starts creating or updating the CloudFormation template to create the necessary AWS infrastructure.

When the stack has been created successfully, go to the AWS CloudFormation console. On the Outputs tab, copy the MLOpsStateMachineArn value to use later.
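If you prefer to capture this value programmatically, the following minimal Python sketch, assuming the stack name used in the preceding commands and default AWS credentials in your environment, reads the stack outputs with boto3:

import boto3

# Assumes the stack name used in the sam deploy command above.
cfn = boto3.client('cloudformation')
stack = cfn.describe_stacks(StackName='sam-sf-sagemaker-workflow')['Stacks'][0]

outputs = {o['OutputKey']: o['OutputValue'] for o in stack['Outputs']}
print(outputs['MLOpsStateMachineArn'])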

The following diagram shows the workflow carried out in Step Functions, using VS Code integrations with Step Functions.

The following Amazon States Language (ASL) snippet, in JSON, describes the workflow visualized in the preceding diagram.

{
    "Comment": "This Step Function starts machine learning pipeline, once the custom model has been uploaded to ECR. Two parameters are expected by Step Functions are git commitID and the sagemaker ECR custom container URI",
    "StartAt": "SageMaker Create Training Job",
    "States": {
        "SageMaker Create Training Job": {
            "Type": "Task",
            "Resource": "arn:aws:states:::sagemaker:createTrainingJob.sync",
            "Parameters": {
                "TrainingJobName.$": "$.commitID",
                "ResourceConfig": {
                    "InstanceCount": 1,
                    "InstanceType": "ml.c4.2xlarge",
                    "VolumeSizeInGB": 20
                },
                "HyperParameters": {
                    "mode": "batch_skipgram",
                    "epochs": "5",
                    "min_count": "5",
                    "sampling_threshold": "0.0001",
                    "learning_rate": "0.025",
                    "window_size": "5",
                    "vector_dim": "300",
                    "negative_samples": "5",
                    "batch_size": "11"
                },
                "AlgorithmSpecification": {
                    "TrainingImage.$": "$.imageUri",
                    "TrainingInputMode": "File"
                },
                "OutputDataConfig": {
                    "S3OutputPath": "s3://${S3ModelBucket}/output"
                },
                "StoppingCondition": {
                    "MaxRuntimeInSeconds": 100000
                },
                "RoleArn": "${SagemakerRoleArn}",
                "InputDataConfig": [
                    {
                        "ChannelName": "training",
                        "DataSource": {
                            "S3DataSource": {
                                "S3DataType": "S3Prefix",
                                "S3Uri": "s3://${S3ModelBucket}/iris.csv",
                                "S3DataDistributionType": "FullyReplicated"
                            }
                        }
                    }
                ]
            },
            "Retry": [
                {
                    "ErrorEquals": [
                        "SageMaker.AmazonSageMakerException"
                    ],
                    "IntervalSeconds": 1,
                    "MaxAttempts": 1,
                    "BackoffRate": 1.1
                },
                {
                    "ErrorEquals": [
                        "SageMaker.ResourceLimitExceededException"
                    ],
                    "IntervalSeconds": 60,
                    "MaxAttempts": 1,
                    "BackoffRate": 1
                }
            ],
            "Catch": [
                {
                    "ErrorEquals": [
                        "States.ALL"
                    ],
                    "ResultPath": "$.cause",
                    "Next": "FailState"
                }
            ],
            "Next": "SageMaker Create Model"
        },
        "SageMaker Create Model": {
            "Type": "Task",
            "Resource": "arn:aws:states:::sagemaker:createModel",
            "Parameters": {
                "ExecutionRoleArn": "${SagemakerRoleArn}",
                "ModelName.$": "$.TrainingJobName",
                "PrimaryContainer": {
                    "ModelDataUrl.$": "$.ModelArtifacts.S3ModelArtifacts",
                    "Image.$": "$.AlgorithmSpecification.TrainingImage"
                }
            },
            "ResultPath": "$.taskresult",
            "Next": "SageMaker Create Transform Job",
            "Catch": [
                {
                "ErrorEquals": ["States.ALL" ],
                "Next": "FailState"
                }
            ]
        },
        "SageMaker Create Transform Job": {
            "Type": "Task",
            "Resource": "arn:aws:states:::sagemaker:createTransformJob.sync",
            "Parameters": {
                "ModelName.$": "$.TrainingJobName",
                "TransformInput": {
                    "SplitType": "Line",
                    "CompressionType": "None",
                    "ContentType": "text/csv",
                    "DataSource": {
                        "S3DataSource": {
                            "S3DataType": "S3Prefix",
                            "S3Uri": "s3://${S3ModelBucket}/iris.csv"
                        }
                    }
                },
                "TransformOutput": {
                    "S3OutputPath.$": "States.Format('s3://${S3ModelBucket}/transform_output/{}/iris.csv', $.TrainingJobName)" ,
                    "AssembleWith": "Line",
                    "Accept": "text/csv"
                },
                "DataProcessing": {
                    "InputFilter": "$[1:]"
                },
                "TransformResources": {
                    "InstanceCount": 1,
                    "InstanceType": "ml.m4.xlarge"
                },
                "TransformJobName.$": "$.TrainingJobName"
            },
            "ResultPath": "$.result",
            "Next": "Send Approve/Reject Email Request",
            "Catch": [
                {
                "ErrorEquals": [
                    "States.ALL"
                ],
                "Next": "FailState"
                }
            ]
        },
        "Send Approve/Reject Email Request": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
            "Parameters": {
                "FunctionName": "${CreateAndEmailLinkFnName}",
                "Payload": {
                    "token.$":"$$.Task.Token",
                    "s3_batch_output.$":"$.result.TransformOutput.S3OutputPath"                                      
                }
            },
            "ResultPath": "$.output",
            "Next": "Sagemaker Create Endpoint Config",
            "Catch": [
                {
                    "ErrorEquals": [ "rejected" ],
                    "ResultPath": "$.output",
                    "Next": "FailState"
                }
            ]
        },
        "Sagemaker Create Endpoint Config": {
            "Type": "Task",
            "Resource": "arn:aws:states:::sagemaker:createEndpointConfig",
            "Parameters": {
                "EndpointConfigName.$": "$.TrainingJobName",
                "ProductionVariants": [
                    {
                        "InitialInstanceCount": 1,
                        "InitialVariantWeight": 1,
                        "InstanceType": "ml.t2.medium",
                        "ModelName.$": "$.TrainingJobName",
                        "VariantName": "AllTraffic"
                    }
                ]
            },
            "ResultPath": "$.result",
            "Next": "Sagemaker Create Endpoint",
            "Catch": [
                {
                  "ErrorEquals": [
                    "States.ALL"
                  ],
                  "Next": "FailState"
                }
              ]
        },
        "Sagemaker Create Endpoint": {
            "Type": "Task",
            "Resource": "arn:aws:states:::sagemaker:createEndpoint",
            "Parameters": {
                "EndpointName.$": "$.TrainingJobName",
                "EndpointConfigName.$": "$.TrainingJobName"
            },            
            "Next": "Send Email With API Endpoint",
            "Catch": [
                {
                  "ErrorEquals": [
                    "States.ALL"
                  ],
                  "Next": "FailState"
                }
              ]
        },
        "Send Email With API Endpoint": {
            "Type": "Task",
            "Resource": "${UpdateSagemakerEndpointAPI}",
            "Catch": [
                {
                  "ErrorEquals": [
                    "States.ALL"
                  ],
                  "Next": "FailState"
                }
              ],
             "Next": "SuccessState"
        },
        "SuccessState": {
            "Type": "Succeed"            
        },
        "FailState": {
            "Type": "Fail"          
        }               
                
    }
}

Step Functions process to create the SageMaker workflow

In this section, we discuss the detailed steps involved in creating the SageMaker workflow using Step Functions.

Step Functions uses the commit ID passed by CodePipeline as a unique identifier to create a SageMaker training job. The training job can sometimes take a long time to complete; to make the workflow wait for the job, you append .sync to the resource ARN of the SageMaker training job task (arn:aws:states:::sagemaker:createTrainingJob.sync).
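For reference, the following minimal Python sketch shows the shape of the input the state machine expects when an execution starts; the state machine ARN and values here are placeholders, and in the full pipeline CodePipeline supplies the real commit ID and Amazon ECR image URI:

import json
import boto3

sf = boto3.client('stepfunctions')

# Placeholder values for illustration only.
response = sf.start_execution(
    stateMachineArn='arn:aws:states:us-east-1:123456789012:stateMachine:MLOpsStateMachine',
    input=json.dumps({
        'commitID': 'abc1234',   # becomes the SageMaker training job name
        'imageUri': '123456789012.dkr.ecr.us-east-1.amazonaws.com/custom-model:abc1234'
    })
)
print(response['executionArn'])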

When the training job is complete, Step Functions creates a model and saves the model in an S3 bucket.

Step Functions then uses a batch transform step to evaluate and test the model, based on batch data initially provided by the data scientist in an S3 bucket. When the evaluation step is complete, the output is stored in an S3 bucket.

Step Functions then enters a manual approval stage. To create this state, you use callback links that embed a task token: the task appends .waitForTaskToken to the Lambda resource ARN and passes the token to the Lambda function, which later uses it to resume or fail the workflow.

The Lambda function uses Amazon SNS or Amazon Simple Email Service (Amazon SES) to send an email to the subscribed party. You need to add your email address to the SNS topic to receive the accept/reject email while testing.
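The repository contains the full implementation of this function; the following is only a minimal Python sketch of the idea, and the environment variable names, API path, and link format are assumptions rather than the repo's actual values:

import os
import urllib.parse
import boto3

sns = boto3.client('sns')

# Hypothetical configuration; the actual function in the repo defines its own.
TOPIC_ARN = os.environ['SNS_TOPIC_ARN']
API_URL = os.environ['API_GATEWAY_URL']  # e.g. https://xxxx.execute-api.us-east-1.amazonaws.com/v1

def lambda_handler(event, context):
    # Step Functions passes the task token and the batch transform output path (see the ASL above).
    token = urllib.parse.quote(event['token'], safe='')
    s3_batch_output = event['s3_batch_output']

    accept_link = f"{API_URL}/respond?type=success&token={token}"
    reject_link = f"{API_URL}/respond?type=fail&token={token}"

    sns.publish(
        TopicArn=TOPIC_ARN,
        Subject='Approve or reject the ML model test results',
        Message=(
            f'Batch transform output: {s3_batch_output}\n\n'
            f'Accept: {accept_link}\n'
            f'Reject: {reject_link}\n'
        )
    )
    return {'statusCode': 200}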

You receive an email, as in the following screenshot, with links to the data stored in the S3 bucket. This data has been batch transformed using the custom ML model created in the earlier step by SageMaker. You can choose Accept or Reject based on your findings.

If you choose Reject, Step Functions stops running the workflow. If you’re satisfied with the results, choose Accept, which triggers the API link. This link passes the embedded token and type to the API Gateway or Lambda endpoint as request parameters to progress to the next Step Functions step.

See the following Python code:

import json
import boto3

sf = boto3.client('stepfunctions')

def lambda_handler(event, context):
    # API Gateway passes the decision and the Step Functions task token
    # as query string parameters on the accept/reject links.
    approval_type = event.get('queryStringParameters').get('type')
    token = event.get('queryStringParameters').get('token')

    if approval_type == 'success':
        # Accept: resume the waiting Step Functions task.
        sf.send_task_success(
            taskToken=token,
            output="{}"
        )
    else:
        # Reject: fail the waiting task; the error name matches the
        # "rejected" value caught by the state machine's Catch block.
        sf.send_task_failure(
            taskToken=token,
            error="rejected"
        )

    return {
        'statusCode': 200,
        'body': json.dumps('Responded to Step Function')
    }

Step Functions then creates the final unique SageMaker endpoint configuration and inference endpoint. It does this through the native SageMaker service integrations, using the special resource values shown in the following screenshot.

When the SageMaker endpoint is ready, an email is sent to the subscriber with a link to the API of the SageMaker inference endpoint.

Deploy the model with a CI/CD pipeline

In this section, you use the CI/CD pipeline to deploy a custom ML model.

The pipeline starts its run as soon as it detects updates to the source code of the custom model. The pipeline downloads the source code from the repository, builds and tags the Docker image, and uploads the Docker image to Amazon ECR. After uploading the Docker image, the pipeline triggers the Step Functions workflow to train and deploy the custom model to SageMaker. Finally, the pipeline sends an email to the specified users with details about the SageMaker inference endpoint.

We use Scikit Bring Your Own Container to build a custom container image and use the iris dataset to train and test the model.

When your Step Functions workflow is ready, build your full pipeline using the code provided in the GitHub repo.

After you download the code from the repo, the directory structure should look like the following:

. codepipeline-ecr-build-sf-execution
|-- cfn
|---- params.json
|---- pipeline-cfn.yaml
|-- container
|---- descision_trees
|---- local_test
|---- .dockerignore
|---- Dockerfile
|-- scripts

In the params.json file in the /cfn folder, provide your GitHub token, the repository name, and the ARN of the Step Functions state machine you created earlier (the MLOpsStateMachineArn value from the CloudFormation outputs).

You now create the necessary services and resources for the CI/CD pipeline. To create the CloudFormation stack, run the following code:

aws cloudformation create-stack --stack-name codepipeline-ecr-build-sf-execution --template-body file://cfn/pipeline-cfn.yaml  --parameters file://cfn/params.json --capabilities CAPABILITY_NAMED_IAM

Alternatively, to update the stack, run the following code:

aws cloudformation update-stack --stack-name codepipeline-ecr-build-sf-execution --template-body file://cfn/pipeline-cfn.yaml  --parameters file://cfn/params.json --capabilities CAPABILITY_NAMED_IAM

The CloudFormation template deploys a CodePipeline pipeline into your AWS account. The pipeline starts running as soon as code changes are committed to the repo. After the source stage downloads the source code, CodeBuild builds a Docker image, tags it with the commit ID and current timestamp, and pushes the image to Amazon ECR. CodePipeline then moves to the next stage, which starts the Step Functions workflow you created earlier.

When the Step Functions workflow is complete, a final email is generated with a link to the API Gateway URL that references the newly created SageMaker inference endpoint.

Test the workflow

To test your workflow, complete the following steps:

  1. Start the CodePipeline build by committing a code change to the codepipeline-ecr-build-sf-execution/container folder.
  2. On the CodePipeline console, check that the pipeline is transitioning through the different stages as expected.

When the pipeline reaches its final state, it starts the Step Functions workflow, which sends an email for approval.

  3. Approve the email to continue the Step Functions workflow.

When the SageMaker endpoint is ready, you should receive another email with a link to the API inference endpoint.

To test the iris dataset, you can try sending a single data point to the inference endpoint.

  4. Copy the inference endpoint link from the email and assign it to the bash variable INFERENCE_ENDPOINT, then use curl to send sample data points to the endpoint, as shown in the following code:

INFERENCE_ENDPOINT=https://XXXX.execute-api.us-east-1.amazonaws.com/v1/invokeSagemakerAPI?sagemaker_endpoint=d236eba5-09-03-2020-18-29-15

curl --location --request POST "${INFERENCE_ENDPOINT}" --header 'Content-Type: application/json' --data-raw '{
  "data": "4.5,1.3,0.3,0.3"
}'
{"result": "setosa"}

curl --location --request POST "${INFERENCE_ENDPOINT}" --header 'Content-Type: application/json' --data-raw '{
  "data": "5.9,3,5.1,1.8"
}'
{"result": "virginica"}

By sending different data, we get different sets of inference results back.
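Behind that API Gateway route, a Lambda function forwards the request to the SageMaker runtime. The following Python sketch illustrates the idea only; the query parameter name matches the URL above, but the handler itself is an assumption rather than the repo's exact implementation:

import json
import boto3

runtime = boto3.client('sagemaker-runtime')

def lambda_handler(event, context):
    # The SageMaker inference endpoint name arrives as a query string parameter,
    # as in the invokeSagemakerAPI URL used by the curl examples above.
    endpoint_name = event['queryStringParameters']['sagemaker_endpoint']
    payload = json.loads(event['body'])  # e.g. {"data": "4.5,1.3,0.3,0.3"}

    response = runtime.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType='text/csv',
        Body=payload['data']
    )
    prediction = response['Body'].read().decode('utf-8').strip()

    return {
        'statusCode': 200,
        'body': json.dumps({'result': prediction})
    }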

Clean up

To avoid ongoing charges, delete the resources created in the previous steps by deleting the CloudFormation templates. Additionally, on the SageMaker console, delete any unused models, endpoint configurations, and inference endpoints.

Conclusion

This post demonstrated how to create an ML pipeline for custom SageMaker ML models using some of the latest AWS service integrations.

You can extend this ML pipeline further by adding a layer of authentication and encryption while sending approval links. You can also add more steps to CodePipeline or Step Functions as deemed necessary for your project’s workflow.

The sample files are available in the GitHub repo.


About the Author

Sachin Doshi is a Senior Application Architect working in the AWS Professional Services team. He is based in the New York metropolitan area. Sachin helps customers optimize their applications using cloud-native AWS services.
