The batch job had been running for 47 minutes. Item 847 of 1000. A network timeout. The process crashed.
Forty-seven minutes of LLM calls, gone. The job had to restart from item 1.
This is the kind of failure that costs money and time. The fix is not to prevent crashes — crashes happen. The fix is to resume from where you stopped.
The Shape of the Fix
from agent_resume import AgentResume, JsonlStore
store = JsonlStore(path="./checkpoints/batch-job-42.jsonl")
agent = AgentResume(store=store)
with agent.resume_or_start(job_id="batch-job-42") as session:
for item in session.remaining(all_items):
result = process_item(item) # expensive LLM call
session.mark_done(item.id, result=result)
print("Job complete")
First run: session.remaining() yields all items. As you call mark_done(), each item is checkpointed. If the process crashes at item 847, the checkpoint file contains items 0-846.
Second run: session.remaining() reads the checkpoint and yields only items 847-999. You resume exactly where you stopped.
What It Does NOT Do
agent-resume does not make your process_item() call idempotent. If mark_done() was already called for an item (meaning the LLM call completed and the result was saved), that item is skipped on resume. But if the crash happened during process_item() — after the LLM call started but before mark_done() — that call is retried.
It does not handle parallel workers. The JsonlStore uses a file lock for append operations, but concurrent workers processing different items from the same job is not tested and may have race conditions. Use one worker per job.
It does not store results. mark_done() records item IDs, not results. If you need to persist results as you go, write them to your own store alongside the checkpoint.
Inside the Library
The checkpoint file is append-only JSONL. Each completed item is one line:
{"id": "item-001", "ts": 1748107200}
{"id": "item-002", "ts": 1748107201}
...
On resume_or_start(), the store reads the file, builds a set of completed IDs, and returns a session object. session.remaining(items) filters out completed IDs.
mark_done() appends one line. The append is atomic at the OS level (open with O_APPEND). A crash mid-append produces a partial line that load() skips (with a warning).
def mark_done(self, item_id: str, result=None) -> None:
record = json.dumps({"id": item_id, "ts": int(time.time())}) + "\n"
with open(self._path, "a") as f:
f.write(record)
The result parameter is accepted but not stored in the default JsonlStore. If you want result persistence, subclass JsonlStore and override mark_done() to include result data.
The 35 tests cover: fresh start (empty file), mid-job resume, resume at boundaries (0 complete, all complete), mark_done idempotency, partial-write recovery, and the context manager exit.
When to Use It
Use it for any batch job longer than a few minutes that processes a list of items. LLM batch processing, bulk document analysis, dataset enrichment, any loop where each item requires one or more LLM calls.
The cost of checkpointing is one file write per item. For 1000 items with 1-second LLM calls, the overhead is negligible. For 10 items with 5-minute LLM calls, you almost certainly want checkpointing — losing one item means waiting 5 more minutes.
Skip it for short jobs. If your entire batch takes 30 seconds, restarting from scratch on failure is acceptable. The checkpoint overhead is not worth it.
Install
pip install git+https://github.com/MukundaKatta/agent-resume
from agent_resume import AgentResume, JsonlStore
from agent_deadline import Deadline, DeadlineExceeded
store = JsonlStore(path="./checkpoints/nightly-sync.jsonl")
agent = AgentResume(store=store)
deadline = Deadline.from_now(seconds=3600) # 1 hour max
try:
with agent.resume_or_start(job_id="nightly-sync-2026-05-24") as session:
for item in session.remaining(load_all_documents()):
deadline.check_or_raise()
summary = summarize_document(item)
save_summary(item.id, summary)
session.mark_done(item.id)
except DeadlineExceeded:
print("Time limit reached. Will resume next run.")
Sibling Libraries
| Library | What it solves |
|---|---|
agent-state-checkpoint |
Full agent state snapshot (not just item IDs) |
agent-deadline |
Wall-clock deadline for the outer job loop |
conversation-codec |
Persist conversation history between sessions |
llm-stop-conditions |
Composable stop conditions within each item's loop |
agentsnap |
Record actual token usage per item |
For per-item loops that need their own stop conditions and deadlines, nest agent-deadline and llm-stop-conditions inside the session.remaining() loop. agent-resume handles the outer job; the inner loop handles each item.
What's Next
Result storage is the obvious next step. The current JsonlStore only tracks IDs. A ResultJsonlStore that saves the result alongside the ID would cover the common case of wanting to both checkpoint progress and persist results in one place.
Distributed workers sharing a checkpoint is a harder problem. It requires either a lock-based append protocol (file locking on NFS is unreliable) or a database backend. A PostgresStore backend with row-level locking would handle parallel workers safely.
A repair command for corrupted checkpoint files would be useful. Right now partial writes are silently skipped. A python -m agent_resume repair ./checkpoints/batch-job-42.jsonl command that validates and repairs the file would be a good operational tool.
Built as part of the agent-stack family: composable Python primitives for production LLM agents.
Top comments (0)