- Me: xgabriel.com | GitHub
A user reports that the agent did the wrong thing on Tuesday afternoon. You open the trace, you read the spans, and the offending step jumps out: the model called refund_order instead of cancel_subscription. You have the prompt, the tool schemas, the model id, the temperature. You run the same prompt against the same model and get back a different tool call. The bug is gone, and so is your chance of fixing it.
This is the part of agent development the test-mocking docs do not cover. "Just mock the LLM" works for unit tests where you wrote both the prompt and the canned response. It does not work for a production trace, where you want to rerun exactly what happened (the same model output, the same tool result, the same retry timing) and then step through it. Replay-from-trace is the pattern, and it needs more plumbing than mocks.
What follows is the harness shape: what to capture in record mode, what to fake in replay mode, where the boundary between the two lives, and how the OpenTelemetry trace id ties it all together. The runnable code is Python because that is where most of these agent loops live; the structure ports cleanly to Go or TypeScript.
Why "rerun the bug" is harder than it sounds
Three sources of nondeterminism stack up in a typical agent run, and any one of them is enough to turn a 30-step trace into something that diverges on step 4.
The model itself. Even at temperature=0, hosted LLMs are not bit-identical across calls. Anthropic's Messages API docs describe sampling parameters, and provider-side batching effects are widely reported in practitioner write-ups. Same prompt today, slightly different tool call next week. The output is close enough to look the same in spot checks and different enough to break a replay.
Tool I/O. Your agent calls get_weather, and the weather API returns different data every minute. It calls search_db, and the database has new rows since the original run. It calls now() and the clock has moved on by three days. The tool layer is where most divergence happens, and it is where most teams have the least telemetry.
Retries and backoff. The first call to the model 429'd, the SDK waited 1.3 seconds, the retry succeeded, the next tool call queued behind a circuit breaker that has since closed. The trace shows the path that ran. None of those timings reproduce on a fresh run unless you make them.
Replay has to fake all three. That is what record mode captures and replay mode plays back.
Record mode: what the trace actually needs
A useful capture has six categories of side-effect, every one keyed to a step index inside the agent loop. Skip any of them and the replay drifts.
- Model calls. Prompt sent, full response object, model id, sampling params, latency, retry count.
- Tool calls. Tool name, input args (after any client-side serialization), tool result text, error if any, execution latency.
- Time. Every time.time() / datetime.utcnow() the agent reads. The cleanest path is to wrap the clock and only let the agent read through that wrapper.
- RNG. Every random.random() / secrets.token_hex() the agent uses for sampling, jitter, request ids. Same wrapping rule.
- External reads. Vector search results, DB queries, HTTP fetches that are not tool calls (e.g. fetching a system prompt template at startup).
- Retries. Each retry attempt with its delay so the replay can reproduce the same backoff sequence the original run hit.
The capture lives next to the OpenTelemetry trace. The OTel span carries timing and structure; the capture file carries the bytes. One trace id ties them together, and the replay harness reads both.
```python
import json
import time
import uuid
from contextlib import contextmanager
from dataclasses import dataclass, field

from opentelemetry import trace

tracer = trace.get_tracer("agent")


@dataclass
class Capture:
    trace_id: str
    steps: list = field(default_factory=list)

    def append(self, kind: str, payload: dict):
        # "i" is the step index that lines up with the span
        self.steps.append({
            "i": len(self.steps),
            "kind": kind,
            "payload": payload,
        })

    def dump(self, path: str):
        # JSON Lines: one step row per line
        with open(path, "w") as f:
            for step in self.steps:
                f.write(json.dumps(step) + "\n")
```
A capture is an ordered list of step rows. Each row gets an integer index that matches the corresponding span attribute on the OTel side, so a span at index 7 in the trace lines up with row 7 in the capture file. The shape is dumb on purpose: JSON Lines, append-only, no foreign keys. Replay reads it sequentially.
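Because the format is sequential and append-only, a loader can check it cheaply. The sketch below is one way to read a capture back and confirm the step indices are contiguous before a replay starts. The name load_capture and the fail-on-gap behavior are assumptions for illustration, not part of the harness above.

```python
import json
import os
import tempfile


def load_capture(path):
    """Read a JSON Lines capture; fail if a step index is missing."""
    steps = []
    with open(path) as f:
        for line_no, line in enumerate(f, start=1):
            if not line.strip():
                continue  # tolerate blank lines
            step = json.loads(line)
            if step["i"] != len(steps):
                raise ValueError(
                    f"line {line_no}: expected step "
                    f"{len(steps)}, got {step['i']}"
                )
            steps.append(step)
    return steps


# Usage: write two rows in the capture shape, then load them back.
rows = [
    {"i": 0, "kind": "now", "payload": {"t": 1714000000.0}},
    {"i": 1, "kind": "rng_seed", "payload": {"seed": 42}},
]
demo_path = os.path.join(tempfile.mkdtemp(), "demo.jsonl")
with open(demo_path, "w") as f:
    for row in rows:
        f.write(json.dumps(row) + "\n")

loaded = load_capture(demo_path)
```

A gap in the indices usually means a truncated upload or two runs written to the same file, and it is cheaper to find that out here than halfway through a replay.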
Wrapping the boundaries
The harness fakes time, RNG, model, and tools. Each boundary gets a thin wrapper with two implementations: a record one that calls through to the real thing and writes to the capture, and a replay one that reads the next row off the capture.
Time first, because it is the smallest:
```python
class Clock:
    def __init__(self, mode, capture):
        self.mode = mode
        self.capture = capture
        self.cursor = 0

    def now(self) -> float:
        if self.mode == "record":
            t = time.time()
            self.capture.append("now", {"t": t})
            return t
        # replay
        row = self._next("now")
        return row["payload"]["t"]

    def _next(self, kind):
        # walk forward, skipping rows of other kinds
        while self.cursor < len(self.capture.steps):
            row = self.capture.steps[self.cursor]
            self.cursor += 1
            if row["kind"] == kind:
                return row
        raise RuntimeError(f"capture exhausted on {kind}")
```
The agent never calls time.time() directly. It calls clock.now(). In record mode the wrapper writes the real wall-clock value into the capture; in replay mode it reads the recorded value out. The cursor walks forward through the capture rows, skipping rows of other kinds (model calls, tool calls) that interleave between the time reads.
RNG follows the same shape. random.Random(seed) is sufficient for most agents (the seed itself is the captured value). If the agent uses secrets for cryptographic-grade ids, there is no seed to capture, so you record each returned value instead, the way Clock records each timestamp.
```python
import random as _random


class Rng:
    def __init__(self, mode, capture, seed=None):
        self.mode = mode
        self.capture = capture
        if mode == "record":
            # `seed or ...` would discard an explicit seed of 0
            self.seed = (
                seed if seed is not None
                else int(time.time() * 1e6)
            )
            self.r = _random.Random(self.seed)
            capture.append("rng_seed", {"seed": self.seed})
        else:
            row = self._find("rng_seed")
            self.r = _random.Random(row["payload"]["seed"])

    def random(self) -> float:
        return self.r.random()

    def _find(self, kind):
        for row in self.capture.steps:
            if row["kind"] == kind:
                return row
        raise RuntimeError("no rng_seed in capture")
```
A single seed is enough because random.Random is itself deterministic: same seed, same sequence. You only need to capture the seed once at the start of the run, not every individual .random() call.
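The secrets module is the exception to the seed trick: it draws from the operating system's entropy source and cannot be seeded, so each value has to be recorded individually. A minimal sketch of that wrapper follows. TokenSource and the "token" row kind are hypothetical names, and a plain list stands in for Capture.steps so the example runs on its own.

```python
import secrets


class TokenSource:
    """Record/replay wrapper around secrets.token_hex (hypothetical)."""

    def __init__(self, mode, steps):
        self.mode = mode
        self.steps = steps  # same row shape as Capture.steps
        self.cursor = 0

    def token_hex(self, nbytes=16):
        if self.mode == "record":
            value = secrets.token_hex(nbytes)
            self.steps.append({
                "i": len(self.steps),
                "kind": "token",
                "payload": {"value": value},
            })
            return value
        # replay: hand back the next recorded token,
        # skipping rows of other kinds
        while self.cursor < len(self.steps):
            row = self.steps[self.cursor]
            self.cursor += 1
            if row["kind"] == "token":
                return row["payload"]["value"]
        raise RuntimeError("capture exhausted on token")


# Usage: two ids recorded, then the same two ids replayed.
steps = []
recorder = TokenSource("record", steps)
recorded = [recorder.token_hex(8), recorder.token_hex(8)]

replayer = TokenSource("replay", steps)
replayed = [replayer.token_hex(8), replayer.token_hex(8)]
```

Recorded ids end up in the capture file in plain text, so this only suits values like request ids. Anything that acts as a credential should stay out of the capture.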
Faking the model
This is the boundary teams underbuild. The naive version is "save the response text, replay it as a string." The real version has to handle streaming chunks, tool-call arrays, stop reasons, and the assistant message structure the SDK expects on the next turn.
```python
import anthropic


class ModelClient:
    def __init__(self, mode, capture):
        self.mode = mode
        self.capture = capture
        # Build the real client only in record mode, so
        # replay needs no API key and cannot reach the network
        self.real = (
            anthropic.Anthropic() if mode == "record" else None
        )
        self.cursor = 0

    def call(self, **kwargs):
        if self.mode == "record":
            with tracer.start_as_current_span(
                "model.call"
            ) as span:
                span.set_attribute(
                    "model", kwargs["model"]
                )
                resp = self.real.messages.create(
                    **kwargs
                )
            payload = {
                "kwargs": _strip_unhashable(kwargs),
                "response": resp.model_dump(),
            }
            self.capture.append("model", payload)
            return resp
        # replay
        row = self._next("model")
        return _rehydrate(row["payload"]["response"])

    def _next(self, kind):
        while self.cursor < len(self.capture.steps):
            row = self.capture.steps[self.cursor]
            self.cursor += 1
            if row["kind"] == kind:
                return row
        raise RuntimeError("capture exhausted on model")


def _strip_unhashable(kwargs):
    # remove things like httpx clients, custom retry
    # objects, anything that won't json-serialise
    safe = {}
    for k, v in kwargs.items():
        try:
            json.dumps(v)
            safe[k] = v
        except TypeError:
            continue
    return safe


def _rehydrate(payload):
    return anthropic.types.Message.model_validate(
        payload
    )
```
A few details matter. First, the capture stores both the request kwargs and the response, because in replay mode you usually want a divergence check. If the prompt the agent sends today does not match the prompt the original run sent, you want to know before the response gets handed back. The harness can compare the two and either fail loudly or log a divergence span.
Second, model_dump() on the SDK response gives you a JSON-serializable dict; model_validate puts it back.
Third, streaming changes the shape. For streaming calls you capture the assembled final message. The chunks only matter if you are specifically testing the streaming UX path.
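The ModelClient above records the request kwargs but does not yet compare them during replay. One way to sketch that comparison is below. diff_request is a hypothetical helper, and it assumes both dicts have already been through _strip_unhashable, so every value is JSON-serializable.

```python
import json

_MISSING = object()


def _canon(value):
    # Canonical JSON text, so dict key order does not
    # register as a difference
    return json.dumps(value, sort_keys=True)


def diff_request(recorded, live):
    """Return the top-level keys whose values differ."""
    changed = []
    for key in sorted(set(recorded) | set(live)):
        a = recorded.get(key, _MISSING)
        b = live.get(key, _MISSING)
        if (
            a is _MISSING
            or b is _MISSING
            or _canon(a) != _canon(b)
        ):
            changed.append(key)
    return changed


# Usage: the agent code now asks for a different model.
recorded = {
    "model": "claude-sonnet-A",
    "max_tokens": 1024,
    "messages": [
        {"role": "user", "content": "cancel my plan"}
    ],
}
live = dict(recorded, model="claude-sonnet-B")
changed = diff_request(recorded, live)
```

In replay mode the harness could call this just before returning the recorded response, then either raise or attach the list of changed keys to a divergence span.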
For tool calls the same pattern works, with one extra concern: tool functions are user code, and user code has its own side effects. The harness only fakes the call boundary, not the function bodies. In replay mode the function never runs.
```python
class ToolDispatcher:
    def __init__(self, mode, capture, tools):
        self.mode = mode
        self.capture = capture
        self.tools = tools
        self.cursor = 0

    def dispatch(self, name, args):
        if self.mode == "record":
            t0 = time.perf_counter()
            try:
                result = self.tools[name](**args)
                err = err_type = None
            except Exception as e:
                result = None
                err = str(e)
                err_type = type(e).__name__
            # result must be JSON-serialisable to be dumped
            payload = {
                "name": name,
                "args": args,
                "result": result,
                "err": err,
                "err_type": err_type,
                "latency_ms": (
                    time.perf_counter() - t0
                ) * 1000,
            }
            self.capture.append("tool", payload)
            # `is not None`: an empty message is still an error
            if err is not None:
                raise RuntimeError(err)
            return result
        # replay
        row = self._next("tool")
        if row["payload"]["name"] != name:
            raise RuntimeError(
                f"tool divergence: expected "
                f"{row['payload']['name']}, got {name}"
            )
        if row["payload"]["err"] is not None:
            raise RuntimeError(row["payload"]["err"])
        return row["payload"]["result"]

    def _next(self, kind):
        while self.cursor < len(self.capture.steps):
            row = self.capture.steps[self.cursor]
            self.cursor += 1
            if row["kind"] == kind:
                return row
        raise RuntimeError("capture exhausted on tool")
```
The divergence check on tool name is the early-warning system. If the original run called refund_order at step 12 and today's replay tries to call cancel_subscription, the harness fails fast at the boundary instead of silently feeding a wrong tool result into the model.
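Retries are on the record-mode list but have no wrapper yet. The sketch below is one possible shape: record mode appends a row per failed attempt with the delay it was about to sleep, and replay mode walks those rows back so the loop sees the same backoff sequence without making any calls. TransientError, record_retries, replay_retries, and the "retry" row kind are all hypothetical names, and a plain list stands in for Capture.steps.

```python
import time


class TransientError(Exception):
    """Stand-in for a retryable failure such as an HTTP 429."""


def record_retries(fn, steps, delays=(0.5, 1.0, 2.0),
                   sleep=time.sleep):
    """Call fn; append one 'retry' row per failed attempt."""
    for attempt, delay in enumerate(delays):
        try:
            return fn()
        except TransientError as exc:
            steps.append({
                "i": len(steps),
                "kind": "retry",
                "payload": {
                    "attempt": attempt,
                    "delay_s": delay,
                    "err": str(exc),
                },
            })
            sleep(delay)
    return fn()  # last attempt: let any failure propagate


def replay_retries(steps, cursor, sleep=lambda s: None):
    """Consume consecutive 'retry' rows starting at cursor.

    Returns (delays, new_cursor). Pass time.sleep to
    reproduce the original pacing as well as the count.
    """
    delays = []
    while (
        cursor < len(steps)
        and steps[cursor]["kind"] == "retry"
    ):
        delay = steps[cursor]["payload"]["delay_s"]
        sleep(delay)
        delays.append(delay)
        cursor += 1
    return delays, cursor


# Usage: a call that is rate-limited twice, then succeeds.
attempts = {"n": 0}


def flaky():
    attempts["n"] += 1
    if attempts["n"] <= 2:
        raise TransientError("429 rate limited")
    return "ok"


steps = []
result = record_retries(flaky, steps, sleep=lambda s: None)
delays, cursor = replay_retries(steps, 0)
```

If the agent charges retries against an action budget, the replay side can charge len(delays) at the same step, which is what lets a retry-storm reproduce without a real 429.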
The OpenTelemetry tie-in
The capture file is dead bytes without a way to find it. The OTel trace id is that key.
In record mode, the root span carries a replay.capture_uri attribute pointing at the file or object-store path. The agent loop sets it once:
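```python
def run_agent(user_input):
    with tracer.start_as_current_span(
        "agent.run"
    ) as span:
        ctx = span.get_span_context()
        # Key the capture by the real OTel trace id; fall
        # back to a random id when no tracer provider is
        # configured and the span context is invalid
        trace_id = (
            trace.format_trace_id(ctx.trace_id)
            if ctx.is_valid
            else uuid.uuid4().hex
        )
        capture = Capture(trace_id=trace_id)
        path = f"captures/{capture.trace_id}.jsonl"
        span.set_attribute("replay.capture_uri", path)
        clock = Clock("record", capture)
        rng = Rng("record", capture)
        model = ModelClient("record", capture)
        tools = ToolDispatcher(
            "record", capture, TOOL_REGISTRY
        )
        try:
            return _agent_loop(
                user_input, clock, rng, model, tools
            )
        finally:
            # persist the capture even when the loop raises
            capture.dump(path)
```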
Replay reads the same trace id from your observability tool, fetches the capture, and runs the same loop in replay mode:
```python
def replay_agent(trace_id, user_input):
    path = f"captures/{trace_id}.jsonl"
    capture = Capture(trace_id=trace_id)
    with open(path) as f:
        for line in f:
            capture.steps.append(json.loads(line))
    clock = Clock("replay", capture)
    rng = Rng("replay", capture)
    model = ModelClient("replay", capture)
    tools = ToolDispatcher(
        "replay", capture, TOOL_REGISTRY
    )
    return _agent_loop(
        user_input, clock, rng, model, tools
    )
```
The agent loop body is identical between record and replay. That is the whole point. No if mode == "replay" branching inside the agent code; only at the boundaries where time, randomness, and external calls happen.
For sampling, head-sample at 100% during the canary phase, then move to tail-based sampling that keeps every error trace plus a small percentage of healthy traces. In OpenTelemetry, tail-based sampling usually runs in the Collector rather than in the SDK sampler. Your replay corpus is the set of captures from the tail-sampled pool; the cost is bounded because you only persist the captures for traces you actually keep.
The boundary between record and replay
A clean split has three rules.
Replay never calls the network. If your model.call or tools.dispatch method invokes the real SDK in replay mode, you have a bug. The whole harness is built on the property that replay reads from disk, period.
Replay does not catch exceptions inside the agent loop differently. If the original run raised RetryError at step 8, the replay raises RetryError at step 8: same type, same message. The capture stores the exception class name and message; the replay reraises with the same text.
Replay does not assert "the agent did the right thing." It asserts "the agent did the same thing." Correctness checks are a separate layer on top of replay. You replay to reproduce a bug, then you fix the bug, then you re-record and your test suite asserts that the new behaviour is correct.
A useful divergence model: every read from the capture in replay mode is a candidate for a mismatch warning. If the agent reads clock.now() at step 3 and the captured value is 1714000000.0, that is fine. If the agent calls model.call(model="claude-sonnet-B") and the captured call had model="claude-sonnet-A", the harness logs a span with replay.divergence=true and you know the agent code has changed since the trace was captured.
What this catches that mocks do not
Three real bugs that replay finds and handwritten test mocks do not:
The retry-storm bug. The original run hit a 429 on the third tool call, the SDK retried five times with exponential backoff, the agent burned its action budget on retries, and the user saw a timeout. A mocked test would not include the retry timings. A replay does. Every retry attempt is a captured row, and the bounded-loop check fires on the same step it fired in production.
The clock-dependent bug. The agent decides what to do based on the day-of-week or the time-of-day. The bug only manifests on Friday afternoons. A handwritten test gets the time wrong; a replay reads the captured timestamp and the bug reproduces every time you run the harness.
The tool-result-shape bug. The third-party API returned a slightly different JSON shape than usual (extra field, missing field, string where an int was expected), and the agent's downstream parser broke. The captured tool result has the exact bytes the API returned that day. The replay feeds those bytes back, and the parser breaks in the same place.
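Of the three, the clock-dependent case is the easiest to show in a few lines. The sketch below assumes a hypothetical decision function, pick_action, that behaves differently on Fridays, and a stripped-down replay clock that reads "now" rows from a capture list. Neither is part of the harness above.

```python
from datetime import datetime, timezone


class ReplayClock:
    """Replay-only clock that reads recorded 'now' rows."""

    def __init__(self, steps):
        self.rows = iter(
            r for r in steps if r["kind"] == "now"
        )

    def now(self) -> float:
        return next(self.rows)["payload"]["t"]


def pick_action(clock):
    # Hypothetical agent decision that depends on the
    # day of the week (Monday is 0, Friday is 4)
    day = datetime.fromtimestamp(
        clock.now(), tz=timezone.utc
    ).weekday()
    return "defer_to_monday" if day == 4 else "process_now"


# A capture row recorded on a Friday afternoon (UTC)
friday = datetime(
    2024, 4, 26, 15, 0, tzinfo=timezone.utc
).timestamp()
steps = [{"i": 0, "kind": "now", "payload": {"t": friday}}]

# Every replay takes the Friday branch, whatever today is
first = pick_action(ReplayClock(steps))
second = pick_action(ReplayClock(steps))
```

A handwritten test would have to guess which timestamp matters. The capture already holds the one that triggered the bug.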
What to do with this on Monday
Pick one agent loop in your codebase. Add a Clock wrapper, an Rng wrapper, a ModelClient shim, and a ToolDispatcher shim. Keep them as four small classes with a mode flag. Persist captures keyed by trace id to whatever object store you already use for logs.
Run the next 100 production traces in record mode. Take the five most interesting ones (longest, most expensive, the one that errored) and replay them locally. The first thing you will notice is how many spans your existing observability misses: the clock reads, the RNG calls, the retries hidden inside SDK clients. Capture them, replay them, and your bug-reproduction story stops depending on whether the model returns the same tokens twice in a row.
If this was useful
The AI Agents Pocket Guide walks through the boundaries an agent loop has to manage in production (tool dispatch, retry policy, memory layers, action budgets) with the same record/replay discipline applied across all of them. Replay is one slice of the problem; the book covers the rest of the loop the harness above sits inside.
