DEV Community

Mukunda Rao Katta
Mukunda Rao Katta

Posted on

Four Patterns for Multi-Agent Python Systems That Actually Work

Running multiple LLM agents together is not about making the system smarter. It is about specialization. One agent is good at writing. Another is good at research. A third checks facts. You coordinate them so each does the job it is built for.

The coordination patterns are not new. They come from distributed systems: routing, pipelines, fan-out, supervision. What changes with LLM agents is that coordination decisions often happen inside a model context, and the failure modes are less predictable than classic RPC.

This post covers four patterns. Each is concrete: code, failure modes, when it applies. No framework required. These patterns work with any LLM client, any tool registry, any Python agent implementation.

Pattern 1: Router

One input. Route to a specialist agent based on intent.

The router reads the user request and decides which downstream agent handles it. The router itself is a small LLM call or a classifier. It should not do the work.

# pip install agent-event-bus token-budget-py agent-decision-log

from agent_event_bus import EventBus
from agent_decision_log import DecisionLog

bus = EventBus()
log = DecisionLog(path="decisions.jsonl")

# Register specialist handlers
@bus.on("intent:code_review")
async def handle_code_review(event):
    result = await code_review_agent.run(event["payload"])
    return result

@bus.on("intent:summarize")
async def handle_summarize(event):
    result = await summarizer_agent.run(event["payload"])
    return result

@bus.on("intent:research")
async def handle_research(event):
    result = await research_agent.run(event["payload"])
    return result


async def route_request(user_input: str) -> str:
    # Small, fast model for routing. Not the full reasoning model.
    intent = await router_model.classify(user_input)

    log.record(
        decision="route",
        input=user_input,
        output=intent,
        reason="classifier confidence above 0.8",
    )

    result = await bus.emit(f"intent:{intent}", payload=user_input)
    return result
Enter fullscreen mode Exit fullscreen mode

The router model should be cheap and fast. A fine-tuned classifier, a small model, or a simple embedding-based matcher. Do not use your most expensive model for routing. Save that budget for specialist agents.

Pattern 2: Pipeline

Sequential. Agent A's output becomes Agent B's input.

Pipelines work when the steps are ordered and each step refines the previous output. Research then summarize then translate. Draft then review then polish.

from dataclasses import dataclass
from agent_decision_log import DecisionLog
from token_budget_py import BudgetPool

log = DecisionLog(path="pipeline_decisions.jsonl")

@dataclass
class PipelineResult:
    stage: str
    output: str
    tokens_used: int

async def run_pipeline(raw_input: str, budget_usd: float = 0.10) -> list[PipelineResult]:
    pool = BudgetPool(usd=budget_usd)
    results = []

    # Stage 1: research
    async with pool.acquire(max_usd=0.04) as stage_budget:
        research_out = await research_agent.run(
            raw_input,
            budget=stage_budget,
        )
        results.append(PipelineResult("research", research_out.text, research_out.tokens))
        log.record(decision="stage_complete", stage="research", tokens=research_out.tokens)

    # Stage 2: draft (uses research output)
    async with pool.acquire(max_usd=0.04) as stage_budget:
        draft_out = await draft_agent.run(
            f"Research notes:\n{research_out.text}\n\nWrite a report.",
            budget=stage_budget,
        )
        results.append(PipelineResult("draft", draft_out.text, draft_out.tokens))
        log.record(decision="stage_complete", stage="draft", tokens=draft_out.tokens)

    # Stage 3: review
    async with pool.acquire(max_usd=0.02) as stage_budget:
        review_out = await review_agent.run(
            draft_out.text,
            budget=stage_budget,
        )
        results.append(PipelineResult("review", review_out.text, review_out.tokens))

    return results
Enter fullscreen mode Exit fullscreen mode

The budget pool (token-budget-py) partitions spend across stages. If research burns through its allocation, the draft stage still has budget. If the total pool is exhausted, the pipeline halts cleanly rather than silently overspending.

Pattern 3: Broadcast

Same input sent to N agents. Aggregate results.

Use broadcast when you want multiple independent answers and then synthesize. Useful for research (multiple search strategies), fact checking (multiple verifiers), or generating variations.

import asyncio
from agent_event_bus import EventBus
from agenttrace import Tracer

bus = EventBus()
tracer = Tracer(export_path="traces/broadcast.jsonl")

