DEV Community

Rizwan Saleem

Building a Resilient, Type-Safe Data Processing Pipeline in Python from Scratch

In modern software systems, data is the lifeblood. Teams often ship features faster than they can ensure data correctness and end-to-end reliability. This tutorial walks you through building a practical, end-to-end data processing pipeline in Python that emphasizes type safety, observability, and fault tolerance. By the end, you’ll have a runnable example you can adapt for ETL tasks, data enrichment, or streaming-style processing.

What you’ll build

  • A small, local data pipeline that reads CSV input, validates and transforms records with strong typing, and writes enriched output.
  • Clear error handling with structured logging and metrics.
  • Simple checkpointing so the pipeline can resume after failures.
  • A testable design with easy extension for more complex data sources (APIs, databases) or sinks (data warehouses).

Prerequisites

  • Python 3.10+ (for typing features and match/case ergonomics)
  • Basic familiarity with CSVs, functions, and error handling
  • Optional: Docker, if you want to run the pipeline in a container

Overview of design choices

  • Type safety: Use Pydantic models to validate input data and transformations. Pydantic provides runtime validation with lightweight type hints, helping catch data quality issues early.
  • Functional pipeline stages: Each stage is a pure function (input -> output) to improve testability and readability.
  • Fault tolerance: Exceptions are caught with contextual information; bad records are logged and skipped (or diverted to a retry queue in a more advanced setup). A simple checkpoint mechanism records the last successfully processed row.
  • Observability: Structured logging and a lightweight metrics counter for success/failure rates. Optional integration with Prometheus-style exporters.
  • Extensibility: Easily swap data sources/sinks without reworking core logic.
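
The observability bullet mentions a lightweight metrics counter and structured logging. A minimal sketch of what these could look like, using only the standard library, is shown here. The names PipelineMetrics, record_success, record_failure, and log_event are illustrative assumptions and not part of the project files.

```python
import json
import logging
from collections import Counter
from dataclasses import dataclass, field


@dataclass
class PipelineMetrics:
    """Hypothetical in-memory counters for processed and failed records."""

    counts: Counter = field(default_factory=Counter)

    def record_success(self) -> None:
        self.counts["success"] += 1

    def record_failure(self, reason: str) -> None:
        # Track both the total and a per-reason breakdown.
        self.counts["failure"] += 1
        self.counts[f"failure:{reason}"] += 1

    def failure_rate(self) -> float:
        total = self.counts["success"] + self.counts["failure"]
        return self.counts["failure"] / total if total else 0.0


def log_event(logger: logging.Logger, event: str, **fields) -> str:
    """Emit one JSON object per log line so logs stay machine-parseable."""
    line = json.dumps({"event": event, **fields}, sort_keys=True)
    logger.info(line)
    return line


metrics = PipelineMetrics()
metrics.record_success()
metrics.record_success()
metrics.record_failure("validation")
```

In a run loop, record_success and record_failure would be called from the try and except branches respectively, and failure_rate logged once at the end of the run.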

Project structure

  • pipeline/
    • __init__.py
    • models.py
    • transforms.py
    • stage.py
    • runner.py
    • checker.py
    • tests/
      • test_pipeline.py
  • data/
    • input.csv
  • requirements.txt

Step 1: Define data models and validation
Why: Centralized data definitions make transformations safer and easier to reason about.

models.py

  • Define input record shape
  • Define enriched output shape
  • Provide parsing helpers

Code (models.py):
from __future__ import annotations

from datetime import datetime, timezone
from typing import Optional

# Uses the Pydantic v1-style API (validator, class Config, .dict()).
from pydantic import BaseModel, Field, validator


class InputRecord(BaseModel):
    id: int = Field(..., description="Unique record identifier")
    name: str = Field(..., min_length=1, description="User full name")
    email: str = Field(..., description="Email address")
    # Typed as datetime because the pre-validator below returns a datetime.
    signup_ts: datetime = Field(..., description="Signup timestamp (ISO format or epoch seconds)")
    score: Optional[float] = Field(None, ge=0.0, le=100.0, description="Quality score")

    @validator("signup_ts", pre=True)
    def parse_signup_ts(cls, v):
        # Accept a datetime, an ISO-format string, or epoch seconds as a string.
        if isinstance(v, datetime):
            return v
        try:
            return datetime.fromisoformat(v)
        except (TypeError, ValueError):
            try:
                # Naive UTC datetime, matching the naive ISO timestamps in input.csv.
                return datetime.fromtimestamp(float(v), tz=timezone.utc).replace(tzinfo=None)
            except (TypeError, ValueError, OverflowError, OSError) as e:
                raise ValueError(f"invalid signup_ts: {v}") from e

    @validator("email")
    def validate_email(cls, v):
        # Deliberately simple check: an "@", a non-empty local part, and a dotted domain.
        local, sep, domain = v.rpartition("@")
        if not sep or not local or "." not in domain.strip("."):
            raise ValueError("invalid email")
        return v


class OutputRecord(BaseModel):
    id: int
    name: str
    email_domain: str
    signup_date: str  # ISO date string
    score_bucket: str
    is_active: bool

    class Config:
        frozen = True  # instances are immutable


def domain_from_email(email: str) -> str:
    return email.split("@")[-1].lower()


def bucketize_score(score: Optional[float]) -> str:
    if score is None:
        return "unknown"
    if score >= 90:
        return "high"
    if score >= 70:
        return "medium"
    if score >= 50:
        return "low"
    return "very_low"


def to_iso_date(dt: datetime) -> str:
    return dt.date().isoformat()
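
Because the bucket thresholds are easy to misremember when writing assertions, it can help to spell out the boundaries. The sketch below repeats bucketize_score from models.py so that it runs on its own; the sample scores are illustrative.

```python
from typing import Optional


def bucketize_score(score: Optional[float]) -> str:
    # Same thresholds as models.py: >=90 high, >=70 medium, >=50 low.
    if score is None:
        return "unknown"
    if score >= 90:
        return "high"
    if score >= 70:
        return "medium"
    if score >= 50:
        return "low"
    return "very_low"


# Scores from the sample input plus the exact boundary values.
samples = [None, 45, 50, 69.9, 72, 88.5, 90]
buckets = {s: bucketize_score(s) for s in samples}
for score, bucket in buckets.items():
    print(f"{score!r:>5} -> {bucket}")
```

Note that a score of 88.5 falls in "medium", since "high" starts at 90.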

Step 2: Implement transformation logic
Why: Keep stages small, testable, and deterministic.

transforms.py

  • Validate input via InputRecord, derive fields, and produce OutputRecord
  • Derive the email domain (domain_from_email), format the signup date (to_iso_date), and bucket the score (bucketize_score)

Code (transforms.py):
from __future__ import annotations

from .models import InputRecord, OutputRecord, domain_from_email, bucketize_score, to_iso_date


def enrich_record(raw: dict) -> OutputRecord:
    # Validate and normalize; raises pydantic.ValidationError on bad input.
    record = InputRecord(**raw)

    email_domain = domain_from_email(record.email)
    signup_date = to_iso_date(record.signup_ts)

    score_bucket = bucketize_score(record.score)

    # A record counts as active only when it has a score of at least 60.
    is_active = record.score is not None and record.score >= 60.0

    return OutputRecord(
        id=record.id,
        name=record.name,
        email_domain=email_domain,
        signup_date=signup_date,
        score_bucket=score_bucket,
        is_active=is_active,
    )

Step 3: Define stages and runner
Why: Compose the pipeline with clear stage boundaries and a minimal checkpoint.

stage.py

  • Generic stage wrapper for composability
  • Simple run loop with error collection

Code (stage.py):
from __future__ import annotations

from typing import Any, Callable, Generic, TypeVar

T = TypeVar("T")
R = TypeVar("R")


class Stage(Generic[T, R]):
    # Wraps a plain function and gives it a name for logging and debugging.
    def __init__(self, func: Callable[[T], R], name: str):
        self.func = func
        self.name = name

    def __call__(self, value: T) -> R:
        return self.func(value)


def compose(*stages: Stage) -> Callable[[T], R]:
    # Chain stages left to right: the output of one is the input of the next.
    def pipeline(value: T) -> R:
        current: Any = value
        for s in stages:
            current = s(current)
        return current
    return pipeline
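
As a usage sketch, stages built from plain functions can be chained with compose. The Stage and compose definitions are repeated here so that the snippet runs on its own; strip_fields and lowercase_email are hypothetical stage functions, not part of the project.

```python
from typing import Any, Callable, Generic, TypeVar

T = TypeVar("T")
R = TypeVar("R")


class Stage(Generic[T, R]):
    def __init__(self, func: Callable[[T], R], name: str):
        self.func = func
        self.name = name

    def __call__(self, value: T) -> R:
        return self.func(value)


def compose(*stages: Stage) -> Callable[[Any], Any]:
    def pipeline(value: Any) -> Any:
        for stage in stages:
            value = stage(value)
        return value
    return pipeline


def strip_fields(row: dict) -> dict:
    # Hypothetical stage: trim whitespace around every string value.
    return {k: v.strip() if isinstance(v, str) else v for k, v in row.items()}


def lowercase_email(row: dict) -> dict:
    # Hypothetical stage: normalize the email field without mutating the input.
    return {**row, "email": row["email"].lower()}


clean = compose(
    Stage(strip_fields, name="strip_fields"),
    Stage(lowercase_email, name="lowercase_email"),
)
result = clean({"id": "1", "email": "  Alice@Example.COM "})
print(result)
```

Each stage returns a new dict rather than mutating its input, which keeps the stages pure and easy to test in isolation.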

runner.py

  • Read from CSV
  • Validate and transform row-by-row
  • Write to output CSV
  • Simple checkpoint: store last processed id to a file

Code (runner.py):
from __future__ import annotations

import csv
import logging
from pathlib import Path
from typing import Dict, Iterable, List

from .models import OutputRecord
from .transforms import enrich_record

# Paths are relative to the directory the pipeline is run from.
CHECKPOINT_PATH = Path(".pipeline_checkpoint")
OUTPUT_PATH = Path("data/output.csv")
INPUT_PATH = Path("data/input.csv")

logger = logging.getLogger("pipeline")
logger.setLevel(logging.INFO)
if not logger.handlers:
    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
    ch.setFormatter(formatter)
    logger.addHandler(ch)


def load_checkpoint() -> int:
    # Return the last processed id, or 0 if there is no usable checkpoint.
    if CHECKPOINT_PATH.exists():
        with CHECKPOINT_PATH.open("r") as f:
            try:
                return int(f.read().strip())
            except ValueError:
                return 0
    return 0


def save_checkpoint(last_id: int) -> None:
    CHECKPOINT_PATH.write_text(str(last_id))


def read_input_rows(start_id: int) -> Iterable[Dict]:
    # Assumes ids are integers that increase through the file.
    with INPUT_PATH.open("r", newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            if int(row["id"]) > start_id:
                yield row


def write_output(rows: List[OutputRecord]) -> None:
    fieldnames = ["id", "name", "email_domain", "signup_date", "score_bucket", "is_active"]
    OUTPUT_PATH.parent.mkdir(exist_ok=True, parents=True)
    with OUTPUT_PATH.open("a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        # Write the header only when the file is new (still empty).
        if OUTPUT_PATH.stat().st_size == 0:
            writer.writeheader()
        for r in rows:
            writer.writerow(r.dict())


def run_pipeline() -> None:
    checkpoint = load_checkpoint()
    logger.info(f"Starting from checkpoint: {checkpoint}")
    batch: List[OutputRecord] = []
    last_processed_id = checkpoint

    for row in read_input_rows(checkpoint):
        try:
            out = enrich_record(row)
            batch.append(out)
            last_processed_id = out.id
        except Exception as e:
            # Log with traceback and skip the bad record.
            logger.exception(f"Failed to process row {row.get('id')}: {e}")

        # Flush in batches of 100 and checkpoint only after a successful write.
        if len(batch) >= 100:
            write_output(batch)
            batch.clear()
            save_checkpoint(last_processed_id)

    if batch:
        write_output(batch)
        save_checkpoint(last_processed_id)

    logger.info("Pipeline run complete.")


# Needed so that python -m pipeline.runner actually starts a run.
if __name__ == "__main__":
    run_pipeline()
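
One caveat with writing the checkpoint in place is that a crash mid-write can leave a truncated file. A common mitigation, sketched here under the assumption of a local filesystem, is to write to a temporary file and rename it over the target. save_checkpoint_atomic is a hypothetical replacement for save_checkpoint, and this load_checkpoint takes the path as an argument for the sake of the example.

```python
import os
import tempfile
from pathlib import Path


def save_checkpoint_atomic(path: Path, last_id: int) -> None:
    """Write the checkpoint to a temp file, then swap it into place."""
    directory = path.resolve().parent
    fd, tmp_name = tempfile.mkstemp(dir=directory, prefix=".ckpt-")
    try:
        with os.fdopen(fd, "w") as tmp:
            tmp.write(str(last_id))
            tmp.flush()
            os.fsync(tmp.fileno())  # push bytes to disk before the rename
        # Atomic when source and target are on the same filesystem.
        os.replace(tmp_name, path)
    except BaseException:
        # Clean up the temp file if anything failed before the rename.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


def load_checkpoint(path: Path) -> int:
    # A missing or unreadable checkpoint means "start from the beginning".
    try:
        return int(path.read_text().strip())
    except (FileNotFoundError, ValueError):
        return 0
```

Readers of the checkpoint then see either the old value or the new one, never a partially written file.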

Step 4: Optional checks and tests
checker.py

  • Basic validation to ensure output matches expected domain, date, etc.

Code (checker.py):
from __future__ import annotations

from typing import List

from .models import OutputRecord


def basic_checks(outputs: List[OutputRecord]) -> List[str]:
    # Collect human-readable issues instead of raising on the first one.
    issues: List[str] = []
    for o in outputs:
        if not o.name:
            issues.append(f"Record {o.id} has empty name.")
        if not o.email_domain:
            issues.append(f"Record {o.id} missing email domain.")
        if not o.signup_date:
            issues.append(f"Record {o.id} missing signup_date.")
    return issues

Step 5: Tests
tests/test_pipeline.py

  • Validate transforms with a sample input row
  • Bucket logic is exercised indirectly through enrich_record rather than by constructing OutputRecord directly

Code (tests/test_pipeline.py):
import unittest

from pipeline.transforms import enrich_record


class TestPipeline(unittest.TestCase):
    def test_enrich_record_basic(self):
        raw = {
            "id": 1,
            "name": "Alice Smith",
            "email": "alice@example.com",
            "signup_ts": "2024-01-02T12:34:56",
            "score": 88.5,
        }
        out = enrich_record(raw)
        self.assertEqual(out.id, 1)
        self.assertEqual(out.name, "Alice Smith")
        self.assertEqual(out.email_domain, "example.com")
        self.assertEqual(out.signup_date, "2024-01-02")
        # 88.5 is below the 90 threshold for "high", so it lands in "medium".
        self.assertEqual(out.score_bucket, "medium")
        self.assertTrue(out.is_active)


if __name__ == "__main__":
    unittest.main()

Step 6: Data sample
data/input.csv
id,name,email,signup_ts,score
1,Alice Smith,alice@example.com,2024-01-02T12:34:56,88.5
2,Bob Jones,bob@sample.org,2024-02-03T09:21:00,72
3,Eve Adams,eve@invalid,2024-03-04T08:00:00,45

Step 7: Running the pipeline

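  • Install dependencies (requirements.txt should list at least pydantic, plus pytest if you run the tests with the pytest command below)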
  • Run tests
  • Create input.csv as above
  • Execute python -m pipeline.runner

Example commands (assuming this project layout as described):

  • Prepare environment
    python -m venv venv
    source venv/bin/activate
    pip install -r requirements.txt

  • Run tests
    pytest -q

  • Run pipeline
    python -m pipeline.runner

Notes on extensibility

  • Swap input source: Replace read_input_rows to pull from a database or API, while keeping enrichment logic unchanged.
  • Swap output sink: Instead of writing to CSV, push to a message queue or a data warehouse via a sink adapter.
  • Improve checkpointing: Use durable storage like a database row or a file in a distributed store; support resume from a specific timestamp or id.
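
One way to make the source and sink swappable is to have the run loop depend only on small interfaces. The following is a sketch under that assumption. RecordSource, RecordSink, InMemorySource, ListSink, and run are hypothetical names, and the transform is passed in so that the loop stays independent of enrich_record.

```python
from typing import Callable, Dict, Iterable, Iterator, List, Protocol


class RecordSource(Protocol):
    def read(self, start_id: int) -> Iterable[Dict]: ...


class RecordSink(Protocol):
    def write(self, rows: List[Dict]) -> None: ...


class InMemorySource:
    """Stand-in for a CSV, database, or API source."""

    def __init__(self, rows: List[Dict]):
        self.rows = rows

    def read(self, start_id: int) -> Iterator[Dict]:
        # Mirror read_input_rows: only yield rows past the checkpoint.
        return (row for row in self.rows if int(row["id"]) > start_id)


class ListSink:
    """Stand-in for a CSV file, message queue, or warehouse sink."""

    def __init__(self) -> None:
        self.written: List[Dict] = []

    def write(self, rows: List[Dict]) -> None:
        self.written.extend(rows)


def run(source: RecordSource, sink: RecordSink,
        transform: Callable[[Dict], Dict], start_id: int = 0) -> int:
    """Process rows past start_id and return the last id written."""
    last_id = start_id
    batch: List[Dict] = []
    for row in source.read(start_id):
        out = transform(row)
        batch.append(out)
        last_id = int(out["id"])
    sink.write(batch)
    return last_id


source = InMemorySource([{"id": "1", "name": "a"}, {"id": "2", "name": "b"}])
sink = ListSink()
last = run(source, sink, lambda row: {**row, "name": row["name"].upper()}, start_id=1)
```

Because Protocol uses structural typing, a database-backed source only needs a matching read method; it does not have to inherit from RecordSource.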

Common pitfalls and how to avoid

  • Data drift: If input schemas change, add regression tests and a schema contract in InputRecord. Use semantic versioning for your pipeline contract.
  • Large files: Process in streaming fashion with buffered writes instead of loading entire datasets into memory.
  • Error storms: Implement backoff for transient errors and enrichment retries, rather than failing hard on first error.
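
For transient failures (for example, an enrichment call to a flaky service), a retry helper with exponential backoff is one option. This is a sketch, not part of the original pipeline. retry_with_backoff, TransientError, and the delay parameters are illustrative assumptions, and the sleep function is injectable so that the example does not actually wait.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical marker for errors worth retrying (timeouts, rate limits)."""


def retry_with_backoff(
    func: Callable[[], T],
    max_attempts: int = 4,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying on TransientError with delays of base_delay * 2**n."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return func()
        except TransientError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: let the caller log and skip the record
            sleep(base_delay * (2 ** attempt))
    raise RuntimeError("unreachable")


# Demo: fail twice, then succeed; record the delays instead of sleeping.
calls = {"n": 0}
delays: List[float] = []


def flaky_enrichment() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientError("temporary failure")
    return "ok"


result = retry_with_backoff(flaky_enrichment, sleep=delays.append)
```

Validation errors are deliberately not retried here: a malformed record will fail the same way every time, so only errors that can plausibly clear up on their own should be wrapped.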

Illustrative example: a small run

  • You feed input.csv with three records (as above).
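  • The pipeline is meant to process records 1 and 2 and skip record 3, whose email (eve@invalid) lacks a valid domain. This only happens if validate_email checks for a dot in the domain; a bare "@" check would accept the address. If you want to collect bad records, add a bad-record sink.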
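
A bad-record sink can be as simple as a second CSV that stores the row id alongside the error. The sketch below assumes that approach. process_with_dead_letter, the rejects path, and the toy validate function are hypothetical; validate stands in for enrich_record.

```python
import csv
import tempfile
from pathlib import Path
from typing import Callable, Dict, Iterable, List


def process_with_dead_letter(
    rows: Iterable[Dict[str, str]],
    transform: Callable[[Dict[str, str]], Dict],
    rejects_path: Path,
) -> List[Dict]:
    """Return transformed rows; append failures to rejects_path with the reason."""
    good: List[Dict] = []
    with rejects_path.open("a", newline="") as f:
        writer = csv.writer(f)
        for row in rows:
            try:
                good.append(transform(row))
            except Exception as exc:  # keep going; one bad row must not stop the run
                writer.writerow([row.get("id", ""), type(exc).__name__, str(exc)])
    return good


def validate(row: Dict[str, str]) -> Dict:
    # Toy stand-in for enrich_record: reject emails without a dotted domain.
    domain = row["email"].rpartition("@")[2]
    if "." not in domain:
        raise ValueError("invalid email")
    return {"id": int(row["id"]), "email_domain": domain.lower()}


rows = [
    {"id": "1", "email": "alice@example.com"},
    {"id": "3", "email": "eve@invalid"},
]
rejects = Path(tempfile.mkdtemp()) / "rejects.csv"
good = process_with_dead_letter(rows, validate, rejects)
```

Keeping the rejected rows in a file makes it possible to fix the data and replay only those records, instead of searching the logs for them.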

Why this approach helps in practice

  • Clear, testable boundaries help a team maintain and extend the pipeline without accidentally breaking downstream systems.
  • Type validation reduces late-stage bugs when data enters analytics or downstream services.
  • Simple checkpointing avoids reprocessing large portions of data after a crash, saving compute time and ensuring progress visibility.
  • Observability through logging and lightweight metrics makes it easier to diagnose data quality issues and monitor pipeline health.

Natural next steps include adapting the source to a PostgreSQL table or an API, writing to a data warehouse such as Snowflake or BigQuery, containerizing the pipeline with Docker, or adding a small Prometheus-compatible metrics endpoint for real-time monitoring.


Rizwan Saleem | https://rizwansaleem.co
