DEV Community

German Yamil
Building Crash-Recovery State Machines for Long-Running LLM Pipelines in Python

LLM pipelines fail. API timeouts, rate limits, unexpected token counts, network blips — any of these can kill a long-running job mid-way through.

If you restart from zero every time, you're wasting money and time. Here's a state machine pattern with atomic checkpoints that lets you resume exactly where you left off.

The Core Problem

Imagine a pipeline that processes 10 chapters:

Chapter 1 → DONE
Chapter 2 → DONE
...
Chapter 7 → [process crashes here]
Chapter 8 → not started
...

Without recovery, you restart from Chapter 1. With a state machine, you restart from Chapter 7.

The State Machine

Four states cover every situation:

from enum import Enum

class ChapterStatus(Enum):
    PENDING = "PENDING"           # Not started
    RUNNING = "RUNNING"           # Currently being processed
    DONE = "DONE"                 # Completed successfully
    NEEDS_REVIEW = "NEEDS_REVIEW" # Failed, requires human intervention

State transitions:

PENDING → RUNNING    (when processing starts)
RUNNING → DONE       (on success)
RUNNING → NEEDS_REVIEW  (on failure)
RUNNING → PENDING    (crash recovery — automatic on restart)
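If you want the transition rules enforced in code rather than by convention, a small guard table works. This is a sketch; `VALID_TRANSITIONS` and `assert_transition` are illustrative names, not part of the checkpoint code below:

```python
from enum import Enum

class ChapterStatus(Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    DONE = "DONE"
    NEEDS_REVIEW = "NEEDS_REVIEW"

# Which target states each state may legally move to
VALID_TRANSITIONS = {
    ChapterStatus.PENDING: {ChapterStatus.RUNNING},
    ChapterStatus.RUNNING: {
        ChapterStatus.DONE,
        ChapterStatus.NEEDS_REVIEW,
        ChapterStatus.PENDING,  # crash recovery
    },
    ChapterStatus.DONE: set(),
    ChapterStatus.NEEDS_REVIEW: set(),  # a human resets these manually
}

def assert_transition(old: ChapterStatus, new: ChapterStatus) -> None:
    """Raise if a state change is not one of the legal transitions above."""
    if new not in VALID_TRANSITIONS[old]:
        raise ValueError(f"Illegal transition: {old.value} -> {new.value}")
```

Calling `assert_transition` before every status write turns a silent logic bug into a loud exception.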

Atomic Checkpoint File

Store state in a JSON file, updated atomically:

import json
import os

CHECKPOINT_FILE = "pipeline_checkpoint.json"

def init_checkpoint(chapter_ids: list[str]) -> None:
    """Create checkpoint file if it doesn't exist."""
    if os.path.exists(CHECKPOINT_FILE):
        return  # Don't overwrite existing progress

    data = {
        "version": 1,
        "chapters": [
            {"id": ch_id, "status": "PENDING", "error": None}
            for ch_id in chapter_ids
        ]
    }
    _write_checkpoint(data)

def _write_checkpoint(data: dict) -> None:
    """Write checkpoint atomically using temp file + rename."""
    tmp_path = CHECKPOINT_FILE + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(data, f, indent=2)
        f.flush()
        os.fsync(f.fileno())  # Make sure the bytes hit disk before the rename
    os.replace(tmp_path, CHECKPOINT_FILE)  # Atomic rename on POSIX filesystems

def read_checkpoint() -> dict:
    with open(CHECKPOINT_FILE) as f:
        return json.load(f)

def update_chapter_status(
    chapter_id: str,
    status: str,
    error: str | None = None
) -> None:
    data = read_checkpoint()
    chapter = next(c for c in data["chapters"] if c["id"] == chapter_id)
    chapter["status"] = status
    chapter["error"] = error
    _write_checkpoint(data)

Key detail: os.replace() is an atomic rename on POSIX filesystems (and replaces the destination in a single operation on Windows as well). If the process crashes during the write, the old checkpoint file survives intact — you never end up with a corrupted checkpoint.
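You can convince yourself of this with a quick simulation. The sketch below reuses the temp-file + rename pattern (parametrized by path so it runs standalone) and fakes a crash by leaving a half-written `.tmp` file behind:

```python
import json
import os
import tempfile

def write_atomic(path: str, data: dict) -> None:
    """Same temp-file + rename pattern as _write_checkpoint above."""
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump(data, f)
    os.replace(tmp, path)

with tempfile.TemporaryDirectory() as d:
    ckpt = os.path.join(d, "checkpoint.json")

    # 1. A good checkpoint exists on disk
    write_atomic(ckpt, {"version": 1, "chapters": []})

    # 2. Simulate a crash mid-write: a half-finished .tmp file,
    #    and the process dies before ever reaching os.replace()
    with open(ckpt + ".tmp", "w") as f:
        f.write('{"version": 1, "chap')

    # 3. The real checkpoint is untouched — only the .tmp is garbage
    with open(ckpt) as f:
        recovered = json.load(f)
```

The half-written `.tmp` file is harmless: the next successful `write_atomic` simply overwrites it.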

Crash Recovery

This is the key function — run it at startup before anything else:

def recover_running_orphans() -> int:
    """
    Reset any RUNNING chapters back to PENDING.

    A chapter in RUNNING state at startup means the last process
    crashed while working on it. Reset to PENDING so it retries.

    Returns the number of chapters recovered.
    """
    data = read_checkpoint()
    recovered = 0

    for chapter in data["chapters"]:
        if chapter["status"] == "RUNNING":
            chapter["status"] = "PENDING"
            chapter["error"] = "recovered_from_crash"
            recovered += 1

    if recovered:
        _write_checkpoint(data)
        print(f"⚠️  Recovered {recovered} orphaned chapter(s) from previous crash")

    return recovered
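Here's recovery in action. This sketch redefines the function against an explicit path (so it runs standalone against a temp directory), but the logic is the same as above:

```python
import json
import os
import tempfile

def recover_running_orphans(path: str) -> int:
    """Standalone version of the recovery sweep, parametrized by path."""
    with open(path) as f:
        data = json.load(f)
    recovered = 0
    for chapter in data["chapters"]:
        if chapter["status"] == "RUNNING":
            chapter["status"] = "PENDING"
            chapter["error"] = "recovered_from_crash"
            recovered += 1
    if recovered:
        tmp = path + ".tmp"
        with open(tmp, "w") as f:
            json.dump(data, f, indent=2)
        os.replace(tmp, path)
    return recovered

with tempfile.TemporaryDirectory() as d:
    ckpt = os.path.join(d, "checkpoint.json")
    # Simulate a crash: ch02 was mid-flight when the process died
    with open(ckpt, "w") as f:
        json.dump({"version": 1, "chapters": [
            {"id": "ch01", "status": "DONE", "error": None},
            {"id": "ch02", "status": "RUNNING", "error": None},
        ]}, f)

    n = recover_running_orphans(ckpt)
    with open(ckpt) as f:
        statuses = [c["status"] for c in json.load(f)["chapters"]]
```

After the sweep, `ch01` stays DONE and `ch02` is back to PENDING, ready to retry.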

The Main Loop

import time

def run_pipeline(process_fn, max_retries: int = 3) -> None:
    """
    Main pipeline loop with crash recovery and retry logic.
    process_fn(chapter_id) -> (success: bool, error: str | None)
    """
    # 1. Recover from any previous crash
    recover_running_orphans()

    data = read_checkpoint()
    pending = [c for c in data["chapters"] if c["status"] == "PENDING"]

    print(f"📋 {len(pending)} chapters to process")

    for chapter in pending:
        chapter_id = chapter["id"]

        # 2. Mark as RUNNING before we start
        update_chapter_status(chapter_id, "RUNNING")
        print(f"⚙️  Processing {chapter_id}...")

        success = False
        last_error = None

        # 3. Try up to max_retries times
        for attempt in range(max_retries):
            try:
                ok, error = process_fn(chapter_id)
                if ok:
                    success = True
                    break
                else:
                    last_error = error
                    print(f"  Attempt {attempt+1} failed: {(error or '')[:100]}")
                    time.sleep(2 ** attempt)  # Exponential backoff
            except Exception as e:
                last_error = str(e)
                print(f"  Attempt {attempt+1} exception: {e}")
                time.sleep(2 ** attempt)

        # 4. Update final status
        if success:
            update_chapter_status(chapter_id, "DONE")
            print(f"  ✅ DONE")
        else:
            update_chapter_status(chapter_id, "NEEDS_REVIEW", error=last_error)
            print(f"  ❌ NEEDS_REVIEW: {last_error}")

    # 5. Summary
    data = read_checkpoint()
    counts = {}
    for c in data["chapters"]:
        counts[c["status"]] = counts.get(c["status"], 0) + 1
    print(f"\n📊 Final: {counts}")

Status Dashboard

Quick view of pipeline state:

def print_dashboard() -> None:
    data = read_checkpoint()

    status_icons = {
        "PENDING": "⏳",
        "RUNNING": "🔄",
        "DONE": "✅",
        "NEEDS_REVIEW": "❌",
    }

    print("\n=== Pipeline Status ===")
    for chapter in data["chapters"]:
        icon = status_icons.get(chapter["status"], "?")
        err = f" ({chapter['error'][:60]})" if chapter.get("error") else ""
        print(f"  {icon} {chapter['id']}: {chapter['status']}{err}")

    done = sum(1 for c in data["chapters"] if c["status"] == "DONE")
    total = len(data["chapters"])
    print(f"\nProgress: {done}/{total} ({done/total*100:.0f}%)\n")

Usage

def my_llm_call(chapter_id: str) -> tuple[bool, str | None]:
    """Your actual processing logic goes here."""
    # Call your LLM API, validate output, etc.
    # Return (True, None) on success
    # Return (False, "error message") on failure
    try:
        # ... your code here
        return True, None
    except Exception as e:
        return False, str(e)

# Initialize (only creates file if it doesn't exist)
init_checkpoint(["ch01", "ch02", "ch03", "ch04", "ch05"])

# Run — safe to Ctrl+C and restart at any point
run_pipeline(my_llm_call)

Why Not a Database?

For pipelines under ~1000 items, a JSON file is simpler and more portable:

  • No setup, no connection string, no credentials
  • git diff checkpoint.json shows you exactly what changed
  • Easy to manually edit if you need to reset a specific chapter
  • Works on any machine without installing dependencies

Switch to SQLite when you need concurrent writers or >10k items.


I built this pattern while creating an automated ebook pipeline that generates, validates, and publishes technical books end-to-end for $20/month in API costs.

The full pipeline (10 scripts, complete state machine, EPUB output): germy5.gumroad.com/l/xhxkzz — $19.99.
