Siddhant Kulkarni

Production-Ready Multi-Agent Systems with LangGraph: A Complete Tutorial

A step-by-step guide to building, testing and deploying a multi-agent document processing pipeline with LangGraph


Multi-agent systems are one of the most powerful patterns in modern AI engineering and one of the easiest to get wrong. A single LLM call is straightforward. Chaining a few together is manageable. But orchestrating four specialized agents that pass state, handle failures, loop on feedback and support human intervention? That requires real architecture.

This tutorial walks through building a production-grade multi-agent document processing system using LangGraph. By the end, you'll have a working pipeline where a Researcher agent gathers information, a Writer drafts content, a Reviewer critiques it and an Editor produces the final output, all orchestrated as a compiled state graph with error handling, human-in-the-loop breakpoints and a clear deployment path.

Let's build it.


Architecture Overview

Before writing any code, let's map out the system. Here's the high-level agent workflow:

┌────────────────────────────────────────────────────────────┐
│                    DOCUMENT PROCESSING GRAPH               │
│                                                            │
│  ┌────────┐    ┌────────┐    ┌──────────┐    ┌──────────┐  │
│  │Research│──▶│ Writer │───▶│ Reviewer │──▶│  Editor  │  │
│  │ Agent  │    │ Agent  │◀──│  Agent   │    │  Agent   │  │
│  └────────┘    └────────┘    └──────────┘    └──────────┘  │
│       │                           │                │       │
│       ▼                           ▼                ▼       │
│  ┌─────────┐              ┌────────────┐    ┌──────────┐   │
│  │ Tools:  │              │  HUMAN-IN- │    │  OUTPUT  │   │
│  │ Search, │              │  THE-LOOP  │    │ (Final)  │   │
│  │ Parse   │              │ BREAKPOINT │    └──────────┘   │
│  └─────────┘              └────────────┘                   │
└────────────────────────────────────────────────────────────┘

Each agent is a node in a LangGraph StateGraph. Edges define transitions. Conditional edges handle routing logic: for example, sending the Writer's output back for revision if the Reviewer scores it below a threshold.

Here's the state machine in detail:

                    ┌───────────────┐
                    │     START     │
                    └───────┬───────┘
                            │
                            ▼
                    ┌───────────────┐
                    │   Researcher  │
                    │  (gather data)│
                    └───────┬───────┘
                            │
                            ▼
                    ┌───────────────┐
              ┌───▶│    Writer     │
              │     │ (draft text)  │
              │     └───────┬───────┘
              │             │
              │             ▼
              │     ┌───────────────┐
              │     │   Reviewer    │──────┐
              │     │(score & notes)│      │
              │     └───────┬───────┘      │
              │             │              │
              │        score < 7?     score >= 7?
              │             │              │
              │             ▼              ▼
              │     ┌──────────────┐ ┌──────────┐
              └─────│  (revision)  │ │  Editor  │
                    │  loop back   │ │ (polish) │
                    └──────────────┘ └────┬─────┘
                                          │
                                          ▼
                                   ┌────────────┐
                                   │    END     │
                                   └────────────┘

Step 1: Define the Shared State

Every node in a LangGraph graph reads from and writes to a shared state object. Designing this state well is critical: it's the contract between all your agents.

# state.py
from __future__ import annotations
from typing import Annotated, TypedDict
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage


def replace_value(existing: str, new: str) -> str:
    """Reducer that replaces the old value with the new one."""
    return new


def increment(existing: int, new: int) -> int:
    """Reducer that sums values useful for counters."""
    return existing + new


class AgentState(TypedDict):
    """Shared state passed between all agent nodes.

    Each field uses an annotated reducer so that node outputs
    are merged correctly into the running state.
    """
    # Conversation-style message history (appended automatically)
    messages: Annotated[list[BaseMessage], add_messages]

    # Structured data passed between stages
    research_notes: Annotated[str, replace_value]
    draft: Annotated[str, replace_value]
    review_score: Annotated[int, replace_value]
    review_feedback: Annotated[str, replace_value]
    final_output: Annotated[str, replace_value]

    # Control flow
    revision_count: Annotated[int, increment]
    max_revisions: Annotated[int, replace_value]
    topic: Annotated[str, replace_value]

A few design decisions worth calling out:

  • messages uses add_messages — LangGraph's built-in reducer that appends new messages and handles deduplication by ID. This gives every agent access to the full conversation history.
  • revision_count uses increment — each time the Writer revises, it emits {"revision_count": 1} and the reducer accumulates.
  • Structured fields like draft use replace_value — the latest version always wins.
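To make the reducer semantics concrete, here's a plain-Python sketch of the merge LangGraph performs after each node returns a partial update. This is a simplified model (the real `add_messages` also deduplicates by message ID, which this sketch skips):

```python
def apply_update(state: dict, update: dict, reducers: dict) -> dict:
    """Merge one node's partial update into the running state,
    field by field, via each field's reducer (a simplified model
    of the merge LangGraph performs after every node runs)."""
    merged = dict(state)
    for key, new_value in update.items():
        merged[key] = reducers[key](state.get(key), new_value)
    return merged


# Plain-function stand-ins for the reducers defined in state.py
reducers = {
    "draft": lambda old, new: new,                        # replace_value: latest wins
    "revision_count": lambda old, new: (old or 0) + new,  # increment: accumulate
    "messages": lambda old, new: (old or []) + new,       # add_messages, minus dedup
}

state = {"draft": "first draft", "revision_count": 1, "messages": ["[Writer] Created draft"]}
update = {"draft": "second draft", "revision_count": 1, "messages": ["[Writer] Revised draft"]}
state = apply_update(state, update, reducers)
# draft is replaced, revision_count accumulates to 2, both messages are kept
```

This is why nodes can return only the fields they changed: the framework, not the node, decides how each field merges.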

Step 2: Build the Agent Nodes

Each node is a function that takes the current state and returns a partial state update. Let's build all four.

# agents.py
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from state import AgentState

llm = ChatOpenAI(model="gpt-4o", temperature=0.3)


# ── Researcher Agent ──────────────────────────────────────────
def researcher_node(state: AgentState) -> dict:
    """Gathers background information on the topic.

    In production, this node would call external tools: search APIs,
    internal databases, document stores. Here we simulate tool use
    by prompting the LLM to produce structured research notes.
    """
    topic = state["topic"]
    sys_prompt = SystemMessage(content=(
        "You are a senior research analyst. Given a topic, produce "
        "detailed, structured research notes with key facts, statistics, "
        "and source references. Output in markdown bullet-point format."
    ))
    user_prompt = HumanMessage(content=f"Research the following topic thoroughly:\n\n{topic}")

    response = llm.invoke([sys_prompt, user_prompt])

    return {
        "research_notes": response.content,
        "messages": [HumanMessage(content=f"[Researcher] Completed research on: {topic}")]
    }


# ── Writer Agent ──────────────────────────────────────────────
def writer_node(state: AgentState) -> dict:
    """Drafts (or revises) the document based on research and feedback.

    On the first pass, it writes from research notes alone. On revision
    passes, it incorporates reviewer feedback to improve the draft.
    """
    research = state["research_notes"]
    feedback = state.get("review_feedback", "")
    current_draft = state.get("draft", "")
    revision = state.get("revision_count", 0)

    if revision > 0 and current_draft:
        # Revision pass — improve existing draft using feedback
        sys_prompt = SystemMessage(content=(
            "You are a professional technical writer. Revise the draft below "
            "based on the reviewer's feedback. Maintain technical accuracy "
            "and improve clarity, structure and completeness."
        ))
        user_prompt = HumanMessage(content=(
            f"## Current Draft\n{current_draft}\n\n"
            f"## Reviewer Feedback\n{feedback}\n\n"
            "Produce an improved version of the full document."
        ))
    else:
        # First pass — write from research notes
        sys_prompt = SystemMessage(content=(
            "You are a professional technical writer. Using the research notes "
            "provided, write a well-structured, detailed document. Use clear "
            "headings, code examples where appropriate and maintain a "
            "professional tone throughout."
        ))
        user_prompt = HumanMessage(content=f"## Research Notes\n{research}")

    response = llm.invoke([sys_prompt, user_prompt])

    return {
        "draft": response.content,
        "revision_count": 1,
        "messages": [HumanMessage(
            content=f"[Writer] {'Revised' if revision > 0 else 'Created'} draft "
                    f"(revision {revision + 1})"
        )]
    }


# ── Reviewer Agent ────────────────────────────────────────────
def reviewer_node(state: AgentState) -> dict:
    """Scores the draft 1-10 and provides actionable feedback.

    The score determines routing: >= 7 proceeds to editing,
    < 7 sends the draft back for revision (up to max_revisions).
    """
    draft = state["draft"]

    sys_prompt = SystemMessage(content=(
        "You are a senior technical editor and reviewer. Evaluate the "
        "document on: accuracy, completeness, clarity, structure and "
        "readability.\n\n"
        "You MUST respond in EXACTLY this format:\n"
        "SCORE: <integer 1-10>\n"
        "FEEDBACK: <detailed, actionable feedback>"
    ))
    user_prompt = HumanMessage(content=f"Review this document:\n\n{draft}")

    response = llm.invoke([sys_prompt, user_prompt])
    content = response.content

    # Parse the structured response
    score = 5  # default if parsing fails
    feedback = content
    for line in content.split("\n"):
        if line.strip().upper().startswith("SCORE:"):
            try:
                score = int(line.split(":")[1].strip())
            except (ValueError, IndexError):
                pass
        if line.strip().upper().startswith("FEEDBACK:"):
            feedback = line.split(":", 1)[1].strip()

    return {
        "review_score": score,
        "review_feedback": feedback,
        "messages": [HumanMessage(
            content=f"[Reviewer] Score: {score}/10 — {feedback[:120]}..."
        )]
    }


# ── Editor Agent ──────────────────────────────────────────────
def editor_node(state: AgentState) -> dict:
    """Final polish pass — grammar, formatting, consistency."""
    draft = state["draft"]

    sys_prompt = SystemMessage(content=(
        "You are a meticulous copy editor. Polish the document for grammar, "
        "punctuation, formatting consistency and readability. Do not change "
        "the substance or meaning. Return the final, publication-ready version."
    ))
    user_prompt = HumanMessage(content=f"Edit and polish:\n\n{draft}")

    response = llm.invoke([sys_prompt, user_prompt])

    return {
        "final_output": response.content,
        "messages": [HumanMessage(content="[Editor] Final version ready.")]
    }

Step 3: Wire Up the Graph with Conditional Routing

Now for the orchestration layer. This is where LangGraph shines: defining the directed graph of agent transitions, including conditional branches.

# graph.py
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from state import AgentState
from agents import researcher_node, writer_node, reviewer_node, editor_node

# Maximum revision loops before forcing completion
MAX_REVISIONS = 3


def route_after_review(state: AgentState) -> str:
    """Conditional edge: decide whether to revise or proceed to editing.

    Three possible outcomes:
      1. Score >= 7         → move to editor (good enough)
      2. Score < 7, budget  → loop back to writer (needs work)
      3. Score < 7, no budget → move to editor anyway (cap reached)
    """
    score = state.get("review_score", 0)
    revisions = state.get("revision_count", 0)
    max_rev = state.get("max_revisions", MAX_REVISIONS)

    if score >= 7:
        return "editor"
    elif revisions < max_rev:
        return "writer"       # loop back for revision
    else:
        return "editor"       # exhausted revision budget


def build_graph() -> StateGraph:
    """Construct and compile the multi-agent document processing graph."""

    graph = StateGraph(AgentState)

    # ── Register nodes ──────────────────────────────────
    graph.add_node("researcher", researcher_node)
    graph.add_node("writer", writer_node)
    graph.add_node("reviewer", reviewer_node)
    graph.add_node("editor", editor_node)

    # ── Define edges ────────────────────────────────────
    graph.set_entry_point("researcher")
    graph.add_edge("researcher", "writer")
    graph.add_edge("writer", "reviewer")

    # Conditional routing after review
    graph.add_conditional_edges(
        "reviewer",
        route_after_review,
        {
            "writer": "writer",    # revision loop
            "editor": "editor",    # proceed to final edit
        }
    )

    graph.add_edge("editor", END)

    # ── Compile with checkpointing ──────────────────────
    # MemorySaver for development; swap to SqliteSaver or
    # PostgresSaver in production for persistence.
    checkpointer = MemorySaver()
    compiled = graph.compile(
        checkpointer=checkpointer,
        interrupt_before=["editor"]   # human-in-the-loop breakpoint
    )

    return compiled

Here's the message-passing pattern between agents through the shared state:

┌──────────────────────────────────────────────────────────────────┐
│                   MESSAGE & STATE FLOW                           │
│                                                                  │
│  Researcher                                                      │
│  ├─ writes: research_notes, messages                             │
│  │                                                               │
│  ▼                                                               │
│  Writer (reads: research_notes, review_feedback)                 │
│  ├─ writes: draft, revision_count, messages                      │
│  │                                                               │
│  ▼                                                               │
│  Reviewer (reads: draft)                                         │
│  ├─ writes: review_score, review_feedback, messages              │
│  │                                                               │
│  ├──── score < 7? ──▶ Writer (reads updated review_feedback)    │
│  │                                                               │
│  ▼ score >= 7                                                    │
│  Editor (reads: draft)                                           │
│  ├─ writes: final_output, messages                               │
│  │                                                               │
│  ▼                                                               │
│  END                                                             │
└──────────────────────────────────────────────────────────────────┘

Step 4: Run the Pipeline

# main.py
from graph import build_graph

app = build_graph()

# Initial input state
initial_state = {
    "topic": "Best practices for implementing vector search in PostgreSQL using pgvector",
    "messages": [],
    "research_notes": "",
    "draft": "",
    "review_score": 0,
    "review_feedback": "",
    "final_output": "",
    "revision_count": 0,
    "max_revisions": 3,
}

# Thread config for checkpointing
config = {"configurable": {"thread_id": "doc-processing-001"}}

# Run until the human-in-the-loop breakpoint (before "editor")
print("Starting pipeline...")
for event in app.stream(initial_state, config=config):
    # Each event is a dict with the node name as key
    for node_name, output in event.items():
        if node_name == "__end__":
            continue
        print(f"\n{'='*60}")
        print(f"  Node: {node_name}")
        if "review_score" in output:
            print(f"  Review Score: {output['review_score']}/10")
        if "revision_count" in output:
            print(f"  Revision: #{output['revision_count']}")
        print(f"{'='*60}")

# ── Human-in-the-loop checkpoint ─────────────────────────
# The graph paused before "editor". Inspect the state:
current_state = app.get_state(config)
print(f"\nPaused before: {current_state.next}")
print(f"Current draft preview: {current_state.values['draft'][:200]}...")

# Approve and continue (or modify state before resuming)
approval = input("\nApprove draft for final editing? (yes/no): ")
if approval.lower() == "yes":
    # Resume execution from the breakpoint
    for event in app.stream(None, config=config):
        for node_name, output in event.items():
            if node_name == "__end__":
                continue
            print(f"\n  Node: {node_name} — complete")

    # Retrieve final output
    final_state = app.get_state(config)
    print("\n\nFinal Document:")
    print(final_state.values["final_output"])
else:
    print("Pipeline halted by human reviewer.")

Step 5: Error Handling and Retry Logic

Production systems need to handle LLM API failures, timeouts and malformed responses. Here's a robust retry wrapper that can be applied to any agent node.

# retry.py
import time
import logging
from functools import wraps
from typing import Callable
from langchain_core.exceptions import OutputParserException

logger = logging.getLogger(__name__)


def with_retry(
    max_attempts: int = 3,
    backoff_base: float = 2.0,
    retryable_exceptions: tuple = (Exception,),
):
    """Decorator that adds retry logic with exponential backoff to agent nodes.

    Usage:
        @with_retry(max_attempts=3)
        def researcher_node(state: AgentState) -> dict:
            ...
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(state: dict) -> dict:
            last_exception = None
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(state)
                except retryable_exceptions as e:
                    last_exception = e
                    if attempt == max_attempts:
                        logger.error(
                            f"Node '{func.__name__}' failed after "
                            f"{max_attempts} attempts: {e}"
                        )
                        # Return a safe fallback state so the graph
                        # can continue or route to an error handler
                        return {
                            "messages": [
                                {
                                    "role": "system",
                                    "content": (
                                        f"[ERROR] {func.__name__} failed: {str(e)}"
                                    ),
                                }
                            ]
                        }
                    wait = backoff_base ** attempt
                    logger.warning(
                        f"Node '{func.__name__}' attempt {attempt} failed: {e}. "
                        f"Retrying in {wait}s..."
                    )
                    time.sleep(wait)
            raise last_exception  # unreachable, but satisfies type checkers
        return wrapper
    return decorator


# Apply to agents:
# @with_retry(max_attempts=3, retryable_exceptions=(TimeoutError, OutputParserException))
# def researcher_node(state: AgentState) -> dict:
#     ...

For the Reviewer specifically, add output validation to handle malformed score parsing:

# validation.py
from state import AgentState


def validate_review_output(state: AgentState) -> AgentState:
    """Post-processing guard for the reviewer node output.

    Ensures review_score is within bounds and feedback is non-empty.
    Call this in the reviewer node before returning state.
    """
    score = state.get("review_score", 5)

    # Clamp score to valid range
    score = max(1, min(10, score))

    # Ensure feedback exists
    feedback = state.get("review_feedback", "")
    if not feedback.strip():
        feedback = "No detailed feedback provided. Defaulting to revision."
        score = min(score, 6)  # force revision if feedback is empty

    return {
        **state,
        "review_score": score,
        "review_feedback": feedback,
    }

Step 6: Testing the Agent System

Testing multi-agent systems requires three layers: unit tests for individual nodes, integration tests for the full graph and mocked LLM calls for deterministic, CI-friendly runs.

Unit Testing Individual Agents

# tests/test_agents.py
import pytest
from unittest.mock import patch, MagicMock
from agents import researcher_node, writer_node, reviewer_node
from state import AgentState


@pytest.fixture
def base_state() -> AgentState:
    """Minimal valid state for testing."""
    return {
        "topic": "Unit testing LLM applications",
        "messages": [],
        "research_notes": "",
        "draft": "",
        "review_score": 0,
        "review_feedback": "",
        "final_output": "",
        "revision_count": 0,
        "max_revisions": 3,
    }


class TestResearcherNode:
    @patch("agents.llm")
    def test_produces_research_notes(self, mock_llm, base_state):
        """Researcher should populate research_notes from LLM output."""
        mock_response = MagicMock()
        mock_response.content = "- Fact 1: Testing is essential\n- Fact 2: Mocks help"
        mock_llm.invoke.return_value = mock_response

        result = researcher_node(base_state)

        assert "research_notes" in result
        assert len(result["research_notes"]) > 0
        assert "messages" in result
        mock_llm.invoke.assert_called_once()


class TestReviewerNode:
    @patch("agents.llm")
    def test_parses_high_score(self, mock_llm, base_state):
        """Reviewer should correctly parse a high score."""
        mock_response = MagicMock()
        mock_response.content = "SCORE: 8\nFEEDBACK: Well-structured and thorough."
        mock_llm.invoke.return_value = mock_response
        base_state["draft"] = "Some draft content to review."

        result = reviewer_node(base_state)

        assert result["review_score"] == 8
        assert "Well-structured" in result["review_feedback"]

    @patch("agents.llm")
    def test_handles_malformed_output(self, mock_llm, base_state):
        """Reviewer should default to score 5 on unparseable output."""
        mock_response = MagicMock()
        mock_response.content = "This document is decent overall."
        mock_llm.invoke.return_value = mock_response
        base_state["draft"] = "Some draft content."

        result = reviewer_node(base_state)

        assert result["review_score"] == 5  # default fallback

Integration Testing the Full Graph

# tests/test_graph_integration.py
import pytest
from unittest.mock import patch, MagicMock
from graph import build_graph


def make_mock_response(content: str) -> MagicMock:
    mock = MagicMock()
    mock.content = content
    return mock


class TestFullGraphExecution:
    @patch("agents.llm")
    def test_happy_path_completes(self, mock_llm):
        """Graph should reach END when reviewer scores >= 7."""
        # Sequence: researcher → writer → reviewer (score 8) → editor
        mock_llm.invoke.side_effect = [
            make_mock_response("Research notes here"),       # researcher
            make_mock_response("Draft document here"),       # writer
            make_mock_response("SCORE: 8\nFEEDBACK: Good"), # reviewer
            make_mock_response("Final polished document"),   # editor
        ]

        # Build without interrupt_before so test runs to completion
        from langgraph.graph import StateGraph, END
        from langgraph.checkpoint.memory import MemorySaver
        from state import AgentState
        from agents import researcher_node, writer_node, reviewer_node, editor_node

        graph = StateGraph(AgentState)
        graph.add_node("researcher", researcher_node)
        graph.add_node("writer", writer_node)
        graph.add_node("reviewer", reviewer_node)
        graph.add_node("editor", editor_node)
        graph.set_entry_point("researcher")
        graph.add_edge("researcher", "writer")
        graph.add_edge("writer", "reviewer")

        from graph import route_after_review
        graph.add_conditional_edges("reviewer", route_after_review, {
            "writer": "writer",
            "editor": "editor",
        })
        graph.add_edge("editor", END)
        app = graph.compile(checkpointer=MemorySaver())

        config = {"configurable": {"thread_id": "test-001"}}
        initial = {
            "topic": "Test topic",
            "messages": [],
            "research_notes": "",
            "draft": "",
            "review_score": 0,
            "review_feedback": "",
            "final_output": "",
            "revision_count": 0,
            "max_revisions": 3,
        }

        events = list(app.stream(initial, config=config))

        # Verify the pipeline reached the editor
        final_state = app.get_state(config)
        assert final_state.values["final_output"] == "Final polished document"

    @patch("agents.llm")
    def test_revision_loop_triggers(self, mock_llm):
        """Graph should loop back to writer when score < 7."""
        mock_llm.invoke.side_effect = [
            make_mock_response("Research notes"),                  # researcher
            make_mock_response("Weak first draft"),                # writer (pass 1)
            make_mock_response("SCORE: 4\nFEEDBACK: Needs work"), # reviewer (fail)
            make_mock_response("Improved second draft"),           # writer (pass 2)
            make_mock_response("SCORE: 8\nFEEDBACK: Much better"),# reviewer (pass)
            make_mock_response("Final polished version"),          # editor
        ]

        from langgraph.graph import StateGraph, END
        from langgraph.checkpoint.memory import MemorySaver
        from state import AgentState
        from agents import researcher_node, writer_node, reviewer_node, editor_node
        from graph import route_after_review

        graph = StateGraph(AgentState)
        graph.add_node("researcher", researcher_node)
        graph.add_node("writer", writer_node)
        graph.add_node("reviewer", reviewer_node)
        graph.add_node("editor", editor_node)
        graph.set_entry_point("researcher")
        graph.add_edge("researcher", "writer")
        graph.add_edge("writer", "reviewer")
        graph.add_conditional_edges("reviewer", route_after_review, {
            "writer": "writer",
            "editor": "editor",
        })
        graph.add_edge("editor", END)
        app = graph.compile(checkpointer=MemorySaver())

        config = {"configurable": {"thread_id": "test-002"}}
        initial = {
            "topic": "Test topic",
            "messages": [],
            "research_notes": "",
            "draft": "",
            "review_score": 0,
            "review_feedback": "",
            "final_output": "",
            "revision_count": 0,
            "max_revisions": 3,
        }

        events = list(app.stream(initial, config=config))
        final = app.get_state(config)

        # Writer was called twice (initial + revision)
        assert final.values["revision_count"] == 2
        assert final.values["final_output"] == "Final polished version"

Testing the Routing Logic in Isolation

# tests/test_routing.py
from graph import route_after_review


def test_high_score_routes_to_editor():
    state = {"review_score": 8, "revision_count": 1, "max_revisions": 3}
    assert route_after_review(state) == "editor"


def test_low_score_routes_to_writer():
    state = {"review_score": 4, "revision_count": 1, "max_revisions": 3}
    assert route_after_review(state) == "writer"


def test_low_score_at_max_revisions_routes_to_editor():
    state = {"review_score": 3, "revision_count": 3, "max_revisions": 3}
    assert route_after_review(state) == "editor"


def test_boundary_score_of_seven_routes_to_editor():
    state = {"review_score": 7, "revision_count": 0, "max_revisions": 3}
    assert route_after_review(state) == "editor"

Step 7: Deployment on AWS

For production deployment, the two most practical options are ECS Fargate for long-running pipelines and Lambda for lightweight, event-triggered runs.
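The Lambda path isn't shown in full below, but the handler can stay very thin. A minimal sketch, with the graph invocation injected as a hypothetical `run_pipeline` callable (in real use it would invoke the compiled graph and return the `final_output` field); the injection keeps the handler unit-testable without a live LLM:

```python
import json


def make_handler(run_pipeline):
    """Wrap an injected pipeline runner in a Lambda-style handler.

    `run_pipeline` is a hypothetical stand-in: in a real deployment it
    would build the compiled graph and run it to completion for a topic.
    """
    def handler(event, context=None):
        # API Gateway proxy events carry the request payload as a JSON string
        body = json.loads(event.get("body") or "{}")
        topic = body.get("topic", "")
        if not topic:
            return {"statusCode": 400, "body": json.dumps({"error": "topic is required"})}
        final_output = run_pipeline(topic)
        return {"statusCode": 200, "body": json.dumps({"final_output": final_output})}
    return handler
```

Note that Lambda's statelessness also makes human-in-the-loop pauses awkward: resuming from a checkpoint requires a second invocation with the same thread ID.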

ECS Fargate (Recommended for Multi-Agent Pipelines)

Multi-agent document processing often takes 30-120 seconds per run and can pause indefinitely at human-in-the-loop breakpoints, a poor fit for Lambda's cold starts and 15-minute execution cap. ECS Fargate gives predictable performance for long-lived workers.

┌──────────────────────────────────────────────────────┐
│                  AWS DEPLOYMENT                      │
│                                                      │
│  ┌─────────┐    ┌───────────────┐    ┌────────────┐  │
│  │ API GW  │──▶│  ECS Fargate  │───▶│ PostgreSQL │  │
│  │ / ALB   │    │  (LangGraph)  │    │ (checkpts) │  │
│  └─────────┘    └───────┬───────┘    └────────────┘  │
│                         │                            │
│                    ┌────┴─────┐                      │
│                    │  Redis   │                      │
│                    │ (state)  │                      │
│                    └──────────┘                      │
└──────────────────────────────────────────────────────┘
# Dockerfile
# ── Dockerfile ──────────────────────────────────────────
# FROM python:3.11-slim
# WORKDIR /app
# COPY requirements.txt .
# RUN pip install --no-cache-dir -r requirements.txt
# COPY . .
# CMD ["uvicorn", "api:app", "--host", "0.0.0.0", "--port", "8000"]

# api.py — FastAPI wrapper for the graph
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from langgraph.checkpoint.postgres import PostgresSaver
from graph import build_graph
import os

app = FastAPI()


class ProcessRequest(BaseModel):
    topic: str
    thread_id: str
    max_revisions: int = 3


class ProcessResponse(BaseModel):
    thread_id: str
    status: str
    final_output: str | None = None


# Use PostgresSaver in production for durable checkpoints
DB_URI = os.environ["DATABASE_URL"]


def get_graph():
    """Build graph with production-grade Postgres checkpointer."""
    from langgraph.graph import StateGraph, END
    from state import AgentState
    from agents import researcher_node, writer_node, reviewer_node, editor_node

    checkpointer = PostgresSaver.from_conn_string(DB_URI)
    checkpointer.setup()  # creates tables if they don't exist

    graph = StateGraph(AgentState)
    graph.add_node("researcher", researcher_node)
    graph.add_node("writer", writer_node)
    graph.add_node("reviewer", reviewer_node)
    graph.add_node("editor", editor_node)
    graph.set_entry_point("researcher")
    graph.add_edge("researcher", "writer")
    graph.add_edge("writer", "reviewer")

    from graph import route_after_review
    graph.add_conditional_edges("reviewer", route_after_review, {
        "writer": "writer",
        "editor": "editor",
    })
    graph.add_edge("editor", END)

    return graph.compile(checkpointer=checkpointer)


@app.post("/process", response_model=ProcessResponse)
async def process_document(req: ProcessRequest, bg: BackgroundTasks):
    """Kick off async document processing."""
    bg.add_task(run_pipeline, req.topic, req.thread_id, req.max_revisions)
    return ProcessResponse(thread_id=req.thread_id, status="processing")


async def run_pipeline(topic: str, thread_id: str, max_revisions: int):
    compiled = get_graph()
    config = {"configurable": {"thread_id": thread_id}}
    initial = {
        "topic": topic,
        "messages": [],
        "research_notes": "",
        "draft": "",
        "review_score": 0,
        "review_feedback": "",
        "final_output": "",
        "revision_count": 0,
        "max_revisions": max_revisions,
    }
    async for _ in compiled.astream(initial, config=config):
        pass  # events are checkpointed automatically


@app.get("/status/{thread_id}", response_model=ProcessResponse)
async def get_status(thread_id: str):
    """Poll processing status by thread ID."""
    compiled = get_graph()
    config = {"configurable": {"thread_id": thread_id}}
    state = compiled.get_state(config)

    if state.next:
        # state.next is a tuple of pending node names
        return ProcessResponse(thread_id=thread_id, status=f"in_progress (next: {', '.join(state.next)})")

    output = state.values.get("final_output", "")
    return ProcessResponse(
        thread_id=thread_id,
        status="completed" if output else "unknown",
        final_output=output,
    )

Infrastructure as Code (Terraform Snippet)

# ecs.tf — Key resource definitions (simplified)
resource "aws_ecs_service" "langgraph_agents" {
  name            = "langgraph-document-processor"
  cluster         = aws_ecs_cluster.main.id
  task_definition = aws_ecs_task_definition.langgraph.arn
  desired_count   = 2

  capacity_provider_strategy {
    capacity_provider = "FARGATE"
    weight            = 1
  }

  network_configuration {
    subnets         = var.private_subnets
    security_groups = [aws_security_group.ecs.id]
  }
}

resource "aws_ecs_task_definition" "langgraph" {
  family                   = "langgraph-processor"
  requires_compatibilities = ["FARGATE"]
  network_mode             = "awsvpc" # required for Fargate tasks
  cpu                      = 1024    # 1 vCPU
  memory                   = 2048    # 2 GB — sufficient for orchestration

  container_definitions = jsonencode([{
    name  = "langgraph-app"
    image = "${aws_ecr_repository.app.repository_url}:latest"
    portMappings = [{ containerPort = 8000 }]
    environment = [
      { name = "DATABASE_URL", value = var.db_connection_string },
    ]
    # Secrets must go in the `secrets` block (with valueFrom), not
    # `environment` — ECS rejects valueFrom inside environment entries.
    secrets = [
      { name = "OPENAI_API_KEY", valueFrom = var.openai_secret_arn },
    ]
    logConfiguration = {
      logDriver = "awslogs"
      options = {
        "awslogs-group"         = "/ecs/langgraph"
        "awslogs-region"        = var.region
        "awslogs-stream-prefix" = "langgraph" # required when using Fargate
      }
    }
  }])
}

Common Pitfalls

These are the issues that consistently trip people up when building multi-agent systems with LangGraph. Each one comes from patterns that are easy to miss until they break in production.

1. State Reducer Mismatches

The problem: Assigning a new value to messages instead of appending, which silently drops conversation history.

# WRONG — a plain list field means a returned value REPLACES the history
class AgentState(TypedDict):
    messages: list

# RIGHT — the add_messages reducer MERGES returned values into the state
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]

# The node code is identical in both cases:
return {"messages": [HumanMessage(content="new message")]}

The node code looks identical because the behavior lives in the state annotation, not in the node. If you define messages without Annotated[..., add_messages], every return silently replaces the conversation history instead of appending to it.

2. Infinite Revision Loops

The problem: The reviewer consistently scores below the threshold and there's no escape hatch.

The fix: Always cap revisions (as done in route_after_review above) and log when the cap is hit. In production, emit a metric so you can track how often documents exhaust their revision budget.
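To make the escape hatch concrete, here is a minimal sketch of a bounded router. The score threshold and the logging call are illustrative; the real route_after_review appears earlier in the tutorial:

```python
def route_after_review(state: dict) -> str:
    """Route back to the writer until quality passes or the budget is spent."""
    if state["review_score"] >= 8:  # illustrative quality threshold
        return "editor"
    if state["revision_count"] >= state["max_revisions"]:
        # Escape hatch: budget exhausted. In production, emit a metric here
        # instead of printing, so exhaustion rates can be tracked.
        print(f"revision budget exhausted for topic={state.get('topic')!r}")
        return "editor"
    return "writer"
```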

3. Non-Deterministic Test Failures

The problem: Tests pass locally but fail in CI because they hit a real LLM API.

The fix: Mock at the LLM layer, not at the agent layer. Patch agents.llm.invoke so you're still testing your parsing and state-update logic.
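Here is a self-contained sketch of that pattern. The agents module is faked inline so the example runs standalone; in the real project you would simply import agents and patch "agents.llm" the same way (the writer_node body is illustrative, not the tutorial's exact code):

```python
import sys
import types
from unittest.mock import MagicMock, patch

# Inline stand-in for the project's agents.py so this sketch is runnable.
agents = types.ModuleType("agents")
agents.llm = MagicMock()  # normally a real chat model instance
sys.modules["agents"] = agents

def writer_node(state: dict) -> dict:
    # Simplified stand-in for the tutorial's writer agent
    response = agents.llm.invoke(f"Write a draft about: {state['topic']}")
    return {"draft": response.content, "revision_count": state["revision_count"] + 1}

agents.writer_node = writer_node

def test_writer_node_parses_llm_output():
    fake = MagicMock()
    fake.content = "Stub draft"
    # Patch the LLM layer only; the node's state-update logic still runs.
    with patch("agents.llm") as mock_llm:
        mock_llm.invoke.return_value = fake
        result = writer_node({"topic": "LangGraph", "revision_count": 0})
    assert result == {"draft": "Stub draft", "revision_count": 1}

test_writer_node_parses_llm_output()
```

The test never touches the network, so it is deterministic in CI, yet it still verifies how the node parses the response and updates state.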

4. Checkpoint Bloat

The problem: Every intermediate state is checkpointed and for long documents with multiple revisions, the checkpoint store grows fast.

The fix: Configure checkpoint TTLs. With PostgresSaver, add a scheduled cleanup job:

-- Clean up checkpoints older than 7 days.
-- NOTE: table and column names depend on your checkpointer's schema
-- version; verify them against the deployed tables (the stock
-- PostgresSaver schema may not include a created_at column).
DELETE FROM checkpoints
WHERE created_at < NOW() - INTERVAL '7 days';

5. Missing Error Boundaries Between Nodes

The problem: One agent throws an unhandled exception and the entire graph crashes with no state saved.

The fix: Use the retry decorator shown in Step 5. Additionally, add a dedicated error-handler node that the graph can route to on failure, allowing graceful degradation instead of a full crash.
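One way to sketch such a boundary is a wrapper that converts exceptions into state, which a conditional edge can then route to a dedicated error-handler node. The names here are illustrative, and the graph wiring that inspects state["error"] is left out:

```python
def with_error_boundary(node_fn, node_name: str):
    """Wrap a node so failures are recorded in state instead of crashing the graph."""
    def wrapped(state: dict) -> dict:
        try:
            return node_fn(state)
        except Exception as exc:
            # A conditional edge after this node can check state["error"]
            # and route to an "error_handler" node for graceful degradation.
            return {"error": f"{node_name} failed: {exc}"}
    return wrapped
```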

6. Blocking on Human-in-the-Loop

The problem: The graph pauses at an interrupt_before node and the process sits idle consuming resources while waiting for human input.

The fix: Use async execution with the astream API and persist state via PostgresSaver. The process can shut down after checkpointing. When the human approves, a new process resumes from the checkpoint. This is exactly why durable checkpointing matters in production.
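The lifecycle is worth seeing end to end. This toy sketch checkpoints to a JSON file instead of Postgres, purely to illustrate the pause / exit / resume pattern that LangGraph implements for you via PostgresSaver and interrupt_before; every name here is illustrative:

```python
import json
import os
import tempfile

CKPT = os.path.join(tempfile.gettempdir(), "demo_ckpt.json")

def run_until_interrupt(state: dict) -> dict:
    """Run up to the human-review breakpoint, checkpoint, then return."""
    state["draft"] = f"draft of {state['topic']}"
    state["next"] = "human_review"      # paused, awaiting approval
    with open(CKPT, "w") as f:
        json.dump(state, f)             # durable checkpoint
    return state                        # the process can now exit

def resume_after_approval(approved: bool) -> dict:
    """A fresh process loads the checkpoint and finishes the run."""
    with open(CKPT) as f:
        state = json.load(f)
    state["final_output"] = state["draft"] if approved else ""
    state["next"] = None
    return state
```

No process sits idle between the two calls; the checkpoint is the only thing that survives, which is exactly the property the Postgres-backed setup gives you.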


Wrapping Up

The system built in this tutorial covers the full lifecycle of a multi-agent LangGraph application:

  • State design with typed reducers that make data flow explicit
  • Specialized agents that each own a single responsibility
  • Conditional routing with revision loops and bounded retries
  • Human-in-the-loop breakpoints backed by durable checkpoints
  • Three layers of testing — unit, integration and routing logic in isolation
  • Production deployment on ECS Fargate with Postgres-backed state

The complete source code for this tutorial is structured as follows:

langgraph-multi-agent/
├── state.py          # AgentState definition with reducers
├── agents.py         # All four agent node functions
├── graph.py          # Graph construction and conditional routing
├── retry.py          # Retry decorator with exponential backoff
├── validation.py     # Output validation utilities
├── api.py            # FastAPI production wrapper
├── main.py           # Local development runner
├── Dockerfile
├── ecs.tf            # Terraform deployment config
├── requirements.txt
└── tests/
    ├── test_agents.py
    ├── test_graph_integration.py
    └── test_routing.py

The patterns here (state machines for orchestration, bounded loops, explicit error handling, checkpoint-based resumption) generalize well beyond document processing. The same architecture works for code review pipelines, data validation workflows, customer support automation and any scenario where multiple AI specialists need to collaborate with oversight.

Start with the simplest graph that works, add complexity only where the failure modes demand it and always test the routing logic independently from the LLM calls.


Found this useful? Follow for more deep dives into production AI systems, LLM orchestration and applied ML engineering.
