DEV Community

Jason Shotwell
Jason Shotwell

Posted on

Build a DVR for AI Agents: Episode Replay UI That Actually Works

Build a DVR for AI Agents: Episode Replay UI That Actually Works

Your AI agent made 47 LLM calls, spent $3.20 on tokens, and still couldn't book a restaurant reservation — but you have no idea which call broke the logic chain.

The Problem: AI Agent Debugging is Still Cave Painting

Here's what happens when your CrewAI agents go sideways at 2 AM:

You get a stack trace that ends with openai.BadRequestError and a chat log that looks like this:

Agent: I need to book a table
LLM: I'll help you book a table
Agent: Great, book it for 2 people
LLM: I don't have access to restaurant booking
Agent: But you just said you'd help
LLM: I apologize for the confusion
Agent: *spends $2.80 in circles*
Enter fullscreen mode Exit fullscreen mode

The current debugging experience is archaeological. You dig through logs, grep for error patterns, and try to reverse-engineer what your agent was "thinking" at step 23 of a 47-step conversation. It's like debugging a distributed system where every node has amnesia.

What you need is a DVR for AI agents. Something that records every decision, lets you scrub through the timeline, and replay exactly what happened at each step. Not just the final output — the entire cognitive workflow.

Architecture: Agent DVR with Timeline Scrubbing

Here's how we build a proper episode replay system:

graph TB
    subgraph "AI Agent Runtime"
        A[CrewAI Agent] --> B[LangChain Tools]
        B --> C[OpenAI API]
        C --> D[Tool Results]
        D --> A
    end

    subgraph "Airblackbox Recording Layer"
        E[Gateway Proxy] --> F[Episode Recorder]
        F --> G[Timeline Store]
        G --> H[Event Index]
    end

    subgraph "DVR Interface"
        I[Timeline Scrubber] --> J[Step Inspector]
        J --> K[Decision Tree Viewer]
        K --> L[Token Cost Tracker]
    end

    C --> E
    G --> I

    subgraph "Debugging Features"
        M[Replay Engine] --> N[State Diff Viewer]
        N --> O[Breakpoint System]
    end

    I --> M
Enter fullscreen mode Exit fullscreen mode

The key insight: treat each agent conversation as a recorded "episode" with frame-by-frame navigation. Every LLM call becomes a timeline event with full context — prompt, response, reasoning, tool calls, and state changes.

Implementation: Building the Agent DVR

Step 1: Episode Recording Infrastructure

First, we need to capture every agent decision with full context:

from datetime import datetime, timezone
from dataclasses import dataclass, asdict
from typing import List, Dict, Any, Optional
import json
import sqlite3
from pathlib import Path

@dataclass
class AgentEvent:
    timestamp: str
    event_type: str  # 'llm_call', 'tool_call', 'decision', 'error'
    agent_name: str
    step_number: int
    content: Dict[str, Any]
    tokens_used: int
    cost_cents: int
    duration_ms: int
    parent_event_id: Optional[str] = None
    event_id: Optional[str] = None

