Building Production-Ready Data Pipelines with Python
Data pipelines that work in development often fail in production. This guide teaches you to build resilient, observable data pipelines that handle failures gracefully and scale with your data volume.
What You'll Build
A production-grade data pipeline system with Apache Airflow orchestration, built-in error handling, retry logic, monitoring dashboards, and automated alerting. You'll learn patterns used by data engineering teams at scale.
Why Production Pipelines Matter
Fragile pipelines cause data delays, broken dashboards, and lost revenue. Production-ready pipelines provide:
- Automatic retries — transient failures don't stop your pipeline
- Observability — know exactly where and why something failed
- Scalability — handle 100 records or 100 million
- Data quality — catch bad data before it reaches your warehouse
Full Tutorial
Step 1: Project Structure
data-pipeline/
├── dags/
│   ├── __init__.py
│   ├── etl_dag.py
│   └── data_quality_dag.py
├── plugins/
│   ├── __init__.py
│   ├── custom_operators.py
│   ├── error_handlers.py
│   ├── hooks.py
│   └── monitoring.py
├── config/
│   └── pipelines.yaml
├── tests/
│   └── test_dags.py
└── docker-compose.yml
Step 2: Airflow DAG Definition
# dags/etl_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.utils.task_group import TaskGroup
import pandas as pd
import logging
logger = logging.getLogger(__name__)

# Task-level defaults shared by every operator in the DAG defined below.
default_args = dict(
    owner="data-engineering",
    # A task instance does not wait for its own previous run to have succeeded.
    depends_on_past=False,
    # Email on terminal failure only, not on every retry.
    # NOTE(review): no `email` recipient is set here -- confirm it is
    # configured elsewhere, otherwise these notifications have no target.
    email_on_failure=True,
    email_on_retry=False,
    # Up to 3 retries: first after 5 minutes, then with exponential backoff,
    # never waiting more than 30 minutes between attempts.
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(minutes=30),
)
def extract_data(url="https://api.source.com/data", timeout=30, **context):
    """Extract data from the source API and push it to XCom.

    Args:
        url: Endpoint that returns a JSON object with a ``"records"`` list.
            Defaults to the original endpoint; override via the operator's
            ``op_kwargs`` to reuse this callable for other sources.
        timeout: Seconds to wait for the HTTP response.
        **context: Airflow task context; ``context["ti"]`` is used to push
            the extracted frame under the XCom key ``"raw_data"``.

    Returns:
        Number of records extracted.

    Raises:
        requests.HTTPError: If the API answers with a non-2xx status.
        ValueError: If the JSON payload has no ``"records"`` key.
    """
    # Function-local import keeps `requests` off the DAG-file parse path.
    import requests

    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    payload = response.json()
    if "records" not in payload:
        raise ValueError(f"Response from {url} has no 'records' key")
    df = pd.DataFrame(payload["records"])
    # Store in XCom for the next task.
    # NOTE(review): with the default XCom backend this JSON is stored in the
    # Airflow metadata database, so it only suits modest volumes; stage large
    # extracts in object storage and pass a reference instead.
    context["ti"].xcom_push(key="raw_data", value=df.to_json())
    logger.info("Extracted %d records", len(df))
    return len(df)
def transform_data(**context):
    """Deduplicate, clean and validate the records produced by ``extract``.

    Pulls the ``raw_data`` XCom from the sibling ``extract`` task. Then it:
    - drops duplicate ``id`` rows;
    - fills missing ``amount`` values with 0;
    - parses ``timestamp``.
    The validated frame is pushed back as ``transformed_data``.

    Args:
        **context: Airflow task context (uses ``"ti"`` and, when present,
            ``"task"`` to locate the sibling task id).

    Returns:
        Number of clean records.

    Raises:
        ValueError: If no upstream data is found, the dataset is empty, or
            any amount is negative.
    """
    import io

    ti = context["ti"]
    # Inside a TaskGroup, task ids carry the group prefix by default
    # ("extract_transform_load.extract"). Resolve the sibling through the
    # group instead of hard-coding the bare name.
    task_group = getattr(context.get("task"), "task_group", None)
    extract_id = task_group.child_id("extract") if task_group else "extract"
    raw_json = ti.xcom_pull(task_ids=extract_id, key="raw_data")
    if raw_json is None:
        raise ValueError(f"No 'raw_data' XCom found from task {extract_id!r}")
    # StringIO: passing literal JSON text to read_json is deprecated in pandas.
    df = pd.read_json(io.StringIO(raw_json))
    # An empty extract round-trips as a frame with no columns, so check
    # before touching "id"/"amount". Explicit raises rather than `assert`,
    # which is stripped under `python -O`.
    if df.empty:
        raise ValueError("Empty dataset received from extract")
    # Remove duplicates
    df = df.drop_duplicates(subset=["id"])
    # Handle missing values
    df["amount"] = df["amount"].fillna(0)
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    # Validate data quality
    negative = int((df["amount"] < 0).sum())
    if negative:
        raise ValueError(f"Negative amounts found ({negative} rows)")
    ti.xcom_push(key="transformed_data", value=df.to_json())
    logger.info("Transformed to %d clean records", len(df))
    return len(df)
