Anna lilith

Building Production-Ready Data Pipelines with Python

Data pipelines that work in development often fail in production. This guide teaches you to build resilient, observable data pipelines that handle failures gracefully and scale with your data volume.

What You'll Build

A production-grade data pipeline orchestrated by Apache Airflow, with built-in error handling, retries with exponential backoff, Prometheus metrics for monitoring, and automated Slack alerting. The patterns shown here are common in data engineering teams that run pipelines at scale.

Why Production Pipelines Matter

Fragile pipelines cause data delays, broken dashboards, and lost revenue. Production-ready pipelines provide:

  • Automatic retries — transient failures don't stop your pipeline
  • Observability — know exactly where and why something failed
  • Scalability — handle 100 records or 100 million
  • Data quality — catch bad data before it reaches your warehouse

Full Tutorial

Step 1: Project Structure

data-pipeline/
├── dags/
│   ├── __init__.py
│   ├── etl_dag.py
│   └── data_quality_dag.py
├── plugins/
│   ├── __init__.py
│   ├── custom_operators.py
│   ├── error_handlers.py
│   ├── monitoring.py
│   └── hooks.py
├── config/
│   └── pipelines.yaml
├── tests/
│   └── test_dags.py
└── docker-compose.yml

Step 2: Airflow DAG Definition

# dags/etl_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.utils.task_group import TaskGroup
import pandas as pd
import logging

logger = logging.getLogger(__name__)

default_args = {
    "owner": "data-engineering",
    "depends_on_past": False,
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(minutes=30),
}

def extract_data(**context):
    """Extract data from source API."""
    import requests

    url = "https://api.source.com/data"
    response = requests.get(url, timeout=30)
    response.raise_for_status()

    data = response.json()
    df = pd.DataFrame(data["records"])

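    # Store in XCom for the next task. XCom lives in the metadata database, so this
    # suits small payloads only; for large extracts, write to a file or object store
    # and push the path instead.
    context["ti"].xcom_push(key="raw_data", value=df.to_json())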

    logger.info(f"Extracted {len(df)} records")
    return len(df)

def transform_data(**context):
    """Apply transformations and data quality checks."""
    ti = context["ti"]
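    from io import StringIO  # pandas 2.1+ deprecates passing raw JSON strings

    # Tasks inside a TaskGroup get the group ID as a prefix by default.
    raw_json = ti.xcom_pull(task_ids="extract_transform_load.extract", key="raw_data")
    df = pd.read_json(StringIO(raw_json))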

    # Remove duplicates
    df = df.drop_duplicates(subset=["id"])

    # Handle missing values
    df["amount"] = df["amount"].fillna(0)
    df["timestamp"] = pd.to_datetime(df["timestamp"])

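    # Validate data quality with explicit exceptions; assert statements are
    # skipped when Python runs with -O.
    if df.empty:
        raise ValueError("Empty dataset after transformation")
    if df["amount"].min() < 0:
        raise ValueError("Negative amounts found")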

    ti.xcom_push(key="transformed_data", value=df.to_json())
    logger.info(f"Transformed to {len(df)} clean records")
    return len(df)

def load_data(**context):
    """Load data into target database."""
    ti = context["ti"]
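    from io import StringIO  # pandas 2.1+ deprecates passing raw JSON strings

    # Tasks inside a TaskGroup get the group ID as a prefix by default.
    transformed_json = ti.xcom_pull(
        task_ids="extract_transform_load.transform", key="transformed_data"
    )
    df = pd.read_json(StringIO(transformed_json))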

    pg_hook = PostgresHook(postgres_conn_id="data_warehouse")
    engine = pg_hook.get_sqlalchemy_engine()

    df.to_sql(
        "processed_data",
        engine,
        if_exists="append",
        index=False,
        chunksize=1000
    )

    logger.info(f"Loaded {len(df)} records to warehouse")

with DAG(
    "etl_pipeline",
    default_args=default_args,
    description="Production ETL pipeline",
    schedule="0 */6 * * *",  # Every 6 hours (schedule_interval is deprecated since Airflow 2.4)
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["production", "etl"],
) as dag:

    with TaskGroup("extract_transform_load") as etl_group:
        extract = PythonOperator(
            task_id="extract",
            python_callable=extract_data,
        )

        transform = PythonOperator(
            task_id="transform",
            python_callable=transform_data,
        )

        load = PythonOperator(
            task_id="load",
            python_callable=load_data,
        )

        extract >> transform >> load

    notify = BashOperator(
        task_id="notify_success",
        bash_command='echo "Pipeline completed successfully at $(date)"',
    )

    etl_group >> notify
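To make the retry settings in default_args concrete, here is a minimal sketch of the waits they imply. It assumes a plain doubling schedule capped at max_retry_delay; Airflow's real computation also adds jitter, so actual waits will differ somewhat. The function name backoff_schedule is hypothetical and not part of Airflow.

```python
from datetime import timedelta


def backoff_schedule(retries: int, base: timedelta, cap: timedelta) -> list[timedelta]:
    """Return the wait before each retry: base * 2**n, capped at `cap`.

    Simplified illustration only; Airflow adds jitter on top of this shape.
    """
    return [min(base * (2 ** attempt), cap) for attempt in range(retries)]


# Values taken from default_args: 3 retries, 5-minute base, 30-minute cap.
schedule = backoff_schedule(3, timedelta(minutes=5), timedelta(minutes=30))
for attempt, delay in enumerate(schedule, start=1):
    print(f"retry {attempt}: wait about {delay}")
```

With these settings the cap is never reached; it would only start to apply from a fourth retry onward.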

Step 3: Custom Operator for Complex Tasks

# plugins/custom_operators.py
from airflow.models import BaseOperator
from airflow.exceptions import AirflowException


class DataQualityOperator(BaseOperator):
    """Run SQL checks; a check fails if it returns no row or a first column of 0."""

    # apply_defaults is deprecated in Airflow 2; BaseOperator applies default_args itself.
    def __init__(self, sql_checks: list[str], redshift_conn_id: str, **kwargs):
        super().__init__(**kwargs)
        self.sql_checks = sql_checks
        self.redshift_conn_id = redshift_conn_id

    def execute(self, context):
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
        failed_checks = []

        for check in self.sql_checks:
            result = hook.get_first(check)
            if not result or result[0] == 0:
                failed_checks.append(check)
                self.log.error(f"Data quality check failed: {check}")

        if failed_checks:
            raise AirflowException(
                f"Data quality checks failed: {len(failed_checks)} checks failed"
            )

        self.log.info("All data quality checks passed")

class FileProcessorOperator(BaseOperator):
    """Concatenate every CSV matching input_pattern into one output file."""

    def __init__(self, input_pattern: str, output_path: str, **kwargs):
        super().__init__(**kwargs)
        self.input_pattern = input_pattern
        self.output_path = output_path

    def execute(self, context):
        import glob
        import pandas as pd

        files = glob.glob(self.input_pattern)
        self.log.info(f"Processing {len(files)} files")

        dfs = []
        for file in files:
            try:
                df = pd.read_csv(file)
                dfs.append(df)
            except Exception as e:
                self.log.warning(f"Failed to process {file}: {e}")

        if dfs:
            combined = pd.concat(dfs, ignore_index=True)
            combined.to_csv(self.output_path, index=False)
            self.log.info(f"Combined {len(dfs)} files into {self.output_path}")
        else:
            raise AirflowException("No files processed successfully")
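DataQualityOperator treats a check as failed when the query returns no row or a first column equal to 0, so each check should be written to return a non-zero value when the data is healthy. The sketch below reproduces that rule outside Airflow. It uses an in-memory SQLite database as a stand-in for the warehouse, and run_checks is a hypothetical helper, not part of the operator.

```python
import sqlite3


def run_checks(conn: sqlite3.Connection, sql_checks: list[str]) -> list[str]:
    """Return the checks that failed: no row, or a first column equal to 0."""
    failed = []
    for check in sql_checks:
        row = conn.execute(check).fetchone()
        if not row or row[0] == 0:
            failed.append(check)
    return failed


# In-memory table standing in for processed_data (assumed columns: id, amount).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed_data (id INTEGER, amount REAL)")
conn.executemany("INSERT INTO processed_data VALUES (?, ?)", [(1, 10.0), (2, -5.0)])

checks = [
    # Passes: the row count is non-zero.
    "SELECT COUNT(*) FROM processed_data",
    # Fails: the comparison evaluates to 0 because one amount is negative.
    "SELECT COUNT(*) = 0 FROM processed_data WHERE amount < 0",
]
failed = run_checks(conn, checks)
print(f"{len(failed)} of {len(checks)} checks failed")
```

Phrasing a "no bad rows" rule as a comparison (COUNT(*) = 0) keeps it compatible with the operator's convention that 0 means failure.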

Step 4: Error Handling Patterns

# plugins/error_handlers.py
from airflow.models import Variable
import requests
import logging
import time  # needed for the "ts" field in the Slack payload

logger = logging.getLogger(__name__)

class AlertHandler:
    def __init__(self):
        self.slack_webhook = Variable.get("slack_webhook_url", default_var="")
        self.pagerduty_key = Variable.get("pagerduty_key", default_var="")

    def send_slack_alert(self, message: str, severity: str = "warning"):
        if not self.slack_webhook:
            return

        color = {"info": "#36a64f", "warning": "#ff9900", "critical": "#ff0000"}
        payload = {
            "attachments": [{
                "color": color.get(severity, "#ff9900"),
                "title": f"Pipeline Alert - {severity.upper()}",
                "text": message,
                "ts": time.time()
            }]
        }
        try:
            requests.post(self.slack_webhook, json=payload, timeout=10)
        except Exception as e:
            logger.error(f"Failed to send Slack alert: {e}")

    def on_retry_callback(self, context):
        self.send_slack_alert(
            f"Task {context['task_instance'].task_id} retrying "
            f"(attempt {context['ti'].try_number})",
            severity="info"
        )

    def on_failure_callback(self, context):
        self.send_slack_alert(
            f"Task {context['task_instance'].task_id} failed "
            f"after {context['ti'].try_number} attempts",
            severity="critical"
        )
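The handler only takes effect once its callbacks are attached to tasks. One way to do that, sketched below, is through the on_retry_callback and on_failure_callback keys of default_args. To keep the example runnable without Airflow or Slack, RecordingAlertHandler is a hypothetical stand-in that stores messages in a list, and the context dict is simulated with only the field the callbacks read.

```python
from datetime import timedelta
from types import SimpleNamespace


class RecordingAlertHandler:
    """Hypothetical stand-in for AlertHandler: records alerts instead of posting them."""

    def __init__(self):
        self.sent = []

    def send_slack_alert(self, message: str, severity: str = "warning"):
        self.sent.append((severity, message))

    def on_retry_callback(self, context):
        ti = context["ti"]
        self.send_slack_alert(
            f"Task {ti.task_id} retrying (attempt {ti.try_number})", severity="info"
        )

    def on_failure_callback(self, context):
        ti = context["ti"]
        self.send_slack_alert(
            f"Task {ti.task_id} failed after {ti.try_number} attempts", severity="critical"
        )


handler = RecordingAlertHandler()

# Keys in default_args are passed to every task created in the DAG.
default_args = {
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "on_retry_callback": handler.on_retry_callback,
    "on_failure_callback": handler.on_failure_callback,
}

# Simulated callback context (assumption: only "ti" is needed by these callbacks).
fake_context = {"ti": SimpleNamespace(task_id="extract", try_number=2)}
default_args["on_retry_callback"](fake_context)
default_args["on_failure_callback"](fake_context)
print(handler.sent)
```

In a real DAG file, keep in mind that AlertHandler reads Airflow Variables in its constructor, so creating it at module level triggers those lookups every time the scheduler parses the file.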

Step 5: Monitoring Configuration

# plugins/monitoring.py
from prometheus_client import Counter, Histogram, Gauge
import time

TASK_DURATION = Histogram(
    "etl_task_duration_seconds",
    "Time spent on ETL tasks",
    ["dag_id", "task_id"],
    buckets=[10, 30, 60, 120, 300, 600]
)

TASK_STATUS = Counter(
    "etl_task_status_total",
    "ETL task completion status",
    ["dag_id", "task_id", "status"]
)

RECORDS_PROCESSED = Counter(
    "etl_records_processed_total",
    "Total records processed",
    ["dag_id", "source"]
)

class PipelineMonitor:
    @staticmethod
    def record_task_start(dag_id: str, task_id: str):
        TASK_STATUS.labels(dag_id=dag_id, task_id=task_id, status="started").inc()

    @staticmethod
    def record_task_complete(dag_id: str, task_id: str, duration: float):
        TASK_DURATION.labels(dag_id=dag_id, task_id=task_id).observe(duration)
        TASK_STATUS.labels(dag_id=dag_id, task_id=task_id, status="completed").inc()

    @staticmethod
    def record_task_failure(dag_id: str, task_id: str):
        TASK_STATUS.labels(dag_id=dag_id, task_id=task_id, status="failed").inc()

    @staticmethod
    def record_records(dag_id: str, source: str, count: int):
        RECORDS_PROCESSED.labels(dag_id=dag_id, source=source).inc(count)
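The monitor methods still have to be called from each task. A context manager is one way to do that without repeating the timing code. The sketch below assumes the same method names as PipelineMonitor; StubMonitor and monitored are hypothetical names used so the example runs without prometheus_client installed.

```python
import time
from contextlib import contextmanager


class StubMonitor:
    """Hypothetical stand-in exposing the same method names as PipelineMonitor."""

    def __init__(self):
        self.events = []

    def record_task_start(self, dag_id, task_id):
        self.events.append(("started", dag_id, task_id))

    def record_task_complete(self, dag_id, task_id, duration):
        self.events.append(("completed", dag_id, task_id))

    def record_task_failure(self, dag_id, task_id):
        self.events.append(("failed", dag_id, task_id))


@contextmanager
def monitored(monitor, dag_id: str, task_id: str):
    """Record a start event, then either completion with duration or failure."""
    monitor.record_task_start(dag_id, task_id)
    started = time.perf_counter()
    try:
        yield
    except Exception:
        monitor.record_task_failure(dag_id, task_id)
        raise  # re-raise so the orchestrator still sees the failure and can retry
    else:
        monitor.record_task_complete(dag_id, task_id, time.perf_counter() - started)


monitor = StubMonitor()
with monitored(monitor, "etl_pipeline", "transform"):
    pass  # the task body would run here
print(monitor.events)
```

Because PipelineMonitor uses static methods, passing the class itself as the monitor argument should work the same way as passing the stub instance.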

Step 6: Docker Deployment

# docker-compose.yml
version: '3.8'
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow

  airflow-webserver:
    image: apache/airflow:2.8.0
    depends_on: [postgres]
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql://airflow:airflow@postgres/airflow
    ports: ["8080:8080"]
    volumes:
      - ./dags:/opt/airflow/dags
      - ./plugins:/opt/airflow/plugins
    # The image needs an explicit command; run `airflow db migrate` once before first start.
    command: webserver

  airflow-scheduler:
    image: apache/airflow:2.8.0
    depends_on: [postgres]
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql://airflow:airflow@postgres/airflow
    volumes:
      - ./dags:/opt/airflow/dags
      - ./plugins:/opt/airflow/plugins
    command: scheduler

Pipeline Monitoring Checklist

  • Set up Prometheus metrics for all pipeline stages
  • Configure Slack/PagerDuty alerts for failures
  • Monitor data freshness and volume anomalies
  • Track pipeline duration trends over time
  • Set up dashboards in Grafana for visibility
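The freshness and volume items on the checklist can start as two small predicates run after each load. This is a minimal sketch with hypothetical function names and thresholds: a 7-hour freshness limit (slightly above the 6-hour schedule) and a 50% tolerance around the recent average row count. Both values are assumptions to tune for your data.

```python
from datetime import datetime, timedelta, timezone
from statistics import mean


def is_stale(last_loaded_at: datetime, now: datetime, max_age: timedelta) -> bool:
    """True when the newest load is older than the allowed age."""
    return now - last_loaded_at > max_age


def is_volume_anomaly(recent_counts: list[int], current: int, tolerance: float = 0.5) -> bool:
    """True when `current` deviates from the recent average by more than `tolerance`."""
    baseline = mean(recent_counts)
    return abs(current - baseline) > tolerance * baseline


now = datetime(2026, 1, 2, 12, 0, tzinfo=timezone.utc)
last_load = datetime(2026, 1, 2, 3, 0, tzinfo=timezone.utc)

# Last load was 9 hours ago, beyond the assumed 7-hour limit.
print("stale:", is_stale(last_load, now, timedelta(hours=7)))
# 300 rows against a recent average of 1000 is outside the 50% tolerance.
print("volume anomaly:", is_volume_anomaly([1000, 1100, 900], 300))
```

Either predicate returning True is a natural trigger for the Slack alert path described in Step 4.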

