Every AI agent framework has a "build a research agent in 10 lines" tutorial. Cool. Now try running 50 agents concurrently, handling failures, managing shared state, and keeping costs under control.
That's where demos die and engineering begins.
These are 7 orchestration patterns that work across frameworks — LangGraph, CrewAI, AutoGen, OpenAI Agents SDK, or your own custom setup. The patterns are framework-agnostic because good architecture outlasts any library.
Pattern 1: The Supervisor with Backpressure
The classic supervisor pattern — one agent delegates to workers — breaks down under load. Worker 3 is slow, but the supervisor keeps sending it tasks. Queue grows. Memory grows. Everything dies.
Backpressure means: when a worker is overwhelmed, the system slows down instead of crashing.
import asyncio
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
from enum import Enum
import time
import uuid
class WorkerState(Enum):
    """Lifecycle states the supervisor reports for a worker agent."""

    IDLE = "idle"              # no tasks in flight
    BUSY = "busy"              # running below the overload threshold
    OVERLOADED = "overloaded"  # load factor above 80% (see _execute)
    FAILED = "failed"          # error rate tripped; no longer schedulable
@dataclass
class WorkerAgent:
    """Bookkeeping record for one worker: its handler plus live load stats."""

    agent_id: str
    name: str
    handler: Callable
    max_concurrent: int = 3
    current_tasks: int = 0
    state: WorkerState = WorkerState.IDLE
    total_completed: int = 0
    total_errors: int = 0
    avg_duration_ms: float = 0

    @property
    def load_factor(self) -> float:
        """Fraction of this worker's concurrency slots currently in use."""
        return self.current_tasks / self.max_concurrent

    def can_accept(self) -> bool:
        """True when a free slot exists and the worker hasn't been failed."""
        if self.state == WorkerState.FAILED:
            return False
        return self.current_tasks < self.max_concurrent
@dataclass
class Task:
    """A unit of work submitted to the supervisor.

    Lower `priority` values are dispatched first; `created_at` breaks ties.
    """

    task_id: str
    task_type: str
    payload: dict
    priority: int = 0  # Lower = higher priority
    created_at: float = field(default_factory=time.time)
    timeout_seconds: float = 60.0
@dataclass
class TaskResult:
    """Outcome of one task execution, keyed by task and worker ids.

    Exactly one of `result` / `error` is meaningful, depending on `success`.
    """

    task_id: str
    worker_id: str
    success: bool
    result: Any = None
    error: Optional[str] = None
    duration_ms: float = 0
class BackpressureSupervisor:
    """Supervisor that dispatches queued tasks to registered workers.

    Backpressure contract: submit() returns False (instead of queueing)
    when the system is saturated, so the caller can slow down, queue
    externally, or shed load instead of letting memory grow unbounded.
    """

    def __init__(
        self,
        max_queue_size: int = 100,
        backpressure_threshold: float = 0.8,
    ):
        self.workers: dict[str, WorkerAgent] = {}
        self.task_queue: asyncio.PriorityQueue = asyncio.PriorityQueue(
            maxsize=max_queue_size
        )
        self.results: dict[str, TaskResult] = {}
        self.backpressure_threshold = backpressure_threshold
        self._running = False
        # Bug fix: previously created lazily in register_worker() via
        # hasattr(); submit() and _dispatch_loop() raised AttributeError
        # when called before the first worker registration.
        self._type_map: dict[str, list[str]] = {}
        # Bug fix: monotonic tie-breaker for the priority queue. Using only
        # (priority, created_at) risked a TypeError — two tasks created in
        # the same clock tick would make heapq compare Task objects, which
        # are not orderable.
        self._seq = 0
        # Keep a reference so the dispatch-loop task is not garbage-collected
        # (asyncio only holds weak references to running tasks).
        self._dispatch_task: Optional[asyncio.Task] = None

    def _next_seq(self) -> int:
        """Next queue tie-breaker value (event-loop thread only)."""
        self._seq += 1
        return self._seq

    def register_worker(
        self,
        name: str,
        handler: Callable,
        task_types: list[str],
        max_concurrent: int = 3,
    ) -> str:
        """Register a worker and the task types it handles; returns its id."""
        agent_id = f"worker-{name}-{uuid.uuid4().hex[:6]}"
        worker = WorkerAgent(
            agent_id=agent_id,
            name=name,
            handler=handler,
            max_concurrent=max_concurrent,
        )
        self.workers[agent_id] = worker
        for t in task_types:
            self._type_map.setdefault(t, []).append(agent_id)
        return agent_id

    def system_load(self) -> float:
        """Overall system load factor (0.0 to 1.0); 1.0 when no workers."""
        if not self.workers:
            return 1.0
        total_capacity = sum(w.max_concurrent for w in self.workers.values())
        total_active = sum(w.current_tasks for w in self.workers.values())
        return total_active / total_capacity if total_capacity > 0 else 1.0

    async def submit(self, task: Task) -> bool:
        """Submit a task. Returns False if backpressure rejects it."""
        if self.system_load() >= self.backpressure_threshold:
            # Saturated: accept only if some worker could take it right now.
            candidates = self._type_map.get(task.task_type, [])
            if not any(self.workers[wid].can_accept() for wid in candidates):
                return False  # Backpressure: reject task
        try:
            self.task_queue.put_nowait((task.priority, self._next_seq(), task))
            return True
        except asyncio.QueueFull:
            return False

    async def _dispatch_loop(self):
        """Main dispatch loop — assigns tasks to the least-loaded worker."""
        while self._running:
            try:
                priority, _, task = await asyncio.wait_for(
                    self.task_queue.get(), timeout=1.0
                )
            except asyncio.TimeoutError:
                continue  # Wake periodically to re-check self._running
            candidates = self._type_map.get(task.task_type, [])
            available = [
                self.workers[wid]
                for wid in candidates
                if self.workers[wid].can_accept()
            ]
            if not available:
                # No capacity right now — re-queue after a short pause.
                await asyncio.sleep(0.5)
                await self.task_queue.put((priority, self._next_seq(), task))
                continue
            worker = min(available, key=lambda w: w.load_factor)
            asyncio.create_task(self._execute(worker, task))

    async def _execute(self, worker: WorkerAgent, task: Task):
        """Execute a task on a worker with timeout and error handling."""
        worker.current_tasks += 1
        worker.state = (
            WorkerState.OVERLOADED if worker.load_factor > 0.8 else WorkerState.BUSY
        )
        start = time.perf_counter()
        try:
            result = await asyncio.wait_for(
                worker.handler(task.payload),
                timeout=task.timeout_seconds,
            )
            duration = (time.perf_counter() - start) * 1000
            worker.total_completed += 1
            # Exponential moving average of task duration.
            worker.avg_duration_ms = worker.avg_duration_ms * 0.9 + duration * 0.1
            self.results[task.task_id] = TaskResult(
                task_id=task.task_id,
                worker_id=worker.agent_id,
                success=True,
                result=result,
                duration_ms=round(duration, 2),
            )
        except asyncio.TimeoutError:
            worker.total_errors += 1
            self.results[task.task_id] = TaskResult(
                task_id=task.task_id,
                worker_id=worker.agent_id,
                success=False,
                error=f"Timeout after {task.timeout_seconds}s",
                duration_ms=task.timeout_seconds * 1000,
            )
        except Exception as e:
            worker.total_errors += 1
            duration = (time.perf_counter() - start) * 1000
            self.results[task.task_id] = TaskResult(
                task_id=task.task_id,
                worker_id=worker.agent_id,
                success=False,
                error=str(e),
                duration_ms=round(duration, 2),
            )
        finally:
            worker.current_tasks -= 1
            # Quarantine on sustained failure (>50% errors after 5+ tasks).
            # Bug fix: moved here from the generic-exception branch so that
            # timeouts also count toward the failure rate.
            total = worker.total_completed + worker.total_errors
            if total > 5 and worker.total_errors / total > 0.5:
                worker.state = WorkerState.FAILED
            if worker.current_tasks == 0 and worker.state != WorkerState.FAILED:
                worker.state = WorkerState.IDLE

    async def start(self):
        """Start the background dispatch loop."""
        self._running = True
        self._dispatch_task = asyncio.create_task(self._dispatch_loop())

    async def stop(self):
        """Signal the dispatch loop to exit (within its 1s poll interval)."""
        self._running = False

    def dashboard(self) -> dict:
        """Snapshot of system load, queue depth, and per-worker stats."""
        return {
            "system_load": round(self.system_load() * 100, 1),
            "queue_size": self.task_queue.qsize(),
            "workers": {
                w.name: {
                    "state": w.state.value,
                    "load": f"{w.load_factor:.0%}",
                    "completed": w.total_completed,
                    "errors": w.total_errors,
                    "avg_ms": round(w.avg_duration_ms, 1),
                }
                for w in self.workers.values()
            },
        }
