Running on-demand, serverless Apache Spark data processing jobs using Amazon SageMaker managed Spark containers and the Amazon SageMaker SDK

Apache Spark is a unified analytics engine for large scale, distributed data processing. Typically, businesses with Spark-based workloads on AWS use their own stack built on top of Amazon Elastic Compute Cloud (Amazon EC2), or Amazon EMR to run and scale Apache Spark, Hive, Presto, and other big data frameworks. This is useful for persistent workloads, in which you want these Spark clusters to be up and running 24/7, or at best, would have to come up with an architecture to spin up and spin down the cluster on a schedule or on demand.

Amazon SageMaker Processing lets you easily run preprocessing, postprocessing, model evaluation or other fairly generic transform workloads on a fully managed infrastructure. Previously, Amazon SageMaker Processing included a built-in container for Scikit-learn style preprocessing. For using other libraries like Spark, you have the flexibility to bring in your own Docker containers. Amazon SageMaker Processing jobs can also be part of your Step Functions workflow for ML involving pre- and post-processing steps. For more information, see AWS Step Functions adds support for Amazon SageMaker Processing.

Several machine learning(ML) workflows involve preprocessing data with Spark (or other libraries) and then passing in training data to a training step. The following workflow shows an Extract, Transform and Load (ETL) step that leads to model training and finally to model endpoint deployment using AWS Step Functions.

Including Spark steps in such workflows requires additional steps to provision and set up these clusters. Alternatively, you can do using AWS Glue, a fully managed ETL service that makes it easy for customers to write Python or Scala based scripts to preprocess data for ML training.

We’re happy to add a managed Spark container and associated SDK enhancements to Amazon SageMaker Processing, which lets you perform large scale, distributed processing on Spark by simply submitting a PySpark or Java/Scala Spark application. You can use this feature in Amazon SageMaker Studio and Amazon SageMaker notebook instances.

To demonstrate, the following code example runs a PySpark script on Amazon SageMaker Processing by using the PySparkProcessor:

from sagemaker.spark.processing import PySparkProcessor

spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="2.4",
    role=role,
    instance_count=2,
    instance_type="ml.c5.xlarge",
    max_runtime_in_seconds=1200,
) 

spark_processor.run(
    submit_app_py="./path/to/your/preprocess.py",
    arguments=['s3_input_bucket', bucket,
               's3_input_key_prefix', input_prefix,
               's3_output_bucket', bucket,
               's3_output_key_prefix', input_preprocessed_prefix],
    spark_event_logs_s3_uri='s3://' + bucket + '/' + prefix + '/spark_event_logs',
    logs=False
)

We can look at this example in some more detail. The PySpark script name ‘preprocess.py’ such as the one shown below, that loads a large CSV file from Amazon Simple Storage Service (Amazon S3) into a Spark dataframe, fits and transforms this dataframe into an output dataframe, and converts and saves a CSV back to Amazon S3:

import time
import sys
import os
import shutil
import csv

import pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.types import StructField, StructType, StringType, DoubleType
from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import *


def csv_line(data):
    r = ','.join(str(d) for d in data[1])
    return str(data[0]) + "," + r


