Imagine this: You’ve just deployed a state-of-the-art autonomous AI agent. It uses advanced reasoning loops, accesses a vector database for long-term memory, and dynamically optimizes its own prompts to deliver incredibly accurate results. For the first few hours, it’s a triumph.
Then, you check your API dashboard.
In less than half a day, your agent has managed to burn through hundreds of dollars. It got caught in an infinite loop of self-reflection, repeatedly sending massive context windows to an expensive frontier model. Even worse, several users are complaining that the agent’s response times have ballooned to over thirty seconds, but you have no idea which step in the agent's chain of thought is causing the bottleneck.
This is the reality of operating AI agents in production without a dedicated observability and telemetry layer.
When we transition from simple, single-turn LLM queries to complex, self-improving agentic workflows, traditional application performance monitoring (APM) tools fall short. We don't just need to know if a server is up; we need to know how many tokens were consumed, the exact cost of each step, whether prompt caching was utilized effectively, and how latency behaves across streaming and asynchronous calls.
Let's break down the engineering principles behind building a production-grade telemetry layer for autonomous agents and explore how to implement a reusable tracking architecture in Python.
(The concepts and code demonstrated here are drawn from my ebook Hermes Agent, The Self-Evolving AI Workforce)
The Concept of the Agent's Flight Recorder
In aviation, a flight data recorder (the "black box") continuously captures hundreds of parameters during a flight. If something goes wrong, investigators don't guess; they look at the telemetry.
An AI agent requires the exact same level of instrumentation. An autonomous agent typically operates in a continuous cycle: Observation → Action → Reflection → Memory Update.
Without rigorous telemetry, this cycle is a black box. You cannot answer critical operational questions:
- Did a recent prompt optimization actually reduce latency, or did it make it worse?
- Is a background memory compaction routine silently draining your budget by processing thousands of historical tokens?
- How do you enforce hard financial guardrails when an agent makes thousands of nested API calls per hour?
By embedding a telemetry layer directly into the agent’s runtime, every API interaction becomes a structured data point. This data is not just for human developers to view on a dashboard; it flows directly back into the agent's persistent memory. This enables the agent to engage in self-evolution, using cost and latency metrics as feedback signals to prune expensive prompts, switch to cheaper models, or truncate bloated contexts.
The Three Pillars of Agent Telemetry
To build an effective telemetry system for AI agents, we must design around three core pillars: Cost Tracking, Token Accounting, and Latency Decomposition.
+---------------------------------------------------------------------+
|                           Agent Telemetry                           |
+----------------------------------+----------------------------------+
                                   |
        +--------------------------+--------------------------+
        |                          |                          |
        v                          v                          v
[ Cost Tracking ]        [ Token Accounting ]      [ Latency Decomposition ]
- Financial Auditor      - Performance Engineer    - Race Engineer
- Multi-variable calc    - Cache hit/miss ratio    - TTFT vs. Total Latency
- Route-based pricing    - Provider normalization  - Bottleneck isolation
1. Cost Tracking: The Financial Auditor
Cost in LLM applications is rarely a simple, static number. It is a multi-variable function of the provider, the model, the routing mechanism, and the specific type of tokens processed.
A single API call might involve:
- Input Tokens: The base cost of sending your prompt.
- Output Tokens: The cost of generating the response (usually priced several times higher than input tokens; 2x to 5x in the pricing snapshot used later in this article).
- Cache Reads: Discounted tokens read from the provider's prompt cache.
- Cache Writes: Tokens written to the cache, which may carry a slight premium but save money on subsequent turns.
To track this accurately, your telemetry layer must maintain a structured pricing database that maps provider-model pairs to their respective per-million-token rates. Furthermore, it must distinguish between direct API routes (like calling Anthropic directly) and other routes (such as a proxy like OpenRouter, or a locally hosted model with no per-token fee), as the billing rules change depending on the route.
Every API call should generate a financial transaction log. By aggregating these logs, the agent can monitor its own spending and trigger fallback behaviors—such as switching from a frontier model to a lightweight open-source model—if it approaches its daily budget.
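As a minimal sketch of that fallback behavior, the snippet below picks a model based on how much of the daily budget has already been spent. The `choose_model` helper, the 80% downgrade threshold, and the placeholder model names are assumptions made for illustration; they are not part of any provider API.

```python
from decimal import Decimal
from typing import Optional


def choose_model(spent_today: Decimal, daily_budget: Decimal,
                 primary: str, fallback: str,
                 downgrade_at: Decimal = Decimal("0.80")) -> Optional[str]:
    """Return the model to use, or None when the budget is exhausted."""
    if spent_today >= daily_budget:
        return None  # caller should pause the agent
    if spent_today >= daily_budget * downgrade_at:
        return fallback  # close to the limit: use the cheaper model
    return primary


budget = Decimal("10.00")
for spent in (Decimal("2.50"), Decimal("8.50"), Decimal("10.00")):
    # "frontier-model" and "small-model" are placeholder names
    print(spent, "->", choose_model(spent, budget, "frontier-model", "small-model"))
```

Returning `None` instead of raising leaves the decision to pause, queue, or alert with the caller.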
2. Token Accounting: The Performance Engineer
Raw token counts can be incredibly deceptive. If your agent sends a 10,000-token prompt but 90% of it is served from the prompt cache, only 1,000 tokens are billed at the full input rate and the other 9,000 at the discounted cache-read rate, so the real cost is far lower than the raw context size suggests.
True token accounting requires normalizing token usage into standardized buckets across different providers. While OpenAI, Anthropic, and Cohere all return token usage in their API responses, they format this data differently. Your telemetry layer must parse these disparate response shapes into a unified, canonical structure that tracks:
- input_tokens
- output_tokens
- cache_read_tokens
- cache_write_tokens
- reasoning_tokens (for models that expose internal chain-of-thought processing)
By analyzing these metrics over time, you can calculate your cache efficiency ratio: cached prompt tokens divided by total prompt tokens. Prompt caches match on a shared prefix, so a consistently low hit rate usually means the start of your agent's prompt is changing between calls, or your prompt templates place volatile content ahead of stable content, which prevents the provider from reusing a cached prefix.
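To make the arithmetic concrete, here is a small sketch that computes the cache hit ratio and the prompt-side cost for the 10,000-token example. The helper names are hypothetical, and the rates ($3.00 input and $0.30 cache read per million tokens) are taken from the static pricing snapshot used later in this article, so treat them as illustrative rather than current.

```python
from decimal import Decimal

MILLION = Decimal("1000000")


def cache_hit_ratio(input_tokens: int, cache_read_tokens: int,
                    cache_write_tokens: int = 0) -> float:
    """Fraction of all prompt tokens that were served from the cache."""
    total = input_tokens + cache_read_tokens + cache_write_tokens
    return cache_read_tokens / total if total else 0.0


def prompt_cost(input_tokens: int, cache_read_tokens: int,
                input_rate: Decimal, cache_read_rate: Decimal) -> Decimal:
    """Prompt-side cost in USD, given per-million-token rates."""
    return (Decimal(input_tokens) * input_rate
            + Decimal(cache_read_tokens) * cache_read_rate) / MILLION


ratio = cache_hit_ratio(input_tokens=1_000, cache_read_tokens=9_000)
with_cache = prompt_cost(1_000, 9_000, Decimal("3.00"), Decimal("0.30"))
without_cache = prompt_cost(10_000, 0, Decimal("3.00"), Decimal("0.30"))
print(f"hit ratio={ratio:.0%} cached=${with_cache} uncached=${without_cache}")
```

Under these assumed rates the cached prompt costs $0.0057 against $0.03 uncached, roughly a fifth of the price for the same context size.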
3. Latency Decomposition: The Race Engineer
In interactive agent applications, latency is the ultimate user experience killer. However, measuring total round-trip time is not enough. We need to decompose latency into its constituent parts:
- Pre-processing Latency: The time spent retrieving memories, formatting prompts, and searching vector databases.
- Time to First Token (TTFT): The time elapsed between sending the request and receiving the very first token. This is the most critical metric for perceived speed in streaming interfaces.
- Generation Latency: The time spent streaming the remainder of the response.
- Post-processing Latency: The time spent parsing JSON, executing tool calls, and writing updates back to persistent memory.
If an agent step takes 15 seconds, latency decomposition allows you to pinpoint the exact culprit. Was the network slow? Did the model spend too long generating reasoning tokens? Or did your database query take 10 seconds to fetch relevant context?
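One way to capture these four buckets is a small per-stage timer. The sketch below is an assumption about how you might structure it (the `StageTimer` class and stage names are hypothetical), with `time.sleep` standing in for real work.

```python
import time
from contextlib import contextmanager
from typing import Dict, Iterator


class StageTimer:
    """Accumulates wall-clock time per named stage of one agent step."""

    def __init__(self) -> None:
        self.durations_ms: Dict[str, float] = {}

    @contextmanager
    def stage(self, name: str) -> Iterator[None]:
        start = time.perf_counter()
        try:
            yield
        finally:
            # Record the time even if the stage raised an exception
            elapsed = (time.perf_counter() - start) * 1000
            self.durations_ms[name] = self.durations_ms.get(name, 0.0) + elapsed

    def slowest(self) -> str:
        """Name of the stage with the largest accumulated time."""
        return max(self.durations_ms, key=self.durations_ms.get)


timer = StageTimer()
with timer.stage("pre_processing"):
    time.sleep(0.05)    # stand-in for memory retrieval and vector search
with timer.stage("generation"):
    time.sleep(0.01)    # stand-in for the model call
with timer.stage("post_processing"):
    time.sleep(0.005)   # stand-in for parsing and memory writes
print(timer.durations_ms, "slowest:", timer.slowest())
```

Here the simulated vector search dominates, which is exactly the kind of culprit a single round-trip number would hide.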
The Closed-Loop Feedback: Self-Optimization
The true magic happens when you couple telemetry with the agent's memory system. When telemetry data is stored alongside conversational history, the agent can run diagnostic routines on its own performance.
For example, if the agent detects that its average latency over the last fifty steps has degraded by 30%, it can query its telemetry logs, discover that the context size has grown too large, and autonomously trigger a memory compaction routine to summarize older turns and reduce the prompt size.
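A degradation check like this can be sketched in a few lines. The version below compares the mean latency of the latest fifty steps with the fifty before them; the `latency_degraded` function, the choice of the mean as the statistic, and the sample data are all assumptions for illustration.

```python
from statistics import mean
from typing import Sequence


def latency_degraded(latencies_ms: Sequence[float], window: int = 50,
                     threshold: float = 0.30) -> bool:
    """Compare the mean of the latest `window` steps with the window before it."""
    if len(latencies_ms) < 2 * window:
        return False  # not enough history to compare
    baseline = mean(latencies_ms[-2 * window:-window])
    recent = mean(latencies_ms[-window:])
    return baseline > 0 and (recent - baseline) / baseline >= threshold


# Synthetic history: the latest 50 steps are 40% slower than the previous 50
history = [1000.0] * 50 + [1400.0] * 50
if latency_degraded(history):
    print("Latency degraded: trigger memory compaction")
```

In a real agent the `print` would be replaced by whatever compaction routine your memory system provides.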
Similarly, an optimization framework can use cost-per-task as a reward signal, evolving prompt templates not just for accuracy, but for cost efficiency.
Implementing a Production Telemetry Layer
Let's look at how to implement this architecture in Python. We will build a TelemetryCollector context manager that handles time tracking, token normalization, and cost estimation.
Below is a self-contained implementation of the telemetry pattern. Persistence is left as a stub so you can plug in your own storage.
import time
import logging
from dataclasses import dataclass, field
from decimal import Decimal
from typing import Dict, Any, Optional

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("AgentTelemetry")


@dataclass(frozen=True)
class CanonicalUsage:
    """Standardized token representation across all LLM providers."""
    input_tokens: int = 0
    output_tokens: int = 0
    cache_read_tokens: int = 0
    cache_write_tokens: int = 0
    reasoning_tokens: int = 0

    @property
    def total_prompt_tokens(self) -> int:
        return self.input_tokens + self.cache_read_tokens + self.cache_write_tokens

    @property
    def total_tokens(self) -> int:
        return self.total_prompt_tokens + self.output_tokens


@dataclass(frozen=True)
class PricingRates:
    """Cost per million tokens in USD."""
    input_rate: Decimal
    output_rate: Decimal
    cache_read_rate: Decimal = Decimal("0.00")
    cache_write_rate: Decimal = Decimal("0.00")


# Static pricing snapshot for popular models
MODEL_PRICING_DATABASE: Dict[str, PricingRates] = {
    "claude-3-5-sonnet": PricingRates(
        input_rate=Decimal("3.00"),
        output_rate=Decimal("15.00"),
        cache_read_rate=Decimal("0.30"),
        cache_write_rate=Decimal("3.75")
    ),
    "gpt-4o": PricingRates(
        input_rate=Decimal("2.50"),
        output_rate=Decimal("10.00"),
        cache_read_rate=Decimal("1.25")
    ),
    "deepseek-chat": PricingRates(
        input_rate=Decimal("0.14"),
        output_rate=Decimal("0.28"),
        cache_read_rate=Decimal("0.014")
    )
}


@dataclass
class TelemetryRecord:
    """The final telemetry record for an agent interaction."""
    model_name: str
    provider: str
    usage: CanonicalUsage
    estimated_cost_usd: Decimal
    latency_ms: float
    time_to_first_token_ms: Optional[float] = None
    timestamp: float = field(default_factory=time.time)
class TelemetryCollector:
    """Context manager to track LLM execution metrics, cost, and latency."""

    def __init__(self, model_name: str, provider: str):
        self.model_name = model_name
        self.provider = provider
        self.start_time: float = 0.0
        self.first_token_time: Optional[float] = None
        self.end_time: float = 0.0
        self.usage: CanonicalUsage = CanonicalUsage()

    def __enter__(self):
        self.start_time = time.perf_counter()
        return self

    def record_first_token(self):
        """Call this when the first token is received in a streaming response."""
        # Only the first call counts, so it is safe to call on every chunk.
        if self.first_token_time is None:
            self.first_token_time = time.perf_counter()

    def set_usage(self, raw_usage: Dict[str, Any]):
        """Normalizes and sets token usage based on provider format."""
        provider = self.provider.lower()
        if provider == "anthropic":
            self.usage = CanonicalUsage(
                input_tokens=raw_usage.get("input_tokens", 0),
                output_tokens=raw_usage.get("output_tokens", 0),
                cache_read_tokens=raw_usage.get("cache_read_input_tokens", 0),
                cache_write_tokens=raw_usage.get("cache_creation_input_tokens", 0)
            )
        elif provider == "openai":
            # prompt_tokens includes cached tokens, so subtract them to get
            # the portion billed at the full input rate.
            details = raw_usage.get("prompt_tokens_details") or {}
            cached = details.get("cached_tokens") or 0
            input_tokens = raw_usage.get("prompt_tokens", 0) - cached
            # Reasoning tokens are already counted inside completion_tokens;
            # they are tracked separately for analysis, not billed twice.
            completion_details = raw_usage.get("completion_tokens_details") or {}
            self.usage = CanonicalUsage(
                input_tokens=max(0, input_tokens),
                output_tokens=raw_usage.get("completion_tokens", 0),
                cache_read_tokens=cached,
                reasoning_tokens=completion_details.get("reasoning_tokens") or 0
            )
        else:
            # Fallback for generic providers
            self.usage = CanonicalUsage(
                input_tokens=raw_usage.get("prompt_tokens", 0),
                output_tokens=raw_usage.get("completion_tokens", 0)
            )

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.end_time = time.perf_counter()

        # Calculate latency metrics
        total_latency = (self.end_time - self.start_time) * 1000
        ttft = None
        if self.first_token_time is not None:
            ttft = (self.first_token_time - self.start_time) * 1000

        # Calculate costs
        cost = self._calculate_cost()

        record = TelemetryRecord(
            model_name=self.model_name,
            provider=self.provider,
            usage=self.usage,
            estimated_cost_usd=cost,
            latency_ms=total_latency,
            time_to_first_token_ms=ttft
        )
        self._log_telemetry(record)
        self._save_to_persistent_storage(record)
        # Returning False lets any exception raised inside the block propagate.
        return False

    def _calculate_cost(self) -> Decimal:
        rates = MODEL_PRICING_DATABASE.get(self.model_name)
        if not rates:
            logger.warning(f"No pricing rates found for model: {self.model_name}. Cost estimated at 0.00.")
            return Decimal("0.00")

        # Convert tokens to millions for rate multiplication
        million = Decimal("1000000")
        input_m = Decimal(self.usage.input_tokens) / million
        output_m = Decimal(self.usage.output_tokens) / million
        cache_read_m = Decimal(self.usage.cache_read_tokens) / million
        cache_write_m = Decimal(self.usage.cache_write_tokens) / million

        cost = (
            (input_m * rates.input_rate) +
            (output_m * rates.output_rate) +
            (cache_read_m * rates.cache_read_rate) +
            (cache_write_m * rates.cache_write_rate)
        )
        # Round to six decimal places (micro-dollars)
        return cost.quantize(Decimal("1.000000"))

    def _log_telemetry(self, record: TelemetryRecord):
        ttft_text = (
            f"{record.time_to_first_token_ms:.2f}ms"
            if record.time_to_first_token_ms is not None else "N/A"
        )
        logger.info(
            f"\n[Telemetry Log] Model: {record.model_name} ({record.provider})\n"
            f" - Total Latency: {record.latency_ms:.2f}ms\n"
            f" - TTFT: {ttft_text}\n"
            f" - Tokens: Input={record.usage.input_tokens} | Output={record.usage.output_tokens} | Cached={record.usage.cache_read_tokens}\n"
            f" - Estimated Cost: ${record.estimated_cost_usd:.6f}\n"
        )

    def _save_to_persistent_storage(self, record: TelemetryRecord):
        # In production, you would append this to an active SQLite table,
        # a JSON Lines file, or stream it to a centralized logging system.
        pass
# ==========================================
# Example Usage
# ==========================================
if __name__ == "__main__":
    print("Simulating an API call with Telemetry Tracking...")

    # Simulate calling Claude 3.5 Sonnet with prompt caching
    with TelemetryCollector(model_name="claude-3-5-sonnet", provider="Anthropic") as telemetry:
        # Simulate network latency before first token
        time.sleep(0.4)
        telemetry.record_first_token()

        # Simulate streaming generation
        time.sleep(0.8)

        # Mock response usage payload returned from Anthropic's API
        mock_api_usage = {
            "input_tokens": 1200,
            "output_tokens": 350,
            "cache_read_input_tokens": 8000,
            "cache_creation_input_tokens": 0
        }
        telemetry.set_usage(mock_api_usage)
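The `_save_to_persistent_storage` stub above could be filled in many ways. As one possible sketch, the snippet below appends each record as a single JSON object per line (JSON Lines). `DemoRecord` is a hypothetical stand-in for `TelemetryRecord` so the example runs on its own, and the temporary file path is only for demonstration.

```python
import json
import os
import tempfile
from dataclasses import dataclass, asdict
from decimal import Decimal


@dataclass
class DemoRecord:  # hypothetical stand-in for TelemetryRecord
    model_name: str
    estimated_cost_usd: Decimal
    latency_ms: float


def append_record(path: str, record) -> None:
    """Append one dataclass record to a JSON Lines file."""
    # default=str serializes Decimal as a string, so cost values stay exact
    line = json.dumps(asdict(record), default=str)
    with open(path, "a", encoding="utf-8") as log_file:
        log_file.write(line + "\n")


log_path = os.path.join(tempfile.mkdtemp(), "telemetry.jsonl")
append_record(log_path, DemoRecord("claude-3-5-sonnet", Decimal("0.011250"), 1203.4))
append_record(log_path, DemoRecord("gpt-4o", Decimal("0.004000"), 850.0))

with open(log_path, encoding="utf-8") as log_file:
    for line in log_file:
        print(line.strip())
```

Because `asdict` recurses into nested dataclasses, the same helper should also handle a record that embeds a usage dataclass.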
Best Practices for Scaling Agent Observability
As your agent fleet grows from a single prototype to dozens of concurrent workers, managing telemetry data requires careful system design. Here are three critical patterns to follow:
1. Scalable Log Processing (Iterating Over File Objects)
As your agent runs continuously, its telemetry logs will grow rapidly. If you attempt to load an entire telemetry log file into memory to calculate daily spending or average latency, you risk crashing your application due to memory exhaustion.
Instead, always stream and parse logs line-by-line using Python’s file iteration protocols. This ensures your memory footprint remains constant, whether you are processing 10 logs or 10 million.
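import json
from decimal import Decimal

def calculate_daily_spend(log_filepath: str) -> Decimal:
    # Assumes the file holds one day's records, one JSON object per line.
    total_spend = Decimal("0.00")
    # Iterating over the file object in the for loop reads one line at a
    # time, so memory use stays flat regardless of file size.
    with open(log_filepath, "r", encoding="utf-8") as log_file:
        for line in log_file:
            if not line.strip():
                continue  # skip blank lines
            record = json.loads(line)
            # str() avoids float rounding artifacts if the cost was stored as a JSON number
            total_spend += Decimal(str(record.get("estimated_cost_usd", "0.00")))
    return total_spend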
2. Implement Hard Guardrails (Alert Thresholds)
Telemetry is only useful if it can trigger action. Implement a lightweight control loop that inspects telemetry records in real-time. Define clear thresholds for:
- Budget per Window: If spending over the last 10 minutes exceeds $2.00, temporarily suspend agent execution or force-downgrade to a cheaper model.
- Latency Degradation: If the 90th percentile of latency over the last 10 steps exceeds a set threshold, fall back to non-streaming modes or switch to a faster model.
- Token Spikes: If a single prompt exceeds a specific token limit, automatically trigger a context-truncation function before sending the payload.
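The budget-per-window rule can be sketched with a sliding window over recent costs. The `SpendWindow` class below is a hypothetical illustration using the $2.00 per 10 minutes figure from the list above; timestamps are passed in explicitly so the behavior is easy to follow.

```python
import time
from collections import deque
from decimal import Decimal
from typing import Deque, Optional, Tuple


class SpendWindow:
    """Tracks spend over a sliding time window and flags budget breaches."""

    def __init__(self, limit_usd: Decimal, window_seconds: float = 600.0) -> None:
        self.limit_usd = limit_usd
        self.window_seconds = window_seconds
        self._events: Deque[Tuple[float, Decimal]] = deque()
        self._total = Decimal("0")

    def record(self, cost_usd: Decimal, now: Optional[float] = None) -> bool:
        """Add a cost; return True if the window total now exceeds the limit."""
        now = time.monotonic() if now is None else now
        self._events.append((now, cost_usd))
        self._total += cost_usd
        # Evict events that have aged out of the window
        while self._events and now - self._events[0][0] > self.window_seconds:
            _, old_cost = self._events.popleft()
            self._total -= old_cost
        return self._total > self.limit_usd


guard = SpendWindow(limit_usd=Decimal("2.00"))
print(guard.record(Decimal("1.50"), now=0.0))    # 1.50 in window: under the limit
print(guard.record(Decimal("0.75"), now=60.0))   # 2.25 in window: over the limit
print(guard.record(Decimal("0.10"), now=650.0))  # event at t=0 expired: 0.85 in window
```

A `True` result is the signal to suspend the agent or force a downgrade; what happens next is up to your control loop.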
3. Handle Dynamic Pricing Safely
API pricing is a moving target. While keeping a static snapshot of model rates in your codebase is a great starting point, you must design your telemetry system to handle missing or outdated pricing data gracefully.
If a model isn't in your database, return a status of "unknown" and log the transaction using raw token counts. This prevents your entire agentic loop from crashing simply because a provider released a new model version overnight.
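One way to express this "unknown" status is to return an explicit result object instead of a bare number. The sketch below is an assumption about how that could look: `CostEstimate`, `estimate_input_cost`, and the one-entry rate table are hypothetical, and only input tokens are priced to keep it short.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Dict, Optional


@dataclass(frozen=True)
class CostEstimate:
    amount_usd: Optional[Decimal]  # None when no rate is known
    status: str                    # "priced" or "unknown"
    input_tokens: int              # raw count is kept either way


# Hypothetical rate table: USD per million input tokens only, for brevity
INPUT_RATES: Dict[str, Decimal] = {"gpt-4o": Decimal("2.50")}


def estimate_input_cost(model: str, input_tokens: int) -> CostEstimate:
    rate = INPUT_RATES.get(model)
    if rate is None:
        # No crash and no fake 0.00: the record can be re-priced later
        return CostEstimate(None, "unknown", input_tokens)
    amount = Decimal(input_tokens) * rate / Decimal("1000000")
    return CostEstimate(amount, "priced", input_tokens)


print(estimate_input_cost("gpt-4o", 1_000_000))
print(estimate_input_cost("brand-new-model", 1_000_000))
```

Keeping the raw token count on the record means unknown entries can be priced retroactively once the rate table is updated.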
Conclusion: Telemetry is the Nervous System of AI
Observability is not an afterthought or a "nice-to-have" feature to be added right before deployment. For autonomous, self-improving AI agents, telemetry is their nervous system. It provides the sensory data required for the agent to understand its environment, evaluate its own efficiency, and make intelligent decisions about resource allocation.
By building a robust telemetry layer that tracks costs, normalizes token usage, and decomposes latency, you transform your agent from an unpredictable black box into a reliable, cost-controlled, and highly performant production system.
Let's Discuss
- How are you currently tracking token consumption and prompt caching efficiency in your agentic workflows?
- If your agent detects that it is approaching its hourly budget limit, what fallback strategy do you think is most effective: pausing execution, truncating context, or switching to a cheaper model?