I've now debugged enough agent systems that broke in production to see the pattern clearly.
The agent logic is rarely the problem. The prompts work. The individual agents return reasonable outputs when tested in isolation.
The problem is the layer between agents: the orchestration, the shared state management, the failure handling, and the observability. These are the layers most teams don't build until they're already on fire.
Here's what breaks, and the code patterns that prevent it.
Failure Mode 1: Race Conditions in Shared State
Multi-agent systems often share a state object that multiple agents read from and write to. Without explicit locking or immutable state patterns, concurrent agents corrupt each other's context.
import asyncio
import time
from dataclasses import dataclass, field
from typing import Any
# BAD: Shared mutable state — race condition waiting to happen
class AgentState:
    # Anti-pattern (kept intentionally): `context` is a CLASS attribute with a
    # mutable default, so every instance — and therefore every agent — shares
    # the same single dict object. Concurrent writers silently clobber each
    # other's keys, which is exactly the race condition described above.
    context: dict = {}
# GOOD: Immutable state with explicit update pattern
@dataclass(frozen=True)
class AgentState:
    """Immutable snapshot of shared agent context.

    Every mutation produces a brand-new instance via ``with_update``;
    existing snapshots are never modified, so a concurrent reader can
    never observe a half-written state.
    """

    context: dict = field(default_factory=dict)  # accumulated key/value context
    agent_id: str = ""                           # agent that produced this snapshot
    timestamp: float = 0.0                       # monotonic clock reading at creation

    def with_update(self, key: str, value: Any) -> 'AgentState':
        """Return a new state with ``key`` set — never mutates ``self``.

        Fix: uses ``time.monotonic()`` instead of the original
        ``asyncio.get_event_loop().time()``. The default loop clock is
        ``time.monotonic`` anyway, but ``get_event_loop()`` is deprecated
        when called outside a running event loop and raises on Python
        3.12+ when no loop exists, so the original broke in synchronous
        callers.
        """
        return AgentState(
            context={**self.context, key: value},
            agent_id=self.agent_id,
            timestamp=time.monotonic(),
        )
# State transitions tracked explicitly — no silent overwrites
class StateManager:
    """Serialises all state writes and keeps the full transition history."""

    def __init__(self):
        self._states: list[AgentState] = []  # append-only history, newest last
        self._lock = asyncio.Lock()          # one writer at a time

    async def update(self, agent_id: str, key: str, value: Any) -> AgentState:
        """Apply one key/value update under the lock and record the new state."""
        async with self._lock:
            if self._states:
                latest = self._states[-1]
            else:
                latest = AgentState()
            successor = latest.with_update(key, value)
            self._states.append(successor)
            return successor

    def get_trace(self) -> list[AgentState]:
        """Full state history for debugging — not just the current state."""
        return list(self._states)
Failure Mode 2: Cascading Failures Without Circuit Breakers
When Agent A produces low-quality output and Agent B consumes it as if it were correct, the error compounds. By the time Agent C acts on Agent B's output, the original failure is invisible.
from enum import Enum
from dataclasses import dataclass
from typing import Optional, Callable, Any
class OutputQuality(Enum):
    """Coarse quality bucket derived from an agent's confidence score."""
    HIGH = "high"      # > 0.85 confidence
    MEDIUM = "medium"  # 0.65–0.85
    LOW = "low"        # < 0.65
@dataclass
class AgentOutput:
    """Envelope every agent returns: the payload plus quality metadata."""
    content: str              # the agent's actual output text
    quality: OutputQuality    # bucketed quality level (see OutputQuality)
    confidence: float         # raw confidence score — presumably in [0, 1]; confirm with producer
    agent_id: str             # which agent produced this output