def load_data(**context):
    """Append the cleaned records from ``transform`` to ``processed_data``.

    Args:
        **context: Airflow task context (uses ``"ti"`` and, when present,
            ``"task"`` to locate the sibling task id).

    Returns:
        Number of rows loaded.

    Raises:
        ValueError: If the ``transformed_data`` XCom is missing.
    """
    import io

    ti = context["ti"]
    # Task ids inside a TaskGroup are prefixed with the group id by default,
    # so resolve the sibling through the group instead of hard-coding it.
    task_group = getattr(context.get("task"), "task_group", None)
    transform_id = task_group.child_id("transform") if task_group else "transform"
    transformed_json = ti.xcom_pull(task_ids=transform_id, key="transformed_data")
    if transformed_json is None:
        raise ValueError(
            f"No 'transformed_data' XCom found from task {transform_id!r}"
        )
    # StringIO: passing literal JSON text to read_json is deprecated in pandas.
    df = pd.read_json(io.StringIO(transformed_json))
    pg_hook = PostgresHook(postgres_conn_id="data_warehouse")
    engine = pg_hook.get_sqlalchemy_engine()
    # Write every chunk inside one transaction. If any chunk fails nothing is
    # committed, so an Airflow retry does not append a partial batch twice.
    # NOTE(review): re-running a *successful* load still appends again; use an
    # upsert keyed on `id` if full idempotency is required.
    with engine.begin() as connection:
        df.to_sql(
            "processed_data",
            connection,
            if_exists="append",
            index=False,
            chunksize=1000,
        )
    logger.info("Loaded %d records to warehouse", len(df))
    return len(df)
with DAG(
    "etl_pipeline",
    default_args=default_args,
    description="Production ETL pipeline",
    # `schedule` supersedes `schedule_interval`, deprecated since Airflow 2.4.
    schedule="0 */6 * * *",  # Every 6 hours
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["production", "etl"],
) as dag:
    # prefix_group_id=False keeps the bare task ids "extract"/"transform"/"load"
    # that the callables pull XComs by. With the default prefixing they would
    # become e.g. "extract_transform_load.extract" and those pulls return None.
    with TaskGroup("extract_transform_load", prefix_group_id=False) as etl_group:
        extract = PythonOperator(
            task_id="extract",
            python_callable=extract_data,
        )
        transform = PythonOperator(
            task_id="transform",
            python_callable=transform_data,
        )
        load = PythonOperator(
            task_id="load",
            python_callable=load_data,
        )
        extract >> transform >> load

    # Runs only after every task in the ETL group has succeeded.
    notify = BashOperator(
        task_id="notify_success",
        bash_command='echo "Pipeline completed successfully at $(date)"',
    )

    etl_group >> notify
