DEV Community

galian for Cursuri AI

7 Production Patterns for AI Agents That Don't Break in 2026

A demo agent that loops three times, calls one tool, and returns "Hello, I helped you" is easy. A production agent that handles 10k requests a day across paying customers, without lighting your API bill on fire or hallucinating tool arguments at 3am, is a different animal.

I've shipped AI agents in production for the last 18 months — search, content generation, support triage, document analysis. The same seven patterns keep showing up in every codebase that actually works. None of them are exotic. Most of them are boring. That's the point: production agents are boring on purpose.

Here are the patterns, with Python examples you can drop into your own loop today.

1. The Tool Result Validator

Problem: LLMs hallucinate tool arguments. They will confidently call send_email(to="user@example.com", subject="Refund", body="...") when the user never asked for an email. They will pass user_id="123abc" to a function that requires an integer. They will invent product SKUs that don't exist.

If your tool layer trusts the model's output, every hallucination becomes a production incident.

Pattern: Validate tool arguments at the tool boundary, not inside the tool. Reject early with a structured error the model can recover from.

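from pydantic import BaseModel, ValidationError

class SendEmailArgs(BaseModel):
    to: str
    subject: str
    body: str

# Side-effecting tools that need explicit user sign-off. This lives outside the
# schema so the model cannot switch the check off through its own arguments.
NEEDS_CONFIRMATION = {"send_email"}

def execute_tool(name: str, raw_args: dict, confirmed: bool = False) -> dict:
    schema = TOOL_SCHEMAS.get(name)
    if schema is None:
        # A hallucinated tool name gets the same recoverable error as bad arguments
        return {
            "status": "error",
            "error_type": "unknown_tool",
            "message": f"No tool named '{name}'. Available: {sorted(TOOL_SCHEMAS)}",
        }
    try:
        args = schema.model_validate(raw_args)
    except ValidationError as e:
        return {
            "status": "error",
            "error_type": "invalid_arguments",
            "message": f"Tool call rejected. Fix these fields: {e.errors()}",
        }

    if name in NEEDS_CONFIRMATION and not confirmed:
        return {"status": "pending_confirmation", "preview": args.model_dump()}

    return TOOLS[name](args)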

Gotcha: Always return the validation error back to the model as a tool result. Don't raise it. The agent can usually self-correct in the next turn — but only if it sees the error.
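
To make that gotcha concrete, here is a minimal sketch of wrapping a rejected call as a tool message the model sees on its next turn. The helper name `as_tool_message` is hypothetical, and the message fields (`role`, `tool_call_id`, `content`) are assumptions modeled on the loop later in this post; adjust them to whatever your model API expects.

```python
import json

def as_tool_message(call_id: str, result: dict) -> dict:
    """Wrap any tool result, including errors, as a message for the model.

    Hypothetical helper: the field names mirror the loop in this post,
    not any specific provider's API.
    """
    return {
        "role": "tool",
        "tool_call_id": call_id,
        # Serialize to a string so the error text reaches the model verbatim
        "content": json.dumps(result, default=str),
    }

# A rejected call, shaped like the validator's error result
rejected = {
    "status": "error",
    "error_type": "invalid_arguments",
    "message": "Tool call rejected. Fix these fields: user_id must be an integer",
}
message = as_tool_message("call_1", rejected)
```

Because the error travels as ordinary tool output, the loop needs no special branch for it: the model reads the message and can issue a corrected call.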

2. Bounded Memory

Problem: Naive agent loops accumulate every tool call, every observation, every reasoning step into the conversation history. After 15 turns, you can easily be sending 80k tokens per request. Latency climbs, and cost climbs faster, because every turn resends the whole history. The model starts losing track of what it was doing because the relevant context is buried under five tool dumps.

Pattern: Treat conversation history as a finite resource. Compress aggressively, summarize old turns, and keep tool outputs out of the main thread when you can.

class BoundedMemory:
    def __init__(self, max_tokens: int = 32_000, summarize_at: int = 24_000):
        self.messages: list[dict] = []
        self.max_tokens = max_tokens
        self.summarize_at = summarize_at

    def add(self, message: dict) -> None:
        self.messages.append(message)
        if self._token_count() > self.summarize_at:
            self._compress()

    def _compress(self) -> None:
        # Keep the first message (the system prompt) and the last 8 messages (about 4 turns) verbatim
        keep_recent = self.messages[-8:]
        to_summarize = self.messages[1:-8]
        if not to_summarize:
            return
        summary = summarize_with_llm(to_summarize, max_tokens=2_000)
        self.messages = (
            [self.messages[0]]
            + [{"role": "user", "content": f"<earlier_context>{summary}</earlier_context>"}]
            + keep_recent
        )

Gotcha: Don't summarize tool call messages — the model needs the exact arguments to chain reasoning. Summarize only the observations, and only when they're old enough that detail no longer matters.
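
The class above calls `_token_count` without defining it, and its `_compress` summarizes every old message alike. Below is a rough sketch of both missing pieces, under two assumptions: a crude estimate of about 4 characters per token, and a convention that observations are the messages with `role == "tool"`. The function names are hypothetical.

```python
def estimate_tokens(messages: list[dict]) -> int:
    """Very rough token estimate: about 4 characters per token.

    The ratio is an assumption for English text; swap in your model's
    real tokenizer if you need accurate counts.
    """
    return sum(len(str(m.get("content", ""))) for m in messages) // 4

def truncate_old_observations(messages: list[dict], keep_recent: int = 8,
                              max_chars: int = 200) -> list[dict]:
    """Shorten old tool outputs while leaving every other message untouched."""
    cutoff = max(len(messages) - keep_recent, 0)
    compacted = []
    for i, msg in enumerate(messages):
        content = msg.get("content")
        is_old_observation = (
            i < cutoff
            and msg.get("role") == "tool"
            and isinstance(content, str)
            and len(content) > max_chars
        )
        if is_old_observation:
            # Copy the message so the caller's list is not mutated
            msg = {**msg, "content": content[:max_chars] + " [truncated]"}
        compacted.append(msg)
    return compacted

# Build a history of one task plus six tool round-trips with large outputs
history = [{"role": "user", "content": "Find the cheapest flight"}]
for page in range(6):
    history.append({"role": "assistant", "content": "",
                    "tool_calls": [{"name": "search", "arguments": {"page": page}}]})
    history.append({"role": "tool", "content": "x" * 4_000})

compacted = truncate_old_observations(history)
```

Truncation is the cheapest form of compression. Replacing the cut text with an LLM summary follows the same shape, and the tool call messages with their exact arguments stay intact either way.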

3. The Observable Loop

Problem: Your agent is in production. A user complains it gave them garbage. You have... a final string output and a vague memory of what the loop does. Good luck debugging.

Pattern: Emit a structured event for every state transition in the loop. Every model call, every tool call, every retry, every error. Ship them to whatever observability stack you already use (Datadog, Honeycomb, OpenTelemetry, even just structured JSON to stdout).

import time
import uuid
from contextlib import contextmanager

@contextmanager
def trace_step(run_id: str, step: str, **attrs):
    span_id = str(uuid.uuid4())
    start = time.perf_counter()
    log_event("step.start", run_id=run_id, span_id=span_id, step=step, **attrs)
    try:
        yield span_id
        log_event("step.end", run_id=run_id, span_id=span_id, step=step,
                  status="ok", duration_ms=(time.perf_counter() - start) * 1000)
    except Exception as e:
        log_event("step.end", run_id=run_id, span_id=span_id, step=step,
                  status="error", error=str(e),
                  duration_ms=(time.perf_counter() - start) * 1000)
        raise

def run_agent(task: str) -> str:
    run_id = str(uuid.uuid4())
    memory = BoundedMemory()
    memory.add({"role": "user", "content": task})

    for turn in range(MAX_TURNS):
        with trace_step(run_id, "model_call", turn=turn):
            response = call_model(memory.messages)
        memory.add(response)

        if not response.tool_calls:
            return response.content

        for call in response.tool_calls:
            with trace_step(run_id, "tool_call", tool=call.name, turn=turn):
                result = execute_tool(call.name, call.arguments)
            memory.add({"role": "tool", "tool_call_id": call.id, "content": result})

    return "Max turns exceeded"

Gotcha: Include a stable run_id on every event. When a customer reports an issue, you want one query that returns the entire trace.
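
The `log_event` function used above is never shown, so here is one possible minimal version: JSON lines written to a stream, plus the "one query" for a run. The `stream` parameter and the `events_for_run` helper are assumptions added for illustration; in a real stack your log backend does the filtering.

```python
import io
import json
import sys
import time

def log_event(event: str, stream=None, **attrs) -> None:
    """Write one JSON object per line; most log shippers can ingest this."""
    record = {"event": event, "ts": time.time(), **attrs}
    out = stream if stream is not None else sys.stdout
    out.write(json.dumps(record, default=str) + "\n")

def events_for_run(lines: list[str], run_id: str) -> list[dict]:
    """Return every event from one run, in the order it was logged."""
    parsed = (json.loads(line) for line in lines if line.strip())
    return [e for e in parsed if e.get("run_id") == run_id]

# Two interleaved runs written to an in-memory buffer instead of stdout
buffer = io.StringIO()
log_event("step.start", stream=buffer, run_id="run-a", step="model_call", turn=0)
log_event("step.start", stream=buffer, run_id="run-b", step="model_call", turn=0)
log_event("step.end", stream=buffer, run_id="run-a", step="model_call", status="ok")

trace = events_for_run(buffer.getvalue().splitlines(), "run-a")
```

Filtering on `run_id` pulls one run's events out of the interleaved stream, which is the reason to stamp it on every event.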

4. Graceful Degradation

Problem: Your agent depends on three external services and a vector store. One of them is having a bad day. Your agent now returns a 500 to the user, even though for this particular query the broken dependency wasn't actually needed.

Pattern: Wrap dependencies in fallback chains. If the primary fails, the agent should know that capability is degraded — not crash.

from typing import Callable

class ToolRegistry:
    def __init__(self):
        self.tools: dict[str, list[Callable]] = {}
        self.health: dict[str, bool] = {}

    def register(self, name: str, *implementations: Callable) -> None:
        self.tools[name] = list(implementations)

    def call(self, name: str, args: dict) -> dict:
        for i, impl in enumerate(self.tools[name]):
            try:
                result = impl(args)
                self.health[f"{name}:{i}"] = True
                return {"status": "ok", "result": result, "tier": i}
            except Exception as e:
                self.health[f"{name}:{i}"] = False
                log_event("tool.fallback", tool=name, tier=i, error=str(e))
                continue
        return {
            "status": "degraded",
            "message": f"Tool '{name}' is unavailable. Try a different approach.",
        }

The crucial bit is the degraded response — it goes back to the model as a tool result, and a well-prompted agent will re-plan. Maybe it tries a different tool. Maybe it tells the user "I can't check live inventory right now, but here's what I know." Either is better than a 500.

Gotcha: Surface the degraded status in your prompt. A line like "If a tool returns status=degraded, do not retry it. Acknowledge the limitation in your final response." prevents the model from looping on a dead service.
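
Here is a small self-contained sketch of the fallback chain in action, using a standalone function rather than the registry class so it runs on its own. The inventory tools and the `STALE_CACHE` data are hypothetical stand-ins for a live service and a cached copy.

```python
from typing import Callable

def call_with_fallbacks(name: str, implementations: list[Callable[[dict], object]],
                        args: dict) -> dict:
    """Try each implementation in order; report 'degraded' if all of them fail."""
    for tier, impl in enumerate(implementations):
        try:
            return {"status": "ok", "result": impl(args), "tier": tier}
        except Exception:
            continue  # fall through to the next tier
    return {
        "status": "degraded",
        "message": f"Tool '{name}' is unavailable. Try a different approach.",
    }

# Hypothetical implementations for an inventory lookup
def live_inventory(args: dict) -> int:
    raise TimeoutError("inventory service timed out")

STALE_CACHE = {"sku-42": 7}

def cached_inventory(args: dict) -> int:
    return STALE_CACHE[args["sku"]]  # raises KeyError on a cache miss

tiers = [live_inventory, cached_inventory]
hit = call_with_fallbacks("inventory", tiers, {"sku": "sku-42"})
miss = call_with_fallbacks("inventory", tiers, {"sku": "sku-99"})
```

The `tier` field tells the model (and your dashboards) that an answer came from a fallback, so the agent can caveat a stale result instead of presenting it as live.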

5. The Cost Circuit Breaker

Problem: A bug or an adversarial input puts your agent in a tool-calling loop. By the time you notice, you've spent $400 in 20 minutes.

Pattern: Track cumulative cost per run and per session. Hard-stop when limits are exceeded. This is not optional in production — it's the difference between a bad day and a layoff conversation.

class CostBudget:
    def __init__(self, max_usd_per_run: float = 0.50, max_usd_per_user_per_day: float = 5.00):
        self.run_cost = 0.0
        self.max_run = max_usd_per_run
        self.max_day = max_usd_per_user_per_day

    def charge(self, usage: dict, model: str) -> None:
        cost = compute_cost(usage, model)
        self.run_cost += cost
        if self.run_cost > self.max_run:
            raise BudgetExceeded(f"Run exceeded ${self.max_run}")

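    def precheck_user(self, user_id: str) -> None:
        # Reads the user's running total for today. Nothing in this class writes
        # that key, so increment it wherever you call charge().
        spent_today = redis.get(f"cost:{user_id}:{today()}") or 0
        if float(spent_today) > self.max_day:
            raise BudgetExceeded(f"User {user_id} exceeded daily budget")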

Gotcha: Different limits for different surfaces. An internal batch job can have a $5 ceiling per run. A free-tier chat user gets $0.10. A paying enterprise customer gets $2. Hardcoding one number is a footgun.
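
One way to avoid the single hardcoded number is a limits table keyed by surface, sketched below. Everything here beyond the per-run ceilings from the paragraph above is an assumption: the surface names, the daily ceilings, the prices, the `input_tokens`/`output_tokens` usage fields, and the plain dict standing in for Redis. The sketch also records daily spend on every charge, so the daily precheck has something to read.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class BudgetLimits:
    max_usd_per_run: float
    max_usd_per_user_per_day: float

# Per-run ceilings come from the text above; the daily ceilings are placeholders
LIMITS_BY_SURFACE = {
    "internal_batch": BudgetLimits(5.00, 100.00),
    "free_chat": BudgetLimits(0.10, 1.00),
    "enterprise": BudgetLimits(2.00, 50.00),
}

# Hypothetical USD prices per million tokens; use your provider's real price sheet
PRICES_PER_MILLION = {"example-model": {"input": 3.00, "output": 15.00}}

class BudgetExceeded(Exception):
    pass

def compute_cost(usage: dict, model: str) -> float:
    price = PRICES_PER_MILLION[model]
    return (usage["input_tokens"] * price["input"]
            + usage["output_tokens"] * price["output"]) / 1_000_000

def charge(daily_spend: dict, user_id: str, day: str, surface: str,
           run_cost: float, usage: dict, model: str) -> float:
    """Add one call's cost to the run and to the user's daily total.

    Returns the new run cost, or raises BudgetExceeded past either ceiling.
    """
    limits = LIMITS_BY_SURFACE[surface]
    cost = compute_cost(usage, model)
    run_cost += cost
    key = f"cost:{user_id}:{day}"
    daily_spend[key] = daily_spend.get(key, 0.0) + cost
    if run_cost > limits.max_usd_per_run:
        raise BudgetExceeded(f"Run exceeded ${limits.max_usd_per_run}")
    if daily_spend[key] > limits.max_usd_per_user_per_day:
        raise BudgetExceeded(f"User {user_id} exceeded daily budget")
    return run_cost

spend: dict = {}  # in-memory stand-in for the Redis counters
usage = {"input_tokens": 10_000, "output_tokens": 2_000}
run_cost = charge(spend, "u1", "2026-01-01", "free_chat", 0.0, usage, "example-model")
```

At these placeholder prices one call costs $0.06, so a second identical call on the free tier crosses the $0.10 run ceiling and raises `BudgetExceeded`.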

6. The Deterministic Critic

Problem: "LLM-as-a-judge" sounds clever, but using a model to grade itself is unreliable and slow. That's two model calls per output: both can hallucinate, and both cost money.

Pattern: For checks you can express as code, use code. Reserve LLM grading for genuinely subjective dimensions, and only after the deterministic checks pass.

import re

class OutputCritic:
    def evaluate(self, output: str, context: dict) -> dict:
        issues = []

        if context.get("must_cite_sources") and not re.search(r"\[\d+\]", output):
            issues.append("missing_citations")

        if context.get("max_length") and len(output) > context["max_length"]:
            issues.append("too_long")

        if BANNED_PHRASES.search(output):
            issues.append("banned_phrase")

        if context.get("must_mention"):
            missing = [k for k in context["must_mention"] if k.lower() not in output.lower()]
            if missing:
                issues.append(f"missing_keywords:{missing}")

        if issues:
            return {"verdict": "reject", "issues": issues, "method": "deterministic"}

        if context.get("subjective_check"):
            return llm_grade(output, context["subjective_check"])

        return {"verdict": "accept", "method": "deterministic"}

When the critic rejects, feed the issues back to the agent as a "revise this" instruction. After two rejections, return whatever you have with a flag — infinite revision loops are their own bug class.
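
That revision cap can be sketched as a small loop. The `generate` and `evaluate` callables are stand-ins for the agent and the critic, and the stub functions below are invented purely to exercise both outcomes.

```python
from typing import Callable

def revise_until_accepted(generate: Callable[[list[str]], str],
                          evaluate: Callable[[str], dict],
                          max_rejections: int = 2) -> dict:
    """Regenerate until the critic accepts or the rejection cap is reached."""
    issues: list[str] = []
    rejections = 0
    while True:
        output = generate(issues)
        verdict = evaluate(output)
        if verdict["verdict"] == "accept":
            return {"output": output, "flagged": False, "rejections": rejections}
        rejections += 1
        issues = verdict.get("issues", [])
        if rejections >= max_rejections:
            # Give up revising: return the last draft, flagged for review
            return {"output": output, "flagged": True, "issues": issues,
                    "rejections": rejections}

# Stub critic and generators standing in for OutputCritic and the agent
def needs_citation(output: str) -> dict:
    if "[1]" in output:
        return {"verdict": "accept"}
    return {"verdict": "reject", "issues": ["missing_citations"]}

def fixes_on_feedback(issues: list[str]) -> str:
    return "Paris is the capital [1]." if issues else "Paris is the capital."

def never_fixes(issues: list[str]) -> str:
    return "Paris is the capital."

recovered = revise_until_accepted(fixes_on_feedback, needs_citation)
gave_up = revise_until_accepted(never_fixes, needs_citation)
```

Returning a `flagged` field instead of raising lets the caller decide what to do with an unverified draft: show it with a caveat, queue it for human review, or drop it.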

Gotcha: Don't make the critic too strict. As a rule of thumb, if your accept rate is below 70%, the prompt is what's broken, not the individual outputs.

7. Stateless Replay (Idempotency)

Problem: Your agent half-completed a task — it sent the email, then crashed before logging the result. The user retries. Now they get two emails.

Pattern: Treat every external side-effect as idempotent by design. Use deterministic IDs derived from the input, dedupe at the tool layer, and make agent runs replayable from any saved checkpoint.

import hashlib
import json

def idempotency_key(tool_name: str, args: dict) -> str:
    canonical = json.dumps({"tool": tool_name, "args": args}, sort_keys=True)
    return hashlib.sha256(canonical.encode()).hexdigest()[:16]

def execute_tool_idempotent(name: str, args: dict, run_id: str) -> dict:
    key = idempotency_key(name, args)
    cache_key = f"tool_result:{run_id}:{key}"
    cached = redis.get(cache_key)
    if cached:
        return json.loads(cached)

    result = TOOLS[name](args)
    redis.setex(cache_key, 3600, json.dumps(result))
    return result

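Now if the agent retries the same step within the run, it gets the cached result. Because the cache key includes run_id, a user retry that starts a fresh run is not deduplicated yet. If you persist the cache across runs (drop run_id from the key and use a longer TTL), you get cross-run idempotency too, which is what you want for anything that costs money or sends messages. One gap remains: a crash after the tool runs but before the cache write can still produce a duplicate, so for those tools also pass the key to the downstream service if it accepts idempotency keys.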

Gotcha: Be careful what you put in the idempotency key. Timestamps, request IDs, or random nonces in the args will defeat it. Strip them before hashing.
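
A minimal sketch of that stripping step follows. The set of volatile field names is an assumption; list whichever fields your own tools accept that change between retries.

```python
import hashlib
import json

# Hypothetical field names; list whatever volatile fields your tools accept
VOLATILE_FIELDS = frozenset({"timestamp", "request_id", "nonce"})

def strip_volatile(value):
    """Recursively drop volatile keys from nested dicts and lists."""
    if isinstance(value, dict):
        return {k: strip_volatile(v) for k, v in value.items()
                if k not in VOLATILE_FIELDS}
    if isinstance(value, list):
        return [strip_volatile(v) for v in value]
    return value

def stable_idempotency_key(tool_name: str, args: dict) -> str:
    """Hash the tool name and its arguments, ignoring volatile fields."""
    canonical = json.dumps({"tool": tool_name, "args": strip_volatile(args)},
                           sort_keys=True)
    return hashlib.sha256(canonical.encode()).hexdigest()[:16]

# Same email, different timestamps and key order: the keys should match
first = stable_idempotency_key("send_email", {"to": "a@example.com", "timestamp": 1})
retry = stable_idempotency_key("send_email", {"timestamp": 2, "to": "a@example.com"})
other = stable_idempotency_key("send_email", {"to": "b@example.com", "timestamp": 1})
```

A denylist like this is easy to start with, but an allowlist of the fields that define "the same action" is safer for tools whose arguments you don't fully control.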

Putting It Together

A production agent loop using all seven patterns is roughly 200 lines of Python. Not glamorous, but it survives. Here's the skeleton:

def run_agent_production(task: str, user_id: str) -> str:
    run_id = str(uuid.uuid4())
    budget = CostBudget()
    budget.precheck_user(user_id)

    memory = BoundedMemory(max_tokens=32_000)
    memory.add({"role": "system", "content": SYSTEM_PROMPT})
    memory.add({"role": "user", "content": task})

    critic = OutputCritic()
    rejections = 0

    for turn in range(MAX_TURNS):
        with trace_step(run_id, "model_call", turn=turn):
            response = call_model(memory.messages)
            budget.charge(response.usage, response.model)

        memory.add(response.message)

        if not response.tool_calls:
            verdict = critic.evaluate(response.content, task_context())
            if verdict["verdict"] == "accept":
                return response.content
            rejections += 1
            if rejections >= 2:
                # Revision cap from pattern 6: stop revising and flag the run
                log_event("critic.gave_up", run_id=run_id, issues=verdict.get("issues"))
                return response.content
            memory.add({"role": "user", "content": f"Revise: {verdict.get('issues')}"})
            continue

        for call in response.tool_calls:
            with trace_step(run_id, "tool_call", tool=call.name, turn=turn):
                args = call.arguments
                result = execute_tool_idempotent(call.name, args, run_id)
            memory.add({"role": "tool", "tool_call_id": call.id, "content": result})

    return "Task incomplete after max turns"

That's the loop. Drop in your favorite model API (Claude, GPT, open source — patterns work the same), wire up your tools with the validator from pattern 1, and you have something that won't embarrass you in production.

What I'd Read Next

If you've shipped agents in production, what patterns did I miss? Drop them in the comments — I'll add the best ones to a follow-up post.


Written by a developer who has paged themselves at 3am because an agent went into a tool-calling loop. Don't be that developer. Use the circuit breaker.
