Detect anomalies in operational metrics using Dynatrace and Amazon Lookout for Metrics

Organizations of all sizes and across all industries gather and analyze metrics or key performance indicators (KPIs) to help their businesses run effectively and efficiently. Operational metrics are used to evaluate performance, compare results, and track relevant data to improve business outcomes. For example, you can use operational metrics to determine application performance (the average time it takes to render a page for an end user) or application availability (the duration of time the application was operational). One challenge that most organizations face today is detecting anomalies in operational metrics, which are key in ensuring continuity of IT system operations.

Traditional rule-based methods are manual and look for data that falls outside of numerical ranges that have been arbitrarily defined. An example of this is an alert when transactions per hour fall below a certain number. This results in false alarms if the range is too narrow, or missed anomalies if the range is too broad. These ranges are also static. They don’t change based on evolving conditions like the time of the day, day of the week, seasons, or business cycles. When anomalies are detected, developers, analysts, and business owners can spend weeks trying to identify the root cause of the change before they can take action.

Amazon Lookout for Metrics uses machine learning (ML) to automatically detect and diagnose anomalies without any prior ML experience. In a couple of clicks, you can connect Lookout for Metrics to popular data stores like Amazon Simple Storage Service (Amazon S3), Amazon Redshift, and Amazon Relational Database Service (Amazon RDS), as well as third-party software as a service (SaaS) applications (such as Salesforce, Dynatrace, Marketo, Zendesk, and ServiceNow) via Amazon AppFlow and start monitoring metrics that are important to your business.

This post demonstrates how you can connect to your IT operational infrastructure monitored by Dynatrace using Amazon AppFlow and set up an accurate anomaly detector across metrics and dimensions using Lookout for Metrics. The solution allows you to set up a continuous anomaly detector and optionally set up alerts to receive notifications when anomalies occur.

Lookout for Metrics integrates seamlessly with Dynatrace to detect anomalies within your operational metrics. Once connected, Lookout for Metrics uses ML to start monitoring data and metrics for anomalies and deviations from the norm. Dynatrace enables monitoring of your entire infrastructure, including your hosts, processes, and network. You can perform log monitoring and view information such as the total traffic of your network, the CPU usage of your hosts, the response time of your processes, and more.

Amazon AppFlow is a fully managed service that provides integration capabilities by enabling you to transfer data between SaaS applications like Datadog, Salesforce, Marketo, and Slack and AWS services like Amazon S3 and Amazon Redshift. It provides capabilities to transform, filter, and validate data to generate enriched and usable data in a few easy steps.

Solution overview

In this post, we demonstrate how to integrate with an environment monitored by Dynatrace and detect anomalies in the operational metrics. We also examine how application availability and performance (resource contention) were impacted.

The source data is a cluster of Amazon Elastic Compute Cloud (Amazon EC2) instances that is monitored by Dynatrace. Each EC2 instance is installed with Dynatrace OneAgent to collect all monitored telemetry data (CPU utilization, memory, network utilization, and disk I/O). Amazon AppFlow enables you to securely integrate SaaS applications like Dynatrace and automate data flows, while providing options to configure and connect to such services natively from the AWS Management Console or via API. In this post, we focus on connecting to Dynatrace as our source and Lookout for Metrics as the target, both of which are natively supported applications in Amazon AppFlow.

The solution enables you to create an Amazon AppFlow data flow from Dynatrace to Lookout for Metrics. You can then use Lookout for Metrics to detect any anomalies in the telemetry data, as shown in the following diagram. Optionally, you can send automated anomaly alerts to AWS Lambda functions, webhooks, or Amazon Simple Notification Service (Amazon SNS) topics.

The following are the high-level steps to implement the solution:

  1. Set up Amazon AppFlow integration with Dynatrace.
  2. Create an anomaly detector with Lookout for Metrics.
  3. Add a dataset to the detector and integrate Dynatrace metrics.
  4. Activate the detector.
  5. Create an alert.
  6. Review the detector and data flow status.
  7. Review and analyze any anomalies.

Set up Amazon AppFlow integration with Dynatrace

To set up the data flow, complete the following steps:

  1. On the Amazon AppFlow console, choose Create flow.
  2. For Flow name, enter a name.
  3. For Flow description, enter an optional description.
  4. In the Data encryption section, you can choose or create an AWS Key Management Service (AWS KMS) key.
  5. Choose Next.
  6. For Source name, choose Dynatrace.
  7. For Choose Dynatrace Connection, choose the connection you created.
  8. For Choose Dynatrace object, choose Problems (this is the only object supported as of this writing).

For more information about Dynatrace problems, see Problem overview page.

  9. For Destination name, choose Amazon Lookout for Metrics.
  10. For API token, generate an API token from the Dynatrace console.
  11. For Subdomain, enter your Dynatrace portal URL address.
  12. For Data encryption, choose the AWS KMS key.
  13. For Connection Name, enter a name.
  14. Choose Connect.
  15. For Flow trigger, select Run flow on schedule.
  16. For Repeats, choose Minutes (alternatively, you can choose hourly or daily).
  17. Set the trigger to repeat every 5 minutes.
  18. Enter a starting time.
  19. Enter a start date.

Dynatrace requires a date range filter to be set, using the is between condition.

  20. For Field name, choose Date range.
  21. For Condition, choose is between.
  22. For Criteria 1, choose your start date.
  23. For Criteria 2, choose your end date.
  24. Review your settings and choose Create flow.
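If you prefer to automate this setup, the same flow can be sketched with the AWS SDK for Python (Boto3). This is a sketch under assumptions: the flow and connection names are hypothetical, and the field-mapping tasks are omitted for brevity; verify the exact parameters against the current Amazon AppFlow API reference.

```python
def build_flow_request(flow_name, connection_name):
    """Mirror the console steps above as a create_flow request."""
    return {
        "flowName": flow_name,
        "triggerConfig": {
            "triggerType": "Scheduled",  # Run flow on schedule
            "triggerProperties": {
                # repeat every 5 minutes
                "Scheduled": {"scheduleExpression": "rate(5minutes)"}
            },
        },
        "sourceFlowConfig": {
            "connectorType": "Dynatrace",
            "connectorProfileName": connection_name,
            "sourceConnectorProperties": {
                # Problems is the only Dynatrace object supported as of this writing
                "Dynatrace": {"object": "problems"}
            },
        },
        "destinationFlowConfigList": [
            {
                "connectorType": "LookoutMetrics",
                "destinationConnectorProperties": {"LookoutMetrics": {}},
            }
        ],
        "tasks": [],  # field mappings go here; omitted in this sketch
    }


def create_flow(flow_name, connection_name):
    import boto3  # deferred so the request builder stays usable offline

    return boto3.client("appflow").create_flow(
        **build_flow_request(flow_name, connection_name)
    )
```

Calling create_flow("dynatrace-to-lfm", "my-dynatrace-connection") would then create the flow; the date range filter described above would be expressed as an additional filter task.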

Create an anomaly detector with Lookout for Metrics

To create your anomaly detector, complete the following steps:

  1. On the Lookout for Metrics console, choose Create detector.
  2. For Detector name, enter a name.
  3. For Description, enter an optional description.
  4. For Interval, choose the time between each analysis. This should match the interval set on the flow.
  5. For Encryption, create or choose an existing AWS KMS key.
  6. Choose Create.
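The same detector can be created programmatically. Here is a minimal Boto3 sketch, assuming a 5-minute interval to match the flow; the detector name and KMS key are placeholders:

```python
def build_detector_request(name, interval="PT5M", kms_key_arn=None):
    """Build a Lookout for Metrics create_anomaly_detector request.

    interval must match the AppFlow schedule; PT5M means every 5 minutes.
    """
    request = {
        "AnomalyDetectorName": name,
        "AnomalyDetectorConfig": {"AnomalyDetectorFrequency": interval},
    }
    if kms_key_arn:  # encryption is optional
        request["KmsKeyArn"] = kms_key_arn
    return request


def create_detector(name):
    import boto3  # deferred so the request builder stays usable offline

    return boto3.client("lookoutmetrics").create_anomaly_detector(
        **build_detector_request(name)
    )
```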

Add a dataset to the detector and integrate Dynatrace metrics

The next step in activating your anomaly detector is to add a dataset and integrate the Dynatrace metrics.

  1. On the detector details, choose Add a dataset.
  2. For Name, enter the data source name.
  3. For Description, enter an optional description.
  4. For Timezone, choose the time zone relevant to your dataset. This should match the time zone used in Amazon AppFlow (which picks it up from the browser).
  5. For Datasource, choose Dynatrace.
  6. For Amazon AppFlow flow, choose the flow that you created.
  7. For Permissions, choose a service role.
  8. Choose Next.
  9. For Map fields, choose the measures for the detector to track (the detector supports up to five measures); for this post, we choose impactLevel and hasRootCause.

The map fields are the primary fields that the detector monitors. Choose fields that are relevant operational KPIs for your environment.

  10. For Dimensions, choose the fields that segment the measure values; for this post, we choose severityLevel.
  11. Review the settings and choose Save dataset.
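Programmatically, this dataset maps to a metric set attached to the detector. A hedged Boto3 sketch follows, assuming hypothetical ARNs and a Sydney time zone (use whatever matches your AppFlow flow):

```python
def build_metric_set_request(detector_arn, flow_name, role_arn):
    """Build a create_metric_set request mirroring the dataset steps above."""
    return {
        "AnomalyDetectorArn": detector_arn,
        "MetricSetName": "dynatrace-problems",
        "Timezone": "Australia/Sydney",  # should match the AppFlow time zone
        "MetricList": [
            # the measures the detector tracks
            {"MetricName": "impactLevel", "AggregationFunction": "SUM"},
            {"MetricName": "hasRootCause", "AggregationFunction": "SUM"},
        ],
        "DimensionList": ["severityLevel"],  # segments the measure values
        "MetricSource": {
            "AppFlowConfig": {"FlowName": flow_name, "RoleArn": role_arn}
        },
    }
```

You would pass the result to the Lookout for Metrics create_metric_set API via Boto3.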

Activate the detector

You’re now ready to activate the newly created detector.

Create an alert

You can create an alert to send automated anomaly alerts to Lambda functions; webhooks; cloud applications like Slack, PagerDuty, and Datadog; or SNS topics with subscribers that use SMS, email, or push notifications.

  1. On the detector details, choose Add alerts.
  2. For Alert Name, enter the name.
  3. For Sensitivity threshold, enter a threshold at which the detector sends anomaly alerts.
  4. For Channel, choose either Amazon SNS or Lambda as the notification method. For this post, I use Amazon SNS.
  5. For SNS topic, create or choose an existing SNS topic.
  6. For Service role, choose an execution role.
  7. Choose Add alert.
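The alert can likewise be created via the API. The following sketch assumes a hypothetical SNS topic and service role, with a sensitivity threshold of 70:

```python
def build_alert_request(detector_arn, sns_topic_arn, role_arn, threshold=70):
    """Build a create_alert request; anomalies at or above threshold notify."""
    return {
        "AlertName": "dynatrace-anomaly-alert",
        "AlertSensitivityThreshold": threshold,
        "AnomalyDetectorArn": detector_arn,
        "Action": {
            "SNSConfiguration": {
                "RoleArn": role_arn,
                "SnsTopicArn": sns_topic_arn,
            }
        },
    }


def create_alert(detector_arn, sns_topic_arn, role_arn):
    import boto3  # deferred so the request builder stays usable offline

    return boto3.client("lookoutmetrics").create_alert(
        **build_alert_request(detector_arn, sns_topic_arn, role_arn)
    )
```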

Review the detector and flow status

On the Run history tab, you can confirm that the flows are running successfully for the interval chosen.

On the Detector log tab, you can confirm that the detector records the results after each interval.

Review and analyze any anomalies

On the main detector page, choose View anomalies to review and analyze any anomalies.

On the Anomalies page, you can adjust the severity score on the threshold dial to filter anomalies above a given score.

The following analysis represents the severity level and impacted metrics. The graph shows the anomalies detected by the detector, with availability and resource contention being impacted. The anomaly was detected on June 28 at 14:30 PDT and has a severity score of 98, indicating a high-severity anomaly that needs immediate attention.

Lookout for Metrics also allows you to provide real-time feedback on the relevance of the detected anomalies, which enables a powerful human-in-the-loop mechanism. This information is fed back to the anomaly detection model to improve its accuracy continuously, in near-real time.

Conclusion

Anomaly detection can be very useful in identifying anomalies that could signal potential issues within your operational environment. Timely detection of anomalies can aid in troubleshooting, help avoid loss in revenue, and help maintain your company’s reputation. Lookout for Metrics automatically inspects and prepares the data, selects the best-suited ML algorithm, begins detecting anomalies, groups related anomalies together, and summarizes potential root causes.

To get started with this capability, see Amazon Lookout for Metrics. You can use this capability in all Regions where Lookout for Metrics is publicly available. For more information about Region availability, see AWS Regional Services.


About the Author

Sumeeth Siriyur is a Solutions Architect at AWS, based in Sydney. He is passionate about infrastructure services and uses AI services to improve IT infrastructure observability and management. In his spare time, he likes binge-watching shows and continually working to improve at outdoor sports.


Accenture promotes machine learning growth with world’s largest private AWS DeepComposer Battle of the Bands League

Accenture is known for pioneering innovative solutions to achieve customer success by using artificial intelligence (AI) and machine learning (ML) powered solutions with AWS services. To keep teams up to date with the latest ML services, Accenture seeks to gamify hands-on learning. One such event, the Accenture-hosted AWS DeepComposer Battle of the Bands, is the world’s first and largest private league of its kind.

Accenture’s league spanned 16 global regions and 55 countries, with each location competing for global superstardom and a real-life gold record! With around 500 bands in the competition, Accenture employees with different skill sets and domain knowledge proved themselves as aspiring ML musicians, generating a playlist of 150 original songs using AWS DeepComposer. There was no shortage of fans either, with thousands of votes cast by supportive colleagues and teammates.

Why an AWS DeepComposer Battle of the Bands, and why now?

According to a recent Gartner report, “Despite the global impact of COVID-19, 47% of AI investments were unchanged since the start of the pandemic and 30% of organizations actually planned to increase such investments”. Additionally, there have been few opportunities in this pandemic to share a fun and enjoyable experience with our teammates, let alone colleagues around the globe.

Accenture and the Accenture AWS Business Group are always looking for unique and exciting ways to help employees upskill in the latest and greatest tech. Inspired by their massively successful annual AWS DeepRacer Grand Prix, Accenture switched out the racetrack for the big stage and created their own Battle of the Bands using AWS DeepComposer.

This Battle of the Bands brought together fans and bands from around the globe, generating thousands of views, shares, votes, and opportunities to connect, laugh, and smile together.

Education was Accenture’s number one priority when crafting the competition. The goal was to expose those unfamiliar with AWS or ML to a fun and approachable experience that would increase their confidence with this technology and start them down a path of greater learning. According to registration metrics, around half of all participants were working with AWS and ML hands-on for the first time. Participants have shared that this competition inspired them to learn more about both AWS and ML. Some feedback received included:

“I enjoyed doing something creative and tackling music, which I had no experience with previously.”

“It was fun trying to make a song with the tool and to learn about other ML techniques.”

“I was able to feel like a musician even though I don’t know much about music composition.”

A hall of fame alliance

Accenture and AWS have always demonstrated a great alliance. In 2019, Accenture hosted one of the world’s largest private AWS DeepRacer Leagues. In 2021, multiple individuals and groups participated in the AWS DeepComposer Battle of the Bands League. These bands were able to create a video to go along with their song submission, allowing for more creative freedom and a chance to stand out from the crowd. Some bands made artistic music videos, others saw an opportunity to make something funny and share laughs around the world. Going above and beyond, one contestant turned their AWS DeepComposer competition into a sing-along training video for Accenture’s core values, while another dedicated their video to honoring “sheroes” and famous women in tech.

The dedication of Accenture’s bands to the spirit of the competition really showed in the array of pun-filled band names such as “Doggo-as-a-service,” “The Oracles,” “Anna and the AlgoRhythms,” and “#000000 Sabbath.”

AWS offers a portfolio of educational devices—AWS DeepLens, AWS DeepRacer, and AWS DeepComposer—designed for developers of all skill levels to learn the fundamentals of ML in fun and practical ways. The hands-on nature of AWS AI devices makes them great tools to engage and educate employees.

Accelerating innovation with the Accenture AWS Business Group

By working with the Accenture AWS Business Group (AABG), you can learn from the resources, technical expertise, and industry knowledge of two leading innovators, helping you accelerate the pace of innovation to deliver disruptive products and services. The AABG helps you ideate and innovate cloud solutions through rapid prototype development.

Connect with our team at accentureaws@amazon.com to learn how to use and accelerate ML in your products and services.

You can also organize your own event. To learn more about AWS DeepComposer events, see the AWS DeepRacer Community Blog, and check out the blog post How to run an AI-powered musical challenge: “AWS DeepComposer Got Talent” to learn how to host your first event with AWS DeepComposer.

About Accenture

Accenture is a global professional services company with leading capabilities in digital, cloud and security. Combining unmatched experience and specialized skills across more than 40 industries, we offer Strategy and Consulting, Interactive, Technology and Operations services — all powered by the world’s largest network of Advanced Technology and Intelligent Operations centers. Our 569,000 people deliver on the promise of technology and human ingenuity every day, serving clients in more than 120 countries. We embrace the power of change to create value and shared success for our clients, people, shareholders, partners and communities. Visit us at www.accenture.com.

Copyright © 2021 Accenture. All rights reserved. Accenture and its logo are trademarks of Accenture.

This document is produced by consultants at Accenture as general guidance. It is not intended to provide specific advice on your circumstances. If you require advice or further details on any matters referred to, please contact your Accenture representative.

This document makes descriptive reference to trademarks that may be owned by others. The use of such trademarks herein is not an assertion of ownership of such trademarks by Accenture and is not intended to represent or imply the existence of an association between Accenture and the lawful owners of such trademarks. No sponsorship, endorsement, or approval of this content by the owners of such trademarks is intended, expressed, or implied.

Accenture provides the information on an “as-is” basis without representation or warranty and accepts no liability for any action or failure to act taken in response to the information contained or referenced in this publication.


About the Authors

Marc DeMory is a senior emerging tech consultant with Accenture’s Chicago Liquid Studio, focusing on rapid-prototyping and cloud-native development in the fields of Machine Learning, Computer Vision, Automation, and Extended Reality.


Sameer Goel is a Sr. Solutions Architect in Netherlands, who drives customer success by building prototypes on cutting-edge initiatives. Prior to joining AWS, Sameer graduated with a master’s degree from NEU Boston, with a concentration in data science. He enjoys building and experimenting with AI/ML projects on Raspberry Pi.


Maryam Rezapoor is a Senior Product Manager with the AWS DeepLabs team based in Santa Clara, CA. She works on developing products to put machine learning in the hands of everyone. She loves hiking through the US national parks and is currently training for a one-day Grand Canyon Rim-to-Rim hike. She is a fan of Metallica and Evanescence. The drummer Lars Ulrich has inspired her to pick up the sticks and play drums while singing “Nothing Else Matters.”


Scale your Amazon Kendra index

Amazon Kendra is a fully managed, intelligent search service powered by machine learning. Amazon Kendra reimagines enterprise search for your websites and applications so your employees and customers can easily find the content they’re looking for. Using keyword or natural language queries, employees and customers can find the right content even when it’s scattered across multiple locations and content repositories within your organization.

Although Amazon Kendra is designed for large-scale search applications with millions of documents and thousands of queries per second, you can run smaller experiments to evaluate Amazon Kendra. You can run a proof of concept, or simply have a smaller workload and still use features that Amazon Kendra Enterprise Edition has to offer. On July 1, 2021, Amazon Kendra introduced new, smaller capacity units for smaller workloads. In addition, to promote experimentation, the price for Amazon Kendra Developer Edition was reduced by 55%.

Amazon Kendra Enterprise Edition capacity units

The base capacity for Amazon Kendra supports up to 100,000 documents and 8,000 searches per day, with adaptive bursting capability to better handle unpredictable query spikes. You can increase the query and the document capacity of your Amazon Kendra index through storage capacity units and query capacity units, and these can be updated independently from each other.

Storage capacity units offer scaling in increments of 100,000 documents (up to 30 GB of storage) each. For example, if you need to index 1 million documents, you need nine additional storage capacity units (the base Amazon Kendra Enterprise Edition capacity covers the first 100,000 documents, and the nine storage capacity units cover the remaining 900,000).

Query capacity units (QCUs) offer scaling in increments of 8,000 searches per day, with built-in adaptive bursting. For example, if you need 16,000 queries per day (an average of about 0.2 QPS), you need two units in total: the base capacity plus one additional QCU.

For more information about the maximum number of storage capacity units and query capacity units available for a single index, see Quotas for Amazon Kendra.

About capacity bursting

Amazon Kendra has a provisioned base capacity of one query capacity unit. You can use up to 8,000 queries per day with a minimum throughput of 0.1 queries per second (per query capacity unit).

An adaptive approach to handling unexpected traffic beyond the provisioned throughput is to use the built-in adaptive query bursting feature in Amazon Kendra. This allows you to apply unused query capacity to handle unexpected traffic. Amazon Kendra accumulates your unused queries at your provisioned queries per second rate, every second, up to the maximum number of queries you’ve provisioned for your Amazon Kendra index. These accumulated queries are automatically used to help handle unexpected traffic spikes above the currently allocated QPS capacity.
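This accumulation behaves much like a token bucket. The following toy model (an illustration, not the service's exact accounting) shows how capacity saved during quiet seconds absorbs a later spike:

```python
def simulate_bursting(provisioned_qps, max_accumulated, traffic):
    """Return how many queries would be throttled for a per-second arrival list.

    Unused capacity accrues at the provisioned rate each second, capped at
    max_accumulated, and is drawn down when arrivals exceed the steady rate.
    """
    bucket = 0.0
    throttled = 0.0
    for arrivals in traffic:  # queries arriving in each one-second slot
        bucket = min(bucket + provisioned_qps, max_accumulated)
        served = min(arrivals, bucket)
        bucket -= served
        throttled += arrivals - served
    return throttled
```

After ten idle seconds at 1 QPS provisioned, a 5-query spike is fully served from the accumulated bucket, whereas a 20-query spike arriving with an empty bucket is mostly throttled.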

Optimal performance of adaptive query bursting can vary, depending on several factors such as your total index size, query complexity, accumulated unused queries, and overall load on your index. We recommend performing your own load tests to accurately measure bursting capacity.

Best practices

When dimensioning your Amazon Kendra index, you need to consider how many documents you’re indexing, how many queries you expect per day, how many queries per second you need to accommodate, and whether you have usage patterns that require additional capacity due to sustained usage. You might also experience short peaks where additional QPS is needed only for brief periods of time.

It’s therefore good practice to observe your query usage patterns for a few weeks, especially when the patterns are not easily predictable. This will allow you to define an optimal balance between using the built-in adaptive bursting capability for short, unsustained QPS peaks, and adding/removing capacity units to better handle longer, more sustained peaks and lows.

For information about visualizing and building a rough estimate of your usage patterns in Amazon Kendra, see Automatically scale Amazon Kendra query capacity units with Amazon EventBridge and AWS Lambda.

Amazon Kendra Enterprise Edition allows you to add document storage capacity in units of 100,000 documents with a maximum of 30 GB of storage each. You can add and remove storage capacity at any time, but you can’t remove storage capacity beyond your used capacity (number of documents ingested or storage space used). We recommend estimating how often documents are added to your data sources in order to determine when to increase the storage capacity of your Amazon Kendra index through storage capacity units. You can monitor the document count with Amazon CloudWatch or on the Amazon Kendra console.

Queries per second represent the number of concurrent queries your Amazon Kendra index receives at a given time. If you’re replacing a search solution with Amazon Kendra, you should be able to retrieve this information from query logs. If you exceed your provisioned and bursting capacity, your request may receive a 400 HTTP status code (client error) with the message ThrottlingException. For example, using the AWS SDK for Python (Boto3), you may receive an exception like the following:

ThrottlingException: An error occurred (ThrottlingException) when calling the Query operation (reached max retries: 4)

For cases like this, Boto3 includes the retries feature, which retries the query call (in this case to Amazon Kendra) after obtaining an exception. If you aren’t using an AWS SDK, you may need to implement an error handling mechanism that, for example, could use exponential backoff to handle this error.
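A minimal sketch of such a mechanism follows, assuming a hypothetical index ID. It uses exponential backoff with jitter and retries only throttling errors; with Boto3 you can often instead configure botocore's built-in retry modes (for example, Config(retries={"max_attempts": 10, "mode": "adaptive"})).

```python
import random
import time


def backoff_delay(attempt, base=0.5, cap=30.0):
    """Exponential backoff with full jitter: 0..min(cap, base * 2**attempt)."""
    return random.uniform(0, min(cap, base * (2 ** attempt)))


def query_with_retries(index_id, query_text, max_attempts=5):
    import boto3  # deferred so backoff_delay stays usable offline
    from botocore.exceptions import ClientError

    kendra = boto3.client("kendra")
    for attempt in range(max_attempts):
        try:
            return kendra.query(IndexId=index_id, QueryText=query_text)
        except ClientError as err:
            if err.response["Error"]["Code"] != "ThrottlingException":
                raise  # only retry throttling errors
            time.sleep(backoff_delay(attempt))
    raise RuntimeError("query still throttled after %d attempts" % max_attempts)
```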

You can monitor your Amazon Kendra index queries with CloudWatch metrics. For example, you could follow the metric IndexQueryCount, which represents the number of index queries per minute. If you want to use the IndexQueryCount metric, you should divide that number by 60 to obtain the average queries per second. Additionally, you can get a report of the queries per second on the Amazon Kendra console, as shown in the following screenshot.

The preceding graph shows three patterns:

  • Peaks of ~2.5 QPS during business hours, between 8 AM and 8 PM.
  • Sustained QPS usage over ~0.5 QPS and below 1 QPS between 8 PM and 8 AM.
  • Less than 0.3 QPS usage on the weekend (Feb 7, 2021, was a Sunday and Feb 13, 2021, was a Saturday).

Taking into account these capacity requirements, you could start defining your Amazon Kendra index additional capacity units as follows:

  • For the high-usage times (between 8 AM and 8 PM, Monday through Friday), you add 24 query capacity units (QCUs). Each QCU provides capacity for at least 0.1 QPS, which, when added to the base Amazon Kendra Enterprise Edition query capacity (0.1 QPS), supports 2.5 queries per second.
  • For the second usage pattern (Monday through Friday from 8 PM until 8 AM), you add four QCUs, which, combined with the base capacity (0.1 QPS), provides capacity for 0.5 QPS.
  • For the weekends, you add two QCUs, provisioning capacity for 0.3 QPS.

The following table summarizes this configuration.

Period Additional QCUs Capacity (Without Bursting)
Mon – Fri 8 AM – 8 PM 24 2.5 QPS
Mon – Fri 8 PM – 8 AM 4 0.5 QPS
Sat – Sun 2 0.3 QPS
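Under this capacity model (a 0.1 QPS base unit plus 0.1 QPS per additional QCU), the table's numbers can be reproduced with a small helper; the epsilon guards against floating-point rounding:

```python
import math

UNIT_QPS = 0.1  # the base capacity and each additional QCU both provide 0.1 QPS


def additional_qcus(target_qps):
    """Query capacity units to add on top of the base unit for target_qps."""
    total_units = math.ceil(target_qps / UNIT_QPS - 1e-9)  # 0.1 QPS steps
    return max(total_units - 1, 0)  # one unit is already included in the base
```

additional_qcus(2.5), additional_qcus(0.5), and additional_qcus(0.3) give 24, 4, and 2, matching the table.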

You can use this initial approach to define a baseline that needs to be reevaluated to ensure the right sizing of your Amazon Kendra resources.

It’s also important to keep in mind that query autocomplete capacity is derived from your query capacity, with a base capacity of 2.5 calls per second. This means that if your Amazon Kendra index query capacity is below 0.6 QPS, you have 2.5 QPS for query autocomplete; if your index query capacity is above 0.6 QPS, your query autocomplete capacity is calculated as 2.5 times your current index query capacity.

Conclusion

In this post, you learned how to estimate capacity and scale your Amazon Kendra index.

Now it’s easier than ever to experience Amazon Kendra, with 750 hours of Free Tier usage and the new reduced price for Amazon Kendra Developer Edition. To get started, visit our workshop or check out the AWS Machine Learning Blog.


About the Author

Dr. Andrew Kane is an AWS Principal Specialist Solutions Architect based out of London. He focuses on the AWS Language and Vision AI services, helping our customers architect multiple AI services into a single use-case driven solution. Before joining AWS at the beginning of 2015, Andrew spent two decades working in the fields of signal processing, financial payments systems, weapons tracking, and editorial and publishing systems. He is a keen karate enthusiast (just one belt away from Black Belt) and is also an avid home-brewer, using automated brewing hardware and other IoT sensors.


Tapodipta Ghosh is a Senior Architect. He leads the Content And Knowledge Engineering Machine Learning team that focuses on building models related to AWS Technical Content. He also helps our customers with AI/ML strategy and implementation using our AI Language services like Amazon Kendra.


Jean-Pierre Dodel leads product management for Amazon Kendra, a new ML-powered enterprise search service from AWS. He brings 15 years of Enterprise Search and ML solutions experience to the team, having worked at Autonomy, HP, and search startups for many years prior to joining Amazon four years ago. JP has led the Amazon Kendra team from its inception, defining vision, roadmaps, and delivering transformative semantic search capabilities to customers like Dow Jones, Liberty Mutual, 3M, and PwC.


Juan Bustos is an AI Services Specialist Solutions Architect at Amazon Web Services, based in Dallas, TX. Outside of work, he loves spending time writing and playing music as well as trying random restaurants with his family.


Reimagine knowledge discovery using Amazon Kendra’s Web Crawler

When you deploy intelligent search in your organization, two important factors to consider are access to the latest and most comprehensive information, and a contextual discovery mechanism. Many companies are still struggling to make their internal documents searchable in a way that allows employees to get relevant information in a scalable, cost-effective manner. A 2018 International Data Corporation (IDC) study found that data professionals are losing 50% of their time every week: 30% searching for, governing, and preparing data, plus 20% duplicating work. Amazon Kendra is purpose-built for addressing these challenges. Amazon Kendra is an intelligent search service that uses deep learning and reading comprehension to deliver more accurate search results.

The intelligent search capabilities of Amazon Kendra improve the search and discovery experience, but enterprises are still faced with the challenge of connecting troves of unstructured data and making that data accessible to search. Content is often unstructured and scattered across intranets and Wikis, making critical information hard to find and costing employees time and effort to track down the right answer.

Enterprises spend a lot of time and effort building complex extract, transform, and load (ETL) jobs that aggregate data sources. Amazon Kendra connectors allow you to quickly aggregate content as part of a single unified searchable index, without needing to copy or move data from an existing location to a new one. This reduces the time and effort typically associated with creating a new search solution.

With the recently launched Amazon Kendra web crawler, it’s now easier than ever to discover information stored within the vast amount of content spread across different websites and internal web portals. You can use the Amazon Kendra web crawler to quickly ingest and search content from your websites.

Sample use case

A common need is to reduce the complexity of searching across multiple data sources present in an organization. Most organizations have multiple departments, each having their own knowledge management and search systems. For example, the HR department may maintain a WordPress-based blog containing news and employee benefits-related articles, a Confluence site could contain internal knowledge bases maintained by engineering, sales may have sales plays stored on a custom content management system (CMS), and corporate office information could be stored in a Microsoft SharePoint Online site.

You can index all these types of webpages for search by using the Amazon Kendra web crawler. Specific connectors are also available to index documents directly from individual content data sources.

In this post, you learn how to ingest documents from a WordPress site using its sitemap with the Amazon Kendra web crawler.

Ingest documents with Amazon Kendra web crawler

For this post, we set up a WordPress site with information about AWS AI language services. To be able to search the contents of the website, we create a web crawler data source.

  1. On the Amazon Kendra console, choose Data sources in the navigation pane.

  2. Under WebCrawler, choose Add connector.

  3. For Data source name, enter a name for the data source.
  4. Add an optional description.

  5. Choose Next.

The web crawler allows you to define a series of source URLs or source sitemaps. WordPress generates a sitemap, which we use for this post.
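Before pointing the crawler at a sitemap, you can preview which URLs it exposes. The sketch below parses an inline sample sitemap; in practice you would fetch your site's real sitemap URL (WordPress commonly serves one at /wp-sitemap.xml, though the exact location varies with version and plugins):

```python
import xml.etree.ElementTree as ET

SITEMAP_NS = "{http://www.sitemaps.org/schemas/sitemap/0.9}"


def urls_from_sitemap(sitemap_xml):
    """Return the <loc> entries listed in a sitemap document."""
    root = ET.fromstring(sitemap_xml)
    return [loc.text for loc in root.iter(SITEMAP_NS + "loc")]


# hypothetical two-entry sitemap for illustration
sample = """<?xml version="1.0" encoding="UTF-8"?>
<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
  <url><loc>https://example.com/2021/06/28/hello-world/</loc></url>
  <url><loc>https://example.com/about/</loc></url>
</urlset>"""
```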

  1. For Source, select Source sitemaps.
  2. For Source sitemaps, enter the sitemap URL.

  1. Add a web proxy or authentication if your host requires that.
  2. Create a new AWS Identity and Access Management (IAM) role.
  3. Choose Next.

  1. For this post, I set up the web crawler to crawl one page per second, so I modify the Maximum throttling value to 60.

The maximum value that’s allowed is 300.

For this post, I remove a blog entry that contains 2021/06/28/this-post-is-to-be-skipped/ in the URL, and also all the contents that have the term /feed/ in the URL. Keep in mind that the excluded content won’t be ingested into your Amazon Kendra index, so your users won’t be able to search across these documents.

  1. In the Additional configuration section, add these patterns on the Exclude patterns

  1. For Sync run schedule, choose Run on demand.
  2. Choose Next.

  1. Review the settings and choose Create.
  2. When the data source creation process is complete, choose Sync now.
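The same data source can also be created programmatically. The following is a minimal sketch using the boto3 Amazon Kendra client; the index ID, IAM role ARN, data source name, and sitemap URL are placeholders you would replace with your own values, and the exclusion patterns mirror the ones configured in the console steps above.

```python
# Sketch: creating the web crawler data source with boto3 instead of the console.
# The sitemap URL, index ID, and role ARN below are placeholders (assumptions).
web_crawler_config = {
    "WebCrawlerConfiguration": {
        # Crawl from the WordPress-generated sitemap
        "Urls": {
            "SiteMapsConfiguration": {
                "SiteMaps": ["https://example.com/sitemap.xml"]
            }
        },
        # One page per second = 60 URLs per minute (the maximum allowed is 300)
        "MaxUrlsPerMinuteCrawlRate": 60,
        # Skip the sample post and any /feed/ URLs, as in the console steps
        "UrlExclusionPatterns": [
            ".*2021/06/28/this-post-is-to-be-skipped/.*",
            ".*/feed/.*",
        ],
    }
}

def create_web_crawler_data_source(kendra, index_id, role_arn):
    """Register the web crawler data source with an existing Kendra index."""
    return kendra.create_data_source(
        Name="wordpress-web-crawler",  # hypothetical name
        IndexId=index_id,
        Type="WEBCRAWLER",
        Configuration=web_crawler_config,
        RoleArn=role_arn,
    )

# Example usage (requires a real index and IAM role):
# import boto3
# kendra = boto3.client("kendra")
# response = create_web_crawler_data_source(kendra, "<index-id>", "<role-arn>")
```

After creation, you would start a sync with `kendra.start_data_source_sync_job(...)`, equivalent to choosing Sync now on the console.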

When the sync job is complete, we can search the website.

Conclusion

In this post, you saw how to set up the Amazon Kendra web crawler and how easy it is to ingest your websites into your Amazon Kendra index. If you're just getting started with Amazon Kendra, you can build an index, ingest your website, and take advantage of intelligent search to provide better results to your users. To learn more about Amazon Kendra, refer to the Amazon Kendra Essentials workshop and deep dive into the Amazon Kendra blog.


About the Authors

Tapodipta Ghosh is a Senior Architect. He leads the Content And Knowledge Engineering Machine Learning team that focuses on building models related to AWS Technical Content. He also helps our customers with AI/ML strategy and implementation using our AI Language services like Amazon Kendra.

Vijai Gandikota is a Senior Product Manager at Amazon Web Services for Amazon Kendra.

Juan Bustos is an AI Services Specialist Solutions Architect at Amazon Web Services, based in Dallas, TX. Outside of work, he loves spending time writing and playing music as well as trying random restaurants with his family.


Enghouse EspialTV enables TV accessibility with Amazon Polly

This is a guest post by Mick McCluskey, the VP of Product Management at Enghouse EspialTV. Enghouse provides software solutions that power digital transformation for communications service operators. EspialTV is an Enghouse SaaS solution that transforms the delivery of TV services for these operators across Set Top Boxes (STBs), media players, and mobile devices.

A large audience of consumers use TV services, and several of these groups may have disabilities that make it more difficult for them to access these services. To ensure that TV services are accessible to the broadest possible audience, we need to consider accessibility as a key element of the user experience (UX) for the service. Additionally, because TV is viewed as a key service by governments, it's often subject to regulatory requirements for accessibility, including talking interfaces for the visually impaired. In the US, the Twenty-First Century Communications and Video Accessibility Act (CVAA) mandates improved accessibility of visual interfaces for users with limited hearing and vision. The CVAA brings accessibility laws from the 1980s and 1990s up to date with modern technologies, including new digital, broadband, and mobile innovations.

This post describes how Enghouse uses Amazon Polly to significantly improve accessibility for EspialTV through talking interactive menu guides for visually impaired users while meeting regulatory requirements.

Challenges

A key challenge for visually impaired users is navigating TV menus to find the content they want to view. Most TV menus are designed for a 10-foot viewing experience, meaning that a consumer sitting 10 feet from the screen can easily see the menu items. For the visually impaired, these menu items aren’t easy to see and are therefore hard to navigate. To improve our UX for subscribers with limited vision, we sought to develop a mechanism to provide audible descriptions of the menu, allowing easier navigation of key functions such as the following:

  • Channel and program selection
  • Channel and program information
  • Setup configuration, closed-caption control and options, and video description control
  • Configuration information
  • Playback

Overview of the AWS talking menu solution

Hosted on AWS, EspialTV is offered to communications service providers in a software as a service (SaaS) model. It was important for Enghouse to have a solution that not only supported the navigation offered at the time of launch, but was also flexible enough to support changes and enhancements over time. This way, the voice assistance could continuously evolve and improve to accommodate new capabilities as new services and features were added to the menu. For this reason, the solution had to be driven by real-time API calls as opposed to hardcoded text-to-speech menu configurations.

To ensure CVAA compliance and accelerate deployment, Enghouse chose to use Amazon Polly to implement this talking menu solution for the following reasons:

  • We wanted a reliable and robust solution with minimal operational and management overhead
  • It permitted faster time to market by using ready-made text-to-speech APIs
  • The real-time API approach offered greater flexibility as we evolved the service over time

The following diagram illustrates the architecture of the talking menu solution.

Using the Amazon Polly text-to-speech API allowed us to build a simple solution that integrated with our current infrastructure and followed this flow:

  • Steps 1 and 2 – When TV users open the menu guide service, the client software running on the Set Top Box (STB) makes a call via the internet or Data Over Cable Service Interface Specification (DOCSIS) cable modem, which is routed through the cable operator's headend server to the Espial Guide service running on the AWS Cloud.
  • Step 3 – As TV users interact with the menu guide on the STBs, the client software running on the STBs sends the string containing the specific menu description highlighted by the customer.
  • Step 4 – The cable operator's headend server routes the request to a local cache to verify whether the requested string's text-to-speech is cached locally. If it is, the corresponding text-to-speech is sent back to the STB to be read out loud to the TV user.
  • Step 5 – Each unique cable operator has a local cache. If the requested string isn't cached locally in the cable operator's environment, the requested string is sent to the EspialTV service in AWS, where it's met by a secondary caching server to respond to the request. This secondary layer of caching hosted in the Espial environment ensures high availability and increases cache hit rates. For example, if the caching servers in the cable operator environment are unavailable, the cache request can be resolved by the secondary caching system hosted in the Espial environment.
  • Steps 6 and 7 – If the requested string isn't found in the caching server in the EspialTV service, it's routed to the Amazon Polly API to be converted to text-to-speech, which is routed back to the cable operator's headend server and then to the TV user's STB to be read out loud to the user.

This architecture has several key considerations. First, several layers of caching are implemented to minimize latency for the end user. This also supports the spiky nature of this workload by ensuring that only requests not found in the respective caches are made to Amazon Polly.
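The layered lookup described in the steps above can be sketched as follows. This is an illustrative Python sketch, not Enghouse's implementation: the cache objects and hash-based key scheme are assumptions, and the voice and engine passed to Amazon Polly are arbitrary examples.

```python
import hashlib

def synthesize_with_cache(text, operator_cache, espial_cache, polly_client=None):
    """Return audio bytes for a menu string, consulting both cache layers
    before falling back to the Amazon Polly SynthesizeSpeech API."""
    key = hashlib.sha256(text.encode("utf-8")).hexdigest()

    # Layer 1: cache at the cable operator headend (steps 3-4)
    audio = operator_cache.get(key)
    if audio is not None:
        return audio

    # Layer 2: secondary cache in the Espial environment (step 5)
    audio = espial_cache.get(key)
    if audio is not None:
        operator_cache[key] = audio  # backfill the operator-local cache
        return audio

    # Miss on both layers: call Amazon Polly (steps 6-7); voice/engine are
    # example values, not necessarily what EspialTV uses
    response = polly_client.synthesize_speech(
        Text=text, OutputFormat="mp3", VoiceId="Joanna", Engine="neural"
    )
    audio = response["AudioStream"].read()
    espial_cache[key] = audio
    operator_cache[key] = audio
    return audio
```

Because each layer backfills the one in front of it, repeated menu navigation by any subscriber of the same operator resolves locally, and only genuinely new strings reach Amazon Polly.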

The ready-made text-to-speech APIs provided by Amazon Polly enabled us to implement the service with just one engineer. We also reduced the expected delivery time by 75% compared to our estimates for building an in-house custom solution. The Amazon Polly documentation was very clear, and the ramp-up time was limited. Since implementation, this solution has reliably supported 40 cable operators, each with between 1,000 and 100,000 STBs.

Conclusion

EspialTV offers operators a TV solution that provides fast time to revenue, low startup costs, and scalability from small to very large operators. EspialTV offers providers and consumers a compelling and always relevant experience for their TV services. With Amazon Polly, we have ensured operators can offer a TV service to the broadest possible range of consumers and align with regulatory requirements for accessibility. To learn more about Amazon Polly, visit the product page.

The content and opinions in this post are those of the third-party author and AWS is not responsible for the content or accuracy of this post.


About the Author

Mick McCluskey is VP of Product Management at Enghouse, a leading provider of software solutions helping operators use digital transformation to drive profitability in fast-changing and emerging markets. In the area of video solutions, Mick has been pivotal in creating the EspialTV solution—a truly disruptive TVaaS solution run on the AWS Cloud that permits pay TV operators to manage transition while maintaining profitability in a rapidly changing market. He is currently working on solutions that help operators take advantage of key technology and industry trends like OTT video, IoT, and 5G. In addition to delivering cloud-based solutions, he continues his journey of learning how to play golf.


Upgrade your Amazon Polly voices to neural with one line of code

In 2019, Amazon Polly launched neural text-to-speech (NTTS) voices in US English and UK English. Neural voices use machine learning and provide a richer, more lifelike speech quality. Since the initial launch of NTTS, Amazon Polly has extended its neural offering by adding new voices in US Spanish, Brazilian Portuguese, Australian English, Canadian French, German, and Korean. Some of them are also available in a Newscaster speaking style tailored to the specific needs of publishers.

If you've been using the standard voices in Amazon Polly, upgrading to neural voices is easy. No matter which programming language you use, the upgrade only requires a simple addition or modification of the Engine parameter wherever you use the SynthesizeSpeech or StartSpeechSynthesisTask API in your code. In this post, you'll learn about the benefits of neural voices and how to migrate your voices to NTTS.

Benefits of neural vs. standard

Because neural voices provide a more expressive, natural-sounding quality than standard, migrating to neural improves the user experience and boosts engagement.

“We rely on speech synthesis to drive dynamic narrations for our educational content,” says Paul S. Ziegler, Chief Executive Officer at Reflare. “The switch from Amazon Polly’s standard to neural voices has allowed us to create narrations that are so good as to consistently be indistinguishable from human speech to non-native speakers and to occasionally even fool native speakers.”

The following is an example of Joanna’s standard voice.

The following is an example of the same words, but using Joanna’s neural voice.

“Switching to neural voices is as easy as switching to other non-neural voices,” Ziegler says. “Since our systems were already set up to automatically generate voiceovers on the fly, implementing the changes took less than 5 minutes.”

Quick migration checklist

Not all SSML tags, Regions, and languages support neural voices. Before making the switch, use this checklist to verify that NTTS is available for your specific business needs:

  • Regional support – Verify that you’re making requests in Regions that support NTTS
  • Language and voice support – Verify that you’re making requests to voices and languages that support NTTS by checking the current list of voices and languages
  • SSML tag support – Verify that the SSML tags in your requests are supported by NTTS by checking SSML tag compatibility

Additional considerations

The following table summarizes additional considerations before you switch to NTTS.

                      Standard                         Neural
Cost                  $4 per million characters        $16 per million characters
Free Tier             5 million characters per month   1 million characters per month
Default Sample Rate   22 kHz                           24 kHz
Usage Quota           See Quotas in Amazon Polly       See Quotas in Amazon Polly

Code samples

If you’re already using Amazon Polly standard, the following samples demonstrate how to switch to neural for all SDKs. The required change is highlighted in bold.

Go:

input := &polly.SynthesizeSpeechInput{
    OutputFormat: aws.String("mp3"),
    Text:         aws.String("Hello World!"),
    VoiceId:      aws.String("Joanna"),
    Engine:       aws.String("neural")}

Java:

SynthesizeSpeechRequest synthReq = SynthesizeSpeechRequest.builder()
    .text("Hello World!")
    .voiceId("Joanna")
    .outputFormat("mp3")
    .engine("neural")
    .build();
ResponseInputStream<SynthesizeSpeechResponse> synthRes = polly.synthesizeSpeech(synthReq);

Javascript:

polly.synthesizeSpeech({
    Text: "Hello World!",
    OutputFormat: "mp3",
    VoiceId: "Joanna",
    TextType: "text",
    Engine: "neural"});

.NET:

var response = client.SynthesizeSpeech(new SynthesizeSpeechRequest 
{
    Text = "Hello World!",
    OutputFormat = "mp3",
    VoiceId = "Joanna",
    Engine = "neural"
});

PHP:

$result = $client->synthesizeSpeech([
    'Text' => 'Hello world!',
    'OutputFormat' => 'mp3',
    'VoiceId' => 'Joanna',
    'Engine' => 'neural']);

Python:

polly.synthesize_speech(
    Text="Hello world!",
    OutputFormat="mp3",
    VoiceId="Joanna",
    Engine="neural")

Ruby:

resp = polly.synthesize_speech({
    text: "Hello World!",
    output_format: "mp3",
    voice_id: "Joanna",
    engine: "neural"
  })

Conclusion

You can start playing with neural voices immediately on the Amazon Polly console. If you have any questions or concerns, post them to the AWS Forum for Amazon Polly, or contact your AWS Support team.


About the Author

Marta Smolarek is a Senior Program Manager in the Amazon Text-to-Speech team. Outside of work, she loves to go camping with her family.


Extend Amazon SageMaker Pipelines to include custom steps using callback steps

Launched at AWS re:Invent 2020, Amazon SageMaker Pipelines is the first purpose-built, easy-to-use continuous integration and continuous delivery (CI/CD) service for machine learning (ML). With Pipelines, you can create, automate, and manage end-to-end ML workflows at scale.

You can extend your pipelines to include steps for tasks performed outside of Amazon SageMaker by taking advantage of custom callback steps. This feature lets you include tasks that are performed using other AWS services, third parties, or tasks run outside AWS. Before the launch of this feature, steps within a pipeline were limited to the supported native SageMaker steps. With the launch of this new feature, you can use the new CallbackStep to generate a token and add a message to an Amazon Simple Queue Service (Amazon SQS) queue. The message on the SQS queue triggers a task outside of the currently supported native steps. When that task is complete, you can call the new SendPipelineExecutionStepSuccess API with the generated token to signal that the callback step and corresponding tasks are finished and the pipeline run can continue.

In this post, we demonstrate how to use CallbackStep to perform data preprocessing using AWS Glue. We use an Apache Spark job to prepare NYC taxi data for ML training. The raw data has one row per taxi trip, and shows information like the trip duration, number of passengers, and trip cost. To train an anomaly detection model, we want to transform the raw data into a count of the number of passengers that took taxi rides over 30-minute intervals.

Although we could run this specific Spark job in SageMaker Processing, we use AWS Glue for this post. In some cases, we may need capabilities that Amazon EMR or AWS Glue offer, like support for Hive queries or integration with the AWS Glue metadata catalog, so we demonstrate how to invoke AWS Glue from the pipeline.

Solution overview

The pipeline step that launches the AWS Glue job sends a message to an SQS queue. The message contains the callback token we need to send success or failure information back to the pipeline. This callback token triggers the next step in the pipeline. When handling this message, we need a handler that can launch the AWS Glue job and reliably check for job status until the job completes. We have to keep in mind that a Spark job can easily take longer than 15 minutes (the maximum duration of a single AWS Lambda function invocation), and the Spark job itself could fail for a number of reasons. That last point is worth emphasizing: in most Apache Spark runtimes, the job code itself runs in transient containers under the control of a coordinator like Apache YARN. We can’t add custom code to YARN, so we need something outside the job to check for completion.

We can accomplish this task several ways:

  • Have a Lambda function launch a container task that creates the AWS Glue job and polls for job completion, then sends the callback back to the pipeline
  • Have a Lambda function send a work notification to another SQS queue, with a separate Lambda function that picks up the message, checks for job status, and requeues the message if the job isn’t complete
  • Use AWS Glue job event notifications to respond to job status events sent by AWS Glue

For this post, we use the first technique because it’s the simplest (but likely not the most efficient). For this, we build out the solution as shown in the following diagram.

The solution is one example of how to use the new CallbackStep to extend your pipeline to steps outside SageMaker (such as AWS Glue). You can apply the same general steps and architectural guidance to extend pipelines to other custom processes or tasks. In our solution, the pipeline runs the following tasks:

  • Data preprocessing – This step (Step 1 in the preceding diagram) uses CallbackStep to send a generated token and defined input payload to the configured SQS queue (2). In this example, the input sent to the SQS queue is the Amazon Simple Storage Service (Amazon S3) locations of the input data and the step output training data.
    • The new message in the SQS queue triggers a Lambda function (3) that is responsible for running an AWS Fargate task with Amazon Elastic Container Service (Amazon ECS) (4).
    • The Fargate task runs using a container image that is configured to run a task. The task in this case is an AWS Glue job (5) used to transform your raw data into training data stored in Amazon S3 (6). This task is also responsible for sending a callback message that signals either the job's success or failure.
  • Model training – This step (7) runs when the previous callback step has completed successfully. It uses the generated training data to train a model using a SageMaker training job and the Random Cut Forest algorithm.
  • Package model – After the model is successfully trained, the model is packaged for deployment (8).
  • Deploy model – In this final step (9), the model is deployed using a batch transform job.

These pipeline steps are just examples; you can modify the pipeline to meet your use case, such as adding steps to register the model in the SageMaker Model Registry.

In the next sections, we discuss how to set up this solution.

Prerequisites

For the preceding pipeline, you need the prerequisites outlined in this section. The detailed setup of each of these prerequisites is available in the supporting notebook.

Notebook dependencies

To run the provided notebook, you need the following:

Pipeline dependencies

Your pipeline uses the following services:

  • SQS message queue – The callback step requires an SQS queue to trigger a task. For this, you need to create an SQS queue and ensure that AWS Identity and Access Management (IAM) permissions are in place that allow SageMaker to put a message in the queue and allow Lambda to poll the queue for new messages. See the following code:
sqs_client = boto3.client('sqs')
queue_url = ''
queue_name = 'pipeline_callbacks_glue_prep'
try:
    response = sqs_client.create_queue(QueueName=queue_name)
    queue_url = response['QueueUrl']
except Exception as e:
    print(f"Failed to create queue: {e}")
  • Lambda function – The function is triggered by new messages put to the SQS queue. The function consumes these new messages and starts the ECS Fargate task. In this case, the Lambda execution IAM role needs permissions to pull messages from Amazon SQS, notify SageMaker of potential failures, and run the Amazon ECS task. For this solution, the function starts a task on ECS Fargate using the following code:
%%writefile queue_handler.py
import json
import boto3
import os
import traceback

ecs = boto3.client('ecs')
sagemaker = boto3.client('sagemaker')

def handler(event, context):   
    print(f"Got event: {json.dumps(event)}")
    
    cluster_arn = os.environ["cluster_arn"]
    task_arn = os.environ["task_arn"]
    task_subnets = os.environ["task_subnets"]
    task_sgs = os.environ["task_sgs"]
    glue_job_name = os.environ["glue_job_name"]
    print(f"Cluster ARN: {cluster_arn}")
    print(f"Task ARN: {task_arn}")
    print(f"Task Subnets: {task_subnets}")
    print(f"Task SG: {task_sgs}")
    print(f"Glue job name: {glue_job_name}")
    
    for record in event['Records']:
        payload = json.loads(record["body"])
        print(f"Processing record {payload}")
        
        token = payload["token"]
        print(f"Got token {token}")
        
        try:
            input_data_s3_uri = payload["arguments"]["input_location"]
            output_data_s3_uri = payload["arguments"]["output_location"]
            print(f"Got input_data_s3_uri {input_data_s3_uri}")
            print(f"Got output_data_s3_uri {output_data_s3_uri}")

            response = ecs.run_task(
                cluster = cluster_arn,
                count=1,
                launchType='FARGATE',
                taskDefinition=task_arn,
                networkConfiguration={
                    'awsvpcConfiguration': {
                        'subnets': task_subnets.split(','),
                        'securityGroups': task_sgs.split(','),
                        'assignPublicIp': 'ENABLED'
                    }
                },
                overrides={
                    'containerOverrides': [
                        {
                            'name': 'FargateTask',
                            'environment': [
                                {
                                    'name': 'inputLocation',
                                    'value': input_data_s3_uri
                                },
                                {
                                    'name': 'outputLocation',
                                    'value': output_data_s3_uri
                                },
                                {
                                    'name': 'token',
                                    'value': token
                                },
                                {
                                    'name': 'glue_job_name',
                                    'value': glue_job_name
                                }
                                
                            ]
                        }
                    ]
                }
            )
            if 'failures' in response and len(response['failures']) > 0:
                f = response['failures'][0]
                print(f"Failed to launch task for token {token}: {f['reason']}")
                sagemaker.send_pipeline_execution_step_failure(
                    CallbackToken=token,
                    FailureReason=f['reason']
                )
            else:
                print(f"Launched task {response['tasks'][0]['taskArn']}")
        except Exception as e:
            trc = traceback.format_exc()
            print(f"Error handling record: {str(e)}: {trc}")
            sagemaker.send_pipeline_execution_step_failure(
                CallbackToken=token,
                FailureReason=str(e)
            )
  • After we create the SQS queue and Lambda function, we need to set up the function as an SQS target so that when new messages are placed in the queue, the function is automatically triggered:
lambda_client.create_event_source_mapping(
    EventSourceArn=f'arn:aws:sqs:{region}:{account}:{queue_name}',
    FunctionName='SMPipelineQueueHandler',
    Enabled=True,
    BatchSize=10
) 
  • Fargate cluster – Because we use Amazon ECS to run and monitor the status of the AWS Glue job, we need to ensure we have an ECS Fargate cluster running:
import boto3
ecs = boto3.client('ecs')
response = ecs.create_cluster(clusterName='FargateTaskRunner')
  • Fargate task – We also need to create a container image with the code (task.py) that starts the data preprocessing job on AWS Glue and reports the status back to the pipeline upon the success or failure of that task. The IAM role attached to the task must include permissions that allow the task to pull images from Amazon ECR, create logs in Amazon CloudWatch, start and monitor an AWS Glue job, and send the callback token when the task is complete. When we issue send_pipeline_execution_step_success back to the pipeline, we also indicate the output file with the prepared training data. We use the output parameter in the model training step in the pipeline. The following is the code for task.py:
import boto3
import os
import sys
import traceback
import time

if 'inputLocation' in os.environ:
    input_uri = os.environ['inputLocation']
else:
    print("inputLocation not found in environment")
    sys.exit(1)
if 'outputLocation' in os.environ:
    output_uri = os.environ['outputLocation']
else:
    print("outputLocation not found in environment")
    sys.exit(1)
if 'token' in os.environ:
    token = os.environ['token']
else:
    print("token not found in environment")
    sys.exit(1)
if 'glue_job_name' in os.environ:
    glue_job_name = os.environ['glue_job_name']
else:
    print("glue_job_name not found in environment")
    sys.exit(1)

print(f"Processing from {input_uri} to {output_uri} using callback token {token}")
sagemaker = boto3.client('sagemaker')
glue = boto3.client('glue')

poll_interval = 60

try:
    
    t1 = time.time()
    response = glue.start_job_run(
        JobName=glue_job_name,
        Arguments={
            '--output_uri': output_uri,
            '--input_uri': input_uri
        }
    )
    job_run_id = response['JobRunId']
    print(f"Starting job {job_run_id}")
    
    job_status = 'STARTING'
    job_error = ''
    while job_status in ['STARTING','RUNNING','STOPPING']:
        time.sleep(poll_interval)
        response = glue.get_job_run(
            JobName=glue_job_name,
            RunId=job_run_id,
            PredecessorsIncluded=False
        )
        job_status = response['JobRun']['JobRunState']
        if 'ErrorMessage' in response['JobRun']:
            job_error = response['JobRun']['ErrorMessage']
        print(f"Job is in state {job_status}")
        
    t2 = time.time()
    total_time = (t2 - t1) / 60.0
    if job_status == 'SUCCEEDED':
        print("Job succeeded")
        sagemaker.send_pipeline_execution_step_success(
            CallbackToken=token,
            OutputParameters=[
                {
                    'Name': 'minutes',
                    'Value': str(total_time)
                },
                {
                    'Name': 's3_data_out',
                    'Value': str(output_uri),
                } 
            ]
        )
    else:
        print(f"Job failed: {job_error}")
        sagemaker.send_pipeline_execution_step_failure(
            CallbackToken=token,
            FailureReason = job_error
        )
except Exception as e:
    trc = traceback.format_exc()
    print(f"Error running ETL job: {str(e)}: {trc}")
    sagemaker.send_pipeline_execution_step_failure(
        CallbackToken=token,
        FailureReason = str(e)
    )
  • Data preprocessing code – The pipeline callback step does the actual data preprocessing using a PySpark job running in AWS Glue, so we need to create the code that is used to transform the data:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'input_uri', 'output_uri'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

df = spark.read.format("csv").option("header", "true").load("{0}*.csv".format(args['input_uri']))
df = df.withColumn("Passengers", df["passenger_count"].cast(IntegerType()))
df = df.withColumn(
  'pickup_time',
  F.to_timestamp(
  F.unix_timestamp('tpep_pickup_datetime', 'yyyy-MM-dd HH:mm:ss').cast('timestamp')))
  
dfW = df.groupBy(F.window("pickup_time", "30 minutes")).agg(F.sum("Passengers").alias("passenger"))
dfOut = dfW.drop('window')
dfOut.repartition(1).write.option("timestampFormat", "yyyy-MM-dd HH:mm:ss").csv(args['output_uri'])

job.commit()
  • Data preprocessing job – We also need to configure the AWS Glue job that runs the preceding code when triggered by your Fargate task. The IAM role used must have permissions to read from and write to the S3 bucket. See the following code:
glue = boto3.client('glue')
response = glue.create_job(
    Name='GlueDataPrepForPipeline',
    Description='Prepare data for SageMaker training',
    Role=glue_role_arn,
    ExecutionProperty={
        'MaxConcurrentRuns': 1
    },
    Command={
        'Name': 'glueetl',
        'ScriptLocation': glue_script_location,
    },
    MaxRetries=0,
    Timeout=60,
    MaxCapacity=10.0,
    GlueVersion='2.0'
)
glue_job_name = response['Name']

After these prerequisites are in place, including the necessary IAM permissions outlined in the example notebook, we’re ready to configure and run the pipeline.

Configure the pipeline

To build out the pipeline, we rely on the preceding prerequisites in the callback step that perform data processing. We also combine that with steps native to SageMaker for model training and deployment to create an end-to-end pipeline.

To configure the pipeline, complete the following steps:

  1. Initialize the pipeline parameters:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

input_data = ParameterString(
    name="InputData",
    default_value=f"s3://{default_bucket}/{taxi_prefix}/"
)
id_out = ParameterString(
    name="IdOut",
    default_value="taxiout"+ str(timestamp)
)
output_data = ParameterString(
    name="OutputData",
    default_value=f"s3://{default_bucket}/{taxi_prefix}_output/"
)
training_instance_count = ParameterInteger(
    name="TrainingInstanceCount",
    default_value=1
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.c5.xlarge"
)
  2. Configure the first step in the pipeline, which is CallbackStep.

This step uses the SQS queue created in the prerequisites in combination with arguments that are used by tasks in this step. These arguments include the inputs of the Amazon S3 location of the input (raw taxi data) and output training data. The step also defines the outputs, which in this case includes the callback output and Amazon S3 location of the training data. The outputs become the inputs to the next step in the pipeline. See the following code:

from sagemaker.workflow.callback_step import CallbackStep,CallbackOutput,CallbackOutputTypeEnum

callback1_output=CallbackOutput(output_name="s3_data_out", output_type=CallbackOutputTypeEnum.String)

step_callback_data = CallbackStep(
    name="GluePrepCallbackStep",
    sqs_queue_url=queue_url,
    inputs={
        "input_location": f"s3://{default_bucket}/{taxi_prefix}/",
        "output_location": f"s3://{default_bucket}/{taxi_prefix}_{id_out}/"
    },
    outputs=[
        callback1_output
    ],
)
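The pipeline doesn't start the AWS Glue job itself: when this step runs, SageMaker places a message containing a callback token and the step's input arguments on the SQS queue, and a worker (typically the Lambda function from the prerequisites) runs the job and reports back. The following is a rough sketch of that message handling; the exact message field names depend on the example notebook, so treat them as assumptions:

```python
import json


def build_step_success(message_body):
    """Parse a CallbackStep SQS message and build the arguments for
    SageMaker's SendPipelineExecutionStepSuccess API call.

    In a real deployment this runs inside the worker that polls the
    queue; only the message handling is shown here, not the Glue job
    invocation itself.
    """
    msg = json.loads(message_body)
    token = msg["token"]        # callback token issued by the pipeline
    args = msg["arguments"]     # the inputs dict declared on CallbackStep
    # After the Glue job finishes, report the declared outputs back.
    return {
        "CallbackToken": token,
        "OutputParameters": [
            {"Name": "s3_data_out", "Value": args["output_location"]},
        ],
    }


# The worker would then call:
# boto3.client("sagemaker").send_pipeline_execution_step_success(**payload)
```

On failure, the worker would instead call send_pipeline_execution_step_failure with the same token so the pipeline marks the step as failed rather than hanging.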
  3. Use TrainingStep to train a model with the Random Cut Forest (RCF) algorithm.

We first configure an estimator, then configure the pipeline step itself. The step takes the Amazon S3 location of the training data created by AWS Glue (the output of the previous callback step) as input to train the model. See the following code:

containers = {
    'us-west-2': '174872318107.dkr.ecr.us-west-2.amazonaws.com/randomcutforest:latest',
    'us-east-1': '382416733822.dkr.ecr.us-east-1.amazonaws.com/randomcutforest:latest',
    'us-east-2': '404615174143.dkr.ecr.us-east-2.amazonaws.com/randomcutforest:latest',
    'eu-west-1': '438346466558.dkr.ecr.eu-west-1.amazonaws.com/randomcutforest:latest'}
region_name = boto3.Session().region_name
container = containers[region_name]
model_prefix = 'model'

session = sagemaker.Session()

rcf = sagemaker.estimator.Estimator(
    container,
    sagemaker.get_execution_role(),
    output_path='s3://{}/{}/output'.format(default_bucket, model_prefix),
    instance_count=training_instance_count,
    instance_type=training_instance_type,
    sagemaker_session=session)

rcf.set_hyperparameters(
    num_samples_per_tree=200,
    num_trees=50,
    feature_dim=1)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

step_train = TrainingStep(
    name="TrainModel",
    estimator=rcf,
    inputs={
        "train": TrainingInput(
            # s3_data = output of the previous callback step
            s3_data=step_callback_data.properties.Outputs['s3_data_out'],
            content_type="text/csv;label_size=0",
            distribution='ShardedByS3Key'
        ),
    },
)
  4. Use CreateModelStep to package the model for SageMaker deployment:
from sagemaker.model import Model
from sagemaker import get_execution_role
role = get_execution_role()

image_uri = sagemaker.image_uris.retrieve("randomcutforest", region_name)

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=session,
    role=role,
)
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep

inputs = CreateModelInput(
    instance_type="ml.m5.large",
)

create_model = CreateModelStep(
    name="TaxiModel",
    model=model,
    inputs=inputs,
)
  5. Deploy the trained model with a SageMaker batch transform job using TransformStep.

This step loads the trained model and processes the prediction request data stored in Amazon S3, then outputs the results (anomaly scores in this case) to the specified Amazon S3 location. See the following code:

base_uri = step_callback_data.properties.Outputs['s3_data_out']
output_prefix = 'batch-out'

from sagemaker.transformer import Transformer

transformer = Transformer(
    model_name=create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    assemble_with="Line",
    accept="text/csv",
    instance_count=1,
    output_path=f"s3://{default_bucket}/{output_prefix}/",
)
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

batch_data=step_callback_data.properties.Outputs['s3_data_out']

step_transform = TransformStep(
    name="TaxiTransform",
    transformer=transformer,
    inputs=TransformInput(
        data=batch_data,
        content_type="text/csv",
        split_type="Line",
        input_filter="$[0]",
        join_source="Input",
        output_filter="$[0,-1]",
    ),
)

Create and run the pipeline

You’re now ready to create and run the pipeline. To do this, complete the following steps:

  1. Define the pipeline including the parameters accepted and steps:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = f"GluePipeline-{id_out}"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        input_data,
        training_instance_type,
        training_instance_count,
        id_out,
    ],
    steps=[step_callback_data, step_train, create_model, step_transform],
)
  2. Submit the pipeline definition to create the pipeline, using the role that is used to create all the jobs defined in each step:
from sagemaker import get_execution_role
pipeline.upsert(role_arn = get_execution_role())
  3. Run the pipeline:
execution = pipeline.start()

You can monitor your pipeline using the SageMaker SDK, for example with execution.list_steps(), or on the Studio console, as shown in the following screenshot.
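execution.list_steps() returns a list of step detail dictionaries (newest step first). A small helper like the following, a convenience sketch rather than part of the SDK, condenses that into a quick status overview:

```python
def summarize_steps(steps):
    """Map each pipeline step name to its status, given the list of
    dicts returned by execution.list_steps()."""
    return {s["StepName"]: s["StepStatus"] for s in steps}


# Usage against a live execution:
# summarize_steps(execution.list_steps())
```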

Use CallbackStep to integrate other tasks outside of SageMaker

You can follow the same pattern to integrate any long-running tasks or jobs with Pipelines. This may include running AWS Batch jobs, Amazon EMR job flows, or Amazon ECS or Fargate tasks.

You can also implement an email approval step for your models as part of your ML pipeline. In this pattern, CallbackStep runs after the model EvaluationStep and sends the user an email containing model metrics along with approve and reject links. The workflow progresses to the next step after the user approves the task.

You can implement this pattern using a Lambda function and Amazon Simple Notification Service (Amazon SNS).
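As a sketch of that pattern: the callback worker could publish the metrics plus approve/reject links to an SNS topic, embedding the callback token in each link so the approval endpoint (for example, an API Gateway-backed Lambda, hypothetical here) knows which step to resume. The message construction might look like the following; the URL scheme and endpoint are assumptions, not part of any AWS API:

```python
def build_approval_message(callback_token, metrics, endpoint_base):
    """Build the email body for a manual-approval CallbackStep.

    `endpoint_base` is a hypothetical HTTPS endpoint that, when clicked,
    calls send_pipeline_execution_step_success (approve) or
    send_pipeline_execution_step_failure (reject) with the token.
    """
    lines = ["Model evaluation metrics:"]
    for name, value in sorted(metrics.items()):
        lines.append(f"  {name}: {value}")
    lines.append(f"Approve: {endpoint_base}/approve?token={callback_token}")
    lines.append(f"Reject:  {endpoint_base}/reject?token={callback_token}")
    return "\n".join(lines)


# The worker would publish this with:
# boto3.client("sns").publish(TopicArn=topic_arn,
#                             Subject="Model approval requested",
#                             Message=body)
```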

Conclusion

In this post, we showed you an example of how to use CallbackStep in Pipelines to extend your pipelines to integrate an AWS Glue job for data preprocessing. You can follow the same process to integrate any task or job outside of SageMaker, and you can walk through the full solution in the example notebook.


About the Author

Shelbee Eigenbrode is a Principal AI and Machine Learning Specialist Solutions Architect at Amazon Web Services (AWS). She holds 6 AWS certifications and has been in technology for 23 years spanning multiple industries, technologies, and roles. She is currently focusing on combining her DevOps and ML background to deliver and manage ML workloads at scale. With over 35 patents granted across various technology domains, she has a passion for continuous innovation and using data to drive business outcomes. Shelbee co-founded the Denver chapter of Women in Big Data.


Sofian Hamiti is an AI/ML specialist Solutions Architect at AWS. He helps customers across industries accelerate their AI/ML journey by helping them build and operationalize end-to-end machine learning solutions.


Randy DeFauw is a principal solutions architect at Amazon Web Services. He works with AWS customers to provide guidance and technical assistance on database projects, helping them improve the value of their solutions when using AWS.


Payton Staub is a senior engineer with Amazon SageMaker. His current focus includes model building pipelines, experiment management, image management and other tools to help customers productionize and automate machine learning at scale.
