A step-by-step guide to building, testing and deploying a multi-agent document processing pipeline with LangGraph
Multi-agent systems are one of the most powerful patterns in modern AI engineering and one of the easiest to get wrong. A single LLM call is straightforward. Chaining a few together is manageable. But orchestrating four specialized agents that pass state, handle failures, loop on feedback and support human intervention? That requires real architecture.
This tutorial walks through building a production-grade multi-agent document processing system using LangGraph. By the end, you'll have a working pipeline where a Researcher agent gathers information, a Writer drafts content, a Reviewer critiques it and an Editor produces the final output, all orchestrated as a compiled state graph with error handling, human-in-the-loop breakpoints and a clear deployment path.
Let's build it.
Architecture Overview
Before writing any code, let's map out the system. Here's the high-level agent workflow:
┌──────────────────────────────────────────────────────────────┐
│                  DOCUMENT PROCESSING GRAPH                   │
│                                                              │
│  ┌────────┐    ┌────────┐    ┌──────────┐    ┌──────────┐    │
│  │Research│───▶│ Writer │───▶│ Reviewer │───▶│  Editor  │    │
│  │ Agent  │    │ Agent  │◀───│  Agent   │    │  Agent   │    │
│  └────┬───┘    └────────┘    └────┬─────┘    └────┬─────┘    │
│       │                           │               │          │
│       ▼                           ▼               ▼          │
│  ┌─────────┐                ┌────────────┐   ┌──────────┐    │
│  │ Tools:  │                │ HUMAN-IN-  │   │  OUTPUT  │    │
│  │ Search, │                │ THE-LOOP   │   │ (Final)  │    │
│  │ Parse   │                │ BREAKPOINT │   └──────────┘    │
│  └─────────┘                └────────────┘                   │
└──────────────────────────────────────────────────────────────┘
Each agent is a node in a LangGraph StateGraph. Edges define transitions. Conditional edges handle routing logic: for example, sending the Writer's output back for revision if the Reviewer scores it below a threshold.
Here's the state machine in detail:
            ┌───────────────┐
            │     START     │
            └───────┬───────┘
                    │
                    ▼
            ┌───────────────┐
            │  Researcher   │
            │ (gather data) │
            └───────┬───────┘
                    │
                    ▼
            ┌───────────────┐
    ┌──────▶│    Writer     │
    │       │ (draft text)  │
    │       └───────┬───────┘
    │               │
    │               ▼
    │       ┌───────────────┐
    │       │   Reviewer    │──────┐
    │       │(score & notes)│      │
    │       └───────┬───────┘      │
    │               │              │
    │          score < 7?     score >= 7?
    │               │              │
    │               ▼              ▼
    │       ┌──────────────┐  ┌──────────┐
    └───────│  (revision)  │  │  Editor  │
            │  loop back   │  │ (polish) │
            └──────────────┘  └────┬─────┘
                                   │
                                   ▼
                             ┌────────────┐
                             │    END     │
                             └────────────┘
Step 1: Define the Shared State
Every node in a LangGraph graph reads from and writes to a shared state object. Designing this state well is critical: it's the contract between all your agents.
# state.py
from __future__ import annotations

from typing import Annotated, TypedDict

from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages


def replace_value(existing: str, new: str) -> str:
    """Reducer that replaces the old value with the new one."""
    return new


def increment(existing: int, new: int) -> int:
    """Reducer that sums values; useful for counters."""
    return existing + new


class AgentState(TypedDict):
    """Shared state passed between all agent nodes.

    Each field uses an annotated reducer so that node outputs
    are merged correctly into the running state.
    """

    # Conversation-style message history (appended automatically)
    messages: Annotated[list[BaseMessage], add_messages]

    # Structured data passed between stages
    research_notes: Annotated[str, replace_value]
    draft: Annotated[str, replace_value]
    review_score: Annotated[int, replace_value]
    review_feedback: Annotated[str, replace_value]
    final_output: Annotated[str, replace_value]

    # Control flow
    revision_count: Annotated[int, increment]
    max_revisions: Annotated[int, replace_value]
    topic: Annotated[str, replace_value]
A few design decisions worth calling out (the short sketch after this list shows the reducers in action):

- messages uses add_messages — LangGraph's built-in reducer that appends new messages and handles deduplication by ID. This gives every agent access to the full conversation history.
- revision_count uses increment — each time the Writer revises, it emits {"revision_count": 1} and the reducer accumulates.
- Structured fields like draft use replace_value — the latest version always wins.
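To make the merge semantics concrete, here's a tiny illustrative sketch. This is not LangGraph's internal code, just what the reducers above do when a node's partial update is folded into the running state:

# Illustrative only: LangGraph applies the annotated reducers for you
# whenever a node returns a partial update.
from state import increment, replace_value

state = {"revision_count": 2, "draft": "version 3"}
update = {"revision_count": 1, "draft": "version 4"}  # what a node returns

state["revision_count"] = increment(state["revision_count"], update["revision_count"])
state["draft"] = replace_value(state["draft"], update["draft"])

print(state)  # {'revision_count': 3, 'draft': 'version 4'}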
Step 2: Build the Agent Nodes
Each node is a function that takes the current state and returns a partial state update. Let's build all four.
# agents.py
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI

from state import AgentState

llm = ChatOpenAI(model="gpt-4o", temperature=0.3)


# ── Researcher Agent ──────────────────────────────────────────

def researcher_node(state: AgentState) -> dict:
    """Gathers background information on the topic.

    In production, this node would call external tools: search APIs,
    internal databases, document stores. Here we simulate tool use
    by prompting the LLM to produce structured research notes.
    """
    topic = state["topic"]
    sys_prompt = SystemMessage(content=(
        "You are a senior research analyst. Given a topic, produce "
        "detailed, structured research notes with key facts, statistics, "
        "and source references. Output in markdown bullet-point format."
    ))
    user_prompt = HumanMessage(content=f"Research the following topic thoroughly:\n\n{topic}")
    response = llm.invoke([sys_prompt, user_prompt])
    return {
        "research_notes": response.content,
        "messages": [HumanMessage(content=f"[Researcher] Completed research on: {topic}")],
    }


# ── Writer Agent ──────────────────────────────────────────────

def writer_node(state: AgentState) -> dict:
    """Drafts (or revises) the document based on research and feedback.

    On the first pass, it writes from research notes alone. On revision
    passes, it incorporates reviewer feedback to improve the draft.
    """
    research = state["research_notes"]
    feedback = state.get("review_feedback", "")
    current_draft = state.get("draft", "")
    revision = state.get("revision_count", 0)

    if revision > 0 and current_draft:
        # Revision pass — improve existing draft using feedback
        sys_prompt = SystemMessage(content=(
            "You are a professional technical writer. Revise the draft below "
            "based on the reviewer's feedback. Maintain technical accuracy "
            "and improve clarity, structure and completeness."
        ))
        user_prompt = HumanMessage(content=(
            f"## Current Draft\n{current_draft}\n\n"
            f"## Reviewer Feedback\n{feedback}\n\n"
            "Produce an improved version of the full document."
        ))
    else:
        # First pass — write from research notes
        sys_prompt = SystemMessage(content=(
            "You are a professional technical writer. Using the research notes "
            "provided, write a well-structured, detailed document. Use clear "
            "headings, code examples where appropriate and maintain a "
            "professional tone throughout."
        ))
        user_prompt = HumanMessage(content=f"## Research Notes\n{research}")

    response = llm.invoke([sys_prompt, user_prompt])
    return {
        "draft": response.content,
        "revision_count": 1,
        "messages": [HumanMessage(
            content=f"[Writer] {'Revised' if revision > 0 else 'Created'} draft "
                    f"(revision {revision + 1})"
        )],
    }


# ── Reviewer Agent ────────────────────────────────────────────

def reviewer_node(state: AgentState) -> dict:
    """Scores the draft 1-10 and provides actionable feedback.

    The score determines routing: >= 7 proceeds to editing,
    < 7 sends the draft back for revision (up to max_revisions).
    """
    draft = state["draft"]
    sys_prompt = SystemMessage(content=(
        "You are a senior technical editor and reviewer. Evaluate the "
        "document on: accuracy, completeness, clarity, structure and "
        "readability.\n\n"
        "You MUST respond in EXACTLY this format:\n"
        "SCORE: <integer 1-10>\n"
        "FEEDBACK: <detailed, actionable feedback>"
    ))
    user_prompt = HumanMessage(content=f"Review this document:\n\n{draft}")
    response = llm.invoke([sys_prompt, user_prompt])
    content = response.content

    # Parse the structured response
    score = 5  # default if parsing fails
    feedback = content
    for line in content.split("\n"):
        if line.strip().upper().startswith("SCORE:"):
            try:
                score = int(line.split(":")[1].strip())
            except (ValueError, IndexError):
                pass
        if line.strip().upper().startswith("FEEDBACK:"):
            feedback = line.split(":", 1)[1].strip()

    return {
        "review_score": score,
        "review_feedback": feedback,
        "messages": [HumanMessage(
            content=f"[Reviewer] Score: {score}/10 — {feedback[:120]}..."
        )],
    }


# ── Editor Agent ──────────────────────────────────────────────

def editor_node(state: AgentState) -> dict:
    """Final polish pass — grammar, formatting, consistency."""
    draft = state["draft"]
    sys_prompt = SystemMessage(content=(
        "You are a meticulous copy editor. Polish the document for grammar, "
        "punctuation, formatting consistency and readability. Do not change "
        "the substance or meaning. Return the final, publication-ready version."
    ))
    user_prompt = HumanMessage(content=f"Edit and polish:\n\n{draft}")
    response = llm.invoke([sys_prompt, user_prompt])
    return {
        "final_output": response.content,
        "messages": [HumanMessage(content="[Editor] Final version ready.")],
    }
Step 3: Wire Up the Graph with Conditional Routing
Now for the orchestration layer. This is where LangGraph shines: you define the directed graph of agent transitions, including conditional branches.
# graph.py
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END

from agents import editor_node, researcher_node, reviewer_node, writer_node
from state import AgentState

# Maximum revision loops before forcing completion
MAX_REVISIONS = 3


def route_after_review(state: AgentState) -> str:
    """Conditional edge: decide whether to revise or proceed to editing.

    Three possible outcomes:
      1. Score >= 7            → move to editor (good enough)
      2. Score < 7, budget     → loop back to writer (needs work)
      3. Score < 7, no budget  → move to editor anyway (cap reached)
    """
    score = state.get("review_score", 0)
    revisions = state.get("revision_count", 0)
    max_rev = state.get("max_revisions", MAX_REVISIONS)

    if score >= 7:
        return "editor"
    elif revisions < max_rev:
        return "writer"  # loop back for revision
    else:
        return "editor"  # exhausted revision budget


def build_graph():
    """Construct and compile the multi-agent document processing graph."""
    graph = StateGraph(AgentState)

    # ── Register nodes ──────────────────────────────────
    graph.add_node("researcher", researcher_node)
    graph.add_node("writer", writer_node)
    graph.add_node("reviewer", reviewer_node)
    graph.add_node("editor", editor_node)

    # ── Define edges ────────────────────────────────────
    graph.set_entry_point("researcher")
    graph.add_edge("researcher", "writer")
    graph.add_edge("writer", "reviewer")

    # Conditional routing after review
    graph.add_conditional_edges(
        "reviewer",
        route_after_review,
        {
            "writer": "writer",  # revision loop
            "editor": "editor",  # proceed to final edit
        },
    )
    graph.add_edge("editor", END)

    # ── Compile with checkpointing ──────────────────────
    # MemorySaver for development; swap to SqliteSaver or
    # PostgresSaver in production for persistence.
    checkpointer = MemorySaver()
    compiled = graph.compile(
        checkpointer=checkpointer,
        interrupt_before=["editor"],  # human-in-the-loop breakpoint
    )
    return compiled
Here's the message passing pattern between agents through the shared state:
┌──────────────────────────────────────────────────────────────────┐
│                       MESSAGE & STATE FLOW                       │
│                                                                  │
│  Researcher                                                      │
│   ├─ writes: research_notes, messages                            │
│   │                                                              │
│   ▼                                                              │
│  Writer   (reads: research_notes, review_feedback)               │
│   ├─ writes: draft, revision_count, messages                     │
│   │                                                              │
│   ▼                                                              │
│  Reviewer   (reads: draft)                                       │
│   ├─ writes: review_score, review_feedback, messages             │
│   │                                                              │
│   ├──── score < 7 ──▶  Writer (reads updated review_feedback)    │
│   │                                                              │
│   ▼  score >= 7                                                  │
│  Editor   (reads: draft)                                         │
│   ├─ writes: final_output, messages                              │
│   │                                                              │
│   ▼                                                              │
│  END                                                             │
└──────────────────────────────────────────────────────────────────┘
Step 4: Run the Pipeline
# main.py
from graph import build_graph

app = build_graph()

# Initial input state
initial_state = {
    "topic": "Best practices for implementing vector search in PostgreSQL using pgvector",
    "messages": [],
    "research_notes": "",
    "draft": "",
    "review_score": 0,
    "review_feedback": "",
    "final_output": "",
    "revision_count": 0,
    "max_revisions": 3,
}

# Thread config for checkpointing
config = {"configurable": {"thread_id": "doc-processing-001"}}

# Run until the human-in-the-loop breakpoint (before "editor")
print("Starting pipeline...")
for event in app.stream(initial_state, config=config):
    # Each event is a dict with the node name as key
    for node_name, output in event.items():
        if node_name == "__end__":
            continue
        print(f"\n{'='*60}")
        print(f"  Node: {node_name}")
        if "review_score" in output:
            print(f"  Review Score: {output['review_score']}/10")
        if "revision_count" in output:
            print(f"  Revision: #{output['revision_count']}")
        print(f"{'='*60}")

# ── Human-in-the-loop checkpoint ─────────────────────────
# The graph paused before "editor". Inspect the state:
current_state = app.get_state(config)
print(f"\nPaused before: {current_state.next}")
print(f"Current draft preview: {current_state.values['draft'][:200]}...")

# Approve and continue (or modify state before resuming)
approval = input("\nApprove draft for final editing? (yes/no): ")

if approval.lower() == "yes":
    # Resume execution from the breakpoint
    for event in app.stream(None, config=config):
        for node_name, output in event.items():
            if node_name == "__end__":
                continue
            print(f"\n  Node: {node_name} — complete")

    # Retrieve final output
    final_state = app.get_state(config)
    print("\n\nFinal Document:")
    print(final_state.values["final_output"])
else:
    print("Pipeline halted by human reviewer.")
Step 5: Error Handling and Retry Logic
Production systems need to handle LLM API failures, timeouts and malformed responses. Here's a robust retry wrapper that can be applied to any agent node.
# retry.py
import logging
import time
from functools import wraps
from typing import Callable

from langchain_core.exceptions import OutputParserException  # used in the example below

logger = logging.getLogger(__name__)


def with_retry(
    max_attempts: int = 3,
    backoff_base: float = 2.0,
    retryable_exceptions: tuple = (Exception,),
):
    """Decorator that adds retry logic with exponential backoff to agent nodes.

    Usage:
        @with_retry(max_attempts=3)
        def researcher_node(state: AgentState) -> dict:
            ...
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(state: dict) -> dict:
            last_exception = None
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(state)
                except retryable_exceptions as e:
                    last_exception = e
                    if attempt == max_attempts:
                        logger.error(
                            f"Node '{func.__name__}' failed after "
                            f"{max_attempts} attempts: {e}"
                        )
                        # Return a safe fallback state so the graph
                        # can continue or route to an error handler
                        return {
                            "messages": [
                                {
                                    "role": "system",
                                    "content": (
                                        f"[ERROR] {func.__name__} failed: {str(e)}"
                                    ),
                                }
                            ]
                        }
                    wait = backoff_base ** attempt
                    logger.warning(
                        f"Node '{func.__name__}' attempt {attempt} failed: {e}. "
                        f"Retrying in {wait}s..."
                    )
                    time.sleep(wait)
            raise last_exception  # unreachable, but satisfies type checkers
        return wrapper
    return decorator


# Apply to agents:
# @with_retry(max_attempts=3, retryable_exceptions=(TimeoutError, OutputParserException))
# def researcher_node(state: AgentState) -> dict:
#     ...
For the Reviewer specifically, add output validation to handle malformed score parsing:
# validation.py
from state import AgentState


def validate_review_output(state: AgentState) -> AgentState:
    """Post-processing guard for the reviewer node output.

    Ensures review_score is within bounds and feedback is non-empty.
    Call this in the reviewer node before returning state.
    """
    score = state.get("review_score", 5)

    # Clamp score to valid range
    score = max(1, min(10, score))

    # Ensure feedback exists
    feedback = state.get("review_feedback", "")
    if not feedback.strip():
        feedback = "No detailed feedback provided. Defaulting to revision."
        score = min(score, 6)  # force revision if feedback is empty

    return {
        **state,
        "review_score": score,
        "review_feedback": feedback,
    }
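To apply the guard, route the reviewer's parsed result through it before returning. A minimal sketch (the wrapper name reviewer_node_validated is illustrative, not part of the codebase above):

from agents import reviewer_node
from state import AgentState
from validation import validate_review_output


def reviewer_node_validated(state: AgentState) -> dict:
    result = reviewer_node(state)           # parse SCORE/FEEDBACK as before
    return validate_review_output(result)   # clamp score, backfill empty feedback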
Step 6: Testing the Agent System
Testing multi-agent systems requires three layers: unit tests for individual nodes, integration tests for the full graph, and mocked LLM calls throughout so runs are deterministic and CI-friendly.
Unit Testing Individual Agents
# tests/test_agents.py
from unittest.mock import MagicMock, patch

import pytest

from agents import researcher_node, reviewer_node, writer_node
from state import AgentState


@pytest.fixture
def base_state() -> AgentState:
    """Minimal valid state for testing."""
    return {
        "topic": "Unit testing LLM applications",
        "messages": [],
        "research_notes": "",
        "draft": "",
        "review_score": 0,
        "review_feedback": "",
        "final_output": "",
        "revision_count": 0,
        "max_revisions": 3,
    }


class TestResearcherNode:
    @patch("agents.llm")
    def test_produces_research_notes(self, mock_llm, base_state):
        """Researcher should populate research_notes from LLM output."""
        mock_response = MagicMock()
        mock_response.content = "- Fact 1: Testing is essential\n- Fact 2: Mocks help"
        mock_llm.invoke.return_value = mock_response

        result = researcher_node(base_state)

        assert "research_notes" in result
        assert len(result["research_notes"]) > 0
        assert "messages" in result
        mock_llm.invoke.assert_called_once()


class TestReviewerNode:
    @patch("agents.llm")
    def test_parses_high_score(self, mock_llm, base_state):
        """Reviewer should correctly parse a high score."""
        mock_response = MagicMock()
        mock_response.content = "SCORE: 8\nFEEDBACK: Well-structured and thorough."
        mock_llm.invoke.return_value = mock_response
        base_state["draft"] = "Some draft content to review."

        result = reviewer_node(base_state)

        assert result["review_score"] == 8
        assert "Well-structured" in result["review_feedback"]

    @patch("agents.llm")
    def test_handles_malformed_output(self, mock_llm, base_state):
        """Reviewer should default to score 5 on unparseable output."""
        mock_response = MagicMock()
        mock_response.content = "This document is decent overall."
        mock_llm.invoke.return_value = mock_response
        base_state["draft"] = "Some draft content."

        result = reviewer_node(base_state)

        assert result["review_score"] == 5  # default fallback
Integration Testing the Full Graph
# tests/test_graph_integration.py
from unittest.mock import MagicMock, patch

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END

from agents import editor_node, researcher_node, reviewer_node, writer_node
from graph import route_after_review
from state import AgentState


def make_mock_response(content: str) -> MagicMock:
    mock = MagicMock()
    mock.content = content
    return mock


def build_test_graph():
    """Same wiring as graph.build_graph(), but compiled without
    interrupt_before so tests run to completion unattended."""
    graph = StateGraph(AgentState)
    graph.add_node("researcher", researcher_node)
    graph.add_node("writer", writer_node)
    graph.add_node("reviewer", reviewer_node)
    graph.add_node("editor", editor_node)
    graph.set_entry_point("researcher")
    graph.add_edge("researcher", "writer")
    graph.add_edge("writer", "reviewer")
    graph.add_conditional_edges("reviewer", route_after_review, {
        "writer": "writer",
        "editor": "editor",
    })
    graph.add_edge("editor", END)
    return graph.compile(checkpointer=MemorySaver())


def make_initial_state() -> dict:
    return {
        "topic": "Test topic",
        "messages": [],
        "research_notes": "",
        "draft": "",
        "review_score": 0,
        "review_feedback": "",
        "final_output": "",
        "revision_count": 0,
        "max_revisions": 3,
    }


class TestFullGraphExecution:
    @patch("agents.llm")
    def test_happy_path_completes(self, mock_llm):
        """Graph should reach END when reviewer scores >= 7."""
        # Sequence: researcher → writer → reviewer (score 8) → editor
        mock_llm.invoke.side_effect = [
            make_mock_response("Research notes here"),       # researcher
            make_mock_response("Draft document here"),       # writer
            make_mock_response("SCORE: 8\nFEEDBACK: Good"),  # reviewer
            make_mock_response("Final polished document"),   # editor
        ]

        app = build_test_graph()
        config = {"configurable": {"thread_id": "test-001"}}
        events = list(app.stream(make_initial_state(), config=config))

        # Verify the pipeline reached the editor
        final_state = app.get_state(config)
        assert final_state.values["final_output"] == "Final polished document"

    @patch("agents.llm")
    def test_revision_loop_triggers(self, mock_llm):
        """Graph should loop back to writer when score < 7."""
        mock_llm.invoke.side_effect = [
            make_mock_response("Research notes"),                   # researcher
            make_mock_response("Weak first draft"),                 # writer (pass 1)
            make_mock_response("SCORE: 4\nFEEDBACK: Needs work"),   # reviewer (fail)
            make_mock_response("Improved second draft"),            # writer (pass 2)
            make_mock_response("SCORE: 8\nFEEDBACK: Much better"),  # reviewer (pass)
            make_mock_response("Final polished version"),           # editor
        ]

        app = build_test_graph()
        config = {"configurable": {"thread_id": "test-002"}}
        events = list(app.stream(make_initial_state(), config=config))

        final = app.get_state(config)
        # Writer ran twice (initial + revision), so the increment reducer sums to 2
        assert final.values["revision_count"] == 2
        assert final.values["final_output"] == "Final polished version"
Testing the Routing Logic in Isolation
# tests/test_routing.py
from graph import route_after_review


def test_high_score_routes_to_editor():
    state = {"review_score": 8, "revision_count": 1, "max_revisions": 3}
    assert route_after_review(state) == "editor"


def test_low_score_routes_to_writer():
    state = {"review_score": 4, "revision_count": 1, "max_revisions": 3}
    assert route_after_review(state) == "writer"


def test_low_score_at_max_revisions_routes_to_editor():
    state = {"review_score": 3, "revision_count": 3, "max_revisions": 3}
    assert route_after_review(state) == "editor"


def test_boundary_score_of_seven_routes_to_editor():
    state = {"review_score": 7, "revision_count": 0, "max_revisions": 3}
    assert route_after_review(state) == "editor"
Step 7: Deployment on AWS
For production deployment, the two most practical options are ECS Fargate for long-running pipelines and Lambda for lightweight, event-triggered runs.
ECS Fargate (Recommended for Multi-Agent Pipelines)
Multi-agent document processing often takes 30-120 seconds per run, and revision loops can stretch that further, which makes Lambda's cold starts and 15-minute hard timeout a poor fit. ECS Fargate gives predictable performance for long-running work.
┌──────────────────────────────────────────────────────┐
│                    AWS DEPLOYMENT                    │
│                                                      │
│  ┌─────────┐   ┌───────────────┐    ┌────────────┐   │
│  │ API GW  │──▶│  ECS Fargate  │───▶│ PostgreSQL │   │
│  │  / ALB  │   │  (LangGraph)  │    │ (checkpts) │   │
│  └─────────┘   └───────┬───────┘    └────────────┘   │
│                        │                             │
│                   ┌────┴─────┐                       │
│                   │  Redis   │                       │
│                   │ (state)  │                       │
│                   └──────────┘                       │
└──────────────────────────────────────────────────────┘
# ── Dockerfile ──────────────────────────────────────────
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["uvicorn", "api:app", "--host", "0.0.0.0", "--port", "8000"]
# api.py — FastAPI wrapper for the graph
import os

from fastapi import BackgroundTasks, FastAPI
from pydantic import BaseModel

from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.graph import StateGraph, END

from agents import editor_node, researcher_node, reviewer_node, writer_node
from graph import route_after_review
from state import AgentState

app = FastAPI()


class ProcessRequest(BaseModel):
    topic: str
    thread_id: str
    max_revisions: int = 3


class ProcessResponse(BaseModel):
    thread_id: str
    status: str
    final_output: str | None = None


# Use PostgresSaver in production for durable checkpoints
DB_URI = os.environ["DATABASE_URL"]


def get_graph():
    """Build the graph with a production-grade Postgres checkpointer."""
    checkpointer = PostgresSaver.from_conn_string(DB_URI)
    checkpointer.setup()  # creates tables if they don't exist

    graph = StateGraph(AgentState)
    graph.add_node("researcher", researcher_node)
    graph.add_node("writer", writer_node)
    graph.add_node("reviewer", reviewer_node)
    graph.add_node("editor", editor_node)
    graph.set_entry_point("researcher")
    graph.add_edge("researcher", "writer")
    graph.add_edge("writer", "reviewer")
    graph.add_conditional_edges("reviewer", route_after_review, {
        "writer": "writer",
        "editor": "editor",
    })
    graph.add_edge("editor", END)
    return graph.compile(checkpointer=checkpointer)


@app.post("/process", response_model=ProcessResponse)
async def process_document(req: ProcessRequest, bg: BackgroundTasks):
    """Kick off async document processing."""
    bg.add_task(run_pipeline, req.topic, req.thread_id, req.max_revisions)
    return ProcessResponse(thread_id=req.thread_id, status="processing")


async def run_pipeline(topic: str, thread_id: str, max_revisions: int):
    compiled = get_graph()
    config = {"configurable": {"thread_id": thread_id}}
    initial = {
        "topic": topic,
        "messages": [],
        "research_notes": "",
        "draft": "",
        "review_score": 0,
        "review_feedback": "",
        "final_output": "",
        "revision_count": 0,
        "max_revisions": max_revisions,
    }
    async for _ in compiled.astream(initial, config=config):
        pass  # events are checkpointed automatically


@app.get("/status/{thread_id}", response_model=ProcessResponse)
async def get_status(thread_id: str):
    """Poll processing status by thread ID."""
    compiled = get_graph()
    config = {"configurable": {"thread_id": thread_id}}
    state = compiled.get_state(config)

    if state.next:
        return ProcessResponse(thread_id=thread_id, status=f"in_progress (next: {state.next})")

    output = state.values.get("final_output", "")
    return ProcessResponse(
        thread_id=thread_id,
        status="completed" if output else "unknown",
        final_output=output,
    )
Infrastructure as Code (Terraform Snippet)
# ecs.tf — Key resource definitions (simplified)
resource "aws_ecs_service" "langgraph_agents" {
  name            = "langgraph-document-processor"
  cluster         = aws_ecs_cluster.main.id
  task_definition = aws_ecs_task_definition.langgraph.arn
  desired_count   = 2

  capacity_provider_strategy {
    capacity_provider = "FARGATE"
    weight            = 1
  }

  network_configuration {
    subnets         = var.private_subnets
    security_groups = [aws_security_group.ecs.id]
  }
}

resource "aws_ecs_task_definition" "langgraph" {
  family                   = "langgraph-processor"
  requires_compatibilities = ["FARGATE"]
  cpu                      = 1024  # 1 vCPU
  memory                   = 2048  # 2 GB — sufficient for orchestration

  container_definitions = jsonencode([{
    name         = "langgraph-app"
    image        = "${aws_ecr_repository.app.repository_url}:latest"
    portMappings = [{ containerPort = 8000 }]

    environment = [
      { name = "DATABASE_URL", value = var.db_connection_string },
    ]
    # Entries with valueFrom belong in "secrets", not "environment"
    secrets = [
      { name = "OPENAI_API_KEY", valueFrom = var.openai_secret_arn },
    ]

    logConfiguration = {
      logDriver = "awslogs"
      options = {
        "awslogs-group"         = "/ecs/langgraph"
        "awslogs-region"        = var.region
        "awslogs-stream-prefix" = "langgraph"  # required for Fargate tasks
      }
    }
  }])
}
Common Pitfalls
These are the issues that consistently trip people up when building multi-agent systems with LangGraph. Each one comes from patterns that are easy to miss until they break in production.
1. State Reducer Mismatches
The problem: Assigning a new value to messages instead of appending, which silently drops conversation history.
# WRONG — this replaces the entire message list
return {"messages": [HumanMessage(content="new message")]}
# RIGHT — add_messages reducer handles append/dedup automatically
# The list you return gets MERGED into the existing messages
# This works correctly because of the add_messages annotation
return {"messages": [HumanMessage(content="new message")]}
The code looks identical because the behavior depends on the reducer. If you accidentally define messages without Annotated[..., add_messages], the second snippet silently replaces instead of appending.
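To make the failure mode explicit, here are the two state definitions side by side (a minimal sketch; the names BrokenState and CorrectState are illustrative):

from typing import Annotated, TypedDict

from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages


class BrokenState(TypedDict):
    # No reducer: each node's returned list OVERWRITES the history
    messages: list[BaseMessage]


class CorrectState(TypedDict):
    # add_messages reducer: returned lists are appended and deduped by ID
    messages: Annotated[list[BaseMessage], add_messages]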
2. Infinite Revision Loops
The problem: The reviewer consistently scores below the threshold and there's no escape hatch.
The fix: Always cap revisions (as done in route_after_review above) and log when the cap is hit. In production, emit a metric so you can track how often documents exhaust their revision budget.
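A minimal sketch of that instrumentation, extending route_after_review from Step 3 (the function name and logger setup are assumptions; swap the warning for a call to your metrics client if you have one):

import logging

from graph import MAX_REVISIONS
from state import AgentState

logger = logging.getLogger(__name__)


def route_after_review_instrumented(state: AgentState) -> str:
    score = state.get("review_score", 0)
    revisions = state.get("revision_count", 0)
    max_rev = state.get("max_revisions", MAX_REVISIONS)
    if score >= 7:
        return "editor"
    if revisions < max_rev:
        return "writer"
    # Escape hatch hit: log it so you can alert on how often this happens
    logger.warning("Revision budget exhausted (score=%s); forcing editor", score)
    return "editor"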
3. Non-Deterministic Test Failures
The problem: Tests pass locally but fail in CI because they hit a real LLM API.
The fix: Mock at the LLM layer, not at the agent layer. Patch agents.llm.invoke so you're still testing your parsing and state-update logic.
4. Checkpoint Bloat
The problem: Every intermediate state is checkpointed, and for long documents with multiple revisions the checkpoint store grows fast.
The fix: Configure checkpoint TTLs. With PostgresSaver, add a scheduled cleanup job:
-- Clean up checkpoints older than 7 days
DELETE FROM checkpoints
WHERE created_at < NOW() - INTERVAL '7 days';
5. Missing Error Boundaries Between Nodes
The problem: One agent throws an unhandled exception and the entire graph crashes with no state saved.
The fix: Use the retry decorator shown in Step 5. Additionally, add a dedicated error-handler node that the graph can route to on failure, allowing graceful degradation instead of a full crash.
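Here's a sketch of such an error-handler node. It assumes failed nodes leave an [ERROR] marker in messages, as the retry decorator's fallback above does; the node and router names are illustrative:

from langchain_core.messages import HumanMessage

from state import AgentState


def error_handler_node(state: AgentState) -> dict:
    """Degrade gracefully: surface whatever partial result exists."""
    return {
        "final_output": (
            "Processing failed before completion. Last good draft:\n\n"
            + (state.get("draft") or "(no draft produced)")
        ),
        "messages": [HumanMessage(content="[ErrorHandler] Emitted partial output.")],
    }


# Wiring sketch (alongside the edges in build_graph). route_on_error is a
# hypothetical router that would inspect messages for an [ERROR] marker:
#   graph.add_node("error_handler", error_handler_node)
#   graph.add_conditional_edges("writer", route_on_error,
#                               {"ok": "reviewer", "error": "error_handler"})
#   graph.add_edge("error_handler", END)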
6. Blocking on Human-in-the-Loop
The problem: The graph pauses at an interrupt_before node and the process sits idle consuming resources while waiting for human input.
The fix: Use async execution with the astream API and persist state via PostgresSaver. The process can shut down after checkpointing. When the human approves, a new process resumes from the checkpoint. This is exactly why durable checkpointing matters in production.
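Concretely, the FastAPI wrapper from Step 7 can expose a resume endpoint. The route path and response shape below are assumptions layered on top of api.py, not part of the original service:

# Sketch: resume a thread paused at the interrupt_before=["editor"] breakpoint.
@app.post("/approve/{thread_id}")
async def approve(thread_id: str, bg: BackgroundTasks):
    config = {"configurable": {"thread_id": thread_id}}

    async def _resume():
        compiled = get_graph()
        # Streaming with input=None resumes from the persisted checkpoint
        async for _ in compiled.astream(None, config=config):
            pass

    bg.add_task(_resume)
    return {"thread_id": thread_id, "status": "resuming"}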
Wrapping Up
The system built in this tutorial covers the full lifecycle of a multi-agent LangGraph application:
- State design with typed reducers that make data flow explicit
- Specialized agents that each own a single responsibility
- Conditional routing with revision loops and bounded retries
- Human-in-the-loop breakpoints backed by durable checkpoints
- Three layers of testing — unit, integration and routing logic in isolation
- Production deployment on ECS Fargate with Postgres-backed state
The complete source code for this tutorial is structured as follows:
langgraph-multi-agent/
├── state.py              # AgentState definition with reducers
├── agents.py             # All four agent node functions
├── graph.py              # Graph construction and conditional routing
├── retry.py              # Retry decorator with exponential backoff
├── validation.py         # Output validation utilities
├── api.py                # FastAPI production wrapper
├── main.py               # Local development runner
├── Dockerfile
├── ecs.tf                # Terraform deployment config
├── requirements.txt
└── tests/
    ├── test_agents.py
    ├── test_graph_integration.py
    └── test_routing.py
The patterns here (state machines for orchestration, bounded loops, explicit error handling, checkpoint-based resumption) generalize well beyond document processing. The same architecture works for code review pipelines, data validation workflows, customer support automation and any scenario where multiple AI specialists need to collaborate with oversight.
Start with the simplest graph that works, add complexity only where the failure modes demand it, and always test the routing logic independently from the LLM calls.
Found this useful? Follow for more deep dives into production AI systems, LLM orchestration and applied ML engineering.