class CircuitBreaker:
    """Prevents cascading failures by blocking low-quality outputs.

    Counts consecutive outputs whose quality falls below ``min_quality``;
    after ``_threshold`` consecutive failures the breaker opens and every
    subsequent check raises instead of letting bad output flow downstream.
    """

    # Explicit ordering for comparison — Enum members aren't orderable by default.
    _QUALITY_RANK = {
        OutputQuality.LOW: 0,
        OutputQuality.MEDIUM: 1,
        OutputQuality.HIGH: 2,
    }

    def __init__(self, min_quality: OutputQuality = OutputQuality.MEDIUM):
        self.min_quality = min_quality
        self._failure_count = 0  # consecutive below-threshold outputs seen
        self._threshold = 3      # open after 3 consecutive failures
        self._is_open = False

    def check(self, output: AgentOutput) -> bool:
        """Return True if ``output`` may pass downstream, False to block it.

        Raises:
            RuntimeError: once the breaker has opened.

        Fix over the original: ``min_quality`` was accepted and stored but
        never consulted — only LOW outputs tripped the breaker even when
        ``min_quality`` was HIGH. Quality is now compared against
        ``min_quality`` explicitly (default MEDIUM behaves exactly as before).
        """
        if self._is_open:
            raise RuntimeError(
                f"Circuit breaker open: too many low-quality outputs from {output.agent_id}"
            )
        if self._QUALITY_RANK[output.quality] < self._QUALITY_RANK[self.min_quality]:
            self._failure_count += 1
            if self._failure_count >= self._threshold:
                self._is_open = True
            return False  # Don't pass this output downstream
        self._failure_count = 0  # Reset on success
        return True
def with_quality_gate(min_quality: OutputQuality = OutputQuality.MEDIUM):
    """Decorator: validates output quality before passing to the next agent.

    Each decorated agent gets its own CircuitBreaker. Blocked outputs are
    replaced with an explicit human-review fallback rather than being
    silently forwarded downstream.
    """
    from functools import wraps  # local import keeps the snippet self-contained

    def decorator(agent_fn: Callable) -> Callable:
        breaker = CircuitBreaker(min_quality)

        @wraps(agent_fn)  # fix: preserve the agent's name/docstring for tracing and debugging
        async def wrapper(*args, **kwargs) -> Optional[AgentOutput]:
            output = await agent_fn(*args, **kwargs)
            if not breaker.check(output):
                # Explicit fallback — not silent failure
                return AgentOutput(
                    content="[Low confidence — human review required]",
                    quality=OutputQuality.LOW,
                    confidence=output.confidence,
                    agent_id=output.agent_id,
                )
            return output

        return wrapper

    return decorator
Failure Mode 3: Silent Errors — No Observability
The single most dangerous failure mode: everything returns success, but the final output is wrong. Nothing alerts. Users discover it.
import time
import uuid
from contextlib import asynccontextmanager
class AgentTracer:
    """Distributed tracing for multi-agent systems.

    Records one span per agent operation; all spans share a single
    ``trace_id`` so a whole multi-agent run can be reconstructed afterwards.
    """

    def __init__(self, trace_id: Optional[str] = None):
        # Fix: the parameter was annotated plain `str` while defaulting to None.
        self.trace_id = trace_id or str(uuid.uuid4())
        self.spans: list[dict] = []  # append-only span log, in start order

    @asynccontextmanager
    async def span(self, agent_id: str, operation: str):
        """Async context manager that times one operation and records its outcome."""
        span_id = str(uuid.uuid4())
        start_time = time.time()
        span_data = {
            "span_id": span_id,
            "trace_id": self.trace_id,
            "agent_id": agent_id,
            "operation": operation,
            "start_time": start_time,
            "status": "running",
        }
        # Recorded immediately so a crash mid-operation still shows up in the trace.
        self.spans.append(span_data)
        try:
            yield span_data
            span_data["status"] = "success"
        except Exception as e:
            # Record the failure but re-raise — tracing must never swallow errors.
            span_data["status"] = "error"
            span_data["error"] = str(e)
            raise
        finally:
            span_data["duration_ms"] = (time.time() - start_time) * 1000

    def get_full_trace(self) -> dict:
        """Everything every agent did — for debugging what went wrong."""
        return {
            "trace_id": self.trace_id,
            # Guarded lookup: a span still running has no duration yet.
            "total_duration_ms": sum(
                s["duration_ms"] for s in self.spans if "duration_ms" in s
            ),
            "agent_calls": len(self.spans),
            "errors": [s for s in self.spans if s["status"] == "error"],
            "spans": self.spans,
        }
