Scale your Amazon Kendra index

Amazon Kendra is a fully managed, intelligent search service powered by machine learning. Amazon Kendra reimagines enterprise search for your websites and applications so your employees and customers can easily find the content they’re looking for. Using keyword or natural language queries, employees and customers can find the right content even when it’s scattered across multiple locations and content repositories within your organization.

Although Amazon Kendra is designed for large-scale search applications with millions of documents and thousands of queries per second, you can run smaller experiments to evaluate Amazon Kendra. You might run a proof of concept, or simply have a smaller workload that still benefits from the features Amazon Kendra Enterprise Edition has to offer. On July 1, 2021, Amazon Kendra introduced new, smaller capacity units for smaller workloads. In addition, to promote experimentation, the price for Amazon Kendra Developer Edition was reduced by 55%.

Amazon Kendra Enterprise Edition capacity units

The base capacity for Amazon Kendra supports up to 100,000 documents and 8,000 searches per day, with adaptive bursting capability to better handle unpredictable query spikes. You can increase the query and the document capacity of your Amazon Kendra index through storage capacity units and query capacity units, and these can be updated independently from each other.

Storage capacity units offer scaling in increments of 100,000 documents (or up to 30 GB of storage) each. For example, if you need to index 1 million documents, you need nine storage capacity units: the base Amazon Kendra Enterprise Edition index covers the first 100,000 documents, and the nine storage capacity units cover the additional 900,000 documents.

Query capacity units (QCUs) offer scaling in increments of 8,000 searches per day (0.1 queries per second each), with built-in adaptive bursting. For example, if you need 16,000 queries per day (an average of 0.2 queries per second), you can provision two units.

For more information about the maximum number of storage capacity units and query capacity units available for a single index, see Quotas for Amazon Kendra.

About capacity bursting

Amazon Kendra has a provisioned base capacity of one query capacity unit. You can use up to 8,000 queries per day with a minimum throughput of 0.1 queries per second (per query capacity unit).

An adaptive approach to handling unexpected traffic beyond the provisioned throughput is to use the built-in adaptive query bursting feature in Amazon Kendra. This allows you to apply unused query capacity to handle unexpected traffic. Amazon Kendra accumulates your unused queries at your provisioned queries per second rate, every second, up to the maximum number of queries you’ve provisioned for your Amazon Kendra index. These accumulated queries are automatically used to help handle unexpected traffic spikes above the currently allocated QPS capacity.
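As a conceptual illustration only (this is not how the service is implemented internally), you can think of this accumulation as a token bucket that refills at your provisioned rate; the cap in the following sketch assumes a single query capacity unit:

# Conceptual sketch of adaptive query bursting, not the actual service logic.
# Assumes one provisioned query capacity unit: 0.1 QPS, 8,000 queries per day.
PROVISIONED_QPS = 0.1
MAX_ACCUMULATED = 8000      # assumed cap on accumulated unused queries

accumulated = 0.0

def tick_one_second(queries_issued):
    """Unused capacity accrues each second; bursts draw the balance down."""
    global accumulated
    accumulated += PROVISIONED_QPS - queries_issued
    accumulated = min(MAX_ACCUMULATED, max(0.0, accumulated))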

Optimal performance of adaptive query bursting can vary, depending on several factors such as your total index size, query complexity, accumulated unused queries, and overall load on your index. We recommend performing your own load tests to accurately measure bursting capacity.

Best practices

When sizing your Amazon Kendra index, consider how many documents you’re indexing, how many queries you expect per day, how many queries per second you need to accommodate, and whether you have usage patterns that require additional capacity for sustained periods. You may also experience short peaks where brief periods of additional QPS need to be absorbed.

It’s therefore good practice to observe your query usage patterns for a few weeks, especially when the patterns are not easily predictable. This will allow you to define an optimal balance between using the built-in adaptive bursting capability for short, unsustained QPS peaks, and adding/removing capacity units to better handle longer, more sustained peaks and lows.

For information about visualizing and building a rough estimate of your usage patterns in Amazon Kendra, see Automatically scale Amazon Kendra query capacity units with Amazon EventBridge and AWS Lambda.

Amazon Kendra Enterprise Edition allows you to add document storage capacity in units of 100,000 documents, with a maximum of 30 GB of storage per unit. You can add and remove storage capacity at any time, but you can’t remove storage capacity beyond your used capacity (number of documents ingested or storage space used). We recommend estimating how often documents are added to your data sources in order to determine when to increase the storage capacity of your Amazon Kendra index through storage capacity units. You can monitor the document count with Amazon CloudWatch or on the Amazon Kendra console.

Queries per second represent the number of concurrent queries your Amazon Kendra index receives at a given time. If you’re replacing a search solution with Amazon Kendra, you should be able to retrieve this information from query logs. If you exceed your provisioned and bursting capacity, your request may receive a 400 HTTP status code (client error) with the message ThrottlingException. For example, using the AWS SDK for Python (Boto3), you may receive an exception like the following:

ThrottlingException: An error occurred (ThrottlingException) when calling the Query operation (reached max retries: 4)

For cases like this, Boto3 includes a retries feature, which retries the call (in this case, the query to Amazon Kendra) after receiving a throttling error. If you aren’t using an AWS SDK, you need to implement your own error handling, for example with exponential backoff and jitter.
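As an illustration, the following sketch enables Boto3’s built-in retry handling for the Amazon Kendra client; the retry mode and attempt count shown here are example values, not recommendations from this post, and the index ID is a placeholder:

import boto3
from botocore.config import Config

# Ask Boto3 to retry throttled calls with client-side rate limiting.
retry_config = Config(retries={"max_attempts": 10, "mode": "adaptive"})
kendra = boto3.client("kendra", config=retry_config)

response = kendra.query(
    IndexId="<your-index-id>",   # placeholder
    QueryText="What is Amazon Kendra?",
)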

You can monitor your Amazon Kendra index queries with CloudWatch metrics. For example, you could follow the metric IndexQueryCount, which represents the number of index queries per minute. If you want to use the IndexQueryCount metric, you should divide that number by 60 to obtain the average queries per second. Additionally, you can get a report of the queries per second on the Amazon Kendra console, as shown in the following screenshot.
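If you prefer to pull this metric programmatically rather than from the console, the following is a minimal sketch using the CloudWatch API; it assumes the metric is published in the AWS/Kendra namespace with an IndexId dimension, and the index ID is a placeholder:

import boto3
from datetime import datetime, timedelta

cloudwatch = boto3.client("cloudwatch")

response = cloudwatch.get_metric_statistics(
    Namespace="AWS/Kendra",
    MetricName="IndexQueryCount",
    Dimensions=[{"Name": "IndexId", "Value": "<your-index-id>"}],  # placeholder
    StartTime=datetime.utcnow() - timedelta(hours=1),
    EndTime=datetime.utcnow(),
    Period=60,                 # one data point per minute
    Statistics=["Sum"],
)

# IndexQueryCount is reported per minute; divide by 60 for an average QPS.
for point in sorted(response["Datapoints"], key=lambda p: p["Timestamp"]):
    print(point["Timestamp"], point["Sum"] / 60.0)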

The query usage graph on the console shows three patterns:

  • Peaks of ~2.5 QPS during business hours, between 8 AM and 8 PM.
  • Sustained usage above ~0.5 QPS and below 1 QPS between 8 PM and 8 AM.
  • Less than 0.3 QPS on the weekend (Feb 7, 2021, was a Sunday and Feb 13, 2021, was a Saturday).

Taking into account these capacity requirements, you could start defining the additional capacity units for your Amazon Kendra index as follows:

  • For the high-usage period (Monday through Friday, 8 AM to 8 PM), you add 24 QCUs. Because each query capacity unit provides capacity for at least 0.1 QPS, these, combined with the base Amazon Kendra Enterprise Edition query capacity (0.1 QPS), support 2.5 queries per second.
  • For the second usage pattern (Monday through Friday, 8 PM to 8 AM), you add four QCUs, which combined with the base capacity (0.1 QPS) provide capacity for 0.5 QPS.
  • For the weekends, you add two QCUs, provisioning capacity for 0.3 QPS.

The following table summarizes this configuration.

Period                    Additional QCUs    Capacity (Without Bursting)
Mon – Fri, 8 AM – 8 PM    24                 2.5 QPS
Mon – Fri, 8 PM – 8 AM    4                  0.5 QPS
Sat – Sun                 2                  0.3 QPS

You can use this initial approach to define a baseline that needs to be reevaluated to ensure the right sizing of your Amazon Kendra resources.

It’s also important to keep in mind that query autocomplete capacity is derived from your query capacity. Query autocomplete capacity is calculated as five times the provisioned query capacity for an index, with a base capacity of 2.5 calls per second. This means that if your Amazon Kendra index query capacity is below 0.5 QPS, you have 2.5 QPS for query autocomplete. If your index query capacity is above 0.5 QPS, your query autocomplete capacity is five times your current index query capacity.

Conclusion

In this post, you learned how to estimate capacity and scale for your Amazon Kendra index.

Now it’s easier than ever to experience Amazon Kendra, with 750 hours of Free Tier and the new reduced price for Amazon Kendra Developer Edition. Get started, visit our workshop, or check out the AWS Machine Learning Blog.


About the Author

Dr. Andrew Kane is an AWS Principal Specialist Solutions Architect based out of London. He focuses on the AWS Language and Vision AI services, helping our customers architect multiple AI services into a single use-case driven solution. Before joining AWS at the beginning of 2015, Andrew spent two decades working in the fields of signal processing, financial payments systems, weapons tracking, and editorial and publishing systems. He is a keen karate enthusiast (just one belt away from Black Belt) and is also an avid home-brewer, using automated brewing hardware and other IoT sensors.

 

Tapodipta Ghosh is a Senior Architect. He leads the Content And Knowledge Engineering Machine Learning team that focuses on building models related to AWS Technical Content. He also helps our customers with AI/ML strategy and implementation using our AI Language services like Amazon Kendra.

 

 

Jean-Pierre Dodel leads product management for Amazon Kendra, a new ML-powered enterprise search service from AWS. He brings 15 years of Enterprise Search and ML solutions experience to the team, having worked at Autonomy, HP, and search startups for many years prior to joining Amazon four years ago. JP has led the Amazon Kendra team from its inception, defining vision, roadmaps, and delivering transformative semantic search capabilities to customers like Dow Jones, Liberty Mutual, 3M, and PwC.

 

Juan Bustos is an AI Services Specialist Solutions Architect at Amazon Web Services, based in Dallas, TX. Outside of work, he loves spending time writing and playing music as well as trying random restaurants with his family.


Reimagine knowledge discovery using Amazon Kendra’s Web Crawler

When you deploy intelligent search in your organization, two important factors to consider are access to the latest and most comprehensive information, and a contextual discovery mechanism. Many companies are still struggling to make their internal documents searchable in a way that allows employees to get relevant information in a scalable, cost-effective manner. A 2018 International Data Corporation (IDC) study found that data professionals are losing 50% of their time every week—30% searching for, governing, and preparing data, plus 20% duplicating work. Amazon Kendra is purpose-built for addressing these challenges. Amazon Kendra is an intelligent search service that uses deep learning and reading comprehension to deliver more accurate search results.

The intelligent search capabilities of Amazon Kendra improve the search and discovery experience, but enterprises are still faced with the challenge of connecting troves of unstructured data and making that data accessible to search. Content is often unstructured and scattered across intranets and Wikis, making critical information hard to find and costing employees time and effort to track down the right answer.

Enterprises spend a lot of time and effort building complex extract, transform, and load (ETL) jobs that aggregate data sources. Amazon Kendra connectors allow you to quickly aggregate content as part of a single unified searchable index, without needing to copy or move data from an existing location to a new one. This reduces the time and effort typically associated with creating a new search solution.

With the recently launched Amazon Kendra web crawler, it’s now easier than ever to discover information stored within the vast amount of content spread across different websites and internal web portals. You can use the Amazon Kendra web crawler to quickly ingest and search content from your websites.

Sample use case

A common need is to reduce the complexity of searching across multiple data sources present in an organization. Most organizations have multiple departments, each having their own knowledge management and search systems. For example, the HR department may maintain a WordPress-based blog containing news and employee benefits-related articles, a Confluence site could contain internal knowledge bases maintained by engineering, sales may have sales plays stored on a custom content management system (CMS), and corporate office information could be stored in a Microsoft SharePoint Online site.

You can index all these types of webpages for search by using the Amazon Kendra web crawler. Specific connectors are also available to index documents directly from individual content data sources.

In this post, you learn how to ingest documents from a WordPress site using its sitemap with the Amazon Kendra web crawler.

Ingest documents with Amazon Kendra web crawler

For this post, we set up a WordPress site with information about AWS AI language services. To make the contents of the website searchable, we create a web crawler data source.

  1. On the Amazon Kendra console, choose Data sources in the navigation pane.

  2. Under WebCrawler, choose Add connector.

  3. For Data source name, enter a name for the data source.
  4. Add an optional description.

  5. Choose Next.

The web crawler allows you to define a series of source URLs or source sitemaps. WordPress generates a sitemap, which we use for this post.

  6. For Source, select Source sitemaps.
  7. For Source sitemaps, enter the sitemap URL.

  8. Add a web proxy or authentication if your host requires it.
  9. Create a new AWS Identity and Access Management (IAM) role.
  10. Choose Next.

  11. For this post, we set up the web crawler to crawl one page per second, so we modify the Maximum throttling value to 60.

The maximum value that’s allowed is 300.

For this post, we exclude a blog entry that contains 2021/06/28/this-post-is-to-be-skipped/ in the URL, and also all content that has the term /feed/ in the URL. Keep in mind that excluded content won’t be ingested into your Amazon Kendra index, so your users won’t be able to search across these documents.

  12. In the Additional configuration section, add these patterns to the Exclude patterns list.

  13. For Sync run schedule, choose Run on demand.
  14. Choose Next.

  15. Review the settings and choose Create.
  16. When the data source creation process is complete, choose Sync now.

When the sync job is complete, we can search the contents of the website.
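If you prefer to automate this setup, the same data source can be created programmatically. The following is a minimal sketch using the Amazon Kendra API in Boto3; the index ID, role ARN, and sitemap URL are placeholders you would replace with your own values:

import boto3

kendra = boto3.client("kendra")

# Create a web crawler data source that reads the WordPress sitemap and
# skips the same URLs excluded in the console walkthrough.
response = kendra.create_data_source(
    Name="wordpress-web-crawler",
    IndexId="<your-index-id>",                    # placeholder
    Type="WEBCRAWLER",
    RoleArn="<your-data-source-role-arn>",        # placeholder
    Configuration={
        "WebCrawlerConfiguration": {
            "Urls": {
                "SiteMapsConfiguration": {
                    "SiteMaps": ["https://<your-site>/sitemap.xml"]   # placeholder
                }
            },
            "MaxUrlsPerMinuteCrawlRate": 60,      # one page per second
            "UrlExclusionPatterns": [
                ".*2021/06/28/this-post-is-to-be-skipped.*",
                ".*/feed/.*",
            ],
        }
    },
)

# Start an on-demand sync, equivalent to choosing Sync now on the console.
kendra.start_data_source_sync_job(Id=response["Id"], IndexId="<your-index-id>")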

Conclusion

In this post, you saw how to set up the Amazon Kendra web crawler and how easy it is to ingest your websites into your Amazon Kendra index. If you’re just getting started with Amazon Kendra, you can build an index, ingest your website, and take advantage of intelligent search to provide better results to your users. To learn more about Amazon Kendra, refer to the Amazon Kendra Essentials workshop and deep dive into the Amazon Kendra blog.


About the Authors

Tapodipta Ghosh is a Senior Architect. He leads the Content And Knowledge Engineering Machine Learning team that focuses on building models related to AWS Technical Content. He also helps our customers with AI/ML strategy and implementation using our AI Language services like Amazon Kendra.

 

 

Vijai Gandikota is a Senior Product Manager at Amazon Web Services for Amazon Kendra.

 

 

 

 

Juan Bustos is an AI Services Specialist Solutions Architect at Amazon Web Services, based in Dallas, TX. Outside of work, he loves spending time writing and playing music as well as trying random restaurants with his family.


Enghouse EspialTV enables TV accessibility with Amazon Polly

This is a guest post by Mick McCluskey, the VP of Product Management at Enghouse EspialTV. Enghouse provides software solutions that power digital transformation for communications service operators. EspialTV is an Enghouse SaaS solution that transforms the delivery of TV services for these operators across Set Top Boxes (STBs), media players, and mobile devices.

A large audience of consumers use TV services, and several of these groups may have disabilities that make it more difficult for them to access these services. To ensure that TV services are accessible to the broadest possible audience, we need to consider accessibility as a key element of the user experience (UX) for the service. Additionally, because TV is viewed as a key service by governments, it’s often subject to regulatory requirements for accessibility, including talking interfaces for the visually impaired. In the US, the Twenty-First Century Communications and Video Accessibility Act (CVAA) mandates improved accessibility of visual interfaces for users with limited hearing and vision. The CVAA ensures that accessibility laws from the 1980s and 1990s are brought up to date with modern technologies, including new digital, broadband, and mobile innovations.

This post describes how Enghouse uses Amazon Polly to significantly improve accessibility for EspialTV through talking interactive menu guides for visually impaired users while meeting regulatory requirements.

Challenges

A key challenge for visually impaired users is navigating TV menus to find the content they want to view. Most TV menus are designed for a 10-foot viewing experience, meaning that a consumer sitting 10 feet from the screen can easily see the menu items. For the visually impaired, these menu items aren’t easy to see and are therefore hard to navigate. To improve our UX for subscribers with limited vision, we sought to develop a mechanism to provide audible descriptions of the menu, allowing easier navigation of key functions such as the following:

  • Channel and program selection
  • Channel and program information
  • Setup configuration, closed-caption control and options, and video description control
  • Configuration information
  • Playback

Overview of the AWS talking menu solution

Hosted on AWS, EspialTV is offered to communications service providers in a software as a service (SaaS) model. It was important for Enghouse to have a solution that not only supported the navigation offered at the time of launch, but was also flexible enough to support changes and enhancements over time. This way, the voice assistance could continuously evolve and improve to accommodate new capabilities as services and features were added to the menu. For this reason, the solution had to be driven by real-time API calls as opposed to hardcoded text-to-speech menu configurations.

To ensure CVAA compliance and accelerate deployment, Enghouse chose to use Amazon Polly to implement this talking menu solution for the following reasons:

  • We wanted a reliable and robust solution with minimal operational and management overhead
  • It permitted faster time to market by using ready-made text-to-speech APIs
  • The real-time API approach offered greater flexibility as we evolved the service over time

The following diagram illustrates the architecture of the talking menu solution.

Using the Amazon Polly text-to-speech API allowed us to build a simple solution that integrated with our current infrastructure and followed this flow:

  • Steps 1 and 2 – When TV users open the menu guide service, the client software running on the Set Top Box (STB) makes a call via the internet or a Data Over Cable Service Interface Specification (DOCSIS) cable modem, which is routed through the cable operator’s headend server to the Espial Guide service running on the AWS Cloud.
  • Step 3 – As TV users interact with the menu guide on the STB, the client software sends the string containing the specific menu description highlighted by the customer.
  • Step 4 – The cable operator’s headend server routes the request to a local cache to verify whether text-to-speech for the requested string is cached locally. If it is, the corresponding audio is sent back to the STB to be read out loud to the TV user.
  • Step 5 – Each cable operator has its own local cache. If the requested string isn’t cached locally in the cable operator’s environment, the request is sent to the EspialTV service in AWS, where it’s met by a secondary caching server. This secondary layer of caching hosted in the Espial environment ensures high availability and increases cache hit rates. For example, if the caching server in the cable operator’s environment is unavailable, the cache request can be resolved by the secondary caching system hosted in the Espial environment.
  • Steps 6 and 7 – If the requested string isn’t found in the caching server in the EspialTV service, it’s routed to the Amazon Polly API to be converted from text to speech. The resulting audio is routed back to the cable operator’s headend server and then to the TV user’s STB to be read out loud.

This architecture has several key considerations. First, several layers of caching minimize latency for the end user. Caching also absorbs the spiky nature of this workload and ensures that only requests not found in the respective caches are sent to Amazon Polly.
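The following minimal Python sketch illustrates this cache-then-synthesize pattern. It is not Enghouse’s production code; the in-memory dictionary simply stands in for the operator-side and Espial-side caches:

import hashlib
import boto3

polly = boto3.client("polly")
cache = {}  # stand-in for the operator-side and Espial-side caches

def speak_menu_item(text, voice_id="Joanna"):
    # Key the cache on the voice and the menu string so the same prompt
    # is never synthesized twice.
    key = hashlib.sha256(f"{voice_id}:{text}".encode()).hexdigest()
    if key not in cache:
        # Cache miss: call Amazon Polly and store the resulting audio.
        response = polly.synthesize_speech(
            Text=text, VoiceId=voice_id, OutputFormat="mp3"
        )
        cache[key] = response["AudioStream"].read()
    return cache[key]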

The ready-made text-to-speech APIs provided by Amazon Polly enabled us to implement the service with just one engineer. We also reduced the expected delivery time by 75% compared to our estimates for building an in-house custom solution. The Amazon Polly documentation was very clear, and the ramp-up time was short. Since implementation, this solution has reliably supported 40 cable operators, each with between 1,000 and 100,000 STBs.

Conclusion

EspialTV offers operators a TV solution that provides fast time to revenue, low startup costs, and scalability from small to very large operators. EspialTV offers providers and consumers a compelling and always relevant experience for their TV services. With Amazon Polly, we have ensured operators can offer a TV service to the broadest possible range of consumers and align with regulatory requirements for accessibility. To learn more about Amazon Polly, visit the product page.

The content and opinions in this post are those of the third-party author and AWS is not responsible for the content or accuracy of this post.


About the Author

Mick McCluskey is VP of Product Management at Enghouse, a leading provider of software solutions helping operators use digital transformation to drive profitability in fast-changing and emerging markets. In the area of video solutions, Mick has been pivotal in creating the EspialTV solution—a truly disruptive TVaaS solution run on the AWS Cloud that permits pay TV operators to manage transition while maintaining profitability in a rapidly changing market. He is currently working on solutions that help operators take advantage of key technology and industry trends like OTT video, IoT, and 5G. In addition to delivering cloud-based solutions, he continues his journey of learning how to play golf.


Upgrade your Amazon Polly voices to neural with one line of code

In 2019, Amazon Polly launched neural text-to-speech (NTTS) voices in US English and UK English. Neural voices use machine learning and provide a richer, more lifelike speech quality. Since the initial launch of NTTS, Amazon Polly has extended its neural offering by adding new voices in US Spanish, Brazilian Portuguese, Australian English, Canadian French, German, and Korean. Some of them are also available in a Newscaster speaking style tailored to the specific needs of publishers.

If you’ve been using the standard voices in Amazon Polly, upgrading to neural voices is easy. No matter which programming language you use, the upgrade process only requires a simple addition or modification of the Engine parameter wherever you call the SynthesizeSpeech or StartSpeechSynthesisTask operation in your code. In this post, you’ll learn about the benefits of neural voices and how to migrate your voices to NTTS.

Benefits of neural vs. standard

Because neural voices provide a more expressive, natural-sounding quality than standard, migrating to neural improves the user experience and boosts engagement.

“We rely on speech synthesis to drive dynamic narrations for our educational content,” says Paul S. Ziegler, Chief Executive Officer at Reflare. “The switch from Amazon Polly’s standard to neural voices has allowed us to create narrations that are so good as to consistently be indistinguishable from human speech to non-native speakers and to occasionally even fool native speakers.”

The following is an example of Joanna’s standard voice.

The following is an example of the same words, but using Joanna’s neural voice.

“Switching to neural voices is as easy as switching to other non-neural voices,” Ziegler says. “Since our systems were already set up to automatically generate voiceovers on the fly, implementing the changes took less than 5 minutes.”

Quick migration checklist

Not all SSML tags, Regions, and languages support neural voices. Before making the switch, use this checklist to verify that NTTS is available for your specific business needs:

  • Regional support – Verify that you’re making requests in Regions that support NTTS
  • Language and voice support – Verify that you’re making requests to voices and languages that support NTTS by checking the current list of voices and languages
  • SSML tag support – Verify that the SSML tags in your requests are supported by NTTS by checking SSML tag compatibility
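For example, one way to check voice and language support programmatically is to call the DescribeVoices API and filter on the neural engine; the following is a minimal sketch, and the Region shown is only an example:

import boto3

polly = boto3.client("polly", region_name="us-east-1")  # example Region

# List voices that support the neural engine in this Region.
voices = polly.describe_voices(Engine="neural")["Voices"]
for voice in voices:
    print(voice["Id"], voice["LanguageCode"], voice["SupportedEngines"])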

Additional considerations

The following table summarizes additional considerations before you switch to NTTS.

                      Standard                          Neural
Cost                  $4 per million characters         $16 per million characters
Free Tier             5 million characters per month    1 million characters per month
Default Sample Rate   22 kHz                            24 kHz
Usage Quota           See Quotas in Amazon Polly        See Quotas in Amazon Polly

Code samples

If you’re already using Amazon Polly standard voices, the following samples demonstrate how to switch to neural for all SDKs. The required change is the addition of the Engine parameter.

Go:

input := &polly.SynthesizeSpeechInput{
    OutputFormat: aws.String("mp3"),
    Text:         aws.String("Hello World!"),
    VoiceId:      aws.String("Joanna"),
    Engine:       aws.String("neural")}

Java:

SynthesizeSpeechRequest synthReq = SynthesizeSpeechRequest.builder()
    .text("Hello World!")
    .voiceId("Joanna")
    .outputFormat("mp3")
    .engine("neural")
    .build();
ResponseInputStream<SynthesizeSpeechResponse> synthRes = polly.synthesizeSpeech(synthReq);

Javascript:

polly.synthesizeSpeech({
    Text: "Hello World!",
    OutputFormat: "mp3",
    VoiceId: "Joanna",
    TextType: "text",
    Engine: "neural"});

.NET:

var response = client.SynthesizeSpeech(new SynthesizeSpeechRequest 
{
    Text = "Hello World!",
    OutputFormat = "mp3",
    VoiceId = "Joanna",
    Engine = "neural"
});

PHP:

$result = $client->synthesizeSpeech([
    'Text' => 'Hello world!',
    'OutputFormat' => 'mp3',
    'VoiceId' => 'Joanna',
    'Engine' => 'neural']);

Python:

polly.synthesize_speech(
    Text="Hello world!",
    OutputFormat="mp3",
    VoiceId="Joanna",
    Engine="neural")

Ruby:

resp = polly.synthesize_speech({
    text: "Hello World!",
    output_format: "mp3",
    voice_id: "Joanna",
    engine: "neural"
  })

Conclusion

You can start playing with neural voices immediately on the Amazon Polly console. If you have any questions or concerns, please post them to the AWS Forum for Amazon Polly, or contact your AWS Support team.


About the Author

Marta Smolarek is a Senior Program Manager in the Amazon Text-to-Speech team. Outside of work, she loves to go camping with her family.


Extend Amazon SageMaker Pipelines to include custom steps using callback steps

Launched at AWS re:Invent 2020, Amazon SageMaker Pipelines is the first purpose-built, easy-to-use continuous integration and continuous delivery (CI/CD) service for machine learning (ML). With Pipelines, you can create, automate, and manage end-to-end ML workflows at scale.

You can extend your pipelines to include steps for tasks performed outside of Amazon SageMaker by taking advantage of custom callback steps. This feature lets you include tasks that are performed using other AWS services, third parties, or tasks run outside AWS. Before the launch of this feature, steps within a pipeline were limited to the supported native SageMaker steps. With the launch of this new feature, you can use the new CallbackStep to generate a token and add a message to an Amazon Simple Queue Service (Amazon SQS) queue. The message on the SQS queue triggers a task outside of the currently supported native steps. When that task is complete, you can call the new SendPipelineExecutionStepSuccess API with the generated token to signal that the callback step and corresponding tasks are finished and the pipeline run can continue.

In this post, we demonstrate how to use CallbackStep to perform data preprocessing using AWS Glue. We use an Apache Spark job to prepare NYC taxi data for ML training. The raw data has one row per taxi trip, and shows information like the trip duration, number of passengers, and trip cost. To train an anomaly detection model, we want to transform the raw data into a count of the number of passengers that took taxi rides over 30-minute intervals.

Although we could run this specific Spark job in SageMaker Processing, we use AWS Glue for this post. In some cases, we may need capabilities that Amazon EMR or AWS Glue offer, like support for Hive queries or integration with the AWS Glue metadata catalog, so we demonstrate how to invoke AWS Glue from the pipeline.

Solution overview

The pipeline step that launches the AWS Glue job sends a message to an SQS queue. The message contains the callback token we need to send success or failure information back to the pipeline. This callback token triggers the next step in the pipeline. When handling this message, we need a handler that can launch the AWS Glue job and reliably check for job status until the job completes. We have to keep in mind that a Spark job can easily take longer than 15 minutes (the maximum duration of a single AWS Lambda function invocation), and the Spark job itself could fail for a number of reasons. That last point is worth emphasizing: in most Apache Spark runtimes, the job code itself runs in transient containers under the control of a coordinator like Apache YARN. We can’t add custom code to YARN, so we need something outside the job to check for completion.

We can accomplish this task several ways:

  • Have a Lambda function launch a container task that creates the AWS Glue job and polls for job completion, then sends the callback back to the pipeline
  • Have a Lambda function send a work notification to another SQS queue, with a separate Lambda function that picks up the message, checks for job status, and requeues the message if the job isn’t complete
  • Use AWS Glue job event notifications to respond to job status events sent by AWS Glue

For this post, we use the first technique because it’s the simplest (but likely not the most efficient). For this, we build out the solution as shown in the following diagram.

The solution is one example of how to use the new CallbackStep to extend your pipeline to steps outside SageMaker (such as AWS Glue). You can apply the same general steps and architectural guidance to extend pipelines to other custom processes or tasks. In our solution, the pipeline runs the following tasks:

  • Data preprocessing – This step (Step 1 in the preceding diagram) uses CallbackStep to send a generated token and defined input payload to the configured SQS queue (2). In this example, the input sent to the SQS queue is the Amazon Simple Storage Service (Amazon S3) locations of the input data and the step output training data.
    • The new message in the SQS queue triggers a Lambda function (3) that is responsible for running an AWS Fargate task with Amazon Elastic Container Service (Amazon ECS) (4).
    • The Fargate task runs using a container image that is configured to run a task. The task in this case is an AWS Glue job (5) used to transform your raw data into training data stored in Amazon S3 (6). This task is also responsible for sending a callback message that signals either the job’s success or failure.
  • Model training – This step (7) runs when the previous callback step has completed successfully. It uses the generated training data to train a model using a SageMaker training job and the Random Cut Forest algorithm.
  • Package model – After the model is successfully trained, the model is packaged for deployment (8).
  • Deploy model – In this final step (9), the model is deployed using a batch transform job.

These pipeline steps are just examples; you can modify the pipeline to meet your use case, such as adding steps to register the model in the SageMaker Model Registry.

In the next sections, we discuss how to set up this solution.

Prerequisites

For the preceding pipeline, you need the prerequisites outlined in this section. The detailed setup of each of these prerequisites is available in the supporting notebook.

Notebook dependencies

To run the provided notebook, you need the following:

Pipeline dependencies

Your pipeline uses the following services:

  • SQS message queue – The callback step requires an SQS queue to trigger a task. For this, you need to create an SQS queue and ensure that AWS Identity and Access Management (IAM) permissions are in place that allow SageMaker to put a message in the queue and allow Lambda to poll the queue for new messages. See the following code:
import boto3

sqs_client = boto3.client('sqs')
queue_name = 'pipeline_callbacks_glue_prep'
try:
    response = sqs_client.create_queue(QueueName=queue_name)
    queue_url = response['QueueUrl']
except Exception as e:
    print(f"Failed to create queue: {e}")
  • Lambda function: The function is triggered by new messages put to the SQS queue. The function consumes these new messages and starts the ECS Fargate task. In this case, the Lambda execution IAM role needs permissions to pull messages from Amazon SQS, notify SageMaker of potential failures, and run the Amazon ECS task. For this solution, the function starts a task on ECS Fargate using the following code:
%%writefile queue_handler.py
import json
import boto3
import os
import traceback

ecs = boto3.client('ecs')
sagemaker = boto3.client('sagemaker')

def handler(event, context):   
    print(f"Got event: {json.dumps(event)}")
    
    cluster_arn = os.environ["cluster_arn"]
    task_arn = os.environ["task_arn"]
    task_subnets = os.environ["task_subnets"]
    task_sgs = os.environ["task_sgs"]
    glue_job_name = os.environ["glue_job_name"]
    print(f"Cluster ARN: {cluster_arn}")
    print(f"Task ARN: {task_arn}")
    print(f"Task Subnets: {task_subnets}")
    print(f"Task SG: {task_sgs}")
    print(f"Glue job name: {glue_job_name}")
    
    for record in event['Records']:
        payload = json.loads(record["body"])
        print(f"Processing record {payload}")
        
        token = payload["token"]
        print(f"Got token {token}")
        
        try:
            input_data_s3_uri = payload["arguments"]["input_location"]
            output_data_s3_uri = payload["arguments"]["output_location"]
            print(f"Got input_data_s3_uri {input_data_s3_uri}")
            print(f"Got output_data_s3_uri {output_data_s3_uri}")

            response = ecs.run_task(
                cluster = cluster_arn,
                count=1,
                launchType='FARGATE',
                taskDefinition=task_arn,
                networkConfiguration={
                    'awsvpcConfiguration': {
                        'subnets': task_subnets.split(','),
                        'securityGroups': task_sgs.split(','),
                        'assignPublicIp': 'ENABLED'
                    }
                },
                overrides={
                    'containerOverrides': [
                        {
                            'name': 'FargateTask',
                            'environment': [
                                {
                                    'name': 'inputLocation',
                                    'value': input_data_s3_uri
                                },
                                {
                                    'name': 'outputLocation',
                                    'value': output_data_s3_uri
                                },
                                {
                                    'name': 'token',
                                    'value': token
                                },
                                {
                                    'name': 'glue_job_name',
                                    'value': glue_job_name
                                }
                                
                            ]
                        }
                    ]
                }
            )
            if 'failures' in response and len(response['failures']) > 0:
                f = response['failures'][0]
                print(f"Failed to launch task for token {token}: {f['reason']}")
                sagemaker.send_pipeline_execution_step_failure(
                    CallbackToken=token,
                    FailureReason = f['reason']
                )
            else:
                print(f"Launched task {response['tasks'][0]['taskArn']}")
        except Exception as e:
            trc = traceback.format_exc()
            print(f"Error handling record: {str(e)}:m {trc}")
            sagemaker.send_step_failure(
                CallbackToken=token,
                FailureReason = e
            )
  • After we create the SQS queue and Lambda function, we need to configure the SQS queue as an event source for the function so that when new messages are placed in the queue, the function is automatically triggered:
lambda_client = boto3.client('lambda')
lambda_client.create_event_source_mapping(
    EventSourceArn=f'arn:aws:sqs:{region}:{account}:{queue_name}',
    FunctionName='SMPipelineQueueHandler',
    Enabled=True,
    BatchSize=10
)
  • Fargate cluster – Because we use Amazon ECS to run and monitor the status of the AWS Glue job, we need to ensure we have an ECS Fargate cluster running:
import boto3
ecs = boto3.client('ecs')
response = ecs.create_cluster(clusterName='FargateTaskRunner')
  • Fargate task: We also need to create a container image with the code (task.py) that starts the data preprocessing job on AWS Glue and reports the status back to the pipeline upon the success or failure of that task. The IAM role attached to the task must include permissions that allow the task to pull images from Amazon ECR, create logs in Amazon CloudWatch, start and monitor an AWS Glue job, and send the callback token when the task is complete. When we issue send_pipeline_execution_step_success back to the pipeline, we also indicate the output file with the prepared training data. We use the output parameter in the model training step in the pipeline. The following is the code for task.py:
import boto3
import os
import sys
import traceback
import time

if 'inputLocation' in os.environ:
    input_uri = os.environ['inputLocation']
else:
    print("inputLocation not found in environment")
    sys.exit(1)
if 'outputLocation' in os.environ:
    output_uri = os.environ['outputLocation']
else:
    print("outputLocation not found in environment")
    sys.exit(1)
if 'token' in os.environ:
    token = os.environ['token']
else:
    print("token not found in environment")
    sys.exit(1)
if 'glue_job_name' in os.environ:
    glue_job_name = os.environ['glue_job_name']
else:
    print("glue_job_name not found in environment")
    sys.exit(1)

print(f"Processing from {input_uri} to {output_uri} using callback token {token}")
sagemaker = boto3.client('sagemaker')
glue = boto3.client('glue')

poll_interval = 60

try:
    
    t1 = time.time()
    response = glue.start_job_run(
        JobName=glue_job_name,
        Arguments={
            '--output_uri': output_uri,
            '--input_uri': input_uri
        }
    )
    job_run_id = response['JobRunId']
    print(f"Starting job {job_run_id}")
    
    job_status = 'STARTING'
    job_error = ''
    while job_status in ['STARTING','RUNNING','STOPPING']:
        time.sleep(poll_interval)
        response = glue.get_job_run(
            JobName=glue_job_name,
            RunId=job_run_id,
            PredecessorsIncluded=False
        )
        job_status = response['JobRun']['JobRunState']
        if 'ErrorMessage' in response['JobRun']:
            job_error = response['JobRun']['ErrorMessage']
        print(f"Job is in state {job_status}")
        
    t2 = time.time()
    total_time = (t2 - t1) / 60.0
    if job_status == 'SUCCEEDED':
        print("Job succeeded")
        sagemaker.send_pipeline_execution_step_success(
            CallbackToken=token,
            OutputParameters=[
                {
                    'Name': 'minutes',
                    'Value': str(total_time)
                },
                {
                    'Name': 's3_data_out',
                    'Value': str(output_uri),
                } 
            ]
        )
    else:
        print(f"Job failed: {job_error}")
        sagemaker.send_pipeline_execution_step_failure(
            CallbackToken=token,
            FailureReason = job_error
        )
except Exception as e:
    trc = traceback.format_exc()
    print(f"Error running ETL job: {str(e)}:m {trc}")
    sagemaker.send_pipeline_execution_step_failure(
        CallbackToken=token,
        FailureReason = str(e)
    )
  • Data preprocessing code – The pipeline callback step does the actual data preprocessing using a PySpark job running in AWS Glue, so we need to create the code that is used to transform the data:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'input_uri', 'output_uri'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

df = spark.read.format("csv").option("header", "true").load("{0}*.csv".format(args['input_uri']))
df = df.withColumn("Passengers", df["passenger_count"].cast(IntegerType()))
df = df.withColumn(
  'pickup_time',
  F.to_timestamp(
  F.unix_timestamp('tpep_pickup_datetime', 'yyyy-MM-dd HH:mm:ss').cast('timestamp')))
  
dfW = df.groupBy(F.window("pickup_time", "30 minutes")).agg(F.sum("Passengers").alias("passenger"))
dfOut = dfW.drop('window')
dfOut.repartition(1).write.option("timestampFormat", "yyyy-MM-dd HH:mm:ss").csv(args['output_uri'])

job.commit()
  • Data preprocessing job – We need to also configure the AWS Glue job that runs the preceding code when triggered by your Fargate task. The IAM role used must have permissions to read and write from the S3 bucket. See the following code:
glue = boto3.client('glue')
response = glue.create_job(
    Name='GlueDataPrepForPipeline',
    Description='Prepare data for SageMaker training',
    Role=glue_role_arn,
    ExecutionProperty={
        'MaxConcurrentRuns': 1
    },
    Command={
        'Name': 'glueetl',
        'ScriptLocation': glue_script_location,
    },
    MaxRetries=0,
    Timeout=60,
    MaxCapacity=10.0,
    GlueVersion='2.0'
)
glue_job_name = response['Name']

After these prerequisites are in place, including the necessary IAM permissions outlined in the example notebook, we’re ready to configure and run the pipeline.

Configure the pipeline

To build out the pipeline, we rely on the preceding prerequisites for the callback step that performs data processing. We then combine that step with steps native to SageMaker for model training and deployment to create an end-to-end pipeline.

To configure the pipeline, complete the following steps:

  1. Initialize the pipeline parameters:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

input_data = ParameterString(
    name="InputData",
    default_value=f"s3://{default_bucket}/{taxi_prefix}/"
)
id_out = ParameterString(
    name="IdOut",
    default_value="taxiout"+ str(timestamp)
)
output_data = ParameterString(
    name="OutputData",
    default_value=f"s3://{default_bucket}/{taxi_prefix}_output/"
)
training_instance_count = ParameterInteger(
    name="TrainingInstanceCount",
    default_value=1
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.c5.xlarge"
)
  2. Configure the first step in the pipeline, which is CallbackStep.

This step uses the SQS queue created in the prerequisites in combination with arguments that are used by tasks in this step. These arguments include the Amazon S3 locations of the input (raw taxi data) and the output training data. The step also defines the outputs, which in this case include the callback output and the Amazon S3 location of the training data. The outputs become the inputs to the next step in the pipeline. See the following code:

from sagemaker.workflow.callback_step import CallbackStep,CallbackOutput,CallbackOutputTypeEnum

callback1_output=CallbackOutput(output_name="s3_data_out", output_type=CallbackOutputTypeEnum.String)

step_callback_data = CallbackStep(
                    name="GluePrepCallbackStep",
                    sqs_queue_url=queue_url,
                    inputs={
                        "input_location": f"s3://{default_bucket}/{taxi_prefix}/",
                        "output_location": f"s3://{default_bucket}/{taxi_prefix}_{id_out}/"
                    },
                    outputs=[
                        callback1_output
                    ],
                )
  3. We use TrainingStep to train a model using the Random Cut Forest algorithm.

We first need to configure an estimator, then we configure the actual pipeline step. This step takes the output of the previous step (the Amazon S3 location of the training data created by AWS Glue) as input to train the model. See the following code:

containers = {
    'us-west-2': '174872318107.dkr.ecr.us-west-2.amazonaws.com/randomcutforest:latest',
    'us-east-1': '382416733822.dkr.ecr.us-east-1.amazonaws.com/randomcutforest:latest',
    'us-east-2': '404615174143.dkr.ecr.us-east-2.amazonaws.com/randomcutforest:latest',
    'eu-west-1': '438346466558.dkr.ecr.eu-west-1.amazonaws.com/randomcutforest:latest'}
region_name = boto3.Session().region_name
container = containers[region_name]
model_prefix = 'model'

session = sagemaker.Session()

rcf = sagemaker.estimator.Estimator(
    container,
    sagemaker.get_execution_role(),
    output_path='s3://{}/{}/output'.format(default_bucket, model_prefix),
    instance_count=training_instance_count,
    instance_type=training_instance_type,
    sagemaker_session=session)

rcf.set_hyperparameters(
    num_samples_per_tree=200,
    num_trees=50,
    feature_dim=1)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

step_train = TrainingStep(
    name="TrainModel",
    estimator=rcf,
    inputs={
        "train": TrainingInput(
        # s3_data = output of the previous callback step
        s3_data=step_callback_data.properties.Outputs['s3_data_out'],
        content_type="text/csv;label_size=0",
        distribution='ShardedByS3Key'
        ),
    },
)
  4. We use CreateModelStep to package the model for SageMaker deployment:
from sagemaker.model import Model
from sagemaker import get_execution_role
role = get_execution_role()

image_uri = sagemaker.image_uris.retrieve("randomcutforest", region)

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,
    role=role,
    )
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep

inputs = CreateModelInput(
    instance_type="ml.m5.large",
)

create_model = CreateModelStep(
    name="TaxiModel",
    model=model,
    inputs=inputs,
)
  5. We deploy the trained model with a SageMaker batch transform job using TransformStep.

This step loads the trained model and processes the prediction request data stored in Amazon S3, then outputs the results (anomaly scores in this case) to the specified Amazon S3 location. See the following code:

base_uri = step_callback_data.properties.Outputs['s3_data_out']
output_prefix = 'batch-out'

from sagemaker.transformer import Transformer

transformer = Transformer(
    model_name=create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    assemble_with = "Line",
    accept = 'text/csv',
    instance_count=1,
    output_path=f"s3://{default_bucket}/{output_prefix}/",
)
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

batch_data=step_callback_data.properties.Outputs['s3_data_out']

step_transform = TransformStep(
    name="TaxiTransform",
    transformer=transformer,
    inputs=TransformInput(data=batch_data,content_type="text/csv",split_type="Line",input_filter="$[0]",join_source='Input',output_filter='$[0,-1]')
)

Create and run the pipeline

You’re now ready to create and run the pipeline. To do this, complete the following steps:

  1. Define the pipeline including the parameters accepted and steps:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = f"GluePipeline-{id_out}"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        input_data,
        training_instance_type,
        training_instance_count,
        id_out,
    ],
    steps=[step_callback_data, step_train,create_model,step_transform],
)
  2. Submit the pipeline definition to create the pipeline using the role that is used to create all the jobs defined in each step:
from sagemaker import get_execution_role
pipeline.upsert(role_arn = get_execution_role())
  3. Run the pipeline:
execution = pipeline.start()

You can monitor your pipeline using the SageMaker SDK, execution.list_steps(), or via the Studio console, as shown in the following screenshot.
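For example, a minimal sketch of checking the run with the SageMaker Python SDK (the step names are whatever you defined in your pipeline):

# Inspect the overall run, optionally wait for it to finish,
# then print the status of each step.
print(execution.describe()["PipelineExecutionStatus"])
execution.wait()
for step in execution.list_steps():
    print(step["StepName"], step["StepStatus"])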

Use CallbackStep to integrate other tasks outside of SageMaker

You can follow the same pattern to integrate any long-running tasks or jobs with Pipelines. This may include running AWS Batch jobs, Amazon EMR job flows, or Amazon ECS or Fargate tasks.

You can also implement an email approval step for your models as part of your ML pipeline. In this pattern, CallbackStep runs after the model EvaluationStep and sends an email containing approve or reject links, along with model metrics, to a user. The workflow progresses to the next step after the user approves the task.

You can implement this pattern using a Lambda function and Amazon Simple Notification Service (Amazon SNS).
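A minimal sketch of such a handler follows; the SNS topic ARN and approval URLs are hypothetical placeholders, and the actual approve/reject handling (which would call SendPipelineExecutionStepSuccess or SendPipelineExecutionStepFailure with the token) is left out for brevity:

import json
import boto3

sns = boto3.client('sns')
TOPIC_ARN = 'arn:aws:sns:us-east-1:123456789012:model-approval'  # placeholder

def handler(event, context):
    # Triggered by the CallbackStep message placed on the SQS queue.
    for record in event['Records']:
        payload = json.loads(record['body'])
        token = payload['token']
        metrics = payload.get('arguments', {})
        # Email the reviewer links that carry the callback token.
        sns.publish(
            TopicArn=TOPIC_ARN,
            Subject='Model approval requested',
            Message=(
                f"Model metrics: {json.dumps(metrics)}\n"
                f"Approve: https://example.com/approve?token={token}\n"
                f"Reject:  https://example.com/reject?token={token}"
            ),
        )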

Conclusion

In this post, we showed you an example of how to use CallbackStep in Pipelines to extend your pipelines to integrate an AWS Glue job for data preprocessing. You can follow the same process to integrate any task or job outside of SageMaker. You can walk through the full solution explained in the example notebook.


About the Author

Shelbee Eigenbrode is a Principal AI and Machine Learning Specialist Solutions Architect at Amazon Web Services (AWS). She holds 6 AWS certifications and has been in technology for 23 years spanning multiple industries, technologies, and roles. She is currently focusing on combining her DevOps and ML background to deliver and manage ML workloads at scale. With over 35 patents granted across various technology domains, she has a passion for continuous innovation and using data to drive business outcomes. Shelbee co-founded the Denver chapter of Women in Big Data.

 

Sofian Hamiti is an AI/ML specialist Solutions Architect at AWS. He helps customers across industries accelerate their AI/ML journey by helping them build and operationalize end-to-end machine learning solutions.

 

 

 

Randy DeFauw is a principal solutions architect at Amazon Web Services. He works with AWS customers to provide guidance and technical assistance on database projects, helping them improve the value of their solutions when using AWS.

 

 

 

Payton Staub is a senior engineer with Amazon SageMaker. His current focus includes model building pipelines, experiment management, image management and other tools to help customers productionize and automate machine learning at scale.


Enhancing customer service experiences using Conversational AI: Power your contact center with Amazon Lex and Genesys Cloud

Customers expect personalized contact center experiences. They want easy access to customer support and quick resolution of their issues. Delighting callers with a quick and easy interaction remains central to the customer experience (CX) strategy for support organizations. Enterprises often deploy omni-channel contact centers so that they can provide simple mechanisms for their customers to access customer support. But even with these efforts, callers face long wait times, especially during peak hours, which can lead to lower CSAT scores. In addition, organizations have to manage support costs as their footprint expands. As the customer base grows, operational costs for managing a contact center can rapidly increase.

With Amazon Lex bots, you can use conversational AI capabilities to provide highly engaging and lifelike conversational experiences. Organizations can use Amazon Lex to automate customer service interactions and deliver faster responses to queries. As a result, customer issues are resolved in real time, reducing wait times and driving higher satisfaction. You can use Amazon Lex to handle the most common problems encountered by customers. Furthermore, complex issues that require human intervention can be seamlessly handed over from the Amazon Lex bot to a human agent. Augmenting your contact center operations with Amazon Lex bots provides an enhanced caller experience, while optimizing your operational costs with self-service automation. In addition, you can seamlessly scale your contact center operations on the AWS Cloud as your user base grows.

We’re excited to announce Amazon Lex V2 bot support on the Genesys Cloud platform. With this launch, you can build an Amazon Lex bot and set up your contact center in minutes.

About Amazon Lex V2 APIs and Genesys Cloud

Amazon Lex launched V2 APIs and a new console interface that makes it easier to build, deploy, and manage conversational experiences. The Lex V2 console and API enhancements provide support for multiple languages in a single bot, enable simplified versioning, and provide builder productivity tools. These features give you more control over the bot building and deployment processes.

Genesys Cloud (an omni-channel orchestration and customer relationship platform) provides a contact center platform in a public cloud model that enables quick and simple integration of AWS Contact Center Intelligence (AWS CCI) to transform the modern contact center from a cost center into a profit center. As part of AWS CCI, Genesys Cloud integrates with Amazon Lex, Amazon Polly (text-to-speech), and Amazon Kendra (intelligent search) to offer self-service conversational AI capabilities.

Key features

Genesys Cloud uses the continuous streaming capability with Amazon Lex V2 APIs to enable advanced IVR conversations. With this integration, you can now enable the following:

  • Interruptions (“barge-in”) – Callers can now interrupt the bot and answer a question before the prompt is completed
  • Wait and Continue – Callers can instruct the bot to wait if they need time for retrieving additional information during the call (such as a credit card number or booking ID)
  • DTMF support – Callers can provide information via speech or DTMF interchangeably
  • SSML support – You can configure prompts within the Amazon Lex bot using SSML tags, enabling greater control over speech generation from text
  • Configurable timeouts – You can configure how long Amazon Lex waits for a caller to finish speaking before collecting input, such as when answering a yes/no question or providing a date or credit card number

Creating the bot

Let’s create a banking bot as an example and integrate it with Genesys Cloud for IVR-based interactions. For a step-by-step process to build an Amazon Lex bot, refer to the banker bot workshop. You can also download the bot and import it using the Amazon Lex V2 console.

In addition to the intents presented in the workshop, we add a SpeakToAgent intent to enable handing over the conversation to a human agent based on user requests.
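If you script your bot setup, a SpeakToAgent intent could be added with the Amazon Lex V2 model-building API; the following is a minimal sketch with placeholder IDs and example utterances (not taken from the workshop):

import boto3

lex_models = boto3.client("lexv2-models")

# Add a SpeakToAgent intent to the DRAFT version of the bot's English locale.
lex_models.create_intent(
    intentName="SpeakToAgent",
    description="Hand the conversation over to a human agent",
    sampleUtterances=[
        {"utterance": "Speak to an agent"},
        {"utterance": "I want to talk to a person"},
    ],
    botId="<your-bot-id>",          # placeholder
    botVersion="DRAFT",
    localeId="en_US",
)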

Enabling the integrations

The Amazon Lex V2 integration is available for installation via Genesys AppFoundry. You need an active subscription for premium applications to access the Integration page from the Genesys Cloud Admin dashboard. Genesys also offers a free trial for validation purposes.

1. Configure the IAM role

Because Amazon Lex invocations take place in your AWS environment, you configure an AWS Identity and Access Management (IAM) role with the permissions Genesys Cloud needs to assume the role and use your resources.

  1. Create an IAM role and select Another AWS account as the trusted entity.
  2. Enter the Genesys Cloud production ID 765628985471 in the Account ID field.
  3. As an AWS best practice, select Require external ID and enter your organization’s ID to prevent the confused deputy problem and strengthen the security of the integration. If you prefer to script this step, see the sketch that follows this list.
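The following is a minimal boto3 sketch of the same role setup done programmatically. The role name and external ID value are placeholders; the account ID is the Genesys Cloud production ID from step 2.

import json
import boto3

iam = boto3.client("iam")

trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"AWS": "arn:aws:iam::765628985471:root"},
            "Action": "sts:AssumeRole",
            # Require the external ID to mitigate the confused deputy problem.
            "Condition": {"StringEquals": {"sts:ExternalId": "<your-organization-id>"}},
        }
    ],
}

iam.create_role(
    RoleName="GenesysLexV2Role",  # hypothetical role name
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)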

By default, IAM roles don’t have permission to create or modify AWS resources. For Genesys Cloud to successfully access Amazon Lex bots, a few permissions are required.

  1. Choose Create Policy and enter the following JSON policy document into the policy editor.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "GenesysLexPolicy",
            "Effect": "Allow",
            "Action": [
                "lex:List*",
                "lex:Describe*",
                "lex:StartConversation",
                "lex:Recognize*",
                "lex:DeleteSession",
                "lex:GetSession",
                "lex:PutSession"
            ],
            "Resource": "*"
        }
    ]
}
  2. Attach the policy to the role you created previously (to script this step, see the sketch after this list).
  3. Copy the role ARN and configure it within Genesys Cloud.
  4. Save and set the integration status to activate the bot.
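If you want to create and attach the policy with boto3 instead of the console, the following is a minimal sketch. The policy document is the JSON shown above; the file name and role name are placeholders, and the role name must match the role created in step 1.

import json
import boto3

iam = boto3.client("iam")

# Load the JSON policy document shown above (hypothetical file name).
with open("genesys-lex-policy.json") as policy_file:
    lex_policy = json.load(policy_file)

created = iam.create_policy(
    PolicyName="GenesysLexPolicy",
    PolicyDocument=json.dumps(lex_policy),
)

iam.attach_role_policy(
    RoleName="GenesysLexV2Role",  # the role created in the previous step
    PolicyArn=created["Policy"]["Arn"],
)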

2. Configure Amazon Polly

To use Amazon Lex for a voice bot, you set up the text-to-speech (TTS) capability. Genesys Cloud supports several TTS engines, including Amazon Polly. You can install and configure the Amazon Polly integration by following the Genesys documentation, then select Amazon Polly as the engine and configure your preferred voice. To keep the IVR voice consistent in the call flow, the Amazon Polly voice selected in Genesys Cloud should be the same voice configured in your Lex bot. For additional details, see the list of available voices and their associated characteristics.
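If you prefer to review the available voices programmatically rather than in the documentation, a short sketch like the following lists US English voices and their characteristics. The language code is an example; adjust it to your bot's locale.

import boto3

polly = boto3.client("polly")

# Print each voice ID, its gender, and the engines (standard/neural) it supports.
for voice in polly.describe_voices(LanguageCode="en-US")["Voices"]:
    print(voice["Id"], voice["Gender"], voice["SupportedEngines"])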

3. Configure the Genesys Cloud Architect flow

Create an Inbound Call Flow in Architect to orchestrate your bot interaction. You add a Reusable Task and use the Call Lex V2 bot action to bring in the Amazon Lex bot and design the various actions in the call flow.

The integration also allows Genesys Cloud to capture the preconfigured slots as Architect variables. These variables can be used outside of the bot for use cases such as applying business rules. For example, if a customer provides an account ID that matches the VIP customer segment, the call can be routed to the priority support queue when transferring to an agent.
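Conceptually, the business rule applied to the captured slot can be as simple as the following purely hypothetical sketch; the queue names and the VIP lookup are illustrative only, and in practice the rule lives in your Architect flow or in a data action it calls.

# Hypothetical routing rule based on the account_id slot captured from the Lex bot.
VIP_ACCOUNT_IDS = {"1001234", "1005678"}

def choose_queue(account_id: str) -> str:
    # Route VIP customers to the priority queue; everyone else goes to Tier 1.
    if account_id in VIP_ACCOUNT_IDS:
        return "PrioritySupportQueue"
    return "Tier1SupportQueue"

print(choose_queue("1001234"))  # PrioritySupportQueue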

4. Configure graceful escalation

When the automated solution can’t fulfill a customer’s request, the interaction should be escalated gracefully. This fallback process allows a human agent to take over the interaction for more complex tasks.

You can save key information from the prior exchange (such as intents, slots, and conversation transcripts) into a script to provide historical context to the agent so that conversations can be picked up seamlessly. This prevents customers from wasting valuable time repeating information they already provided.

In the following example, the call is transferred to an available Tier 1 support agent when a customer asks for more help or to be connected to an agent. You can also collect additional context from the customer and hand off to either another bot or human based on specialty.

5. Test the integrations

You can use the native soft phone in Genesys Cloud to make calls as you would with a desktop phone and validate the integration. Enter the bot’s name in the Enter Names and Numbers field and choose Call to follow the prompts.

Summary

Enterprises increasingly invest in automated solutions such as IVR and chatbots as a part of their customer service strategy in contact centers. Automation provides highly available support that handles common tasks without the presence of a live agent, while reducing operational cost.

With the adoption of the Amazon Lex V2 APIs, Genesys Cloud provides an overall improved user experience using the continuous streaming architecture, and enables a more natural customer-bot interaction.

This post outlines the key steps to enable the Amazon Lex V2 integration in your Genesys Cloud environment, and should give you a jump start to create and customize your own chatbot initiative.


About the Author

Anubhav Mishra is a Product Manager with AWS. He spends his time understanding customers and designing product experiences to address their business challenges.

Jessica Ho is a Solutions Architect at Amazon Web Services supporting ISV partners who build business applications on AWS. She is passionate about creating differentiated solutions that unlock customers for cloud adoption. Outside of work, she enjoys spoiling her garden into a mini jungle.


Simplify and automate anomaly detection in streaming data with Amazon Lookout for Metrics

Do you want to monitor your business metrics and detect anomalies in your existing streaming data pipelines? Amazon Lookout for Metrics is a service that uses machine learning (ML) to detect anomalies in your time series data. The service goes beyond simple anomaly detection: it allows developers to set up autonomous monitoring for important metrics, detect anomalies, and identify their root cause in a matter of a few clicks, using the same technology Amazon uses internally to detect anomalies in its own metrics, all with no ML experience required. However, one limitation you may face if you have an existing Amazon Kinesis Data Streams pipeline is that you can't directly run anomaly detection on your data streams using Lookout for Metrics. As of this writing, Lookout for Metrics doesn’t have native integration with Kinesis Data Streams to ingest streaming data and run anomaly detection on it.

In this post, we show you how to solve this problem by using an AWS Glue Spark streaming extract, transform, and load (ETL) script to ingest and organize streaming data in Amazon Simple Storage Service (Amazon S3) and using a Lookout for Metrics live detector to detect anomalies. If you have an existing Kinesis Data Streams pipeline that ingests ecommerce data, for example, you can use the solution to detect anomalies such as unexpected dips in revenue, high rates of abandoned shopping carts, increases in new user signups, and many more.

Included in this post is a sample streaming data generator to help you get started quickly. The included GitHub repo provides step-by-step deployment instructions, and uses the AWS Cloud Development Kit (AWS CDK) to simplify and automate the deployment.

Lookout for Metrics allows you to set up anomaly detectors in both continuous and backtest modes. Backtesting detects anomalies in historical data, which is helpful when you want to try out the service on past data or validate it against known anomalies that occurred in the past. For this post, we use continuous mode, where you detect anomalies in live data as they occur. In continuous mode, the detector monitors an input S3 bucket for continuous data and runs anomaly detection on new data at specified time intervals. For the live detector to consume continuous time series data from Amazon S3 correctly, it needs to know where to look for data for the current time interval; therefore, it requires continuous input data in S3 buckets organized by time interval.

Overview of solution

The solution architecture consists of the following components:

  • Streaming data generator – To help you get started quickly, we provide Python code that generates sample time series data and writes it to a Kinesis data stream at a specified time interval. The provided code generates sample data for an ecommerce schema (platform, marketplace, event_time, views, revenue); a minimal sketch of such a generator follows this list. You can also use your own data stream and data, but you must update the downstream processes in the architecture to process your schema.
  • Kinesis Data Streams to Lookout for Metrics – The AWS Glue Spark streaming ETL code is the core component of the solution. It contains logic to do the following:
    • Ingest streaming data
    • Micro-batch data by time interval
    • Filter dimensions and metrics columns
    • Deliver filtered data to Amazon S3 grouped by timestamp
  • Lookout for Metrics continuous detector – The AWS Glue streaming ETL code writes time series data as CSV files to the S3 bucket, with objects organized by time interval. The Lookout for Metrics continuous detector monitors the S3 bucket for live data and runs anomaly detection at the specified time interval (for example, every 5 minutes). You can view the detected anomalies on the Lookout for Metrics dashboard.
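As referenced in the first bullet above, the following is a minimal sketch of the kind of code the data generator runs. The stream name is a placeholder, and the generator deployed by the AWS CDK stack in the repo may differ in its details.

import datetime
import json
import random

import boto3

STREAM_NAME = "ecommerce-demo-stream"  # placeholder
kinesis = boto3.client("kinesis")

def generate_record():
    # One sample record matching the ecommerce schema used in this post.
    now = datetime.datetime.now(datetime.timezone.utc)
    return {
        "platform": random.choice(["pc_web", "mobile_web", "mobile_app"]),
        "marketplace": random.choice(["us", "uk", "de", "fr", "jp"]),
        "event_time": now.strftime("%Y-%m-%d %H:%M:%S"),
        "views": random.randint(0, 500),
        "revenue": round(random.uniform(0, 1000), 2),
    }

def put_batch(batch_size=25):
    records = [
        {
            "Data": json.dumps(generate_record()).encode("utf-8"),
            "PartitionKey": str(random.randint(0, 1024)),
        }
        for _ in range(batch_size)
    ]
    # put_records accepts up to 500 records per call.
    kinesis.put_records(StreamName=STREAM_NAME, Records=records)

if __name__ == "__main__":
    put_batch()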

The following diagram illustrates the solution architecture.

AWS Glue Spark streaming ETL script

The main component of the solution is the AWS Glue serverless streaming ETL script. The script contains the logic to ingest the streaming data and write the output, grouped by time interval, to an S3 bucket. This makes it possible for Lookout for Metrics to use streaming data from Kinesis Data Streams to detect anomalies. In this section, we walk through the Spark streaming ETL script used by AWS Glue.

The AWS Glue Spark streaming ETL script performs the following steps:

  1. Read from the AWS Glue table that uses Kinesis Data Streams as the data source.

The following screenshot shows the AWS Glue table created for the ecommerce data schema.

  2. Ingest the streaming data from the AWS Glue table (table_name parameter) batched by time window (stream_batch_time parameter) and create a data frame for each micro-batch using create_data_frame.from_catalog(), as shown in the following code:
data_frame_datasource0 = glueContext.create_data_frame.from_catalog(stream_batch_time = BATCH_WIN_SIZE, 
                            database = glue_dbname, table_name = glue_tablename, transformation_ctx = "datasource0", 
                            additional_options = {"startingPosition": "TRIM_HORIZON", "inferSchema": "false"})
  3. Perform the following processing steps for each batch of data (data frame) ingested:
    1. Select only the required columns and convert the data frame to the AWS Glue native DynamicFrame.
datasource0 = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame").select_fields(['marketplace','event_time', 'views'])

As shown in the preceding example code, select only the columns marketplace, event_time, and views to write to output CSV files in Amazon S3. Lookout for Metrics uses these columns for running anomaly detection. In this example, marketplace is the optional dimension column used for grouping anomalies, views is the metric to be monitored for anomalies, and event_time is the timestamp for time series data.

    2. Populate the time interval in each streaming record ingested:
datasource1 = Map.apply(frame=datasource0, f=populateTimeInterval)

In the preceding code, we provide the custom function populateTimeInterval, which determines the appropriate time interval for a given data point based on its event_time timestamp column. A minimal sketch of one possible implementation of this function follows the write step below.

The following table includes example time intervals determined by the function for a 5-minute frequency.

Input timestamp         Start of time interval determined by populateTimeInterval
2021-05-24 19:18:28     2021-05-24 19:15
2021-05-24 19:21:15     2021-05-24 19:20

The following table includes example time intervals determined by the function for a 10-minute frequency.

Input timestamp         Start of time interval determined by populateTimeInterval
2021-05-24 19:18:28     2021-05-24 19:10
2021-05-24 19:21:15     2021-05-24 19:20

    3. The write_dynamic_frame() function uses the time interval (as determined in the previous step) as the partition key to write output CSV files to the appropriate S3 prefix structure:
datasink1 = glueContext.write_dynamic_frame.from_options(frame = datasource1, connection_type = "s3", 
                        connection_options = {"path": path_datasink1, "partitionKeys": ["intvl_date", "intvl_hhmm"]}, 
                        format_options={"quoteChar": -1, "timestamp.formats": "yyyy-MM-dd HH:mm:ss"}, 
                        format = src_format, transformation_ctx = "datasink1")
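The populateTimeInterval function itself isn't shown in the excerpts above. The following is a minimal sketch of one possible implementation, assuming a 5-minute detector frequency and the intvl_date and intvl_hhmm partition keys used by the write step; the actual function in the repo may differ.

import datetime

INTERVAL_MINUTES = 5  # detector frequency used in this walkthrough

def populateTimeInterval(record):
    # Map.apply() passes each record as a dictionary-like object.
    event_time = datetime.datetime.strptime(record["event_time"], "%Y-%m-%d %H:%M:%S")
    floored_minute = (event_time.minute // INTERVAL_MINUTES) * INTERVAL_MINUTES
    interval_start = event_time.replace(minute=floored_minute, second=0, microsecond=0)
    # These two fields become the S3 partition keys, for example
    # intvl_date=2021-05-24/intvl_hhmm=1915/.
    record["intvl_date"] = interval_start.strftime("%Y-%m-%d")
    record["intvl_hhmm"] = interval_start.strftime("%H%M")
    return record

With this logic, an event_time of 2021-05-24 19:18:28 lands in the 19:15 interval, matching the first example table above.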

For example, the following screenshot shows that the ETL script writes output to the S3 folder structure organized by 5-minute time intervals.

For additional details on partitions for ETL outputs, see Managing Partitions for ETL Output in AWS Glue.

You can set up a live detector using Amazon S3 as a continuous data source to start detecting anomalies in streaming data. For detailed instructions, see the GitHub repo.

Prerequisites

You need the following to deploy the solution:

  • An AWS account with permissions to deploy the solution using AWS CDK
  • A workstation or development environment with the following installed and configured:
    • npm
    • TypeScript
    • AWS CDK
    • AWS account credentials

You can find detailed instructions in the “Getting Started” section of the GitHub repo.

Deploy the solution

Follow the step-by-step instructions in the GitHub repo to deploy the solution components. AWS CDK templates are provided for each of the solution components, organized in their own directory structure within the GitHub repo. The templates deploy the following resources:

  • Data generator – An AWS Lambda function, an Amazon EventBridge rule, and a Kinesis data stream
  • Connector for Lookout for Metrics – The AWS Glue Spark streaming ETL job and S3 bucket
  • Lookout for Metrics continuous detector – Our continuous detector

Clean up

To avoid incurring future charges, delete the resources by deleting the stacks deployed by the AWS CDK.

Conclusion

In this post, we showed how you can detect anomalies in streaming data sources using a Lookout for Metrics continuous detector. The solution used serverless streaming ETL with AWS Glue to prepare the data for Lookout for Metrics anomaly detection. The reference implementation used an ecommerce sample data schema (platform, marketplace, event_time, views, revenue) to demonstrate and test the solution.

You can easily extend the provided data generator code and ETL script to process your own schema and sample data. Additionally, you can adjust the solution parameters such as anomaly detection frequency to match your use case. With minor changes, you can replace the sample data generator with an existing Kinesis Data Streams streaming data source.

To learn more about Amazon Lookout for Metrics, see Introducing Amazon Lookout for Metrics: An anomaly detection service to proactively monitor the health of your business and the Lookout for Metrics Developer Guide. For additional information about streaming ETL jobs with AWS Glue, see Crafting serverless streaming ETL jobs with AWS Glue and Adding Streaming ETL Jobs in AWS Glue.


About the Author

Babu Srinivasan is a Sr. Solutions Architect at AWS, with over 24 years of experience in IT and the last 6 years focused on the AWS Cloud. He is passionate about AI/ML. Outside of work, he enjoys woodworking and entertains friends and family (sometimes strangers) with sleight of hand card magic.
