Training Pipeline Tutorial: Building Your ML Model Factory
What You'll Learn
In this tutorial, you'll discover how to:
- Build an automated ML training pipeline (your model factory)
- Process raw data into ML-ready features (ingredient preparation)
- Experiment with different algorithms (recipe testing)
- Evaluate and select the best models (quality control)
- Deploy models to production (shipping finished products)
Why Do We Need a Training Pipeline?
Imagine you're running a bakery that needs to:
- Convert raw ingredients (flour, eggs) into bread (trained models)
- Test different recipes (algorithms) to find the best one
- Ensure consistent quality (reproducible results)
- Scale production when demand increases
- Keep track of what works and what doesn't
A training pipeline is your automated bakery that turns raw data into high-quality ML models!
Architecture Overview
┌─────────────────────────────────────────────────────────────────┐
│                  Training Service Architecture                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────┐    ┌─────────────────┐    ┌──────────────┐ │
│  │ Data            │    │ Feature         │    │ Model        │ │
│  │ Processing      │───▶│ Engineering     │───▶│ Training     │ │
│  │                 │    │                 │    │              │ │
│  │ • Validation    │    │ • Encoding      │    │ • Algorithms │ │
│  │ • Cleaning      │    │ • Scaling       │    │ • Tuning     │ │
│  │ • Splitting     │    │ • Selection     │    │ • Validation │ │
│  └─────────────────┘    └─────────────────┘    └──────────────┘ │
│           │                      │                    │         │
│           ▼                      ▼                    ▼         │
│  ┌─────────────────┐    ┌─────────────────┐    ┌──────────────┐ │
│  │ Logging         │    │ Evaluation      │    │ Model        │ │
│  │ & Tracking      │◀───│ & Metrics       │◀───│ Registry     │ │
│  │                 │    │                 │    │              │ │
│  │ • MLflow        │    │ • Performance   │    │ • Staging    │ │
│  │ • Experiments   │    │ • Validation    │    │ • Production │ │
│  │ • Artifacts     │    │ • Comparison    │    │ • Versioning │ │
│  └─────────────────┘    └─────────────────┘    └──────────────┘ │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
Service Structure
services/training/
├── churn_mlops_pipeline.py        # Main training orchestrator
├── Dockerfile                     # Container configuration
├── requirements.txt               # Python dependencies
├── data_processing/
│   ├── __init__.py
│   ├── data_loader.py             # Data ingestion utilities
│   ├── data_validator.py          # Data quality checks
│   └── preprocessor.py            # Data cleaning & transformation
├── feature_engineering/
│   ├── __init__.py
│   ├── feature_builder.py         # Feature creation logic
│   ├── encoders.py                # Categorical encoding
│   └── scalers.py                 # Numerical scaling
├── model_training/
│   ├── __init__.py
│   ├── trainers.py                # Model training logic
│   ├── hyperparameter_tuning.py   # HPO implementations
│   └── model_registry.py          # MLflow integration
├── evaluation/
│   ├── __init__.py
│   ├── metrics.py                 # Performance metrics
│   ├── validators.py              # Model validation
│   └── reports.py                 # Evaluation reports
├── config/
│   ├── training_config.py         # Training parameters
│   ├── model_config.py            # Model configurations
│   └── pipeline_config.py         # Pipeline settings
└── utils/
    ├── __init__.py
    ├── logging_utils.py           # Logging configuration
    ├── storage_utils.py           # Data storage utilities
    └── monitoring_utils.py        # Training monitoring
