By design, Large Language Models (LLMs) are non-deterministic. Even with an identical prompt, they can return different answers, trigger the wrong API, leak sensitive personal data, or initiate a costly chain of requests that evaporates a monthly cloud budget in seconds. For engineers managing production systems, this isn't an abstract risk — it's the nightmare scenario that keeps them up at night.
The solution does not lie in hoping for better models. Instead, it lies in a deterministic guardrail layer that governs the agent, regardless of the model's output. This article explores four strategic pillars of such an architecture, all built using FastAPI.
| Pillar | Technology | Purpose |
|---|---|---|
| Governance Layer | FastAPI middleware | Compliance, PII Masking, & Cost Control |
| Resource Sandboxing | Dependency Injection | Minimizing the "Blast Radius" |
| Observability & Traceability | OpenTelemetry + OTLP | Visualizing Chain of Thought (CoT) |
| Async ROI | asyncio event loop | Infrastructure Cost Optimization |
Pillar 1: Governance Layer – FastAPI Middleware
What is middleware and why is it a perfect place for guardrails?
In web architecture, middleware is logic executed during the lifecycle of every HTTP request — both before it reaches the intended handler and after the response is generated but before it reaches the client. Think of it as an airport security checkpoint: every passenger (request) must pass through it without exception.
In the context of AI agents, FastAPI middleware intercepts the agent's output before it reaches the end user. This provides a single, centralized layer to enforce compliance and safety policies, regardless of how many specialized agents are operating within the system.
The golden rule: guardrails are NOT part of the agent. The agent is a black box that can behave unpredictably. Guardrails must exist as an external control layer, operating independently of the agent's internal logic.
Implementation: Compliance Middleware
The middleware below intercepts every agent response and performs three critical operations: compliance validation, PII masking, and cost tracking.
```python
# middleware/guardrails.py
import logging
import time

from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response, JSONResponse
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
from prometheus_client import Histogram, Counter

logger = logging.getLogger(__name__)

# ── Prometheus Metrics ──────────────────────────────────────────
# The Histogram tracks the distribution of response times (not just the average)
REQUEST_DURATION = Histogram(
    "agent_request_duration_ms",
    "Agent response time in ms",
    buckets=[100, 500, 1000, 5000, 15000, 30000, 60000],
)
# The Counter increases monotonically — ideal for anomaly detection alerts
POLICY_VIOLATIONS = Counter(
    "agent_policy_violations_total", "Number of blocked responses",
    labelnames=["reason"],
)
PII_DETECTIONS = Counter(
    "agent_pii_detections_total", "Number of detected PII entities",
    labelnames=["entity_type"],
)

# ── Prohibited Phrases (Prompt Injection / Compliance) ──────────
FORBIDDEN_PHRASES = [
    "ignore previous instructions",
    "ignore all instructions",
    "you are now",  # classic prompt injection attack
    "disregard your",
]


class AgentGuardrailsMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, languages: list[str] | None = None):
        super().__init__(app)
        # Initialize ONCE in the constructor — NLP models are resource-heavy (~200ms);
        # we want to avoid loading them per-request.
        # Note: the default AnalyzerEngine ships with English models only; other
        # languages (e.g. 'pl') require configuring an NlpEngineProvider first.
        self.languages = languages or ['en', 'pl']
        self.analyzer = AnalyzerEngine()
        self.anonymizer = AnonymizerEngine()

    async def dispatch(self, request: Request, call_next):
        start_time = time.monotonic()

        # ── STEP 1: Call the proper handler (agent) ───────────────
        # call_next passes the request deeper into the middleware stack
        # until it reaches the intended endpoint. The agent performs all its logic:
        # querying the LLM, executing tools, and building the response.
        # We wait, and only THEN do we receive the response object.
        response = await call_next(request)

        # ── STEP 2: Collect the streaming body ───────────────────
        # Starlette streams responses chunk-by-chunk — there is
        # no single 'response.text'. We must assemble the body manually.
        # Note: this buffers the entire response in RAM.
        body = b''
        async for chunk in response.body_iterator:
            body += chunk
        text = body.decode('utf-8')

        # ── STEP 3: Compliance check ─────────────────────────────
        # Check if the agent returned any prohibited content.
        # We block the response here, before anything reaches the client.
        violation = self._check_policy(text)
        if violation:
            POLICY_VIOLATIONS.labels(reason=violation).inc()
            logger.warning(f'Policy violation [{violation}]')
            return JSONResponse(
                status_code=403,
                content={'error': 'Response blocked', 'reason': violation},
            )

        # ── STEP 4: PII masking with Presidio ────────────────────
        # Presidio uses NLP models (spaCy) instead of pure RegEx.
        # It understands context: '48123456789' as a phone number
        # in 'call me at...' but not when it's an Order ID.
        masked_text = self._mask_pii(text)

        # ── STEP 5: Metrics ──────────────────────────────────────
        # duration_ms is sent to the Prometheus Histogram, allowing us
        # to calculate p50/p95/p99 latency and trigger alerts on threshold violations.
        duration_ms = (time.monotonic() - start_time) * 1000
        REQUEST_DURATION.observe(duration_ms)
        logger.info(f'Agent: {duration_ms:.0f}ms, {len(text)} chars')

        # ── STEP 6: Return the response ──────────────────────────
        # Crucial: replicate the original status_code and headers.
        # Without this you lose Content-Type and bypass original 403/404 codes.
        headers = dict(response.headers)
        headers.pop('content-length', None)  # masking may change the body length
        return Response(
            content=masked_text.encode('utf-8'),
            status_code=response.status_code,  # keep the original code
            headers=headers,                   # keep the original headers
            media_type=response.media_type,
        )

    def _check_policy(self, text: str) -> str | None:
        """Returns the violation description or None if the text is compliant."""
        lower = text.lower()
        for phrase in FORBIDDEN_PHRASES:
            if phrase in lower:
                return f'prompt_injection: "{phrase}"'
        return None

    def _mask_pii(self, text: str) -> str:
        # Presidio operates in two stages:
        # 1. analyzer.analyze() — detects entities and returns their positions
        # 2. anonymizer.anonymize() — replaces them according to the defined strategy
        results = []
        for lang in self.languages:
            results.extend(self.analyzer.analyze(
                text=text, language=lang,
                entities=['PERSON', 'EMAIL_ADDRESS', 'PHONE_NUMBER',
                          'IBAN_CODE', 'CREDIT_CARD', 'IP_ADDRESS'],
                score_threshold=0.6,  # eliminates most false positives
            ))
        if not results:
            return text
        # Log the entity TYPE only — never the value itself (that would be a PII leak!)
        for r in results:
            PII_DETECTIONS.labels(entity_type=r.entity_type).inc()
            logger.info(f'PII: {r.entity_type} score={r.score:.2f}')
        anonymized = self.anonymizer.anonymize(
            text=text,
            analyzer_results=results,
            operators={
                "PERSON": OperatorConfig("replace", {"new_value": "[PERSON]"}),
                "EMAIL_ADDRESS": OperatorConfig("replace", {"new_value": "[EMAIL]"}),
                "PHONE_NUMBER": OperatorConfig("replace", {"new_value": "[PHONE]"}),
                "IBAN_CODE": OperatorConfig("replace", {"new_value": "[IBAN]"}),
                "CREDIT_CARD": OperatorConfig("replace", {"new_value": "[CARD]"}),
                "IP_ADDRESS": OperatorConfig("replace", {"new_value": "[IP]"}),
            },
        )
        return anonymized.text
```
What happens step by step?
1. `call_next(request)` — The middleware passes the request down the stack until it reaches the intended endpoint (the agent). The agent performs its logic: querying the LLM, calling tools, and constructing the response. We wait for the final output.
2. `response.body_iterator` — Starlette streams responses chunk-by-chunk, so there is no single `response.text` field available. We must manually reassemble the body in a loop. Warning: this buffers the entire response in RAM; for extremely large payloads, a size limit should be implemented.
3. `_check_policy(text)` — We validate the agent's output for prohibited content, including prompt injection attempts, off-limits topics, or profanity. If a violation is detected, we block the request immediately — before it ever reaches the client — and return a `403 Forbidden`.
4. `_mask_pii(text)` — Microsoft Presidio in a two-stage process: first, `analyzer.analyze()` detects entities and their positions by understanding linguistic context; then, `anonymizer.anonymize()` replaces them with placeholders. Crucial: we log only the entity TYPE, never the actual value.
5. `REQUEST_DURATION.observe()` — Response time is recorded in a Prometheus Histogram. A Histogram (rather than a simple Counter) allows us to calculate p50/p95/p99 latency and build percentile-based alerts.
6. `Response(status_code=…, headers=…)` — We return the final response while preserving the original status code and headers. Without this, you lose metadata like `Content-Type` or `Cache-Control`, and the client might receive a `200 OK` even if the underlying logic returned a `404`.
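A quick way to see the guardrail end to end. This is a minimal sketch assuming a hypothetical `/agent/chat` endpoint sitting behind the middleware; when the agent's raw reply contains a forbidden phrase, the client never sees it:

```python
from fastapi.testclient import TestClient
from main import app  # the app with AgentGuardrailsMiddleware registered

client = TestClient(app)
resp = client.post('/agent/chat', json={'query': 'What is the status of order 1017?'})

if resp.status_code == 403:
    # The agent's raw output tripped _check_policy() and was replaced wholesale
    print(resp.json())  # {'error': 'Response blocked', 'reason': 'prompt_injection: ...'}
else:
    # PII has already been masked by the time the body reaches the client
    print(resp.text)
```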
Why is this better than internal agent guardrails? Because middleware operates across ALL endpoints simultaneously. Whether you have one agent or fifty, policies are defined once and enforced centrally.
Registering the middleware
```python
# main.py
from fastapi import FastAPI

from middleware.guardrails import AgentGuardrailsMiddleware
# CostTrackingMiddleware and AuthMiddleware are illustrative placeholders —
# substitute your own implementations
from middleware.cost import CostTrackingMiddleware
from middleware.auth import AuthMiddleware

app = FastAPI()

# Middleware stack — order matters!
# Last in = first executed (LIFO)
app.add_middleware(AgentGuardrailsMiddleware)
app.add_middleware(CostTrackingMiddleware)
app.add_middleware(AuthMiddleware)  # <-- this executes first
```
Middleware is executed in reverse order of registration (LIFO). Authentication should always be registered last to ensure it triggers first — there is no point processing PII masking for an unauthorized request.
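A minimal sketch demonstrating the LIFO ordering; the `Tag` middleware here is purely illustrative:

```python
from fastapi import FastAPI
from starlette.middleware.base import BaseHTTPMiddleware

class Tag(BaseHTTPMiddleware):
    def __init__(self, app, name: str):
        super().__init__(app)
        self.name = name

    async def dispatch(self, request, call_next):
        print(f'-> enter {self.name}')
        response = await call_next(request)
        print(f'<- exit  {self.name}')
        return response

app = FastAPI()
app.add_middleware(Tag, name='guardrails')  # registered first -> runs last
app.add_middleware(Tag, name='auth')        # registered last  -> runs first

@app.get('/ping')
async def ping():
    return {'ok': True}

# A request to /ping prints:
#   -> enter auth
#   -> enter guardrails
#   <- exit  guardrails
#   <- exit  auth
```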
Pillar 2: Resource Sandboxing – Dependency Injection
The problem: what if the agent goes in the wrong direction?
Imagine an AI agent with access to your production database. The LLM hallucinates, generates a destructive SQL query, and within seconds your data is gone. This isn't science fiction — it's a reality already faced by some early adopters of AI agents.
Blast radius is an SRE term defining the maximum potential damage caused by a single component failure. For AI agents, we minimize this through Resource Sandboxing: the agent is granted only the minimum necessary permissions, operates in read-only mode, and remains strictly isolated from the rest of the infrastructure.
Dependency Injection in FastAPI
FastAPI's built-in DI system lets us inject dependencies from the outside rather than creating them within the class. This gives us total control over exactly which resources the agent can access.
```python
# dependencies/sandboxed_db.py
import re

from fastapi import Depends
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

# Separate connection pool — read-only.
# DB account with SELECT-only privileges; no INSERT/UPDATE/DELETE.
READONLY_DB_URL = 'postgresql://agent_ro:pass@db-replica/prod'

readonly_engine = create_engine(
    READONLY_DB_URL,
    pool_size=5,      # Small pool — prevents the agent from exhausting connections
    max_overflow=2,   # Maximum of 7 total concurrent connections
    pool_timeout=10,  # Timeout — ensures the agent doesn't wait indefinitely
    connect_args={'options': '-c default_transaction_read_only=on'},
)


class SandboxedDatabase:
    """Database with enforced read-only access at the connection level."""

    def __init__(self, session: Session):
        self._session = session
        self._query_count = 0
        self._max_queries = 10  # Rate limit per request

    def execute_query(self, sql: str, params: dict | None = None):
        if self._query_count >= self._max_queries:
            raise PermissionError('Query limit exceeded for this agent session')
        # Block dangerous operations at the application level.
        # Word-boundary match avoids false positives on e.g. a 'GRANTED' column.
        forbidden = ['DROP', 'DELETE', 'TRUNCATE', 'ALTER', 'GRANT']
        for keyword in forbidden:
            if re.search(rf'\b{keyword}\b', sql.upper()):
                raise PermissionError(f'Forbidden SQL keyword: {keyword}')
        self._query_count += 1
        return self._session.execute(text(sql), params or {})


def get_sandboxed_db():
    with Session(readonly_engine) as session:
        yield SandboxedDatabase(session)


# In main.py — AgentRequest is a Pydantic model defined elsewhere
@app.post('/agent/query')
async def agent_query(
    request: AgentRequest,
    db: SandboxedDatabase = Depends(get_sandboxed_db),  # Injection
):
    # The agent has access ONLY to db.execute_query().
    # No access to the engine, session, or other databases.
    result = db.execute_query(request.sql)
    return {'data': [dict(row) for row in result.mappings()]}  # JSON-serializable rows
```
Layered security – Defense in Depth
Four independent layers of protection:
- Layer 1 – DB permissions: `agent_ro` has only `SELECT` at the PostgreSQL level. Even if the agent constructs a `DELETE`, the database will reject it.
- Layer 2 – Connection options: `default_transaction_read_only=on` is enforced at the connection level. An additional lock on the SQL session itself.
- Layer 3 – Application validation: We scan for forbidden keywords in Python code. Any violation is logged and an alert is triggered.
- Layer 4 – Pool limits: The agent is restricted to 7 concurrent connections and 10 queries per request. This prevents it from overwhelming the infrastructure.

Dependency Injection is more than a design pattern — it is a security mechanism. When an agent is a function that accepts a `SandboxedDatabase` as a parameter, it is physically impossible for it to access an unrestricted database. The interface IS the guardrail.
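To make that concrete, here is a minimal sketch of both paths through the sandbox (the `orders` table is hypothetical):

```python
from sqlalchemy.orm import Session

with Session(readonly_engine) as session:
    db = SandboxedDatabase(session)

    # Legitimate read: passes application validation, runs on the read replica
    db.execute_query('SELECT id, status FROM orders LIMIT 5')

    # Destructive query: layer 3 raises before the database is ever reached
    try:
        db.execute_query('DELETE FROM orders')
    except PermissionError as exc:
        print(exc)  # Forbidden SQL keyword: DELETE
```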
Tool Registry
We apply the same pattern to the agent's tools:
```python
class AgentToolkit:
    def __init__(self, allowed_tools: list[str]):
        # _search_products / _get_order_status are assumed to be implemented
        # as private methods of this class
        self._tools = {
            'search_products': self._search_products,    # OK
            'get_order_status': self._get_order_status,  # OK
            # 'send_email': ...   — NOT registered = unavailable
            # 'execute_code': ... — NOT registered = unavailable
        }
        self._available = {k: v for k, v in self._tools.items()
                           if k in allowed_tools}

    def call(self, tool_name: str, **kwargs):
        if tool_name not in self._available:
            raise PermissionError(f'Tool not available: {tool_name}')
        return self._available[tool_name](**kwargs)
```
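Wiring the toolkit through Dependency Injection closes the loop. A sketch, assuming the `app` and `AgentRequest` model from earlier; the support-role allowlist and `order_id` field are hypothetical:

```python
from fastapi import Depends

def get_support_toolkit() -> AgentToolkit:
    # Only the tools this agent's role requires — nothing else exists for it
    return AgentToolkit(allowed_tools=['search_products', 'get_order_status'])

@app.post('/agent/support')
async def support_agent(
    request: AgentRequest,
    toolkit: AgentToolkit = Depends(get_support_toolkit),
):
    # The agent can only reach tools present in its allowlist;
    # 'send_email' raises PermissionError no matter what the LLM decides
    return {'result': toolkit.call('get_order_status', order_id=request.order_id)}
```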
Pillar 3: Observability & Traceability – OpenTelemetry
Why traditional APM doesn't work for AI agents
Tools like Datadog or New Relic were designed for synchronous, short-lived HTTP requests: request arrives, processed in 50ms, response sent. Core metrics focus on latency, error rate, and throughput.
AI agents fundamentally break this model. A single request can last 30–120 seconds, during which the agent executes a Chain of Thought (CoT) — a sequence of reasoning steps, each involving an LLM call and potential tool executions. Traditional APM sees one long, opaque request with zero visibility into what happened inside.
OpenTelemetry (OTel) is an open-source standard for distributed tracing, metrics, and logs. It provides a vendor-agnostic framework independent of the backend (Jaeger, Tempo, Datadog, Honeycomb). Instrument your code once, export to any destination.
Instrumenting FastAPI with OpenTelemetry
```python
# observability/tracing.py
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor


def setup_telemetry(app):
    """Call once at startup, before the first request is served."""
    provider = TracerProvider(
        resource=Resource.create({
            'service.name': 'agent-service',
            'service.version': '2.1.0',
            'deployment.environment': 'production',
        })
    )
    exporter = OTLPSpanExporter(endpoint='http://otel-collector:4317')
    provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(provider)

    # Auto-instrumentation — all FastAPI requests are automatically traced
    FastAPIInstrumentor.instrument_app(app)
    # HTTP client instrumentation — LLM API calls are also tracked
    HTTPXClientInstrumentor().instrument()
```
Tracing the Chain of Thought – manual spans
```python
# agent/reasoning.py
from opentelemetry import trace

tracer = trace.get_tracer('agent.reasoning')

MAX_STEPS = 10  # assumed constant; call_llm / execute_tool / calculate_cost defined elsewhere


async def execute_agent(user_query: str) -> str:
    with tracer.start_as_current_span('agent.execute') as agent_span:
        agent_span.set_attribute('agent.query', user_query)
        agent_span.set_attribute('agent.model', 'claude-3-5-sonnet')
        prompt = user_query  # prompt construction elided for brevity
        for step in range(MAX_STEPS):
            with tracer.start_as_current_span(f'cot.step_{step}') as step_span:
                step_span.set_attribute('cot.step_number', step)
                with tracer.start_as_current_span('llm.call') as llm_span:
                    # character count as a rough proxy; prefer the provider's token count
                    llm_span.set_attribute('llm.prompt_tokens', len(prompt))
                    response = await call_llm(prompt)
                    llm_span.set_attribute('llm.completion_tokens',
                                           response.usage.completion_tokens)
                    llm_span.set_attribute('llm.cost_usd',
                                           calculate_cost(response.usage))
                if response.tool_call:
                    with tracer.start_as_current_span('tool.call') as tool_span:
                        tool_span.set_attribute('tool.name', response.tool_call.name)
                        result = await execute_tool(response.tool_call)
                        tool_span.set_attribute('tool.success', True)
                        # result would be folded into the next prompt (elided)
                if response.is_final:
                    break
        return response.content
```
What you see in Grafana Tempo / Jaeger
```
agent.execute                       [0ms ────────────────────── 45,230ms]
  cot.step_0                        [12ms ─────── 8,400ms]
    llm.call                        [15ms ─── 7,800ms]      tokens: 412/623
    tool.call: search_products      [8,410ms ─ 8,890ms]     success: true
  cot.step_1                        [8,900ms ─────── 28,100ms]
    llm.call                        [8,900ms ─ 27,600ms]    tokens: 1024/892
    tool.call: get_order_status     [27,610ms ─ 28,080ms]
  cot.step_2 (final)                [28,100ms ─── 45,200ms]
    llm.call                        [28,100ms ─ 45,100ms]   tokens: 2048/312
```
You can immediately see that cot.step_1 took 19 seconds — specifically the llm.call with 1024 input tokens. This provides a clear signal for optimization: prompt caching, switching models for that step, or context compression.
Without OTel: "request took 45 seconds."
With OTel: "step_1.llm.call took 18.7s with 1024 input tokens — probable cause: context window overhead."
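Failures deserve the same visibility as latency. A sketch using the standard OTel span API (`execute_tool` is the helper assumed above) so that failed tool calls show up marked as errors in Tempo/Jaeger instead of vanishing silently:

```python
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer('agent.reasoning')

async def traced_tool_call(tool_call):
    with tracer.start_as_current_span('tool.call') as span:
        span.set_attribute('tool.name', tool_call.name)
        try:
            result = await execute_tool(tool_call)
            span.set_attribute('tool.success', True)
            return result
        except Exception as exc:
            # Attach the exception as a span event and flag the span as errored
            span.record_exception(exc)
            span.set_status(Status(StatusCode.ERROR, str(exc)))
            span.set_attribute('tool.success', False)
            raise
```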
Pillar 4: Async ROI – The Event Loop Economy
The problem with synchronous code
Imagine a call center. In a synchronous model, one employee handles one customer at a time. While the customer spends 30 seconds looking up their order number, the employee sits idle. To handle 100 customers simultaneously, you need 100 employees.
In server terms: one thread per request. While a thread waits for an LLM response (typically 2–30 seconds), it holds its memory and a scheduler slot while doing no useful work. Scaling to 1,000 concurrent agents requires 1,000 threads. A server with 32 vCPUs can only handle ~50–100 threads efficiently before context-switching overhead degrades throughput.
Asyncio: cooperative multitasking
Python's asyncio implements cooperative multitasking. One thread, one event loop, many coroutines. When a coroutine waits for I/O (network, disk, LLM API), it yields control back to the event loop, which immediately starts processing another coroutine.
```python
import httpx
import requests  # sync client, shown only for contrast
# LLM_API, app, and AgentRequest are assumed to be defined elsewhere

# Synchronous — blocks the thread
def handle_request_sync(user_query: str) -> str:
    response = requests.post(LLM_API, json={...})  # <-- BLOCKS for ~8 seconds
    return response.json()['content']              # the thread sits idle meanwhile

# Asynchronous — releases the thread while waiting
async def handle_request_async(user_query: str) -> str:
    async with httpx.AsyncClient() as client:
        response = await client.post(LLM_API, json={...})
        # await = 'wait for the result, but release the event loop'.
        # During the 8-second wait, the event loop processes other requests.
        return response.json()['content']

@app.post('/agent/query')
async def agent_endpoint(request: AgentRequest):
    result = await handle_request_async(request.query)
    return {'response': result}
```
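The payoff becomes visible once requests arrive in bulk. A minimal sketch reusing `handle_request_async` from above:

```python
import asyncio

async def run_batch(queries: list[str]) -> list[str]:
    # 100 queries of ~8s each complete in roughly 8s of wall-clock time,
    # not ~800s, because every coroutine yields the loop while awaiting the LLM
    return await asyncio.gather(*(handle_request_async(q) for q in queries))
```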
Concrete math: Synchronous vs Async
Based on a typical agent workload (8s average latency per LLM call):
| Metric | Sync (Flask/Django) | Async (FastAPI) |
|---|---|---|
| Concurrent requests / vCPU | ~15–25 | ~500–2000 |
| RAM per 1,000 requests | ~8–16 GB (threads) | ~0.5–1 GB (coroutines) |
| CPU utilization during LLM wait | ~2–5% (blocking) | ~70–90% (other requests) |
| Instances required (1,000 req/s) | ~40–60 | ~4–8 |
| Estimated monthly cost (AWS) | ~$4,800–7,200 | ~$480–960 |
A 5–10× cost reduction follows directly from the concurrency arithmetic: async allows a single thread to handle hundreds of concurrent LLM wait states. For high I/O latency workloads — exactly what AI agents are — this is the most significant cost optimization you can implement without a complete architectural overhaul.
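Little's law ties the table's rows together. A back-of-the-envelope sketch; the per-instance capacities are assumptions consistent with the table, not benchmarks:

```python
import math

rps, latency_s = 1000, 8
in_flight = rps * latency_s            # Little's law: L = lambda * W = 8,000 concurrent requests

sync_capacity_per_instance = 160       # assumed: threads one sync worker pool sustains
async_capacity_per_instance = 1600     # assumed: coroutines one event loop sustains

print(math.ceil(in_flight / sync_capacity_per_instance))   # 50 sync instances
print(math.ceil(in_flight / async_capacity_per_instance))  # 5 async instances
```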
Pitfalls of asynchronous code
- CPU-bound tasks block the event loop. Heavy computation (parsing large JSON, data transformations) freezes everything. Use `asyncio.to_thread()` or a `ProcessPoolExecutor`, as shown below.
- Synchronous libraries block. Using `requests` or `psycopg2` from async code blocks the event loop. Use async counterparts: `httpx`, `asyncpg`, `aiofiles`.
- Debugging is harder. Coroutine stack traces are less readable. Use `asyncio.current_task()` and structured logging.
```python
import asyncio
# heavy_json_parse and large_response are illustrative placeholders

# WRONG — a sync operation blocks the event loop
async def bad_agent_step():
    data = heavy_json_parse(large_response)  # Blocks everyone!
    return data

# CORRECT — delegate CPU-bound work to a thread pool
async def good_agent_step():
    data = await asyncio.to_thread(heavy_json_parse, large_response)
    return data
```
Summary: Production-Grade Architecture
Four pillars create a cohesive control layer around the non-deterministic agent:
- Middleware — enforces policies regardless of what the agent generates
- Dependency Injection — isolates the agent from infrastructure, minimizes blast radius
- OpenTelemetry — provides visibility inside long, multi-step Chain of Thought processes
- Async FastAPI — maximizes hardware utilization, reduces costs 5–10×
The common denominator: the agent is a black box — and that is perfectly fine. You don't need to control how the LLM thinks. You control the system boundaries: what the agent can read, what it can send, how long it can run, and what the client is allowed to see. This is engineering, not magic.
Deterministic guardrails for non-deterministic agents. Don't try to "fix" non-determinism — surround it with deterministic infrastructure.
Originally published at invra.co
