De' Clerke

Python for Data Engineers: The Patterns I Use in Every Pipeline

Most Python tutorials teach you to write Python. This one teaches you to write pipelines.

The code is different. A pipeline has to be correct at 7am when no one is watching, recover gracefully from a flaky API, not destroy existing data on a re-run, and leave enough logs that you can diagnose a failure three days later. The language is the same — the patterns are not.

I've built pipelines processing anywhere from a few thousand rows to 1.5 million. The stack is always some combination of requests, pandas, SQLAlchemy, dbt, pytest, and Airflow. Here are the patterns that show up in every one.


Project Structure

Every pipeline project I start looks like this:

my_pipeline/
├── dags/               # Airflow DAG files
├── scripts/            # Extract, transform, load logic
├── models/             # SQLAlchemy table models / Pydantic schemas
├── tests/              # pytest tests
├── .env                # Secrets — NEVER commit this
├── .gitignore
├── requirements.txt
└── docker-compose.yml

And the .gitignore always includes:

.env
.venv/
__pycache__/
*.pyc
*.log
airflow/logs/
projectsummary.md

The .env file holds all credentials. Nothing sensitive is hardcoded or committed.


Environment Setup

Every project gets its own isolated virtual environment. Never install pipeline dependencies globally — version conflicts between projects will cause hours of confusion.

# Create and activate
python3.11 -m venv .venv
source .venv/bin/activate        # Linux/WSL
.venv\Scripts\Activate.ps1       # Windows PowerShell

# Install
pip install --upgrade pip
pip install -r requirements.txt

# Freeze current state before deleting venv
pip freeze > requirements.txt

Or with uv (10–100x faster installs):

uv venv .venv
source .venv/bin/activate
uv pip install -r requirements.txt

Environment Variables and Secrets

Load configuration from .env with python-dotenv. Every variable comes through os.getenv(), never hardcoded.

from dotenv import load_dotenv
import os

load_dotenv()

DATABASE_URL = os.getenv("DATABASE_URL")
API_KEY      = os.getenv("API_KEY")
DEBUG        = os.getenv("DEBUG", "false").lower() == "true"
PORT         = int(os.getenv("PORT", "8000"))

For required variables, fail fast at startup rather than at runtime mid-pipeline:

def require_env(key: str) -> str:
    val = os.getenv(key)
    if not val:
        raise EnvironmentError(f"Required env var '{key}' is not set")
    return val

DATABASE_URL = require_env("DATABASE_URL")
API_KEY      = require_env("API_KEY")

This surfaces misconfiguration the moment the pipeline starts, not 10 minutes in when it tries to connect to the database.
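
When several variables are required, it can help to check them in one pass so a misconfigured deployment reports every missing name at once. A minimal sketch, assuming a hypothetical helper called load_required (it is not part of python-dotenv) and an optional env mapping that exists only to make the function easy to test:

```python
import os
from typing import Iterable, Mapping, Optional

def load_required(keys: Iterable[str], env: Optional[Mapping[str, str]] = None) -> dict:
    """Return {key: value} for every key, or raise naming all that are missing."""
    env = os.environ if env is None else env
    keys = list(keys)
    missing = [k for k in keys if not env.get(k)]   # Unset and empty both count as missing
    if missing:
        raise EnvironmentError("Required env vars not set: " + ", ".join(missing))
    return {k: env[k] for k in keys}

# Hypothetical values, passed explicitly instead of read from the real environment
config = load_required(
    ["DATABASE_URL", "API_KEY"],
    env={"DATABASE_URL": "postgresql://localhost/demo", "API_KEY": "demo-key"},
)
```

In a real pipeline you would call it without the env argument, right after load_dotenv().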


Logging

Pipelines run unattended. When something fails, logs are all you have. Set up a proper logger in every script — not print() statements.

import logging
from logging.handlers import RotatingFileHandler

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[
        logging.StreamHandler(),                             # Console output
        RotatingFileHandler("pipeline.log",
                            maxBytes=5_000_000,
                            backupCount=3)                   # Rolling log files
    ]
)
logger = logging.getLogger(__name__)

RotatingFileHandler keeps the active log file plus three 5 MB backups and discards anything older. Without it, long-running pipelines produce log files that eventually fill the disk.
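
To see the rotation cap in action, here is a small sketch that writes far more than the limit into a temporary directory and then lists what is left. The tiny maxBytes value and the logger name rotation_demo are assumptions chosen to make the demo fast; they are not settings from a real pipeline.

```python
import logging
import os
import tempfile
from logging.handlers import RotatingFileHandler

def demo_rotation(log_dir: str) -> list:
    """Write ~2 KB of log lines through a 200-byte rotating handler."""
    handler = RotatingFileHandler(
        os.path.join(log_dir, "pipeline.log"), maxBytes=200, backupCount=3
    )
    demo_logger = logging.getLogger("rotation_demo")
    demo_logger.setLevel(logging.INFO)
    demo_logger.propagate = False          # Keep demo output out of the root logger
    demo_logger.addHandler(handler)
    try:
        for i in range(100):
            demo_logger.info("row batch %d loaded", i)
    finally:
        demo_logger.removeHandler(handler)
        handler.close()                    # Release the file before the directory is removed
    return sorted(os.listdir(log_dir))

with tempfile.TemporaryDirectory() as tmp:
    log_files = demo_rotation(tmp)
```

However many lines are written, the directory ends up holding the active file and at most backupCount rotated copies.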

Use the right log level:

logger.debug("Fetching page 3 of symbol list")       # Verbose trace
logger.info(f"Loaded {len(df)} rows for SCOM")       # Normal progress
logger.warning(f"No data returned for {symbol}")     # Something's off but not fatal
logger.error(f"HTTP error: {e}", exc_info=True)      # Failure with traceback
logger.critical("Database unreachable — aborting")   # Fatal, pipeline cannot continue

exc_info=True on error logs adds the full traceback. Always use it when logging exceptions.
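
A short sketch of what that buys you, capturing the log output in memory so it can be inspected. The logger name exc_info_demo and the parse_price helper are made up for the illustration.

```python
import io
import logging

stream = io.StringIO()
exc_logger = logging.getLogger("exc_info_demo")
exc_logger.setLevel(logging.INFO)
exc_logger.propagate = False
exc_logger.addHandler(logging.StreamHandler(stream))

def parse_price(raw: str) -> float:
    return float(raw)   # Raises ValueError on non-numeric input

try:
    parse_price("n/a")
except ValueError as e:
    # Message plus full traceback; logger.exception(...) is shorthand for the same thing
    exc_logger.error(f"Bad price value: {e}", exc_info=True)

output = stream.getvalue()
```

The captured output contains the message followed by the traceback, including the line inside parse_price that failed.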

In Airflow, use the Airflow logger so output appears in the task logs in the UI:

from airflow.utils.log.logging_mixin import LoggingMixin
log = LoggingMixin().log
log.info("Starting extraction")

Type Hints and Dataclasses

Type hints make function signatures self-documenting and let a static checker such as mypy catch type errors before runtime. Use them consistently.

from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any
from datetime import datetime

@dataclass
class PriceRecord:
    symbol: str
    close_price: float
    volume: int
    timestamp: datetime
    source: str = "api"
    metadata: Dict[str, Any] = field(default_factory=dict)

    def is_valid(self) -> bool:
        return self.close_price > 0 and self.volume >= 0

For generator functions that yield chunks of data (common in large file processing):

from typing import Generator
import pandas as pd

def read_in_chunks(path: str, size: int = 50_000) -> Generator[pd.DataFrame, None, None]:
    for chunk in pd.read_csv(path, chunksize=size):
        yield chunk

Fetching Data from APIs

The Basic Pattern

import requests
from datetime import datetime, timezone

def fetch_forex_rates(base: str = "USD", target: str = "KES") -> dict:
    url = f"https://api.exchangerate-api.com/v4/latest/{base}"
    response = requests.get(url, timeout=10)
    response.raise_for_status()   # Raises on 4xx/5xx; don't silently ignore HTTP errors
    data = response.json()
    return {
        "pair":       f"{base}/{target}",
        "rate":       data["rates"][target],
        "fetched_at": datetime.now(timezone.utc),   # Timezone-aware; utcnow() is deprecated since Python 3.12
    }

response.raise_for_status() is the most important line. Without it, requests treats a 429 or 503 as an ordinary response, so the pipeline carries on with an error payload instead of data and either fails somewhere less obvious or loads garbage.

Authentication Headers

def fetch_with_auth(endpoint: str, api_key: str) -> dict:
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Accept":        "application/json",   # A GET has no body, so Accept (not Content-Type) is the relevant header
        "User-Agent":    "Mozilla/5.0",   # Some APIs block Python's default agent
    }
    response = requests.get(endpoint, headers=headers, timeout=15)
    response.raise_for_status()
    return response.json()

Pagination

Most real APIs return data in pages. Fetch all of them:

import time
from typing import Optional

def fetch_all_pages(base_url: str, params: Optional[dict] = None) -> list:
    all_records = []
    page = 1
    params = dict(params or {})   # Copy so the caller's dict is not mutated

    while True:
        params["page"] = page
        response = requests.get(base_url, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()

        records = data.get("results", [])
        if not records:
            break

        all_records.extend(records)
        page += 1
        time.sleep(0.5)   # Respect rate limits

    logger.info(f"Fetched {len(all_records)} total records across {page - 1} pages")
    return all_records

Automatic Retries

Network errors and temporary 5xx responses happen in production. Build in retries at the session level so every request benefits:

from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def make_session() -> requests.Session:
    session = requests.Session()
    retry = Retry(
        total=3,
        backoff_factor=1,                           # Exponential backoff: the delay doubles on each retry
        status_forcelist=[429, 500, 502, 503, 504]  # Retry on these HTTP codes
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session

session = make_session()
response = session.get("https://api.example.com/prices", timeout=10)

Robust Task Function

Put it all together with proper logging and error handling:

def safe_fetch_and_load(symbol: str, date: str) -> int:
    logger.info(f"Starting fetch for {symbol} on {date}")
    try:
        df = fetch_prices(symbol, date)
        if df.empty:
            logger.warning(f"No data for {symbol} on {date}")
            return 0
        df = clean_prices(df)
        load_to_db(df)
        logger.info(f"Loaded {len(df)} rows for {symbol}")
        return len(df)
    except requests.exceptions.Timeout:
        logger.error(f"Timeout fetching {symbol} — retrying on next run")
        raise
    except requests.exceptions.HTTPError as e:
        logger.error(f"HTTP {e.response.status_code} for {symbol}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error for {symbol}: {e}", exc_info=True)
        raise

raise after logging re-raises the exception so Airflow or the calling code knows the task failed. Catching and swallowing exceptions silently is worse than crashing.
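
The difference is easy to see side by side. In this sketch, flaky_load is a hypothetical stand-in for a real load step that always fails; both wrappers log the error, but only one lets the caller know.

```python
import logging

demo_log = logging.getLogger("reraise_demo")
demo_log.addHandler(logging.NullHandler())   # Keep the demo quiet
demo_log.propagate = False

def flaky_load(rows: list) -> int:
    raise ConnectionError("database unreachable")   # Stand-in for a real load step

def load_swallowing(rows: list) -> int:
    try:
        return flaky_load(rows)
    except Exception as e:
        demo_log.error(f"Load failed: {e}")
        return 0          # Caller cannot tell "failed" from "no rows today"

def load_reraising(rows: list) -> int:
    try:
        return flaky_load(rows)
    except Exception as e:
        demo_log.error(f"Load failed: {e}", exc_info=True)
        raise             # Caller (or the scheduler) sees the failure and can retry

swallowed_result = load_swallowing([{"symbol": "SCOM"}])

try:
    load_reraising([{"symbol": "SCOM"}])
    reraised = False
except ConnectionError:
    reraised = True
```

The swallowing version reports zero rows as if the day had simply been quiet, which is exactly the kind of failure that goes unnoticed for a week.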


Pandas: The Transformation Layer

Loading Data

import pandas as pd

df = pd.read_csv("data.csv")
df = pd.read_csv("data.csv", parse_dates=["timestamp"])
df = pd.read_parquet("data.parquet", columns=["symbol", "close_price"])
df = pd.read_sql("SELECT * FROM stock_prices WHERE symbol = %s", engine, params=("SCOM",))

# Large CSV: read in chunks so only one raw chunk is in memory at a time.
# This only saves memory if process_chunk() filters or aggregates each chunk.
chunks = []
for chunk in pd.read_csv("large.csv", chunksize=50_000):
    chunks.append(process_chunk(chunk))
df = pd.concat(chunks, ignore_index=True)

First-Look Inspection

Before transforming anything, inspect:

df.shape                                              # (rows, columns)
df.dtypes                                             # Column types
df.isnull().sum()                                     # Missing values per column
df.duplicated().sum()                                 # Duplicate rows
df.memory_usage(deep=True).sum() / 1024**2            # Memory in MB
df.describe()                                         # Min, max, mean, std

Cleaning

# Rename first so the steps below can use the canonical column names
df = df.rename(columns={"Ticker": "symbol", "Close Price": "close_price"})
df = df.dropna(subset=["symbol", "close_price"])
df["volume"] = df["volume"].fillna(0)   # Assign back; inplace on a selected column is deprecated chained assignment
df = df.drop_duplicates(subset=["symbol", "timestamp"], keep="last")

# Type casting: use errors="coerce" to turn bad values into NaN instead of crashing
df["close_price"] = pd.to_numeric(df["close_price"], errors="coerce")
df["volume"]      = pd.to_numeric(df["volume"],      errors="coerce").fillna(0).astype(int)
df["timestamp"]   = pd.to_datetime(df["timestamp"],  utc=True)

# Normalise strings
df["symbol"] = df["symbol"].str.strip().str.upper()

Aggregation

# Single column
df.groupby("symbol")["close_price"].mean()

# Multiple aggregations in one pass
summary = df.groupby("symbol").agg(
    avg_price    = ("close_price", "mean"),
    total_volume = ("volume",      "sum"),
    high         = ("close_price", "max"),
    low          = ("close_price", "min"),
    count        = ("close_price", "count"),
).reset_index()

Datetime Operations

df["date"]        = df["timestamp"].dt.date
df["hour"]        = df["timestamp"].dt.hour
df["day_of_week"] = df["timestamp"].dt.day_name()
df["ts_nairobi"]  = df["timestamp"].dt.tz_convert("Africa/Nairobi")

Time-Series Resampling

# Build 1-hour OHLCV bars from tick data
df.set_index("timestamp", inplace=True)
ohlcv = (
    df.groupby("symbol")
    .resample("1h")
    .agg(
        open   = ("close_price", "first"),
        high   = ("close_price", "max"),
        low    = ("close_price", "min"),
        close  = ("close_price", "last"),
        volume = ("volume",      "sum"),
    )
    .dropna()
    .reset_index()
)

Adding Computed Columns

df["price_change"] = df["close_price"] - df["open_price"]
df["pct_change"]   = df["price_change"] / df["open_price"] * 100

# Rolling average per symbol (using transform to keep row count)
df["ma_7"] = df.groupby("symbol")["close_price"].transform(
    lambda x: x.rolling(7).mean()
)

Performance

Two rules that matter:

Vectorise instead of applying row-by-row:

df["gain"] = df["close"] - df["open"]                            # Fast — vectorised
df["gain"] = df.apply(lambda r: r["close"] - r["open"], axis=1) # Slow — row loop

Downcast types to reduce memory:

df["close_price"] = pd.to_numeric(df["close_price"], downcast="float")
df["volume"]      = pd.to_numeric(df["volume"],      downcast="integer")

On a DataFrame with 1M rows, downcasting float64 columns to float32 halves the memory those columns use. On a machine with limited RAM, that is often the difference between a pipeline succeeding and crashing.
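
The arithmetic behind that claim, plus the trade-off that comes with it, can be checked with the standard library alone. This is a back-of-envelope sketch: it counts raw value bytes only (no pandas index or object overhead), and the 182.05 price is an arbitrary example.

```python
import struct

ROWS = 1_000_000

# Raw storage per column: 8 bytes per float64 value, 4 bytes per float32 value
float64_mb = ROWS * struct.calcsize("<d") / 1024**2
float32_mb = ROWS * struct.calcsize("<f") / 1024**2

def as_float32(value: float) -> float:
    """Round-trip a Python float through 32-bit storage."""
    return struct.unpack("<f", struct.pack("<f", value))[0]

price = 182.05
stored = as_float32(price)
rounding_error = abs(stored - price)   # Small but non-zero: float32 keeps ~7 significant digits
```

The saving is real, but so is the rounding, so it is worth checking that downstream comparisons and sums tolerate float32 precision before downcasting price columns.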


Writing Data to PostgreSQL

Setup

from sqlalchemy import create_engine
import os

engine = create_engine(
    os.getenv("DATABASE_URL"),
    pool_pre_ping=True    # Checks connection before using it — avoids stale pool errors
)

Append (small DataFrames)

df.to_sql("stock_prices", engine, if_exists="append", index=False)
df.to_sql("stock_prices", engine, if_exists="append", index=False, chunksize=500)

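With default settings, to_sql is slow for large DataFrames because it issues plain row-wise INSERT statements. Passing chunksize bounds the batch size, method="multi" packs many rows into each INSERT, and the bulk patterns below are faster still.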

Upsert (idempotent — safe to re-run)

For pipelines that re-process the same time range on retry, plain INSERT will create duplicates. Upsert prevents that:

from sqlalchemy.dialects.postgresql import insert
from sqlalchemy import Table, MetaData

meta = MetaData()
meta.reflect(bind=engine)
table = meta.tables["stock_prices"]

def upsert_dataframe(df: pd.DataFrame, table, engine, conflict_cols: list):
    if df.empty:
        return                                   # Nothing to upsert
    rows = df.to_dict(orient="records")
    stmt = insert(table).values(rows)
    update_cols = {
        c.name: stmt.excluded[c.name]
        for c in table.c
        # Only overwrite columns the DataFrame actually supplies
        if c.name not in conflict_cols and c.name in df.columns
    }
    stmt = stmt.on_conflict_do_update(
        index_elements=conflict_cols,
        set_=update_cols
    )
    with engine.begin() as conn:                 # Commits on success, rolls back on error
        conn.execute(stmt)

upsert_dataframe(df, table, engine, conflict_cols=["symbol", "timestamp"])

Every pipeline I've built that writes to PostgreSQL uses this pattern. The conflict columns match the UNIQUE constraint on the table.
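
The link between the UNIQUE constraint and idempotency can be tried without a PostgreSQL server. The sketch below uses Python's built-in sqlite3 as a stand-in, which is an assumption made purely for convenience: SQLite (3.24 or newer) accepts the same ON CONFLICT ... DO UPDATE form with the excluded pseudo-table, but it is not the SQLAlchemy code path shown above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE stock_prices (
        symbol      TEXT NOT NULL,
        timestamp   TEXT NOT NULL,
        close_price REAL,
        UNIQUE (symbol, timestamp)      -- The conflict target must match this constraint
    )
""")

UPSERT = """
    INSERT INTO stock_prices (symbol, timestamp, close_price)
    VALUES (?, ?, ?)
    ON CONFLICT (symbol, timestamp) DO UPDATE SET
        close_price = excluded.close_price
"""

def load(rows: list) -> None:
    with conn:                          # Commits on success, rolls back on error
        conn.executemany(UPSERT, rows)

load([("SCOM", "2025-01-01T07:00:00Z", 14.5)])
load([("SCOM", "2025-01-01T07:00:00Z", 14.7)])   # Re-run with a corrected price

rows = conn.execute("SELECT symbol, close_price FROM stock_prices").fetchall()
```

After two loads of the same key there is still one row, carrying the latest price, which is what makes a retry safe.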

Bulk COPY (fastest — 10–100x faster than to_sql)

For loading large DataFrames, PostgreSQL's COPY command is dramatically faster than individual inserts:

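import psycopg2
from io import StringIO

def bulk_copy(df: pd.DataFrame, table_name: str, conn_string: str):
    buffer = StringIO()
    df.to_csv(buffer, index=False, header=False)
    buffer.seek(0)
    columns = ", ".join(f'"{c}"' for c in df.columns)
    # FORMAT csv understands the quoting to_csv produces (commas or quotes inside values);
    # unquoted empty fields load as NULL. table_name is interpolated, so pass trusted names only.
    copy_sql = f"COPY {table_name} ({columns}) FROM STDIN WITH (FORMAT csv)"
    conn = psycopg2.connect(conn_string)
    try:
        with conn, conn.cursor() as cur:   # Commits on success, rolls back on error
            cur.copy_expert(copy_sql, buffer)
    finally:
        conn.close()
    logger.info(f"Bulk-copied {len(df)} rows into {table_name}")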

On the LedgerSync pipeline processing 1.5M rows, switching from to_sql to COPY reduced the load step from 4 minutes to 18 seconds.


Testing Pipelines with pytest

Untested pipelines break silently. The minimal test suite for a pipeline covers: does the transform produce the right columns, does it reject bad data, and does it load correctly to the database.

Project structure

tests/
├── conftest.py          # Shared fixtures
├── test_extract.py
├── test_transform.py
└── test_load.py

Fixtures in conftest.py

# tests/conftest.py
import pytest
import pandas as pd
from sqlalchemy import create_engine

@pytest.fixture
def sample_prices():
    return pd.DataFrame({
        "symbol":      ["SCOM", "EABL", "KCB"],
        "close_price": [14.5,   182.0,  43.0],
        "volume":      [1000,   500,    2000],
        "timestamp":   pd.to_datetime(["2025-01-01"] * 3, utc=True),
    })

@pytest.fixture
def db_engine():
    engine = create_engine("sqlite:///:memory:")
    yield engine
    engine.dispose()

Transform tests

def test_clean_strips_symbols(sample_prices):
    sample_prices["symbol"] = [" scom ", "eabl", " KCB"]
    result = clean_prices(sample_prices)
    assert list(result["symbol"]) == ["SCOM", "EABL", "KCB"]

def test_clean_drops_negative_prices(sample_prices):
    sample_prices.loc[0, "close_price"] = -1.0
    result = clean_prices(sample_prices)
    assert len(result) == 2
    assert result["close_price"].min() > 0

def test_transform_adds_pct_change(sample_prices):
    result = compute_pct_change(sample_prices)
    assert "pct_change" in result.columns
    assert result["pct_change"].notna().all()

Parametrized tests

@pytest.mark.parametrize("symbol,expected_tier", [
    ("SCOM", "low"),
    ("EABL", "high"),
])
def test_tier_assignment(symbol, expected_tier):
    price = {"SCOM": 14.5, "EABL": 182.0}[symbol]
    assert assign_tier(price) == expected_tier

Mocking external API calls

Tests should not call real APIs — they're slow, rate-limited, and return different data every run:

from unittest.mock import patch, MagicMock
from scripts.extract import fetch_forex_rates   # Same module the patch target below points at

@patch("scripts.extract.requests.get")
def test_fetch_returns_correct_pair(mock_get):
    mock_get.return_value = MagicMock(
        status_code=200,
        json=lambda: {"rates": {"KES": 129.5}}
    )
    mock_get.return_value.raise_for_status = lambda: None

    result = fetch_forex_rates("USD", "KES")

    assert result["pair"] == "USD/KES"
    assert result["rate"] == 129.5
    mock_get.assert_called_once()

Database load test

def test_data_loads_to_db(sample_prices, db_engine):
    sample_prices.to_sql("prices", db_engine, if_exists="replace", index=False)
    result = pd.read_sql("SELECT COUNT(*) AS n FROM prices", db_engine)
    assert result["n"].iloc[0] == 3

Run tests:

pytest -v --tb=short tests/          # Verbose, short tracebacks
pytest -x tests/                     # Stop on first failure
pytest -k "test_clean" tests/        # Run only tests matching name

Airflow: Orchestrating the Pipeline

Once you have working extract, transform, and load functions, wrapping them in an Airflow DAG is mostly adding decorators.

TaskFlow API (Airflow 2.x / 3.x)

from airflow.decorators import dag, task
from datetime import datetime, timedelta
from sqlalchemy import create_engine, MetaData, Table
import logging
import pandas as pd
import os

logger = logging.getLogger(__name__)

@dag(
    schedule="0 7 * * 1-5",         # Weekdays at 07:00 (UTC unless a timezone is configured)
    start_date=datetime(2025, 1, 1),
    catchup=False,
    max_active_runs=1,
    default_args={
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
        "email_on_failure": False,
    },
    tags=["nse", "stocks", "daily"]
)
def nse_daily_pipeline():

    @task
    def extract() -> list:
        df = fetch_nse_data()
        return df.to_dict(orient="records")   # Must be JSON-serializable for XCom

    @task
    def transform(raw: list) -> list:
        df = pd.DataFrame(raw)
        df["symbol"] = df["symbol"].str.strip().str.upper()
        df = df.dropna(subset=["symbol", "close_price"])
        # Normalise to UTC, then back to ISO strings so the records stay JSON-serializable
        df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True).map(lambda ts: ts.isoformat())
        return df.to_dict(orient="records")

    @task
    def load(clean: list) -> int:
        df = pd.DataFrame(clean)
        df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
        engine = create_engine(os.getenv("DATABASE_URL"))
        table = Table("stock_prices", MetaData(), autoload_with=engine)
        # Upsert pattern from above; conflict columns match the table's UNIQUE constraint
        upsert_dataframe(df, table, engine, conflict_cols=["symbol", "timestamp"])
        return len(df)

    @task
    def notify(row_count: int):
        logger.info(f"Pipeline complete. Loaded {row_count} rows.")

    raw   = extract()
    clean = transform(raw)
    count = load(clean)
    notify(count)

nse_daily_pipeline()

XCom (the mechanism Airflow uses to pass data between tasks) stores data in the metadata database. For DataFrames larger than about 1MB, don't pass them through XCom — write to PostgreSQL or Parquet in the extract task and read from there in the transform task.
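
The hand-off then looks like this: the extract step writes a file and returns only its path, and the transform step reads from that path. The sketch below is plain Python rather than Airflow tasks, and it uses the csv module and a temporary directory as stand-ins for Parquet and shared storage; the function names extract_to_file and transform_from_file are hypothetical.

```python
import csv
import os
import tempfile

def extract_to_file(records: list, directory: str) -> str:
    """Persist raw records and return the path (the only thing XCom would carry)."""
    path = os.path.join(directory, "raw_prices.csv")
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["symbol", "close_price"])
        writer.writeheader()
        writer.writerows(records)
    return path

def transform_from_file(path: str) -> list:
    """Read the file written by the extract step and clean each row."""
    with open(path, newline="") as f:
        return [
            {"symbol": row["symbol"].strip().upper(), "close_price": float(row["close_price"])}
            for row in csv.DictReader(f)
        ]

with tempfile.TemporaryDirectory() as tmp:
    handoff_path = extract_to_file(
        [{"symbol": " scom ", "close_price": 14.5}, {"symbol": "eabl", "close_price": 182.0}],
        tmp,
    )
    cleaned = transform_from_file(handoff_path)
```

On a multi-worker Airflow deployment the two tasks may run on different machines, so the path has to point at storage both can reach, such as a shared volume, object storage, or a staging table.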

Waiting for a Condition with a Sensor

import requests
from airflow.sensors.python import PythonSensor

def api_is_ready() -> bool:
    try:
        return requests.get("https://api.nse.co.ke/status", timeout=5).status_code == 200
    except requests.exceptions.RequestException:
        return False   # Treat connection errors as "not ready yet" and poke again

wait_for_api = PythonSensor(
    task_id="wait_for_api_ready",
    python_callable=api_is_ready,
    timeout=3600,
    poke_interval=60,
    mode="reschedule"    # Releases the worker slot while waiting; critical for production
)

mode="reschedule" is important. In the default poke mode, a sensor holds its worker slot for the entire wait, up to the timeout. On a small Airflow setup with 4 workers, 4 waiting sensors can deadlock the entire cluster.

Triggering with a Config (for backfills)

airflow dags trigger nse_daily_pipeline --conf '{"date": "2025-01-15"}'

Inside the task, read the config from the DAG run:

@task
def extract(**context) -> list:
    conf = context.get("dag_run").conf or {}
    target_date = conf.get("date", str(datetime.today().date()))
    df = fetch_nse_data(date=target_date)
    return df.to_dict(orient="records")

Pipeline State Tracking

Incremental pipelines need to know where they left off. Track state in a PostgreSQL table rather than relying on Airflow's execution_date (renamed logical_date in Airflow 2.2):

from datetime import datetime
from typing import Optional
from sqlalchemy import text

CREATE_STATE_TABLE = """
CREATE TABLE IF NOT EXISTS pipeline_state (
    pipeline_name       VARCHAR(100) PRIMARY KEY,
    last_successful_run TIMESTAMPTZ,
    last_row_count      INT,
    updated_at          TIMESTAMPTZ DEFAULT NOW()
);
"""

def get_last_run(pipeline_name: str, engine) -> Optional[datetime]:
    with engine.connect() as conn:
        row = conn.execute(text(
            "SELECT last_successful_run FROM pipeline_state WHERE pipeline_name = :name"
        ), {"name": pipeline_name}).fetchone()
        return row[0] if row else None

def update_pipeline_state(pipeline_name: str, row_count: int, engine):
    with engine.connect() as conn:
        conn.execute(text("""
            INSERT INTO pipeline_state (pipeline_name, last_successful_run, last_row_count)
            VALUES (:name, NOW(), :count)
            ON CONFLICT (pipeline_name) DO UPDATE SET
                last_successful_run = NOW(),
                last_row_count      = :count,
                updated_at          = NOW()
        """), {"name": pipeline_name, "count": row_count})
        conn.commit()

The extract task queries get_last_run() to find its watermark. The load task calls update_pipeline_state() only after a successful load. If the pipeline fails mid-run, the watermark doesn't advance and the next run reprocesses from the last safe checkpoint.
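
That ordering is what makes the checkpoint trustworthy, and it can be exercised end to end in a few lines. This sketch again uses the built-in sqlite3 module as a stand-in for PostgreSQL and stores timestamps as ISO strings; run_once, ok_load, and failing_load are hypothetical names for the illustration.

```python
import sqlite3
from typing import Callable, Optional

state = sqlite3.connect(":memory:")
state.execute(
    "CREATE TABLE pipeline_state (pipeline_name TEXT PRIMARY KEY, last_successful_run TEXT)"
)

def get_watermark(name: str) -> Optional[str]:
    row = state.execute(
        "SELECT last_successful_run FROM pipeline_state WHERE pipeline_name = ?", (name,)
    ).fetchone()
    return row[0] if row else None

def set_watermark(name: str, ts: str) -> None:
    with state:   # Commits on success, rolls back on error
        state.execute(
            """INSERT INTO pipeline_state (pipeline_name, last_successful_run)
               VALUES (?, ?)
               ON CONFLICT (pipeline_name) DO UPDATE SET
                   last_successful_run = excluded.last_successful_run""",
            (name, ts),
        )

def run_once(name: str, load_fn: Callable[[Optional[str]], None], now: str) -> None:
    since = get_watermark(name)
    load_fn(since)             # Raises on failure, so the next line is skipped
    set_watermark(name, now)   # Only reached after a successful load

def ok_load(since: Optional[str]) -> None:
    pass

def failing_load(since: Optional[str]) -> None:
    raise RuntimeError("database unreachable")

run_once("nse_daily", ok_load, "2025-01-01T07:00:00+00:00")
try:
    run_once("nse_daily", failing_load, "2025-01-02T07:00:00+00:00")
except RuntimeError:
    pass

watermark_after_failure = get_watermark("nse_daily")
```

The failed second run leaves the watermark at the first run's timestamp, so the next attempt starts from the same point.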


When to Reach Beyond pandas

Polars for faster transforms on large in-memory DataFrames. The lazy API chains operations and executes them in a single pass:

import polars as pl

result = (
    pl.scan_parquet("data/*.parquet")
    .filter(pl.col("close_price") > 0)
    .with_columns(pl.col("symbol").str.to_uppercase())
    .group_by("symbol").agg(pl.col("close_price").mean().alias("avg_price"))
    .sort("avg_price", descending=True)
    .collect()
)

Polars can be 5–20x faster than pandas for the same transform on large datasets. It also uses all CPU cores by default.

Parquet over CSV once you're dealing with more than a few hundred thousand rows. It's compressed, columnar, and reads 10–50x faster. df.to_parquet("data.parquet", index=False) and pd.read_parquet("data.parquet", columns=["symbol", "close_price"]) are enough to get started.

Delta Lake when you need time travel, schema enforcement, and the ability to upsert into file-based storage:

from deltalake import DeltaTable, write_deltalake

write_deltalake("data/silver/prices", df, mode="append")

# Upsert into Delta table
dt = DeltaTable("data/silver/prices")
(
    dt.merge(new_df, "s.symbol = t.symbol AND s.date = t.date", "s", "t")
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)

The Pattern That Ties Everything Together

Every pipeline I build follows this shape:

extract()   → returns raw records (list of dicts)
transform() → returns cleaned records (list of dicts or DataFrame)
load()      → upserts into PostgreSQL, returns row count
notify()    → logs completion

Each function has one responsibility. Each can be tested independently. Each logs what it does and raises on failure. Wrapped in an Airflow DAG, it runs on schedule, retries on failure, and leaves a log trail you can read days later.
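
Stripped of Airflow, requests, and the database, the shape fits in a few lines. This is a toy sketch under obvious assumptions: the raw records are hard-coded, and a dict keyed by symbol stands in for the PostgreSQL upsert.

```python
from typing import Dict, List

def extract() -> List[dict]:
    # Hypothetical raw records standing in for an API response
    return [
        {"symbol": " scom ", "close_price": "14.5"},
        {"symbol": "eabl",   "close_price": None},     # Missing price: dropped in transform
        {"symbol": "KCB",    "close_price": "43.0"},
    ]

def transform(raw: List[dict]) -> List[dict]:
    return [
        {"symbol": r["symbol"].strip().upper(), "close_price": float(r["close_price"])}
        for r in raw
        if r.get("symbol") and r.get("close_price") is not None
    ]

def load(clean: List[dict], store: Dict[str, float]) -> int:
    for row in clean:
        store[row["symbol"]] = row["close_price"]   # Keyed write, so re-runs overwrite
    return len(clean)

def notify(row_count: int) -> str:
    return f"Pipeline complete. Loaded {row_count} rows."

store: Dict[str, float] = {}
message = notify(load(transform(extract()), store))
load(transform(extract()), store)   # A second run leaves the store unchanged
```

Each stage takes plain data in and returns plain data out, which is why each one can be tested on its own.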

Python doesn't make this easy by default — but these patterns do.


Follow me on dev.to for more data engineering content, or browse the project code at github.com/declerke.