Training Pipeline Implementation
Main Pipeline Orchestrator
# churn_mlops_pipeline.py
import mlflow
import mlflow.sklearn
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, roc_auc_score, confusion_matrix
import xgboost as xgb
import lightgbm as lgb
import optuna
from datetime import datetime
import logging
import joblib
import os
from typing import Dict, Tuple, Any, List
# Configure logging to both a log file and the console.
# logging.FileHandler opens its target file as soon as it is constructed and
# does not create missing parent directories, so the directory must exist
# before basicConfig() runs.
os.makedirs('logs', exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        # Log messages in this module contain emoji; pin the file encoding so
        # writing them does not depend on the platform's default locale.
        logging.FileHandler('logs/training.log', encoding='utf-8'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
class ChurnTrainingPipeline:
"""
π€ Complete MLOps training pipeline for customer churn prediction
This class orchestrates the entire machine learning workflow from
data ingestion to model registration, ensuring reproducible and
scalable model development.
"""
def __init__(self, config: Dict[str, Any]):
    """
    Store the configuration, point MLflow at the tracking server and
    reset all pipeline state.

    Args:
        config: Pipeline settings. The keys read here are
            'mlflow_tracking_uri', 'experiment_name' and 'model_name';
            each falls back to a built-in default when it is absent.
    """
    self.config = config

    # Resolve settings, using the defaults when a key is missing.
    lookup = config.get
    self.mlflow_uri = lookup('mlflow_tracking_uri', 'http://localhost:5000')
    self.experiment_name = lookup('experiment_name', 'churn-prediction-experiment')
    self.model_name = lookup('model_name', 'churn-prediction')

    # All subsequent MLflow runs are recorded under this server/experiment.
    mlflow.set_tracking_uri(self.mlflow_uri)
    mlflow.set_experiment(self.experiment_name)

    # Data containers start empty and are filled by the pipeline phases.
    for attribute in ('raw_data', 'processed_data',
                      'X_train', 'X_test', 'y_train', 'y_test'):
        setattr(self, attribute, None)

    # Registry of trained candidates plus the current winner.
    self.models = {}
    self.best_model = None
    self.best_score = 0.0

    logger.info("π Training pipeline initialized")
def run_complete_pipeline(self) -> str:
    """
    Run every pipeline phase in order: data processing, model training,
    evaluation and model registration.

    Returns:
        str: The run ID returned by ``evaluate_and_select_best_model``.

    Raises:
        Exception: Any error raised by a phase is logged (with traceback)
            and re-raised unchanged.
    """
    logger.info("π― Starting complete training pipeline")
    try:
        # Phase 1: data processing
        logger.info("π Phase 1: Data Processing")
        self.load_and_validate_data()
        self.preprocess_data()
        self.engineer_features()
        self.split_data()

        # Phase 2: model training
        logger.info("π€ Phase 2: Model Training")
        self.train_baseline_models()
        self.hyperparameter_optimization()

        # Phase 3: evaluation
        logger.info("π Phase 3: Model Evaluation")
        best_run_id = self.evaluate_and_select_best_model()

        # Phase 4: registration
        logger.info("π Phase 4: Model Registration")
        self.register_best_model(best_run_id)

        logger.info("✅ Training pipeline completed successfully")
        return best_run_id
    except Exception as e:
        # logger.exception records the traceback as well as the message,
        # which a plain logger.error would drop.
        logger.exception("❌ Pipeline failed: %s", e)
        raise
def load_and_validate_data(self):
    """
    Read the training CSV into ``self.raw_data``, validate it and record
    basic dataset statistics in an MLflow run named "data_validation".

    The file location comes from ``config['data_path']`` (with a default
    path when the key is absent).

    Raises:
        FileNotFoundError: If the configured data file does not exist.
    """
    logger.info("π Loading training data...")

    data_path = self.config.get('data_path', 'data/raw/WA_Fn-UseC_-Telco-Customer-Churn.csv')
    if not os.path.exists(data_path):
        raise FileNotFoundError(f"Training data not found at: {data_path}")

    frame = pd.read_csv(data_path)
    self.raw_data = frame
    logger.info(f"π Loaded {len(self.raw_data)} records")

    # Schema and quality checks run before anything is logged to MLflow.
    self._validate_data_quality()

    with mlflow.start_run(run_name="data_validation") as run:
        dataset_params = (
            ("dataset_size", len(self.raw_data)),
            ("feature_count", len(self.raw_data.columns)),
            ("data_path", data_path),
        )
        for key, value in dataset_params:
            mlflow.log_param(key, value)

        # Total number of null cells, absolute and as a share of all cells.
        missing_values = self.raw_data.isnull().sum().sum()
        missing_share = missing_values / self.raw_data.size * 100
        mlflow.log_metric("missing_values_count", missing_values)
        mlflow.log_metric("missing_values_percentage", missing_share)
def _validate_data_quality(self):
    """
    Check that ``self.raw_data`` has the expected schema and log basic
    quality information.

    Logs the normalized distribution of the 'Churn' column and warns when
    'TotalCharges' contains values that cannot be parsed as numbers.

    Raises:
        ValueError: If any required column is missing from the data.
    """
    logger.info("π Validating data quality...")

    required_columns = [
        'customerID', 'gender', 'SeniorCitizen', 'Partner', 'Dependents',
        'tenure', 'PhoneService', 'MultipleLines', 'InternetService',
        'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport',
        'StreamingTV', 'StreamingMovies', 'Contract', 'PaperlessBilling',
        'PaymentMethod', 'MonthlyCharges', 'TotalCharges', 'Churn'
    ]
    missing_columns = set(required_columns) - set(self.raw_data.columns)
    if missing_columns:
        raise ValueError(f"Missing required columns: {missing_columns}")

    # Share of each target class, to make class imbalance visible in the log.
    churn_distribution = self.raw_data['Churn'].value_counts(normalize=True)
    logger.info(f"π Churn distribution: {churn_distribution.to_dict()}")

    # Values that cannot be coerced to a number become NaN and are counted.
    total_charges_invalid = pd.to_numeric(
        self.raw_data['TotalCharges'], errors='coerce'
    ).isnull().sum()
    if total_charges_invalid > 0:
        logger.warning(f"⚠️ Found {total_charges_invalid} invalid TotalCharges values")

    logger.info("✅ Data quality validation completed")
def preprocess_data(self):
    """
    Build ``self.processed_data`` as a cleaned copy of ``self.raw_data``.

    Steps performed:
      * 'TotalCharges' is converted to numeric; unparseable values become 0.
      * 'Churn' is mapped to 1 for 'Yes' and 0 for anything else.
      * 'No internet service' / 'No phone service' are collapsed to 'No'
        in the dependent service columns.

    ``self.raw_data`` itself is left unmodified.
    """
    logger.info("π§ Preprocessing data...")

    data = self.raw_data.copy()

    # Assign the filled column back instead of calling
    # ``data['TotalCharges'].fillna(0, inplace=True)``: an in-place call on a
    # column selection is chained assignment and does not update the frame
    # when pandas Copy-on-Write is active.
    data['TotalCharges'] = pd.to_numeric(
        data['TotalCharges'], errors='coerce'
    ).fillna(0)

    # Binary target: 1 when the customer churned.
    data['Churn'] = (data['Churn'] == 'Yes').astype(int)

    # Add-on services that report 'No internet service' are simply not used.
    internet_cols = ['OnlineSecurity', 'OnlineBackup', 'DeviceProtection',
                     'TechSupport', 'StreamingTV', 'StreamingMovies']
    data[internet_cols] = data[internet_cols].replace('No internet service', 'No')
    data['MultipleLines'] = data['MultipleLines'].replace('No phone service', 'No')

    self.processed_data = data
    logger.info("✅ Data preprocessing completed")
def engineer_features(self):
    """
    Add derived columns to ``self.processed_data`` in place.

    New columns:
      * AvgMonthlyCharges  - TotalCharges / (tenure + 1)
      * ChargesRatio       - MonthlyCharges / (TotalCharges + 1)
      * TotalServices      - number of services the customer uses
      * IsNewCustomer      - 1 when tenure <= 6
      * IsLongTermCustomer - 1 when tenure >= 60
      * HasAutoPay         - 1 for the two "(automatic)" payment methods
    """
    logger.info("βοΈ Engineering features...")

    data = self.processed_data

    # Financial features. The "+ 1" keeps the denominators non-zero for
    # customers with zero tenure or zero total charges.
    data['AvgMonthlyCharges'] = data['TotalCharges'] / (data['tenure'] + 1)
    data['ChargesRatio'] = data['MonthlyCharges'] / (data['TotalCharges'] + 1)

    # Service usage. These columns are Yes/No flags and are counted directly.
    yes_no_services = ['PhoneService', 'MultipleLines', 'OnlineSecurity',
                       'OnlineBackup', 'DeviceProtection', 'TechSupport',
                       'StreamingTV', 'StreamingMovies']
    # NOTE(review): in the Telco churn CSV this pipeline defaults to,
    # InternetService appears to hold the connection type (e.g. 'DSL',
    # 'Fiber optic', 'No') rather than 'Yes', so comparing it to 'Yes' never
    # counted it. Any non-null value other than 'No' is treated as having
    # internet service - confirm against the actual data.
    internet = data['InternetService']
    has_internet = internet.notna() & internet.ne('No')
    data['TotalServices'] = (
        data[yes_no_services].eq('Yes').sum(axis=1) + has_internet.astype(int)
    )

    # Customer profile features based on tenure thresholds.
    data['IsNewCustomer'] = (data['tenure'] <= 6).astype(int)
    data['IsLongTermCustomer'] = (data['tenure'] >= 60).astype(int)

    # Payment behaviour: automatic bank transfer or credit card.
    data['HasAutoPay'] = data['PaymentMethod'].isin([
        'Bank transfer (automatic)', 'Credit card (automatic)'
    ]).astype(int)

    logger.info(f"✅ Feature engineering completed. Total features: {len(self.processed_data.columns)}")
Data Processing & Feature Engineering
def split_data(self, test_size: float = 0.2, random_state: int = 42):
    """
    Create stratified train/test partitions from ``self.processed_data``
    and then encode and scale them.

    Args:
        test_size: Fraction of rows held out for the test set.
        random_state: Seed passed to ``train_test_split``.

    The results are stored in ``self.X_train``, ``self.X_test``,
    ``self.y_train`` and ``self.y_test``.
    """
    logger.info("π Splitting data into train/test sets...")

    # Everything except the identifier and the target is a feature.
    excluded = ['customerID', 'Churn']
    feature_columns = [
        name for name in self.processed_data.columns if name not in excluded
    ]
    features = self.processed_data[feature_columns].copy()
    target = self.processed_data['Churn'].copy()

    # Stratify on the target so both partitions keep its class balance.
    partitions = train_test_split(
        features, target,
        test_size=test_size, random_state=random_state, stratify=target,
    )
    self.X_train, self.X_test, self.y_train, self.y_test = partitions

    # Encoders and scalers are fitted after the split, on the training part.
    self._encode_categorical_features()
    self._scale_numerical_features()

    summary = (
        f"π Data split completed:",
        f" Training set: {len(self.X_train)} samples",
        f" Test set: {len(self.X_test)} samples",
        f" Features: {self.X_train.shape[1]}",
    )
    for line in summary:
        logger.info(line)
def _encode_categorical_features(self):
    """
    Label-encode every object-dtype column of ``self.X_train`` and apply
    the same mapping to ``self.X_test``.

    Each column gets its own ``LabelEncoder`` fitted on the training data
    only. Test values that did not occur in training are replaced by the
    most frequent training value before being transformed.

    The fitted encoders are saved to 'artifacts/encoder.pkl' as a dict
    mapping column name -> LabelEncoder. (A single shared encoder, refitted
    per column, would only retain the mapping of the last column and could
    not be used to encode new data.)
    """
    from sklearn.preprocessing import LabelEncoder

    categorical_columns = self.X_train.select_dtypes(include=['object']).columns.tolist()
    if not categorical_columns:
        logger.info("π·οΈ No categorical features to encode")
        return

    logger.info(f"π·οΈ Encoding categorical features: {categorical_columns}")

    encoders = {}
    for col in categorical_columns:
        encoder = LabelEncoder()

        # Fit on the training partition only.
        train_values = self.X_train[col].astype(str)
        self.X_train[col] = encoder.fit_transform(train_values)

        # Map categories the encoder has never seen to the most frequent
        # training category so that transform() does not raise.
        test_values = self.X_test[col].astype(str)
        unseen_mask = ~test_values.isin(encoder.classes_)
        if unseen_mask.any():
            most_frequent = train_values.mode().iloc[0]
            test_values = test_values.mask(unseen_mask, most_frequent)
        self.X_test[col] = encoder.transform(test_values)

        encoders[col] = encoder

    # Persist all per-column encoders for later use.
    os.makedirs('artifacts', exist_ok=True)
    joblib.dump(encoders, 'artifacts/encoder.pkl')

    logger.info("✅ Categorical encoding completed")
def _scale_numerical_features(self):
    """
    Standardize every numeric column of ``self.X_train`` and apply the
    same transformation to ``self.X_test``.

    The ``StandardScaler`` is fitted on the training data only and saved to
    'artifacts/scaler.pkl'. Because this selects all numeric dtypes, any
    column that is already numeric at this point is scaled.
    """
    from sklearn.preprocessing import StandardScaler

    numerical_columns = self.X_train.select_dtypes(include=[np.number]).columns.tolist()
    if not numerical_columns:
        logger.info("π No numerical features to scale")
        return

    logger.info(f"π Scaling numerical features: {numerical_columns}")

    # Fit on train, then reuse the fitted statistics for the test set.
    scaler = StandardScaler()
    self.X_train[numerical_columns] = scaler.fit_transform(self.X_train[numerical_columns])
    self.X_test[numerical_columns] = scaler.transform(self.X_test[numerical_columns])

    # Persist the fitted scaler for later use.
    os.makedirs('artifacts', exist_ok=True)
    joblib.dump(scaler, 'artifacts/scaler.pkl')

    logger.info("✅ Feature scaling completed")
Model Training & Hyperparameter Optimization
def train_baseline_models(self):
    """
    Fit five default-configured classifiers and record each one in MLflow.

    For every model this logs the test-set ROC-AUC, the mean and standard
    deviation of a 5-fold cross-validated ROC-AUC on the training set, and
    the fitted model itself. Results are stored in ``self.models[name]``
    with the keys 'model', 'roc_auc', 'cv_score' and 'run_id'.
    """
    logger.info("π€ Training baseline models...")

    baseline_models = {
        'logistic_regression': LogisticRegression(random_state=42, max_iter=1000),
        'random_forest': RandomForestClassifier(random_state=42, n_estimators=100),
        'gradient_boosting': GradientBoostingClassifier(random_state=42),
        'xgboost': xgb.XGBClassifier(random_state=42, eval_metric='logloss'),
        'lightgbm': lgb.LGBMClassifier(random_state=42, verbose=-1),
    }

    for name, model in baseline_models.items():
        logger.info(f"π Training {name}...")

        with mlflow.start_run(run_name=f"baseline_{name}") as run:
            model.fit(self.X_train, self.y_train)

            # ROC-AUC needs scores, so use the positive-class probability.
            y_pred_proba = model.predict_proba(self.X_test)[:, 1]
            roc_auc = roc_auc_score(self.y_test, y_pred_proba)

            # 5-fold cross-validated ROC-AUC on the training partition.
            cv_scores = cross_val_score(model, self.X_train, self.y_train,
                                        cv=5, scoring='roc_auc')
            cv_mean = cv_scores.mean()
            cv_std = cv_scores.std()

            mlflow.log_param("model_type", name)
            mlflow.log_param("model_class", model.__class__.__name__)
            mlflow.log_metric("roc_auc", roc_auc)
            mlflow.log_metric("cv_roc_auc_mean", cv_mean)
            mlflow.log_metric("cv_roc_auc_std", cv_std)
            mlflow.sklearn.log_model(model, f"model_{name}")

            # Keep the fitted model and its scores for later comparison.
            self.models[name] = {
                'model': model,
                'roc_auc': roc_auc,
                'cv_score': cv_mean,
                'run_id': run.info.run_id,
            }

            logger.info(f"✅ {name}: ROC-AUC = {roc_auc:.4f}, CV = {cv_mean:.4f} ± {cv_std:.4f}")
def hyperparameter_optimization(self, n_trials: int = 50, timeout: int = 300,
                                top_k: int = 2):
    """
    Tune the best baseline models with Optuna and log the tuned versions.

    The ``top_k`` entries of ``self.models`` with the highest test ROC-AUC
    are optimized. Each trial is scored by the mean 5-fold cross-validated
    ROC-AUC on the training set. The best configuration is refitted on the
    full training set, evaluated on the test set, logged to MLflow and
    stored in ``self.models`` under ``"<name>_optimized"``.

    Args:
        n_trials: Maximum number of Optuna trials per model.
        timeout: Maximum optimization time per model, in seconds.
        top_k: How many of the best baseline models to optimize.
    """
    logger.info("π Starting hyperparameter optimization...")

    ranked = sorted(self.models.items(),
                    key=lambda item: item[1]['roc_auc'], reverse=True)
    top_models = ranked[:top_k]
    logger.info(f"π― Optimizing top models: {[name for name, _ in top_models]}")

    for model_name, model_info in top_models:
        logger.info(f"π§ Optimizing {model_name}...")

        # model_name is bound as a default argument so the closure does not
        # depend on the loop variable's later values.
        def objective(trial, model_name=model_name):
            params = self._get_optimization_params(model_name, trial)
            # Build the candidate through the same factory used for the
            # final model, so both are always constructed identically.
            model = self._create_model_with_params(model_name, params)
            cv_scores = cross_val_score(model, self.X_train, self.y_train,
                                        cv=5, scoring='roc_auc')
            return cv_scores.mean()

        # A seeded sampler makes the sequence of suggested parameters
        # repeatable from run to run.
        study = optuna.create_study(
            direction='maximize',
            sampler=optuna.samplers.TPESampler(seed=42),
        )
        study.optimize(objective, n_trials=n_trials, timeout=timeout)

        # Refit the winning configuration on the whole training set.
        best_params = study.best_params
        best_model = self._create_model_with_params(model_name, best_params)
        best_model.fit(self.X_train, self.y_train)

        y_pred_proba = best_model.predict_proba(self.X_test)[:, 1]
        roc_auc = roc_auc_score(self.y_test, y_pred_proba)

        with mlflow.start_run(run_name=f"optimized_{model_name}") as run:
            mlflow.log_param("model_type", f"{model_name}_optimized")
            mlflow.log_param("optimization_trials", len(study.trials))
            mlflow.log_param("best_trial", study.best_trial.number)
            for param, value in best_params.items():
                mlflow.log_param(f"best_{param}", value)

            mlflow.log_metric("roc_auc", roc_auc)
            mlflow.log_metric("improvement_over_baseline",
                              roc_auc - model_info['roc_auc'])
            mlflow.sklearn.log_model(best_model, f"model_optimized_{model_name}")

            self.models[f"{model_name}_optimized"] = {
                'model': best_model,
                'roc_auc': roc_auc,
                'cv_score': study.best_value,
                'run_id': run.info.run_id,
                'params': best_params,
            }

        logger.info(f"✅ {model_name} optimization completed: ROC-AUC = {roc_auc:.4f}")
def _get_optimization_params(self, model_name: str, trial):
    """
    Sample one hyperparameter configuration for ``model_name``.

    Args:
        model_name: One of 'random_forest', 'xgboost', 'lightgbm' or
            'gradient_boosting'; any other name is treated as logistic
            regression.
        trial: Object providing ``suggest_int``, ``suggest_float`` and
            ``suggest_categorical`` (an Optuna trial).

    Returns:
        dict: Keyword arguments for the corresponding estimator. Every key
        in the dict is a sampled parameter, so the trial's recorded
        parameters can be passed to the estimator unchanged.
    """
    if model_name == 'random_forest':
        return {
            'n_estimators': trial.suggest_int('n_estimators', 50, 300),
            'max_depth': trial.suggest_int('max_depth', 3, 20),
            'min_samples_split': trial.suggest_int('min_samples_split', 2, 10),
            'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 5),
            'max_features': trial.suggest_categorical('max_features', ['sqrt', 'log2', None]),
        }
    elif model_name == 'xgboost':
        return {
            'n_estimators': trial.suggest_int('n_estimators', 50, 300),
            'max_depth': trial.suggest_int('max_depth', 3, 10),
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3),
            'subsample': trial.suggest_float('subsample', 0.6, 1.0),
            'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
            'reg_alpha': trial.suggest_float('reg_alpha', 0, 1),
            'reg_lambda': trial.suggest_float('reg_lambda', 0, 1),
        }
    elif model_name == 'lightgbm':
        return {
            'n_estimators': trial.suggest_int('n_estimators', 50, 300),
            'max_depth': trial.suggest_int('max_depth', 3, 15),
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3),
            'num_leaves': trial.suggest_int('num_leaves', 10, 300),
            'subsample': trial.suggest_float('subsample', 0.6, 1.0),
            'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
            'reg_alpha': trial.suggest_float('reg_alpha', 0, 1),
            'reg_lambda': trial.suggest_float('reg_lambda', 0, 1),
        }
    elif model_name == 'gradient_boosting':
        return {
            'n_estimators': trial.suggest_int('n_estimators', 50, 200),
            'max_depth': trial.suggest_int('max_depth', 3, 10),
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3),
            'subsample': trial.suggest_float('subsample', 0.6, 1.0),
            'max_features': trial.suggest_categorical('max_features', ['sqrt', 'log2', None]),
        }
    else:  # logistic_regression
        penalty = trial.suggest_categorical('penalty', ['l1', 'l2', 'elasticnet'])
        params = {
            'C': trial.suggest_float('C', 0.001, 100, log=True),
            'penalty': penalty,
            # 'saga' is the only scikit-learn solver that accepts all three
            # penalties; pairing 'elasticnet' with 'liblinear' raises a
            # ValueError at fit time and would abort the study. It is
            # sampled as a one-choice categorical so that it is still
            # recorded among the trial's parameters.
            'solver': trial.suggest_categorical('solver', ['saga']),
        }
        # l1_ratio only applies to the elastic-net penalty, so it is only
        # sampled (and only included) in that case.
        if penalty == 'elasticnet':
            params['l1_ratio'] = trial.suggest_float('l1_ratio', 0, 1)
        return params
def _create_model_with_params(self, model_name: str, params: dict):
    """
    Build an unfitted estimator for ``model_name`` from ``params``.

    Args:
        model_name: 'random_forest', 'xgboost', 'lightgbm' or
            'gradient_boosting'; any other name yields a
            ``LogisticRegression``.
        params: Keyword arguments forwarded to the estimator constructor.

    Returns:
        The estimator instance, always created with ``random_state=42``
        plus the model-specific fixed arguments listed below.
    """
    # Estimator class and its fixed extra keyword arguments, per model name.
    factories = {
        'random_forest': (RandomForestClassifier, {}),
        'xgboost': (xgb.XGBClassifier, {'eval_metric': 'logloss'}),
        'lightgbm': (lgb.LGBMClassifier, {'verbose': -1}),
        'gradient_boosting': (GradientBoostingClassifier, {}),
    }
    fallback = (LogisticRegression, {'max_iter': 1000})

    estimator_cls, fixed_kwargs = factories.get(model_name, fallback)
    return estimator_cls(**params, random_state=42, **fixed_kwargs)
mlopszoomcamp
This first part covers the core training pipeline architecture, data processing, and model training components. The second part will focus on evaluation, model registry, deployment automation, and monitoring integration.
Top comments (0)