# Training Operations Tutorial: From Models to Production

## What You'll Learn
In this tutorial, you'll discover how to:
- Evaluate and validate your trained models (quality testing)
- Select the champion model automatically (picking the winner)
- Set up automated retraining (keeping recipes fresh)
- Deploy models to production (opening for business)
- Monitor model performance over time (staying competitive)
## Why Do We Need Training Operations?
Imagine your bakery has created several bread recipes, and now you need to:
- Taste-test each recipe thoroughly (model evaluation)
- Choose the best one for your customers (model selection)
- Update recipes when ingredients change (retraining)
- Launch the winning recipe in all stores (deployment)
- Keep track of customer satisfaction (monitoring)
Training operations ensure your ML models work reliably in the real world!
## Model Evaluation Championship
Think of model evaluation like a cooking competition - we need fair judging to pick the winner.
### Understanding Model Metrics (Judging Criteria)
Different metrics tell us different things about our model's performance:
```
Model Performance Scorecard
├── Accuracy  (Overall correctness)
├── Precision (When I say "churn", am I right?)
├── Recall    (Did I catch all the churners?)
├── F1-Score  (Balance of precision & recall)
└── AUC-ROC   (Overall ranking ability)
```
What each metric means in business terms:
- Accuracy: "How often am I right overall?"
- Precision: "When I predict churn, how often is it true?"
- Recall: "Of all customers who actually churned, how many did I catch?"
- F1-Score: "What's my balanced performance?"
- AUC-ROC: "How good am I at ranking customers by risk?"
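To make these numbers concrete, here is a tiny worked example with made-up predictions for ten customers; the counts and scores are purely illustrative:

```python
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Illustrative toy data: 1 = churned, 0 = stayed
y_true = [1, 1, 1, 1, 0, 0, 0, 0, 0, 0]
y_pred = [1, 1, 1, 0, 1, 0, 0, 0, 0, 0]  # 3 TP, 1 FN, 1 FP, 5 TN

print(accuracy_score(y_true, y_pred))   # (3 + 5) / 10       = 0.80
print(precision_score(y_true, y_pred))  # 3 / (3 TP + 1 FP)  = 0.75
print(recall_score(y_true, y_pred))     # 3 / (3 TP + 1 FN)  = 0.75
print(f1_score(y_true, y_pred))         # harmonic mean      = 0.75
```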
### Step 1: Building a Model Evaluation System
What we're doing: Creating a comprehensive testing system for all our models.
```python
# src/models/model_evaluator.py - Your professional judges panel
import logging

import numpy as np
import pandas as pd
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, confusion_matrix, classification_report
)
import matplotlib.pyplot as plt
import seaborn as sns
class ModelEvaluator:
"""
This is your panel of expert judges who:
- Test each model thoroughly
- Score them on multiple criteria
- Create detailed report cards
- Pick the overall winner
"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self.evaluation_results = {}
def comprehensive_evaluation(self, model, X_test, y_test, model_name: str):
"""
π Comprehensive model evaluation
Like having each dish judged by multiple expert chefs
on taste, presentation, creativity, and nutrition.
"""
self.logger.info(f"π Evaluating {model_name} on all criteria...")
# π― Get model predictions
y_pred = model.predict(X_test)
y_pred_proba = model.predict_proba(X_test)[:, 1]
# π Calculate all performance metrics
metrics = {
# π― Classification metrics
'accuracy': accuracy_score(y_test, y_pred),
'precision': precision_score(y_test, y_pred),
'recall': recall_score(y_test, y_pred),
'f1_score': f1_score(y_test, y_pred),
'auc_roc': roc_auc_score(y_test, y_pred_proba),
# π° Business metrics
'false_positive_rate': self._calculate_fpr(y_test, y_pred),
'false_negative_rate': self._calculate_fnr(y_test, y_pred),
'business_value_score': self._calculate_business_value(y_test, y_pred)
}
# π Create detailed analysis
analysis = {
'confusion_matrix': confusion_matrix(y_test, y_pred),
'classification_report': classification_report(y_test, y_pred, output_dict=True),
'prediction_distribution': self._analyze_prediction_distribution(y_pred_proba, y_test)
}
        # Store results first so the visualization helper can look up the fitted model
        self.evaluation_results[model_name] = {
            'metrics': metrics,
            'analysis': analysis,
            'model': model
        }

        # Create visual reports
        self._create_evaluation_visualizations(y_test, y_pred, y_pred_proba, model_name)

        # Log results in a human-readable format
        self._log_evaluation_summary(model_name, metrics, analysis)
return metrics, analysis
def _calculate_fpr(self, y_true, y_pred):
"""Calculate False Positive Rate (crying wolf rate)"""
tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
return fp / (fp + tn) if (fp + tn) > 0 else 0
def _calculate_fnr(self, y_true, y_pred):
"""Calculate False Negative Rate (missing actual churners)"""
tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
return fn / (fn + tp) if (fn + tp) > 0 else 0
def _calculate_business_value(self, y_true, y_pred):
"""
π° Calculate business value of predictions
This considers the cost of different types of errors:
- Missing a churner (False Negative) costs more than a false alarm
- Correctly identifying churners has high value
"""
tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
# π° Business costs/benefits (example values)
cost_false_negative = 100 # Lost customer lifetime value
cost_false_positive = 10 # Cost of unnecessary retention effort
benefit_true_positive = 80 # Retained customer value
benefit_true_negative = 0 # No action needed
total_value = (
tp * benefit_true_positive +
tn * benefit_true_negative -
fp * cost_false_positive -
fn * cost_false_negative
)
# Normalize by total predictions
return total_value / len(y_true)
def _analyze_prediction_distribution(self, y_pred_proba, y_true):
"""
π Analyze how confident the model is in its predictions
Like checking if a chef is confident in their dish or just guessing.
"""
return {
'avg_confidence_churners': np.mean(y_pred_proba[y_true == 1]),
'avg_confidence_non_churners': np.mean(y_pred_proba[y_true == 0]),
'confidence_separation': np.mean(y_pred_proba[y_true == 1]) - np.mean(y_pred_proba[y_true == 0]),
'high_confidence_predictions': np.sum(np.abs(y_pred_proba - 0.5) > 0.3) / len(y_pred_proba)
}
def _create_evaluation_visualizations(self, y_true, y_pred, y_pred_proba, model_name):
"""
π¨ Create comprehensive visual evaluation reports
Like taking photos of each dish from different angles.
"""
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
# π― Confusion Matrix
cm = confusion_matrix(y_true, y_pred)
sns.heatmap(cm, annot=True, fmt='d', ax=axes[0,0], cmap='Blues',
xticklabels=['No Churn', 'Churn'],
yticklabels=['No Churn', 'Churn'])
axes[0,0].set_title(f'{model_name}: Confusion Matrix')
axes[0,0].set_ylabel('True Label')
axes[0,0].set_xlabel('Predicted Label')
# π ROC Curve
from sklearn.metrics import roc_curve, auc
fpr, tpr, _ = roc_curve(y_true, y_pred_proba)
axes[0,1].plot(fpr, tpr, label=f'ROC Curve (AUC = {auc(fpr, tpr):.3f})')
axes[0,1].plot([0, 1], [0, 1], 'k--', label='Random Guess')
axes[0,1].set_xlabel('False Positive Rate')
axes[0,1].set_ylabel('True Positive Rate')
axes[0,1].set_title(f'{model_name}: ROC Curve')
axes[0,1].legend()
axes[0,1].grid(True)
# π Prediction Distribution
axes[0,2].hist(y_pred_proba[y_true==0], bins=30, alpha=0.5,
label='No Churn', color='blue', density=True)
axes[0,2].hist(y_pred_proba[y_true==1], bins=30, alpha=0.5,
label='Churn', color='red', density=True)
axes[0,2].set_xlabel('Predicted Probability')
axes[0,2].set_ylabel('Density')
axes[0,2].set_title(f'{model_name}: Prediction Distribution')
axes[0,2].legend()
axes[0,2].grid(True)
# π― Precision-Recall Curve
from sklearn.metrics import precision_recall_curve
precision, recall, _ = precision_recall_curve(y_true, y_pred_proba)
axes[1,0].plot(recall, precision, label=f'PR Curve')
axes[1,0].set_xlabel('Recall')
axes[1,0].set_ylabel('Precision')
axes[1,0].set_title(f'{model_name}: Precision-Recall Curve')
axes[1,0].legend()
axes[1,0].grid(True)
# π Feature Importance (if available)
if hasattr(self.evaluation_results[model_name]['model'], 'feature_importances_'):
importance = self.evaluation_results[model_name]['model'].feature_importances_
indices = np.argsort(importance)[::-1][:15] # Top 15
axes[1,1].bar(range(len(indices)), importance[indices])
axes[1,1].set_title(f'{model_name}: Top 15 Feature Importances')
axes[1,1].set_xticks(range(len(indices)))
axes[1,1].tick_params(axis='x', rotation=45)
# π Calibration Plot
from sklearn.calibration import calibration_curve
fraction_of_positives, mean_predicted_value = calibration_curve(
y_true, y_pred_proba, n_bins=10
)
axes[1,2].plot(mean_predicted_value, fraction_of_positives, "s-",
label=f"{model_name}")
axes[1,2].plot([0, 1], [0, 1], "k:", label="Perfectly calibrated")
axes[1,2].set_xlabel('Mean Predicted Probability')
axes[1,2].set_ylabel('Fraction of Positives')
axes[1,2].set_title(f'{model_name}: Calibration Plot')
axes[1,2].legend()
axes[1,2].grid(True)
plt.tight_layout()
plt.savefig(f'reports/figures/{model_name}_comprehensive_evaluation.png',
dpi=300, bbox_inches='tight')
plt.close()
self.logger.info(f"π Comprehensive evaluation plots created for {model_name}")
def _log_evaluation_summary(self, model_name, metrics, analysis):
"""π Create human-readable evaluation summary"""
self.logger.info(f"\nπ {model_name} Performance Report Card:")
self.logger.info(f" π Overall Accuracy: {metrics['accuracy']:.3f} ({metrics['accuracy']*100:.1f}%)")
self.logger.info(f" π― Precision: {metrics['precision']:.3f} (When I say churn, I'm right {metrics['precision']*100:.1f}% of the time)")
self.logger.info(f" π‘ Recall: {metrics['recall']:.3f} (I catch {metrics['recall']*100:.1f}% of actual churners)")
self.logger.info(f" βοΈ F1-Score: {metrics['f1_score']:.3f} (Balanced performance)")
self.logger.info(f" π AUC-ROC: {metrics['auc_roc']:.3f} (Ranking ability)")
self.logger.info(f" π° Business Value: ${metrics['business_value_score']:.2f} per prediction")
# π¨ Highlight potential issues
if metrics['false_positive_rate'] > 0.2:
self.logger.warning(f" β οΈ High false alarm rate: {metrics['false_positive_rate']*100:.1f}%")
if metrics['false_negative_rate'] > 0.3:
self.logger.warning(f" β οΈ Missing many churners: {metrics['false_negative_rate']*100:.1f}%")
# π― Business interpretation
cm = analysis['confusion_matrix']
tn, fp, fn, tp = cm.ravel()
        self.logger.info("  Business Impact:")
        self.logger.info(f"    - Correctly identified churners: {tp}")
        self.logger.info(f"    - Correctly identified loyal customers: {tn}")
        self.logger.info(f"    - False alarms (unnecessary retention efforts): {fp}")
        self.logger.info(f"    - Missed churners (lost customers): {fn}")
def select_champion_model(self):
"""
π Select the best model based on multiple criteria
Like choosing the winner of a cooking competition based on
multiple judges' scores and different criteria.
"""
self.logger.info("π Selecting champion model...")
if not self.evaluation_results:
raise ValueError("No models have been evaluated yet!")
# π Calculate composite score for each model
scores = {}
for model_name, results in self.evaluation_results.items():
metrics = results['metrics']
# π― Weighted scoring (adjust weights based on business priorities)
composite_score = (
metrics['auc_roc'] * 0.3 + # Overall ranking ability
metrics['f1_score'] * 0.25 + # Balanced performance
metrics['precision'] * 0.2 + # Accuracy when predicting churn
metrics['recall'] * 0.15 + # Catching actual churners
(1 - metrics['false_negative_rate']) * 0.1 # Not missing churners
)
scores[model_name] = composite_score
self.logger.info(f" {model_name}: Composite Score = {composite_score:.4f}")
# π Select the winner
champion_name = max(scores, key=scores.get)
champion_score = scores[champion_name]
self.logger.info(f"\nπ Champion Model Selected: {champion_name}")
self.logger.info(f"π Champion Score: {champion_score:.4f}")
# π Provide detailed justification
champion_metrics = self.evaluation_results[champion_name]['metrics']
self.logger.info(f"\nπ Why {champion_name} won:")
self.logger.info(f" β’ Excellent overall ranking (AUC: {champion_metrics['auc_roc']:.3f})")
self.logger.info(f" β’ Good precision-recall balance (F1: {champion_metrics['f1_score']:.3f})")
self.logger.info(f" β’ Strong business value (${champion_metrics['business_value_score']:.2f}/prediction)")
return champion_name, self.evaluation_results[champion_name]
def create_model_comparison_report(self):
"""
π Create comprehensive comparison report of all models
Like creating a final judging summary showing how each dish performed.
"""
if not self.evaluation_results:
self.logger.warning("No evaluation results to compare")
return
# π Create comparison DataFrame
comparison_data = []
for model_name, results in self.evaluation_results.items():
metrics = results['metrics']
row = {
'Model': model_name,
'Accuracy': f"{metrics['accuracy']:.3f}",
'Precision': f"{metrics['precision']:.3f}",
'Recall': f"{metrics['recall']:.3f}",
'F1-Score': f"{metrics['f1_score']:.3f}",
'AUC-ROC': f"{metrics['auc_roc']:.3f}",
'Business Value': f"${metrics['business_value_score']:.2f}",
'FP Rate': f"{metrics['false_positive_rate']:.3f}",
'FN Rate': f"{metrics['false_negative_rate']:.3f}"
}
comparison_data.append(row)
comparison_df = pd.DataFrame(comparison_data)
# πΎ Save comparison table
comparison_df.to_csv('reports/model_comparison.csv', index=False)
# π Create comparison visualization
fig, ax = plt.subplots(figsize=(12, 8))
metrics_to_plot = ['Accuracy', 'Precision', 'Recall', 'F1-Score', 'AUC-ROC']
x = np.arange(len(self.evaluation_results))
width = 0.15
for i, metric in enumerate(metrics_to_plot):
values = [float(comparison_df[comparison_df['Model'] == model][metric].iloc[0])
for model in self.evaluation_results.keys()]
ax.bar(x + i*width, values, width, label=metric)
ax.set_xlabel('Models')
ax.set_ylabel('Score')
ax.set_title('Model Performance Comparison')
ax.set_xticks(x + width * 2)
ax.set_xticklabels(self.evaluation_results.keys(), rotation=45)
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('reports/figures/model_comparison.png', dpi=300, bbox_inches='tight')
plt.close()
self.logger.info("π Model comparison report created")
self.logger.info(f"\n{comparison_df.to_string(index=False)}")
# π Usage example
def evaluate_all_models(models, X_test, y_test):
"""Main evaluation function"""
evaluator = ModelEvaluator()
# π Evaluate each model
for model_name, model in models.items():
evaluator.comprehensive_evaluation(model, X_test, y_test, model_name)
# π Create comparison report
evaluator.create_model_comparison_report()
# π Select champion
champion_name, champion_info = evaluator.select_champion_model()
    return champion_name, champion_info, evaluator
```
This evaluation system provides comprehensive testing and automatic selection of the best model, ensuring you deploy only the highest quality ML models to production.
The same evaluation logic also lives inside the end-to-end training pipeline (`churn_mlops_pipeline.py`), where each trained model is scored, compared on a composite score, and the best MLflow run is selected for registration. The excerpt below picks up inside that evaluation loop (the method signature and loop header are reconstructed here for context):

```python
    def evaluate_models(self) -> str:
        """Evaluate every trained model and return the run ID of the best one."""
        evaluation_results = {}

        for model_name, model_info in self.models.items():
            model = model_info['model']

            y_pred = model.predict(self.X_test)
            y_pred_proba = model.predict_proba(self.X_test)[:, 1]
# Calculate comprehensive metrics
metrics = self._calculate_comprehensive_metrics(y_pred, y_pred_proba)
# Business impact analysis
business_metrics = self._calculate_business_impact(y_pred, y_pred_proba)
# Model interpretability analysis
interpretability_metrics = self._analyze_model_interpretability(model, model_name)
# Combine all metrics
all_metrics = {**metrics, **business_metrics, **interpretability_metrics}
evaluation_results[model_name] = {
'metrics': all_metrics,
'model_info': model_info,
'evaluation_score': self._calculate_composite_score(all_metrics)
}
# Log detailed evaluation to MLflow
self._log_evaluation_to_mlflow(model_name, model_info, all_metrics, y_pred, y_pred_proba)
# Select best model based on composite score
best_model_name = max(evaluation_results.keys(),
key=lambda x: evaluation_results[x]['evaluation_score'])
self.best_model = evaluation_results[best_model_name]['model_info']['model']
self.best_score = evaluation_results[best_model_name]['evaluation_score']
best_run_id = evaluation_results[best_model_name]['model_info']['run_id']
logger.info(f"π Best model selected: {best_model_name}")
logger.info(f"π Composite score: {self.best_score:.4f}")
# Generate evaluation report
self._generate_evaluation_report(evaluation_results, best_model_name)
return best_run_id
def _calculate_comprehensive_metrics(self, y_pred: np.ndarray, y_pred_proba: np.ndarray) -> Dict[str, float]:
"""π Calculate comprehensive performance metrics"""
from sklearn.metrics import (
accuracy_score, precision_score, recall_score, f1_score,
roc_auc_score, average_precision_score, matthews_corrcoef,
balanced_accuracy_score, cohen_kappa_score
)
metrics = {
# π― Classification Metrics
'accuracy': accuracy_score(self.y_test, y_pred),
'balanced_accuracy': balanced_accuracy_score(self.y_test, y_pred),
'precision': precision_score(self.y_test, y_pred),
'recall': recall_score(self.y_test, y_pred),
'f1_score': f1_score(self.y_test, y_pred),
'specificity': self._calculate_specificity(self.y_test, y_pred),
# π Probabilistic Metrics
'roc_auc': roc_auc_score(self.y_test, y_pred_proba),
'average_precision': average_precision_score(self.y_test, y_pred_proba),
'log_loss': -np.mean(self.y_test * np.log(y_pred_proba + 1e-15) +
(1 - self.y_test) * np.log(1 - y_pred_proba + 1e-15)),
# π Additional Metrics
'matthews_corrcoef': matthews_corrcoef(self.y_test, y_pred),
'cohen_kappa': cohen_kappa_score(self.y_test, y_pred),
# π Threshold-based Metrics
'precision_at_90_recall': self._precision_at_recall(self.y_test, y_pred_proba, 0.9),
'recall_at_90_precision': self._recall_at_precision(self.y_test, y_pred_proba, 0.9),
}
return metrics
def _calculate_business_impact(self, y_pred: np.ndarray, y_pred_proba: np.ndarray) -> Dict[str, float]:
"""π° Calculate business-relevant metrics"""
# Business assumptions (configurable in production)
customer_value = self.config.get('avg_customer_value', 1000) # Average customer value
retention_cost = self.config.get('retention_cost', 100) # Cost of retention campaign
false_positive_cost = self.config.get('fp_cost', 50) # Cost of false positive
# Calculate confusion matrix components
tn, fp, fn, tp = confusion_matrix(self.y_test, y_pred).ravel()
# Business metrics
business_metrics = {
# π° Revenue Impact
'revenue_saved': tp * customer_value, # True positives saved
'revenue_lost': fn * customer_value, # False negatives lost
'retention_costs': (tp + fp) * retention_cost, # Campaign costs
'false_positive_costs': fp * false_positive_cost,
# π Efficiency Metrics
'cost_per_saved_customer': (tp + fp) * retention_cost / max(tp, 1),
'roi': (tp * customer_value - (tp + fp) * retention_cost) / max((tp + fp) * retention_cost, 1),
# π― Coverage Metrics
'churn_coverage': tp / max(tp + fn, 1), # Percentage of churners identified
'campaign_efficiency': tp / max(tp + fp, 1), # Success rate of campaigns
# π Risk Metrics
'high_risk_identified': np.sum((y_pred_proba > 0.8) & (self.y_test == 1)) / max(np.sum(self.y_test == 1), 1),
'low_risk_accuracy': np.sum((y_pred_proba < 0.3) & (self.y_test == 0)) / max(np.sum(self.y_test == 0), 1),
}
# Net business value
net_value = business_metrics['revenue_saved'] - business_metrics['revenue_lost'] - business_metrics['retention_costs']
business_metrics['net_business_value'] = net_value
return business_metrics
def _analyze_model_interpretability(self, model, model_name: str) -> Dict[str, float]:
"""π Analyze model interpretability and feature importance"""
interpretability_metrics = {}
try:
# Feature importance (for tree-based models)
if hasattr(model, 'feature_importances_'):
feature_importance = model.feature_importances_
# Top feature importance metrics
interpretability_metrics.update({
'top_feature_importance': np.max(feature_importance),
'feature_importance_concentration': np.sum(np.sort(feature_importance)[-5:]) / np.sum(feature_importance),
'feature_importance_entropy': -np.sum(feature_importance * np.log(feature_importance + 1e-15)),
'effective_features': np.sum(feature_importance > 0.01) # Features with >1% importance
})
# Log feature importance plot
self._create_feature_importance_plot(feature_importance, model_name)
# Model complexity metrics
if hasattr(model, 'n_estimators'): # Tree-based ensemble
interpretability_metrics['model_complexity'] = model.n_estimators
elif hasattr(model, 'coef_'): # Linear model
interpretability_metrics['model_complexity'] = np.sum(np.abs(model.coef_[0]) > 0.01)
else:
interpretability_metrics['model_complexity'] = 1.0 # Default for unknown models
# Training time proxy (smaller models are generally faster)
interpretability_metrics['training_efficiency'] = 1.0 / (interpretability_metrics.get('model_complexity', 1.0) + 1)
except Exception as e:
logger.warning(f"β οΈ Could not analyze interpretability for {model_name}: {str(e)}")
interpretability_metrics = {'interpretability_score': 0.5} # Default neutral score
return interpretability_metrics
def _calculate_composite_score(self, metrics: Dict[str, float]) -> float:
"""π― Calculate composite evaluation score"""
# Define weights for different metric categories
weights = {
# Performance metrics (40% weight)
'roc_auc': 0.15,
'f1_score': 0.10,
'balanced_accuracy': 0.10,
'average_precision': 0.05,
# Business metrics (40% weight)
'roi': 0.15,
'churn_coverage': 0.10,
'campaign_efficiency': 0.10,
'high_risk_identified': 0.05,
# Interpretability metrics (20% weight)
'training_efficiency': 0.10,
'effective_features': 0.05,
'feature_importance_concentration': 0.05
}
# Normalize business metrics to 0-1 scale
normalized_metrics = metrics.copy()
# ROI normalization (assume good ROI is > 3.0)
if 'roi' in normalized_metrics:
normalized_metrics['roi'] = min(1.0, max(0.0, normalized_metrics['roi'] / 3.0))
# Effective features normalization (assume good range is 5-15 features)
if 'effective_features' in normalized_metrics:
eff_feat = normalized_metrics['effective_features']
normalized_metrics['effective_features'] = 1.0 - abs(eff_feat - 10) / 10
normalized_metrics['effective_features'] = max(0.0, min(1.0, normalized_metrics['effective_features']))
# Calculate weighted score
composite_score = 0.0
total_weight = 0.0
for metric, weight in weights.items():
if metric in normalized_metrics:
composite_score += normalized_metrics[metric] * weight
total_weight += weight
# Normalize by actual weights used
if total_weight > 0:
composite_score /= total_weight
return composite_score
def _log_evaluation_to_mlflow(self, model_name: str, model_info: dict, metrics: dict,
y_pred: np.ndarray, y_pred_proba: np.ndarray):
"""π Log comprehensive evaluation results to MLflow"""
with mlflow.start_run(run_name=f"evaluation_{model_name}") as run:
# Log all metrics
for metric_name, value in metrics.items():
mlflow.log_metric(f"eval_{metric_name}", value)
# Log confusion matrix
cm = confusion_matrix(self.y_test, y_pred)
self._log_confusion_matrix(cm, model_name)
# Log ROC curve
self._log_roc_curve(self.y_test, y_pred_proba, model_name)
# Log precision-recall curve
self._log_precision_recall_curve(self.y_test, y_pred_proba, model_name)
# Log feature importance (if available)
if hasattr(model_info['model'], 'feature_importances_'):
self._log_feature_importance(model_info['model'].feature_importances_, model_name)
# Log calibration plot
self._log_calibration_plot(self.y_test, y_pred_proba, model_name)
# Tag as evaluation run
mlflow.set_tag("evaluation_run", "true")
            mlflow.set_tag("model_type", model_name)
```
## Model Registry & Deployment
```python
def register_best_model(self, best_run_id: str):
"""π Register the best model to MLflow Model Registry"""
logger.info("π Registering best model to Model Registry...")
try:
# Get model URI from the best run
model_uri = f"runs:/{best_run_id}/model"
# Register model
model_version = mlflow.register_model(
model_uri=model_uri,
name=self.model_name
)
logger.info(f"π Model registered: {self.model_name} version {model_version.version}")
# Transition to Staging for validation
client = mlflow.tracking.MlflowClient()
client.transition_model_version_stage(
name=self.model_name,
version=model_version.version,
stage="Staging"
)
logger.info(f"π Model transitioned to Staging stage")
# Add model description and tags
client.update_model_version(
name=self.model_name,
version=model_version.version,
description=f"""
Best performing churn prediction model from training run {best_run_id}.
Performance Summary:
- ROC-AUC: {self.models[self._get_best_model_name()]['roc_auc']:.4f}
- Composite Score: {self.best_score:.4f}
- Training Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Model Configuration:
- Features: {self.X_train.shape[1]}
- Training Samples: {len(self.X_train)}
- Test Samples: {len(self.X_test)}
"""
)
# Set tags
client.set_model_version_tag(
name=self.model_name,
version=model_version.version,
key="training_date",
value=datetime.now().strftime('%Y-%m-%d')
)
client.set_model_version_tag(
name=self.model_name,
version=model_version.version,
key="validation_status",
value="pending"
)
# Save model artifacts locally for backup
self._save_model_artifacts(model_version.version)
# Validate model in staging
if self._validate_staging_model(model_version.version):
# Transition to Production if validation passes
client.transition_model_version_stage(
name=self.model_name,
version=model_version.version,
stage="Production"
)
                logger.info("Model promoted to Production after validation")
else:
logger.warning("β οΈ Model validation failed, keeping in Staging")
return model_version.version
except Exception as e:
logger.error(f"β Model registration failed: {str(e)}")
raise
    def _validate_staging_model(self, model_version: str) -> bool:
        """Validate the model in the staging environment."""
        logger.info(f"Validating model version {model_version} in staging...")
try:
# Load model from staging
client = mlflow.tracking.MlflowClient()
model_uri = f"models:/{self.model_name}/{model_version}"
staging_model = mlflow.sklearn.load_model(model_uri)
# Run validation tests
validation_passed = True
# 1. Prediction consistency test
sample_predictions = staging_model.predict_proba(self.X_test[:100])
if not self._validate_prediction_format(sample_predictions):
logger.error("β Prediction format validation failed")
validation_passed = False
# 2. Performance threshold test
y_pred_proba = staging_model.predict_proba(self.X_test)[:, 1]
staging_auc = roc_auc_score(self.y_test, y_pred_proba)
min_auc_threshold = self.config.get('min_auc_threshold', 0.75)
if staging_auc < min_auc_threshold:
logger.error(f"β Model AUC {staging_auc:.4f} below threshold {min_auc_threshold}")
validation_passed = False
# 3. Data drift test (simplified)
if not self._validate_feature_stability():
logger.error("β Feature stability validation failed")
validation_passed = False
# 4. Model size and performance test
model_size = self._get_model_size(staging_model)
max_size_mb = self.config.get('max_model_size_mb', 100)
if model_size > max_size_mb:
logger.warning(f"β οΈ Model size {model_size:.1f}MB exceeds recommended {max_size_mb}MB")
# Update validation status
client.set_model_version_tag(
name=self.model_name,
version=model_version,
key="validation_status",
value="passed" if validation_passed else "failed"
)
client.set_model_version_tag(
name=self.model_name,
version=model_version,
key="validation_auc",
value=str(staging_auc)
)
return validation_passed
except Exception as e:
logger.error(f"β Model validation error: {str(e)}")
return False
def _save_model_artifacts(self, model_version: str):
"""πΎ Save model artifacts locally for backup and deployment"""
logger.info("πΎ Saving model artifacts...")
# Create artifacts directory
artifacts_dir = f"artifacts/model_v{model_version}"
os.makedirs(artifacts_dir, exist_ok=True)
# Save the trained model
model_path = os.path.join(artifacts_dir, "model.pkl")
joblib.dump(self.best_model, model_path)
# Save preprocessors
scaler_path = os.path.join(artifacts_dir, "scaler.pkl")
encoder_path = os.path.join(artifacts_dir, "encoder.pkl")
if os.path.exists("artifacts/scaler.pkl"):
joblib.dump(joblib.load("artifacts/scaler.pkl"), scaler_path)
if os.path.exists("artifacts/encoder.pkl"):
joblib.dump(joblib.load("artifacts/encoder.pkl"), encoder_path)
# Save model metadata
metadata = {
"model_version": model_version,
"training_date": datetime.now().isoformat(),
"model_type": type(self.best_model).__name__,
"feature_count": self.X_train.shape[1],
"training_samples": len(self.X_train),
"test_auc": self.models[self._get_best_model_name()]['roc_auc'],
"composite_score": self.best_score
}
metadata_path = os.path.join(artifacts_dir, "metadata.json")
with open(metadata_path, 'w') as f:
json.dump(metadata, f, indent=2)
        logger.info(f"Model artifacts saved to {artifacts_dir}")
```
## Containerization & Deployment
### Docker Configuration
```dockerfile
# services/training/Dockerfile
FROM python:3.9-slim

# Metadata
LABEL maintainer="mlops-team@company.com"
LABEL version="2.0.0"
LABEL description="Customer Churn Prediction Training Service"

# System dependencies
RUN apt-get update && apt-get install -y \
    curl \
    gcc \
    g++ \
    git \
    && rm -rf /var/lib/apt/lists/*

# Working directory
WORKDIR /app

# Copy requirements first for better caching
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir --upgrade pip && \
    pip install --no-cache-dir -r requirements.txt

# Copy training code
COPY . .

# Create non-root user for security
RUN useradd --create-home --shell /bin/bash trainer && \
    chown -R trainer:trainer /app && \
    mkdir -p /app/logs /app/artifacts /app/models && \
    chown -R trainer:trainer /app/logs /app/artifacts /app/models

USER trainer

# Expose port for monitoring
EXPOSE 8080

# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD python -c "import requests; requests.get('http://localhost:8080/health')" || exit 1

# Default command
CMD ["python", "churn_mlops_pipeline.py"]
```
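The HEALTHCHECK above assumes the container serves a `/health` endpoint on port 8080. The training code in this tutorial never defines one, so here is a minimal sketch of what such an endpoint could look like using only the standard library; the module name, port choice, and how it is wired into the training service are all assumptions:

```python
# utils/health_server.py - hypothetical helper assumed by the Dockerfile HEALTHCHECK
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

class HealthHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # Respond 200 with a small JSON body on /health, 404 otherwise
        if self.path == "/health":
            body = json.dumps({"status": "ok"}).encode()
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(body)
        else:
            self.send_response(404)
            self.end_headers()

def start_health_server(port: int = 8080) -> HTTPServer:
    """Run the health endpoint in a background thread so training can continue."""
    server = HTTPServer(("0.0.0.0", port), HealthHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```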
### Training Automation & Scheduling
```python
# utils/training_scheduler.py
import schedule
import time
import threading
import logging
from datetime import datetime, timedelta
import os
import subprocess
class TrainingScheduler:
"""π Automated training scheduler with flexible scheduling options"""
def __init__(self, config: dict):
self.config = config
self.is_running = False
self.scheduler_thread = None
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger(__name__)
def schedule_training(self):
"""ποΈ Setup training schedules"""
# Weekly full retraining (Sundays at 2 AM)
schedule.every().sunday.at("02:00").do(self._run_full_training)
# Daily incremental training (every day at 6 AM)
schedule.every().day.at("06:00").do(self._run_incremental_training)
# Data drift check (every 6 hours)
schedule.every(6).hours.do(self._check_data_drift)
# Model performance monitoring (every hour)
schedule.every().hour.do(self._monitor_model_performance)
        self.logger.info("Training schedules configured")
def start_scheduler(self):
"""π Start the training scheduler"""
if self.is_running:
self.logger.warning("β οΈ Scheduler is already running")
return
self.is_running = True
self.scheduler_thread = threading.Thread(target=self._run_scheduler, daemon=True)
self.scheduler_thread.start()
self.logger.info("π Training scheduler started")
def stop_scheduler(self):
"""π Stop the training scheduler"""
self.is_running = False
if self.scheduler_thread:
self.scheduler_thread.join()
self.logger.info("π Training scheduler stopped")
def _run_scheduler(self):
"""π Main scheduler loop"""
while self.is_running:
schedule.run_pending()
time.sleep(60) # Check every minute
def _run_full_training(self):
"""π Execute full model retraining"""
self.logger.info("π Starting scheduled full training...")
try:
# Run training pipeline
cmd = ["python", "churn_mlops_pipeline.py", "--mode", "full"]
result = subprocess.run(cmd, capture_output=True, text=True, cwd="/app")
if result.returncode == 0:
                self.logger.info("Full training completed successfully")
self._notify_training_completion("full", "success")
else:
self.logger.error(f"β Full training failed: {result.stderr}")
self._notify_training_completion("full", "failed", result.stderr)
except Exception as e:
self.logger.error(f"β Full training error: {str(e)}")
self._notify_training_completion("full", "error", str(e))
def _run_incremental_training(self):
"""β‘ Execute incremental model training"""
self.logger.info("β‘ Starting scheduled incremental training...")
try:
# Check if new data is available
if not self._check_new_data_available():
self.logger.info("βΉοΈ No new data available for incremental training")
return
# Run incremental training
cmd = ["python", "churn_mlops_pipeline.py", "--mode", "incremental"]
result = subprocess.run(cmd, capture_output=True, text=True, cwd="/app")
if result.returncode == 0:
                self.logger.info("Incremental training completed successfully")
self._notify_training_completion("incremental", "success")
else:
self.logger.error(f"β Incremental training failed: {result.stderr}")
self._notify_training_completion("incremental", "failed", result.stderr)
except Exception as e:
self.logger.error(f"β Incremental training error: {str(e)}")
self._notify_training_completion("incremental", "error", str(e))
def _check_data_drift(self):
"""π Check for data drift and trigger retraining if needed"""
self.logger.info("π Checking for data drift...")
try:
# Run data drift detection
cmd = ["python", "-c", "from utils.drift_detector import detect_drift; detect_drift()"]
result = subprocess.run(cmd, capture_output=True, text=True, cwd="/app")
if "DRIFT_DETECTED" in result.stdout:
self.logger.warning("β οΈ Data drift detected, triggering retraining...")
self._run_full_training()
else:
                self.logger.info("No significant data drift detected")
except Exception as e:
self.logger.error(f"β Data drift check error: {str(e)}")
def _monitor_model_performance(self):
"""π Monitor current model performance"""
try:
# Check model performance metrics
cmd = ["python", "-c", "from utils.performance_monitor import check_performance; check_performance()"]
result = subprocess.run(cmd, capture_output=True, text=True, cwd="/app")
if "PERFORMANCE_DEGRADED" in result.stdout:
self.logger.warning("π Model performance degraded, scheduling retraining...")
# Schedule urgent retraining
schedule.every(30).minutes.do(self._run_full_training).tag('urgent')
except Exception as e:
self.logger.error(f"β Performance monitoring error: {str(e)}")
# Main execution
if __name__ == "__main__":
config = {
"mlflow_tracking_uri": os.getenv("MLFLOW_TRACKING_URI", "http://localhost:5000"),
"experiment_name": "churn-prediction-scheduled",
"notification_webhook": os.getenv("NOTIFICATION_WEBHOOK")
}
scheduler = TrainingScheduler(config)
scheduler.schedule_training()
scheduler.start_scheduler()
try:
# Keep the scheduler running
while True:
time.sleep(60)
except KeyboardInterrupt:
scheduler.stop_scheduler()
        print("Training scheduler stopped")
```
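The scheduler above shells out to `utils.drift_detector.detect_drift`, which is not shown in this tutorial. A minimal sketch of what such a check could look like, assuming a simple per-feature Kolmogorov-Smirnov comparison between the training reference data and recent production data (the file paths and threshold are assumptions):

```python
# utils/drift_detector.py - hypothetical sketch of the drift check the scheduler expects
import pandas as pd
from scipy.stats import ks_2samp

def detect_drift(reference_path: str = "data/reference_features.csv",
                 recent_path: str = "data/recent_features.csv",
                 p_value_threshold: float = 0.01) -> bool:
    """Compare recent feature distributions against the training reference.

    Prints DRIFT_DETECTED so the scheduler (which scans stdout) can trigger retraining.
    """
    reference = pd.read_csv(reference_path)
    recent = pd.read_csv(recent_path)

    drifted_features = []
    for column in reference.select_dtypes(include="number").columns:
        if column in recent.columns:
            # Two-sample KS test: a small p-value means the distributions differ
            _, p_value = ks_2samp(reference[column], recent[column])
            if p_value < p_value_threshold:
                drifted_features.append(column)

    if drifted_features:
        print(f"DRIFT_DETECTED: {drifted_features}")
        return True

    print("NO_DRIFT")
    return False
```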
## Monitoring & Performance Tracking
```python
# utils/training_monitor.py
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
import mlflow
import logging
# Training metrics
training_runs_total = Counter('training_runs_total', 'Total training runs', ['status', 'mode'])
training_duration = Histogram('training_duration_seconds', 'Training duration')
model_performance_gauge = Gauge('model_performance_auc', 'Current model AUC score')
data_processed_total = Counter('data_processed_total', 'Total data points processed')
class TrainingMonitor:
"""π Comprehensive training monitoring and alerting"""
def __init__(self, webhook_url: str = None):
self.webhook_url = webhook_url
self.logger = logging.getLogger(__name__)
# Start metrics server
start_http_server(8080)
self.logger.info("π Training monitoring server started on port 8080")
def track_training_run(self, func):
"""π― Decorator to track training runs"""
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
# Record successful training
duration = time.time() - start_time
training_duration.observe(duration)
training_runs_total.labels(status='success', mode='full').inc()
# Update performance gauge if available
if hasattr(result, 'best_score'):
model_performance_gauge.set(result.best_score)
                self.logger.info(f"Training completed: {duration:.2f}s")
return result
except Exception as e:
training_runs_total.labels(status='failed', mode='full').inc()
self.logger.error(f"β Training failed: {str(e)}")
self._send_alert("Training Failed", str(e))
raise
return wrapper
def _send_alert(self, title: str, message: str):
"""π¨ Send training alerts"""
if not self.webhook_url:
return
try:
import requests
payload = {
"text": f"π¨ MLOps Alert: {title}",
"attachments": [{
"color": "danger",
"fields": [{
"title": "Details",
"value": message,
"short": False
}]
}]
}
requests.post(self.webhook_url, json=payload)
except Exception as e:
            self.logger.error(f"Failed to send alert: {str(e)}")
```
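A short usage sketch for the monitor: wrap the training entry point with the `track_training_run` decorator so every run is counted, timed, and alerted on when it fails. The `run_training_pipeline` function name below is an assumption for illustration:

```python
import os

monitor = TrainingMonitor(webhook_url=os.getenv("NOTIFICATION_WEBHOOK"))

@monitor.track_training_run
def run_training_pipeline():
    # Call the full training pipeline here, e.g. the logic in churn_mlops_pipeline.py
    ...

if __name__ == "__main__":
    run_training_pipeline()
```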
#mlopszoomcamp
This comprehensive documentation covers the complete Training service implementation, from the core ML pipeline through deployment automation and monitoring. The service ensures reliable, automated model development with proper evaluation, validation, and deployment processes.