A False return from submit() is the backpressure signal. The caller (your API, your UI, your upstream agent) decides what to do: queue externally, show a "busy" message, or drop the request.
Pattern 2: Shared State with Conflict Resolution
Multiple agents reading and writing shared state creates race conditions. Agent A reads user preferences, Agent B updates them, Agent A writes stale data back.
// A committed value plus the metadata needed for optimistic concurrency.
interface StateVersion {
  data: Record<string, unknown>;
  version: number;        // monotonically increasing per key, starts at 1
  lastModifiedBy: string; // agent id (becomes "a+b" after a merge resolution)
  timestamp: number;      // ms epoch of the commit
}

// Details handed back to a writer whose expectedVersion was stale.
interface StateConflict {
  key: string;
  currentVersion: number;   // version actually committed in the store
  attemptedVersion: number; // version the writer believed was current
  currentValue: unknown;
  attemptedValue: unknown;
  modifiedBy: string;       // who committed the current version
}

// Conflict-resolution policy applied by SharedAgentState on stale writes.
type ConflictStrategy = "last-write-wins" | "first-write-wins" | "merge" | "reject";
class SharedAgentState {
  // key -> latest committed version
  private store: Map<string, StateVersion> = new Map();
  private conflictStrategy: ConflictStrategy;
  // key -> up to 20 most recent committed versions (see appendHistory)
  private history: Map<string, StateVersion[]> = new Map();
  // key -> advisory lock: holder agent id + acquisition time (ms epoch)
  private locks: Map<string, { holder: string; acquired: number }> = new Map();

  constructor(strategy: ConflictStrategy = "reject") {
    this.conflictStrategy = strategy;
  }

  /**
   * Snapshot read. Returns a deep copy of the data plus the version number
   * to hand back as expectedVersion on a later optimistic write.
   */
  read(key: string): { data: unknown; version: number } | null {
    const entry = this.store.get(key);
    if (!entry) return null;
    return { data: structuredClone(entry.data), version: entry.version };
  }

  /**
   * Write `data` under `key` on behalf of `agentId`.
   *
   * With expectedVersion set this is an optimistic write: a mismatch with
   * the committed version goes through resolveConflict(). Without it, the
   * write unconditionally commits a new version.
   */
  write(
    key: string,
    data: Record<string, unknown>,
    agentId: string,
    expectedVersion?: number
  ): { success: boolean; conflict?: StateConflict; version: number } {
    const current = this.store.get(key);
    // New key — just write
    if (!current) {
      const version: StateVersion = {
        data,
        version: 1,
        lastModifiedBy: agentId,
        timestamp: Date.now(),
      };
      this.store.set(key, version);
      this.appendHistory(key, version);
      return { success: true, version: 1 };
    }
    // Version check
    if (expectedVersion !== undefined && expectedVersion !== current.version) {
      const conflict: StateConflict = {
        key,
        currentVersion: current.version,
        attemptedVersion: expectedVersion,
        currentValue: current.data,
        attemptedValue: data,
        modifiedBy: current.lastModifiedBy,
      };
      return this.resolveConflict(key, conflict, data, agentId);
    }
    // No version check or version matches — write
    const newVersion: StateVersion = {
      data,
      version: (current?.version ?? 0) + 1,
      lastModifiedBy: agentId,
      timestamp: Date.now(),
    };
    this.store.set(key, newVersion);
    this.appendHistory(key, newVersion);
    return { success: true, version: newVersion.version };
  }

  /**
   * Apply the configured strategy to a stale write:
   * - last-write-wins: commit the new data anyway (version bumps).
   * - first-write-wins / reject: keep the committed value, report conflict.
   * - merge: deep-merge where the committed data wins on overlapping keys.
   */
  private resolveConflict(
    key: string,
    conflict: StateConflict,
    newData: Record<string, unknown>,
    agentId: string
  ): { success: boolean; conflict?: StateConflict; version: number } {
    const current = this.store.get(key)!;
    switch (this.conflictStrategy) {
      case "last-write-wins": {
        const version: StateVersion = {
          data: newData,
          version: current.version + 1,
          lastModifiedBy: agentId,
          timestamp: Date.now(),
        };
        this.store.set(key, version);
        this.appendHistory(key, version);
        return { success: true, version: version.version };
      }
      case "first-write-wins":
        return { success: false, conflict, version: current.version };
      case "merge": {
        // Deep merge: current wins on conflicts, new adds missing keys
        const merged = this.deepMerge(
          current.data as Record<string, unknown>,
          newData
        );
        const version: StateVersion = {
          data: merged,
          version: current.version + 1,
          lastModifiedBy: `${current.lastModifiedBy}+${agentId}`,
          timestamp: Date.now(),
        };
        this.store.set(key, version);
        this.appendHistory(key, version);
        return { success: true, version: version.version };
      }
      case "reject":
      default:
        return { success: false, conflict, version: current.version };
    }
  }

  /**
   * Recursive merge where `target` (the committed data) takes precedence:
   * nested objects are merged, and `source` only contributes keys that are
   * absent from `target`.
   */
  private deepMerge(
    target: Record<string, unknown>,
    source: Record<string, unknown>
  ): Record<string, unknown> {
    const result = { ...target };
    for (const key of Object.keys(source)) {
      if (
        key in result &&
        typeof result[key] === "object" &&
        typeof source[key] === "object" &&
        result[key] !== null &&
        source[key] !== null
      ) {
        result[key] = this.deepMerge(
          result[key] as Record<string, unknown>,
          source[key] as Record<string, unknown>
        );
      } else if (!(key in result)) {
        result[key] = source[key];
      }
      // If key exists in both and isn't object: target wins
    }
    return result;
  }

