Most Python tutorials teach you to write Python. This one teaches you to write pipelines.
The code is different. A pipeline has to be correct at 7am when no one is watching, recover gracefully from a flaky API, not destroy existing data on a re-run, and leave enough logs that you can diagnose a failure three days later. The language is the same — the patterns are not.
I've built pipelines processing anywhere from a few thousand rows to 1.5 million. The stack is always some combination of requests, pandas, SQLAlchemy, dbt, pytest, and Airflow. Here are the patterns that show up in every one.
Project Structure
Every pipeline project I start looks like this:
my_pipeline/
├── dags/ # Airflow DAG files
├── scripts/ # Extract, transform, load logic
├── models/ # SQLAlchemy table models / Pydantic schemas
├── tests/ # pytest tests
├── .env # Secrets — NEVER commit this
├── .gitignore
├── requirements.txt
└── docker-compose.yml
And the .gitignore always includes:
.env
.venv/
__pycache__/
*.pyc
*.log
airflow/logs/
projectsummary.md
The .env file holds all credentials. Nothing sensitive is hardcoded or committed.
Environment Setup
Every project gets its own isolated virtual environment. Never install pipeline dependencies globally — version conflicts between projects will cause hours of confusion.
# Create and activate
python3.11 -m venv .venv
source .venv/bin/activate # Linux/WSL
.venv\Scripts\Activate.ps1 # Windows PowerShell
# Install
pip install --upgrade pip
pip install -r requirements.txt
# Freeze current state before deleting venv
pip freeze > requirements.txt
Or with uv (10–100x faster installs):
uv venv .venv
source .venv/bin/activate
uv pip install -r requirements.txt
Environment Variables and Secrets
Load configuration from .env with python-dotenv. Every variable comes through os.getenv(), never hardcoded.
from dotenv import load_dotenv
import os
load_dotenv()
DATABASE_URL = os.getenv("DATABASE_URL")
API_KEY = os.getenv("API_KEY")
DEBUG = os.getenv("DEBUG", "false").lower() == "true"
PORT = int(os.getenv("PORT", "8000"))
For required variables, fail fast at startup rather than at runtime mid-pipeline:
def require_env(key: str) -> str:
    val = os.getenv(key)
    if not val:
        raise EnvironmentError(f"Required env var '{key}' is not set")
    return val
DATABASE_URL = require_env("DATABASE_URL")
API_KEY = require_env("API_KEY")
This surfaces misconfiguration the moment the pipeline starts, not 10 minutes in when it tries to connect to the database.
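This is a variation on the same fail-fast idea. It assumes you would rather see every missing variable in one error than fix them one run at a time, so it reads everything once into a frozen dataclass. Settings, load_settings and REQUIRED_VARS are illustrative names, not part of python-dotenv. The env parameter exists only so the function can be tested without touching os.environ.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

REQUIRED_VARS = ("DATABASE_URL", "API_KEY")  # Illustrative list; adjust per pipeline


@dataclass(frozen=True)
class Settings:
    database_url: str
    api_key: str
    debug: bool = False


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Read config once at startup and report all missing required vars together."""
    env = os.environ if env is None else env
    missing = [key for key in REQUIRED_VARS if not env.get(key)]
    if missing:
        raise EnvironmentError(f"Required env vars not set: {', '.join(missing)}")
    return Settings(
        database_url=env["DATABASE_URL"],
        api_key=env["API_KEY"],
        debug=env.get("DEBUG", "false").lower() == "true",
    )
```

In a script you would call load_dotenv() first and then settings = load_settings().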
Logging
Pipelines run unattended. When something fails, logs are all you have. Set up a proper logger in every script — not print() statements.
import logging
from logging.handlers import RotatingFileHandler
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[
        logging.StreamHandler(),  # Console output
        RotatingFileHandler("pipeline.log", maxBytes=5_000_000, backupCount=3),  # Rolling log files
    ],
)
logger = logging.getLogger(__name__)
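RotatingFileHandler starts a new file once pipeline.log reaches about 5MB. It keeps at most three rotated backups (pipeline.log.1 to pipeline.log.3) alongside the active file and deletes the oldest. Without it, long-running pipelines produce log files that eventually fill the disk.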
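You can watch the rotation happen without writing 5MB of logs. This sketch points a RotatingFileHandler at a temporary directory with a deliberately tiny maxBytes and lists the files left behind. The sizes, line count and logger name are arbitrary demo values.

```python
import logging
import os
import tempfile
from logging.handlers import RotatingFileHandler


def demo_rotation(max_bytes: int = 200, backup_count: int = 3, lines: int = 100) -> list:
    """Write `lines` small log records and return the log file names that remain."""
    log_dir = tempfile.mkdtemp()
    path = os.path.join(log_dir, "pipeline.log")
    handler = RotatingFileHandler(path, maxBytes=max_bytes, backupCount=backup_count)
    logger = logging.getLogger("rotation_demo")
    logger.setLevel(logging.INFO)
    logger.propagate = False  # Keep demo output off the console
    logger.addHandler(handler)
    try:
        for i in range(lines):
            logger.info("row batch %d loaded", i)
    finally:
        logger.removeHandler(handler)
        handler.close()
    return sorted(os.listdir(log_dir))


print(demo_rotation())  # ['pipeline.log', 'pipeline.log.1', 'pipeline.log.2', 'pipeline.log.3']
```

However many times the file rolls over, only the active file and backupCount backups survive.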
Use the right log level:
logger.debug("Fetching page 3 of symbol list") # Verbose trace
logger.info(f"Loaded {len(df)} rows for SCOM") # Normal progress
logger.warning(f"No data returned for {symbol}") # Something's off but not fatal
logger.error(f"HTTP error: {e}", exc_info=True) # Failure with traceback
logger.critical("Database unreachable — aborting") # Fatal, pipeline cannot continue
exc_info=True on error logs adds the full traceback. Always use it when logging exceptions.
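Here is a quick way to see what exc_info=True adds. This sketch logs the same caught KeyError into an in-memory stream, once with the flag and once without it. Inside an except block, the standard library's logger.exception(msg) is equivalent to logger.error(msg, exc_info=True). The logger name and the simulated payload are illustrative.

```python
import io
import logging


def capture_error_log(with_traceback: bool) -> str:
    """Log a caught KeyError and return exactly what the handler wrote."""
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    log = logging.getLogger("exc_info_demo")
    log.propagate = False
    log.addHandler(handler)
    try:
        try:
            payload = {"rates": {}}  # Simulated API payload missing the KES rate
            rate = payload["rates"]["KES"]
        except KeyError as e:
            log.error(f"Parse error: {e!r}", exc_info=with_traceback)
    finally:
        log.removeHandler(handler)
    return stream.getvalue()


print("Traceback" in capture_error_log(True))   # True
print("Traceback" in capture_error_log(False))  # False
```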
In Airflow, use the Airflow logger so output appears in the task logs in the UI:
from airflow.utils.log.logging_mixin import LoggingMixin
log = LoggingMixin().log
log.info("Starting extraction")
Type Hints and Dataclasses
Type hints in pipeline code make function signatures self-documenting and catch errors before runtime. Use them consistently.
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any
from datetime import datetime

@dataclass
class PriceRecord:
    symbol: str
    close_price: float
    volume: int
    timestamp: datetime
    source: str = "api"
    metadata: Dict[str, Any] = field(default_factory=dict)

    def is_valid(self) -> bool:
        return self.close_price > 0 and self.volume >= 0
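One way to put is_valid() to work is to split a batch into rows to load and rows to reject, then log the rejects instead of dropping them silently. This sketch uses made-up sample values. The PriceRecord definition is repeated so the snippet runs on its own, and partition_records is a hypothetical helper.

```python
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Tuple

logger = logging.getLogger(__name__)


@dataclass
class PriceRecord:
    symbol: str
    close_price: float
    volume: int
    timestamp: datetime
    source: str = "api"
    metadata: Dict[str, Any] = field(default_factory=dict)

    def is_valid(self) -> bool:
        return self.close_price > 0 and self.volume >= 0


def partition_records(records: List[PriceRecord]) -> Tuple[List[PriceRecord], List[PriceRecord]]:
    """Return (valid, rejected) so rejects can be logged or quarantined."""
    valid = [r for r in records if r.is_valid()]
    rejected = [r for r in records if not r.is_valid()]
    if rejected:
        logger.warning("Rejected %d of %d records", len(rejected), len(records))
    return valid, rejected


ts = datetime(2025, 1, 2, 9, 30, tzinfo=timezone.utc)
valid, rejected = partition_records([
    PriceRecord("SCOM", 14.5, 1000, ts),
    PriceRecord("EABL", -1.0, 500, ts),  # Negative price: fails is_valid()
])
print([r.symbol for r in valid], [r.symbol for r in rejected])  # ['SCOM'] ['EABL']
```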
For generator functions that yield chunks of data (common in large file processing):
from typing import Generator
import pandas as pd

def read_in_chunks(path: str, size: int = 50_000) -> Generator[pd.DataFrame, None, None]:
    for chunk in pd.read_csv(path, chunksize=size):
        yield chunk
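Because the generator hands over one chunk at a time, a consumer can reduce each chunk as it arrives instead of concatenating the raw data. This minimal sketch assumes a CSV with symbol and volume columns. An in-memory buffer stands in for the file, since pd.read_csv accepts paths and file-like objects alike. total_volume_by_symbol is a hypothetical name.

```python
import io
from typing import Generator, Union

import pandas as pd


def read_in_chunks(source: Union[str, io.StringIO], size: int = 50_000) -> Generator[pd.DataFrame, None, None]:
    for chunk in pd.read_csv(source, chunksize=size):
        yield chunk


def total_volume_by_symbol(source, size: int = 50_000) -> pd.Series:
    """Sum volume per symbol while holding only one chunk of raw rows at a time."""
    partials = []
    for chunk in read_in_chunks(source, size):
        # Each partial has at most one row per symbol, so the list stays small
        partials.append(chunk.groupby("symbol")["volume"].sum())
    return pd.concat(partials).groupby(level=0).sum()


csv = io.StringIO("symbol,volume\nSCOM,100\nEABL,50\nSCOM,25\nKCB,10\n")
print(total_volume_by_symbol(csv, size=2).to_dict())  # {'EABL': 50, 'KCB': 10, 'SCOM': 125}
```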
Fetching Data from APIs
The Basic Pattern
import requests
from datetime import datetime, timezone

def fetch_forex_rates(base: str = "USD", target: str = "KES") -> dict:
    url = f"https://api.exchangerate-api.com/v4/latest/{base}"
    response = requests.get(url, timeout=10)
    response.raise_for_status()  # Raises on 4xx/5xx; don't silently ignore HTTP errors
    data = response.json()
    return {
        "pair": f"{base}/{target}",
        "rate": data["rates"][target],
        "fetched_at": datetime.now(timezone.utc),  # Timezone-aware; datetime.utcnow() is deprecated since Python 3.12
    }
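response.raise_for_status() is the most important line. Without it, a 429 or 503 is not treated as an error: the code carries on and parses the error body. At best that fails later with a confusing KeyError or JSON decoding error. At worst it yields data that looks valid and corrupts your pipeline.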
Authentication Headers
def fetch_with_auth(endpoint: str, api_key: str) -> dict:
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        "User-Agent": "Mozilla/5.0",  # Some APIs block Python's default agent
    }
    response = requests.get(endpoint, headers=headers, timeout=15)
    response.raise_for_status()
    return response.json()
Pagination
Most real APIs return data in pages. Fetch all of them:
import logging
import time
import requests

logger = logging.getLogger(__name__)

def fetch_all_pages(base_url: str, params: dict | None = None) -> list:
    all_records = []
    page = 1
    params = dict(params or {})  # Copy so the caller's dict isn't mutated
    while True:
        params["page"] = page
        response = requests.get(base_url, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()
        records = data.get("results", [])
        if not records:
            break
        all_records.extend(records)
        page += 1
        time.sleep(0.5)  # Respect rate limits
    logger.info(f"Fetched {len(all_records)} total records across {page - 1} pages")
    return all_records
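The loop above trusts the API to eventually return an empty page. This more defensive sketch separates the paging logic from HTTP, so it can be tested without a network, and fails loudly after a page cap. fetch_page is a hypothetical callable you would implement with requests (for example, around the get() call above). The max_pages default is an arbitrary safety limit.

```python
import time
from typing import Any, Callable, Dict, Iterator, List


def iter_pages(
    fetch_page: Callable[[int], List[Dict[str, Any]]],
    max_pages: int = 1_000,
    delay: float = 0.0,
) -> Iterator[Dict[str, Any]]:
    """Yield records page by page until an empty page; fail loudly on a runaway loop."""
    for page in range(1, max_pages + 1):
        records = fetch_page(page)
        if not records:
            return
        yield from records
        if delay:
            time.sleep(delay)  # Pause between pages to respect rate limits
    raise RuntimeError(f"Still receiving data after {max_pages} pages; check the pagination logic")


# Fake API: three pages holding 2, 2 and 1 records, then empty pages
fake_pages = {1: [{"id": 1}, {"id": 2}], 2: [{"id": 3}, {"id": 4}], 3: [{"id": 5}]}
records = list(iter_pages(lambda page: fake_pages.get(page, [])))
print(len(records))  # 5
```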
Automatic Retries
Network errors and temporary 5xx responses happen in production. Build in retries at the session level so every request made through that session benefits:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def make_session() -> requests.Session:
    session = requests.Session()
    retry = Retry(
        total=3,
        backoff_factor=1,  # Sleeps 0s, 2s, 4s: urllib3 retries once immediately, then doubles
        status_forcelist=[429, 500, 502, 503, 504],  # Retry on these HTTP codes
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session

session = make_session()
response = session.get("https://api.example.com/prices", timeout=10)
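You can check the delays that backoff_factor=1 actually produces by asking urllib3's Retry object directly. This sketch records three consecutive generic failures with Retry.increment() and reads get_backoff_time() after each one. No request is sent, and the URL is a placeholder. In urllib3 the first retry happens immediately and later ones double. For 429 and 503 responses, a Retry-After header takes precedence when the server sends one.

```python
from urllib3.util.retry import Retry


def backoff_schedule(total: int = 3, backoff_factor: float = 1.0) -> list:
    """Return the sleep, in seconds, urllib3 would apply before each retry."""
    retry = Retry(total=total, backoff_factor=backoff_factor)
    delays = []
    for _ in range(total):
        # Record a generic failure for a GET; nothing goes over the network
        retry = retry.increment(method="GET", url="/prices")
        delays.append(retry.get_backoff_time())
    return delays


print(backoff_schedule())  # 0 s, then 2 s, then 4 s
```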
Robust Task Function
Put it all together with proper logging and error handling:
import logging
import requests

logger = logging.getLogger(__name__)

# fetch_prices, clean_prices and load_to_db are this project's own extract/transform/load helpers
def safe_fetch_and_load(symbol: str, date: str) -> int:
    logger.info(f"Starting fetch for {symbol} on {date}")
    try:
        df = fetch_prices(symbol, date)
        if df.empty:
            logger.warning(f"No data for {symbol} on {date}")
            return 0
        df = clean_prices(df)
        load_to_db(df)
        logger.info(f"Loaded {len(df)} rows for {symbol}")
        return len(df)
    except requests.exceptions.Timeout:
        logger.error(f"Timeout fetching {symbol}; retrying on next run")
        raise
    except requests.exceptions.HTTPError as e:
        logger.error(f"HTTP {e.response.status_code} for {symbol}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error for {symbol}: {e}", exc_info=True)
        raise
raise after logging re-raises the exception so Airflow or the calling code knows the task failed. Catching and swallowing exceptions silently is worse than crashing.
Pandas: The Transformation Layer
Loading Data
import pandas as pd
df = pd.read_csv("data.csv")
df = pd.read_csv("data.csv", parse_dates=["timestamp"])
df = pd.read_parquet("data.parquet", columns=["symbol", "close_price"])
df = pd.read_sql("SELECT * FROM stock_prices WHERE symbol = %s", engine, params=("SCOM",))
# Large CSV: read 50,000 rows at a time. Only the processed chunks are kept, so this saves memory when process_chunk filters or aggregates
chunks = []
for chunk in pd.read_csv("large.csv", chunksize=50_000):
chunks.append(process_chunk(chunk))
df = pd.concat(chunks, ignore_index=True)
First-Look Inspection
Before transforming anything, inspect:
df.shape # (rows, columns)
df.dtypes # Column types
df.isnull().sum() # Missing values per column
df.duplicated().sum() # Duplicate rows
df.memory_usage(deep=True).sum() / 1024**2 # Memory in MB
df.describe() # Min, max, mean, std
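If you run those checks on every load, it can help to bundle them into one function whose result goes straight into the log. This is a sketch: the profile_frame name, the chosen fields and the sample frame are illustrative.

```python
import logging

import pandas as pd

logger = logging.getLogger(__name__)


def profile_frame(df: pd.DataFrame, name: str = "df") -> dict:
    """Collect the first-look stats in one dict and log them."""
    stats = {
        "rows": len(df),
        "columns": df.shape[1],
        "nulls": {col: int(n) for col, n in df.isnull().sum().items() if n},  # Only columns with gaps
        "duplicate_rows": int(df.duplicated().sum()),
        "memory_mb": round(df.memory_usage(deep=True).sum() / 1024**2, 2),
    }
    logger.info("Profile of %s: %s", name, stats)
    return stats


df = pd.DataFrame({
    "symbol": ["SCOM", "SCOM", None],
    "close_price": [14.5, 14.5, 43.0],
})
print(profile_frame(df, "prices"))
```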
Cleaning
# Rename first so every later step can use the canonical column names
df = df.rename(columns={"Ticker": "symbol", "Close Price": "close_price"})
# Type casting: use errors="coerce" to turn bad values into NaN instead of crashing
df["close_price"] = pd.to_numeric(df["close_price"], errors="coerce")
df["volume"] = pd.to_numeric(df["volume"], errors="coerce").fillna(0).astype(int)
df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
# Normalise strings
df["symbol"] = df["symbol"].str.strip().str.upper()
# Drop rows missing key fields, then de-duplicate on the natural key
df = df.dropna(subset=["symbol", "close_price"])
df = df.drop_duplicates(subset=["symbol", "timestamp"], keep="last")
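The pytest examples further down call a clean_prices() function. Here is one hypothetical way to package the cleaning steps above into it. The extra rule that drops non-positive prices is an assumption, chosen because those tests expect it. The raw column names and sample values are made up.

```python
import pandas as pd


def clean_prices(raw: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical cleaning step: rename, cast, normalise, drop bad rows, de-duplicate."""
    df = raw.rename(columns={"Ticker": "symbol", "Close Price": "close_price"})  # New frame; raw untouched
    df["close_price"] = pd.to_numeric(df["close_price"], errors="coerce")
    df["volume"] = pd.to_numeric(df["volume"], errors="coerce").fillna(0).astype(int)
    df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
    df["symbol"] = df["symbol"].str.strip().str.upper()
    df = df.dropna(subset=["symbol", "close_price"])
    df = df[df["close_price"] > 0]  # Assumed business rule: prices must be positive
    df = df.drop_duplicates(subset=["symbol", "timestamp"], keep="last")
    return df.reset_index(drop=True)


raw = pd.DataFrame({
    "Ticker": [" scom ", "eabl", "KCB", "kcb"],
    "Close Price": ["14.5", "n/a", "43.0", "-1"],
    "volume": ["1000", "500", None, "200"],
    "timestamp": ["2025-01-02"] * 4,
})
print(clean_prices(raw))
```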
Aggregation
# Single column
df.groupby("symbol")["close_price"].mean()
# Multiple aggregations in one pass
summary = df.groupby("symbol").agg(
    avg_price=("close_price", "mean"),
    total_volume=("volume", "sum"),
    high=("close_price", "max"),
    low=("close_price", "min"),
    count=("close_price", "count"),
).reset_index()
Datetime Operations
df["date"] = df["timestamp"].dt.date
df["hour"] = df["timestamp"].dt.hour
df["day_of_week"] = df["timestamp"].dt.day_name()
df["ts_nairobi"] = df["timestamp"].dt.tz_convert("Africa/Nairobi")
Time-Series Resampling
# Build 1-hour OHLCV bars from tick data
df.set_index("timestamp", inplace=True)
ohlcv = (
    df.groupby("symbol")
    .resample("1h")
    .agg(
        open=("close_price", "first"),
        high=("close_price", "max"),
        low=("close_price", "min"),
        close=("close_price", "last"),
        volume=("volume", "sum"),
    )
    .dropna()
    .reset_index()
)
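If you would rather not move timestamp into the index, pd.Grouper does the same per-symbol hourly bucketing inside an ordinary groupby. Here is a sketch on a handful of made-up ticks:

```python
import pandas as pd

ticks = pd.DataFrame({
    "symbol": ["SCOM", "SCOM", "SCOM", "EABL", "EABL"],
    "close_price": [14.0, 14.6, 14.3, 180.0, 181.5],
    "volume": [100, 200, 50, 10, 20],
    "timestamp": pd.to_datetime([
        "2025-01-02 09:05", "2025-01-02 09:40", "2025-01-02 10:10",
        "2025-01-02 09:15", "2025-01-02 09:50",
    ], utc=True),
})

ohlcv = (
    ticks.sort_values("timestamp")  # "first"/"last" follow row order, so sort by time first
    .groupby(["symbol", pd.Grouper(key="timestamp", freq="1h")])
    .agg(
        open=("close_price", "first"),
        high=("close_price", "max"),
        low=("close_price", "min"),
        close=("close_price", "last"),
        volume=("volume", "sum"),
    )
    .dropna()  # Drop any empty hourly bins
    .reset_index()
)
print(ohlcv)
```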
Adding Computed Columns
df["price_change"] = df["close_price"] - df["open_price"]
df["pct_change"] = df["price_change"] / df["open_price"] * 100
# Rolling average per symbol (using transform to keep row count)
df["ma_7"] = df.groupby("symbol")["close_price"].transform(
    lambda x: x.rolling(7).mean()
)
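This tiny sketch shows why the groupby matters. It uses a 2-row window instead of 7 and made-up prices. Each symbol's average starts fresh, so the first row of every symbol is NaN instead of borrowing a price from the previous symbol. The sketch assumes rows are sorted by symbol and timestamp first, since rolling windows follow row order.

```python
import pandas as pd

df = pd.DataFrame({
    "symbol": ["SCOM", "SCOM", "SCOM", "KCB", "KCB"],
    "close_price": [14.0, 15.0, 16.0, 40.0, 42.0],
    "timestamp": pd.date_range("2025-01-01", periods=5, freq="D", tz="UTC"),
})
df = df.sort_values(["symbol", "timestamp"])

# Per-symbol window: restarts at each symbol boundary
df["ma_2"] = df.groupby("symbol")["close_price"].transform(lambda x: x.rolling(2).mean())
# Naive window over the whole column: mixes the last KCB price into the first SCOM value
df["naive_ma_2"] = df["close_price"].rolling(2).mean()
print(df[["symbol", "close_price", "ma_2", "naive_ma_2"]])
```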
Performance
Two rules that matter:
Vectorise instead of applying row-by-row:
df["gain"] = df["close"] - df["open"] # Fast — vectorised
df["gain"] = df.apply(lambda r: r["close"] - r["open"], axis=1) # Slow — row loop
Downcast types to reduce memory:
df["close_price"] = pd.to_numeric(df["close_price"], downcast="float")
df["volume"] = pd.to_numeric(df["volume"], downcast="integer")
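On a DataFrame with 1M rows, downcasting a float64 column to float32 halves that column's memory (8 bytes per value down to 4, roughly 8MB to 4MB). float32 keeps only about 7 significant digits, which is usually fine for prices but worth checking. On a machine with limited RAM, type casting is often the difference between a pipeline that finishes and one that crashes.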
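This sketch measures the effect on one million synthetic rows and compares the bytes used by both columns before and after downcasting. The values are random, so only the dtype-driven ratio carries over to real data.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
df = pd.DataFrame({
    "close_price": rng.uniform(1, 500, 1_000_000),  # float64
    "volume": rng.integers(0, 50_000, 1_000_000),   # int64
})
before = df.memory_usage(index=False).sum()

df["close_price"] = pd.to_numeric(df["close_price"], downcast="float")  # -> float32
df["volume"] = pd.to_numeric(df["volume"], downcast="integer")          # -> smallest signed int that fits

after = df.memory_usage(index=False).sum()
print(df.dtypes.to_dict(), f"{before / 1024**2:.1f} MB -> {after / 1024**2:.1f} MB")
```

Here volume tops out near 50,000, which is too large for int16, so it lands in int32.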
Writing Data to PostgreSQL
Setup
from sqlalchemy import create_engine
import os
engine = create_engine(
    os.getenv("DATABASE_URL"),
    pool_pre_ping=True,  # Checks the connection before using it; avoids stale-connection errors
)
Append (small DataFrames)
df.to_sql("stock_prices", engine, if_exists="append", index=False)
df.to_sql("stock_prices", engine, if_exists="append", index=False, chunksize=500)
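With default settings, to_sql issues one INSERT per row, which is slow for large DataFrames. chunksize limits how many rows are sent per batch, and method="multi" packs each batch into a single multi-row INSERT. For large loads, consider the bulk patterns below.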
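Here is a small runnable comparison that uses an in-memory SQLite database as a stand-in for PostgreSQL, so absolute timings won't carry over. Both calls write the same rows; the second batches them into multi-row INSERT statements. The chunk size of 100 is an arbitrary choice that keeps each statement well under SQLite's bound-parameter limit.

```python
import time

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("sqlite://")  # In-memory stand-in; swap in your PostgreSQL URL
df = pd.DataFrame({
    "symbol": ["SCOM"] * 5_000,
    "close_price": [14.5] * 5_000,
    "volume": range(5_000),
})

for label, kwargs in [
    ("default", {}),
    ("multi", {"method": "multi", "chunksize": 100}),  # 100 rows x 3 columns = 300 params per INSERT
]:
    start = time.perf_counter()
    df.to_sql(f"prices_{label}", engine, if_exists="replace", index=False, **kwargs)
    print(f"{label}: {time.perf_counter() - start:.3f}s")
```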
Upsert (idempotent — safe to re-run)
For pipelines that re-process the same time range on retry, plain INSERT will create duplicates. Upsert prevents that:
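import pandas as pd
from sqlalchemy import MetaData
from sqlalchemy.dialects.postgresql import insert

meta = MetaData()
meta.reflect(bind=engine)
table = meta.tables["stock_prices"]

def upsert_dataframe(df: pd.DataFrame, table, engine, conflict_cols: list, batch_size: int = 1_000) -> None:
    if df.empty:
        return  # Nothing to write; an empty VALUES list is not valid SQL
    # Convert NaN/NaT to None so they are written as NULL
    rows = df.astype(object).where(df.notna(), None).to_dict(orient="records")
    with engine.begin() as conn:  # One transaction: commits on success, rolls back on error
        for start in range(0, len(rows), batch_size):  # Keep each statement a manageable size
            stmt = insert(table).values(rows[start:start + batch_size])
            update_cols = {
                c.name: stmt.excluded[c.name]
                for c in table.c
                if c.name not in conflict_cols
            }
            stmt = stmt.on_conflict_do_update(
                index_elements=conflict_cols,
                set_=update_cols,
            )
            conn.execute(stmt)

upsert_dataframe(df, table, engine, conflict_cols=["symbol", "timestamp"])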
Every pipeline I've built that writes to PostgreSQL uses this pattern. The conflict columns must match a UNIQUE constraint (or the primary key) on the table; PostgreSQL rejects an ON CONFLICT target that no unique index covers.
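You can check the re-run behaviour without a PostgreSQL server. This sketch swaps in SQLAlchemy's SQLite dialect, whose insert() also provides on_conflict_do_update() (it needs SQLite 3.24 or newer). It runs against an in-memory database and a made-up table whose composite primary key is the conflict target. Loading the same batch twice leaves two rows, and the second load overwrites the SCOM price.

```python
import pandas as pd
from sqlalchemy import Column, Float, MetaData, String, Table, create_engine, func, select
from sqlalchemy.dialects.sqlite import insert

engine = create_engine("sqlite://")
meta = MetaData()
prices = Table(
    "stock_prices", meta,
    Column("symbol", String, primary_key=True),
    Column("trade_date", String, primary_key=True),  # Composite key = the conflict target
    Column("close_price", Float),
)
meta.create_all(engine)


def upsert(df: pd.DataFrame) -> None:
    stmt = insert(prices).values(df.to_dict(orient="records"))
    stmt = stmt.on_conflict_do_update(
        index_elements=["symbol", "trade_date"],
        set_={"close_price": stmt.excluded.close_price},
    )
    with engine.begin() as conn:
        conn.execute(stmt)


batch = pd.DataFrame({
    "symbol": ["SCOM", "KCB"],
    "trade_date": ["2025-01-02", "2025-01-02"],
    "close_price": [14.5, 43.0],
})
upsert(batch)
upsert(batch.assign(close_price=[14.8, 43.0]))  # Re-run with a corrected SCOM price

with engine.connect() as conn:
    print(conn.execute(select(func.count()).select_from(prices)).scalar())  # 2
```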
Bulk COPY (fastest — 10–100x faster than to_sql)
For loading large DataFrames, PostgreSQL's COPY command is dramatically faster than individual inserts:
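import logging
from io import StringIO

import pandas as pd
import psycopg2
from psycopg2 import sql

logger = logging.getLogger(__name__)

def bulk_copy(df: pd.DataFrame, table_name: str, conn_string: str) -> None:
    buffer = StringIO()
    df.to_csv(buffer, index=False, header=False)  # Quotes any field that contains a comma
    buffer.seek(0)
    conn = psycopg2.connect(conn_string)
    try:
        # FORMAT csv understands that quoting (copy_from's text format would not);
        # listing the columns guards against column-order mismatches
        copy_stmt = sql.SQL("COPY {} ({}) FROM STDIN WITH (FORMAT csv)").format(
            sql.Identifier(table_name),
            sql.SQL(", ").join(map(sql.Identifier, df.columns)),
        ).as_string(conn)
        with conn, conn.cursor() as cur:  # Commits on success, rolls back on error
            cur.copy_expert(copy_stmt, buffer)
    finally:
        conn.close()
    logger.info(f"Bulk-copied {len(df)} rows into {table_name}")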
On the LedgerSync pipeline processing 1.5M rows, switching from to_sql to COPY reduced the load step from 4 minutes to 18 seconds.
Testing Pipelines with pytest
Untested pipelines break silently. The minimal test suite for a pipeline covers: does the transform produce the right columns, does it reject bad data, and does it load correctly to the database.
Project structure
tests/
├── conftest.py # Shared fixtures
├── test_extract.py
├── test_transform.py
└── test_load.py
Fixtures in conftest.py
# tests/conftest.py
import pytest
import pandas as pd
from sqlalchemy import create_engine

@pytest.fixture
def sample_prices():
    return pd.DataFrame({
        "symbol": ["SCOM", "EABL", "KCB"],
        "close_price": [14.5, 182.0, 43.0],
        "volume": [1000, 500, 2000],
        "timestamp": pd.to_datetime(["2025-01-01"] * 3, utc=True),
    })

@pytest.fixture
def db_engine():
    engine = create_engine("sqlite:///:memory:")
    yield engine
    engine.dispose()
Transform tests
# Assumed module path for the transform functions; adjust to your project layout
from scripts.transform import clean_prices, compute_pct_change

def test_clean_strips_symbols(sample_prices):
    sample_prices["symbol"] = [" scom ", "eabl", " KCB"]
    result = clean_prices(sample_prices)
    assert list(result["symbol"]) == ["SCOM", "EABL", "KCB"]

def test_clean_drops_negative_prices(sample_prices):
    sample_prices.loc[0, "close_price"] = -1.0
    result = clean_prices(sample_prices)
    assert len(result) == 2
    assert result["close_price"].min() > 0

def test_transform_adds_pct_change(sample_prices):
    result = compute_pct_change(sample_prices)
    assert "pct_change" in result.columns
    assert result["pct_change"].notna().all()
Parametrized tests
@pytest.mark.parametrize("symbol,expected_tier", [
    ("SCOM", "low"),
    ("EABL", "high"),
])
def test_tier_assignment(symbol, expected_tier):
    price = {"SCOM": 14.5, "EABL": 182.0}[symbol]
    assert assign_tier(price) == expected_tier
Mocking external API calls
Tests should not call real APIs — they're slow, rate-limited, and return different data every run:
from unittest.mock import patch, MagicMock

from scripts.extract import fetch_forex_rates

@patch("scripts.extract.requests.get")
def test_fetch_returns_correct_pair(mock_get):
    mock_get.return_value = MagicMock(
        status_code=200,
        json=lambda: {"rates": {"KES": 129.5}}
    )
    mock_get.return_value.raise_for_status = lambda: None
    result = fetch_forex_rates("USD", "KES")
    assert result["pair"] == "USD/KES"
    assert result["rate"] == 129.5
    mock_get.assert_called_once()
Database load test
def test_data_loads_to_db(sample_prices, db_engine):
    sample_prices.to_sql("prices", db_engine, if_exists="replace", index=False)
    result = pd.read_sql("SELECT COUNT(*) AS n FROM prices", db_engine)
    assert result["n"].iloc[0] == 3
Run tests:
pytest -v --tb=short tests/ # Verbose, short tracebacks
pytest -x tests/ # Stop on first failure
pytest -k "test_clean" tests/ # Run only tests matching name
Airflow: Orchestrating the Pipeline
Once you have working extract, transform, and load functions, wrapping them in an Airflow DAG is mostly adding decorators.
TaskFlow API (Airflow 2.x / 3.x)
import logging
import os
from datetime import datetime, timedelta

import pandas as pd
from airflow.decorators import dag, task
from sqlalchemy import create_engine

logger = logging.getLogger(__name__)

# fetch_nse_data and upsert_dataframe are this project's own helpers (see the sections above)

@dag(
    schedule="0 7 * * 1-5",  # Weekdays at 7am, in Airflow's default timezone (UTC unless configured)
    start_date=datetime(2025, 1, 1),
    catchup=False,
    max_active_runs=1,
    default_args={
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
        "email_on_failure": False,
    },
    tags=["nse", "stocks", "daily"],
)
def nse_daily_pipeline():
    @task
    def extract() -> list:
        df = fetch_nse_data()
        return df.to_dict(orient="records")  # Must be JSON-serializable for XCom

    @task
    def transform(raw: list) -> list:
        df = pd.DataFrame(raw)
        df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
        df["symbol"] = df["symbol"].str.strip().str.upper()
        df = df.dropna(subset=["symbol", "close_price"])
        return df.to_dict(orient="records")

    @task
    def load(clean: list) -> int:
        df = pd.DataFrame(clean)
        engine = create_engine(os.getenv("DATABASE_URL"))
        upsert_dataframe(df, ...)  # Use the upsert pattern from above
        return len(df)

    @task
    def notify(row_count: int):
        logger.info(f"Pipeline complete. Loaded {row_count} rows.")

    raw = extract()
    clean = transform(raw)
    count = load(clean)
    notify(count)

nse_daily_pipeline()
XCom (the mechanism Airflow uses to pass data between tasks) stores data in the metadata database. For DataFrames larger than about 1MB, don't pass them through XCom — write to PostgreSQL or Parquet in the extract task and read from there in the transform task.
Waiting for a Condition with a Sensor
import requests
from airflow.sensors.python import PythonSensor

wait_for_api = PythonSensor(
    task_id="wait_for_api_ready",
    python_callable=lambda: requests.get(
        "https://api.nse.co.ke/status", timeout=5
    ).status_code == 200,
    timeout=3600,
    poke_interval=60,
    mode="reschedule",  # Releases the worker slot while waiting; critical for production
)
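mode="reschedule" is important. In the default mode="poke", a sensor occupies its worker slot for the entire wait, up to the one-hour timeout here. On a small Airflow setup with 4 worker slots, 4 waiting sensors can stall the entire cluster, because no slot is left for the tasks they are waiting on.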
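The lambda above has a gap. If the status endpoint is unreachable, requests.get() raises instead of returning a status. An exception inside a sensor's callable fails that attempt (subject to the task's retries) rather than simply poking again. This sketch of a callable treats network errors as "not ready yet". It reuses the status URL from above, and the get parameter exists only so the function can be tested with a fake.

```python
from typing import Callable

import requests

STATUS_URL = "https://api.nse.co.ke/status"


def api_is_ready(get: Callable[..., requests.Response] = requests.get) -> bool:
    """Return True only on HTTP 200; treat connection errors and timeouts as 'not ready yet'."""
    try:
        return get(STATUS_URL, timeout=5).status_code == 200
    except requests.exceptions.RequestException:
        return False
```

You would then pass python_callable=api_is_ready to the PythonSensor in place of the lambda.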
Triggering with a Config (for backfills)
airflow dags trigger nse_daily_pipeline --conf '{"date": "2025-01-15"}'
@task
def extract(**context) -> list:
    conf = context.get("dag_run").conf or {}
    target_date = conf.get("date", str(datetime.today().date()))
    df = fetch_nse_data(date=target_date)
    return df.to_dict(orient="records")
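Because --conf accepts any JSON, it is worth validating the date before it reaches the API. This minimal sketch puts the fallback logic in a plain function so it can be unit-tested outside Airflow. resolve_target_date is a hypothetical helper, and the YYYY-MM-DD format is an assumption about what fetch_nse_data expects.

```python
from datetime import date, datetime
from typing import Any, Mapping, Optional


def resolve_target_date(conf: Optional[Mapping[str, Any]], today: date) -> str:
    """Return conf["date"] if present and well-formed (YYYY-MM-DD), else today's date."""
    raw = (conf or {}).get("date")
    if raw is None:
        return today.isoformat()
    try:
        return datetime.strptime(raw, "%Y-%m-%d").date().isoformat()
    except (TypeError, ValueError):
        raise ValueError(f"Invalid 'date' in DAG run conf: {raw!r}; expected YYYY-MM-DD") from None
```

Inside the task, that becomes target_date = resolve_target_date(context["dag_run"].conf, datetime.today().date()).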
Pipeline State Tracking
Incremental pipelines need to know where they left off. Track state in a PostgreSQL table rather than relying on Airflow's logical_date (called execution_date before Airflow 2.2):
from datetime import datetime
from typing import Optional

from sqlalchemy import text

CREATE_STATE_TABLE = """
CREATE TABLE IF NOT EXISTS pipeline_state (
    pipeline_name       VARCHAR(100) PRIMARY KEY,
    last_successful_run TIMESTAMPTZ,
    last_row_count      INT,
    updated_at          TIMESTAMPTZ DEFAULT NOW()
);
"""

def get_last_run(pipeline_name: str, engine) -> Optional[datetime]:
    with engine.connect() as conn:
        row = conn.execute(text(
            "SELECT last_successful_run FROM pipeline_state WHERE pipeline_name = :name"
        ), {"name": pipeline_name}).fetchone()
    return row[0] if row else None

def update_pipeline_state(pipeline_name: str, row_count: int, engine) -> None:
    with engine.begin() as conn:  # Commits when the block exits without an error
        conn.execute(text("""
            INSERT INTO pipeline_state (pipeline_name, last_successful_run, last_row_count)
            VALUES (:name, NOW(), :count)
            ON CONFLICT (pipeline_name) DO UPDATE SET
                last_successful_run = NOW(),
                last_row_count = :count,
                updated_at = NOW()
        """), {"name": pipeline_name, "count": row_count})
The extract task queries get_last_run() to find its watermark. The load task calls update_pipeline_state() only after a successful load. If the pipeline fails mid-run, the watermark doesn't advance and the next run reprocesses from the last safe checkpoint.
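The watermark rule can be exercised end to end against an in-memory SQLite database. CURRENT_TIMESTAMP stands in for PostgreSQL's NOW(), and the TIMESTAMPTZ column becomes TEXT. In this sketch, run_pipeline is a hypothetical wrapper: it reads the watermark, runs a load step, and advances the state only when that step succeeds.

```python
from typing import Callable, Optional

from sqlalchemy import create_engine, text

engine = create_engine("sqlite://")
with engine.begin() as conn:
    conn.execute(text("""
        CREATE TABLE pipeline_state (
            pipeline_name TEXT PRIMARY KEY,
            last_successful_run TEXT,
            last_row_count INTEGER
        )
    """))


def get_last_run(name: str) -> Optional[str]:
    with engine.connect() as conn:
        row = conn.execute(
            text("SELECT last_successful_run FROM pipeline_state WHERE pipeline_name = :name"),
            {"name": name},
        ).fetchone()
    return row[0] if row else None


def run_pipeline(name: str, load: Callable[[Optional[str]], int]) -> int:
    watermark = get_last_run(name)
    row_count = load(watermark)  # If this raises, the state update below never runs
    with engine.begin() as conn:
        conn.execute(text("""
            INSERT INTO pipeline_state (pipeline_name, last_successful_run, last_row_count)
            VALUES (:name, CURRENT_TIMESTAMP, :count)
            ON CONFLICT (pipeline_name) DO UPDATE SET
                last_successful_run = CURRENT_TIMESTAMP,
                last_row_count = excluded.last_row_count
        """), {"name": name, "count": row_count})
    return row_count
```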
When to Reach Beyond pandas
Polars for faster transforms on large in-memory DataFrames. The lazy API chains operations and executes them in a single pass:
import polars as pl
result = (
    pl.scan_parquet("data/*.parquet")
    .filter(pl.col("close_price") > 0)
    .with_columns(pl.col("symbol").str.to_uppercase())
    .group_by("symbol").agg(pl.col("close_price").mean().alias("avg_price"))
    .sort("avg_price", descending=True)
    .collect()
)
Polars can be 5–20x faster than pandas for the same transform on large datasets. It also uses all CPU cores by default.
Parquet over CSV once you're dealing with more than a few hundred thousand rows. It's compressed, columnar, and reads 10–50x faster. df.to_parquet("data.parquet", index=False) and pd.read_parquet("data.parquet", columns=["symbol", "price"]) are enough to get started.
Delta Lake when you need time travel, schema enforcement, and the ability to upsert into file-based storage:
from deltalake import DeltaTable, write_deltalake
write_deltalake("data/silver/prices", df, mode="append")
# Upsert into Delta table
dt = DeltaTable("data/silver/prices")
(
    dt.merge(new_df, "s.symbol = t.symbol AND s.date = t.date", "s", "t")
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)
The Pattern That Ties Everything Together
Every pipeline I build follows this shape:
extract() → returns raw records (list of dicts)
transform() → returns cleaned records (list of dicts or DataFrame)
load() → upserts into PostgreSQL, returns row count
notify() → logs completion
Each function has one responsibility. Each can be tested independently. Each logs what it does and raises on failure. Wrapped in an Airflow DAG, it runs on schedule, retries on failure, and leaves a log trail you can read days later.
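Stripped of Airflow, that shape fits in a few lines. This sketch wires four plain functions together. An in-memory list stands in for PostgreSQL (and load appends rather than upserts), and a hard-coded payload stands in for the API, so each piece can be swapped for the real implementations above. Every name here is illustrative.

```python
import logging
from typing import Any, Dict, List

import pandas as pd

logger = logging.getLogger(__name__)
FAKE_DB: List[Dict[str, Any]] = []  # Stand-in for the stock_prices table


def extract() -> List[Dict[str, Any]]:
    return [
        {"symbol": " scom ", "close_price": "14.5", "volume": 1000},
        {"symbol": "kcb", "close_price": "bad", "volume": 200},  # Unparseable price
    ]


def transform(raw: List[Dict[str, Any]]) -> pd.DataFrame:
    df = pd.DataFrame(raw)
    df["symbol"] = df["symbol"].str.strip().str.upper()
    df["close_price"] = pd.to_numeric(df["close_price"], errors="coerce")
    return df.dropna(subset=["symbol", "close_price"])


def load(df: pd.DataFrame) -> int:
    FAKE_DB.extend(df.to_dict(orient="records"))
    return len(df)


def notify(row_count: int) -> None:
    logger.info("Pipeline complete. Loaded %d rows.", row_count)


def run() -> int:
    count = load(transform(extract()))
    notify(count)
    return count
```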
Python doesn't make this easy by default — but these patterns do.
Follow me on dev.to for more data engineering content, or browse the project code at github.com/declerke.