⚡ TL;DR
The Problem: LangGraph's @task decorator promises durable execution — caching results so non-deterministic code doesn't re-run after interrupt(). It works locally. It silently breaks when deployed via LangGraph API server.
Why: API server injects checkpointer at runtime. Node-level checkpointing uses this runtime checkpointer ✅. Task-level checkpointing looks for a compile-time checkpointer (which is None) ❌. Your tasks re-execute. No errors. Just inconsistent data.
The Fix: Don't use @task inside StateGraph nodes when deploying to API server. Use separate nodes instead:
# ❌ BROKEN: @task inside a node with interrupt
def my_node(state):
    result = my_task().result()  # Re-executes on resume!
    user_input = interrupt("Confirm?")
    return {"result": result}

# ✅ WORKS: Separate nodes
def compute_node(state):
    return {"result": non_deterministic_operation()}  # Checkpointed after node completes

def approval_node(state):
    user_input = interrupt(f"Confirm {state['result']}?")  # Safe — reads from checkpoint
    return {"confirmed": user_input}
Issues Filed: Bug #6559 | Related: #5790
Want to understand WHY this happens? Keep reading — I'll walk you through the bug, the source code investigation, and all the workarounds.
🎰 The Setup: A Simple Betting Agent
Let's build something simple — a sports betting agent that recommends a player and asks for human approval before placing the bet.
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict
import random

class State(TypedDict):
    recommendation: str
    confirmed: bool

def betting_node(state):
    # Fetch live stats (simulated - changes every call)
    messi_score = random.randint(1, 5)
    ronaldo_score = random.randint(1, 5)

    # Make recommendation based on current stats
    if messi_score > ronaldo_score:
        recommended = "Messi"
    else:
        recommended = "Ronaldo"

    print(f"Stats: Messi={messi_score}, Ronaldo={ronaldo_score}")
    print(f"Recommending: {recommended}")

    # Pause for human approval
    user_input = interrupt(f"Bet on {recommended}?")
    return {"recommendation": recommended, "confirmed": user_input == "yes"}

# Build the graph
builder = StateGraph(State)
builder.add_node("bet", betting_node)
builder.add_edge(START, "bet")
builder.add_edge("bet", END)

graph = builder.compile(checkpointer=MemorySaver())
Looks reasonable, right? Fetch stats, recommend a player, wait for human approval, done.
Let's run it:
config = {"configurable": {"thread_id": "bet-1"}}
# First invocation — hits interrupt, pauses
result = graph.invoke({"recommendation": "", "confirmed": False}, config)
# Output: Stats: Messi=4, Ronaldo=2
# Output: Recommending: Messi
# Workflow pauses at interrupt()
# User sees "Bet on Messi?" and approves
result = graph.invoke(Command(resume="yes"), config)
# Output: Stats: Messi=1, Ronaldo=5 ← WAIT, WHAT?
# Output: Recommending: Ronaldo ← THIS CHANGED!
What just happened?
The user approved a bet on Messi (score 4 vs 2). But when the workflow resumed, it fetched new stats where Ronaldo is winning (5 vs 1). The final recommendation flipped.
Your user said "yes" to Messi. Your system bet on... whoever was winning at resume time.
This isn't a bug — it's how LangGraph works. When a node contains an interrupt(), the entire node re-executes on resume. Any non-deterministic code before the interrupt runs again.
🤔 The Problem: Node Re-execution
Here's what happens under the hood:
FIRST INVOCATION:
├── betting_node starts
├── Fetch stats → Messi=4, Ronaldo=2
├── Recommend Messi
├── interrupt() → PAUSE
└── State saved: waiting for human input
RESUME (after user approves):
├── betting_node starts AGAIN ← entire node re-runs!
├── Fetch stats → Messi=1, Ronaldo=5 ← different!
├── Recommend Ronaldo ← changed!
├── interrupt() → returns "yes" (from user)
└── Return: recommendation=Ronaldo, confirmed=True
The checkpointer saves state between nodes, not the execution progress within a node. Everything before interrupt() runs twice.
For idempotent operations (same input → same output), this is fine. But random.randint(), datetime.now(), API calls to live data? They'll return different values.
So how do we fix this?
📖 The Documented Solution: @task for Durable Execution
If you search for this problem, you'll land on LangGraph's Durable Execution documentation. It introduces the @task decorator — designed exactly for this scenario.
The docs say:
"To utilize features like human-in-the-loop, any randomness should be encapsulated inside of tasks."
"If a node contains multiple operations, you may find it easier to convert each operation into a task rather than refactor the operations into individual nodes."
Perfect! Let's wrap our non-deterministic code in a @task:
from langgraph.func import task
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt
from langgraph.checkpoint.memory import MemorySaver
import random

@task
def fetch_live_stats():
    """Now wrapped in @task — should be cached on resume"""
    messi = random.randint(1, 5)
    ronaldo = random.randint(1, 5)
    print(f">>> TASK EXECUTING: Messi={messi}, Ronaldo={ronaldo}")
    return {"messi": messi, "ronaldo": ronaldo}

def betting_node(state):
    # Fetch stats — @task should cache this result
    stats = fetch_live_stats().result()

    # Make recommendation
    if stats["messi"] > stats["ronaldo"]:
        recommended = "Messi"
    else:
        recommended = "Ronaldo"

    print(f"Recommending: {recommended}")

    # Pause for human approval
    user_input = interrupt(f"Bet on {recommended}?")
    return {"recommendation": recommended, "confirmed": user_input == "yes"}

# Build graph with checkpointer (reusing the State class from earlier)
builder = StateGraph(State)
builder.add_node("bet", betting_node)
builder.add_edge(START, "bet")
builder.add_edge("bet", END)

graph = builder.compile(checkpointer=MemorySaver())
The idea is simple:
- @task marks fetch_live_stats() as a durable operation
- First execution: the task runs and its result is checkpointed
- Resume: the task result is loaded from the checkpoint, no re-execution
Let's test it:
config = {"configurable": {"thread_id": "bet-2"}}
# First invocation
result = graph.invoke({"recommendation": "", "confirmed": False}, config)
# Output: >>> TASK EXECUTING: Messi=4, Ronaldo=2
# Output: Recommending: Messi
# Pauses at interrupt()
# Resume after approval
result = graph.invoke(Command(resume="yes"), config)
# Output: Recommending: Messi ← No "TASK EXECUTING" log!
# The task didn't re-run. It used cached result.
It works! The task executed once, the result was cached, and on resume we got consistent data. Messi was recommended, user approved Messi, bet placed on Messi.
This is durable execution in action. Exactly what the docs promised.
Ship it to production, right?
🚀 Deploying to LangGraph API Server
Your code works locally. Time to deploy. You set up langgraph.json, configure your graph, and deploy to LangGraph API server (or run langgraph dev for local testing).
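For reference, a minimal langgraph.json looks something like this; the module path and graph name below are placeholders for your own project layout:

{
  "dependencies": ["."],
  "graphs": {
    "betting_agent": "./betting_agent.py:graph"
  },
  "env": ".env"
}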
But wait — when you try to compile with a checkpointer:
graph = builder.compile(checkpointer=MemorySaver())
The API server throws an error:
ValueError: Heads up! Your graph includes a custom checkpointer.
With LangGraph API, persistence is handled automatically by the
platform, so providing a custom checkpointer here isn't necessary
and will be ignored when deployed. To simplify your setup, please
remove the custom checkpointer from your graph definition.
Okay, fair enough. The docs also mention this:
"When using the LangGraph API, you don't need to implement or configure checkpointers manually — persistence is handled automatically by the platform."
So you remove the checkpointer:
graph = builder.compile() # No checkpointer — API server handles it
Deploy. Test the workflow.
First invocation:
>>> TASK EXECUTING: Messi=4, Ronaldo=2
Recommending: Messi
# Pauses for approval
Resume:
>>> TASK EXECUTING: Messi=1, Ronaldo=5 ← WAIT, IT RAN AGAIN?!
Recommending: Ronaldo ← INCONSISTENT!
The task re-executed. We're back to the original problem.
No errors. No warnings. Just silently broken durable execution.
🔍 Down the Rabbit Hole: What's Actually Happening?
At this point, I was confused. The @task decorator clearly works — I saw it work locally. But deployed via API server? Silent failure.
Let me summarize what we know:
| Scenario | Task Caching | Result |
|---|---|---|
| compile(checkpointer=MemorySaver()) + local | ✅ Works | Consistent |
| compile() + API server runtime injection | ❌ Fails | Task re-executes |
| compile(checkpointer=...) + API server | 🚫 Error | Server rejects it |
The API server claims to handle persistence automatically. And it does — for node-level state. If you add a node between your non-deterministic code and the interrupt, that works fine.
But @task? Something's different.
Time to read some source code.
🔬 Source Code Dive
How @task Works
First, let's understand what @task actually does. In langgraph/func/__init__.py:
class _TaskFunction(Generic[P, T]):
    def __init__(self, func, *, retry_policy, cache_policy, name):
        self.func = func
        self.retry_policy = retry_policy
        self.cache_policy = cache_policy
        # That's it. No checkpointer stored here.
The @task decorator wraps your function but stores no reference to any checkpointer. It just holds the function and some policies.
So where does checkpointing happen?
How Task Results Get Cached
In langgraph/pregel/_runner.py, there's a _call() function that executes tasks:
def _call(task, func, input, *, schedule_task, ...):
    # Check if task was previously executed
    if next_task := schedule_task(task(), counter, Call(...)):
        if next_task.writes:  # Previous result exists!
            # Return cached result — NO re-execution
            ret = next((v for c, v in next_task.writes if c == RETURN), MISSING)
            fut.set_result(ret)
            return fut
        else:
            # No cached result — execute the task
            fut = submit()(run_with_retry, next_task, ...)
The key is schedule_task(). It checks for previous task results (next_task.writes). If found, it returns the cached value. If not, the task runs again.
But where does schedule_task look for cached results?
The Critical Discovery
Digging deeper, schedule_task loads previous writes from Pregel.checkpointer — the checkpointer passed at compile time.
class Pregel:
    def __init__(self, *, checkpointer=None, ...):
        self.checkpointer = checkpointer  # Set at compile time
When you do builder.compile(checkpointer=MemorySaver()):
- Pregel.checkpointer = MemorySaver() ✅
- schedule_task can load previous task writes ✅
- Task caching works ✅

When you do builder.compile() for the API server:

- Pregel.checkpointer = None ❌
- The API server injects a checkpointer later, at runtime
- But schedule_task still looks at Pregel.checkpointer, which is None
- No cached results found → the task re-executes ❌
The runtime-injected checkpointer is invisible to schedule_task.
🧠 The Root Cause: Compile-Time vs Runtime
Here's the asymmetry I discovered:
| Operation | Where It Looks | Runtime Injection Works? |
|---|---|---|
| Node state SAVE | Runtime config | ✅ Yes |
| Node state LOAD | Runtime config | ✅ Yes |
| Task result SAVE | Possibly runtime | ⚠️ Maybe |
| Task result LOAD | Pregel.checkpointer (compile-time) | ❌ No |
The API server's runtime checkpointer works perfectly for node-level checkpointing:
- State saved between nodes ✅
- Resume from correct node ✅
- Time-travel debugging ✅
But task-level checkpointing requires the checkpointer at compile time because schedule_task is bound to Pregel.checkpointer before the runtime config exists.
┌─────────────────────────────────────────────────────────────────┐
│ COMPILE TIME │
│ graph = builder.compile() │
│ └── Pregel.checkpointer = None │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ RUNTIME (API Server) │
│ Injects checkpointer via config │
│ └── Node checkpointing: ✅ Uses runtime config │
│ └── Task checkpointing: ❌ Still looks at Pregel.checkpointer │
│ (which is None) │
└─────────────────────────────────────────────────────────────────┘
This is why your tasks re-execute. The load mechanism can't see the runtime checkpointer.
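You can see the compile-time half of this for yourself: the compiled graph is a Pregel instance, and the attribute that task caching consults is simply None when you compile for the API server. A quick check, reusing the builder and the MemorySaver import from earlier:

graph_local = builder.compile(checkpointer=MemorySaver())
graph_server = builder.compile()   # the version you'd ship to the API server

print(graph_local.checkpointer)    # a MemorySaver instance: schedule_task can find cached task writes
print(graph_server.checkpointer)   # None: schedule_task has nowhere to look, so tasks re-run on resume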
✅ The Fix: Patterns That Actually Work
Now that we understand why @task fails with API server, let's look at patterns that actually work in production.
Pattern 1: Separate Nodes (Recommended)
The most reliable solution: split your non-deterministic operation into its own node.
Node-level checkpointing works with runtime injection. So if the non-deterministic code is in a separate node that completes before the HITL node, its output is safely checkpointed.
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt
from typing import TypedDict
import random

class State(TypedDict):
    recommendation: str
    confirmed: bool

def fetch_stats_node(state):
    """Separate node for non-deterministic operation"""
    messi = random.randint(1, 5)
    ronaldo = random.randint(1, 5)
    print(f">>> FETCHING STATS: Messi={messi}, Ronaldo={ronaldo}")

    recommended = "Messi" if messi > ronaldo else "Ronaldo"
    return {"recommendation": recommended}

# ✅ State checkpointed after fetch_stats_node completes — before the HITL node

def approval_node(state):
    """HITL node — only reads from checkpointed state"""
    recommended = state["recommendation"]  # Safe — from checkpoint
    user_input = interrupt(f"Bet on {recommended}?")
    return {"confirmed": user_input == "yes"}

# Build graph
builder = StateGraph(State)
builder.add_node("fetch", fetch_stats_node)
builder.add_node("approve", approval_node)
builder.add_edge(START, "fetch")
builder.add_edge("fetch", "approve")
builder.add_edge("approve", END)

graph = builder.compile()  # No checkpointer needed — API server handles it
Why it works:
FIRST INVOCATION:
├── fetch_stats_node executes
│ └── Messi=4, Ronaldo=2 → recommendation="Messi"
├── ✅ STATE CHECKPOINTED: {recommendation: "Messi"}
├── approval_node starts
│ └── interrupt() → PAUSE
└── Waiting for human input
RESUME:
├── fetch_stats_node SKIPPED (already completed)
├── approval_node resumes
│ └── Reads state["recommendation"] → "Messi" (from checkpoint)
│ └── interrupt() returns "yes"
└── Return: {recommendation: "Messi", confirmed: True}
The non-deterministic code never re-runs because the node completed and state was checkpointed.
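If you want to sanity-check this pattern before deploying, you can compile the same builder locally with an in-memory checkpointer (local testing only; the API server still injects its own) and walk through the pause/resume cycle:

from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Command

local_graph = builder.compile(checkpointer=MemorySaver())  # local testing only
config = {"configurable": {"thread_id": "bet-3"}}

local_graph.invoke({"recommendation": "", "confirmed": False}, config)
# >>> FETCHING STATS: ...   runs once, then pauses at the interrupt in approval_node

result = local_graph.invoke(Command(resume="yes"), config)
# No second ">>> FETCHING STATS" line: the recommendation is read from the checkpoint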
Trade-off: More verbose graph structure. But explicit is better than broken.
Pattern 2: Idempotent Code Before Interrupt
If you must keep everything in one node, ensure all code before interrupt() is idempotent — same input always produces same output.
import hashlib

def betting_node(state):
    query = state["query"]  # e.g., "compare messi ronaldo"

    # ✅ Idempotent: same query → same recommendation
    recommendation = deterministic_recommend(query)

    # ✅ Idempotent: stable hash-based selection
    # (The built-in hash() is salted per process for strings, so it isn't reproducible
    #  if the workflow resumes in a different worker; hashlib is stable.)
    player = ["Messi", "Ronaldo"][int(hashlib.sha256(query.encode()).hexdigest(), 16) % 2]

    # ❌ NOT idempotent — avoid these before interrupt:
    # - random.choice()
    # - datetime.now()
    # - External API calls (live data)
    # - Database queries (mutable data)

    user_input = interrupt(f"Bet on {recommendation}?")
    return {"recommendation": recommendation, "confirmed": user_input == "yes"}
When to use: When your logic can be made deterministic based on input state.
Trade-off: Severely limits what you can do. No live data, no randomness.
Pattern 3: HITL First
If you need user input before doing non-deterministic work, flip the order:
def betting_node(state):
    # HITL first — nothing before it to re-execute
    player = interrupt("Who do you want to bet on? (Messi/Ronaldo)")

    # Non-deterministic code AFTER the interrupt is fine:
    # it only runs once, after the user provides input
    live_odds = fetch_live_odds(player)  # Safe here
    return {"player": player, "odds": live_odds}
Why it works: On resume, code before interrupt() re-runs. If there's nothing there, nothing breaks.
When to use: When user input drives the non-deterministic operation.
Trade-off: You can't compute a recommendation before asking the user.
Pattern 4: Pure Functional API
The @task decorator was designed to work with @entrypoint — the Functional API. Unlike StateGraph, @entrypoint accepts a compile-time checkpointer that you control.
from langgraph.func import task, entrypoint
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import interrupt
import random

@task
def fetch_live_stats():
    """Task result will be cached properly"""
    messi = random.randint(1, 5)
    ronaldo = random.randint(1, 5)
    print(f">>> TASK EXECUTING: Messi={messi}, Ronaldo={ronaldo}")
    return {"messi": messi, "ronaldo": ronaldo}

@entrypoint(checkpointer=MemorySaver())  # Compile-time checkpointer!
def betting_workflow(query: str):
    stats = fetch_live_stats().result()  # Cached on resume ✅
    recommended = "Messi" if stats["messi"] > stats["ronaldo"] else "Ronaldo"

    user_input = interrupt(f"Bet on {recommended}?")
    return {"recommendation": recommended, "confirmed": user_input == "yes"}
Why it works: @entrypoint passes checkpointer to Pregel at decoration time. The checkpointer exists before runtime, so schedule_task can find cached task results.
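Here's a quick local usage sketch (the thread id is arbitrary): the first invoke runs the task and pauses at the interrupt; the resume replays the cached task result instead of re-running it.

from langgraph.types import Command

config = {"configurable": {"thread_id": "bet-4"}}

betting_workflow.invoke("compare messi ronaldo", config)
# >>> TASK EXECUTING: ...   runs once, then pauses at the interrupt

result = betting_workflow.invoke(Command(resume="yes"), config)
# No second ">>> TASK EXECUTING" line: the cached task result is replayed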
Trade-off:
- Different mental model than StateGraph
- Less granular time-travel debugging (no node-by-node replay)
- May not work with API server deployment (same limitation applies)
Pattern 5: Custom Deployment (Full Control)
If you need @task inside StateGraph nodes and need deployment, skip the managed API server. Build your own FastAPI wrapper:
from fastapi import FastAPI
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.types import Command

# Compile with YOUR checkpointer
# (Depending on your langgraph-checkpoint-postgres version, from_conn_string() may be a
#  context manager; if so, enter it during app startup and call checkpointer.setup() once
#  to create the checkpoint tables.)
checkpointer = PostgresSaver.from_conn_string("postgresql://...")
graph = builder.compile(checkpointer=checkpointer)

app = FastAPI()

@app.post("/invoke")
async def invoke(thread_id: str, input: dict):
    config = {"configurable": {"thread_id": thread_id}}
    return graph.invoke(input, config)

@app.post("/resume")
async def resume(thread_id: str, value: str):
    config = {"configurable": {"thread_id": thread_id}}
    return graph.invoke(Command(resume=value), config)
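Calling the wrapper is then just two HTTP requests (illustrative; the paths and parameter names match the sketch above):

import httpx

base = "http://localhost:8000"

# Start the workflow — it will pause at the interrupt
httpx.post(f"{base}/invoke", params={"thread_id": "bet-7"},
           json={"recommendation": "", "confirmed": False})

# Resume with the user's answer
httpx.post(f"{base}/resume", params={"thread_id": "bet-7", "value": "yes"})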
When to use: Full control over checkpointing is non-negotiable.
Trade-off: You're building and maintaining your own infrastructure.
🔗 Resources & Issues Filed
This isn't just a blog post — I've reported this to the LangGraph team:
Bug Report:
- GitHub Issue #6559 — @task checkpointing does not work with API server runtime-injected checkpointer
Documentation Issue:
- Filed request to update Durable Execution docs with this limitation
Related Issues:
- GitHub Issue #5790 — langgraph dev ignores checkpointer configuration
Official Documentation:
Source Files Referenced:
Source Files Referenced:
- langgraph/func/__init__.py — @task and @entrypoint decorators
- langgraph/pregel/_runner.py — Task execution and caching logic
🧪 Test Your Own Workflows
Before deploying any HITL workflow, test this:
- Add logging before your interrupt() calls
- Invoke the workflow (it should hit the interrupt and pause)
- Resume the workflow
- Check your logs — did the code before the interrupt run twice?
def my_node(state):
    print(">>> THIS SHOULD ONLY PRINT ONCE")  # Watch this!
    result = some_operation()
    user_input = interrupt("Continue?")
    return {"result": result}
If you see that log message twice, your durable execution is broken.
💬 Final Thoughts
LangGraph is powerful. The Pregel-based runtime, checkpointing system, and HITL primitives are genuinely well-designed. But like any complex system, there are edge cases the documentation doesn't cover.
This particular issue — @task not working with API server — cost me three days of debugging. The silent failure mode (no errors, just wrong data) made it especially frustrating.
I hope this post saves you that time.
If you found this useful:
- ⭐ Star the GitHub issues to raise visibility
- 💬 Comment if you've hit similar issues
- 📤 Share with your team before they hit this in production
Have questions or found other LangGraph gotchas? Let's discuss in the comments.