def main():
    spark = SparkSession.builder.appName("PySparkApp").getOrCreate()
    
    # Convert command line args into a map of args
    args_iter = iter(sys.argv[1:])
    args = dict(zip(args_iter, args_iter))

    spark.sparkContext._jsc.hadoopConfiguration().set("mapred.output.committer.class",
                                                      "org.apache.hadoop.mapred.FileOutputCommitter")
    
    # Defining the schema corresponding to the input data. The input data does not contain the headers
    schema = StructType([StructField("sex", StringType(), True), 
                         StructField("length", DoubleType(), True),
                         StructField("diameter", DoubleType(), True),
                         StructField("height", DoubleType(), True),
                         StructField("whole_weight", DoubleType(), True),
                         StructField("shucked_weight", DoubleType(), True),
                         StructField("viscera_weight", DoubleType(), True), 
                         StructField("shell_weight", DoubleType(), True), 
                         StructField("rings", DoubleType(), True)])

    # Downloading the data from S3 into a Dataframe
    total_df = spark.read.csv(('s3://' + os.path.join(args['s3_input_bucket'], args['s3_input_key_prefix'],'abalone.csv')), header=False, schema=schema)

    #StringIndexer on the sex column which has categorical value
    sex_indexer = StringIndexer(inputCol="sex", outputCol="indexed_sex")
    
    #one-hot-encoding is being performed on the string-indexed sex column (indexed_sex)
    sex_encoder = OneHotEncoder(inputCol="indexed_sex", outputCol="sex_vec")

    #vector-assembler will bring all the features to a 1D vector for us to save easily into CSV format
    assembler = VectorAssembler(inputCols=["sex_vec", 
                                           "length", 
                                           "diameter", 
                                           "height", 
                                           "whole_weight", 
                                           "shucked_weight", 
                                           "viscera_weight", 
                                           "shell_weight"], 
                                outputCol="features")
    
    # The pipeline comprises of the steps added above
    pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler])
    
    # This step trains the feature transformers
    model = pipeline.fit(total_df)
    
    # This step transforms the dataset with information obtained from the previous fit
    transformed_total_df = model.transform(total_df)
    
    # Split the overall dataset into 80-20 training and validation
    (train_df, validation_df) = transformed_total_df.randomSplit([0.8, 0.2])
    
    # Convert the train dataframe to RDD to save in CSV format and upload to S3
    train_rdd = train_df.rdd.map(lambda x: (x.rings, x.features))
    train_lines = train_rdd.map(csv_line)
    train_lines.saveAsTextFile('s3://' + os.path.join(args['s3_output_bucket'], args['s3_output_key_prefix'], 'train'))
    
    # Convert the validation dataframe to RDD to save in CSV format and upload to S3
    validation_rdd = validation_df.rdd.map(lambda x: (x.rings, x.features))
    validation_lines = validation_rdd.map(csv_line)
    validation_lines.saveAsTextFile('s3://' + os.path.join(args['s3_output_bucket'], args['s3_output_key_prefix'], 'validation'))


if __name__ == "__main__":
    main()

You can easily start a Spark based processing job by using the PySparkProcessor() class as shown below:

from sagemaker.spark.processing import PySparkProcessor

# Upload the raw input dataset to S3
timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
prefix = 'sagemaker/spark-preprocess-demo/' + timestamp_prefix
input_prefix_abalone = prefix + '/input/raw/abalone'
input_preprocessed_prefix_abalone = prefix + '/input/preprocessed/abalone'
model_prefix = prefix + '/model'

sagemaker_session.upload_data(path='./data/abalone.csv', bucket=bucket, key_prefix=input_prefix_abalone)

# Run the processing job
spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="2.4",
    role=role,
    instance_count=2,
    instance_type="ml.c5.xlarge",
    max_runtime_in_seconds=1200,
)

spark_processor.run(
    submit_app_py="./code/preprocess.py",
    arguments=['s3_input_bucket', bucket,
               's3_input_key_prefix', input_prefix_abalone,
               's3_output_bucket', bucket,
               's3_output_key_prefix', input_preprocessed_prefix_abalone],
    spark_event_logs_s3_uri='s3://' + bucket + '/' + prefix + '/spark_event_logs',
    logs=False
)

When running this in Amazon SageMaker Studio or Amazon SageMaker notebook instance, the output shows the job’s progress:

