DEV Community

Beyond Chatbots: Building Autonomous Multi-Agent Workflows with Amazon Bedrock and Step Functions

I've witnessed the development of AI this year, especially on AWS. Simple prompt engineering and Retrieval Augmented Generation (RAG) have given way to autonomous agents. These programs are capable of reasoning, planning, and carrying out tasks, turning abstract objectives into tangible actions.

Instead of using monolithic models, generative AI can solve complex problems by coordinating a number of specialized agents. Using AWS Step Functions for orchestration and Amazon Bedrock Agents for task execution, this article will walk you through creating such a workflow.

The Transition to Agentic AI: Why It's Important Now

Conventional applications of generative AI frequently include:

  • Prompt: User input.
  • RAG (optional): Context retrieved from a knowledge base.
  • LLM call: Generating an answer.

This works well for answering questions, but what if the user wants to take action? Consider a request such as: "Summarize the last quarter's sales data, identify the top 3 underperforming products, and create a draft email to their respective product managers suggesting a review."

This necessitates:

  • Information retrieval: Fetching the sales data.
  • Analysis: Identifying the underperforming products.
  • Action: Drafting the emails.

This multi-step, action-oriented process is difficult for a single LLM call to handle. Bedrock Agents excel in this situation. An agent can be given a high-level objective, evaluate it, decide on a course of action, and then employ predefined "tools" (APIs) to accomplish that objective.

However, what happens if you need several agents to cooperate? What if their tasks require human approval, conditional logic, error handling, and a particular sequence? This is the point at which AWS Step Functions becomes essential.

Overview of Architecture: Managing Independent Workflows

An "Order Processing" workflow will be used in our example scenario:

  1. A fresh order arrives.
  2. An "Order Validation Agent" uses an API to compare order details to the product catalogue.
  3. A "Shipping Agent" communicates with a shipping provider API if the order is valid.
  4. An alert is sent by a "Notification Agent" if any problems occur (validation failed, shipping error, etc.).

Components:

  • Order Ingestion: New order requests are sent to an API Gateway endpoint or an Amazon SQS queue.
  • AWS Step Functions: The state machine is the central orchestrator. It manages conditional logic, parallel execution, retries, and flow definition.
  • Amazon Bedrock Agents:
    • Order Validation Agent: Verifies product IDs, stock, and prices by interacting with a Product Catalogue API (such as a Lambda function).
    • Shipping Agent: Creates shipments by interacting with an API from an external shipping provider.
    • Notification Agent: A more straightforward agent that can log to DynamoDB based on event details or send alerts via SNS.
  • AWS Lambda: Provides the "tools" (APIs) for Bedrock Agents, serving as a safe bridge between the agent and internal data stores or external services.
  • Amazon CloudWatch: For tracking and recording every step of the process.
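To make the control flow concrete before wiring up any AWS services, here is a minimal pure-Python sketch of the same sequence. The three functions are hypothetical stand-ins for the agents and the orchestrator (nothing here calls Bedrock or Step Functions), and the catalogue contents are made-up sample data.

```python
from typing import Callable, Dict, Tuple

# Hypothetical in-memory catalogue, used only for this illustration.
CATALOGUE: Dict[str, dict] = {
    "PROD123": {"inStock": True},
    "PROD456": {"inStock": False},
}


def validate_order(order: dict) -> Tuple[bool, str]:
    """Stand-in for the Order Validation Agent."""
    for item in order["items"]:
        product = CATALOGUE.get(item["productId"])
        if product is None:
            return False, f"Product not found: {item['productId']}"
        if not product["inStock"]:
            return False, f"Product out of stock: {item['productId']}"
    return True, "Validation successful"


def create_shipment(order: dict) -> Tuple[bool, str]:
    """Stand-in for the Shipping Agent (always succeeds in this sketch)."""
    return True, f"Shipment created successfully for Order {order['orderId']}"


def process_order(order: dict, notify: Callable[[str], None]) -> str:
    """Mirrors the orchestrator: validate, then ship, notifying on failure."""
    ok, detail = validate_order(order)
    if not ok:
        notify(f"Order {order['orderId']} failed validation. Reason: {detail}")
        return "VALIDATION_FAILED"

    ok, detail = create_shipment(order)
    if not ok:
        notify(f"Order {order['orderId']} failed shipping. Reason: {detail}")
        return "SHIPPING_FAILED"

    return "COMPLETED"
```

In the real workflow, each stand-in becomes a Bedrock Agent, `process_order` becomes the Step Functions state machine, and `notify` becomes the notification step.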

Step-by-Step Implementation

Let's dissect the implementation using Python/Boto3 for agent definitions and interactions and CloudFormation for infrastructure.

  1. Define the Agent Tools (AWS Lambda Functions). Every Bedrock Agent needs access to "tools": API functions that it can call to carry out tasks. These will be implemented as AWS Lambda functions.

a) Product Catalogue Lambda (for Order Validation Agent) This Lambda simulates a product catalogue lookup.

# product_catalog_lambda.py
import json

def lambda_handler(event, context):
    print(f"Received event: {json.dumps(event)}")
    action_group = event['actionGroup']
    api_path = event['apiPath']
    http_method = event['httpMethod']
    # Parameters arrive as a list of {name, type, value} objects
    parameters = event.get('parameters', [])

    status_code = 200

    if api_path == '/products/{productId}':
        product_id = next((p['value'] for p in parameters if p['name'] == 'productId'), None)
        if product_id == 'PROD123':
            response_body = {
                "productName": "AWS Widget Pro",
                "price": 99.99,
                "inStock": True
            }
        elif product_id == 'PROD456':
            response_body = {
                "productName": "Cloud Gadget",
                "price": 49.99,
                "inStock": False
            }
        else:
            response_body = {
                "message": "Product not found"
            }
            status_code = 404
    else:
        response_body = {
            "message": f"Unknown API path: {api_path}"
        }
        status_code = 404

    # Bedrock Agents expect the result wrapped in this response envelope
    return {
        'messageVersion': '1.0',
        'response': {
            'actionGroup': action_group,
            'apiPath': api_path,
            'httpMethod': http_method,
            'httpStatusCode': status_code,
            'responseBody': {
                'application/json': {
                    'body': json.dumps(response_body)
                }
            }
        }
    }
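
Before attaching a tool to an agent, it helps to exercise it locally. The sketch below builds an event in the general shape Bedrock Agents send for an OpenAPI-based action group and shows a small helper for reading parameters. The helper names (`make_agent_event`, `get_parameter`) and the default action group name are illustrative assumptions, not part of any AWS library.

```python
def make_agent_event(api_path, http_method, parameters,
                     action_group="ProductCatalogActionGroup"):
    """Build a sample tool-invocation event for local testing (hypothetical helper)."""
    return {
        "messageVersion": "1.0",
        "actionGroup": action_group,
        "apiPath": api_path,
        "httpMethod": http_method,
        # Parameters are delivered as a list of name/type/value objects.
        "parameters": [
            {"name": name, "type": "string", "value": value}
            for name, value in parameters.items()
        ],
    }


def get_parameter(event, name, default=None):
    """Return the value of a named parameter, or `default` if it is absent."""
    for param in event.get("parameters", []):
        if param["name"] == name:
            return param["value"]
    return default


sample_event = make_agent_event(
    "/products/{productId}", "GET", {"productId": "PROD123"}
)
```

Passing `sample_event` to `lambda_handler(sample_event, None)` gives a quick check of the tool's branches without deploying anything.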

b) Shipping Provider Lambda (for Shipping Agent) This Lambda simulates interacting with an external shipping API.

# shipping_provider_lambda.py
import json
import random

def lambda_handler(event, context):
    print(f"Received event: {json.dumps(event)}")
    action_group = event['actionGroup']
    api_path = event['apiPath']
    http_method = event['httpMethod']

    # The request body arrives as a list of {name, type, value} properties
    properties = (
        event.get('requestBody', {})
        .get('content', {})
        .get('application/json', {})
        .get('properties', [])
    )
    request_body = {p['name']: p['value'] for p in properties}

    if api_path == '/shipments':
        # Extract details from request_body, e.g., request_body['orderId'], request_body['items']
        order_id = request_body.get('orderId', 'UNKNOWN')
        # Simulate an external API call
        if random.random() < 0.1: # 10% chance of failure
            response_body = {
                "message": f"Failed to create shipment for Order {order_id}",
                "status": "FAILED",
                "errorCode": "EXTERNAL_API_ERROR"
            }
            status_code = 500
        else:
            tracking_number = f"TRACK-{order_id}-{random.randint(1000, 9999)}"
            response_body = {
                "message": f"Shipment created successfully for Order {order_id}",
                "trackingNumber": tracking_number,
                "status": "CREATED"
            }
            status_code = 200
    else:
        response_body = {
            "message": f"Unknown API path: {api_path}"
        }
        status_code = 404

    # Bedrock Agents expect the result wrapped in this response envelope
    return {
        'messageVersion': '1.0',
        'response': {
            'actionGroup': action_group,
            'apiPath': api_path,
            'httpMethod': http_method,
            'httpStatusCode': status_code,
            'responseBody': {
                'application/json': {
                    'body': json.dumps(response_body)
                }
            }
        }
    }
  2. Create the Bedrock Agents. Each agent requires:
  • An Agent Role with permissions to invoke the LLM and the Lambda tools.
  • An Agent Definition specifying the foundation model, a description, and the action groups (tools).
  • Action Groups: Link to the Lambda functions and their OpenAPI schemas.

a) CloudFormation for Agent Roles and Lambda Permissions

AWSTemplateFormatVersion: '2010-09-09'
Description: AWS Bedrock Agents and supporting Lambda functions

Resources:
  ProductCatalogLambda:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: ProductCatalogLambda
      Handler: index.lambda_handler # Inline ZipFile code is deployed as index.py
      Runtime: python3.9
      Code:
        ZipFile: |
          import json
          def lambda_handler(event, context):
              print(f"Received event: {json.dumps(event)}")
              action_group = event['actionGroup']
              api_path = event['apiPath']
              http_method = event['httpMethod']
              parameters = event.get('parameters', [])
              status_code = 200
              if api_path == '/products/{productId}':
                  product_id = next((p['value'] for p in parameters if p['name'] == 'productId'), None)
                  if product_id == 'PROD123':
                      response_body = {
                          "productName": "AWS Widget Pro",
                          "price": 99.99,
                          "inStock": True
                      }
                  elif product_id == 'PROD456':
                      response_body = {
                          "productName": "Cloud Gadget",
                          "price": 49.99,
                          "inStock": False
                      }
                  else:
                      response_body = {
                          "message": "Product not found"
                      }
                      status_code = 404
              else:
                  response_body = {
                      "message": f"Unknown API path: {api_path}"
                  }
                  status_code = 404
              return {
                  'messageVersion': '1.0',
                  'response': {
                      'actionGroup': action_group,
                      'apiPath': api_path,
                      'httpMethod': http_method,
                      'httpStatusCode': status_code,
                      'responseBody': {
                          'application/json': {
                              'body': json.dumps(response_body)
                          }
                      }
                  }
              }
      Role: !GetAtt LambdaExecutionRole.Arn
      Timeout: 30

  ShippingProviderLambda:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: ShippingProviderLambda
      Handler: index.lambda_handler # Inline ZipFile code is deployed as index.py
      Runtime: python3.9
      Code:
        ZipFile: |
          import json
          import random
          def lambda_handler(event, context):
              print(f"Received event: {json.dumps(event)}")
              action_group = event['actionGroup']
              api_path = event['apiPath']
              http_method = event['httpMethod']
              properties = (
                  event.get('requestBody', {})
                  .get('content', {})
                  .get('application/json', {})
                  .get('properties', [])
              )
              request_body = {p['name']: p['value'] for p in properties}
              if api_path == '/shipments':
                  order_id = request_body.get('orderId', 'UNKNOWN')
                  if random.random() < 0.1:
                      response_body = {
                          "message": f"Failed to create shipment for Order {order_id}",
                          "status": "FAILED",
                          "errorCode": "EXTERNAL_API_ERROR"
                      }
                      status_code = 500
                  else:
                      tracking_number = f"TRACK-{order_id}-{random.randint(1000, 9999)}"
                      response_body = {
                          "message": f"Shipment created successfully for Order {order_id}",
                          "trackingNumber": tracking_number,
                          "status": "CREATED"
                      }
                      status_code = 200
              else:
                  response_body = {
                      "message": f"Unknown API path: {api_path}"
                  }
                  status_code = 404
              return {
                  'messageVersion': '1.0',
                  'response': {
                      'actionGroup': action_group,
                      'apiPath': api_path,
                      'httpMethod': http_method,
                      'httpStatusCode': status_code,
                      'responseBody': {
                          'application/json': {
                              'body': json.dumps(response_body)
                          }
                      }
                  }
              }
      Role: !GetAtt LambdaExecutionRole.Arn
      Timeout: 30

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

  BedrockAgentServiceRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: bedrock.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: BedrockAgentPermissions
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - bedrock:InvokeModel # Allow agents to use Bedrock LLMs
                Resource:
                  - !Sub "arn:aws:bedrock:${AWS::Region}::foundation-model/anthropic.claude-v2" # or your preferred model
              - Effect: Allow
                Action:
                  - lambda:InvokeFunction # Allow agents to call their tool Lambdas
                Resource:
                  - !GetAtt ProductCatalogLambda.Arn
                  - !GetAtt ShippingProviderLambda.Arn
              - Effect: Allow
                Action:
                  - sns:Publish
                  - dynamodb:PutItem
                Resource: "*"
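
One thing the template above does not cover: as far as I know, Bedrock calls an action group's Lambda function as a service principal, so each function also needs a resource-based policy that lets `bedrock.amazonaws.com` invoke it. Here is a minimal Boto3 sketch of granting that. The helper names and the statement ID are my own assumptions, and the agent ARN is only known after the agent has been created.

```python
def build_invoke_permission(function_name, agent_arn,
                            statement_id="AllowBedrockAgentInvoke"):
    """Build the keyword arguments for Lambda's add_permission call."""
    return {
        "FunctionName": function_name,
        "StatementId": statement_id,  # hypothetical ID; must be unique per function
        "Action": "lambda:InvokeFunction",
        "Principal": "bedrock.amazonaws.com",
        "SourceArn": agent_arn,  # restricts the grant to one specific agent
    }


def grant_agent_invoke(lambda_client, function_name, agent_arn):
    """Attach the resource-based policy statement to the function."""
    return lambda_client.add_permission(
        **build_invoke_permission(function_name, agent_arn)
    )


# Example usage (requires AWS credentials and an existing agent):
# import boto3
# grant_agent_invoke(
#     boto3.client("lambda"),
#     "ProductCatalogLambda",
#     "arn:aws:bedrock:<REGION>:<YOUR_ACCOUNT_ID>:agent/<AGENT_ID>",
# )
```

Scoping `SourceArn` to a single agent keeps other agents in the account from calling the same tool.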

b) Agent Creation (Boto3/CLI) Since Bedrock Agents are often managed programmatically due to their complexity (OpenAPI schemas, etc.), we'll use Boto3.

First, define the OpenAPI schemas for our Lambda functions:

product_catalog_openapi.yaml

openapi: 3.0.0
info:
  title: ProductCatalogAPI
  version: 1.0.0
paths:
  /products/{productId}:
    get:
      summary: Get product details
      description: Retrieves details for a specific product ID.
      operationId: getProductDetails
      parameters:
        - name: productId
          in: path
          required: true
          schema:
            type: string
          description: The ID of the product to retrieve.
      responses:
        '200':
          description: Product details
          content:
            application/json:
              schema:
                type: object
                properties:
                  productName:
                    type: string
                  price:
                    type: number
                  inStock:
                    type: boolean
        '404':
          description: Product not found

shipping_provider_openapi.yaml

openapi: 3.0.0
info:
  title: ShippingProviderAPI
  version: 1.0.0
paths:
  /shipments:
    post:
      summary: Create a new shipment
      description: Creates a new shipment with the specified order details.
      operationId: createShipment
      requestBody:
        required: true
        content:
          application/json:
            schema:
              type: object
              properties:
                orderId:
                  type: string
                  description: The ID of the order to ship.
                items:
                  type: array
                  items:
                    type: object
                    properties:
                      productId:
                        type: string
                      quantity:
                        type: integer
                      price:
                        type: number
              required:
                - orderId
                - items
      responses:
        '200':
          description: Shipment created successfully
          content:
            application/json:
              schema:
                type: object
                properties:
                  message:
                    type: string
                  trackingNumber:
                    type: string
                  status:
                    type: string
        '500':
          description: Failed to create shipment

Now, create the agents using Boto3 (Python):

import boto3
import json

bedrock_agent_client = boto3.client('bedrock-agent')
bedrock_agent_runtime_client = boto3.client('bedrock-agent-runtime')

agent_role_arn = "arn:aws:iam::<YOUR_ACCOUNT_ID>:role/BedrockAgentServiceRole" # Use the role ARN from your CloudFormation stack (the generated name differs unless RoleName is set)
product_catalog_lambda_arn = "arn:aws:lambda:<REGION>:<YOUR_ACCOUNT_ID>:function:ProductCatalogLambda"
shipping_provider_lambda_arn = "arn:aws:lambda:<REGION>:<YOUR_ACCOUNT_ID>:function:ShippingProviderLambda"

with open('product_catalog_openapi.yaml', 'r') as f:
    product_catalog_schema = f.read()

def create_order_validation_agent():
    try:
        response = bedrock_agent_client.create_agent(
            agentName='OrderValidationAgent',
            agentResourceRoleArn=agent_role_arn,
            description='Agent for validating product orders against a product catalog.',
            foundationModel='anthropic.claude-v2', # Or your preferred model
            idleSessionTTLInSeconds=1800,
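            instruction="You are an order validation assistant. Your goal is to verify product details using the provided product catalog tool. If every product exists and is in stock, include the exact phrase 'Validation successful' in your reply. If a product is not found or out of stock, report it as an error."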
        )
        agent_id = response['agent']['agentId']
        print(f"Created OrderValidationAgent with ID: {agent_id}")

        response = bedrock_agent_client.create_agent_action_group(
            agentId=agent_id,
            agentVersion='DRAFT', # Always create on DRAFT
            actionGroupName='ProductCatalogActionGroup',
            description='Provides operations to check product details.',
            actionGroupExecutor={'lambda': product_catalog_lambda_arn},
            apiSchema={'payload': product_catalog_schema}
        )
        print(f"Created action group for ProductCatalogLambda: {response['agentActionGroup']['actionGroupId']}")

        response = bedrock_agent_client.prepare_agent(agentId=agent_id)
        print(f"Prepared OrderValidationAgent: {response['agentStatus']}")  # prepare_agent returns agentStatus at the top level
        return agent_id
    except Exception as e:
        print(f"Error creating Order Validation Agent: {e}")
        return None

with open('shipping_provider_openapi.yaml', 'r') as f:
    shipping_provider_schema = f.read()

def create_shipping_agent():
    try:
        response = bedrock_agent_client.create_agent(
            agentName='ShippingAgent',
            agentResourceRoleArn=agent_role_arn,
            description='Agent for creating shipments using an external shipping provider API.',
            foundationModel='anthropic.claude-v2',
            idleSessionTTLInSeconds=1800,
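            instruction="You are a shipping assistant. Your goal is to create shipments for orders using the shipping provider tool. If the shipment is created, include the exact phrase 'Shipment created successfully' in your reply. If the shipment creation fails, report the error."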
        )
        agent_id = response['agent']['agentId']
        print(f"Created ShippingAgent with ID: {agent_id}")

        response = bedrock_agent_client.create_agent_action_group(
            agentId=agent_id,
            agentVersion='DRAFT',
            actionGroupName='ShippingProviderActionGroup',
            description='Provides operations to create shipments.',
            actionGroupExecutor={'lambda': shipping_provider_lambda_arn},
            apiSchema={'payload': shipping_provider_schema}
        )
        print(f"Created action group for ShippingProviderLambda: {response['agentActionGroup']['actionGroupId']}")

        response = bedrock_agent_client.prepare_agent(agentId=agent_id)
        print(f"Prepared ShippingAgent: {response['agentStatus']}")  # prepare_agent returns agentStatus at the top level
        return agent_id
    except Exception as e:
        print(f"Error creating Shipping Agent: {e}")
        return None

if __name__ == "__main__":
    validation_agent_id = create_order_validation_agent()
    shipping_agent_id = create_shipping_agent()

    print(f"\nOrder Validation Agent ID: {validation_agent_id}")
    print(f"Shipping Agent ID: {shipping_agent_id}")
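
Agent creation and preparation are asynchronous, so the script above can race ahead of the service: a freshly created agent may still be in a `CREATING` state when the action group call is made. A minimal polling sketch, assuming the `get_agent` call on the `bedrock-agent` client; the function name, timeout, and poll interval are illustrative choices.

```python
import time


def wait_for_agent_status(client, agent_id, target_statuses,
                          timeout_seconds=120, poll_seconds=5,
                          sleep=time.sleep):
    """Poll get_agent until the agent reaches one of `target_statuses`.

    Raises RuntimeError if the agent reports FAILED and TimeoutError if the
    deadline passes first. `sleep` is injectable to keep tests fast.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = client.get_agent(agentId=agent_id)["agent"]["agentStatus"]
        if status in target_statuses:
            return status
        if status == "FAILED":
            raise RuntimeError(f"Agent {agent_id} entered FAILED state")
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Agent {agent_id} still {status} after {timeout_seconds}s"
            )
        sleep(poll_seconds)


# Example usage inside the creation functions above:
# wait_for_agent_status(bedrock_agent_client, agent_id, {"NOT_PREPARED"})  # after create_agent
# wait_for_agent_status(bedrock_agent_client, agent_id, {"PREPARED"})      # after prepare_agent
```

Calling it once after `create_agent` and once after `prepare_agent` keeps the later steps from running against an agent that is not ready yet.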
  3. Orchestrate with AWS Step Functions. Now for the orchestration itself. Step Functions lets us define the entire workflow visually and programmatically using Amazon States Language (ASL).
AWSTemplateFormatVersion: '2010-09-09'
Description: AWS Step Functions State Machine for Autonomous Order Processing

Parameters:
  OrderValidationAgentId:
    Type: String
    Description: The ID of the Bedrock Order Validation Agent.
  ShippingAgentId:
    Type: String
    Description: The ID of the Bedrock Shipping Agent.

Resources:
  AutonomousOrderProcessingStateMachine:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      StateMachineName: AutonomousOrderProcessingWorkflow
      DefinitionString: !Sub |
        {
          "Comment": "Autonomous Order Processing Workflow using Bedrock Agents",
          "StartAt": "InitializeOrder",
          "States": {
            "InitializeOrder": {
              "Type": "Pass",
              "Parameters": {
                "orderId.$": "$.orderId",
                "items.$": "$.items"
              },
              "Next": "InvokeOrderValidationAgent"
            },
            "InvokeOrderValidationAgent": {
              "Type": "Task",
              "Resource": "arn:aws:states:::bedrock:invokeAgent",
              "Parameters": {
                "AgentId": "${OrderValidationAgentId}",
                "AgentAliasId": "TSTALIASID",
                "InputText.$": "States.Format('Validate the following order details: Order ID {}. Items: {}.', $.orderId, States.JsonToString($.items))",
                "SessionState": {
                  "sessionAttributes": {
                    "orderId.$": "$.orderId"
                  }
                },
                "EnableTrace": true
              },
              "ResultPath": "$.validationResult",
              "Catch": [
                {
                  "ErrorEquals": ["States.ALL"],
                  "ResultPath": "$.validationResult",
                  "Next": "OrderValidationFailed"
                }
              ],
              "Next": "CheckValidationStatus"
            },
            "CheckValidationStatus": {
              "Type": "Choice",
              "Choices": [
                {
                  "Variable": "$.validationResult.output.text",
                  "StringMatches": "*Validation successful*",
                  "Next": "InvokeShippingAgent"
                },
                {
                  "Variable": "$.validationResult.output.text",
                  "StringMatches": "*Product not found*",
                  "Next": "OrderValidationFailed"
                },
                {
                  "Variable": "$.validationResult.output.text",
                  "StringMatches": "*out of stock*",
                  "Next": "OrderValidationFailed"
                }
              ],
              "Default": "OrderValidationFailed"
            },
            "OrderValidationFailed": {
              "Type": "Task",
              "Resource": "arn:aws:states:::sns:publish",
              "Parameters": {
                "TopicArn": "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:OrderAlertsTopic",
                "Message.$": "States.Format('Order {} failed validation. Details: {}', $.orderId, States.JsonToString($.validationResult))"
              },
              "End": true
            },
            "InvokeShippingAgent": {
              "Type": "Task",
              "Resource": "arn:aws:states:::bedrock:invokeAgent",
              "Parameters": {
                "AgentId": "${ShippingAgentId}",
                "AgentAliasId": "TSTALIASID",
                "InputText.$": "States.Format('Create a shipment for order {} with the following items: {}.', $.orderId, States.JsonToString($.items))",
                "SessionState": {
                  "sessionAttributes": {
                    "orderId.$": "$.orderId",
                    "items.$": "States.JsonToString($.items)"
                  }
                },
                "EnableTrace": true
              },
              "ResultPath": "$.shippingResult",
              "Catch": [
                {
                  "ErrorEquals": ["States.ALL"],
                  "ResultPath": "$.shippingResult",
                  "Next": "ShippingFailed"
                }
              ],
              "Next": "CheckShippingStatus"
            },
            "CheckShippingStatus": {
              "Type": "Choice",
              "Choices": [
                {
                  "Variable": "$.shippingResult.output.text",
                  "StringMatches": "*Shipment created successfully*",
                  "Next": "CompleteOrder"
                }
              ],
              "Default": "ShippingFailed"
            },
            "ShippingFailed": {
              "Type": "Task",
              "Resource": "arn:aws:states:::sns:publish",
              "Parameters": {
                "TopicArn": "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:OrderAlertsTopic",
                "Message.$": "States.Format('Order {} failed shipping. Details: {}', $.orderId, States.JsonToString($.shippingResult))"
              },
              "End": true
            },
            "CompleteOrder": {
              "Type": "Succeed"
            }
          }
        }
      RoleArn: !GetAtt StepFunctionsExecutionRole.Arn

  StepFunctionsExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: states.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: StepFunctionsPermissions
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - bedrock:InvokeAgent
                Resource:
                  - !Sub "arn:aws:bedrock:${AWS::Region}:${AWS::AccountId}:agent/${OrderValidationAgentId}"
                  - !Sub "arn:aws:bedrock:${AWS::Region}:${AWS::AccountId}:agent/${ShippingAgentId}"
                  - !Sub "arn:aws:bedrock:${AWS::Region}:${AWS::AccountId}:agent-alias/${OrderValidationAgentId}/*" # Allow invoking specific agent aliases
                  - !Sub "arn:aws:bedrock:${AWS::Region}:${AWS::AccountId}:agent-alias/${ShippingAgentId}/*"
              - Effect: Allow # For SNS notifications
                Action:
                  - sns:Publish
                Resource: !Sub "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:OrderAlertsTopic" # Ensure this topic exists or create it
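
A caveat worth verifying in your own account: I am not certain that Step Functions offers a direct `bedrock:invokeAgent` task resource, since the agent runtime API returns its answer as an event stream. If the direct integration is unavailable, a thin Lambda wrapper is a common fallback. The sketch below assumes the `invoke_agent` call on the `bedrock-agent-runtime` client; the handler's input keys (`agentId`, `agentAliasId`, `inputText`, `sessionId`) are a hypothetical contract of my own.

```python
import uuid


def collect_completion(event_stream):
    """Concatenate the text chunks from an invoke_agent event stream."""
    parts = []
    for event in event_stream:
        chunk = event.get("chunk")
        if chunk and "bytes" in chunk:
            parts.append(chunk["bytes"].decode("utf-8"))
    return "".join(parts)


def invoke_agent_text(runtime_client, agent_id, agent_alias_id, input_text,
                      session_id=None):
    """Invoke an agent and return its full reply under output.text."""
    response = runtime_client.invoke_agent(
        agentId=agent_id,
        agentAliasId=agent_alias_id,
        sessionId=session_id or str(uuid.uuid4()),  # a session ID is required
        inputText=input_text,
    )
    return {"output": {"text": collect_completion(response["completion"])}}


def lambda_handler(event, context):
    # boto3 is imported lazily so the helpers above can be unit-tested
    # without AWS dependencies installed.
    import boto3

    client = boto3.client("bedrock-agent-runtime")
    return invoke_agent_text(
        client,
        event["agentId"],
        event["agentAliasId"],
        event["inputText"],
        event.get("sessionId"),
    )
```

With the function ARN used directly as a task's `Resource`, the returned object becomes the task result, so paths such as `$.validationResult.output.text` would keep resolving as they do in the state machine above.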
  4. Kick Off the Workflow. You can start a Step Functions execution via the AWS CLI or SDK:
aws stepfunctions start-execution \
    --state-machine-arn "arn:aws:states:<REGION>:<YOUR_ACCOUNT_ID>:stateMachine:AutonomousOrderProcessingWorkflow" \
    --input '{
        "orderId": "ORD789",
        "items": [
            {"productId": "PROD123", "quantity": 1, "price": 99.99},
            {"productId": "PROD456", "quantity": 2, "price": 49.99}
        ]
    }'
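
For the SDK route, here is a rough Boto3 equivalent of the CLI command. Naming the execution after the order ID is a design choice of mine rather than a requirement; the helper name is illustrative, and the ARN placeholders need to be filled in.

```python
import json


def build_execution_request(state_machine_arn, order):
    """Build the keyword arguments for Step Functions' start_execution call."""
    return {
        "stateMachineArn": state_machine_arn,
        # Assumption: the order ID is safe to use as an execution name.
        "name": f"order-{order['orderId']}",
        "input": json.dumps(order),
    }


order = {
    "orderId": "ORD789",
    "items": [
        {"productId": "PROD123", "quantity": 1, "price": 99.99},
        {"productId": "PROD456", "quantity": 2, "price": 49.99},
    ],
}

# Example usage (requires AWS credentials):
# import boto3
# sfn = boto3.client("stepfunctions")
# response = sfn.start_execution(**build_execution_request(
#     "arn:aws:states:<REGION>:<YOUR_ACCOUNT_ID>:stateMachine:AutonomousOrderProcessingWorkflow",
#     order,
# ))
# print(response["executionArn"])
```

Step Functions expects execution names to be unique for a state machine, so reusing the order ID helps guard against accidentally starting the same order twice.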

Benefits of this Approach

  • Modularity: Each agent is a specialist, reducing the cognitive load on a single LLM and allowing for easier updates and maintenance.
  • Observability: Step Functions provides a visual workflow, detailed execution history, and CloudWatch Logs integration, making it easy to debug complex AI processes.
  • Reliability: Built-in retry mechanisms, error handling, and parallel execution capabilities of Step Functions ensure resilience.
  • Scalability: Both Bedrock Agents and Step Functions are fully managed, scaling automatically to meet demand.
  • Security: IAM roles control permissions granularly, ensuring agents only access the resources they need.
  • Cost-Effectiveness: You only pay for the state transitions in Step Functions and the Bedrock agent invocations.

Considerations for Production

  • Version Control: Manage Bedrock agent versions carefully. In production, Step Functions should invoke an alias that points to a specific, published agent version rather than the DRAFT test alias.
  • Input Validation: Implement robust input validation at the Step Functions entry point and within your Lambda tools.
  • Human-in-the-Loop: For critical workflows, consider adding a human approval step in Step Functions, perhaps by sending a task to an Amazon SQS queue that a human operator monitors.
  • Prompt Engineering: The instruction given to your Bedrock agents is crucial for their performance. Experiment with clear, concise instructions and few-shot examples if necessary.
  • Tool Design: Design your Lambda tools to be idempotent and handle various edge cases, as the LLM might call them with unexpected inputs.
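
For the human-in-the-loop item, one common shape is the Step Functions callback pattern: the workflow pauses on a task that hands out a task token, and whatever tool the reviewer uses reports the decision back with that token. A minimal sketch of the reporting side, assuming the `send_task_success` and `send_task_failure` calls on the `stepfunctions` client; the function name, the `ApprovalRejected` error code, and the output fields are hypothetical.

```python
import json


def complete_approval(sfn_client, task_token, approved, reviewer):
    """Resume a paused execution with the reviewer's decision."""
    if approved:
        return sfn_client.send_task_success(
            taskToken=task_token,
            output=json.dumps({"approved": True, "reviewer": reviewer}),
        )
    return sfn_client.send_task_failure(
        taskToken=task_token,
        error="ApprovalRejected",  # hypothetical error name for a Catch to match
        cause=f"Rejected by {reviewer}",
    )


# Example usage (requires AWS credentials and a token from a paused task):
# import boto3
# complete_approval(boto3.client("stepfunctions"), "<TASK_TOKEN>", True, "ops-oncall")
```

The state machine can then route on the success output or catch the rejection error, which keeps the approval decision visible in the execution history.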

Conclusion

As we push the boundaries of Generative AI, the ability to build and orchestrate autonomous agents becomes a cornerstone of truly intelligent applications. By combining the reasoning and task execution capabilities of Amazon Bedrock Agents with the robust workflow management of AWS Step Functions, you can construct complex, multi-step processes that move far beyond simple conversational AI.

This architecture empowers developers to build sophisticated, resilient, and scalable AI solutions that can perform real-world actions, marking a significant leap forward in the journey of cloud-native AI. Dive in, experiment, and unleash the full potential of agentic AI on AWS!