  // Optimistic locking helper
  /**
   * Poll (every 100ms, up to timeoutMs) for an advisory lock on `key`.
   * A lock held longer than 30s is treated as stale and taken over.
   * Re-entrant: the current holder is immediately granted `true`.
   */
  async acquireLock(
    key: string,
    agentId: string,
    timeoutMs: number = 5000
  ): Promise<boolean> {
    const deadline = Date.now() + timeoutMs;
    while (Date.now() < deadline) {
      const existing = this.locks.get(key);
      if (!existing || Date.now() - existing.acquired > 30000) {
        // No lock or stale lock
        this.locks.set(key, { holder: agentId, acquired: Date.now() });
        return true;
      }
      if (existing.holder === agentId) return true; // Re-entrant
      await new Promise((resolve) => setTimeout(resolve, 100));
    }
    return false;
  }

  /** Release the lock on `key`; only the current holder may release it. */
  releaseLock(key: string, agentId: string): boolean {
    const lock = this.locks.get(key);
    if (lock?.holder === agentId) {
      this.locks.delete(key);
      return true;
    }
    return false;
  }

  /** Record a committed version in the per-key history (deep-copied). */
  private appendHistory(key: string, version: StateVersion): void {
    const hist = this.history.get(key) ?? [];
    hist.push(structuredClone(version));
    // Keep last 20 versions
    if (hist.length > 20) hist.shift();
    this.history.set(key, hist);
  }

  /** Committed-version history for `key`, oldest first; empty if none. */
  getHistory(key: string): StateVersion[] {
    return this.history.get(key) ?? [];
  }
}
For most agent systems, "merge" strategy works best — agents contribute different fields to the same state object without overwriting each other.
Pattern 3: Cost-Aware Task Routing
Not all LLM calls are equal. A simple classification task doesn't need GPT-4o. A complex reasoning task shouldn't go to a cheap model. Route tasks based on estimated cost and complexity.
from dataclasses import dataclass
from typing import Optional
from enum import Enum
class Complexity(Enum):
    """Task difficulty tiers used for model routing (higher = harder)."""

    TRIVIAL = 1   # classification, extraction
    SIMPLE = 2    # summarization, Q&A
    MODERATE = 3  # multi-step reasoning
    COMPLEX = 4   # code generation, analysis
    EXPERT = 5    # architecture, novel solutions
# Static pricing/capability profile for one routable model.
@dataclass
class ModelConfig:
    name: str
    provider: str
    cost_per_1k_input: float  # USD
    cost_per_1k_output: float  # USD
    max_context: int  # compared against estimated input + output tokens
    speed_tokens_per_sec: float  # drives estimate_latency()
    min_complexity: Complexity  # simplest task tier this model should get
    max_complexity: Complexity  # hardest task tier this model can handle
# Pre-call estimate of a task's size and difficulty, consumed by the router.
@dataclass
class TaskEstimate:
    estimated_input_tokens: int
    estimated_output_tokens: int
    complexity: Complexity
    requires_tools: bool = False  # NOTE(review): not yet consulted by route()
    requires_vision: bool = False  # NOTE(review): not yet consulted by route()
    max_latency_ms: Optional[float] = None  # None = no latency constraint
class CostAwareRouter:
    """Routes tasks to the cheapest model that satisfies complexity,
    context-size, latency, and monthly-budget constraints.

    month_spend accumulates *estimated* costs at routing time; reconcile
    against actual usage externally if exact accounting is needed.
    """

    def __init__(self, monthly_budget_usd: float = 100.0):
        self.models: list[ModelConfig] = []
        self.monthly_budget = monthly_budget_usd
        self.month_spend: float = 0.0
        self.call_log: list[dict] = []

    def add_model(self, config: ModelConfig) -> None:
        """Register a model the router may choose from."""
        self.models.append(config)

    def estimate_cost(self, model: ModelConfig, task: TaskEstimate) -> float:
        """Estimated USD cost of running `task` on `model`."""
        input_cost = (task.estimated_input_tokens / 1000) * model.cost_per_1k_input
        output_cost = (task.estimated_output_tokens / 1000) * model.cost_per_1k_output
        return round(input_cost + output_cost, 6)

    def estimate_latency(self, model: ModelConfig, task: TaskEstimate) -> float:
        """Estimated latency in ms, from total tokens and model speed."""
        total_tokens = task.estimated_input_tokens + task.estimated_output_tokens
        seconds = total_tokens / model.speed_tokens_per_sec
        return seconds * 1000

    def route(self, task: TaskEstimate) -> ModelConfig:
        """Pick the cheapest model that meets all requirements.

        Falls back to the cheapest-input model when nothing qualifies
        (e.g. budget exhausted). Raises ValueError when no models are
        registered.
        """
        if not self.models:
            # Fail loudly instead of an opaque ValueError from min() below.
            raise ValueError("No models registered")
        candidates = []
        for model in self.models:
            # Filter by complexity range
            if not (
                model.min_complexity.value
                <= task.complexity.value
                <= model.max_complexity.value
            ):
                continue
            # Filter by context size
            total_tokens = task.estimated_input_tokens + task.estimated_output_tokens
            if total_tokens > model.max_context:
                continue
            # Filter by latency requirement.
            # Bug fix: the old truthiness test (`if task.max_latency_ms:`)
            # skipped the check entirely when the caller passed an explicit 0.
            if task.max_latency_ms is not None:
                if self.estimate_latency(model, task) > task.max_latency_ms:
                    continue
            cost = self.estimate_cost(model, task)
            # Budget check
            if self.month_spend + cost > self.monthly_budget:
                continue
            candidates.append((model, cost))
        if not candidates:
            # Fallback: pick cheapest regardless of complexity match.
            fallback = min(self.models, key=lambda m: m.cost_per_1k_input)
            cost = self.estimate_cost(fallback, task)
            # Bug fix: fallback calls previously bypassed spend tracking and
            # the call log, silently under-reporting budget utilization.
            self.month_spend += cost
            self.call_log.append({
                "model": fallback.name,
                "cost": cost,
                "complexity": task.complexity.value,
                "fallback": True,
            })
            return fallback
        # Cheapest qualifying model wins
        chosen_model, chosen_cost = min(candidates, key=lambda c: c[1])
        self.month_spend += chosen_cost
        self.call_log.append({
            "model": chosen_model.name,
            "cost": chosen_cost,
            "complexity": task.complexity.value,
        })
        return chosen_model

    def budget_report(self) -> dict:
        """Spend summary against the monthly budget."""
        return {
            "monthly_budget": self.monthly_budget,
            "spent": round(self.month_spend, 4),
            "remaining": round(self.monthly_budget - self.month_spend, 4),
            "utilization_pct": round(
                self.month_spend / self.monthly_budget * 100, 1
            ),
            "total_calls": len(self.call_log),
            "by_model": self._group_by_model(),
        }

    def _group_by_model(self) -> dict:
        """Aggregate call counts and total estimated cost per model name."""
        groups: dict[str, dict] = {}
        for entry in self.call_log:
            name = entry["model"]
            if name not in groups:
                groups[name] = {"calls": 0, "total_cost": 0}
            groups[name]["calls"] += 1
            groups[name]["total_cost"] = round(
                groups[name]["total_cost"] + entry["cost"], 4
            )
        return groups
