Data processing options for AI/ML

Training an accurate machine learning (ML) model requires many different steps, but few, if any, matter more than data processing. Examples of processing steps include converting data to the input format expected by the ML algorithm, rescaling and normalizing, cleaning and tokenizing text, and many more. However, data processing at scale involves considerable operational overhead: managing complex infrastructure like processing clusters, writing code to tie all the moving pieces together, and implementing security and governance. Fortunately, AWS provides a wide variety of data processing options to suit every ML workload and each team’s preferred workflow. This set of options expanded even more at AWS re:Invent 2020, so now is a good time to examine how to choose between them.

In this post, we review the primary options and provide guidance on how to select one to match your use case and how your team prefers to work with Python, Spark, SQL, and other tools. Although the discussion centers around Amazon SageMaker for ML model building, training, and hosting, it’s equally applicable to workflows where other AWS services are used for these tasks, such as Amazon Personalize or Amazon Comprehend. The main assumption we make is that the decision is being made by those in data science, ML engineering, or MLOps roles. Other factors that are important in making the decision are team experience level, and inclination for writing code and managing infrastructure. Lower experience and inclination typically map to choosing a more fully managed option instead of a less managed or “roll your own” approach.

Prerequisite: Data Lake or Lake House

Before we dive deep into the options, there is a question we must answer: how do we reconcile our chosen option with the preferred technology choices of data engineering teams? Different tools may be suited to different roles; the tools a data scientist may prefer for an ML workflow may have little overlap with the tools used by a data engineer to support analytics workloads such as reporting. The good news is that AWS makes it very easy for these roles to pick their own tools and apply them to their organization’s data without conflict. The key is to create a data lake in Amazon Simple Storage Service (Amazon S3) at the center of the organization’s architecture for all data. This separates data and compute and avoids the problem of each team having individual data silos.

With a data lake in the center of the architecture, data engineering teams can apply their own tools for analytics workloads. At the same time, data science teams can also use their own separate tools to access the same data for ML workloads. Multiple separate processing clusters run by various teams can access the same data, always keeping in mind the need to retain the raw data in Amazon S3 for all teams as a source of truth. Additionally, use of a feature store for transformed data, such as Amazon SageMaker Feature Store, by data science teams helps delineate the boundary with data engineering, as well as provide benefits such as feature discovery, sharing, updating, and reuse.

As an alternative to a “classic” data lake, the teams might build on top of a Lake House Architecture, an evolution of the concept of a data lake. Featuring support for ACID transactions, this architecture enables multiple users to concurrently insert, delete, and modify rows across tables, while still allowing other users to simultaneously run analytical queries and ML models on the same datasets. AWS Lake Formation recently added new features to support the Lake House Architecture (currently in preview).

Now that we’ve solved the conundrum of enabling data engineering and data science teams to use their separate, preferred tools without conflict, let’s examine the data processing options for ML workloads on AWS.

Options overview

In this post, we review the following processing options. They’re in categories ranked by the following formula: (user friendliness for data scientists and ML engineers) x (usefulness for ML-specific tasks).

  1. SageMaker managed features only
  2. Low (or no) code solutions with other AWS services
  3. Spark in Amazon EMR
  4. Self-managed stack with Python or R

Keep in mind that these are not mutually exclusive; you can use them in various combinations to suit your team’s preferred workflow. For example, some teams may prefer to use SQL as much as possible, while others may use Spark for some tasks in addition to Python frameworks like Pandas. Another point to consider is that some services have built-in data visualization capabilities, while others do not and require use of other services for visualization. Let’s discuss the specifics of each option.

SageMaker managed features

SageMaker is a fully managed service that helps data scientists and developers prepare, build, train, and deploy high-quality ML models quickly by bringing together a broad set of capabilities purpose-built for ML. These capabilities include robust data processing features. For data processing and data preparation, you can use either Amazon SageMaker Data Wrangler or Amazon SageMaker Processing for the processing itself, and either Amazon SageMaker Studio or SageMaker notebook instances for data visualization. You can process datasets with sizes ranging from small to very large (petabytes) with SageMaker.

SageMaker Data Wrangler is a feature of SageMaker, enabled through SageMaker Studio, that makes it easy for data scientists and ML engineers to aggregate and prepare data for ML applications using a visual interface to accelerate data cleansing, exploration, and visualization. It allows you to easily connect to various data sources such as Amazon S3 and apply built-in transformations or custom transformations written in PySpark, Pandas, or SQL.

SageMaker Processing comes built in with SageMaker, and provides you with full control of your cluster resources such as instance count, type, and storage. It includes prebuilt containers for Spark and Scikit-learn, and offers an easy path for integrating your custom containers. For a “lift and shift” of an existing workload to SageMaker, SageMaker Processing may be a good fit. The following table compares SageMaker Processing and SageMaker Data Wrangler across some key dimensions.

The SageMaker option is ranked first due to its ease of use for data scientists and ML engineers, and its usefulness for ML-specific tasks; it was built from the ground up specifically to support ML workloads. However, several other options may be useful even though they weren’t developed solely for, or dedicated specifically to, ML workloads. Let’s review those options next.

Low (or no) code

This option involves several services that are serverless: infrastructure details and management are hidden under the hood. Additionally, there might be no need to write custom code, or in some cases, any code at all. This may lead to a relatively fast path to results, while potentially causing greater workflow friction by requiring you to switch between multiple services, UIs, and tools, and sacrificing some flexibility and ability to customize. For our purposes, we consider a solution that requires SQL queries to be a low code solution, and one that doesn’t require any code, even SQL, to be a no code solution.

For example, one low code possibility involves Amazon Athena, a serverless interactive query service, for transforming data using standard SQL queries, in combination with Amazon QuickSight, a serverless BI tool that offers no code, built-in visualizations. When evaluating this powerful combination, consider whether your data transformations can be accomplished with SQL. On the visualization side, an alternative to QuickSight is to use a library such as PyAthena to run the queries in SageMaker notebooks with Python code and visualize the results there.
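As a rough illustration of a transformation that fits in SQL, the following sketch applies min-max rescaling to a numeric column. The table and column names (orders, order_id, amount) and the toy rows are hypothetical, and an in-memory SQLite database stands in for Athena so the example runs locally. PyAthena follows the Python DB-API 2.0 interface, so the same run_query helper should work with a cursor obtained from a PyAthena connection, although we have not verified this exact query against Athena.

```python
import sqlite3

# Min-max rescaling expressed in standard SQL.
# Hypothetical schema: orders(order_id, amount).
RESCALE_SQL = """
SELECT o.order_id,
       (o.amount - s.min_amount) * 1.0 / (s.max_amount - s.min_amount) AS amount_scaled
FROM orders AS o
CROSS JOIN (
    SELECT MIN(amount) AS min_amount, MAX(amount) AS max_amount FROM orders
) AS s
ORDER BY o.order_id
"""


def run_query(cursor, sql):
    """Run a query on any DB-API 2.0 cursor and return all rows."""
    cursor.execute(sql)
    return cursor.fetchall()


# Local stand-in for Athena: an in-memory SQLite database with toy rows.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, amount REAL)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)", [(1, 10.0), (2, 20.0), (3, 30.0)]
)
rows = run_query(conn.cursor(), RESCALE_SQL)
print(rows)
```

The query divides by the column range, so a real version would need to guard against a column whose minimum and maximum are equal.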

Another low code possibility involves AWS Glue, a serverless ETL service that catalogs your data and offers built-in transforms, along with the ability to write custom PySpark code. For visualizations, besides QuickSight, you can attach either SageMaker or Zeppelin notebooks to an AWS Glue development endpoint. Choosing between AWS Glue and Athena comes down to a team’s preference for using SQL versus PySpark code (in the case when AWS Glue built-in transforms don’t fully cover the desired set of data transforms).

A no code possibility is AWS Glue DataBrew, a serverless visual data preparation tool, to transform data, combined with either the SageMaker console to start model training jobs using built-in algorithms such as XGBoost, or the SageMaker Studio UI to start AutoML model training jobs with SageMaker Autopilot. With many built-in transformations and built-in visualizations, DataBrew covers both data processing and data visualization. However, if your dataset requires custom transformations other than the built-in ones, you need to pair DataBrew with another solution that allows you to write custom code. Autopilot automatically performs typical featurization of data (such as one-hot encoding of categorical values) as part of its AutoML pipeline, so you might find the set of transformations in DataBrew sufficient if paired with Autopilot. The following table provides a more detailed comparison.

Spark in Amazon EMR

Many organizations use Spark for data processing and other purposes, such as the basis for a data warehouse. In these situations, Spark clusters are typically run in Amazon EMR, a managed service for Hadoop-ecosystem clusters, which eliminates the need to do your own setup, tuning, and maintenance. From the perspective of a data scientist or ML engineer, Spark in Amazon EMR may be considered in the following circumstances:

  • Spark is already used for a data warehouse or other application with a persistent cluster. Unlike the other options we described, which only provision transient resources, Amazon EMR also enables creation of persistent clusters to support analytics applications.
  • The team already has a complete end-to-end pipeline in Spark and also the skillset and inclination to run a persistent Spark cluster for the long term. Otherwise, the SageMaker and AWS Glue options for Spark generally are preferable.

Another consideration is the wider range of instance types offered by Amazon EMR, including AWS Graviton2 processors and Amazon EC2 Spot Instances for cost optimization.

For visualization with Amazon EMR, there are several choices. To keep your primary ML workflow within SageMaker, use SageMaker Studio and its built-in SparkMagic kernel to connect to Amazon EMR. You can start to query, analyze, and process data with Spark in a few steps. For added security, you can connect to EMR clusters using Kerberos authentication. Amazon EMR also features other integrations with SageMaker, for example you can start a SageMaker model training job from a Spark pipeline in Amazon EMR. Another visualization possibility is to use Amazon EMR Studio (preview), which provides access to fully managed Jupyter notebooks, and includes the ability to log in via AWS Single Sign-On (AWS SSO). However, EMR Studio lacks the many SageMaker-specific UI integrations of SageMaker Studio.

There are other factors to consider when evaluating this option. Spark is based on the Scala/Java stack, with all the problems that entails in regard to dependency management and JVM issues that may be unfamiliar to data scientists. Also keep in mind that Spark’s PySpark API has often lagged behind its primary API in Scala, which is a language less familiar to data scientists. In this regard, if you prefer the alternative Dask framework for your workloads, you can install Dask on your EMR clusters.

Self-managed stack using Python or R

For this option, teams roll their own solutions using Amazon Elastic Compute Cloud (Amazon EC2) compute resources, or the container services Amazon Elastic Container Service (Amazon ECS) or Amazon Elastic Kubernetes Service (Amazon EKS). Integration with SageMaker is most conveniently achieved using the Amazon SageMaker Python SDK. Any machine with AWS Identity and Access Management (IAM) permissions to SageMaker can use the SageMaker Python SDK to invoke SageMaker functionality for model building, training, tuning, deployment, and more.

This option provides the most flexibility to mix and match any data processing tools and frameworks. It also offers access to the widest range of EC2 instance types and storage options. In addition to the possibility of using Spot Instances similarly to Amazon EMR, you can also use this option with the flexible pricing model of AWS Savings Plans. These plans can be applied not only to Amazon EC2 resources, but also to serverless compute AWS Lambda resources, and serverless compute engine AWS Fargate resources.

However, this option scores lower on both ranking criteria. In regard to user friendliness for data scientists and ML engineers, it requires them to manage low-level infrastructure, a task better suited to other roles. With respect to usefulness for ML-specific tasks, many frameworks and tools can be layered on top of these services to make management easier and provide specific functionality for ML workloads, but this option is still far less managed than the preceding options. It requires more personnel time to manage, tune, and maintain infrastructure and dependencies, and to write code to fill functionality gaps. As a result, this option is also likely to prove the most costly in the long run.

Review and conclusion

Your choice of a data processing option for ML workloads typically depends on your team’s preference for tools (Spark, SQL, or Python) and inclination for writing code and managing infrastructure. The following table summarizes the options across several relevant dimensions. The first column emphasizes that separate services or features may be used for processing and related visualization, and the third column refers to resources used to process data rather than for visualization, which tends to happen on lighter-weight resources.

Workloads evolve over time, and you don’t need to be locked in to one set of tools forever. You can mix and match according to your use case. When you use Amazon S3 at the center of your data lake and the fully managed SageMaker service for core ML workflow steps, it’s easy to switch tools as needed or desired to accommodate the latest technologies. Whichever option you choose now, AWS provides the flexibility to evolve your tool chain to best fit the then-current data processing needs of your ML workloads.


About the Author

Brent Rabowsky focuses on data science at AWS, and leverages his expertise to help AWS customers with their own data science projects.

Translating JSON documents using Amazon Translate

JavaScript Object Notation (JSON) is a schema-less, lightweight format for storing and transporting data. It’s a text-based, self-describing representation of structured data that is based on key-value pairs. JSON is supported either natively or through libraries in most major programming languages, and is commonly used to exchange information between web clients and web servers. Over the last 15 years, JSON has become ubiquitous on the web and is the format of choice for almost all web services.

To reach more users, you often want to localize your content and applications that may be in JSON format. This post shows you a serverless approach for easily translating JSON documents using Amazon Translate. Serverless architecture is ideal because it is event-driven and can automatically scale, making it a cost-effective solution. In this approach, JSON tags are left as they are, and the content within those tags is translated. This allows you to preserve the context of the text, so that translations can be handled with greater precision. The approach presented here was recently used by a large higher education customer of AWS for translating media documents that are in JSON format.

Amazon Translate is a neural machine translation service that delivers fast, high-quality, affordable, and customizable language translation. Neural machine translation uses deep learning models to deliver more accurate and natural-sounding translation than traditional statistical and rule-based translation algorithms. The translation service is trained on a wide variety of content across different use cases and domains to perform well on many kinds of content. Its asynchronous batch processing capability enables you to translate a large collection of text, HTML, and OOXML documents with a single API call.

In this post, we walk you through creating an automated and serverless pipeline for translating JSON documents using Amazon Translate.

Solution overview

Amazon Translate currently supports the ability to ignore tags and only translate text content in XML documents. In this solution, we therefore first convert JSON documents to XML documents, use Amazon Translate to translate the text content in the XML documents, and then convert the XML documents back to JSON.
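To make the round trip concrete, here is a minimal sketch that uses only the Python standard library. It is not the code used by the solution, which relies on the json2xml and xmltodict libraries; the helper names dict_to_xml and xml_to_dict and the sample document are ours, and we assume every JSON key is a valid XML tag name.

```python
import json
import xml.etree.ElementTree as ET


def dict_to_xml(tag, value):
    """Build an element whose tag is the JSON key and whose text is the value."""
    elem = ET.Element(tag)
    if isinstance(value, dict):
        for key, child in value.items():
            elem.append(dict_to_xml(key, child))
    elif isinstance(value, list):
        # List entries have no key of their own, so wrap each one in <item>.
        for child in value:
            elem.append(dict_to_xml("item", child))
    else:
        elem.text = str(value)
    return elem


def xml_to_dict(elem):
    """Invert dict_to_xml; every leaf value comes back as a string."""
    children = list(elem)
    if not children:
        return elem.text or ""
    if all(child.tag == "item" for child in children):
        return [xml_to_dict(child) for child in children]
    return {child.tag: xml_to_dict(child) for child in children}


doc = {"type": "home", "tags": ["a", "b"], "age": 27}
xml_text = ET.tostring(dict_to_xml("all", doc), encoding="unicode")
print(xml_text)
# A translation step would rewrite only the text between the tags here.
restored = xml_to_dict(ET.fromstring(xml_text))
print(json.dumps(restored))
```

Because XML text has no types, the number 27 comes back as the string "27". The same effect is visible in the translated sample document later in this post, where age and isAlive are strings.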

The solution uses serverless technologies and managed services to provide maximum scalability and cost-effectiveness. In addition to Amazon Translate, the solution uses the following services:

  • AWS Lambda – Runs code in response to triggers such as changes in data, changes in application state, or user actions. Because services like Amazon S3 and Amazon SNS can directly trigger a Lambda function, you can build a variety of real-time serverless data-processing systems.
  • Amazon Simple Notification Service (Amazon SNS) – Enables you to decouple microservices, distributed systems, and serverless applications with a highly available, durable, secure, fully managed publish/subscribe messaging service.
  • Amazon Simple Storage Service (Amazon S3) – Stores your documents and allows for central management with fine-tuned access controls.
  • AWS Step Functions – Coordinates multiple AWS services into serverless workflows.

Solution architecture

The architecture workflow contains the following steps:

  1. Users upload one or more JSON documents to Amazon S3.
  2. The Amazon S3 upload triggers a Lambda function.
  3. The function converts the JSON documents into XML, stores them in Amazon S3, and invokes Amazon Translate in batch mode to translate the text in the XML documents into the target language.
  4. The Step Functions-based job poller polls for the translation job to complete.
  5. Step Functions sends an SNS notification when the translation is complete.
  6. A Lambda function reads the translated XML documents in Amazon S3, converts them to JSON documents, and stores them back in Amazon S3.
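The polling in step 4 can be sketched as a plain loop. In the deployed solution, Step Functions does this work with wait states, so the following is only an illustration of the logic. The function wait_for_job and its parameters are hypothetical; get_status is any callable that returns the current job status, for example a wrapper around the JobStatus field returned by the Amazon Translate DescribeTextTranslationJob API.

```python
import time

# Job statuses after which the batch job will not change state again.
TERMINAL_STATUSES = {"COMPLETED", "COMPLETED_WITH_ERROR", "FAILED", "STOPPED"}


def wait_for_job(get_status, poll_seconds=60, max_polls=120, sleep=time.sleep):
    """Call get_status() until it returns a terminal status or polls run out."""
    for _ in range(max_polls):
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        sleep(poll_seconds)
    raise TimeoutError("translation job did not finish in time")


# Dry run with canned statuses and no real waiting.
statuses = iter(["SUBMITTED", "IN_PROGRESS", "COMPLETED"])
result = wait_for_job(lambda: next(statuses), sleep=lambda seconds: None)
print(result)
```

Passing the sleep function as a parameter keeps the loop easy to test and makes it obvious where a Step Functions wait state takes over in the real workflow.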

The following diagram illustrates this architecture.

Deploying the solution with AWS CloudFormation

The first step is to use an AWS CloudFormation template to provision the resources needed for the solution, including the AWS Identity and Access Management (IAM) roles, IAM policies, and SNS topics.

  1. Launch the AWS CloudFormation template by choosing Launch Stack (this creates the stack in the us-east-1 Region).
  2. For Stack name, enter a unique stack name for this account; for example, translate-json-document.
  3. For SourceLanguageCode, enter the language code for the current language of the JSON documents; for example, en for English.
  4. For TargetLanguageCode, enter the language code that you want your translated documents in; for example, es for Spanish.

For more information about supported languages, see Supported Languages and Language Codes.

  5. For TriggerFileName, enter the name of the file that triggers the translation serverless pipeline; the default is triggerfile.
  6. In the Capabilities and transforms section, select the check boxes to acknowledge that AWS CloudFormation will create IAM resources and transform the AWS Serverless Application Model (AWS SAM) template.

AWS SAM templates simplify the definition of resources needed for serverless applications. When deploying AWS SAM templates in AWS CloudFormation, AWS CloudFormation performs a transform to convert the AWS SAM template into a CloudFormation template. For more information, see Transform.

  7. Choose Create stack.

The stack creation may take up to 20 minutes, after which the status changes to CREATE_COMPLETE. You can see the name of the newly created S3 bucket on the Outputs tab.

Translating JSON documents

To translate your documents, upload one or more JSON documents to the input folder of the S3 bucket you created in the previous step. For this post, we use the following JSON file:

{
    "firstName": "John",
    "lastName": "Doe",
    "isAlive": true,
    "age": 27,
    "address": {
      "streetAddress": "100 Main Street",
      "city": "Anytown",
      "postalCode": "99999-9999"
    },
    "phoneNumbers": [
      {
        "type": "home",
        "number": "222 555-1234"
      },
      {
        "type": "office",
        "number": "222 555-4567"
      }
    ],
    "children": ["Richard Roe", "Paulo Santos"],
    "spouse": "Jane Doe"
}

After you upload all the JSON documents, upload the file that triggers the translation workflow. This file can be a zero-byte file, but the filename should match the TriggerFileName parameter in the CloudFormation stack. The default name for the file is triggerfile.
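If you prefer to script this step, a zero-byte trigger file can be written with a single S3 PutObject call. The following is a sketch under a few assumptions: the helper upload_trigger_file and the bucket name are hypothetical, and RecordingClient is a stand-in that lets the example run without AWS access. With real credentials you would pass a boto3 S3 client instead.

```python
def upload_trigger_file(s3_client, bucket, trigger_name="triggerfile"):
    """Write a zero-byte object to input/<trigger_name> and return its key."""
    key = "input/{}".format(trigger_name)
    s3_client.put_object(Bucket=bucket, Key=key, Body=b"")
    return key


class RecordingClient:
    """Stand-in for an S3 client so the sketch runs without AWS access."""

    def __init__(self):
        self.calls = []

    def put_object(self, **kwargs):
        # Record the request instead of sending it to Amazon S3.
        self.calls.append(kwargs)


client = RecordingClient()
key = upload_trigger_file(client, "my-translate-bucket")  # hypothetical bucket
print(key, client.calls)
```

The trigger_name argument must match the TriggerFileName stack parameter, otherwise the upload does not start the pipeline.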

This upload event triggers the Lambda function <Stack name>-S3FileEventProcessor-<Random string>, which converts the uploaded JSON documents into XML and places them in the xmlin folder of the S3 bucket. The function then invokes the Amazon Translate StartTextTranslationJob API, with the xmlin folder of the S3 bucket as the input location and the xmlout folder as the output location for the translated XML files.
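For orientation, the request for such a batch job might be assembled as follows. This is a sketch, not the startTranslationJob helper from the solution: the function build_translation_job_args, the bucket, the role ARN, and the job name are hypothetical, and the text/html content type is our assumption about how to have tags left untouched. The resulting dictionary is shaped for the boto3 Amazon Translate client method start_text_translation_job.

```python
def build_translation_job_args(bucket, source_lang, target_lang, role_arn, job_name):
    """Assemble keyword arguments for a batch translation job request."""
    return {
        "JobName": job_name,
        "InputDataConfig": {
            "S3Uri": "s3://{}/xmlin/".format(bucket),
            # Assumption: treat the XML as markup so only text is translated.
            "ContentType": "text/html",
        },
        "OutputDataConfig": {"S3Uri": "s3://{}/xmlout/".format(bucket)},
        "DataAccessRoleArn": role_arn,
        "SourceLanguageCode": source_lang,
        "TargetLanguageCodes": [target_lang],
    }


job_args = build_translation_job_args(
    bucket="my-translate-bucket",  # hypothetical bucket name
    source_lang="en",
    target_lang="es",
    role_arn="arn:aws:iam::123456789012:role/hypothetical-translate-role",
    job_name="translate-json-demo",
)
print(job_args)
```

With AWS credentials in place, the dictionary can be unpacked into the client call as keyword arguments.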

The following code is the processRequest method in the <Stack name>-S3FileEventProcessor-<Random string> Lambda function:

def processRequest(request):
    output = ""
    logger.info("request: {}".format(request))

    bucketName = request["bucketName"]
    sourceLanguageCode = request["sourceLanguage"]
    targetLanguageCode = request["targetLanguage"]
    access_role = request["access_role"]
    triggerFile = request["trigger_file"]
    try:
        # Filter only the JSON files for processing
        objs = S3Helper().getFilteredFileNames(bucketName,"input/","json")
        for obj in objs:
            try:
                content = S3Helper().readFromS3(bucketName,obj)
                logger.debug(content)
                jsonDocument = json.loads(content)
                print(jsonDocument)
                # Convert the JSON document into XML
                outputXML = json2xml.Json2xml(jsonDocument, attr_type=False).to_xml()
                logger.debug(outputXML)
                newObjectKey = "xmlin/{}.xml".format(FileHelper.getFileName(obj))
                # Store the XML in the S3 location for Translation
                S3Helper().writeToS3(str(outputXML),bucketName,newObjectKey)   
                output = "Output Object: {}/{}".format(bucketName, newObjectKey)
                logger.debug(output)
                # Rename the JSON files to prevent reprocessing
                S3Helper().renameObject(bucketName,obj,"{}.processed".format(obj))
            except ValueError:
                logger.error("Error occurred loading the JSON file: {}".format(obj))
            except ClientError as e:
                logger.error("An error occurred with S3 bucket operation: %s" % e)
        # Start the translation batch job using Amazon Translate
        startTranslationJob(bucketName,sourceLanguageCode,targetLanguageCode,access_role)
        # Remove the trigger file so that a later upload can start a new run
        S3Helper().deleteObject(bucketName,"input/{}".format(triggerFile))
    except ClientError as e:
        logger.error("An error occurred with S3 bucket operation: %s" % e)

The Amazon Translate job completion SNS notification from the job poller triggers the Lambda function <Stack name>-TranslateJsonJobSNSEventProcessor-<Random string>. The function converts the XML documents created by the Amazon Translate batch job to JSON documents and stores them in the output folder of the S3 bucket with the following naming convention: TargetLanguageCode-<inputJsonFileName>.json.

The following code shows the JSON document translated into Spanish.

{
    "firstName": "John",
    "lastName": "Cierva",
    "isAlive": "Es verdad",
    "age": "27",
    "address": {
        "streetAddress": "100 Calle Principal",
        "city": "En cualquier ciudad",
        "postalCode": "99999-9999"
    },
    "phoneNumbers": {
        "item": [
            {
                "type": "hogar",
                "number": "222 555-1234"
            },
            {
                "type": "oficina",
                "number": "222 555-4567"
            }
        ]
    },
    "children": {
        "item": [
            "Richard Roe",
            "Paulo Santos"
        ]
    },
    "spouse": "Jane Doe"
}

The following code is the processRequest method containing the logic in the <Stack name>-TranslateJsonJobSNSEventProcessor-<Random string> Lambda function:

def processRequest(request):
    output = ""
    logger.debug("request: {}".format(request))
    up = urlparse(request["s3uri"], allow_fragments=False)
    accountid = request["accountId"]
    jobid =  request["jobId"]
    bucketName = up.netloc
    objectkey = up.path.lstrip('/')
    # choose the base path for iterating within the translated files for the specific job
    basePrefixPath = objectkey  + accountid + "-TranslateText-" + jobid + "/"
    languageCode = request["langCode"]
    logger.debug("Base Prefix Path:{}".format(basePrefixPath))
    # Filter only the translated XML files for processing
    objs = S3Helper().getFilteredFileNames(bucketName,basePrefixPath,"xml")
    for obj in objs:
        try:
            content = S3Helper().readFromS3(bucketName,obj)
            logger.debug(content)
            #Convert the XML file to Dictionary object
            data_dict = xmltodict.parse(content)
            #Generate the Json content from the dictionary
            data_dict =  data_dict["all"]
            flatten_dict = {k: (data_dict[k]["item"] if (isinstance(v,dict) and len(v.keys()) ==1 and "item" in v.keys())  else v) for (k,v) in data_dict.items()}
            json_data = json.dumps(flatten_dict,ensure_ascii=False).encode('utf-8')
            logger.debug(json_data)
            newObjectKey = "output/{}.json".format(FileHelper.getFileName(obj))
            #Write the JSON object to the S3 output folder within the bucket
            S3Helper().writeToS3(json_data,bucketName,newObjectKey)   
            output = "Output Object: {}/{}".format(bucketName, newObjectKey)
            logger.debug(output)
        except ValueError:
            logger.error("Error occurred converting the XML file to JSON: {}".format(obj))
        except ClientError as e:
            logger.error("An error occurred with S3 bucket operations: %s" % e)
        except Exception as e:
            logger.error("Error occurred processing the XML file: %s" % e)
    # Optionally clean up the intermediate XML files once conversion is done
    if request.get("delete_xmls") == "true":
        objs = S3Helper().getFilteredFileNames(bucketName,"xmlin/","xml")
        for obj in objs:
            try:
                logger.debug("Deleting temp xml files {}".format(obj))
                S3Helper().deleteObject(bucketName,obj)
            except ClientError as e:
                logger.error("An error occurred with S3 bucket operations: %s" % e)
            except Exception as e:
                logger.error("Error occurred processing the XML file: %s" % e)
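The dictionary comprehension that builds flatten_dict in the function above is dense, so here is the same idea restated as a small standalone sketch. The function name flatten_item_wrappers and the sample input are ours; the input mimics what an XML parser returns when list entries were wrapped in item tags.

```python
def flatten_item_wrappers(data):
    """Replace top-level {"item": value} wrappers with the wrapped value."""
    return {
        key: value["item"]
        if isinstance(value, dict) and list(value.keys()) == ["item"]
        else value
        for key, value in data.items()
    }


parsed = {
    "spouse": "Jane Doe",
    "children": {"item": ["Richard Roe", "Paulo Santos"]},
    "address": {"city": "Anytown"},
}
flat = flatten_item_wrappers(parsed)
print(flat)
```

Only top-level keys are unwrapped. A wrapper nested deeper in the document, or a list whose entries are themselves dictionaries containing lists, would need a recursive version.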

For any pipeline failures, check the Amazon CloudWatch Logs for the corresponding Lambda function and look for potential errors that caused the failure.

To do a translation for a different source-target language combination, you can update the SOURCE_LANG_CODE and TARGET_LANG_CODE environment variables for the <Stack name>-S3FileEventProcessor-<Random string> Lambda function and trigger the solution pipeline by uploading JSON documents and the trigger file into the input folder of the S3 bucket.
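If you script this change, keep in mind that the Lambda UpdateFunctionConfiguration API replaces the whole set of environment variables, so the existing values have to be read and merged first. The following sketch shows that pattern. The helper set_language_pair, the function name, and the OTHER_SETTING variable are hypothetical, and StubLambdaClient stands in for a boto3 Lambda client so that the example runs without AWS access.

```python
def set_language_pair(lambda_client, function_name, source_lang, target_lang):
    """Update the two language variables while keeping all other variables."""
    current = lambda_client.get_function_configuration(FunctionName=function_name)
    variables = dict(current.get("Environment", {}).get("Variables", {}))
    variables["SOURCE_LANG_CODE"] = source_lang
    variables["TARGET_LANG_CODE"] = target_lang
    lambda_client.update_function_configuration(
        FunctionName=function_name,
        Environment={"Variables": variables},
    )
    return variables


class StubLambdaClient:
    """Stand-in for a Lambda client that keeps variables in memory."""

    def __init__(self, variables):
        self.variables = variables

    def get_function_configuration(self, FunctionName):
        return {"Environment": {"Variables": dict(self.variables)}}

    def update_function_configuration(self, FunctionName, Environment):
        self.variables = Environment["Variables"]


stub = StubLambdaClient(
    {"SOURCE_LANG_CODE": "en", "TARGET_LANG_CODE": "es", "OTHER_SETTING": "keep-me"}
)
updated = set_language_pair(stub, "hypothetical-function-name", "en", "fr")
print(updated)
```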

All code used in this post is available in the GitHub repo. If you want to build your own pipeline and don’t need to use the CloudFormation template provided, you can use the file s3_event_handler.py under the directory translate_json in the GitHub repo. That file carries code to convert a JSON file into XML as well as to call the Amazon Translate API. The code for converting translated XML back to JSON format is available in the file sns_event_handler.py.

Conclusion

In this post, we demonstrated how to translate JSON documents using Amazon Translate asynchronous batch processing.

You can easily integrate the approach into your own pipelines as well as handle large volumes of JSON text given the scalable serverless architecture. This methodology works for translating JSON documents between over 70 languages that are supported by Amazon Translate (as of this writing). Because this solution uses asynchronous batch processing, you can customize your machine translation output using parallel data. For more information on using parallel data, see Customizing Your Translations with Parallel Data (Active Custom Translation). For a low-latency, low-throughput solution translating smaller JSON documents, you can perform the translation through the real-time Amazon Translate API.
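For the real-time variant, one possible shape is to walk the JSON structure and translate only the string values, leaving keys and non-string values alone. The sketch below is hypothetical and not part of the solution; the translate argument is any callable that maps a string to its translation. Here str.upper stands in for it so the example runs offline, whereas in practice it could wrap the Amazon Translate TranslateText API and return the TranslatedText field of the response.

```python
def translate_values(node, translate):
    """Translate every string value in a JSON-like structure; keys stay as-is."""
    if isinstance(node, dict):
        return {key: translate_values(value, translate) for key, value in node.items()}
    if isinstance(node, list):
        return [translate_values(value, translate) for value in node]
    if isinstance(node, str):
        # Skip empty strings, which carry nothing to translate.
        return translate(node) if node else node
    return node  # numbers, booleans, and null pass through untouched


doc = {"type": "home", "age": 27, "isAlive": True, "tags": ["a", ""]}
translated = translate_values(doc, str.upper)  # str.upper is a stand-in translator
print(translated)
```

Unlike the XML round trip, this approach keeps numbers and booleans in their original types, at the cost of one API call per string value.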


About the Authors

Siva Rajamani is a Boston-based Enterprise Solutions Architect for AWS. He enjoys working closely with customers and supporting their digital transformation and AWS adoption journey. His core areas of focus are serverless, application integration, and security. Outside of work, he enjoys outdoor activities and watching documentaries.

Raju Penmatcha is a Senior AI/ML Specialist Solutions Architect at AWS. He works with education, government, and non-profit customers on machine learning and artificial intelligence-related projects, helping them build solutions using AWS. When not helping customers, he likes traveling to new places with his family.

Using container images to run PyTorch models in AWS Lambda

PyTorch is an open-source machine learning (ML) library widely used to develop neural networks and ML models. Those models are usually trained on multiple GPU instances to speed up training, which makes training expensive, and they can reach sizes of up to a few gigabytes. After they’re trained, these models are deployed in production to produce inferences. Inference workloads can be synchronous, asynchronous, or batch-based. The endpoints that serve them must be highly scalable and resilient in order to process from zero to millions of requests. This is where AWS Lambda can be a compelling compute service for scalable, cost-effective, and reliable synchronous and asynchronous ML inferencing. Lambda offers benefits such as automatic scaling, reduced operational overhead, and pay-per-inference billing.

This post shows you how to use any PyTorch model with Lambda for scalable inferences in production with up to 10 GB of memory. This allows us to use ML models of up to a few gigabytes in Lambda functions. For the PyTorch example, we use the open-source Huggingface Transformers library to build a question-answering endpoint.

Overview of solution

Lambda is a serverless compute service that lets you run code without provisioning or managing servers. Lambda automatically scales your application by running code in response to every event, allowing event-driven architectures and solutions. The code runs in parallel and processes each event individually, scaling with the size of the workload, from a few requests per day to hundreds of thousands of workloads. The following diagram illustrates the architecture of our solution.

You can package your code and dependencies as a container image using tools such as the Docker CLI. The maximum container size is 10 GB. After the model for inference is Dockerized, you can upload the image to Amazon Elastic Container Registry (Amazon ECR). You can then create the Lambda function from the container image stored in Amazon ECR.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Implementing the solution

We use a pre-trained language model (DistilBERT) from Huggingface. Huggingface provides a variety of pre-trained language models; the model we’re using is about 250 MB in size and can be used to build a question-answering endpoint.

We use the AWS SAM CLI to create the serverless endpoint with an Amazon API Gateway. The following diagram illustrates our architecture.

To implement the solution, complete the following steps: 

  1. On your local machine, run sam init.
  2. Enter 1 for the template source (AWS Quick Start Templates).
  3. As a package type, enter 2 for image.
  4. For the base image, enter 3 - amazon/python3.8-base.
  5. As a project name, enter lambda-pytorch-example.
  6. Change your workdir to lambda-pytorch-example and copy the following code snippets into the hello_world folder.

The following code is an example of a requirements.txt file to run PyTorch code in Lambda. The transformers[torch] extra pulls in PyTorch as a dependency, so we don’t need to add it here separately. Add the requirements to the empty requirements.txt in the folder hello_world.

# List all python libraries for the lambda
transformers[torch]==4.1.1

The following is the code for the app.py file:

import json

import torch
from transformers import AutoTokenizer, AutoModelForQuestionAnswering

# Load the tokenizer and model once per container so warm invocations reuse them
tokenizer = AutoTokenizer.from_pretrained("model/")
model = AutoModelForQuestionAnswering.from_pretrained("model/")

def lambda_handler(event, context):

    # The HTTP API delivers the request payload as a JSON string in event['body']
    body = json.loads(event['body'])

    question = body['question']
    # Named context_text so it doesn't shadow the Lambda context argument
    context_text = body['context']

    inputs = tokenizer.encode_plus(question, context_text, add_special_tokens=True, return_tensors="pt")
    input_ids = inputs["input_ids"].tolist()[0]

    # Inference only, so skip gradient tracking
    with torch.no_grad():
        output = model(**inputs)
    answer_start_scores = output.start_logits
    answer_end_scores = output.end_logits

    # Most likely start and end token positions of the answer span
    answer_start = torch.argmax(answer_start_scores)
    answer_end = torch.argmax(answer_end_scores) + 1

    answer = tokenizer.convert_tokens_to_string(tokenizer.convert_ids_to_tokens(input_ids[answer_start:answer_end]))

    print('Question: {0}, Answer: {1}'.format(question, answer))

    return {
        'statusCode': 200,
        'body': json.dumps({
            'Question': question,
            'Answer': answer
        })
    }
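
The span selection in the handler can be looked at in isolation. The following is a minimal sketch in plain Python that mirrors the two argmax calls without loading a model; the tokens and scores are made up for illustration and the helper name select_answer_span is hypothetical.

```python
def select_answer_span(start_scores, end_scores):
    """Return (start, end) token indices with end exclusive, like the argmax logic in app.py."""
    start = max(range(len(start_scores)), key=start_scores.__getitem__)
    end = max(range(len(end_scores)), key=end_scores.__getitem__) + 1
    return start, end


# Hypothetical tokens and scores, for illustration only
tokens = ["[CLS]", "the", "year", "1886", "is", "[SEP]"]
start_scores = [0.1, 0.2, 0.3, 4.5, 0.1, 0.0]
end_scores = [0.0, 0.1, 0.2, 5.1, 0.3, 0.1]

start, end = select_answer_span(start_scores, end_scores)
answer = " ".join(tokens[start:end])
print(answer)
```

Because the start and end positions are chosen independently, the end can fall before the start, in which case the slice (and therefore the answer) is empty. A production handler may want to check for that case.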

The following Dockerfile is an example for Python 3.8, which downloads and uses the DistilBERT language model fine-tuned for the question-answering task. For more information, see DistilBERT base uncased distilled SQuAD. You can use your custom models by copying them to the model folder and referencing them in app.py.

# Pull the base image with python 3.8 as a runtime for your Lambda
FROM public.ecr.aws/lambda/python:3.8

# Copy the earlier created requirements.txt file to the container
COPY requirements.txt ./

# Install the python requirements from requirements.txt
RUN python3.8 -m pip install -r requirements.txt

# Copy the earlier created app.py file to the container
COPY app.py ./

# Download the DistilBERT model files from Huggingface and store them in the model directory
# (-L follows redirects, so every file is fetched the same way)
RUN mkdir model
RUN curl -L https://huggingface.co/distilbert-base-uncased-distilled-squad/resolve/main/pytorch_model.bin -o ./model/pytorch_model.bin
RUN curl -L https://huggingface.co/distilbert-base-uncased-distilled-squad/resolve/main/config.json -o ./model/config.json
RUN curl -L https://huggingface.co/distilbert-base-uncased-distilled-squad/resolve/main/tokenizer.json -o ./model/tokenizer.json
RUN curl -L https://huggingface.co/distilbert-base-uncased-distilled-squad/resolve/main/tokenizer_config.json -o ./model/tokenizer_config.json

# Set the CMD to your handler
CMD ["app.lambda_handler"]

Change your working directory back to lambda-pytorch-example and copy the following content into the template.yaml file:

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  python3.8

  Sample SAM Template for lambda-pytorch-example

Resources:
  pytorchEndpoint:
    Type: AWS::Serverless::Function
    Properties:
      PackageType: Image
      MemorySize: 5000
      Timeout: 300
      Events:
        ApiEndpoint:
          Type: HttpApi
          Properties:
            Path: /inference
            Method: post
            TimeoutInMillis: 29000
    Metadata:
      Dockerfile: Dockerfile
      DockerContext: ./hello_world
      DockerTag: python3.8-v1

Outputs:
  InferenceApi:
    Description: "API Gateway endpoint URL for Prod stage for inference function"
    Value: !Sub "https://${ServerlessHttpApi}.execute-api.${AWS::Region}.amazonaws.com/inference"

Now we need to create an Amazon ECR repository in AWS and authenticate the local Docker client to it. The repositoryUri is displayed in the output; save it for later.

# Create an ECR repository
aws ecr create-repository --repository-name lambda-pytorch-example --image-scanning-configuration scanOnPush=true --region <REGION>

# Register docker to ECR
aws ecr get-login-password --region <REGION> | docker login --username AWS --password-stdin <AWS_ACCOUNT_ID>.dkr.ecr.<REGION>.amazonaws.com

Deploying the application

The following steps deploy the application to your AWS account:

  1. Run sam build && sam deploy --guided.
  2. For Stack Name, enter pytorch-lambda-example.
  3. Choose the same Region that you created the Amazon ECR repository in.
  4. Enter the image repository for the function (enter the earlier saved repositoryUri of the Amazon ECR repository).
  5. For Confirm changes before deploy and Allow SAM CLI IAM role creation, keep the defaults.
  6. For pytorchEndpoint may not have authorization defined, Is this okay?, select y.
  7. Keep the defaults for the remaining prompts.

AWS SAM uploads the container images to the Amazon ECR repository and deploys the application. During this process, you see a change set along with the status of the deployment. For a more detailed description about AWS SAM and container images for Lambda, see Using container image support for AWS Lambda with AWS SAM.

When the deployment is complete, the stack output is displayed. Use the InferenceApi endpoint to test your deployed application. The endpoint URL is displayed as an output during the deployment of the stack.

Overcoming a Lambda function cold start

Because the plain language model is already around 250 MB, the initial function run can take up to 25 seconds and may even exceed the maximum API timeout of 29 seconds. That time can also be reached when the function wasn’t called for some time and therefore is in a cold start mode. When the Lambda function is in a hot state, one inference run takes about 150 milliseconds.

There are multiple ways to reduce the latency of Lambda functions in a cold state. Lambda supports provisioned concurrency to keep the functions initialized. Another way is to create an Amazon CloudWatch event that periodically calls the function to keep it warm.
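
For the keep-warm approach, the handler can recognize the scheduled ping and return before running inference. The following is a minimal sketch, assuming the function is invoked by a scheduled CloudWatch Events rule, whose events carry "source": "aws.events". The helper name is_warmup_event and the placeholder response bodies are illustrative and not part of the original app.py.

```python
def is_warmup_event(event):
    """Return True for invocations that come from a scheduled CloudWatch Events rule."""
    return isinstance(event, dict) and event.get("source") == "aws.events"


def lambda_handler(event, context):
    if is_warmup_event(event):
        # Keep-warm ping: the model is already loaded at import time, so return right away
        return {"statusCode": 200, "body": "warm"}
    # Placeholder for the regular inference path shown in app.py
    return {"statusCode": 200, "body": "inference"}
```

A scheduled ping keeps only one execution environment warm at a time, so concurrent requests can still hit a cold start; provisioned concurrency addresses that case.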

Make sure to change <API_GATEWAY_URL> to the URL of your API Gateway endpoint. In the following example code, the text is copied from the Wikipedia page on cars. You can change the question and context as you like and check the model’s answers.

curl --header "Content-Type: application/json" --request POST --data '{"question": "When was the car invented?","context": "Cars came into global use during the 20th century, and developed economies depend on them. The year 1886 is regarded as the birth year of the modern car when German inventor Karl Benz patented his Benz Patent-Motorwagen. Cars became widely available in the early 20th century. One of the first cars accessible to the masses was the 1908 Model T, an American car manufactured by the Ford Motor Company. Cars were rapidly adopted in the US, where they replaced animal-drawn carriages and carts, but took much longer to be accepted in Western Europe and other parts of the world."}' <API_GATEWAY_URL>

The response shows the correct answer to the question:

{"Question": "When was the car invented?", "Answer": "1886"}
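
The same request can be sent from Python. The following is a minimal sketch using only the standard library; the function names are illustrative, and api_url stands for the InferenceApi URL from your stack output.

```python
import json
import urllib.request


def build_inference_request(api_url, question, context):
    """Build a POST request with the JSON body that the Lambda handler expects."""
    payload = json.dumps({"question": question, "context": context}).encode("utf-8")
    return urllib.request.Request(
        api_url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def ask(api_url, question, context, timeout=30):
    """Send the question to the endpoint and return the decoded JSON response."""
    request = build_inference_request(api_url, question, context)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

Calling ask with your endpoint URL, a question, and a context string returns a dictionary with the Question and Answer keys shown in the response above. The 30-second default timeout is chosen to sit just above the 29-second API timeout.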

Conclusion

Container image support for Lambda allows you to customize your function even more, opening up many new use cases for serverless ML. You can bring your custom models and deploy them on Lambda using up to 10 GB for the container image size. For smaller models that don’t need much computing power, you can perform online training and inference purely in Lambda. When the model size increases, cold start issues become more and more important and need to be mitigated. There is also no restriction on the framework or language with container images; other ML frameworks such as TensorFlow, Apache MXNet, XGBoost, or Scikit-learn can be used as well!

If you do require a GPU for your inference, consider using container services such as Amazon Elastic Container Service (Amazon ECS) or Kubernetes, or deploy the model to an Amazon SageMaker endpoint.


About the Author

Jan Bauer is a Cloud Application Developer at AWS Professional Services. His interests are serverless computing, machine learning, and everything that involves cloud computing.

Building secure machine learning environments with Amazon SageMaker

As businesses and IT leaders look to accelerate the adoption of machine learning (ML) and artificial intelligence (AI), there is a growing need to understand how to build secure and compliant ML environments that meet enterprise requirements. One major challenge you may face is integrating ML workflows into existing IT and business work streams. A second challenge is bringing together stakeholders from business leadership, data science, engineering, risk and compliance, and cybersecurity to define the requirements and guardrails for the organization. Third, because building secure ML environments in the cloud is a relatively new topic, understanding recommended practices is also helpful.

In this post, we introduce a series of hands-on workshops and associated code artifacts to help you build secure ML environments on top of Amazon SageMaker, a fully managed service that provides every developer and data scientist with the ability to build, train, and deploy ML models quickly. The objective of these workshops is to address the aforementioned challenges by helping bring together different IT stakeholders and data scientists and provide best practices to build and operate secure ML environments. These workshops are a summary of recommended practices from large enterprises and small and medium businesses. You can access these workshops on Building Secure Environments, and you can find the associated code on GitHub. We believe that these workshops are valuable for the following primary teams:

  • Cloud engineering – This team is responsible for creating and maintaining a set of enterprise-wide guardrails for operating in the cloud. Key requirements for these teams include isolation from the public internet, restriction of data traffic flows, use of strict AWS Identity and Access Management (IAM) controls to allow only authorized and authenticated users the ability to access project resources, and the use of defense-in-depth methodologies to detect and mitigate potential threats. This team can use tools like AWS Service Catalog to build repeatable patterns using infrastructure as code (IaC) practices via AWS CloudFormation.
  • ML platform – This team is responsible for building and maintaining the infrastructure for supporting ML services, such as provisioning notebooks for data scientists to use, creating secure buckets for storing data, managing costs for ML from various lines of business (LOBs), and more.
  • Data science COE – Data scientists within an AI Center of Excellence (COE) or embedded within the LOBs are responsible for building, training, and deploying models. In regulated industries, data scientists need to adhere to the organization’s security boundaries, such as using encrypted buckets for data access, using private networking for accessing APIs, committing code to source control, ensuring all their experiments and trials are properly logged, enforcing encryption of data in transit, and monitoring deployed models.

The following diagram is the architecture for the secure environment developed in this workshop.

In the Building Secure Environments workshop aimed at the cloud engineering and ML platform teams, we cover how this architecture can be set up in Labs 1–2. Specifically, we use AWS Service Catalog to provision a Shared Services Amazon Virtual Private Cloud (Amazon VPC), which hosts a private PyPI package repository to pull packages from an Amazon Simple Storage Service (Amazon S3) bucket via a secure VPC endpoint.

After the environment is provisioned, the following architecture diagram illustrates the typical data scientist workflow within the project VPC, which is covered in detail in the workshop Using Secure Environments aimed at data scientists.

This workshop quickly sets up the secure environment (Steps 1–3) and then focuses on using SageMaker notebook instances to securely explore and process data (Steps 4–5). Following that, we train a model (Steps 6–7) and deploy and monitor the model and model metadata (Steps 8–9) while enforcing version control (Step 4).

The workshops and associated code let you implement recommended practices and patterns, help you quickly get started building secure environments, and improve productivity with the ability to securely build, train, deploy, and monitor ML models. Although the workshop is built using SageMaker notebook instances, in this post we highlight how you can adapt this to Amazon SageMaker Studio, the first integrated development environment for machine learning on AWS.

Workshop features

The workshop is a collection of feature implementations grouped together to provide a coherent starting point for customers looking to build secure data science environments. The features implemented are broadly categorized across seven areas:

  • Enforce your existing IT policies in your AWS account and data science environment to mitigate risks
  • Create environments with least privilege access to sensitive data in the interest of reducing the blast radius of a compromised or malicious actor
  • Protect sensitive data against data exfiltration using a number of controls designed to mitigate the data exfiltration risk
  • Encrypt sensitive data and intellectual property at rest and in transit as part of a defense-in-depth strategy
  • Audit and trace activity in your environment
  • Reproduce results in your environment by tracking the lineage of ML artifacts throughout the lifecycle and using source and version control tools such as AWS CodeCommit
  • Manage costs and allow teams to self service using a combination of tagging and the AWS Service Catalog to automate building secure environments

In the following sections, we cover in more depth how these different features have been implemented.

Enforcing existing IT policies

When entrusting sensitive data to AWS services, you need confidence that you can govern your data to the same degree with the managed service as if you were running the service yourself. A typical starting point to govern your data in an AWS environment is to create a VPC that is tailored and configured to your standards in terms of information security, firewall rules, and routing. This becomes a starting point for your data science environment and the services that projects use to deliver on their objectives. SageMaker, and many other AWS services, can be deployed into your VPC. This allows you to use network-level controls to manage the Amazon Elastic Compute Cloud (Amazon EC2)-based resources that reside within the network. To learn about how to set up SageMaker Studio in a private VPC, see Securing Amazon SageMaker Studio connectivity using a private VPC.

The network-level controls deployed as a part of this workshop include the following:

  • Security groups to manage which resources and services, such as SageMaker, can communicate with other resources in the VPC
  • VPC endpoints to grant explicit access to specific AWS services from within the VPC, like Amazon S3 or Amazon CloudWatch
  • VPC endpoints to grant explicit access to customer-managed shared services such as a PyPI repository server

The shared service PyPI repository demonstrates how you can create managed artifact repositories that can then be shared across project environments. Because the environments don’t have access to the open internet, access to common package and library repositories is restricted to your own repositories that hold your approved packages. This limits any potential threats from unapproved packages entering your secure environment.

With the launch of AWS CodeArtifact, you can now use CodeArtifact as your private PyPI repository. CodeArtifact provides VPC endpoints to maintain private networking. To learn more about how to integrate CodeArtifact with SageMaker notebook instances and Studio notebooks, see Private package installation in Amazon SageMaker running in internet-free mode.

In addition to configuring a secure network environment, this workshop also uses IAM policies to create a preventive control that requires that all SageMaker resources be provisioned within a customer VPC. An AWS Lambda function is also deployed as a corrective control to stop any SageMaker resources that are provisioned without a VPC attachment.
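
A preventive control of this kind can be expressed as an IAM policy that denies job creation when no VPC subnets are supplied. The following is a minimal sketch and not the workshop’s actual policy: the function name, the statement ID, and the list of actions are assumptions, while sagemaker:VpcSubnets is a SageMaker IAM condition key.

```python
import json


def require_vpc_policy():
    """Build an IAM policy document that denies SageMaker jobs created without a VPC configuration."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "DenySageMakerJobsWithoutVpc",  # illustrative statement ID
                "Effect": "Deny",
                "Action": [
                    "sagemaker:CreateTrainingJob",
                    "sagemaker:CreateProcessingJob",
                    "sagemaker:CreateHyperParameterTuningJob",
                    "sagemaker:CreateModel",
                ],
                "Resource": "*",
                # The Null condition is true when the request carries no subnets at all
                "Condition": {"Null": {"sagemaker:VpcSubnets": "true"}},
            }
        ],
    }


print(json.dumps(require_vpc_policy(), indent=2))
```

An explicit Deny takes precedence over any Allow, so attaching a policy like this to the data scientist role blocks the listed calls unless a VPC configuration is included.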

One of the unique elements of SageMaker notebooks is that they are managed EC2 instances in which you can tailor the operating system. This workshop uses SageMaker lifecycle configuration policies to configure the Linux operating system of the SageMaker notebook to be in line with IT policy, such as disabling root access for data scientists. For SageMaker Studio, you can enforce your IT policies of using security-approved containers and packages for running notebooks by bringing your own custom image. SageMaker handles versioning of the images, and provides data scientists with a user-friendly drop-down to select the custom image of their choice.

Labs 1–3 in the Building Secure Environments and Labs 1–2 in the Using Secure Environments workshops focus on how you can enforce IT policies on your ML environments.

Least privilege access to sensitive data

In the interest of least privileged access to sensitive data, it’s simpler to provide isolated environments to any individual project. These isolated environments provide a method of restricting access to customer-managed assets, datasets, and AWS services on a project-by-project basis, with a lower risk of cross-project data movement. The following discusses some of the key mechanisms used in the workshops to provide isolated, project-specific environments. The workshop hosts multiple projects in a single AWS account, but given sufficient maturity of automation, you could provide the same level of isolation using project-specific AWS accounts. Although you can have multiple SageMaker notebook instances within a single account, you can only have one Studio domain per Region in an account. You can therefore use a domain to create isolated project-specific environments in separate accounts.

To host multiple projects in a single AWS account, the workshop dedicates a private, single-tenant VPC to each project. This creates a project-specific network boundary that grants access to specific AWS resources and services using VPC endpoints and endpoint policies. This combination creates logically isolated single-tenant project environments that are dedicated to a project team.

In addition to a dedicated network environment, the workshop creates AWS resources that are dedicated to individual projects. S3 buckets, for instance, are created per project and bound to the VPC for the project. An S3 bucket policy restricts the objects in the bucket to only be accessed from within the VPC. Equally, the endpoint policy associated with the Amazon S3 VPC endpoint within the VPC only allows principals in the VPC to communicate with those specific S3 buckets. This could be expanded as needed in order to support accessing other buckets, perhaps in conjunction with an Amazon S3-based data lake.
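
The bucket policy described above can be sketched as follows. This is an illustration under stated assumptions rather than the workshop’s policy: the function name, statement ID, and the choice of actions are hypothetical, and the sketch uses the aws:SourceVpc global condition key to deny requests that don’t arrive through the project VPC.

```python
import json


def vpc_only_bucket_policy(bucket_name, vpc_id):
    """Build a bucket policy that denies object access from outside the given VPC."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "DenyAccessFromOutsideProjectVpc",  # illustrative statement ID
                "Effect": "Deny",
                "Principal": "*",
                # Limited to data access actions so bucket administration is not locked out
                "Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
                "Resource": [
                    f"arn:aws:s3:::{bucket_name}",
                    f"arn:aws:s3:::{bucket_name}/*",
                ],
                "Condition": {"StringNotEquals": {"aws:SourceVpc": vpc_id}},
            }
        ],
    }


# Hypothetical bucket name and VPC ID
print(json.dumps(vpc_only_bucket_policy("example-project-bucket", "vpc-0123456789abcdef0"), indent=2))
```

The aws:SourceVpc key is only populated for requests that travel through a VPC endpoint, which is why this policy pairs naturally with the Amazon S3 VPC endpoint in the project VPC.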

Other AWS resources that are created on behalf of an individual project include IAM roles that govern who can access the project environment and what permissions they have within the environment. This prevents project teams from accessing resources in the AWS account that aren’t dedicated to their own project.

To manage intellectual property developed by the project, a CodeCommit repository is created to provide the project with a dedicated Git repository to manage and version control their source code. We use CodeCommit to commit any code developed in notebooks by data scientists in Labs 3–4 in the Using Secure Environments workshop.

Protecting against data exfiltration

As described earlier, project teams have access to AWS services and resources like Amazon S3 and objects in Amazon S3 through the VPC endpoints in the project’s VPC. The isolated VPC environment gives you full control over the ingress and egress of data flowing across the network boundary. The workshop uses security groups to govern which AWS resources can communicate with specific AWS services. The workshop also uses VPC endpoint policies to limit the AWS resources that can be accessed using the VPC endpoints.

When data is in Amazon S3, the bucket policy applied to the bucket doesn’t allow resources from outside the VPC to read data from the bucket, ensuring that it’s bound, as a backing store, to the VPC.

Data protection

ML technologies are often applied to sensitive customer data. This data may contain commercially sensitive, personally identifiable, or proprietary information that must be protected over the data’s lifetime. SageMaker and associated services such as Amazon Elastic Container Registry (Amazon ECR), Amazon S3, and CodeCommit all support end-to-end encryption both at rest and in transit.

Encryption at rest

SageMaker prefers to source information from Amazon S3, which supports multiple methods of encrypting data. For the purposes of this workshop, the S3 buckets are configured to automatically encrypt objects with a specified customer master key (CMK) that is stored in AWS Key Management Service (AWS KMS). A preventive control is also configured to require that data put into Amazon S3 is encrypted using a KMS key. These two mechanisms ensure that data stored in Amazon S3 is encrypted using a key that is managed and controlled by the customer.
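
One common way to express such a preventive control is a bucket policy statement that denies uploads whose encryption header is not aws:kms. The following is a minimal sketch, not the workshop’s exact policy; the function name and statement ID are assumptions, while s3:x-amz-server-side-encryption is an Amazon S3 condition key.

```python
import json


def require_kms_encryption_statement(bucket_name):
    """Build a bucket policy statement that rejects uploads not encrypted with AWS KMS."""
    return {
        "Sid": "DenyUploadsWithoutKmsEncryption",  # illustrative statement ID
        "Effect": "Deny",
        "Principal": "*",
        "Action": "s3:PutObject",
        "Resource": f"arn:aws:s3:::{bucket_name}/*",
        "Condition": {
            "StringNotEquals": {"s3:x-amz-server-side-encryption": "aws:kms"}
        },
    }


# Hypothetical bucket name
print(json.dumps(require_kms_encryption_statement("example-project-bucket"), indent=2))
```

With a statement like this in place, clients need to send the server-side encryption header explicitly on each upload, even when the bucket has default encryption configured.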

Similar to Amazon S3, Amazon ECR is also used to store customer-built Docker containers that are likely to contain intellectual property. Amazon ECR supports the encryption of images at rest using a CMK. This enables you to support PCI-DSS compliance requirements for separate authentication of the storage and cryptography. With this feature enabled, Amazon ECR automatically encrypts images when pushed, and decrypts them when pulled.

As data is moving into SageMaker-managed resources from Amazon S3, it’s important to ensure that the encryption at rest of the data persists. SageMaker supports this by allowing the specification of KMS CMKs for encrypting the EBS volumes that hold the data retrieved from Amazon S3. Encryption keys can be specified to encrypt the volumes of all Amazon EC2-based SageMaker resources, such as processing jobs, notebooks, training jobs, and model endpoints. A preventive control is deployed in this workshop, which allows the provisioning of SageMaker resources only if a KMS key has been specified to encrypt the volumes.

Encryption in transit

AWS makes extensive use of HTTPS communication for its APIs. The services mentioned earlier are no exception. In addition to passing all API calls through a TLS encrypted channel, AWS APIs also require that requests are signed using the Signature version 4 signing process. This process uses client access keys to sign every API request, adding authentication information as well as preventing tampering of the request in flight.

As services like SageMaker, Amazon S3, and Amazon ECR interact with one another, they must also communicate using Signature V4 signed packets over encrypted HTTPS channels. This ensures that communication between AWS services is encrypted to a known standard, protecting customer data as it moves between services.

When communicating with SageMaker resources such as notebooks or hosted models, the communication is also performed over authenticated and signed HTTPS requests as with other AWS services.

Inter-node encryption

SageMaker provides an added benefit to secure your data when training using distributed clusters. Some ML frameworks, when performing distributed training, pass coefficients between the different instances of the algorithm in plain text. This shared state is not your training data, but is the information that the algorithms require to stay synchronized with one another. You can instruct SageMaker to automatically encrypt inter-node communication for your training job. The data passed between nodes is then passed over an encrypted tunnel without your algorithm having to take on responsibility for encrypting and decrypting the data. To enable inter-node encryption, ensure that your security groups are configured to permit UDP traffic over port 500 and that you have set EnableInterContainerTrafficEncryption to True. For more detailed instructions, see Protect Communications Between ML Compute Instances in a Distributed Training Job.
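
The security-related fields of a training job request can be grouped as follows. This is a minimal sketch: the function name, the default instance settings, and the volume size are assumptions, while the field names follow the SageMaker CreateTrainingJob API. A complete request also needs fields such as the job name, algorithm specification, role, output location, and stopping condition.

```python
def secure_training_job_settings(subnet_ids, security_group_ids, volume_kms_key_id,
                                 instance_type="ml.m5.xlarge", instance_count=2):
    """Return the encryption and networking fields of a CreateTrainingJob request."""
    return {
        "ResourceConfig": {
            "InstanceType": instance_type,
            "InstanceCount": instance_count,
            "VolumeSizeInGB": 50,  # illustrative size
            # Encrypts the storage volumes attached to the training instances
            "VolumeKmsKeyId": volume_kms_key_id,
        },
        # Runs the job inside the project VPC
        "VpcConfig": {
            "Subnets": list(subnet_ids),
            "SecurityGroupIds": list(security_group_ids),
        },
        # Encrypts traffic between the instances of a distributed training job
        "EnableInterContainerTrafficEncryption": True,
    }
```

These settings could be merged into the keyword arguments passed to the boto3 SageMaker client’s create_training_job call.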

Ensuring encryption at rest and in transit during the ML workflow is covered in detail in Labs 3–4 of the Using Secure Environments workshop.

Traceability, reproducibility, and auditability

A common pain point that you may face is a lack of recommended practices around code and ML lifecycle traceability. Often, this can arise from data scientists not being trained in MLOps (ML and DevOps) best practices, and the inherent experimental nature of the ML process. In regulated industries such as financial services, regulatory bodies such as the Office of the Comptroller of the Currency (OCC) and Federal Reserve Board (FRB) have documented guidelines on managing the risk of analytical models.

Lack of best practices around documenting the end-to-end ML lifecycle can lead to lost cycles in trying to trace the source code, model hyperparameters, and training data. The following figure shows the different steps in the lineage of a model that may be tracked for traceability and reproducibility reasons.

Traceability refers to the ability to map outputs from one step in the ML cycle to the inputs of another, thereby having a record of the entire lineage of a model. Requiring data scientists to use source and version control tools such as Git or BitBucket to regularly check in code, and not approving or promoting models until code has been checked in, can help mitigate this issue. In this workshop, we provision a private CodeCommit repository for use by data scientists, along with their notebook instance. Admins can tag these repositories to the users, to identify the users responsible for the commits, and ensure code is being frequently checked into source control. One way to do this is to use project-specific branches, and ensure that the branch has been merged with the primary branch in the shared services environment prior to being promoted to pre-production or test. Data scientists should not be allowed to directly promote code from dev to production without this intermediate step.

In addition to versioning code, versioning data used for training models is important as well. All the buckets created in this workshop have versioning automatically enabled to enforce version control on any data stored there, such as processed data and training, validation, and test data. SageMaker Experiments automatically keeps track of the pointer to the specific version of the training data used during model training.

Data scientists often tend to explore data in notebooks, and use notebooks to engineer features as well. In this workshop, we demonstrate how to use SageMaker Processing to not only offload the feature engineering code from the notebook instance onto separate compute instances to run at scale, but also to subsequently track parameters used for engineering features in SageMaker Experiments for reproducibility reasons. SageMaker recently launched SageMaker Clarify, which allows you to detect bias in your data as well as extract feature importances. You can run these jobs as you would run SageMaker Processing jobs using the Clarify SDK.

Versioning and tagging experiments, hyperparameter tuning jobs, and data processing jobs allow data scientists to collaborate faster. SageMaker Experiments automatically tracks and logs metadata from SageMaker training, processing, and batch transform jobs, and surfaces relevant information such as model hyperparameters, model artifact location, and model container metadata in a searchable way. For more information, see Amazon SageMaker Experiments – Organize, Track And Compare Your Machine Learning Trainings.

Additionally, it keeps track of model metrics that allow data scientists to compare different trained models and identify the ones that meet their business objectives. You can also use SageMaker Experiments to track which user launched a training job and use IAM condition keys to enforce resource tags on the Experiment APIs.

Additionally, in SageMaker Studio, SageMaker Experiments tracks the user profile of the user launching jobs, providing additional auditability. We demonstrate the use of SageMaker Experiments and how you can use Experiments to search for specific trials and extract the model metadata in Labs 3–4 of the Using Secure Environments workshop.

Although accurately capturing the lineage of ML models can certainly help reproduce the model outputs, depending on the model’s risk level, you may also be required to document feature importance from your models. In this workshop, we demonstrate one methodology for doing so, using Shapley values. We note however that this approach is by no means exhaustive and you should work with your risk, legal, and compliance teams to assess legal, ethical, regulatory, and compliance requirements for, and implications of, building and using ML systems.
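
To make the idea of Shapley values concrete, the following is a minimal sketch that computes exact Shapley values for a toy model by averaging each feature’s marginal contribution over all feature orderings. It is not the workshop’s implementation; the model, the instance, and the all-zero baseline are made up for illustration, and exact enumeration is only practical for a handful of features.

```python
from itertools import permutations


def shapley_values(predict, instance, baseline):
    """Exact Shapley values of each feature for one prediction, relative to a baseline input."""
    n = len(instance)
    totals = [0.0] * n
    orderings = list(permutations(range(n)))
    for order in orderings:
        current = list(baseline)
        previous = predict(current)
        for i in order:
            # Switch feature i from its baseline value to its actual value
            current[i] = instance[i]
            value = predict(current)
            totals[i] += value - previous
            previous = value
    return [total / len(orderings) for total in totals]


def toy_model(x):
    # Hypothetical model with one interaction term between features 0 and 2
    return 2.0 * x[0] + 3.0 * x[1] + x[0] * x[2]


instance = [1.0, 2.0, 4.0]
baseline = [0.0, 0.0, 0.0]
phi = shapley_values(toy_model, instance, baseline)
print(phi)
```

The values add up to the difference between the prediction for the instance and the prediction for the baseline, and the interaction term is split evenly between the two features involved.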

Deployed endpoints should be monitored against data drift as a best practice. In these workshops, we demonstrate how SageMaker Model Monitor automatically extracts the statistics from the features as a baseline, captures the input payload and the model predictions, and checks for any data drift against the baseline at regular intervals. The detected drift can be visualized using SageMaker Studio and used to set thresholds and alarms to re-trigger model retraining or alert developers of model drift.
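
The baseline-and-compare idea can be illustrated with a small sketch. This is not how SageMaker Model Monitor computes drift; it is a simplified stand-in that records per-feature mean and standard deviation as a baseline and flags features whose mean in a new batch moves more than a chosen number of baseline standard deviations. The function names, the data, and the threshold are assumptions.

```python
from statistics import mean, pstdev


def compute_baseline(rows):
    """Return a (mean, standard deviation) pair for each feature column."""
    return [(mean(column), pstdev(column)) for column in zip(*rows)]


def drifted_features(baseline, rows, threshold=3.0):
    """Return indices of features whose batch mean deviates from the baseline mean."""
    drifted = []
    for index, column in enumerate(zip(*rows)):
        base_mean, base_std = baseline[index]
        if base_std == 0:
            # Constant baseline feature: any change in the mean counts as drift
            shifted = mean(column) != base_mean
        else:
            shifted = abs(mean(column) - base_mean) / base_std > threshold
        if shifted:
            drifted.append(index)
    return drifted


# Hypothetical training data and captured inference inputs
training_rows = [[1.0, 10.0], [2.0, 10.5], [3.0, 9.5]]
captured_rows = [[2.0, 20.0], [2.5, 21.0]]

baseline = compute_baseline(training_rows)
print(drifted_features(baseline, captured_rows))
```

In this example only the second feature is flagged, because its mean roughly doubles while the first feature stays close to its baseline.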

To audit ML environments, admins can monitor instance-level metrics related to training jobs, processing jobs, and hyperparameter tuning jobs using CloudWatch Events. You can use lifecycle configurations to also publish Jupyter logs to CloudWatch. Here we demonstrate the use of detective and preventive controls to prevent data scientists from launching training jobs outside the project VPC. Additional preventive controls using IAM condition keys such as sagemaker:InstanceTypes may be added to prevent data scientists from misusing certain instance types (such as the more expensive GPU instances) or enforcing that data scientists only train models using AWS Nitro System instances, which offer enhanced security. Studio notebook logs are automatically published to CloudWatch.

Self-service

Customers are rapidly adopting IaC best practices using tools such as AWS CloudFormation or HashiCorp Terraform to ensure repeatability across their cloud workflows. However, a consistent pain point for data science and IT teams across enterprises has been the challenge to create repeatable environments that can be easily scaled across the organization.

AWS Service Catalog allows you to build products that abstract the underlying CloudFormation templates. These products can be shared across accounts, and a consistent taxonomy can be enforced using the TagOptions Library. Administrators can design products for the data science teams to run in their accounts that provision all the underlying resources automatically, while allowing data scientists to customize resources such as underlying compute instances (GPU or CPU) required for running notebooks, but disallowing data scientists from creating notebook instances any other way. Similarly, admins can enforce that data scientists enter their user information while creating products to have visibility on who is creating notebooks.

To allow teams to move at speed and to free constrained cloud operations teams from easily automated work, this workshop uses the AWS Service Catalog to automate common activities such as SageMaker notebook creation. AWS Service Catalog provides you with a way to codify your own best practice for deploying logically grouped assets, such as a project team environment, and allow project teams to deploy these assets for themselves.

The AWS Service Catalog allows cloud operations teams to give business users a way to self-service and obtain on-demand assets that are deployed in a manner compliant with internal IT policies. Business users no longer have to submit tickets for common activities and wait for the ticket to be serviced by the cloud operations team. Additionally, AWS Service Catalog provides the cloud operations team with a centralized location to understand who has deployed various assets and manage those deployed assets to ensure that, as IT policy evolves, updates can be provided across provisioned products. This is covered in detail in Labs 1–2 of the Building Secure Environments workshop.

Cost management

It’s important to be able to track expenses during the lifecycle of a project. To demonstrate this capability, the workshop uses cost tags to track all resources associated with any given project. The cost tags used in this workshop tag resources like SageMaker training jobs, VPCs, and S3 buckets with the project name and the environment type (development, testing, production). You can use these tags to identify a project’s costs across services and your environments to ensure that teams are accountable for their consumption. You can also use SageMaker Processing to offload feature engineering tasks and SageMaker Training jobs to train models at scale, and use lightweight notebooks and further save on costs. As we show in this workshop, admins can enforce this directly by allowing data scientists to create notebooks only via AWS Service Catalog using approved instance types only.
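
To make the tagging idea concrete, here is a minimal sketch of how project and environment tags could be built and then used to group costs. The tag keys Project and Environment, the helper names, and the line items are all made up for illustration; the workshop may use different keys.

```python
from collections import defaultdict

ALLOWED_ENVIRONMENTS = {"development", "testing", "production"}


def build_cost_tags(project, environment):
    """Return tags in the Key/Value list form that AWS APIs accept."""
    if environment not in ALLOWED_ENVIRONMENTS:
        raise ValueError(f"environment must be one of {sorted(ALLOWED_ENVIRONMENTS)}")
    return [
        {"Key": "Project", "Value": project},          # hypothetical tag key
        {"Key": "Environment", "Value": environment},  # hypothetical tag key
    ]


def cost_per_project(line_items):
    """Sum cost by (project, environment) for tagged line items."""
    totals = defaultdict(float)
    for item in line_items:
        tags = {t["Key"]: t["Value"] for t in item["Tags"]}
        totals[(tags["Project"], tags["Environment"])] += item["Cost"]
    return dict(totals)


# Made-up line items, for illustration only.
line_items = [
    {"Resource": "training-job", "Cost": 12.5, "Tags": build_cost_tags("credit-default", "development")},
    {"Resource": "s3-bucket", "Cost": 1.5, "Tags": build_cost_tags("credit-default", "development")},
    {"Resource": "endpoint", "Cost": 30.0, "Tags": build_cost_tags("credit-default", "production")},
]
totals = cost_per_project(line_items)
print(totals)
```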

Conclusion

In this series of workshops, we have implemented a number of features and best practices that cover the most common pain points that CTO teams face when provisioning and using secure environments for ML. For a detailed discussion on ML governance as it applies to regulated industries such as financial services, see Machine Learning Best Practices in Financial Services. Additionally, you may want to look at the AWS Well-Architected guidelines as they apply to machine learning and financial services, respectively. Feel free to connect with the authors and don’t hesitate to reach out to your AWS account teams if you wish to run these hands-on labs.


About the Authors

Jason Barto works as a Principal Solutions Architect with AWS. Jason supports customers to accelerate and optimize their business by leveraging cloud services. Jason has 20 years of professional experience developing systems for use in secure, sensitive environments. He has led teams of developers and worked as a systems architect to develop petabyte scale analytics platforms, real-time complex event processing systems, and cyber-defense monitoring systems. Today he is working with financial services customers to implement secure, resilient, and self-healing data and analytics systems using open-source technologies and AWS services.

Stefan Natu is a Sr. AI/ML Specialist Solutions Architect at Amazon Web Services. He is focused on helping financial services customers build end-to-end machine learning solutions on AWS. In his spare time, he enjoys reading machine learning blogs, playing the guitar, and exploring the food scene in New York City.

Running multiple HPO jobs in parallel on Amazon SageMaker

The ability to rapidly iterate and train machine learning (ML) models is key to deriving business value from ML workloads. Because ML models often have many tunable parameters (known as hyperparameters) that can influence the model’s ability to effectively learn, data scientists often use a technique known as hyperparameter optimization (HPO) to achieve the best-performing model against a certain predefined metric. Depending on the number of hyperparameters and the size of the search space, finding the best model can require thousands or even tens of thousands of training runs. Real-world problems that often require extensive HPO include image segmentation for modeling vehicular traffic for autonomous driving, developing algorithmic trading strategies based on historical financial data, or building fraud detection models on transaction data. Amazon SageMaker provides a built-in HPO algorithm that removes the undifferentiated heavy lifting required to build your own HPO algorithm. This post shows how to batch your HPO jobs to maximize the number of jobs you can run in parallel, thereby reducing the total time it takes to effectively cover the desired parameter space and obtain the best-performing models.

Before diving into the batching approach on Amazon SageMaker, let’s briefly review the state of the art [1]. There are a large number of HPO algorithms, ranging from random or grid search, Bayesian search, and hand tuning (where researchers use their domain knowledge to tune parameters) to population-based training inspired by genetic algorithms. For deep learning models, however, even a single training run can be time consuming. In that case, it becomes important to have an aggressive early stopping strategy, which ends trials in regions of the search space that are unlikely to produce good results. Several strategies, such as successive halving or asynchronous successive halving, use multi-armed bandits to trade off exploration (trying out different parameter combinations) against exploitation (allowing a training run to converge). Finally, to help developers quickly iterate with these approaches, there are a number of tools, such as SageMaker HPO, Ray, HyperOpt, and more. In this post, you also see how you can bring one of the most popular HPO tools, Ray Tune, to SageMaker.
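
To give a feel for the early stopping idea, the following is a minimal sketch of plain (synchronous) successive halving, not the asynchronous variant and not the implementation used by any of the tools mentioned. The function toy_loss is a made-up stand-in for "validation loss after training for a given budget", and the candidate learning rates are arbitrary.

```python
def successive_halving(configs, evaluate, min_budget=1, eta=2):
    """Evaluate all surviving configs at the current budget, keep the best
    1/eta of them, multiply the budget by eta, and repeat until one is left."""
    survivors = list(configs)
    budget = min_budget
    rungs = []  # (number of configs evaluated, budget per config) for each rung
    while len(survivors) > 1:
        rungs.append((len(survivors), budget))
        ranked = sorted(survivors, key=lambda cfg: evaluate(cfg, budget))
        survivors = ranked[:max(1, len(ranked) // eta)]
        budget *= eta
    return survivors[0], rungs


def toy_loss(lr, budget):
    # Hypothetical objective: best at lr = 0.01, improves with more budget.
    return (lr - 0.01) ** 2 + 1.0 / budget


candidates = [0.0001, 0.001, 0.01, 0.05, 0.1, 0.5, 1.0, 2.0]
best_lr, rungs = successive_halving(candidates, toy_loss)
print(best_lr, rungs)
```

Poorly performing configurations only ever receive the smallest budget, which is where the savings over running every trial to completion come from.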

Use case: Predicting credit card loan defaults

To demonstrate this on a concrete example, imagine that you’re an ML engineer working for a bank, and you want to predict the likelihood of a customer defaulting on their credit card payments. To train a model, you use historical data available from the UCI repository. All the code developed in this post is made available on GitHub. The notebook covers the data preprocessing required to prep the raw data for training. Because the number of defaults is quite small (as shown in the following graph), we split the dataset into train and test, and upsample the training data to 50/50 default versus non-defaulted loans.

Then we upload the datasets to Amazon Simple Storage Service (Amazon S3). See the following code:

#Upload Training and test data into S3
train_s3 = sagemaker_session.upload_data(path='./train_full_split/', key_prefix=prefix + '/train')
print(train_s3)
test_s3 = sagemaker_session.upload_data(path='./test_full_split/', key_prefix=prefix + '/test')
print(test_s3)

Although SageMaker provides many built-in algorithms, such as XGBoost, in this post we demonstrate how to apply HPO to a custom PyTorch model using the SageMaker PyTorch training container in script mode. You can then adapt this to your own custom deep learning code. Furthermore, we demonstrate how you can bring custom metrics to SageMaker HPO.

When dealing with tabular data, it’s helpful to shard your dataset into smaller files to avoid long data loading times, which can starve your compute resources and lead to inefficient CPU/GPU usage. We create a custom Dataset class to fetch our data and wrap this in the DataLoader class to iterate over the dataset. We set the batch size to 1, because each batch consists of 10,000 rows, and load it using Pandas.

Our model is a simple feed-forward neural net, as shown in the following code snippet:

import torch.nn as nn
import torch.nn.functional as F


class Net(nn.Module):
    def __init__(self, inp_dimension):
        super().__init__()
        self.fc1 = nn.Linear(inp_dimension, 500)
        self.drop = nn.Dropout(0.3)
        self.bn1 = nn.BatchNorm1d(500)
        self.bn2 = nn.BatchNorm1d(250)
        self.fc2 = nn.Linear(500, 250)
        self.fc3 = nn.Linear(250, 100)
        self.bn3 = nn.BatchNorm1d(100)
        self.fc4 = nn.Linear(100, 2)

    def forward(self, x):
        x = x.squeeze()
        x = F.relu(self.fc1(x.float()))
        x = self.drop(x)
        x = self.bn1(x)
        x = F.relu(self.fc2(x))
        x = self.drop(x)
        x = self.bn2(x)
        x = F.relu(self.fc3(x))
        x = self.drop(x)
        x = self.bn3(x)
        x = self.fc4(x)
        # last layer outputs log-probabilities over the two classes
        return F.log_softmax(x, dim=1)

As shown in the preceding figure, the dataset is highly imbalanced, so model accuracy isn’t the most useful evaluation metric: a baseline model that predicts that no customer will default on their payments will have high accuracy. A more useful metric is the AUC, the area under the receiver operating characteristic (ROC) curve, which summarizes the trade-off between the true positive rate and the false positive rate across classification thresholds. A false positive (the model incorrectly predicting that a good customer will default on their payment) can cause the bank to lose revenue by denying credit cards to customers. To make sure that your HPO algorithm can optimize on a custom metric such as the AUC or F1-score, you need to log those metrics to STDOUT, as shown in the following code:

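import logging

import torch
from sklearn.metrics import f1_score, roc_auc_score

# the training script is assumed to configure this logger to write to STDOUT
logger = logging.getLogger(__name__)

def test(model, test_loader, device, loss_optim):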
    model.eval()
    test_loss = 0
    correct = 0
    fulloutputs = []
    fulltargets = []
    fullpreds = []
    with torch.no_grad():
        for i, (data, target) in enumerate(test_loader):
            data, target = data.to(device), target.to(device)
            output = model(data)
            target = target.squeeze()
            test_loss += loss_optim(output, target).item()  # sum up batch loss
            pred = output.max(1, keepdim=True)[1]  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()
            fulloutputs.append(output.cpu().numpy()[:, 1])
            fulltargets.append(target.cpu().numpy())
            fullpreds.append(pred.squeeze().cpu())

    i+=1
    test_loss /= i
    logger.info("Test set Average loss: {:.4f}, Test Accuracy: {:.0f}%;\n".format(
            test_loss, 100. * correct / (len(target)*i)
        ))
    fulloutputs = [item for sublist in fulloutputs for item in sublist]
    fulltargets = [item for sublist in fulltargets for item in sublist]
    fullpreds = [item for sublist in fullpreds for item in sublist]
    logger.info('Test set F1-score: {:.4f}, Test set AUC: {:.4f} \n'.format(
        f1_score(fulltargets, fullpreds), roc_auc_score(fulltargets, fulloutputs)))
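
To illustrate why accuracy is misleading on imbalanced data, the following self-contained sketch scores a baseline that predicts "no default" for everyone. The 95/5 class split is made up, and the small roc_auc helper is a simplified rank-based implementation written for this example (the training code itself uses scikit-learn).

```python
def accuracy(targets, preds):
    """Fraction of predictions that match the targets."""
    return sum(t == p for t, p in zip(targets, preds)) / len(targets)


def roc_auc(targets, scores):
    """AUC as the probability that a random positive example scores higher
    than a random negative example (ties count as half)."""
    pos = [s for t, s in zip(targets, scores) if t == 1]
    neg = [s for t, s in zip(targets, scores) if t == 0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))


# Made-up imbalanced test set: 95 good customers, 5 defaults.
targets = [0] * 95 + [1] * 5
baseline_preds = [0] * 100       # always predict "no default"
baseline_scores = [0.0] * 100    # constant score for every customer

baseline_accuracy = accuracy(targets, baseline_preds)
baseline_auc = roc_auc(targets, baseline_scores)
print(baseline_accuracy, baseline_auc)
```

The baseline reaches 95% accuracy while its AUC is 0.5, the same as random guessing, which is why the HPO objective in this post is the AUC.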

Now we’re ready to define our SageMaker estimator and define the parameters for the HPO job:

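import numpy as np
from sagemaker.pytorch import PyTorch
from sagemaker.tuner import CategoricalParameter, ContinuousParameter

estimator = PyTorch(entry_point="train_cpu.py",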
                    role=role,
                    framework_version='1.6.0',
                    py_version='py36',
                    source_dir='./code',
                    output_path = f's3://{bucket}/{prefix}/output',
                    instance_count=1, 
                    sagemaker_session=sagemaker_session,
                    instance_type='ml.m5.xlarge', 
                    hyperparameters={
                        'epochs': 10, # run more epochs for HPO.
                        'backend': 'gloo' #gloo for cpu, nccl for gpu
                    }
            )
            
#specify the hyper-parameter ranges           
hyperparameter_ranges = {'lr': ContinuousParameter(0.001, 0.1),
                         'momentum': CategoricalParameter(list(np.arange(0, 10)/10))}

inputs ={'training': train_s3,
         'testing':test_s3}

#specify the custom HPO metric
objective_metric_name = 'test AUC'
objective_type = 'Maximize'
metric_definitions = [{'Name': 'test AUC',
                       'Regex': r'Test set AUC: ([0-9\.]+)'}]   #note that the regex must match your test function above
estimator.fit(inputs, wait=False)

We pass in the paths to the training and test data in Amazon S3.
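
Because the objective metric is extracted by applying the regex to the job's log output, it can be worth checking the pattern locally before launching any jobs. The sketch below uses Python's re module on a sample line that mimics the format string in the test function. The F1 and AUC values are made up, and extract_metric is a hypothetical helper, not part of the SageMaker SDK.

```python
import re

metric_regex = r"Test set AUC: ([0-9\.]+)"


def extract_metric(regex, log_line):
    """Return the first captured group as a float, or None if the line doesn't match."""
    match = re.search(regex, log_line)
    return float(match.group(1)) if match else None


# Sample line built with the same format string the test function logs.
log_line = "Test set F1-score: {:.4f}, Test set AUC: {:.4f} \n".format(0.5312, 0.7421)
auc = extract_metric(metric_regex, log_line)
print(auc)
```

If the pattern doesn't match a line, the helper returns None, which usually points to a mismatch between the log message and the regex.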

With the setup in place, let’s now turn to running multiple HPO jobs.

Parallelizing HPO jobs

To run multiple hyperparameter tuning jobs in parallel, we must first determine the tuning strategy. SageMaker currently provides two strategies: random search and Bayesian optimization. With the random strategy, different HPO jobs are completely independent of one another, whereas Bayesian optimization treats the HPO problem as a regression problem and makes intelligent guesses about the next set of parameters to pick based on the prior set of trials.

First, let’s review some terminology:

  • Trials – A trial corresponds to a single training job with a set of fixed values for the hyperparameters
  • max_jobs – The total number of training trials to run for that given HPO job
  • max_parallel_jobs – The maximum concurrent running trials per HPO job

Suppose you want to run 10,000 total trials. To minimize the total HPO time, you want to run as many trials as possible in parallel. This is limited by the availability of a particular Amazon Elastic Compute Cloud (Amazon EC2) instance type in your Region and account. If you want to modify or increase those limits, speak to your AWS account representatives.

For this example, let’s suppose that you have 20 ml.m5.xlarge instances available. This means that you can simultaneously run 20 trials of one instance each. Currently, without increasing any limits, SageMaker limits max_jobs to 500 and max_parallel_jobs to 10. This means that you need to run a total of 10,000/500 = 20 HPO jobs. Because you can run 20 trials and max_parallel_jobs is 10, you can maximize the number of simultaneous HPO jobs running by running 20/10 = 2 HPO jobs in parallel. So one approach to batch your code is to always have two jobs running, until you meet your total required jobs of 20.

In the following code snippet, we show two ways in which you can poll the number of running jobs to achieve this. The first approach uses boto3, the AWS SDK for Python, to poll running HPO jobs. It can be run in your notebook, is illustrated pictorially in the following diagram, and is primarily intended for data scientists. Whenever the number of running HPO jobs falls below a fixed number (indicated by the blue arrows in the dashed box on the left), the polling code launches new jobs (shown by the orange arrows). The second approach uses Amazon Simple Queue Service (Amazon SQS) and AWS Lambda to queue and poll SageMaker HPO jobs, allowing you to build an operational pipeline for repeatability.

Sounds complicated? No problem, the following code snippet allows you to determine the optimal strategy to minimize your overall HPO time by running as many HPO jobs in parallel as allowed. After you determine the instance type you want to use and your respective account limits for that instance, replace max_parallel_across_jobs with your value.

import math

def bayesian_batching_cold_start(total_requested_trials, max_parallel_across_jobs=20, max_parallel_per_job=10, max_candidates_per_job=500):
    '''Given a total number of requested trials, generates the batching strategy for Bayesian HPO.
    The strategy is a list (batch_strat) where every element is the number of HPO jobs to run in parallel.
    The sum of all elements in the list is the total number of HPO jobs needed to reach total_requested_trials.
    For example, batch_strat = [2, 2, 2, 1] means you run a total of 7 HPO jobs in batches of 2 --> 2 --> 2 --> 1.
    total_requested_trials = number of trials the user wants to run
    max_parallel_across_jobs = max number of training jobs SageMaker runs in parallel across all HPO jobs; limited by instance availability
    max_parallel_per_job = max number of training jobs to run in parallel per HPO job
    max_candidates_per_job = total number of training jobs per HPO job'''
    batch_strat = []
    tot_jobs_left = total_requested_trials
    max_parallel_hpo_jobs = max_parallel_across_jobs // max_parallel_per_job
    # fill complete batches while more than one batch's worth of trials remains
    while tot_jobs_left > max_parallel_hpo_jobs * max_candidates_per_job:
        batch_strat.append(max_parallel_hpo_jobs)
        tot_jobs_left -= max_parallel_hpo_jobs * max_candidates_per_job
    # the last batch rounds up so that a partially filled HPO job is still counted
    batch_strat.append(math.ceil(tot_jobs_left / max_candidates_per_job))
    return math.ceil(total_requested_trials / max_candidates_per_job), max_parallel_hpo_jobs, batch_strat

bayesian_batching_cold_start(10000)
# returns (20, 2, [2, 2, 2, 2, 2, 2, 2, 2, 2, 2])

After you determine how to run your jobs, consider the following code for launching a given sequence of jobs. The helper function _parallel_hpo_no_polling runs the group of parallel HPO jobs indicated by the dashed box in the preceding figure. It’s important to set the wait parameter to False when calling the tuner, because this releases the API call to allow the loop to run. The orchestration code poll_and_run polls for the number of jobs that are running at any given time. If the number of running HPO jobs falls below the number that can run in parallel (derived from max_parallel_across_jobs), the function automatically launches new jobs. Now you might be thinking, “But these jobs can take days to run, what if I want to turn off my laptop or if I lose my session?” No problem: the code picks up where it left off by counting the completed HPO jobs whose names contain the job_name_prefix you provide and running the remaining ones.

Finally, the get_best_job function aggregates the outputs in a Pandas DataFrame in ascending order of the objective metric for visualization.

from time import gmtime, sleep, strftime

from sagemaker.tuner import HyperparameterTuner

# helper function to launch a desired number of "n_parallel" HPO jobs simultaneously
def _parallel_hpo_no_polling(job_name_prefix, n_parallel, inputs, max_candidates_per_job, max_parallel_per_job):
    """kicks off N_parallel Bayesian HPO jobs in parallel
    job_name_prefix: user specified prefix for job names
    n_parallel: Number of HPO jobs to start in parallel
    inputs: training and test data s3 paths
    max_candidates_per_job: number of training jobs to run in each HPO job in total
    max_parallel_per_job: number of training jobs to run in parallel in each job
    
    """
    # kick off n_parallel jobs simultaneously and returns all the job names 
    tuning_job_names = []
    for i in range(n_parallel):
        timestamp_suffix = strftime("%d-%H-%M-%S", gmtime())
        try:
            tuner = HyperparameterTuner(estimator,
                            objective_metric_name,
                            hyperparameter_ranges,
                            metric_definitions,
                            max_jobs=max_candidates_per_job,
                            max_parallel_jobs=max_parallel_per_job,
                            objective_type=objective_type
                    )
            # fit the tuner to the inputs and include it as part of experiments
            tuner.fit(inputs,
                      job_name=f'{job_name_prefix}-{timestamp_suffix}',
                      wait=False
                     )  # set wait=False, so you can launch other jobs in parallel.
            tuning_job_names.append(tuner.latest_tuning_job.name)
            sleep(1)  # this is required otherwise you will get an error for using the same tuning job name
            print(tuning_job_names)
        except Exception as e:
            print(f"Failed to launch HPO job: {e}")  # surface the error instead of silently swallowing it
            sleep(5)
    return tuning_job_names

import boto3

sm = boto3.client('sagemaker')  # assumed: SageMaker API client used by the polling code

# orchestration and polling logic
def poll_and_run(job_name_prefix, inputs, max_total_candidates, max_parallel_across_jobs, max_candidates_per_job, max_parallel_per_job):
    """Polls for number of running HPO jobs. If less than max_parallel, starts a new one.
    job_name_prefix: the name prefix to give all your HPO jobs
    max_total_candidates: how many total trials to run across all HPO jobs
    max_candidates_per_job: how many total trials to run for 1 HPO job
    max_parallel_per_job: how many trials to run in parallel for a given HPO job (fixed to 10 without limit increases).
    max_parallel_across_jobs: how many concurrent trials to run in parallel across all HPO jobs
    """
    #get how many jobs to run in total and concurrently
    max_num, max_parallel, _ = bayesian_batching_cold_start(max_total_candidates, 
                                                            max_parallel_across_jobs=max_parallel_across_jobs,
                                                            max_parallel_per_job=max_parallel_per_job,
                                                            max_candidates_per_job = max_candidates_per_job
                                                           )
    
    # continuously polls for running jobs -- if they are less than the required number, then launches a new one. 

    all_jobs = sm.list_hyper_parameter_tuning_jobs(SortBy='CreationTime', SortOrder='Descending', 
                                                       NameContains=job_name_prefix,
                                                        MaxResults = 100)['HyperParameterTuningJobSummaries']
    all_jobs = [i['HyperParameterTuningJobName'] for i in all_jobs]

    if len(all_jobs)==0:
        print(f"Starting a set of HPO jobs with the prefix {job_name_prefix} ...")
        num_left = max_num
        #kick off the first set of jobs
        all_jobs += _parallel_hpo_no_polling(job_name_prefix, min(max_parallel, num_left), inputs, max_candidates_per_job, max_parallel_per_job)
        
    else:
        print("Continuing where you left off...")
        response_list = [sm.describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=i)['HyperParameterTuningJobStatus']
                         for i in all_jobs]
        print(f"Already completed jobs = {response_list.count('Completed')}")
        num_left = max_num - response_list.count("Completed")
        print(f"Number of jobs left to complete = {max(num_left, 0)}")
    
    while num_left >0:
        response_list = [sm.describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=i)['HyperParameterTuningJobStatus']
                         for i in all_jobs]
        running_jobs = response_list.count("InProgress") # look for the jobs that are running. 
        print(f"number of completed jobs = {response_list.count('Completed')}")
        sleep(10)
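        if running_jobs < max_parallel and len(all_jobs) < max_num:
            # never launch more HPO jobs than are still missing from the overall target
            n_to_launch = min(max_parallel - running_jobs, max_num - len(all_jobs))
            all_jobs += _parallel_hpo_no_polling(job_name_prefix, n_to_launch, inputs, max_candidates_per_job, max_parallel_per_job)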
        num_left = max_num - response_list.count("Completed")
                
    return all_jobs

# Aggregate the results from all the HPO jobs based on the custom metric specified
def get_best_job(all_jobs_list):
    """Get the best job from the list of all the jobs completed.
    Objective is to maximize a particular value such as AUC or F1 score"""
    df = pd.DataFrame()
    for job in all_jobs_list:
        tuner = sagemaker.HyperparameterTuningJobAnalytics(job)
        full_df = tuner.dataframe()
        tuning_job_result = sm.describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=job)
        is_maximize = (tuning_job_result['HyperParameterTuningJobConfig']['HyperParameterTuningJobObjective']['Type'] == 'Maximize')
        if len(full_df) > 0:
            df = pd.concat([df, full_df[full_df['FinalObjectiveValue'] < float('inf')]])
    if len(df) > 0:
        df = df.sort_values('FinalObjectiveValue', ascending=is_maximize)
        print("Number of training jobs with valid objective: %d" % len(df))
        print({"lowest":min(df['FinalObjectiveValue']),"highest": max(df['FinalObjectiveValue'])})
        pd.set_option('display.max_colwidth', None)  # Don't truncate TrainingJobName
        return df
    else:
        print("No training jobs have reported valid results yet.")

Now, we can test this out by running a total of 260 trials, and request that the code run 20 trials in parallel at all times:

alljobs = poll_and_run('newtrials', inputs, max_total_candidates=260, max_parallel_across_jobs = 20, max_candidates_per_job=4, max_parallel_per_job=2)
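
To see what this call implies, the following sketch works through the arithmetic on its own. The helper name hpo_plan and the notion of "waves" are assumptions made for illustration. Because the polling code starts a new HPO job as soon as one finishes, the wave count is only a rough lower bound on the number of rounds.

```python
import math


def hpo_plan(total_trials, max_parallel_across_jobs, max_candidates_per_job, max_parallel_per_job):
    """Return (total HPO jobs, concurrent HPO jobs, minimum number of waves)."""
    n_hpo_jobs = math.ceil(total_trials / max_candidates_per_job)
    concurrent_hpo_jobs = max_parallel_across_jobs // max_parallel_per_job
    waves = math.ceil(n_hpo_jobs / concurrent_hpo_jobs)
    return n_hpo_jobs, concurrent_hpo_jobs, waves


plan = hpo_plan(total_trials=260, max_parallel_across_jobs=20,
                max_candidates_per_job=4, max_parallel_per_job=2)
print(plan)
```

With 4 trials per HPO job, 260 trials require 65 HPO jobs. With 2 parallel trials per HPO job and 20 available instances, 10 HPO jobs can run at the same time.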

After the jobs are complete, we can look at all the outputs (see the following screenshot).

The preceding code lets you run HPO jobs in parallel up to the allowed limit of 100 concurrent HPO jobs.

Parallelizing HPO jobs with warm start

Now suppose you want to run a warm start job, where the result of a prior job is used as input to the next job. Warm start is particularly useful if you have already determined a set of hyperparameters that produce a good model but now have new data. Another use case for warm start is when a single HPO job can take a long time, particularly for deep learning workloads. In that case, you may want to use the outputs of the prior job to launch the next one. For our use case, that could occur when you get a batch of new monthly or quarterly default data. For more information about SageMaker HPO with warm start, see Run a Warm Start Hyperparameter Tuning Job.

The crucial difference between warm and cold start is the naturally sequential nature of warm start. Again, suppose we want to launch 10,000 training jobs (trials) with warm start. This time, we only launch a single HPO job with the maximum allowed max_jobs parameter, wait for its completion, and launch the next job with this job as parent. We repeat the process until the total desired number of trials is reached. We can achieve this with the following code:

def large_scale_hpo_warmstart(job_name_prefix, inputs, max_total_trials,  max_parallel_per_job, max_trials_per_hpo_job=250):
    """Kicks off sequential HPO jobs with warmstart. 
    job_name_prefix: user defined prefix to name your HPO jobs. HPO will add a timestamp
    inputs: locations of train and test datasets in S3 provided as a dict
    max_total_trials: total number of trials you want to run
    max_trials_per_hpo_job: Fixed at 250 unless you want fewer.
    max_parallel_per_job: max trials to run in parallel per HPO job"""
    
    if max_trials_per_hpo_job >250:
        raise ValueError('Please select a value less than or equal to 250 for max_trials_per_hpo_job')
    
    base_hpo_job_name = job_name_prefix
    timestamp_suffix = strftime("%d-%H-%M-%S", gmtime())
    tuning_job_name = lambda i : f"{base_hpo_job_name}-{timestamp_suffix}-{i}"
    current_jobs_completed = 0
    job_names_list = []
    while current_jobs_completed < max_total_trials:
        jobs_to_launch = min(max_total_trials - current_jobs_completed, max_trials_per_hpo_job)

        hpo_job_config = dict(
            estimator=estimator,
            objective_metric_name=objective_metric_name,
            metric_definitions=metric_definitions,
            hyperparameter_ranges=hyperparameter_ranges,
            max_jobs=jobs_to_launch,
            strategy="Bayesian",
            objective_type=objective_type,
            max_parallel_jobs=max_parallel_per_job,
        )

        if current_jobs_completed > 0:
            parent_tuning_job_name = tuning_job_name(current_jobs_completed)
            warm_start_config = WarmStartConfig(
                WarmStartTypes.IDENTICAL_DATA_AND_ALGORITHM,
                parents={parent_tuning_job_name}
            )
            hpo_job_config.update(dict(
                base_tuning_job_name=parent_tuning_job_name,
                warm_start_config=warm_start_config
            ))

        tuner = HyperparameterTuner(**hpo_job_config)
        tuner.fit(
            inputs,
            job_name=tuning_job_name(current_jobs_completed + jobs_to_launch),
            logs=True,
        )
        tuner.wait()
        job_names_list.append(tuner.latest_tuning_job.name)
        current_jobs_completed += jobs_to_launch
    return job_names_list

After the jobs run, again use the get_best_job function to aggregate the findings.
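
The sequential structure of the warm start loop can be seen without calling SageMaker at all. The following sketch only mirrors the bookkeeping of the loop (how many trials each HPO job gets and which job is its parent). The function warm_start_chain and the simplified job names without a timestamp are assumptions made for illustration.

```python
def warm_start_chain(prefix, max_total_trials, max_trials_per_hpo_job=250):
    """Return a list of (job_name, n_trials, parent_job_name) in launch order."""
    chain = []
    completed = 0
    parent = None  # the first HPO job is a cold start
    while completed < max_total_trials:
        n_trials = min(max_total_trials - completed, max_trials_per_hpo_job)
        name = f"{prefix}-{completed + n_trials}"
        chain.append((name, n_trials, parent))
        parent = name  # the next job warm starts from this one
        completed += n_trials
    return chain


chain = warm_start_chain("demo", 600)
for job in chain:
    print(job)
```

Each job has to wait for its parent to complete, so 10,000 trials at 250 trials per job translate into 40 HPO jobs that run one after another.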

Using other HPO tools with SageMaker

SageMaker offers the flexibility to use other HPO tools such as the ones discussed earlier to run your HPO jobs by removing the undifferentiated heavy lifting of managing the underlying infrastructure. For example, a popular open-source HPO tool is Ray Tune [2], which is a Python library for large-scale HPO that supports most of the popular frameworks such as XGBoost, MXNet, PyTorch, and TensorFlow. Ray integrates with popular search algorithms such as Bayesian, HyperOpt, and SigOpt, combined with state-of-the-art schedulers such as Hyperband or ASHA.

To use Ray with PyTorch, you first need to add ray[tune] and tabulate to the requirements.txt file in the code folder that contains your training script. Pass the code folder to the SageMaker PyTorch estimator as follows:

from sagemaker.pytorch import PyTorch

estimator = PyTorch(entry_point="train_ray_cpu.py", #put requirements.txt file to install ray
                    role=role,
                    source_dir='./code',
                    framework_version='1.6.0',
                    py_version='py3',
                    output_path = f's3://{bucket}/{prefix}/output',
                    instance_count=1,
                    instance_type='ml.m5.xlarge',
                    sagemaker_session=sagemaker_session,
                    hyperparameters={
                        'epochs': 7,
                        'backend': 'gloo' # gloo for CPU and nccl for GPU
                    },
                   disable_profiler=True)

inputs ={'training': train_s3,
         'testing':test_s3}

estimator.fit(inputs, wait=True)

Your training script needs to be modified to output your custom metrics to the Ray report generator, as shown in the following code. This allows your training job to communicate with Ray. Here we use the ASHA scheduler to implement early stopping:

# additional imports
from ray import tune
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler

# modify test function to output to ray tune report.
def test(model, test_loader, device):
    # same as test function above with 1 line of code added to enable communication 
    # with tune.
    tune.report(loss=test_loss, accuracy=correct / (len(target)*i), f1score=f1score, roc=roc)

You also need to checkpoint your model at regular intervals:

for epoch in range(1, args.epochs + 1):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader, 1):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target.squeeze())
        loss.backward()
        optimizer.step()
        if batch_idx % args.log_interval == 0:
            logger.info("Train Epoch: {} [{}/{} ({:.0f}%)] Loss: {:.6f}".format(
                epoch, batch_idx * len(data), len(train_loader.sampler),
                100. * batch_idx / len(train_loader), loss.item()))
    # have your test function publish metrics to tune.report
    test(model, test_loader, device)
    # checkpoint your model after every epoch
    with tune.checkpoint_dir(epoch) as checkpoint_dir:
        path = os.path.join(checkpoint_dir, "checkpoint")
        torch.save((model.state_dict(), optimizer.state_dict()), path)

Finally, you need to wrap the training script in a custom main function that sets up the hyperparameters such as the learning rate, the size of the first and second hidden layers, and any additional hyperparameters you want to iterate over. You also need to use a scheduler, such as the ASHA scheduler we use here, for single- and multi-node GPU training. We use the default tuning algorithm Variant Generation, which supports both random (shown in the following code) and grid search, depending on the config parameter used.

# additional imports used by main
from functools import partial
import numpy as np

def main(args):
    config = {
        "l1": tune.sample_from(lambda _: 2**np.random.randint(2, 9)),
        "l2": tune.sample_from(lambda _: 2**np.random.randint(2, 9)),
        "lr": tune.loguniform(1e-4, 1e-1)
    }
    scheduler = ASHAScheduler(
        metric="loss",
        mode="min",
        max_t=args.epochs,
        grace_period=1,
        reduction_factor=2)
    reporter = CLIReporter(
        metric_columns=["loss","training_iteration", "roc"])
    
    # run the HPO job by calling train
    print("Starting training ....")
    result = tune.run(
        partial(train_ray, args=args),
        resources_per_trial={"cpu": args.num_cpus, "gpu": args.num_gpus},
        config=config,
        num_samples=args.num_samples,
        scheduler=scheduler,
        progress_reporter=reporter)

The output of the job looks like the following screenshot.

Ray Tune automatically ends poorly performing jobs while letting the better-performing jobs run longer, which reduces your total HPO time. In this case, the best-performing job ran for all 7 epochs, whereas other hyperparameter choices were stopped early. To learn more about how early stopping works with SageMaker HPO, see here.
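To make the early stopping behavior more concrete, the following is a simplified, single-bracket sketch of the successive halving idea behind ASHA, written in plain Python. It is not Ray Tune's implementation, and the function names asha_rungs and survivors_per_rung are hypothetical. It only illustrates at which epochs trials are compared and roughly how many continue, given the grace_period, reduction_factor, and max_t values used earlier.

```python
def asha_rungs(grace_period, reduction_factor, max_t):
    """Return the training iterations (rungs) at which trials are compared."""
    rungs = []
    t = grace_period
    while t <= max_t:
        rungs.append(t)
        t *= reduction_factor
    return rungs


def survivors_per_rung(num_trials, rungs, reduction_factor):
    """Approximate number of trials that reach each rung.

    Roughly 1/reduction_factor of the trials are promoted past each rung;
    the asynchronous scheduler makes this an approximation, not a guarantee.
    """
    counts = []
    remaining = num_trials
    for _ in rungs:
        counts.append(remaining)
        remaining = max(1, remaining // reduction_factor)
    return counts


rungs = asha_rungs(grace_period=1, reduction_factor=2, max_t=7)
print(rungs)
print(survivors_per_rung(num_trials=8, rungs=rungs, reduction_factor=2))
```

With a small grace_period, weak configurations are stopped after very few epochs, which is why only the best-performing job in the example ran for all 7 epochs.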

Queuing HPO jobs with Amazon SQS

When multiple data scientists create HPO jobs in the same account at the same time, the limit of 100 concurrent HPO jobs per account might be reached. In this case, we can use Amazon SQS to create an HPO job queue. Each HPO job request is represented as a message and submitted to an SQS queue. Each message contains hyperparameters and tunable hyperparameter ranges in the message body. A Lambda function is also created. The function first checks the number of HPO jobs in progress. If the 100 concurrent HPO jobs limit isn’t reached, it retrieves messages from the SQS queue and creates HPO jobs as stipulated in the message. The function is triggered by Amazon EventBridge events at a regular interval (for example, every 10 minutes). The simple architecture is shown as follows.

To build this architecture, we first create an SQS queue and note the URL. In the Lambda function, we use the following code to return the number of HPO jobs in progress:

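import boto3

HPO_LIMIT = 100  # limit of concurrent HPO jobs per account

sm_client = boto3.client('sagemaker')

def check_hpo_jobs():
    # count the HPO jobs that are currently in progress
    response = sm_client.list_hyper_parameter_tuning_jobs(
        MaxResults=HPO_LIMIT,
        StatusEquals='InProgress')
    return len(response["HyperParameterTuningJobSummaries"])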

If the number of HPO jobs in progress is greater than or equal to the limit of 100 concurrent HPO jobs (for current limits, see Amazon SageMaker endpoints and quotas), the Lambda function returns 200 status and exits. If the limit isn’t reached, the function calculates the number of HPO jobs available for creation and retrieves the same number of messages from the SQS queue. Then the Lambda function extracts hyperparameter ranges and other data fields for creating HPO jobs. If the HPO job is created successfully, the corresponding message is deleted from the SQS queue. See the following code:

def lambda_handler(event, context):
    
    # first: check HPO jobs in progress
    hpo_in_progress = check_hpo_jobs()
    
    if hpo_in_progress >= HPO_LIMIT:
        return {
            'statusCode': 200,
            'body': json.dumps('HPO concurrent jobs limit reached')
        }
    else:
        hpo_capacity = HPO_LIMIT - hpo_in_progress
        container = image_uris.retrieve("xgboost", region, "0.90-2")
        train_input = TrainingInput(f"s3://{bucket}/{key_prefix}/train/train.csv", content_type="text/csv")
        validation_input = TrainingInput(f"s3://{bucket}/{key_prefix}/validation/validation.csv", content_type="text/csv")
      
        while hpo_capacity > 0:
            sqs_response = sqs.receive_message(QueueUrl = queue_url)
            if 'Messages' in sqs_response.keys():
                msgs = sqs_response['Messages']
                for msg in msgs:
                    try:
                        hp_in_msg = json.loads(msg['Body'])['hyperparameter_ranges']
                        create_hpo(container,train_input,validation_input,hp_in_msg)
                        response = sqs.delete_message(QueueUrl=queue_url,ReceiptHandle=msg['ReceiptHandle'])
                        hpo_capacity = hpo_capacity-1
                        if hpo_capacity == 0: 
                            break
                    except Exception:
                        # stop on the first failure; the message is not deleted and stays in the queue
                        return ("error occurred for message {}".format(msg['Body']))
            else:
                return {'statusCode': 200, 'body': json.dumps('Queue is empty')}
    
        return {'statusCode': 200,  'body': json.dumps('Lambda completes')}
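The capacity logic can be exercised locally before wiring it to AWS. The following is a minimal sketch that uses an in-memory deque as a stand-in for the SQS queue; drain_queue and the create_job callback are hypothetical names, not part of the solution code. It keeps the same order of operations as the Lambda function: a message is removed only after its job is created.

```python
from collections import deque

HPO_LIMIT = 100  # concurrent HPO job limit used in this post


def drain_queue(queue, jobs_in_progress, create_job, limit=HPO_LIMIT):
    """Start queued jobs until the limit is reached or the queue is empty.

    A message is removed from the queue only after create_job succeeds,
    mirroring the delete-after-create order in the Lambda function.
    """
    capacity = max(0, limit - jobs_in_progress)
    started = []
    while capacity > 0 and queue:
        message = queue[0]    # peek without removing
        create_job(message)   # may raise; the message then stays queued
        queue.popleft()       # remove only after a successful create
        started.append(message)
        capacity -= 1
    return started


# Five pending requests, 97 jobs already running: only three can start.
pending = deque({"hyperparameter_ranges": {"job": i}} for i in range(5))
started = drain_queue(pending, jobs_in_progress=97, create_job=lambda m: None)
print(len(started), len(pending))
```

The remaining messages wait in the queue until the next scheduled invocation finds free capacity.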

After your Lambda function is created, you can add triggers with the following steps:

  1. On the Lambda console, choose your function.
  2. On the Configuration page, choose Add trigger.
  3. Select EventBridge (CloudWatch Events).
  4. Choose Create a new rule.
  5. Enter a name for your rule.
  6. Select Schedule expression.
  7. Set the rate to 10 minutes.
  8. Choose Add.

This rule triggers our Lambda function every 10 minutes.

When this is complete, you can test it by sending messages to the SQS queue with your HPO job configuration in the message body. The code and notebook for this architecture are on our GitHub repo. See the following code:

response = sqs.send_message(
    QueueUrl=queue_url,
    DelaySeconds=1,
    # replace the <...> placeholders with your hyperparameter names and ranges
    MessageBody=(
        '''{"hyperparameter_ranges": {
            "<hyperparameter1>": <range>,
            "<hyperparameter2>": <range>} }'''
        )
    )
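Building the message body with json.dumps avoids hand-written JSON strings. The following is a small sketch; the range encoding (type, min, max) is a hypothetical example, because the actual schema is whatever your create_hpo function expects, and build_message_body is an illustrative helper name.

```python
import json


def build_message_body(hyperparameter_ranges):
    """Serialize tunable ranges into a JSON body with a hyperparameter_ranges key."""
    return json.dumps({"hyperparameter_ranges": hyperparameter_ranges})


# Hypothetical range encoding; adapt it to what create_hpo expects.
ranges = {
    "eta": {"type": "continuous", "min": 0.1, "max": 0.5},
    "max_depth": {"type": "integer", "min": 3, "max": 9},
}
body = build_message_body(ranges)

# The Lambda function reads the ranges back with json.loads on the message body.
recovered = json.loads(body)["hyperparameter_ranges"]
print(recovered == ranges)
```

The resulting string can be passed as MessageBody in the send_message call.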

Conclusions

ML engineers often need to search through a large hyperparameter space to find the best-performing model for their use case. For complex deep learning models, where individual training jobs can be quite time consuming, this can be a cumbersome process that can often take weeks or months of developer time.

In this post, we discussed how you can maximize the number of tuning jobs you can launch in parallel with SageMaker, which reduces the total time it takes to run HPO with custom user-specified objective metrics. We first discussed a Jupyter notebook-based approach that individual data scientists can use for research and experimentation workflows. We also demonstrated how to use an SQS queue to allow teams of data scientists to submit more jobs. SageMaker is a highly flexible platform that allows you to bring your own HPO tool, which we illustrated using the popular open-source tool Ray Tune.

To learn more about bringing other algorithms such as genetic algorithms to SageMaker HPO, see Bring your own hyperparameter optimization algorithm on Amazon SageMaker.

References

[1] Hyper-Parameter Optimization: A Review of Algorithms and Applications, Yu, T. and Zhu, H., https://arxiv.org/pdf/2003.05689.pdf.

[2] Tune: A research platform for distributed model selection and training, https://arxiv.org/abs/1807.05118.


About the Authors

Iaroslav Shcherbatyi is a Machine Learning Engineer at Amazon Web Services. His work is centered around improvements to the Amazon SageMaker platform and helping customers best use its features. In his spare time, he likes to catch up on recent research in ML and do outdoor sports such as ice skating or hiking.

Enrico Sartorello is a Sr. Software Development Engineer at Amazon Web Services. He helps customers adopt machine learning solutions that fit their needs by developing new functionalities for Amazon SageMaker. In his spare time, he passionately follows his soccer team and likes to improve his cooking skills.

Tushar Saxena is a Principal Product Manager at Amazon, with the mission to grow AWS’ file storage business. Prior to Amazon, he led telecom infrastructure business units at two companies, and played a central role in launching Verizon’s fiber broadband service. He started his career as a researcher at GE R&D and BBN, working in computer vision, Internet networks, and video streaming.

Stefan Natu is a Sr. Machine Learning Specialist at Amazon Web Services. He is focused on helping financial services customers build end-to-end machine learning solutions on AWS. In his spare time, he enjoys reading machine learning blogs, playing the guitar, and exploring the food scene in New York City.

Qingwei Li is a Machine Learning Specialist at Amazon Web Services. He received his PhD in Operations Research after he broke his advisor’s research grant account and failed to deliver the Nobel Prize he promised. Currently, he helps customers in the financial service and insurance industry build machine learning solutions on AWS. In his spare time, he likes reading and teaching.

Accelerating the deployment of PPE detection solution to comply with safety guidelines

Personal protective equipment (PPE) such as face covers (face mask), hand covers (gloves), and head covers (helmet) are essential for many businesses. For example, helmets are required at construction sites for employee safety, and gloves and face masks are required in the restaurant industry for hygienic operations. In the current COVID-19 pandemic environment, PPE compliance has also become important as face masks are mandated by many businesses. In this post, we demonstrate how you can deploy a solution to automatically check face mask compliance on your business premises and extract actionable insights using the Amazon Rekognition DetectProtectiveEquipment API.

This solution has been developed by AWS Professional Services to help customers that rely heavily on on-site presence of customers or employees to support their safety. Our team built the following architecture to automate PPE detection by consuming the customer’s camera video feeds. This solution enabled a large sports entertainment customer to take timely action to ensure people who are on the premises comply with face mask requirements. The architecture is designed to take raw camera feeds for model inference and pass the model output to an analytic dashboard for further analysis. As of this writing, it’s successfully deployed at a customer site with multiple production cameras.

Let’s walk through the solution in detail and discuss the scalability and security of the application.

Solution overview

The PPE detection solution architecture is an end-to-end pipeline consisting of three components:

  • Video ingestion pipeline – Ensures you receive on-demand video feeds from camera and preprocesses the feeds to break them into frames. Finally, it saves the frames in an Amazon Simple Storage Service (Amazon S3) bucket for ML model inference.
  • Machine learning inference pipeline – Demonstrates how the machine learning (ML) model processes the frames as soon as they arrive at the S3 bucket. The model outputs are stored back in the S3 bucket for further visualization.
  • Model interaction pipeline – Used for visualizing the model outputs. The model outputs feed into Amazon QuickSight, which you can use to analyze the data based on the camera details, day, and time of day.

The following diagram illustrates this architecture.

We now discuss each section in detail.

Video ingestion pipeline

The following diagram shows the architecture of the video ingestion pipeline.

The video ingestion pipeline begins at a gateway located on premises at the customer location. The gateway is a Linux machine with access to RTSP streams on the cameras. Installed on the gateway is the open-source GStreamer framework and AWS-provided Amazon Kinesis Video Streams GStreamer plugin. For additional information on setting up a gateway with the tools needed to stream video to AWS, see Example: Kinesis Video Streams Producer SDK GStreamer Plugin.

The gateway continuously publishes live video to a Kinesis video stream, which acts like a buffer while AWS Fargate tasks read video fragments for further processing. To accommodate customer-specific requirements around the location of cameras that periodically come online and the time of day when streaming processing is needed, we developed a cost-effective and low-operational overhead consumer pipeline with automatic scaling. This avoids manually starting and stopping processing tasks when a camera comes online or goes dark.

Consuming from Kinesis Video Streams is accomplished via an AWS Fargate task running on Amazon Elastic Container Service (Amazon ECS). Fargate is a serverless compute engine that removes the need to provision and manage servers, and you pay for compute resources only when necessary. Processing periodic camera streams is an ideal use case for a Fargate task, which was developed to automatically end when no video data is available on a stream. Additionally, we built the framework to automatically start tasks using a combination of Amazon CloudWatch alarms, AWS Lambda, and checkpointing tables in Amazon DynamoDB. This ensures that the processing always continues from the video segment where the streaming data was paused.

The Fargate task consumes from the Kinesis Video Streams GetMedia API to obtain real-time, low-latency access to individual video fragments and combines them into video clips of 30 seconds or more. The video clips are then converted from MKV to an MP4 container and resampled to 1 frame per second (FPS) to extract an image from each second of video. Finally, the processed video clips and images are copied into an S3 bucket to feed the ML inference pipeline.
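The post does not say which tool performs the container conversion and resampling. As one possible approach, the following sketch assumes the ffmpeg command line tool is available in the task image and only builds the commands; the file names and helper function names are hypothetical.

```python
def remux_command(src_mkv, dst_mp4):
    """Build an ffmpeg command that copies streams from MKV into an MP4 container."""
    return ["ffmpeg", "-y", "-i", src_mkv, "-c", "copy", dst_mp4]


def frame_extract_command(src_mp4, frame_pattern, fps=1):
    """Build an ffmpeg command that writes `fps` images per second of video."""
    return ["ffmpeg", "-y", "-i", src_mp4, "-vf", f"fps={fps}", frame_pattern]


remux = remux_command("clip.mkv", "clip.mp4")
frames = frame_extract_command("clip.mp4", "frame_%04d.jpg")
print(" ".join(remux))
print(" ".join(frames))
# To run them, pass each list to subprocess.run(cmd, check=True).
```

Copying the streams with -c copy avoids re-encoding, which only works when the codecs in the MKV fragments are also valid in an MP4 container.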

ML inference pipeline

The following diagram illustrates the architecture of the ML pipeline.

The ML pipeline is architected to be automatically triggered when new data lands in the S3 bucket, and it utilizes a new deep learning-based computer vision model tailored for PPE detection in Amazon Rekognition. As soon as the S3 bucket receives a new video or image object, it generates and delivers an event notification to an Amazon Simple Queue Service (Amazon SQS) queue, where each queue item triggers a Lambda invocation. Each Lambda invocation calls the Amazon Rekognition DetectProtectiveEquipment API to generate model inference and delivers the result back to Amazon S3 through Amazon Kinesis Data Firehose.
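When S3 event notifications are delivered through SQS, each Lambda record carries the S3 event as a JSON string in its body. The following is a minimal sketch of unpacking it; extract_s3_objects is a hypothetical helper, and it assumes object keys follow the input/[camera_id]/ layout used by the code template in this post.

```python
import json
from urllib.parse import unquote_plus


def extract_s3_objects(sqs_event):
    """Yield (bucket, key, camera_id) for each S3 object referenced in an SQS event."""
    for sqs_record in sqs_event.get("Records", []):
        s3_event = json.loads(sqs_record["body"])
        # S3 test events carry no Records list, so default to an empty one.
        for s3_record in s3_event.get("Records", []):
            bucket = s3_record["s3"]["bucket"]["name"]
            # Object keys arrive URL-encoded in S3 event notifications.
            key = unquote_plus(s3_record["s3"]["object"]["key"])
            parts = key.split("/")
            # Assumes keys look like input/<camera_id>/<file>.
            camera_id = parts[1] if len(parts) >= 3 else None
            yield bucket, key, camera_id


sample_event = {
    "Records": [{
        "body": json.dumps({
            "Records": [{
                "s3": {
                    "bucket": {"name": "example-bucket"},
                    "object": {"key": "input/CAMERA01/ppe_sample_image.jpg"},
                }
            }]
        })
    }]
}
print(list(extract_s3_objects(sample_event)))
```

Each tuple gives the Lambda function what it needs to reference the image in the call to Amazon Rekognition and to tag the result with its camera.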

The Amazon Rekognition PPE API detects several types of equipment, including hand covers (gloves), face covers (face masks), and head covers (helmets). For our use case, the customer was focused on detecting face masks. The computer vision model in Amazon Rekognition first detects if people are in a given image, and then detects face masks. Based on the location of the face mask on a face, if a person is wearing a face mask not covering their nose or mouth, the service will assign a noncompliant label. When the model can’t detect a face due to image quality, such as when the region of interest (face) is too small, it labels that region as unknown. For each image, the Amazon Rekognition API returns the number of compliant, noncompliant, and unknowns, which are used to calculate meaningful metrics for end users. The following table lists the metrics.

Metrics | Description
Face Cover Non-Compliant | Average number of detected faces not wearing masks appropriately across time
Face Cover Non-Compliant % | Average number of detected faces not wearing masks divided by average number of detected faces
Detected Face Rate | Average number of detected faces divided by average number of detected people (provides context to the effectiveness of the cameras for this key metric)

We use the following formulas to calculate these metrics:

  • Total number of noncompliant = Total number of detected faces not wearing face cover in the frame
  • Total number of compliant = Total number of detected faces wearing face cover in the frame
  • Total number of unknowns = Total number of people for which a face cover or face can’t be detected in the frame
  • Total number of detected faces = Total number of noncompliant + Total number of compliant
  • Total number of detected people = Total number of unknowns + Total number of detected faces
  • Mask noncompliant per frame = Total number of noncompliant in the frame

Preprocessing for crowded images and images of very small size

Amazon Rekognition PPE detection supports up to 15 people per image. To support images where more than 15 people are present, we fragment the image into smaller tiles and process them via Amazon Rekognition. Also, PPE detection requires a minimum face size of 40×40 pixels for an image with 1920×1080 pixels. If the image is too small, we interpolate it before performing inference. For more information about size limits, see Guidelines and Quotas in Amazon Rekognition.
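The post does not describe how images are tiled or interpolated. As one possible approach, the following sketch splits an image into a regular grid and computes an enlargement factor for small images; the grid layout, the 1920×1080 target, and the function names are assumptions for illustration.

```python
def tile_boxes(width, height, rows, cols):
    """Split an image into rows x cols tiles as (left, top, right, bottom) boxes."""
    boxes = []
    for r in range(rows):
        for c in range(cols):
            left = c * width // cols
            top = r * height // rows
            right = (c + 1) * width // cols
            bottom = (r + 1) * height // rows
            boxes.append((left, top, right, bottom))
    return boxes


def upscale_factor(width, height, target_width=1920, target_height=1080):
    """Factor by which a small image would be enlarged to reach the target size."""
    return max(1.0, target_width / width, target_height / height)


print(tile_boxes(1920, 1080, rows=2, cols=2))
print(upscale_factor(960, 540))
```

Each box can be passed to an image library's crop function before the tile is sent for inference. A plain grid can cut a person in half at a tile boundary, so overlapping tiles may be needed in practice.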

Model interaction pipeline

Finally, we can visualize the calculated metrics in QuickSight. QuickSight is a cloud-native and serverless business intelligence tool that enables straightforward creation of interactive visualizations. For more information about setting up a dashboard, see Getting Started with Data Analysis in Amazon QuickSight.

As shown in the following dashboard, end users can configure and display the top priority statistics at the top of the dashboard, such as total count of noncompliant, seating area total count of noncompliant, and front entrance total count of noncompliant. In addition, end users can interact with the line chart to dive deep into the mask-wearing noncompliant patterns. The bottom chart shows such statistics of the eight cameras over time.

You can create additional visualizations according to your business needs. For more information, see Working with Visual Types in Amazon QuickSight.

Code template

This section contains the code template to help you get started in deploying this solution into your AWS account. This is an AWS Serverless Application Model (AWS SAM) project and requires you to have the AWS Command Line Interface (AWS CLI) and AWS SAM CLI set up in your system.

To build and deploy the AWS CloudFormation stack, complete the following steps:

  1. Install the AWS CLI and configure your AWS CLI credentials.
  2. Install the AWS SAM CLI using these instructions.
  3. Download the Ppe-detection-ml-pipeline.zip file.
  4. Unzip the contents of the .zip file and navigate to the root of the project.
  5. Build the project. This packages the Lambda functions. Note: Python 3.8 and pip are required for this deployment.
    sam build

  6. Deploy the CloudFormation stack in your AWS account.
    sam deploy --guided

Choose an appropriate AWS region to deploy the stack. Use the defaults for all other prompts.
Note: Use sam deploy for subsequent stack updates.

Note: The Rekognition PPE API needs the following SDK versions: boto3 >= 1.15.17 and botocore >= 1.18.17. Currently, the AWS Lambda Python 3.8 runtime does not support the preceding versions (see documentation). A layer has been added to the Lambda function in the template to support the required SDK versions. We will update this post and the code template after the updated SDK is natively supported by the Python 3.8 runtime.

We use Amazon S3 as a data lake to store the images coming out of the video ingestion pipeline after it splits the original camera feeds into 1 FPS images. The folder structure of the S3 data lake bucket organizes the collected images and camera metadata along with the model responses.

After deploying the stack, create the input folder inside the S3 bucket. The input prefix can contain multiple folders, which helps in organizing the results by camera source. To test the pipeline, upload a .jpg file containing people and faces to the input/[camera_id]/ folder in the S3 bucket. The camera_id can be any arbitrary name. The output and error prefixes are created automatically when the PPE detection job is triggered. The output prefix contains model inference outputs. The error prefix contains records of jobs that failed to run. Make sure the deployed S3 bucket has the following folder structure for the code to work correctly.

S3 DataLake Bucket
-- input/
---- [camera_id]/ 
-- output/
---- ppe/
------ [YYYY]/[MM]/[DD]/
-- error/
---- ppe/

For example, this sample image is uploaded to the S3 bucket location input/CAMERA01/. After Amazon Kinesis Data Firehose delivers the event to Amazon S3, the output/ prefix contains a file with a JSON record indicating the PPE compliance status of each individual.

{
  "CameraID": "CAMERA01",
  "Bucket": "ppe-detection-blog-post-datalakes3bucket-abc123abc123",
  "Key": "input/CAMERA01/ppe_sample_image.jpg",
  "RekResult": "{\"0\": \"compliant\", \"1\": \"compliant\", \"2\": \"compliant\", \"3\": \"compliant\"}",
  "FaceCoverNonCompliant": 0,
  "FaceCoverNonCompliantPercentage": 0,
  "detectedFaceRate": 100
}

The provided AWS SAM project creates the resources, roles, and necessary configuration of the pipeline. Note that the IAM roles deployed are very permissive and should be restricted in production environments.

Conclusion

In this post, we showed how we take a live camera feed as input to build a video ingestion pipeline and prepare the data for ML inference. Next, we demonstrated a scalable solution to perform PPE detection using the Amazon Rekognition API. Then we discussed how to visualize the model outputs in QuickSight to build meaningful dashboards for your safety compliance guidelines. Finally, we provided an AWS SAM project of the ML pipeline if you want to deploy this in your own AWS account.

We also demonstrated how the AWS Professional Services team can help customers with the implementation and deployment of an end-to-end architecture for detecting PPE at scale using Amazon Rekognition’s new PPE detection feature. To learn more about the new PPE detection API, get insight into model outputs, and explore different ways to deploy a PPE solution for your own camera and networking requirements, see Automatically detecting personal protective equipment on persons in images using Amazon Rekognition.

AWS Professional Services can help customize and implement the PPE detection solution based on your organization’s requirements. To learn more about how we can help, please connect with us with the help of your account manager.


About the Authors

Pranati Sahu is a Data Scientist at AWS Professional Services and Amazon ML Solutions Lab team. She has an MS in Operations Research from Arizona State University, Tempe and has worked on machine learning problems across industries including social media, consumer hardware, and retail. In her current role, she is working with customers to solve some of the industry’s complex machine learning use cases on AWS.

Surya Dulla is an Associate Cloud Developer at AWS Professional Services. She has an MS in Computer Science from Towson University, Maryland. She has experience in developing full stack applications and microservices in the health care and financial industries. In her current role, she focuses on delivering the best operational solutions for enterprises using the AWS suite of developer tools.

Rohit Rangnekar is a Principal IoT Architect with AWS Professional Services based in the San Francisco Bay Area. He has an MS in Electrical Engineering from Virginia Tech and has architected and implemented data & analytics, AI/ML, and IoT platforms globally. Rohit has a background in software engineering and IoT edge solutions which he leverages to drive business outcomes in healthcare, space and telecom, semiconductors and energy verticals among others.

Taihua (Ray) Li is a data scientist with AWS Professional Services. He holds an M.S. in Predictive Analytics from DePaul University and has several years of experience building artificial intelligence powered applications for non-profit and enterprise organizations. At AWS, Ray helps customers unlock business potential and drive actionable outcomes with machine learning. Outside of work, he enjoys fitness classes, biking, and traveling.

Han Man is a Data Scientist with AWS Professional Services. He has a PhD in engineering from Northwestern University and has several years of experience as a management consultant advising clients across many industries. Today he is passionately working with customers to develop and implement machine learning, deep learning, & AI solutions on AWS. He enjoys playing basketball in his spare time and taking his bulldog, Truffle, to the beach.

Jin Fei is a Senior Data Scientist with AWS Professional Services. He has a PhD in computer science in the area of computer vision and image analysis from the University of Houston. He has worked as a researcher and consultant in different industries including energy, manufacturing, health science, and finance. Other than providing machine learning and deep learning solutions to customers, his specialties also include IoT, software engineering, and architectural design. He enjoys reading, photography, and swimming.

Kareem Williams is a data scientist with Amazon Research. He holds an M.S. in Data Science from Southern Methodist University and has several years of experience solving challenging business problems with ML for SMB and Enterprise organizations. Currently, Kareem is working on leveraging AI to build a more diverse and inclusive workspace at Amazon. In his spare time, he enjoys soccer, hikes, and spending time with his family.
