Deploy a Hugging Face (PyAnnote) speaker diarization model on Amazon SageMaker as an asynchronous endpoint

Speaker diarization, an essential process in audio analysis, segments an audio file based on speaker identity. This post delves into integrating Hugging Face’s PyAnnote for speaker diarization with Amazon SageMaker asynchronous endpoints.

We provide a comprehensive guide on how to deploy speaker segmentation and clustering solutions using SageMaker on the AWS Cloud. You can use this solution for applications dealing with multi-speaker (over 100) audio recordings.

Solution overview

Amazon Transcribe is the go-to service for speaker diarization in AWS. However, for non-supported languages, you can use other models (in our case, PyAnnote) deployed in SageMaker for inference. For short audio files where inference takes up to 60 seconds, you can use real-time inference. When inference takes longer than 60 seconds, use asynchronous inference. An added benefit of asynchronous inference is the cost savings from auto scaling the instance count to zero when there are no requests to process.

Hugging Face is a popular open source hub for machine learning (ML) models. AWS and Hugging Face have a partnership that allows a seamless integration through SageMaker with a set of AWS Deep Learning Containers (DLCs) for training and inference in PyTorch or TensorFlow, and Hugging Face estimators and predictors for the SageMaker Python SDK. SageMaker features and capabilities help developers and data scientists get started with natural language processing (NLP) on AWS with ease.

This solution uses a pre-trained speaker diarization model from the Hugging Face Hub that is built with the PyAnnote library. PyAnnote is an open source toolkit written in Python for speaker diarization. This model, trained on the sample audio dataset, enables effective speaker partitioning in audio files. The model is deployed on SageMaker as an asynchronous endpoint, providing efficient and scalable processing of diarization tasks.

The following diagram illustrates the solution architecture.

For this post, we use the following audio file.

Stereo or multi-channel audio files are automatically downmixed to mono by averaging the channels. Audio files sampled at a different rate are resampled to 16kHz automatically upon loading.
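
To make the preprocessing described above concrete, the following is a minimal sketch in plain Python. The downmix averages the channel values of each frame, as the text describes. The resampler uses naive linear interpolation purely for illustration; it is an assumption of this sketch and not the resampling method PyAnnote itself uses. The function names are hypothetical.

```python
from typing import List, Sequence

TARGET_RATE_HZ = 16_000  # sample rate mentioned in the text above


def downmix_to_mono(frames: Sequence[Sequence[float]]) -> List[float]:
    """Average the channel values of each frame to get one mono sample."""
    return [sum(frame) / len(frame) for frame in frames]


def resample_linear(samples: Sequence[float], source_rate: int,
                    target_rate: int = TARGET_RATE_HZ) -> List[float]:
    """Naive linear-interpolation resampler (illustration only)."""
    if not samples or source_rate == target_rate:
        return list(samples)
    out_len = max(1, round(len(samples) * target_rate / source_rate))
    step = source_rate / target_rate  # source samples per output sample
    out = []
    for i in range(out_len):
        pos = i * step
        left = min(int(pos), len(samples) - 1)
        right = min(left + 1, len(samples) - 1)
        frac = pos - left
        out.append(samples[left] * (1 - frac) + samples[right] * frac)
    return out


# Three stereo frames become three mono samples
stereo = [(0.2, 0.4), (1.0, 0.0), (-0.5, -0.5)]
mono = downmix_to_mono(stereo)
```

You don’t need to run this step yourself for this solution; it only illustrates what happens to the audio when it is loaded.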

Prerequisites

Complete the following prerequisites:

  1. Create a SageMaker domain.
  2. Make sure your AWS Identity and Access Management (IAM) user has the necessary access permissions for creating a SageMaker role.
  3. Make sure the AWS account has a service quota for hosting a SageMaker endpoint for an ml.g5.2xlarge instance.

Create a model function for accessing PyAnnote speaker diarization from Hugging Face

You can use the Hugging Face Hub to access the desired pre-trained PyAnnote speaker diarization model. You use the same script for downloading the model file when creating the SageMaker endpoint.

See the following code:

from pyannote.audio import Pipeline

def model_fn(model_dir):
    # Load the pre-trained pipeline from the Hugging Face Hub
    # (model_dir is part of the handler signature but is not used here)
    model = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1",
        use_auth_token="Replace-with-the-Hugging-face-auth-token")
    return model

Package the model code

Prepare essential files like inference.py, which contains the inference code:

%%writefile model/code/inference.py
from pyannote.audio import Pipeline
import subprocess
import boto3
from urllib.parse import urlparse
import pandas as pd
from io import StringIO
import os
import torch

def model_fn(model_dir):
    # Load the pre-trained pipeline from the Hugging Face Hub
    model = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1",
        use_auth_token="hf_oBxxxxxxxxxxxx")
    return model


def diarization_from_s3(model, s3_file, language=None):
    s3 = boto3.client("s3")
    o = urlparse(s3_file, allow_fragments=False)
    bucket = o.netloc
    key = o.path.lstrip("/")
    s3.download_file(bucket, key, "tmp.wav")
    result = model("tmp.wav")
    data = {} 
    for turn, _, speaker in result.itertracks(yield_label=True):
        data[turn] = (turn.start, turn.end, speaker)
    data_df = pd.DataFrame(data.values(), columns=["start", "end", "speaker"])
    print(data_df.shape)
    result = data_df.to_json(orient="split")
    return result


def predict_fn(data, model):
    s3_file = data.pop("s3_file")
    language = data.pop("language", None)
    result = diarization_from_s3(model, s3_file, language)
    return {
        "diarization_from_s3": result
    }
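
The inference script relies on urlparse to split the S3 URI into a bucket and an object key. The following standalone sketch shows that step in isolation. The helper name split_s3_uri and the example URIs are hypothetical, and the scheme check is an addition that the script above does not perform.

```python
from typing import Tuple
from urllib.parse import urlparse


def split_s3_uri(s3_uri: str) -> Tuple[str, str]:
    """Split an s3://bucket/key URI into (bucket, key)."""
    # allow_fragments=False keeps a '#' in the key as part of the path
    parsed = urlparse(s3_uri, allow_fragments=False)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {s3_uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


# Hypothetical bucket and key, for illustration only
bucket, key = split_s3_uri("s3://example-bucket/audio/meeting.wav")
```

Validating the URI before calling download_file can make failures easier to diagnose, because a malformed path is rejected before any request is sent to Amazon S3.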

Prepare a requirements.txt file, which lists the Python libraries required to run the inference:

with open("model/code/requirements.txt", "w") as f:
    f.write("transformers==4.25.1\n")
    f.write("boto3\n")
    f.write("pyannote.audio\n")
    f.write("soundfile\n")
    f.write("librosa\n")
    f.write("onnxruntime\n")
    f.write("wget\n")
    f.write("pandas")

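Lastly, compress the inference.py and requirements.txt files and save them as model.tar.gz. Run the command from inside the model directory so that the code folder sits at the root of the archive: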

!tar zcvf model.tar.gz *
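
If you prefer to build the archive from Python rather than the shell, the following sketch uses the standard tarfile module. It assumes the files live under model/code/ as written earlier and that the archive should contain code/inference.py and code/requirements.txt at its root. The function name package_model is hypothetical.

```python
import tarfile
from pathlib import Path
from typing import List


def package_model(model_dir: str, archive_path: str = "model.tar.gz") -> List[str]:
    """Create a gzip-compressed tar of model_dir and return the member names."""
    root = Path(model_dir)
    target = Path(archive_path).resolve()
    # Collect files first so the archive itself is never picked up
    files = [
        path for path in sorted(root.rglob("*"))
        if path.is_file() and path.resolve() != target
    ]
    with tarfile.open(archive_path, "w:gz") as archive:
        for path in files:
            # Store paths relative to model_dir, for example code/inference.py
            archive.add(path, arcname=path.relative_to(root).as_posix())
    with tarfile.open(archive_path, "r:gz") as archive:
        return archive.getnames()
```

Calling package_model("model") from the notebook’s working directory would write model.tar.gz next to the model folder and return the list of archived paths, which is a quick way to confirm the layout before uploading.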

Configure a SageMaker model

Define a SageMaker model resource by specifying the image URI, model data location in Amazon Simple Storage Service (Amazon S3), and SageMaker role:

import sagemaker
import boto3

sess = sagemaker.Session()

sagemaker_session_bucket = None
if sagemaker_session_bucket is None and sess is not None:
    sagemaker_session_bucket = sess.default_bucket()

try:
    role = sagemaker.get_execution_role()
except ValueError:
    iam = boto3.client("iam")
    role = iam.get_role(RoleName="sagemaker_execution_role")["Role"]["Arn"]

sess = sagemaker.Session(default_bucket=sagemaker_session_bucket)

print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {sess.default_bucket()}")
print(f"sagemaker session region: {sess.boto_region_name}")

Upload the model to Amazon S3

Upload the zipped PyAnnote Hugging Face model file to an S3 bucket:

s3_location = f"s3://{sagemaker_session_bucket}/whisper/model/model.tar.gz"
!aws s3 cp model.tar.gz $s3_location

Create a SageMaker asynchronous endpoint

Configure an asynchronous endpoint for deploying the model on SageMaker using the provided asynchronous inference configuration:

from sagemaker.huggingface.model import HuggingFaceModel
from sagemaker.async_inference.async_inference_config import AsyncInferenceConfig
from sagemaker.s3 import s3_path_join
from sagemaker.utils import name_from_base

async_endpoint_name = name_from_base("custom-asyc")

# create Hugging Face Model Class
huggingface_model = HuggingFaceModel(
    model_data=s3_location,  # path to your model and script
    role=role,  # iam role with permissions to create an Endpoint
    transformers_version="4.17",  # transformers version used
    pytorch_version="1.10",  # pytorch version used
    py_version="py38",  # python version used
)

# create async endpoint configuration
async_config = AsyncInferenceConfig(
    output_path=s3_path_join(
        "s3://", sagemaker_session_bucket, "async_inference/output"
    ),  # Where our results will be stored
    # Add SNS notification topics if needed
    notification_config={
        # "SuccessTopic": "PUT YOUR SUCCESS SNS TOPIC ARN",
        # "ErrorTopic": "PUT YOUR ERROR SNS TOPIC ARN",
    },  #  Notification configuration
)

env = {"MODEL_SERVER_WORKERS": "2"}

# deploy the endpoint
async_predictor = huggingface_model.deploy(
    initial_instance_count=1,
    instance_type="ml.xx",
    async_inference_config=async_config,
    endpoint_name=async_endpoint_name,
    env=env,
)

Test the endpoint

Evaluate the endpoint functionality by sending an audio file for diarization and retrieving the JSON output stored in the specified S3 output path:

from sagemaker.async_inference import WaiterConfig

# Replace with a path to audio object in S3
data = {"s3_file": "s3://<bucket>/<path-to-audio-file>"}

res = async_predictor.predict_async(data=data)
print(f"Response output path: {res.output_path}")
print("Start Polling to get response:")

config = WaiterConfig(
    max_attempts=10,  # number of attempts
    delay=10,  # time in seconds to wait between attempts
)
res.get_result(config)
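
With max_attempts=10 and delay=10, polling is bounded at roughly 100 seconds, so longer recordings may need larger values. The following generic sketch illustrates the same bounded-polling idea outside the SageMaker SDK. The fetch callable is hypothetical: it stands in for any check that returns None until the output object exists. This is not how the SDK implements its waiter.

```python
import time
from typing import Callable, Optional


def poll_for_result(fetch: Callable[[], Optional[str]],
                    max_attempts: int = 10,
                    delay: float = 10.0,
                    sleep: Callable[[float], None] = time.sleep) -> str:
    """Call fetch() until it returns a value or the attempts run out."""
    for attempt in range(1, max_attempts + 1):
        result = fetch()
        if result is not None:
            return result
        if attempt < max_attempts:
            sleep(delay)  # wait before the next attempt
    raise TimeoutError(f"No result after {max_attempts} attempts")
```

Passing the sleep function as a parameter keeps the loop easy to test without real waiting.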

To deploy this solution at scale, we suggest using AWS Lambda, Amazon Simple Notification Service (Amazon SNS), or Amazon Simple Queue Service (Amazon SQS). These services are designed for scalability, event-driven architectures, and efficient resource utilization. They can help decouple the asynchronous inference process from the result processing, allowing you to scale each component independently and handle bursts of inference requests more effectively.
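
As one possible starting point for such a decoupled design, the following sketch shows a Lambda handler that reads the success notification published to the SNS topic and extracts where the result was written. The message field names (invocationStatus, inferenceId, responseParameters.outputLocation) are assumptions based on the asynchronous inference notification format, so verify them against a real message from your topic before relying on them.

```python
import json
from typing import Any, Dict, List


def lambda_handler(event: Dict[str, Any], context: Any = None) -> List[Dict[str, str]]:
    """Collect output locations from completed async inference notifications."""
    outputs = []
    for record in event.get("Records", []):
        # SNS delivers the notification body as a JSON string
        message = json.loads(record["Sns"]["Message"])
        if message.get("invocationStatus") != "Completed":
            continue  # skip failed or unknown invocations in this sketch
        outputs.append({
            "inference_id": message.get("inferenceId"),
            "output_location": message["responseParameters"]["outputLocation"],
        })
    return outputs
```

A downstream step could then read each output location from Amazon S3 and store or forward the diarization result, independently of the endpoint that produced it.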

Results

Model output is stored at s3://sagemaker-xxxx/async_inference/output/. Each row of the output describes one segment of the audio recording in three columns:

  • Start (start time in seconds)
  • End (end time in seconds)
  • Speaker (speaker label)

The following code shows an example of our results:

[0.9762308998, 8.9049235993, "SPEAKER_01"]

[9.533106961, 12.1646859083, "SPEAKER_01"]

[13.1324278438, 13.9303904924, "SPEAKER_00"]

[14.3548387097, 26.1884550085, "SPEAKER_00"]

[27.2410865874, 28.2258064516, "SPEAKER_01"]

[28.3446519525, 31.298811545, "SPEAKER_01"]
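
Because the inference script serializes the DataFrame with orient="split", the result is a JSON object with columns, index, and data keys. The following sketch parses such a string with the standard library and totals the speaking time per speaker. It assumes you have already extracted the diarization_from_s3 string from the endpoint response; the function name is hypothetical, and the sample rows are the ones shown above.

```python
import json
from collections import defaultdict
from typing import Dict


def speaking_time_by_speaker(split_json: str) -> Dict[str, float]:
    """Sum (end - start) per speaker from a pandas orient='split' JSON string."""
    table = json.loads(split_json)
    columns = table["columns"]
    start_i = columns.index("start")
    end_i = columns.index("end")
    speaker_i = columns.index("speaker")
    totals = defaultdict(float)
    for row in table["data"]:
        totals[row[speaker_i]] += row[end_i] - row[start_i]
    return dict(totals)


sample = json.dumps({
    "columns": ["start", "end", "speaker"],
    "index": [0, 1, 2, 3, 4, 5],
    "data": [
        [0.9762308998, 8.9049235993, "SPEAKER_01"],
        [9.533106961, 12.1646859083, "SPEAKER_01"],
        [13.1324278438, 13.9303904924, "SPEAKER_00"],
        [14.3548387097, 26.1884550085, "SPEAKER_00"],
        [27.2410865874, 28.2258064516, "SPEAKER_01"],
        [28.3446519525, 31.298811545, "SPEAKER_01"],
    ],
})
totals = speaking_time_by_speaker(sample)
```

The same pattern can be adapted to other summaries, such as counting turns per speaker or finding the longest uninterrupted segment.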

Clean up

Asynchronous inference lets you scale the endpoint down to zero instances when there are no requests. To enable this, register the endpoint variant as a scalable target with MinCapacity set to 0. You don’t need to delete the endpoint; it scales up from zero when requests arrive again, which reduces costs while it is not in use. See the following code:

import boto3

# Application Auto Scaling client, which manages scaling for SageMaker endpoints
client = boto3.client("application-autoscaling")

# This is the format in which Application Auto Scaling references the endpoint.
# Replace <endpoint_name> and <variant1> with your endpoint and variant names.
resource_id = "endpoint/" + "<endpoint_name>" + "/variant/" + "<variant1>"

# Define and register your endpoint variant
response = client.register_scalable_target(
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    # The number of EC2 instances for your Amazon SageMaker model endpoint variant
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    MinCapacity=0,
    MaxCapacity=5,
)
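
Registering a scalable target only sets the capacity bounds; a scaling policy decides when instances are added or removed. The following sketch builds the arguments for a target tracking policy based on the queue backlog per instance. The metric name ApproximateBacklogSizePerInstance, the target value, and the cooldown periods are assumptions chosen for illustration, and the helper name is hypothetical, so check them against the Application Auto Scaling and SageMaker documentation for your workload.

```python
from typing import Any, Dict


def build_backlog_scaling_policy(endpoint_name: str,
                                 variant_name: str,
                                 target_backlog: float = 5.0) -> Dict[str, Any]:
    """Build keyword arguments for a target tracking scaling policy."""
    return {
        "PolicyName": f"{endpoint_name}-backlog-target-tracking",
        "ServiceNamespace": "sagemaker",
        "ResourceId": f"endpoint/{endpoint_name}/variant/{variant_name}",
        "ScalableDimension": "sagemaker:variant:DesiredInstanceCount",
        "PolicyType": "TargetTrackingScaling",
        "TargetTrackingScalingPolicyConfiguration": {
            # Desired number of queued requests per instance (illustrative value)
            "TargetValue": target_backlog,
            "CustomizedMetricSpecification": {
                "MetricName": "ApproximateBacklogSizePerInstance",
                "Namespace": "AWS/SageMaker",
                "Dimensions": [{"Name": "EndpointName", "Value": endpoint_name}],
                "Statistic": "Average",
            },
            "ScaleInCooldown": 600,   # seconds, illustrative
            "ScaleOutCooldown": 300,  # seconds, illustrative
        },
    }
```

The resulting dictionary could be passed to the same client as client.put_scaling_policy(**policy), which keeps the policy definition separate from the API call and easy to review.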

If you want to delete the endpoint, use the following code:

async_predictor.delete_endpoint()

Benefits of asynchronous endpoint deployment

This solution offers the following benefits:

  • The solution can efficiently handle multiple or large audio files.
  • This example uses a single instance for demonstration. To process hundreds or thousands of recordings across multiple instances with an asynchronous endpoint, you can use an auto scaling policy, which is designed for a large number of source files. Auto scaling dynamically adjusts the number of instances provisioned for a model in response to changes in your workload.
  • The solution optimizes resources and reduces system load by separating long-running tasks from real-time inference.

Conclusion

In this post, we provided a straightforward approach to deploy Hugging Face’s speaker diarization model on SageMaker using Python scripts. Using an asynchronous endpoint provides an efficient and scalable means to deliver diarization predictions as a service, accommodating concurrent requests seamlessly.

Get started today with asynchronous speaker diarization for your audio projects. Reach out in the comments if you have any questions about getting your own asynchronous diarization endpoint up and running.


About the Authors

Sanjay Tiwary is a Specialist Solutions Architect AI/ML who spends his time working with strategic customers to define business requirements, provide L300 sessions around specific use cases, and design AI/ML applications and services that are scalable, reliable, and performant. He has helped launch and scale the AI/ML powered Amazon SageMaker service and has implemented several proofs of concept using Amazon AI services. He has also developed the advanced analytics platform as a part of the digital transformation journey.

Kiran Challapalli is a deep tech business developer with the AWS public sector. He has more than 8 years of experience in AI/ML and 23 years of overall software development and sales experience. Kiran helps public sector businesses across India explore and co-create cloud-based solutions that use AI, ML, and generative AI technologies, including large language models.
