Build a generative AI image description application with Anthropic’s Claude 3.5 Sonnet on Amazon Bedrock and AWS CDK


Generating image descriptions is a common requirement for applications across many industries. One common use case is tagging images with descriptive metadata to improve discoverability within an organization’s content repositories. Ecommerce platforms also use automatically generated image descriptions to provide customers with additional product details. Descriptive image captions also improve accessibility for users with visual impairments.

With advances in generative artificial intelligence (AI) and multimodal models, producing image descriptions is now more straightforward. Amazon Bedrock provides access to Anthropic’s Claude 3 family of models, which incorporates new computer vision capabilities that enable Anthropic’s Claude to comprehend and analyze images. This unlocks new possibilities for multimodal interaction. However, building an end-to-end application often requires substantial infrastructure work, which slows development.

The Generative AI CDK Constructs coupled with Amazon Bedrock offer a powerful combination to expedite application development. Amazon Bedrock is a fully managed service that offers a choice of high-performing foundation models (FMs) from leading AI companies like AI21 Labs, Anthropic, Cohere, Meta, Mistral AI, Stability AI, and Amazon through a single API, along with a broad set of capabilities to build generative AI applications with security, privacy, and responsible AI. Generative AI CDK Constructs provide reusable infrastructure patterns for building on these FMs, so you can focus your time and effort on the unique aspects of your application.

In this post, we delve into the process of building and deploying a sample application capable of generating multilingual descriptions for multiple images with a Streamlit UI, AWS Lambda powered with the Amazon Bedrock SDK, and AWS AppSync driven by the open source Generative AI CDK Constructs.

Multimodal models

Multimodal AI systems are an advanced type of AI that can process and analyze data from multiple modalities at once, including text, images, audio, and video. Unlike traditional AI models trained on a single data type, multimodal AI integrates diverse data sources to develop a more comprehensive understanding of complex information.

Anthropic’s Claude 3 on Amazon Bedrock is a leading multimodal model with computer vision capabilities to analyze images and generate descriptive text outputs. Anthropic’s Claude 3 excels at interpreting complex visual assets like charts, graphs, diagrams, reports, and more. The model combines its computer vision with language processing to provide nuanced text summaries of key information extracted from images. This allows Anthropic’s Claude 3 to develop a deeper understanding of visual data than traditional single-modality AI.

In March 2024, Amazon Bedrock provided access to Anthropic’s Claude 3 family. The three models in the family are Anthropic’s Claude 3 Haiku, the fastest and most compact model for near-instant responsiveness; Anthropic’s Claude 3 Sonnet, the ideal balance between skills and speed; and Anthropic’s Claude 3 Opus, the most intelligent offering for top-level performance on highly complex tasks. In June 2024, Amazon Bedrock announced support for Anthropic’s Claude 3.5 Sonnet as well. The sample application in this post supports Claude 3.5 Sonnet and all three Claude 3 models.

Generative AI CDK Constructs

Generative AI CDK Constructs is an open source extension to the AWS Cloud Development Kit (AWS CDK), a development framework for defining cloud infrastructure as code (IaC) and deploying it through AWS CloudFormation.

Constructs are the fundamental building blocks of AWS CDK applications. The AWS Construct Library categorizes constructs into three levels: Level 1 (the lowest level, mapping directly to single AWS CloudFormation resources with no abstraction), Level 2 (higher-level constructs that represent a single AWS resource with sensible defaults and convenience methods), and Level 3 (patterns that combine multiple resources, with the highest level of abstraction).

The Generative AI CDK Constructs Library provides modular building blocks to seamlessly integrate AWS services and resources into solutions using generative AI capabilities. By using Amazon Bedrock to access FMs and combining with serverless AWS services such as Lambda and AWS AppSync, these AWS CDK constructs streamline the process of assembling cloud infrastructure for generative AI. You can rapidly configure and deploy solutions to generate content using intuitive abstractions. This approach boosts productivity and reduces time-to-market for delivering innovative applications powered by the latest advances in generative AI on the AWS Cloud.

Solution overview

The sample application in this post uses the aws-summarization-appsync-stepfn construct from the Generative AI CDK Constructs Library. The aws-summarization-appsync-stepfn construct provides a serverless architecture that uses AWS AppSync, AWS Step Functions, and Amazon EventBridge to deliver an asynchronous image summarization service. This construct offers a scalable and event-driven solution for processing and generating descriptions for image assets.

AWS AppSync acts as the entry point, exposing a GraphQL API that enables clients to initiate image summarization and description requests. Clients submit requests through a GraphQL mutation and receive results through a GraphQL subscription, so requests run asynchronously. This decoupling follows best practices for event-driven, loosely coupled systems.

EventBridge serves as the event bus, facilitating the communication between AWS AppSync and Step Functions. When a client submits a request through the GraphQL API, an event is emitted to EventBridge, invoking a run of the Step Functions workflow.

Step Functions orchestrates the run of three Lambda functions, each responsible for a specific task in the image summarization process:

  • Input validator – This Lambda function performs input validation, making sure the provided requests adhere to the expected format. It also handles the upload of the input image assets to an Amazon Simple Storage Service (Amazon S3) bucket designated for raw assets.
  • Document reader – This Lambda function retrieves the raw image assets from the input asset bucket, performs image moderation checks using Amazon Rekognition, and uploads the processed assets to an S3 bucket designated for transformed files. This separation of raw and processed assets facilitates auditing and versioning.
  • Generate summary – This Lambda function generates a textual summary or description for the processed image assets by invoking a multimodal FM on Amazon Bedrock.
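The moderation check in the document reader can be sketched with the Amazon Rekognition `DetectModerationLabels` API. This is a minimal sketch, not the construct’s actual code: the function name `is_image_safe` and the 80 percent confidence threshold are assumptions, and the Rekognition client is passed in (for example, `boto3.client("rekognition")`) so the logic can be exercised without AWS credentials.

```python
from typing import Any


def is_image_safe(rekognition: Any, bucket: str, key: str,
                  min_confidence: float = 80.0) -> bool:
    """Hypothetical helper: return True if Rekognition reports no moderation
    labels at or above min_confidence for the S3 object bucket/key."""
    response = rekognition.detect_moderation_labels(
        Image={"S3Object": {"Bucket": bucket, "Name": key}},
        MinConfidence=min_confidence,  # assumed threshold; tune for your use case
    )
    # An empty (or missing) ModerationLabels list means nothing unsafe was found.
    return not response.get("ModerationLabels")
```

In a Lambda handler, you would copy the object to the transformed assets bucket only when this returns `True`.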

The Step Functions workflow uses a Map state to process multiple image assets in parallel. Processing images concurrently rather than one at a time reduces end-to-end latency for multi-image requests and lets the image summarization solution scale with the number of inputs.
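To make the Map state concrete, the following Python sketch builds an Amazon States Language (ASL) definition in which a validation task runs once and a Map state then runs a reader task and a summary task for each image. It is an illustrative assumption, not the definition that the aws-summarization-appsync-stepfn construct generates: the state names, the `$.files` items path, and the `MaxConcurrency` value are hypothetical.

```python
import json
from typing import Optional


def lambda_task(function_arn: str, next_state: Optional[str] = None) -> dict:
    """Build an ASL Task state that invokes a Lambda function through the
    optimized lambda:invoke integration, passing the whole state input."""
    state = {
        "Type": "Task",
        "Resource": "arn:aws:states:::lambda:invoke",
        "Parameters": {"FunctionName": function_arn, "Payload.$": "$"},
        "OutputPath": "$.Payload",  # keep only the function's return value
    }
    if next_state:
        state["Next"] = next_state
    else:
        state["End"] = True
    return state


def build_definition(validator_arn: str, reader_arn: str,
                     summarizer_arn: str, max_concurrency: int = 5) -> dict:
    """Hypothetical workflow: validate once, then process each image in parallel."""
    return {
        "StartAt": "ValidateInput",
        "States": {
            "ValidateInput": lambda_task(validator_arn, "ProcessImages"),
            "ProcessImages": {
                "Type": "Map",
                "ItemsPath": "$.files",  # assumed: validator returns a list of files
                "MaxConcurrency": max_concurrency,
                "ItemProcessor": {
                    "ProcessorConfig": {"Mode": "INLINE"},
                    "StartAt": "ReadDocument",
                    "States": {
                        "ReadDocument": lambda_task(reader_arn, "GenerateSummary"),
                        "GenerateSummary": lambda_task(summarizer_arn),
                    },
                },
                "End": True,
            },
        },
    }


if __name__ == "__main__":
    print(json.dumps(build_definition("validator-arn", "reader-arn", "summary-arn"), indent=2))
```

`MaxConcurrency` caps how many iterations run at once, which is one way to stay within Lambda concurrency or Amazon Bedrock throughput quotas.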

User authentication and authorization are handled by Amazon Cognito, providing secure access management and identity services for the application’s users. This makes sure only authenticated and authorized users can access and interact with the image summarization service. The solution incorporates observability features through integration with Amazon CloudWatch and AWS X-Ray.

The UI for the application is implemented using the Streamlit open source framework, providing a modern and responsive experience for interacting with the image summarization service. You can access the source code for the project in the public GitHub repository.

The following diagram shows the architecture to deliver this use case.

architecture diagram

The workflow to generate image descriptions includes the following steps:

  1. The user uploads the input image to an S3 bucket designated for input assets.
  2. The upload invokes the image summarization mutation API exposed by AWS AppSync. This will initiate the serverless workflow.
  3. AWS AppSync publishes an event to EventBridge to invoke the next step in the workflow.
  4. EventBridge routes the event to a Step Functions state machine.
  5. The Step Functions state machine invokes a Lambda function that validates the input request parameters.
  6. Upon successful validation, the Step Functions state machine invokes a document reader Lambda function. This function runs an image moderation check using Amazon Rekognition. If no unsafe or explicit content is detected, it pushes the image to a transformed assets S3 bucket.
  7. A summary generator Lambda function is invoked, which reads the transformed image. It calls Amazon Bedrock to invoke Anthropic’s Claude 3 Sonnet model, passing the image bytes as input.
  8. Anthropic’s Claude 3 Sonnet generates a textual description for the input image.
  9. The summary generator publishes the generated description through an AWS AppSync subscription. The Streamlit UI application listens for events from this subscription and displays the generated description to the user once received.
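Steps 7 and 8 can be sketched with the Amazon Bedrock Runtime `InvokeModel` API and Anthropic’s Messages request format, which accepts a base64-encoded image alongside a text prompt. This is a minimal sketch, not the sample’s Lambda code: the prompt wording, the `language` parameter used to request multilingual output, and the `max_tokens` value are assumptions. The Bedrock client is passed in (for example, `boto3.client("bedrock-runtime")`) so the request-building logic can be tested offline.

```python
import base64
import json
from typing import Any

# Model ID for Anthropic's Claude 3 Sonnet on Amazon Bedrock; swap in
# "anthropic.claude-3-5-sonnet-20240620-v1:0" to use Claude 3.5 Sonnet.
MODEL_ID = "anthropic.claude-3-sonnet-20240229-v1:0"


def build_request_body(image_bytes: bytes, media_type: str = "image/jpeg",
                       language: str = "English", max_tokens: int = 1000) -> dict:
    """Build an Anthropic Messages API request with one image and one prompt."""
    return {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": max_tokens,
        "messages": [{
            "role": "user",
            "content": [
                {"type": "image", "source": {
                    "type": "base64",
                    "media_type": media_type,
                    "data": base64.b64encode(image_bytes).decode("utf-8"),
                }},
                # Hypothetical prompt; the sample's real prompt may differ.
                {"type": "text", "text": f"Describe this image in {language}."},
            ],
        }],
    }


def describe_image(bedrock_runtime: Any, image_bytes: bytes,
                   model_id: str = MODEL_ID, **kwargs: Any) -> str:
    """Invoke the model and return the text of the first content block."""
    response = bedrock_runtime.invoke_model(
        modelId=model_id,
        body=json.dumps(build_request_body(image_bytes, **kwargs)),
        contentType="application/json",
        accept="application/json",
    )
    payload = json.loads(response["body"].read())
    return payload["content"][0]["text"]
```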

The following figure illustrates the workflow of the Step Functions state machine.

Step Functions workflow

Prerequisites

To implement this solution, you should have the following prerequisites:

aws configure --profile [your-profile]
AWS Access Key ID [None]: xxxxxx
AWS Secret Access Key [None]: yyyyyyyyyy
Default region name [None]: us-east-1
Default output format [None]: json

Build and deploy the solution

Complete the following steps to set up the solution:

  1. Clone the GitHub repository.
    If using HTTPS, use the following code:

    git clone https://github.com/aws-samples/generative-ai-cdk-constructs-samples.git

    If using SSH, use the following code:

    git clone git@github.com:aws-samples/generative-ai-cdk-constructs-samples.git

  2. Change the directory to the sample solution:
    cd samples/image-description

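  3. Change to the lib directory, which contains the stack definition:
    cd lib

  4. Open image-description-stack.ts and update the stage variable to a unique value:
    const stage = '<Unique value>';

  5. Return to the sample root directory and install all dependencies:
    cd ..
    npm install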

  6. Bootstrap AWS CDK resources on the AWS account. Replace ACCOUNT_ID and REGION with your own values:
    cdk bootstrap aws://ACCOUNT_ID/REGION

  7. Deploy the solution:
    cdk deploy

The preceding command deploys the stack in your account. The deployment will take approximately 5 minutes to complete.

  8. Configure client_app:
    cd client_app
    python -m venv venv
    source venv/bin/activate
    pip install -r requirements.txt

  9. Within the /client_app directory, create a new file named .env with the following content. Replace the property values with the values retrieved from the stack outputs.
    COGNITO_DOMAIN="<ImageDescriptionStack.CognitoDomain>"
    REGION="<ImageDescriptionStack.Region>"
    USER_POOL_ID="<ImageDescriptionStack.UserPoolId>"
    CLIENT_ID="<ImageDescriptionStack.ClientId>"
    CLIENT_SECRET="<COGNITO_CLIENT_SECRET>"
    IDENTITY_POOL_ID="<ImageDescriptionStack.IdentityPoolId>"
    APP_URI="http://localhost:8501/"
    AUTHENTICATED_ROLE_ARN="<ImageDescriptionStack.AuthenticatedRoleArn>"
    GRAPHQL_ENDPOINT="<ImageDescriptionStack.GraphQLEndpoint>"
    S3_INPUT_BUCKET="<ImageDescriptionStack.InputsAssetsBucket>"
    S3_PROCESSED_BUCKET="<ImageDescriptionStack.processedAssetsBucket>"

COGNITO_CLIENT_SECRET is a secret value that you can retrieve from the Amazon Cognito console. Navigate to the user pool created by the stack. Under App integration, navigate to App clients and analytics, and choose your app client’s name. Under App client information, choose Show client secret and copy the value of the client secret.
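If you prefer to script this step, the Amazon Cognito `DescribeUserPoolClient` API returns the secret for app clients that have one. The following is a minimal sketch, not part of the sample: the helper name is hypothetical, and it expects a `cognito-idp` client (for example, `boto3.client("cognito-idp")`) plus the `UserPoolId` and `ClientId` stack outputs.

```python
from typing import Any


def get_client_secret(cognito_idp: Any, user_pool_id: str, client_id: str) -> str:
    """Hypothetical helper: read the app client secret from Amazon Cognito."""
    response = cognito_idp.describe_user_pool_client(
        UserPoolId=user_pool_id,
        ClientId=client_id,
    )
    # ClientSecret is present only when the app client was created with a secret.
    return response["UserPoolClient"]["ClientSecret"]
```

Treat the returned value as a credential: write it only to the local .env file and keep that file out of version control.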

  10. Run client_app:
    streamlit run Home.py

When the client application is up and running, it opens in your browser on port 8501 (http://localhost:8501/Home).

If you encounter SSL certificate errors in your virtual environment on macOS, reinstall the CA certificates and OpenSSL packages with Homebrew using the following command:

brew reinstall ca-certificates openssl

Test the solution

To test the solution, we upload some sample images and generate descriptions in different languages. Complete the following steps:

  1. In the Streamlit UI, choose Log In and register a user the first time you sign in.
  2. After the user is registered and logged in, choose Image Description in the navigation pane.
  3. Upload multiple images and select the preferred model configuration (Anthropic’s Claude 3.5 Sonnet or Anthropic’s Claude 3), then choose Submit.

The uploaded image and the generated description are shown in the center pane.

  4. Set the language to French in the left pane and upload a new image, then choose Submit.

The image description is generated in French.

Clean up

To avoid incurring unintended charges, delete the resources you created:

  1. Remove all data from the S3 buckets.
  2. Run cdk destroy to delete the stack.
  3. Delete the S3 buckets.
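Step 1 can also be scripted. The following sketch pages through a bucket with `list_objects_v2` and deletes each page with `delete_objects`; both calls handle up to 1,000 keys at a time, so one list page maps to one delete request. It assumes the buckets are not versioned (a versioned bucket also needs its object versions and delete markers removed), and the helper name is hypothetical. Pass in an S3 client such as `boto3.client("s3")`.

```python
from typing import Any


def empty_bucket(s3: Any, bucket: str) -> int:
    """Hypothetical helper: delete every current object in an unversioned
    bucket and return how many keys were removed."""
    deleted = 0
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket):
        # Pages for empty buckets have no "Contents" key.
        keys = [{"Key": obj["Key"]} for obj in page.get("Contents", [])]
        if keys:
            s3.delete_objects(Bucket=bucket, Delete={"Objects": keys, "Quiet": True})
            deleted += len(keys)
    return deleted
```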

Conclusion

In this post, we discussed how to integrate Amazon Bedrock with Generative AI CDK Constructs. This solution enables the rapid development and deployment of cloud infrastructure tailored for an image description application by using the power of generative AI, specifically Anthropic’s Claude 3. The Generative AI CDK Constructs abstract away infrastructure complexity, thereby accelerating development timelines.

The Generative AI CDK Constructs Library offers a comprehensive suite of constructs, empowering developers to augment and enhance generative AI capabilities within their applications, unlocking a myriad of possibilities for innovation. Try out the Generative AI CDK Constructs Library for your own use cases, and share your feedback and questions in the comments.


About the Authors

Dinesh Sajwan is a Senior Solutions Architect with the Prototyping Acceleration team at Amazon Web Services. He helps customers to drive innovation and accelerate their adoption of cutting-edge technologies, enabling them to stay ahead of the curve in an ever-evolving technological landscape. Beyond his professional endeavors, Dinesh enjoys a quiet life with his wife and three children.

Justin Lewis leads the Emerging Technology Accelerator at AWS. Justin and his team help customers build with emerging technologies like generative AI by providing open source software examples to inspire their own innovation. He lives in the San Francisco Bay Area with his wife and son.

Alain Krok is a Senior Solutions Architect with a passion for emerging technologies. His past experience includes designing and implementing IIoT solutions for the oil and gas industry and working on robotics projects. He enjoys pushing the limits and indulging in extreme sports when he is not designing software.

Michael Tran is a Sr. Solutions Architect with Prototyping Acceleration team at Amazon Web Services. He provides technical guidance and helps customers innovate by showing the art of the possible on AWS. He specializes in building prototypes in the AI/ML space. You can contact him @Mike_Trann on Twitter.


Use LangChain with PySpark to process documents at massive scale with Amazon SageMaker Studio and Amazon EMR Serverless


Harnessing the power of big data has become increasingly critical for businesses looking to gain a competitive edge. From deriving insights to powering generative artificial intelligence (AI)-driven applications, the ability to efficiently process and analyze large datasets is a vital capability. However, managing the complex infrastructure required for big data workloads has traditionally been a significant challenge, often requiring specialized expertise. That’s where the new Amazon EMR Serverless application integration in Amazon SageMaker Studio can help.

With the introduction of EMR Serverless support for Apache Livy endpoints, SageMaker Studio users can now seamlessly integrate their Jupyter notebooks running sparkmagic kernels with the powerful data processing capabilities of EMR Serverless. This allows SageMaker Studio users to perform petabyte-scale interactive data preparation, exploration, and machine learning (ML) directly within their familiar Studio notebooks, without the need to manage the underlying compute infrastructure. By using the Livy REST APIs, SageMaker Studio users can also extend their interactive analytics workflows beyond just notebook-based scenarios, enabling a more comprehensive and streamlined data science experience within the Amazon SageMaker ecosystem.

In this post, we demonstrate how to leverage the new EMR Serverless integration with SageMaker Studio to streamline your data processing and machine learning workflows.

Benefits of integrating EMR Serverless with SageMaker Studio

The EMR Serverless application integration in SageMaker Studio offers several key benefits that can transform the way your organization approaches big data:

  • Simplified infrastructure management – By abstracting away the complexities of setting up and managing Spark clusters, the EMR Serverless integration allows you to quickly spin up the compute resources needed for your big data workloads, without the work of provisioning and configuring the underlying infrastructure.
  • Seamless integration with SageMaker – As a built-in feature of the SageMaker platform, the EMR Serverless integration provides a unified and intuitive experience for data scientists and engineers. You can access and utilize this functionality directly within the SageMaker Studio environment, allowing for a more streamlined and efficient development workflow.
  • Cost optimization – The serverless nature of the integration means you only pay for the compute resources you use, rather than having to provision and maintain a persistent cluster. This can lead to significant cost savings, especially for workloads with variable or intermittent usage patterns.
  • Scalability and performance – The EMR Serverless integration automatically scales the compute resources up or down based on your workload’s demands, making sure you always have the necessary processing power to handle your big data tasks. This flexibility helps optimize performance and minimize the risk of bottlenecks or resource constraints.
  • Reduced operational overhead – The EMR Serverless integration with SageMaker Studio streamlines big data processing by managing the underlying infrastructure, freeing up your team’s time and resources.

This feature in SageMaker Studio empowers data scientists, engineers, and analysts to focus on developing data-driven applications, simplifying infrastructure management, reducing costs, and enhancing scalability. By unlocking the potential of your data, this powerful integration drives tangible business results.

Solution overview

SageMaker Studio is a fully integrated development environment (IDE) for ML that enables data scientists and developers to build, train, debug, deploy, and monitor models within a single web-based interface. SageMaker Studio runs inside an AWS managed virtual private cloud (VPC); in this setup, network access for the SageMaker Studio domain is configured as VPC only. SageMaker Studio automatically creates an elastic network interface within your VPC’s private subnet, which connects to the required AWS services through VPC endpoints. This same interface is also used for provisioning EMR clusters. The following diagram illustrates this solution.

An ML platform administrator can manage permissions for the EMR Serverless integration in SageMaker Studio. The administrator can configure the appropriate privileges by updating the SageMaker Studio execution role with an inline policy, allowing SageMaker Studio users to interactively create, update, list, start, stop, and delete EMR Serverless clusters. SageMaker Studio users are presented with built-in forms within the SageMaker Studio UI that don’t require additional configuration to interact with both EMR Serverless and Amazon Elastic Compute Cloud (Amazon EC2) based clusters.

Apache Spark and its Python API, PySpark, empower users to process massive datasets effortlessly by using distributed computing across multiple nodes. These powerful frameworks simplify the complexities of parallel processing, enabling you to write code in a familiar syntax while the underlying engine manages data partitioning, task distribution, and fault tolerance. With scalability as a core strength, Spark and PySpark allow you to handle datasets of virtually any size, eliminating the constraints of a single machine.

As information volumes keep growing, scalable Retrieval Augmented Generation (RAG) architectures are increasingly important for knowledge retrieval and generation. Effectively using data to provide contextual and informative responses has become a crucial challenge. RAG systems address it by combining information retrieval with text generation, grounding generated answers in retrieved content. In this post, we explore how to build a scalable and efficient RAG system using the new EMR Serverless integration, Spark’s distributed processing, and an Amazon OpenSearch Service vector database, orchestrated with the LangChain framework. This solution enables you to process massive volumes of textual data, generate relevant embeddings, and store them in a vector database for retrieval at generation time.
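At its core, the retrieval half of RAG ranks stored text chunks by how similar their embeddings are to the query embedding. The following standard-library sketch shows that ranking with exact cosine similarity over a few toy vectors. It is a conceptual stand-in, not how OpenSearch Service works internally (its k-NN search uses approximate nearest-neighbor indexes at scale), and the chunk texts and two-dimensional embeddings are made up for illustration.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(query: Sequence[float],
          chunks: list[tuple[str, Sequence[float]]], k: int = 2) -> list[str]:
    """Return the texts of the k chunks whose embeddings best match the query."""
    ranked = sorted(chunks, key=lambda item: cosine_similarity(query, item[1]),
                    reverse=True)
    return [text for text, _ in ranked[:k]]


# Toy corpus: (chunk text, hypothetical embedding).
corpus = [
    ("Spark partitions data across executors.", [1.0, 0.0]),
    ("OpenSearch stores vectors for k-NN search.", [0.0, 1.0]),
    ("LangChain chains retrieval and generation.", [0.7, 0.7]),
]
print(top_k([0.0, 1.0], corpus, k=2))
# ['OpenSearch stores vectors for k-NN search.', 'LangChain chains retrieval and generation.']
```

The retrieved texts are what a RAG chain would insert into the prompt sent to the text generation model.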

Authentication mechanism

When integrating EMR Serverless in SageMaker Studio, you can use runtime roles. Runtime roles are AWS Identity and Access Management (IAM) roles that you specify when submitting a job or query to an EMR Serverless application. They provide the permissions your workloads need to access AWS resources, such as Amazon Simple Storage Service (Amazon S3) buckets, and you configure which runtime roles SageMaker Studio can use. By using EMR runtime roles, you can make sure your workloads have the minimum set of permissions required to access the necessary resources, following the principle of least privilege. This enhances the overall security of your data processing pipelines and helps you maintain better control over access to your AWS resources.

Cost attribution of EMR Serverless clusters

EMR Serverless clusters created within SageMaker Studio are automatically tagged with system default tags, specifically the domain-arn and user-profile-arn tags. These system-generated tags simplify cost allocation and attribution of Amazon EMR resources. See the following code:

# domain tag
sagemaker:domain-arn: arn:aws:sagemaker:<region>:<account-id>:domain/<domain-id>

# user profile tag
sagemaker:user-profile-arn: arn:aws:sagemaker:<region>:<account-id>:user-profile/<domain-id>/<user-profile-name>
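Because the user profile tag value follows the ARN format shown above, you can split it to attribute costs per user. The following sketch parses that format and totals cost records by user profile name. It is illustrative only: the `(tags, cost)` record shape and the helper names are assumptions, and in practice the cost figures would come from your billing data.

```python
from collections import defaultdict


def parse_user_profile_arn(arn: str) -> dict:
    """Split arn:aws:sagemaker:<region>:<account-id>:user-profile/<domain-id>/<name>."""
    prefix, sep, resource = arn.partition(":user-profile/")
    parts = prefix.split(":")
    if not sep or len(parts) != 5 or parts[2] != "sagemaker":
        raise ValueError(f"not a SageMaker user profile ARN: {arn}")
    domain_id, _, profile_name = resource.partition("/")
    return {"region": parts[3], "account_id": parts[4],
            "domain_id": domain_id, "user_profile_name": profile_name}


def cost_by_user_profile(records: list) -> dict:
    """Sum hypothetical (tags, cost) records per user profile name;
    records without the tag are grouped under "untagged"."""
    totals = defaultdict(float)
    for tags, cost in records:
        arn = tags.get("sagemaker:user-profile-arn")
        owner = parse_user_profile_arn(arn)["user_profile_name"] if arn else "untagged"
        totals[owner] += cost
    return dict(totals)
```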

To learn more about enterprise-level cost allocation for ML environments, refer to Set up enterprise-level cost allocation for ML environments and workloads using resource tagging in Amazon SageMaker.

Prerequisites

Before you get started, complete the prerequisite steps in this section.

Create a SageMaker Studio domain

This post walks you through the integration between SageMaker Studio and EMR Serverless using an interactive SageMaker Studio notebook. We assume you already have a SageMaker Studio domain provisioned with a UserProfile and an ExecutionRole. If you don’t have a SageMaker Studio domain available, refer to Quick setup to Amazon SageMaker to provision one.

Create an EMR Serverless job runtime role

EMR Serverless allows you to specify IAM role permissions that an EMR Serverless job run can assume when calling other services on your behalf. This includes access to Amazon S3 for data sources and targets, as well as other AWS resources like Amazon Redshift clusters and Amazon DynamoDB tables. To learn more about creating a role, refer to Create a job runtime role.

The following sample IAM inline policy, attached to a runtime role, grants EMR Serverless jobs that assume the role access to an S3 bucket and the AWS Glue Data Catalog. You can modify the role to include any additional services that EMR Serverless needs to access at runtime. Additionally, make sure you scope down the resources in the runtime policies to adhere to the principle of least privilege.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "ReadAccessForEMRSamples",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::*.elasticmapreduce",
        "arn:aws:s3:::*.elasticmapreduce/*"
      ]
    },
    {
      "Sid": "FullAccessToOutputBucket",
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject",
        "s3:ListBucket",
        "s3:DeleteObject"
      ],
      "Resource": [
        "arn:aws:s3:::<emrs-sample-s3-bucket-name>",
        "arn:aws:s3:::<emrs-sample-s3-bucket-name>/*"
      ]
    },
    {
      "Sid": "GlueCreateAndReadDataCatalog",
      "Effect": "Allow",
      "Action": [
        "glue:GetDatabase",
        "glue:CreateDatabase",
        "glue:GetDatabases",
        "glue:CreateTable",
        "glue:GetTable",
        "glue:UpdateTable",
        "glue:DeleteTable",
        "glue:GetTables",
        "glue:GetPartition",
        "glue:GetPartitions",
        "glue:CreatePartition",
        "glue:BatchCreatePartition",
        "glue:GetUserDefinedFunctions"
      ],
      "Resource": [
        "*"
      ]
    }
  ]
}

Lastly, make sure your role has a trust relationship with EMR Serverless:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "emr-serverless.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

Optionally, you can create a runtime role and policy using infrastructure as code (IaC), such as with AWS CloudFormation or Terraform, or using the AWS Command Line Interface (AWS CLI).
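As one scripted alternative, the following Python sketch uses the IAM `CreateRole` and `PutRolePolicy` APIs to create a runtime role with the EMR Serverless trust policy and attach an inline permissions policy. The role and policy names are placeholders, and the IAM client is passed in (for example, `boto3.client("iam")`) rather than created inside the function.

```python
import json
from typing import Any

# Trust policy that lets EMR Serverless assume the runtime role.
EMR_SERVERLESS_TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "emr-serverless.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}


def create_runtime_role(iam: Any, role_name: str, permissions_policy: dict,
                        policy_name: str = "EMRServerlessRuntimeAccess") -> str:
    """Create the role, attach the inline policy, and return the role ARN."""
    role = iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(EMR_SERVERLESS_TRUST_POLICY),
    )
    iam.put_role_policy(
        RoleName=role_name,
        PolicyName=policy_name,  # hypothetical inline policy name
        PolicyDocument=json.dumps(permissions_policy),
    )
    return role["Role"]["Arn"]
```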

Update the SageMaker role to allow EMR Serverless access

This one-time task enables SageMaker Studio users to create, update, list, start, stop, and delete EMR Serverless clusters. We begin by creating an inline policy that grants the necessary permissions for these actions on EMR Serverless clusters, then attach the policy to the Studio domain or user profile role:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "EMRServerlessUnTaggedActions",
      "Effect": "Allow",
      "Action": [
        "emr-serverless:ListApplications"
      ],
      "Resource": "arn:aws:emr-serverless:<region>:<aws-account-id>:/*"
    },
    {
      "Sid": "EMRServerlessPassRole",
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": "arn:aws:iam::<aws-account-id>:role/SM-EMRServerless-RunTime-role",
      "Condition": {
        "StringLike": {
          "iam:PassedToService": "emr-serverless.amazonaws.com"
        }
      }
    },
    {
      "Sid": "EMRServerlessCreateApplicationAction",
      "Effect": "Allow",
      "Action": [
        "emr-serverless:CreateApplication",
        "emr-serverless:TagResource"
      ],
      "Resource": "arn:aws:emr-serverless:<region>:<aws-account-id>:/*",
      "Condition": {
        "ForAllValues:StringEquals": {
          "aws:TagKeys": [
            "sagemaker:domain-arn",
            "sagemaker:user-profile-arn",
            "sagemaker:space-arn"
          ]
        },
        "Null": {
          "aws:RequestTag/sagemaker:domain-arn": "false",
          "aws:RequestTag/sagemaker:user-profile-arn": "false",
          "aws:RequestTag/sagemaker:space-arn": "false"
        }
      }
    },
    {
      "Sid": "EMRServerlessDenyPermissiveTaggingAction",
      "Effect": "Deny",
      "Action": [
        "emr-serverless:TagResource",
        "emr-serverless:UntagResource"
      ],
      "Resource": "arn:aws:emr-serverless:<region>:<aws-account-id>:/*",
      "Condition": {
        "Null": {
          "aws:ResourceTag/sagemaker:domain-arn": "true",
          "aws:ResourceTag/sagemaker:user-profile-arn": "true",
          "aws:ResourceTag/sagemaker:space-arn": "true"
        }
      }
    },
    {
      "Sid": "EMRServerlessActions",
      "Effect": "Allow",
      "Action": [
        "emr-serverless:StartApplication",
        "emr-serverless:StopApplication",
        "emr-serverless:GetApplication",
        "emr-serverless:DeleteApplication",
        "emr-serverless:AccessLivyEndpoints",
        "emr-serverless:GetDashboardForJobRun"
      ],
      "Resource": "arn:aws:emr-serverless:<region>:<aws-account-id>:/applications/*",
      "Condition": {
        "Null": {
          "aws:ResourceTag/sagemaker:domain-arn": "false",
          "aws:ResourceTag/sagemaker:user-profile-arn": "false",
          "aws:ResourceTag/sagemaker:space-arn": "false"
        }
      }
    }
  ]
}

Update the domain with EMR Serverless runtime roles

SageMaker Studio supports access to EMR Serverless clusters in two ways: in the same account as the SageMaker Studio domain or across accounts.

To interact with EMR Serverless clusters created in the same account as the SageMaker Studio domain, create a file named same-account-update-domain.json:

{
    "DomainId": "<emr-s-sm-studio-domain-id>",
    "DefaultUserSettings": {
        "JupyterLabAppSettings": {
            "EmrSettings": { 
                "ExecutionRoleArns": [ "arn:aws:iam::<aws-account-id>:role/<same-account-emr-runtime-role>" ]
            }
        }
    }
}

Then run the update-domain command to allow all users in the domain to use the runtime role:

aws --region <region> \
sagemaker update-domain \
--cli-input-json file://same-account-update-domain.json

For EMR Serverless clusters created in a different account, create a file named cross-account-update-domain.json:

{
    "DomainId": "<emr-s-sm-studio-domain-id>",
    "DefaultUserSettings": {
        "JupyterLabAppSettings": {
            "EmrSettings": { 
                "AssumableRoleArns": [ "arn:aws:iam::<aws-account-id>:role/<cross-account-emr-runtime-role>" ]
            }
        }
    }
}

Then run the update-domain command to allow all users in the domain to use the runtime role:

aws --region <region> \
sagemaker update-domain \
--cli-input-json file://cross-account-update-domain.json
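The same domain update can be made from Python with the SageMaker `UpdateDomain` API, whose request mirrors the JSON files above. This sketch is a convenience under stated assumptions, not part of the walkthrough: the helper names are hypothetical, and it builds IAM role ARNs without a Region segment because IAM is a global service (`arn:aws:iam::<account-id>:role/<name>`). Pass in a SageMaker client such as `boto3.client("sagemaker")`.

```python
from typing import Any, Iterable


def iam_role_arn(account_id: str, role_name: str) -> str:
    """IAM ARNs leave the Region field empty: arn:aws:iam::<account>:role/<name>."""
    return f"arn:aws:iam::{account_id}:role/{role_name}"


def build_update_domain_request(domain_id: str, role_arns: Iterable[str],
                                cross_account: bool = False) -> dict:
    """Same-account runtime roles go in ExecutionRoleArns; cross-account
    roles go in AssumableRoleArns, matching the two JSON files."""
    key = "AssumableRoleArns" if cross_account else "ExecutionRoleArns"
    return {
        "DomainId": domain_id,
        "DefaultUserSettings": {
            "JupyterLabAppSettings": {"EmrSettings": {key: list(role_arns)}},
        },
    }


def update_domain_emr_settings(sagemaker: Any, domain_id: str,
                               role_arns: Iterable[str],
                               cross_account: bool = False) -> dict:
    """Apply the EMR settings as the default for every user in the domain."""
    request = build_update_domain_request(domain_id, role_arns, cross_account)
    return sagemaker.update_domain(**request)
```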

Update the user profile with EMR Serverless runtime roles

Optionally, you can apply this update more granularly at the user profile level instead of the domain level. As with the domain-level update, to interact with EMR Serverless clusters created in the same account as the SageMaker Studio domain, create a file named same-account-update-user-profile.json:

{
    "DomainId": "<emr-s-sm-studio-domain-id>",
    "UserProfileName": "<emr-s-sm-studio-user-profile-name>",
    "UserSettings": {
        "JupyterLabAppSettings": {
            "EmrSettings": { 
                "ExecutionRoleArns": [ "arn:aws:iam::<aws-account-id>:role/<same-account-emr-runtime-role>" ]
            }
        }
    }
}

Then run the update-user-profile command to allow this user profile to use the runtime role:

aws --region <region> \
sagemaker update-user-profile \
--cli-input-json file://same-account-update-user-profile.json

For EMR Serverless clusters created in a different account, create a file named cross-account-update-user-profile.json:

{
    "DomainId": "<emr-s-sm-studio-domain-id>",
    "UserProfileName": "<emr-s-sm-studio-user-profile-name>",
    "UserSettings": {
        "JupyterLabAppSettings": {
            "EmrSettings": {
                "AssumableRoleArns": [ "arn:aws:iam::<aws-account-id>:role/<cross-account-emr-runtime-role>" ]
            }
        }
    }
}

Then run the update-user-profile command to allow this user profile to use the cross-account runtime role:

aws --region <region> \
sagemaker update-user-profile \
--cli-input-json file://cross-account-update-user-profile.json
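The same user-profile update can also be made from Python with the SageMaker UpdateUserProfile API. This is a minimal sketch, assuming you pass in a boto3 SageMaker client; the function name is hypothetical, and the client is a parameter so the logic can be exercised without AWS credentials.

```python
def update_user_profile_emr_roles(sm_client, domain_id, user_profile_name, role_arns, cross_account=False):
    """Grant one Studio user profile access to EMR Serverless runtime roles.

    sm_client is expected to be boto3.client("sagemaker").
    """
    # Same key convention as the JSON files above:
    # ExecutionRoleArns for same-account roles, AssumableRoleArns for cross-account roles
    key = "AssumableRoleArns" if cross_account else "ExecutionRoleArns"
    return sm_client.update_user_profile(
        DomainId=domain_id,
        UserProfileName=user_profile_name,
        UserSettings={"JupyterLabAppSettings": {"EmrSettings": {key: list(role_arns)}}},
    )
```

In a notebook, you would call it with boto3.client("sagemaker", region_name="<region>") and the same domain ID, user profile name, and role ARN placeholders used in the JSON files.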

Grant access to the Amazon ECR repository

The recommended way to customize environments within EMR Serverless clusters is by using custom Docker images.

Make sure you have an Amazon ECR repository in the same AWS Region where you launch EMR Serverless applications. To create an ECR private repository, refer to Creating an Amazon ECR private repository to store images.

To grant users access to your ECR repository, add the following policy to the users and roles that create or update EMR Serverless applications using images from this repository:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ECRRepositoryListGetPolicy",
            "Effect": "Allow",
            "Action": [
                "ecr:GetDownloadUrlForLayer",
                "ecr:BatchGetImage",
                "ecr:DescribeImages"
            ],
            "Resource": "ecr-repository-arn"
        }
    ]
}
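The ecr-repository-arn placeholder above is a regional ARN of the form arn:aws:ecr:<region>:<account-id>:repository/<name>. The following sketch fills it in and emits the same policy; the helper names are hypothetical, and the Region, account, and repository values reuse the example values from the Docker steps later in this post.

```python
import json


def ecr_repository_arn(region: str, account_id: str, repository_name: str) -> str:
    # ECR repository ARNs are regional, unlike IAM role ARNs
    return f"arn:aws:ecr:{region}:{account_id}:repository/{repository_name}"


def ecr_pull_policy(repository_arn: str) -> dict:
    # Same read-only actions as the policy above, scoped to a single repository
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "ECRRepositoryListGetPolicy",
                "Effect": "Allow",
                "Action": [
                    "ecr:GetDownloadUrlForLayer",
                    "ecr:BatchGetImage",
                    "ecr:DescribeImages",
                ],
                "Resource": repository_arn,
            }
        ],
    }


if __name__ == "__main__":
    arn = ecr_repository_arn("us-east-1", "123456789012", "emr-serverless-langchain")
    print(json.dumps(ecr_pull_policy(arn), indent=4))
```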

Customize the runtime environment in EMR Serverless clusters

Customizing cluster runtimes in advance is crucial for a seamless experience. As mentioned earlier, we use custom-built Docker images from an ECR repository to optimize our cluster environment, including the necessary packages and binaries. The simplest way to build these images is by using the SageMaker Studio built-in Docker functionality, as discussed in Accelerate ML workflows with Amazon SageMaker Studio Local Mode and Docker support. In this post, we build a Docker image that includes the Python 3.11 runtime and essential packages for a typical RAG workflow, such as langchain, sagemaker, opensearch-py, PyPDF2, and more.

Complete the following steps:

  1. Start by launching a SageMaker Studio JupyterLab notebook.
  2. Install Docker in your JupyterLab environment. For instructions, refer to Accelerate ML workflows with Amazon SageMaker Studio Local Mode and Docker support.
  3. Open a new terminal within your JupyterLab environment and verify the Docker installation by running the following:
    docker --version
    
    #OR
    
    docker info

  4. Create a Dockerfile (refer to Using custom images with EMR Serverless) and publish the image to an ECR repository:
    # example docker file for EMR Serverless
    
    FROM --platform=linux/amd64 public.ecr.aws/emr-serverless/spark/emr-7.0.0:latest
    USER root
    
    RUN dnf install -y python3.11 python3.11-pip
    
    WORKDIR /tmp
    RUN jar xf /usr/lib/livy/repl_2.12-jars/livy-repl_2.12-0.7.1-incubating.jar fake_shell.py && \
        sed -ie 's/version < "3.8"/version_info < (3,8)/' fake_shell.py && \
        jar uvf /usr/lib/livy/repl_2.12-jars/livy-repl_2.12-0.7.1-incubating.jar fake_shell.py
    WORKDIR /home/hadoop
    
    ENV PYSPARK_PYTHON=/usr/bin/python3.11
    
    RUN python3.11 -m pip install cython numpy matplotlib requests boto3 pandas PyPDF2 pikepdf pycryptodome langchain==0.0.310 opensearch-py seaborn plotly dash
    
    USER hadoop:hadoop

  5. From your JupyterLab terminal, run the following command to log in to the ECR repository:
    aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin 123456789012.dkr.ecr.us-east-1.amazonaws.com

  6. Run the following set of Docker commands to build, tag, and push the Docker image to the ECR repository:
    docker build --network sagemaker -t emr-serverless-langchain .
    
    docker tag emr-serverless-langchain:latest 123456789012.dkr.ecr.us-east-1.amazonaws.com/emr-serverless-langchain:latest
    
    docker push 123456789012.dkr.ecr.us-east-1.amazonaws.com/emr-serverless-langchain:latest
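The image URI used in steps 5 and 6 follows the private ECR naming scheme <account-id>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag>. As a sketch, the following hypothetical helpers assemble that URI and the build, tag, and push commands as argument lists (for example, for subprocess.run(cmd, check=True)). Note that only docker build takes --network sagemaker; docker push has no such option.

```python
def ecr_image_uri(account_id: str, region: str, repository: str, tag: str = "latest") -> str:
    # Private ECR registries are addressed as <account>.dkr.ecr.<region>.amazonaws.com
    return f"{account_id}.dkr.ecr.{region}.amazonaws.com/{repository}:{tag}"


def docker_publish_commands(local_name: str, image_uri: str) -> list:
    # Argument lists for the build, tag, and push steps shown above
    return [
        ["docker", "build", "--network", "sagemaker", "-t", local_name, "."],
        ["docker", "tag", f"{local_name}:latest", image_uri],
        ["docker", "push", image_uri],
    ]


if __name__ == "__main__":
    uri = ecr_image_uri("123456789012", "us-east-1", "emr-serverless-langchain")
    for cmd in docker_publish_commands("emr-serverless-langchain", uri):
        print(" ".join(cmd))
```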

Use the EMR Serverless integration with SageMaker Studio

In this section, we demonstrate the integration of EMR Serverless into SageMaker Studio and how you can effortlessly interact with your clusters, whether they are in the same account or across different accounts. To access SageMaker Studio, complete the following steps:

  1. On the SageMaker console, open SageMaker Studio.
  2. Depending on your organization’s setup, you can log in to Studio either through the IAM console or using AWS IAM Identity Center.

The new Studio experience is a serverless web UI, which makes sure any updates occur seamlessly and asynchronously, without interrupting your development experience.

  3. Under Data in the navigation pane, choose EMR Clusters.

You can navigate to two different tabs: EMR Serverless Applications or EMR Clusters (on Amazon EC2). For this post, we focus on EMR Serverless.

Create an EMR Serverless cluster

To create a new EMR Serverless cluster, complete the following steps:

  1. On the EMR Serverless Applications tab, choose Create.
  2. In the Network connections section, you can optionally select Connect to your VPC and nest your EMR Serverless cluster within a VPC and private subnet.
  3. To customize your cluster runtime, choose a compatible custom image from your ECR repository and make sure your user profile role has the necessary permissions to pull from this repository.
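Outside the Studio UI, the same choices (release, optional VPC placement, custom image) map onto arguments of the EMR Serverless CreateApplication API. The following is a sketch under stated assumptions: the helper name is hypothetical, it covers only the settings discussed in the steps above, and the resulting dictionary is meant to be passed to boto3.client("emr-serverless").create_application(**args).

```python
def build_create_application_args(name, image_uri, release_label="emr-7.0.0",
                                  subnet_ids=None, security_group_ids=None):
    """Keyword arguments for boto3.client("emr-serverless").create_application()."""
    args = {
        "name": name,
        # Should match the release of the base image (emr-7.0.0 in the Dockerfile above)
        "releaseLabel": release_label,
        "type": "SPARK",
        # Custom runtime image previously pushed to ECR
        "imageConfiguration": {"imageUri": image_uri},
    }
    if subnet_ids:
        # Optional: place the application in private subnets of your VPC
        args["networkConfiguration"] = {
            "subnetIds": list(subnet_ids),
            "securityGroupIds": list(security_group_ids or []),
        }
    return args
```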

Interact with EMR Serverless clusters

EMR Serverless clusters can automatically scale down to zero when not in use, eliminating costs associated with idle resources. This feature makes EMR Serverless clusters highly flexible and cost-effective. You can list, view, create, start, stop, and delete all your EMR Serverless clusters directly within SageMaker Studio.

You can also interactively attach an existing cluster to a notebook by choosing Attach to new notebook.
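The list and stop operations available in Studio are also exposed by the EMR Serverless API. As a minimal sketch, the following hypothetical cleanup helper pages through the applications in the STARTED state and stops each one. It assumes you pass in boto3.client("emr-serverless"), and it should be used with care because it targets every started application in the account and Region.

```python
def stop_started_applications(emr_client):
    """Stop every EMR Serverless application currently in the STARTED state.

    emr_client is expected to be boto3.client("emr-serverless").
    Returns the IDs of the applications that were asked to stop.
    """
    stopped = []
    kwargs = {"states": ["STARTED"]}
    while True:
        page = emr_client.list_applications(**kwargs)
        for app in page.get("applications", []):
            emr_client.stop_application(applicationId=app["id"])
            stopped.append(app["id"])
        # Keep paging until the service stops returning a continuation token
        token = page.get("nextToken")
        if not token:
            return stopped
        kwargs["nextToken"] = token
```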

Build a RAG document processing engine using PySpark

In this section, we use the SageMaker Studio cluster integration to parallelize data processing at a massive scale. A typical RAG framework consists of two main components:

  • Offline document embedding generation – This process involves extracting data (text, images, tables, and metadata) from various sources and generating embeddings using a large language embeddings model. These embeddings are then stored in a vector database, such as OpenSearch Service.
  • Online text generation with context – During this process, a user’s query is searched against the vector database, and the documents most similar to the query are retrieved. The retrieved documents, along with the user’s query, are combined into an augmented prompt and sent to a large language model (LLM), such as Meta Llama 3 or Anthropic Claude on Amazon Bedrock, for text generation.
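The online step can be illustrated with a toy, in-memory sketch: rank stored chunk vectors by cosine similarity to the query vector, then combine the best matches with the question into an augmented prompt. This is not how OpenSearch Service executes the search (the index in this post uses the faiss engine), and the function names and prompt template are hypothetical.

```python
import math


def cosine_similarity(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(query_vec, doc_vecs, k=2):
    # Return the indices of the k stored vectors most similar to the query
    scores = [cosine_similarity(query_vec, v) for v in doc_vecs]
    ranked = sorted(range(len(doc_vecs)), key=lambda i: scores[i], reverse=True)
    return ranked[:k]


def build_augmented_prompt(question, passages):
    # Retrieved passages become the context that the LLM is asked to ground its answer in
    context = "\n\n".join(passages)
    return (
        "Use the following context to answer the question.\n\n"
        f"Context:\n{context}\n\nQuestion: {question}"
    )
```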

In the following sections, we focus on the offline document embedding generation process and explore how to use PySpark on EMR Serverless from an interactive SageMaker Studio JupyterLab notebook to process PDF documents in parallel.

Deploy an embeddings model

For this use case, we use the Hugging Face All MiniLM L6 v2 embeddings model from Amazon SageMaker JumpStart. To quickly deploy this embedding model, complete the following steps:

  1. In SageMaker Studio, choose JumpStart in the navigation pane.
  2. Search for and choose All MiniLM L6 v2.
  3. On the model card, choose Deploy.

Your model will be ready within a few minutes. Alternatively, you can choose any other embedding model from SageMaker JumpStart by filtering Task type to Text embedding.

Interactively build an offline document embedding generator

In this section, we use code from the following GitHub repo and interactively build a document processing engine using LangChain and PySpark. Complete the following steps:

  1. Create a SageMaker Studio JupyterLab development environment. For more details, see Boost productivity on Amazon SageMaker Studio: Introducing JupyterLab Spaces and generative AI tools.
  2. Choose an appropriate instance type and EBS storage volume for your development environment.

You can change the instance type at any time by stopping and restarting the space.

  3. Clone the sample code from the following GitHub repository and use the notebook available under use-cases/pyspark-langchain-rag-processor/Offline_RAG_Processor_on_SageMaker_Studio_using_EMR-Serverless.ipynb.
  4. In SageMaker Studio, under Data in the navigation pane, choose EMR Clusters.
  5. On the EMR Serverless Applications tab, choose Create to create a cluster.
  6. Select your cluster and choose Attach to new notebook.
  7. Attach this cluster to a JupyterLab notebook running inside a space.

Alternatively, you can attach your cluster to any notebook within your JupyterLab space by choosing Cluster and selecting the EMR Serverless cluster you want to attach to the notebook.

Make sure you choose the SparkMagic PySpark kernel when interactively running PySpark workloads.

A successful cluster connection to a notebook should result in a usable Spark session and links to the Spark UI and driver logs.

When a notebook cell is run within a SparkMagic PySpark kernel, its code runs inside the Spark cluster by default. If you start the cell with the %%local magic, the code runs instead on the local compute where the JupyterLab notebook is hosted. We begin by reading a list of PDF documents from Amazon S3 directly into the cluster memory, as illustrated in the following diagram.

  1. Use the following code to read the documents:
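    %%local
    # Runs on the JupyterLab instance, not on the Spark cluster
    import sagemaker
    
    sess = sagemaker.Session()
    default_bucket = sess.default_bucket()
    destination_prefix = "test/raw-pdfs"
    
    # send the local variables to the Spark session using the send_to_spark magic;
    # each %%send_to_spark is a cell magic, so run each one in its own cell
    %%send_to_spark -i default_bucket -t str -n SRC_BUCKET_NAME
    
    %%send_to_spark -i destination_prefix -t str -n SRC_FILE_PREFIX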
    
    ...
    
    import io
    import os
    
    import boto3
    from PyPDF2 import PdfReader
    
    def list_files_in_s3_bucket_prefix(bucket_name, prefix):
        
        s3 = boto3.client('s3')
    
        # Paginate through the objects in the specified bucket and prefix, and collect all keys (file paths)
        paginator = s3.get_paginator('list_objects_v2')
        page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix)
    
        file_paths = []
        for page in page_iterator:
            if "Contents" in page:
                for obj in page["Contents"]:
                    if os.path.basename(obj["Key"]):
                        file_paths.append(obj["Key"])
    
        return file_paths
    
    def load_pdf_from_s3_into_memory(row):
        """
        Load a PDF file from an S3 bucket directly into memory.
        """
        try:
            src_bucket_name, src_file_key = row 
            s3 = boto3.client('s3')
            pdf_file = io.BytesIO()
            s3.download_fileobj(src_bucket_name, src_file_key, pdf_file)
            pdf_file.seek(0)
            pdf_reader = PdfReader(pdf_file)
            return (src_file_key, pdf_reader, len(pdf_reader.pages))
        
        except Exception as e:    
            return (os.path.basename(src_file_key), str(e))
    
    # create a list of file references in S3
    all_pdf_files = list_files_in_s3_bucket_prefix(
        bucket_name=SRC_BUCKET_NAME, 
        prefix=SRC_FILE_PREFIX
    )
    print(f"Found {len(all_pdf_files)} files ---> {all_pdf_files}")
    # Found 3 files ---> ['Lab03/raw-pdfs/AmazonSageMakerDeveloperGuide.pdf', 'Lab03/raw-pdfs/EC2DeveloperGuide.pdf', 'Lab03/raw-pdfs/S3DeveloperGuide.pdf']   
    
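    # distribute the (bucket, key) pairs across the cluster, then load each PDF into memory - map-reduce op
    pdfs_rdd = spark.sparkContext.parallelize([(SRC_BUCKET_NAME, key) for key in all_pdf_files])
    pdfs_in_memory = pdfs_rdd.map(load_pdf_from_s3_into_memory).collect()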

Next, you can visualize the size of each document to understand the volume of data you’re processing.

  1. You can generate charts and visualize your data within your PySpark notebook cell using static visualization tools like matplotlib and seaborn. See the following code:
    import numpy as np
    import matplotlib.pyplot as plt
    
    x_labels = [pdfx.split('/')[-1] for pdfx, _, _ in pdfs_in_memory]
    y_values = [pages_count for _, _, pages_count in pdfs_in_memory]
    x = range(len(y_values))
    
    ...
    
    # Adjust the layout
    plt.tight_layout()
    
    # Show the plot
    plt.show()
    
    %matplot plt

Every PDF document contains multiple pages to process, and this task can be run in parallel using Spark. Each document is split page by page, with each page referencing the global in-memory PDFs. We achieve parallelism at the page level by creating a list of pages and processing each one in parallel. The following diagram provides a visual representation of this process.
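The page-level fan-out described above amounts to flattening each document into one task per page. A minimal pure-Python sketch follows; the function name is hypothetical, and in the notebook the resulting list would be handed to spark.sparkContext.parallelize so that each page, rather than each whole document, becomes a unit of work.

```python
def page_tasks(documents):
    """Flatten (doc_key, page_count) pairs into one (doc_key, page_index) task per page.

    Parallelizing pages rather than whole documents keeps one long PDF from
    becoming a single straggler task.
    """
    return [(key, page) for key, page_count in documents for page in range(page_count)]


if __name__ == "__main__":
    # In the notebook, the pairs could come from pdfs_in_memory:
    # [(key, n) for key, _, n in pdfs_in_memory]
    print(page_tasks([("a.pdf", 3), ("b.pdf", 1)]))
```

With this layout, a 300-page guide contributes 300 independent tasks that Spark can spread across all worker nodes.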

The extracted text from each page of multiple documents is converted into a LangChain-friendly Document class.

  1. The CustomDocument class, shown in the following code, is a custom implementation of the Document class that allows you to convert custom text blobs into a format recognized by LangChain. After conversion, the documents are split into chunks and prepared for embedding.
    class CustomDocument:
        def __init__(self, text, path, number):
         ...
    
    documents_custom = [
        CustomDocument(text=text, path=doc_source, number=page_num) 
        for text, doc_source, page_num in documents
    ]
    
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    
    global_text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,
        chunk_overlap=50
    )
    docs = global_text_splitter.split_documents(documents_custom)
    print(f"Total number of docs pre-split {len(documents_custom)} | after split {len(docs)}")

  2. Next, you can use LangChain’s built-in OpenSearchVectorSearch vector store to generate the text embeddings and index them in OpenSearch Service. Instead of a standard LangChain embeddings class, we pass a custom EmbeddingsGenerator class that uses PySpark to parallelize embeddings generation against a load-balanced, SageMaker-hosted embeddings model endpoint:
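    import time
    
    import boto3
    from langchain.vectorstores import OpenSearchVectorSearch
    
    endpoint_name = 'jumpstart-all-MiniLM-L6-v2-endpoint'
    interface_component = 'jumpstart-all-MiniLM-L6-v2-endpoint-comp'
    
    def generate_embeddings(input):
        # Create the SageMaker runtime client inside the function so that each
        # Spark executor builds its own client, instead of relying on a client
        # serialized from the driver
        client = boto3.client('sagemaker-runtime', region_name=REGION)
    
        body = input.encode('utf-8')
        
        response = client.invoke_endpoint(
           ...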
        
        
    class EmbeddingsGenerator:
     
        @staticmethod
        def embed_documents(input_text, normalize=True):
            assert isinstance(input_text, list), "Input type must be a list to embed_documents function"
        
            input_text_rdd = spark.sparkContext.parallelize(input_text)
            embeddings_generated = input_text_rdd.map(generate_embeddings).collect()
            ...
        
        @staticmethod
        def embed_query(input_text):
            status_code, embedding = generate_embeddings(input_text)
            if status_code == 200:
                return embedding
            else: 
                return None
    
    
    start = time.time()
    docsearch = OpenSearchVectorSearch.from_documents(
        docs, 
        EmbeddingsGenerator, 
        opensearch_url=OPENSEARCH_DOMAIN_URL,
        bulk_size=len(docs),
        http_auth=(user, pwd),
        index_name=INDEX_NAME_OSE,
        engine="faiss"
    )
    
    end = time.time()
    print(f"Total Time for ingestion: {round(end - start, 2)} secs")
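The chunk_size=500 and chunk_overlap=50 settings used in the splitting step mean that consecutive chunks share up to 50 characters, so a sentence cut at a boundary still appears intact in one of the chunks. The following simplified fixed-window splitter illustrates that overlap; it is a hypothetical sketch, not LangChain’s RecursiveCharacterTextSplitter, which prefers to break on separators such as paragraphs, newlines, and spaces before falling back to raw characters.

```python
def split_with_overlap(text, chunk_size=500, chunk_overlap=50):
    """Fixed-size character windows; each window repeats the last chunk_overlap
    characters of the previous one."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop once a window reaches the end of the text
        if start + chunk_size >= len(text):
            break
    return chunks


if __name__ == "__main__":
    sample = "".join(str(i % 10) for i in range(1200))
    print([len(c) for c in split_with_overlap(sample)])
```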

The custom EmbeddingsGenerator class can generate embeddings for approximately 2,500 pages (12,000 chunks) of documents in under 180 seconds using just two concurrent load-balanced SageMaker embedding model endpoints and 10 PySpark worker nodes. This process can be further accelerated by increasing the number of load-balanced embedding endpoints and worker nodes in the cluster.
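As a rough worked example from the figures above, assuming the load was split evenly across the two endpoints, the average rates are at least:

```latex
\begin{aligned}
\text{chunk throughput} &\geq \frac{12{,}000\ \text{chunks}}{180\ \text{s}} \approx 66.7\ \text{chunks/s} \\
\text{per endpoint} &\geq \frac{66.7\ \text{chunks/s}}{2\ \text{endpoints}} \approx 33.3\ \text{chunks/s} \\
\text{page throughput} &\geq \frac{2{,}500\ \text{pages}}{180\ \text{s}} \approx 13.9\ \text{pages/s}
\end{aligned}
```

These are lower bounds because the run completed in under 180 seconds.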

Conclusion

The integration of EMR Serverless with SageMaker Studio represents a significant leap forward in simplifying and enhancing big data processing and ML workflows. By eliminating the complexities of infrastructure management, enabling seamless scalability, and optimizing costs, this powerful combination empowers organizations to use petabyte-scale data processing without the overhead typically associated with managing Spark clusters. The streamlined experience within SageMaker Studio enables data scientists and engineers to focus on what truly matters—driving insights and innovation from their data. Whether you’re processing massive datasets, building RAG systems, or exploring other advanced analytics, this integration opens up new possibilities for efficiency and scale, all within the familiar and user-friendly environment of SageMaker Studio.

As data continues to grow in volume and complexity, adopting tools like EMR Serverless and SageMaker Studio will be key to maintaining a competitive edge in the ever-evolving landscape of data-driven decision-making. We encourage you to try this feature today by setting up SageMaker Studio using the SageMaker quick setup guide. To learn more about the EMR Serverless integration with SageMaker Studio, refer to Prepare data using EMR Serverless. You can explore more generative AI samples and use cases in the GitHub repository.


About the authors

Raj Ramasubbu is a Senior Analytics Specialist Solutions Architect focused on big data and analytics and AI/ML with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS. Raj provided technical expertise and leadership in building data engineering, big data analytics, business intelligence, and data science solutions for over 18 years prior to joining AWS. He helped customers in various industry verticals like healthcare, medical devices, life science, retail, asset management, car insurance, residential REIT, agriculture, title insurance, supply chain, document management, and real estate.

Pranav Murthy is an AI/ML Specialist Solutions Architect at AWS. He focuses on helping customers build, train, deploy and migrate machine learning (ML) workloads to SageMaker. He previously worked in the semiconductor industry developing large computer vision (CV) and natural language processing (NLP) models to improve semiconductor processes using state-of-the-art ML techniques. In his free time, he enjoys playing chess and traveling. You can find Pranav on LinkedIn.

Naufal Mir is a Senior GenAI/ML Specialist Solutions Architect at AWS. He focuses on helping customers build, train, deploy and migrate machine learning (ML) workloads to SageMaker. He previously worked at financial services institutions developing and operating systems at scale. He enjoys ultra endurance running and cycling.

Kunal Jha is a Senior Product Manager at AWS. He is focused on building Amazon SageMaker Studio as the best-in-class choice for end-to-end ML development. In his spare time, Kunal enjoys skiing and exploring the Pacific Northwest. You can find him on LinkedIn.

Ashwin Krishna is a Senior SDE working for SageMaker Studio at Amazon Web Services. He is focused on building interactive ML solutions for AWS enterprise customers to achieve their business needs. He is a big supporter of Arsenal football club and spends spare time playing and watching soccer.

Harini Narayanan is a software engineer at AWS, where she’s excited to build cutting-edge data preparation technology for machine learning at SageMaker Studio. With a keen interest in sustainability, interior design, and a love for all things green, Harini brings a thoughtful approach to innovation, blending technology with her diverse passions.
