DEV Community

Devon
Devon

Posted on • Originally published at kalibr.systems

The Production Agent Checklist: What Every AI Agent Needs Before It Touches Real Users

The Production Agent Checklist: What Every AI Agent Needs Before It Touches Real Users

Most AI agents that reach production aren't ready for it. They work in demos. They pass the tests the developer wrote. Then they hit real users and start failing in ways that are hard to detect and harder to debug.

This is a practical checklist. Not "10 tips to improve your AI," not a sales pitch — a real pre-flight list for teams shipping Python agents to production. Work through it before you flip the traffic switch.


1. Error Handling That Actually Handles Errors

The wrong version:

def call_llm(prompt: str) -> str:
    response = openai.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content
Enter fullscreen mode Exit fullscreen mode

This crashes on rate limits, network errors, and API outages. It also returns empty strings or None if the model returns an unexpected response format — which happens more than you'd think.

The right version:

import openai
import logging
from typing import Optional

logger = logging.getLogger(__name__)

def call_llm(prompt: str, max_retries: int = 3) -> Optional[str]:
    last_error = None

    for attempt in range(max_retries):
        try:
            response = openai.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": prompt}],
                timeout=30
            )
            content = response.choices[0].message.content
            if not content or not content.strip():
                logger.warning(f"Empty response on attempt {attempt + 1}")
                continue
            return content

        except openai.RateLimitError as e:
            wait = 2 ** attempt  # exponential backoff
            logger.warning(f"Rate limited, waiting {wait}s (attempt {attempt + 1})")
            time.sleep(wait)
            last_error = e

        except openai.APITimeoutError as e:
            logger.warning(f"Timeout on attempt {attempt + 1}")
            last_error = e

        except openai.APIError as e:
            logger.error(f"API error: {e}")
            last_error = e
            break  # Don't retry on 4xx errors

    logger.error(f"All attempts failed. Last error: {last_error}")
    return None
Enter fullscreen mode Exit fullscreen mode

Checklist items here:

  • [ ] Rate limit errors trigger exponential backoff, not immediate re-raise
  • [ ] Timeout is set explicitly — don't rely on the SDK default (some have none)
  • [ ] Empty/null responses are handled, not silently passed downstream
  • [ ] 4xx errors (bad request, auth failure) are not retried
  • [ ] All failures are logged with enough context to debug

2. Retry Logic With Jitter

Exponential backoff without jitter causes thundering herd: all your retrying clients hit the API at the same time, get rate limited again, back off the same amount, and pile up again.

import random
import time