# Setup: register the model catalog (prices in USD per 1K tokens)
router = CostAwareRouter(monthly_budget_usd=50.0)
router.add_model(ModelConfig(
    name="gpt-4.1-nano",
    provider="openai",
    cost_per_1k_input=0.00010,
    cost_per_1k_output=0.00040,
    max_context=1_048_576,
    speed_tokens_per_sec=300,
    min_complexity=Complexity.TRIVIAL,
    max_complexity=Complexity.SIMPLE,
))
router.add_model(ModelConfig(
    name="gpt-4.1-mini",
    provider="openai",
    cost_per_1k_input=0.0004,
    cost_per_1k_output=0.0016,
    max_context=1_048_576,
    speed_tokens_per_sec=200,
    min_complexity=Complexity.TRIVIAL,
    max_complexity=Complexity.MODERATE,
))
router.add_model(ModelConfig(
    name="claude-sonnet-4",
    provider="anthropic",
    cost_per_1k_input=0.003,
    cost_per_1k_output=0.015,
    max_context=200_000,
    speed_tokens_per_sec=150,
    min_complexity=Complexity.MODERATE,
    max_complexity=Complexity.EXPERT,
))
router.add_model(ModelConfig(
    name="claude-opus-4",
    provider="anthropic",
    cost_per_1k_input=0.015,
    cost_per_1k_output=0.075,
    max_context=200_000,
    speed_tokens_per_sec=80,
    min_complexity=Complexity.COMPLEX,
    max_complexity=Complexity.EXPERT,
))
# Route a SIMPLE task with a 3-second latency ceiling
task = TaskEstimate(
    estimated_input_tokens=2000,
    estimated_output_tokens=500,
    complexity=Complexity.SIMPLE,
    max_latency_ms=3000,
)
model = router.route(task)
print(f"Routed to: {model.name} (${router.estimate_cost(model, task):.4f})")
# → Routed to: gpt-4.1-nano ($0.0004)
This pattern alone can cut your LLM costs by 60-80%. Most agent tasks are TRIVIAL or SIMPLE — don't send them to your expensive model.
Pattern 4: Agent Memory with Decay
Long-running agents accumulate context. But not all context ages equally. A user preference from 5 minutes ago matters more than a search result from 2 hours ago.
import time
import math
from dataclasses import dataclass, field
from typing import Any, Optional
from enum import Enum
class MemoryType(Enum):
    """Categories of agent memory; each decays at a different rate."""

    FACT = "fact"                  # user statements, system state
    INSTRUCTION = "instruction"    # user preferences, standing rules
    RESULT = "result"              # tool outputs, search hits
    CONVERSATION = "conversation"  # chat messages
    DECISION = "decision"          # the agent's own past decisions
@dataclass
class MemoryEntry:
    """One remembered item plus the stats that drive its relevance score."""

    content: str
    memory_type: MemoryType
    created_at: float = field(default_factory=time.time)
    last_accessed: float = field(default_factory=time.time)
    access_count: int = 0
    importance: float = 1.0  # 0.0 to 1.0
    metadata: dict = field(default_factory=dict)

    # Hourly exponential decay rate applied to each memory type's age.
    _DECAY_RATES = {
        MemoryType.FACT: 0.01,          # very slow decay
        MemoryType.INSTRUCTION: 0.005,  # almost permanent
        MemoryType.RESULT: 0.1,         # fast decay
        MemoryType.CONVERSATION: 0.05,  # moderate decay
        MemoryType.DECISION: 0.02,      # slow decay
    }

    def relevance_score(self, now: Optional[float] = None) -> float:
        """Relevance in [0, 1]: weighted blend of importance, age decay,
        access recency, and access frequency (log-scaled)."""
        now = now or time.time()
        age_hours = (now - self.created_at) / 3600
        recency_hours = (now - self.last_accessed) / 3600
        rate = self._DECAY_RATES.get(self.memory_type, 0.05)
        aged = math.exp(-rate * age_hours)
        recent = math.exp(-0.1 * recency_hours)
        frequent = math.log(1 + self.access_count) / 5
        blended = (
            0.4 * self.importance
            + 0.3 * aged
            + 0.2 * recent
            + 0.1 * frequent
        )
        return min(1.0, max(0.0, blended))
class AgentMemory:
    """Bounded agent memory with per-type time decay.

    Entries are ranked by MemoryEntry.relevance_score(); the lowest scorers
    are evicted once max_entries is exceeded, and build_context() assembles
    a prompt string from the highest-scoring survivors.
    """

    def __init__(
        self,
        max_entries: int = 500,
        max_context_tokens: int = 8000,
        eviction_threshold: float = 0.1,
    ):
        self.entries: list[MemoryEntry] = []
        self.max_entries = max_entries
        self.max_context_tokens = max_context_tokens
        # NOTE(review): eviction_threshold is stored but never consulted —
        # _evict() trims purely by count. Wire it in or remove it.
        self.eviction_threshold = eviction_threshold

    def add(
        self,
        content: str,
        memory_type: MemoryType,
        importance: float = 0.5,
        metadata: Optional[dict] = None,
    ) -> MemoryEntry:
        """Store a new memory; evicts low-relevance entries when over capacity."""
        entry = MemoryEntry(
            content=content,
            memory_type=memory_type,
            importance=importance,
            metadata=metadata or {},
        )
        self.entries.append(entry)
        # Evict if over capacity
        if len(self.entries) > self.max_entries:
            self._evict()
        return entry

    def recall(
        self,
        top_k: int = 20,
        memory_types: Optional[list[MemoryType]] = None,
        min_relevance: float = 0.0,
    ) -> list[MemoryEntry]:
        """Retrieve the most relevant memories, best first.

        Side effect: bumps last_accessed and access_count on every returned
        entry, which raises their future relevance (recency/frequency boosts
        in relevance_score).
        """
        now = time.time()
        candidates = self.entries
        if memory_types:
            candidates = [e for e in candidates if e.memory_type in memory_types]
        # Score and sort
        scored = [
            (entry, entry.relevance_score(now))
            for entry in candidates
        ]
        scored = [(e, s) for e, s in scored if s >= min_relevance]
        scored.sort(key=lambda x: x[1], reverse=True)
        # Update access stats
        results = []
        for entry, score in scored[:top_k]:
            entry.last_accessed = now
            entry.access_count += 1
            results.append(entry)
        return results

    def build_context(
        self,
        max_tokens: Optional[int] = None,
    ) -> str:
        """Build a context string from memories, fitting within token budget.

        Sections are emitted in a fixed priority order (instructions first,
        raw results last). The budget uses a rough 4-chars-per-token estimate
        and counts only entry lines, not section headers.
        """
        max_tokens = max_tokens or self.max_context_tokens
        memories = self.recall(top_k=50)
        # Group by type for organized context
        groups: dict[MemoryType, list[MemoryEntry]] = {}
        for entry in memories:
            groups.setdefault(entry.memory_type, []).append(entry)
        sections = []
        total_chars = 0
        char_budget = max_tokens * 4  # Rough token estimate (~4 chars/token)
        # Priority order for context building
        type_order = [
            MemoryType.INSTRUCTION,
            MemoryType.DECISION,
            MemoryType.FACT,
            MemoryType.CONVERSATION,
            MemoryType.RESULT,
        ]
        for mem_type in type_order:
            if mem_type not in groups:
                continue
            section_entries = groups[mem_type]
            header = f"## {mem_type.value.title()}s"
            section_lines = [header]
            for entry in section_entries:
                line = f"- {entry.content}"
                if total_chars + len(line) > char_budget:
                    break
                section_lines.append(line)
                total_chars += len(line)
            # Skip sections where no entry fit under the budget
            if len(section_lines) > 1:
                sections.append("\n".join(section_lines))
        return "\n\n".join(sections)

    def _evict(self):
        """Remove lowest-relevance entries, keeping the top max_entries.

        Note: sorts self.entries in place, so storage order becomes
        relevance order after the first eviction.
        """
        now = time.time()
        self.entries.sort(key=lambda e: e.relevance_score(now), reverse=True)
        # Keep only top max_entries
        evicted = self.entries[self.max_entries:]
        self.entries = self.entries[:self.max_entries]
        if evicted:
            print(f"🧹 Evicted {len(evicted)} low-relevance memories")

    def stats(self) -> dict:
        """Summary counts and relevance spread, for monitoring."""
        now = time.time()
        scores = [e.relevance_score(now) for e in self.entries]
        return {
            "total_entries": len(self.entries),
            "by_type": {
                t.value: sum(1 for e in self.entries if e.memory_type == t)
                for t in MemoryType
            },
            "avg_relevance": round(sum(scores) / len(scores), 3) if scores else 0,
            "min_relevance": round(min(scores), 3) if scores else 0,
            "max_relevance": round(max(scores), 3) if scores else 0,
        }
