A few days ago, I wrote about building an agentic pipeline from scratch in pure Python. The idea was intentionally simple: receive a task, invoke tools, and generate a response.
That architecture works surprisingly well for linear workflows. But real-world AI systems become complicated much faster than most tutorials suggest.
Eventually, one agent is no longer enough. A single reasoning loop starts accumulating too many responsibilities. The prompt grows uncontrollably, tool definitions pile up, execution logic becomes tangled, and debugging turns into a nightmare. What started as a clean “AI agent” slowly becomes a monolith trying to do everything at once.
This is where the Supervisor Pattern becomes useful.
Instead of relying on one giant agent, we introduce a central orchestrator responsible for coordinating specialized executors. Some executors may be tools, others may be reusable workflows, and others may be autonomous agents focused on a specific domain.
Conceptually, this is much closer to how modern AI systems operate internally. Systems like GitHub Copilot, Claude, and many enterprise AI platforms are not simply “one prompt talking to one model.” A significant part of the engineering complexity comes from orchestration: deciding what should execute, when it should execute, and how results should be combined.
In this article, we will build a simplified supervisor-based multi-agent architecture entirely in pure Python without relying on orchestration frameworks.
From Single Agents to Execution Runtimes
Most beginner agent tutorials follow roughly the same structure. A user sends a task, the agent reasons about it, invokes tools when necessary, and eventually produces an answer.
At small scale, this works well enough.
But imagine a more realistic request:
Research vector databases,
generate implementation examples,
analyze project files,
and review the generated solution.
A single agent now has to:
- reason about research,
- inspect files,
- generate code,
- review output,
- manage tools,
- maintain context,
- and orchestrate execution order.
The problem is not necessarily model capability. The problem is architecture.
At some point, the “agent” stops being just a reasoning unit and accidentally becomes a workflow engine.
The Supervisor Pattern solves this by separating orchestration from execution.
Instead of one overloaded agent, we create specialized executors coordinated by a supervisor.
The architecture looks like this:
Supervisor
|
+----------------+----------------+
| | |
Tools Skills Agents
| | |
File Search Ticket Workflow Coding Agent
Web Search Log Analysis Research Agent
The important idea here is that the supervisor does not care whether it is invoking a tool, a workflow, or another agent. Everything follows the same execution contract.
Creating a Common Execution Interface
One of the simplest but most important architectural decisions is standardizing how execution works.
Instead of creating separate orchestration logic for tools, agents, and workflows, we can define a shared interface:
from abc import ABC, abstractmethod
class Executable(ABC):
@abstractmethod
async def execute(self, task: str, context: dict):
pass
This abstraction becomes surprisingly powerful.
A search tool can implement it. A coding agent can implement it. A reusable workflow can implement it. To the supervisor, everything becomes just another executable component.
That greatly simplifies orchestration.
Building Specialized Executors
Now let’s create a few executors with different responsibilities.
We will start with a simple search tool:
class SearchTool(Executable):
async def execute(self, task, context):
return f"""
Searching documentation for:
{task}
"""
Then a file analysis tool:
class FileAnalysisTool(Executable):
async def execute(self, task, context):
return f"""
Analyzing project files for:
{task}
"""
These tools are intentionally small and focused. They represent atomic capabilities.
Now we can create specialized agents.
A research agent:
class ResearchAgent(Executable):
async def execute(self, task, context):
return f"""
Research findings for:
{task}
"""
A coding agent:
class CodingAgent(Executable):
async def execute(self, task, context):
return f"""
Generated implementation for:
{task}
"""
And finally, a review agent:
class ReviewAgent(Executable):
async def execute(self, task, context):
return f"""
Code review completed for:
{task}
"""
This separation of concerns is one of the biggest advantages of supervisor architectures. Each executor remains isolated and focused, which makes the overall system easier to scale and debug.
Dynamic Registration
Hardcoding dependencies quickly becomes painful as the number of executors grows.
Instead, we can create a registry capable of dynamically storing and discovering executors at runtime.
class Registry:
def __init__(self):
self.executables = {}
def register(
self,
name,
description,
executable_type,
instance
):
self.executables[name] = {
"description": description,
"type": executable_type,
"instance": instance
}
def get(self, name):
return self.executables[name]
def list(self):
return self.executables
Now we can register everything dynamically:
registry = Registry()
registry.register(
"search_tool",
"Searches technical documentation",
"tool",
SearchTool()
)
registry.register(
"file_analysis_tool",
"Analyzes project files",
"tool",
FileAnalysisTool()
)
registry.register(
"research_agent",
"Performs research tasks",
"agent",
ResearchAgent()
)
registry.register(
"coding_agent",
"Generates code",
"agent",
CodingAgent()
)
registry.register(
"review_agent",
"Reviews implementations",
"agent",
ReviewAgent()
)
At this point, the system starts feeling much more like a runtime instead of a simple chatbot wrapper.
Building the Supervisor
The supervisor is the heart of the architecture.
Its responsibility is not necessarily to solve the task directly, but rather to decide which executors should participate in solving it.
We will start with a very simple planner:
class Supervisor:
def __init__(self, registry):
self.registry = registry
async def plan(self, task):
selected = []
lower_task = task.lower()
if "research" in lower_task:
selected.append("research_agent")
if "implement" in lower_task:
selected.append("coding_agent")
if "review" in lower_task:
selected.append("review_agent")
if "search" in lower_task:
selected.append("search_tool")
return selected
This planner is intentionally primitive. In production systems, this planning phase is often delegated to an LLM that returns structured execution plans.
But even this simplified version already demonstrates the architecture.
The supervisor receives a goal and decides dynamically which executors should participate.
Parallel Execution
Now comes the most interesting part.
Instead of executing everything sequentially, the supervisor can orchestrate independent tasks concurrently.
import asyncio
class Supervisor:
def __init__(self, registry):
self.registry = registry
async def plan(self, task):
selected = []
lower_task = task.lower()
if "research" in lower_task:
selected.append("research_agent")
if "implement" in lower_task:
selected.append("coding_agent")
if "review" in lower_task:
selected.append("review_agent")
if "search" in lower_task:
selected.append("search_tool")
return selected
async def execute(self, task, context):
selected = await self.plan(task)
executions = []
for name in selected:
executable = self.registry.get(name)["instance"]
executions.append(
executable.execute(task, context)
)
results = await asyncio.gather(*executions)
return results
This changes the nature of the system completely.
We are no longer building a linear tool-calling loop. We are building an orchestration runtime capable of coordinating distributed execution.
That distinction matters a lot.
Running the System
Now we can execute the entire pipeline:
async def main():
supervisor = Supervisor(registry)
result = await supervisor.execute(
"""
Research vector databases,
search implementation examples,
implement a prototype,
and review the generated code
""",
{}
)
for item in result:
print(item)
asyncio.run(main())
The interesting part is not the mock outputs themselves. The interesting part is the orchestration model emerging underneath.
The supervisor analyzes the task, dynamically selects executors, parallelizes execution, and aggregates results back into a unified workflow.
That is much closer to how modern AI systems actually operate internally.
The Architectural Shift
At this point, the system has evolved far beyond a simple “AI chatbot.”
The supervisor is acting simultaneously as:
- planner,
- router,
- scheduler,
- orchestrator.
This is also why many production AI systems are significantly more complicated than “send prompt, receive response.”
A large portion of the engineering complexity comes from:
- orchestration,
- execution management,
- concurrency,
- state propagation,
- retries,
- failure isolation,
- observability.
The model itself is only one piece of the system.
Moving Toward LLM-Based Planning
Our planner currently uses simple rule matching:
if "review" in task:
selected.append("review_agent")
But modern systems usually replace this with an LLM planner capable of generating structured execution plans.
Something like:
prompt = f"""
Task:
{task}
Available executors:
- research_agent
- coding_agent
- review_agent
- search_tool
Return the executors required.
"""
The LLM might return:
{
"executors": [
"research_agent",
"coding_agent",
"review_agent"
],
"parallel": true
}
At that point, the runtime becomes significantly more autonomous.
The supervisor is no longer following hardcoded execution paths. It is dynamically constructing execution graphs at runtime.
Production Realities
This is where things become genuinely difficult.
Once agents can invoke tools, workflows, and even other agents,
the runtime itself becomes the primary engineering challenge.
You suddenly need to think about:
- recursion protection,
- concurrency limits,
- cancellation,
- retries,
- structured outputs,
- tracing,
- execution graphs,
- timeout management.
For example, agents invoking agents can accidentally create infinite loops:
Supervisor -> ResearchAgent
ResearchAgent -> Supervisor
Supervisor -> ResearchAgent
You quickly realize that the difficult part of AI systems is often not model invocation.
The difficult part is building reliable orchestration around the model.
Final Thoughts
Once you understand the Supervisor Pattern, you stop thinking about AI agents as isolated chatbots.
You start thinking in terms of execution runtimes, orchestration graphs, distributed reasoning, and autonomous workflows.
That shift in perspective changes everything.
And interestingly, none of this requires a framework.
Underneath most orchestration libraries, the core execution model is still surprisingly simple:
await executable.execute(task, context)
Everything else is architecture layered on top.
Top comments (0)