
Mukunda Rao Katta


Five Things to Do Before You Deploy Your Agent to Production

The gap between demo and production

Your agent works in development. It answers questions correctly. The tool calls resolve. The output looks good.

You deploy it. A week later, you get a Slack message: "the agent ran for 45 minutes and spent $300." Or: "it returned a user's API key in the response." Or: "it's been failing silently for three days and nobody noticed."

These are not edge cases. They happen to almost every agent deployment that skips the pre-production checklist.

This post covers five concrete things to do before you ship a Python LLM agent. Each item has a "what breaks without it" section and a "how to add it in 5 lines" section. Each one uses a library from the agent stack that is zero-dependency and can be dropped in without changing your core logic.


1. Budget caps

What breaks without it: your agent gets into a loop, or a user sends an adversarial prompt that causes many LLM calls, or a bug in your tool-calling logic causes recursive invocations. Each call costs money. Without a budget cap, there is no ceiling. You get a surprise invoice.

How to add it in 5 lines:

from llm_cost_cap import CostCap
from token_budget_py import TokenBudget

# Hard ceiling on USD spend per agent run
cost_cap = CostCap(max_usd=2.00)

# Soft ceiling on tokens per run (tokens are easier to track mid-call)
token_budget = TokenBudget(max_tokens=50_000)

import anthropic

client = anthropic.Anthropic()

def call_llm_with_budget(messages: list, model: str = "claude-sonnet-4-6") -> str:
    # Pre-flight cost estimate before firing the call.
    # Rough heuristic: about 1.3 tokens per whitespace-separated word.
    # Assumes every message "content" is a plain string.
    estimated_input_tokens = int(sum(len(m["content"].split()) * 1.3 for m in messages))
    cost_cap.check_preflight(model=model, estimated_input_tokens=estimated_input_tokens)
    token_budget.check(estimated_input_tokens)

    response = client.messages.create(
        model=model,
        max_tokens=1024,
        messages=messages,
    )

    # Post-call accounting
    usage = response.usage
    cost_cap.record(
        model=model,
        input_tokens=usage.input_tokens,
        output_tokens=usage.output_tokens,
    )
    token_budget.spend(usage.input_tokens + usage.output_tokens)

    return response.content[0].text


# Raises CostCapExceeded when the cap is hit
# Raises TokenBudgetExceeded when token limit is hit
# Both raise before the LLM call on the preflight check

Set the cap to an amount you are comfortable losing if things go wrong. The example above uses $2.00; for most agents, $5 per run is a reasonable hard ceiling. Adjust down for user-facing endpoints where abuse is more likely.
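To make the mechanism concrete, here is a minimal stdlib-only sketch of what a per-run cost cap does. It is not the llm-cost-cap implementation: the class name `SimpleCostCap`, the exception `BudgetExceeded`, the model name, and the per-million-token prices are all illustrative assumptions, so substitute your provider's current rates.

```python
from dataclasses import dataclass

# Hypothetical prices in USD per million tokens; replace with your provider's real rates.
PRICES_PER_MTOK = {"example-model": {"input": 3.00, "output": 15.00}}


class BudgetExceeded(RuntimeError):
    """Raised when a run would exceed, or has exceeded, its USD ceiling."""


@dataclass
class SimpleCostCap:
    max_usd: float
    spent_usd: float = 0.0

    def _cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        price = PRICES_PER_MTOK[model]
        return (input_tokens * price["input"] + output_tokens * price["output"]) / 1_000_000

    def check_preflight(self, model: str, estimated_input_tokens: int) -> None:
        # Refuse the call if the input alone would push the run past the cap.
        projected = self.spent_usd + self._cost(model, estimated_input_tokens, 0)
        if projected > self.max_usd:
            raise BudgetExceeded(f"projected ${projected:.4f} exceeds cap ${self.max_usd:.2f}")

    def record(self, model: str, input_tokens: int, output_tokens: int) -> None:
        # Account for the actual usage the provider reports after the call.
        self.spent_usd += self._cost(model, input_tokens, output_tokens)
        if self.spent_usd > self.max_usd:
            raise BudgetExceeded(f"spent ${self.spent_usd:.4f}, cap ${self.max_usd:.2f}")
```

The pre-flight check only sees input tokens, so a single call can still overshoot the cap by the cost of its output. Keeping `max_tokens` small on each request bounds that overshoot.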


2. Deadline enforcement

What breaks without it: your agent waits on a slow tool. Or it gets into a reasoning loop where it calls the LLM repeatedly without making progress. Or the external API your tool depends on hangs. The agent sits there, blocking a thread or a request handler, for minutes or hours.

The worst version: your web framework has a 30-second request timeout. The agent silently dies at 30 seconds with no useful error, no logged context, and no way to tell the user what happened.

How to add it in 5 lines:

from agent_deadline import Deadline, DeadlineExceeded

def agent_run_with_deadline(user_query: str) -> str:
    # Create a fresh hard deadline for each run so repeated or concurrent
    # runs never share a timer
    deadline = Deadline(timeout_seconds=30)
    with deadline:
        # Each step checks the deadline before proceeding
        step = 0
        messages = [{"role": "user", "content": user_query}]

        while True:
            deadline.check()  # Raises DeadlineExceeded if time is up

            response_text = call_llm_with_budget(messages)

            # Stop when the model signals completion (a stand-in for real tool handling)
            if "DONE" in response_text:
                return response_text

            messages.append({"role": "assistant", "content": response_text})
            messages.append({"role": "user", "content": "Continue."})

            step += 1
            if step > 10:
                break

        return "Max steps reached."


try:
    result = agent_run_with_deadline("Analyze the following dataset...")
except DeadlineExceeded as e:
    print(f"Agent timed out after {e.elapsed:.1f}s.")
    # Log what was done so far and return a graceful error to the user

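You call deadline.check() at each step boundary, and it raises DeadlineExceeded at the first checkpoint reached after time runs out. This is cooperative: if a single LLM call takes longer than the deadline, the check fires only after that call completes. Python threads cannot be forcibly interrupted, so to bound an individual call, set a request timeout on the call itself. Both the Anthropic and OpenAI Python SDKs accept a timeout option.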
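One way to combine the two ideas is to derive each call's timeout from whatever is left of the run's budget. The sketch below uses only the standard library. `RunDeadline`, `RunTimedOut`, and `call_timeout` are hypothetical names for illustration and are not part of agent-deadline.

```python
import time


class RunTimedOut(TimeoutError):
    """Raised when a run has used up its time budget."""


class RunDeadline:
    """Cooperative deadline: nothing is interrupted, callers must check it."""

    def __init__(self, timeout_seconds: float, clock=time.monotonic):
        self._clock = clock
        self._expires_at = clock() + timeout_seconds

    def remaining(self) -> float:
        # Seconds left in the budget, never negative.
        return max(0.0, self._expires_at - self._clock())

    def check(self) -> None:
        if self.remaining() <= 0.0:
            raise RunTimedOut("run deadline exceeded")

    def call_timeout(self, per_call_max: float) -> float:
        # Timeout to hand to one network call: never longer than what is left.
        left = self.remaining()
        if left <= 0.0:
            raise RunTimedOut("run deadline exceeded")
        return min(per_call_max, left)
```

Passing the value from `call_timeout(20)` as the SDK's request timeout would mean a call started 25 seconds into a 30-second run gets at most 5 seconds, so the run cannot overshoot its deadline by the length of one slow call.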


3. Provider failover

What breaks without it: your provider has an outage. Or you hit a rate limit. Or a model is deprecated. Your agent returns 500 errors to users. You scramble to manually switch providers in production.

How to add it in 5 lines:

from llm_fallback_router import FallbackRouter, ProviderConfig
from llm_retry_py import retry_with_backoff

import anthropic
import openai  # assuming you have both clients

anthropic_client = anthropic.Anthropic()
openai_client = openai.OpenAI()

def call_anthropic(prompt: str) -> str:
    msg = anthropic_client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=512,
        messages=[{"role": "user", "content": prompt}],
    )
    return msg.content[0].text

def call_openai(prompt: str) -> str:
    resp = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        max_tokens=512,
        messages=[{"role": "user", "content": prompt}],
    )
    return resp.choices[0].message.content

# Configure the router: try Anthropic first, fall back to OpenAI
router = FallbackRouter(
    providers=[
        ProviderConfig(name="anthropic", call_fn=call_anthropic, weight=1.0),
        ProviderConfig(name="openai", call_fn=call_openai, weight=0.0),  # fallback only
    ],
    retry_on=(Exception,),
    max_retries_per_provider=2,
)

# Add retry with exponential backoff for transient errors
def resilient_call(prompt: str) -> str:
    def attempt():
        return router.call(prompt)

    return retry_with_backoff(
        fn=attempt,
        max_retries=3,
        base_delay=1.0,
        max_delay=30.0,
        backoff_factor=2.0,
        retryable=(ConnectionError, TimeoutError),
    )

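The router tracks provider health. If a provider fails multiple times, the router marks it degraded and routes to the next one. Retries handle transient errors (rate limits, network blips). Failover handles provider outages. These are different failure modes and need separate mechanisms. Note that the provider SDKs raise their own exception classes (for example anthropic.RateLimitError or openai.APIConnectionError) rather than the built-in ConnectionError and TimeoutError, so make sure the retryable tuple lists the exceptions that actually reach that layer.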
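The split between the two mechanisms can be sketched without any third-party library. This is an illustration of the pattern, not the internals of llm-fallback-router or llm-retry-py. The names `retry_transient` and `call_with_failover`, and the choice of built-in exceptions as the "transient" set, are assumptions.

```python
import time

# Assumption: errors worth retrying. Map your SDK's exception classes onto this tuple.
TRANSIENT = (ConnectionError, TimeoutError)


def retry_transient(fn, max_retries=3, base_delay=1.0, max_delay=30.0,
                    factor=2.0, sleep=time.sleep):
    """Retry fn on transient errors with capped exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except TRANSIENT:
            if attempt == max_retries:
                raise  # retries exhausted: let the caller decide what happens next
            sleep(min(max_delay, base_delay * factor ** attempt))


def call_with_failover(prompt, providers, **retry_kwargs):
    """Try providers in order; move on only after one has exhausted its retries."""
    last_error = None
    for name, call_fn in providers:
        try:
            return name, retry_transient(lambda: call_fn(prompt), **retry_kwargs)
        except Exception as exc:
            last_error = exc
    raise RuntimeError("all providers failed") from last_error
```

Retrying inside each provider and failing over between them keeps a brief network blip from triggering a provider switch, while a sustained outage still reaches the fallback after a bounded number of attempts.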


4. Secret scrubbing

What breaks without it: tool calls return database connection strings, API keys, and credentials. Those get logged, passed to the LLM, and possibly included in the response to the user. You have a credential leak and may not notice for weeks.

How to add it in 5 lines:

import json

from tool_secret_scrubber import SecretScrubber

scrubber = SecretScrubber()

def execute_tool_with_scrubbing(tool_name: str, tool_fn, tool_args: dict) -> str:
    raw_output = tool_fn(**tool_args)

    # Convert non-string results to JSON text so the scrubber sees everything;
    # default=str keeps values such as datetimes from raising TypeError
    if not isinstance(raw_output, str):
        raw_output = json.dumps(raw_output, default=str)

    # Scrub before returning to the LLM or logging
    clean_output = scrubber.scrub(raw_output)

    # Log the scrubbed version
    print(f"[TOOL] {tool_name}: {clean_output[:200]}")

    return clean_output


# Example: a tool that reads a config file
def read_config(path: str) -> str:
    with open(path) as f:
        return f.read()

# Raw output might contain: ANTHROPIC_API_KEY=sk-ant-api03-...
# Scrubbed output returns: ANTHROPIC_API_KEY=[REDACTED]
output = execute_tool_with_scrubbing("read_config", read_config, {"path": "/etc/app/config.env"})

The scrubber runs on the output side. This means the tool executes normally. You do not need to modify your tools. You intercept the output before it goes anywhere sensitive.

Patterns detected by default include AWS access keys, Anthropic and OpenAI API keys, GitHub tokens, JWTs, generic high-entropy strings that match API key patterns, and PEM-encoded private key blocks.
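For intuition about how pattern-based scrubbing works, here is a small regex sketch covering a few of those categories. The patterns are illustrative assumptions, not the rule set that tool-secret-scrubber ships. Real key formats change over time, so treat this as a starting point rather than a complete detector.

```python
import re

# Illustrative patterns only. Order matters: multi-line blocks are handled first.
_PATTERNS = [
    # PEM private key blocks, including the body between the markers.
    (re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----.*?-----END [A-Z ]*PRIVATE KEY-----",
                re.DOTALL), "[REDACTED]"),
    # NAME=value assignments whose name ends in KEY, TOKEN, SECRET, or PASSWORD.
    (re.compile(r"\b([A-Za-z0-9_]*(?:KEY|TOKEN|SECRET|PASSWORD))=\S+", re.IGNORECASE),
     r"\1=[REDACTED]"),
    # AWS access key IDs.
    (re.compile(r"\bAKIA[0-9A-Z]{16}\b"), "[REDACTED]"),
    # Provider-style keys with an "sk-" prefix.
    (re.compile(r"\bsk-[A-Za-z0-9_-]{20,}"), "[REDACTED]"),
    # JWT-shaped strings: three base64url segments, the first starting with "eyJ".
    (re.compile(r"\beyJ[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+"), "[REDACTED]"),
]


def scrub(text: str) -> str:
    """Apply every pattern in turn and return the redacted text."""
    for pattern, replacement in _PATTERNS:
        text = pattern.sub(replacement, text)
    return text
```

Regex scrubbing trades recall for simplicity: it will miss secrets in unfamiliar formats and will occasionally redact harmless values, which is usually the safer direction to err in.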


5. Basic observability

What breaks without it: something goes wrong. You have no trace. You cannot tell which tool call caused the problem or how long each step took. Debugging means reproducing from scratch.

How to add it in 5 lines:

from agentsnap import AgentSnap
from agenttrace import AgentTrace

class ObservedAgent:
    def __init__(self):
        self.snap = AgentSnap()
        self.trace = AgentTrace()

    def run(self, user_query: str) -> str:
        run_id = self.trace.start_run(query=user_query)

        messages = [{"role": "user", "content": user_query}]
        step = 0

        while step < 10:
            step_id = self.trace.start_step(run_id, step=step)

            try:
                response_text = call_llm_with_budget(messages)
                self.trace.end_step(step_id, output=response_text, status="ok")

                # Snapshot the full agent state at this step
                self.snap.capture(
                    run_id=run_id,
                    step=step,
                    messages=list(messages),  # copy, so later appends do not alter this snapshot
                    response=response_text,
                )

                if self.is_final_answer(response_text):
                    self.trace.end_run(run_id, status="completed")
                    return response_text

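                messages.append({"role": "assistant", "content": response_text})
                # Add a user turn so the next request does not end on an assistant message
                messages.append({"role": "user", "content": "Continue."})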
                step += 1

            except Exception as e:
                self.trace.end_step(step_id, status="error", error=str(e))
                self.trace.end_run(run_id, status="failed", error=str(e))
                raise

        self.trace.end_run(run_id, status="max_steps")
        return "Max steps reached."

    def is_final_answer(self, text: str) -> bool:
        return "FINAL ANSWER:" in text

    def get_run_summary(self, run_id: str) -> dict:
        return {
            "trace": self.trace.get_run(run_id),
            "snapshots": self.snap.list_snapshots(run_id),
        }

AgentSnap captures the full message history and tool outputs at each step. AgentTrace records timing and status. When something breaks, you replay the trace to see exactly what the agent saw at each point, without reproducing the issue from scratch.
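If you want to see what the timing-and-status half of this amounts to, the sketch below records one entry per step using only the standard library. `RunTrace` and its methods are hypothetical names for illustration and do not mirror the agenttrace API.

```python
import json
import time
import uuid
from contextlib import contextmanager


class RunTrace:
    """Collects one record per step: name, status, duration, and optional error."""

    def __init__(self, clock=time.perf_counter):
        self.run_id = uuid.uuid4().hex
        self.steps = []
        self._clock = clock

    @contextmanager
    def step(self, name: str):
        started = self._clock()
        record = {"run_id": self.run_id, "step": name, "status": "ok"}
        try:
            yield record  # callers may attach extra fields, e.g. record["tokens"] = 123
        except Exception as exc:
            record["status"] = "error"
            record["error"] = repr(exc)
            raise
        finally:
            # Runs on success and failure alike, so every step gets a duration.
            record["duration_s"] = round(self._clock() - started, 6)
            self.steps.append(record)

    def to_jsonl(self) -> str:
        # One JSON object per line, easy to ship to any log pipeline.
        return "\n".join(json.dumps(s) for s in self.steps)
```

Wrapping each LLM or tool call in `with trace.step("llm_call"):` means a failed step is still recorded with its duration and error before the exception propagates.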


Install and quick-start

pip install llm-cost-cap token-budget-py agent-deadline llm-fallback-router llm-retry-py tool-secret-scrubber agentsnap agenttrace

All zero-dependency. All compatible with Python 3.9+.


Sibling libraries in the agent stack

| Checklist item | Library | What it prevents |
| --- | --- | --- |
| Budget caps | llm-cost-cap, token-budget-py | Runaway spend from loops or abuse |
| Deadline enforcement | agent-deadline | Hung agents, silent timeouts |
| Provider failover | llm-fallback-router, llm-retry-py | Outage-caused user-facing errors |
| Secret scrubbing | tool-secret-scrubber | Credential leaks via tool outputs |
| Observability | agentsnap, agenttrace | Undebuggable production failures |

What is next

Three gaps in the current stack. First, startup health checks: verify the LLM provider is reachable and tools are accessible before accepting requests. Second, alerting integration: right now you log errors but do not get paged. An alert hook on budget cap hits or deadline exceeded events would close that. Third, per-tenant cost attribution: the current cap is per-run. Multi-tenant deployments need per-user budget tracking so one user's runaway spend does not affect others.
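As a rough idea of what the third gap involves, per-tenant attribution can start as a ledger keyed by tenant. This is a sketch under stated assumptions, not a feature of any library above: `TenantBudgets` and `TenantBudgetExceeded` are hypothetical names, and a real deployment would persist the ledger instead of holding it in memory.

```python
from collections import defaultdict


class TenantBudgetExceeded(RuntimeError):
    """Raised when a charge would push a tenant past its ceiling."""


class TenantBudgets:
    """Tracks USD spend per tenant against the same per-tenant ceiling."""

    def __init__(self, max_usd_per_tenant: float):
        self.max_usd = max_usd_per_tenant
        self.spent = defaultdict(float)

    def charge(self, tenant_id: str, usd: float) -> float:
        # Reject the charge without recording it if it would cross the ceiling.
        if self.spent[tenant_id] + usd > self.max_usd:
            raise TenantBudgetExceeded(f"{tenant_id} would exceed ${self.max_usd:.2f}")
        self.spent[tenant_id] += usd
        return self.max_usd - self.spent[tenant_id]  # remaining budget
```

Because each tenant has its own running total, one user hitting the ceiling leaves every other tenant's budget untouched.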

These five items are the minimum bar. Add them before launch. Layer in the rest as your deployment matures.