# Usage
memory = AgentMemory(max_entries=200, max_context_tokens=4000)
# User preferences decay very slowly (INSTRUCTION decay rate: 0.005/hour)
memory.add(
    "User prefers Python over JavaScript",
    MemoryType.INSTRUCTION,
    importance=0.9,
)
# Search results decay fast (RESULT decay rate: 0.1/hour)
memory.add(
    "Found 3 repos matching 'MCP server': repo-a (234 stars), repo-b (89 stars), repo-c (12 stars)",
    MemoryType.RESULT,
    importance=0.4,
)
# Build context for the next LLM call, capped at ~2000 tokens
context = memory.build_context(max_tokens=2000)
The decay rates per type make all the difference. Instructions should persist almost forever. Search results should fade quickly.
Pattern 5: Agent Checkpoint and Recovery
Long-running agent workflows crash. Network blip, OOM, timeout. Without checkpointing, you lose all progress and start over.
// Persisted snapshot of workflow progress; one checkpoint per workflowId.
interface Checkpoint {
  workflowId: string;
  stepIndex: number;  // index of the step this checkpoint refers to
  stepName: string;
  state: Record<string, unknown>;    // accumulated workflow state
  completedSteps: string[];
  timestamp: number;  // ms epoch
  metadata: Record<string, unknown>; // e.g. { attempts } or { failed: true }
}

// One unit of work in a CheckpointedWorkflow.
interface WorkflowStep {
  name: string;
  // Receives the accumulated state; its return value is merged back in.
  handler: (state: Record<string, unknown>) => Promise<Record<string, unknown>>;
  retryable: boolean;
  maxRetries: number;
  timeoutMs: number;
}
class CheckpointedWorkflow {
  private steps: WorkflowStep[] = [];
  private checkpoints: Map<string, Checkpoint> = new Map();
  private storage: CheckpointStorage;

  constructor(storage: CheckpointStorage) {
    this.storage = storage;
  }

  /** Append a step; returns `this` for chaining. */
  addStep(step: WorkflowStep): this {
    this.steps.push(step);
    return this;
  }

  /**
   * Run the workflow, resuming from a persisted checkpoint when one exists.
   * Every completed step saves a checkpoint; a terminal failure saves a
   * checkpoint marked `metadata.failed` and throws WorkflowStepError.
   */
  async execute(
    workflowId: string,
    initialState: Record<string, unknown> = {}
  ): Promise<Record<string, unknown>> {
    const checkpoint = await this.storage.load(workflowId);
    let state = checkpoint?.state ?? initialState;
    const completedSteps = checkpoint?.completedSteps ?? [];

    // Bug fix: a failure checkpoint stores the index of the step that never
    // succeeded; the old unconditional `stepIndex + 1` resume silently
    // skipped that step on the next run.
    let startIndex = 0;
    if (checkpoint) {
      const failedRun = checkpoint.metadata?.failed === true;
      startIndex = failedRun ? checkpoint.stepIndex : checkpoint.stepIndex + 1;
      if (failedRun) {
        // Strip the diagnostic keys the failure checkpoint injected.
        const { _failedAt, _error, _attempts, ...clean } = state;
        state = clean;
      }
      console.log(
        `♻️ Resuming workflow "${workflowId}" from step ${startIndex} ` +
          `(${completedSteps.length}/${this.steps.length} complete)`
      );
    }

    for (let i = startIndex; i < this.steps.length; i++) {
      const step = this.steps[i];
      let attempts = 0;
      let lastError: Error | null = null;

      while (attempts <= step.maxRetries) {
        // Bug fix: the race-timeout timer was never cleared, so a fast step
        // still kept the process alive (and held the callback) for up to
        // timeoutMs after finishing.
        let timer: ReturnType<typeof setTimeout> | undefined;
        try {
          console.log(
            `▶️ Step ${i + 1}/${this.steps.length}: ${step.name} ` +
              `(attempt ${attempts + 1}/${step.maxRetries + 1})`
          );
          const guard = new Promise<never>((_, reject) => {
            timer = setTimeout(
              () =>
                reject(
                  new Error(`Step "${step.name}" timed out after ${step.timeoutMs}ms`)
                ),
              step.timeoutMs
            );
          });
          const result = await Promise.race([step.handler(state), guard]);
          state = { ...state, ...result };
          completedSteps.push(step.name);
          // Save checkpoint
          const cp: Checkpoint = {
            workflowId,
            stepIndex: i,
            stepName: step.name,
            state,
            completedSteps,
            timestamp: Date.now(),
            metadata: { attempts: attempts + 1 },
          };
          await this.storage.save(cp);
          break; // Step succeeded
        } catch (error) {
          lastError = error as Error;
          attempts++;
          if (!step.retryable || attempts > step.maxRetries) {
            // Persist the failure for debugging and resume-at-this-step.
            await this.storage.save({
              workflowId,
              stepIndex: i,
              stepName: step.name,
              state: {
                ...state,
                _failedAt: step.name,
                _error: lastError.message,
                _attempts: attempts,
              },
              completedSteps,
              timestamp: Date.now(),
              metadata: { failed: true },
            });
            throw new WorkflowStepError(step.name, i, lastError, completedSteps);
          }
          // Exponential backoff, capped at 30s
          const delay = Math.min(1000 * Math.pow(2, attempts), 30000);
          console.log(
            `⚠️ Step "${step.name}" failed (attempt ${attempts}), ` +
              `retrying in ${delay}ms: ${lastError.message}`
          );
          await new Promise((r) => setTimeout(r, delay));
        } finally {
          if (timer !== undefined) clearTimeout(timer);
        }
      }
    }

    // Workflow complete — clean up so a rerun starts fresh
    await this.storage.delete(workflowId);
    console.log(`✅ Workflow "${workflowId}" complete`);
    return state;
  }
}
/**
 * Raised when a workflow step exhausts its retries (or is not retryable).
 * Carries the failing step, its index, the underlying error, and the list
 * of steps that did complete.
 */
