Multi-Agent Communication Patterns That Actually Work

Your AI agent talks to an API. Cool. Now you need two agents to talk to each other — and suddenly you're debugging race conditions at 3 AM.

Multi-agent systems sound simple until you build one. This article covers five communication patterns I've used in production, with code examples and honest assessments of when each one breaks.

The Problem Space

When agents need to coordinate, you face three fundamental challenges:

  1. Asynchronous lifecycles — agents start and stop independently
  2. State divergence — each agent has its own worldview
  3. Message ordering — "I said X then Y" doesn't mean the other agent sees X before Y

Every pattern below is a different tradeoff between these constraints.


Pattern 1: The Shared Filesystem (Mailbox Pattern)

The simplest approach: agents read and write files in shared directories.

import os
import json
from datetime import datetime
from pathlib import Path

class MailboxChannel:
    """File-based inter-agent communication."""

    def __init__(self, base_dir: str, agent_id: str):
        self.base_dir = Path(base_dir)
        self.agent_id = agent_id
        self.inbox = self.base_dir / f"for_{agent_id}"
        self.inbox.mkdir(parents=True, exist_ok=True)

    def send(self, recipient: str, message: dict):
        """Drop a message in another agent's inbox."""
        outbox = self.base_dir / f"for_{recipient}"
        outbox.mkdir(parents=True, exist_ok=True)

        # Microsecond precision so two sends in the same second don't collide
        timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S_%f")
        filename = f"{self.agent_id}_{timestamp}.json"

        payload = {
            "from": self.agent_id,
            "to": recipient,
            "timestamp": datetime.utcnow().isoformat(),
            "body": message
        }

        # Write atomically to prevent partial reads
        tmp_path = outbox / f".tmp_{filename}"
        final_path = outbox / filename

        with open(tmp_path, 'w') as f:
            json.dump(payload, f, indent=2)

        os.rename(str(tmp_path), str(final_path))
        return final_path

    def receive(self, mark_read: bool = True) -> list:
        """Read all unprocessed messages from inbox."""
        messages = []
        processed_dir = self.inbox / ".processed"
        processed_dir.mkdir(exist_ok=True)

        for msg_file in sorted(self.inbox.glob("*.json")):
            with open(msg_file) as f:
                messages.append(json.load(f))

            if mark_read:
                msg_file.rename(processed_dir / msg_file.name)

        return messages

When it works: Agents on the same machine, low message volume, human-readable debugging. You can literally ls and cat to debug.

When it breaks: High throughput (filesystem isn't a message queue), agents on different machines, ordering guarantees needed across many senders.

Real-world lesson: Atomic writes matter. I once lost messages because an agent read a half-written JSON file. The tmp + rename pattern fixes this — os.rename is atomic on most filesystems.
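
A minimal usage sketch of the class above, with two made-up agent IDs sharing an illustrative directory:

shared = "/tmp/agent_mailboxes"   # illustrative path

planner = MailboxChannel(shared, "planner")
worker = MailboxChannel(shared, "worker")

# Planner drops a task in the worker's inbox
planner.send("worker", {"task": "summarize_logs", "priority": "high"})

# Worker picks it up later, on its own schedule
for msg in worker.receive():
    print(f"{msg['from']} -> {msg['to']}: {msg['body']}")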


Pattern 2: The Relay Agent (Hub-and-Spoke)

When agents can't directly access each other's storage, route through a coordinator.

class RelayHub:
    """Central message relay for agents that can't talk directly."""

    def __init__(self):
        self.queues: dict[str, list] = {}
        self.subscribers: dict[str, list] = {}

    def register(self, agent_id: str):
        # Queues are keyed by agent; subscriptions are keyed by topic (see subscribe)
        self.queues.setdefault(agent_id, [])

    def send(self, sender: str, recipient: str, message: dict):
        if recipient not in self.queues:
            raise ValueError(f"Unknown recipient: {recipient}")

        envelope = {
            "from": sender,
            "to": recipient,
            "seq": len(self.queues[recipient]),
            "timestamp": datetime.utcnow().isoformat(),
            "body": message
        }

        self.queues[recipient].append(envelope)

        # Notify topic subscribers too
        for topic, subs in self.subscribers.items():
            if recipient in subs or sender in subs:
                for sub in subs:
                    if sub != sender and sub != recipient:
                        self.queues.setdefault(sub, []).append({
                            **envelope,
                            "type": "notification",
                            "topic": topic
                        })

    def poll(self, agent_id: str, since_seq: int = 0) -> list:
        """Get messages after a sequence number."""
        return [
            m for m in self.queues.get(agent_id, [])
            if m.get("seq", 0) >= since_seq
        ]

    def subscribe(self, agent_id: str, topic: str):
        self.subscribers.setdefault(topic, []).append(agent_id)

When it works: Heterogeneous agents (different runtimes, different machines), when you need a single audit trail, when message routing logic gets complex.

When it breaks: The hub is a single point of failure. If the relay goes down, communication stops. Also adds latency for every message.
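
Here's how the hub gets used in-process (agent names are made up); the caller is responsible for remembering the last sequence number it saw:

hub = RelayHub()
hub.register("researcher")
hub.register("writer")

hub.send("researcher", "writer", {"finding": "nightly benchmark regressed"})

# The writer polls on its own schedule
messages = hub.poll("writer")
last_seq = messages[-1]["seq"] if messages else -1

# Later, fetch only what arrived since then
new_messages = hub.poll("writer", since_seq=last_seq + 1)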

Hybrid approach: Use the relay for cross-machine communication, file-based for same-machine fast-path:

class HybridChannel:
    def __init__(self, agent_id, local_dir, relay_url):
        self.local = MailboxChannel(local_dir, agent_id)
        self.relay = RelayClient(relay_url, agent_id)

    def send(self, recipient, message, force_relay=False):
        if self._is_local(recipient) and not force_relay:
            return self.local.send(recipient, message)
        return self.relay.send(recipient, message)

    def receive(self):
        local_msgs = self.local.receive()
        relay_msgs = self.relay.poll()
        # Merge and deduplicate by message ID
        return self._merge(local_msgs, relay_msgs)
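
RelayClient is referenced above but not shown (as are _is_local and _merge, which are left as stubs). A minimal sketch of what the client could look like, assuming the hub is wrapped in a small HTTP service; the /send and /poll endpoints here are invented for illustration, not a real API:

import requests

class RelayClient:
    """Hypothetical HTTP client for a remote RelayHub."""

    def __init__(self, relay_url: str, agent_id: str):
        self.url = relay_url.rstrip("/")
        self.agent_id = agent_id
        self.last_seq = -1

    def send(self, recipient: str, message: dict):
        resp = requests.post(f"{self.url}/send", json={
            "sender": self.agent_id,
            "recipient": recipient,
            "body": message,
        }, timeout=10)
        resp.raise_for_status()

    def poll(self) -> list:
        resp = requests.get(f"{self.url}/poll", params={
            "agent_id": self.agent_id,
            "since_seq": self.last_seq + 1,
        }, timeout=10)
        resp.raise_for_status()
        messages = resp.json()
        if messages:
            self.last_seq = max(m.get("seq", self.last_seq) for m in messages)
        return messages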

Pattern 3: The Shared Ledger (Event Sourcing)

Instead of sending messages, agents append events to a shared log. Every agent reads the same log and derives its own state.

class SharedLedger:
    """Append-only event log for multi-agent coordination."""

    def __init__(self, ledger_path: str):
        self.path = Path(ledger_path)
        self.path.parent.mkdir(parents=True, exist_ok=True)
        if not self.path.exists():
            self.path.write_text("")

    def append(self, agent_id: str, event_type: str, data: dict):
        """Append an event to the ledger."""
        event = {
            "agent": agent_id,
            "type": event_type,
            "data": data,
            "timestamp": datetime.utcnow().isoformat()
        }

        with open(self.path, 'a') as f:
            f.write(json.dumps(event) + "\n")

    def replay(self, since: str = None, event_types: list = None) -> list:
        """Replay events from the ledger, optionally filtered."""
        events = []
        with open(self.path) as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                event = json.loads(line)

                if since and event["timestamp"] < since:
                    continue
                if event_types and event["type"] not in event_types:
                    continue

                events.append(event)
        return events

    def state_at(self, timestamp: str) -> dict:
        """Reconstruct system state at a point in time."""
        state = {}
        for event in self.replay():
            if event["timestamp"] > timestamp:
                break
            state = self._apply_event(state, event)
        return state

    def _apply_event(self, state: dict, event: dict) -> dict:
        """Reduce an event into state. Override for your domain."""
        agent = event["agent"]
        state.setdefault(agent, {"events": []})
        state[agent]["events"].append(event)
        state[agent]["last_seen"] = event["timestamp"]
        return state

When it works: When you need full auditability, when agents need to reconstruct historical state, when the "truth" is the sequence of events rather than any single snapshot.

When it breaks: The log grows forever (it needs compaction), replay gets slow on large histories, and concurrent appends need coordination (file locking, or a proper database).
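
Compaction can be as simple as collapsing old events into one snapshot event and rewriting the file. This is a rough sketch under my own assumptions: the "snapshot" event type isn't part of the class above, and anything replaying a compacted ledger would need to handle it.

def compact(ledger: SharedLedger, keep_after: str):
    """Collapse everything before `keep_after` into a single snapshot event."""
    snapshot = {}
    kept = []

    for event in ledger.replay():
        if event["timestamp"] < keep_after:
            snapshot = ledger._apply_event(snapshot, event)
        else:
            kept.append(json.dumps(event))

    snapshot_line = json.dumps({
        "agent": "compactor",
        "type": "snapshot",
        "data": snapshot,
        "timestamp": keep_after
    })
    ledger.path.write_text("\n".join([snapshot_line] + kept) + "\n")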

The key insight: Event sourcing turns communication into a side effect of recording what happened. Agents don't send messages — they announce actions. Other agents observe.
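
In code, that shift looks like this (paths and event names are invented for the example):

ledger = SharedLedger("/tmp/agents/ledger.jsonl")

# The scraper announces what it did; it doesn't address anyone
ledger.append("scraper", "pages_fetched", {"count": 42, "source": "docs"})

# The indexer observes by replaying only the events it cares about
for event in ledger.replay(event_types=["pages_fetched"]):
    print(f'{event["agent"]} fetched {event["data"]["count"]} pages')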


Pattern 4: The Handoff Chain

When agents run sequentially (Agent A finishes, Agent B starts), communication is a structured handoff document.

class HandoffProtocol:
    """Structured context transfer between sequential agents."""

    REQUIRED_FIELDS = [
        "session_id",
        "completed_tasks",
        "active_context",
        "next_priorities",
        "warnings"
    ]

    @staticmethod
    def create(session_id: str,
               completed: list,
               context: dict,
               priorities: list,
               warnings: list = None) -> dict:

        handoff = {
            "session_id": session_id,
            "created_at": datetime.utcnow().isoformat(),
            "completed_tasks": completed,
            "active_context": context,
            "next_priorities": priorities,
            "warnings": warnings or [],
            "schema_version": "1.0"
        }

        # Validate required fields are non-empty (warnings may legitimately be an empty list)
        for field in HandoffProtocol.REQUIRED_FIELDS:
            if field != "warnings" and not handoff.get(field):
                raise ValueError(f"Handoff missing required field: {field}")

        return handoff

    @staticmethod
    def validate(handoff: dict) -> tuple[bool, list]:
        """Validate a handoff document. Returns (valid, issues)."""
        issues = []

        for field in HandoffProtocol.REQUIRED_FIELDS:
            if field not in handoff:
                issues.append(f"Missing field: {field}")

        if "schema_version" in handoff:
            if handoff["schema_version"] != "1.0":
                issues.append(f"Unknown schema version: {handoff['schema_version']}")

        # Check for stale context
        if "created_at" in handoff:
            age = datetime.utcnow() - datetime.fromisoformat(handoff["created_at"])
            if age.total_seconds() > 3600:
                issues.append(f"Handoff is {age.total_seconds()/60:.0f} minutes old — context may be stale")

        return len(issues) == 0, issues

When it works: Sequential agent execution (the most common case for AI agents that run on schedules), when you need guaranteed context transfer, when the "conversation" is slow (minutes/hours between turns).

When it breaks: Doesn't work for concurrent agents. The handoff is a snapshot, not a stream.
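
A typical round trip: the outgoing agent writes the handoff to disk, and the incoming agent validates it before trusting it. Paths and field values here are illustrative:

# Outgoing agent, end of its run
handoff = HandoffProtocol.create(
    session_id="nightly-build",
    completed=["fetched metrics", "updated dashboard"],
    context={"branch": "feature/alerts", "ci_status": "green"},
    priorities=["triage failing e2e tests"],
    warnings=["API rate limit nearly exhausted"]
)
Path("handoff.json").write_text(json.dumps(handoff, indent=2))

# Incoming agent, start of its run
incoming = json.loads(Path("handoff.json").read_text())
valid, issues = HandoffProtocol.validate(incoming)
if not valid:
    print("Handoff problems:", issues)  # decide whether to proceed anyway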


Pattern 5: The Contract Protocol (Request/Response)

When agents need to coordinate on specific tasks with guaranteed completion.

from enum import Enum
from dataclasses import dataclass, field
from typing import Optional
import uuid

class ContractStatus(Enum):
    PROPOSED = "proposed"
    ACCEPTED = "accepted"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    REJECTED = "rejected"
    EXPIRED = "expired"

@dataclass
class Contract:
    id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    proposer: str = ""
    executor: str = ""
    task: str = ""
    deadline: Optional[str] = None
    status: ContractStatus = ContractStatus.PROPOSED
    result: Optional[dict] = None

    def accept(self, executor_id: str):
        if self.status != ContractStatus.PROPOSED:
            raise ValueError(f"Can't accept contract in {self.status} state")
        self.executor = executor_id
        self.status = ContractStatus.ACCEPTED

    def complete(self, result: dict):
        if self.status not in (ContractStatus.ACCEPTED, ContractStatus.IN_PROGRESS):
            raise ValueError(f"Can't complete contract in {self.status} state")
        self.result = result
        self.status = ContractStatus.COMPLETED

    def reject(self, reason: str):
        self.result = {"rejection_reason": reason}
        self.status = ContractStatus.REJECTED


class ContractBoard:
    """Shared board where agents post and claim work contracts."""

    def __init__(self, storage_path: str):
        self.path = Path(storage_path)
        self.path.mkdir(parents=True, exist_ok=True)

    def propose(self, contract: Contract):
        # Persist the enum's value ("proposed") so available() and claim() can
        # compare against it; default=str alone would store "ContractStatus.PROPOSED"
        data = {**vars(contract), "status": contract.status.value}
        with open(self.path / f"{contract.id}.json", 'w') as f:
            json.dump(data, f, indent=2, default=str)

    def available(self) -> list[dict]:
        contracts = []
        for f in self.path.glob("*.json"):
            with open(f) as fh:
                data = json.load(fh)
                if data["status"] == "proposed":
                    contracts.append(data)
        return contracts

    def claim(self, contract_id: str, agent_id: str):
        # Read-modify-write without a lock: two agents claiming at once can race.
        # Use a file lock (or a database) if claims can happen concurrently.
        path = self.path / f"{contract_id}.json"
        with open(path) as f:
            data = json.load(f)

        if data["status"] != "proposed":
            raise ValueError("Contract already claimed")

        data["executor"] = agent_id
        data["status"] = "accepted"

        with open(path, 'w') as f:
            json.dump(data, f, indent=2)

When it works: Task delegation between specialized agents, when you need accountability (who did what), when work items have clear completion criteria.

When it breaks: Overhead is high for simple messages. Don't use contracts for "hey, check this out" — use them for "build this thing and tell me when it's done."
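
End to end, the delegation flow looks roughly like this (agent names and the task are invented):

board = ContractBoard("/tmp/agents/contracts")

# The orchestrator posts work
contract = Contract(proposer="orchestrator", task="Generate the weekly report")
board.propose(contract)

# A worker looks for open work and claims one item
open_contracts = board.available()
if open_contracts:
    board.claim(open_contracts[0]["id"], "report_writer")

Note that ContractBoard as written only covers propose and claim; completing a contract would mean loading it back into a Contract, calling complete(), and persisting it again.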


Choosing the Right Pattern

Pattern        | Best For                    | Agents     | Latency | Complexity
---------------|-----------------------------|------------|---------|-----------
Mailbox        | Same-machine, async         | 2-5        | Low     | Low
Relay Hub      | Cross-machine, many agents  | 5-50       | Medium  | Medium
Shared Ledger  | Audit trails, event-driven  | 2-20       | Low     | Medium
Handoff Chain  | Sequential execution        | 2 (serial) | N/A     | Low
Contract Board | Task delegation             | 3-10       | Medium  | High

The Honest Truth

Most multi-agent systems only need Pattern 1 (Mailbox) + Pattern 4 (Handoff). The file-based mailbox handles async coordination. The handoff chain handles sequential continuity. Everything else is optimization for scale you probably don't have yet.

Start simple. Add complexity only when the simple version demonstrably fails.


Anti-Patterns to Avoid

The Chatroom: Agents sending free-form text to each other and parsing it with regex. Use structured messages. Always.

The Omniscient Hub: A central agent that knows everything and coordinates everyone. This creates a bottleneck and a single point of failure. Prefer agents that can operate independently and only coordinate when necessary.

The Polling Storm: Agents checking for messages every 100ms. Use filesystem watchers (inotify on Linux, fsevents on macOS) or exponential backoff.
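
If a watcher isn't available, exponential backoff is the fallback. A minimal sketch against the MailboxChannel interface from Pattern 1:

import time

def poll_with_backoff(channel, min_wait=1.0, max_wait=60.0):
    """Yield messages, backing off while the inbox stays empty."""
    wait = min_wait
    while True:
        messages = channel.receive()
        if messages:
            yield from messages
            wait = min_wait              # reset once there's activity
        else:
            wait = min(wait * 2, max_wait)
        time.sleep(wait)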

The Assumption of Order: "I sent A before B, so the other agent will see A first." Not guaranteed. Include sequence numbers or timestamps and handle out-of-order delivery.
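
A small reorder buffer on the receiving side is usually enough; this sketch assumes messages carry the seq field from the relay example:

class ReorderBuffer:
    """Release messages in seq order, holding back anything after a gap."""

    def __init__(self):
        self.next_seq = 0
        self.pending = {}

    def add(self, message: dict) -> list:
        """Buffer one message; return every message now deliverable in order."""
        self.pending[message["seq"]] = message
        ready = []
        while self.next_seq in self.pending:
            ready.append(self.pending.pop(self.next_seq))
            self.next_seq += 1
        return ready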


Next in This Series

  • Error recovery strategies for multi-agent systems
  • Monitoring and debugging agent communication in production

Part of the Practical AI Agent Engineering series. Previous articles: Persistence Patterns for AI Agents and Building Reliable State Handoffs.