async def broadcast_and_aggregate(query: str) -> dict:
    with tracer.span("broadcast") as span:
        # Fire all agents in parallel
        tasks = [
            research_agent_a.run(query),
            research_agent_b.run(query),
            research_agent_c.run(query),
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Filter out exceptions, log them
        valid = []
        for i, r in enumerate(results):
            if isinstance(r, Exception):
                span.add_event(f"agent_{i}_failed", error=str(r))
            else:
                valid.append(r)

        if not valid:
            raise RuntimeError("All broadcast agents failed")

        # Synthesize: pass all valid results to an aggregator model
        combined = "\n\n---\n\n".join(r.text for r in valid)
        synthesis = await aggregator_agent.run(
            f"Synthesize these research results:\n\n{combined}"
        )

        span.set_attribute("agents_succeeded", len(valid))
        return {"synthesis": synthesis.text, "source_count": len(valid)}
Enter fullscreen mode Exit fullscreen mode

Broadcast is expensive. N agents means N model calls in parallel. Use it when the quality gain is worth the cost. For most tasks, one good agent with good tools is cheaper and sufficient.

Pattern 4: Supervisor

One agent reviews another's output and requests revisions.

The supervisor pattern adds a quality gate. The supervisor model reads the worker's output and either approves it or returns a critique. The worker revises. This loops until approved or until a max-iteration limit fires.

from agent_deadline import Deadline
from agent_decision_log import DecisionLog

log = DecisionLog(path="supervisor_decisions.jsonl")

async def supervised_run(task: str, max_revisions: int = 3) -> str:
    deadline = Deadline(seconds=120)  # hard wall, not just iteration count

    draft = await worker_agent.run(task)
    iteration = 0

    while not deadline.expired() and iteration < max_revisions:
        review = await supervisor_agent.run(
            f"Task: {task}\n\nSubmission:\n{draft.text}\n\n"
            "Approve or return a numbered critique."
        )

        log.record(
            decision="supervisor_review",
            iteration=iteration,
            approved="APPROVED" in review.text,
            critique_length=len(review.text),
        )

        if "APPROVED" in review.text:
            break

        # Worker revises based on critique
        draft = await worker_agent.run(
            f"Original task: {task}\n\nYour previous attempt:\n{draft.text}\n\n"
            f"Supervisor critique:\n{review.text}\n\n"
            "Revise and improve."
        )
        iteration += 1

    if deadline.expired():
        log.record(decision="supervisor_timeout", iterations_completed=iteration)

    return draft.text
Enter fullscreen mode Exit fullscreen mode

The deadline is a hard cap. Without it, a demanding supervisor and a stubborn worker loop until your budget runs out. Set the deadline shorter than you think you need.

What Does NOT Work at Multi-Agent Scale

Shared context window. Agents do not share memory by default. If agent A discovers a fact, agent B does not know it unless you pass it explicitly. Context passing is your responsibility, not the framework's.

Budget partitioning across agents. If you give each agent its own budget, you cannot guarantee the total spend. Use a shared pool (token-budget-py BudgetPool) and let agents draw from it. If the pool is empty, agents block or fail fast.

Assuming agents finish in order. Parallel agents complete in arbitrary order. Your aggregation step needs to handle partial results and late arrivals. Use asyncio.gather with return_exceptions=True, not bare awaits.

Quick-Start Snippet

pip install agent-event-bus token-budget-py agent-decision-log agenttrace

from agent_event_bus import EventBus
from token_budget_py import BudgetPool
from agent_decision_log import DecisionLog

bus = EventBus()
pool = BudgetPool(usd=0.25)
log = DecisionLog(path="multi_agent.jsonl")

# Register handlers
@bus.on("task:summarize")
async def on_summarize(event):
    async with pool.acquire(max_usd=0.05) as budget:
        result = await summarizer.run(event["text"], budget=budget)
        log.record(decision="task_complete", task="summarize", tokens=result.tokens)
        return result.text

# Dispatch
result = await bus.emit("task:summarize", text="Your input here")
Enter fullscreen mode Exit fullscreen mode

Related Libraries

Library What It Does
agent-event-bus In-process pub/sub for routing events between agents
token-budget-py Shared USD/token budget pool for multi-agent spend control
agent-deadline Cooperative per-task deadline enforcement
agent-decision-log Structured JSONL log of agent routing and review decisions
agentsnap Snapshot traces for regression testing multi-agent flows
agenttrace Cost and latency tracing across agent spans

What's Next

Multi-agent systems fail differently than single-agent systems. When something goes wrong, you need to know which agent failed, what it was given, and what it returned. That is what agenttrace and agent-decision-log are for: structured, queryable records of every routing and review decision in the run.

The next gap after observability is error recovery. What happens when the research agent times out halfway through a pipeline? Post 113 covers the three recovery patterns that matter: retry, fallback, and graceful degrade.

Top comments (0)