Mukunda Rao Katta

agenttap: Wire-Level Prompt Introspection for LLM Agents

Your agent is behaving wrong. You suspect the model is seeing different context than you think it is. Or the tool results are larger than you expected. Or there are extra tokens in the system prompt you did not notice.

You need to see exactly what is being sent to the model, byte for byte. Not a summary. Not a log of what you intended to send. The actual serialized request payload.

agenttap is a wire-level capture layer that intercepts every LLM call and writes the full request and response to a JSONL sink.


The Shape of the Fix

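import anthropic
from agenttap import Tap

anthropic_client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the environment
tap = Tap(sink="./logs/wire.jsonl")

# Wrap your LLM client call (the wrapper accepts keyword arguments only)
@tap.capture
def call_llm(**kwargs):
    # Returns the SDK's response object; the tap converts it to a dict when writing
    return anthropic_client.messages.create(**kwargs)

messages = [{"role": "user", "content": "Hello"}]  # example input

# Or use a context manager for temporary capture
with tap.session() as session:
    response = call_llm(
        model="claude-sonnet-4-6",
        messages=messages,
        max_tokens=1024,
    )
    print(f"Captured {len(session.records)} calls")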

Every call to call_llm() writes a complete record to wire.jsonl: the full request (model, messages, tools, all parameters), the full response (content, usage, stop_reason), timing, and a session ID.


What It Does NOT Do

agenttap does not modify requests or responses. It is a passive observer. It captures what was sent and what was received, without changing anything. It does not redact, validate, or filter.

For security-sensitive deployments where prompts contain credentials or PII, apply llm-redact-secrets and llm-pii-redact to the tap's output or as preprocessing before the tap captures.
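As a rough illustration of that redaction step, the sketch below scrubs secret-looking substrings from a request payload. The `redact` helper and its two regex patterns are hypothetical stand-ins written for this example; they are not the API of llm-redact-secrets or llm-pii-redact, and real secret detection needs far more care.

```python
import re

# Hypothetical patterns, for illustration only.
_PATTERNS = [
    re.compile(r"sk-[A-Za-z0-9_-]{8,}"),      # API-key-like tokens
    re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+"),  # email addresses
]

def redact(value):
    """Recursively replace secret-looking substrings in strings, lists, and dicts."""
    if isinstance(value, str):
        for pattern in _PATTERNS:
            value = pattern.sub("[REDACTED]", value)
        return value
    if isinstance(value, list):
        return [redact(v) for v in value]
    if isinstance(value, dict):
        return {k: redact(v) for k, v in value.items()}
    return value  # numbers, None, etc. pass through unchanged

request = {
    "model": "claude-sonnet-4-6",
    "messages": [{"role": "user", "content": "My key is sk-abc123def456 and mail is a@b.com"}],
    "max_tokens": 1024,
}
clean = redact(request)  # returns a new structure; `request` is left untouched
```

The same function could be applied to records read back from the JSONL file, or to the keyword arguments before they reach the captured function. Redacting before the call also changes what the model sees, so applying it to the tap's output keeps the model's input unchanged.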

It does not support streaming in the current version. Intermediate chunks are never recorded. If you use streaming API calls, the tap can only record the assembled response, so you have to collect the stream yourself and hand the completed response to the tap. llm-stream-collector works well alongside it for this case.
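One way to hand a completed response to the capture layer is to consume the whole stream inside the function you wrap. In this sketch, `fake_stream` and its chunk shape are invented for the example, and `record_call` is a small stand-in for `@tap.capture` so the snippet runs without agenttap or a network call.

```python
import time

captured = []  # stand-in for the JSONL sink

def record_call(fn):
    """Minimal stand-in for @tap.capture: records kwargs and the returned value."""
    def wrapper(**kwargs):
        start = time.monotonic()
        response = fn(**kwargs)
        captured.append({
            "request": kwargs,
            "response": response,
            "duration_ms": (time.monotonic() - start) * 1000,
        })
        return response
    return wrapper

def fake_stream(**kwargs):
    """Hypothetical streaming client: yields text deltas, then a stop event."""
    for piece in ["Hel", "lo ", "world"]:
        yield {"type": "text_delta", "text": piece}
    yield {"type": "stop", "stop_reason": "end_turn"}

@record_call
def call_llm_streaming(**kwargs) -> dict:
    # Drain the stream here so the recorder sees one assembled response
    # rather than a generator object.
    text_parts, stop_reason = [], None
    for chunk in fake_stream(**kwargs):
        if chunk["type"] == "text_delta":
            text_parts.append(chunk["text"])
        elif chunk["type"] == "stop":
            stop_reason = chunk["stop_reason"]
    return {"content": "".join(text_parts), "stop_reason": stop_reason}

result = call_llm_streaming(model="claude-sonnet-4-6", messages=[], max_tokens=64)
```

The trade-off is that the caller no longer sees tokens as they arrive. If incremental output matters, collect the chunks on the side and record the assembled result afterwards.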


Inside the Library

The capture mechanism wraps the callable with timing and sink writes:

import functools
import json
import threading
import time
import uuid  # needed for the default session ID below
from pathlib import Path

class Tap:
    def __init__(self, sink: str, session_id: str | None = None):
        self._sink = Path(sink)
        self._sink.parent.mkdir(parents=True, exist_ok=True)
        self._session_id = session_id or str(uuid.uuid4())[:8]
        self._lock = threading.Lock()
        self._records: list[dict] = []

    def capture(self, fn):
        @functools.wraps(fn)
        def wrapper(**kwargs):
            start = time.monotonic()
            start_ts = time.time()
            error = None
            response = None

            try:
                response = fn(**kwargs)
                return response
            except Exception as e:
                error = str(e)
                raise
            finally:
                duration_ms = (time.monotonic() - start) * 1000
                record = {
                    "session_id": self._session_id,
                    "ts": start_ts,
                    "duration_ms": duration_ms,
                    "request": kwargs,
                    "response": _serialize_response(response),
                    "error": error,
                }
                self._write(record)

        return wrapper

    def _write(self, record: dict) -> None:
        with self._lock:
            self._records.append(record)
            with self._sink.open("a") as f:
                f.write(json.dumps(record, default=str) + "\n")

    def load_all(self) -> list[dict]:
        if not self._sink.exists():
            return []
        return [json.loads(line) for line in self._sink.read_text().splitlines() if line.strip()]

    def replay(self, index: int) -> dict:
        """Return the request from record N for manual replay."""
        records = self.load_all()
        return records[index]["request"]

The _serialize_response() helper handles Anthropic's response objects by converting them to dicts:

def _serialize_response(response) -> dict | None:
    if response is None:
        return None
    if isinstance(response, dict):
        return response
    if hasattr(response, "model_dump"):
        return response.model_dump()
    try:
        return json.loads(response.model_dump_json())
    except Exception:
        return {"raw": str(response)}

When to Use It

Use it during development and debugging. When an agent is misbehaving, the first question is: what did you actually send? The wire tap answers that without guesswork.

Use it for context window analysis. Load the wire JSONL and count actual token lengths across many runs. You may find your system prompt is longer than you think, or tool schemas are consuming more tokens than expected.
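A rough starting point for that analysis, assuming the record layout shown earlier, where `request` holds the original keyword arguments. It counts serialized characters per request field, which is only a proxy for tokens; the sample records are made up for the example.

```python
import json
from collections import defaultdict

def size_by_field(records: list[dict]) -> dict[str, int]:
    """Total JSON-serialized character count for each top-level request field."""
    totals: dict[str, int] = defaultdict(int)
    for record in records:
        for field, value in (record.get("request") or {}).items():
            totals[field] += len(json.dumps(value, default=str))
    return dict(totals)

# Sample records in the assumed layout; in practice these would come from tap.load_all().
records = [
    {"request": {"system": "You are helpful.", "messages": [{"role": "user", "content": "hi"}]}},
    {"request": {"system": "You are helpful.", "messages": []}},
]
sizes = size_by_field(records)
```

If the `system` or `tools` totals dominate, the fixed overhead of every call is the place to trim first. For real token counts, swap the character count for a tokenizer of your choice.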

Use it for prompt regression testing. Record calls with the current prompt. Change the prompt. Record the same test cases. Compare request sizes and response quality. The wire log is the ground truth for what changed.

Skip it in production for cost-sensitive paths. Capturing full request payloads for every call creates large JSONL files. For production observability, use agentsnap for cost tracking and agent-step-log for step records; reserve agenttap for debugging specific issues.


Install

pip install git+https://github.com/MukundaKatta/agenttap

# Or from PyPI  
pip install agenttap

A fuller example that tags the captured records with a run ID from agent_run_id:

import asyncio

import anthropic
from agenttap import Tap
from agent_run_id import RunContext

client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the environment
tap = Tap(sink="./logs/wire.jsonl")

@tap.capture
def call_anthropic(**kwargs):
    return client.messages.create(**kwargs)

async def debug_agent_run(task: str) -> str:
    ctx = RunContext.start()
    # Tag this run's records (note: this sets a private attribute on the tap)
    tap._session_id = str(ctx.run_id)[:8]

    # The captured call is synchronous, so it blocks the event loop while it runs
    response = call_anthropic(
        model="claude-sonnet-4-6",
        system="You are a helpful assistant.",
        messages=[{"role": "user", "content": task}],
        max_tokens=1024,
    )

    return response.content[0].text

asyncio.run(debug_agent_run("example task"))

# After the run, inspect what was captured
records = tap.load_all()
for r in records:
    print(f"Duration: {r['duration_ms']:.0f}ms")
    print(f"Input messages: {len(r['request']['messages'])}")
    if r["response"] is not None:  # response is None when the call raised
        print(f"Stop reason: {r['response']['stop_reason']}")

Sibling Libraries

Library | What it solves
--- | ---
agent-step-log | Structured step logging (higher-level than wire)
agentsnap | Cost tracking from token counts
llm-redact-secrets | Redact credentials before wire capture writes
llm-stream-collector | Assemble streaming response for wire capture
agent-debug-replay | Navigate wire logs step by step

The capture stack: agenttap for wire-level capture, agent-step-log for structured step records, agentsnap for cost, agent-debug-replay for navigation.


What's Next

Async capture: @tap.async_capture decorator for async agent functions. The current decorator works for sync callables; async wrapping needs await support.
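A minimal sketch of what such a decorator might look like. This is not agenttap's implementation: `AsyncTapSketch` is a hypothetical stand-in that keeps records in memory instead of writing JSONL, and the only structural change from the sync wrapper shown earlier is the `await`.

```python
import asyncio
import functools
import time

class AsyncTapSketch:
    """Hypothetical stand-in for Tap; stores records in a list, not a file."""

    def __init__(self):
        self.records: list[dict] = []

    def async_capture(self, fn):
        @functools.wraps(fn)
        async def wrapper(**kwargs):
            start = time.monotonic()
            start_ts = time.time()
            error, response = None, None
            try:
                response = await fn(**kwargs)  # await instead of a plain call
                return response
            except Exception as e:
                error = str(e)
                raise
            finally:
                # Runs on success, failure, and cancellation alike.
                self.records.append({
                    "ts": start_ts,
                    "duration_ms": (time.monotonic() - start) * 1000,
                    "request": kwargs,
                    "response": response,
                    "error": error,
                })
        return wrapper

tap = AsyncTapSketch()

@tap.async_capture
async def call_llm(**kwargs) -> dict:
    await asyncio.sleep(0)  # placeholder for an async client call
    return {"stop_reason": "end_turn"}

result = asyncio.run(call_llm(model="claude-sonnet-4-6", max_tokens=16))
```

A file-backed version would also need to decide whether the sink write may block the event loop; a `threading.Lock` plus a synchronous write is simple but stalls other tasks for the duration of the write.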

Size statistics: tap.load_all() already provides full records. A tap.size_report() that computes average/max/p95 request sizes across all captured calls would make context window optimization easier.
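Until that exists, a report along these lines can be computed from the loaded records. The function below is a hypothetical free-standing sketch, not a method of `Tap`; it measures serialized characters rather than tokens and uses the nearest-rank definition of p95.

```python
import json
import math

def size_report(records: list[dict]) -> dict:
    """Average, max, and nearest-rank p95 of serialized request sizes, in characters."""
    sizes = sorted(len(json.dumps(r["request"], default=str)) for r in records)
    if not sizes:
        return {"count": 0, "avg": 0.0, "max": 0, "p95": 0}
    # Nearest rank: the smallest value with at least 95% of samples at or below it.
    rank = math.ceil(95 * len(sizes) / 100)
    return {
        "count": len(sizes),
        "avg": sum(sizes) / len(sizes),
        "max": sizes[-1],
        "p95": sizes[rank - 1],
    }

# Synthetic records with request payloads of steadily growing length.
records = [{"request": {"prompt": "x" * n}} for n in range(1, 21)]
report = size_report(records)
```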

Diff mode: tap.diff(record_a, record_b) that shows the difference between two captured requests. Useful for comparing prompts before and after a change to understand exactly what changed.
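One possible shape for this, using the standard library's `difflib`. The `diff_requests` function is a hypothetical sketch, not part of agenttap; it serializes both requests as sorted, indented JSON so that key order does not show up as a spurious change.

```python
import difflib
import json

def diff_requests(record_a: dict, record_b: dict) -> list[str]:
    """Unified diff of two captured requests, rendered as sorted, indented JSON."""
    def render(record: dict) -> list[str]:
        text = json.dumps(record["request"], indent=2, sort_keys=True, default=str)
        return text.splitlines()

    return list(difflib.unified_diff(
        render(record_a), render(record_b),
        fromfile="a", tofile="b", lineterm="",
    ))

# Two made-up records that differ only in the system prompt.
before = {"request": {"system": "You are a helpful assistant.", "max_tokens": 1024}}
after = {"request": {"system": "You are a terse assistant.", "max_tokens": 1024}}
changes = diff_requests(before, after)
print("\n".join(changes))
```

Long message contents end up on a single JSON line, so a change inside a large prompt shows as one replaced line. Splitting string values on newlines before diffing would give finer-grained output.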


Built as part of the agent-stack family: composable Python primitives for production LLM agents.
