Achieve enterprise-grade monitoring for your Amazon SageMaker models using Fiddler

This is a guest blog post by Danny Brock, Rajeev Govindan and Krishnaram Kenthapadi at Fiddler AI.

Your Amazon SageMaker models are live. They’re handling millions of inferences each day and driving better business outcomes for your company. They’re performing exactly as well as the day they were launched.

Er, wait. Are they? Maybe. Maybe not.

Without enterprise-class model monitoring, your models may be decaying in silence. Your machine learning (ML) teams may never know that these models have actually morphed from miracles of revenue generation to liabilities making incorrect decisions that cost your company time and money.

Don’t fret. The solution is closer than you think.

Fiddler, an enterprise-class Model Performance Management solution available on the AWS Marketplace, offers model monitoring and explainable AI to help ML teams inspect and address a comprehensive range of model issues. Through model monitoring, model explainability, analytics, and bias detection, Fiddler provides your company with an easy-to-use single pane of glass to ensure your models are behaving as they should. And if they’re not, Fiddler also provides features that allow you to inspect your models to find the underlying root causes of performance decay.

This post shows how your MLOps team can improve data scientist productivity and reduce time to detect issues for your models deployed in SageMaker by integrating with the Fiddler Model Performance Management Platform in a few simple steps.

Solution overview

The following reference architecture highlights the primary points of integration. Fiddler exists as a “sidecar” to your existing SageMaker ML workflow.

The remainder of this post walks you through the steps to integrate your SageMaker model with Fiddler’s Model Performance Management Platform:

  1. Ensure your model has data capture enabled.
  2. Create a Fiddler trial environment.
  3. Register information about your model in your Fiddler environment.
  4. Create an AWS Lambda function to publish SageMaker inferences to Fiddler.
  5. Explore Fiddler’s monitoring capabilities in your Fiddler trial environment.

Prerequisites

This post assumes that you have set up SageMaker and deployed a model endpoint. To learn how to configure SageMaker for model serving, refer to Deploy Models for Inference. Some examples are also available on the GitHub repo.

Ensure your model has data capture enabled

On the SageMaker console, navigate to your model’s serving endpoint and ensure you have enabled data capture into an Amazon Simple Storage Service (Amazon S3) bucket. This stores the inferences (requests and responses) your model makes each day as JSON lines files (.jsonl) in Amazon S3.
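
If your endpoint was deployed without data capture, the following is a minimal sketch of enabling it at deployment time with the SageMaker Python SDK; the S3 path is a placeholder, and model is assumed to be an existing SageMaker Model object:

from sagemaker.model_monitor import DataCaptureConfig

# Capture every request/response pair to a bucket of your choosing (placeholder URI)
data_capture_config = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,
    destination_s3_uri='s3://<your-bucket>/datacapture'
)

# 'model' is assumed to be a previously built sagemaker.model.Model object
predictor = model.deploy(
    initial_instance_count=1,
    instance_type='ml.m5.xlarge',
    data_capture_config=data_capture_config
)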

Create a Fiddler trial environment

From the fiddler.ai website, you can request a free trial. After filling out a quick form, Fiddler will contact you to understand the specifics of your model performance management needs and will have a trial environment ready for you in a few hours. You can expect a dedicated environment like https://yourcompany.try.fiddler.ai.

Register information about your model in your Fiddler environment

Before you can begin publishing events from your SageMaker hosted model into Fiddler, you need to create a project within your Fiddler trial environment and provide Fiddler details about your model through a step called model registration. If you want to use a preconfigured notebook from within Amazon SageMaker Studio rather than copy and paste the following code snippets, you can reference the Fiddler quickstart notebook on GitHub. Studio provides a single web-based visual interface where you can perform all ML development steps.

First, you must install the Fiddler Python client in your SageMaker notebook and instantiate the Fiddler client. You can get the AUTH_TOKEN from the Settings page in your Fiddler trial environment.

# Install the fiddler client
!pip install fiddler-client

# Connect to the Fiddler Trial Environment
import fiddler as fdl
import pandas as pd

fdl.__version__

URL = 'https://yourcompany.try.fiddler.ai'
ORG_ID = 'yourcompany'
AUTH_TOKEN = 'UUID-Token-Here-Found-In-Your-Fiddler-Env-Settings-Page'

client = fdl.FiddlerApi(URL, ORG_ID, AUTH_TOKEN)

Next, create a project within your Fiddler trial environment:

# Create Project
PROJECT_ID = 'credit_default'  # update this with your project name
DATASET_ID = f'{PROJECT_ID}_dataset'
MODEL_ID = f'{PROJECT_ID}_model'

client.create_project(PROJECT_ID)

Now upload your training dataset. The notebook also provides a sample dataset, which serves as input for Fiddler’s explainability algorithms and as a baseline for monitoring metrics. The dataset is also used to generate the schema for this model in Fiddler.

# Upload Baseline Dataset
df_baseline = pd.read_csv('<your-training-file.csv>')

dataset_info = fdl.DatasetInfo.from_dataframe(df_baseline, max_inferred_cardinality=1000)

upload_result = client.upload_dataset(PROJECT_ID,
                                      dataset={'baseline': df_baseline},
                                      dataset_id=DATASET_ID,
                                      info=dataset_info)

Lastly, before you can start publishing inferences to Fiddler for monitoring, root cause analysis, and explanations, you need to register your model. Let’s first create a model_info object that contains the metadata about your model:

# Update task from the list below if your model task is not binary classification
model_task = 'binary' 

if model_task == 'regression':
    model_task_fdl = fdl.ModelTask.REGRESSION
    
elif model_task == 'binary':
    model_task_fdl = fdl.ModelTask.BINARY_CLASSIFICATION

elif model_task == 'multiclass':
    model_task_fdl = fdl.ModelTask.MULTICLASS_CLASSIFICATION

elif model_task == 'ranking':
    model_task_fdl = fdl.ModelTask.RANKING

    
# Specify column types
target = 'TARGET'  # change this to your target variable
outputs = ['prediction']
features = ['<add your feature list here>']
     
# Generate ModelInfo
model_info = fdl.ModelInfo.from_dataset_info(
    dataset_info=dataset_info,
    dataset_id=DATASET_ID,
    model_task=model_task_fdl,
    target=target,
    outputs=outputs,
    features=features,
    binary_classification_threshold=.125,  # update this if your task is not a binary classification
    description='<model-description>',
    display_name='<model-display-name>'
)
model_info

Then you can register the model using your new model_info object:

# Register Info about your model with Fiddler
client.register_model(
    project_id=PROJECT_ID,
    dataset_id=DATASET_ID,
    model_id=MODEL_ID,
    model_info=model_info
)

Great! Now you can publish some events to Fiddler in order to observe the model’s performance.

Create a Lambda function to publish SageMaker inferences to Fiddler

With the simple-to-deploy serverless architecture of Lambda, you can quickly build the mechanism required to move your inferences from the S3 bucket you set up earlier into your newly provisioned Fiddler trial environment. This Lambda function is responsible for opening any new JSONL event log files in your model’s S3 bucket, parsing and formatting the JSONL content into a dataframe, and then publishing that dataframe of events to your Fiddler trial environment. The following screenshot shows the code details of our function.
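
The following is a minimal sketch of what such a handler might look like, assuming CSV-serialized inputs and outputs in the data capture files. The environment variable names are illustrative, and the exact publishing call may differ slightly depending on your Fiddler client version; refer to Fiddler's published Lambda code for the authoritative version.

import os
import json

import boto3
import pandas as pd
import fiddler as fdl  # provided by the Fiddler Lambda layer

s3 = boto3.client('s3')

# Illustrative environment variable names; set these on the Lambda function
URL = os.environ['FIDDLER_URL']
ORG_ID = os.environ['FIDDLER_ORG_ID']
AUTH_TOKEN = os.environ['FIDDLER_AUTH_TOKEN']
PROJECT_ID = os.environ['FIDDLER_PROJECT_ID']
MODEL_ID = os.environ['FIDDLER_MODEL_ID']

def lambda_handler(event, context):
    # Triggered once per data capture file written to the S3 bucket
    record = event['Records'][0]
    bucket = record['s3']['bucket']['name']
    key = record['s3']['object']['key']

    body = s3.get_object(Bucket=bucket, Key=key)['Body'].read().decode('utf-8')

    rows = []
    for line in body.strip().splitlines():
        capture = json.loads(line)['captureData']
        # Assumes CSV-encoded request/response payloads; adjust for your serializer
        inputs = capture['endpointInput']['data'].split(',')
        output = capture['endpointOutput']['data'].strip()
        rows.append(inputs + [output])

    # Column names must match the schema registered with Fiddler
    df = pd.DataFrame(rows)

    client = fdl.FiddlerApi(URL, ORG_ID, AUTH_TOKEN)
    client.publish_events_batch(project_id=PROJECT_ID, model_id=MODEL_ID, batch_source=df)

    return {'statusCode': 200}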

The Lambda function needs to be configured to trigger off of newly created files in your S3 bucket. The following tutorial guides you through creating an Amazon EventBridge trigger that invokes the Lambda function whenever a file is uploaded to Amazon S3. The following screenshot shows our function’s trigger configuration. This makes it simple to ensure that any time your model makes new inferences, those events stored in Amazon S3 are loaded into Fiddler to drive the model observability your company needs.

To simplify this further, the code for this Lambda function is publicly available from Fiddler’s documentation site. This code example currently works for binary classification models with structured inputs. If you have model types with different features or tasks, please contact Fiddler for assistance with minor changes to the code.

The Lambda function needs to make reference to the Fiddler Python client. Fiddler has created a publicly available Lambda layer that you can reference to ensure that the import fiddler as fdl step works seamlessly. You can reference this layer via an ARN in the us-west-2 Region: arn:aws:lambda:us-west-2:079310353266:layer:fiddler-client-0814:1, as shown in the following screenshot.

You also need to specify Lambda environment variables so the Lambda function knows how to connect to your Fiddler trial environment, and what the inputs and outputs are within the .jsonl files being captured by your model. The following screenshot shows a list of the required environment variables, which are also on Fiddler’s documentation site. Update the values for the environment variables to match your model and dataset.

Explore Fiddler’s monitoring capabilities in your Fiddler trial environment

You’ve done it! With your baseline data, model, and traffic connected, you can now explain data drift, outliers, model bias, data issues, and performance blips, and share dashboards with others. Complete your journey by watching a demo about the model performance management capabilities you have introduced to your company.

The example screenshots below provide a glimpse of model insights like drift, outlier detection, local point explanations, and model analytics that you’ll find in your Fiddler trial environment.

Conclusion

This post highlighted the need for enterprise-class model monitoring and showed how you can integrate your models deployed in SageMaker with the Fiddler Model Performance Management Platform in just a few steps. Fiddler offers functionality for model monitoring, explainable AI, bias detection, and root cause analysis, and is available on the AWS Marketplace. By providing your MLOps team with an easy-to-use single pane of glass to ensure your models are behaving as expected and to identify the underlying root causes of performance degradation, Fiddler can help improve data scientist productivity and reduce time to detect and resolve issues.

If you would like to learn more about Fiddler, visit fiddler.ai. If you would prefer to set up a personalized demo and technical discussion, email sales@fiddler.ai.


About the Authors

Danny Brock is a Sr Solutions Engineer at Fiddler AI. Danny has spent many years in the analytics and ML space, running presales and post-sales teams for startups like Endeca and Incorta. He founded his own big data analytics consulting company, Branchbird, in 2012.

Rajeev Govindan is a Sr Solutions Engineer at Fiddler AI. Rajeev has extensive experience in sales engineering and software development at several enterprise companies, including AppDynamics.

Krishnaram Kenthapadi is the Chief Scientist of Fiddler AI. Previously, he was a Principal Scientist at Amazon AWS AI, where he led the fairness, explainability, privacy, and model understanding initiatives in the Amazon AI platform, and prior to that, he held roles at LinkedIn AI and Microsoft Research. Krishnaram received his PhD in Computer Science from Stanford University in 2006.

Read More

Track your ML experiments end to end with Data Version Control and Amazon SageMaker Experiments

Data scientists often work towards understanding the effects of various data preprocessing and feature engineering strategies in combination with different model architectures and hyperparameters. Doing so requires you to cover large parameter spaces iteratively, and it can be overwhelming to keep track of previously run configurations and results while keeping experiments reproducible.

This post walks you through an example of how to track your experiments across code, data, artifacts, and metrics by using Amazon SageMaker Experiments in conjunction with Data Version Control (DVC). We show how you can use DVC side by side with Amazon SageMaker processing and training jobs. We train different CatBoost models on the California housing dataset from the StatLib repository, and change holdout strategies while keeping track of the data version with DVC. In each individual experiment, we track input and output artifacts, code, and metrics using SageMaker Experiments.

SageMaker Experiments

SageMaker Experiments is an AWS service for tracking machine learning (ML) experiments. The SageMaker Experiments Python SDK is a high-level interface to this service that helps you track experiment information using Python.

The goal of SageMaker Experiments is to make it as simple as possible to create experiments, populate them with trials, add tracking and lineage information, and run analytics across trials and experiments.

When discussing SageMaker Experiments, we refer to the following concepts:

  • Experiment – A collection of related trials. You add trials to an experiment that you want to compare together.
  • Trial – A description of a multi-step ML workflow. Each step in the workflow is described by a trial component.
  • Trial component – A description of a single step in an ML workflow, such as data cleaning, feature extraction, model training, or model evaluation.
  • Tracker – A Python context manager for logging information about a single trial component (for example, parameters, metrics, or artifacts).

Data Version Control

Data Version Control (DVC) is a new type of data versioning, workflow, and experiment management software that builds upon Git (although it can work standalone). DVC reduces the gap between established engineering toolsets and data science needs, allowing you to take advantage of new features while reusing existing skills and intuition.

Data science experiment sharing and collaboration can be done through a regular Git flow (commits, branching, tagging, pull requests) the same way it works for software engineers. With Git and DVC, data science and ML teams can version experiments, manage large datasets, and make projects reproducible.

DVC has the following features:

  • DVC is a free, open-source command line tool.
  • DVC works on top of Git repositories and has a similar command line interface and flow as Git. DVC can also work standalone, but without versioning capabilities.
  • Data versioning is enabled by replacing large files, dataset directories, ML models, and so on with small metafiles (easy to handle with Git). These placeholders point to the original data, which is decoupled from source code management.
  • You can use on-premises or cloud storage to store the project’s data separate from its code base. This is how data scientists can transfer large datasets or share a GPU-trained model with others.
  • DVC makes data science projects reproducible by creating lightweight pipelines using implicit dependency graphs, and by codifying the data and artifacts involved.
  • DVC is platform agnostic. It runs on all major operating systems (Linux, macOS, and Windows), and works independently of the programming languages (Python, R, Julia, shell scripts, and so on) or ML libraries (Keras, TensorFlow, PyTorch, Scipy, and more) used in the project.
  • DVC is quick to install and doesn’t require special infrastructure, nor does it depend on APIs or external services. It’s a standalone CLI tool.

SageMaker Experiments and DVC sample

The following GitHub sample shows how to use DVC within the SageMaker environment. In particular, we look at how to build a custom image with DVC libraries installed by default to provide a consistent development environment to your data scientists in Amazon SageMaker Studio, and how to run DVC alongside SageMaker managed infrastructure for processing and training. Furthermore, we show how to enrich SageMaker tracking information with data versioning information from DVC, and visualize them within the Studio console.

The following diagram illustrates the solution architecture and workflow.

Build a custom Studio image with DVC already installed

In this GitHub repository, we explain how to create a custom image for Studio that has DVC already installed. The advantage of creating an image and making it available to all Studio users is that it creates a consistent environment for the Studio users, which they could also run locally. Although the sample is based on AWS Cloud9, you can also build the container on your local machine as long as you have Docker installed and running. This sample is based on the following Dockerfile and environment.yml. The resulting Docker image is stored in Amazon Elastic Container Registry (Amazon ECR) in your AWS account. See the following code:

# Login to ECR
aws --region ${REGION} ecr get-login-password | docker login --username AWS --password-stdin ${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com/smstudio-custom

# Create the ECR repository
aws --region ${REGION} ecr create-repository --repository-name smstudio-custom

# Build the image - it might take a few minutes to complete this step
docker build . -t ${IMAGE_NAME} -t ${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com/smstudio-custom:${IMAGE_NAME}

# Push the image to ECR
docker push ${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com/smstudio-custom:${IMAGE_NAME}

You can now create a new Studio domain or update an existing Studio domain that has access to the newly created Docker image.

We use AWS Cloud Development Kit (AWS CDK) to create the following resources via AWS CloudFormation:

  • A SageMaker execution role with the right permissions to your new or existing Studio domain
  • A SageMaker image and SageMaker image version from the Docker image conda-env-dvc-kernel that we created earlier
  • An AppImageConfig that specifies how the kernel gateway should be configured
  • A Studio user (data-scientist-dvc) with the correct SageMaker execution role and the custom Studio image available to it

For detailed instructions, refer to Associate a custom image to SageMaker Studio.

Run the lab

To run the lab, complete the following steps:

  1. In the Studio domain, launch Studio for the data-scientist-dvc user.
  2. Choose the Git icon, then choose Clone a Repository.
    Clone a Repository
  3. Enter the URL of the repository (https://github.com/aws-samples/amazon-sagemaker-experiments-dvc-demo) and choose Clone.
  4. In the file browser, choose the amazon-sagemaker-experiments-dvc-demo repository.
  5. Open the dvc_sagemaker_script_mode.ipynb notebook.
  6. For Custom Image, choose the image conda-env-dvc-kernel.
  7. Choose Select.
    conda-env-dvc-kernel

Configure DVC for data versioning

We create a subdirectory where we prepare the data: sagemaker-dvc-sample. Within this subdirectory, we initialize a new Git repository and set the remote to a repository we create in AWS CodeCommit. The goal is to have DVC configurations and files for data tracking versioned in this repository. However, Git offers native capabilities to manage subprojects via, for example, git submodules and git subtrees, and you can extend this sample to use any of the aforementioned tools that best fit your workflow.

The main advantage of using CodeCommit with SageMaker in our case is its integration with AWS Identity and Access Management (IAM) for authentication and authorization, meaning we can use IAM roles to push and pull data without the need to fetch credentials (or SSH keys). Setting the appropriate permissions on the SageMaker execution role also allows the Studio notebook and the SageMaker training and processing job to interact securely with CodeCommit.

Although you can replace CodeCommit with any other source control service, such as GitHub, GitLab, or Bitbucket, you need to consider how to handle the credentials for your system. One possibility is to store these credentials in AWS Secrets Manager and fetch them at run time from the Studio notebook as well as from the SageMaker processing and training jobs.
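
As a rough sketch of that approach (the secret name and JSON structure here are hypothetical), fetching such credentials with boto3 might look like the following:

import json

import boto3

# Hypothetical secret name; store your Git credentials in Secrets Manager beforehand
secrets = boto3.client('secretsmanager')
secret_value = secrets.get_secret_value(SecretId='dvc/git-credentials')
git_credentials = json.loads(secret_value['SecretString'])

# git_credentials can now be used to configure the Git remote from the Studio
# notebook or from a SageMaker processing or training job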

Init DVC

Process and train with DVC and SageMaker

In this section, we explore two different approaches to tackle our problem and how we can keep track of the two tests using SageMaker Experiments according to the high-level conceptual architecture we showed you earlier.

Set up a SageMaker experiment

To track this test in SageMaker, we need to create an experiment. We need to also define the trial within the experiment. For the sake of simplicity, we just consider one trial for the experiment, but you can have any number of trials within an experiment, for example, if you want to test different algorithms.

We create an experiment named DEMO-sagemaker-experiments-dvc with two trials, dvc-trial-single-file and dvc-trial-multi-files, each representing a different version of the dataset.

Let’s create the DEMO-sagemaker-experiments-dvc experiment:

from smexperiments.experiment import Experiment
from smexperiments.trial import Trial
from smexperiments.trial_component import TrialComponent
from smexperiments.tracker import Tracker

experiment_name = 'DEMO-sagemaker-experiments-dvc'

# create the experiment if it doesn't exist
try:
    my_experiment = Experiment.load(experiment_name=experiment_name)
    print("existing experiment loaded")
except Exception as ex:
    if "ResourceNotFound" in str(ex):
        my_experiment = Experiment.create(
            experiment_name = experiment_name,
            description = "How to integrate DVC"
        )
        print("new experiment created")
    else:
        print(f"Unexpected {ex}=, {type(ex)}")
        print("Dont go forward!")
        raise

Test 1: Generate single files for training and validation

In this section, we create a processing script that fetches the raw data directly from Amazon Simple Storage Service (Amazon S3) as input; processes it to create the train, validation, and test datasets; and stores the results back to Amazon S3 using DVC. Furthermore, we show how you can track output artifacts generated by DVC with SageMaker when running processing and training jobs and via SageMaker Experiments.

First, we create the dvc-trial-single-file trial and add it to the DEMO-sagemaker-experiments-dvc experiment. By doing so, we keep all trial components related to this test organized in a meaningful way.

first_trial_name = "dvc-trial-single-file"

try:
    my_first_trial = Trial.load(trial_name=first_trial_name)
    print("existing trial loaded")
except Exception as ex:
    if "ResourceNotFound" in str(ex):
        my_first_trial = Trial.create(
            experiment_name=experiment_name,
            trial_name=first_trial_name,
        )
        print("new trial created")
    else:
        print(f"Unexpected {ex}=, {type(ex)}")
        print("Dont go forward!")
        raise

Use DVC in a SageMaker processing job to create the single file version

In this section, we create a processing script that gets the raw data directly from Amazon S3 as input using the managed data loading capability of SageMaker; processes it to create the train, validation, and test datasets; and stores the results back to Amazon S3 using DVC. It’s important to understand that when using DVC to store data to Amazon S3 (or pull data from Amazon S3), we lose SageMaker’s managed data loading capabilities, which can potentially impact the performance and costs of our processing and training jobs, especially when working with very large datasets. For more information on the different SageMaker native input mode capabilities, refer to Access Training Data.

Finally, we unify DVC tracking capabilities with SageMaker tracking capabilities when running processing jobs via SageMaker Experiments.

The processing script expects the address of the Git repository and the branch we want to create to store the DVC metadata, passed via environment variables. The datasets themselves are stored in Amazon S3 by DVC. Although environment variables are automatically tracked in SageMaker Experiments and visible in the trial component parameters, we might want to enrich the trial components with further information, which then becomes available for visualization in the Studio UI using a tracker object. In our case, the trial component parameters include the following:

  • DVC_REPO_URL
  • DVC_BRANCH
  • USER
  • data_commit_hash
  • train_test_split_ratio

The preprocessing script clones the Git repository; generates the train, validation, and test datasets; and syncs them using DVC. As mentioned earlier, when using DVC, we can’t take advantage of native SageMaker data loading capabilities. Aside from the performance penalties we might suffer on large datasets, we also lose the automatic tracking capabilities for the output artifacts. However, thanks to the tracker and the DVC Python API, we can compensate for these shortcomings, retrieve such information at run time, and store it in the trial component with little effort. The added value of doing so is having a single view of the input and output artifacts that belong to this specific processing job.

The full preprocessing Python script is available in the GitHub repo.

with Tracker.load() as tracker:
    tracker.log_parameters({"data_commit_hash": commit_hash})
    for file_type in file_types:
        path = dvc.api.get_url(
            f"{data_path}/{file_type}/california_{file_type}.csv",
            repo=dvc_repo_url,
            rev=dvc_branch
        )
        tracker.log_output(name=f"california_{file_type}",value=path)

SageMaker gives us the possibility to run our processing script on container images managed by AWS that are optimized to run on the AWS infrastructure. If our script requires additional dependencies, we can supply a requirements.txt file. When we start the processing job, SageMaker runs pip install to install all the libraries we need (for example, DVC-related libraries). If you need tighter control of all libraries installed on the containers, you can bring your own container in SageMaker, for example for processing and training.
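
For illustration, a minimal requirements.txt for this sample might look like the following; the exact contents and version pins are defined in source_dir/requirements.txt in the sample repository:

dvc[s3]
sagemaker-experiments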

We now have all the ingredients to run our SageMaker processing job:

  • A processing script that can process several arguments (--train-test-split-ratio) and two environmental variables (DVC_REPO_URL and DVC_BRANCH)
  • A requirements.txt file
  • A Git repository (in CodeCommit)
  • A SageMaker experiment and trial

from sagemaker.processing import FrameworkProcessor, ProcessingInput
from sagemaker.sklearn.estimator import SKLearn

dvc_repo_url = "codecommit::{}://sagemaker-dvc-sample".format(region)
dvc_branch = my_first_trial.trial_name

script_processor = FrameworkProcessor(
    estimator_cls=SKLearn,
    framework_version='0.23-1',
    instance_count=1,
    instance_type='ml.m5.xlarge',
    env={
        "DVC_REPO_URL": dvc_repo_url,
        "DVC_BRANCH": dvc_branch,
        "USER": "sagemaker"
    },
    role=role
)

experiment_config={
    "ExperimentName": my_experiment.experiment_name,
    "TrialName": my_first_trial.trial_name
}

We then run the processing job with the preprocessing-experiment.py script, experiment_config, dvc_repo_url, and dvc_branch we defined earlier.

%%time

script_processor.run(
    code='./source_dir/preprocessing-experiment.py',
    dependencies=['./source_dir/requirements.txt'],
    inputs=[ProcessingInput(source=s3_data_path, destination="/opt/ml/processing/input")],
    experiment_config=experiment_config,
    arguments=["--train-test-split-ratio", "0.2"]
)

The processing job takes approximately 5 minutes to complete. Now you can view the trial details for the single file dataset.

The following screenshot shows where you can find the stored information within Studio. Note the values for dvc-trial-single-file in DVC_BRANCH, DVC_REPO_URL, and data_commit_hash on the Parameters tab.

SageMaker Experiments parameters tab

Also note the input and output details on the Artifacts tab.

SageMaker Experiments artifacts tab

Create an estimator and fit the model with single file data version

To use DVC integration inside a SageMaker training job, we pass a dvc_repo_url and dvc_branch as environment variables when we create the Estimator object.

We train on the dvc-trial-single-file branch first.

When pulling data with DVC, we use the following dataset structure:

dataset
    |-- train
    |   |-- california_train.csv
    |-- test
    |   |-- california_test.csv
    |-- validation
    |   |-- california_validation.csv

Now we create a Scikit-learn Estimator using the SageMaker Python SDK. This allows us to specify the following:

  • The path to the Python source file, which should be run as the entry point to training.
  • The IAM role that controls permissions for accessing Amazon S3 and CodeCommit data and running SageMaker functions.
  • A list of dictionaries that define the metrics used to evaluate the training jobs.
  • The number and type of training instances. We use one ml.m5.large instance.
  • Hyperparameters that are used for training.
  • Environment variables to use during the training job. We use DVC_REPO_URL, DVC_BRANCH, and USER.

metric_definitions = [{'Name': 'median-AE', 'Regex': "AE-at-50th-percentile: ([0-9.]+).*$"}]

hyperparameters={ 
        "learning_rate" : 1,
        "depth": 6
    }
estimator = SKLearn(
    entry_point='train.py',
    source_dir='source_dir',
    role=role,
    metric_definitions=metric_definitions,
    hyperparameters=hyperparameters,
    instance_count=1,
    instance_type='ml.m5.large',
    framework_version='0.23-1',
    base_job_name='training-with-dvc-data',
    environment={
        "DVC_REPO_URL": dvc_repo_url,
        "DVC_BRANCH": dvc_branch,
        "USER": "sagemaker"
    }
)

experiment_config={
    "ExperimentName": my_experiment.experiment_name,
    "TrialName": my_first_trial.trial_name
}

We call the fit method of the Estimator with the experiment_config we defined earlier to start the training.

%%time
estimator.fit(experiment_config=experiment_config)

The training job takes approximately 5 minutes to complete. The logs show the following lines, indicating the files pulled by DVC:

Running dvc pull command
A       train/california_train.csv
A       test/california_test.csv
A       validation/california_validation.csv
3 files added and 3 files fetched
Starting the training.
Found train files: ['/opt/ml/input/data/dataset/train/california_train.csv']
Found validation files: ['/opt/ml/input/data/dataset/train/california_train.csv']
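
For context, the following is a hedged sketch of how a training script might discover these files after the DVC pull; the actual implementation lives in train.py in the sample repository:

import glob
import os

# SageMaker exposes the input channel (named 'dataset' in this sample) under /opt/ml/input/data
base_dir = os.path.join('/opt/ml/input/data', 'dataset')

train_files = glob.glob(os.path.join(base_dir, 'train', '*.csv'))
validation_files = glob.glob(os.path.join(base_dir, 'validation', '*.csv'))

print(f"Found train files: {train_files}")
print(f"Found validation files: {validation_files}")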

Test 2: Generate multiple files for training and validation

We create a new dvc-trial-multi-files trial and add it to the current DEMO-sagemaker-experiments-dvc experiment.

second_trial_name = "dvc-trial-multi-files"
try:
    my_second_trial = Trial.load(trial_name=second_trial_name)
    print("existing trial loaded")
except Exception as ex:
    if "ResourceNotFound" in str(ex):
        my_second_trial = Trial.create(
            experiment_name=experiment_name,
            trial_name=second_trial_name,
        )
        print("new trial created")
    else:
        print(f"Unexpected {ex}=, {type(ex)}")
        print("Dont go forward!")
        raise

Unlike the first processing script, we now create multiple files for training and validation out of the original dataset and store the DVC metadata in a different branch.

You can explore the second preprocessing Python script on GitHub.

%%time

script_processor.run(
    code='./source_dir/preprocessing-experiment-multifiles.py',
    dependencies=['./source_dir/requirements.txt'],
    inputs=[ProcessingInput(source=s3_data_path, destination="/opt/ml/processing/input")],
    experiment_config=experiment_config,
    arguments=["--train-test-split-ratio", "0.1"]
)

The processing job takes approximately 5 minutes to complete. Now you can view the trial details for the multi-file dataset.

The following screenshots show where you can find the stored information within SageMaker Experiments in the Trial components section within the Studio UI. Note the values for dvc-trial-multi-files in DVC_BRANCH, DVC_REPO_URL, and data_commit_hash on the Parameters tab.

SageMaker multi files experiments parameters tab

You can also review the input and output details on the Artifacts tab.

SageMaker multi files experiments artifacts tab

We now train on the dvc-trial-multi-files branch. When pulling data with DVC, we use the following dataset structure:

dataset
    |-- train
    |   |-- california_train_1.csv
    |   |-- california_train_2.csv
    |   |-- california_train_3.csv
    |   |-- california_train_4.csv
    |   |-- california_train_5.csv
    |-- test
    |   |-- california_test.csv
    |-- validation
    |   |-- california_validation_1.csv
    |   |-- california_validation_2.csv
    |   |-- california_validation_3.csv

As we did before, we create a new Scikit-learn Estimator with the trial name dvc-trial-multi-files and start the training job.

%%time

estimator.fit(experiment_config=experiment_config)

The training job takes approximately 5 minutes to complete. In the training job logs output to the notebook, you can see the following lines, indicating the files pulled by DVC:

Running dvc pull command
A       validation/california_validation_2.csv
A       validation/california_validation_1.csv
A       validation/california_validation_3.csv
A       train/california_train_4.csv
A       train/california_train_5.csv
A       train/california_train_2.csv
A       train/california_train_3.csv
A       train/california_train_1.csv
A       test/california_test.csv
9 files added and 9 files fetched
Starting the training.
Found train files: ['/opt/ml/input/data/dataset/train/california_train_2.csv', '/opt/ml/input/data/dataset/train/california_train_5.csv', '/opt/ml/input/data/dataset/train/california_train_4.csv', '/opt/ml/input/data/dataset/train/california_train_1.csv', '/opt/ml/input/data/dataset/train/california_train_3.csv']
Found validation files: ['/opt/ml/input/data/dataset/validation/california_validation_2.csv', '/opt/ml/input/data/dataset/validation/california_validation_1.csv', '/opt/ml/input/data/dataset/validation/california_validation_3.csv']

Host your model in SageMaker

After you train your ML model, you can deploy it using SageMaker. To deploy a persistent, real-time endpoint that makes one prediction at a time, we use SageMaker real-time hosting services.

from sagemaker.serializers import CSVSerializer

predictor = estimator.deploy(1, "ml.t2.medium", serializer=CSVSerializer())

First, we retrieve the latest test dataset locally in the development notebook in Studio. For this purpose, we can use dvc.api.read() to load the raw data that was stored in Amazon S3 by the SageMaker processing job.

import io
import dvc.api

raw = dvc.api.read(
    "dataset/test/california_test.csv",
    repo=dvc_repo_url,
    rev=dvc_branch
)

Then we prepare the data using Pandas, load the test CSV file, and call predictor.predict to invoke the SageMaker endpoint created earlier with the test data and get predictions.

test = pd.read_csv(io.StringIO(raw), sep=",", header=None)
X_test = test.iloc[:, 1:].values
y_test = test.iloc[:, 0:1].values

predicted = predictor.predict(X_test)
for i in range(len(predicted)):  # print each prediction alongside its actual value
    print(f"predicted: {predicted[i]}, actual: {y_test[i][0]}")

Delete the endpoint

You should delete endpoints when they’re no longer in use, because they’re billed by the time deployed (for more information, see Amazon SageMaker Pricing). Make sure to delete the endpoint to avoid unexpected costs.

predictor.delete_endpoint()

Clean up

Before you remove all the resources you created, make sure that all apps are deleted from the data-scientist-dvc user, including all KernelGateway apps, as well as the default JupyterServer app.
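
If you prefer to script the app deletion, a hedged sketch using boto3 might look like the following; the domain ID is a placeholder:

import boto3

sm = boto3.client('sagemaker')

# Placeholder: use your own Studio domain ID
domain_id = '<your-sagemaker-studio-domain-id>'
user_profile_name = 'data-scientist-dvc'

apps = sm.list_apps(DomainIdEquals=domain_id, UserProfileNameEquals=user_profile_name)['Apps']
for app in apps:
    if app['Status'] != 'Deleted':
        sm.delete_app(
            DomainId=domain_id,
            UserProfileName=user_profile_name,
            AppType=app['AppType'],
            AppName=app['AppName']
        )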

Then you can destroy the AWS CDK stack by running the following command:

cdk destroy

If you used an existing domain, also run the following commands:

# inject your DOMAIN_ID into the configuration file
sed -i 's/<your-sagemaker-studio-domain-id>/'"$DOMAIN_ID"'/' ../update-domain-no-custom-images.json
# update the sagemaker studio domain
aws --region ${REGION} sagemaker update-domain --cli-input-json file://../update-domain-no-custom-images.json

Conclusion

In this post, you walked through an example of how to track your experiments across code, data, artifacts, and metrics by using SageMaker Experiments and SageMaker processing and training jobs in conjunction with DVC. We created a custom Docker image containing DVC, which was required for the Studio development notebook, and showed how you can use processing and training jobs with DVC. We prepared two versions of the data and used DVC to manage it with Git. Then you used SageMaker Experiments to track the processing and training with the two versions of the data in order to have a unified view of parameters, artifacts, and metrics in a single pane of glass. Finally, you deployed the model to a SageMaker endpoint and used a testing dataset from the second dataset version to invoke the SageMaker endpoint and get predictions.

As next step, you can extend the existing notebook and introduce your own feature engineering strategy and use DVC and SageMaker to run your experiments. Let’s go build!

For further reading, refer to the following resources:


About the Authors

Paolo Di Francesco is a solutions architect at AWS. He has experience in telecommunications and software engineering. He is passionate about machine learning and is currently focusing on using his experience to help customers reach their goals on AWS, in particular in discussions around MLOps. Outside of work, he enjoys playing football and reading.

Eitan Sela is a Machine Learning Specialist Solutions Architect with Amazon Web Services. He works with AWS customers to provide guidance and technical assistance, helping them build and operate machine learning solutions on AWS. In his spare time, Eitan enjoys jogging and reading the latest machine learning articles.

Read More

Build a predictive maintenance solution with Amazon Kinesis, AWS Glue, and Amazon SageMaker

Organizations are increasingly building and using machine learning (ML)-powered solutions for a variety of use cases and problems, including predictive maintenance of machine parts, product recommendations based on customer preferences, credit profiling, content moderation, fraud detection, and more. In many of these scenarios, the effectiveness and benefits derived from these ML-powered solutions can be further enhanced when they can process and derive insights from data events in near-real time.

Although the business value and benefits of near-real-time ML-powered solutions are well established, the architecture required to implement these solutions at scale with optimum reliability and performance is complicated. This post describes how you can combine Amazon Kinesis, AWS Glue, and Amazon SageMaker to build a near-real-time feature engineering and inference solution for predictive maintenance.

Use case overview

We focus on a predictive maintenance use case where sensors deployed in the field (such as in industrial equipment or network devices) need to be replaced or rectified before they become faulty and cause downtime. Downtime can be expensive for businesses and can lead to poor customer experience. Predictive maintenance powered by an ML model can also help in augmenting the regular schedule-based maintenance cycles by informing when a machine part in good condition should not be replaced, therefore avoiding unnecessary cost.

In this post, we focus on applying machine learning to a synthetic dataset containing machine failures due to features such as air temperature, process temperature, rotation speed, torque, and tool wear. The dataset used is sourced from the UCI Data Repository.

Machine failure consists of five independent failure modes:

  • Tool Wear Failure (TWF)
  • Heat Dissipation Failure (HDF)
  • Power Failure (PWF)
  • Over-strain Failure (OSF)
  • Random Failure (RNF)

The machine failure label indicates whether the machine has failed for a particular data point: if at least one of the failure modes is true, the process fails and the machine failure label is set to 1. The objective for the ML model is to identify machine failures correctly, so a downstream predictive maintenance action can be initiated.
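
As a small illustration of that labeling logic (assuming the failure mode column names from the AI4I dataset and a hypothetical local copy of the CSV), the target could be derived with pandas as follows:

import pandas as pd

# Hypothetical local copy of the AI4I 2020 predictive maintenance dataset
df = pd.read_csv('ai4i2020.csv')

# The five independent failure mode columns
failure_modes = ['TWF', 'HDF', 'PWF', 'OSF', 'RNF']

# The target is 1 if at least one failure mode is 1, otherwise 0
df['machine_failure'] = (df[failure_modes].sum(axis=1) > 0).astype(int)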

Solution overview

For our predictive maintenance use case, we assume that device sensors stream various measurements and readings about machine parts. Our solution then takes a slice of streaming data each time (micro-batch), and performs processing and feature engineering to create features. The created features are then used to generate inferences from a trained and deployed ML model in near-real time. The generated inferences can be further processed and consumed by downstream applications, to take appropriate actions and initiate maintenance activity.

The following diagram shows the architecture of our overall solution.

The solution broadly consists of the following sections, which are explained in detail later in this post:

  • Streaming data source and ingestion – We use Amazon Kinesis Data Streams to collect streaming data from the field sensors at scale and make it available for further processing.
  • Near-real-time feature engineering – We use AWS Glue streaming jobs to read data from a Kinesis data stream and perform data processing and feature engineering, before storing the derived features in Amazon Simple Storage Service (Amazon S3). Amazon S3 provides a reliable and cost-effective option to store large volumes of data.
  • Model training and deployment – We use the AI4I predictive maintenance dataset from the UCI Data Repository to train an ML model based on the XGBoost algorithm using SageMaker. We then deploy the trained model to a SageMaker asynchronous inference endpoint.
  • Near-real-time ML inference – After the features are available in Amazon S3, we need to generate inferences from the deployed model in near-real time. SageMaker asynchronous inference endpoints are well suited for this requirement because they support larger payload sizes (up to 1 GB) and can generate inferences within minutes (up to a maximum of 15 minutes). We use S3 event notifications to run an AWS Lambda function to invoke a SageMaker asynchronous inference endpoint. SageMaker asynchronous inference endpoints accept S3 locations as input, generate inferences from the deployed model, and write these inferences back to Amazon S3 in near-real time.

The source code for this solution is located on GitHub. The solution has been tested and should be run in us-east-1.

We use an AWS CloudFormation template, deployed using AWS Serverless Application Model (AWS SAM), and SageMaker notebooks to deploy the solution.

Prerequisites

To get started, as a prerequisite, you must have the AWS SAM CLI, Python 3, and pip installed. You must also have the AWS Command Line Interface (AWS CLI) configured properly.

Deploy the solution

You can use AWS CloudShell to run these steps. CloudShell is a browser-based shell that is pre-authenticated with your console credentials and includes pre-installed common development and operations tools (such as AWS SAM, AWS CLI, and Python). Therefore, no local installation or configuration is required.

  • We begin by creating an S3 bucket where we store the script for our AWS Glue streaming job. Run the following command in your terminal to create a new bucket:
aws s3api create-bucket --bucket sample-script-bucket-$RANDOM --region us-east-1
  • Note down the name of the bucket created.

ML-9132 Solution Arch

  • Next, we clone the code repository locally, which contains the CloudFormation template to deploy the stack. Run the following command in your terminal:
git clone https://github.com/aws-samples/amazon-sagemaker-predictive-maintenance
  • Navigate to the sam-template directory:
cd amazon-sagemaker-predictive-maintenance/sam-template

ML-9132 git clone repo

  • Run the following command to copy the AWS Glue job script (from glue_streaming/app.py) to the S3 bucket you created:
aws s3 cp glue_streaming/app.py s3://sample-script-bucket-30232/glue_streaming/app.py

ML-9132 copy glue script

  • You can now go ahead with the build and deployment of the solution, through the CloudFormation template via AWS SAM. Run the following command:
sam build

ML-9132 SAM Build

sam deploy --guided
  • Provide arguments for the deployment such as the stack name, preferred AWS Region (us-east-1), and GlueScriptsBucket.

Make sure you provide the same S3 bucket that you created earlier for the AWS Glue script S3 bucket (parameter GlueScriptsBucket in the following screenshot).

ML-9132 SAM Deploy Param

After you provide the required arguments, AWS SAM starts the stack deployment. The following screenshot shows the resources created.

ML-9132 SAM Deployed

After the stack is deployed successfully, you should see the following message.

ML-9132 SAM CF deployed

  • On the AWS CloudFormation console, open the stack (for this post, nrt-streaming-inference) that was provided when deploying the CloudFormation template.
  • On the Resources tab, note the SageMaker notebook instance ID.
ML-9132 SM Notebook Created
  • On the SageMaker console, open this instance.

ML-9132 image018

The SageMaker notebook instance already has the required notebooks pre-loaded.

Navigate to the notebooks folder, then open the notebooks (Data_Pre-Processing.ipynb and ModelTraining-Evaluation-and-Deployment.ipynb) and follow the instructions within them to explore the dataset, perform preprocessing and feature engineering, and train and deploy the model to a SageMaker asynchronous inference endpoint.

ML-9132 Open SM Notebooks

Streaming data source and ingestion

Kinesis Data Streams is a serverless, scalable, and durable real-time data streaming service that you can use to collect and process large streams of data records in real time. Kinesis Data Streams enables capturing, processing, and storing data streams from a variety of sources, such as IT infrastructure log data, application logs, social media, market data feeds, web clickstream data, IoT devices and sensors, and more. You can provision a Kinesis data stream in on-demand mode or provisioned mode depending on the throughput and scaling requirements. For more information, see Choosing the Data Stream Capacity Mode.
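
For illustration only (the sensor-data-stream used in this post should already exist after you deploy the solution stack), creating an on-demand stream with boto3 might look like the following:

import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')

# On-demand mode scales shard capacity automatically; no ShardCount is required
kinesis.create_stream(
    StreamName='sensor-data-stream',
    StreamModeDetails={'StreamMode': 'ON_DEMAND'}
)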

For our use case, we assume that various sensors are sending measurements such as temperature, rotation speed, torque, and tool wear to a data stream. Kinesis Data Streams acts as a funnel to collect and ingest data streams.

We use the Amazon Kinesis Data Generator (KDG) later in this post to generate and send data to a Kinesis data stream, simulating data being generated by sensors. The data from the data stream sensor-data-stream is ingested and processed using an AWS Glue streaming job, which we discuss next.

Near-real-time feature engineering

AWS Glue streaming jobs provide a convenient way to process streaming data at scale, without the need to manage the compute environment. AWS Glue allows you to perform extract, transform, and load (ETL) operations on streaming data using continuously running jobs. AWS Glue streaming ETL is built on the Apache Spark Structured Streaming engine, and can ingest streams from Kinesis, Apache Kafka, and Amazon Managed Streaming for Apache Kafka (Amazon MSK).

The streaming ETL job can use both AWS Glue built-in transforms and transforms that are native to Apache Spark Structured Streaming. You can also use the Spark ML and MLLib libraries in AWS Glue jobs for easier feature processing using readily available helper libraries.

If the schema of the streaming data source is pre-determined, you can specify it in an AWS Glue Data Catalog table. If the schema definition can’t be determined beforehand, you can enable schema detection in the streaming ETL job. The job then automatically determines the schema from the incoming data. Additionally, you can use the AWS Glue Schema Registry to allow central discovery, control, and evolution of data stream schemas. You can further integrate the Schema Registry with the Data Catalog to optionally use schemas stored in the Schema Registry when creating or updating AWS Glue tables or partitions in the Data Catalog.

For this post, we create an AWS Glue Data Catalog table (sensor-stream) with our Kinesis data stream as the source and define the schema for our sensor data.

We create an AWS Glue dynamic dataframe from the Data Catalog table to read the streaming data from Kinesis. We also specify the following options:

  • A window size of 60 seconds, so that the AWS Glue job reads and processes data in 60-second windows
  • The starting position TRIM_HORIZON, to allow reading from the oldest records in the Kinesis data stream

We also use Spark MLlib’s StringIndexer feature transformer to encode the string column type into label indexes. This transformation is implemented using Spark ML Pipelines. Spark ML Pipelines provide a uniform set of high-level APIs for ML algorithms to make it easier to combine multiple algorithms into a single pipeline or workflow.

We use the foreachBatch API to invoke a function named processBatch, which in turn processes the data referenced by this dataframe. See the following code:

# Read from Kinesis Data Stream
sourceStreamData = glueContext.create_data_frame.from_catalog(database = "sensordb", table_name = "sensor-stream", transformation_ctx = "sourceStreamData", additional_options = {"startingPosition": "TRIM_HORIZON"})
type_indexer = StringIndexer(inputCol="type", outputCol="type_enc", stringOrderType="alphabetAsc")
pipeline = Pipeline(stages=[type_indexer])
glueContext.forEachBatch(frame = sourceStreamData, batch_function = processBatch, options = {"windowSize": "60 seconds", "checkpointLocation": checkpoint_location})

The function processBatch performs the specified transformations and partitions the data in Amazon S3 based on year, month, day, and batch ID.

We also re-partition the AWS Glue partitions into a single partition, to avoid having too many small files in Amazon S3. Having several small files can impede read performance, because it amplifies the overhead related to seeking, opening, and reading each file. Finally, we write the features used to generate inferences into a prefix (features) within the S3 bucket. See the following code:

# Function that gets called to perform processing, feature engineering and writes to S3 for every micro batch of streaming data from Kinesis.
def processBatch(data_frame, batchId):
    transformer = pipeline.fit(data_frame)
    now = datetime.datetime.now()
    year = now.year
    month = now.month
    day = now.day
    hour = now.hour
    minute = now.minute
    if (data_frame.count() > 0):
        data_frame = transformer.transform(data_frame)
        data_frame = data_frame.drop("type")
        data_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
        data_frame.printSchema()
        # Write output features to S3
        s3prefix = "features" + "/year=" + "{:0>4}".format(str(year)) + "/month=" + "{:0>2}".format(str(month)) + "/day=" + "{:0>2}".format(str(day)) + "/hour=" + "{:0>2}".format(str(hour)) + "/min=" + "{:0>2}".format(str(minute)) + "/batchid=" + str(batchId)
        s3path = "s3://" + out_bucket_name + "/" + s3prefix + "/"
        print("-------write start time------------")
        print(str(datetime.datetime.now()))
        data_frame = data_frame.toDF().repartition(1)
        data_frame.write.mode("overwrite").option("header", False).csv(s3path)
        print("-------write end time------------")
        print(str(datetime.datetime.now()))

Model training and deployment

SageMaker is a fully managed and integrated ML service that enables data scientists and ML engineers to quickly and easily build, train, and deploy ML models.

Within the Data_Pre-Processing.ipynb notebook, we first import the AI4I Predictive Maintenance dataset from the UCI Data Repository and perform exploratory data analysis (EDA). We also perform feature engineering to make our features more useful for training the model.

For example, within the dataset, we have a feature named type, which represents the product’s quality type as L (low), M (medium), or H (high). Because this is a categorical feature, we need to encode it before training our model. We use Scikit-Learn’s LabelEncoder to achieve this:

from sklearn.preprocessing import LabelEncoder
type_encoder = LabelEncoder()
type_encoder.fit(origdf['type'])
type_values = type_encoder.transform(origdf['type'])

After the features are processed and the curated train and test datasets are generated, we’re ready to train an ML model to predict whether the machine failed or not based on system readings. We train an XGBoost model, using the SageMaker built-in algorithm. XGBoost can provide good results for multiple types of ML problems, including classification, even when training samples are limited.

SageMaker training jobs provide a powerful and flexible way to train ML models on SageMaker. SageMaker manages the underlying compute infrastructure and provides multiple options to choose from, for diverse model training requirements, based on the use case.

xgb = sagemaker.estimator.Estimator(container,
                                    role,
                                    instance_count=1,
                                    instance_type='ml.c4.4xlarge',
                                    output_path=xgb_upload_location,
                                    sagemaker_session=sagemaker_session)
xgb.set_hyperparameters(max_depth=5,
                        eta=0.2,
                        gamma=4,
                        min_child_weight=6,
                        subsample=0.8,
                        silent=0,
                        objective='binary:hinge',
                        num_round=100)

xgb.fit({'train': s3_train_channel, 'validation': s3_valid_channel})

When the model training is complete and the model evaluation is satisfactory based on the business requirements, we can begin model deployment. We first create an endpoint configuration that includes the AsyncInferenceConfig option, using the model trained earlier:

endpoint_config_name = resource_name.format("EndpointConfig")
create_endpoint_config_response = sm_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[
        {
            "VariantName": "variant1",
            "ModelName": model_name,
            "InstanceType": "ml.m5.xlarge",
            "InitialInstanceCount": 1,
        }
    ],
    AsyncInferenceConfig={
        "OutputConfig": {
            "S3OutputPath": f"s3://{bucket}/{prefix}/output",
            # Specify Amazon SNS topics
            "NotificationConfig": {
                "SuccessTopic": "arn:aws:sns:<region>:<account-id>:<success-sns-topic>",
                "ErrorTopic": "arn:aws:sns:<region>:<account-id>:<error-sns-topic>",
            },
        },
        "ClientConfig": {"MaxConcurrentInvocationsPerInstance": 4},
    },
)

We then create a SageMaker asynchronous inference endpoint, using the endpoint configuration we created. After it’s provisioned, we can start invoking the endpoint to generate inferences asynchronously.

endpoint_name = resource_name.format("Endpoint")
create_endpoint_response = sm_client.create_endpoint(
    EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
)

Near-real-time inference

SageMaker asynchronous inference endpoints provide the ability to queue incoming inference requests and process them asynchronously in near-real time. This is ideal for applications that have inference requests with larger payload sizes (up to 1 GB), may require longer processing times (up to 15 minutes), and have near-real-time latency requirements. Asynchronous inference also enables you to save on costs by auto scaling the instance count to zero when there are no requests to process, so you only pay when your endpoint is processing requests.
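
For example, a hedged sketch of registering the endpoint variant as an Application Auto Scaling target with a minimum capacity of zero might look like the following; the endpoint name is a placeholder, and you would still need to attach a scaling policy (typically target tracking on the ApproximateBacklogSizePerInstance metric):

import boto3

autoscaling = boto3.client('application-autoscaling')

# Placeholder endpoint name; 'variant1' matches the variant created earlier in this post
autoscaling.register_scalable_target(
    ServiceNamespace='sagemaker',
    ResourceId='endpoint/<your-async-endpoint-name>/variant/variant1',
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    MinCapacity=0,  # allow the endpoint to scale down to zero instances
    MaxCapacity=2
)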

You can create a SageMaker asynchronous inference endpoint in much the same way as a real-time inference endpoint; the difference is that you specify the AsyncInferenceConfig object while creating your endpoint configuration with the EndpointConfig field in the CreateEndpointConfig API. The following diagram shows the inference workflow and how an asynchronous inference endpoint generates an inference.

ML-9132 SageMaker Asych Arch

To invoke the asynchronous inference endpoint, the request payload should be stored in Amazon S3, and a reference to this payload needs to be provided as part of the InvokeEndpointAsync request. Upon invocation, SageMaker queues the request for processing and returns an identifier and output location as a response. Upon processing, SageMaker places the result in the Amazon S3 location. You can optionally choose to receive success or error notifications with Amazon Simple Notification Service (Amazon SNS).
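
A minimal sketch of such an invocation with boto3 might look like the following; the endpoint name and S3 object key are placeholders:

import boto3

sm_runtime = boto3.client('sagemaker-runtime')

# The request payload must already exist in Amazon S3
response = sm_runtime.invoke_endpoint_async(
    EndpointName='<your-async-endpoint-name>',
    InputLocation='s3://<events-bucket>/features/<partition-prefix>/part-00000.csv',
    ContentType='text/csv'
)

# SageMaker returns the S3 location where the inference result will be written
print(response['OutputLocation'])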

Test the end-to-end solution

To test the solution, complete the following steps:

  • On the AWS CloudFormation console, open the stack you created earlier (nrt-streaming-inference).
  • On the Outputs tab, copy the name of the S3 bucket (EventsBucket).

This is the S3 bucket to which our AWS Glue streaming job writes features after reading and processing from the Kinesis data stream.

ML-9132 S3 events bucket

Next, we set up event notifications for this S3 bucket.

  • On the Amazon S3 console, navigate to the bucket EventsBucket.
  • On the Properties tab, in the Event notifications section, choose Create event notification.

ML-9132 S3 events bucket properties

ML-9132 S3 events bucket notification

  • For Event name, enter invoke-endpoint-lambda.
  • For Prefix, enter features/.
  • For Suffix, enter .csv.
  • For Event types, select All object create events.

ML-9132 S3 events bucket notification config

  • For Destination, select Lambda function.
  • For Lambda function, choose the function invoke-endpoint-asynch.
  • Choose Save changes.

ML-9132 S3 events bucket notification config lambda

  • On the AWS Glue console, open the job GlueStreaming-Kinesis-S3.
  • Choose Run job.

ML-9132 Run Glue job

Next we use the Kinesis Data Generator (KDG) to simulate sensors sending data to our Kinesis data stream. If this is your first time using the KDG, refer to Overview for the initial setup. The KDG provides a CloudFormation template to create the user and assign just enough permissions to use the KDG for sending events to Kinesis. Run the CloudFormation template within the AWS account that you’re using to build the solution in this post. After the KDG is set up, log in and access the KDG to send test events to our Kinesis data stream.

  • Use the Region in which you created the Kinesis data stream (us-east-1).
  • On the drop-down menu, choose the data stream sensor-data-stream.
  • In the Records per second section, select Constant and enter 100.
  • Unselect Compress Records.
  • For Record template, use the following template:
{
"air_temperature": {{random.number({"min":295,"max":305, "precision":0.01})}},
"process_temperature": {{random.number({"min":305,"max":315, "precision":0.01})}},
"rotational_speed": {{random.number({"min":1150,"max":2900})}},
"torque": {{random.number({"min":3,"max":80, "precision":0.01})}},
"tool_wear": {{random.number({"min":0,"max":250})}},
"type": "{{random.arrayElement(["L","M","H"])}}"
}
  • Choose Send data to start sending data to the Kinesis data stream.

ML-9132 Kinesis Data Gen

The AWS Glue streaming job reads and extracts a micro-batch of data (representing sensor readings) from the Kinesis data stream based on the window size provided. The streaming job then processes and performs feature engineering on this micro-batch before partitioning and writing it to the prefix features within the S3 bucket.

As new features created by the AWS Glue streaming job are written to the S3 bucket, a Lambda function (invoke-endpoint-asynch) is triggered, which invokes a SageMaker asynchronous inference endpoint by sending an invocation request to get inferences from our deployed ML model. The asynchronous inference endpoint queues the request for asynchronous invocation. When the processing is complete, SageMaker stores the inference results in the Amazon S3 location (S3OutputPath) that was specified during the asynchronous inference endpoint configuration.
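The following is a minimal sketch of what such a handler might look like. The actual invoke-endpoint-asynch function is provisioned by the CloudFormation stack; the environment variable name and content type here are assumptions:

import os
import json
import urllib.parse
import boto3

sm_runtime = boto3.client("sagemaker-runtime")
ENDPOINT_NAME = os.environ["ENDPOINT_NAME"]  # assumed to hold the async endpoint name

def lambda_handler(event, context):
    # The S3 event notification contains one record per newly created features file
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])

        # Hand the S3 reference to the asynchronous endpoint; SageMaker queues the request
        response = sm_runtime.invoke_endpoint_async(
            EndpointName=ENDPOINT_NAME,
            InputLocation=f"s3://{bucket}/{key}",
            ContentType="text/csv",
        )
        print(json.dumps({"input": key, "inferenceId": response["InferenceId"]}))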

For our use case, the inference results indicate if a machine part is likely to fail or not, based on the sensor readings.

ML-9132 Model inferences

SageMaker also sends a success or error notification with Amazon SNS. For example, if you set up an email subscription for the success and error SNS topics (specified within the asynchronous SageMaker inference endpoint configuration), an email can be sent every time an inference request is processed. The following screenshot shows a sample email from the SNS success topic.

ML-9132 SNS email subscribe

For real-world applications, you can integrate SNS notifications with other services such as Amazon Simple Queue Service (Amazon SQS) and Lambda for additional postprocessing of the generated inferences or integration with other downstream applications, based on your requirements. For example, for our predictive maintenance use case, you can invoke a Lambda function based on an SNS notification to read the generated inference from Amazon S3, further process it (such as aggregation or filtering), and initiate workflows such as sending work orders for equipment repair to technicians.

Clean up

When you’re done testing the stack, delete the resources (especially the Kinesis data stream, Glue streaming job, and SNS topics) to avoid unexpected charges.

Run the following code to delete your stack:

sam delete nrt-streaming-inference

Also delete the resources such as SageMaker endpoints by following the cleanup section in the ModelTraining-Evaluation-and-Deployment notebook.

Conclusion

In this post, we used a predictive maintenance use case to demonstrate how to use various services such as Kinesis, AWS Glue, and SageMaker to build a near-real-time inference pipeline. We encourage you to try this solution and let us know what you think.

If you have any questions, share them in the comments.


About the authors

Rahul Sharma is a Solutions Architect at AWS Data Lab, helping AWS customers design and build AI/ML solutions. Prior to joining AWS, Rahul has spent several years in the finance and insurance sector, helping customers build data and analytical platforms.

Pat Reilly is an Architect in the AWS Data Lab, where he helps customers design and build data workloads to support their business. Prior to AWS, Pat consulted at an AWS Partner, building AWS data workloads across a variety of industries.

Read More

LiDAR 3D point cloud labeling with Velodyne LiDAR sensor in Amazon SageMaker Ground Truth

LiDAR is a key enabling technology in growing autonomous markets, such as robotics, industrial, infrastructure, and automotive. LiDAR delivers precise 3D data about its environment in real time to provide “vision” for autonomous solutions. For autonomous vehicles (AVs), nearly every carmaker uses LiDAR to augment camera and radar systems for a comprehensive perception stack capable of safely navigating complex roadway environments. Computer vision systems can use the 3D maps generated by LiDAR sensors for object detection, object classification, and scene segmentation. Like any other supervised machine learning (ML) system, the point cloud data generated by LiDAR sensors should be labeled correctly in order for the ML model to make correct inferences. This allows AVs to operate smoothly and efficiently, avoiding incidents and collisions with objects, pedestrians, vehicles, and other road users.

In this post, we demonstrate how to label 3D point cloud data generated by Velodyne LiDAR sensors using Amazon SageMaker Ground Truth. We break down the process of sending data for annotation so that you can obtain precise, high-quality results.

The code for this example is available on GitHub.

Solution overview

SageMaker Ground Truth is a data labeling service that you can use to create high-quality labeled datasets for various types of ML use cases. SageMaker Ground Truth is a capability in Amazon SageMaker, which is a comprehensive and fully managed ML service. With SageMaker, data scientists and developers can quickly and easily build and train ML models, and then directly deploy them into a production-ready environment.

In addition to LiDAR data, we also include camera images, using the sensor fusion feature in SageMaker Ground Truth to deliver robust visual information about the scenes that annotators are labeling. Through sensor fusion, annotators can adjust labels in the 3D scene as well as in 2D images. It delivers the unique capability to ensure that annotations in LiDAR data are mirrored in 2D imagery, making the process more efficient.

With SageMaker Ground Truth, Velodyne LiDAR’s 3D point cloud data generated by a Velodyne LiDAR sensor mounted on a vehicle can be labeled for tracking moving objects. In this challenging use case, we can follow the trajectory of an object like a car or a pedestrian in a dynamic environment, while our point of reference is also moving. In this case, our point of reference is a car that is equipped with Velodyne LiDAR.

To perform this task, we walk through the following topics:

  • Velodyne technology
  • The dataset
  • Creating a labeling job
  • The point cloud sequence input manifest file
  • Building the sequence input manifest file
  • Labeling the category configuration file
  • Specifying the job resources
  • Completing a labeling job

Prerequisites

To implement the solution in this post, you must have the following prerequisites:

  • An AWS account for running the code.
  • An Amazon Simple Storage Service (Amazon S3) bucket you can write to. The bucket must be in the same Region as the SageMaker notebook instance. We can also define a valid S3 prefix. All the files related to this experiment are stored in that prefix of our bucket. We must attach the CORS policy to this bucket. For instructions, refer to Configuring cross-origin resource sharing (CORS). Enter the following policy in the CORS configuration editor:
<?xml version="1.0" encoding="UTF-8"?>
<CORSConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<CORSRule>
    <AllowedOrigin>*</AllowedOrigin>
    <AllowedMethod>GET</AllowedMethod>
    <AllowedMethod>HEAD</AllowedMethod>
    <AllowedMethod>PUT</AllowedMethod>
    <MaxAgeSeconds>3000</MaxAgeSeconds>
    <ExposeHeader>Access-Control-Allow-Origin</ExposeHeader>
    <AllowedHeader>*</AllowedHeader>
</CORSRule>
<CORSRule>
    <AllowedOrigin>*</AllowedOrigin>
    <AllowedMethod>GET</AllowedMethod>
</CORSRule>
</CORSConfiguration>

Velodyne technology

LiDAR can be divided into different categories, including scanning LiDAR and flash LiDAR. Conventional scanning LiDAR uses mechanical rotation to spin the sensor for 360-degree detection. Velodyne, which invented the industry’s first 3D LiDAR, continues to innovate and launch new rotational products with cutting-edge technology. Velodyne’s Ultra Puck is a scanning LiDAR sensor that uses Velodyne’s patented surround-view technology. It provides a full 360-degree environmental view to deliver accurate real-time 3D data. The Ultra Puck has a compact form factor and delivers the real-time object detection needed for safe navigation and reliable operation. With a combination of optimal power and high performance, this sensor provides distance and calibrated reflectivity measurements at all rotational angles. It’s an ideal solution for robotics, mapping, security, driver assistance, and autonomous navigation. Besides the LiDAR sensor itself, Velodyne has created the Vella Development Kit (VDK), a collection of tools, hardware, and documentation that facilitates access to Velodyne’s autonomy software stack. The VDK can be configured for different custom interfaces and environments, providing you with a broad range of applications for increased autonomy and improved safety.

Additionally, the VDK can reduce the upfront work you would otherwise have to put in to enable an end-to-end data collection and annotation pipeline by providing the following capabilities:

  • Clock synchronization between LiDAR, odometry, and camera frames
  • LiDAR-to-vehicle 5-DOF extrinsic calibration (z is not observable)
  • LiDAR-to-camera extrinsic, intrinsic, and distortion calibration
  • Collection of motion-compensated (intra-frame or multi-frame), synchronized LiDAR point clouds and camera images

To develop vehicle-based perception capabilities, Velodyne’s software team has set up their own data collection vehicle with one of their Ultra Puck LiDAR units, a camera, and GPS/IMU sensors mounted to the vehicle hood. In the subsequent steps, we refer to the internal processes in which they use the VDK to prepare, collect, and annotate data for their vehicle-based perception capabilities, as an example for other customers trying to solve their own perception use cases.

Clock synchronization

Accurate clock synchronization of the LiDAR, odometry, and camera outputs can be crucial for any multi-sensor application that combines those data streams. For best results, you should use a PTP synchronization system with a primary clock that is supported by all sensors. One advantage of PTP is the ability to synchronize multiple devices to high accuracy with a single timing source. Such a system can achieve synchronization accuracy better than 1 microsecond. Other solutions include PPS distribution and per-device time sources. As an alternative, the VDK supports software synchronization using time-of-arrival timestamping, which can be a great way to get an application off the ground quickly in the absence of proper clock synchronization infrastructure. This can result in timestamping errors on the order of 1–10 milliseconds due to a combination of latency and queuing delays at various levels of the network infrastructure and host operating system, which may or may not be acceptable, depending on the application.

LiDAR vehicle calibration

The LiDAR vehicle calibration estimates the extrinsic position of the LiDAR in the vehicle frame along five axes. The z value is unobservable; therefore, you must measure it independently. Our process is a targetless calibration technique, but it works well in an environment where the ground is relatively flat and the surroundings contain contiguous static features rather than dynamic (vehicles, pedestrians) or non-contiguous (shrubs and bushes) ones. Think of a parking lot with few obstacles and buildings with flat facades. The presence of geometric structures is ideal for improving the calibration quality. The user is required to drive in predefined driving patterns indicated by the VDK to expose most of the parameters. One minute of data is sufficient for this calibration. After the data is uploaded to Velodyne’s platform service, the calibration takes place in the cloud and the result is made available within 24 hours. For the purposes of this notebook, the calibration parameters have already been processed and provided.

The LiDAR dataset

The dataset and resources used in this notebook are provided by Velodyne. This dataset contains one continuous scene from an autonomous vehicle experiment driving around on a highway in California. The entire scene contains 60 frames. The dataset contents are as follows:

  • lidar_cam_calib_vlp32_06_10_2021.yaml – Camera calibration information, one camera only
  • images/ – Camera footage for each frame
  • poses/ – Pose JSON file containing LiDAR extrinsic matrix for each frame
  • rectified_scans_local/ – .pcd files in the LiDAR sensor’s local coordinate system

Run the following code to download the dataset locally and then upload it to your S3 bucket, which we defined in the initialization section:

source_bucket = 'velodyne-blog'
source_prefix = 'highway_data_07'
source_data = f's3://{source_bucket}/{source_prefix}'

!aws s3 cp $source_data ./$PREFIX --recursive
target_s3 = f's3://{BUCKET}/{PREFIX}'
!aws s3 cp ./$PREFIX $target_s3 --recursive

Create a labeling job

As the next step, we need to create a data labeling job in SageMaker Ground Truth. We select the task type as object tracking. For more information about 3D point cloud labeling task types, refer to 3D Point Cloud Task types. To create an object tracking point cloud labeling job, we need to add the following resources as the labeling job inputs:

  • Point cloud sequence input manifest – A JSON file defining the point cloud frame sequence and associated sensor fusion data. For more information, see Create a Point Cloud Sequence Input Manifest.
  • Input manifest file – The input file for the labeling job. Each line of the manifest file contains a link to a sequence file defined in the point cloud sequence input manifest.
  • Label category configuration file – This file is used to specify your labels, label category, frame attributes, and worker instructions. For more information, see Create a Labeling Category Configuration File with Label Category and Frame Attributes.
  • Predefined AWS resources – Includes the following:

    • Pre-annotation Lambda ARN – Refer to PreHumanTaskLambdaArn.
    • Annotation consolidation ARN – The AWS Lambda function used to consolidate labels from different workers. Refer to AnnotationConsolidationLambdaArn.
    • Workforce ARN – Defines which workforce type we want to use. Refer to Create and Manage Workforces for more details.
    • HumanTaskUiArn – Defines the worker UI template to do the labeling job. This should have a format similar to arn:aws:sagemaker:<region>:123456789012:human-task-ui/PointCloudObjectTracking.

Keep in mind the following:

  • There should not be an entry for the UiTemplateS3Uri parameter.
  • Your LabelAttributeName must end in -ref. For example, ot-labels-ref.
  • The number of workers specified in NumberOfHumanWorkersPerDataObject should be 1.
  • 3D point cloud labeling doesn’t support active learning, so we shouldn’t specify values for parameters in LabelingJobAlgorithmsConfig.
  • 3D point cloud object tracking labeling jobs can take multiple hours to complete. You should specify a longer time limit for these labeling jobs in TaskTimeLimitInSeconds (up to 7 days, or 604,800 seconds).
    #object tracking as our 3D Point Cloud Task Type. 
    task_type = "3DPointCloudObjectTracking"

Point cloud sequence input manifest file

The following are the most important steps in generating a sequence input manifest file:

  1. Convert the 3D points to a world coordinate system.
  2. Generate the sensor extrinsic matrix to enable the sensor fusion feature in SageMaker Ground Truth.

The LiDAR sensor is mounted on a moving vehicle (ego vehicle), which captures the data in its own frame of reference. To perform object tracking, we need to convert this data to a global frame of reference to account for the moving ego vehicle itself. This is the world coordinate system.

Sensor fusion is a feature in SageMaker Ground Truth that synchronizes the 3D point cloud frame side by side with the camera frame. This provides visual context for human labelers and allows them to adjust annotations in the 3D scene and in 2D images synchronously. For instructions on matrix transformation, refer to Labeling data for 3D object tracking and sensor fusion in Amazon SageMaker Ground Truth.

The generate_transformed_pcd_from_point_cloud function performs the coordinate translation and then generates the 3D point data file, which SageMaker Ground Truth can consume.

To translate the data from the local (sensor) coordinate system to the global coordinate system, multiply each point in a 3D frame by the extrinsic matrix for the LiDAR sensor.
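The following is a minimal sketch of that transform, assuming the frame’s points are an N x 3 NumPy array and the pose file provides a 4 x 4 LiDAR extrinsic matrix; the function and variable names are illustrative rather than the notebook’s actual implementation:

import numpy as np

def transform_points_to_world(points_xyz, lidar_extrinsic):
    """Apply a 4x4 LiDAR extrinsic matrix to an (N, 3) array of points."""
    # Convert to homogeneous coordinates: (N, 3) -> (N, 4)
    ones = np.ones((points_xyz.shape[0], 1))
    points_h = np.hstack([points_xyz, ones])

    # Multiply by the extrinsic matrix and drop the homogeneous coordinate
    world_h = points_h @ lidar_extrinsic.T
    return world_h[:, :3]

# Example: write the transformed frame in the ASCII (x y z i) layout Ground Truth accepts
# world = transform_points_to_world(points, extrinsic)
# np.savetxt("frame_0.txt", np.hstack([world, intensity.reshape(-1, 1)]), fmt="%.6f")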

SageMaker Ground Truth renders the 3D point cloud data in either Compact Binary Pack (.bin) or ASCII (.txt) format. Files in these formats need to contain information about the location (x, y, and z coordinates) of all points that make up that frame, and, optionally, information about the pixel color of each point for colored point clouds (i, r, g, b).

To read more about SageMaker Ground Truth accepted raw 3D data formats, see Accepted Raw 3D Data Formats.

Build the sequence input manifest file

The next step is to build the point cloud sequence input manifest file. The steps listed in this section are also available in the notebook.

  1. Load the point cloud data from the .pcd file, the LiDAR extrinsic matrix from the pose file, and the camera extrinsic, intrinsic, and distortion data from the camera calibration .yaml file.
  2. Perform a per-frame transform of the raw point cloud to the global frame of reference, then generate an ASCII (.txt) file for each frame and store it in Amazon S3.
  3. Extract the ego vehicle pose from the LiDAR extrinsic matrix.
  4. Build a sensor position in the global coordinate system by extracting the camera pose from the camera inverse extrinsic matrix.
  5. Provide camera calibration parameters (such as distortion and skew).
  6. Build the array of data frames. Reference the ASCII file location, define the vehicle position in world coordinate system, and so on.
  7. Create the sequence manifest file sequence.json.
  8. Create our input manifest file. Each line identifies a single sequence file we just uploaded. (An abridged sketch of both files follows this list.)
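To illustrate the structure that steps 6–8 produce, the following is an abridged sketch of a sequence file expressed as a Python dict, in the same style as the label category example later in this post. All values are placeholders, and the authoritative schema is in Create a Point Cloud Sequence Input Manifest:

# Abridged sketch of sequence.json (placeholder values only)
sequence = {
    "seq-no": 1,
    "prefix": f"s3://{BUCKET}/{PREFIX}/",
    "number-of-frames": 60,
    "frames": [
        {
            "frame-no": 0,
            "unix-timestamp": 1623360000.0,
            "frame": "frames/frame_0.txt",   # ASCII point cloud generated in step 2
            "format": "text/xyzi",
            "ego-vehicle-pose": {            # extracted from the LiDAR extrinsic matrix (step 3)
                "position": {"x": 0.0, "y": 0.0, "z": 0.0},
                "heading": {"qx": 0.0, "qy": 0.0, "qz": 0.0, "qw": 1.0},
            },
            "images": [
                {
                    "image-path": "images/frame_0.jpg",
                    "unix-timestamp": 1623360000.0,
                    "fx": 1000.0, "fy": 1000.0, "cx": 640.0, "cy": 360.0,  # intrinsics (step 5)
                    "position": {"x": 0.0, "y": 0.0, "z": 0.0},
                    "heading": {"qx": 0.0, "qy": 0.0, "qz": 0.0, "qw": 1.0},
                    "camera-model": "pinhole",
                }
            ],
        }
    ],
}

# Each line of the input manifest then references one sequence file, for example:
# {"source-ref": "s3://<bucket>/<prefix>/sequence.json"}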

Label the category configuration file

Our label category configuration file is used to specify labels, or classes, for our labeling job. When we use the object detection or object tracking task types, we can also include label attributes in our label category configuration file. Workers can assign one or more attributes we provide to annotations to give more information about that object. For example, we may want to use the attribute occluded to have workers identify when an object is partially obstructed. Let’s look at an example of the label category configuration file for an object detection or object tracking labeling job:

label_category = {
  "categoryGlobalAttributes": [
    {
      "enum": [
        "75-100%",
        "25-75%",
        "0-25%"
      ],
      "name": "Visibility",
      "type": "string"
    }
  ],
  "documentVersion": "2020-03-01",
  "instructions": {
    "fullInstruction": "Draw a tight Cuboid. You only need to annotate those in the first frame. Please make sure the direction of the cubiod is accurately representative of the direction of the vehicle it bounds.",
    "shortInstruction": "Draw a tight Cuboid. You only need to annotate those in the first frame."
  },
  "labels": [
    {
      "categoryAttributes": [],
      "label": "Car"
    },
    {
      "categoryAttributes": [],
      "label": "Truck"
    },
    {
      "categoryAttributes": [],
      "label": "Bus"
    },
    {
      "categoryAttributes": [],
      "label": "Pedestrian"
    },
    {
      "categoryAttributes": [],
      "label": "Cyclist"
    },
    {
      "categoryAttributes": [],
      "label": "Motorcyclist"
    },
  ]
}

category_key = f'{PREFIX}/manifests_categories/label_category.json'
write_json_to_s3(label_category, BUCKET, category_key)

label_category_file = f's3://{BUCKET}/{category_key}'
print(f"label category file uri: {label_category_file}")

Specify the job resources

As the next step, we specify various labeling job resources:

  • Human task UI ARN – HumanTaskUiArn is a resource that defines the worker task template used to render the worker UI and tools for the labeling job. This attribute is defined under UiConfig, and the resource name is configured by Region and task type:

    human_task_ui_arn = (
        f"arn:aws:sagemaker:{region}:123456789012:human-task-ui/{task_type[2:]}"
    )

  • Work resource – In this example, we use private team resources. For instructions, refer to Create a Private Workforce (Amazon Cognito Console). When we’re done, we should put our resource ARN in the following parameter:

    workteam_arn = f"arn:aws:sagemaker:{region}:123456789012:workteam/private-crowd/test-team"#"<REPLACE W/ YOUR Private Team ARN>"

  • Pre-annotation Lambda ARN and post-annotation Lambda ARN – See the following code:

    ac_arn_map = {
        "us-west-2": "081040173940",
        "us-east-1": "432418664414",
        "us-east-2": "266458841044",
        "eu-west-1": "568282634449",
        "ap-northeast-1": "477331159723",
    }
    
    prehuman_arn = "arn:aws:lambda:{}:{}:function:PRE-{}".format(region, ac_arn_map[region], task_type)
    acs_arn = "arn:aws:lambda:{}:{}:function:ACS-{}".format(region, ac_arn_map[region], task_type)

  • HumanTaskConfig – We use this to specify our work team and configure our labeling job task. Feel free to update the task description in the following code:

    job_name = f"velodyne-blog-test-{str(time.time()).split('.')[0]}"
    
    # Task description info =================
    task_description = "Draw 3D boxes around required objects"
    task_keywords = ['lidar', 'pointcloud']
    task_title = job_name
    
    human_task_config = {
        "AnnotationConsolidationConfig": {
            "AnnotationConsolidationLambdaArn": acs_arn,
        },
        "WorkteamArn": workteam_arn,
        "PreHumanTaskLambdaArn": prehuman_arn,
        "MaxConcurrentTaskCount": 200,
        "NumberOfHumanWorkersPerDataObject": 1,  # One worker will work on each task
        "TaskAvailabilityLifetimeInSeconds": 18000, # Your workteam has 5 hours to complete all pending tasks.
        "TaskDescription": task_description,
        "TaskKeywords": task_keywords,
        "TaskTimeLimitInSeconds": 36000, # Each seq must be labeled within 1 hour.
        "TaskTitle": task_title,
        "UiConfig": {
            "HumanTaskUiArn": human_task_ui_arn,
        },
    }

Create the labeling job

Next, we create the labeling request, as shown in the following code:

labelAttributeName = f"{job_name}-ref" #must end with -ref

output_path = f"s3://{BUCKET}/{PREFIX}/output"

ground_truth_request = {
    "InputConfig" : {
      "DataSource": {
        "S3DataSource": {
          "ManifestS3Uri": manifest_uri,
        }
      },
      "DataAttributes": {
        "ContentClassifiers": [
          "FreeOfPersonallyIdentifiableInformation",
          "FreeOfAdultContent"
        ]
      },  
    },
    "OutputConfig" : {
      "S3OutputPath": output_path,
    },
    "HumanTaskConfig" : human_task_config,
    "LabelingJobName": job_name,
    "RoleArn": role, 
    "LabelAttributeName": labelAttributeName,
    "LabelCategoryConfigS3Uri": label_category_file,
    "Tags": [],
}

Finally, we create the labeling job:

sagemaker_client.create_labeling_job(**ground_truth_request)

Complete a labeling job

When our labeling job is ready, we can add ourselves to our private work team and experiment with the worker portal. We should receive an email with the portal link, our user name, and a temporary password. When we log in, we choose the labeling job from the list, and we should then see a worker portal like the one in the following screenshot. (It may take a few minutes for a new labeling job to show up in the portal.) More information on how to set up workers and instructions can be found here and here, respectively.
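You can also check the job’s progress programmatically; the following is a minimal sketch using the SageMaker client created earlier:

# Poll the labeling job status (InProgress, Completed, Failed, or Stopped) and label counters
status = sagemaker_client.describe_labeling_job(LabelingJobName=job_name)
print(status["LabelingJobStatus"], status["LabelCounters"])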

When we’re done with the labeling job, we can choose Submit, and then view the output data in the S3 output location we specified earlier.

Conclusion

In this post, we showed how we can create a 3D point cloud labeling job for object tracking for data captured using Velodyne’s LiDAR sensor. We followed the step-by-step instructions in this post and ran the provided code to create a SageMaker Ground Truth labeling job to label the 3D point cloud data. ML models can use the labels created with this job to train object detection, object recognition, and object tracking models commonly used in autonomous vehicle scenarios.

If you are interested in labeling 3D point cloud data captured via Velodyne’s LiDAR sensor, follow the steps in this article to label the data using Amazon SageMaker Ground Truth.


About the Authors

Sharath Nair leads the Computer Vision team that focuses on building perception algorithms for some of Velodyne’s software products like Object Detection & Tracking, Semantic Segmentation, SLAM, etc. Prior to Velodyne, Sharath worked on Autonomous Vehicles and Robotics and has been involved in this space for the past 6 years.

Oliver Monson is a Senior Data Operations Manager at Velodyne Lidar, responsible for the data pipelines and acquisition strategies that support the development of perception software. Prior to Velodyne, Oliver has managed operational teams executing on HD mapping, geospatial, and archaeological applications.

John Kua is Director of Software Engineering at Velodyne, overseeing the System Integration and Robotics, Vella Go, and Software Production teams. Prior to joining Velodyne, John spent over a decade building multimodal sensor platforms for a wide range of 3D localization and mapping applications in commercial and government applications. These platforms included a wide array of sensors including visible light, thermal, and hyperspectral cameras, lidar, GPS, IMUs, and even gamma-ray spectrometers and imagers.

Sally Frykman, Chief Marketing Officer at Velodyne, oversees the strategic development and execution of global marketing and communications programs that advance the company’s innovative vision and goals. Her multifaceted role encompasses a wide array of responsibilities, including promotion of the Velodyne brand, thought leadership development, and robust sales lead generation fueled by highly engaging digital marketing. Previously, Sally worked in public education and social work.

Nitin Wagh is Sr. Business Development Manager for Amazon AI. He likes the opportunity to help customers understand Machine Learning and  power of Augmented AI in AWS cloud. In his spare time, he loves spending time with family in outdoors activities.

James Wu is a Senior AI/ML Specialist Solutions Architect at AWS, helping customers design and build AI/ML solutions. James’s work covers a wide range of ML use cases, with a primary interest in computer vision, deep learning, and scaling ML across the enterprise. Prior to joining AWS, James was an architect, developer, and technology leader for over 10 years, including 6 years in engineering and 4 years in the marketing and advertising industries.

Farooq Sabir is a Senior Artificial Intelligence and Machine Learning Specialist Solutions Architect at AWS. He holds PhD and MS degrees in Electrical Engineering from The University of Texas at Austin and a MS in Computer Science from Georgia Institute of Technology. He has over 15 years of work experience and also likes to teach and mentor college students. At AWS, he helps customers formulate and solve their business problems in data science, machine learning, computer vision, artificial intelligence, numerical optimization and related domains. Based in Dallas, Texas, he and his family love to travel and make long road trips.

Read More