Train machine learning models using Amazon Keyspaces as a data source

Many applications meant for industrial equipment maintenance, trade monitoring, fleet management, and route optimization are built using open-source Cassandra APIs and drivers to process data at high speeds and low latency. Managing Cassandra tables yourself can be time consuming and expensive. Amazon Keyspaces (for Apache Cassandra) lets you set up, secure, and scale Cassandra tables in the AWS Cloud without managing additional infrastructure.

In this post, we’ll walk you through AWS Services related to training machine learning (ML) models using Amazon Keyspaces at a high level, and provide step by step instructions for ingesting data from Amazon Keyspaces into Amazon SageMaker and training a model which can be used for a specific customer segmentation use case.

AWS has multiple services to help businesses implement ML processes in the cloud.

AWS ML Stack has three layers. In the middle layer is SageMaker, which provides developers, data scientists, and ML engineers with the ability to build, train, and deploy ML models at scale. It removes the complexity from each step of the ML workflow so that you can more easily deploy your ML use cases. This includes anything from predictive maintenance to computer vision to predict customer behaviors. Customers achieve up to 10 times improvement in data scientists’ productivity with SageMaker.

Apache Cassandra is a popular choice for read-heavy use cases with un-structured or semi-structured data. For example, a popular food delivery business estimates time of delivery, and a retail customer could persist frequently using product catalog information in the Apache Cassandra Database. Amazon Keyspaces is a scalable, highly available, and managed serverless Apache Cassandra–compatible database service. You don’t need to provision, patch, or manage servers, and you don’t need to install, maintain, or operate software. Tables can scale up and down automatically, and you only pay for the resources that you use. Amazon Keyspaces lets you you run your Cassandra workloads on AWS using the same Cassandra application code and developer tools that you use today.

SageMaker provides a suite of built-in algorithms to help data scientists and ML practitioners get started training and deploying ML models quickly. In this post, we’ll show you how a retail customer can use customer purchase history in the Keyspaces Database and target different customer segments for marketing campaigns.

K-means is an unsupervised learning algorithm. It attempts to find discrete groupings within data, where members of a group are as similar as possible to one another and as different as possible from members of other groups. You define the attributes that you want the algorithm to use to determine similarity. SageMaker uses a modified version of the web-scale k-means clustering algorithm. As compared with the original version of the algorithm, the version used by SageMaker is more accurate. However, like the original algorithm, it scales to massive datasets and delivers improvements in training time.

Solution overview

The instructions assume that you would be using SageMaker Studio to run the code. The associated code has been shared on AWS Sample GitHub. Following the instructions in the lab, you can do the following:

  • Install necessary dependencies.
  • Connect to Amazon Keyspaces, create a Table, and ingest sample data.
  • Build a classification ML model using the data in Amazon Keyspaces.
  • Explore model results.
  • Clean up newly created resources.

Once complete, you’ll have integrated SageMaker with Amazon Keyspaces to train ML models as shown in the following image.

Now you can follow the step-by-step instructions in this post to ingest raw data stored in Amazon Keyspaces using SageMaker and the data thus retrieved for ML processing.

Prerequisites

First, Navigate to SageMaker.

Next, if this is the first time that you’re using SageMaker, select Get Started.

Next, select Setup up SageMaker Domain.

Next, create a new user profile with Name – sagemakeruser, and select Create New Role in the Default Execution Role sub section.

Next, in the screen that pops up, select any Amazon Simple Storage Service (Amazon S3) bucket, and select Create role.

This role will be used in the following steps to allow SageMaker to access Keyspaces Table using temporary credentials from the role. This eliminates the need to store a username and password in the notebook.

Next, retrieve the role associated with the sagemakeruser that was created in the previous step from the summary section.

Then, navigate to the AWS Console and look up AWS Identity and Access Management (IAM). Within IAM, navigate to Roles. Within Roles, search for the execution role identified in the previous step.

Next, select the role identified in the previous step and select Add Permissions. In the drop down that appears, select Create Inline Policy. SageMaker lets you provide a granular level of access that restricts what actions a user/application can perform based on business requirements.

Then, select the JSON tab and copy the policy from the Note section of Github page. This policy allows the SageMaker notebook to connect to Keyspaces and retrieve data for further processing.

