DEV Community

137Foundry

Idempotency in Data Pipelines: How to Prevent Duplicate Records

A pipeline that runs twice should produce the same result as one that runs once. That property is idempotency, and its absence is one of the most common sources of silent data corruption in integration systems. A partially completed run gets retried, the retry reprocesses records that already loaded, and the destination ends up with duplicates that neither the source system nor any monitoring alert ever surfaced.

Designing for idempotency is not complex, but it requires making explicit decisions about state management that are easy to skip when building the initial pipeline.

What Idempotency Means in Data Integration

An idempotent operation produces the same effect when applied once or multiple times. In data integration terms, this means:

  • Inserting the same record twice produces one record, not two
  • Running a pipeline over the same time window twice produces the same output as running it once
  • Retrying a failed partial run does not create duplicates for the records that already loaded

The opposite is a non-idempotent pipeline: every execution adds records, so duplicate runs produce duplicate data. Most pipelines start as non-idempotent because insert operations are simpler to implement than upsert operations, and the duplicate problem only becomes visible after a retry event occurs.
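
The difference is easy to see in a small in-memory sketch. Everything here is illustrative: `append_load` and `keyed_load` are hypothetical names, a list stands in for an insert-only table, and a dict keyed by record ID stands in for a table written with upserts.

```python
# Hypothetical illustration: a list models an insert-only table,
# a dict keyed by record ID models a table written with upserts.

def append_load(table: list, records: list) -> None:
    """Non-idempotent: every call appends every record again."""
    table.extend(records)


def keyed_load(table: dict, records: list) -> None:
    """Idempotent: writing an existing key overwrites it, never duplicates it."""
    for record in records:
        table[record["id"]] = record


batch = [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}]

insert_only: list = []
keyed: dict = {}
for _ in range(2):  # simulate one run followed by a full retry
    append_load(insert_only, batch)
    keyed_load(keyed, batch)

print(len(insert_only))  # 4: the retry doubled the data
print(len(keyed))        # 2: the retry changed nothing
```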

Common Sources of Duplicate Records

Pipeline retries after partial success. A run processes 8,000 of 10,000 records successfully, then fails. The retry starts from the beginning and reprocesses the 8,000 records that already loaded. Without idempotency, these 8,000 records now exist twice.

Parallel execution without coordination. Two instances of the same pipeline run simultaneously, both extracting from the same source window and loading to the same destination. This happens more often than expected with cloud schedulers that retry hung jobs while the original is still running.
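
One possible way to coordinate instances, sketched here as an assumption rather than a recommendation from this article, is a lock row that only one instance can insert. The `pipeline_locks` table and the `try_acquire` and `release` helpers are hypothetical names, and SQLite stands in for whatever shared database the pipeline already uses.

```python
import sqlite3


def try_acquire(conn, job_name, owner):
    """Return True if this instance obtained the lock row for job_name."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO pipeline_locks (job_name, owner) VALUES (?, ?)",
                (job_name, owner),
            )
        return True
    except sqlite3.IntegrityError:
        # The primary key already exists: another instance holds the lock.
        return False


def release(conn, job_name, owner):
    """Delete the lock row, but only if this owner holds it."""
    with conn:
        conn.execute(
            "DELETE FROM pipeline_locks WHERE job_name = ? AND owner = ?",
            (job_name, owner),
        )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE pipeline_locks (job_name TEXT PRIMARY KEY, owner TEXT NOT NULL)"
)

first = try_acquire(conn, "orders_sync", "instance-a")   # lock obtained
second = try_acquire(conn, "orders_sync", "instance-b")  # refused while held
release(conn, "orders_sync", "instance-a")
third = try_acquire(conn, "orders_sync", "instance-b")   # obtained after release
```

A real lock also needs an expiry or heartbeat so that a crashed holder does not block every later run; that part is left out of the sketch.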

Checkpoint failures. A pipeline tracks its progress with checkpoints (offsets, cursors, timestamps). If the checkpoint is written only after the load completes, a crash between the load and the checkpoint write leaves the checkpoint at its old position, so the next run repeats a load that already succeeded.
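
The checkpoint case can be simulated in a few lines. This is a sketch under assumed names (`run`, `CrashBeforeCheckpoint`, a dict as the destination, a dict holding the offset); it shows the load being replayed after the crash and a keyed write absorbing the replay.

```python
class CrashBeforeCheckpoint(Exception):
    """Simulated failure between the load and the checkpoint write."""


def run(source, destination, checkpoint, crash_before_checkpoint=False):
    start = checkpoint["offset"]
    batch = source[start:]
    for record in batch:
        destination[record["id"]] = record  # keyed write, safe to replay
    if crash_before_checkpoint:
        raise CrashBeforeCheckpoint("load finished, checkpoint not saved")
    checkpoint["offset"] = start + len(batch)


source = [{"id": i, "value": i * 10} for i in range(5)]
destination = {}
checkpoint = {"offset": 0}

try:
    run(source, destination, checkpoint, crash_before_checkpoint=True)
except CrashBeforeCheckpoint:
    pass  # the scheduler would retry here

rows_after_crash = len(destination)        # all 5 rows were loaded
offset_after_crash = checkpoint["offset"]  # but the offset is still 0

run(source, destination, checkpoint)       # the retry replays all 5 rows
```

With an append-only destination the retry would have written the five rows a second time. With the keyed write, the destination still holds five rows and the offset advances to 5.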

Upsert as the Foundation of Idempotency

The most reliable approach to idempotency at the storage layer is the upsert operation: insert the record if it does not exist, update it if it does. In SQL terms, this is typically INSERT ... ON CONFLICT DO UPDATE (PostgreSQL) or MERGE (SQL Server, Oracle).

For PostgreSQL, the pattern looks like:

INSERT INTO events (event_id, payload, processed_at)
VALUES ($1, $2, $3)
ON CONFLICT (event_id) DO UPDATE
SET payload = EXCLUDED.payload,
    processed_at = EXCLUDED.processed_at;

The event_id column is the natural key that identifies whether a record already exists. ON CONFLICT (event_id) only works if that column carries a unique constraint or unique index, such as a primary key. Every record must also have a stable unique identifier that is consistent across extraction runs. If the source does not provide one, you must generate one deterministically from the record's content.
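
To try the pattern without a PostgreSQL server, the sketch below runs the same statement against an in-memory SQLite database from Python. SQLite accepts this ON CONFLICT syntax from version 3.24 onward; the `?` placeholders, the `load` helper, and the sample rows are assumptions made for the demo.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE events (
        event_id     TEXT PRIMARY KEY,  -- the unique key ON CONFLICT relies on
        payload      TEXT NOT NULL,
        processed_at TEXT NOT NULL
    )
    """
)

UPSERT = """
INSERT INTO events (event_id, payload, processed_at)
VALUES (?, ?, ?)
ON CONFLICT (event_id) DO UPDATE
SET payload = excluded.payload,
    processed_at = excluded.processed_at
"""


def load(conn, rows):
    """Upsert a batch inside one transaction."""
    with conn:
        conn.executemany(UPSERT, rows)


rows = [
    ("evt-1", '{"n": 1}', "2024-01-01T00:00:00Z"),
    ("evt-2", '{"n": 2}', "2024-01-01T00:00:00Z"),
]

load(conn, rows)
load(conn, rows)  # simulated retry of the same batch

row_count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
print(row_count)  # 2
```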

Deterministic ID Generation

When source records do not include a stable unique identifier, you can generate one by hashing the record's identifying fields:

import hashlib
import json

def generate_record_id(record, key_fields):
    """Generate a stable ID from record content."""
    key_data = {field: record[field] for field in key_fields}
    canonical = json.dumps(key_data, sort_keys=True)
    return hashlib.sha256(canonical.encode()).hexdigest()

This approach produces the same ID for the same input data, allowing upserts to detect duplicates even when the source does not provide a unique key. The fields used for hashing must be stable (not timestamps or auto-incremented values) and must uniquely identify the record within the source system.
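
A short usage sketch makes the stability requirement concrete. The function is repeated so the snippet runs on its own; the sample records and the choice of `customer` and `order_no` as key fields are made up for illustration.

```python
import hashlib
import json


def generate_record_id(record, key_fields):
    """Generate a stable ID from record content."""
    key_data = {field: record[field] for field in key_fields}
    canonical = json.dumps(key_data, sort_keys=True)
    return hashlib.sha256(canonical.encode()).hexdigest()


# The same order seen in two extraction runs: field order and the
# extraction timestamp differ, the identifying fields do not.
first_extract = {"customer": "acme", "order_no": 1042,
                 "extracted_at": "2024-01-01T00:00:00Z"}
retry_extract = {"order_no": 1042, "customer": "acme",
                 "extracted_at": "2024-01-01T00:05:00Z"}
other_order = {"customer": "acme", "order_no": 1043,
               "extracted_at": "2024-01-01T00:00:00Z"}

key_fields = ["customer", "order_no"]
id_first = generate_record_id(first_extract, key_fields)
id_retry = generate_record_id(retry_extract, key_fields)
id_other = generate_record_id(other_order, key_fields)

print(id_first == id_retry)  # True: the retry maps to the same key
print(id_first == id_other)  # False: a different order gets a different key
```

Had `extracted_at` been included in `key_fields`, the retry would have produced a new ID and the upsert would have inserted a second row.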

Time Window Idempotency

For pipelines that extract data by time window, idempotency requires that reprocessing the same window produces the same result. Two approaches work:

Truncate and reload. Before loading data for a time window, delete all existing records for that window and reload from scratch. This is simple and reliable but requires the destination to support deletions and may not work if other processes are writing to the same table concurrently.
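
A minimal sketch of truncate and reload, assuming a hypothetical `metrics` table with an ISO 8601 `event_time` column and a half-open window [start, end). The delete and the insert share one transaction so that a failure in the middle leaves the old rows in place.

```python
import sqlite3


def reload_window(conn, window_start, window_end, rows):
    """Replace every row in [window_start, window_end) in one transaction."""
    with conn:  # commit both statements together, or roll both back
        conn.execute(
            "DELETE FROM metrics WHERE event_time >= ? AND event_time < ?",
            (window_start, window_end),
        )
        conn.executemany(
            "INSERT INTO metrics (event_time, value) VALUES (?, ?)", rows
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE metrics (event_time TEXT NOT NULL, value REAL NOT NULL)")
# One row outside the window, which the reload must leave alone.
conn.execute("INSERT INTO metrics VALUES ('2024-01-01T23:00:00', 1.0)")
conn.commit()

window_rows = [("2024-01-02T01:00:00", 2.0), ("2024-01-02T02:00:00", 3.0)]
for _ in range(2):  # first run, then a retry of the same window
    reload_window(conn, "2024-01-02T00:00:00", "2024-01-03T00:00:00", window_rows)

total = conn.execute("SELECT COUNT(*) FROM metrics").fetchone()[0]
print(total)  # 3: one row outside the window plus two inside it
```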

Upsert with timestamp tracking. Keep the upsert approach but track which time windows have been fully processed. On retry, skip windows that are marked complete and reprocess only windows that failed mid-run. The Kafka documentation covers offset management patterns that implement this for stream-based pipelines.
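
The window-tracking variant might look like the sketch below. The names `process_windows`, `extract`, and `completed` are hypothetical, windows are identified by date strings, and a set stands in for whatever durable store records completed windows. The simulated failure strikes after the first record of the second window.

```python
def process_windows(windows, extract, destination, completed):
    """Load each window with keyed writes; skip windows already marked complete."""
    for window in windows:
        if window in completed:
            continue
        for record in extract(window):
            destination[record["id"]] = record
        completed.add(window)  # marked only after the whole window loaded


calls = []
fail_once = {"2024-01-02"}


def extract(window):
    """Yield three records per window; fail once partway through 2024-01-02."""
    calls.append(window)
    for i in range(3):
        if window in fail_once and i == 1:
            fail_once.discard(window)
            raise RuntimeError("simulated mid-window failure")
        yield {"id": f"{window}-{i}", "window": window}


windows = ["2024-01-01", "2024-01-02", "2024-01-03"]
destination, completed = {}, set()

try:
    process_windows(windows, extract, destination, completed)
except RuntimeError:
    pass  # the run died partway through the second window

rows_after_failure = len(destination)  # 3 from window one, 1 from window two
process_windows(windows, extract, destination, completed)  # retry
```

On the retry the first window is skipped, the second is reprocessed from its start, and the record it had already written is overwritten rather than duplicated.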

Monitoring for Duplicate Records

Even with idempotency in place, monitoring for duplicates provides a safety net:

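  • Compare record counts at source and at destination for the same time window. A destination count higher than the source count (allowing for fan-out) indicates duplicates. Count rows rather than distinct values at the destination, because a distinct count hides exact duplicates.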
  • Check cardinality of the natural key at the destination. Any key value with count greater than one is a duplicate.
  • Alert when the destination record count for a time window increases between run N and run N+1 without a corresponding increase in the source count.

These checks can be run as part of the reconciliation job described in the guide on monitoring data integration pipelines in production.
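
The first two checks can be sketched against SQLite as follows. The `events` table deliberately has no unique constraint so that a duplicate can exist, and `source_count` is a made-up figure standing in for a count taken at the source.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# No unique constraint here, so this destination can hold duplicates.
conn.execute("CREATE TABLE events (event_id TEXT, payload TEXT)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?)",
    [("evt-1", "a"), ("evt-2", "b"), ("evt-2", "b"), ("evt-3", "c")],
)

# Cardinality check: any key that appears more than once is a duplicate.
DUPLICATE_KEYS = """
SELECT event_id, COUNT(*) AS copies
FROM events
GROUP BY event_id
HAVING COUNT(*) > 1
ORDER BY event_id
"""
duplicates = conn.execute(DUPLICATE_KEYS).fetchall()
print(duplicates)  # [('evt-2', 2)]

# Count check: the source reported three records for this window.
source_count = 3  # assumed value for the demo
destination_count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
excess = destination_count - source_count
print(excess)  # 1
```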

HTTP Idempotency for API-Based Pipelines

For pipelines that write to destination systems via API, HTTP idempotency keys are the equivalent mechanism. Many modern APIs accept an idempotency key header that causes the server to de-duplicate requests with the same key. RFC 7231, since superseded by RFC 9110, defines idempotency at the HTTP method level: PUT and DELETE are idempotent, POST is not. Many API providers extend this with explicit idempotency keys so that POST requests can be retried safely.

Submit the same idempotency key with the same payload, and the API returns the previous result without re-processing. This protects against retries caused by network timeouts where the original request succeeded but the response was lost.
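
The behaviour can be modelled without a network. In the sketch below, `FakeApi` is a hypothetical in-memory stand-in for a server that honours idempotency keys, and `key_for` is one assumed way to derive the key deterministically from a record ID so that a retry sends the same key. Real providers differ in header name, key lifetime, and how they treat a reused key with a different payload.

```python
import uuid


class FakeApi:
    """In-memory stand-in for a server that de-duplicates by idempotency key."""

    def __init__(self):
        self._responses = {}  # idempotency key -> stored response
        self.created = []     # payloads that were actually processed

    def post(self, payload, idempotency_key):
        if idempotency_key in self._responses:
            # Replay: return the earlier result without processing again.
            return self._responses[idempotency_key]
        self.created.append(payload)
        response = {"status": 201, "id": len(self.created)}
        self._responses[idempotency_key] = response
        return response


def key_for(record_id):
    """Derive the same key for the same record on every attempt."""
    return str(uuid.uuid5(uuid.NAMESPACE_URL, f"pipeline/orders/{record_id}"))


api = FakeApi()
payload = {"order_no": 1042, "amount": 99}
key = key_for(1042)

first = api.post(payload, key)
retry = api.post(payload, key)  # e.g. the first response was lost to a timeout

print(first == retry)    # True
print(len(api.created))  # 1
```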

Testing Idempotency in Pipeline Code

Idempotency is a property that is difficult to verify by inspection. The only way to confirm a pipeline is truly idempotent is to run it twice against the same input and compare the outputs.

The test structure is straightforward:

  1. Run the pipeline once against a known test dataset.
  2. Capture the destination state: record count, contents of key records, and any generated IDs.
  3. Run the pipeline again against the same dataset without clearing the destination.
  4. Assert that the destination state is identical to what was captured in step 2.

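For upsert-based pipelines, the destination record count must not increase on the second run, and the content of each record must match the first run's output. Columns that record load time, such as processed_at in the upsert above, will differ between runs unless the pipeline derives them from the input, so either make them deterministic in the test or exclude them from the comparison.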

def test_pipeline_is_idempotent(test_dataset, pipeline, destination):
    # First run
    pipeline.run(test_dataset)
    state_after_first_run = destination.snapshot()
    count_first = len(state_after_first_run)

    # Second run with same input, destination not cleared
    pipeline.run(test_dataset)
    state_after_second_run = destination.snapshot()
    count_second = len(state_after_second_run)

    assert count_first == count_second, (
        f"Duplicate records created: {count_second - count_first}"
    )
    assert state_after_first_run == state_after_second_run

For time-window-based pipelines, the test should cover reprocessing an overlapping window: run for window [T1, T2], then run again for [T0, T2] where T0 is before T1. Records in the T1-T2 overlap should not be duplicated after the second run.
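
A compact version of the overlapping-window test, using integers for timestamps and a dict as the destination. `run_window` is a hypothetical stand-in for the pipeline under test, and the window is treated as half-open, [start, end).

```python
def run_window(source, destination, start, end):
    """Load every source record with start <= ts < end using keyed writes."""
    for record in source:
        if start <= record["ts"] < end:
            destination[record["id"]] = record


# Six records at ts = 0, 5, 10, 15, 20, 25.
source = [{"id": f"r{ts}", "ts": ts} for ts in range(0, 30, 5)]
T0, T1, T2 = 0, 10, 30

destination = {}
run_window(source, destination, T1, T2)
count_after_first = len(destination)   # the four records in [T1, T2)

run_window(source, destination, T0, T2)
count_after_second = len(destination)  # all six, overlap not duplicated

assert count_after_first == 4
assert count_after_second == 6
assert set(destination) == {r["id"] for r in source}
```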

Testing idempotency during development is significantly cheaper than discovering and remediating duplicate data in production. A deduplication job on a production table with tens of millions of records can take hours, disrupts normal pipeline runs while it executes, and may still leave edge cases unresolved if the duplicate detection logic is not precise.

Idempotency as a Design Constraint, Not a Fix

The most important thing about idempotency is that it needs to be designed in from the start. Adding idempotency to an existing non-idempotent pipeline that has been running in production requires auditing the destination for existing duplicates, migrating the storage layer to support upserts, and potentially deduplicating historical data. That is a significant effort compared to building with upserts from day one.

137Foundry builds data integration pipelines with reliability properties including idempotency, dead letter queues, and schema change detection built in. The data integration service and the broader services hub describe the full scope of what we work on.
