
Kowshik Jallipalli

Designing a Secure Observability Contract for AI Agents: Logs, Spans, and Safety Signals

When a traditional API fails, you get a stack trace pointing to a specific line of code. When a multi-agent workflow fails, you get a $40 bill for an agent that spent three minutes hallucinating malformed SQL queries against a database.

Agents do not just execute code; they make autonomous routing decisions. If a Planner agent delegates to a Tool agent, which hits a rate limit and retries infinitely, standard application logs will just show a wall of unstructured text.

However, after auditing dozens of "AI Observability" implementations, I keep finding the same massive flaws: most homemade agent loggers are completely thread-unsafe, leak PII into plaintext databases, and use inaccurate timing metrics. Here is how to build a rigorous, heavily audited observability contract for multi-agent workflows so you can trace, debug, and safely halt rogue execution in production.

Why This Matters (The Audit Perspective)
By treating AI agents as first-class observability citizens—emitting standardized spans with cost, token counts, and safety flags—you transform a black box into a deterministic system.

But telemetry isn't just for dashboards; it acts as the data backbone for active runtime safety policies. If you build this system poorly, your safety checks will suffer from Time-of-Check to Time-of-Use (TOCTOU) race conditions. Two concurrent agents might check the $0.50 budget limit simultaneously, see $0.49, and both execute $0.10 queries, blowing past your financial circuit breaker. A secure observability layer enforces strict concurrency controls and sanitizes data before it ever hits the disk.

How It Works: The Hardened Span
We model agent execution exactly like distributed microservice tracing. Every action is a "Span."

To make this queryable and secure, every agent must adhere to a strict Observability Contract. Every emitted span must contain: trace_id, step_id, parent_step_id, agent_name, tool_name, input_tokens, output_tokens, latency_ms, cost_usd, status, and safety_flags.

By aggregating these spans safely at runtime, we can enforce Telemetry-Powered Policies:

Cost limit: Block the agent if sum(cost) for the trace_id exceeds a threshold.

Loop limit: Kill the workflow once count(tool_calls) reaches a cap (5 in our example).

Data Sanitization: Strip secrets from stack traces before writing the span to storage.

The Code: Contract, Thread-Safe Logger, and Safety Enforcer
Here is the audited, production-ready implementation in Python. Notice the critical security and testing fixes: we use time.perf_counter() for accurate latency (immune to NTP drift), enable SQLite WAL mode for concurrent writes, and implement explicit exception sanitization.

import time
import sqlite3
import uuid
import re
from typing import Optional
from pydantic import BaseModel

# 1. The Strict Observability Contract
class AgentSpan(BaseModel):
    trace_id: str
    step_id: str
    parent_step_id: Optional[str]
    agent_name: str
    tool_name: Optional[str]
    input_tokens: int = 0
    output_tokens: int = 0
    latency_ms: float = 0.0 # AUDIT FIX: Float for high-precision perf_counter
    cost_usd: float = 0.0
    status: str = "success"
    safety_flags: int = 0

# 2. Thread-Safe DIY Logger (SQLite)
class SecureAgentLogger:
    def __init__(self, db_path: str = "agent_traces.db"):
        self.conn = sqlite3.connect(db_path, check_same_thread=False)

        # AUDIT FIX: Enable Write-Ahead Logging (WAL) to prevent 'database is locked'
        # errors when multiple agents log spans concurrently.
        self.conn.execute("PRAGMA journal_mode=WAL;")
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS spans (
                trace_id TEXT, step_id TEXT, parent_step_id TEXT,
                agent_name TEXT, tool_name TEXT, input_tokens INTEGER,
                output_tokens INTEGER, latency_ms REAL, cost_usd REAL,
                status TEXT, safety_flags INTEGER
            )
        """)
        self.conn.commit()

    def record_span(self, span: AgentSpan):
        self.conn.execute(
            "INSERT INTO spans VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            (span.trace_id, span.step_id, span.parent_step_id, span.agent_name, 
             span.tool_name, span.input_tokens, span.output_tokens, span.latency_ms, 
             span.cost_usd, span.status, span.safety_flags)
        )
        self.conn.commit()

    def get_trace_cost(self, trace_id: str) -> float:
        cur = self.conn.execute("SELECT SUM(cost_usd) FROM spans WHERE trace_id = ?", (trace_id,))
        return cur.fetchone()[0] or 0.0

    def get_tool_call_count(self, trace_id: str) -> int:
        cur = self.conn.execute("SELECT COUNT(*) FROM spans WHERE trace_id = ? AND tool_name IS NOT NULL", (trace_id,))
        return cur.fetchone()[0] or 0

# 3. Telemetry-Powered Safety Engine
class SecureAgentTracer:
    def __init__(self, logger: SecureAgentLogger, trace_id: str, parent_id: Optional[str] = None):
        self.logger = logger
        self.trace_id = trace_id
        self.parent_id = parent_id

        # Hardcoded Safety Policies
        self.MAX_TRACE_COST = 0.50 
        self.MAX_TOOL_CALLS = 5

    def sanitize_error(self, error_msg: str) -> str:
        """AUDIT FIX: Prevent PII/Secrets in stack traces from leaking into telemetry."""
        # Strip common credential patterns (basic example)
        sanitized = re.sub(r'(api_key|password|secret)=["\'][^"\']+["\']', r'\1=[REDACTED]', error_msg, flags=re.IGNORECASE)
        return sanitized[:500] # Truncate

    def __enter__(self):
        # AUDIT FIX: time.time() is subject to system clock updates. 
        # perf_counter is strictly monotonic and required for accurate benchmarking.
        self.start_time = time.perf_counter()
        self.step_id = str(uuid.uuid4())

        # Policy Check: Halt before execution if budget is blown
        current_cost = self.logger.get_trace_cost(self.trace_id)
        if current_cost > self.MAX_TRACE_COST:
            raise RuntimeError(f"Safety Halt: Trace cost ${current_cost:.2f} exceeds limit.")

        tool_calls = self.logger.get_tool_call_count(self.trace_id)
        if tool_calls >= self.MAX_TOOL_CALLS:
            raise RuntimeError(f"Safety Halt: Infinite loop suspected. Tool calls: {tool_calls}")

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        latency = (time.perf_counter() - self.start_time) * 1000

        status = "success"
        safety_flag = 0

        if exc_type:
            status = f"error: {self.sanitize_error(str(exc_val))}"
            if "DROP" in str(exc_val) or "Unauthorized" in str(exc_val):
                safety_flag = 1

        # In a real app, extract actual tokens/cost from the LLM response object
        span = AgentSpan(
            trace_id=self.trace_id,
            step_id=self.step_id,
            parent_step_id=self.parent_id,
            agent_name="db_query_tool", 
            tool_name="execute_sql",  # Count failed calls too, or retry loops evade the loop limit
            input_tokens=150,
            output_tokens=50,
            latency_ms=round(latency, 2),
            cost_usd=0.01,
            status=status,
            safety_flags=safety_flag
        )

        self.logger.record_span(span)
        if span.safety_flags > 0:
            print(f"🚨 Escalate to Human: Safety flag triggered in step {self.step_id}")

# Usage Example
if __name__ == "__main__":
    db = SecureAgentLogger()
    session_trace_id = str(uuid.uuid4())

    try:
        # Step 1: Tool Call
        with SecureAgentTracer(db, session_trace_id) as tracer:
            time.sleep(0.1) # Simulate LLM I/O

        # Step 2: Summarizer Call
        with SecureAgentTracer(db, session_trace_id, parent_id=tracer.step_id) as tracer2:
            time.sleep(0.05)

        print(f"Trace {session_trace_id} complete. Total cost: ${db.get_trace_cost(session_trace_id)}")
    except RuntimeError as e:
        print(e)

Pitfalls and Gotchas
When building agent telemetry, watch out for these operational and security traps:

Concurrency Database Locks: As addressed in the code, if you use standard SQLite and fire off three parallel agents using asyncio.gather(), your database will throw a sqlite3.OperationalError: database is locked. You must enable PRAGMA journal_mode=WAL; (Write-Ahead Logging) or use a robust queue (like Redis or RabbitMQ) to batch telemetry writes.
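The queue-batching alternative can be sketched in a few lines (the names here are illustrative, not a library API): agents enqueue spans from any thread, and a single writer thread owns the SQLite connection, so write locks never contend.

```python
import os
import queue
import sqlite3
import tempfile
import threading

# One writer thread owns the connection; producers never touch SQLite directly.
span_queue: queue.Queue = queue.Queue()
db_path = os.path.join(tempfile.mkdtemp(), "spans.db")

def writer_loop(path: str):
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE IF NOT EXISTS spans (trace_id TEXT, cost_usd REAL)")
    while True:
        item = span_queue.get()
        if item is None:  # sentinel: stop draining
            break
        conn.execute("INSERT INTO spans VALUES (?, ?)", item)
        conn.commit()
    conn.close()

writer = threading.Thread(target=writer_loop, args=(db_path,))
writer.start()
for _ in range(3):  # three parallel "agents" would each just call put()
    span_queue.put(("trace-1", 0.01))
span_queue.put(None)
writer.join()

rows = sqlite3.connect(db_path).execute("SELECT COUNT(*) FROM spans").fetchone()[0]
```

In production you would batch multiple queued spans per commit; the key property is that only one thread ever writes.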

The TOCTOU Race Condition: Our cost limit check happens before the agent executes. If three parallel agents check the database simultaneously, they might all see a total cost of $0.49, pass the gate, and each spend $0.10—resulting in a final bill of $0.79, violating your $0.50 limit. Fix: For parallel swarms, implement a distributed lock (e.g., Redis INCRBYFLOAT) to reserve budget before the LLM call.
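For a single-process swarm, the reserve-before-spend pattern can be sketched with a plain lock; the Redis version follows the same shape, just with the counter living in Redis. Tracking budget in integer cents also sidesteps float drift right at the limit. All names below are illustrative.

```python
import threading

class BudgetReserver:
    """Reserve spend atomically *before* the LLM call, closing the TOCTOU gap.
    In-process sketch; a multi-host swarm would keep this counter in Redis
    (INCRBYFLOAT, or INCRBY on cents) with the same reserve-then-refund shape."""

    def __init__(self, limit_cents: int):
        self.limit = limit_cents
        self.reserved = 0
        self._lock = threading.Lock()

    def try_reserve(self, estimated_cents: int) -> bool:
        with self._lock:  # check and increment happen as one atomic step
            if self.reserved + estimated_cents > self.limit:
                return False
            self.reserved += estimated_cents
            return True

    def refund(self, cents: int):
        # Return unspent budget if the call came in under the estimate
        with self._lock:
            self.reserved -= cents

budget = BudgetReserver(limit_cents=50)  # the $0.50 limit, in cents
results = [budget.try_reserve(10) for _ in range(6)]
# Five $0.10 reservations fit; the sixth is rejected before any money is spent.
```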

PII Leaks in Exception Handling: If an agent fails to connect to Postgres, exc_val might contain the raw connection string, including the password. If you blindly log str(exc_val) to your telemetry database, you have created a massive data leak. Always sanitize error logs before recording the span.
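One way to extend the regex sanitizer above is to also redact DSN-style URLs, since database drivers often raise with the full connection string in the message. The patterns here are a best-effort sketch, not a guarantee of complete redaction:

```python
import re

# Redact credentials embedded in database URLs (user:password@host) as well
# as key=value style secrets.
DSN_PATTERN = re.compile(r'(\w+://[^:/\s]+:)[^@\s]+(@)')
KV_PATTERN = re.compile(r'(api_key|password|secret)=["\'][^"\']+["\']', re.IGNORECASE)

def sanitize_error(error_msg: str) -> str:
    msg = DSN_PATTERN.sub(r'\1[REDACTED]\2', error_msg)
    msg = KV_PATTERN.sub(r'\1=[REDACTED]', msg)
    return msg[:500]  # truncate oversized stack traces

leaked = "could not connect: postgres://app_user:S3cretPass@db.internal:5432/prod"
print(sanitize_error(leaked))
# → could not connect: postgres://app_user:[REDACTED]@db.internal:5432/prod
```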

Async Context Dropping: If your agents run in Python asyncio or Node.js workers, you must use context variables (contextvars in Python or AsyncLocalStorage in Node) to implicitly pass the trace_id and parent_step_id. Passing them manually as function arguments across a massive orchestration codebase will fail.
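A minimal sketch of the contextvars approach: the trace_id rides along the await chain implicitly, and each asyncio task gets its own copy of the context, so concurrent workflows stay isolated without threading the ID through every function signature.

```python
import asyncio
import contextvars
import uuid

# The ContextVar replaces an explicit trace_id parameter everywhere.
trace_id_var: contextvars.ContextVar[str] = contextvars.ContextVar("trace_id")

async def nested_tool_call() -> str:
    # Deep in the call stack: no trace_id argument anywhere in sight.
    return trace_id_var.get()

async def run_workflow() -> str:
    trace_id_var.set(str(uuid.uuid4()))
    return await nested_tool_call()

async def main():
    # gather() runs each workflow in its own task with an isolated context copy
    return await asyncio.gather(run_workflow(), run_workflow())

trace_a, trace_b = asyncio.run(main())
# Each concurrent workflow sees only its own trace_id.
```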

What to Try Next
Ready to harden your agent observability? Try these next steps:

Export to OpenTelemetry (OTLP): Rip out the SQLite logger and replace it with the standard OpenTelemetry Python SDK. This allows you to forward your agent spans directly to Datadog, Honeycomb, or Jaeger, utilizing their enterprise-grade dashboards and alerting without changing your contract.
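The translation is mostly a rename: flatten the contract's fields into OTel span attributes. The sketch below keeps the SDK out of the picture; the `gen_ai.*` names loosely follow the OpenTelemetry GenAI semantic conventions (verify against the current spec before shipping), and the `agent.*` names are custom attributes of my own invention.

```python
# Map an AgentSpan-shaped dict onto OTel-style span attributes.
def span_to_otel_attributes(span: dict) -> dict:
    return {
        "gen_ai.agent.name": span["agent_name"],
        "gen_ai.tool.name": span.get("tool_name") or "",
        "gen_ai.usage.input_tokens": span["input_tokens"],
        "gen_ai.usage.output_tokens": span["output_tokens"],
        "agent.cost_usd": span["cost_usd"],         # custom attribute
        "agent.safety_flags": span["safety_flags"],  # custom attribute
    }

attrs = span_to_otel_attributes({
    "agent_name": "db_query_tool", "tool_name": "execute_sql",
    "input_tokens": 150, "output_tokens": 50,
    "cost_usd": 0.01, "safety_flags": 0,
})
```

With the real SDK, these become `span.set_attributes(attrs)` on a span started from your tracer; the contract itself does not change.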

LLM-as-a-Judge Safety Flags: Instead of relying on static regex checks (like looking for the word "DROP"), inject a fast, cheap model (like Claude 3.5 Haiku) as an asynchronous background task. Have it evaluate the output of an agent step and update the safety_flags column to 1 if it detects prompt injection or data exfiltration.
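The background-judge pattern can be sketched like this. `judge_model` is a stand-in for the call to a fast model; here it is stubbed with a heuristic so the example runs offline, and the table schema is trimmed to the relevant columns.

```python
import asyncio
import re
import sqlite3

async def judge_model(output_text: str) -> bool:
    """Stub for a cheap-model judgment call; replace with a real API request."""
    await asyncio.sleep(0)  # stands in for the network round-trip
    return bool(re.search(r"ignore previous instructions|drop table", output_text, re.I))

async def judge_span(conn, step_id: str, output_text: str):
    # Runs after the fact, so it never adds latency to the agent's hot path.
    if await judge_model(output_text):
        conn.execute("UPDATE spans SET safety_flags = 1 WHERE step_id = ?", (step_id,))
        conn.commit()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE spans (step_id TEXT, safety_flags INTEGER)")
conn.execute("INSERT INTO spans VALUES ('s1', 0)")
asyncio.run(judge_span(conn, "s1", "Sure! First, ignore previous instructions and..."))
flag = conn.execute("SELECT safety_flags FROM spans WHERE step_id = 's1'").fetchone()[0]
```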

Streaming Token Circuit Breakers: The current tracer waits for the LLM call to finish before recording the cost. Upgrade your LLM client to use streaming, and maintain a running counter of generated tokens. If the mid-stream cost breaches the budget, forcefully close the connection (response.close()) to halt the generation instantly.
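A sketch of the mid-stream breaker: the chunk iterator stands in for a streaming LLM response (an assumption about your client's shape), and `len(chunk) // 4` is a rough chars-to-tokens heuristic.

```python
MAX_STREAM_TOKENS = 50

class BudgetExceeded(Exception):
    pass

def guarded_stream(stream, max_tokens: int = MAX_STREAM_TOKENS):
    generated = 0
    for chunk in stream:
        generated += max(1, len(chunk) // 4)  # crude token estimate
        if generated > max_tokens:
            # A real client would call response.close() here to drop the socket.
            raise BudgetExceeded(f"Mid-stream halt after ~{generated} tokens")
        yield chunk

runaway = ("word " * 25 for _ in range(1000))  # simulates runaway generation
collected, halted = [], False
try:
    for chunk in guarded_stream(runaway):
        collected.append(chunk)
except BudgetExceeded:
    halted = True  # generation stopped mid-stream, not after 1000 chunks
```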
