Many machine learning models never make it to production. The gap between a Jupyter notebook experiment and a reliable, monitored production service is wide, and closing it is what MLOps exists to do.
This guide walks through building a complete MLOps pipeline — from experiment tracking to model serving and monitoring — with practical code you can adapt to your own stack.
The MLOps Maturity Model
Before building, understand where you are:
| Level | Description | Characteristics |
|---|---|---|
| 0 | Manual | Notebooks, manual deployment, no versioning |
| 1 | ML Pipeline | Automated training, experiment tracking |
| 2 | CI/CD for ML | Automated testing, deployment pipelines |
| 3 | Full MLOps | Automated retraining, monitoring, feedback loops |
Most teams are at Level 0 or 1. This guide gets you to Level 2 and points toward Level 3.
Architecture Overview
┌───────────────────────────────────────────────────────────┐
│                      MLOps Pipeline                       │
│                                                           │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐   │
│  │ Feature  │  │ Training │  │  Model   │  │  Model   │   │
│  │  Store   │→ │ Pipeline │→ │ Registry │→ │ Serving  │   │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘   │
│       ↑             ↑             ↑             │         │
│       │             │             │             ↓         │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐   │
│  │   Data   │  │Experiment│  │Validation│  │Monitoring│   │
│  │  Source  │  │ Tracking │  │  Gates   │  │ & Alerts │   │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘   │
└───────────────────────────────────────────────────────────┘
Step 1: Experiment Tracking with MLflow
Experiment tracking is the foundation. If you can't reproduce an experiment, you can't debug or improve it.
import mlflow
import mlflow.sklearn
import pandas as pd
from mlflow.models import infer_signature
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
)
from sklearn.model_selection import train_test_split

# Configure MLflow
mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.set_experiment("customer-churn-prediction")


def train_model(
    data_path: str,
    hyperparams: dict,
    experiment_name: str = "customer-churn-prediction",
) -> str:
    """Train a model with full experiment tracking."""
    mlflow.set_experiment(experiment_name)
    with mlflow.start_run(run_name=f"gbm-{hyperparams.get('n_estimators', 100)}") as run:
        # Log parameters
        mlflow.log_params(hyperparams)
        mlflow.log_param("data_path", data_path)

        # Load and prepare data
        df = pd.read_parquet(data_path)
        X = df.drop(columns=["churn", "customer_id"])
        y = df["churn"]
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )

        # Log dataset info
        mlflow.log_param("train_size", len(X_train))
        mlflow.log_param("test_size", len(X_test))
        mlflow.log_param("feature_count", X_train.shape[1])
        mlflow.log_param("positive_rate", float(y.mean()))

        # Train
        model = GradientBoostingClassifier(**hyperparams)
        model.fit(X_train, y_train)

        # Evaluate on the held-out split
        y_pred = model.predict(X_test)
        y_proba = model.predict_proba(X_test)[:, 1]
        metrics = {
            "accuracy": accuracy_score(y_test, y_pred),
            "precision": precision_score(y_test, y_pred),
            "recall": recall_score(y_test, y_pred),
            "f1": f1_score(y_test, y_pred),
            "auc_roc": roc_auc_score(y_test, y_proba),
        }
        mlflow.log_metrics(metrics)

        # Log feature importance as a CSV artifact
        importance_df = pd.DataFrame({
            "feature": X_train.columns,
            "importance": model.feature_importances_,
        }).sort_values("importance", ascending=False)
        importance_df.to_csv("feature_importance.csv", index=False)
        mlflow.log_artifact("feature_importance.csv")

        # Log model with signature (input/output schema inferred from the test split)
        signature = infer_signature(X_test, y_pred)
        mlflow.sklearn.log_model(
            model,
            "model",
            signature=signature,
            registered_model_name="churn-classifier",
        )

        print(f"Run ID: {run.info.run_id}")
        print(f"Metrics: {metrics}")
        return run.info.run_id
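The run above logs `data_path`, but a path alone does not pin down the data: the file behind it can change between runs. One way to close that gap, assuming the dataset is a single local file, is to hash its bytes and log the digest with the run. The helper name `file_fingerprint` and the parameter name `data_sha256` below are hypothetical, not part of MLflow.

```python
import hashlib
from pathlib import Path


def file_fingerprint(path: str, chunk_size: int = 1 << 20) -> str:
    """Return the SHA-256 hex digest of a file, read in chunks."""
    digest = hashlib.sha256()
    with Path(path).open("rb") as handle:
        # Read fixed-size chunks so a large dataset is never fully in memory
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

Inside `train_model`, the digest could be logged next to the path with `mlflow.log_param("data_sha256", file_fingerprint(data_path))`, so two runs with the same path but different data are distinguishable.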
Step 2: Feature Store Pattern
A feature store ensures consistent features between training and serving. Here's a lightweight implementation:
import hashlib
from dataclasses import dataclass
from pathlib import Path

import pandas as pd


@dataclass
class FeatureDefinition:
    name: str
    description: str
    dtype: str
    transform_fn: str  # Reference to transformation function
    source_table: str
    version: str = "1.0"


class SimpleFeatureStore:
    """Lightweight feature store for consistent feature engineering."""

    def __init__(self, storage_path: str):
        self.storage_path = storage_path
        self.registry: dict[str, FeatureDefinition] = {}

    def register_feature_group(
        self, group_name: str, features: list[FeatureDefinition]
    ):
        """Register a group of related features."""
        for feature in features:
            key = f"{group_name}.{feature.name}"
            self.registry[key] = feature

    def compute_features(
        self,
        group_name: str,
        entity_df: pd.DataFrame,
        transforms: dict,
    ) -> pd.DataFrame:
        """Compute features for given entities."""
        result = entity_df.copy()
        for key, feature_def in self.registry.items():
            # Match on "group." so "customer" does not also pick up "customer_v2"
            if not key.startswith(f"{group_name}."):
                continue
            transform = transforms.get(feature_def.transform_fn)
            if transform:
                result[feature_def.name] = transform(result)

        # Cache computed features under a hash of the group and entity set
        cache_path = self._cache_path(group_name, entity_df)
        cache_path.parent.mkdir(parents=True, exist_ok=True)
        result.to_parquet(cache_path)
        return result

    def get_training_features(
        self, group_name: str, entity_df: pd.DataFrame
    ) -> pd.DataFrame:
        """Get features for training (point-in-time correct)."""
        # In production, this would do point-in-time joins
        # to prevent data leakage
        cache_path = self._cache_path(group_name, entity_df)
        try:
            return pd.read_parquet(cache_path)
        except FileNotFoundError:
            raise ValueError(
                f"Features not computed for {group_name}. "
                "Run compute_features first."
            ) from None

    def get_serving_features(
        self, group_name: str, entity_ids: list
    ) -> pd.DataFrame:
        """Get latest features for real-time serving."""
        # In production, this reads from a low-latency store (Redis).
        # Nothing in this class writes latest.parquet: it is assumed to be
        # produced by a separate job and to contain an "entity_id" column.
        latest_path = Path(self.storage_path) / group_name / "latest.parquet"
        df = pd.read_parquet(latest_path)
        return df[df["entity_id"].isin(entity_ids)]

    def _cache_path(self, group_name: str, entity_df: pd.DataFrame) -> Path:
        cache_key = self._compute_hash(group_name, entity_df)
        return Path(self.storage_path) / group_name / f"{cache_key}.parquet"

    def _compute_hash(
        self, group_name: str, entity_df: pd.DataFrame
    ) -> str:
        # Hash the rows as well as the schema, so two entity sets with the
        # same shape do not collide on one cache file
        row_hashes = pd.util.hash_pandas_object(entity_df, index=False).values
        content = f"{group_name}_{entity_df.columns.tolist()}".encode()
        return hashlib.md5(content + row_hashes.tobytes()).hexdigest()[:12]


# Usage
store = SimpleFeatureStore("/data/feature_store")

# Define features
customer_features = [
    FeatureDefinition(
        name="total_purchases_30d",
        description="Total purchase amount in last 30 days",
        dtype="float64",
        transform_fn="compute_purchases_30d",
        source_table="transactions",
    ),
    FeatureDefinition(
        name="login_frequency_7d",
        description="Number of logins in last 7 days",
        dtype="int64",
        transform_fn="compute_login_freq_7d",
        source_table="user_events",
    ),
    FeatureDefinition(
        name="support_tickets_90d",
        description="Support tickets opened in last 90 days",
        dtype="int64",
        transform_fn="compute_support_tickets",
        source_table="support_tickets",
    ),
]
store.register_feature_group("customer", customer_features)
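The store above mentions point-in-time joins without showing one. The idea is that a training label dated `t` may only see feature values observed at or before `t`; anything later leaks the future into training. The following is a minimal, dependency-free sketch of that lookup for a single entity. The function name and the sample data are hypothetical.

```python
from bisect import bisect_right
from datetime import datetime


def point_in_time_value(history, as_of):
    """Return the latest feature value observed at or before `as_of`.

    `history` is a list of (timestamp, value) pairs sorted by timestamp.
    Returns None when nothing had been observed yet.
    """
    timestamps = [ts for ts, _ in history]
    # bisect_right counts the observations with timestamp <= as_of
    idx = bisect_right(timestamps, as_of)
    return history[idx - 1][1] if idx else None


# Hypothetical weekly login counts for one customer
logins = [
    (datetime(2024, 1, 1), 3),
    (datetime(2024, 1, 8), 5),
    (datetime(2024, 1, 15), 1),
]

# A churn label dated Jan 10 may only see the Jan 8 snapshot, not Jan 15
print(point_in_time_value(logins, datetime(2024, 1, 10)))  # 5
```

For whole DataFrames, `pandas.merge_asof` performs the same backward-looking join and is usually the more practical choice.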
Step 3: Model Validation Gates
Never deploy a model without automated validation. Here's a validation framework:
from dataclasses import dataclass
from enum import Enum


class ValidationResult(Enum):
    PASS = "pass"
    WARN = "warn"
    FAIL = "fail"


@dataclass
class ValidationCheck:
    name: str
    result: ValidationResult
    actual_value: float
    threshold: float
    message: str


class ModelValidator:
    """Automated model validation before deployment."""

    def __init__(
        self,
        min_accuracy: float = 0.80,
        min_auc: float = 0.75,
        max_latency_ms: float = 100,
        min_coverage: float = 0.95,
    ):
        self.min_accuracy = min_accuracy
        self.min_auc = min_auc
        self.max_latency_ms = max_latency_ms
        self.min_coverage = min_coverage  # Stored, but no check below uses it yet
        self.checks: list[ValidationCheck] = []

    def validate_performance(self, metrics: dict) -> bool:
        """Check model performance against thresholds."""
        checks = [
            ("accuracy", metrics.get("accuracy", 0), self.min_accuracy),
            ("auc_roc", metrics.get("auc_roc", 0), self.min_auc),
        ]
        all_pass = True
        for name, actual, threshold in checks:
            if actual >= threshold:
                result = ValidationResult.PASS
            elif actual >= threshold * 0.95:  # Within 5%: warn, but do not block
                result = ValidationResult.WARN
            else:
                result = ValidationResult.FAIL
                all_pass = False
            self.checks.append(ValidationCheck(
                name=f"performance_{name}",
                result=result,
                actual_value=actual,
                threshold=threshold,
                message=f"{name}: {actual:.4f} (threshold: {threshold})"
            ))
        return all_pass

    def validate_regression(
        self, current_metrics: dict, baseline_metrics: dict,
        max_degradation: float = 0.02,
    ) -> bool:
        """Check that new model doesn't regress vs. baseline."""
        all_pass = True
        for metric_name in ["accuracy", "f1", "auc_roc"]:
            current = current_metrics.get(metric_name, 0)
            baseline = baseline_metrics.get(metric_name, 0)
            degradation = baseline - current
            if degradation <= 0:
                result = ValidationResult.PASS
            elif degradation <= max_degradation:
                result = ValidationResult.WARN
            else:
                result = ValidationResult.FAIL
                all_pass = False
            self.checks.append(ValidationCheck(
                name=f"regression_{metric_name}",
                result=result,
                actual_value=current,
                threshold=baseline - max_degradation,
                message=(
                    f"{metric_name}: current={current:.4f}, "
                    f"baseline={baseline:.4f}, "
                    f"degradation={degradation:.4f}"
                ),
            ))
        return all_pass

    def validate_latency(self, latency_p50: float, latency_p99: float) -> bool:
        """Check inference latency requirements (only p99 is gated)."""
        p99_pass = latency_p99 <= self.max_latency_ms
        self.checks.append(ValidationCheck(
            name="latency_p99",
            result=ValidationResult.PASS if p99_pass else ValidationResult.FAIL,
            actual_value=latency_p99,
            threshold=self.max_latency_ms,
            message=(
                f"p99 latency: {latency_p99:.1f}ms "
                f"(max: {self.max_latency_ms}ms, p50: {latency_p50:.1f}ms)"
            ),
        ))
        return p99_pass

    def validate_data_drift(
        self,
        training_stats: dict,
        serving_stats: dict,
        max_psi: float = 0.2,
    ) -> bool:
        """Check for feature drift between training and serving data.

        This uses the relative shift of each feature's mean as a cheap proxy.
        It is not a true Population Stability Index (PSI); `max_psi` is
        applied here as the relative-shift threshold.
        """
        all_pass = True
        for feature in training_stats:
            if feature in serving_stats:
                train_mean = training_stats[feature]["mean"]
                serve_mean = serving_stats[feature]["mean"]
                drift = abs(train_mean - serve_mean) / (abs(train_mean) + 1e-8)
                if drift > max_psi:
                    all_pass = False
                self.checks.append(ValidationCheck(
                    name=f"drift_{feature}",
                    result=(
                        ValidationResult.PASS if drift <= max_psi * 0.5
                        else ValidationResult.WARN if drift <= max_psi
                        else ValidationResult.FAIL
                    ),
                    actual_value=drift,
                    threshold=max_psi,
                    message=f"{feature} drift: {drift:.4f} (max: {max_psi})",
                ))
        return all_pass

    def generate_report(self) -> str:
        """Generate validation report."""
        lines = ["=" * 60, "MODEL VALIDATION REPORT", "=" * 60]
        for check in self.checks:
            icon = {
                ValidationResult.PASS: "PASS",
                ValidationResult.WARN: "WARN",
                ValidationResult.FAIL: "FAIL",
            }[check.result]
            lines.append(f"  [{icon}] {check.message}")
        passed = sum(1 for c in self.checks if c.result == ValidationResult.PASS)
        warned = sum(1 for c in self.checks if c.result == ValidationResult.WARN)
        failed = sum(1 for c in self.checks if c.result == ValidationResult.FAIL)
        lines.append("=" * 60)
        lines.append(
            f"Results: {passed} passed, {warned} warnings, {failed} failed"
        )
        lines.append(
            f"Overall: {'APPROVED' if failed == 0 else 'REJECTED'}"
        )
        lines.append("=" * 60)
        return "\n".join(lines)
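The drift check in `validate_data_drift` compares feature means, which is cheaper than a Population Stability Index but can miss changes in the shape of a distribution. For reference, here is a minimal sketch of an actual PSI computation over raw samples. It assumes both samples are non-empty and that `bin_edges` is sorted. The function name, the bin edges, and the `eps` floor for empty bins are illustrative choices.

```python
import math


def population_stability_index(expected, actual, bin_edges, eps=1e-6):
    """PSI between two samples, using shared bin edges.

    PSI = sum((a_i - e_i) * ln(a_i / e_i)) over bins, where e_i and a_i are
    the fractions of the expected and actual samples that fall in bin i.
    """
    def bin_fractions(values):
        counts = [0] * (len(bin_edges) + 1)
        for v in values:
            # Bin index = number of edges at or below v
            idx = sum(1 for edge in bin_edges if v >= edge)
            counts[idx] += 1
        total = len(values)
        # eps keeps empty bins from producing log(0) or division by zero
        return [max(c / total, eps) for c in counts]

    e = bin_fractions(expected)
    a = bin_fractions(actual)
    return sum((ai - ei) * math.log(ai / ei) for ei, ai in zip(e, a))


# Hypothetical samples of one feature at training and serving time
train_sample = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]
serve_sample = [0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.1]
edges = [0.25, 0.5, 0.75]

psi = population_stability_index(train_sample, serve_sample, edges)
```

Identical samples give a PSI of exactly 0, and every term in the sum is non-negative, so larger values always mean more drift. A threshold of 0.2, like the default `max_psi` above, is a commonly cited rule of thumb for a significant shift, though the right cutoff depends on the feature and the binning.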
Step 4: Model Serving with FastAPI
import time
from contextlib import asynccontextmanager

import mlflow
import mlflow.sklearn
import pandas as pd
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel


class PredictionRequest(BaseModel):
    customer_id: str
    features: dict[str, float]


class PredictionResponse(BaseModel):
    customer_id: str
    prediction: int
    probability: float
    model_version: str
    latency_ms: float


# Global model storage
model_store = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Load model at startup. Stage-based URIs such as ".../Production" are
    # deprecated in recent MLflow releases in favor of model aliases.
    model_uri = "models:/churn-classifier/Production"
    model_store["model"] = mlflow.sklearn.load_model(model_uri)
    model_store["version"] = "production"
    yield
    model_store.clear()


app = FastAPI(title="Churn Prediction API", lifespan=lifespan)


@app.post("/predict", response_model=PredictionResponse)
def predict(request: PredictionRequest):
    # Plain `def`: FastAPI runs it in a threadpool, so the blocking
    # scikit-learn call does not stall the event loop
    start_time = time.perf_counter()
    model = model_store.get("model")
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")

    # Create feature DataFrame with columns in the order used at training time
    expected = list(getattr(model, "feature_names_in_", request.features))
    missing = [name for name in expected if name not in request.features]
    if missing:
        raise HTTPException(status_code=422, detail=f"Missing features: {missing}")
    features_df = pd.DataFrame([request.features])[expected]

    # Predict
    prediction = int(model.predict(features_df)[0])
    probability = float(model.predict_proba(features_df)[0][1])
    latency_ms = (time.perf_counter() - start_time) * 1000
    return PredictionResponse(
        customer_id=request.customer_id,
        prediction=prediction,
        probability=probability,
        model_version=model_store["version"],
        latency_ms=round(latency_ms, 2),
    )


@app.get("/health")
async def health():
    return {
        "status": "healthy",
        "model_loaded": "model" in model_store,
        "model_version": model_store.get("version", "none"),
    }
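The latency gate in Step 3 takes p50 and p99 values but the guide does not show where they come from. One simple option, sketched here under the assumption that in-process timing is good enough, is to time a callable repeatedly and take nearest-rank percentiles. The function name and the defaults are hypothetical, and this measures model time only, not network or serialization overhead.

```python
import math
import time


def measure_latency_ms(fn, n_calls=200, warmup=10):
    """Call `fn` repeatedly and return (p50, p99) latency in milliseconds."""
    for _ in range(warmup):
        fn()  # Warm caches so first-call overhead is not measured
    samples = []
    for _ in range(n_calls):
        start = time.perf_counter()
        fn()
        samples.append((time.perf_counter() - start) * 1000)
    samples.sort()

    def percentile(p):
        # Nearest-rank percentile on the sorted samples
        rank = max(1, math.ceil(p / 100 * len(samples)))
        return samples[rank - 1]

    return percentile(50), percentile(99)
```

With a loaded model and a one-row DataFrame `sample_df`, the result could feed the gate directly: `p50, p99 = measure_latency_ms(lambda: model.predict(sample_df))`, followed by `validator.validate_latency(p50, p99)`.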
Step 5: Model Monitoring
The model is deployed. Now you need to know when it degrades.
from datetime import datetime, timedelta, timezone

import numpy as np


class ModelMonitor:
    """Monitor model performance and data drift in production."""

    def __init__(self, model_name: str, alert_callback=None):
        self.model_name = model_name
        # Grows without bound; fine for a demo, but a long-running service
        # should evict old records or write them to external storage
        self.predictions: list[dict] = []
        self.alert_callback = alert_callback

    def log_prediction(
        self,
        features: dict,
        prediction: int,
        probability: float,
        actual: int | None = None,
        latency_ms: float = 0,
    ):
        """Log a prediction for monitoring."""
        self.predictions.append({
            # Timezone-aware; datetime.utcnow() is deprecated as of Python 3.12
            "timestamp": datetime.now(timezone.utc),
            "features": features,
            "prediction": prediction,
            "probability": probability,
            "actual": actual,
            "latency_ms": latency_ms,
        })

    def compute_metrics(
        self, window_hours: int = 24
    ) -> dict:
        """Compute monitoring metrics over a time window."""
        cutoff = datetime.now(timezone.utc) - timedelta(hours=window_hours)
        recent = [
            p for p in self.predictions
            if p["timestamp"] > cutoff
        ]
        if not recent:
            return {"error": "No predictions in window"}

        # Prediction distribution
        predictions = [p["prediction"] for p in recent]
        probabilities = [p["probability"] for p in recent]
        latencies = [p["latency_ms"] for p in recent]
        # Cast NumPy scalars to float so the dict is JSON-serializable
        metrics = {
            "prediction_count": len(recent),
            "positive_rate": float(np.mean(predictions)),
            "avg_probability": float(np.mean(probabilities)),
            "probability_std": float(np.std(probabilities)),
            "avg_latency_ms": float(np.mean(latencies)),
            "p99_latency_ms": float(np.percentile(latencies, 99)),
        }

        # If we have ground truth, compute accuracy
        labeled = [p for p in recent if p["actual"] is not None]
        if labeled:
            correct = sum(
                1 for p in labeled if p["prediction"] == p["actual"]
            )
            metrics["accuracy"] = correct / len(labeled)
            metrics["labeled_count"] = len(labeled)
        return metrics

    def check_alerts(self, metrics: dict):
        """Check if any metrics breach alert thresholds."""
        alerts = []

        # Prediction rate anomaly
        if metrics.get("positive_rate", 0) > 0.5:
            alerts.append(
                f"High positive prediction rate: "
                f"{metrics['positive_rate']:.2%}"
            )

        # Latency spike
        if metrics.get("p99_latency_ms", 0) > 200:
            alerts.append(
                f"High p99 latency: "
                f"{metrics['p99_latency_ms']:.0f}ms"
            )

        # Accuracy drop (if labeled data available)
        if metrics.get("accuracy", 1.0) < 0.75:
            alerts.append(
                f"Low accuracy: {metrics['accuracy']:.2%}"
            )

        if alerts and self.alert_callback:
            for alert in alerts:
                self.alert_callback(self.model_name, alert)
        return alerts
CI/CD Pipeline for ML
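# .github/workflows/ml-pipeline.yml
name: ML Pipeline

on:
  push:
    paths:
      - 'ml/**'
      - 'features/**'
  schedule:
    - cron: '0 6 * * 1'  # Weekly retraining (Mondays, 06:00 UTC)

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - run: pip install -r requirements.txt
      - run: pytest ml/tests/ -v

  train:
    needs: test
    runs-on: ubuntu-latest
    # Expose the run ID so downstream jobs can read needs.train.outputs.run_id
    outputs:
      run_id: ${{ steps.train.outputs.run_id }}
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - run: pip install -r requirements.txt
      - name: Train Model
        id: train
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
        # Assumes ml/train.py appends "run_id=<id>" to $GITHUB_OUTPUT
        run: python ml/train.py --config ml/configs/production.yaml

  validate:
    needs: train
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - run: pip install -r requirements.txt
      - name: Run Validation
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
        run: python ml/validate.py --run-id ${{ needs.train.outputs.run_id }}
      - name: Check Results
        run: |
          if [ "$(cat validation_result.txt)" != "APPROVED" ]; then
            echo "Model validation failed"
            exit 1
          fi

  deploy:
    # List train explicitly: the needs context only exposes direct dependencies
    needs: [train, validate]
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    environment: production
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - run: pip install -r requirements.txt
      - name: Promote Model
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
        run: |
          python ml/promote.py \
            --run-id ${{ needs.train.outputs.run_id }} \
            --stage Production
      # Assumes the image for this commit is already built and pushed, and
      # that the runner has cluster credentials configured
      - name: Deploy to Kubernetes
        run: |
          kubectl set image deployment/churn-model \
            model=ghcr.io/myorg/churn-model:${{ github.sha }}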
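The workflow relies on two hand-offs that the scripts must produce: the `validation_result.txt` file read by the "Check Results" step, and the `run_id` output read by later jobs. The guide does not show `ml/train.py` or `ml/validate.py`, so the following is only a sketch of helpers they might use; both function names are hypothetical. `GITHUB_OUTPUT` is the environment variable GitHub Actions sets to the file where `name=value` lines become step outputs.

```python
import os
from pathlib import Path


def write_validation_result(failed_checks: int, path: str = "validation_result.txt") -> str:
    """Write APPROVED or REJECTED, the token the 'Check Results' step compares."""
    verdict = "APPROVED" if failed_checks == 0 else "REJECTED"
    Path(path).write_text(verdict + "\n")
    return verdict


def set_step_output(name: str, value: str) -> None:
    """Append name=value to the file GitHub Actions reads step outputs from."""
    output_file = os.environ.get("GITHUB_OUTPUT")
    if output_file is None:
        # Not running in GitHub Actions (e.g. a local run): just print
        print(f"{name}={value}")
        return
    with open(output_file, "a", encoding="utf-8") as handle:
        handle.write(f"{name}={value}\n")
```

A training script would end with something like `set_step_output("run_id", run_id)`, and a validation script with `write_validation_result(failed_checks=failed)`, where `failed` is the number of `FAIL` checks from the validator.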
Summary
A production MLOps pipeline has these components:
| Component | Purpose | Tools |
|---|---|---|
| Experiment Tracking | Reproducibility | MLflow, W&B |
| Feature Store | Consistent features | Feast, custom |
| Training Pipeline | Automated training | Airflow, GitHub Actions |
| Model Registry | Version management | MLflow, SageMaker |
| Validation Gates | Quality assurance | Custom checks |
| Model Serving | Inference API | FastAPI, Triton |
| Monitoring | Drift detection | Custom, Evidently |
Start with experiment tracking and model serving. Add the rest iteratively as your models mature.