Most agent debugging happens in print statements. A run goes wrong, you add a print, redeploy, wait for it to happen again. This works until you have users. Once you have users, you need metrics that persist across runs and surface problems before users report them.
This post builds a lightweight metrics collector for a Python agent. It wires five libraries from the Hermes sprint together through a sixth, agent-event-bus. The output is a Prometheus-compatible HTTP endpoint that Prometheus can scrape (and Grafana can graph), or that you can check with curl.
Hook
Your agent has been running in production for three days. On day four, costs jump by 40%. Nothing in your logs explains it. You do not know if it is more traffic, longer runs, more retries, or a loop that got worse. Without metrics, you are guessing.
The five numbers that tell you what changed:

- Cost per run (caught by agenttrace)
- Cache hit ratio (caught by cachebench)
- Circuit breaker state (caught by llm-circuit-breaker-py)
- Loop detection count (caught by tool-loop-guard)
- Error kind distribution (caught by tool-error-classify)

When all five emit events to agent-event-bus, a single metrics collector subscribes to them, builds Prometheus metrics, and serves them on a port.
Main Code
```python
import asyncio
import threading
from collections import defaultdict
from http.server import BaseHTTPRequestHandler, HTTPServer

import anthropic
from agent_event_bus import EventBus
from agenttrace import Tracer
from cachebench import CacheBenchmark
from llm_circuit_breaker import CircuitBreaker, CircuitState
from tool_error_classify import ErrorKind, classify
from tool_loop_guard import LoopGuard

# Shared metrics store: written by bus subscribers, read by the HTTP handler
metrics = {
    "runs_total": 0,
    "cost_usd_total": 0.0,
    "cache_hits": 0,
    "cache_misses": 0,
    "loop_detections": 0,
    "circuit_open": 0,
    "errors_by_kind": defaultdict(int),
    "runs_in_progress": 0,
}
metrics_lock = threading.Lock()

bus = EventBus()
tracer = Tracer()
cache_bench = CacheBenchmark()
circuit = CircuitBreaker(failure_threshold=3, recovery_timeout=30)
# This example makes no tool calls, so loop.detected never fires here;
# loop_guard shows where tool-loop-guard plugs in.
loop_guard = LoopGuard(window_size=5, max_repeats=3)
client = anthropic.AsyncAnthropic()  # async client, reused across runs

# Subscribe to events from all libraries
@bus.on("run.start")
def on_run_start(event):
    with metrics_lock:
        metrics["runs_in_progress"] += 1

@bus.on("run.complete")
def on_run_complete(event):
    with metrics_lock:
        metrics["runs_total"] += 1
        metrics["cost_usd_total"] += event.get("cost_usd", 0.0)
        metrics["runs_in_progress"] -= 1

@bus.on("cache.hit")
def on_cache_hit(event):
    with metrics_lock:
        metrics["cache_hits"] += 1

@bus.on("cache.miss")
def on_cache_miss(event):
    with metrics_lock:
        metrics["cache_misses"] += 1

@bus.on("loop.detected")
def on_loop_detected(event):
    with metrics_lock:
        metrics["loop_detections"] += 1

@bus.on("circuit.open")
def on_circuit_open(event):
    with metrics_lock:
        metrics["circuit_open"] += 1

@bus.on("tool.error")
def on_tool_error(event):
    kind: ErrorKind = event.get("kind", ErrorKind.UNKNOWN)
    with metrics_lock:
        metrics["errors_by_kind"][kind.value] += 1

# Prometheus text format serializer
def build_prometheus_output() -> str:
    with metrics_lock:
        lines = [
            "# HELP agent_runs_total Total agent runs",
            "# TYPE agent_runs_total counter",
            f"agent_runs_total {metrics['runs_total']}",
            "",
            "# HELP agent_cost_usd_total Total cost in USD",
            "# TYPE agent_cost_usd_total counter",
            f"agent_cost_usd_total {metrics['cost_usd_total']:.6f}",
            "",
            "# HELP agent_cache_hits_total Cache hits",
            "# TYPE agent_cache_hits_total counter",
            f"agent_cache_hits_total {metrics['cache_hits']}",
            "",
            "# HELP agent_cache_misses_total Cache misses",
            "# TYPE agent_cache_misses_total counter",
            f"agent_cache_misses_total {metrics['cache_misses']}",
            "",
            "# HELP agent_loop_detections_total Loop detections",
            "# TYPE agent_loop_detections_total counter",
            f"agent_loop_detections_total {metrics['loop_detections']}",
            "",
            "# HELP agent_circuit_opens_total Circuit breaker opens",
            "# TYPE agent_circuit_opens_total counter",
            f"agent_circuit_opens_total {metrics['circuit_open']}",
            "",
            "# HELP agent_runs_in_progress Current runs in progress",
            "# TYPE agent_runs_in_progress gauge",
            f"agent_runs_in_progress {metrics['runs_in_progress']}",
            "",
            # Labeled family: HELP/TYPE once, then one sample per kind
            "# HELP agent_tool_errors_total Tool errors by kind",
            "# TYPE agent_tool_errors_total counter",
        ]
        for kind, count in metrics["errors_by_kind"].items():
            lines.append(f'agent_tool_errors_total{{kind="{kind}"}} {count}')
    return "\n".join(lines) + "\n"

# HTTP server for Prometheus scrape
class MetricsHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == "/metrics":
            body = build_prometheus_output().encode("utf-8")
            self.send_response(200)
            self.send_header("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        else:
            self.send_response(404)
            self.end_headers()

    def log_message(self, format, *args):
        pass  # silence default access log

def start_metrics_server(port: int = 9090):
    # Note: 9090 is also Prometheus's own default port; pick another
    # if a Prometheus server runs on the same host.
    server = HTTPServer(("", port), MetricsHandler)
    t = threading.Thread(target=server.serve_forever, daemon=True)
    t.start()
    print(f"Metrics on http://localhost:{port}/metrics")

# Instrumented agent run
async def run_agent(prompt: str) -> str:
    bus.emit("run.start", {})
    run_id = tracer.start_run()
    try:
        with circuit:  # only the provider call counts toward the breaker
            response = await client.messages.create(
                model="claude-sonnet-4-6",
                max_tokens=512,
                messages=[{"role": "user", "content": prompt}],
            )
        text = response.content[0].text
        usage = response.usage
        # A hit requires prompt caching (cache_control on the request);
        # without it, cache_read_input_tokens stays 0 and every run is a miss.
        is_cache_hit = (usage.cache_read_input_tokens or 0) > 0
        bus.emit("cache.hit" if is_cache_hit else "cache.miss", {})
        cost = tracer.end_run(run_id, usage=usage)
        bus.emit("run.complete", {"cost_usd": cost})
        return text
    except Exception as exc:
        kind = classify(exc)
        bus.emit("tool.error", {"kind": kind})
        tracer.end_run(run_id, error=str(exc))
        bus.emit("run.complete", {"cost_usd": 0.0})
        raise

async def main():
    start_metrics_server(9090)
    prompts = [
        "What is 2 + 2?",
        "Explain HTTP in one sentence.",
        "What is the speed of light?",
    ]
    for prompt in prompts:
        try:
            reply = await run_agent(prompt)
            print(f"Q: {prompt}\nA: {reply}\n")
        except Exception as e:
            print(f"Error: {e}")
    # Show raw metrics. The server thread is a daemon, so the endpoint
    # disappears when main() returns; keep the process alive to curl it.
    print(build_prometheus_output())

if __name__ == "__main__":
    asyncio.run(main())
```
What It Does NOT Do
This does not give you dashboards out of the box. The /metrics endpoint serves Prometheus text format; to get graphs you need a running Prometheus instance scraping the endpoint and a Grafana dashboard on top of that. For a zero-setup view, run curl localhost:9090/metrics and read the counters directly, or paste the output into a spreadsheet.
This does not aggregate across processes. metrics is an in-memory dict in one Python process, so if you run two agent workers, each serves its own metrics endpoint. The usual fix is to point Prometheus at every worker and sum the series at query time, for example sum(agent_runs_total). A shared backend is the alternative when the workers cannot each be scraped; a Redis counter per metric key is the simplest one.
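To make "sum at query time" concrete, here is a minimal sketch of what that aggregation does, assuming each worker serves the text format built by build_prometheus_output. The function names parse_samples and aggregate are hypothetical, and the parser is deliberately simplified (no timestamps, no spaces inside label values). In production, let Prometheus do this with sum().

```python
from collections import defaultdict


def parse_samples(text: str) -> dict[str, float]:
    """Parse Prometheus text-format sample lines into {series: value}.

    Simplified: assumes one sample per line, no timestamps, and no spaces
    inside label values, which holds for the output built above.
    """
    samples: dict[str, float] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        series, value = line.rsplit(" ", 1)
        samples[series] = float(value)
    return samples


def aggregate(worker_outputs: list[str]) -> dict[str, float]:
    """Sum each series (name plus labels) across all workers."""
    totals: dict[str, float] = defaultdict(float)
    for text in worker_outputs:
        for series, value in parse_samples(text).items():
            totals[series] += value
    return dict(totals)


# Illustrative scrape bodies from two hypothetical workers
worker_a = "# TYPE agent_runs_total counter\nagent_runs_total 3\nagent_cost_usd_total 0.012000\n"
worker_b = 'agent_runs_total 5\nagent_tool_errors_total{kind="example"} 2\n'
print(aggregate([worker_a, worker_b]))
```

Summing works for counters and for the runs_in_progress gauge, since total work in flight across workers is what you usually want to see.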
The circuit breaker metric here counts how many times the circuit opened; it does not report the current state. For current state, add a gauge that evaluates circuit.state == CircuitState.OPEN on each scrape and reports 1 or 0.
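A gauge like that is read at scrape time instead of being accumulated by a subscriber. The sketch below shows the shape, assuming the breaker exposes a state attribute comparable to CircuitState.OPEN, as the paragraph describes. FakeState and FakeBreaker are hypothetical stand-ins so the sketch runs without the library; gauge_lines is an illustrative helper whose output you would append inside build_prometheus_output.

```python
import enum
from typing import Callable


class FakeState(enum.Enum):
    # Hypothetical stand-in for llm_circuit_breaker.CircuitState
    CLOSED = "closed"
    OPEN = "open"


class FakeBreaker:
    # Hypothetical stand-in exposing a .state attribute, as the text assumes
    def __init__(self) -> None:
        self.state = FakeState.CLOSED


def gauge_lines(name: str, help_text: str, read: Callable[[], float]) -> list[str]:
    """Render one gauge whose value is computed when the scrape happens."""
    return [
        f"# HELP {name} {help_text}",
        f"# TYPE {name} gauge",
        f"{name} {read()}",
    ]


breaker = FakeBreaker()


def circuit_is_open() -> int:
    # With the real library: 1 if circuit.state == CircuitState.OPEN else 0
    return 1 if breaker.state == FakeState.OPEN else 0


print("\n".join(gauge_lines("agent_circuit_open", "1 if the breaker is open", circuit_is_open)))
breaker.state = FakeState.OPEN
print("\n".join(gauge_lines("agent_circuit_open", "1 if the breaker is open", circuit_is_open)))
```

Because the value comes from the breaker itself, it drops back to 0 when the breaker recovers, which the opens counter can never do.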
Design Reasoning
Every library emits to agent-event-bus. None of them know about Prometheus or HTTP. The metrics server subscribes to the bus and builds the text format from the accumulated dict. This means you can replace the Prometheus output with a CloudWatch PutMetricData call by changing only the subscriber.
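As an illustration of swapping only the subscriber, here is a sketch of a CloudWatch version of on_run_complete. It assumes boto3 is installed with AWS credentials configured; put_metric_data is a real boto3 CloudWatch call, but the namespace "AgentMetrics" and the metric names are hypothetical placeholders. The event-to-MetricData mapping is a pure function so it can be checked without AWS.

```python
from typing import Any, Callable


def run_complete_to_metric_data(event: dict[str, Any]) -> list[dict[str, Any]]:
    """Translate one run.complete event into CloudWatch MetricData entries."""
    return [
        {"MetricName": "AgentRuns", "Value": 1.0, "Unit": "Count"},
        {"MetricName": "AgentCostUSD", "Value": float(event.get("cost_usd", 0.0)), "Unit": "None"},
    ]


def make_cloudwatch_subscriber(namespace: str = "AgentMetrics") -> Callable[[dict[str, Any]], None]:
    """Build a drop-in replacement for on_run_complete that pushes to CloudWatch.

    Requires boto3 and AWS credentials; the namespace is a placeholder.
    """
    import boto3  # imported lazily so the mapping above works without boto3

    cloudwatch = boto3.client("cloudwatch")

    def on_run_complete(event: dict[str, Any]) -> None:
        cloudwatch.put_metric_data(
            Namespace=namespace,
            MetricData=run_complete_to_metric_data(event),
        )

    return on_run_complete


# Wiring, assuming bus.on(...) behaves as an ordinary decorator:
# bus.on("run.complete")(make_cloudwatch_subscriber())
```

Each event here becomes one API call. That is fine for a demo, but at real traffic you would buffer events and send them in batches.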
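The lock is coarse. One lock over the whole dict is fine for low-throughput agents. At higher throughput, give each counter its own lock so unrelated updates stop contending, or switch to the official prometheus_client library, which manages thread-safe metric values for you.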
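A per-counter lock can be as small as the class below. It is a minimal sketch, not part of any of the sprint libraries: each Counter guards only its own value, so a cache.hit update never waits on a cost update. The trade-off is that a scrape reading several counters no longer sees one atomic snapshot, which is normally acceptable for Prometheus.

```python
import threading


class Counter:
    """A monotonically increasing counter guarded by its own lock."""

    def __init__(self) -> None:
        self._value = 0.0
        self._lock = threading.Lock()

    def inc(self, amount: float = 1.0) -> None:
        if amount < 0:
            raise ValueError("counters only go up")
        with self._lock:  # contention is limited to this one counter
            self._value += amount

    def get(self) -> float:
        with self._lock:
            return self._value


cache_hits = Counter()
cache_misses = Counter()


def hammer(counter: Counter, n: int) -> None:
    for _ in range(n):
        counter.inc()


# Four threads per counter, 10,000 increments each
threads = [
    threading.Thread(target=hammer, args=(c, 10_000))
    for c in (cache_hits, cache_misses)
    for _ in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(cache_hits.get(), cache_misses.get())  # 40000.0 40000.0
```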
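The HTTP server runs in a daemon thread, while the asyncio event loop in the main thread drives agent calls. The server never touches the event loop, so a scrape cannot stall an agent call and an agent call awaiting the API cannot delay a scrape. The catch is that a daemon thread dies with the process, so the endpoint exists only while the agent process is running.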
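agent-event-bus uses a simple callback registry, and events are synchronous by default. For an async subscriber, register a plain callback that hands the coroutine to an event loop: asyncio.run_coroutine_threadsafe(coro, loop) when that loop runs in another thread, or loop.create_task(coro) when you are already on the loop's own thread.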
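Here is that hand-off as a runnable sketch. MiniBus is a hypothetical stand-in that only mimics the on/emit shape used in this post; it is not agent-event-bus's implementation. The async subscriber runs on a dedicated event loop in a background thread, and the synchronous callback schedules it with asyncio.run_coroutine_threadsafe. Do not call .result() on the returned future from the loop's own thread: that deadlocks, which is the case where loop.create_task is the right tool.

```python
import asyncio
import threading
from collections import defaultdict
from typing import Any, Callable


class MiniBus:
    """Hypothetical stand-in for agent-event-bus: a synchronous callback registry."""

    def __init__(self) -> None:
        self._subs: dict[str, list[Callable[[dict], Any]]] = defaultdict(list)

    def on(self, name: str):
        def register(fn):
            self._subs[name].append(fn)
            return fn
        return register

    def emit(self, name: str, payload: dict) -> None:
        for fn in self._subs[name]:
            fn(payload)  # runs inline, in the emitter's thread


# A dedicated event loop in a background thread hosts the async subscribers.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

received: list[float] = []
pending = []


async def push_cost(event: dict) -> None:
    await asyncio.sleep(0)  # stands in for real async I/O (HTTP call, DB write)
    received.append(event["cost_usd"])


bus = MiniBus()


@bus.on("run.complete")
def schedule_push(event: dict) -> None:
    # Thread-safe hand-off; returns a concurrent.futures.Future immediately.
    pending.append(asyncio.run_coroutine_threadsafe(push_cost(event), loop))


bus.emit("run.complete", {"cost_usd": 0.0031})
for fut in pending:
    fut.result(timeout=5)  # block only in this demo, to observe the result
print(received)  # [0.0031]
```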
When This Applies / Does Not Apply
This pattern fits any agent that runs as a long-lived process: a service handling user requests, a background worker processing a queue, or a scheduler process that triggers agent runs on a cron schedule. Anywhere you want to answer "is this healthy right now?" without reading logs.
It does not fit single-shot scripts. If your agent runs once and exits, there is no persistent process to scrape. Just print the tracer summary at the end.
If you are already using OpenTelemetry, agenttrap and otel-genai-bridge-rs are better choices. They emit spans and metrics in OTLP format directly. This post's approach is for when you want minimal dependencies and a fast setup.
Quick-Start Snippet
```bash
pip install agenttrace cachebench llm-circuit-breaker-py tool-loop-guard tool-error-classify agent-event-bus
```
Minimal metrics-only run:
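```python
import anthropic
from agent_event_bus import EventBus
from agenttrace import Tracer

bus = EventBus()
tracer = Tracer()
client = anthropic.Anthropic()

@bus.on("run.complete")
def log_cost(event):
    print(f"Run cost: ${event['cost_usd']:.4f}")

run_id = tracer.start_run()
# Your LLM call goes here; this one mirrors the main example
response = client.messages.create(
    model="claude-sonnet-4-6",
    max_tokens=256,
    messages=[{"role": "user", "content": "What is 2 + 2?"}],
)
cost = tracer.end_run(run_id, usage=response.usage)
bus.emit("run.complete", {"cost_usd": cost})
```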
Add the HTTP server when you are ready to hook up Prometheus.
Siblings Table
| Library | Metric it covers | GitHub |
|---|---|---|
| agenttrace | Cost per run, latency | MukundaKatta/agenttrace |
| cachebench | Cache hit ratio | MukundaKatta/cachebench |
| llm-circuit-breaker-py | Provider failure rate | MukundaKatta/llm-circuit-breaker-py |
| tool-loop-guard | Repeated tool call rate | MukundaKatta/tool-loop-guard |
| tool-error-classify | Error kind distribution | MukundaKatta/tool-error-classify |
| agent-event-bus | Pub/sub bus for all events | MukundaKatta/agent-event-bus |
| agentfit | Token usage per run | MukundaKatta/agentfit |
What's Next
The natural next step is alerting. Right now you can see the metrics. You cannot get notified when the circuit breaker opens for the tenth time in an hour. Alertmanager handles that if you are using Prometheus. For a simpler setup, add a threshold check inside the on_circuit_open subscriber and send a webhook.
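A threshold check of that kind needs a sliding window, not just the running total. Below is a minimal sketch: ThresholdAlert and post_webhook are hypothetical helpers, the clock is injectable so the logic can be tested, and the webhook URL in the wiring comment is a placeholder. Call record() outside metrics_lock, because sending the webhook blocks.

```python
import json
import time
import urllib.request
from collections import deque
from typing import Callable


class ThresholdAlert:
    """Fire once when `limit` events land inside a sliding `window_s` seconds."""

    def __init__(self, limit: int, window_s: float, send: Callable[[dict], None],
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.limit = limit
        self.window_s = window_s
        self.send = send
        self.clock = clock
        self.times: deque = deque()

    def record(self) -> bool:
        now = self.clock()
        self.times.append(now)
        while self.times and now - self.times[0] > self.window_s:
            self.times.popleft()  # drop events older than the window
        if len(self.times) >= self.limit:
            self.send({"text": f"circuit opened {len(self.times)} times in {self.window_s:.0f}s"})
            self.times.clear()  # reset so one burst produces one alert
            return True
        return False


def post_webhook(url: str) -> Callable[[dict], None]:
    """Return a sender that POSTs the payload as JSON to a webhook URL."""
    def send(payload: dict) -> None:
        req = urllib.request.Request(
            url,
            data=json.dumps(payload).encode("utf-8"),
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        urllib.request.urlopen(req, timeout=5).close()
    return send


# Wiring (placeholder URL): ten opens within an hour triggers the webhook.
# alert = ThresholdAlert(limit=10, window_s=3600, send=post_webhook("https://hooks.example.com/..."))
# In on_circuit_open, after releasing metrics_lock: alert.record()
```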
Cardinality is the other concern. The errors_by_kind label set is bounded because ErrorKind is a closed enum. Be careful if you add free-form labels like user IDs or prompt hashes. Every distinct label value becomes a new time series, so unbounded values exhaust Prometheus memory fast.
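One way to keep a new label as bounded as ErrorKind is to map every value through a fixed allow-list and send everything else to a single "other" bucket. The sketch below assumes a hypothetical tier label with three allowed values; the escaping function follows the Prometheus text format rule that backslash, double quote, and newline must be escaped inside label values.

```python
from collections import Counter

ALLOWED_TIERS = frozenset({"free", "pro", "enterprise"})  # hypothetical bounded label


def bounded(value: str, allowed: frozenset, fallback: str = "other") -> str:
    """Map any value outside a fixed allow-list to a single fallback bucket."""
    return value if value in allowed else fallback


def escape_label_value(value: str) -> str:
    """Escape backslash first, then double quote and newline."""
    return value.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")


runs_by_tier: Counter = Counter()
# "user-8812" and the typo would each create a new series without the guard
for tier in ["pro", "free", "pro", "user-8812", "tier-from-a-typo"]:
    runs_by_tier[bounded(tier, ALLOWED_TIERS)] += 1

for tier, count in sorted(runs_by_tier.items()):
    print(f'agent_runs_by_tier_total{{tier="{escape_label_value(tier)}"}} {count}')
```

However many distinct raw values arrive, this label can produce at most four series.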
The complete collection of libraries from this sprint is at MukundaKatta on GitHub. Each one has a README with install instructions and a minimal example.