Running multiple LLM agents together is not about making the system smarter. It is about specialization. One agent is good at writing. Another is good at research. A third checks facts. You coordinate them so each does the job it is built for.
The coordination patterns are not new. They come from distributed systems: routing, pipelines, fan-out, supervision. What changes with LLM agents is that coordination decisions often happen inside a model context, and the failure modes are less predictable than classic RPC.
This post covers four patterns. Each is concrete: code, failure modes, when it applies. No framework required. These patterns work with any LLM client, any tool registry, any Python agent implementation.
Pattern 1: Router
One input. Route to a specialist agent based on intent.
The router reads the user request and decides which downstream agent handles it. The router itself is a small LLM call or a classifier. It should not do the work.
# pip install agent-event-bus token-budget-py agent-decision-log
from agent_event_bus import EventBus
from agent_decision_log import DecisionLog
bus = EventBus()
log = DecisionLog(path="decisions.jsonl")
# Register specialist handlers
@bus.on("intent:code_review")
async def handle_code_review(event):
result = await code_review_agent.run(event["payload"])
return result
@bus.on("intent:summarize")
async def handle_summarize(event):
result = await summarizer_agent.run(event["payload"])
return result
@bus.on("intent:research")
async def handle_research(event):
result = await research_agent.run(event["payload"])
return result
async def route_request(user_input: str) -> str:
# Small, fast model for routing. Not the full reasoning model.
intent = await router_model.classify(user_input)
log.record(
decision="route",
input=user_input,
output=intent,
reason="classifier confidence above 0.8",
)
result = await bus.emit(f"intent:{intent}", payload=user_input)
return result
The router model should be cheap and fast. A fine-tuned classifier, a small model, or a simple embedding-based matcher. Do not use your most expensive model for routing. Save that budget for specialist agents.
Pattern 2: Pipeline
Sequential. Agent A's output becomes Agent B's input.
Pipelines work when the steps are ordered and each step refines the previous output. Research then summarize then translate. Draft then review then polish.
from dataclasses import dataclass
from agent_decision_log import DecisionLog
from token_budget_py import BudgetPool
log = DecisionLog(path="pipeline_decisions.jsonl")
@dataclass
class PipelineResult:
stage: str
output: str
tokens_used: int
async def run_pipeline(raw_input: str, budget_usd: float = 0.10) -> list[PipelineResult]:
pool = BudgetPool(usd=budget_usd)
results = []
# Stage 1: research
async with pool.acquire(max_usd=0.04) as stage_budget:
research_out = await research_agent.run(
raw_input,
budget=stage_budget,
)
results.append(PipelineResult("research", research_out.text, research_out.tokens))
log.record(decision="stage_complete", stage="research", tokens=research_out.tokens)
# Stage 2: draft (uses research output)
async with pool.acquire(max_usd=0.04) as stage_budget:
draft_out = await draft_agent.run(
f"Research notes:\n{research_out.text}\n\nWrite a report.",
budget=stage_budget,
)
results.append(PipelineResult("draft", draft_out.text, draft_out.tokens))
log.record(decision="stage_complete", stage="draft", tokens=draft_out.tokens)
# Stage 3: review
async with pool.acquire(max_usd=0.02) as stage_budget:
review_out = await review_agent.run(
draft_out.text,
budget=stage_budget,
)
results.append(PipelineResult("review", review_out.text, review_out.tokens))
return results
The budget pool (token-budget-py) partitions spend across stages. If research burns through its allocation, the draft stage still has budget. If the total pool is exhausted, the pipeline halts cleanly rather than silently overspending.
Pattern 3: Broadcast
Same input sent to N agents. Aggregate results.
Use broadcast when you want multiple independent answers and then synthesize. Useful for research (multiple search strategies), fact checking (multiple verifiers), or generating variations.
import asyncio
from agent_event_bus import EventBus
from agenttrace import Tracer
bus = EventBus()
tracer = Tracer(export_path="traces/broadcast.jsonl")
async def broadcast_and_aggregate(query: str) -> dict:
with tracer.span("broadcast") as span:
# Fire all agents in parallel
tasks = [
research_agent_a.run(query),
research_agent_b.run(query),
research_agent_c.run(query),
]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Filter out exceptions, log them
valid = []
for i, r in enumerate(results):
if isinstance(r, Exception):
span.add_event(f"agent_{i}_failed", error=str(r))
else:
valid.append(r)
if not valid:
raise RuntimeError("All broadcast agents failed")
# Synthesize: pass all valid results to an aggregator model
combined = "\n\n---\n\n".join(r.text for r in valid)
synthesis = await aggregator_agent.run(
f"Synthesize these research results:\n\n{combined}"
)
span.set_attribute("agents_succeeded", len(valid))
return {"synthesis": synthesis.text, "source_count": len(valid)}
Broadcast is expensive. N agents means N model calls in parallel. Use it when the quality gain is worth the cost. For most tasks, one good agent with good tools is cheaper and sufficient.
Pattern 4: Supervisor
One agent reviews another's output and requests revisions.
The supervisor pattern adds a quality gate. The supervisor model reads the worker's output and either approves it or returns a critique. The worker revises. This loops until approved or until a max-iteration limit fires.
from agent_deadline import Deadline
from agent_decision_log import DecisionLog
log = DecisionLog(path="supervisor_decisions.jsonl")
async def supervised_run(task: str, max_revisions: int = 3) -> str:
deadline = Deadline(seconds=120) # hard wall, not just iteration count
draft = await worker_agent.run(task)
iteration = 0
while not deadline.expired() and iteration < max_revisions:
review = await supervisor_agent.run(
f"Task: {task}\n\nSubmission:\n{draft.text}\n\n"
"Approve or return a numbered critique."
)
log.record(
decision="supervisor_review",
iteration=iteration,
approved="APPROVED" in review.text,
critique_length=len(review.text),
)
if "APPROVED" in review.text:
break
# Worker revises based on critique
draft = await worker_agent.run(
f"Original task: {task}\n\nYour previous attempt:\n{draft.text}\n\n"
f"Supervisor critique:\n{review.text}\n\n"
"Revise and improve."
)
iteration += 1
if deadline.expired():
log.record(decision="supervisor_timeout", iterations_completed=iteration)
return draft.text
The deadline is a hard cap. Without it, a demanding supervisor and a stubborn worker loop until your budget runs out. Set the deadline shorter than you think you need.
What Does NOT Work at Multi-Agent Scale
Shared context window. Agents do not share memory by default. If agent A discovers a fact, agent B does not know it unless you pass it explicitly. Context passing is your responsibility, not the framework's.
Budget partitioning across agents. If you give each agent its own budget, you cannot guarantee the total spend. Use a shared pool (token-budget-py BudgetPool) and let agents draw from it. If the pool is empty, agents block or fail fast.
Assuming agents finish in order. Parallel agents complete in arbitrary order. Your aggregation step needs to handle partial results and late arrivals. Use asyncio.gather with return_exceptions=True, not bare awaits.
Quick-Start Snippet
pip install agent-event-bus token-budget-py agent-decision-log agenttrace
from agent_event_bus import EventBus
from token_budget_py import BudgetPool
from agent_decision_log import DecisionLog
bus = EventBus()
pool = BudgetPool(usd=0.25)
log = DecisionLog(path="multi_agent.jsonl")
# Register handlers
@bus.on("task:summarize")
async def on_summarize(event):
async with pool.acquire(max_usd=0.05) as budget:
result = await summarizer.run(event["text"], budget=budget)
log.record(decision="task_complete", task="summarize", tokens=result.tokens)
return result.text
# Dispatch
result = await bus.emit("task:summarize", text="Your input here")
Related Libraries
| Library | What It Does |
|---|---|
| agent-event-bus | In-process pub/sub for routing events between agents |
| token-budget-py | Shared USD/token budget pool for multi-agent spend control |
| agent-deadline | Cooperative per-task deadline enforcement |
| agent-decision-log | Structured JSONL log of agent routing and review decisions |
| agentsnap | Snapshot traces for regression testing multi-agent flows |
| agenttrace | Cost and latency tracing across agent spans |
What's Next
Multi-agent systems fail differently than single-agent systems. When something goes wrong, you need to know which agent failed, what it was given, and what it returned. That is what agenttrace and agent-decision-log are for: structured, queryable records of every routing and review decision in the run.
The next gap after observability is error recovery. What happens when the research agent times out halfway through a pipeline? Post 113 covers the three recovery patterns that matter: retry, fallback, and graceful degrade.
Top comments (0)