dohko

7 AI Agent Observability Patterns Every Developer Needs in Production (With Code)

Your AI agent worked perfectly in development.

Then it hit production and burned through $400 in tokens overnight because a retry loop went infinite. Nobody noticed until the billing alert fired at 3 AM.

Sound familiar?

AI agent observability is the fastest-growing gap in modern dev tooling. We've gotten great at building agents — LangGraph, CrewAI, AutoGen — but terrible at watching them run. Traditional APM tools like Datadog and New Relic weren't designed for non-deterministic, multi-step AI workflows where the same input can produce wildly different execution paths.

In this guide, I'll walk you through 7 production-tested observability patterns with real Python code you can drop into any agent framework. No vendor lock-in. No expensive platforms. Just OpenTelemetry, structured logging, and smart instrumentation.


Why Traditional Monitoring Fails for AI Agents

Before we dive in, let's understand the problem:

| Traditional App | AI Agent |
|-----------------|----------|
| Deterministic paths | Non-deterministic execution |
| Fixed latency range | Wildly variable latency (1s to 5min) |
| Binary success/fail | Quality spectrum (good/okay/terrible) |
| Predictable costs | Per-token variable costs |
| Single request/response | Multi-step chains with branching |
| Errors are exceptions | "Errors" can be hallucinations that look like success |

Traditional monitoring answers: "Is it up? Is it fast?"

Agent observability answers: "Is it doing the right thing? Is it spending wisely? Can I replay what happened?"


Pattern 1: Structured Trace Context for Multi-Step Agents

The foundation of agent observability is trace context — a way to follow a single user request through every LLM call, tool use, and decision point.

Here's a framework-agnostic trace context manager:

import uuid
import time
import json
from dataclasses import dataclass, field, asdict
from typing import Optional, Any
from contextlib import contextmanager
from datetime import datetime, timezone


@dataclass
class SpanRecord:
    """A single unit of work within an agent trace."""
    span_id: str
    parent_span_id: Optional[str]
    trace_id: str
    name: str
    kind: str  # "llm_call", "tool_use", "decision", "retrieval"
    start_time: float
    end_time: Optional[float] = None
    status: str = "running"
    attributes: dict = field(default_factory=dict)
    events: list = field(default_factory=list)
    token_usage: dict = field(default_factory=dict)

    @property
    def duration_ms(self) -> Optional[float]:
        if self.end_time:
            return (self.end_time - self.start_time) * 1000
        return None


class AgentTracer:
    """Lightweight agent tracing — no external dependencies required."""

    def __init__(self, service_name: str = "ai-agent"):
        self.service_name = service_name
        self.traces: dict[str, list[SpanRecord]] = {}
        self._current_trace_id: Optional[str] = None
        self._current_span_id: Optional[str] = None
        self._exporters: list = []

    def add_exporter(self, exporter):
        """Add a span exporter (OTLP, file, console, etc.)."""
        self._exporters.append(exporter)

    @contextmanager
    def start_trace(self, name: str, metadata: dict = None):
        """Start a new trace (top-level agent invocation)."""
        trace_id = uuid.uuid4().hex[:16]
        self._current_trace_id = trace_id
        self.traces[trace_id] = []

        root_span = SpanRecord(
            span_id=uuid.uuid4().hex[:8],
            parent_span_id=None,
            trace_id=trace_id,
            name=name,
            kind="agent_run",
            start_time=time.time(),
            attributes=metadata or {},
        )
        self.traces[trace_id].append(root_span)
        self._current_span_id = root_span.span_id

        try:
            yield root_span
            root_span.status = "success"
        except Exception as e:
            root_span.status = "error"
            root_span.events.append({
                "name": "exception",
                "timestamp": time.time(),
                "attributes": {
                    "exception.type": type(e).__name__,
                    "exception.message": str(e),
                },
            })
            raise
        finally:
            root_span.end_time = time.time()
            self._export_trace(trace_id)

    @contextmanager
    def start_span(self, name: str, kind: str, attributes: dict = None):
        """Start a child span within the current trace."""
        span = SpanRecord(
            span_id=uuid.uuid4().hex[:8],
            parent_span_id=self._current_span_id,
            trace_id=self._current_trace_id,
            name=name,
            kind=kind,
            start_time=time.time(),
            attributes=attributes or {},
        )
        self.traces[self._current_trace_id].append(span)

        prev_span = self._current_span_id
        self._current_span_id = span.span_id

        try:
            yield span
            span.status = "success"
        except Exception as e:
            span.status = "error"
            span.events.append({
                "name": "exception",
                "timestamp": time.time(),
                "attributes": {
                    "exception.type": type(e).__name__,
                    "exception.message": str(e),
                },
            })
            raise
        finally:
            span.end_time = time.time()
            self._current_span_id = prev_span

    def _export_trace(self, trace_id: str):
        for exporter in self._exporters:
            exporter.export(self.traces[trace_id])


# Usage with any LLM/agent framework
tracer = AgentTracer(service_name="customer-support-agent")

async def handle_customer_query(query: str):
    with tracer.start_trace("customer_query", {"query": query}) as trace:

        with tracer.start_span("classify_intent", "llm_call") as span:
            intent = await classify_intent(query)
            span.attributes["intent"] = intent
            span.token_usage = {"input": 150, "output": 20}

        with tracer.start_span("retrieve_context", "retrieval") as span:
            docs = await vector_search(query, top_k=5)
            span.attributes["docs_found"] = len(docs)
            span.attributes["relevance_scores"] = [d.score for d in docs]

        with tracer.start_span("generate_response", "llm_call") as span:
            response = await generate_response(query, docs, intent)
            span.token_usage = {"input": 2000, "output": 500}
            span.attributes["response_length"] = len(response)

        return response

Why this matters: When your agent does something unexpected, you need to replay the exact sequence of LLM calls, tool uses, and decisions. Without structured traces, you're debugging with print() statements.
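The exporters registered via add_exporter only need an export(spans) method. A minimal console exporter (my own sketch, not part of the tracer above) that emits newline-delimited JSON could look like this:

```python
import json
import types


class ConsoleExporter:
    """Prints each finished span as one JSON line (easy to grep or ship)."""

    def export(self, spans):
        lines = [
            json.dumps({
                "trace_id": s.trace_id,
                "span_id": s.span_id,
                "parent": s.parent_span_id,
                "name": s.name,
                "kind": s.kind,
                "status": s.status,
                "duration_ms": s.duration_ms,
            })
            for s in spans
        ]
        print("\n".join(lines))
        return lines


# Stand-in span for demonstration; any object with these attributes
# works, including the SpanRecord dataclass above
fake_span = types.SimpleNamespace(
    trace_id="t1", span_id="s1", parent_span_id=None,
    name="classify_intent", kind="llm_call", status="success",
    duration_ms=12.3,
)
exported = ConsoleExporter().export([fake_span])
```

Hook it up with tracer.add_exporter(ConsoleExporter()); every start_trace then flushes the whole trace on exit, and most log pipelines ingest JSON lines directly.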


Pattern 2: Token Budget Guardian

Runaway token costs are the #1 production incident for AI agents. This pattern implements a per-request token budget with circuit breaker behavior:

import time
import threading
from dataclasses import dataclass
from enum import Enum


class BudgetAction(Enum):
    ALLOW = "allow"
    WARN = "warn"
    BLOCK = "block"


@dataclass
class TokenBudget:
    max_input_tokens: int = 50_000
    max_output_tokens: int = 10_000
    max_total_tokens: int = 60_000
    max_llm_calls: int = 20
    warn_threshold: float = 0.8  # warn at 80%


class TokenBudgetGuardian:
    """Prevents runaway token usage in agent loops."""

    def __init__(self, budget: TokenBudget = None):
        self.budget = budget or TokenBudget()
        self._lock = threading.Lock()
        self._usage = {
            "input_tokens": 0,
            "output_tokens": 0,
            "total_tokens": 0,
            "llm_calls": 0,
        }
        self._alerts: list[dict] = []

    def check_and_record(
        self, input_tokens: int, output_tokens: int
    ) -> BudgetAction:
        """Record token usage and return the budget action."""
        with self._lock:
            self._usage["input_tokens"] += input_tokens
            self._usage["output_tokens"] += output_tokens
            self._usage["total_tokens"] += input_tokens + output_tokens
            self._usage["llm_calls"] += 1

            # Check hard limits
            if self._usage["total_tokens"] >= self.budget.max_total_tokens:
                self._alert("BLOCK", "Total token budget exhausted",
                           self._usage.copy())
                return BudgetAction.BLOCK

            if self._usage["llm_calls"] >= self.budget.max_llm_calls:
                self._alert("BLOCK", "Max LLM calls reached",
                           self._usage.copy())
                return BudgetAction.BLOCK

            # Check warn thresholds
            usage_ratio = (
                self._usage["total_tokens"] / self.budget.max_total_tokens
            )
            if usage_ratio >= self.budget.warn_threshold:
                self._alert("WARN",
                           f"Token usage at {usage_ratio:.0%}",
                           self._usage.copy())
                return BudgetAction.WARN

            return BudgetAction.ALLOW

    def _alert(self, level: str, message: str, usage: dict):
        self._alerts.append({
            "level": level,
            "message": message,
            "usage": usage,
            "timestamp": time.time(),
        })

    @property
    def usage_report(self) -> dict:
        with self._lock:
            return {
                **self._usage,
                "budget_remaining": {
                    "tokens": self.budget.max_total_tokens
                              - self._usage["total_tokens"],
                    "calls": self.budget.max_llm_calls
                             - self._usage["llm_calls"],
                },
                "utilization": (
                    self._usage["total_tokens"]
                    / self.budget.max_total_tokens
                ),
            }


# Integrate with any LLM wrapper
guardian = TokenBudgetGuardian(TokenBudget(
    max_total_tokens=100_000,
    max_llm_calls=30,
))

class TokenBudgetExceeded(Exception):
    """Raised when a request would blow through its token budget."""


async def guarded_llm_call(prompt: str, **kwargs) -> str:
    """LLM call with budget enforcement."""
    # Rough pre-flight estimate (1 token ≈ 4 chars): reject obviously
    # oversized prompts before spending anything
    est_input = len(prompt) // 4
    if est_input > guardian.budget.max_input_tokens:
        raise TokenBudgetExceeded(
            f"Prompt alone (~{est_input} tokens) exceeds the input budget"
        )

    response = await llm.complete(prompt, **kwargs)

    action = guardian.check_and_record(
        input_tokens=response.usage.input_tokens,
        output_tokens=response.usage.output_tokens,
    )

    if action == BudgetAction.BLOCK:
        raise TokenBudgetExceeded(
            f"Budget exhausted: {guardian.usage_report}"
        )

    if action == BudgetAction.WARN:
        logger.warning(
            "Token budget warning",
            extra=guardian.usage_report,
        )

    return response.text

Pro tip: Set different budgets for different agent types. A simple Q&A agent needs 10K tokens max; a research agent might legitimately need 200K.
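One way to implement that tip is a small registry keyed by agent type. A sketch (names and numbers are illustrative), mirroring the TokenBudget dataclass from Pattern 2:

```python
from dataclasses import dataclass


@dataclass
class TokenBudget:
    """Mirrors the TokenBudget dataclass defined earlier in the article."""
    max_input_tokens: int = 50_000
    max_output_tokens: int = 10_000
    max_total_tokens: int = 60_000
    max_llm_calls: int = 20
    warn_threshold: float = 0.8


# Illustrative per-agent-type budgets: cheap Q&A vs. long-running research
BUDGETS_BY_AGENT_TYPE = {
    "qa": TokenBudget(max_total_tokens=10_000, max_llm_calls=5),
    "support": TokenBudget(max_total_tokens=60_000, max_llm_calls=20),
    "research": TokenBudget(max_total_tokens=200_000, max_llm_calls=60),
}


def budget_for(agent_type: str) -> TokenBudget:
    """Fall back to the conservative default for unknown agent types."""
    return BUDGETS_BY_AGENT_TYPE.get(agent_type, TokenBudget())
```

Each TokenBudgetGuardian then gets constructed with budget_for(agent_type), so a misrouted research job can't drain a Q&A budget and vice versa.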


Pattern 3: Decision Audit Log

AI agents make decisions that affect users. You need an immutable log of what the agent decided and why:

import uuid
import hashlib
from dataclasses import dataclass, asdict
from datetime import datetime, timezone


@dataclass
class DecisionRecord:
    """Immutable record of an agent decision."""
    decision_id: str
    trace_id: str
    timestamp: str
    agent_name: str
    decision_type: str        # "route", "tool_select", "response_filter"
    input_summary: str        # What triggered the decision
    options_considered: list[dict]  # What alternatives existed
    chosen_option: str        # What was selected
    confidence: float         # 0.0 - 1.0
    reasoning: str            # Why (from chain-of-thought or logprobs)
    checksum: str = ""        # Integrity hash

    def __post_init__(self):
        if not self.checksum:
            content = (
                f"{self.decision_id}{self.trace_id}{self.timestamp}"
                f"{self.chosen_option}{self.reasoning}"
            )
            self.checksum = hashlib.sha256(
                content.encode()
            ).hexdigest()[:16]


class DecisionAuditLog:
    """Append-only audit log for agent decisions."""

    def __init__(self, storage_backend=None):
        self.records: list[DecisionRecord] = []
        self.storage = storage_backend  # S3, DB, file, etc.

    def log_decision(
        self,
        trace_id: str,
        agent_name: str,
        decision_type: str,
        input_summary: str,
        options: list[dict],
        chosen: str,
        confidence: float,
        reasoning: str,
    ) -> DecisionRecord:
        record = DecisionRecord(
            decision_id=uuid.uuid4().hex[:12],
            trace_id=trace_id,
            timestamp=datetime.now(timezone.utc).isoformat(),
            agent_name=agent_name,
            decision_type=decision_type,
            input_summary=input_summary,
            options_considered=options,
            chosen_option=chosen,
            confidence=confidence,
            reasoning=reasoning,
        )
        self.records.append(record)

        if self.storage:
            self.storage.append(asdict(record))

        return record

    def get_decisions_for_trace(self, trace_id: str) -> list[DecisionRecord]:
        return [r for r in self.records if r.trace_id == trace_id]

    def get_low_confidence_decisions(
        self, threshold: float = 0.6
    ) -> list[DecisionRecord]:
        """Find decisions where the agent wasn't sure — review these."""
        return [
            r for r in self.records if r.confidence < threshold
        ]


# Usage in an agent routing step
audit = DecisionAuditLog()

async def route_customer_request(query: str, trace_id: str):
    """Route to the right specialist agent with full audit trail."""
    classification = await llm.classify(
        query,
        categories=["billing", "technical", "sales", "general"],
        return_confidence=True,
    )

    audit.log_decision(
        trace_id=trace_id,
        agent_name="router",
        decision_type="route",
        input_summary=query[:200],
        options=[
            {"name": c, "score": s}
            for c, s in classification.all_scores.items()
        ],
        chosen=classification.category,
        confidence=classification.confidence,
        reasoning=f"Top category '{classification.category}' with "
                  f"{classification.confidence:.2%} confidence. "
                  f"Runner-up: '{classification.runner_up}' at "
                  f"{classification.runner_up_score:.2%}",
    )

    return classification.category

Why this matters: When a customer complains "your bot told me I could get a refund" — you need to prove what happened. The audit log is your source of truth.
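Because every DecisionRecord carries an integrity checksum, proving a record wasn't altered after the fact is just recomputing the hash. A standalone sketch that mirrors the checksum formula in __post_init__ above:

```python
import hashlib


def verify_checksum(record: dict) -> bool:
    """Recompute the 16-hex-char SHA-256 hash that DecisionRecord
    stores at creation time, and compare it to the stored value."""
    content = (
        f"{record['decision_id']}{record['trace_id']}{record['timestamp']}"
        f"{record['chosen_option']}{record['reasoning']}"
    )
    expected = hashlib.sha256(content.encode()).hexdigest()[:16]
    return record["checksum"] == expected
```

Run this over exported audit rows (e.g. a nightly job against your S3 bucket) and any record edited after logging fails the check.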


Pattern 4: Quality Scoring Pipeline

Not every agent response is equal. This pattern scores agent outputs in real time so you can catch quality drops before users report them:

from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class QualityScore:
    dimension: str  # "relevance", "completeness", "safety", "coherence"
    score: float    # 0.0 - 1.0
    method: str     # "heuristic", "llm_judge", "embedding_similarity"
    details: str = ""


class QualityScorer(ABC):
    @abstractmethod
    async def score(
        self, query: str, response: str, context: dict
    ) -> QualityScore:
        pass


class RelevanceScorer(QualityScorer):
    """Score relevance using embedding cosine similarity."""

    def __init__(self, embedding_model):
        self.model = embedding_model

    async def score(
        self, query: str, response: str, context: dict
    ) -> QualityScore:
        q_emb = await self.model.embed(query)
        r_emb = await self.model.embed(response)

        similarity = cosine_similarity(q_emb, r_emb)

        return QualityScore(
            dimension="relevance",
            score=similarity,
            method="embedding_similarity",
            details=f"Query-response cosine similarity: {similarity:.3f}",
        )


class CompletenessScorer(QualityScorer):
    """Check if all parts of the query were addressed."""

    def __init__(self, embedding_model):
        self.model = embedding_model

    async def score(
        self, query: str, response: str, context: dict
    ) -> QualityScore:
        # Extract question components; extract_subquestions is an assumed
        # helper (e.g. split on "?" or ask a cheap model to decompose)
        questions = extract_subquestions(query)
        addressed = 0

        for q in questions:
            q_emb = await self.model.embed(q)
            # Check if any paragraph addresses this sub-question
            paragraphs = response.split("\n\n")
            max_sim = max(
                cosine_similarity(q_emb, await self.model.embed(p))
                for p in paragraphs if p.strip()
            )
            if max_sim > 0.7:
                addressed += 1

        ratio = addressed / len(questions) if questions else 1.0

        return QualityScore(
            dimension="completeness",
            score=ratio,
            method="heuristic",
            details=f"{addressed}/{len(questions)} sub-questions addressed",
        )


class SafetyScorer(QualityScorer):
    """Check for unsafe content patterns."""

    UNSAFE_PATTERNS = [
        r"(?i)(password|secret|api.?key)\s*[:=]\s*\S+",
        r"(?i)(drop table|delete from|truncate)",
        r"(?i)(sudo rm|format c:)",
    ]

    async def score(
        self, query: str, response: str, context: dict
    ) -> QualityScore:
        import re
        violations = []
        for pattern in self.UNSAFE_PATTERNS:
            matches = re.findall(pattern, response)
            if matches:
                violations.extend(matches)

        score = 1.0 if not violations else max(0.0, 1.0 - len(violations) * 0.3)

        return QualityScore(
            dimension="safety",
            score=score,
            method="heuristic",
            details=f"Found {len(violations)} potential safety issues",
        )


class QualityPipeline:
    """Run multiple quality scorers and aggregate results."""

    def __init__(self, scorers: list[QualityScorer], min_threshold: float = 0.5):
        self.scorers = scorers
        self.min_threshold = min_threshold

    async def evaluate(
        self, query: str, response: str, context: dict = None
    ) -> dict:
        context = context or {}
        scores = []

        for scorer in self.scorers:
            try:
                score = await scorer.score(query, response, context)
                scores.append(score)
            except Exception as e:
                scores.append(QualityScore(
                    dimension=scorer.__class__.__name__,
                    score=0.0,
                    method="error",
                    details=str(e),
                ))

        avg_score = sum(s.score for s in scores) / len(scores)
        failed_dimensions = [
            s for s in scores if s.score < self.min_threshold
        ]

        return {
            "overall_score": avg_score,
            "scores": [
                {"dimension": s.dimension, "score": s.score, "details": s.details}
                for s in scores
            ],
            "passed": len(failed_dimensions) == 0,
            "failed_dimensions": [s.dimension for s in failed_dimensions],
            "action": "pass" if avg_score > 0.7
                      else "review" if avg_score > 0.4
                      else "block",
        }


# Production setup
pipeline = QualityPipeline(
    scorers=[
        RelevanceScorer(embedding_model),
        CompletenessScorer(embedding_model),
        SafetyScorer(),
    ],
    min_threshold=0.5,
)
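The scorers above lean on a cosine_similarity helper that isn't defined in this article; a pure-stdlib version works fine for small embedding vectors:

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two equal-length vectors, in [-1, 1]."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # degenerate vector: treat as no similarity
    return dot / (norm_a * norm_b)
```

For production volumes you'd likely swap this for a vectorized numpy version, but the semantics are identical.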

Pattern 5: Agent Session Replay

When something goes wrong, you want to replay exactly what happened — like a DVR for your agent. This pattern captures everything needed for deterministic replay:

import json
import gzip
import time
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import Any


@dataclass
class ReplayEvent:
    timestamp: float
    event_type: str  # "input", "llm_request", "llm_response",
                     # "tool_call", "tool_result", "output"
    data: dict


class SessionRecorder:
    """Record agent sessions for replay and debugging."""

    def __init__(self, session_id: str, storage_dir: str = "./replays"):
        self.session_id = session_id
        self.events: list[ReplayEvent] = []
        self.storage_dir = Path(storage_dir)
        self.storage_dir.mkdir(parents=True, exist_ok=True)

    def record(self, event_type: str, data: dict):
        event = ReplayEvent(
            timestamp=time.time(),
            event_type=event_type,
            data=data,
        )
        self.events.append(event)

    def record_llm_request(self, model: str, messages: list, params: dict):
        self.record("llm_request", {
            "model": model,
            "messages": messages,
            "params": {k: v for k, v in params.items()
                      if k != "api_key"},  # never log secrets
        })

    def record_llm_response(self, response_text: str, usage: dict,
                            finish_reason: str):
        self.record("llm_response", {
            "text": response_text,
            "usage": usage,
            "finish_reason": finish_reason,
        })

    def record_tool_call(self, tool_name: str, arguments: dict):
        self.record("tool_call", {
            "tool": tool_name,
            "arguments": self._sanitize(arguments),
        })

    def record_tool_result(self, tool_name: str, result: Any,
                           duration_ms: float):
        self.record("tool_result", {
            "tool": tool_name,
            "result": str(result)[:5000],  # truncate large results
            "duration_ms": duration_ms,
        })

    def save(self) -> str:
        """Save session to compressed JSON."""
        filepath = (
            self.storage_dir / f"session-{self.session_id}.json.gz"
        )
        data = {
            "session_id": self.session_id,
            "event_count": len(self.events),
            "duration_ms": (
                (self.events[-1].timestamp - self.events[0].timestamp) * 1000
                if len(self.events) > 1 else 0
            ),
            "events": [asdict(e) for e in self.events],
        }
        with gzip.open(filepath, "wt") as f:
            json.dump(data, f, indent=2)

        return str(filepath)

    def _sanitize(self, data: dict) -> dict:
        """Remove sensitive fields from recorded data."""
        sensitive_keys = {"password", "token", "secret", "api_key", "key"}
        return {
            k: "***REDACTED***" if k.lower() in sensitive_keys else v
            for k, v in data.items()
        }


class SessionReplayer:
    """Replay a recorded session for debugging."""

    def __init__(self, filepath: str):
        with gzip.open(filepath, "rt") as f:
            self.data = json.load(f)
        self.events = self.data["events"]

    def get_timeline(self) -> list[dict]:
        """Get a human-readable timeline of events."""
        timeline = []
        start = self.events[0]["timestamp"]
        for event in self.events:
            elapsed = (event["timestamp"] - start) * 1000
            timeline.append({
                "time_ms": f"+{elapsed:.0f}ms",
                "type": event["event_type"],
                "summary": self._summarize(event),
            })
        return timeline

    def get_llm_calls(self) -> list[dict]:
        """Extract all LLM request/response pairs."""
        pairs = []
        pending_request = None
        for event in self.events:
            if event["event_type"] == "llm_request":
                pending_request = event
            elif event["event_type"] == "llm_response" and pending_request:
                pairs.append({
                    "request": pending_request["data"],
                    "response": event["data"],
                    "latency_ms": (
                        event["timestamp"] - pending_request["timestamp"]
                    ) * 1000,
                })
                pending_request = None
        return pairs

    def get_total_tokens(self) -> dict:
        """Sum all token usage across LLM calls."""
        total = {"input": 0, "output": 0, "total": 0}
        for event in self.events:
            if event["event_type"] == "llm_response":
                usage = event["data"].get("usage", {})
                total["input"] += usage.get("input_tokens", 0)
                total["output"] += usage.get("output_tokens", 0)
        total["total"] = total["input"] + total["output"]
        return total

    def _summarize(self, event: dict) -> str:
        data = event["data"]
        match event["event_type"]:
            case "llm_request":
                return f"{data['model']} ({len(data['messages'])} messages)"
            case "llm_response":
                return (
                    f"{data['usage'].get('output_tokens', '?')} tokens, "
                    f"{data['finish_reason']}"
                )
            case "tool_call":
                return f"🔧 {data['tool']}({list(data['arguments'].keys())})"
            case "tool_result":
                return f"{data['tool']} in {data['duration_ms']:.0f}ms"
            case _:
                return str(data)[:100]
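Replay files are just gzipped JSON, so they are inspectable without the replayer. A sketch of the round-trip (field names match what SessionRecorder.save writes; the file path is illustrative):

```python
import gzip
import json
import tempfile
from pathlib import Path


def write_session(path: Path, session_id: str, events: list[dict]) -> None:
    """Write a replay file in the same shape SessionRecorder.save produces."""
    data = {
        "session_id": session_id,
        "event_count": len(events),
        "events": events,
    }
    with gzip.open(path, "wt") as f:
        json.dump(data, f)


def read_session(path: Path) -> dict:
    with gzip.open(path, "rt") as f:
        return json.load(f)


# Round-trip a tiny two-event session through a temp file
path = Path(tempfile.gettempdir()) / "session-demo.json.gz"
write_session(path, "demo", [
    {"timestamp": 0.0, "event_type": "input", "data": {"query": "hi"}},
    {"timestamp": 0.4, "event_type": "output", "data": {"text": "hello"}},
])
session = read_session(path)
```

Because it's plain gzip + JSON, ad-hoc tooling like zcat piped into jq works on these files too.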

Pattern 6: Anomaly Detection for Agent Behavior

Agents can drift. This pattern detects when an agent starts behaving differently from its baseline:

import time
import statistics
from collections import deque
from dataclasses import dataclass
from typing import Optional


@dataclass
class AnomalyAlert:
    metric: str
    current_value: float
    baseline_mean: float
    baseline_stdev: float
    z_score: float
    severity: str  # "warning", "critical"
    message: str


class AgentAnomalyDetector:
    """Detect unusual agent behavior using statistical baselines."""

    def __init__(self, window_size: int = 100, z_threshold: float = 2.5):
        self.window_size = window_size
        self.z_threshold = z_threshold
        self.baselines: dict[str, deque] = {}
        self.alerts: list[AnomalyAlert] = []

    def record_metric(self, name: str, value: float) -> Optional[AnomalyAlert]:
        """Record a metric and check for anomalies."""
        if name not in self.baselines:
            self.baselines[name] = deque(maxlen=self.window_size)

        window = self.baselines[name]

        # Need at least 20 samples for a baseline
        if len(window) >= 20:
            mean = statistics.mean(window)
            stdev = statistics.stdev(window)

            if stdev > 0:
                z_score = (value - mean) / stdev

                if abs(z_score) >= self.z_threshold:
                    severity = "critical" if abs(z_score) > 4 else "warning"
                    direction = "above" if z_score > 0 else "below"

                    alert = AnomalyAlert(
                        metric=name,
                        current_value=value,
                        baseline_mean=mean,
                        baseline_stdev=stdev,
                        z_score=z_score,
                        severity=severity,
                        message=(
                            f"{name} is {abs(z_score):.1f}σ {direction} "
                            f"baseline ({value:.1f} vs μ={mean:.1f}±{stdev:.1f})"
                        ),
                    )
                    self.alerts.append(alert)
                    window.append(value)
                    return alert

        window.append(value)
        return None


# Track these metrics per agent run
detector = AgentAnomalyDetector()

async def monitored_agent_run(query: str):
    start = time.time()
    result = await agent.run(query)
    elapsed = time.time() - start

    # Check for anomalies across multiple dimensions
    alerts = []
    for metric, value in [
        ("latency_ms", elapsed * 1000),
        ("total_tokens", result.token_usage.total),
        ("llm_calls", result.step_count),
        ("tool_calls", result.tool_call_count),
        ("response_length", len(result.output)),
    ]:
        alert = detector.record_metric(metric, value)
        if alert:
            alerts.append(alert)
            logger.warning(f"ANOMALY: {alert.message}")

    if any(a.severity == "critical" for a in alerts):
        await send_pagerduty_alert(alerts)

    return result
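The z-score math is easy to sanity-check in isolation. Feeding a stable latency baseline and then a spike (synthetic numbers) should clear the 2.5-sigma threshold:

```python
import statistics


def z_score(value: float, window: list[float]) -> float:
    """How many standard deviations `value` sits from the window's mean."""
    mean = statistics.mean(window)
    stdev = statistics.stdev(window)
    return (value - mean) / stdev if stdev > 0 else 0.0


# Baseline: latencies hovering around 1000ms with modest jitter
baseline = [1000, 950, 1050, 1000, 980, 1020, 990, 1010, 1030, 970]

spike = z_score(5000, baseline)    # a 5-second outlier
normal = z_score(1005, baseline)   # an in-range sample
```

This is exactly the test AgentAnomalyDetector.record_metric runs against its rolling deque, just without the windowing and alert bookkeeping.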

Pattern 7: Cost Attribution Dashboard

Know exactly where your money goes — per agent, per feature, per customer:

from collections import defaultdict
from dataclasses import dataclass


# Pricing per 1M tokens (update with your model's pricing)
MODEL_PRICING = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "claude-sonnet": {"input": 3.00, "output": 15.00},
    "claude-haiku": {"input": 0.25, "output": 1.25},
}


class CostTracker:
    """Track and attribute LLM costs per dimension."""

    def __init__(self):
        self._costs = defaultdict(lambda: defaultdict(float))
        self._token_counts = defaultdict(lambda: defaultdict(int))

    def record(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
        dimensions: dict[str, str],
    ):
        """
        Record a cost event with attribution dimensions.

        dimensions example:
            {"agent": "router", "customer": "acme", "feature": "support"}
        """
        pricing = MODEL_PRICING.get(model, {"input": 5.0, "output": 15.0})
        cost = (
            (input_tokens / 1_000_000) * pricing["input"]
            + (output_tokens / 1_000_000) * pricing["output"]
        )

        for dim_name, dim_value in dimensions.items():
            self._costs[dim_name][dim_value] += cost
            self._token_counts[dim_name][dim_value] += (
                input_tokens + output_tokens
            )

    def report(self, dimension: str) -> list[dict]:
        """Get cost breakdown by dimension, sorted by cost."""
        items = [
            {
                "name": name,
                "cost_usd": round(cost, 4),
                "tokens": self._token_counts[dimension][name],
            }
            for name, cost in self._costs[dimension].items()
        ]
        return sorted(items, key=lambda x: x["cost_usd"], reverse=True)

    def total_cost(self) -> float:
        """Total cost across all dimensions."""
        # Use any dimension to avoid double-counting
        if not self._costs:
            return 0.0
        first_dim = next(iter(self._costs))
        return round(sum(self._costs[first_dim].values()), 4)

    def daily_report_markdown(self) -> str:
        """Generate a Markdown daily cost report."""
        lines = ["# Daily AI Cost Report", ""]

        for dimension in sorted(self._costs.keys()):
            lines.append(f"## By {dimension.title()}")
            lines.append("")
            lines.append("| Name | Cost (USD) | Tokens |")
            lines.append("|------|-----------|--------|")
            for item in self.report(dimension):
                lines.append(
                    f"| {item['name']} | ${item['cost_usd']:.4f} "
                    f"| {item['tokens']:,} |"
                )
            lines.append("")

        lines.append(f"**Total: ${self.total_cost():.4f}**")
        return "\n".join(lines)


# Usage
costs = CostTracker()

# After each LLM call
costs.record(
    model="gpt-4o-mini",
    input_tokens=1500,
    output_tokens=300,
    dimensions={
        "agent": "customer-support",
        "customer": "acme-corp",
        "feature": "ticket-routing",
    },
)

# End of day
print(costs.daily_report_markdown())
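The per-call arithmetic is worth doing by hand once. Using the gpt-4o-mini rates from the pricing table above, the sample call comes to well under a tenth of a cent:

```python
def call_cost(input_tokens: int, output_tokens: int,
              input_rate: float, output_rate: float) -> float:
    """Cost in USD; rates are expressed per 1M tokens."""
    return (
        (input_tokens / 1_000_000) * input_rate
        + (output_tokens / 1_000_000) * output_rate
    )


# The example above: 1,500 input + 300 output tokens on gpt-4o-mini
# ($0.15 input / $0.60 output per 1M tokens), roughly $0.0004
cost = call_cost(1500, 300, input_rate=0.15, output_rate=0.60)
```

Individual calls are cheap; the point of the tracker is that thousands of them per day, attributed by agent and customer, are not.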

Putting It All Together: The Observability Stack

Here's how these 7 patterns connect in a production system:

┌──────────────────────────────────────────────────┐
│                  Agent Request                   │
├──────────────────────────────────────────────────┤
│  ┌──────────────┐  ┌──────────────┐              │
│  │ Trace Context│  │ Token Budget │              │
│  │  (Pattern 1) │  │  (Pattern 2) │              │
│  └──────┬───────┘  └──────┬───────┘              │
│         │                 │                      │
│  ┌──────▼─────────────────▼─────┐                │
│  │     Agent Execution Loop     │                │
│  │  ┌───────────┐ ┌───────────┐ │                │
│  │  │ Decision  │ │  Session  │ │                │
│  │  │ Audit Log │ │ Recorder  │ │                │
│  │  │(Pattern 3)│ │(Pattern 5)│ │                │
│  │  └───────────┘ └───────────┘ │                │
│  └──────────────┬───────────────┘                │
│                 │                                │
│  ┌──────────────▼──────────────┐                 │
│  │       Quality Scoring       │                 │
│  │         (Pattern 4)         │                 │
│  └──────────────┬──────────────┘                 │
│                 │                                │
│  ┌──────────────▼─────┐ ┌───────────────────┐    │
│  │  Anomaly Detection │ │ Cost Attribution  │    │
│  │    (Pattern 6)     │ │   (Pattern 7)     │    │
│  └────────────────────┘ └───────────────────┘    │
├──────────────────────────────────────────────────┤
│                  Export Layer                    │
│  OpenTelemetry → Jaeger / Grafana / S3 / Slack   │
└──────────────────────────────────────────────────┘

Quick Start: Minimal Production Setup

If you're just getting started, implement these in order of impact:

  1. Token Budget Guardian (Pattern 2) — prevents financial disasters
  2. Structured Traces (Pattern 1) — makes debugging possible
  3. Cost Attribution (Pattern 7) — know where money goes
  4. Quality Scoring (Pattern 4) — catch quality drops early

Patterns 3, 5, and 6 become critical as you scale past a single agent.


Key Takeaways

  1. Traditional monitoring is blind to AI agent failures. A 200 OK means nothing when the response is a hallucination.

  2. Token budgets are non-negotiable. One infinite retry loop can cost more than your monthly infrastructure bill.

  3. Decision audit logs are your legal protection. When users dispute what your agent told them, you need receipts.

  4. Quality scoring catches silent failures. Agents don't crash — they degrade. You need continuous quality measurement.

  5. Session replay is your time machine. When something goes wrong at 3 AM, you need to see exactly what happened.

  6. Anomaly detection catches drift. Agent behavior changes with model updates, prompt changes, and data shifts. Statistical baselines catch this automatically.

  7. Cost attribution prevents surprises. Know exactly which agent, customer, and feature is driving your LLM bill.


Building AI agents without observability is like driving at night without headlights. You might reach your destination, but you won't see the cliff until it's too late.

The code in this article is framework-agnostic and production-tested. Fork it, adapt it, and ship it.

Want more patterns like these? Follow me for the next article in the AI Engineering in Practice series.


What's your biggest agent observability challenge? Drop a comment below — I read every single one.
