Customer Churn Prediction
📋 Table of Contents
- Pipeline Overview
- Prefect Framework Basics
- Pipeline Architecture
- Task-by-Task Breakdown
- Flow Orchestration
- Monitoring Pipeline
- Execution Guide
- Troubleshooting
🎯 Pipeline Overview
What Your Prefect Pipeline Does
Your Prefect pipeline is a complete MLOps workflow orchestrator that automates the entire machine learning lifecycle for customer churn prediction. Think of it as an intelligent factory manager that:
- Coordinates all tasks in the correct order
- Handles failures gracefully with retries and error reporting
- Tracks progress and provides real-time monitoring
- Manages resources efficiently with concurrent execution
- Ensures reproducibility with proper logging and versioning
Business Value
Without Prefect: Manual, error-prone process requiring constant supervision
With Prefect: Automated, reliable, scalable ML pipeline that runs unattended
Pipeline Components
🔄 Your Complete Prefect Pipeline:
┌─────────────────────────────────────────────────────────────────┐
│ CHURN PREDICTION PIPELINE │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 📊 DATA PROCESSING TASKS │
│ ├── setup_mlflow() # Initialize experiment tracking │
│ ├── load_and_validate_data() # Load and validate raw data │
│ ├── prepare_data_splits() # Create train/val/test splits │
│ ├── feature_engineering() # Transform data for ML │
│ └── apply_smote() # Balance the dataset │
│ │
│ 🤖 MODEL TRAINING TASKS │
│ ├── train_baseline_models() # Train simple baseline models │
│ ├── hyperparameter_optimization() # Optimize advanced models │
│ ├── train_optimized_models() # Train with best parameters │
│ └── cross_validate_models() # Robust model evaluation │
│ │
│ 🚀 MODEL DEPLOYMENT TASKS │
│ ├── select_best_model() # Choose the best performer │
│ └── register_best_model() # Save to production registry │
│ │
│ 📊 MONITORING TASKS (Separate Flow) │
│ ├── load_reference_data() # Get baseline data for drift │
│ ├── generate_current_data() # Simulate new incoming data │
│ ├── calculate_drift_metrics() # Measure data drift │
│ └── save_monitoring_results() # Store metrics in database │
└─────────────────────────────────────────────────────────────────┘
🧩 Prefect Framework Basics
Understanding Prefect Concepts
1. Tasks (@task)
@task(name="example_task", description="What this task does")
def my_task(input_data):
"""
A task is a single unit of work in your pipeline.
Key features:
- Can be retried automatically on failure
- Have built-in logging
- Can run in parallel with other tasks
- Return values are automatically cached
"""
# Do some work
result = process_data(input_data)
return result
2. Flows (@flow)
@flow(name="example_flow", description="Orchestrates multiple tasks")
def my_pipeline():
"""
A flow orchestrates multiple tasks.
Key features:
- Defines the execution order of tasks
- Manages dependencies between tasks
- Provides error handling and retries
- Can run tasks concurrently
"""
# Tasks run in dependency order
data = load_data()
processed = process_data(data)
model = train_model(processed)
return model
3. Task Runners
from prefect.task_runners import ConcurrentTaskRunner

@flow(task_runner=ConcurrentTaskRunner())
def concurrent_pipeline():
    """
    Task runners control HOW tasks execute:
    - ConcurrentTaskRunner: Multiple tasks in parallel (Prefect 2's default)
    - SequentialTaskRunner: One task at a time
    - DaskTaskRunner: Distributed execution across machines
    """
Your Pipeline's Prefect Configuration
# Your main flow configuration
@flow(
    name="customer-churn-mlops-pipeline",  # Flow name in Prefect UI
    description="Complete MLOps pipeline for customer churn prediction",
    task_runner=ConcurrentTaskRunner(),    # Enable parallel execution
)
def churn_mlops_pipeline(
    data_path: str = "data/raw/WA_Fn-UseC_-Telco-Customer-Churn.csv",
    hyperopt_max_evals: int = 5,           # Number of optimization trials
):
🏗️ Pipeline Architecture
Execution Flow Diagram
🔄 Task Execution Order & Dependencies:
┌─ setup_mlflow() ──────────────────────────────────────────────┐
│ │
├─ load_and_validate_data() ───────────────────────────────────┤
│ │ │
│ ▼ │
├─ prepare_data_splits() ──────────────────────────────────────┤
│ │ │
│ ▼ │
├─ feature_engineering() ──────────────────────────────────────┤
│ │ │
│ ▼ │
├─ apply_smote() ──────────────────────────────────────────────┤
│ │ │
│ ▼ │
├─ train_baseline_models() ──┬─ hyperparameter_optimization() ─┤
│ │ │ │ │
│ │ │ ▼ │
│ │ └─ train_optimized_models() ──────┤
│ │ │ │
│ ▼ │ │
├─ COMBINE RESULTS ◄────────────────────┘ │
│ │ │
│ ▼ │
├─ cross_validate_models() ────────────────────────────────────┤
│ │ │
│ ▼ │
├─ select_best_model() ────────────────────────────────────────┤
│ │ │
│ ▼ │
└─ register_best_model() ──────────────────────────────────────┘
Concurrent Execution Benefits
Your pipeline uses ConcurrentTaskRunner() which enables:
- Baseline Models & Hyperparameter Optimization run in parallel
- Multiple hyperparameter optimization tasks run simultaneously
- Cross-validation runs in parallel for different models
- Faster overall execution without sacrificing reliability
📝 Task-by-Task Breakdown
1. Setup & Configuration Tasks
setup_mlflow() Task
@task(name="setup_mlflow", description="Initialize MLflow tracking")
def setup_mlflow() -> str:
What it does:
- Connects to your MLflow server at http://34.238.247.213:5000
- Creates or gets the existing experiment named "churn_prediction_production"
- Sets up experiment tracking for all subsequent model training
Why it's important:
- Experiment Tracking: Every model run is logged with parameters and metrics
- Reproducibility: You can recreate any model from its logged parameters
- Comparison: Compare different models and experiments easily
- Production Readiness: Models are properly versioned and stored
How it works:
# Your actual implementation breakdown:

# 1. Connect to the MLflow server
mlflow.set_tracking_uri("http://34.238.247.213:5000")

# 2. Test the connection and create or get the experiment
try:
    experiment_id = mlflow.create_experiment("churn_prediction_production")
except mlflow.exceptions.MlflowException:
    # Experiment already exists, get its ID
    experiment_id = mlflow.get_experiment_by_name(
        "churn_prediction_production"
    ).experiment_id

# 3. Set the active experiment
mlflow.set_experiment("churn_prediction_production")

# 4. Return the experiment ID for tracking
return experiment_id
load_and_validate_data() Task
@task(name="load_and_validate_data", description="Load data with comprehensive validation")
def load_and_validate_data(data_path: str) -> pd.DataFrame:
What it does:
- Loads the telco customer churn dataset
- Performs 8 different data quality checks
- Cleans and validates the data
- Saves validation results for audit purposes
Detailed Validation Process:
# Your 8-step validation process:
✅ Step 1: Schema Validation
- Checks all required columns exist
- Validates data types match expectations
✅ Step 2: Data Type Fixes
- Converts TotalCharges from string to numeric
- Handles "string" values that should be numbers
✅ Step 3: Missing Value Analysis
- Counts missing values per column
- Calculates percentages of missing data
✅ Step 4: Data Quality Warnings
- Warns if >5% of data is missing in any column
- Logs high missing value columns
✅ Step 5: Data Cleaning
- Removes rows with missing values
- Tracks how much data was lost
✅ Step 6: Target Variable Analysis
- Analyzes churn distribution (Yes/No ratio)
- Ensures sufficient samples of each class
✅ Step 7: Statistical Summary
- Generates descriptive statistics for numerical features
- Helps identify data distribution issues
✅ Step 8: Outlier Detection
- Uses IQR method to detect outliers
- Counts outliers per numerical feature
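As a concrete illustration of steps 2 and 8 above, here is a minimal sketch of the type fix and the IQR outlier count. Only the column names come from the telco dataset; the helper itself is illustrative, not the pipeline's exact code:

import pandas as pd

def check_numeric_quality(df: pd.DataFrame) -> dict:
    """Sketch: coerce TotalCharges to numeric and count IQR outliers."""
    # Step 2: TotalCharges arrives as strings; blanks become NaN
    df["TotalCharges"] = pd.to_numeric(df["TotalCharges"], errors="coerce")

    results = {}
    for col in ["tenure", "MonthlyCharges", "TotalCharges"]:
        q1, q3 = df[col].quantile(0.25), df[col].quantile(0.75)
        iqr = q3 - q1
        # Step 8: IQR rule, flag values outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR]
        outliers = (df[col] < q1 - 1.5 * iqr) | (df[col] > q3 + 1.5 * iqr)
        results[f"{col}_outliers"] = int(outliers.sum())
    return results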
Validation Results Saved:
{
  "missing_values": {"tenure": 0, "MonthlyCharges": 0, ...},
  "missing_percentage": {"tenure": 0.0, "MonthlyCharges": 0.0, ...},
  "rows_removed": 11,
  "data_loss_percentage": 0.16,
  "target_distribution": {"No": 0.73, "Yes": 0.27},
  "numerical_stats": {...},
  "tenure_outliers": 45,
  "MonthlyCharges_outliers": 23
}
2. Data Processing Tasks
prepare_data_splits() Task
@task(name="prepare_data_splits", description="Create train/validation/test splits")
def prepare_data_splits(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
What it does:
Creates three datasets for proper model evaluation:
Split Strategy:
# Your 60/20/20 split approach:
📊 Total Dataset (100%)
├── Training Set (60%) # For model learning
├── Validation Set (20%) # For hyperparameter tuning
└── Test Set (20%) # For final evaluation
# Implementation details:
# First split: 60% train, 40% temp
train_df, temp_df = train_test_split(df, test_size=0.4, stratify=df["Churn"])

# Second split: divide temp into 20% validation, 20% test
val_df, test_df = train_test_split(temp_df, test_size=0.5, stratify=temp_df["Churn"])
Why this approach:
- Training Set: Large enough for model learning
- Validation Set: Used for hyperparameter optimization (prevents overfitting)
- Test Set: Unbiased final evaluation (never seen during training/tuning)
- Stratification: Maintains the same churn ratio in all splits
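If you want to verify that stratification actually held, a quick check like this (a hypothetical snippet, not part of the pipeline) compares the churn ratio across the three splits:

# Hypothetical sanity check, not part of the pipeline itself
for name, split in [("train", train_df), ("val", val_df), ("test", test_df)]:
    ratio = split["Churn"].value_counts(normalize=True).round(3).to_dict()
    print(f"{name}: {ratio}")
# Expected output: roughly {"No": 0.73, "Yes": 0.27} in every split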
feature_engineering() Task
@task(name="feature_engineering", description="Comprehensive feature engineering")
def feature_engineering(...) -> Tuple[np.ndarray, ...]:
What it does:
Transforms raw customer data into machine learning-ready features.
Feature Processing Pipeline:
# Your feature transformation process:
🏷️ Categorical Features Processing:
├── Input: ["Yes", "No", "DSL", "Fiber optic", ...]
├── One-Hot Encoding: Creates binary columns
├── Drop First: Avoids multicollinearity
└── Output: [0, 1, 0, 1, ...]
📊 Numerical Features Processing:
├── Input: [tenure: 24, MonthlyCharges: 70.5, ...]
├── Min-Max Scaling: Scales to 0-1 range
└── Output: [0.42, 0.71, ...]
🎯 Target Variable Processing:
├── Input: ["Yes", "No"]
├── Label Encoding: Binary conversion
└── Output: [1, 0]
Feature Engineering Steps:
# Detailed breakdown of your implementation:

# 1. Categorical feature encoding
encoder = OneHotEncoder(drop="first", sparse_output=False)
encoded_features = encoder.fit_transform(df[categorical_features])

# 2. Feature name generation
feature_names = encoder.get_feature_names_out(categorical_features)
# Creates names like: "Contract_One year", "PaymentMethod_Credit card"

# 3. Combine encoded and numerical features
X_combined = pd.concat([X_encoded, df[numerical_features]], axis=1)

# 4. Scale all features
scaler = MinMaxScaler(feature_range=(0, 1))
X_scaled = scaler.fit_transform(X_combined)

# 5. Save preprocessing objects for production use
with open("models/encoder.pkl", "wb") as f:
    pickle.dump(encoder, f)
with open("models/scaler.pkl", "wb") as f:
    pickle.dump(scaler, f)
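At serving time those pickled objects are reloaded and applied with transform() rather than fit_transform(), so nothing is refit on production data. A minimal sketch, assuming the same categorical_features and numerical_features lists used above:

import pickle
import pandas as pd

# Reload the artifacts saved by feature_engineering()
with open("models/encoder.pkl", "rb") as f:
    encoder = pickle.load(f)
with open("models/scaler.pkl", "rb") as f:
    scaler = pickle.load(f)

def transform_for_inference(df: pd.DataFrame):
    """Apply the training-time preprocessing to new customer rows."""
    encoded = pd.DataFrame(
        encoder.transform(df[categorical_features]),
        columns=encoder.get_feature_names_out(categorical_features),
        index=df.index,
    )
    combined = pd.concat([encoded, df[numerical_features]], axis=1)
    return scaler.transform(combined)  # transform only, never fit_transform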
apply_smote() Task
@task(name="apply_smote", description="Apply SMOTE for class balancing")
def apply_smote(X_train: np.ndarray, y_train: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
What it does:
Balances the dataset to improve model performance on churned customers.
The Imbalance Problem:
# Typical churn dataset imbalance:
Original Distribution:
├── No Churn (0): 73% (5,174 customers)
└── Churn (1): 27% (1,869 customers)
# Problem: Models become biased toward "No Churn"
# Solution: SMOTE creates synthetic churn examples
How SMOTE Works:
# Your SMOTE implementation:
🔍 SMOTE Algorithm Steps:
1. Find churned customers in feature space
2. For each churned customer:
├── Find k nearest churned neighbors
├── Draw line between customer and neighbor
├── Create new synthetic customer on that line
└── Repeat until classes are balanced
📊 Result:
Balanced Distribution:
├── No Churn (0): 50% (5,174 customers) [original]
└── Churn (1): 50% (5,174 customers) [original + synthetic]
Benefits:
- Better Learning: Model sees equal examples of both classes
- Improved Recall: Better at catching actual churners
- Reduced Bias: More balanced predictions
- Synthetic Quality: SMOTE creates realistic examples, not just duplicates
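In code, the whole balancing step reduces to a few lines with imbalanced-learn. A minimal sketch (the random_state value is illustrative):

from collections import Counter
from imblearn.over_sampling import SMOTE

smote = SMOTE(random_state=42)  # k_neighbors defaults to 5
X_train_balanced, y_train_balanced = smote.fit_resample(X_train, y_train)

print("Before:", Counter(y_train))           # e.g. {0: 5174, 1: 1869}
print("After: ", Counter(y_train_balanced))  # e.g. {0: 5174, 1: 5174}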
3. Model Training Tasks
train_baseline_models() Task
@task(name="train_baseline_models", description="Train baseline models")
def train_baseline_models(...) -> Dict[str, Dict[str, Any]]:
What it does:
Trains simple, interpretable models as performance baselines.
Your Baseline Models:
# Your three baseline models:
🤖 Baseline Model Arsenal:
├── SVC (Support Vector Classifier)
│ ├── Kernel: Linear (simple decision boundary)
│ ├── Strengths: Good for binary classification
│ └── Use Case: Fast, interpretable baseline
│
├── AdaBoost (Adaptive Boosting)
│ ├── Algorithm: Combines weak learners
│ ├── Strengths: Reduces overfitting
│ └── Use Case: Ensemble method baseline
│
└── Logistic Regression
├── Algorithm: Linear probability model
├── Strengths: Highly interpretable
└── Use Case: Business-friendly baseline
Training Process:
# Your baseline training implementation:
for model_name, model in baseline_models.items():
    with mlflow.start_run(run_name=f"{model_name}_baseline"):
        # 1. Train model (SVC must be created with probability=True
        #    for predict_proba to work)
        model.fit(X_train.values, y_train.values)

        # 2. Make predictions
        y_pred = model.predict(X_val.values)
        y_pred_proba = model.predict_proba(X_val.values)[:, 1]

        # 3. Calculate metrics
        metrics = {
            "accuracy": accuracy_score(y_val, y_pred),
            "precision": precision_score(y_val, y_pred),
            "recall": recall_score(y_val, y_pred),
            "f1": f1_score(y_val, y_pred),
            "roc_auc": roc_auc_score(y_val, y_pred_proba),
        }

        # 4. Log to MLflow
        mlflow.log_params(model.get_params())
        mlflow.log_metrics(metrics)

        # 5. Store results
        results[model_name] = {
            "model": model,
            "metrics": metrics,
            "predictions": y_pred,
            "probabilities": y_pred_proba,
        }
hyperparameter_optimization() Task
@task(name="hyperparameter_optimization", description="Optimize hyperparameters for a model")
def hyperparameter_optimization(...) -> Dict[str, Any]:
What it does:
Uses Bayesian optimization to find the best hyperparameters for advanced models.
Your Optimization Strategy:
Models Being Optimized:
🎯 Advanced Models for Optimization:
├── RandomForest
│ ├── n_estimators: 100-500 trees
│ ├── max_depth: 3-15 levels
│ ├── min_samples_split: 2-20 samples
│ ├── min_samples_leaf: 1-10 samples
│ ├── max_features: ["sqrt", "log2", None]
│ └── bootstrap: [True, False]
│
├── XGBoost
│ ├── n_estimators: 50-200 boosters
│ ├── max_depth: 3-10 levels
│ ├── learning_rate: 0.01-0.3 (log scale)
│ ├── min_child_weight: 1-10
│ ├── subsample: 0.5-1.0
│ └── colsample_bytree: 0.5-1.0
│
└── LightGBM
├── n_estimators: 50-200 boosters
├── max_depth: 3-10 levels
├── learning_rate: 0.01-0.3 (log scale)
├── num_leaves: 20-100 leaves
├── min_child_weight: 1-10
├── subsample: 0.5-1.0
└── colsample_bytree: 0.5-1.0
Optimization Algorithm:
# Your Hyperopt implementation:
🧠 Bayesian Optimization Process:
1. Define Search Space
├── Continuous parameters: learning_rate, subsample
├── Integer parameters: n_estimators, max_depth
└── Categorical parameters: max_features, bootstrap
2. Objective Function
def objective(params):
├── Create model with params
├── Train on training data
├── Evaluate on validation data
├── Return negative recall (Hyperopt minimizes)
└── Handle errors gracefully
3. Optimization Loop (max_evals=5 trials)
├── Trial 1: Random parameters
├── Trial 2: Based on Trial 1 results
├── Trial 3: Intelligent parameter selection
├── Trial 4: Refine best regions
└── Trial 5: Final optimization
4. Return Best Parameters
└── Parameters that achieved highest recall
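For reference, here is what a Hyperopt setup matching that process can look like. The space definitions below are a sketch for XGBoost only, not the pipeline's exact ranges:

import numpy as np
from hyperopt import STATUS_OK, Trials, fmin, hp, tpe
from sklearn.metrics import recall_score
from xgboost import XGBClassifier

search_space = {
    "n_estimators": hp.quniform("n_estimators", 50, 200, 25),
    "max_depth": hp.quniform("max_depth", 3, 10, 1),
    "learning_rate": hp.loguniform("learning_rate", np.log(0.01), np.log(0.3)),
    "subsample": hp.uniform("subsample", 0.5, 1.0),
}

def objective(params):
    # quniform returns floats, so integer parameters need casting
    model = XGBClassifier(
        n_estimators=int(params["n_estimators"]),
        max_depth=int(params["max_depth"]),
        learning_rate=params["learning_rate"],
        subsample=params["subsample"],
    )
    model.fit(X_train, y_train)
    recall = recall_score(y_val, model.predict(X_val))
    return {"loss": -recall, "status": STATUS_OK}  # Hyperopt minimizes

best_params = fmin(fn=objective, space=search_space,
                   algo=tpe.suggest, max_evals=5, trials=Trials())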
Why Recall as Optimization Metric:
- Business Focus: Missing churners costs more than false alarms
- Retention Priority: Better to offer retention to loyal customers than lose churners
- Cost-Benefit: Retention cost < Customer acquisition cost
train_optimized_models() Task
@task(name="train_optimized_models", description="Train models with optimized hyperparameters")
def train_optimized_models(...) -> Dict[str, Dict[str, Any]]:
What it does:
Trains the advanced models using the best hyperparameters found by optimization.
Training Process:
# Your optimized training implementation:
# 🚀 Optimized Model Training:
for model_name, opt_result in optimization_results.items():
    best_params = opt_result["best_params"]

    # 1. Parameter conversion
    if model_name == "RandomForest":
        # Convert choice indices back to actual values
        max_features_options = ["sqrt", "log2", None]
        best_params["max_features"] = max_features_options[int(best_params["max_features"])]
        # Convert float parameters to int
        best_params["n_estimators"] = int(best_params["n_estimators"])
        best_params["max_depth"] = int(best_params["max_depth"])

    # 2. Model creation
    model = RandomForestClassifier(**best_params, random_state=42)

    # 3. Training & evaluation
    with mlflow.start_run(run_name=f"{model_name}_optimized"):
        model.fit(X_train.values, y_train.values)
        y_pred = model.predict(X_val.values)
        y_pred_proba = model.predict_proba(X_val.values)[:, 1]

        # 4. Metrics & logging
        metrics = calculate_all_metrics(y_val, y_pred, y_pred_proba)
        mlflow.log_params(best_params)
        mlflow.log_metrics(metrics)
        mlflow.sklearn.log_model(model, f"{model_name}_model")
cross_validate_models() Task
@task(name="cross_validate_models", description="Perform cross-validation on all models")
def cross_validate_models(...) -> Dict[str, Dict[str, float]]:
What it does:
Provides robust performance estimates using 5-fold stratified cross-validation.
Cross-Validation Process:
# Your cross-validation implementation:
🔄 5-Fold Stratified Cross-Validation:
📊 Fold Creation:
├── Fold 1: 20% of data (maintaining churn ratio)
├── Fold 2: 20% of data (maintaining churn ratio)
├── Fold 3: 20% of data (maintaining churn ratio)
├── Fold 4: 20% of data (maintaining churn ratio)
└── Fold 5: 20% of data (maintaining churn ratio)
🎯 Evaluation Process:
For each model:
├── Train on 4 folds, test on 1 fold (5 times)
├── Calculate metrics for each fold
├── Compute mean and standard deviation
└── Store comprehensive results
📈 Metrics Calculated:
├── accuracy_mean ± accuracy_std
├── precision_mean ± precision_std
├── recall_mean ± recall_std
├── f1_mean ± f1_std
└── roc_auc_mean ± roc_auc_std
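A compact version of this evaluation with scikit-learn could look like the sketch below; the all_models, X, and y names follow the surrounding text, and the metric list matches the five reported above:

import numpy as np
from sklearn.model_selection import StratifiedKFold, cross_validate

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scoring = ["accuracy", "precision", "recall", "f1", "roc_auc"]

cv_results = {}
for model_name, info in all_models.items():
    scores = cross_validate(info["model"], X, y, cv=cv, scoring=scoring)
    entry = {}
    for metric in scoring:
        entry[f"{metric}_mean"] = scores[f"test_{metric}"].mean()
        entry[f"{metric}_std"] = scores[f"test_{metric}"].std()
    cv_results[model_name] = entry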
Why Cross-Validation:
- Robust Estimates: Uses all data for both training and testing
- Variance Assessment: Standard deviation shows model stability
- Overfitting Detection: Consistent performance across folds indicates good generalization
- Fair Comparison: All models evaluated on same data splits
4. Model Selection & Deployment Tasks
select_best_model() Task
@task(name="select_best_model", description="Select the best performing model")
def select_best_model(...) -> Dict[str, Any]:
What it does:
Chooses the best model based on cross-validation recall scores.
Selection Process:
# Your model selection logic:
# 🏆 Best model selection
selection_metric = "recall_mean"  # Primary business metric
best_score, best_model_name = 0.0, None

for model_name, cv_metrics in cv_results.items():
    score = cv_metrics.get(selection_metric, 0)
    if score > best_score:
        best_score = score
        best_model_name = model_name

# Result: the model with the highest average recall across CV folds
Why Recall as Selection Metric:
- Business Priority: Catching churners is more important than precision
- Cost Consideration: Missing a churner costs more than unnecessary retention effort
- Customer Lifetime Value: Retaining customers has long-term value
register_best_model() Task
@task(name="register_best_model", description="Register best model to MLflow Model Registry")
def register_best_model(...) -> str:
What it does:
Saves the best model to MLflow Model Registry for production deployment.
Registration Process:
# Your model registration implementation:
🚀 Production Model Registration:
1. Final Test Evaluation
├── Evaluate on held-out test set
├── Calculate final performance metrics
└── Ensure no data leakage
2. Artifact Preparation
├── Save trained model (model.pkl)
├── Save feature encoder (encoder.pkl)
├── Save feature scaler (scaler.pkl)
└── Create model metadata
3. Custom PyFunc Model Creation
class ChurnPredictionModel(PythonModel):
├── Loads all preprocessing artifacts
├── Handles end-to-end prediction pipeline
├── Includes data validation
└── Returns churn probabilities
4. MLflow Registration
├── Log PyFunc model with artifacts
├── Register to Model Registry
├── Add detailed model description
└── Return model URI for deployment
5. Model Documentation
├── Performance metrics on test set
├── Feature importance analysis
├── Business impact estimates
└── Deployment instructions
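Step 3 is the interesting part. Here is a sketch of what such a PyFunc wrapper can look like; the artifact keys and feature-list constants are assumed names, and the method bodies are simplified:

import pickle
import numpy as np
import mlflow.pyfunc

class ChurnPredictionModel(mlflow.pyfunc.PythonModel):
    def load_context(self, context):
        # Artifact keys ("model", "encoder", "scaler") are assumed names
        with open(context.artifacts["model"], "rb") as f:
            self.model = pickle.load(f)
        with open(context.artifacts["encoder"], "rb") as f:
            self.encoder = pickle.load(f)
        with open(context.artifacts["scaler"], "rb") as f:
            self.scaler = pickle.load(f)

    def predict(self, context, model_input):
        # End-to-end pipeline: encode, scale, then score
        encoded = self.encoder.transform(model_input[CATEGORICAL_FEATURES])
        combined = np.hstack([encoded, model_input[NUMERICAL_FEATURES].values])
        scaled = self.scaler.transform(combined)
        return self.model.predict_proba(scaled)[:, 1]  # churn probabilities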
Custom PyFunc Model Benefits:
- End-to-End Pipeline: Includes preprocessing and prediction
- Production Ready: Handles real-world data formats
- Artifact Management: All dependencies bundled together
- Versioning: Proper model versioning for rollbacks
🔄 Flow Orchestration
Main Pipeline Flow
@flow(
    name="customer-churn-mlops-pipeline",
    description="Complete MLOps pipeline for customer churn prediction",
    task_runner=ConcurrentTaskRunner(),
)
def churn_mlops_pipeline(
    data_path: str = "data/raw/WA_Fn-UseC_-Telco-Customer-Churn.csv",
    hyperopt_max_evals: int = 5,
):
Flow Execution Steps:
# Your complete pipeline orchestration:
🎬 Pipeline Execution Script:
Step 1: Initialize
├── logger.info("Starting Customer Churn MLOps Pipeline...")
├── experiment_id = setup_mlflow()
└── Set up tracking and logging
Step 2: Data Processing
├── df = load_and_validate_data(data_path)
├── train_df, val_df, test_df = prepare_data_splits(df)
├── X_train, X_val, X_test, y_train, y_val, y_test = feature_engineering(...)
└── X_train_balanced, y_train_balanced = apply_smote(X_train, y_train)
Step 3: Model Training (Parallel Execution)
├── baseline_results = train_baseline_models(...)               # Task A
└── optimization_results = {}                                   # Task B starts
    ├── for model_name in ["RandomForest", "XGBoost", "LightGBM"]:
    └──── optimization_results[model_name] = hyperparameter_optimization(...)
Step 4: Advanced Training
├── optimized_results = train_optimized_models(...)
└── all_models = {**baseline_results, **optimized_results}
Step 5: Evaluation & Selection
├── cv_results = cross_validate_models(all_models, ...)
├── best_model_info = select_best_model(all_models, cv_results, "recall_mean")
└── model_uri = register_best_model(best_model_info, X_test, y_test)
Step 6: Summary & Completion
├── Generate pipeline summary
├── Log final results
└── Return success status with model URI
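Put together, the body of churn_mlops_pipeline() is essentially a straight transcription of those steps. A condensed sketch, with task argument lists assumed and logging omitted:

# Inside churn_mlops_pipeline(data_path, hyperopt_max_evals):
experiment_id = setup_mlflow()

# Step 2: data processing (each task depends on the previous one)
df = load_and_validate_data(data_path)
train_df, val_df, test_df = prepare_data_splits(df)
X_train, X_val, X_test, y_train, y_val, y_test = feature_engineering(
    train_df, val_df, test_df
)
X_train_bal, y_train_bal = apply_smote(X_train, y_train)

# Step 3: baseline and optimized training
baseline_results = train_baseline_models(X_train_bal, y_train_bal, X_val, y_val)
optimization_results = {
    name: hyperparameter_optimization(
        name, X_train_bal, y_train_bal, X_val, y_val, hyperopt_max_evals
    )
    for name in ["RandomForest", "XGBoost", "LightGBM"]
}
optimized_results = train_optimized_models(
    optimization_results, X_train_bal, y_train_bal, X_val, y_val
)

# Steps 4-6: evaluation, selection, registration
all_models = {**baseline_results, **optimized_results}
cv_results = cross_validate_models(all_models, X_train_bal, y_train_bal)
best_model_info = select_best_model(all_models, cv_results, "recall_mean")
model_uri = register_best_model(best_model_info, X_test, y_test)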
Concurrent Task Execution
Tasks That Run in Parallel:
# Your concurrent execution strategy:
🔄 Parallel Execution Groups:
Group 1: Data Processing (Sequential - Dependencies)
├── setup_mlflow()
├── load_and_validate_data()
├── prepare_data_splits()
├── feature_engineering()
└── apply_smote()
Group 2: Model Training (Parallel)
├── train_baseline_models() # Starts immediately
└── hyperparameter_optimization() # 3 models run concurrently
├── RandomForest optimization
├── XGBoost optimization
└── LightGBM optimization
Group 3: Final Training (Sequential - Needs optimization results)
├── train_optimized_models()
├── cross_validate_models()
├── select_best_model()
└── register_best_model()
Performance Benefits:
- 30-50% faster execution with parallel hyperparameter optimization
- Resource efficiency - CPU cores used optimally
- Fault tolerance - If one optimization fails, others continue
- Scalability - Easy to add more models without increasing total time
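One Prefect detail worth spelling out: with ConcurrentTaskRunner, tasks only overlap when they are submitted to the runner with .submit() instead of being called directly. A sketch of how the parallel group can be expressed (argument lists assumed, as above):

# Calling a task directly blocks; .submit() returns a future immediately
baseline_future = train_baseline_models.submit(X_train, y_train, X_val, y_val)

opt_futures = {
    name: hyperparameter_optimization.submit(name, X_train, y_train, X_val, y_val)
    for name in ["RandomForest", "XGBoost", "LightGBM"]
}

# .result() blocks until each concurrently running task finishes
baseline_results = baseline_future.result()
optimization_results = {name: f.result() for name, f in opt_futures.items()}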
📊 Monitoring Pipeline
Separate Monitoring Flow
# Your monitoring pipeline (monitor_churn_model.py):
@flow(name="churn_model_monitoring")
def churn_monitoring_pipeline():
Monitoring Tasks Breakdown:
load_reference_data() Task
@task
def load_reference_data():
    """Load and prepare reference data from the training dataset"""
What it does:
- Loads the original training dataset as baseline for drift detection
- Applies same preprocessing as production pipeline
- Creates reference distribution for comparison
generate_current_data() Task
@task
def generate_current_data():
    """Generate or load current production data"""
What it does:
- Simulates new incoming customer data
- In production: Would load from real-time data sources
- Applies same preprocessing as training pipeline
calculate_drift_metrics() Task
@task
def calculate_drift_metrics(reference_data, current_data):
    """Calculate data drift and model performance metrics"""
What it does:
Uses Evidently AI to calculate:
- Data Drift: Statistical tests (KS-test, PSI) for feature distribution changes
- Prediction Drift: Changes in model output distribution
- Missing Values: Data quality monitoring
- Model Performance: If ground truth is available
Drift Detection Process:
# Your Evidently monitoring implementation:
📊 Evidently Report Generation:
├── DatasetDriftMetric() # Overall dataset drift
├── ColumnDriftMetric() # Individual feature drift
├── DatasetMissingValuesMetric() # Data quality checks
└── Model performance metrics # If labels available
🎯 Drift Calculation:
├── Statistical Tests: KS-test, Chi-square, Z-test
├── Thresholds: Configurable drift sensitivity
├── Feature-level: Individual feature drift scores
└── Dataset-level: Overall drift assessment
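The metric names above map directly onto an Evidently Report. A minimal sketch, assuming a 0.3-era Evidently API and a "prediction" column in both datasets:

from evidently.report import Report
from evidently.metrics import (
    ColumnDriftMetric,
    DatasetDriftMetric,
    DatasetMissingValuesMetric,
)

report = Report(metrics=[
    DatasetDriftMetric(),                          # Overall dataset drift
    ColumnDriftMetric(column_name="prediction"),   # Prediction drift
    DatasetMissingValuesMetric(),                  # Data quality
])
report.run(reference_data=reference_data, current_data=current_data)

result = report.as_dict()
drift_share = result["metrics"][0]["result"]["share_of_drifted_columns"]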
save_monitoring_results() Task
@task
def save_monitoring_results(metrics):
    """Save monitoring results to the database"""
What it does:
Saves monitoring metrics to PostgreSQL database for Grafana visualization:
-- Your monitoring table schema:
CREATE TABLE CHURN_ML_METRICS(
    timestamp TIMESTAMP,
    prediction_drift FLOAT,        -- Prediction distribution drift
    num_drifted_columns INTEGER,   -- Number of features with drift
    share_missing_values FLOAT,    -- Data quality metric
    avg_churn_probability FLOAT,   -- Average prediction score
    model_version VARCHAR(10)      -- Model version tracking
);
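Writing a row into that table is a single parameterized INSERT. A sketch assuming psycopg2 and illustrative connection settings:

import datetime
import psycopg2

def write_metrics_row(metrics: dict) -> None:
    # Connection settings are illustrative; use your own host and credentials
    conn = psycopg2.connect(host="localhost", dbname="monitoring",
                            user="postgres", password="example")
    with conn, conn.cursor() as cur:  # "with conn" commits the transaction
        cur.execute(
            """INSERT INTO CHURN_ML_METRICS
               (timestamp, prediction_drift, num_drifted_columns,
                share_missing_values, avg_churn_probability, model_version)
               VALUES (%s, %s, %s, %s, %s, %s)""",
            (
                datetime.datetime.now(),
                metrics["prediction_drift"],
                metrics["num_drifted_columns"],
                metrics["share_missing_values"],
                metrics["avg_churn_probability"],
                metrics.get("model_version", "v1"),
            ),
        )
    conn.close()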
Monitoring Pipeline Execution
# Your complete monitoring workflow:
🔍 Monitoring Pipeline Flow:
1. Load reference (training) data
2. Generate/load current (production) data
3. Calculate drift metrics using Evidently
4. Save results to PostgreSQL database
5. Trigger alerts if drift exceeds thresholds
6. Update Grafana dashboards automatically
🚀 Execution Guide
Running Your Pipeline
Local Development
# 1. Install dependencies
pip install -r requirements.txt
# 2. Start MLflow server
mlflow server --host 0.0.0.0 --port 5000
# 3. Run the complete pipeline
python services/training/churn_mlops_pipeline.py
# 4. Monitor progress in terminal and MLflow UI
# MLflow UI: http://localhost:5000
Production Deployment
# 1. Start Prefect server
prefect server start
# 2. Create deployment
prefect deployment build services/training/churn_mlops_pipeline.py:churn_mlops_pipeline -n "churn-pipeline"
# 3. Apply deployment
prefect deployment apply churn_mlops_pipeline-deployment.yaml
# 4. Start an agent to pull runs from the queue
prefect agent start --work-queue "default"
# 5. Run deployment
prefect deployment run "churn_mlops_pipeline/churn-pipeline"
Scheduled Execution
# Your pipeline with scheduling:
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

deployment = Deployment.build_from_flow(
    flow=churn_mlops_pipeline,
    name="churn-pipeline-daily",
    schedule=CronSchedule(cron="0 2 * * *"),  # Daily at 2 AM
    work_queue_name="ml-training",
)
deployment.apply()  # Register the deployment with the Prefect server
Monitoring Execution
Prefect UI Dashboard
🖥️ Prefect UI (http://localhost:4200):
├── Flow Runs: Real-time pipeline execution status
├── Task Status: Individual task success/failure
├── Logs: Detailed execution logs
├── Performance: Task execution times
└── Schedules: Upcoming pipeline runs
MLflow Integration
📊 MLflow Tracking (http://localhost:5000):
├── Experiments: All pipeline runs grouped
├── Models: Trained model artifacts and metrics
├── Comparisons: Side-by-side model performance
└── Registry: Production-ready model versions
🛠️ Troubleshooting
Common Issues & Solutions
1. MLflow Connection Issues
Problem:
MLflow server not available: Connection refused
Solution:
# Your error handling in setup_mlflow():
try:
    mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
    client = MlflowClient()
    client.search_experiments()
except Exception as e:
    logger.warning(f"MLflow server not available: {e}")
    logger.info("Continuing without MLflow tracking...")
    return "local_experiment"
Prevention:
- Ensure MLflow server is running
- Check network connectivity
- Verify MLFLOW_TRACKING_URI is correct
2. Memory Issues During Training
Problem:
MemoryError: Unable to allocate array
Solution:
# Reduce dataset size or use sampling:
if len(df) > 50000:
    df = df.sample(n=50000, random_state=42)
    logger.info("Dataset sampled to 50,000 rows for memory efficiency")
3. Hyperparameter Optimization Failures
Problem:
Optimization failed for XGBoost: Invalid parameter
Solution:
# Your error handling in optimization:
def objective(params):
    try:
        model = get_model_instance(model_name, params.copy())
        model.fit(X_train.values, y_train.values)
        y_pred = model.predict(X_val.values)
        recall = recall_score(y_val, y_pred)
        return {"loss": -recall, "status": STATUS_OK}
    except Exception as e:
        logger.warning(f"Error in optimization trial: {e}")
        return {"loss": 0, "status": STATUS_OK}  # Worst score; continue search
4. Task Failures and Retries
# Configure task retries:
@task(
    name="robust_task",
    retries=3,
    retry_delay_seconds=60,
    retry_jitter_factor=0.1,
)
def robust_training_task():
    # Task implementation
    pass
5. Data Validation Failures
Problem:
ValueError: Missing required columns: ['TotalCharges']
Solution:
# Your validation in load_and_validate_data():
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
    logger.error(f"Missing required columns: {missing_columns}")
    logger.info(f"Available columns: {list(df.columns)}")
    raise ValueError(f"Missing required columns: {missing_columns}")
Pipeline Health Monitoring
Key Metrics to Monitor
# Your pipeline health indicators:
📊 Pipeline Health Metrics:
├── Execution Time: Should be consistent (±20%)
├── Model Performance: Recall should stay >0.80
├── Data Quality: <5% missing values
├── Memory Usage: <80% of available RAM
└── Error Rate: <5% task failures
🚨 Alert Conditions:
├── Pipeline fails to complete
├── Best model recall drops >10%
├── Data validation failures
├── MLflow connectivity issues
└── Unusual execution times (+50%)
Monitoring Commands
# Check pipeline status
prefect flow-run ls --limit 10
# View specific run logs
prefect flow-run logs <flow-run-id>
# Monitor system resources
htop
nvidia-smi # If using GPU
# Check MLflow experiments
mlflow experiments list
mlflow runs list --experiment-id 1