class WorkflowStepError extends Error {
  constructor(
    public stepName: string,
    public stepIndex: number,
    public cause: Error,
    public completedSteps: string[]
  ) {
    const done = completedSteps.join(", ");
    const summary =
      `Workflow failed at step "${stepName}" (${stepIndex}): ${cause.message}. ` +
      `Completed: [${done}]`;
    super(summary);
  }
}
// Simple file-based storage (swap for Redis/DB in production)
interface CheckpointStorage {
  save(checkpoint: Checkpoint): Promise<void>;
  load(workflowId: string): Promise<Checkpoint | null>; // null when absent
  delete(workflowId: string): Promise<void>;
}
/** One JSON file per workflowId under `dir`; swap for Redis/DB in production. */
class FileCheckpointStorage implements CheckpointStorage {
  constructor(private dir: string = "./checkpoints") {}

  /** File path that stores the checkpoint for a given workflow. */
  private pathFor(workflowId: string): string {
    return `${this.dir}/${workflowId}.json`;
  }

  async save(checkpoint: Checkpoint): Promise<void> {
    const fs = await import("fs/promises");
    await fs.mkdir(this.dir, { recursive: true });
    await fs.writeFile(
      this.pathFor(checkpoint.workflowId),
      JSON.stringify(checkpoint, null, 2)
    );
  }

  async load(workflowId: string): Promise<Checkpoint | null> {
    const fs = await import("fs/promises");
    try {
      const raw = await fs.readFile(this.pathFor(workflowId), "utf-8");
      return JSON.parse(raw);
    } catch {
      return null; // Missing or unreadable checkpoint — treat as absent
    }
  }

  async delete(workflowId: string): Promise<void> {
    const fs = await import("fs/promises");
    try {
      await fs.unlink(this.pathFor(workflowId));
    } catch {
      // Already deleted — nothing to do
    }
  }
}
// Usage: research agent workflow
const workflow = new CheckpointedWorkflow(new FileCheckpointStorage());
workflow
  .addStep({
    name: "gather_requirements",
    handler: async (state) => {
      // Parse user query, extract entities
      return { entities: ["MCP", "production", "patterns"], query: state.userQuery };
    },
    retryable: false,
    maxRetries: 0,
    timeoutMs: 10000,
  })
  .addStep({
    name: "search_sources",
    handler: async (state) => {
      // Search across multiple sources (network-bound — retried up to 3x)
      return { sources: ["arxiv:123", "github:456", "blog:789"] };
    },
    retryable: true,
    maxRetries: 3,
    timeoutMs: 30000,
  })
  .addStep({
    name: "synthesize_results",
    handler: async (state) => {
      // LLM call to synthesize
      return { synthesis: "..." };
    },
    retryable: true,
    maxRetries: 2,
    timeoutMs: 60000,
  })
  .addStep({
    name: "format_output",
    handler: async (state) => {
      return { finalReport: `Report based on ${(state.sources as string[]).length} sources` };
    },
    retryable: false,
    maxRetries: 0,
    timeoutMs: 5000,
  });
// If this crashes at step 2, next run resumes from step 2
const result = await workflow.execute("research-mcp-patterns", {
  userQuery: "Best practices for MCP in production",
});
The resume behavior is automatic. Crash at step 3? Next run loads the checkpoint and starts at step 3 with all previous state intact.
Pattern 6: Token Budget Allocator Across Agent Teams
When you have a team of agents working on a shared task, one chatty agent can consume the entire token budget. A budget allocator ensures fair distribution.
from dataclasses import dataclass, field
from typing import Optional
import time
import threading
@dataclass
class AgentBudget:
    """Token allowance for one agent, tracked against actual consumption."""

    agent_id: str
    allocated_tokens: int
    used_tokens: int = 0
    priority: float = 1.0  # Higher = gets more budget on reallocation
    last_activity: float = field(default_factory=time.time)

    @property
    def remaining(self) -> int:
        """Tokens still available; never negative."""
        leftover = self.allocated_tokens - self.used_tokens
        return leftover if leftover > 0 else 0

    @property
    def utilization(self) -> float:
        """Used fraction of the allocation; 0 when nothing is allocated."""
        if self.allocated_tokens == 0:
            return 0
        return self.used_tokens / self.allocated_tokens
