DEV Community

137Foundry

Building a Reliable Python Data Sync Without a Pipeline Framework

A reliable Python data sync does not require Airflow, Prefect, or any pipeline framework. It requires a script with clear failure modes, structured logging, an idempotent write strategy, and a way to get alerted when something goes wrong.

This guide builds each piece incrementally.

Step 1: Structure the Script Around a Single Entry Point

The most important design decision for an automation script is how it starts and ends. A single run() function that either succeeds and exits 0, or fails and exits 1, gives cron (and any monitoring system) a clean signal to work with.

import sys
import logging
from datetime import datetime, timezone

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)]
)
log = logging.getLogger(__name__)

def sync_data():
    # your actual sync logic
    pass

def run():
    try:
        start = datetime.now(tz=timezone.utc)
        log.info("sync started")
        sync_data()
        elapsed = (datetime.now(tz=timezone.utc) - start).total_seconds()
        log.info("sync completed in %.1fs", elapsed)
    except Exception as exc:
        log.error("sync failed: %s", exc, exc_info=True)
        sys.exit(1)

if __name__ == "__main__":
    run()

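The try/except at the top level catches any exception that escapes sync_data(), logs it with a full traceback in the same format as every other log line, and exits with code 1. Python already exits non-zero on an unhandled exception, but it prints the traceback to stderr outside your log format; handling the failure explicitly keeps both the failure record and the exit code under your control.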

Step 2: Configure Everything With Environment Variables

Hardcoded credentials, URLs, and paths tie a script to one environment and force a code change for every other one. Environment variables let the same script behave differently in different environments without any code change.

import os

def get_config():
    return {
        "source_url": os.environ["SOURCE_API_URL"],
        "api_key": os.environ["SOURCE_API_KEY"],
        "output_dir": os.environ.get("OUTPUT_DIR", "/data/sync"),
        "max_retries": int(os.environ.get("MAX_RETRIES", "3")),
    }

Use os.environ["KEY"] for required variables -- this raises a KeyError immediately if the variable is not set, which surfaces the misconfiguration at startup rather than mid-run. Use os.environ.get("KEY", default) for optional variables with defaults.
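One limitation of indexing os.environ directly is that it reports only the first missing variable. A minimal sketch of a stricter loader, assuming the same variable names as get_config() above, collects every missing required name and raises once. The names load_config and REQUIRED_VARS are hypothetical, and the env parameter exists only so the function can be exercised without touching the real environment.

```python
import os
from typing import Mapping

# Variable names follow get_config() above; this helper itself is illustrative.
REQUIRED_VARS = ("SOURCE_API_URL", "SOURCE_API_KEY")


def load_config(env: Mapping[str, str] = os.environ) -> dict:
    """Return the sync configuration, or raise listing every missing variable."""
    # An empty string is treated the same as an unset variable.
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(
            "missing required environment variables: " + ", ".join(missing)
        )
    return {
        "source_url": env["SOURCE_API_URL"],
        "api_key": env["SOURCE_API_KEY"],
        "output_dir": env.get("OUTPUT_DIR", "/data/sync"),
        "max_retries": int(env.get("MAX_RETRIES", "3")),
    }
```

Calling load_config() as the first statement inside run() keeps the fail-fast behavior and reports every missing name in one error message.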

Step 3: Fetch Incrementally When the Source Allows It

Full re-syncs are simple to implement but expensive at scale. If the source API supports filtering by a timestamp (last modified, created at), use it to fetch only records that changed since the last successful run.

from pathlib import Path
import json

def get_last_sync_time(state_file: Path) -> str | None:
    if state_file.exists():
        state = json.loads(state_file.read_text())
        return state.get("last_sync_time")
    return None

def save_last_sync_time(state_file: Path, timestamp: str):
    state_file.write_text(json.dumps({"last_sync_time": timestamp}))

The state file is a simple JSON file on disk. It records the timestamp of the last successful sync. On the next run, the script uses this timestamp to request only newer records. If the run fails, the timestamp is not updated, so the next run re-fetches from the last successful point.
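The ordering described above can be sketched as follows. This is a minimal sketch rather than a definitive implementation: run_incremental_sync and the fetch and write callables are hypothetical stand-ins for your own API client and writer. The new cutoff is captured before the fetch starts, so records modified while the run is in progress are picked up next time instead of being skipped. This assumes the local clock and the source's clock roughly agree.

```python
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, Optional


def run_incremental_sync(
    state_file: Path,
    fetch: Callable[[Optional[str]], list],  # hypothetical: records changed since the given time
    write: Callable[[list], None],           # hypothetical: persists the records idempotently
) -> int:
    # Read the previous cutoff; None means no successful run yet, so fetch everything.
    since = None
    if state_file.exists():
        since = json.loads(state_file.read_text()).get("last_sync_time")

    # Capture the new cutoff BEFORE fetching.
    cutoff = datetime.now(tz=timezone.utc).isoformat()

    records = fetch(since)
    write(records)

    # Only reached when fetch and write did not raise, so a failed run
    # leaves the old timestamp in place.
    state_file.write_text(json.dumps({"last_sync_time": cutoff}))
    return len(records)
```

Because the window can overlap slightly between runs, this approach relies on the idempotent writes covered in the next step.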

Step 4: Write Data Idempotently

An idempotent write produces the same result whether it runs once or ten times. For a sync job that might run multiple times due to retries or debugging, idempotency prevents duplicate records.

For database writes, the standard pattern is upsert -- insert the record if it does not exist, or update it if it does. In PostgreSQL, with %s placeholders as used by drivers such as psycopg:

INSERT INTO records (id, data, updated_at)
VALUES (%s, %s, %s)
ON CONFLICT (id) DO UPDATE SET
    data = EXCLUDED.data,
    updated_at = EXCLUDED.updated_at;
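To see the idempotency concretely, here is a small sketch using Python's built-in sqlite3 module, which accepts the same ON CONFLICT clause (SQLite 3.24 or newer) but uses ? placeholders instead of %s. The table layout mirrors the statement above; the upsert_records helper, the sample rows, and the in-memory database are assumptions for illustration.

```python
import sqlite3

UPSERT_SQL = """
INSERT INTO records (id, data, updated_at)
VALUES (?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
    data = excluded.data,
    updated_at = excluded.updated_at
"""


def upsert_records(conn: sqlite3.Connection, rows: list) -> None:
    # "with conn" commits on success and rolls back if any row fails.
    with conn:
        conn.executemany(UPSERT_SQL, rows)


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE records (id TEXT PRIMARY KEY, data TEXT, updated_at TEXT)"
)

batch = [
    ("a1", '{"n": 1}', "2024-01-01T00:00:00Z"),
    ("b2", '{"n": 2}', "2024-01-01T00:00:00Z"),
]
upsert_records(conn, batch)
upsert_records(conn, batch)  # replaying the same batch adds no rows

row_count = conn.execute("SELECT COUNT(*) FROM records").fetchone()[0]
```

The upsert depends on a primary key or unique constraint on id; without one there is no conflict to detect and every replay inserts duplicates.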

For file-based output, write to a temporary file first, then atomically rename it to the final path. This prevents downstream consumers from reading a partially written file:

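import json
import os
from pathlib import Path

def write_json_atomically(data, output_path: Path):
    # Keep the temp file in the same directory so the final swap stays on one filesystem.
    tmp = output_path.with_name(f".tmp_{output_path.name}")
    tmp.write_text(json.dumps(data, indent=2))
    # os.replace overwrites an existing target on every platform and is atomic on POSIX.
    os.replace(tmp, output_path)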

Step 5: Add Structured Logging for Observability

Plain-text log messages are searchable with grep but hard to aggregate programmatically. Structured JSON logs work both ways: each line is still greppable, and every field can be parsed by a program:

import json
import sys
from datetime import datetime, timezone

def log_event(level: str, message: str, **kwargs):
    record = {
        "ts": datetime.now(tz=timezone.utc).isoformat(),
        "level": level,
        "msg": message,
        **kwargs
    }
    stream = sys.stderr if level == "error" else sys.stdout
    print(json.dumps(record), file=stream, flush=True)

# Usage:
log_event("info", "fetched records", count=47, source="api")
log_event("error", "request failed", status_code=429, retry=2)

Each line is valid JSON that can be piped to jq, shipped to any log service, or parsed by a monitoring script that sends an alert when error lines appear.
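As an illustration of the last point, a monitoring script might scan the log for error events like this. It is a sketch under assumptions: the function name find_error_events is hypothetical, it expects the level field emitted by log_event above, and it skips lines that are not valid JSON (for example, a stray traceback) rather than failing.

```python
import json
from typing import Iterable


def find_error_events(lines: Iterable[str]) -> list:
    """Return the parsed records whose level is "error"."""
    errors = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # not a structured log line; ignore it
        if isinstance(record, dict) and record.get("level") == "error":
            errors.append(record)
    return errors
```

The function accepts any iterable of lines, so an open log file can be passed directly; what happens with the returned records (email, chat message, non-zero exit) is left to the caller.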

Step 6: Schedule With Cron and Enable Email Alerting

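Add the job to crontab with MAILTO set at the top of the crontab file. Cron mails any output the job produces that is not redirected elsewhere; it does not look at the exit code. Redirecting stdout to a log file and leaving stderr alone means routine output goes to the log and only error output is emailed.

MAILTO=team@yourcompany.com
0 6 * * * /usr/bin/python3 /opt/sync/run.py >> /var/log/sync.log

The >> appends stdout to the log file. stderr is deliberately not redirected: adding 2>&1 would send it to the same file and leave cron nothing to mail. For this to produce alerts, failures must be written to stderr, as log_event in Step 5 does for the error level. The logging setup in Step 1 sends every level to stdout, so point its handler at stderr (or add a second handler for errors) if you rely on it for alerting.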

Use crontab.guru, an interactive expression checker that translates cron syntax to plain English, to verify the expression runs at the time you intend.

Step 7: Add a Freshness Healthcheck

A healthcheck monitors the output for staleness. If the sync job should run every day and produce a file, a separate script can verify the file's modification time and alert if it has not been updated in the expected window:

import sys
from pathlib import Path
from datetime import datetime, timezone, timedelta

output_file = Path("/data/sync/latest.json")
max_age_hours = 25  # allow some slack beyond 24h schedule

if not output_file.exists():
    print("ERROR: output file missing", file=sys.stderr)
    sys.exit(1)

age = datetime.now(tz=timezone.utc) - datetime.fromtimestamp(
    output_file.stat().st_mtime, tz=timezone.utc
)
if age > timedelta(hours=max_age_hours):
    print(f"ERROR: output file is {age.total_seconds()/3600:.1f}h old", file=sys.stderr)
    sys.exit(1)

print("OK")

Schedule this healthcheck independently of the sync job, at a time after the sync should have completed.

Step 8: Document the Configuration Requirements

Before deploying the script, document the expected environment variables, the expected output location, and the minimum record count threshold that signals something is wrong. This documentation serves two purposes: it is the onboarding reference for anyone who needs to run the script in a new environment, and it is the first debugging step when the script fails in production.

At minimum, capture:

  • Every environment variable the script requires, with a description and an example value
  • The expected output path and format
  • The cron schedule and the MAILTO address
  • Any TTY compatibility notes (libraries that behave differently in cron's minimal environment)
  • The expected freshness window -- how old is too old for the output to be considered stale

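A .env.example file committed alongside the script serves as the canonical list of required configuration. A new deployment starts by copying .env.example to .env and populating each value. Neither Python nor cron reads .env on its own, so load it into the environment before the script starts, for example by sourcing it in a wrapper script. If a required variable is missing and the configuration is read at the start of the run, the script fails immediately with a KeyError rather than mid-run when the missing value is first accessed.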
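One way to keep .env.example honest is to check the running environment against it. The sketch below assumes the common KEY=value format with # comments and an optional export prefix, and it treats every key in the file as required; the parsing rules and both function names are illustrative assumptions, not a standard.

```python
from typing import Mapping


def keys_from_env_example(text: str) -> list:
    """Extract variable names from KEY=value lines, ignoring blanks and comments."""
    keys = []
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        if line.startswith("export "):
            line = line[len("export "):]
        keys.append(line.split("=", 1)[0].strip())
    return keys


def missing_keys(example_text: str, env: Mapping[str, str]) -> list:
    """Names listed in .env.example that are absent from env."""
    return [key for key in keys_from_env_example(example_text) if key not in env]
```

Optional variables that have defaults in the script will be flagged too, so either leave them out of .env.example or filter them before alerting.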

This level of documentation does not require much time to write once the script is working correctly. It is the thing that makes the difference between a script one person understands and a script any team member can deploy and debug. A seven-step sync with documented configuration requirements is a system a second engineer can maintain without asking the original author every time something goes wrong.

Putting It Together

This pattern -- single entry point, environment variable configuration, incremental fetching, idempotent writes, structured logging, cron scheduling, a freshness healthcheck, and documented configuration -- produces a production-quality data sync without any pipeline framework.

Python provides the standard library components. PostgreSQL or SQLite handle persistence. Cron handles scheduling.

The data automation guide from 137Foundry covers the strategic decisions behind this architecture -- when to use it, when to upgrade to a framework like Apache Airflow, and how to make the transition cleanly. The 137Foundry services overview describes how 137Foundry implements these patterns for production data systems.
