DEV Community

Cover image for Building a Multi-Agent AI System with Python
WDSEGA
WDSEGA

Posted on

Building a Multi-Agent AI System with Python

The era of single-prompt AI interactions is behind us. As large language models become more capable, the real challenge has shifted from "can AI do this?" to "how do we coordinate multiple AI agents to solve complex problems together?"

In this guide, we'll explore the architecture patterns, implementation strategies, and practical considerations for building multi-agent AI systems with Python.

Why Multi-Agent Systems?

A single LLM, no matter how powerful, has limitations. It can only process one task at a time, lacks persistent memory across sessions, and struggles with tasks that require different expertise or tools.

Multi-agent systems solve these problems by:

  • Specialization: Each agent focuses on a specific domain (research, coding, analysis, communication)
  • Parallelism: Multiple agents can work on different subtasks simultaneously
  • Resilience: If one agent fails, others can continue or retry
  • Scalability: Add new capabilities by adding new agents without changing existing ones

Think of it like a software team. You wouldn't expect one developer to handle frontend, backend, DevOps, and QA simultaneously. The same principle applies to AI agents.

Core Architecture Patterns

Pattern 1: Orchestrator-Worker

The most common pattern. A central orchestrator agent receives the user's request, breaks it down into subtasks, and delegates to specialized workers.

from dataclasses import dataclass
from typing import List, Optional
from enum import Enum

class AgentRole(Enum):
    ORCHESTRATOR = "orchestrator"
    RESEARCHER = "researcher"
    CODER = "coder"
    REVIEWER = "reviewer"
    SUMMARIZER = "summarizer"

@dataclass
class AgentMessage:
    sender: AgentRole
    content: str
    metadata: dict = None
    parent_id: Optional[str] = None

class BaseAgent:
    def __init__(self, role: AgentRole, llm_client):
        self.role = role
        self.llm = llm_client
        self.memory: List[AgentMessage] = []

    def process(self, message: AgentMessage) -> AgentMessage:
        """Process incoming message and return response."""
        raise NotImplementedError

    def remember(self, message: AgentMessage):
        self.memory.append(message)
Enter fullscreen mode Exit fullscreen mode

Pattern 2: Pipeline

Agents are arranged in a sequential pipeline where the output of one feeds into the next. This works well for workflows with clear stages.

class AgentPipeline:
    def __init__(self, agents: List[BaseAgent]):
        self.agents = agents

    def execute(self, initial_input: str) -> str:
        result = initial_input
        for agent in self.agents:
            message = AgentMessage(
                sender=AgentRole.ORCHESTRATOR,
                content=result
            )
            response = agent.process(message)
            result = response.content
        return result
Enter fullscreen mode Exit fullscreen mode

Pattern 3: Blackboard

A shared knowledge base where agents can read and write information asynchronously. This pattern is ideal for research-heavy tasks.

class Blackboard:
    def __init__(self):
        self.knowledge = {}
        self.subscribers = defaultdict(list)

    def write(self, key: str, value: str, agent: BaseAgent):
        self.knowledge[key] = {
            "value": value,
            "author": agent.role.value,
            "timestamp": datetime.now().isoformat()
        }
        self._notify(key, value, agent)

    def read(self, key: str) -> Optional[str]:
        entry = self.knowledge.get(key)
        return entry["value"] if entry else None
Enter fullscreen mode Exit fullscreen mode

Building a Real Multi-Agent System

Let's build a practical example: a content research and writing system.

Step 1: Define Agent Roles

class ResearchAgent(BaseAgent):
    """Gathers information from various sources."""
    def process(self, message: AgentMessage) -> AgentMessage:
        self.remember(message)
        prompt = f"""Research the following topic thoroughly:
        {message.content}

        Provide:
        1. Key facts and statistics
        2. Recent developments (last 6 months)
        3. Expert opinions and quotes
        4. Contrasting viewpoints
        """
        response = self.llm.generate(prompt)
        return AgentMessage(
            sender=self.role,
            content=response,
            parent_id=message.content
        )

class WritingAgent(BaseAgent):
    """Creates content based on research."""
    def process(self, message: AgentMessage) -> AgentMessage:
        self.remember(message)
        prompt = f"""Based on this research, write an engaging article:
        {message.content}

        Requirements:
        - Professional but accessible tone
        - Include specific examples
        - Structure with clear headings
        - Minimum 800 words
        """
        response = self.llm.generate(prompt)
        return AgentMessage(
            sender=self.role,
            content=response,
            parent_id=message.content
        )

class ReviewAgent(BaseAgent):
    """Reviews and provides feedback on content."""
    def process(self, message: AgentMessage) -> AgentMessage:
        self.remember(message)
        prompt = f"""Review this article critically:
        {message.content}

        Check for:
        1. Factual accuracy
        2. Logical flow
        3. Clarity and readability
        4. Missing perspectives

        Provide specific improvement suggestions.
        """
        response = self.llm.generate(prompt)
        return AgentMessage(
            sender=self.role,
            content=response,
            parent_id=message.content
        )
