A single AI agent with a few tools can handle simple tasks. But real-world workflows — process a customer refund, generate a research report, onboard a new user — involve multiple steps, conditional logic, parallel execution, error recovery, and human approvals.
That's orchestration: the layer that coordinates what your agent does, in what order, and what happens when things go wrong. Without it, your agent is a talented freelancer with no project management. With it, your agent becomes a reliable workflow engine.
This guide covers three approaches to agent orchestration: LangGraph (graph-based), Temporal (durable workflows), and custom orchestrators — with trade-offs and code for each.
## Why Orchestration Matters
Consider a simple task: "Process a refund for order #12345."
Without orchestration, your agent calls tools in whatever order the LLM decides. Sometimes it works. Sometimes it processes the refund before checking eligibility. Sometimes it retries a failed API call 47 times. Sometimes it forgets to send the confirmation email.
With orchestration:
```
verify_identity → check_eligibility → [approve if > $100] → process_refund → send_confirmation
      │                  │                    │                   │
      └─ retry 2x,       └─ if ineligible,    └─ timeout 5 min,   └─ retry 3x,
         then escalate      explain why          then escalate       then log error
```
Every step has defined behavior, error handling, and transitions. The workflow is testable, observable, and predictable.
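The refund flow above can be sketched without any framework: each step is a plain async function, and a tiny driver owns ordering and retries. Everything here (the step names, the `run_with_retry` helper) is illustrative, not a real API:

```python
# Hypothetical sketch of the refund pipeline: the orchestrator, not the
# LLM, decides step order and retry behavior.
import asyncio

async def run_with_retry(fn, *args, attempts=3):
    """Run one step, retrying on failure up to `attempts` times."""
    last_err = None
    for _ in range(attempts):
        try:
            return await fn(*args)
        except Exception as e:
            last_err = e
    raise last_err

async def process_refund_workflow(order_id, steps):
    """Run named steps in a fixed order, recording each transition."""
    log = []
    for name, fn, attempts in steps:
        log.append(f"start:{name}")
        await run_with_retry(fn, order_id, attempts=attempts)
        log.append(f"done:{name}")
    return log
```

The transition log is what makes the workflow observable: you can assert on it in tests and ship it to your logging pipeline in production.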
## Approach 1: LangGraph — Graph-Based Orchestration
LangGraph (from LangChain) models your agent as a **state machine graph**. Nodes are steps, edges are transitions, and state flows through the graph.
### Core Concepts
- **State** — A typed dictionary that accumulates data through the graph
- **Nodes** — Functions that read state, do work, and return updated state
- **Edges** — Transitions between nodes (conditional or unconditional)
- **Checkpointing** — Save state at each step for recovery and debugging
### Example: Customer Support Agent
```python
import operator
from typing import Annotated, Literal, TypedDict

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver

class SupportState(TypedDict):
    messages: Annotated[list, operator.add]  # Conversation history (appended, not replaced)
    intent: str               # Classified intent
    customer_id: str | None   # Authenticated customer
    order: dict | None        # Looked-up order
    resolution: str | None    # How we resolved the issue
    should_escalate: bool

# Node functions return partial state updates.
# (llm, lookup_customer, get_order, create_support_ticket are assumed helpers.)
async def classify_intent(state: SupportState) -> dict:
    intent = await llm.classify(state["messages"][-1])
    return {"intent": intent}

async def authenticate(state: SupportState) -> dict:
    customer = await lookup_customer(state["messages"])
    return {"customer_id": customer["id"] if customer else None}

async def lookup_order(state: SupportState) -> dict:
    order = await get_order(state["customer_id"], state["messages"])
    return {"order": order}

async def generate_response(state: SupportState) -> dict:
    response = await llm.respond(state)
    return {"messages": [response], "resolution": "resolved"}

async def escalate(state: SupportState) -> dict:
    ticket = await create_support_ticket(state)
    return {"resolution": f"Escalated: {ticket['id']}"}

# Conditional edges
def route_after_classify(state: SupportState) -> Literal["authenticate", "respond", "escalate"]:
    if state["intent"] in ["order_status", "refund"]:
        return "authenticate"
    if state["intent"] == "general_question":
        return "respond"
    return "escalate"

def route_after_auth(state: SupportState) -> Literal["lookup_order", "escalate"]:
    if state["customer_id"]:
        return "lookup_order"
    return "escalate"

# Build the graph
graph = StateGraph(SupportState)
graph.add_node("classify", classify_intent)
graph.add_node("authenticate", authenticate)
graph.add_node("lookup_order", lookup_order)
graph.add_node("respond", generate_response)
graph.add_node("escalate", escalate)

graph.set_entry_point("classify")
graph.add_conditional_edges("classify", route_after_classify)
graph.add_conditional_edges("authenticate", route_after_auth)
graph.add_edge("lookup_order", "respond")
graph.add_edge("respond", END)
graph.add_edge("escalate", END)

# Compile with checkpointing
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")
app = graph.compile(checkpointer=checkpointer)

# Run
result = await app.ainvoke(
    {"messages": ["I want a refund for order #12345"], "should_escalate": False},
    config={"configurable": {"thread_id": "conv-123"}},
)
```
**Tip:** LangGraph's checkpointing is its killer feature. Every state transition is saved, so you can resume interrupted workflows, replay for debugging, and implement human-in-the-loop by pausing at any node.
### Human-in-the-Loop with LangGraph
```python
from langgraph.errors import NodeInterrupt

# Approval node: pause the graph before processing large refunds
async def request_human_approval(state: SupportState) -> SupportState:
    if state["order"]["amount"] > 10_000:  # amount in cents, i.e. > $100
        # This pauses the graph until a human responds
        raise NodeInterrupt("Refund > $100 needs approval")
    return state

# Add the interrupt point before processing refunds
graph.add_node("request_approval", request_human_approval)

# Resume after human approves:
await app.ainvoke(
    None,  # No new input, just resume
    config={"configurable": {"thread_id": "conv-123"}},
)
```
### LangGraph Trade-offs
| Pros | Cons |
| --- | --- |
| Visual graph structure | LangChain ecosystem lock-in |
| Built-in checkpointing | Learning curve for graph concepts |
| Human-in-the-loop native | Debugging complex graphs is hard |
| Streaming support | Overhead for simple workflows |
| LangGraph Studio for visualization | State management gets complex |
## Approach 2: Temporal — Durable Workflow Orchestration
Temporal is an enterprise workflow engine originally designed for microservices. It's overkill for simple agents but perfect for **long-running, mission-critical workflows** that must never lose state.
### Why Temporal for Agents?
- **Durable execution** — If your server crashes mid-workflow, Temporal resumes exactly where it left off
- **Built-in retries** — Configure retry policies per activity (exponential backoff, max attempts)
- **Timeouts** — Activity timeouts, workflow timeouts, heartbeat timeouts
- **Versioning** — Deploy new workflow versions without breaking running workflows
- **Visibility** — Built-in UI shows every workflow, its state, and history
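The retry policy Temporal configures per activity (initial interval, backoff coefficient, maximum attempts) can be illustrated with a small stdlib-only sketch. This mirrors the shape of the policy, not Temporal's actual implementation:

```python
# Illustrative exponential-backoff retry, shaped like Temporal's
# per-activity RetryPolicy. The `sleep` parameter is injectable so the
# backoff schedule can be tested without real waiting.
import asyncio

async def retry_with_backoff(fn, *, initial_interval=1.0,
                             backoff_coefficient=2.0, maximum_attempts=3,
                             sleep=asyncio.sleep):
    """Retry `fn`, doubling (by default) the wait between attempts."""
    delay = initial_interval
    for attempt in range(1, maximum_attempts + 1):
        try:
            return await fn()
        except Exception:
            if attempt == maximum_attempts:
                raise  # out of attempts: propagate the last error
            await sleep(delay)
            delay *= backoff_coefficient
```

With Temporal you get this behavior declaratively per activity; the point of the sketch is just to make the semantics concrete.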
### Example: Research Report Agent
```python
import asyncio
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.common import RetryPolicy

# (web_search_tool, scraper, and llm are assumed helpers.)
@activity.defn
async def search_web(query: str) -> list[str]:
    """Search the web and return relevant URLs."""
    return await web_search_tool.search(query, top_k=10)

@activity.defn
async def scrape_page(url: str) -> str:
    """Scrape and extract content from a URL."""
    return await scraper.extract(url)

@activity.defn
async def analyze_content(content: str, question: str) -> dict:
    """Use LLM to analyze scraped content."""
    return await llm.analyze(content, question)

@activity.defn
async def write_report(findings: list[dict], topic: str) -> str:
    """Generate final research report."""
    return await llm.generate_report(findings, topic)

@workflow.defn
class ResearchWorkflow:
    @workflow.run
    async def run(self, topic: str) -> str:
        # Step 1: Search for sources (with retry)
        urls = await workflow.execute_activity(
            search_web,
            topic,
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=3),
        )

        # Step 2: Scrape pages in parallel
        scrape_tasks = [
            workflow.execute_activity(
                scrape_page,
                url,
                start_to_close_timeout=timedelta(seconds=60),
                retry_policy=RetryPolicy(maximum_attempts=2),
            )
            for url in urls[:5]  # Top 5 results
        ]
        contents = await asyncio.gather(*scrape_tasks, return_exceptions=True)
        contents = [c for c in contents if isinstance(c, str)]

        # Step 3: Analyze each source
        findings = []
        for content in contents:
            finding = await workflow.execute_activity(
                analyze_content,
                args=[content, topic],
                start_to_close_timeout=timedelta(seconds=120),
            )
            findings.append(finding)

        # Step 4: Write final report
        report = await workflow.execute_activity(
            write_report,
            args=[findings, topic],
            start_to_close_timeout=timedelta(seconds=180),
        )
        return report
```
### Temporal Trade-offs
| Pros | Cons |
| --- | --- |
| Battle-tested durability (used by Uber, Netflix) | Heavy infrastructure (Temporal server + DB) |
| Survives crashes, deploys, outages | Steep learning curve |
| Built-in retry, timeout, versioning | Overkill for simple agents |
| Great visibility UI | Operational complexity |
| Language-agnostic (Python, Go, Java, TS) | Adds 50-100ms latency per activity |
## Approach 3: Custom Orchestrator
Sometimes you don't need a framework. A custom orchestrator gives you full control with minimal dependencies.
```python
import asyncio
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable

class StepStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"

@dataclass
class Step:
    name: str
    fn: Callable
    depends_on: list[str] = field(default_factory=list)
    retry_count: int = 2
    timeout_seconds: int = 60
    condition: Callable | None = None  # Skip if returns False
    status: StepStatus = StepStatus.PENDING
    result: Any = None
    error: str | None = None

class Orchestrator:
    def __init__(self):
        self.steps: dict[str, Step] = {}
        self.context: dict = {}

    def add_step(self, step: Step):
        self.steps[step.name] = step

    async def run(self) -> dict:
        while self._has_pending_steps():
            # Find steps ready to run (dependencies met)
            ready = [s for s in self.steps.values()
                     if s.status == StepStatus.PENDING
                     and self._dependencies_met(s)]
            if not ready:
                break  # Deadlock or all done
            # Run ready steps in parallel
            tasks = [self._execute_step(s) for s in ready]
            await asyncio.gather(*tasks)
        return self.context

    async def _execute_step(self, step: Step):
        # Check condition
        if step.condition and not step.condition(self.context):
            step.status = StepStatus.SKIPPED
            return
        step.status = StepStatus.RUNNING
        for attempt in range(step.retry_count + 1):
            try:
                result = await asyncio.wait_for(
                    step.fn(self.context),
                    timeout=step.timeout_seconds
                )
                step.result = result
                step.status = StepStatus.COMPLETED
                self.context[step.name] = result
                return
            except asyncio.TimeoutError:
                step.error = f"Timeout after {step.timeout_seconds}s"
            except Exception as e:
                step.error = str(e)
        # All attempts exhausted
        step.status = StepStatus.FAILED

    def _dependencies_met(self, step: Step) -> bool:
        return all(
            self.steps[dep].status == StepStatus.COMPLETED
            for dep in step.depends_on
        )

    def _has_pending_steps(self) -> bool:
        return any(s.status == StepStatus.PENDING for s in self.steps.values())

# Usage
orch = Orchestrator()
orch.add_step(Step("classify", classify_intent))
orch.add_step(Step("authenticate", authenticate, depends_on=["classify"],
                   condition=lambda ctx: ctx["classify"]["requires_auth"]))
orch.add_step(Step("lookup", lookup_order, depends_on=["authenticate"]))
orch.add_step(Step("respond", generate_response, depends_on=["lookup"]))
result = await orch.run()
```
### Custom Orchestrator Trade-offs
| Pros | Cons |
| --- | --- |
| Full control, zero dependencies | You build everything yourself |
| Minimal overhead | No built-in persistence/recovery |
| Easy to understand and debug | Parallel execution logic is tricky |
| Fits any pattern | Grows complex over time |
## Orchestration Patterns
### Pattern 1: Sequential Pipeline
Steps run one after another. Output of step N is input to step N+1.
**Use when:** Each step depends on the previous result. Example: classify → retrieve → generate → validate.
### Pattern 2: Fan-Out / Fan-In
One step spawns multiple parallel tasks, then a final step aggregates results.
**Use when:** Independent subtasks can run simultaneously. Example: search 5 sources in parallel → merge findings.
```python
# Fan-out / fan-in with LangGraph (sketch; search_and_analyze and
# synthesize are assumed helpers)
from langgraph.graph import StateGraph

async def fan_out(state):
    """Create parallel research tasks."""
    tasks = [{"query": q} for q in state["queries"]]
    return {"parallel_tasks": tasks}

async def research(state):
    """Run research for one query."""
    return {"finding": await search_and_analyze(state["query"])}

async def fan_in(state):
    """Merge all findings into a report."""
    return {"report": await synthesize(state["findings"])}
```
### Pattern 3: Conditional Branching
Different paths based on runtime conditions.
**Use when:** The workflow varies by input type. Example: refund requests go through approval, general questions go straight to response.
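At its core, conditional branching is just a routing function from a classified input to the next step. A minimal sketch, with hypothetical intent and step names:

```python
# Illustrative routing table: the intent labels and step names are
# made up for this example.
def route(intent: str) -> str:
    """Pick the next workflow step from a classified intent."""
    routes = {
        "refund": "approval",
        "order_status": "lookup_order",
        "general_question": "respond",
    }
    return routes.get(intent, "escalate")  # unknown intents escalate
```

Keeping the table explicit (rather than burying branches in if/else chains) makes the routing easy to audit and extend.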
### Pattern 4: Loop with Exit Condition
Repeat a sequence until a condition is met.
**Use when:** Iterative refinement is needed. Example: generate → evaluate → if score < threshold, regenerate.
```python
# Loop pattern in LangGraph
from typing import Literal

def should_continue(state) -> Literal["regenerate", "finalize"]:
    if state["quality_score"] >= 0.8:
        return "finalize"
    if state["attempts"] >= 3:
        return "finalize"  # Give up after 3 tries
    return "regenerate"

graph.add_conditional_edges("evaluate", should_continue)
```
### Pattern 5: Saga (Compensating Transactions)
When a later step fails, undo earlier steps.
**Use when:** Multi-step operations that should be atomic. Example: reserve inventory → charge card → if shipping fails → refund card → release inventory.
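The compensation logic can be sketched in a few lines: each step carries an undo action, and completed steps are rolled back in reverse order when a later one fails. Step names here are illustrative:

```python
# Illustrative saga runner: `steps` is a list of (action, compensate)
# pairs of async callables.
import asyncio

async def run_saga(steps):
    """Run steps in order; on failure, undo completed steps in reverse."""
    completed = []
    try:
        for action, compensate in steps:
            await action()
            completed.append(compensate)
    except Exception:
        for compensate in reversed(completed):
            await compensate()  # roll back in reverse order
        raise  # re-raise so the caller sees the original failure
```

Reverse order matters: you release the inventory reservation only after the card charge has been refunded, mirroring how the steps were applied.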
## Choosing Your Orchestration Approach
| Scenario | Best Approach | Why |
| --- | --- | --- |
| Simple agent (3-5 steps) | Custom or just ReAct | Frameworks add unnecessary complexity |
| Complex but short-lived (< 5 min) | LangGraph | Good graph model, checkpointing, HITL |
| Long-running (hours/days) | Temporal | Durable execution survives crashes |
| Human approval workflows | LangGraph or Temporal | Both have native interrupt/signal support |
| Mission-critical / financial | Temporal | Battle-tested, audit trail, exactly-once |
| Maximum flexibility | Custom | No framework constraints |
| Team already uses LangChain | LangGraph | Ecosystem integration |
## Common Orchestration Mistakes
### 1. Over-Orchestrating Simple Agents
If your agent has 3 steps and no branching, you don't need LangGraph or Temporal. A simple while loop with tool calling is fine. Add orchestration when complexity justifies it.
### 2. No Error Boundaries
A failure in step 4 shouldn't crash the entire workflow. Each step needs its own error handling: retry policy, fallback behavior, and graceful degradation.
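One way to sketch such a boundary is a wrapper that swaps a failure for a fallback value, so the workflow degrades instead of crashing (the helper name is made up for illustration):

```python
# Illustrative per-step error boundary: failures yield a fallback
# instead of propagating and killing the whole workflow.
import asyncio

async def guarded(fn, *args, fallback=None):
    """Run a step; on any failure, return `fallback` instead of raising."""
    try:
        return await fn(*args)
    except Exception:
        return fallback
```

A recommendation step, say, could fall back to a generic response while the critical path continues. Reserve hard failures for steps where no safe fallback exists.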
### 3. Missing Timeouts
LLM calls can hang. Tool calls can hang. Without timeouts, your workflow hangs forever. Set timeouts on every async operation: 30s for LLM calls, 60s for tool calls, 5 minutes for the full workflow.
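A stdlib sketch of that guard, built on `asyncio.wait_for` (the helper name and default are illustrative):

```python
# Illustrative timeout guard: wrap every awaited call so a hung LLM or
# tool call fails fast instead of stalling the workflow forever.
import asyncio

async def call_with_timeout(fn, *args, seconds=30.0):
    """Await fn(*args), raising TimeoutError after `seconds`."""
    try:
        return await asyncio.wait_for(fn(*args), timeout=seconds)
    except asyncio.TimeoutError:
        # Re-raise as the builtin TimeoutError with a clear message
        raise TimeoutError(f"step timed out after {seconds}s")
```

Pair the per-call guard with an outer workflow deadline so the sum of slow-but-not-timed-out steps can't exceed your overall budget.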
### 4. No Observability
If you can't see which step is running, which failed, and why, you can't debug production issues. Log every step transition with timing, input/output, and status.
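A minimal sketch of step-level logging — the event format here is made up, but the idea is to record every transition with timing and status:

```python
# Illustrative observability wrapper: every step run emits structured
# events (name, status, duration or error) into a shared list, which
# in production would feed your logger or tracing backend.
import asyncio
import time

def observe(events: list):
    """Return a step runner that records start/end events for each step."""
    async def run(name, fn, *args):
        start = time.monotonic()
        events.append({"step": name, "status": "running"})
        try:
            result = await fn(*args)
            events.append({"step": name, "status": "completed",
                           "seconds": time.monotonic() - start})
            return result
        except Exception as e:
            events.append({"step": name, "status": "failed", "error": str(e)})
            raise
    return run
```
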
### 5. Tight Coupling Between Steps
Steps should communicate through state, not direct calls. This makes them independently testable, replaceable, and reorderable.
Designing agent orchestration? [AI Agents Weekly](/newsletter.html) covers workflows, frameworks, and production deployment patterns 3x/week. Join free.
## Conclusion
Orchestration is the boring infrastructure that makes agents reliable. It's the difference between a demo that works 80% of the time and a production system that handles edge cases, recovers from failures, and scales.
Start simple: if your agent has fewer than 5 steps, a custom orchestrator or plain ReAct loop is enough. Move to LangGraph when you need conditional branching, human-in-the-loop, or checkpointing. Graduate to Temporal when workflows run for hours, involve financial transactions, or must survive infrastructure failures.
The best orchestration is the one you don't notice — it just makes your agent work, every time.
Get our free AI Agent Starter Kit — templates, checklists, and deployment guides for building production AI agents.