Building Multi-Agent Systems with LangChain: A Complete Guide
Multi-agent systems represent the next evolution in AI application development. Instead of relying on a single monolithic AI model, multi-agent systems enable multiple specialized agents to collaborate, reason, and solve complex problems together. At Groovy Web, we've built production-grade multi-agent systems that power everything from automated research pipelines to enterprise knowledge management platforms.
This guide will take you through everything you need to know to build sophisticated multi-agent systems using LangChain and LangGraph.
Table of Contents
- Understanding Multi-Agent Systems
- Core Concepts and Architecture
- Setting Up Your Development Environment
- Building Your First Multi-Agent System
- Advanced Communication Patterns
- Task Delegation Strategies
- Real-World Use Case: Research Assistant
- Production Best Practices
- Performance Optimization
- Monitoring and Debugging
Understanding Multi-Agent Systems
What Are Multi-Agent Systems?
A multi-agent system consists of multiple autonomous agents that interact with each other to achieve individual or collective goals. Each agent has specific capabilities, knowledge, and responsibilities. By working together, they can solve problems that would be difficult or impossible for a single agent to handle alone.
Why Use Multi-Agent Systems?
1. Specialization
Different agents can specialize in different domains or tasks. For example:
- A research agent that gathers and synthesizes information
- A code agent that writes and reviews code
- An analysis agent that evaluates and critiques results
- A formatting agent that structures output for specific audiences
2. Parallel Processing
Agents can work simultaneously on different aspects of a problem, dramatically reducing total processing time.
3. Resilience
If one agent fails, others can continue working, making the system more robust.
4. Scalability
You can add new agents without restructuring the entire system.
5. Better Reasoning
Agents can debate, critique, and refine each other's work, leading to higher-quality outputs.
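The resilience point above is worth making concrete. Here is a minimal sketch of a fallback group, where a failed agent simply hands the task to the next one in priority order; the `execute` method protocol and the agent names are illustrative assumptions, not a LangChain API:

```python
class FallbackAgentGroup:
    """Try agents in priority order; if one fails, fall through to the next."""

    def __init__(self, agents):
        # Each agent is any object exposing an .execute(task) method
        self.agents = agents

    def execute(self, task):
        errors = []
        for agent in self.agents:
            try:
                return agent.execute(task)
            except Exception as e:  # one failed agent doesn't stop the system
                errors.append((getattr(agent, "name", repr(agent)), str(e)))
        raise RuntimeError(f"All agents failed: {errors}")


class FlakyAgent:
    name = "flaky"
    def execute(self, task):
        raise TimeoutError("upstream API timed out")


class BackupAgent:
    name = "backup"
    def execute(self, task):
        return f"handled: {task}"


group = FallbackAgentGroup([FlakyAgent(), BackupAgent()])
print(group.execute("summarize report"))  # handled: summarize report
```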
Core Concepts and Architecture
Agent Types
1. ReAct Agents
ReAct (Reasoning + Acting) agents combine reasoning traces with action execution. They:
- Think through problems step-by-step
- Decide what actions to take
- Observe the results
- Continue until completion
```python
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain.tools import Tool
from langchain_openai import ChatOpenAI
from langchain import hub

# Initialize the model
llm = ChatOpenAI(model="gpt-4", temperature=0)

# Define tools
def search_tool(query: str) -> str:
    """Search for information online."""
    # Implementation here
    return f"Results for: {query}"

def calculator_tool(expression: str) -> str:
    """Evaluate mathematical expressions."""
    try:
        # Note: eval() is unsafe on untrusted input; prefer a restricted
        # math parser (e.g. numexpr) in production.
        result = eval(expression)
        return f"Result: {result}"
    except Exception:
        return "Error: Invalid expression"

tools = [
    Tool(
        name="Search",
        func=search_tool,
        description="Useful for searching current information"
    ),
    Tool(
        name="Calculator",
        func=calculator_tool,
        description="Useful for mathematical calculations"
    )
]

# Get the prompt template
prompt = hub.pull("hwchase17/openai-tools-agent")

# Create the agent
agent = create_openai_tools_agent(llm, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
```
2. OpenAI Functions Agents
Optimized for OpenAI's function calling API, these agents are more reliable and faster than ReAct agents.
```python
from langchain.agents import create_openai_functions_agent, AgentExecutor
from langchain.tools import tool
from langchain_openai import ChatOpenAI
from langchain import hub

@tool
def search(query: str) -> str:
    """Search the web for current information."""
    # Implementation
    return f"Search results for: {query}"

@tool
def analyze_code(code: str) -> str:
    """Analyze code for potential issues."""
    # Implementation
    return f"Analysis of: {code[:50]}..."

tools = [search, analyze_code]
llm = ChatOpenAI(model="gpt-4", temperature=0)
prompt = hub.pull("hwchase17/openai-functions-agent")

agent = create_openai_functions_agent(llm, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
```
Communication Patterns
1. Hierarchical Communication
A coordinator agent manages other agents and delegates tasks.
```
┌─────────────────────────────────────┐
│          Coordinator Agent          │
│  - Receives user request            │
│  - Decomposes into subtasks         │
│  - Assigns to specialist agents     │
│  - Aggregates results               │
└─────────────────────────────────────┘
                 │
   ┌─────────────┼─────────────┐
   ▼             ▼             ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Research │ │   Code   │ │ Analysis │
│  Agent   │ │  Agent   │ │  Agent   │
└──────────┘ └──────────┘ └──────────┘
```
2. Peer-to-Peer Communication
Agents communicate directly with each other without a central coordinator.
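A minimal sketch of direct peer-to-peer messaging is shown below; the `PeerAgent` class and its inbox mechanics are illustrative assumptions for this pattern, not part of LangChain:

```python
class PeerAgent:
    """Agent that messages other agents directly, without a coordinator."""

    def __init__(self, name: str):
        self.name = name
        self.peers = {}   # name -> PeerAgent
        self.inbox = []   # list of (sender_name, message)

    def connect(self, other: "PeerAgent") -> None:
        # Register each other as peers (bidirectional link)
        self.peers[other.name] = other
        other.peers[self.name] = self

    def send(self, peer_name: str, message: str) -> None:
        # Deliver straight into the peer's inbox
        self.peers[peer_name].inbox.append((self.name, message))


researcher = PeerAgent("researcher")
writer = PeerAgent("writer")
researcher.connect(writer)
researcher.send("writer", "Here are the findings for section 2.")
print(writer.inbox)  # [('researcher', 'Here are the findings for section 2.')]
```

In a real system each agent would process its inbox with an LLM call; the point here is only the topology: any agent can reach any connected peer without routing through a central node.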
3. Broadcast Communication
An agent sends messages to all other agents simultaneously.
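This pattern is commonly implemented with a small message bus; the `MessageBus` and `BusAgent` classes below are an illustrative sketch, not a LangChain API:

```python
class MessageBus:
    """Broadcast hub: one agent publishes, every subscriber receives."""

    def __init__(self):
        self.subscribers = []

    def subscribe(self, agent) -> None:
        self.subscribers.append(agent)

    def broadcast(self, sender, message: str) -> None:
        for agent in self.subscribers:
            if agent is not sender:  # don't echo back to the sender
                agent.inbox.append((sender.name, message))


class BusAgent:
    def __init__(self, name: str):
        self.name = name
        self.inbox = []


bus = MessageBus()
agents = [BusAgent(n) for n in ("research", "code", "analysis")]
for a in agents:
    bus.subscribe(a)

bus.broadcast(agents[0], "New requirements received.")
print([a.name for a in agents if a.inbox])  # ['code', 'analysis']
```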
State Management
Multi-agent systems need to maintain state across agent interactions. LangGraph provides excellent state management capabilities:
```python
from typing import TypedDict, Annotated, Sequence
from operator import add

class AgentState(TypedDict):
    messages: Annotated[Sequence[str], add]
    current_step: str
    research_data: dict
    code_generated: list
    analysis_results: dict
    next_agent: str
```
Setting Up Your Development Environment
Installation
```bash
# Core LangChain packages
pip install langchain langchain-openai langchain-community

# LangGraph for multi-agent orchestration
pip install langgraph

# Additional utilities
pip install python-dotenv tiktoken

# For specific tools
pip install requests beautifulsoup4 pandas numpy
```
Environment Configuration
Create a `.env` file:
```
OPENAI_API_KEY=your_api_key_here
SERPER_API_KEY=your_search_api_key  # For web search
LANGCHAIN_TRACING_V2=true
LANGCHAIN_API_KEY=your_langchain_api_key
LANGCHAIN_PROJECT=multi-agent-system
```
Project Structure
```
multi-agent-system/
├── agents/
│   ├── __init__.py
│   ├── base.py          # Base agent classes
│   ├── research.py      # Research specialist
│   ├── code.py          # Code generation specialist
│   ├── analysis.py      # Analysis specialist
│   └── coordinator.py   # Coordinator agent
├── tools/
│   ├── __init__.py
│   ├── search.py
│   ├── database.py
│   └── file_ops.py
├── utils/
│   ├── __init__.py
│   ├── state.py
│   └── monitoring.py
├── config.py
├── main.py
└── requirements.txt
```
Building Your First Multi-Agent System
Let's build a practical multi-agent system: an automated content research and creation pipeline.
Step 1: Define Agent States
```python
from typing import TypedDict, Annotated, Sequence, List
from operator import add
from langchain_core.messages import BaseMessage

class ContentResearchState(TypedDict):
    """State for content research multi-agent system"""
    # Core conversation
    messages: Annotated[Sequence[BaseMessage], add]

    # Topic and requirements
    topic: str
    target_audience: str
    content_type: str  # blog, whitepaper, tutorial, etc.

    # Research phase
    research_queries: List[str]
    research_results: List[dict]
    sources_used: List[str]

    # Content creation phase
    outline: dict
    draft_content: str
    reviewed_content: str
    final_content: str

    # Metadata
    current_agent: str
    agent_history: List[str]
    iteration_count: int
    quality_score: float
```
Step 2: Create Individual Agents
Research Agent
```python
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
from typing import List

class ResearchResult(BaseModel):
    """Schema for research results"""
    query: str = Field(description="The search query used")
    key_findings: List[str] = Field(description="Key findings from research")
    sources: List[str] = Field(description="Credible sources found")
    data_points: List[dict] = Field(description="Specific data points and statistics")
    confidence_score: float = Field(description="Confidence in findings (0-1)")

class ResearchAgent:
    """Specialized agent for conducting research"""

    def __init__(self, llm: ChatOpenAI):
        self.llm = llm
        self.name = "Research Agent"

    def generate_search_queries(self, topic: str, audience: str, count: int = 5) -> List[str]:
        """Generate optimal search queries for the topic"""
        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an expert research strategist. Given a topic and target audience,
generate {count} diverse, high-quality search queries that will uncover:
- Latest trends and developments
- Statistics and data
- Expert opinions and case studies
- Common pain points and solutions
- Competitor content gaps

Return only the queries, one per line."""),
            ("user", "Topic: {topic}\nTarget Audience: {audience}")
        ])

        chain = prompt | self.llm
        response = chain.invoke({
            "topic": topic,
            "audience": audience,
            "count": count
        })

        queries = [q.strip() for q in response.content.split('\n') if q.strip()]
        return queries[:count]

    def conduct_research(self, queries: List[str]) -> List[ResearchResult]:
        """Conduct research using multiple queries"""
        results = []
        for query in queries:
            # Implement your search logic here
            # This could use Serper API, Tavily, or custom search
            search_results = self._search(query)

            # Analyze and structure results
            analysis_prompt = ChatPromptTemplate.from_messages([
                ("system", """Analyze the following search results and extract:
1. Key findings (3-5 points)
2. Credible sources (top 3-5)
3. Important data points with statistics
4. Confidence score (0-1) based on source quality

Search Query: {query}
Search Results: {results}"""),
                ("user", "Provide structured analysis.")
            ])

            parser = PydanticOutputParser(pydantic_object=ResearchResult)
            chain = analysis_prompt | self.llm | parser

            try:
                result = chain.invoke({
                    "query": query,
                    "results": search_results
                })
                results.append(result)
            except Exception as e:
                print(f"Error analyzing results for query '{query}': {e}")
                continue

        return results

    def _search(self, query: str) -> str:
        """Execute search using your preferred API"""
        # Example using a placeholder search function
        # In production, use Serper, Tavily, or similar
        return f"Search results for: {query}"

    def synthesize_research(self, results: List[ResearchResult]) -> str:
        """Synthesize all research into a comprehensive summary"""
        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a research synthesizer. Combine findings from multiple
research queries into a comprehensive, structured summary that includes:
1. Executive Summary (3-4 sentences)
2. Key Themes (3-5 main themes)
3. Critical Data Points (organized by theme)
4. Source Credibility Assessment
5. Research Gaps (what's missing)

Research Results:
{results}"""),
            ("user", "Provide comprehensive synthesis.")
        ])

        formatted_results = "\n\n".join([
            f"Query: {r.query}\nFindings: {r.key_findings}\nSources: {r.sources}"
            for r in results
        ])

        chain = prompt | self.llm
        response = chain.invoke({"results": formatted_results})
        return response.content
```
Content Generation Agent
```python
import json

class ContentAgent:
    """Specialized agent for content creation"""

    def __init__(self, llm: ChatOpenAI):
        self.llm = llm
        self.name = "Content Agent"

    def create_outline(self, topic: str, research: str, content_type: str) -> dict:
        """Create structured content outline"""
        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an expert content strategist. Create a detailed outline
for a {content_type} about {topic}.

Based on the research provided, create an outline that includes:
1. Compelling title options (5 variations)
2. Introduction structure (hook, thesis, roadmap)
3. Main sections (3-7) with subsections
4. Key points for each section
5. Data and examples to include
6. Conclusion structure
7. Call-to-action recommendations

Research:
{research}

Return as JSON with this structure:
{{
  "title_options": ["..."],
  "introduction": {{"hook": "...", "thesis": "...", "sections_preview": ["..."]}},
  "main_sections": [
    {{
      "heading": "...",
      "subsections": ["..."],
      "key_points": ["..."],
      "data_points": ["..."],
      "word_count_estimate": 500
    }}
  ],
  "conclusion": {{"summary": "...", "key_takeaways": ["..."], "cta": "..."}},
  "seo_keywords": ["..."],
  "total_word_count_estimate": 2500
}}"""),
            ("user", "Create comprehensive outline.")
        ])

        chain = prompt | self.llm
        response = chain.invoke({
            "topic": topic,
            "research": research,
            "content_type": content_type
        })

        try:
            return json.loads(response.content)
        except json.JSONDecodeError:
            # Fallback if JSON parsing fails
            return {"raw_outline": response.content}

    def generate_content(self, outline: dict, research: str, tone: str = "professional") -> str:
        """Generate full content based on outline"""
        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an expert content writer. Write a comprehensive article
based on the provided outline and research.

Requirements:
- Use a {tone} tone
- Include all sections from the outline
- Incorporate data and examples from research
- Use clear, engaging language
- Add transitions between sections
- Include subheadings for readability
- Optimize for SEO with natural keyword usage
- Add meta description (150-160 characters)

Outline:
{outline}

Research:
{research}

Write the complete article now."""),
            ("user", "Generate full content.")
        ])

        chain = prompt | self.llm
        response = chain.invoke({
            "outline": json.dumps(outline, indent=2),
            "research": research,
            "tone": tone
        })
        return response.content
```
Review and Refinement Agent
```python
class ReviewAgent:
    """Specialized agent for content review and refinement"""

    def __init__(self, llm: ChatOpenAI):
        self.llm = llm
        self.name = "Review Agent"

    def review_content(self, content: str, outline: dict) -> dict:
        """Review content against requirements and best practices"""
        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an expert content editor. Review the following content
against the outline and provide detailed feedback.

Evaluate:
1. Structure and Organization (0-10)
2. Content Quality and Depth (0-10)
3. Clarity and Readability (0-10)
4. SEO Optimization (0-10)
5. Engagement and Flow (0-10)
6. Factual Accuracy (0-10)

Provide:
- Overall quality score (0-100)
- Strengths (3-5 points)
- Weaknesses (3-5 points)
- Specific improvement suggestions (5-10 points)
- Recommended changes with examples

Content:
{content}

Original Outline:
{outline}

Return review as JSON."""),
            ("user", "Provide comprehensive review.")
        ])

        chain = prompt | self.llm
        response = chain.invoke({
            "content": content,
            "outline": json.dumps(outline, indent=2)
        })

        try:
            return json.loads(response.content)
        except json.JSONDecodeError:
            return {"raw_review": response.content}

    def refine_content(self, content: str, review: dict) -> str:
        """Refine content based on review feedback"""
        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an expert content editor. Refine the following content
based on the review feedback provided.

Review Feedback:
{review}

Original Content:
{content}

Requirements:
- Address all weaknesses identified
- Implement suggested improvements
- Maintain the strengths
- Preserve the original voice and style
- Ensure all changes improve quality

Return the refined content."""),
            ("user", "Refine the content.")
        ])

        chain = prompt | self.llm
        response = chain.invoke({
            "review": json.dumps(review, indent=2),
            "content": content
        })
        return response.content
```
Step 3: Build the Multi-Agent Orchestration with LangGraph
```python
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from typing import Literal

# Initialize LLM
llm = ChatOpenAI(model="gpt-4", temperature=0)

# Initialize agents
research_agent = ResearchAgent(llm)
content_agent = ContentAgent(llm)
review_agent = ReviewAgent(llm)

def research_node(state: ContentResearchState) -> dict:
    """Conduct research phase"""
    print("🔬 Research Agent: Starting research phase...")

    # Generate search queries
    queries = research_agent.generate_search_queries(
        state["topic"],
        state["target_audience"]
    )

    # Conduct research and synthesize findings
    results = research_agent.conduct_research(queries)
    synthesis = research_agent.synthesize_research(results)

    print(f"✅ Research completed. Found {len(results)} research results.")

    # Return only the updated keys. Because `messages` uses the `add`
    # reducer, returning just the new message appends it; returning the
    # full message list would duplicate earlier entries.
    return {
        "research_queries": queries,
        "research_results": [r.dict() for r in results],
        "messages": [("system", f"Research synthesis: {synthesis}")],
        "agent_history": state["agent_history"] + ["research"],
        "current_agent": "content",
    }

def outline_node(state: ContentResearchState) -> dict:
    """Create content outline"""
    print("📝 Content Agent: Creating outline...")

    # Get research synthesis (last message)
    research_text = state["messages"][-1][1]

    # Create outline
    outline = content_agent.create_outline(
        state["topic"],
        research_text,
        state["content_type"]
    )

    print(f"✅ Outline created with {len(outline.get('main_sections', []))} main sections.")
    return {"outline": outline}

def content_generation_node(state: ContentResearchState) -> dict:
    """Generate content"""
    print("✍️ Content Agent: Generating content...")

    research_text = state["messages"][-1][1]
    content = content_agent.generate_content(
        state["outline"],
        research_text
    )

    print(f"✅ Content generated ({len(content)} characters).")
    return {"draft_content": content}

def review_node(state: ContentResearchState) -> dict:
    """Review and refine content"""
    print("👀 Review Agent: Reviewing content...")

    review = review_agent.review_content(
        state["draft_content"],
        state["outline"]
    )
    quality_score = review.get("overall_quality_score", 0)
    print(f"📊 Quality Score: {quality_score}/100")

    # If quality is insufficient, refine
    if quality_score < 80:
        print("🔄 Quality below threshold. Refining...")
        final = review_agent.refine_content(state["draft_content"], review)
    else:
        final = state["draft_content"]

    print("✅ Review complete.")
    # Increment the iteration counter here rather than in the conditional
    # edge: routing functions should be pure, so mutations there are lost.
    return {
        "quality_score": quality_score,
        "messages": [("system", f"Review: {review}")],
        "reviewed_content": final,
        "final_content": final,
        "iteration_count": state["iteration_count"] + 1,
    }

def should_continue(state: ContentResearchState) -> Literal["continue", "end"]:
    """Decide whether to continue or end"""
    if state.get("quality_score", 0) >= 80:
        return "end"
    if state["iteration_count"] >= 3:
        return "end"
    return "continue"

# Build the graph
workflow = StateGraph(ContentResearchState)

# Add nodes
workflow.add_node("research", research_node)
workflow.add_node("outline", outline_node)
workflow.add_node("generate_content", content_generation_node)
workflow.add_node("review", review_node)

# Define edges
workflow.set_entry_point("research")
workflow.add_edge("research", "outline")
workflow.add_edge("outline", "generate_content")
workflow.add_edge("generate_content", "review")
workflow.add_conditional_edges(
    "review",
    should_continue,
    {
        "continue": "generate_content",
        "end": END
    }
)

# Compile the graph
app = workflow.compile()
```
Step 4: Execute the Multi-Agent System
```python
def run_content_research_system(
    topic: str,
    target_audience: str,
    content_type: str
) -> dict:
    """Execute the complete multi-agent system"""

    # Initialize state
    initial_state = ContentResearchState(
        messages=[],
        topic=topic,
        target_audience=target_audience,
        content_type=content_type,
        research_queries=[],
        research_results=[],
        sources_used=[],
        outline={},
        draft_content="",
        reviewed_content="",
        final_content="",
        current_agent="research",
        agent_history=[],
        iteration_count=0,
        quality_score=0.0
    )

    print(f"\n🚀 Starting Multi-Agent Content Research System")
    print(f"📌 Topic: {topic}")
    print(f"👥 Target Audience: {target_audience}")
    print(f"📄 Content Type: {content_type}\n")
    print("=" * 70)

    # Execute the workflow
    result = app.invoke(initial_state)

    print("\n" + "=" * 70)
    print("✅ Multi-Agent System Execution Complete!")
    print(f"\n📊 Final Quality Score: {result['quality_score']}/100")
    print(f"🔄 Iterations: {result['iteration_count']}")
    print(f"👥 Agents Used: {', '.join(result['agent_history'])}")

    return result

# Example usage
if __name__ == "__main__":
    result = run_content_research_system(
        topic="Building Multi-Agent Systems with LangChain",
        target_audience="Software Engineers and AI Developers",
        content_type="technical_blog_post"
    )

    # Save final content
    with open("final_content.md", "w", encoding="utf-8") as f:
        f.write(result["final_content"])

    print("\n💾 Content saved to final_content.md")
```
Advanced Communication Patterns
1. Agent Handoff Protocol
Sometimes agents need to dynamically hand off tasks based on their capabilities:
```python
from langgraph.graph import END

def agent_handoff(state: ContentResearchState) -> str:
    """Determine which agent should handle the next step"""
    current_agent = state["current_agent"]

    if current_agent == "research":
        if len(state["research_results"]) < 3:
            # Need more research
            return "research"
        # Ready for content creation
        return "content"

    elif current_agent == "content":
        quality_score = state.get("quality_score", 0)
        if quality_score < 80:
            return "review"
        return END

    elif current_agent == "review":
        iterations = state["iteration_count"]
        if iterations < 3:
            return "content"
        return END

    return END
```
2. Collaborative Decision Making
Agents can collaborate on decisions:
```python
class CollaborativeDecisionAgent:
    """Agent that facilitates collaborative decision-making"""

    def __init__(self, llm: ChatOpenAI):
        self.llm = llm

    def facilitate_discussion(self, agents: list, topic: str, state: dict) -> dict:
        """Facilitate discussion between multiple agents"""
        discussion_history = []

        for agent in agents:
            # Get each agent's perspective
            perspective = agent.provide_perspective(topic, state)
            discussion_history.append({
                "agent": agent.name,
                "perspective": perspective
            })

        # Synthesize perspectives into a decision
        synthesis_prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a decision synthesizer. Given perspectives from
multiple specialized agents, make a recommendation.

Topic: {topic}

Perspectives:
{perspectives}

Provide:
1. Recommended decision
2. Rationale (300-500 words)
3. Confidence level (0-1)
4. Potential risks
5. Alternative approaches"""),
            ("user", "Synthesize and recommend.")
        ])

        chain = synthesis_prompt | self.llm
        decision = chain.invoke({
            "topic": topic,
            "perspectives": json.dumps(discussion_history, indent=2)
        })

        return {
            "decision": decision.content,
            "discussion_history": discussion_history
        }
```
3. Hierarchical Task Delegation
```python
class CoordinatorAgent:
    """Top-level coordinator that delegates to specialist agents"""

    def __init__(self, llm: ChatOpenAI, specialists: dict):
        self.llm = llm
        self.specialists = specialists

    def decompose_task(self, task: str) -> list:
        """Break down complex task into subtasks"""
        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a task decomposition specialist. Break down the
following task into subtasks that can be handled by specialized agents.

Available specialists:
{specialists}

Task: {task}

Return a list of subtasks, each with:
- description
- assigned_specialist
- dependencies (list of subtask IDs)
- estimated_complexity (1-10)

Format as JSON list."""),
            ("user", "Decompose this task.")
        ])

        specialist_list = "\n".join([
            f"- {name}: {agent.description}"
            for name, agent in self.specialists.items()
        ])

        chain = prompt | self.llm
        response = chain.invoke({
            "task": task,
            "specialists": specialist_list
        })

        try:
            return json.loads(response.content)
        except json.JSONDecodeError:
            return []

    def execute_workflow(self, task: str) -> dict:
        """Execute complete workflow with coordination"""
        # Decompose task
        subtasks = self.decompose_task(task)

        # Execute subtasks in dependency order
        results = {}
        completed = set()

        for subtask in sorted(subtasks, key=lambda x: len(x.get("dependencies", []))):
            # Check if dependencies are met
            if all(dep in completed for dep in subtask.get("dependencies", [])):
                specialist = self.specialists[subtask["assigned_specialist"]]
                result = specialist.execute(subtask, results)
                results[subtask["description"]] = result
                completed.add(subtask["description"])

        return results
```
Task Delegation Strategies
1. Dynamic Task Routing
Route tasks to the most appropriate agent based on task characteristics:
```python
class TaskRouter:
    """Intelligently route tasks to appropriate agents"""

    def __init__(self, agents: dict):
        self.agents = agents

    def route_task(self, task_description: str, context: dict) -> str:
        """Determine which agent should handle a task"""
        # Analyze task characteristics
        task_type = self._classify_task(task_description)

        # Score each agent's fitness for this task type
        agent_scores = {}
        for agent_name, agent in self.agents.items():
            agent_scores[agent_name] = agent.can_handle(task_type, context)

        # Return agent with highest score
        return max(agent_scores, key=agent_scores.get)

    def _classify_task(self, task: str) -> str:
        """Classify task into a category"""
        # Implement task classification logic
        pass
```
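`_classify_task` is left as a stub above. As one illustrative way to fill it in, here is a minimal keyword-based heuristic; the categories and keyword lists are assumptions for this example, not part of any LangChain API:

```python
def classify_task(task: str) -> str:
    """Very rough keyword-based task classification (illustrative only)."""
    # Hypothetical categories matching the specialist agents in this guide
    keyword_map = {
        "research": ["find", "search", "investigate", "sources"],
        "code": ["implement", "refactor", "debug", "function"],
        "analysis": ["compare", "evaluate", "metrics", "trends"],
    }
    task_lower = task.lower()
    # Count keyword hits per category
    scores = {
        category: sum(kw in task_lower for kw in keywords)
        for category, keywords in keyword_map.items()
    }
    best = max(scores, key=scores.get)
    return best if scores[best] > 0 else "general"


print(classify_task("Debug the parsing function"))  # code
print(classify_task("Summarize last quarter"))      # general
```

In production you would more likely replace the keyword map with an LLM call that classifies the task description directly, but a cheap heuristic like this can serve as a fast first-pass filter.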
2. Parallel Task Execution
Execute independent tasks in parallel:
```python
from concurrent.futures import ThreadPoolExecutor

class ParallelExecutor:
    """Execute multiple agents in parallel"""

    def __init__(self, max_workers: int = 5):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def execute_parallel(self, tasks: list) -> list:
        """Execute multiple tasks in parallel threads"""
        futures = [
            self.executor.submit(task["agent"].execute, task["input"])
            for task in tasks
        ]
        # Wait for all tasks to complete, preserving submission order
        return [future.result() for future in futures]
```
3. Sequential Task Pipelines
Create pipelines where output of one agent feeds into the next:
```python
from typing import Callable, Optional

class TaskPipeline:
    """Create sequential processing pipelines"""

    def __init__(self, agents: list):
        self.agents = agents
        self.pipeline = self._build_pipeline()

    def _build_pipeline(self) -> Callable:
        """Build processing pipeline"""
        def pipeline(input_data):
            result = input_data
            for agent in self.agents:
                result = agent.process(result)
            return result
        return pipeline

    def execute(self, input_data):
        """Execute the pipeline"""
        return self.pipeline(input_data)

    def add_agent(self, agent, position: Optional[int] = None):
        """Add agent to pipeline"""
        if position is None:
            self.agents.append(agent)
        else:
            self.agents.insert(position, agent)
        self.pipeline = self._build_pipeline()
```
Real-World Use Case: Research Assistant
Let's build a complete research assistant that can answer complex questions by coordinating multiple specialist agents.
System Architecture
```
                      User Query
                          │
                          ▼
        ┌─────────────────────────────────────────┐
        │            Coordinator Agent            │
        │  - Parse query                          │
        │  - Identify research needs              │
        │  - Delegate to specialists              │
        └─────────────────────────────────────────┘
                          │
     ┌──────────────┬─────┴────────┬──────────────┐
     ▼              ▼              ▼              ▼
Research Agent  Analysis Agent  Code Agent   Writing Agent
 (Web Search)  (Data Analysis)  (Generate)     (Format)
     │              │              │              │
     └──────────────┴──────┬───────┴──────────────┘
                           ▼
                  ┌─────────────────┐
                  │   Synthesize    │
                  │   and Present   │
                  └─────────────────┘
```
Implementation
```python
class ResearchAssistant:
    """Complete research assistant system"""

    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4", temperature=0)

        # Initialize specialist agents.
        # AnalysisAgent, WritingAgent, and CodeAgent are specialist classes
        # built along the same lines as ResearchAgent earlier in this guide.
        self.agents = {
            "researcher": ResearchAgent(self.llm),
            "analyst": AnalysisAgent(self.llm),
            "writer": WritingAgent(self.llm),
            "coder": CodeAgent(self.llm)
        }

        # Build workflow graph
        self.workflow = self._build_workflow()

    def _build_workflow(self):
        """Build the research assistant workflow"""

        class ResearchState(TypedDict):
            query: str
            research_plan: list
            research_data: dict
            analysis: dict
            answer: str
            sources: list
            confidence: float

        workflow = StateGraph(ResearchState)

        # Define nodes
        def plan_research(state: ResearchState) -> ResearchState:
            # Plan what research is needed
            plan = self.agents["researcher"].plan_research(state["query"])
            state["research_plan"] = plan
            return state

        def conduct_research(state: ResearchState) -> ResearchState:
            # Conduct research based on plan
            data = self.agents["researcher"].execute_research(state["research_plan"])
            state["research_data"] = data
            return state

        def analyze_data(state: ResearchState) -> ResearchState:
            # Analyze research findings
            analysis = self.agents["analyst"].analyze(state["research_data"])
            state["analysis"] = analysis
            return state

        def generate_answer(state: ResearchState) -> ResearchState:
            # Generate comprehensive answer
            answer = self.agents["writer"].write_answer(
                state["query"],
                state["research_data"],
                state["analysis"]
            )
            state["answer"] = answer
            return state

        # Add nodes to workflow
        workflow.add_node("plan", plan_research)
        workflow.add_node("research", conduct_research)
        workflow.add_node("analyze", analyze_data)
        workflow.add_node("write", generate_answer)

        # Define edges
        workflow.set_entry_point("plan")
        workflow.add_edge("plan", "research")
        workflow.add_edge("research", "analyze")
        workflow.add_edge("analyze", "write")
        workflow.add_edge("write", END)

        return workflow.compile()

    def ask(self, query: str) -> dict:
        """Ask a research question"""
        print(f"\n🔍 Research Question: {query}\n")

        # Initialize state
        initial_state = {
            "query": query,
            "research_plan": [],
            "research_data": {},
            "analysis": {},
            "answer": "",
            "sources": [],
            "confidence": 0.0
        }

        # Execute workflow
        return self.workflow.invoke(initial_state)

# Example usage
assistant = ResearchAssistant()
result = assistant.ask(
    "What are the current best practices for building scalable "
    "multi-agent systems with LangChain in 2026?"
)

print("\n📚 Research Results:")
print(f"\n{result['answer']}")
print(f"\n📊 Confidence: {result['confidence']*100}%")
print(f"\n📖 Sources: {len(result['sources'])} sources used")
```
Production Best Practices
1. Error Handling and Retry Logic
```python
from tenacity import retry, stop_after_attempt, wait_exponential
import logging

logger = logging.getLogger(__name__)

class ResilientAgent:
    """Agent with built-in resilience"""

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    def execute_with_retry(self, task: dict):
        """Execute task with automatic retry on failure"""
        try:
            return self.execute(task)
        except Exception as e:
            logger.error(f"Agent execution failed: {e}")
            raise
```
2. Rate Limiting
```python
from ratelimit import limits, sleep_and_retry

class RateLimitedAgent:
    """Agent with rate limiting"""

    @sleep_and_retry
    @limits(calls=100, period=60)  # 100 calls per minute
    def api_call(self, endpoint: str, data: dict):
        """Make rate-limited API calls"""
        # Implementation
        pass
```
3. Caching
```python
import hashlib
import json
import logging

logger = logging.getLogger(__name__)

class CachedAgent:
    """Agent with intelligent caching"""

    def __init__(self):
        self.cache = {}

    def get_cache_key(self, task: dict) -> str:
        """Generate cache key from task"""
        task_str = json.dumps(task, sort_keys=True)
        return hashlib.md5(task_str.encode()).hexdigest()

    def execute_cached(self, task: dict):
        """Execute with caching"""
        cache_key = self.get_cache_key(task)

        if cache_key in self.cache:
            logger.info("Cache hit!")
            return self.cache[cache_key]

        result = self.execute(task)
        self.cache[cache_key] = result
        return result
```
4. Monitoring and Observability
```python
from prometheus_client import Counter, Histogram
import time

# Define metrics
agent_calls = Counter('agent_calls_total', 'Total agent calls', ['agent_name', 'status'])
agent_duration = Histogram('agent_duration_seconds', 'Agent execution duration', ['agent_name'])
agent_errors = Counter('agent_errors_total', 'Total agent errors', ['agent_name', 'error_type'])

class MonitoredAgent:
    """Agent with comprehensive monitoring"""

    def execute_monitored(self, task: dict):
        """Execute with monitoring"""
        agent_name = self.__class__.__name__
        start_time = time.time()

        try:
            result = self.execute(task)
            # Record success metrics
            agent_calls.labels(agent_name=agent_name, status='success').inc()
            agent_duration.labels(agent_name=agent_name).observe(time.time() - start_time)
            return result
        except Exception as e:
            # Record error metrics
            agent_calls.labels(agent_name=agent_name, status='error').inc()
            agent_errors.labels(agent_name=agent_name, error_type=type(e).__name__).inc()
            raise
```
Performance Optimization
1. Batch Processing
```python
class BatchProcessor:
    """Process multiple tasks efficiently in batches"""

    def __init__(self, agent, batch_size: int = 10):
        self.agent = agent
        self.batch_size = batch_size

    def process_batch(self, tasks: list) -> list:
        """Process tasks in batches"""
        results = []
        for i in range(0, len(tasks), self.batch_size):
            batch = tasks[i:i + self.batch_size]
            batch_results = self._process_batch(batch)
            results.extend(batch_results)
        return results

    def _process_batch(self, batch: list) -> list:
        """Process a single batch (sequential fallback; swap in a true batch API if your model supports one)"""
        return [self.agent.execute(task) for task in batch]
```
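The slicing arithmetic in `process_batch` is worth a quick sanity check; this standalone helper mirrors it and shows that the final partial batch is kept:

```python
def split_batches(tasks: list, batch_size: int) -> list:
    """Mirror of the slicing logic in BatchProcessor.process_batch"""
    return [tasks[i:i + batch_size] for i in range(0, len(tasks), batch_size)]

print(split_batches([1, 2, 3, 4, 5], 2))  # → [[1, 2], [3, 4], [5]]
```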
2. Parallel Agent Execution
```python
from concurrent.futures import ProcessPoolExecutor

class ParallelAgentPool:
    """Execute agents in parallel processes"""

    def __init__(self, max_workers: int = 4):
        self.executor = ProcessPoolExecutor(max_workers=max_workers)

    def execute_parallel(self, agent_class, tasks: list) -> list:
        """Execute multiple agents in parallel"""
        # Note: agent instances and tasks must be picklable to cross process boundaries
        futures = [
            self.executor.submit(agent_class().execute, task)
            for task in tasks
        ]
        results = [future.result() for future in futures]
        return results
```
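`ProcessPoolExecutor` pays a pickling and process-startup cost that rarely pays off for I/O-bound LLM calls, which spend most of their time waiting on the network. A thread-based variant often works just as well for those workloads; `run_parallel` below is a sketch, not part of the LangChain API:

```python
from concurrent.futures import ThreadPoolExecutor

def run_parallel(fn, tasks: list, max_workers: int = 4) -> list:
    """Run fn once per task across a thread pool (suits I/O-bound agent calls)"""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves input order, unlike as_completed
        return list(pool.map(fn, tasks))

print(run_parallel(lambda t: t * 2, [1, 2, 3]))  # → [2, 4, 6]
```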
Monitoring and Debugging
1. LangSmith Integration
```python
import os

os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your_langsmith_api_key"
os.environ["LANGCHAIN_PROJECT"] = "multi-agent-system"

# All LangChain operations are now automatically traced
```
2. Custom Logging
```python
import json
import logging
from datetime import datetime

class AgentLogger:
    """Detailed logging for agent operations"""

    def __init__(self, agent_name: str):
        self.agent_name = agent_name
        self.logger = logging.getLogger(agent_name)

    def log_execution(self, task: dict, result: dict, duration: float):
        """Log execution details"""
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "agent": self.agent_name,
            "task": task,
            "result_summary": self._summarize_result(result),
            "duration_seconds": duration
        }
        self.logger.info(json.dumps(log_entry))

    def _summarize_result(self, result: dict) -> dict:
        """Example summarizer: truncate each value so log lines stay compact"""
        return {key: str(value)[:200] for key, value in result.items()}
```
3. Visualization
```python
from IPython.display import Image, display

def visualize_workflow(workflow):
    """Visualize the workflow graph"""
    try:
        display(Image(workflow.get_graph().draw_mermaid_png()))
    except Exception:
        print("Graph visualization not available")
```
Conclusion
Multi-agent systems represent a powerful paradigm for building sophisticated AI applications. By leveraging LangChain and LangGraph, you can create systems that:
- Divide complex problems into manageable subtasks
- Assign specialized agents to handle specific aspects
- Enable agents to collaborate and communicate
- Scale horizontally by adding more agents
- Maintain clear separation of concerns
The key to success is careful design of:
- Agent specialization - Each agent should have a clear, focused purpose
- Communication patterns - Define how agents exchange information
- State management - Maintain consistent state across agent interactions
- Error handling - Build resilient systems that can recover from failures
- Monitoring - Track agent performance and system health
Next Steps
Ready to build your own multi-agent system? Here's what to do next:
- Start simple - Begin with 2-3 agents and gradually expand
- Test thoroughly - Verify each agent works correctly before integrating
- Monitor performance - Use LangSmith to trace execution flows
- Iterate rapidly - Refine agent behaviors based on results
- Scale carefully - Add complexity only when needed
Need Help Building Your Multi-Agent System?
At Groovy Web, we specialize in building production-grade AI systems with multi-agent architectures. Whether you need a research automation platform, content generation system, or custom AI workflow, we can help.
Contact Groovy Web for a free consultation about your AI project.
Published: January 29, 2026 | Author: Groovy Web Team | Category: AI Development
Ready to Build Your Own Multi-Agent System?
At Groovy Web, we've shipped 200+ AI-first applications — including multi-agent orchestration systems running in production. If you're a founder or CTO looking to move fast without rebuilding from scratch, let's talk.
This post was originally published on groovyweb.co