Hi everyone,
AWS recently released bedrock integration to step functions. They are providing InvokeModel **and **CreateModelCustomizationJob **states in the step functions. If you can search in the search bar of the step function with the **Bedrock name it will provide a list of the actions available.
Now we can create the workflows by invoking the models available in the Amazon Bedrock from the step functions.
If you are not aware of the embeddings and bedrock service, refer to my previous article using this link
Building a vector-based search engine using Amazon Bedrock and Amazon Open Search Service
Let’s dive into the article
In this article, we are going to create a state machine workflow to generate embeddings from a CSV file stored in an S3 bucket and store the generated embeddings in the OpenSearch index for further processing.
AWS services we are going to use in this experiment
S3 bucket for storing the data
Step function to create a workflow for generating embeddings and storing them OpenSearch index
BedRock contains the models for generating the embeddings
Lambda for storing the generating embedding in the OpenSearch
OpenSearch for storing the embeddings for implementing semantic search
Step 1: Collecting and storing the data:
For this experiment, using this link I downloaded a dataset containing a set of movie info https://www.kaggle.com/datasets/kayscrapes/movie-dataset
Create an S3 bucket and upload the CSV file to the bucket.
Step 2: Create an open Search Service cluster
For storing the embeddings we will create an open search cluster
Visit the open search service from the AWS search bar
Click on the Create Domain button
Provide the Name for the domain, choose standard create, and select dev/test template
- Deployment will be without standby as we are not doing this for production purposes.
- From the general purpose instances select t3.small.search instances, as we are experimenting and nodes will only have 1
- Instead of VPC deploy it publicly and provide a master username and password
- Configure the access policy to allow all to access OpenSearch dashboards and endpoints. But for production make it according to your security requirements
Click on Create Domain and wait for a few minutes for the cluster to come online
Once the cluster is ready copy the open search endpoints from the dashboard to use in the Lambda function
Visit the OpenSearch dashboard and create an index for storing the data
Visit dev tools ****from the dashboard and use the following code to create an index
PUT contents
{
"settings": {
"index.knn": true
},
"mappings": {
"properties": {
"Summary":{
"type": "text"
},
"Title": {
"type": "text"
},
"Embedding": {
"type": "knn_vector",
"dimension": 1536
}
}
}
}
Step 3: Create a Lambda function for storing the embeddings
Visit the Lambda service and create a Lambda function with the Python 3.9 environment
Here is the lambda code
import boto3
import requests
from requests_aws4auth import AWS4Auth
def lambda_handler(event, context):
# Extract the relevant data
summary = event['Summary']#summary column of csv file
title = event['Title']#Title column of the csv file
embedding = event['output']['Body']['embedding'] #contains embedding generataed for Summary columns
# Define the document to be indexed
document = {
'Summary': summary,
'Title': title,
'Embedding': embedding
}
#Username and password of the opensearch endpoint
auth = ('username',"password")
# OpenSearch domain endpoint
opensearch_domain_endpoint = "https://search-contents-oflzhkvsjgukdwvszyd5erztza.us-east-1.es.amazonaws.com" # e.g., https://search-mydomain.us-west-1.es.amazonaws.com
index_name = 'contents'
url = f"{opensearch_domain_endpoint}/{index_name}/_doc"
headers = { "Content-Type": "application/json" }
# Make the signed HTTP request
response = requests.post(url, auth=auth, json=document, headers=headers)
return {
'statusCode': 200,
'body': response.text
}
- This is how we are going to send the data to Lambda from the state machine
{
"Summary": "value1",
"Title": "value2",
"output": {
"Body": {
"embedding": "[0,1,2,3]"
}
}
}
Step 4: Create a state machine in step functions
Visit step functions from the AWS search bar
From the flow, we are using **Map **for iterating through the CSV file records
- From the bedrock section Invoke Model action
- **Invoke Lambda **action for sending embeddings to Open Search
- Create a workflow like the below picture. You can drag and drop items from the left menu
After creating, configure each state like this
Map: configure it to use the S3 bucket CSV file we stored in the first step and make it iterate through the **Summary, Title **Columns of the document
Output of the Map function to another s3 bucket for avoiding limit exceed errors from the state machine
- Configure **the Invoke Model **state to use **amazon.titan-embed-text-v1 **model and input from the Map function like this
- Configure the **Lambda Invoke **state with the function name
- This is the final code of the state machine after configuring all the states
{
"Comment": "A description of my state machine",
"StartAt": "Map",
"States": {
"Map": {
"Type": "Map",
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "DISTRIBUTED",
"ExecutionType": "STANDARD"
},
"StartAt": "Invoke Model",
"States": {
"Invoke Model": {
"Type": "Task",
"Resource": "arn:aws:states:::bedrock:invokeModel",
"Parameters": {
"ModelId": "arn:aws:bedrock:us-east-1::foundation-model/amazon.titan-embed-text-v1",
"Body": {
"inputText.$": "$.Summary"
}
},
"Next": "Lambda Invoke",
"ResultPath": "$.output"
},
"Lambda Invoke": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"Payload.$": "$",
"FunctionName": "arn:aws:lambda:us-east-1:556343216872:function:dump-embeddings:$LATEST"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2
}
],
"End": true
}
}
},
"Label": "Map",
"MaxConcurrency": 10,
"ItemReader": {
"Resource": "arn:aws:states:::s3:getObject",
"ReaderConfig": {
"InputType": "CSV",
"CSVHeaderLocation": "GIVEN",
"MaxItems": 10,
"CSVHeaders": [
"Summary",
"Title"
]
},
"Parameters": {
"Bucket": "netflix-titles-csv",
"Key": "Hydra-Movie-Scrape.csv"
}
},
"End": true,
"ResultWriter": {
"Resource": "arn:aws:states:::s3:putObject",
"Parameters": {
"Bucket": "embeddings-ouput",
"Prefix": "output"
}
}
}
}
}
Step 5: Execute the state machine workflow and check the output
From the step function console top right corner click on the **Execute **button
Leave the input of the state machine as it is and click on execute
After the successful execution of the workflow, you can see the results like this
- Visit the OpenSearch dashboard and click on Query Workbench to execute the query and check whether embeddings are stored or not
- As embeddings are a very large dataset they won’t be visible on the dashboard
That’s it. We successfully create a workflow to generate embedding using a bedrock model.
If you want to use the generated embeddings for further implementing search engine, you can visit the link provided at the start of the article. It will contain the steps on how to build the vector search engine in OpenSearch
If you are having any doubts or need more clarification or help, Please feel free to comment on this article. Will surely get back to you.
Thanks :)
Top comments (0)