DEV Community

Mukunda Rao Katta

Build a Metrics Dashboard for Your Python Agent in 50 Lines

Most agent debugging happens in print statements. A run goes wrong, you add a print, redeploy, wait for it to happen again. This works until you have users. Once you have users, you need metrics that persist across runs and surface problems before users report them.

This post builds a lightweight metrics collector for a Python agent. It uses five libraries from the Hermes sprint, wired together through agent-event-bus. The output is a Prometheus-compatible HTTP endpoint you can scrape with Prometheus, graph in Grafana, or check with curl.


Hook

Your agent has been running in production for three days. On day four, costs jump by 40%. Nothing in your logs explains it. You do not know if it is more traffic, longer runs, more retries, or a loop that got worse. Without metrics, you are guessing.

The five numbers that tell you what changed:

  1. Cost per run (caught by agenttrace)
  2. Cache hit ratio (caught by cachebench)
  3. Circuit breaker state (caught by llm-circuit-breaker-py)
  4. Loop detection count (caught by tool-loop-guard)
  5. Error kind distribution (caught by tool-error-classify)

When all five are wired to agent-event-bus, their events flow to a single subscriber that builds Prometheus metrics and serves them on a port.
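Raw totals are less useful than the ratios you derive from them. Here is a minimal sketch of that derivation, assuming a snapshot dict keyed by the counter names the collector in this post uses. derive_health_numbers is a hypothetical helper, not part of any of the libraries.

```python
def derive_health_numbers(snapshot: dict) -> dict:
    """Turn raw counters into the ratios you would actually graph."""
    runs = snapshot.get("runs_total", 0)
    hits = snapshot.get("cache_hits", 0)
    misses = snapshot.get("cache_misses", 0)
    lookups = hits + misses
    return {
        # Guard every division: a fresh process has zero runs and zero lookups.
        "cost_per_run_usd": snapshot.get("cost_usd_total", 0.0) / runs if runs else 0.0,
        "cache_hit_ratio": hits / lookups if lookups else 0.0,
        "loops_per_run": snapshot.get("loop_detections", 0) / runs if runs else 0.0,
    }


# Illustrative numbers only, not measurements.
example = derive_health_numbers(
    {
        "runs_total": 4,
        "cost_usd_total": 0.10,
        "cache_hits": 3,
        "cache_misses": 1,
        "loop_detections": 2,
    }
)
```

In a real Prometheus setup you would compute these in PromQL over a time window instead, so a cost jump on day four shows up as a change in rate and is not diluted by three days of history.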


Main Code

import asyncio
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from collections import defaultdict

from agenttrace import Tracer
from cachebench import CacheBenchmark
from llm_circuit_breaker import CircuitBreaker, CircuitState
from tool_loop_guard import LoopGuard
from tool_error_classify import ErrorKind, classify
from agent_event_bus import EventBus
import anthropic

# Shared metrics store
metrics = {
    "runs_total": 0,
    "cost_usd_total": 0.0,
    "cache_hits": 0,
    "cache_misses": 0,
    "loop_detections": 0,
    "circuit_open": 0,
    "errors_by_kind": defaultdict(int),
    "runs_in_progress": 0,
}
metrics_lock = threading.Lock()

bus = EventBus()
tracer = Tracer()
cache_bench = CacheBenchmark()
circuit = CircuitBreaker(failure_threshold=3, recovery_timeout=30)
loop_guard = LoopGuard(window_size=5, max_repeats=3)

# Subscribe to events from all libraries
@bus.on("run.complete")
def on_run_complete(event):
    with metrics_lock:
        metrics["runs_total"] += 1
        metrics["cost_usd_total"] += event.get("cost_usd", 0.0)
        metrics["runs_in_progress"] -= 1

@bus.on("run.start")
def on_run_start(event):
    with metrics_lock:
        metrics["runs_in_progress"] += 1

@bus.on("cache.hit")
def on_cache_hit(event):
    with metrics_lock:
        metrics["cache_hits"] += 1

@bus.on("cache.miss")
def on_cache_miss(event):
    with metrics_lock:
        metrics["cache_misses"] += 1

@bus.on("loop.detected")
def on_loop_detected(event):
    with metrics_lock:
        metrics["loop_detections"] += 1

@bus.on("circuit.open")
def on_circuit_open(event):
    with metrics_lock:
        metrics["circuit_open"] += 1

@bus.on("tool.error")
def on_tool_error(event):
    kind: ErrorKind = event.get("kind", ErrorKind.UNKNOWN)
    with metrics_lock:
        metrics["errors_by_kind"][kind.value] += 1


# Prometheus text format serializer
def build_prometheus_output() -> str:
    with metrics_lock:
        lines = [
            "# HELP agent_runs_total Total agent runs",
            "# TYPE agent_runs_total counter",
            f"agent_runs_total {metrics['runs_total']}",
            "",
            "# HELP agent_cost_usd_total Total cost in USD",
            "# TYPE agent_cost_usd_total counter",
            f"agent_cost_usd_total {metrics['cost_usd_total']:.6f}",
            "",
            "# HELP agent_cache_hits_total Cache hits",
            "# TYPE agent_cache_hits_total counter",
            f"agent_cache_hits_total {metrics['cache_hits']}",
            "",
            "# HELP agent_cache_misses_total Cache misses",
            "# TYPE agent_cache_misses_total counter",
            f"agent_cache_misses_total {metrics['cache_misses']}",
            "",
            "# HELP agent_loop_detections_total Loop detections",
            "# TYPE agent_loop_detections_total counter",
            f"agent_loop_detections_total {metrics['loop_detections']}",
            "",
            "# HELP agent_circuit_opens_total Circuit breaker opens",
            "# TYPE agent_circuit_opens_total counter",
            f"agent_circuit_opens_total {metrics['circuit_open']}",
            "",
            "# HELP agent_runs_in_progress Current runs in progress",
            "# TYPE agent_runs_in_progress gauge",
            f"agent_runs_in_progress {metrics['runs_in_progress']}",
        ]
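        # Labeled counter: declare HELP/TYPE once, then one sample per error kind
        lines += [
            "",
            "# HELP agent_tool_errors_total Tool errors by kind",
            "# TYPE agent_tool_errors_total counter",
        ]
        for kind, count in metrics["errors_by_kind"].items():
            lines.append(f'agent_tool_errors_total{{kind="{kind}"}} {count}')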
        return "\n".join(lines) + "\n"


# HTTP server for Prometheus scrape
class MetricsHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == "/metrics":
            body = build_prometheus_output().encode()
            self.send_response(200)
            self.send_header("Content-Type", "text/plain; version=0.0.4")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        else:
            self.send_response(404)
            self.end_headers()

    def log_message(self, format, *args):
        pass  # silence default access log


def start_metrics_server(port: int = 9090):
    server = HTTPServer(("", port), MetricsHandler)
    t = threading.Thread(target=server.serve_forever, daemon=True)
    t.start()
    print(f"Metrics on http://localhost:{port}/metrics")


# Instrumented agent run
async def run_agent(prompt: str) -> str:
    bus.emit("run.start", {})
    run_id = tracer.start_run()

    try:
        with circuit:
            # Async client so the API call does not block the event loop
            client = anthropic.AsyncAnthropic()
            response = await client.messages.create(
                model="claude-sonnet-4-6",
                max_tokens=512,
                messages=[{"role": "user", "content": prompt}],
            )
            text = response.content[0].text

        usage = response.usage
        is_cache_hit = (usage.cache_read_input_tokens or 0) > 0
        bus.emit("cache.hit" if is_cache_hit else "cache.miss", {})

        cost = tracer.end_run(run_id, usage=usage)
        bus.emit("run.complete", {"cost_usd": cost})
        return text

    except Exception as exc:
        kind = classify(exc)
        bus.emit("tool.error", {"kind": kind})
        tracer.end_run(run_id, error=str(exc))
        bus.emit("run.complete", {"cost_usd": 0.0})
        raise


async def main():
    start_metrics_server(9090)

    prompts = [
        "What is 2 + 2?",
        "Explain HTTP in one sentence.",
        "What is the speed of light?",
    ]
    for prompt in prompts:
        try:
            reply = await run_agent(prompt)
            print(f"Q: {prompt}\nA: {reply}\n")
        except Exception as e:
            print(f"Error: {e}")

    # Show raw metrics
    print(build_prometheus_output())

if __name__ == "__main__":
    asyncio.run(main())

What It Does NOT Do

This does not give you dashboards out of the box. The /metrics endpoint serves Prometheus text format. You need a running Prometheus instance pointed at the endpoint, and a Grafana dashboard on top of that. If you want a zero-setup view, pipe the output of curl localhost:9090/metrics into a spreadsheet.
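If you go the spreadsheet route, the text format is simple enough to convert by hand. This is a minimal sketch that assumes samples carry no trailing timestamp, which holds for the endpoint in this post but not for every exporter. prometheus_text_to_rows and rows_to_csv are hypothetical helper names.

```python
import csv
import io


def prometheus_text_to_rows(text: str) -> list[tuple[str, str]]:
    """Extract (metric, value) pairs, skipping comments and blank lines."""
    rows = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # A sample looks like `name{labels} value`; the value is the last token.
        name, _, value = line.rpartition(" ")
        rows.append((name, value))
    return rows


def rows_to_csv(rows) -> str:
    """Serialize the pairs as CSV with a header row."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(["metric", "value"])
    writer.writerows(rows)
    return buf.getvalue()


sample = (
    "# HELP agent_runs_total Total agent runs\n"
    "# TYPE agent_runs_total counter\n"
    "agent_runs_total 3\n"
    'agent_tool_errors_total{kind="timeout"} 1\n'
)
rows = prometheus_text_to_rows(sample)
csv_text = rows_to_csv(rows)
```

The csv module quotes the labeled metric name for you, so the braces and inner quotes survive the import.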

This does not aggregate across processes. metrics is an in-memory dict in one Python process. If you run two agent workers, each has its own metrics endpoint. You need Prometheus federation or a shared backend to aggregate. A Redis counter per metric key is the simplest shared backend.
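A shared backend could look roughly like this. The sketch assumes a client object exposing incrby and incrbyfloat, the two method names redis-py uses for the Redis INCRBY and INCRBYFLOAT commands. InMemoryBackend is a stand-in so the example runs without a server, and SharedCounters and the key prefix are hypothetical.

```python
class InMemoryBackend:
    """Stand-in exposing the two increment methods the sketch relies on."""

    def __init__(self) -> None:
        self.store = {}

    def incrby(self, name: str, amount: int = 1) -> int:
        self.store[name] = int(self.store.get(name, 0)) + amount
        return self.store[name]

    def incrbyfloat(self, name: str, amount: float = 1.0) -> float:
        self.store[name] = float(self.store.get(name, 0.0)) + amount
        return self.store[name]


class SharedCounters:
    """Routes increments to a backend shared by every worker process."""

    def __init__(self, backend, prefix: str = "agent:metrics:") -> None:
        self.backend = backend
        self.prefix = prefix  # hypothetical key namespace

    def inc(self, key: str, amount=1):
        name = self.prefix + key
        if isinstance(amount, float):
            return self.backend.incrbyfloat(name, amount)
        return self.backend.incrby(name, amount)


# Two "workers" writing to the same backend.
backend = InMemoryBackend()
worker_a = SharedCounters(backend)
worker_b = SharedCounters(backend)
worker_a.inc("runs_total")
worker_b.inc("runs_total")
worker_a.inc("cost_usd_total", 0.25)
worker_b.inc("cost_usd_total", 0.5)
```

With a real Redis client in place of the stand-in, the increments are atomic on the server, so the workers need no lock of their own for these counters.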

The circuit breaker metric here counts how many times the circuit opened. It does not report current state. For current state, add a gauge that evaluates circuit.state == CircuitState.OPEN on each scrape.
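A sketch of that gauge, under the assumption that the breaker exposes a state attribute comparable to CircuitState.OPEN. The enum and FakeCircuit below are stand-ins so the example runs on its own, and circuit_state_lines is a hypothetical helper.

```python
from enum import Enum


class CircuitState(Enum):
    """Stand-in for the library's enum; member names are assumptions."""

    CLOSED = "closed"
    OPEN = "open"


class FakeCircuit:
    """Stand-in breaker that only carries a state attribute."""

    def __init__(self) -> None:
        self.state = CircuitState.CLOSED


def circuit_state_lines(circuit) -> list[str]:
    """Read the state at scrape time and emit it as a 0/1 gauge."""
    is_open = 1 if circuit.state == CircuitState.OPEN else 0
    return [
        "# HELP agent_circuit_open 1 if the circuit breaker is currently open",
        "# TYPE agent_circuit_open gauge",
        f"agent_circuit_open {is_open}",
    ]


demo_circuit = FakeCircuit()
closed_lines = circuit_state_lines(demo_circuit)
demo_circuit.state = CircuitState.OPEN
open_lines = circuit_state_lines(demo_circuit)
```

Because the value is read on each scrape, nothing is stored in the metrics dict. You would extend the list built in build_prometheus_output with these lines.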


Design Reasoning

Every library emits to agent-event-bus. None of them know about Prometheus or HTTP. The metrics server subscribes to the bus and builds the text format from the accumulated dict. This means you can replace the Prometheus output with a CloudWatch PutMetricData call by changing only the subscriber.

The lock is coarse. One lock on the whole dict is fine for low-throughput agents. For high-throughput, use a lock per counter or atomic increment primitives.
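One way to split the lock, sketched with a hypothetical Counter class that owns its own threading.Lock. Increments to different counters then never wait on each other.

```python
import threading


class Counter:
    """A monotonically increasing value guarded by its own lock."""

    def __init__(self) -> None:
        self._value = 0.0
        self._lock = threading.Lock()

    def inc(self, amount: float = 1.0) -> None:
        if amount < 0:
            raise ValueError("counters only go up")
        with self._lock:
            self._value += amount

    @property
    def value(self) -> float:
        with self._lock:
            return self._value


counters = {"runs_total": Counter(), "cache_hits": Counter()}


def hammer(counter: Counter, n: int) -> None:
    for _ in range(n):
        counter.inc()


# Four threads contend on one counter; the other counter is untouched.
threads = [
    threading.Thread(target=hammer, args=(counters["runs_total"], 1000))
    for _ in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

The trade-off is that a scrape no longer sees a single consistent snapshot across counters, which Prometheus tolerates.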

The HTTP server runs in a daemon thread, separate from the asyncio loop that drives agent calls. A scrape never waits on the event loop, and a slow agent call never delays a scrape.

agent-event-bus uses a simple callback registry. Events are synchronous by default. For async subscribers you wrap the callback in asyncio.run_coroutine_threadsafe.
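Here is a sketch of that wrapping using only the standard library. make_threadsafe_subscriber is a hypothetical helper, and the demo calls the callback directly instead of going through the bus. asyncio.run_coroutine_threadsafe schedules the coroutine on a loop running in another thread and returns a concurrent.futures.Future.

```python
import asyncio
import threading


def make_threadsafe_subscriber(async_handler, loop):
    """Wrap an async handler so a synchronous bus can call it."""

    def callback(event):
        # Schedules the coroutine on `loop`; does not wait for it to finish.
        return asyncio.run_coroutine_threadsafe(async_handler(event), loop)

    return callback


# Demo: a loop running in a background thread.
loop = asyncio.new_event_loop()
loop_thread = threading.Thread(target=loop.run_forever, daemon=True)
loop_thread.start()

received = []


async def on_run_complete_async(event):
    await asyncio.sleep(0)  # placeholder for real async work
    received.append(event["cost_usd"])
    return event["cost_usd"]


callback = make_threadsafe_subscriber(on_run_complete_async, loop)
future = callback({"cost_usd": 0.002})
result = future.result(timeout=5)  # safe here: we are not on the loop's thread

# Shut the demo loop down cleanly.
loop.call_soon_threadsafe(loop.stop)
loop_thread.join(timeout=5)
loop.close()
```

Do not call future.result() from the thread that runs the loop. That blocks the loop the coroutine needs and deadlocks.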


When This Applies / Does Not Apply

This pattern fits any agent that runs continuously: a service handling user requests, a background agent processing a queue, or a scheduled agent that runs on a cron. Anywhere you want to know "is this healthy right now" without reading logs.

It does not fit single-shot scripts. If your agent runs once and exits, there is no persistent process to scrape. Just print the tracer summary at the end.

If you are already using OpenTelemetry, agenttrap and otel-genai-bridge-rs are better choices. They emit spans and metrics in OTLP format directly. This post's approach is for when you want minimal dependencies and a fast setup.


Quick-Start Snippet

pip install agenttrace cachebench llm-circuit-breaker-py tool-loop-guard tool-error-classify agent-event-bus

Minimal metrics-only run:

from agenttrace import Tracer
from agent_event_bus import EventBus

bus = EventBus()
tracer = Tracer()

@bus.on("run.complete")
def log_cost(event):
    print(f"Run cost: ${event['cost_usd']:.4f}")

run_id = tracer.start_run()
# ... your LLM call goes here and must assign `response` ...
cost = tracer.end_run(run_id, usage=response.usage)
bus.emit("run.complete", {"cost_usd": cost})

Add the HTTP server when you are ready to hook up Prometheus.


Siblings Table

| Library | Metric it covers | GitHub |
| --- | --- | --- |
| agenttrace | Cost per run, latency | MukundaKatta/agenttrace |
| cachebench | Cache hit ratio | MukundaKatta/cachebench |
| llm-circuit-breaker-py | Provider failure rate | MukundaKatta/llm-circuit-breaker-py |
| tool-loop-guard | Repeated tool call rate | MukundaKatta/tool-loop-guard |
| tool-error-classify | Error kind distribution | MukundaKatta/tool-error-classify |
| agent-event-bus | Pub/sub bus for all events | MukundaKatta/agent-event-bus |
| agentfit | Token usage per run | MukundaKatta/agentfit |

What's Next

The natural next step is alerting. Right now you can see the metrics. You cannot get notified when the circuit breaker opens for the tenth time in an hour. Alertmanager handles that if you are using Prometheus. For a simpler setup, add a threshold check inside the on_circuit_open subscriber and send a webhook.
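A sketch of that threshold check as a sliding window. OpenRateAlert is a hypothetical class. The notify argument stands in for whatever sends your webhook, and the clock is injectable so the demo does not have to wait an hour.

```python
import time
from collections import deque


class OpenRateAlert:
    """Fires notify() when the circuit opens `threshold` times within the window."""

    def __init__(self, threshold=10, window_seconds=3600.0,
                 notify=print, clock=time.monotonic):
        self._threshold = threshold
        self._window = window_seconds
        self._notify = notify
        self._clock = clock
        self._opens = deque()

    def record_open(self) -> bool:
        now = self._clock()
        self._opens.append(now)
        # Drop opens that have aged out of the window.
        while self._opens and now - self._opens[0] > self._window:
            self._opens.popleft()
        if len(self._opens) >= self._threshold:
            self._notify(
                f"circuit opened {len(self._opens)} times in {self._window:.0f}s"
            )
            self._opens.clear()  # avoid re-alerting on every later open
            return True
        return False


# Demo with a fake clock: 3 opens within 60 seconds trigger one alert.
sent = []
fake_now = [0.0]
alert = OpenRateAlert(threshold=3, window_seconds=60.0,
                      notify=sent.append, clock=lambda: fake_now[0])
results = []
for t in (0.0, 10.0, 100.0, 110.0, 120.0):
    fake_now[0] = t
    results.append(alert.record_open())
```

Inside the subscriber you would call alert.record_open() next to the counter increment. Keep the webhook call itself off the hot path, since subscribers run synchronously.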

Cardinality is the other concern. The errors_by_kind label set is bounded because ErrorKind is a closed enum. Be careful if you add free-form labels like user IDs or prompt hashes. Those create unbounded cardinality and break Prometheus memory budgets fast.
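If a free-form value ever does reach a label, one defensive option is to collapse anything outside a known set into a single bucket. The sketch below assumes a hypothetical allow-list; the kind names in it are illustrative, not the actual members of ErrorKind. It also escapes backslashes, double quotes, and newlines, which the Prometheus text format requires in label values.

```python
# Hypothetical allow-list; replace with the label values you actually expect.
ALLOWED_KINDS = frozenset({"timeout", "rate_limit", "auth", "unknown"})


def bounded_label(value: str, allowed=ALLOWED_KINDS, fallback: str = "other") -> str:
    """Map any value outside the allow-list to one fallback bucket."""
    return value if value in allowed else fallback


def escape_label_value(value: str) -> str:
    """Escape backslash, double quote, and newline for the text format."""
    return value.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")


def error_sample(kind: str, count: int) -> str:
    """Build one sample line with a bounded, escaped label."""
    label = escape_label_value(bounded_label(kind))
    return f'agent_tool_errors_total{{kind="{label}"}} {count}'


known = error_sample("timeout", 2)
unknown = error_sample("user-12345", 1)
```

The series count is then capped at the size of the allow-list plus one, whatever the callers pass in.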

The complete collection of libraries from this sprint is at MukundaKatta on GitHub. Each one has a README with install instructions and a minimal example.
