Build a news-based real-time alert system with Twitter, Amazon SageMaker, and Hugging Face

Today, social media is a huge source of news. Users rely on platforms like Facebook and Twitter to consume news. For certain industries, such as insurance companies, first responders, law enforcement, and government agencies, being able to quickly process news about relevant events can help them take action while those events are still unfolding.

It’s not uncommon for organizations trying to extract value from text data to look for a solution that doesn’t involve the training of a complex NLP (natural language processing) model. For those organizations, using a pre-trained NLP model is more practical. Furthermore, if the chosen model doesn’t satisfy their success metrics, organizations want to be able to easily pick another model and reassess.

At present, it’s easier than ever to extract information from text data thanks to the following:

  • The rise of state-of-the-art, general-purpose NLP architectures such as transformers
  • The ability that developers and data scientists have to quickly build, train, and deploy machine learning (ML) models at scale on the cloud with services like Amazon SageMaker
  • The availability of thousands of pre-trained NLP models in hundreds of languages and with support for multiple frameworks provided by the community in platforms like Hugging Face Hub

In this post, we show you how to build a real-time alert system that consumes news from Twitter and classifies the tweets using a pre-trained model from the Hugging Face Hub. You can use this solution for zero-shot classification, meaning you can classify tweets against virtually any set of categories, and deploy the model with SageMaker for real-time inference.

Alternatively, if you’re looking to gain insights into your customers’ conversations and deepen brand awareness by analyzing social media interactions, we encourage you to check out the AI-Driven Social Media Dashboard. The solution uses Amazon Comprehend, a fully managed NLP service that uncovers valuable insights and connections in text without requiring machine learning experience.

Zero-shot learning

The fields of NLP and natural language understanding (NLU) have rapidly evolved to address use cases involving text classification, question answering, summarization, text generation, and more. This evolution has been possible, in part, thanks to the rise of state-of-the-art, general-purpose architectures such as transformers, but also thanks to the availability of more, better-quality text corpora for training such models.

The transformer architecture is a complex neural network that requires domain expertise and a huge amount of data in order to be trained from scratch. A common practice is to take a pre-trained state-of-the-art transformer like BERT, RoBERTa, T5, GPT-2, or DistilBERT and fine-tune (transfer learning) the model to a specific use case.

Nevertheless, even performing transfer learning on a pre-trained NLP model can often be a challenging task, requiring large amounts of labeled text data and a team of experts to curate the data. This complexity prevents most organizations from using these models effectively, but zero-shot learning helps ML practitioners and organizations overcome this shortcoming.

Zero-shot learning is a specific ML task in which a classifier learns on one set of labels during training, and then during inference is evaluated on a different set of labels that the classifier has never seen before. In NLP, you can use a zero-shot sequence classifier trained on a natural language inference (NLI) task to classify text without any fine-tuning. In this post, we use the popular NLI BART model bart-large-mnli to classify tweets. This is a large pre-trained model (1.6 GB), available on the Hugging Face model hub.
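
As a quick illustration, the following is a minimal sketch of zero-shot classification with the transformers library, run locally; the example tweet and candidate labels are placeholders:

from transformers import pipeline

# Load the zero-shot classification pipeline with the NLI BART model
classifier = pipeline("zero-shot-classification", model="facebook/bart-large-mnli")

tweet = "Amazon SageMaker makes it easier to train and deploy ML models at scale."  # example input
candidate_labels = ["security", "database", "compute", "storage", "machine learning"]

result = classifier(tweet, candidate_labels=candidate_labels)
print(result["labels"][0], result["scores"][0])  # highest-scoring label and its score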

Hugging Face is an AI company that manages an open-source platform (Hugging Face Hub) with thousands of pre-trained NLP models (transformers) in more than 100 different languages and with support for different frameworks such as TensorFlow and PyTorch. The transformers library helps developers and data scientists get started in complex NLP and NLU tasks such as classification, information extraction, question answering, summarization, translation, and text generation.

AWS and Hugging Face have been collaborating to simplify and accelerate the adoption of NLP models. A set of Deep Learning Containers (DLCs) for training and inference in PyTorch or TensorFlow, and Hugging Face estimators and predictors for the SageMaker Python SDK are now available. These capabilities help developers with all levels of expertise get started with NLP easily.

Overview of solution

We provide a working solution that fetches tweets in real time from selected Twitter accounts. For the demonstration of our solution, we use three accounts, Amazon Web Services (@awscloud), AWS Security (@AWSSecurityInfo), and Amazon Science (@AmazonScience), and classify their content into one of the following categories: security, database, compute, storage, and machine learning. If the model returns a category with a confidence score greater than 40%, a notification is sent.

In the following example, the model classified a tweet from Amazon Web Services in the machine learning category, with a confidence score of 97%, generating an alert.
Outline of the solution
The solution relies on a Hugging Face pre-trained transformer model (from the Hugging Face Hub) to classify tweets based on a set of labels that are provided at inference time—the model doesn’t need to be trained. The following screenshots show more examples and how they were classified.
Some relevant examples
We encourage you to try the solution for yourself. Simply download the source code from the GitHub repository and follow the deployment instructions in the README file.

Solution architecture

The solution keeps an open connection to Twitter’s endpoint and, when a new tweet arrives, sends a message to a queue. A consumer reads messages from the queue, calls the classification endpoint, and, depending on the results, notifies the end user.
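
The following is a minimal sketch of that producer loop, assuming the Twitter API v2 filtered stream endpoint and illustrative environment variable names for the queue URL and Bearer token:

import json
import os

import boto3
import requests

sqs = boto3.client("sqs")
queue_url = os.environ["SQS_QUEUE_URL"]  # illustrative variable name
headers = {"Authorization": f"Bearer {os.environ['TWITTER_BEARER_TOKEN']}"}

# Keep an open connection to the filtered stream and forward each tweet to SQS
with requests.get("https://api.twitter.com/2/tweets/search/stream",
                  headers=headers, stream=True) as response:
    for line in response.iter_lines():
        if line:  # the stream sends keep-alive newlines
            tweet = json.loads(line)
            sqs.send_message(QueueUrl=queue_url, MessageBody=json.dumps(tweet))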

The following is the architecture diagram of the solution.
Scope of the solution
The solution workflow consists of the following components:

  1. The solution relies on Twitter’s Stream API to get tweets that match the configured rules (tweets from the accounts of interest) in real time. To do so, an application running inside a container keeps an open connection to Twitter’s endpoint. Refer to Twitter API for more details.
  2. The container runs on Amazon Elastic Container Service (Amazon ECS), a fully managed container orchestration service that makes it easy for you to deploy, manage, and scale containerized applications. A single task runs on a serverless infrastructure managed by AWS Fargate.
  3. The Twitter Bearer token is securely stored in AWS Systems Manager Parameter Store, a capability of AWS Systems Manager that provides secure, hierarchical storage for configuration data and secrets. The container image is hosted on Amazon Elastic Container Registry (Amazon ECR), a fully managed container registry offering high-performance hosting.
  4. Whenever a new tweet arrives, the container application puts the tweet into an Amazon Simple Queue Service (Amazon SQS) queue. Amazon SQS is a fully managed message queuing service that enables you to decouple and scale microservices, distributed systems, and serverless applications.
  5. The logic of the solution resides in an AWS Lambda function. Lambda is a serverless, event-driven compute service. The function consumes new tweets from the queue and classifies them by calling an endpoint (a simplified sketch of this function follows the list).
  6. The endpoint relies on a Hugging Face model and is hosted on SageMaker. The endpoint runs the inference and outputs the class of the tweet.
  7. Depending on the classification, the function generates a notification through Amazon Simple Notification Service (Amazon SNS), a fully managed messaging service. You can subscribe to the SNS topic, and multiple destinations can receive that notification (see Amazon SNS event destinations). For instance, you can deliver the notification to inboxes as email messages (see Email notifications).
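
The following is a simplified sketch of the Lambda consumer described in steps 5-7. It assumes the queue messages contain the raw tweet JSON from the Twitter API, and the environment variable names are illustrative; it is not the exact code from the repository:

import json
import os

import boto3

sagemaker_runtime = boto3.client("sagemaker-runtime")
sns = boto3.client("sns")

def handler(event, context):
    for record in event["Records"]:  # batch of SQS messages
        tweet_text = json.loads(record["body"])["data"]["text"]
        payload = {
            "inputs": tweet_text,
            "parameters": {
                "candidate_labels": os.environ["LABELS"].split(","),
                "multi_class": False,
            },
        }
        response = sagemaker_runtime.invoke_endpoint(
            EndpointName=os.environ["ENDPOINT_NAME"],
            ContentType="application/json",
            Body=json.dumps(payload),
        )
        result = json.loads(response["Body"].read())
        # Notify only when the top label exceeds the 40% confidence threshold
        if result["scores"][0] > 0.4:
            sns.publish(
                TopicArn=os.environ["SNS_TOPIC_ARN"],
                Subject=f"Tweet classified as {result['labels'][0]}",
                Message=tweet_text,
            )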

Deploy Hugging Face models with SageMaker

You can select any of the over 10,000 publicly available models from the Hugging Face Model Hub and deploy them with SageMaker by using Hugging Face Inference DLCs.

When using AWS CloudFormation, you select one of the publicly available Hugging Face Inference Containers and configure the model and the task. This solution uses the facebook/bart-large-mnli model and the zero-shot-classification task, but you can choose any of the models under Zero-Shot Classification on the Hugging Face Model Hub. You configure those by setting the HF_MODEL_ID and HF_TASK environment variables in your CloudFormation template, as in the following code:

SageMakerModel:
  Type: AWS::SageMaker::Model
  Properties:
    ExecutionRoleArn: !GetAtt SageMakerModelRole.Arn
    PrimaryContainer:
      Image: 763104351884.dkr.ecr.us-east-1.amazonaws.com/huggingface-pytorch-inference:1.7-transformers4.6-cpu-py36-ubuntu18.04
      Environment:
        HF_MODEL_ID: facebook/bart-large-mnli
        HF_TASK: zero-shot-classification
        SAGEMAKER_CONTAINER_LOG_LEVEL: 20
        SAGEMAKER_REGION: us-east-1

Alternatively, if you’re not using AWS CloudFormation, you can achieve the same results with a few lines of code. Refer to Deploy models to Amazon SageMaker for more details.
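
For reference, the following is a minimal sketch using the SageMaker Python SDK from a SageMaker notebook; the framework versions are illustrative and should match a supported Hugging Face DLC:

import sagemaker
from sagemaker.huggingface import HuggingFaceModel

role = sagemaker.get_execution_role()

huggingface_model = HuggingFaceModel(
    env={
        "HF_MODEL_ID": "facebook/bart-large-mnli",
        "HF_TASK": "zero-shot-classification",
    },
    role=role,
    transformers_version="4.6",  # illustrative versions
    pytorch_version="1.7",
    py_version="py36",
)

predictor = huggingface_model.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.xlarge",
)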

To classify the content, you just call the SageMaker endpoint. The following is a Python code snippet:

import json
import os

import boto3

# SageMaker runtime client used to invoke the endpoint
sagemaker = boto3.client('sagemaker-runtime')

endpoint_name = os.environ['ENDPOINT_NAME']
# Candidate labels, for example 'security,database,compute,storage,machine learning' (the variable name is illustrative)
labels = os.environ['LABELS'].split(',')

data = {
    'inputs': tweet,
    'parameters': {
        'candidate_labels': labels,
        'multi_class': False
    }
}

response = sagemaker.invoke_endpoint(EndpointName=endpoint_name,
                                     ContentType='application/json',
                                     Body=json.dumps(data))

response_body = json.loads(response['Body'].read())

Note the False value for the multi_class parameter: it indicates that the probabilities across all the candidate labels are normalized and add up to 1.
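
For example, you can inspect the ranked labels and verify the normalization as follows:

# Labels are returned ranked by score; with multi_class set to False the scores
# across all candidate labels are normalized to sum to 1
print(response_body["labels"][0], response_body["scores"][0])  # top category and its score
print(sum(response_body["scores"]))  # approximately 1.0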

Solution improvements

You can enhance the solution proposed here by storing the tweets and the model results. Amazon Simple Storage Service (Amazon S3), an object storage service, is one option. You can write tweets, results, and other metadata as JSON objects into an S3 bucket. You can then perform ad hoc queries against that content using Amazon Athena, an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL.
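
As an example, the following sketch writes each classified tweet to Amazon S3 as a JSON object; the bucket name and key pattern are illustrative:

import datetime
import json
import uuid

import boto3

s3 = boto3.client("s3")

record = {
    "tweet": tweet,
    "labels": response_body["labels"],
    "scores": response_body["scores"],
    "timestamp": datetime.datetime.utcnow().isoformat(),
}

s3.put_object(
    Bucket="my-tweet-classification-results",  # example bucket name
    Key=f"results/{datetime.date.today()}/{uuid.uuid4()}.json",
    Body=json.dumps(record),
)

You can then define an Athena table over that prefix and query the history with standard SQL.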

You can use the history not only to extract insights but also to train a custom model. You can use Hugging Face support to train a model with your own data with SageMaker. For more information, see Run training on Amazon SageMaker.

Real-world use cases

Customers are already experimenting with Hugging Face models on SageMaker. Seguros Bolívar, a Colombian financial and insurance company founded in 1939, is an example.

“We developed a threat notification solution for customers and insurance brokers. We use Hugging Face pre-trained NLP models to classify tweets from relevant accounts to generate notifications for our customers in near-real time as a prevention strategy to help mitigate claims. A claim occurs because customers are not aware of the level of risk they are exposed to. The solution allows us to generate awareness in our customers, turning risk into something measurable in concrete situations.”

– Julian Rico, Chief of Research and Knowledge at Seguros Bolívar.

Seguros Bolívar worked with AWS to re-architect their solution; it now relies on SageMaker and resembles the one described in this post.

Conclusion

Zero-shot classification is ideal when you have little data to train a custom text classifier or when you can’t afford to train a custom NLP model. For specialized use cases, when text is based on specific words or terms, it’s better to go with a supervised classification model based on a custom training set.

In this post, we showed you how to build a news classifier using a Hugging Face zero-shot model on AWS. We used Twitter as our news source, but you can choose a news source that is more suitable to your specific needs. Furthermore, you can easily change the model; just specify your chosen model in the CloudFormation template.

For the source code, refer to the GitHub repository, which includes the full setup instructions. You can clone, change, deploy, and run it yourself. You can also use it as a starting point and customize the categories and the alert logic, or build another solution for a similar use case.

Please give it a try, and let us know what you think. As always, we’re looking forward to your feedback. You can send it to your usual AWS Support contacts, or in the AWS Forum for SageMaker.


About the authors

David Laredo is a Prototyping Architect at AWS Envision Engineering in LATAM, where he has helped develop multiple machine learning prototypes. Previously, he worked as a Machine Learning Engineer and has been doing machine learning for over 5 years. His areas of interest are NLP, time series, and end-to-end ML.

Rafael Werneck is a Senior Prototyping Architect at AWS Envision Engineering, based in Brazil. Previously, he worked as a Software Development Engineer on Amazon.com.br and Amazon RDS Performance Insights.

Vikram Elango is an AI/ML Specialist Solutions Architect at Amazon Web Services, based in Virginia, USA. Vikram helps financial and insurance industry customers with design and thought leadership to build and deploy machine learning applications at scale. He is currently focused on natural language processing, responsible AI, inference optimization, and scaling ML across the enterprise. In his spare time, he enjoys traveling, hiking, cooking, and camping with his family.

Read More

Achieve enterprise-grade monitoring for your Amazon SageMaker models using Fiddler

This is a guest blog post by Danny Brock, Rajeev Govindan, and Krishnaram Kenthapadi at Fiddler AI.

Your Amazon SageMaker models are live. They’re handling millions of inferences each day and driving better business outcomes for your company. They’re performing exactly as well as the day they were launched.

Er, wait. Are they? Maybe. Maybe not.

Without enterprise-class model monitoring, your models may be decaying in silence. Your machine learning (ML) teams may never know that these models have actually morphed from miracles of revenue generation to liabilities making incorrect decisions that cost your company time and money.

Don’t fret. The solution is closer than you think.

Fiddler, an enterprise-class Model Performance Management solution available on the AWS Marketplace, offers model monitoring and explainable AI to help ML teams inspect and address a comprehensive range of model issues. Through model monitoring, model explainability, analytics, and bias detection, Fiddler provides your company with an easy-to-use single pane of glass to ensure your models are behaving as they should. And if they’re not, Fiddler also provides features that allow you to inspect your models to find the underlying root causes of performance decay.

This post shows how your MLOps team can improve data scientist productivity and reduce time to detect issues for your models deployed in SageMaker by integrating with the Fiddler Model Performance Management Platform in a few simple steps.

Solution overview

The following reference architecture highlights the primary points of integration. Fiddler exists as a “sidecar” to your existing SageMaker ML workflow.

The remainder of this post walks you through the steps to integrate your SageMaker model with Fiddler’s Model Performance Management Platform:

  1. Ensure your model has data capture enabled.
  2. Create a Fiddler trial environment.
  3. Register information about your model in your Fiddler environment.
  4. Create an AWS Lambda function to publish SageMaker inferences to Fiddler.
  5. Explore Fiddler’s monitoring capabilities in your Fiddler trial environment.

Prerequisites

This post assumes that you have set up SageMaker and deployed a model endpoint. To learn how to configure SageMaker for model serving, refer to Deploy Models for Inference. Some examples are also available on the GitHub repo.

Ensure your model has data capture enabled

On the SageMaker console, navigate to your model’s serving endpoint and ensure you have enabled data capture into an Amazon Simple Storage Service (Amazon S3) bucket. This stores the inferences (requests and responses) your model makes each day as JSON lines files (.jsonl) in Amazon S3.
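
If you deploy your endpoint with the SageMaker Python SDK, the following is a minimal sketch of enabling data capture at deployment time; it assumes an existing model object, and the destination S3 URI is an example:

from sagemaker.model_monitor import DataCaptureConfig

data_capture_config = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,  # capture all requests and responses
    destination_s3_uri="s3://my-data-capture-bucket/captured-data",  # example URI
)

# 'model' is an existing SageMaker Model (or estimator-created model) object
predictor = model.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.xlarge",
    data_capture_config=data_capture_config,
)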

Create a Fiddler trial environment

From the fiddler.ai website, you can request a free trial. After filling out a quick form, Fiddler will contact you to understand the specifics of your model performance management needs and will have a trial environment ready for you in a few hours. You can expect a dedicated environment like https://yourcompany.try.fiddler.ai.

Register information about your model in your Fiddler environment

Before you can begin publishing events from your SageMaker hosted model into Fiddler, you need to create a project within your Fiddler trial environment and provide Fiddler details about your model through a step called model registration. If you want to use a preconfigured notebook from within Amazon SageMaker Studio rather than copy and paste the following code snippets, you can reference the Fiddler quickstart notebook on GitHub. Studio provides a single web-based visual interface where you can perform all ML development steps.

First, you must install the Fiddler Python client in your SageMaker notebook and instantiate the Fiddler client. You can get the AUTH_TOKEN from the Settings page in your Fiddler trial environment.

# Install the fiddler client
!pip install fiddler-client

# Connect to the Fiddler Trial Environment
import fiddler as fdl
import pandas as pd

fdl.__version__

URL = 'https://yourcompany.try.fiddler.ai'
ORG_ID = 'yourcompany'
AUTH_TOKEN = 'UUID-Token-Here-Found-In-Your-Fiddler-Env-Settings-Page'

client = fdl.FiddlerApi(URL, ORG_ID, AUTH_TOKEN)

Next, create a project within your Fiddler trial environment:

# Create Project
PROJECT_ID = 'credit_default'  # update this with your project name
DATASET_ID = f'{PROJECT_ID}_dataset'
MODEL_ID = f'{PROJECT_ID}_model'

client.create_project(PROJECT_ID)

Now upload your training dataset. The notebook also provides a sample dataset, which is used to run Fiddler’s explainability algorithms and as a baseline for monitoring metrics. The dataset is also used to generate the schema for this model in Fiddler.

# Upload Baseline Dataset
df_baseline = pd.read_csv('<your-training-file.csv>')

dataset_info = fdl.DatasetInfo.from_dataframe(df_baseline, max_inferred_cardinality=1000)

upload_result = client.upload_dataset(PROJECT_ID,
                                      dataset={'baseline': df_baseline},
                                      dataset_id=DATASET_ID,
                                      info=dataset_info)

Lastly, before you can start publishing inferences to Fiddler for monitoring, root cause analysis, and explanations, you need to register your model. Let’s first create a model_info object that contains the metadata about your model:

# Update task from the list below if your model task is not binary classification
model_task = 'binary' 

if model_task == 'regression':
    model_task_fdl = fdl.ModelTask.REGRESSION
    
elif model_task == 'binary':
    model_task_fdl = fdl.ModelTask.BINARY_CLASSIFICATION

elif model_task == 'multiclass':
    model_task_fdl = fdl.ModelTask.MULTICLASS_CLASSIFICATION

elif model_task == 'ranking':
    model_task_fdl = fdl.ModelTask.RANKING

    
# Specify column types
target = 'TARGET'  # change this to your target column
outputs = ['prediction']  # change this to your model output column(s)
features = ['<add your feature list here>']
     
# Generate ModelInfo
model_info = fdl.ModelInfo.from_dataset_info(
    dataset_info=dataset_info,
    dataset_id=DATASET_ID,
    model_task=model_task_fdl,
    target=target,
    outputs=outputs,
    features=features,
    binary_classification_threshold=.125,  # update this if your task is not a binary classification
    description='<model-description>',
    display_name='<model-display-name>'
)
model_info

Then you can register the model using your new model_info object:

# Register Info about your model with Fiddler
client.register_model(
    project_id=PROJECT_ID,
    dataset_id=DATASET_ID,
    model_id=MODEL_ID,
    model_info=model_info
)

Great! Now you can publish some events to Fiddler in order to observe the model’s performance.

Create a Lambda function to publish SageMaker inferences to Fiddler

With the simple-to-deploy serverless architecture of Lambda, you can quickly build the mechanism required to move your inferences from the S3 bucket you set up earlier into your newly provisioned Fiddler trial environment. This Lambda function is responsible for opening any new JSONL event log files in your model’s S3 bucket, parsing and formatting the JSONL content into a dataframe, and then publishing that dataframe of events to your Fiddler trial environment. The following screenshot shows the code details of our function.
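
The following is a simplified sketch of that logic, assuming the function is triggered directly by an S3 event notification; the final publish step depends on the Fiddler Python client, so refer to Fiddler's example code for the exact call:

import json

import boto3
import pandas as pd

s3 = boto3.client("s3")

def handler(event, context):
    # Location of the newly created .jsonl capture file
    record = event["Records"][0]["s3"]
    bucket = record["bucket"]["name"]
    key = record["object"]["key"]

    body = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")
    captures = [json.loads(line) for line in body.splitlines() if line]

    # Flatten the SageMaker data capture records into rows of inputs and outputs
    rows = []
    for capture in captures:
        rows.append({
            "inputs": capture["captureData"]["endpointInput"]["data"],
            "prediction": capture["captureData"]["endpointOutput"]["data"],
        })

    df_events = pd.DataFrame(rows)
    # ...publish df_events to your Fiddler trial environment with the Fiddler client...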

The Lambda function needs to be configured to trigger on newly created files in your S3 bucket. The following tutorial guides you through creating an Amazon EventBridge trigger that invokes the Lambda function whenever a file is uploaded to Amazon S3. The following screenshot shows our function’s trigger configuration. This makes it simple to ensure that any time your model makes new inferences, those events stored in Amazon S3 are loaded into Fiddler to drive the model observability your company needs.

To simplify this further, the code for this Lambda function is publicly available from Fiddler’s documentation site. This code example currently works for binary classification models with structured inputs. If you have model types with different features or tasks, please contact Fiddler for assistance with minor changes to the code.

The Lambda function needs to make reference to the Fiddler Python client. Fiddler has created a publicly available Lambda layer that you can reference to ensure that the import fiddler as fdl step works seamlessly. You can reference this layer via an ARN in the us-west-2 Region: arn:aws:lambda:us-west-2:079310353266:layer:fiddler-client-0814:1, as shown in the following screenshot.

You also need to specify Lambda environment variables so the Lambda function knows how to connect to your Fiddler trial environment, and what the inputs and outputs are within the .jsonl files being captured by your model. The following screenshot shows a list of the required environment variables, which are also on Fiddler’s documentation site. Update the values for the environment variables to match your model and dataset.

Explore Fiddler’s monitoring capabilities in your Fiddler trial environment

You’ve done it! With your baseline data, model, and traffic connected, you can now explain data drift, outliers, model bias, data issues, and performance blips, and share dashboards with others. Complete your journey by watching a demo about the model performance management capabilities you have introduced to your company.

The example screenshots below provide a glimpse of model insights like drift, outlier detection, local point explanations, and model analytics that will be found in your Fiddler trial environment.

Conclusion

This post highlighted the need for enterprise-class model monitoring and showed how you can integrate your models deployed in SageMaker with the Fiddler Model Performance Management Platform in just a few steps. Fiddler offers functionality for model monitoring, explainable AI, bias detection, and root cause analysis, and is available on the AWS Marketplace. By providing your MLOps team with an easy-to-use single pane of glass to ensure your models are behaving as expected and to identify the underlying root causes of performance degradation, Fiddler can help improve data scientist productivity and reduce time to detect and resolve issues.

If you would like to learn more about Fiddler, please visit fiddler.ai, or if you would prefer to set up a personalized demo and technical discussion, email sales@fiddler.ai.


About the Authors

Danny Brock is a Sr Solutions Engineer at Fiddler AI. Danny is long tenured in the analytics and ML space, running presales and post-sales teams for startups like Endeca and Incorta. He founded his own big data analytics consulting company, Branchbird, in 2012.

Rajeev Govindan is a Sr Solutions Engineer at Fiddler AI. Rajeev has extensive experience in sales engineering and software development at several enterprise companies, including AppDynamics.

Krishnaram Kenthapadi is the Chief Scientist of Fiddler AI. Previously, he was a Principal Scientist at Amazon AWS AI, where he led the fairness, explainability, privacy, and model understanding initiatives in the Amazon AI platform, and prior to that, he held roles at LinkedIn AI and Microsoft Research. Krishnaram received his PhD in Computer Science from Stanford University in 2006.

Read More

Track your ML experiments end to end with Data Version Control and Amazon SageMaker Experiments

Data scientists often work towards understanding the effects of various data preprocessing and feature engineering strategies in combination with different model architectures and hyperparameters. Doing so requires you to cover large parameter spaces iteratively, and it can be overwhelming to keep track of previously run configurations and results while keeping experiments reproducible.

This post walks you through an example of how to track your experiments across code, data, artifacts, and metrics by using Amazon SageMaker Experiments in conjunction with Data Version Control (DVC). We show how you can use DVC side by side with Amazon SageMaker processing and training jobs. We train different CatBoost models on the California housing dataset from the StatLib repository, and change holdout strategies while keeping track of the data version with DVC. In each individual experiment, we track input and output artifacts, code, and metrics using SageMaker Experiments.

SageMaker Experiments

SageMaker Experiments is an AWS service for tracking machine learning (ML) experiments. The SageMaker Experiments Python SDK is a high-level interface to this service that helps you track experiment information using Python.

The goal of SageMaker Experiments is to make it as simple as possible to create experiments, populate them with trials, add tracking and lineage information, and run analytics across trials and experiments.

When discussing SageMaker Experiments, we refer to the following concepts:

  • Experiment – A collection of related trials. You add trials to an experiment that you want to compare together.
  • Trial – A description of a multi-step ML workflow. Each step in the workflow is described by a trial component.
  • Trial component – A description of a single step in an ML workflow, such as data cleaning, feature extraction, model training, or model evaluation.
  • Tracker – A Python context manager for logging information about a single trial component (for example, parameters, metrics, or artifacts).

Data Version Control

Data Version Control (DVC) is a new type of data versioning, workflow, and experiment management software that builds upon Git (although it can work standalone). DVC reduces the gap between established engineering toolsets and data science needs, allowing you to take advantage of new features while reusing existing skills and intuition.

Data science experiment sharing and collaboration can be done through a regular Git flow (commits, branching, tagging, pull requests) the same way it works for software engineers. With Git and DVC, data science and ML teams can version experiments, manage large datasets, and make projects reproducible.

DVC has the following features:

  • DVC is a free, open-source command line tool.
  • DVC works on top of Git repositories and has a similar command line interface and flow as Git. DVC can also work standalone, but without versioning capabilities.
  • Data versioning is enabled by replacing large files, dataset directories, ML models, and so on with small metafiles (easy to handle with Git). These placeholders point to the original data, which is decoupled from source code management.
  • You can use on-premises or cloud storage to store the project’s data separate from its code base. This is how data scientists can transfer large datasets or share a GPU-trained model with others.
  • DVC makes data science projects reproducible by creating lightweight pipelines using implicit dependency graphs, and by codifying the data and artifacts involved.
  • DVC is platform agnostic. It runs on all major operating systems (Linux, macOS, and Windows), and works independently of the programming languages (Python, R, Julia, shell scripts, and so on) or ML libraries (Keras, TensorFlow, PyTorch, Scipy, and more) used in the project.
  • DVC is quick to install and doesn’t require special infrastructure, nor does it depend on APIs or external services. It’s a standalone CLI tool.

SageMaker Experiments and DVC sample

The following GitHub sample shows how to use DVC within the SageMaker environment. In particular, we look at how to build a custom image with DVC libraries installed by default to provide a consistent development environment to your data scientists in Amazon SageMaker Studio, and how to run DVC alongside SageMaker managed infrastructure for processing and training. Furthermore, we show how to enrich SageMaker tracking information with data versioning information from DVC, and visualize them within the Studio console.

The following diagram illustrates the solution architecture and workflow.

Build a custom Studio image with DVC already installed

In this GitHub repository, we explain how to create a custom image for Studio that has DVC already installed. The advantage of creating an image and making it available to all Studio users is that it creates a consistent environment for the Studio users, which they could also run locally. Although the sample is based on AWS Cloud9, you can also build the container on your local machine as long as you have Docker installed and running. This sample is based on the following Dockerfile and environment.yml. The resulting Docker image is stored in Amazon Elastic Container Registry (Amazon ECR) in your AWS account. See the following code:

# Login to ECR
aws --region ${REGION} ecr get-login-password | docker login --username AWS --password-stdin ${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com/smstudio-custom

# Create the ECR repository
aws --region ${REGION} ecr create-repository --repository-name smstudio-custom

# Build the image - it might take a few minutes to complete this step
docker build . -t ${IMAGE_NAME} -t ${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com/smstudio-custom:${IMAGE_NAME}

# Push the image to ECR
docker push ${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com/smstudio-custom:${IMAGE_NAME}

You can now create a new Studio domain or update an existing Studio domain that has access to the newly created Docker image.

We use AWS Cloud Development Kit (AWS CDK) to create the following resources via AWS CloudFormation:

  • A SageMaker execution role with the right permissions to your new or existing Studio domain
  • A SageMaker image and SageMaker image version from the Docker image conda-env-dvc-kernel that we created earlier
  • An AppImageConfig that specifies how the kernel gateway should be configured
  • A Studio user (data-scientist-dvc) with the correct SageMaker execution role and the custom Studio image available to it

For detailed instructions, refer to Associate a custom image to SageMaker Studio.

Run the lab

To run the lab, complete the following steps:

  1. In the Studio domain, launch Studio for the data-scientist-dvc user.
  2. Choose the Git icon, then choose Clone a Repository.
  3. Enter the URL of the repository (https://github.com/aws-samples/amazon-sagemaker-experiments-dvc-demo) and choose Clone.
  4. In the file browser, choose the amazon-sagemaker-experiments-dvc-demo repository.
  5. Open the dvc_sagemaker_script_mode.ipynb notebook.
  6. For Custom Image, choose the image conda-env-dvc-kernel.
  7. Choose Select.

Configure DVC for data versioning

We create a subdirectory where we prepare the data: sagemaker-dvc-sample. Within this subdirectory, we initialize a new Git repository and set the remote to a repository we create in AWS CodeCommit. The goal is to have DVC configurations and files for data tracking versioned in this repository. However, Git offers native capabilities to manage subprojects via, for example, git submodules and git subtrees, and you can extend this sample to use any of the aforementioned tools that best fit your workflow.

The main advantage of using CodeCommit with SageMaker in our case is its integration with AWS Identity and Access Management (IAM) for authentication and authorization, meaning we can use IAM roles to push and pull data without the need to fetch credentials (or SSH keys). Setting the appropriate permissions on the SageMaker execution role also allows the Studio notebook and the SageMaker training and processing job to interact securely with CodeCommit.

Although you can replace CodeCommit with any other source control service, such as GitHub, GitLab, or Bitbucket, you need to consider how to handle the credentials for your system. One possibility is to store these credentials in AWS Secrets Manager and fetch them at run time from the Studio notebook as well as from the SageMaker processing and training jobs.

Init DVC

Process and train with DVC and SageMaker

In this section, we explore two different approaches to tackle our problem and how we can keep track of the two tests using SageMaker Experiments according to the high-level conceptual architecture we showed you earlier.

Set up a SageMaker experiment

To track this test in SageMaker, we need to create an experiment. We also need to define the trial within the experiment. For the sake of simplicity, we just consider one trial for the experiment, but you can have any number of trials within an experiment, for example, if you want to test different algorithms.

We create an experiment named DEMO-sagemaker-experiments-dvc with two trials, dvc-trial-single-file and dvc-trial-multi-files, each representing a different version of the dataset.

Let’s create the DEMO-sagemaker-experiments-dvc experiment:

from smexperiments.experiment import Experiment
from smexperiments.trial import Trial
from smexperiments.trial_component import TrialComponent
from smexperiments.tracker import Tracker

experiment_name = 'DEMO-sagemaker-experiments-dvc'

# create the experiment if it doesn't exist
try:
    my_experiment = Experiment.load(experiment_name=experiment_name)
    print("existing experiment loaded")
except Exception as ex:
    if "ResourceNotFound" in str(ex):
        my_experiment = Experiment.create(
            experiment_name = experiment_name,
            description = "How to integrate DVC"
        )
        print("new experiment created")
    else:
        print(f"Unexpected {ex}=, {type(ex)}")
        print("Dont go forward!")
        raise

Test 1: Generate single files for training and validation

In this section, we create a processing script that fetches the raw data directly from Amazon Simple Storage Service (Amazon S3) as input; processes it to create the train, validation, and test datasets; and stores the results back to Amazon S3 using DVC. Furthermore, we show how you can track output artifacts generated by DVC with SageMaker when running processing and training jobs and via SageMaker Experiments.

First, we create the dvc-trial-single-file trial and add it to the DEMO-sagemaker-experiments-dvc experiment. By doing so, we keep all trial components related to this test organized in a meaningful way.

first_trial_name = "dvc-trial-single-file"

try:
    my_first_trial = Trial.load(trial_name=first_trial_name)
    print("existing trial loaded")
except Exception as ex:
    if "ResourceNotFound" in str(ex):
        my_first_trial = Trial.create(
            experiment_name=experiment_name,
            trial_name=first_trial_name,
        )
        print("new trial created")
    else:
        print(f"Unexpected {ex}=, {type(ex)}")
        print("Dont go forward!")
        raise

Use DVC in a SageMaker processing job to create the single file version

In this section, we create a processing script that gets the raw data directly from Amazon S3 as input using the managed data loading capability of SageMaker; processes it to create the train, validation, and test datasets; and stores the results back to Amazon S3 using DVC. It’s important to understand that when using DVC to store data to Amazon S3 (or pull data from Amazon S3), we lose the SageMaker managed data loading capabilities, which can have an impact on the performance and costs of our processing and training jobs, especially when working with very large datasets. For more information on the different SageMaker native input mode capabilities, refer to Access Training Data.

Finally, we unify DVC tracking capabilities with SageMaker tracking capabilities when running processing jobs via SageMaker Experiments.

The processing script expects the address of the Git repository and the branch we want to create to store the DVC metadata passed via environmental variables. The datasets themselves are stored in Amazon S3 by DVC. Although environmental variables are automatically tracked in SageMaker Experiments and visible in the trial component parameters, we might want to enrich the trial components with further information, which then become available for visualization in the Studio UI using a tracker object. In our case, the trial components parameters include the following:

  • DVC_REPO_URL
  • DVC_BRANCH
  • USER
  • data_commit_hash
  • train_test_split_ratio

The preprocessing script clones the Git repository; generates the train, validation, and test datasets; and syncs them using DVC. As mentioned earlier, when using DVC, we can’t take advantage of native SageMaker data loading capabilities. Aside from the performance penalties we might suffer on large datasets, we also lose the automatic tracking capabilities for the output artifacts. However, thanks to the tracker and the DVC Python API, we can compensate for these shortcomings, retrieve such information at run time, and store it in the trial component with little effort. The added value of doing so is having a single view of the input and output artifacts that belong to this specific processing job.

The full preprocessing Python script is available in the GitHub repo.

with Tracker.load() as tracker:
    tracker.log_parameters({"data_commit_hash": commit_hash})
    for file_type in file_types:
        path = dvc.api.get_url(
            f"{data_path}/{file_type}/california_{file_type}.csv",
            repo=dvc_repo_url,
            rev=dvc_branch
        )
        tracker.log_output(name=f"california_{file_type}",value=path)

SageMaker gives us the possibility to run our processing script on container images managed by AWS that are optimized to run on the AWS infrastructure. If our script requires additional dependencies, we can supply a requirements.txt file. When we start the processing job, SageMaker uses pip to install all the libraries we need (for example, DVC-related libraries). If you need tighter control of all libraries installed on the containers, you can bring your own container in SageMaker, for example for processing and training.

We now have all the ingredients to run our SageMaker processing job:

  • A processing script that can process several arguments (--train-test-split-ratio) and two environmental variables (DVC_REPO_URL and DVC_BRANCH)
  • A requirements.txt file
  • A Git repository (in CodeCommit)
  • A SageMaker experiment and trial
from sagemaker.processing import FrameworkProcessor, ProcessingInput
from sagemaker.sklearn.estimator import SKLearn

dvc_repo_url = "codecommit::{}://sagemaker-dvc-sample".format(region)
dvc_branch = my_first_trial.trial_name

script_processor = FrameworkProcessor(
    estimator_cls=SKLearn,
    framework_version='0.23-1',
    instance_count=1,
    instance_type='ml.m5.xlarge',
    env={
        "DVC_REPO_URL": dvc_repo_url,
        "DVC_BRANCH": dvc_branch,
        "USER": "sagemaker"
    },
    role=role
)

experiment_config={
    "ExperimentName": my_experiment.experiment_name,
    "TrialName": my_first_trial.trial_name
}

We then run the processing job with the preprocessing-experiment.py script, experiment_config, dvc_repo_url, and dvc_branch we defined earlier.

%%time

script_processor.run(
    code='./source_dir/preprocessing-experiment.py',
    dependencies=['./source_dir/requirements.txt'],
    inputs=[ProcessingInput(source=s3_data_path, destination="/opt/ml/processing/input")],
    experiment_config=experiment_config,
    arguments=["--train-test-split-ratio", "0.2"]
)

The processing job takes approximately 5 minutes to complete. Now you can view the trial details for the single file dataset.

The following screenshot shows where you can find the stored information within Studio. Note the values for dvc-trial-single-file in DVC_BRANCH, DVC_REPO_URL, and data_commit_hash on the Parameters tab.

SageMaker Experiments parameters tab

Also note the input and output details on the Artifacts tab.

SageMaker Experiments artifacts tab

Create an estimator and fit the model with single file data version

To use DVC integration inside a SageMaker training job, we pass a dvc_repo_url and dvc_branch as environmental variables when creating the Estimator object.

We train on the dvc-trial-single-file branch first.

When pulling data with DVC, we use the following dataset structure:

dataset
    |-- train
    |   |-- california_train.csv
    |-- test
    |   |-- california_test.csv
    |-- validation
    |   |-- california_validation.csv

Now we create a Scikit-learn Estimator using the SageMaker Python SDK. This allows us to specify the following:

  • The path to the Python source file, which should be run as the entry point to training.
  • The IAM role that controls permissions for accessing Amazon S3 and CodeCommit data and running SageMaker functions.
  • A list of dictionaries that define the metrics used to evaluate the training jobs.
  • The number and type of training instances. We use one ml.m5.large instance.
  • Hyperparameters that are used for training.
  • Environment variables to use during the training job. We use DVC_REPO_URL, DVC_BRANCH, and USER.
metric_definitions = [{'Name': 'median-AE', 'Regex': "AE-at-50th-percentile: ([0-9.]+).*$"}]

hyperparameters={ 
        "learning_rate" : 1,
        "depth": 6
    }
estimator = SKLearn(
    entry_point='train.py',
    source_dir='source_dir',
    role=role,
    metric_definitions=metric_definitions,
    hyperparameters=hyperparameters,
    instance_count=1,
    instance_type='ml.m5.large',
    framework_version='0.23-1',
    base_job_name='training-with-dvc-data',
    environment={
        "DVC_REPO_URL": dvc_repo_url,
        "DVC_BRANCH": dvc_branch,
        "USER": "sagemaker"
    }
)

experiment_config={
    "ExperimentName": my_experiment.experiment_name,
    "TrialName": my_first_trial.trial_name
}

We call the fit method of the Estimator with the experiment_config we defined earlier to start the training.

%%time
estimator.fit(experiment_config=experiment_config)

The training job takes approximately 5 minutes to complete. The logs show the following lines, indicating the files pulled by DVC:

Running dvc pull command
A       train/california_train.csv
A       test/california_test.csv
A       validation/california_validation.csv
3 files added and 3 files fetched
Starting the training.
Found train files: ['/opt/ml/input/data/dataset/train/california_train.csv']
Found validation files: ['/opt/ml/input/data/dataset/train/california_train.csv']

Test 2: Generate multiple files for training and validation

We create a new dvc-trial-multi-files trial and add it to the current DEMO-sagemaker-experiments-dvc experiment.

second_trial_name = "dvc-trial-multi-files"
try:
    my_second_trial = Trial.load(trial_name=second_trial_name)
    print("existing trial loaded")
except Exception as ex:
    if "ResourceNotFound" in str(ex):
        my_second_trial = Trial.create(
            experiment_name=experiment_name,
            trial_name=second_trial_name,
        )
        print("new trial created")
    else:
        print(f"Unexpected {ex}=, {type(ex)}")
        print("Dont go forward!")
        raise

Unlike the first processing script, we now create multiple files for training and validation out of the original dataset and store the DVC metadata in a different branch.

You can explore the second preprocessing Python script on GitHub.

%%time

script_processor.run(
    code='./source_dir/preprocessing-experiment-multifiles.py',
    dependencies=['./source_dir/requirements.txt'],
    inputs=[ProcessingInput(source=s3_data_path, destination="/opt/ml/processing/input")],
    experiment_config=experiment_config,
    arguments=["--train-test-split-ratio", "0.1"]
)

The processing job takes approximately 5 minutes to complete. Now you can view the trial details for the multi-file dataset.

The following screenshots show where you can find the stored information within SageMaker Experiments in the Trial components section within the Studio UI. Note the values for dvc-trial-multi-files in DVC_BRANCH, DVC_REPO_URL, and data_commit_hash on the Parameters tab.

SageMaker multi files experiments parameters tab

You can also review the input and output details on the Artifacts tab.

SageMaker multi files experiments artifacts tab

We now train on the dvc-trial-multi-files branch. When pulling data with DVC, we use the following dataset structure:

dataset
    |-- train
    |   |-- california_train_1.csv
    |   |-- california_train_2.csv
    |   |-- california_train_3.csv
    |   |-- california_train_4.csv
    |   |-- california_train_5.csv
    |-- test
    |   |-- california_test.csv
    |-- validation
    |   |-- california_validation_1.csv
    |   |-- california_validation_2.csv
    |   |-- california_validation_3.csv

Similar to what we did before, we create a new Scikit-learn Estimator with the trial name dvc-trial-multi-files and start the training job.

%%time

estimator.fit(experiment_config=experiment_config)

The training job takes approximately 5 minutes to complete. In the training job logs output to the notebook, you can see the following lines, indicating the files pulled by DVC:

Running dvc pull command
A       validation/california_validation_2.csv
A       validation/california_validation_1.csv
A       validation/california_validation_3.csv
A       train/california_train_4.csv
A       train/california_train_5.csv
A       train/california_train_2.csv
A       train/california_train_3.csv
A       train/california_train_1.csv
A       test/california_test.csv
9 files added and 9 files fetched
Starting the training.
Found train files: ['/opt/ml/input/data/dataset/train/california_train_2.csv', '/opt/ml/input/data/dataset/train/california_train_5.csv', '/opt/ml/input/data/dataset/train/california_train_4.csv', '/opt/ml/input/data/dataset/train/california_train_1.csv', '/opt/ml/input/data/dataset/train/california_train_3.csv']
Found validation files: ['/opt/ml/input/data/dataset/validation/california_validation_2.csv', '/opt/ml/input/data/dataset/validation/california_validation_1.csv', '/opt/ml/input/data/dataset/validation/california_validation_3.csv']

Host your model in SageMaker

After you train your ML model, you can deploy it using SageMaker. To deploy a persistent, real-time endpoint that makes one prediction at a time, we use SageMaker real-time hosting services.

from sagemaker.serializers import CSVSerializer

predictor = estimator.deploy(1, "ml.t2.medium", serializer=CSVSerializer())

First, we get the latest test dataset locally on the development notebook in Studio. For this purpose, we can use dvc.api.read() to load the raw data that was stored in Amazon S3 by the SageMaker processing job.

import io
import dvc.api

raw = dvc.api.read(
    "dataset/test/california_test.csv",
    repo=dvc_repo_url,
    rev=dvc_branch
)

Then we prepare the data using Pandas, load the test CSV file, and call predictor.predict to invoke the SageMaker endpoint we created earlier with the test data and get predictions.

test = pd.read_csv(io.StringIO(raw), sep=",", header=None)
X_test = test.iloc[:, 1:].values
y_test = test.iloc[:, 0:1].values

predicted = predictor.predict(X_test)
for i in range(len(predicted)):
    print(f"predicted: {predicted[i]}, actual: {y_test[i][0]}")

Delete the endpoint

You should delete endpoints when they’re no longer in use, because they’re billed by the time deployed (for more information, see Amazon SageMaker Pricing). Make sure to delete the endpoint to avoid unexpected costs.

predictor.delete_endpoint()

Clean up

Before you remove all the resources you created, make sure that all apps are deleted from the data-scientist-dvc user, including all KernelGateway apps, as well as the default JupyterServer app.

Then you can destroy the AWS CDK stack by running the following command:

cdk destroy

If you used an existing domain, also run the following commands:

# inject your DOMAIN_ID into the configuration file
sed -i 's/<your-sagemaker-studio-domain-id>/'"$DOMAIN_ID"'/' ../update-domain-no-custom-images.json
# update the sagemaker studio domain
aws --region ${REGION} sagemaker update-domain --cli-input-json file://../update-domain-no-custom-images.json

Conclusion

In this post, you walked through an example of how to track your experiments across code, data, artifacts, and metrics by using SageMaker Experiments and SageMaker processing and training jobs in conjunction with DVC. We created a Docker image containing DVC, which Studio required for the development notebooks, and showed how you can use processing and training jobs with DVC. We prepared two versions of the data and used DVC to manage them with Git. Then you used SageMaker Experiments to track the processing and training with the two versions of the data in order to have a unified view of parameters, artifacts, and metrics in a single pane of glass. Finally, you deployed the model to a SageMaker endpoint and used a testing dataset from the second dataset version to invoke the SageMaker endpoint and get predictions.

As next step, you can extend the existing notebook and introduce your own feature engineering strategy and use DVC and SageMaker to run your experiments. Let’s go build!


About the Authors

Paolo Di Francesco is a Solutions Architect at AWS. He has experience in telecommunications and software engineering. He is passionate about machine learning and is currently focusing on using his experience to help customers reach their goals on AWS, in particular in discussions around MLOps. Outside of work, he enjoys playing football and reading.

Eitan Sela is a Machine Learning Specialist Solutions Architect with Amazon Web Services. He works with AWS customers to provide guidance and technical assistance, helping them build and operate machine learning solutions on AWS. In his spare time, Eitan enjoys jogging and reading the latest machine learning articles.

Read More

Build a predictive maintenance solution with Amazon Kinesis, AWS Glue, and Amazon SageMaker

Organizations are increasingly building and using machine learning (ML)-powered solutions for a variety of use cases and problems, including predictive maintenance of machine parts, product recommendations based on customer preferences, credit profiling, content moderation, fraud detection, and more. In many of these scenarios, the effectiveness and benefits derived from these ML-powered solutions can be further enhanced when they can process and derive insights from data events in near-real time.

Although the business value and benefits of near-real-time ML-powered solutions are well established, the architecture required to implement these solutions at scale with optimum reliability and performance is complicated. This post describes how you can combine Amazon Kinesis, AWS Glue, and Amazon SageMaker to build a near-real-time feature engineering and inference solution for predictive maintenance.

Use case overview

We focus on a predictive maintenance use case where sensors deployed in the field (such as industrial equipment or network devices) need to be replaced or rectified before they become faulty and cause downtime. Downtime can be expensive for businesses and can lead to poor customer experience. Predictive maintenance powered by an ML model can also help in augmenting the regular schedule-based maintenance cycles by informing when a machine part in good condition should not be replaced, therefore avoiding unnecessary cost.

In this post, we focus on applying machine learning to a synthetic dataset containing machine failures due to features such as air temperature, process temperature, rotation speed, torque, and tool wear. The dataset used is sourced from the UCI Data Repository.

Machine failure consists of five independent failure modes:

  • Tool Wear Failure (TWF)
  • Heat Dissipation Failure (HDF)
  • Power Failure (PWF)
  • Over-strain Failure (OSF)
  • Random Failure (RNF)

The machine failure label indicates whether the machine has failed for a particular data point: if at least one of the preceding failure modes is true, the process fails and the machine failure label is set to 1. The objective for the ML model is to identify machine failures correctly, so a downstream predictive maintenance action can be initiated.
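
The following is a minimal sketch of this labeling logic, assuming a pandas DataFrame with the dataset's TWF, HDF, PWF, OSF, and RNF columns:

import pandas as pd

def add_machine_failure_label(df: pd.DataFrame) -> pd.DataFrame:
    # The machine fails when at least one of the five failure modes is true
    failure_modes = ["TWF", "HDF", "PWF", "OSF", "RNF"]
    df["machine_failure"] = (df[failure_modes].sum(axis=1) > 0).astype(int)
    return df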

Solution overview

For our predictive maintenance use case, we assume that device sensors stream various measurements and readings about machine parts. Our solution then takes a slice of streaming data each time (micro-batch), and performs processing and feature engineering to create features. The created features are then used to generate inferences from a trained and deployed ML model in near-real time. The generated inferences can be further processed and consumed by downstream applications, to take appropriate actions and initiate maintenance activity.

The following diagram shows the architecture of our overall solution.

The solution broadly consists of the following sections, which are explained in detail later in this post:

  • Streaming data source and ingestion – We use Amazon Kinesis Data Streams to collect streaming data from the field sensors at scale and make it available for further processing.
  • Near-real-time feature engineering – We use AWS Glue streaming jobs to read data from a Kinesis data stream and perform data processing and feature engineering, before storing the derived features in Amazon Simple Storage Service (Amazon S3). Amazon S3 provides a reliable and cost-effective option to store large volumes of data.
  • Model training and deployment – We use the AI4I predictive maintenance dataset from the UCI Data Repository to train an ML model based on the XGBoost algorithm using SageMaker. We then deploy the trained model to a SageMaker asynchronous inference endpoint.
  • Near-real-time ML inference – After the features are available in Amazon S3, we need to generate inferences from the deployed model in near-real time. SageMaker asynchronous inference endpoints are well suited for this requirement because they support larger payload sizes (up to 1 GB) and can generate inferences within minutes (up to a maximum of 15 minutes). We use S3 event notifications to run an AWS Lambda function to invoke a SageMaker asynchronous inference endpoint. SageMaker asynchronous inference endpoints accept S3 locations as input, generate inferences from the deployed model, and write these inferences back to Amazon S3 in near-real time. A simplified invocation sketch follows this list.
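
The following is a minimal sketch of that Lambda function, assuming it is triggered by an S3 event notification for the feature file; the environment variable name is illustrative:

import os

import boto3

sagemaker_runtime = boto3.client("sagemaker-runtime")

def handler(event, context):
    record = event["Records"][0]["s3"]
    input_location = f"s3://{record['bucket']['name']}/{record['object']['key']}"

    # Invoke the asynchronous endpoint with the S3 location of the features
    response = sagemaker_runtime.invoke_endpoint_async(
        EndpointName=os.environ["ASYNC_ENDPOINT_NAME"],
        InputLocation=input_location,
        ContentType="text/csv",
    )
    # SageMaker writes the inference result to the returned S3 output location
    return {"output_location": response["OutputLocation"]}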

The source code for this solution is located on GitHub. The solution has been tested and should be run in us-east-1.

We use an AWS CloudFormation template, deployed using AWS Serverless Application Model (AWS SAM), and SageMaker notebooks to deploy the solution.

Prerequisites

To get started, as a prerequisite, you must have the SAM CLI, Python 3, and PIP installed. You must also have the AWS Command Line Interface (AWS CLI) configured properly.

Deploy the solution

You can use AWS CloudShell to run these steps. CloudShell is a browser-based shell that is pre-authenticated with your console credentials and includes pre-installed common development and operations tools (such as AWS SAM, AWS CLI, and Python). Therefore, no local installation or configuration is required.

  • We begin by creating an S3 bucket where we store the script for our AWS Glue streaming job. Run the following command in your terminal to create a new bucket:
aws s3api create-bucket --bucket sample-script-bucket-$RANDOM --region us-east-1
  • Note down the name of the bucket created.


  • Next, we clone the code repository locally, which contains the CloudFormation template to deploy the stack. Run the following command in your terminal:
git clone https://github.com/aws-samples/amazon-sagemaker-predictive-maintenance
  • Navigate to the sam-template directory:
cd amazon-sagemaker-predictive-maintenance/sam-template


  • Run the following command to copy the AWS Glue job script (from glue_streaming/app.py) to the S3 bucket you created, replacing <your-script-bucket> with the bucket name you noted earlier:
aws s3 cp glue_streaming/app.py s3://<your-script-bucket>/glue_streaming/app.py


  • You can now build and deploy the solution through the CloudFormation template via AWS SAM. Run the following commands:
sam build


sam deploy --guided
  • Provide arguments for the deployment such as the stack name, preferred AWS Region (us-east-1), and GlueScriptsBucket.

Make sure you provide the same S3 bucket that you created earlier for the AWS Glue script S3 bucket (parameter GlueScriptsBucket in the following screenshot).


After you provide the required arguments, AWS SAM starts the stack deployment. The following screenshot shows the resources created.


After the stack is deployed successfully, you should see the following message.


  • On the AWS CloudFormation console, open the stack with the name you provided when deploying the CloudFormation template (for this post, nrt-streaming-inference).
  • On the Resources tab, note the SageMaker notebook instance ID.
  • On the SageMaker console, open this instance.


The SageMaker notebook instance already has the required notebooks pre-loaded.

Navigate to the notebooks folder, then open the notebooks (Data_Pre-Processing.ipynb and ModelTraining-Evaluation-and-Deployment.ipynb) and follow the instructions in them to explore the dataset, perform preprocessing and feature engineering, and train and deploy the model to a SageMaker asynchronous inference endpoint.


Streaming data source and ingestion

Kinesis Data Streams is a serverless, scalable, and durable real-time data streaming service that you can use to collect and process large streams of data records in real time. Kinesis Data Streams enables capturing, processing, and storing data streams from a variety of sources, such as IT infrastructure log data, application logs, social media, market data feeds, web clickstream data, IoT devices and sensors, and more. You can provision a Kinesis data stream in on-demand mode or provisioned mode depending on the throughput and scaling requirements. For more information, see Choosing the Data Stream Capacity Mode.
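
For example, the data stream used in this post could be created in on-demand mode with a short boto3 snippet like the following. This is a sketch for illustration only; the solution’s CloudFormation template is expected to create the sensor-data-stream stream for you.

import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")

# Create the data stream in on-demand mode, so capacity scales with the sensor traffic
# (no shard planning required); illustrative only if the stream already exists
kinesis.create_stream(
    StreamName="sensor-data-stream",
    StreamModeDetails={"StreamMode": "ON_DEMAND"},
)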

For our use case, we assume that various sensors are sending measurements such as temperature, rotation speed, torque, and tool wear to a data stream. Kinesis Data Streams acts as a funnel to collect and ingest data streams.

We use the Amazon Kinesis Data Generator (KDG) later in this post to generate and send data to a Kinesis data stream, simulating data being generated by sensors. The data from the data stream sensor-data-stream is ingested and processed using an AWS Glue streaming job, which we discuss next.
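
Outside the KDG, a producer could also push individual readings to the stream with the Kinesis PutRecord API. The following boto3 sketch illustrates the idea; the field names mirror the KDG record template shown later in this post, and the partition key choice is an assumption.

import json
import random

import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")

# A single simulated sensor reading, shaped like the KDG record template used later
reading = {
    "air_temperature": round(random.uniform(295, 305), 2),
    "process_temperature": round(random.uniform(305, 315), 2),
    "rotational_speed": random.randint(1150, 2900),
    "torque": round(random.uniform(3, 80), 2),
    "tool_wear": random.randint(0, 250),
    "type": random.choice(["L", "M", "H"]),
}

kinesis.put_record(
    StreamName="sensor-data-stream",
    Data=json.dumps(reading).encode("utf-8"),
    PartitionKey=str(reading["rotational_speed"]),  # any reasonably distributed key works
)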

Near-real-time feature engineering

AWS Glue streaming jobs provide a convenient way to process streaming data at scale, without the need to manage the compute environment. AWS Glue allows you to perform extract, transform, and load (ETL) operations on streaming data using continuously running jobs. AWS Glue streaming ETL is built on the Apache Spark Structured Streaming engine, and can ingest streams from Kinesis, Apache Kafka, and Amazon Managed Streaming for Apache Kafka (Amazon MSK).

The streaming ETL job can use both AWS Glue built-in transforms and transforms that are native to Apache Spark Structured Streaming. You can also use the Spark ML and MLlib libraries in AWS Glue jobs for easier feature processing using readily available helper libraries.

If the schema of the streaming data source is pre-determined, you can specify it in an AWS Glue Data Catalog table. If the schema definition can’t be determined beforehand, you can enable schema detection in the streaming ETL job. The job then automatically determines the schema from the incoming data. Additionally, you can use the AWS Glue Schema Registry to allow central discovery, control, and evolution of data stream schemas. You can further integrate the Schema Registry with the Data Catalog to optionally use schemas stored in the Schema Registry when creating or updating AWS Glue tables or partitions in the Data Catalog.

For this post, we create an AWS Glue Data Catalog table (sensor-stream) with our Kinesis data stream as the source and define the schema for our sensor data.

We create a streaming data frame from the Data Catalog table to read the streaming data from Kinesis. We also specify the following options:

  • A window size of 60 seconds, so that the AWS Glue job reads and processes data in 60-second windows
  • The starting position TRIM_HORIZON, to allow reading from the oldest records in the Kinesis data stream

We also use Spark MLlib’s StringIndexer feature transformer to encode the string column type into label indexes. This transformation is implemented using Spark ML Pipelines. Spark ML Pipelines provide a uniform set of high-level APIs for ML algorithms to make it easier to combine multiple algorithms into a single pipeline or workflow.

We use the foreachBatch API to invoke a function named processBatch, which in turn processes the data referenced by this dataframe. See the following code:

# Read the streaming data from the Kinesis data stream using the Data Catalog table
sourceStreamData = glueContext.create_data_frame.from_catalog(
    database="sensordb",
    table_name="sensor-stream",
    transformation_ctx="sourceStreamData",
    additional_options={"startingPosition": "TRIM_HORIZON"},
)

# Encode the string column "type" into a numeric label index with Spark ML
type_indexer = StringIndexer(inputCol="type", outputCol="type_enc", stringOrderType="alphabetAsc")
pipeline = Pipeline(stages=[type_indexer])

# Process the stream in 60-second micro-batches with the processBatch function
glueContext.forEachBatch(
    frame=sourceStreamData,
    batch_function=processBatch,
    options={"windowSize": "60 seconds", "checkpointLocation": checkpoint_location},
)

The function processBatch performs the specified transformations and partitions the data in Amazon S3 based on year, month, day, hour, minute, and batch ID.

We also re-partition the data into a single Spark partition before writing, to avoid having too many small files in Amazon S3. Having many small files can impede read performance, because it amplifies the overhead of seeking, opening, and reading each file. Finally, we write the features used to generate inferences to a prefix (features) within the S3 bucket. See the following code:

# Called for every micro-batch of streaming data read from Kinesis.
# Performs processing and feature engineering, then writes the features to Amazon S3.
def processBatch(data_frame, batchId):
    now = datetime.datetime.now()
    year = now.year
    month = now.month
    day = now.day
    hour = now.hour
    minute = now.minute
    if data_frame.count() > 0:
        # Encode the categorical "type" column using the Spark ML pipeline
        # (fit only when the micro-batch actually contains records)
        transformer = pipeline.fit(data_frame)
        data_frame = transformer.transform(data_frame)
        data_frame = data_frame.drop("type")
        data_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
        data_frame.printSchema()
        # Write the output features to S3, partitioned by year/month/day/hour/minute/batch ID
        s3prefix = (
            "features"
            + "/year=" + "{:0>4}".format(str(year))
            + "/month=" + "{:0>2}".format(str(month))
            + "/day=" + "{:0>2}".format(str(day))
            + "/hour=" + "{:0>2}".format(str(hour))
            + "/min=" + "{:0>2}".format(str(minute))
            + "/batchid=" + str(batchId)
        )
        s3path = "s3://" + out_bucket_name + "/" + s3prefix + "/"
        print("-------write start time------------")
        print(str(datetime.datetime.now()))
        data_frame = data_frame.toDF().repartition(1)
        data_frame.write.mode("overwrite").option("header", False).csv(s3path)
        print("-------write end time------------")
        print(str(datetime.datetime.now()))

Model training and deployment

SageMaker is a fully managed and integrated ML service that enables data scientists and ML engineers to quickly and easily build, train, and deploy ML models.

Within the Data_Pre-Processing.ipynb notebook, we first import the AI4I Predictive Maintenance dataset from the UCI Data Repository and perform exploratory data analysis (EDA). We also perform feature engineering to make our features more useful for training the model.

For example, within the dataset, we have a feature named type, which represents the product’s quality type as L (low), M (medium), or H (high). Because this is a categorical feature, we need to encode it before training our model. We use Scikit-Learn’s LabelEncoder to achieve this:

from sklearn.preprocessing import LabelEncoder
type_encoder = LabelEncoder()
type_encoder.fit(origdf['type'])
type_values = type_encoder.transform(origdf['type'])
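
To confirm how the categories map to integers, you can inspect the fitted encoder with a quick check like the following (LabelEncoder assigns indexes to the sorted class labels, so this is expected to print {'H': 0, 'L': 1, 'M': 2}):

# Build a readable mapping from category label to encoded value
mapping = {label: int(code) for code, label in enumerate(type_encoder.classes_)}
print(mapping)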

After the features are processed and the curated train and test datasets are generated, we’re ready to train an ML model to predict whether the machine has failed based on system readings. We train an XGBoost model using the SageMaker built-in algorithm. XGBoost can provide good results for multiple types of ML problems, including classification, even when training samples are limited.

SageMaker training jobs provide a powerful and flexible way to train ML models on SageMaker. SageMaker manages the underlying compute infrastructure and offers multiple options to choose from for diverse model training requirements, based on the use case.
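
The container, role, and session variables used by the estimator in the next snippet aren’t shown in full in this post; they could be set up roughly as follows. This is a sketch: the XGBoost container version and the model output path are assumptions you should adapt to your environment.

import sagemaker
from sagemaker import image_uris

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()  # IAM role attached to the notebook instance

# Retrieve the URI of a SageMaker built-in XGBoost container (version is an assumption)
container = image_uris.retrieve("xgboost", sagemaker_session.boto_region_name, version="1.5-1")

# S3 location for the model artifacts (bucket and prefix are assumptions)
xgb_upload_location = f"s3://{sagemaker_session.default_bucket()}/predictive-maintenance/model"

With these variables defined, we configure the estimator and launch the training job: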

xgb = sagemaker.estimator.Estimator(
    container,
    role,
    instance_count=1,
    instance_type='ml.c4.4xlarge',
    output_path=xgb_upload_location,
    sagemaker_session=sagemaker_session,
)
xgb.set_hyperparameters(
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.8,
    silent=0,
    objective='binary:hinge',
    num_round=100,
)

xgb.fit({'train': s3_train_channel, 'validation': s3_valid_channel})

When the model training is complete and the model evaluation is satisfactory based on the business requirements, we can begin model deployment. We first create an endpoint configuration that includes the AsyncInferenceConfig object and references the model trained earlier:

endpoint_config_name = resource_name.format("EndpointConfig")
create_endpoint_config_response = sm_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[
        {
            "VariantName": "variant1",
            "ModelName": model_name,
            "InstanceType": "ml.m5.xlarge",
            "InitialInstanceCount": 1,
        }
    ],
    AsyncInferenceConfig={
        "OutputConfig": {
            "S3OutputPath": f"s3://{bucket}/{prefix}/output",
            # Specify Amazon SNS topics for success and error notifications
            "NotificationConfig": {
                "SuccessTopic": "arn:aws:sns:<region>:<account-id>:<success-sns-topic>",
                "ErrorTopic": "arn:aws:sns:<region>:<account-id>:<error-sns-topic>",
            },
        },
        "ClientConfig": {"MaxConcurrentInvocationsPerInstance": 4},
    },
)

We then create a SageMaker asynchronous inference endpoint, using the endpoint configuration we created. After it’s provisioned, we can start invoking the endpoint to generate inferences asynchronously.

endpoint_name = resource_name.format("Endpoint")
create_endpoint_response = sm_client.create_endpoint(
EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name)

Near-real-time inference

SageMaker asynchronous inference endpoints provide the ability to queue incoming inference requests and process them asynchronously in near-real time. This is ideal for applications that have inference requests with larger payload sizes (up to 1 GB), may require longer processing times (up to 15 minutes), and have near-real-time latency requirements. Asynchronous inference also enables you to save on costs by auto scaling the instance count to zero when there are no requests to process, so you only pay when your endpoint is processing requests.
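
If you want to take advantage of this scale-to-zero behavior, one possible approach is to register the endpoint variant with Application Auto Scaling and set its minimum capacity to 0, as in the following sketch. The capacity limits, target value, and cooldowns are assumptions to tune for your workload.

import boto3

autoscaling = boto3.client("application-autoscaling")
resource_id = f"endpoint/{endpoint_name}/variant/variant1"  # variant name from the endpoint config

# Allow the asynchronous endpoint variant to scale between 0 and 2 instances
autoscaling.register_scalable_target(
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    MinCapacity=0,
    MaxCapacity=2,
)

# Scale based on the backlog of queued requests per instance
autoscaling.put_scaling_policy(
    PolicyName="async-backlog-scaling",
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    PolicyType="TargetTrackingScaling",
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue": 5.0,
        "CustomizedMetricSpecification": {
            "MetricName": "ApproximateBacklogSizePerInstance",
            "Namespace": "AWS/SageMaker",
            "Dimensions": [{"Name": "EndpointName", "Value": endpoint_name}],
            "Statistic": "Average",
        },
        "ScaleInCooldown": 300,
        "ScaleOutCooldown": 60,
    },
)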

You can create a SageMaker asynchronous inference endpoint similarly to how you create a real-time inference endpoint, additionally specifying the AsyncInferenceConfig object when creating your endpoint configuration with the CreateEndpointConfig API. The following diagram shows the inference workflow and how an asynchronous inference endpoint generates an inference.


To invoke the asynchronous inference endpoint, the request payload should be stored in Amazon S3, and a reference to this payload needs to be provided as part of the InvokeEndpointAsync request. Upon invocation, SageMaker queues the request for processing and returns an identifier and output location as a response. When processing is complete, SageMaker places the result in the Amazon S3 location. You can optionally choose to receive success or error notifications with Amazon Simple Notification Service (Amazon SNS).
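
As an illustration of the request and response shape, a direct invocation with boto3 could look like the following sketch. The input key is a placeholder; in this solution, the invocation is performed by the Lambda function described later.

import boto3

sm_runtime = boto3.client("sagemaker-runtime")

# The request payload (a CSV of features) must already exist in Amazon S3
input_location = "s3://EXAMPLE-BUCKET/features/sample-features.csv"  # placeholder

response = sm_runtime.invoke_endpoint_async(
    EndpointName=endpoint_name,
    InputLocation=input_location,
    ContentType="text/csv",
)

# SageMaker returns immediately with an inference ID and the S3 location
# where the result will be written when processing completes
print(response["InferenceId"], response["OutputLocation"])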

Test the end-to-end solution

To test the solution, complete the following steps:

  • On the AWS CloudFormation console, open the stack you created earlier (nrt-streaming-inference).
  • On the Outputs tab, copy the name of the S3 bucket (EventsBucket).

This is the S3 bucket to which our AWS Glue streaming job writes features after reading and processing from the Kinesis data stream.


Next, we set up event notifications for this S3 bucket.

  • On the Amazon S3 console, navigate to the bucket EventsBucket.
  • On the Properties tab, in the Event notifications section, choose Create event notification.



  • For Event name, enter invoke-endpoint-lambda.
  • For Prefix, enter features/.
  • For Suffix, enter .csv.
  • For Event types, select All object create events.


  • For Destination, select Lambda function.
  • For Lambda function, choose the function invoke-endpoint-asynch.
  • Choose Save changes.


  • On the AWS Glue console, open the job GlueStreaming-Kinesis-S3.
  • Choose Run job.


Next we use the Kinesis Data Generator (KDG) to simulate sensors sending data to our Kinesis data stream. If this is your first time using the KDG, refer to Overview for the initial setup. The KDG provides a CloudFormation template to create the user and assign just enough permissions to use the KDG for sending events to Kinesis. Run the CloudFormation template within the AWS account that you’re using to build the solution in this post. After the KDG is set up, log in and access the KDG to send test events to our Kinesis data stream.

  • Use the Region in which you created the Kinesis data stream (us-east-1).
  • On the drop-down menu, choose the data stream sensor-data-stream.
  • In the Records per second section, select Constant and enter 100.
  • Unselect Compress Records.
  • For Record template, use the following template:
{
"air_temperature": {{random.number({"min":295,"max":305, "precision":0.01})}},
"process_temperature": {{random.number({"min":305,"max":315, "precision":0.01})}},
"rotational_speed": {{random.number({"min":1150,"max":2900})}},
"torque": {{random.number({"min":3,"max":80, "precision":0.01})}},
"tool_wear": {{random.number({"min":0,"max":250})}},
"type": "{{random.arrayElement(["L","M","H"])}}"
}
  • Choose Send data to start sending data to the Kinesis data stream.


The AWS Glue streaming job reads and extracts a micro-batch of data (representing sensor readings) from the Kinesis data stream based on the window size provided. The streaming job then processes and performs feature engineering on this micro-batch before partitioning and writing it to the prefix features within the S3 bucket.

As new features created by the AWS Glue streaming job are written to the S3 bucket, a Lambda function (invoke-endpoint-asynch) is triggered, which invokes a SageMaker asynchronous inference endpoint by sending an invocation request to get inferences from our deployed ML model. The asynchronous inference endpoint queues the request for asynchronous invocation. When the processing is complete, SageMaker stores the inference results in the Amazon S3 location (S3OutputPath) that was specified during the asynchronous inference endpoint configuration.
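
A minimal handler for such a Lambda function could look like the following sketch. This isn’t the deployed invoke-endpoint-asynch code, just an illustration of the pattern; the endpoint name is read from an assumed environment variable.

import os
import urllib.parse

import boto3

sm_runtime = boto3.client("sagemaker-runtime")

def lambda_handler(event, context):
    # The S3 event notification carries the bucket and key of the newly written feature file
    record = event["Records"][0]
    bucket = record["s3"]["bucket"]["name"]
    key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])

    # Ask the asynchronous endpoint to generate inferences for this feature file
    response = sm_runtime.invoke_endpoint_async(
        EndpointName=os.environ["ENDPOINT_NAME"],  # assumed environment variable
        InputLocation=f"s3://{bucket}/{key}",
        ContentType="text/csv",
    )
    return {"OutputLocation": response["OutputLocation"]}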

For our use case, the inference results indicate whether a machine part is likely to fail, based on the sensor readings.


SageMaker also sends a success or error notification with Amazon SNS. For example, if you set up an email subscription for the success and error SNS topics (specified within the asynchronous SageMaker inference endpoint configuration), an email can be sent every time an inference request is processed. The following screenshot shows a sample email from the SNS success topic.


For real-world applications, you can integrate SNS notifications with other services such as Amazon Simple Queue Service (Amazon SQS) and Lambda for additional postprocessing of the generated inferences or integration with other downstream applications, based on your requirements. For example, for our predictive maintenance use case, you can invoke a Lambda function based on an SNS notification to read the generated inference from Amazon S3, further process it (such as aggregation or filtering), and initiate workflows such as sending work orders for equipment repair to technicians.

Clean up

When you’re done testing the stack, delete the resources (especially the Kinesis data stream, Glue streaming job, and SNS topics) to avoid unexpected charges.

Run the following code to delete your stack:

sam delete --stack-name nrt-streaming-inference

Also delete the resources such as SageMaker endpoints by following the cleanup section in the ModelTraining-Evaluation-and-Deployment notebook.
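
If you prefer to clean up the inference resources programmatically instead, a snippet along these lines could be run from the notebook (the variable names match those used earlier when creating the model, endpoint configuration, and endpoint):

import boto3

sm_client = boto3.client("sagemaker")

# Delete the endpoint first, then its configuration and the model it references
sm_client.delete_endpoint(EndpointName=endpoint_name)
sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
sm_client.delete_model(ModelName=model_name)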

Conclusion

In this post, we used a predictive maintenance use case to demonstrate how to use various services such as Kinesis, AWS Glue, and SageMaker to build a near-real-time inference pipeline. We encourage you to try this solution and let us know what you think.

If you have any questions, share them in the comments.


About the authors

Rahul Sharma is a Solutions Architect at AWS Data Lab, helping AWS customers design and build AI/ML solutions. Prior to joining AWS, Rahul spent several years in the finance and insurance sector, helping customers build data and analytics platforms.

Pat Reilly is an Architect in the AWS Data Lab, where he helps customers design and build data workloads to support their business. Prior to AWS, Pat consulted at an AWS Partner, building AWS data workloads across a variety of industries.