class EpisodeRecorder:
    def __init__(self, db_path: str = "./agent_episodes.db"):
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init_db()

    def _init_db(self):
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS episodes (
                episode_id TEXT PRIMARY KEY,
                agent_name TEXT,
                start_time TEXT,
                end_time TEXT,
                total_tokens INTEGER DEFAULT 0,
                total_cost_cents INTEGER DEFAULT 0,
                status TEXT DEFAULT 'running'
            )
        """)

        conn.execute("""
            CREATE TABLE IF NOT EXISTS events (
                event_id TEXT PRIMARY KEY,
                episode_id TEXT,
                timestamp TEXT,
                event_type TEXT,
                agent_name TEXT,
                step_number INTEGER,
                content TEXT,
                tokens_used INTEGER,
                cost_cents INTEGER,
                duration_ms INTEGER,
                parent_event_id TEXT,
                FOREIGN KEY (episode_id) REFERENCES episodes (episode_id)
            )
        """)

        conn.execute("CREATE INDEX IF NOT EXISTS idx_episode_events ON events(episode_id, step_number)")
        conn.commit()
        conn.close()

    def start_episode(self, agent_name: str) -> str:
        episode_id = f"{agent_name}_{datetime.now(timezone.utc).isoformat()}"
        conn = sqlite3.connect(self.db_path)
        conn.execute(
            "INSERT INTO episodes (episode_id, agent_name, start_time) VALUES (?, ?, ?)",
            (episode_id, agent_name, datetime.now(timezone.utc).isoformat())
        )
        conn.commit()
        conn.close()
        return episode_id

    def record_event(self, episode_id: str, event: AgentEvent) -> str:
        event_id = f"{episode_id}_{event.step_number}_{event.event_type}"
        event.event_id = event_id

        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            INSERT INTO events 
            (event_id, episode_id, timestamp, event_type, agent_name, step_number, 
             content, tokens_used, cost_cents, duration_ms, parent_event_id)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            event_id, episode_id, event.timestamp, event.event_type,
            event.agent_name, event.step_number, json.dumps(event.content),
            event.tokens_used, event.cost_cents, event.duration_ms,
            event.parent_event_id
        ))
        conn.commit()
        conn.close()
        return event_id
Enter fullscreen mode Exit fullscreen mode

Step 2: LangChain Integration with Recording

Now we instrument a LangChain agent to record every decision:

from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.tools import Tool
import time
from typing import Dict, Any

class RecordingAgent:
    def __init__(self, recorder: EpisodeRecorder, tools: List[Tool]):
        self.recorder = recorder
        self.episode_id = None
        self.step_counter = 0

        # Create LangChain agent
        self.llm = ChatOpenAI(temperature=0)
        prompt = ChatPromptTemplate.from_messages([
            ("system", "You are a helpful assistant that uses tools to complete tasks."),
            ("human", "{input}"),
            ("placeholder", "{agent_scratchpad}")
        ])

        agent = create_openai_functions_agent(self.llm, tools, prompt)
        self.executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

        # Hook into the execution flow
        self._monkey_patch_executor()

    def _monkey_patch_executor(self):
        original_plan = self.executor.agent.plan
        original_llm_call = self.llm._generate

        def recording_plan(intermediate_steps, callbacks, **kwargs):
            start_time = time.time()
            self.step_counter += 1

            try:
                result = original_plan(intermediate_steps, callbacks, **kwargs)

                # Record the planning step
                self.recorder.record_event(self.episode_id, AgentEvent(
                    timestamp=datetime.now(timezone.utc).isoformat(),
                    event_type="agent_plan",
                    agent_name="langchain_agent",
                    step_number=self.step_counter,
                    content={
                        "input": kwargs.get("input", ""),
                        "intermediate_steps": [str(step) for step in intermediate_steps],
                        "plan": str(result)
                    },
                    tokens_used=0,  # Filled by LLM hook
                    cost_cents=0,   # Filled by LLM hook
                    duration_ms=int((time.time() - start_time) * 1000)
                ))

                return result
            except Exception as e:
                self.recorder.record_event(self.episode_id, AgentEvent(
                    timestamp=datetime.now(timezone.utc).isoformat(),
                    event_type="error",
                    agent_name="langchain_agent",
                    step_number=self.step_counter,
                    content={"error": str(e), "type": type(e).__name__},
                    tokens_used=0,
                    cost_cents=0,
                    duration_ms=int((time.time() - start_time) * 1000)
                ))
                raise

        self.executor.agent.plan = recording_plan

    def run_episode(self, task: str) -> str:
        self.episode_id = self.recorder.start_episode("langchain_agent")

        try:
            result = self.executor.invoke({"input": task})

            # Mark episode as completed
            conn = sqlite3.connect(self.recorder.db_path)
            conn.execute(
                "UPDATE episodes SET end_time = ?, status = ? WHERE episode_id = ?",
                (datetime.now(timezone.utc).isoformat(), "completed", self.episode_id)
            )
            conn.commit()
            conn.close()

            return result

        except Exception as e:
            # Mark episode as failed
            conn = sqlite3.connect(self.recorder.db_path)
            conn.execute(
                "UPDATE episodes SET end_time = ?, status = ? WHERE episode_id = ?",
                (datetime.now(timezone.utc).isoformat(), "failed", self.episode_id)
            )
            conn.commit()
            conn.close()
            raise
Enter fullscreen mode Exit fullscreen mode

Step 3: Timeline Scrubber Interface

Now for the DVR interface — a web UI that lets you scrub through agent decisions:

from flask import Flask, render_template, jsonify, request
import sqlite3
import json
from typing import List, Dict

class AgentDVR:
    def __init__(self, db_path: str):
        self.db_path = db_path
        self.app = Flask(__name__)
        self._setup_routes()

    def _setup_routes(self):
        @self.app.route('/')
        def index():
            return render_template('dvr.html')

        @self.app.route('/api/episodes')
        def get_episodes():
            conn = sqlite3.connect(self.db_path)
            cursor = conn.execute("""
                SELECT episode_id, agent_name, start_time, end_time, 
                       total_tokens, total_cost_cents, status,
                       COUNT(events.event_id) as event_count
                FROM episodes 
                LEFT JOIN events ON episodes.episode_id = events.episode_id
                GROUP BY episode_id
                ORDER BY start_time DESC
                LIMIT 50
            """)

            episodes = []
            for row in cursor.fetchall():
                episodes.append({
                    'episode_id': row[0],
                    'agent_name': row[1],
                    'start_time': row[2],
                    'end_time': row[3],
                    'total_tokens': row[4],
                    'total_cost_cents': row[5],
                    'status': row[6],
                    'event_count': row[7]
                })

            conn.close()
            return jsonify(episodes)

        @self.app.route('/api/episodes/<episode_id>/timeline')
        def get_timeline(episode_id):
            conn = sqlite3.connect(self.db_path)
            cursor = conn.execute("""
                SELECT event_id, timestamp, event_type, step_number, content,
                       tokens_used, cost_cents, duration_ms
                FROM events 
                WHERE episode_id = ?
                ORDER BY step_number
            """, (episode_id,))

            timeline = []
            for row in cursor.fetchall():
                timeline.append({
                    'event_id': row[0],
                    'timestamp': row[1],
                    'event_type': row[2],
                    'step_number': row[3],
                    'content': json.loads(row[4]),
                    'tokens_used': row[5],
                    'cost_cents': row[6],
                    'duration_ms': row[7]
                })

            conn.close()
            return jsonify(timeline)

        @self.app.route('/api/episodes/<episode_id>/step/<int:step>')
        def get_step_detail(episode_id, step):
            conn = sqlite3.connect(self.db_path)
            cursor = conn.execute("""
                SELECT * FROM events 
                WHERE episode_id = ? AND step_number = ?
            """, (episode_id, step))

            row = cursor.fetchone()
            if not row:
                return jsonify({'error': 'Step not found'}), 404

            step_detail = {
                'event_id': row[0],
                'timestamp': row[2],
                'event_type': row[3],
                'content': json.loads(row[6]),
                'tokens_used': row[7],
                'cost_cents': row[8],
                'duration_ms': row[9]
            }

            conn.close()
            return jsonify(step_detail)

    def run(self, debug=True, port=5000):
        self.app.run(debug=debug, port=port)
Enter fullscreen mode Exit fullscreen mode

Step 4: Frontend Timeline Scrubber

Here's the HTML interface with timeline scrubbing:

<!DOCTYPE html>
<html>
<head>
    <title>Agent DVR - Episode Replay</title>
    <script src="https://unpkg.com/react@18/umd/react.development.js"></script>
    <script src="https://unpkg.com/react-dom@18/umd/react-dom.development.js"></script>
    <script src="https://unpkg.com/@babel/standalone/babel.min.js"></script>
    <style>
        .timeline-container { margin: 20px 0; }
        .timeline-scrubber { 
            width: 100%; 
            height: 40px; 
            background: #f0f0f0; 
            position: relative;
            cursor: pointer;
        }
        .timeline-event {
            position: absolute;
            height: 100%;
            border-left: 2px solid;
            cursor: pointer;
        }
        .event-llm { border-color: #007acc; }
        .event-tool { border-color: #ff6b35; }
        .event-error { border-color: #ff1744; }
        .step-detail {
            background: #f9f9f9;
            padding: 20px;
            margin: 10px 0;
            border-radius: 4px;
        }
    </style>
</head>
<body>
    <div id="dvr-root"></div>

    <script type="text/babel">
        const AgentDVR = () => {
            const [episodes, setEpisodes] = React.useState([]);
            const [selectedEpisode, setSelectedEpisode] = React.useState(null);
            const [timeline, setTimeline] = React.useState([]);
            const [currentStep, setCurrentStep] = React.useState(0);
            const [stepDetail, setStepDetail] = React.useState(null);

            React.useEffect(() => {
                fetch('/api/episodes')
                    .then(r => r.json())
                    .then(setEpisodes);
            }, []);

            const loadEpisode = async (episodeId) => {
                const timelineRes = await fetch(`/api/episodes/${episodeId}/timeline`);
                const timelineData = await timelineRes.json();

                setSelectedEpisode(episodeId);
                setTimeline(timelineData);
                setCurrentStep(0);
                loadStepDetail(episodeId, 0);
            };

            const loadStepDetail = async (episodeId, step) => {
                if (timeline[step]) {
                    const detailRes = await fetch(`/api/episodes/${episodeId}/step/${step + 1}`);
                    const detail = await detailRes.json();
                    setStepDetail(detail);
                    setCurrentStep(step);
                }
            };

            const TimelineScrubber = ({ timeline, onStepSelect }) => {
                const handleClick = (e) => {
                    const rect = e.target.getBoundingClientRect();
                    const x = e.clientX - rect.left;
                    const step = Math.floor((x / rect.width) * timeline.length);
                    onStepSelect(Math.max(0, Math.min(step, timeline.length - 1)));
                };

                return (
                    <div className="timeline-container">
                        <div className="timeline-scrubber" onClick={handleClick}>
                            {timeline.map((event, i) => (
                                <div
                                    key={event.event_id}
                                    className={`timeline-event event-${event.event_type}`}
                                    style={{
                                        left: `${(i / timeline.length) * 100}%`,
                                        width: `${100 / timeline.length}%`
                                    }}
                                    title={`Step ${i + 1}: ${event.event_type}`}
                                />
                            ))}
                        </div>
                        <div>Step {currentStep + 1} of {timeline.length}</div>
                    </div>
                );
            };

            return (
                <div style={{ padding: '20px' }}>
                    <h1>Agent DVR - Episode Replay</h1>

                    <div>
                        <h2>Episodes</h2>
                        {episodes.map(ep => (
                            <div key={ep.episode_id} style={{ margin: '10px 0', padding: '10px', border: '1px solid #ddd' }}>
                                <button onClick={() => loadEpisode(ep.episode_id)}>
                                    {ep.agent_name} - {ep.start_time} ({ep.event_count} steps)
                                </button>
                                <span style={{ marginLeft: '10px', color: '#666' }}>
                                    ${(ep.total_cost_cents / 100).toFixed(2)} | {ep.total_tokens} tokens
                                </span>
                            </div>
                        ))}
                    </div>

                    {selectedEpisode && (
                        <div>
                            <h2>Timeline Scrubber</h2>
                            <TimelineScrubber 
                                timeline={timeline} 
                                onStepSelect={(step) => loadStepDetail(selectedEpisode, step)}
                            />

                            {stepDetail && (
                                <div className="step-detail">
                                    <h3>Step {currentStep + 1}: {stepDetail.event_type}</h3>
                                    <p><strong>Duration:</strong> {stepDetail.duration_ms}ms</p>
                                    <p><strong>Tokens:</strong> {stepDetail.tokens_used}</p>
                                    <p><strong>Cost:</strong> ${(stepDetail.cost_cents / 100).toFixed(3)}</p>
                                    <pre>{JSON.stringify(stepDetail.content, null, 2)}</pre>
                                </div>
                            )}
                        </div>
                    )}
                </div>
            );
        };

        ReactDOM.render(<AgentDVR />, document.getElementById('dvr-root'));
    </script>
</body>
</html>
Enter fullscreen mode Exit fullscreen mode

Pitfalls: What Breaks and How to Handle It

1. Database Lock Contention

Recording every agent step means frequent database writes. SQLite will lock up under concurrent agent runs.

Solution: Use connection pooling and batch writes:

import queue
import threading
from contextlib import contextmanager

class BatchingRecorder(EpisodeRecorder):
    def __init__(self, *args, batch_size=100, flush_interval=5):
        super().__init__(*args)
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.event_queue = queue.Queue()
        self.batch_thread = threading.Thread(target=self._batch_processor, daemon=True)
        self.batch_thread.start()

    def _batch_processor(self):
        batch = []
        while True:
            try:
                event = self.event_queue.get(timeout=self.flush_interval)
                batch.append(event)

                if len(batch) >= self.batch_size:
                    self._flush_batch(batch)
                    batch = []
            except queue.Empty:
                if batch:
                    self._flush_batch(batch)
                    batch = []

    def _flush_batch(self, events):
        conn = sqlite3.connect(self.db_path)
        conn.executemany("""
            INSERT INTO events (event_id, episode_id, timestamp, event_type, ...)
            VALUES (?, ?, ?, ?, ...)
        """, [(e.event_id, e.episode_id, ...) for e in events])
        conn.commit()
        conn.close()
Enter fullscreen mode Exit fullscreen mode

2. Memory Explosion from Large Conversations

Long agent conversations create massive content payloads. A 200-step reasoning chain will blow up your database.

Solution: Compress and chunk large content:

import gzip
import json

def compress_content(content: Dict) -> bytes:
    json_str = json.dumps(content)
    return gzip.compress(json_str.encode())

def decompress_content(compressed: bytes) -> Dict:
    json_str = gzip.decompress(compressed).decode()
    return json.loads(json_str)
Enter fullscreen mode Exit fullscreen mode

3. Timeline Scrubber Performance

Loading 500+ events in the frontend will kill the browser. The timeline becomes unusable.

Solution: Virtual scrolling and event aggregation:


javascript
const useVirtualTimeline = (events, viewportWidth = 1000) => {
    const eventsPerPixel = events.length / viewportWidth;
    const aggregatedEvents = React.useMemo(() => {
        if (eventsPerPixel <= 1) return events;

        // Aggregate events into pixels
        const aggregated = [];
        for (let i = 0; i < viewportWidth; i++) {
            const start = Math.floor(i * eventsPerPixel);
            const end = Math.floor((i + 1) * eventsPerPixel);
            const slice = events.slice(start, end);

            aggregated.push({
                pixel: i,
                events: slice,
                primaryType: slice[0]?.event_type,
                totalCost: slice.reduce((sum, e) => sum + e.cost_cents, 0)
            });
        }
        return aggregated;
    }, [events, eventsPerPixel]);

    return aggregated
Enter fullscreen mode Exit fullscreen mode

Top comments (0)