Provision and manage ML environments with Amazon SageMaker Canvas using AWS CDK and AWS Service Catalog

Machine learning (ML) is proliferating across a wide range of use cases and becoming prevalent in every industry. However, this growth outpaces the increase in the number of ML practitioners who have traditionally been responsible for implementing these technical solutions to realize business outcomes.

In today’s enterprise, there is a need for machine learning to be used by non-ML practitioners who are proficient with data, which is the foundation of ML. To make this a reality, the value of ML is being realized across the enterprise through no-code ML platforms. These platforms enable different personas, for example business analysts, to use ML without writing a single line of code and deliver solutions to business problems in a quick, simple, and intuitive manner. Amazon SageMaker Canvas is a visual point-and-click service that enables business analysts to use ML to solve business problems by generating accurate predictions on their own, without requiring any ML experience or writing code. Canvas has expanded the use of ML in the enterprise with a simple, intuitive interface that helps businesses implement solutions quickly.

Although Canvas has enabled democratization of ML, the challenge of provisioning and deploying ML environments in a secure manner remains. Typically, this is the responsibility of central IT teams in most large enterprises. In this post, we discuss how IT teams can administer, provision, and manage secure ML environments using Amazon SageMaker Canvas, the AWS Cloud Development Kit (AWS CDK), and AWS Service Catalog. The post presents a step-by-step guide for IT administrators to achieve this quickly and at scale.

Overview of the AWS CDK and AWS Service Catalog

The AWS CDK is an open-source software development framework to define your cloud application resources. It uses the familiarity and expressive power of programming languages for modeling your applications, while provisioning resources in a safe and repeatable manner.

AWS Service Catalog lets you centrally manage deployed IT services, applications, resources, and metadata. With AWS Service Catalog, you can create, share, organize, and govern cloud resources with infrastructure as code (IaC) templates and enable fast and straightforward provisioning.

Solution overview

We enable provisioning of ML environments using Canvas in three steps:

  1. First, we share how you can manage a portfolio of resources necessary for the approved usage of Canvas using AWS Service Catalog.
  2. Then, we deploy an example AWS Service Catalog portfolio for Canvas using the AWS CDK.
  3. Finally, we demonstrate how you can provision Canvas environments on demand within minutes.

Prerequisites

To provision ML environments with Canvas, the AWS CDK, and AWS Service Catalog, you need to do the following:

  1. Have access to the AWS account where the Service Catalog portfolio will be deployed. Make sure you have the credentials and permissions to deploy the AWS CDK stack into your account. The AWS CDK Workshop is a helpful resource you can refer to if you need support.
  2. We recommend following the security and infrastructure-as-code best practices highlighted in the related AWS documentation and workshops.
  3. Clone this GitHub repository into your environment.

Provision approved ML environments with Amazon SageMaker Canvas using AWS Service Catalog

In regulated industries and most large enterprises, you need to adhere to the requirements mandated by IT teams to provision and manage ML environments. These may include a secure, private network, data encryption, controls such as AWS Identity and Access Management (IAM) that allow only authorized and authenticated users to access solutions such as Canvas, and strict logging and monitoring for audit purposes.

As an IT administrator, you can use AWS Service Catalog to create and organize secure, reproducible ML environments with SageMaker Canvas into a product portfolio. The portfolio is managed as IaC, with controls embedded to meet the requirements mentioned earlier, and can be provisioned on demand within minutes. You can also maintain control of who can access this portfolio to launch products.

The following diagram illustrates this architecture.

Example flow

In this section, we demonstrate an example of an AWS Service Catalog portfolio for SageMaker Canvas. The portfolio consists of the following products, each covering a different aspect of the Canvas environment:

  • Studio domain – Canvas is an application that runs within Studio domains. The domain consists of an Amazon Elastic File System (Amazon EFS) volume, a list of authorized users, and a range of security, application, policy, and Amazon Virtual Private Cloud (VPC) configurations. An AWS account is linked to one domain per Region.
  • Amazon S3 bucket – After the Studio domain is created, an Amazon Simple Storage Service (Amazon S3) bucket is provisioned for Canvas to allow importing datasets from local files, also known as local file upload. This bucket is in the customer’s account and is provisioned once.
  • Canvas user – Within the Studio domain, you add a user profile for each Canvas user, who can then import datasets, build and train ML models without writing code, and run predictions on the model.
  • Scheduled shutdown of Canvas sessions – Canvas users can log out from the Canvas interface when they’re done with their tasks. Alternatively, administrators can shut down Canvas sessions from the AWS Management Console as part of managing the Canvas sessions. In this part of the AWS Service Catalog portfolio, an AWS Lambda function is created and provisioned to automatically shut down Canvas sessions at defined scheduled intervals, so that open sessions are closed when not in use (see the sketch following this list).
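
The following is a minimal Boto3 sketch of what such a shutdown function might look like; the actual Lambda function in the repository may differ, and the assumption here is that the Studio domain ID is passed in through the scheduled event.

import boto3

sagemaker_client = boto3.client("sagemaker")

def lambda_handler(event, context):
    # Domain ID supplied by the schedule that invokes this function (assumption for this sketch)
    domain_id = event["DomainId"]
    paginator = sagemaker_client.get_paginator("list_apps")
    for page in paginator.paginate(DomainIdEquals=domain_id):
        for app in page["Apps"]:
            # Shut down only Canvas apps that are currently running
            if app["AppType"] == "Canvas" and app["Status"] == "InService":
                sagemaker_client.delete_app(
                    DomainId=domain_id,
                    UserProfileName=app["UserProfileName"],
                    AppType="Canvas",
                    AppName=app["AppName"],
                )

In the portfolio, a schedule (for example, an Amazon EventBridge rule) would invoke a function like this at the defined intervals.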

This example flow can be found in the GitHub repository for quick reference.

Deploy the flow with the AWS CDK

In this section, we deploy the flow described earlier using the AWS CDK. After it’s deployed, you can also do version tracking and manage the portfolio.

The portfolio stack can be found in app.py and the product stacks under the products/ folder. You can iterate on the IAM roles, AWS Key Management Service (AWS KMS) keys, and VPC setup in the studio_constructs/ folder. Before deploying the stack into your account, you can edit app.py to grant portfolio access to an IAM role of your choice.
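
As an illustration, granting access might look like the following AWS CDK (Python) sketch; the construct names, stack name, and role ARN are placeholders rather than the repository’s actual code.

import aws_cdk as cdk
from aws_cdk import aws_iam as iam
from aws_cdk import aws_servicecatalog as servicecatalog

app = cdk.App()
stack = cdk.Stack(app, "CanvasPortfolioStack")

# Portfolio that holds the Canvas products (placeholder names)
portfolio = servicecatalog.Portfolio(
    stack,
    "CanvasPortfolio",
    display_name="SageMaker Canvas Portfolio",
    provider_name="Central IT",
)

# Grant launch access to an existing IAM role of your choice (placeholder ARN)
canvas_admin_role = iam.Role.from_role_arn(
    stack,
    "CanvasAdminRole",
    role_arn="arn:aws:iam::111122223333:role/CanvasAdminRole",
)
portfolio.give_access_to_role(canvas_admin_role)

app.synth()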

You can manage access to the portfolio for the relevant IAM users, groups, and roles. See Granting Access to Users for more details.

Deploy the portfolio into your account

You can now run the following commands to install the AWS CDK and make sure you have the right dependencies to deploy the portfolio:

npm install -g aws-cdk@2.27.0
python3 -m venv .venv
source .venv/bin/activate
pip3 install -r requirements.txt

Run the following commands to deploy the portfolio into your account:

ACCOUNT_ID=$(aws sts get-caller-identity --query Account | tr -d '"')
AWS_REGION=$(aws configure get region)
cdk bootstrap aws://${ACCOUNT_ID}/${AWS_REGION}
cdk deploy --require-approval never

The first two commands get your account ID and current Region using the AWS Command Line Interface (AWS CLI) on your computer. Following this, cdk bootstrap and cdk deploy build assets locally, and deploy the stack in a few minutes.

The portfolio can now be found in AWS Service Catalog, as shown in the following screenshot.

On-demand provisioning

The products within the portfolio can be launched quickly and easily on demand from the Provisioning menu on the AWS Service Catalog console. A typical flow is to launch the Studio domain and the Canvas auto shutdown first because this is usually a one-time action. You can then add Canvas users to the domain. The domain ID and user IAM role ARN are saved in AWS Systems Manager Parameter Store and are automatically populated in the user product’s parameters, as shown in the following screenshot.
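
If you want to look up these values programmatically, for example when scripting the launch of the Canvas user product, you could read them from Parameter Store as in the following sketch. The parameter names shown are assumptions for illustration; check the repository for the names the products actually write.

import boto3

ssm = boto3.client("ssm")

# Placeholder parameter names; replace with the names written by the portfolio products
domain_id = ssm.get_parameter(Name="/canvas/studio-domain-id")["Parameter"]["Value"]
user_role_arn = ssm.get_parameter(Name="/canvas/user-execution-role-arn")["Parameter"]["Value"]

print(f"Studio domain: {domain_id}, Canvas user role: {user_role_arn}")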

You can also use cost allocation tags that are attached to each user. For example, UserCostCenter is a sample tag where you can add the name of each user.

Key considerations for governing ML environments using Canvas

Now that we have provisioned and deployed an AWS Service Catalog portfolio focused on Canvas, we’d like to highlight a few considerations to govern the Canvas-based ML environments focused on the domain and the user profile.

The following are considerations regarding the Studio domain:

  • Networking for Canvas is managed at the Studio domain level, where the domain is deployed on a private VPC subnet for secure connectivity. See Securing Amazon SageMaker Studio connectivity using a private VPC to learn more.
  • A default IAM execution role is defined at the domain level. This default role is assigned to all Canvas users in the domain.
  • Encryption is done using AWS KMS by encrypting the EFS volume in the domain. For additional control, you can specify your own AWS KMS key, also known as a customer managed key (CMK). See Protect Data at Rest Using Encryption to learn more.
  • Local file upload from your disk is enabled by attaching a cross-origin resource sharing (CORS) policy to the S3 bucket used by Canvas (see the sketch after this list). See Give Your Users Permissions to Upload Local Files to learn more.
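
As an illustration, the following Boto3 sketch attaches a CORS rule of the kind described for Canvas local file upload; the bucket name is a placeholder, and you should confirm the exact rule against the documentation linked above.

import boto3

s3 = boto3.client("s3")

# Placeholder bucket name; use the bucket Canvas imports local files through
s3.put_bucket_cors(
    Bucket="sagemaker-<region>-<account-id>",
    CORSConfiguration={
        "CORSRules": [
            {
                "AllowedHeaders": ["*"],
                "AllowedMethods": ["POST"],
                "AllowedOrigins": ["*"],
            }
        ]
    },
)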

The following are considerations regarding the user profile:

  • Authentication in Studio can be done through either single sign-on (SSO) or IAM. If you have an existing identity provider to federate users to access the console, you can assign a Studio user profile to each federated identity using IAM. See the section Assigning the policy to Studio users in Configuring Amazon SageMaker Studio for teams and groups with complete resource isolation to learn more.
  • You can assign an IAM execution role to each user profile. While using Studio, a user assumes the role mapped to their user profile, which overrides the default execution role. You can use this for fine-grained access controls within a team.
  • You can achieve isolation using attribute-based access controls (ABAC) to ensure users can only access the resources for their team. See Configuring Amazon SageMaker Studio for teams and groups with complete resource isolation to learn more.
  • You can perform fine-grained cost tracking by applying cost allocation tags to user profiles; the sketch following this list shows a user profile created with both a dedicated execution role and a cost allocation tag.
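
To make the last two points concrete, the following Boto3 sketch creates a user profile with a dedicated execution role and a cost allocation tag; the domain ID, role ARN, and names are placeholders.

import boto3

sagemaker_client = boto3.client("sagemaker")

sagemaker_client.create_user_profile(
    DomainId="d-xxxxxxxxxxxx",          # placeholder Studio domain ID
    UserProfileName="canvas-analyst-1",
    UserSettings={
        # Overrides the domain's default execution role for this user
        "ExecutionRole": "arn:aws:iam::111122223333:role/CanvasAnalystRole",
    },
    Tags=[{"Key": "UserCostCenter", "Value": "analyst-team-a"}],
)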

Clean up

To clean up the resources created by the AWS CDK stack, navigate to the AWS CloudFormation console and delete the Canvas stacks, or run cdk destroy from within the repository folder to achieve the same result.

Conclusion

In this post, we shared how you can quickly and easily provision ML environments with Canvas using AWS Service Catalog and the AWS CDK. We discussed how you can create a portfolio on AWS Service Catalog, provision the portfolio, and deploy it in your account. IT administrators can use this method to deploy and manage users, sessions, and associated costs while provisioning Canvas.

Learn more about Canvas on the product page and the Developer Guide. For further reading, you can learn how to enable business analysts to access SageMaker Canvas using AWS SSO without the console. You can also learn how business analysts and data scientists can collaborate faster using Canvas and Studio.


About the Authors

Davide Gallitelli is a Specialist Solutions Architect for AI/ML in the EMEA region. He is based in Brussels and works closely with customers throughout Benelux. He has been a developer since he was very young, starting to code at the age of 7. He started learning AI/ML at university, and has fallen in love with it since then.

Sofian Hamiti is an AI/ML specialist Solutions Architect at AWS. He helps customers across industries accelerate their AI/ML journey by helping them build and operationalize end-to-end machine learning solutions.

Shyam Srinivasan is a Principal Product Manager on the AWS AI/ML team, leading product management for Amazon SageMaker Canvas. Shyam cares about making the world a better place through technology and is passionate about how AI and ML can be a catalyst in this journey.

Avi Patel works as a software engineer on the Amazon SageMaker Canvas team. His background consists of working full stack with a frontend focus. In his spare time, he likes to contribute to open source projects in the crypto space and learn about new DeFi protocols.

Jared Heywood is a Senior Business Development Manager at AWS. He is a global AI/ML specialist helping customers with no-code machine learning. He has worked in the AutoML space for the past 5 years and launched products at Amazon like Amazon SageMaker JumpStart and Amazon SageMaker Canvas.

Read More

New features for Amazon SageMaker Pipelines and the Amazon SageMaker SDK

Amazon SageMaker Pipelines allows data scientists and machine learning (ML) engineers to automate training workflows, which helps you create a repeatable process to orchestrate model development steps for rapid experimentation and model retraining. You can automate the entire model build workflow, including data preparation, feature engineering, model training, model tuning, and model validation, and catalog it in the model registry. You can configure pipelines to run automatically at regular intervals or when certain events are triggered, or you can run them manually as needed.

In this post, we highlight some of the enhancements to the Amazon SageMaker SDK and introduce new features of Amazon SageMaker Pipelines that make it easier for ML practitioners to build and train ML models.

Pipelines continues to innovate its developer experience, and with these recent releases, you can now use the service in a more customized way:

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 – Updated documentation on PipelineVariable usage for estimator, processor, tuner, transformer, and model base classes, Amazon models, and framework models. There will be additional changes coming with newer versions of the SDK to support all subclasses of estimators and processors.
  • 2.90.0 – Availability of ModelStep for integrated model resource creation and registration tasks.
  • 2.88.2 – Availability of PipelineSession for managed interaction with SageMaker entities and resources.
  • 2.88.2 – Subclass compatibility for workflow pipeline job steps so you can build job abstractions and configure and run processing, training, transform, and tuning jobs as you would without a pipeline.
  • 2.76.0 – Availability of FailStep to conditionally stop a pipeline with a failure status.

In this post, we walk you through a workflow using a sample dataset with a focus on model building and deployment to demonstrate how to implement Pipelines’s new features. By the end, you should have enough information to successfully use these newer features and simplify your ML workloads.

Features overview

Pipelines offers the following new features:

  • Pipeline variable annotation – Certain method parameters accept multiple input types, including PipelineVariables, and additional documentation has been added to clarify where PipelineVariables are supported in both the latest stable version of SageMaker SDK documentation and the init signature of the functions. For example, in the following TensorFlow estimator, the init signature now shows that model_dir and image_uri support PipelineVariables, whereas the other parameters do not. For more information, refer to TensorFlow Estimator.

    • Before:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )

    • After:
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )

  • Pipeline session – PipelineSession is a new concept that brings unity across the SageMaker SDK and introduces lazy initialization of pipeline resources (the run calls are captured but not run until the pipeline is created and started). The PipelineSession context inherits from the SageMaker Session and implements convenient methods for you to interact with other SageMaker entities and resources, such as training jobs, endpoints, and input datasets stored in Amazon Simple Storage Service (Amazon S3).
  • Subclass compatibility with workflow pipeline job steps – You can now build job abstractions and configure and run processing, training, transform, and tuning jobs as you would without a pipeline.

    • For example, creating a processing step with SKLearnProcessor previously required the following:
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )

    • As we see in the preceding code, ProcessingStep needs essentially the same preprocessing configuration as .run, just without initiating the API call to start the job. With subclass compatibility now enabled for workflow pipeline job steps, we pass the preprocessing configuration through the step_args argument, which takes the output of .run, so you can build a job abstraction and configure it exactly as you would without Pipelines. We also pass in pipeline_session, which is a PipelineSession object, instead of sagemaker_session to make sure the run calls are captured but not started until the pipeline is created and run. See the following code:
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)

  • Model step (a streamlined approach with model creation and registration steps) – Pipelines offers two step types to integrate with SageMaker models: CreateModelStep and RegisterModel. You can now achieve both using only the ModelStep type. Note that a PipelineSession is required to achieve this. This brings similarity between the pipeline steps and the SDK.

    • Before:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )

    • After:
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

  • Fail step (conditional stop of the pipeline run) – FailStep allows a pipeline to be stopped with a failure status if a condition is met, such as if the model score is below a certain threshold.

Solution overview

In this solution, your entry point is the Amazon SageMaker Studio integrated development environment (IDE) for rapid experimentation. Studio offers an environment to manage the end-to-end Pipelines experience. With Studio, you can bypass the AWS Management Console for your entire workflow management. For more information on managing Pipelines from within Studio, refer to View, Track, and Execute SageMaker Pipelines in SageMaker Studio.

The following diagram illustrates the high-level architecture of the ML workflow with the different steps to train and generate inferences using the new features.

The pipeline includes the following steps:

  1. Preprocess data to build features required and split data into train, validation, and test datasets.
  2. Create a training job with the SageMaker XGBoost framework.
  3. Evaluate the trained model using the test dataset.
  4. Check if the AUC score is above a predefined threshold.
    • If the AUC score is less than the threshold, stop the pipeline run and mark it as failed.
    • If the AUC score is greater than the threshold, create a SageMaker model and register it in the SageMaker model registry.
  5. Apply batch transform on the given dataset using the model created in the previous step.

Prerequisites

To follow along with this post, you need an AWS account with a Studio domain.

Pipelines is integrated directly with SageMaker entities and resources, so you don’t need to interact with any other AWS services. You also don’t need to manage any resources because it’s a fully managed service, which means that it creates and manages resources for you. For more information on the various SageMaker components that are both standalone Python APIs along with integrated components of Studio, see the SageMaker product page.

Before getting started, install SageMaker SDK version >= 2.104.0 and xlrd >=1.0.0 within the Studio notebook using the following code snippet:

import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd>=1.0.0"

import sagemaker
print(sagemaker.__version__)

ML workflow

For this post, you use the following components:

  • Data preparation

    • SageMaker Processing – SageMaker Processing is a fully managed service allowing you to run custom data transformations and feature engineering for ML workloads.
  • Model building

  • Model training and evaluation

    • One-click training – The SageMaker distributed training feature. SageMaker provides distributed training libraries for data parallelism and model parallelism. The libraries are optimized for the SageMaker training environment, help adapt your distributed training jobs to SageMaker, and improve training speed and throughput.
    • SageMaker Experiments – Experiments is a capability of SageMaker that lets you organize, track, compare, and evaluate your ML iterations.
    • SageMaker batch transform – Batch transform or offline scoring is a managed service in SageMaker that lets you predict on a larger dataset using your ML models.
  • Workflow orchestration

A SageMaker pipeline is a series of interconnected steps defined by a JSON pipeline definition. It encodes a pipeline using a directed acyclic graph (DAG). The DAG gives information on the requirements for and relationships between each step of the pipeline, and its structure is determined by the data dependencies between steps. These dependencies are created when the properties of a step’s output are passed as the input to another step.

The following diagram illustrates the different steps in the SageMaker pipeline (for a churn prediction use case) where the connections between the steps are inferred by SageMaker based on the inputs and outputs defined by the step definitions.

The next sections walk through creating each step of the pipeline and running the entire pipeline once created.

Project structure

Let’s start with the project structure:

  • /sm-pipelines-end-to-end-example – The project name

    • /data – The datasets
    • /pipelines – The code files for pipeline components

      • /customerchurn
        • preprocess.py
        • evaluate.py
    • sagemaker-pipelines-project.ipynb – A notebook walking through the modeling workflow using Pipelines’s new features

Download the dataset

To follow along with this post, you need to download and save the sample dataset under the data folder within the project home directory, which saves the file in Amazon Elastic File System (Amazon EFS) within the Studio environment.

Build the pipeline components

Now you’re ready to build the pipeline components.

Import statements and declare parameters and constants

Create a Studio notebook called sagemaker-pipelines-project.ipynb within the project home directory. Enter the following code block in a cell, and run the cell to set up SageMaker and S3 client objects, create PipelineSession, and set up the S3 bucket location using the default bucket that comes with a SageMaker session:

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

Pipelines supports parameterization, which allows you to specify input parameters at runtime without changing your pipeline code. You can use the classes available in the sagemaker.workflow.parameters module, such as ParameterInteger, ParameterFloat, and ParameterString, to specify pipeline parameters of various data types. Run the following code to set up multiple input parameters:

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

Generate a batch dataset

Generate the batch dataset, which you use later in the batch transform step:

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess batch data and save into the data folder
# use a separate variable name so the batch_data S3 URI defined earlier is not overwritten
batch_df = preprocess_batch_data("data/storedata_total.csv")
batch_df.pop("retained")
batch_sample = batch_df.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

Upload data to an S3 bucket

Upload the datasets to Amazon S3:

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

Define a processing script and processing step

In this step, you prepare a Python script to do feature engineering, one hot encoding, and curate the training, validation, and test splits to be used for model building. Run the following code to build your processing script:

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Next, run the following code block to instantiate the processor and the Pipelines step to run the processing script. Because the processing script is written in Pandas, you use a SKLearnProcessor. The Pipelines ProcessingStep function takes the following arguments: the processor, the input S3 locations for raw datasets, and the output S3 locations to save processed datasets.

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

Define a training step

Set up model training using a SageMaker XGBoost estimator and the Pipelines TrainingStep function:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
    )

Define the evaluation script and model evaluation step

Run the following code block to write the evaluation script that runs after the model is trained. The script computes the AUC score on the test dataset and writes it to an evaluation report; a later condition step compares this score against the specified threshold.

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Next, run the following code block to instantiate the processor and the Pipelines step to run the evaluation script. Because the evaluation script uses the XGBoost package, you use a ScriptProcessor along with the XGBoost image. The Pipelines ProcessingStep function takes the following arguments: the processor, the input S3 locations for the model artifact and test dataset, and the output S3 location to save the evaluation report.

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

Define a create model step

Run the following code block to create a SageMaker model using the Pipelines model step. This step uses the output of the training step to package the model for deployment. Note that the instance type is hard-coded in this example; you could instead pass it using a Pipelines parameter like the ones defined earlier in the post.

from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model 
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

Define a batch transform step

Run the following code block to run batch transformation using the trained model with the batch input created in the first step:

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

Define a register model step

The following code registers the model within the SageMaker model registry using the Pipelines model step:

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

Define a fail step to stop the pipeline

The following code defines the Pipelines fail step to stop the pipeline run with an error message if the AUC score doesn’t meet the defined threshold:

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed because the AUC score did not exceed the threshold of", auc_score_threshold]),
    )

Define a condition step to check AUC score

The following code defines a condition step to check the AUC score and conditionally create a model and run a batch transformation and register a model in the model registry, or stop the pipeline run in a failed state:

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_gt = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_gt],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

Build and run the pipeline

After defining all of the component steps, you can assemble them into a Pipelines object. You don’t need to specify the run order of the steps because Pipelines automatically infers it from the dependencies between the steps.

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        # batch_data and auc_score_threshold are plain Python values rather than
        # pipeline Parameters, so they are baked into the pipeline definition
        # instead of being listed here
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

Run the following code in a cell in your notebook. If the pipeline already exists, the code updates it; if the pipeline doesn’t exist, it creates a new one. The code then starts a pipeline run.

# Create a new pipeline or update the existing one
pipeline.upsert(role_arn=role)
# Start a pipeline run
execution = pipeline.start()

Conclusion

In this post, we introduced some of the new features now available with Pipelines along with other built-in SageMaker features and the XGBoost algorithm to develop, iterate, and deploy a model for churn prediction. The solution can be extended with additional data sources to implement your own ML workflow. For more details on the steps available in the Pipelines workflow, refer to Amazon SageMaker Model Building Pipeline and SageMaker Workflows. The AWS SageMaker Examples GitHub repo has more examples around various use cases using Pipelines.


About the Authors

Jerry Peng is a software development engineer with AWS SageMaker. He focuses on building end-to-end large-scale MLOps systems from training to model monitoring in production. He is also passionate about bringing the concept of MLOps to a broader audience.

Dewen Qi is a Software Development Engineer in AWS. She currently focuses on developing and improving SageMaker Pipelines. Outside of work, she enjoys practicing Cello.

Gayatri Ghanakota is a Sr. Machine Learning Engineer with AWS Professional Services. She is passionate about developing, deploying, and explaining AI/ML solutions across various domains. Prior to this role, she led multiple initiatives as a data scientist and ML engineer with top global firms in the financial and retail space. She holds a master’s degree in Computer Science specializing in Data Science from the University of Colorado, Boulder.

Rupinder Grewal is a Sr. AI/ML Specialist Solutions Architect with AWS. He currently focuses on serving of models and MLOps on SageMaker. Prior to this role, he worked as a Machine Learning Engineer building and hosting models. Outside of work, he enjoys playing tennis and biking on mountain trails.

Ray Li is a Sr. Data Scientist with AWS Professional Services. His specialty focuses on building and operationalizing AI/ML solutions for customers of varying sizes, ranging from startups to enterprise organizations. Outside of work, Ray enjoys fitness and traveling.

Read More

Reduce the time taken to deploy your models to Amazon SageMaker for testing

Data scientists often train their models locally and look for a proper hosting service to deploy their models. Unfortunately, there’s no one set mechanism or guide to deploying pre-trained models to the cloud. In this post, we look at deploying trained models to Amazon SageMaker hosting to reduce your deployment time.

SageMaker is a fully managed machine learning (ML) service. With SageMaker, you can quickly build and train ML models and directly deploy them into a production-ready hosted environment. Additionally, you don’t need to manage servers. You get an integrated Jupyter notebook environment with easy access to your data sources. You can perform data analysis, train your models, and test them using your own algorithms or use SageMaker-provided ML algorithms that are optimized to run efficiently against large datasets spread across multiple machines. Training and hosting are billed by minutes of usage, with no minimum fees and no upfront commitments.

Solution overview

Data scientists sometimes train models locally using their IDE and either ship those models to the ML engineering team for deployment or just run predictions locally on powerful machines. In this post, we introduce a Python library that simplifies the process of deploying models to SageMaker for hosting on real-time or serverless endpoints.

This Python library gives data scientists a simple interface to quickly get started on SageMaker without needing to know any of the low-level SageMaker functionality.

If you have models trained locally using your preferred IDE and want to benefit from the scale of the cloud, you can use this library to deploy your model to SageMaker. With SageMaker, in addition to all the scaling benefits of a cloud-based ML platform, you have access to purpose-built training tools (distributed training, hyperparameter tuning), experiment management, model management, bias detection, model explainability, and many other capabilities that can help you in any aspect of the ML lifecycle. You can choose from the three most popular frameworks for ML: Scikit-learn, PyTorch, and TensorFlow, and can pick the type of compute you want. Defaults are provided along the way so users of this library can deploy their models without needing to make complex decisions or learn new concepts. In this post, we show you how to get started with this library and optimize deploying your ML models on SageMaker hosting.

The library can be found in the GitHub repository.

The SageMaker Migration Toolkit

The SageMakerMigration class is available through a Python library published to GitHub. Instructions to install this library are provided in the repository; make sure that you follow the README to properly set up your environment. After you install this library, the rest of this post talks about how you can use it.

The SageMakerMigration class consists of high-level abstractions over SageMaker APIs that significantly reduce the steps needed to deploy your model to SageMaker, as illustrated in the following figure. This is intended for experimentation so developers can quickly get started and test SageMaker. It is not intended for production migrations.

For Scikit-learn, PyTorch, and TensorFlow models, this library supports deploying trained models to a SageMaker real-time endpoint or serverless endpoint. To learn more about the inference options in SageMaker, refer to Deploy Models for Inference.

Real-time vs. serverless endpoints

Real-time inference is ideal for inference workloads where you have real-time, interactive, low latency requirements. You can deploy your model to SageMaker hosting services and get an endpoint that can be used for inference. These endpoints are fully managed and support auto scaling.

SageMaker Serverless Inference is a purpose-built inference option that makes it easy for you to deploy and scale ML models. Serverless Inference is ideal for workloads that have idle periods between traffic spurts and can tolerate cold starts. Serverless endpoints automatically launch compute resources and scale them in and out depending on traffic, eliminating the need to choose instance types or manage scaling policies. This takes away the undifferentiated heavy lifting of selecting and managing servers.

Depending on your use case, you may want to quickly host your model on SageMaker without actually having an instance always on and incurring costs, in which case a serverless endpoint is a great solution.
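
Outside of this library, the SageMaker Python SDK exposes the same choice; as a point of reference, deploying an existing sagemaker.model.Model object to a serverless endpoint might look like the following sketch (the memory size and concurrency values are arbitrary examples).

from sagemaker.serverless import ServerlessInferenceConfig

# 'model' is assumed to be a previously constructed sagemaker.model.Model
serverless_config = ServerlessInferenceConfig(
    memory_size_in_mb=2048,  # 1024-6144 MB, in 1 GB increments
    max_concurrency=5,       # maximum concurrent invocations for the endpoint
)
predictor = model.deploy(serverless_inference_config=serverless_config)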

Prepare your trained model and inference script

After you’ve identified the model you want to deploy on SageMaker, you must ensure the model is presented to SageMaker in the right format. SageMaker endpoints generally consist of two components: the trained model artifact (.pth, .pkl, and so on) and an inference script. The inference script is not always mandatory, but if not provided, the default handlers for the serving container that you’re using are applied. It’s essential to provide this script if you need to customize your input/output functionality for inference.

The trained model artifact is simply a saved Scikit-learn, PyTorch, or TensorFlow model. For Scikit-learn, this is typically a pickle file, for PyTorch this is a .pt or .pth file, and for TensorFlow this is a folder with assets, .pb files, and other variables.

Generally, you need to be able to control how your model processes input and performs inference, and control the output format for your response. With SageMaker, you can provide an inference script to add this customization. Any inference script used by SageMaker must have one or more of the following four handler functions: model_fn, input_fn, predict_fn, and output_fn.

Note that these four functions apply to PyTorch and Scikit-learn containers specifically. TensorFlow has slightly different handlers because it’s integrated with TensorFlow Serving. For an inference script with TensorFlow, you have two model handlers: input_handler and output_handler. Again, these have the same preprocessing and postprocessing purpose that you can work with, but they’re configured slightly differently to integrate with TensorFlow Serving. For PyTorch models, model_fn is a compulsory function to have in the inference script.
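
For reference, TensorFlow Serving handlers follow this shape; the JSON handling below is an illustrative assumption, not the only valid implementation.

import json

def input_handler(data, context):
    # Pre-process the request before it is forwarded to TensorFlow Serving
    if context.request_content_type == "application/json":
        payload = data.read().decode("utf-8")
        return json.dumps({"instances": json.loads(payload)})
    raise ValueError(f"Unsupported content type: {context.request_content_type}")

def output_handler(data, context):
    # Post-process the TensorFlow Serving response before returning it to the client
    response_content_type = context.accept_header
    prediction = data.content
    return prediction, response_content_type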

model_fn

This is the function that is first called when you invoke your SageMaker endpoint. This is where you write your code to load the model. For example:

import os
import torch

def model_fn(model_dir):
    # Your_Model is a placeholder for your model class
    model = Your_Model()
    with open(os.path.join(model_dir, 'model.pth'), 'rb') as f:
        model.load_state_dict(torch.load(f))
    return model

Depending on the framework and type of model, this code may change, but the function must return an initialized model.

input_fn

This is the second function that is called when your endpoint is invoked. This function takes the data sent to the endpoint for inference and parses it into the format required for the model to generate a prediction. For example:

import torch
from io import BytesIO

def input_fn(request_body, request_content_type):
    """An input_fn that loads a pickled tensor"""
    if request_content_type == 'application/python-pickle':
        return torch.load(BytesIO(request_body))
    else:
        # Handle other content-types here or raise an Exception
        # if the content type is not supported.
        pass

The request_body contains the data to be used for generating inference from the model and is parsed in this function so that it’s in the required format.

predict_fn

This is the third function that is called when your model is invoked. This function takes the preprocessed input data returned from input_fn and uses the model returned from model_fn to make the prediction. For example:

def predict_fn(input_data, model):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    model.eval()
    with torch.no_grad():
        return model(input_data.to(device))

You can optionally add output_fn to parse the output of predict_fn before returning it to the client. The function signature is def output_fn(prediction, content_type).
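
As a minimal sketch, an output_fn that returns JSON might look like the following; the serialization shown is an assumption and should match what your clients expect.

import json

def output_fn(prediction, content_type):
    # Serialize the model output returned by predict_fn
    if content_type == "application/json":
        return json.dumps(prediction.cpu().numpy().tolist())
    raise ValueError(f"Unsupported content type: {content_type}")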

Move your pre-trained model to SageMaker

After you have your trained model file and inference script, you must put these files in a folder as follows:

#SKLearn Model

model_folder/
    model.pkl
    inference.py
    
# Tensorflow Model
model_folder/
    0000001/
        assets/
        variables/
        keras_metadata.pb
        saved_model.pb
    inference.py
    
# PyTorch Model
model_folder/
    model.pth
    inference.py

After your model and inference script have been prepared and saved in this folder structure, your model is ready for deployment on SageMaker. See the following code:

from sagemaker_migration import frameworks as fwk

if __name__ == "__main__":
    ''' '''
    sk_model = fwk.SKLearnModel(
        version = "0.23-1", 
        model_data = 'model.joblib',
        inference_option = 'real-time',
        inference = 'inference.py',
        instance_type = 'ml.m5.xlarge'
    )
    sk_model.deploy_to_sagemaker()

After your endpoint is deployed, make sure to clean up any resources you won’t use, either via the SageMaker console or with the delete_endpoint Boto3 API call.
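
For example, the deletion calls might look like the following sketch; the resource names are placeholders, and you may also want to remove the associated endpoint configuration and model.

import boto3

sagemaker_client = boto3.client("sagemaker")

# Placeholder resource names
sagemaker_client.delete_endpoint(EndpointName="sklearn-migration-endpoint")
sagemaker_client.delete_endpoint_config(EndpointConfigName="sklearn-migration-endpoint-config")
sagemaker_client.delete_model(ModelName="sklearn-migration-model")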

Conclusion

The goal of the SageMaker Migration Toolkit project is to make it easy for data scientists to onboard their models onto SageMaker to take advantage of cloud-based inference. The repository will continue to evolve and support more options for migrating workloads to SageMaker. The code is open source and we welcome community contributions through pull requests and issues.

Check out the GitHub repository to explore more on utilizing the SageMaker Migration Toolkit, and feel free to also contribute examples or feature requests to add to the project!


About the authors

Kirit Thadaka is an ML Solutions Architect working in the Amazon SageMaker Service SA team. Prior to joining AWS, Kirit spent time working in early stage AI startups followed by some time in consulting in various roles in AI research, MLOps, and technical leadership.

Ram Vegiraju is an ML Architect with the SageMaker Service team. He focuses on helping customers build and optimize their AI/ML solutions on Amazon SageMaker. In his spare time, he loves traveling and writing.

Read More

Introducing self-service quota management and higher default service quotas for Amazon Textract

Today, we’re excited to announce self-service quota management support for Amazon Textract via the AWS Service Quotas console, and higher default service quotas in select AWS Regions.

Customers tell us they need quick turnaround times to process their requests for quota increases and visibility into their service quotas so they may continue to scale their Amazon Textract usage. With this launch, we’re improving Amazon Textract support for service quotas by enabling you to self-manage your service quotas via the Service Quotas console. In addition to viewing the default service quotas, you can now view your account’s applied custom quotas for a specific Region, view your historical utilization metrics per applied quota, set up alarms to notify when utilization approaches a threshold, and add tags to your quotas for easier organization. Additionally, we’re launching the Amazon Textract Service Quota Calculator, which will help you quickly estimate service quota requirements for your workload prior to submitting a quota increase request.

In this post, we discuss the updated default service quotas, the new service quota management capabilities, and the service quota calculator for Amazon Textract.

Increased default service quotas for Amazon Textract

Amazon Textract now has higher service quotas for several asynchronous and synchronous APIs in multiple major AWS Regions. The updated default service quotas are available for US East (Ohio), US East (N. Virginia), US West (Oregon), Asia Pacific (Mumbai), and Europe (Ireland) Regions. The following table summarizes the before and after default quota numbers for each of these Regions for the respective synchronous and asynchronous APIs. You can refer to Amazon Textract endpoints and quotas to learn more about the current default quotas.

Synchronous operations (transactions per second per account for synchronous operations)

API                  Region                  Before   After
AnalyzeDocument      US East (Ohio)          1        10
AnalyzeDocument      Asia Pacific (Mumbai)   1        5
AnalyzeDocument      Europe (Ireland)        1        5
DetectDocumentText   US East (Ohio)          1        10
DetectDocumentText   US East (N. Virginia)   10       25
DetectDocumentText   US West (Oregon)        10       25
DetectDocumentText   Asia Pacific (Mumbai)   1        5
DetectDocumentText   Europe (Ireland)        1        5

Asynchronous operations (transactions per second per account for all Start operations)

API                         Region                  Before   After
StartDocumentAnalysis       US East (Ohio)          2        10
StartDocumentAnalysis       Asia Pacific (Mumbai)   2        5
StartDocumentAnalysis       Europe (Ireland)        2        5
StartDocumentTextDetection  US East (Ohio)          1        5
StartDocumentTextDetection  US East (N. Virginia)   10       15
StartDocumentTextDetection  US West (Oregon)        10       15
StartDocumentTextDetection  Asia Pacific (Mumbai)   1        5
StartDocumentTextDetection  Europe (Ireland)        1        5

Asynchronous operations (transactions per second per account for all Get operations)

API                         Region                  Before   After
GetDocumentAnalysis         US East (Ohio)          5        10
GetDocumentTextDetection    US East (Ohio)          5        10
GetDocumentTextDetection    US East (N. Virginia)   10       25
GetDocumentTextDetection    US West (Oregon)        10       25

Improved service quota support for Amazon Textract

Starting today, you can manage your Amazon Textract service quotas via the Service Quotas console. Requests may now be processed automatically, speeding up approval times. After a quota request for a specific Region is approved, the new quota is immediately available for scaling your Amazon Textract usage and is also visible on the Service Quotas console. You can see the default and applied quota values for your account in a given Region, and view the historical utilization metrics via an integrated Amazon CloudWatch graph. This enables you to make informed decisions about whether a quota increase is required to scale your workload. You can also use CloudWatch alarms to be notified whenever a specified quota reaches a predefined threshold, which can help you investigate issues with your applications or monitor spiky workloads. Finally, you can add tags to your quotas for better administration and monitoring.

The following sections discuss the features that are now available via the Service Quotas console for Amazon Textract.

Default and applied quotas

You can now have visibility into the AWS default quota value and applied quota value of a specific quota for Amazon Textract on the Service Quotas console. The default quota value is the default value of the quota in that specific Region, and the applied quota value is the currently applied value for that quota for the account in that Region.
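If you prefer to check these values programmatically, the following sketch uses the Service Quotas API via Boto3 to list the applied and default quota values for Amazon Textract in the current Region (pagination is omitted for brevity):

import boto3

sq = boto3.client("service-quotas")

# Applied (account-specific) quota values for Amazon Textract in the current Region
applied = sq.list_service_quotas(ServiceCode="textract")["Quotas"]

# AWS default quota values for comparison
defaults = sq.list_aws_default_service_quotas(ServiceCode="textract")["Quotas"]

for quota in applied:
    print(quota["QuotaName"], quota["Value"])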

Monitoring via CloudWatch graphs

The Service Quotas console also displays the current utilization against the total applied quota value. You can view the weekly, daily, and hourly trend in utilization of an applied quota through an integrated CloudWatch graph, right from the Service Quotas console. You can add this graph to a custom CloudWatch dashboard for better monitoring and reporting of service usage and overall utilization.

Amazon Textract Service Quota Console cloudwatch graph and alarms

We have also added the capability to set up CloudWatch alarms that notify you automatically whenever a specified quota reaches a configurable threshold. This helps you monitor the usage of Amazon Textract from your applications, analyze spiky workloads, make informed decisions about overall utilization, control costs, and improve your application's architecture.
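The console can create these alarms for you, but the following sketch shows an equivalent Boto3 call using CloudWatch usage metrics and the SERVICE_QUOTA metric math function; the AWS/Usage dimension values shown are assumptions for illustration, so copy the exact metric definition from the Service Quotas console for your quota:

import boto3

cw = boto3.client("cloudwatch")

# Alarm when DetectDocumentText usage exceeds 80% of the applied quota (illustrative values)
cw.put_metric_alarm(
    AlarmName="textract-detectdocumenttext-quota-80pct",
    EvaluationPeriods=1,
    Threshold=80,
    ComparisonOperator="GreaterThanThreshold",
    Metrics=[
        {
            "Id": "usage",
            "MetricStat": {
                "Metric": {
                    "Namespace": "AWS/Usage",
                    "MetricName": "CallCount",
                    "Dimensions": [
                        {"Name": "Service", "Value": "Textract"},   # assumed dimension values
                        {"Name": "Type", "Value": "API"},
                        {"Name": "Resource", "Value": "DetectDocumentText"},
                        {"Name": "Class", "Value": "None"},
                    ],
                },
                "Period": 60,
                "Stat": "Sum",
            },
            "ReturnData": False,
        },
        {
            "Id": "utilization_pct",
            "Expression": "(usage / SERVICE_QUOTA(usage)) * 100",
            "Label": "Quota utilization (%)",
            "ReturnData": True,
        },
    ],
)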

Quota tagging

With quota tagging, you can now add tags to applied quotas to simplify administration. Tags help you identify and organize AWS resources. With quota tags, you can manage the applied service quotas for Amazon Textract along with other AWS service quotas as part of your administration and governance practices. You can better manage and monitor quotas and quota utilization for different environments based on tags. For example, you can use production or development tags to logically separate and monitor development and production environment quotas and quota utilization across accounts under AWS Organizations, and to unify reporting.
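As a sketch, tagging an applied quota through the Service Quotas API might look like the following; the quota ARN is a placeholder that you can look up from the QuotaArn field returned by list_service_quotas:

import boto3

sq = boto3.client("service-quotas")

# Placeholder ARN; retrieve the real QuotaArn with list_service_quotas(ServiceCode="textract")
quota_arn = "arn:aws:servicequotas:us-east-2:111122223333:textract/L-XXXXXXXX"

sq.tag_resource(
    ResourceARN=quota_arn,
    Tags=[{"Key": "environment", "Value": "production"}],
)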

Amazon Textract Service Quota Calculator

We’re introducing a new quota calculator on the Amazon Textract console. The quota calculator helps forecast service quota requirements based on answers to questions about your workload and usage of Amazon Textract. With calculations based on your usage patterns, such as number of documents and number of pages per document, it provides actionable recommendations in the form of a required quota value for the workload.

As shown in the following screenshot, the quota calculator is now accessible directly from the Amazon Textract console. You can also navigate to the Service Quotas console directly from the calculator, where you can manage the service quotas based on the calculated recommendations.

Amazon Textract Quota Calculator

Quota calculator for synchronous operations

To view the current quota values and recommended quota values for synchronous operations, you start by selecting Synchronous under Processing type. For example, if you’re interested in calculating the desired quota values for your workload that uses the DetectDocumentText API, you select the Synchronous processing type, and then choose Detect Document Text on the Use case type drop-down menu.

Amazon Quota Calculator sync calculation

After you specify your desired options, the quota calculator prompts for additional inputs, including the maximum number of documents you expect to process via the API per day or per hour. The corresponding number of documents to be processed, shown under View calculation, is calculated automatically based on this input. Because synchronous processing allows text detection and analysis of single-page documents only, the number of pages per document defaults to 1. For multi-page documents, we recommend using asynchronous processing.

Amazon Quota Calculator sync input usage values

The output of this calculation is a current quota value applicable for that account in the current Region, and the recommended quota value, based on the quota type selected and the provided number of documents.

Amazon Quota Calculator sync calculation output
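As a rough back-of-the-envelope illustration of this type of calculation (the console calculator's exact logic may differ), processing a peak of 36,000 single-page documents per hour with DetectDocumentText works out to a required quota of about 10 transactions per second:

# Illustrative estimate only; the quota calculator's exact formula may differ
max_documents_per_hour = 36_000
pages_per_document = 1                     # synchronous APIs process single-page documents
required_tps = (max_documents_per_hour * pages_per_document) / 3600
print(required_tps)                        # 10.0 -> request a DetectDocumentText quota of at least 10 TPS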

You can copy the recommended quota value within the calculator and use the Quota type (in this case, DetectDocumentText) deep link to navigate to the specific quota on the Service Quotas console to create a quota increase request.

Quota calculator for asynchronous operations

Viewing the current and recommended quota values for asynchronous operations is similar to the synchronous case. Specify the use case type for your asynchronous operation usage, and answer a few questions relevant to your workload to view the current and recommended quotas for all the asynchronous operations relevant to that use case.

For example, if your workload runs asynchronous jobs using the StartDocumentTextDetection API and then calls the GetDocumentTextDetection API to retrieve the results, choose the Document Text Detection option as your use case. Because these two APIs are always used in conjunction with each other, the calculator provides recommendations for both. For asynchronous operations, there are also limits on the total number of concurrent jobs that can run per account in a given Region, so the calculator additionally recommends the total number of concurrent asynchronous jobs for your workload.

Amazon Quota Calculator Async calculation

In addition to the processing type and use case type, you need to provide specific values relevant to your workload:

  • The maximum number of documents you expect to process
  • A processing time frame value in hours, which is the approximate length of time over which you expect to process the documents
  • The maximum number of pages per document, because asynchronous operations allow processing multi-page documents

Amazon Quota Calculator Async calculation input usage values

Quota calculation for asynchronous operations generates recommended quota values for all the asynchronous APIs relevant to the selected use case. In our example, the quota values for the StartDocumentTextDetection API, GetDocumentTextDetection API, and number of concurrent text detection jobs are generated by the calculator, as shown in the following screenshot. You can then use the required quota value to request quota increases via the Service Quotas console using the corresponding deep links under Quota type.

Amazon Quota Calculator Async calculation output
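As a rough illustration of the Start quota estimate only (the calculator's exact formulas, particularly for the Get quota and the concurrent job count, depend on job duration and polling behavior and may differ), spreading 100,000 documents over a 4-hour time frame implies roughly 7 StartDocumentTextDetection calls per second:

# Illustrative estimate only; the quota calculator's exact formulas may differ
documents = 100_000
time_frame_hours = 4
start_tps = documents / (time_frame_hours * 3600)
print(round(start_tps, 2))                 # ~6.94 -> request at least 7 TPS for StartDocumentTextDetection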

Note that all the quota-related information within the calculator is shown for the Region currently selected in the AWS Management Console. To view the quota information for a different Region, change the Region from the top navigation bar of the console. Recommendations generated by the calculator are based on the current applied quota for that account in the current Region, the selected processing type (synchronous or asynchronous), and other information relevant to your workload. You can use these recommendations to submit quota increase requests via the Service Quotas console. Although most requests are processed automatically, some may need additional manual review before they're approved.

Conclusion

In this post, we announced the updated default service quotas in select AWS Regions and the self-service quota management capabilities of Amazon Textract. We also announced the availability of a new quota calculator, available on the Amazon Textract console. You can start taking advantage of the new default service quotas, and use the Amazon Textract quota calculator to generate recommended quota values to quickly scale your workload. With the improved Service Quotas console for Amazon Textract, you can request quota increases, monitor quota utilization and service usage, and set up alarms. With the features announced in this post, you can now easily monitor your quota utilization, manage costs, and follow best practices to scale your Amazon Textract usage.

To learn more about the Amazon Textract service quota calculator and extended features for quota management, visit Quotas in Amazon Textract.


About the authors

Anjan Biswas is a Senior AI Services Solutions Architect with a focus on AI/ML and data analytics. Anjan is part of the worldwide AI services team and works with customers to help them understand and develop solutions to business problems with AI and ML. Anjan has over 14 years of experience working with global supply chain, manufacturing, and retail organizations, and is actively helping customers get started and scale on AWS AI services.

Shashwat Sapre is a Senior Technical Product Manager with the Amazon Textract team. He is focused on building machine learning-based services for AWS customers. In his spare time, he likes reading about new technologies, traveling and exploring different cuisines.

Read More

Large-scale revenue forecasting at Bosch with Amazon Forecast and Amazon SageMaker custom models

This post is co-written by Goktug Cinar, Michael Binder, and Adrian Horvath from Bosch Center for Artificial Intelligence (BCAI).

Revenue forecasting is a challenging yet crucial task for strategic business decisions and fiscal planning in most organizations. Often, revenue forecasting is manually performed by financial analysts and is both time consuming and subjective. Such manual efforts are especially challenging for large-scale, multinational business organizations that require revenue forecasts across a wide range of product groups and geographical areas at multiple levels of granularity. This requires not only accuracy but also hierarchical coherence of the forecasts.

Bosch is a multinational corporation with entities operating in multiple sectors, including automotive, industrial solutions, and consumer goods. Given the impact of accurate and coherent revenue forecasting on healthy business operations, the Bosch Center for Artificial Intelligence (BCAI) has been heavily investing in the use of machine learning (ML) to improve the efficiency and accuracy of financial planning processes. The goal is to alleviate the manual processes by providing reasonable baseline revenue forecasts via ML, with only occasional adjustments needed by the financial analysts using their industry and domain knowledge.

To achieve this goal, BCAI has developed an internal forecasting framework capable of providing large-scale hierarchical forecasts via customized ensembles of a wide range of base models. A meta-learner selects the best-performing models based on features extracted from each time series. The forecasts from the selected models are then averaged to obtain the aggregated forecast. The architectural design is modularized and extensible through the implementation of a REST-style interface, which allows continuous performance improvement via the inclusion of additional models.

BCAI partnered with the Amazon ML Solutions Lab (MLSL) to incorporate the latest advances in deep neural network (DNN)-based models for revenue forecasting. Recent advances in neural forecasters have demonstrated state-of-the-art performance for many practical forecasting problems. Compared to traditional forecasting models, many neural forecasters can incorporate additional covariates or metadata of the time series. We include CNN-QR and DeepAR+, two off-the-shelf models in Amazon Forecast, as well as a custom Transformer model trained using Amazon SageMaker. The three models cover a representative set of the encoder backbones often used in neural forecasters: convolutional neural network (CNN), sequential recurrent neural network (RNN), and transformer-based encoders.

One of the key challenges faced by the BCAI-MLSL partnership was to provide robust and reasonable forecasts under the impact of COVID-19, an unprecedented global event causing great volatility on global corporate financial results. Because neural forecasters are trained on historical data, the forecasts generated based on out-of-distribution data from the more volatile periods could be inaccurate and unreliable. Therefore, we proposed the addition of a masked attention mechanism in the Transformer architecture to address this issue.

The neural forecasters can be bundled as a single ensemble model, or incorporated individually into Bosch’s model universe, and accessed easily via REST API endpoints. We propose an approach to ensemble the neural forecasters through backtest results, which provides competitive and robust performance over time. Additionally, we investigated and evaluated a number of classical hierarchical reconciliation techniques to ensure that forecasts aggregate coherently across product groups, geographies, and business organizations.

In this post, we demonstrate the following:

  • How to apply Forecast and SageMaker custom model training for hierarchical, large-scale time-series forecasting problems
  • How to ensemble custom models with off-the-shelf models from Forecast
  • How to reduce the impact of disruptive events such as COVID-19 on forecasting problems
  • How to build an end-to-end forecasting workflow on AWS

Challenges

We addressed two challenges: creating hierarchical, large-scale revenue forecasting, and the impact of the COVID-19 pandemic on long-term forecasting.

Hierarchical, large-scale revenue forecasting

Financial analysts are tasked with forecasting key financial figures, including revenue, operational costs, and R&D expenditures. These metrics provide business planning insights at different levels of aggregation and enable data-driven decision-making. Any automated forecasting solution needs to provide forecasts at any arbitrary level of business-line aggregation. At Bosch, the aggregations can be viewed as grouped time series, a more general form of hierarchical structure. The following figure shows a simplified example with a two-level structure, which mimics the hierarchical revenue forecasting structure at Bosch. The total revenue is split into multiple levels of aggregation based on product and region.

The total number of time series that need to be forecasted at Bosch is at the scale of millions. Notice that the top-level time series can be split by either products or regions, creating multiple paths to the bottom level forecasts. The revenue needs to be forecasted at every node in the hierarchy with a forecasting horizon of 12 months into the future. Monthly historical data is available.

The hierarchical structure can be represented with the notation of a summing matrix S (Hyndman and Athanasopoulos):

y_t = S b_t

Here, y_t is the vector that stacks the time series at all levels of aggregation at time t, b_t is the vector of the bottom-level time series at time t, and S is the summing matrix that encodes how the bottom-level series aggregate up to every node in the hierarchy.

Impacts of the COVID-19 pandemic

The COVID-19 pandemic brought significant challenges for forecasting due to its disruptive and unprecedented effects on almost all aspects of work and social life. For long-term revenue forecasting, the disruption also brought unexpected downstream impacts. To illustrate this problem, the following figure shows a sample time series where the product revenue experienced a significant drop at the start of the pandemic and gradually recovered afterwards. A typical neural forecasting model will take revenue data including the out-of-distribution (OOD) COVID period as the historical context input, as well as the ground truth for model training. As a result, the forecasts produced are no longer reliable.

Modeling approaches

In this section, we discuss our various modeling approaches.

Amazon Forecast

Forecast is a fully-managed AI/ML service from AWS that provides preconfigured, state-of-the-art time series forecasting models. It combines these offerings with its internal capabilities for automated hyperparameter optimization, ensemble modeling (for the models provided by Forecast), and probabilistic forecast generation. This allows you to easily ingest custom datasets, preprocess data, train forecasting models, and generate robust forecasts. The service’s modular design further enables us to easily query and combine predictions from additional custom models developed in parallel.

We incorporate two neural forecasters from Forecast: CNN-QR and DeepAR+. Both are supervised deep learning methods that train a global model for the entire time series dataset. Both the CNN-QR and DeepAR+ models can take in static metadata about each time series, which in our case is the corresponding product, region, and business organization. They also automatically add temporal features such as month of the year as part of the model input.
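As a minimal sketch of how one of these predictors could be trained outside the full workflow, the following Boto3 call creates a DeepAR+ predictor with HPO enabled for a 12-month horizon; the dataset group ARN is a placeholder, and CNN-QR can be selected by swapping the algorithm ARN:

import boto3

forecast = boto3.client("forecast")

response = forecast.create_predictor(
    PredictorName="revenue-deepar-plus",
    AlgorithmArn="arn:aws:forecast:::algorithm/Deep_AR_Plus",   # use .../CNN-QR for CNN-QR
    ForecastHorizon=12,                                         # 12-month forecast horizon
    PerformHPO=True,
    InputDataConfig={"DatasetGroupArn": "arn:aws:forecast:us-east-1:111122223333:dataset-group/revenue"},
    FeaturizationConfig={"ForecastFrequency": "M"},             # monthly data
)
print(response["PredictorArn"])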

Transformer with attention masks for COVID

The Transformer architecture (Vaswani et al.), originally designed for natural language processing (NLP), has recently emerged as a popular architectural choice for time series forecasting. Here, we used the Transformer architecture described in Zhou et al. without probabilistic log sparse attention. The model uses a typical architectural design combining an encoder and a decoder. For revenue forecasting, we configure the decoder to directly output the forecast for the 12-month horizon instead of generating the forecast month by month in an autoregressive manner. Based on the frequency of the time series, additional time-related features such as month of the year are added as input variables. Additional categorical variables describing the meta information (product, region, business organization) are fed into the network via a trainable embedding layer.

The following diagram illustrates the Transformer architecture and the attention masking mechanism. Attention masking is applied throughout all the encoder and decoder layers, as highlighted in orange, to prevent OOD data from affecting the forecasts.

We mitigate the impact of OOD context windows by adding attention masks. The model is trained to apply very little attention to the COVID period that contains outliers via masking, and performs forecasting with masked information. The attention mask is applied throughout every layer of the decoder and encoder architecture. The masked window can be either specified manually or through an outlier detection algorithm. Additionally, when using a time window containing outliers as the training labels, the losses are not back-propagated. This attention masking-based method can be applied to handle disruptions and OOD cases brought by other rare events and improve the robustness of the forecasts.
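The following PyTorch sketch is not the exact model used in this work, but it illustrates the mechanism: a boolean mask marking the outlier months is passed to a standard Transformer encoder so that no attention is paid to those positions:

import torch
import torch.nn as nn

d_model, context_len, batch = 64, 18, 8

encoder = nn.TransformerEncoder(
    nn.TransformerEncoderLayer(d_model=d_model, nhead=4, batch_first=True),
    num_layers=2,
)

x = torch.randn(batch, context_len, d_model)             # embedded monthly inputs
mask = torch.zeros(batch, context_len, dtype=torch.bool)
mask[:, 4:6] = True                                       # e.g., positions covering April-May 2020

# Positions where the mask is True are ignored by the attention layers
encoded = encoder(x, src_key_padding_mask=mask)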

Model ensemble

Model ensembles often outperform single models for forecasting; they improve model generalizability and are better at handling time series data with varying characteristics in periodicity and intermittency. We incorporate a series of model ensemble strategies to improve model performance and the robustness of forecasts. One common form of deep learning model ensemble is to aggregate results from model runs with different random weight initializations, or from different training epochs. We utilize this strategy to obtain forecasts for the Transformer model.

To further build an ensemble on top of different model architectures, such as Transformer, CNN-QR, and DeepAR+, we use a pan-model ensemble strategy that selects the top-k best-performing models for each time series based on the backtest results and averages their forecasts. Because backtest results can be exported directly from trained Forecast models, this strategy enables us to take advantage of turnkey services like Forecast alongside the improvements gained from custom models such as the Transformer. Such an end-to-end model ensemble approach doesn't require training a meta-learner or calculating time series features for model selection.
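A minimal sketch of this pan-model ensemble, assuming backtest MAAPE scores and per-model forecasts have already been collected into pandas objects (names and shapes are illustrative):

import numpy as np
import pandas as pd

def ensemble_top_k(backtest_scores: pd.DataFrame, forecasts: dict, k: int = 2) -> pd.DataFrame:
    """backtest_scores: rows = time series IDs, columns = model names, values = backtest MAAPE (lower is better).
    forecasts: model name -> DataFrame of forecasts (rows = time series IDs, columns = horizon months)."""
    ensembled = {}
    for ts_id, scores in backtest_scores.iterrows():
        top_models = scores.nsmallest(k).index                      # best k models for this series
        ensembled[ts_id] = np.mean(
            [forecasts[m].loc[ts_id].values for m in top_models], axis=0
        )
    return pd.DataFrame.from_dict(ensembled, orient="index")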

Hierarchical reconciliation

The framework is adaptive and can incorporate a wide range of techniques as postprocessing steps for hierarchical forecast reconciliation, including bottom-up (BU), top-down reconciliation with forecasting proportions (TDFP), ordinary least squares (OLS), and weighted least squares (WLS). All the experimental results in this post are reported using top-down reconciliation with forecasting proportions.
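For example, OLS reconciliation (one of the techniques listed above) projects the base forecasts for all nodes onto the coherent subspace defined by the summing matrix S; the sketch below follows the standard formulation in Hyndman and Athanasopoulos:

import numpy as np

def ols_reconcile(S: np.ndarray, y_hat: np.ndarray) -> np.ndarray:
    """S: (n_total_series, n_bottom_series) summing matrix; y_hat: base forecasts for all nodes."""
    b_tilde = np.linalg.solve(S.T @ S, S.T @ y_hat)   # implied bottom-level forecasts
    return S @ b_tilde                                 # coherent forecasts for every node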

Architecture overview

We developed an automated end-to-end workflow on AWS to generate revenue forecasts utilizing services including Forecast, SageMaker, Amazon Simple Storage Service (Amazon S3), AWS Lambda, AWS Step Functions, and AWS Cloud Development Kit (AWS CDK). The deployed solution provides individual time series forecasts through a REST API using Amazon API Gateway, by returning the results in predefined JSON format.

The following diagram illustrates the end-to-end forecasting workflow.

Key design considerations for the architecture are versatility, performance, and user-friendliness. The system should be versatile enough to incorporate a diverse set of algorithms during development and deployment with minimal changes, and should be easily extensible when new algorithms are added in the future. The system should also add minimal overhead and support parallelized training for both Forecast and SageMaker to reduce training time and obtain the latest forecast faster. Finally, the system should be simple to use for experimentation purposes.

The end-to-end workflow sequentially runs through the following modules:

  1. A preprocessing module for data reformatting and transformation
  2. A model training module incorporating both the Forecast model and custom model on SageMaker (both are running in parallel)
  3. A postprocessing module supporting model ensemble, hierarchical reconciliation, metrics, and report generation

Step Functions organizes and orchestrates the workflow from end to end as a state machine. The state machine run is configured with a JSON file containing all the necessary information, including the location of the historical revenue CSV files in Amazon S3, the forecast start time, and the model hyperparameter settings for the end-to-end workflow. Asynchronous calls are created to parallelize model training in the state machine using Lambda functions. All the historical data, config files, forecast results, and intermediate results such as backtest results are stored in Amazon S3. The REST API is built on top of Amazon S3 to provide a queryable interface for the forecasting results. The system can be extended to incorporate new forecast models and supporting functions such as generating forecast visualization reports.
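A minimal sketch of kicking off one run of the workflow follows; the state machine ARN and the config keys are placeholders, because the actual config schema is defined by the deployed solution:

import json
import boto3

sfn = boto3.client("stepfunctions")

config = {
    "input_data_s3_uri": "s3://my-bucket/revenue/history.csv",   # placeholder keys and values
    "forecast_start": "2022-05",
    "models": ["CNN-QR", "DeepAR+", "Transformer"],
}

sfn.start_execution(
    stateMachineArn="arn:aws:states:us-east-1:111122223333:stateMachine:forecast-workflow",
    input=json.dumps(config),
)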

Evaluation

In this section, we detail the experiment setup. Key components include the dataset, evaluation metrics, backtest windows, and model setup and training.

Dataset

To protect the financial privacy of Bosch while using a meaningful dataset, we used a synthetic dataset that has similar statistical characteristics to a real-world revenue dataset from one business unit at Bosch. The dataset contains 1,216 time series in total, with revenue recorded at a monthly frequency covering January 2016 to April 2022. The dataset is delivered with 877 time series at the most granular level (bottom time series), with the corresponding grouped time series structure represented as a summing matrix S. Each time series is associated with three static categorical attributes, which correspond to product category, region, and organizational unit in the real dataset (anonymized in the synthetic data).

Evaluation metrics

We use median-Mean Arctangent Absolute Percentage Error (median-MAAPE) and weighted-MAAPE to evaluate model performance and perform comparative analysis; these are the standard metrics used at Bosch. MAAPE addresses the shortcomings of the Mean Absolute Percentage Error (MAPE) metric commonly used in business contexts. Median-MAAPE gives an overview of the model performance by computing the median of the MAAPEs calculated individually for each time series. Weighted-MAAPE reports a weighted combination of the individual MAAPEs, where the weights are the proportion of the revenue for each time series relative to the aggregated revenue of the entire dataset. Weighted-MAAPE better reflects the downstream business impact of forecasting accuracy. Both metrics are reported on the entire dataset of 1,216 time series.
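A small sketch of these metrics, assuming per-series actuals, forecasts, and revenue weights are available as NumPy arrays:

import numpy as np

def maape(actual: np.ndarray, forecast: np.ndarray) -> float:
    # Mean arctangent absolute percentage error for a single time series;
    # arctan keeps the error bounded even when actuals are close to zero
    return float(np.mean(np.arctan(np.abs((actual - forecast) / actual))))

def median_maape(per_series_maape: np.ndarray) -> float:
    return float(np.median(per_series_maape))

def weighted_maape(per_series_maape: np.ndarray, series_revenue: np.ndarray) -> float:
    weights = series_revenue / series_revenue.sum()
    return float(np.sum(weights * per_series_maape))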

Backtest windows

We use rolling 12-month backtest windows to compare model performance. The following figure illustrates the backtest windows used in the experiments and highlights the corresponding data used for training and hyperparameter optimization (HPO). For backtest windows after the start of COVID-19, the results are affected by OOD inputs from April to May 2020, based on what we observed in the revenue time series.

Model setup and training

For Transformer training, we used quantile loss and scaled each time series by its historical mean value before feeding it into the Transformer and computing the training loss. The final forecasts are rescaled back to calculate the accuracy metrics, using the MeanScaler implemented in GluonTS. We use a context window with monthly revenue data from the past 18 months, selected via HPO in the backtest window from July 2018 to June 2019. Additional metadata about each time series, in the form of static categorical variables, is fed into the model via an embedding layer before being passed to the transformer layers. We train the Transformer with five different random weight initializations and average the forecast results from the last three epochs for each run, in total averaging 15 models. The five model training runs can be parallelized to reduce training time. For the masked Transformer, we mark the months from April to May 2020 as outliers.

For all Forecast model training, we enabled automatic HPO, which selects the model and training parameters based on a user-specified backtest period; we set this period to the last 12 months of the data window used for training and HPO.

Experiment results

We trained masked and unmasked Transformers using the same set of hyperparameters and compared their performance for backtest windows immediately after the COVID-19 shock. In the masked Transformer, the two masked months are April and May 2020. The following table shows the results from a series of backtest periods with 12-month forecasting windows starting from June 2020. We can observe that the masked Transformer consistently outperforms the unmasked version.

We further evaluated the model ensemble strategy based on backtest results. In particular, we compare the case where only the top-performing model is selected with the case where the top two performing models are selected and their forecasts are averaged. We compare the performance of the base models and the ensemble models in the following figures. Notice that none of the neural forecasters consistently outperforms the others across the rolling backtest windows.

The following table shows that, on average, ensemble modeling of the top two models gives the best performance. CNN-QR provides the second-best result.

Conclusion

This post demonstrated how to build an end-to-end ML solution for large-scale forecasting problems combining Forecast and a custom model trained on SageMaker. Depending on your business needs and ML knowledge, you can use a fully managed service such as Forecast to offload the build, train, and deployment process of a forecasting model; build your custom model with specific tuning mechanisms with SageMaker; or perform model ensembling by combining the two services.

If you would like help accelerating the use of ML in your products and services, please contact the Amazon ML Solutions Lab program.

References

Hyndman RJ, Athanasopoulos G. Forecasting: principles and practice. OTexts; 2018 May 8.

Vaswani A, Shazeer N, Parmar N, Uszkoreit J, Jones L, Gomez AN, Kaiser Ł, Polosukhin I. Attention is all you need. Advances in neural information processing systems. 2017;30.

Zhou H, Zhang S, Peng J, Zhang S, Li J, Xiong H, Zhang W. Informer: Beyond efficient transformer for long sequence time-series forecasting. InProceedings of AAAI 2021 Feb 2.


About the Authors

Goktug Cinar is a lead ML scientist and the technical lead of the ML and stats-based forecasting at Robert Bosch LLC and Bosch Center for Artificial Intelligence. He leads the research of the forecasting models, hierarchical consolidation, and model combination techniques as well as the software development team which scales these models and serves them as part of the internal end-to-end financial forecasting software.

Michael Binder is a product owner at Bosch Global Services, where he coordinates the development, deployment, and implementation of the company-wide predictive analytics application for large-scale, automated, data-driven forecasting of financial key figures.

Adrian Horvath is a Software Developer at Bosch Center for Artificial Intelligence, where he develops and maintains systems to create predictions based on various forecasting models.

Panpan Xu is a Senior Applied Scientist and Manager with the Amazon ML Solutions Lab at AWS. She is working on research and development of Machine Learning algorithms for high-impact customer applications in a variety of industrial verticals to accelerate their AI and cloud adoption. Her research interest includes model interpretability, causal analysis, human-in-the-loop AI and interactive data visualization.

Jasleen Grewal is an Applied Scientist at Amazon Web Services, where she works with AWS customers to solve real world problems using machine learning, with special focus on precision medicine and genomics. She has a strong background in bioinformatics, oncology, and clinical genomics. She is passionate about using AI/ML and cloud services to improve patient care.

Selvan Senthivel is a Senior ML Engineer with the Amazon ML Solutions Lab at AWS, focusing on helping customers on machine learning, deep learning problems, and end-to-end ML solutions. He was a founding engineering lead of Amazon Comprehend Medical and contributed to the design and architecture of multiple AWS AI services.

Ruilin Zhang is an SDE with the Amazon ML Solutions Lab at AWS. He helps customers adopt AWS AI services by building solutions to address common business problems.

Shane Rai is a Sr. ML Strategist with the Amazon ML Solutions Lab at AWS. He works with customers across a diverse spectrum of industries to solve their most pressing and innovative business needs using AWS’s breadth of cloud-based AI/ML services.

Lin Lee Cheong is an Applied Science Manager with the Amazon ML Solutions Lab team at AWS. She works with strategic AWS customers to explore and apply artificial intelligence and machine learning to discover new insights and solve complex problems.

Read More

Detect population variance of endangered species using Amazon Rekognition

Our planet faces a global extinction crisis. A UN report shows that a staggering number of species, more than a million, are feared to be on the path to extinction. The most common reasons for extinction include loss of habitat, poaching, and invasive species. Several wildlife conservation foundations, research scientists, volunteers, and anti-poaching rangers have been working tirelessly to address this crisis. Having accurate and regular information about endangered animals in the wild will improve wildlife conservationists’ ability to study and conserve endangered species. Wildlife scientists and field staff use cameras equipped with infrared triggers, called camera traps, and place them in the most effective locations in forests to capture images of wildlife. These images are then manually reviewed, which is a very time-consuming process.

In this post, we demonstrate a solution that uses Amazon Rekognition Custom Labels along with motion sensor camera traps to automate the process of recognizing endangered species and studying them. Rekognition Custom Labels is a fully managed computer vision service that allows developers to build custom models to classify and identify objects in images that are specific and unique to their use case. We detail how to recognize endangered animal species from images collected from camera traps, draw insights about their population count, and detect humans around them. This information will be helpful to conservationists, who can make proactive decisions to save these species.

Solution overview

The following diagram illustrates the architecture of the solution.
This solution uses the following AI services, serverless technologies, and managed services to implement a scalable and cost-effective architecture:

  • Amazon Athena – A serverless interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL
  • Amazon CloudWatch – A monitoring and observability service that collects monitoring and operational data in the form of logs, metrics, and events
  • Amazon DynamoDB – A key-value and document database that delivers single-digit millisecond performance at any scale
  • AWS Lambda – A serverless compute service that lets you run code in response to triggers such as changes in data, shifts in system state, or user actions
  • Amazon QuickSight – A serverless, machine learning (ML)-powered business intelligence service that provides insights, interactive dashboards, and rich analytics
  • Amazon Rekognition – Uses ML to identify objects, people, text, scenes, and activities in images and videos, as well as detect any inappropriate content
  • Amazon Rekognition Custom Labels – Uses AutoML to help train custom models to identify the objects and scenes in images that are specific to your business needs
  • Amazon Simple Queue Service (Amazon SQS) – A fully managed message queuing service that enables you to decouple and scale microservices, distributed systems, and serverless applications
  • Amazon Simple Storage Service (Amazon S3) – Serves as an object store for documents and allows for central management with fine-tuned access controls.

The high-level steps in this solution are as follows:

  1. Train and build a custom model using Rekognition Custom Labels to recognize endangered species in the area. For this post, we train on images of rhinoceros.
  2. Images that are captured through the motion sensor camera traps are uploaded to an S3 bucket, which publishes an event for every uploaded image.
  3. A Lambda function is triggered for every event published, which retrieves the image from the S3 bucket and passes it to the custom model to detect the endangered animal.
  4. The Lambda function uses the Amazon Rekognition API to identify the animals in the image.
  5. If the image contains an endangered species (rhinoceros in our example), the function updates the DynamoDB database with the animal count, the date the image was captured, and other useful metadata that can be extracted from the image EXIF header.
  6. QuickSight is used to visualize the animal count and location data collected in the DynamoDB database to understand the variance of the animal population over time. By looking at the dashboards regularly, conservation groups can identify patterns and isolate probable causes like diseases, climate, or poaching that could be causing this variance and proactively take steps to address the issue.

Prerequisites

A good training set is required to build an effective model using Rekognition Custom Labels. We have used the images from AWS Marketplace (Animals & Wildlife Data Set from Shutterstock) and Kaggle to build the model.

Implement the solution

Our workflow includes the following steps:

  1. Train a custom model to classify the endangered species (rhino in our example) using the AutoML capability of Rekognition Custom Labels.

You can also perform these steps from the Rekognition Custom Labels console. For instructions, refer to Creating a project, Creating training and test datasets, and Training an Amazon Rekognition Custom Labels model.

In this example, we use the dataset from Kaggle. The following table summarizes the dataset contents.

Label Training Set Test Set
Lion 625 156
Rhino 608 152
African_Elephant 368 92
  2. Upload the pictures captured from the camera traps to a designated S3 bucket.
  3. Define the event notifications in the Permissions section of the S3 bucket to send a notification to a defined SQS queue when an object is added to the bucket.

Define event notification

The upload action triggers an event that is queued in Amazon SQS using the Amazon S3 event notification.

  4. Add the appropriate permissions via the access policy of the SQS queue to allow the S3 bucket to send the notification to the queue.


  5. Configure a Lambda trigger for the SQS queue so the Lambda function is invoked when a new message is received.

Lambda trigger

  6. Modify the access policy to allow the Lambda function to access the SQS queue.

Lambda function access policy

The Lambda function should now have the right permissions to access the SQS queue.

Lambda function permissions

  7. Set up the environment variables so they can be accessed in the code.

Environment variables

Lambda function code

The Lambda function performs the following tasks on receiving a notification from the SQS queue:

  1. Make an API call to Amazon Rekognition to detect labels from the custom model that identify the endangered species:
const AWS = require('aws-sdk');
const exifReader = require('exif-reader');
const sharp = require('sharp');

// These values should match the Lambda environment variables configured earlier
const REGION = process.env.REGION;
const REK_CUSTOMMODEL = process.env.REK_CUSTOMMODEL;
const MIN_CONFIDENCE = parseInt(process.env.MIN_CONFIDENCE, 10);
const HUMAN = process.env.HUMAN;                      // label name used to flag a person, for example 'Person'
const ANIMAL_TABLENAME = process.env.ANIMAL_TABLENAME;
const dynamo = new AWS.DynamoDB({ region: REGION });

exports.handler = async (event) => {
    const id = AWS.util.uuid.v4();
    // Note: if the function is triggered via SQS rather than directly by S3, the S3 event is nested inside the SQS message body
    const bucket = event.Records[0].s3.bucket.name;
    const photo = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
    const client = new AWS.Rekognition({ region: REGION });
    const paramsCustomLabel = {
        Image: {
            S3Object: {
                Bucket: bucket,
                Name: photo
            },
        },
        ProjectVersionArn: REK_CUSTOMMODEL,
        MinConfidence: MIN_CONFIDENCE
    };
    let response = await client.detectCustomLabels(paramsCustomLabel).promise();
    console.log("Rekognition customLabels response = ", response);
  2. Fetch the EXIF tags from the image to get the date the picture was taken and other relevant EXIF data. The following code uses these dependencies (package – version): exif-reader – ^1.0.3, sharp – ^0.30.7:
const getExifMetaData = async (bucket, key) => {
    return new Promise((resolve) => {
        const s3 = new AWS.S3({ region: REGION });
        const param = {
            Bucket: bucket,
            Key: key
        };

        s3.getObject(param, (error, data) => {
            if (error) {
                console.log("Error getting S3 file", error);
                resolve({ status: false, errorText: error.message });
            } else {
                // Extract the raw EXIF buffer with sharp, then parse it with exif-reader
                sharp(data.Body)
                    .metadata()
                    .then(({ exif }) => {
                        const exifProperties = exifReader(exif);
                        resolve({ status: true, exifProp: exifProperties });
                    })
                    .catch(err => { console.log("Error Processing Exif "); resolve({ status: false }); });
            }
        });
    });
};

var gpsData = "";
var createDate = "";
const imageS3 = await getExifMetaData(bucket, photo);
if (imageS3.status) {
    gpsData = imageS3.exifProp.gps;
    createDate = imageS3.exifProp.image.CreateDate;
} else {
    // Fall back to the S3 event time when the image has no EXIF data
    createDate = event.Records[0].eventTime;
    console.log("No exif found in image, setting createDate as the date of event", createDate);
}

The solution outlined here is asynchronous; the images are captured by the camera traps and then at a later time uploaded to an S3 bucket for processing. If the camera trap images are uploaded more frequently, you can extend the solution to detect humans in the monitored area and send notifications to concerned activists to indicate possible poaching in the vicinity of these endangered animals. This is implemented through the Lambda function that calls the Amazon Rekognition API to detect labels for the presence of a human. If a human is detected, an error message is logged to CloudWatch Logs. A filtered metric on the error log triggers a CloudWatch alarm that sends an email to the conservation activists, who can then take further action.

  3. Expand the solution with the following code:
const paramHumanLabel = {
    Image: {
        S3Object: {
            Bucket: bucket,
            Name: photo
        },
    },
    MinConfidence: MIN_CONFIDENCE
};

// Use the standard DetectLabels API to check whether a human is present in the image
let humanLabel = await client.detectLabels(paramHumanLabel).promise();
let humanFound = humanLabel.Labels.filter(obj => obj.Name === HUMAN);
var humanDetected = false;
if (humanFound.length > 0) {
    // Logging at error level triggers the CloudWatch metric filter and alarm described above
    console.error("Human Face Detected");
    humanDetected = true;
}
  4. If any endangered species is detected, the Lambda function updates DynamoDB with the count, date, and other optional metadata obtained from the image EXIF tags:
let dbresponse = await dynamo.putItem({
    Item: {
        id: { S: id },
        type: { S: response.CustomLabels[0].Name },
        image: { S: photo },
        createDate: { S: createDate.toString() },
        confidence: { S: response.CustomLabels[0].Confidence.toString() },
        gps: { S: gpsData.toString() },
        humanDetected: { BOOL: humanDetected }
    },

    TableName: ANIMAL_TABLENAME,
}).promise();

Query and visualize the data

You can now use Athena and QuickSight to visualize the data.

  1. Set the DynamoDB table as the data source for Athena.
  2. Add the data source details.

The next important step is to define a Lambda function that connects to the data source.

  3. Choose Create Lambda function.

Lambda function

  4. Enter names for AthenaCatalogName and SpillBucket; the rest can be default settings.
  5. Deploy the connector function.

Lambda connector

After all the images are processed, you can use QuickSight to visualize the data for the population variance over time from Athena.

  6. On the Athena console, choose a data source and enter the details.
  7. Choose Create Lambda function to provide a connector to DynamoDB.

Create Lambda function

  8. On the QuickSight dashboard, choose New Analysis and New Dataset.
  9. Choose Athena as the data source.

Athena as data source

  10. Enter the catalog, database, and table to connect to and choose Select.

Catalog

  11. Complete dataset creation.


The following chart shows the number of endangered species captured on a given day.

QuickSight chart

GPS data is presented as part of the EXIF tags of a captured image. Due to the sensitivity of the location of these endangered animals, our dataset didn’t have the GPS location. However, we created a geospatial chart using simulated data to show how you can visualize locations when GPS data is available.

Geospatial chart

Clean up

To avoid incurring unexpected costs, delete the resources you created for this demonstration if you no longer need them: the S3 buckets, the DynamoDB table, the QuickSight resources, the Athena data source, and the trained Rekognition Custom Labels model. You can delete them directly via their respective service consoles. Refer to Deleting an Amazon Rekognition Custom Labels model for more information about deleting the model.
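For example, the trained model can also be stopped and deleted programmatically with Boto3; the ARNs below are placeholders that you can look up on the Rekognition Custom Labels console or via describe_projects:

import boto3

rek = boto3.client("rekognition")

model_arn = "arn:aws:rekognition:us-east-1:111122223333:project/endangered-species/version/v1/1"   # placeholder
project_arn = "arn:aws:rekognition:us-east-1:111122223333:project/endangered-species/1"            # placeholder

rek.stop_project_version(ProjectVersionArn=model_arn)      # stop incurring inference charges
# Wait until the model reaches the STOPPED state before deleting it
rek.delete_project_version(ProjectVersionArn=model_arn)
rek.delete_project(ProjectArn=project_arn)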

Conclusion

In this post, we presented an automated system that identifies endangered species, records their population count, and provides insights about variance in population over time. You can also extend the solution to alert the authorities when humans (possible poachers) are in the vicinity of these endangered species. With the AI/ML capabilities of Amazon Rekognition, we can support the efforts of conservation groups to protect endangered species and their ecosystems.

For more information about Rekognition Custom Labels, refer to Getting started with Amazon Rekognition Custom Labels and Moderating content. If you’re new to Rekognition Custom Labels, you can use our Free Tier, which lasts 3 months and includes 10 free training hours per month and 4 free inference hours per month. The Amazon Rekognition Free Tier includes processing 5,000 images per month for 12 months.


About the Authors

Jyothi Goudar is a Partner Solutions Architect Manager at AWS. She works closely with global system integrator partners to enable and support customers moving their workloads to AWS.

Jay Rao is a Principal Solutions Architect at AWS. He enjoys providing technical and strategic guidance to customers and helping them design and implement solutions on AWS.

Read More