Abdelrahman Adnan

πŸ“Š Monitoring & Observability Tutorial: Your ML Watchdog System

πŸ“‹ What You'll Learn

In this tutorial, you'll discover how to:

  • πŸ‘€ Monitor your ML models in real-time (24/7 watchdog)
  • πŸ“ˆ Track model performance and data drift (quality control)
  • 🚨 Set up automatic alerts for problems (early warning system)
  • πŸ“Š Create dashboards for business insights (mission control)
  • πŸ”§ Detect and fix issues before customers notice (preventive maintenance)

πŸ€” Why Do We Need Monitoring?

Imagine you run a restaurant and need to:

  • Watch the kitchen temperature (system health)
  • Check if ingredients are fresh (data quality)
  • Monitor customer satisfaction (model performance)
  • Get alerts if something goes wrong (immediate notification)
  • Track daily sales and trends (business metrics)

ML monitoring is your digital restaurant manager that watches everything 24/7!

πŸ—οΈ Understanding the Monitoring Ecosystem

Think of monitoring like a security system for your house with different types of sensors:

πŸ‘€ The Watchers (What We Monitor)

🏠 ML System House
β”œβ”€β”€ πŸšͺ Front Door (API Gateway)
β”‚   └── πŸ“Š Request patterns, response times
β”œβ”€β”€ 🍳 Kitchen (Model Serving)
β”‚   └── πŸ“ˆ Prediction accuracy, latency
β”œβ”€β”€ 🧊 Refrigerator (Data Storage)
β”‚   └── πŸ“Š Data quality, freshness
└── πŸ’‘ Electrical System (Infrastructure)
    └── πŸ”‹ CPU, memory, disk usage

🚨 The Alert System (What Triggers Alarms)

🚨 Alert Triggers
β”œβ”€β”€ πŸ”΄ Critical: Model accuracy drops below 80%
β”œβ”€β”€ 🟑 Warning: Response time > 2 seconds
β”œβ”€β”€ πŸ”΅ Info: Daily prediction volume unusual
└── 🟒 Success: All systems healthy
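πŸ’‘ To make these trigger levels concrete, here is a tiny sketch of how metric values might map to a severity label. The thresholds simply mirror the examples above; they are illustrative, not tuned production values.

# A tiny sketch of mapping metric values to alert severity.
# Thresholds mirror the illustrative triggers above; they are not tuned values.
def classify_alert(accuracy: float, response_time_ms: float) -> str:
    if accuracy < 0.80:
        return "πŸ”΄ critical"   # model accuracy dropped below 80%
    if response_time_ms > 2000:
        return "🟑 warning"    # responses slower than 2 seconds
    return "🟒 healthy"        # all systems healthy

print(classify_alert(accuracy=0.92, response_time_ms=450))  # 🟒 healthy
print(classify_alert(accuracy=0.78, response_time_ms=450))  # πŸ”΄ critical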

πŸŽ“ Step-by-Step Tutorial: Building Your Monitoring System

Step 1: Setting Up the Data Collection (Installing Sensors)

What we're doing: Creating sensors to collect information about our ML system.

# src/monitoring/data_collector.py - Your data gathering sensors
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from pathlib import Path
import logging
import json
from typing import Dict, List, Any

class MLMonitoringCollector:
    """
    This is like installing sensors throughout your restaurant to collect data:
    - Temperature sensors (system metrics)
    - Customer feedback forms (prediction feedback)
    - Sales counters (usage metrics)
    - Quality inspectors (model performance)
    """

    def __init__(self, storage_path: str = "monitoring/data/"):
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)
        self.logger = logging.getLogger(__name__)

    def collect_prediction_data(self, 
                              customer_features: Dict, 
                              prediction: float, 
                              actual_outcome: float = None,
                              model_version: str = "1.0"):
        """
        πŸ“Š Collect data about each prediction made

        Like recording each order in your restaurant:
        - What did the customer order? (features)
        - What did we serve? (prediction)
        - Were they satisfied? (actual outcome)
        """

        prediction_record = {
            'timestamp': datetime.now().isoformat(),
            'model_version': model_version,
            'prediction': prediction,
            'actual_outcome': actual_outcome,
            'customer_features': customer_features,
            'response_time_ms': self._calculate_response_time(),
            'request_id': self._generate_request_id()
        }

        # πŸ’Ύ Store the prediction record
        self._store_prediction_record(prediction_record)

        # πŸ“Š Update real-time metrics
        self._update_realtime_metrics(prediction_record)

        self.logger.info(f"πŸ“Š Prediction recorded: {prediction:.3f} (ID: {prediction_record['request_id']})")

        return prediction_record['request_id']

    def collect_system_metrics(self):
        """
        πŸ–₯️ Collect system performance metrics

        Like checking your restaurant's equipment:
        - Are the ovens running at the right temperature?
        - Is the refrigerator cold enough?
        - Are the lights working?
        """
        import psutil

        system_metrics = {
            'timestamp': datetime.now().isoformat(),
            'cpu_usage_percent': psutil.cpu_percent(interval=1),
            'memory_usage_percent': psutil.virtual_memory().percent,
            'disk_usage_percent': psutil.disk_usage('/').percent,
            'available_memory_gb': psutil.virtual_memory().available / (1024**3),
            'network_connections': len(psutil.net_connections()),
            'process_count': len(psutil.pids())
        }

        # πŸ’Ύ Store system metrics
        self._store_system_metrics(system_metrics)

        # 🚨 Check for alerts
        self._check_system_alerts(system_metrics)

        return system_metrics

    def collect_model_performance_metrics(self, y_true: List, y_pred: List, time_window: str = "1h"):
        """
        πŸ“ˆ Calculate and collect model performance metrics

        Like reviewing customer satisfaction surveys to see how you're doing.
        """
        from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score

        try:
            # πŸ“Š Calculate performance metrics
            performance_metrics = {
                'timestamp': datetime.now().isoformat(),
                'time_window': time_window,
                'sample_count': len(y_true),
                'accuracy': accuracy_score(y_true, y_pred),
                'precision': precision_score(y_true, y_pred, average='weighted'),
                'recall': recall_score(y_true, y_pred, average='weighted'),
                'f1_score': f1_score(y_true, y_pred, average='weighted')
            }

            # πŸ“ˆ Add probability-based metrics if available
            if len(y_pred) > 0 and hasattr(y_pred[0], '__len__') and len(y_pred[0]) == 2:  # Probability predictions
                y_pred_proba = [pred[1] for pred in y_pred]
                performance_metrics['auc_roc'] = roc_auc_score(y_true, y_pred_proba)

            # πŸ’Ύ Store performance metrics
            self._store_performance_metrics(performance_metrics)

            # 🚨 Check for performance alerts
            self._check_performance_alerts(performance_metrics)

            self.logger.info(f"πŸ“ˆ Model performance recorded: Accuracy={performance_metrics['accuracy']:.3f}")

            return performance_metrics

        except Exception as e:
            self.logger.error(f"❌ Error calculating performance metrics: {str(e)}")
            return None

    def collect_data_drift_metrics(self, reference_data: pd.DataFrame, current_data: pd.DataFrame):
        """
        πŸ” Detect if incoming data is different from training data

        Like noticing that today's ingredients smell different than usual.
        """

        drift_metrics = {
            'timestamp': datetime.now().isoformat(),
            'reference_period': 'training_data',
            'current_period': 'last_24h',
            'features_analyzed': list(reference_data.columns)
        }

        # πŸ“Š Calculate drift for each feature
        feature_drifts = {}

        for column in reference_data.columns:
            if reference_data[column].dtype in ['int64', 'float64']:
                # πŸ”’ Numerical feature drift
                ref_mean = reference_data[column].mean()
                curr_mean = current_data[column].mean()
                ref_std = reference_data[column].std()
                curr_std = current_data[column].std()

                mean_drift = abs(curr_mean - ref_mean) / (ref_std + 1e-7)
                std_drift = abs(curr_std - ref_std) / (ref_std + 1e-7)

                feature_drifts[column] = {
                    'type': 'numerical',
                    'mean_drift': mean_drift,
                    'std_drift': std_drift,
                    'drift_score': max(mean_drift, std_drift)
                }

            else:
                # 🏷️ Categorical feature drift
                ref_dist = reference_data[column].value_counts(normalize=True)
                curr_dist = current_data[column].value_counts(normalize=True)

                # Calculate KL divergence (simplified)
                kl_div = self._calculate_kl_divergence(ref_dist, curr_dist)

                feature_drifts[column] = {
                    'type': 'categorical',
                    'kl_divergence': kl_div,
                    'drift_score': kl_div
                }

        drift_metrics['feature_drifts'] = feature_drifts
        drift_metrics['overall_drift_score'] = np.mean([f['drift_score'] for f in feature_drifts.values()])

        # πŸ’Ύ Store drift metrics
        self._store_drift_metrics(drift_metrics)

        # 🚨 Check for drift alerts
        self._check_drift_alerts(drift_metrics)

        self.logger.info(f"πŸ” Data drift analysis complete: Overall score={drift_metrics['overall_drift_score']:.3f}")

        return drift_metrics

    def _calculate_kl_divergence(self, dist1, dist2):
        """Calculate simplified KL divergence between two distributions"""
        # Align distributions and add small epsilon to avoid division by zero
        all_categories = set(dist1.index) | set(dist2.index)
        epsilon = 1e-7

        kl_div = 0
        for cat in all_categories:
            p = dist1.get(cat, 0) + epsilon
            q = dist2.get(cat, 0) + epsilon
            kl_div += p * np.log(p / q)

        return kl_div

    def _store_prediction_record(self, record):
        """Store individual prediction records"""
        filename = f"predictions_{datetime.now().strftime('%Y%m%d')}.jsonl"
        filepath = self.storage_path / filename

        with open(filepath, 'a') as f:
            f.write(json.dumps(record) + '\n')

    def _store_system_metrics(self, metrics):
        """Store system performance metrics"""
        filename = f"system_metrics_{datetime.now().strftime('%Y%m%d')}.jsonl"
        filepath = self.storage_path / filename

        with open(filepath, 'a') as f:
            f.write(json.dumps(metrics) + '\n')

    def _store_performance_metrics(self, metrics):
        """Store model performance metrics"""
        filename = f"performance_metrics_{datetime.now().strftime('%Y%m%d')}.jsonl"
        filepath = self.storage_path / filename

        with open(filepath, 'a') as f:
            f.write(json.dumps(metrics) + '\n')

    def _store_drift_metrics(self, metrics):
        """Store data drift metrics"""
        filename = f"drift_metrics_{datetime.now().strftime('%Y%m%d')}.jsonl"
        filepath = self.storage_path / filename

        with open(filepath, 'a') as f:
            f.write(json.dumps(metrics) + '\n')

    def _calculate_response_time(self):
        """Simulate response time calculation"""
        # In real implementation, this would measure actual response time
        return np.random.normal(100, 20)  # Simulated response time in ms

    def _generate_request_id(self):
        """Generate unique request ID"""
        from uuid import uuid4
        return str(uuid4())[:8]

    def _update_realtime_metrics(self, record):
        """Update real-time metrics for dashboards"""
        # In real implementation, this would update a time-series database
        pass

    def _check_system_alerts(self, metrics):
        """Check if system metrics trigger any alerts"""
        alerts = []

        if metrics['cpu_usage_percent'] > 80:
            alerts.append(f"πŸ”΄ HIGH CPU: {metrics['cpu_usage_percent']:.1f}%")

        if metrics['memory_usage_percent'] > 85:
            alerts.append(f"πŸ”΄ HIGH MEMORY: {metrics['memory_usage_percent']:.1f}%")

        if metrics['disk_usage_percent'] > 90:
            alerts.append(f"πŸ”΄ LOW DISK SPACE: {metrics['disk_usage_percent']:.1f}%")

        for alert in alerts:
            self.logger.warning(alert)
            # In real implementation, send alert to monitoring system

    def _check_performance_alerts(self, metrics):
        """Check if performance metrics trigger any alerts"""
        alerts = []

        if metrics['accuracy'] < 0.8:
            alerts.append(f"πŸ”΄ LOW ACCURACY: {metrics['accuracy']:.3f}")

        if metrics['precision'] < 0.7:
            alerts.append(f"🟑 LOW PRECISION: {metrics['precision']:.3f}")

        for alert in alerts:
            self.logger.warning(alert)

    def _check_drift_alerts(self, metrics):
        """Check if data drift triggers any alerts"""
        if metrics['overall_drift_score'] > 0.5:
            alert = f"🟑 DATA DRIFT DETECTED: Score={metrics['overall_drift_score']:.3f}"
            self.logger.warning(alert)

# 🏭 Usage example
def setup_monitoring():
    """Set up monitoring for ML system"""

    collector = MLMonitoringCollector()

    # πŸ“Š Example: Monitor a prediction
    customer_features = {
        'tenure': 24,
        'MonthlyCharges': 75.5,
        'Contract': 'Month-to-month'
    }

    request_id = collector.collect_prediction_data(
        customer_features=customer_features,
        prediction=0.75,
        model_version="2.1"
    )

    # πŸ–₯️ Collect system metrics
    system_health = collector.collect_system_metrics()

    return collector
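The collector only records something when you call it, so in practice you would run it on a schedule. Below is a minimal sketch of a polling loop; the 60-second interval and the bare while-loop are illustrative assumptions (a real deployment would more likely rely on cron or a workflow orchestrator).

# A minimal sketch of running the collector on a fixed interval.
# The 60-second cadence and endless loop are assumptions for illustration.
import time

def run_monitoring_loop(poll_seconds: int = 60):
    collector = MLMonitoringCollector()
    while True:
        collector.collect_system_metrics()  # snapshot CPU, memory, disk
        time.sleep(poll_seconds)            # wait before the next reading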

Step 2: Real-Time Alerting System (Security System)

What we're doing: Setting up an automated alert system that notifies you when something goes wrong.

# src/monitoring/alerting.py - Your intelligent alert system
import smtplib
import requests
import logging
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
from typing import Dict
import json

class AlertManager:
    """
    This is your smart security system that:
    - Watches all the sensors 24/7
    - Knows what's normal vs. concerning
    - Sends the right alerts to the right people
    - Doesn't cry wolf (smart filtering)
    """

    def __init__(self, config_file: str = "monitoring/alert_config.json"):
        self.logger = logging.getLogger(__name__)
        self.config = self._load_alert_config(config_file)
        self.alert_history = []

    def _load_alert_config(self, config_file):
        """Load alerting configuration"""
        default_config = {
            "email": {
                "enabled": True,
                "smtp_server": "smtp.gmail.com",
                "smtp_port": 587,
                "sender_email": "ml-alerts@company.com",
                "recipients": ["team@company.com"]
            },
            "slack": {
                "enabled": True,
                "webhook_url": "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
            },
            "thresholds": {
                "accuracy_critical": 0.75,
                "accuracy_warning": 0.85,
                "response_time_critical": 5000,  # ms
                "response_time_warning": 2000,
                "cpu_critical": 90,
                "memory_critical": 95,
                "drift_warning": 0.3,
                "drift_critical": 0.5
            },
            "alert_cooldown": 300  # 5 minutes between similar alerts
        }

        try:
            with open(config_file, 'r') as f:
                config = json.load(f)
            return {**default_config, **config}
        except FileNotFoundError:
            self.logger.info("Using default alert configuration")
            return default_config

    def send_performance_alert(self, metrics: Dict, severity: str = "warning"):
        """
        🚨 Send alert about model performance issues

        Like calling the manager when customer complaints spike.
        """

        # πŸ” Check if we should send this alert
        if not self._should_send_alert(f"performance_{severity}", metrics):
            return

        # πŸ“ Create alert message
        if severity == "critical":
            emoji = "πŸ”΄"
            priority = "CRITICAL"
        elif severity == "warning":
            emoji = "🟑"
            priority = "WARNING"
        else:
            emoji = "πŸ”΅"
            priority = "INFO"

        subject = f"{emoji} ML Model Performance Alert - {priority}"

        message = f"""
{emoji} **ML Model Performance Alert**

**Severity:** {priority}
**Time:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

**Performance Metrics:**
β€’ Accuracy: {metrics.get('accuracy', float('nan')):.3f}
β€’ Precision: {metrics.get('precision', float('nan')):.3f}
β€’ Recall: {metrics.get('recall', float('nan')):.3f}
β€’ F1-Score: {metrics.get('f1_score', float('nan')):.3f}

**Recommended Actions:**
"""

        if metrics.get('accuracy', 1) < self.config['thresholds']['accuracy_critical']:
            message += "β€’ 🚨 Investigate model accuracy drop immediately\n"
            message += "β€’ πŸ”„ Consider triggering model retraining\n"
            message += "β€’ πŸ“Š Analyze recent prediction patterns\n"
        elif metrics.get('accuracy', 1) < self.config['thresholds']['accuracy_warning']:
            message += "β€’ πŸ“Š Monitor model performance closely\n"
            message += "β€’ πŸ” Review recent prediction quality\n"

        # πŸ“€ Send via configured channels
        self._send_alert_via_channels(subject, message, severity)

        # πŸ“ Log the alert
        self._log_alert("performance", severity, metrics)

    def send_system_alert(self, metrics: Dict, severity: str = "warning"):
        """
        πŸ–₯️ Send alert about system resource issues

        Like calling maintenance when equipment is overheating.
        """

        if not self._should_send_alert(f"system_{severity}", metrics):
            return

        emoji = "πŸ”΄" if severity == "critical" else "🟑"
        priority = severity.upper()

        subject = f"{emoji} System Resource Alert - {priority}"

        message = f"""
{emoji} **System Resource Alert**

**Severity:** {priority}
**Time:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

**System Metrics:**
β€’ CPU Usage: {metrics.get('cpu_usage_percent', float('nan')):.1f}%
β€’ Memory Usage: {metrics.get('memory_usage_percent', float('nan')):.1f}%
β€’ Disk Usage: {metrics.get('disk_usage_percent', float('nan')):.1f}%
β€’ Available Memory: {metrics.get('available_memory_gb', float('nan')):.1f} GB

**Recommended Actions:**
"""

        if metrics.get('cpu_usage_percent', 0) > self.config['thresholds']['cpu_critical']:
            message += "β€’ πŸ”₯ Scale up CPU resources immediately\n"
            message += "β€’ πŸ” Investigate high CPU processes\n"

        if metrics.get('memory_usage_percent', 0) > self.config['thresholds']['memory_critical']:
            message += "β€’ πŸ’Ύ Scale up memory resources\n"
            message += "β€’ 🧹 Check for memory leaks\n"

        self._send_alert_via_channels(subject, message, severity)
        self._log_alert("system", severity, metrics)

    def send_drift_alert(self, drift_metrics: Dict):
        """
        🌊 Send alert about data drift detection

        Like noticing that today's customers are very different than usual.
        """

        drift_score = drift_metrics.get('overall_drift_score', 0)

        if drift_score > self.config['thresholds']['drift_critical']:
            severity = "critical"
            emoji = "πŸ”΄"
        elif drift_score > self.config['thresholds']['drift_warning']:
            severity = "warning"  
            emoji = "🟑"
        else:
            return  # No alert needed

        if not self._should_send_alert(f"drift_{severity}", drift_metrics):
            return

        subject = f"{emoji} Data Drift Alert - {severity.upper()}"

        message = f"""
{emoji} **Data Drift Detection Alert**

**Severity:** {severity.upper()}
**Time:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

**Drift Analysis:**
β€’ Overall Drift Score: {drift_score:.3f}
β€’ Reference Period: {drift_metrics.get('reference_period', 'N/A')}
β€’ Current Period: {drift_metrics.get('current_period', 'N/A')}

**Top Drifting Features:**
"""

        # πŸ“Š Show top drifting features
        feature_drifts = drift_metrics.get('feature_drifts', {})
        sorted_features = sorted(feature_drifts.items(), 
                               key=lambda x: x[1].get('drift_score', 0), 
                               reverse=True)[:5]

        for feature, drift_info in sorted_features:
            score = drift_info.get('drift_score', 0)
            message += f"β€’ {feature}: {score:.3f}\n"

        message += f"""
**Recommended Actions:**
β€’ πŸ” Investigate data source changes
β€’ πŸ“Š Review feature engineering pipeline
β€’ πŸ€– Consider model retraining if drift persists
β€’ πŸ“ž Contact data providers to verify data quality
"""

        self._send_alert_via_channels(subject, message, severity)
        self._log_alert("drift", severity, drift_metrics)

    def _should_send_alert(self, alert_type: str, metrics: Dict) -> bool:
        """
        🧠 Smart alert filtering to avoid spam

        Like a smart doorbell that doesn't ring for every leaf that falls.
        """

        # πŸ• Check cooldown period
        current_time = datetime.now()

        for alert in self.alert_history:
            if (alert['type'] == alert_type and
                (current_time - alert['timestamp']).total_seconds() < self.config['alert_cooldown']):
                self.logger.info(f"Alert {alert_type} in cooldown period, skipping")
                return False

        return True

    def _send_alert_via_channels(self, subject: str, message: str, severity: str):
        """Send alert through all configured channels"""

        # πŸ“§ Email alert
        if self.config['email']['enabled']:
            self._send_email_alert(subject, message)

        # πŸ’¬ Slack alert
        if self.config['slack']['enabled']:
            self._send_slack_alert(subject, message, severity)

    def _send_email_alert(self, subject: str, message: str):
        """Send email alert"""
        try:
            msg = MIMEMultipart()
            msg['From'] = self.config['email']['sender_email']
            msg['To'] = ', '.join(self.config['email']['recipients'])
            msg['Subject'] = subject

            msg.attach(MIMEText(message, 'plain'))

            # Note: In production, use proper email credentials
            self.logger.info(f"πŸ“§ Email alert sent: {subject}")

        except Exception as e:
            self.logger.error(f"❌ Failed to send email alert: {str(e)}")

    def _send_slack_alert(self, subject: str, message: str, severity: str):
        """Send Slack alert"""
        try:
            color = {
                'critical': '#FF0000',
                'warning': '#FFA500', 
                'info': '#0000FF'
            }.get(severity, '#808080')

            payload = {
                'attachments': [{
                    'color': color,
                    'title': subject,
                    'text': message,
                    'ts': int(datetime.now().timestamp())
                }]
            }

            # Note: In production, use actual Slack webhook
            self.logger.info(f"πŸ’¬ Slack alert sent: {subject}")

        except Exception as e:
            self.logger.error(f"❌ Failed to send Slack alert: {str(e)}")

    def _log_alert(self, alert_type: str, severity: str, metrics: Dict):
        """Log alert for history tracking"""
        alert_record = {
            'timestamp': datetime.now(),
            'type': alert_type,
            'severity': severity,
            'metrics': metrics
        }

        self.alert_history.append(alert_record)

        # Keep only last 100 alerts in memory
        if len(self.alert_history) > 100:
            self.alert_history = self.alert_history[-100:]

# 🏭 Usage example
def setup_alerting():
    """Set up alerting system"""

    alert_manager = AlertManager()

    # 🚨 Example: Send performance alert
    poor_performance = {
        'accuracy': 0.72,  # Below critical threshold
        'precision': 0.68,
        'recall': 0.75
    }

    alert_manager.send_performance_alert(poor_performance, severity="critical")

    return alert_manager
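To close the loop, the collector's metrics can be fed straight into the AlertManager instead of only being logged. A minimal sketch, assuming both classes are importable from the module paths used above and that a handful of labeled outcomes is available:

# A minimal sketch of wiring the collector to the alert manager.
# Module paths and the labeled feedback below are assumptions for illustration.
from src.monitoring.data_collector import MLMonitoringCollector
from src.monitoring.alerting import AlertManager

collector = MLMonitoringCollector()
alerts = AlertManager()

# Pretend we just received labeled outcomes for a batch of recent predictions
y_true = [1, 0, 1, 1, 0]
y_pred = [1, 0, 0, 1, 0]
metrics = collector.collect_model_performance_metrics(y_true, y_pred)

# Escalate through email/Slack if accuracy falls below the warning threshold
if metrics and metrics['accuracy'] < alerts.config['thresholds']['accuracy_warning']:
    alerts.send_performance_alert(metrics, severity="warning")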

This monitoring tutorial shows you how to build a comprehensive watchdog system that keeps your ML models healthy and alerts you before problems affect your customers.

### πŸ—οΈ Monitoring Architecture

Three monitoring layers feed a shared metrics, alerting, and dashboard stack:

πŸ” Data Monitoring      β†’ drift detection, schema validation, statistics tracking
πŸ“ˆ Model Monitoring     β†’ accuracy, latency, throughput
πŸ–₯️ System Monitoring    β†’ resources, services, errors
            β”‚
            β–Ό
πŸ“ˆ Metrics Collection   β†’ Prometheus, InfluxDB, custom APIs
🚨 Alerting             β†’ Slack/Email, PagerDuty, webhooks
πŸ“Š Dashboards & Reports β†’ Grafana, Streamlit, automated reports
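Because the metrics layer mentions Prometheus and the dashboard layer mentions Grafana, here is a minimal sketch of exposing the monitoring numbers for scraping with the prometheus_client library. The metric names and the port are assumptions for illustration, not the project's actual definitions.

# A minimal sketch of exposing monitoring metrics to Prometheus.
# Metric names and port 9100 are assumptions for illustration.
from prometheus_client import Counter, Gauge, Histogram, start_http_server

model_accuracy = Gauge('churn_model_accuracy', 'Latest measured model accuracy')
drift_score = Gauge('churn_data_drift_score', 'Overall data drift score')
prediction_total = Counter('churn_predictions_total', 'Number of predictions served')
response_time = Histogram('churn_response_time_ms', 'Prediction response time in ms')

def publish_metrics(performance: dict, drift: dict, response_ms: float):
    """Push the latest monitoring results to the Prometheus registry."""
    model_accuracy.set(performance['accuracy'])
    drift_score.set(drift['overall_drift_score'])
    prediction_total.inc()
    response_time.observe(response_ms)

start_http_server(9100)  # expose /metrics for Prometheus (and Grafana) to scrape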


### πŸ“ Service Structure



services/monitoring/
β”œβ”€β”€ πŸ“„ monitor_churn_model.py        # Main monitoring orchestrator
β”œβ”€β”€ πŸ“„ continuous_monitor.py         # Real-time monitoring
β”œβ”€β”€ πŸ“„ simple_monitor_churn.py       # Basic monitoring script
β”œβ”€β”€ πŸ“„ Dockerfile                    # Container configuration
β”œβ”€β”€ πŸ“„ requirements.txt              # Python dependencies
β”œβ”€β”€ πŸ“‚ data_monitoring/
β”‚   β”œβ”€β”€ πŸ“„ __init__.py
β”‚   β”œβ”€β”€ πŸ“„ drift_detector.py         # Data drift detection
β”‚   β”œβ”€β”€ πŸ“„ quality_checker.py        # Data quality validation
β”‚   └── πŸ“„ schema_validator.py       # Schema validation
β”œβ”€β”€ πŸ“‚ model_monitoring/
β”‚   β”œβ”€β”€ πŸ“„ __init__.py
β”‚   β”œβ”€β”€ πŸ“„ performance_tracker.py    # Model performance tracking
β”‚   β”œβ”€β”€ πŸ“„ accuracy_monitor.py       # Accuracy drift detection
β”‚   └── πŸ“„ prediction_monitor.py     # Prediction quality analysis
β”œβ”€β”€ πŸ“‚ system_monitoring/
β”‚   β”œβ”€β”€ πŸ“„ __init__.py
β”‚   β”œβ”€β”€ πŸ“„ health_checker.py         # System health monitoring
β”‚   β”œβ”€β”€ πŸ“„ resource_monitor.py       # Resource utilization
β”‚   └── πŸ“„ service_monitor.py        # Service availability
β”œβ”€β”€ πŸ“‚ alerting/
β”‚   β”œβ”€β”€ πŸ“„ __init__.py
β”‚   β”œβ”€β”€ πŸ“„ alert_manager.py          # Alert management
β”‚   β”œβ”€β”€ πŸ“„ notification_sender.py    # Notification delivery
β”‚   └── πŸ“„ escalation_rules.py       # Alert escalation logic
β”œβ”€β”€ πŸ“‚ dashboards/
β”‚   β”œβ”€β”€ πŸ“„ grafana_dashboards.py     # Grafana dashboard configs
β”‚   β”œβ”€β”€ πŸ“„ streamlit_monitor.py      # Custom monitoring dashboard
β”‚   └── πŸ“„ report_generator.py       # Automated reports
β”œβ”€β”€ πŸ“‚ metrics/
β”‚   β”œβ”€β”€ πŸ“„ __init__.py
β”‚   β”œβ”€β”€ πŸ“„ prometheus_metrics.py     # Prometheus metric definitions
β”‚   β”œβ”€β”€ πŸ“„ business_metrics.py       # Business KPI tracking
β”‚   └── πŸ“„ custom_metrics.py         # Custom metric collectors
└── πŸ“‚ config/
    β”œβ”€β”€ πŸ“„ monitoring_config.py      # Monitoring configuration
    β”œβ”€β”€ πŸ“„ alert_config.py           # Alert thresholds and rules
    └── πŸ“„ dashboard_config.py       # Dashboard settings


## πŸ” Core Monitoring Implementation

### Main Monitoring Orchestrator



# monitor_churn_model.py - Main monitoring orchestrator

import pandas as pd
import numpy as np
import mlflow
import logging
import time
import json
import requests
from datetime import datetime, timedelta
from typing import Dict, List, Any, Tuple
import joblib
import sqlite3
import os
from dataclasses import dataclass
from sklearn.metrics import accuracy_score, roc_auc_score, precision_score, recall_score
import warnings
warnings.filterwarnings('ignore')

# πŸ“Š Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('logs/monitoring.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

@dataclass
class MonitoringMetrics:
    """πŸ“Š Container for monitoring metrics"""
    timestamp: datetime
    model_version: str
    accuracy: float
    precision: float
    recall: float
    f1_score: float
    roc_auc: float
    prediction_count: int
    error_rate: float
    avg_response_time: float
    data_drift_score: float
    concept_drift_score: float
    business_impact: float

class ChurnModelMonitor:
    """
    πŸ“Š Comprehensive monitoring system for churn prediction model

    This class implements real-time monitoring of model performance,
    data quality, system health, and business metrics.
    """

def __init__(self, config: Dict[str, Any]):
    """
    Initialize the monitoring system

    Args:
        config: Dictionary containing monitoring configuration
    """
    self.config = config
    self.mlflow_uri = config.get('mlflow_tracking_uri', 'http://localhost:5000')
    self.model_name = config.get('model_name', 'churn-prediction')
    self.db_path = config.get('monitoring_db', 'monitoring/churn_monitoring.db')

    # πŸ”§ Setup MLflow
    mlflow.set_tracking_uri(self.mlflow_uri)

    # πŸ—„οΈ Initialize database
    self._setup_monitoring_database()

    # πŸ“Š Load reference data for drift detection
    self.reference_data = self._load_reference_data()

    # πŸ€– Load current production model
    self.current_model = None
    self.model_metadata = {}
    self._load_production_model()

    # πŸ“ˆ Initialize metrics collectors
    self.metrics_history = []

    logger.info("πŸ“Š Churn model monitoring system initialized")

def run_monitoring_cycle(self) -> MonitoringMetrics:
    """
    πŸ”„ Execute a complete monitoring cycle

    Returns:
        MonitoringMetrics: Comprehensive monitoring results
    """

    logger.info("πŸ”„ Starting monitoring cycle...")

    try:
        # 1️⃣ Collect recent predictions
        recent_predictions = self._collect_recent_predictions()

        # 2️⃣ Monitor model performance
        performance_metrics = self._monitor_model_performance(recent_predictions)

        # 3️⃣ Detect data drift
        drift_metrics = self._detect_data_drift(recent_predictions)

        # 4️⃣ Monitor system health
        system_metrics = self._monitor_system_health()

        # 5️⃣ Calculate business impact
        business_metrics = self._calculate_business_impact(recent_predictions)

        # 6️⃣ Combine all metrics
        monitoring_result = MonitoringMetrics(
            timestamp=datetime.now(),
            model_version=self.model_metadata.get('version', 'unknown'),
            accuracy=performance_metrics.get('accuracy', 0.0),
            precision=performance_metrics.get('precision', 0.0),
            recall=performance_metrics.get('recall', 0.0),
            f1_score=performance_metrics.get('f1_score', 0.0),
            roc_auc=performance_metrics.get('roc_auc', 0.0),
            prediction_count=len(recent_predictions),
            error_rate=system_metrics.get('error_rate', 0.0),
            avg_response_time=system_metrics.get('avg_response_time', 0.0),
            data_drift_score=drift_metrics.get('data_drift_score', 0.0),
            concept_drift_score=drift_metrics.get('concept_drift_score', 0.0),
            business_impact=business_metrics.get('revenue_impact', 0.0)
        )

        # 7️⃣ Store metrics
        self._store_monitoring_metrics(monitoring_result)

        # 8️⃣ Check for alerts
        self._check_and_send_alerts(monitoring_result)

        # 9️⃣ Update dashboards
        self._update_monitoring_dashboards(monitoring_result)

        logger.info("βœ… Monitoring cycle completed successfully")
        return monitoring_result

    except Exception as e:
        logger.error(f"❌ Monitoring cycle failed: {str(e)}")
        self._send_critical_alert("Monitoring System Failure", str(e))
        raise

def _collect_recent_predictions(self, hours_back: int = 1) -> pd.DataFrame:
    """πŸ“₯ Collect recent predictions from the production system"""

    logger.info(f"πŸ“₯ Collecting predictions from last {hours_back} hours...")

    try:
        # Connect to monitoring database
        conn = sqlite3.connect(self.db_path)

        # Calculate cutoff time
        cutoff_time = datetime.now() - timedelta(hours=hours_back)

        # Query recent predictions
        query = """
        SELECT 
            prediction_id,
            timestamp,
            model_version,
            input_features,
            prediction_probability,
            prediction_class,
            response_time_ms,
            actual_outcome
        FROM predictions 
        WHERE timestamp > ?
        ORDER BY timestamp DESC
        """

        recent_data = pd.read_sql_query(
            query, 
            conn, 
            params=[cutoff_time.isoformat()]
        )

        conn.close()

        if len(recent_data) == 0:
            logger.warning("⚠️ No recent predictions found")
            return pd.DataFrame()

        # Parse input features JSON
        recent_data['features'] = recent_data['input_features'].apply(
            lambda x: json.loads(x) if x else {}
        )

        logger.info(f"πŸ“Š Collected {len(recent_data)} recent predictions")
        return recent_data

    except Exception as e:
        logger.error(f"❌ Failed to collect recent predictions: {str(e)}")
        return pd.DataFrame()

def _monitor_model_performance(self, predictions_df: pd.DataFrame) -> Dict[str, float]:
    """πŸ“ˆ Monitor current model performance metrics"""

    logger.info("πŸ“ˆ Monitoring model performance...")

    if len(predictions_df) == 0:
        logger.warning("⚠️ No predictions available for performance monitoring")
        return {}

    try:
        # Filter predictions with actual outcomes (for performance calculation)
        labeled_predictions = predictions_df[
            predictions_df['actual_outcome'].notna()
        ].copy()

        if len(labeled_predictions) == 0:
            logger.warning("⚠️ No labeled predictions available for performance calculation")
            return {
                'total_predictions': len(predictions_df),
                'labeled_predictions': 0,
                'avg_prediction_probability': predictions_df['prediction_probability'].mean(),
                'positive_prediction_rate': predictions_df['prediction_class'].mean()
            }

        # Calculate performance metrics
        y_true = labeled_predictions['actual_outcome'].astype(int)
        y_pred = labeled_predictions['prediction_class'].astype(int)
        y_prob = labeled_predictions['prediction_probability']

        # Performance metrics
        metrics = {
            'accuracy': accuracy_score(y_true, y_pred),
            'precision': precision_score(y_true, y_pred, zero_division=0),
            'recall': recall_score(y_true, y_pred, zero_division=0),
            'f1_score': 2 * precision_score(y_true, y_pred, zero_division=0) * recall_score(y_true, y_pred, zero_division=0) / 
                      (precision_score(y_true, y_pred, zero_division=0) + recall_score(y_true, y_pred, zero_division=0) + 1e-8),
            'roc_auc': roc_auc_score(y_true, y_prob) if len(np.unique(y_true)) > 1 else 0.5,
            'total_predictions': len(predictions_df),
            'labeled_predictions': len(labeled_predictions),
            'positive_prediction_rate': y_pred.mean(),
            'avg_prediction_probability': y_prob.mean(),
            'prediction_confidence': y_prob.std()  # Spread of the predicted probabilities
        }

        # Log performance summary
        logger.info(f"πŸ“Š Performance Summary:")
        logger.info(f"   Accuracy: {metrics['accuracy']:.4f}")
        logger.info(f"   ROC-AUC: {metrics['roc_auc']:.4f}")
        logger.info(f"   Precision: {metrics['precision']:.4f}")
        logger.info(f"   Recall: {metrics['recall']:.4f}")
        logger.info(f"   Total Predictions: {metrics['total_predictions']}")

        return metrics

    except Exception as e:
        logger.error(f"❌ Performance monitoring failed: {str(e)}")
        return {}

def _detect_data_drift(self, predictions_df: pd.DataFrame) -> Dict[str, float]:
    """πŸ” Detect data drift in incoming predictions"""

    logger.info("πŸ” Detecting data drift...")

    if len(predictions_df) == 0 or self.reference_data is None:
        logger.warning("⚠️ Insufficient data for drift detection")
        return {'data_drift_score': 0.0, 'concept_drift_score': 0.0}

    try:
        from scipy.stats import ks_2samp, chi2_contingency

        # Extract features from recent predictions
        recent_features = []
        for _, row in predictions_df.iterrows():
            if row['features']:
                recent_features.append(row['features'])

        if not recent_features:
            logger.warning("⚠️ No feature data available for drift detection")
            return {'data_drift_score': 0.0, 'concept_drift_score': 0.0}

        recent_features_df = pd.DataFrame(recent_features)

        # Calculate drift scores for numerical features
        numerical_drift_scores = []
        categorical_drift_scores = []

        for column in recent_features_df.columns:
            if column in self.reference_data.columns:
                recent_values = recent_features_df[column].dropna()
                reference_values = self.reference_data[column].dropna()

                if len(recent_values) == 0 or len(reference_values) == 0:
                    continue

                # For numerical features - use Kolmogorov-Smirnov test
                if pd.api.types.is_numeric_dtype(recent_values):
                    try:
                        ks_stat, p_value = ks_2samp(recent_values, reference_values)
                        numerical_drift_scores.append(ks_stat)
                    except:
                        continue

                # For categorical features - use Chi-square test
                else:
                    try:
                        # Create contingency table
                        recent_counts = recent_values.value_counts()
                        reference_counts = reference_values.value_counts()

                        # Align categories
                        all_categories = set(recent_counts.index) | set(reference_counts.index)
                        recent_aligned = [recent_counts.get(cat, 0) for cat in all_categories]
                        reference_aligned = [reference_counts.get(cat, 0) for cat in all_categories]

                        if sum(recent_aligned) > 0 and sum(reference_aligned) > 0:
                            contingency_table = np.array([recent_aligned, reference_aligned])
                            chi2_stat, p_value, _, _ = chi2_contingency(contingency_table)
                            categorical_drift_scores.append(min(1.0, chi2_stat / 100))  # Normalize
                    except:
                        continue

        # Calculate overall drift scores
        data_drift_score = np.mean(numerical_drift_scores + categorical_drift_scores) if (numerical_drift_scores + categorical_drift_scores) else 0.0

        # Concept drift detection (simplified)
        concept_drift_score = 0.0
        if 'actual_outcome' in predictions_df.columns:
            labeled_data = predictions_df[predictions_df['actual_outcome'].notna()]
            if len(labeled_data) > 10:  # Need sufficient data
                recent_churn_rate = labeled_data['actual_outcome'].mean()
                reference_churn_rate = self.reference_data.get('Churn', pd.Series([0.5])).mean()
                concept_drift_score = abs(recent_churn_rate - reference_churn_rate)

        drift_metrics = {
            'data_drift_score': min(1.0, data_drift_score),
            'concept_drift_score': min(1.0, concept_drift_score),
            'numerical_features_drift': len(numerical_drift_scores),
            'categorical_features_drift': len(categorical_drift_scores),
            'avg_numerical_drift': np.mean(numerical_drift_scores) if numerical_drift_scores else 0.0,
            'avg_categorical_drift': np.mean(categorical_drift_scores) if categorical_drift_scores else 0.0
        }

        logger.info(f"πŸ” Drift Detection Results:")
        logger.info(f"   Data Drift Score: {drift_metrics['data_drift_score']:.4f}")
        logger.info(f"   Concept Drift Score: {drift_metrics['concept_drift_score']:.4f}")

        return drift_metrics

    except Exception as e:
        logger.error(f"❌ Drift detection failed: {str(e)}")
        return {'data_drift_score': 0.0, 'concept_drift_score': 0.0}

### System Health Monitoring

def _monitor_system_health(self) -> Dict[str, float]:
    """πŸ–₯️ Monitor system health and resource utilization"""

    logger.info("πŸ–₯️ Monitoring system health...")

    try:
        import psutil

        # CPU and Memory metrics
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')

        # Network I/O
        network = psutil.net_io_counters()

        # Check service availability
        service_health = self._check_service_availability()

        # Database health
        db_health = self._check_database_health()

        # Calculate error rate from recent logs
        error_rate = self._calculate_error_rate()

        # Average response time from recent requests
        avg_response_time = self._calculate_avg_response_time()

        system_metrics = {
            'cpu_usage_percent': cpu_percent,
            'memory_usage_percent': memory.percent,
            'memory_available_gb': memory.available / (1024**3),
            'disk_usage_percent': disk.percent,
            'disk_free_gb': disk.free / (1024**3),
            'network_bytes_sent': network.bytes_sent,
            'network_bytes_recv': network.bytes_recv,
            'mlflow_service_healthy': service_health.get('mlflow', False),
            'api_service_healthy': service_health.get('api', False),
            'database_healthy': db_health,
            'error_rate': error_rate,
            'avg_response_time': avg_response_time
        }

        logger.info(f"πŸ–₯️ System Health Summary:")
        logger.info(f"   CPU Usage: {cpu_percent:.1f}%")
        logger.info(f"   Memory Usage: {memory.percent:.1f}%")
        logger.info(f"   Disk Usage: {disk.percent:.1f}%")
        logger.info(f"   Error Rate: {error_rate:.4f}")

        return system_metrics

    except Exception as e:
        logger.error(f"❌ System health monitoring failed: {str(e)}")
        return {'error_rate': 1.0, 'avg_response_time': 999.0}
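_monitor_system_health relies on a few small helpers that are not shown in this post (_check_database_health, _calculate_error_rate, _calculate_avg_response_time). Here is a minimal sketch of what two of them might look like against the SQLite tables used elsewhere; treat these as illustrative assumptions rather than the project's actual implementation.

# Minimal sketches of two helpers referenced above; assumptions for illustration.
def _check_database_health(self) -> bool:
    """Return True if the monitoring SQLite database answers a trivial query."""
    try:
        conn = sqlite3.connect(self.db_path)
        conn.execute("SELECT 1")
        conn.close()
        return True
    except sqlite3.Error:
        return False

def _calculate_avg_response_time(self) -> float:
    """Average response time (ms) over the last hour of logged predictions."""
    try:
        conn = sqlite3.connect(self.db_path)
        cutoff = (datetime.now() - timedelta(hours=1)).isoformat()
        row = conn.execute(
            "SELECT AVG(response_time_ms) FROM predictions WHERE timestamp > ?",
            (cutoff,)
        ).fetchone()
        conn.close()
        return float(row[0]) if row and row[0] is not None else 0.0
    except sqlite3.Error:
        return 0.0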

def _check_service_availability(self) -> Dict[str, bool]:
    """πŸ”— Check availability of critical services"""

    services = {
        'mlflow': f"{self.mlflow_uri}/health",
        'api': "http://localhost:8000/health",
        'grafana': "http://localhost:3000/api/health"
    }

    service_status = {}

    for service_name, url in services.items():
        try:
            response = requests.get(url, timeout=5)
            service_status[service_name] = response.status_code == 200
        except:
            service_status[service_name] = False

    return service_status

def _calculate_business_impact(self, predictions_df: pd.DataFrame) -> Dict[str, float]:
    """πŸ’Ό Calculate business impact metrics"""

    logger.info("πŸ’Ό Calculating business impact...")

    if len(predictions_df) == 0:
        return {'revenue_impact': 0.0}

    try:
        # Business assumptions from config
        avg_customer_value = self.config.get('avg_customer_value', 1000)
        retention_cost = self.config.get('retention_cost', 100)

        # Count predictions by risk level
        high_risk_predictions = (predictions_df['prediction_probability'] > 0.7).sum()
        medium_risk_predictions = ((predictions_df['prediction_probability'] > 0.4) &
                                   (predictions_df['prediction_probability'] <= 0.7)).sum()
        low_risk_predictions = (predictions_df['prediction_probability'] <= 0.4).sum()

        # Estimate business impact
        potential_churners_identified = high_risk_predictions + (medium_risk_predictions * 0.5)
        estimated_revenue_at_risk = potential_churners_identified * avg_customer_value
        estimated_retention_costs = high_risk_predictions * retention_cost

        # Calculate ROI (simplified)
        # Assume 70% retention success rate for high-risk interventions
        estimated_customers_saved = high_risk_predictions * 0.7
        estimated_revenue_saved = estimated_customers_saved * avg_customer_value
        roi = (estimated_revenue_saved - estimated_retention_costs) / max(estimated_retention_costs, 1)

        business_metrics = {
            'total_predictions': len(predictions_df),
            'high_risk_customers': high_risk_predictions,
            'medium_risk_customers': medium_risk_predictions,
            'low_risk_customers': low_risk_predictions,
            'potential_churners_identified': potential_churners_identified,
            'estimated_revenue_at_risk': estimated_revenue_at_risk,
            'estimated_retention_costs': estimated_retention_costs,
            'estimated_revenue_saved': estimated_revenue_saved,
            'estimated_roi': roi,
            'revenue_impact': estimated_revenue_saved - estimated_retention_costs
        }

        logger.info(f"πŸ’Ό Business Impact Summary:")
        logger.info(f"   High Risk Customers: {high_risk_predictions}")
        logger.info(f"   Revenue at Risk: ${estimated_revenue_at_risk:,.0f}")
        logger.info(f"   Estimated ROI: {roi:.2f}")

        return business_metrics

    except Exception as e:
        logger.error(f"❌ Business impact calculation failed: {str(e)}")
        return {'revenue_impact': 0.0}

def _store_monitoring_metrics(self, metrics: MonitoringMetrics):
    """πŸ’Ύ Store monitoring metrics in database"""

    try:
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # Insert monitoring metrics
        cursor.execute("""
            INSERT INTO monitoring_metrics (
                timestamp, model_version, accuracy, precision, recall, f1_score,
                roc_auc, prediction_count, error_rate, avg_response_time,
                data_drift_score, concept_drift_score, business_impact
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            metrics.timestamp.isoformat(),
            metrics.model_version,
            metrics.accuracy,
            metrics.precision,
            metrics.recall,
            metrics.f1_score,
            metrics.roc_auc,
            metrics.prediction_count,
            metrics.error_rate,
            metrics.avg_response_time,
            metrics.data_drift_score,
            metrics.concept_drift_score,
            metrics.business_impact
        ))

        conn.commit()
        conn.close()

        logger.info("πŸ’Ύ Monitoring metrics stored successfully")

    except Exception as e:
        logger.error(f"❌ Failed to store monitoring metrics: {str(e)}")
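The constructor also calls _setup_monitoring_database, which is not shown above. Here is a minimal sketch of the table setup that the SELECT and INSERT statements in this post assume; the exact column types are an assumption for illustration.

# A minimal sketch of _setup_monitoring_database; the schema is inferred from
# the queries above and is an assumption, not the project's exact schema.
def _setup_monitoring_database(self):
    """Create the SQLite tables used by the monitoring system if missing."""
    os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
    conn = sqlite3.connect(self.db_path)
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS predictions (
            prediction_id TEXT PRIMARY KEY,
            timestamp TEXT,
            model_version TEXT,
            input_features TEXT,
            prediction_probability REAL,
            prediction_class INTEGER,
            response_time_ms REAL,
            actual_outcome INTEGER
        );
        CREATE TABLE IF NOT EXISTS monitoring_metrics (
            timestamp TEXT,
            model_version TEXT,
            accuracy REAL,
            precision REAL,
            recall REAL,
            f1_score REAL,
            roc_auc REAL,
            prediction_count INTEGER,
            error_rate REAL,
            avg_response_time REAL,
            data_drift_score REAL,
            concept_drift_score REAL,
            business_impact REAL
        );
    """)
    conn.commit()
    conn.close()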


#mlopszoomcamp
This first part covers the core monitoring architecture, performance tracking, and data drift detection. The second part will focus on alerting systems, dashboard integration, and automated reporting capabilities.
