7 AI Agent Observability Patterns Every Developer Needs in Production (With Code)
Your AI agent worked perfectly in development.
Then it hit production and burned through $400 in tokens overnight because a retry loop went infinite. Nobody noticed until the billing alert fired at 3 AM.
Sound familiar?
AI agent observability is one of the fastest-growing gaps in modern dev tooling. We've gotten great at building agents — LangGraph, CrewAI, AutoGen — but terrible at watching them run. Traditional APM tools like Datadog and New Relic weren't designed for non-deterministic, multi-step AI workflows where the same input can produce wildly different execution paths.
In this guide, I'll walk you through 7 production-tested observability patterns with real Python code you can drop into any agent framework. No vendor lock-in. No expensive platforms. Just OpenTelemetry, structured logging, and smart instrumentation.
Why Traditional Monitoring Fails for AI Agents
Before we dive in, let's understand the problem:
| Traditional App | AI Agent |
|---|---|
| Deterministic paths | Non-deterministic execution |
| Fixed latency range | Wildly variable latency (1s to 5min) |
| Binary success/fail | Quality spectrum (good/okay/terrible) |
| Predictable costs | Per-token variable costs |
| Single request/response | Multi-step chains with branching |
| Errors are exceptions | "Errors" can be hallucinations that look like success |
Traditional monitoring answers: "Is it up? Is it fast?"
Agent observability answers: "Is it doing the right thing? Is it spending wisely? Can I replay what happened?"
Pattern 1: Structured Trace Context for Multi-Step Agents
The foundation of agent observability is trace context — a way to follow a single user request through every LLM call, tool use, and decision point.
Here's a framework-agnostic trace context manager:
```python
import time
import uuid
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SpanRecord:
    """A single unit of work within an agent trace."""
    span_id: str
    parent_span_id: Optional[str]
    trace_id: str
    name: str
    kind: str  # "llm_call", "tool_use", "decision", "retrieval"
    start_time: float
    end_time: Optional[float] = None
    status: str = "running"
    attributes: dict = field(default_factory=dict)
    events: list = field(default_factory=list)
    token_usage: dict = field(default_factory=dict)

    @property
    def duration_ms(self) -> Optional[float]:
        if self.end_time:
            return (self.end_time - self.start_time) * 1000
        return None


class AgentTracer:
    """Lightweight agent tracing — no external dependencies required."""

    def __init__(self, service_name: str = "ai-agent"):
        self.service_name = service_name
        self.traces: dict[str, list[SpanRecord]] = {}
        self._current_trace_id: Optional[str] = None
        self._current_span_id: Optional[str] = None
        self._exporters: list = []

    def add_exporter(self, exporter):
        """Add a span exporter (OTLP, file, console, etc.)."""
        self._exporters.append(exporter)

    @contextmanager
    def start_trace(self, name: str, metadata: Optional[dict] = None):
        """Start a new trace (top-level agent invocation)."""
        trace_id = uuid.uuid4().hex[:16]
        self._current_trace_id = trace_id
        self.traces[trace_id] = []
        root_span = SpanRecord(
            span_id=uuid.uuid4().hex[:8],
            parent_span_id=None,
            trace_id=trace_id,
            name=name,
            kind="agent_run",
            start_time=time.time(),
            attributes=metadata or {},
        )
        self.traces[trace_id].append(root_span)
        self._current_span_id = root_span.span_id
        try:
            yield root_span
            root_span.status = "success"
        except Exception as e:
            root_span.status = "error"
            root_span.events.append({
                "name": "exception",
                "timestamp": time.time(),
                "attributes": {
                    "exception.type": type(e).__name__,
                    "exception.message": str(e),
                },
            })
            raise
        finally:
            root_span.end_time = time.time()
            self._export_trace(trace_id)

    @contextmanager
    def start_span(self, name: str, kind: str, attributes: Optional[dict] = None):
        """Start a child span within the current trace."""
        span = SpanRecord(
            span_id=uuid.uuid4().hex[:8],
            parent_span_id=self._current_span_id,
            trace_id=self._current_trace_id,
            name=name,
            kind=kind,
            start_time=time.time(),
            attributes=attributes or {},
        )
        self.traces[self._current_trace_id].append(span)
        prev_span = self._current_span_id
        self._current_span_id = span.span_id
        try:
            yield span
            span.status = "success"
        except Exception as e:
            span.status = "error"
            span.events.append({
                "name": "exception",
                "timestamp": time.time(),
                "attributes": {
                    "exception.type": type(e).__name__,
                    "exception.message": str(e),
                },
            })
            raise
        finally:
            span.end_time = time.time()
            self._current_span_id = prev_span

    def _export_trace(self, trace_id: str):
        for exporter in self._exporters:
            exporter.export(self.traces[trace_id])


# Usage with any LLM/agent framework
tracer = AgentTracer(service_name="customer-support-agent")

async def handle_customer_query(query: str):
    with tracer.start_trace("customer_query", {"query": query}) as trace:
        with tracer.start_span("classify_intent", "llm_call") as span:
            intent = await classify_intent(query)
            span.attributes["intent"] = intent
            span.token_usage = {"input": 150, "output": 20}

        with tracer.start_span("retrieve_context", "retrieval") as span:
            docs = await vector_search(query, top_k=5)
            span.attributes["docs_found"] = len(docs)
            span.attributes["relevance_scores"] = [d.score for d in docs]

        with tracer.start_span("generate_response", "llm_call") as span:
            response = await generate_response(query, docs, intent)
            span.token_usage = {"input": 2000, "output": 500}
            span.attributes["response_length"] = len(response)

        return response
```
Why this matters: When your agent does something unexpected, you need to replay the exact sequence of LLM calls, tool uses, and decisions. Without structured traces, you're debugging with print() statements.
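The tracer above accepts pluggable exporters via `add_exporter` but doesn't show one. Here is a minimal sketch of a JSONL file exporter (the class name and default path are illustrative); anything with an `export(spans)` method works:

```python
import json
from dataclasses import asdict, is_dataclass
from pathlib import Path


class JsonlFileExporter:
    """Append each finished trace as one JSON line: greppable, cheap,
    and easy to ship to S3 or a log pipeline later."""

    def __init__(self, path: str = "./traces.jsonl"):
        self.path = Path(path)

    def export(self, spans: list) -> None:
        # Accept either SpanRecord dataclasses or plain dicts.
        payload = [asdict(s) if is_dataclass(s) else s for s in spans]
        with self.path.open("a") as f:
            f.write(json.dumps(payload) + "\n")
```

Wire it up with `tracer.add_exporter(JsonlFileExporter())` and every finished trace lands as a single line you can grep by `trace_id`.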
Pattern 2: Token Budget Guardian
Runaway token costs are among the most common production incidents for AI agents. This pattern implements a per-request token budget with circuit breaker behavior:
```python
import logging
import threading
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional

logger = logging.getLogger("token-budget")


class TokenBudgetExceeded(RuntimeError):
    """Raised when a request exhausts its token budget."""


class BudgetAction(Enum):
    ALLOW = "allow"
    WARN = "warn"
    BLOCK = "block"


@dataclass
class TokenBudget:
    max_input_tokens: int = 50_000
    max_output_tokens: int = 10_000
    max_total_tokens: int = 60_000
    max_llm_calls: int = 20
    warn_threshold: float = 0.8  # warn at 80%


class TokenBudgetGuardian:
    """Prevents runaway token usage in agent loops."""

    def __init__(self, budget: Optional[TokenBudget] = None):
        self.budget = budget or TokenBudget()
        self._lock = threading.Lock()
        self._usage = {
            "input_tokens": 0,
            "output_tokens": 0,
            "total_tokens": 0,
            "llm_calls": 0,
        }
        self._alerts: list[dict] = []

    def check_and_record(
        self, input_tokens: int, output_tokens: int
    ) -> BudgetAction:
        """Record token usage and return the budget action."""
        with self._lock:
            self._usage["input_tokens"] += input_tokens
            self._usage["output_tokens"] += output_tokens
            self._usage["total_tokens"] += input_tokens + output_tokens
            self._usage["llm_calls"] += 1

            # Check hard limits
            if self._usage["total_tokens"] >= self.budget.max_total_tokens:
                self._alert("BLOCK", "Total token budget exhausted",
                            self._usage.copy())
                return BudgetAction.BLOCK
            if self._usage["llm_calls"] >= self.budget.max_llm_calls:
                self._alert("BLOCK", "Max LLM calls reached",
                            self._usage.copy())
                return BudgetAction.BLOCK

            # Check warn thresholds
            usage_ratio = (
                self._usage["total_tokens"] / self.budget.max_total_tokens
            )
            if usage_ratio >= self.budget.warn_threshold:
                self._alert("WARN",
                            f"Token usage at {usage_ratio:.0%}",
                            self._usage.copy())
                return BudgetAction.WARN

            return BudgetAction.ALLOW

    def _alert(self, level: str, message: str, usage: dict):
        self._alerts.append({
            "level": level,
            "message": message,
            "usage": usage,
            "timestamp": time.time(),
        })

    @property
    def usage_report(self) -> dict:
        with self._lock:
            return {
                **self._usage,
                "budget_remaining": {
                    "tokens": self.budget.max_total_tokens
                    - self._usage["total_tokens"],
                    "calls": self.budget.max_llm_calls
                    - self._usage["llm_calls"],
                },
                "utilization": (
                    self._usage["total_tokens"]
                    / self.budget.max_total_tokens
                ),
            }


# Integrate with any LLM wrapper
guardian = TokenBudgetGuardian(TokenBudget(
    max_total_tokens=100_000,
    max_llm_calls=30,
))

async def guarded_llm_call(prompt: str, **kwargs) -> str:
    """LLM call with budget enforcement."""
    response = await llm.complete(prompt, **kwargs)
    # If your SDK doesn't report usage, estimate it (rough: 1 token ≈ 4 chars).
    action = guardian.check_and_record(
        input_tokens=response.usage.input_tokens,
        output_tokens=response.usage.output_tokens,
    )
    if action == BudgetAction.BLOCK:
        raise TokenBudgetExceeded(
            f"Budget exhausted: {guardian.usage_report}"
        )
    if action == BudgetAction.WARN:
        logger.warning(
            "Token budget warning",
            extra=guardian.usage_report,
        )
    return response.text
```
Pro tip: Set different budgets for different agent types. A simple Q&A agent needs 10K tokens max; a research agent might legitimately need 200K.
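One way to encode that tip is a small profile table keyed by agent type, with a deliberately stingy fallback for unknown types. The profile names and numbers below are illustrative, not prescriptive:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BudgetProfile:
    max_total_tokens: int
    max_llm_calls: int


# Tight defaults for cheap flows, generous caps for long-running ones.
BUDGET_PROFILES = {
    "qa": BudgetProfile(max_total_tokens=10_000, max_llm_calls=5),
    "support": BudgetProfile(max_total_tokens=60_000, max_llm_calls=20),
    "research": BudgetProfile(max_total_tokens=200_000, max_llm_calls=60),
}


def budget_for(agent_type: str) -> BudgetProfile:
    # Fall back to the tightest profile: an unrecognized agent
    # should never silently get a generous budget.
    return BUDGET_PROFILES.get(agent_type, BUDGET_PROFILES["qa"])
```

In the Pattern 2 setup you'd then build the guardian from the chosen profile, e.g. `TokenBudgetGuardian(TokenBudget(max_total_tokens=profile.max_total_tokens, max_llm_calls=profile.max_llm_calls))`.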
Pattern 3: Decision Audit Log
AI agents make decisions that affect users. You need an immutable log of what the agent decided and why:
```python
import hashlib
import uuid
from dataclasses import asdict, dataclass
from datetime import datetime, timezone


@dataclass
class DecisionRecord:
    """Immutable record of an agent decision."""
    decision_id: str
    trace_id: str
    timestamp: str
    agent_name: str
    decision_type: str  # "route", "tool_select", "response_filter"
    input_summary: str  # What triggered the decision
    options_considered: list[dict]  # What alternatives existed
    chosen_option: str  # What was selected
    confidence: float  # 0.0 - 1.0
    reasoning: str  # Why (from chain-of-thought or logprobs)
    checksum: str = ""  # Integrity hash

    def __post_init__(self):
        if not self.checksum:
            content = (
                f"{self.decision_id}{self.trace_id}{self.timestamp}"
                f"{self.chosen_option}{self.reasoning}"
            )
            self.checksum = hashlib.sha256(
                content.encode()
            ).hexdigest()[:16]


class DecisionAuditLog:
    """Append-only audit log for agent decisions."""

    def __init__(self, storage_backend=None):
        self.records: list[DecisionRecord] = []
        self.storage = storage_backend  # S3, DB, file, etc.

    def log_decision(
        self,
        trace_id: str,
        agent_name: str,
        decision_type: str,
        input_summary: str,
        options: list[dict],
        chosen: str,
        confidence: float,
        reasoning: str,
    ) -> DecisionRecord:
        record = DecisionRecord(
            decision_id=uuid.uuid4().hex[:12],
            trace_id=trace_id,
            timestamp=datetime.now(timezone.utc).isoformat(),
            agent_name=agent_name,
            decision_type=decision_type,
            input_summary=input_summary,
            options_considered=options,
            chosen_option=chosen,
            confidence=confidence,
            reasoning=reasoning,
        )
        self.records.append(record)
        if self.storage:
            self.storage.append(asdict(record))
        return record

    def get_decisions_for_trace(self, trace_id: str) -> list[DecisionRecord]:
        return [r for r in self.records if r.trace_id == trace_id]

    def get_low_confidence_decisions(
        self, threshold: float = 0.6
    ) -> list[DecisionRecord]:
        """Find decisions where the agent wasn't sure — review these."""
        return [r for r in self.records if r.confidence < threshold]


# Usage in an agent routing step
audit = DecisionAuditLog()

async def route_customer_request(query: str, trace_id: str):
    """Route to the right specialist agent with full audit trail."""
    classification = await llm.classify(
        query,
        categories=["billing", "technical", "sales", "general"],
        return_confidence=True,
    )

    audit.log_decision(
        trace_id=trace_id,
        agent_name="router",
        decision_type="route",
        input_summary=query[:200],
        options=[
            {"name": c, "score": s}
            for c, s in classification.all_scores.items()
        ],
        chosen=classification.category,
        confidence=classification.confidence,
        reasoning=f"Top category '{classification.category}' with "
                  f"{classification.confidence:.2%} confidence. "
                  f"Runner-up: '{classification.runner_up}' at "
                  f"{classification.runner_up_score:.2%}",
    )
    return classification.category
```
Why this matters: When a customer complains "your bot told me I could get a refund" — you need to prove what happened. The audit log is your source of truth.
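For that source of truth to hold up, you also want to verify the integrity checksum when you read records back. A sketch that recomputes the hash the same way `DecisionRecord.__post_init__` does, operating on a record serialized to a dict:

```python
import hashlib


def verify_decision_checksum(record: dict) -> bool:
    """Recompute the integrity hash over the same fields
    DecisionRecord.__post_init__ uses and compare it to the stored
    value. A mismatch means the record was altered after writing."""
    content = (
        f"{record['decision_id']}{record['trace_id']}{record['timestamp']}"
        f"{record['chosen_option']}{record['reasoning']}"
    )
    expected = hashlib.sha256(content.encode()).hexdigest()[:16]
    return record["checksum"] == expected
```

Run this over every record you pull from storage before presenting it as evidence; a failed check is itself an incident worth alerting on.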
Pattern 4: Quality Scoring Pipeline
Not every agent response is equal. This pattern scores agent outputs in real-time so you can catch quality drops before users report them:
```python
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass

# Assumed helpers from your own utils:
#   cosine_similarity(a, b) -> float
#   extract_subquestions(query) -> list[str]


@dataclass
class QualityScore:
    dimension: str  # "relevance", "completeness", "safety", "coherence"
    score: float    # 0.0 - 1.0
    method: str     # "heuristic", "llm_judge", "embedding_similarity"
    details: str = ""


class QualityScorer(ABC):
    @abstractmethod
    async def score(
        self, query: str, response: str, context: dict
    ) -> QualityScore:
        ...


class RelevanceScorer(QualityScorer):
    """Score relevance using embedding cosine similarity."""

    def __init__(self, embedding_model):
        self.model = embedding_model

    async def score(
        self, query: str, response: str, context: dict
    ) -> QualityScore:
        q_emb = await self.model.embed(query)
        r_emb = await self.model.embed(response)
        similarity = cosine_similarity(q_emb, r_emb)
        return QualityScore(
            dimension="relevance",
            score=similarity,
            method="embedding_similarity",
            details=f"Query-response cosine similarity: {similarity:.3f}",
        )


class CompletenessScorer(QualityScorer):
    """Check if all parts of the query were addressed."""

    def __init__(self, embedding_model):
        self.model = embedding_model

    async def score(
        self, query: str, response: str, context: dict
    ) -> QualityScore:
        # Extract question components
        questions = extract_subquestions(query)
        addressed = 0
        for q in questions:
            q_emb = await self.model.embed(q)
            # Check if any paragraph addresses this sub-question
            paragraphs = [p for p in response.split("\n\n") if p.strip()]
            sims = [
                cosine_similarity(q_emb, await self.model.embed(p))
                for p in paragraphs
            ]
            if sims and max(sims) > 0.7:
                addressed += 1
        ratio = addressed / len(questions) if questions else 1.0
        return QualityScore(
            dimension="completeness",
            score=ratio,
            method="heuristic",
            details=f"{addressed}/{len(questions)} sub-questions addressed",
        )


class SafetyScorer(QualityScorer):
    """Check for unsafe content patterns."""

    UNSAFE_PATTERNS = [
        r"(?i)(password|secret|api.?key)\s*[:=]\s*\S+",
        r"(?i)(drop table|delete from|truncate)",
        r"(?i)(sudo rm|format c:)",
    ]

    async def score(
        self, query: str, response: str, context: dict
    ) -> QualityScore:
        violations = []
        for pattern in self.UNSAFE_PATTERNS:
            violations.extend(re.findall(pattern, response))
        score = 1.0 if not violations else max(0.0, 1.0 - len(violations) * 0.3)
        return QualityScore(
            dimension="safety",
            score=score,
            method="heuristic",
            details=f"Found {len(violations)} potential safety issues",
        )


class QualityPipeline:
    """Run multiple quality scorers and aggregate results."""

    def __init__(self, scorers: list[QualityScorer], min_threshold: float = 0.5):
        self.scorers = scorers
        self.min_threshold = min_threshold

    async def evaluate(
        self, query: str, response: str, context: dict = None
    ) -> dict:
        context = context or {}
        scores = []
        for scorer in self.scorers:
            try:
                score = await scorer.score(query, response, context)
                scores.append(score)
            except Exception as e:
                # A broken scorer shouldn't crash the pipeline — record a zero.
                scores.append(QualityScore(
                    dimension=scorer.__class__.__name__,
                    score=0.0,
                    method="error",
                    details=str(e),
                ))
        avg_score = sum(s.score for s in scores) / len(scores)
        failed_dimensions = [
            s for s in scores if s.score < self.min_threshold
        ]
        return {
            "overall_score": avg_score,
            "scores": [
                {"dimension": s.dimension, "score": s.score, "details": s.details}
                for s in scores
            ],
            "passed": len(failed_dimensions) == 0,
            "failed_dimensions": [s.dimension for s in failed_dimensions],
            "action": "pass" if avg_score > 0.7
            else "review" if avg_score > 0.4
            else "block",
        }


# Production setup
pipeline = QualityPipeline(
    scorers=[
        RelevanceScorer(embedding_model),
        CompletenessScorer(embedding_model),
        SafetyScorer(),
    ],
    min_threshold=0.5,
)
```
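What you do with the verdict matters as much as computing it. A minimal sketch of acting on the `action` field the pipeline returns (`pass`/`review`/`block`); the fallback message and the review flag are illustrative choices, not the only reasonable policy:

```python
FALLBACK_MESSAGE = (
    "I'm not confident in my answer here. Let me connect you "
    "with a human agent."
)


def route_by_quality(verdict: dict, response: str) -> tuple[str, bool]:
    """Map the pipeline's verdict to what the user actually sees.

    Returns (text_to_send, needs_human_review):
      - "block": swap in a safe fallback and flag for review
      - "review": serve the response but flag it
      - "pass": serve as-is
    """
    if verdict["action"] == "block":
        return FALLBACK_MESSAGE, True
    if verdict["action"] == "review":
        return response, True
    return response, False
```

The key property is that a low-scoring response never reaches the user unexamined; everything short of "pass" leaves a trail for a human to follow.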
Pattern 5: Agent Session Replay
When something goes wrong, you want to replay exactly what happened — like a DVR for your agent. This pattern captures everything needed for deterministic replay:
```python
import gzip
import json
import time
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any


@dataclass
class ReplayEvent:
    timestamp: float
    event_type: str  # "input", "llm_request", "llm_response",
                     # "tool_call", "tool_result", "output"
    data: dict


class SessionRecorder:
    """Record agent sessions for replay and debugging."""

    def __init__(self, session_id: str, storage_dir: str = "./replays"):
        self.session_id = session_id
        self.events: list[ReplayEvent] = []
        self.storage_dir = Path(storage_dir)
        self.storage_dir.mkdir(parents=True, exist_ok=True)

    def record(self, event_type: str, data: dict):
        self.events.append(ReplayEvent(
            timestamp=time.time(),
            event_type=event_type,
            data=data,
        ))

    def record_llm_request(self, model: str, messages: list, params: dict):
        self.record("llm_request", {
            "model": model,
            "messages": messages,
            "params": {k: v for k, v in params.items()
                       if k != "api_key"},  # never log secrets
        })

    def record_llm_response(self, response_text: str, usage: dict,
                            finish_reason: str):
        self.record("llm_response", {
            "text": response_text,
            "usage": usage,
            "finish_reason": finish_reason,
        })

    def record_tool_call(self, tool_name: str, arguments: dict):
        self.record("tool_call", {
            "tool": tool_name,
            "arguments": self._sanitize(arguments),
        })

    def record_tool_result(self, tool_name: str, result: Any,
                           duration_ms: float):
        self.record("tool_result", {
            "tool": tool_name,
            "result": str(result)[:5000],  # truncate large results
            "duration_ms": duration_ms,
        })

    def save(self) -> str:
        """Save session to compressed JSON."""
        filepath = (
            self.storage_dir / f"session-{self.session_id}.json.gz"
        )
        data = {
            "session_id": self.session_id,
            "event_count": len(self.events),
            "duration_ms": (
                (self.events[-1].timestamp - self.events[0].timestamp) * 1000
                if len(self.events) > 1 else 0
            ),
            "events": [asdict(e) for e in self.events],
        }
        with gzip.open(filepath, "wt") as f:
            json.dump(data, f, indent=2)
        return str(filepath)

    def _sanitize(self, data: dict) -> dict:
        """Remove sensitive fields from recorded data."""
        sensitive_keys = {"password", "token", "secret", "api_key", "key"}
        return {
            k: "***REDACTED***" if k.lower() in sensitive_keys else v
            for k, v in data.items()
        }


class SessionReplayer:
    """Replay a recorded session for debugging."""

    def __init__(self, filepath: str):
        with gzip.open(filepath, "rt") as f:
            self.data = json.load(f)
        self.events = self.data["events"]

    def get_timeline(self) -> list[dict]:
        """Get a human-readable timeline of events."""
        timeline = []
        start = self.events[0]["timestamp"]
        for event in self.events:
            elapsed = (event["timestamp"] - start) * 1000
            timeline.append({
                "time_ms": f"+{elapsed:.0f}ms",
                "type": event["event_type"],
                "summary": self._summarize(event),
            })
        return timeline

    def get_llm_calls(self) -> list[dict]:
        """Extract all LLM request/response pairs."""
        pairs = []
        pending_request = None
        for event in self.events:
            if event["event_type"] == "llm_request":
                pending_request = event
            elif event["event_type"] == "llm_response" and pending_request:
                pairs.append({
                    "request": pending_request["data"],
                    "response": event["data"],
                    "latency_ms": (
                        event["timestamp"] - pending_request["timestamp"]
                    ) * 1000,
                })
                pending_request = None
        return pairs

    def get_total_tokens(self) -> dict:
        """Sum all token usage across LLM calls."""
        total = {"input": 0, "output": 0, "total": 0}
        for event in self.events:
            if event["event_type"] == "llm_response":
                usage = event["data"].get("usage", {})
                total["input"] += usage.get("input_tokens", 0)
                total["output"] += usage.get("output_tokens", 0)
        total["total"] = total["input"] + total["output"]
        return total

    def _summarize(self, event: dict) -> str:
        data = event["data"]
        match event["event_type"]:
            case "llm_request":
                return f"→ {data['model']} ({len(data['messages'])} messages)"
            case "llm_response":
                return (
                    f"← {data['usage'].get('output_tokens', '?')} tokens, "
                    f"{data['finish_reason']}"
                )
            case "tool_call":
                return f"🔧 {data['tool']}({list(data['arguments'].keys())})"
            case "tool_result":
                return f"✅ {data['tool']} in {data['duration_ms']:.0f}ms"
            case _:
                return str(data)[:100]
```
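Replays also make good regression tests: record a known query, change a prompt or model, record it again, and diff. A minimal sketch over the saved-session dicts that `SessionRecorder.save()` writes (the fields compared here are a starting point, not exhaustive):

```python
def diff_sessions(old: dict, new: dict) -> list[str]:
    """Compare two saved sessions and report coarse behavioral
    differences: useful when a prompt or model change alters how
    a known query executes."""
    diffs = []
    for key in ("event_count", "duration_ms"):
        if old.get(key) != new.get(key):
            diffs.append(f"{key}: {old.get(key)} -> {new.get(key)}")
    # A changed tool-call sequence is the loudest signal of drift.
    old_tools = [e["data"]["tool"] for e in old["events"]
                 if e["event_type"] == "tool_call"]
    new_tools = [e["data"]["tool"] for e in new["events"]
                 if e["event_type"] == "tool_call"]
    if old_tools != new_tools:
        diffs.append(f"tool sequence changed: {old_tools} -> {new_tools}")
    return diffs
```

An empty list means the two runs were structurally identical; anything else tells you exactly which dimension of behavior moved.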
Pattern 6: Anomaly Detection for Agent Behavior
Agents can drift. This pattern detects when an agent starts behaving differently from its baseline:
```python
import logging
import statistics
import time
from collections import deque
from dataclasses import dataclass
from typing import Optional

logger = logging.getLogger("agent-anomaly")


@dataclass
class AnomalyAlert:
    metric: str
    current_value: float
    baseline_mean: float
    baseline_stdev: float
    z_score: float
    severity: str  # "warning", "critical"
    message: str


class AgentAnomalyDetector:
    """Detect unusual agent behavior using statistical baselines."""

    def __init__(self, window_size: int = 100, z_threshold: float = 2.5):
        self.window_size = window_size
        self.z_threshold = z_threshold
        self.baselines: dict[str, deque] = {}
        self.alerts: list[AnomalyAlert] = []

    def record_metric(self, name: str, value: float) -> Optional[AnomalyAlert]:
        """Record a metric and check for anomalies."""
        if name not in self.baselines:
            self.baselines[name] = deque(maxlen=self.window_size)
        window = self.baselines[name]

        # Need at least 20 samples for a baseline
        if len(window) >= 20:
            mean = statistics.mean(window)
            stdev = statistics.stdev(window)
            if stdev > 0:
                z_score = (value - mean) / stdev
                if abs(z_score) >= self.z_threshold:
                    severity = "critical" if abs(z_score) > 4 else "warning"
                    direction = "above" if z_score > 0 else "below"
                    alert = AnomalyAlert(
                        metric=name,
                        current_value=value,
                        baseline_mean=mean,
                        baseline_stdev=stdev,
                        z_score=z_score,
                        severity=severity,
                        message=(
                            f"{name} is {abs(z_score):.1f}σ {direction} "
                            f"baseline ({value:.1f} vs μ={mean:.1f}±{stdev:.1f})"
                        ),
                    )
                    self.alerts.append(alert)
                    window.append(value)
                    return alert
        window.append(value)
        return None


# Track these metrics per agent run
detector = AgentAnomalyDetector()

async def monitored_agent_run(query: str):
    start = time.time()
    result = await agent.run(query)
    elapsed = time.time() - start

    # Check for anomalies across multiple dimensions
    alerts = []
    for metric, value in [
        ("latency_ms", elapsed * 1000),
        ("total_tokens", result.token_usage.total),
        ("llm_calls", result.step_count),
        ("tool_calls", result.tool_call_count),
        ("response_length", len(result.output)),
    ]:
        alert = detector.record_metric(metric, value)
        if alert:
            alerts.append(alert)
            logger.warning(f"ANOMALY: {alert.message}")

    if any(a.severity == "critical" for a in alerts):
        await send_pagerduty_alert(alerts)
    return result
```
Pattern 7: Cost Attribution Dashboard
Know exactly where your money goes — per agent, per feature, per customer:
```python
from collections import defaultdict

# Pricing per 1M tokens (update with your model's pricing)
MODEL_PRICING = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "claude-sonnet": {"input": 3.00, "output": 15.00},
    "claude-haiku": {"input": 0.25, "output": 1.25},
}


class CostTracker:
    """Track and attribute LLM costs per dimension."""

    def __init__(self):
        self._costs = defaultdict(lambda: defaultdict(float))
        self._token_counts = defaultdict(lambda: defaultdict(int))

    def record(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
        dimensions: dict[str, str],
    ):
        """
        Record a cost event with attribution dimensions.

        dimensions example:
            {"agent": "router", "customer": "acme", "feature": "support"}
        """
        pricing = MODEL_PRICING.get(model, {"input": 5.0, "output": 15.0})
        cost = (
            (input_tokens / 1_000_000) * pricing["input"]
            + (output_tokens / 1_000_000) * pricing["output"]
        )
        for dim_name, dim_value in dimensions.items():
            self._costs[dim_name][dim_value] += cost
            self._token_counts[dim_name][dim_value] += (
                input_tokens + output_tokens
            )

    def report(self, dimension: str) -> list[dict]:
        """Get cost breakdown by dimension, sorted by cost."""
        items = [
            {
                "name": name,
                "cost_usd": round(cost, 4),
                "tokens": self._token_counts[dimension][name],
            }
            for name, cost in self._costs[dimension].items()
        ]
        return sorted(items, key=lambda x: x["cost_usd"], reverse=True)

    def total_cost(self) -> float:
        """Total cost across all dimensions."""
        # Every dimension sums to the same total, so use any one
        # to avoid double-counting.
        if not self._costs:
            return 0.0
        first_dim = next(iter(self._costs))
        return round(sum(self._costs[first_dim].values()), 4)

    def daily_report_markdown(self) -> str:
        """Generate a Markdown daily cost report."""
        lines = ["# Daily AI Cost Report", ""]
        for dimension in sorted(self._costs.keys()):
            lines.append(f"## By {dimension.title()}")
            lines.append("")
            lines.append("| Name | Cost (USD) | Tokens |")
            lines.append("|------|-----------|--------|")
            for item in self.report(dimension):
                lines.append(
                    f"| {item['name']} | ${item['cost_usd']:.4f} "
                    f"| {item['tokens']:,} |"
                )
            lines.append("")
        lines.append(f"**Total: ${self.total_cost():.4f}**")
        return "\n".join(lines)


# Usage
costs = CostTracker()

# After each LLM call
costs.record(
    model="gpt-4o-mini",
    input_tokens=1500,
    output_tokens=300,
    dimensions={
        "agent": "customer-support",
        "customer": "acme-corp",
        "feature": "ticket-routing",
    },
)

# End of day
print(costs.daily_report_markdown())
```
Putting It All Together: The Observability Stack
Here's how these 7 patterns connect in a production system:
```
┌─────────────────────────────────────────────────┐
│                 Agent Request                   │
├─────────────────────────────────────────────────┤
│  ┌──────────────┐      ┌──────────────┐         │
│  │ Trace Context│      │ Token Budget │         │
│  │ (Pattern 1)  │      │ (Pattern 2)  │         │
│  └──────┬───────┘      └──────┬───────┘         │
│         │                     │                 │
│  ┌──────▼─────────────────────▼──────┐          │
│  │       Agent Execution Loop        │          │
│  │  ┌───────────┐  ┌────────────┐    │          │
│  │  │ Decision  │  │  Session   │    │          │
│  │  │ Audit Log │  │  Recorder  │    │          │
│  │  │(Pattern 3)│  │ (Pattern 5)│    │          │
│  │  └───────────┘  └────────────┘    │          │
│  └──────────────┬────────────────────┘          │
│                 │                               │
│  ┌──────────────▼──────────────┐                │
│  │       Quality Scoring       │                │
│  │        (Pattern 4)          │                │
│  └──────────────┬──────────────┘                │
│                 │                               │
│  ┌──────────────▼───────┐  ┌──────────────────┐ │
│  │  Anomaly Detection   │  │ Cost Attribution │ │
│  │     (Pattern 6)      │  │   (Pattern 7)    │ │
│  └──────────────────────┘  └──────────────────┘ │
├─────────────────────────────────────────────────┤
│                 Export Layer                    │
│  OpenTelemetry → Jaeger / Grafana / S3 / Slack  │
└─────────────────────────────────────────────────┘
```
Quick Start: Minimal Production Setup
If you're just getting started, implement these in order of impact:
1. Token Budget Guardian (Pattern 2) — prevents financial disasters
2. Structured Traces (Pattern 1) — makes debugging possible
3. Cost Attribution (Pattern 7) — know where money goes
4. Quality Scoring (Pattern 4) — catch quality drops early
Patterns 3, 5, and 6 become critical as you scale past a single agent.
Key Takeaways
Traditional monitoring is blind to AI agent failures. A 200 OK means nothing when the response is a hallucination.
Token budgets are non-negotiable. One infinite retry loop can cost more than your monthly infrastructure bill.
Decision audit logs are your legal protection. When users dispute what your agent told them, you need receipts.
Quality scoring catches silent failures. Agents don't crash — they degrade. You need continuous quality measurement.
Session replay is your time machine. When something goes wrong at 3 AM, you need to see exactly what happened.
Anomaly detection catches drift. Agent behavior changes with model updates, prompt changes, and data shifts. Statistical baselines catch this automatically.
Cost attribution prevents surprises. Know exactly which agent, customer, and feature is driving your LLM bill.
Building AI agents without observability is like driving at night without headlights. You might reach your destination, but you won't see the cliff until it's too late.
The code in this article is framework-agnostic and production-tested. Fork it, adapt it, and ship it.
Want more patterns like these? Follow me for the next article in the AI Engineering in Practice series.
What's your biggest agent observability challenge? Drop a comment below — I read every single one.