MLOps Pipeline Templates
The gap between a Jupyter notebook and a production ML system isn't code quality — it's operational infrastructure. These pipeline templates give you end-to-end ML workflows with feature engineering, model training, evaluation, and deployment stages that are orchestrated, versioned, tested, and monitored. Built for real-world use with Airflow and Prefect DAG definitions, containerized training steps, automated model validation gates, and deployment patterns that support rollback. Go from "it works on my machine" to "it runs in production" in days, not months.
Key Features
- Complete Pipeline Templates — Six production-ready templates: batch prediction, real-time inference, retraining, A/B testing, feature backfill, and data quality monitoring.
- Orchestrator Configs — Pre-built DAGs for Airflow and Prefect with retry logic and alerting.
- Containerized Steps — Each stage runs in Docker with pinned dependencies for reproducibility.
- Model Validation Gates — Automated checks: performance thresholds, drift detection, and regression tests before promotion.
- Multi-Environment Support — Development, staging, and production configs with environment-specific parameters, credentials, and scaling.
- Artifact Lineage — Track which data, code version, and config produced each model artifact with full provenance chains.
- Notification System — Slack, email, and PagerDuty alerts for failures, degradation, and successful deployments.
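The artifact-lineage idea above can be made concrete with a small provenance record. This is an illustrative sketch, not the toolkit's actual API: the `LineageRecord` fields and the `config_hash` helper are assumptions showing one way to pin data, code, and config identity together.

```python
import hashlib
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class LineageRecord:
    """Provenance for one model artifact: data, code, and config identity."""
    data_uri: str      # exact input data snapshot
    git_commit: str    # code version that produced the model
    config_hash: str   # digest of the resolved pipeline config


def config_hash(config: dict) -> str:
    # Hash a canonical JSON dump so key order does not change the digest
    canonical = json.dumps(config, sort_keys=True).encode()
    return hashlib.sha256(canonical).hexdigest()[:12]


record = LineageRecord(
    data_uri="s3://ml-data/features/2026-03-23/",
    git_commit="abc1234",
    config_hash=config_hash({"model": "xgboost", "max_depth": 6}),
)
```

Because the hash is computed over a canonicalized dump, two runs with the same effective config always map to the same digest, which makes "which config built this model?" a lookup rather than an investigation.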
Quick Start
```bash
unzip mlops-pipeline-templates.zip && cd mlops-pipeline-templates
pip install -r requirements.txt

# Generate a new pipeline from template
python src/mlops_pipeline/core.py create \
    --template batch_prediction \
    --config configs/development.yaml \
    --output ./my_pipeline/
```
```yaml
# configs/development.yaml
pipeline:
  name: customer_churn_prediction
  schedule: "0 6 * * *"  # Daily at 6 AM
  template: batch_prediction
  stages:
    data_ingestion:
      source: postgresql
      query: "SELECT * FROM features WHERE dt = '{{ ds }}'"
      output: s3://ml-data/raw/{{ ds }}/
    feature_engineering:
      transformers: [scaling, encoding, imputation]
      output: s3://ml-data/features/{{ ds }}/
    training:
      model: xgboost
      hyperparameters:
        n_estimators: 500
        max_depth: 6
        learning_rate: 0.1
      validation_split: 0.2
      experiment_tracking: mlflow
    evaluation:
      metrics: [auc_roc, precision, recall, f1]
      min_auc: 0.82
      comparison: baseline  # compare against current production model
      gate: hard            # hard = block deployment, soft = warn only
    deployment:
      strategy: canary  # canary | blue_green | rolling
      canary_percentage: 10
      health_check_endpoint: /health
      rollback_on_failure: true
  notifications:
    on_failure: [slack, email]
    on_success: [slack]
```
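A loader can sanity-check this structure before anything runs. The sketch below is an assumption about what such validation looks like (the template's real `dry_run` checks are presumably richer); the key names mirror the development config above.

```python
REQUIRED_STAGES = {"data_ingestion", "feature_engineering",
                   "training", "evaluation", "deployment"}


def validate_pipeline_config(config: dict) -> list[str]:
    """Return a list of problems; an empty list means the config looks sane."""
    errors = []
    pipeline = config.get("pipeline", {})
    if not pipeline.get("name"):
        errors.append("pipeline.name is required")
    stages = pipeline.get("stages", {})
    missing = REQUIRED_STAGES - set(stages)
    if missing:
        errors.append(f"missing stages: {sorted(missing)}")
    gate = stages.get("evaluation", {}).get("gate", "hard")
    if gate not in ("hard", "soft"):
        errors.append(f"evaluation.gate must be 'hard' or 'soft', got {gate!r}")
    return errors
```

Running this at config-load time turns a typo like `gate: herd` into an immediate error instead of a surprise at deployment time.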
Architecture
```text
┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐
│   Data   │──>│ Feature  │──>│ Training │──>│   Eval   │
│  Ingest  │   │ Engineer │   │          │   │   Gate   │
└──────────┘   └──────────┘   └──────────┘   └────┬─────┘
                                                  │
                                        ┌─ PASS ──┴── FAIL ─┐
                                        │                   │
                                   ┌────▼─────┐       ┌─────▼────┐
                                   │  Deploy  │       │ Rollback │
                                   │  Model   │       │ + Alert  │
                                   └──────────┘       └──────────┘
```
Each stage is containerized with defined inputs/outputs. The orchestrator handles retry, timeout, and dependency management.
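One way to picture the containerized-stage contract: the orchestrator builds a `docker run` command whose only interface is the input/output URIs, then retries on failure. This is a simplified sketch of the pattern, not the toolkit's actual runner; the environment-variable names are assumptions.

```python
import subprocess
import time


def stage_command(image: str, input_uri: str, output_uri: str) -> list[str]:
    """Build the docker invocation for one stage; inputs/outputs are passed
    as environment variables so the container stays a black box."""
    return ["docker", "run", "--rm",
            "-e", f"INPUT_URI={input_uri}",
            "-e", f"OUTPUT_URI={output_uri}",
            image]


def run_stage(image: str, input_uri: str, output_uri: str,
              retries: int = 2, timeout: int = 3600) -> None:
    """Run the stage container, retrying with exponential backoff."""
    cmd = stage_command(image, input_uri, output_uri)
    for attempt in range(retries + 1):
        try:
            subprocess.run(cmd, check=True, timeout=timeout)
            return
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
            if attempt == retries:
                raise
            time.sleep(2 ** attempt)  # back off 1s, 2s, ... between retries
```

Because each stage only sees URIs, the same image runs unchanged in development, staging, and production; only the config changes.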
Usage Examples
Create and Run a Batch Prediction Pipeline
```python
from mlops_pipeline.core import PipelineBuilder

builder = PipelineBuilder.from_config("configs/development.yaml")
pipeline = builder.build()
pipeline.dry_run()  # validates all stages without executing

result = pipeline.run(execution_date="2026-03-23", parameters={"model_version": "v3"})
print(f"Status: {result.status} | AUC: {result.metrics['auc_roc']:.4f} | "
      f"Gate: {'PASS' if result.gate_passed else 'FAIL'}")
```
Define a Retraining Pipeline with Airflow
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# ingest_data, run_feature_pipeline, train_model, run_evaluation_gate, and
# deploy_canary are your stage callables, imported from the pipeline package.

with DAG(
    "churn_model_retraining",
    default_args={"owner": "ml-team", "retries": 2, "retry_delay": timedelta(minutes=5)},
    schedule_interval="0 6 * * 1",  # Weekly on Monday
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:
    ingest = PythonOperator(task_id="data_ingestion", python_callable=ingest_data,
                            op_kwargs={"source": "postgresql", "days_back": 90})
    engineer = PythonOperator(task_id="feature_engineering",
                              python_callable=run_feature_pipeline)
    train = PythonOperator(task_id="model_training", python_callable=train_model,
                           op_kwargs={"experiment": "churn_weekly"})
    evaluate = PythonOperator(task_id="evaluation_gate",
                              python_callable=run_evaluation_gate,
                              op_kwargs={"min_auc": 0.82})
    deploy = PythonOperator(task_id="canary_deploy", python_callable=deploy_canary,
                            op_kwargs={"canary_pct": 10})

    ingest >> engineer >> train >> evaluate >> deploy
```
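For context, here is one plausible body for the `evaluation_gate` callable in the DAG above. It is a hypothetical sketch: it assumes the training task pushed its metrics via XCom under the key shown, and relies on the fact that raising in an Airflow task fails it, which in turn blocks the downstream `canary_deploy` task.

```python
def run_evaluation_gate(min_auc: float, **context) -> float:
    """Fail the task (blocking deployment) if the candidate misses the AUC floor."""
    ti = context.get("ti")  # Airflow injects the TaskInstance into context
    metrics = ti.xcom_pull(task_ids="model_training") if ti else {}
    auc = metrics.get("auc_roc", 0.0)
    if auc < min_auc:
        raise ValueError(f"Evaluation gate failed: AUC {auc:.4f} < {min_auc:.2f}")
    return auc  # returned value is itself pushed to XCom for downstream tasks
```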
Model Validation Gate
```python
from mlops_pipeline.core import EvaluationGate

gate = EvaluationGate(
    metrics_thresholds={"auc_roc": 0.82, "precision": 0.75},
    regression_tolerance=0.02,
    baseline_model="models/production_v2.pt",
)

result = gate.evaluate(candidate_model=new_model, test_data=holdout_set)
if result.passed:
    deploy_model(new_model, strategy="canary")
else:
    alert_team(result.failures)
```
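The two-part decision a gate like this makes can be written out explicitly. The sketch below is an assumed simplification of the `EvaluationGate` logic: each metric must clear its absolute threshold, and the candidate must not trail the production baseline by more than the regression tolerance.

```python
def gate_decision(candidate: dict, baseline: dict,
                  thresholds: dict, regression_tolerance: float):
    """Return (passed, failures): pass iff every metric clears its absolute
    floor AND trails the baseline by no more than the tolerance."""
    failures = []
    for metric, floor in thresholds.items():
        value = candidate.get(metric, 0.0)
        if value < floor:
            failures.append(f"{metric}={value:.4f} below threshold {floor}")
        base = baseline.get(metric)
        if base is not None and value < base - regression_tolerance:
            failures.append(f"{metric}={value:.4f} regressed vs baseline {base:.4f}")
    return (not failures), failures
```

The regression check is what catches the subtle case: a candidate can clear every absolute threshold and still be meaningfully worse than the model currently serving traffic.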
Configuration Reference
| Parameter | Type | Default | Description |
|---|---|---|---|
| `pipeline.schedule` | str | `0 6 * * *` | Cron schedule expression |
| `stages.evaluation.min_auc` | float | `0.82` | Minimum AUC to pass the gate |
| `stages.evaluation.gate` | str | `hard` | Gate type: `hard` (block) or `soft` (warn) |
| `stages.deployment.strategy` | str | `canary` | Deploy strategy: `canary`, `blue_green`, `rolling` |
| `stages.deployment.canary_percentage` | int | `10` | Initial traffic share routed to the new model |
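Environment-specific configs (see Multi-Environment Support above) typically override only a few of these keys. An illustrative production override, with an assumed file name and values chosen for the example:

```yaml
# configs/production.yaml — overrides the defaults listed above
pipeline:
  stages:
    evaluation:
      min_auc: 0.85   # stricter floor than development
      gate: hard
    deployment:
      strategy: blue_green
```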
Best Practices
- Gate every deployment — Never auto-deploy without an evaluation step. Even soft gates (warn-only) surface most bad candidates before they reach users.
- Use canary deployments — Route 10% of traffic to the new model for 1 hour before full rollout. Monitor latency, error rate, and business metrics.
- Version everything together — Pin data version + code commit + config hash for each model. If you can't reproduce a model, you can't debug it.
- Test the pipeline, not just the model — Pipeline bugs (wrong date filter, stale cache, schema change) cause more production incidents than model quality issues.
- Keep stages idempotent — Every stage should produce the same output if run twice with the same inputs. This enables safe retries.
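The idempotency practice above usually comes down to one rule: derive the output location purely from the inputs (here, the execution date) and fully overwrite on rerun. A minimal sketch, with an assumed layout mirroring the S3 paths in the config:

```python
from pathlib import Path


def write_stage_output(root: str, execution_date: str, payload: bytes) -> Path:
    """Idempotent stage output: the path is a pure function of the
    execution date, and a rerun fully overwrites (never appends)."""
    out = Path(root) / execution_date / "features.parquet"
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_bytes(payload)  # same inputs -> byte-identical output
    return out
```

With this shape, a retried or backfilled run simply rewrites the same partition, so downstream stages never see duplicated rows from a partial earlier attempt.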
Troubleshooting
| Issue | Cause | Fix |
|---|---|---|
| Pipeline succeeds but model quality drops | Training data has stale features | Verify feature freshness — check feature_timestamp in data ingestion |
| Canary deployment auto-rolls back | Health check timeout too aggressive | Increase canary_duration_minutes and adjust health check thresholds |
| Airflow DAG not appearing | Syntax error or import failure | Check Airflow webserver logs, verify all imports are available in the worker environment |
| Evaluation gate always fails | Baseline model path outdated | Update baseline_model_path to current production model after each successful deployment |
This is 1 of 11 resources in the ML Engineer Toolkit. Get the complete MLOps Pipeline Templates package with all files, templates, and documentation for $49.
Or grab the entire ML Engineer Toolkit bundle (11 products) for $149 — save 30%.