# 📊 Monitoring & Observability Tutorial: Your ML Watchdog System

## 🎯 What You'll Learn

In this tutorial, you'll discover how to:

- 👀 Monitor your ML models in real time (24/7 watchdog)
- 📈 Track model performance and data drift (quality control)
- 🚨 Set up automatic alerts for problems (early warning system)
- 📊 Create dashboards for business insights (mission control)
- 🔧 Detect and fix issues before customers notice (preventive maintenance)

## 🤔 Why Do We Need Monitoring?

Imagine you run a restaurant and need to:

- Watch the kitchen temperature (system health)
- Check if ingredients are fresh (data quality)
- Monitor customer satisfaction (model performance)
- Get alerts if something goes wrong (immediate notification)
- Track daily sales and trends (business metrics)

ML monitoring is your digital restaurant manager that watches everything 24/7!
## 🏗️ Understanding the Monitoring Ecosystem

Think of monitoring as a security system for your house, with different types of sensors:

### 👀 The Watchers (What We Monitor)

```
🏠 ML System House
├── 🚪 Front Door (API Gateway)
│   └── 📊 Request patterns, response times
├── 🍳 Kitchen (Model Serving)
│   └── 📊 Prediction accuracy, latency
├── 🧊 Refrigerator (Data Storage)
│   └── 📊 Data quality, freshness
└── 💡 Electrical System (Infrastructure)
    └── 📊 CPU, memory, disk usage
```

### 🚨 The Alert System (What Triggers Alarms)

```
🚨 Alert Triggers
├── 🔴 Critical: Model accuracy drops below 80%
├── 🟡 Warning: Response time > 2 seconds
├── 🔵 Info: Daily prediction volume unusual
└── 🟢 Success: All systems healthy
```
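To make these tiers concrete, here is a minimal sketch of how raw metric values might map onto the severity levels above. The `classify_severity` function and its thresholds are illustrative, not part of any library:

```python
# Illustrative severity mapping, mirroring the alert-trigger tiers above.
def classify_severity(accuracy: float, response_time_ms: float) -> str:
    if accuracy < 0.80:
        return "critical"  # 🔴 model quality below the floor
    if response_time_ms > 2000:
        return "warning"   # 🟡 degraded, but still serving
    return "healthy"       # 🟢 all clear

print(classify_severity(accuracy=0.78, response_time_ms=450))  # -> critical
```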
## 🚀 Step-by-Step Tutorial: Building Your Monitoring System

### Step 1: Setting Up the Data Collection (Installing Sensors)

**What we're doing:** creating sensors to collect information about our ML system.
```python
# src/monitoring/data_collector.py - Your data-gathering sensors
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from pathlib import Path
import logging
import json
from typing import Dict, List, Any
class MLMonitoringCollector:
"""
This is like installing sensors throughout your restaurant to collect data:
- Temperature sensors (system metrics)
- Customer feedback forms (prediction feedback)
- Sales counters (usage metrics)
- Quality inspectors (model performance)
"""
def __init__(self, storage_path: str = "monitoring/data/"):
self.storage_path = Path(storage_path)
self.storage_path.mkdir(parents=True, exist_ok=True)
self.logger = logging.getLogger(__name__)
def collect_prediction_data(self,
customer_features: Dict,
prediction: float,
actual_outcome: float = None,
model_version: str = "1.0"):
"""
        Collect data about each prediction made.
Like recording each order in your restaurant:
- What did the customer order? (features)
- What did we serve? (prediction)
- Were they satisfied? (actual outcome)
"""
prediction_record = {
'timestamp': datetime.now().isoformat(),
'model_version': model_version,
'prediction': prediction,
'actual_outcome': actual_outcome,
'customer_features': customer_features,
'response_time_ms': self._calculate_response_time(),
'request_id': self._generate_request_id()
}
        # Store the prediction record
self._store_prediction_record(prediction_record)
        # Update real-time metrics
self._update_realtime_metrics(prediction_record)
self.logger.info(f"π Prediction recorded: {prediction:.3f} (ID: {prediction_record['request_id']})")
return prediction_record['request_id']
def collect_system_metrics(self):
"""
        Collect system performance metrics.
Like checking your restaurant's equipment:
- Are the ovens running at the right temperature?
- Is the refrigerator cold enough?
- Are the lights working?
"""
import psutil
system_metrics = {
'timestamp': datetime.now().isoformat(),
'cpu_usage_percent': psutil.cpu_percent(interval=1),
'memory_usage_percent': psutil.virtual_memory().percent,
'disk_usage_percent': psutil.disk_usage('/').percent,
'available_memory_gb': psutil.virtual_memory().available / (1024**3),
'network_connections': len(psutil.net_connections()),
'process_count': len(psutil.pids())
}
        # Store system metrics
self._store_system_metrics(system_metrics)
        # Check for alerts
self._check_system_alerts(system_metrics)
return system_metrics
def collect_model_performance_metrics(self, y_true: List, y_pred: List, time_window: str = "1h"):
"""
        Calculate and collect model performance metrics.
Like reviewing customer satisfaction surveys to see how you're doing.
"""
        from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
try:
            # Calculate performance metrics
performance_metrics = {
'timestamp': datetime.now().isoformat(),
'time_window': time_window,
'sample_count': len(y_true),
'accuracy': accuracy_score(y_true, y_pred),
'precision': precision_score(y_true, y_pred, average='weighted'),
'recall': recall_score(y_true, y_pred, average='weighted'),
'f1_score': f1_score(y_true, y_pred, average='weighted')
}
            # Add probability-based metrics if predictions are (p0, p1) probability pairs
            if len(y_pred) > 0 and hasattr(y_pred[0], '__len__') and len(y_pred[0]) == 2:
                y_pred_proba = [pred[1] for pred in y_pred]
                performance_metrics['auc_roc'] = roc_auc_score(y_true, y_pred_proba)
            # Store performance metrics
            self._store_performance_metrics(performance_metrics)
            # Check for performance alerts
            self._check_performance_alerts(performance_metrics)
            self.logger.info(f"Model performance recorded: Accuracy={performance_metrics['accuracy']:.3f}")
return performance_metrics
except Exception as e:
self.logger.error(f"β Error calculating performance metrics: {str(e)}")
return None
def collect_data_drift_metrics(self, reference_data: pd.DataFrame, current_data: pd.DataFrame):
"""
        Detect whether incoming data differs from the training data.
Like noticing that today's ingredients smell different than usual.
"""
drift_metrics = {
'timestamp': datetime.now().isoformat(),
'reference_period': 'training_data',
'current_period': 'last_24h',
'features_analyzed': list(reference_data.columns)
}
        # Calculate drift for each feature
feature_drifts = {}
for column in reference_data.columns:
if reference_data[column].dtype in ['int64', 'float64']:
                # Numerical feature drift
ref_mean = reference_data[column].mean()
curr_mean = current_data[column].mean()
ref_std = reference_data[column].std()
curr_std = current_data[column].std()
mean_drift = abs(curr_mean - ref_mean) / (ref_std + 1e-7)
std_drift = abs(curr_std - ref_std) / (ref_std + 1e-7)
feature_drifts[column] = {
'type': 'numerical',
'mean_drift': mean_drift,
'std_drift': std_drift,
'drift_score': max(mean_drift, std_drift)
}
else:
                # Categorical feature drift
ref_dist = reference_data[column].value_counts(normalize=True)
curr_dist = current_data[column].value_counts(normalize=True)
# Calculate KL divergence (simplified)
kl_div = self._calculate_kl_divergence(ref_dist, curr_dist)
feature_drifts[column] = {
'type': 'categorical',
'kl_divergence': kl_div,
'drift_score': kl_div
}
drift_metrics['feature_drifts'] = feature_drifts
drift_metrics['overall_drift_score'] = np.mean([f['drift_score'] for f in feature_drifts.values()])
        # Store drift metrics
        self._store_drift_metrics(drift_metrics)
        # Check for drift alerts
        self._check_drift_alerts(drift_metrics)
        self.logger.info(f"Data drift analysis complete: Overall score={drift_metrics['overall_drift_score']:.3f}")
return drift_metrics
def _calculate_kl_divergence(self, dist1, dist2):
"""Calculate simplified KL divergence between two distributions"""
# Align distributions and add small epsilon to avoid division by zero
all_categories = set(dist1.index) | set(dist2.index)
epsilon = 1e-7
kl_div = 0
for cat in all_categories:
p = dist1.get(cat, 0) + epsilon
q = dist2.get(cat, 0) + epsilon
kl_div += p * np.log(p / q)
return kl_div
def _store_prediction_record(self, record):
"""Store individual prediction records"""
filename = f"predictions_{datetime.now().strftime('%Y%m%d')}.jsonl"
filepath = self.storage_path / filename
with open(filepath, 'a') as f:
f.write(json.dumps(record) + '\n')
def _store_system_metrics(self, metrics):
"""Store system performance metrics"""
filename = f"system_metrics_{datetime.now().strftime('%Y%m%d')}.jsonl"
filepath = self.storage_path / filename
with open(filepath, 'a') as f:
f.write(json.dumps(metrics) + '\n')
def _store_performance_metrics(self, metrics):
"""Store model performance metrics"""
filename = f"performance_metrics_{datetime.now().strftime('%Y%m%d')}.jsonl"
filepath = self.storage_path / filename
with open(filepath, 'a') as f:
f.write(json.dumps(metrics) + '\n')
def _store_drift_metrics(self, metrics):
"""Store data drift metrics"""
filename = f"drift_metrics_{datetime.now().strftime('%Y%m%d')}.jsonl"
filepath = self.storage_path / filename
with open(filepath, 'a') as f:
f.write(json.dumps(metrics) + '\n')
def _calculate_response_time(self):
"""Simulate response time calculation"""
# In real implementation, this would measure actual response time
return np.random.normal(100, 20) # Simulated response time in ms
def _generate_request_id(self):
"""Generate unique request ID"""
from uuid import uuid4
return str(uuid4())[:8]
def _update_realtime_metrics(self, record):
"""Update real-time metrics for dashboards"""
# In real implementation, this would update a time-series database
pass
def _check_system_alerts(self, metrics):
"""Check if system metrics trigger any alerts"""
alerts = []
        if metrics['cpu_usage_percent'] > 80:
            alerts.append(f"HIGH CPU: {metrics['cpu_usage_percent']:.1f}%")
        if metrics['memory_usage_percent'] > 85:
            alerts.append(f"HIGH MEMORY: {metrics['memory_usage_percent']:.1f}%")
        if metrics['disk_usage_percent'] > 90:
            alerts.append(f"LOW DISK SPACE: {metrics['disk_usage_percent']:.1f}%")
for alert in alerts:
self.logger.warning(alert)
# In real implementation, send alert to monitoring system
def _check_performance_alerts(self, metrics):
"""Check if performance metrics trigger any alerts"""
alerts = []
        if metrics['accuracy'] < 0.8:
            alerts.append(f"LOW ACCURACY: {metrics['accuracy']:.3f}")
        if metrics['precision'] < 0.7:
            alerts.append(f"LOW PRECISION: {metrics['precision']:.3f}")
for alert in alerts:
self.logger.warning(alert)
def _check_drift_alerts(self, metrics):
"""Check if data drift triggers any alerts"""
        if metrics['overall_drift_score'] > 0.5:
            alert = f"DATA DRIFT DETECTED: Score={metrics['overall_drift_score']:.3f}"
            self.logger.warning(alert)
# Usage example
def setup_monitoring():
"""Set up monitoring for ML system"""
collector = MLMonitoringCollector()
    # Example: monitor a prediction
customer_features = {
'tenure': 24,
'MonthlyCharges': 75.5,
'Contract': 'Month-to-month'
}
request_id = collector.collect_prediction_data(
customer_features=customer_features,
prediction=0.75,
model_version="2.1"
)
    # Collect system metrics
system_health = collector.collect_system_metrics()
    return collector
```
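Before moving on, it helps to see the drift math in action. Below is a self-contained sketch of the KL-divergence calculation used in `_calculate_kl_divergence`, applied to a synthetic shift in a `Contract`-style categorical feature — the distributions are made up for illustration:

```python
# Synthetic check of the KL-divergence drift logic used above.
import numpy as np
import pandas as pd

ref = pd.Series(["Month-to-month"] * 70 + ["Two year"] * 30).value_counts(normalize=True)
cur = pd.Series(["Month-to-month"] * 40 + ["Two year"] * 60).value_counts(normalize=True)

def kl_divergence(p_dist: pd.Series, q_dist: pd.Series, epsilon: float = 1e-7) -> float:
    categories = set(p_dist.index) | set(q_dist.index)
    return sum(
        (p_dist.get(c, 0) + epsilon) * np.log((p_dist.get(c, 0) + epsilon) /
                                              (q_dist.get(c, 0) + epsilon))
        for c in categories
    )

print(f"KL divergence: {kl_divergence(ref, cur):.3f}")  # larger value = more drift
```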
### Step 2: Real-Time Alerting System (Security System)

**What we're doing:** setting up an automated alert system that notifies you when something goes wrong.
```python
# src/monitoring/alerting.py - Your intelligent alert system
import smtplib
import logging
import json
import requests
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
from typing import Dict
class AlertManager:
"""
This is your smart security system that:
- Watches all the sensors 24/7
- Knows what's normal vs. concerning
- Sends the right alerts to the right people
- Doesn't cry wolf (smart filtering)
"""
def __init__(self, config_file: str = "monitoring/alert_config.json"):
self.logger = logging.getLogger(__name__)
self.config = self._load_alert_config(config_file)
self.alert_history = []
def _load_alert_config(self, config_file):
"""Load alerting configuration"""
default_config = {
"email": {
"enabled": True,
"smtp_server": "smtp.gmail.com",
"smtp_port": 587,
"sender_email": "ml-alerts@company.com",
"recipients": ["team@company.com"]
},
"slack": {
"enabled": True,
"webhook_url": "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
},
"thresholds": {
"accuracy_critical": 0.75,
"accuracy_warning": 0.85,
"response_time_critical": 5000, # ms
"response_time_warning": 2000,
"cpu_critical": 90,
"memory_critical": 95,
"drift_warning": 0.3,
"drift_critical": 0.5
},
"alert_cooldown": 300 # 5 minutes between similar alerts
}
try:
with open(config_file, 'r') as f:
config = json.load(f)
return {**default_config, **config}
except FileNotFoundError:
self.logger.info("Using default alert configuration")
return default_config
def send_performance_alert(self, metrics: Dict, severity: str = "warning"):
"""
        Send an alert about model performance issues.
Like calling the manager when customer complaints spike.
"""
        # Check whether we should send this alert (cooldown filtering)
        if not self._should_send_alert(f"performance_{severity}", metrics):
            return
        # Build the alert message
        if severity == "critical":
            emoji = "🔴"
            priority = "CRITICAL"
        elif severity == "warning":
            emoji = "🟡"
            priority = "WARNING"
        else:
            emoji = "🔵"
            priority = "INFO"
subject = f"{emoji} ML Model Performance Alert - {priority}"
message = f"""
{emoji} **ML Model Performance Alert**
**Severity:** {priority}
**Time:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
**Performance Metrics:**
β’ Accuracy: {metrics.get('accuracy', 'N/A'):.3f}
β’ Precision: {metrics.get('precision', 'N/A'):.3f}
β’ Recall: {metrics.get('recall', 'N/A'):.3f}
β’ F1-Score: {metrics.get('f1_score', 'N/A'):.3f}
**Recommended Actions:**
"""
        if metrics.get('accuracy', 1) < self.config['thresholds']['accuracy_critical']:
            message += "• Investigate the model accuracy drop immediately\n"
            message += "• Consider triggering model retraining\n"
            message += "• Analyze recent prediction patterns\n"
        elif metrics.get('accuracy', 1) < self.config['thresholds']['accuracy_warning']:
            message += "• Monitor model performance closely\n"
            message += "• Review recent prediction quality\n"
        # Send via configured channels
        self._send_alert_via_channels(subject, message, severity)
        # Log the alert
        self._log_alert("performance", severity, metrics)
def send_system_alert(self, metrics: Dict, severity: str = "warning"):
"""
        Send an alert about system resource issues.
Like calling maintenance when equipment is overheating.
"""
if not self._should_send_alert(f"system_{severity}", metrics):
return
emoji = "π΄" if severity == "critical" else "π‘"
priority = severity.upper()
subject = f"{emoji} System Resource Alert - {priority}"
        message = f"""
{emoji} **System Resource Alert**

**Severity:** {priority}
**Time:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

**System Metrics:**
• CPU Usage: {metrics.get('cpu_usage_percent', float('nan')):.1f}%
• Memory Usage: {metrics.get('memory_usage_percent', float('nan')):.1f}%
• Disk Usage: {metrics.get('disk_usage_percent', float('nan')):.1f}%
• Available Memory: {metrics.get('available_memory_gb', float('nan')):.1f} GB

**Recommended Actions:**
"""
        if metrics.get('cpu_usage_percent', 0) > self.config['thresholds']['cpu_critical']:
            message += "• Scale up CPU resources immediately\n"
            message += "• Investigate high-CPU processes\n"
        if metrics.get('memory_usage_percent', 0) > self.config['thresholds']['memory_critical']:
            message += "• Scale up memory resources\n"
            message += "• Check for memory leaks\n"
self._send_alert_via_channels(subject, message, severity)
self._log_alert("system", severity, metrics)
def send_drift_alert(self, drift_metrics: Dict):
"""
        Send an alert when data drift is detected.
Like noticing that today's customers are very different than usual.
"""
drift_score = drift_metrics.get('overall_drift_score', 0)
        if drift_score > self.config['thresholds']['drift_critical']:
            severity = "critical"
            emoji = "🔴"
        elif drift_score > self.config['thresholds']['drift_warning']:
            severity = "warning"
            emoji = "🟡"
        else:
            return  # No alert needed
if not self._should_send_alert(f"drift_{severity}", drift_metrics):
return
subject = f"{emoji} Data Drift Alert - {severity.upper()}"
message = f"""
{emoji} **Data Drift Detection Alert**
**Severity:** {severity.upper()}
**Time:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
**Drift Analysis:**
β’ Overall Drift Score: {drift_score:.3f}
β’ Reference Period: {drift_metrics.get('reference_period', 'N/A')}
β’ Current Period: {drift_metrics.get('current_period', 'N/A')}
**Top Drifting Features:**
"""
        # Show the top drifting features
feature_drifts = drift_metrics.get('feature_drifts', {})
sorted_features = sorted(feature_drifts.items(),
key=lambda x: x[1].get('drift_score', 0),
reverse=True)[:5]
for feature, drift_info in sorted_features:
score = drift_info.get('drift_score', 0)
message += f"β’ {feature}: {score:.3f}\n"
message += f"""
**Recommended Actions:**
β’ π Investigate data source changes
β’ π Review feature engineering pipeline
β’ π€ Consider model retraining if drift persists
β’ π Contact data providers to verify data quality
"""
self._send_alert_via_channels(subject, message, severity)
self._log_alert("drift", severity, drift_metrics)
    def _should_send_alert(self, alert_type: str, metrics: Dict) -> bool:
        """
        Smart alert filtering to avoid spam.

        Like a smart doorbell that doesn't ring for every leaf that falls.
        """
        # Check the cooldown period
        current_time = datetime.now()
        for alert in self.alert_history:
            if (alert['type'] == alert_type and
                    (current_time - alert['timestamp']).total_seconds() < self.config['alert_cooldown']):
                self.logger.info(f"Alert {alert_type} in cooldown period, skipping")
                return False
        return True
    def _send_alert_via_channels(self, subject: str, message: str, severity: str):
        """Send alert through all configured channels"""
        # Email alert
        if self.config['email']['enabled']:
            self._send_email_alert(subject, message)
        # Slack alert
        if self.config['slack']['enabled']:
            self._send_slack_alert(subject, message, severity)
    def _send_email_alert(self, subject: str, message: str):
        """Send email alert"""
        try:
            msg = MIMEMultipart()
            msg['From'] = self.config['email']['sender_email']
            msg['To'] = ', '.join(self.config['email']['recipients'])
            msg['Subject'] = subject
            msg.attach(MIMEText(message, 'plain'))
            # Note: in production, connect via smtplib with proper credentials
            self.logger.info(f"Email alert sent: {subject}")
        except Exception as e:
            self.logger.error(f"Failed to send email alert: {str(e)}")
def _send_slack_alert(self, subject: str, message: str, severity: str):
"""Send Slack alert"""
try:
color = {
'critical': '#FF0000',
'warning': '#FFA500',
'info': '#0000FF'
}.get(severity, '#808080')
payload = {
'attachments': [{
'color': color,
'title': subject,
'text': message,
'ts': int(datetime.now().timestamp())
}]
}
            # Note: in production, POST the payload to the actual Slack webhook
            self.logger.info(f"Slack alert sent: {subject}")
        except Exception as e:
            self.logger.error(f"Failed to send Slack alert: {str(e)}")
def _log_alert(self, alert_type: str, severity: str, metrics: Dict):
"""Log alert for history tracking"""
alert_record = {
'timestamp': datetime.now(),
'type': alert_type,
'severity': severity,
'metrics': metrics
}
self.alert_history.append(alert_record)
# Keep only last 100 alerts in memory
if len(self.alert_history) > 100:
self.alert_history = self.alert_history[-100:]
# Usage example
def setup_alerting():
"""Set up alerting system"""
alert_manager = AlertManager()
    # Example: send a performance alert
poor_performance = {
'accuracy': 0.72, # Below critical threshold
'precision': 0.68,
'recall': 0.75
}
alert_manager.send_performance_alert(poor_performance, severity="critical")
    return alert_manager
```
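Note that `_send_slack_alert` above only logs the alert. In production you would POST the payload to the webhook; a minimal sketch, assuming a valid Slack incoming-webhook URL and the `requests` package (the helper name is hypothetical):

```python
# Hedged sketch of the actual webhook delivery step.
import requests

def post_to_slack(webhook_url: str, payload: dict, timeout: int = 5) -> bool:
    """Return True if Slack accepted the message."""
    try:
        response = requests.post(webhook_url, json=payload, timeout=timeout)
        return response.status_code == 200  # Slack answers 200 "ok" on success
    except requests.RequestException:
        return False
```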
This monitoring tutorial shows you how to build a comprehensive watchdog system that keeps your ML models healthy and alerts you before problems affect your customers.
### 🏛️ Monitoring Architecture

```
┌───────────────────────────────────────────────────────────────────┐
│  ┌─────────────────┐   ┌─────────────────┐   ┌────────────────┐   │
│  │ 📥 Data         │   │ 🎯 Model        │   │ 🖥️ System      │   │
│  │   Monitoring    │   │   Monitoring    │   │   Monitoring   │   │
│  │ • Drift Det.    │   │ • Accuracy      │   │ • Resources    │   │
│  │ • Schema Val.   │   │ • Latency       │   │ • Services     │   │
│  │ • Stats Track   │   │ • Throughput    │   │ • Errors       │   │
│  └─────────────────┘   └─────────────────┘   └────────────────┘   │
│           │                     │                     │           │
│           ▼                     ▼                     ▼           │
│  ┌─────────────────┐   ┌─────────────────┐   ┌────────────────┐   │
│  │ 📊 Metrics      │   │ 🚨 Alerting     │   │ 📈 Dashboards  │   │
│  │   Collection    │◄──│   System        │──►│   & Reports    │   │
│  │ • Prometheus    │   │ • Slack/Email   │   │ • Grafana      │   │
│  │ • InfluxDB      │   │ • PagerDuty     │   │ • Streamlit    │   │
│  │ • Custom APIs   │   │ • Webhooks      │   │ • Reports      │   │
│  └─────────────────┘   └─────────────────┘   └────────────────┘   │
└───────────────────────────────────────────────────────────────────┘
```
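In practice, the metrics-collection box usually means exposing counters and gauges for Prometheus to scrape. A minimal sketch using the `prometheus_client` package — the metric names and port are illustrative assumptions:

```python
# Expose custom ML metrics on an HTTP endpoint for Prometheus to scrape.
from prometheus_client import Counter, Gauge, start_http_server

PREDICTIONS_TOTAL = Counter("ml_predictions_total", "Number of predictions served")
MODEL_ACCURACY = Gauge("ml_model_accuracy", "Latest measured model accuracy")
DRIFT_SCORE = Gauge("ml_data_drift_score", "Latest overall data drift score")

start_http_server(9100)     # serves /metrics on port 9100
PREDICTIONS_TOTAL.inc()     # call once per prediction
MODEL_ACCURACY.set(0.91)    # update after each evaluation window
DRIFT_SCORE.set(0.12)
```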
### 📁 Service Structure
```bash
services/monitoring/
├── monitor_churn_model.py          # Main monitoring orchestrator
├── continuous_monitor.py           # Real-time monitoring
├── simple_monitor_churn.py         # Basic monitoring script
├── Dockerfile                      # Container configuration
├── requirements.txt                # Python dependencies
├── data_monitoring/
│   ├── __init__.py
│   ├── drift_detector.py           # Data drift detection
│   ├── quality_checker.py          # Data quality validation
│   └── schema_validator.py         # Schema validation
├── model_monitoring/
│   ├── __init__.py
│   ├── performance_tracker.py      # Model performance tracking
│   ├── accuracy_monitor.py         # Accuracy drift detection
│   └── prediction_monitor.py       # Prediction quality analysis
├── system_monitoring/
│   ├── __init__.py
│   ├── health_checker.py           # System health monitoring
│   ├── resource_monitor.py         # Resource utilization
│   └── service_monitor.py          # Service availability
├── alerting/
│   ├── __init__.py
│   ├── alert_manager.py            # Alert management
│   ├── notification_sender.py      # Notification delivery
│   └── escalation_rules.py         # Alert escalation logic
├── dashboards/
│   ├── grafana_dashboards.py       # Grafana dashboard configs
│   ├── streamlit_monitor.py        # Custom monitoring dashboard
│   └── report_generator.py         # Automated reports
├── metrics/
│   ├── __init__.py
│   ├── prometheus_metrics.py       # Prometheus metric definitions
│   ├── business_metrics.py         # Business KPI tracking
│   └── custom_metrics.py           # Custom metric collectors
└── config/
    ├── monitoring_config.py        # Monitoring configuration
    ├── alert_config.py             # Alert thresholds and rules
    └── dashboard_config.py         # Dashboard settings
```
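The `continuous_monitor.py` entry above implies a long-running process. A hedged sketch of what such a loop might look like, assuming the `ChurnModelMonitor` class shown below and an illustrative 15-minute interval:

```python
# Minimal continuous-monitoring loop sketch; the interval is an assumption.
import time

def run_continuous_monitoring(monitor, interval_seconds: int = 900):
    """Run monitoring cycles forever, surviving transient failures."""
    while True:
        try:
            monitor.run_monitoring_cycle()
        except Exception as e:
            print(f"Monitoring cycle failed, will retry next interval: {e}")
        time.sleep(interval_seconds)
```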
## 🔍 Core Monitoring Implementation
### Main Monitoring Orchestrator
```python
# monitor_churn_model.py
import pandas as pd
import numpy as np
import mlflow
import logging
import time
import json
import requests
from datetime import datetime, timedelta
from typing import Dict, List, Any, Tuple
import joblib
import sqlite3
import os
from dataclasses import dataclass
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score
import warnings
warnings.filterwarnings('ignore')
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('logs/monitoring.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
@dataclass
class MonitoringMetrics:
"""π Container for monitoring metrics"""
timestamp: datetime
model_version: str
accuracy: float
precision: float
recall: float
f1_score: float
roc_auc: float
prediction_count: int
error_rate: float
avg_response_time: float
data_drift_score: float
concept_drift_score: float
business_impact: float
class ChurnModelMonitor:
"""
    Comprehensive monitoring system for the churn prediction model.
This class implements real-time monitoring of model performance,
data quality, system health, and business metrics.
"""
def __init__(self, config: Dict[str, Any]):
"""
Initialize the monitoring system
Args:
config: Dictionary containing monitoring configuration
"""
self.config = config
self.mlflow_uri = config.get('mlflow_tracking_uri', 'http://localhost:5000')
self.model_name = config.get('model_name', 'churn-prediction')
self.db_path = config.get('monitoring_db', 'monitoring/churn_monitoring.db')
        # Set up MLflow
mlflow.set_tracking_uri(self.mlflow_uri)
        # Initialize the monitoring database
self._setup_monitoring_database()
        # Load reference data for drift detection
self.reference_data = self._load_reference_data()
        # Load the current production model
self.current_model = None
self.model_metadata = {}
self._load_production_model()
        # Initialize metrics collectors
self.metrics_history = []
logger.info("π Churn model monitoring system initialized")
def run_monitoring_cycle(self) -> MonitoringMetrics:
"""
        Execute a complete monitoring cycle.
Returns:
MonitoringMetrics: Comprehensive monitoring results
"""
logger.info("π Starting monitoring cycle...")
try:
            # 1. Collect recent predictions
recent_predictions = self._collect_recent_predictions()
            # 2. Monitor model performance
performance_metrics = self._monitor_model_performance(recent_predictions)
            # 3. Detect data drift
drift_metrics = self._detect_data_drift(recent_predictions)
            # 4. Monitor system health
system_metrics = self._monitor_system_health()
            # 5. Calculate business impact
business_metrics = self._calculate_business_impact(recent_predictions)
            # 6. Combine all metrics
monitoring_result = MonitoringMetrics(
timestamp=datetime.now(),
model_version=self.model_metadata.get('version', 'unknown'),
accuracy=performance_metrics.get('accuracy', 0.0),
precision=performance_metrics.get('precision', 0.0),
recall=performance_metrics.get('recall', 0.0),
f1_score=performance_metrics.get('f1_score', 0.0),
roc_auc=performance_metrics.get('roc_auc', 0.0),
prediction_count=len(recent_predictions),
error_rate=system_metrics.get('error_rate', 0.0),
avg_response_time=system_metrics.get('avg_response_time', 0.0),
data_drift_score=drift_metrics.get('data_drift_score', 0.0),
concept_drift_score=drift_metrics.get('concept_drift_score', 0.0),
business_impact=business_metrics.get('revenue_impact', 0.0)
)
            # 7. Store metrics
self._store_monitoring_metrics(monitoring_result)
            # 8. Check for alerts
self._check_and_send_alerts(monitoring_result)
            # 9. Update dashboards
self._update_monitoring_dashboards(monitoring_result)
logger.info("β
Monitoring cycle completed successfully")
return monitoring_result
except Exception as e:
logger.error(f"β Monitoring cycle failed: {str(e)}")
self._send_critical_alert("Monitoring System Failure", str(e))
raise
def _collect_recent_predictions(self, hours_back: int = 1) -> pd.DataFrame:
"""π₯ Collect recent predictions from the production system"""
logger.info(f"π₯ Collecting predictions from last {hours_back} hours...")
try:
# Connect to monitoring database
conn = sqlite3.connect(self.db_path)
# Calculate cutoff time
cutoff_time = datetime.now() - timedelta(hours=hours_back)
# Query recent predictions
query = """
SELECT
prediction_id,
timestamp,
model_version,
input_features,
prediction_probability,
prediction_class,
response_time_ms,
actual_outcome
FROM predictions
WHERE timestamp > ?
ORDER BY timestamp DESC
"""
recent_data = pd.read_sql_query(
query,
conn,
params=[cutoff_time.isoformat()]
)
conn.close()
if len(recent_data) == 0:
logger.warning("β οΈ No recent predictions found")
return pd.DataFrame()
# Parse input features JSON
recent_data['features'] = recent_data['input_features'].apply(
lambda x: json.loads(x) if x else {}
)
logger.info(f"π Collected {len(recent_data)} recent predictions")
return recent_data
except Exception as e:
logger.error(f"β Failed to collect recent predictions: {str(e)}")
return pd.DataFrame()
def _monitor_model_performance(self, predictions_df: pd.DataFrame) -> Dict[str, float]:
"""π Monitor current model performance metrics"""
logger.info("π Monitoring model performance...")
if len(predictions_df) == 0:
logger.warning("β οΈ No predictions available for performance monitoring")
return {}
try:
# Filter predictions with actual outcomes (for performance calculation)
labeled_predictions = predictions_df[
predictions_df['actual_outcome'].notna()
].copy()
if len(labeled_predictions) == 0:
logger.warning("β οΈ No labeled predictions available for performance calculation")
return {
'total_predictions': len(predictions_df),
'labeled_predictions': 0,
'avg_prediction_probability': predictions_df['prediction_probability'].mean(),
'positive_prediction_rate': predictions_df['prediction_class'].mean()
}
# Calculate performance metrics
y_true = labeled_predictions['actual_outcome'].astype(int)
y_pred = labeled_predictions['prediction_class'].astype(int)
y_prob = labeled_predictions['prediction_probability']
# Performance metrics
metrics = {
'accuracy': accuracy_score(y_true, y_pred),
'precision': precision_score(y_true, y_pred, zero_division=0),
'recall': recall_score(y_true, y_pred, zero_division=0),
                'f1_score': f1_score(y_true, y_pred, zero_division=0),
'roc_auc': roc_auc_score(y_true, y_prob) if len(np.unique(y_true)) > 1 else 0.5,
'total_predictions': len(predictions_df),
'labeled_predictions': len(labeled_predictions),
'positive_prediction_rate': y_pred.mean(),
'avg_prediction_probability': y_prob.mean(),
                'prediction_confidence': y_prob.std()  # spread of the predicted probabilities
}
# Log performance summary
logger.info(f"π Performance Summary:")
logger.info(f" Accuracy: {metrics['accuracy']:.4f}")
logger.info(f" ROC-AUC: {metrics['roc_auc']:.4f}")
logger.info(f" Precision: {metrics['precision']:.4f}")
logger.info(f" Recall: {metrics['recall']:.4f}")
logger.info(f" Total Predictions: {metrics['total_predictions']}")
return metrics
except Exception as e:
logger.error(f"β Performance monitoring failed: {str(e)}")
return {}
def _detect_data_drift(self, predictions_df: pd.DataFrame) -> Dict[str, float]:
"""π Detect data drift in incoming predictions"""
logger.info("π Detecting data drift...")
if len(predictions_df) == 0 or self.reference_data is None:
logger.warning("β οΈ Insufficient data for drift detection")
return {'data_drift_score': 0.0, 'concept_drift_score': 0.0}
try:
from scipy.stats import ks_2samp, chi2_contingency
# Extract features from recent predictions
recent_features = []
for _, row in predictions_df.iterrows():
if row['features']:
recent_features.append(row['features'])
if not recent_features:
logger.warning("β οΈ No feature data available for drift detection")
return {'data_drift_score': 0.0, 'concept_drift_score': 0.0}
recent_features_df = pd.DataFrame(recent_features)
# Calculate drift scores for numerical features
numerical_drift_scores = []
categorical_drift_scores = []
for column in recent_features_df.columns:
if column in self.reference_data.columns:
recent_values = recent_features_df[column].dropna()
reference_values = self.reference_data[column].dropna()
if len(recent_values) == 0 or len(reference_values) == 0:
continue
# For numerical features - use Kolmogorov-Smirnov test
if pd.api.types.is_numeric_dtype(recent_values):
try:
ks_stat, p_value = ks_2samp(recent_values, reference_values)
numerical_drift_scores.append(ks_stat)
                    except Exception:
                        continue
# For categorical features - use Chi-square test
else:
try:
# Create contingency table
recent_counts = recent_values.value_counts()
reference_counts = reference_values.value_counts()
# Align categories
all_categories = set(recent_counts.index) | set(reference_counts.index)
recent_aligned = [recent_counts.get(cat, 0) for cat in all_categories]
reference_aligned = [reference_counts.get(cat, 0) for cat in all_categories]
if sum(recent_aligned) > 0 and sum(reference_aligned) > 0:
contingency_table = np.array([recent_aligned, reference_aligned])
chi2_stat, p_value, _, _ = chi2_contingency(contingency_table)
categorical_drift_scores.append(min(1.0, chi2_stat / 100)) # Normalize
                    except Exception:
                        continue
# Calculate overall drift scores
data_drift_score = np.mean(numerical_drift_scores + categorical_drift_scores) if (numerical_drift_scores + categorical_drift_scores) else 0.0
# Concept drift detection (simplified)
concept_drift_score = 0.0
if 'actual_outcome' in predictions_df.columns:
labeled_data = predictions_df[predictions_df['actual_outcome'].notna()]
if len(labeled_data) > 10: # Need sufficient data
recent_churn_rate = labeled_data['actual_outcome'].mean()
reference_churn_rate = self.reference_data.get('Churn', pd.Series([0.5])).mean()
concept_drift_score = abs(recent_churn_rate - reference_churn_rate)
drift_metrics = {
'data_drift_score': min(1.0, data_drift_score),
'concept_drift_score': min(1.0, concept_drift_score),
'numerical_features_drift': len(numerical_drift_scores),
'categorical_features_drift': len(categorical_drift_scores),
'avg_numerical_drift': np.mean(numerical_drift_scores) if numerical_drift_scores else 0.0,
'avg_categorical_drift': np.mean(categorical_drift_scores) if categorical_drift_scores else 0.0
}
logger.info(f"π Drift Detection Results:")
logger.info(f" Data Drift Score: {drift_metrics['data_drift_score']:.4f}")
logger.info(f" Concept Drift Score: {drift_metrics['concept_drift_score']:.4f}")
return drift_metrics
except Exception as e:
logger.error(f"β Drift detection failed: {str(e)}")
            return {'data_drift_score': 0.0, 'concept_drift_score': 0.0}
```
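To build intuition for the Kolmogorov-Smirnov statistic used above: it stays near zero when two samples come from the same distribution and grows toward one as they diverge. A small synthetic experiment (the numbers in the comments are approximate):

```python
# Synthetic demonstration of KS-based drift scoring.
import numpy as np
from scipy.stats import ks_2samp

rng = np.random.default_rng(42)
reference = rng.normal(loc=70, scale=10, size=1000)  # e.g. MonthlyCharges at training time
same_dist = rng.normal(loc=70, scale=10, size=1000)  # no drift
shifted = rng.normal(loc=85, scale=10, size=1000)    # mean drifted upward

print(ks_2samp(reference, same_dist).statistic)  # small, roughly 0.02-0.05
print(ks_2samp(reference, shifted).statistic)    # large, roughly 0.5
```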
### System Health Monitoring
```python
def _monitor_system_health(self) -> Dict[str, float]:
"""π₯οΈ Monitor system health and resource utilization"""
logger.info("π₯οΈ Monitoring system health...")
try:
import psutil
# CPU and Memory metrics
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
# Network I/O
network = psutil.net_io_counters()
# Check service availability
service_health = self._check_service_availability()
# Database health
db_health = self._check_database_health()
# Calculate error rate from recent logs
error_rate = self._calculate_error_rate()
# Average response time from recent requests
avg_response_time = self._calculate_avg_response_time()
system_metrics = {
'cpu_usage_percent': cpu_percent,
'memory_usage_percent': memory.percent,
'memory_available_gb': memory.available / (1024**3),
'disk_usage_percent': disk.percent,
'disk_free_gb': disk.free / (1024**3),
'network_bytes_sent': network.bytes_sent,
'network_bytes_recv': network.bytes_recv,
'mlflow_service_healthy': service_health.get('mlflow', False),
'api_service_healthy': service_health.get('api', False),
'database_healthy': db_health,
'error_rate': error_rate,
'avg_response_time': avg_response_time
}
logger.info(f"π₯οΈ System Health Summary:")
logger.info(f" CPU Usage: {cpu_percent:.1f}%")
logger.info(f" Memory Usage: {memory.percent:.1f}%")
logger.info(f" Disk Usage: {disk.percent:.1f}%")
logger.info(f" Error Rate: {error_rate:.4f}")
return system_metrics
except Exception as e:
logger.error(f"β System health monitoring failed: {str(e)}")
return {'error_rate': 1.0, 'avg_response_time': 999.0}
def _check_service_availability(self) -> Dict[str, bool]:
"""π Check availability of critical services"""
services = {
'mlflow': f"{self.mlflow_uri}/health",
'api': "http://localhost:8000/health",
'grafana': "http://localhost:3000/api/health"
}
service_status = {}
for service_name, url in services.items():
try:
response = requests.get(url, timeout=5)
service_status[service_name] = response.status_code == 200
            except requests.RequestException:
                service_status[service_name] = False
return service_status
def _calculate_business_impact(self, predictions_df: pd.DataFrame) -> Dict[str, float]:
"""πΌ Calculate business impact metrics"""
logger.info("πΌ Calculating business impact...")
if len(predictions_df) == 0:
return {'revenue_impact': 0.0}
try:
# Business assumptions from config
avg_customer_value = self.config.get('avg_customer_value', 1000)
retention_cost = self.config.get('retention_cost', 100)
# Count predictions by risk level
high_risk_predictions = (predictions_df['prediction_probability'] > 0.7).sum()
medium_risk_predictions = ((predictions_df['prediction_probability'] > 0.4) &
(predictions_df['prediction_probability'] <= 0.7)).sum()
low_risk_predictions = (predictions_df['prediction_probability'] <= 0.4).sum()
# Estimate business impact
potential_churners_identified = high_risk_predictions + (medium_risk_predictions * 0.5)
estimated_revenue_at_risk = potential_churners_identified * avg_customer_value
estimated_retention_costs = high_risk_predictions * retention_cost
# Calculate ROI (simplified)
# Assume 70% retention success rate for high-risk interventions
estimated_customers_saved = high_risk_predictions * 0.7
estimated_revenue_saved = estimated_customers_saved * avg_customer_value
roi = (estimated_revenue_saved - estimated_retention_costs) / max(estimated_retention_costs, 1)
business_metrics = {
'total_predictions': len(predictions_df),
'high_risk_customers': high_risk_predictions,
'medium_risk_customers': medium_risk_predictions,
'low_risk_customers': low_risk_predictions,
'potential_churners_identified': potential_churners_identified,
'estimated_revenue_at_risk': estimated_revenue_at_risk,
'estimated_retention_costs': estimated_retention_costs,
'estimated_revenue_saved': estimated_revenue_saved,
'estimated_roi': roi,
'revenue_impact': estimated_revenue_saved - estimated_retention_costs
}
logger.info(f"πΌ Business Impact Summary:")
logger.info(f" High Risk Customers: {high_risk_predictions}")
logger.info(f" Revenue at Risk: ${estimated_revenue_at_risk:,.0f}")
logger.info(f" Estimated ROI: {roi:.2f}")
return business_metrics
except Exception as e:
logger.error(f"β Business impact calculation failed: {str(e)}")
return {'revenue_impact': 0.0}
def _store_monitoring_metrics(self, metrics: MonitoringMetrics):
"""πΎ Store monitoring metrics in database"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Insert monitoring metrics
cursor.execute("""
INSERT INTO monitoring_metrics (
timestamp, model_version, accuracy, precision, recall, f1_score,
roc_auc, prediction_count, error_rate, avg_response_time,
data_drift_score, concept_drift_score, business_impact
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
metrics.timestamp.isoformat(),
metrics.model_version,
metrics.accuracy,
metrics.precision,
metrics.recall,
metrics.f1_score,
metrics.roc_auc,
metrics.prediction_count,
metrics.error_rate,
metrics.avg_response_time,
metrics.data_drift_score,
metrics.concept_drift_score,
metrics.business_impact
))
conn.commit()
conn.close()
logger.info("πΎ Monitoring metrics stored successfully")
except Exception as e:
logger.error(f"β Failed to store monitoring metrics: {str(e)}")
#mlopszoomcamp
This first part covers the core monitoring architecture, performance tracking, and data drift detection. The second part will focus on alerting systems, dashboard integration, and automated reporting capabilities.