Accelerate and improve recommender system training and predictions using Amazon SageMaker Feature Store

Many companies must tackle the difficult use case of building a highly optimized recommender system. The challenge comes from processing large volumes of data to train and tune the model daily with new data and then make predictions based on user behavior during an active engagement. In this post, we show you how to use Amazon SageMaker Feature Store, a purpose-built repository where you can store, access, and share model features across teams in your company. By using both the online and offline stores of Feature Store, you can address the complex task of creating a product recommendation engine based on consumer behavior. This post comes with an accompanying workshop and GitHub repo.

The post and workshop are geared toward data scientists and expert machine learning (ML) practitioners who need to build custom models. If you're not a data scientist or ML expert, check out our AI service Amazon Personalize, which lets developers build a wide array of personalized experiences without needing ML expertise. This is the same technology that powers amazon.com.

Solution overview

In machine learning, expert practitioners know how crucial it is to feed high-quality data when training a model and to design features that influence the model's overall prediction accuracy. This process is often cumbersome and takes multiple iterations to reach the desired state. This step in the ML workflow is called feature engineering, and it often accounts for 60–70% of the overall effort. In large organizations the problem is compounded: different teams often run identical training jobs, or even write duplicate feature engineering code because they don't know about prior work, which wastes effort and leads to inconsistent results. In addition, without a central repository, features aren't versioned and there is no reliable way to access the latest feature values.

To address these challenges, Feature Store provides a fully managed central repository for ML features, making it easy to securely store and retrieve features without the heavy lifting of managing infrastructure. It lets you define groups of features, ingest data in batches or as streams, and retrieve the latest feature values with low latency. For more information, see Getting Started with Amazon SageMaker Feature Store.

The following Feature Store components are relevant to our use case:

  • Feature group – A group of features, defined by a schema in Feature Store, that describes a record. You can configure a feature group to use an online store, an offline store, or both.
  • Online store – The online store is primarily designed to support real-time predictions that need low-millisecond-latency reads and high-throughput writes.
  • Offline store – The offline store is primarily intended for batch predictions and model training. It's an append-only store that keeps historical feature data, so you can use it to explore features and to serve them for model training.
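
To make the online/offline distinction concrete, here is a toy, in-memory model of the two views. It is not the Feature Store API: the class and method names are hypothetical, and it assumes, as a simplification, that the online view keeps only the record with the latest event time per identifier while the offline view appends every write.

```python
from collections import defaultdict


class ToyFeatureStore:
    """In-memory illustration of online vs. offline views (NOT the Feature Store API)."""

    def __init__(self):
        self.online = {}                  # record_id -> latest record only
        self.offline = defaultdict(list)  # record_id -> every record ever written

    def put_record(self, record_id, event_time, features):
        record = {"event_time": event_time, **features}
        # Offline view: append-only history, nothing is overwritten
        self.offline[record_id].append(record)
        # Online view (assumed behavior): keep the record with the latest event time
        current = self.online.get(record_id)
        if current is None or event_time >= current["event_time"]:
            self.online[record_id] = record

    def get_record(self, record_id):
        """Low-latency lookup of the current values, as used at inference time."""
        return self.online.get(record_id)

    def history(self, record_id):
        """Full history, as used to build training data."""
        return list(self.offline[record_id])


store = ToyFeatureStore()
store.put_record("C1", 1, {"sum_activity_weight_last_2m": 3})
store.put_record("C1", 2, {"sum_activity_weight_last_2m": 8})
print(store.get_record("C1"))     # latest values only
print(len(store.history("C1")))   # both writes are kept offline
```

Inference code reads the single current record per customer, while training queries scan the full history.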

Real-time recommendations are time sensitive, mission critical, and depend on the context. Demand for real-time recommendations fades quickly as customers lose interest or demand is met elsewhere. In this post, we build a real-time recommendation engine for an ecommerce website using a synthetic online grocer dataset.

We use Feature Store (both online and offline) to store customers, products, and orders data using feature groups, which we use for model training, validation, and real-time inference. The recommendation engine retrieves features from the online feature store, which is purpose-built for ultra-low latency and high throughput predictions. It suggests the top products that a customer is likely to purchase while browsing through the ecommerce website based on the customer’s purchase history, real-time clickstream data, and other customer profile information. This solution is not intended to be a state-of-the-art recommender, but to provide a rich enough example for exploring the use of Feature Store.

We walk you through the following high-level steps:

  1. Set up the data and ingest it into Feature Store.
  2. Train your models.
  3. Simulate user activity and capture clickstream events.
  4. Make real-time recommendations.

Prerequisites

To follow along with this post, you need the following prerequisites:

Set up data and ingest it into Feature Store

We work with five different datasets based on the synthetic online grocer dataset. Each dataset has its own feature group in Feature Store. The first step is to ingest this data into Feature Store so that we can initiate training jobs for our two models. Refer to the 1_feature_store.ipynb notebook on GitHub.

The following tables show examples of the data that we’re storing in Feature Store.

Customers

   customer_id  name              state      age  is_married  customer_health_index
0  C1           justin gutierrez  alaska     52   1           0.59024
1  C2           karen cross       idaho      29   1           0.6222
2  C3           amy king          oklahoma   70   1           0.22548
3  C4           nicole hartman    missouri   52   1           0.97582
4  C5           jessica powers    minnesota  31   1           0.88613

Products

   product_name                                    product_category  product_id  product_health_index
0  chocolate sandwich cookies                      cookies_cakes     P1          0.1
1  nutter butter cookie bites go-pak               cookies_cakes     P25         0.1
2  danish butter cookies                           cookies_cakes     P34         0.1
3  gluten free all natural chocolate chip cookies  cookies_cakes     P55         0.1
4  mini nilla wafers munch pack                    cookies_cakes     P99         0.1

Orders

   customer_id  product_id  purchase_amount
0  C1           P10852      87.71
1  C1           P10940      101.71
2  C1           P13818      42.11
3  C1           P2310       55.37
4  C1           P393        55.16

Clickstream historical

   customer_id  product_id  bought  healthy_activity_last_2m  rating
0  C1           P10852      1       1                         3.04843
1  C3806        P10852      1       1                         1.67494
2  C5257        P10852      1       0                         2.69124
3  C8220        P10852      1       1                         1.77345
4  C1           P10852      0       9                         3.04843

Clickstream real-time

   customer_id  sum_activity_weight_last_2m  avg_product_health_index_last_2m
0  C09234       8                            0.2
1  D19283       3                            0.1
2  C1234        9                            0.8

We then create the relevant feature groups in Feature Store:

customers_feature_group = create_feature_group(df_customers, customers_feature_group_name, 'customer_id', prefix, sagemaker_session)

products_feature_group = create_feature_group(df_products, products_feature_group_name, 'product_id', prefix, sagemaker_session)

orders_feature_group = create_feature_group(df_orders, orders_feature_group_name, 'order_id', prefix, sagemaker_session)

click_stream_historical_feature_group = create_feature_group(df_click_stream_historical, click_stream_historical_feature_group_name, 'click_stream_id', prefix, sagemaker_session)

click_stream_feature_group = create_feature_group(df_click_stream, click_stream_feature_group_name, 'customer_id', prefix, sagemaker_session)

After the feature groups are created and available, we ingest the data into each group:

ingest_data_into_feature_group(df_customers, customers_feature_group)
customers_count = df_customers.shape[0]

ingest_data_into_feature_group(df_products, products_feature_group)
products_count = df_products.shape[0]

ingest_data_into_feature_group(df_orders, orders_feature_group)
orders_count = df_orders.shape[0]

ingest_data_into_feature_group(df_click_stream_historical, click_stream_historical_feature_group)
click_stream_historical_count = df_click_stream_historical.shape[0]

# Add Feature Group counts for later use
ps.add({'customers_count': customers_count,
        'products_count': products_count,
        'orders_count': orders_count,
        'click_stream_historical_count': click_stream_historical_count,
        'click_stream_count': 0})

We don’t ingest data into the click_stream_feature_group because we expect the data to come from real-time clickstream events.
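
The create_feature_group and ingest_data_into_feature_group helpers live in the notebook and aren't shown here. The following is a minimal sketch of what such helpers might do with the SageMaker Python SDK's FeatureGroup class. The function names, the event_time column, and the use of Unix-epoch event times are assumptions. Some SDK versions can't map pandas object columns to a feature type, so casting text columns to the pandas string dtype is a common workaround. Only the DataFrame preparation step runs without AWS credentials.

```python
import time

import pandas as pd


def prepare_for_feature_store(df: pd.DataFrame, event_time_col: str = "event_time") -> pd.DataFrame:
    """Return a copy of df whose dtypes Feature Store can map to feature types."""
    out = df.copy()
    for col in out.columns:
        # Non-numeric columns become the pandas "string" dtype (Feature Store String type)
        if not pd.api.types.is_numeric_dtype(out[col]):
            out[col] = out[col].astype("string")
    # Assumption: stamp every row with the current time as fractional Unix epoch seconds
    out[event_time_col] = float(time.time())
    return out


def create_and_ingest(df, feature_group_name, record_id_name, s3_uri, role_arn, sagemaker_session):
    """Create a feature group from df and ingest it (requires AWS; hypothetical helper)."""
    from sagemaker.feature_store.feature_group import FeatureGroup

    df = prepare_for_feature_store(df)
    fg = FeatureGroup(name=feature_group_name, sagemaker_session=sagemaker_session)
    fg.load_feature_definitions(data_frame=df)  # infer the schema from the dtypes
    fg.create(
        s3_uri=s3_uri,                      # offline store location
        record_identifier_name=record_id_name,
        event_time_feature_name="event_time",
        role_arn=role_arn,
        enable_online_store=True,           # also serve low-latency reads
    )
    # create() returns before the group is ready, so poll until it leaves "Creating"
    while fg.describe()["FeatureGroupStatus"] == "Creating":
        time.sleep(5)
    fg.ingest(data_frame=df, max_workers=3, wait=True)
    return fg
```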

Train your models

We train two models for this use case: a collaborative filtering model and a ranking model. The following diagram illustrates the training workflow.

The collaborative filtering model recommends products based on historical user-product interactions.

The ranking model reranks the recommended products from the collaborative filtering model by taking the user’s clickstream activity and using that to make personalized recommendations. The 2_recommendation_engine_models.ipynb notebook to train the models is available on GitHub.

Collaborative filtering model

We use a collaborative filtering model based on matrix factorization using the Factorization Machines algorithm to retrieve product recommendations for a customer. This is based on a customer profile and their past purchase history in addition to features such as product category, name, and description. The customer’s historical purchase data and product data from the ecommerce store’s product catalog are stored in three separate offline Feature Store feature groups: customers, products, and click-stream-historical, which we created in the last section. After we retrieve our training data, we need to transform a few variables so that we have a proper input for our model. We use two types of transformations: one-hot encoding and TF-IDF.
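
As a rough illustration of these two transformations, the following pure-Python sketch one-hot encodes categorical columns and turns product_name into TF-IDF weights, producing one sparse row per interaction. It is not the notebook's implementation. The function name is hypothetical, and the smoothed IDF formula, ln((1 + N) / (1 + df)) + 1, and the L2 normalization follow common library defaults; they are choices made for this sketch.

```python
import math
from collections import Counter


def build_features(rows, categorical_cols, text_col):
    """One-hot encode categorical columns and TF-IDF encode a text column.

    Returns (vocabulary, sparse_rows); each sparse row maps column index -> value.
    """
    vocabulary = {}

    def column_index(key):
        # Assign a new column the first time a category value or token is seen
        if key not in vocabulary:
            vocabulary[key] = len(vocabulary)
        return vocabulary[key]

    docs = [row[text_col].lower().split() for row in rows]
    doc_freq = Counter(token for doc in docs for token in set(doc))
    n_docs = len(docs)

    sparse_rows = []
    for row, tokens in zip(rows, docs):
        features = {}
        # One-hot: a single 1.0 for the observed value of each categorical column
        for col in categorical_cols:
            features[column_index(f"{col}={row[col]}")] = 1.0
        # TF-IDF: term count times smoothed idf, then L2-normalized over the text part
        counts = Counter(tokens)
        tfidf = {tok: cnt * (math.log((1 + n_docs) / (1 + doc_freq[tok])) + 1)
                 for tok, cnt in counts.items()}
        norm = math.sqrt(sum(v * v for v in tfidf.values())) or 1.0
        for tok, value in tfidf.items():
            features[column_index(f"{text_col}:{tok}")] = value / norm
        sparse_rows.append(features)
    return vocabulary, sparse_rows
```

Here the vocabulary grows as new values appear. At inference time you would reuse the vocabulary fitted on the training data so that each column index keeps the same meaning.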

  1. Let’s query the Feature Store feature groups we created to get this historical data to help with training:
    query = f'''
    select click_stream_customers.customer_id,
           products.product_id,
           rating,
           state,
           age,
           is_married,
           product_name
    from (
        select c.customer_id,
               cs.product_id,
               cs.bought,
               cs.rating,
               c.state,
               c.age,
               c.is_married
        from "{click_stream_historical_table}" as cs
        left join "{customers_table}" as c
        on cs.customer_id = c.customer_id
    ) click_stream_customers
    left join
    (select * from "{products_table}") products
    on click_stream_customers.product_id = products.product_id
    where click_stream_customers.bought = 1
    '''
    
    df_cf_features, query = query_offline_store(click_stream_feature_group_name, query,
                                                sagemaker_session)
    df_cf_features.head()

       customer_id  product_id  rating   state          age  is_married  product_name
    0  C6019        P15581      1.97827  kentucky       51   0           organic strawberry lemonade fruit juice drink
    1  C1349        P1629       1.76518  nevada         74   0           sea salt garden veggie chips
    2  C3750        P983        2.6721   arkansas       41   1           hair balance shampoo
    3  C4537        P399        2.14151  massachusetts  33   1           plain yogurt
    4  C5265        P13699      2.40822  arkansas       44   0           cacao nib crunch stone ground organic
  2. Next, prepare the data so that we can feed it to the model for training:
    X, y = load_dataset(df_cf_features)

  3. Then we split the data into train and test sets:
    from sklearn.model_selection import train_test_split
    
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    print(X_train.shape, X_test.shape, y_train.shape, y_test.shape)

  4. Next, we configure a SageMaker estimator for the built-in Factorization Machines algorithm and set its hyperparameters:
    container = sagemaker.image_uris.retrieve("factorization-machines", region=region)
    
    fm = sagemaker.estimator.Estimator(
        container,
        role,
        instance_count=1,
        instance_type="ml.c5.xlarge",
        output_path=output_prefix,
        sagemaker_session=sagemaker_session,
    )
    
    # Set our hyperparameters
    input_dims = X_train.shape[1]
    fm.set_hyperparameters(
        feature_dim=input_dims,
        predictor_type="regressor",
        mini_batch_size=1000,
        num_factors=64,
        epochs=20,
    )

  5. Start training the model, passing the locations of the training and test data:
    fm.fit({'train': train_data_location, 'test': test_data_location})

  6. When our model has completed training, we deploy a real-time endpoint for later use:
    cf_model_predictor = fm.deploy(
        endpoint_name=cf_model_endpoint_name,
        initial_instance_count=1,
        instance_type="ml.m4.xlarge",
        serializer=FMSerializer(),
        deserializer=JSONDeserializer(),
        wait=False
    )
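
With predictor_type set to regressor, the model returns a real-valued score for each input vector x. A standard degree-2 factorization machine computes y = w0 + Σ wᵢxᵢ + Σ_{i<j} ⟨vᵢ, vⱼ⟩ xᵢxⱼ, where each latent vector vᵢ has length k (num_factors, 64 above). The pure-Python sketch below illustrates that model equation; it is not SageMaker's implementation, and the function names are hypothetical. It compares a direct pairwise evaluation with the well-known O(k·n) reformulation of the interaction term.

```python
def fm_score_naive(x, w0, w, V):
    """y = w0 + sum_i w_i*x_i + sum_{i<j} <v_i, v_j> * x_i * x_j, pair by pair (O(k*n^2))."""
    n, k = len(x), len(V[0])
    y = w0 + sum(w[i] * x[i] for i in range(n))
    for i in range(n):
        for j in range(i + 1, n):
            dot = sum(V[i][f] * V[j][f] for f in range(k))
            y += dot * x[i] * x[j]
    return y


def fm_score_fast(x, w0, w, V):
    """Same score in O(k*n) using
    sum_{i<j} <v_i, v_j> x_i x_j = 1/2 * sum_f [(sum_i v_if x_i)^2 - sum_i (v_if x_i)^2]."""
    n, k = len(x), len(V[0])
    y = w0 + sum(w[i] * x[i] for i in range(n))
    for f in range(k):
        s = sum(V[i][f] * x[i] for i in range(n))
        s_sq = sum((V[i][f] * x[i]) ** 2 for i in range(n))
        y += 0.5 * (s * s - s_sq)
    return y


# Two active features with 2 latent factors: <v_1, v_2> = 1*3 + 2*4 = 11
x, w0, w, V = [1.0, 1.0], 0.0, [0.0, 0.0], [[1.0, 2.0], [3.0, 4.0]]
print(fm_score_naive(x, w0, w, V), fm_score_fast(x, w0, w, V))
```

Because each feature interacts with every other feature only through its latent vector, a customer and a product that never co-occurred in training can still receive a meaningful score.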

Ranking model

We also train an XGBoost model on aggregated historical clickstream data to predict a customer's propensity to buy a given product. At inference time, the model uses aggregated features computed from real-time clickstream data (stored in and retrieved in real time from Feature Store), along with product category features. We use Amazon Kinesis Data Streams to stream real-time clickstream data and Amazon Kinesis Data Analytics to aggregate the streaming data with a stagger window query over the last 2 minutes. This aggregated data is written in real time to an online Feature Store feature group, where the ranking model reads it during inference. For this use case, we predict bought, a Boolean variable that indicates whether a user bought an item.
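
A stagger window opens a separate window for each partition key (here, each customer) when that key's first event arrives, and closes it after the window interval. The sketch below reproduces that logic in plain Python over a sorted, finite list of events. It is a simplification for illustration: it uses the event timestamp rather than stream arrival time, emits still-open windows at the end of the input, and the tuple layout and function name are hypothetical.

```python
def stagger_window_aggregate(events, window_seconds=120):
    """Aggregate clickstream events per customer using stagger-window semantics.

    events: iterable of (timestamp_seconds, customer_id, activity_weight,
            product_health_index) tuples, sorted by timestamp.
    Returns one output row per window per customer.
    """
    open_windows = {}  # customer_id -> running aggregates for that customer's open window
    output = []

    def emit(customer_id, win):
        output.append({
            "customer_id": customer_id,
            "sum_activity_weight_last_2m": win["sum_weight"],
            "avg_product_health_index_last_2m": win["sum_health"] / win["count"],
        })

    for ts, customer_id, weight, health in events:
        win = open_windows.get(customer_id)
        # The window closes window_seconds after the customer's first event in it
        if win is not None and ts >= win["start"] + window_seconds:
            emit(customer_id, win)
            win = None
        if win is None:
            win = {"start": ts, "sum_weight": 0, "sum_health": 0.0, "count": 0}
            open_windows[customer_id] = win
        win["sum_weight"] += weight
        win["sum_health"] += health
        win["count"] += 1

    # End of the finite input: emit whatever windows are still open
    for customer_id, win in open_windows.items():
        emit(customer_id, win)
    return output
```

The output columns match the real-time clickstream table shown earlier. At inference time, the two aggregates are multiplied to derive healthy_activity_last_2m.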

  1. Let’s query the feature groups we created to get data to train the ranking model:
    query = f'''
    select bought,
           healthy_activity_last_2m,
           product_health_index,
           customer_health_index,
           product_category
    from (
        select c.customer_health_index,
               cs.product_id,
               cs.healthy_activity_last_2m,
               cs.bought
        from "{click_stream_historical_table}" as cs
        left join "{customers_table}" as c
        on cs.customer_id = c.customer_id
    ) click_stream_customers
    left join
    (select * from "{products_table}") products
    on click_stream_customers.product_id = products.product_id
    '''
    
    df_rank_features, query = query_offline_store(click_stream_feature_group_name, query,
                                                  sagemaker_session)
    df_rank_features.head()

       bought  healthy_activity_last_2m  product_health_index  customer_health_index  product_category
    0  0       2                         0.9                   0.34333                tea
    1  0       0                         0.9                   0.74873                vitamins_supplements
    2  0       0                         0.8                   0.37688                yogurt
    3  0       0                         0.7                   0.42828                refrigerated
    4  1       3                         0.2                   0.24883                chips_pretzels
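  2. Prepare the data for the XGBoost ranking model by one-hot encoding the product category:
    # dtype=int writes numeric 0/1 columns to the CSV instead of True/False
    df_rank_features = pd.concat([df_rank_features,
        pd.get_dummies(df_rank_features['product_category'],
        prefix='prod_cat', dtype=int)], axis=1)
    del df_rank_features['product_category']

  3. Split the data into training (70%), validation (20%), and held-out test (10%) sets; we use only the first two here:
    train_data, validation_data, _ = np.split(
        df_rank_features.sample(frac=1, random_state=1729),
            [int(0.7 * len(df_rank_features)),
                int(0.9 * len(df_rank_features))])
    
    # The built-in XGBoost algorithm reads CSV with the label (bought) in the first column and no header
    train_data.to_csv('train.csv', header=False, index=False)
    
    validation_data.to_csv('validation.csv', header=False, index=False)

  4. Begin model training:
    container = sagemaker.image_uris.retrieve('xgboost', region, version='1.2-2')
    
    xgb = sagemaker.estimator.Estimator(container,
                                        role,
                                        instance_count=1,
                                        instance_type='ml.m4.xlarge',
                                        output_path='s3://{}/{}/output'.format(default_bucket, prefix),
                                        sagemaker_session=sagemaker_session)
    
    xgb.set_hyperparameters(
        max_depth=5,
        eta=0.2,
        gamma=4,
        min_child_weight=6,
        subsample=0.7,
        objective='binary:logistic',
        num_round=50,
        verbosity=2
    )
    
    # Upload the CSV files to Amazon S3 and wrap them as training inputs
    s3_input_train = sagemaker.inputs.TrainingInput(
        sagemaker_session.upload_data('train.csv', bucket=default_bucket, key_prefix=f'{prefix}/train'),
        content_type='csv')
    s3_input_validation = sagemaker.inputs.TrainingInput(
        sagemaker_session.upload_data('validation.csv', bucket=default_bucket, key_prefix=f'{prefix}/validation'),
        content_type='csv')
    
    xgb.fit({'train': s3_input_train, 'validation': s3_input_validation})

  5. When our model has completed training, we deploy a real-time endpoint for later use:
    xgb_predictor = xgb.deploy(
        endpoint_name=ranking_model_endpoint_name,
        initial_instance_count=1,
        instance_type='ml.m4.xlarge',
        serializer=CSVSerializer(),
        wait=False
    )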
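
The np.split call shuffles the rows and cuts them at 70% and 90% of the length, so the three pieces hold roughly 70%, 20%, and 10% of the data. This pure-Python sketch (a hypothetical helper, not from the notebook, and using Python's own shuffle rather than pandas sampling) shows the same arithmetic on a list:

```python
import random


def split_70_20_10(rows, seed=1729):
    """Shuffle rows, then cut at 70% and 90% of the length (train/validation/test)."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    n = len(shuffled)
    cut_train, cut_validation = int(0.7 * n), int(0.9 * n)
    return shuffled[:cut_train], shuffled[cut_train:cut_validation], shuffled[cut_validation:]


train, validation, test = split_70_20_10(range(1000))
print(len(train), len(validation), len(test))
```

With 1,000 rows, the cut points fall at 700 and 900. The final 10% slice is the part that the notebook discards with the `_` placeholder.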

Simulate user activity and capture clickstream events

As the user interacts with the ecommerce website, we need a way to capture their activity in the form of clickstream events. In the 3_click_stream_kinesis.ipynb notebook, we simulate user activity and capture these clickstream events with Kinesis Data Streams, aggregate them with Kinesis Data Analytics, and ingest these events into Feature Store. The following diagram illustrates this workflow.

A producer emits clickstream events (simulating user activity) to the Kinesis data stream; we use Kinesis Data Analytics to aggregate the clickstream data for the last 2 minutes of activity.

Finally, an AWS Lambda function takes the data from Kinesis Data Analytics and ingests it into Feature Store (specifically the click_stream feature group).

We simulate customer clickstream activity on a web application, such as adding products to the cart or liking products. To capture this activity, we use Kinesis Data Streams, a scalable real-time streaming service.

  1. Create the Kinesis data stream that receives the simulated clickstream events, and wait for it to become active:
    kinesis_client = boto3.client('kinesis')
    kinesis_client.create_stream(StreamName=kinesis_stream_name, ShardCount=1)
    
    active_stream = False
    while not active_stream:
        status = kinesis_client.describe_stream(StreamName=kinesis_stream_name)['StreamDescription']['StreamStatus']
        if (status == 'CREATING'):
            print('Waiting for the Kinesis stream to become active...')
            time.sleep(20)  
        elif (status == 'ACTIVE'): 
            active_stream = True
            print('ACTIVE')
            
    stream_arn = kinesis_client.describe_stream(StreamName=kinesis_stream_name)['StreamDescription']['StreamARN']
    print(f'Amazon kinesis stream arn: {stream_arn}')

    The ranking model recommends ranked products to a customer based on the customer's last 2 minutes of activity on the ecommerce website. To aggregate the streaming information over the last 2 minutes, we create a Kinesis Data Analytics application. Kinesis Data Analytics can process data from Kinesis Data Streams with sub-second latency using SQL transformations.

  2. Create the application with the following code:
    kda_client = boto3.client('kinesisanalytics')
    
    sql_code = '''
    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (  
        customer_id VARCHAR(8),   
        sum_activity_weight_last_2m INTEGER,   
        avg_product_health_index_last_2m DOUBLE);
        
    CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" 
    SELECT   
        STREAM CUSTOMER_ID,   
        SUM(ACTIVITY_WEIGHT) AS sum_activity_weight_last_2m,   
        AVG(PRODUCT_HEALTH_INDEX) AS avg_product_health_index_last_2m
    FROM   
        "SOURCE_SQL_STREAM_001" 
    WINDOWED BY STAGGER (    PARTITION BY CUSTOMER_ID RANGE INTERVAL '2' MINUTE);
    '''

  3. Use the following input schema to define how data from the Kinesis data stream is made available to SQL queries in the Kinesis Data Analytics application:
    kda_input_schema = [{
                    'NamePrefix': 'SOURCE_SQL_STREAM',
                    'KinesisStreamsInput': {
                           'ResourceARN': stream_arn,
                           'RoleARN': role
                    },
                    'InputSchema': {
                          'RecordFormat': {
                              'RecordFormatType': 'JSON',
                              'MappingParameters': {
                                  'JSONMappingParameters': {
                                      'RecordRowPath': '$'
                                  }
                              },
                          },
                          'RecordEncoding': 'UTF-8',
                          'RecordColumns': [
                              {'Name': 'EVENT_TIME',  'Mapping': '$.event_time',   'SqlType': 'TIMESTAMP'},
                              {'Name': 'CUSTOMER_ID','Mapping': '$.customer_id', 'SqlType': 'VARCHAR(8)'},
                              {'Name': 'PRODUCT_ID', 'Mapping': '$.product_id', 'SqlType': 'VARCHAR(8)'},
                              {'Name': 'PRODUCT_CATEGORY', 'Mapping': '$.product_category', 'SqlType': 'VARCHAR(20)'},
                              {'Name': 'HEALTH_CATEGORY', 'Mapping': '$.health_category', 'SqlType': 'VARCHAR(10)'},
                              {'Name': 'ACTIVITY_TYPE', 'Mapping': '$.activity_type', 'SqlType': 'VARCHAR(10)'},
                              {'Name': 'ACTIVITY_WEIGHT', 'Mapping': '$.activity_weight', 'SqlType': 'INTEGER'},
                              {'Name': 'PRODUCT_HEALTH_INDEX', 'Mapping': '$.product_health_index', 'SqlType': 'DOUBLE'}
                          ]
                    }
                  }
                 ]

    Now we need to create a Lambda function to take the output from our Kinesis Data Analytics application and ingest that data into Feature Store. Specifically, we ingest that data into our click stream feature group.

  4. Create the Lambda function using the lambda-stream.py code on GitHub.
  5. We then define an output schema, which contains the Lambda ARN and destination schema:
    kda_output_schema = [{'LambdaOutput': {'ResourceARN': lambda_function_arn, 
        'RoleARN': role},'Name': 'DESTINATION_SQL_STREAM','DestinationSchema': 
            {'RecordFormatType': 'JSON'}}]
    print(f'KDA output schema: {kda_output_schema}')

    Next, we call the API that creates the Kinesis Data Analytics application. The application reads from the Kinesis data stream through the input schema, aggregates the events with the SQL code defined earlier, and sends the results to the Lambda function through the output schema.

  6. Invoke the API with the following code:
    # Create the application once, then poll describe_application until it is READY
    response = kda_client.create_application(ApplicationName=kinesis_analytics_application_name,
                                             Inputs=kda_input_schema,
                                             Outputs=kda_output_schema,
                                             ApplicationCode=sql_code)
    status = response['ApplicationSummary']['ApplicationStatus']
    while status != 'READY':
        print('Waiting for the Kinesis Analytics Application to be in READY state...')
        time.sleep(20)
        status = kda_client.describe_application(
            ApplicationName=kinesis_analytics_application_name)['ApplicationDetail']['ApplicationStatus']
    print('READY')

  7. When the app status is Ready, we start the Kinesis Data Analytics application:
    kda_client.start_application(ApplicationName=kinesis_analytics_application_name,
        InputConfigurations=[{'Id': '1.1',
            'InputStartingPositionConfiguration':{'InputStartingPosition':'NOW'}}])

  8. For this workshop, we created two helper functions that simulate clickstream events generated on a website and send them to the Kinesis data stream:
    def generate_click_stream_data(customer_id, product_health_index_low, product_health_index_high):
        # Let's get some random product categories to help us generate click stream data
        query = f'''
        select product_category,
               product_health_index,
               product_id
        from "{products_table}"
        where product_health_index between {product_health_index_low} and {product_health_index_high}
        order by random()
        limit 1
        '''
    
        event_time = datetime.datetime.utcnow() - datetime.timedelta(seconds=10)
        random_products_df, query = query_offline_store(products_feature_group_name, query,
                                                        sagemaker_session)
        # Pick a random activity type and its activity weight
        activities = ['liked', 'added_to_cart', 'added_to_wish_list', 'saved_for_later']
        activity_weights_dict = {'liked': 1, 'added_to_cart': 2,
                                'added_to_wish_list': 1, 'saved_for_later': 2}
        random_activity_type = random.choice(activities)
        random_activity_weight = activity_weights_dict[random_activity_type]
        
        data = {
            'event_time': event_time.isoformat(),
            'customer_id': customer_id,
            'product_id': random_products_df.product_id.values[0],
            'product_category': random_products_df.product_category.values[0],
            'activity_type': random_activity_type,
            'activity_weight': random_activity_weight,
            'product_health_index': random_products_df.product_health_index.values[0]
        }
        return data
        
    def put_records_in_kinesis_stream(customer_id, product_health_index_low, product_health_index_high):
        # n_range (the number of events to send) is set earlier in the notebook
        kinesis_client = boto3.client('kinesis')  # create the client once, not once per record
        for _ in range(n_range):
            data = generate_click_stream_data(customer_id, product_health_index_low, product_health_index_high)
            print(data)
            
            response = kinesis_client.put_record(
                StreamName=kinesis_stream_name,
                Data=json.dumps(data),
                PartitionKey="partitionkey")
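
The lambda-stream.py function is on GitHub and isn't reproduced here. As an illustration of the general pattern, and under several assumptions: when Kinesis Data Analytics delivers output to Lambda, each invocation receives a batch of base64-encoded records, and the function returns a result ("Ok" or "DeliveryFailed") for every recordId. The handler below decodes each aggregated row and writes it to the online store with the sagemaker-featurestore-runtime put_record call. The feature group name, the event_time feature, and the lowercase key normalization are hypothetical. put_record and now are injectable so that the logic can run without AWS. In production you would typically create the boto3 client once at module scope.

```python
import base64
import json
import time

# Hypothetical values: use your own feature group name and event-time feature
FEATURE_GROUP_NAME = "click-stream-feature-group"
EVENT_TIME_FEATURE = "event_time"


def kda_row_to_record(row, event_time):
    """Map one aggregated KDA output row to the Record list that put_record expects."""
    # Normalize keys to lowercase in case the SQL columns are emitted in uppercase
    row = {key.lower(): value for key, value in row.items()}
    names = ["customer_id", "sum_activity_weight_last_2m", "avg_product_health_index_last_2m"]
    record = [{"FeatureName": name, "ValueAsString": str(row[name])} for name in names]
    record.append({"FeatureName": EVENT_TIME_FEATURE, "ValueAsString": str(event_time)})
    return record


def lambda_handler(event, context, put_record=None, now=time.time):
    if put_record is None:
        import boto3  # imported lazily so the handler logic can run without AWS

        put_record = boto3.client("sagemaker-featurestore-runtime").put_record

    results = []
    for rec in event["records"]:
        try:
            row = json.loads(base64.b64decode(rec["data"]))
            put_record(FeatureGroupName=FEATURE_GROUP_NAME,
                       Record=kda_row_to_record(row, now()))
            results.append({"recordId": rec["recordId"], "result": "Ok"})
        except Exception as exc:
            # Report the failure so Kinesis Data Analytics can retry delivery
            print(f"Failed to ingest record {rec['recordId']}: {exc}")
            results.append({"recordId": rec["recordId"], "result": "DeliveryFailed"})
    return {"records": results}
```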

Now let’s ingest our clickstream data into Feature Store via Kinesis Data Streams and Kinesis Data Analytics. For inference_customer_id, we simulate a customer browsing pattern for unhealthy products like cookies, ice cream, and candy using a lower health index range of 0.1–0.3.

We produce six records, which are ingested into the data stream and aggregated by Kinesis Data Analytics into a single record, which is then ingested into the click stream feature group in Feature Store. This process should take 2 minutes.

  1. Ingest the clickstream data with the following code:
    put_records_in_kinesis_stream(inference_customer_id, 0.1, 0.3)
    # It takes 2 minutes for KDA to call lambda to update feature store 
    # because we are capturing 2 minute interval of customer activity 
    time.sleep(120)

  2. Make sure that the data is now in the click_stream feature group:
    record = featurestore_runtime.get_record(
                FeatureGroupName=click_stream_feature_group_name,
                RecordIdentifierValueAsString=inference_customer_id)
        
    print(f'Online feature store data for customer id {inference_customer_id}')
    print(f'Record: {record}')
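
The get_record response stores features as a list of FeatureName/ValueAsString pairs, and every value comes back as a string. A small helper can turn that list into a dictionary and cast values by name rather than by list position. The helper is hypothetical, not part of the notebook, and assumes the response has no "Record" key when no record exists for the identifier.

```python
def record_to_dict(response):
    """Flatten a get_record response into {feature_name: value_as_string}."""
    # Assumption: when no record exists for the identifier, "Record" is absent
    return {f["FeatureName"]: f["ValueAsString"] for f in response.get("Record", [])}


def get_typed_features(response, casts):
    """Look up features by name and convert each string with the matching callable."""
    values = record_to_dict(response)
    return {name: cast(values[name]) for name, cast in casts.items() if name in values}


# Example shaped like the get_record response (values are illustrative)
example = {"Record": [
    {"FeatureName": "customer_id", "ValueAsString": "C1"},
    {"FeatureName": "sum_activity_weight_last_2m", "ValueAsString": "8"},
    {"FeatureName": "avg_product_health_index_last_2m", "ValueAsString": "0.2"},
]}
features = get_typed_features(example, {"sum_activity_weight_last_2m": int,
                                        "avg_product_health_index_last_2m": float})
print(features)
```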

Make real-time recommendations

The following diagram depicts how the real-time recommendations are provided.

After the models are trained and tuned, each is deployed behind a live endpoint that the application can query over an API for real-time item recommendations for a particular user. The collaborative filtering model generates offline recommendations for a particular user based on past orders and impressions. The clickstream captures events from recent browsing and provides this input to the ranking model, which produces the top-N recommendations that the application displays to the user.
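
The two-stage flow can be sketched in a few lines: take the highest-scoring collaborative filtering candidates, then reorder that shortlist by the ranking model's purchase probability. Here both models are stand-ins (a dictionary of scores and a callable), and the function name and parameters are hypothetical. In the notebook, these values come from the two SageMaker endpoints.

```python
def recommend(customer_id, cf_scores, purchase_probability, shortlist_size=10, top_n=5):
    """Two-stage recommendation sketch.

    cf_scores: {product_id: collaborative filtering score} for this customer
    purchase_probability: callable(customer_id, product_id) -> ranking-model probability
    """
    # Stage 1 (retrieval): keep the highest-scoring collaborative filtering candidates
    shortlist = sorted(cf_scores, key=cf_scores.get, reverse=True)[:shortlist_size]
    # Stage 2 (reranking): reorder the shortlist by predicted purchase probability
    reranked = sorted(shortlist,
                      key=lambda product_id: purchase_probability(customer_id, product_id),
                      reverse=True)
    return reranked[:top_n]


# Stand-in model outputs (hypothetical numbers, not real predictions)
cf_scores = {"P1": 1.6, "P2": 1.5, "P3": 1.4, "P4": 0.1}
probabilities = {"P1": 0.2, "P2": 0.9, "P3": 0.5, "P4": 0.99}
print(recommend("C1", cf_scores, lambda c, p: probabilities[p], shortlist_size=3, top_n=2))
```

In this example, P4 never reaches the ranking stage despite its high stand-in probability. The collaborative filtering model bounds what the ranker can see, and the ranker reorders only within that set.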

Refer to the 4_realtime_recommendations.ipynb notebook on GitHub.

  1. The first step is to create a Predictor object from our collaborative filtering model endpoint (which we created earlier) so that we can use it to make predictions:
    # Make sure model has finished deploying
    existing_endpoints = sagemaker_session.sagemaker_client.list_endpoints(
        NameContains=cf_model_endpoint_name, MaxResults=30)["Endpoints"]
    while not existing_endpoints:
        time.sleep(60)
        existing_endpoints = sagemaker_session.sagemaker_client.list_endpoints(
            NameContains=cf_model_endpoint_name, MaxResults=30)["Endpoints"]
    
    cf_model_predictor = sagemaker.predictor.Predictor(
                           endpoint_name=cf_model_endpoint_name, 
                           sagemaker_session=sagemaker_session,
                           serializer=FMSerializer(),
                           deserializer=JSONDeserializer())

  2. Then we pass the cached data to this predictor to get our initial set of recommendations for a particular customer:
    # Pass in our cached data as input to the Collaborative Filtering model
    predictions = cf_model_predictor.predict(cf_inference_payload)['predictions']
    
    # Add those predictions to the input DataFrame
    predictions = [prediction["score"] for prediction in predictions]
    cf_inference_df['predictions'] = predictions
    
    # Sort by predictions and take top 10
    cf_inference_df = cf_inference_df.sort_values(
        by='predictions', ascending=False).head(10).reset_index()

  3. Let’s see the initial recommendations for this customer:
    cf_inference_df

   index  customer_id  product_id  state  age  is_married  product_name                                   predictions
0  1      C3571        P10682      maine  35   0           mini cakes birthday cake                       1.65686
1  6      C3571        P6176       maine  35   0           pretzel ”shells”                               1.64399
2  13     C3571        P7822       maine  35   0           degreaser                                      1.62522
3  14     C3571        P1832       maine  35   0           up beat craft brewed kombucha                  1.60065
4  5      C3571        P6247       maine  35   0           fruit punch roarin’ waters                     1.5686
5  8      C3571        P11086      maine  35   0           almonds mini nut-thins cheddar cheese          1.54271
6  12     C3571        P15430      maine  35   0           organic pork chop seasoning                    1.53585
7  4      C3571        P4152       maine  35   0           white cheddar bunnies                          1.52764
8  2      C3571        P16823      maine  35   0           pirouette chocolate fudge creme filled wafers  1.51293
9  9      C3571        P9981       maine  35   0           decaf tea, vanilla chai                        1.483

We now create a Predictor object from our ranking model endpoint (which we created earlier) so that we can use it to get predictions for the customer using their recent activity. Remember that we simulated recent behavior using the helper scripts and streamed it using Kinesis Data Streams to the click stream feature group.

  1. Create the Predictor object with the following code:
    # Make sure model has finished deploying
    existing_endpoints = sagemaker_session.sagemaker_client.list_endpoints(
            NameContains=ranking_model_endpoint_name, MaxResults=30)["Endpoints"]
            
    while not existing_endpoints:
        time.sleep(60)
        existing_endpoints = sagemaker_session.sagemaker_client.list_endpoints(
            NameContains=ranking_model_endpoint_name, MaxResults=30)["Endpoints"]
    
    ranking_model_predictor = sagemaker.predictor.Predictor(
                                endpoint_name=ranking_model_endpoint_name,
                                sagemaker_session=sagemaker_session,
                                serializer=CSVSerializer())

  2. To construct the input for the ranking model, we need to one-hot encode product categories as we did in training:
    query = f'''select product_category from "{products_table}" order by product_category'''
    
    product_categories_df, query = query_offline_store(
                                    products_feature_group_name, query, sagemaker_session)
    
    one_hot_cat_features = product_categories_df.product_category.unique()
    
    df_one_hot_cat_features = pd.DataFrame(one_hot_cat_features)
    
    df_one_hot_cat_features.columns = ['product_category']
    # dtype=int keeps the encoding numeric (0/1), matching the training data
    df_one_hot_cat_features = pd.concat([df_one_hot_cat_features,
       pd.get_dummies(df_one_hot_cat_features['product_category'], prefix='cat', dtype=int)], axis=1)

    Now we create a function to take the output from the collaborative filtering model and join it with the one-hot encoded product categories and the real-time clickstream data from our click stream feature group, because this data will influence the ranking of recommended products. The following diagram illustrates this process.

  3. Create the function with the following code:
    def get_ranking_model_input_data(df, df_one_hot_cat_features):
        """Build the ranking model input for one customer's candidate products.

        Combines customer, product, and clickstream features read from the
        online store with the one-hot encoded product categories.
        """
        def record_to_dict(response):
            # GetRecord returns a list of {'FeatureName', 'ValueAsString'} pairs;
            # index them by name rather than relying on their position in the list
            return {f['FeatureName']: f['ValueAsString'] for f in response['Record']}

        product_category_list = []
        product_health_index_list = []

        customer_id = str(df.iloc[0]['customer_id'])
        # Get customer features from customers_feature_group_name
        customer_record = record_to_dict(featurestore_runtime.get_record(
            FeatureGroupName=customers_feature_group_name,
            RecordIdentifierValueAsString=customer_id,
            FeatureNames=['customer_health_index']))
        customer_health_index = float(customer_record['customer_health_index'])

        # Get product features (instead of looping, you can optionally use
        # the `batch_get_record` Feature Store API)
        for product_id in df['product_id']:
            product_record = record_to_dict(featurestore_runtime.get_record(
                FeatureGroupName=products_feature_group_name,
                RecordIdentifierValueAsString=str(product_id),
                FeatureNames=['product_category', 'product_health_index']))

            product_category_list.append(product_record['product_category'])
            product_health_index_list.append(float(product_record['product_health_index']))

        # Get clickstream features from click_stream_feature_group_name
        click_stream_record = record_to_dict(featurestore_runtime.get_record(
            FeatureGroupName=click_stream_feature_group_name,
            RecordIdentifierValueAsString=customer_id,
            FeatureNames=['sum_activity_weight_last_2m',
                          'avg_product_health_index_last_2m']))

        # Calculate healthy_activity_last_2m, because recent healthy activity also influences ranking
        healthy_activity_last_2m = (int(click_stream_record['sum_activity_weight_last_2m'])
                                    * float(click_stream_record['avg_product_health_index_last_2m']))

        data = {'healthy_activity_last_2m': healthy_activity_last_2m,
                'product_health_index': product_health_index_list,
                'customer_health_index': customer_health_index,
                'product_category': product_category_list}

        ranking_inference_df = pd.DataFrame(data)
        # Attach the one-hot category columns, then drop the raw category string
        ranking_inference_df = ranking_inference_df.merge(df_one_hot_cat_features,
                                                          on='product_category', how='left')
        del ranking_inference_df['product_category']

        return ranking_inference_df

  4. Let’s put everything together. We call the function we created to build the ranking model’s input, then use the ranking model to rerank the initial list of products recommended by the collaborative filtering predictor, so that the data being streamed to Feature Store influences the real-time personalized recommendations:
    # Construct input data for the ranking model
    ranking_inference_df = get_ranking_model_input_data(
        cf_inference_df, df_one_hot_cat_features)

    # Get propensity-to-buy scores from the ranking model and attach them to the model input.
    # Convert the CSV response to floats so the scores sort numerically, not as strings.
    ranking_inference_df['propensity_to_buy'] = [
        float(score) for score in ranking_model_predictor.predict(
            ranking_inference_df.to_numpy()).decode('utf-8').split(',')]

  5. Now that we have our personalized ranked recommendations, let’s see what the top five recommended products are:
    # Join all the data back together for inspection
    # (reset the index so the rows align by position)
    personalized_recommendations = pd.concat(
        [cf_inference_df[['customer_id', 'product_id', 'product_name']].reset_index(drop=True),
         ranking_inference_df[['propensity_to_buy']]],
        axis=1)

    # Sort by propensity to buy and show the top five products
    top_recommendations = personalized_recommendations.sort_values(
        by='propensity_to_buy', ascending=False)
    top_recommendations[['product_id', 'product_name']].reset_index(drop=True).head(5)
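To try the data flow of steps 2 through 5 without a feature store or deployed endpoints, the following is a minimal, self-contained pandas sketch. Everything it reads is hypothetical: `FAKE_STORE`, the product IDs and names, `fake_get_record` (which only mimics the shape of a Feature Store `GetRecord` response, a `Record` list of `FeatureName`/`ValueAsString` pairs), and `fake_ranking_endpoint`, which replaces the trained ranking model with an arbitrary linear score. It is not the workshop's code or data; it only illustrates how the pieces fit together.

```python
import numpy as np
import pandas as pd

# Hypothetical online-store contents (not the workshop's data). Values are
# strings because GetRecord returns each feature as ValueAsString.
FAKE_STORE = {
    'customers': {'C1': {'customer_health_index': '0.6'}},
    'products': {
        'P1': {'product_category': 'snacks', 'product_health_index': '0.2'},
        'P2': {'product_category': 'produce', 'product_health_index': '0.9'},
        'P3': {'product_category': 'dairy', 'product_health_index': '0.5'},
    },
    'click_stream': {'C1': {'sum_activity_weight_last_2m': '4',
                            'avg_product_health_index_last_2m': '0.75'}},
}


def fake_get_record(feature_group, record_id, feature_names):
    """Hypothetical stand-in that mimics the shape of a GetRecord response."""
    row = FAKE_STORE[feature_group][record_id]
    return {'Record': [{'FeatureName': name, 'ValueAsString': row[name]}
                       for name in feature_names]}


def record_to_dict(response):
    """Index the returned features by name instead of by list position."""
    return {f['FeatureName']: f['ValueAsString'] for f in response['Record']}


def fake_ranking_endpoint(payload):
    """Hypothetical stand-in for the ranking endpoint: a fixed linear score,
    returned as CSV bytes like the predictor response in step 4."""
    weights = np.array([0.1, 0.5, 0.0, 0.0, 0.2, 0.0])  # one weight per input column
    return ','.join(f'{s:.6f}' for s in payload @ weights).encode('utf-8')


# Step 2: one row per category plus a cat_<category> indicator column
categories = pd.DataFrame({'product_category': sorted(
    {p['product_category'] for p in FAKE_STORE['products'].values()})})
one_hot = pd.concat([categories, pd.get_dummies(
    categories['product_category'], prefix='cat', dtype=int)], axis=1)

# Hypothetical candidate list from the collaborative filtering model
cf_inference_df = pd.DataFrame({'customer_id': ['C1'] * 3,
                                'product_id': ['P1', 'P2', 'P3'],
                                'product_name': ['Chips', 'Apples', 'Yogurt']})

# Step 3: gather customer, product, and clickstream features for the candidates
customer = record_to_dict(fake_get_record('customers', 'C1', ['customer_health_index']))
clicks = record_to_dict(fake_get_record(
    'click_stream', 'C1',
    ['sum_activity_weight_last_2m', 'avg_product_health_index_last_2m']))
healthy_activity = (int(clicks['sum_activity_weight_last_2m'])
                    * float(clicks['avg_product_health_index_last_2m']))

rows = []
for product_id in cf_inference_df['product_id']:
    product = record_to_dict(fake_get_record(
        'products', product_id, ['product_category', 'product_health_index']))
    rows.append({'healthy_activity_last_2m': healthy_activity,
                 'product_health_index': float(product['product_health_index']),
                 'customer_health_index': float(customer['customer_health_index']),
                 'product_category': product['product_category']})

ranking_df = (pd.DataFrame(rows)
              .merge(one_hot, on='product_category', how='left')
              .drop(columns='product_category'))

# Step 4: score the candidates and parse the CSV response into floats
ranking_df['propensity_to_buy'] = [
    float(s) for s in fake_ranking_endpoint(ranking_df.to_numpy()).decode('utf-8').split(',')]

# Step 5: align by position, then sort numerically by propensity to buy
recommendations = pd.concat(
    [cf_inference_df[['customer_id', 'product_id', 'product_name']].reset_index(drop=True),
     ranking_df[['propensity_to_buy']]], axis=1)
top = recommendations.sort_values(by='propensity_to_buy', ascending=False)
top = top[['product_id', 'product_name']].reset_index(drop=True).head(5)
print(top)
```

Two design choices matter here. Looking features up by `FeatureName` avoids depending on the order of the returned `Record` list, and parsing the scores to `float` before sorting avoids string comparison, under which `'1e-05'` would rank above `'0.9'`.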

Clean up

When you’re done using this solution, run the 5_cleanup.ipynb notebook to clean up the resources that you created as part of this post.

Conclusion

In this post, we used SageMaker Feature Store to accelerate training for a recommendation model and improve the accuracy of predictions based on recent behavioral events. We discussed the concepts of feature groups and offline and online stores, and how they work together to solve common challenges businesses face with ML, including complex use cases such as recommendation systems. This post is a companion to the workshop that was conducted live at AWS re:Invent 2021. We encourage you to use this post and try out the workshop to understand the design and internal workings of Feature Store.


About the Authors

Arnab Sinha is a Senior Solutions Architect for AWS, acting as Field CTO to help customers design and build scalable solutions supporting business outcomes across data center migrations, digital transformation, application modernization, big data analytics, and AI/ML. He has supported customers across a variety of industries, including retail, manufacturing, health care and life sciences, and agriculture. Arnab holds nine AWS certifications, including the ML Specialty Certification. Prior to joining AWS, Arnab was a technology leader, Principal Enterprise Architect, and software engineer for over 21 years.

Bobby Lindsey is a Machine Learning Specialist at Amazon Web Services. He’s been in technology for over a decade, spanning various technologies and multiple roles. He is currently focused on combining his background in software engineering, DevOps, and machine learning to help customers deliver machine learning workflows at scale. In his spare time, he enjoys reading, research, hiking, biking, and trail running.

Vikram Elango is an AI/ML Specialist Solutions Architect at Amazon Web Services, based in Virginia, USA. Vikram helps financial and insurance industry customers with design and thought leadership to build and deploy machine learning applications at scale. He is currently focused on natural language processing, responsible AI, inference optimization, and scaling ML across the enterprise. In his spare time, he enjoys traveling, hiking, cooking, and camping with his family.

Mark Roy is a Principal Machine Learning Architect for AWS, helping customers design and build AI/ML solutions. Mark’s work covers a wide range of ML use cases, with a primary interest in computer vision, deep learning, and scaling ML across the enterprise. He has helped companies in many industries, including insurance, financial services, media and entertainment, healthcare, utilities, and manufacturing. Mark holds six AWS certifications, including the ML Specialty Certification. Prior to joining AWS, Mark was an architect, developer, and technology leader for over 25 years, including 19 years in financial services.
