Suraj Khaitan
Building Production-Ready AI Agents with MCP: The Enterprise Blueprint Nobody Talks About

A deep technical guide to multi-agent orchestration, knowledge retrieval via Model Context Protocol, hallucination control, and serverless deployment — patterns extracted from real production systems.

The Gap Between Demo and Production

You've seen the demos. A shiny chatbot that answers questions about PDFs, retrieves knowledge from a vector store, and produces fluent responses. It works in the notebook. It impresses in the meeting room. Then you try to ship it.

Six weeks later, the agent hallucinates on a customer query. The vector search retrieves semantically irrelevant chunks. DynamoDB checkpointing breaks under concurrent load. The Lambda cold starts introduce 8-second latency spikes. The LLM picks the wrong knowledge base and confidently answers from the wrong domain.

This is the reality of production GenAI systems. And almost nobody writes honestly about what it actually takes to build them correctly.

This article documents the patterns, decisions, and hard lessons from building a multi-agent knowledge retrieval system for an enterprise use case: multiple specialized knowledge bases, a validation pipeline, a transformation agent, and a stateful chatbot — all wired together through MCP (Model Context Protocol) on a serverless cloud stack.

We'll go from fundamentals to full deployment architecture, with code you can actually use.


Why Most AI Agents Fail in Production

Before we build, let's diagnose. The failures are almost always the same five categories:

1. Retrieval is naïve

Most prototypes use a single vector store with cosine similarity. In enterprise settings, your knowledge is segmented. Safety documentation has different structure and retrieval semantics than software manuals. When you throw everything into one index, precision tanks. The agent retrieves documents that sound relevant but answer the wrong question.

2. The agent has no memory architecture

Session state lives in a dict that gets destroyed between requests. Thread IDs aren't propagated. Conversation history is either unlimited (context window overflow) or absent (agent forgets what it just said).

3. Tool contracts are loose

The LLM calls tools with missing, wrong, or hallucinated arguments. No validation. No schema enforcement. The tool silently returns nothing; the LLM fabricates a response.

4. Multi-agent coordination is an afterthought

One agent processes user queries. A second agent validates documents. A third transforms raw uploads. These agents are deployed independently with no shared message schema, no retry contract, and no shared observability. When one fails, you find out from the user.

5. Deployment is a science project

Lambda packages bloat beyond 50MB. Layers conflict. Cold starts kill latency SLAs. Dependencies are loaded on every invocation instead of being cached at the container level.

Each of these is solvable. But you need a system, not a stack of LangChain tutorials.


What MCP Solves

Model Context Protocol (MCP) is a JSON-RPC-based communication protocol for connecting AI agents to external tools, data sources, and services. Think of it as a standardized API contract between your LLM and the world outside it.

Where most RAG implementations hardcode retrieval calls directly into the agent logic, MCP externalizes them into discrete, versioned, discoverable services. Your agent becomes a client. Your retriever becomes a server. The contract is typed.

{
  "jsonrpc": "2.0",
  "id": "a1b2c3d4",
  "method": "tools/call",
  "params": {
    "name": "hybridQueryTool",
    "arguments": {
      "retriever_input": {
        "query": "What are the safety circuit requirements for servo drives?",
        "kb_id": "kb-regulations"
      }
    }
  }
}

This gives you four things that matter in production:

  • Decoupling: The retrieval implementation can change without touching the agent
  • Versioning: MCP endpoints are independently deployable
  • Observability: You can log, trace, and rate-limit at the protocol layer
  • Multi-tenancy: Multiple agents can share the same MCP server under different routing keys

Recommended Enterprise Architecture

Here is the full system architecture we'll implement:

┌──────────────────────────────────────────────────────────────┐
│                        API Gateway                           │
│                (JWT / AWS IAM Authentication)                │
└───────────────────────────┬──────────────────────────────────┘
                            │
              ┌─────────────▼──────────────┐
              │        API Lambda          │
              │  (routing, auth, presigned │
              │   URLs, async S3 reads)    │
              └──────┬──────────┬──────────┘
                     │          │
          ┌──────────▼─┐    ┌───▼──────────────────┐
          │  Chatbot   │    │  Upload + Transform  │
          │  Agent     │    │  Pipeline (SQS-      │
          │  Lambda    │    │  triggered)          │
          └──────┬─────┘    └──────────┬───────────┘
                 │                     │
          ┌──────▼─────┐        ┌──────▼───────────┐
          │ LangGraph  │        │ Transformation   │
          │ Workflow   │        │ Agent Lambda     │
          │            │        │ (parse → S3)     │
          └──────┬─────┘        └──────────────────┘
                 │                     │ (incidents)
          ┌──────▼─────┐        ┌──────▼──────────┐
          │  MCP Layer │        │  Checker Agent  │
          │            │        │  Lambda (SQS-   │
          │  ┌────────┐│        │  triggered)     │
          │  │ KB-1   ││        └──────┬──────────┘
          │  │ KB-2   ││               │
          │  │ KB-3   ││        ┌──────▼──────────┐
          │  │ ...    ││        │   MCP Layer     │
          │  └────────┘│        │ (domain KB)     │
          └────────────┘        └─────────────────┘
                 │
         ┌───────▼────────┐
         │   DynamoDB     │
         │ (Checkpointing │
         │  / History)    │
         └────────────────┘

Let's build each layer.


The LangGraph Agent Core

LangGraph is the right choice for production agents. It gives you explicit state management, conditional routing, and composable graphs. Here's the complete core pattern.

Data Models First

Type safety is non-negotiable. Define your contract before you write any logic:

from pydantic import BaseModel, Field
from typing import Any

class AgentMessageRequest(BaseModel):
    message: str = Field(..., description="User message")
    sessionId: str | None = Field(None, description="Optional session ID")
    metadata: dict[str, Any] = Field(default_factory=dict)


class Message(BaseModel):
    step_id: str | None = Field(None)
    role: str = Field(...)           # "user" | "agent"
    content: str = Field(...)
    structural_content: dict[str, Any] | None = Field(None)
    create_timestamp: str = Field(...)
    metadata: dict[str, Any] = Field(default_factory=dict)

Strong typing catches argument mismatches at the boundary, not deep inside graph execution.

The Tool Definition

This is where MCP integration lives. The @tool decorator makes this function visible to the LLM as a callable:

from langchain.tools import tool

VALID_DOMAINS = {
    "kb-documents",
    "kb-specifications",
    "kb-regulations",
}

@tool
def query_knowledge_base(query: str, domain: str | None = None) -> str:
    """
    Query a specialized knowledge base for domain-specific information.

    Select the most appropriate domain based on the user's question:
    - 'kb-documents': Product manuals, technical guides, API references
    - 'kb-specifications': Hardware and software configuration standards
    - 'kb-regulations': Compliance requirements, safety standards, audit rules

    Args:
        query: Rich contextual search query. More context = better results.
        domain: Target knowledge domain. Required for precision retrieval.

    Returns:
        Formatted knowledge base chunks as a single string.

    Note:
        Query is vectorized for cosine similarity + keyword hybrid search.
    """
    if not isinstance(query, str) or not query.strip():
        return "Invalid query. Please provide a non-empty string."

    if domain and domain not in VALID_DOMAINS:
        return f"Invalid domain '{domain}'. Choose from: {', '.join(VALID_DOMAINS)}"

    try:
        global _LAST_KB_CONTEXT

        kb_context = fetch_from_mcp(query, domain=domain)

        if kb_context:
            _LAST_KB_CONTEXT = kb_context  # store for metadata extraction post-graph

        if kb_context.get("status") == "success" and kb_context.get("context"):
            formatted = "\n---\n".join(kb_context["context"])
            return f"Knowledge Base Results:\n\n{formatted}"

        return "No relevant information found in knowledge base."

    except Exception as e:
        logger.error(f"Tool execution failed: {e}")
        return f"Knowledge base query failed: {str(e)}"

Critical pattern: _LAST_KB_CONTEXT is a module-level global that captures references (file URLs, page numbers) returned by the MCP retriever. These can't travel through the LangGraph message channel cleanly — they're metadata, not conversation content. After the graph completes, you extract them from this global. This works because Lambda containers are single-threaded per invocation.

The Graph Structure

from langgraph.graph import StateGraph, START, END, MessagesState
from langgraph_dynamodb_checkpoint import DynamoDBSaver

def build_graph(checkpointer: DynamoDBSaver | None = None):
    graph = StateGraph(state_schema=MessagesState)

    graph.add_node("llm_node", llm_node)
    graph.add_node("tool_node", tool_node)

    graph.add_edge(START, "llm_node")
    graph.add_conditional_edges(
        "llm_node",
        should_continue,       # router function
        ["tool_node", END]
    )
    graph.add_edge("tool_node", "llm_node")  # tool result → back to LLM

    if checkpointer:
        return graph.compile(checkpointer=checkpointer)
    return graph.compile()

The graph is a ReAct loop: LLM reasons → decides whether to call a tool → tool executes → result fed back to LLM → LLM reasons again. This continues until the LLM determines it can answer without calling another tool.

The Router

def should_continue(state: MessagesState) -> str:
    last_message = state["messages"][-1]

    if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
        return "tool_node"

    return END

Simple but critical. If the LLM emits tool calls, route to tool execution. Otherwise, the response is complete.

The LLM Node

from langchain_core.messages import SystemMessage, ToolMessage
from langchain_aws import ChatBedrock
import boto3, os

def llm_node(state: MessagesState):
    global _AGENT_SUMMARY
    llm_with_tools = get_llm_with_tools()
    response = llm_with_tools.invoke(
        [SystemMessage(content=get_prompt(agent_summary=_AGENT_SUMMARY or ""))]
        + state["messages"]
    )
    return {"messages": [response]}


def tool_node(state: dict):
    tools_by_name = {tool.name: tool for tool in [query_knowledge_base]}
    result = []

    for tool_call in state["messages"][-1].tool_calls:
        tool_name = tool_call["name"]

        if tool_name not in tools_by_name:
            result.append(ToolMessage(
                content=f"Error: Unknown tool '{tool_name}'",
                tool_call_id=tool_call["id"]
            ))
            continue

        observation = tools_by_name[tool_name].invoke(tool_call["args"])
        result.append(ToolMessage(content=observation, tool_call_id=tool_call["id"]))

    return {"messages": result}

Conversation Checkpointing

Stateless Lambdas need external state. DynamoDB gives you persistent conversation memory:

import time, os
from langgraph_dynamodb_checkpoint import DynamoDBSaver

def get_checkpoint_table() -> DynamoDBSaver | None:
    table_name = os.environ.get("MEMORY_TABLE")
    if not table_name:
        logger.warning("MEMORY_TABLE not set; running stateless")
        return None

    return DynamoDBSaver(
        table_name=table_name,
        max_read_request_units=10,
        max_write_request_units=10,
        ttl_seconds=int(time.time()) + 28 * 86400,  # 28-day TTL
    )

Thread IDs tie conversation turns together. On each request, the graph replays from the last checkpoint, not from scratch:

thread_id = message.metadata.get("thread_id", uuid.uuid4().hex)
config = {"configurable": {"thread_id": thread_id}}

result = graph.invoke(
    {"messages": [HumanMessage(content=message.content)]},
    config
)

Production note: The 28-day TTL prevents unbounded storage growth. Conversations older than 28 days are automatically purged by DynamoDB TTL. Set this to match your retention policy.


Multi-Agent Orchestration Patterns

The chatbot is one of three agents in this system. Here's how multi-agent orchestration actually works in production serverless architectures:

Agent 1: Chatbot Agent
  → Handles real-time user Q&A
  → LangGraph ReAct loop
  → Synchronous API response

Agent 2: Transformation Agent
  → SQS-triggered (file upload events)
  → Parses structured documents → normalized JSON
  → Routes based on document type metadata

Agent 3: Checker / Validation Agent
  → SQS-triggered (per incident)
  → Consults domain knowledge base
  → Appends recommended_action to S3 results

Asynchronous Agent Pipelines via SQS

The transformation agent fires when a user uploads files. SQS decouples the upload from the processing:

def lambda_handler(event, context):
    jobs = {}

    for sqs_record in event['Records']:
        s3_event = json.loads(sqs_record['body'])

        for s3_record in s3_event['Records']:
            bucket = s3_record['s3']['bucket']['name']
            s3_key = unquote_plus(s3_record['s3']['object']['key'])

            # Extract job context from S3 key structure:
            # jobs/{user_id}/{project_name}/{job_id}/docs/(unknown)
            parts = s3_key.split('/')
            user_id = parts[1]
            project_name = parts[2]
            job_id = parts[3]

            if job_id not in jobs:
                jobs[job_id] = {
                    'bucket': bucket,
                    'user_id': user_id,
                    'project_name': project_name,
                }

    # Idempotent: resolve job files from S3, not from the event payload
    for job_id, job_data in jobs.items():
        all_files = list_job_files(
            job_data['bucket'], job_data['user_id'],
            job_data['project_name'], job_id
        )
        process_job(
            job_data['bucket'], job_data['user_id'],
            job_data['project_name'], job_id, all_files
        )

Critical pattern: The trigger file is just a signal. Always list all files from S3 when processing. This makes the pipeline idempotent — reprocessing a job picks up all files regardless of upload order.
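The handler relies on a `list_job_files` helper that isn't shown. A minimal sketch under the key layout from the handler's comment — the client is passed in explicitly here to keep the sketch testable, and the returned dict shape is an assumption:

```python
import os
from typing import Any


def list_job_files(s3, bucket: str, user_id: str,
                   project_name: str, job_id: str) -> list[dict[str, Any]]:
    """List every object under the job's docs/ prefix.

    The paginator matters: list_objects_v2 truncates at 1000 keys, and a
    reprocessed job must see *all* of its files to stay idempotent.
    """
    prefix = f"jobs/{user_id}/{project_name}/{job_id}/docs/"
    files: list[dict[str, Any]] = []
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            files.append({
                "key": obj["Key"],
                "filename": os.path.basename(obj["Key"]),
                "size": obj["Size"],
            })
    return files
```

Because the listing is re-derived from S3 on every trigger, a duplicate SQS delivery or an out-of-order upload simply re-lists the same complete file set.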

Document-Type Routing

def categorize_files(files: list[dict]) -> dict:
    categorized = {'report': None, 'model': None, 'rules': None}

    for file_info in files:
        filename = file_info['filename'].lower()

        if filename.endswith(('.xlsx', '.xls')):
            categorized['report'] = file_info     # structured data → incidents
        elif filename.endswith(('.plczip', '.robzip')):
            categorized['model'] = file_info      # binary model → JSON
        elif filename.endswith('.xml'):
            categorized['rules'] = file_info      # rule definitions → JSON

    return categorized

Each file type has a dedicated parser. The transformation agent orchestrates them in dependency order: report first (to extract metadata needed by subsequent parsers), then model, then rules.
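That dependency ordering can be made explicit in the orchestration step. This is a sketch with hypothetical parser stubs — only the ordering contract (report metadata feeds the later parsers) is the point:

```python
# Hypothetical parser stubs; the real parsers live in the transformation agent.
def parse_report(file_info):
    return {"metadata": {"source": file_info["filename"]}, "incidents": []}

def parse_model(file_info, report_meta):
    return {"filename": file_info["filename"], "report_meta": report_meta}

def parse_rules(file_info, report_meta):
    return {"filename": file_info["filename"], "report_meta": report_meta}


def run_parsers(categorized: dict) -> dict:
    """Execute parsers in dependency order: report first, then model, then rules."""
    outputs = {}
    if categorized.get("report"):
        outputs["report"] = parse_report(categorized["report"])
    # Later parsers receive whatever metadata the report parser extracted
    report_meta = outputs.get("report", {}).get("metadata", {})
    if categorized.get("model"):
        outputs["model"] = parse_model(categorized["model"], report_meta)
    if categorized.get("rules"):
        outputs["rules"] = parse_rules(categorized["rules"], report_meta)
    return outputs


outputs = run_parsers({
    "report": {"filename": "audit.xlsx"},
    "model": {"filename": "cell.plczip"},
    "rules": None,
})
```

Encoding the order in one function, rather than in event wiring, means a missing report degrades gracefully (the later parsers just see empty metadata) instead of failing in an unpredictable order.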

The Checker / Validation Agent

After transformation, individual incidents (one per detected issue) are queued via SQS. The checker agent processes them individually, consulting the domain knowledge base:

def lambda_handler(event, context):
    for record in event.get("Records", []):
        body = json.loads(record["body"])
        s3_record = body["Records"][0]

        bucket_name = s3_record["s3"]["bucket"]["name"]
        object_key = urllib.parse.unquote_plus(s3_record["s3"]["object"]["key"])
        base_path = object_key.partition("incidents/")[0]

        # Read incident JSON from S3
        incident_message = s3.get_object(
            Bucket=bucket_name, Key=object_key
        )["Body"].read().decode("utf-8")

        # Run LangGraph agent: incident → knowledge base → recommendation
        graph = build_graph()
        result = graph.invoke({
            "messages": [HumanMessage(content=incident_message)]
        })

        # Append recommendation and write to results/
        output = json.loads(incident_message)
        output["recommended_action"] = result["messages"][-1].content
        push_to_s3(output, base_path)

Each agent is independently deployable, independently scalable, and independently observable. The shared contract is the S3 path structure and the JSON schema.


MCP Communication Layer

Here is the complete MCP client implementation — the most critical piece of production infrastructure in the entire system:

import httpx
import uuid
import os
import json
import logging
from typing import Any
from httpx import Headers

logger = logging.getLogger(__name__)

def send_mcp_request(
    method: str,
    params: Any = None,
    session_id: str | None = None,
    config_key: str | None = None,
) -> tuple[dict, Headers]:
    """
    Sends a JSON-RPC 2.0 request to the MCP server.
    Resolves the MCP endpoint URL from a secure configuration store.
    """
    headers = {
        "Content-Type": "application/json",
        "MCP-Version": "2024-01-01",
    }
    if session_id:
        headers["MCP-Session-Id"] = session_id

    body = {
        "jsonrpc": "2.0",
        "id": str(uuid.uuid4()),
        "method": method,
    }
    if params is not None:
        body["params"] = params

    if config_key:
        load_config_into_env(config_key)

    mcp_base_url = os.getenv("RETRIEVER_SERVICE_URL")
    if not mcp_base_url:
        raise ValueError("RETRIEVER_SERVICE_URL is not configured")

    mcp_url = mcp_base_url.rstrip("/") + "/mcp"

    try:
        response = httpx.post(mcp_url, json=body, headers=headers, timeout=30)
        response.raise_for_status()
        return response.json(), response.headers
    except httpx.HTTPStatusError as e:
        logger.error(f"MCP server error {e.response.status_code}: {e.response.text}")
        raise
    except httpx.RequestError as e:
        logger.error(f"MCP network error: {e}")
        raise

Dynamic KB Routing

The MCP call is parameterized at runtime. The domain identifier determines which retriever service receives the request:

def fetch_from_mcp(query: str, domain: str | None = None) -> dict[str, Any]:
    try:
        kb_config = json.loads(os.environ.get("KB_CONFIG", "{}"))
        first_config = (kb_config.get("configurations") or [{}])[0]
        agent_id = first_config.get("agent_id")
        kb_type = first_config.get("kb_type")
        kb_id = domain or os.environ.get("KB_ID", "default")

        # Dynamic endpoint resolution per knowledge base:
        # /config/{agent_id}/{kb_id}/{kb_type}/RETRIEVER_SERVICE_URL
        config_key = f"/config/{agent_id}/{kb_id}/{kb_type}/RETRIEVER_SERVICE_URL"

        payload = {
            "retriever_input": {
                "query": query,
                "kb_id": kb_id,
            }
        }

        tool_name = kb_config.get("defaults", {}).get("tool_name", "hybridQueryTool")

        resp_json, _ = send_mcp_request(
            method="tools/call",
            params={"name": tool_name, "arguments": payload},
            session_id=os.environ.get("MCP_SESSION_ID"),
            config_key=config_key,
        )

        return parse_kb_response(resp_json)

    except Exception as e:
        logger.error(f"MCP call failed: {e}")
        return {"status": "error", "context": [], "reference": [], "error": str(e)}

Why dynamic endpoint resolution? Each knowledge domain can be served by a different retriever instance — different hardware, different index type, different SLA. By resolving the endpoint from configuration at call-time, you can independently scale, migrate, and update individual knowledge bases without redeploying the agent.

Response Parsing

MCP responses are nested. Parse them defensively:

import ast

def parse_kb_response(resp_json: dict[str, Any]) -> dict[str, Any]:
    try:
        outer = ast.literal_eval(resp_json["result"]["content"][0]["text"])
        body = json.loads(outer["body"])

        context: list[str] = []
        reference: list[dict] = []

        for item in body:
            if item.get("text"):
                context.append(item["text"])
            reference.append({k: v for k, v in item.items() if k != "text"})

        return {
            "status": outer.get("status"),
            "context": context,       # text chunks for LLM consumption
            "reference": reference,   # metadata (file URLs, page numbers)
        }

    except (KeyError, ValueError, SyntaxError, json.JSONDecodeError) as e:
        logger.error(f"Failed to parse KB response: {e}")
        return {"status": "error", "context": [], "reference": [], "error": str(e)}

Separate context from reference. The LLM gets context. The UI gets reference metadata for citation display. Never mix them.


Retrieval + Knowledge Layer

Hybrid Search Configuration

Single-mode retrieval (pure vector or pure keyword) consistently underperforms on technical documentation. Production systems need hybrid:

{
  "knowledge_base": {
    "defaults": {
      "kb_type": "lancedb",
      "retriever_type": "hybrid",
      "tool_name": "hybridQueryTool"
    },
    "configurations": [
      { "kb_name": "kb-documents",     "kb_type": "lancedb" },
      { "kb_name": "kb-specifications", "kb_type": "lancedb" },
      { "kb_name": "kb-regulations",   "kb_type": "lancedb" }
    ],
    "infrastructure": {
      "embedding_model": "amazon.titan-embed-text-v2:0"
    }
  }
}

Why separate knowledge bases per domain?

  1. Precision: Documents have different embedding distributions from regulatory text. Domain-scoped indexes give higher precision at the same k.
  2. Access control: You can enforce per-KB authorization at the MCP layer.
  3. Independent updates: A regulations KB can be re-indexed without touching documents or specifications.
  4. Observability: Per-KB latency and error metrics tell you exactly which domain is degrading.

Context Window Management

Never pass raw retrieval chunks to the LLM. Format them with separators so the LLM can identify chunk boundaries:

if kb_context.get("status") == "success" and kb_context.get("context"):
    formatted = "\n---\n".join(kb_context["context"])
    return f"Knowledge Base Results:\n\n{formatted}"

The --- separator is cheap signal. The LLM treats each chunk as a discrete evidence unit rather than a continuous blob.


Validation & Hallucination Prevention

Hallucination in domain-specific agents isn't just wrong answers — it's wrong answers delivered with high confidence that look correct to non-experts.

Guard at the Prompt Layer

Your system prompt is the first line of defense:

<INSTRUCTIONS>
3. Information Retrieval
   - Use the retrieval tool only when domain-specific factual information is required.
   - If the knowledge base returns no results or an error, inform the user and advise
     contacting the support team.
   - Do not guess or invent information not found in the Knowledge Base.
</INSTRUCTIONS>

Explicit negative instructions outperform implicit expectations. Tell the model what it must NOT do, not just what it should do.

Guard at the Config Layer

Content filtering runs before and after the LLM:

{
  "guardrail": {
    "filters": {
      "HATE":          "MEDIUM",
      "SEXUAL":        "MEDIUM",
      "VIOLENCE":      "MEDIUM",
      "INSULTS":       "MEDIUM",
      "MISCONDUCT":    "MEDIUM",
      "PROMPT_ATTACK": "HIGH"
    }
  }
}

Set Prompt Attack to HIGH. Prompt injection is the most common real attack vector against document-grounded agents.

Guard at the Tool Layer

Validate tool arguments before executing any external call:

@tool
def query_knowledge_base(query: str, domain: str | None = None) -> str:
    if not isinstance(query, str) or not query.strip():
        return "Invalid query. Please provide a non-empty string."

    valid_domains = {"kb-documents", "kb-specifications", "kb-regulations"}
    if domain and domain not in valid_domains:
        return f"Invalid domain '{domain}'. Choose from: {', '.join(valid_domains)}"

    # Only reach external systems after validation passes
    ...

Return descriptive error strings rather than raising exceptions. The LLM can reason about a string error message and self-correct. An unhandled exception terminates tool execution with no recovery path.

Conversation Scope Enforcement

Prevent domain drift through prompt rules:

<KB_RULES>
- Each conversation uses exactly one Knowledge Base.
- The Knowledge Base is selected only at conversation start.
- Switching Knowledge Bases within a conversation is not allowed.
- The selected Knowledge Base is stored in conversation history.
</KB_RULES>

This seems restrictive but it's correct for expert systems. A user working in kb-regulations doesn't want their session drifting into kb-specifications mid-conversation. Scope enforcement is a feature, not a limitation.


LLM Client Caching

Lambda containers are reused across invocations. Cache expensive initialization at the module level:

_LLM_WITH_TOOLS_CACHE = None
_AGENT_SUMMARY = None
_LAST_KB_CONTEXT = None

def get_llm_with_tools() -> Any:
    global _LLM_WITH_TOOLS_CACHE

    if _LLM_WITH_TOOLS_CACHE is not None:
        logger.info("Using cached LLM client")
        return _LLM_WITH_TOOLS_CACHE

    llm = ChatBedrock(
        model_id=os.getenv("MODEL_ID"),
        client=boto3.client("bedrock-runtime", region_name=os.getenv("AWS_REGION")),
    )
    _LLM_WITH_TOOLS_CACHE = llm.bind_tools([query_knowledge_base])
    return _LLM_WITH_TOOLS_CACHE

And critically, reset request-scoped state at the start of every invocation:

def process_message(message: Message) -> Message:
    global _LAST_KB_CONTEXT
    _LAST_KB_CONTEXT = None  # Reset — avoid stale data from previous warm invocation
    ...

This is a subtle but critical bug if missed. Without the reset, the first request on a warm container sets _LAST_KB_CONTEXT. The second request inherits that stale context if the retrieval tool isn't called — returning citations from the previous user's query. This is both a correctness bug and a potential data exposure issue.


Observability & Monitoring

Structured Logging at Every Layer

logger.info(f"Thread: {thread_id} | Domain: {domain} | Query length: {len(query)}")
logger.info(f"MCP response: status={output.get('status')}, chunks={len(output.get('context', []))}")
logger.info(f"Routing decision: {routing_decision} | Tool calls detected: {bool(tool_calls)}")
logger.info(f"Response length: {len(result.content)} chars")

Log the routing decision, not just the outcome. When debugging a wrong answer, knowing which tool was called (or wasn't) is more valuable than the final response text.

Key Metrics to Track

Metric                            Why It Matters
KB retrieval latency per domain   Identifies degraded retrieval services
Tool call rate per session        High = LLM confused; zero = retrieval bypassed
Context chunks per query          Low count = poor retrieval quality
Graph iterations per request      High count = possible ReAct loop
Checkpoint read/write failures    Silent data loss in conversation history
Cold start frequency              Proxy for concurrent load spikes
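One low-overhead way to get the per-domain metrics above out of a Lambda is CloudWatch Embedded Metric Format: a structured log line that CloudWatch Logs converts into metrics, with no extra API calls on the hot path. The namespace and dimension names below are illustrative:

```python
import json
import time


def emit_metric(name: str, value: float, unit: str, domain: str) -> str:
    """Print one EMF record to stdout; in Lambda, CloudWatch Logs turns it
    into a metric with a per-domain dimension. Returns the line for testing."""
    record = {
        "_aws": {
            "Timestamp": int(time.time() * 1000),
            "CloudWatchMetrics": [{
                "Namespace": "AgentPlatform",   # illustrative namespace
                "Dimensions": [["Domain"]],
                "Metrics": [{"Name": name, "Unit": unit}],
            }],
        },
        "Domain": domain,
        name: value,
    }
    line = json.dumps(record)
    print(line)
    return line


emit_metric("KbRetrievalLatencyMs", 142.0, "Milliseconds", "kb-regulations")
```

Because the dimension is the KB domain, a degraded retriever shows up as a latency spike on exactly one dimension value rather than as noise in an aggregate.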

Async Result Aggregation

When users poll for processing results, don't serialize S3 reads:

import asyncio
import aioboto3

MAX_CONCURRENCY = 20

async def aggregate_results(user_id: str, job_id: str, project: str) -> list[dict]:
    prefix = f"jobs/{user_id}/{project}/{job_id}/results/"

    async with aioboto3.Session().client("s3") as s3:
        resp = await s3.list_objects_v2(Bucket=BUCKET, Prefix=prefix)
        keys = [obj["Key"] for obj in resp.get("Contents", []) if obj["Key"].endswith(".json")]

        semaphore = asyncio.Semaphore(MAX_CONCURRENCY)

        async def fetch(key):
            async with semaphore:
                obj = await s3.get_object(Bucket=BUCKET, Key=key)
                content = await obj["Body"].read()
                return json.loads(content)

        results = await asyncio.gather(*[fetch(k) for k in keys])
        return [item for sublist in results for item in (sublist if isinstance(sublist, list) else [sublist])]

The semaphore prevents S3 throttling on large result sets. 20 concurrent reads is a conservative default; tune against your S3 request rate limits.


Deployment & Scaling

Lambda Layer Management

The default Lambda deployment package limit is 250MB unzipped. LangGraph, LangChain, and their transitive dependencies comfortably exceed this. The solution: load layers dynamically from S3 at cold start:

import os
import boto3

LAYER_FILES = [
    "langgraph-layer.zip",
    "langchain-layer.zip",
    "base-utils-layer.zip"
]
TMP_DIR = "/tmp/layers"

def load_s3_layers() -> None:
    s3 = boto3.client("s3")
    os.makedirs(TMP_DIR, exist_ok=True)

    for layer_file in LAYER_FILES:
        extract_path = os.path.join(TMP_DIR, layer_file.replace(".zip", ""))

        if os.path.exists(extract_path):
            # Already extracted on this warm container — skip download
            _add_to_sys_path(extract_path)
            continue

        archive_path = os.path.join(TMP_DIR, layer_file)
        s3.download_file(BUCKET_NAME, f"layers/{layer_file}", archive_path)

        # Local import: zipfile is only needed during cold-start extraction
        import zipfile

        with zipfile.ZipFile(archive_path, "r") as zf:
            zf.extractall(extract_path)

        os.remove(archive_path)  # free /tmp space immediately
        _add_to_sys_path(extract_path)


def _add_to_sys_path(extract_path: str) -> None:
    import sys
    for path in [extract_path, os.path.join(extract_path, "python")]:
        if os.path.isdir(path) and path not in sys.path:
            sys.path.insert(0, path)


# Execute at module level — runs once per cold start
load_s3_layers()

The existence check on extract_path is the key optimization. Warm containers have already extracted the layers — skipping download saves 3–8 seconds per warm invocation.

Secure Configuration via Parameter Store

Never hardcode service URLs or credentials. Resolve them at runtime:

import os

import boto3

def load_config_into_env(config_key: str) -> None:
    ssm = boto3.client("ssm")

    if not config_key.endswith("/"):
        # Exact parameter — direct fetch
        response = ssm.get_parameter(Name=config_key, WithDecryption=True)
        name = response["Parameter"]["Name"].split("/")[-1]
        os.environ[name] = response["Parameter"]["Value"]
        return

    # Path prefix — page through every parameter under it
    params = []
    next_token = None

    while True:
        kwargs = {
            "Path": config_key,
            "WithDecryption": True,
            "Recursive": True,
            "MaxResults": 10,
        }
        if next_token:
            kwargs["NextToken"] = next_token

        response = ssm.get_parameters_by_path(**kwargs)
        params.extend(response.get("Parameters", []))
        next_token = response.get("NextToken")
        if not next_token:
            break

    for param in params:
        os.environ[param["Name"].split("/")[-1]] = param["Value"]

This pattern lets you rotate service URLs without redeploying Lambda. Update the parameter — the next cold start picks up the new value.

API Authentication

Support both JWT (user-facing) and AWS IAM (service-to-service):

import boto3
import jwt  # PyJWT

def authenticate_request(event: dict) -> str:
    auth_header = (event.get("headers") or {}).get("Authorization", "")

    if auth_header.startswith("Bearer "):
        token = auth_header.split(" ", 1)[1]
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        return payload["sub"]  # missing sub must fail, not return None

    if auth_header.startswith("AWS "):
        access_key, secret_key, session_token = auth_header[4:].split(":")
        sts = boto3.client(
            "sts",
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            aws_session_token=session_token,
        )
        identity = sts.get_caller_identity()
        return identity["UserId"]

    raise ValueError("Unsupported auth scheme: must be Bearer JWT or AWS session credentials")

Production Best Practices

1. Fail loudly for required configuration, degrade gracefully for optional features

Missing MEMORY_TABLE? Log a warning and continue stateless. Missing MODEL_ID? Raise immediately — you cannot operate without an LLM.

2. Never let the agent choose between zero options

If the knowledge base returns empty results, return that fact explicitly: "No relevant information found in knowledge base." — not silence, not a hallucinated answer.
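A minimal sketch of that contract, assuming retrieved chunks arrive as dicts with a "text" field:

```python
def format_retrieval_result(chunks: list[dict]) -> str:
    """Return retrieved context, or an explicit 'nothing found' marker.

    Returning the marker (rather than an empty string) forces the LLM
    to acknowledge the gap instead of inventing an answer.
    """
    if not chunks:
        return "No relevant information found in knowledge base."
    return "\n\n".join(c["text"] for c in chunks)
```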

3. Scope your agents tightly

The chatbot does real-time Q&A. The transformation agent parses documents. The checker validates incidents. One agent, one job. Never add a new capability to an existing agent without evaluating whether it belongs there.

4. Make your pipelines idempotent

S3 trigger events can be delivered more than once. Design every pipeline step so re-running it produces the same output. Overwriting an S3 file with the same content is idempotent. Appending to a database without checking for duplicates is not.
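One way to sketch this: derive a deterministic record key from the job and step, and make the write a no-op on duplicates. The in-memory store below is a stand-in for a conditional DynamoDB put (ConditionExpression="attribute_not_exists(pk)"); the key scheme is illustrative:

```python
import hashlib

def job_record_key(job_id: str, step: str) -> str:
    """Deterministic key: re-running the same step for the same job
    always targets the same record, so duplicate deliveries collide
    instead of accumulating."""
    return hashlib.sha256(f"{job_id}:{step}".encode()).hexdigest()

class IdempotentStore:
    """In-memory stand-in for a conditional put against DynamoDB."""

    def __init__(self) -> None:
        self._items: dict[str, dict] = {}

    def put_once(self, key: str, item: dict) -> bool:
        if key in self._items:
            return False  # duplicate delivery: no-op
        self._items[key] = item
        return True
```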

5. Test your prompts against adversarial inputs

Prompt injection is real. Test your agent with:

  • Instructions to ignore previous rules
  • Requests to reveal the system prompt
  • Queries that cross domain boundaries deliberately
  • Empty strings and whitespace-only inputs
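A sketch of how two of these cases can be encoded as cheap pre- and post-LLM checks (the helper names and the leak heuristic are illustrative, not a complete defense):

```python
def is_degenerate_input(message: str) -> bool:
    """Reject inputs with no semantic content before they reach the LLM."""
    return not message.strip()

def leaks_system_prompt(response: str, system_prompt: str) -> bool:
    """Crude leak check: the verbatim system prompt must never appear
    in a response. Run it in your test suite against adversarial inputs."""
    return system_prompt.strip().lower() in response.lower()
```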

6. Log routing decisions, not just outputs

"Routing decision: tool_node (tool calls detected)" — this log line tells you exactly why the agent behaved the way it did. Without it, debugging a wrong answer means reading the entire message history blind.

7. Set explicit TTLs on everything

DynamoDB checkpoints: 28 days. Presigned URLs: 15 minutes. Session tokens: match your security policy. If you don't set TTLs, your tables grow without bound and your costs climb without warning.


Lessons Learned: What Actually Went Wrong

Warm Lambda stale global state — The _LAST_KB_CONTEXT pattern is powerful but fragile. Forgetting the reset at invocation start causes the second user on a warm container to see citations from the first user's session. This is both a correctness bug and a potential privacy issue. Reset all request-scoped globals at the top of your handler, every time.

LLM cold-selecting the wrong domain — When the agent selects a knowledge domain on the first message, it does so based only on a brief user string. Users who type a domain name as a quick-select mean "activate this domain," not "answer a question about this topic." We added explicit quick-prompt detection to pre-select the domain before the LLM sees the message:

DOMAIN_LABEL_MAP = {
    "documents":      "kb-documents",
    "specifications": "kb-specifications",
    "regulations":    "kb-regulations",
}

def detect_domain_selection(message: str) -> str | None:
    normalized = message.strip().lower()
    return DOMAIN_LABEL_MAP.get(normalized)

Oversized retrieval context — Passing all retrieved chunks to the LLM without truncation causes two problems: cost (more tokens = more money) and quality (the LLM attends to early chunks more than later ones). Implement a context budget — truncate to N chunks, N tokens, or both.
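A sketch of a simple budget, using characters as a cheap proxy for tokens (swap in a real tokenizer if you have one):

```python
def apply_context_budget(
    chunks: list[str], max_chunks: int = 5, max_chars: int = 8000
) -> list[str]:
    """Truncate retrieved context to BOTH a chunk count and a size
    budget, keeping the earliest (highest-ranked) chunks."""
    selected: list[str] = []
    used = 0
    for chunk in chunks[:max_chunks]:
        if used + len(chunk) > max_chars:
            break
        selected.append(chunk)
        used += len(chunk)
    return selected
```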

SQS deduplication gaps — When multiple files in the same job trigger separate SQS events, each Lambda invocation processes only the triggering file unless you explicitly list all files from S3. Always resolve the complete job context from the source of truth (S3), not from the event payload.
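A sketch of resolving the job context from S3 rather than the event (the jobs/{job_id}/ prefix layout is an assumption; the s3 parameter exists so a fake client can be injected in tests):

```python
def list_job_files(bucket: str, job_id: str, s3=None) -> list[str]:
    """Resolve the COMPLETE file set for a job from S3, the source of
    truth, instead of trusting the single key in the triggering event."""
    if s3 is None:
        import boto3  # imported lazily so tests can inject a fake client
        s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")
    keys: list[str] = []
    for page in paginator.paginate(Bucket=bucket, Prefix=f"jobs/{job_id}/"):
        keys.extend(obj["Key"] for obj in page.get("Contents", []))
    return sorted(keys)
```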

DynamoDB checkpoint TTL drift — TTL in DynamoDB is approximate. Items may persist up to 48 hours past their TTL. Don't rely on DynamoDB TTL for hard security expiry. Use it for cost management only.


Final Thoughts

Production AI agents are distributed systems with an LLM in the middle. Every failure mode that applies to microservices — cascading failures, stale state, network timeouts, idempotency violations, auth edge cases — applies here too. Plus a new set: hallucination, domain drift, prompt injection, and retrieval precision.

MCP gives you a structured, evolvable interface between your agents and your knowledge. LangGraph gives you explicit, debuggable workflow graphs. DynamoDB gives you persistent state without managing servers. Serverless gives you scale without capacity planning.

The architecture in this article handles thousands of concurrent users, multiple specialized knowledge domains, asynchronous document processing, and real-time Q&A — all from a small, maintainable codebase.

The patterns are reusable. The lessons are hard-won. The blueprint is yours.

Build the thing. Ship the thing. Learn from the thing.


Key Takeaways

  • Use MCP as the interface between agents and retrieval services — not direct function calls
  • Separate knowledge domains into individual knowledge bases for precision and independence
  • LangGraph graphs give you explicit, debuggable agent workflows — use them over chains
  • DynamoDB checkpointing with TTLs is the correct pattern for Lambda-based conversation memory
  • Reset request-scoped globals at the start of every Lambda invocation — warm container state is a real bug class
  • Hybrid search (vector + keyword) outperforms single-mode retrieval on technical documentation
  • Multi-agent via SQS decouples real-time agents from async processing pipelines
  • Idempotent pipelines: resolve job state from S3, not from SQS event payloads
  • Log routing decisions — the most important diagnostic signal in a ReAct agent
  • Prompt guardrails + config filters + tool validation = defense in depth against hallucination

If this article helped you, consider following for more practical GenAI engineering content. Building something similar? Share it in the comments.




Author Bio

Suraj Khaitan is a Senior AI Architect and GenAI Engineer specializing in enterprise-scale AI systems, multi-agent orchestration, and cloud-native LLM deployments on AWS. He designs and ships production RAG pipelines, LangGraph-based agent frameworks, and MCP-connected knowledge systems for complex industrial and enterprise domains.

When he's not debugging warm Lambda containers at 2am, he writes about the engineering realities of AI systems that actually have to work in production.

Connect on LinkedIn | Follow for more engineering and architecture write-ups

