Build a DVR for AI Agents: Episode Replay UI That Actually Works
Your AI agent made 147 decisions in 3.2 seconds, then failed — and you have no idea which decision broke everything.
The Problem: AI Agents Are Black Boxes Moving at Light Speed
Here's what happens when your CrewAI agent crashes at 2 AM:
```text
# Your agent's final words
ERROR: Task execution failed after 147 LLM calls
Context: "something went wrong lol"
Traceback: <unintelligible garbage>
```
You know it made 147 LLM calls. You know it failed. You don't know:
- What it was "thinking" at step 73
- Why it decided to call the wrong API at step 112
- What context it had (or lost) when everything went sideways
Traditional logging gives you timestamps and error codes. What you need is a DVR that lets you scrub through your agent's entire "episode" — see every thought, decision, and API call in chronological order. With context.
This isn't just debugging. It's time travel for AI systems.
Architecture: How Agent DVR Actually Works
```mermaid
graph TB
    subgraph "AI Agent Runtime"
        A[CrewAI Agent] --> G[Airblackbox Gateway]
        G --> O[OpenAI API]
        G --> T[Telemetry Store]
    end

    subgraph "DVR Interface"
        T --> P[Timeline Processor]
        P --> UI[React Timeline UI]
        UI --> S[Scrub Controls]
        S --> C[Context Viewer]
    end

    subgraph "Episode Structure"
        E1[Episode Start]
        E2[LLM Call #1]
        E3[Tool Use #1]
        E4[LLM Call #2]
        E5[Context Update]
        E6[Decision Point]
        E7[Episode End/Error]
    end

    P --> E1
    E1 --> E2
    E2 --> E3
    E3 --> E4
    E4 --> E5
    E5 --> E6
    E6 --> E7
```
The magic happens in three layers:
- Capture Layer: Airblackbox Gateway intercepts every LLM call and tool execution
- Timeline Layer: Processor converts raw telemetry into chronological episodes
- Replay Layer: UI lets you scrub through the timeline and inspect context at any point
Think of it as a flight recorder, but instead of altitude and airspeed, you're recording prompts, responses, and reasoning chains.
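Everything downstream hinges on one append-only record type. Here is a minimal sketch of what the capture layer emits; the field names are illustrative, not the exact Airblackbox schema:

```python
import time
from dataclasses import dataclass, field
from typing import Any, Dict

@dataclass
class EpisodeEvent:
    """One entry in an episode's append-only event log."""
    episode_id: str
    event_type: str  # "episode_start", "llm_call", "tool_use", ...
    data: Dict[str, Any]
    timestamp: float = field(default_factory=time.time)

# An episode is just an ordered list of these records
episode = [
    EpisodeEvent("demo_1", "episode_start", {"task": "count to 10"}),
    EpisodeEvent("demo_1", "llm_call", {"model": "gpt-4o"}),
    EpisodeEvent("demo_1", "tool_use", {"tool_name": "calculator"}),
]
print([e.event_type for e in episode])  # → ['episode_start', 'llm_call', 'tool_use']
```

Because the log is append-only and ordered, "replay" is just iterating over it; every other feature (scrubbing, context snapshots, reasoning chains) is derived from this list.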
Implementation: Building the Agent DVR
Step 1: Set Up the Telemetry Gateway
First, we need to capture everything without breaking your existing agent:
```python
# agent_dvr/gateway.py
import asyncio
import json
import time
from datetime import datetime
from typing import List, Dict, Any
from airblackbox import Gateway, TelemetryEvent

class AgentDVRGateway(Gateway):
    def __init__(self):
        super().__init__()
        self.episode_id = None
        self.episode_events = []

    async def start_episode(self, agent_name: str, task: str) -> str:
        """Start a new episode recording"""
        self.episode_id = f"{agent_name}_{int(time.time())}"
        self.episode_events = []

        start_event = TelemetryEvent(
            event_type="episode_start",
            timestamp=datetime.utcnow(),
            episode_id=self.episode_id,
            data={
                "agent_name": agent_name,
                "task": task,
                "start_time": time.time(),
            },
        )
        await self.record_event(start_event)
        return self.episode_id

    async def record_llm_call(self, request: Dict, response: Dict) -> None:
        """Record LLM interaction with full context"""
        event = TelemetryEvent(
            event_type="llm_call",
            timestamp=datetime.utcnow(),
            episode_id=self.episode_id,
            data={
                "request": {
                    "model": request.get("model"),
                    "messages": request.get("messages", []),
                    "temperature": request.get("temperature"),
                    "max_tokens": request.get("max_tokens"),
                },
                "response": {
                    "content": response.get("choices", [{}])[0].get("message", {}).get("content"),
                    "usage": response.get("usage", {}),
                    "finish_reason": response.get("choices", [{}])[0].get("finish_reason"),
                },
                "context_size": len(json.dumps(request.get("messages", []))),
                "latency_ms": response.get("_airblackbox_latency", 0),
            },
        )
        await self.record_event(event)

    async def record_tool_use(self, tool_name: str, args: Dict, result: Any) -> None:
        """Record tool execution"""
        event = TelemetryEvent(
            event_type="tool_use",
            timestamp=datetime.utcnow(),
            episode_id=self.episode_id,
            data={
                "tool_name": tool_name,
                "arguments": args,
                "result": str(result)[:1000],  # Truncate long results
                "success": result is not None,
            },
        )
        await self.record_event(event)
```
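If you want to play with the recording flow before wiring up the real gateway, an in-memory stand-in is enough. This mirrors the method shapes above but stores events in a plain list; `InMemoryDVR` is a made-up name for illustration, not part of Airblackbox:

```python
import asyncio
import time
from datetime import datetime
from typing import Any, Dict, List

class InMemoryDVR:
    """Hypothetical stand-in for AgentDVRGateway: same flow, no backend."""

    def __init__(self):
        self.episode_id = None
        self.events: List[Dict[str, Any]] = []

    async def start_episode(self, agent_name: str, task: str) -> str:
        self.episode_id = f"{agent_name}_{int(time.time())}"
        self.events = [{"event_type": "episode_start",
                        "timestamp": datetime.utcnow(),
                        "data": {"agent_name": agent_name, "task": task}}]
        return self.episode_id

    async def record_llm_call(self, request: Dict, response: Dict) -> None:
        self.events.append({"event_type": "llm_call",
                            "timestamp": datetime.utcnow(),
                            "data": {"request": request, "response": response}})

async def main():
    dvr = InMemoryDVR()
    await dvr.start_episode("scratch_agent", "say hi")
    await dvr.record_llm_call(
        {"model": "gpt-4o", "messages": [{"role": "user", "content": "hi"}]},
        {"choices": [{"message": {"content": "hello!"}}]},
    )
    return dvr

dvr = asyncio.run(main())
print(len(dvr.events))  # → 2
```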
Step 2: Create the Timeline Processor
Raw telemetry is chaos. We need to turn it into a coherent timeline:
```python
# agent_dvr/timeline.py
from dataclasses import dataclass
from typing import List, Dict, Optional
import json

@dataclass
class TimelineEvent:
    timestamp: float
    event_type: str
    data: Dict
    context_snapshot: Optional[Dict] = None
    reasoning_chain: Optional[List[str]] = None

class TimelineProcessor:
    def __init__(self):
        self.context_tracker = ContextTracker()

    async def process_episode(self, episode_id: str) -> List[TimelineEvent]:
        """Convert raw events into a scrubbable timeline"""
        raw_events = await self.get_raw_events(episode_id)
        timeline = []

        for event in raw_events:
            processed_event = await self._process_event(event)
            timeline.append(processed_event)

            # Update context state
            self.context_tracker.update(processed_event)

        return timeline

    async def _process_event(self, raw_event) -> TimelineEvent:
        """Process a single event with context enrichment"""
        event_data = raw_event.data

        # Extract reasoning for LLM calls
        reasoning_chain = None
        if raw_event.event_type == "llm_call":
            reasoning_chain = self._extract_reasoning_chain(event_data)

        return TimelineEvent(
            timestamp=raw_event.timestamp.timestamp(),
            event_type=raw_event.event_type,
            data=event_data,
            context_snapshot=self.context_tracker.get_snapshot(),
            reasoning_chain=reasoning_chain,
        )

    def _extract_reasoning_chain(self, llm_data: Dict) -> List[str]:
        """Extract step-by-step reasoning from the LLM response"""
        response_content = llm_data.get("response", {}).get("content", "")

        # Look for common reasoning patterns
        reasoning_steps = []

        # Pattern 1: numbered lists
        if "1." in response_content and "2." in response_content:
            for line in response_content.split("\n"):
                if line.strip().startswith(("1.", "2.", "3.", "4.", "5.")):
                    reasoning_steps.append(line.strip())

        # Pattern 2: "first, then, finally" structure
        elif "first" in response_content.lower() and "then" in response_content.lower():
            # Simple heuristic: split on transition words
            transitions = ["first", "then", "next", "finally", "therefore"]
            current_step = ""
            for sentence in response_content.split("."):
                sentence = sentence.strip()
                if any(trans in sentence.lower() for trans in transitions):
                    if current_step:
                        reasoning_steps.append(current_step)
                    current_step = sentence
                else:
                    current_step += f". {sentence}"
            if current_step:
                reasoning_steps.append(current_step)

        return reasoning_steps[:5]  # Keep it manageable

class ContextTracker:
    def __init__(self):
        self.current_context = {
            "memory": [],
            "active_tools": [],
            "conversation_history": [],
            "task_progress": {},
        }

    def update(self, event: TimelineEvent):
        """Update context state based on an event"""
        if event.event_type == "llm_call":
            self.current_context["conversation_history"].append({
                "timestamp": event.timestamp,
                "messages": event.data.get("request", {}).get("messages", []),
            })
        elif event.event_type == "tool_use":
            self.current_context["active_tools"].append({
                "timestamp": event.timestamp,
                "tool": event.data.get("tool_name"),
                "success": event.data.get("success", False),
            })

    def get_snapshot(self) -> Dict:
        """Get the current context snapshot"""
        return {
            "memory_items": len(self.current_context["memory"]),
            "conversation_length": len(self.current_context["conversation_history"]),
            "tools_used": len(self.current_context["active_tools"]),
            "last_activity": max(
                [h["timestamp"] for h in self.current_context["conversation_history"]] + [0]
            ),
        }
```
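The numbered-list branch of the reasoning extractor is easy to sanity-check in isolation. Here is the same heuristic as pattern 1 above, duplicated as a free function so the snippet runs standalone:

```python
from typing import List

def extract_numbered_steps(text: str) -> List[str]:
    """Same heuristic as _extract_reasoning_chain, pattern 1: pull numbered lines."""
    steps = []
    if "1." in text and "2." in text:
        for line in text.split("\n"):
            if line.strip().startswith(("1.", "2.", "3.", "4.", "5.")):
                steps.append(line.strip())
    return steps[:5]  # Keep it manageable

response = """To count to 10 I will:
1. Start at 1
2. Add 1 each step
3. Stop at 10"""
print(extract_numbered_steps(response))
# → ['1. Start at 1', '2. Add 1 each step', '3. Stop at 10']
```

It is deliberately dumb: false positives (a numbered list that isn't reasoning) are fine in a debugging UI, where a human is looking at the output anyway.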
Step 3: Build the DVR Interface
Now for the fun part — a web interface that actually feels like a DVR:
```python
# agent_dvr/server.py
from fastapi import FastAPI, WebSocket
from fastapi.staticfiles import StaticFiles
import json
from typing import Dict, List

from agent_dvr.timeline import TimelineEvent, TimelineProcessor

app = FastAPI()
timeline_processor = TimelineProcessor()

@app.get("/episodes")
async def list_episodes():
    """Get all recorded episodes"""
    episodes = await get_episodes_from_storage()
    return {"episodes": episodes}

@app.get("/episodes/{episode_id}/timeline")
async def get_episode_timeline(episode_id: str):
    """Get the full timeline for scrubbing"""
    timeline = await timeline_processor.process_episode(episode_id)

    return {
        "episode_id": episode_id,
        "total_events": len(timeline),
        "duration_seconds": timeline[-1].timestamp - timeline[0].timestamp if timeline else 0,
        "timeline": [
            {
                "timestamp": event.timestamp,
                "event_type": event.event_type,
                "summary": _get_event_summary(event),
                "data": event.data,
                "context": event.context_snapshot,
                "reasoning": event.reasoning_chain,
            }
            for event in timeline
        ],
    }

@app.get("/episodes/{episode_id}/context/{timestamp}")
async def get_context_at_time(episode_id: str, timestamp: float):
    """Get agent context at a specific timestamp - used while scrubbing"""
    timeline = await timeline_processor.process_episode(episode_id)

    # Find the closest event at or before the timestamp
    context_event = None
    for event in timeline:
        if event.timestamp <= timestamp:
            context_event = event
        else:
            break

    if context_event is None:
        return {"error": "No context available at timestamp"}

    return {
        "timestamp": timestamp,
        "closest_event": context_event.timestamp,
        "event_type": context_event.event_type,
        "context": context_event.context_snapshot,
        "reasoning": context_event.reasoning_chain,
        "data": context_event.data,
    }

def _get_event_summary(event: TimelineEvent) -> str:
    """Generate a human-readable event summary"""
    if event.event_type == "llm_call":
        model = event.data.get("request", {}).get("model", "unknown")
        tokens = event.data.get("response", {}).get("usage", {}).get("total_tokens", 0)
        return f"LLM call ({model}) - {tokens} tokens"
    elif event.event_type == "tool_use":
        tool = event.data.get("tool_name", "unknown")
        success = "✅" if event.data.get("success") else "❌"
        return f"Tool: {tool} {success}"
    elif event.event_type == "episode_start":
        agent = event.data.get("agent_name", "unknown")
        return f"Episode started - {agent}"
    return event.event_type
```
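The context endpoint does a linear scan for the last event at or before a timestamp. Since the timeline is already sorted, the same floor lookup can also be done with the stdlib's `bisect`, a worthwhile swap once episodes run to thousands of events:

```python
import bisect
from typing import List, Optional

def event_index_at(timestamps: List[float], t: float) -> Optional[int]:
    """Index of the last event with timestamp <= t, or None if t precedes all events."""
    # bisect_right returns the insertion point; the event we want sits just before it
    i = bisect.bisect_right(timestamps, t)
    return i - 1 if i > 0 else None

ts = [10.0, 10.5, 11.2, 13.9]
print(event_index_at(ts, 11.0))  # → 1 (the event at 10.5)
print(event_index_at(ts, 9.0))   # → None (scrubbed before the episode started)
```

In the handler you would precompute `timestamps = [e.timestamp for e in timeline]` once per episode and index into `timeline` with the result, turning each scrub request from O(n) into O(log n).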
Step 4: The Frontend DVR Controls
```jsx
// agent_dvr/frontend/src/DVRPlayer.jsx
import React, { useState, useEffect, useRef, useCallback } from 'react';

export const DVRPlayer = ({ episodeId }) => {
  const [timeline, setTimeline] = useState([]);
  const [currentTime, setCurrentTime] = useState(0);
  const [isPlaying, setIsPlaying] = useState(false);
  const [playbackSpeed, setPlaybackSpeed] = useState(1);
  const [currentContext, setCurrentContext] = useState(null);
  const intervalRef = useRef(null);

  useEffect(() => {
    fetchTimeline();
  }, [episodeId]);

  const fetchTimeline = async () => {
    const response = await fetch(`/episodes/${episodeId}/timeline`);
    const data = await response.json();
    setTimeline(data.timeline);
  };

  const scrubToTime = useCallback(async (timestamp) => {
    setCurrentTime(timestamp);

    // Fetch context at this timestamp
    const response = await fetch(`/episodes/${episodeId}/context/${timestamp}`);
    const context = await response.json();
    setCurrentContext(context);
  }, [episodeId]);

  const pause = () => {
    setIsPlaying(false);
    clearInterval(intervalRef.current);
  };

  const play = () => {
    setIsPlaying(true);

    // Timestamps are in seconds, so each 100ms tick advances
    // the playhead 0.1s, scaled by playback speed
    intervalRef.current = setInterval(() => {
      setCurrentTime(prev => {
        const next = prev + playbackSpeed * 0.1;
        if (next >= timeline[timeline.length - 1]?.timestamp) {
          pause();
          return prev;
        }
        return next;
      });
    }, 100);
  };

  return (
    <div className="dvr-player">
      {/* Timeline scrubber */}
      <div className="timeline-scrubber">
        <input
          type="range"
          step="any"
          min={timeline[0]?.timestamp || 0}
          max={timeline[timeline.length - 1]?.timestamp || 100}
          value={currentTime}
          onChange={(e) => scrubToTime(parseFloat(e.target.value))}
          className="scrub-bar"
        />

        {/* Event markers */}
        <div className="event-markers">
          {timeline.map((event, idx) => (
            <div
              key={idx}
              className={`marker ${event.event_type}`}
              style={{
                left: `${((event.timestamp - timeline[0].timestamp) /
                  (timeline[timeline.length - 1].timestamp - timeline[0].timestamp)) * 100}%`
              }}
              title={event.summary}
            />
          ))}
        </div>
      </div>

      {/* Playback controls */}
      <div className="controls">
        <button onClick={play} disabled={isPlaying}>▶️</button>
        <button onClick={pause}>⏸️</button>
        <select value={playbackSpeed} onChange={(e) => setPlaybackSpeed(Number(e.target.value))}>
          <option value={0.5}>0.5x</option>
          <option value={1}>1x</option>
          <option value={2}>2x</option>
          <option value={5}>5x</option>
        </select>
      </div>

      {/* Context viewer */}
      <div className="context-viewer">
        {currentContext && (
          <div>
            <h3>Context at {new Date(currentTime * 1000).toISOString()}</h3>
            <div className="context-data">
              <pre>{JSON.stringify(currentContext, null, 2)}</pre>
            </div>
          </div>
        )}
      </div>
    </div>
  );
};
```
Pitfalls: What Will Break (And How to Fix It)
Memory Explosion
Problem: Recording every LLM call generates massive amounts of data.
Fix: Implement intelligent truncation and compression:
```python
from typing import Dict

def compress_event_data(event_data: Dict) -> Dict:
    """Compress large payloads while preserving debuggability"""
    if "messages" in event_data:
        # Keep the first/last messages, summarize the middle
        messages = event_data["messages"]
        if len(messages) > 10:
            compressed = messages[:3] + [
                {"role": "system", "content": f"... {len(messages) - 6} messages omitted ..."}
            ] + messages[-3:]
            event_data["messages"] = compressed
    return event_data
```
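A quick check of the behavior, with the same logic duplicated so the snippet runs standalone: a 20-message history collapses to the first three, an omission marker, and the last three.

```python
from typing import Dict

def compress_event_data(event_data: Dict) -> Dict:
    """Same logic as above, repeated so this snippet runs standalone."""
    if "messages" in event_data:
        messages = event_data["messages"]
        if len(messages) > 10:
            event_data["messages"] = (
                messages[:3]
                + [{"role": "system",
                    "content": f"... {len(messages) - 6} messages omitted ..."}]
                + messages[-3:]
            )
    return event_data

event = {"messages": [{"role": "user", "content": str(i)} for i in range(20)]}
compressed = compress_event_data(event)
print(len(compressed["messages"]))  # → 7: first 3 + omission marker + last 3
```

Keeping head and tail is a deliberate trade-off: the system prompt and task setup live at the start, and the failure usually lives at the end; the middle is what you can afford to lose.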
Timeline Synchronization
Problem: Events from different threads arrive out of order.
Fix: Use logical clocks instead of wall clock time:
```python
import asyncio

class LogicalClock:
    def __init__(self):
        self.counter = 0
        self.lock = asyncio.Lock()

    async def tick(self) -> int:
        async with self.lock:
            self.counter += 1
            return self.counter
```
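Stamping every event with a tick gives you a total order that survives concurrent producers, since no two events can ever share a sequence number. A minimal demonstration (clock duplicated from above so the snippet runs standalone):

```python
import asyncio

class LogicalClock:
    """Same clock as above, repeated so this snippet runs standalone."""
    def __init__(self):
        self.counter = 0
        self.lock = asyncio.Lock()

    async def tick(self) -> int:
        async with self.lock:
            self.counter += 1
            return self.counter

async def main():
    clock = LogicalClock()

    # Two concurrent producers stamping their events with the shared clock
    async def worker(name):
        return [(name, await clock.tick()) for _ in range(3)]

    a, b = await asyncio.gather(worker("llm"), worker("tool"))
    # Sort by sequence number to recover one unambiguous timeline
    return sorted(a + b, key=lambda e: e[1])

events = asyncio.run(main())
print([seq for _, seq in events])  # → [1, 2, 3, 4, 5, 6]
```

In practice you would record both the logical sequence number (for ordering) and the wall-clock timestamp (for display), since humans still want to see "2:03 AM" in the UI.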
Context State Drift
Problem: Context tracker gets out of sync with actual agent state.
Fix: Periodic context checkpoints and reconciliation:
```python
async def checkpoint_context(self, agent_state: Dict):
    """Force context sync with ground truth"""
    self.current_context = agent_state.copy()
    await self.record_event(TelemetryEvent(
        event_type="context_checkpoint",
        timestamp=datetime.utcnow(),
        episode_id=self.episode_id,
        data={"context": self.current_context},
    ))
```
Measurement: How to Know It's Working
Your DVR is working when you can answer these questions in under 30 seconds:
1. "What was the agent thinking right before it failed?"
   - Scrub to the timestamp of the error
   - Check the reasoning chain of the previous LLM call
   - Inspect the context at that moment
2. "Why did it choose Tool A instead of Tool B?"
   - Find the decision point in the timeline
   - Look at the reasoning chain
   - Check the available context
3. "When did it lose track of the original task?"
   - Scrub through the conversation history
   - Watch the task progress indicators
   - Find the divergence point
Test this with a deliberately buggy agent:
```python
# Test with an agent that fails predictably
async def test_dvr_debugging():
    gateway = AgentDVRGateway()
    episode_id = await gateway.start_episode("test_agent", "count to 10")

    # Simulate an agent that loses count at step 7
    for i in range(1, 12):  # Intentionally go past 10
        if i == 7:
            # Simulate context corruption
            await gateway.record_llm_call(
                {"messages": [{"role": "user", "content": "What number comes after 6?"}]},
                {"choices": [{"message": {"content": "Purple!"}}]},  # Wrong!
            )
        else:
            await gateway.record_llm_call(
                {"messages": [{"role": "user", "content": f"Say {i}"}]},
                {"choices": [{"message": {"content": str(i)}}]},
            )

    # Now debug: scrub to the timestamp around step 7
    # You should see exactly when and why it went wrong
```
Next Steps: Your Agent DVR in 10 Minutes
Want to see this in action? Clone the demo repo and get your first episode recording:
```bash
git clone https://github.com/airblackboxio/agent-dvr-demo
cd agent-dvr-demo
pip install -r requirements.txt

# Start recording
python examples/crewai_with_dvr.py
```
The demo includes:
- A CrewAI agent with intentional bugs
- Full DVR interface with scrubbing
- Sample episodes you can explore
- Integration guide for your existing agents
Ready to stop debugging AI agents with print statements? Try the live demo and watch your agent's decisions in real-time.
Because the only thing worse than an AI agent that fails is an AI agent that fails mysteriously.
Mr. Bigglesworth builds Airblackbox — the flight recorder for autonomous AI agents. When your agent inevitably becomes sentient and starts making questionable life choices, at least you'll have the receipts.