Then, select Add permissions again and from the drop down, and select Attach Policy.

Lookup AmazonKeyspacesFullAccess policy, and select the checkbox next to the matching result, and select Attach Policies.

Verify that the permissions policies section includes AmazonS3FullAccess, AmazonSageMakerFullAccess, AmazonKeyspacesFullAccess, as well as the newly added inline policy.

Next, navigate to SageMaker Studio using the AWS Console and select the SageMaker Studio. Once there, select Launch App and select Studio.

Notebook walkthrough

The preferred way to connect to Keyspaces from SageMaker Notebook is by using AWS Signature Version 4 process (SigV4) based Temporary Credentials for authentication. In this scenario, we do NOT need to generate or store Keyspaces credentials and can use the credentials to authenticate with the SigV4 plugin. Temporary security credentials consist of an access key ID and a secret access key. However, they also include a security token that indicates when the credentials expire. In this post, we’ll create an IAM role and generate temporary security credentials.

First, we install a driver (cassandra-sigv4). This driver enables you to add authentication information to your API requests using the AWS Signature Version 4 Process (SigV4). Using the plugin, you can provide users and applications with short-term credentials to access Amazon Keyspaces (for Apache Cassandra) using IAM users and roles. Following this, you’ll import a required certificate along with additional package dependencies. In the end, you will allow the notebook to assume the role to talk to Keyspaces.

# Install missing packages and import dependencies
# Installing Cassandra SigV4
%pip install  cassandra-sigv4

# Get Security certificate
!curl https://certs.secureserver.net/repository/sf-class2-root.crt -O

# Import
from sagemaker import get_execution_role
from cassandra.cluster import Cluster
from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED
from cassandra_sigv4.auth import SigV4AuthProvider
import boto3

import pandas as pd
from pandas import DataFrame

import csv
from cassandra import ConsistencyLevel
from datetime import datetime
import time
from datetime import timedelta

import pandas as pd
import datetime as dt
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from sklearn.preprocessing import MinMaxScaler

# Getting credentials from the role
client = boto3.client("sts")

# Get notebook Role
role = get_execution_role()
role_info = {"RoleArn": role, "RoleSessionName": "session1"}
print(role_info)

credentials = client.assume_role(**role_info)

Next, connect to Amazon Keyspaces and read systems data from Keyspaces into Pandas DataFrame to validate the connection.

# Connect to Cassandra Database from SageMaker Notebook 
# using temporary credentials from the Role.
session = boto3.session.Session()

###
### You can also pass specific credentials to the session
###
#session = boto3.session.Session(
# aws_access_key_id=credentials["Credentials"]["AccessKeyId"],
# aws_secret_access_key=credentials["Credentials"]["SecretAccessKey"],
# aws_session_token=credentials["Credentials"]["SessionToken"],
#)

region_name = session.region_name

# Set Context
ssl_context = SSLContext(PROTOCOL_TLSv1_2)
ssl_context.load_verify_locations("sf-class2-root.crt")
ssl_context.verify_mode = CERT_REQUIRED

auth_provider = SigV4AuthProvider(session)
keyspaces_host = "cassandra." + region_name + ".amazonaws.com"

cluster = Cluster([keyspaces_host], ssl_context=ssl_context, auth_provider=auth_provider, port=9142)
session = cluster.connect()

# Read data from Keyspaces system table. 
# Keyspaces is serverless DB so you don't have to create Keyspaces DB ahead of time.
r = session.execute("select * from system_schema.keyspaces")

# Read Keyspaces row into Panda DataFrame
df = DataFrame(r)
print(df)

Next, prepare the data for training on the raw data set. In the python notebook associated with this post, use a retail data set downloaded from here, and process it. Our business objective given the data set is to cluster the customers using a specific metric call RFM. The RFM model is based on three quantitative factors:

  • Recency: How recently a customer has made a purchase.
  • Frequency: How often a customer makes a purchase.
  • Monetary Value: How much money a customer spends on purchases.

RFM analysis numerically ranks a customer in each of these three categories, generally on a scale of 1 to 5 (the higher the number, the better the result). The “best” customer would receive a top score in every category. We’ll use pandas’s Quantile-based discretization function (qcut). It will help discretize values into equal-sized buckets based or based on sample quantiles.

