Enghouse EspialTV enables TV accessibility with Amazon Polly
This is a guest post by Mick McCluskey, the VP of Product Management at Enghouse EspialTV. Enghouse provides software solutions that power digital transformation for communications service operators. EspialTV is an Enghouse SaaS solution that transforms the delivery of TV services for these operators across Set Top Boxes (STBs), media players, and mobile devices.
A large audience of consumers use TV services, and several of these groups may have disabilities that make it more difficult for them to access these services. To ensure that TV services are accessible to the broadest possible audience, we need to consider accessibility as a key element of the user experience (UX) for the service. Additionally, because TV is viewed as a key service by governments, it’s often subject to regulatory requirements for accessibility, including talking interfaces for the visually impaired. In the US, the Twenty-First Century Communications and Video Accessibility Act (CVAA) mandates improved accessibility for visual interfaces for users with limited hearing and vision. The CVAA ensures accessibility laws from the 1980s and 1990s are brought up to date with modern technologies, including new digital, broadband, and mobile innovations.
This post describes how Enghouse uses Amazon Polly to significantly improve accessibility for EspialTV through talking interactive menu guides for visually impaired users while meeting regulatory requirements.
Challenges
A key challenge for visually impaired users is navigating TV menus to find the content they want to view. Most TV menus are designed for a 10-foot viewing experience, meaning that a consumer sitting 10 feet from the screen can easily see the menu items. For the visually impaired, these menu items aren’t easy to see and are therefore hard to navigate. To improve our UX for subscribers with limited vision, we sought to develop a mechanism to provide audible descriptions of the menu, allowing easier navigation of key functions such as the following:
- Channel and program selection
- Channel and program information
- Setup configuration, closed-caption control and options, and video description control
- Configuration information
- Playback
Overview of the AWS talking menu solution
Hosted on AWS, EspialTV is offered to communications service providers in a software as a service (SaaS) model. It was important for Enghouse to have a solution that not only supported the navigation currently offered at the time of launch, but was highly flexible to support changes and enhancements over time. This way, the voice assistance could continuously evolve and improve to accommodate new capabilities as new services and features were added to the menu. For this reason, the solution had to be driven by real-time API calls as opposed to hardcoded text-to-speech menu configurations.
To ensure CVAA compliance and accelerate deployment, Enghouse chose to use Amazon Polly to implement this talking menu solution for the following reasons:
- We wanted a reliable and robust solution with minimal operational and management overhead
- It permitted faster time to market by using ready-made text-to-speech APIs
- The real-time API approach offered greater flexibility as we evolved the service over time
The following diagram illustrates the architecture of the talking menu solution.
Using the Amazon Polly text-to-speech API allowed us to build a simple solution that integrated with our current infrastructure and followed this flow:
- Steps 1 and 2 – When TV users open the menu guide service, the client software running on the Set Top Box (STB) makes a call via the internet or Data Over Cable Service Interface Specification (DOCSIS) cable modem, which is routed through the cable operator's headend server to the Espial Guide service running on the AWS Cloud.
- Step 3 – As TV users interact with the menu guide on the STBs, the client software running on the STBs sends the string containing the specific menu description highlighted by the customer.
- Step 4 – The cable operator's headend server routes the request to a local cache to verify whether the requested string's text-to-speech audio is cached locally. If it is, the corresponding audio is sent back to the STB to be read out loud to the TV user.
- Step 5 – Each unique cable operator has a local cache. If the requested string isn't cached locally in the cable operator's environment, the requested string is sent to the EspialTV service in AWS, where it's met by a secondary caching server to respond to the request. This secondary layer of caching hosted in the Espial environment ensures high availability and increases cache hit rates. For example, if the caching servers in the cable operator's environment are unavailable, the cache request can be resolved by the secondary caching system hosted in the Espial environment.
- Steps 6 and 7 – If the requested string isn't found in the caching server in the EspialTV service, it's routed to the Amazon Polly API to be converted to speech, which is routed back to the cable operator's headend server and then to the TV user's STB to be read out loud to the user.
This architecture has several key considerations. First, several layers of caching are implemented to minimize latency for the end user. This also accommodates the spiky nature of this workload by ensuring that only requests not found in the respective caches are made to Amazon Polly.
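As an illustration of the caching flow, the following is a minimal Python sketch of a cache-then-synthesize fallback, assuming a simple in-memory dictionary as a stand-in for the operator and EspialTV caching layers and the Joanna voice; the production solution uses dedicated caching servers and returns the audio to the STB.

```python
import boto3

polly = boto3.client("polly")
audio_cache = {}  # assumption: stand-in for the headend and EspialTV caching servers

def get_menu_audio(menu_text, voice_id="Joanna"):
    """Return MP3 audio for a menu string, calling Amazon Polly only on a cache miss."""
    if menu_text in audio_cache:
        return audio_cache[menu_text]
    response = polly.synthesize_speech(
        Text=menu_text,
        OutputFormat="mp3",
        VoiceId=voice_id,  # assumption: any supported voice works here
    )
    audio = response["AudioStream"].read()
    audio_cache[menu_text] = audio
    return audio
```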
The ready-made text-to-speech APIs provided by Amazon Polly enabled us to implement the service with just one engineer. We also reduced the expected delivery time by 75% compared to our estimates for building an in-house custom solution. The Amazon Polly documentation was very clear, and the ramp-up time was limited. Since implementation, this solution has reliably supported 40 cable operators, each with between 1,000–100,000 STBs.
Conclusion
EspialTV offers operators a TV solution that provides fast time to revenue, low startup costs, and scalability from small to very large operators. EspialTV offers providers and consumers a compelling and always relevant experience for their TV services. With Amazon Polly, we have ensured operators can offer a TV service to the broadest possible range of consumers and align with regulatory requirements for accessibility. To learn more about Amazon Polly, visit the product page.
The content and opinions in this post are those of the third-party author and AWS is not responsible for the content or accuracy of this post.
About the Author
Mick McCluskey is VP of Product Management at Enghouse, a leading provider of software solutions helping operators use digital transformation to drive profitability in fast-changing and emerging markets. In the area of video solutions, Mick has been pivotal in creating the EspialTV solution—a truly disruptive TVaaS solution run on the AWS Cloud that permits pay TV operators to manage transition while maintaining profitability in a rapidly changing market. He is currently working on solutions that help operators take advantage of key technology and industry trends like OTT video, IoT, and 5G. In addition to delivering cloud-based solutions, he continues his journey of learning how to play golf.
Upgrade your Amazon Polly voices to neural with one line of code
In 2019, Amazon Polly launched neural text-to-speech (NTTS) voices in US English and UK English. Neural voices use machine learning and provide a richer, more lifelike speech quality. Since the initial launch of NTTS, Amazon Polly has extended its neural offering by adding new voices in US Spanish, Brazilian Portuguese, Australian English, Canadian French, German, and Korean. Some of these voices are also available in a Newscaster speaking style tailored to the specific needs of publishers.
If you’ve been using the standard voices in Amazon Polly, upgrading to neural voices is easy. No matter which programming language you use, the upgrade process only requires a simple addition or modification of the Engine parameter wherever you use the SynthesizeSpeech or StartSpeechSynthesisTask operations in your code. In this post, you’ll learn about the benefits of neural voices and how to migrate your voices to NTTS.
Benefits of neural vs. standard
Because neural voices provide a more expressive, natural-sounding quality than standard, migrating to neural improves the user experience and boosts engagement.
“We rely on speech synthesis to drive dynamic narrations for our educational content,” says Paul S. Ziegler, Chief Executive Officer at Reflare. “The switch from Amazon Polly’s standard to neural voices has allowed us to create narrations that are so good as to consistently be indistinguishable from human speech to non-native speakers and to occasionally even fool native speakers.”
The following is an example of Joanna’s standard voice.
The following is an example of the same words, but using Joanna’s neural voice.
“Switching to neural voices is as easy as switching to other non-neural voices,” Ziegler says. “Since our systems were already set up to automatically generate voiceovers on the fly, implementing the changes took less than 5 minutes.”
Quick migration checklist
Not all SSML tags, Regions, and languages support neural voices. Before making the switch, use this checklist to verify that NTTS is available for your specific business needs:
- Regional support – Verify that you’re making requests in Regions that support NTTS
- Language and voice support – Verify that you’re making requests to voices and languages that support NTTS by checking the current list of voices and languages
- SSML tag support – Verify that the SSML tags in your requests are supported by NTTS by checking SSML tag compatibility
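If you'd rather check voice support programmatically, the following short sketch uses the Polly DescribeVoices API to list the voices that support the neural engine in your configured Region; default credentials and Region are assumed.

```python
import boto3

polly = boto3.client("polly")

# List voices that support the neural engine in the current Region
response = polly.describe_voices(Engine="neural")
for voice in response["Voices"]:
    print(voice["Id"], voice["LanguageCode"])
```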
Additional considerations
The following table summarizes additional considerations before you switch to NTTS.
| | Standard | Neural |
| --- | --- | --- |
| Cost | $4 per million characters | $16 per million characters |
| Free Tier | 5 million characters per month | 1 million characters per month |
| Default Sample Rate | 22 kHz | 24 kHz |
| Usage Quota | Quotas in Amazon Polly | Quotas in Amazon Polly |
Code samples
If you’re already using Amazon Polly standard voices, the following samples demonstrate how to switch to neural for all SDKs. The required change is the addition of the Engine parameter.
Go:
input := &polly.SynthesizeSpeechInput{
    OutputFormat: aws.String("mp3"),
    Text:         aws.String("Hello World!"),
    VoiceId:      aws.String("Joanna"),
    Engine:       aws.String("neural"),
}
Java:
SynthesizeSpeechRequest synthReq = SynthesizeSpeechRequest.builder()
    .text("Hello World!")
    .voiceId("Joanna")
    .outputFormat("mp3")
    .engine("neural")
.build();
ResponseInputStream<SynthesizeSpeechResponse> synthRes = polly.synthesizeSpeech(synthReq);
Javascript:
polly.synthesizeSpeech({
Text: "Hello World!",
OutputFormat: "mp3",
VoiceId: "Joanna",
TextType: "text",
Engine: "neural"});
.NET:
var response = client.SynthesizeSpeech(new SynthesizeSpeechRequest
{
Text = "Hello World!",
OutputFormat = "mp3",
VoiceId = "Joanna",
Engine = "neural"
});
PHP:
$result = $client->synthesizeSpeech([
'Text' => 'Hello world!',
'OutputFormat' => 'mp3',
'VoiceId' => 'Joanna',
'Engine' => 'neural']);
Python:
polly.synthesize_speech(
Text="Hello world!",
OutputFormat="mp3",
VoiceId="Joanna",
Engine="neural")
Ruby:
resp = polly.synthesize_speech({
text: "Hello World!",
output_format: "mp3",
voice_id: "Joanna",
engine: "neural"
})
Conclusion
You can start playing with neural voices immediately on the Amazon Polly console. If you have any questions or concerns, please post them to the AWS Forum for Amazon Polly, or contact your AWS Support team.
About the Author
Marta Smolarek is a Senior Program Manager in the Amazon Text-to-Speech team. Outside of work, she loves to go camping with her family.
Extend Amazon SageMaker Pipelines to include custom steps using callback steps
Launched at AWS re:Invent 2020, Amazon SageMaker Pipelines is the first purpose-built, easy-to-use continuous integration and continuous delivery (CI/CD) service for machine learning (ML). With Pipelines, you can create, automate, and manage end-to-end ML workflows at scale.
You can extend your pipelines to include steps for tasks performed outside of Amazon SageMaker by taking advantage of custom callback steps. This feature lets you include tasks that are performed using other AWS services, third parties, or tasks run outside AWS. Before the launch of this feature, steps within a pipeline were limited to the supported native SageMaker steps. With the launch of this new feature, you can use the new CallbackStep to generate a token and add a message to an Amazon Simple Queue Service (Amazon SQS) queue. The message on the SQS queue triggers a task outside of the currently supported native steps. When that task is complete, you can call the new SendPipelineExecutionStepSuccess API with the generated token to signal that the callback step and corresponding tasks are finished and the pipeline run can continue.
In this post, we demonstrate how to use CallbackStep to perform data preprocessing using AWS Glue. We use an Apache Spark job to prepare NYC taxi data for ML training. The raw data has one row per taxi trip, and shows information like the trip duration, number of passengers, and trip cost. To train an anomaly detection model, we want to transform the raw data into a count of the number of passengers that took taxi rides over 30-minute intervals.
Although we could run this specific Spark job in SageMaker Processing, we use AWS Glue for this post. In some cases, we may need capabilities that Amazon EMR or AWS Glue offer, like support for Hive queries or integration with the AWS Glue metadata catalog, so we demonstrate how to invoke AWS Glue from the pipeline.
Solution overview
The pipeline step that launches the AWS Glue job sends a message to an SQS queue. The message contains the callback token we need to send success or failure information back to the pipeline. This callback token triggers the next step in the pipeline. When handling this message, we need a handler that can launch the AWS Glue job and reliably check for job status until the job completes. We have to keep in mind that a Spark job can easily take longer than 15 minutes (the maximum duration of a single AWS Lambda function invocation), and the Spark job itself could fail for a number of reasons. That last point is worth emphasizing: in most Apache Spark runtimes, the job code itself runs in transient containers under the control of a coordinator like Apache YARN. We can’t add custom code to YARN, so we need something outside the job to check for completion.
We can accomplish this task several ways:
- Have a Lambda function launch a container task that creates the AWS Glue job and polls for job completion, then sends the callback back to the pipeline
- Have a Lambda function send a work notification to another SQS queue, with a separate Lambda function that picks up the message, checks for job status, and requeues the message if the job isn’t complete
- Use AWS Glue job event notifications to respond to job status events sent by AWS Glue
For this post, we use the first technique because it’s the simplest (but likely not the most efficient). For this, we build out the solution as shown in the following diagram.
The solution is one example of how to use the new CallbackStep to extend your pipeline to steps outside SageMaker (such as AWS Glue). You can apply the same general steps and architectural guidance to extend pipelines to other custom processes or tasks. In our solution, the pipeline runs the following tasks:
- Data preprocessing – This step (Step 1 in the preceding diagram) uses CallbackStep to send a generated token and defined input payload to the configured SQS queue (2). In this example, the input sent to the SQS queue is the Amazon Simple Storage Service (Amazon S3) locations of the input data and the step output training data.
  - The new message in the SQS queue triggers a Lambda function (3) that is responsible for running an AWS Fargate task with Amazon Elastic Container Service (Amazon ECS) (4).
  - The Fargate task runs using a container image that is configured to run a task. The task in this case is an AWS Glue job (5) used to transform your raw data into training data stored in Amazon S3 (6). This task is also responsible for sending a callback message that signals either the job's success or failure.
- Model training – This step (7) runs when the previous callback step has completed successfully. It uses the generated training data to train a model using a SageMaker training job and the Random Cut Forest algorithm.
- Package model – After the model is successfully trained, the model is packaged for deployment (8).
- Deploy model – In this final step (9), the model is deployed using a batch transform job.
These pipeline steps are just examples; you can modify the pipeline to meet your use case, such as adding steps to register the model in the SageMaker Model Registry.
In the next sections, we discuss how to set up this solution.
Prerequisites
For the preceding pipeline, you need the prerequisites outlined in this section. The detailed setup of each of these prerequisites is available in the supporting notebook.
Notebook dependencies
To run the provided notebook, you need the following:
- Studio environment – Callback steps were introduced in the SageMaker Python SDK v2.45.0 and Amazon SageMaker Studio v3.6.2. Update Studio if needed before you run the sample notebook.
- Studio Image Build CLI – Because we build a container image for the Fargate task from within the notebook, a step is included to install the Studio Image Build CLI. For more information about the CLI dependencies, see Using the Amazon SageMaker Studio Image Build CLI to build container images from your Studio notebooks.
Pipeline dependencies
Your pipeline uses the following services:
- SQS message queue – The callback step requires an SQS queue to trigger a task. For this, you need to create an SQS queue and ensure that AWS Identity and Access Management (IAM) permissions are in place that allow SageMaker to put a message in the queue and allow Lambda to poll the queue for new messages. See the following code:
sqs_client = boto3.client('sqs')
queue_url = ''
queue_name = 'pipeline_callbacks_glue_prep'
try:
    response = sqs_client.create_queue(QueueName=queue_name)
    queue_url = response['QueueUrl']
except Exception as e:
    print(f"Failed to create queue: {e}")
- Lambda function – The function is triggered by new messages put to the SQS queue. The function consumes these new messages and starts the ECS Fargate task. In this case, the Lambda execution IAM role needs permissions to pull messages from Amazon SQS, notify SageMaker of potential failures, and run the Amazon ECS task. For this solution, the function starts a task on ECS Fargate using the following code:
%%writefile queue_handler.py
import json
import boto3
import os
import traceback
ecs = boto3.client('ecs')
sagemaker = boto3.client('sagemaker')
def handler(event, context):
print(f"Got event: {json.dumps(event)}")
cluster_arn = os.environ["cluster_arn"]
task_arn = os.environ["task_arn"]
task_subnets = os.environ["task_subnets"]
task_sgs = os.environ["task_sgs"]
glue_job_name = os.environ["glue_job_name"]
print(f"Cluster ARN: {cluster_arn}")
print(f"Task ARN: {task_arn}")
print(f"Task Subnets: {task_subnets}")
print(f"Task SG: {task_sgs}")
print(f"Glue job name: {glue_job_name}")
for record in event['Records']:
payload = json.loads(record["body"])
print(f"Processing record {payload}")
token = payload["token"]
print(f"Got token {token}")
try:
input_data_s3_uri = payload["arguments"]["input_location"]
output_data_s3_uri = payload["arguments"]["output_location"]
print(f"Got input_data_s3_uri {input_data_s3_uri}")
print(f"Got output_data_s3_uri {output_data_s3_uri}")
response = ecs.run_task(
cluster = cluster_arn,
count=1,
launchType='FARGATE',
taskDefinition=task_arn,
networkConfiguration={
'awsvpcConfiguration': {
'subnets': task_subnets.split(','),
'securityGroups': task_sgs.split(','),
'assignPublicIp': 'ENABLED'
}
},
overrides={
'containerOverrides': [
{
'name': 'FargateTask',
'environment': [
{
'name': 'inputLocation',
'value': input_data_s3_uri
},
{
'name': 'outputLocation',
'value': output_data_s3_uri
},
{
'name': 'token',
'value': token
},
{
'name': 'glue_job_name',
'value': glue_job_name
}
]
}
]
}
)
if 'failures' in response and len(response['failures']) > 0:
f = response['failures'][0]
print(f"Failed to launch task for token {token}: {f['reason']}")
                sagemaker.send_pipeline_execution_step_failure(
CallbackToken=token,
FailureReason = f['reason']
)
else:
print(f"Launched task {response['tasks'][0]['taskArn']}")
except Exception as e:
trc = traceback.format_exc()
print(f"Error handling record: {str(e)}:m {trc}")
sagemaker.send_step_failure(
CallbackToken=token,
FailureReason = e
)
- After we create the SQS queue and Lambda function, we need to configure the SQS queue as an event source for the function so that when new messages are placed in the queue, the function is automatically triggered:
lambda_client.create_event_source_mapping(
EventSourceArn=f'arn:aws:sqs:{region}:{account}:{queue_name}',
FunctionName='SMPipelineQueueHandler',
Enabled=True,
BatchSize=10
)
- Fargate cluster – Because we use Amazon ECS to run and monitor the status of the AWS Glue job, we need to ensure we have an ECS Fargate cluster running:
import boto3
ecs = boto3.client('ecs')
response = ecs.create_cluster(clusterName='FargateTaskRunner')
- Fargate task – We also need to create a container image with the code (task.py) that starts the data preprocessing job on AWS Glue and reports the status back to the pipeline upon the success or failure of that task. The IAM role attached to the task must include permissions that allow the task to pull images from Amazon ECR, create logs in Amazon CloudWatch, start and monitor an AWS Glue job, and send the callback token when the task is complete. When we issue send_pipeline_execution_step_success back to the pipeline, we also indicate the output file with the prepared training data. We use the output parameter in the model training step in the pipeline. The following is the code for task.py (a sketch of registering the corresponding task definition follows the code):
import boto3
import os
import sys
import traceback
import time
if 'inputLocation' in os.environ:
input_uri = os.environ['inputLocation']
else:
print("inputLocation not found in environment")
sys.exit(1)
if 'outputLocation' in os.environ:
output_uri = os.environ['outputLocation']
else:
print("outputLocation not found in environment")
sys.exit(1)
if 'token' in os.environ:
token = os.environ['token']
else:
print("token not found in environment")
sys.exit(1)
if 'glue_job_name' in os.environ:
glue_job_name = os.environ['glue_job_name']
else:
print("glue_job_name not found in environment")
sys.exit(1)
print(f"Processing from {input_uri} to {output_uri} using callback token {token}")
sagemaker = boto3.client('sagemaker')
glue = boto3.client('glue')
poll_interval = 60
try:
t1 = time.time()
response = glue.start_job_run(
JobName=glue_job_name,
Arguments={
'--output_uri': output_uri,
'--input_uri': input_uri
}
)
job_run_id = response['JobRunId']
print(f"Starting job {job_run_id}")
job_status = 'STARTING'
job_error = ''
while job_status in ['STARTING','RUNNING','STOPPING']:
time.sleep(poll_interval)
response = glue.get_job_run(
JobName=glue_job_name,
RunId=job_run_id,
PredecessorsIncluded=False
)
job_status = response['JobRun']['JobRunState']
if 'ErrorMessage' in response['JobRun']:
job_error = response['JobRun']['ErrorMessage']
print(f"Job is in state {job_status}")
t2 = time.time()
total_time = (t2 - t1) / 60.0
if job_status == 'SUCCEEDED':
print("Job succeeded")
sagemaker.send_pipeline_execution_step_success(
CallbackToken=token,
OutputParameters=[
{
'Name': 'minutes',
'Value': str(total_time)
},
{
'Name': 's3_data_out',
'Value': str(output_uri),
}
]
)
else:
print(f"Job failed: {job_error}")
sagemaker.send_pipeline_execution_step_failure(
CallbackToken=token,
FailureReason = job_error
)
except Exception as e:
trc = traceback.format_exc()
print(f"Error running ETL job: {str(e)}:m {trc}")
sagemaker.send_pipeline_execution_step_failure(
CallbackToken=token,
FailureReason = str(e)
)
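Before the Lambda function can launch this container, a matching Fargate task definition must be registered. The following is a hedged sketch of that registration; the CPU, memory, image URI, and role ARN variables are assumptions to be replaced with your own values, and the container name must match the containerOverrides name used in the Lambda function.

```python
import boto3

ecs = boto3.client('ecs')

response = ecs.register_task_definition(
    family='FargateTaskRunner',                # assumption: any family name works
    requiresCompatibilities=['FARGATE'],
    networkMode='awsvpc',
    cpu='512',                                 # assumption
    memory='1024',                             # assumption
    executionRoleArn=task_execution_role_arn,  # assumption: role that pulls the image and writes logs
    taskRoleArn=task_role_arn,                 # assumption: role with the permissions described above
    containerDefinitions=[
        {
            'name': 'FargateTask',             # must match the containerOverrides name in the Lambda function
            'image': container_image_uri,      # assumption: the image built from task.py
            'essential': True,
        }
    ],
)
task_arn = response['taskDefinition']['taskDefinitionArn']
```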
- Data preprocessing code – The pipeline callback step does the actual data preprocessing using a PySpark job running in AWS Glue, so we need to create the code that is used to transform the data:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'input_uri', 'output_uri'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
df = spark.read.format("csv").option("header", "true").load("{0}*.csv".format(args['input_uri']))
df = df.withColumn("Passengers", df["passenger_count"].cast(IntegerType()))
df = df.withColumn(
'pickup_time',
F.to_timestamp(
F.unix_timestamp('tpep_pickup_datetime', 'yyyy-MM-dd HH:mm:ss').cast('timestamp')))
dfW = df.groupBy(F.window("pickup_time", "30 minutes")).agg(F.sum("Passengers").alias("passenger"))
dfOut = dfW.drop('window')
dfOut.repartition(1).write.option("timestampFormat", "yyyy-MM-dd HH:mm:ss").csv(args['output_uri'])
job.commit()
- Data preprocessing job – We also need to configure the AWS Glue job that runs the preceding code when triggered by your Fargate task. The IAM role used must have permissions to read and write from the S3 bucket. See the following code:
glue = boto3.client('glue')
response = glue.create_job(
Name='GlueDataPrepForPipeline',
Description='Prepare data for SageMaker training',
Role=glue_role_arn,
ExecutionProperty={
'MaxConcurrentRuns': 1
},
Command={
'Name': 'glueetl',
'ScriptLocation': glue_script_location,
},
MaxRetries=0,
Timeout=60,
MaxCapacity=10.0,
GlueVersion='2.0'
)
glue_job_name = response['Name']
After these prerequisites are in place, including the necessary IAM permissions outlined in the example notebook, we’re ready to configure and run the pipeline.
Configure the pipeline
To build out the pipeline, we rely on the preceding prerequisites in the callback step that perform data processing. We also combine that with steps native to SageMaker for model training and deployment to create an end-to-end pipeline.
To configure the pipeline, complete the following steps:
- Initialize the pipeline parameters:
from sagemaker.workflow.parameters import (
ParameterInteger,
ParameterString,
)
input_data = ParameterString(
name="InputData",
default_value=f"s3://{default_bucket}/{taxi_prefix}/"
)
id_out = ParameterString(
name="IdOut",
default_value="taxiout"+ str(timestamp)
)
output_data = ParameterString(
name="OutputData",
default_value=f"s3://{default_bucket}/{taxi_prefix}_output/"
)
training_instance_count = ParameterInteger(
name="TrainingInstanceCount",
default_value=1
)
training_instance_type = ParameterString(
name="TrainingInstanceType",
default_value="ml.c5.xlarge"
)
- Configure the first step in the pipeline, which is CallbackStep.
This step uses the SQS queue created in the prerequisites in combination with arguments that are used by tasks in this step. These arguments include the inputs of the Amazon S3 location of the input (raw taxi data) and output training data. The step also defines the outputs, which in this case include the callback output and Amazon S3 location of the training data. The outputs become the inputs to the next step in the pipeline. See the following code:
from sagemaker.workflow.callback_step import CallbackStep,CallbackOutput,CallbackOutputTypeEnum
callback1_output=CallbackOutput(output_name="s3_data_out", output_type=CallbackOutputTypeEnum.String)
step_callback_data = CallbackStep(
name="GluePrepCallbackStep",
sqs_queue_url=queue_url,
inputs={
"input_location": f"s3://{default_bucket}/{taxi_prefix}/",
"output_location": f"s3://{default_bucket}/{taxi_prefix}_{id_out}/"
},
outputs=[
callback1_output
],
)
- We use TrainingStep to train a model using the Random Cut Forest algorithm.
We first need to configure an estimator, then we configure the actual pipeline step. This step takes the output of the previous step and the Amazon S3 location of the training data created by AWS Glue as input to train the model. See the following code:
containers = {
'us-west-2': '174872318107.dkr.ecr.us-west-2.amazonaws.com/randomcutforest:latest',
'us-east-1': '382416733822.dkr.ecr.us-east-1.amazonaws.com/randomcutforest:latest',
'us-east-2': '404615174143.dkr.ecr.us-east-2.amazonaws.com/randomcutforest:latest',
'eu-west-1': '438346466558.dkr.ecr.eu-west-1.amazonaws.com/randomcutforest:latest'}
region_name = boto3.Session().region_name
container = containers[region_name]
model_prefix = 'model'
session = sagemaker.Session()
rcf = sagemaker.estimator.Estimator(
container,
sagemaker.get_execution_role(),
output_path='s3://{}/{}/output'.format(default_bucket, model_prefix),
instance_count=training_instance_count,
instance_type=training_instance_type,
sagemaker_session=session)
rcf.set_hyperparameters(
num_samples_per_tree=200,
num_trees=50,
feature_dim=1)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
name="TrainModel",
estimator=rcf,
inputs={
"train": TrainingInput(
            #s3_data = Output of the previous callback step
            s3_data=step_callback_data.properties.Outputs['s3_data_out'],
content_type="text/csv;label_size=0",
distribution='ShardedByS3Key'
),
},
)
- We use CreateModelStep to package the model for SageMaker deployment:
from sagemaker.model import Model
from sagemaker import get_execution_role
role = get_execution_role()
image_uri = sagemaker.image_uris.retrieve("randomcutforest", region)
model = Model(
image_uri=image_uri,
model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
sagemaker_session=sagemaker_session,
role=role,
)
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep
inputs = CreateModelInput(
instance_type="ml.m5.large",
)
create_model = CreateModelStep(
name="TaxiModel",
model=model,
inputs=inputs,
)
- We deploy the trained model using a SageMaker batch transform job using TransformStep.
This step loads the trained model and processes the prediction request data stored in Amazon S3, then outputs the results (anomaly scores in this case) to the specified Amazon S3 location. See the following code:
base_uri = step_callback_data.properties.Outputs['s3_data_out']
output_prefix = 'batch-out'
from sagemaker.transformer import Transformer
transformer = Transformer(
model_name=create_model.properties.ModelName,
instance_type="ml.m5.xlarge",
assemble_with = "Line",
accept = 'text/csv',
instance_count=1,
output_path=f"s3://{default_bucket}/{output_prefix}/",
)
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep
batch_data=step_callback_data.properties.Outputs['s3_data_out']
step_transform = TransformStep(
name="TaxiTransform",
transformer=transformer,
inputs=TransformInput(data=batch_data,content_type="text/csv",split_type="Line",input_filter="$[0]",join_source='Input',output_filter='$[0,-1]')
)
Create and run the pipeline
You’re now ready to create and run the pipeline. To do this, complete the following steps:
- Define the pipeline including the parameters accepted and steps:
from sagemaker.workflow.pipeline import Pipeline
pipeline_name = f"GluePipeline-{id_out}"
pipeline = Pipeline(
name=pipeline_name,
parameters=[
input_data,
training_instance_type,
training_instance_count,
id_out,
],
steps=[step_callback_data, step_train,create_model,step_transform],
)
- Submit the pipeline definition to create the pipeline using the role that is used to create all the jobs defined in each step:
from sagemaker import get_execution_role
pipeline.upsert(role_arn = get_execution_role())
- Run the pipeline:
execution = pipeline.start()
You can monitor your pipeline using the SageMaker SDK, execution.list_steps(), or via the Studio console, as shown in the following screenshot.
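For example, the following short snippet prints the overall status and the status of each step using the describe() and list_steps() methods on the execution object returned by pipeline.start():

```python
# Print overall pipeline status and the status of each step
status = execution.describe()["PipelineExecutionStatus"]
print(f"Pipeline execution status: {status}")

for step in execution.list_steps():
    print(step["StepName"], step["StepStatus"])
```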
Use CallbackStep to integrate other tasks outside of SageMaker
You can follow the same pattern to integrate any long-running tasks or jobs with Pipelines. This may include running AWS Batch jobs, Amazon EMR job flows, or Amazon ECS or Fargate tasks.
You can also implement an email approval step for your models as part of your ML pipeline. CallbackStep runs after the model EvaluationStep and sends an email containing approve or reject links with model metrics to a user. The workflow progresses to the next state after the user approves the task to proceed. You can implement this pattern using a Lambda function and Amazon Simple Notification Service (Amazon SNS).
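The following is a minimal sketch of that approval pattern under stated assumptions: an SNS topic with email subscribers, and a hypothetical API Gateway endpoint behind the approve and reject links. It is only an outline of where the callback token flows, not a complete implementation.

```python
import json
import boto3

sns = boto3.client("sns")
sagemaker = boto3.client("sagemaker")

TOPIC_ARN = "arn:aws:sns:us-east-1:111122223333:model-approval"  # assumption: SNS topic with email subscribers
APPROVAL_URL = "https://example.com/approve"                     # assumption: API Gateway endpoint

def request_approval(event, context):
    """Triggered by the CallbackStep's SQS message: emails approve/reject links."""
    for record in event["Records"]:
        payload = json.loads(record["body"])
        token = payload["token"]
        sns.publish(
            TopicArn=TOPIC_ARN,
            Subject="Model approval requested",
            Message=(
                f"Approve: {APPROVAL_URL}?decision=approve&token={token}\n"
                f"Reject: {APPROVAL_URL}?decision=reject&token={token}"
            ),
        )

def handle_decision(event, context):
    """Hypothetical handler behind the links: resumes or fails the pipeline."""
    params = event["queryStringParameters"]
    if params["decision"] == "approve":
        sagemaker.send_pipeline_execution_step_success(
            CallbackToken=params["token"],
            OutputParameters=[{"Name": "approval_status", "Value": "Approved"}],
        )
    else:
        sagemaker.send_pipeline_execution_step_failure(
            CallbackToken=params["token"],
            FailureReason="Rejected by reviewer",
        )
    return {"statusCode": 200, "body": "Decision recorded"}
```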
Conclusion
In this post, we showed you an example of how to use CallbackStep in Pipelines to extend your pipelines to integrate an AWS Glue job for data preprocessing. You can follow the same process to integrate any task or job outside of SageMaker. You can walk through the full solution explained in the example notebook.
About the Author
Shelbee Eigenbrode is a Principal AI and Machine Learning Specialist Solutions Architect at Amazon Web Services (AWS). She holds 6 AWS certifications and has been in technology for 23 years spanning multiple industries, technologies, and roles. She is currently focusing on combining her DevOps and ML background to deliver and manage ML workloads at scale. With over 35 patents granted across various technology domains, she has a passion for continuous innovation and using data to drive business outcomes. Shelbee co-founded the Denver chapter of Women in Big Data.
Sofian Hamiti is an AI/ML specialist Solutions Architect at AWS. He helps customers across industries accelerate their AI/ML journey by helping them build and operationalize end-to-end machine learning solutions.
Randy DeFauw is a principal solutions architect at Amazon Web Services. He works with AWS customers to provide guidance and technical assistance on database projects, helping them improve the value of their solutions when using AWS.
Payton Staub is a senior engineer with Amazon SageMaker. His current focus includes model building pipelines, experiment management, image management and other tools to help customers productionize and automate machine learning at scale.
Enhancing customer service experiences using Conversational AI: Power your contact center with Amazon Lex and Genesys Cloud
Customers expect personalized contact center experiences. They want easy access to customer support and quick resolution of their issues. Delighting callers with a quick and easy interaction remains central to the customer experience (CX) strategy for support organizations. Enterprises often deploy omni-channel contact centers so that they can provide simple mechanisms for their customers to access customer support. But even with these efforts, callers face long wait times, especially during peak hours, which can lead to lower CSAT scores. In addition, organizations have to manage support costs as their footprint expands. As the customer base grows, operational costs for managing a contact center can rapidly increase.
With Amazon Lex bots, you can use conversational AI capabilities to provide highly engaging and lifelike conversational experiences. Organizations can use Amazon Lex to automate customer service interactions and deliver faster responses to queries. As a result, customer issues are resolved in real time, reducing wait times and driving higher satisfaction. You can use Amazon Lex to handle the most common problems encountered by customers. Furthermore, complex issues that require human intervention can be seamlessly handed over from the Amazon Lex bot to a human agent. Augmenting your contact center operations with Amazon Lex bots provides an enhanced caller experience, while optimizing your operational costs with self-service automation. In addition, you can seamlessly scale your contact center operations on the AWS Cloud as your user base grows.
We’re excited to announce Amazon Lex V2 bot support on the Genesys Cloud platform. With this launch, you can build an Amazon Lex bot and set up your contact center in minutes.
About Amazon Lex V2 APIs and Genesys Cloud
Amazon Lex launched V2 APIs and a new console interface that makes it easier to build, deploy, and manage conversational experiences. The Lex V2 console and API enhancements provide support for multiple languages in a single bot, enable simplified versioning, and offer builder productivity tools. These features give you more control over the bot building and deployment processes.
Genesys Cloud (an omni-channel orchestration and customer relationship platform) provides a contact center platform in a public cloud model that enables quick and simple integration of AWS Contact Center Intelligence (AWS CCI) to transform the modern contact center from a cost center into a profit center. As part of AWS CCI, Genesys Cloud integrates with Amazon Lex, Amazon Polly (text to speech), and Amazon Kendra (intelligent search) to offer self-service conversational AI capabilities.
Key features
Genesys Cloud uses the continuous streaming capability with Amazon Lex V2 APIs to enable advanced IVR conversations. With this integration, you can now enable the following:
- Interruptions (“barge-in”) – Callers can now interrupt the bot and answer a question before the prompt is completed
- Wait and Continue – Callers can instruct the bot to wait if they need time for retrieving additional information during the call (such as a credit card number or booking ID)
- DTMF support – Callers can provide information via speech or DTMF interchangeably
- SSML support – You can configure prompts within the Amazon Lex bot using SSML tags, enabling greater control over speech generation from text
- Configurable timeouts – You can configure how long to wait for the customer to finish speaking before Amazon Lex collects speech input from callers, such as answering a yes/no question, or providing a date or credit card number
Creating the bot
Let’s create a banking bot as an example and integrate it with Genesys Cloud for IVR-based interactions. For a step-by-step process to build an Amazon Lex bot, refer to the banker bot workshop. You can also download the bot and import it using the Amazon Lex V2 console.
In addition to the intents presented in the workshop, we add a SpeakToAgent intent to enable handing over the conversation to a human agent based on user requests.
Enabling the integrations
The Amazon Lex V2 integration is available for installation via Genesys AppFoundry. You need an active subscription for premium applications to access the Integration page from the Genesys Cloud Admin dashboard. Genesys also offers a free trial for validation purposes.
1. Configure the IAM role
Because invocations for Amazon Lex take place in your AWS environment, you configure an AWS Identity and Access Management (IAM) role with the proper permissions for Genesys Cloud to assume the role and use resources.
- Create an IAM role and select Another AWS account as the trusted entity.
- Enter the Genesys Cloud production ID 765628985471 in the Account ID field.
- As part of AWS best practices, select Require external ID and enter your organization’s ID to prevent the confused deputy problem and enhance integration security.
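If you prefer to create this role programmatically instead of through the console, the following is a hedged sketch using boto3; the role name and external ID value are placeholders, and the account ID is the Genesys Cloud production ID shown above.

```python
import json
import boto3

iam = boto3.client("iam")

trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"AWS": "arn:aws:iam::765628985471:root"},
        "Action": "sts:AssumeRole",
        "Condition": {"StringEquals": {"sts:ExternalId": "<your-genesys-org-id>"}}  # placeholder
    }]
}

role = iam.create_role(
    RoleName="GenesysLexV2Role",  # placeholder name
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)
print(role["Role"]["Arn"])  # configure this ARN in the Genesys Cloud integration
```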
By default, IAM roles don’t have permission to create or modify AWS resources. For Genesys Cloud to successfully access Amazon Lex bots, a few permissions are required.
- Choose Create Policy and enter the following JSON blob into the policy editor.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "GenesysLexPolicy",
"Effect": "Allow",
"Action": [
"lex:List*",
"lex:Describe*",
"lex:StartConversation",
"lex:Recognize*",
"lex:DeleteSession",
"lex:GetSession",
"lex:PutSession"
],
"Resource": "*"
}
]
}
- Attach the policy to the role created previously.
- Copy the role ARN and configure it within Genesys Cloud.
- Save and set the integration status to activate the bot.
2. Configure Amazon Polly
To use Amazon Lex for a voice bot, you set up the text to speech (TTS) capability. Genesys Cloud supports several TTS engines, including Amazon Polly. You can install and configure the Amazon Polly integration following the Genesys documentation. You can then select Amazon Polly as the engine and configure the voice you prefer. To keep the IVR voice consistent in the call flow, the Amazon Polly voice selected in Genesys Cloud should be the same voice configured in your Lex bot. For additional details, see a list of available voices and the associated characteristics.
3. Configure the Genesys Cloud Architect flow
Create an Inbound Call Flow in Architect to orchestrate your bot interaction. You add a Reusable Task and use the Call Lex V2 Bot action to bring in the Amazon Lex bot and design various actions in the call flow.
The integration also allows Genesys Cloud to capture the preconfigured slots as Architect variables. These variables can be used outside of the bot for use cases such as applying business rules. For example, if a customer provides an account ID that matches the VIP customer segment, the call can be routed to the priority support queue when transferring to an agent.
4. Configure graceful escalation
When the automated solution can’t fulfill a customer’s request, the interaction should be escalated gracefully. This fallback process allows a human agent to take over the interaction for more complex tasks.
You can save key information from the prior exchange (such as intents, slots, and conversation transcripts) into a script to provide historical context to the agent so that conversations can be picked up seamlessly. This prevents customers from wasting valuable time repeating information they provided previously.
In the following example, the call is transferred to an available Tier 1 support agent when a customer asks for more help or to be connected to an agent. You can also collect additional context from the customer and hand off to either another bot or human based on specialty.
5. Test the integrations
You can use the native soft phone in Genesys Cloud to make calls as you would with a desktop phone and validate the integration. Enter the bot’s name in the Enter Names and Numbers field and choose Call to follow the prompts.
Summary
Enterprises increasingly invest in automated solutions such as IVR and chatbots as a part of their customer service strategy in contact centers. Automation provides highly available support that handles common tasks without the presence of a live agent, while reducing operational cost.
With the adoption of the Amazon Lex V2 APIs, Genesys Cloud provides an overall improved user experience using the continuous streaming architecture, and enables a more natural customer-bot interaction.
This post outlines the key steps to enable the Amazon Lex V2 integration in your Genesys Cloud environment, and should give you a jump start to create and customize your own chatbot initiative. Check out the following resources for additional information:
- Navigate to the Amazon Lex V2 console and learn more at Amazon Lex V2 Developer Guide
- Get started with Amazon Lex V2 integration on Genesys Cloud AppFoundry
- Learn more about Genesys Lex V2 integration
- Acquire Genesys Cloud from AWS Marketplace
About the Author
Anubhav Mishra is a Product Manager with AWS. He spends his time understanding customers and designing product experiences to address their business challenges.
Jessica Ho is a Solutions Architect at Amazon Web Services supporting ISV partners who build business applications on AWS. She is passionate about creating differentiated solutions that help customers unlock cloud adoption. Outside of work, she enjoys spoiling her garden into a mini jungle.
USC + Amazon Center on Secure and Trusted Machine Learning selects two PhD fellows
Chaoyang He and Ninareh Mehrabi will focus on federated learning, distributed machine learning, and fairness in machine learning.
Simplify and automate anomaly detection in streaming data with Amazon Lookout for Metrics
Do you want to monitor your business metrics and detect anomalies in your existing streaming data pipelines? Amazon Lookout for Metrics is a service that uses machine learning (ML) to detect anomalies in your time series data. The service goes beyond simple anomaly detection. It allows developers to set up autonomous monitoring for important metrics to detect anomalies and identify their root cause in a matter of a few clicks, using the same technology used by Amazon internally to detect anomalies in its metrics—all with no ML experience required. However, one limitation you may face if you have an existing Amazon Kinesis Data Streams data pipeline is not being able to directly run anomaly detection on your data streams using Lookout for Metrics. As of this writing, Lookout for Metrics doesn’t have native integration with Kinesis Data Streams to ingest streaming data and run anomaly detection on it.
In this post, we show you how to solve this problem by using an AWS Glue Spark streaming extract, transform, and load (ETL) script to ingest and organize streaming data in Amazon Simple Storage Service (Amazon S3) and using a Lookout for Metrics live detector to detect anomalies. If you have an existing Kinesis Data Streams pipeline that ingests ecommerce data, for example, you can use the solution to detect anomalies such as unexpected dips in revenue, high rates of abandoned shopping carts, increases in new user signups, and many more.
Included in this post is a sample streaming data generator to help you get started quickly. The included GitHub repo provides step-by-step deployment instructions, and uses the AWS Cloud Development Kit (AWS CDK) to simplify and automate the deployment.
Lookout for Metrics allows users to set up anomaly detectors in both continuous and backtest modes. Backtesting allows you to detect anomalies on historical data. This feature is helpful when you want to try out the service on past data or validate against known anomalies that occurred in the past. For this post, we use continuous mode, where you can detect anomalies on live data as they occur. In continuous mode, the detector monitors an input S3 bucket for continuous data and runs anomaly detection on new data at specified time intervals. For the live detector to consume continuous time series data from Amazon S3 correctly, it needs to know where to look for data for the current time interval; therefore, it requires continuous input data in S3 buckets organized by time interval.
Overview of solution
The solution architecture consists of the following components:
- Streaming data generator – To help you get started quickly, we provide Python code that generates sample time series data and writes to a Kinesis data stream at a specified time interval. The provided code generates sample data for an ecommerce schema (platform, marketplace, event_time, views, revenue). You can also use your own data stream and data, but you must update the downstream processes in the architecture to process your schema. A minimal generator sketch follows the architecture diagram below.
- Kinesis Data Streams to Lookout for Metrics – The AWS Glue Spark streaming ETL code is the core component of the solution. It contains logic to do the following:
- Ingest streaming data
- Micro-batch data by time interval
- Filter dimensions and metrics columns
- Deliver filtered data to Amazon S3 grouped by timestamp
- Lookout for Metrics continuous detector – The AWS Glue streaming ETL code writes time series data as CSV files to the S3 bucket, with objects organized by time interval. The Lookout for Metrics continuous detector monitors the S3 bucket for live data and runs anomaly detection at the specified time interval (for example, every 5 minutes). You can view the detected anomalies on the Lookout for Metrics dashboard.
The following diagram illustrates the solution architecture.
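Before diving into the ETL script, the following is a minimal sketch of the kind of streaming data generator described above, assuming a hypothetical stream name and a one-record-per-minute interval; the repo's generator is more complete, so treat this only as an illustration of the ecommerce schema and the Kinesis PutRecord call.

```python
import json
import random
import time
from datetime import datetime, timezone

import boto3

kinesis = boto3.client("kinesis")
STREAM_NAME = "ecommerce-stream"  # assumption: your existing Kinesis data stream

while True:
    record = {
        "platform": random.choice(["web", "mobile"]),
        "marketplace": random.choice(["US", "UK", "DE"]),
        "event_time": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
        "views": random.randint(1, 500),
        "revenue": round(random.uniform(0, 1000), 2),
    }
    kinesis.put_record(
        StreamName=STREAM_NAME,
        Data=json.dumps(record).encode("utf-8"),
        PartitionKey=record["marketplace"],
    )
    time.sleep(60)  # assumption: one record per minute
```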
AWS Glue Spark streaming ETL script
The main component of the solution is the AWS Glue serverless streaming ETL script. The script contains the logic to ingest the streaming data and write the output, grouped by time interval, to an S3 bucket. This makes it possible for Lookout for Metrics to use streaming data from Kinesis Data Streams to detect anomalies. In this section, we walk through the Spark streaming ETL script used by AWS Glue.
The AWS Glue Spark streaming ETL script performs the following steps:
- Read from the AWS Glue table that uses Kinesis Data Streams as the data source.
The following screenshot shows the AWS Glue table created for the ecommerce data schema.
- Ingest the streaming data from the AWS Glue table (table_name parameter) batched by time window (stream_batch_time parameter) and create a data frame for each micro-batch using create_data_frame.from_catalog(), as shown in the following code:
data_frame_datasource0 = glueContext.create_data_frame.from_catalog(stream_batch_time = BATCH_WIN_SIZE,
database = glue_dbname, table_name = glue_tablename, transformation_ctx = "datasource0",
additional_options = {"startingPosition": "TRIM_HORIZON", "inferSchema": "false"})
- Perform the following processing steps for each batch of data (data frame) ingested:
- Select only the required columns and convert the data frame to the AWS Glue native DynamicFrame.
datasource0 = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame").select_fields(['marketplace','event_time', 'views'])
As shown in the preceding example code, select only the columns marketplace, event_time, and views to write to output CSV files in Amazon S3. Lookout for Metrics uses these columns for running anomaly detection. In this example, marketplace is the optional dimension column used for grouping anomalies, views is the metric to be monitored for anomalies, and event_time is the timestamp for the time series data.
- Populate the time interval in each streaming record ingested:
datasource1 = Map.apply(frame=datasource0, f=populateTimeInterval)
In the preceding code, we provide the custom function populateTimeInterval, which determines the appropriate time interval for the given data point based on its event_time timestamp column.
The following table includes example time intervals determined by the function for a 5-minute frequency.
| Input timestamp | Start of time interval determined by populateTimeInterval function |
| --- | --- |
| 2021-05-24 19:18:28 | 2021-05-24 19:15 |
| 2021-05-24 19:21:15 | 2021-05-24 19:20 |
The following table includes example time intervals determined by the function for a 10-minute frequency.
| Input timestamp | Start of time interval determined by populateTimeInterval function |
| --- | --- |
| 2021-05-24 19:18:28 | 2021-05-24 19:10 |
| 2021-05-24 19:21:15 | 2021-05-24 19:20 |
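The post doesn't show the body of populateTimeInterval, so the following is a minimal sketch of what it might look like for a 5-minute frequency; the intvl_date and intvl_hhmm field names match the partition keys used in the write step that follows, and the timestamp format is an assumption based on the sample data.

```python
from datetime import datetime

FREQUENCY_MINUTES = 5  # assumption: the detector runs on 5-minute intervals

def populateTimeInterval(record):
    # Parse the event timestamp (format is an assumption based on the sample data)
    event_time = datetime.strptime(record["event_time"], "%Y-%m-%d %H:%M:%S")
    # Round the timestamp down to the start of its interval
    floored_minute = (event_time.minute // FREQUENCY_MINUTES) * FREQUENCY_MINUTES
    # Add the partition columns consumed by write_dynamic_frame.from_options()
    record["intvl_date"] = event_time.strftime("%Y-%m-%d")
    record["intvl_hhmm"] = f"{event_time.hour:02d}{floored_minute:02d}"
    return record
```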
- The write_dynamic_frame.from_options() function uses the time interval (as determined in the previous step) as the partition key to write output CSV files to the appropriate S3 prefix structure:
datasink1 = glueContext.write_dynamic_frame.from_options(frame = datasource1, connection_type = "s3",
connection_options = {"path": path_datasink1, "partitionKeys": ["intvl_date", "intvl_hhmm"]},
format_options={"quoteChar": -1, "timestamp.formats": "yyyy-MM-dd HH:mm:ss"},
format = src_format, transformation_ctx = "datasink1")
For example, the following screenshot shows that the ETL script writes output to the S3 folder structure organized by 5-minute time intervals.
For additional details on partitions for ETL outputs, see Managing Partitions for ETL Output in AWS Glue.
You can set up a live detector using Amazon S3 as a continuous data source to start detecting anomalies in streaming data. For detailed instructions, see the GitHub repo.
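The step-by-step console instructions are in the GitHub repo; if you prefer the API, the following is a hedged sketch of creating and activating a continuous detector with boto3. The names, role ARN, templated S3 path, and timestamp format are assumptions, so check them against the repo and the Lookout for Metrics Developer Guide.

```python
import boto3

lookout = boto3.client("lookoutmetrics")

detector = lookout.create_anomaly_detector(
    AnomalyDetectorName="ecommerce-live-detector",   # assumption
    AnomalyDetectorConfig={"AnomalyDetectorFrequency": "PT5M"},
)

lookout.create_metric_set(
    AnomalyDetectorArn=detector["AnomalyDetectorArn"],
    MetricSetName="ecommerce-metric-set",            # assumption
    MetricSetFrequency="PT5M",
    MetricList=[{"MetricName": "views", "AggregationFunction": "SUM"}],
    DimensionList=["marketplace"],
    TimestampColumn={"ColumnName": "event_time", "ColumnFormat": "yyyy-MM-dd HH:mm:ss"},
    MetricSource={
        "S3SourceConfig": {
            "RoleArn": lookout_role_arn,             # assumption: role with read access to the bucket
            "TemplatedPathList": [
                "s3://<your-bucket>/<prefix>/{{yyyyMMdd}}/{{HHmm}}/"  # assumption: matches the ETL output layout
            ],
            "FileFormatDescriptor": {
                "CsvFormatDescriptor": {"ContainsHeader": True, "Delimiter": ","}
            },
        }
    },
)

lookout.activate_anomaly_detector(AnomalyDetectorArn=detector["AnomalyDetectorArn"])
```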
Prerequisites
You need the following to deploy the solution:
- An AWS account with permissions to deploy the solution using AWS CDK
- A workstation or development environment with the following installed and configured:
- npm
- Typescript
- AWS CDK
- AWS account credentials
You can find detailed instructions in the “Getting Started” section of the GitHub repo.
Deploy the solution
Follow the step-by-step instructions in the GitHub repo to deploy the solution components. AWS CDK templates are provided for each of the solution components, organized in their own directory structure within the GitHub repo. The templates deploy the following resources:
- Data generator – The Lambda function, Amazon EventBridge rule, and Kinesis data stream
- Connector for Lookout for Metrics – The AWS Glue Spark streaming ETL job and S3 bucket
- Lookout for Metrics continuous detector – Our continuous detector
Clean up
To avoid incurring future charges, delete the resources by deleting the stacks deployed by the AWS CDK.
Conclusion
In this post, we showed how you can detect anomalies in streaming data sources using a Lookout for Metrics continuous detector. The solution used serverless streaming ETL with AWS Glue to prepare the data for Lookout for Metrics anomaly detection. The reference implementation used an ecommerce sample data schema (platform, marketplace, event_time, views, revenue) to demonstrate and test the solution.
You can easily extend the provided data generator code and ETL script to process your own schema and sample data. Additionally, you can adjust the solution parameters such as anomaly detection frequency to match your use case. With minor changes, you can replace the sample data generator with an existing Kinesis Data Streams streaming data source.
To learn more about Amazon Lookout for Metrics, see Introducing Amazon Lookout for Metrics: An anomaly detection service to proactively monitor the health of your business and the Lookout for Metrics Developer Guide. For additional information about streaming ETL jobs with AWS Glue, see Crafting serverless streaming ETL jobs with AWS Glue and Adding Streaming ETL Jobs in AWS Glue.
About the Author
Babu Srinivasan is a Sr. Solutions Architect at AWS, with over 24 years of experience in IT and the last 6 years focused on the AWS Cloud. He is passionate about AI/ML. Outside of work, he enjoys woodworking and entertains friends and family (sometimes strangers) with sleight of hand card magic.
Eugene Yan and the art of writing about science
Why the Amazon applied scientist takes the time to break down his work for readers.
Analyze customer churn probability using call transcription and customer profiles with Amazon SageMaker
Regardless of the industry or product, customers are the most important component in a business’s success and growth. Businesses go to great lengths to acquire and more importantly retain their existing customers. Customer satisfaction links directly to revenue growth, business credibility, and reputation. These are all key factors in a sustainable and long-term business growth strategy.
Given the marketing and operational costs of customer acquisition and satisfaction, and how costly losing a customer to a competitor can be, it’s generally less costly to retain existing customers than to acquire new ones. Therefore, it’s crucial for businesses to understand why and when a customer might stop using their services or switch to a competitor, so they can take proactive measures by providing incentives or offering upgrades for new packages that could encourage the customer to stay with the business.
Customer service interactions provide invaluable insight into the customer’s opinion about the business and its services, and can be used, in addition to other quantitative factors, to enable the business to better understand the sentiment and trends of customer conversations and to identify crucial company and product feedback. Customer churn prediction using machine learning (ML) techniques can be a powerful tool for customer service and care.
In this post, we walk you through the process of training and deploying a churn prediction model on Amazon SageMaker that uses Hugging Face Transformers to find useful signals in customer-agent call transcriptions. In addition to textual inputs, we show you how to incorporate other types of data, such as numerical and categorical features in order to predict customer churn.
Prerequisites
To try out the solution in your own account, make sure that you have the following in place:
- An AWS account. If you don’t have an account, you can sign up for one.
- The solution outlined in the post is part of Amazon SageMaker JumpStart. To run this JumpStart 1P solution and have the infrastructure deploy to your AWS account, you must create an active Amazon SageMaker Studio instance (see Onboard to Amazon SageMaker Studio). When your Studio instance is ready, use the instructions in JumpStart to launch the solution.
The JumpStart solution launch creates the resources properly set up and configured to successfully run the solution.
Architecture overview
In this solution, we focus on SageMaker components. We use SageMaker training jobs to train the churn prediction model and a SageMaker endpoint to deploy the model. We use Amazon Simple Storage Service (Amazon S3) to store the training data and model artifacts, and Amazon CloudWatch to log training and endpoint outputs. The following figure illustrates the architecture for the solution.
Exploring the data
In this post, we use a mobile operator’s historical records of which customers ended up churning and which continued using the service. The data also includes transcriptions of the latest phone call conversations between the customer and the agent (which could also be the streaming transcription as the call is happening). We can use this historical information to train an ML classifier model, which we can then use to predict the probability of customer churn based on the customer’s profile information and the content of the phone call transcription. We create a SageMaker endpoint to make real-time predictions using the model and provide more insight to customer service agents as they handle customer phone calls.
The dataset we use is synthetically generated and available under the CC BY 4.0 license. The data used to generate the numerical and categorical features is based on the public dataset KDD Cup 2009: Customer relationship prediction. We have generated over 50,000 samples and randomly split the data into 45,000 samples for training and 5,000 samples for testing. In addition, the phone conversation transcripts were synthetically generated using the GPT2 (Generative Pre-trained Transformer 2) algorithm. The data is hosted on Amazon S3.
More details on customer churn classification models using similar data, and also step-by-step instructions on how to build a binary classifier model using similar data, can be found in the blog post Predicting Customer Churn with Amazon Machine Learning. That post is focused more on binary classification using the tabular data. This blog post approaches this problem from a different perspective, and brings in natural language processing (NLP) by processing the context of agent-customer phone conversations.
The following are the attributes (features) of the customer profiles dataset:
- CustServ Calls – The number of calls placed to customer service
- State – The US state in which the customer resides, indicated by a two-letter abbreviation; for example, OH or NJ
- VMail Message – The average number of voice mail messages per month
- Account Length – The number of days that this account has been active
- Day Mins, Day Calls, Day Charge – The minutes, number of calls, and billed cost for calls placed during the day
- Eve Mins, Eve Calls, Eve Charge – The minutes, number of calls, and billed cost for calls placed during the evening
- Night Mins, Night Calls, Night Charge – The minutes, number of calls, and billed cost for calls placed during nighttime
- Intl Mins, Intl Calls, Intl Charge – The minutes, number of calls, and billed cost for international calls
- Location – Whether the customer is located in urban, suburban, rural, or other areas
- State – The state location of the customer
- Plan – The plan category
- Limit – Limited or unlimited plan type
- Text – The synthetic GPT-2 generated transcription of the customer-agent phone conversation
- Y – Whether the customer left the service (true/false)
The last attribute, Y, is known as the target feature, or the feature we want the ML model to predict. Because the target feature is binary (true/false), this is a binary classification problem. The model we train later in this post also predicts the likelihood of churn.
We don’t go over exploratory data analysis in this post. For more details, see Predicting Customer Churn with Amazon Machine Learning and the Customer Churn Prediction with XGBoost sample notebook.
The training script is developed to allow the ML practitioner to pick and choose the features used in training. For example, we don’t use all the features in training. We focus more on the maturity of the customer’s account, number of times the customer has contacted customer service, type of plan they have, and transcription of the latest phone call. You can use additional features in training by including the list in the hyperparameters, as we show in the next section.
The transcription of the customer-agent phone call in the text column is synthetic text generated by ML models using the GPT2 algorithm. Its purpose is to show how you can apply this solution to real-world customer service phone conversations. GPT2 is an unsupervised transformer language model developed by OpenAI. It's a powerful generative NLP model that excels in processing long-range dependencies, and is pre-trained on a diverse corpus of text. For more details on how to generate text using GPT2, see Experimenting with GPT-2 XL machine learning model package on Amazon SageMaker and the Creative Writing using GPT2 Text Generation example notebook.
Train the model
For this post, we use the SageMaker PyTorch Estimator to build a SageMaker estimator using an Amazon-built Docker container that runs the functions defined in the supplied entry_point Python script within a SageMaker training job. The training job is started by calling .fit() on this estimator. Later, we deploy the model by calling the .deploy() method on the estimator. Visit the Amazon SageMaker Python SDK technical documentation for more details on preparing PyTorch scripts for SageMaker training and using the PyTorch Estimator.
Also, visit Available Deep Learning Containers Images on GitHub to get a list of supported PyTorch versions. At the time of this writing, the latest version available is PyTorch 1.8.1 with Python version 3.6. You can update the framework version to the latest supported version by changing the framework_version parameter in the PyTorch Estimator. You can also use the SageMaker image URI utility API to retrieve the latest list of supported versions.
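For example, the SDK's image_uris helper can look up the training container for a given framework version. The following is a minimal sketch; the region, instance type, and version strings mirror the estimator shown later and are illustrative assumptions you should adjust for your account:
import sagemaker
from sagemaker import image_uris

# Look up the PyTorch 1.8.1 training container (region assumed to be us-east-1 here)
image_uri = image_uris.retrieve(
    framework="pytorch",
    region="us-east-1",
    version="1.8.1",
    py_version="py3",
    instance_type="ml.p3.2xlarge",  # a GPU instance type returns a GPU image
    image_scope="training",
)
print(image_uri)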
The hyperparameters dictionary defines which features we want to use for training, as well as the number of trees in the forest (n-estimators) for the model. You can add any other hyperparameters for the RandomForestClassifier; however, you also need to revise your custom training script to receive these parameters in the form of arguments (using the argparse library) and add them to your model. See the following code:
from sagemaker.pytorch import PyTorch

hyperparameters = {
    "n-estimators": 100,
    "numerical-feature-names": "CustServ Calls,Account Length",
    "categorical-feature-names": "plan,limit",
    "textual-feature-names": "text",
    "label-name": "y"
}

estimator = PyTorch(
    framework_version='1.8.1',
    py_version='py3',
    entry_point='entry_point.py',
    source_dir='path/to/source/directory',
    hyperparameters=hyperparameters,
    role=iam_role,
    instance_count=1,
    instance_type='ml.p3.2xlarge',
    output_path='s3://path/to/output/location',
    code_location='s3://path/to/code/location',
    base_job_name=base_job_name,
    sagemaker_session=sagemaker_session,
    volume_size=30  # EBS volume size in GB (renamed from train_volume_size in SageMaker SDK v2)
)
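On the training script side, these hyperparameters arrive as command-line arguments. The following is a minimal sketch of how entry_point.py might parse them; the argument names mirror the dictionary keys above, but this parsing helper is an illustrative assumption rather than the actual JumpStart script:
import argparse
import os

def parse_args():
    parser = argparse.ArgumentParser()
    # Model hyperparameters passed by the SageMaker estimator
    parser.add_argument("--n-estimators", type=int, default=100)
    # Comma-separated feature lists, split into Python lists for downstream use
    parser.add_argument("--numerical-feature-names", type=lambda s: s.split(","), default=[])
    parser.add_argument("--categorical-feature-names", type=lambda s: s.split(","), default=[])
    parser.add_argument("--textual-feature-names", type=lambda s: s.split(","), default=[])
    parser.add_argument("--label-name", type=str, default="y")
    # Standard SageMaker environment variables for model and data locations
    parser.add_argument("--model-dir", type=str, default=os.environ.get("SM_MODEL_DIR", "/opt/ml/model"))
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN", "/opt/ml/input/data/train"))
    parser.add_argument("--test", type=str, default=os.environ.get("SM_CHANNEL_TEST", "/opt/ml/input/data/test"))
    return parser.parse_args()

if __name__ == "__main__":
    args = parse_args()
    # In the actual script, training would be invoked here with the parsed arguments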
If you launched the SageMaker JumpStart solution in your account, the custom scripts are available in your Studio files. We use the entry_point.py script. This script receives a list of numerical features, categorical features, textual features, and the target label, and trains an SKLearn RandomForestClassifier on the data. However, the key here is processing the features before using them in the classifier, especially the call transcription. The following figure shows this process, which imputes missing values in the numerical features (replacing them with the mean), one-hot encodes the categorical features, and embeds the textual features with a sentence transformer.
The purpose of the script presented in this post is to provide an example of how you can develop your own custom feature transformation pipeline. You can apply other transformations to the data based on your specific use case and the nature of your dataset, and make it as complex or as simple as you want. For example, depending on the nature of your dataset and the results of the exploratory data analysis, you may want to consider normalization, log transformation, or dropping records with null values. For a more complete list of feature transformation techniques, visit SKLearn Dataset Transformations.
The following code snippet shows you how to instantiate these transformers for numerical and categorical features, and how to apply them to your dataset. More details on how this is done in the training script are available in the entry_point.py script that the JumpStart solution places in your files.
import numpy as np
import joblib
from pathlib import Path
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder

# Instantiate transformers
numerical_transformer = SimpleImputer(missing_values=np.nan,
                                      strategy='mean',
                                      add_indicator=True)
categorical_transformer = OneHotEncoder(handle_unknown="ignore")

# Train transformers on data, and store transformers for future use by predict function
numerical_transformer.fit(numerical_features)
joblib.dump(numerical_transformer, Path(args.model_dir, "numerical_transformer.joblib"))
categorical_transformer.fit(categorical_features)
joblib.dump(categorical_transformer, Path(args.model_dir, "categorical_transformer.joblib"))

# Transform the data
numerical_features = numerical_transformer.transform(numerical_features)
categorical_features = categorical_transformer.transform(categorical_features)
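If your exploratory analysis suggests further preprocessing, you can chain extra steps onto these transformers. The following is a minimal sketch using an sklearn Pipeline to add scaling after imputation; the scaling step is an illustrative assumption, not part of the JumpStart script:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Impute missing values first, then standardize the numerical columns
numerical_pipeline = Pipeline(steps=[
    ("imputer", SimpleImputer(missing_values=np.nan, strategy="mean", add_indicator=True)),
    ("scaler", StandardScaler()),
])

numerical_features = numerical_pipeline.fit_transform(numerical_features)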
Now let's focus on the textual data. We use the Hugging Face sentence-transformers library to generate sentence embeddings. It comes with pre-trained models that you can use out of the box based on your use case. In this post, we use the bert-base-nli-cls-token model, which is described in Sentence-BERT: Sentence Embeddings using Siamese BERT-Networks.
Recently, SageMaker introduced new Hugging Face Deep Learning Containers (DLCs) that enable you to train, fine-tune, and run inference using Hugging Face models for NLP on SageMaker. In this post, we use the PyTorch container and a custom training script. For this purpose, in our training script, we define a BertEncoder class based on the Hugging Face SentenceTransformer and set the pre-trained model to bert-base-nli-cls-token, as shown in the following code. This lets us apply the transformer to the dataset in the same way as the other dataset transformers, by calling its .transform() method. The benefit of using Hugging Face pre-trained models is that you don't need to do additional training to be able to use the model. However, you can still fine-tune the models with custom data, as described in Fine-tuning a pretrained model.
from sklearn.base import BaseEstimator, TransformerMixin
from sentence_transformers import SentenceTransformer

# Define a class for BertEncoder
class BertEncoder(BaseEstimator, TransformerMixin):
    def __init__(self, model_name='bert-base-nli-cls-token'):
        self.model = SentenceTransformer(model_name)
        self.model.parallel_tokenization = False

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        output = []
        for sample in X:
            encodings = self.model.encode(sample)
            output.append(encodings)
        return output

# Instantiate the class
textual_transformer = BertEncoder()

# Apply the transformation to textual features
textual_features = textual_transformer.transform(textual_features)
Now that the dataset is processed and ready to be consumed by an ML model, we can train any classifier model to predict if a customer will churn or not. In addition to predicting the class (0/1 or true/false) for customer churn, these models also generate the probability of each class, meaning the probability of a customer churning. This is particularly useful for customer service teams for strategizing the incentives or upgrades they can offer to the customer based on how likely the customer is to cancel the service or subscription. In this post, we use the SKLearn RandomForestClassifier model. You can choose from many hyperparameters for this model and also optimize the hyperparameters for a more accurate model prediction by using strategies like grid search, random search, and Bayesian search. SageMaker automatic hyperparameter tuning can be a powerful tool for this purpose.
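As a sketch of what SageMaker automatic model tuning could look like here, the following reuses the estimator defined earlier and searches over the number of trees. The objective metric name, regex, and job counts are illustrative assumptions; the regex has to match a line your training script actually prints:
from sagemaker.tuner import HyperparameterTuner, IntegerParameter

# Search over the number of trees; other hyperparameters stay fixed on the estimator
tuner = HyperparameterTuner(
    estimator=estimator,
    objective_metric_name="test:accuracy",
    objective_type="Maximize",
    hyperparameter_ranges={"n-estimators": IntegerParameter(50, 300)},
    metric_definitions=[{"Name": "test:accuracy", "Regex": "test accuracy: ([0-9\\.]+)"}],
    max_jobs=6,
    max_parallel_jobs=2,
)

tuner.fit({
    'train': 's3://path/to/your/train.jsonl',
    'test': 's3://path/to/your/test.jsonl'
})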
Training the model in entry_point.py is handled by the train_fn() function in the custom script. This function is called when the .fit() method is applied to the estimator. It also stores the trained model and the trained data transformers on Amazon S3. These files are later used by model_fn() to load the model for inference purposes.
train_fn() also includes evaluation of the trained model, and provides accuracy scores for both the train and test datasets, which helps you better evaluate model performance. Because this is a classification problem, we recommend including other metrics in your evaluation script, for example F1 score, ROC AUC score, and recall score, in the same way we added accuracy scores. These are printed as the training progresses. Because we're using synthetic data for training the model in this example notebook, especially for the agent-customer call transcription, we don't expect high-performing models with regard to classification metrics, so we don't focus on these metrics in this example. However, when you use your own data, you should consider how each classification metric could affect the applicability of the model to your use case. Training this model on 45,000 samples on an ml.p3.2xlarge instance takes about 30 minutes.
estimator.fit({
    'train': 's3://path/to/your/train.jsonl',
    'test': 's3://path/to/your/test.jsonl'
})
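The evaluation inside train_fn() prints accuracy; the following is a minimal sketch of how the additional metrics mentioned above could be added next to it. The variable names for the held-out features, labels, and predictions are illustrative assumptions:
from sklearn.metrics import f1_score, roc_auc_score, recall_score

# y_test: true labels for the test split; test_features: transformed test features
y_pred = classifier.predict(test_features)
y_proba = classifier.predict_proba(test_features)[:, 1]

print("test f1:", f1_score(y_test, y_pred))
print("test roc_auc:", roc_auc_score(y_test, y_proba))
print("test recall:", recall_score(y_test, y_pred))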
When you’re comfortable with the performance of your model, you can move to the next step, which is deploying your model for real-time inference.
Deploy the model
When the training is complete, you can deploy the model as a SageMaker hosted endpoint for real-time inference, or use the model for offline batch inference, using SageMaker batch transform. The task of performing inference (either real time or batch) is handled by four main functions in the custom script:
- input_fn() processes the input data
- model_fn() loads the trained model artifacts from Amazon S3
- predict_fn() makes predictions
- output_fn() prepares the model output
The following diagram illustrates this process.
The following script is a snippet of the entry_point.py script, and shows how the four functions work together to perform inference:
# Model function to load the trained model and trained transformers from S3
def model_fn(model_dir):
    print('loading feature_names')
    numerical_feature_names, categorical_feature_names, textual_feature_names = load_feature_names(Path(model_dir, "feature_names.json"))
    print('loading numerical_transformer')
    numerical_transformer = joblib.load(Path(model_dir, "numerical_transformer.joblib"))
    print('loading categorical_transformer')
    categorical_transformer = joblib.load(Path(model_dir, "categorical_transformer.joblib"))
    print('loading textual_transformer')
    textual_transformer = BertEncoder()
    classifier = joblib.load(Path(model_dir, "classifier.joblib"))
    model_assets = {
        'numerical_feature_names': numerical_feature_names,
        'numerical_transformer': numerical_transformer,
        'categorical_feature_names': categorical_feature_names,
        'categorical_transformer': categorical_transformer,
        'textual_feature_names': textual_feature_names,
        'textual_transformer': textual_transformer,
        'classifier': classifier
    }
    return model_assets

# Input preparation function to receive the request body and ensure proper format
def input_fn(request_body_str, request_content_type):
    assert (
        request_content_type == "application/json"
    ), "content_type must be 'application/json'"
    request_body = json.loads(request_body_str)
    return request_body

# Predict function to make inference
def predict_fn(request, model_assets):
    print('making batch')
    request = [request]
    print('extracting features')
    numerical_features, categorical_features, textual_features = extract_features(
        request,
        model_assets['numerical_feature_names'],
        model_assets['categorical_feature_names'],
        model_assets['textual_feature_names']
    )
    print('transforming numerical_features')
    numerical_features = model_assets['numerical_transformer'].transform(numerical_features)
    print('transforming categorical_features')
    categorical_features = model_assets['categorical_transformer'].transform(categorical_features)
    print('transforming textual_features')
    textual_features = model_assets['textual_transformer'].transform(textual_features)
    # Concatenate features
    print('concatenating features')
    categorical_features = categorical_features.toarray()
    textual_features = np.array(textual_features)
    textual_features = textual_features.reshape(textual_features.shape[0], -1)
    features = np.concatenate([
        numerical_features,
        categorical_features,
        textual_features
    ], axis=1)
    print('predicting using model')
    prediction = model_assets['classifier'].predict_proba(features)
    probability = prediction[0][1].tolist()
    output = {
        'probability': probability
    }
    return output

# Output function to prepare the output
def output_fn(prediction, response_content_type):
    assert (
        response_content_type == "application/json"
    ), "accept must be 'application/json'"
    response_body_str = json.dumps(prediction)
    return response_body_str
To deploy the model when training is complete, we use the .deploy() method on the estimator and define the number and type of instances we want to attach to the endpoint; SageMaker manages the infrastructure on your behalf. When calling the endpoint from the notebook, we use a SageMaker SDK predictor. The predictor sends data to the endpoint (as part of a request) and interprets the response. See the following code:
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import JSONDeserializer

# Deploy the predictor
predictor = estimator.deploy(
    endpoint_name=endpoint_name,
    instance_type='ml.p3.2xlarge',
    initial_instance_count=1
)
predictor.serializer = JSONSerializer()
predictor.deserializer = JSONDeserializer()
This deploys the model as an endpoint predictor. After deployment is complete, we can use it to make predictions on sample data. Let's determine the probability of churn for a hypothetical customer:
data = {
    "CustServ Calls": 10.0,
    "Account Length": 66,
    "plan": "B",
    "limit": "limited",
    "text": "Well, I've been dealing with TelCom for three months now and I am quite happy with your service"
}
response = predictor.predict(data=data)
print("{:.2%} probability of churn".format(response['probability']))
In this case, the probability of churn is about 31%. For the same customer, we change the transcript to “I have been using your service for 6 months and I am disappointed in your customer service.” The probability of churn increases to over 46%. This demonstrates that a change in the customer’s sentiment affects the probability of churn.
Clean up
To clean up the resources and stop incurring charges in your account, you can delete the endpoint:
predictor.delete_endpoint()
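If you also want to remove the model resource that the endpoint was serving, the predictor exposes a companion call; a minimal sketch:
# Remove the SageMaker model created during deployment
predictor.delete_model()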
Extensions
As we explained earlier, you can use additional features in training and also incorporate more feature transformers in the feature engineering pipeline, which can help improve model performance.
In addition, now that you have a working endpoint performing real-time inference, you can use it in your applications or website. However, your SageMaker endpoint is not public facing, so you need to build an API layer with Amazon API Gateway to allow external traffic to reach it. Amazon API Gateway is a fully managed service that makes it easy for developers to create, publish, maintain, monitor, and secure APIs at any scale. You can use API Gateway to present an external-facing, single point of entry for SageMaker endpoints, and provide security, throttling, authentication, firewall protection through AWS WAF, and more. With API Gateway mapping templates, you can invoke your SageMaker endpoint with a REST API request and receive an API response back without any intermediate AWS Lambda functions, thereby improving the performance and cost-effectiveness of your applications.
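For callers inside your AWS account (for example, the backend that will eventually sit behind API Gateway), you can already invoke the endpoint directly through the SageMaker runtime. The following is a minimal sketch, assuming endpoint_name is the name used at deployment:
import json
import boto3

runtime = boto3.client("sagemaker-runtime")

payload = {
    "CustServ Calls": 10.0,
    "Account Length": 66,
    "plan": "B",
    "limit": "limited",
    "text": "I have been using your service for 6 months and I am disappointed in your customer service"
}

# endpoint_name is assumed to be the name passed to estimator.deploy()
response = runtime.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType="application/json",
    Accept="application/json",
    Body=json.dumps(payload),
)
result = json.loads(response["Body"].read())
print("{:.2%} probability of churn".format(result["probability"]))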
To create an API Gateway and use it to perform real-time inference with your SageMaker endpoint (see the following architecture), you can follow the instructions outlined in Creating a machine learning-powered REST API with Amazon API Gateway mapping templates and Amazon SageMaker.
In addition, you can use Amazon Transcribe to generate transcriptions of recorded customer-agent conversations and use them for training purposes, and also use Amazon Transcribe streaming to send the conversation audio stream and receive a stream of text in real time. You can use this text stream to add a real-time speech-to-text capability to your applications and also send that text to the endpoint and provide customer churn insights to your customer service agents in real time.
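As a sketch of the batch transcription path, the following starts an asynchronous Amazon Transcribe job for a recorded call; the job name, S3 URIs, and media format are illustrative assumptions:
import boto3

transcribe = boto3.client("transcribe")

# Start an asynchronous transcription job for a recorded call stored in S3
transcribe.start_transcription_job(
    TranscriptionJobName="customer-call-0001",             # assumed job name
    Media={"MediaFileUri": "s3://path/to/your/call.wav"},   # assumed recording location
    MediaFormat="wav",
    LanguageCode="en-US",
    OutputBucketName="your-transcripts-bucket",             # assumed output bucket
)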
Conclusions
In this post, we explained an end-to-end solution for creating a customer churn prediction model based on customer profiles and customer-agent call transcriptions. The solution included training a PyTorch model with a custom script and creating an endpoint for real-time model hosting. We also explained how you can create a public-facing API with Amazon API Gateway that can be securely used in your mobile applications or website. In addition, we explained how you can use Amazon Transcribe for batch or real-time transcription of customer-agent conversations, which you can use for training your model or for real-time inference.
For more SageMaker examples, visit the Amazon SageMaker Examples GitHub repo. For more PyTorch BYO script examples, visit the following GitHub repository. For more SageMaker Python examples for MXNet, TensorFlow, and PyTorch, visit the Amazon SageMaker Pre-Built Framework Containers and the Python SDK GitHub repo. Additional information about SageMaker is available in the technical documentation.
About the Author
Nick Minaie is a Sr. AI/ML Specialist Solutions Architect at AWS, helping customers on their journey to well-architected machine learning solutions at scale. In his spare time, Nick enjoys family time, abstract painting, and exploring nature.
Ehsan M. Kermani is a Machine Learning Engineer in the AWS ML Automation Services group. He helps customers through their MLOps journey by providing his expertise in Software Engineering best practices to solve customers’ end-to-end Machine Learning tasks from infrastructure to deployment.
Dr. Li Zhang is a Principal Product Manager-Technical for Amazon SageMaker JumpStart and Amazon SageMaker built-in algorithms, a service that helps data scientists and machine learning practitioners get started with training and deploying their models, and use reinforcement learning with Amazon SageMaker. His past work as a principal research staff member and master inventor at IBM Research won the Test of Time paper award at IEEE INFOCOM.