LangGraph HITL: The @task Caching Gotcha That Cost Me 3 Days

⚡ TL;DR

The Problem: LangGraph's @task decorator promises durable execution — caching results so non-deterministic code doesn't re-run after interrupt(). It works locally. It silently breaks when deployed via LangGraph API server.

Why: API server injects checkpointer at runtime. Node-level checkpointing uses this runtime checkpointer ✅. Task-level checkpointing looks for a compile-time checkpointer (which is None) ❌. Your tasks re-execute. No errors. Just inconsistent data.

The Fix: Don't use @task inside StateGraph nodes when deploying to API server. Use separate nodes instead:

# ❌ BROKEN: @task inside a node with interrupt
def my_node(state):
    result = my_task().result()  # Re-executes on resume!
    user_input = interrupt("Confirm?")
    return {"result": result}

# ✅ WORKS: Separate nodes
def compute_node(state):
    return {"result": non_deterministic_operation()}  # Checkpointed after node completes

def approval_node(state):
    user_input = interrupt(f"Confirm {state['result']}?")  # Safe — reads from checkpoint
    return {"confirmed": user_input}

Issues Filed: Bug #6559 | Related: #5790


Want to understand WHY this happens? Keep reading — I'll walk you through the bug, the source code investigation, and all the workarounds.


🎰 The Setup: A Simple Betting Agent

Let's build something simple — a sports betting agent that recommends a player and asks for human approval before placing the bet.

from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict
import random

class State(TypedDict):
    recommendation: str
    confirmed: bool

def betting_node(state):
    # Fetch live stats (simulated - changes every call)
    messi_score = random.randint(1, 5)
    ronaldo_score = random.randint(1, 5)

    # Make recommendation based on current stats
    if messi_score > ronaldo_score:
        recommended = "Messi"
    else:
        recommended = "Ronaldo"

    print(f"Stats: Messi={messi_score}, Ronaldo={ronaldo_score}")
    print(f"Recommending: {recommended}")

    # Pause for human approval
    user_input = interrupt(f"Bet on {recommended}?")

    return {"recommendation": recommended, "confirmed": user_input == "yes"}

# Build the graph
builder = StateGraph(State)
builder.add_node("bet", betting_node)
builder.add_edge(START, "bet")
builder.add_edge("bet", END)

graph = builder.compile(checkpointer=MemorySaver())

Looks reasonable, right? Fetch stats, recommend a player, wait for human approval, done.

Let's run it:

config = {"configurable": {"thread_id": "bet-1"}}

# First invocation — hits interrupt, pauses
result = graph.invoke({"recommendation": "", "confirmed": False}, config)
# Output: Stats: Messi=4, Ronaldo=2
# Output: Recommending: Messi
# Workflow pauses at interrupt()

# User sees "Bet on Messi?" and approves
result = graph.invoke(Command(resume="yes"), config)
# Output: Stats: Messi=1, Ronaldo=5  ← WAIT, WHAT?
# Output: Recommending: Ronaldo      ← THIS CHANGED!

What just happened?

The user approved a bet on Messi (score 4 vs 2). But when the workflow resumed, it fetched new stats where Ronaldo is winning (5 vs 1). The final recommendation flipped.

Your user said "yes" to Messi. Your system bet on... whoever was winning at resume time.

This isn't a bug — it's how LangGraph works. When a node contains an interrupt(), the entire node re-executes on resume. Any non-deterministic code before the interrupt runs again.

🤔 The Problem: Node Re-execution

Here's what happens under the hood:

FIRST INVOCATION:
├── betting_node starts
├── Fetch stats → Messi=4, Ronaldo=2
├── Recommend Messi
├── interrupt() → PAUSE
└── State saved: waiting for human input

RESUME (after user approves):
├── betting_node starts AGAIN ← entire node re-runs!
├── Fetch stats → Messi=1, Ronaldo=5 ← different!
├── Recommend Ronaldo ← changed!
├── interrupt() → returns "yes" (from user)
└── Return: recommendation=Ronaldo, confirmed=True

The checkpointer saves state between nodes, not the execution progress within a node. Everything before interrupt() runs twice.

For idempotent operations (same input → same output), this is fine. But random.randint(), datetime.now(), API calls to live data? They'll return different values.

So how do we fix this?

📖 The Documented Solution: @task for Durable Execution

If you search for this problem, you'll land on LangGraph's Durable Execution documentation. It introduces the @task decorator — designed exactly for this scenario.

The docs say:

"To utilize features like human-in-the-loop, any randomness should be encapsulated inside of tasks."

"If a node contains multiple operations, you may find it easier to convert each operation into a task rather than refactor the operations into individual nodes."

Perfect! Let's wrap our non-deterministic code in a @task:

from langgraph.graph import StateGraph, START, END
from langgraph.func import task
from langgraph.types import Command, interrupt
from langgraph.checkpoint.memory import MemorySaver
import random

@task
def fetch_live_stats():
    """Now wrapped in @task — should be cached on resume"""
    messi = random.randint(1, 5)
    ronaldo = random.randint(1, 5)
    print(f">>> TASK EXECUTING: Messi={messi}, Ronaldo={ronaldo}")
    return {"messi": messi, "ronaldo": ronaldo}

def betting_node(state):
    # Fetch stats — @task should cache this result
    stats = fetch_live_stats().result()

    # Make recommendation
    if stats["messi"] > stats["ronaldo"]:
        recommended = "Messi"
    else:
        recommended = "Ronaldo"

    print(f"Recommending: {recommended}")

    # Pause for human approval
    user_input = interrupt(f"Bet on {recommended}?")

    return {"recommendation": recommended, "confirmed": user_input == "yes"}

# Build graph with checkpointer
builder = StateGraph(State)
builder.add_node("bet", betting_node)
builder.add_edge(START, "bet")
builder.add_edge("bet", END)

graph = builder.compile(checkpointer=MemorySaver())

The idea is simple:

  • @task marks fetch_live_stats() as a durable operation
  • First execution: task runs and result is checkpointed
  • Resume: task result is loaded from checkpoint, no re-execution

Let's test it:

config = {"configurable": {"thread_id": "bet-2"}}

# First invocation
result = graph.invoke({"recommendation": "", "confirmed": False}, config)
# Output: >>> TASK EXECUTING: Messi=4, Ronaldo=2
# Output: Recommending: Messi
# Pauses at interrupt()

# Resume after approval
result = graph.invoke(Command(resume="yes"), config)
# Output: Recommending: Messi ← No "TASK EXECUTING" log!
# The task didn't re-run. It used cached result.

It works! The task executed once, the result was cached, and on resume we got consistent data. Messi was recommended, user approved Messi, bet placed on Messi.

This is durable execution in action. Exactly what the docs promised.

Ship it to production, right?

🚀 Deploying to LangGraph API Server

Your code works locally. Time to deploy. You set up langgraph.json, configure your graph, and deploy to LangGraph API server (or run langgraph dev for local testing).

But wait — when you try to compile with a checkpointer:

graph = builder.compile(checkpointer=MemorySaver())

The API server throws an error:

ValueError: Heads up! Your graph includes a custom checkpointer. 
With LangGraph API, persistence is handled automatically by the 
platform, so providing a custom checkpointer here isn't necessary 
and will be ignored when deployed. To simplify your setup, please 
remove the custom checkpointer from your graph definition.

Okay, fair enough. The docs also mention this:

"When using the LangGraph API, you don't need to implement or configure checkpointers manually — persistence is handled automatically by the platform."

So you remove the checkpointer:

graph = builder.compile()  # No checkpointer — API server handles it

Deploy. Test the workflow.

First invocation:
>>> TASK EXECUTING: Messi=4, Ronaldo=2
Recommending: Messi
# Pauses for approval

Resume:
>>> TASK EXECUTING: Messi=1, Ronaldo=5  ← WAIT, IT RAN AGAIN?!
Recommending: Ronaldo                    ← INCONSISTENT!

The task re-executed. We're back to the original problem.

No errors. No warnings. Just silently broken durable execution.

🔍 Down the Rabbit Hole: What's Actually Happening?

At this point, I was confused. The @task decorator clearly works — I saw it work locally. But deployed via API server? Silent failure.

Let me summarize what we know:

Scenario                                    | Task Caching | Result
compile(checkpointer=MemorySaver()) + local | ✅ Works     | Consistent
compile() + API server runtime injection    | ❌ Fails     | Task re-executes
compile(checkpointer=...) + API server      | 🚫 Error     | Server rejects it

The API server claims to handle persistence automatically. And it does — for node-level state. If you add a node between your non-deterministic code and the interrupt, that works fine.

But @task? Something's different.

Time to read some source code.

🔬 Source Code Dive

How @task Works

First, let's understand what @task actually does. In langgraph/func/__init__.py:

class _TaskFunction(Generic[P, T]):
    def __init__(self, func, *, retry_policy, cache_policy, name):
        self.func = func
        self.retry_policy = retry_policy  
        self.cache_policy = cache_policy
        # That's it. No checkpointer stored here.

The @task decorator wraps your function but stores no reference to any checkpointer. It just holds the function and some policies.

So where does checkpointing happen?

How Task Results Get Cached

In langgraph/pregel/_runner.py, there's a _call() function that executes tasks:

def _call(task, func, input, *, schedule_task, ...):
    # Check if task was previously executed
    if next_task := schedule_task(task(), counter, Call(...)):
        if next_task.writes:  # Previous result exists!
            # Return cached result — NO re-execution
            ret = next((v for c, v in next_task.writes if c == RETURN), MISSING)
            fut.set_result(ret)
            return fut
        else:
            # No cached result — execute the task
            fut = submit()(run_with_retry, next_task, ...)

The key is schedule_task(). It checks for previous task results (next_task.writes). If found, it returns the cached value. If not, the task runs again.

But where does schedule_task look for cached results?

The Critical Discovery

Digging deeper, schedule_task loads previous writes from Pregel.checkpointer — the checkpointer passed at compile time.

class Pregel:
    def __init__(self, *, checkpointer=None, ...):
        self.checkpointer = checkpointer  # Set at compile time

When you do builder.compile(checkpointer=MemorySaver()):

  • Pregel.checkpointer = MemorySaver()
  • schedule_task can load previous task writes ✅
  • Task caching works ✅

When you do builder.compile() for API server:

  • Pregel.checkpointer = None
  • API server injects checkpointer later at runtime
  • But schedule_task still looks at Pregel.checkpointer which is None
  • No cached results found → task re-executes ❌

The runtime-injected checkpointer is invisible to schedule_task.

🧠 The Root Cause: Compile-Time vs Runtime

Here's the asymmetry I discovered:

Operation        | Where It Looks                     | Runtime Injection Works?
Node state SAVE  | Runtime config                     | ✅ Yes
Node state LOAD  | Runtime config                     | ✅ Yes
Task result SAVE | Possibly runtime                   | ⚠️ Maybe
Task result LOAD | Pregel.checkpointer (compile-time) | ❌ No

The API server's runtime checkpointer works perfectly for node-level checkpointing:

  • State saved between nodes ✅
  • Resume from correct node ✅
  • Time-travel debugging ✅

But task-level checkpointing requires the checkpointer at compile time because schedule_task is bound to Pregel.checkpointer before the runtime config exists.

┌─────────────────────────────────────────────────────────────────┐
│                     COMPILE TIME                                │
│  graph = builder.compile()                                      │
│  └── Pregel.checkpointer = None                                 │
└─────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────┐
│                     RUNTIME (API Server)                        │
│  Injects checkpointer via config                                │
│  └── Node checkpointing: ✅ Uses runtime config                 │
│  └── Task checkpointing: ❌ Still looks at Pregel.checkpointer  │
│                             (which is None)                     │
└─────────────────────────────────────────────────────────────────┘

This is why your tasks re-execute. The load mechanism can't see the runtime checkpointer.
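You can see the asymmetry from your own code. A minimal check, assuming the compiled graph exposes the checkpointer it was compiled with via the .checkpointer attribute (consistent with the Pregel source shown above):

from langgraph.checkpoint.memory import MemorySaver

graph_local = builder.compile(checkpointer=MemorySaver())
graph_api = builder.compile()  # what you'd ship to the API server

print(graph_local.checkpointer)  # MemorySaver instance — task-result lookup has somewhere to read from
print(graph_api.checkpointer)    # None — schedule_task finds no previous task writes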

✅ The Fix: Patterns That Actually Work

Now that we understand why @task fails with API server, let's look at patterns that actually work in production.

Pattern 1: Separate Nodes (Recommended)

The most reliable solution: split your non-deterministic operation into its own node.

Node-level checkpointing works with runtime injection. So if the non-deterministic code is in a separate node that completes before the HITL node, its output is safely checkpointed.

from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt
from typing import TypedDict
import random

class State(TypedDict):
    recommendation: str
    confirmed: bool

def fetch_stats_node(state):
    """Separate node for non-deterministic operation"""
    messi = random.randint(1, 5)
    ronaldo = random.randint(1, 5)

    print(f">>> FETCHING STATS: Messi={messi}, Ronaldo={ronaldo}")

    recommended = "Messi" if messi > ronaldo else "Ronaldo"
    return {"recommendation": recommended}
    # ✅ State checkpointed here — before HITL node

def approval_node(state):
    """HITL node — only reads from checkpointed state"""
    recommended = state["recommendation"]  # Safe — from checkpoint

    user_input = interrupt(f"Bet on {recommended}?")

    return {"confirmed": user_input == "yes"}

# Build graph
builder = StateGraph(State)
builder.add_node("fetch", fetch_stats_node)
builder.add_node("approve", approval_node)
builder.add_edge(START, "fetch")
builder.add_edge("fetch", "approve")
builder.add_edge("approve", END)

graph = builder.compile()  # No checkpointer needed — API server handles it

Why it works:

FIRST INVOCATION:
├── fetch_stats_node executes
│   └── Messi=4, Ronaldo=2 → recommendation="Messi"
├── ✅ STATE CHECKPOINTED: {recommendation: "Messi"}
├── approval_node starts
│   └── interrupt() → PAUSE
└── Waiting for human input

RESUME:
├── fetch_stats_node SKIPPED (already completed)
├── approval_node resumes
│   └── Reads state["recommendation"] → "Messi" (from checkpoint)
│   └── interrupt() returns "yes"
└── Return: {recommendation: "Messi", confirmed: True}

The non-deterministic code never re-runs because the node completed and state was checkpointed.

Trade-off: More verbose graph structure. But explicit is better than broken.
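Before deploying, you can sanity-check the two-node layout locally. A quick sketch, assuming a throwaway MemorySaver just for the test (the API server supplies its own checkpointer in deployment):

from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Command

test_graph = builder.compile(checkpointer=MemorySaver())
config = {"configurable": {"thread_id": "bet-local-test"}}

test_graph.invoke({"recommendation": "", "confirmed": False}, config)  # pauses at interrupt()
result = test_graph.invoke(Command(resume="yes"), config)
print(result)  # the recommendation matches what the user actually approved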


Pattern 2: Idempotent Code Before Interrupt

If you must keep everything in one node, ensure all code before interrupt() is idempotent — same input always produces same output.

def betting_node(state):
    query = state["query"]  # e.g., "compare messi ronaldo"

    # ✅ Idempotent: same query → same recommendation
    recommendation = deterministic_recommend(query)

    # ⚠️ Mostly idempotent: hash-based selection — but Python's built-in hash()
    # is salted per process, so prefer hashlib if the resume may land in a
    # different worker (as it can with the API server)
    player = ["Messi", "Ronaldo"][hash(query) % 2]

    # ❌ NOT idempotent — avoid these before interrupt:
    # - random.choice()
    # - datetime.now()
    # - External API calls (live data)
    # - Database queries (mutable data)

    user_input = interrupt(f"Bet on {recommendation}?")
    return {"recommendation": recommendation, "confirmed": user_input == "yes"}

When to use: When your logic can be made deterministic based on input state.

Trade-off: Severely limits what you can do. No live data, no randomness.
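deterministic_recommend above is a placeholder. A minimal sketch of what it could look like, using hashlib so the result is stable across processes (this helper is hypothetical, not part of LangGraph):

import hashlib

def deterministic_recommend(query: str) -> str:
    """Hypothetical helper: the same query always maps to the same player."""
    digest = int(hashlib.sha256(query.encode()).hexdigest(), 16)
    return "Messi" if digest % 2 == 0 else "Ronaldo"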


Pattern 3: HITL First

If you need user input before doing non-deterministic work, flip the order:

def betting_node(state):
    # HITL first — nothing before it to re-execute
    player = interrupt("Who do you want to bet on? (Messi/Ronaldo)")

    # Non-deterministic code AFTER interrupt is fine
    # This only runs once — after user provides input
    live_odds = fetch_live_odds(player)  # Safe here

    return {"player": player, "odds": live_odds}

Why it works: On resume, code before interrupt() re-runs. If there's nothing there, nothing breaks.

When to use: When user input drives the non-deterministic operation.

Trade-off: You can't compute a recommendation before asking the user.
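Resuming works the same way as before; whatever you pass back becomes the return value of interrupt(). A sketch, assuming betting_node is wired into a compiled graph with a checkpointer and a matching state schema:

from langgraph.types import Command

config = {"configurable": {"thread_id": "bet-hitl-first"}}

graph.invoke({}, config)                                # pauses immediately at interrupt()
result = graph.invoke(Command(resume="Messi"), config)  # "Messi" becomes `player` inside the node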


Pattern 4: Pure Functional API

The @task decorator was designed to work with @entrypoint — the Functional API. Unlike StateGraph, @entrypoint accepts a compile-time checkpointer that you control.

from langgraph.func import task, entrypoint
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import interrupt
import random

@task
def fetch_live_stats():
    """Task result will be cached properly"""
    messi = random.randint(1, 5)
    ronaldo = random.randint(1, 5)
    print(f">>> TASK EXECUTING: Messi={messi}, Ronaldo={ronaldo}")
    return {"messi": messi, "ronaldo": ronaldo}

@entrypoint(checkpointer=MemorySaver())  # Compile-time checkpointer!
def betting_workflow(query: str):
    stats = fetch_live_stats().result()  # Cached on resume ✅

    recommended = "Messi" if stats["messi"] > stats["ronaldo"] else "Ronaldo"

    user_input = interrupt(f"Bet on {recommended}?")

    return {"recommendation": recommended, "confirmed": user_input == "yes"}

Why it works: @entrypoint passes checkpointer to Pregel at decoration time. The checkpointer exists before runtime, so schedule_task can find cached task results.
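A minimal local run of the entrypoint, assuming the same thread-scoped config pattern used with StateGraph:

from langgraph.types import Command

config = {"configurable": {"thread_id": "bet-functional"}}

betting_workflow.invoke("compare messi ronaldo", config)         # task runs, pauses at interrupt()
result = betting_workflow.invoke(Command(resume="yes"), config)  # task result is read from the checkpoint
print(result)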

Trade-off:

  • Different mental model than StateGraph
  • Less granular time-travel debugging (no node-by-node replay)
  • May not work with API server deployment (same limitation applies)

Pattern 5: Custom Deployment (Full Control)

If you need @task inside StateGraph nodes and need deployment, skip the managed API server. Build your own FastAPI wrapper:

from fastapi import FastAPI
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.types import Command

# Compile with YOUR checkpointer.
# Note: in recent langgraph-checkpoint-postgres releases, from_conn_string()
# returns a context manager — enter it for the lifetime of the process (a
# FastAPI lifespan handler is the cleaner way) and call setup() once to
# create the checkpoint tables.
checkpointer_cm = PostgresSaver.from_conn_string("postgresql://...")
checkpointer = checkpointer_cm.__enter__()
checkpointer.setup()

graph = builder.compile(checkpointer=checkpointer)

app = FastAPI()

# Sync endpoints: FastAPI runs them in a threadpool, so the blocking
# graph.invoke() calls don't stall the event loop.
@app.post("/invoke")
def invoke(thread_id: str, input: dict):
    config = {"configurable": {"thread_id": thread_id}}
    return graph.invoke(input, config)

@app.post("/resume")
def resume(thread_id: str, value: str):
    config = {"configurable": {"thread_id": thread_id}}
    return graph.invoke(Command(resume=value), config)

When to use: Full control over checkpointing is non-negotiable.

Trade-off: You're building and maintaining your own infrastructure.
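Calling the wrapper is then just two HTTP requests. A sketch, assuming the app above is served on localhost:8000 (thread_id and value arrive as query parameters, input as the JSON body):

import requests

BASE = "http://localhost:8000"

# First invocation — runs until interrupt() and pauses
requests.post(f"{BASE}/invoke", params={"thread_id": "bet-1"},
              json={"recommendation": "", "confirmed": False})

# Resume with the user's approval
requests.post(f"{BASE}/resume", params={"thread_id": "bet-1", "value": "yes"})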

🔗 Resources & Issues Filed

This isn't just a blog post — I've reported this to the LangGraph team:

Bug Report:

  • GitHub Issue #6559 — @task checkpointing does not work with API server runtime-injected checkpointer

Documentation Issue:

  • Filed request to update Durable Execution docs with this limitation

Related Issues:

  • GitHub Issue #5790

Official Documentation:

  • LangGraph Durable Execution guide

Source Files Referenced:

  • langgraph/func/__init__.py — @task and @entrypoint decorators
  • langgraph/pregel/_runner.py — Task execution and caching logic

🧪 Test Your Own Workflows

Before deploying any HITL workflow, test this:

  1. Add logging before your interrupt() calls
  2. Invoke the workflow (should hit interrupt and pause)
  3. Resume the workflow
  4. Check your logs — did the code before interrupt run twice?
def my_node(state):
    print(">>> THIS SHOULD ONLY PRINT ONCE")  # Watch this!

    result = some_operation()

    user_input = interrupt("Continue?")
    return {"result": result}

If you see that log message twice, your durable execution is broken.
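A minimal driver for that check, assuming my_node from above is wired into a small test graph with an in-memory checkpointer:

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Command
from typing import TypedDict

class TestState(TypedDict):
    result: str

builder = StateGraph(TestState)
builder.add_node("check", my_node)
builder.add_edge(START, "check")
builder.add_edge("check", END)
graph = builder.compile(checkpointer=MemorySaver())

config = {"configurable": {"thread_id": "hitl-smoke-test"}}
graph.invoke({"result": ""}, config)         # steps 1 and 2: runs until interrupt()
graph.invoke(Command(resume="yes"), config)  # step 3: resume
# Step 4: grep your logs for ">>> THIS SHOULD ONLY PRINT ONCE"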


💬 Final Thoughts

LangGraph is powerful. The Pregel-based runtime, checkpointing system, and HITL primitives are genuinely well-designed. But like any complex system, there are edge cases the documentation doesn't cover.

This particular issue — @task not working with API server — cost me three days of debugging. The silent failure mode (no errors, just wrong data) made it especially frustrating.

I hope this post saves you that time.

If you found this useful:

  • ⭐ Star the GitHub issues to raise visibility
  • 💬 Comment if you've hit similar issues
  • 📤 Share with your team before they hit this in production

Have questions or found other LangGraph gotchas? Let's discuss in the comments.
