
beefed.ai

Posted on • Originally published at beefed.ai

Designing Resilient and Resumable Batch Scoring Jobs

  • Where large-scale batch scoring actually breaks (and why)
  • Checkpointing, state, and idempotency: building blocks for resumability
  • Orchestration patterns: retries, partial reruns, and backfills that don't double-score
  • Testing recovery paths and documenting a battle-tested runbook
  • A runnable checklist and Spark + Delta pattern for resumable batch jobs

Operational failures — not model quality — are the usual root cause when production scoring stops being trusted: long-running jobs die mid-run, partial outputs land in sinks, and downstream consumers either see duplicates or gaps. Design your batch scoring as resumable batch jobs from day one: treat reruns as first-class events and the rest becomes engineering detail.

You run nightly scoring on terabytes, and the symptoms are always the same: partial directories with leftover files, downstream dashboards with missing rows, and a frantic re-run that doubles predictions for half the universe. Those symptoms point at three missing guarantees: durable checkpoints of progress, idempotent (or transactional) writes, and orchestration that accepts partial reruns. The rest of this article shows concrete, operational patterns I use to guarantee exactly-once processing or safe reruns in large-scale batch scoring.

Where large-scale batch scoring actually breaks (and why)

  • Driver or cluster preemption: long jobs on spot/preemptible instances can be killed mid-run; without fine-grained progress markers you must re-run the whole job and risk duplicates or gaps.

  • Partial commits to object storage: writing Parquet/CSV directly into a final path and crashing before a manifest/marker is written leaves orphan files that downstream queries may or may not see. Object stores like S3 do not provide a built-in multi-file transactional commit, so higher-level transaction logs or commit protocols are necessary. Delta Lake implements a transactional log to avoid partial-commit visibility; this addresses the problem of orphaned files and commit atomicity for table snapshots.

  • Long lineage / recompute cost: Spark RDDs / transformations with huge lineage graphs can blow up recovery time; use explicit checkpointing to truncate lineage when necessary. Use RDD.checkpoint() or localCheckpoint() with caution — local checkpoints trade fault-tolerance for speed.

  • Concurrency and write conflicts: multiple clusters or retries racing to write to the same partition create conflict and corrupt data without an ordering or transactional coordinator. Delta Lake uses optimistic concurrency control and a transaction log to preserve ACID semantics per table.

  • Lack of idempotent sinks: many sinks (plain files, some databases) will happily accept duplicate writes; without deterministic primary keys or transactional semantics, retries create duplication. Transactional file-formats (Delta, Hudi, Iceberg) or sink-level deduplication avoid this.

  • Orchestration blind spots: monolithic DAG tasks that process months of data in one step are impossible to resume cheaply; orchestration tools must be used to coordinate partitioned execution and backfills. Airflow, Dagster, and others support backfills and re-exec-from-failure semantics, but the pipeline must be designed to exploit them.

Every failure mode above is survivable — but only if your pipeline records progress durably, writes results idempotently (or transactionally), and your orchestrator can re-run only what’s needed.

Checkpointing, state, and idempotency: building blocks for resumability

Design choices to make a job resumable break into three concrete capabilities: (1) durable progress state, (2) idempotent or transactional writes, and (3) deterministic input partitioning so retries are bounded.

  • Durable progress state (control/marker patterns)

    • Maintain a small control table that records processing state per partition/key: partition_key, run_id, status ∈ {PENDING, PROCESSING, COMMITTED, FAILED}, last_updated, file_manifest (optional). Persist this in a transactional metadata store (Postgres, DynamoDB, BigQuery, or a Delta table). Use an atomic claim update (e.g., conditional update or SELECT FOR UPDATE) to avoid two workers processing the same partition simultaneously.
    • Use compact “commit” markers in object storage when you must write files: write to a temporary path and then publish a single manifest or _SUCCESS marker — but prefer a transactional table format where a single metadata commit determines visibility. Delta/Hudi/Iceberg provide that.
  • Checkpointing strategies for long Spark jobs

    • Use RDD.checkpoint() or RDD.localCheckpoint() to truncate lineage when recomputation cost is high — prefer durable checkpointing (to a reliable filesystem) when you need fault tolerance; localCheckpoint() is useful for performance but not safe with dynamic allocation.
    • For streaming-style micro-batches (or very long batch loops that behave like micro-batches), Structured Streaming’s checkpointing and write-ahead log record which offsets each micro-batch covers, so a restarted query replays exactly the same data. Combined with a replayable source and an idempotent sink, that model (micro-batch + checkpointed offsets + WAL) underpins exactly-once for supported sinks.
  • Idempotent writes and exactly-once approaches

    • Use transactional table formats for writes: Delta Lake offers ACID transactions and optimistic concurrency control; it also exposes txnAppId + txnVersion options that can make batch writes idempotent (useful inside foreachBatch and in reruns).
    • For sinks without ACID commits, implement application-level idempotency: a deterministic primary key for predictions (e.g., entity_id + event_time), then write with upsert/merge semantics. For systems that offer their own deduplication (e.g., BigQuery’s best-effort insertId on the legacy streaming API, or offsets on Storage Write API committed streams), use those features to deduplicate in the sink.
    • Streaming systems that require end-to-end exactly-once often rely on two-phase commit or transactional producers; Flink’s TwoPhaseCommitSinkFunction is the canonical example and illustrates the general two-phase approach: prepare writes, checkpoint, then commit atomically.
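The atomic claim on a control table described above can be sketched with a conditional update. This is a minimal illustration that assumes a SQL metadata store; it uses SQLite only so that it runs anywhere, and the table name `batch_partitions` and the helpers `claim` and `mark` are hypothetical names chosen for the example.

```python
import sqlite3


def init_control(conn):
    """Create a hypothetical control table with one row per partition."""
    conn.execute(
        """CREATE TABLE IF NOT EXISTS batch_partitions (
               partition_key TEXT PRIMARY KEY,
               run_id        TEXT,
               status        TEXT NOT NULL DEFAULT 'PENDING',
               attempts      INTEGER NOT NULL DEFAULT 0
           )"""
    )


def claim(conn, partition_key, run_id):
    """Return True only if this caller moved the row to PROCESSING."""
    with conn:  # single transaction; commits on success
        cur = conn.execute(
            """UPDATE batch_partitions
               SET status = 'PROCESSING', run_id = ?, attempts = attempts + 1
               WHERE partition_key = ? AND status IN ('PENDING', 'FAILED')""",
            (run_id, partition_key),
        )
    # The WHERE clause is the guard: a row already claimed matches nothing.
    return cur.rowcount == 1


def mark(conn, partition_key, run_id, status):
    """Set a final status, but only for the run that owns the claim."""
    with conn:
        conn.execute(
            "UPDATE batch_partitions SET status = ? "
            "WHERE partition_key = ? AND run_id = ?",
            (status, partition_key, run_id),
        )


conn = sqlite3.connect(":memory:")
init_control(conn)
conn.execute(
    "INSERT INTO batch_partitions (partition_key) VALUES ('2025-12-15-shard-03')"
)
conn.commit()

first = claim(conn, "2025-12-15-shard-03", "run-a")   # wins the claim
second = claim(conn, "2025-12-15-shard-03", "run-b")  # finds nothing to claim
```

The same shape carries over to Postgres (conditional `UPDATE` and a row-count check) or DynamoDB (a conditional write); the point is that the status check and the status change happen in one atomic statement.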

Important: Idempotency is simpler than trying to make every leg of your pipeline strictly transactional. Where a transactional sink exists, use it. Where it does not, design each write to be naturally idempotent (upsert by key, or write-to-staging+atomic-rename/manifest).
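To make "upsert by key" concrete, here is a small sketch of a naturally idempotent write. It assumes predictions are uniquely identified by `(entity_id, event_time)` and uses SQLite's `ON CONFLICT` upsert (available in SQLite 3.24 and later) as a stand-in for whatever merge/upsert your real sink offers; the table and function names are illustrative.

```python
import sqlite3


def write_predictions(conn, rows):
    """Upsert predictions keyed by (entity_id, event_time)."""
    with conn:
        conn.executemany(
            """INSERT INTO predictions (entity_id, event_time, score)
               VALUES (?, ?, ?)
               ON CONFLICT(entity_id, event_time)
               DO UPDATE SET score = excluded.score""",
            rows,
        )


conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE predictions (
           entity_id  TEXT NOT NULL,
           event_time TEXT NOT NULL,
           score      REAL NOT NULL,
           PRIMARY KEY (entity_id, event_time)
       )"""
)

batch = [
    ("u1", "2025-12-15T00:00:00Z", 0.91),
    ("u2", "2025-12-15T00:00:00Z", 0.12),
]
write_predictions(conn, batch)
write_predictions(conn, batch)  # simulated retry of the same batch

row_count = conn.execute("SELECT COUNT(*) FROM predictions").fetchone()[0]
```

Because the key is derived from the data rather than from the run, a retry overwrites the same rows instead of appending new ones.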

Orchestration patterns: retries, partial reruns, and backfills that don't double-score

Orchestration is the glue that makes checkpointing and idempotency practical at scale.

  • Metadata-driven, partitioned orchestration

    • Drive runs from your control table: the orchestrator queries partitions with status = PENDING (or FAILED) and schedules a task per partition. Each worker attempts to atomically claim the partition row (transition to PROCESSING), does work, then atomically marks it COMMITTED with a file_manifest or row_count. This makes the job resumable and, as long as the write itself is idempotent or transactional, effectively exactly-once at partition granularity.
    • Smaller tasks (hourly/day partitions or fixed-size shards) reduce blast radius and make retries cheap.
  • Retries and backoff (orchestration retries)

    • Configure exponential backoff and limits at the task level in your orchestrator (Airflow, Dagster, Prefect). Let the task fail and escalate only after retries are exhausted; do not conflate transient retries with semantic reprocessing. Airflow’s best practices advise against keeping task state on the local filesystem and favor remote durable stores (S3/HDFS/DB) for intermediate artifacts.
    • For backfills, use the orchestrator’s backfill feature instead of manually rerunning monolithic jobs; Airflow’s dags backfill / dags trigger semantics allow you to re-run historical data intervals.
  • Partial reruns and “re-execute from failure”

    • Use orchestration systems that support re-execution from failure or re-run-per-partition. Tools like Dagster and many modern orchestrators support “re-execute from failed step” semantics so you don’t replay already-successful, idempotent steps.
    • When re-running, ensure your run identifiers (run_id, txnAppId + txnVersion, or insertId) align with the idempotency approach so retries do not create duplicates. Delta’s txnAppId/txnVersion pair is an explicit mechanism to make foreachBatch writes idempotent on re-run.
  • Partial commit pattern (staging + commit)

    • Write outputs to s3://bucket/tmp/{run_id}/{partition}/... and only after all files are successfully written, perform a single commit step: either (a) move files into the final location (rename may be non-atomic on object stores), or (b) write a manifest or atomic log entry that signals downstream readers to include the files. Transactional table formats avoid the object-store rename pitfalls by committing via a transaction log.
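The staging-plus-manifest variant of the partial commit pattern can be sketched on a local filesystem. This is an illustration under stated assumptions: the directory layout, file names, and the `write_partition`, `publish`, and `visible_rows` helpers are all hypothetical, and `os.replace` stands in for the single atomic publish step (on an object store the analogue would be a single-object write of the manifest, not a rename).

```python
import json
import os
import tempfile
from pathlib import Path


def write_partition(root, run_id, partition, rows):
    """Write output under a run-scoped staging path; nothing is visible yet."""
    staging = root / "tmp" / run_id / partition
    staging.mkdir(parents=True, exist_ok=True)
    data_file = staging / "part-00000.json"
    data_file.write_text(json.dumps(rows))
    return data_file


def publish(root, partition, files):
    """Commit step: atomically publish one manifest that lists the files."""
    manifest_dir = root / "manifests"
    manifest_dir.mkdir(parents=True, exist_ok=True)
    tmp = manifest_dir / f".{partition}.json.tmp"
    tmp.write_text(json.dumps({"partition": partition,
                               "files": [str(f) for f in files]}))
    os.replace(tmp, manifest_dir / f"{partition}.json")  # atomic on POSIX


def visible_rows(root, partition):
    """Readers trust only the manifest, never a directory listing."""
    manifest = root / "manifests" / f"{partition}.json"
    if not manifest.exists():
        return []
    rows = []
    for name in json.loads(manifest.read_text())["files"]:
        rows.extend(json.loads(Path(name).read_text()))
    return rows


root = Path(tempfile.mkdtemp())
rows = [{"id": 1, "score": 0.5}]

# Run 1 writes its files, then "crashes" before the commit step.
write_partition(root, "run-1", "2025-12-15", rows)
before = visible_rows(root, "2025-12-15")

# Run 2 redoes the partition and publishes; run 1's orphan stays invisible.
f2 = write_partition(root, "run-2", "2025-12-15", rows)
publish(root, "2025-12-15", [f2])
after = visible_rows(root, "2025-12-15")
```

Orphaned staging files from failed runs still cost storage, so a periodic cleanup of `tmp/{run_id}` paths that no manifest references is a reasonable companion job.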

Testing recovery paths and documenting a battle-tested runbook

Testing the recovery path is often the part teams skip — and the place where processes fail in production.

  • Unit and integration tests

    • Write unit tests around your idempotency logic (dedupe keys, upsert/merge SQL). For example: run the scoring job twice against a small dataset with the same run_id and assert the output table row-count is unchanged and no duplicates exist.
    • Implement an integration test that simulates a partial failure: start a job, kill the process after file writes but before the commit, then re-run and assert no duplication or corruption.
  • End-to-end failure injection (chaos experiments)

    • Run controlled chaos experiments in a staging environment: terminate workers, kill the driver, throttle network I/O, and assert the pipeline resumes and does not corrupt data. Netflix’s Chaos Monkey is the canonical example of fault-injection for resilience testing.
  • Data validation and safety nets

    • Integrate data quality checkpoints using a validation framework (for example, Great Expectations Checkpoints) so that a failing validation prevents a commit or triggers an automated rollback. Use validation Checkpoints as a gate in your orchestrator.
  • Runbook structure and content

    • Keep runbooks terse and action-oriented: for each alert/severity include immediate triage steps, how to read the control table, how to locate the latest run_id, how to replay a single partition, and how to perform a full backfill. PagerDuty and SRE guidance emphasize keeping runbooks concise and executable under stress.
    • Example runbook quick reference fields:
      • Title / service
      • Owner / on-call rotation
      • Symptoms that trigger this runbook
      • Quick triage (logs, control table query, last successful run_id)
      • Recovery steps (minor: re-run partition X with --resume; major: revert to previous snapshot)
      • Backfill instructions (ranges, parallelism limits, cost estimate)
      • Postmortem checklist (collect logs, tag incident, update runbook)

Callout: A runbook that can’t be executed by a competent engineer in five minutes under stress is too long. Keep it checklist-style and keep the most-used commands first.

A runnable checklist and Spark + Delta pattern for resumable batch jobs

Below is a compact, actionable checklist and a small runnable pattern I use when I need idempotent, resumable batch scoring at scale.

Checklist (operational minimum)

  1. Partition your input into deterministic shards (e.g., date + hash mod N).
  2. Create a durable control table for partition_key, run_id, status, attempts, manifest.
  3. Use a transactional sink when possible (Delta/Hudi/Iceberg); if not possible, implement staging + manifest + atomic publish.
  4. Ensure writes include stable deduplication keys (entity_id + event_timestamp) or use sink-provided dedup semantics (e.g., BigQuery insertId / committed streams).
  5. Instrument and test: unit tests for idempotent writes, integration test for partial-failure replay, periodic chaos experiments in staging.
  6. Document a terse runbook with quick triage queries and reinstatement/backfill commands.
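For checklist item 1, one way to get deterministic shards is a stable hash of the entity key. The sketch below assumes string entity IDs and a fixed shard count; `shard_for` and `partition_key` are hypothetical helper names. It uses `hashlib` rather than Python's built-in `hash()`, because string hashes from `hash()` are salted per process and would assign different shards on a re-run.

```python
import hashlib


def shard_for(entity_id: str, num_shards: int) -> int:
    """Map an entity to a shard; stable across processes and machines."""
    digest = hashlib.sha256(entity_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


def partition_key(data_date: str, entity_id: str, num_shards: int = 16) -> str:
    """Build a key shaped like '2025-12-15-shard-03'."""
    return f"{data_date}-shard-{shard_for(entity_id, num_shards):02d}"


key_a = partition_key("2025-12-15", "user-42")
key_b = partition_key("2025-12-15", "user-42")  # same inputs, same shard
```

Changing `num_shards` reshuffles every entity, so treat the shard count as part of the partitioning contract and version it if it ever has to change.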

A compact Spark + Delta pattern (Python pseudocode)

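# Assumptions:
# - Predictions are written partitioned by `data_date` (YYYY-MM-DD)
# - A Delta control table `control.batch_partitions` tracks status per partition
# - `model.predict(df)` is a placeholder for your scoring step and returns a DataFrame
import uuid

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("resumable_batch_scoring").getOrCreate()
txn_app_prefix = "batch_scoring_service_v1"
run_id = uuid.uuid4().hex  # identifies this attempt in the control table only
# txnVersion must be identical on every retry of the same logical write, so it
# cannot be a wall-clock timestamp. Bump it only when you deliberately re-score.
scoring_version = 1


def sql_str(value, limit=500):
    """Truncate and escape single quotes so a value is safe inside a SQL literal."""
    return str(value)[:limit].replace("'", "''")


partitions = spark.read.format("delta").load("s3://data/partitions_list").collect()
for p in partitions:
    pk = p["partition_key"]  # e.g. '2025-12-15-shard-03'
    pk_sql = sql_str(pk)

    # Claim the partition: only PENDING/FAILED (or new) rows move to PROCESSING
    spark.sql(f"""
    MERGE INTO control.batch_partitions AS t
    USING (SELECT '{pk_sql}' AS partition_key, '{run_id}' AS run_id, 'PROCESSING' AS status) AS s
    ON t.partition_key = s.partition_key
    WHEN MATCHED AND t.status IN ('PENDING','FAILED') THEN
      UPDATE SET status = 'PROCESSING', run_id = s.run_id, attempts = t.attempts + 1, updated_at = current_timestamp()
    WHEN NOT MATCHED THEN
      INSERT (partition_key, run_id, status, attempts, updated_at)
      VALUES (s.partition_key, s.run_id, s.status, 1, current_timestamp())
    """)

    # The MERGE is a no-op for rows already PROCESSING or COMMITTED, so confirm
    # that this run actually owns the claim before doing any work.
    owner = spark.sql(
        f"SELECT run_id FROM control.batch_partitions "
        f"WHERE partition_key = '{pk_sql}' AND status = 'PROCESSING'"
    ).collect()
    if not owner or owner[0]["run_id"] != run_id:
        continue

    try:
        df = spark.read.parquet(f"s3://data/input/{pk}")
        preds = model.predict(df)  # placeholder; produce DataFrame `preds`

        # Idempotent write using Delta txn options. Delta skips a write whose
        # (txnAppId, txnVersion) was already committed, so the app id is scoped
        # per partition; a shared id and version would skip every partition
        # after the first.
        (preds.write
              .format("delta")
              .mode("append")
              .option("txnAppId", f"{txn_app_prefix}/{pk}")
              .option("txnVersion", scoring_version)
              .save("/mnt/delta/predictions"))

        # Mark partition as committed, guarded by run_id so only the owner can do it
        spark.sql(f"UPDATE control.batch_partitions SET status='COMMITTED', manifest='OK', updated_at=current_timestamp() WHERE partition_key='{pk_sql}' AND run_id='{run_id}'")
    except Exception as e:
        # Escape the message so quotes in the error cannot break the statement
        spark.sql(f"UPDATE control.batch_partitions SET status='FAILED', last_error='{sql_str(e)}', updated_at=current_timestamp() WHERE partition_key='{pk_sql}' AND run_id='{run_id}'")
        raise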

Small comparison table (quick reference)

| Pattern | Exactly-once support | Best for | Note |
| --- | --- | --- | --- |
| Delta Lake (transaction log) | Yes (table-scoped ACID) | Large file-based analytics + concurrent writers | txnAppId/txnVersion enable idempotent writes. |
| Apache Hudi | Yes (upsert + incremental commits) | CDC/upsert-heavy workloads | Good for incremental updates and incremental queries. |
| Apache Iceberg | Yes (manifest/atomic commits) | Table-level ACID over object stores | Strong metadata management; per-table atomic commits. |
| Plain S3 + manifest | No (manual) | Simple outputs for low-concurrency | Implement staging + manifest; careful with orphan files. |
| BigQuery Storage Write API | Exactly-once with committed streams | High-throughput streaming to BigQuery | Use committed streams with stream offsets; insertId is a best-effort dedup key on the legacy streaming API only. |

Sources

Structured Streaming Programming Guide (Spark 3.0.0) - Explains checkpointing, write-ahead logs and the fault-tolerance semantics behind Structured Streaming and exactly-once guarantees.

pyspark.RDD.checkpoint — PySpark documentation (3.4.2) - RDD checkpointing API and localCheckpoint() semantics and caveats.

Concurrency control — Delta Lake Documentation - Delta Lake’s ACID guarantees, optimistic concurrency control, and snapshot semantics used to avoid partial commits and concurrent corruption.

Multi-cluster writes to Delta Lake Storage in S3 (Delta blog) - Design explanation of atomic commit challenges on S3 and Delta's S3DynamoDBLogStore approach to prevent concurrent commit conflicts.

Table streaming reads and writes — Delta Lake Documentation (idempotent writes in foreachBatch) - txnAppId and txnVersion options for idempotent writes inside foreachBatch.

Write Operations | Apache Hudi - Hudi’s upsert / incremental write semantics for incremental and CDC-style use cases.

Hive — Apache Iceberg documentation - Notes about table-level atomicity and per-table commit semantics in Iceberg.

Streaming data into BigQuery (Storage Write API and insert semantics) - BigQuery streaming insertion options, insertId semantics, and the Storage Write API’s committed streams for exactly-once.

An overview of end-to-end exactly-once processing in Apache Flink - Two-phase commit and checkpointing explanation for end-to-end exactly-once in stream processing.

Message Delivery Guarantees for Apache Kafka (Confluent) - Definitions and trade-offs for at-most-once, at-least-once, and exactly-once semantics in message delivery.

Best Practices — Airflow Documentation (2.6.0) - Orchestration best practices, backfill behavior, and notes on storing state and communicating between tasks.

Run a Checkpoint | Great Expectations - How to use Great Expectations Checkpoints for production validation, and how to run validations programmatically as a gate.

What is a Runbook? | PagerDuty - Runbook structure, why runbooks exist, and guidance for keeping them concise and executable under pressure.

Netflix/chaosmonkey (GitHub) - Chaos Monkey example and the chaos engineering rationale for proactively testing failure modes.

Treat reruns as a first-class operational mode: durable progress markers, deterministic partitioning, and idempotent/transactional writes convert failures from "data disasters" into routine operational events that your runbook can resolve quickly and repeatably.