Failure Mode 4: Infinite Loops Without Termination Conditions
from typing import Optional
import asyncio
async def run_agent_with_limits(
    agent_fn: Callable,
    max_iterations: int = 10,
    timeout_seconds: float = 30.0,
    termination_check: Optional[Callable] = None
) -> AgentOutput:
    """Run ``agent_fn`` with hard exits — bounded retries plus a per-call timeout.

    Returns the first acceptable result, or an explicit low-quality
    escalation output. Never loops forever and never hangs.
    """
    last_attempt = max_iterations - 1
    for attempt in range(max_iterations):
        try:
            result = await asyncio.wait_for(agent_fn(), timeout=timeout_seconds)
            # First acceptable result wins: any result when no check is
            # supplied, otherwise only a result that satisfies the check —
            # anything else is discarded and the agent is retried.
            if not termination_check or termination_check(result):
                return result
        except asyncio.TimeoutError:
            if attempt == last_attempt:
                return AgentOutput(
                    content="[Timeout — escalating to human review]",
                    quality=OutputQuality.LOW,
                    confidence=0.0,
                    agent_id="timeout_handler",
                )
    # Hit max iterations without satisfying the termination condition
    return AgentOutput(
        content="[Max iterations reached — escalating]",
        quality=OutputQuality.LOW,
        confidence=0.0,
        agent_id="iteration_limit_handler",
    )
The Human-in-the-Loop Escalation Pattern
from typing import Callable, Optional
class HumanInTheLoop:
    """Defines exactly when and how humans enter the agent workflow.

    Outputs whose confidence falls below ``confidence_threshold`` are
    escalated (log / alert / ticket) and replaced with an explicit
    pending-review placeholder instead of flowing onward.
    """

    def __init__(self,
                 confidence_threshold: float = 0.70,
                 escalation_fn: Optional[Callable] = None):
        # Fix: `escalation_fn` defaulted to None but was annotated plain Callable.
        self.threshold = confidence_threshold
        self.escalation_fn = escalation_fn or self._default_escalation

    async def check(self, output: AgentOutput) -> AgentOutput:
        """Escalate and replace ``output`` if its confidence is below threshold."""
        if output.confidence < self.threshold:
            await self.escalation_fn(output)
            return AgentOutput(
                content=f"[Pending human review — confidence {output.confidence:.0%}]",
                quality=OutputQuality.LOW,
                confidence=output.confidence,
                agent_id="human_in_the_loop",
            )
        return output

    async def _default_escalation(self, output: AgentOutput):
        """Default escalation hook — swap in logging/alerting/ticketing as needed."""
        # Log, alert, create ticket — whatever your workflow requires
        print(f"ESCALATION: Agent {output.agent_id} confidence {output.confidence:.0%} below threshold")
The Production Readiness Checklist
Before shipping any multi-agent system to production:
□ Immutable or explicitly locked shared state
□ Circuit breakers between the agent output and the next agent input
□ Distributed tracing covering every agent call
□ Max iteration + timeout limits on every agent
□ Explicit fallback output for every failure mode
□ Human-in-the-loop escalation with defined confidence thresholds
□ Output validation schema before any agent result is served to users
□ Load testing at 5× expected concurrent agent invocations
If any of these are missing, do not ship. Add it in week 1 of the project, not week 10 of debugging.
Ailoitte builds production-grade agent systems for funded startups — orchestration, observability, guardrails, and human-in-the-loop. 6–8 week delivery. ailoitte.com
Top comments (0)