Pax

Posted on • Originally published at paxrel.com

AI Agent Orchestration: LangGraph, Temporal & Custom Workflows (2026 Guide)

A single AI agent with a few tools can handle simple tasks. But real-world workflows — process a customer refund, generate a research report, onboard a new user — involve multiple steps, conditional logic, parallel execution, error recovery, and human approvals.

That's orchestration: the layer that coordinates what your agent does, in what order, and what happens when things go wrong. Without it, your agent is a talented freelancer with no project management. With it, your agent becomes a reliable workflow engine.

This guide covers three approaches to agent orchestration: LangGraph (graph-based), Temporal (durable workflows), and custom orchestrators — with trade-offs and code for each.

## Why Orchestration Matters

Consider a simple task: "Process a refund for order #12345."

Without orchestration, your agent calls tools in whatever order the LLM decides. Sometimes it works. Sometimes it processes the refund before checking eligibility. Sometimes it retries a failed API call 47 times. Sometimes it forgets to send the confirmation email.

With orchestration:

```
verify_identity → check_eligibility → [approve if > $100] → process_refund → send_confirmation
      │                    │                    │                   │
      └── retry 2x         └── if ineligible    └── timeout 5min   └── retry 3x
          then escalate        → explain why        → escalate         then log error
```
Every step has defined behavior, error handling, and transitions. The workflow is testable, observable, and predictable.

## Approach 1: LangGraph — Graph-Based Orchestration

LangGraph (from LangChain) models your agent as a **state machine graph**. Nodes are steps, edges are transitions, and state flows through the graph.

### Core Concepts

- **State** — A typed dictionary that accumulates data through the graph
- **Nodes** — Functions that read state, do work, and return updated state
- **Edges** — Transitions between nodes (conditional or unconditional)
- **Checkpointing** — Save state at each step for recovery and debugging


### Example: Customer Support Agent

```python
from langgraph.graph import StateGraph, END
from typing import TypedDict, Literal

class SupportState(TypedDict):
    messages: list          # Conversation history
    intent: str             # Classified intent
    customer_id: str | None # Authenticated customer
    order: dict | None      # Looked-up order
    resolution: str | None  # How we resolved the issue
    should_escalate: bool

# Node functions
async def classify_intent(state: SupportState) -> SupportState:
    intent = await llm.classify(state["messages"][-1])
    return {"intent": intent}

async def authenticate(state: SupportState) -> SupportState:
    customer = await lookup_customer(state["messages"])
    return {"customer_id": customer["id"] if customer else None}

async def lookup_order(state: SupportState) -> SupportState:
    order = await get_order(state["customer_id"], state["messages"])
    return {"order": order}

async def generate_response(state: SupportState) -> SupportState:
    response = await llm.respond(state)
    return {"messages": [response], "resolution": "resolved"}

async def escalate(state: SupportState) -> SupportState:
    ticket = await create_support_ticket(state)
    return {"resolution": f"Escalated: {ticket['id']}"}

# Conditional edges
def route_after_classify(state: SupportState) -> Literal["authenticate", "respond", "escalate"]:
    if state["intent"] in ["order_status", "refund"]:
        return "authenticate"
    if state["intent"] == "general_question":
        return "respond"
    return "escalate"

def route_after_auth(state: SupportState) -> Literal["lookup_order", "escalate"]:
    if state["customer_id"]:
        return "lookup_order"
    return "escalate"

# Build the graph
graph = StateGraph(SupportState)

graph.add_node("classify", classify_intent)
graph.add_node("authenticate", authenticate)
graph.add_node("lookup_order", lookup_order)
graph.add_node("respond", generate_response)
graph.add_node("escalate", escalate)

graph.set_entry_point("classify")
graph.add_conditional_edges("classify", route_after_classify)
graph.add_conditional_edges("authenticate", route_after_auth)
graph.add_edge("lookup_order", "respond")
graph.add_edge("respond", END)
graph.add_edge("escalate", END)

# Compile with checkpointing
from langgraph.checkpoint.sqlite import SqliteSaver
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")
app = graph.compile(checkpointer=checkpointer)

# Run
result = await app.ainvoke(
    {"messages": ["I want a refund for order #12345"], "should_escalate": False},
    config={"configurable": {"thread_id": "conv-123"}}
)
```
**Tip:** LangGraph's checkpointing is its killer feature. Every state transition is saved, so you can resume interrupted workflows, replay for debugging, and implement human-in-the-loop by pausing at any node.


### Human-in-the-Loop with LangGraph

```python
from langgraph.errors import NodeInterrupt

# Add an interrupt point before processing refunds
graph.add_node("request_approval", request_human_approval)

# In the approval node:
async def request_human_approval(state: SupportState) -> SupportState:
    if state["order"]["amount"] > 10000:  # amount in cents, i.e. > $100
        # This pauses the graph until a human responds
        raise NodeInterrupt("Refund > $100 needs approval")
    return state

# Resume after human approves:
await app.ainvoke(
    None,  # No new input, just resume
    config={"configurable": {"thread_id": "conv-123"}}
)
```
### LangGraph Trade-offs

| Pros | Cons |
|------|------|
| Visual graph structure | LangChain ecosystem lock-in |
| Built-in checkpointing | Learning curve for graph concepts |
| Human-in-the-loop native | Debugging complex graphs is hard |
| Streaming support | Overhead for simple workflows |
| LangGraph Studio for visualization | State management gets complex |


## Approach 2: Temporal — Durable Workflow Orchestration

Temporal is an enterprise workflow engine originally designed for microservices. It's overkill for simple agents but perfect for **long-running, mission-critical workflows** that must never lose state.

### Why Temporal for Agents?

- **Durable execution** — If your server crashes mid-workflow, Temporal resumes exactly where it left off
- **Built-in retries** — Configure retry policies per activity (exponential backoff, max attempts)
- **Timeouts** — Activity timeouts, workflow timeouts, heartbeat timeouts
- **Versioning** — Deploy new workflow versions without breaking running workflows
- **Visibility** — Built-in UI shows every workflow, its state, and history

### Example: Research Report Agent

```python
import asyncio
from datetime import timedelta

from temporalio import workflow, activity
from temporalio.common import RetryPolicy

@activity.defn
async def search_web(query: str) -> list[str]:
    """Search the web and return relevant URLs."""
    return await web_search_tool.search(query, top_k=10)

@activity.defn
async def scrape_page(url: str) -> str:
    """Scrape and extract content from a URL."""
    return await scraper.extract(url)

@activity.defn
async def analyze_content(content: str, question: str) -> dict:
    """Use LLM to analyze scraped content."""
    return await llm.analyze(content, question)

@activity.defn
async def write_report(findings: list[dict], topic: str) -> str:
    """Generate final research report."""
    return await llm.generate_report(findings, topic)

@workflow.defn
class ResearchWorkflow:
    @workflow.run
    async def run(self, topic: str) -> str:
        # Step 1: Search for sources (with retry)
        urls = await workflow.execute_activity(
            search_web,
            topic,
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=3)
        )

        # Step 2: Scrape pages in parallel
        scrape_tasks = [
            workflow.execute_activity(
                scrape_page,
                url,
                start_to_close_timeout=timedelta(seconds=60),
                retry_policy=RetryPolicy(maximum_attempts=2)
            )
            for url in urls[:5]  # Top 5 results
        ]
        contents = await asyncio.gather(*scrape_tasks, return_exceptions=True)
        contents = [c for c in contents if isinstance(c, str)]

        # Step 3: Analyze each source
        findings = []
        for content in contents:
            finding = await workflow.execute_activity(
                analyze_content,
                args=[content, topic],
                start_to_close_timeout=timedelta(seconds=120),
            )
            findings.append(finding)

        # Step 4: Write final report
        report = await workflow.execute_activity(
            write_report,
            args=[findings, topic],
            start_to_close_timeout=timedelta(seconds=180),
        )

        return report
```
### Temporal Trade-offs

| Pros | Cons |
|------|------|
| Battle-tested durability (used by Uber, Netflix) | Heavy infrastructure (Temporal server + DB) |
| Survives crashes, deploys, outages | Steep learning curve |
| Built-in retry, timeout, versioning | Overkill for simple agents |
| Great visibility UI | Operational complexity |
| Language-agnostic (Python, Go, Java, TS) | Adds 50-100ms latency per activity |


## Approach 3: Custom Orchestrator

Sometimes you don't need a framework. A custom orchestrator gives you full control with minimal dependencies.
```python
import asyncio
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable

class StepStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"

@dataclass
class Step:
    name: str
    fn: Callable
    depends_on: list[str] = field(default_factory=list)
    retry_count: int = 2
    timeout_seconds: int = 60
    condition: Callable | None = None  # Skip if returns False
    status: StepStatus = StepStatus.PENDING
    result: Any = None
    error: str | None = None

class Orchestrator:
    def __init__(self):
        self.steps: dict[str, Step] = {}
        self.context: dict = {}

    def add_step(self, step: Step):
        self.steps[step.name] = step

    async def run(self) -> dict:
        while self._has_pending_steps():
            # Find steps ready to run (dependencies met)
            ready = [s for s in self.steps.values()
                     if s.status == StepStatus.PENDING
                     and self._dependencies_met(s)]

            if not ready:
                break  # Deadlock or all done

            # Run ready steps in parallel
            tasks = [self._execute_step(s) for s in ready]
            await asyncio.gather(*tasks)

        return self.context

    async def _execute_step(self, step: Step):
        # Check condition
        if step.condition and not step.condition(self.context):
            step.status = StepStatus.SKIPPED
            return

        step.status = StepStatus.RUNNING

        for attempt in range(step.retry_count + 1):
            try:
                result = await asyncio.wait_for(
                    step.fn(self.context),
                    timeout=step.timeout_seconds
                )
                step.result = result
                step.status = StepStatus.COMPLETED
                self.context[step.name] = result
                return
            except asyncio.TimeoutError:
                step.error = f"Timeout after {step.timeout_seconds}s"
            except Exception as e:
                step.error = str(e)

            if attempt < step.retry_count:
                await asyncio.sleep(2 ** attempt)  # Exponential backoff between retries

        step.status = StepStatus.FAILED

    def _dependencies_met(self, step: Step) -> bool:
        return all(
            self.steps[dep].status == StepStatus.COMPLETED
            for dep in step.depends_on
        )

    def _has_pending_steps(self) -> bool:
        return any(s.status == StepStatus.PENDING for s in self.steps.values())

# Usage
orch = Orchestrator()
orch.add_step(Step("classify", classify_intent))
orch.add_step(Step("authenticate", authenticate, depends_on=["classify"],
                   condition=lambda ctx: ctx["classify"]["requires_auth"]))
orch.add_step(Step("lookup", lookup_order, depends_on=["authenticate"]))
orch.add_step(Step("respond", generate_response, depends_on=["lookup"]))

result = await orch.run()
```
### Custom Orchestrator Trade-offs

| Pros | Cons |
|------|------|
| Full control, zero dependencies | You build everything yourself |
| Minimal overhead | No built-in persistence/recovery |
| Easy to understand and debug | Parallel execution logic is tricky |
| Fits any pattern | Grows complex over time |


## Orchestration Patterns

### Pattern 1: Sequential Pipeline
Steps run one after another. Output of step N is input to step N+1.

**Use when:** Each step depends on the previous result. Example: classify → retrieve → generate → validate.
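In plain asyncio, this pattern is nothing more than a chain of awaits. A minimal sketch, where `classify`, `retrieve`, `generate`, and `validate` are hypothetical stubs standing in for real LLM and tool calls:

```python
import asyncio

# Hypothetical step stubs standing in for real LLM/tool calls.
async def classify(text: str) -> str:
    return "question"

async def retrieve(intent: str) -> list[str]:
    return [f"doc about {intent}"]

async def generate(docs: list[str]) -> str:
    return f"Answer based on {len(docs)} doc(s)"

async def validate(answer: str) -> str:
    assert answer, "empty answer"
    return answer

async def pipeline(text: str) -> str:
    # Output of step N is input to step N+1.
    intent = await classify(text)
    docs = await retrieve(intent)
    answer = await generate(docs)
    return await validate(answer)

result = asyncio.run(pipeline("How do refunds work?"))
```

The shape stays the same when the stubs become real calls; orchestration frameworks earn their keep only once branching, retries, or persistence enter the picture.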

### Pattern 2: Fan-Out / Fan-In
One step spawns multiple parallel tasks, then a final step aggregates results.

**Use when:** Independent subtasks can run simultaneously. Example: search 5 sources in parallel → merge findings.
```python
# Fan-out / Fan-in with LangGraph
from langgraph.graph import StateGraph

async def fan_out(state):
    """Create parallel research tasks."""
    tasks = [{"query": q} for q in state["queries"]]
    return {"parallel_tasks": tasks}

async def research(state):
    """Run research for one query."""
    return {"finding": await search_and_analyze(state["query"])}

async def fan_in(state):
    """Merge all findings into a report."""
    return {"report": await synthesize(state["findings"])}
```
### Pattern 3: Conditional Branching
Different paths based on runtime conditions.

**Use when:** The workflow varies by input type. Example: refund requests go through approval, general questions go straight to response.

### Pattern 4: Loop with Exit Condition
Repeat a sequence until a condition is met.

**Use when:** Iterative refinement is needed. Example: generate → evaluate → if score < threshold, regenerate.
```python
# Loop pattern in LangGraph
def should_continue(state) -> Literal["regenerate", "finalize"]:
    if state["quality_score"] >= 0.8:
        return "finalize"
    if state["attempts"] >= 3:
        return "finalize"  # Give up after 3 tries
    return "regenerate"

graph.add_conditional_edges("evaluate", should_continue)
### Pattern 5: Saga (Compensating Transactions)
When a later step fails, undo earlier steps.

**Use when:** Multi-step operations that should be atomic. Example: reserve inventory → charge card → if shipping fails → refund card → release inventory.
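A minimal saga sketch: each step pairs an action with a compensation, and a failure triggers the compensations of completed steps in reverse order. All step functions here are hypothetical stubs (real ones would call inventory, payment, and shipping APIs); the `log` list exists only to make the behavior visible:

```python
import asyncio

log: list[str] = []  # Records what ran, so the rollback is visible

async def reserve_inventory(ctx): log.append("reserved")
async def release_inventory(ctx): log.append("released")
async def charge_card(ctx): log.append("charged")
async def refund_card(ctx): log.append("refunded")
async def ship(ctx): raise RuntimeError("carrier down")
async def no_op(ctx): pass

async def run_saga(steps, ctx):
    """Run (action, compensation) pairs; on failure, undo in reverse order."""
    done = []  # Compensations for completed steps
    try:
        for action, compensate in steps:
            await action(ctx)
            done.append(compensate)
    except Exception:
        for compensate in reversed(done):
            await compensate(ctx)
        raise  # Re-raise after compensation so callers still see the failure

steps = [
    (reserve_inventory, release_inventory),
    (charge_card, refund_card),
    (ship, no_op),
]

try:
    asyncio.run(run_saga(steps, {}))
except RuntimeError:
    pass  # Shipping failed; earlier steps were compensated
```

After the run, `log` holds `["reserved", "charged", "refunded", "released"]`: the card is refunded before inventory is released, mirroring the reverse of the original order.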

## Choosing Your Orchestration Approach

| Scenario | Best Approach | Why |
|----------|---------------|-----|
| Simple agent (3-5 steps) | Custom or just ReAct | Frameworks add unnecessary complexity |
| Complex but short-lived (< 5 min) | LangGraph | Good graph model, checkpointing, HITL |
| Long-running (hours/days) | Temporal | Durable execution survives crashes |
| Human approval workflows | LangGraph or Temporal | Both have native interrupt/signal support |
| Mission-critical / financial | Temporal | Battle-tested, audit trail, exactly-once |
| Maximum flexibility | Custom | No framework constraints |
| Team already uses LangChain | LangGraph | Ecosystem integration |


## Common Orchestration Mistakes

### 1. Over-Orchestrating Simple Agents
If your agent has 3 steps and no branching, you don't need LangGraph or Temporal. A simple while loop with tool calling is fine. Add orchestration when complexity justifies it.

### 2. No Error Boundaries
A failure in step 4 shouldn't crash the entire workflow. Each step needs its own error handling: retry policy, fallback behavior, and graceful degradation.
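One way to sketch this is a wrapper that owns the retry policy and fallback for a single step; `flaky_step` here is a hypothetical stand-in for a failing tool call:

```python
import asyncio

async def with_error_boundary(fn, *, retries: int = 2, fallback=None):
    """Run one step with its own retries and a graceful fallback."""
    last_err = None
    for attempt in range(retries + 1):
        try:
            return await fn()
        except Exception as e:
            last_err = e
            await asyncio.sleep(0)  # Real code would back off exponentially here
    if fallback is not None:
        return fallback  # Degrade gracefully instead of crashing the workflow
    raise last_err

calls = {"count": 0}

async def flaky_step():
    calls["count"] += 1
    raise RuntimeError("upstream API down")

result = asyncio.run(
    with_error_boundary(flaky_step, retries=2, fallback="cached answer")
)
```

The step is attempted three times (one call plus two retries), then the workflow continues with the fallback value instead of crashing.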

### 3. Missing Timeouts
LLM calls can hang. Tool calls can hang. Without timeouts, your workflow hangs forever. Set timeouts on every async operation: 30s for LLM calls, 60s for tool calls, 5 minutes for the full workflow.
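In asyncio the fix is one line of `asyncio.wait_for`; `hanging_llm_call` is a hypothetical stand-in for a request that never returns (the timeout is shortened to 0.1s so the example runs fast):

```python
import asyncio

async def hanging_llm_call() -> str:
    # Stand-in for an LLM or tool request that never returns
    await asyncio.sleep(3600)
    return "unreachable"

async def call_with_timeout() -> str:
    try:
        # Cap the call; without this, the workflow waits forever
        return await asyncio.wait_for(hanging_llm_call(), timeout=0.1)
    except asyncio.TimeoutError:
        return "timed out"

result = asyncio.run(call_with_timeout())
```

Both LangGraph nodes and custom orchestrator steps can wrap their inner awaits this way; Temporal handles it for you via `start_to_close_timeout`.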

### 4. No Observability
If you can't see which step is running, which failed, and why, you can't debug production issues. Log every step transition with timing, input/output, and status.
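A small wrapper is enough to get started; the `classify` step below is a hypothetical example, and real code would likely emit structured logs or traces instead:

```python
import asyncio
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("orchestrator")

async def traced(name: str, fn, state: dict) -> dict:
    """Log every step transition with timing, status, and errors."""
    start = time.perf_counter()
    logger.info("step=%s status=running", name)
    try:
        result = await fn(state)
        elapsed = time.perf_counter() - start
        logger.info("step=%s status=completed elapsed=%.3fs", name, elapsed)
        return result
    except Exception as exc:
        logger.info("step=%s status=failed error=%s", name, exc)
        raise

# Hypothetical step for demonstration
async def classify(state: dict) -> dict:
    return {**state, "intent": "refund"}

state = asyncio.run(traced("classify", classify, {"messages": []}))
```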

### 5. Tight Coupling Between Steps
Steps should communicate through state, not direct calls. This makes them independently testable, replaceable, and reorderable.


Designing agent orchestration? [AI Agents Weekly](/newsletter.html) covers workflows, frameworks, and production deployment patterns 3x/week. Join free.



## Conclusion

Orchestration is the boring infrastructure that makes agents reliable. It's the difference between a demo that works 80% of the time and a production system that handles edge cases, recovers from failures, and scales.

Start simple: if your agent has fewer than 5 steps, a custom orchestrator or plain ReAct loop is enough. Move to LangGraph when you need conditional branching, human-in-the-loop, or checkpointing. Graduate to Temporal when workflows run for hours, involve financial transactions, or must survive infrastructure failures.

The best orchestration is the one you don't notice — it just makes your agent work, every time.

Get our free AI Agent Starter Kit — templates, checklists, and deployment guides for building production AI agents.
