The Production Agent Checklist: What Every AI Agent Needs Before It Touches Real Users
Most AI agents that reach production aren't ready for it. They work in demos. They pass the tests the developer wrote. Then they hit real users and start failing in ways that are hard to detect and harder to debug.
This is a practical checklist. Not "10 tips to improve your AI," not a sales pitch — a real pre-flight list for teams shipping Python agents to production. Work through it before you flip the traffic switch.
1. Error Handling That Actually Handles Errors
The wrong version:
def call_llm(prompt: str) -> str:
response = openai.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
This crashes on rate limits, network errors, and API outages. It can also return an empty string or None when the model responds in an unexpected format — which happens more often than you'd think.
The right version:
import time
import openai
import logging
from typing import Optional
logger = logging.getLogger(__name__)
def call_llm(prompt: str, max_retries: int = 3) -> Optional[str]:
last_error = None
for attempt in range(max_retries):
try:
response = openai.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
timeout=30
)
content = response.choices[0].message.content
if not content or not content.strip():
logger.warning(f"Empty response on attempt {attempt + 1}")
continue
return content
except openai.RateLimitError as e:
last_error = e
if attempt < max_retries - 1: # don't sleep after the final attempt
wait = 2 ** attempt # exponential backoff
logger.warning(f"Rate limited, waiting {wait}s (attempt {attempt + 1})")
time.sleep(wait)
except openai.APITimeoutError as e:
logger.warning(f"Timeout on attempt {attempt + 1}")
last_error = e
except openai.APIError as e:
logger.error(f"API error: {e}")
last_error = e
break # Don't retry on 4xx errors
logger.error(f"All attempts failed. Last error: {last_error}")
return None
Checklist items here:
- [ ] Rate limit errors trigger exponential backoff, not immediate re-raise
- [ ] Timeout is set explicitly — don't rely on the SDK default (some have none)
- [ ] Empty/null responses are handled, not silently passed downstream
- [ ] 4xx errors (bad request, auth failure) are not retried
- [ ] All failures are logged with enough context to debug
2. Retry Logic With Jitter
Exponential backoff without jitter causes a thundering herd: all your retrying clients hit the API at the same time, get rate limited again, back off by the same amount, and pile up again.
import random
import time
def backoff_with_jitter(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
"""Full jitter: random value between 0 and min(cap, base * 2^attempt)"""
return random.uniform(0, min(cap, base * (2 ** attempt)))
# Usage
max_retries = 5
for attempt in range(max_retries):
try:
result = call_llm(prompt)
break
except openai.RateLimitError:
if attempt < max_retries - 1:
time.sleep(backoff_with_jitter(attempt))
The tenacity library handles this well if you don't want to roll it yourself:
from tenacity import retry, stop_after_attempt, wait_random_exponential, retry_if_exception_type
@retry(
retry=retry_if_exception_type(openai.RateLimitError),
wait=wait_random_exponential(min=1, max=60),
stop=stop_after_attempt(5)
)
def call_llm_with_retry(prompt: str) -> str:
response = openai.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
Checklist:
- [ ] Retries use jitter, not pure exponential backoff
- [ ] Max retry count is set (don't retry forever)
- [ ] Total retry budget (max wait time) is bounded
- [ ] Retry logic is not duplicated across call sites — centralize it
3. Fallback Paths
A fallback is a different execution path you switch to when the primary path fails. This is distinct from retrying — retrying hits the same path again; fallbacks try something different.
Common fallback patterns:
from typing import Optional
def extract_with_gpt4o(text: str) -> Optional[dict]:
# Primary path
...
def extract_with_claude(text: str) -> Optional[dict]:
# Fallback path
...
def extract_with_regex(text: str) -> Optional[dict]:
# Last-resort deterministic fallback
import re
# Simple pattern matching — less capable but always works
...
def extract_order(text: str) -> dict:
result = extract_with_gpt4o(text)
if result:
return result
logger.warning("GPT-4o extraction failed, trying Claude")
result = extract_with_claude(text)
if result:
return result
logger.warning("Claude extraction failed, trying regex fallback")
result = extract_with_regex(text)
if result:
return result
raise ValueError("All extraction paths failed")
This is better than no fallback, but it has a serious problem: the fallback selection is static. You wrote it once, and it stays that way forever. If Claude starts outperforming GPT-4o in production, your code still tries GPT-4o first every time.
We'll address this in Post 3 on dynamic routing, but the checklist item here is simply: do you have a fallback at all?
Checklist:
- [ ] Every LLM call has at least one fallback path
- [ ] The fallback is tested independently — don't assume it works because the primary did
- [ ] There's a final fallback that always returns something (even if degraded)
- [ ] Fallback activation is logged and visible in your metrics
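The third item above is worth making concrete: the extraction example raises ValueError when every path fails, which pushes the failure onto every caller. A degraded final default keeps callers on the happy path. The sentinel shape and function names below are illustrative:

```python
from typing import Callable, Optional

# Sentinel returned when every extraction path fails — degraded, but the
# caller always gets a dict with a known shape (shape is illustrative).
DEGRADED_RESULT = {"item": None, "quantity": None, "address": None, "degraded": True}

def extract_order_safe(text: str, extractors: list) -> dict:
    """Try each extraction path in order; fall back to a degraded sentinel."""
    for fn in extractors:
        result = fn(text)
        if result:
            return result
    return dict(DEGRADED_RESULT)  # caller checks result.get("degraded")
```

Downstream code checks `result.get("degraded")` instead of wrapping every call site in try/except.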
4. Outcome Tracking
This is the one most teams skip, and it's the one that matters most for long-term reliability.
Logging requests and responses is not outcome tracking. Outcome tracking is recording whether the agent achieved its goal for each request.
import time
from dataclasses import dataclass
from typing import Optional, Any
@dataclass
class AgentOutcome:
request_id: str
task: str
success: bool
path_used: str # which model/tool combination
latency_ms: float
input_tokens: Optional[int]
output_tokens: Optional[int]
error: Optional[str]
metadata: dict
def track_outcome(outcome: AgentOutcome):
# Send to your metrics system
# Could be Datadog, Prometheus, a database, whatever
metrics.increment(
"agent.outcome",
tags=[
f"task:{outcome.task}",
f"success:{outcome.success}",
f"path:{outcome.path_used}"
]
)
if outcome.latency_ms > 5000:
metrics.increment("agent.slow_request", tags=[f"task:{outcome.task}"])
The key is defining "success" programmatically. For every agent task, you need to be able to answer: did this work?
def is_extraction_successful(result: Optional[dict]) -> bool:
if not result:
return False
required_fields = {"item", "quantity", "address"}
return required_fields.issubset(result.keys()) and all(result[f] for f in required_fields)
# After every extraction:
success = is_extraction_successful(result)
track_outcome(AgentOutcome(
request_id=request_id,
task="order-extraction",
success=success,
path_used="gpt-4o",
latency_ms=elapsed_ms,
...
))
Checklist:
- [ ] Every agent task has a programmatic success function
- [ ] Success/failure is recorded per request, not just per error
- [ ] You can query: "what's the success rate for this task in the last hour?"
- [ ] Outcome data includes which path was used (model, tool, params)
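The "success rate in the last hour" question from the checklist should be a one-line query against whatever store holds your outcomes. A minimal in-memory sketch — in production this would be a query against your metrics backend, and the names here are illustrative:

```python
import time
from dataclasses import dataclass
from typing import Optional

@dataclass
class OutcomeRecord:
    task: str
    success: bool
    timestamp: float

class OutcomeStore:
    """In-memory outcome store — a stand-in for a real metrics backend."""
    def __init__(self):
        self.records = []

    def record(self, task: str, success: bool, timestamp: Optional[float] = None):
        ts = timestamp if timestamp is not None else time.time()
        self.records.append(OutcomeRecord(task, success, ts))

    def success_rate(self, task: str, window_seconds: float = 3600) -> Optional[float]:
        """Fraction of successful requests for a task over the trailing window."""
        cutoff = time.time() - window_seconds
        relevant = [r for r in self.records if r.task == task and r.timestamp >= cutoff]
        if not relevant:
            return None
        return sum(r.success for r in relevant) / len(relevant)
```

If `store.success_rate("order-extraction")` is not answerable in your stack today, that checklist box stays unchecked.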
5. Cost Monitoring
LLM costs are variable and can spike unexpectedly. An agent bug that causes excessive retrying or unusually long prompts can cost you serious money before you notice.
from dataclasses import dataclass
# Rough cost per 1K tokens (check current pricing)
COST_PER_1K_TOKENS = {
"gpt-4o": {"input": 0.0025, "output": 0.010},
"gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
"claude-3-5-sonnet-20241022": {"input": 0.003, "output": 0.015},
"claude-3-haiku-20240307": {"input": 0.00025, "output": 0.00125},
}
def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
if model not in COST_PER_1K_TOKENS:
return 0.0
rates = COST_PER_1K_TOKENS[model]
return (input_tokens / 1000 * rates["input"]) + (output_tokens / 1000 * rates["output"])
def call_with_cost_tracking(prompt: str, model: str = "gpt-4o") -> tuple[str, float]:
response = openai.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}]
)
usage = response.usage
cost = estimate_cost(model, usage.prompt_tokens, usage.completion_tokens)
# Alert if single call is unusually expensive
if cost > 0.10: # $0.10 threshold — tune for your use case
logger.warning(f"Expensive LLM call: ${cost:.4f} ({usage.prompt_tokens} input, {usage.completion_tokens} output)")
return response.choices[0].message.content, cost
Checklist:
- [ ] Token usage is recorded for every LLM call
- [ ] Cost is estimated per call and aggregated per task type
- [ ] Alert thresholds exist for abnormal cost spikes
- [ ] You know your expected cost per 1000 requests before launch
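The last item — knowing cost per 1000 requests before launch — falls out of the same rate table plus average token counts measured on a sample of representative requests. A quick sketch; the averages are assumptions you'd measure yourself:

```python
# Mirrors the gpt-4o rates from the table above (USD per 1K tokens).
RATES = {"gpt-4o": {"input": 0.0025, "output": 0.010}}

def cost_per_1000_requests(model: str, avg_input_tokens: float, avg_output_tokens: float) -> float:
    """Projected cost of 1000 requests given average token counts per request."""
    r = RATES[model]
    per_call = (avg_input_tokens / 1000) * r["input"] + (avg_output_tokens / 1000) * r["output"]
    return per_call * 1000
```

At roughly 800 input and 200 output tokens per request on gpt-4o, that works out to $0.004 per call, or about $4 per 1000 requests.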
6. Observability vs. Reliability — Don't Confuse Them
This is where teams make a category error.
Observability tools (LangSmith, Langfuse, Helicone, Weights & Biases) give you visibility into what's happening. Traces, spans, prompt logs, output comparison. They're genuinely useful for debugging and evaluation. Use them.
Reliability tools ensure the agent keeps working when things go wrong. Retries, fallbacks, circuit breakers, outcome-based routing. These operate at request time, not review time.
The difference: observability tells you your agent is failing. Reliability keeps it from failing, or recovers it automatically.
Here's an honest comparison of tools that often get conflated:
| | Kalibr | LangSmith | OpenRouter |
|---|---|---|---|
| Primary purpose | Outcome-based path routing | Tracing, evaluation, debugging | Model gateway (cost/latency) |
| Adapts at runtime? | Yes — reroutes based on outcomes | No — dashboards for humans | Partial — routes by cost/latency, not outcomes |
| Success signal | Your programmatic success function | Human eval / labeled data | None (cost and latency only) |
| When it helps | Model degrades, tool fails, path breaks in production | Debugging why something failed, evaluating prompt quality | Reducing cost, hitting multiple providers |
| Requires human? | No — adapts automatically | Yes — someone looks at the dashboard | No |
| Learning mechanism | Thompson Sampling on outcome signals | N/A | Static rules or weighted routing |
These are not competing tools. A production agent might legitimately use all three:
- LangSmith for tracing and offline evaluation
- OpenRouter for provider flexibility and cost management
- Kalibr for outcome-based routing that adapts when things degrade
See Kalibr's docs for how the SDK fits into an existing stack.
7. Output Validation
Never trust LLM output directly. Validate it before passing it to anything downstream.
import json
from pydantic import BaseModel, ValidationError
from typing import Optional
class OrderData(BaseModel):
item: str
quantity: int
address: str
notes: Optional[str] = None
def parse_and_validate_order(llm_output: str) -> Optional[OrderData]:
# Clean up common formatting issues
content = llm_output.strip()
# Strip markdown code fences
if content.startswith("```"):
lines = content.split("\n")
content = "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:])
try:
data = json.loads(content)
return OrderData(**data)
except json.JSONDecodeError as e:
logger.warning(f"JSON parse failed: {e}. Raw: {content[:200]}")
return None
except ValidationError as e:
logger.warning(f"Schema validation failed: {e}")
return None
Checklist:
- [ ] LLM outputs are validated against an expected schema before use
- [ ] JSON parsing failures are handled gracefully (logged, not raised)
- [ ] Pydantic or equivalent schema validation is in the request path
- [ ] Partial/empty outputs don't propagate as valid results
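One pattern worth layering on top of validation: a single bounded "repair" re-ask that feeds the failure back to the model. Everything below is an illustrative sketch — `validate_order` is a simplified stand-in for the pydantic validator above, and `call_model` is whatever LLM wrapper you already have:

```python
import json
from typing import Callable, Optional

REQUIRED_FIELDS = {"item", "quantity", "address"}

def validate_order(raw: str) -> Optional[dict]:
    """Parse + schema-check; simplified stand-in for full pydantic validation."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict) or not REQUIRED_FIELDS.issubset(data):
        return None
    return data

def extract_with_repair(text: str, call_model: Callable[[str], str]) -> Optional[dict]:
    raw = call_model(f"Extract order as JSON (item, quantity, address): {text}")
    result = validate_order(raw)
    if result is not None:
        return result
    # Exactly one repair attempt — unbounded re-asking burns tokens fast
    raw = call_model(
        "Previous output failed validation. Return ONLY valid JSON with "
        f"fields item, quantity, address. Input: {text}"
    )
    return validate_order(raw)
```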
8. Rate Limiting and Circuit Breakers
Your agent should protect the APIs it calls, not just itself.
from collections import deque
import time
from threading import Lock
class CircuitBreaker:
def __init__(self, failure_threshold: int = 5, timeout: float = 60.0):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.failures = 0
self.last_failure_time = 0
self.state = "closed" # closed = normal, open = blocking, half-open = testing
self._lock = Lock()
def call(self, func, *args, **kwargs):
with self._lock:
if self.state == "open":
if time.time() - self.last_failure_time > self.timeout:
self.state = "half-open"
else:
raise Exception("Circuit breaker open — service unavailable")
try:
result = func(*args, **kwargs)
with self._lock:
self.failures = 0 # any success resets the failure count
if self.state == "half-open":
self.state = "closed"
return result
except Exception as e:
with self._lock:
self.failures += 1
self.last_failure_time = time.time()
if self.failures >= self.failure_threshold:
self.state = "open"
logger.error(f"Circuit breaker opened after {self.failures} failures")
raise
# Usage
openai_breaker = CircuitBreaker(failure_threshold=5, timeout=30)
def call_openai_safe(prompt: str) -> str:
return openai_breaker.call(call_llm, prompt)
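Circuit breakers protect you from a failing dependency; application-level rate limiting protects a healthy dependency from your own traffic spikes. A minimal token-bucket sketch — the class name and numbers are illustrative, and multi-process agents would need a shared store instead:

```python
import time
from threading import Lock

class TokenBucket:
    """Allows `rate` calls per second with bursts up to `capacity`."""
    def __init__(self, rate: float, capacity: float):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last = time.monotonic()
        self._lock = Lock()

    def acquire(self) -> bool:
        """Non-blocking: True if the call may proceed now, False if throttled."""
        with self._lock:
            now = time.monotonic()
            # Refill proportionally to elapsed time, capped at capacity
            self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
            self.last = now
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False
```

Callers that get False can queue, shed load, or sleep briefly — the point is that throttling happens before the request ever leaves your process.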
Checklist:
- [ ] Circuit breakers prevent cascading failures to downstream APIs
- [ ] Rate limiting is applied at the application level, not just left to the API's 429 responses
- [ ] Breaker state is monitored — an open circuit breaker is an alert condition
9. Timeouts Everywhere
This is short because it's simple: set explicit timeouts on everything.
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Optional
async def call_with_timeout(prompt: str, timeout_seconds: float = 30) -> Optional[str]:
loop = asyncio.get_running_loop()
with ThreadPoolExecutor() as executor:
try:
result = await asyncio.wait_for(
loop.run_in_executor(executor, call_llm, prompt),
timeout=timeout_seconds
)
return result
except asyncio.TimeoutError:
logger.warning(f"LLM call timed out after {timeout_seconds}s")
return None
Checklist:
- [ ] Every external call (LLM, API, database) has an explicit timeout
- [ ] Timeouts are appropriate for the operation (not a blanket 30s — fast operations should time out faster)
- [ ] Timeout failures are counted separately from other failures in metrics
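One way to keep the second item honest is a central timeout budget per operation type, so no call site silently inherits a long default. The operation names and numbers below are illustrative:

```python
# Per-operation timeout budgets in seconds — tune to your own p99 latencies.
TIMEOUT_BUDGETS = {
    "llm_completion": 30.0,
    "embedding": 5.0,
    "database": 2.0,
    "internal_api": 1.0,
}

def timeout_for(operation: str) -> float:
    """Fail loudly on unknown operations instead of defaulting to something long."""
    if operation not in TIMEOUT_BUDGETS:
        raise KeyError(f"No timeout budget defined for operation: {operation}")
    return TIMEOUT_BUDGETS[operation]
```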
Putting It Together: The Minimum Viable Production Agent
Here's what a minimal production-ready agent looks like, integrating the items above:
import kalibr # First — before any model SDK imports
import openai
import time
import logging
import json
from typing import Optional
from pydantic import BaseModel, ValidationError
logger = logging.getLogger(__name__)
class ExtractionResult(BaseModel):
item: str
quantity: int
address: str
def success_fn(result: Optional[ExtractionResult]) -> bool:
return result is not None
def extract_gpt4o(text: str) -> Optional[ExtractionResult]:
try:
response = openai.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "Extract order fields as JSON: item, quantity, address"},
{"role": "user", "content": text}
],
timeout=20
)
content = response.choices[0].message.content.strip()
content = content.removeprefix("```json").removeprefix("```").removesuffix("```").strip()
return ExtractionResult(**json.loads(content))
except Exception as e:
logger.warning(f"GPT-4o extraction error: {e}")
return None
def extract_claude(text: str) -> Optional[ExtractionResult]:
try:
import anthropic
ac = anthropic.Anthropic()
response = ac.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=512,
messages=[{"role": "user", "content": f"Extract as JSON (item, quantity, address): {text}"}]
)
content = response.content[0].text.strip()
content = content.removeprefix("```json").removeprefix("```").removesuffix("```").strip()
return ExtractionResult(**json.loads(content))
except Exception as e:
logger.warning(f"Claude extraction error: {e}")
return None
# Kalibr router: outcome-based routing between paths
router = kalibr.Router(
paths=[extract_gpt4o, extract_claude],
success_fn=success_fn,
task="order-extraction"
)
def process_order(text: str) -> Optional[ExtractionResult]:
start = time.time()
result = router.run(text)
elapsed_ms = (time.time() - start) * 1000
logger.info(f"Extraction {'succeeded' if result else 'failed'} in {elapsed_ms:.0f}ms")
return result
This isn't complete production code — you'd add cost tracking, circuit breakers, and proper metrics. But it covers the core: validated output, multiple paths, outcome-aware routing that adapts automatically.
The Checklist, Condensed
Error handling:
- [ ] Specific exception types caught and handled differently
- [ ] Empty/null outputs handled before returning
- [ ] All errors logged with context
Retries:
- [ ] Exponential backoff with jitter
- [ ] Max retry count bounded
- [ ] 4xx errors not retried
Fallbacks:
- [ ] Every LLM call has at least one fallback
- [ ] Fallback activation is logged
Outcome tracking:
- [ ] Success function defined per task
- [ ] Success/failure recorded per request
- [ ] Path used recorded with each outcome
Cost monitoring:
- [ ] Token usage tracked per call
- [ ] Alert thresholds for cost spikes
Validation:
- [ ] Schema validation on all LLM outputs
- [ ] JSON parsing errors handled
Infrastructure:
- [ ] Circuit breakers on external calls
- [ ] Explicit timeouts everywhere
- [ ] Metrics and alerting in place
If you can check every box, your agent is ready for production. Most teams can't check them all on day one — that's fine. Work through it in priority order.
Related: Why Your AI Agent Works in Dev and Silently Fails in Production covers the detection problem in more depth. Stop Hardcoding Model Fallbacks covers outcome-based routing in detail.