Building a resilient, observable data ingestion pipeline in Python
Data pipelines are the backbone of modern applications, but they often fail quietly or become brittle over time. This tutorial shows you how to design, implement, and operate a small but robust data ingestion pipeline in Python. You’ll learn practical patterns for reliability, observability, and maintainability, with concrete code you can adapt to real-world projects.
Overview
- Goals: ingest data from a third-party API, transform it, and store it in a database with reliability and visibility.
- Constraints: modest latency, fault tolerance, recoverability, and clear failure signals.
- Stack: Python 3.11+, requests for HTTP (httpx would also work), pydantic for data validation, SQLAlchemy with SQLite as a simple durable store, and Python's standard logging module for visibility.
High-level architecture
- Producer: fetch data from an external API and pass validated records downstream. In this example it hands them straight to the transformer; a larger system would put a durable queue (an on-disk log or a message broker) in between.
- Transformer: apply schema validation, enrich records, and handle idempotency keys to prevent duplicates.
- Sink: persist transformed records to a database; expose a basic API to query recent data (optional).
- Orchestrator: coordinate backoff retries, dead-letter handling, and periodic runs.
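The code walkthrough below does not implement the orchestrator's dead-letter handling. Here is a minimal sketch, assuming an append-only JSON Lines file is an acceptable dead-letter store for a single process. Each record that fails is written out with its error, and the rest of the batch keeps going. The function names and the dead_letter.jsonl file name are hypothetical.

```python
import json
import os
import tempfile
from datetime import datetime, timezone
from typing import Any, Callable, Iterable, List, Tuple


def send_to_dead_letter(path: str, record: Any, error: Exception) -> None:
    """Append one failed record, plus the reason it failed, as a JSON line."""
    entry = {
        "failed_at": datetime.now(timezone.utc).isoformat(),
        "error": f"{type(error).__name__}: {error}",
        "record": record,
    }
    with open(path, "a", encoding="utf-8") as f:
        # default=str keeps non-JSON values (e.g. datetimes) from crashing the write
        f.write(json.dumps(entry, default=str) + "\n")


def process_with_dead_letter(
    items: Iterable[Any], handler: Callable[[Any], Any], dlq_path: str
) -> Tuple[List[Any], int]:
    """Apply handler to each item; failures are dead-lettered instead of raised."""
    ok: List[Any] = []
    failed = 0
    for item in items:
        try:
            ok.append(handler(item))
        except Exception as exc:  # broad on purpose: any per-record failure
            send_to_dead_letter(dlq_path, item, exc)
            failed += 1
    return ok, failed


if __name__ == "__main__":
    dlq = os.path.join(tempfile.mkdtemp(), "dead_letter.jsonl")
    raw = [{"id": "a", "value": "1.5"}, {"id": "b", "value": "oops"}]
    good, bad = process_with_dead_letter(raw, lambda r: float(r["value"]), dlq)
    print(good, bad)  # [1.5] 1
```

In this pipeline, handler could wrap the per-record part of transform. One malformed timestamp would then no longer fail the whole batch and trigger the retry loop.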
Key design patterns
- Idempotent ingest: use a stable primary key (e.g., external_id plus a timestamp) to avoid duplicates on retries.
- Backoff and jitter: use exponential backoff with random jitter so that many failing clients do not retry in lockstep (the "thundering herd" problem).
- Durable storage for state: track last_seen_id or a checkpoint in a small file to resume cleanly after restarts.
- Observability: structured logs, metrics counters, and a lightweight health check endpoint or console summary.
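The observability pattern above can be made concrete with the standard library alone. Below is a minimal sketch, assuming JSON-lines logs and in-process counters are enough for a single process. JsonFormatter, METRICS, get_logger, and summary are hypothetical names, not part of the walkthrough code.

```python
import json
import logging
from collections import Counter


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "time": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Structured fields passed as logger.info(..., extra={"fields": {...}})
        # become a "fields" attribute on the record; merge them into the output.
        fields = getattr(record, "fields", None)
        if isinstance(fields, dict):
            payload.update(fields)
        return json.dumps(payload)


# In-process counters; a real deployment might export these to Prometheus instead.
METRICS: "Counter[str]" = Counter()


def get_logger(name: str = "ingester") -> logging.Logger:
    logger = logging.getLogger(name)
    if not logger.handlers:  # avoid stacking handlers on repeated calls
        handler = logging.StreamHandler()
        handler.setFormatter(JsonFormatter())
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    return logger


def summary() -> str:
    """Console summary of all counters, e.g. printed after each batch."""
    return ", ".join(f"{k}={v}" for k, v in sorted(METRICS.items()))


if __name__ == "__main__":
    log = get_logger()
    METRICS["records_fetched"] += 100
    METRICS["records_inserted"] += 97
    log.info("batch done", extra={"fields": {"inserted": 97, "duplicates": 3}})
    print(summary())  # records_fetched=100, records_inserted=97
```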
Project layout
- ingester/
  - __init__.py
  - main.py
  - fetcher.py
  - transformer.py
  - sink.py
  - checkpoint.py
  - models.py
  - config.py
  - utils.py
- tests/
  - test_transformer.py
  - test_sink.py
- requirements.txt
Code walkthrough
1) Configuration
- Keep environment-driven configuration for API URLs, credentials, DB path, batch size, and backoff settings.
config.py
- Store defaults and a simple validation function.
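```python
from __future__ import annotations

import os
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Config:
    API_BASE_URL: str
    API_TOKEN: Optional[str]
    DB_PATH: str
    BATCH_SIZE: int
    MAX_RETRIES: int
    INITIAL_BACKOFF: float
    MAX_BACKOFF: float
    CHECKPOINT_FILE: str


def load_config() -> Config:
    """Read settings from environment variables, falling back to defaults."""
    cfg = Config(
        # Base URL only: fetcher.py appends the "/data" path itself.
        API_BASE_URL=os.environ.get("API_BASE_URL", "https://api.example.com"),
        API_TOKEN=os.environ.get("API_TOKEN"),
        DB_PATH=os.environ.get("DB_PATH", "data.db"),
        BATCH_SIZE=int(os.environ.get("BATCH_SIZE", "100")),
        MAX_RETRIES=int(os.environ.get("MAX_RETRIES", "8")),
        INITIAL_BACKOFF=float(os.environ.get("INITIAL_BACKOFF", "0.5")),
        MAX_BACKOFF=float(os.environ.get("MAX_BACKOFF", "60.0")),
        CHECKPOINT_FILE=os.environ.get("CHECKPOINT_FILE", "checkpoint.txt"),
    )
    validate_config(cfg)
    return cfg


def validate_config(cfg: Config) -> None:
    """Fail fast at startup on obviously invalid settings."""
    if cfg.BATCH_SIZE <= 0:
        raise ValueError("BATCH_SIZE must be positive")
    if cfg.MAX_RETRIES < 0:
        raise ValueError("MAX_RETRIES must be >= 0")
    if not 0 < cfg.INITIAL_BACKOFF <= cfg.MAX_BACKOFF:
        raise ValueError("Require 0 < INITIAL_BACKOFF <= MAX_BACKOFF")
```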
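One gap in load_config: int("abc") raises a ValueError that does not say which environment variable was wrong. Here is a small sketch of a helper that could replace the bare int()/float() calls. It is hypothetical and not part of the original config.py.

```python
import os
from typing import Callable, TypeVar

T = TypeVar("T")


def env_value(name: str, default: T, parse: Callable[[str], T]) -> T:
    """Return parse(os.environ[name]), or default when the variable is unset or blank.

    A parse failure is re-raised with the variable name, so a typo such as
    BATCH_SIZE=1O0 points straight at the misconfigured setting.
    """
    raw = os.environ.get(name)
    if raw is None or not raw.strip():
        return default
    try:
        return parse(raw)
    except ValueError as exc:
        kind = getattr(parse, "__name__", "value")
        raise ValueError(f"{name}={raw!r} is not a valid {kind}") from exc


if __name__ == "__main__":
    os.environ["BATCH_SIZE"] = "250"
    print(env_value("BATCH_SIZE", 100, int))  # 250
```

Inside load_config, a line would then read BATCH_SIZE=env_value("BATCH_SIZE", 100, int).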
2) Data models
- Use pydantic for input validation to catch schema drift early.
models.py
```python
from __future__ import annotations

from datetime import datetime
from typing import Optional

from pydantic import BaseModel, Field


class RawRecord(BaseModel):
    id: str
    ts: str  # ISO 8601 timestamp string; parsed into a datetime by the transformer
    value: float
    meta: Optional[dict] = Field(default_factory=dict)


class TransformedRecord(BaseModel):
    id: str
    ts: datetime
    value: float
    value_scaled: float
    fetched_at: datetime
    source: str = "external_api"
```
3) Checkpointing
- Persist last processed id or timestamp to resume on restart.
checkpoint.py
```python
from __future__ import annotations

import os
from typing import Optional


class Checkpoint:
    """Stores the timestamp of the last processed record in a small text file."""

    def __init__(self, path: str):
        self.path = path
        self.last_ts: Optional[str] = None

    def load(self) -> Optional[str]:
        if not os.path.exists(self.path):
            return None
        with open(self.path, "r", encoding="utf-8") as f:
            ts = f.read().strip()
        self.last_ts = ts or None
        return self.last_ts

    def save(self, ts: str) -> None:
        with open(self.path, "w", encoding="utf-8") as f:
            f.write(ts)
        self.last_ts = ts  # keep in-memory state in sync with the file
```
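Checkpoint.save rewrites the file in place. A crash mid-write can therefore leave an empty or truncated checkpoint, and an empty file makes load() return None, so the next run starts from scratch. A common remedy is to write to a temporary file in the same directory and then rename it over the old one. Here is a sketch with atomic_write_text as a hypothetical helper.

```python
import os
import tempfile


def atomic_write_text(path: str, text: str) -> None:
    """Write text so readers see either the old file or the complete new one."""
    directory = os.path.dirname(os.path.abspath(path))
    # The temp file must be on the same filesystem as the target for the rename.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".ckpt-", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(text)
            f.flush()
            os.fsync(f.fileno())  # push the bytes to disk before renaming
        # os.replace overwrites the target; on POSIX the rename is atomic.
        os.replace(tmp_path, path)
    except BaseException:
        # Remove the temp file if anything failed before the rename.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```

Checkpoint.save could then call atomic_write_text(self.path, ts) instead of opening the file in "w" mode.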
4) Fetcher
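- Request one batch from the API, passing the last checkpoint timestamp as the since parameter. The fetcher does not retry by itself: timeouts and HTTP errors raise exceptions, and the orchestrator (step 8) retries with backoff.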
fetcher.py
```python
from __future__ import annotations

from typing import List

import requests

from .models import RawRecord


def fetch_batch(
    api_base: str, token: str | None, batch_size: int, since_ts: str | None
) -> List[RawRecord]:
    """Fetch up to batch_size records, starting from since_ts when it is set.

    Timeouts and HTTP errors propagate as exceptions; main.py decides
    whether and when to retry.
    """
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    params = {"limit": batch_size}
    if since_ts:
        params["since"] = since_ts
    resp = requests.get(f"{api_base}/data", headers=headers, params=params, timeout=15)
    resp.raise_for_status()  # raises requests.HTTPError on 4xx/5xx responses
    data = resp.json()
    # Validating each item here surfaces schema drift immediately.
    return [RawRecord(**r) for r in data.get("items", [])]
```
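If you would rather retry just the HTTP call than the whole loop iteration, here is a minimal sketch of a generic wrapper. It assumes the caller supplies an is_transient predicate that decides which exceptions are worth retrying. retry_call and its parameters are hypothetical names; the injectable sleep makes the policy easy to test.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_call(
    fn: Callable[[], T],
    is_transient: Callable[[Exception], bool],
    max_retries: int = 5,
    base: float = 0.5,
    cap: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn; on a transient error, back off exponentially (with jitter) and retry."""
    attempt = 0
    while True:
        try:
            return fn()
        except Exception as exc:
            # Permanent errors, or running out of retries, propagate unchanged.
            if not is_transient(exc) or attempt >= max_retries:
                raise
            delay = min(cap, base * 2 ** attempt)
            sleep(delay + random.uniform(0, delay * 0.2))
            attempt += 1
```

With requests, is_transient might return True for requests.ConnectionError, requests.Timeout, or an HTTPError whose response.status_code is 429 or 5xx. A call would then look like retry_call(lambda: fetch_batch(api_base, token, batch_size, since_ts), is_transient=...).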
5) Transformer
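- Parse timestamps and enrich each record. The external id serves as the idempotency key: the sink skips any id it has already stored, so re-fetched records are not duplicated.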
transformer.py
```python
from __future__ import annotations

from datetime import datetime, timezone
from typing import List

from .models import RawRecord, TransformedRecord


def transform(records: List[RawRecord]) -> List[TransformedRecord]:
    out: List[TransformedRecord] = []
    for r in records:
        # Parse the timestamp; raises ValueError if it cannot be parsed.
        # (Python 3.11+ also accepts a trailing "Z" for UTC.)
        ts_dt = datetime.fromisoformat(r.ts)
        # Simple enrichment: scale the value (example transformation).
        value_scaled = r.value * 100.0
        out.append(
            TransformedRecord(
                id=r.id,
                ts=ts_dt,
                value=r.value,
                value_scaled=value_scaled,
                # Timezone-aware "now"; datetime.utcnow() is deprecated since 3.12.
                fetched_at=datetime.now(timezone.utc),
            )
        )
    return out
```
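The transformer keys records by id alone, while the design patterns suggest combining the external id with a timestamp. Here is a minimal sketch of such a key, assuming naive timestamps are UTC. idempotency_key is a hypothetical helper; its output could be stored in the sink's primary-key column in place of the raw id.

```python
import hashlib
from datetime import datetime, timezone


def idempotency_key(record_id: str, ts: datetime) -> str:
    """Derive a stable key from the external id plus the record timestamp.

    The same (id, instant) pair always yields the same key, so a retried
    batch maps onto rows that already exist instead of creating duplicates.
    """
    # Assumption: naive timestamps are UTC. Normalizing to UTC means the same
    # instant written with different offsets produces the same key.
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)
    ts_utc = ts.astimezone(timezone.utc)
    # "|" never appears in isoformat() output, so the last "|" always
    # separates the id from the timestamp unambiguously.
    material = f"{record_id}|{ts_utc.isoformat()}"
    return hashlib.sha256(material.encode("utf-8")).hexdigest()
```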
6) Sink
- Use SQLite via SQLAlchemy for simplicity and portability.
sink.py
```python
from __future__ import annotations

from typing import List

from sqlalchemy import Column, DateTime, Float, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

from .models import TransformedRecord

Base = declarative_base()


class Record(Base):
    __tablename__ = "records"

    id = Column(String, primary_key=True)
    ts = Column(DateTime, nullable=False)
    value = Column(Float, nullable=False)
    value_scaled = Column(Float, nullable=False)
    fetched_at = Column(DateTime, nullable=False)


def setup_db(db_path: str) -> sessionmaker:
    engine = create_engine(f"sqlite:///{db_path}")
    Base.metadata.create_all(engine)
    return sessionmaker(bind=engine)


def upsert_records(records: List[TransformedRecord], Session: sessionmaker) -> int:
    """Insert records whose id is not stored yet; return how many were inserted."""
    count = 0
    # Session.begin() commits on success and rolls back if an exception escapes.
    with Session.begin() as session:
        for r in records:
            exists = session.query(Record).filter_by(id=r.id).one_or_none()
            if exists:
                continue  # skip duplicates; or implement an update if needed
            session.add(
                Record(
                    id=r.id,
                    ts=r.ts,
                    value=r.value,
                    value_scaled=r.value_scaled,
                    fetched_at=r.fetched_at,
                )
            )
            count += 1
    return count
```
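The sink issues one SELECT per record before inserting. A hedged alternative is to let the database enforce idempotency with INSERT ... ON CONFLICT(id) DO NOTHING, which requires SQLite 3.24 or newer. It is shown here with the standard-library sqlite3 module rather than SQLAlchemy so that it runs on its own. insert_new and the tuple layout are assumptions for this sketch. In SQLAlchemy, the SQLite dialect's insert(...).on_conflict_do_nothing() expresses the same statement.

```python
import sqlite3
from typing import Iterable, Tuple

# (id, ts, value, value_scaled, fetched_at); timestamps stored as ISO strings
Row = Tuple[str, str, float, float, str]

SCHEMA = """
CREATE TABLE IF NOT EXISTS records (
    id TEXT PRIMARY KEY,
    ts TEXT NOT NULL,
    value REAL NOT NULL,
    value_scaled REAL NOT NULL,
    fetched_at TEXT NOT NULL
)
"""


def insert_new(conn: sqlite3.Connection, rows: Iterable[Row]) -> int:
    """Insert rows, silently skipping ids that already exist; return rows added."""
    before = conn.total_changes
    with conn:  # commits on success, rolls back on exception
        conn.executemany(
            "INSERT INTO records (id, ts, value, value_scaled, fetched_at) "
            "VALUES (?, ?, ?, ?, ?) ON CONFLICT(id) DO NOTHING",
            rows,
        )
    # Rows skipped by DO NOTHING are not counted as changes.
    return conn.total_changes - before
```

Replacing DO NOTHING with DO UPDATE SET value = excluded.value, ... turns this into a true upsert that refreshes existing rows.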
7) Utilities
- Exponential backoff with jitter, used by the orchestrator between retries.
utils.py
```python
import random


def backoff_with_jitter(attempt: int, base: float, cap: float) -> float:
    """Exponential delay (base * 2**attempt, capped at cap) plus up to 20% jitter."""
    backoff = min(cap, base * (2 ** attempt))
    jitter = random.uniform(0, backoff * 0.2)
    return backoff + jitter
```
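backoff_with_jitter adds at most 20% on top of the exponential delay. Clients that fail together therefore still retry within a narrow window, and the delay can exceed cap by up to 20%. A widely used alternative is "full jitter", which draws the entire delay at random below the capped bound. Here is a sketch with full_jitter_backoff as a hypothetical drop-in replacement.

```python
import random


def full_jitter_backoff(attempt: int, base: float, cap: float) -> float:
    """Full jitter: a delay drawn uniformly from [0, min(cap, base * 2**attempt)]."""
    bound = min(cap, base * (2 ** attempt))
    return random.uniform(0, bound)


if __name__ == "__main__":
    random.seed(42)  # deterministic demo output
    for attempt in range(6):
        print(attempt, round(full_jitter_backoff(attempt, 0.5, 60.0), 3))
```

The trade-off: individual waits can be very short, but retries from many clients spread across the whole window, and the delay never exceeds cap.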
8) Main orchestration
- Tie everything together with retries and checkpointing.
main.py
```python
from __future__ import annotations

import logging
import time

# Relative imports: run this module as `python -m ingester.main`.
from .checkpoint import Checkpoint
from .config import load_config
from .fetcher import fetch_batch
from .sink import setup_db, upsert_records
from .transformer import transform
from .utils import backoff_with_jitter

POLL_INTERVAL_SECONDS = 5


def main() -> None:
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
    cfg = load_config()
    Session = setup_db(cfg.DB_PATH)
    ckpt = Checkpoint(cfg.CHECKPOINT_FILE)
    last_ts = ckpt.load()
    logging.info("Starting ingestion. Last checkpoint: %s", last_ts)

    attempt = 0  # consecutive failures; reset after every successful batch
    while True:
        try:
            records = fetch_batch(cfg.API_BASE_URL, cfg.API_TOKEN, cfg.BATCH_SIZE, last_ts)
            if not records:
                logging.info("No new records. Sleeping for %ss.", POLL_INTERVAL_SECONDS)
                time.sleep(POLL_INTERVAL_SECONDS)
                continue

            transformed = transform(records)
            inserted = upsert_records(transformed, Session)
            logging.info("Inserted %d new records.", inserted)

            # Advance the checkpoint only after the batch has been committed.
            latest = max(r.ts for r in transformed).isoformat()
            ckpt.save(latest)
            last_ts = latest
            attempt = 0
        except Exception:
            logging.exception("Ingestion failed (consecutive failure %d).", attempt + 1)
            if attempt >= cfg.MAX_RETRIES:
                logging.error("Max retries reached. Exiting.")
                break
            time.sleep(backoff_with_jitter(attempt, cfg.INITIAL_BACKOFF, cfg.MAX_BACKOFF))
            attempt += 1


if __name__ == "__main__":
    main()
```
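The loop in main.py only stops when retries run out, and time.sleep(5) delays any shutdown by up to a full poll interval. Below is a minimal sketch of graceful shutdown. It assumes the orchestrator can be restructured around a step() function that processes one batch and reports whether it found work. run_until_stopped, request_stop, and stop_event are hypothetical names.

```python
import signal
import threading
from typing import Callable

stop_event = threading.Event()


def request_stop(signum: int, frame: object) -> None:
    """Signal handler: ask the loop to exit after the current batch."""
    stop_event.set()


def run_until_stopped(step: Callable[[], bool], idle_seconds: float, stop: threading.Event) -> int:
    """Call step() until stop is set; idle between calls only when step() found no work.

    Event.wait() returns as soon as the event is set, so shutdown does not
    have to wait out the full idle interval the way time.sleep() would.
    """
    runs = 0
    while not stop.is_set():
        did_work = step()
        runs += 1
        if not did_work:
            stop.wait(idle_seconds)
    return runs


if __name__ == "__main__":
    signal.signal(signal.SIGTERM, request_stop)  # e.g. `kill <pid>` or a container stop
    signal.signal(signal.SIGINT, request_stop)   # Ctrl+C
    batches = iter([True, True, False])

    def demo_step() -> bool:
        has_work = next(batches, False)
        if not has_work:
            stop_event.set()  # end the demo instead of polling forever
        return has_work

    print(run_until_stopped(demo_step, idle_seconds=1.0, stop=stop_event))  # 3
```

Because the stop flag is only checked between batches, a batch is never interrupted halfway, so the checkpoint always reflects committed work.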
Notes on the example
- This is a compact, self-contained example focused on reliability and clarity. For a production-grade pipeline, you’d likely want:
  - A proper message queue (e.g., Kafka, RabbitMQ) to decouple producers and consumers.
  - More robust schema evolution tooling and schema registry integration.
  - Observability with traces (OpenTelemetry), metrics (Prometheus), and a health endpoint.
  - Database-level upserts (e.g., INSERT ... ON CONFLICT) instead of a read-then-insert check per record.
  - A richer checkpointing strategy (e.g., storing the last N ids, or per-record checkpoints).
  - Tests around transformer logic, sink behavior, and retry policies.
How to run
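- Prerequisites: Python 3.11+ and pip.
- Create a virtual environment and install dependencies:
  - python -m venv .venv
  - source .venv/bin/activate (on Windows: .venv\Scripts\activate)
  - pip install requests pydantic SQLAlchemy
- Set environment variables as needed:
  - API_BASE_URL, API_TOKEN, DB_PATH, CHECKPOINT_FILE, BATCH_SIZE, MAX_RETRIES, INITIAL_BACKOFF, MAX_BACKOFF
- Run from the directory that contains ingester/:
  - python -m ingester.main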
A quick validation you can try
- Create a mock API locally or adjust fetch_batch to return synthetic data to verify the pipeline end-to-end.
- After a run, check that rows appear in data.db and that checkpoint.txt holds the timestamp of the latest record.
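For the mock-API route, here is a minimal sketch using only the standard library's http.server. It serves synthetic records at /data and honors limit and since. The record values, the "strictly newer than since" semantics, and the names MockApiHandler and start_mock_api are assumptions for local testing, not the real API's behavior.

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.parse import parse_qs, urlparse
from urllib.request import urlopen

# Synthetic records in the shape fetch_batch expects under an "items" key.
FAKE_ITEMS = [
    {"id": f"rec-{i:03d}", "ts": f"2026-05-31T12:{i:02d}:00+00:00", "value": i * 1.5}
    for i in range(1, 6)
]


class MockApiHandler(BaseHTTPRequestHandler):
    def do_GET(self) -> None:
        url = urlparse(self.path)
        if url.path != "/data":
            self.send_error(404)
            return
        query = parse_qs(url.query)
        limit = int(query.get("limit", ["100"])[0])
        since = query.get("since", [None])[0]
        # Assumed semantics: return records strictly newer than `since`.
        # Plain string comparison works because all timestamps share one format.
        items = [r for r in FAKE_ITEMS if since is None or r["ts"] > since]
        body = json.dumps({"items": items[:limit]}).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format: str, *args: object) -> None:
        pass  # silence per-request logging


def start_mock_api(port: int = 8000) -> ThreadingHTTPServer:
    """Serve in a background daemon thread; port 0 lets the OS pick a free port."""
    server = ThreadingHTTPServer(("127.0.0.1", port), MockApiHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


if __name__ == "__main__":
    server = start_mock_api(0)
    base = f"http://127.0.0.1:{server.server_address[1]}"
    with urlopen(f"{base}/data?limit=2") as resp:
        print(json.loads(resp.read())["items"])
    server.shutdown()
    server.server_close()
```

To drive the pipeline with it, keep start_mock_api(8000) running in a separate process and set API_BASE_URL=http://127.0.0.1:8000 (the fetcher appends /data itself).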
Illustrative example
- Suppose the API returns this item:
  - {"id": "rec-001", "ts": "2026-05-31T12:00:00Z", "value": 12.5}
- The transformer computes value_scaled = 12.5 × 100.0 = 1250.0 and sets fetched_at to the current UTC time.
- The sink stores a row with id rec-001, ts as a datetime, value 12.5, value_scaled 1250.0, and the fetch timestamp.
Would you like this adapted to integrate with a real streaming platform (e.g., Kafka) or to include a simple API endpoint to query the ingested data?
Rizwan Saleem | https://rizwansaleem.co