You've shipped microservices that handle millions of requests. You've designed event-driven systems that wake you up at 3am. Now your CTO is asking you to "add AI agents." This guide is written for engineers who know what a race condition is and have opinions about retry logic.
The Mental Model Shift
Most AI agent frameworks are demos dressed up as infrastructure. Before touching any library, internalize this:
An AI agent is a control loop with a probabilistic step.
That's it. The "intelligence" is just one step in a workflow, usually a call to an LLM that produces a decision. Everything around it (state management, tool dispatch, error handling, observability) is engineering. And that's where senior engineers have an actual edge.
```python
while not done:
    observation = perceive(environment)
    action = llm_decide(observation, memory, tools)  # the probabilistic step
    result = execute(action)
    memory.update(result)
```
Once you see it this way, your existing intuitions about fault tolerance, idempotency, and distributed systems apply directly.
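Here is a minimal runnable sketch of that loop. The LLM is replaced by a hypothetical `llm_decide` that makes a weighted random choice, and the "environment" is just a counter. The point is the shape of the loop, including the hard step cap that guarantees termination even if the probabilistic step never decides to stop.

```python
import random
from dataclasses import dataclass, field


@dataclass
class Memory:
    """Hypothetical append-only memory for this sketch."""
    events: list[str] = field(default_factory=list)

    def update(self, result: str) -> None:
        self.events.append(result)


def llm_decide(observation: int, memory: Memory, rng: random.Random) -> str:
    """Stand-in for the LLM call: a weighted random choice (the probabilistic step)."""
    if observation >= 3:  # enough work done: a well-behaved model would stop here
        return "terminate"
    return rng.choices(["work", "think"], weights=[0.8, 0.2])[0]


def run_loop(seed: int, max_steps: int = 50) -> Memory:
    rng = random.Random(seed)  # seeded so runs are reproducible in tests
    memory = Memory()
    work_done = 0  # the "environment" is just a counter
    for _ in range(max_steps):  # hard cap: terminate even if the policy never does
        observation = work_done                        # perceive
        action = llm_decide(observation, memory, rng)  # decide (probabilistic)
        if action == "terminate":
            break
        if action == "work":                           # execute
            work_done += 1
        memory.update(action)                          # remember
    return memory
```

Seeding the random source is the test-time equivalent of pinning temperature and recording LLM responses. It makes a non-deterministic loop replayable.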
Architecture Patterns That Actually Scale
1. The Supervisor Pattern (Multi-Agent Orchestration)
For complex workflows, a single agent becomes a god object. Instead, build a supervisor that routes to specialized sub-agents.
```python
import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import Any


@dataclass
class AgentTask:
    task_id: str
    task_type: str
    payload: dict
    priority: int = 5  # higher value = scheduled earlier


AgentHandler = Callable[[dict], Awaitable[Any]]


class SupervisorAgent:
    def __init__(self):
        self._registry: dict[str, AgentHandler] = {}

    def register(self, task_type: str):
        """Decorator to register specialized sub-agents."""
        def decorator(fn: AgentHandler) -> AgentHandler:
            self._registry[task_type] = fn
            return fn
        return decorator

    async def dispatch(self, task: AgentTask) -> Any:
        handler = self._registry.get(task.task_type)
        if handler is None:
            raise ValueError(f"No agent registered for task type: {task.task_type}")
        return await handler(task.payload)

    async def run_parallel(self, tasks: list[AgentTask]) -> list[Any]:
        """Dispatch tasks concurrently.

        Higher-priority tasks are scheduled first, but everything runs concurrently,
        so priority only affects start order. Results come back in input order.
        """
        by_priority = sorted(range(len(tasks)), key=lambda i: tasks[i].priority, reverse=True)
        running = {i: asyncio.create_task(self.dispatch(tasks[i])) for i in by_priority}
        return await asyncio.gather(*(running[i] for i in range(len(tasks))))


supervisor = SupervisorAgent()


@supervisor.register("code_review")
async def code_review_agent(payload: dict) -> dict:
    # Specialized agent with its own system prompt and tools
    ...


@supervisor.register("test_generation")
async def test_generation_agent(payload: dict) -> dict:
    ...
```
The key insight: each sub-agent is stateless from the supervisor's perspective. State lives in your message store, not in the agent object.
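Here is a minimal sketch of what "stateless" means in practice. It assumes a hypothetical `InMemoryMessageStore` that stands in for a durable store. This version of `code_review_agent` keeps nothing on itself between calls, so any worker could serve the next request for the same conversation.

```python
import asyncio
from collections import defaultdict


class InMemoryMessageStore:
    """Hypothetical stand-in for a durable message store (a database, Redis, etc.)."""

    def __init__(self) -> None:
        self._messages: dict[str, list[dict]] = defaultdict(list)

    async def load(self, conversation_id: str) -> list[dict]:
        return list(self._messages[conversation_id])  # hand out a copy, never shared state

    async def append(self, conversation_id: str, message: dict) -> None:
        self._messages[conversation_id].append(message)


store = InMemoryMessageStore()


async def code_review_agent(payload: dict) -> dict:
    """Stateless: everything it needs comes from the payload or the store."""
    conversation_id = payload["conversation_id"]
    history = await store.load(conversation_id)
    # A real agent would call the LLM with history + payload here.
    reply = {"role": "assistant", "content": f"review #{len(history) + 1} of {payload['diff']}"}
    await store.append(conversation_id, reply)
    return reply


async def demo() -> list[dict]:
    # Two calls on the same conversation; the agent itself remembers nothing in between.
    first = await code_review_agent({"conversation_id": "c1", "diff": "a.py"})
    second = await code_review_agent({"conversation_id": "c1", "diff": "b.py"})
    return [first, second]
```

A real store also needs its own concurrency control (for example, optimistic versioning) when two workers append to the same conversation at once.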
2. Pipelines vs. Agents: Know Which One You Need
A pipeline is deterministic: every input follows the same steps. An agent is non-deterministic: the LLM decides what to do next. Don't use an agent when a pipeline will do.
| Use a Pipeline when... | Use an Agent when... |
|---|---|
| Steps are known at design time | Steps depend on runtime context |
| Latency matters (low variance needed) | Flexibility matters |
| You need 100% auditability | Best-effort is acceptable |
| Input/output schema is fixed | Open-ended task completion |
```python
# Pipeline: deterministic, predictable, fast
class DocumentPipeline:
    # extract_text, chunk_document, embed_chunks and store_vectors are your own
    # async step functions; each step's output is the next step's input.
    steps = [extract_text, chunk_document, embed_chunks, store_vectors]

    async def run(self, document: bytes) -> list[str]:
        result = document
        for step in self.steps:
            result = await step(result)
        return result  # the final step's output


# Agent: flexible, non-deterministic, slower
class ResearchAgent:
    async def run(self, query: str) -> str:
        # May call web_search 0-5 times, then summarize.
        # The LLM decides.
        ...
```
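The pipeline half is easy to exercise end to end. This sketch fills in the four step names from `DocumentPipeline` with hypothetical stubs: a fake hash-based "embedding", and content-addressed IDs instead of a real vector DB write. It shows the property that makes pipelines attractive. The same input always takes the same path and produces the same output, so a plain equality assertion is a valid test.

```python
import asyncio
import hashlib


# Hypothetical stand-in steps; each is a plain async function from one type to the next.
async def extract_text(document: bytes) -> str:
    return document.decode("utf-8")


async def chunk_document(text: str, size: int = 16) -> list[str]:
    return [text[i:i + size] for i in range(0, len(text), size)]


async def embed_chunks(chunks: list[str]) -> list[tuple[str, list[float]]]:
    # Fake "embedding": the first bytes of a hash scaled to [0, 1). Deterministic by construction.
    return [(c, [b / 256 for b in hashlib.sha256(c.encode()).digest()[:4]]) for c in chunks]


async def store_vectors(rows: list[tuple[str, list[float]]]) -> list[str]:
    # Return content-addressed IDs instead of writing to a real vector DB.
    return [hashlib.sha256(chunk.encode()).hexdigest()[:12] for chunk, _ in rows]


class DocumentPipeline:
    steps = [extract_text, chunk_document, embed_chunks, store_vectors]

    async def run(self, document: bytes) -> list[str]:
        result = document
        for step in self.steps:
            result = await step(result)
        return result
```

The agent half offers no equivalent guarantee. That asymmetry is the practical cost of choosing an agent.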
Advanced Python Patterns for AI Systems
Structured Outputs with Pydantic + Runtime Validation
Never parse LLM output with regex. Define your schema, enforce it, validate at runtime.
```python
from typing import Literal

import anthropic
import instructor
from pydantic import BaseModel, field_validator, model_validator


class ActionPlan(BaseModel):
    reasoning: str
    action: Literal["search", "summarize", "delegate", "terminate"]
    confidence: float
    tool_args: dict | None = None

    @field_validator("confidence")
    @classmethod
    def confidence_must_be_probability(cls, v: float) -> float:
        if not 0.0 <= v <= 1.0:
            raise ValueError("Confidence must be between 0 and 1")
        return round(v, 3)

    @model_validator(mode="after")
    def tool_args_required_for_search(self) -> "ActionPlan":
        if self.action == "search" and not self.tool_args:
            raise ValueError("tool_args required when action is 'search'")
        return self


# Using instructor for structured extraction
client = instructor.from_anthropic(anthropic.Anthropic())


def get_action(observation: str, available_tools: list) -> ActionPlan:
    # available_tools may be tool names or full tool schemas.
    # This client is synchronous: from async code, call it via asyncio.to_thread().
    return client.messages.create(
        model="claude-opus-4-5",
        max_tokens=1024,
        response_model=ActionPlan,
        messages=[{
            "role": "user",
            "content": f"Observation: {observation}\nAvailable tools: {available_tools}\nWhat action should we take?",
        }],
    )
```
Memory Architecture: Don't Reinvent the Wheel
Production agents need layered memory. Model it explicitly:
```python
import hashlib
import json
from abc import ABC, abstractmethod


class MemoryLayer(ABC):
    @abstractmethod
    async def store(self, key: str, value: dict) -> None: ...

    @abstractmethod
    async def retrieve(self, query: str, top_k: int = 5) -> list[dict]: ...


class WorkingMemory:
    """Short-term, in-process. Cleared between runs."""

    def __init__(self, max_messages: int = 20):
        self._buffer: list[dict] = []
        self._max_messages = max_messages

    def add(self, message: dict) -> None:
        self._buffer.append(message)
        self._trim()

    def _trim(self) -> None:
        # Sliding window: keep only the most recent messages.
        # This counts messages, not tokens; in practice, budget by tokens
        # using your model provider's tokenizer or token-counting API.
        overflow = len(self._buffer) - self._max_messages
        if overflow > 0:
            del self._buffer[:overflow]

    def snapshot(self) -> list[dict]:
        return list(self._buffer)


class EpisodicMemory(MemoryLayer):
    """Medium-term. Key episodes from past runs. Stored in vector DB."""
    # embed() and vector_db are placeholders for your embedding model and vector store client.

    async def store(self, key: str, value: dict) -> None:
        embedding = await embed(value["content"])
        await vector_db.upsert(id=key, vector=embedding, metadata=value)

    async def retrieve(self, query: str, top_k: int = 5) -> list[dict]:
        query_embedding = await embed(query)
        results = await vector_db.query(vector=query_embedding, top_k=top_k)
        return [r.metadata for r in results]


class AgentMemoryManager:
    """Orchestrates all memory layers."""

    def __init__(self):
        self.working = WorkingMemory()
        self.episodic = EpisodicMemory()

    async def get_context(self, query: str) -> list[dict]:
        working_context = self.working.snapshot()
        episodic_context = await self.episodic.retrieve(query, top_k=3)
        return self._merge(working_context, episodic_context)

    def _merge(self, working: list[dict], episodic: list[dict]) -> list[dict]:
        # Deduplicate (no ranking yet). Retrieved episodes go first so the most
        # recent working-memory messages stay at the end of the context.
        seen: set[str] = set()
        merged = []
        for item in episodic + working:
            # Hash a canonical JSON form so key order doesn't defeat deduplication
            content_hash = hashlib.md5(json.dumps(item, sort_keys=True, default=str).encode()).hexdigest()
            if content_hash not in seen:
                seen.add(content_hash)
                merged.append(item)
        return merged
```
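`WorkingMemory` above trims by message count. Below is a sketch of a token-budget version, under two stated assumptions. First, token cost is approximated at about 4 characters per token (`approx_tokens` is a placeholder heuristic, not a tokenizer). Second, the first message (the task) is pinned, so eviction never drops the goal the agent is working toward. `TokenBudgetMemory` is a hypothetical name.

```python
from collections import deque
from typing import Callable


def approx_tokens(message: dict) -> int:
    """Rough placeholder: about 4 characters per token. Swap in a real tokenizer."""
    return max(1, len(str(message.get("content", ""))) // 4)


class TokenBudgetMemory:
    """Working memory that evicts the oldest messages once a token budget is exceeded."""

    def __init__(self, max_tokens: int = 8000, count: Callable[[dict], int] = approx_tokens):
        self._max_tokens = max_tokens
        self._count = count
        self._pinned: dict | None = None  # the first message (the task); never evicted or counted
        self._buffer: deque[tuple[dict, int]] = deque()
        self._total = 0

    def add(self, message: dict) -> None:
        if self._pinned is None:
            self._pinned = message
            return
        cost = self._count(message)
        self._buffer.append((message, cost))
        self._total += cost
        # Evict oldest first, but always keep the newest message even if it alone is over budget
        while self._total > self._max_tokens and len(self._buffer) > 1:
            _, old_cost = self._buffer.popleft()
            self._total -= old_cost

    def snapshot(self) -> list[dict]:
        head = [self._pinned] if self._pinned is not None else []
        return head + [message for message, _ in self._buffer]
```

Caching each message's cost when it is added keeps eviction O(1) per message instead of re-tokenizing the whole buffer.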
Tool Use: The Sharp Edge
Tool execution is where most agent systems fail in production. Every tool call is a potential external side effect: treat it like a database write.
Idempotent Tool Design
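```python
import functools
import hashlib
import inspect
import json
import time
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")


def idempotent_tool(cache_ttl_seconds: int = 300):
    """
    Deduplicates tool calls by caching successful results keyed on their arguments.
    Critical for retry scenarios where the LLM re-calls a tool that already succeeded.
    The cache is per-process and unbounded; for side-effecting tools running on
    several workers, persist an idempotency key in a shared store instead.
    """
    def decorator(fn: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
        sig = inspect.signature(fn)
        _cache: dict[str, tuple[T, float]] = {}

        @functools.wraps(fn)
        async def wrapper(*args, **kwargs) -> T:
            # Normalize so web_search("x") and web_search(query="x") share a key
            bound = sig.bind(*args, **kwargs)
            bound.apply_defaults()
            # Arguments must be JSON-serializable
            cache_key = hashlib.sha256(
                json.dumps(bound.arguments, sort_keys=True).encode()
            ).hexdigest()
            if cache_key in _cache:
                result, cached_at = _cache[cache_key]
                # monotonic clock: TTLs aren't affected by wall-clock adjustments
                if time.monotonic() - cached_at < cache_ttl_seconds:
                    return result
            result = await fn(*args, **kwargs)  # exceptions propagate and are not cached
            _cache[cache_key] = (result, time.monotonic())
            return result
        return wrapper
    return decorator


@idempotent_tool(cache_ttl_seconds=60)
async def web_search(query: str, max_results: int = 5) -> list[dict]:
    """Tool that won't hammer the search API on LLM retries."""
    ...
```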
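The TTL cache has one gap. Two concurrent identical calls both miss the cache, because the first hasn't finished yet, so both hit the API. A common fix is "single-flight" deduplication: concurrent callers with the same arguments await one shared task. Here is a sketch; `single_flight` and the `slow_lookup` demo tool are illustrative names, not library APIs.

```python
import asyncio
import functools
import json
from collections.abc import Awaitable, Callable
from typing import Any


def single_flight(fn: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
    """Collapse concurrent identical calls into one execution (in-process only)."""
    in_flight: dict[str, asyncio.Future] = {}

    @functools.wraps(fn)
    async def wrapper(*args, **kwargs):
        key = json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True)
        task = in_flight.get(key)
        if task is None:
            task = asyncio.ensure_future(fn(*args, **kwargs))
            in_flight[key] = task
            # Forget the key once the call settles, so the next call runs fresh
            task.add_done_callback(lambda _done: in_flight.pop(key, None))
        # shield(): one caller being cancelled doesn't cancel the shared call for the others
        return await asyncio.shield(task)

    return wrapper


calls = {"count": 0}


@single_flight
async def slow_lookup(query: str) -> str:
    """Illustrative tool: counts how often it actually executes."""
    calls["count"] += 1
    await asyncio.sleep(0.01)
    return query.upper()


async def demo() -> list[str]:
    return await asyncio.gather(*(slow_lookup("llm") for _ in range(5)))
```

Stacking `@idempotent_tool(...)` above `@single_flight` covers both cases. Concurrent duplicates share one execution, and sequential retries hit the cache.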
Tool Registry with Schema Validation
```python
import inspect
from typing import Any, Callable, get_origin, get_type_hints


class ToolRegistry:
    def __init__(self):
        self._tools: dict[str, dict] = {}

    def register(self, description: str):
        def decorator(fn: Callable):
            hints = get_type_hints(fn)
            sig = inspect.signature(fn)
            # Same shape as Anthropic tool definitions: name, description, input_schema
            schema = {
                "name": fn.__name__,
                "description": description,
                "input_schema": {
                    "type": "object",
                    "properties": {
                        param: {"type": self._python_type_to_json(hints.get(param))}
                        for param in sig.parameters  # the return annotation is never a parameter
                    },
                    "required": [
                        p for p, v in sig.parameters.items()
                        if v.default is inspect.Parameter.empty
                    ],
                },
            }
            self._tools[fn.__name__] = {"fn": fn, "schema": schema}
            return fn
        return decorator

    def get_schemas(self) -> list[dict]:
        return [t["schema"] for t in self._tools.values()]

    async def call(self, name: str, **kwargs) -> Any:
        if name not in self._tools:
            raise KeyError(f"Tool '{name}' not found in registry")
        return await self._tools[name]["fn"](**kwargs)

    @staticmethod
    def _python_type_to_json(t) -> str:
        # get_origin() maps generics such as list[dict] back to list
        origin = get_origin(t) or t
        mapping = {str: "string", int: "integer", float: "number", bool: "boolean",
                   list: "array", dict: "object"}
        return mapping.get(origin, "string")
```
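`ToolRegistry.call()` passes the model's arguments straight through, so a hallucinated or mistyped argument only fails inside the tool. Below is a minimal sketch of checking arguments against the generated `input_schema` first. It uses only the stdlib and covers only the simple JSON types a registry like this emits; for full JSON Schema, the `jsonschema` package is the usual choice. `validate_tool_args` and `WEB_SEARCH_SCHEMA` are hypothetical names.

```python
from typing import Any, Callable

# Checks for the simple JSON types the registry's schema generator emits
JSON_TYPE_CHECKS: dict[str, Callable[[Any], bool]] = {
    "string": lambda v: isinstance(v, str),
    "integer": lambda v: isinstance(v, int) and not isinstance(v, bool),
    "number": lambda v: isinstance(v, (int, float)) and not isinstance(v, bool),
    "boolean": lambda v: isinstance(v, bool),
    "array": lambda v: isinstance(v, list),
    "object": lambda v: isinstance(v, dict),
}


def validate_tool_args(schema: dict, args: dict) -> list[str]:
    """Return a list of problems with model-supplied arguments; empty means valid."""
    input_schema = schema["input_schema"]
    properties = input_schema.get("properties", {})
    problems = [
        f"missing required argument '{name}'"
        for name in input_schema.get("required", [])
        if name not in args
    ]
    for name, value in args.items():
        if name not in properties:
            problems.append(f"unknown argument '{name}'")
            continue
        expected = properties[name].get("type")
        check = JSON_TYPE_CHECKS.get(expected)
        if check is not None and not check(value):
            problems.append(f"argument '{name}' should be {expected}, got {type(value).__name__}")
    return problems


# Example schema in the shape ToolRegistry.register() produces
WEB_SEARCH_SCHEMA = {
    "name": "web_search",
    "description": "Search the web",
    "input_schema": {
        "type": "object",
        "properties": {"query": {"type": "string"}, "max_results": {"type": "integer"}},
        "required": ["query"],
    },
}
```

On failure, consider returning the problem list to the model as the tool result instead of raising. That gives it a concrete error to correct on its next turn.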
Observability Is Not Optional
An agent you can't observe is a black box your on-call rotation will hate. Instrument everything.
```python
import time
import uuid
from contextlib import asynccontextmanager
from dataclasses import dataclass, field


@dataclass
class AgentSpan:
    span_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    trace_id: str = ""
    parent_id: str | None = None
    name: str = ""
    start_time: float = field(default_factory=time.time)
    end_time: float | None = None
    attributes: dict = field(default_factory=dict)
    events: list[dict] = field(default_factory=list)
    status: str = "ok"

    def add_event(self, name: str, attrs: dict | None = None):
        self.events.append({
            "name": name,
            "timestamp": time.time(),
            "attributes": attrs or {},
        })

    def finish(self, status: str = "ok"):
        self.end_time = time.time()
        self.status = status
        return self


class AgentTracer:
    def __init__(self, trace_id: str | None = None):
        self.trace_id = trace_id or str(uuid.uuid4())
        self._spans: list[AgentSpan] = []
        # Fine for a sequential agent loop. If several asyncio tasks share one
        # tracer concurrently, spans can be attached to the wrong parent.
        self._current_span: AgentSpan | None = None

    @asynccontextmanager
    async def span(self, name: str, **attrs):
        s = AgentSpan(
            trace_id=self.trace_id,
            parent_id=self._current_span.span_id if self._current_span else None,
            name=name,
            attributes=attrs,
        )
        prev = self._current_span
        self._current_span = s
        try:
            yield s
            s.finish("ok")
        except Exception as e:
            s.add_event("exception", {"message": str(e), "type": type(e).__name__})
            s.finish("error")
            raise
        finally:
            self._spans.append(s)
            self._current_span = prev

    def export(self) -> list[dict]:
        return [
            {
                "trace_id": s.trace_id,
                "span_id": s.span_id,
                "parent_id": s.parent_id,
                "name": s.name,
                "duration_ms": round((s.end_time - s.start_time) * 1000, 2) if s.end_time else None,
                "status": s.status,
                "attributes": s.attributes,
                "events": s.events,
            }
            for s in self._spans
        ]


# Usage inside your agent loop (call_llm and tool_registry are defined elsewhere;
# call_llm is assumed to return an ActionPlan)
async def agent_step(observation: str, tracer: AgentTracer) -> dict:
    async with tracer.span("llm_call", model="claude-opus-4-5", input_chars=len(observation)):
        action = await call_llm(observation)
    async with tracer.span("tool_execution", tool=action.action):
        result = await tool_registry.call(action.action, **(action.tool_args or {}))
    return result
```
Feed spans into your existing observability stack (Datadog, Grafana, Honeycomb). AI agents are distributed systems; trace them like one.
Failure Modes That Will Bite You
1. The Infinite Loop
LLMs can get stuck in thought loops: calling the same tool repeatedly, or alternating between two actions.
```python
from collections import Counter, deque


class LoopDetector:
    def __init__(self, window: int = 10, threshold: int = 3):
        self._threshold = threshold
        # deque(maxlen=...) drops the oldest entry automatically
        self._action_history: deque[str] = deque(maxlen=window)

    def check(self, action: str) -> bool:
        """Returns True if any action appears `threshold` or more times in the last `window` steps."""
        self._action_history.append(action)
        counts = Counter(self._action_history)
        return any(count >= self._threshold for count in counts.values())
```
2. Context Window Overflow
For long-running agents, the context window fills up. Build compaction in from the start.
```python
async def compact_context(
    messages: list[dict],
    llm_client,  # an anthropic.AsyncAnthropic client (the call below is awaited)
    threshold: int = 20,
    keep_recent: int = 10,
) -> list[dict]:
    """Summarize older messages to free context space."""
    if len(messages) < threshold:
        return messages
    old_messages = messages[:-keep_recent]
    recent_messages = messages[-keep_recent:]
    summary_response = await llm_client.messages.create(
        model="claude-haiku-4-5",  # Use a cheaper model for summarization
        max_tokens=512,
        messages=[
            *old_messages,
            {"role": "user", "content": "Summarize the key facts and decisions from this conversation in under 200 words."},
        ],
    )
    # The Messages API accepts only "user" and "assistant" roles; the summary opens
    # the compacted conversation, so send it as a user turn.
    summary_message = {
        "role": "user",
        "content": f"[Context Summary]: {summary_response.content[0].text}",
    }
    return [summary_message] + recent_messages
```
3. Prompt Injection in Tool Outputs
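External data (web pages, user files, database rows) can contain adversarial instructions. Sanitize it before feeding it back to the LLM. Pattern matching only catches the most naive attacks, so treat it as one layer of defense, not the whole defense.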
```python
import re

INJECTION_PATTERNS = [
    r"ignore\s+previous\s+instructions",
    r"new\s+system\s+prompt",
    r"you\s+are\s+now\s+",
    r"disregard\s+all",
    # Stop the payload from closing the boundary tag early and escaping the wrapper
    r"</?\s*tool_result\s*>",
]


def sanitize_tool_output(raw_output: str) -> str:
    """Basic prompt injection mitigation for tool outputs. Easy to bypass; one layer only."""
    sanitized = raw_output
    for pattern in INJECTION_PATTERNS:
        sanitized = re.sub(pattern, "[REDACTED]", sanitized, flags=re.IGNORECASE)
    # Wrap in a context boundary
    return f"<tool_result>\n{sanitized}\n</tool_result>"
```
Putting It Together: A Production Agent Loop
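```python
import asyncio
import json

# Uses the pieces defined above (AgentTracer, AgentMemoryManager, LoopDetector,
# compact_context, sanitize_tool_output, get_action) plus three of your own:
# llm_client (an anthropic.AsyncAnthropic), tool_registry (a ToolRegistry with a
# tool registered under each non-terminal action name), and generate_final_answer().


async def run_agent(
    task: str,
    max_iterations: int = 20,
    tracer: AgentTracer | None = None,
) -> str:
    tracer = tracer or AgentTracer()
    memory = AgentMemoryManager()
    loop_detector = LoopDetector()
    memory.working.add({"role": "user", "content": task})
    for iteration in range(max_iterations):
        async with tracer.span("agent_iteration", iteration=iteration) as iteration_span:
            # Build context. compact_context() returns it unchanged below its threshold;
            # in production, persist the summary so you don't re-summarize every iteration.
            context = await memory.get_context(task)
            context = await compact_context(context, llm_client)
            # Get action from LLM (get_action is synchronous, so run it off the event loop)
            async with tracer.span("llm_decision"):
                action = await asyncio.to_thread(
                    get_action,
                    observation=json.dumps(context[-5:]),
                    available_tools=tool_registry.get_schemas(),
                )
            # Loop detection on action + arguments, so repeated but distinct
            # searches aren't mistaken for a loop
            call_signature = f"{action.action}:{json.dumps(action.tool_args or {}, sort_keys=True)}"
            if loop_detector.check(call_signature):
                iteration_span.add_event("loop_detected")
                return "Agent halted: repetitive action loop detected."
            # Terminal condition
            if action.action == "terminate":
                async with tracer.span("final_answer"):
                    return await generate_final_answer(context)
            # Execute tool
            async with tracer.span("tool_call", tool=action.action, confidence=action.confidence):
                tool_result = await tool_registry.call(action.action, **(action.tool_args or {}))
            sanitized_result = sanitize_tool_output(str(tool_result))
            # Update memory. The Messages API has only "user" and "assistant" roles,
            # so the tool output goes back as a user turn.
            memory.working.add({"role": "assistant", "content": action.model_dump_json()})
            memory.working.add({"role": "user", "content": sanitized_result})
    return "Agent halted: max iterations reached."
```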
Framework Landscape (2025 State of Play)
Don't build everything from scratch. Know when to reach for existing tools:
| Framework | Best For | Watch Out For |
|---|---|---|
| LangGraph | Stateful, graph-based workflows | Steep learning curve; abstraction leaks |
| CrewAI | Multi-agent role-playing | Can be too "magic" for complex logic |
| Pydantic AI | Type-safe, structured agent outputs | Younger ecosystem |
| Temporal + custom | Long-running, durable workflows | Infrastructure overhead |
| Raw API + your code | Full control, complex routing | You own the bugs |
The right answer for most teams: use LangGraph for the graph/state machine, instructor for structured outputs, Pydantic for validation, and write tool execution yourself.
Closing Thoughts
AI agents are not magic. They're non-deterministic state machines wrapped around probabilistic function calls, and they demand the same rigor as any other distributed system.
Your unfair advantage as a senior engineer isn't knowing the hottest framework. It's knowing that:
- Idempotency matters. Tools get retried.
- Observability is load-bearing. You will debug this at midnight.
- Failure modes compound. Design for them explicitly, not as an afterthought.
- Simpler is almost always better. A pipeline beats an agent when you know the steps.
The teams shipping reliable AI systems aren't the ones with the most sophisticated prompts. They're the ones who treated AI components like infrastructure, with circuit breakers, retries, schema validation, and on-call runbooks.
Build accordingly.
Found a bug or a better pattern? Drop a comment. This is production code, not a tutorial: corrections welcome.
If this was useful, follow for more on AI systems engineering, Python architecture, and the sharp edges that don't make it into the demos.