Originally published at bcloud.consulting
TL;DR
After 25+ multi-agent implementations with LangGraph, these 5 patterns cover 95% of cases:
- Supervisor-Worker (90% of cases)
- Pipeline Sequential (processing)
- Collaborative Team (complex analysis)
- Hierarchical (enterprise scale)
- Market-based (resource optimization)
Why Multi-Agent > Single Agent
A single LLM trying to do everything is like a lone full-stack developer building Amazon.
It doesn't scale.
Multi-agent systems enable:
- Domain specialization
- Task parallelization
- Better accuracy (cross-validation)
- Real scalability
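To make the parallelization point concrete, here is a minimal sketch using only the standard library. The `specialist` coroutine is a hypothetical stand-in for an agent (in practice an LLM or tool call), and the delays are made-up values chosen only to show the effect.

```python
import asyncio
import time
from typing import List, Tuple

Job = Tuple[str, float]  # (agent name, simulated latency in seconds)

async def specialist(name: str, delay: float) -> str:
    """Hypothetical specialized agent; the sleep simulates I/O-bound work."""
    await asyncio.sleep(delay)
    return f"{name}: done"

async def run_sequential(jobs: List[Job]) -> List[str]:
    # One agent at a time: total latency is the sum of all delays
    return [await specialist(name, delay) for name, delay in jobs]

async def run_parallel(jobs: List[Job]) -> List[str]:
    # All agents at once: total latency is roughly the slowest delay
    return list(await asyncio.gather(*(specialist(n, d) for n, d in jobs)))

def timed(runner, jobs: List[Job]):
    """Run a coroutine function and return (results, elapsed seconds)."""
    start = time.perf_counter()
    results = asyncio.run(runner(jobs))
    return results, time.perf_counter() - start

JOBS: List[Job] = [("research", 0.05), ("analysis", 0.05), ("validation", 0.05)]
```

Calling `timed(run_sequential, JOBS)` and `timed(run_parallel, JOBS)` returns the same results, with the parallel version finishing in roughly a third of the time for these three equal delays.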
Pattern #1: Supervisor-Worker (90% of cases)
The workhorse. Simple, effective, scalable.
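from langgraph.graph import StateGraph, END
from typing import Any, Dict, List, TypedDict
import asyncio


class WorkflowState(TypedDict, total=False):
    # Minimal state schema (assumed; WorkflowState is not shown in the original)
    task: str
    assignments: Dict[str, List[Dict]]


class SupervisorWorkerSystem:
    """
    The supervisor decomposes, assigns, and synthesizes.
    Workers execute specialized tasks.
    """

    def __init__(self):
        self.supervisor = SupervisorAgent()
        # Worker classes are assumed to be defined elsewhere
        self.workers = {
            'research': ResearchWorker(),
            'analysis': AnalysisWorker(),
            'writing': WritingWorker(),
            'validation': ValidationWorker()
        }
        self.setup_graph()

    def setup_graph(self):
        self.graph = StateGraph(WorkflowState)

        # Add nodes
        self.graph.add_node("supervisor", self.supervisor_node)
        for name, worker in self.workers.items():
            self.graph.add_node(name, worker.process)

        # Define edges; the graph needs an explicit entry point to compile
        self.graph.set_entry_point("supervisor")
        self.graph.add_edge("supervisor", "research")
        self.graph.add_edge("supervisor", "analysis")
        self.graph.add_edge("research", "writing")
        self.graph.add_edge("analysis", "writing")
        self.graph.add_edge("writing", "validation")
        self.graph.add_edge("validation", END)

        self.app = self.graph.compile()

    async def supervisor_node(self, state: WorkflowState):
        # Decompose task
        subtasks = self.supervisor.decompose_task(state['task'])

        # Assign to workers; keep a list per worker so that two subtasks
        # routed to the same worker do not overwrite each other
        assignments: Dict[str, List[Dict]] = {}
        for subtask in subtasks:
            best_worker = self.supervisor.select_worker(subtask)
            assignments.setdefault(best_worker, []).append(subtask)

        state['assignments'] = assignments
        return state

    async def process(self, task: str) -> Dict[str, Any]:
        initial_state = WorkflowState(task=task)
        result = await self.app.ainvoke(initial_state)
        return result


class SupervisorAgent:
    def __init__(self):
        # Maps subtask type -> worker name (assumed; not shown in the original)
        self.worker_mapping = {
            'research': 'research',
            'analysis': 'analysis',
            'writing': 'writing',
            'validation': 'validation'
        }

    def decompose_task(self, task: str) -> List[Dict]:
        prompt = f"""
        Break this task down into specific subtasks:
        {task}
        Return a list of subtasks with type and description.
        """
        # LLM decomposition logic (elided in the original): send `prompt` to
        # the model and parse the reply into [{'type': ..., 'description': ...}]
        raise NotImplementedError("plug in the LLM call here")

    def select_worker(self, subtask: Dict) -> str:
        # Pick the best worker for the subtask by its type.
        # Note: no 'general' worker is registered above, so the default
        # only works once such a node is added to the graph.
        task_type = subtask['type']
        return self.worker_mapping.get(task_type, 'general')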
Real use case: Customer Service Automation
- 5k tickets/day
- 4 specialized workers
- 87% automatic resolution
- 30-second average response time
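The decompose, assign, and execute flow can also be sketched without LangGraph, which makes the control flow easier to see. Everything here is hypothetical: the two workers, the rule-based `decompose` function (a real supervisor would ask an LLM), and the shape of the returned dictionary.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

Worker = Callable[[str], Awaitable[str]]

async def research(description: str) -> str:
    # Hypothetical worker; a real one would call an LLM or a tool
    return f"[research] {description}"

async def writing(description: str) -> str:
    return f"[writing] {description}"

WORKERS: Dict[str, Worker] = {"research": research, "writing": writing}

def decompose(task: str) -> List[Dict[str, str]]:
    """Rule-based stand-in for the supervisor's LLM decomposition step."""
    return [
        {"type": "research", "description": f"gather sources for {task}"},
        {"type": "writing", "description": f"draft a summary of {task}"},
    ]

async def supervise(task: str) -> Dict[str, List[str]]:
    # Group subtasks per worker so none is overwritten
    assignments: Dict[str, List[str]] = {}
    for subtask in decompose(task):
        assignments.setdefault(subtask["type"], []).append(subtask["description"])

    async def run_worker(name: str, descriptions: List[str]):
        # Each worker handles its own queue in order
        return name, [await WORKERS[name](d) for d in descriptions]

    # Workers run concurrently with each other
    pairs = await asyncio.gather(
        *(run_worker(name, descs) for name, descs in assignments.items())
    )
    return dict(pairs)
```

`asyncio.run(supervise("pricing"))` returns one list of outputs per worker, which a synthesis step could then merge into a final answer.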
Pattern #2: Pipeline Sequential
Ideal for step-by-step processing where each stage's output is the next stage's input.
from datetime import datetime
from typing import Dict


class PipelineOrchestrator:
    """
    Sequential processing with a transformation at each step.
    """

    def __init__(self):
        # Agent classes are assumed to be defined elsewhere
        self.pipeline = [
            DocumentExtractor(),     # PDF → Text
            ContentClassifier(),     # Classify document type
            InformationExtractor(),  # Extract key info
            DataValidator(),         # Validate extracted data
            DatabaseWriter()         # Store results
        ]

    async def process(self, document: bytes) -> Dict:
        result = {'document': document, 'metadata': {}}

        for i, agent in enumerate(self.pipeline):
            try:
                print(f"Step {i+1}/{len(self.pipeline)}: {agent.__class__.__name__}")
                result = await agent.process(result)

                # Validation checkpoint. result is a dict, so look up the key;
                # hasattr() checks attributes and would never find it
                if result.get('error'):
                    return await self.handle_pipeline_error(result, agent)

                # Store intermediate results
                result['metadata'][f'step_{i}'] = {
                    'agent': agent.__class__.__name__,
                    'timestamp': datetime.now().isoformat(),
                    'success': True
                }
            except Exception as e:
                # handle_exception is a helper not shown here
                return await self.handle_exception(e, agent, result)

        return result

    async def handle_pipeline_error(self, result, failed_agent):
        # Retry logic or alternative path
        # (can_retry and fallback_processing are helpers not shown here)
        if self.can_retry(failed_agent):
            return await failed_agent.retry(result)
        else:
            return await self.fallback_processing(result)
Real case: Legal Document Processing
- 10k documents/day
- 5 agents in the pipeline
- 96% accuracy
- 5 min → 30 s per document
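The "retry or take an alternative path" step of a pipeline can be sketched as a small wrapper around any stage. This is one possible policy (exponential backoff, then a fallback), not necessarily the one used in the systems above; `run_with_retry`, `make_flaky`, and `fallback` are hypothetical names introduced for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Payload = Dict[str, Any]
Step = Callable[[Payload], Awaitable[Payload]]

async def run_with_retry(
    step: Step,
    payload: Payload,
    fallback: Step,
    max_attempts: int = 3,
    base_delay: float = 0.01,
) -> Payload:
    """Try a stage up to max_attempts times, then hand off to the fallback."""
    for attempt in range(max_attempts):
        try:
            return await step(payload)
        except Exception:
            # Back off 1x, 2x, 4x... the base delay before the next attempt
            if attempt < max_attempts - 1:
                await asyncio.sleep(base_delay * 2 ** attempt)
    return await fallback(payload)

def make_flaky(failures: int) -> Step:
    """Build a demo stage that raises `failures` times and then succeeds."""
    remaining = {"n": failures}

    async def step(payload: Payload) -> Payload:
        if remaining["n"] > 0:
            remaining["n"] -= 1
            raise RuntimeError("transient failure")
        return {**payload, "status": "ok"}

    return step

async def fallback(payload: Payload) -> Payload:
    # Alternative path, e.g. route the document to manual review
    return {**payload, "status": "fallback"}
```

A stage that fails twice still succeeds within three attempts, while one that keeps failing ends up on the fallback path instead of aborting the whole pipeline.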
Pattern #3: Collaborative Team
Agents debate and reach consensus. Ideal for complex decisions.
import asyncio
from typing import Dict


class CollaborativeTeamSystem:
    """
    Multiple agents collaborate, debate, and reach consensus.
    """

    def __init__(self):
        # Agent classes are assumed to be defined elsewhere
        self.team = {
            'optimist': OptimistAgent(),
            'pessimist': PessimistAgent(),
            'realist': RealistAgent(),
            'analyst': AnalystAgent()
        }
        self.moderator = ModeratorAgent()

    async def deliberate(self, problem: str, max_rounds: int = 3):
        context = {'problem': problem, 'proposals': {}, 'round': 0}

        # round_num avoids shadowing the built-in round()
        for round_num in range(max_rounds):
            context['round'] = round_num

            # Each agent proposes a solution
            proposals = await self.gather_proposals(context)

            # Agents critique each other
            critiques = await self.cross_evaluate(proposals)

            # Check for consensus
            # (has_consensus and finalize_solution are helpers not shown here)
            if self.has_consensus(proposals, critiques):
                return await self.finalize_solution(proposals)

            # Moderator synthesizes feedback
            feedback = await self.moderator.synthesize_feedback(
                proposals,
                critiques
            )

            # Agents revise based on feedback in the next round
            context['proposals'] = proposals
            context['feedback'] = feedback

        # No consensus - moderator decides
        return await self.moderator.make_final_decision(context)

    async def gather_proposals(self, context: Dict) -> Dict:
        # Run all agents concurrently; awaiting the coroutines one by one
        # would execute them sequentially
        names = list(self.team)
        results = await asyncio.gather(
            *(self.team[name].propose_solution(context) for name in names)
        )
        return dict(zip(names, results))

    async def cross_evaluate(self, proposals: Dict) -> Dict:
        critiques = {}
        for evaluator_name, evaluator in self.team.items():
            critiques[evaluator_name] = {}
            for proposal_name, proposal in proposals.items():
                # Agents do not critique their own proposal
                if evaluator_name != proposal_name:
                    critique = await evaluator.critique(proposal)
                    critiques[evaluator_name][proposal_name] = critique
        return critiques
Use case: Investment Analysis Platform
- 4 specialized agents
- 3 debate rounds on average
- 73% prediction accuracy (vs 52% for a single model)
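The consensus check is the piece that decides when the debate stops, and its body is not shown in the pattern code. One possible rule, sketched here under the assumption that each critique carries a numeric `score` between 0 and 1, is to accept the proposal with the highest average peer score once that average clears a threshold. The function name, the critique shape, and the 0.75 threshold are all illustrative.

```python
from statistics import mean
from typing import Dict, List, Optional

# critiques[evaluator][proposal_author] = {"score": float in [0, 1]}  (assumed shape)
Critiques = Dict[str, Dict[str, Dict[str, float]]]

def consensus_winner(critiques: Critiques, threshold: float = 0.75) -> Optional[str]:
    """Return the best-rated proposal's author, or None if no consensus yet."""
    received: Dict[str, List[float]] = {}
    for given in critiques.values():
        for author, critique in given.items():
            # Collect every score a proposal received from its peers
            received.setdefault(author, []).append(critique["score"])

    if not received:
        return None

    best = max(received, key=lambda author: mean(received[author]))
    return best if mean(received[best]) >= threshold else None
```

Returning `None` keeps the debate going for another round; a stricter threshold means more rounds and more moderator decisions, so it is a cost and quality trade-off worth tuning per use case.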
Pattern #4: Hierarchical
An organizational structure for enterprise scale.
import asyncio
from typing import Dict


class HierarchicalOrchestrator:
    """
    Company-style hierarchy: CEO → VPs → Managers → Workers
    """

    def __init__(self):
        self.hierarchy = self.build_hierarchy()

    def build_hierarchy(self):
        # Agent classes are assumed to be defined elsewhere
        return {
            'ceo': CEOAgent(),
            'vps': {
                'engineering': VPEngineeringAgent(),
                'sales': VPSalesAgent(),
                'support': VPSupportAgent()
            },
            'managers': {
                'engineering': {
                    'backend': BackendManagerAgent(),
                    'frontend': FrontendManagerAgent(),
                    'devops': DevOpsManagerAgent()
                },
                'sales': {
                    'enterprise': EnterpriseSalesManager(),
                    'smb': SMBSalesManager()
                },
                'support': {
                    'technical': TechSupportManager(),
                    'customer': CustomerSuccessManager()
                }
            },
            'workers': {
                # 50+ worker agents
            }
        }

    async def process_request(self, request: Dict) -> Dict:
        # CEO level decision
        strategy = await self.hierarchy['ceo'].strategize(request)

        # VP level planning
        department = strategy['primary_department']
        vp = self.hierarchy['vps'][department]
        plan = await vp.create_execution_plan(strategy)

        # Manager level coordination: build one coroutine per manager
        tasks = []
        for manager_type, subtasks in plan['assignments'].items():
            manager = self.hierarchy['managers'][department][manager_type]
            task = manager.coordinate_execution(subtasks)
            tasks.append(task)

        # Worker level execution: the managers' coroutines run concurrently
        results = await asyncio.gather(*tasks)

        # Roll up results through the hierarchy
        # (rollup_results is a helper not shown here)
        return await self.rollup_results(results, strategy)
Real implementation: Supply Chain Optimization
- Fortune 500 company
- 50+ coordinated agents
- 23% cost reduction
- 40% improvement in delivery times
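Delegating down the hierarchy and rolling results back up can be modeled as a recursive tree walk. The sketch below is a simplification under stated assumptions: every node is the same hypothetical `Node` class, leaves stand in for workers, and the rollup is just a count of completed work items rather than a real synthesis step.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Dict, List

@dataclass
class Node:
    """One position in the hierarchy; a node with no children is a worker."""
    name: str
    children: List["Node"] = field(default_factory=list)

    async def handle(self, request: str) -> Dict:
        if not self.children:
            # Leaf: do the actual work (placeholder for an LLM or tool call)
            return {"by": self.name, "completed": 1}

        # Delegate to all direct reports concurrently
        results = await asyncio.gather(*(c.handle(request) for c in self.children))

        # Roll up: each level summarizes what its subtree produced
        return {"by": self.name, "completed": sum(r["completed"] for r in results)}

ORG = Node("ceo", [
    Node("vp_engineering", [
        Node("backend", [Node("worker_1"), Node("worker_2")]),
        Node("frontend", [Node("worker_3")]),
    ]),
    Node("vp_sales", [Node("smb")]),
])
```

`asyncio.run(ORG.handle("optimize delivery routes"))` walks the whole tree; each level only talks to its direct reports, which is what keeps prompts and context small as the agent count grows.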
Pattern #5: Market-based
Agents compete and cooperate according to market economics.
from typing import Dict, List


class MarketBasedSystem:
    """
    Agents bid for tasks based on capability and cost.
    """

    def __init__(self):
        # Agent classes are assumed to be defined elsewhere
        self.agents = [
            SpecialistAgent("legal", cost_per_task=10),
            SpecialistAgent("technical", cost_per_task=8),
            SpecialistAgent("financial", cost_per_task=12),
            GeneralistAgent(cost_per_task=5)
        ]
        self.budget = 1000

    async def process_task_batch(self, tasks: List[Dict]) -> List[Dict]:
        results = []
        for task in tasks:
            # Agents bid for the task
            bids = await self.collect_bids(task)

            # Select winner (best capability/cost ratio)
            # (select_winning_bid and fallback_processing are helpers not shown here)
            winner = self.select_winning_bid(bids)

            if winner and self.budget >= winner['cost']:
                # Execute task
                result = await winner['agent'].execute(task)
                results.append(result)

                # Pay agent
                self.budget -= winner['cost']
                winner['agent'].add_credits(winner['cost'])
            else:
                # No budget or no suitable agent
                results.append(self.fallback_processing(task))
        return results

    async def collect_bids(self, task: Dict) -> List[Dict]:
        bids = []
        for agent in self.agents:
            capability = await agent.assess_capability(task)
            if capability > 0.5:  # Minimum threshold
                cost = agent.calculate_cost(task)
                score = capability / cost  # Efficiency score
                bids.append({
                    'agent': agent,
                    'capability': capability,
                    'cost': cost,
                    'score': score
                })
        # Best efficiency score first
        return sorted(bids, key=lambda x: x['score'], reverse=True)
Advantage: resources self-optimize based on performance.
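The winner-selection step is where the market rule actually lives, and its body is not shown in the pattern code. A minimal sketch, assuming bids are dictionaries with `capability` and `cost` keys as in `collect_bids`, picks the affordable bid with the best capability per unit of cost. Unlike the call in the class above, this hypothetical version takes the remaining budget as a parameter so that an unaffordable top bid does not block a cheaper one.

```python
from typing import Dict, List, Optional

def select_winning_bid(bids: List[Dict], budget: float) -> Optional[Dict]:
    """Pick the affordable bid with the highest capability/cost ratio."""
    # Drop bids the remaining budget cannot pay for
    affordable = [bid for bid in bids if bid["cost"] <= budget]
    if not affordable:
        return None
    # Efficiency score: capability bought per unit of cost
    return max(affordable, key=lambda bid: bid["capability"] / bid["cost"])

# Illustrative bids; agent names stand in for agent objects
BIDS = [
    {"agent": "legal", "capability": 0.9, "cost": 10},
    {"agent": "technical", "capability": 0.8, "cost": 8},
    {"agent": "generalist", "capability": 0.6, "cost": 5},
]
```

Note that a pure ratio favors cheap generalists (here the generalist scores 0.12 against 0.09 for the legal specialist), so tasks that need high accuracy may want a higher capability threshold before bids are compared.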
Real Production Metrics
| Metric | Supervisor-Worker | Pipeline | Collaborative | Hierarchical | Market |
|---|---|---|---|---|---|
| Setup complexity | Low | Low | Medium | High | Medium |
| Scalability | High | Medium | Medium | Very High | High |
| Typical accuracy | 90-95% | 85-95% | 88-96% | 85-92% | 87-93% |
| Best for | General | Processing | Analysis | Enterprise | Optimization |
| Avg ROI (12mo) | 250% | 300% | 230% | 350% | 270% |
Recommended Stack
# requirements.txt
langgraph==0.0.26
langchain==0.1.0
redis==5.0.1
celery==5.3.4
fastapi==0.109.0
prometheus-client==0.19.0
Conclusions
→ There is no silver bullet - Choose the pattern that fits the case
→ Start simple - Supervisor-Worker for the MVP
→ Specialization is key - Focused agents > general ones
→ State management is critical - Redis or similar
→ Monitor everything - Trace every decision
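Tracing decisions does not need heavy tooling to get started. As a rough sketch, a decorator can record which agent made which decision, with what input and output, and how long it took. The in-memory `TRACE` list is a stand-in for whatever store is actually used (Redis, a metrics backend, log files), and `traced` and `select_worker` are hypothetical names.

```python
import functools
import time
from typing import Any, Callable, Dict, List

# In-memory trace store; a stand-in for Redis or a metrics backend
TRACE: List[Dict[str, Any]] = []

def traced(agent_name: str) -> Callable:
    """Record every call to the decorated decision function."""
    def decorator(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            start = time.perf_counter()
            result = fn(*args, **kwargs)
            TRACE.append({
                "agent": agent_name,
                "decision": fn.__name__,
                "input": {"args": args, "kwargs": kwargs},
                "output": result,
                "duration_s": time.perf_counter() - start,
            })
            return result
        return wrapper
    return decorator

@traced("supervisor")
def select_worker(task_type: str) -> str:
    # Hypothetical routing decision worth auditing later
    routing = {"research": "research", "draft": "writing"}
    return routing.get(task_type, "general")
```

After a few calls, `TRACE` holds one record per decision, which is enough to answer "why did this ticket go to that worker?" when debugging a production run.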
Full Resources
Technical guide with:
- Complete implementations of the 5 patterns
- Detailed case studies
- Deployment patterns
- Monitoring setup
Which pattern are you using? Share your experience 👇