DEV Community

Thesius Code

Posted on • Originally published at datanest-stores.pages.dev

ML Pipeline Templates

End-to-end ML pipeline templates covering ingestion, preprocessing, training, evaluation, and deployment. Stop building pipelines from scratch — customize these production-tested DAGs for Airflow, Prefect, or standalone Python.

Key Features

  • Complete pipeline stages — data ingestion, validation, preprocessing, training, evaluation, and deployment as modular steps
  • Orchestrator configs — ready-to-use DAGs for Airflow and Prefect with retry logic and failure handling
  • Standalone mode — run pipelines without an orchestrator using the included CLI runner
  • Validation gates — automated quality checks between stages that halt the pipeline on failure
  • Artifact management — model checkpoints, metrics, and data artifacts saved with lineage metadata
  • Parameterized configs — change datasets, models, and hyperparameters without touching pipeline code
  • CI/CD integration — GitHub Actions workflows for automated pipeline testing on pull requests

Quick Start

# 1. Copy the config
cp config.example.yaml config.yaml

# 2. Run the full pipeline locally
python -m pipelines.runner --config config.yaml --pipeline train

# 3. Run individual stages
python -m pipelines.runner --config config.yaml --stage preprocess
python -m pipelines.runner --config config.yaml --stage train
python -m pipelines.runner --config config.yaml --stage evaluate
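The full `runner.py` ships with the kit; as a rough sketch of the CLI surface the commands above assume (the exact flags and names here are illustrative, not the kit's actual code), the runner needs a config path plus either a pipeline or a single stage:

```python
import argparse

def build_parser() -> argparse.ArgumentParser:
    # Mirrors the CLI shown above: --config plus either --pipeline or --stage
    parser = argparse.ArgumentParser(prog="pipelines.runner")
    parser.add_argument("--config", required=True, help="Path to config.yaml")
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--pipeline", help="Run a full pipeline by name, e.g. train")
    group.add_argument("--stage", help="Run a single stage, e.g. preprocess")
    return parser

# Parse an explicit argument list (safe to run anywhere, no sys.argv needed)
args = build_parser().parse_args(["--config", "config.yaml", "--pipeline", "train"])
```

Making `--pipeline` and `--stage` mutually exclusive keeps the two run modes from conflicting in a single invocation.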
"""Define and run a training pipeline."""
from pipelines import Pipeline, Stage
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import accuracy_score
import joblib
import os

def preprocess(context: dict) -> dict:
    os.makedirs("artifacts", exist_ok=True)  # ensure the artifact dir exists before writing
    df = pd.read_csv(context["data_path"])
    train, test = train_test_split(df, test_size=0.2, random_state=42)
    train.to_csv("artifacts/train.csv", index=False)
    test.to_csv("artifacts/test.csv", index=False)
    return {"train_path": "artifacts/train.csv", "test_path": "artifacts/test.csv"}

def train_model(context: dict) -> dict:
    train = pd.read_csv(context["train_path"])
    X, y = train.drop("target", axis=1), train["target"]
    model = GradientBoostingClassifier(
        n_estimators=context.get("n_estimators", 100),
        learning_rate=context.get("learning_rate", 0.1),
    )
    model.fit(X, y)
    joblib.dump(model, "artifacts/model.pkl")
    return {"model_path": "artifacts/model.pkl"}

def evaluate(context: dict) -> dict:
    model = joblib.load(context["model_path"])
    test = pd.read_csv(context["test_path"])
    X, y = test.drop("target", axis=1), test["target"]
    acc = accuracy_score(y, model.predict(X))
    # Quality gate: raise rather than assert, so the check survives `python -O`
    if acc < 0.85:
        raise ValueError(f"Accuracy {acc:.4f} below threshold 0.85")
    return {"metrics": {"accuracy": acc}}

# Assemble and run
pipeline = Pipeline(name="training-pipeline", stages=[
    Stage("preprocess", preprocess),
    Stage("train", train_model),
    Stage("evaluate", evaluate),
])

pipeline.run(context={"data_path": "data/dataset.csv", "n_estimators": 200})
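The `pipelines` module above comes with the kit; the contract it relies on is small enough to sketch (a minimal, illustrative version, not the kit's actual implementation): each stage receives the accumulated context dict and returns a dict of updates that later stages can read.

```python
from dataclasses import dataclass
from typing import Callable

@dataclass
class Stage:
    name: str
    fn: Callable[[dict], dict]

class Pipeline:
    def __init__(self, name: str, stages: list):
        self.name = name
        self.stages = stages

    def run(self, context: dict) -> dict:
        # Each stage's return value (artifact paths, metrics, ...) is merged
        # into the shared context before the next stage runs.
        for stage in self.stages:
            updates = stage.fn(context) or {}
            context.update(updates)
        return context
```

Because stages communicate only through the context dict (and through artifacts on disk), any stage can be re-run in isolation by handing it the right context keys.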

Architecture

ml-pipeline-templates/
├── config.example.yaml           # Pipeline parameters and paths
├── templates/
│   ├── pipelines/
│   │   ├── runner.py             # Standalone pipeline CLI runner
│   │   ├── stages/
│   │   │   ├── ingest.py         # Data loading and validation
│   │   │   ├── preprocess.py     # Feature engineering and splitting
│   │   │   ├── train.py          # Model training with config
│   │   │   ├── evaluate.py       # Metrics computation and gates
│   │   │   └── deploy.py         # Model packaging and deployment
│   │   └── utils.py              # Artifact saving, logging
│   ├── orchestrators/
│   │   ├── airflow_dag.py        # Airflow DAG definition
│   │   └── prefect_flow.py       # Prefect flow definition
│   └── ci/
│       └── pipeline_test.yaml    # GitHub Actions CI workflow
├── docs/
│   └── overview.md
└── examples/
    ├── sklearn_pipeline.py
    └── pytorch_pipeline.py

Usage Examples

Airflow DAG

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# ingest_data, preprocess_data, train_model, evaluate_model, and deploy_model
# are the stage callables imported from the template's stage modules

with DAG(
    "ml_training_pipeline",
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
    schedule_interval="@weekly",
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:
    ingest = PythonOperator(task_id="ingest", python_callable=ingest_data)
    preprocess = PythonOperator(task_id="preprocess", python_callable=preprocess_data)
    train = PythonOperator(task_id="train", python_callable=train_model)
    evaluate = PythonOperator(task_id="evaluate", python_callable=evaluate_model)
    deploy = PythonOperator(task_id="deploy", python_callable=deploy_model)
    ingest >> preprocess >> train >> evaluate >> deploy

Configuration

# config.example.yaml
pipeline:
  name: "training-pipeline"
  artifact_dir: "./artifacts"
  log_level: "INFO"

data:
  source: "data/dataset.csv"       # Path or URL to raw data
  test_size: 0.2                   # Train/test split ratio
  random_seed: 42

training:
  model_type: "gradient_boosting"  # gradient_boosting | random_forest | mlp
  n_estimators: 200
  learning_rate: 0.1
  max_depth: 5

evaluation:
  accuracy_threshold: 0.85         # Minimum accuracy to pass gate
  f1_threshold: 0.80               # Minimum F1 to pass gate
  compare_to_baseline: true        # Compare against last deployed model

deployment:
  target: "local"                  # local | docker | kubernetes
  model_registry: "mlflow"         # mlflow | none
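Once loaded, the runner presumably merges these config sections into the stage context so that `context.get("n_estimators")` and friends resolve without pipeline code knowing the YAML layout. A hypothetical helper (`flatten_config` is not part of the kit, just one way to do it):

```python
def flatten_config(config: dict) -> dict:
    # Merge top-level sections (data, training, evaluation, ...) into one flat
    # dict so stages can read e.g. context["n_estimators"] directly.
    context = {}
    for section, values in config.items():
        if isinstance(values, dict):
            context.update(values)
        else:
            context[section] = values
    return context

config = {
    "data": {"source": "data/dataset.csv", "test_size": 0.2},
    "training": {"n_estimators": 200, "learning_rate": 0.1},
}
context = flatten_config(config)  # {"source": ..., "test_size": 0.2, "n_estimators": 200, ...}
```

Flattening does mean section keys must not collide; a stricter runner might namespace them (`training.n_estimators`) instead.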

Best Practices

  1. Make every stage idempotent — re-running a stage with the same inputs must produce the same outputs
  2. Pass data between stages via artifacts, not memory — write to disk/object store so stages can run independently
  3. Add validation gates between stages — catch bad data before it wastes GPU hours on training
  4. Parameterize everything — model type, hyperparams, paths, and thresholds all belong in config.yaml, not code
  5. Test pipelines on small data first — use a --sample-frac 0.01 flag to validate the DAG before full runs

Troubleshooting

| Problem | Cause | Fix |
| --- | --- | --- |
| Pipeline fails at evaluate gate | Model accuracy below threshold | Check data quality; retune hyperparameters or temporarily lower the threshold |
| Airflow DAG not appearing | Python syntax error or wrong `dags_folder` | Run `python airflow_dag.py` directly to surface import errors |
| Artifacts directory fills up | No cleanup policy | Add `max_artifacts: 5` in config and implement rotation in `runner.py` |
| Stage takes too long | Large dataset with no sampling | Use `sample_frac` in config for development runs |
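The rotation mentioned in the table could look like the following (a sketch; `rotate_artifacts` is a hypothetical helper, not shipped code): keep the newest `max_artifacts` files by modification time and delete the rest.

```python
from pathlib import Path

def rotate_artifacts(artifact_dir: str, max_artifacts: int = 5) -> None:
    # Sort newest-first by mtime, then delete everything past the keep limit.
    files = sorted(
        (p for p in Path(artifact_dir).iterdir() if p.is_file()),
        key=lambda p: p.stat().st_mtime,
        reverse=True,
    )
    for stale in files[max_artifacts:]:
        stale.unlink()
```

Calling this at the end of each run (with `max_artifacts` read from config) keeps the artifact directory bounded without manual cleanup.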

This is 1 of 10 resources in the ML Starter Kit toolkit. Get the complete ML Pipeline Templates kit with all files, templates, and documentation for $49.

Get the Full Kit →

Or grab the entire ML Starter Kit bundle (10 products) for $149 — save 30%.

Get the Complete Bundle →

