Arcanum makes Hungarian heritage accessible with Amazon Rekognition

Arcanum specializes in digitizing Hungarian language content, including newspapers, books, maps, and art. With over 30 years of experience, Arcanum serves more than 30,000 global subscribers with access to Hungarian culture, history, and heritage.

Amazon Rekognition Solutions Architects worked with Arcanum to add highly scalable image analysis to Hungaricana, a free service provided by Arcanum that enables you to search and explore Hungarian cultural heritage, including 600,000 faces across 500,000 images. For example, you can find historical works by author Mór Jókai or photos on topics like weddings. The Arcanum team chose Amazon Rekognition to free valuable staff from time- and cost-intensive manual labeling and to improve label accuracy, making 200,000 previously unsearchable images (approximately 40% of the image inventory) available to users.

Amazon Rekognition makes it easy to add image and video analysis to your applications using highly scalable machine learning (ML) technology that requires no previous ML expertise to use. Amazon Rekognition also provides highly accurate facial recognition and facial search capabilities to detect, analyze, and compare faces.

Arcanum uses this facial recognition feature in their image database services to help you find particular people in Arcanum’s articles. This post discusses their challenges and why they chose Amazon Rekognition as their solution.

Automated image labeling challenges

Arcanum dedicated a team of three people to start tagging and labeling content for Hungaricana. The team quickly learned that they would need to invest more than 3 months of time-consuming and repetitive human labor to provide accurate search capabilities to their customers. Considering the size of the team and the scope of the existing project, Arcanum needed a better solution that would automate image and object labeling at scale.

Automated image labeling solutions

To speed up and automate image labeling, Arcanum turned to Amazon Rekognition to enable users to search photos by keywords (for example, type of historic event, place name, or a person relevant to Hungarian history).

For the Hungaricana project, preprocessing all the images was challenging. Arcanum ran a TensorFlow face search across all 28 million pages on a machine with 8 GPUs in their own offices to extract only faces from images.

The following screenshot shows what an extract looks like (image provided by Arcanum Database Ltd).

The images containing only faces are sent to Amazon Rekognition with the IndexFaces operation, which adds each face to a collection. For each face that is detected in the input image, Amazon Rekognition extracts facial features into a feature vector and stores it in an Amazon Aurora database. Amazon Rekognition uses feature vectors when it performs face match and search operations using the SearchFaces and SearchFacesByImage operations.
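The indexing and search calls described above can be sketched with the AWS SDK for Python. This is a minimal illustration, not Arcanum's code: the collection name, the use of ExternalImageId to point back to a source page, and the threshold and limit values are assumptions. The client is passed in so the functions can be exercised without AWS credentials.

```python
def index_face(client, collection_id, image_bytes, external_id):
    """Add the face found in one pre-cropped image to a face collection.

    `client` is expected to be boto3.client("rekognition").
    """
    response = client.index_faces(
        CollectionId=collection_id,
        Image={"Bytes": image_bytes},
        ExternalImageId=external_id,  # assumption: an ID that links back to the source page
        MaxFaces=1,                   # assumption: each crop holds a single face
        QualityFilter="AUTO",
    )
    return [record["Face"]["FaceId"] for record in response["FaceRecords"]]


def search_similar_faces(client, collection_id, image_bytes, threshold=90.0, limit=24):
    """Return (ExternalImageId, similarity) pairs for faces similar to the query image."""
    response = client.search_faces_by_image(
        CollectionId=collection_id,
        Image={"Bytes": image_bytes},
        FaceMatchThreshold=threshold,  # assumption: illustrative similarity cutoff
        MaxFaces=limit,
    )
    return [
        (match["Face"].get("ExternalImageId"), match["Similarity"])
        for match in response["FaceMatches"]
    ]
```

With real credentials, you would create the client with boto3.client("rekognition") and create the collection once before indexing.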

The image preprocessing helped create a very efficient and cost-effective way to index faces. The following diagram summarizes the preprocessing workflow.

As for the web application, the workflow starts with a Hungaricana user making a face search request. The following diagram illustrates the application workflow.

The workflow includes the following steps:

  1. The user requests a facial match by uploading an image. Elastic Load Balancing automatically distributes the web request across the web server fleet.
  2. Amazon Elastic Compute Cloud (Amazon EC2) powers application servers that handle the user request.
  3. The uploaded image is stored in Amazon Simple Storage Service (Amazon S3).
  4. Amazon Rekognition indexes the face and runs SearchFaces to look for a face similar to the new face ID.
  5. The output of the search face by image operation is stored in Amazon ElastiCache, a fully managed in-memory data store.
  6. The metadata of the indexed faces is stored in an Aurora relational database built for the cloud.
  7. The resulting face thumbnails are served to the customer via the fast content-delivery network (CDN) service Amazon CloudFront.
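The caching step in the workflow above can be pictured as a cache-aside lookup. The sketch below is only an illustration under stated assumptions: a plain dict stands in for Amazon ElastiCache, the key scheme (a SHA-256 hash of the uploaded image) is hypothetical, and the search results are assumed to be JSON serializable.

```python
import hashlib
import json


def cached_face_search(image_bytes, cache, search_fn):
    """Cache-aside lookup: reuse a stored search result for identical uploads.

    `cache` is any dict-like store (a stand-in for an in-memory data store),
    and `search_fn` performs the actual face search for the image bytes.
    """
    # Hypothetical key scheme: identical image bytes map to the same key.
    key = "face-search:" + hashlib.sha256(image_bytes).hexdigest()
    cached = cache.get(key)
    if cached is not None:
        return json.loads(cached)
    result = search_fn(image_bytes)
    cache[key] = json.dumps(result)
    return result
```

Repeated searches with the same image are then served from the cache instead of calling the face search again.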

Experimenting and live testing Hungaricana

During our test of Hungaricana, the application performed extremely well. The searches not only correctly identified people, but also provided links to all publications and sources in Arcanum’s privately owned database where the matched faces appear. For example, the following screenshot shows the results for the famous composer and pianist Franz Liszt.

The application returned 42 pages of 6×4 results. The results are capped at 1,000. The 100% scores are the confidence scores returned by Amazon Rekognition, rounded up to whole numbers.
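The page count follows from the grid size and the result cap. The short calculation below assumes that the cap of 1,000 is what limits this particular search.

```python
import math

RESULTS_PER_PAGE = 6 * 4   # one page is a 6x4 grid of thumbnails
RESULT_CAP = 1_000         # maximum number of results shown

# Number of pages needed to show every capped result.
pages = math.ceil(RESULT_CAP / RESULTS_PER_PAGE)
# Number of results left for the final, partially filled page.
last_page = RESULT_CAP - (pages - 1) * RESULTS_PER_PAGE

print(pages, last_page)  # 42 16
```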

Throughout our tests, Hungaricana returned results promptly and with a high degree of certainty, along with links to all corresponding publications.

Business results

By introducing Amazon Rekognition into their workflow, Arcanum enabled a better customer experience, including building family trees, searching for historical figures, and researching historical places and events.

The concept of face searching using artificial intelligence certainly isn’t new. But Hungaricana uses it in a very creative, unique way.

Amazon Rekognition allowed Arcanum to realize three distinct advantages:

  • Time savings – Time to market shortened dramatically. Instead of spending several months of intense manual labor to label all the images, the company can now do this job in a few days. Before, basic labeling of 150,000 images took three people months to complete.
  • Cost savings – Arcanum saved around $15,000 on the Hungaricana project. Before using Amazon Rekognition, there was no automation, so a human workforce had to scan all the images. Now, employees can shift their focus to other high-value tasks.
  • Improved accuracy – Users now have a much better experience regarding hit rates. Since Arcanum started using Amazon Rekognition, the number of hits has doubled. Before, out of 500,000 images, about 200,000 weren’t searchable. But with Amazon Rekognition, search is now possible for all 500,000 images.

 “Amazon Rekognition made Hungarian culture, history, and heritage more accessible to the world,” says Előd Biszak, Arcanum CEO. “It has made research a lot easier for customers building family trees, searching for historical figures, and researching historical places and events. We cannot wait to see what the future of artificial intelligence has to offer to enrich our content further.”

Conclusion

In this post, you learned how to add highly scalable face and image analysis to an enterprise-level image gallery to improve label accuracy, reduce costs, and save time.

You can test Amazon Rekognition features such as facial analysis, face comparison, or celebrity recognition on images specific to your use case on the Amazon Rekognition console.

For video presentations and tutorials, see Getting Started with Amazon Rekognition. For more information about Amazon Rekognition, see Amazon Rekognition Documentation.

About the Authors

Siniša Mikašinović is a Senior Solutions Architect at AWS Luxembourg, covering Central and Eastern Europe—a region full of opportunities, talented and innovative developers, ISVs, and startups. He helps customers adopt AWS services as well as acquire new skills, learn best practices, and succeed globally with the power of AWS. His areas of expertise are Game Tech and Microsoft on AWS. Siniša is a PowerShell enthusiast, a gamer, and a father of a small and very loud boy. He flies under the flags of Croatia and Serbia.

Cameron Peron is Senior Marketing Manager for AWS Amazon Rekognition and the AWS AI/ML community. He evangelizes how AI/ML innovation solves complex challenges facing community, enterprise, and startups alike. Out of the office, he enjoys staying active with kettlebell-sport, spending time with his family and friends, and is an avid fan of Euro-league basketball.

Securing Amazon SageMaker Studio connectivity using a private VPC

Amazon SageMaker Studio is the first fully integrated development environment (IDE) for machine learning (ML). With a single click, data scientists and developers can quickly spin up Amazon SageMaker Studio Notebooks for exploring datasets and building models. With the new ability to launch Amazon SageMaker Studio in your Amazon Virtual Private Cloud (Amazon VPC), you can control the data flow from your Amazon SageMaker Studio notebooks. This allows you to restrict internet access, monitor and inspect traffic using standard AWS networking and security capabilities, and connect to other AWS resources through AWS PrivateLink or VPC endpoints.

In this post, we explore how the Amazon SageMaker Studio VPC connectivity works, implement a sample architecture, and demonstrate some security controls in action.

Solution overview

When experimenting with and deploying ML workflows, you need access to multiple resources, such as libraries, packages, and datasets. If you’re in a highly regulated industry, controlling access to these resources is a paramount requirement. Amazon SageMaker Studio allows you to implement security in depth, with features such as data encryption, AWS Identity and Access Management (IAM), and AWS Single Sign-On (AWS SSO) integration. The ability to launch Amazon SageMaker Studio in your own private VPC adds another layer of security.

Amazon SageMaker Studio runs in an environment managed by AWS. When you launch a new Studio domain, the AppNetworkAccessType parameter defines the external connectivity for that domain. Previously, the only option available for this parameter was PublicInternetOnly, meaning the traffic from the notebook flowed through an AWS managed internet gateway, as described in the following diagram.

The Amazon Elastic File System (Amazon EFS) volumes that store the Studio users’ home directories reside in the customer VPC, even when AppNetworkAccessType=PublicInternetOnly. You can optionally specify which VPC and subnet to use.

With the newly introduced feature to launch Studio in your VPC, you can set the AppNetworkAccessType parameter to VpcOnly. This launches Studio inside the specified VPC, communicating with the domain through an elastic network interface (ENI). You can apply security groups to that ENI to enforce a first layer of security control.

You can also use VPC endpoints to establish a private connection between the Studio domain and other AWS services, such as Amazon Simple Storage Service (Amazon S3) for data storage and Amazon CloudWatch for logging and monitoring, without requiring internet connectivity. VPC endpoints can impose additional networking controls such as VPC endpoint IAM policies that may, for example, only allow traffic to certain S3 buckets. The following diagram illustrates this architecture.

Prerequisites

Before getting started, make sure you have the following prerequisites:

  • An AWS account
  • An IAM user or role with administrative access
  • Curiosity 🙂

Setting up your environment

To better understand how the feature works, we provide an AWS CloudFormation template to set up a basic environment where you can experiment with Amazon SageMaker Studio running inside a VPC. After deployment, the environment looks like the following diagram.

This template deploys the following resources in your account:

  • A new VPC, with a private subnet and security group. Because communication occurs across multiple Studio resources, this security group applied to the Studio ENI should allow inbound traffic to itself.
  • An encrypted S3 bucket, with bucket policies restricting access to our S3 endpoint.
  • VPC endpoints with policies for access control:
    • We use an Amazon S3 endpoint to demonstrate the ability to limit traffic to specific S3 buckets.
    • Because Studio has its traffic routed through the VPC, access to supporting services needs to be provisioned through VPC endpoints. Amazon CloudWatch Logs allows Studio to push logs generated by the service. We need an Amazon SageMaker API endpoint to launch Studio notebooks, training jobs, processing jobs, and deploy endpoints, and an Amazon SageMaker RunTime endpoint for services to call the Amazon SageMaker inference endpoint.
  • An IAM execution role. This role is assigned to Amazon SageMaker and defines which access permissions Studio has.

To set up your environment, click on the link below. The template is also available at this GitHub repo.

Creating an Amazon SageMaker Studio domain inside a VPC

With the infrastructure in place, you’re ready to create an Amazon SageMaker Studio domain and assign it to a VPC.

For more information about the options available to set up Studio, see Onboard to Amazon SageMaker Studio. If you have an existing domain, you might want to delete it and recreate it, or create a separate one.

To create the domain, you can use either the Amazon SageMaker console or the AWS CLI.

To use the console to create a Studio domain and tie it to the VPC infrastructure deployed by the template, complete the following steps:

  1. On the Amazon SageMaker console, choose SageMaker Studio.

If you don’t have a domain created, a screen appears.

  2. For Get Started, select Standard setup.
  3. For Authentication method, select AWS Identity and Access Management (IAM).
  4. For Execution role for all users, choose your notebook IAM role (the default is studiovpc-notebook-role).
  5. In the Network section, for VPC, choose your VPC (the default is studiovpc-vpc).
  6. For Subnet, choose your subnet (the default is studiovpc-private-subnet).

Make sure to not choose studiovpc-endpoint-private-subnet.

  7. For Network Access for Studio, select VPC Only.
  8. Choose Submit.

To create and link the domain with the AWS CLI, enter the following code. The option --app-network-access-type VpcOnly links the domain to our VPC. The VPC and subnets are set by the --vpc-id and --subnet-ids options, and the execution role and security group are set by the --default-user-settings option.

#Please replace the variables below according to your environment
REGION= #AWS Region where the Domain will be created
AWS_ACCOUNT_ID= #AWS Account ID
VPC_DOMAIN_NAME= #Select a name for your Domain

#The values below can be obtained on the "Outputs" tab of the CloudFormation stack deployed in the previous step
VPC_ID=
PRIVATE_SUBNET_IDS=
SECURITY_GROUP=
EXECUTION_ROLE_ARN=

#Now let's create the domain
aws sagemaker create-domain \
--region $REGION \
--domain-name $VPC_DOMAIN_NAME \
--vpc-id $VPC_ID \
--subnet-ids $PRIVATE_SUBNET_IDS \
--app-network-access-type VpcOnly \
--auth-mode IAM \
--default-user-settings "ExecutionRole=${EXECUTION_ROLE_ARN},SecurityGroups=${SECURITY_GROUP}"

#Please note the DomainArn output - the domain ID at the end of the ARN is used in the next step
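If you prefer the AWS SDK for Python, the same request can be sketched as follows. The helper names are hypothetical; the parameter names mirror the CreateDomain API. The request is built as a plain dictionary so you can inspect it before sending it with boto3.client("sagemaker").create_domain(**request).

```python
def build_create_domain_request(domain_name, vpc_id, subnet_ids, security_group,
                                execution_role_arn):
    """Build the keyword arguments for the SageMaker create_domain call."""
    return {
        "DomainName": domain_name,
        "AuthMode": "IAM",
        "VpcId": vpc_id,
        "SubnetIds": list(subnet_ids),
        # VpcOnly routes Studio traffic through the VPC instead of the public internet.
        "AppNetworkAccessType": "VpcOnly",
        "DefaultUserSettings": {
            "ExecutionRole": execution_role_arn,
            "SecurityGroups": [security_group],
        },
    }


def domain_id_from_arn(domain_arn):
    """Return the domain ID, which is the part of the domain ARN after the last slash."""
    return domain_arn.rsplit("/", 1)[-1]
```

The create_domain response contains DomainArn, and domain_id_from_arn extracts the ID that the user profile and pre-signed URL calls need.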

Creating a user profile

Now that the domain is created, we need to create a user profile. You can create multiple user profiles associated with a single domain.

To create your user profile on the console, complete the following steps:

  1. On the Amazon SageMaker Studio console, choose Control Panel.
  2. Choose Add user profile.
  3. For User name, enter a name (for example, demo-user).
  4. For Execution role, choose your IAM role (the default is studiovpc-notebook-role).

To create your user profile with the AWS CLI, enter the following code:

#Please replace the variables below according to your environment
DOMAIN_ID= #From previous step
USER_PROFILE_NAME= #Select a name for your user profile

#Now let's create the profile
aws sagemaker create-user-profile \
--region $REGION \
--domain-id $DOMAIN_ID \
--user-profile-name $USER_PROFILE_NAME

Accessing Amazon SageMaker Studio

We now have a Studio domain associated with our VPC and a user profile in this domain. Now we need to give access to the user. To do so, we create a pre-signed URL.

To use the console, on the Studio Control Panel, locate your user name and choose Open Studio.

To use the AWS CLI, enter the following code:

#Now let's create the pre-signed URL
aws sagemaker create-presigned-domain-url \
--region $REGION \
--domain-id $DOMAIN_ID \
--user-profile-name $USER_PROFILE_NAME

#Please take note of the URL in the output, and paste it in a browser that has VPC connectivity
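The user profile and pre-signed URL steps can also be sketched with the AWS SDK for Python. The function names are hypothetical, and the client is passed in (in practice boto3.client("sagemaker")) so the logic can be tried without AWS credentials. The sketch assumes the user profile has finished creating before you request the URL.

```python
def create_profile(client, domain_id, user_profile_name):
    """Create a user profile in the Studio domain and return its ARN."""
    response = client.create_user_profile(
        DomainId=domain_id,
        UserProfileName=user_profile_name,
    )
    return response["UserProfileArn"]


def presigned_studio_url(client, domain_id, user_profile_name):
    """Return a pre-signed URL that opens Studio for the given user profile."""
    response = client.create_presigned_domain_url(
        DomainId=domain_id,
        UserProfileName=user_profile_name,
    )
    return response["AuthorizedUrl"]
```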

At this point, our deployment looks like the following diagram.

We made it! Now you can use your browser to connect to the Amazon SageMaker Studio domain. After a few minutes, Studio finishes creating your environment and you’re greeted with the launcher screen (see the following screenshot).

Security controls

Some examples of security best practices are Amazon S3 access control and limiting internet ingress and egress. In this section, we see how to implement them in combination with running Amazon SageMaker Studio in a private VPC.

Amazon S3 access control

Developing ML models requires access to sensitive data stored on specific S3 buckets. You might want to implement controls to guarantee that:

  • Only specific Studio domains can access these buckets
  • Each Studio domain has access only to the defined S3 buckets

We can achieve this using the sample architecture provided in the CloudFormation template.

Our CloudFormation template created an S3 bucket with the following S3 bucket policy attached to it. The condition StringNotEquals evaluates the VPC endpoint ID with the effect set to deny, meaning that access to the S3 bucket is denied if the access doesn’t come from the designated VPC endpoint. You can find your specific bucket name on the AWS CloudFormation console, on the Outputs tab for the stack.

{
    "Version": "2008-10-17",
    "Statement": [
        {
            "Effect": "Deny",
            "Principal": "*",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::<s3-bucket-name>/*",
                "arn:aws:s3:::<s3-bucket-name>"
            ],
            "Condition": {
                "StringNotEquals": {
                    "aws:sourceVpce": "<s3-vpc-endpoint-id>"
                }
            }
        }
    ]
}

The Amazon S3 VPC endpoint also has a policy attached to it. This policy only allows access to the S3 bucket created by AWS CloudFormation:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": "*",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::<s3-bucket-name>",
                "arn:aws:s3:::<s3-bucket-name>/*"
            ]
        }
    ]
}

This combination of S3 bucket policy and VPC endpoint policy, together with Studio VPC connectivity, establishes that Studio can only access the referenced S3 bucket, and this S3 bucket can only be accessed from the VPC endpoint.
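The combined effect of the two policies can be illustrated with a small decision function. This is a simplified model, not the full IAM policy evaluation logic: it assumes the Studio execution role otherwise permits the S3 actions, and it only covers requests that involve the protected bucket or the Studio VPC endpoint. The function and parameter names are hypothetical.

```python
def s3_access_allowed(bucket, source_vpce, allowed_bucket, allowed_vpce):
    """Simplified view of the bucket policy plus the VPC endpoint policy.

    bucket         -- bucket the request targets
    source_vpce    -- VPC endpoint the request arrives through (None if outside the VPC)
    allowed_bucket -- the bucket created by the CloudFormation template
    allowed_vpce   -- the S3 VPC endpoint created by the CloudFormation template
    """
    # Bucket policy: explicit Deny unless the request comes through the designated endpoint.
    came_through_endpoint = source_vpce == allowed_vpce
    # Endpoint policy: traffic through the endpoint may only target the allowed bucket.
    targets_allowed_bucket = bucket == allowed_bucket
    return came_through_endpoint and targets_allowed_bucket
```

For example, a request from Studio to a different bucket fails the endpoint policy, and a request to the protected bucket from outside the VPC fails the bucket policy.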

To test it, open a notebook in Studio and try to copy a file into your S3 bucket. The following screenshot shows that it works as expected.

If you try the same with a different S3 bucket, you should get a permission denied error.

If you try to access the bucket from outside Studio, you should also get a permission error.

Limiting internet ingress and egress

To develop ML models, data scientists often need access to public code repos or Python packages (for example, from PyPI) to explore data and train models. If you need to restrict access to only approved datasets and libraries, you need to restrict internet access. In our sample architecture, we achieve this by using a private subnet on our VPC, without an internet gateway or NAT gateway deployed.

We can test this by trying to clone a public repository containing Amazon SageMaker example notebooks.

In your Studio environment, open a notebook and enter the following code:

! git clone https://github.com/awslabs/amazon-sagemaker-examples.git

You can also run it in your notebook directly.

As expected, the connection times out.
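You can also check outbound connectivity from a notebook cell without git. The following sketch tries to open a TCP connection with a short timeout; the host, port, and timeout values are illustrative, and the connect parameter exists only so the function can be exercised without network access.

```python
import socket


def can_reach(host, port=443, timeout=5.0, connect=socket.create_connection):
    """Return True if a TCP connection to host:port opens within the timeout."""
    try:
        connection = connect((host, port), timeout=timeout)
    except OSError:
        # Covers timeouts, refused connections, and DNS resolution failures.
        return False
    connection.close()
    return True
```

In the private subnet of the sample architecture, can_reach("github.com") is expected to return False, because there is no route to the internet.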

If you want to provide internet access through your VPC, just add an internet gateway and the proper routing entries. The internet traffic flows through your VPC, and you can implement other security controls such as inline inspections with a firewall or internet proxy. For more information, see Understanding Amazon SageMaker notebook instance networking configurations and advanced routing options.

Cleaning up

To avoid incurring future charges, delete the resources you created:

Conclusion

You can use Amazon SageMaker Studio to streamline developing, experimenting with, training, and deploying ML models. With the new ability to launch Studio inside a VPC, regulated industries such as financial services, healthcare, and others with strict security requirements can use Studio while meeting their enterprise security needs.

Go test this new feature and let us know what you think. For more information about Amazon SageMaker security, see the following:

About the Authors

Rafael Suguiura is a Principal Solutions Architect at Amazon Web Services. He guides some of the world’s largest financial services companies in their cloud journey. When the weather is nice, he enjoys cycling and finding new hiking trails— and when it’s not, he catches up with sci-fi books, TV series, and video games.

Stefan Natu is a Sr. Machine Learning Specialist at Amazon Web Services. He is focused on helping financial services customers build end-to-end machine learning solutions on AWS. In his spare time, he enjoys reading machine learning blogs, playing the guitar, and exploring the food scene in New York City.

Han Zhang is a Software Development Engineer at Amazon Web Services. She is part of the launch team for Amazon SageMaker Notebooks and Amazon SageMaker Studio, and has been focusing on building secure machine learning environments for customers. In her spare time, she enjoys hiking and skiing in the Pacific Northwest.

Using Amazon SageMaker inference pipelines with multi-model endpoints

Businesses are increasingly deploying multiple machine learning (ML) models to serve precise and accurate predictions to their consumers. Consider a media company that wants to provide recommendations to its subscribers. The company may want to employ different custom models for recommending different categories of products—such as movies, books, music, and articles. If the company wants to add personalization to the recommendations by using individual subscriber information, the number of custom models further increases. Hosting each custom model on a distinct compute instance is not only cost prohibitive, but also leads to underutilization of the hosting resources if not all models are frequently used.

Amazon SageMaker is a fully managed service that enables developers and data scientists to quickly and easily build, train, and deploy ML models at any scale. After you train an ML model, you can deploy it on Amazon SageMaker endpoints that are fully managed and can serve inferences in real time with low latency. Amazon SageMaker multi-model endpoints (MMEs) are a cost-effective solution to deploy a large number of ML models or per-user models. You can deploy multiple models on a single multi-model enabled endpoint such that all models share the compute resources and the serving container. You get significant cost savings and also simplify model deployments and updates. For more information about MME, see Save on inference costs by using Amazon SageMaker multi-model endpoints.

The following diagram depicts how MMEs work.

Multiple model artifacts are persisted in an Amazon S3 bucket. When a specific model is invoked, Amazon SageMaker dynamically loads it onto the container hosting the endpoint. If the model is already loaded in the container’s memory, invocation is faster because Amazon SageMaker doesn’t need to download and load it.
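To make the invocation concrete, the following sketch calls a multi-model endpoint and names the model to use through the TargetModel parameter of InvokeEndpoint. The function name, endpoint name, and artifact name are placeholders, and the client is passed in (in practice boto3.client("sagemaker-runtime")).

```python
def invoke_model(runtime_client, endpoint_name, target_model, csv_row):
    """Send one CSV record to a specific model hosted on a multi-model endpoint."""
    response = runtime_client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType="text/csv",
        # Path of the model artifact relative to the S3 prefix configured for the endpoint.
        TargetModel=target_model,
        Body=csv_row,
    )
    return response["Body"].read().decode("utf-8")
```

The first call for a given TargetModel may take longer because the model has to be loaded; later calls reuse the loaded model.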

Until now, you could use MME with several frameworks, such as TensorFlow, PyTorch, MXNet, and SKLearn, or build your own container with a multi-model server. This post introduces the following feature enhancements to MME:

  • MME support for Amazon SageMaker built-in algorithms – MME is now supported natively in the following popular Amazon SageMaker built-in algorithms: XGBoost, linear learner, RCF, and KNN. You can directly use the Amazon SageMaker provided containers while using these algorithms without having to build your own custom container.
  • MME support for Amazon SageMaker inference pipelines – The Amazon SageMaker inference pipeline model consists of a sequence of containers that serve inference requests by combining preprocessing, predictions, and postprocessing data science tasks. An inference pipeline allows you to reuse the same preprocessing code used during model training to process the inference request data used for predictions. You can now deploy an inference pipeline on an MME where one of the containers in the pipeline can dynamically serve requests based on the model being invoked.
  • IAM condition keys for granular access to models – Prior to this enhancement, an AWS Identity and Access Management (IAM) principal with InvokeEndpoint permission on the endpoint resource could invoke all the models hosted on that endpoint. Now, we support granular access to models using IAM condition keys. For example, the following IAM condition restricts the principal’s access to a model persisted in the Amazon Simple Storage Service (Amazon S3) bucket with company_a or common prefixes:
            "Condition": {
                "StringLike": {
                    "sagemaker:TargetModel": ["company_a/*", "common/*"]
                }
            }
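To see which model paths such a condition would match, you can approximate the StringLike wildcard matching locally. This is only an approximation for building intuition: fnmatchcase also gives special meaning to square brackets, which IAM does not, and the real decision is always made by IAM.

```python
from fnmatch import fnmatchcase

# Patterns taken from the condition above.
ALLOWED_PATTERNS = ["company_a/*", "common/*"]


def target_model_allowed(target_model, patterns=ALLOWED_PATTERNS):
    """Return True if the TargetModel value matches any allowed wildcard pattern."""
    return any(fnmatchcase(target_model, pattern) for pattern in patterns)


print(target_model_allowed("company_a/model.tar.gz"))  # True
print(target_model_allowed("company_b/model.tar.gz"))  # False
```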

We also provide a fully functional notebook to demonstrate these enhancements.

Walkthrough overview

To demonstrate these capabilities, the notebook discusses the use case of predicting house prices in multiple cities using linear regression. House prices are predicted based on features like number of bedrooms, number of garages, square footage, and more. Depending on the city, the features affect the house price differently. For example, small changes in the square footage cause a drastic change in house prices in New York City when compared to price changes in Houston.

For accurate house price predictions, we train multiple linear regression models, with a unique location-specific model per city. Each location-specific model is trained on synthetic housing data with randomly generated characteristics. To cost-effectively serve the multiple housing price prediction models, we deploy the models on a single multi-model enabled endpoint, as shown in the following diagram.

The walkthrough includes the following high-level steps:

  1. Examine the synthetic housing data generated.
  2. Preprocess the raw housing data using Scikit-learn.
  3. Train regression models using the built-in Amazon SageMaker linear learner algorithm.
  4. Create an Amazon SageMaker model with multi-model support.
  5. Create an Amazon SageMaker inference pipeline with an Sklearn model and multi-model enabled linear learner model.
  6. Test the inference pipeline by getting predictions from the different linear learner models.
  7. Update the MME with new models.
  8. Monitor the MME with Amazon CloudWatch.
  9. Explore fine-grained access to models hosted on the MME using IAM condition keys.

Other steps necessary to import libraries, set up IAM permissions, and use utility functions are defined in the notebook, which this post doesn’t discuss. You can walk through and run the code with the following notebook on the GitHub repo.

Examining the synthetic housing data

The dataset consists of six numerical features that capture the year the house was built, house size in square feet, number of bedrooms, number of bathrooms, lot size, number of garages, and two categorical features: deck and front porch, indicating whether these are present or not.

To see the raw data, enter the following code:

_houses.head()

The following screenshot shows the results.

You can now preprocess the categorical variables (front_porch and deck) using Scikit-learn.

Preprocessing the raw housing data

To preprocess the raw data, you first create an SKLearn estimator and use the sklearn_preprocessor.py script as the entry_point:

#Create the SKLearn estimator with the sklearn_preprocessor.py as the script
from sagemaker.sklearn.estimator import SKLearn
script_path = 'sklearn_preprocessor.py'
sklearn_preprocessor = SKLearn(
    entry_point=script_path,
    role=role,
    train_instance_type="ml.c4.xlarge",
    sagemaker_session=sagemaker_session_gamma)

You then launch multiple Scikit-learn training jobs to process the raw synthetic data generated for multiple locations. Before running the following code, take the training instance limits in your account and cost into consideration and adjust the PARALLEL_TRAINING_JOBS value accordingly:

from time import gmtime, strftime

preprocessor_transformers = []

for index, loc in enumerate(LOCATIONS[:PARALLEL_TRAINING_JOBS]):
    print("preprocessing fit input data at ", index , " for loc ", loc)
    job_name='scikit-learn-preprocessor-{}'.format(strftime('%Y-%m-%d-%H-%M-%S', gmtime()))
    
    sklearn_preprocessor.fit({'train': train_inputs[index]}, job_name=job_name, wait=True)
    
    # Once the preprocessor is fit, use a transformer to preprocess the raw training data and store the transformed data back in Amazon S3.
    transformer = sklearn_preprocessor.transformer(
        instance_count=1, 
        instance_type='ml.m4.xlarge',
        assemble_with='Line',
        accept='text/csv'
    )
    preprocessor_transformers.append(transformer)

When the preprocessors are properly fitted, preprocess the training data using batch transform to directly preprocess the raw data and store back into Amazon S3:

preprocessed_train_data_path = []

for index, transformer in enumerate(preprocessor_transformers):
    transformer.transform(train_inputs[index], content_type='text/csv')
    
    print('Launching batch transform job: {}'.format(transformer.latest_transform_job.job_name))
    preprocessed_train_data_path.append(transformer.output_path)

Training regression models

In this step, you train multiple models, one for each location.

Start by accessing the built-in linear learner algorithm:

import boto3
from sagemaker.amazon.amazon_estimator import get_image_uri
container = get_image_uri(boto3.Session().region_name, 'linear-learner')
container

Depending on the Region you’re using, you receive output similar to the following:

382416733822.dkr.ecr.us-east-1.amazonaws.com/linear-learner:1

Next, define a method to launch a training job for a single location using the Amazon SageMaker Estimator API. In the hyperparameter configuration, you use predictor_type='regressor' to indicate that you’re using the algorithm to train a regression model. See the following code:

def launch_training_job(location, transformer):
    """Launch a linear learner training job"""
    
    train_inputs = '{}/{}'.format(transformer.output_path, "train.csv")
    val_inputs = '{}/{}'.format(transformer.output_path, "val.csv")
    
    print("train_inputs:", train_inputs)
    print("val_inputs:", val_inputs)
     
    s3_output_path = 's3://{}/{}/model_artifacts/{}'.format(BUCKET, DATA_PREFIX, location)
    
    print("s3_output_path ", s3_output_path)
    
    linear_estimator = sagemaker.estimator.Estimator(
                            container,
                            role, 
                            train_instance_count=1, 
                            train_instance_type='ml.c4.xlarge',
                            output_path=s3_output_path,
                            sagemaker_session=sagemaker_session)
    
    linear_estimator.set_hyperparameters(
                           feature_dim=10,
                           mini_batch_size=100,
                           predictor_type='regressor',
                           epochs=10,
                           num_models=32,
                           loss='absolute_loss')
    DISTRIBUTION_MODE = 'FullyReplicated'
    train_input = sagemaker.s3_input(s3_data=train_inputs, 
           distribution=DISTRIBUTION_MODE, content_type='text/csv;label_size=1')
    val_input   = sagemaker.s3_input(s3_data=val_inputs,
           distribution=DISTRIBUTION_MODE, content_type='text/csv;label_size=1')
    
    remote_inputs = {'train': train_input, 'validation': val_input}
    linear_estimator.fit(remote_inputs, wait=False)
    return linear_estimator.latest_training_job.name

You can now start multiple model training jobs, one for each location. Make sure to choose the correct value for PARALLEL_TRAINING_JOBS, taking your AWS account service limits and cost into consideration. In the notebook, this value is set to 4. See the following code:

training_jobs = []
for transformer, loc in zip(preprocessor_transformers, LOCATIONS[:PARALLEL_TRAINING_JOBS]): 
    job = launch_training_job(loc, transformer)
    training_jobs.append(job)
print('{} training jobs launched: {}'.format(len(training_jobs), training_jobs))

You receive output similar to the following:

4 training jobs launched: [(<sagemaker.estimator.Estimator object at 0x7fb54784b6d8>, 'linear-learner-2020-06-03-03-51-26-548'), (<sagemaker.estimator.Estimator object at 0x7fb5478b3198>, 'linear-learner-2020-06-03-03-51-26-973'), (<sagemaker.estimator.Estimator object at 0x7fb54780dbe0>, 'linear-learner-2020-06-03-03-51-27-775'), (<sagemaker.estimator.Estimator object at 0x7fb5477664e0>, 'linear-learner-2020-06-03-03-51-31-457')]

Wait until all training jobs are complete before proceeding to the next step.
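
One way to wait for the jobs is to use the waiter that the boto3 SageMaker client provides. The following is a minimal sketch, not the notebook's code: the helper name wait_for_training_jobs is hypothetical, and sm_client is assumed to be a boto3 SageMaker client such as the one used elsewhere in this post.

```python
def wait_for_training_jobs(sm_client, job_names):
    """Block until every named training job has completed or stopped.

    sm_client is assumed to be a boto3 SageMaker client, for example
    boto3.client('sagemaker'). Returns a dict of job name -> final status.
    """
    waiter = sm_client.get_waiter('training_job_completed_or_stopped')
    statuses = {}
    for name in job_names:
        # The waiter polls DescribeTrainingJob until the job leaves InProgress.
        waiter.wait(TrainingJobName=name)
        description = sm_client.describe_training_job(TrainingJobName=name)
        statuses[name] = description['TrainingJobStatus']
    return statuses
```

You could call it as wait_for_training_jobs(sm_client, training_jobs) and check that every status is Completed before copying any artifacts.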

Creating an Amazon SageMaker model with multi-model support

When the training jobs are complete, you’re ready to create an MME.

First, define a method to copy model artifacts from the training job output to a location in Amazon S3 where the MME dynamically loads individual models:

def deploy_artifacts_to_mme(job_name):
    print("job_name :", job_name)
    response = sm_client.describe_training_job(TrainingJobName=job_name)
    source_s3_key, model_name = parse_model_artifacts(response['ModelArtifacts']['S3ModelArtifacts'])
    copy_source = {'Bucket': BUCKET, 'Key': source_s3_key}
    key = '{}/{}/{}/{}.tar.gz'.format(DATA_PREFIX, MULTI_MODEL_ARTIFACTS, model_name, model_name)
    print('Copying {} model\n   from: {}\n     to: {}...'.format(model_name, source_s3_key, key))
    s3_client.copy_object(Bucket=BUCKET, CopySource=copy_source, Key=key)
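
The parse_model_artifacts helper is not shown in this post. The following sketch is one possible implementation, written under the assumption that artifacts follow the key layout visible in the sample output (prefix/model_artifacts/location/training-job-name/output/model.tar.gz); the notebook's own version may differ.

```python
from urllib.parse import urlparse


def parse_model_artifacts(model_artifacts_uri):
    """Split an S3 model artifact URI into (object key, model name).

    Assumes the key layout used in this post:
    <prefix>/model_artifacts/<location>/<training-job-name>/output/model.tar.gz
    """
    parsed = urlparse(model_artifacts_uri)
    # The URI path is the object key with a leading slash.
    key = parsed.path.lstrip('/')
    parts = key.split('/')
    # parts[-1] is model.tar.gz, parts[-2] is output, parts[-3] is the
    # training job name, and parts[-4] is the location used as model name.
    model_name = parts[-4]
    return key, model_name
```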

Copy the model artifacts from all but the last training job to this location (you add the last model later to show how to update a running MME):

## Deploy all but the last model trained to MME
for job_name in training_jobs[:-1]:
    deploy_artifacts_to_mme(job_name)

You receive output similar to the following:

linear-learner-2020-06-03-03-51-26-973
Copying LosAngeles_CA model
   from: DEMO_MME_LINEAR_LEARNER/model_artifacts/LosAngeles_CA/linear-learner-2020-06-03-03-51-26-973/output/model.tar.gz
     to: DEMO_MME_LINEAR_LEARNER/multi_model_artifacts/LosAngeles_CA/LosAngeles_CA.tar.gz...
linear-learner-2020-06-03-03-51-27-775
Copying Chicago_IL model
   from: DEMO_MME_LINEAR_LEARNER/model_artifacts/Chicago_IL/linear-learner-2020-06-03-03-51-27-775/output/model.tar.gz
     to: DEMO_MME_LINEAR_LEARNER/multi_model_artifacts/Chicago_IL/Chicago_IL.tar.gz...
linear-learner-2020-06-03-03-51-31-457

Create the Amazon SageMaker model entity using the MultiDataModel API:

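from time import gmtime, strftime
from sagemaker.multidatamodel import MultiDataModel

MODEL_NAME = '{}-{}'.format(HOUSING_MODEL_NAME, strftime('%Y-%m-%d-%H-%M-%S', gmtime()))

_model_url  = 's3://{}/{}/{}/'.format(BUCKET, DATA_PREFIX, MULTI_MODEL_ARTIFACTS)

# All models under this S3 prefix are served by the same multi-model endpoint.
ll_multi_model = MultiDataModel(
        name=MODEL_NAME,
        model_data_prefix=_model_url,
        image=container,
        role=role,
        sagemaker_session=sagemaker_session)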

Creating an inference pipeline

Set up an inference pipeline with the PipelineModel API. This sets up a list of models in a single endpoint; for this post, we configure our pipeline model with the fitted Scikit-learn inference model and the fitted MME linear learner model. See the following code:

from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
import boto3
from time import gmtime, strftime

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

scikit_learn_inference_model = sklearn_preprocessor.create_model()

model_name = '{}-{}'.format('inference-pipeline', timestamp_prefix)
endpoint_name = '{}-{}'.format('inference-pipeline-ep', timestamp_prefix)

sm_model = PipelineModel(
    name=model_name, 
    role=role, 
    sagemaker_session=sagemaker_session,
    models=[
        scikit_learn_inference_model, 
        ll_multi_model])

sm_model.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge', endpoint_name=endpoint_name)

The MME is now ready to take inference requests and respond with predictions. With the MME, the inference request should include the target model to invoke.
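
At the API level, the target model is passed through the TargetModel parameter of the SageMaker runtime InvokeEndpoint call. The following is a minimal sketch of such a request, assuming runtime_client is a boto3 sagemaker-runtime client; the helper name invoke_target_model is hypothetical and not part of the notebook.

```python
import json


def invoke_target_model(runtime_client, endpoint_name, target_model, features):
    """Send one CSV record to a specific model hosted on an MME.

    runtime_client is assumed to be boto3.client('sagemaker-runtime').
    target_model is the artifact path relative to the MME's S3 prefix,
    for example 'Chicago_IL/Chicago_IL.tar.gz'.
    """
    body = ','.join(str(value) for value in features).encode('utf-8')
    response = runtime_client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType='text/csv',
        Accept='application/json',
        TargetModel=target_model,
        Body=body)
    # The response body is a stream; read it and parse the JSON payload.
    return json.loads(response['Body'].read())
```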

Testing the inference pipeline

You can now get predictions from the different linear learner models. Create a RealTimePredictor with the inference pipeline endpoint:

from sagemaker.predictor import json_serializer, csv_serializer, json_deserializer, RealTimePredictor
from sagemaker.content_types import CONTENT_TYPE_CSV, CONTENT_TYPE_JSON
predictor = RealTimePredictor(
    endpoint=endpoint_name,
    sagemaker_session=sagemaker_session,
    serializer=csv_serializer,
    content_type=CONTENT_TYPE_CSV,
    accept=CONTENT_TYPE_JSON)

Define a method to get predictions from the RealTimePredictor:

import json
import time

def predict_one_house_value(features, model_name, predictor_to_use):
    print('Using model {} to predict price of this house: {}'.format(model_name,
                                                                     features))
    start_time = time.time()

    # The predictor's csv_serializer turns the feature list into a CSV payload;
    # target_model selects which model on the MME handles the request.
    response = predictor_to_use.predict(features, target_model=model_name)

    response_json = json.loads(response)

    predicted_value = response_json['predictions'][0]['score']

    duration = time.time() - start_time

    print('${:,.2f}, took {:,d} ms\n'.format(predicted_value, int(duration * 1000)))

With an MME, models are loaded dynamically into the memory of the container on the instance hosting the endpoint when they are invoked. Therefore, a model invocation may take longer the first time. When the model is already in the container's memory, subsequent invocations are faster. If the instance's memory utilization is high and a new model needs to be loaded, unused models are unloaded. The unloaded models remain on the instance's storage volume and can be loaded into the container's memory later without being downloaded from the S3 bucket again. If the instance's storage volume is full, unused models are deleted from the storage volume.

Amazon SageMaker fully manages the loading and unloading of the models, without you having to take any specific actions. However, it’s important to understand this behavior because it has implications on the model invocation latency.
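
To build intuition for how loading and unloading affect latency, the following toy simulation models the container memory as a fixed-capacity cache. It is only a sketch: the least-recently-used eviction rule, the capacity expressed as a model count, and the class name ModelCacheSimulator are assumptions made for illustration, not a description of how Amazon SageMaker implements this.

```python
from collections import OrderedDict


class ModelCacheSimulator:
    """Toy model of an MME container that holds a limited number of models."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._loaded = OrderedDict()  # model name -> True, oldest first
        self.hits = 0
        self.misses = 0

    def invoke(self, model_name):
        """Return True on a cache hit (fast), False when the model must load."""
        if model_name in self._loaded:
            # Mark the model as most recently used.
            self._loaded.move_to_end(model_name)
            self.hits += 1
            return True
        self.misses += 1
        if len(self._loaded) >= self.capacity:
            # Evict the least recently used model to make room.
            self._loaded.popitem(last=False)
        self._loaded[model_name] = True
        return False

    @property
    def loaded_model_count(self):
        return len(self._loaded)
```

In this simplified picture, a first invocation is always a miss, and a repeated invocation is a hit only if the model was not evicted in between.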

Iterate through invocations with random inputs against a random model and show the predictions and the time it takes for the prediction to come back:

for i in range(10):
    model_name = LOCATIONS[np.random.randint(1, len(LOCATIONS[:PARALLEL_TRAINING_JOBS]))]
    full_model_name = '{}/{}.tar.gz'.format(model_name, model_name)
    # Pass the RealTimePredictor created earlier; the helper calls its predict() method.
    predict_one_house_value(gen_random_house()[1:], full_model_name, predictor)

You receive output similar to the following:

Using model Chicago_IL/Chicago_IL.tar.gz to predict price of this house: [1993, 2728, 6, 3.0, 0.7, 1, 'y', 'y']
$439,972.62, took 1,166 ms

Using model Houston_TX/Houston_TX.tar.gz to predict price of this house: [1989, 1944, 5, 3.0, 1.0, 1, 'n', 'y']
$280,848.00, took 1,086 ms

Using model LosAngeles_CA/LosAngeles_CA.tar.gz to predict price of this house: [1968, 2427, 4, 3.0, 1.0, 2, 'y', 'n']
$266,721.31, took 1,029 ms

Using model Chicago_IL/Chicago_IL.tar.gz to predict price of this house: [2000, 4024, 2, 1.0, 0.82, 1, 'y', 'y']
$584,069.88, took 53 ms

Using model LosAngeles_CA/LosAngeles_CA.tar.gz to predict price of this house: [1986, 3463, 5, 3.0, 0.9, 1, 'y', 'n']
$496,340.19, took 43 ms

Using model Chicago_IL/Chicago_IL.tar.gz to predict price of this house: [2002, 3885, 4, 3.0, 1.16, 2, 'n', 'n']
$626,904.12, took 39 ms

Using model Chicago_IL/Chicago_IL.tar.gz to predict price of this house: [1992, 1531, 6, 3.0, 0.68, 1, 'y', 'n']
$257,696.17, took 36 ms

Using model Chicago_IL/Chicago_IL.tar.gz to predict price of this house: [1992, 2327, 2, 3.0, 0.59, 3, 'n', 'n']
$337,758.22, took 33 ms

Using model LosAngeles_CA/LosAngeles_CA.tar.gz to predict price of this house: [1995, 2656, 5, 1.0, 1.16, 0, 'y', 'n']
$390,652.59, took 35 ms

Using model LosAngeles_CA/LosAngeles_CA.tar.gz to predict price of this house: [2000, 4086, 2, 3.0, 1.03, 3, 'n', 'y']
$632,995.44, took 35 ms

The output shows the predicted house price and the time each prediction took.

Compare two invocations of the same model. On the second invocation, the model doesn’t need to be downloaded from Amazon S3 because it’s already present on the instance, so the inference returns in less time. For this use case, the invocation time for the Chicago_IL/Chicago_IL.tar.gz model dropped from 1,166 milliseconds the first time to 53 milliseconds the second time. Similarly, the invocation time for the LosAngeles_CA/LosAngeles_CA.tar.gz model dropped from 1,029 milliseconds to 43 milliseconds.

Updating an MME with new models

To deploy a new model to an existing MME, copy a new set of model artifacts to the same Amazon S3 location you set up earlier. For example, copy the model for the Houston location with the following code:

## Copy the last model
last_training_job=training_jobs[PARALLEL_TRAINING_JOBS-1]
deploy_artifacts_to_mme(last_training_job)

Now you can make predictions using the last model. See the following code:

model_name = LOCATIONS[PARALLEL_TRAINING_JOBS-1]
full_model_name = '{}/{}.tar.gz'.format(model_name,model_name)
predict_one_house_value(gen_random_house()[:-1], full_model_name,predictor)

Monitoring MMEs with CloudWatch metrics

Amazon SageMaker provides CloudWatch metrics for MMEs so you can determine the endpoint usage and the cache hit rate and optimize your endpoint. To analyze the endpoint and the container behavior, you invoke multiple models in this sequence:

##Create 200 copies of the original model and save with different names.
copy_additional_artifacts_to_mme(200)
##Starting with no models loaded into the container
##Invoke the first 100 models
invoke_multiple_models_mme(0,100)
##Invoke the same 100 models again
invoke_multiple_models_mme(0,100)
##This time invoke all 200 models to observe behavior
invoke_multiple_models_mme(0,200)

The following chart shows the behavior of the CloudWatch metrics LoadedModelCount and MemoryUtilization corresponding to these model invocations.

The LoadedModelCount metric continuously increases as more models are invoked, until it levels off at 121. The MemoryUtilization metric of the container increases correspondingly to around 79%. This shows that the instance chosen to host the endpoint could keep only 121 models in memory when 200 models were invoked.

The following chart adds the ModelCacheHit metric to the previous two.

As the number of models loaded into the container memory increases, the ModelCacheHit metric improves. When the same 100 models are invoked the second time, ModelCacheHit reaches 1. When new models not yet loaded are invoked, ModelCacheHit decreases again.

You can use CloudWatch charts to help make ongoing decisions on the optimal choice of instance type, instance count, and number of models that a given endpoint should host.
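
You can also pull these metrics programmatically instead of reading them off a chart. The following sketch builds the arguments for the CloudWatch GetMetricStatistics call for the ModelCacheHit metric. The namespace AWS/SageMaker, the EndpointName and VariantName dimensions, and the variant name AllTraffic are assumptions to verify against the Amazon SageMaker monitoring documentation and your own endpoint configuration; the helper name is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def build_mme_metric_query(endpoint_name, metric_name='ModelCacheHit',
                           variant_name='AllTraffic', minutes=60, now=None):
    """Build keyword arguments for cloudwatch.get_metric_statistics().

    Namespace, dimensions, and the default variant name are assumptions;
    adjust them to match your endpoint.
    """
    end_time = now or datetime.now(timezone.utc)
    return {
        'Namespace': 'AWS/SageMaker',
        'MetricName': metric_name,
        'Dimensions': [
            {'Name': 'EndpointName', 'Value': endpoint_name},
            {'Name': 'VariantName', 'Value': variant_name},
        ],
        'StartTime': end_time - timedelta(minutes=minutes),
        'EndTime': end_time,
        'Period': 60,               # one datapoint per minute
        'Statistics': ['Average'],  # average hit ratio per period
    }
```

With a boto3 CloudWatch client, the call would look like cloudwatch.get_metric_statistics(**build_mme_metric_query(endpoint_name)).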

Exploring granular access to models hosted on an MME

Because of the role attached to the notebook instance, it can invoke all models hosted on the MME. However, you can restrict this model invocation access to specific models by using IAM condition keys. To explore this, you create a new IAM role and IAM policy with a condition key to restrict access to a single model. You then assume this new role and verify that only a single target model can be invoked.

The role assigned to the Amazon SageMaker notebook instance should allow IAM role and IAM policy creation for the next steps to be successful.

Create an IAM role with the following code:

# Create a new role that can be assumed by this notebook. The role should allow access to only a single model.
path = '/'
role_name = "{}{}".format('allow_invoke_ny_model_role', strftime('%Y-%m-%d-%H-%M-%S', gmtime()))
description = 'Role that allows invoking a single model'
trust_policy = {
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "statement1",
      "Effect": "Allow",
      "Principal": {
        "AWS": role
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

response = iam_client.create_role(
    Path=path,
    RoleName=role_name,
    AssumeRolePolicyDocument=json.dumps(trust_policy),
    Description=description,
    MaxSessionDuration=3600
)

print(response)
# Keep the ARN of the new role; it's needed to assume the role later.
role_arn = response['Role']['Arn']

Create an IAM policy with a condition key to restrict access to only the NewYork model:

managed_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "SageMakerAccess",
            "Action": "sagemaker:InvokeEndpoint",
            "Effect": "Allow",
            "Resource":endpoint_resource_arn,
            "Condition": {
                "StringLike": {
                    "sagemaker:TargetModel": ["NewYork_NY/*"]
                }
            }
        }
    ]
}
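response = iam_client.create_policy(
  PolicyName='allow_invoke_ny_model_policy',
  PolicyDocument=json.dumps(managed_policy)
)
# Keep the ARN of the new policy; it's needed to attach the policy to the role.
policy_arn = response['Policy']['Arn']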
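
The policy above refers to endpoint_resource_arn, which this post doesn't define. The following sketch shows one way to build that ARN and the policy document, and to check locally which target models a pattern would match. The helper names are hypothetical, the ARN format follows the one shown in the access-denied message later in this post, and fnmatchcase is only a local approximation of the IAM StringLike operator (IAM itself performs the real evaluation).

```python
from fnmatch import fnmatchcase


def build_endpoint_arn(region, account_id, endpoint_name):
    """Build the ARN of a SageMaker endpoint."""
    return 'arn:aws:sagemaker:{}:{}:endpoint/{}'.format(
        region, account_id, endpoint_name)


def build_invoke_policy(endpoint_arn, target_model_patterns):
    """Policy that allows InvokeEndpoint only for matching target models."""
    return {
        'Version': '2012-10-17',
        'Statement': [{
            'Sid': 'SageMakerAccess',
            'Action': 'sagemaker:InvokeEndpoint',
            'Effect': 'Allow',
            'Resource': endpoint_arn,
            'Condition': {
                'StringLike': {
                    'sagemaker:TargetModel': list(target_model_patterns)
                }
            }
        }]
    }


def pattern_allows(policy, target_model):
    """Approximate, local check of the StringLike condition (illustration only)."""
    condition = policy['Statement'][0]['Condition']['StringLike']
    patterns = condition['sagemaker:TargetModel']
    return any(fnmatchcase(target_model, pattern) for pattern in patterns)
```

For example, a policy built with the pattern NewYork_NY/* matches NewYork_NY/NewYork_NY.tar.gz but not Chicago_IL/Chicago_IL.tar.gz.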

Attach the IAM policy to the IAM role:

iam_client.attach_role_policy(
    PolicyArn=policy_arn,
    RoleName=role_name
)

Assume the new role, create a SageMaker runtime client with its temporary credentials, and use that client to build a RealTimePredictor:

## Invoke with the role that has access to only NY model
sts_connection = boto3.client('sts')
assumed_role_limited_access = sts_connection.assume_role(
    RoleArn=role_arn,
    RoleSessionName="MME_Invoke_NY_Model"
)
assumed_role_limited_access['AssumedRoleUser']['Arn']

#Create sagemaker runtime client with assumed role
ACCESS_KEY = assumed_role_limited_access['Credentials']['AccessKeyId']
SECRET_KEY = assumed_role_limited_access['Credentials']['SecretAccessKey']
SESSION_TOKEN = assumed_role_limited_access['Credentials']['SessionToken']

runtime_sm_client_with_assumed_role = boto3.client(
    service_name='sagemaker-runtime', 
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
    aws_session_token=SESSION_TOKEN,
)

#SageMaker session with the assumed role
sagemakerSessionAssumedRole = sagemaker.Session(sagemaker_runtime_client=runtime_sm_client_with_assumed_role)
#Create a RealTimePredictor with the assumed role.
predictorAssumedRole = RealTimePredictor(
    endpoint=endpoint_name,
    sagemaker_session=sagemakerSessionAssumedRole,
    serializer=csv_serializer,
    content_type=CONTENT_TYPE_CSV,
    accept=CONTENT_TYPE_JSON)

Now invoke the NewYork_NY model:

full_model_name = 'NewYork_NY/NewYork_NY.tar.gz'
predict_one_house_value(gen_random_house()[:-1], full_model_name, predictorAssumedRole) 

You receive output similar to the following:

Using model NewYork_NY/NewYork_NY.tar.gz to predict price of this house: [1992, 1659, 2, 2.0, 0.87, 2, 'n', 'y']
$222,008.38, took 154 ms

Next, try to invoke a different model (Chicago_IL/Chicago_IL.tar.gz). This should throw an error because the assumed role isn’t authorized to invoke this model. See the following code:

full_model_name = 'Chicago_IL/Chicago_IL.tar.gz'

predict_one_house_value(gen_random_house()[:-1], full_model_name,predictorAssumedRole) 

You receive output similar to the following:

ClientError: An error occurred (AccessDeniedException) when calling the InvokeEndpoint operation: User: arn:aws:sts::xxxxxxxxxxxx:assumed-role/allow_invoke_ny_model_role/MME_Invoke_NY_Model is not authorized to perform: sagemaker:InvokeEndpoint on resource: arn:aws:sagemaker:us-east-1:xxxxxxxxxxxx:endpoint/inference-pipeline-ep-2020-07-01-15-46-51

Conclusion

Amazon SageMaker MMEs are a very powerful tool for teams developing multiple ML models to save significant costs and lower deployment overhead for a large number of ML models. This post discussed the new capabilities of Amazon SageMaker MMEs: native integration with Amazon SageMaker built-in algorithms (such as linear learner and KNN), native integration with inference pipelines, and fine-grained controlled access to the multiple models hosted on a single endpoint using IAM condition keys.

The notebook included with the post provided detailed instructions on training multiple linear learner models for house price predictions for multiple locations, hosting all the models on a single MME, and controlling access to the individual models. When considering multi-model enabled endpoints, you should balance the cost savings and the latency requirements.

Give Amazon SageMaker MMEs a try and leave your feedback in the comments.


About the Author

Sireesha Muppala is an AI/ML Specialist Solutions Architect at AWS, providing guidance to customers on architecting and implementing machine learning solutions at scale. She received her Ph.D. in Computer Science from University of Colorado, Colorado Springs. In her spare time, Sireesha loves to run and hike Colorado trails.

Michael Pham is a Software Development Engineer in the Amazon SageMaker team. His current work focuses on helping developers efficiently host machine learning models. In his spare time he enjoys Olympic weightlifting, reading, and playing chess.

Time series forecasting using unstructured data with Amazon Forecast and the Amazon SageMaker Neural Topic Model

As the volume of unstructured data such as text and voice continues to grow, businesses are increasingly looking for ways to incorporate this data into their time series predictive modeling workflows. One example use case is transcribing calls from call centers to forecast call handle times and improve call volume forecasting. In the retail or media industry, companies are interested in using related information about products or content to forecast popularity of existing or new products or content from unstructured information such as product type, description, audience reviews, or social media feeds. However, combining this unstructured data with time series is challenging because most traditional time series models require numerical inputs for forecasting. In this post, we describe how you can combine Amazon SageMaker with Amazon Forecast to include unstructured text data into your time series use cases.

Solution overview

For our use case, we predict the popularity of news articles based on their topics, looking forward over a 15-day horizon. You first download and preprocess the data and then run the Neural Topic Model (NTM) algorithm to generate topic vectors. After generating the topic vectors, you save them and use them as a related time series to create the forecast.

The following diagram illustrates the architecture of this solution.

AWS services

Forecast is a fully managed service that uses machine learning (ML) to generate highly accurate forecasts without requiring any prior ML experience. Forecast is applicable in a wide variety of use cases, including energy demand forecasting, estimating product demand, workforce planning, and computing cloud infrastructure usage.

With Forecast, there are no servers to provision or ML models to build manually. Additionally, you only pay for what you use, and there is no minimum fee or upfront commitment. To use Forecast, you only need to provide historical data for what you want to forecast, and, optionally, any related data that you believe may impact your forecasts. This related data may include time-varying data (such as price, events, and weather) and categorical data (such as color, genre, or region). The service automatically trains and deploys ML models based on your data and provides you with a custom API to retrieve forecasts.

Amazon SageMaker is a fully managed service that provides every developer and data scientist with the ability to build, train, and deploy ML models quickly. The Neural Topic Model (NTM) algorithm is an unsupervised learning algorithm that can organize a collection of documents into topics that contain word groupings based on their statistical distribution. For example, documents that contain frequent occurrences of words such as “bike,” “car,” “train,” “mileage,” and “speed” are likely to share a topic on “transportation.” You can use topic modeling to classify or summarize documents based on the topics detected. You can also use it to retrieve information and recommend content based on topic similarities.

The derived topics that NTM learns are characterized as a latent representation because they are inferred from the observed word distributions in the collection. The semantics of topics are usually inferred by examining the top ranking words they contain. Because the method is unsupervised, only the number of topics, not the topics themselves, are pre-specified. In addition, the topics aren’t guaranteed to align with how a human might naturally categorize documents. NTM is one of the built-in algorithms you can train and deploy using Amazon SageMaker.

Prerequisites

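To follow along with this post, you must create the following:

  • An AWS Identity and Access Management (IAM) role
  • An Amazon SageMaker notebook instance
  • An Amazon S3 bucket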

To create the aforementioned resources and clone the forecast-samples GitHub repo into the notebook instance, launch the following AWS CloudFormation stack:

In the Parameters section, enter unique names for your S3 bucket and notebook and leave all other settings at their default.

When the CloudFormation script is complete, you can view the created resources on the Resources tab of the stack.

Navigate to Amazon SageMaker and open the notebook instance created from the CloudFormation template. Open Jupyter, go to the /notebooks/blog_materials/Time_Series_Forecasting_with_Unstructured_Data_and_Amazon_SageMaker_Neural_Topic_Model/ folder, and start working your way through the notebooks.

Creating the resources manually

For the sake of completeness, we explain in detail the steps necessary to create the resources that the CloudFormation script creates automatically.

  1. Create an IAM role that meets the following requirements:
    1. Has permission to access Forecast and Amazon S3 to store the training and test datasets.
    2. Has an attached trust policy to give Amazon SageMaker permission to assume the role.
    3. Allows Forecast to access Amazon S3 to pull the stored datasets into Forecast.

For more information, see Set Up Permissions for Amazon Forecast.

  2. Create an Amazon SageMaker notebook instance.
  3. Attach the IAM role you created for Amazon SageMaker to this notebook instance.
  4. Create an S3 bucket to store the training and test datasets.
  5. Copy the ARN of the bucket to use in the accompanying Jupyter notebook.

This project consists of three notebooks, available in the GitHub repo. They cover the following:

  • Preprocessing the dataset
  • NTM with Amazon SageMaker
  • Using Amazon Forecast to predict the topic’s popularity on various social media platforms going forward

Training and deploying the forecast

In the first notebook, 1_preprocess.ipynb, you download the News Popularity in Multiple Social Media Platforms dataset from the University of California Irvine (UCI) Machine Learning Repository using the requests library [1]. The following screenshot shows a sample of the dataset, where we have anonymized the topic names without loss of generality. It consists of news articles and their popularity on various social channels.

Because we’re focused on predictions based on the Headline and Title columns, we drop the Source and IDLink columns. We examine the current state of the data with a simple histogram plot. The following plot depicts the popularity of a subset of articles on Facebook.

The following plot depicts the popularity of a subset of articles on GooglePlus.

The distributions are heavily skewed towards a very small number of views; however, there are a few outlier articles that have an extremely high popularity.

Preprocessing the data

To turn this data into a usable time series for ML, we first convert the PublishDate column, which is read in as a string type, to a datetime type using the pandas to_datetime method:

df['PublishDate'] = pd.to_datetime(df['PublishDate'], infer_datetime_format=True)

We then group by topic and save the preprocessed.csv to be used by the next notebook, 2_NTM.ipynb. In the directory /data, you should see a file called NewsRatingsdataset.csv. You can now move to the next notebook, where you build a neural topic model to extract topic vectors from the processed dataset.

Before creating the topic model, it’s helpful to explore the data some more. In the following code, we plot the daily time series for the popularity of a given topic across the three social media channels, as well as a daily time series for the sentiment of a topic based on news article titles and headlines:

topic = 1 # Change this to any of [0, 1, 2, 3]
subdf = df[(df['Topic']==topic)&(df['PublishDate']>START_DATE)]
subdf = subdf.reset_index().set_index('PublishDate')
subdf.index = pd.to_datetime(subdf.index)
subdf.head()
subdf[["LinkedIn", 'GooglePlus', 'Facebook']].resample("1D").mean().dropna().plot(figsize=(15, 4))
subdf[["SentimentTitle", 'SentimentHeadline']].resample("1D").mean().dropna().plot(figsize=(15, 4))

The following are the plots for the topic Topic_1.

The dataset still needs a bit more cleaning before it’s ready for the NTM algorithm to use. Not much data exists before October 13, 2015, so you can drop the data before that date and reset the indexes accordingly. Moreover, some of the headlines and ratings contain missing values, denoted by NaN and -1, respectively. You can use regex to find and replace those headlines with empty strings and convert these ratings to zeros. There is a difference in scale for the popularity of a topic on Facebook vs. LinkedIn vs. GooglePlus. For this post, you focus on forecasting popularity on Facebook only.
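
The missing-value handling described above can be sketched as follows. This is a plain-Python stand-in that works on one record at a time rather than the pandas and regex approach the notebook uses; the function name clean_record is hypothetical, and the column names are taken from the dataset columns mentioned in this post.

```python
import math


def clean_record(record, rating_columns=('Facebook', 'GooglePlus', 'LinkedIn')):
    """Return a copy of one article record with missing values normalized.

    Missing headlines (None or NaN) become empty strings, and ratings
    equal to -1 (the dataset's marker for missing) become 0.
    """
    cleaned = dict(record)
    headline = cleaned.get('Headline')
    if headline is None or (isinstance(headline, float) and math.isnan(headline)):
        cleaned['Headline'] = ''
    for column in rating_columns:
        if cleaned.get(column) == -1:
            cleaned[column] = 0
    return cleaned
```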

Topic modeling

Now you use the built-in NTM algorithm on Amazon SageMaker to extract topics from the news headlines. When preparing a corpus of documents for NTM, you must clean and standardize the data by converting the text to lower case, removing stop words, removing any numeric characters that may not be meaningful to your corpus, and tokenizing the document’s text.

We use the Natural Language Toolkit and sklearn Python libraries to convert the headlines into tokens and create vectors of the token counts. Also, we drop the Title column in the dataframe, but store the titles in a separate dataframe. This is because the Headline column contains similar information to the Title column, but the headlines are longer and more descriptive, and we want to use the titles later on as a validation set for our NTM during training.

Lastly, we type cast the vectors into a sparse array in order to reduce the amount of memory utilization, because the bag-of-words matrix can quickly become quite large and memory intensive. For more information, see the notebook or Build a semantic content recommendation system with Amazon SageMaker.
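
The preprocessing steps above (lowercasing, stop word and number removal, tokenization, and sparse token counts) can be illustrated with the standard library alone. This is a simplified sketch, not the notebook's NLTK and sklearn pipeline: the stop word set is a tiny illustrative sample, and the function names are hypothetical.

```python
import re
from collections import Counter

# A tiny illustrative stop word set; real pipelines use a much larger list.
STOP_WORDS = {'a', 'an', 'and', 'the', 'of', 'to', 'in', 'on', 'for', 'is'}
# Keep runs of letters only, which also drops digits and punctuation.
TOKEN_PATTERN = re.compile(r'[a-z]+')


def tokenize(text):
    """Lowercase the text, split it into word tokens, and drop stop words."""
    return [token for token in TOKEN_PATTERN.findall(text.lower())
            if token not in STOP_WORDS]


def build_vocabulary(documents):
    """Assign each distinct token an integer index in order of appearance."""
    vocabulary = {}
    for document in documents:
        for token in tokenize(document):
            vocabulary.setdefault(token, len(vocabulary))
    return vocabulary


def to_sparse_counts(text, vocabulary):
    """Bag-of-words vector stored sparsely as {token index: count}."""
    counts = Counter(tokenize(text))
    return {vocabulary[token]: count
            for token, count in counts.items() if token in vocabulary}
```

Storing only the nonzero counts is what keeps memory use low: most headlines contain a small fraction of the vocabulary, so a dense vector would be almost entirely zeros.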

Training an NTM

To extract text vectors, you convert each headline into a 20 (NUM_TOPICS)-dimensional topic vector. This can be viewed as an effective lower-dimensional embedding of all the text in the corpus into some predefined topics. Each topic has a representation as a vector, and related topics have a related vector representation. This topic is a derived topic and is not to be confused with the original Topic field in the raw dataset. Assuming that there is some correlation between topics from one day to the next (for example, the top topics don’t change very frequently on a daily basis), you can represent all the text in the dataset as a collection of 20 topics.

You then set the training dataset and trained model artifact location in Amazon S3 and upload the data. To train the model, you can use one or more instances (specified by the parameter train_instance_count) and choose a strategy to either fully replicate the data on each instance or use ShardedByS3Key, which only puts certain data shards on each instance. This speeds up training at the cost of each instance only seeing a fraction of the data.

To reduce overfitting, it’s helpful to introduce a validation dataset in addition to the training dataset. The hyperparameter num_patience_epochs controls the early stopping behavior, which makes sure the training is stopped if the change in the loss is less than the specified tolerance (set by tolerance) consistently for num_patience_epochs. The epochs hyperparameter specifies the total number of epochs to run the job. For this post, we chose hyperparameters to balance the tradeoff between accuracy and training time:

%time
from sagemaker.session import s3_input

sess = sagemaker.Session()
ntm = sagemaker.estimator.Estimator(container,
                                    role, 
                                    train_instance_count=1, 
                                    train_instance_type='ml.c4.xlarge',
                                    output_path=output_path,
                                    sagemaker_session=sess)
ntm.set_hyperparameters(num_topics=NUM_TOPICS, feature_dim=vocab_size, mini_batch_size=128, 
                        epochs=100, num_patience_epochs=5, tolerance=0.001)
s3_train = s3_input(s3_train_data, distribution='FullyReplicated') 
ntm.fit({'train': s3_train})

To further improve the model performance, you can take advantage of hyperparameter tuning in Amazon SageMaker.
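
To make the num_patience_epochs and tolerance hyperparameters more concrete, the following sketch applies a simple early stopping rule to a list of per-epoch loss values. It is a simplification under stated assumptions: it compares the absolute change between consecutive epochs, whereas the exact criterion the NTM algorithm uses internally may differ, and the function name is hypothetical.

```python
def epochs_until_stop(losses, tolerance=0.001, num_patience_epochs=5):
    """Return how many epochs run before early stopping triggers.

    Training stops once the loss has changed by less than `tolerance`
    for `num_patience_epochs` consecutive epochs. If that never happens,
    all epochs run.
    """
    stalled_epochs = 0
    previous_loss = None
    for epoch, loss in enumerate(losses, start=1):
        if previous_loss is not None and abs(previous_loss - loss) < tolerance:
            stalled_epochs += 1
            if stalled_epochs >= num_patience_epochs:
                return epoch
        else:
            # The loss is still moving, so reset the patience counter.
            stalled_epochs = 0
        previous_loss = loss
    return len(losses)
```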

Deploying and testing the model

To generate the feature vectors for the headlines, you first deploy the model and run inferences on the entire training dataset to obtain the topic vectors. An alternative option is to run a batch transform job.

To ensure that the topic model works as expected, we show the extracted topic vectors from the titles, and check if the topic distribution of the title is similar to that of the corresponding headline. Remember that the model hasn’t seen the titles before. As a measure of similarity, we compute the cosine similarity for a random title and associated headline. A high cosine similarity indicates that titles and headlines have a similar representation in this low-dimensional embedding space.

You can also use a cosine similarity of the title-headline as a feature: well-written titles that correlate well with the actual headline may obtain a higher popularity score. You could use this to check if titles and headlines represent the content of the document accurately, but we don’t explore this further in this notebook [2].

Finally, you store the results of the headlines mapped across the extracted NUM_TOPICS (20) back into a dataframe and save the dataframe as preprocessed_data.csv in data/ for use in subsequent notebooks.

The following code tests the vector similarity:

ntm_predictor = ntm.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge')
topic_data = np.array(topic_vectors.tocsr()[:10].todense())
topic_vecs = []
for index in range(10):
    results = ntm_predictor.predict(topic_data[index])
    predictions = np.array([prediction['topic_weights'] for prediction in results['predictions']])
    topic_vecs.append(predictions)
from sklearn.metrics.pairwise import cosine_similarity
comparisonvec = []
for i, idx in enumerate(range(10)):
    comparisonvec.append([df.Headline[idx], title_column[idx], cosine_similarity(topic_vecs[i], [pred_array_cc[idx]])[0][0]])
pd.DataFrame(comparisonvec, columns=['Headline', 'Title', 'CosineSimilarity'])

The following screenshot shows the output.

Visualizing headlines

Another way to visualize the results is to plot a T-SNE graph. T-SNE is a nonlinear embedding method that tries to make the joint probability distribution over nearest neighbors in the high-dimensional space (for this use case, NUM_TOPICS) match the equivalent joint distribution in the lower-dimensional (2) space by minimizing a loss known as the Kullback-Leibler divergence [3]. Essentially, this is a dimensionality reduction technique that can map high-dimensional vectors to a lower-dimensional space.

Computing the T-SNE can take quite some time, especially for large datasets, so we shuffle the dataset and extract only 10,000 headline embeddings for the T-SNE plot. For more information about the advantages and pitfalls of using T-SNE in topic modeling, see How to Use t-SNE Effectively.

The following T-SNE plot shows a few large topics (indicated by the similar color clusters: red, green, purple, blue, and brown), which is consistent with the dataset containing four primary topics. But by expanding the dimensionality of the topic vectors to NUM_TOPICS = 20, we allow the NTM model to capture more semantic information about the headlines than a single topic token captures.

With our topic modeling complete and our data saved, you can now delete the endpoint to avoid incurring any charges.

Forecasting topic popularity

Now you run the third and final notebook, where you use the Forecast DeepAR+ algorithm to forecast the popularity of the topics. First, you establish a Forecast session using the Forecast SDK. Make sure your S3 bucket is in the same Region as the session.

After this step, you read preprocessed_data.csv into a dataframe for some additional preprocessing. Drop the Headline column and replace the index of the dataframe with the publish date of the news article. You do this so you can easily aggregate the data on a daily basis. The following screenshot shows your results.

Creating the target and related time series

For this post, you want to forecast the Facebook ratings for each of the four topics in the Topic column of the dataset. In Forecast, we need to define a target time series that consists of the item ID, timestamp, and the value we want to forecast.
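The following is a minimal sketch of how rows could be rolled up into a target time series of item ID, timestamp, and value. The topic names, the input row layout, and the choice to sum ratings per day are assumptions for illustration; the notebook may aggregate differently.

```python
from collections import defaultdict
from datetime import datetime

def build_target_time_series(rows):
    """Aggregate (topic, publish_datetime, rating) rows into daily totals.

    Returns a sorted list of (item_id, 'YYYY-MM-DD', value) tuples.
    Summing per day is an assumption made for this sketch.
    """
    daily = defaultdict(float)
    for topic, published, rating in rows:
        day = datetime.strptime(published, "%Y-%m-%d %H:%M:%S").date()
        daily[(topic, day.isoformat())] += rating
    return sorted((item, day, value) for (item, day), value in daily.items())

# Made-up example rows: (Topic, PublishDate, Facebook rating).
rows = [
    ("economy", "2015-11-01 08:15:00", 12.0),
    ("economy", "2015-11-01 17:40:00", 3.0),
    ("obama", "2015-11-01 09:00:00", 40.0),
    ("economy", "2015-11-02 10:05:00", 7.0),
]

target = build_target_time_series(rows)
for item_id, timestamp, value in target:
    print(f"{item_id},{timestamp},{value}")
```

Each output line has the three fields the target time series needs, with one row per topic per day.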

Additionally, as of this writing, you can provide a related time series, which can include up to 13 dynamic features. In our use case, these are the SentimentHeadline and the topic vectors. Because we can only choose 13 features in Forecast, we choose 10 out of the 20 topic vectors to illustrate building the Forecast model. Currently, the CNN-QR, DeepAR+ (which we use in this post), and Prophet algorithms support related time series.

As before, we start forecasting from 2015-11-01 and end our training data at 2016-06-21. Using this, we forecast for 15 days into the future. The following screenshot shows our target time series.
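To make the date arithmetic concrete, the following sketch computes the length of the training window and the dates covered by the 15-day horizon. It assumes daily frequency and that the forecast starts the day after the training data ends.

```python
from datetime import date, timedelta

train_start = date(2015, 11, 1)
train_end = date(2016, 6, 21)
forecast_horizon_days = 15

# Number of daily observations, counting both the first and last day.
training_days = (train_end - train_start).days + 1

# Assumption: the forecast begins on the day after the last training day.
forecast_start = train_end + timedelta(days=1)
forecast_end = train_end + timedelta(days=forecast_horizon_days)

print(training_days, forecast_start, forecast_end)
```

Under these assumptions, the training window holds 234 daily points, and the forecast covers 2016-06-22 through 2016-07-06.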

The following screenshot shows our related time series.

Upload the datasets to the S3 bucket.

Defining the dataset schemas and dataset group to ingest into Forecast

Forecast has several predefined domains that come with predefined schemas for data ingestion. Because we’re interested in web traffic, you can choose the WEB_TRAFFIC domain. For more information about dataset domains, see Predefined Dataset Domains and Dataset Types.

This provides a predefined schema and attribute types for the attributes you include in the target and related time series. The WEB_TRAFFIC domain doesn’t have item metadata; only target and related time series data is allowed.

Define the schema for the target time series with the following code:

# Set the dataset name to a new unique value. If it already exists, go to the Forecast console and delete any existing
# dataset ARNs and datasets.

datasetName = 'webtraffic_forecast_NLP'

schema ={
   "Attributes":[
      {
         "AttributeName":"item_id",
         "AttributeType":"string"
      },    
       {
         "AttributeName":"timestamp",
         "AttributeType":"timestamp"
      },
      {
         "AttributeName":"value",
         "AttributeType":"float"
      }      
   ]
}

try:
    response = forecast.create_dataset(
                    Domain="WEB_TRAFFIC",
                    DatasetType='TARGET_TIME_SERIES',
                    DatasetName=datasetName,
                    DataFrequency=DATASET_FREQUENCY, 
                    Schema = schema
                   )
    datasetArn = response['DatasetArn']
    print('Success')
except Exception as e:
    print(e)
    datasetArn = 'arn:aws:forecast:{}:{}:dataset/{}'.format(REGION_NAME, ACCOUNT_NUM, datasetName)

Define the schema for the related time series with the following code:

# Set the dataset name to a new unique value. If it already exists, go to the Forecast console and delete any existing
# dataset ARNs and datasets.

datasetName = 'webtraffic_forecast_related_NLP'
schema ={
   "Attributes":[{
         "AttributeName":"item_id",
         "AttributeType":"string"
      }, 
       {
         "AttributeName":"timestamp",
         "AttributeType":"timestamp"
      },
       {
         "AttributeName":"SentimentHeadline",
         "AttributeType":"float"
      }]
    + 
      [{
         "AttributeName":"Headline_{}".format(x),
         "AttributeType":"float"
      } for x in range(10)] 
}

try:
    response=forecast.create_dataset(
                    Domain="WEB_TRAFFIC",
                    DatasetType='RELATED_TIME_SERIES',
                    DatasetName=datasetName,
                    DataFrequency=DATASET_FREQUENCY, 
                    Schema = schema
                   )
    related_datasetArn = response['DatasetArn']
    print('Success')
except Exception as e:
    print(e)
    related_datasetArn = 'arn:aws:forecast:{}:{}:dataset/{}'.format(REGION_NAME, ACCOUNT_NUM, datasetName)

Before ingesting any data into Forecast, we need to combine the target and related time series into a dataset group:

datasetGroupName = 'webtraffic_forecast_NLPgroup'
    
try:
    create_dataset_group_response = forecast.create_dataset_group(DatasetGroupName=datasetGroupName,
                                                                  Domain="WEB_TRAFFIC",
                                                                  DatasetArns= [datasetArn, related_datasetArn]
                                                                 )
    datasetGroupArn = create_dataset_group_response['DatasetGroupArn']
except Exception as e:
    print(e)
    datasetGroupArn = 'arn:aws:forecast:{}:{}:dataset-group/{}'.format(REGION_NAME, ACCOUNT_NUM, datasetGroupName)

Ingesting the target and related time series data from Amazon S3

Next, you import the target and related data previously stored in Amazon S3 to create a Forecast dataset. You provide the location of the training data in Amazon S3 and the ARN of the dataset placeholder you created.

Ingest the target and related time series with the following code:

s3DataPath = 's3://{}/{}/target_time_series.csv'.format(bucket, prefix)
datasetImportJobName = 'forecast_DSIMPORT_JOB_TARGET'

try:
    ds_import_job_response=forecast.create_dataset_import_job(DatasetImportJobName=datasetImportJobName,
                                                          DatasetArn=datasetArn,
                                                          DataSource= {
                                                              "S3Config" : {
                                                                 "Path":s3DataPath,
                                                                 "RoleArn": role_arn
                                                              } 
                                                          },
                                                          TimestampFormat=TIMESTAMP_FORMAT
                                                         )
    ds_import_job_arn=ds_import_job_response['DatasetImportJobArn']
    target_ds_import_job_arn = copy.copy(ds_import_job_arn) #used to delete the resource during cleanup
except Exception as e:
    print(e)
    ds_import_job_arn='arn:aws:forecast:{}:{}:dataset-import-job/{}/{}'.format(REGION_NAME, ACCOUNT_NUM, datasetArn, datasetImportJobName)

s3DataPath = 's3://{}/{}/related_time_series.csv'.format(bucket, prefix)
datasetImportJobName = 'forecast_DSIMPORT_JOB_RELATED'
try:
    ds_import_job_response=forecast.create_dataset_import_job(DatasetImportJobName=datasetImportJobName,
                                                          DatasetArn=related_datasetArn,
                                                          DataSource= {
                                                              "S3Config" : {
                                                                 "Path":s3DataPath,
                                                                 "RoleArn": role_arn
                                                              } 
                                                          },
                                                          TimestampFormat=TIMESTAMP_FORMAT
                                                         )
    ds_import_job_arn=ds_import_job_response['DatasetImportJobArn']
    related_ds_import_job_arn = copy.copy(ds_import_job_arn) #used to delete the resource during cleanup
except Exception as e:
    print(e)
    ds_import_job_arn='arn:aws:forecast:{}:{}:dataset-import-job/{}/{}'.format(REGION_NAME, ACCOUNT_NUM, related_datasetArn, datasetImportJobName)
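Dataset import jobs run asynchronously, so it can help to wait until each job is ACTIVE before training. The following is a minimal polling sketch; wait_for_status and fake_describe are hypothetical helpers written for this example, and the stub stands in for forecast.describe_dataset_import_job so the code runs without AWS access.

```python
import time

def wait_for_status(describe, arn_key, arn, poll_seconds=30, max_polls=120,
                    sleep=time.sleep):
    """Poll a describe_* call until the resource is ACTIVE or has failed."""
    for _ in range(max_polls):
        status = describe(**{arn_key: arn})["Status"]
        if status == "ACTIVE":
            return status
        if status.endswith("FAILED"):
            raise RuntimeError("{} ended in status {}".format(arn, status))
        sleep(poll_seconds)
    raise TimeoutError("{} not ACTIVE after {} polls".format(arn, max_polls))

# Offline stand-in for forecast.describe_dataset_import_job (illustration only).
_statuses = iter(["CREATE_PENDING", "CREATE_IN_PROGRESS", "ACTIVE"])

def fake_describe(DatasetImportJobArn):
    return {"Status": next(_statuses)}

final = wait_for_status(fake_describe, "DatasetImportJobArn", "arn:example",
                        sleep=lambda seconds: None)
print(final)

# With a real client, the call would look like this:
# wait_for_status(forecast.describe_dataset_import_job,
#                 "DatasetImportJobArn", target_ds_import_job_arn)
```

Passing the describe function and the ARN key as arguments lets the same helper wait on other Forecast resources, such as predictors, that report a Status field.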

Creating the predictor

The Forecast DeepAR+ algorithm is a supervised learning algorithm for forecasting scalar (one-dimensional) time series using recurrent neural networks (RNNs). Classic forecasting methods, such as ARIMA or exponential smoothing (ETS), fit a single model to each individual time series. In contrast, DeepAR+ creates a global model (one model for all the time series) with the potential benefit of learning across time series.

The DeepAR+ model is particularly useful when working with a large collection (thousands or more) of target time series, in which certain time series have a limited amount of information. For example, as a generalization of this use case, global models such as DeepAR+ can use the information from related topics with strong statistical signals to predict the popularity of new topics with little historical data. Importantly, DeepAR+ also allows you to include related information such as the topic vectors in a related time series.

To create the predictor, use the following code:

try:
    create_predictor_response=forecast.create_predictor(PredictorName=predictorName, 
                                                  ForecastHorizon=forecastHorizon,
                                                  AlgorithmArn=algorithmArn,
                                                  PerformAutoML=False, # set to True to perform AutoML
                                                  PerformHPO=False, # set to True to perform hyperparameter optimization (HPO)
                                                  EvaluationParameters= {"NumberOfBacktestWindows": 1, 
                                                                         "BackTestWindowOffset": 15}, 
                                                  InputDataConfig= {"DatasetGroupArn": datasetGroupArn},
                                                  FeaturizationConfig= {"ForecastFrequency": "D", 
                                                                        }
                                                 )
    predictorArn=create_predictor_response['PredictorArn']
except Exception as e:
    predictorArn = 'arn:aws:forecast:{}:{}:predictor/{}'.format(REGION_NAME, ACCOUNT_NUM, predictorName)

When you call the create_predictor() method, it takes several minutes to complete.

Backtesting is a method of testing an ML model trained on and designed to predict time series data. Due to the sequential nature of time series data, training and test data can’t be randomized. Moreover, the most recent time series data is generally considered the most relevant for testing purposes. Therefore, backtesting uses the most recent windows that were unseen by the model during training to test the model and collect metrics. Amazon Forecast lets you choose up to five windows for backtesting. For more information, see Evaluating Predictor Accuracy.
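The following sketch illustrates the idea of holding out the most recent windows. It is a simplified model of backtesting written for this post, not the exact procedure Forecast runs internally; backtest_splits is a hypothetical helper.

```python
def backtest_splits(series, window_length, num_windows=1):
    """Yield (train, test) pairs that hold out the most recent windows.

    The oldest window comes first; each test window immediately follows
    the data used for training in that split.
    """
    if not 1 <= num_windows <= 5:
        raise ValueError("num_windows must be between 1 and 5")
    if window_length * num_windows >= len(series):
        raise ValueError("series is too short for the requested windows")
    for k in range(num_windows, 0, -1):
        cut = len(series) - k * window_length
        yield series[:cut], series[cut:cut + window_length]

# Toy daily series with 40 points; hold out two 15-day windows.
series = list(range(1, 41))
splits = list(backtest_splits(series, window_length=15, num_windows=2))
for train, test in splits:
    print(len(train), test[0], test[-1])
```

No test point ever precedes a training point in the same split, which is the property that random shuffling would break.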

For this post, we evaluate the DeepAR+ model using both the mean absolute percentage error (MAPE), which is a common error metric in time series forecasting, and the root mean square error (RMSE), which penalizes larger deviations more heavily. The RMSE measures the typical deviation between the forecasted and actual values, in the same units as the dependent variable (in this use case, topic popularity on Facebook).
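To see how the two metrics behave, the following sketch computes both on made-up numbers. The two forecasts have the same total absolute error, but one concentrates it in a single large miss. Skipping zero actuals in MAPE is a choice made for this sketch.

```python
import math

def rmse(actual, predicted):
    """Root mean square error, in the same units as the data."""
    if len(actual) != len(predicted):
        raise ValueError("inputs must have the same length")
    squared = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared) / len(squared))

def mape(actual, predicted):
    """Mean absolute percentage error as a fraction; zero actuals are skipped."""
    if len(actual) != len(predicted):
        raise ValueError("inputs must have the same length")
    terms = [abs((a - p) / a) for a, p in zip(actual, predicted) if a != 0]
    return sum(terms) / len(terms)

# Illustrative values only.
actual = [10.0, 20.0, 40.0]
evenly_off = [12.0, 18.0, 38.0]    # three errors of 2
one_big_miss = [10.0, 20.0, 34.0]  # one error of 6

print(rmse(actual, evenly_off), rmse(actual, one_big_miss))
print(mape(actual, evenly_off), mape(actual, one_big_miss))
```

The RMSE is higher for the forecast with the single large miss, even though both forecasts have the same mean absolute error.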

Creating and querying the forecast

When you’re satisfied with the accuracy metrics from your trained Forecast model, it’s time to generate a forecast. You can do this by creating a forecast for each item in the target time series used to train the predictor. Query the results to find out the popularity of the different topics in the original dataset.
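The following sketch shows one way to pull quantile series out of a query result. It assumes the response shape returned by the forecastquery client's query_forecast call (a Forecast.Predictions dictionary keyed by quantile); the sample response, its values, and the helper names are made up for illustration.

```python
def quantile_series(response, quantile="p50"):
    """Return [(timestamp, value), ...] for one quantile of a query response."""
    points = response["Forecast"]["Predictions"][quantile]
    return [(p["Timestamp"], p["Value"]) for p in points]

def interval_widths(response, low="p10", high="p90"):
    """Width of the prediction interval at each timestamp."""
    lows = quantile_series(response, low)
    highs = quantile_series(response, high)
    return [(ts, hi - lo) for (ts, lo), (_, hi) in zip(lows, highs)]

# Made-up response for illustration. With a real client, it would come from:
# forecastquery.query_forecast(ForecastArn=forecast_arn, Filters={"item_id": item})
sample_response = {
    "Forecast": {
        "Predictions": {
            "p10": [{"Timestamp": "2016-06-22T00:00:00", "Value": 4.0},
                    {"Timestamp": "2016-06-23T00:00:00", "Value": 5.0}],
            "p50": [{"Timestamp": "2016-06-22T00:00:00", "Value": 10.0},
                    {"Timestamp": "2016-06-23T00:00:00", "Value": 12.0}],
            "p90": [{"Timestamp": "2016-06-22T00:00:00", "Value": 21.0},
                    {"Timestamp": "2016-06-23T00:00:00", "Value": 25.0}],
        }
    }
}

median = quantile_series(sample_response, "p50")
widths = interval_widths(sample_response)
print(median)
print(widths)
```

Plotting the median together with the p10 and p90 series against the actual values is a simple way to compare the forecast for each topic.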

The following is the result for Topic 0.

The following is the result for Topic 1.

The following is the result for Topic 2.

The following is the result for Topic 3.

Forecast accuracy

As an example, the RMSE for Topic 1 is approximately 22.05. For context, the popularity values in the ground truth set over the date range of the forecast span a wide range [3:331]. Even so, the RMSE does not in and of itself indicate whether the model is production ready. It is simply an additional data point to use when evaluating the efficacy of your model.

Cleaning up

To avoid incurring future charges, delete each Forecast component. Also delete any other resources used in the notebook such as the Amazon SageMaker NTM endpoint, any S3 buckets used for storing data, and finally the Amazon SageMaker notebooks.

Conclusion

In this post, you learned how to build a forecasting model using unstructured raw text data. You also learned how to train a topic model and use the generated topic vectors as related time series for Forecast. Although this post is intended to demonstrate how you can combine these models, you can use the same methodology to improve model accuracy by training on much larger datasets with many more topics than this one. Amazon Forecast also supports other deep learning models for time series forecasting, such as CNN-QR. To read more about how you can build an end-to-end operational workflow with Amazon Forecast and AWS Step Functions, see here.

References

[1] Multi-Source Social Feedback of Online News Feeds, N. Moniz and L. Torgo, arXiv:1801.07055 (2018).

[2] Learning to determine the quality of news headlines, Omidvar, A. et al. arXiv:1911.11139.

[3] Visualizing Data using t-SNE, L. van der Maaten and G. Hinton, Journal of Machine Learning Research 9, 2579-2605 (2008).


About the Authors

David Ehrlich is a Machine Learning Specialist at Amazon Web Services. He is passionate about helping customers unlock the true potential of their data. In his spare time, he enjoys exploring the different neighborhoods in New York City, going to comedy clubs, and traveling.

Stefan Natu is a Sr. Machine Learning Specialist at Amazon Web Services. He is focused on helping financial services customers build end-to-end machine learning solutions on AWS. In his spare time, he enjoys reading machine learning blogs, playing the guitar, and exploring the food scene in New York City.

Performing batch fraud predictions using Amazon Fraud Detector, Amazon S3, and AWS Lambda

Amazon Fraud Detector is a fully managed service that makes it easy to identify potentially fraudulent online activities, such as the creation of fake accounts or online payment fraud. Unlike general-purpose machine learning (ML) packages, Amazon Fraud Detector is designed specifically to detect fraud. Amazon Fraud Detector combines your data, the latest in ML science, and more than 20 years of fraud detection experience from Amazon.com and AWS to build ML models tailor-made to detect fraud in your business.

This post walks you through how to use Amazon Fraud Detector with Amazon Simple Storage Service (Amazon S3) and AWS Lambda to perform a batch of fraud predictions on event records (such as account registrations and transactions) in a CSV file. This architecture enables you to trigger a batch of predictions automatically upon uploading your CSV file to Amazon S3 and retrieve the fraud prediction results in a newly generated CSV also stored in Amazon S3.

Solution overview

Amazon Fraud Detector can perform low-latency fraud predictions, enabling your company to dynamically adjust the customer experience in your applications based on real-time fraud risk detection. But suppose you want to generate fraud predictions for a batch of events after the fact; perhaps you don’t need a low-latency response and want to evaluate events on an hourly or daily schedule. How do you accomplish this using Amazon Fraud Detector? One approach is to use an Amazon S3 event notification to trigger a Lambda function that processes a CSV file of events stored in Amazon S3 when the file is uploaded to an input S3 bucket. The function runs each event through Amazon Fraud Detector to generate predictions using a detector (ML model and rules) and uploads the prediction results to an S3 output bucket. The following diagram illustrates this architecture.

To create this Lambda-based batch prediction system, you complete the following high-level steps:

  1. Create and publish a detector version containing a fraud detection model and rules, or simply a ruleset.
  2. Create two S3 buckets. The first bucket receives your CSV file, and the second bucket is where your Lambda function writes the prediction results.
  3. Create an AWS Identity and Access Management (IAM) role to use as the execution role in the Lambda function.
  4. Create a Lambda function that reads in a CSV file from Amazon S3, calls the Amazon Fraud Detector get_event_prediction function for each record in the CSV file, and writes a CSV file to Amazon S3.
  5. Add an Amazon S3 event trigger to invoke your Lambda function whenever a new CSV file is uploaded to the S3 bucket.
  6. Create a sample CSV file of event records to test the batch prediction process.
  7. Test the end-to-end process by uploading your sample CSV file to your input S3 bucket and reviewing prediction results in the newly generated CSV file in your output S3 bucket.

Creating and publishing a detector

You can create and publish a detector version using the Amazon Fraud Detector console or via the APIs. For console instructions, see Get started (console) or Amazon Fraud Detector is now Generally Available. After you complete this step, note the following items, which you need in later steps:

  • AWS Region you created the detector in
  • Detector name and version
  • Name of the entity type and event type used by your detector
  • List of variables for the entity type used in your detector

The following screenshot shows the detail view of a detector version.

The following screenshot shows the detail view of an event type.

Creating the input and output S3 buckets

Create the following S3 buckets on the Amazon S3 console:

  • fraud-detector-input – Where you upload the CSV file containing events for batch predictions
  • fraud-detector-output – Where the Lambda function writes the prediction results file

Make sure you create your buckets in the same Region as your detector. For more information, see How do I create an S3 Bucket?

Creating the IAM role

To create the execution role in IAM that gives your Lambda function permission to access the AWS resources required for this solution, complete the following steps:

  1. On the IAM console, choose Roles.
  2. Choose Create role.
  3. Select Lambda.
  4. Choose Next.
  5. Attach the following policies:
    • AWSLambdaBasicExecutionRole – Provides the Lambda function with write permissions to Amazon CloudWatch Logs.
    • AWSXRayDaemonWriteAccess – Allows the AWS X-Ray daemon to relay raw trace data and retrieve sampling data to be used by X-Ray.
    • AmazonFraudDetectorFullAccessPolicy – Provides permissions to create resources and generate fraud predictions in Amazon Fraud Detector.
    • AmazonS3FullAccess – Provides the Lambda function permissions to read and write objects in Amazon S3. This policy provides broad Amazon S3 access; as a best practice, consider reducing the scope of this policy to the S3 buckets required for this example, or use an inline policy such as the following:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::fraud-detector-input/*",
                "arn:aws:s3:::fraud-detector-output/*"
            ]
        }
    ]
}
  6. Choose Next.
  7. Enter a name for your role (for example, lambda-s3-role).
  8. Choose Create role.

Creating the Lambda function

Now let’s create our Lambda function on the Lambda console.

  1. On the Lambda console, choose Create function.
  2. For Function name, enter a name (for example, afd-batch-function).
  3. For Runtime, choose Python 3.8.
  4. For Execution role, select Use an existing role.
  5. For Existing role, choose the role you created.

  6. Choose Create function.

Next, we walk through sections of the code used in the Lambda function. This code goes into the Function code section of your Lambda function. The full Lambda function code is available in the next section.

Packages

import json
import csv
import boto3

Defaults

# -- make a connection to fraud detector -- 
client = boto3.client("frauddetector")
# -- S3 bucket to write scored data to -- 
S3_BUCKET_OUT = "fraud-detector-output"
# -- specify event, entity, and detector  -- 
ENTITY_TYPE    = "customer"
EVENT_TYPE     = "new_account_registration_full_details"
DETECTOR_NAME  = "new_account_detector"
DETECTOR_VER   = "3"

We have entered the values from the detector we created and the output S3 bucket. Replace these default values with the values you used when creating your output S3 bucket and Amazon Fraud Detector resources.

Functions

We use a few helper functions along with the main lambda_handler() function:

  • get_event_variables(EVENT_TYPE) – Returns a list of the variables for the event type. We map these to the input file positions.
  • prep_record(record_map, event_map, line) – Returns a record containing just the data required by the detector.
  • get_score(event, record) – Returns the fraud prediction risk scores and rule outcomes from the Amazon Fraud Detector get_event_prediction function. The get_score function uses two extra helper functions to format model scores (prep_scores) and rule outcomes (prep_outcomes).

Finally, the lambda_handler(event, context) drives the whole process. See the following example code:

get_event_variables(EVENT_TYPE)
def get_event_variables(EVENT_TYPE):
    """ return list of event variables 
    """
    response = client.get_event_types(name=EVENT_TYPE)
    event_variables = []

    for v in response['eventTypes'][0]['eventVariables']:
        event_variables.append(v)
    return event_variables

prep_record(record_map, event_map, line)
def prep_record(record_map, event_map, line):
    """ structure the record for scoring 
    """
    record = {}
    for key in record_map.keys():
        record[key] = line[record_map[key]]
        
    event = {}
    for key in event_map.keys():
        event[key] = line[event_map[key]]
    return record, event

prep_scores(model_scores)
def prep_scores(model_scores):
    """ return list of models and scores
    """
    detector_models = []
    for m in model_scores:
        detector_models.append(m['scores'])
    return detector_models

prep_outcomes(rule_results)
def prep_outcomes(rule_results):
    """ return list of rules and outcomes 
    """
    detector_outcomes = []
    for rule in rule_results:
        rule_outcomes ={}
        rule_outcomes[rule['ruleId']] = rule['outcomes']
        detector_outcomes.append(rule_outcomes)
    return detector_outcomes 

get_score(event, record)
def get_score(event, record):
    """ return the score to the function
    """
    pred_rec = {}
    
    try:
        pred = client.get_event_prediction(detectorId=DETECTOR_NAME, 
                                       detectorVersionId=DETECTOR_VER,
                                       eventId = event['EVENT_ID'],
                                       eventTypeName = EVENT_TYPE,
                                       eventTimestamp = event['EVENT_TIMESTAMP'], 
                                       entities = [{'entityType': ENTITY_TYPE, 'entityId':event['ENTITY_ID']}],
                                       eventVariables=  record) 
                                       
        pred_rec["score"]   = prep_scores(pred['modelScores'])
        pred_rec["outcomes"]= prep_outcomes(pred['ruleResults'])

    except Exception as e:
        # -- log the failure and flag the record rather than stopping the batch --
        print(e)
        pred_rec["score"]   = [-999]
        pred_rec["outcomes"]= ["error"]
    
    return pred_rec

The following is the full code for the Lambda function:

import boto3 
import csv
import json

# -- make a connection to fraud detector -- 
client = boto3.client("frauddetector")

# -- S3 bucket to write batch predictions out to -- 
S3_BUCKET_OUT = "fraud-detector-output"

# -- specify event, entity, and detector  -- 
ENTITY_TYPE    = "customer"
EVENT_TYPE     = "new_account_registration_full_details"
DETECTOR_NAME  = "new_account_detector"
DETECTOR_VER   = "3"

def get_event_variables(EVENT_TYPE):
    """ return list of event variables 
    """
    response = client.get_event_types(name=EVENT_TYPE)
    event_variables = []

    for v in response['eventTypes'][0]['eventVariables']:
        event_variables.append(v)
    return event_variables

def prep_record(record_map, event_map, line):
    """ structure the record for scoring 
    """
    record = {}
    for key in record_map.keys():
        record[key] = line[record_map[key]]
        
    event = {}
    for key in event_map.keys():
        event[key] = line[event_map[key]]
    return record, event

def prep_scores(model_scores):
    """ return list of models and scores
    """
    detector_models = []
    for m in model_scores:
        detector_models.append(m['scores'])
    return detector_models

def prep_outcomes(rule_results):
    """return list of rules and outcomes
    """
    detector_outcomes = []
    for rule in rule_results:
        rule_outcomes = {}
        rule_outcomes[rule['ruleId']] = rule['outcomes']
        detector_outcomes.append(rule_outcomes)
    return detector_outcomes

def get_score(event, record):
    """ return the score to the function
    """
    pred_rec = {}
    
    try:
        pred = client.get_event_prediction(detectorId=DETECTOR_NAME, 
                                       detectorVersionId=DETECTOR_VER,
                                       eventId = event['EVENT_ID'],
                                       eventTypeName = EVENT_TYPE,
                                       eventTimestamp = event['EVENT_TIMESTAMP'], 
                                       entities = [{'entityType': ENTITY_TYPE, 'entityId':event['ENTITY_ID']}],
                                       eventVariables=  record) 
                                       
        pred_rec["score"]   = prep_scores(pred['modelScores'])
        pred_rec["outcomes"]= prep_outcomes(pred['ruleResults'])

    except Exception as e:
        # -- log the failure and flag the record rather than stopping the batch --
        print(e)
        pred_rec["score"]   = [-999]
        pred_rec["outcomes"]= ["error"]
    
    return pred_rec

def lambda_handler(event, context):
    """ the lambda event handler triggers the process. 
    """
    S3_BUCKET_IN = event['Records'][0]['s3']['bucket']['name']
    S3_FILE      = event['Records'][0]['s3']['object']['key']
    S3_OUT_FILE  = "batch_{0}".format(S3_FILE)
    
    
    # -- open a temp file to write predictions to. 
    f = open("/tmp/csv_file.csv", "w+")
    temp_csv_file = csv.writer(f) 
    
    # -- get the input file -- 
    s3    = boto3.resource('s3')
    obj   = s3.Object(S3_BUCKET_IN, S3_FILE)
    data  = obj.get()['Body'].read().decode('utf-8').splitlines()
    lines = csv.reader(data)
    
    # -- get the file header -- 
    file_variables = next(lines)
    
    # -- write the file header to temporary file -- 
    temp_csv_file.writerow(file_variables + ["MODEL_SCORES", "DETECTOR_OUTCOMES"])
    
    # -- get list of event variables -- 
    event_variables = get_event_variables(EVENT_TYPE)
    
    # -- map event variables to file structure -- 
    record_map = {}
    for var in event_variables:
        record_map[var] = file_variables.index(var)
    
    # -- map event fields to file structure --
    event_map = {}
    for var in ['ENTITY_ID', 'EVENT_ID', 'EVENT_TIMESTAMP']:
        event_map[var] = file_variables.index(var)
    
    # -- for each record in the file, prep it, score it, write it to temp. 
    for i,line in enumerate(lines):
        record, event       = prep_record(record_map, event_map, line)
        record_pred         = get_score(event, record)
        #print(list(record_pred.values()))
        temp_csv_file.writerow(line + list(record_pred.values()))
    
    
    # -- close the temp file and upload it to your OUTPUT bucket    
    f.close()
    s3_client = boto3.client('s3')
    s3_client.upload_file('/tmp/csv_file.csv', S3_BUCKET_OUT, "batch_pred_results_" + S3_FILE  )
    
    return {
        'statusCode': 200,
        'body': json.dumps('Batch Complete!')
    }

After you add the code to your Lambda function, choose Deploy to save.
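Object keys in Amazon S3 event notifications are URL-encoded, so a file name that contains spaces or special characters arrives in encoded form. The following sketch shows one way to decode the bucket and key before reading the object; s3_objects_from_event is a hypothetical helper and the sample event is trimmed to the fields the handler reads.

```python
from urllib.parse import unquote_plus

def s3_objects_from_event(event):
    """Return (bucket, key) pairs for every record in an S3 event notification."""
    pairs = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Keys arrive URL-encoded, with spaces represented as '+'.
        key = unquote_plus(record["s3"]["object"]["key"])
        pairs.append((bucket, key))
    return pairs

# Trimmed, made-up event for illustration.
sample_event = {
    "Records": [
        {"s3": {"bucket": {"name": "fraud-detector-input"},
                "object": {"key": "batch+files/20_event_test.csv"}}}
    ]
}

objects = s3_objects_from_event(sample_event)
print(objects)
```

Iterating over all records, rather than reading only the first one, also covers notifications that carry more than one object.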

Configuring your Lambda settings and creating the Amazon S3 trigger

Batch predictions need more memory and run time than the Lambda defaults provide, so we need to increase the function’s memory allocation and maximum run time.

  1. On the Lambda console, locate your function.
  2. On the function detail page, under Basic settings, choose Edit.
  3. For Memory, choose 2048 MB.
  4. For Timeout, enter 15 min.
  5. Choose Save.

A 15-minute timeout allows the function to process up to roughly 4,000 predictions per batch, so you should keep this in mind as you consider your CSV file creation and upload strategy.
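One possible upload strategy is to split a large file into smaller files before uploading, so each invocation stays within the limit. The following sketch does this in memory; split_csv is a hypothetical helper, and 4,000 rows per file simply mirrors the estimate above (about 0.225 seconds per prediction).

```python
import csv
import io

def _to_csv(header, rows):
    """Serialize a header and rows back to CSV text."""
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    writer.writerow(header)
    writer.writerows(rows)
    return out.getvalue()

def split_csv(text, max_rows=4000):
    """Split CSV text into chunks of at most max_rows data rows.

    Every chunk repeats the header row so it can be processed on its own.
    """
    reader = csv.reader(io.StringIO(text))
    header = next(reader)
    chunks, rows = [], []
    for row in reader:
        rows.append(row)
        if len(rows) == max_rows:
            chunks.append(_to_csv(header, rows))
            rows = []
    if rows:
        chunks.append(_to_csv(header, rows))
    return chunks

# Time budget implied by the estimate: 15 minutes for roughly 4,000 predictions.
seconds_per_prediction = 15 * 60 / 4000

# Small demonstration with 10 made-up rows and a chunk size of 4.
demo = "EVENT_ID,ENTITY_ID\n" + "".join(
    "event_{0},entity_{0}\n".format(i) for i in range(10))
chunks = split_csv(demo, max_rows=4)
print(len(chunks), seconds_per_prediction)
```

Each chunk can then be uploaded as its own .csv object, which triggers a separate Lambda invocation per chunk.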

You can now configure the Lambda function to trigger whenever a CSV file is uploaded to your input S3 bucket.

  1. At the top of the Lambda function detail page, in the Designer box, choose Add trigger.
  2. Choose S3.
  3. For Bucket, choose your input S3 bucket.
  4. For Suffix, enter .csv.

A warning about recursive invocation appears. If the function wrote its results to the same bucket that triggers it, each results file would invoke the function again, which is why you created a second S3 bucket for the output.

  5. Select the check box to acknowledge the recursive invocation warning.
  6. Choose Add.

Creating a sample CSV file of event records

We need to create a sample CSV file of event records to test the batch prediction process. In this CSV file, include a column for each variable in your event type schema. In addition, include columns for:

  • EVENT_ID – An identifier for the event, such as a transaction number. The field values must satisfy the following regular expression pattern: ^[0-9a-z_-]+$.
  • ENTITY_ID – An identifier for the entity performing the event, such as an account number. The field values must also satisfy the following regular expression pattern: ^[0-9a-z_-]+$.
  • EVENT_TIMESTAMP – A timestamp, in ISO 8601 format, for when the event occurred.

Column header names must match their corresponding Amazon Fraud Detector variable names exactly.
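
Before uploading a file, it can help to check these three columns locally. The following is a minimal sketch of such a check. `validate_event_csv` is a hypothetical helper, and it relies on Python's `datetime.fromisoformat`, which accepts only a subset of ISO 8601, so treat a timestamp failure as a prompt to inspect the value rather than as a definitive verdict.

```python
import csv
import io
import re
from datetime import datetime

ID_PATTERN = re.compile(r"^[0-9a-z_-]+$")
REQUIRED_COLUMNS = ("EVENT_ID", "ENTITY_ID", "EVENT_TIMESTAMP")


def is_iso8601(value):
    """Return True if the value parses as an ISO 8601 date or datetime."""
    # fromisoformat rejects a trailing "Z" before Python 3.11, so map it to UTC.
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    try:
        datetime.fromisoformat(value)
        return True
    except ValueError:
        return False


def validate_event_csv(csv_text):
    """Return a list of problems found in the required columns (empty if none)."""
    reader = csv.DictReader(io.StringIO(csv_text))
    fieldnames = reader.fieldnames or []
    missing = [column for column in REQUIRED_COLUMNS if column not in fieldnames]
    if missing:
        return ["missing column(s): " + ", ".join(missing)]
    problems = []
    # Data rows start on line 2 because line 1 holds the header.
    for line_number, row in enumerate(reader, start=2):
        for column in ("EVENT_ID", "ENTITY_ID"):
            value = row[column] or ""
            if not ID_PATTERN.fullmatch(value):
                problems.append(f"line {line_number}: invalid {column} {value!r}")
        timestamp = row["EVENT_TIMESTAMP"] or ""
        if not is_iso8601(timestamp):
            problems.append(
                f"line {line_number}: invalid EVENT_TIMESTAMP {timestamp!r}"
            )
    return problems
```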

In your CSV file, each row corresponds to one event that you want to generate a prediction for. The following screenshot shows an example of a test CSV file.

For more information about Amazon Fraud Detector variable data types and formatting, see Create a variable.

Performing a test batch prediction

To test our Lambda function, we simply upload our test file to the fraud-detector-input S3 bucket via the Amazon S3 console. This triggers the Lambda function. We can then check the fraud-detector-output S3 bucket for the results file.
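
The upload and the check for results can also be scripted. The sketch below assumes the bucket names used in this post and the `batch_pred_results_` prefix that the Lambda function adds to the input file name. `run_test_batch` is a hypothetical helper, and the waiter delay and attempt count are arbitrary choices sized to cover a 15-minute run.

```python
import os


def results_key_for(input_key):
    """Name that the Lambda function gives the results file for an uploaded CSV."""
    return "batch_pred_results_" + input_key


def run_test_batch(s3_client, local_path,
                   input_bucket="fraud-detector-input",
                   output_bucket="fraud-detector-output"):
    """Upload a test CSV, then wait for the results file to appear."""
    input_key = os.path.basename(local_path)
    s3_client.upload_file(local_path, input_bucket, input_key)
    output_key = results_key_for(input_key)
    # Poll every 15 seconds, up to 60 times (about 15 minutes in total).
    s3_client.get_waiter("object_exists").wait(
        Bucket=output_bucket,
        Key=output_key,
        WaiterConfig={"Delay": 15, "MaxAttempts": 60},
    )
    return output_key

# Example (needs AWS credentials and the buckets from this walkthrough):
#   import boto3
#   run_test_batch(boto3.client("s3"), "20_event_test.csv")
```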

The following screenshot shows that the test CSV file 20_event_test.csv is uploaded to the fraud-detector-input S3 bucket.

When batch prediction is complete, the results CSV file batch_pred_results_20_event_test.csv is uploaded to the fraud-detector-output S3 bucket (see the following screenshot).

The following screenshots show our results CSV file. The new file has two new columns: MODEL_SCORES and DETECTOR_OUTCOMES. MODEL_SCORES contains model names, model details, and prediction scores for any models used in the detector. DETECTOR_OUTCOMES contains all rule results, including any matched rules and their corresponding outcomes.

If the results file doesn’t appear in the output S3 bucket, check the CloudWatch log stream to see whether the Lambda function ran into any issues. To do this, go to your Lambda function on the Lambda console and choose the Monitoring tab, then choose View logs in CloudWatch. In CloudWatch, choose the log stream that covers the time period in which you uploaded your CSV file.
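
As an alternative to browsing the console, the log events can be fetched with the boto3 CloudWatch Logs client. This sketch assumes the function writes to the default `/aws/lambda/<function-name>` log group and looks at a 20-minute window after the upload. Both helper names are hypothetical, and only the first page of results is printed.

```python
from datetime import datetime, timedelta, timezone


def lambda_log_query(function_name, uploaded_at, window_minutes=20):
    """Build arguments for the CloudWatch Logs FilterLogEvents call."""
    end = uploaded_at + timedelta(minutes=window_minutes)
    return {
        "logGroupName": f"/aws/lambda/{function_name}",
        # CloudWatch Logs expects timestamps in milliseconds since the epoch.
        "startTime": int(uploaded_at.timestamp() * 1000),
        "endTime": int(end.timestamp() * 1000),
    }


def print_lambda_logs(logs_client, function_name, uploaded_at):
    """Print the first page of log messages using a boto3 logs client."""
    response = logs_client.filter_log_events(
        **lambda_log_query(function_name, uploaded_at)
    )
    for event in response["events"]:
        print(event["message"].rstrip())

# Example (needs AWS credentials; the function name is a placeholder):
#   import boto3
#   uploaded_at = datetime.now(timezone.utc) - timedelta(minutes=5)
#   print_lambda_logs(boto3.client("logs"), "fraud-detector-batch", uploaded_at)
```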

Conclusion

Congrats! You have successfully performed a batch of fraud predictions. Depending on your use case, you may want to use your prediction results in other AWS services. For example, you can analyze the prediction results in Amazon QuickSight or send results that are high risk to Amazon Augmented AI (Amazon A2I) for a human review of the prediction.

Amazon Fraud Detector has a 2-month free trial that includes 30,000 predictions per month. After that, pricing starts at $0.005 per prediction for rules-only predictions and $0.03 for ML-based predictions. For more information, see Amazon Fraud Detector pricing. For more information about Amazon Fraud Detector, including links to additional blog posts, sample notebooks, user guide, and API documentation, see Amazon Fraud Detector.

The next step is to start dropping files into your S3 bucket! Good luck!


About the Authors

Nick Tostenrude is a Senior Manager of Product in AWS, where he leads the Amazon Fraud Detector service team. Nick joined Amazon nine years ago. He has spent the past four years as part of the AWS Fraud Prevention organization. Prior to AWS, Nick spent five years in Amazon’s Kindle and Devices organizations, leading product teams focused on the Kindle reading experience, accessibility, and K-12 Education.

Mike Ames is a Research Science Manager working on Amazon Fraud Detector. He helps companies use machine learning to combat fraud, waste and abuse. In his spare time, you can find him jamming to 90s metal with an electric mandolin.