Enter fullscreen mode Exit fullscreen mode

Step 2: Implement the Orchestrator

class OrchestratorAgent(BaseAgent):
    def __init__(self, llm_client, workers: List[BaseAgent]):
        super().__init__(AgentRole.ORCHESTRATOR, llm_client)
        self.workers = {w.role: w for w in workers}
        self.max_iterations = 5

    def execute(self, task: str) -> str:
        plan = self._create_plan(task)

        for step in plan:
            agent = self.workers.get(step["role"])
            if not agent:
                continue

            result = agent.process(AgentMessage(
                sender=AgentRole.ORCHESTRATOR,
                content=step["input"]
            ))

            # Feed results to next agent
            if step.get("next_role"):
                next_agent = self.workers[step["next_role"]]
                final = next_agent.process(result)

        return final.content

    def _create_plan(self, task: str) -> list:
        prompt = f"""Break down this task into agent steps:
        {task}

        Available agents: {list(self.workers.keys())}
        Return a JSON plan with role, input, and next_role fields.
        """
        return json.loads(self.llm.generate(prompt))
Enter fullscreen mode Exit fullscreen mode

Step 3: Add Error Handling and Retry Logic

class ResilientOrchestrator(OrchestratorAgent):
    def execute(self, task: str) -> str:
        plan = self._create_plan(task)
        results = {}

        for attempt in range(self.max_iterations):
            try:
                for step in plan:
                    agent = self.workers.get(step["role"])
                    if not agent:
                        continue

                    try:
                        result = agent.process(AgentMessage(
                            sender=AgentRole.ORCHESTRATOR,
                            content=step["input"]
                        ))
                        results[step["role"].value] = result.content
                    except Exception as e:
                        print(f"Agent {step['role']} failed: {e}")
                        continue

                # Validate results
                if self._validate_results(results, task):
                    return self._synthesize(results)

            except Exception as e:
                print(f"Attempt {attempt + 1} failed: {e}")
                continue

        raise RuntimeError("Max iterations reached")
Enter fullscreen mode Exit fullscreen mode

Communication Patterns Between Agents

Direct Messaging

Agents communicate directly with each other through the orchestrator. This is simple but can create bottlenecks.

Event-Driven

Agents emit events that other agents can subscribe to. This is more scalable and decoupled.

class EventBus:
    def __init__(self):
        self.handlers = defaultdict(list)

    def subscribe(self, event_type: str, handler):
        self.handlers[event_type].append(handler)

    def emit(self, event_type: str, data: dict):
        for handler in self.handlers[event_type]:
            handler(data)

# Usage
bus = EventBus()
bus.subscribe("research_complete", writing_agent.on_research_done)
bus.subscribe("draft_ready", review_agent.on_draft_ready)
Enter fullscreen mode Exit fullscreen mode

Shared Memory with Context Window Management

One of the biggest challenges in multi-agent systems is managing context. Each agent has a limited context window, and passing full conversation histories between agents quickly becomes impractical.

class ContextManager:
    def __init__(self, max_tokens: int = 8000):
        self.max_tokens = max_tokens
        self.summaries = {}

    def compress(self, messages: List[AgentMessage]) -> str:
        """Compress message history into a summary."""
        total_tokens = sum(
            len(m.content.split()) for m in messages
        )

        if total_tokens <= self.max_tokens:
            return "\n".join(m.content for m in messages)

        # Summarize older messages, keep recent ones
        recent = messages[-5:]
        older = messages[:-5]
        summary = self._summarize(older)

        return f"[Summary of earlier context]\n{summary}\n\n[Recent messages]\n" + \
               "\n".join(m.content for m in recent)
Enter fullscreen mode Exit fullscreen mode

State Management and Persistence

For production systems, you need persistent state management:

import sqlite3
import json

class AgentStateStore:
    def __init__(self, db_path: str = "agent_state.db"):
        self.conn = sqlite3.connect(db_path)
        self._init_db()

    def _init_db(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS agent_states (
                task_id TEXT PRIMARY KEY,
                agent_role TEXT,
                state JSON,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

    def save_state(self, task_id: str, role: str, state: dict):
        self.conn.execute(
            "INSERT OR REPLACE INTO agent_states VALUES (?, ?, ?, ?)",
            (task_id, role, json.dumps(state), datetime.now())
        )
        self.conn.commit()

    def load_state(self, task_id: str, role: str) -> Optional[dict]:
        cursor = self.conn.execute(
            "SELECT state FROM agent_states WHERE task_id=? AND agent_role=?",
            (task_id, role)
        )
        row = cursor.fetchone()
        return json.loads(row[0]) if row else None
Enter fullscreen mode Exit fullscreen mode

For the complete guide with all code examples and advanced patterns, read the full article on our blog.


Originally published at WD Tech Blog. Follow for more Python tutorials, AI tools, and developer resources.

Top comments (0)