Customer Churn Prediction
Table of Contents
- Pipeline Overview
- Prefect Framework Basics
- Pipeline Architecture
- Task-by-Task Breakdown
- Flow Orchestration
- Monitoring Pipeline
- Execution Guide
- Troubleshooting
Pipeline Overview
What Your Prefect Pipeline Does
Your Prefect pipeline is a complete MLOps workflow orchestrator that automates the entire machine learning lifecycle for customer churn prediction. Think of it as an intelligent factory manager that:
- Coordinates all tasks in the correct order
- Handles failures gracefully with retries and error reporting
- Tracks progress and provides real-time monitoring
- Manages resources efficiently with concurrent execution
- Ensures reproducibility with proper logging and versioning
Business Value
Without Prefect: Manual, error-prone process requiring constant supervision
With Prefect: Automated, reliable, scalable ML pipeline that runs unattended
Pipeline Components
Your Complete Prefect Pipeline:

CHURN PREDICTION PIPELINE

DATA PROCESSING TASKS
├── setup_mlflow()                  # Initialize experiment tracking
├── load_and_validate_data()        # Load and validate raw data
├── prepare_data_splits()           # Create train/val/test splits
├── feature_engineering()           # Transform data for ML
└── apply_smote()                   # Balance the dataset

MODEL TRAINING TASKS
├── train_baseline_models()         # Train simple baseline models
├── hyperparameter_optimization()   # Optimize advanced models
├── train_optimized_models()        # Train with best parameters
└── cross_validate_models()         # Robust model evaluation

MODEL DEPLOYMENT TASKS
├── select_best_model()             # Choose the best performer
└── register_best_model()           # Save to production registry

MONITORING TASKS (Separate Flow)
├── load_reference_data()           # Get baseline data for drift
├── generate_current_data()         # Simulate new incoming data
├── calculate_drift_metrics()       # Measure data drift
└── save_monitoring_results()       # Store metrics in database
Prefect Framework Basics
Understanding Prefect Concepts
1. Tasks (@task)

@task(name="example_task", description="What this task does")
def my_task(input_data):
    """
    A task is a single unit of work in your pipeline.
    Key features:
    - Can be retried automatically on failure
    - Has built-in logging
    - Can run in parallel with other tasks
    - Return values are automatically cached
    """
    # Do some work
    result = process_data(input_data)
    return result
2. Flows (@flow)

@flow(name="example_flow", description="Orchestrates multiple tasks")
def my_pipeline():
    """
    A flow orchestrates multiple tasks.
    Key features:
    - Defines the execution order of tasks
    - Manages dependencies between tasks
    - Provides error handling and retries
    - Can run tasks concurrently
    """
    # Tasks run in dependency order
    data = load_data()
    processed = process_data(data)
    model = train_model(processed)
    return model
3. Task Runners
from prefect.task_runners import ConcurrentTaskRunner

@flow(task_runner=ConcurrentTaskRunner())
def concurrent_pipeline():
    """
    Task runners control HOW tasks execute:
    - SequentialTaskRunner: One task at a time
    - ConcurrentTaskRunner: Multiple tasks in parallel (Prefect 2's default)
    - DaskTaskRunner: Distributed execution across machines
    """
Your Pipeline's Prefect Configuration
# Your main flow configuration
@flow(
    name="customer-churn-mlops-pipeline",    # Flow name in the Prefect UI
    description="Complete MLOps pipeline for customer churn prediction",
    task_runner=ConcurrentTaskRunner(),      # Enable parallel execution
)
def churn_mlops_pipeline(
    data_path: str = "data/raw/WA_Fn-UseC_-Telco-Customer-Churn.csv",
    hyperopt_max_evals: int = 5,             # Number of optimization trials
):
Pipeline Architecture
Execution Flow Diagram
Task Execution Order & Dependencies:

setup_mlflow()
      │
      ▼
load_and_validate_data()
      │
      ▼
prepare_data_splits()
      │
      ▼
feature_engineering()
      │
      ▼
apply_smote()
      │
      ├──────────────────────────────┐
      ▼                              ▼
train_baseline_models()     hyperparameter_optimization()
      │                              │
      │                              ▼
      │                     train_optimized_models()
      │                              │
      └──────────────┬───────────────┘
                     ▼
             COMBINE RESULTS
                     │
                     ▼
          cross_validate_models()
                     │
                     ▼
            select_best_model()
                     │
                     ▼
           register_best_model()
Concurrent Execution Benefits
Your pipeline uses ConcurrentTaskRunner(), which enables:
- Baseline Models & Hyperparameter Optimization run in parallel
- Multiple hyperparameter optimization tasks run simultaneously
- Cross-validation runs in parallel for different models
- Faster overall execution without sacrificing reliability
Task-by-Task Breakdown
1. Setup & Configuration Tasks
setup_mlflow() Task
@task(name="setup_mlflow", description="Initialize MLflow tracking")
def setup_mlflow() -> str:
What it does:
- Connects to your MLflow server at http://34.238.247.213:5000
- Creates or gets the existing experiment named "churn_prediction_production"
- Sets up experiment tracking for all subsequent model training
Why it's important:
- Experiment Tracking: Every model run is logged with parameters and metrics
- Reproducibility: You can recreate any model from its logged parameters
- Comparison: Compare different models and experiments easily
- Production Readiness: Models are properly versioned and stored
How it works:
# Your actual implementation breakdown:

# 1. Connect to the MLflow server
mlflow.set_tracking_uri("http://34.238.247.213:5000")

# 2. Test the connection and create (or fetch) the experiment
try:
    experiment_id = mlflow.create_experiment("churn_prediction_production")
except mlflow.exceptions.MlflowException:
    # Experiment already exists, get its ID
    experiment_id = mlflow.get_experiment_by_name("churn_prediction_production").experiment_id

# 3. Set the active experiment
mlflow.set_experiment("churn_prediction_production")

# 4. Return the experiment ID for tracking
return experiment_id
load_and_validate_data() Task
@task(name="load_and_validate_data", description="Load data with comprehensive validation")
def load_and_validate_data(data_path: str) -> pd.DataFrame:
What it does:
- Loads the telco customer churn dataset
- Performs 8 different data quality checks
- Cleans and validates the data
- Saves validation results for audit purposes
Detailed Validation Process:
# Your 8-step validation process:

Step 1: Schema Validation
- Checks all required columns exist
- Validates data types match expectations

Step 2: Data Type Fixes
- Converts TotalCharges from string to numeric
- Handles string values that should be numbers

Step 3: Missing Value Analysis
- Counts missing values per column
- Calculates percentages of missing data

Step 4: Data Quality Warnings
- Warns if >5% of data is missing in any column
- Logs high missing value columns

Step 5: Data Cleaning
- Removes rows with missing values
- Tracks how much data was lost

Step 6: Target Variable Analysis
- Analyzes churn distribution (Yes/No ratio)
- Ensures sufficient samples of each class

Step 7: Statistical Summary
- Generates descriptive statistics for numerical features
- Helps identify data distribution issues

Step 8: Outlier Detection
- Uses the IQR method to detect outliers
- Counts outliers per numerical feature
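To make Step 8 concrete, here is a minimal sketch of IQR-based outlier counting; the helper name and the standard 1.5 × IQR fences are illustrative assumptions, not code taken from the pipeline.

import pandas as pd

def count_iqr_outliers(df: pd.DataFrame, column: str) -> int:
    """Count values outside the 1.5 * IQR fences for one numerical column."""
    q1, q3 = df[column].quantile([0.25, 0.75])
    iqr = q3 - q1
    lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr
    return int(((df[column] < lower) | (df[column] > upper)).sum())

# Example usage on the Telco numerical columns:
# {col: count_iqr_outliers(df, col) for col in ["tenure", "MonthlyCharges", "TotalCharges"]}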
Validation Results Saved:
{
"missing_values": {"tenure": 0, "MonthlyCharges": 0, ...},
"missing_percentage": {"tenure": 0.0, "MonthlyCharges": 0.0, ...},
"rows_removed": 11,
"data_loss_percentage": 0.16,
"target_distribution": {"No": 0.73, "Yes": 0.27},
"numerical_stats": {...},
"tenure_outliers": 45,
"MonthlyCharges_outliers": 23
}
2. Data Processing Tasks
prepare_data_splits() Task
@task(name="prepare_data_splits", description="Create train/validation/test splits")
def prepare_data_splits(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
What it does:
Creates three datasets for proper model evaluation:
Split Strategy:
# Your 60/20/20 split approach:

Total Dataset (100%)
├── Training Set (60%)      # For model learning
├── Validation Set (20%)    # For hyperparameter tuning
└── Test Set (20%)          # For final evaluation

# Implementation details:

# 1. First split: 60% train, 40% temp
train_df, temp_df = train_test_split(df, test_size=0.4, stratify=df["Churn"])

# 2. Second split: divide temp into 20% validation, 20% test
val_df, test_df = train_test_split(temp_df, test_size=0.5, stratify=temp_df["Churn"])
Why this approach:
- Training Set: Large enough for model learning
- Validation Set: Used for hyperparameter optimization (prevents overfitting)
- Test Set: Unbiased final evaluation (never seen during training/tuning)
- Stratification: Maintains the same churn ratio in all splits
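Putting the two-step split together, a self-contained sketch could look like the following; the random_state value is an assumption added for reproducibility.

from sklearn.model_selection import train_test_split

def make_splits(df, target="Churn", seed=42):
    """Stratified 60/20/20 train/validation/test split."""
    train_df, temp_df = train_test_split(
        df, test_size=0.4, stratify=df[target], random_state=seed
    )
    val_df, test_df = train_test_split(
        temp_df, test_size=0.5, stratify=temp_df[target], random_state=seed
    )
    return train_df, val_df, test_df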
feature_engineering() Task
@task(name="feature_engineering", description="Comprehensive feature engineering")
def feature_engineering(...) -> Tuple[np.ndarray, ...]:
What it does:
Transforms raw customer data into machine learning-ready features.
Feature Processing Pipeline:
# Your feature transformation process:

Categorical Features Processing:
├── Input: ["Yes", "No", "DSL", "Fiber optic", ...]
├── One-Hot Encoding: Creates binary columns
├── Drop First: Avoids multicollinearity
└── Output: [0, 1, 0, 1, ...]

Numerical Features Processing:
├── Input: [tenure: 24, MonthlyCharges: 70.5, ...]
├── Min-Max Scaling: Scales to the 0-1 range
└── Output: [0.42, 0.71, ...]

Target Variable Processing:
├── Input: ["Yes", "No"]
├── Label Encoding: Binary conversion
└── Output: [1, 0]
Feature Engineering Steps:
# Detailed breakdown of your implementation:

# 1. Categorical feature encoding
encoder = OneHotEncoder(drop="first", sparse_output=False)
encoded_features = encoder.fit_transform(df[categorical_features])

# 2. Feature name generation
feature_names = encoder.get_feature_names_out(categorical_features)
# Creates names like: "Contract_One year", "PaymentMethod_Credit card"

# 3. Combine features
X_combined = pd.concat([X_encoded, df[numerical_features]], axis=1)

# 4. Scale all features
scaler = MinMaxScaler(feature_range=(0, 1))
X_scaled = scaler.fit_transform(X_combined)

# 5. Save preprocessing objects for production use
pickle.dump(encoder, open("models/encoder.pkl", "wb"))
pickle.dump(scaler, open("models/scaler.pkl", "wb"))
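Because the encoder and scaler are pickled, inference has to replay the same transformations in the same order. A minimal sketch, assuming the artifact paths above and that new_df, categorical_features, and numerical_features are already defined:

import pickle
import pandas as pd

with open("models/encoder.pkl", "rb") as f:
    encoder = pickle.load(f)
with open("models/scaler.pkl", "rb") as f:
    scaler = pickle.load(f)

# Encode categoricals, append numericals (same column order as training), then scale
encoded = pd.DataFrame(
    encoder.transform(new_df[categorical_features]),
    columns=encoder.get_feature_names_out(categorical_features),
    index=new_df.index,
)
X_new = scaler.transform(pd.concat([encoded, new_df[numerical_features]], axis=1))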
apply_smote() Task
@task(name="apply_smote", description="Apply SMOTE for class balancing")
def apply_smote(X_train: np.ndarray, y_train: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
What it does:
Balances the dataset to improve model performance on churned customers.
The Imbalance Problem:
# Typical churn dataset imbalance:

Original Distribution:
├── No Churn (0): 73% (5,174 customers)
└── Churn (1):    27% (1,869 customers)

# Problem: Models become biased toward "No Churn"
# Solution: SMOTE creates synthetic churn examples
How SMOTE Works:
# Your SMOTE implementation:

SMOTE Algorithm Steps:
1. Find churned customers in feature space
2. For each churned customer:
   ├── Find the k nearest churned neighbors
   ├── Draw a line between the customer and a neighbor
   ├── Create a new synthetic customer on that line
   └── Repeat until classes are balanced

Result (Balanced Distribution):
├── No Churn (0): 50% (5,174 customers)  [original]
└── Churn (1):    50% (5,174 customers)  [original + synthetic]
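In code, this balancing step is typically a thin wrapper around imblearn's SMOTE; a minimal sketch (the random_state is an assumption):

from imblearn.over_sampling import SMOTE

def balance_training_data(X_train, y_train, seed=42):
    """Oversample the minority (churn) class until both classes are 50/50."""
    smote = SMOTE(random_state=seed)
    X_balanced, y_balanced = smote.fit_resample(X_train, y_train)
    return X_balanced, y_balanced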
Benefits:
- Better Learning: Model sees equal examples of both classes
- Improved Recall: Better at catching actual churners
- Reduced Bias: More balanced predictions
- Synthetic Quality: SMOTE creates realistic examples, not just duplicates
3. Model Training Tasks
train_baseline_models() Task
@task(name="train_baseline_models", description="Train baseline models")
def train_baseline_models(...) -> Dict[str, Dict[str, Any]]:
What it does:
Trains simple, interpretable models as performance baselines.
Your Baseline Models:
# Your three baseline models:

Baseline Model Arsenal:
├── SVC (Support Vector Classifier)
│   ├── Kernel: Linear (simple decision boundary)
│   ├── Strengths: Good for binary classification
│   └── Use Case: Fast, interpretable baseline
│
├── AdaBoost (Adaptive Boosting)
│   ├── Algorithm: Combines weak learners
│   ├── Strengths: Reduces overfitting
│   └── Use Case: Ensemble method baseline
│
└── Logistic Regression
    ├── Algorithm: Linear probability model
    ├── Strengths: Highly interpretable
    └── Use Case: Business-friendly baseline
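The training loop below iterates over a baseline_models dictionary. Its exact contents aren't shown in the source, but a plausible definition (hyperparameters here are illustrative assumptions) is:

from sklearn.svm import SVC
from sklearn.ensemble import AdaBoostClassifier
from sklearn.linear_model import LogisticRegression

# Hypothetical baseline registry; probability=True lets SVC expose predict_proba
baseline_models = {
    "SVC": SVC(kernel="linear", probability=True, random_state=42),
    "AdaBoost": AdaBoostClassifier(random_state=42),
    "LogisticRegression": LogisticRegression(max_iter=1000, random_state=42),
}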
Training Process:
# Your baseline training implementation:
for model_name, model in baseline_models.items():
    with mlflow.start_run(run_name=f"{model_name}_baseline"):
        # 1. Train model
        model.fit(X_train.values, y_train.values)

        # 2. Make predictions
        y_pred = model.predict(X_val.values)
        y_pred_proba = model.predict_proba(X_val)[:, 1]

        # 3. Calculate metrics
        metrics = {
            "accuracy": accuracy_score(y_val, y_pred),
            "precision": precision_score(y_val, y_pred),
            "recall": recall_score(y_val, y_pred),
            "f1": f1_score(y_val, y_pred),
            "roc_auc": roc_auc_score(y_val, y_pred_proba),
        }

        # 4. Log to MLflow
        mlflow.log_params(model.get_params())
        mlflow.log_metrics(metrics)

        # 5. Store results
        results[model_name] = {
            "model": model,
            "metrics": metrics,
            "predictions": y_pred,
            "probabilities": y_pred_proba,
        }
hyperparameter_optimization() Task
@task(name="hyperparameter_optimization", description="Optimize hyperparameters for a model")
def hyperparameter_optimization(...) -> Dict[str, Any]:
What it does:
Uses Bayesian optimization to find the best hyperparameters for advanced models.
Your Optimization Strategy:
Models Being Optimized:
Advanced Models for Optimization:

├── RandomForest
│   ├── n_estimators: 100-500 trees
│   ├── max_depth: 3-15 levels
│   ├── min_samples_split: 2-20 samples
│   ├── min_samples_leaf: 1-10 samples
│   ├── max_features: ["sqrt", "log2", None]
│   └── bootstrap: [True, False]
│
├── XGBoost
│   ├── n_estimators: 50-200 boosters
│   ├── max_depth: 3-10 levels
│   ├── learning_rate: 0.01-0.3 (log scale)
│   ├── min_child_weight: 1-10
│   ├── subsample: 0.5-1.0
│   └── colsample_bytree: 0.5-1.0
│
└── LightGBM
    ├── n_estimators: 50-200 boosters
    ├── max_depth: 3-10 levels
    ├── learning_rate: 0.01-0.3 (log scale)
    ├── num_leaves: 20-100 leaves
    ├── min_child_weight: 1-10
    ├── subsample: 0.5-1.0
    └── colsample_bytree: 0.5-1.0
Optimization Algorithm:
# Your Hyperopt implementation:

Bayesian Optimization Process:

1. Define the search space
   ├── Continuous parameters: learning_rate, subsample
   ├── Integer parameters: n_estimators, max_depth
   └── Categorical parameters: max_features, bootstrap

2. Objective function: def objective(params)
   ├── Create model with params
   ├── Train on training data
   ├── Evaluate on validation data
   ├── Return negative recall (Hyperopt minimizes)
   └── Handle errors gracefully

3. Optimization loop (max_evals=5 trials)
   ├── Trial 1: Random parameters
   ├── Trial 2: Based on Trial 1 results
   ├── Trial 3: Intelligent parameter selection
   ├── Trial 4: Refine best regions
   └── Trial 5: Final optimization

4. Return best parameters
   └── Parameters that achieved the highest recall
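As a sketch of how that loop is wired up with Hyperopt (shown here for RandomForest only; the search-space ranges mirror the list above, and X_train, y_train, X_val, y_val are assumed to be in scope):

from hyperopt import fmin, tpe, hp, STATUS_OK, Trials
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import recall_score

search_space = {
    "n_estimators": hp.quniform("n_estimators", 100, 500, 1),
    "max_depth": hp.quniform("max_depth", 3, 15, 1),
    "max_features": hp.choice("max_features", ["sqrt", "log2", None]),
}

def objective(params):
    model = RandomForestClassifier(
        n_estimators=int(params["n_estimators"]),
        max_depth=int(params["max_depth"]),
        max_features=params["max_features"],
        random_state=42,
    )
    model.fit(X_train, y_train)
    recall = recall_score(y_val, model.predict(X_val))
    return {"loss": -recall, "status": STATUS_OK}  # Hyperopt minimizes, so negate recall

best_params = fmin(fn=objective, space=search_space, algo=tpe.suggest,
                   max_evals=5, trials=Trials())

Note that fmin returns the index for hp.choice parameters, which is why the train_optimized_models() step converts them back to actual values.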
Why Recall as Optimization Metric:
- Business Focus: Missing churners costs more than false alarms
- Retention Priority: Better to offer retention to loyal customers than lose churners
- Cost-Benefit: Retention cost < Customer acquisition cost
train_optimized_models() Task
@task(name="train_optimized_models", description="Train models with optimized hyperparameters")
def train_optimized_models(...) -> Dict[str, Dict[str, Any]]:
What it does:
Trains the advanced models using the best hyperparameters found by optimization.
Training Process:
# Your optimized training implementation:
for model_name, opt_result in optimization_results.items():
    best_params = opt_result["best_params"]

    # 1. Parameter conversion
    if model_name == "RandomForest":
        # Convert choice indices to actual values
        max_features_options = ["sqrt", "log2", None]
        best_params["max_features"] = max_features_options[int(best_params["max_features"])]

        # Convert float parameters to int
        best_params["n_estimators"] = int(best_params["n_estimators"])
        best_params["max_depth"] = int(best_params["max_depth"])

        # 2. Model creation
        model = RandomForestClassifier(**best_params, random_state=42)

    # 3. Training & evaluation
    with mlflow.start_run(run_name=f"{model_name}_optimized"):
        model.fit(X_train.values, y_train.values)
        y_pred = model.predict(X_val.values)
        y_pred_proba = model.predict_proba(X_val)[:, 1]

        # 4. Metrics & logging
        metrics = calculate_all_metrics(y_val, y_pred, y_pred_proba)
        mlflow.log_params(best_params)
        mlflow.log_metrics(metrics)
        mlflow.sklearn.log_model(model, f"{model_name}_model")
cross_validate_models() Task
@task(name="cross_validate_models", description="Perform cross-validation on all models")
def cross_validate_models(...) -> Dict[str, Dict[str, float]]:
What it does:
Provides robust performance estimates using 5-fold stratified cross-validation.
Cross-Validation Process:
# Your cross-validation implementation:

5-Fold Stratified Cross-Validation

Fold Creation:
├── Fold 1: 20% of data (maintaining churn ratio)
├── Fold 2: 20% of data (maintaining churn ratio)
├── Fold 3: 20% of data (maintaining churn ratio)
├── Fold 4: 20% of data (maintaining churn ratio)
└── Fold 5: 20% of data (maintaining churn ratio)

Evaluation Process (for each model):
├── Train on 4 folds, test on 1 fold (5 times)
├── Calculate metrics for each fold
├── Compute mean and standard deviation
└── Store comprehensive results

Metrics Calculated:
├── accuracy_mean ± accuracy_std
├── precision_mean ± precision_std
├── recall_mean ± recall_std
├── f1_mean ± f1_std
└── roc_auc_mean ± roc_auc_std
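A compact sketch of the same idea using scikit-learn's cross_validate with a stratified splitter (the shuffle and seed settings are assumptions):

import numpy as np
from sklearn.model_selection import StratifiedKFold, cross_validate

def cv_summary(model, X, y, seed=42):
    """Return mean and std of each metric over 5 stratified folds."""
    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=seed)
    scores = cross_validate(
        model, X, y, cv=cv,
        scoring=["accuracy", "precision", "recall", "f1", "roc_auc"],
    )
    summary = {}
    for metric in ["accuracy", "precision", "recall", "f1", "roc_auc"]:
        values = scores[f"test_{metric}"]
        summary[f"{metric}_mean"] = float(np.mean(values))
        summary[f"{metric}_std"] = float(np.std(values))
    return summary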
Why Cross-Validation:
- Robust Estimates: Uses all data for both training and testing
- Variance Assessment: Standard deviation shows model stability
- Overfitting Detection: Consistent performance across folds indicates good generalization
- Fair Comparison: All models evaluated on same data splits
4. Model Selection & Deployment Tasks
select_best_model() Task
@task(name="select_best_model", description="Select the best performing model")
def select_best_model(...) -> Dict[str, Any]:
What it does:
Chooses the best model based on cross-validation recall scores.
Selection Process:
# Your model selection logic:

# Best model selection
selection_metric = "recall_mean"           # Primary business metric
best_score, best_model_name = 0.0, None    # Initialize before the comparison loop

for model_name, cv_metrics in cv_results.items():
    score = cv_metrics.get(selection_metric, 0)
    if score > best_score:
        best_score = score
        best_model_name = model_name

# Result: the model with the highest average recall across CV folds
Why Recall as Selection Metric:
- Business Priority: Catching churners is more important than precision
- Cost Consideration: Missing a churner costs more than unnecessary retention effort
- Customer Lifetime Value: Retaining customers has long-term value
register_best_model() Task
@task(name="register_best_model", description="Register best model to MLflow Model Registry")
def register_best_model(...) -> str:
What it does:
Saves the best model to MLflow Model Registry for production deployment.
Registration Process:
# Your model registration implementation:

Production Model Registration:

1. Final Test Evaluation
   ├── Evaluate on the held-out test set
   ├── Calculate final performance metrics
   └── Ensure no data leakage

2. Artifact Preparation
   ├── Save trained model (model.pkl)
   ├── Save feature encoder (encoder.pkl)
   ├── Save feature scaler (scaler.pkl)
   └── Create model metadata

3. Custom PyFunc Model Creation: class ChurnPredictionModel(PythonModel) (see the sketch below)
   ├── Loads all preprocessing artifacts
   ├── Handles the end-to-end prediction pipeline
   ├── Includes data validation
   └── Returns churn probabilities

4. MLflow Registration
   ├── Log the PyFunc model with artifacts
   ├── Register to the Model Registry
   ├── Add a detailed model description
   └── Return the model URI for deployment

5. Model Documentation
   ├── Performance metrics on the test set
   ├── Feature importance analysis
   ├── Business impact estimates
   └── Deployment instructions
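For step 3, a hedged sketch of what such a PyFunc wrapper can look like (artifact keys, paths, and the registered model name are assumptions; for brevity the wrapper assumes its input is already encoded in training column order):

import pickle
import mlflow.pyfunc

class ChurnPredictionModel(mlflow.pyfunc.PythonModel):
    """Bundles the preprocessing artifacts and classifier into one deployable model."""

    def load_context(self, context):
        # Artifact keys must match the keys used when logging the model
        self.scaler = pickle.load(open(context.artifacts["scaler"], "rb"))
        self.model = pickle.load(open(context.artifacts["model"], "rb"))

    def predict(self, context, model_input):
        # A production wrapper would also re-apply the one-hot encoder here
        X = self.scaler.transform(model_input)
        return self.model.predict_proba(X)[:, 1]

# Logging and registering the wrapper (illustrative paths and names):
# mlflow.pyfunc.log_model(
#     artifact_path="churn_model",
#     python_model=ChurnPredictionModel(),
#     artifacts={"scaler": "models/scaler.pkl", "model": "models/model.pkl"},
#     registered_model_name="churn_prediction_production",
# )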
Custom PyFunc Model Benefits:
- End-to-End Pipeline: Includes preprocessing and prediction
- Production Ready: Handles real-world data formats
- Artifact Management: All dependencies bundled together
- Versioning: Proper model versioning for rollbacks
Flow Orchestration
Main Pipeline Flow
@flow(
    name="customer-churn-mlops-pipeline",
    description="Complete MLOps pipeline for customer churn prediction",
    task_runner=ConcurrentTaskRunner(),
)
def churn_mlops_pipeline(
    data_path: str = "data/raw/WA_Fn-UseC_-Telco-Customer-Churn.csv",
    hyperopt_max_evals: int = 5,
):
Flow Execution Steps:
# Your complete pipeline orchestration:

Pipeline Execution Script:

Step 1: Initialize
├── logger.info("Starting Customer Churn MLOps Pipeline...")
├── experiment_id = setup_mlflow()
└── Set up tracking and logging

Step 2: Data Processing
├── df = load_and_validate_data(data_path)
├── train_df, val_df, test_df = prepare_data_splits(df)
├── X_train, X_val, X_test, y_train, y_val, y_test = feature_engineering(...)
└── X_train_balanced, y_train_balanced = apply_smote(X_train, y_train)

Step 3: Model Training (Parallel Execution)
├── baseline_results = train_baseline_models(...)        # Task A
├── optimization_results = {}                            # Task B starts
└── for model_name in ["RandomForest", "XGBoost", "LightGBM"]:
        optimization_results[model_name] = hyperparameter_optimization(...)

Step 4: Advanced Training
├── optimized_results = train_optimized_models(...)
└── all_models = {**baseline_results, **optimized_results}

Step 5: Evaluation & Selection
├── cv_results = cross_validate_models(all_models, ...)
├── best_model_info = select_best_model(all_models, cv_results, "recall_mean")
└── model_uri = register_best_model(best_model_info, X_test, y_test)

Step 6: Summary & Completion
├── Generate pipeline summary
├── Log final results
└── Return success status with model URI
Concurrent Task Execution
Tasks That Run in Parallel:
# Your concurrent execution strategy:

Parallel Execution Groups:

Group 1: Data Processing (Sequential - Dependencies)
├── setup_mlflow()
├── load_and_validate_data()
├── prepare_data_splits()
├── feature_engineering()
└── apply_smote()

Group 2: Model Training (Parallel)
├── train_baseline_models()          # Starts immediately
└── hyperparameter_optimization()    # 3 models run concurrently
    ├── RandomForest optimization
    ├── XGBoost optimization
    └── LightGBM optimization

Group 3: Final Training (Sequential - Needs optimization results)
├── train_optimized_models()
├── cross_validate_models()
├── select_best_model()
└── register_best_model()
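With Prefect 2's ConcurrentTaskRunner, parallelism comes from submitting tasks instead of calling them directly. A minimal sketch of Group 2 (the task body is a placeholder, not the project's code):

from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task
def hyperparameter_optimization(model_name: str) -> dict:
    # Placeholder body; the real task runs a Hyperopt search for this model
    return {"model": model_name, "best_params": {}}

@flow(task_runner=ConcurrentTaskRunner())
def parallel_optimization():
    # .submit() returns futures immediately, so the three searches overlap
    futures = {
        name: hyperparameter_optimization.submit(name)
        for name in ["RandomForest", "XGBoost", "LightGBM"]
    }
    # .result() blocks until each future completes
    return {name: fut.result() for name, fut in futures.items()}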
Performance Benefits:
- 30-50% faster execution with parallel hyperparameter optimization
- Resource efficiency - CPU cores used optimally
- Fault tolerance - If one optimization fails, others continue
- Scalability - Easy to add more models without increasing total time
Monitoring Pipeline
Separate Monitoring Flow
# Your monitoring pipeline (monitor_churn_model.py):
@flow(name="churn_model_monitoring")
def churn_monitoring_pipeline():
Monitoring Tasks Breakdown:
load_reference_data() Task

@task
def load_reference_data():
    """Load and prepare reference data from training dataset"""
What it does:
- Loads the original training dataset as baseline for drift detection
- Applies same preprocessing as production pipeline
- Creates reference distribution for comparison
generate_current_data() Task

@task
def generate_current_data():
    """Generate or load current production data"""
What it does:
- Simulates new incoming customer data
- In production: Would load from real-time data sources
- Applies same preprocessing as training pipeline
calculate_drift_metrics() Task

@task
def calculate_drift_metrics(reference_data, current_data):
    """Calculate data drift and model performance metrics"""
What it does:
Uses Evidently AI to calculate:
- Data Drift: Statistical tests (KS-test, PSI) for feature distribution changes
- Prediction Drift: Changes in model output distribution
- Missing Values: Data quality monitoring
- Model Performance: If ground truth is available
Drift Detection Process:
# Your Evidently monitoring implementation:

Evidently Report Generation:
├── DatasetDriftMetric()            # Overall dataset drift
├── ColumnDriftMetric()             # Individual feature drift
├── DatasetMissingValuesMetric()    # Data quality checks
└── Model performance metrics       # If labels available

Drift Calculation:
├── Statistical Tests: KS-test, Chi-square, Z-test
├── Thresholds: Configurable drift sensitivity
├── Feature-level: Individual feature drift scores
└── Dataset-level: Overall drift assessment
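A hedged sketch of assembling that report with Evidently's Report API (the prediction column name and the Evidently version are assumptions):

from evidently.report import Report
from evidently.metrics import (
    ColumnDriftMetric,
    DatasetDriftMetric,
    DatasetMissingValuesMetric,
)

def run_drift_report(reference_data, current_data):
    """Build the drift report and return its raw results as a dict."""
    report = Report(metrics=[
        DatasetDriftMetric(),
        ColumnDriftMetric(column_name="prediction"),  # assumed prediction column
        DatasetMissingValuesMetric(),
    ])
    report.run(reference_data=reference_data, current_data=current_data)
    return report.as_dict()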
save_monitoring_results() Task

@task
def save_monitoring_results(metrics):
    """Save monitoring results to database"""
What it does:
Saves monitoring metrics to PostgreSQL database for Grafana visualization:
-- Your monitoring table schema:
CREATE TABLE CHURN_ML_METRICS (
    timestamp TIMESTAMP,
    prediction_drift FLOAT,        -- Prediction distribution drift
    num_drifted_columns INTEGER,   -- Number of features with drift
    share_missing_values FLOAT,    -- Data quality metric
    avg_churn_probability FLOAT,   -- Average prediction score
    model_version VARCHAR(10)      -- Model version tracking
);
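A minimal sketch of inserting one monitoring snapshot into that table with psycopg2 (connection parameters are placeholders, not the project's real credentials):

import datetime
import psycopg2

def save_metrics_row(metrics: dict):
    """Write one row of drift metrics into CHURN_ML_METRICS."""
    conn = psycopg2.connect(
        host="localhost", port=5432, dbname="monitoring",
        user="postgres", password="example",  # placeholder credentials
    )
    with conn, conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO churn_ml_metrics
                (timestamp, prediction_drift, num_drifted_columns,
                 share_missing_values, avg_churn_probability, model_version)
            VALUES (%s, %s, %s, %s, %s, %s)
            """,
            (
                datetime.datetime.utcnow(),
                metrics["prediction_drift"],
                metrics["num_drifted_columns"],
                metrics["share_missing_values"],
                metrics["avg_churn_probability"],
                metrics.get("model_version", "v1"),
            ),
        )
    conn.close()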
Monitoring Pipeline Execution
# Your complete monitoring workflow:
Monitoring Pipeline Flow:
1. Load reference (training) data
2. Generate/load current (production) data
3. Calculate drift metrics using Evidently
4. Save results to PostgreSQL database
5. Trigger alerts if drift exceeds thresholds
6. Update Grafana dashboards automatically
Execution Guide
Running Your Pipeline
Local Development
# 1. Install dependencies
pip install -r requirements.txt
# 2. Start MLflow server
mlflow server --host 0.0.0.0 --port 5000
# 3. Run the complete pipeline
python services/training/churn_mlops_pipeline.py
# 4. Monitor progress in terminal and MLflow UI
# MLflow UI: http://localhost:5000
Production Deployment
# 1. Start Prefect server
prefect server start
# 2. Create deployment
prefect deployment build services/training/churn_mlops_pipeline.py:churn_mlops_pipeline -n "churn-pipeline"
# 3. Apply deployment
prefect deployment apply churn_mlops_pipeline-deployment.yaml
# 4. Start an agent to pick up runs from the work queue
prefect agent start --work-queue "default"
# 5. Run the deployment (flow name / deployment name)
prefect deployment run "customer-churn-mlops-pipeline/churn-pipeline"
Scheduled Execution
# Your pipeline with scheduling:
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

deployment = Deployment.build_from_flow(
    flow=churn_mlops_pipeline,
    name="churn-pipeline-daily",
    schedule=CronSchedule(cron="0 2 * * *"),  # Daily at 2 AM
    work_queue_name="ml-training",
)
Monitoring Execution
Prefect UI Dashboard
Prefect UI (http://localhost:4200):
├── Flow Runs: Real-time pipeline execution status
├── Task Status: Individual task success/failure
├── Logs: Detailed execution logs
├── Performance: Task execution times
└── Schedules: Upcoming pipeline runs
MLflow Integration
MLflow Tracking (http://localhost:5000):
├── Experiments: All pipeline runs grouped
├── Models: Trained model artifacts and metrics
├── Comparisons: Side-by-side model performance
└── Registry: Production-ready model versions
Troubleshooting
Common Issues & Solutions
1. MLflow Connection Issues
Problem:
MLflow server not available: Connection refused
Solution:
# Your error handling in setup_mlflow():
try:
    mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
    client = MlflowClient()
    client.search_experiments()
except Exception as e:
    logger.warning(f"MLflow server not available: {e}")
    logger.info("Continuing without MLflow tracking...")
    return "local_experiment"
Prevention:
- Ensure MLflow server is running
- Check network connectivity
- Verify MLFLOW_TRACKING_URI is correct
2. Memory Issues During Training
Problem:
MemoryError: Unable to allocate array
Solution:
# Reduce dataset size or use sampling:
if len(df) > 50000:
    df = df.sample(n=50000, random_state=42)
    logger.info("Dataset sampled to 50,000 rows for memory efficiency")
3. Hyperparameter Optimization Failures
Problem:
Optimization failed for XGBoost: Invalid parameter
Solution:
# Your error handling in optimization:
def objective(params):
    try:
        model = get_model_instance(model_name, params.copy())
        model.fit(X_train.values, y_train.values)
        y_pred = model.predict(X_val.values)
        recall = recall_score(y_val, y_pred)
        return {"loss": -recall, "status": STATUS_OK}
    except Exception as e:
        logger.warning(f"Error in optimization trial: {e}")
        return {"loss": 0, "status": STATUS_OK}  # Return a bad score and continue
4. Task Failures and Retries
# Configure task retries:
@task(
    name="robust_task",
    retries=3,
    retry_delay_seconds=60,
    retry_jitter_factor=0.1,
)
def robust_training_task():
    # Task implementation
    pass
5. Data Validation Failures
Problem:
ValueError: Missing required columns: ['TotalCharges']
Solution:
# Your validation in load_and_validate_data():
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
    logger.error(f"Missing required columns: {missing_columns}")
    logger.info(f"Available columns: {list(df.columns)}")
    raise ValueError(f"Missing required columns: {missing_columns}")
Pipeline Health Monitoring
Key Metrics to Monitor
# Your pipeline health indicators:

Pipeline Health Metrics:
├── Execution Time: Should be consistent (±20%)
├── Model Performance: Recall should stay >0.80
├── Data Quality: <5% missing values
├── Memory Usage: <80% of available RAM
└── Error Rate: <5% task failures

Alert Conditions:
├── Pipeline fails to complete
├── Best model recall drops >10%
├── Data validation failures
├── MLflow connectivity issues
└── Unusual execution times (+50%)
Monitoring Commands
# Check pipeline status
prefect flow-run ls --limit 10
# View specific run logs
prefect flow-run logs <flow-run-id>
# Monitor system resources
htop
nvidia-smi # If using GPU
# Check MLflow experiments
mlflow experiments list
mlflow runs list --experiment-id 1