# ML Pipeline Templates
End-to-end ML pipeline templates covering ingestion, preprocessing, training, evaluation, and deployment. Stop building pipelines from scratch — customize these production-tested DAGs for Airflow, Prefect, or standalone Python.
## Key Features
- Complete pipeline stages — data ingestion, validation, preprocessing, training, evaluation, and deployment as modular steps
- Orchestrator configs — ready-to-use DAGs for Airflow and Prefect with retry logic and failure handling
- Standalone mode — run pipelines without an orchestrator using the included CLI runner
- Validation gates — automated quality checks between stages that halt the pipeline on failure
- Artifact management — model checkpoints, metrics, and data artifacts saved with lineage metadata
- Parameterized configs — change datasets, models, and hyperparameters without touching pipeline code
- CI/CD integration — GitHub Actions workflows for automated pipeline testing on pull requests
## Quick Start

```bash
# 1. Copy the config
cp config.example.yaml config.yaml

# 2. Run the full pipeline locally
python -m pipelines.runner --config config.yaml --pipeline train

# 3. Run individual stages
python -m pipelines.runner --config config.yaml --stage preprocess
python -m pipelines.runner --config config.yaml --stage train
python -m pipelines.runner --config config.yaml --stage evaluate
```
"""Define and run a training pipeline."""
from pipelines import Pipeline, Stage
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import accuracy_score
import joblib
def preprocess(context: dict) -> dict:
df = pd.read_csv(context["data_path"])
train, test = train_test_split(df, test_size=0.2, random_state=42)
train.to_csv("artifacts/train.csv", index=False)
test.to_csv("artifacts/test.csv", index=False)
return {"train_path": "artifacts/train.csv", "test_path": "artifacts/test.csv"}
def train_model(context: dict) -> dict:
train = pd.read_csv(context["train_path"])
X, y = train.drop("target", axis=1), train["target"]
model = GradientBoostingClassifier(
n_estimators=context.get("n_estimators", 100),
learning_rate=context.get("learning_rate", 0.1),
)
model.fit(X, y)
joblib.dump(model, "artifacts/model.pkl")
return {"model_path": "artifacts/model.pkl"}
def evaluate(context: dict) -> dict:
model = joblib.load(context["model_path"])
test = pd.read_csv(context["test_path"])
X, y = test.drop("target", axis=1), test["target"]
acc = accuracy_score(y, model.predict(X))
assert acc >= 0.85, f"Accuracy {acc:.4f} below threshold 0.85"
return {"metrics": {"accuracy": acc}}
# Assemble and run
pipeline = Pipeline(name="training-pipeline", stages=[
Stage("preprocess", preprocess),
Stage("train", train_model),
Stage("evaluate", evaluate),
])
pipeline.run(context={"data_path": "data/dataset.csv", "n_estimators": 200})
## Architecture

```text
ml-pipeline-templates/
├── config.example.yaml        # Pipeline parameters and paths
├── templates/
│   ├── pipelines/
│   │   ├── runner.py          # Standalone pipeline CLI runner
│   │   ├── stages/
│   │   │   ├── ingest.py      # Data loading and validation
│   │   │   ├── preprocess.py  # Feature engineering and splitting
│   │   │   ├── train.py       # Model training with config
│   │   │   ├── evaluate.py    # Metrics computation and gates
│   │   │   └── deploy.py      # Model packaging and deployment
│   │   └── utils.py           # Artifact saving, logging
│   ├── orchestrators/
│   │   ├── airflow_dag.py     # Airflow DAG definition
│   │   └── prefect_flow.py    # Prefect flow definition
│   └── ci/
│       └── pipeline_test.yaml # GitHub Actions CI workflow
├── docs/
│   └── overview.md
└── examples/
    ├── sklearn_pipeline.py
    └── pytorch_pipeline.py
```
## Usage Examples

### Airflow DAG

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# Stage callables (ingest_data, preprocess_data, train_model, evaluate_model,
# deploy_model) live in templates/pipelines/stages/; import them per your layout.
with DAG(
    "ml_training_pipeline",
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
    schedule_interval="@weekly",
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:
    ingest = PythonOperator(task_id="ingest", python_callable=ingest_data)
    preprocess = PythonOperator(task_id="preprocess", python_callable=preprocess_data)
    train = PythonOperator(task_id="train", python_callable=train_model)
    evaluate = PythonOperator(task_id="evaluate", python_callable=evaluate_model)
    deploy = PythonOperator(task_id="deploy", python_callable=deploy_model)

    ingest >> preprocess >> train >> evaluate >> deploy
```
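In standalone mode there is no orchestrator to supply the `retries`/`retry_delay` behavior the DAG's `default_args` configure, so the runner needs its own retry wrapper. A hedged sketch of what such a helper could look like (this is a hypothetical illustration, not the kit's actual `runner.py`):

```python
import time

# Hypothetical retry helper mirroring the DAG's default_args
# (retries=2, retry_delay); the kit's actual runner may differ.
def run_with_retries(fn, *, retries: int = 2, delay_seconds: float = 0.0):
    attempt = 0
    while True:
        try:
            return fn()
        except Exception:
            attempt += 1
            if attempt > retries:
                raise  # retries exhausted: surface the failure to halt the pipeline
            time.sleep(delay_seconds)


# A flaky stage that fails twice, then succeeds on the third attempt
calls = {"n": 0}

def flaky_stage():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = run_with_retries(flaky_stage, retries=2, delay_seconds=0)
print(result, calls["n"])  # ok 3
```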
## Configuration

```yaml
# config.example.yaml
pipeline:
  name: "training-pipeline"
  artifact_dir: "./artifacts"
  log_level: "INFO"

data:
  source: "data/dataset.csv"        # Path or URL to raw data
  test_size: 0.2                    # Train/test split ratio
  random_seed: 42

training:
  model_type: "gradient_boosting"   # gradient_boosting | random_forest | mlp
  n_estimators: 200
  learning_rate: 0.1
  max_depth: 5

evaluation:
  accuracy_threshold: 0.85          # Minimum accuracy to pass gate
  f1_threshold: 0.80                # Minimum F1 to pass gate
  compare_to_baseline: true         # Compare against last deployed model

deployment:
  target: "local"                   # local | docker | kubernetes
  model_registry: "mlflow"          # mlflow | none
```
## Best Practices
- Make every stage idempotent — re-running a stage with the same inputs must produce the same outputs
- Pass data between stages via artifacts, not memory — write to disk/object store so stages can run independently
- Add validation gates between stages — catch bad data before it wastes GPU hours on training
- Parameterize everything — model type, hyperparameters, paths, and thresholds all belong in `config.yaml`, not code
- Test pipelines on small data first — use a `--sample-frac 0.01` flag to validate the DAG before full runs
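The sampling idea behind a `--sample-frac` flag can be sketched in plain Python; the kit's stages most likely sample with pandas (`df.sample(frac=...)`), so treat this stdlib version as a hypothetical illustration:

```python
import csv
import io
import random

# Hypothetical --sample-frac implementation over CSV text; the kit's
# pipelines would more likely use pandas' df.sample(frac=...).
def sample_rows(csv_text: str, frac: float, seed: int = 42) -> list:
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    rng = random.Random(seed)  # seeded so dev runs are reproducible
    k = max(1, int(len(rows) * frac))  # always keep at least one row
    return rng.sample(rows, k)


data = "feature,target\n" + "\n".join(f"{i},{i % 2}" for i in range(100))
subset = sample_rows(data, frac=0.05)
print(len(subset))  # 5
```

Seeding the sampler matters: a reproducible subset lets you compare two DAG runs on identical dev data.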
## Troubleshooting

| Problem | Cause | Fix |
|---|---|---|
| Pipeline fails at evaluate gate | Model accuracy below threshold | Check data quality; lower threshold temporarily or retune hyperparameters |
| Airflow DAG not appearing | Python syntax error or wrong `dags_folder` | Run `python airflow_dag.py` directly to check for import errors |
| Artifacts directory fills up | No cleanup policy | Add `max_artifacts: 5` in config and implement rotation in `runner.py` |
| Stage takes too long | Large dataset with no sampling | Use `sample_frac` in config for development runs |
This is 1 of 10 resources in the ML Starter Kit toolkit. Get the complete **ML Pipeline Templates** package with all files, templates, and documentation for $49.
Or grab the entire ML Starter Kit bundle (10 products) for $149 — save 30%.