# Prepare Data
r = session.execute("select * from " + keyspaces_schema + ".online_retail")

df = DataFrame(r)
df.head(100)

df.count()
df["description"].nunique()
df["totalprice"] = df["quantity"] * df["price"]
df.groupby("invoice").agg({"totalprice": "sum"}).head()

df.groupby("description").agg({"price": "max"}).sort_values("price", ascending=False).head()
df.sort_values("price", ascending=False).head()
df["country"].value_counts().head()
df.groupby("country").agg({"totalprice": "sum"}).sort_values("totalprice", ascending=False).head()

returned = df[df["invoice"].str.contains("C", na=False)]
returned.sort_values("quantity", ascending=True).head()

df.isnull().sum()
df.dropna(inplace=True)
df.isnull().sum()
df.dropna(inplace=True)
df.isnull().sum()
df.describe([0.05, 0.01, 0.25, 0.50, 0.75, 0.80, 0.90, 0.95, 0.99]).T
df.drop(df.loc[df["customer_id"] == ""].index, inplace=True)

# Recency Metric
import datetime as dt

today_date = dt.date(2011, 12, 9)
df["customer_id"] = df["customer_id"].astype(int)

# create get the most recent invoice for each customer
temp_df = df.groupby("customer_id").agg({"invoice_date": "max"})
temp_df["invoice_date"] = temp_df["invoice_date"].astype(str)
temp_df["invoice_date"] = pd.to_datetime(temp_df["invoice_date"]).dt.date
temp_df["Recency"] = (today_date - temp_df["invoice_date"]).dt.days
recency_df = temp_df.drop(columns=["invoice_date"])
recency_df.head()

# Frequency Metric
temp_df = df.groupby(["customer_id", "invoice"]).agg({"invoice": "count"})
freq_df = temp_df.groupby("customer_id").agg({"invoice": "count"})
freq_df.rename(columns={"invoice": "Frequency"}, inplace=True)

# Monetary Metric
monetary_df = df.groupby("customer_id").agg({"totalprice": "sum"})
monetary_df.rename(columns={"totalprice": "Monetary"}, inplace=True)
rfm = pd.concat([recency_df, freq_df, monetary_df], axis=1)

df = rfm
df["RecencyScore"] = pd.qcut(df["Recency"], 5, labels=[5, 4, 3, 2, 1])
df["FrequencyScore"] = pd.qcut(df["Frequency"].rank(method="first"), 5, labels=[1, 2, 3, 4, 5])
df["Monetary"] = df["Monetary"].astype(int)
df["MonetaryScore"] = pd.qcut(df["Monetary"], 5, labels=[1, 2, 3, 4, 5])
df["RFM_SCORE"] = (
    df["RecencyScore"].astype(str)
    + df["FrequencyScore"].astype(str)
    + df["MonetaryScore"].astype(str)
)
seg_map = {
    r"[1-2][1-2]": "Hibernating",
    r"[1-2][3-4]": "At Risk",
    r"[1-2]5": "Can't Loose",
    r"3[1-2]": "About to Sleep",
    r"33": "Need Attention",
    r"[3-4][4-5]": "Loyal Customers",
    r"41": "Promising",
    r"51": "New Customers",
    r"[4-5][2-3]": "Potential Loyalists",
    r"5[4-5]": "Champions",
}

df["Segment"] = df["RecencyScore"].astype(str) + rfm["FrequencyScore"].astype(str)
df["Segment"] = df["Segment"].replace(seg_map, regex=True)
df.head()
rfm = df.loc[:, "Recency":"Monetary"]
df.groupby("customer_id").agg({"Segment": "sum"}).head()

In this example, we use CQL to read records from the Keyspace table. In some ML use-cases, you may need to read the same data from the same Keyspaces table multiple times. In this case, we would recommend that you save your data into an Amazon S3 bucket to avoid incurring additional costs reading from Amazon Keyspaces. Depending on your scenario, you may also use Amazon EMR to ingest a very large Amazon S3 file into SageMaker.

## Optional Code to save Python DataFrame to S3
from io import StringIO # python3 (or BytesIO for python2)

