Multi-Agent Architecture in Practice: How to Split Work Across AI Agents Without Chaos


Single-agent systems hit a wall fast. The moment you need research + coding + testing happening in parallel, you need multiple agents. But most multi-agent setups fail because of coordination problems, not capability problems.

Here are the patterns that actually work in production, with code you can use today.


The Core Problem: Agents Fighting Each Other

The naive approach:

Agent A: "I'll refactor the auth module"
Agent B: "I'll add logging to the auth module"
→ Both edit the same files → merge conflicts → broken code

This happens constantly in multi-agent setups without proper coordination. The fix isn't "smarter agents" — it's better architecture.


Pattern 1: The Dispatcher Pattern

One orchestrator agent that never writes code. It only delegates and monitors.

# agent-team.yaml
dispatcher:
  role: orchestrator
  model: claude-sonnet-4.6  # Cheap, fast, good at planning
  capabilities:
    - task_decomposition
    - agent_assignment
    - conflict_detection
  rules:
    - NEVER write code directly
    - NEVER assign two agents to the same file
    - Always verify task dependencies before dispatching

workers:
  coder:
    role: implementation
    model: claude-sonnet-4.6
    capabilities: [write_code, run_tests]

  reviewer:
    role: quality
    model: claude-opus-4.6  # Higher reasoning for reviews
    capabilities: [code_review, security_audit]

  researcher:
    role: context
    model: gpt-5.4-mini  # Cheap for search/docs
    capabilities: [web_search, doc_lookup, api_exploration]
And the dispatcher that enforces those rules:

class Dispatcher:
    def __init__(self, agents: dict):
        self.agents = agents
        self.file_locks = {}  # Track which agent owns which files
        self.task_queue = []

    def decompose_and_assign(self, task: str):
        # Step 1: Break task into subtasks
        subtasks = self.plan(task)

        # Step 2: Check for file conflicts
        for subtask in subtasks:
            subtask.depends_on = None  # initialize so step 3 can test it safely
            for f in subtask.affected_files:
                if f in self.file_locks:
                    # Queue this task until the file is free
                    subtask.depends_on = self.file_locks[f]
                else:
                    self.file_locks[f] = subtask.id

        # Step 3: Dispatch non-blocked tasks
        for subtask in subtasks:
            if subtask.depends_on is None:
                agent = self.select_agent(subtask)
                agent.execute(subtask)

    def select_agent(self, subtask):
        """Match task type to the best-suited agent"""
        if subtask.type in ['implement', 'fix']:
            return self.agents['coder']
        elif subtask.type in ['review', 'security']:
            return self.agents['reviewer']
        elif subtask.type in ['research', 'docs']:
            return self.agents['researcher']
        # Unknown task types default to the coder instead of returning None
        return self.agents['coder']

Why it works: The dispatcher prevents conflicts before they happen. No two agents ever touch the same file simultaneously.
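To make the conflict check concrete, here is the step-2 logic pulled out as a standalone sketch. The `Subtask` dataclass and `assign_file_locks` function are illustrative names for this example, not part of the config above:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Subtask:
    id: str
    affected_files: list
    depends_on: Optional[str] = None

def assign_file_locks(subtasks):
    """First subtask to touch a file owns it; later ones are marked dependent."""
    file_locks = {}
    for st in subtasks:
        for f in st.affected_files:
            if f in file_locks:
                st.depends_on = file_locks[f]  # blocked until the owner finishes
            else:
                file_locks[f] = st.id
    return file_locks

refactor = Subtask("refactor-auth", ["src/auth.py"])
logging_task = Subtask("add-logging", ["src/auth.py", "src/log.py"])
locks = assign_file_locks([refactor, logging_task])
# add-logging is now queued behind refactor-auth instead of racing it
```

This is exactly the auth-module scenario from the intro: the second agent never starts until the first releases the file.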


Pattern 2: Shared Memory (The Blackboard Pattern)

Agents need shared context. Without it, Agent B doesn't know what Agent A discovered.

import time

class SharedMemory:
    """Append-only shared memory for agent teams"""

    def __init__(self):
        self.entries = []
        self.index = {}  # tag -> [entry_ids]

    def write(self, agent_id: str, content: str, tags: list[str]):
        entry = {
            "id": len(self.entries),
            "agent": agent_id,
            "content": content,
            "tags": tags,
            "timestamp": time.time()
        }
        self.entries.append(entry)
        for tag in tags:
            self.index.setdefault(tag, []).append(entry["id"])
        return entry["id"]

    def read(self, tags: list[str] | None = None, last_n: int = 10):
        """Read recent entries, optionally filtered by tags"""
        if tags:
            ids = set()
            for tag in tags:
                ids.update(self.index.get(tag, []))
            entries = [self.entries[i] for i in sorted(ids)]
        else:
            entries = self.entries
        return entries[-last_n:]

    def summarize(self, max_tokens: int = 2000):
        """Compress memory for context-limited agents"""
        recent = self.entries[-50:]
        summary = "\n".join(
            f"[{e['agent']}] {e['content'][:100]}" 
            for e in recent
        )
        return summary[:max_tokens]

Usage in practice:

# Researcher finds something
memory.write("researcher", 
    "The Stripe API changed: webhook signatures now require v2 header",
    tags=["stripe", "api_change", "breaking"])

# Coder reads before implementing
context = memory.read(tags=["stripe"])
# Now the coder knows about the API change BEFORE writing code
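To see the tag index doing its job, here is a condensed, runnable version of the blackboard. `MiniBlackboard` is just a name for this sketch, with timestamps dropped for brevity:

```python
class MiniBlackboard:
    """Condensed version of SharedMemory: append-only log + tag index."""
    def __init__(self):
        self.entries = []
        self.index = {}  # tag -> [entry_ids]

    def write(self, agent, content, tags):
        eid = len(self.entries)
        self.entries.append({"id": eid, "agent": agent, "content": content, "tags": tags})
        for t in tags:
            self.index.setdefault(t, []).append(eid)
        return eid

    def read(self, tags=None, last_n=10):
        if tags:
            ids = sorted({i for t in tags for i in self.index.get(t, [])})
            entries = [self.entries[i] for i in ids]
        else:
            entries = self.entries
        return entries[-last_n:]

bb = MiniBlackboard()
bb.write("researcher", "Stripe webhooks now require the v2 signature header",
         ["stripe", "breaking"])
bb.write("coder", "Implemented retry wrapper for HTTP calls", ["infra"])
stripe_notes = bb.read(tags=["stripe"])
# Only the researcher's Stripe entry comes back; the infra note is filtered out
```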

Pattern 3: Review Gates

Never let agent output go straight to production. Every code change goes through a review agent.

import json

class ReviewGate:
    def __init__(self, reviewer_agent):
        self.reviewer = reviewer_agent
        self.criteria = {
            "security": ["no hardcoded secrets", "input validation", "SQL injection"],
            "quality": ["error handling", "edge cases", "type safety"],
            "style": ["matches AGENTS.md conventions", "consistent naming"]
        }

    async def review(self, code_diff: str, context: str) -> ReviewResult:
        prompt = f"""
        Review this code change against these criteria:
        {json.dumps(self.criteria, indent=2)}

        Project context:
        {context}

        Diff:
        {code_diff}

        Return: APPROVE, REQUEST_CHANGES, or BLOCK
        For each issue, specify: file, line, severity, suggestion
        """

        result = await self.reviewer.execute(prompt)

        if result.decision == "BLOCK":
            # Critical issue — alert a human (notify_human is app-specific)
            notify_human(result)

        return result

The key insight: Use a more expensive model for review than for implementation. A $0.50 review that catches a $5,000 bug is the best ROI in your entire pipeline.


Pattern 4: Graceful Degradation

Agents fail. APIs time out. Models hallucinate. Plan for it.

class ResilientAgent:
    def __init__(self, primary_model, fallback_model):
        self.primary = primary_model
        self.fallback = fallback_model
        self.max_retries = 3

    async def execute(self, task):
        result = None
        for attempt in range(self.max_retries):
            try:
                result = await self.primary.run(task)
            # RateLimitError comes from your model provider's SDK
            except (TimeoutError, RateLimitError):
                # Fall back to cheaper model
                try:
                    result = await self.fallback.run(task)
                except (TimeoutError, RateLimitError):
                    continue  # both models failed this attempt

            # Sanity check the output
            if self.validate(result):
                return result
            task.context += f"\nPrevious attempt failed validation: {result.error}"

        # All attempts failed — return to dispatcher
        return TaskResult(
            status="failed",
            reason="Max retries exceeded",
            partial_work=result  # Save partial progress (may be None)
        )
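The fallback chain is easier to trust once you have seen it fire. A minimal async sketch with stub models standing in for real SDK clients; `StubModel` and `run_with_fallback` are illustrative names for this example:

```python
import asyncio

class StubModel:
    """Stand-in for a real model client."""
    def __init__(self, output=None, fails=False):
        self.output = output
        self.fails = fails
        self.calls = 0

    async def run(self, task):
        self.calls += 1
        if self.fails:
            raise TimeoutError("simulated timeout")
        return self.output

async def run_with_fallback(primary, fallback, task, validate, max_retries=3):
    """Same shape as ResilientAgent.execute: retry primary, fall back on timeout."""
    result = None
    for _ in range(max_retries):
        try:
            result = await primary.run(task)
        except TimeoutError:
            try:
                result = await fallback.run(task)
            except TimeoutError:
                continue  # both models failed this attempt
        if result is not None and validate(result):
            return result
    return None  # caller escalates to a human

primary = StubModel(fails=True)
fallback = StubModel(output="patched auth module")
result = asyncio.run(
    run_with_fallback(primary, fallback, "fix auth", validate=lambda r: bool(r))
)
# → "patched auth module": the primary times out once, the fallback answers
```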

Pattern 5: Cost-Aware Scheduling

Not all tasks are equal. Route based on value, not just type.

class CostAwareScheduler:
    def __init__(self, daily_budget: float = 10.0):
        self.budget = daily_budget
        self.spent = 0.0
        self.model_costs = {
            "opus": 0.50,      # avg cost per call
            "sonnet": 0.05,
            "mini": 0.005
        }

    def select_model(self, task):
        remaining = self.budget - self.spent

        if remaining < 1.0:
            # Budget tight — everything goes to mini
            return "mini"

        if task.priority == "critical":
            return "opus"
        elif task.priority == "normal" and remaining > 5.0:
            return "sonnet"
        else:
            return "mini"

    def track(self, model: str, actual_cost: float):
        self.spent += actual_cost
        if self.spent > self.budget * 0.8:
            # alert() is app-specific: Slack, PagerDuty, email, etc.
            alert("Budget 80% consumed — switching to economy mode")
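The routing rules are easy to sanity-check in isolation. Here is `select_model` as a pure function with the same thresholds, extracted so it can be tested without the class:

```python
def select_model(priority: str, remaining_budget: float) -> str:
    """Mirror of CostAwareScheduler.select_model as a pure function."""
    if remaining_budget < 1.0:
        return "mini"  # budget exhausted: economy mode regardless of priority
    if priority == "critical":
        return "opus"
    if priority == "normal" and remaining_budget > 5.0:
        return "sonnet"
    return "mini"

print(select_model("critical", 8.0))  # opus
print(select_model("normal", 8.0))    # sonnet
print(select_model("normal", 3.0))    # mini: budget too tight for sonnet
print(select_model("critical", 0.5))  # mini: the hard floor wins over priority
```

Note the ordering matters: the budget floor is checked first, so even critical tasks drop to the cheap model once the budget is nearly gone.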

Putting It All Together

┌──────────────┐
│  Dispatcher  │  Plans, assigns, monitors
└──────┬───────┘
       │
  ┌────┴─────┐
  │  Memory  │  Shared context (blackboard)
  └────┬─────┘
       │
┌──────┼──────────┐
│      │          │
▼      ▼          ▼
Coder  Researcher  Reviewer
│                    ▲
│                    │
└──── Review Gate ───┘
         │
         ▼
    Production

This architecture handles 90% of multi-agent use cases. The patterns are framework-agnostic — they work with LangChain, CrewAI, AutoGen, or raw API calls.


The Production Checklist

  1. ☐ Define agent roles (never let one agent do everything)
  2. ☐ Implement file locking (prevent edit conflicts)
  3. ☐ Set up shared memory (agents need context from each other)
  4. ☐ Add review gates (never trust agent output blindly)
  5. ☐ Build fallback chains (primary → fallback → human)
  6. ☐ Track costs per agent per task type
  7. ☐ Set budget limits with automatic degradation

Full Multi-Agent Kit

The complete multi-agent architecture configs, dispatcher templates, and shared memory implementations are in the AI Dev Toolkit — 264 production frameworks for building real agent systems. Includes 5 orchestration patterns with working code.


Running multi-agent systems in production? What coordination problems have you hit? Drop your war stories in the comments.