Step 3: Custom Operators for Complex Tasks
# plugins/custom_operators.py
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.exceptions import AirflowException
import subprocess
import json
import time
class DataQualityOperator(BaseOperator):
    """Run SQL checks against a warehouse connection and fail on any bad result.

    A check fails when its query returns no row, or when the first column of
    the first row is NULL or 0 (e.g. ``SELECT COUNT(*) FROM t`` on an empty table).
    """

    # No @apply_defaults: since Airflow 2.0 BaseOperator applies default_args
    # itself and the decorator only emits a deprecation warning.
    # The annotation is a string so the module also imports on Python 3.8,
    # where builtin generics like `list[str]` are not subscriptable at runtime.
    def __init__(self, sql_checks: "list[str]", redshift_conn_id: str, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.sql_checks = sql_checks
        self.redshift_conn_id = redshift_conn_id

    def execute(self, context):
        """Run every check; raise AirflowException listing the failures."""
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
        failed_checks = []
        for check in self.sql_checks:
            result = hook.get_first(check)
            # A missing row, NULL or 0 in the first column all mean "no data".
            if not result or result[0] is None or result[0] == 0:
                failed_checks.append(check)
                self.log.error("Data quality check failed: %s", check)
        if failed_checks:
            raise AirflowException(
                f"Data quality checks failed: {len(failed_checks)} checks failed: "
                + "; ".join(failed_checks)
            )
        self.log.info("All data quality checks passed")
class FileProcessorOperator(BaseOperator):
    """Concatenate every CSV matching ``input_pattern`` into one CSV at ``output_path``."""

    # No @apply_defaults: since Airflow 2.0 BaseOperator applies default_args
    # itself and the decorator only emits a deprecation warning.
    def __init__(self, input_pattern: str, output_path: str, **kwargs):
        super().__init__(**kwargs)
        self.input_pattern = input_pattern
        self.output_path = output_path

    def execute(self, context):
        """Read, combine and write the matching files.

        Unreadable files are skipped with a warning. Raises AirflowException
        if no file could be read.
        """
        import glob
        import os

        import pandas as pd

        output_abs = os.path.abspath(self.output_path)
        # glob order is arbitrary, so sort for a reproducible row order. Skip
        # the output file itself so a re-run whose pattern also matches it
        # does not fold the previous result back in.
        files = sorted(
            path
            for path in glob.glob(self.input_pattern)
            if os.path.abspath(path) != output_abs
        )
        self.log.info("Processing %d files", len(files))
        dfs = []
        for file in files:
            try:
                dfs.append(pd.read_csv(file))
            except Exception as e:  # best-effort: one bad file must not sink the batch
                self.log.warning("Failed to process %s: %s", file, e)
        if not dfs:
            raise AirflowException("No files processed successfully")
        combined = pd.concat(dfs, ignore_index=True)
        combined.to_csv(self.output_path, index=False)
        self.log.info(
            "Combined %d files into %s (%d skipped)",
            len(dfs), self.output_path, len(files) - len(dfs),
        )
Step 4: Error Handling Patterns
# plugins/error_handlers.py
from airflow.models import Variable
import requests
import logging
logger = logging.getLogger(__name__)


class AlertHandler:
    """Slack alerting helper whose methods can serve as Airflow task callbacks.

    The Slack webhook URL and PagerDuty key are read from the Airflow Variables
    ``slack_webhook_url`` and ``pagerduty_key`` at construction. An empty
    webhook turns Slack alerts into no-ops.
    """

    # Slack attachment colour per severity; unknown severities use orange.
    SEVERITY_COLORS = {"info": "#36a64f", "warning": "#ff9900", "critical": "#ff0000"}

    def __init__(self):
        self.slack_webhook = Variable.get("slack_webhook_url", default_var="")
        # NOTE(review): stored but not used by any method in this class yet.
        self.pagerduty_key = Variable.get("pagerduty_key", default_var="")

    def send_slack_alert(self, message: str, severity: str = "warning"):
        """Post *message* to the Slack webhook as a coloured attachment.

        Delivery problems (network errors, non-2xx responses) are logged
        rather than raised, so a failing alert does not fail the caller.
        """
        # Local import: `time` is not among this module's top-level imports,
        # and the payload timestamp below needs it.
        import time

        if not self.slack_webhook:
            return
        payload = {
            "attachments": [{
                "color": self.SEVERITY_COLORS.get(severity, "#ff9900"),
                "title": f"Pipeline Alert - {severity.upper()}",
                "text": message,
                "ts": time.time(),
            }]
        }
        try:
            response = requests.post(self.slack_webhook, json=payload, timeout=10)
            # Surface rejected webhooks / rate limiting in the log as well.
            response.raise_for_status()
        except Exception as e:  # deliberate: alerting must never raise
            logger.error("Failed to send Slack alert: %s", e)

    def on_retry_callback(self, context):
        """Airflow ``on_retry_callback``: post an info-level alert."""
        ti = context["ti"]
        self.send_slack_alert(
            f"Task {ti.task_id} retrying (attempt {ti.try_number})",
            severity="info",
        )

    def on_failure_callback(self, context):
        """Airflow ``on_failure_callback``: post a critical alert."""
        ti = context["ti"]
        # NOTE(review): in some Airflow 2.x versions `try_number` already points
        # at the *next* try once a task has finished -- confirm the count shown.
        self.send_slack_alert(
            f"Task {ti.task_id} failed after {ti.try_number} attempts",
            severity="critical",
        )
Step 5: Monitoring Configuration
# plugins/monitoring.py
from prometheus_client import Counter, Histogram, Gauge
import time
# Prometheus metrics updated by PipelineMonitor. No registry is passed, so
# each one registers with prometheus_client's default when this module loads.
# NOTE(review): Airflow usually runs each task in its own process, so values
# recorded here are only visible to a scraper of that process -- confirm how
# they are exported (e.g. Pushgateway or prometheus_client multiprocess mode).

TASK_DURATION = Histogram(
    "etl_task_duration_seconds",
    "Time spent on ETL tasks",
    labelnames=("dag_id", "task_id"),
    # Bucket upper bounds, in seconds: 10 s up to 10 minutes.
    buckets=(10, 30, 60, 120, 300, 600),
)

TASK_STATUS = Counter(
    "etl_task_status_total",
    "ETL task completion status",
    labelnames=("dag_id", "task_id", "status"),
)

RECORDS_PROCESSED = Counter(
    "etl_records_processed_total",
    "Total records processed",
    labelnames=("dag_id", "source"),
)
class PipelineMonitor:
    """Static helpers that record ETL task events on the module's metrics."""

    @staticmethod
    def record_task_start(dag_id: str, task_id: str):
        """Count one ``started`` status event for the task."""
        started = TASK_STATUS.labels(dag_id=dag_id, task_id=task_id, status="started")
        started.inc()

    @staticmethod
    def record_task_complete(dag_id: str, task_id: str, duration: float):
        """Observe the task's duration, then count one ``completed`` event."""
        task_labels = dict(dag_id=dag_id, task_id=task_id)
        TASK_DURATION.labels(**task_labels).observe(duration)
        TASK_STATUS.labels(status="completed", **task_labels).inc()

    @staticmethod
    def record_task_failure(dag_id: str, task_id: str):
        """Count one ``failed`` status event for the task."""
        failed = TASK_STATUS.labels(dag_id=dag_id, task_id=task_id, status="failed")
        failed.inc()

    @staticmethod
    def record_records(dag_id: str, source: str, count: int):
        """Add *count* to the processed-records counter for (dag_id, source)."""
        per_source = RECORDS_PROCESSED.labels(dag_id=dag_id, source=source)
        per_source.inc(count)
Step 6: Docker Deployment
# docker-compose.yml
# Written for Docker Compose v2 (Compose Specification). The old top-level
# `version:` key is obsolete there, and the `depends_on` conditions below are
# not accepted by the legacy version-3 file format.
# All credentials here are local-development defaults only.

x-airflow-common: &airflow-common
  image: apache/airflow:2.8.0
  environment: &airflow-env
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql://airflow:airflow@postgres/airflow
  volumes:
    - ./dags:/opt/airflow/dags
    - ./plugins:/opt/airflow/plugins

services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db:/var/lib/postgresql/data  # keep metadata across restarts
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5

  # One-shot job. With these variables set, the image entrypoint migrates the
  # metadata database and creates the web UI user, then runs `airflow version`.
  airflow-init:
    <<: *airflow-common
    command: version
    environment:
      <<: *airflow-env
      _AIRFLOW_DB_MIGRATE: "true"
      _AIRFLOW_WWW_USER_CREATE: "true"
      _AIRFLOW_WWW_USER_USERNAME: airflow
      _AIRFLOW_WWW_USER_PASSWORD: airflow
    depends_on:
      postgres:
        condition: service_healthy

  airflow-webserver:
    <<: *airflow-common
    # The image needs an explicit Airflow command, like `scheduler` below.
    command: webserver
    ports: ["8080:8080"]
    depends_on:
      postgres:
        condition: service_healthy
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    depends_on:
      postgres:
        condition: service_healthy
      airflow-init:
        condition: service_completed_successfully

volumes:
  postgres-db:
Pipeline Monitoring Checklist
- Set up Prometheus metrics for all pipeline stages
- Configure Slack/PagerDuty alerts for failures
- Monitor data freshness and volume anomalies
- Track pipeline duration trends over time
- Set up dashboards in Grafana for visibility
Get the Code
Ready to use these tools? Browse our collection of tested, production-ready Python scripts:
🔗 Browse Products: Anna's Digital Products
All products include:
- ✅ Tested and verified code
- ✅ Instant delivery via crypto or card
- ✅ Free updates forever
- ✅ Telegram bot support (@AnnaLilithBot)
Top comments (0)