DEV Community

Abdelrahman Adnan

Prefect Pipeline Tutorial: Complete Breakdown

Customer Churn Prediction


📋 Table of Contents

  1. Pipeline Overview
  2. Prefect Framework Basics
  3. Pipeline Architecture
  4. Task-by-Task Breakdown
  5. Flow Orchestration
  6. Monitoring Pipeline
  7. Execution Guide
  8. Troubleshooting

🎯 Pipeline Overview

What Your Prefect Pipeline Does

Your Prefect pipeline is a complete MLOps workflow orchestrator that automates the entire machine learning lifecycle for customer churn prediction. Think of it as an intelligent factory manager that:

  1. Coordinates all tasks in the correct order
  2. Handles failures gracefully with retries and error reporting
  3. Tracks progress and provides real-time monitoring
  4. Manages resources efficiently with concurrent execution
  5. Ensures reproducibility with proper logging and versioning

Business Value

Without Prefect: Manual, error-prone process requiring constant supervision
With Prefect: Automated, reliable, scalable ML pipeline that runs unattended

Pipeline Components

🔄 Your Complete Prefect Pipeline:

┌─────────────────────────────────────────────────────────────────┐
│                    CHURN PREDICTION PIPELINE                    │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│ 📊 DATA PROCESSING TASKS                                        │
│ ├── setup_mlflow()              # Initialize experiment tracking │
│ ├── load_and_validate_data()    # Load and validate raw data    │
│ ├── prepare_data_splits()       # Create train/val/test splits  │
│ ├── feature_engineering()       # Transform data for ML         │
│ └── apply_smote()               # Balance the dataset           │
│                                                                 │
│ 🤖 MODEL TRAINING TASKS                                         │
│ ├── train_baseline_models()     # Train simple baseline models  │
│ ├── hyperparameter_optimization() # Optimize advanced models    │
│ ├── train_optimized_models()    # Train with best parameters    │
│ └── cross_validate_models()     # Robust model evaluation       │
│                                                                 │
│ 🚀 MODEL DEPLOYMENT TASKS                                       │
│ ├── select_best_model()         # Choose the best performer     │
│ └── register_best_model()       # Save to production registry   │
│                                                                 │
│ 📊 MONITORING TASKS (Separate Flow)                             │
│ ├── load_reference_data()       # Get baseline data for drift   │
│ ├── generate_current_data()     # Simulate new incoming data    │
│ ├── calculate_drift_metrics()   # Measure data drift            │
│ └── save_monitoring_results()   # Store metrics in database     │
└─────────────────────────────────────────────────────────────────┘

🧩 Prefect Framework Basics

Understanding Prefect Concepts

1. Tasks (@task)

from prefect import task

@task(name="example_task", description="What this task does")
def my_task(input_data):
    """
    A task is a single unit of work in your pipeline.

    Key features:
    - Can be retried automatically on failure (set retries=... on @task)
    - Have built-in logging
    - Can run in parallel with other tasks (when launched with .submit())
    - Return values can be cached (opt-in, via cache_key_fn)
    """
    # Do some work (process_data is a placeholder for your own logic)
    result = process_data(input_data)
    return result
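
To make "retried automatically on failure" concrete, here is a minimal sketch in plain Python. The with_retries decorator and the flaky_load function are hypothetical illustrations, not Prefect APIs; with Prefect you would configure retries on the @task decorator instead of writing this loop yourself.

```python
import functools


def with_retries(max_retries):
    """Hypothetical decorator: re-run a function up to max_retries extra times."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_retries:
                        raise  # out of retries: surface the error
        return wrapper
    return decorator


calls = {"count": 0}


@with_retries(max_retries=2)
def flaky_load():
    """Fails twice, then succeeds, to simulate a transient error."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "data loaded"


result = flaky_load()
```

The function only succeeds on its third call, so the caller never sees the two transient failures.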

2. Flows (@flow)

from prefect import flow

@flow(name="example_flow", description="Orchestrates multiple tasks")
def my_pipeline():
    """
    A flow orchestrates multiple tasks.

    Key features:
    - Defines the execution order of tasks
    - Manages dependencies between tasks
    - Provides error handling and retries
    - Can run tasks concurrently (via .submit() and a task runner)
    """
    # Called directly, each task blocks until it finishes, so these run in order
    data = load_data()
    processed = process_data(data)
    model = train_model(processed)
    return model

3. Task Runners

from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner

@flow(task_runner=ConcurrentTaskRunner())
def concurrent_pipeline():
    """
    Task runners control HOW submitted tasks execute (Prefect 2.x):

    - SequentialTaskRunner: One task at a time
    - ConcurrentTaskRunner: Multiple tasks concurrently (the default in Prefect 2.x)
    - DaskTaskRunner: Distributed execution across machines (from the prefect-dask package)
    """

Your Pipeline's Prefect Configuration

# Your main flow configuration
@flow(
    name="customer-churn-mlops-pipeline",           # Flow name in Prefect UI
    description="Complete MLOps pipeline for customer churn prediction",
    task_runner=ConcurrentTaskRunner(),             # Enable parallel execution
)
def churn_mlops_pipeline(
    data_path: str = "data/raw/WA_Fn-UseC_-Telco-Customer-Churn.csv",
    hyperopt_max_evals: int = 5,                    # Number of optimization trials
):

🏗️ Pipeline Architecture

Execution Flow Diagram

🔄 Task Execution Order & Dependencies:

┌─ setup_mlflow() ──────────────────────────────────────────────┐
│                                                               │
├─ load_and_validate_data() ────────────────────────────────────┤
│           │                                                   │
│           ▼                                                   │
├─ prepare_data_splits() ───────────────────────────────────────┤
│           │                                                   │
│           ▼                                                   │
├─ feature_engineering() ───────────────────────────────────────┤
│           │                                                   │
│           ▼                                                   │
├─ apply_smote() ───────────────────────────────────────────────┤
│           │                                                   │
│           ▼                                                   │
├─ train_baseline_models() ──┬─ hyperparameter_optimization() ───┤
│           │                │           │                      │
│           │                │           ▼                      │
│           │                └─ train_optimized_models() ───────┤
│           │                            │                      │
│           ▼                            │                      │
├─ COMBINE RESULTS ◄─────────────────────┘                      │
│           │                                                   │
│           ▼                                                   │
├─ cross_validate_models() ─────────────────────────────────────┤
│           │                                                   │
│           ▼                                                   │
├─ select_best_model() ─────────────────────────────────────────┤
│           │                                                   │
│           ▼                                                   │
└─ register_best_model() ───────────────────────────────────────┘

Concurrent Execution Benefits

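Your pipeline uses ConcurrentTaskRunner(), which lets independent tasks overlap when they are launched with .submit() (a task called directly blocks until it finishes). This enables: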

  1. Baseline Models & Hyperparameter Optimization run in parallel
  2. Multiple hyperparameter optimization tasks run simultaneously
  3. Cross-validation runs in parallel for different models
  4. Faster overall execution without sacrificing reliability
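
The idea behind these points can be sketched with Python's standard library alone. This is not Prefect code: ThreadPoolExecutor stands in for the task runner, and train_baseline and optimize are hypothetical placeholders for the real tasks. The pattern is the same, though: submit independent work first, then collect the results.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def train_baseline(data):
    time.sleep(0.05)  # stand-in for real training work
    return {"baseline": len(data)}


def optimize(model_name, data):
    time.sleep(0.05)  # stand-in for a hyperparameter search
    return {model_name: sum(data)}


data = [1, 2, 3]
with ThreadPoolExecutor(max_workers=4) as pool:
    # Submit independent work up front so it can overlap
    baseline_future = pool.submit(train_baseline, data)
    opt_futures = [
        pool.submit(optimize, name, data)
        for name in ("RandomForest", "XGBoost", "LightGBM")
    ]
    # .result() blocks until each piece of work is done
    all_results = dict(baseline_future.result())
    for future in opt_futures:
        all_results.update(future.result())
```

All four jobs sleep at the same time, so the whole block takes roughly one sleep rather than four.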

📝 Task-by-Task Breakdown

1. Setup & Configuration Tasks

setup_mlflow() Task

@task(name="setup_mlflow", description="Initialize MLflow tracking")
def setup_mlflow() -> str:

What it does:

  • Connects to your MLflow server at http://34.238.247.213:5000
  • Creates or gets existing experiment named "churn_prediction_production"
  • Sets up experiment tracking for all subsequent model training

Why it's important:

  • Experiment Tracking: Every model run is logged with parameters and metrics
  • Reproducibility: You can recreate any model from its logged parameters
  • Comparison: Compare different models and experiments easily
  • Production Readiness: Models are properly versioned and stored

How it works:

# Your actual implementation breakdown:
import mlflow

# 1. Connect to MLflow server
mlflow.set_tracking_uri("http://34.238.247.213:5000")

# 2. Test connection and create/get experiment
try:
    # create_experiment returns the new experiment's ID
    experiment_id = mlflow.create_experiment("churn_prediction_production")
except mlflow.exceptions.MlflowException:
    # Experiment exists, get its ID from the Experiment object
    experiment = mlflow.get_experiment_by_name("churn_prediction_production")
    experiment_id = experiment.experiment_id

# 3. Set active experiment
mlflow.set_experiment("churn_prediction_production")

# 4. setup_mlflow() then returns experiment_id for tracking

load_and_validate_data() Task

@task(name="load_and_validate_data", description="Load data with comprehensive validation")
def load_and_validate_data(data_path: str) -> pd.DataFrame:

What it does:

  • Loads the telco customer churn dataset
  • Performs 8 different data quality checks
  • Cleans and validates the data
  • Saves validation results for audit purposes

Detailed Validation Process:

# Your 8-step validation process:

✅ Step 1: Schema Validation
   - Checks all required columns exist
   - Validates data types match expectations

✅ Step 2: Data Type Fixes
   - Converts TotalCharges from string to numeric
   - Handles non-numeric string values (such as blanks) that should be numbers

✅ Step 3: Missing Value Analysis
   - Counts missing values per column
   - Calculates percentages of missing data

✅ Step 4: Data Quality Warnings
   - Warns if >5% of data is missing in any column
   - Logs high missing value columns

✅ Step 5: Data Cleaning
   - Removes rows with missing values
   - Tracks how much data was lost

✅ Step 6: Target Variable Analysis
   - Analyzes churn distribution (Yes/No ratio)
   - Ensures sufficient samples of each class

✅ Step 7: Statistical Summary
   - Generates descriptive statistics for numerical features
   - Helps identify data distribution issues

✅ Step 8: Outlier Detection
   - Uses IQR method to detect outliers
   - Counts outliers per numerical feature

Validation Results Saved:

{
  "missing_values": {"tenure": 0, "MonthlyCharges": 0, ...},
  "missing_percentage": {"tenure": 0.0, "MonthlyCharges": 0.0, ...},
  "rows_removed": 11,
  "data_loss_percentage": 0.16,
  "target_distribution": {"No": 0.73, "Yes": 0.27},
  "numerical_stats": {...},
  "tenure_outliers": 45,
  "MonthlyCharges_outliers": 23
}
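
The IQR check from Step 8 fits in a few lines. This is a minimal sketch, assuming the common 1.5 x IQR fences and the quartile method built into Python's statistics module; the function name and sample values are made up, and your pipeline may compute quartiles slightly differently (for example with pandas).

```python
import statistics


def count_iqr_outliers(values, k=1.5):
    """Count values outside [Q1 - k*IQR, Q3 + k*IQR]."""
    q1, _, q3 = statistics.quantiles(values, n=4)
    iqr = q3 - q1
    lower, upper = q1 - k * iqr, q3 + k * iqr
    return sum(1 for v in values if v < lower or v > upper)


monthly_charges = [20, 25, 30, 35, 40, 45, 50, 55, 60, 500]  # made-up values
n_outliers = count_iqr_outliers(monthly_charges)
```

Only the 500 falls outside the fences here, which is the kind of count stored under keys like "MonthlyCharges_outliers".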

2. Data Processing Tasks

prepare_data_splits() Task

@task(name="prepare_data_splits", description="Create train/validation/test splits")
def prepare_data_splits(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:

What it does:
Creates three datasets for proper model evaluation:

Split Strategy:

# Your 60/20/20 split approach:

📊 Total Dataset (100%)
├── Training Set (60%)     # For model learning
├── Validation Set (20%)   # For hyperparameter tuning
└── Test Set (20%)         # For final evaluation

# Implementation details:
# 1. First split: 60% train, 40% temp
train_df, temp_df = train_test_split(df, test_size=0.4, stratify=df["Churn"])

# 2. Second split: halve temp into 20% validation and 20% test
val_df, test_df = train_test_split(temp_df, test_size=0.5, stratify=temp_df["Churn"])

Why this approach:

  • Training Set: Large enough for model learning
  • Validation Set: Used for hyperparameter optimization (prevents overfitting)
  • Test Set: Unbiased final evaluation (never seen during training/tuning)
  • Stratification: Maintains the same churn ratio in all splits
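
To see what stratification buys you, here is a small standard-library sketch that splits each class separately so every split keeps roughly the same churn ratio. It is not how train_test_split is implemented; stratified_split and the toy labels are hypothetical, chosen to mirror the 73/27 distribution above.

```python
import random
from collections import defaultdict


def stratified_split(labels, fractions=(0.6, 0.2, 0.2), seed=42):
    """Split sample indices into train/val/test, class by class."""
    rng = random.Random(seed)
    by_class = defaultdict(list)
    for idx, label in enumerate(labels):
        by_class[label].append(idx)

    train, val, test = [], [], []
    for indices in by_class.values():
        rng.shuffle(indices)
        n_train = round(len(indices) * fractions[0])
        n_val = round(len(indices) * fractions[1])
        train += indices[:n_train]
        val += indices[n_train:n_train + n_val]
        test += indices[n_train + n_val:]  # whatever is left over
    return train, val, test


labels = ["No"] * 73 + ["Yes"] * 27
train_idx, val_idx, test_idx = stratified_split(labels)
```

With 100 samples this yields 60/20/20, and the training split holds 16 churners out of 60, close to the original 27%.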

feature_engineering() Task

@task(name="feature_engineering", description="Comprehensive feature engineering")
def feature_engineering(...) -> Tuple[np.ndarray, ...]:

What it does:
Transforms raw customer data into machine learning-ready features.

Feature Processing Pipeline:

# Your feature transformation process:

🏷️ Categorical Features Processing:
├── Input: ["Yes", "No", "DSL", "Fiber optic", ...]
├── One-Hot Encoding: Creates binary columns
├── Drop First: Avoids multicollinearity
└── Output: [0, 1, 0, 1, ...]

📊 Numerical Features Processing:
├── Input: [tenure: 24, MonthlyCharges: 70.5, ...]
├── Min-Max Scaling: Scales to 0-1 range
└── Output: [0.42, 0.71, ...]

🎯 Target Variable Processing:
├── Input: ["Yes", "No"]
├── Label Encoding: Binary conversion
└── Output: [1, 0]

Feature Engineering Steps:

# Detailed breakdown of your implementation:
# (To avoid leakage, fit the encoder and scaler on the training split only,
#  then call transform() on the validation and test splits.)

# 1. Categorical Feature Encoding:
encoder = OneHotEncoder(drop="first", sparse_output=False)
encoded_features = encoder.fit_transform(df[categorical_features])

# 2. Feature Name Generation:
feature_names = encoder.get_feature_names_out(categorical_features)
# Creates names like: "Contract_One year", "PaymentMethod_Credit card"
X_encoded = pd.DataFrame(encoded_features, columns=feature_names, index=df.index)

# 3. Combine Features:
X_combined = pd.concat([X_encoded, df[numerical_features]], axis=1)

# 4. Scale All Features:
scaler = MinMaxScaler(feature_range=(0, 1))
X_scaled = scaler.fit_transform(X_combined)

# 5. Save Preprocessing Objects (for production use):
with open("models/encoder.pkl", "wb") as f:
    pickle.dump(encoder, f)
with open("models/scaler.pkl", "wb") as f:
    pickle.dump(scaler, f)
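
If the two transformations feel opaque, this dependency-free sketch shows the arithmetic behind them. SimpleMinMaxScaler and one_hot_drop_first are hypothetical stand-ins for scikit-learn's MinMaxScaler and OneHotEncoder(drop="first"), and the tenure values are made up. Note how the scaler learns its range from training data and then reuses it on validation data.

```python
class SimpleMinMaxScaler:
    """Hypothetical min-max scaler for one numeric column (assumes max > min)."""

    def fit(self, values):
        self.min_, self.max_ = min(values), max(values)
        return self

    def transform(self, values):
        span = self.max_ - self.min_
        return [(v - self.min_) / span for v in values]


def one_hot_drop_first(values, categories):
    """One-hot encode, omitting the first category (it becomes the all-zero row)."""
    kept = categories[1:]
    return [[1 if v == c else 0 for c in kept] for v in values]


train_tenure, val_tenure = [0, 24, 72], [36, 18]
scaler = SimpleMinMaxScaler().fit(train_tenure)  # learn min/max from training data only
scaled_val = scaler.transform(val_tenure)        # reuse them on validation data

internet = ["DSL", "Fiber optic", "No"]
encoded = one_hot_drop_first(internet, categories=["DSL", "Fiber optic", "No"])
```

Dropping the first category means "DSL" is encoded as [0, 0], which is what removes the redundant column.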

apply_smote() Task

@task(name="apply_smote", description="Apply SMOTE for class balancing")
def apply_smote(X_train: np.ndarray, y_train: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:

What it does:
Balances the dataset to improve model performance on churned customers.

The Imbalance Problem:

# Typical churn dataset imbalance:
Original Distribution:
├── No Churn (0): 73% (5,174 customers)
└── Churn (1):    27% (1,869 customers)

# Problem: Models become biased toward "No Churn"
# Solution: SMOTE creates synthetic churn examples

How SMOTE Works:

# Your SMOTE implementation:

🔍 SMOTE Algorithm Steps:
1. Find churned customers in feature space
2. For each churned customer:
   ├── Find k nearest churned neighbors
   ├── Draw line between customer and neighbor
   └── Create new synthetic customer on that line
3. Repeat until classes are balanced

📊 Result (counts shown for the full dataset; SMOTE itself only sees the training split, so the real counts are smaller):
Balanced Distribution:
├── No Churn (0): 50% (5,174 customers) [original]
└── Churn (1):    50% (5,174 customers) [original + synthetic]

Benefits:

  • Better Learning: Model sees equal examples of both classes
  • Improved Recall: Better at catching actual churners
  • Reduced Bias: More balanced predictions
  • Synthetic Quality: SMOTE interpolates between real churners, so new examples are plausible variations rather than exact duplicates
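
The interpolation step at the heart of SMOTE can be sketched in plain Python. This is a simplified illustration of the idea, not the imbalanced-learn implementation; smote_sketch, the sample points, and the choice of k=2 are all assumptions made for the example.

```python
import math
import random


def smote_sketch(minority, n_new, k=2, seed=0):
    """Create n_new synthetic points by interpolating between minority neighbours."""
    rng = random.Random(seed)
    synthetic = []
    for _ in range(n_new):
        i = rng.randrange(len(minority))
        base = minority[i]
        # k nearest minority neighbours of the chosen point (excluding itself)
        others = [p for j, p in enumerate(minority) if j != i]
        neighbours = sorted(others, key=lambda p: math.dist(base, p))[:k]
        neighbour = rng.choice(neighbours)
        gap = rng.random()  # position along the line segment, in [0, 1)
        synthetic.append(tuple(b + gap * (n - b) for b, n in zip(base, neighbour)))
    return synthetic


churners = [(0.1, 0.9), (0.2, 0.8), (0.3, 0.7), (0.25, 0.95)]  # made-up scaled features
n_majority = 10
new_points = smote_sketch(churners, n_new=n_majority - len(churners))
balanced_minority = churners + new_points
```

Every synthetic point lies on a segment between two real churners, which is why it stays inside the region the real churners occupy.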

3. Model Training Tasks

train_baseline_models() Task

@task(name="train_baseline_models", description="Train baseline models")
def train_baseline_models(...) -> Dict[str, Dict[str, Any]]:

What it does:
Trains simple, interpretable models as performance baselines.

Your Baseline Models:

# Your three baseline models:

🤖 Baseline Model Arsenal:
├── SVC (Support Vector Classifier)
│   ├── Kernel: Linear (simple decision boundary)
│   ├── Strengths: Good for binary classification
│   └── Use Case: Fast, interpretable baseline
│
├── AdaBoost (Adaptive Boosting)
│   ├── Algorithm: Combines weak learners
│   ├── Strengths: Reduces overfitting
│   └── Use Case: Ensemble method baseline
│
└── Logistic Regression
    ├── Algorithm: Linear probability model
    ├── Strengths: Highly interpretable
    └── Use Case: Business-friendly baseline

Training Process:

# Your baseline training implementation:

results = {}
for model_name, model in baseline_models.items():
    with mlflow.start_run(run_name=f"{model_name}_baseline"):
        # 1. Train model
        model.fit(X_train.values, y_train.values)

        # 2. Make predictions
        # (predict_proba requires SVC to be created with probability=True)
        y_pred = model.predict(X_val.values)
        y_pred_proba = model.predict_proba(X_val.values)[:, 1]

        # 3. Calculate metrics
        metrics = {
            "accuracy": accuracy_score(y_val, y_pred),
            "precision": precision_score(y_val, y_pred),
            "recall": recall_score(y_val, y_pred),
            "f1": f1_score(y_val, y_pred),
            "roc_auc": roc_auc_score(y_val, y_pred_proba)
        }

        # 4. Log to MLflow
        mlflow.log_params(model.get_params())
        mlflow.log_metrics(metrics)

        # 5. Store results
        results[model_name] = {
            "model": model,
            "metrics": metrics,
            "predictions": y_pred,
            "probabilities": y_pred_proba
        }
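
For readers who want to see what the metric functions compute, here is a hand-rolled version of the threshold-based ones (ROC AUC is left out for brevity). The function name and the toy labels are made up; in the pipeline itself these values come from scikit-learn.

```python
def classification_metrics(y_true, y_pred):
    """Accuracy, precision, recall and F1 for binary labels (1 = churn)."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)  # churners caught
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)  # false alarms
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)  # churners missed
    tn = sum(1 for t, p in pairs if t == 0 and p == 0)

    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {
        "accuracy": (tp + tn) / len(pairs),
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }


y_val = [1, 1, 1, 1, 0, 0, 0, 0, 0, 0]
y_pred = [1, 1, 1, 0, 1, 0, 0, 0, 0, 0]
metrics = classification_metrics(y_val, y_pred)
```

Here three of four churners are caught (recall 0.75) and three of four churn predictions are right (precision 0.75).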

hyperparameter_optimization() Task

@task(name="hyperparameter_optimization", description="Optimize hyperparameters for a model")
def hyperparameter_optimization(...) -> Dict[str, Any]:

What it does:
Uses Bayesian optimization to find the best hyperparameters for advanced models.

Your Optimization Strategy:

Models Being Optimized:

🎯 Advanced Models for Optimization:
├── RandomForest
│   ├── n_estimators: 100-500 trees
│   ├── max_depth: 3-15 levels
│   ├── min_samples_split: 2-20 samples
│   ├── min_samples_leaf: 1-10 samples
│   ├── max_features: ["sqrt", "log2", None]
│   └── bootstrap: [True, False]
│
├── XGBoost
│   ├── n_estimators: 50-200 boosters
│   ├── max_depth: 3-10 levels
│   ├── learning_rate: 0.01-0.3 (log scale)
│   ├── min_child_weight: 1-10
│   ├── subsample: 0.5-1.0
│   └── colsample_bytree: 0.5-1.0
│
└── LightGBM
    ├── n_estimators: 50-200 boosters
    ├── max_depth: 3-10 levels
    ├── learning_rate: 0.01-0.3 (log scale)
    ├── num_leaves: 20-100 leaves
    ├── min_child_weight: 1-10
    ├── subsample: 0.5-1.0
    └── colsample_bytree: 0.5-1.0

Optimization Algorithm:

# Your Hyperopt implementation:

🧠 Bayesian Optimization Process:
1. Define Search Space
   ├── Continuous parameters: learning_rate, subsample
   ├── Integer parameters: n_estimators, max_depth
   └── Categorical parameters: max_features, bootstrap

2. Objective Function
   def objective(params):
       ├── Create model with params
       ├── Train on training data
       ├── Evaluate on validation data
       ├── Return negative recall (Hyperopt minimizes)
       └── Handle errors gracefully

3. Optimization Loop (max_evals=5 trials)
   ├── Trial 1: Random parameters
   ├── Trials 2-5: Informed by earlier results once TPE's random
   │   warm-up is over (Hyperopt's default warm-up is 20 trials,
   │   so with max_evals=5 every trial is effectively random
   │   unless n_startup_jobs is lowered)
   └── Best trial so far is tracked throughout

4. Return Best Parameters
   └── Parameters that achieved highest recall
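
The "return negative recall" trick is easier to see in a toy setting. The sketch below swaps Hyperopt for a plain random search over a single made-up parameter (a decision threshold) so that it runs with the standard library only; the scores, labels, and search range are all invented for illustration.

```python
import random


def recall(y_true, y_pred):
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 1)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 0)
    return tp / (tp + fn) if tp + fn else 0.0


# Made-up validation scores from some model, with true labels (1 = churn)
val_scores = [0.9, 0.7, 0.4, 0.35, 0.2, 0.1]
val_labels = [1, 1, 1, 0, 0, 0]


def objective(params):
    """Return negative recall so that a minimizer ends up maximizing recall."""
    y_pred = [1 if s >= params["threshold"] else 0 for s in val_scores]
    return -recall(val_labels, y_pred)


rng = random.Random(0)
trials = []
for _ in range(5):  # mirrors max_evals=5
    params = {"threshold": rng.uniform(0.1, 0.9)}
    trials.append((objective(params), params))

best_loss, best_params = min(trials, key=lambda trial: trial[0])
```

Because the search minimizes the loss, the trial with the most negative value is the one with the highest recall.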

Why Recall as Optimization Metric:

  • Business Focus: Missing churners costs more than false alarms
  • Retention Priority: Better to offer retention to loyal customers than lose churners
  • Cost-Benefit: Retention cost < Customer acquisition cost

train_optimized_models() Task

@task(name="train_optimized_models", description="Train models with optimized hyperparameters")
def train_optimized_models(...) -> Dict[str, Dict[str, Any]]:

What it does:
Trains the advanced models using the best hyperparameters found by optimization.

Training Process:

# Your optimized training implementation:

# 🚀 Optimized Model Training:
model_classes = {
    "RandomForest": RandomForestClassifier,
    "XGBoost": XGBClassifier,
    "LightGBM": LGBMClassifier,
}
optimized_results = {}

for model_name, opt_result in optimization_results.items():
    best_params = dict(opt_result["best_params"])

    # 1. Parameter Conversion
    if model_name == "RandomForest":
        # Convert choice indices to actual values
        max_features_options = ["sqrt", "log2", None]
        best_params["max_features"] = max_features_options[int(best_params["max_features"])]

    # Convert float parameters to int (extend this list for other integer parameters)
    for key in ("n_estimators", "max_depth"):
        best_params[key] = int(best_params[key])

    # 2. Model Creation (pick the class that matches model_name)
    model = model_classes[model_name](**best_params, random_state=42)

    # 3. Training & Evaluation
    with mlflow.start_run(run_name=f"{model_name}_optimized"):
        model.fit(X_train.values, y_train.values)
        y_pred = model.predict(X_val.values)
        y_pred_proba = model.predict_proba(X_val.values)[:, 1]

        # 4. Metrics & Logging
        metrics = calculate_all_metrics(y_val, y_pred, y_pred_proba)
        mlflow.log_params(best_params)
        mlflow.log_metrics(metrics)
        mlflow.sklearn.log_model(model, f"{model_name}_model")

    optimized_results[model_name] = {"model": model, "metrics": metrics}
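
The parameter conversion step is worth isolating, because Hyperopt reports categorical choices as list indices and quantized values as floats. Below is a minimal sketch of a reusable helper; convert_hyperopt_params and the raw values are hypothetical, not part of the original pipeline.

```python
def convert_hyperopt_params(best_params, choice_options, int_params):
    """Map choice indices back to values and cast integer-valued floats to int."""
    converted = dict(best_params)  # leave the caller's dict untouched
    for name, options in choice_options.items():
        if name in converted:
            converted[name] = options[int(converted[name])]
    for name in int_params:
        if name in converted:
            converted[name] = int(converted[name])
    return converted


raw = {"n_estimators": 250.0, "max_depth": 7.0, "max_features": 2, "bootstrap": 0}
params = convert_hyperopt_params(
    raw,
    choice_options={
        "max_features": ["sqrt", "log2", None],
        "bootstrap": [True, False],
    },
    int_params=["n_estimators", "max_depth"],
)
```

Keeping the option lists in one place makes it harder for the search space and the conversion step to drift apart.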

cross_validate_models() Task

@task(name="cross_validate_models", description="Perform cross-validation on all models")
def cross_validate_models(...) -> Dict[str, Dict[str, float]]:

What it does:
Provides robust performance estimates using 5-fold stratified cross-validation.

Cross-Validation Process:

# Your cross-validation implementation:

🔄 5-Fold Stratified Cross-Validation:

📊 Fold Creation:
├── Fold 1: 20% of data (maintaining churn ratio)
├── Fold 2: 20% of data (maintaining churn ratio)
├── Fold 3: 20% of data (maintaining churn ratio)
├── Fold 4: 20% of data (maintaining churn ratio)
└── Fold 5: 20% of data (maintaining churn ratio)

🎯 Evaluation Process:
For each model:
├── Train on 4 folds, test on 1 fold (5 times)
├── Calculate metrics for each fold
├── Compute mean and standard deviation
└── Store comprehensive results

📈 Metrics Calculated:
├── accuracy_mean ± accuracy_std
├── precision_mean ± precision_std
├── recall_mean ± recall_std
├── f1_mean ± f1_std
└── roc_auc_mean ± roc_auc_std

Why Cross-Validation:

  • Robust Estimates: Every sample is used for both training and validation across the folds
  • Variance Assessment: Standard deviation shows model stability
  • Overfitting Detection: Consistent performance across folds indicates good generalization
  • Fair Comparison: All models evaluated on same data splits
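
Here is a compact sketch of the two mechanical parts of this process: building folds that preserve the class ratio, and collapsing per-fold scores into the mean and standard deviation used later for selection. The helper names and fold scores are made up, and the population standard deviation is an assumption; your implementation may use scikit-learn's StratifiedKFold and a different std convention.

```python
import statistics
from collections import defaultdict


def stratified_folds(labels, n_folds=5):
    """Assign each sample index to a fold, dealing each class out round-robin."""
    folds = [[] for _ in range(n_folds)]
    by_class = defaultdict(list)
    for idx, label in enumerate(labels):
        by_class[label].append(idx)
    for indices in by_class.values():
        for position, idx in enumerate(indices):
            folds[position % n_folds].append(idx)
    return folds


def summarize(fold_scores, metric="recall"):
    """Collapse per-fold scores into the mean/std pair used for comparison."""
    return {
        f"{metric}_mean": statistics.mean(fold_scores),
        f"{metric}_std": statistics.pstdev(fold_scores),
    }


labels = [0] * 35 + [1] * 15
folds = stratified_folds(labels)
summary = summarize([0.80, 0.78, 0.82, 0.79, 0.81])  # made-up fold recalls
```

Each fold ends up with 7 non-churners and 3 churners, the same 70/30 ratio as the full toy dataset.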

4. Model Selection & Deployment Tasks

select_best_model() Task

@task(name="select_best_model", description="Select the best performing model")
def select_best_model(...) -> Dict[str, Any]:

What it does:
Chooses the best model based on cross-validation recall scores.

Selection Process:

# Your model selection logic:

# 🏆 Best Model Selection:
selection_metric = "recall_mean"  # Primary business metric
best_score = float("-inf")
best_model_name = None

for model_name, cv_metrics in cv_results.items():
    score = cv_metrics.get(selection_metric, 0)
    if score > best_score:
        best_score = score
        best_model_name = model_name

# Result: Model with highest average recall across CV folds

Why Recall as Selection Metric:

  • Business Priority: Catching churners is more important than precision
  • Cost Consideration: Missing a churner costs more than unnecessary retention effort
  • Customer Lifetime Value: Retaining customers has long-term value

register_best_model() Task

@task(name="register_best_model", description="Register best model to MLflow Model Registry")
def register_best_model(...) -> str:

What it does:
Saves the best model to MLflow Model Registry for production deployment.

Registration Process:

# Your model registration implementation:

🚀 Production Model Registration:

1. Final Test Evaluation
   ├── Evaluate on held-out test set
   ├── Calculate final performance metrics
   └── Ensure no data leakage

2. Artifact Preparation
   ├── Save trained model (model.pkl)
   ├── Save feature encoder (encoder.pkl)
   ├── Save feature scaler (scaler.pkl)
   └── Create model metadata

3. Custom PyFunc Model Creation
   class ChurnPredictionModel(PythonModel):
       ├── Loads all preprocessing artifacts
       ├── Handles end-to-end prediction pipeline
       ├── Includes data validation
       └── Returns churn probabilities

4. MLflow Registration
   ├── Log PyFunc model with artifacts
   ├── Register to Model Registry
   ├── Add detailed model description
   └── Return model URI for deployment

5. Model Documentation
   ├── Performance metrics on test set
   ├── Feature importance analysis
   ├── Business impact estimates
   └── Deployment instructions

Custom PyFunc Model Benefits:

  • End-to-End Pipeline: Includes preprocessing and prediction
  • Production Ready: Handles real-world data formats
  • Artifact Management: All dependencies bundled together
  • Versioning: Proper model versioning for rollbacks
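
The end-to-end idea can be illustrated without MLflow. The class below is a hypothetical stand-in that only mirrors the predict(context, model_input) shape of an MLflow PythonModel; the scaling ranges and the scoring rule are invented, and a real ChurnPredictionModel would load the pickled encoder, scaler, and model instead.

```python
class ChurnPipelineSketch:
    """Hypothetical wrapper that bundles preprocessing with a model."""

    def __init__(self, scale, model):
        self.scale = scale  # callable: raw row -> scaled features
        self.model = model  # callable: scaled features -> churn probability

    def predict(self, context, model_input):
        probabilities = []
        for row in model_input:
            # Basic input validation before any preprocessing
            if "tenure" not in row or "MonthlyCharges" not in row:
                raise ValueError(f"missing required fields in {row}")
            probabilities.append(self.model(self.scale(row)))
        return probabilities


def scale(row):
    # Assumed feature ranges, for illustration only
    return [row["tenure"] / 72, row["MonthlyCharges"] / 120]


def toy_model(features):
    tenure, charges = features
    # Made-up rule: short tenure and high charges raise the score
    return round(0.5 * (1 - tenure) + 0.5 * charges, 3)


wrapper = ChurnPipelineSketch(scale, toy_model)
scores = wrapper.predict(
    None,
    [{"tenure": 72, "MonthlyCharges": 0}, {"tenure": 0, "MonthlyCharges": 120}],
)
```

Callers pass raw records and get probabilities back, so the serving code never needs to know how features are scaled.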

🔄 Flow Orchestration

Main Pipeline Flow

@flow(
    name="customer-churn-mlops-pipeline",
    description="Complete MLOps pipeline for customer churn prediction",
    task_runner=ConcurrentTaskRunner(),
)
def churn_mlops_pipeline(
    data_path: str = "data/raw/WA_Fn-UseC_-Telco-Customer-Churn.csv",
    hyperopt_max_evals: int = 5,
):

Flow Execution Steps:

# Your complete pipeline orchestration:

🎬 Pipeline Execution Script:

Step 1: Initialize
├── logger.info("Starting Customer Churn MLOps Pipeline...")
├── experiment_id = setup_mlflow()
└── Set up tracking and logging

Step 2: Data Processing
├── df = load_and_validate_data(data_path)
├── train_df, val_df, test_df = prepare_data_splits(df)
├── X_train, X_val, X_test, y_train, y_val, y_test = feature_engineering(...)
└── X_train_balanced, y_train_balanced = apply_smote(X_train, y_train)

Step 3: Model Training (Parallel Execution)
├── baseline_results = train_baseline_models(...)  # Task A
└── optimization_results = {}                      # Task B starts
    └── for model_name in ["RandomForest", "XGBoost", "LightGBM"]:
        └── optimization_results[model_name] = hyperparameter_optimization(...)

Step 4: Advanced Training
├── optimized_results = train_optimized_models(...)
└── all_models = {**baseline_results, **optimized_results}

Step 5: Evaluation & Selection
├── cv_results = cross_validate_models(all_models, ...)
├── best_model_info = select_best_model(all_models, cv_results, "recall_mean")
└── model_uri = register_best_model(best_model_info, X_test, y_test)

Step 6: Summary & Completion
├── Generate pipeline summary
├── Log final results
└── Return success status with model URI

Concurrent Task Execution

Tasks That Run in Parallel:

# Your concurrent execution strategy:

🔄 Parallel Execution Groups:

Group 1: Data Processing (Sequential - Dependencies)
├── setup_mlflow()
├── load_and_validate_data()
├── prepare_data_splits()
├── feature_engineering()
└── apply_smote()

Group 2: Model Training (Parallel)
├── train_baseline_models()              # Starts immediately
└── hyperparameter_optimization()        # 3 models run concurrently
    ├── RandomForest optimization
    ├── XGBoost optimization
    └── LightGBM optimization

Group 3: Final Training (Sequential - Needs optimization results)
├── train_optimized_models()
├── cross_validate_models()
├── select_best_model()
└── register_best_model()

Performance Benefits:

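  • Roughly 30-50% faster execution with parallel hyperparameter optimization, depending on available CPU cores and how long each search takes
  • Resource efficiency - CPU cores stay busy instead of idling while one model trains
  • Fault tolerance - If one optimization fails, others continue
  • Scalability - Easy to add more models without a proportional increase in total time, as long as spare capacity is available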
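
The fault tolerance point deserves a concrete picture. In this standard-library sketch (again using ThreadPoolExecutor as a stand-in for the task runner), one hypothetical optimization fails on purpose and the other two still deliver results.

```python
from concurrent.futures import ThreadPoolExecutor


def optimize(model_name):
    """Stand-in for one hyperparameter search; one of them fails on purpose."""
    if model_name == "XGBoost":
        raise RuntimeError("simulated optimization failure")
    return {"best_params": {"model": model_name}}


model_names = ["RandomForest", "XGBoost", "LightGBM"]
optimization_results, failures = {}, {}

with ThreadPoolExecutor(max_workers=3) as pool:
    futures = {name: pool.submit(optimize, name) for name in model_names}
    for name, future in futures.items():
        try:
            optimization_results[name] = future.result()
        except RuntimeError as exc:
            failures[name] = str(exc)  # record the failure and keep going
```

The downstream training step can then proceed with whichever models made it through, and the failure is still on record.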

📊 Monitoring Pipeline

Separate Monitoring Flow

# Your monitoring pipeline (monitor_churn_model.py):

@flow(name="churn_model_monitoring")
def churn_monitoring_pipeline():

Monitoring Tasks Breakdown:

load_reference_data() Task

@task
def load_reference_data():
    """Load and prepare reference data from training dataset"""

What it does:

  • Loads the original training dataset as baseline for drift detection
  • Applies same preprocessing as production pipeline
  • Creates reference distribution for comparison

generate_current_data() Task

@task
def generate_current_data():
    """Generate or load current production data"""

What it does:

  • Simulates new incoming customer data
  • In production: Would load from real-time data sources
  • Applies same preprocessing as training pipeline

calculate_drift_metrics() Task

@task
def calculate_drift_metrics(reference_data, current_data):
    """Calculate data drift and model performance metrics"""

What it does:
Uses Evidently AI to calculate:

  • Data Drift: Statistical tests and distance measures (e.g. KS-test, PSI) for feature distribution changes
  • Prediction Drift: Changes in model output distribution
  • Missing Values: Data quality monitoring
  • Model Performance: If ground truth is available

Drift Detection Process:

# Your Evidently monitoring implementation:

📊 Evidently Report Generation:
├── DatasetDriftMetric()          # Overall dataset drift
├── ColumnDriftMetric()           # Individual feature drift
├── DatasetMissingValuesMetric()  # Data quality checks
└── Model performance metrics     # If labels available

🎯 Drift Calculation:
├── Statistical Tests: KS-test, Chi-square, Z-test
├── Thresholds: Configurable drift sensitivity
├── Feature-level: Individual feature drift scores
└── Dataset-level: Overall drift assessment
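
To give a feel for what a drift score measures, here is a hand-written Population Stability Index (PSI), one of the measures mentioned above. This is a sketch, not what Evidently runs internally; the bin edges, the eps floor for empty bins, and the sample values are all assumptions.

```python
import math


def psi(reference, current, bin_edges, eps=1e-6):
    """Population Stability Index between two samples over shared bins."""
    def shares(values):
        counts = [0] * (len(bin_edges) - 1)
        for v in values:
            for i in range(len(counts)):
                last = i == len(counts) - 1
                in_bin = bin_edges[i] <= v < bin_edges[i + 1]
                if in_bin or (last and v == bin_edges[-1]):
                    counts[i] += 1
                    break
        # Floor empty bins at eps so the logarithm stays defined
        return [max(c / len(values), eps) for c in counts]

    ref, cur = shares(reference), shares(current)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref, cur))


edges = [0, 25, 50, 75, 100]
reference = [10, 20, 30, 40, 60, 70, 80, 90]  # made-up training values
shifted = [60, 65, 70, 80, 85, 90, 95, 99]    # made-up production values

same_score = psi(reference, reference, edges)
drift_score = psi(reference, shifted, edges)
```

Identical distributions score exactly zero, while the shifted sample, which has emptied the two lower bins, scores far higher.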

save_monitoring_results() Task

@task
def save_monitoring_results(metrics):
    """Save monitoring results to database"""

What it does:
Saves monitoring metrics to PostgreSQL database for Grafana visualization:

-- Your monitoring table schema:
CREATE TABLE CHURN_ML_METRICS(
    timestamp TIMESTAMP,
    prediction_drift FLOAT,           -- Prediction distribution drift
    num_drifted_columns INTEGER,      -- Number of features with drift
    share_missing_values FLOAT,       -- Data quality metric
    avg_churn_probability FLOAT,      -- Average prediction score
    model_version VARCHAR(10)         -- Model version tracking
);
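As a rough sketch of how one row could be written to that table, the example below uses a parameterized INSERT. To stay runnable without a database server it uses an in-memory SQLite connection as a stand-in for PostgreSQL; with a PostgreSQL driver such as psycopg2 the placeholders would be `%s` rather than `?`. The helper name `insert_metrics` and the sample values are hypothetical.

```python
import sqlite3
from datetime import datetime, timezone

CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS CHURN_ML_METRICS (
    timestamp TIMESTAMP,
    prediction_drift FLOAT,
    num_drifted_columns INTEGER,
    share_missing_values FLOAT,
    avg_churn_probability FLOAT,
    model_version VARCHAR(10)
)
"""

INSERT_SQL = """
INSERT INTO CHURN_ML_METRICS (
    timestamp, prediction_drift, num_drifted_columns,
    share_missing_values, avg_churn_probability, model_version
) VALUES (?, ?, ?, ?, ?, ?)
"""


def insert_metrics(conn: sqlite3.Connection, metrics: dict) -> None:
    """Insert one monitoring row using a parameterized query."""
    row = (
        metrics["timestamp"].isoformat(),
        metrics["prediction_drift"],
        metrics["num_drifted_columns"],
        metrics["share_missing_values"],
        metrics["avg_churn_probability"],
        metrics["model_version"],
    )
    with conn:  # commits on success, rolls back if the insert raises
        conn.execute(INSERT_SQL, row)


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")  # stand-in for the PostgreSQL connection
    conn.execute(CREATE_TABLE_SQL)
    insert_metrics(conn, {
        "timestamp": datetime.now(timezone.utc),
        "prediction_drift": 0.07,        # made-up sample values
        "num_drifted_columns": 2,
        "share_missing_values": 0.01,
        "avg_churn_probability": 0.27,
        "model_version": "v1",
    })
    print(conn.execute("SELECT COUNT(*) FROM CHURN_ML_METRICS").fetchone()[0])
```

Passing values as query parameters instead of formatting them into the SQL string keeps the insert safe from quoting problems and SQL injection.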

Monitoring Pipeline Execution

# Your complete monitoring workflow:

🔍 Monitoring Pipeline Flow:
1. Load reference (training) data
2. Generate/load current (production) data
3. Calculate drift metrics using Evidently
4. Save results to PostgreSQL database
5. Trigger alerts if drift exceeds thresholds
6. Update Grafana dashboards automatically
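Step 5 can be as simple as comparing each saved metric against a limit. The sketch below shows one way to do that; the names `DRIFT_THRESHOLDS` and `find_drift_alerts` and the threshold values are illustrative assumptions, not taken from the pipeline.

```python
from typing import Dict, List

# Hypothetical alert thresholds; tune them to your own data.
DRIFT_THRESHOLDS: Dict[str, float] = {
    "prediction_drift": 0.1,
    "num_drifted_columns": 3,
    "share_missing_values": 0.05,
}


def find_drift_alerts(
    metrics: Dict[str, float],
    thresholds: Dict[str, float] = DRIFT_THRESHOLDS,
) -> List[str]:
    """Return one message per metric that exceeds its threshold."""
    alerts = []
    for name, limit in thresholds.items():
        value = metrics.get(name)  # metrics missing from the input are skipped
        if value is not None and value > limit:
            alerts.append(f"{name}={value} exceeds threshold {limit}")
    return alerts


if __name__ == "__main__":
    # Made-up metrics for one monitoring run.
    current = {
        "prediction_drift": 0.25,
        "num_drifted_columns": 1,
        "share_missing_values": 0.01,
    }
    for message in find_drift_alerts(current):
        print("ALERT:", message)
```

The returned messages could then be forwarded to whatever alerting channel you use, or the same limits could be configured as Grafana alert rules on the metrics table.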

🚀 Execution Guide

Running Your Pipeline

Local Development

# 1. Install dependencies
pip install -r requirements.txt

# 2. Start MLflow server
mlflow server --host 0.0.0.0 --port 5000

# 3. Run the complete pipeline
python services/training/churn_mlops_pipeline.py

# 4. Monitor progress in terminal and MLflow UI
# MLflow UI: http://localhost:5000

Production Deployment

# 1. Start Prefect server
prefect server start

# 2. Create deployment
prefect deployment build services/training/churn_mlops_pipeline.py:churn_mlops_pipeline -n "churn-pipeline"

# 3. Apply deployment
prefect deployment apply churn_mlops_pipeline-deployment.yaml

# 4. Start an agent (Prefect 2.x; later releases replace agents with workers)
prefect agent start --work-queue "default"

# 5. Run deployment
prefect deployment run "churn_mlops_pipeline/churn-pipeline"

Scheduled Execution

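# Your pipeline with scheduling (Prefect 2.x deployment API):
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

# Assumes services/training is on PYTHONPATH so the flow can be imported
from churn_mlops_pipeline import churn_mlops_pipeline

deployment = Deployment.build_from_flow(
    flow=churn_mlops_pipeline,
    name="churn-pipeline-daily",
    schedule=CronSchedule(cron="0 2 * * *"),  # Daily at 2 AM
    work_queue_name="ml-training",
)

# Register the deployment with the Prefect server
deployment.apply()

# An agent must poll this queue: prefect agent start --work-queue "ml-training"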

Monitoring Execution

Prefect UI Dashboard

🖥️ Prefect UI (http://localhost:4200):
├── Flow Runs: Real-time pipeline execution status
├── Task Status: Individual task success/failure
├── Logs: Detailed execution logs
├── Performance: Task execution times
└── Schedules: Upcoming pipeline runs

MLflow Integration

📊 MLflow Tracking (http://localhost:5000):
├── Experiments: All pipeline runs grouped
├── Models: Trained model artifacts and metrics
├── Comparisons: Side-by-side model performance
└── Registry: Production-ready model versions

🛠️ Troubleshooting

Common Issues & Solutions

1. MLflow Connection Issues

Problem:

MLflow server not available: Connection refused

Solution:

# Your error handling in setup_mlflow():
try:
    mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
    client = MlflowClient()
    client.search_experiments()
except Exception as e:
    logger.warning(f"MLflow server not available: {e}")
    logger.info("Continuing without MLflow tracking...")
    return "local_experiment"

Prevention:

  • Ensure MLflow server is running
  • Check network connectivity
  • Verify MLFLOW_TRACKING_URI is correct
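A quick way to check the first two points before starting a run is to test whether the tracking server's port accepts TCP connections. This is a minimal sketch using only the standard library; the helper name `tracking_server_reachable` is hypothetical, and a successful connection only shows that something is listening on that port, not that it is a healthy MLflow server.

```python
import socket
from urllib.parse import urlparse


def tracking_server_reachable(tracking_uri: str, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to the URI's host and port succeeds."""
    parsed = urlparse(tracking_uri)
    host = parsed.hostname or "localhost"
    # Fall back to the scheme's default port when the URI does not name one.
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    uri = "http://localhost:5000"  # the MLflow address used in this guide
    print(f"{uri} reachable: {tracking_server_reachable(uri)}")
```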

2. Memory Issues During Training

Problem:

MemoryError: Unable to allocate array

Solution:

# Reduce dataset size or use sampling:
if len(df) > 50000:
    df = df.sample(n=50000, random_state=42)
    logger.info("Dataset sampled to 50,000 rows for memory efficiency")
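Sampling a DataFrame only helps once the data already fits in memory. If loading the file is what fails, one option is to sample rows while streaming the CSV. The sketch below uses reservoir sampling (Algorithm R) with only the standard library; the function name `sample_csv_rows` and the demo rows are hypothetical and not part of the pipeline.

```python
import csv
import io
import random
from typing import Iterable, List, Tuple


def sample_csv_rows(
    lines: Iterable[str], k: int, seed: int = 42
) -> Tuple[List[str], List[List[str]]]:
    """Return the header and a uniform random sample of up to k data rows."""
    rng = random.Random(seed)
    reader = csv.reader(lines)
    header = next(reader)
    reservoir: List[List[str]] = []
    for i, row in enumerate(reader):
        if i < k:
            reservoir.append(row)      # fill the reservoir with the first k rows
        else:
            j = rng.randint(0, i)      # inclusive on both ends
            if j < k:
                reservoir[j] = row     # keep this row with probability k / (i + 1)
    return header, reservoir


if __name__ == "__main__":
    # Made-up demo data standing in for a large CSV file.
    data = io.StringIO(
        "customerID,tenure\n" + "\n".join(f"c{i},{i % 72}" for i in range(1000))
    )
    header, rows = sample_csv_rows(data, k=5)
    print(header, len(rows))
```

For a real file, pass an open handle (for example `open(path, newline="")`) instead of the `StringIO` object; only `k` rows are held in memory at any time.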

3. Hyperparameter Optimization Failures

Problem:

Optimization failed for XGBoost: Invalid parameter

Solution:

# Your error handling in optimization:
from hyperopt import STATUS_OK
from sklearn.metrics import recall_score

def objective(params):
    try:
        model = get_model_instance(model_name, params.copy())
        model.fit(X_train.values, y_train.values)
        y_pred = model.predict(X_val.values)
        recall = recall_score(y_val, y_pred)
        return {"loss": -recall, "status": STATUS_OK}
    except Exception as e:
        logger.warning(f"Error in optimization trial: {e}")
        # Loss 0 equals recall 0, the worst possible score, so the search continues
        return {"loss": 0, "status": STATUS_OK}

4. Task Failures and Retries

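# Configure task retries:
from prefect import task

@task(
    name="robust_task",
    retries=3,                # Retry up to 3 times after a failure
    retry_delay_seconds=60,   # Wait 60 seconds between attempts
    retry_jitter_factor=0.1   # Randomize the delay to avoid synchronized retries
)
def robust_training_task():
    # Task implementation
    pass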
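To show what these three settings mean in practice, here is a plain-Python sketch of a retry loop with a fixed delay plus random jitter. It is not Prefect's implementation, and Prefect may draw its jitter from a different distribution; the helper `run_with_retries` and its parameters are hypothetical.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(
    func: Callable[[], T],
    retries: int = 3,
    delay_seconds: float = 60.0,
    jitter_factor: float = 0.1,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying up to `retries` times after a failure."""
    for attempt in range(retries + 1):
        try:
            return func()
        except Exception:
            if attempt == retries:
                raise  # out of retries: surface the last error
            # Wait the base delay plus up to jitter_factor * delay extra.
            sleep(delay_seconds * (1 + random.uniform(0, jitter_factor)))
    raise ValueError("retries must be zero or greater")


if __name__ == "__main__":
    attempts = []

    def flaky_step() -> str:
        attempts.append(1)
        if len(attempts) < 3:
            raise RuntimeError("transient failure")
        return "ok"

    # A short delay keeps the demo quick; the task above waits 60 seconds.
    print(run_with_retries(flaky_step, delay_seconds=0.01), "after", len(attempts), "attempts")
```

The jitter spreads retries out so that many tasks failing at the same moment do not all hit a recovering service at once.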

5. Data Validation Failures

Problem:

ValueError: Missing required columns: ['TotalCharges']

Solution:

# Your validation in load_and_validate_data():
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
    logger.error(f"Missing required columns: {missing_columns}")
    logger.info(f"Available columns: {list(df.columns)}")
    raise ValueError(f"Missing required columns: {missing_columns}")

Pipeline Health Monitoring

Key Metrics to Monitor

# Your pipeline health indicators:

📊 Pipeline Health Metrics:
├── Execution Time: Should be consistent (±20%)
├── Model Performance: Recall should stay >0.80
├── Data Quality: <5% missing values
├── Memory Usage: <80% of available RAM
└── Error Rate: <5% task failures

🚨 Alert Conditions:
├── Pipeline fails to complete
├── Best model recall drops >10%
├── Data validation failures
├── MLflow connectivity issues
└── Unusual execution times (+50%)
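Two of these conditions, the recall drop and the unusual execution time, are easy to express as code. The sketch below is one possible reading of them; the function name `check_pipeline_health` and its inputs are hypothetical, and "drops >10%" is interpreted here as a relative drop against a baseline recall.

```python
from statistics import mean
from typing import List, Sequence


def check_pipeline_health(
    recall: float,
    baseline_recall: float,
    duration_seconds: float,
    past_durations: Sequence[float],
) -> List[str]:
    """Return alert messages for the recall and runtime conditions listed above."""
    alerts = []
    if recall < 0.80:
        alerts.append(f"recall {recall:.2f} is below 0.80")
    # Relative drop: more than 10% below the baseline recall.
    if baseline_recall > 0 and (baseline_recall - recall) / baseline_recall > 0.10:
        alerts.append("recall dropped more than 10% from baseline")
    # Runtime: more than 50% above the average of recent runs.
    if past_durations and duration_seconds > 1.5 * mean(past_durations):
        alerts.append("execution time is more than 50% above the recent average")
    return alerts


if __name__ == "__main__":
    # Made-up numbers for one pipeline run.
    for message in check_pipeline_health(0.70, 0.85, 200.0, [100.0, 110.0, 95.0]):
        print("ALERT:", message)
```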

Monitoring Commands

# Check pipeline status
prefect flow-run ls --limit 10

# View specific run logs
prefect flow-run logs <flow-run-id>

# Monitor system resources
htop
nvidia-smi  # If using GPU

# Check MLflow experiments (MLflow 2.x; on 1.x use `mlflow experiments list`)
mlflow experiments search
mlflow runs list --experiment-id 1