class TokenBudgetAllocator:
def __init__(self, total_budget: int):
self.total_budget = total_budget
self.agents: dict[str, AgentBudget] = {}
self._lock = threading.Lock()
def register_agent(
self,
agent_id: str,
priority: float = 1.0,
initial_allocation_pct: Optional[float] = None,
) -> AgentBudget:
with self._lock:
if initial_allocation_pct:
allocation = int(self.total_budget * initial_allocation_pct)
else:
# Equal split among all agents
n_agents = len(self.agents) + 1
allocation = self.total_budget // n_agents
# Rebalance existing agents
for existing in self.agents.values():
existing.allocated_tokens = self.total_budget // n_agents
budget = AgentBudget(
agent_id=agent_id,
allocated_tokens=allocation,
priority=priority,
)
self.agents[agent_id] = budget
return budget
def request_tokens(
self,
agent_id: str,
tokens_needed: int,
) -> tuple[bool, int]:
"""Request tokens. Returns (approved, tokens_granted)."""
with self._lock:
budget = self.agents.get(agent_id)
if not budget:
return False, 0
if budget.remaining >= tokens_needed:
budget.used_tokens += tokens_needed
budget.last_activity = time.time()
return True, tokens_needed
# Try to steal from idle agents
stolen = self._try_steal(agent_id, tokens_needed - budget.remaining)
available = budget.remaining + stolen
if available >= tokens_needed:
budget.used_tokens += tokens_needed
budget.allocated_tokens += stolen
budget.last_activity = time.time()
return True, tokens_needed
# Partial grant
if available > 0:
budget.used_tokens += available
budget.allocated_tokens += stolen
budget.last_activity = time.time()
return True, available
return False, 0
def _try_steal(self, requester_id: str, tokens_needed: int) -> int:
"""Try to reallocate tokens from idle/low-priority agents."""
stolen = 0
now = time.time()
# Sort donors: idle first, then low priority
donors = [
(aid, ab)
for aid, ab in self.agents.items()
if aid != requester_id and ab.remaining > 0
]
donors.sort(key=lambda x: (
-(now - x[1].last_activity), # Most idle first
x[1].priority, # Lowest priority first
))
for donor_id, donor_budget in donors:
if stolen >= tokens_needed:
break
# Don't steal more than 50% of donor's remaining
stealable = min(
donor_budget.remaining // 2,
tokens_needed - stolen,
)
if stealable > 0:
donor_budget.allocated_tokens -= stealable
stolen += stealable
return stolen
def report_usage(
self,
agent_id: str,
actual_tokens_used: int,
) -> None:
"""Report actual usage after an LLM call (for accurate tracking)."""
with self._lock:
budget = self.agents.get(agent_id)
if budget:
# Adjust if actual differs from requested
budget.used_tokens = max(0, budget.used_tokens)
budget.last_activity = time.time()
def dashboard(self) -> dict:
total_used = sum(a.used_tokens for a in self.agents.values())
total_allocated = sum(a.allocated_tokens for a in self.agents.values())
return {
"total_budget": self.total_budget,
"total_allocated": total_allocated,
"total_used": total_used,
"budget_remaining": self.total_budget - total_used,
"agents": {
aid: {
"allocated": ab.allocated_tokens,
"used": ab.used_tokens,
"remaining": ab.remaining,
"utilization": f"{ab.utilization:.0%}",
"priority": ab.priority,
}
for aid, ab in self.agents.items()
},
}
# Usage: three agents share 100K tokens (equal split on registration)
allocator = TokenBudgetAllocator(total_budget=100_000)
allocator.register_agent("researcher", priority=2.0)
allocator.register_agent("writer", priority=1.5)
allocator.register_agent("reviewer", priority=1.0)
# Researcher requests tokens for a big search
approved, granted = allocator.request_tokens("researcher", 15000)
print(f"Researcher: approved={approved}, granted={granted}")
# Writer requests tokens — might steal from idle reviewer
approved, granted = allocator.request_tokens("writer", 20000)
print(f"Writer: approved={approved}, granted={granted}")
print(allocator.dashboard())
The token-stealing mechanism is what makes this work in practice. Idle agents shouldn't hoard tokens while active agents starve.
Pattern 7: Dead Letter Queue for Failed Agent Tasks
When an agent task fails after all retries, it shouldn't just disappear. A dead letter queue captures failed tasks for analysis, replay, or manual resolution.
import asyncio
import json
import time
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Optional
@dataclass
class DeadLetter:
    """One permanently failed agent task, captured for analysis, replay,
    or manual resolution.

    Instances are created by ``DeadLetterQueue.add`` and persisted to disk
    as JSON (via ``dataclasses.asdict``), so fields should stay
    JSON-serializable.
    """

    letter_id: str        # unique id assigned by the queue ("dl-" + 10 hex chars)
    task_type: str        # kind of task that failed (e.g. "search")
    payload: dict         # original task input, kept verbatim for replay
    error: str            # str() of the exception that caused the failure
    error_type: str       # exception class name (e.g. "TimeoutError")
    agent_id: str         # agent that was running the task
    attempts: int         # number of attempts made before dead-lettering
    first_attempt_at: float   # epoch seconds of the first attempt
    last_attempt_at: float    # epoch seconds of the most recent attempt
    context: dict = field(default_factory=dict)  # free-form extra metadata
    resolved: bool = False            # True once replayed or manually resolved
    resolution: Optional[str] = None  # human-readable note on how it was closed

    def age_hours(self) -> float:
        """Hours elapsed since the first failed attempt."""
        return (time.time() - self.first_attempt_at) / 3600
class DeadLetterQueue:
    """File-backed dead letter queue for agent tasks that failed all retries.

    Each letter is persisted as ``<storage_dir>/<letter_id>.json`` so the
    queue survives process restarts (``_load_existing`` reloads them on
    construction). Letters can be inspected (``list_unresolved`` /
    ``summary``), replayed through a handler (``replay``), manually closed
    (``resolve``), and garbage-collected once resolved and old
    (``purge_old``).

    NOTE(review): not thread-safe — ``_letters`` and the on-disk files are
    mutated without locking; confirm single-threaded use or add a lock.
    """

    def __init__(
        self,
        storage_dir: str = "./dead_letters",
        max_age_hours: float = 72.0,
        alert_callback: Optional[Callable] = None,
    ):
        """
        Args:
            storage_dir: Directory where letters are persisted as JSON;
                created (with parents) if missing.
            max_age_hours: Resolved letters whose last attempt is older
                than this become eligible for ``purge_old``.
            alert_callback: Optional hook invoked with each new DeadLetter.
        """
        self.storage_dir = Path(storage_dir)
        self.storage_dir.mkdir(parents=True, exist_ok=True)
        self.max_age_hours = max_age_hours
        self.alert_callback = alert_callback
        self._letters: dict[str, DeadLetter] = {}
        self._load_existing()

    def _load_existing(self) -> None:
        """Reload previously persisted letters from disk (best effort)."""
        for f in self.storage_dir.glob("*.json"):
            try:
                data = json.loads(f.read_text())
                letter = DeadLetter(**data)
                self._letters[letter.letter_id] = letter
            except (OSError, ValueError, TypeError):
                # A corrupt or unreadable file must not prevent startup.
                # json.JSONDecodeError is a ValueError; unexpected/missing
                # fields raise TypeError from the DeadLetter constructor.
                # (Narrowed from a blanket `except Exception`.)
                continue

    def add(
        self,
        task_type: str,
        payload: dict,
        error: Exception,
        agent_id: str,
        attempts: int,
        context: Optional[dict] = None,
    ) -> DeadLetter:
        """Capture a permanently failed task as a new dead letter.

        The letter is stored in memory, persisted to disk, and then the
        alert callback (if any) is notified.

        Args:
            task_type: Kind of task that failed.
            payload: Original task input, kept verbatim for later replay.
            error: The final exception; its str() and class name are stored.
            agent_id: Agent that was running the task.
            attempts: How many attempts were made before giving up.
            context: Optional extra metadata to store with the letter.

        Returns:
            The newly created DeadLetter.
        """
        # Single timestamp so first/last attempt markers start identical
        # (the original called time.time() twice, which could differ).
        now = time.time()
        letter = DeadLetter(
            letter_id=f"dl-{uuid.uuid4().hex[:10]}",
            task_type=task_type,
            payload=payload,
            error=str(error),
            error_type=type(error).__name__,
            agent_id=agent_id,
            attempts=attempts,
            first_attempt_at=now,
            last_attempt_at=now,
            context=context or {},
        )
        self._letters[letter.letter_id] = letter
        self._persist(letter)
        if self.alert_callback:
            try:
                self.alert_callback(letter)
            except Exception:
                # A failing alert hook must not lose the (already
                # persisted) letter or break the caller's error path.
                pass
        return letter

    def list_unresolved(
        self,
        task_type: Optional[str] = None,
        limit: int = 50,
    ) -> list[DeadLetter]:
        """Return unresolved letters, newest failures first.

        Args:
            task_type: If given, only letters of this task type.
            limit: Maximum number of letters to return.
        """
        letters = [
            letter for letter in self._letters.values()
            if not letter.resolved
        ]
        if task_type:
            letters = [
                letter for letter in letters
                if letter.task_type == task_type
            ]
        letters.sort(key=lambda letter: letter.last_attempt_at, reverse=True)
        return letters[:limit]

    async def replay(
        self,
        letter_id: str,
        handler: Callable,
        modify_payload: Optional[Callable] = None,
    ) -> tuple[bool, Any]:
        """Replay a dead letter task through ``handler``.

        Args:
            letter_id: Id of the letter to replay.
            handler: Async callable invoked with the payload dict.
            modify_payload: Optional transform applied to the payload
                first (e.g. to apply a fix before retrying).

        Returns:
            ``(True, result)`` on success — the letter is marked resolved —
            or ``(False, error_string)`` on failure, with the letter's
            error fields and attempt count updated in place.

        Raises:
            ValueError: If no letter with ``letter_id`` exists.
        """
        letter = self._letters.get(letter_id)
        if not letter:
            raise ValueError(f"Dead letter {letter_id} not found")
        payload = letter.payload
        if modify_payload:
            payload = modify_payload(payload)
        try:
            result = await handler(payload)
        except Exception as e:
            letter.last_attempt_at = time.time()
            letter.attempts += 1
            letter.error = str(e)
            # Bug fix: keep error_type in sync with the newest failure
            # (the original left it naming the first exception's class).
            letter.error_type = type(e).__name__
            self._persist(letter)
            return False, str(e)
        letter.resolved = True
        letter.resolution = f"Replayed successfully at {datetime.now(timezone.utc).isoformat()}"
        self._persist(letter)
        return True, result

    def resolve(self, letter_id: str, resolution: str) -> bool:
        """Manually resolve a dead letter.

        Returns True if the letter existed and was marked resolved,
        False if the id is unknown.
        """
        letter = self._letters.get(letter_id)
        if not letter:
            return False
        letter.resolved = True
        letter.resolution = resolution
        self._persist(letter)
        return True

    def purge_old(self) -> int:
        """Remove resolved letters older than max_age_hours.

        Deletes both the in-memory entry and its JSON file. Unresolved
        letters are never purged, regardless of age.

        Returns:
            The number of letters removed.
        """
        cutoff = time.time() - (self.max_age_hours * 3600)
        to_remove = [
            lid for lid, letter in self._letters.items()
            if letter.resolved and letter.last_attempt_at < cutoff
        ]
        for lid in to_remove:
            del self._letters[lid]
            path = self.storage_dir / f"{lid}.json"
            path.unlink(missing_ok=True)
        return len(to_remove)

    def summary(self) -> dict:
        """Aggregate counts for dashboards and alerting.

        Includes totals, unresolved breakdowns by task type and error
        type, and the age (hours) of the oldest unresolved letter.
        """
        unresolved = [l for l in self._letters.values() if not l.resolved]
        resolved = [l for l in self._letters.values() if l.resolved]
        by_type: dict[str, int] = {}
        by_error: dict[str, int] = {}
        for letter in unresolved:
            by_type[letter.task_type] = by_type.get(letter.task_type, 0) + 1
            by_error[letter.error_type] = by_error.get(letter.error_type, 0) + 1
        return {
            "total": len(self._letters),
            "unresolved": len(unresolved),
            "resolved": len(resolved),
            "by_task_type": by_type,
            "by_error_type": by_error,
            "oldest_unresolved_hours": round(
                max((l.age_hours() for l in unresolved), default=0), 1
            ),
        }

    def _persist(self, letter: DeadLetter) -> None:
        """Write one letter to ``<storage_dir>/<letter_id>.json``."""
        path = self.storage_dir / f"{letter.letter_id}.json"
        # default=str keeps persistence from failing on incidental
        # non-JSON values inside payload/context (they round-trip as
        # strings).
        path.write_text(json.dumps(asdict(letter), indent=2, default=str))
