# Building Production-Ready AI Agents: A Practical Guide
Building an AI agent is easy. Building one that runs reliably in production is hard.
At Groovy Web, we've deployed AI agents that handle millions of requests per month, and we've learned that the gap between "works on my machine" and "production-ready" is significant. This guide captures everything we've learned about building AI agents that are reliable, observable, and maintainable.
## Table of Contents
- What Makes an AI Agent "Production-Ready"?
- Architecture Patterns
- AI Agents vs Traditional Automation
- Building Your First Production Agent
- Error Handling and Resilience
- Monitoring and Observability
- Production Readiness Checklist
- Key Takeaways
- Common Anti-Patterns
- Next Steps
## What Makes an AI Agent "Production-Ready"?
A production-ready AI agent isn't just about correct code. It's about:
| Quality | Description |
|---|---|
| Reliability | Handles failures gracefully; fails safe instead of crashing |
| Observability | Every action is logged, traced, and measurable |
| Scalability | Handles traffic spikes without degradation |
| Security | Protects sensitive data, validates inputs |
| Maintainability | Easy to debug, update, and extend |
| Testability | Comprehensive tests for all code paths |
| Cost-efficiency | Optimized token usage and API calls |
### The Production Gap

```python
# Prototype agent (not production-ready)
def simple_agent(query):
    response = llm.invoke(query)
    return response.content  # What could go wrong?
```

```python
# Production agent
import time

async def production_agent(query: str, context: AgentContext) -> AgentResponse:
    """Production-ready agent with full error handling."""
    with tracer.start_as_current_span("agent.execute") as span:
        span.set_attribute("query.length", len(query))
        # OpenTelemetry spans don't expose their duration directly,
        # so measure latency ourselves.
        start = time.monotonic()

        # Validate input
        validated_query = await validate_and_sanitize(query)

        # Execute with retries and timeout
        response = await retry_with_backoff(
            lambda: execute_with_timeout(
                lambda: llm.ainvoke(validated_query),
                timeout_seconds=30
            ),
            max_retries=3
        )

        latency_ms = (time.monotonic() - start) * 1000

        # Log and trace
        logger.info("agent_completed", extra={
            "query_hash": hash_query(validated_query),
            "response_length": len(response.content),
            "tokens_used": response.usage.total_tokens
        })

        return AgentResponse(
            content=response.content,
            metadata=ResponseMetadata(
                model=response.model,
                tokens_used=response.usage.total_tokens,
                latency_ms=latency_ms
            )
        )
```
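The helper functions the production snippet leans on (`validate_and_sanitize`, `execute_with_timeout`, `retry_with_backoff`) are not shown. Here is one possible sketch of the timeout and retry helpers as plain async functions; the names and signatures follow the snippet above, and a fuller decorator variant appears in the Error Handling section:

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")

async def execute_with_timeout(
    factory: Callable[[], Awaitable[T]],
    timeout_seconds: float,
) -> T:
    """Run the coroutine produced by `factory`, cancelling it at the deadline."""
    return await asyncio.wait_for(factory(), timeout=timeout_seconds)

async def retry_with_backoff(
    factory: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 1.0,
) -> T:
    """Retry `factory` with exponential backoff (1s, 2s, 4s, ...)."""
    for attempt in range(max_retries + 1):
        try:
            return await factory()
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted; surface the last error
            await asyncio.sleep(base_delay * (2 ** attempt))
    raise RuntimeError("unreachable")
```

Because both helpers take a zero-argument factory rather than a coroutine, a retried attempt creates a fresh coroutine each time, which is why the snippet wraps the LLM call in a `lambda`.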
## Architecture Patterns

### 1. ReAct Pattern (Reasoning + Acting)

The most common pattern for production agents:
```python
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain.tools import Tool
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI

class ReActAgent:
    """Production ReAct agent with structured tools."""

    def __init__(self, model: str = "gpt-4"):
        self.llm = ChatOpenAI(model=model, temperature=0)
        self.tools = self._setup_tools()
        # create_openai_tools_agent requires a prompt with an
        # `agent_scratchpad` placeholder for intermediate tool calls.
        prompt = ChatPromptTemplate.from_messages([
            ("system", "You are a helpful assistant with access to tools."),
            ("human", "{input}"),
            MessagesPlaceholder("agent_scratchpad"),
        ])
        self.agent = create_openai_tools_agent(self.llm, self.tools, prompt)
        self.executor = AgentExecutor(
            agent=self.agent,
            tools=self.tools,
            max_iterations=5,
            verbose=True,
            handle_parsing_errors=True
        )

    def _setup_tools(self) -> list[Tool]:
        return [
            Tool(
                name="search_database",
                func=self._search_database,
                description="Search the product database for information"
            ),
            Tool(
                name="calculate_metrics",
                func=self._calculate_metrics,
                description="Calculate business metrics from data"
            ),
            Tool(
                name="send_notification",
                func=self._send_notification,
                description="Send a notification to a user or channel"
            )
        ]

    async def execute(self, query: str) -> dict:
        """Execute the agent with error handling."""
        try:
            result = await self.executor.ainvoke({
                "input": query
            })
            return {
                "success": True,
                "output": result["output"],
                "intermediate_steps": result.get("intermediate_steps", [])
            }
        except Exception as e:
            logger.error(f"Agent execution failed: {e}")
            return {
                "success": False,
                "error": str(e),
                "output": None
            }
```
### 2. Multi-Agent Orchestration

For complex tasks, use specialized agents:
```python
from typing import TypedDict

from langgraph.graph import StateGraph, END

class AgentState(TypedDict):
    query: str
    research_result: str
    analysis_result: str
    final_output: str
    next_agent: str

class MultiAgentOrchestrator:
    """Orchestrate multiple specialized agents."""

    def __init__(self):
        self.research_agent = ResearchAgent()
        self.analysis_agent = AnalysisAgent()
        self.writer_agent = WriterAgent()
        self.workflow = self._build_workflow()

    def _build_workflow(self):
        workflow = StateGraph(AgentState)

        # Add nodes
        workflow.add_node("research", self._research_node)
        workflow.add_node("analyze", self._analyze_node)
        workflow.add_node("write", self._write_node)
        workflow.add_node("route", self._route_node)

        # Define edges
        workflow.set_entry_point("route")
        workflow.add_conditional_edges(
            "route",
            self._should_research,
            {
                "research": "research",
                "analyze": "analyze"
            }
        )
        workflow.add_edge("research", "analyze")
        workflow.add_edge("analyze", "write")
        workflow.add_edge("write", END)

        return workflow.compile()

    async def execute(self, query: str) -> dict:
        """Execute the multi-agent workflow."""
        initial_state = AgentState(
            query=query,
            research_result="",
            analysis_result="",
            final_output="",
            next_agent="research"
        )
        result = await self.workflow.ainvoke(initial_state)
        return result
```
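The routing condition `_should_research` is referenced but not shown. A minimal sketch (the function name and state shape follow the example; the routing rule itself is an assumption): route to research only when nothing has been gathered yet, otherwise skip straight to analysis.

```python
from typing import TypedDict

class AgentState(TypedDict):
    query: str
    research_result: str
    analysis_result: str
    final_output: str
    next_agent: str

def should_research(state: AgentState) -> str:
    """Routing condition: the returned string must match a key in the
    conditional-edges mapping ("research" or "analyze")."""
    return "research" if not state["research_result"] else "analyze"
```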
### 3. Hierarchical Agent Pattern

For enterprise-scale systems:

```
                     Coordinator Agent
                            |
           +----------------+----------------+
           |                |                |
    Research Agent    Analysis Agent    Action Agent
           |                |                |
      +----+----+      +----+----+      +----+----+
      |    |    |      |    |    |      |    |    |
     Web  DB  API   Stats  ML  Viz  Email Slack  DB
```
## AI Agents vs Traditional Automation
| Aspect | Traditional Automation | AI Agents |
|---|---|---|
| Decision Making | Rule-based, explicit | Context-aware, adaptive |
| Edge Cases | Must be pre-programmed | Handles many gracefully, though non-deterministically |
| Maintenance | Update rules manually | Improve with examples |
| Complexity Cost | Grows linearly with rule count | Roughly flat; prompts and context absorb complexity |
| Flexibility | Rigid, predictable | Flexible, probabilistic |
| Debugging | Traceable, deterministic | Requires logging & tracing |
| Cost Profile | Fixed infrastructure | Per-query token costs |
| Best For | Repetitive, well-defined tasks | Complex, variable tasks |
### When to Use Each
**Use Traditional Automation when:**
- Task is fully deterministic
- Rules are well-defined and stable
- 100% predictability is required
- Cost sensitivity is high
- Regulatory compliance demands audit trails
**Use AI Agents when:**
- Task requires judgment or reasoning
- Input variability is high
- Edge cases are numerous
- Natural language understanding is needed
- Adaptability is valuable
## Building Your First Production Agent

Let's build a complete production-ready customer support agent:
```python
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

from langchain_openai import ChatOpenAI
from opentelemetry import trace

# Configure logging and tracing
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
tracer = trace.get_tracer(__name__)

@dataclass
class CustomerContext:
    """Customer context for personalized responses."""
    customer_id: str
    tier: str  # free, pro, enterprise
    history: list[dict]
    current_issue: Optional[str] = None

@dataclass
class AgentResponse:
    """Structured agent response."""
    content: str
    confidence: float
    actions_taken: list[str]
    escalation_needed: bool
    metadata: dict

class ProductionSupportAgent:
    """Production-ready customer support agent."""

    def __init__(self, config: dict):
        self.llm = ChatOpenAI(
            model=config.get("model", "gpt-4"),
            temperature=config.get("temperature", 0.1)
        )
        self.max_tokens = config.get("max_tokens", 2000)
        self.timeout_seconds = config.get("timeout", 30)

        # Initialize tools
        self.knowledge_base = KnowledgeBaseTool()
        self.ticket_system = TicketSystemTool()
        self.notification_service = NotificationTool()

        # Rate limiting
        self.rate_limiter = RateLimiter(
            requests_per_minute=config.get("rpm_limit", 60)
        )

    async def handle_query(
        self,
        query: str,
        context: CustomerContext
    ) -> AgentResponse:
        """Handle a customer support query."""
        with tracer.start_as_current_span("support_agent.handle_query") as span:
            span.set_attribute("customer.id", context.customer_id)
            span.set_attribute("customer.tier", context.tier)
            start_time = datetime.now()
            try:
                # Rate limiting check
                await self.rate_limiter.acquire()

                # Build context-aware prompt
                system_prompt = self._build_system_prompt(context)
                messages = self._build_messages(system_prompt, query, context)

                # Execute with timeout
                response = await asyncio.wait_for(
                    self.llm.ainvoke(messages),
                    timeout=self.timeout_seconds
                )

                # Process response
                parsed_response = self._parse_response(response.content)

                # Take any required actions
                actions = await self._execute_actions(
                    parsed_response.actions,
                    context
                )

                # Log success
                duration_ms = (datetime.now() - start_time).total_seconds() * 1000
                logger.info("query_completed", extra={
                    "customer_id": context.customer_id,
                    "duration_ms": duration_ms,
                    "actions_count": len(actions),
                    "escalation": parsed_response.escalation_needed
                })

                return AgentResponse(
                    content=parsed_response.content,
                    confidence=parsed_response.confidence,
                    actions_taken=[a["name"] for a in actions],
                    escalation_needed=parsed_response.escalation_needed,
                    metadata={
                        "duration_ms": duration_ms,
                        # ChatOpenAI returns an AIMessage; the model name and
                        # token usage live in its metadata fields.
                        "model": response.response_metadata.get("model_name"),
                        "tokens": response.usage_metadata["total_tokens"]
                    }
                )
            except asyncio.TimeoutError:
                logger.error("query_timeout", extra={
                    "customer_id": context.customer_id
                })
                return self._error_response(
                    "Request timed out. Please try again.",
                    escalate=True
                )
            except Exception as e:
                logger.exception("query_failed", extra={
                    "customer_id": context.customer_id,
                    "error": str(e)
                })
                return self._error_response(
                    "An error occurred. Escalating to human support.",
                    escalate=True
                )

    def _build_system_prompt(self, context: CustomerContext) -> str:
        """Build context-aware system prompt."""
        base_prompt = """You are a helpful customer support agent.
Always be professional, empathetic, and solution-oriented.

Response Format:
{
    "content": "Your response to the customer",
    "confidence": 0.0-1.0,
    "actions": ["action1", "action2"],
    "escalation_needed": true/false,
    "reasoning": "Brief explanation of your response"
}
"""
        tier_prompts = {
            "enterprise": "This is an enterprise customer. Prioritize their request.",
            "pro": "This is a pro customer. Provide detailed, helpful responses.",
            "free": "This is a free tier user. Be helpful but concise."
        }
        return f"{base_prompt}\n\n{tier_prompts.get(context.tier, '')}"

    def _build_messages(
        self,
        system_prompt: str,
        query: str,
        context: CustomerContext
    ) -> list[dict]:
        """Build the message list for the LLM."""
        messages = [{"role": "system", "content": system_prompt}]

        # Add relevant history (last 5 interactions)
        for interaction in context.history[-5:]:
            messages.append({
                "role": "user",
                "content": interaction["query"]
            })
            messages.append({
                "role": "assistant",
                "content": interaction["response"]
            })

        # Add current query
        messages.append({"role": "user", "content": query})
        return messages
```
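The example relies on a `_parse_response` helper that is never shown. Since the system prompt asks the model for a JSON object, a reasonable sketch parses that JSON and falls back safely on malformed output (the `ParsedResponse` shape and field names are assumptions matching the prompt's Response Format):

```python
import json
from dataclasses import dataclass, field

@dataclass
class ParsedResponse:
    content: str
    confidence: float = 0.0
    actions: list = field(default_factory=list)
    escalation_needed: bool = False

def parse_response(raw: str) -> ParsedResponse:
    """Parse the model's JSON reply; never trust it to be well-formed."""
    try:
        data = json.loads(raw)
        if not isinstance(data, dict):
            raise ValueError("expected a JSON object")
        return ParsedResponse(
            content=data.get("content", ""),
            confidence=float(data.get("confidence", 0.0)),
            actions=data.get("actions", []),
            escalation_needed=bool(data.get("escalation_needed", False)),
        )
    except (json.JSONDecodeError, TypeError, ValueError):
        # Malformed output: return the raw text and flag for human review.
        return ParsedResponse(content=raw, confidence=0.0, escalation_needed=True)
```

This is the "trusting LLM output blindly" anti-pattern in reverse: the parser assumes the model will sometimes produce prose instead of JSON and degrades to escalation rather than crashing.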
## Error Handling and Resilience

### 1. Retry with Exponential Backoff

```python
import asyncio
from functools import wraps
from typing import Type, Tuple

def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exceptions: Tuple[Type[Exception], ...] = (Exception,)
):
    """Decorator for retry with exponential backoff."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    if attempt == max_retries:
                        logger.error(f"All retries exhausted: {e}")
                        raise
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    logger.warning(
                        f"Attempt {attempt + 1} failed, "
                        f"retrying in {delay}s: {e}"
                    )
                    await asyncio.sleep(delay)
            raise last_exception
        return wrapper
    return decorator

# Usage — RateLimitError and APIError come from your provider SDK,
# e.g. `from openai import RateLimitError, APIError`
@retry_with_backoff(max_retries=3, exceptions=(RateLimitError, APIError))
async def call_llm(prompt: str) -> str:
    return await llm.ainvoke(prompt)
```
### 2. Circuit Breaker Pattern

```python
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional

class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    """Circuit breaker for external service calls."""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.state = CircuitState.CLOSED
        self.last_failure_time: Optional[datetime] = None

    async def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if self._should_attempt_recovery():
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitOpenError("Circuit breaker is open")
        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _should_attempt_recovery(self) -> bool:
        if self.last_failure_time is None:
            return True
        return datetime.now() - self.last_failure_time > timedelta(
            seconds=self.recovery_timeout
        )

    def _on_success(self):
        self.failures = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        self.failures += 1
        self.last_failure_time = datetime.now()
        if self.failures >= self.failure_threshold:
            self.state = CircuitState.OPEN
            logger.warning("Circuit breaker opened due to failures")
```
### 3. Graceful Degradation

```python
class ResilientAgent:
    """Agent with graceful degradation capabilities."""

    def __init__(self):
        self.primary_llm = ChatOpenAI(model="gpt-4")
        self.fallback_llm = ChatOpenAI(model="gpt-3.5-turbo")
        self.cache = ResponseCache()

    async def execute(self, query: str) -> str:
        """Execute with multiple fallback strategies."""
        # Try cache first
        cached = await self.cache.get(query)
        if cached:
            return cached

        # Try primary model
        try:
            response = await self.primary_llm.ainvoke(query)
            await self.cache.set(query, response.content)
            return response.content
        except Exception as e:
            logger.warning(f"Primary model failed: {e}")

        # Fallback to cheaper model
        try:
            response = await self.fallback_llm.ainvoke(query)
            await self.cache.set(query, response.content)
            return response.content
        except Exception as e:
            logger.error(f"Fallback model failed: {e}")

        # Return safe default
        return self._safe_default_response(query)
```
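`ResponseCache` is assumed above rather than defined. A minimal in-memory sketch with a TTL matches the `get`/`set` interface the agent uses (in production you would likely back this with Redis or similar; the class body here is illustrative):

```python
import time
from typing import Optional

class ResponseCache:
    """Minimal in-memory TTL cache keyed by query string."""

    def __init__(self, ttl_seconds: float = 300.0):
        self.ttl = ttl_seconds
        self._store: dict[str, tuple[float, str]] = {}

    async def get(self, key: str) -> Optional[str]:
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if time.monotonic() > expires_at:
            del self._store[key]  # lazily evict expired entries
            return None
        return value

    async def set(self, key: str, value: str) -> None:
        self._store[key] = (time.monotonic() + self.ttl, value)
```

Note that caching LLM responses only pays off for exact-match repeat queries; semantic caching (embedding-based lookup) is a separate, heavier technique.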
## Monitoring and Observability

### 1. Structured Logging

```python
import structlog

logger = structlog.get_logger()

class ObservableAgent:
    """Agent with comprehensive observability."""

    async def execute(self, query: str, context: dict) -> dict:
        log = logger.bind(
            agent_id=self.agent_id,
            session_id=context.get("session_id"),
            user_id=context.get("user_id")
        )
        log.info("agent_execution_started", query_length=len(query))
        try:
            result = await self._execute_internal(query, context)
            log.info(
                "agent_execution_completed",
                result_length=len(result["content"]),
                tokens_used=result.get("tokens", 0),
                duration_ms=result.get("duration_ms", 0)
            )
            return result
        except Exception as e:
            log.error(
                "agent_execution_failed",
                error_type=type(e).__name__,
                error_message=str(e)
            )
            raise
```
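structlog is the right tool here, but if you cannot take a dependency, the same one-JSON-object-per-line output can be sketched with the standard library alone (the `ctx` convention for passing context via `extra` is an assumption of this sketch, not a stdlib feature):

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "event": record.getMessage(),
            # Context passed as `extra={"ctx": {...}}` is merged into the line.
            **getattr(record, "ctx", {}),
        }
        return json.dumps(payload)

def make_json_logger(name: str = "agent") -> logging.Logger:
    logger = logging.getLogger(name)
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger
```

Machine-parseable log lines are what make the correlation IDs in the checklist below actually queryable.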
### 2. Metrics Collection

```python
import time

from prometheus_client import Counter, Histogram, Gauge

# Define metrics
AGENT_REQUESTS = Counter(
    'agent_requests_total',
    'Total agent requests',
    ['agent_name', 'status']
)
AGENT_LATENCY = Histogram(
    'agent_latency_seconds',
    'Agent request latency',
    ['agent_name']
)
AGENT_TOKENS = Counter(
    'agent_tokens_total',
    'Total tokens consumed',
    ['agent_name', 'model']
)
ACTIVE_CONVERSATIONS = Gauge(
    'active_conversations',
    'Number of active conversations'
)

class MetricsAgent:
    """Agent with Prometheus metrics."""

    async def execute(self, query: str) -> str:
        start_time = time.time()
        try:
            response = await self._execute(query)

            # Record metrics
            AGENT_REQUESTS.labels(
                agent_name=self.name,
                status='success'
            ).inc()
            AGENT_LATENCY.labels(
                agent_name=self.name
            ).observe(time.time() - start_time)
            AGENT_TOKENS.labels(
                agent_name=self.name,
                model=self.model
            ).inc(response.usage.total_tokens)

            return response.content
        except Exception:
            AGENT_REQUESTS.labels(
                agent_name=self.name,
                status='error'
            ).inc()
            raise
```
## Production Readiness Checklist

### Infrastructure

- [ ] API rate limiting configured
- [ ] Circuit breakers implemented for external services
- [ ] Timeout handling for all async operations
- [ ] Graceful shutdown handling
- [ ] Health check endpoints exposed

### Reliability

- [ ] Retry logic with exponential backoff
- [ ] Fallback strategies for critical paths
- [ ] Input validation and sanitization
- [ ] Output validation and filtering
- [ ] Dead letter queues for failed messages

### Observability

- [ ] Structured logging with correlation IDs
- [ ] Request/response tracing
- [ ] Performance metrics (latency, throughput)
- [ ] Error rate monitoring
- [ ] Token usage tracking
- [ ] Cost monitoring alerts

### Security

- [ ] Input sanitization for prompts
- [ ] Output filtering for sensitive data
- [ ] API key rotation strategy
- [ ] Rate limiting per user/tenant
- [ ] Audit logging for compliance

### Testing

- [ ] Unit tests for all components
- [ ] Integration tests for workflows
- [ ] Load testing for expected traffic
- [ ] Chaos testing for resilience
- [ ] Prompt injection tests

### Operations

- [ ] Runbooks for common incidents
- [ ] Alerting thresholds defined
- [ ] On-call rotation established
- [ ] Capacity planning documented
- [ ] Disaster recovery plan tested
## Key Takeaways

1. **Error handling is non-negotiable.** Every external call needs timeouts, retries, and fallbacks.
2. **Observability must be built in.** Add structured logging, metrics, and tracing from day one.
3. **Rate limiting protects everyone.** It prevents cascading failures and cost overruns.
4. **Circuit breakers stop failures from spreading.** Fail fast when dependencies are unhealthy.
5. **Graceful degradation beats hard failures.** Always have a fallback plan.
6. **Testing is harder but more important.** Test edge cases, failure modes, and performance.
7. **Cost monitoring is critical.** Token costs can spiral quickly without visibility.
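Cost visibility can start as a one-line estimator fed by the token counts you are already logging. The rates below are illustrative placeholders, not current pricing; check your provider's price sheet.

```python
def estimate_cost_usd(
    prompt_tokens: int,
    completion_tokens: int,
    prompt_rate: float = 2.50,       # USD per 1M prompt tokens (illustrative)
    completion_rate: float = 10.00,  # USD per 1M completion tokens (illustrative)
) -> float:
    """Estimate the dollar cost of a single LLM call from its token counts."""
    return (prompt_tokens * prompt_rate
            + completion_tokens * completion_rate) / 1_000_000
```

Summing this per tenant and alerting on a daily threshold is often the cheapest "cost monitoring" you can ship on day one.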
## Common Anti-Patterns

### Mistakes to Avoid
1. Synchronous External Calls
- Problem: Blocking calls kill throughput
- Solution: Always use async/await
2. No Timeout Handling
- Problem: LLM calls can hang indefinitely
- Solution: Every external call needs a timeout
3. Ignoring Token Limits
- Problem: Context window overflow errors
- Solution: Truncate or chunk your inputs
4. Storing Sensitive Data in Prompts
- Problem: LLM logs may persist credentials or PII
- Solution: Never put sensitive data in prompts
5. No Rate Limiting
- Problem: One heavy user degrades service for everyone
- Solution: Implement per-user rate limiting
6. Trusting LLM Output Blindly
- Problem: Malformed or malicious outputs
- Solution: Always validate and sanitize outputs
7. Monolithic Agent Design
- Problem: Complex agents become unmaintainable
- Solution: Split into specialized sub-agents
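The per-user rate limiting called for in anti-pattern 5 can be as small as a token bucket. This sketch matches the `RateLimiter(requests_per_minute=...)` / `acquire()` interface used in the support-agent example; that interface is this article's convention, not a library API.

```python
import asyncio
import time

class RateLimiter:
    """Token-bucket limiter: at most `requests_per_minute` acquisitions,
    refilled continuously rather than in fixed windows."""

    def __init__(self, requests_per_minute: int = 60):
        self.capacity = float(requests_per_minute)
        self.tokens = float(requests_per_minute)
        self.refill_rate = requests_per_minute / 60.0  # tokens per second
        self.updated = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        async with self._lock:
            while True:
                now = time.monotonic()
                # Refill based on elapsed time, capped at bucket capacity.
                self.tokens = min(
                    self.capacity,
                    self.tokens + (now - self.updated) * self.refill_rate,
                )
                self.updated = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                # Sleep until roughly one token has been refilled.
                await asyncio.sleep((1 - self.tokens) / self.refill_rate)
```

For per-user limiting, keep one bucket per user ID in a dict (or in Redis for multi-process deployments) and call `acquire()` on that user's bucket.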
## Next Steps

### Ready to Build Production Agents?
At Groovy Web, we help companies build and deploy AI agents that handle millions of requests reliably. Our methodology combines:
- Proven architecture patterns refined through production deployments
- Comprehensive monitoring with custom dashboards and alerts
- Cost optimization strategies that reduce token usage by 40-60%
- Starting at $22/hr for development support
### What We Offer
- Agent Architecture Review — Evaluate your current approach
- Production Deployment — Get your agent to production fast
- Monitoring Setup — Full observability stack
- Ongoing Support — Continuous improvement and optimization
Related Articles:
- Building Multi-Agent Systems with LangChain
- AI-First Development: How to Build Software 10-20X Faster
- RAG Systems in Production
Published: February 19, 2026 | Author: Groovy Web Team | Category: AI Development