DEV Community

Thesius Code

Posted on • Originally published at datanest-stores.pages.dev

MLOps Pipeline Templates

The gap between a Jupyter notebook and a production ML system isn't code quality — it's operational infrastructure. These pipeline templates give you end-to-end ML workflows with feature engineering, model training, evaluation, and deployment stages that are orchestrated, versioned, tested, and monitored. Built for real-world use with Airflow and Prefect DAG definitions, containerized training steps, automated model validation gates, and deployment patterns that support rollback. Go from "it works on my machine" to "it runs in production" in days, not months.

Key Features

  • Complete Pipeline Templates — Six production-ready templates: batch prediction, real-time inference, retraining, A/B testing, feature backfill, and data quality monitoring.
  • Orchestrator Configs — Pre-built DAGs for Airflow and Prefect with retry logic and alerting.
  • Containerized Steps — Each stage runs in Docker with pinned dependencies for reproducibility.
  • Model Validation Gates — Automated checks: performance thresholds, drift detection, and regression tests before promotion.
  • Multi-Environment Support — Development, staging, and production configs with environment-specific parameters, credentials, and scaling.
  • Artifact Lineage — Track which data, code version, and config produced each model artifact with full provenance chains.
  • Notification System — Slack, email, and PagerDuty alerts for failures, degradation, and successful deployments.
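
The artifact-lineage idea above can be sketched as a tiny provenance record. The field names and hashing scheme here are illustrative assumptions, not the templates' actual schema:

```python
import hashlib
import json
from dataclasses import asdict, dataclass

@dataclass(frozen=True)
class ModelProvenance:
    # Hypothetical fields -- the real templates may name these differently.
    data_version: str   # e.g. a snapshot ID or partition date
    code_commit: str    # git SHA of the training code
    config_hash: str    # hash of the fully resolved pipeline config

    @classmethod
    def from_config(cls, data_version: str, code_commit: str, config: dict) -> "ModelProvenance":
        # sort_keys makes the hash independent of dict insertion order
        blob = json.dumps(config, sort_keys=True).encode()
        return cls(data_version, code_commit, hashlib.sha256(blob).hexdigest()[:12])

prov = ModelProvenance.from_config("2026-03-23", "a1b2c3d", {"model": "xgboost", "max_depth": 6})
print(asdict(prov))
```

Storing a record like this alongside each model artifact is what makes "which data and code produced this model?" answerable months later.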

Quick Start

unzip mlops-pipeline-templates.zip && cd mlops-pipeline-templates
pip install -r requirements.txt

# Generate a new pipeline from template
python src/mlops_pipeline/core.py create \
  --template batch_prediction \
  --config configs/development.yaml \
  --output ./my_pipeline/
# configs/development.yaml
pipeline:
  name: customer_churn_prediction
  schedule: "0 6 * * *"  # Daily at 6 AM
  template: batch_prediction

stages:
  data_ingestion:
    source: postgresql
    query: "SELECT * FROM features WHERE dt = '{{ ds }}'"
    output: s3://ml-data/raw/{{ ds }}/

  feature_engineering:
    transformers: [scaling, encoding, imputation]
    output: s3://ml-data/features/{{ ds }}/

  training:
    model: xgboost
    hyperparameters:
      n_estimators: 500
      max_depth: 6
      learning_rate: 0.1
    validation_split: 0.2
    experiment_tracking: mlflow

  evaluation:
    metrics: [auc_roc, precision, recall, f1]
    min_auc: 0.82
    comparison: baseline  # compare against current production model
    gate: hard  # hard = block deployment, soft = warn only

  deployment:
    strategy: canary  # canary | blue_green | rolling
    canary_percentage: 10
    health_check_endpoint: /health
    rollback_on_failure: true

notifications:
  on_failure: [slack, email]
  on_success: [slack]
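
The deployment section of the config drives a promote-or-rollback decision. A minimal sketch of that logic (the metric thresholds and function name are assumptions for illustration, not the templates' API):

```python
# Illustrative promote-or-rollback check for a canary release.
def canary_verdict(error_rate: float, p99_latency_ms: float,
                   max_error_rate: float = 0.01, max_p99_ms: float = 250.0) -> str:
    """Promote the canary to full traffic only while it stays healthy."""
    if error_rate > max_error_rate or p99_latency_ms > max_p99_ms:
        return "rollback"  # mirrors rollback_on_failure: true in the config
    return "promote"

print(canary_verdict(0.002, 180.0))  # healthy canary -> promote
print(canary_verdict(0.050, 180.0))  # elevated error rate -> rollback
```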

Architecture

┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐
│  Data    │──>│ Feature  │──>│ Training │──>│ Eval     │──>│ Deploy   │
│ Ingest   │   │ Engineer │   │          │   │ Gate     │   │          │
└──────────┘   └──────────┘   └──────────┘   └────┬─────┘   └──────────┘
                                                  │
                                             PASS │ FAIL
                                                  │     │
                                              ┌───▼──┐ ┌▼────────┐
                                              │Deploy│ │Rollback │
                                              │Model │ │+ Alert  │
                                              └──────┘ └─────────┘

Each stage is containerized with defined inputs/outputs. The orchestrator handles retry, timeout, and dependency management.
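
The retry handling can be illustrated with a plain-Python sketch of what the orchestrator does for each stage; in Airflow or Prefect you configure this declaratively (e.g. task-level retries and retry delay) rather than writing it yourself:

```python
import time

def run_with_retries(stage_fn, retries=2, retry_delay_s=0.01):
    """Re-run a failed stage up to `retries` extra times before giving up.
    Names are illustrative; orchestrators expose this as task options."""
    for attempt in range(retries + 1):
        try:
            return stage_fn()
        except Exception:
            if attempt == retries:
                raise  # exhausted retries: surface the failure for alerting
            time.sleep(retry_delay_s)

calls = {"n": 0}
def flaky_stage():
    calls["n"] += 1
    if calls["n"] < 2:
        raise RuntimeError("transient failure")
    return "ok"

print(run_with_retries(flaky_stage))  # succeeds on the second attempt
```

This only works safely when stages are idempotent, which is why the best practices below insist on it.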

Usage Examples

Create and Run a Batch Prediction Pipeline

from mlops_pipeline.core import PipelineBuilder

builder = PipelineBuilder.from_config("configs/development.yaml")
pipeline = builder.build()
pipeline.dry_run()  # validates all stages without executing

result = pipeline.run(execution_date="2026-03-23", parameters={"model_version": "v3"})
print(f"Status: {result.status} | AUC: {result.metrics['auc_roc']:.4f} | Gate: {'PASS' if result.gate_passed else 'FAIL'}")

Define a Retraining Pipeline with Airflow

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Task callables (ingest_data, run_feature_pipeline, train_model,
# run_evaluation_gate, deploy_canary) are assumed to be defined in or
# imported from your generated pipeline package.

with DAG(
    "churn_model_retraining",
    default_args={"owner": "ml-team", "retries": 2, "retry_delay": timedelta(minutes=5)},
    schedule_interval="0 6 * * 1",  # Weekly on Monday
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:
    ingest = PythonOperator(task_id="data_ingestion", python_callable=ingest_data,
                            op_kwargs={"source": "postgresql", "days_back": 90})
    engineer = PythonOperator(task_id="feature_engineering", python_callable=run_feature_pipeline)
    train = PythonOperator(task_id="model_training", python_callable=train_model,
                           op_kwargs={"experiment": "churn_weekly"})
    evaluate = PythonOperator(task_id="evaluation_gate", python_callable=run_evaluation_gate,
                              op_kwargs={"min_auc": 0.82})
    deploy = PythonOperator(task_id="canary_deploy", python_callable=deploy_canary,
                            op_kwargs={"canary_pct": 10})
    ingest >> engineer >> train >> evaluate >> deploy

Model Validation Gate

from mlops_pipeline.core import EvaluationGate

gate = EvaluationGate(
    metrics_thresholds={"auc_roc": 0.82, "precision": 0.75},
    regression_tolerance=0.02, baseline_model="models/production_v2.pt",
)
result = gate.evaluate(candidate_model=new_model, test_data=holdout_set)
if result.passed:
    deploy_model(new_model, strategy="canary")
else:
    alert_team(result.failures)

Configuration Reference

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| pipeline.schedule | str | 0 6 * * * | Cron schedule expression |
| stages.evaluation.min_auc | float | 0.82 | Minimum AUC to pass the gate |
| stages.evaluation.gate | str | hard | Gate type: hard (block) or soft (warn) |
| stages.deployment.strategy | str | canary | Deploy strategy: canary, blue_green, rolling |
| stages.deployment.canary_percentage | int | 10 | Initial traffic share routed to the new model |

Best Practices

  1. Gate every deployment — Never auto-deploy without evaluation. Even soft (warn-only) gates catch the majority of bad deployments before they reach users.
  2. Use canary deployments — Route 10% of traffic to the new model for 1 hour before full rollout. Monitor latency, error rate, and business metrics.
  3. Version everything together — Pin data version + code commit + config hash for each model. If you can't reproduce a model, you can't debug it.
  4. Test the pipeline, not just the model — Pipeline bugs (wrong date filter, stale cache, schema change) cause more production incidents than model quality issues.
  5. Keep stages idempotent — Every stage should produce the same output if run twice with the same inputs. This enables safe retries.
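
Point 5 can be made concrete: an idempotent stage derives its output location purely from its inputs, so a retry overwrites the same artifact instead of creating a duplicate. The paths and names here are illustrative:

```python
import hashlib

def stage_output_path(stage: str, execution_date: str, config_hash: str) -> str:
    # Deterministic: two runs with identical inputs target the same location,
    # so a safe retry overwrites rather than duplicates.
    return f"s3://ml-data/{stage}/{execution_date}/{config_hash}/part-0.parquet"

config_hash = hashlib.sha256(b"resolved-config").hexdigest()[:8]
p1 = stage_output_path("features", "2026-03-23", config_hash)
p2 = stage_output_path("features", "2026-03-23", config_hash)
assert p1 == p2  # re-running the stage is safe
print(p1)
```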

Troubleshooting

| Issue | Cause | Fix |
| --- | --- | --- |
| Pipeline succeeds but model quality drops | Training data has stale features | Verify feature freshness; check feature_timestamp in data ingestion |
| Canary deployment auto-rolls back | Health check timeout too aggressive | Increase canary_duration_minutes and relax health check thresholds |
| Airflow DAG not appearing | Syntax error or import failure | Check the Airflow webserver logs and verify all imports are available in the worker environment |
| Evaluation gate always fails | Baseline model path outdated | Point baseline_model_path at the current production model after each successful deployment |

This is 1 of 11 resources in the ML Engineer Toolkit. Get the complete MLOps Pipeline Templates package with all files, templates, and documentation for $49.

Get the Full Kit →

Or grab the entire ML Engineer Toolkit bundle (11 products) for $149 — save 30%.

Get the Complete Bundle →