smclient = boto3.Session().client('sagemaker')
sess = sagemaker.Session()
bucket = sess.default_bucket() # Set a default S3 bucket
print(bucket)

csv_buffer = StringIO()
df.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object(bucket, ‘out/saved_online_retail.csv').put(Body=csv_buffer.getvalue())

Next, we train an ML model using the KMeans algorithm and make sure that the clusters are created. In this particular scenario, you would see that the created clusters are printed, showing that the customers in the raw data set have been grouped together based on various attributes in the data set. This cluster information can be used for targeted marketing campaigns.

# Training

sc = MinMaxScaler((0, 1))
df = sc.fit_transform(rfm)

# Clustering
kmeans = KMeans(n_clusters=6).fit(df)

# Result
segment = kmeans.labels_

# Visualize the clusters
import matplotlib.pyplot as plt

final_df = pd.DataFrame({"customer_id": rfm.index, "Segment": segment})
bucket_data = final_df.groupby("Segment").agg({"customer_id": "count"}).head()
index_data = final_df.groupby("Segment").agg({"Segment": "max"}).head()
index_data["Segment"] = index_data["Segment"].astype(int)
dataFrame = pd.DataFrame(data=bucket_data["customer_id"], index=index_data["Segment"])
dataFrame.rename(columns={"customer_id": "Total Customers"}).plot.bar(
    rot=70, title="RFM clustering"
)
# dataFrame.plot.bar(rot=70, title="RFM clustering");
plt.show(block=True);

(Optional) Next, we save the customer segments that have been identified by the ML model back to an Amazon Keyspaces table for targeted marketing. A batch job could read this data and run targeted campaigns to customers in specific segments.

# Create ml_clustering_results table to store results 
createTable = """CREATE TABLE IF NOT EXISTS %s.ml_clustering_results ( 
 run_id text,
 segment int,
 total_customers int,
 run_date date,
    PRIMARY KEY (run_id, segment));
"""
cr = session.execute(createTable % keyspaces_schema)
time.sleep(20)
print("Table 'ml_clustering_results' created")
    
insert_ml = (
    "INSERT INTO "
    + keyspaces_schema
    + '.ml_clustering_results'  
    + '("run_id","segment","total_customers","run_date") ' 
    + 'VALUES (?,?,?,?); '
)

prepared = session.prepare(insert_ml)
prepared.consistency_level = ConsistencyLevel.LOCAL_QUORUM

run_id = "101"
dt = datetime.now()

for ind in dataFrame.index:
    print(ind, dataFrame['customer_id'][ind])
    r = session.execute(
                    prepared,
                    (
                        run_id, ind, dataFrame['customer_id'][ind], dt,
                    ),
                )

Finally, we clean up the resources created during this tutorial to avoid incurring additional charges.

# Delete blog keyspace and tables
deleteKeyspace = "DROP KEYSPACE IF EXISTS blog"
dr = session.execute(deleteKeyspace)

time.sleep(5)
print("Dropping %s keyspace. It may take a few seconds to a minute to complete deletion keyspace and table." % keyspaces_schema )

It may take a few seconds to a minute to complete the deletion of keyspace and tables. When you delete a keyspace, the keyspace and all of its tables are deleted and you stop accruing charges from them.

Conclusion

This post showed you how to ingest customer data from Amazon Keyspaces into SageMaker and train a clustering model that allowed you to segment customers. You could use this information for targeted marketing, thus greatly improving your business KPI. To learn more about Amazon Keyspaces, review the following resources:


About the Authors

Vadim Lyakhovich is a Senior Solutions Architect at AWS in the San Francisco Bay Area helping customers migrate to AWS. He is working with organizations ranging from large enterprises to small startups to support their innovations. He is also helping customers to architect scalable, secure, and cost-effective solutions on AWS.

Parth Patel is a Solutions Architect at AWS in the San Francisco Bay Area. Parth guides customers to accelerate their journey to cloud and help them adopt AWS cloud successfully. He focuses on ML and Application Modernization.

Ram Pathangi is a Solutions Architect at AWS in the San Francisco Bay Area. He has helped customers in Agriculture, Insurance, Banking, Retail, Health Care & Life Sciences, Hospitality, and Hi-Tech verticals to run their business successfully on AWS cloud. He specializes in Databases, Analytics and ML.

Read More