Multi-Agent Architecture in Practice: How to Split Work Across AI Agents Without Chaos
Single-agent systems hit a wall fast. The moment you need research + coding + testing happening in parallel, you need multiple agents. But most multi-agent setups fail because of coordination problems, not capability problems.
Here are the patterns that actually work in production, with code you can use today.
The Core Problem: Agents Fighting Each Other
The naive approach:
Agent A: "I'll refactor the auth module"
Agent B: "I'll add logging to the auth module"
→ Both edit the same files → merge conflicts → broken code
This happens constantly in multi-agent setups without proper coordination. The fix isn't "smarter agents" — it's better architecture.
Pattern 1: The Dispatcher Pattern
One orchestrator agent that never writes code. It only delegates and monitors.
# agent-team.yaml
dispatcher:
  role: orchestrator
  model: claude-sonnet-4.6   # Cheap, fast, good at planning
  capabilities:
    - task_decomposition
    - agent_assignment
    - conflict_detection
  rules:
    - NEVER write code directly
    - NEVER assign two agents to the same file
    - Always verify task dependencies before dispatching

workers:
  coder:
    role: implementation
    model: claude-sonnet-4.6
    capabilities: [write_code, run_tests]
  reviewer:
    role: quality
    model: claude-opus-4.6   # Higher reasoning for reviews
    capabilities: [code_review, security_audit]
  researcher:
    role: context
    model: gpt-5.4-mini      # Cheap for search/docs
    capabilities: [web_search, doc_lookup, api_exploration]
class Dispatcher:
    def __init__(self, agents: dict):
        self.agents = agents
        self.file_locks = {}   # file path -> id of the subtask that owns it
        self.task_queue = []

    def decompose_and_assign(self, task: str):
        # Step 1: Break the task into subtasks
        subtasks = self.plan(task)

        # Step 2: Check for file conflicts
        for subtask in subtasks:
            for f in subtask.affected_files:
                if f in self.file_locks:
                    # Queue this subtask until the file is free
                    subtask.depends_on = self.file_locks[f]
                else:
                    self.file_locks[f] = subtask.id

        # Step 3: Dispatch the non-blocked subtasks
        for subtask in subtasks:
            if not subtask.depends_on:
                agent = self.select_agent(subtask)
                agent.execute(subtask)

    def complete(self, subtask):
        """Release file locks when a subtask finishes, unblocking queued work."""
        for f in subtask.affected_files:
            if self.file_locks.get(f) == subtask.id:
                del self.file_locks[f]

    def select_agent(self, subtask):
        """Match task type to the best-suited agent."""
        if subtask.type in ('implement', 'fix'):
            return self.agents['coder']
        elif subtask.type in ('review', 'security'):
            return self.agents['reviewer']
        elif subtask.type in ('research', 'docs'):
            return self.agents['researcher']
        raise ValueError(f"No agent for task type: {subtask.type}")
Why it works: The dispatcher prevents conflicts before they happen. No two agents ever touch the same file simultaneously.
Pattern 2: Shared Memory (The Blackboard Pattern)
Agents need shared context. Without it, Agent B doesn't know what Agent A discovered.
import time

class SharedMemory:
    """Append-only shared memory for agent teams."""

    def __init__(self):
        self.entries = []
        self.index = {}   # tag -> [entry_ids]

    def write(self, agent_id: str, content: str, tags: list[str]):
        entry = {
            "id": len(self.entries),
            "agent": agent_id,
            "content": content,
            "tags": tags,
            "timestamp": time.time(),
        }
        self.entries.append(entry)
        for tag in tags:
            self.index.setdefault(tag, []).append(entry["id"])
        return entry["id"]

    def read(self, tags: list[str] | None = None, last_n: int = 10):
        """Read recent entries, optionally filtered by tags."""
        if tags:
            ids = set()
            for tag in tags:
                ids.update(self.index.get(tag, []))
            entries = [self.entries[i] for i in sorted(ids)]
        else:
            entries = self.entries
        return entries[-last_n:]

    def summarize(self, max_tokens: int = 2000):
        """Compress memory for context-limited agents."""
        recent = self.entries[-50:]
        summary = "\n".join(
            f"[{e['agent']}] {e['content'][:100]}" for e in recent
        )
        return summary[:max_tokens]
Usage in practice:
# Researcher finds something
memory.write(
    "researcher",
    "The Stripe API changed: webhook signatures now require v2 header",
    tags=["stripe", "api_change", "breaking"],
)

# Coder reads before implementing
context = memory.read(tags=["stripe"])
# Now the coder knows about the API change BEFORE writing code
Pattern 3: Review Gates
Never let agent output go straight to production. Every code change goes through a review agent.
import json

class ReviewGate:
    def __init__(self, reviewer_agent):
        self.reviewer = reviewer_agent
        self.criteria = {
            "security": ["no hardcoded secrets", "input validation", "SQL injection"],
            "quality": ["error handling", "edge cases", "type safety"],
            "style": ["matches AGENTS.md conventions", "consistent naming"],
        }

    async def review(self, code_diff: str, context: str) -> ReviewResult:
        prompt = f"""
Review this code change against these criteria:
{json.dumps(self.criteria, indent=2)}

Project context:
{context}

Diff:
{code_diff}

Return: APPROVE, REQUEST_CHANGES, or BLOCK
For each issue, specify: file, line, severity, suggestion
"""
        result = await self.reviewer.execute(prompt)
        if result.decision == "BLOCK":
            # Critical issue — alert human
            notify_human(result)
        return result
The key insight: Use a more expensive model for review than for implementation. A $0.50 review that catches a $5,000 bug is the best ROI in your entire pipeline.
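The back-of-envelope math behind that claim, with every number an assumption chosen for illustration:

```python
# Illustrative ROI arithmetic for review gates. All numbers are assumed.
review_cost = 0.50    # one opus-class review call
bug_cost = 5000.0     # cost of one production incident
catch_rate = 0.10     # fraction of reviews that catch a would-be incident (assumed)

expected_savings = catch_rate * bug_cost       # 500.0 per review, in expectation
roi = expected_savings / review_cost
print(roi)   # 1000.0
```

Even if you cut the assumed catch rate to 1%, the review still returns 100x its cost in expectation — which is why the review model can afford to be the expensive one.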
Pattern 4: Graceful Degradation
Agents fail. APIs time out. Models hallucinate. Plan for it.
class ResilientAgent:
    def __init__(self, primary_model, fallback_model):
        self.primary = primary_model
        self.fallback = fallback_model
        self.max_retries = 3

    async def execute(self, task):
        result = None   # keep a handle on partial progress across attempts
        for attempt in range(self.max_retries):
            try:
                result = await self.primary.run(task)
                # Sanity-check the output
                if self.validate(result):
                    return result
                # Feed the validation failure into the next attempt
                task.context += f"\nPrevious attempt failed validation: {result.error}"
            except (TimeoutError, RateLimitError):  # RateLimitError: your client library's rate-limit exception
                # Fall back to the cheaper model
                result = await self.fallback.run(task)
                if self.validate(result):
                    return result

        # All attempts failed — return to dispatcher
        return TaskResult(
            status="failed",
            reason="Max retries exceeded",
            partial_work=result,   # Save partial progress (may be None)
        )
Pattern 5: Cost-Aware Scheduling
Not all tasks are equal. Route based on value, not just type.
class CostAwareScheduler:
    def __init__(self, daily_budget: float = 10.0):
        self.budget = daily_budget
        self.spent = 0.0
        self.model_costs = {
            "opus": 0.50,    # avg cost per call
            "sonnet": 0.05,
            "mini": 0.005,
        }

    def select_model(self, task):
        remaining = self.budget - self.spent
        if remaining < 1.0:
            # Budget tight — everything goes to mini
            return "mini"
        if task.priority == "critical":
            return "opus"
        elif task.priority == "normal" and remaining > 5.0:
            return "sonnet"
        else:
            return "mini"

    def track(self, model: str, actual_cost: float):
        self.spent += actual_cost
        if self.spent > self.budget * 0.8:
            alert("Budget 80% consumed — switching to economy mode")
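The routing decision is easier to reason about as a pure function. Here's the same logic extracted so it can run standalone — the signature is illustrative, not part of the class above:

```python
def select_model(priority: str, budget: float, spent: float) -> str:
    """Route a task to a model tier based on priority and remaining budget."""
    remaining = budget - spent
    if remaining < 1.0:
        return "mini"        # economy mode: budget nearly exhausted
    if priority == "critical":
        return "opus"
    if priority == "normal" and remaining > 5.0:
        return "sonnet"
    return "mini"

print(select_model("critical", 10.0, 0.0))   # opus
print(select_model("normal", 10.0, 6.0))     # remaining 4.0 -> mini
print(select_model("critical", 10.0, 9.5))   # remaining 0.5 -> mini, even for critical
```

Note the last case: once the budget floor is hit, even critical tasks get the cheap model. Whether you want that or a hard stop plus a human alert is a policy decision worth making explicitly.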
Putting It All Together
┌────────────┐
│ Dispatcher │  Plans, assigns, monitors
└─────┬──────┘
      │
 ┌────┴────┐
 │ Memory  │  Shared context (blackboard)
 └────┬────┘
      │
 ┌────┼────────────┐
 │    │            │
 ▼    ▼            ▼
Coder Researcher Reviewer
 │                   ▲
 │                   │
 └─── Review Gate ───┘
           │
           ▼
      Production
This architecture handles 90% of multi-agent use cases. The patterns are framework-agnostic — they work with LangChain, CrewAI, AutoGen, or raw API calls.
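The loop that ties these pieces together can be sketched as a simple ready-queue: execute unblocked subtasks, mark them done (which releases their file locks), and retry blocked ones. The names and tuple shape are illustrative, and real code would also need cycle detection to avoid spinning on circular dependencies:

```python
from collections import deque

def run_to_completion(subtasks):
    """subtasks: list of (id, files, depends_on) tuples. Returns execution order."""
    done, order = set(), []
    queue = deque(subtasks)
    while queue:
        sid, files, dep = queue.popleft()
        if dep is not None and dep not in done:
            queue.append((sid, files, dep))   # still blocked; retry later
            continue
        order.append(sid)   # "execute" the subtask
        done.add(sid)       # completing it releases its file locks
    return order

print(run_to_completion([
    ("add-logging", ["auth.py"], "refactor-auth"),
    ("refactor-auth", ["auth.py"], None),
]))   # ['refactor-auth', 'add-logging']
```

The dependent task submitted first still runs second — ordering comes from the dependency graph, not from submission order.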
The Production Checklist
- ☐ Define agent roles (never let one agent do everything)
- ☐ Implement file locking (prevent edit conflicts)
- ☐ Set up shared memory (agents need context from each other)
- ☐ Add review gates (never trust agent output blindly)
- ☐ Build fallback chains (primary → fallback → human)
- ☐ Track costs per agent per task type
- ☐ Set budget limits with automatic degradation
Full Multi-Agent Kit
The complete multi-agent architecture configs, dispatcher templates, and shared memory implementations are in the AI Dev Toolkit — 264 production frameworks for building real agent systems. Includes 5 orchestration patterns with working code.
Running multi-agent systems in production? What coordination problems have you hit? Drop your war stories in the comments.