Abdelrahman Adnan

part_4_customer_churn_prediction_mlopszoomcamp

# 🎯 Training Operations Tutorial: From Models to Production

## 📋 What You'll Learn

In this tutorial, you'll discover how to:

  • 📊 Evaluate and validate your trained models (quality testing)
  • 🏆 Select the champion model automatically (picking the winner)
  • 🔄 Set up automated retraining (keeping recipes fresh)
  • 🚀 Deploy models to production (opening for business)
  • 📈 Monitor model performance over time (staying competitive)

## 🤔 Why Do We Need Training Operations?

Imagine your bakery has created several bread recipes, and now you need to:

  • Taste-test each recipe thoroughly (model evaluation)
  • Choose the best one for your customers (model selection)
  • Update recipes when ingredients change (retraining)
  • Launch the winning recipe in all stores (deployment)
  • Keep track of customer satisfaction (monitoring)

Training operations ensure your ML models work reliably in the real world!

πŸ† Model Evaluation Championship

Think of model evaluation like a cooking competition - we need fair judging to pick the winner.

### 🎯 Understanding Model Metrics (Judging Criteria)

Different metrics tell us different things about our model's performance:

```
📊 Model Performance Scorecard
├── 🎯 Accuracy (Overall correctness)
├── 🔍 Precision (When I say "churn", am I right?)
├── 📡 Recall (Did I catch all the churners?)
├── ⚖️ F1-Score (Balance of precision & recall)
└── 📈 AUC-ROC (Overall ranking ability)
```

What each metric means in business terms (a small worked example follows the list):

  • Accuracy: "How often am I right overall?"
  • Precision: "When I predict churn, how often is it true?"
  • Recall: "Of all customers who actually churned, how many did I catch?"
  • F1-Score: "What's my balanced performance?"
  • AUC-ROC: "How good am I at ranking customers by risk?"
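
To make these concrete, here's a tiny self-contained example (the ten labels and probabilities are invented for illustration) computing the same five numbers with scikit-learn:

```python
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score

# Ten customers: 1 = churned, 0 = stayed (toy data)
y_true  = [1, 0, 0, 1, 1, 0, 0, 0, 1, 0]
y_pred  = [1, 0, 1, 1, 0, 0, 0, 0, 1, 0]                       # hard yes/no predictions
y_proba = [0.9, 0.2, 0.6, 0.8, 0.4, 0.1, 0.3, 0.2, 0.7, 0.1]   # predicted churn probabilities

print("Accuracy :", accuracy_score(y_true, y_pred))    # 0.80 -> right 8 times out of 10
print("Precision:", precision_score(y_true, y_pred))   # 0.75 -> 3 of 4 churn calls were correct
print("Recall   :", recall_score(y_true, y_pred))      # 0.75 -> caught 3 of 4 actual churners
print("F1-Score :", f1_score(y_true, y_pred))          # 0.75 -> balance of the two
print("AUC-ROC  :", roc_auc_score(y_true, y_proba))    # ~0.96 -> ranking quality of the probabilities
```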

### Step 1: Building a Model Evaluation System

What we're doing: Creating a comprehensive testing system for all our models.

```python
# src/models/model_evaluator.py - Your professional judges panel
import logging
import os

import numpy as np
import pandas as pd
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, confusion_matrix, classification_report
)
import matplotlib.pyplot as plt
import seaborn as sns

class ModelEvaluator:
    """
    This is your panel of expert judges who:
    - Test each model thoroughly
    - Score them on multiple criteria
    - Create detailed report cards
    - Pick the overall winner
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.evaluation_results = {}

    def comprehensive_evaluation(self, model, X_test, y_test, model_name: str):
        """
        πŸ† Comprehensive model evaluation

        Like having each dish judged by multiple expert chefs
        on taste, presentation, creativity, and nutrition.
        """
        self.logger.info(f"πŸ” Evaluating {model_name} on all criteria...")

        # 🎯 Get model predictions
        y_pred = model.predict(X_test)
        y_pred_proba = model.predict_proba(X_test)[:, 1]

        # πŸ“Š Calculate all performance metrics
        metrics = {
            # 🎯 Classification metrics
            'accuracy': accuracy_score(y_test, y_pred),
            'precision': precision_score(y_test, y_pred),
            'recall': recall_score(y_test, y_pred),
            'f1_score': f1_score(y_test, y_pred),
            'auc_roc': roc_auc_score(y_test, y_pred_proba),

            # πŸ’° Business metrics
            'false_positive_rate': self._calculate_fpr(y_test, y_pred),
            'false_negative_rate': self._calculate_fnr(y_test, y_pred),
            'business_value_score': self._calculate_business_value(y_test, y_pred)
        }

        # πŸ“‹ Create detailed analysis
        analysis = {
            'confusion_matrix': confusion_matrix(y_test, y_pred),
            'classification_report': classification_report(y_test, y_pred, output_dict=True),
            'prediction_distribution': self._analyze_prediction_distribution(y_pred_proba, y_test)
        }

        # 💾 Store results for comparison (must happen before plotting, which reads self.evaluation_results)
        self.evaluation_results[model_name] = {
            'metrics': metrics,
            'analysis': analysis,
            'model': model
        }

        # 🎨 Create visual reports
        self._create_evaluation_visualizations(y_test, y_pred, y_pred_proba, model_name)

        # 📝 Log results in human-readable format
        self._log_evaluation_summary(model_name, metrics, analysis)

        return metrics, analysis

    def _calculate_fpr(self, y_true, y_pred):
        """Calculate False Positive Rate (crying wolf rate)"""
        tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
        return fp / (fp + tn) if (fp + tn) > 0 else 0

    def _calculate_fnr(self, y_true, y_pred):
        """Calculate False Negative Rate (missing actual churners)"""
        tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
        return fn / (fn + tp) if (fn + tp) > 0 else 0

    def _calculate_business_value(self, y_true, y_pred):
        """
        πŸ’° Calculate business value of predictions

        This considers the cost of different types of errors:
        - Missing a churner (False Negative) costs more than a false alarm
        - Correctly identifying churners has high value
        """
        tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()

        # πŸ’° Business costs/benefits (example values)
        cost_false_negative = 100  # Lost customer lifetime value
        cost_false_positive = 10   # Cost of unnecessary retention effort
        benefit_true_positive = 80  # Retained customer value
        benefit_true_negative = 0   # No action needed

        total_value = (
            tp * benefit_true_positive +
            tn * benefit_true_negative -
            fp * cost_false_positive -
            fn * cost_false_negative
        )

        # Normalize by total predictions
        return total_value / len(y_true)

    def _analyze_prediction_distribution(self, y_pred_proba, y_true):
        """
        πŸ“Š Analyze how confident the model is in its predictions

        Like checking if a chef is confident in their dish or just guessing.
        """
        return {
            'avg_confidence_churners': np.mean(y_pred_proba[y_true == 1]),
            'avg_confidence_non_churners': np.mean(y_pred_proba[y_true == 0]),
            'confidence_separation': np.mean(y_pred_proba[y_true == 1]) - np.mean(y_pred_proba[y_true == 0]),
            'high_confidence_predictions': np.sum(np.abs(y_pred_proba - 0.5) > 0.3) / len(y_pred_proba)
        }

    def _create_evaluation_visualizations(self, y_true, y_pred, y_pred_proba, model_name):
        """
        🎨 Create comprehensive visual evaluation reports

        Like taking photos of each dish from different angles.
        """
        fig, axes = plt.subplots(2, 3, figsize=(18, 12))

        # 🎯 Confusion Matrix
        cm = confusion_matrix(y_true, y_pred)
        sns.heatmap(cm, annot=True, fmt='d', ax=axes[0,0], cmap='Blues',
                   xticklabels=['No Churn', 'Churn'], 
                   yticklabels=['No Churn', 'Churn'])
        axes[0,0].set_title(f'{model_name}: Confusion Matrix')
        axes[0,0].set_ylabel('True Label')
        axes[0,0].set_xlabel('Predicted Label')

        # πŸ“ˆ ROC Curve
        from sklearn.metrics import roc_curve, auc
        fpr, tpr, _ = roc_curve(y_true, y_pred_proba)
        axes[0,1].plot(fpr, tpr, label=f'ROC Curve (AUC = {auc(fpr, tpr):.3f})')
        axes[0,1].plot([0, 1], [0, 1], 'k--', label='Random Guess')
        axes[0,1].set_xlabel('False Positive Rate')
        axes[0,1].set_ylabel('True Positive Rate')
        axes[0,1].set_title(f'{model_name}: ROC Curve')
        axes[0,1].legend()
        axes[0,1].grid(True)

        # πŸ“Š Prediction Distribution
        axes[0,2].hist(y_pred_proba[y_true==0], bins=30, alpha=0.5, 
                      label='No Churn', color='blue', density=True)
        axes[0,2].hist(y_pred_proba[y_true==1], bins=30, alpha=0.5, 
                      label='Churn', color='red', density=True)
        axes[0,2].set_xlabel('Predicted Probability')
        axes[0,2].set_ylabel('Density')
        axes[0,2].set_title(f'{model_name}: Prediction Distribution')
        axes[0,2].legend()
        axes[0,2].grid(True)

        # 🎯 Precision-Recall Curve
        from sklearn.metrics import precision_recall_curve
        precision, recall, _ = precision_recall_curve(y_true, y_pred_proba)
        axes[1,0].plot(recall, precision, label=f'PR Curve')
        axes[1,0].set_xlabel('Recall')
        axes[1,0].set_ylabel('Precision')
        axes[1,0].set_title(f'{model_name}: Precision-Recall Curve')
        axes[1,0].legend()
        axes[1,0].grid(True)

        # πŸ“Š Feature Importance (if available)
        if hasattr(self.evaluation_results[model_name]['model'], 'feature_importances_'):
            importance = self.evaluation_results[model_name]['model'].feature_importances_
            indices = np.argsort(importance)[::-1][:15]  # Top 15

            axes[1,1].bar(range(len(indices)), importance[indices])
            axes[1,1].set_title(f'{model_name}: Top 15 Feature Importances')
            axes[1,1].set_xticks(range(len(indices)))
            axes[1,1].tick_params(axis='x', rotation=45)

        # πŸ“ˆ Calibration Plot
        from sklearn.calibration import calibration_curve
        fraction_of_positives, mean_predicted_value = calibration_curve(
            y_true, y_pred_proba, n_bins=10
        )
        axes[1,2].plot(mean_predicted_value, fraction_of_positives, "s-", 
                      label=f"{model_name}")
        axes[1,2].plot([0, 1], [0, 1], "k:", label="Perfectly calibrated")
        axes[1,2].set_xlabel('Mean Predicted Probability')
        axes[1,2].set_ylabel('Fraction of Positives')
        axes[1,2].set_title(f'{model_name}: Calibration Plot')
        axes[1,2].legend()
        axes[1,2].grid(True)

        plt.tight_layout()
        os.makedirs('reports/figures', exist_ok=True)  # make sure the output folder exists
        plt.savefig(f'reports/figures/{model_name}_comprehensive_evaluation.png',
                    dpi=300, bbox_inches='tight')
        plt.close()

        self.logger.info(f"πŸ“Š Comprehensive evaluation plots created for {model_name}")

    def _log_evaluation_summary(self, model_name, metrics, analysis):
        """πŸ“ Create human-readable evaluation summary"""

        self.logger.info(f"\nπŸ† {model_name} Performance Report Card:")
        self.logger.info(f"   πŸ“Š Overall Accuracy: {metrics['accuracy']:.3f} ({metrics['accuracy']*100:.1f}%)")
        self.logger.info(f"   🎯 Precision: {metrics['precision']:.3f} (When I say churn, I'm right {metrics['precision']*100:.1f}% of the time)")
        self.logger.info(f"   πŸ“‘ Recall: {metrics['recall']:.3f} (I catch {metrics['recall']*100:.1f}% of actual churners)")
        self.logger.info(f"   βš–οΈ F1-Score: {metrics['f1_score']:.3f} (Balanced performance)")
        self.logger.info(f"   πŸ“ˆ AUC-ROC: {metrics['auc_roc']:.3f} (Ranking ability)")
        self.logger.info(f"   πŸ’° Business Value: ${metrics['business_value_score']:.2f} per prediction")

        # 🚨 Highlight potential issues
        if metrics['false_positive_rate'] > 0.2:
            self.logger.warning(f"   ⚠️ High false alarm rate: {metrics['false_positive_rate']*100:.1f}%")
        if metrics['false_negative_rate'] > 0.3:
            self.logger.warning(f"   ⚠️ Missing many churners: {metrics['false_negative_rate']*100:.1f}%")

        # 🎯 Business interpretation
        cm = analysis['confusion_matrix']
        tn, fp, fn, tp = cm.ravel()

        self.logger.info(f"   πŸ“‹ Business Impact:")
        self.logger.info(f"      β€’ Correctly identified churners: {tp}")
        self.logger.info(f"      β€’ Correctly identified loyal customers: {tn}")
        self.logger.info(f"      β€’ False alarms (unnecessary retention efforts): {fp}")
        self.logger.info(f"      β€’ Missed churners (lost customers): {fn}")

    def select_champion_model(self):
        """
        πŸ† Select the best model based on multiple criteria

        Like choosing the winner of a cooking competition based on
        multiple judges' scores and different criteria.
        """
        self.logger.info("πŸ† Selecting champion model...")

        if not self.evaluation_results:
            raise ValueError("No models have been evaluated yet!")

        # πŸ“Š Calculate composite score for each model
        scores = {}

        for model_name, results in self.evaluation_results.items():
            metrics = results['metrics']

            # 🎯 Weighted scoring (adjust weights based on business priorities)
            composite_score = (
                metrics['auc_roc'] * 0.3 +           # Overall ranking ability
                metrics['f1_score'] * 0.25 +         # Balanced performance
                metrics['precision'] * 0.2 +         # Accuracy when predicting churn
                metrics['recall'] * 0.15 +           # Catching actual churners
                (1 - metrics['false_negative_rate']) * 0.1  # Not missing churners
            )

            scores[model_name] = composite_score

            self.logger.info(f"   {model_name}: Composite Score = {composite_score:.4f}")

        # πŸ† Select the winner
        champion_name = max(scores, key=scores.get)
        champion_score = scores[champion_name]

        self.logger.info(f"\nπŸŽ‰ Champion Model Selected: {champion_name}")
        self.logger.info(f"πŸ† Champion Score: {champion_score:.4f}")

        # πŸ“‹ Provide detailed justification
        champion_metrics = self.evaluation_results[champion_name]['metrics']
        self.logger.info(f"\nπŸ“‹ Why {champion_name} won:")
        self.logger.info(f"   β€’ Excellent overall ranking (AUC: {champion_metrics['auc_roc']:.3f})")
        self.logger.info(f"   β€’ Good precision-recall balance (F1: {champion_metrics['f1_score']:.3f})")
        self.logger.info(f"   β€’ Strong business value (${champion_metrics['business_value_score']:.2f}/prediction)")

        return champion_name, self.evaluation_results[champion_name]

    def create_model_comparison_report(self):
        """
        πŸ“Š Create comprehensive comparison report of all models

        Like creating a final judging summary showing how each dish performed.
        """
        if not self.evaluation_results:
            self.logger.warning("No evaluation results to compare")
            return

        # πŸ“Š Create comparison DataFrame
        comparison_data = []

        for model_name, results in self.evaluation_results.items():
            metrics = results['metrics']
            row = {
                'Model': model_name,
                'Accuracy': f"{metrics['accuracy']:.3f}",
                'Precision': f"{metrics['precision']:.3f}",
                'Recall': f"{metrics['recall']:.3f}",
                'F1-Score': f"{metrics['f1_score']:.3f}",
                'AUC-ROC': f"{metrics['auc_roc']:.3f}",
                'Business Value': f"${metrics['business_value_score']:.2f}",
                'FP Rate': f"{metrics['false_positive_rate']:.3f}",
                'FN Rate': f"{metrics['false_negative_rate']:.3f}"
            }
            comparison_data.append(row)

        comparison_df = pd.DataFrame(comparison_data)

        # 💾 Save comparison table
        os.makedirs('reports/figures', exist_ok=True)  # covers both reports/ and reports/figures/
        comparison_df.to_csv('reports/model_comparison.csv', index=False)

        # πŸ“Š Create comparison visualization
        fig, ax = plt.subplots(figsize=(12, 8))

        metrics_to_plot = ['Accuracy', 'Precision', 'Recall', 'F1-Score', 'AUC-ROC']
        x = np.arange(len(self.evaluation_results))
        width = 0.15

        for i, metric in enumerate(metrics_to_plot):
            values = [float(comparison_df[comparison_df['Model'] == model][metric].iloc[0]) 
                     for model in self.evaluation_results.keys()]
            ax.bar(x + i*width, values, width, label=metric)

        ax.set_xlabel('Models')
        ax.set_ylabel('Score')
        ax.set_title('Model Performance Comparison')
        ax.set_xticks(x + width * 2)
        ax.set_xticklabels(self.evaluation_results.keys(), rotation=45)
        ax.legend()
        ax.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.savefig('reports/figures/model_comparison.png', dpi=300, bbox_inches='tight')
        plt.close()

        self.logger.info("πŸ“Š Model comparison report created")
        self.logger.info(f"\n{comparison_df.to_string(index=False)}")

# 🏭 Usage example
def evaluate_all_models(models, X_test, y_test):
    """Main evaluation function"""

    evaluator = ModelEvaluator()

    # πŸ” Evaluate each model
    for model_name, model in models.items():
        evaluator.comprehensive_evaluation(model, X_test, y_test, model_name)

    # πŸ“Š Create comparison report
    evaluator.create_model_comparison_report()

    # πŸ† Select champion
    champion_name, champion_info = evaluator.select_champion_model()

    return champion_name, champion_info, evaluator
```

This evaluation system provides comprehensive testing and automatic selection of the best model, ensuring you deploy only the highest quality ML models to production.
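
For a quick end-to-end sense of how this is used, here is a minimal driver sketch. The synthetic dataset and the two candidate models are stand-ins for whatever your training step actually produced:

```python
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

# Synthetic stand-in for the real churn dataset
X, y = make_classification(n_samples=2000, n_features=20, weights=[0.8, 0.2], random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, stratify=y, random_state=42)

models = {
    "random_forest": RandomForestClassifier(n_estimators=200, random_state=42).fit(X_train, y_train),
    "logistic_regression": LogisticRegression(max_iter=1000).fit(X_train, y_train),
}

champion_name, champion_info, evaluator = evaluate_all_models(models, X_test, y_test)
print(f"Champion: {champion_name} (AUC {champion_info['metrics']['auc_roc']:.3f})")
```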
Inside the full training pipeline class (which holds self.X_test, self.y_test, self.config, and a self.models dictionary of trained candidates), the same ideas are wired into a single selection method. The method header and loop below are reconstructed from context; most of the helper methods it calls are shown in the snippets that follow.

```python
def evaluate_and_select_best_model(self) -> str:
    """🏆 Evaluate every trained candidate and return the run ID of the best one"""

    evaluation_results = {}

    # NOTE: loop header reconstructed from context; assumes self.models maps each
    # model name to a dict with at least 'model' and 'run_id' entries
    for model_name, model_info in self.models.items():
        model = model_info['model']

        y_pred = model.predict(self.X_test)
        y_pred_proba = model.predict_proba(self.X_test)[:, 1]

        # Calculate comprehensive metrics
        metrics = self._calculate_comprehensive_metrics(y_pred, y_pred_proba)

        # Business impact analysis
        business_metrics = self._calculate_business_impact(y_pred, y_pred_proba)

        # Model interpretability analysis
        interpretability_metrics = self._analyze_model_interpretability(model, model_name)

        # Combine all metrics
        all_metrics = {**metrics, **business_metrics, **interpretability_metrics}

        evaluation_results[model_name] = {
            'metrics': all_metrics,
            'model_info': model_info,
            'evaluation_score': self._calculate_composite_score(all_metrics)
        }

        # Log detailed evaluation to MLflow
        self._log_evaluation_to_mlflow(model_name, model_info, all_metrics, y_pred, y_pred_proba)

    # Select best model based on composite score
    best_model_name = max(evaluation_results.keys(),
                          key=lambda x: evaluation_results[x]['evaluation_score'])

    self.best_model = evaluation_results[best_model_name]['model_info']['model']
    self.best_score = evaluation_results[best_model_name]['evaluation_score']
    best_run_id = evaluation_results[best_model_name]['model_info']['run_id']

    logger.info(f"🏆 Best model selected: {best_model_name}")
    logger.info(f"📊 Composite score: {self.best_score:.4f}")

    # Generate evaluation report
    self._generate_evaluation_report(evaluation_results, best_model_name)

    return best_run_id
```

```python
def _calculate_comprehensive_metrics(self, y_pred: np.ndarray, y_pred_proba: np.ndarray) -> Dict[str, float]:
"""πŸ“ Calculate comprehensive performance metrics"""

from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, average_precision_score, matthews_corrcoef,
    balanced_accuracy_score, cohen_kappa_score
)

metrics = {
    # 🎯 Classification Metrics
    'accuracy': accuracy_score(self.y_test, y_pred),
    'balanced_accuracy': balanced_accuracy_score(self.y_test, y_pred),
    'precision': precision_score(self.y_test, y_pred),
    'recall': recall_score(self.y_test, y_pred),
    'f1_score': f1_score(self.y_test, y_pred),
    'specificity': self._calculate_specificity(self.y_test, y_pred),

    # πŸ“Š Probabilistic Metrics
    'roc_auc': roc_auc_score(self.y_test, y_pred_proba),
    'average_precision': average_precision_score(self.y_test, y_pred_proba),
    'log_loss': -np.mean(self.y_test * np.log(y_pred_proba + 1e-15) + 
                        (1 - self.y_test) * np.log(1 - y_pred_proba + 1e-15)),

    # πŸ”„ Additional Metrics
    'matthews_corrcoef': matthews_corrcoef(self.y_test, y_pred),
    'cohen_kappa': cohen_kappa_score(self.y_test, y_pred),

    # πŸ“ˆ Threshold-based Metrics
    'precision_at_90_recall': self._precision_at_recall(self.y_test, y_pred_proba, 0.9),
    'recall_at_90_precision': self._recall_at_precision(self.y_test, y_pred_proba, 0.9),
}

return metrics
```
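
The last two entries call `self._precision_at_recall` and `self._recall_at_precision`, helpers that aren't shown in this excerpt. A minimal sketch of the first one (written here as a plain function, under the assumption that it simply scans the precision-recall curve) could look like this:

```python
import numpy as np
from sklearn.metrics import precision_recall_curve

def precision_at_recall(y_true, y_pred_proba, target_recall: float) -> float:
    """Best precision achievable while keeping recall at or above target_recall."""
    precision, recall, _ = precision_recall_curve(y_true, y_pred_proba)
    feasible = precision[recall >= target_recall]
    return float(feasible.max()) if feasible.size else 0.0
```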

```python
def _calculate_business_impact(self, y_pred: np.ndarray, y_pred_proba: np.ndarray) -> Dict[str, float]:
"""πŸ’° Calculate business-relevant metrics"""

# Business assumptions (configurable in production)
customer_value = self.config.get('avg_customer_value', 1000)  # Average customer value
retention_cost = self.config.get('retention_cost', 100)       # Cost of retention campaign
false_positive_cost = self.config.get('fp_cost', 50)         # Cost of false positive

# Calculate confusion matrix components
tn, fp, fn, tp = confusion_matrix(self.y_test, y_pred).ravel()

# Business metrics
business_metrics = {
    # πŸ’° Revenue Impact
    'revenue_saved': tp * customer_value,  # True positives saved
    'revenue_lost': fn * customer_value,   # False negatives lost
    'retention_costs': (tp + fp) * retention_cost,  # Campaign costs
    'false_positive_costs': fp * false_positive_cost,

    # πŸ“Š Efficiency Metrics
    'cost_per_saved_customer': (tp + fp) * retention_cost / max(tp, 1),
    'roi': (tp * customer_value - (tp + fp) * retention_cost) / max((tp + fp) * retention_cost, 1),

    # 🎯 Coverage Metrics
    'churn_coverage': tp / max(tp + fn, 1),  # Percentage of churners identified
    'campaign_efficiency': tp / max(tp + fp, 1),  # Success rate of campaigns

    # πŸ“ˆ Risk Metrics
    'high_risk_identified': np.sum((y_pred_proba > 0.8) & (self.y_test == 1)) / max(np.sum(self.y_test == 1), 1),
    'low_risk_accuracy': np.sum((y_pred_proba < 0.3) & (self.y_test == 0)) / max(np.sum(self.y_test == 0), 1),
}

# Net business value
net_value = business_metrics['revenue_saved'] - business_metrics['revenue_lost'] - business_metrics['retention_costs']
business_metrics['net_business_value'] = net_value

return business_metrics
```
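
A quick back-of-the-envelope check of the ROI formula above, using the default assumptions ($1,000 average customer value, $100 per retention contact) and an invented confusion matrix:

```python
tp, fp = 60, 140                       # invented: 200 customers contacted, 60 were real churn risks
customer_value, retention_cost = 1000, 100

revenue_saved   = tp * customer_value              # 60,000
retention_costs = (tp + fp) * retention_cost       # 20,000
roi = (revenue_saved - retention_costs) / retention_costs
print(f"ROI: {roi:.2f}")                           # 2.00 -> each $1 spent returns $3, i.e. $2 net
```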

```python
def _analyze_model_interpretability(self, model, model_name: str) -> Dict[str, float]:
"""πŸ” Analyze model interpretability and feature importance"""

interpretability_metrics = {}

try:
    # Feature importance (for tree-based models)
    if hasattr(model, 'feature_importances_'):
        feature_importance = model.feature_importances_

        # Top feature importance metrics
        interpretability_metrics.update({
            'top_feature_importance': np.max(feature_importance),
            'feature_importance_concentration': np.sum(np.sort(feature_importance)[-5:]) / np.sum(feature_importance),
            'feature_importance_entropy': -np.sum(feature_importance * np.log(feature_importance + 1e-15)),
            'effective_features': np.sum(feature_importance > 0.01)  # Features with >1% importance
        })

        # Log feature importance plot
        self._create_feature_importance_plot(feature_importance, model_name)

    # Model complexity metrics
    if hasattr(model, 'n_estimators'):  # Tree-based ensemble
        interpretability_metrics['model_complexity'] = model.n_estimators
    elif hasattr(model, 'coef_'):  # Linear model
        interpretability_metrics['model_complexity'] = np.sum(np.abs(model.coef_[0]) > 0.01)
    else:
        interpretability_metrics['model_complexity'] = 1.0  # Default for unknown models

    # Training time proxy (smaller models are generally faster)
    interpretability_metrics['training_efficiency'] = 1.0 / (interpretability_metrics.get('model_complexity', 1.0) + 1)

except Exception as e:
    logger.warning(f"⚠️ Could not analyze interpretability for {model_name}: {str(e)}")
    interpretability_metrics = {'interpretability_score': 0.5}  # Default neutral score

return interpretability_metrics
```
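
For intuition on the concentration metric, here is what it reports for an invented importance vector where a handful of features dominate:

```python
import numpy as np

importance = np.array([0.30, 0.20, 0.15, 0.10, 0.08, 0.07, 0.05, 0.03, 0.02])
top5_share = np.sum(np.sort(importance)[-5:]) / np.sum(importance)
print(f"Top-5 features carry {top5_share:.0%} of the total importance")  # 83%
```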

```python
def _calculate_composite_score(self, metrics: Dict[str, float]) -> float:
"""🎯 Calculate composite evaluation score"""

# Define weights for different metric categories
weights = {
    # Performance metrics (40% weight)
    'roc_auc': 0.15,
    'f1_score': 0.10,
    'balanced_accuracy': 0.10,
    'average_precision': 0.05,

    # Business metrics (40% weight)
    'roi': 0.15,
    'churn_coverage': 0.10,
    'campaign_efficiency': 0.10,
    'high_risk_identified': 0.05,

    # Interpretability metrics (20% weight)
    'training_efficiency': 0.10,
    'effective_features': 0.05,
    'feature_importance_concentration': 0.05
}

# Normalize business metrics to 0-1 scale
normalized_metrics = metrics.copy()

# ROI normalization (assume good ROI is > 3.0)
if 'roi' in normalized_metrics:
    normalized_metrics['roi'] = min(1.0, max(0.0, normalized_metrics['roi'] / 3.0))

# Effective features normalization (assume good range is 5-15 features)
if 'effective_features' in normalized_metrics:
    eff_feat = normalized_metrics['effective_features']
    normalized_metrics['effective_features'] = 1.0 - abs(eff_feat - 10) / 10
    normalized_metrics['effective_features'] = max(0.0, min(1.0, normalized_metrics['effective_features']))

# Calculate weighted score
composite_score = 0.0
total_weight = 0.0

for metric, weight in weights.items():
    if metric in normalized_metrics:
        composite_score += normalized_metrics[metric] * weight
        total_weight += weight

# Normalize by actual weights used
if total_weight > 0:
    composite_score /= total_weight

return composite_score
```
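
For intuition, here is the same weighted-and-renormalized scoring applied by hand to an invented, already-normalized subset of metrics; anything missing from the metrics dictionary simply drops out of the average:

```python
weights = {'roc_auc': 0.15, 'f1_score': 0.10, 'balanced_accuracy': 0.10, 'roi': 0.15}
metrics = {'roc_auc': 0.86, 'f1_score': 0.62, 'roi': 0.70}   # invented values on a 0-1 scale

used = {name: w for name, w in weights.items() if name in metrics}
score = sum(metrics[name] * w for name, w in used.items()) / sum(used.values())
print(f"Composite score: {score:.3f}")   # (0.86*0.15 + 0.62*0.10 + 0.70*0.15) / 0.40 = 0.740
```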

```python
def _log_evaluation_to_mlflow(self, model_name: str, model_info: dict, metrics: dict,
y_pred: np.ndarray, y_pred_proba: np.ndarray):
"""πŸ“ Log comprehensive evaluation results to MLflow"""

with mlflow.start_run(run_name=f"evaluation_{model_name}") as run:
    # Log all metrics
    for metric_name, value in metrics.items():
        mlflow.log_metric(f"eval_{metric_name}", value)

    # Log confusion matrix
    cm = confusion_matrix(self.y_test, y_pred)
    self._log_confusion_matrix(cm, model_name)

    # Log ROC curve
    self._log_roc_curve(self.y_test, y_pred_proba, model_name)

    # Log precision-recall curve
    self._log_precision_recall_curve(self.y_test, y_pred_proba, model_name)

    # Log feature importance (if available)
    if hasattr(model_info['model'], 'feature_importances_'):
        self._log_feature_importance(model_info['model'].feature_importances_, model_name)

    # Log calibration plot
    self._log_calibration_plot(self.y_test, y_pred_proba, model_name)

    # Tag as evaluation run
    mlflow.set_tag("evaluation_run", "true")
    mlflow.set_tag("model_type", model_name)
```
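
Because every evaluation run is tagged with `evaluation_run = "true"` and its metrics are prefixed with `eval_`, you can pull them back later for a side-by-side comparison. A minimal sketch (the experiment name is an assumption):

```python
import mlflow

runs = mlflow.search_runs(
    experiment_names=["churn-prediction"],             # assumed experiment name
    filter_string="tags.evaluation_run = 'true'",
    order_by=["metrics.eval_roc_auc DESC"],
)
print(runs[["tags.model_type", "metrics.eval_roc_auc", "metrics.eval_f1_score"]].head())
```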

## 🚀 Model Registry & Deployment



```python
def register_best_model(self, best_run_id: str):
"""πŸš€ Register the best model to MLflow Model Registry"""

logger.info("πŸš€ Registering best model to Model Registry...")

try:
    # Get model URI from the best run
    model_uri = f"runs:/{best_run_id}/model"

    # Register model
    model_version = mlflow.register_model(
        model_uri=model_uri,
        name=self.model_name
    )

    logger.info(f"πŸ“ Model registered: {self.model_name} version {model_version.version}")

    # Transition to Staging for validation
    client = mlflow.tracking.MlflowClient()
    client.transition_model_version_stage(
        name=self.model_name,
        version=model_version.version,
        stage="Staging"
    )

    logger.info(f"πŸ”„ Model transitioned to Staging stage")

    # Add model description and tags
    client.update_model_version(
        name=self.model_name,
        version=model_version.version,
        description=f"""
        Best performing churn prediction model from training run {best_run_id}.

        Performance Summary:
        - ROC-AUC: {self.models[self._get_best_model_name()]['roc_auc']:.4f}
        - Composite Score: {self.best_score:.4f}
        - Training Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

        Model Configuration:
        - Features: {self.X_train.shape[1]}
        - Training Samples: {len(self.X_train)}
        - Test Samples: {len(self.X_test)}
        """
    )

    # Set tags
    client.set_model_version_tag(
        name=self.model_name,
        version=model_version.version,
        key="training_date",
        value=datetime.now().strftime('%Y-%m-%d')
    )

    client.set_model_version_tag(
        name=self.model_name,
        version=model_version.version,
        key="validation_status",
        value="pending"
    )

    # Save model artifacts locally for backup
    self._save_model_artifacts(model_version.version)

    # Validate model in staging
    if self._validate_staging_model(model_version.version):
        # Transition to Production if validation passes
        client.transition_model_version_stage(
            name=self.model_name,
            version=model_version.version,
            stage="Production"
        )
        logger.info("βœ… Model promoted to Production after validation")
    else:
        logger.warning("⚠️ Model validation failed, keeping in Staging")

    return model_version.version

except Exception as e:
    logger.error(f"❌ Model registration failed: {str(e)}")
    raise
```

```python
def _validate_staging_model(self, model_version: str) -> bool:
"""βœ… Validate model in staging environment"""

logger.info(f"βœ… Validating model version {model_version} in staging...")

try:
    # Load model from staging
    client = mlflow.tracking.MlflowClient()
    model_uri = f"models:/{self.model_name}/{model_version}"
    staging_model = mlflow.sklearn.load_model(model_uri)

    # Run validation tests
    validation_passed = True

    # 1. Prediction consistency test
    sample_predictions = staging_model.predict_proba(self.X_test[:100])
    if not self._validate_prediction_format(sample_predictions):
        logger.error("❌ Prediction format validation failed")
        validation_passed = False

    # 2. Performance threshold test
    y_pred_proba = staging_model.predict_proba(self.X_test)[:, 1]
    staging_auc = roc_auc_score(self.y_test, y_pred_proba)

    min_auc_threshold = self.config.get('min_auc_threshold', 0.75)
    if staging_auc < min_auc_threshold:
        logger.error(f"❌ Model AUC {staging_auc:.4f} below threshold {min_auc_threshold}")
        validation_passed = False

    # 3. Data drift test (simplified)
    if not self._validate_feature_stability():
        logger.error("❌ Feature stability validation failed")
        validation_passed = False

    # 4. Model size and performance test
    model_size = self._get_model_size(staging_model)
    max_size_mb = self.config.get('max_model_size_mb', 100)
    if model_size > max_size_mb:
        logger.warning(f"⚠️ Model size {model_size:.1f}MB exceeds recommended {max_size_mb}MB")

    # Update validation status
    client.set_model_version_tag(
        name=self.model_name,
        version=model_version,
        key="validation_status",
        value="passed" if validation_passed else "failed"
    )

    client.set_model_version_tag(
        name=self.model_name,
        version=model_version,
        key="validation_auc",
        value=str(staging_auc)
    )

    return validation_passed

except Exception as e:
    logger.error(f"❌ Model validation error: {str(e)}")
    return False
```
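
The first check relies on `self._validate_prediction_format`, which isn't shown here. A minimal version, assuming `predict_proba` returns an `(n, 2)` array of class probabilities, might be:

```python
import numpy as np

def validate_prediction_format(pred_proba: np.ndarray) -> bool:
    """Sanity-check predict_proba output for a binary classifier."""
    return (
        pred_proba.ndim == 2
        and pred_proba.shape[1] == 2
        and bool(np.all((pred_proba >= 0.0) & (pred_proba <= 1.0)))
        and bool(np.allclose(pred_proba.sum(axis=1), 1.0, atol=1e-6))
    )
```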

```python
def _save_model_artifacts(self, model_version: str):
"""πŸ’Ύ Save model artifacts locally for backup and deployment"""

logger.info("πŸ’Ύ Saving model artifacts...")

# Create artifacts directory
artifacts_dir = f"artifacts/model_v{model_version}"
os.makedirs(artifacts_dir, exist_ok=True)

# Save the trained model
model_path = os.path.join(artifacts_dir, "model.pkl")
joblib.dump(self.best_model, model_path)

# Save preprocessors
scaler_path = os.path.join(artifacts_dir, "scaler.pkl")
encoder_path = os.path.join(artifacts_dir, "encoder.pkl")

if os.path.exists("artifacts/scaler.pkl"):
    joblib.dump(joblib.load("artifacts/scaler.pkl"), scaler_path)

if os.path.exists("artifacts/encoder.pkl"):
    joblib.dump(joblib.load("artifacts/encoder.pkl"), encoder_path)

# Save model metadata
metadata = {
    "model_version": model_version,
    "training_date": datetime.now().isoformat(),
    "model_type": type(self.best_model).__name__,
    "feature_count": self.X_train.shape[1],
    "training_samples": len(self.X_train),
    "test_auc": self.models[self._get_best_model_name()]['roc_auc'],
    "composite_score": self.best_score
}

metadata_path = os.path.join(artifacts_dir, "metadata.json")
with open(metadata_path, 'w') as f:
    json.dump(metadata, f, indent=2)

logger.info(f"βœ… Model artifacts saved to {artifacts_dir}")
```
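
Once a version has been promoted, downstream services load it from the registry by stage rather than by file path. A minimal sketch; the registered name must match `self.model_name` used above, and the scoring data path is purely hypothetical:

```python
import mlflow
import pandas as pd

model = mlflow.pyfunc.load_model("models:/churn-prediction/Production")  # placeholder registry name
batch = pd.read_parquet("data/scoring/latest_customers.parquet")         # hypothetical feature file
print(model.predict(batch)[:10])
```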

## 🐳 Containerization & Deployment

### Docker Configuration



```dockerfile
# services/training/Dockerfile

FROM python:3.9-slim

# 🏷️ Metadata
LABEL maintainer="mlops-team@company.com"
LABEL version="2.0.0"
LABEL description="Customer Churn Prediction Training Service"

# 🔧 System dependencies
RUN apt-get update && apt-get install -y \
    curl \
    gcc \
    g++ \
    git \
    && rm -rf /var/lib/apt/lists/*

# 📁 Working directory
WORKDIR /app

# 📋 Copy requirements first for better caching
COPY requirements.txt .

# 📦 Install Python dependencies
RUN pip install --no-cache-dir --upgrade pip && \
    pip install --no-cache-dir -r requirements.txt

# 📁 Copy training code
COPY . .

# 👤 Create non-root user for security
RUN useradd --create-home --shell /bin/bash trainer && \
    chown -R trainer:trainer /app && \
    mkdir -p /app/logs /app/artifacts /app/models && \
    chown -R trainer:trainer /app/logs /app/artifacts /app/models

USER trainer

# 🔌 Expose port for monitoring
EXPOSE 8080

# 🏥 Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD python -c "import requests; requests.get('http://localhost:8080/health')" || exit 1

# 🚀 Default command
CMD ["python", "churn_mlops_pipeline.py"]
```


### Training Automation & Scheduling



```python

# utils/training_scheduler.py

import schedule
import time
import threading
import logging
from datetime import datetime, timedelta
import os
import subprocess

class TrainingScheduler:
"""πŸ•’ Automated training scheduler with flexible scheduling options"""

def __init__(self, config: dict):
    self.config = config
    self.is_running = False
    self.scheduler_thread = None

    # Setup logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    self.logger = logging.getLogger(__name__)

def schedule_training(self):
    """πŸ—“οΈ Setup training schedules"""

    # Weekly full retraining (Sundays at 2 AM)
    schedule.every().sunday.at("02:00").do(self._run_full_training)

    # Daily incremental training (every day at 6 AM)
    schedule.every().day.at("06:00").do(self._run_incremental_training)

    # Data drift check (every 6 hours)
    schedule.every(6).hours.do(self._check_data_drift)

    # Model performance monitoring (every hour)
    schedule.every().hour.do(self._monitor_model_performance)

    self.logger.info("πŸ“… Training schedules configured")

def start_scheduler(self):
    """πŸš€ Start the training scheduler"""

    if self.is_running:
        self.logger.warning("⚠️ Scheduler is already running")
        return

    self.is_running = True
    self.scheduler_thread = threading.Thread(target=self._run_scheduler, daemon=True)
    self.scheduler_thread.start()

    self.logger.info("πŸš€ Training scheduler started")

def stop_scheduler(self):
    """πŸ›‘ Stop the training scheduler"""

    self.is_running = False
    if self.scheduler_thread:
        self.scheduler_thread.join()

    self.logger.info("πŸ›‘ Training scheduler stopped")

def _run_scheduler(self):
    """πŸ”„ Main scheduler loop"""

    while self.is_running:
        schedule.run_pending()
        time.sleep(60)  # Check every minute

def _run_full_training(self):
    """πŸ”„ Execute full model retraining"""

    self.logger.info("πŸ”„ Starting scheduled full training...")

    try:
        # Run training pipeline
        cmd = ["python", "churn_mlops_pipeline.py", "--mode", "full"]
        result = subprocess.run(cmd, capture_output=True, text=True, cwd="/app")

        if result.returncode == 0:
            self.logger.info("βœ… Full training completed successfully")
            self._notify_training_completion("full", "success")
        else:
            self.logger.error(f"❌ Full training failed: {result.stderr}")
            self._notify_training_completion("full", "failed", result.stderr)

    except Exception as e:
        self.logger.error(f"❌ Full training error: {str(e)}")
        self._notify_training_completion("full", "error", str(e))

def _run_incremental_training(self):
    """⚑ Execute incremental model training"""

    self.logger.info("⚑ Starting scheduled incremental training...")

    try:
        # Check if new data is available
        if not self._check_new_data_available():
            self.logger.info("ℹ️ No new data available for incremental training")
            return

        # Run incremental training
        cmd = ["python", "churn_mlops_pipeline.py", "--mode", "incremental"]
        result = subprocess.run(cmd, capture_output=True, text=True, cwd="/app")

        if result.returncode == 0:
            self.logger.info("βœ… Incremental training completed successfully")
            self._notify_training_completion("incremental", "success")
        else:
            self.logger.error(f"❌ Incremental training failed: {result.stderr}")
            self._notify_training_completion("incremental", "failed", result.stderr)

    except Exception as e:
        self.logger.error(f"❌ Incremental training error: {str(e)}")
        self._notify_training_completion("incremental", "error", str(e))

def _check_data_drift(self):
    """πŸ“Š Check for data drift and trigger retraining if needed"""

    self.logger.info("πŸ“Š Checking for data drift...")

    try:
        # Run data drift detection
        cmd = ["python", "-c", "from utils.drift_detector import detect_drift; detect_drift()"]
        result = subprocess.run(cmd, capture_output=True, text=True, cwd="/app")

        if "DRIFT_DETECTED" in result.stdout:
            self.logger.warning("⚠️ Data drift detected, triggering retraining...")
            self._run_full_training()
        else:
            self.logger.info("βœ… No significant data drift detected")

    except Exception as e:
        self.logger.error(f"❌ Data drift check error: {str(e)}")

def _monitor_model_performance(self):
    """πŸ“ˆ Monitor current model performance"""

    try:
        # Check model performance metrics
        cmd = ["python", "-c", "from utils.performance_monitor import check_performance; check_performance()"]
        result = subprocess.run(cmd, capture_output=True, text=True, cwd="/app")

        if "PERFORMANCE_DEGRADED" in result.stdout:
            self.logger.warning("πŸ“‰ Model performance degraded, scheduling retraining...")
            # Schedule urgent retraining
            schedule.every(30).minutes.do(self._run_full_training).tag('urgent')

    except Exception as e:
        self.logger.error(f"❌ Performance monitoring error: {str(e)}")
```

```python
# 🚀 Main execution
if __name__ == "__main__":
    config = {
        "mlflow_tracking_uri": os.getenv("MLFLOW_TRACKING_URI", "http://localhost:5000"),
        "experiment_name": "churn-prediction-scheduled",
        "notification_webhook": os.getenv("NOTIFICATION_WEBHOOK")
    }

    scheduler = TrainingScheduler(config)
    scheduler.schedule_training()
    scheduler.start_scheduler()

    try:
        # Keep the scheduler running
        while True:
            time.sleep(60)
    except KeyboardInterrupt:
        scheduler.stop_scheduler()
        print("🛑 Training scheduler stopped")
```
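
The drift check above only looks for the literal string `DRIFT_DETECTED` in the helper's stdout, so any implementation that honors that contract will plug in. A minimal sketch using per-feature Kolmogorov-Smirnov tests; the module path, file locations, and threshold are assumptions:

```python
# utils/drift_detector.py (sketch)
import pandas as pd
from scipy.stats import ks_2samp

def detect_drift(reference_path="data/reference.parquet",   # hypothetical paths
                 current_path="data/current.parquet",
                 p_threshold=0.01):
    """Print DRIFT_DETECTED if any shared numeric feature has shifted distribution."""
    reference = pd.read_parquet(reference_path)
    current = pd.read_parquet(current_path)

    numeric_cols = reference.select_dtypes("number").columns.intersection(current.columns)
    drifted = [
        col for col in numeric_cols
        if ks_2samp(reference[col].dropna(), current[col].dropna()).pvalue < p_threshold
    ]

    print(f"DRIFT_DETECTED in: {', '.join(drifted)}" if drifted else "NO_DRIFT")
```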

## 📊 Monitoring & Performance Tracking



```python

# utils/training_monitor.py

from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
import mlflow
import logging

# 📊 Training metrics

training_runs_total = Counter('training_runs_total', 'Total training runs', ['status', 'mode'])
training_duration = Histogram('training_duration_seconds', 'Training duration')
model_performance_gauge = Gauge('model_performance_auc', 'Current model AUC score')
data_processed_total = Counter('data_processed_total', 'Total data points processed')

class TrainingMonitor:
"""πŸ“Š Comprehensive training monitoring and alerting"""

def __init__(self, webhook_url: str = None):
    self.webhook_url = webhook_url
    self.logger = logging.getLogger(__name__)

    # Start metrics server
    start_http_server(8080)
    self.logger.info("πŸ“Š Training monitoring server started on port 8080")

def track_training_run(self, func):
    """🎯 Decorator to track training runs"""

    def wrapper(*args, **kwargs):
        start_time = time.time()

        try:
            result = func(*args, **kwargs)

            # Record successful training
            duration = time.time() - start_time
            training_duration.observe(duration)
            training_runs_total.labels(status='success', mode='full').inc()

            # Update performance gauge if available
            if hasattr(result, 'best_score'):
                model_performance_gauge.set(result.best_score)

            self.logger.info(f"βœ… Training completed: {duration:.2f}s")
            return result

        except Exception as e:
            training_runs_total.labels(status='failed', mode='full').inc()
            self.logger.error(f"❌ Training failed: {str(e)}")
            self._send_alert("Training Failed", str(e))
            raise

    return wrapper

def _send_alert(self, title: str, message: str):
    """🚨 Send training alerts"""

    if not self.webhook_url:
        return

    try:
        import requests

        payload = {
            "text": f"🚨 MLOps Alert: {title}",
            "attachments": [{
                "color": "danger",
                "fields": [{
                    "title": "Details",
                    "value": message,
                    "short": False
                }]
            }]
        }

        requests.post(self.webhook_url, json=payload)

    except Exception as e:
        self.logger.error(f"Failed to send alert: {str(e)}")
```
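
Wiring the monitor into a training entry point is just a matter of applying the decorator. The webhook variable and the pipeline call inside the function are placeholders:

```python
import os

monitor = TrainingMonitor(webhook_url=os.getenv("NOTIFICATION_WEBHOOK"))

@monitor.track_training_run
def run_training_pipeline():
    # Placeholder: invoke your actual training pipeline here and return its result
    ...

if __name__ == "__main__":
    run_training_pipeline()
```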


#mlopszoomcamp
This comprehensive documentation covers the complete Training service implementation, from the core ML pipeline through deployment automation and monitoring. The service ensures reliable, automated model development with proper evaluation, validation, and deployment processes.