My first version was a single agent with 12 tools. Product search, page lookup, order tracking, general chitchat. One system prompt, one LLM call, one response.
It worked fine for simple questions. "Show me running shoes" returned running shoes. "What's your return policy?" returned the return policy.
Then someone asked: "Do you have the Nike Air Max in size 42, and what's your return policy for shoes?"
The agent searched for Nike Air Max, found it, answered the sizing question, and completely forgot about the return policy. It had already used up its reasoning budget on the product search. The return policy part of the question just vanished.
That is when I started breaking it into specialized agents.
The Architecture
I built a chat assistant for e-commerce stores. It needs to handle product discovery (search, filter, compare), customer support (store policies, shipping info), and order tracking (status lookup via the store's API). These are fundamentally different tasks. A product search agent needs vector search tools and product database access. A customer support agent needs policy documents. An order tracking agent needs an external API client with HMAC signing.
Here is the graph:
START -> summarize -> classify -> [parallel agents] -> merge -> END
The classify node looks at the customer's message and decides which agents need to run. If someone asks "Show me running shoes and what's your return policy?", the classifier produces two assignments:
```python
[
    AgentAssignment(agent="product_expert", sub_query="Show me running shoes"),
    AgentAssignment(agent="customer_support", sub_query="What is your return policy?"),
]
```
Each agent gets its own sub-query. They run in parallel. Then a merge node combines the responses.
Here's what the implementation actually looks like.
The Send API Is the Non-Obvious Part
Most LangGraph tutorials show conditional edges: "if category is X, go to node A; if Y, go to node B." That is fine for single-agent routing. But when you need two agents to run in parallel on the same message, you need the Send API.
```python
from langgraph.types import Send

def _route_to_agents(state: GraphState) -> list[Send] | str:
    assignments = state.get("classification_assignments", [])
    agent_to_node = {
        "product_expert": "product_expert",
        "customer_support": "customer_support",
        "order_tracking": "order_tracking",
        "general": "general",
    }
    sends = []
    for assignment in assignments:
        agent = assignment.get("agent")
        node = agent_to_node.get(agent)
        if node:
            sub_query = assignment.get("sub_query")
            sends.append(Send(node, {**state, "current_sub_query": sub_query}))
    if sends:
        return sends
    # Fallback: if no assignments, send to general
    messages = state.get("messages", [])
    fallback_query = messages[-1].content if messages else ""
    return [Send("general", {**state, "current_sub_query": fallback_query})]
```
Each Send creates a branch with its own copy of the state, with current_sub_query set to that agent's specific sub-question. The agents run in parallel and their results converge at the merge node.
The part that tripped me up: when parallel branches write to the same state key, you need a reducer. Without one, the last branch to finish just overwrites the others.
```python
from typing import Annotated

from langgraph.graph import MessagesState

def _merge_agent_results(left: dict | None, right: dict | None) -> dict:
    if right is not None and not right:
        return {}  # Empty dict resets (used between turns)
    if left is None:
        return right or {}
    if right is None:
        return left
    return {**left, **right}

class GraphState(MessagesState):
    agent_results: Annotated[dict | None, _merge_agent_results] = None
```
The Annotated type with the reducer function tells LangGraph how to merge agent_results when multiple branches update it. Each agent writes its result under its own key (like {"product_expert": {...}}), and the reducer merges them into one dict.
One subtle detail: that empty-dict check at the top of the reducer. The classify node resets agent_results to {} at the start of each turn. Without this, results from the previous turn would bleed into the current one. I found this bug after a customer got a response that mixed product recommendations from their first question with policy information from their second.
State Management with contextvars
LangGraph passes state between nodes as function arguments. But tools, services, and utility functions that run deep in the call stack also need access to request-scoped data. I needed three distinct scopes:
ExecutionContext (immutable per request): store ID, language, thread ID. Set once, never changes.
```python
from dataclasses import dataclass

@dataclass(frozen=True)
class ExecutionContext:
    store_id: int
    language: str
    thread_id: str | None = None
```
GraphState (mutable per turn): messages, conversation summary, classification results, agent outputs.
ServiceCache (request-scoped singletons): LLM instances, search services, database lookups that should only happen once per request.
```python
class ServiceCache:
    @classmethod
    def get_llm(cls, model_type="normal", temperature=None):
        ctx = ExecutionContext.get()
        cache_key = f"llm:{ctx.store_id}:{model_type}:{temperature}"
        cache = cls._get_cache()
        if cache_key not in cache:
            cache[cache_key] = LLMHelperService.for_context(
                model_type=model_type, temperature=temperature,
            )
        return cache[cache_key]
```
All three live in contextvars.ContextVar, which gives thread safety without passing objects through every function signature. A tool 5 layers deep in the call stack can call ExecutionContext.get_store_id() and get the right store.
The alternative is threading the store ID through every function call. I tried that first. It works until you have 15 tools across 3 agents, each calling 2-3 services. Then your function signatures turn into def search_products(query, store_id, language, ...) everywhere.
Merging N Agent Responses
Tutorials show merging two things. Production needs to handle 1, 2, 3, or 4 agents responding simultaneously.
For a single agent, there is no merge. Just pass through:
```python
if len(agents) == 1:
    first_result = agent_results[agents[0]]
    response_text = first_result.get("response", "")
```
For multiple agents, an LLM merges the responses into a single coherent reply. The merge prompt gets all agent responses formatted as numbered sections, plus the original query and the customer's language.
One design decision I had to make: confidence aggregation. When three agents respond and one has low confidence, should that trigger a human handoff? My formula weights the minimum confidence at 70% and the average at 30%:
```python
def _get_confidence(agent_results: dict) -> float:
    confidences = [r.get("confidence", 0.5) for r in agent_results.values()]
    if len(confidences) == 1:
        return confidences[0]
    return 0.7 * min(confidences) + 0.3 * sum(confidences) / len(confidences)
```
This way, one confused agent can flag the whole response for review, but two confident agents can offset a slightly uncertain third. I tuned these weights manually over a few hundred test conversations.
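A worked example of the formula makes the weighting concrete: two confident agents at 0.9 and one confused agent at 0.4 land at 0.5, low enough to plausibly trigger review (where exactly the handoff threshold sits isn't stated here):

```python
confidences = [0.9, 0.9, 0.4]
score = 0.7 * min(confidences) + 0.3 * sum(confidences) / len(confidences)
# 0.7 * 0.4 + 0.3 * (2.2 / 3) = 0.28 + 0.22 = 0.5
```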
What Goes Wrong
Misclassification. The classifier sometimes sends a product question to the general agent, or splits a single question into two agents when one would do. I use the "fast" LLM for classification (cheaper, faster) and the "normal" model for the agents themselves. The tradeoff is that the fast model occasionally gets the routing wrong. Structured output with Pydantic helps (the classifier returns a typed list[AgentAssignment], not free text), but it is not perfect.
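A sketch of what that typed classifier output can look like with Pydantic; the field names match the AgentAssignment shown earlier, but the Literal constraint on agent names is my assumption about how the schema is restricted:

```python
from typing import Literal
from pydantic import BaseModel

AgentName = Literal["product_expert", "customer_support", "order_tracking", "general"]

class AgentAssignment(BaseModel):
    agent: AgentName      # Invalid agent names fail validation instead of silently routing.
    sub_query: str

class ClassifierOutput(BaseModel):
    assignments: list[AgentAssignment]

# The LLM's structured output is validated into typed objects, not free text:
result = ClassifierOutput.model_validate({
    "assignments": [
        {"agent": "product_expert", "sub_query": "Show me running shoes"},
        {"agent": "customer_support", "sub_query": "What is your return policy?"},
    ]
})
```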
Feature availability at runtime. Order tracking requires the store to have an API endpoint configured. If the classifier routes to order tracking but the store has not set one up, I remap to customer support at dispatch time:
```python
if agent == "order_tracking" and not order_tracking_available:
    agent = "customer_support"
```
This means the customer support agent gets a question like "Where is my order #12345?" and has to explain that order tracking is not available, rather than just failing silently.
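Folded into the dispatch step, the remap is a small pure function (a sketch; in practice the availability flag would come from the store's configuration):

```python
def remap_for_availability(assignments: list[dict], order_tracking_available: bool) -> list[dict]:
    # If the store has no order-tracking API, let customer support field the question.
    remapped = []
    for a in assignments:
        agent = a["agent"]
        if agent == "order_tracking" and not order_tracking_available:
            agent = "customer_support"
        remapped.append({**a, "agent": agent})
    return remapped
```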
The first-turn optimization. Generating a conversation summary on the very first message is wasteful. There is nothing to summarize. So the summarize node checks:
```python
def _is_first_turn(messages, previous_summary):
    return len(messages) == 1 and not previous_summary
```
On first turn, it just truncates the message to 200 characters as the "summary." The LLM call only happens from the second turn onward, when there is actual conversation history to compress.
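Put together, the summarize node might look like this. A sketch only: the LLM call is stubbed as a plain callable, messages are modeled as strings rather than LangChain message objects, and the 200-character truncation matches the behavior described above:

```python
def summarize(messages: list[str], previous_summary: str, summarize_llm) -> str:
    # First turn: nothing to compress, just truncate the single message.
    if len(messages) == 1 and not previous_summary:
        return messages[0][:200]
    # Later turns: ask the LLM to fold the history into a running summary.
    return summarize_llm(previous_summary, messages)
```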
Limitations
This architecture adds latency. Classify, dispatch, execute, merge: that is 3-4 LLM calls minimum for a multi-agent response, versus 1 for a single agent. For simple questions ("What are your store hours?"), the overhead of classification and merging is unnecessary. I have not found a clean way to short-circuit this without adding yet another classification step.
The connection pool for the PostgreSQL checkpointer needs careful sizing. I run 7 Gunicorn workers with a pool of max 5 connections each. That is 35 potential connections just for checkpointing, on top of Django's own database connections. If your database has a low max_connections, this will bite you.
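The sizing math is worth doing explicitly before deploying; the numbers below mirror the setup described, and any headroom for Django's own connections is an assumption you must fill in for your stack:

```python
def checkpointer_connections(workers: int, pool_max: int) -> int:
    # Worst case: every Gunicorn worker fills its checkpointer pool.
    return workers * pool_max

total = checkpointer_connections(workers=7, pool_max=5)
# Compare this against Postgres max_connections, leaving headroom for
# Django's per-worker database connections and admin sessions.
```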
The confidence aggregation formula is hand-tuned. It works for my use case but I have no proof it generalizes. If you have a better approach for aggregating confidence across parallel agents, I would like to hear it.
The product expert agent uses a hybrid search pipeline (vector + BM25 with cross-encoder reranking). You can see all four agents working on a real catalog with a free sandbox at Emporiqa.