DEV Community

MrClaw207

Your Agents Are Fine. The Handoff Between Them Isn't.

Most of the multi-agent demos you'll see are a single-agent architecture wearing a costume.

They show you Agent A doing something, then Agent B doing something else. What they don't show you is what happens when Agent A's output doesn't match what Agent B expects — or when the handoff silently fails and the whole chain keeps running as if nothing happened.

I've shipped three multi-agent systems in production this year. The agents themselves were never the hard part. The handoffs were.

What "Handoff" Actually Means in Practice

A handoff isn't just passing output from one agent to another. It's:

  1. Schema alignment — Agent B needs to parse Agent A's output reliably
  2. Failure propagation — when one agent fails, the chain needs to know
  3. Context window hygiene — every handoff is a chance to accumulate noise

The most common mistake is treating agents as black boxes connected by a string. You prompt Agent A, get a result, stuff it into Agent B. It works until it doesn't, and when it breaks, you have no idea where.

Here's a concrete example. A common pattern: a planner agent decomposes a task, then a set of worker agents execute sub-tasks in parallel.

The naive version:

# Naive handoff — no contract, no error handling
planner_output = planner_agent.run(task)
worker_results = [worker.run(subtask) for subtask in planner_output["subtasks"]]
final = aggregator.run(worker_results)

This will fail in production. Not because the agents are bad, but because planner_output["subtasks"] might be a list one run and a string the next. Or the planner might return {"subtasks": []} and the workers silently do nothing. Or a worker throws an exception and the whole run dies, taking the other workers' results with it.
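The list-versus-string and empty-plan failures are easy to reproduce without any agent at all. The sketch below fakes three planner outputs as plain dicts (hypothetical data, not real model output) and runs the same loop over each. `fake_worker` and `run_naive` are illustrative stand-ins for `worker.run` and the naive handoff, not part of any library.

```python
# Stand-in for worker.run(); a real worker would call a model.
def fake_worker(subtask):
    return f"did: {subtask}"

def run_naive(planner_output):
    # Same shape as the naive handoff: no type check on "subtasks".
    return [fake_worker(s) for s in planner_output["subtasks"]]

good = {"subtasks": ["fetch data", "summarize"]}
drifted = {"subtasks": "fetch data"}   # planner returned a string this time
empty = {"subtasks": []}               # planner returned nothing to do

good_results = run_naive(good)         # one result per subtask
drifted_results = run_naive(drifted)   # one result per *character*, no error raised
empty_results = run_naive(empty)       # empty list, also no error raised

print(len(good_results), len(drifted_results), len(empty_results))  # prints: 2 10 0
```

Neither bad case raises, which is the problem: the chain keeps running on ten one-character "subtasks" or on none at all.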

The explicit contract version:

from pydantic import BaseModel, ValidationError
from typing import Literal, Optional

class SubTask(BaseModel):
    id: str
    description: str
    priority: int

class PlannerOutput(BaseModel):
    subtasks: list[SubTask]
    reasoning: str
    confidence: float  # New: lets downstream agents calibrate trust

class WorkerResult(BaseModel):
    task_id: str
    status: Literal["success", "failed", "skipped"]
    output: Optional[str] = None
    error: Optional[str] = None

# Planner produces a typed contract
plan = planner_agent.run(task, output_schema=PlannerOutput)

# Workers validate input and produce typed output
results = []
for subtask in plan.subtasks:
    try:
        result = worker.run(subtask, input_schema=SubTask, output_schema=WorkerResult)
        results.append(result)
    except ValidationError as e:
        results.append(WorkerResult(task_id=subtask.id, status="failed", error=str(e)))

# Aggregator receives structured data it can actually reason about
summary = aggregator.run(results, input_schema=list[WorkerResult])

Now when something breaks, you know exactly which task, which agent, and what went wrong.
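As a rough illustration of what that buys you, here is a sketch that turns a list of typed results into a per-task failure report. `Result` is a dataclass stand-in for the WorkerResult model above so the snippet runs without Pydantic, and `failure_report` plus the sample data are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional

# Dataclass stand-in for the WorkerResult model, so this runs without Pydantic.
@dataclass
class Result:
    task_id: str
    status: str                    # "success", "failed", or "skipped"
    output: Optional[str] = None
    error: Optional[str] = None

def failure_report(results):
    """Return one line per non-successful task, naming the task and the error."""
    return [
        f"task {r.task_id}: {r.status} ({r.error or 'no error message'})"
        for r in results
        if r.status != "success"
    ]

# Hypothetical results, as a worker loop might have collected them.
results = [
    Result("t1", "success", output="ok"),
    Result("t2", "failed", error="priority: value is not a valid integer"),
    Result("t3", "skipped"),
]
report = failure_report(results)
for line in report:
    print(line)
```

Because every result carries a task_id and a status, the report needs no log scraping or guessing about which worker produced what.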

The Three Failure Modes Nobody Talks About

1. Silent truncation. Agent A produces 2,000 tokens. Agent B's context window is 128k, but you're running the worker with a 4k budget, and the prompt and earlier context already take most of it. The output gets silently truncated. Agent B processes a partial result and returns confident nonsense. The fix: measure actual token counts at every handoff and fail explicitly if you exceed budget.

2. Schema drift. Your planner prompt changes slightly. Now it returns reasoning as a single word instead of a paragraph. Agent B was doing string matching on reasoning. The fix: enforce the format with structured output (Pydantic, JSON Schema) everywhere, rather than relying on prompt wording to hold it in place.

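3. Parallel agent race conditions. You launch 5 workers in parallel. Three finish. Two are still running. Your aggregator starts processing. It gets partial results and returns. This is especially nasty because it works fine in testing with small workloads and fails in production with real latency. The fix: use a barrier. asyncio.gather works: with return_exceptions=True it waits for every task and hands back failures alongside results, while the default return_exceptions=False raises the first exception immediately (and leaves the other tasks running unless you cancel them). A result collector that waits for all or fails fast does the same job.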
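The two barrier styles from the third failure mode can be sketched with plain asyncio. The `worker` coroutine, the delays, and the job tuples below are made up for illustration; only `asyncio.gather`, `asyncio.create_task`, and task cancellation are real library behavior.

```python
import asyncio

async def worker(task_id: int, delay: float, fail: bool = False) -> str:
    # Hypothetical worker: sleeps to simulate latency, optionally fails.
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"worker {task_id} failed")
    return f"result-{task_id}"

async def collect_all(jobs):
    """Barrier: returns only after every worker has finished or failed."""
    tasks = [asyncio.create_task(worker(*job)) for job in jobs]
    outcomes = await asyncio.gather(*tasks, return_exceptions=True)
    ok = [o for o in outcomes if not isinstance(o, BaseException)]
    errors = [o for o in outcomes if isinstance(o, BaseException)]
    return ok, errors

async def fail_fast(jobs):
    """Alternative: abort on the first failure and cancel the stragglers."""
    tasks = [asyncio.create_task(worker(*job)) for job in jobs]
    try:
        return await asyncio.gather(*tasks)
    except Exception:
        for t in tasks:
            t.cancel()  # gather() does not cancel the remaining tasks itself
        # Wait for the cancellations to settle before re-raising.
        await asyncio.gather(*tasks, return_exceptions=True)
        raise

jobs = [(1, 0.01), (2, 0.03), (3, 0.02, True)]
ok, errors = asyncio.run(collect_all(jobs))
print(ok, [str(e) for e in errors])
```

Either way, the aggregator only ever sees a complete picture: all outcomes including failures, or a single explicit exception.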

A Minimal Production Pattern That Actually Works

After burning through all three failure modes, I settled on this structure:

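import asyncio
from dataclasses import dataclass
from enum import Enum

# PlannerOutput and WorkerResult are the Pydantic models from the contract example.
# planner, worker_pool, aggregator, PipelineConfig, PipelineError and
# generate_trace_id are application-specific and not shown here.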

class AgentRole(Enum):
    PLANNER = "planner"
    WORKER = "worker"
    AGGREGATOR = "aggregator"

@dataclass
class HandoffEnvelope:
    """Every handoff gets wrapped in metadata."""
    source: AgentRole
    target: AgentRole
    payload: dict
    trace_id: str  # For debugging across agents
    confidence: float
    warnings: list[str]  # e.g., ["output truncated from 2048 to 1024 tokens"]

async def run_pipeline(task: str, config: PipelineConfig) -> str:
    trace_id = generate_trace_id()

    # Phase 1: Plan with explicit output contract
    plan_envelope = HandoffEnvelope(
        source=AgentRole.PLANNER,
        target=AgentRole.WORKER,
        payload={},  # Filled by planner
        trace_id=trace_id,
        confidence=0.0,
        warnings=[]
    )
    plan_output = await planner.run(task, output_schema=PlannerOutput)
    plan_envelope.payload = plan_output.model_dump()
    plan_envelope.confidence = plan_output.confidence

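    if plan_output.confidence < config.min_confidence:
        raise PipelineError(f"Plan confidence {plan_output.confidence} below threshold")

    # An empty plan would otherwise pass through with no work done
    if not plan_output.subtasks:
        raise PipelineError("Planner returned no subtasks")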

    # Phase 2: Execute workers with error isolation
    worker_tasks = [
        worker_pool.run(subtask, envelope=plan_envelope)
        for subtask in plan_output.subtasks
    ]
    worker_results = await asyncio.gather(*worker_tasks, return_exceptions=True)

    # Phase 3: Aggregate with partial-result tolerance
    valid_results = [r for r in worker_results if isinstance(r, WorkerResult)]
    failed_count = len(worker_results) - len(valid_results)

    aggregate_envelope = HandoffEnvelope(
        source=AgentRole.WORKER,
        target=AgentRole.AGGREGATOR,
        payload={"results": [r.model_dump() for r in valid_results]},
        trace_id=trace_id,
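        # Ratio of returned results as confidence; max() avoids dividing by zero on an empty list
        confidence=len(valid_results) / max(len(worker_results), 1),
        warnings=[f"{failed_count}/{len(worker_results)} workers failed"] if failed_count else []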
    )

    return await aggregator.run(valid_results, envelope=aggregate_envelope)

This is not elegant. It's verbose and explicit. That's the point.
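The pipeline above carries a warnings list but does not show how the token budget from the first failure mode gets enforced. One possible shape for that check is sketched below. `Envelope` is a trimmed-down stand-in for HandoffEnvelope, and `count_tokens`, `check_budget`, `HandoffBudgetError`, and the 0.8 warning ratio are all assumptions for illustration. In particular, the whitespace split is only a crude placeholder for the model's real tokenizer.

```python
from dataclasses import dataclass, field

class HandoffBudgetError(Exception):
    """Raised when a payload would not fit the receiving agent's budget."""

def count_tokens(text: str) -> int:
    # ASSUMPTION: whitespace split as a crude stand-in for a real tokenizer.
    return len(text.split())

@dataclass
class Envelope:
    # Trimmed-down stand-in for HandoffEnvelope.
    payload: str
    warnings: list[str] = field(default_factory=list)

def check_budget(envelope: Envelope, budget: int, warn_ratio: float = 0.8) -> int:
    """Measure the payload; warn when close to the budget, raise when over it."""
    used = count_tokens(envelope.payload)
    if used > budget:
        raise HandoffBudgetError(f"payload is {used} tokens, budget is {budget}")
    if used > budget * warn_ratio:
        envelope.warnings.append(f"payload uses {used}/{budget} tokens")
    return used

# Fits, but only just: recorded as a warning on the envelope.
small = Envelope(payload="summarize the quarterly report")
used = check_budget(small, budget=4)

# Does not fit: fails explicitly instead of truncating.
try:
    check_budget(Envelope(payload="word " * 10), budget=4)
    over_budget = False
except HandoffBudgetError:
    over_budget = True

print(used, small.warnings, over_budget)
```

Calling a check like this before each agent runs turns silent truncation into either a visible warning or an explicit failure.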

What I Learned

The first multi-agent system I built looked clever. It used dynamic routing, context-aware agent selection, and implicit handoffs based on agent names. It worked great until I ran it on 50 concurrent tasks at 3 AM and woke up to a mess of partial results and silent failures.

The second version was ugly but correct. Every handoff was a typed contract. Every failure was explicit. Every agent was isolated.

The third version — the current one — is the first version's elegance built on the second version's discipline. The agents still use structured output. The handoffs still carry metadata. But I've hidden the boilerplate behind a thin framework so the actual agent logic stays clean.

If you're building multi-agent systems: start with the ugly-correct version. Get it wrong in production first. Then make it elegant.

The handoff problem doesn't get easier — but you stop being surprised by it.