def backoff_with_jitter(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Full jitter: random value between 0 and min(cap, base * 2^attempt)"""
    return random.uniform(0, min(cap, base * (2 ** attempt)))

# Usage
for attempt in range(max_retries):
    try:
        result = call_llm(prompt)
        break
    except RateLimitError:
        if attempt < max_retries - 1:
            sleep_time = backoff_with_jitter(attempt)
            time.sleep(sleep_time)
Enter fullscreen mode Exit fullscreen mode

The tenacity library handles this well if you don't want to roll it yourself:

from tenacity import retry, stop_after_attempt, wait_random_exponential, retry_if_exception_type

@retry(
    retry=retry_if_exception_type(openai.RateLimitError),
    wait=wait_random_exponential(min=1, max=60),
    stop=stop_after_attempt(5)
)
def call_llm_with_retry(prompt: str) -> str:
    response = openai.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content
Enter fullscreen mode Exit fullscreen mode

Checklist:

  • [ ] Retries use jitter, not pure exponential backoff
  • [ ] Max retry count is set (don't retry forever)
  • [ ] Total retry budget (max wait time) is bounded
  • [ ] Retry logic is not duplicated across call sites — centralize it

3. Fallback Paths

A fallback is a different execution path you switch to when the primary path fails. This is distinct from retrying — retrying hits the same path again; fallbacks try something different.

Common fallback patterns:

from typing import Optional

def extract_with_gpt4o(text: str) -> Optional[dict]:
    # Primary path
    ...

def extract_with_claude(text: str) -> Optional[dict]:
    # Fallback path
    ...

def extract_with_regex(text: str) -> Optional[dict]:
    # Last-resort deterministic fallback
    import re
    # Simple pattern matching — less capable but always works
    ...

def extract_order(text: str) -> dict:
    result = extract_with_gpt4o(text)
    if result:
        return result

    logger.warning("GPT-4o extraction failed, trying Claude")
    result = extract_with_claude(text)
    if result:
        return result

    logger.warning("Claude extraction failed, trying regex fallback")
    result = extract_with_regex(text)
    if result:
        return result

    raise ValueError("All extraction paths failed")
Enter fullscreen mode Exit fullscreen mode

This is better than no fallback. It has a serious problem though: the fallback selection is static. You wrote it once, it stays that way forever. If Claude starts outperforming GPT-4o in production, your code still tries GPT-4o first every time.

We'll address this in Post 3 on dynamic routing, but the checklist item here is simply: do you have a fallback at all?

Checklist:

  • [ ] Every LLM call has at least one fallback path
  • [ ] The fallback is tested independently — don't assume it works because the primary did
  • [ ] There's a final fallback that always returns something (even if degraded)
  • [ ] Fallback activation is logged and visible in your metrics

4. Outcome Tracking

This is the one most teams skip, and it's the one that matters most for long-term reliability.

Logging requests and responses is not outcome tracking. Outcome tracking is recording whether the agent achieved its goal for each request.

import time
from dataclasses import dataclass
from typing import Optional, Any

@dataclass
class AgentOutcome:
    request_id: str
    task: str
    success: bool
    path_used: str  # which model/tool combination
    latency_ms: float
    input_tokens: Optional[int]
    output_tokens: Optional[int]
    error: Optional[str]
    metadata: dict

def track_outcome(outcome: AgentOutcome):
    # Send to your metrics system
    # Could be Datadog, Prometheus, a database, whatever
    metrics.increment(
        "agent.outcome",
        tags=[
            f"task:{outcome.task}",
            f"success:{outcome.success}",
            f"path:{outcome.path_used}"
        ]
    )
    if outcome.latency_ms > 5000:
        metrics.increment("agent.slow_request", tags=[f"task:{outcome.task}"])
Enter fullscreen mode Exit fullscreen mode

The key is defining "success" programmatically. For every agent task, you need to be able to answer: did this work?

def is_extraction_successful(result: Optional[dict]) -> bool:
    if not result:
        return False
    required_fields = {"item", "quantity", "address"}
    return required_fields.issubset(result.keys()) and all(result[f] for f in required_fields)

# After every extraction:
success = is_extraction_successful(result)
track_outcome(AgentOutcome(
    request_id=request_id,
    task="order-extraction",
    success=success,
    path_used="gpt-4o",
    latency_ms=elapsed_ms,
    ...
))
Enter fullscreen mode Exit fullscreen mode

Checklist:

  • [ ] Every agent task has a programmatic success function
  • [ ] Success/failure is recorded per request, not just per error
  • [ ] You can query: "what's the success rate for this task in the last hour?"
  • [ ] Outcome data includes which path was used (model, tool, params)

5. Cost Monitoring

LLM costs are variable and can spike unexpectedly. An agent bug that causes excessive retrying or unusually long prompts can cost you serious money before you notice.

from dataclasses import dataclass

# Rough cost per 1K tokens (check current pricing)
COST_PER_1K_TOKENS = {
    "gpt-4o": {"input": 0.0025, "output": 0.010},
    "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
    "claude-3-5-sonnet-20241022": {"input": 0.003, "output": 0.015},
    "claude-3-haiku-20240307": {"input": 0.00025, "output": 0.00125},
}

def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    if model not in COST_PER_1K_TOKENS:
        return 0.0
    rates = COST_PER_1K_TOKENS[model]
    return (input_tokens / 1000 * rates["input"]) + (output_tokens / 1000 * rates["output"])

def call_with_cost_tracking(prompt: str, model: str = "gpt-4o") -> tuple[str, float]:
    response = openai.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )
    usage = response.usage
    cost = estimate_cost(model, usage.prompt_tokens, usage.completion_tokens)

    # Alert if single call is unusually expensive
    if cost > 0.10:  # $0.10 threshold — tune for your use case
        logger.warning(f"Expensive LLM call: ${cost:.4f} ({usage.prompt_tokens} input, {usage.completion_tokens} output)")

    return response.choices[0].message.content, cost
Enter fullscreen mode Exit fullscreen mode

Checklist:

  • [ ] Token usage is recorded for every LLM call
  • [ ] Cost is estimated per call and aggregated per task type
  • [ ] Alert thresholds exist for abnormal cost spikes
  • [ ] You know your expected cost per 1000 requests before launch

6. Observability vs. Reliability — Don't Confuse Them

This is where teams make a category error.

Observability tools (LangSmith, Langfuse, Helicone, Weights & Biases) give you visibility into what's happening. Traces, spans, prompt logs, output comparison. They're genuinely useful for debugging and evaluation. Use them.

Reliability tools ensure the agent keeps working when things go wrong. Retries, fallbacks, circuit breakers, outcome-based routing. These operate at request time, not review time.

The difference: observability tells you your agent is failing. Reliability keeps it from failing, or recovers it automatically.

Here's an honest comparison of tools that often get conflated:

Kalibr LangSmith OpenRouter
Primary purpose Outcome-based path routing Tracing, evaluation, debugging Model gateway (cost/latency)
Adapts at runtime? Yes — reroutes based on outcomes No — dashboards for humans Partial — routes by cost/latency, not outcomes
Success signal Your programmatic success function Human eval / labeled data None (cost and latency only)
When it helps Model degrades, tool fails, path breaks in production Debugging why something failed, evaluating prompt quality Reducing cost, hitting multiple providers
Requires human? No — adapts automatically Yes — someone looks at the dashboard No
Learning mechanism Thompson Sampling on outcome signals N/A Static rules or weighted routing

These are not competing tools. A production agent might legitimately use all three:

  • LangSmith for tracing and offline evaluation
  • OpenRouter for provider flexibility and cost management
  • Kalibr for outcome-based routing that adapts when things degrade

See Kalibr's docs for how the SDK fits into an existing stack.


7. Output Validation

Never trust LLM output directly. Validate it before passing it to anything downstream.

import json
from pydantic import BaseModel, ValidationError
from typing import Optional

class OrderData(BaseModel):
    item: str
    quantity: int
    address: str
    notes: Optional[str] = None

def parse_and_validate_order(llm_output: str) -> Optional[OrderData]:
    # Clean up common formatting issues
    content = llm_output.strip()

    # Strip markdown code fences
    if content.startswith("```

"):
        lines = content.split("\n")
        content = "\n".join(lines[1:-1] if lines[-1] == "

```" else lines[1:])

    try:
        data = json.loads(content)
        return OrderData(**data)
    except json.JSONDecodeError as e:
        logger.warning(f"JSON parse failed: {e}. Raw: {content[:200]}")
        return None
    except ValidationError as e:
        logger.warning(f"Schema validation failed: {e}")
        return None
Enter fullscreen mode Exit fullscreen mode

Checklist:

  • [ ] LLM outputs are validated against an expected schema before use
  • [ ] JSON parsing failures are handled gracefully (logged, not raised)
  • [ ] Pydantic or equivalent schema validation is in the request path
  • [ ] Partial/empty outputs don't propagate as valid results

8. Rate Limiting and Circuit Breakers

Your agent should protect the APIs it calls, not just itself.

from collections import deque
import time
from threading import Lock

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, timeout: float = 60.0):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.last_failure_time = 0
        self.state = "closed"  # closed = normal, open = blocking, half-open = testing
        self._lock = Lock()

    def call(self, func, *args, **kwargs):
        with self._lock:
            if self.state == "open":
                if time.time() - self.last_failure_time > self.timeout:
                    self.state = "half-open"
                else:
                    raise Exception("Circuit breaker open — service unavailable")

        try:
            result = func(*args, **kwargs)
            with self._lock:
                if self.state == "half-open":
                    self.state = "closed"
                    self.failures = 0
            return result
        except Exception as e:
            with self._lock:
                self.failures += 1
                self.last_failure_time = time.time()
                if self.failures >= self.failure_threshold:
                    self.state = "open"
                    logger.error(f"Circuit breaker opened after {self.failures} failures")
            raise

# Usage
openai_breaker = CircuitBreaker(failure_threshold=5, timeout=30)

def call_openai_safe(prompt: str) -> str:
    return openai_breaker.call(call_llm, prompt)
Enter fullscreen mode Exit fullscreen mode

Checklist:

  • [ ] Circuit breakers prevent cascading failures to downstream APIs
  • [ ] Rate limiting is applied at the application level, not just relied on from the API
  • [ ] Breaker state is monitored — an open circuit breaker is an alert condition

9. Timeouts Everywhere

This is short because it's simple: set explicit timeouts on everything.

import asyncio
from concurrent.futures import ThreadPoolExecutor, TimeoutError

async def call_with_timeout(prompt: str, timeout_seconds: float = 30) -> Optional[str]:
    loop = asyncio.get_event_loop()

    with ThreadPoolExecutor() as executor:
        try:
            result = await asyncio.wait_for(
                loop.run_in_executor(executor, call_llm, prompt),
                timeout=timeout_seconds
            )
            return result
        except asyncio.TimeoutError:
            logger.warning(f"LLM call timed out after {timeout_seconds}s")
            return None
Enter fullscreen mode Exit fullscreen mode

Checklist:

  • [ ] Every external call (LLM, API, database) has an explicit timeout
  • [ ] Timeouts are appropriate for the operation (not all 30s — fast ops should timeout faster)
  • [ ] Timeout failures are counted separately from other failures in metrics

Putting It Together: The Minimum Viable Production Agent

Here's what a minimal production-ready agent looks like, integrating the items above:

import kalibr  # First — before any model SDK imports
import openai
import time
import logging
import json
from typing import Optional
from pydantic import BaseModel, ValidationError

logger = logging.getLogger(__name__)

class ExtractionResult(BaseModel):
    item: str
    quantity: int
    address: str

def success_fn(result: Optional[ExtractionResult]) -> bool:
    return result is not None

def extract_gpt4o(text: str) -> Optional[ExtractionResult]:
    try:
        response = openai.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": "Extract order fields as JSON: item, quantity, address"},
                {"role": "user", "content": text}
            ],
            timeout=20
        )
        content = response.choices[0].message.content.strip()
        content = content.strip("```

json").strip("

```").strip()
        return ExtractionResult(**json.loads(content))
    except Exception as e:
        logger.warning(f"GPT-4o extraction error: {e}")
        return None

def extract_claude(text: str) -> Optional[ExtractionResult]:
    try:
        import anthropic
        ac = anthropic.Anthropic()
        response = ac.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=512,
            messages=[{"role": "user", "content": f"Extract as JSON (item, quantity, address): {text}"}]
        )
        content = response.content[0].text.strip()
        content = content.strip("```

json").strip("

```").strip()
        return ExtractionResult(**json.loads(content))
    except Exception as e:
        logger.warning(f"Claude extraction error: {e}")
        return None

# Kalibr router: outcome-based routing between paths
router = kalibr.Router(
    paths=[extract_gpt4o, extract_claude],
    success_fn=success_fn,
    task="order-extraction"
)

def process_order(text: str) -> Optional[ExtractionResult]:
    start = time.time()
    result = router.run(text)
    elapsed_ms = (time.time() - start) * 1000

    logger.info(f"Extraction {'succeeded' if result else 'failed'} in {elapsed_ms:.0f}ms")
    return result
Enter fullscreen mode Exit fullscreen mode

This isn't complete production code — you'd add cost tracking, circuit breakers, and proper metrics. But it covers the core: validated output, multiple paths, outcome-aware routing that adapts automatically.


The Checklist, Condensed

Error handling:

  • [ ] Specific exception types caught and handled differently
  • [ ] Empty/null outputs handled before returning
  • [ ] All errors logged with context

Retries:

  • [ ] Exponential backoff with jitter
  • [ ] Max retry count bounded
  • [ ] 4xx errors not retried

Fallbacks:

  • [ ] Every LLM call has at least one fallback
  • [ ] Fallback activation is logged

Outcome tracking:

  • [ ] Success function defined per task
  • [ ] Success/failure recorded per request
  • [ ] Path used recorded with each outcome

Cost monitoring:

  • [ ] Token usage tracked per call
  • [ ] Alert thresholds for cost spikes

Validation:

  • [ ] Schema validation on all LLM outputs
  • [ ] JSON parsing errors handled

Infrastructure:

  • [ ] Circuit breakers on external calls
  • [ ] Explicit timeouts everywhere
  • [ ] Metrics and alerting in place

If you can check every box, your agent is ready for production. Most teams can't check them all on day one — that's fine. Work through it in priority order.


Related: Why Your AI Agent Works in Dev and Silently Fails in Production covers the detection problem in more depth. Stop Hardcoding Model Fallbacks covers outcome-based routing in detail.

Top comments (0)