DEV Community

Suraj Khaitan

Building the Future: How to Deploy AI Agent Teams That Scale to Millions Using AWS Lambda and Crew AI

Introduction

In the rapidly evolving landscape of artificial intelligence, building scalable, serverless agent systems has become crucial for enterprises looking to deploy GenAI solutions efficiently. This blog post explores how to architect and implement serverless agent orchestration using AWS Lambda patterns with Crew AI agents, demonstrating a real-world implementation that combines the power of multi-agent systems with cloud-native scalability.

Architecture Overview

Our serverless agent orchestration system leverages AWS Lambda for compute, Amazon S3 for storage, DynamoDB for state management, and integrates with Model Context Protocol (MCP) for tool interactions. The architecture follows a microservices pattern where each agent operates independently while being orchestrated through a centralized Lambda function.

Key Components

  1. AWS Lambda Functions: Serverless compute for agent execution
  2. Crew AI Agents: Multi-agent system for task orchestration
  3. Amazon S3: Document storage and processing pipeline
  4. DynamoDB: State management and results storage
  5. API Gateway: RESTful API exposure
  6. MCP Integration: Tool interaction and data persistence

Crew AI Agent Implementation

The core of our system is built around Crew AI agents that handle text extraction from documents. The configuration-driven approach allows for flexible agent behavior without code changes.

Agent Configuration Structure

The agent and task definitions are externalized in a JSON configuration file (uc_templates/react/crewai_config.json), enabling easy customization and deployment across different environments:

{
  "agents": {
    "text_extractor": {
      "role": "Document Summarization and Information Extraction Assistant",
      "goal": "Analyze document and extract text from the document. The document consists of several image files at paths {document}. Make sure you use the tools provided to extract text from each of the images.",
      "backstory": "You are skilled at reading and understanding structured and unstructured text from images. You excel at identifying key elements and producing clear, concise, and structured outputs that are easy to parse and use in downstream applications.",
      "llm": "bedrock/eu.anthropic.claude-3-haiku-20240307-v1:0"
    }
  },
  "tasks": {
    "text_extraction_task": {
      "description": "Extract essential structured information from the provided document. The document consists of several image files at paths {document} Use the Vision Tool for each of the image files. Ensure that the extracted text is accurate and complete, and ready for any further analysis or processing tasks. Focus on identifying:\n  1. Main headings and subheadings\n  2. Bullet points or listed items\n  3. A concise summary of the content\nOrganize the findings in a clean, hierarchical structure.",
      "expected_output": "A structured JSON file with the following keys:\n  - headings: [list of main headings]\n  - subheadings: [list of subheadings grouped under each heading]\n  - bullet_points: [list of extracted bullet points]\n  - summary: [concise text summary]",
      "agent": "text_extractor"
    }
  }
}
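Note the {document} placeholder in the goal and task description: Crew AI interpolates the inputs passed to crew.kickoff() into these templates before the agent runs. Conceptually, the substitution works like Python string formatting (a simplified stdlib sketch, not Crew AI's actual implementation):

```python
def render_template(template: str, inputs: dict) -> str:
    """Substitute {placeholder} markers the way crew.kickoff(inputs=...)
    fills agent goals and task descriptions (simplified sketch)."""
    return template.format(**inputs)

goal = render_template(
    "Analyze document and extract text from the document. "
    "The document consists of several image files at paths {document}.",
    {"document": "/tmp/page_1.png, /tmp/page_2.png"},
)
```

This is why the configuration can stay fully external: the only runtime coupling between code and config is the set of placeholder names.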

Agent Implementation

The ExtractionCrew class dynamically loads this configuration and creates Crew AI agents:

class ExtractionCrew:
    """Extraction crew for comprehensive text extraction"""

    def __init__(self) -> None:
        self.agents: list[BaseAgent] = []
        self.tasks: list[Task] = []
        config_json = load_prompt()  # Loads crewai_config.json
        self.agents_config = config_json["agents"]
        self.tasks_config = config_json["tasks"]

        llm = LLM(self.agents_config["text_extractor"]["llm"])
        self.text_extractor = Agent(
            role=self.agents_config["text_extractor"]["role"],
            goal=self.agents_config["text_extractor"]["goal"],
            backstory=self.agents_config["text_extractor"]["backstory"],
            llm=llm,
            verbose=True,
            max_iter=2,
            max_retry_limit=3,
            multimodal=True,
            tools=[VisionTool(llm=llm)],
        )

        self.extraction_task = Task(
            description=self.tasks_config["text_extraction_task"]["description"],
            expected_output=self.tasks_config["text_extraction_task"]["expected_output"],
            agent=self.text_extractor,
        )

        self.crew = Crew(
            agents=[self.text_extractor],
            tasks=[self.extraction_task],
            process=Process.sequential,
            verbose=True,
        )

Key Configuration Features

  1. Role-Based Agents: Each agent has a clearly defined role, goal, and backstory that shapes its behavior
  2. Structured Output: Tasks specify expected JSON output format for consistent data extraction
  3. Model Flexibility: LLM configuration allows switching between different foundation models
  4. Vision Capabilities: Integration with VisionTool for multimodal document processing

Configuration-Driven Agent Design

One of the key architectural decisions in this implementation is the use of external JSON configuration for agent definitions. This approach provides several advantages:

Benefits of Configuration-Driven Approach

  1. Environment Flexibility: Different configurations for development, staging, and production
  2. A/B Testing: Easy switching between different agent behaviors
  3. Non-Technical Updates: Business users can modify agent behavior without code changes
  4. Version Control: Agent configurations can be versioned independently from code
  5. Multi-Tenancy: Different configurations for different customers or use cases

Configuration Structure Deep Dive

The crewai_config.json defines two main sections:

Agent Configuration

  • Role: Defines the agent's primary function and identity
  • Goal: Specific objective the agent should achieve
  • Backstory: Context that influences the agent's behavior and decision-making
  • LLM: Specifies which foundation model to use (Claude Haiku in this case)

Task Configuration

  • Description: Detailed instructions for the task execution
  • Expected Output: Structured format specification (JSON schema)
  • Agent Assignment: Links tasks to specific agents

Dynamic Loading Implementation

The configuration is loaded dynamically at runtime:

import json

def load_prompt() -> dict:
    """Load agent and task configurations from JSON file"""
    config_path = "uc_templates/react/crewai_config.json"
    with open(config_path, "r", encoding="utf-8") as f:
        return json.load(f)

# Usage in ExtractionCrew
config_json = load_prompt()
self.agents_config = config_json["agents"]
self.tasks_config = config_json["tasks"]

This pattern allows for:

  • Hot Configuration Reloading: Update agent behavior without redeploying Lambda functions
  • Template-Based Deployments: Use the same codebase with different configurations
  • Simplified Testing: Easy mocking and testing with different agent configurations
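A variant of load_prompt can resolve the config path from an environment variable, which is how the same codebase ships different agent definitions to dev, staging, and production (a sketch; the AGENT_CONFIG_PATH variable name is our assumption, not from the project):

```python
import json
import os

def load_prompt(default_path: str = "uc_templates/react/crewai_config.json") -> dict:
    """Resolve the config path from the environment so dev/staging/prod
    can ship different agent definitions with the same code."""
    config_path = os.environ.get("AGENT_CONFIG_PATH", default_path)
    with open(config_path, encoding="utf-8") as f:
        return json.load(f)
```

Setting AGENT_CONFIG_PATH on the Lambda function (or pointing it at a file synced from S3) changes agent behavior without a code deploy.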

Lambda Function Integration

The Lambda function serves as the orchestration layer, managing the entire workflow from document ingestion to result delivery:

def run_agent(state: AgentMessageRequest) -> Message:
    """
    Executes the agent workflow for text extraction with Crew AI.
    """
    try:
        logger.info("Message State", state.model_dump())
        bucket = state.metadata.get("bucket")
        s3_key = state.metadata.get("s3_object_key")

        if not bucket or not s3_key:
            raise ValueError("Missing bucket or s3_object_key in metadata")

        message = Message(
            role="agent",
            content=state.message,
            create_timestamp=_iso_now_utc(),
            metadata=state.metadata or {},
        )

        result = process_pdf_with_text(message)
        return result
    except Exception as e:
        logger.error(f"Unexpected error occurred: {e}")
        raise RuntimeError("Internal server error") from e
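For completeness, the SQS-triggered entry point has to unwrap records before anything reaches run_agent. The sketch below stubs out the processing function so the unwrapping logic stands alone; a real handler would validate each payload into an AgentMessageRequest first:

```python
import json

def lambda_handler(event, context, process=None):
    """Unwrap SQS records (or a direct invocation payload) and hand each
    message body to the processing function."""
    process = process or (lambda payload: payload)  # stub for illustration
    results = []
    # SQS batches arrive under "Records"; direct invokes are the payload itself
    for record in event.get("Records", [event]):
        body = record.get("body", record)
        payload = json.loads(body) if isinstance(body, str) else body
        results.append(process(payload))
    return {"statusCode": 200, "processed": len(results)}
```

Handling both shapes in one handler keeps the function testable locally and reusable behind API Gateway and SQS alike.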

Document Processing Pipeline

The system processes documents through a sophisticated pipeline that converts PDFs to images and extracts structured content:

def process_pdf_with_text(message: Message) -> Message:
    """
    Processes a PDF file from S3, converts it to images, and runs text
    extraction using Crew AI.
    """
    logger.info("Processing PDF -> Images -> Text")

    # Retrieve document from S3
    response = s3_client.get_object(
        Bucket=message.metadata.get("bucket"),
        Key=message.metadata.get("s3_object_key"),
    )["Body"].read()

    # Store in temporary location
    temp_key = f"processed/{message.metadata.get('thread_id')}/{uuid.uuid4().hex}.pdf"
    s3_client.put_object(
        Bucket=message.metadata.get("bucket"), 
        Key=temp_key, 
        Body=response
    )

    # Convert to images and process
    images = convert_pdf_to_images(response)
    inputs = {"document": images}
    result = ExtractionCrew().crew.kickoff(inputs=inputs)

    # Parse and structure the output
    try:
        output_dict = json.loads(result.raw)
    except json.JSONDecodeError:
        match = re.search(r"\{.*\}", result.raw, re.DOTALL)
        output_dict = json.loads(match.group(0)) if match else {}

    message.structural_content = output_dict
    return message
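The raw LLM output is not guaranteed to be valid JSON, which is what the inline try/except above guards against. That fallback can be factored into a small helper so the happy path and the "JSON embedded in prose" case live in one place (a sketch; the function name is ours):

```python
import json
import re

def parse_agent_json(raw: str) -> dict:
    """Parse agent output that should be JSON but may be wrapped in
    surrounding prose or markdown; returns {} when nothing parses."""
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        # Fall back to the outermost {...} span in the text
        match = re.search(r"\{.*\}", raw, re.DOTALL)
        if match:
            try:
                return json.loads(match.group(0))
            except json.JSONDecodeError:
                return {}
        return {}
```

Returning an empty dict rather than raising keeps one malformed page from failing the whole document, though you may prefer to surface the error depending on the use case.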

MCP Tool Integration

The system integrates with Model Context Protocol (MCP) for persistent storage and tool interactions:

def call_mcp_tool(message: Message) -> Message:
    """
    Calls the MCP tool to store the structural content of a message
    in the database.
    """
    logger.info("Calling MCP Tool")
    message.structural_content["id"] = uuid.uuid4().hex
    payload = {
        "table_name": "text-extractor-response",
        "item": message.structural_content,
    }

    send_mcp_request(
        "tools/call",
        {"name": "createItem", "arguments": payload},
        session_id=message.metadata.get("mcp_session"),
    )
    return message

AWS Infrastructure with CDK

The infrastructure is defined using AWS CDK, providing infrastructure as code capabilities:

class AgentStack(Cdk_utils, Stack):
    """
    AgentStack provisions all resources required for the Agent service,
    including Lambda functions, DynamoDB tables, SQS queues, IAM roles,
    S3 buckets, API Gateway, and ALB integration.
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Create Lambda function for agent orchestration
        self.agent_lambda = _lambda.Function(
            self,
            "AgentFunction",
            runtime=_lambda.Runtime.PYTHON_3_11,
            handler="lambda_function.lambda_handler",
            code=_lambda.Code.from_asset("backend/agent_services/agent_wrapper"),
            timeout=cdk.Duration.minutes(15),
            memory_size=3008,
        )

        # Create DynamoDB tables for state management
        self.history_table = dynamodb.Table(
            self,
            "HistoryTable",
            partition_key=dynamodb.Attribute(
                name="thread_id", 
                type=dynamodb.AttributeType.STRING
            ),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
        )

Scalability Patterns

1. Event-Driven Architecture

The system uses SQS queues and Lambda event sources for asynchronous processing:

# SQS Queue for agent task queuing
queue = sqs.Queue(
    self,
    "AgentTaskQueue",
    visibility_timeout=cdk.Duration.minutes(15),
    retention_period=cdk.Duration.days(14),
)

# Lambda trigger from SQS
self.agent_lambda.add_event_source(
    lambda_event_sources.SqsEventSource(queue)
)
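Pairing the task queue with a dead-letter queue keeps poison messages from being retried forever. A CDK sketch extending the queue definition above (a fragment that belongs inside AgentStack.__init__; the max_receive_count of 3 is our choice, not from the project):

```python
# Inside AgentStack.__init__ (continues the SQS snippet above)
dlq = sqs.Queue(
    self,
    "AgentTaskDLQ",
    retention_period=cdk.Duration.days(14),
)

queue = sqs.Queue(
    self,
    "AgentTaskQueue",
    visibility_timeout=cdk.Duration.minutes(15),
    retention_period=cdk.Duration.days(14),
    # After 3 failed receives, messages move to the DLQ for inspection
    dead_letter_queue=sqs.DeadLetterQueue(max_receive_count=3, queue=dlq),
)
```

A CloudWatch alarm on the DLQ depth then gives you a single signal for "agent tasks are failing repeatedly".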

2. State Management

Using DynamoDB for distributed state management ensures consistency across agent executions:

class Message(BaseModel):
    """Standard message structure used across all API endpoints."""

    step_id: str | None = Field(None, description="Unique step identifier")
    role: str = Field(..., description="Message role: 'user' or 'agent'")
    content: str = Field(..., description="Message content")
    structural_content: dict[str, Any] | None = Field(
        None,
        description="Structured message if structured_output.json was provided",
    )
    create_timestamp: str = Field(..., description="Message creation timestamp")
    metadata: dict[str, Any] = Field(
        default_factory=dict, description="Additional metadata"
    )
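Persisting a Message into the history table is then a matter of mapping it onto the table's key schema. A sketch (plain dicts stand in for the pydantic model here, and the attribute layout beyond the thread_id partition key is our assumption):

```python
import uuid

def to_history_item(message: dict, thread_id: str) -> dict:
    """Shape a message for the DynamoDB history table, whose partition
    key is thread_id; step_id doubles as a unique per-message value."""
    return {
        "thread_id": thread_id,
        "step_id": message.get("step_id") or uuid.uuid4().hex,
        "role": message["role"],
        "content": message["content"],
        "create_timestamp": message["create_timestamp"],
        "metadata": message.get("metadata", {}),
    }
```

The resulting dict can be passed straight to a boto3 Table.put_item call, so every agent step lands in the same conversation history regardless of which Lambda instance handled it.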

3. Resource Optimization

Lambda concurrency limits and memory allocation are tuned for optimal performance:

# Configure Lambda with appropriate memory and timeout
self.agent_lambda = _lambda.Function(
    self,
    "AgentFunction",
    memory_size=3008,  # High memory for ML workloads
    timeout=cdk.Duration.minutes(15),  # Extended timeout for complex processing
    reserved_concurrent_executions=10,  # Control concurrency
)

Benefits of Serverless Agent Orchestration

1. Cost Efficiency

  • Pay-per-execution model
  • No idle resource costs
  • Automatic scaling based on demand

2. Scalability

  • Automatic horizontal scaling
  • Handle thousands of concurrent requests
  • No infrastructure management overhead

3. Reliability

  • Built-in retry mechanisms
  • Dead letter queues for error handling
  • Multi-AZ availability

4. Developer Productivity

  • Infrastructure as code with CDK
  • Simplified deployment pipeline
  • Focus on business logic, not infrastructure

Best Practices

1. Error Handling and Resilience

def run_agent(state: AgentMessageRequest) -> Message:
    try:
        # Agent execution logic
        return process_pdf_with_text(message)
    except KeyError as ke:
        logger.error(f"Input validation error: {ke}")
        raise ValueError("Missing required field in request") from ke
    except json.JSONDecodeError as je:
        logger.error(f"JSON parsing error: {je}")
        raise ValueError("JSON parsing error") from je
    except Exception as e:
        logger.error(f"Unexpected error occurred: {e}")
        raise RuntimeError("Internal server error") from e

2. Configuration Management

Use external configuration files and environment variables for flexible deployments:

# Configuration-driven agent setup
config_json = load_prompt()  # Loads crewai_config.json
self.agents_config = config_json["agents"]

# Environment-specific model configuration
import os

from env_dev_file import load_ssm_parameters_into_env

load_ssm_parameters_into_env()
model_id = os.environ["MODEL_SONNET"]
llm = LLM(model="bedrock/" + model_id)

Configuration Best Practices:

  • Store sensitive configuration in AWS SSM Parameter Store
  • Use different configuration files per environment
  • Validate configuration schemas at startup
  • Version control configuration changes
  • Implement configuration rollback capabilities
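"Validate configuration schemas at startup" can be as simple as a fail-fast check when the Lambda container initializes (a minimal sketch; production code might use pydantic or jsonschema instead):

```python
def validate_config(config: dict) -> dict:
    """Fail fast if crewai_config.json has the wrong shape, so a bad
    deploy surfaces at cold start rather than mid-task."""
    for section in ("agents", "tasks"):
        if section not in config:
            raise ValueError(f"Missing '{section}' section in agent config")
    for name, agent in config["agents"].items():
        missing = {"role", "goal", "backstory", "llm"} - agent.keys()
        if missing:
            raise ValueError(f"Agent '{name}' missing keys: {sorted(missing)}")
    return config
```

Running this immediately after load_prompt turns a subtle runtime failure into a clear deploy-time error message.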

3. Structured Output Validation

Define clear output schemas in your configuration:

{
  "expected_output": "A structured JSON file with the following keys:\n  - headings: [list of main headings]\n  - subheadings: [list of subheadings grouped under each heading]\n  - bullet_points: [list of extracted bullet points]\n  - summary: [concise text summary]"
}

4. Monitoring and Observability

Implement comprehensive logging and monitoring:

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Log key events
logger.info("Processing PDF -> Images -> Text")
logger.info("Calling MCP Tool")
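Plain-text log lines work, but CloudWatch Logs Insights queries get much easier when each line is a JSON object. A stdlib sketch (libraries such as aws-lambda-powertools provide a production-grade version of this):

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Emit one JSON object per log line for CloudWatch Logs Insights."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "level": record.levelname,
            "message": record.getMessage(),
            "logger": record.name,
        })

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("agent")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
```

With this in place, a query like `filter level = "ERROR"` isolates agent failures without brittle string matching.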

Real-World Use Cases

This serverless agent orchestration pattern is particularly effective for:

  1. Document Processing: Extract structured data from unstructured documents
  2. Content Analysis: Analyze and categorize large volumes of content
  3. Multi-modal Processing: Handle text, images, and other media types
  4. Real-time Workflows: Process user requests with low latency
  5. Batch Processing: Handle large-scale document processing jobs

Conclusion

Serverless agent orchestration with AWS Lambda and Crew AI provides a powerful foundation for building scalable GenAI applications. This architecture pattern offers the benefits of serverless computing while maintaining the flexibility and capability of multi-agent systems.

The combination of AWS Lambda's auto-scaling capabilities, Crew AI's agent orchestration, and proper infrastructure as code practices creates a robust platform that can handle enterprise-scale workloads while maintaining cost efficiency and operational simplicity.

As organizations continue to adopt GenAI solutions, this serverless approach provides a proven path to deploy intelligent agents at scale, ensuring both performance and cost-effectiveness in production environments.

Key Takeaways

  • Serverless-first approach reduces operational overhead and costs
  • Crew AI agents provide sophisticated task orchestration capabilities
  • AWS Lambda patterns enable automatic scaling and high availability
  • Infrastructure as Code with CDK ensures reproducible deployments
  • Event-driven architecture supports asynchronous, resilient processing
  • MCP integration enables tool extensibility and data persistence

This architecture pattern represents the future of scalable AI agent deployment, combining the best of serverless computing with advanced agent orchestration capabilities.

About the Author

Written by Suraj Khaitan
— Gen AI Architect | Working on serverless AI & cloud platforms.
