A deep technical guide to multi-agent orchestration, knowledge retrieval via Model Context Protocol, hallucination control, and serverless deployment — patterns extracted from real production systems.
The Gap Between Demo and Production
You've seen the demos. A shiny chatbot that answers questions about PDFs, retrieves knowledge from a vector store, and produces fluent responses. It works in the notebook. It impresses in the meeting room. Then you try to ship it.
Six weeks later, the agent hallucinates on a customer query. The vector search retrieves semantically irrelevant chunks. DynamoDB checkpointing breaks under concurrent load. The Lambda cold starts introduce 8-second latency spikes. The LLM picks the wrong knowledge base and confidently answers from the wrong domain.
This is the reality of production GenAI systems. And almost nobody writes honestly about what it actually takes to build them correctly.
This article documents the patterns, decisions, and hard lessons from building a multi-agent knowledge retrieval system for an enterprise use case: multiple specialized knowledge bases, a validation pipeline, a transformation agent, and a stateful chatbot — all wired together through MCP (Model Context Protocol) on a serverless cloud stack.
We'll go from fundamentals to full deployment architecture, with code you can actually use.
Why Most AI Agents Fail in Production
Before we build, let's diagnose. The failures almost always fall into the same five categories:
1. Retrieval is naïve
Most prototypes use a single vector store with cosine similarity. In enterprise settings, your knowledge is segmented. Safety documentation has different structure and retrieval semantics than software manuals. When you throw everything into one index, precision tanks. The agent retrieves documents that sound relevant but answer the wrong question.
2. The agent has no memory architecture
Session state lives in a dict that gets destroyed between requests. Thread IDs aren't propagated. Conversation history is either unlimited (context window overflow) or absent (agent forgets what it just said).
3. Tool contracts are loose
The LLM calls tools with missing, wrong, or hallucinated arguments. No validation. No schema enforcement. The tool silently returns nothing; the LLM fabricates a response.
4. Multi-agent coordination is an afterthought
One agent processes user queries. A second agent validates documents. A third transforms raw uploads. These agents are deployed independently with no shared message schema, no retry contract, and no shared observability. When one fails, you find out from the user.
5. Deployment is a science project
Lambda packages bloat beyond 50MB. Layers conflict. Cold starts kill latency SLAs. Dependencies are loaded on every invocation instead of being cached at the container level.
Each of these is solvable. But you need a system, not a stack of LangChain tutorials.
What MCP Solves
Model Context Protocol (MCP) is a JSON-RPC-based communication protocol for connecting AI agents to external tools, data sources, and services. Think of it as a standardized API contract between your LLM and the world outside it.
Where most RAG implementations hardcode retrieval calls directly into the agent logic, MCP externalizes them into discrete, versioned, discoverable services. Your agent becomes a client. Your retriever becomes a server. The contract is typed.
{
"jsonrpc": "2.0",
"id": "a1b2c3d4",
"method": "tools/call",
"params": {
"name": "hybridQueryTool",
"arguments": {
"retriever_input": {
"query": "What are the safety circuit requirements for servo drives?",
"kb_id": "kb-regulations"
}
}
}
}
This gives you four things that matter in production:
- Decoupling: The retrieval implementation can change without touching the agent
- Versioning: MCP endpoints are independently deployable
- Observability: You can log, trace, and rate-limit at the protocol layer
- Multi-tenancy: Multiple agents can share the same MCP server under different routing keys
Recommended Enterprise Architecture
Here is the full system architecture we'll implement:
┌──────────────────────────────────────────────────────────────┐
│ API Gateway │
│ (JWT / AWS IAM Authentication) │
└───────────────────────────┬──────────────────────────────────┘
│
┌─────────────▼──────────────┐
│ API Lambda │
│ (routing, auth, presigned │
│ URLs, async S3 reads) │
└──────┬──────────┬──────────┘
│ │
┌──────────▼─┐ ┌───▼────────────────┐
│ Chatbot │ │ Upload + Transform │
│ Agent │ │ Pipeline (SQS- │
│ Lambda │ │ triggered) │
└──────┬─────┘ └──────────┬───────────┘
│ │
┌──────▼─────┐ ┌──────▼──────────┐
│ LangGraph │ │ Transformation │
│ Workflow │ │ Agent Lambda │
│ │ │ (parse → S3) │
└──────┬─────┘ └──────────────────┘
│ │ (incidents)
┌──────▼─────┐ ┌──────▼──────────┐
│ MCP Layer │ │ Checker Agent │
│ │ │ Lambda (SQS- │
│ ┌────────┐│ │ triggered) │
│ │ KB-1 ││ └──────┬──────────┘
│ │ KB-2 ││ │
│ │ KB-3 ││ ┌──────▼──────────┐
│ │ ... ││ │ MCP Layer │
│ └────────┘│ │ (domain KB) │
└────────────┘ └─────────────────┘
│
┌───────▼────────┐
│ DynamoDB │
│ (Checkpointing │
│ / History) │
└────────────────┘
Let's build each layer.
The LangGraph Agent Core
LangGraph is the right choice for production agents. It gives you explicit state management, conditional routing, and composable graphs. Here's the complete core pattern.
Data Models First
Type safety is non-negotiable. Define your contract before you write any logic:
from pydantic import BaseModel, ConfigDict, Field
from typing import Any
class AgentMessageRequest(BaseModel):
message: str = Field(..., description="User message")
sessionId: str | None = Field(None, description="Optional session ID")
metadata: dict[str, Any] = Field(default_factory=dict)
class Message(BaseModel):
model_config = ConfigDict()
step_id: str | None = Field(None)
role: str = Field(...) # "user" | "agent"
content: str = Field(...)
structural_content: dict[str, Any] | None = Field(None)
create_timestamp: str = Field(...)
metadata: dict[str, Any] = Field(default_factory=dict)
Strong typing catches argument mismatches at the boundary, not deep inside graph execution.
The Tool Definition
This is where MCP integration lives. The @tool decorator makes this function visible to the LLM as a callable:
from langchain.tools import tool
VALID_DOMAINS = {
"kb-documents",
"kb-specifications",
"kb-regulations",
}
@tool
def query_knowledge_base(query: str, domain: str | None = None) -> str:
"""
Query a specialized knowledge base for domain-specific information.
Select the most appropriate domain based on the user's question:
- 'kb-documents': Product manuals, technical guides, API references
- 'kb-specifications': Hardware and software configuration standards
- 'kb-regulations': Compliance requirements, safety standards, audit rules
Args:
query: Rich contextual search query. More context = better results.
domain: Target knowledge domain. Required for precision retrieval.
Returns:
Formatted knowledge base chunks as a single string.
Note:
Query is vectorized for cosine similarity + keyword hybrid search.
"""
if not isinstance(query, str) or not query.strip():
return "Invalid query. Please provide a non-empty string."
if domain and domain not in VALID_DOMAINS:
return f"Invalid domain '{domain}'. Choose from: {', '.join(VALID_DOMAINS)}"
try:
global _LAST_KB_CONTEXT
kb_context = fetch_from_mcp(query, domain=domain)
if kb_context:
_LAST_KB_CONTEXT = kb_context # store for metadata extraction post-graph
if kb_context.get("status") == "success" and kb_context.get("context"):
formatted = "\n---\n".join(kb_context["context"])
return f"Knowledge Base Results:\n\n{formatted}"
return "No relevant information found in knowledge base."
except Exception as e:
logger.error(f"Tool execution failed: {e}")
return f"Knowledge base query failed: {str(e)}"
Critical pattern: _LAST_KB_CONTEXT is a module-level global that captures references (file URLs, page numbers) returned by the MCP retriever. These can't travel through the LangGraph message channel cleanly — they're metadata, not conversation content. After the graph completes, you extract them from this global. This works because a Lambda execution environment processes only one invocation at a time, so there is no concurrent request to race against.
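For illustration, a minimal sketch of that post-graph extraction step (the helper name and the response-metadata field are assumptions, not part of the original code):

def extract_kb_references() -> list[dict]:
    # Hypothetical post-graph step: read the citation metadata the tool stashed
    # in the module-level global during this invocation.
    if _LAST_KB_CONTEXT and _LAST_KB_CONTEXT.get("status") == "success":
        return _LAST_KB_CONTEXT.get("reference", [])
    return []

# After graph.invoke(...) completes, attach the citations to the outgoing message:
# response_message.metadata["references"] = extract_kb_references()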
The Graph Structure
from langgraph.graph import StateGraph, START, END, MessagesState
from langgraph_dynamodb_checkpoint import DynamoDBSaver
def build_graph(checkpointer: DynamoDBSaver | None = None) -> StateGraph:
graph = StateGraph(state_schema=MessagesState)
graph.add_node("llm_node", llm_node)
graph.add_node("tool_node", tool_node)
graph.add_edge(START, "llm_node")
graph.add_conditional_edges(
"llm_node",
should_continue, # router function
["tool_node", END]
)
graph.add_edge("tool_node", "llm_node") # tool result → back to LLM
if checkpointer:
return graph.compile(checkpointer=checkpointer)
return graph.compile()
The graph is a ReAct loop: LLM reasons → decides whether to call a tool → tool executes → result fed back to LLM → LLM reasons again. This continues until the LLM determines it can answer without calling another tool.
The Router
def should_continue(state: MessagesState) -> str:
last_message = state["messages"][-1]
if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
return "tool_node"
return END
Simple but critical. If the LLM emits tool calls, route to tool execution. Otherwise, the response is complete.
The LLM Node
from langchain_core.messages import SystemMessage, ToolMessage
from langchain_aws import ChatBedrock
import boto3, os
def llm_node(state: MessagesState):
global _AGENT_SUMMARY
llm_with_tools = get_llm_with_tools()
response = llm_with_tools.invoke(
[SystemMessage(content=get_prompt(agent_summary=_AGENT_SUMMARY or ""))]
+ state["messages"]
)
return {"messages": [response]}
def tool_node(state: dict):
tools_by_name = {tool.name: tool for tool in [query_knowledge_base]}
result = []
for tool_call in state["messages"][-1].tool_calls:
tool_name = tool_call["name"]
if tool_name not in tools_by_name:
result.append(ToolMessage(
content=f"Error: Unknown tool '{tool_name}'",
tool_call_id=tool_call["id"]
))
continue
observation = tools_by_name[tool_name].invoke(tool_call["args"])
result.append(ToolMessage(content=observation, tool_call_id=tool_call["id"]))
return {"messages": result}
Conversation Checkpointing
Stateless Lambdas need external state. DynamoDB gives you persistent conversation memory:
import time, os
from langgraph_dynamodb_checkpoint import DynamoDBSaver
def get_checkpoint_table() -> DynamoDBSaver | None:
table_name = os.environ.get("MEMORY_TABLE")
if not table_name:
logger.warning("MEMORY_TABLE not set; running stateless")
return None
return DynamoDBSaver(
table_name=table_name,
max_read_request_units=10,
max_write_request_units=10,
ttl_seconds=int(time.time()) + 28 * 86400, # 28-day TTL
)
Thread IDs tie conversation turns together. On each request, the graph replays from the last checkpoint, not from scratch:
import uuid
from langchain_core.messages import HumanMessage

thread_id = message.metadata.get("thread_id", uuid.uuid4().hex)
config = {"configurable": {"thread_id": thread_id}}
result = graph.invoke(
{"messages": [HumanMessage(content=message.content)]},
config
)
Production note: The 28-day TTL prevents unbounded storage growth. Conversations older than 28 days are automatically purged by DynamoDB TTL. Set this to match your retention policy.
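Note that DynamoDB only purges items if TTL is enabled on the table's TTL attribute. A one-time setup sketch (the attribute name is an assumption; match whatever your checkpointer library writes):

import os
import boto3

dynamodb = boto3.client("dynamodb")
dynamodb.update_time_to_live(
    TableName=os.environ["MEMORY_TABLE"],
    TimeToLiveSpecification={
        "Enabled": True,
        "AttributeName": "ttl",  # assumed attribute name; check your checkpointer's item schema
    },
)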
Multi-Agent Orchestration Patterns
The chatbot is one of three agents in this system. Here's how multi-agent orchestration actually works in production serverless architectures:
Agent 1: Chatbot Agent
→ Handles real-time user Q&A
→ LangGraph ReAct loop
→ Synchronous API response
Agent 2: Transformation Agent
→ SQS-triggered (file upload events)
→ Parses structured documents → normalized JSON
→ Routes based on document type metadata
Agent 3: Checker / Validation Agent
→ SQS-triggered (per incident)
→ Consults domain knowledge base
→ Appends recommended_action to S3 results
Asynchronous Agent Pipelines via SQS
The transformation agent fires when a user uploads files. SQS decouples the upload from the processing:
import json
from urllib.parse import unquote_plus

def lambda_handler(event, context):
    jobs = {}
    for sqs_record in event['Records']:
        s3_event = json.loads(sqs_record['body'])
        for s3_record in s3_event['Records']:
            bucket = s3_record['s3']['bucket']['name']
            s3_key = unquote_plus(s3_record['s3']['object']['key'])
            # Extract job context from S3 key structure:
            # jobs/{user_id}/{project_name}/{job_id}/docs/{filename}
            parts = s3_key.split('/')
            user_id = parts[1]
            project_name = parts[2]
            job_id = parts[3]
            if job_id not in jobs:
                jobs[job_id] = {'bucket': bucket, 'user_id': user_id, 'project_name': project_name}
    # Idempotent: resolve job from S3, not from the event payload
    for job_id, job_data in jobs.items():
        all_files = list_job_files(job_data['bucket'], job_data['user_id'], job_data['project_name'], job_id)
        process_job(job_data['bucket'], job_data['user_id'], job_data['project_name'], job_id, all_files)
Critical pattern: The trigger file is just a signal. Always list all files from S3 when processing. This makes the pipeline idempotent — reprocessing a job picks up all files regardless of upload order.
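A minimal sketch of the list_job_files helper referenced above, assuming the jobs/{user_id}/{project_name}/{job_id}/docs/ key layout:

import boto3

def list_job_files(bucket: str, user_id: str, project_name: str, job_id: str) -> list[dict]:
    # List every document currently under the job's docs/ prefix, not just the
    # file that triggered the event. Paginated so large jobs aren't truncated.
    s3 = boto3.client("s3")
    prefix = f"jobs/{user_id}/{project_name}/{job_id}/docs/"
    files = []
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            files.append({"key": obj["Key"], "filename": obj["Key"].rsplit("/", 1)[-1]})
    return files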
Document-Type Routing
def categorize_files(files: list[dict]) -> dict:
categorized = {'report': None, 'model': None, 'rules': None}
for file_info in files:
filename = file_info['filename'].lower()
if filename.endswith(('.xlsx', '.xls')):
categorized['report'] = file_info # structured data → incidents
elif filename.endswith(('.plczip', '.robzip')):
categorized['model'] = file_info # binary model → JSON
elif filename.endswith('.xml'):
categorized['rules'] = file_info # rule definitions → JSON
return categorized
Each file type has a dedicated parser. The transformation agent orchestrates them in dependency order: report first (to extract metadata needed by subsequent parsers), then model, then rules.
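A hedged sketch of that dependency-ordered orchestration (the parser registry and signatures are hypothetical, not the project's actual code):

from typing import Callable

# Hypothetical registry mapping document type to its parser.
PARSERS: dict[str, Callable[[dict, dict | None], dict | None]] = {}

def run_parsers(categorized: dict) -> None:
    # Report first: it yields metadata the model and rules parsers depend on.
    report_meta: dict | None = None
    for doc_type in ("report", "model", "rules"):
        file_info = categorized.get(doc_type)
        parser = PARSERS.get(doc_type)
        if file_info and parser:
            result = parser(file_info, report_meta)
            if doc_type == "report":
                report_meta = result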
The Checker / Validation Agent
After transformation, individual incidents (one per detected issue) are queued via SQS. The checker agent processes them individually, consulting the domain knowledge base:
import json
import urllib.parse

import boto3
from langchain_core.messages import HumanMessage

s3 = boto3.client("s3")

def lambda_handler(event, context):
for record in event.get("Records", []):
body = json.loads(record["body"])
s3_record = body["Records"][0]
bucket_name = s3_record["s3"]["bucket"]["name"]
object_key = urllib.parse.unquote_plus(s3_record["s3"]["object"]["key"])
base_path = object_key.partition("incidents/")[0]
# Read incident JSON from S3
incident_message = s3.get_object(
Bucket=bucket_name, Key=object_key
)["Body"].read().decode("utf-8")
# Run LangGraph agent: incident → knowledge base → recommendation
graph = build_graph()
result = graph.invoke({
"messages": [HumanMessage(content=incident_message)]
})
# Append recommendation and write to results/
output = json.loads(incident_message)
output["recommended_action"] = result["messages"][-1].content
push_to_s3(output, base_path)
Each agent is independently deployable, independently scalable, and independently observable. The shared contract is the S3 path structure and the JSON schema.
MCP Communication Layer
Here is the complete MCP client implementation — the most critical piece of production infrastructure in the entire system:
import httpx
import uuid
import os
import json
import logging
from typing import Any
from httpx import Headers
logger = logging.getLogger(__name__)
def send_mcp_request(
method: str,
params: Any = None,
session_id: str | None = None,
config_key: str | None = None,
) -> tuple[dict, Headers]:
"""
Sends a JSON-RPC 2.0 request to the MCP server.
Resolves the MCP endpoint URL from a secure configuration store.
"""
headers = {
"Content-Type": "application/json",
"MCP-Version": "2024-01-01",
}
if session_id:
headers["MCP-Session-Id"] = session_id
body = {
"jsonrpc": "2.0",
"id": str(uuid.uuid4()),
"method": method,
}
if params is not None:
body["params"] = params
if config_key:
load_config_into_env(config_key)
mcp_base_url = os.getenv("RETRIEVER_SERVICE_URL")
if not mcp_base_url:
raise ValueError("RETRIEVER_SERVICE_URL is not configured")
mcp_url = mcp_base_url.rstrip("/") + "/mcp"
try:
response = httpx.post(mcp_url, json=body, headers=headers, timeout=30)
response.raise_for_status()
return response.json(), response.headers
except httpx.HTTPStatusError as e:
logger.error(f"MCP server error {e.response.status_code}: {e.response.text}")
raise
except httpx.RequestError as e:
logger.error(f"MCP network error: {e}")
raise
Dynamic KB Routing
The MCP call is parameterized at runtime. The domain identifier determines which retriever service receives the request:
def fetch_from_mcp(query: str, domain: str | None = None) -> dict[str, Any]:
try:
kb_config = json.loads(os.environ.get("KB_CONFIG", "{}"))
agent_id = kb_config.get("configurations", [{}])[0].get("agent_id")
kb_type = kb_config.get("configurations", [{}])[0].get("kb_type")
kb_id = domain or os.environ.get("KB_ID", "default")
# Dynamic endpoint resolution per knowledge base:
# /config/{agent_id}/{kb_id}/{kb_type}/RETRIEVER_SERVICE_URL
config_key = f"/config/{agent_id}/{kb_id}/{kb_type}/RETRIEVER_SERVICE_URL"
payload = {
"retriever_input": {
"query": query,
"kb_id": kb_id,
}
}
tool_name = kb_config.get("defaults", {}).get("tool_name", "hybridQueryTool")
resp_json, _ = send_mcp_request(
method="tools/call",
params={"name": tool_name, "arguments": payload},
session_id=os.environ.get("MCP_SESSION_ID"),
config_key=config_key,
)
return parse_kb_response(resp_json)
except Exception as e:
logger.error(f"MCP call failed: {e}")
return {"status": "error", "context": [], "reference": [], "error": str(e)}
Why dynamic endpoint resolution? Each knowledge domain can be served by a different retriever instance — different hardware, different index type, different SLA. By resolving the endpoint from configuration at call-time, you can independently scale, migrate, and update individual knowledge bases without redeploying the agent.
Response Parsing
MCP responses are nested. Parse them defensively:
import ast
def parse_kb_response(resp_json: dict[str, Any]) -> dict[str, Any]:
try:
outer = ast.literal_eval(resp_json["result"]["content"][0]["text"])
body = json.loads(outer["body"])
context: list[str] = []
reference: list[dict] = []
for item in body:
if item.get("text"):
context.append(item["text"])
reference.append({k: v for k, v in item.items() if k != "text"})
return {
"status": outer.get("status"),
"context": context, # text chunks for LLM consumption
"reference": reference, # metadata (file URLs, page numbers)
}
except (KeyError, ValueError, SyntaxError, json.JSONDecodeError) as e:
logger.error(f"Failed to parse KB response: {e}")
return {"status": "error", "context": [], "reference": [], "error": str(e)}
Separate context from reference. The LLM gets context. The UI gets reference metadata for citation display. Never mix them.
Retrieval + Knowledge Layer
Hybrid Search Configuration
Single-mode retrieval (pure vector or pure keyword) consistently underperforms on technical documentation. Production systems need hybrid:
{
"knowledge_base": {
"defaults": {
"kb_type": "lancedb",
"retriever_type": "hybrid",
"tool_name": "hybridQueryTool"
},
"configurations": [
{ "kb_name": "kb-documents", "kb_type": "lancedb" },
{ "kb_name": "kb-specifications", "kb_type": "lancedb" },
{ "kb_name": "kb-regulations", "kb_type": "lancedb" }
],
"infrastructure": {
"embedding_model": "amazon.titan-embed-text-v2:0"
}
}
}
Why separate knowledge bases per domain?
- Precision: Documents have different embedding distributions from regulatory text. Domain-scoped indexes give higher precision at the same k.
- Access control: You can enforce per-KB authorization at the MCP layer.
- Independent updates: A regulations KB can be re-indexed without touching documents or specifications.
- Observability: Per-KB latency and error metrics tell you exactly which domain is degrading.
Context Window Management
Never pass raw retrieval chunks to the LLM. Format them with separators so the LLM can identify chunk boundaries:
if kb_context.get("status") == "success" and kb_context.get("context"):
formatted = "\n---\n".join(kb_context["context"])
return f"Knowledge Base Results:\n\n{formatted}"
The --- separator is cheap signal. The LLM treats each chunk as a discrete evidence unit rather than a continuous blob.
Validation & Hallucination Prevention
Hallucination in domain-specific agents isn't just wrong answers — it's wrong answers delivered with high confidence that looks correct to non-experts.
Guard at the Prompt Layer
Your system prompt is the first line of defense:
<INSTRUCTIONS>
3. Information Retrieval
- Use the retrieval tool only when domain-specific factual information is required.
- If the knowledge base returns no results or an error, inform the user and advise
contacting the support team.
- Do not guess or invent information not found in the Knowledge Base.
</INSTRUCTIONS>
Explicit negative instructions outperform implicit expectations. Tell the model what it must NOT do, not just what it should do.
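For context, here is a minimal sketch of the get_prompt() helper referenced earlier in llm_node, assembling these rules plus the rolling conversation summary (the tag names other than <INSTRUCTIONS> are assumptions):

INSTRUCTIONS = """<INSTRUCTIONS>
3. Information Retrieval
- Use the retrieval tool only when domain-specific factual information is required.
- If the knowledge base returns no results or an error, inform the user and advise
  contacting the support team.
- Do not guess or invent information not found in the Knowledge Base.
</INSTRUCTIONS>"""

def get_prompt(agent_summary: str = "") -> str:
    # Static guardrail instructions plus an optional rolling summary of the conversation.
    parts = [INSTRUCTIONS]
    if agent_summary:
        parts.append(f"<AGENT_SUMMARY>\n{agent_summary}\n</AGENT_SUMMARY>")
    return "\n\n".join(parts)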
Guard at the Config Layer
Content filtering runs before and after the LLM:
{
  "guardrail": {
    "filters": {
      "HATE": "MEDIUM",
      "SEXUAL": "MEDIUM",
      "VIOLENCE": "MEDIUM",
      "INSULTS": "MEDIUM",
      "MISCONDUCT": "MEDIUM",
      "PROMPT_ATTACK": "HIGH"
    }
  }
}
Set Prompt Attack to HIGH. Prompt injection is the most common real attack vector against document-grounded agents.
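On Amazon Bedrock, a filter config like this typically maps to a Guardrail attached to the model client. A hedged sketch; the identifiers are placeholders and the exact guardrails parameter shape may differ across langchain-aws versions:

import os
import boto3
from langchain_aws import ChatBedrock

guarded_llm = ChatBedrock(
    model_id=os.getenv("MODEL_ID"),
    client=boto3.client("bedrock-runtime", region_name=os.getenv("AWS_REGION")),
    # Placeholder env vars pointing at a pre-created Bedrock Guardrail.
    guardrails={
        "guardrailIdentifier": os.getenv("GUARDRAIL_ID"),
        "guardrailVersion": os.getenv("GUARDRAIL_VERSION", "DRAFT"),
        "trace": True,
    },
)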
Guard at the Tool Layer
Validate tool arguments before executing any external call:
@tool
def query_knowledge_base(query: str, domain: str | None = None) -> str:
if not isinstance(query, str) or not query.strip():
return "Invalid query. Please provide a non-empty string."
valid_domains = {"kb-documents", "kb-specifications", "kb-regulations"}
if domain and domain not in valid_domains:
return f"Invalid domain '{domain}'. Choose from: {', '.join(valid_domains)}"
# Only reach external systems after validation passes
...
Return descriptive error strings rather than raising exceptions. The LLM can reason about a string error message and self-correct. An unhandled exception terminates tool execution with no recovery path.
Conversation Scope Enforcement
Prevent domain drift through prompt rules:
<KB_RULES>
- Each conversation uses exactly one Knowledge Base.
- The Knowledge Base is selected only at conversation start.
- Switching Knowledge Bases within a conversation is not allowed.
- The selected Knowledge Base is stored in conversation history.
</KB_RULES>
This seems restrictive but it's correct for expert systems. A user working in kb-regulations doesn't want their session drifting into kb-specifications mid-conversation. Scope enforcement is a feature, not a limitation.
LLM Client Caching
Lambda containers are reused across invocations. Cache expensive initialization at the module level:
_LLM_WITH_TOOLS_CACHE = None
_AGENT_SUMMARY = None
_LAST_KB_CONTEXT = None
def get_llm_with_tools() -> Any:
global _LLM_WITH_TOOLS_CACHE
if _LLM_WITH_TOOLS_CACHE is not None:
logger.info("Using cached LLM client")
return _LLM_WITH_TOOLS_CACHE
llm = ChatBedrock(
model_id=os.getenv("MODEL_ID"),
client=boto3.client("bedrock-runtime", region_name=os.getenv("AWS_REGION")),
)
_LLM_WITH_TOOLS_CACHE = llm.bind_tools([query_knowledge_base])
return _LLM_WITH_TOOLS_CACHE
And critically, reset request-scoped state at the start of every invocation:
def process_message(message: Message) -> Message:
global _LAST_KB_CONTEXT
_LAST_KB_CONTEXT = None # Reset — avoid stale data from previous warm invocation
...
This is a subtle but critical bug if missed. Without the reset, the first request on a warm container sets _LAST_KB_CONTEXT. The second request inherits that stale context if the retrieval tool isn't called — returning citations from the previous user's query. This is both a correctness bug and a potential data exposure issue.
Observability & Monitoring
Structured Logging at Every Layer
logger.info(f"Thread: {thread_id} | Domain: {domain} | Query length: {len(query)}")
logger.info(f"MCP response: status={output.get('status')}, chunks={len(output.get('context', []))}")
logger.info(f"Routing decision: {routing_decision} | Tool calls detected: {bool(tool_calls)}")
logger.info(f"Response length: {len(result.content)} chars")
Log the routing decision, not just the outcome. When debugging a wrong answer, knowing which tool was called (or wasn't) is more valuable than the final response text.
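The f-string examples above are readable but hard to query. Emitting one JSON object per log line makes the same fields filterable in CloudWatch Logs Insights; a minimal sketch (the helper is hypothetical):

import json
import logging

logger = logging.getLogger(__name__)

def log_event(event_name: str, **fields) -> None:
    # One JSON object per line so fields can be queried instead of regex-parsed.
    logger.info(json.dumps({"event": event_name, **fields}))

log_event("routing_decision", thread_id="abc123", decision="tool_node", tool_calls=True)
log_event("kb_retrieval", domain="kb-regulations", status="success", chunks=4)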
Key Metrics to Track
| Metric | Why It Matters |
|---|---|
| KB retrieval latency per domain | Identifies degraded retrieval services |
| Tool call rate per session | High = LLM confused; zero = retrieval bypassed |
| Context chunks per query | Low count = poor retrieval quality |
| Graph iterations per request | High count = possible ReAct loop |
| Checkpoint read/write failures | Silent data loss in conversation history |
| Cold start frequency | Proxy for concurrent load spikes |
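These can be emitted from the handler as plain CloudWatch custom metrics. A sketch, assuming a GenAI/Agent namespace and a per-domain dimension (both names are placeholders):

import boto3

cloudwatch = boto3.client("cloudwatch")

def emit_retrieval_latency(domain: str, latency_ms: float) -> None:
    # One datapoint per KB query, dimensioned by domain, so each knowledge base
    # shows up as its own metric series.
    cloudwatch.put_metric_data(
        Namespace="GenAI/Agent",  # placeholder namespace
        MetricData=[{
            "MetricName": "KBRetrievalLatency",
            "Dimensions": [{"Name": "Domain", "Value": domain}],
            "Value": latency_ms,
            "Unit": "Milliseconds",
        }],
    )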
Async Result Aggregation
When users poll for processing results, don't serialize S3 reads:
import asyncio
import json

import aioboto3
MAX_CONCURRENCY = 20
async def aggregate_results(user_id: str, job_id: str, project: str) -> list[dict]:
prefix = f"jobs/{user_id}/{project}/{job_id}/results/"
async with aioboto3.Session().client("s3") as s3:
resp = await s3.list_objects_v2(Bucket=BUCKET, Prefix=prefix)
keys = [obj["Key"] for obj in resp.get("Contents", []) if obj["Key"].endswith(".json")]
semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
async def fetch(key):
async with semaphore:
obj = await s3.get_object(Bucket=BUCKET, Key=key)
content = await obj["Body"].read()
return json.loads(content)
results = await asyncio.gather(*[fetch(k) for k in keys])
return [item for sublist in results for item in (sublist if isinstance(sublist, list) else [sublist])]
The semaphore prevents S3 throttling on large result sets. 20 concurrent reads is a conservative default; tune against your S3 request rate limits.
Deployment & Scaling
Lambda Layer Management
The default Lambda deployment package limit is 250MB unzipped. LangGraph, LangChain, and their transitive dependencies comfortably exceed this. The solution: load layers dynamically from S3 at cold start:
import os
import zipfile

import boto3

LAYER_FILES = [
"langgraph-layer.zip",
"langchain-layer.zip",
"base-utils-layer.zip"
]
TMP_DIR = "/tmp/layers"
def load_s3_layers() -> None:
s3 = boto3.client("s3")
os.makedirs(TMP_DIR, exist_ok=True)
for layer_file in LAYER_FILES:
extract_path = os.path.join(TMP_DIR, layer_file.replace(".zip", ""))
if os.path.exists(extract_path):
# Already extracted on this warm container — skip download
_add_to_sys_path(extract_path)
continue
archive_path = os.path.join(TMP_DIR, layer_file)
s3.download_file(BUCKET_NAME, f"layers/{layer_file}", archive_path)
        with zipfile.ZipFile(archive_path, "r") as zf:
zf.extractall(extract_path)
os.remove(archive_path) # free /tmp space immediately
_add_to_sys_path(extract_path)
def _add_to_sys_path(extract_path: str) -> None:
import sys
for path in [extract_path, os.path.join(extract_path, "python")]:
if os.path.isdir(path) and path not in sys.path:
sys.path.insert(0, path)
# Execute at module level — runs once per cold start
load_s3_layers()
The existence check on extract_path is the key optimization. Warm containers have already extracted the layers — skipping download saves 3–8 seconds per warm invocation.
Secure Configuration via Parameter Store
Never hardcode service URLs or credentials. Resolve them at runtime:
def load_config_into_env(config_key: str) -> None:
ssm = boto3.client("ssm")
if not config_key.endswith("/"):
# Exact parameter — direct fetch
response = ssm.get_parameter(Name=config_key, WithDecryption=True)
name = response["Parameter"]["Name"].split("/")[-1]
os.environ[name] = response["Parameter"]["Value"]
return
# Path prefix — fetch all parameters under path
params = []
next_token = None
while True:
kwargs = {
"Path": "/config/",
"WithDecryption": True,
"Recursive": True,
"MaxResults": 10,
}
if next_token:
kwargs["NextToken"] = next_token
response = ssm.get_parameters_by_path(**kwargs)
params.extend(response.get("Parameters", []))
next_token = response.get("NextToken")
if not next_token:
break
for param in params:
if config_key.lower() in param["Name"].lower():
os.environ[param["Name"].split("/")[-1]] = param["Value"]
This pattern lets you rotate service URLs without redeploying Lambda. Update the parameter — the next cold start picks up the new value.
API Authentication
Support both JWT (user-facing) and AWS IAM (service-to-service):
import boto3
import jwt  # PyJWT

def authenticate_request(event: dict) -> str:
auth_header = (event.get("headers") or {}).get("Authorization", "")
if auth_header.startswith("Bearer "):
token = auth_header.split(" ", 1)[1]
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
return payload.get("sub")
if auth_header.startswith("AWS "):
access_key, secret_key, session_token = auth_header.replace("AWS ", "").split(":")
sts = boto3.client(
"sts",
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
aws_session_token=session_token,
)
identity = sts.get_caller_identity()
return identity["UserId"]
raise Exception("Unsupported auth scheme: must be Bearer JWT or AWS session credentials")
Production Best Practices
1. Fail loudly on required configuration, degrade gracefully on optional features
Missing MEMORY_TABLE? Log a warning and continue stateless. Missing MODEL_ID? Raise immediately — you cannot operate without an LLM.
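A minimal sketch of the distinction, using the two environment variables from earlier in the article:

import os
import logging

logger = logging.getLogger(__name__)

MODEL_ID = os.environ.get("MODEL_ID")
if not MODEL_ID:
    # Hard requirement: refuse to start without an LLM.
    raise RuntimeError("MODEL_ID is not set; the agent cannot run without an LLM")

if not os.environ.get("MEMORY_TABLE"):
    # Optional capability: degrade to stateless conversations.
    logger.warning("MEMORY_TABLE not set; running without conversation memory")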
2. Never let the agent choose between zero options
If the knowledge base returns empty results, return that fact explicitly: "No relevant information found in knowledge base." — not silence, not a hallucinated answer.
3. Scope your agents tightly
The chatbot does real-time Q&A. The transformation agent parses documents. The checker validates incidents. One agent, one job. Never add a new capability to an existing agent without evaluating whether it belongs there.
4. Make your pipelines idempotent
S3 trigger events can be delivered more than once. Design every pipeline step so re-running it produces the same output. Overwriting an S3 file with the same content is idempotent. Appending to a database without checking for duplicates is not.
5. Test your prompts against adversarial inputs
Prompt injection is real. Test your agent with:
- Instructions to ignore previous rules
- Requests to reveal the system prompt
- Queries that cross domain boundaries deliberately
- Empty strings and whitespace-only inputs
6. Log routing decisions, not just outputs
"Routing decision: tool_node (tool calls detected)" — this log line tells you exactly why the agent behaved the way it did. Without it, debugging a wrong answer means reading the entire message history blind.
7. Set explicit TTLs on everything
DynamoDB checkpoints: 28 days. Presigned URLs: 15 minutes. Session tokens: match your security policy. If you don't set TTLs, your tables grow without bound and your costs climb without warning.
Lessons Learned: What Actually Went Wrong
Warm Lambda stale global state — The _LAST_KB_CONTEXT pattern is powerful but fragile. Forgetting the reset at invocation start causes the second user on a warm container to see citations from the first user's session. This is both a correctness bug and a potential privacy issue. Reset all request-scoped globals at the top of your handler, every time.
LLM cold-selecting the wrong domain — When the agent selects a knowledge domain on the first message, it does so based only on a brief user string. Users who type a domain name as a quick-select mean "activate this domain," not "answer a question about this topic." We added explicit quick-prompt detection to pre-select the domain before the LLM sees the message:
DOMAIN_LABEL_MAP = {
"documents": "kb-documents",
"specifications": "kb-specifications",
"regulations": "kb-regulations",
}
def detect_domain_selection(message: str) -> str | None:
normalized = message.strip().lower()
return DOMAIN_LABEL_MAP.get(normalized)
Oversized retrieval context — Passing all retrieved chunks to the LLM without truncation causes two problems: cost (more tokens = more money) and quality (the LLM attends to early chunks more than later ones). Implement a context budget — truncate to N chunks, N tokens, or both.
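A minimal sketch of such a budget, assuming chunks arrive in relevance order (the limits are placeholders):

MAX_CHUNKS = 6              # placeholder chunk budget
MAX_CONTEXT_CHARS = 12_000  # rough character proxy for a token budget

def apply_context_budget(chunks: list[str]) -> list[str]:
    # Keep the top-ranked chunks and cap total size; retrievers typically return
    # results ordered by relevance, so truncation drops the weakest evidence first.
    budgeted, total = [], 0
    for chunk in chunks[:MAX_CHUNKS]:
        if total + len(chunk) > MAX_CONTEXT_CHARS:
            break
        budgeted.append(chunk)
        total += len(chunk)
    return budgeted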
SQS deduplication gaps — When multiple files in the same job trigger separate SQS events, each Lambda invocation processes only the triggering file unless you explicitly list all files from S3. Always resolve the complete job context from the source of truth (S3), not from the event payload.
DynamoDB checkpoint TTL drift — TTL in DynamoDB is approximate. Items may persist up to 48 hours past their TTL. Don't rely on DynamoDB TTL for hard security expiry. Use it for cost management only.
Final Thoughts
Production AI agents are distributed systems with an LLM in the middle. Every failure mode that applies to microservices — cascading failures, stale state, network timeouts, idempotency violations, auth edge cases — applies here too. Plus a new set: hallucination, domain drift, prompt injection, and retrieval precision.
MCP gives you a structured, evolvable interface between your agents and your knowledge. LangGraph gives you explicit, debuggable workflow graphs. DynamoDB gives you persistent state without managing servers. Serverless gives you scale without capacity planning.
The architecture in this article handles thousands of concurrent users, multiple specialized knowledge domains, asynchronous document processing, and real-time Q&A — all from a small, maintainable codebase.
The patterns are reusable. The lessons are hard-won. The blueprint is yours.
Build the thing. Ship the thing. Learn from the thing.
Key Takeaways
- Use MCP as the interface between agents and retrieval services — not direct function calls
- Separate knowledge domains into individual knowledge bases for precision and independence
- LangGraph graphs give you explicit, debuggable agent workflows — use them over chains
- DynamoDB checkpointing with TTLs is the correct pattern for Lambda-based conversation memory
- Reset request-scoped globals at the start of every Lambda invocation — warm container state is a real bug class
- Hybrid search (vector + keyword) outperforms single-mode retrieval on technical documentation
- Multi-agent via SQS decouples real-time agents from async processing pipelines
- Idempotent pipelines: resolve job state from S3, not from SQS event payloads
- Log routing decisions — the most important diagnostic signal in a ReAct agent
- Prompt guardrails + config filters + tool validation = defense in depth against hallucination
If this article helped you, consider following for more practical GenAI engineering content. Building something similar? Share it in the comments.
Author Bio
Suraj Khaitan is a Senior AI Architect and GenAI Engineer specializing in enterprise-scale AI systems, multi-agent orchestration, and cloud-native LLM deployments on AWS. He designs and ships production RAG pipelines, LangGraph-based agent frameworks, and MCP-connected knowledge systems for complex industrial and enterprise domains.
When he's not debugging warm Lambda containers at 2am, he writes about the engineering realities of AI systems that actually have to work in production.
Connect on LinkedIn | Follow for more engineering and architecture write-ups
Follow for more no-fluff GenAI architecture content.