Job Name:  sm-spark-<...>
Inputs:  [{'InputName': 'code', 'S3Input': {'S3Uri': 's3://<bucketname>/<prefix>/preprocess.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'output-1', 'S3Output': {'S3Uri': 's3://<bucketname>/<prefix>', 'LocalPath': '/opt/ml/processing/spark-events/', 'S3UploadMode': 'Continuous'}}]

In Amazon SageMaker Studio, you can describe your processing jobs and view relevant details by choosing the processing job name (right-click), and choosing Open in trial details.

You can also track the processing job’s settings, logs, and metrics on the Amazon SageMaker console as shown in the following screenshot.

After a job completes, if the spark_event_logs_s3_uri was specified in the run() function, the Spark UI can be viewed by running the history server:

spark_processor.start_history_server()

If run from an Amazon SageMaker Notebook instance, the output will include a proxy URL where the history server can be accessed:

Starting history server...
History server is up on https://<your-notebook>.notebook.us-west-2.sagemaker.aws/proxy/15050

Visiting this URL will bring you to the history server web interface as shown in the screenshot below:

Additional python and jar file dependencies can also be specified in your Spark jobs. For example, if you want to serialize an MLeap model, you can specify these additional dependencies by modifying the call to the run() function of PySparkProcessor:

spark_processor.run(
    submit_app_py="./code/preprocess-mleap.py",
    submit_py_files=["./spark-mleap/mleap-0.15.0.zip"],
    submit_jars=["./spark-mleap/mleap-spark-assembly.jar"],
    arguments=['s3_input_bucket', bucket,
               's3_input_key_prefix', input_prefix_abalone,
               's3_output_bucket', bucket,
               's3_output_key_prefix', input_preprocessed_prefix_abalone],
    logs=False
)

Finally, overriding Spark configuration is crucial for several tasks such as tuning your Spark application or configuring the Hive metastore. You can override Spark, Hive, Hadoop configurations using our Python SDK.

For example, the following code overrides spark.executor.memory and spark.executor.cores:

spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="2.4",
    role=role,
    instance_count=2,
    instance_type="ml.c5.xlarge",
    max_runtime_in_seconds=1200,
)

configuration = [{
  "Classification": "spark-defaults",
  "Properties": {"spark.executor.memory": "2g", "spark.executor.cores": "1"},
}]

spark_processor.run(
    submit_app_py="./code/preprocess.py",
    arguments=['s3_input_bucket', bucket,
               's3_input_key_prefix', input_prefix_abalone,
               's3_output_bucket', bucket,
               's3_output_key_prefix', input_preprocessed_prefix_abalone],
    configuration=configuration,
    logs=False
)

Try out this example on your own by navigating to the examples tab in your Amazon SageMaker notebook instance, or by cloning the Amazon SageMaker examples directory and navigating to the folder with Amazon SageMaker Processing examples.

Additionally, you can set up an end-to-end Spark workflow for your use cases using Amazon SageMaker and other AWS services:

Conclusion

Amazon SageMaker makes extensive use of Docker containers to allow users to build a runtime environment for data preparation, training, and inference code. Amazon SageMaker’s built-in Spark container for Amazon SageMaker Processing provides a managed Spark runtime including all library components and dependencies needed to run distributed data processing workloads. The example discussed in the blog shows how developers and data scientists can take advantage of the built-in Spark container on Amazon SageMaker to focus on more important aspects of preparing and preprocessing data. Instead of spending time tuning, scaling, or managing Spark infrastructure, developers can focus on core implementation.


About the Authors

 Shreyas Subramanian is a AI/ML specialist Solutions Architect, and helps customers by using Machine Learning to solve their business challenges using the AWS platform.

 

 

 

 

Andrew Packer is a Software Engineer in Amazon AI where he is excited about building scalable, distributed machine learning infrastructure for the masses. In his spare time, he likes playing guitar and exploring the PNW.

 

 

 

 

Vidhi Kastuar is a Sr. Product Manager for Amazon SageMaker, focusing on making machine learning and artificial intelligence simple, easy to use and scalable for all users and businesses. Prior to AWS, Vidhi was Director of Product Management at Veritas Technologies. For fun outside work, Vidhi loves to sketch and paint, work as a career coach, and spend time with her family and friends.

Read More