
πŸ€– Training Pipeline Tutorial: Building Your ML Model Factory

πŸ“‹ What You'll Learn

In this tutorial, you'll discover how to:

  • 🏭 Build an automated ML training pipeline (your model factory)
  • πŸ“Š Process raw data into ML-ready features (ingredient preparation)
  • πŸ§ͺ Experiment with different algorithms (recipe testing)
  • πŸ“ˆ Evaluate and select the best models (quality control)
  • πŸš€ Deploy models to production (shipping finished products)

πŸ€” Why Do We Need a Training Pipeline?

Imagine you're running a bakery that needs to:

  • Convert raw ingredients (flour, eggs) into bread (trained models)
  • Test different recipes (algorithms) to find the best one
  • Ensure consistent quality (reproducible results)
  • Scale production when demand increases
  • Keep track of what works and what doesn't

A training pipeline is your automated bakery that turns raw data into high-quality ML models!

πŸ›οΈ Architecture Overview

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                     Training Service Architecture               β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€€
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚
β”‚  β”‚   πŸ“Š Data       β”‚    β”‚   πŸ”§ Feature    β”‚    β”‚  πŸ€– Model    β”‚ β”‚
β”‚  β”‚   Processing    │───▢│   Engineering   │───▢│  Training    β”‚ β”‚
β”‚  β”‚                 β”‚    β”‚                 β”‚    β”‚              β”‚ β”‚
β”‚  β”‚ β€’ Validation    β”‚    β”‚ β€’ Encoding      β”‚    β”‚ β€’ Algorithms β”‚ β”‚
β”‚  β”‚ β€’ Cleaning      β”‚    β”‚ β€’ Scaling       β”‚    β”‚ β€’ Tuning     β”‚ β”‚
β”‚  β”‚ β€’ Splitting     β”‚    β”‚ β€’ Selection     β”‚    β”‚ β€’ Validation β”‚ β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚
β”‚           β”‚                       β”‚                      β”‚       β”‚
β”‚           β–Ό                       β–Ό                      β–Ό       β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚
β”‚  β”‚   πŸ“ Logging    β”‚    β”‚   πŸ“Š Evaluation β”‚    β”‚  πŸš€ Model    β”‚ β”‚
β”‚  β”‚   & Tracking    │◄───│   & Metrics     │◄───│  Registry    β”‚ β”‚
β”‚  β”‚                 β”‚    β”‚                 β”‚    β”‚              β”‚ β”‚
β”‚  β”‚ β€’ MLflow        β”‚    β”‚ β€’ Performance   β”‚    β”‚ β€’ Staging    β”‚ β”‚
β”‚  β”‚ β€’ Experiments   β”‚    β”‚ β€’ Validation    β”‚    β”‚ β€’ Production β”‚ β”‚
β”‚  β”‚ β€’ Artifacts     β”‚    β”‚ β€’ Comparison    β”‚    β”‚ β€’ Versioning β”‚ β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚
β”‚                                                                 β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

πŸ“ Service Structure

services/training/
β”œβ”€β”€ πŸ“„ churn_mlops_pipeline.py      # Main training orchestrator
β”œβ”€β”€ πŸ“„ Dockerfile                  # Container configuration
β”œβ”€β”€ πŸ“„ requirements.txt             # Python dependencies
β”œβ”€β”€ πŸ“‚ data_processing/
β”‚   β”œβ”€β”€ πŸ“„ __init__.py
β”‚   β”œβ”€β”€ πŸ“„ data_loader.py          # Data ingestion utilities
β”‚   β”œβ”€β”€ πŸ“„ data_validator.py       # Data quality checks
β”‚   └── πŸ“„ preprocessor.py         # Data cleaning & transformation
β”œβ”€β”€ πŸ“‚ feature_engineering/
β”‚   β”œβ”€β”€ πŸ“„ __init__.py
β”‚   β”œβ”€β”€ πŸ“„ feature_builder.py      # Feature creation logic
β”‚   β”œβ”€β”€ πŸ“„ encoders.py             # Categorical encoding
β”‚   └── πŸ“„ scalers.py              # Numerical scaling
β”œβ”€β”€ πŸ“‚ model_training/
β”‚   β”œβ”€β”€ πŸ“„ __init__.py
β”‚   β”œβ”€β”€ πŸ“„ trainers.py             # Model training logic
β”‚   β”œβ”€β”€ πŸ“„ hyperparameter_tuning.py # HPO implementations
β”‚   └── πŸ“„ model_registry.py       # MLflow integration
β”œβ”€β”€ πŸ“‚ evaluation/
β”‚   β”œβ”€β”€ πŸ“„ __init__.py
β”‚   β”œβ”€β”€ πŸ“„ metrics.py              # Performance metrics
β”‚   β”œβ”€β”€ πŸ“„ validators.py           # Model validation
β”‚   └── πŸ“„ reports.py              # Evaluation reports
β”œβ”€β”€ πŸ“‚ config/
β”‚   β”œβ”€β”€ πŸ“„ training_config.py      # Training parameters
β”‚   β”œβ”€β”€ πŸ“„ model_config.py         # Model configurations
β”‚   └── πŸ“„ pipeline_config.py      # Pipeline settings
└── πŸ“‚ utils/
    β”œβ”€β”€ πŸ“„ __init__.py
    β”œβ”€β”€ πŸ“„ logging_utils.py        # Logging configuration
    β”œβ”€β”€ πŸ“„ storage_utils.py        # Data storage utilities
    └── πŸ“„ monitoring_utils.py     # Training monitoring
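Before diving into the pipeline code, it helps to know what goes into requirements.txt. Based on the imports used throughout this service, it would look roughly like the sketch below; the version pins are illustrative assumptions, not copied from the project.

# requirements.txt (sketch; version pins are assumptions)
mlflow>=2.0
pandas>=1.5
numpy>=1.23
scikit-learn>=1.2
xgboost>=1.7
lightgbm>=3.3
optuna>=3.0
joblib>=1.2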

πŸ”„ Training Pipeline Implementation

Main Pipeline Orchestrator

# churn_mlops_pipeline.py
import mlflow
import mlflow.sklearn
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, roc_auc_score, confusion_matrix
import xgboost as xgb
import lightgbm as lgb
import optuna
from datetime import datetime
import logging
import joblib
import os
from typing import Dict, Tuple, Any, List

# πŸ“Š Configure logging (create the log directory first so FileHandler doesn't fail)
os.makedirs('logs', exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('logs/training.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

class ChurnTrainingPipeline:
    """
    πŸ€– Complete MLOps training pipeline for customer churn prediction

    This class orchestrates the entire machine learning workflow from
    data ingestion to model registration, ensuring reproducible and
    scalable model development.
    """

    def __init__(self, config: Dict[str, Any]):
        """
        Initialize the training pipeline with configuration

        Args:
            config: Dictionary containing pipeline configuration
        """
        self.config = config
        self.mlflow_uri = config.get('mlflow_tracking_uri', 'http://localhost:5000')
        self.experiment_name = config.get('experiment_name', 'churn-prediction-experiment')
        self.model_name = config.get('model_name', 'churn-prediction')

        # πŸ”§ Setup MLflow
        mlflow.set_tracking_uri(self.mlflow_uri)
        mlflow.set_experiment(self.experiment_name)

        # πŸ“Š Initialize data containers
        self.raw_data = None
        self.processed_data = None
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None

        # πŸ€– Model containers
        self.models = {}
        self.best_model = None
        self.best_score = 0.0

        logger.info("πŸš€ Training pipeline initialized")

    def run_complete_pipeline(self) -> str:
        """
        πŸ”„ Execute the complete training pipeline

        Returns:
            str: Run ID of the best model in MLflow
        """

        logger.info("🎯 Starting complete training pipeline")

        try:
            # 1️⃣ Data Processing Phase
            logger.info("πŸ“Š Phase 1: Data Processing")
            self.load_and_validate_data()
            self.preprocess_data()
            self.engineer_features()
            self.split_data()

            # 2️⃣ Model Training Phase
            logger.info("πŸ€– Phase 2: Model Training")
            self.train_baseline_models()
            self.hyperparameter_optimization()

            # 3️⃣ Evaluation Phase (methods covered in the next part)
            logger.info("πŸ“Š Phase 3: Model Evaluation")
            best_run_id = self.evaluate_and_select_best_model()

            # 4️⃣ Model Registration Phase (methods covered in the next part)
            logger.info("πŸš€ Phase 4: Model Registration")
            self.register_best_model(best_run_id)

            logger.info("βœ… Training pipeline completed successfully")
            return best_run_id

        except Exception as e:
            logger.error(f"❌ Pipeline failed: {str(e)}")
            raise

    def load_and_validate_data(self):
        """πŸ“₯ Load and validate the training dataset"""

        logger.info("πŸ“‚ Loading training data...")

        # Load data from configured source
        data_path = self.config.get('data_path', 'data/raw/WA_Fn-UseC_-Telco-Customer-Churn.csv')

        if not os.path.exists(data_path):
            raise FileNotFoundError(f"Training data not found at: {data_path}")

        self.raw_data = pd.read_csv(data_path)
        logger.info(f"πŸ“Š Loaded {len(self.raw_data)} records")

        # πŸ” Data validation
        self._validate_data_quality()

        # πŸ“ Log data info to MLflow
        with mlflow.start_run(run_name="data_validation") as run:
            mlflow.log_param("dataset_size", len(self.raw_data))
            mlflow.log_param("feature_count", len(self.raw_data.columns))
            mlflow.log_param("data_path", data_path)

            # Log data quality metrics
            missing_values = self.raw_data.isnull().sum().sum()
            mlflow.log_metric("missing_values_count", missing_values)
            mlflow.log_metric("missing_values_percentage", missing_values / self.raw_data.size * 100)

    def _validate_data_quality(self):
        """πŸ” Perform comprehensive data quality checks"""

        logger.info("πŸ” Validating data quality...")

        # Check for required columns
        required_columns = [
            'customerID', 'gender', 'SeniorCitizen', 'Partner', 'Dependents',
            'tenure', 'PhoneService', 'MultipleLines', 'InternetService',
            'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport',
            'StreamingTV', 'StreamingMovies', 'Contract', 'PaperlessBilling',
            'PaymentMethod', 'MonthlyCharges', 'TotalCharges', 'Churn'
        ]

        missing_columns = set(required_columns) - set(self.raw_data.columns)
        if missing_columns:
            raise ValueError(f"Missing required columns: {missing_columns}")

        # Check target variable distribution
        churn_distribution = self.raw_data['Churn'].value_counts(normalize=True)
        logger.info(f"πŸ“Š Churn distribution: {churn_distribution.to_dict()}")

        # Check for data quality issues
        total_charges_invalid = pd.to_numeric(self.raw_data['TotalCharges'], errors='coerce').isnull().sum()
        if total_charges_invalid > 0:
            logger.warning(f"⚠️ Found {total_charges_invalid} invalid TotalCharges values")

        logger.info("βœ… Data quality validation completed")

    def preprocess_data(self):
        """πŸ”§ Clean and preprocess the raw data"""

        logger.info("πŸ”§ Preprocessing data...")

        # Create a copy for processing
        self.processed_data = self.raw_data.copy()

        # 🧹 Handle TotalCharges conversion
        self.processed_data['TotalCharges'] = pd.to_numeric(
            self.processed_data['TotalCharges'], errors='coerce'
        )

        # Fill missing TotalCharges with 0 (new customers with tenure 0)
        self.processed_data['TotalCharges'] = self.processed_data['TotalCharges'].fillna(0)

        # πŸ”„ Convert target variable to binary
        self.processed_data['Churn'] = (self.processed_data['Churn'] == 'Yes').astype(int)

        # 🧹 Handle categorical variables with "No internet service" and "No phone service"
        internet_cols = ['OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 
                        'TechSupport', 'StreamingTV', 'StreamingMovies']

        for col in internet_cols:
            self.processed_data[col] = self.processed_data[col].replace('No internet service', 'No')

        self.processed_data['MultipleLines'] = self.processed_data['MultipleLines'].replace('No phone service', 'No')

        logger.info("βœ… Data preprocessing completed")

    def engineer_features(self):
        """βš™οΈ Create additional features for improved model performance"""

        logger.info("βš™οΈ Engineering features...")

        # πŸ’° Financial features
        self.processed_data['AvgMonthlyCharges'] = (
            self.processed_data['TotalCharges'] / (self.processed_data['tenure'] + 1)
        )

        self.processed_data['ChargesRatio'] = (
            self.processed_data['MonthlyCharges'] / (self.processed_data['TotalCharges'] + 1)
        )

        # πŸ“Š Service usage features
        service_cols = ['PhoneService', 'MultipleLines', 'InternetService', 'OnlineSecurity',
                       'OnlineBackup', 'DeviceProtection', 'TechSupport', 'StreamingTV', 'StreamingMovies']

        # Note: InternetService takes values like 'DSL'/'Fiber optic'/'No', never 'Yes',
        # so it contributes 0 here; the remaining Yes/No columns drive this count
        self.processed_data['TotalServices'] = (
            self.processed_data[service_cols] == 'Yes'
        ).sum(axis=1)

        # πŸ‘₯ Customer profile features
        self.processed_data['IsNewCustomer'] = (self.processed_data['tenure'] <= 6).astype(int)
        self.processed_data['IsLongTermCustomer'] = (self.processed_data['tenure'] >= 60).astype(int)

        # πŸ’³ Payment behavior features
        self.processed_data['HasAutoPay'] = self.processed_data['PaymentMethod'].isin([
            'Bank transfer (automatic)', 'Credit card (automatic)'
        ]).astype(int)

        logger.info(f"βœ… Feature engineering completed. Total features: {len(self.processed_data.columns)}")
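To make the engineered features concrete, here is a tiny, self-contained sketch that applies the same formulas to an invented customer row (the values are made up for illustration):

# Toy example of the financial features above (illustrative values only)
import pandas as pd

row = pd.DataFrame([{'tenure': 12, 'MonthlyCharges': 50.0, 'TotalCharges': 600.0}])

# Same formulas as engineer_features()
row['AvgMonthlyCharges'] = row['TotalCharges'] / (row['tenure'] + 1)     # 600 / 13 β‰ˆ 46.15
row['ChargesRatio'] = row['MonthlyCharges'] / (row['TotalCharges'] + 1)  # 50 / 601 β‰ˆ 0.083
row['IsNewCustomer'] = (row['tenure'] <= 6).astype(int)                  # 0

print(row[['AvgMonthlyCharges', 'ChargesRatio', 'IsNewCustomer']])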

Data Processing & Feature Engineering

# churn_mlops_pipeline.py (continued): these are methods of ChurnTrainingPipeline
def split_data(self, test_size: float = 0.2, random_state: int = 42):
    """
    πŸ“Š Split data into training and testing sets with proper preprocessing
    """

    logger.info("πŸ“Š Splitting data into train/test sets...")

    # Remove identifier and target columns
    feature_columns = [col for col in self.processed_data.columns 
                      if col not in ['customerID', 'Churn']]

    X = self.processed_data[feature_columns].copy()
    y = self.processed_data['Churn'].copy()

    # πŸ”€ Stratified split to maintain target distribution
    self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state, stratify=y
    )

    # 🏷️ Encode categorical variables
    self._encode_categorical_features()

    # πŸ“ Scale numerical features
    self._scale_numerical_features()

    logger.info(f"πŸ“Š Data split completed:")
    logger.info(f"   Training set: {len(self.X_train)} samples")
    logger.info(f"   Test set: {len(self.X_test)} samples")
    logger.info(f"   Features: {self.X_train.shape[1]}")

def _encode_categorical_features(self):
    """🏷️ Encode categorical features using appropriate encoders"""

    from sklearn.preprocessing import LabelEncoder
    import joblib

    # Identify categorical columns
    categorical_columns = self.X_train.select_dtypes(include=['object']).columns.tolist()

    if not categorical_columns:
        logger.info("🏷️ No categorical features to encode")
        return

    logger.info(f"🏷️ Encoding categorical features: {categorical_columns}")

    # Fit one encoder per column so every column's mapping is preserved
    # (a single shared encoder would only retain the last column's classes)
    encoders = {}

    for col in categorical_columns:
        encoder = LabelEncoder()

        # Fit on training data
        self.X_train[col] = encoder.fit_transform(self.X_train[col].astype(str))

        # Transform test data, handling unseen categories
        test_values = self.X_test[col].astype(str)

        # Replace unseen categories with the most frequent training value
        unseen_mask = ~test_values.isin(encoder.classes_)
        if unseen_mask.any():
            most_frequent_code = self.X_train[col].mode().iloc[0]
            test_values.loc[unseen_mask] = encoder.inverse_transform([most_frequent_code])[0]

        self.X_test[col] = encoder.transform(test_values)
        encoders[col] = encoder

    # Save the per-column encoders for later use
    os.makedirs('artifacts', exist_ok=True)
    joblib.dump(encoders, 'artifacts/encoders.pkl')

    logger.info("βœ… Categorical encoding completed")

def _scale_numerical_features(self):
    """πŸ“ Scale numerical features using StandardScaler"""

    from sklearn.preprocessing import StandardScaler
    import joblib

    # Identify numerical columns
    numerical_columns = self.X_train.select_dtypes(include=[np.number]).columns.tolist()

    if not numerical_columns:
        logger.info("πŸ“ No numerical features to scale")
        return

    logger.info(f"πŸ“ Scaling numerical features: {numerical_columns}")

    # Fit scaler on training data
    scaler = StandardScaler()
    self.X_train[numerical_columns] = scaler.fit_transform(self.X_train[numerical_columns])
    self.X_test[numerical_columns] = scaler.transform(self.X_test[numerical_columns])

    # Save scaler for later use
    os.makedirs('artifacts', exist_ok=True)
    joblib.dump(scaler, 'artifacts/scaler.pkl')

    logger.info("βœ… Feature scaling completed")
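Because the encoders and scaler are persisted with joblib, a serving component can reuse exactly the same preprocessing at inference time. A minimal sketch, assuming the artifact paths used above; the helper name is hypothetical:

# inference_preprocessing.py (sketch; helper name is hypothetical)
import joblib
import pandas as pd

encoders = joblib.load('artifacts/encoders.pkl')  # dict: column -> fitted LabelEncoder
scaler = joblib.load('artifacts/scaler.pkl')      # fitted StandardScaler

def preprocess_for_inference(df: pd.DataFrame, numerical_columns: list) -> pd.DataFrame:
    """Apply the training-time encoders and scaler to new data."""
    df = df.copy()
    for col, encoder in encoders.items():
        # Note: unseen categories would need the same fallback used in training
        df[col] = encoder.transform(df[col].astype(str))
    df[numerical_columns] = scaler.transform(df[numerical_columns])
    return df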

Model Training & Hyperparameter Optimization

# churn_mlops_pipeline.py (continued): these are methods of ChurnTrainingPipeline
def train_baseline_models(self):
    """πŸ€– Train multiple baseline models for comparison"""

    logger.info("πŸ€– Training baseline models...")

    # Define baseline models
    baseline_models = {
        'logistic_regression': LogisticRegression(random_state=42, max_iter=1000),
        'random_forest': RandomForestClassifier(random_state=42, n_estimators=100),
        'gradient_boosting': GradientBoostingClassifier(random_state=42),
        'xgboost': xgb.XGBClassifier(random_state=42, eval_metric='logloss'),
        'lightgbm': lgb.LGBMClassifier(random_state=42, verbose=-1)
    }

    # Train and evaluate each model
    for name, model in baseline_models.items():
        logger.info(f"πŸ”„ Training {name}...")

        with mlflow.start_run(run_name=f"baseline_{name}") as run:
            # Train model
            model.fit(self.X_train, self.y_train)

            # Make predictions
            y_pred = model.predict(self.X_test)
            y_pred_proba = model.predict_proba(self.X_test)[:, 1]

            # Calculate metrics
            roc_auc = roc_auc_score(self.y_test, y_pred_proba)

            # Cross-validation score
            cv_scores = cross_val_score(model, self.X_train, self.y_train, 
                                      cv=5, scoring='roc_auc')
            cv_mean = cv_scores.mean()
            cv_std = cv_scores.std()

            # Log parameters and metrics
            mlflow.log_param("model_type", name)
            mlflow.log_param("model_class", model.__class__.__name__)

            mlflow.log_metric("roc_auc", roc_auc)
            mlflow.log_metric("cv_roc_auc_mean", cv_mean)
            mlflow.log_metric("cv_roc_auc_std", cv_std)

            # Log model
            mlflow.sklearn.log_model(model, f"model_{name}")

            # Store model for comparison
            self.models[name] = {
                'model': model,
                'roc_auc': roc_auc,
                'cv_score': cv_mean,
                'run_id': run.info.run_id
            }

            logger.info(f"βœ… {name}: ROC-AUC = {roc_auc:.4f}, CV = {cv_mean:.4f} Β± {cv_std:.4f}")

def hyperparameter_optimization(self):
    """πŸ” Optimize hyperparameters for the best performing models"""

    logger.info("πŸ” Starting hyperparameter optimization...")

    # Select top 2 models for optimization
    sorted_models = sorted(self.models.items(), 
                          key=lambda x: x[1]['roc_auc'], reverse=True)

    top_models = sorted_models[:2]
    logger.info(f"🎯 Optimizing top models: {[name for name, _ in top_models]}")

    for model_name, model_info in top_models:
        logger.info(f"πŸ”§ Optimizing {model_name}...")

        # Define optimization objective
        def objective(trial):
            params = self._get_optimization_params(model_name, trial)

            if model_name == 'random_forest':
                model = RandomForestClassifier(**params, random_state=42)
            elif model_name == 'xgboost':
                model = xgb.XGBClassifier(**params, random_state=42, eval_metric='logloss')
            elif model_name == 'lightgbm':
                model = lgb.LGBMClassifier(**params, random_state=42, verbose=-1)
            elif model_name == 'gradient_boosting':
                model = GradientBoostingClassifier(**params, random_state=42)
            else:
                # For logistic regression or other models
                model = LogisticRegression(**params, random_state=42, max_iter=1000)

            # Cross-validation
            cv_scores = cross_val_score(model, self.X_train, self.y_train, 
                                      cv=5, scoring='roc_auc')
            return cv_scores.mean()

        # Run optimization
        study = optuna.create_study(direction='maximize')
        study.optimize(objective, n_trials=50, timeout=300)  # 5 minutes max per model

        # Train best model
        best_params = study.best_params
        best_model = self._create_model_with_params(model_name, best_params)
        best_model.fit(self.X_train, self.y_train)

        # Evaluate optimized model
        y_pred_proba = best_model.predict_proba(self.X_test)[:, 1]
        roc_auc = roc_auc_score(self.y_test, y_pred_proba)

        # Log optimized model
        with mlflow.start_run(run_name=f"optimized_{model_name}") as run:
            mlflow.log_param("model_type", f"{model_name}_optimized")
            mlflow.log_param("optimization_trials", len(study.trials))
            mlflow.log_param("best_trial", study.best_trial.number)

            # Log best parameters
            for param, value in best_params.items():
                mlflow.log_param(f"best_{param}", value)

            mlflow.log_metric("roc_auc", roc_auc)
            mlflow.log_metric("improvement_over_baseline", 
                            roc_auc - model_info['roc_auc'])

            # Log model
            mlflow.sklearn.log_model(best_model, f"model_optimized_{model_name}")

            # Update models dictionary
            self.models[f"{model_name}_optimized"] = {
                'model': best_model,
                'roc_auc': roc_auc,
                'cv_score': study.best_value,
                'run_id': run.info.run_id,
                'params': best_params
            }

        logger.info(f"βœ… {model_name} optimization completed: ROC-AUC = {roc_auc:.4f}")

def _get_optimization_params(self, model_name: str, trial):
    """πŸŽ›οΈ Define hyperparameter search spaces for different models"""

    if model_name == 'random_forest':
        return {
            'n_estimators': trial.suggest_int('n_estimators', 50, 300),
            'max_depth': trial.suggest_int('max_depth', 3, 20),
            'min_samples_split': trial.suggest_int('min_samples_split', 2, 10),
            'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 5),
            'max_features': trial.suggest_categorical('max_features', ['sqrt', 'log2', None])
        }

    elif model_name == 'xgboost':
        return {
            'n_estimators': trial.suggest_int('n_estimators', 50, 300),
            'max_depth': trial.suggest_int('max_depth', 3, 10),
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3),
            'subsample': trial.suggest_float('subsample', 0.6, 1.0),
            'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
            'reg_alpha': trial.suggest_float('reg_alpha', 0, 1),
            'reg_lambda': trial.suggest_float('reg_lambda', 0, 1)
        }

    elif model_name == 'lightgbm':
        return {
            'n_estimators': trial.suggest_int('n_estimators', 50, 300),
            'max_depth': trial.suggest_int('max_depth', 3, 15),
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3),
            'num_leaves': trial.suggest_int('num_leaves', 10, 300),
            'subsample': trial.suggest_float('subsample', 0.6, 1.0),
            'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
            'reg_alpha': trial.suggest_float('reg_alpha', 0, 1),
            'reg_lambda': trial.suggest_float('reg_lambda', 0, 1)
        }

    elif model_name == 'gradient_boosting':
        return {
            'n_estimators': trial.suggest_int('n_estimators', 50, 200),
            'max_depth': trial.suggest_int('max_depth', 3, 10),
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3),
            'subsample': trial.suggest_float('subsample', 0.6, 1.0),
            'max_features': trial.suggest_categorical('max_features', ['sqrt', 'log2', None])
        }

    else:  # logistic_regression
        # Suggest the penalty once, then fix the solver to 'saga',
        # the only solver that supports l1, l2 and elasticnet penalties
        penalty = trial.suggest_categorical('penalty', ['l1', 'l2', 'elasticnet'])
        params = {
            'C': trial.suggest_float('C', 0.001, 100, log=True),
            'penalty': penalty,
            'solver': trial.suggest_categorical('solver', ['saga'])
        }
        if penalty == 'elasticnet':
            params['l1_ratio'] = trial.suggest_float('l1_ratio', 0, 1)
        return params

def _create_model_with_params(self, model_name: str, params: dict):
    """πŸ—οΈ Create model instance with optimized parameters"""

    if model_name == 'random_forest':
        return RandomForestClassifier(**params, random_state=42)
    elif model_name == 'xgboost':
        return xgb.XGBClassifier(**params, random_state=42, eval_metric='logloss')
    elif model_name == 'lightgbm':
        return lgb.LGBMClassifier(**params, random_state=42, verbose=-1)
    elif model_name == 'gradient_boosting':
        return GradientBoostingClassifier(**params, random_state=42)
    else:
        return LogisticRegression(**params, random_state=42, max_iter=1000)
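Finally, to tie everything together, here is a minimal sketch of an entry point that runs the whole pipeline. The config keys match the defaults read in __init__; the values are assumptions for a local setup:

# run_training.py (sketch; config values are local-setup assumptions)
from churn_mlops_pipeline import ChurnTrainingPipeline

if __name__ == "__main__":
    config = {
        'mlflow_tracking_uri': 'http://localhost:5000',
        'experiment_name': 'churn-prediction-experiment',
        'model_name': 'churn-prediction',
        'data_path': 'data/raw/WA_Fn-UseC_-Telco-Customer-Churn.csv',
    }

    pipeline = ChurnTrainingPipeline(config)
    best_run_id = pipeline.run_complete_pipeline()
    print(f"βœ… Best model run: {best_run_id}")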


This first part covers the core training pipeline architecture, data processing, and model training components. The second part will focus on evaluation, model registry, deployment automation, and monitoring integration.
