Bala Priya C

5 DIY Python Decorators for Building Cleaner Data Pipelines

Data pipelines tend to accumulate the same boilerplate in many places. You'll likely have retry logic copy-pasted across functions, timing code scattered through the codebase, and validation checks buried inside business logic. Decorators are a clean way to pull that repeated code out.

They let you separate what a function does from how it behaves, and in data engineering, that's a useful separation to make.

This article walks through five practical decorators you can use to keep your data pipelines maintainable.

You can get the code on GitHub.


Prerequisites

Before we get started, you should be comfortable with:

  • Python functions as first-class objects — functions can be passed as arguments and returned from other functions
  • *args and **kwargs — used throughout to make decorators flexible enough to wrap any function signature
  • Basic exception handling — try/except/raise blocks appear in several decorators
  • functools module — specifically functools.wraps, explained in the next section

You don't need to have written a decorator for data pipelines before. That's what this article is for.


How Python Decorators Work

A decorator is a function that takes another function as input and returns a new function as output. The @ symbol is syntactic sugar to make it read cleanly.

Here's the simplest possible example:

def shout(func):
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        return str(result).upper()
    return wrapper

@shout
def greet(name):
    return f"hello, {name}"

print(greet("world"))
# HELLO, WORLD

When Python sees @shout above greet, it does this behind the scenes:

greet = shout(greet)

So calling greet("world") calls wrapper("world"), which calls the original greet, gets "hello, world", and returns "HELLO, WORLD". The original function is unchanged; it's just been wrapped.

The wrapper uses *args and **kwargs so it can forward any arguments without knowing them in advance. This is what makes decorators reusable across functions with different signatures.
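To see that forwarding in action, here is a small sketch that applies the same shout decorator to two functions with different signatures. The functions farewell and label are made up for illustration.

```python
def shout(func):
    def wrapper(*args, **kwargs):
        # Forward whatever was passed, positional or keyword, untouched
        result = func(*args, **kwargs)
        return str(result).upper()
    return wrapper

@shout
def farewell(name, punctuation="!"):
    return f"goodbye, {name}{punctuation}"

@shout
def label(order_id, *, status):
    return f"order {order_id} is {status}"

print(farewell("world"))
# GOODBYE, WORLD!
print(farewell("world", punctuation="?"))
# GOODBYE, WORLD?
print(label(42, status="completed"))
# ORDER 42 IS COMPLETED
```

The wrapper never names any of the parameters, which is why one decorator works for both functions.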

Note: without @functools.wraps(func), the decorated function loses its original name and docstring. greet.__name__ would return "wrapper" instead of "greet", which makes debugging and log output harder to follow. The shout example above omits it for brevity; every decorator in the sections that follow includes functools.wraps for this reason.
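A quick side-by-side makes the difference concrete. The names shout_plain, shout_wrapped, greet_plain, and greet_wrapped are invented for this comparison.

```python
import functools

def shout_plain(func):
    def wrapper(*args, **kwargs):
        return str(func(*args, **kwargs)).upper()
    return wrapper

def shout_wrapped(func):
    @functools.wraps(func)  # copies __name__, __doc__, and more onto wrapper
    def wrapper(*args, **kwargs):
        return str(func(*args, **kwargs)).upper()
    return wrapper

@shout_plain
def greet_plain(name):
    """Return a greeting."""
    return f"hello, {name}"

@shout_wrapped
def greet_wrapped(name):
    """Return a greeting."""
    return f"hello, {name}"

print(greet_plain.__name__, greet_plain.__doc__)
# wrapper None
print(greet_wrapped.__name__, greet_wrapped.__doc__)
# greet_wrapped Return a greeting.
```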


1. @retry — Handle Transient Failures

External calls fail for transient reasons: a database connection drops, an API call times out, or a file is briefly locked by another process. The common fix is wrapping every external call in a for loop with a try/except. But copy-pasting that pattern across your codebase gets messy fast.

Here's a reusable @retry decorator:

import time
import functools

def retry(max_attempts=3, delay=2, exceptions=(Exception,)):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts:
                        raise
                    print(f"Attempt {attempt} failed: {e}. Retrying in {delay}s...")
                    time.sleep(delay)
        return wrapper
    return decorator

Here we simulate a flaky SQLite read that fails the first two times before succeeding on the third attempt:

import sqlite3

# Setup: create an in-memory DB with some orders
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id, customer_id, amount, status)")
conn.executemany("INSERT INTO orders VALUES (?, ?, ?, ?)", [
    (1, 101, 59.99,  "completed"),
    (2, 102, 120.00, "completed"),
    (3, 101, 34.50,  "refunded"),
    (4, 103, 89.00,  "completed"),
])
conn.commit()

# Simulate a connection that fails twice before succeeding
attempt_count = 0

@retry(max_attempts=3, delay=1, exceptions=(sqlite3.OperationalError,))
def fetch_completed_orders(conn):
    global attempt_count
    attempt_count += 1
    if attempt_count < 3:
        raise sqlite3.OperationalError("database is locked")
    cursor = conn.execute("SELECT * FROM orders WHERE status = 'completed'")
    return cursor.fetchall()


rows = fetch_completed_orders(conn)

print(rows)

Output:

Attempt 1 failed: database is locked. Retrying in 1s...
Attempt 2 failed: database is locked. Retrying in 1s...
[(1, 101, 59.99, 'completed'), (2, 102, 120.0, 'completed'), (4, 103, 89.0, 'completed')]

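We catch only sqlite3.OperationalError, which covers locks and similar operational problems. A sqlite3.ProgrammingError, raised for API misuse such as supplying the wrong number of bound parameters, is not worth retrying; it'll fail the same way every time. Keep in mind that sqlite3 also reports SQL syntax errors and missing tables as OperationalError, so the attempt limit is what stops a permanently broken query from looping.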

There are three nested functions here because retry takes configuration arguments. retry(...) returns decorator, which is the actual decorator. decorator receives the function and returns wrapper.
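The three layers are easier to see without the @ syntax. The sketch below repeats a trimmed copy of retry (without the print) so it runs on its own, then applies it by hand. flaky_read and the calls counter are hypothetical stand-ins, and delay=0 just keeps the demo fast.

```python
import functools
import time

def retry(max_attempts=3, delay=2, exceptions=(Exception,)):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts:
                        raise
                    time.sleep(delay)
        return wrapper
    return decorator

calls = {"count": 0}

def flaky_read():
    # Hypothetical stand-in: fails twice, then succeeds
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporarily unavailable")
    return "ok"

# Layer 1: calling retry(...) with configuration returns the real decorator
configured = retry(max_attempts=3, delay=0, exceptions=(ConnectionError,))

# Layer 2: the decorator takes the function and returns wrapper
flaky_read = configured(flaky_read)

# Layer 3: calling flaky_read() now runs wrapper, which loops over attempts
print(flaky_read(), calls["count"])
# ok 3
```

Writing @retry(max_attempts=3, delay=0, exceptions=(ConnectionError,)) above the function does the first two steps in one line.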


2. @timer — Measure Execution Time

When a pipeline run is slower than expected, the naive approach is sprinkling time.time() calls through your code. It works, but timing logic ends up tangled with business logic everywhere, and you have to clean it all up when you're done. Let's write a @timer decorator:

import time
import functools

def timer(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"[TIMER] {func.__name__} completed in {elapsed:.4f}s")
        return result
    return wrapper

Here we apply it to a transformation step that computes customer lifetime value and assigns spend tiers across a list of orders:

orders = [
    {"order_id": 1, "customer_id": 101, "amount": 59.99,  "status": "completed"},
    {"order_id": 2, "customer_id": 102, "amount": 120.00, "status": "completed"},
    {"order_id": 3, "customer_id": 101, "amount": 34.50,  "status": "refunded"},
    {"order_id": 4, "customer_id": 103, "amount": 89.00,  "status": "completed"},
    {"order_id": 5, "customer_id": 102, "amount": 45.00,  "status": "completed"},
]

@timer
def transform_orders(orders: list[dict]) -> list[dict]:
    # Compute lifetime value per customer
    ltv = {}
    for o in orders:
        if o["status"] == "completed":
            ltv[o["customer_id"]] = ltv.get(o["customer_id"], 0) + o["amount"]

    # Enrich each order with LTV and spend tier
    def spend_tier(value):
        if value >= 200: return "high"
        if value >= 100: return "mid"
        return "low"

    return [
        {**o, "customer_ltv": ltv.get(o["customer_id"], 0),
              "spend_tier": spend_tier(ltv.get(o["customer_id"], 0))}
        for o in orders
    ]


result = transform_orders(orders)

for row in result:
    print(row)

Output:

[TIMER] transform_orders completed in 0.0001s
{'order_id': 1, 'customer_id': 101, 'amount': 59.99, 'status': 'completed', 'customer_ltv': 59.99, 'spend_tier': 'low'}
{'order_id': 2, 'customer_id': 102, 'amount': 120.0, 'status': 'completed', 'customer_ltv': 165.0, 'spend_tier': 'mid'}
{'order_id': 3, 'customer_id': 101, 'amount': 34.5, 'status': 'refunded', 'customer_ltv': 59.99, 'spend_tier': 'low'}
{'order_id': 4, 'customer_id': 103, 'amount': 89.0, 'status': 'completed', 'customer_ltv': 89.0, 'spend_tier': 'low'}
{'order_id': 5, 'customer_id': 102, 'amount': 45.0, 'status': 'completed', 'customer_ltv': 165.0, 'spend_tier': 'mid'}

Inside wrapper, we store the result before printing the time and return it after, so the decorator is invisible to whatever called the function. We use time.perf_counter() instead of time.time() because it has higher resolution and isn't affected by system clock adjustments.
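One limitation of the version above: if the wrapped function raises, the timing line never prints. A possible variant, sketched here under the made-up name timer_always, moves the report into a finally block so failed calls are timed too. parse_amount is a hypothetical step used only for the demo.

```python
import functools
import time

def timer_always(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            # Runs whether func returned normally or raised
            elapsed = time.perf_counter() - start
            print(f"[TIMER] {func.__name__} finished in {elapsed:.4f}s")
    return wrapper

@timer_always
def parse_amount(raw):
    return round(float(raw), 2)

print(parse_amount("59.99"))

try:
    parse_amount("not a number")
except ValueError as e:
    # The timing line was printed before the exception reached us
    print(f"still raised: {e}")
```

The exception still propagates to the caller; the finally block only adds the timing line on the way out.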


3. @validate_schema — Catch Bad Data Early

Data from upstream sources isn't always consistently shaped. A bug in a producer might start omitting a field. A schema change might rename a key. If a required field is missing, you want to know at the point the record enters your pipeline — not several transformations later when something raises a KeyError with no indication of which record caused it.

import functools

def validate_schema(required_keys):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(data, *args, **kwargs):
            if not isinstance(data, dict):
                raise TypeError(f"Expected dict, got {type(data).__name__}")
            missing = required_keys - data.keys()
            if missing:
                raise ValueError(f"Missing required keys: {missing}")
            return func(data, *args, **kwargs)
        return wrapper
    return decorator

We apply it to normalize_order, which standardizes each raw order record before it's passed to the transformation step:

@validate_schema(required_keys={"order_id", "customer_id", "amount", "status"})
def normalize_order(order: dict) -> dict:
    return {
        "order_id": order["order_id"],
        "customer_id": order["customer_id"],
        "amount": round(float(order["amount"]), 2),
        "status": order["status"].lower().strip(),
        "note": order.get("note", ""),   # optional — fine to be absent
    }


# Valid record — passes through
normalize_order({"order_id": 1, "customer_id": 101, "amount": "59.99", "status": "Completed"})
# {"order_id": 1, "customer_id": 101, "amount": 59.99, "status": "completed", "note": ""}

# Missing amount — caught immediately
normalize_order({"order_id": 2, "customer_id": 102, "status": "Completed"})
# ValueError: Missing required keys: {'amount'}

# Wrong type entirely — also caught
normalize_order([1, 101, 59.99, "completed"])
# TypeError: Expected dict, got list

Two checks run before the function body executes. First, we confirm the input is a dict. Second, we use set subtraction to find any missing required keys and raise a ValueError that names them exactly.

Notice that the note field is not in required_keys. It's optional, so the function reads it with .get(). The decorator only enforces what's truly required.
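In a batch job you often don't want one bad record to stop the whole run. One way to use the decorator, sketched below, is to catch its errors at the call site and set bad records aside. The split_valid helper, the shortened normalize function, and the sample records are illustrative assumptions; validate_schema is repeated so the block runs standalone.

```python
import functools

def validate_schema(required_keys):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(data, *args, **kwargs):
            if not isinstance(data, dict):
                raise TypeError(f"Expected dict, got {type(data).__name__}")
            missing = required_keys - data.keys()
            if missing:
                raise ValueError(f"Missing required keys: {missing}")
            return func(data, *args, **kwargs)
        return wrapper
    return decorator

@validate_schema(required_keys={"order_id", "amount"})
def normalize(order):
    return {"order_id": order["order_id"], "amount": round(float(order["amount"]), 2)}

def split_valid(records):
    # Hypothetical helper: separate clean rows from rejected ones
    good, rejected = [], []
    for record in records:
        try:
            good.append(normalize(record))
        except (TypeError, ValueError) as e:
            rejected.append((record, str(e)))
    return good, rejected

raw = [
    {"order_id": 1, "amount": "59.99"},
    {"order_id": 2},                      # missing amount
    [3, "12.00"],                         # not a dict
]

good, rejected = split_valid(raw)
print(good)
# [{'order_id': 1, 'amount': 59.99}]
for record, reason in rejected:
    print(reason)
# Missing required keys: {'amount'}
# Expected dict, got list
```

Because the error message names the missing keys, the rejected list doubles as a report you can hand back to the upstream producer.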


4. @cache_result — Skip Redundant Computation

Some pipeline steps are expensive and return the same result for the same inputs. Lookup tables loaded from a database, CSVs read from disk, and slow aggregations are typical examples. If you call them multiple times with the same arguments, there's no reason to recompute the result.

Python has functools.lru_cache built in, but it requires hashable arguments, so passing dicts or lists (both common in data pipelines) raises a TypeError. This decorator handles those cases by serializing the arguments and hashing the result:

import functools
import hashlib
import json

def cache_result(func):
    cache = {}

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        key = hashlib.md5(
            json.dumps((args, kwargs), sort_keys=True, default=str).encode()
        ).hexdigest()

        if key not in cache:
            cache[key] = func(*args, **kwargs)
            print(f"[CACHE] Computed result for {func.__name__}")
        else:
            print(f"[CACHE] Cache hit for {func.__name__}")

        return cache[key]

    wrapper.cache_clear = cache.clear
    return wrapper

We apply it to load_discount_codes, which reads a reference table from our SQLite database. This table is queried repeatedly during enrichment but rarely changes within a run:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE discount_codes (code TEXT, discount_pct REAL, active INTEGER)")
conn.executemany("INSERT INTO discount_codes VALUES (?, ?, ?)", [
    ("SAVE10", 10.0, 1),
    ("SAVE20", 20.0, 1),
    ("OLD5",    5.0, 0),
])
conn.commit()

@cache_result
def load_discount_codes(active_only: bool = True) -> dict:
    query = "SELECT code, discount_pct FROM discount_codes"
    if active_only:
        query += " WHERE active = 1"
    rows = conn.execute(query).fetchall()
    return {code: pct for code, pct in rows}


# First call hits the database
codes = load_discount_codes(active_only=True)

# Second call is instant
codes = load_discount_codes(active_only=True)

print(codes)

# Different argument = separate cache entry
all_codes = load_discount_codes(active_only=False)

Output:

[CACHE] Computed result for load_discount_codes
[CACHE] Cache hit for load_discount_codes
{'SAVE10': 10.0, 'SAVE20': 20.0}
[CACHE] Computed result for load_discount_codes

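The cache key is generated by serializing args and kwargs to JSON, then MD5-hashing the result. sort_keys=True ensures consistent output regardless of keyword argument order; default=str falls back to str() for types that JSON can't serialize natively, so it only gives meaningful keys for objects whose string form reflects their contents. The cache dict lives in the closure and persists for the program's lifetime. Because a cache hit returns the same object that was stored, callers should avoid mutating the result in place. cache_clear is attached to the wrapper as an escape hatch if you need to force a fresh load.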
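To get a feel for how the key behaves, the sketch below pulls the same recipe into a standalone helper. make_key is a hypothetical name used only here, and the region argument is invented for the demo.

```python
import hashlib
import json

def make_key(*args, **kwargs):
    # Same recipe as in cache_result: JSON-serialize, then hash
    payload = json.dumps((args, kwargs), sort_keys=True, default=str)
    return hashlib.md5(payload.encode()).hexdigest()

# Keyword order does not matter because sort_keys=True sorts dict keys
a = make_key(region="eu", active_only=True)
b = make_key(active_only=True, region="eu")
print(a == b)
# True

# Unhashable arguments (lists, dicts) are fine because they are serialized
c = make_key([1, 2, 3], {"status": "completed"})
d = make_key([1, 2, 3], {"status": "refunded"})
print(c == d)
# False

# Caveat: positional and keyword forms of the same call produce different keys
print(make_key(True) == make_key(active_only=True))
# False
```

The last case means load_discount_codes(True) and load_discount_codes(active_only=True) would each be computed once and cached separately. That costs a redundant computation but never returns a wrong result.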


5. @log_step — Build a Trail

Without structured logging, debugging a failed pipeline run means guesswork. You have no record of what ran, what succeeded, or where things went sideways.

import functools
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s — %(message)s")
logger = logging.getLogger(__name__)

def log_step(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        logger.info(f"START  {func.__name__}")
        try:
            result = func(*args, **kwargs)
            logger.info(f"END    {func.__name__} — OK")
            return result
        except Exception as e:
            logger.error(f"FAIL   {func.__name__} — {type(e).__name__}: {e}")
            raise
    return wrapper

We apply it across the main stages of our pipeline:

@log_step
def fetch_completed_orders(conn) -> list[tuple]:
    cursor = conn.execute("SELECT * FROM orders WHERE status = 'completed'")
    return cursor.fetchall()

@log_step
def transform_orders(orders: list[dict]) -> list[dict]:
    ...

@log_step
def write_results(conn, rows: list[dict]) -> int:
    conn.execute("CREATE TABLE IF NOT EXISTS orders_transformed "
                 "(order_id, customer_id, amount, status, customer_ltv, spend_tier)")
    conn.executemany(
        "INSERT INTO orders_transformed VALUES (:order_id, :customer_id, :amount, "
        ":status, :customer_ltv, :spend_tier)",
        rows,
    )
    conn.commit()
    return len(rows)

After logging the error, we use a bare raise. This re-raises the original exception with its traceback intact, so the pipeline still fails. Without it, the decorator would silently swallow every error and execution would continue. Always re-raise in a logging decorator.
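Here is a small sketch of that behavior from the caller's side. It repeats log_step with a simplified message format so it runs on its own; parse_amounts, the logger name, and the pipeline_failed flag are hypothetical.

```python
import functools
import logging

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
logger = logging.getLogger("pipeline_demo")

def log_step(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        logger.info(f"START  {func.__name__}")
        try:
            result = func(*args, **kwargs)
            logger.info(f"END    {func.__name__} OK")
            return result
        except Exception as e:
            logger.error(f"FAIL   {func.__name__} {type(e).__name__}: {e}")
            raise  # re-raise so the caller still sees the failure
    return wrapper

@log_step
def parse_amounts(raw_amounts):
    # Hypothetical step: fails on the first non-numeric value
    return [float(x) for x in raw_amounts]

try:
    parse_amounts(["59.99", "oops"])
    pipeline_failed = False
except ValueError:
    pipeline_failed = True

print(pipeline_failed)
# True
```

The FAIL line lands in the log and the ValueError still reaches the except block, so an orchestrator or calling script can react to it.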


Wrapping Up

Decorators let you write this kind of supporting logic once and apply it anywhere. The five patterns here (retry, timer, schema validation, caching, and logging) cover the most common cross-cutting concerns in data pipelines.

A good starting point is @log_step and @retry. They address the most immediate pain points in data pipelines with minimal setup. Add the others as your pipeline grows and the needs become clearer.
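Decorators also stack. As a rough sketch of using @log_step and @retry together, the block below defines trimmed-down versions of both (no delay, no print) so it runs on its own; load_batch and its attempts counter are hypothetical. Order matters: the decorator closest to the function is applied first, so here retry sits inside log_step, and a call that succeeds on a later attempt is logged as a single START/END pair.

```python
import functools
import logging

logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger("stacking_demo")

def retry(max_attempts=3, exceptions=(Exception,)):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts:
                        raise
        return wrapper
    return decorator

def log_step(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        logger.info(f"START  {func.__name__}")
        try:
            result = func(*args, **kwargs)
        except Exception as e:
            logger.error(f"FAIL   {func.__name__} {type(e).__name__}: {e}")
            raise
        logger.info(f"END    {func.__name__} OK")
        return result
    return wrapper

attempts = {"count": 0}

@log_step                                               # outer: sees one call overall
@retry(max_attempts=3, exceptions=(ConnectionError,))   # inner: handles the attempts
def load_batch():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("temporarily unavailable")
    return ["row-1", "row-2"]

rows = load_batch()
print(rows, attempts["count"])
# ['row-1', 'row-2'] 3
```

Swapping the two decorators would log every individual attempt instead, which may be what you want while debugging a flaky source.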

Happy coding!
