**TL;DR:** Multi-agent systems coordinate specialized AI agents to solve complex tasks no single agent can handle alone. This post covers the core architecture patterns (message bus, workspace-based, orchestrated pipeline), shows practical Python implementations for agent communication, task routing, and fault tolerance, and shares hard-won lessons from building a 16-agent production system.
## What Is a Multi-Agent System and Why Should You Build One?
A multi-agent system (MAS) is an architecture where multiple autonomous agents — each with a defined role, expertise, and decision-making capability — collaborate to accomplish goals that exceed any single agent's capacity. Think of it as a software engineering team: you wouldn't ask your frontend developer to conduct a security audit.
The same principle applies to AI. A single monolithic prompt stuffed with every instruction degrades in quality as complexity grows. Splitting responsibilities across specialized agents — a CEO for strategy, a security reviewer for threat modeling, a QA lead for testing — produces measurably better outputs.
## How Do You Structure Agent Communication in Python?
The foundation of any multi-agent system is the communication layer. There are three dominant patterns:
### 1. Workspace-Based (File System)

The simplest pattern, and often the most debuggable. Each agent gets an inbox, outbox, and status file:

```python
import json
from pathlib import Path
from dataclasses import dataclass, asdict
from datetime import datetime, timezone


@dataclass
class AgentMessage:
    from_agent: str
    to_agent: str
    content: str
    priority: str = "normal"
    timestamp: str = ""

    def __post_init__(self):
        if not self.timestamp:
            # timezone-aware now(); datetime.utcnow() is deprecated in 3.12+
            self.timestamp = datetime.now(timezone.utc).isoformat()


class WorkspaceRouter:
    def __init__(self, base_path: str = "workspace"):
        self.base = Path(base_path)

    def send(self, message: AgentMessage) -> Path:
        inbox = self.base / message.to_agent / "inbox"
        inbox.mkdir(parents=True, exist_ok=True)
        # strip colons so the filename is also valid on Windows
        safe_ts = message.timestamp.replace(":", "-")
        filename = f"{safe_ts}_{message.from_agent}.json"
        path = inbox / filename
        path.write_text(json.dumps(asdict(message), indent=2))
        return path

    def receive(self, agent_name: str) -> list[AgentMessage]:
        inbox = self.base / agent_name / "inbox"
        if not inbox.exists():
            return []
        messages = []
        for file in sorted(inbox.glob("*.json")):
            data = json.loads(file.read_text())
            messages.append(AgentMessage(**data))
            file.unlink()  # consume the message
        return messages
```
This pattern is surprisingly powerful. It survives process crashes (messages persist on disk), supports any language (agents just read/write JSON), and gives you a complete audit trail.
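Because the protocol is nothing more than JSON files in a directory, the round trip can be sketched without any framework at all. The agent names, paths, and message content below are illustrative, and the producer side mimics what a non-Python process would do by writing raw JSON:

```python
import json
import tempfile
from pathlib import Path

# Set up a throwaway workspace with one agent's inbox
base = Path(tempfile.mkdtemp()) / "workspace"
inbox = base / "qa" / "inbox"
inbox.mkdir(parents=True)

# Any process, in any language, can drop a message by writing a JSON file
(inbox / "2025-01-01T00-00-00_ceo.json").write_text(json.dumps({
    "from_agent": "ceo", "to_agent": "qa",
    "content": "Run the release checklist", "priority": "high",
    "timestamp": "2025-01-01T00:00:00",
}))

# The consumer reads files in sorted (timestamp) order, then deletes them
messages = []
for f in sorted(inbox.glob("*.json")):
    messages.append(json.loads(f.read_text()))
    f.unlink()

print(messages[0]["content"])  # Run the release checklist
```

Deleting after reading gives at-most-once delivery; if you need an audit trail, move consumed files to an `archive/` directory instead of unlinking them.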
### 2. Message Bus (Pub/Sub)

For higher throughput, use a message broker like NATS or Redis Streams:

```python
import json

import nats  # pip install nats-py


class MessageBusRouter:
    def __init__(self):
        self.nc = None

    async def connect(self, url: str = "nats://localhost:4222"):
        self.nc = await nats.connect(url)

    async def send(self, to_agent: str, message: dict):
        subject = f"agents.{to_agent}.inbox"
        await self.nc.publish(subject, json.dumps(message).encode())

    async def subscribe(self, agent_name: str, handler):
        subject = f"agents.{agent_name}.inbox"
        await self.nc.subscribe(subject, cb=handler)
```
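For unit-testing agent logic before a broker is available, an in-memory stand-in with the same send/subscribe surface is useful. This is a sketch of my own, not part of nats-py; the handlers-per-subject dispatch is a deliberate simplification (no wildcards, no persistence):

```python
import asyncio
import json
from collections import defaultdict


class InMemoryBus:
    """Broker-free stand-in mirroring MessageBusRouter's interface."""

    def __init__(self):
        self.handlers = defaultdict(list)  # subject -> list of callbacks

    async def send(self, to_agent: str, message: dict):
        subject = f"agents.{to_agent}.inbox"
        payload = json.dumps(message).encode()
        for handler in self.handlers[subject]:
            await handler(payload)

    async def subscribe(self, agent_name: str, handler):
        self.handlers[f"agents.{agent_name}.inbox"].append(handler)


async def demo():
    bus = InMemoryBus()
    received = []

    async def on_message(payload):
        received.append(json.loads(payload.decode()))

    await bus.subscribe("reviewer", on_message)
    await bus.send("reviewer", {"task": "review PR #42"})
    return received


result = asyncio.run(demo())
print(result)  # [{'task': 'review PR #42'}]
```

Because the surface matches, swapping the real NATS-backed router back in for integration tests is a one-line change.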
### 3. Orchestrated Pipeline

A central coordinator routes tasks through a defined sequence of agents:

```python
class Pipeline:
    def __init__(self, stages: list[str]):
        self.stages = stages

    async def execute(self, task: dict, agents: dict) -> dict:
        context = {"original_task": task, "results": {}}
        for stage in self.stages:
            agent = agents[stage]
            result = await agent.process(task, context)
            context["results"][stage] = result
            if result.get("halt"):
                break
        return context


# Usage: Research → Plan → Implement → Review → Validate
pipeline = Pipeline(["researcher", "architect", "implementer", "reviewer", "qa"])
```
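To make the control flow concrete, here is the pipeline run end to end with stub agents. The `Pipeline` class is re-declared so the sketch runs standalone, and the stage names are illustrative; because the reviewer stub sets `halt`, the qa stage never executes:

```python
import asyncio


class Pipeline:
    def __init__(self, stages):
        self.stages = stages

    async def execute(self, task, agents):
        context = {"original_task": task, "results": {}}
        for stage in self.stages:
            result = await agents[stage].process(task, context)
            context["results"][stage] = result
            if result.get("halt"):
                break
        return context


class StubAgent:
    """Records its stage name; optionally halts the pipeline."""

    def __init__(self, name, halt=False):
        self.name, self.halt = name, halt

    async def process(self, task, context):
        return {"agent": self.name, "halt": self.halt}


agents = {
    "researcher": StubAgent("researcher"),
    "reviewer": StubAgent("reviewer", halt=True),
    "qa": StubAgent("qa"),
}
pipeline = Pipeline(["researcher", "reviewer", "qa"])
ctx = asyncio.run(pipeline.execute({"goal": "ship feature"}, agents))
print(list(ctx["results"]))  # ['researcher', 'reviewer'] — halted before qa
```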
## How Do You Define Agent Roles Without Over-Engineering?
The biggest mistake in multi-agent design is making agents too granular. Each agent needs enough context to be useful but narrow enough to stay focused. Here's a practical pattern:
```python
from abc import ABC, abstractmethod


class BaseAgent(ABC):
    def __init__(self, name: str, role: str, expertise: list[str]):
        self.name = name
        self.role = role
        self.expertise = expertise
        self.status = "idle"

    @abstractmethod
    async def process(self, task: dict, context: dict) -> dict:
        """Process a task and return results."""
        ...

    def can_handle(self, task: dict) -> float:
        """Return confidence score 0-1 for handling this task."""
        task_keywords = set(task.get("keywords", []))
        overlap = task_keywords & set(self.expertise)
        return len(overlap) / max(len(self.expertise), 1)


class SecurityReviewer(BaseAgent):
    def __init__(self):
        super().__init__(
            name="security_reviewer",
            role="Threat modeling and vulnerability assessment",
            expertise=["authentication", "authorization", "injection",
                       "encryption", "compliance", "owasp"],
        )

    async def process(self, task: dict, context: dict) -> dict:
        # Analyze code for security concerns
        findings = []
        code = task.get("code", "")
        if "eval(" in code or "exec(" in code:
            findings.append({
                "severity": "critical",
                "issue": "Dynamic code execution detected",
                "recommendation": "Use ast.literal_eval or structured parsing",
            })
        return {"agent": self.name, "findings": findings, "passed": len(findings) == 0}
```
A good rule of thumb: if you can't describe an agent's job in one sentence, it's doing too much.
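To see what those confidence scores look like in practice, here is the `can_handle` arithmetic applied to the SecurityReviewer's expertise list, re-declared inline so the sketch runs standalone. The task keywords are made up for illustration:

```python
# SecurityReviewer's expertise, as declared above
expertise = {"authentication", "authorization", "injection",
             "encryption", "compliance", "owasp"}


def can_handle(task: dict) -> float:
    """Same scoring rule as BaseAgent.can_handle: overlap / expertise size."""
    task_keywords = set(task.get("keywords", []))
    overlap = task_keywords & expertise
    return len(overlap) / max(len(expertise), 1)


# Two of three keywords match ("caching" is outside this agent's lane),
# so the score is 2 matched / 6 expertise areas
score = can_handle({"keywords": ["authentication", "encryption", "caching"]})
print(round(score, 2))  # 0.33
```

Note that the denominator is the agent's expertise size, not the task's keyword count, so broad agents need more overlap to win a task. That bias is worth keeping in mind when you tune routing thresholds.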
## What Happens When an Agent Fails?
Fault tolerance separates toy projects from production systems. Every agent should report structured status, and the orchestrator should handle failures gracefully:
```python
import asyncio
from enum import Enum


class Verdict(Enum):
    PASS = "pass"
    FLAG = "flag"          # proceed with caution
    BLOCK = "block"        # prevent action
    ESCALATE = "escalate"  # route to human
    HALT = "halt"          # stop everything


class ResilientOrchestrator:
    def __init__(self, agents: dict, max_retries: int = 2, timeout: float = 30.0):
        self.agents = agents
        self.max_retries = max_retries
        self.timeout = timeout

    async def run_agent(self, name: str, task: dict, context: dict) -> dict:
        agent = self.agents[name]
        for attempt in range(self.max_retries + 1):
            try:
                result = await asyncio.wait_for(
                    agent.process(task, context),
                    timeout=self.timeout,
                )
                return result
            except asyncio.TimeoutError:
                if attempt == self.max_retries:
                    return {"agent": name, "verdict": Verdict.ESCALATE.value,
                            "error": f"Timed out after {self.max_retries + 1} attempts"}
            except Exception as e:
                if attempt == self.max_retries:
                    return {"agent": name, "verdict": Verdict.ESCALATE.value,
                            "error": str(e)}
        return {"agent": name, "verdict": Verdict.HALT.value}
```
The verdict system is critical. Rather than binary pass/fail, graduated verdicts let the system degrade gracefully — flagging low-risk issues for later review while halting on critical failures.
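As a sketch of the retry-then-escalate path, here is a compact version of the orchestrator run against an agent that always raises. The classes are re-declared (with only the branches this demo exercises) so the example runs standalone, and `FlakyAgent` is invented for illustration:

```python
import asyncio
from enum import Enum


class Verdict(Enum):
    PASS = "pass"
    ESCALATE = "escalate"


class ResilientOrchestrator:
    def __init__(self, agents, max_retries=2, timeout=1.0):
        self.agents, self.max_retries, self.timeout = agents, max_retries, timeout

    async def run_agent(self, name, task, context):
        agent = self.agents[name]
        for attempt in range(self.max_retries + 1):
            try:
                return await asyncio.wait_for(agent.process(task, context),
                                              timeout=self.timeout)
            except Exception as e:
                if attempt == self.max_retries:
                    return {"agent": name, "verdict": Verdict.ESCALATE.value,
                            "error": str(e)}


class FlakyAgent:
    """Always fails; counts how many times it was invoked."""

    def __init__(self):
        self.calls = 0

    async def process(self, task, context):
        self.calls += 1
        raise RuntimeError("upstream API unavailable")


flaky = FlakyAgent()
orch = ResilientOrchestrator({"researcher": flaky})
result = asyncio.run(orch.run_agent("researcher", {}, {}))
print(result["verdict"], flaky.calls)  # escalate 3
```

With `max_retries=2` the agent is invoked three times before the orchestrator gives up and hands the task to a human via ESCALATE, rather than crashing the whole run.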
## How Do You Route Tasks to the Right Agent?
Naive routing (hardcoded if/else chains) breaks fast. Score-based routing scales:
```python
class TaskRouter:
    def __init__(self, agents: list[BaseAgent]):
        self.agents = agents

    def route(self, task: dict) -> BaseAgent:
        scores = [(agent, agent.can_handle(task)) for agent in self.agents]
        scores.sort(key=lambda x: x[1], reverse=True)
        best_agent, confidence = scores[0]
        if confidence < 0.3:
            raise ValueError(f"No agent confident enough for task: {task}")
        return best_agent

    def route_multi(self, task: dict, threshold: float = 0.5) -> list[BaseAgent]:
        """Route to multiple agents for parallel review."""
        return [agent for agent in self.agents
                if agent.can_handle(task) >= threshold]
```
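A quick usage sketch with two toy agents shows the routing in action. The agent names and expertise lists are invented, and the logic mirrors `TaskRouter.route` above in function form so the example runs standalone:

```python
class ToyAgent:
    def __init__(self, name, expertise):
        self.name, self.expertise = name, expertise

    def can_handle(self, task):
        overlap = set(task.get("keywords", [])) & set(self.expertise)
        return len(overlap) / max(len(self.expertise), 1)


def route(agents, task, min_confidence=0.3):
    """Pick the highest-scoring agent; refuse if nobody is confident."""
    scores = sorted(((a, a.can_handle(task)) for a in agents),
                    key=lambda x: x[1], reverse=True)
    best, confidence = scores[0]
    if confidence < min_confidence:
        raise ValueError(f"No agent confident enough for task: {task}")
    return best


agents = [ToyAgent("security", ["injection", "owasp"]),
          ToyAgent("frontend", ["css", "react"])]
task = {"keywords": ["injection"]}
print(route(agents, task).name)  # security
```

The security agent scores 0.5 (one of two expertise areas matched) against the frontend agent's 0.0, so it wins the task; a task matching nobody raises instead of being silently misrouted.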
## Key Takeaways
- **Start with file-based communication.** It's debuggable, crash-resistant, and language-agnostic. Move to a message bus only when throughput demands it.
- **Define agent boundaries by responsibility, not by technology.** A "SecurityReviewer" is better than a "PythonSecurityStaticAnalyzer." Keep roles broad enough to be useful.
- **Use graduated verdicts** (PASS/FLAG/BLOCK/ESCALATE/HALT) instead of binary pass/fail. This lets your system degrade gracefully rather than failing hard on edge cases.
- **Score-based routing beats hardcoded routing.** Let agents declare their expertise and score their confidence. This scales to dozens of agents without spaghetti logic.
- **Design for failure from day one.** Timeouts, retries, and escalation paths aren't optional; they're what separate a demo from a system you can actually run.
- **Resist the urge to over-engineer.** A 200-line orchestrator with 5 well-defined agents will outperform a complex framework with 50 micro-agents. Start simple, split agents only when you have evidence one is doing too much.
This article was generated with AI assistance and reviewed for accuracy.