LLM pipelines fail. API timeouts, rate limits, unexpected token counts, network blips — any of these can kill a long-running job mid-way through.
If you restart from zero every time, you're wasting money and time. Here's a state machine pattern with atomic checkpoints that lets you resume exactly where you left off.
## The Core Problem
Imagine a pipeline that processes 10 chapters:

```
Chapter 1 → DONE
Chapter 2 → DONE
...
Chapter 7 → [process crashes here]
Chapter 8 → not started
...
```

Without recovery, you restart from Chapter 1. With a state machine, you restart from Chapter 7.
## The State Machine

Four states cover every situation:
```python
from enum import Enum

class ChapterStatus(Enum):
    PENDING = "PENDING"            # Not started
    RUNNING = "RUNNING"            # Currently being processed
    DONE = "DONE"                  # Completed successfully
    NEEDS_REVIEW = "NEEDS_REVIEW"  # Failed, requires human intervention
```
State transitions:

- `PENDING → RUNNING` (when processing starts)
- `RUNNING → DONE` (on success)
- `RUNNING → NEEDS_REVIEW` (on failure)
- `RUNNING → PENDING` (crash recovery, automatic on restart)
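One way to enforce these rules is a small guard table that rejects any move not listed above. This is a minimal sketch; `ALLOWED` and `check_transition` are my own names, not part of the pipeline code below:

```python
# Allowed transitions, keyed by current status (values match the enum)
ALLOWED = {
    "PENDING": {"RUNNING"},
    "RUNNING": {"DONE", "NEEDS_REVIEW", "PENDING"},
    "DONE": set(),          # terminal
    "NEEDS_REVIEW": set(),  # terminal until a human resets it
}

def check_transition(current: str, new: str) -> None:
    """Raise ValueError unless the move is one of the four allowed transitions."""
    if new not in ALLOWED.get(current, set()):
        raise ValueError(f"Illegal transition: {current} -> {new}")
```

Calling this at the top of a status-update function turns silent state corruption into a loud failure.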
## Atomic Checkpoint File

Store state in a JSON file, updated atomically:
```python
import json
import os

CHECKPOINT_FILE = "pipeline_checkpoint.json"

def init_checkpoint(chapter_ids: list[str]) -> None:
    """Create the checkpoint file if it doesn't exist."""
    if os.path.exists(CHECKPOINT_FILE):
        return  # Don't overwrite existing progress
    data = {
        "version": 1,
        "chapters": [
            {"id": ch_id, "status": "PENDING", "error": None}
            for ch_id in chapter_ids
        ],
    }
    _write_checkpoint(data)

def _write_checkpoint(data: dict) -> None:
    """Write the checkpoint atomically using temp file + rename."""
    tmp_path = CHECKPOINT_FILE + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(data, f, indent=2)
    os.replace(tmp_path, CHECKPOINT_FILE)  # Atomic on POSIX systems

def read_checkpoint() -> dict:
    with open(CHECKPOINT_FILE) as f:
        return json.load(f)

def update_chapter_status(
    chapter_id: str,
    status: str,
    error: str | None = None,
) -> None:
    data = read_checkpoint()
    chapter = next(c for c in data["chapters"] if c["id"] == chapter_id)
    chapter["status"] = status
    chapter["error"] = error
    _write_checkpoint(data)
```
**Key detail:** `os.replace()` is atomic on POSIX systems. If the process crashes during the write, the old checkpoint file survives intact. You never end up with a corrupted checkpoint.
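You can see this property directly by simulating the worst-case crash: the new data makes it into the `.tmp` file (even half-written) but the `os.replace()` never happens. A small self-contained demo in a throwaway directory:

```python
import json
import os
import tempfile

with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, "pipeline_checkpoint.json")

    # An existing, valid checkpoint
    with open(path, "w") as f:
        json.dump({"chapters": [{"id": "ch01", "status": "DONE"}]}, f)

    # Simulated crash: the new data only made it into the temp file,
    # truncated mid-write, and os.replace() was never reached
    with open(path + ".tmp", "w") as f:
        f.write('{"chapters": [{"id": "ch01", "sta')

    # The real checkpoint is still valid JSON
    with open(path) as f:
        data = json.load(f)
    assert data["chapters"][0]["status"] == "DONE"
```

The stale `.tmp` file is harmless: the next successful write overwrites it.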
## Crash Recovery

This is the key function. Run it at startup, before anything else:
```python
def recover_running_orphans() -> int:
    """
    Reset any RUNNING chapters back to PENDING.

    A chapter in RUNNING state at startup means the last process
    crashed while working on it. Reset to PENDING so it retries.
    Returns the number of chapters recovered.
    """
    data = read_checkpoint()
    recovered = 0
    for chapter in data["chapters"]:
        if chapter["status"] == "RUNNING":
            chapter["status"] = "PENDING"
            chapter["error"] = "recovered_from_crash"
            recovered += 1
    if recovered:
        _write_checkpoint(data)
        print(f"⚠️ Recovered {recovered} orphaned chapter(s) from previous crash")
    return recovered
```
## The Main Loop
```python
import time

def run_pipeline(process_fn, max_retries: int = 3) -> None:
    """
    Main pipeline loop with crash recovery and retry logic.

    process_fn(chapter_id) -> (success: bool, error: str | None)
    """
    # 1. Recover from any previous crash
    recover_running_orphans()

    data = read_checkpoint()
    pending = [c for c in data["chapters"] if c["status"] == "PENDING"]
    print(f"📋 {len(pending)} chapters to process")

    for chapter in pending:
        chapter_id = chapter["id"]

        # 2. Mark as RUNNING before we start
        update_chapter_status(chapter_id, "RUNNING")
        print(f"⚙️ Processing {chapter_id}...")

        success = False
        last_error = None

        # 3. Try up to max_retries times
        for attempt in range(max_retries):
            try:
                ok, error = process_fn(chapter_id)
                if ok:
                    success = True
                    break
                last_error = error
                print(f"  Attempt {attempt + 1} failed: {str(error)[:100]}")
            except Exception as e:
                last_error = str(e)
                print(f"  Attempt {attempt + 1} exception: {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s...

        # 4. Update final status
        if success:
            update_chapter_status(chapter_id, "DONE")
            print("  ✅ DONE")
        else:
            update_chapter_status(chapter_id, "NEEDS_REVIEW", error=last_error)
            print(f"  ❌ NEEDS_REVIEW: {last_error}")

    # 5. Summary
    data = read_checkpoint()
    counts = {}
    for c in data["chapters"]:
        counts[c["status"]] = counts.get(c["status"], 0) + 1
    print(f"\n📊 Final: {counts}")
```
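The `2 ** attempt` backoff above sleeps 1s, then 2s, then 4s. When several workers hammer the same rate-limited API, a common refinement (not part of the pipeline above) is adding random jitter so the retries don't all fire at the same moment. A sketch, with `backoff_delay` as a hypothetical helper:

```python
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """'Full jitter' backoff: a random delay between 0 and the exponential cap."""
    return random.uniform(0, min(cap, base * (2 ** attempt)))

# Drop-in for the loop above:
#     time.sleep(backoff_delay(attempt))
# instead of:
#     time.sleep(2 ** attempt)
```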
## Status Dashboard

A quick view of pipeline state:
```python
def print_dashboard() -> None:
    data = read_checkpoint()
    status_icons = {
        "PENDING": "⏳",
        "RUNNING": "🔄",
        "DONE": "✅",
        "NEEDS_REVIEW": "❌",
    }
    print("\n=== Pipeline Status ===")
    for chapter in data["chapters"]:
        icon = status_icons.get(chapter["status"], "?")
        err = f" — {chapter['error'][:60]}" if chapter.get("error") else ""
        print(f"  {icon} {chapter['id']}: {chapter['status']}{err}")
    done = sum(1 for c in data["chapters"] if c["status"] == "DONE")
    total = len(data["chapters"])
    print(f"\nProgress: {done}/{total} ({done / total * 100:.0f}%)\n")
```
## Usage
```python
def my_llm_call(chapter_id: str) -> tuple[bool, str | None]:
    """Your actual processing logic goes here."""
    # Call your LLM API, validate output, etc.
    # Return (True, None) on success
    # Return (False, "error message") on failure
    try:
        # ... your code here
        return True, None
    except Exception as e:
        return False, str(e)

# Initialize (only creates the file if it doesn't exist)
init_checkpoint(["ch01", "ch02", "ch03", "ch04", "ch05"])

# Run: safe to Ctrl+C and restart at any point
run_pipeline(my_llm_call)
```
## Why Not a Database?

For pipelines under ~1,000 items, a JSON file is simpler and more portable:
- No setup, no connection string, no credentials
- `git diff pipeline_checkpoint.json` shows you exactly what changed
- Easy to manually edit if you need to reset a specific chapter
- Works on any machine without installing dependencies
Switch to SQLite when you need concurrent writers or >10k items.
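If you do outgrow JSON, the same state machine maps directly onto a single SQLite table, and per-statement atomicity replaces the temp-file dance. A minimal sketch (table and column names are my own, not from the pipeline above):

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # use a file path in practice
conn.execute("""
    CREATE TABLE IF NOT EXISTS chapters (
        id     TEXT PRIMARY KEY,
        status TEXT NOT NULL DEFAULT 'PENDING',
        error  TEXT
    )
""")
conn.executemany(
    "INSERT OR IGNORE INTO chapters (id) VALUES (?)",
    [("ch01",), ("ch02",)],
)

# Status updates are single atomic statements
conn.execute("UPDATE chapters SET status = 'RUNNING' WHERE id = ?", ("ch01",))

# Crash recovery becomes one query
conn.execute(
    "UPDATE chapters SET status = 'PENDING', error = 'recovered_from_crash' "
    "WHERE status = 'RUNNING'"
)
conn.commit()

rows = conn.execute("SELECT id, status FROM chapters ORDER BY id").fetchall()
```

The JSON version's `recover_running_orphans()` loop collapses into that single `UPDATE ... WHERE status = 'RUNNING'`.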
I built this pattern while creating an automated ebook pipeline that generates, validates, and publishes technical books end-to-end for $20/month in API costs.
The full pipeline (10 scripts, complete state machine, EPUB output): germy5.gumroad.com/l/xhxkzz — $19.99.