Integrating Amazon Polly with legacy IVR systems by converting output to WAV format

Amazon Web Services (AWS) offers a rich stack of artificial intelligence (AI) and machine learning (ML) services that help automate several components of the customer service industry. Amazon Polly, an AI-powered text-to-speech service, enables you to automate and scale your interactive voice solutions, helping to improve productivity and reduce costs.

You might face common implementation challenges when updating or modifying legacy interactive voice response (IVR) systems that don’t support file formats such as MP3 and PCM. To minimize response latency, Amazon Polly synthesizes speech in real time and streams the results back to the caller in a streamable format (MP3, Ogg/Vorbis, or raw PCM samples) while the request is being processed. The WAV format is not streamable by definition, but you can easily create a WAV file from the PCM stream that Amazon Polly generates once synthesis is complete, all samples are collected, and the length of the result can be calculated. This post shows you how to convert Amazon Polly output to a common audio format like WAV.

Converting Amazon Polly file output to WAV

One of the challenges with legacy systems is that they may not support Amazon Polly output formats such as MP3. Some legacy IVRs require audio in WAV format, which the Amazon Polly SynthesizeSpeech API doesn’t produce natively. Many of these applications are written in Python and Java.

The following sample code helps in such situations: it converts Amazon Polly’s PCM output to WAV in Python, for input given as either SSML or plain text.

#The following sample code snippet converts files from PCM to WAV in Python for both SSML and non-SSML text

#Importing libraries
import sys
import wave

import boto3
from botocore.exceptions import BotoCoreError, ClientError

#Initializing variables
CHANNELS = 1 #Polly's output is a mono audio stream
RATE = 16000 #Polly supports 16000Hz and 8000Hz output for PCM format
OUTPUT_FILE_IN_WAVE = "sample_SSML.wav" #WAV format output file name
FRAMES = []
WAV_SAMPLE_WIDTH_BYTES = 2 #Polly's output is a stream of 16-bit (2-byte) samples

#Initializing Polly client
polly = boto3.client("polly")

#Input text for conversion
INPUT = "<speak>Hi! I'm Matthew. Hope you are doing well. This is a sample PCM to WAV conversion for SSML. I am a Neural voice and have a conversational style. </speak>" #Input in SSML

WORD = "<speak>"
try:
    if WORD in INPUT: #Checking for SSML input
        #Calling Polly synchronous API with text type as SSML
        response = polly.synthesize_speech(Text=INPUT, TextType="ssml",
                                           OutputFormat="pcm", VoiceId="Matthew",
                                           SampleRate="16000") #SampleRate is passed as a string value
    else:
        #Calling Polly synchronous API with text type as plain text
        response = polly.synthesize_speech(Text=INPUT, TextType="text",
                                           OutputFormat="pcm", VoiceId="Matthew",
                                           SampleRate="16000")
except (BotoCoreError, ClientError) as error:
    #Printing the error and exiting if the request fails
    print(error)
    sys.exit(-1)

#Processing the response to audio stream
STREAM = response.get("AudioStream")
FRAMES.append(STREAM.read())

#Writing the collected PCM frames into a WAV container
WAVEFORMAT = wave.open(OUTPUT_FILE_IN_WAVE, 'wb')
WAVEFORMAT.setnchannels(CHANNELS)
WAVEFORMAT.setsampwidth(WAV_SAMPLE_WIDTH_BYTES)
WAVEFORMAT.setframerate(RATE)
WAVEFORMAT.writeframes(b''.join(FRAMES))
WAVEFORMAT.close()

Running the preceding code produces a WAV file named sample_SSML.wav that a legacy IVR can play back.

Conclusion

By converting Amazon Polly output from PCM to WAV, you can use Amazon Polly with your legacy IVR even when it only supports WAV file output. Try this out for yourself and let us know how it goes in the comments!

You can further refine the converted file using the powerful capabilities available in Amazon Polly like the SynthesizeSpeech request, managing lexicons, reserved characters in SSML, and controlling volume, speaking rate, and pitch.
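
For example, you can adjust volume, speaking rate, and pitch by wrapping the input in SSML prosody tags before synthesizing. The following snippet is a minimal sketch (the text and prosody values are illustrative) that reuses the polly client from the sample code above; the returned PCM stream can be wrapped into a WAV container exactly as shown earlier:

#Adjusting volume, speaking rate, and pitch with SSML prosody tags (illustrative values)
ssml_input = (
    '<speak>'
    '<prosody volume="soft" rate="90%" pitch="-5%">'
    'Thank you for calling. Please hold while I look up your account.'
    '</prosody>'
    '</speak>'
)

response = polly.synthesize_speech(Text=ssml_input, TextType="ssml",
                                   OutputFormat="pcm", VoiceId="Matthew",
                                   SampleRate="16000")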

 


About the Author

Abhishek Soni is a Partner Solutions Architect at AWS. He works with customers to provide technical guidance for the best outcome of workloads on AWS.

Read More

Introducing Amazon SageMaker Reinforcement Learning Components for open-source Kubeflow pipelines

This blog post was co-authored by AWS and Max Kelsen. Max Kelsen is one of Australia’s leading Artificial Intelligence (AI) and Machine Learning (ML) solutions businesses. The company delivers innovation directly linked to the generation of business value and competitive advantage for customers in Australia and globally, including Fortune 500 companies. Max Kelsen is also dedicated to reinvesting its expertise and profits to solve the challenges of humankind, focusing on Genomics, AI Safety, and Quantum Computing.

Robots require the integration of technologies such as image recognition, sensing, artificial intelligence, machine learning (ML), and reinforcement learning (RL) in ways that are new to the field of robotics. Today, we’re launching Amazon SageMaker Reinforcement Learning Kubeflow Components supporting AWS RoboMaker, a cloud robotics service, for orchestrating robotics ML workflows. Orchestrating robotics operations to train, simulate, and deploy RL applications is difficult and time-consuming. Now, with SageMaker RL components and pipelines, it’s faster to experiment and manage robotics ML workflows from perception to controls and optimization, and create end-to-end solutions without having to rebuild each time.

Robots are being used more widely in society for purposes that are increasing in sophistication, such as complex assembly, picking and packing, last-mile delivery, environmental monitoring, search and rescue, and assisted surgery. Robotics often involves training complex sequences of behaviors. RL is an emerging ML technique that can help develop solutions for exactly these kinds of problems. It learns complex behaviors without requiring any labeled training data, and can make short-term decisions while optimizing for a long-term goal. For example, when a robot interacts with its environment, this mostly takes place in a simulator. The robot receives a positive or negative reward for actions that it takes. Rewards are computed by a user-defined function that outputs a numeric representation of the actions that should be incentivized. The agent tries to maximize positive rewards, and as a result the model learns an optimal strategy for decision-making.

SageMaker and AWS RoboMaker are two different services streamlined to serve two separate personas: data scientists and roboticists, respectively. SageMaker is a fully managed service that provides every developer and data scientist with the ability to build, train, and deploy ML models quickly. SageMaker RL builds on top of SageMaker, adding pre-packaged RL toolkits and making it easy to integrate any simulation environment. AWS RoboMaker is the most complete cloud solution for robotic developers to simulate, test, and securely deploy robotic applications at scale. Its managed stacks for the Robot Operating System (ROS) (https://www.ros.org/) and Gazebo (http://gazebosim.org/), an open-source robot simulation software, free up engineering resources and enable you to start building quickly. The task of stitching together machine learning workflows for robotics using Amazon SageMaker and AWS RoboMaker is non-trivial, consuming valuable time for both data scientists and roboticists.

With Amazon SageMaker RL Components for Kubernetes, you can use SageMaker RL Components in your Kubeflow pipelines to invoke and parallelize SageMaker training jobs and AWS RoboMaker simulation jobs as steps in your RL training workflow, without having to worry about how it runs under the hood.

The following diagram illustrates the pipeline workflow for SageMaker RL Components.

With SageMaker Components in your Kubeflow pipeline, you simply load the components and describe your pipeline using the Kubeflow Pipelines SDK. SageMaker RL uses open-source libraries such as Anyscale’s Ray to start training an RL agent by collecting experience from Gazebo (an open-source software to simulate populations of robots in complex indoor and outdoor environments) in AWS RoboMaker using ROS (a set of software libraries and tools that help you build robot applications). When the training is complete, the RL agent model is stored in an Amazon Simple Storage Service (Amazon S3) bucket, and an Amazon SageMaker inference node can be created for deployment in production. You can then download the model to the robot with the same ROS structure from the simulation to perform the required tasks.

Although our solution is implemented in Kubeflow components and pipelines, it’s not specific to Kubeflow and can be generalized to MLOps workflows in Argo and Kubernetes to orchestrate parallel robotics ML jobs.

Use case: Woodside Energy deploys robotics in oil and gas environments

Woodside Energy uses AWS RoboMaker with Amazon SageMaker Kubeflow operators to train, tune, and deploy reinforcement learning agents to their robots to perform manipulation tasks that are repetitive or dangerous. This framework will allow the team to iterate and deploy at scale.

“Our team and our partners wanted to start exploring using machine learning methods for robotics manipulation,” says Kyle Saltmarsh, Robotics Engineer at Woodside Energy. “Before we could do this effectively, we needed a framework that would allow us to train, test, tune, and deploy these models efficiently. Utilizing Kubeflow components and pipelines with SageMaker and RoboMaker provides us with this framework and we are excited to have our roboticists and data scientists focus their efforts and time on algorithms and implementation.”

Woodside and AWS engaged Max Kelsen to assist in the development and contribution of the RoboMaker and RLEstimator components that enable the pipelines described in this project. Max Kelsen leverages open source throughout most of its work, and views participation in these communities as strategically important to delivering the best outcomes for its clients.

In the following image, Ripley, a custom-built robotics platform by Woodside Energy, is getting ready to perform a double block and bleed, a manual pump shutdown procedure that involves turning multiple valves in sequence. Ripley is based on a Clearpath Robotics Husky equipped with two Universal Robotics UR5 arms, Intel RealSense D435 cameras on each wrist, and a Kodak PixPro body camera. The reinforcement learning formulation utilizes the joint states and camera views as inputs to the agent and outputs optimal trajectories for valve manipulation.

Ripley is a Clearpath Robotics Husky equipped with two Universal Robotics UR5 arms.

Getting started with SageMaker RL components

In a typical Kubeflow pipeline, each component encapsulates your logic in a container image. As a developer or data scientist, you bring in your training, data preprocessing, model serving, or other logic wrapped in a Kubeflow Pipelines ContainerOp function, which builds your code into a new container. Alternatively, you can put the code into a custom container image and push it to a container registry such as Amazon Elastic Container Registry (Amazon ECR). When the pipeline runs, the component’s container is instantiated on one of the worker nodes on the Kubernetes cluster running Kubeflow, and your logic is implemented. Pipeline components can read outputs from the previous components and create outputs that the next component in the pipeline can consume.
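
As an illustration, the following is a minimal sketch of such a conventional component built with the Kubeflow Pipelines SDK; the container image URI and script name are hypothetical placeholders:

import kfp
from kfp import dsl


@dsl.pipeline(
    name="Custom container example",
    description="A single step that runs your own logic from a container image",
)
def custom_container_pipeline():
    # The step's logic lives inside the container image, which you push to Amazon ECR
    preprocess = dsl.ContainerOp(
        name="preprocess",
        image="<account-id>.dkr.ecr.us-east-1.amazonaws.com/my-preprocess:latest",  # hypothetical image
        command=["python", "preprocess.py"],
        arguments=["--output-path", "/tmp/outputs/data"],
        file_outputs={"data": "/tmp/outputs/data"},  # downstream components can consume this output
    )


if __name__ == "__main__":
    kfp.compiler.Compiler().compile(custom_container_pipeline, "custom_container_pipeline.zip")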

When you use SageMaker Components in your Kubeflow pipeline, rather than encapsulating your logic in a custom container, you simply load the components and describe your pipeline using the Kubeflow Pipelines SDK. When the pipeline runs, your instructions are translated into a SageMaker job or deployment. This workload runs on the fully managed infrastructure of SageMaker. You also get all the benefits of a typical SageMaker capability, including Managed Spot Training, automatic scaling of endpoints, and more.

You have separate VPCs for orchestration and simulation. The reason is that no direct communication is needed between the RLEstimator or AWS RoboMaker jobs and the Kubeflow Pipelines components. The components interact directly with the AWS RoboMaker and SageMaker APIs, but not the jobs themselves. The components poll the APIs for the status of the jobs and any related Amazon CloudWatch Logs, and the responses are reflected back to the Kubeflow Pipelines UI. This offers a single interface for viewing the status of the running jobs.

The orchestration VPC utilizes both public and private subnets and a NAT gateway. The Amazon Elastic Kubernetes Service (Amazon EKS) worker nodes are launched into a private network and use a route to the NAT gateway in the public subnet to interact with AWS APIs and to pull public Docker images to run on the cluster. For this post, we allow public access to the EKS cluster endpoint. This allows you to run kubectl port forwarding from your local machine, opening a tunnel to access the Kubeflow UI. In a production system, we suggest placing the Kubeflow service behind an Application Load Balancer (ALB) and securing it using AWS Identity and Access Management (IAM).

Prerequisites

To run the following use case, you need the following:

  • Kubernetes cluster – You can use your existing cluster or create a new one. The fastest way to get one up and running is to launch an EKS cluster using eksctl. For instructions, see Getting started with eksctl. Create a simple cluster with two CPU nodes to run this example. We tested this example on two c5.xlarge nodes. You just need enough node resources to run the SageMaker Component containers and Kubeflow. Training and deployments run on the SageMaker and AWS RoboMaker managed infrastructure.
  • Kubeflow Pipelines – Install Kubeflow Pipelines on your cluster. For instructions, see Step 1 in Deploying Kubeflow Pipelines. Your Kubeflow Pipelines version must be 0.5.0 or above. Optionally, you can install all of Kubeflow, which includes Kubeflow Pipelines.
  • SageMaker and AWS RoboMaker components prerequisites – For instructions on setting up IAM roles and permissions, see Amazon SageMaker Components for Kubeflow Pipelines. You need three IAM roles for the following:
    • Kubeflow pipeline pods to access SageMaker and AWS RoboMaker and launch training and simulation jobs.
    • Amazon SageMaker execution role to access other AWS resources such as Amazon S3.
    • AWS RoboMaker execution role to access other AWS resources such as Amazon S3.

You can launch an EKS cluster from your laptop, desktop, Amazon Elastic Compute Cloud (Amazon EC2) instance, or SageMaker notebook instance. This instance is typically called a gateway instance. Because Amazon EKS offers a fully managed control plane, you only use the gateway instance to interact with the Kubernetes API and the worker nodes. The instance should have a role that allows it to interact with the EKS cluster. The code in the examples here was run from a local device with access to the EKS cluster.

Solution overview

The code, configuration files, and Jupyter notebooks used in this post are available on GitHub. The following walkthrough is provided to explain the key concepts. Rather than copying code from these steps, we recommend running the prepared Jupyter notebook. In this post, we walk through the following high-level steps:

  1. Configure your dependent resources.
  2. Clone the example repository and install dependencies.
  3. Open the example Jupyter notebook.
  4. Install the Kubeflow Pipelines SDK and load SageMaker pipeline components.
  5. Prepare your training datasets and upload them to Amazon S3.
  6. Create your Kubeflow pipeline.
  7. Compile and run your pipeline.

Configuring your dependent resources

If you’re following the proposed architecture from this post, you run the simulation jobs in a private subnet. To ensure that the running jobs have connectivity to AWS resources, add VPC endpoints for the following services:

  • Amazon S3
  • CloudWatch

Next, create an S3 bucket to host your simulation job and RLEstimator job source files. The jobs also use this bucket to communicate by writing config files. The bucket should be in the same Region where you’re running the rest of your infrastructure, because VPC endpoints can only access resources within the same Region.
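
As a minimal sketch (the bucket name is a placeholder; bucket names must be globally unique), you can create the bucket with boto3, keeping in mind that us-east-1 must not be passed as a location constraint:

import boto3

region = "us-east-1"  # use the Region where the rest of your infrastructure runs
bucket_name = "my-robomaker-kfp-sources"  # placeholder bucket name

s3 = boto3.client("s3", region_name=region)
if region == "us-east-1":
    # us-east-1 is the default location and must not be passed as a LocationConstraint
    s3.create_bucket(Bucket=bucket_name)
else:
    s3.create_bucket(Bucket=bucket_name,
                     CreateBucketConfiguration={"LocationConstraint": region})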

Finally, you need to configure an IAM role with access to the S3 bucket and AmazonSageMakerFullAccess and AWSRoboMaker_FullAccess policies.

Cloning the example repository and installing dependencies

Open a terminal and SSH to the EC2 gateway instance that you use to communicate with your EKS cluster. After you log in, clone the example repository to access the example Jupyter notebook. See the following code:

git clone https://github.com/MaxKelsen/kubeflow-pipelines-robomaker-examples
cd kubeflow-pipelines-robomaker-examples
pip install -r requirements.txt

Opening the example Jupyter notebook

As part of the previous step, you installed Jupyter. To open the Jupyter notebook on your gateway instance, complete the following steps:

  1. Launch JupyterLab on your gateway instance and access it on your local machine with the following code:
    jupyter lab

  2. If you’re running the JupyterLab server on an EC2 instance, set up a tunnel to the EC2 instance so you can access the JupyterLab client on your local laptop or desktop. (If you’re using Amazon Linux instead of Ubuntu, you have to use ec2-user as the username. Update the IP address of the EC2 instance and use the appropriate key pair.) See the following code:
    ssh -N -L 0.0.0.0:8081:localhost:8081 -L 0.0.0.0:8888:localhost:8888 -i ~/.ssh/<key_pair>.pem ubuntu@<IP_ADDRESS>

You can now access Jupyter lab at http://localhost:8888 on your local machine.

  3. Access the Kubeflow dashboard by running the following on your gateway instance:
    kubectl port-forward svc/istio-ingressgateway -n istio-system 8081:80

You can now access the Kubeflow dashboard at http://localhost:8081.

Open the example Jupyter notebook (kfp-robomaker-example.ipynb).

SageMaker RLEstimator supports two modes for training jobs (the GitHub repo includes one Jupyter notebook for the latter approach):

  • Bring your own Docker container image – In this mode, you can provide your own Docker container for training. Build your container with your training scripts and push it to Amazon ECR, which is a container registry. SageMaker pulls your container image, instantiates it, and runs training.
  • Bring your own training script (script mode) – In this mode, you don’t have to deal with Docker containers. Simply bring your RLEstimator training scripts in popular frameworks such as TensorFlow, PyTorch, and MXNet, and popular RL toolkits such as Coach and Ray, and upload them to Amazon S3. SageMaker automatically pulls the appropriate container, downloads your training scripts, and runs them. The kfp-robomaker-example.ipynb Jupyter notebook implements this approach.

The following example takes a closer look at the second approach (bringing your own training script). We walk through all the important steps in the kfp-robomaker-example.ipynb Jupyter notebook, so having it open makes it easy for you to follow along.

The following screenshot shows the kfp-robomaker-example.ipynb notebook.


Installing Kubeflow Pipelines SDK and loading SageMaker pipeline components

To install the SDK and load the pipeline components, complete the following steps:

  1. Install the Kubeflow Pipelines SDK with the following code:
    pip install kfp --upgrade

  2. Import Kubeflow Pipeline packages in Python with the following code:
    import kfp
    from kfp import components
    from kfp.components import func_to_container_op
    from kfp import dsl

  3. Load SageMaker Components in Python with the following code:
    robomaker_create_sim_app_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/4aa11c3c7f6f068fdb135e1af4a0af5bb1d72d17/components/aws/sagemaker/create_simulation_app/component.yaml')
    robomaker_sim_job_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/4aa11c3c7f6f068fdb135e1af4a0af5bb1d72d17/components/aws/sagemaker/simulation_job/component.yaml')
    robomaker_delete_sim_app_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/4aa11c3c7f6f068fdb135e1af4a0af5bb1d72d17/components/aws/sagemaker/delete_simulation_app/component.yaml')
    sagemaker_rlestimator_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/4aa11c3c7f6f068fdb135e1af4a0af5bb1d72d17/components/aws/sagemaker/rlestimator/component.yaml')

Preparing training datasets and uploading to Amazon S3

To prepare and upload the source code for SageMaker and AWS RoboMaker, enter the following code:

import boto3
s3 = boto3.resource('s3')
role = "<your_role_name>"
bucket_name = "<your_bucket_name>"
s3.meta.client.upload_file("sourcedir.tar.gz", bucket_name, "sagemaker-sources/sourcedir.tar.gz")
print(f"\nUploaded to S3 location: {bucket_name}/sagemaker-sources/sourcedir.tar.gz")
s3.meta.client.upload_file("output.tar", bucket_name, "robomaker-sources/output.tar")
print(f"\nUploaded to S3 location: {bucket_name}/robomaker-sources/output.tar")

Here we upload a sourcedir.tar.gz that contains some object_tracker code that the SageMaker RLEstimator training job uses. We also upload an output.tar file, which contains a colcon bundle that is used to create an AWS RoboMaker simulation application.

Creating a Kubeflow pipeline using AWS RoboMaker and SageMaker Components

You can express a Kubeflow pipeline as a function decorated with @dsl.pipeline, as shown in the following code and in kfp-robomaker-example.ipynb. For more information, see Overview of Kubeflow Pipelines.

@dsl.pipeline(
    name="SageMaker & RoboMaker pipeline",
    description="SageMaker & RoboMaker Reinforcement Learning job where the jobs work together to train an RL model",
)
def sagemaker_robomaker_rl_job(
    region="us-east-1",
    role=role,
    name="robomaker-pipeline-simulation-application"
    + "".join(random.choice(string.ascii_lowercase) for i in range(10)),
    sources=[
        {
            "s3Bucket": bucket_name,
            "s3Key": "robomaker-sources/output.tar",
            "architecture": "X86_64",
        }
    ],
…

In this code example, you create a new function called sagemaker_robomaker_rl_job() and define arguments that are common to all the steps in the pipeline. Within the function, you then define your pipeline components:

  • Creating the simulation application
  • RLEstimator training job
  • AWS RoboMaker simulation jobs
  • Deleting the simulation application

Component 1: Creating the simulation application

This component describes options for creating an AWS RoboMaker simulation application from a colcon bundle file. See the following code:

 robomaker_create_sim_app = robomaker_create_sim_app_op(
        region=region,
        app_name=name,
        sources=sources,
        simulation_software_name=simulation_software_name,
        simulation_software_version=simulation_software_version,
        robot_software_name=robot_software_name,
        robot_software_version=robot_software_version,
        rendering_engine_name=rendering_engine_name,
        rendering_engine_version=rendering_engine_version,
    ).set_display_name('Create RoboMaker Sim App')

The options include the simulation software name and version, the robot software name and version, and also the sources, which are a link to a colcon bundle in Amazon S3.

Component 2: RLEstimator training job

This component describes a SageMaker RLEstimator training job. The job receives data from the AWS RoboMaker simulation jobs and uses that data to train a model. The job issues new policy weights to the simulation jobs while training. See the following code:

rlestimator_training_toolkit_ray = sagemaker_rlestimator_op(
        region=region,
        entry_point=entry_point,
        source_dir=source_dir,
        toolkit=toolkit,
        toolkit_version=toolkit_version,
        framework=framework,
        role=assume_role,
        instance_type=instance_type,
        instance_count=instance_count,
        model_artifact_path=output_path,
        job_name=job_name,
        metric_definitions=metric_definitions,
        max_run=max_run,
        hyperparameters={
            "rl.training.config.lambda": "0.95",
            "robomaker.config.app_arn": robomaker_create_sim_app.outputs["arn"],
            "robomaker.config.num_workers": "3",
            "robomaker.config.packageName": "object_tracker_simulation",
            "robomaker.config.launchFile": "local_client.launch",
            "robomaker.config.policyServerPort": "9000",
            "robomaker.config.iamRole": assume_role,
            "robomaker.config.sagemakerBucket": input_bucket_name,
        },
        vpc_security_group_ids=vpc_security_group_ids,
        vpc_subnets=vpc_subnets,
    ).set_display_name('Start RLEstimator Training')

The options include a reference to the source directory in Amazon S3 where the source code is stored, hyperparameters that are used to configure the training job, and VPC configuration to define where the training job is run. The RLEstimator job spins up a local Redis server that it uses to issue the new policy weights that are consumed by the simulation jobs.

Components 3, 4, and 5: Multiple AWS RoboMaker simulation jobs

These components describe three AWS RoboMaker simulation jobs that use the simulation application created in the robomaker_create_sim_app component. The following code shows one of the components:

robomaker_simulation_job_1 = robomaker_sim_job_op(
        region=region,
        role=role,
        output_bucket=output_bucket,
        output_path=robomaker_output_path,
        max_run=max_run,
        failure_behavior="Fail",
        sim_app_arn=robomaker_create_sim_app.outputs["arn"],
        sim_app_launch_config={
            "packageName": "object_tracker_simulation",
            "launchFile": "local_client.launch",
            "environmentVariables": {
                "RLCAMP_POLICY_SERVER_PORT": "9000",
                "RLCAMP_SAGEMAKER_BUCKET": input_bucket_name,
                "RLCAMP_SAGEMAKER_JOB_NAME": job_name,
                "RLCAMP_AWS_REGION": region,
            },
        },
        vpc_security_group_ids=vpc_security_group_ids,
        vpc_subnets=vpc_subnets,
        use_public_ip="False",
    ).set_display_name('RoboMaker Simulation 1')

Options include the simulation app launch configuration, which includes environment variables that can configure the S3 bucket used for communication between the RLEstimator job and these simulation jobs.

Component 6: Deleting the simulation application

This component is used to delete the simulation application that we created. There is a soft limit of 40 simulation applications per AWS account, so it makes sense to clean up automatically as we create new pipeline runs. See the following code:

robomaker_delete_sim_app = robomaker_delete_sim_app_op(
        region=region, arn=robomaker_create_sim_app.outputs["arn"],
    ).after(
        robomaker_simulation_job_1,
        robomaker_simulation_job_2,
        robomaker_simulation_job_3,
        robomaker_create_sim_app,
    ).set_display_name('Delete RoboMaker Sim App')

The only options are the Region to use to interact with the AWS RoboMaker API and also the ARN of the simulation application to delete. We use the .after() method to define when this component should run as part of the pipeline.

Compiling and running your pipeline

Using the Kubeflow pipeline compiler, you compile the pipeline, create an experiment, and run the pipeline. See the following code:

kfp.compiler.Compiler().compile(sagemaker_robomaker_rl_job,'sagemaker_robomaker_rl_job.zip')
client = kfp.Client()
aws_experiment = client.create_experiment(name='rm-kfp-experiment')
exp_name    = f'sagemaker_robomaker_rl_job-{time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())}'
my_run = client.run_pipeline(aws_experiment.id, exp_name, 'sagemaker_robomaker_rl_job.zip')
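
Optionally, you can block until the run finishes and check its final status programmatically. The following is a minimal sketch that reuses the client and my_run objects from the preceding code:

# Wait up to one hour for the pipeline run to finish, then print its final status
run_detail = client.wait_for_run_completion(my_run.id, timeout=3600)
print(run_detail.run.status)  # for example, "Succeeded" or "Failed"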

The following is an annotated screenshot of a Kubeflow pipeline after it finishes running. All the steps are SageMaker and AWS RoboMaker capabilities running as part of a Kubeflow pipeline.


Conclusion

In this post, we discussed using SageMaker RL Components to build open-source Kubeflow pipelines. If you have questions or comments about SageMaker RL Components or AWS RoboMaker components, please leave a comment or create an issue on the Kubeflow Pipelines GitHub repo.


About the Authors

Alex Chung is a Senior Product Manager with AWS in enterprise machine learning systems. His role is to make AWS MLOps products more accessible from Kubernetes and custom environments. He’s passionate about accelerating ML adoption for a large body of users to solve global economic and societal problems. Outside machine learning, he is also a board member at Cocatalyst.org, a Silicon Valley nonprofit for donating stock to charity.

Kyle Saltmarsh is a robotics engineer from the Intelligent Assets and Robotics group at Woodside Energy. Kyle enjoys rock climbing, long walks on the beach and robot learning.

Leonard O’Sullivan is a Senior Technical Engineer at Max Kelsen, with numerous AWS certifications including SysOps Admin, Developer and Solution Architect. Leonard has more than 8 years of software development experience, and his passion is creating extensible, maintainable and readable code, with a focus on optimizing workflows and removing bottlenecks. Leonard’s current position centres around the automation and optimization of Machine Learning Operations. In his free time, you can find him playing soccer, eating pizza or trying new activities such as axe throwing and hang gliding.

Nicholas Therkelsen-Terry is CEO and Co-Founder of Max Kelsen, a machine learning and artificial intelligence solutions company. Nick has a broad range of expertise spanning across business, economics, sales, management and law. Nick has a deep theoretical and applied understanding of cutting-edge machine learning techniques and has been widely recognized as an expert and thought-leader in this field. Nick is a founding member and board representative of the Queensland AI Hub, a large investment supporting the development of the AI industry, creating more jobs and providing aspiring AI engineers with a space of their own to contribute to Australia’s innovation growth.

Nicholas Thomson is a Software Development Engineer with AWS Deep Learning. He helps build the open-source deep learning infrastructure projects that power Amazon AI. In his free time, he enjoys playing pool or building proof of concept websites.

Ragha Prasad is a software engineer on the AWS RoboMaker team, primarily interested in robotics and artificial intelligence. In his spare time, he likes to travel, work on art projects, and catch up on documentaries.

Sahika Genc is a senior applied scientist at Amazon artificial intelligence (AI). Her research interests are in smart automation, robotics, predictive control and optimization, and reinforcement learning (RL), and she serves in the industrial committee for the International Federation of Automatic Control. She leads science teams in scalable autonomous driving and automation systems, including consumer products such as AWS DeepRacer and SageMaker RL. Previously, she was a senior research scientist in the Artificial Intelligence and Learning Laboratory at the General Electric (GE) Global Research Center.

Read More

Analyzing open-source ML pipeline models in real time using Amazon SageMaker Debugger

Open-source workflow managers are popular because they make it easy to orchestrate machine learning (ML) jobs for production. Taking models into production following a GitOps pattern is best managed by a container-friendly workflow manager, also known as MLOps. Kubeflow Pipelines (KFP) is one of the Kubernetes-based workflow managers used today. However, it doesn’t provide all the functionality you need for a best-in-class data science and ML engineer experience. A common issue when developing ML models is having access to the tensor-level metadata of how the job is performing. For extremely large models such as those for natural language processing (NLP) and computer vision (CV), this can be critical to avoid wasted GPU resources. However, most training frameworks become a black box after starting to train a model.

Amazon SageMaker is a managed ML platform from AWS to build, train, and deploy ML models at scale. SageMaker Components for Kubeflow Pipelines offer the flexibility to run steps of your KFP workflows on SageMaker instead of on your Kubernetes cluster, which provides the extra capabilities of SageMaker to develop high-quality models. SageMaker Debugger offers the capability to debug ML models during training by identifying and detecting problems with the models in near-real time. This feature can be used when training models within Kubeflow Pipelines through the SageMaker Training component. By combining the two, you can ensure that if your training jobs aren’t continuously improving with a decreasing loss rate, the job ends early, thereby saving both cost and time.

SageMaker Debugger allows you to capture and analyze the state from training with minimal code changes. The state is composed of the following:

  • The parameters being learned by the model, such as weights and biases for neural networks
  • The changes applied to these parameters by the optimizer, called gradients
  • The optimization parameters themselves
  • Scalar values, such as accuracies and losses
  • The output of each layer

The monitoring of these states is done through rules. SageMaker includes a variety of predefined rules, and you can also make custom rules using Python. For more information, see Amazon SageMaker Debugger – Debug Your Machine Learning Models.
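
For reference, outside of Kubeflow Pipelines you can attach the same kinds of built-in rules through the SageMaker Python SDK. The following is a minimal sketch (SageMaker Python SDK v2 names assumed), not the mechanism this post uses:

from sagemaker.debugger import Rule, rule_configs

rules = [
    Rule.sagemaker(rule_configs.loss_not_decreasing()),  # flags training where the loss stops improving
    Rule.sagemaker(rule_configs.overtraining()),         # flags rising validation loss
]
# Pass rules=rules to a SageMaker Estimator to have Debugger evaluate them during training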

In this post, we go over how to deploy a simple pipeline featuring a training component that has a debugger enabled.

Using SageMaker Debugger for Kubeflow Pipelines with XGBoost

This post demonstrates how adding parameters to configure the debugger component allows you to easily find issues within a model. We train a gradient boosting model on the Modified National Institute of Standards and Technology (MNIST) dataset using Kubeflow Pipelines. The MNIST dataset contains images of handwritten digits from 0–9, with 60,000 training images and 10,000 test images, and is a popular ML problem.

This post walks through the following steps:

  • Generating your data
  • Cloning the sample repository
  • Creating the training pipeline
  • Adding debugger parameters
  • Compiling the pipeline
  • Deploying the training pipeline through Kubeflow Pipelines
  • Reading the debugger output

Prerequisites

To run the example in this post, you can use any instance that has Python installed and access to the Kubernetes cluster where Kubeflow Pipelines is installed.

Generating your training data

This post uses a SageMaker prebuilt container to train an XGBoost model on the MNIST dataset. We include a Python file that uploads the MNIST dataset to an S3 bucket in the format that the XGBoost prebuilt container expects.

  1. Create an S3 bucket. This post uses the us-east-1 Region.
  2. Create a new file named s3_sample_data_creator.py with the following code:
    import pickle, gzip, numpy, urllib.request, json
    from urllib.parse import urlparse
    
    ###################################################################
    # This is the only thing that you need to change to run this code 
    # Give the name of your S3 bucket 
    bucket = '<bucket-name>' 
    
    # If you are going to use the default values of the pipeline then 
    # give a bucket name which is in us-east-1 region 
    ###################################################################
    
    
    # Load the dataset
    urllib.request.urlretrieve("http://deeplearning.net/data/mnist/mnist.pkl.gz", "mnist.pkl.gz")
    with gzip.open('mnist.pkl.gz', 'rb') as f:
        train_set, valid_set, test_set = pickle.load(f, encoding='latin1')
    
    
    # Upload dataset to S3
    from sagemaker.amazon.common import write_numpy_to_dense_tensor
    import io
    import boto3
    
    train_data_key = 'mnist_kmeans_example/train_data'
    test_data_key = 'mnist_kmeans_example/test_data'
    train_data_location = 's3://{}/{}'.format(bucket, train_data_key)
    test_data_location = 's3://{}/{}'.format(bucket, test_data_key)
    print('training data will be uploaded to: {}'.format(train_data_location))
    print('test data will be uploaded to: {}'.format(test_data_location))
    
    # Convert the training data into the format required by 
    # the SageMaker XGBoost algorithm
    buf = io.BytesIO()
    write_numpy_to_dense_tensor(buf, train_set[0], train_set[1])
    buf.seek(0)
    
    boto3.resource('s3').Bucket(bucket).Object(train_data_key).upload_fileobj(buf)
    
    # Convert the test data into the format required by the XGBoost algorithm
    # (use a fresh buffer so the test object doesn't also contain the training data)
    buf = io.BytesIO()
    write_numpy_to_dense_tensor(buf, test_set[0], test_set[1])
    buf.seek(0)
    
    boto3.resource('s3').Bucket(bucket).Object(test_data_key).upload_fileobj(buf)
    
    # Convert the valid data into the format required by XGBoost algorithm
    numpy.savetxt('valid-data.csv', valid_set[0], delimiter=',', fmt='%g')
    s3_client = boto3.client('s3')
    input_key = "{}/valid_data.csv".format("mnist_kmeans_example/input")
    s3_client.upload_file('valid-data.csv', bucket, input_key)
    

  3. Replace <bucket-name> with the name of the bucket you created.

This script requires you to install Python3, boto3, and NumPy.

  4. Run this script by using python3 s3_sample_data_creator.py.
  5. Verify that the data was successfully uploaded.

In your S3 bucket, you should now see a folder called mnist_kmeans_example, and under input, there should be a CSV file named valid_data.csv.
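
One quick way to verify is to list the uploaded keys with boto3. The following is a minimal sketch; replace the bucket name with your own:

import boto3

bucket = "<bucket-name>"  # the bucket used by s3_sample_data_creator.py
s3_client = boto3.client("s3")
response = s3_client.list_objects_v2(Bucket=bucket, Prefix="mnist_kmeans_example/")
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])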

Cloning the sample repository

In a terminal window, clone the Kubeflow pipelines repository and navigate to the directory with the sample code:

git clone https://github.com/kubeflow/pipelines
cd pipelines/samples/contrib/aws-samples/sagemaker_debugger_demo

We now go over how to create the training pipeline debugger-component-demo.py. This folder already contains the final version of the pipeline for reference.

Creating a training pipeline

Create a debugger-component-demo.py Python file as our training pipeline. The pipeline specified has poor hyperparameters and results in a poor model. It doesn’t yet have a debugger configured, but can still be compiled and submitted as a training job, and outputs a model.

See the following code:

#!/usr/bin/env python3

import kfp
import json
import os
import copy
from kfp import components
from kfp import dsl


cur_file_dir = os.path.dirname(__file__)
components_dir = os.path.join(cur_file_dir, '../../../../components/aws/sagemaker/')

sagemaker_train_op = components.load_component_from_file(components_dir + '/train/component.yaml')

def training_input(input_name, s3_uri, content_type):
    return {
        "ChannelName": input_name,
        "DataSource": {"S3DataSource": {"S3Uri": s3_uri, "S3DataType": "S3Prefix"}},
        "ContentType": content_type
    }

bad_hyperparameters = {
    'max_depth': '5',
    'eta': '0',
    'gamma': '4',
    'min_child_weight': '6',
    'silent': '0',
    'subsample': '0.7',
    'num_round': '50'
}


@dsl.pipeline(
    name='XGBoost Training Pipeline with bad hyperparameters',
    description='SageMaker training job test with debugger'
)
def training(role_arn="", bucket_name="my-bucket"):
    train_channels = [
        training_input("train", f"s3://{bucket_name}/mnist_kmeans_example/input/valid_data.csv", 'text/csv')
    ]

    training = sagemaker_train_op(
        region='us-east-1',
        # Refer this link for xgboost Registry URLs: https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-algo-docker-registry-paths.html
        image='683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:0.90-2-cpu-py3',
        hyperparameters=bad_hyperparameters,
        channels=train_channels,
        instance_type='ml.m5.2xlarge',
        model_artifact_path=f's3://{bucket_name}/mnist_kmeans_example/output/model',
        role=role_arn,
    )


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(training, __file__ + '.zip')

Adding debugger parameters

To enable SageMaker Debugger in your training jobs, you need to define the additional parameters to configure the debugger.

First, use debug_hook_config to select the tensor groups you want to collect for analysis and specify the frequency at which you want to save them. debug_hook_config takes in two parameters:

  • S3OutputPath – Points to the Amazon S3 URI where we intend to store our debugging tensors. SageMaker takes care of uploading these tensors transparently during the run.
  • CollectionConfigurations – Enumerates named collections of tensors we want to save. Collections are a convenient way to organize relevant tensors under same umbrella to make it easy to navigate them during analysis. In this particular example, one of the collections we instruct SageMaker Debugger to save is named metrics. We also instruct SageMaker Debugger to save metrics every three iterations.
    # Collections of tensors we want to save
    collections = {
        'feature_importance' : {
            'save_interval': '5'
        },
        'losses' : {
            'save_interval': '10'
        },
        'average_shap': {
            'save_interval': '5'
        },
        'metrics': {
            'save_interval': '3'
        }
    }
    
    
    # Helper method to format CollectionConfigurations
    def format_collection_config(collection_dict):
        output = []
        for key, val in collection_dict.items():
            output.append({'CollectionName': key, 'CollectionParameters': val})
        return output
    
    
    # Helper method to format debug_hook_config
    def training_debug_hook(s3_uri, collection_dict):
        return {
            'S3OutputPath': s3_uri,
            'CollectionConfigurations': format_collection_config(collection_dict)
        }
        
        
    # Provide the debug_hook_config input to the pipeline
    @dsl.pipeline(...)
    def training(role_arn="", bucket_name="my-bucket"):
        ...
        # debug_hook_config containing S3OutputPath and collections to be saved
        training = sagemaker_train_op(
            debug_hook_config=training_debug_hook(f's3://{bucket_name}/mnist_kmeans_example/hook_config', collections),

We also need to specify what rules we want to activate for automatic analysis using debug_rules_config. In this example, we use two SageMaker built-in rules: Overtraining and LossNotDecreasing. As the names suggest, the rules attempt to evaluate whether the loss is not decreasing in the tensors captured by the debugging hook during training and whether the model is being over-trained (validation loss should not increase). See the following code:

# Helper method to format debug_rules_config
def training_debug_rules(rule_name, parameters):
    return {
        'RuleConfigurationName': rule_name,
        # Refer this link for Debugger Registry URLs: https://docs.aws.amazon.com/sagemaker/latest/dg/debugger-docker-images-rules.html
        'RuleEvaluatorImage': '503895931360.dkr.ecr.us-east-1.amazonaws.com/sagemaker-debugger-rules:latest',
        'RuleParameters': parameters
    }

# Provide the debug_rule_config input to the pipeline
@dsl.pipeline(...)
def training(role_arn="", bucket_name="my-bucket"):
    ...
    # Rules and rule parameters
    train_debug_rules = [
        training_debug_rules("LossNotDecreasing", {"rule_to_invoke": "LossNotDecreasing", "tensor_regex": ".*"}),
        training_debug_rules("Overtraining", {'rule_to_invoke': 'Overtraining', 'patience_train': '10', 'patience_validation': '20'}),
    ]
    training = sagemaker_train_op(
        # Provide the debug_rule_config as input to the pipeline
        debug_rule_config=train_debug_rules,
        ...
    )

For more information about SageMaker rules and the configurations best suited for using them, see Amazon SageMaker Debugger RulesConfig.

The following code shows what the pipeline looks like after configuring the debug hook and rules:

#!/usr/bin/env python3

import kfp
import json
import os
import copy
from kfp import components
from kfp import dsl


cur_file_dir = os.path.dirname(__file__)
components_dir = os.path.join(cur_file_dir, '../../../../components/aws/sagemaker/')

sagemaker_train_op = components.load_component_from_file(components_dir + '/train/component.yaml')

def training_input(input_name, s3_uri, content_type):
    return {
        "ChannelName": input_name,
        "DataSource": {"S3DataSource": {"S3Uri": s3_uri, "S3DataType": "S3Prefix"}},
        "ContentType": content_type
    }


def training_debug_hook(s3_uri, collection_dict):
    return {
        'S3OutputPath': s3_uri,
        'CollectionConfigurations': format_collection_config(collection_dict)
    }


def training_debug_rules(rule_name, parameters):
    return {
        'RuleConfigurationName': rule_name,
        # Refer this link for Debugger Registry URLs: https://docs.aws.amazon.com/sagemaker/latest/dg/debugger-docker-images-rules.html
        'RuleEvaluatorImage': '503895931360.dkr.ecr.us-east-1.amazonaws.com/sagemaker-debugger-rules:latest',
        'RuleParameters': parameters
    }


def format_collection_config(collection_dict):
    output = []
    for key, val in collection_dict.items():
        output.append({'CollectionName': key, 'CollectionParameters': val})
    return output


collections = {
    'feature_importance' : {
        'save_interval': '5'
    },
    'losses' : {
        'save_interval': '10'
    },
    'average_shap': {
        'save_interval': '5'
    },
    'metrics': {
        'save_interval': '3'
    }
}


bad_hyperparameters = {
    'max_depth': '5',
    'eta': '0',
    'gamma': '4',
    'min_child_weight': '6',
    'silent': '0',
    'subsample': '0.7',
    'num_round': '50'
}


@dsl.pipeline(
    name='XGBoost Training Pipeline with bad hyperparameters',
    description='SageMaker training job test with debugger'
)
def training(role_arn="", bucket_name="my-bucket"):
    train_channels = [
        training_input("train", f"s3://{bucket_name}/mnist_kmeans_example/input/valid_data.csv", 'text/csv')
    ]
    train_debug_rules = [
        training_debug_rules("LossNotDecreasing", {"rule_to_invoke": "LossNotDecreasing", "tensor_regex": ".*"}),
        training_debug_rules("Overtraining", {'rule_to_invoke': 'Overtraining', 'patience_train': '10', 'patience_validation': '20'}),
    ]
    training = sagemaker_train_op(
        region='us-east-1',
        # Refer this link for xgboost Registry URLs: https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-algo-docker-registry-paths.html
        image='683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:0.90-2-cpu-py3',
        hyperparameters=bad_hyperparameters,
        channels=train_channels,
        instance_type='ml.m5.2xlarge',
        model_artifact_path=f's3://{bucket_name}/mnist_kmeans_example/output/model',
        debug_hook_config=training_debug_hook(f's3://{bucket_name}/mnist_kmeans_example/hook_config', collections),
        debug_rule_config=train_debug_rules,
        role=role_arn,
    )


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(training, __file__ + '.zip')

Compiling the pipeline

Our pipeline is now complete and ready to be compiled using the following command:

dsl-compile --py debugger-component-demo.py --output debugger-component-demo.tar.gz

This creates debugger-component-demo.tar.gz in the same folder; this is the file we upload to Kubeflow Pipelines to run our training job.

Deploying the pipeline

Now use kubectl to open up the KFP UI in your browser so you have access to the interface where you can upload the pipeline.

  1. In a new terminal window, run the following command (it’s possible to create pipelines and submit training jobs from the AWS Command Line Interface (AWS CLI)):
    $ kubectl port-forward -n kubeflow service/ml-pipeline-ui 8080:80

  2. Access the KFP UI by navigating to http://localhost:8080/ in your browser.
  3. Create a new pipeline and upload the compiled specification (.tar.gz file) as a new pipeline template.
  4. Provide the role_arn and bucket_name you created as pipeline inputs.

Reading the debugger output

When the training is complete, the logs display the status of each debugger rule.

The following screenshot shows an example of what the status of each debugger rule should be when the training job is complete.


We see here that our debugger rules haven’t found any issues with the model being overtrained. However, the debug rules indicate that our loss isn’t decreasing over time as it should.

The following screenshot shows the Amazon CloudWatch Logs, also printed on the Logs tab, which indeed show that the train-rmse is staying steady at 0.5 and isn’t decreasing.


The reason that our loss isn’t decreasing is that our hyperparameters have been initialized suboptimally, specifically eta, which has been set to a poor value. eta determines the model’s learning rate and is currently 0. This is clearly erroneous because it means that subsequent boosting rounds make no progress from the initial step. To address this, use a non-zero learning rate, for example, set eta in hyperparameters to 0.2. Rerunning the pipeline with this fix results in a model with no issues found: the LossNotDecreasing rule is not triggered because train-rmse keeps decreasing steadily throughout the entire training run.
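
For reference, the corrected hyperparameters differ from the bad_hyperparameters shown earlier only in the value of eta:

good_hyperparameters = {
    'max_depth': '5',
    'eta': '0.2',            # non-zero learning rate so each boosting round makes progress
    'gamma': '4',
    'min_child_weight': '6',
    'silent': '0',
    'subsample': '0.7',
    'num_round': '50'
}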

Conclusion

Model debugging tools are critical to reduce total time, cost, and resources spent on creating a model. Using SageMaker Debugger in your Kubeflow Pipelines lets you go beyond just looking at scalars like losses and accuracies during training. You can get full visibility into all tensors flowing through the graph during training. Furthermore, it helps you monitor your training in near-real time using rules, and provides alerts if it detects an inconsistency in the training flow, which ultimately reduces costs and improves your company’s effectiveness on ML.

To get started using Kubeflow Pipelines with SageMaker, see the GitHub repo. You can also explore our native integration of SageMaker Operators for Kubernetes for MLOps.


About the Authors

Alex Chung is a Senior Product Manager with AWS in Deep Learning. His role is to make AWS Deep Learning products more accessible and cater to a wider audience. He’s passionate about social impact and technology, getting his regular gym workout, and cooking healthy meals.

 

 

Suraj Kota is a Software Engineer specialized in Machine Learning infrastructure. He builds tools to easily get started and scale machine learning workload on AWS. He worked on the Amazon Deep Learning Containers, Deep Learning AMI, SageMaker Operators for Kubernetes, and other open source integrations like Kubeflow.

 

 

Dustin Luong is a Software Development Engineering Intern with AWS in Deep Engines. He works on developing SageMaker integrations with open source platforms like Kubernetes and Kubeflow Pipelines. He’s currently a student at UC Berkeley and in his spare time he enjoys playing basketball, hiking, and playing board games.

Read More

Translate and analyze text using SQL functions with Amazon Athena, Amazon Translate, and Amazon Comprehend

You have Amazon Simple Storage Service (Amazon S3) buckets full of files containing incoming customer chats, product reviews, and social media feeds, in many languages. Your task is to identify the products that people are talking about, determine if they’re expressing happy thoughts or sad thoughts, translate their comments into a single common language, and create copies of the data for your business analysts with this new information added to each record. Additionally, you need to remove any personally identifiable information (PII), such as names, addresses, and credit card numbers.

You already know how to use Amazon Athena to transform data in Amazon S3 using simple SQL commands and the built-in functions in Athena. Now you can also use Athena to translate and analyze text fields, thanks to Amazon Translate, Amazon Comprehend, and the power of Athena User Defined Functions (UDFs).

Athena is an interactive query service that makes it easy to analyze data stored in Amazon S3 using SQL. Amazon Comprehend is a Natural Language Processing (NLP) service that makes it easy to uncover insights from text. Amazon Translate is a neural machine translation service that delivers fast, high-quality, affordable, and customizable language translation. In this post, I show you how you can now use them together to perform the following actions:

  • Detect the dominant language of a text field
  • Detect the prevailing sentiment expressed—positive, negative, neither, or both
  • Detect or redact entities (such as items, places, or quantities)
  • Detect or redact PII
  • Translate text from one language to another

This post accomplishes the following goals:

  • Show you how to quickly set up the text analytics functions in your own AWS account (it’s fast and easy!)
  • Briefly explain how the functions work
  • Discuss performance and cost
  • Provide a tutorial where we do some text analytics on Amazon product reviews
  • Describe all the available functions

We include a list of all the available functions at the end of the post; the following code shows a few example queries and results:

USING FUNCTION detect_sentiment(text_col VARCHAR, lang VARCHAR) RETURNS VARCHAR TYPE LAMBDA_INVOKE WITH (lambda_name = 'textanalytics-udf') 
SELECT detect_sentiment('I am very happy', 'en') as sentiment
	sentiment
	POSITIVE

USING FUNCTION detect_pii_entities(text_col VARCHAR, lang VARCHAR) RETURNS VARCHAR TYPE LAMBDA_INVOKE WITH (lambda_name = 'textanalytics-udf') 
SELECT detect_pii_entities('I am Bob, I live in Herndon VA, and I love cars', 'en') as pii
	pii
	[["NAME","Bob"],["ADDRESS","Herndon VA"]]

USING FUNCTION redact_pii_entities(text_col VARCHAR, lang VARCHAR, type VARCHAR) RETURNS VARCHAR TYPE LAMBDA_INVOKE WITH (lambda_name = 'textanalytics-udf') 
SELECT redact_pii_entities('I am Bob, I live in Herndon VA, and I love cars', 'en', 'NAME,ADDRESS') as pii_redacted
	pii_redacted
	I am [NAME], I live in [ADDRESS], and I love cars

USING FUNCTION translate_text(text_col VARCHAR, sourcelang VARCHAR, targetlang VARCHAR, terminologyname VARCHAR) RETURNS VARCHAR TYPE LAMBDA_INVOKE WITH (lambda_name = 'textanalytics-udf') 
SELECT translate_text('It is a beautiful day in the neighborhood', 'auto', 'fr', NULL) as translated_text
	translated_text
	C'est une belle journée dans le quartier

Install the text analytics UDF

An Athena UDF uses AWS Lambda to implement the function capability. I discuss more details later in this post, but you don’t need to understand the inner workings to use the text analytics UDF, so let’s get started.

Install the prebuilt Lambda function with the following steps:

  1. Navigate to the TextAnalyticsUDFHandler application in the AWS Serverless Application Repository.
  2. In the Application settings section, keep the settings at their defaults.
  3. Select I acknowledge that this app creates custom IAM roles.
  4. Choose Deploy.

And that’s it! Now you have a new Lambda function called textanalytics-udf. You’re ready to try some text analytics queries in Athena.

If you prefer to build and deploy from the source code instead, see the directions at the end of the GitHub repository README.

Run your first text analytics query

If you’re new to Athena, you may want to review the Getting Started guide.

As of this writing, the Athena UDF feature is still in preview. To enable it, create an Athena workgroup named AmazonAthenaPreviewFunctionality and run all the UDF queries from that workgroup.

Enter the following query into the SQL editor:

USING FUNCTION detect_sentiment(text_col VARCHAR, lang VARCHAR) RETURNS VARCHAR TYPE LAMBDA_INVOKE WITH (lambda_name = 'textanalytics-udf') 
SELECT detect_sentiment('I am very happy', 'en') as sentiment

You get a simple POSITIVE result. Now try again, varying the input text—try something less positive to see how the returned sentiment value changes.

To get the sentiment along with confidence scores for each potential sentiment value, use the following query instead:

USING FUNCTION detect_sentiment_all(text_col VARCHAR, lang VARCHAR) RETURNS VARCHAR TYPE LAMBDA_INVOKE WITH (lambda_name = 'textanalytics-udf') 
SELECT detect_sentiment_all('I am very happy', 'en') as sentiment

Now you get a JSON string containing the sentiment and all the sentiment scores:

{"sentiment":"POSITIVE","sentimentScore":{"positive":0.999519,"negative":7.407639E-5,"neutral":2.7478999E-4,"mixed":1.3210243E-4}}

You can use the built-in JSON extraction functions in Athena on this result to extract the fields for further analysis.

How the UDF works

For more information about the Athena UDF framework, see Querying with User Defined Functions.

The Java class TextAnalyticsUDFHandler implements our UDF Lambda function handler. Each text analytics function has a corresponding public method in this class.

Athena invokes our UDF Lambda function with batches of input records. The TextAnalyticsUDFHandler subdivides these batches into smaller batches of up to 25 rows to take advantage of the Amazon Comprehend synchronous multi-document batch APIs where they are available (for example, for detecting language, entities, and sentiment). When there is no synchronous multi-document API available (such as for DetectPiiEntities and TranslateText), we use the single-document API instead.
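
To make the batching pattern concrete, the following is a minimal Python sketch of the same idea (the actual handler is Java); the helper name and structure are illustrative only and are not part of the UDF code:

import boto3

comprehend = boto3.client("comprehend")

def detect_sentiment_batch(texts, lang, batch_size=25):
    """Call the multi-document BatchDetectSentiment API in chunks of up to 25 texts."""
    results = []
    for start in range(0, len(texts), batch_size):
        chunk = texts[start:start + batch_size]
        response = comprehend.batch_detect_sentiment(TextList=chunk, LanguageCode=lang)
        # Each ResultList entry carries an Index back into the submitted chunk
        ordered = sorted(response["ResultList"], key=lambda r: r["Index"])
        results.extend(r["Sentiment"] for r in ordered)
    return results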

Amazon Comprehend API service quotas provide guardrails to limit your cost exposure from unintentional high usage (we discuss this more in the following section). By default, the multi-document batch APIs process up to 250 records per second, and the single-document APIs process up to 20 records per second. Our UDFs use exponential back off and retry to throttle the request rate to stay within these limits. You can request increases to the transactions per second quota for APIs using the Quota Request Template on the AWS Management Console.

Amazon Comprehend and Amazon Translate each enforce a maximum input string length of 5,000 utf-8 bytes. Text fields that are longer than 5,000 utf-8 bytes are truncated to 5,000 bytes for language and sentiment detection, and split on sentence boundaries into multiple text blocks of under 5,000 bytes for translation and entity or PII detection and redaction. The results are then combined.
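
For reference, capping a string at a byte boundary is straightforward; the following sketch shows one way to truncate to 5,000 UTF-8 bytes without splitting a multi-byte character (the UDF's sentence-boundary splitting for the longer operations is more involved):

MAX_BYTES = 5000

def truncate_utf8(text, max_bytes=MAX_BYTES):
    """Cap a string at max_bytes of UTF-8, dropping any partial trailing character."""
    encoded = text.encode("utf-8")
    if len(encoded) <= max_bytes:
        return text
    return encoded[:max_bytes].decode("utf-8", errors="ignore")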

Optimizing cost

In addition to Athena query costs, the text analytics UDF incurs usage costs from Lambda, Amazon Comprehend, and Amazon Translate. The amount you pay depends on the total number of records and characters that you process with the UDF. For more information, see AWS Lambda pricing, Amazon Comprehend pricing, and Amazon Translate pricing.

To minimize the costs, I recommend that you avoid processing the same records multiple times. Instead, materialize the results of the text analytics UDF by using CREATE TABLE AS SELECT (CTAS) queries to capture the results in a separate table that you can then cost-effectively query as often as needed without incurring additional UDF charges. Process newly arriving records incrementally using INSERT INTO…SELECT queries to analyze and enrich only the new records and add them to the target table.
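
If you schedule this kind of incremental processing from a script or a Lambda function, you can submit the same SQL programmatically. The following boto3 sketch is hypothetical: the query string, database, and results bucket are placeholders for your own values, and remember that queries calling UDFs must run in the preview workgroup.

import boto3

athena = boto3.client("athena")

# Placeholder query: in practice, this is your CTAS or INSERT INTO...SELECT statement
# that enriches only newly arrived records.
QUERY = "INSERT INTO enriched_reviews SELECT * FROM newly_staged_reviews"

response = athena.start_query_execution(
    QueryString=QUERY,
    QueryExecutionContext={"Database": "comprehendresults"},
    ResultConfiguration={"OutputLocation": "s3://YOUR-ATHENA-RESULTS-BUCKET/"},
    WorkGroup="AmazonAthenaPreviewFunctionality",
)
print(response["QueryExecutionId"])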

Avoid calling the text analytics functions needlessly on records that you will subsequently discard. Write your queries to filter the dataset first using temporary tables, views, or nested queries, and then apply the text analytics functions to the resulting filtered records.

Always assess the potential cost before you run text analytics queries on tables with very large numbers of records.

In this section, we provide two example cost assessments. A small Python helper that reproduces these estimates follows the second example.

Example 1: Analyze the language and sentiment of tweets

Let’s assume you have 10,000 tweet records, with average length 100 characters per tweet. Your SQL query detects the dominant language and sentiment for each tweet. You’re in your second year of service (the Free Tier no longer applies). The cost details are as follows:

  • Size of each tweet = 100 characters
  • Number of units (100 characters each) per record (minimum of 3 units) = 3
  • Total Units: 10,000 (records) x 3 (units per record) x 2 (Amazon Comprehend requests per record) = 60,000
  • Price per unit = $0.0001
  • Total cost for Amazon Comprehend = [number of units] x [cost per unit] = 60,000 x $0.0001 = $6.00 

Example 2: Translate tweets

Let’s assume that 2,000 of your tweets aren’t in your local language, so you run a second SQL query to translate them. The cost details are as follows:

  • Size of each tweet = 100 characters
  • Total characters: 2,000 (records) x 100 (characters per record) x 1 (Amazon Translate requests per record) = 200,000
  • Price per character = $0.000015
  • Total cost for Amazon Translate = [number of characters] x [cost per character] = 200,000 x $0.000015 = $3.00
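
The following is a small helper that reproduces both estimates; the prices are the published rates at the time of writing, so check the pricing pages for current values:

import math

COMPREHEND_PRICE_PER_UNIT = 0.0001   # USD per 100-character unit
TRANSLATE_PRICE_PER_CHAR = 0.000015  # USD per character

def comprehend_cost(records, chars_per_record, requests_per_record):
    units = max(3, math.ceil(chars_per_record / 100))  # Amazon Comprehend bills a minimum of 3 units per request
    return records * units * requests_per_record * COMPREHEND_PRICE_PER_UNIT

def translate_cost(records, chars_per_record):
    return records * chars_per_record * TRANSLATE_PRICE_PER_CHAR

print(comprehend_cost(10000, 100, 2))  # Example 1: $6.00
print(translate_cost(2000, 100))       # Example 2: $3.00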

Analyze insights from customer reviews

It’s time to put our new text analytics queries to use.

For a tutorial on getting actionable insights from customer reviews, see Tutorial: Analyzing Insights from Customer Reviews with Amazon Comprehend. This post provides an alternate approach to the same challenge: using SQL queries powered by Athena and Amazon Comprehend.

The tutorial takes approximately 10 minutes to complete, and costs up to $6 for Amazon Comprehend—there is no cost if you’re eligible for the Free Tier.

Create a new database in Athena

Run the following query in the Athena query editor:

CREATE DATABASE IF NOT EXISTS comprehendresults;

When connecting your data source, choose your new database.

Create a source table containing customer review data

We use the Amazon Customer Reviews Dataset, conveniently hosted for public access in Amazon S3.

  1. Run the following query in the Athena query editor:
    CREATE EXTERNAL TABLE amazon_reviews_parquet(
      marketplace string, 
      customer_id string, 
      review_id string, 
      product_id string, 
      product_parent string, 
      product_title string, 
      star_rating int, 
      helpful_votes int, 
      total_votes int, 
      vine string, 
      verified_purchase string, 
      review_headline string, 
      review_body string, 
      review_date bigint, 
      year int)
    PARTITIONED BY (product_category string)
    ROW FORMAT SERDE 
      'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    LOCATION
      's3://amazon-reviews-pds/parquet/'
    

  2. Under Tables, find the new table amazon_reviews_parquet.
  3. From the options menu, choose Load partitions.
  4. Preview the new table, amazon_reviews_parquet.
  5. Run the following query to assess the average review length:
    SELECT AVG(LENGTH(review_body)) AS average_review_length FROM amazon_reviews_parquet

The average review length is around 365 characters. This equates to 4 Amazon Comprehend units per record (1 unit = 100 characters).

Detect the language for each review

To detect the language of each review, run the following query in the Athena query editor—it takes just over 1 minute to run and costs $2:

CREATE TABLE amazon_reviews_with_language WITH (format='parquet') AS
USING FUNCTION detect_dominant_language(col1 VARCHAR) RETURNS VARCHAR TYPE LAMBDA_INVOKE WITH (lambda_name = 'textanalytics-udf')
SELECT *, detect_dominant_language(review_body) AS language
FROM amazon_reviews_parquet
LIMIT 5000

This query creates a new table, amazon_reviews_with_language, with one new column added: language. The LIMIT clause limits the number of records to 5,000.

Cost is calculated as: 5,000 (records) x 4 (units per record) x 1 (requests per record) x $0.0001 (Amazon Comprehend price per unit) = $2. 

Run the following query to see the detected language codes, with the corresponding count of reviews for each language:

SELECT language, count(*) AS count FROM amazon_reviews_with_language GROUP BY language ORDER BY count DESC

Detect sentiment and entities for each review

To detect sentiment, run the following query in the Athena query editor—it uses two text analytics functions, takes around 1 minute to run, and costs $4:

CREATE TABLE amazon_reviews_with_text_analysis WITH (format='parquet') AS
USING
   FUNCTION detect_sentiment_all(col1 VARCHAR, lang VARCHAR) RETURNS VARCHAR TYPE LAMBDA_INVOKE WITH (lambda_name = 'textanalytics-udf'),
   FUNCTION detect_entities_all(col1 VARCHAR, lang VARCHAR) RETURNS VARCHAR TYPE LAMBDA_INVOKE WITH (lambda_name = 'textanalytics-udf')
SELECT *, 
   detect_sentiment_all(review_body, language) AS sentiment,
   detect_entities_all(review_body, language) AS entities
FROM amazon_reviews_with_language
WHERE language IN ('ar', 'hi', 'ko', 'zh-TW', 'ja', 'zh', 'de', 'pt', 'en', 'it', 'fr', 'es')

This query creates a new table, amazon_reviews_with_text_analysis, with two additional columns added: sentiment and entities. The WHERE clause restricts the result set to the list of languages supported by Amazon Comprehend sentiment and entity detection.

Cost is calculated as: 5,000 (records) x 4 (units per record) x 2 (requests per record) x $0.0001 (Amazon Comprehend price per unit) = $4.

Preview the new table and inspect some of the values for the new sentiment and entities columns. They contain JSON strings with nested structures and fields.

The following screenshot shows the sentiment column details.

The following screenshot shows the entities column details.

Next, we use the JSON functions in Athena to prepare these columns for analysis.

Prepare sentiment for analysis

Run the following SQL query to create a new table containing sentiment and sentiment scores expanded into separate columns:

CREATE TABLE sentiment_results_final WITH (format='parquet') AS
SELECT 
   review_date, year, product_title, star_rating, language, 
   CAST(JSON_EXTRACT(sentiment,'$.sentiment') AS VARCHAR) AS sentiment,
   CAST(JSON_EXTRACT(sentiment,'$.sentimentScore.positive') AS DOUBLE ) AS positive_score,
   CAST(JSON_EXTRACT(sentiment,'$.sentimentScore.negative') AS DOUBLE ) AS negative_score,
   CAST(JSON_EXTRACT(sentiment,'$.sentimentScore.neutral') AS DOUBLE ) AS neutral_score,
   CAST(JSON_EXTRACT(sentiment,'$.sentimentScore.mixed') AS DOUBLE ) AS mixed_score,
   review_headline, review_body
FROM amazon_reviews_with_text_analysis

Preview the new sentiment_results_final table (see the following screenshot). Does the sentiment generally align with the text of the review_body field? How does it correlate with the star_rating? If you spot any dubious sentiment assignments, check the confidence scores to see if the sentiment was assigned with a low confidence.

Prepare entities for analysis

Run the following SQL query to create a new table containing detected entities unnested into separate rows (inner subquery), with each field in a separate column (outer query):

CREATE TABLE entities_results_final WITH (format='parquet') AS
SELECT 
   review_date, year, product_title, star_rating, language, 
   CAST(JSON_EXTRACT(entity_element, '$.text') AS VARCHAR ) AS entity,
   CAST(JSON_EXTRACT(entity_element, '$.type') AS VARCHAR ) AS category,
   CAST(JSON_EXTRACT(entity_element, '$.score') AS DOUBLE ) AS score,
   CAST(JSON_EXTRACT(entity_element, '$.beginOffset') AS INTEGER ) AS beginoffset,
   CAST(JSON_EXTRACT(entity_element, '$.endOffset') AS INTEGER ) AS endoffset,
   review_headline, review_body
FROM
(
   SELECT * 
   FROM
      (
      SELECT *,
      CAST(JSON_PARSE(entities) AS ARRAY(json)) AS entities_array
      FROM amazon_reviews_with_text_analysis
      )
   CROSS JOIN UNNEST(entities_array) AS t(entity_element)
)

Preview the contents of the new table, entities_results_final (see the following screenshot).

Visualize in Amazon QuickSight (optional)

As an optional step, you can visualize your results with Amazon QuickSight. For instructions, see Step 5: Visualizing Amazon Comprehend Output in Amazon QuickSight.

You can use the new word cloud visual type for entities, instead of tree map. In the word cloud chart menu, select Hide “other” categories.

You now have a dashboard with sentiment and entities visualizations that looks similar to the following screenshot.

Troubleshooting

If your query fails, check the Amazon CloudWatch metrics and logs generated by the UDF Lambda function.

  1. On the Lambda console, find the textanalytics-udf function.
  2. Choose Monitoring.

You can view the CloudWatch metrics showing how often the function ran, how long it runs for, how often it failed, and more.

  3. Choose View logs in CloudWatch to open the function log streams for additional troubleshooting insights.

For more information about viewing CloudWatch metrics via Lambda, see Using the Lambda console.
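
You can also pull recent log events from a script instead of the console. The following sketch is one way to do it with boto3, assuming the default /aws/lambda/textanalytics-udf log group name:

import time
import boto3

logs = boto3.client("logs")

# Fetch error messages logged by the UDF function over the last hour
events = logs.filter_log_events(
    logGroupName="/aws/lambda/textanalytics-udf",
    startTime=int((time.time() - 3600) * 1000),  # CloudWatch expects milliseconds
    filterPattern="ERROR",
)
for event in events.get("events", []):
    print(event["message"])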

Additional use cases

There are many use cases for SQL text analytics functions. In addition to the example shown in this post, consider the following:

  • Simplify ETL pipelines by using incremental SQL queries to enrich text data with sentiment and entities, such as social media streams ingested by Amazon Kinesis Data Firehose
  • Use SQL queries to explore sentiment and entities in your customer support texts, emails, and support cases
  • Prepare research-ready datasets by redacting PII from customer or patient interactions
  • Standardize many languages to a single common language

You may have additional use cases for these functions, or additional capabilities you want to see added, such as the following:

  • SQL functions to call custom entity recognition and custom classification models in Amazon Comprehend
  • SQL functions for de-identification—extending the entity and PII redaction functions to replace entities with alternate unique identifiers

Additionally, the implementation is open source, which means that you can clone the repo, modify and extend the functions as you see fit, and (hopefully) send us pull requests so we can merge your improvements back into the project and make it better for everyone.

Cleaning up

After you complete this tutorial, you might want to clean up any AWS resources you no longer want to use. Active AWS resources can continue to incur charges in your account.

  1. In Athena, run the following query to drop the database and all the tables:
    DROP DATABASE comprehendresults CASCADE

  2. In AWS CloudFormation, delete the stack serverlessrepo-TextAnalyticsUDFHandler.
  3. Cancel your QuickSight subscription.

Conclusion

I have shown you how to install the sample text analytics UDF Lambda function for Athena, so that you can use simple SQL queries to translate text using Amazon Translate, generate insights from text using Amazon Comprehend, and redact sensitive information. I hope you find this useful, and share examples of how you can use it to simplify your architectures and implement new capabilities for your business.

Please share your thoughts with us in the comments section, or in the issues section of the project’s GitHub repository.

Appendix: Available function reference

This section summarizes the functions currently provided. The README file provides additional details.

Detect language

This function uses the Amazon Comprehend BatchDetectDominantLanguage API to identify the dominant language based on the first 5,000 bytes of input text.

The following code returns a language code, such as fr for French or en for English:

USING FUNCTION detect_dominant_language(text_col VARCHAR) RETURNS VARCHAR TYPE LAMBDA_INVOKE WITH (lambda_name = 'textanalytics-udf') 
SELECT detect_dominant_language('il fait beau à Orlando') as language

The following code returns a JSON formatted array of language codes and corresponding confidence scores:

USING FUNCTION detect_dominant_language_all(text_col VARCHAR) RETURNS VARCHAR TYPE LAMBDA_INVOKE WITH (lambda_name = 'textanalytics-udf') 
SELECT detect_dominant_language_all('il fait beau à Orlando') as language_all

Detect sentiment

This function uses the Amazon Comprehend BatchDetectSentiment API to identify the sentiment based on the first 5,000 bytes of input text.

The following code returns a sentiment as POSITIVE, NEGATIVE, NEUTRAL, or MIXED:

USING FUNCTION detect_sentiment(text_col VARCHAR, lang VARCHAR) RETURNS VARCHAR TYPE LAMBDA_INVOKE WITH (lambda_name = 'textanalytics-udf') 
SELECT detect_sentiment('Joe is very happy', 'en') as sentiment

The following code returns a JSON formatted object containing detected sentiment and confidence scores for each sentiment value:

USING FUNCTION detect_sentiment_all(text_col VARCHAR, lang VARCHAR) RETURNS VARCHAR TYPE LAMBDA_INVOKE WITH (lambda_name = 'textanalytics-udf') 
SELECT detect_sentiment_all('Joe is very happy', 'en') as sentiment_all

Detect entities

This function uses the Amazon Comprehend DetectEntities API to identify entities. Input text longer than 5,000 bytes results in multiple Amazon Comprehend API calls.

The following code returns a JSON formatted object containing an array of entity types and values:

USING FUNCTION detect_entities(text_col VARCHAR, lang VARCHAR) RETURNS VARCHAR TYPE LAMBDA_INVOKE WITH (lambda_name = 'textanalytics-udf') 
SELECT detect_entities('His name is Joe, he lives in Richmond VA, he bought an Amazon Echo Show on January 5th, and he loves it', 'en') as entities

The following code returns a JSON formatted object containing an array of entity types, with their values, scores, and character offsets:

USING FUNCTION detect_entities_all(text_col VARCHAR, lang VARCHAR) RETURNS VARCHAR TYPE LAMBDA_INVOKE WITH (lambda_name = 'textanalytics-udf') 
SELECT detect_entities_all('His name is Joe, he lives in Richmond VA, he bought an Amazon Echo Show on January 5th, and he loves it', 'en') as entities_all

Redact entities

This function replaces entity values for the specified entity types with “[ENTITY_TYPE]”. Input text longer than 5,000 bytes results in multiple Amazon Comprehend API calls. See the following code:

USING FUNCTION redact_entities(text_col VARCHAR, lang VARCHAR, types VARCHAR) RETURNS VARCHAR TYPE LAMBDA_INVOKE WITH (lambda_name = 'textanalytics-udf') 
SELECT redact_entities('His name is Joe, he lives in Richmond VA, he bought an Amazon Echo Show on January 5th, and he loves it', 'en', 'ALL') as entities_redacted

The command returns a redacted version of the input string. Specify one or more entity types to redact by providing a comma-separated list of valid types in the types string parameter, or ALL to redact all types.

Detect PII

This function uses the DetectPiiEntities API to identify PII. Input text longer than 5,000 bytes results in multiple Amazon Comprehend API calls.

The following code returns a JSON formatted object containing an array of PII entity types and values:

USING FUNCTION detect_pii_entities(text_col VARCHAR, lang VARCHAR) RETURNS VARCHAR TYPE LAMBDA_INVOKE WITH (lambda_name = 'textanalytics-udf') 
SELECT detect_pii_entities('His name is Joe, his username is joe123 and he lives in Richmond VA', 'en') as pii

The following code returns a JSON formatted object containing an array of PII entity types, with their scores and character offsets:

USING FUNCTION detect_pii_entities_all(text_col VARCHAR, lang VARCHAR) RETURNS VARCHAR TYPE LAMBDA_INVOKE WITH (lambda_name = 'textanalytics-udf') 
SELECT detect_pii_entities_all('His name is Joe, his username is joe123 and he lives in Richmond VA', 'en') as pii_all

Redact PII

This function replaces the PII values for the specified PII entity types with “[PII_ENTITY_TYPE]”. Input text longer than 5,000 bytes results in multiple Amazon Comprehend API calls. See the following code:

USING FUNCTION redact_pii_entities(text_col VARCHAR, lang VARCHAR, types VARCHAR) RETURNS VARCHAR TYPE LAMBDA_INVOKE WITH (lambda_name = 'textanalytics-udf') 
SELECT redact_pii_entities('His name is Joe, his username is joe123 and he lives in Richmond VA', 'en', 'ALL') as pii_redacted

The function returns a redacted version of the input string. Specify one or more PII entity types to redact by providing a comma-separated list of valid types in the types string parameter, or ALL to redact all types.

Translate text

This function translates text from the source language to the target language. Input text longer than 5,000 bytes results in multiple Amazon Translate API calls. See the following code:

USING FUNCTION translate_text(text_col VARCHAR, sourcelang VARCHAR, targetlang VARCHAR, customterminologyname VARCHAR) RETURNS VARCHAR TYPE LAMBDA_INVOKE WITH (lambda_name = 'textanalytics-udf') 
SELECT translate_text('It is a beautiful day in the neighborhood', 'auto', 'fr', NULL) as translated_text

The function returns the translated string. To auto-detect the source language, use auto as the language code (this uses Amazon Comprehend to detect it). You can optionally specify a custom terminology name; otherwise, pass NULL for customterminologyname.


About the Author

Bob Strahan is a Principal Solutions Architect in the AWS Language AI Services team.

Read More

Setting up Amazon Personalize with AWS Glue

Data can be used in a variety of ways to satisfy the needs of different business units, such as marketing, sales, or product. In this post, we focus on using data to create personalized recommendations to improve end-user engagement. Most ecommerce applications consume a huge amount of customer data that can be used to provide personalized recommendations; however, that data may not be cleaned or in the right format to provide those valuable insights.

The goal of this post is to demonstrate how to use AWS Glue to extract, transform, and load your JSON data into a cleaned CSV format. We then show you how to run a recommendation engine powered by Amazon Personalize on your user interaction data to provide a tailored experience for your customers. The resulting output from Amazon Personalize is recommendations you can generate from an API.

A common use case is an ecommerce platform that collects user-item interaction data and suggests similar products or products that a customer may like. By the end of this post, you will be able to take your uncleaned JSON data and generate personalized recommendations based on the products each user has interacted with, creating a better experience for your end users. For the purposes of this post, refer to this user-item-interaction dataset to build this solution.

The resources of this solution may incur a cost on your AWS account. For pricing information, see AWS Glue Pricing and Amazon Personalize Pricing.

The following diagram illustrates our solution architecture.

Prerequisites

For this post, you need the following:

  • An S3 bucket to store your data. For instructions on creating a bucket, see Step 1: Create your first S3 bucket. Make sure to attach the Amazon Personalize access policy.
  • An IAM role for AWS Glue. For instructions on creating a role, see Step 2: Create an IAM Role for AWS Glue.

These are very permissive policies; in practice, it's best to follow least privilege and grant access only where it's needed.

Crawling your data with AWS Glue

We use AWS Glue to crawl through the JSON file to determine the schema of your data and create a metadata table in your AWS Glue Data Catalog. The Data Catalog contains references to data that is used as sources and targets of your ETL jobs in AWS Glue. AWS Glue is a serverless data preparation service that makes it easy to extract, clean, enrich, normalize, and load data. It helps prepare your data for analysis or machine learning (ML). In this section, we go through how to get your JSON data ready for Amazon Personalize, which requires a CSV file.

Your data can have different columns that you may not necessarily want or need to run through Amazon Personalize. In this post, we use the user-item-interaction.json file and clean that data using AWS Glue to only include the columns user_id, item_id, and timestamp, while also transforming it into CSV format. You can use a crawler to access your data store, extract metadata, and create table definitions in the Data Catalog. It automatically discovers new data and extracts schema definitions. This can help you gain a better understanding of your data and what you want to include while training your model.

The user-item-interaction JSON data is an array of records. The crawler treats the data as one object: just an array. We create a custom classifier to create a schema that is based on each record in the JSON array. You can skip this step if your data isn’t an array of records.

  1. On the AWS Glue console, under Crawlers, choose Classifiers.
  2. Choose Add classifier.
  3. For Classifier name, enter json_classifier.
  4. For Classifier type, select JSON.
  5. For JSON path, enter $[*].
  6. Choose Create.

  1. On the Crawlers page, choose Add crawler.
  2. For Crawler name, enter json_crawler.
  3. For Custom classifiers, add the classifier you created.

  1. Choose Next.
  2. For Crawler source type, choose Data stores.
  3. Leave everything else as default and choose Next.
  4. For Choose a data store, enter the Amazon S3 path to your JSON data file.
  5. Choose Next.

  1. Skip the section Add another data store.
  2. In the Choose an IAM role section, select Choose an existing IAM role.
  3. For IAM role, choose the role that you created earlier (AWSGlueServiceRole-xxx).
  4. Choose Next.

  1. Leave the frequency as Run on Demand.
  2. On the Output page, choose Add database.
  3. For Database name, enter json_data.
  4. Choose Finish.
  5. Choose Run it now. 

You can also run your crawler by going to the Crawlers page, selecting your crawler, and choosing Run crawler.

Using AWS Glue to convert your files from JSON to CSV

After your crawler finishes running, go to the Tables page on the AWS Glue console. Navigate to the table your crawler created. Here you can see the schema of your data. Make note of the fields you want to use with your Amazon Personalize data. For this post, we want to keep the user_id, item_id, and timestamp columns for Amazon Personalize.

At this point, you have set up your database. Amazon Personalize requires CSV files, so you have to transform the data from JSON format into three cleaned CSV files that include only the data you need in Amazon Personalize. The following table shows examples of the three CSV files you can include in Amazon Personalize. It’s important to note that interactions data is required, whereas user and item data metadata is optional.

Dataset Type   Required Fields                                         Reserved Keywords
Users          USER_ID (string), 1 metadata field
Items          ITEM_ID (string), 1 metadata field                      CREATION_TIMESTAMP (long)
Interactions   USER_ID (string), ITEM_ID (string), TIMESTAMP (long)    EVENT_TYPE (string), IMPRESSION (string), EVENT_VALUE (float, null)

It’s also important to make sure that you have at least 1,000 unique combined historical and event interactions in order to train the model. For more information about quotas, see Quotas in Amazon Personalize.

To save the data as a CSV, you need to run an AWS Glue job on the data. A job is the business logic that performs the ETL work in AWS Glue. The job changes the format from JSON into CSV. For more information about data formatting, see Formatting Your Input Data.
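
The AWS Glue Studio steps for this conversion follow. If your dataset is small enough to process on a single machine and you just want to see the shape of the output, the same transformation can be sketched in a few lines of pandas; the file names are placeholders, and this is an illustration rather than a replacement for the AWS Glue job:

import pandas as pd

# Read the array-of-records JSON and keep only the columns Amazon Personalize needs
df = pd.read_json("user-item-interaction.json")
df = df[["user_id", "item_id", "timestamp"]]

# Rename the headers to match the reserved field names used in the Personalize schema
df.columns = ["USER_ID", "ITEM_ID", "TIMESTAMP"]
df.to_csv("interactions.csv", index=False)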

  1. On the AWS Glue Dashboard, choose AWS Glue Studio.

AWS Glue Studio is an easy-to-use graphical interface for creating, running, and monitoring AWS Glue ETL jobs.

  1. Choose Create and manage jobs.
  2. Select Source and target added to the graph.
  3. For Source, choose S3.
  4. For Target, choose S3.
  5. Choose Create.

  1. Choose the data source S3 bucket.
  2. On the Data source properties – S3 tab, add the database and table we created earlier.

  1. On the Transform tab, select the boxes to drop user_login and location.

In this post, we don’t use any additional metadata to run our personalization algorithm.

  1. Choose the data target S3 bucket.
  2. On the Data target properties – S3 tab, for Format, choose CSV.
  3. For S3 Target location, enter the S3 path for your target. 

For this post, we use the same bucket we used for the JSON file.

  1. On the Job details page, for Name, enter a name for your job (for this post, json_to_csv).
  2. For IAM Role, choose the role you created earlier.

You should also have included the AmazonS3FullAccess policy earlier.

  1. Leave the rest of the fields at their default settings.

  1. Choose Save.
  2. Choose Run.

It may take a few minutes for the job to run.

In your Amazon S3 bucket, you should now see the CSV file that you use in the next section.

Setting up Amazon Personalize

At this point, you have your data formatted in a file type that Amazon Personalize can use. Amazon Personalize is a fully managed service that uses ML and over 20 years of recommendation experience at Amazon.com to enable you to improve end-user engagement by powering real-time personalized product and content recommendations, and targeted marketing promotions. In this section, we go through how to create an Amazon Personalize solution to use your data to create personalized experiences.

  1. On the Amazon Personalize console, under New dataset groups, choose Get started.
  2. Enter the name for your dataset group.

A dataset group contains the datasets, solutions, and event ingestion API.

  1. Enter a dataset name, and enter the schema details based on your data.

For this dataset, we use the following schema. You can change the schema according to the values in your dataset.

{
	"type": "record",
	"name": "Interactions",
	"namespace": "com.amazonaws.personalize.schema",
	"fields": [
		{
			"name": "USER_ID",
			"type": "string"
		},
		{
			"name": "ITEM_ID",
			"type": "string"
		},
		{
			"name": "TIMESTAMP",
			"type": "long"
		}
	],
	"version": "1.0"
}
  1. Choose Next.
  2. Enter your dataset import job name to import data from Amazon S3.

Make sure that your IAM service role has access to Amazon S3 and Amazon Personalize, and that your bucket has the correct bucket policy.

  1. Enter the path to your data (the Amazon S3 bucket from the previous section).
  2. On the Dashboard page for your dataset groups, under Upload datasets, import the user-item-interactions data (user data and item data are optional but can enhance the solution).

We include an example item.csv file in the GitHub repo. The following screenshot shows an example of the item data.

  1. Under Create solutions, for Solutions training, choose Start.

A solution is a trained model of the data you provided with the algorithm, or recipe, that you select.

  1. For Solution name, enter aws-user-personalization.
  2. Choose Next.
  3. Review and choose Finish.
  4. On the dashboard, under Launch campaigns, for Campaign creation, choose Start.

A campaign allows your application to get recommendations from your solution version.

  1. For Campaign name, enter a name.
  2. Choose the solution you created.
  3. Choose Create campaign.

You have now successfully used the data from your data lake and created a recommendation model that can be used to get various recommendations. With this dataset, you can get personalized recommendations for houseware products based on the user's interactions with other products in the dataset.

Using Amazon Personalize to get your recommendations

To test your solution, go to the campaign you created. In the Test campaign results section, under User ID, enter an ID to get recommendations for. A list of IDs shows up, along with a relative score. The item IDs correlate with specific products recommended.

The following screenshot shows a search for user ID 1. They have been recommended item ID 59, which correlates to a wooden picture frame. The score listed next to the item gives you the predicted relevance of each item to your user.

To learn more about Amazon Personalize scores, see Introducing recommendation scores in Amazon Personalize.

To generate recommendations, you can call the GetRecommendations or GetPersonalizedRanking API using the AWS Command Line Interface (AWS CLI) or a language-specific SDK. With Amazon Personalize, your recommendations can change as the user clicks on the items for more real-time use cases. For more information, see Getting Real-Time Recommendations.
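
For example, the following boto3 sketch calls GetRecommendations against your campaign; the campaign ARN and user ID are placeholders for your own values:

import boto3

personalize_runtime = boto3.client("personalize-runtime")

response = personalize_runtime.get_recommendations(
    campaignArn="arn:aws:personalize:us-east-1:111122223333:campaign/YOUR-CAMPAIGN",
    userId="1",
    numResults=10,
)
for item in response["itemList"]:
    # Each recommended item carries an itemId and a relative score
    print(item["itemId"], item.get("score"))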

Conclusion

AWS offers a wide range of AI/ML and analytics services that you can use to gain insights and guide better business decisions. In this post, you used a JSON dataset that included additional columns of data, and cleaned and transformed that data using AWS Glue. In addition, you built a custom model using Amazon Personalize to provide recommendations for your customers.

To learn more about Amazon Personalize, see the developer guide. Try this solution out and let us know if you have any questions in the comments.


About the Authors

Zoish Pithawala is a Startup Solutions Architect at Amazon Web Services based out of San Francisco. She primarily works with startup customers to help them build secure and scalable solutions on AWS.

Sam Tran is a Startup Solutions Architect at Amazon Web Services based out of Seattle. He focuses on helping his customers create well-architected solutions on AWS.

Read More

Amazon Rekognition Custom Labels Community Showcase

In our Community Showcase, Amazon Web Services (AWS) highlights projects created by AWS Heroes and AWS Community Builders. 

We worked with AWS Machine Learning (ML) Heroes and AWS ML Community Builders to bring to life projects and use cases that detect custom objects with Amazon Rekognition Custom Labels.

The AWS ML community is a vibrant group of developers, data scientists, researchers, and business decision-makers that dive deep into artificial intelligence and ML concepts, contribute with real-world experiences, and collaborate on building projects together.

Amazon Rekognition is a fully managed computer vision service that allows developers to analyze images and videos for a variety of use cases, including face identification and verification, media intelligence, custom industrial automation, and workplace safety.

Detecting custom objects and scenes can be hard, and training and improving a computer vision model with growing data makes the problem more complex. Amazon Rekognition Custom Labels allows you to detect custom labeled objects and scenes with zero Jupyter notebook experience. For example, you can identify logos in streaming media, simplify preventative maintenance, and scale supply chain inventory management. ML practitioners, data scientists, and developers with no previous ML experience benefit by moving their models to production faster, while Amazon Rekognition Custom Labels takes care of the heavy lifting of model development.

In this post, we highlight a few externally published getting started guides and tutorials from AWS ML Heroes and AWS ML Community Builders that applied Amazon Rekognition to a wide variety of use cases, from at-home projects like a fridge inventory checker to an enterprise-level HVAC filter cleanliness detector.

AWS ML Heroes and AWS ML Community Builders

Classify LEGO bricks with Amazon Rekognition Custom Labels by Mike Chambers. In this video, Mike walks you through this fun use case to use Amazon Rekognition Custom Labels to detect 250 different LEGO bricks.

Training models using Satellite imagery on Amazon Rekognition Custom Labels by Rustem Feyzkhanov (with code samples). Satellite imagery is becoming a more and more important source of insights with the advent of accessible satellite data from sources such as the Sentinel-2 on Open Data on AWS. In this guide, Rustem shows how you can find agricultural fields with Amazon Rekognition Custom Labels.

Detecting insights from X-ray data with Amazon Rekognition Custom Labels by Olalekan Elesin (with code samples). Learn how to detect anomalies quickly and with low cost and resource investment with Amazon Rekognition Custom Labels.

Building Natural Flower Classifier using Amazon Rekognition Custom Labels by Juv Chan (with code samples). Building a computer vision model from scratch can be a daunting task. In this step-by-step guide, you learn how to build a natural flower classifier using the Oxford Flower 102 dataset and Amazon Rekognition Custom Labels.

What’s in my Fridge by Chris Miller and Siaterlis Konstantinos. How many times have you gone to the grocery store and forgot your list, or weren’t sure if you needed to buy milk, beer, or something else? Learn how AWS ML Community members Chris Miller and Siaterlis Konstantinos used Amazon Rekognition Custom Labels and AWS DeepLens to build a fridge inventory checker to let AI do the heavy lifting on your grocery list.

Clean or dirty HVAC? Using Amazon SageMaker and Amazon Rekognition Custom Labels to automate detection by Luca Bianchi. How can you manage 1–3,000 cleanliness checks with zero ML experience and no data scientist on staff? Learn how to detect clean and dirty HVACs using Amazon Rekognition Custom Labels and Amazon SageMaker from AWS ML Hero Luca Bianchi.

Conclusion

Getting started with Amazon Rekognition Custom Labels is simple. Learn more with the getting started guide and example use cases.

Whether you’re just getting started with ML, already an expert, or something in between, there is always something to learn. Choose from community-created and ML-focused blogs, videos, eLearning guides, and much more from the AWS ML community.

Are you interested in contributing to the community? Apply to the AWS Community Builders program.

 

The content and opinions in the preceding linked posts are those of the third-party authors and AWS is not responsible for the content or accuracy of those posts.


About the Author

Cameron Peron is Senior Marketing Manager for AWS Amazon Rekognition and the AWS AI/ML community. He evangelizes how AI/ML innovation solves complex challenges facing community, enterprise, and startups alike. Out of the office, he enjoys staying active with kettlebell-sport, spending time with his family and friends, and is an avid fan of Euro-league basketball.

Read More

Using container images to run TensorFlow models in AWS Lambda

TensorFlow is an open-source machine learning (ML) library widely used to develop neural networks and ML models. Those models are usually trained on multiple GPU instances to speed up training, resulting in expensive training time and model sizes up to a few gigabytes. After they’re trained, these models are deployed in production to produce inferences. They can be synchronous, asynchronous, or batch-based workloads. Those endpoints need to be highly scalable and resilient in order to process from zero to millions of requests. This is where AWS Lambda can be a compelling compute service for scalable, cost-effective, and reliable synchronous and asynchronous ML inferencing. Lambda offers benefits such as automatic scaling, reduced operational overhead, and pay-per-inference billing.

This post shows you how to use any TensorFlow model with Lambda for scalable inferences in production with up to 10 GB of memory. This allows us to use ML models of up to a few gigabytes in size in Lambda functions. For this post, we use the TensorFlow-Keras pre-trained ResNet50 model for image classification.

Overview of solution

Lambda is a serverless compute service that lets you run code without provisioning or managing servers. Lambda automatically scales your application by running code in response to every event, allowing event-driven architectures and solutions. The code runs in parallel and processes each event individually, scaling with the size of the workload, from a few requests per day to hundreds of thousands of workloads. The following diagram illustrates the architecture of our solution.

You can package your code and dependencies as a container image using tools such as the Docker CLI. The maximum container image size is 10 GB. After the model for inference is Dockerized, you can upload the image to Amazon Elastic Container Registry (Amazon ECR). You can then create the Lambda function from the container image stored in Amazon ECR.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Implementing the solution

We use a pre-trained model from the TensorFlow Hub for image classification. When an image is uploaded to an Amazon Simple Storage Service (Amazon S3) bucket, a Lambda function is invoked to classify the image and print the prediction to the Amazon CloudWatch logs. The following diagram illustrates this workflow.

To implement the solution, complete the following steps:

  1. On your local machine, create a folder with the name lambda-tensorflow-example.
  2. Create a requirements.txt file in that directory.
  3. Add all the needed libraries for your ML model. For this post, we use TensorFlow 2.4.
  4. Create an app.py script that contains the code for the Lambda function.
  5. Create a Dockerfile in the same directory.

The following text is an example of the requirements.txt file to run TensorFlow code for our use case:

# List all python libraries for the lambda
tensorflow==2.4.0
tensorflow_hub==0.11

The Python code is placed in app.py. The inference function in app.py needs to follow a specific structure to be invoked by the Lambda runtime. For more information about handlers for Lambda, see AWS Lambda function handler in Python. See the following code:

import json
import boto3
import numpy as np
import PIL.Image as Image

import tensorflow as tf
import tensorflow_hub as hub

IMAGE_WIDTH = 224
IMAGE_HEIGHT = 224

IMAGE_SHAPE = (IMAGE_WIDTH, IMAGE_HEIGHT)
model = tf.keras.Sequential([hub.KerasLayer("model/")])
model.build([None, IMAGE_WIDTH, IMAGE_HEIGHT, 3])

imagenet_labels= np.array(open('model/ImageNetLabels.txt').read().splitlines())
s3 = boto3.resource('s3')

def lambda_handler(event, context):
  bucket_name = event['Records'][0]['s3']['bucket']['name']
  key = event['Records'][0]['s3']['object']['key']

  img = readImageFromBucket(key, bucket_name).resize(IMAGE_SHAPE)
  img = np.array(img)/255.0

  prediction = model.predict(img[np.newaxis, ...])
  predicted_class = imagenet_labels[np.argmax(prediction[0], axis=-1)]

  print('ImageName: {0}, Prediction: {1}'.format(key, predicted_class))

def readImageFromBucket(key, bucket_name):
  bucket = s3.Bucket(bucket_name)
  object = bucket.Object(key)
  response = object.get()
  return Image.open(response['Body'])

The following Dockerfile for Python 3.8 uses the AWS provided open-source base images that can be used to create container images. The base images are preloaded with language runtimes and other components required to run a container image on Lambda.

# Pull the base image with python 3.8 as a runtime for your Lambda
FROM public.ecr.aws/lambda/python:3.8

# Install OS packages for Pillow-SIMD
RUN yum -y install tar gzip zlib freetype-devel \
    gcc \
    ghostscript \
    lcms2-devel \
    libffi-devel \
    libimagequant-devel \
    libjpeg-devel \
    libraqm-devel \
    libtiff-devel \
    libwebp-devel \
    make \
    openjpeg2-devel \
    rh-python36 \
    rh-python36-python-virtualenv \
    sudo \
    tcl-devel \
    tk-devel \
    tkinter \
    which \
    xorg-x11-server-Xvfb \
    zlib-devel \
    && yum clean all

# Copy the earlier created requirements.txt file to the container
COPY requirements.txt ./

# Install the python requirements from requirements.txt
RUN python3.8 -m pip install -r requirements.txt
# Replace Pillow with Pillow-SIMD to take advantage of AVX2
RUN pip uninstall -y pillow && CC="cc -mavx2" pip install -U --force-reinstall pillow-simd

# Copy the earlier created app.py file to the container
COPY app.py ./

# Download ResNet50 and store it in a directory
RUN mkdir model
RUN curl -L https://tfhub.dev/google/imagenet/resnet_v1_50/classification/4?tf-hub-format=compressed -o ./model/resnet.tar.gz
RUN tar -xf model/resnet.tar.gz -C model/
RUN rm -r model/resnet.tar.gz

# Download ImageNet labels
RUN curl https://storage.googleapis.com/download.tensorflow.org/data/ImageNetLabels.txt -o ./model/ImageNetLabels.txt

# Set the CMD to your handler
CMD ["app.lambda_handler"]

Your folder structure should look like the following screenshot.

You can build and push the container image to Amazon ECR with the following bash commands. Replace the <AWS_ACCOUNT_ID> with your own AWS account ID and also specify a <REGION>.

# Build the docker image
docker build -t  lambda-tensorflow-example .

# Create a ECR repository
aws ecr create-repository --repository-name lambda-tensorflow-example --image-scanning-configuration scanOnPush=true --region <REGION>

# Tag the image to match the repository name
docker tag lambda-tensorflow-example:latest <AWS_ACCOUNT_ID>.dkr.ecr.<REGION>.amazonaws.com/lambda-tensorflow-example:latest

# Register docker to ECR
aws ecr get-login-password --region <REGION> | docker login --username AWS --password-stdin <AWS_ACCOUNT_ID>.dkr.ecr.<REGION>.amazonaws.com

# Push the image to ECR
docker push <AWS_ACCOUNT_ID>.dkr.ecr.<REGION>.amazonaws.com/lambda-tensorflow-example:latest

If you want to test your model inference locally, the base images for Lambda include the Runtime Interface Emulator (RIE), which lets you test your Lambda function packaged as a container image on your own machine and speeds up your development cycles.
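
For example, assuming you run the image locally with the container's port 8080 mapped to localhost:9000, you can post a sample S3 event to the emulator's invocation endpoint. The bucket and key below are placeholders, and the handler still reads the image from Amazon S3, so valid AWS credentials must be available to the container:

import json
import urllib.request

# Sample S3 event matching the structure the handler expects
event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "YOUR-TEST-BUCKET"},
                "object": {"key": "parrot.jpg"},
            }
        }
    ]
}

request = urllib.request.Request(
    "http://localhost:9000/2015-03-31/functions/function/invocations",
    data=json.dumps(event).encode("utf-8"),
    method="POST",
)
print(urllib.request.urlopen(request).read().decode("utf-8"))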

Creating an S3 bucket

As a next step, we create an S3 bucket to store the images used to predict the image class.

  1. On the Amazon S3 console, choose Create bucket.
  2. Give the S3 bucket a name, such as tensorflow-images-for-inference-<Random_String> and replace the <Random_String> with a random value.
  3. Choose Create bucket.

Creating the Lambda function with the TensorFlow code

To create your Lambda function, complete the following steps:

  1. On the Lambda console, choose Functions.
  2. Choose Create function.
  3. Select Container image.
  4. For Function name, enter a name, such as tensorflow-endpoint.
  5. For Container image URI, enter the earlier created lambda-tensorflow-example repository.

  6. Choose Browse images to choose the latest image.
  7. Choose Create function to start creating it.
  8. To improve the Lambda runtime, increase the function memory to at least 6 GB and the timeout to 5 minutes in the Basic settings.

For more information about function memory and timeout settings, see New for AWS Lambda – Functions with Up to 10 GB of Memory and 6 vCPUs.

Connecting the S3 bucket to your Lambda function

After the successful creation of the Lambda function, we need to add a trigger to it so that whenever a file is uploaded to the S3 bucket, the function is invoked.

  1. On the Lambda console, choose your function.
  2. Choose Add trigger.

  1. Choose S3.
  2. For Bucket, choose the bucket you created earlier.

After the trigger is added, you need to allow the Lambda function to connect to the S3 bucket by setting the appropriate AWS Identity and Access Management (IAM) rights for its execution role.

  1. On the Permissions tab for your function, choose the IAM role.
  2. Choose Attach policies.
  3. Search for AmazonS3ReadOnlyAccess and attach it to the IAM role.

Now you have configured all the necessary services to test your function. Upload a JPG image to the S3 bucket you created by opening the bucket in the AWS Management Console and choosing Upload. After a few seconds, you can see the result of the prediction in the CloudWatch logs. As a follow-up step, you could store the predictions in an Amazon DynamoDB table.
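
A minimal sketch of that follow-up step could look like the following; the table name and key schema are assumptions, so create a table with image_key as its partition key (or adjust the names) before wiring this into the handler:

import boto3

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("image-predictions")  # hypothetical table name

def save_prediction(image_key, predicted_class):
    """Persist one prediction; call this from lambda_handler after model.predict."""
    table.put_item(Item={"image_key": image_key, "predicted_class": str(predicted_class)})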

After you upload a JPG picture to the S3 bucket, the predicted image class is printed to CloudWatch. The S3 trigger you configured invokes the Lambda function, which pulls the image from the bucket. As an example, we use the picture of this parrot to get a prediction from our inference endpoint.

In the CloudWatch logs, the predicted class is printed. Indeed, the model predicts the correct class for the picture (macaw).

Performance

In order to achieve optimal performance, you can try various memory settings (which linearly change the assigned vCPU share; to learn more, read this AWS News Blog). For our deployed model, we see most of the performance gains at about 3–4 GB (~2 vCPUs); gains beyond that are relatively small. Different models see different levels of performance improvement from additional CPU, so it is best to determine this experimentally for your own model. Additionally, we highly recommend that you compile your source code to take advantage of Advanced Vector Extensions 2 (AVX2) on Lambda, which further increases performance by allowing the vCPUs to run a higher number of integer and floating-point operations per clock cycle.
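
One rough way to run that experiment is to vary the function's memory size from a script and time a warm invocation at each setting. The following sketch assumes the function name from this post and a test image already in your bucket; for production-grade measurements, average several invocations or use a dedicated tool such as AWS Lambda Power Tuning:

import json
import time
import boto3

lambda_client = boto3.client("lambda")
payload = {"Records": [{"s3": {"bucket": {"name": "YOUR-BUCKET"}, "object": {"key": "parrot.jpg"}}}]}

for memory_mb in (2048, 3072, 4096, 6144):
    # Update the memory setting and wait for the configuration change to finish
    lambda_client.update_function_configuration(FunctionName="tensorflow-endpoint", MemorySize=memory_mb)
    lambda_client.get_waiter("function_updated").wait(FunctionName="tensorflow-endpoint")
    start = time.time()
    lambda_client.invoke(FunctionName="tensorflow-endpoint", Payload=json.dumps(payload))
    print(f"{memory_mb} MB: {time.time() - start:.1f} s")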

Conclusion

Container image support for Lambda allows you to customize your function even more, opening up a lot of new use cases for serverless ML. You can bring your custom models and deploy them on Lambda using up to 10 GB for the container image size. For smaller models that don’t need much computing power, you can perform online training and inference purely in Lambda. When the model size increases, cold start issues become more and more important and need to be mitigated. There is also no restriction on the framework or language with container images; other ML frameworks such as PyTorch, Apache MXNet, XGBoost, or Scikit-learn can be used as well!

If you do require a GPU for your inference, you can consider using container services such as Amazon Elastic Container Service (Amazon ECS) or Kubernetes, or deploy the model to an Amazon SageMaker endpoint.


About the Author

Jan Bauer is a Cloud Application Developer at AWS Professional Services. His interests are serverless computing, machine learning, and everything that involves cloud computing.

Read More