DEV Community

dohko

7 AI Agent Orchestration Patterns That Actually Scale Beyond a Single Demo

Every AI agent framework has a "build a research agent in 10 lines" tutorial. Cool. Now try running 50 agents concurrently, handling failures, managing shared state, and keeping costs under control.

That's where demos die and engineering begins.

These are 7 orchestration patterns that work across frameworks — LangGraph, CrewAI, AutoGen, OpenAI Agents SDK, or your own custom setup. The patterns are framework-agnostic because good architecture outlasts any library.


Pattern 1: The Supervisor with Backpressure

The classic supervisor pattern — one agent delegates to workers — breaks down under load. Worker 3 is slow, but the supervisor keeps sending it tasks. Queue grows. Memory grows. Everything dies.

Backpressure means: when a worker is overwhelmed, the system slows down instead of crashing.

import asyncio
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
from enum import Enum
import time
import uuid


class WorkerState(Enum):
    IDLE = "idle"
    BUSY = "busy"
    OVERLOADED = "overloaded"
    FAILED = "failed"


@dataclass
class WorkerAgent:
    agent_id: str
    name: str
    handler: Callable
    max_concurrent: int = 3
    current_tasks: int = 0
    state: WorkerState = WorkerState.IDLE
    total_completed: int = 0
    total_errors: int = 0
    avg_duration_ms: float = 0

    @property
    def load_factor(self) -> float:
        return self.current_tasks / self.max_concurrent

    def can_accept(self) -> bool:
        return self.current_tasks < self.max_concurrent and self.state != WorkerState.FAILED


@dataclass
class Task:
    task_id: str
    task_type: str
    payload: dict
    priority: int = 0  # Lower = higher priority
    created_at: float = field(default_factory=time.time)
    timeout_seconds: float = 60.0


@dataclass
class TaskResult:
    task_id: str
    worker_id: str
    success: bool
    result: Any = None
    error: Optional[str] = None
    duration_ms: float = 0


class BackpressureSupervisor:
    def __init__(
        self,
        max_queue_size: int = 100,
        backpressure_threshold: float = 0.8,
    ):
        self.workers: dict[str, WorkerAgent] = {}
        # Map task types to the workers that can handle them
        self._type_map: dict[str, list[str]] = {}
        self.task_queue: asyncio.PriorityQueue = asyncio.PriorityQueue(
            maxsize=max_queue_size
        )
        self.results: dict[str, TaskResult] = {}
        self.backpressure_threshold = backpressure_threshold
        self._running = False
        self._dispatch_task: Optional[asyncio.Task] = None

    def register_worker(
        self,
        name: str,
        handler: Callable,
        task_types: list[str],
        max_concurrent: int = 3,
    ) -> str:
        agent_id = f"worker-{name}-{uuid.uuid4().hex[:6]}"
        worker = WorkerAgent(
            agent_id=agent_id,
            name=name,
            handler=handler,
            max_concurrent=max_concurrent,
        )
        self.workers[agent_id] = worker
        for t in task_types:
            self._type_map.setdefault(t, []).append(agent_id)
        return agent_id

    def system_load(self) -> float:
        """Overall system load factor (0.0 to 1.0)."""
        if not self.workers:
            return 1.0
        total_capacity = sum(w.max_concurrent for w in self.workers.values())
        total_active = sum(w.current_tasks for w in self.workers.values())
        return total_active / total_capacity if total_capacity > 0 else 1.0

    async def submit(self, task: Task) -> bool:
        """Submit a task. Returns False if backpressure rejects it."""
        load = self.system_load()

        if load >= self.backpressure_threshold:
            # Check if any worker can handle this type
            candidates = self._type_map.get(task.task_type, [])
            available = [
                wid for wid in candidates
                if self.workers[wid].can_accept()
            ]

            if not available:
                return False  # Backpressure: reject task

        try:
            self.task_queue.put_nowait((task.priority, task.created_at, task))
            return True
        except asyncio.QueueFull:
            return False

    async def _dispatch_loop(self):
        """Main dispatch loop — assigns tasks to workers."""
        while self._running:
            try:
                priority, _, task = await asyncio.wait_for(
                    self.task_queue.get(), timeout=1.0
                )
            except asyncio.TimeoutError:
                continue

            # Find best worker for this task
            candidates = self._type_map.get(task.task_type, [])
            available = [
                self.workers[wid]
                for wid in candidates
                if self.workers[wid].can_accept()
            ]

            if not available:
                # Re-queue with slight delay
                await asyncio.sleep(0.5)
                await self.task_queue.put((priority, task.created_at, task))
                continue

            # Pick least loaded worker
            worker = min(available, key=lambda w: w.load_factor)
            asyncio.create_task(self._execute(worker, task))

    async def _execute(self, worker: WorkerAgent, task: Task):
        """Execute a task on a worker with timeout and error handling."""
        worker.current_tasks += 1
        if worker.load_factor > 0.8:
            worker.state = WorkerState.OVERLOADED
        else:
            worker.state = WorkerState.BUSY

        start = time.perf_counter()

        try:
            result = await asyncio.wait_for(
                worker.handler(task.payload),
                timeout=task.timeout_seconds,
            )

            duration = (time.perf_counter() - start) * 1000
            worker.total_completed += 1
            # Exponential moving average; seed with the first sample
            if worker.total_completed == 1:
                worker.avg_duration_ms = duration
            else:
                worker.avg_duration_ms = (
                    worker.avg_duration_ms * 0.9 + duration * 0.1
                )

            self.results[task.task_id] = TaskResult(
                task_id=task.task_id,
                worker_id=worker.agent_id,
                success=True,
                result=result,
                duration_ms=round(duration, 2),
            )

        except asyncio.TimeoutError:
            worker.total_errors += 1
            self.results[task.task_id] = TaskResult(
                task_id=task.task_id,
                worker_id=worker.agent_id,
                success=False,
                error=f"Timeout after {task.timeout_seconds}s",
                duration_ms=task.timeout_seconds * 1000,
            )

        except Exception as e:
            worker.total_errors += 1
            duration = (time.perf_counter() - start) * 1000

            self.results[task.task_id] = TaskResult(
                task_id=task.task_id,
                worker_id=worker.agent_id,
                success=False,
                error=str(e),
                duration_ms=round(duration, 2),
            )

            # Mark worker as failed if error rate > 50%
            total = worker.total_completed + worker.total_errors
            if total > 5 and worker.total_errors / total > 0.5:
                worker.state = WorkerState.FAILED

        finally:
            worker.current_tasks -= 1
            if worker.current_tasks == 0 and worker.state != WorkerState.FAILED:
                worker.state = WorkerState.IDLE

    async def start(self):
        self._running = True
        # Hold a reference so the dispatch task isn't garbage-collected
        self._dispatch_task = asyncio.create_task(self._dispatch_loop())

    async def stop(self):
        self._running = False

    def dashboard(self) -> dict:
        return {
            "system_load": round(self.system_load() * 100, 1),
            "queue_size": self.task_queue.qsize(),
            "workers": {
                w.name: {
                    "state": w.state.value,
                    "load": f"{w.load_factor:.0%}",
                    "completed": w.total_completed,
                    "errors": w.total_errors,
                    "avg_ms": round(w.avg_duration_ms, 1),
                }
                for w in self.workers.values()
            },
        }

A False return from submit() is the backpressure signal. The caller (your API, your UI, an upstream agent) decides what to do: queue externally, show a "busy" message, or drop the request.
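When a submit is rejected, the simplest caller-side policy is jittered exponential backoff before giving up. A minimal sketch — `submit_with_retry` and its parameters are hypothetical, but any async callable with submit()'s bool contract works:

```python
import asyncio
import random


async def submit_with_retry(submit, task, max_attempts=5, base_delay=0.2):
    """Retry a backpressure-rejected submit with jittered backoff.

    `submit` is any async callable that returns False when the system
    is saturated (e.g. BackpressureSupervisor.submit).
    """
    for attempt in range(max_attempts):
        if await submit(task):
            return True
        # Exponential backoff with jitter to avoid thundering herds
        delay = base_delay * (2 ** attempt) * (0.5 + random.random())
        await asyncio.sleep(delay)
    return False  # Still saturated: surface the rejection upstream
```

If `submit_with_retry` itself returns False, propagate the "busy" signal instead of buffering unboundedly — bounded queues are the whole point of backpressure.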


Pattern 2: Shared State with Conflict Resolution

Multiple agents reading and writing shared state creates race conditions. Agent A reads user preferences, Agent B updates them, Agent A writes stale data back.

interface StateVersion {
  data: Record<string, unknown>;
  version: number;
  lastModifiedBy: string;
  timestamp: number;
}

interface StateConflict {
  key: string;
  currentVersion: number;
  attemptedVersion: number;
  currentValue: unknown;
  attemptedValue: unknown;
  modifiedBy: string;
}

type ConflictStrategy = "last-write-wins" | "first-write-wins" | "merge" | "reject";

class SharedAgentState {
  private store: Map<string, StateVersion> = new Map();
  private conflictStrategy: ConflictStrategy;
  private history: Map<string, StateVersion[]> = new Map();
  private locks: Map<string, { holder: string; acquired: number }> = new Map();

  constructor(strategy: ConflictStrategy = "reject") {
    this.conflictStrategy = strategy;
  }

  read(key: string): { data: unknown; version: number } | null {
    const entry = this.store.get(key);
    if (!entry) return null;
    return { data: structuredClone(entry.data), version: entry.version };
  }

  write(
    key: string,
    data: Record<string, unknown>,
    agentId: string,
    expectedVersion?: number
  ): { success: boolean; conflict?: StateConflict; version: number } {
    const current = this.store.get(key);

    // New key — just write
    if (!current) {
      const version: StateVersion = {
        data,
        version: 1,
        lastModifiedBy: agentId,
        timestamp: Date.now(),
      };
      this.store.set(key, version);
      this.appendHistory(key, version);
      return { success: true, version: 1 };
    }

    // Version check
    if (expectedVersion !== undefined && expectedVersion !== current.version) {
      const conflict: StateConflict = {
        key,
        currentVersion: current.version,
        attemptedVersion: expectedVersion,
        currentValue: current.data,
        attemptedValue: data,
        modifiedBy: current.lastModifiedBy,
      };

      return this.resolveConflict(key, conflict, data, agentId);
    }

    // No version check or version matches — write
    const newVersion: StateVersion = {
      data,
      version: (current?.version ?? 0) + 1,
      lastModifiedBy: agentId,
      timestamp: Date.now(),
    };
    this.store.set(key, newVersion);
    this.appendHistory(key, newVersion);
    return { success: true, version: newVersion.version };
  }

  private resolveConflict(
    key: string,
    conflict: StateConflict,
    newData: Record<string, unknown>,
    agentId: string
  ): { success: boolean; conflict?: StateConflict; version: number } {
    const current = this.store.get(key)!;

    switch (this.conflictStrategy) {
      case "last-write-wins": {
        const version: StateVersion = {
          data: newData,
          version: current.version + 1,
          lastModifiedBy: agentId,
          timestamp: Date.now(),
        };
        this.store.set(key, version);
        this.appendHistory(key, version);
        return { success: true, version: version.version };
      }

      case "first-write-wins":
        return { success: false, conflict, version: current.version };

      case "merge": {
        // Deep merge: current wins on conflicts, new adds missing keys
        const merged = this.deepMerge(
          current.data as Record<string, unknown>,
          newData
        );
        const version: StateVersion = {
          data: merged,
          version: current.version + 1,
          lastModifiedBy: `${current.lastModifiedBy}+${agentId}`,
          timestamp: Date.now(),
        };
        this.store.set(key, version);
        this.appendHistory(key, version);
        return { success: true, version: version.version };
      }

      case "reject":
      default:
        return { success: false, conflict, version: current.version };
    }
  }

  private deepMerge(
    target: Record<string, unknown>,
    source: Record<string, unknown>
  ): Record<string, unknown> {
    const result = { ...target };
    for (const key of Object.keys(source)) {
      if (
        key in result &&
        typeof result[key] === "object" &&
        typeof source[key] === "object" &&
        result[key] !== null &&
        source[key] !== null
      ) {
        result[key] = this.deepMerge(
          result[key] as Record<string, unknown>,
          source[key] as Record<string, unknown>
        );
      } else if (!(key in result)) {
        result[key] = source[key];
      }
      // If key exists in both and isn't object: target wins
    }
    return result;
  }

  // Advisory locking helper (the version check in write() is the optimistic path)
  async acquireLock(
    key: string,
    agentId: string,
    timeoutMs: number = 5000
  ): Promise<boolean> {
    const deadline = Date.now() + timeoutMs;

    while (Date.now() < deadline) {
      const existing = this.locks.get(key);

      if (!existing || Date.now() - existing.acquired > 30000) {
        // No lock or stale lock
        this.locks.set(key, { holder: agentId, acquired: Date.now() });
        return true;
      }

      if (existing.holder === agentId) return true; // Re-entrant

      await new Promise((resolve) => setTimeout(resolve, 100));
    }

    return false;
  }

  releaseLock(key: string, agentId: string): boolean {
    const lock = this.locks.get(key);
    if (lock?.holder === agentId) {
      this.locks.delete(key);
      return true;
    }
    return false;
  }

  private appendHistory(key: string, version: StateVersion): void {
    const hist = this.history.get(key) ?? [];
    hist.push(structuredClone(version));
    // Keep last 20 versions
    if (hist.length > 20) hist.shift();
    this.history.set(key, hist);
  }

  getHistory(key: string): StateVersion[] {
    return this.history.get(key) ?? [];
  }
}

For most agent systems, the "merge" strategy is the right default: agents contribute different fields to the same state object without overwriting each other, and when the same key does collide, the existing value wins.
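On the consuming side, the version returned by read() feeds a compare-and-set loop: read, mutate, write with expectedVersion, and retry from a fresh read on conflict. A standalone sketch against a toy single-key store — TinyStore and updateWithRetry are illustrative names, not part of the class above:

```typescript
interface Versioned {
  data: Record<string, unknown>;
  version: number;
}

// Toy single-key store with the same success/version contract as
// SharedAgentState.write under the "reject" strategy
class TinyStore {
  private entry: Versioned = { data: {}, version: 0 };

  read(): Versioned {
    return { data: { ...this.entry.data }, version: this.entry.version };
  }

  write(data: Record<string, unknown>, expectedVersion: number) {
    if (expectedVersion !== this.entry.version) {
      return { success: false, version: this.entry.version }; // Conflict
    }
    this.entry = { data, version: expectedVersion + 1 };
    return { success: true, version: this.entry.version };
  }
}

// Compare-and-set loop: on conflict, re-read and re-apply the mutation
function updateWithRetry(
  store: TinyStore,
  mutate: (d: Record<string, unknown>) => Record<string, unknown>,
  maxAttempts = 5
): boolean {
  for (let i = 0; i < maxAttempts; i++) {
    const { data, version } = store.read();
    if (store.write(mutate(data), version).success) return true;
  }
  return false;
}

// Two agents add different fields; neither clobbers the other
const store = new TinyStore();
updateWithRetry(store, (d) => ({ ...d, theme: "dark" }));
updateWithRetry(store, (d) => ({ ...d, lang: "en" }));
```

The mutation function is re-applied to fresh data on every attempt, which is what makes the retry safe: you never write a value derived from a stale read.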


Pattern 3: Cost-Aware Task Routing

Not all LLM calls are equal. A simple classification task doesn't need GPT-4o. A complex reasoning task shouldn't go to a cheap model. Route tasks based on estimated cost and complexity.

from dataclasses import dataclass
from typing import Optional
from enum import Enum


class Complexity(Enum):
    TRIVIAL = 1    # Classification, extraction
    SIMPLE = 2     # Summarization, Q&A
    MODERATE = 3   # Multi-step reasoning
    COMPLEX = 4    # Code generation, analysis
    EXPERT = 5     # Architecture, novel solutions


@dataclass
class ModelConfig:
    name: str
    provider: str
    cost_per_1k_input: float   # USD
    cost_per_1k_output: float  # USD
    max_context: int
    speed_tokens_per_sec: float
    min_complexity: Complexity
    max_complexity: Complexity


@dataclass
class TaskEstimate:
    estimated_input_tokens: int
    estimated_output_tokens: int
    complexity: Complexity
    requires_tools: bool = False
    requires_vision: bool = False
    max_latency_ms: Optional[float] = None


class CostAwareRouter:
    def __init__(self, monthly_budget_usd: float = 100.0):
        self.models: list[ModelConfig] = []
        self.monthly_budget = monthly_budget_usd
        self.month_spend: float = 0.0
        self.call_log: list[dict] = []

    def add_model(self, config: ModelConfig) -> None:
        self.models.append(config)

    def estimate_cost(self, model: ModelConfig, task: TaskEstimate) -> float:
        input_cost = (task.estimated_input_tokens / 1000) * model.cost_per_1k_input
        output_cost = (task.estimated_output_tokens / 1000) * model.cost_per_1k_output
        return round(input_cost + output_cost, 6)

    def estimate_latency(self, model: ModelConfig, task: TaskEstimate) -> float:
        """Estimated latency in ms."""
        total_tokens = task.estimated_input_tokens + task.estimated_output_tokens
        seconds = total_tokens / model.speed_tokens_per_sec
        return seconds * 1000

    def route(self, task: TaskEstimate) -> ModelConfig:
        """Pick the cheapest model that meets all requirements."""
        candidates = []

        for model in self.models:
            # Filter by complexity range
            if task.complexity.value < model.min_complexity.value:
                continue
            if task.complexity.value > model.max_complexity.value:
                continue

            # Filter by context size
            total_tokens = task.estimated_input_tokens + task.estimated_output_tokens
            if total_tokens > model.max_context:
                continue

            # Filter by latency requirement
            if task.max_latency_ms:
                estimated_latency = self.estimate_latency(model, task)
                if estimated_latency > task.max_latency_ms:
                    continue

            cost = self.estimate_cost(model, task)

            # Budget check
            if self.month_spend + cost > self.monthly_budget:
                continue

            candidates.append((model, cost))

        if not candidates:
            # Fallback: cheapest model regardless of complexity match.
            # Still track the spend so the budget report stays honest.
            fallback = min(self.models, key=lambda m: m.cost_per_1k_input)
            cost = self.estimate_cost(fallback, task)
            self.month_spend += cost
            self.call_log.append({
                "model": fallback.name,
                "cost": cost,
                "complexity": task.complexity.value,
            })
            return fallback

        # Sort by cost, pick cheapest
        candidates.sort(key=lambda x: x[1])
        chosen_model, chosen_cost = candidates[0]

        self.month_spend += chosen_cost
        self.call_log.append({
            "model": chosen_model.name,
            "cost": chosen_cost,
            "complexity": task.complexity.value,
        })

        return chosen_model

    def budget_report(self) -> dict:
        return {
            "monthly_budget": self.monthly_budget,
            "spent": round(self.month_spend, 4),
            "remaining": round(self.monthly_budget - self.month_spend, 4),
            "utilization_pct": round(
                self.month_spend / self.monthly_budget * 100, 1
            ),
            "total_calls": len(self.call_log),
            "by_model": self._group_by_model(),
        }

    def _group_by_model(self) -> dict:
        groups: dict[str, dict] = {}
        for entry in self.call_log:
            name = entry["model"]
            if name not in groups:
                groups[name] = {"calls": 0, "total_cost": 0}
            groups[name]["calls"] += 1
            groups[name]["total_cost"] = round(
                groups[name]["total_cost"] + entry["cost"], 4
            )
        return groups


# Setup
router = CostAwareRouter(monthly_budget_usd=50.0)

router.add_model(ModelConfig(
    name="gpt-4.1-nano",
    provider="openai",
    cost_per_1k_input=0.00010,
    cost_per_1k_output=0.00040,
    max_context=1_048_576,
    speed_tokens_per_sec=300,
    min_complexity=Complexity.TRIVIAL,
    max_complexity=Complexity.SIMPLE,
))

router.add_model(ModelConfig(
    name="gpt-4.1-mini",
    provider="openai",
    cost_per_1k_input=0.0004,
    cost_per_1k_output=0.0016,
    max_context=1_048_576,
    speed_tokens_per_sec=200,
    min_complexity=Complexity.TRIVIAL,
    max_complexity=Complexity.MODERATE,
))

router.add_model(ModelConfig(
    name="claude-sonnet-4",
    provider="anthropic",
    cost_per_1k_input=0.003,
    cost_per_1k_output=0.015,
    max_context=200_000,
    speed_tokens_per_sec=150,
    min_complexity=Complexity.MODERATE,
    max_complexity=Complexity.EXPERT,
))

router.add_model(ModelConfig(
    name="claude-opus-4",
    provider="anthropic",
    cost_per_1k_input=0.015,
    cost_per_1k_output=0.075,
    max_context=200_000,
    speed_tokens_per_sec=80,
    min_complexity=Complexity.COMPLEX,
    max_complexity=Complexity.EXPERT,
))

# Route a task
task = TaskEstimate(
    estimated_input_tokens=2000,
    estimated_output_tokens=500,
    complexity=Complexity.SIMPLE,
    max_latency_ms=10_000,  # nano's estimated ~8.3s at 300 tok/s fits this budget
)

model = router.route(task)
print(f"Routed to: {model.name} (${router.estimate_cost(model, task):.4f})")
# → Routed to: gpt-4.1-nano ($0.0004)

This pattern alone can cut your LLM costs by 60-80%. Most agent tasks are TRIVIAL or SIMPLE — don't send them to your expensive model.
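A back-of-envelope check on that claim, using the per-1k prices configured above and an assumed workload mix (70% trivial/simple, 20% moderate, 10% complex, each call 2k input / 500 output tokens):

```python
def call_cost(in_tok, out_tok, in_per_1k, out_per_1k):
    return (in_tok / 1000) * in_per_1k + (out_tok / 1000) * out_per_1k

# 100 calls routed by complexity vs. all sent to the top model
routed = (
    70 * call_cost(2000, 500, 0.00010, 0.00040)  # gpt-4.1-nano
    + 20 * call_cost(2000, 500, 0.0004, 0.0016)  # gpt-4.1-mini
    + 10 * call_cost(2000, 500, 0.003, 0.015)    # claude-sonnet-4
)
all_opus = 100 * call_cost(2000, 500, 0.015, 0.075)

savings = 1 - routed / all_opus
print(f"routed=${routed:.3f} all-opus=${all_opus:.2f} savings={savings:.0%}")
# → routed=$0.195 all-opus=$6.75 savings=97%
```

The exact number depends entirely on your workload mix, but any mix that is mostly TRIVIAL/SIMPLE clears the 60% mark comfortably.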


Pattern 4: Agent Memory with Decay

Long-running agents accumulate context. But not all context ages equally. A user preference from 5 minutes ago matters more than a search result from 2 hours ago.

import time
import math
from dataclasses import dataclass, field
from typing import Any, Optional
from enum import Enum


class MemoryType(Enum):
    FACT = "fact"               # User said X, system state
    INSTRUCTION = "instruction" # User preferences, rules
    RESULT = "result"           # Tool results, search hits
    CONVERSATION = "conversation"  # Chat messages
    DECISION = "decision"       # Agent's past decisions


@dataclass
class MemoryEntry:
    content: str
    memory_type: MemoryType
    created_at: float = field(default_factory=time.time)
    last_accessed: float = field(default_factory=time.time)
    access_count: int = 0
    importance: float = 1.0  # 0.0 to 1.0
    metadata: dict = field(default_factory=dict)

    def relevance_score(self, now: Optional[float] = None) -> float:
        """Calculate current relevance with time decay."""
        now = now or time.time()
        age_hours = (now - self.created_at) / 3600
        recency_hours = (now - self.last_accessed) / 3600

        # Different decay rates per type
        decay_rates = {
            MemoryType.FACT: 0.01,          # Very slow decay
            MemoryType.INSTRUCTION: 0.005,   # Almost permanent
            MemoryType.RESULT: 0.1,          # Fast decay
            MemoryType.CONVERSATION: 0.05,   # Moderate decay
            MemoryType.DECISION: 0.02,       # Slow decay
        }

        decay_rate = decay_rates.get(self.memory_type, 0.05)

        # Exponential decay based on age
        time_factor = math.exp(-decay_rate * age_hours)

        # Boost from recent access
        recency_boost = math.exp(-0.1 * recency_hours)

        # Frequency boost (diminishing returns)
        frequency_boost = math.log(1 + self.access_count) / 5

        score = (
            self.importance * 0.4 +
            time_factor * 0.3 +
            recency_boost * 0.2 +
            frequency_boost * 0.1
        )

        return min(1.0, max(0.0, score))


class AgentMemory:
    def __init__(
        self,
        max_entries: int = 500,
        max_context_tokens: int = 8000,
        eviction_threshold: float = 0.1,
    ):
        self.entries: list[MemoryEntry] = []
        self.max_entries = max_entries
        self.max_context_tokens = max_context_tokens
        self.eviction_threshold = eviction_threshold

    def add(
        self,
        content: str,
        memory_type: MemoryType,
        importance: float = 0.5,
        metadata: Optional[dict] = None,
    ) -> MemoryEntry:
        entry = MemoryEntry(
            content=content,
            memory_type=memory_type,
            importance=importance,
            metadata=metadata or {},
        )
        self.entries.append(entry)

        # Evict if over capacity
        if len(self.entries) > self.max_entries:
            self._evict()

        return entry

    def recall(
        self,
        top_k: int = 20,
        memory_types: Optional[list[MemoryType]] = None,
        min_relevance: float = 0.0,
    ) -> list[MemoryEntry]:
        """Retrieve most relevant memories."""
        now = time.time()
        candidates = self.entries

        if memory_types:
            candidates = [e for e in candidates if e.memory_type in memory_types]

        # Score and sort
        scored = [
            (entry, entry.relevance_score(now))
            for entry in candidates
        ]
        scored = [(e, s) for e, s in scored if s >= min_relevance]
        scored.sort(key=lambda x: x[1], reverse=True)

        # Update access stats
        results = []
        for entry, score in scored[:top_k]:
            entry.last_accessed = now
            entry.access_count += 1
            results.append(entry)

        return results

    def build_context(
        self,
        max_tokens: Optional[int] = None,
    ) -> str:
        """Build a context string from memories, fitting within token budget."""
        max_tokens = max_tokens or self.max_context_tokens
        memories = self.recall(top_k=50)

        # Group by type for organized context
        groups: dict[MemoryType, list[MemoryEntry]] = {}
        for entry in memories:
            groups.setdefault(entry.memory_type, []).append(entry)

        sections = []
        total_chars = 0
        char_budget = max_tokens * 4  # ~4 characters per token, rough

        # Priority order for context building
        type_order = [
            MemoryType.INSTRUCTION,
            MemoryType.DECISION,
            MemoryType.FACT,
            MemoryType.CONVERSATION,
            MemoryType.RESULT,
        ]

        for mem_type in type_order:
            if mem_type not in groups:
                continue

            section_entries = groups[mem_type]
            header = f"## {mem_type.value.title()}s"
            section_lines = [header]

            for entry in section_entries:
                line = f"- {entry.content}"
                if total_chars + len(line) > char_budget:
                    break
                section_lines.append(line)
                total_chars += len(line)

            if len(section_lines) > 1:
                sections.append("\n".join(section_lines))

        return "\n\n".join(sections)

    def _evict(self):
        """Remove lowest-relevance entries."""
        now = time.time()
        self.entries.sort(key=lambda e: e.relevance_score(now), reverse=True)

        # Keep only top max_entries
        evicted = self.entries[self.max_entries:]
        self.entries = self.entries[:self.max_entries]

        if evicted:
            print(f"🧹 Evicted {len(evicted)} low-relevance memories")

    def stats(self) -> dict:
        now = time.time()
        scores = [e.relevance_score(now) for e in self.entries]
        return {
            "total_entries": len(self.entries),
            "by_type": {
                t.value: sum(1 for e in self.entries if e.memory_type == t)
                for t in MemoryType
            },
            "avg_relevance": round(sum(scores) / len(scores), 3) if scores else 0,
            "min_relevance": round(min(scores), 3) if scores else 0,
            "max_relevance": round(max(scores), 3) if scores else 0,
        }


# Usage
memory = AgentMemory(max_entries=200, max_context_tokens=4000)

# User preferences decay very slowly
memory.add(
    "User prefers Python over JavaScript",
    MemoryType.INSTRUCTION,
    importance=0.9,
)

# Search results decay fast
memory.add(
    "Found 3 repos matching 'MCP server': repo-a (234 stars), repo-b (89 stars), repo-c (12 stars)",
    MemoryType.RESULT,
    importance=0.4,
)

# Build context for next LLM call
context = memory.build_context(max_tokens=2000)

The decay rates per type make all the difference. Instructions should persist almost forever. Search results should fade quickly.
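To see those rates in action, here is the age term alone (`exp(-rate * age_hours)`, the `time_factor` inside `relevance_score`) evaluated at a few ages, using the rates from the table above:

```python
import math

# Decay rates per memory type, as defined in relevance_score above
decay_rates = {"instruction": 0.005, "fact": 0.01,
               "conversation": 0.05, "result": 0.1}

for hours in (1, 8, 24, 72):
    row = {name: round(math.exp(-rate * hours), 2)
           for name, rate in decay_rates.items()}
    print(f"{hours:>3}h  {row}")
```

After a day, a RESULT's age term has collapsed to roughly 0.09 while an INSTRUCTION still sits near 0.89: instructions persist, search results fade.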


Pattern 5: Agent Checkpoint and Recovery

Long-running agent workflows crash. Network blip, OOM, timeout. Without checkpointing, you lose all progress and start over.

interface Checkpoint {
  workflowId: string;
  stepIndex: number;
  stepName: string;
  state: Record<string, unknown>;
  completedSteps: string[];
  timestamp: number;
  metadata: Record<string, unknown>;
}

interface WorkflowStep {
  name: string;
  handler: (state: Record<string, unknown>) => Promise<Record<string, unknown>>;
  retryable: boolean;
  maxRetries: number;
  timeoutMs: number;
}

class CheckpointedWorkflow {
  private steps: WorkflowStep[] = [];
  private storage: CheckpointStorage;

  constructor(storage: CheckpointStorage) {
    this.storage = storage;
  }

  addStep(step: WorkflowStep): this {
    this.steps.push(step);
    return this;
  }

  async execute(
    workflowId: string,
    initialState: Record<string, unknown> = {}
  ): Promise<Record<string, unknown>> {
    // Check for an existing checkpoint. A failed checkpoint points at the
    // step that failed, so resume at that step rather than after it.
    let checkpoint = await this.storage.load(workflowId);
    let state = checkpoint?.state ?? initialState;
    let startIndex = checkpoint
      ? checkpoint.stepIndex + (checkpoint.metadata?.failed ? 0 : 1)
      : 0;
    let completedSteps = checkpoint?.completedSteps ?? [];

    if (checkpoint) {
      console.log(
        `♻️ Resuming workflow "${workflowId}" from step ${startIndex} ` +
        `(${completedSteps.length}/${this.steps.length} complete)`
      );
    }

    for (let i = startIndex; i < this.steps.length; i++) {
      const step = this.steps[i];
      let attempts = 0;
      let lastError: Error | null = null;

      while (attempts <= step.maxRetries) {
        try {
          console.log(
            `▶️ Step ${i + 1}/${this.steps.length}: ${step.name} ` +
            `(attempt ${attempts + 1}/${step.maxRetries + 1})`
          );

          const result = await Promise.race([
            step.handler(state),
            this.timeout(step.timeoutMs, step.name),
          ]);

          state = { ...state, ...result };
          completedSteps.push(step.name);

          // Save checkpoint
          const cp: Checkpoint = {
            workflowId,
            stepIndex: i,
            stepName: step.name,
            state,
            completedSteps,
            timestamp: Date.now(),
            metadata: { attempts: attempts + 1 },
          };
          await this.storage.save(cp);

          break; // Step succeeded
        } catch (error) {
          lastError = error as Error;
          attempts++;

          if (!step.retryable || attempts > step.maxRetries) {
            // Save failed state for debugging
            await this.storage.save({
              workflowId,
              stepIndex: i,
              stepName: step.name,
              state: {
                ...state,
                _failedAt: step.name,
                _error: lastError.message,
                _attempts: attempts,
              },
              completedSteps,
              timestamp: Date.now(),
              metadata: { failed: true },
            });

            throw new WorkflowStepError(
              step.name,
              i,
              lastError,
              completedSteps
            );
          }

          // Exponential backoff
          const delay = Math.min(1000 * Math.pow(2, attempts), 30000);
          console.log(
            `⚠️ Step "${step.name}" failed (attempt ${attempts}), ` +
            `retrying in ${delay}ms: ${lastError.message}`
          );
          await new Promise((r) => setTimeout(r, delay));
        }
      }
    }

    // Workflow complete — clean up checkpoint
    await this.storage.delete(workflowId);
    console.log(`✅ Workflow "${workflowId}" complete`);

    return state;
  }

  private timeout(ms: number, stepName: string): Promise<never> {
    return new Promise((_, reject) =>
      setTimeout(
        () => reject(new Error(`Step "${stepName}" timed out after ${ms}ms`)),
        ms
      )
    );
  }
}

class WorkflowStepError extends Error {
  constructor(
    public stepName: string,
    public stepIndex: number,
    public cause: Error,
    public completedSteps: string[]
  ) {
    super(
      `Workflow failed at step "${stepName}" (${stepIndex}): ${cause.message}. ` +
      `Completed: [${completedSteps.join(", ")}]`
    );
  }
}

// Simple file-based storage (swap for Redis/DB in production)
interface CheckpointStorage {
  save(checkpoint: Checkpoint): Promise<void>;
  load(workflowId: string): Promise<Checkpoint | null>;
  delete(workflowId: string): Promise<void>;
}

class FileCheckpointStorage implements CheckpointStorage {
  constructor(private dir: string = "./checkpoints") {}

  async save(checkpoint: Checkpoint): Promise<void> {
    const fs = await import("fs/promises");
    await fs.mkdir(this.dir, { recursive: true });
    const path = `${this.dir}/${checkpoint.workflowId}.json`;
    await fs.writeFile(path, JSON.stringify(checkpoint, null, 2));
  }

  async load(workflowId: string): Promise<Checkpoint | null> {
    const fs = await import("fs/promises");
    const path = `${this.dir}/${workflowId}.json`;
    try {
      const data = await fs.readFile(path, "utf-8");
      return JSON.parse(data);
    } catch {
      return null;
    }
  }

  async delete(workflowId: string): Promise<void> {
    const fs = await import("fs/promises");
    const path = `${this.dir}/${workflowId}.json`;
    try {
      await fs.unlink(path);
    } catch {
      // Already deleted
    }
  }
}

// Usage: research agent workflow
const workflow = new CheckpointedWorkflow(new FileCheckpointStorage());

workflow
  .addStep({
    name: "gather_requirements",
    handler: async (state) => {
      // Parse user query, extract entities
      return { entities: ["MCP", "production", "patterns"], query: state.userQuery };
    },
    retryable: false,
    maxRetries: 0,
    timeoutMs: 10000,
  })
  .addStep({
    name: "search_sources",
    handler: async (state) => {
      // Search across multiple sources
      return { sources: ["arxiv:123", "github:456", "blog:789"] };
    },
    retryable: true,
    maxRetries: 3,
    timeoutMs: 30000,
  })
  .addStep({
    name: "synthesize_results",
    handler: async (state) => {
      // LLM call to synthesize
      return { synthesis: "..." };
    },
    retryable: true,
    maxRetries: 2,
    timeoutMs: 60000,
  })
  .addStep({
    name: "format_output",
    handler: async (state) => {
      return { finalReport: `Report based on ${(state.sources as string[]).length} sources` };
    },
    retryable: false,
    maxRetries: 0,
    timeoutMs: 5000,
  });

// If this crashes at step 2, next run resumes from step 2
const result = await workflow.execute("research-mcp-patterns", {
  userQuery: "Best practices for MCP in production",
});

The resume behavior is automatic. Crash at step 3? Next run loads the checkpoint and starts at step 3 with all previous state intact.
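The load-and-skip check that powers this resume fits in a few lines. Here's a standalone sketch in Python (the field names `completedSteps` and `state` match the checkpoint shape above; `resume_index` is an illustrative helper, not part of the class):

```python
import json
from pathlib import Path


def resume_index(checkpoint_path: Path, steps: list[str]) -> tuple[int, dict]:
    """Return the step index to resume from and the restored state."""
    if not checkpoint_path.exists():
        return 0, {}  # no checkpoint: fresh run
    cp = json.loads(checkpoint_path.read_text())
    completed = set(cp["completedSteps"])
    for i, name in enumerate(steps):
        if name not in completed:
            return i, cp["state"]  # first step not yet completed
    return len(steps), cp["state"]  # everything already done


# Simulate a crash after steps 0 and 1 completed
Path("cp.json").write_text(json.dumps({
    "completedSteps": ["gather_requirements", "search_sources"],
    "state": {"sources": ["arxiv:123"]},
}))

idx, state = resume_index(Path("cp.json"), [
    "gather_requirements",
    "search_sources",
    "synthesize_results",
    "format_output",
])
# idx is 2: execution resumes at synthesize_results with prior state intact
```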


Pattern 6: Token Budget Allocator Across Agent Teams

When you have a team of agents working on a shared task, one chatty agent can consume the entire token budget. A budget allocator ensures fair distribution.

from dataclasses import dataclass, field
from typing import Optional
import time
import threading


@dataclass
class AgentBudget:
    agent_id: str
    allocated_tokens: int
    used_tokens: int = 0
    priority: float = 1.0  # Higher = gets more budget on reallocation
    last_activity: float = field(default_factory=time.time)

    @property
    def remaining(self) -> int:
        return max(0, self.allocated_tokens - self.used_tokens)

    @property
    def utilization(self) -> float:
        if self.allocated_tokens == 0:
            return 0
        return self.used_tokens / self.allocated_tokens


class TokenBudgetAllocator:
    def __init__(self, total_budget: int):
        self.total_budget = total_budget
        self.agents: dict[str, AgentBudget] = {}
        self._lock = threading.Lock()

    def register_agent(
        self,
        agent_id: str,
        priority: float = 1.0,
        initial_allocation_pct: Optional[float] = None,
    ) -> AgentBudget:
        with self._lock:
            if initial_allocation_pct:
                # Explicit share (caller is responsible for keeping
                # explicit shares summing to <= 1.0)
                allocation = int(self.total_budget * initial_allocation_pct)
            else:
                # Equal split among all agents
                n_agents = len(self.agents) + 1
                allocation = self.total_budget // n_agents

                # Rebalance existing agents
                for existing in self.agents.values():
                    existing.allocated_tokens = self.total_budget // n_agents

            budget = AgentBudget(
                agent_id=agent_id,
                allocated_tokens=allocation,
                priority=priority,
            )
            self.agents[agent_id] = budget
            return budget

    def request_tokens(
        self,
        agent_id: str,
        tokens_needed: int,
    ) -> tuple[bool, int]:
        """Request tokens. Returns (approved, tokens_granted)."""
        with self._lock:
            budget = self.agents.get(agent_id)
            if not budget:
                return False, 0

            if budget.remaining >= tokens_needed:
                budget.used_tokens += tokens_needed
                budget.last_activity = time.time()
                return True, tokens_needed

            # Try to steal from idle agents
            stolen = self._try_steal(agent_id, tokens_needed - budget.remaining)
            available = budget.remaining + stolen

            if available >= tokens_needed:
                budget.used_tokens += tokens_needed
                budget.allocated_tokens += stolen
                budget.last_activity = time.time()
                return True, tokens_needed

            # Partial grant
            if available > 0:
                budget.used_tokens += available
                budget.allocated_tokens += stolen
                budget.last_activity = time.time()
                return True, available

            return False, 0

    def _try_steal(self, requester_id: str, tokens_needed: int) -> int:
        """Try to reallocate tokens from idle/low-priority agents."""
        stolen = 0
        now = time.time()

        # Sort donors: idle first, then low priority
        donors = [
            (aid, ab)
            for aid, ab in self.agents.items()
            if aid != requester_id and ab.remaining > 0
        ]
        donors.sort(key=lambda x: (
            -(now - x[1].last_activity),  # Most idle first
            x[1].priority,                # Lowest priority first
        ))

        for donor_id, donor_budget in donors:
            if stolen >= tokens_needed:
                break

            # Don't steal more than 50% of donor's remaining
            stealable = min(
                donor_budget.remaining // 2,
                tokens_needed - stolen,
            )

            if stealable > 0:
                donor_budget.allocated_tokens -= stealable
                stolen += stealable

        return stolen

    def report_usage(
        self,
        agent_id: str,
        requested_tokens: int,
        actual_tokens_used: int,
    ) -> None:
        """Reconcile actual usage after an LLM call (for accurate tracking)."""
        with self._lock:
            budget = self.agents.get(agent_id)
            if budget:
                # Credit back (or charge) the difference between what was
                # reserved at request time and what the call actually used
                delta = actual_tokens_used - requested_tokens
                budget.used_tokens = max(0, budget.used_tokens + delta)
                budget.last_activity = time.time()

    def dashboard(self) -> dict:
        total_used = sum(a.used_tokens for a in self.agents.values())
        total_allocated = sum(a.allocated_tokens for a in self.agents.values())

        return {
            "total_budget": self.total_budget,
            "total_allocated": total_allocated,
            "total_used": total_used,
            "budget_remaining": self.total_budget - total_used,
            "agents": {
                aid: {
                    "allocated": ab.allocated_tokens,
                    "used": ab.used_tokens,
                    "remaining": ab.remaining,
                    "utilization": f"{ab.utilization:.0%}",
                    "priority": ab.priority,
                }
                for aid, ab in self.agents.items()
            },
        }


# Usage
allocator = TokenBudgetAllocator(total_budget=100_000)

allocator.register_agent("researcher", priority=2.0)
allocator.register_agent("writer", priority=1.5)
allocator.register_agent("reviewer", priority=1.0)

# Researcher requests tokens for a big search
approved, granted = allocator.request_tokens("researcher", 15000)
print(f"Researcher: approved={approved}, granted={granted}")

# Writer requests tokens — might steal from idle reviewer
approved, granted = allocator.request_tokens("writer", 20000)
print(f"Writer: approved={approved}, granted={granted}")

print(allocator.dashboard())

The token-stealing mechanism is what makes this work in practice. Idle agents shouldn't hoard tokens while active agents starve.
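To see the 50% cap in concrete numbers, here's a standalone arithmetic sketch of the steal loop (the real `_try_steal` above also orders donors by idleness and priority; this simplified version just takes from the biggest pool first, and all numbers are illustrative):

```python
# "remaining" maps agent -> unspent tokens
remaining = {"writer": 5_000, "reviewer": 30_000, "researcher": 8_000}
requester, shortfall = "writer", 12_000

stolen = 0
donors = [a for a in remaining if a != requester]
for donor in sorted(donors, key=lambda a: -remaining[a]):  # biggest pool first
    take = min(remaining[donor] // 2, shortfall - stolen)  # never more than half
    remaining[donor] -= take
    stolen += take
    if stolen >= shortfall:
        break

# reviewer donates min(30_000 // 2, 12_000) = 12_000; researcher is untouched
```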


Pattern 7: Dead Letter Queue for Failed Agent Tasks

When an agent task fails after all retries, it shouldn't just disappear. A dead letter queue captures failed tasks for analysis, replay, or manual resolution.

import json
import time
import uuid
from dataclasses import dataclass, field, asdict
from typing import Any, Optional, Callable
from pathlib import Path
from datetime import datetime, timezone


@dataclass
class DeadLetter:
    letter_id: str
    task_type: str
    payload: dict
    error: str
    error_type: str
    agent_id: str
    attempts: int
    first_attempt_at: float
    last_attempt_at: float
    context: dict = field(default_factory=dict)
    resolved: bool = False
    resolution: Optional[str] = None

    def age_hours(self) -> float:
        return (time.time() - self.first_attempt_at) / 3600


class DeadLetterQueue:
    def __init__(
        self,
        storage_dir: str = "./dead_letters",
        max_age_hours: float = 72.0,
        alert_callback: Optional[Callable] = None,
    ):
        self.storage_dir = Path(storage_dir)
        self.storage_dir.mkdir(parents=True, exist_ok=True)
        self.max_age_hours = max_age_hours
        self.alert_callback = alert_callback
        self._letters: dict[str, DeadLetter] = {}
        self._load_existing()

    def _load_existing(self):
        for f in self.storage_dir.glob("*.json"):
            try:
                data = json.loads(f.read_text())
                letter = DeadLetter(**data)
                self._letters[letter.letter_id] = letter
            except Exception:
                continue  # skip corrupt dead-letter files

    def add(
        self,
        task_type: str,
        payload: dict,
        error: Exception,
        agent_id: str,
        attempts: int,
        context: Optional[dict] = None,
    ) -> DeadLetter:
        letter = DeadLetter(
            letter_id=f"dl-{uuid.uuid4().hex[:10]}",
            task_type=task_type,
            payload=payload,
            error=str(error),
            error_type=type(error).__name__,
            agent_id=agent_id,
            attempts=attempts,
            first_attempt_at=time.time(),
            last_attempt_at=time.time(),
            context=context or {},
        )

        self._letters[letter.letter_id] = letter
        self._persist(letter)

        if self.alert_callback:
            self.alert_callback(letter)

        return letter

    def list_unresolved(
        self,
        task_type: Optional[str] = None,
        limit: int = 50,
    ) -> list[DeadLetter]:
        letters = [
            l for l in self._letters.values()
            if not l.resolved
        ]

        if task_type:
            letters = [l for l in letters if l.task_type == task_type]

        letters.sort(key=lambda l: l.last_attempt_at, reverse=True)
        return letters[:limit]

    async def replay(
        self,
        letter_id: str,
        handler: Callable,
        modify_payload: Optional[Callable] = None,
    ) -> tuple[bool, Any]:
        """Replay a dead letter task."""
        letter = self._letters.get(letter_id)
        if not letter:
            raise ValueError(f"Dead letter {letter_id} not found")

        payload = letter.payload
        if modify_payload:
            payload = modify_payload(payload)

        try:
            result = await handler(payload)
            letter.resolved = True
            letter.resolution = f"Replayed successfully at {datetime.now(timezone.utc).isoformat()}"
            self._persist(letter)
            return True, result
        except Exception as e:
            letter.last_attempt_at = time.time()
            letter.attempts += 1
            letter.error = str(e)
            self._persist(letter)
            return False, str(e)

    def resolve(self, letter_id: str, resolution: str) -> bool:
        """Manually resolve a dead letter."""
        letter = self._letters.get(letter_id)
        if not letter:
            return False
        letter.resolved = True
        letter.resolution = resolution
        self._persist(letter)
        return True

    def purge_old(self) -> int:
        """Remove resolved letters older than max_age_hours."""
        cutoff = time.time() - (self.max_age_hours * 3600)
        to_remove = [
            lid for lid, l in self._letters.items()
            if l.resolved and l.last_attempt_at < cutoff
        ]

        for lid in to_remove:
            del self._letters[lid]
            path = self.storage_dir / f"{lid}.json"
            path.unlink(missing_ok=True)

        return len(to_remove)

    def summary(self) -> dict:
        unresolved = [l for l in self._letters.values() if not l.resolved]
        resolved = [l for l in self._letters.values() if l.resolved]

        by_type: dict[str, int] = {}
        by_error: dict[str, int] = {}
        for l in unresolved:
            by_type[l.task_type] = by_type.get(l.task_type, 0) + 1
            by_error[l.error_type] = by_error.get(l.error_type, 0) + 1

        return {
            "total": len(self._letters),
            "unresolved": len(unresolved),
            "resolved": len(resolved),
            "by_task_type": by_type,
            "by_error_type": by_error,
            "oldest_unresolved_hours": round(
                max((l.age_hours() for l in unresolved), default=0), 1
            ),
        }

    def _persist(self, letter: DeadLetter):
        path = self.storage_dir / f"{letter.letter_id}.json"
        path.write_text(json.dumps(asdict(letter), indent=2, default=str))


# Integration with agent system
dlq = DeadLetterQueue(
    storage_dir="./dead_letters",
    alert_callback=lambda l: print(
        f"🚨 Dead letter: {l.task_type} failed after {l.attempts} attempts: {l.error[:100]}"
    ),
)

# In your agent error handler:
async def run_agent_task(task_type: str, payload: dict, agent_id: str):
    max_retries = 3
    for attempt in range(max_retries):
        try:
            return await execute_task(task_type, payload)  # your task runner
        except Exception as e:
            if attempt == max_retries - 1:
                dlq.add(
                    task_type=task_type,
                    payload=payload,
                    error=e,
                    agent_id=agent_id,
                    attempts=max_retries,
                    context={"last_attempt": attempt + 1},
                )
                raise

# Check DLQ status
print(dlq.summary())
# → {'total': 3, 'unresolved': 2, 'by_task_type': {'search': 1, 'synthesize': 1}, ...}

The replay mechanism is the killer feature. Found the bug? Fix it and replay the dead letter — no need to recreate the original request.
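A minimal sketch of what a replay looks like in practice, with hypothetical handler and payload names: the bug here is an empty query, and `modify_payload` patches it before re-running, mirroring the `dlq.replay(..., modify_payload=...)` signature above.

```python
import asyncio

failed_payload = {"query": "", "max_results": 10}  # empty query caused the failure


def modify_payload(p: dict) -> dict:
    # The fix: default the query instead of passing it through empty
    return {**p, "query": p.get("query") or "MCP production patterns"}


async def search_handler(payload: dict) -> str:
    if not payload["query"]:
        raise ValueError("empty query")  # the original failure mode
    return f"results for: {payload['query']}"


# Equivalent to: await dlq.replay(letter_id, search_handler, modify_payload)
result = asyncio.run(search_handler(modify_payload(failed_payload)))
```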


Putting It All Together: The Agent Operating System

These 7 patterns form a layered architecture for multi-agent systems:

┌─────────────────────────────────────────────┐
│              Your Application               │
├─────────────────────────────────────────────┤
│  [Cost Router] → Pick the right model       │
│  [Budget Allocator] → Fair token sharing    │
├─────────────────────────────────────────────┤
│  [Supervisor + Backpressure]                │
│  [Shared State + Conflict Resolution]       │
├─────────────────────────────────────────────┤
│  [Agent Memory with Decay]                  │
│  [Checkpoint + Recovery]                    │
├─────────────────────────────────────────────┤
│  [Dead Letter Queue]                        │
│  Observability + Alerting                   │
└─────────────────────────────────────────────┘

Each layer solves a different failure mode:

  • Cost Router: Prevents budget blowouts
  • Budget Allocator: Prevents resource starvation
  • Supervisor: Prevents overload
  • Shared State: Prevents data corruption
  • Memory: Prevents context pollution
  • Checkpoint: Prevents lost progress
  • Dead Letter Queue: Prevents silent failures
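As a rough sketch of how the layers compose at a call site — everything below is an illustrative stub, not the full classes from the patterns above:

```python
import asyncio


def cost_router(task: dict) -> str:
    # Layer 1: route cheap prompts to a cheap model (threshold illustrative)
    return "small-model" if len(task["prompt"]) < 200 else "large-model"


class Allocator:
    # Layer 2: a toy stand-in for the TokenBudgetAllocator above
    def __init__(self, budget: int):
        self.remaining = budget

    def request(self, n: int) -> int:
        grant = min(n, self.remaining)
        self.remaining -= grant
        return grant


async def supervised_call(model: str, prompt: str, tokens: int) -> str:
    # Layer 3: stand-in for the supervisor dispatching to a worker
    return f"[{model}] {len(prompt)} chars, {tokens} tokens"


async def run_task(task: dict, allocator: Allocator) -> str:
    model = cost_router(task)
    tokens = allocator.request(task["max_tokens"])
    if tokens == 0:
        raise RuntimeError("budget exhausted")  # this is what lands in the DLQ
    return await supervised_call(model, task["prompt"], tokens)


alloc = Allocator(budget=10_000)
out = asyncio.run(run_task({"prompt": "hi", "max_tokens": 500}, alloc))
```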

Key Takeaways

  1. Backpressure stops cascading failures when agents can't keep up
  2. Conflict resolution on shared state prevents stale writes
  3. Cost-aware routing can cut LLM spend by 60-80%
  4. Memory decay keeps context fresh and relevant
  5. Checkpointing makes long workflows crash-safe
  6. Token budgets prevent chatty agents from starving the team
  7. Dead letter queues capture failures for replay and debugging

Building a single agent is easy. Building a system of agents that runs reliably is engineering. These patterns are the difference.


Want production-ready implementations of these patterns plus agent templates for LangGraph, CrewAI, and AutoGen? Check out the AI Dev Toolkit — ship multi-agent systems that actually work in production.
