
Jason Shotwell

Build a DVR for AI Agents: Episode Replay UI That Actually Works


Your AI agent made 147 decisions in 3.2 seconds, then failed — and you have no idea which decision broke everything.

The Problem: AI Agents Are Black Boxes Moving at Light Speed

Here's what happens when your CrewAI agent crashes at 2 AM:

# Your agent's final words
ERROR: Task execution failed after 147 LLM calls
Context: "something went wrong lol"
Traceback: <unintelligible garbage>

You know it made 147 LLM calls. You know it failed. You don't know:

  • What it was "thinking" at step 73
  • Why it decided to call the wrong API at step 112
  • What context it had (or lost) when everything went sideways

Traditional logging gives you timestamps and error codes. What you need is a DVR that lets you scrub through your agent's entire "episode" — see every thought, decision, and API call in chronological order. With context.

This isn't just debugging. It's time travel for AI systems.

Architecture: How Agent DVR Actually Works

graph TB
    subgraph "AI Agent Runtime"
        A[CrewAI Agent] --> G[Airblackbox Gateway]
        G --> O[OpenAI API]
        G --> T[Telemetry Store]
    end

    subgraph "DVR Interface"
        T --> P[Timeline Processor]
        P --> UI[React Timeline UI]
        UI --> S[Scrub Controls]
        S --> C[Context Viewer]
    end

    subgraph "Episode Structure"
        E1[Episode Start]
        E2[LLM Call #1]
        E3[Tool Use #1]
        E4[LLM Call #2]
        E5[Context Update]
        E6[Decision Point]
        E7[Episode End/Error]
    end

    P --> E1
    E1 --> E2
    E2 --> E3
    E3 --> E4
    E4 --> E5
    E5 --> E6
    E6 --> E7

The magic happens in three layers:

  1. Capture Layer: Airblackbox Gateway intercepts every LLM call and tool execution
  2. Timeline Layer: Processor converts raw telemetry into chronological episodes
  3. Replay Layer: UI lets you scrub through the timeline and inspect context at any point

Think of it as a flight recorder, but instead of altitude and airspeed, you're recording prompts, responses, and reasoning chains.
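Concretely, the capture layer emits a stream of event records that the other two layers consume. The dataclass below is a minimal illustrative stand-in; the real Airblackbox `TelemetryEvent` schema is assumed here, not shown:

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict

# Minimal stand-in for a captured event. Illustrative only;
# the real Airblackbox TelemetryEvent schema may differ.
@dataclass
class CapturedEvent:
    event_type: str  # "episode_start", "llm_call", "tool_use", ...
    episode_id: str
    timestamp: datetime
    data: Dict[str, Any] = field(default_factory=dict)

# An episode is just an ordered list of these events
episode = [
    CapturedEvent("episode_start", "researcher_1700000000",
                  datetime.now(timezone.utc), {"task": "summarize docs"}),
    CapturedEvent("llm_call", "researcher_1700000000",
                  datetime.now(timezone.utc), {"model": "gpt-4"}),
]
print([e.event_type for e in episode])  # → ['episode_start', 'llm_call']
```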

Implementation: Building the Agent DVR

Step 1: Set Up the Telemetry Gateway

First, we need to capture everything without breaking your existing agent:

# agent_dvr/gateway.py
import asyncio
import json
import time
from datetime import datetime
from typing import List, Dict, Any
from airblackbox import Gateway, TelemetryEvent

class AgentDVRGateway(Gateway):
    def __init__(self):
        super().__init__()
        self.episode_id = None
        self.episode_events = []

    async def start_episode(self, agent_name: str, task: str) -> str:
        """Start a new episode recording"""
        self.episode_id = f"{agent_name}_{int(time.time())}"
        self.episode_events = []

        start_event = TelemetryEvent(
            event_type="episode_start",
            timestamp=datetime.utcnow(),
            episode_id=self.episode_id,
            data={
                "agent_name": agent_name,
                "task": task,
                "start_time": time.time()
            }
        )
        await self.record_event(start_event)
        return self.episode_id

    async def record_llm_call(self, request: Dict, response: Dict) -> None:
        """Record LLM interaction with full context"""
        event = TelemetryEvent(
            event_type="llm_call",
            timestamp=datetime.utcnow(),
            episode_id=self.episode_id,
            data={
                "request": {
                    "model": request.get("model"),
                    "messages": request.get("messages", []),
                    "temperature": request.get("temperature"),
                    "max_tokens": request.get("max_tokens")
                },
                "response": {
                    "content": response.get("choices", [{}])[0].get("message", {}).get("content"),
                    "usage": response.get("usage", {}),
                    "finish_reason": response.get("choices", [{}])[0].get("finish_reason")
                },
                "context_size": len(json.dumps(request.get("messages", []))),
                "latency_ms": response.get("_airblackbox_latency", 0)
            }
        )
        await self.record_event(event)

    async def record_tool_use(self, tool_name: str, args: Dict, result: Any) -> None:
        """Record tool execution"""
        event = TelemetryEvent(
            event_type="tool_use",
            timestamp=datetime.utcnow(),
            episode_id=self.episode_id,
            data={
                "tool_name": tool_name,
                "arguments": args,
                "result": str(result)[:1000],  # Truncate long results
                "success": result is not None
            }
        )
        await self.record_event(event)
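To see the recording flow end to end without the Airblackbox dependency, you can exercise the same contract with an in-memory stand-in. The class below mirrors the gateway sketch above but is not the real library:

```python
import asyncio
import time

# In-memory stand-in for the DVR gateway, so the record flow
# can be exercised without the real Airblackbox library.
class InMemoryDVRGateway:
    def __init__(self):
        self.episode_id = None
        self.events = []

    async def start_episode(self, agent_name: str, task: str) -> str:
        self.episode_id = f"{agent_name}_{int(time.time())}"
        self.events = [("episode_start", {"agent_name": agent_name, "task": task})]
        return self.episode_id

    async def record_llm_call(self, request: dict, response: dict) -> None:
        self.events.append(("llm_call", {"request": request, "response": response}))

    async def record_tool_use(self, tool_name: str, args: dict, result) -> None:
        # Truncate long results, same as the real gateway sketch
        self.events.append(("tool_use", {"tool_name": tool_name,
                                         "arguments": args,
                                         "result": str(result)[:1000]}))

async def demo():
    gw = InMemoryDVRGateway()
    await gw.start_episode("researcher", "find pricing data")
    await gw.record_llm_call({"model": "gpt-4"}, {"choices": []})
    await gw.record_tool_use("web_search", {"q": "pricing"}, "3 results")
    return [etype for etype, _ in gw.events]

print(asyncio.run(demo()))  # → ['episode_start', 'llm_call', 'tool_use']
```

Swapping this for the real gateway should only change where events land, not the call sites in your agent.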

Step 2: Create the Timeline Processor

Raw telemetry is chaos. We need to turn it into a coherent timeline:

# agent_dvr/timeline.py
from dataclasses import dataclass
from typing import List, Dict, Optional
import json

@dataclass
class TimelineEvent:
    timestamp: float
    event_type: str
    data: Dict
    context_snapshot: Optional[Dict] = None
    reasoning_chain: Optional[List[str]] = None

class TimelineProcessor:
    def __init__(self):
        self.context_tracker = ContextTracker()

    async def process_episode(self, episode_id: str) -> List[TimelineEvent]:
        """Convert raw events into a scrubbable timeline"""
        raw_events = await self.get_raw_events(episode_id)
        timeline = []

        for event in raw_events:
            processed_event = await self._process_event(event)
            timeline.append(processed_event)

            # Update context state
            self.context_tracker.update(processed_event)

        return timeline

    async def _process_event(self, raw_event) -> TimelineEvent:
        """Process a single event with context enrichment"""
        event_data = raw_event.data

        # Extract reasoning for LLM calls
        reasoning_chain = None
        if raw_event.event_type == "llm_call":
            reasoning_chain = self._extract_reasoning_chain(event_data)

        return TimelineEvent(
            timestamp=raw_event.timestamp.timestamp(),
            event_type=raw_event.event_type,
            data=event_data,
            context_snapshot=self.context_tracker.get_snapshot(),
            reasoning_chain=reasoning_chain
        )

    def _extract_reasoning_chain(self, llm_data: Dict) -> List[str]:
        """Extract step-by-step reasoning from LLM response"""
        response_content = llm_data.get("response", {}).get("content", "")

        # Look for common reasoning patterns
        reasoning_steps = []

        # Pattern 1: Numbered lists
        if "1." in response_content and "2." in response_content:
            lines = response_content.split('\n')
            for line in lines:
                if line.strip().startswith(('1.', '2.', '3.', '4.', '5.')):
                    reasoning_steps.append(line.strip())

        # Pattern 2: "First, Then, Finally" structure
        elif "first" in response_content.lower() and "then" in response_content.lower():
            # Simple heuristic - split on transition words
            transitions = ["first", "then", "next", "finally", "therefore"]
            current_step = ""

            for sentence in response_content.split('.'):
                sentence = sentence.strip()
                if any(trans in sentence.lower() for trans in transitions):
                    if current_step:
                        reasoning_steps.append(current_step)
                    current_step = sentence
                else:
                    current_step += f". {sentence}"

            if current_step:
                reasoning_steps.append(current_step)

        return reasoning_steps[:5]  # Keep it manageable

class ContextTracker:
    def __init__(self):
        self.current_context = {
            "memory": [],
            "active_tools": [],
            "conversation_history": [],
            "task_progress": {}
        }

    def update(self, event: TimelineEvent):
        """Update context state based on event"""
        if event.event_type == "llm_call":
            self.current_context["conversation_history"].append({
                "timestamp": event.timestamp,
                "messages": event.data.get("request", {}).get("messages", [])
            })

        elif event.event_type == "tool_use":
            self.current_context["active_tools"].append({
                "timestamp": event.timestamp,
                "tool": event.data.get("tool_name"),
                "success": event.data.get("success", False)
            })

    def get_snapshot(self) -> Dict:
        """Get current context snapshot"""
        return {
            "memory_items": len(self.current_context["memory"]),
            "conversation_length": len(self.current_context["conversation_history"]),
            "tools_used": len(self.current_context["active_tools"]),
            "last_activity": max(
                [h["timestamp"] for h in self.current_context["conversation_history"]] + [0]
            )
        }
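The numbered-list heuristic in `_extract_reasoning_chain` is easy to sanity-check in isolation. Here's a trimmed-down copy of just that branch:

```python
# Standalone copy of the numbered-list branch of
# _extract_reasoning_chain, for quick sanity checks
def extract_numbered_steps(text: str, limit: int = 5) -> list:
    steps = []
    if "1." in text and "2." in text:
        for line in text.split("\n"):
            if line.strip().startswith(("1.", "2.", "3.", "4.", "5.")):
                steps.append(line.strip())
    return steps[:limit]

response = "I'll proceed as follows:\n1. Fetch the page\n2. Parse the table\n3. Summarize"
print(extract_numbered_steps(response))
# → ['1. Fetch the page', '2. Parse the table', '3. Summarize']
```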

Step 3: Build the DVR Interface

Now for the fun part — a web interface that actually feels like a DVR:

# agent_dvr/server.py
from fastapi import FastAPI, WebSocket
from fastapi.staticfiles import StaticFiles
import json
from typing import Dict, List

from agent_dvr.timeline import TimelineProcessor, TimelineEvent

app = FastAPI()
timeline_processor = TimelineProcessor()

@app.get("/episodes")
async def list_episodes():
    """Get all recorded episodes"""
    episodes = await get_episodes_from_storage()
    return {"episodes": episodes}

@app.get("/episodes/{episode_id}/timeline")
async def get_episode_timeline(episode_id: str):
    """Get full timeline for scrubbing"""
    timeline = await timeline_processor.process_episode(episode_id)

    return {
        "episode_id": episode_id,
        "total_events": len(timeline),
        "duration_seconds": timeline[-1].timestamp - timeline[0].timestamp if timeline else 0,
        "timeline": [
            {
                "timestamp": event.timestamp,
                "event_type": event.event_type,
                "summary": _get_event_summary(event),
                "data": event.data,
                "context": event.context_snapshot,
                "reasoning": event.reasoning_chain
            }
            for event in timeline
        ]
    }

@app.get("/episodes/{episode_id}/context/{timestamp}")
async def get_context_at_time(episode_id: str, timestamp: float):
    """Get agent context at specific timestamp - for scrubbing"""
    timeline = await timeline_processor.process_episode(episode_id)

    # Find closest event at or before timestamp
    context_event = None
    for event in timeline:
        if event.timestamp <= timestamp:
            context_event = event
        else:
            break

    if context_event is None:
        return {"error": "No context available at timestamp"}

    return {
        "timestamp": timestamp,
        "closest_event": context_event.timestamp,
        "event_type": context_event.event_type,
        "context": context_event.context_snapshot,
        "reasoning": context_event.reasoning_chain,
        "data": context_event.data
    }

def _get_event_summary(event: TimelineEvent) -> str:
    """Generate human-readable event summary"""
    if event.event_type == "llm_call":
        model = event.data.get("request", {}).get("model", "unknown")
        tokens = event.data.get("response", {}).get("usage", {}).get("total_tokens", 0)
        return f"LLM call ({model}) - {tokens} tokens"

    elif event.event_type == "tool_use":
        tool = event.data.get("tool_name", "unknown")
        success = "✅" if event.data.get("success") else "❌"
        return f"Tool: {tool} {success}"

    elif event.event_type == "episode_start":
        agent = event.data.get("agent_name", "unknown")
        return f"Episode started - {agent}"

    return event.event_type
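One note on `get_context_at_time`: the linear scan is fine for short episodes, but since events are appended in timestamp order, a binary search finds the closest event at or before the scrub position in O(log n). A sketch:

```python
import bisect

# Events are recorded in timestamp order, so binary search can
# find the closest event at or before the scrub position.
def closest_event_index(timestamps, scrub_to):
    i = bisect.bisect_right(timestamps, scrub_to)
    return i - 1 if i > 0 else None  # None: scrubbed before the first event

ts = [10.0, 10.4, 11.2, 13.7]
print(closest_event_index(ts, 12.0))  # → 2 (the event at 11.2)
print(closest_event_index(ts, 9.0))   # → None
```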

Step 4: The Frontend DVR Controls

// agent_dvr/frontend/src/DVRPlayer.jsx
import React, { useState, useEffect, useCallback } from 'react';

export const DVRPlayer = ({ episodeId }) => {
  const [timeline, setTimeline] = useState([]);
  const [currentTime, setCurrentTime] = useState(0);
  const [isPlaying, setIsPlaying] = useState(false);
  const [playbackSpeed, setPlaybackSpeed] = useState(1);
  const [currentContext, setCurrentContext] = useState(null);

  useEffect(() => {
    fetchTimeline();
  }, [episodeId]);

  const fetchTimeline = async () => {
    const response = await fetch(`/episodes/${episodeId}/timeline`);
    const data = await response.json();
    setTimeline(data.timeline);
  };

  const scrubToTime = useCallback(async (timestamp) => {
    setCurrentTime(timestamp);

    // Fetch context at this timestamp
    const response = await fetch(`/episodes/${episodeId}/context/${timestamp}`);
    const context = await response.json();
    setCurrentContext(context);
  }, [episodeId]);

  const play = () => {
    setIsPlaying(true);
    // Implement playback logic
    const interval = setInterval(() => {
      setCurrentTime(prev => {
        // Timestamps are in seconds, so advance 0.1s of episode time per 100ms tick
        const next = prev + playbackSpeed * 0.1;
        if (next >= timeline[timeline.length - 1]?.timestamp) {
          setIsPlaying(false);
          clearInterval(interval);
          return prev;
        }
        return next;
      });
    }, 100);
  };

  return (
    <div className="dvr-player">
      {/* Timeline scrubber */}
      <div className="timeline-scrubber">
        <input
          type="range"
          min={timeline[0]?.timestamp || 0}
          max={timeline[timeline.length - 1]?.timestamp || 100}
          value={currentTime}
          onChange={(e) => scrubToTime(parseFloat(e.target.value))}
          className="scrub-bar"
        />

        {/* Event markers */}
        <div className="event-markers">
          {timeline.map((event, idx) => (
            <div
              key={idx}
              className={`marker ${event.event_type}`}
              style={{
                left: `${((event.timestamp - timeline[0].timestamp) / 
                        (timeline[timeline.length - 1].timestamp - timeline[0].timestamp)) * 100}%`
              }}
              title={event.summary}
            />
          ))}
        </div>
      </div>

      {/* Playback controls */}
      <div className="controls">
        <button onClick={play} disabled={isPlaying}>▶️</button>
        <button onClick={() => setIsPlaying(false)}>⏸️</button>
        <select value={playbackSpeed} onChange={(e) => setPlaybackSpeed(Number(e.target.value))}>
          <option value={0.5}>0.5x</option>
          <option value={1}>1x</option>
          <option value={2}>2x</option>
          <option value={5}>5x</option>
        </select>
      </div>

      {/* Context viewer */}
      <div className="context-viewer">
        {currentContext && (
          <div>
            <h3>Context at {new Date(currentTime * 1000).toISOString()}</h3>
            <div className="context-data">
              <pre>{JSON.stringify(currentContext, null, 2)}</pre>
            </div>
          </div>
        )}
      </div>
    </div>
  );
};

Pitfalls: What Will Break (And How to Fix It)

Memory Explosion

Problem: Recording every LLM call generates massive amounts of data.
Fix: Implement intelligent truncation and compression:

def compress_event_data(event_data: Dict) -> Dict:
    """Compress large payloads while preserving debuggability"""
    if "messages" in event_data:
        # Keep first/last messages, summarize middle
        messages = event_data["messages"]
        if len(messages) > 10:
            compressed = messages[:3] + [
                {"role": "system", "content": f"... {len(messages) - 6} messages omitted ..."}
            ] + messages[-3:]
            event_data["messages"] = compressed

    return event_data
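It's worth verifying the truncation arithmetic: 3 head messages + 1 marker + 3 tail messages should give exactly 7 entries for any oversized input. A quick check against a standalone copy of the logic:

```python
# Standalone copy of the head/marker/tail scheme from
# compress_event_data, for verifying the arithmetic
def compress_messages(messages):
    if len(messages) <= 10:
        return messages
    return messages[:3] + [
        {"role": "system", "content": f"... {len(messages) - 6} messages omitted ..."}
    ] + messages[-3:]

msgs = [{"role": "user", "content": f"msg {i}"} for i in range(25)]
out = compress_messages(msgs)
print(len(out))           # → 7
print(out[3]["content"])  # → ... 19 messages omitted ...
```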

Timeline Synchronization

Problem: Events from different threads arrive out of order.
Fix: Use logical clocks instead of wall clock time:

class LogicalClock:
    def __init__(self):
        self.counter = 0
        self.lock = asyncio.Lock()

    async def tick(self) -> int:
        async with self.lock:
            self.counter += 1
            return self.counter
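Under concurrency the lock is what matters: interleaved `tick()` calls still hand out unique, strictly increasing values. A quick demonstration with the same class:

```python
import asyncio

class LogicalClock:
    # Same as above: a locked counter that hands out ordering ticks
    def __init__(self):
        self.counter = 0
        self.lock = asyncio.Lock()

    async def tick(self) -> int:
        async with self.lock:
            self.counter += 1
            return self.counter

async def main():
    clock = LogicalClock()
    # 50 concurrent ticks still produce 50 distinct, ordered values
    ticks = await asyncio.gather(*(clock.tick() for _ in range(50)))
    return sorted(ticks)

print(asyncio.run(main()) == list(range(1, 51)))  # → True
```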

Context State Drift

Problem: Context tracker gets out of sync with actual agent state.
Fix: Periodic context checkpoints and reconciliation:

async def checkpoint_context(self, agent_state: Dict):
    """Force context sync with ground truth (lives on the gateway,
    which owns record_event)"""
    self.current_context = agent_state.copy()
    await self.record_event(TelemetryEvent(
        event_type="context_checkpoint",
        timestamp=datetime.utcnow(),
        episode_id=self.episode_id,
        data={"context": self.current_context}
    ))

Measurement: How to Know It's Working

Your DVR is working when you can answer these questions in under 30 seconds:

  1. "What was the agent thinking right before it failed?"

    • Scrub to timestamp of error
    • Check reasoning chain of previous LLM call
    • Inspect context at that moment
  2. "Why did it choose Tool A instead of Tool B?"

    • Find the decision point in timeline
    • Look at reasoning chain
    • Check available context
  3. "When did it lose track of the original task?"

    • Scrub through conversation history
    • Watch task progress indicators
    • Find the divergence point

Test this with a deliberately buggy agent:

# Test with an agent that fails predictably
async def test_dvr_debugging():
    gateway = AgentDVRGateway()
    episode_id = await gateway.start_episode("test_agent", "count to 10")

    # Simulate agent that loses count at step 7
    for i in range(1, 12):  # Intentionally go past 10
        if i == 7:
            # Simulate context corruption
            await gateway.record_llm_call(
                {"messages": [{"role": "user", "content": "What number comes after 6?"}]},
                {"choices": [{"message": {"content": "Purple!"}}]}  # Wrong!
            )
        else:
            await gateway.record_llm_call(
                {"messages": [{"role": "user", "content": f"Say {i}"}]},
                {"choices": [{"message": {"content": str(i)}}]}
            )

    # Now debug: scrub to timestamp around step 7
    # You should see exactly when and why it went wrong

Next Steps: Your Agent DVR in 10 Minutes

Want to see this in action? Clone the demo repo and get your first episode recording:

git clone https://github.com/airblackboxio/agent-dvr-demo
cd agent-dvr-demo
pip install -r requirements.txt

# Start recording
python examples/crewai_with_dvr.py

The demo includes:

  • A CrewAI agent with intentional bugs
  • Full DVR interface with scrubbing
  • Sample episodes you can explore
  • Integration guide for your existing agents

Ready to stop debugging AI agents with print statements? Try the live demo and watch your agent's decisions in real time.

Because the only thing worse than an AI agent that fails is an AI agent that fails mysteriously.


Mr. Bigglesworth builds Airblackbox — the flight recorder for autonomous AI agents. When your agent inevitably becomes sentient and starts making questionable life choices, at least you'll have the receipts.