# Integration with agent system
# Module-level DLQ shared by the error handler below. The alert callback
# fires once per newly added dead letter; here it just prints a console
# notice with the error truncated to 100 characters.
dlq = DeadLetterQueue(
    storage_dir="./dead_letters",
    alert_callback=lambda l: print(
        f"🚨 Dead letter: {l.task_type} failed after {l.attempts} attempts: {l.error[:100]}"
    ),
)
# In your agent error handler:
async def run_agent_task(task_type: str, payload: dict, agent_id: str):
    """Run an agent task with retries; dead-letter it if every attempt fails.

    Tries ``execute_task`` up to three times. Between attempts it now
    backs off exponentially (1s, 2s) — the original retried immediately,
    hammering a failing dependency. On the final failure the task is
    captured in the module-level ``dlq`` and the exception is re-raised
    so the caller still sees it.

    Args:
        task_type: Kind of task to run (forwarded to execute_task).
        payload: Task input (forwarded to execute_task, stored on failure).
        agent_id: Agent identifier recorded in the dead letter.

    Returns:
        Whatever ``execute_task`` returns on the first successful attempt.

    Raises:
        Exception: The last exception from ``execute_task`` after all
            retries are exhausted (after dead-lettering it).
    """
    max_retries = 3
    for attempt in range(max_retries):
        try:
            return await execute_task(task_type, payload)
        except Exception as e:
            if attempt == max_retries - 1:
                # Final failure: capture in the DLQ before propagating so
                # the task can be analyzed and replayed later.
                dlq.add(
                    task_type=task_type,
                    payload=payload,
                    error=e,
                    agent_id=agent_id,
                    attempts=max_retries,
                    context={"last_attempt": attempt + 1},
                )
                raise
            # Exponential backoff before the next attempt (1s, 2s, ...).
            await asyncio.sleep(2 ** attempt)
# Check DLQ status — aggregate counts, plus unresolved breakdowns by task
# type and error type.
print(dlq.summary())
# Example output:
# → {'total': 3, 'unresolved': 2, 'by_task_type': {'search': 1, 'synthesize': 1}, ...}
The replay mechanism is the killer feature. Found the bug? Fix it and replay the dead letter — no need to recreate the original request.
Putting It All Together: The Agent Operating System
These 7 patterns form a layered architecture for multi-agent systems:
┌─────────────────────────────────────────────┐
│ Your Application │
├─────────────────────────────────────────────┤
│ [Cost Router] → Pick the right model │
│ [Budget Allocator] → Fair token sharing │
├─────────────────────────────────────────────┤
│ [Supervisor + Backpressure] │
│ [Shared State + Conflict Resolution] │
├─────────────────────────────────────────────┤
│ [Agent Memory with Decay] │
│ [Checkpoint + Recovery] │
├─────────────────────────────────────────────┤
│ [Dead Letter Queue] │
│ Observability + Alerting │
└─────────────────────────────────────────────┘
Each layer solves a different failure mode:
- Cost Router: Prevents budget blowouts
- Budget Allocator: Prevents resource starvation
- Supervisor: Prevents overload
- Shared State: Prevents data corruption
- Memory: Prevents context pollution
- Checkpoint: Prevents lost progress
- Dead Letter Queue: Prevents silent failures
Key Takeaways
- Backpressure stops cascading failures when agents can't keep up
- Conflict resolution on shared state prevents stale writes
- Cost-aware routing can cut LLM spend by 60-80%
- Memory decay keeps context fresh and relevant
- Checkpointing makes long workflows crash-safe
- Token budgets prevent chatty agents from starving the team
- Dead letter queues capture failures for replay and debugging
Building a single agent is easy. Building a system of agents that runs reliably is engineering. These patterns are the difference.
Want production-ready implementations of these patterns plus agent templates for LangGraph, CrewAI, and AutoGen? Check out the AI Dev Toolkit — ship multi-agent systems that actually work in production.
Top comments (0)