I've witnessed the development of AI this year, especially on AWS. Simple prompt engineering and Retrieval Augmented Generation (RAG) have given way to autonomous agents. These programs are capable of reasoning, planning, and carrying out tasks, turning abstract objectives into tangible actions.
Instead of using monolithic models, generative AI can solve complex problems by coordinating a number of specialized agents. Using AWS Step Functions for orchestration and Amazon Bedrock Agents for task execution, this article will walk you through creating such a workflow.
The Transition to Agentic AI: Why It's Important Now
Conventional applications of generative AI frequently include:
- A prompt is user input.
- RAG (Optional): Knowledge base context.
- LLM Call: Producing an answer.
This is useful for responding to enquiries, but what if the user wants to take action? "Summarize the last quarter's sales data, identify the top 3 underperforming products, and create a draft email to their respective product managers suggesting a review" is an example of what might happen.
This necessitates:
- Information retrieval: Getting sales information.
- Analysis: Finding underachievers.
- Writing emails is the action.
This multi-step, action-oriented process is difficult for a single LLM call to handle. Bedrock Agents excel in this situation. An agent can be given a high-level objective, evaluate it, decide on a course of action, and then employ predefined "tools" (APIs) to accomplish that objective.
However, what happens if you require the cooperation of several agents? What if their tasks require human approval, conditional logic, error handling, and a particular sequence? This is the point at which AWS Step Functions become essential.
Overview of Architecture: Managing Independent Workflows
An "Order Processing" workflow will be used in our example scenario:
- A fresh order arrives.
- An "Order Validation Agent" uses an API to compare order details to the product catalogue.
- A "Shipping Agent" communicates with a shipping provider API if it is legitimate.
- An alert is sent by a "Notification Agent" if any problems occur (validation failed, shipping error, etc.).
Parts:
- Order Ingestion: New order requests are sent to an API Gateway endpoint or an Amazon SQS queue.
- Step Functions on AWS The central orchestrator is the State Machine. It manages conditional logic, parallel execution, retries, and flow definition.
- Bedrock Agents on Amazon:
- Order Validation Agent: Verifies product IDs, stock, and prices by interacting with a Product Catalogue API (such as a Lambda function).
- Shipping Agent: Creates shipments by interacting with an API from an external shipping provider.
- Notification Agent: A more straightforward agent that can log to DynamoDB based on event details or send alerts via SNS.
- AWS Lambda: Provides the "tools" (APIs) for Bedrock Agents, serving as a safe bridge between the agent and internal data stores or external services.
- Amazon CloudWatch: For tracking and recording every step of the process.
Methodical Execution
Let's dissect the implementation using Python/Boto3 for agent definitions and interactions and CloudFormation for infrastructure.
- Describe the AWS Lambda Functions Agent Tools. Every Bedrock Agent must have access to "tools"—API functions that it can use to carry out tasks. These will be implemented as AWS Lambda functions.
a) Lambda Product Catalogue (for Order Validation Agent) Checking a product catalogue is simulated by this Lambda.
# product_catalog_lambda.py
import json
def lambda_handler(event, context):
print(f"Received event: {json.dumps(event)}")
action_group = event['actionGroup']
api_path = event['apiPath']
http_method = event['httpMethod']
parameters = event['parameters']
response_body = {}
if api_path == '/products/{productId}':
product_id = next(p['value'] for p in parameters if p['name'] == 'productId')
if product_id == 'PROD123':
response_body = {
"productName": "AWS Widget Pro",
"price": 99.99,
"inStock": True
}
elif product_id == 'PROD456':
response_body = {
"productName": "Cloud Gadget",
"price": 49.99,
"inStock": False
}
else:
response_body = {
"message": "Product not found"
}
else:
response_body = {
"message": f"Unknown API path: {api_path}"
}
return {
'body': json.dumps(response_body),
'statusCode': 200,
'applicationContentType': 'application/json'
}
b) Shipping Provider Lambda (for Shipping Agent) This Lambda simulates interacting with an external shipping API.
import json
import random
def lambda_handler(event, context):
print(f"Received event: {json.dumps(event)}")
action_group = event['actionGroup']
api_path = event['apiPath']
http_method = event['httpMethod']
parameters = event['parameters']
request_body = json.loads(event['requestBody']['content']['application/json']['properties']['requestBody'])
response_body = {}
if api_path == '/shipments':
# Extract details from request_body, e.g., request_body['orderId'], request_body['items']
order_id = request_body.get('orderId', 'UNKNOWN')
# Simulate an external API call
if random.random() < 0.1: # 10% chance of failure
response_body = {
"message": f"Failed to create shipment for Order {order_id}",
"status": "FAILED",
"errorCode": "EXTERNAL_API_ERROR"
}
status_code = 500
else:
tracking_number = f"TRACK-{order_id}-{random.randint(1000, 9999)}"
response_body = {
"message": f"Shipment created successfully for Order {order_id}",
"trackingNumber": tracking_number,
"status": "CREATED"
}
status_code = 200
else:
response_body = {
"message": f"Unknown API path: {api_path}"
}
status_code = 404
return {
'body': json.dumps(response_body),
'statusCode': status_code,
'applicationContentType': 'application/json'
}
- Create the Bedrock Agents Each agent requires:
- An Agent Role with permissions to invoke the LLM and the Lambda tools.
- An Agent Definition specifying the foundation model, a description, and the action groups (tools).
- Action Groups: Link to the Lambda functions and their OpenAPI schemas.
a) CloudFormation for Agent Roles and Lambda Permissions
AWSTemplateFormatVersion: '2010-09-09'
Description: AWS Bedrock Agents and supporting Lambda functions
Resources:
ProductCatalogLambda:
Type: AWS::Lambda::Function
Properties:
FunctionName: ProductCatalogLambda
Handler: product_catalog_lambda.lambda_handler
Runtime: python3.9
Code:
ZipFile: |
import json
def lambda_handler(event, context):
print(f"Received event: {json.dumps(event)}")
action_group = event['actionGroup']
api_path = event['apiPath']
http_method = event['httpMethod']
parameters = event['parameters']
response_body = {}
if api_path == '/products/{productId}':
product_id = next(p['value'] for p in parameters if p['name'] == 'productId')
if product_id == 'PROD123':
response_body = {
"productName": "AWS Widget Pro",
"price": 99.99,
"inStock": True
}
elif product_id == 'PROD456':
response_body = {
"productName": "Cloud Gadget",
"price": 49.99,
"inStock": False
}
else:
response_body = {
"message": "Product not found"
}
else:
response_body = {
"message": f"Unknown API path: {api_path}"
}
return {
'body': json.dumps(response_body),
'statusCode': 200,
'applicationContentType': 'application/json'
}
Role: !GetAtt LambdaExecutionRole.Arn
Timeout: 30
ShippingProviderLambda:
Type: AWS::Lambda::Function
Properties:
FunctionName: ShippingProviderLambda
Handler: shipping_provider_lambda.lambda_handler
Runtime: python3.9
Code:
ZipFile: |
import json
import random
def lambda_handler(event, context):
print(f"Received event: {json.dumps(event)}")
action_group = event['actionGroup']
api_path = event['apiPath']
http_method = event['httpMethod']
parameters = event['parameters']
request_body = json.loads(event['requestBody']['content']['application/json']['properties']['requestBody'])
response_body = {}
if api_path == '/shipments':
order_id = request_body.get('orderId', 'UNKNOWN')
if random.random() < 0.1:
response_body = {
"message": f"Failed to create shipment for Order {order_id}",
"status": "FAILED",
"errorCode": "EXTERNAL_API_ERROR"
}
status_code = 500
else:
tracking_number = f"TRACK-{order_id}-{random.randint(1000, 9999)}"
response_body = {
"message": f"Shipment created successfully for Order {order_id}",
"trackingNumber": tracking_number,
"status": "CREATED"
}
status_code = 200
else:
response_body = {
"message": f"Unknown API path: {api_path}"
}
status_code = 404
return {
'body': json.dumps(response_body),
'statusCode': status_code,
'applicationContentType': 'application/json'
}
Role: !GetAtt LambdaExecutionRole.Arn
Timeout: 30
LambdaExecutionRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
BedrockAgentServiceRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: bedrock.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: BedrockAgentPermissions
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- bedrock:InvokeModel # Allow agents to use Bedrock LLMs
Resource:
- !Sub "arn:aws:bedrock:${AWS::Region}::foundation-model/anthropic.claude-v2" # or your preferred model
- Effect: Allow
Action:
- lambda:InvokeFunction # Allow agents to call their tool Lambdas
Resource:
- !GetAtt ProductCatalogLambda.Arn
- !GetAtt ShippingProviderLambda.Arn
- Effect: Allow
Action:
- sns:Publish
- dynamodb:PutItem
Resource: "*"
b) Agent Creation (Boto3/CLI) Since Bedrock Agents are often managed programmatically due to their complexity (OpenAPI schemas, etc.), we'll use Boto3.
First, define the OpenAPI schemas for our Lambda functions:
product_catalog_openapi.yaml
openapi: 3.0.0
info:
title: ProductCatalogAPI
version: 1.0.0
paths:
/products/{productId}:
get:
summary: Get product details
description: Retrieves details for a specific product ID.
operationId: getProductDetails
parameters:
- name: productId
in: path
required: true
schema:
type: string
description: The ID of the product to retrieve.
responses:
'200':
description: Product details
content:
application/json:
schema:
type: object
properties:
productName:
type: string
price:
type: number
inStock:
type: boolean
'404':
description: Product not found
shipping_provider_openapi.yaml
openapi: 3.0.0
info:
title: ShippingProviderAPI
version: 1.0.0
paths:
/shipments:
post:
summary: Create a new shipment
description: Creates a new shipment with the specified order details.
operationId: createShipment
requestBody:
required: true
content:
application/json:
schema:
type: object
properties:
orderId:
type: string
description: The ID of the order to ship.
items:
type: array
items:
type: object
properties:
productId:
type: string
quantity:
type: integer
price:
type: number
required:
- orderId
- items
responses:
'200':
description: Shipment created successfully
content:
application/json:
schema:
type: object
properties:
message:
type: string
trackingNumber:
type: string
status:
type: string
'500':
description: Failed to create shipment
Now, create the agents using Boto3 (Python):
import boto3
import json
bedrock_agent_client = boto3.client('bedrock-agent')
bedrock_agent_runtime_client = boto3.client('bedrock-agent-runtime')
agent_role_arn = "arn:aws:iam::<YOUR_ACCOUNT_ID>:role/BedrockAgentServiceRole" # From CloudFormation Output
product_catalog_lambda_arn = "arn:aws:lambda:<REGION>:<YOUR_ACCOUNT_ID>:function:ProductCatalogLambda"
shipping_provider_lambda_arn = "arn:aws:lambda:<REGION>:<YOUR_ACCOUNT_ID>:function:ShippingProviderLambda"
with open('product_catalog_openapi.yaml', 'r') as f:
product_catalog_schema = f.read()
def create_order_validation_agent():
try:
response = bedrock_agent_client.create_agent(
agentName='OrderValidationAgent',
agentResourceRoleArn=agent_role_arn,
description='Agent for validating product orders against a product catalog.',
foundationModel='anthropic.claude-v2', # Or your preferred model
idleSessionTTLInSeconds=1800,
instruction="You are an order validation assistant. Your goal is to verify product details using the provided product catalog tool. If a product is not found or out of stock, report it as an error."
)
agent_id = response['agent']['agentId']
print(f"Created OrderValidationAgent with ID: {agent_id}")
response = bedrock_agent_client.create_agent_action_group(
agentId=agent_id,
agentVersion='DRAFT', # Always create on DRAFT
actionGroupName='ProductCatalogActionGroup',
description='Provides operations to check product details.',
actionGroupExecutor={'lambda': product_catalog_lambda_arn},
apiSchema={'payload': product_catalog_schema}
)
print(f"Created action group for ProductCatalogLambda: {response['agentActionGroup']['actionGroupId']}")
response = bedrock_agent_client.prepare_agent(agentId=agent_id)
print(f"Prepared OrderValidationAgent: {response['agent']['agentStatus']}")
return agent_id
except Exception as e:
print(f"Error creating Order Validation Agent: {e}")
return None
with open('shipping_provider_openapi.yaml', 'r') as f:
shipping_provider_schema = f.read()
def create_shipping_agent():
try:
response = bedrock_agent_client.create_agent(
agentName='ShippingAgent',
agentResourceRoleArn=agent_role_arn,
description='Agent for creating shipments using an external shipping provider API.',
foundationModel='anthropic.claude-v2',
idleSessionTTLInSeconds=1800,
instruction="You are a shipping assistant. Your goal is to create shipments for orders using the shipping provider tool. If the shipment creation fails, report the error."
)
agent_id = response['agent']['agentId']
print(f"Created ShippingAgent with ID: {agent_id}")
response = bedrock_agent_client.create_agent_action_group(
agentId=agent_id,
agentVersion='DRAFT',
actionGroupName='ShippingProviderActionGroup',
description='Provides operations to create shipments.',
actionGroupExecutor={'lambda': shipping_provider_lambda_arn},
apiSchema={'payload': shipping_provider_schema}
)
print(f"Created action group for ShippingProviderLambda: {response['agentActionGroup']['actionGroupId']}")
response = bedrock_agent_client.prepare_agent(agentId=agent_id)
print(f"Prepared ShippingAgent: {response['agent']['agentStatus']}")
return agent_id
except Exception as e:
print(f"Error creating Shipping Agent: {e}")
return None
if __name__ == "__main__":
validation_agent_id = create_order_validation_agent()
shipping_agent_id = create_shipping_agent()
print(f"\nOrder Validation Agent ID: {validation_agent_id}")
print(f"Shipping Agent ID: {shipping_agent_id}")
- Orchestrate with AWS Step Functions Now, the magic of orchestration. Step Functions allows us to define the entire workflow visually and programmatically using Amazon States Language (ASL).
AWSTemplateFormatVersion: '2010-09-09'
Description: AWS Step Functions State Machine for Autonomous Order Processing
Parameters:
OrderValidationAgentId:
Type: String
Description: The ID of the Bedrock Order Validation Agent.
ShippingAgentId:
Type: String
Description: The ID of the Bedrock Shipping Agent.
Resources:
AutonomousOrderProcessingStateMachine:
Type: AWS::StepFunctions::StateMachine
Properties:
StateMachineName: AutonomousOrderProcessingWorkflow
DefinitionString: !Sub |
{
"Comment": "Autonomous Order Processing Workflow using Bedrock Agents",
"StartAt": "InitializeOrder",
"States": {
"InitializeOrder": {
"Type": "Pass",
"Result": {
"orderId.$": "$.orderId",
"items.$": "$.items",
"validationResult": null,
"shippingResult": null
},
"Next": "InvokeOrderValidationAgent"
},
"InvokeOrderValidationAgent": {
"Type": "Task",
"Resource": "arn:aws:states:::bedrock:invokeAgent",
"Parameters": {
"AgentId": "${OrderValidationAgentId}",
"AgentAliasId": "TSTALIAS", # Use TSTALIAS for the DRAFT version
"InputText.$": "States.Format('Validate the following order details: Order ID {}. Items: {}.', $.orderId, States.JsonToString($.items))",
"SessionState": {
"sessionAttributes": {
"orderId.$": "$.orderId"
}
},
"EnableTrace": true
},
"ResultPath": "$.validationResult",
"Catch": [
{
"ErrorEquals": ["States.TaskFailed", "Bedrock.InvokeAgent.Failed"],
"Next": "OrderValidationFailed"
}
],
"Next": "CheckValidationStatus"
},
"CheckValidationStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.validationResult.output.text",
"StringContains": "Validation successful",
"Next": "InvokeShippingAgent"
},
{
"Variable": "$.validationResult.output.text",
"StringContains": "Product not found",
"Next": "OrderValidationFailed"
},
{
"Variable": "$.validationResult.output.text",
"StringContains": "out of stock",
"Next": "OrderValidationFailed"
}
],
"Default": "OrderValidationFailed" # Catch-all for unexpected validation results
},
"OrderValidationFailed": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish", # Example Notification Action
"Parameters": {
"TopicArn": "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:OrderAlertsTopic",
"Message": !Sub "Order ${$.orderId} failed validation. Reason: ${$.validationResult.output.text}"
},
"End": true
},
"InvokeShippingAgent": {
"Type": "Task",
"Resource": "arn:aws:states:::bedrock:invokeAgent",
"Parameters": {
"AgentId": "${ShippingAgentId}",
"AgentAliasId": "TSTALIAS",
"InputText.$": "States.Format('Create a shipment for order {} with the following items: {}.', $.orderId, States.JsonToString($.items))",
"SessionState": {
"sessionAttributes": {
"orderId.$": "$.orderId",
"items.$": "$.items"
}
},
"EnableTrace": true
},
"ResultPath": "$.shippingResult",
"Catch": [
{
"ErrorEquals": ["States.TaskFailed", "Bedrock.InvokeAgent.Failed"],
"Next": "ShippingFailed"
}
],
"Next": "CheckShippingStatus"
},
"CheckShippingStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.shippingResult.output.text",
"StringContains": "Shipment created successfully",
"Next": "CompleteOrder"
}
],
"Default": "ShippingFailed"
},
"ShippingFailed": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:OrderAlertsTopic",
"Message": !Sub "Order ${$.orderId} failed shipping. Reason: ${$.shippingResult.output.text}"
},
"End": true
},
"CompleteOrder": {
"Type": "Succeed"
}
}
}
RoleArn: !GetAtt StepFunctionsExecutionRole.Arn
StepFunctionsExecutionRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: states.${AWS::Region}.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: StepFunctionsPermissions
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- bedrock:InvokeAgent
Resource:
- !Sub "arn:aws:bedrock:${AWS::Region}:${AWS::AccountId}:agent/${OrderValidationAgentId}"
- !Sub "arn:aws:bedrock:${AWS::Region}:${AWS::AccountId}:agent/${ShippingAgentId}"
- !Sub "arn:aws:bedrock:${AWS::Region}:${AWS::AccountId}:agent-alias/${OrderValidationAgentId}/*" # Allow invoking specific agent aliases
- !Sub "arn:aws:bedrock:${AWS::Region}:${AWS::AccountId}:agent-alias/${ShippingAgentId}/*"
- Effect: Allow # For SNS notifications
Action:
- sns:Publish
Resource: "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:OrderAlertsTopic" # Ensure this topic exists or create it
- Kicking off the Workflow You can start a Step Functions execution via the AWS CLI or SDK:
aws stepfunctions start-execution \
--state-machine-arn "arn:aws:states:<REGION>:<YOUR_ACCOUNT_ID>:stateMachine:AutonomousOrderProcessingWorkflow" \
--input '{
"orderId": "ORD789",
"items": [
{"productId": "PROD123", "quantity": 1, "price": 99.99},
{"productId": "PROD456", "quantity": 2, "price": 49.99}
]
}'
Benefits of this Approach
- Modularity: Each agent is a specialist, reducing the cognitive load on a single LLM and allowing for easier updates and maintenance.
- Observability: Step Functions provides a visual workflow, detailed execution history, and CloudWatch Logs integration, making it easy to debug complex AI processes.
- Reliability: Built-in retry mechanisms, error handling, and parallel execution capabilities of Step Functions ensure resilience.
- Scalability: Both Bedrock Agents and Step Functions are fully managed, scaling automatically to meet demand.
- Security: IAM roles control permissions granularly, ensuring agents only access the resources they need.
- Cost-Effectiveness: You only pay for the state transitions in Step Functions and the Bedrock agent inv ocations.
Considerations for Production
- Version Control: Manage Bedrock agent versions carefully. Step Functions should typically invoke a specific, published agent version, not DRAFT for production.
- Input Validation: Implement robust input validation at the Step Functions entry point and within your Lambda tools.
- Human-in-the-Loop: For critical workflows, consider adding a human approval step in Step Functions, perhaps by sending a task to an Amazon SQS queue that a human operator monitors.
- Prompt Engineering: The instruction given to your Bedrock agents is crucial for their performance. Experiment with clear, concise instructions and few-shot examples if necessary.
- Tool Design: Design your Lambda tools to be idempotent and handle various edge cases, as the LLM might call them with unexpected inputs.
Conclusion
As we push the boundaries of Generative AI, the ability to build and orchestrate autonomous agents becomes a cornerstone of truly intelligent applications. By combining the reasoning and task execution capabilities of Amazon Bedrock Agents with the robust workflow management of AWS Step Functions, you can construct complex, multi-step processes that move far beyond simple conversational AI.
This architecture empowers developers to build sophisticated, resilient, and scalable AI solutions that can perform real-world actions, marking a significant leap forward in the journey of cloud-native AI. Dive in, experiment, and unleash the full potential of agentic AI on AWS!
Top comments (0)