Building a Resilient, Type-Safe Data Processing Pipeline in Python from Scratch
In modern software systems, data is the lifeblood. Teams often ship features faster than they can ensure data correctness and end-to-end reliability. This tutorial walks you through building a practical, end-to-end data processing pipeline in Python that emphasizes type safety, observability, and fault tolerance. By the end, you’ll have a runnable example you can adapt for ETL tasks, data enrichment, or streaming-style processing.
What you’ll build
- A small, local data pipeline that reads CSV input, validates and transforms records with strong typing, and writes enriched output.
- Clear error handling with structured logging and metrics.
- Simple checkpointing so the pipeline can resume after failures.
- A testable design with easy extension for more complex data sources (APIs, databases) or sinks (data warehouses).
Prerequisites
- Python 3.10+ (for modern typing features)
- Basic familiarity with CSVs, functions, and error handling
- Optional: Docker, if you want to run the pipeline in a container
Overview of design choices
- Type safety: Use Pydantic models to validate input data and transformations. Pydantic checks values at runtime against ordinary type hints, which helps catch data quality issues early.
- Functional pipeline stages: Each stage is a pure function (input -> output) to improve testability and readability.
- Fault tolerance: Exceptions are caught with contextual information; bad records are logged and skipped (or diverted to a retry queue in a more advanced setup). A simple checkpoint mechanism records the last successfully processed row.
- Observability: Timestamped, leveled logging in the runner, with room for a lightweight success/failure counter. Integration with Prometheus-style exporters is optional.
- Extensibility: Easily swap data sources/sinks without reworking core logic.
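To make the observability point concrete, here is a minimal sketch of a success/failure counter built on the standard library. The names process_with_metrics and require_email are hypothetical and not part of the project files below; the idea is only that each record either increments a success count or a failure count keyed by exception type.

```python
from collections import Counter
from typing import Callable, Iterable


def process_with_metrics(rows: Iterable[dict], handler: Callable[[dict], object]) -> Counter:
    """Apply handler to each row and tally outcomes instead of stopping on errors."""
    metrics: Counter = Counter()
    for row in rows:
        try:
            handler(row)
            metrics["success"] += 1
        except Exception as exc:
            metrics["failure"] += 1
            # Per-exception-type counts help spot which check fails most often.
            metrics[f"failure.{type(exc).__name__}"] += 1
    return metrics


def require_email(row: dict) -> dict:
    # Hypothetical stand-in for a real validation step.
    if "@" not in row.get("email", ""):
        raise ValueError("invalid email")
    return row


metrics = process_with_metrics(
    [{"email": "alice@example.com"}, {"email": "nope"}, {}],
    require_email,
)
failure_rate = metrics["failure"] / (metrics["success"] + metrics["failure"])
print(dict(metrics), f"failure_rate={failure_rate:.2f}")
```

A Counter like this can be logged at the end of a run or handed to an exporter later without changing the processing loop.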
Project structure
- pipeline/
  - __init__.py
  - models.py
  - transforms.py
  - stage.py
  - runner.py
  - checker.py
- tests/
  - test_pipeline.py
- data/
  - input.csv
- requirements.txt
Step 1: Define data models and validation
Why: Centralized data definitions make transformations safer and easier to reason about.
models.py
- Define input record shape
- Define enriched output shape
- Provide parsing helpers
Code (models.py):
from __future__ import annotations

from datetime import datetime, timezone
from typing import Optional

# Written against the Pydantic 1.x API (validator, class Config, .dict()).
# Pydantic 2.x still accepts these names but emits deprecation warnings.
from pydantic import BaseModel, Field, validator


class InputRecord(BaseModel):
    id: int = Field(..., description="Unique record identifier")
    name: str = Field(..., min_length=1, description="User full name")
    email: str = Field(..., description="Email address")
    # Typed as datetime because the validator below returns a datetime
    signup_ts: datetime = Field(..., description="Signup timestamp (ISO format or epoch seconds)")
    score: Optional[float] = Field(None, ge=0.0, le=100.0, description="Quality score")

    @validator("signup_ts", pre=True)
    def parse_signup_ts(cls, v):
        # Accept a datetime, an ISO-format string, or epoch seconds as a string
        if isinstance(v, datetime):
            return v
        try:
            return datetime.fromisoformat(v)
        except (TypeError, ValueError):
            pass
        try:
            # Naive UTC, matching what a naive ISO string produces
            return datetime.fromtimestamp(float(v), tz=timezone.utc).replace(tzinfo=None)
        except (TypeError, ValueError, OverflowError, OSError) as e:
            raise ValueError(f"invalid signup_ts: {v}") from e

    @validator("email")
    def validate_email(cls, v):
        # Deliberately simple check: a single "@", a non-empty local part,
        # and a domain containing a dot (so "eve@invalid" is rejected)
        local, sep, domain = v.partition("@")
        if not sep or not local or "@" in domain or "." not in domain.strip("."):
            raise ValueError("invalid email")
        return v


class OutputRecord(BaseModel):
    id: int
    name: str
    email_domain: str
    signup_date: str  # ISO date string
    score_bucket: str
    is_active: bool

    class Config:
        frozen = True


def domain_from_email(email: str) -> str:
    return email.split("@")[-1].lower()


def bucketize_score(score: Optional[float]) -> str:
    if score is None:
        return "unknown"
    if score >= 90:
        return "high"
    if score >= 70:
        return "medium"
    if score >= 50:
        return "low"
    return "very_low"


def to_iso_date(dt: datetime) -> str:
    return dt.date().isoformat()
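The timestamp handling is the least obvious part of the model, so here is a standalone sketch of the same "ISO string first, epoch seconds second" fallback using only the standard library. The function name parse_timestamp is an assumption made for this illustration; it mirrors the validator's intent but is not the validator itself.

```python
from datetime import datetime, timezone


def parse_timestamp(value: str) -> datetime:
    """Parse an ISO 8601 string, falling back to epoch seconds."""
    try:
        return datetime.fromisoformat(value)
    except ValueError:
        pass
    try:
        seconds = float(value)
    except ValueError as exc:
        raise ValueError(f"invalid timestamp: {value!r}") from exc
    # Build the value in UTC, then drop tzinfo so both branches return naive datetimes.
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(tzinfo=None)


print(parse_timestamp("2024-01-02T12:34:56"))
print(parse_timestamp("1704198896"))

try:
    parse_timestamp("next tuesday")
except ValueError as exc:
    print(exc)
```

Both valid inputs above describe the same instant, which makes this pair a handy regression test if you change the parsing rules.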
Step 2: Implement transformation logic
Why: Keep stages small, testable, and deterministic.
transforms.py
- Validate input via InputRecord, derive fields, and produce OutputRecord
- Derive the email domain, signup date, and score bucket using the helpers from models.py
Code (transforms.py):
from __future__ import annotations

from .models import InputRecord, OutputRecord, domain_from_email, bucketize_score, to_iso_date


def enrich_record(raw: dict) -> OutputRecord:
    # Validate and normalize; raises pydantic.ValidationError on bad input
    record = InputRecord(**raw)
    email_domain = domain_from_email(record.email)
    signup_date = to_iso_date(record.signup_ts)
    score_bucket = bucketize_score(record.score)
    # A record counts as active when it has a score of at least 60
    is_active = record.score is not None and record.score >= 60.0
    return OutputRecord(
        id=record.id,
        name=record.name,
        email_domain=email_domain,
        signup_date=signup_date,
        score_bucket=score_bucket,
        is_active=is_active,
    )
Step 3: Define stages and runner
Why: Compose the pipeline with clear stage boundaries and a minimal checkpoint.
stage.py
- Generic stage wrapper for composability
- Simple run loop with error collection
Code (stage.py):
from __future__ import annotations

from typing import Any, Callable, Generic, TypeVar

T = TypeVar("T")
R = TypeVar("R")


class Stage(Generic[T, R]):
    """A named wrapper around a single-argument function."""

    def __init__(self, func: Callable[[T], R], name: str):
        self.func = func
        self.name = name

    def __call__(self, input: T) -> R:
        return self.func(input)


def compose(*stages: Stage) -> Callable[[T], R]:
    # Feed the output of each stage into the next, left to right
    def pipeline(input: T) -> R:
        current: Any = input
        for s in stages:
            current = s(current)
        return current

    return pipeline
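A short usage sketch may help show how composition is meant to work. To keep it runnable on its own, it restates simplified, untyped versions of Stage and compose; the two stages (strip_fields and lowercase_email) are hypothetical examples, not part of the project.

```python
from typing import Any, Callable


class Stage:
    """Named wrapper around a single-argument function."""

    def __init__(self, func: Callable[[Any], Any], name: str) -> None:
        self.func = func
        self.name = name

    def __call__(self, value: Any) -> Any:
        return self.func(value)


def compose(*stages: Stage) -> Callable[[Any], Any]:
    def pipeline(value: Any) -> Any:
        # Each stage receives the previous stage's output.
        for stage in stages:
            value = stage(value)
        return value

    return pipeline


# Hypothetical cleanup stages operating on a dict of string fields.
strip_fields = Stage(lambda row: {k: v.strip() for k, v in row.items()}, "strip_fields")
lowercase_email = Stage(lambda row: {**row, "email": row["email"].lower()}, "lowercase_email")

clean = compose(strip_fields, lowercase_email)
result = clean({"name": " Alice Smith ", "email": " Alice@Example.COM "})
print(result)
```

Because every stage returns a new dict rather than mutating its input, each one can be tested in isolation and reordered without side effects.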
runner.py
- Read from CSV
- Validate and transform row-by-row
- Write to output CSV
- Simple checkpoint: store last processed id to a file
Code (runner.py):
from __future__ import annotations

import csv
import logging
from pathlib import Path
from typing import Dict, Iterable, List

from .models import OutputRecord
from .transforms import enrich_record

CHECKPOINT_PATH = Path(".pipeline_checkpoint")
OUTPUT_PATH = Path("data/output.csv")
INPUT_PATH = Path("data/input.csv")

logger = logging.getLogger("pipeline")
logger.setLevel(logging.INFO)
if not logger.handlers:
    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
    ch.setFormatter(formatter)
    logger.addHandler(ch)


def load_checkpoint() -> int:
    if CHECKPOINT_PATH.exists():
        with CHECKPOINT_PATH.open("r") as f:
            try:
                return int(f.read().strip())
            except ValueError:
                return 0
    return 0


def save_checkpoint(last_id: int) -> None:
    CHECKPOINT_PATH.write_text(str(last_id))


def read_input_rows(start_id: int) -> Iterable[Dict]:
    # Assumes ids are integers in ascending order; rows at or below the checkpoint are skipped
    with INPUT_PATH.open("r", newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            try:
                row_id = int(row["id"])
            except (KeyError, TypeError, ValueError):
                logger.warning(f"Skipping row with missing or non-integer id: {row}")
                continue
            if row_id > start_id:
                yield row


def write_output(rows: List[OutputRecord]) -> None:
    fieldnames = ["id", "name", "email_domain", "signup_date", "score_bucket", "is_active"]
    OUTPUT_PATH.parent.mkdir(exist_ok=True, parents=True)
    with OUTPUT_PATH.open("a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        # Write the header only when the file is new (still empty)
        if OUTPUT_PATH.stat().st_size == 0:
            writer.writeheader()
        for r in rows:
            writer.writerow(r.dict())


def run_pipeline():
    checkpoint = load_checkpoint()
    logger.info(f"Starting from checkpoint: {checkpoint}")
    batch: List[OutputRecord] = []
    last_processed_id = checkpoint
    for row in read_input_rows(checkpoint):
        try:
            out = enrich_record(row)
            batch.append(out)
            last_processed_id = out.id
        except Exception as e:
            # Bad records are logged and skipped; the checkpoint does not advance for them
            logger.exception(f"Failed to process row {row.get('id')}: {e}")
        # Flush in batches of 100, saving the checkpoint only after the write succeeds
        if len(batch) >= 100:
            write_output(batch)
            batch.clear()
            save_checkpoint(last_processed_id)
    if batch:
        write_output(batch)
        save_checkpoint(last_processed_id)
    logger.info("Pipeline run complete.")


# Needed so that `python -m pipeline.runner` actually starts a run
if __name__ == "__main__":
    run_pipeline()
Step 4: Optional checks and tests
checker.py
- Basic validation to ensure output matches expected domain, date, etc.
Code (checker.py):
from __future__ import annotations

from typing import List

from .models import OutputRecord


def basic_checks(outputs: List[OutputRecord]) -> List[str]:
    # Return human-readable issues; an empty list means every check passed
    issues: List[str] = []
    for o in outputs:
        if not o.name:
            issues.append(f"Record {o.id} has empty name.")
        if not o.email_domain:
            issues.append(f"Record {o.id} missing email domain.")
        if not o.signup_date:
            issues.append(f"Record {o.id} missing signup_date.")
    return issues
Step 5: Tests
tests/test_pipeline.py
- Validate transforms with a sample input row
- Bucket logic is exercised indirectly through enrich_record, so there is no separate test that builds OutputRecord directly
Code (tests/test_pipeline.py):
import unittest

from pipeline.transforms import enrich_record


class TestPipeline(unittest.TestCase):
    def test_enrich_record_basic(self):
        raw = {
            "id": 1,
            "name": "Alice Smith",
            "email": "alice@example.com",
            "signup_ts": "2024-01-02T12:34:56",
            "score": 88.5,
        }
        out = enrich_record(raw)
        self.assertEqual(out.id, 1)
        self.assertEqual(out.name, "Alice Smith")
        self.assertEqual(out.email_domain, "example.com")
        self.assertEqual(out.signup_date, "2024-01-02")
        # 88.5 is below the 90 cutoff for "high", so it lands in "medium"
        self.assertEqual(out.score_bucket, "medium")
        self.assertTrue(out.is_active)


if __name__ == "__main__":
    unittest.main()
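Threshold logic is easy to get wrong at the edges, so a table-driven boundary test is a useful companion to the test above. The sketch below copies the thresholds from models.py into a local bucketize_score so it can run standalone; in the real project you would import the function instead. The class name TestBucketBoundaries is hypothetical.

```python
import unittest
from typing import Optional


def bucketize_score(score: Optional[float]) -> str:
    # Local copy of the thresholds from models.py, for a self-contained example.
    if score is None:
        return "unknown"
    if score >= 90:
        return "high"
    if score >= 70:
        return "medium"
    if score >= 50:
        return "low"
    return "very_low"


class TestBucketBoundaries(unittest.TestCase):
    def test_boundaries(self):
        cases = [
            (None, "unknown"),
            (100.0, "high"),
            (90.0, "high"),
            (89.9, "medium"),
            (70.0, "medium"),
            (69.9, "low"),
            (50.0, "low"),
            (49.9, "very_low"),
            (0.0, "very_low"),
        ]
        for score, expected in cases:
            # subTest reports each failing case separately instead of stopping at the first.
            with self.subTest(score=score):
                self.assertEqual(bucketize_score(score), expected)
```

Run it with pytest or python -m unittest alongside the existing test file.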
Step 6: Data sample
data/input.csv
id,name,email,signup_ts,score
1,Alice Smith,alice@example.com,2024-01-02T12:34:56,88.5
2,Bob Jones,bob@sample.org,2024-02-03T09:21:00,72
3,Eve Adams,eve@invalid,2024-03-04T08:00:00,45
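One detail worth seeing for yourself: csv.DictReader hands every value to the pipeline as a string, which is why the models need to coerce id, score, and the timestamp. The sketch below reads the sample rows from an in-memory string (an assumption made to keep it self-contained) rather than from data/input.csv.

```python
import csv
import io

SAMPLE = """id,name,email,signup_ts,score
1,Alice Smith,alice@example.com,2024-01-02T12:34:56,88.5
2,Bob Jones,bob@sample.org,2024-02-03T09:21:00,72
3,Eve Adams,eve@invalid,2024-03-04T08:00:00,45
"""

rows = list(csv.DictReader(io.StringIO(SAMPLE)))
first = rows[0]

print(first)
# Every field is a str at this point, including id and score.
print({key: type(value).__name__ for key, value in first.items()})
```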
Step 7: Running the pipeline
- Install dependencies
- Run tests
- Create input.csv as above
- Execute python -m pipeline.runner
Example commands (assuming this project layout as described):
# Prepare environment
python -m venv venv
source venv/bin/activate
pip install -r requirements.txt
# Run tests
pytest -q
# Run pipeline
python -m pipeline.runner
Notes on extensibility
- Swap input source: Replace read_input_rows to pull from a database or API, while keeping enrichment logic unchanged.
- Swap output sink: Instead of writing to CSV, push to a message queue or a data warehouse via a sink adapter.
- Improve checkpointing: Use durable storage like a database row or a file in a distributed store; support resume from a specific timestamp or id.
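Before moving the checkpoint to a database, one cheap improvement is to make the file write atomic, so a crash mid-write cannot leave a half-written checkpoint. This is a minimal sketch under the assumption that the checkpoint stays a local file; save_checkpoint_atomic and the path-taking load_checkpoint are hypothetical variants of the runner's functions.

```python
import os
import tempfile
from pathlib import Path


def save_checkpoint_atomic(path: Path, last_id: int) -> None:
    """Write the checkpoint to a temp file, then swap it into place."""
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=path.name, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as tmp:
            tmp.write(str(last_id))
            tmp.flush()
            os.fsync(tmp.fileno())  # push bytes to disk before the rename
        os.replace(tmp_name, path)  # atomic when both paths share a filesystem
    except BaseException:
        # Clean up the temp file if anything failed before the swap.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


def load_checkpoint(path: Path) -> int:
    try:
        return int(path.read_text().strip())
    except (FileNotFoundError, ValueError):
        return 0


with tempfile.TemporaryDirectory() as workdir:
    checkpoint_path = Path(workdir) / ".pipeline_checkpoint"
    before = load_checkpoint(checkpoint_path)
    save_checkpoint_atomic(checkpoint_path, 200)
    after = load_checkpoint(checkpoint_path)

print(before, after)
```

Readers of the checkpoint see either the old value or the new one, never a partial write.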
Common pitfalls and how to avoid
- Data drift: If input schemas change, add regression tests and a schema contract in InputRecord. Use semantic versioning for your pipeline contract.
- Large files: Process in streaming fashion with buffered writes instead of loading entire datasets into memory.
- Error storms: Implement backoff for transient errors and enrichment retries, rather than failing hard on first error.
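For the backoff advice, a minimal sketch of retrying a transient failure with exponential delay and jitter might look like the following. The helper retry_with_backoff, its parameters, and the flaky_lookup function are all hypothetical; the sleep function is injectable so the example runs instantly.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    *,
    retryable: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    attempts: int = 4,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying retryable errors with exponential backoff and jitter."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except retryable:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Jitter spreads out retries so many workers do not retry in lockstep.
            sleep(delay + random.uniform(0, delay / 2))
    raise AssertionError("unreachable")


calls = {"count": 0}


def flaky_lookup() -> str:
    # Hypothetical enrichment call that fails twice, then succeeds.
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary outage")
    return "ok"


waits: list = []
result = retry_with_backoff(flaky_lookup, sleep=waits.append)
print(result, calls["count"], len(waits))
```

Only the listed exception types are retried; a validation error on a bad record still fails immediately, which is what you want for data problems that a retry cannot fix.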
Illustrative example: a small run
- You feed input.csv with three records (as above).
- The pipeline processes records 1 and 2, skipping 3 due to email validation failure in the raw data (eve@invalid). If you want to collect bad records, add a bad-record sink.
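A bad-record sink can be as simple as a JSON Lines file that stores each rejected row next to the reason it failed. The sketch below is one possible shape, not the runner's actual behavior: process_with_bad_record_sink and require_dotted_domain are hypothetical names, and the latter is a stand-in for enrich_record.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable, Iterable, Iterator


def process_with_bad_record_sink(
    rows: Iterable[dict],
    transform: Callable[[dict], dict],
    bad_path: Path,
) -> Iterator[dict]:
    """Yield transformed rows; append failures to a JSON Lines file."""
    bad_path.parent.mkdir(parents=True, exist_ok=True)
    with bad_path.open("a", encoding="utf-8") as bad_file:
        for row in rows:
            try:
                result = transform(row)
            except Exception as exc:
                # Keep the raw row and the reason so it can be fixed and replayed.
                entry = {"row": row, "error": f"{type(exc).__name__}: {exc}"}
                bad_file.write(json.dumps(entry) + "\n")
                continue
            yield result


def require_dotted_domain(row: dict) -> dict:
    # Hypothetical stand-in for enrich_record.
    domain = row["email"].rpartition("@")[2]
    if "." not in domain:
        raise ValueError(f"invalid email: {row['email']}")
    return {"id": row["id"], "email_domain": domain.lower()}


sample_rows = [
    {"id": "1", "email": "alice@example.com"},
    {"id": "3", "email": "eve@invalid"},
]

with tempfile.TemporaryDirectory() as workdir:
    bad_path = Path(workdir) / "bad_records.jsonl"
    good = list(process_with_bad_record_sink(sample_rows, require_dotted_domain, bad_path))
    bad = [json.loads(line) for line in bad_path.read_text(encoding="utf-8").splitlines()]

print(good)
print(bad)
```

Keeping rejected rows in a replayable format means a fix to the validation rules can be followed by reprocessing only the failures.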
Why this approach helps in practice
- Clear, testable boundaries help a team maintain and extend the pipeline without accidentally breaking downstream systems.
- Type validation reduces late-stage bugs when data enters analytics or downstream services.
- Simple checkpointing avoids reprocessing large portions of data after a crash, saving compute time and ensuring progress visibility.
- Observability through logging and lightweight metrics makes it easier to diagnose data quality issues and monitor pipeline health.
Rizwan Saleem | https://rizwansaleem.co