DEV Community

Mukunda Rao Katta
Mukunda Rao Katta

Posted on

Durable Agent State Checkpoints: Resume Where You Left Off When the Run Crashes

Your agent runs for 4 minutes. At minute 3, the process gets OOM-killed. Or the network drops. Or the user closes the tab. The agent starts over from scratch. Everything it did in those 3 minutes is gone.

This is fine for a 30-second run. It is not fine for an agent that is searching, writing, and summarizing 50 documents. You cannot afford to restart from scratch every time something goes wrong.

agent-state-checkpoint gives your agent durable JSON checkpoints so it can resume from the last saved point instead of restarting from zero.


The Shape of the Fix

from agent_state_checkpoint import StateCheckpoint

ckpt = StateCheckpoint(path="./checkpoints/run-abc123.json")

# Load existing state or start fresh
state = ckpt.load() or {
    "documents_processed": [],
    "current_index": 0,
    "findings": [],
}

documents = get_all_documents()

for i, doc in enumerate(documents):
    if i < state["current_index"]:
        continue  # Already processed, skip

    findings = analyze_document(doc)
    state["findings"].extend(findings)
    state["documents_processed"].append(doc["id"])
    state["current_index"] = i + 1

    # Save after each document
    ckpt.save(state)

final_result = compile_report(state["findings"])
ckpt.mark_complete(result=final_result)
Enter fullscreen mode Exit fullscreen mode

If the process dies at document 32, the next run loads the checkpoint, skips documents 0-31, and resumes from document 32.


What It Does NOT Do

agent-state-checkpoint does not checkpoint automatically. You call ckpt.save(state) explicitly after each unit of work. The checkpoint frequency is your decision: after every document, every 10 documents, every minute. Finer granularity means less work lost on failure; coarser granularity means less I/O overhead.

It does not checkpoint LLM messages or conversation history. It saves whatever Python dict you pass to save(). If you want to checkpoint the message list, include it in the state dict. If you want to resume from the same LLM context, include the messages.

It does not sync across processes. One StateCheckpoint instance, one file, one process. For distributed checkpointing across multiple worker processes, you need a shared backend (database, S3).


Inside the Library

The checkpoint uses atomic file writes to prevent partial writes on crash:

import json
import os
import tempfile

class StateCheckpoint:
    def __init__(self, path: str):
        self._path = Path(path)
        self._path.parent.mkdir(parents=True, exist_ok=True)

    def save(self, state: dict) -> None:
        record = {
            "state": state,
            "status": "in_progress",
            "saved_at": time.time(),
        }
        self._atomic_write(record)

    def load(self) -> dict | None:
        if not self._path.exists():
            return None

        with self._path.open() as f:
            record = json.load(f)

        if record.get("status") == "complete":
            return None  # Already done, caller should use mark_complete result

        return record.get("state")

    def mark_complete(self, result: Any = None) -> None:
        record = {
            "state": None,
            "status": "complete",
            "result": result,
            "completed_at": time.time(),
        }
        self._atomic_write(record)

    def _atomic_write(self, record: dict) -> None:
        # Write to temp file, then rename (atomic on POSIX)
        tmp_path = self._path.with_suffix(".tmp")
        try:
            with tmp_path.open("w") as f:
                json.dump(record, f)
                f.flush()
                os.fsync(f.fileno())
            tmp_path.rename(self._path)
        except Exception:
            tmp_path.unlink(missing_ok=True)
            raise

    def get_result(self) -> Any:
        if not self._path.exists():
            return None
        with self._path.open() as f:
            record = json.load(f)
        if record.get("status") == "complete":
            return record.get("result")
        return None

    def clear(self) -> None:
        self._path.unlink(missing_ok=True)
Enter fullscreen mode Exit fullscreen mode

The os.rename() on POSIX is atomic: either the old file stays or the new file appears. You never get a partial write. The fsync() before rename ensures the data reaches disk before the rename.


When to Use It

Use it for batch jobs: analyzing a large corpus of documents, running evaluations over a test set, generating content for many items. Any job where restart means redoing work.

Use it for multi-step agentic workflows where each step has a meaningful completion state. After step 1 (data collection) succeeds, checkpoint the collected data. After step 2 (analysis) succeeds, checkpoint the analysis. If step 3 (writing) fails, you do not re-run steps 1 and 2.

Use it alongside agent-resume for the higher-level checkpoint protocol. agent-resume handles the resume_or_start / mark_done lifecycle; agent-state-checkpoint handles the raw JSON state I/O. They complement each other.

Skip it for short interactive sessions. If your agent responds to a single user message and terminates, there is no meaningful point to resume from. Checkpointing adds overhead without benefit.


Install

pip install git+https://github.com/MukundaKatta/agent-state-checkpoint

# Or from PyPI
pip install agent-state-checkpoint
Enter fullscreen mode Exit fullscreen mode
from agent_state_checkpoint import StateCheckpoint

def run_research_agent(task_id: str, urls: list[str]) -> dict:
    ckpt = StateCheckpoint(path=f"./checkpoints/{task_id}.json")

    state = ckpt.load() or {
        "processed_urls": [],
        "summaries": [],
        "current_idx": 0,
    }

    print(f"Resuming from index {state['current_idx']} of {len(urls)}")

    for i, url in enumerate(urls):
        if i < state["current_idx"]:
            continue

        try:
            summary = fetch_and_summarize(url)
            state["summaries"].append({
                "url": url,
                "summary": summary,
            })
            state["processed_urls"].append(url)
            state["current_idx"] = i + 1
            ckpt.save(state)
            print(f"Saved checkpoint at {i + 1}/{len(urls)}")
        except Exception as e:
            print(f"Failed on {url}: {e}. Checkpoint saved at {state['current_idx']}.")
            raise

    report = generate_report(state["summaries"])
    ckpt.mark_complete(result=report)
    return report
Enter fullscreen mode Exit fullscreen mode

Sibling Libraries

Library What it solves
agent-resume Higher-level resume_or_start / mark_done lifecycle
conversation-codec JSONL conversation persistence with encryption
agent-step-log Per-step log for observability alongside checkpointing
llm-fixture-replay VCR-style replay for testing checkpointed state
agent-scratchpad In-memory keyed notepad for transient state

The persistence stack: agent-state-checkpoint for durable state, agent-resume for job lifecycle management, agent-step-log for observability, conversation-codec for conversation history.


What's Next

Version-tagged checkpoints: each save includes a schema_version field. On load, if the schema version does not match the current code's expected version, run a migration function before returning the state. Prevents incompatible checkpoint loads after code changes.

S3 backend: StateCheckpoint(path="s3://bucket/checkpoints/run-abc.json") for cloud-native agents where the process may run on ephemeral compute and local disk is not reliable.

Incremental checkpoints: instead of writing the full state on every save, write only the delta. For large state dicts, this reduces I/O on each checkpoint call. The load function then reconstructs the full state by replaying the deltas.


Built as part of the agent-stack family: composable Python primitives for production LLM agents.

Top comments (0)