📊 Dashboards & Advanced Monitoring Tutorial: Your ML Mission Control Center
📋 What You'll Learn
In this tutorial, you'll discover how to:
- 📊 Build interactive dashboards for ML monitoring (mission control center)
- 📈 Create business intelligence reports (executive summaries)
- 🎯 Set up advanced alerting rules (smart early warning)
- 📱 Configure mobile notifications (alerts on the go)
- 🔄 Implement automated responses (self-healing systems)
🤔 Why Do We Need Advanced Monitoring?
Imagine you're running a chain of restaurants and need:
- A central command center to see all locations (dashboards)
- Executive reports for decision-making (business intelligence)
- Smart alerts that know the difference between urgent and routine (intelligent alerting)
- Ability to fix small problems automatically (automated responses)
- Real-time visibility into customer satisfaction (business metrics)
Advanced monitoring gives you superhuman oversight of your ML systems!
🏛️ Understanding the Mission Control Center
Think of your monitoring setup like NASA's mission control:
🖥️ The Big Screens (Dashboards)
📺 Mission Control Displays
├── 🎯 Main Screen: Overall system health
├── 📊 Left Screen: Model performance metrics
├── 📈 Right Screen: Business impact metrics
├── 🚨 Alert Panel: Current alerts and warnings
└── 📱 Mobile View: Key metrics on the go
👥 The Control Room Roles
👥 Monitoring Team
├── 🎯 Data Scientist: Model performance
├── 💻 DevOps Engineer: System health
├── 📊 Business Analyst: Impact metrics
├── 🚨 On-call Engineer: Emergency response
└── 📈 Manager: Strategic overview
🎓 Step-by-Step Tutorial: Building Your Mission Control
Step 1: Creating Interactive Dashboards (The Big Screens)
What we're doing: Building beautiful, interactive dashboards that show everything at a glance.
# src/monitoring/dashboard_builder.py - Your mission control display creator
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import pandas as pd
import streamlit as st
from datetime import datetime, timedelta
import numpy as np
import logging
class MLDashboard:
"""
This is your mission control display builder that:
- Creates beautiful interactive charts
- Updates in real-time
- Shows different views for different people
- Makes complex data easy to understand
"""
def __init__(self, data_source):
self.data_source = data_source
self.logger = logging.getLogger(__name__)
def create_executive_dashboard(self):
"""
👔 Executive Dashboard: High-level business view
Like the CEO's cockpit - shows only what matters for big decisions.
"""
st.set_page_config(page_title="ML Executive Dashboard", layout="wide")
st.title("🎯 ML Executive Dashboard")
st.markdown("### Real-time AI Performance & Business Impact")
# 📊 Key Performance Indicators (KPIs)
col1, col2, col3, col4 = st.columns(4)
with col1:
self._create_kpi_card(
title="Model Accuracy",
value="94.2%",
change="+2.1%",
positive=True,
icon="🎯"
)
with col2:
self._create_kpi_card(
title="Customers Saved",
value="1,247",
change="+156",
positive=True,
icon="💰"
)
with col3:
self._create_kpi_card(
title="Revenue Protected",
value="$94.5K",
change="+$12.3K",
positive=True,
icon="📈"
)
with col4:
self._create_kpi_card(
title="System Health",
value="99.8%",
change="-0.1%",
positive=False,
icon="🖥️"
)
# 📈 Business Impact Chart
st.subheader("📈 Monthly Business Impact")
business_chart = self._create_business_impact_chart()
st.plotly_chart(business_chart, use_container_width=True)
# 🎯 Model Performance Trend
col1, col2 = st.columns(2)
with col1:
st.subheader("🎯 Model Performance Trend")
performance_chart = self._create_performance_trend_chart()
st.plotly_chart(performance_chart, use_container_width=True)
with col2:
st.subheader("🚨 Active Alerts")
self._create_alerts_summary()
def create_technical_dashboard(self):
"""
🔧 Technical Dashboard: Detailed system view
Like the engineer's control panel - shows all the technical details.
"""
st.title("🔧 ML Technical Dashboard")
st.markdown("### Detailed System Monitoring & Diagnostics")
# 📊 Real-time metrics
st.subheader("⚡ Real-time System Metrics")
col1, col2, col3 = st.columns(3)
with col1:
cpu_gauge = self._create_gauge_chart("CPU Usage", 67, "🔥")
st.plotly_chart(cpu_gauge, use_container_width=True)
with col2:
memory_gauge = self._create_gauge_chart("Memory Usage", 73, "💾")
st.plotly_chart(memory_gauge, use_container_width=True)
with col3:
response_gauge = self._create_gauge_chart("Response Time", 45, "⚡", max_val=1000, unit="ms")
st.plotly_chart(response_gauge, use_container_width=True)
# 📈 Detailed performance charts
st.subheader("📈 Detailed Performance Analysis")
tab1, tab2, tab3, tab4 = st.tabs(["Model Performance", "Data Quality", "System Health", "Predictions"])
with tab1:
self._create_model_performance_tab()
with tab2:
self._create_data_quality_tab()
with tab3:
self._create_system_health_tab()
with tab4:
self._create_predictions_tab()
def _create_kpi_card(self, title, value, change, positive, icon):
"""Create a KPI card widget"""
color = "green" if positive else "red"
arrow = "↗️" if positive else "↘️"
st.markdown(f"""
<div style="
border: 1px solid #ddd;
border-radius: 10px;
padding: 20px;
text-align: center;
background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%);
">
<h1 style="margin: 0; color: #333;">{icon}</h1>
<h3 style="margin: 10px 0 5px 0; color: #666;">{title}</h3>
<h2 style="margin: 0; color: #333;">{value}</h2>
<p style="margin: 5px 0 0 0; color: {color};">{arrow} {change}</p>
</div>
""", unsafe_allow_html=True)
def _create_business_impact_chart(self):
"""Create business impact visualization"""
# 📊 Sample data (in real app, load from data source)
months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun']
revenue_saved = [45.2, 52.1, 48.7, 61.3, 58.9, 67.4]
customers_retained = [892, 1045, 967, 1234, 1187, 1356]
fig = make_subplots(specs=[[{"secondary_y": True}]])
# Revenue line
fig.add_trace(
go.Scatter(x=months, y=revenue_saved, name="Revenue Saved ($K)",
line=dict(color="#1f77b4", width=3)),
secondary_y=False,
)
# Customers bar
fig.add_trace(
go.Bar(x=months, y=customers_retained, name="Customers Retained",
marker_color="#ff7f0e", opacity=0.7),
secondary_y=True,
)
fig.update_xaxes(title_text="Month")
fig.update_yaxes(title_text="Revenue Saved ($K)", secondary_y=False)
fig.update_yaxes(title_text="Customers Retained", secondary_y=True)
fig.update_layout(
title="Monthly Business Impact",
hovermode="x unified",
height=400
)
return fig
def _create_performance_trend_chart(self):
"""Create model performance trend chart"""
# 📈 Sample performance data
dates = pd.date_range(start='2024-01-01', end='2024-01-30', freq='D')
accuracy = np.random.normal(0.94, 0.02, len(dates))
precision = np.random.normal(0.91, 0.025, len(dates))
recall = np.random.normal(0.88, 0.03, len(dates))
fig = go.Figure()
fig.add_trace(go.Scatter(
x=dates, y=accuracy, name='Accuracy',
line=dict(color='#1f77b4', width=2)
))
fig.add_trace(go.Scatter(
x=dates, y=precision, name='Precision',
line=dict(color='#ff7f0e', width=2)
))
fig.add_trace(go.Scatter(
x=dates, y=recall, name='Recall',
line=dict(color='#2ca02c', width=2)
))
# Add threshold lines
fig.add_hline(y=0.85, line_dash="dash", line_color="red",
annotation_text="Critical Threshold")
fig.update_layout(
title="Model Performance Trend (Last 30 Days)",
xaxis_title="Date",
yaxis_title="Score",
height=300,
yaxis=dict(range=[0.8, 1.0])
)
return fig
def _create_gauge_chart(self, title, value, icon, max_val=100, unit="%"):
"""Create a gauge chart for metrics"""
fig = go.Figure(go.Indicator(
mode = "gauge+number",
value = value,
title = {'text': f"{icon} {title}"},
domain = {'x': [0, 1], 'y': [0, 1]},
number = {'suffix': f" {unit}"},
gauge = {
'axis': {'range': [None, max_val]},
'bar': {'color': "darkblue"},
'steps': [
{'range': [0, max_val*0.5], 'color': "lightgray"},
{'range': [max_val*0.5, max_val*0.8], 'color': "yellow"},
{'range': [max_val*0.8, max_val], 'color': "red"}
],
'threshold': {
'line': {'color': "red", 'width': 4},
'thickness': 0.75,
'value': max_val*0.9
}
}
))
fig.update_layout(height=250)
return fig
def _create_alerts_summary(self):
"""Create alerts summary widget"""
alerts = [
{"level": "🟡", "message": "Data drift detected in tenure feature", "time": "5m ago"},
{"level": "🔵", "message": "Model retrained successfully", "time": "2h ago"},
{"level": "🟢", "message": "All systems operational", "time": "1d ago"}
]
for alert in alerts:
st.markdown(f"""
<div style="
border-left: 4px solid #ddd;
padding: 10px;
margin: 5px 0;
background: #f9f9f9;
">
{alert['level']} {alert['message']} <small style="color: #666;">({alert['time']})</small>
</div>
""", unsafe_allow_html=True)
def _create_model_performance_tab(self):
"""Create detailed model performance tab"""
col1, col2 = st.columns(2)
with col1:
# Confusion Matrix Heatmap
st.subheader("🎯 Confusion Matrix")
confusion_data = np.array([[1456, 78], [92, 374]])
fig = px.imshow(
confusion_data,
labels=dict(x="Predicted", y="Actual", color="Count"),
x=['No Churn', 'Churn'],
y=['No Churn', 'Churn'],
color_continuous_scale='Blues',
text_auto=True
)
fig.update_layout(height=300)
st.plotly_chart(fig, use_container_width=True)
with col2:
# ROC Curve
st.subheader("📈 ROC Curve")
# Sample ROC data
fpr = np.array([0, 0.1, 0.3, 0.5, 1])
tpr = np.array([0, 0.7, 0.85, 0.95, 1])
fig = go.Figure()
fig.add_trace(go.Scatter(x=fpr, y=tpr, name='ROC Curve (AUC=0.91)'))
fig.add_trace(go.Scatter(x=[0, 1], y=[0, 1], name='Random', line_dash='dash'))
fig.update_layout(
title="ROC Curve",
xaxis_title="False Positive Rate",
yaxis_title="True Positive Rate",
height=300
)
st.plotly_chart(fig, use_container_width=True)
def _create_data_quality_tab(self):
"""Create data quality monitoring tab"""
st.subheader("📊 Data Quality Metrics")
# Data drift visualization
features = ['tenure', 'MonthlyCharges', 'TotalCharges', 'Contract', 'PaymentMethod']
drift_scores = [0.12, 0.34, 0.08, 0.45, 0.23]
fig = go.Figure(data=go.Bar(
x=features,
y=drift_scores,
marker_color=['green' if score < 0.3 else 'orange' if score < 0.5 else 'red'
for score in drift_scores]
))
fig.add_hline(y=0.3, line_dash="dash", line_color="orange",
annotation_text="Warning Threshold")
fig.add_hline(y=0.5, line_dash="dash", line_color="red",
annotation_text="Critical Threshold")
fig.update_layout(
title="Feature Drift Scores",
xaxis_title="Features",
yaxis_title="Drift Score",
height=400
)
st.plotly_chart(fig, use_container_width=True)
def _create_system_health_tab(self):
"""Create system health monitoring tab"""
st.subheader("🖥️ System Health Overview")
# System metrics over time
hours = list(range(24))
cpu_usage = np.random.normal(45, 15, 24)
memory_usage = np.random.normal(60, 10, 24)
response_time = np.random.normal(150, 30, 24)
fig = make_subplots(rows=3, cols=1,
subplot_titles=['CPU Usage (%)', 'Memory Usage (%)', 'Response Time (ms)'],
vertical_spacing=0.1)
fig.add_trace(go.Scatter(x=hours, y=cpu_usage, name='CPU', line_color='red'), row=1, col=1)
fig.add_trace(go.Scatter(x=hours, y=memory_usage, name='Memory', line_color='blue'), row=2, col=1)
fig.add_trace(go.Scatter(x=hours, y=response_time, name='Response Time', line_color='green'), row=3, col=1)
fig.update_layout(height=600, showlegend=False)
fig.update_xaxes(title_text="Hour of Day", row=3, col=1)
st.plotly_chart(fig, use_container_width=True)
def _create_predictions_tab(self):
"""Create predictions analysis tab"""
st.subheader("🔮 Prediction Analysis")
col1, col2 = st.columns(2)
with col1:
# Prediction distribution
st.write("**Prediction Score Distribution**")
scores = np.random.beta(2, 5, 1000) # Sample prediction scores
fig = px.histogram(scores, nbins=30, title="Churn Probability Distribution")
fig.update_layout(
xaxis_title="Churn Probability",
yaxis_title="Count",
height=300
)
st.plotly_chart(fig, use_container_width=True)
with col2:
# Prediction volume over time
st.write("**Daily Prediction Volume**")
dates = pd.date_range(start='2024-01-01', end='2024-01-30', freq='D')
volumes = np.random.poisson(500, len(dates))
fig = go.Figure(data=go.Bar(x=dates, y=volumes))
fig.update_layout(
title="Daily Predictions",
xaxis_title="Date",
yaxis_title="Number of Predictions",
height=300
)
st.plotly_chart(fig, use_container_width=True)
# 🏭 Usage example
def create_dashboard_app():
"""Create the main dashboard application"""
dashboard = MLDashboard(data_source="your_data_source")
# 📊 Sidebar navigation
st.sidebar.title("🎯 ML Monitoring")
dashboard_type = st.sidebar.selectbox(
"Choose Dashboard",
["Executive View", "Technical View"]
)
if dashboard_type == "Executive View":
dashboard.create_executive_dashboard()
else:
dashboard.create_technical_dashboard()
if __name__ == "__main__":
create_dashboard_app()
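To see these dashboards in action, save the file and launch it with Streamlit (`streamlit run src/monitoring/dashboard_builder.py`), then open the local URL it prints (typically http://localhost:8501). The sidebar lets you flip between the executive and technical views.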
Step 2: Business Intelligence Reports (Executive Summaries)
What we're doing: Creating automated reports that translate technical metrics into business insights.
# src/monitoring/business_intelligence.py - Your BI report generator
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import numpy as np
import logging
import os
class BusinessIntelligenceReporter:
"""
This is your business translator that:
- Converts technical metrics into business language
- Creates executive summaries
- Calculates ROI and business impact
- Generates automated reports
"""
def __init__(self, data_source):
self.data_source = data_source
self.logger = logging.getLogger(__name__)
def generate_monthly_executive_report(self, month: int, year: int):
"""
📊 Generate comprehensive monthly executive report
Like creating a board presentation that shows business value.
"""
self.logger.info(f"📊 Generating executive report for {month}/{year}")
# 📈 Collect business metrics
metrics = self._calculate_business_metrics(month, year)
# 📝 Generate report
report = f"""
# 📊 ML System Executive Report - {month}/{year}
## 🎯 Executive Summary
Our machine learning system has **protected ${metrics['revenue_protected']:,.0f}** in revenue this month by accurately predicting churn and helping retain **{metrics['customers_saved']:,}** at-risk customers.
### 📈 Key Achievements
- **Model Accuracy**: {metrics['accuracy']:.1%} (Target: 85%)
- **Revenue Protected**: ${metrics['revenue_protected']:,.0f}
- **Customers Retained**: {metrics['customers_saved']:,}
- **System Uptime**: {metrics['uptime']:.2%}
- **ROI**: {metrics['roi']:.1f}x
### 💰 Business Impact
The ML system delivered exceptional value this month:
1. **Revenue Protection**: Successfully identified {metrics['high_risk_customers']:,} high-risk customers, resulting in targeted retention campaigns that saved ${metrics['revenue_protected']:,.0f}.
2. **Cost Efficiency**: Reduced unnecessary retention spending by {metrics['cost_reduction']:.1%} through accurate targeting.
3. **Customer Experience**: Maintained high satisfaction by proactively addressing at-risk customers.
### 🎯 Model Performance
Our churn prediction model performed excellently:
- **Accuracy**: {metrics['accuracy']:.1%} (↑ {metrics['accuracy_change']:+.1%} from last month)
- **Precision**: {metrics['precision']:.1%} ({metrics['precision']:.0%} of customers flagged as churners actually churned)
- **Recall**: {metrics['recall']:.1%} (caught {metrics['recall']:.0%} of actual churners)
### 🚨 Areas of Attention
{self._generate_attention_areas(metrics)}
### 📊 Recommendations
{self._generate_recommendations(metrics)}
---
*Report generated automatically by ML Monitoring System*
*Next report: {self._get_next_report_date()}*
"""
# 💾 Save report
report_path = f"reports/executive_report_{year}_{month:02d}.md"
with open(report_path, 'w') as f:
f.write(report)
self.logger.info(f"📊 Executive report saved to {report_path}")
return report, metrics
def _calculate_business_metrics(self, month: int, year: int):
"""Calculate business-focused metrics"""
# 💰 Business calculations (simplified examples)
# Customer value calculations
avg_customer_value = 1200 # Average annual customer value
churn_predictions = 2500 # Total predictions made
true_positives = 450 # Correctly identified churners
false_positives = 180 # False alarms
true_negatives = 1750 # Correctly identified loyal customers
false_negatives = 120 # Missed churners
# Calculate metrics
accuracy = (true_positives + true_negatives) / churn_predictions
precision = true_positives / (true_positives + false_positives)
recall = true_positives / (true_positives + false_negatives)
# Business impact
customers_saved = true_positives # Customers we correctly identified and saved
revenue_protected = customers_saved * avg_customer_value
# Cost analysis
retention_cost_per_customer = 50
total_retention_cost = (true_positives + false_positives) * retention_cost_per_customer
roi = revenue_protected / total_retention_cost if total_retention_cost > 0 else 0
return {
'accuracy': accuracy,
'precision': precision,
'recall': recall,
'customers_saved': customers_saved,
'revenue_protected': revenue_protected,
'high_risk_customers': true_positives + false_positives,
            'uptime': 0.998,  # System uptime as a fraction (renders as 99.80%)
'roi': roi,
            'cost_reduction': 0.235,  # Fractional cost reduction (renders as 23.5%)
            'accuracy_change': 0.021,  # Month-over-month change (renders as +2.1%)
'total_predictions': churn_predictions
}
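    # Sanity check on the sample confusion matrix above (2,500 predictions):
    #   accuracy  = (450 + 1750) / 2500   = 0.880
    #   precision = 450 / (450 + 180)     ≈ 0.714
    #   recall    = 450 / (450 + 120)     ≈ 0.789
    #   revenue_protected = 450 * $1,200  = $540,000
    #   retention spend   = 630 * $50     = $31,500  ->  ROI ≈ 17.1x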
def _generate_attention_areas(self, metrics):
"""Generate areas that need attention"""
attention_areas = []
if metrics['accuracy'] < 0.85:
attention_areas.append("• **Model Accuracy**: Below target threshold, consider retraining")
if metrics['roi'] < 3.0:
attention_areas.append("• **ROI**: Lower than expected, review retention campaign costs")
        if metrics['uptime'] < 0.995:
attention_areas.append("• **System Reliability**: Uptime below SLA, investigate infrastructure")
if not attention_areas:
return "✅ All metrics within target ranges. No immediate attention required."
return "\n".join(attention_areas)
def _generate_recommendations(self, metrics):
"""Generate actionable recommendations"""
recommendations = []
if metrics['accuracy'] > 0.90:
recommendations.append("• **Scale Success**: Consider expanding ML system to other customer segments")
if metrics['roi'] > 5.0:
recommendations.append("• **Investment Opportunity**: High ROI indicates potential for increased investment")
recommendations.append("• **Continuous Improvement**: Schedule monthly model performance review")
recommendations.append("• **Data Quality**: Maintain data quality monitoring to ensure consistent performance")
return "\n".join(recommendations)
def _get_next_report_date(self):
"""Get next report generation date"""
next_month = datetime.now().replace(day=1) + timedelta(days=32)
return next_month.replace(day=1).strftime("%B %d, %Y")
def create_roi_analysis(self):
"""
💰 Create detailed ROI analysis
Shows executives exactly how much money the ML system is making/saving.
"""
# 📊 Sample ROI data
months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun']
investment = [25000, 22000, 23000, 26000, 24000, 25000] # Monthly costs
returns = [95000, 110000, 102000, 135000, 118000, 142000] # Revenue protected
roi = [(r - i) / i * 100 for r, i in zip(returns, investment)]
# 📈 Create ROI visualization
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))
# Investment vs Returns
x = range(len(months))
width = 0.35
ax1.bar([i - width/2 for i in x], investment, width, label='Investment', color='red', alpha=0.7)
ax1.bar([i + width/2 for i in x], returns, width, label='Returns', color='green', alpha=0.7)
ax1.set_xlabel('Month')
ax1.set_ylabel('Amount ($)')
ax1.set_title('ML Investment vs Returns')
ax1.set_xticks(x)
ax1.set_xticklabels(months)
ax1.legend()
ax1.grid(True, alpha=0.3)
# ROI trend
ax2.plot(months, roi, marker='o', linewidth=3, markersize=8, color='blue')
ax2.fill_between(months, roi, alpha=0.3, color='blue')
ax2.set_xlabel('Month')
ax2.set_ylabel('ROI (%)')
ax2.set_title('Return on Investment Trend')
ax2.grid(True, alpha=0.3)
# Add ROI target line
ax2.axhline(y=300, color='red', linestyle='--', label='Target ROI (300%)')
ax2.legend()
plt.tight_layout()
        os.makedirs('reports/figures', exist_ok=True)  # Ensure the output directory exists
        plt.savefig('reports/figures/roi_analysis.png', dpi=300, bbox_inches='tight')
plt.close()
# 📊 Calculate summary statistics
total_investment = sum(investment)
total_returns = sum(returns)
overall_roi = (total_returns - total_investment) / total_investment * 100
summary = f"""
💰 **ROI Analysis Summary**
- **Total Investment**: ${total_investment:,}
- **Total Returns**: ${total_returns:,}
- **Net Profit**: ${total_returns - total_investment:,}
- **Overall ROI**: {overall_roi:.1f}%
- **Average Monthly ROI**: {np.mean(roi):.1f}%
- **Best Month**: {months[np.argmax(roi)]} ({max(roi):.1f}% ROI)
"""
return summary
# 🏭 Usage example
def generate_business_reports():
"""Generate comprehensive business intelligence reports"""
bi_reporter = BusinessIntelligenceReporter("your_data_source")
# 📊 Generate monthly executive report
report, metrics = bi_reporter.generate_monthly_executive_report(1, 2024)
# 💰 Generate ROI analysis
roi_summary = bi_reporter.create_roi_analysis()
print("📊 Business reports generated successfully!")
print(roi_summary)
return report, roi_summary
With dashboards and business reports in place, the next step is making sure problems surface the moment they happen, and that real emergencies stand out from routine noise.
Step 3: Intelligent Alert System (The Early Warning System)
What we're doing: Building a smart alert system that knows the difference between real emergencies and false alarms.
# src/monitoring/intelligent_alerts.py - Your smart alert system
import json
import logging
import smtplib
import requests
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from enum import Enum
from dataclasses import dataclass
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
class AlertSeverity(Enum):
"""
🚨 Alert severity levels - like a traffic light system
LOW = Green light: Everything's okay, just letting you know
MEDIUM = Yellow light: Pay attention, something might need fixing
HIGH = Orange light: Take action soon, this could become a problem
CRITICAL = Red light: Drop everything and fix this NOW!
"""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class AlertType(Enum):
"""📋 Types of alerts"""
PERFORMANCE_DEGRADATION = "performance_degradation"
DATA_DRIFT = "data_drift"
SYSTEM_FAILURE = "system_failure"
HIGH_ERROR_RATE = "high_error_rate"
BUSINESS_IMPACT = "business_impact"
MODEL_ACCURACY_DROP = "model_accuracy_drop"
RESOURCE_EXHAUSTION = "resource_exhaustion"
SERVICE_UNAVAILABLE = "service_unavailable"
@dataclass
class Alert:
"""📨 Alert data structure"""
alert_id: str
alert_type: AlertType
severity: AlertSeverity
title: str
message: str
metrics: Dict[str, Any]
timestamp: datetime
acknowledged: bool = False
resolved: bool = False
resolution_notes: Optional[str] = None
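# The alert checks below consume a MonitoringMetrics object. Its real
# definition would live in your metrics-collection module; the dataclass here
# is a minimal sketch of just the fields the AlertManager reads, so this file
# can be exercised standalone.
@dataclass
class MonitoringMetrics:
    """📏 Snapshot of monitoring metrics consumed by the alert checks (illustrative)"""
    timestamp: datetime
    model_version: str = "unknown"
    accuracy: float = 0.0
    roc_auc: float = 0.0
    data_drift_score: float = 0.0
    concept_drift_score: float = 0.0
    prediction_count: int = 0
    error_rate: float = 0.0
    avg_response_time: float = 0.0
    business_impact: float = 0.0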
class AlertManager:
"""
🚨 Comprehensive alert management system
Handles alert generation, escalation, notification delivery,
and resolution tracking for the MLOps monitoring system.
"""
def __init__(self, config: Dict[str, Any]):
"""Initialize alert manager with configuration"""
self.config = config
self.logger = logging.getLogger(__name__)
# Alert thresholds
self.thresholds = {
'accuracy_drop': config.get('accuracy_drop_threshold', 0.05),
'data_drift': config.get('data_drift_threshold', 0.3),
'error_rate': config.get('error_rate_threshold', 0.1),
'response_time': config.get('response_time_threshold', 1000), # ms
'cpu_usage': config.get('cpu_usage_threshold', 80), # %
'memory_usage': config.get('memory_usage_threshold', 85), # %
'disk_usage': config.get('disk_usage_threshold', 90), # %
}
# Notification channels
self.notification_config = {
'slack_webhook': config.get('slack_webhook_url'),
'email_smtp': config.get('email_smtp_server'),
'email_user': config.get('email_username'),
'email_password': config.get('email_password'),
'pagerduty_key': config.get('pagerduty_integration_key'),
'teams_webhook': config.get('teams_webhook_url')
}
# Active alerts tracking
self.active_alerts: Dict[str, Alert] = {}
self.logger.info("🚨 Alert manager initialized")
def check_and_generate_alerts(self, metrics: 'MonitoringMetrics') -> List[Alert]:
"""
🔍 Check metrics against thresholds and generate alerts
Args:
metrics: Current monitoring metrics
Returns:
List of generated alerts
"""
generated_alerts = []
# 📈 Model Performance Alerts
performance_alerts = self._check_performance_alerts(metrics)
generated_alerts.extend(performance_alerts)
# 🔍 Data Quality Alerts
data_alerts = self._check_data_alerts(metrics)
generated_alerts.extend(data_alerts)
# 🖥️ System Health Alerts
system_alerts = self._check_system_alerts(metrics)
generated_alerts.extend(system_alerts)
# 💼 Business Impact Alerts
business_alerts = self._check_business_alerts(metrics)
generated_alerts.extend(business_alerts)
# Process and send alerts
for alert in generated_alerts:
self._process_alert(alert)
return generated_alerts
def _check_performance_alerts(self, metrics: 'MonitoringMetrics') -> List[Alert]:
"""📈 Check for model performance degradation"""
alerts = []
# Accuracy drop alert
if hasattr(metrics, 'accuracy') and metrics.accuracy > 0:
# Get historical accuracy for comparison
historical_accuracy = self._get_historical_accuracy()
if historical_accuracy and (historical_accuracy - metrics.accuracy) > self.thresholds['accuracy_drop']:
alert = Alert(
alert_id=f"accuracy_drop_{metrics.timestamp.strftime('%Y%m%d_%H%M%S')}",
alert_type=AlertType.MODEL_ACCURACY_DROP,
severity=AlertSeverity.HIGH,
title="🔴 Model Accuracy Degradation Detected",
message=f"""
Model accuracy has dropped significantly:
📊 Current Accuracy: {metrics.accuracy:.4f}
📈 Historical Average: {historical_accuracy:.4f}
📉 Drop: {historical_accuracy - metrics.accuracy:.4f}
🎯 Threshold: {self.thresholds['accuracy_drop']:.4f}
🤖 Model Version: {metrics.model_version}
⏰ Detected At: {metrics.timestamp.strftime('%Y-%m-%d %H:%M:%S')}
🔧 Recommended Actions:
• Investigate data quality issues
• Check for concept drift
• Consider model retraining
• Review recent predictions
""",
metrics={
'current_accuracy': metrics.accuracy,
'historical_accuracy': historical_accuracy,
'accuracy_drop': historical_accuracy - metrics.accuracy,
'model_version': metrics.model_version
},
timestamp=metrics.timestamp
)
alerts.append(alert)
# ROC-AUC drop alert
if hasattr(metrics, 'roc_auc') and metrics.roc_auc > 0:
if metrics.roc_auc < 0.75: # Below acceptable threshold
alert = Alert(
alert_id=f"roc_auc_low_{metrics.timestamp.strftime('%Y%m%d_%H%M%S')}",
alert_type=AlertType.PERFORMANCE_DEGRADATION,
severity=AlertSeverity.MEDIUM,
title="⚠️ Low ROC-AUC Score Detected",
message=f"""
Model ROC-AUC score is below acceptable threshold:
📊 Current ROC-AUC: {metrics.roc_auc:.4f}
🎯 Minimum Threshold: 0.75
This indicates the model may not be discriminating well between churners and non-churners.
""",
metrics={'roc_auc': metrics.roc_auc},
timestamp=metrics.timestamp
)
alerts.append(alert)
return alerts
def _check_data_alerts(self, metrics: 'MonitoringMetrics') -> List[Alert]:
"""🔍 Check for data quality and drift issues"""
alerts = []
# Data drift alert
if hasattr(metrics, 'data_drift_score') and metrics.data_drift_score > self.thresholds['data_drift']:
severity = AlertSeverity.HIGH if metrics.data_drift_score > 0.6 else AlertSeverity.MEDIUM
alert = Alert(
alert_id=f"data_drift_{metrics.timestamp.strftime('%Y%m%d_%H%M%S')}",
alert_type=AlertType.DATA_DRIFT,
severity=severity,
title="🔄 Significant Data Drift Detected",
message=f"""
Significant data drift detected in incoming data:
📊 Data Drift Score: {metrics.data_drift_score:.4f}
🎯 Threshold: {self.thresholds['data_drift']:.4f}
📈 Concept Drift: {getattr(metrics, 'concept_drift_score', 0):.4f}
📋 Impact:
• Model predictions may become less reliable
• Business metrics could be affected
• Model retraining may be required
🔧 Recommended Actions:
• Investigate data source changes
• Review feature distributions
• Consider immediate model retraining
• Update monitoring thresholds if appropriate
""",
metrics={
'data_drift_score': metrics.data_drift_score,
'concept_drift_score': getattr(metrics, 'concept_drift_score', 0),
'threshold': self.thresholds['data_drift']
},
timestamp=metrics.timestamp
)
alerts.append(alert)
# Low prediction volume alert
if hasattr(metrics, 'prediction_count') and metrics.prediction_count < 10:
alert = Alert(
alert_id=f"low_volume_{metrics.timestamp.strftime('%Y%m%d_%H%M%S')}",
alert_type=AlertType.SYSTEM_FAILURE,
severity=AlertSeverity.MEDIUM,
title="⚠️ Low Prediction Volume",
message=f"""
Unusually low prediction volume detected:
📊 Recent Predictions: {metrics.prediction_count}
📈 Expected Volume: >50 per hour
This could indicate:
• API service issues
• Client connectivity problems
• Reduced business activity
""",
metrics={'prediction_count': metrics.prediction_count},
timestamp=metrics.timestamp
)
alerts.append(alert)
return alerts
def _check_system_alerts(self, metrics: 'MonitoringMetrics') -> List[Alert]:
"""🖥️ Check for system health issues"""
alerts = []
# High error rate alert
if hasattr(metrics, 'error_rate') and metrics.error_rate > self.thresholds['error_rate']:
alert = Alert(
alert_id=f"error_rate_{metrics.timestamp.strftime('%Y%m%d_%H%M%S')}",
alert_type=AlertType.HIGH_ERROR_RATE,
severity=AlertSeverity.HIGH,
title="🔴 High Error Rate Detected",
message=f"""
System error rate has exceeded acceptable threshold:
📊 Current Error Rate: {metrics.error_rate:.2%}
🎯 Threshold: {self.thresholds['error_rate']:.2%}
⏰ Detection Time: {metrics.timestamp.strftime('%Y-%m-%d %H:%M:%S')}
🔧 Immediate Actions Required:
• Check application logs for error patterns
• Verify service dependencies
• Monitor system resources
• Consider service restart if necessary
""",
metrics={'error_rate': metrics.error_rate},
timestamp=metrics.timestamp
)
alerts.append(alert)
# High response time alert
if hasattr(metrics, 'avg_response_time') and metrics.avg_response_time > self.thresholds['response_time']:
alert = Alert(
alert_id=f"response_time_{metrics.timestamp.strftime('%Y%m%d_%H%M%S')}",
alert_type=AlertType.PERFORMANCE_DEGRADATION,
severity=AlertSeverity.MEDIUM,
title="⏱️ High Response Time Detected",
message=f"""
API response time has increased significantly:
📊 Current Avg Response Time: {metrics.avg_response_time:.0f}ms
🎯 Threshold: {self.thresholds['response_time']:.0f}ms
This may impact user experience and system performance.
""",
metrics={'avg_response_time': metrics.avg_response_time},
timestamp=metrics.timestamp
)
alerts.append(alert)
return alerts
def _check_business_alerts(self, metrics: 'MonitoringMetrics') -> List[Alert]:
"""💼 Check for business impact issues"""
alerts = []
# Significant business impact alert
if hasattr(metrics, 'business_impact') and abs(metrics.business_impact) > 10000:
severity = AlertSeverity.HIGH if abs(metrics.business_impact) > 50000 else AlertSeverity.MEDIUM
impact_type = "Positive" if metrics.business_impact > 0 else "Negative"
alert = Alert(
alert_id=f"business_impact_{metrics.timestamp.strftime('%Y%m%d_%H%M%S')}",
alert_type=AlertType.BUSINESS_IMPACT,
severity=severity,
title=f"💰 Significant {impact_type} Business Impact",
message=f"""
{impact_type} business impact detected:
💰 Estimated Impact: ${metrics.business_impact:,.0f}
📊 Based on {getattr(metrics, 'prediction_count', 0)} predictions
⏰ Time Period: Last hour
{"🎉 Great performance! Consider scaling up." if metrics.business_impact > 0 else "⚠️ Investigate performance issues immediately."}
""",
metrics={'business_impact': metrics.business_impact},
timestamp=metrics.timestamp
)
alerts.append(alert)
return alerts
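    # The two helpers referenced above are not part of the original listing;
    # these minimal sketches make the class runnable end-to-end. In a real
    # deployment, _get_historical_accuracy would query your metrics store and
    # _process_alert would fan out to the channels in self.notification_config.
    def _get_historical_accuracy(self) -> Optional[float]:
        """📈 Baseline accuracy for comparison (fixed value keeps the sketch self-contained)"""
        return 0.94

    def _process_alert(self, alert: Alert) -> None:
        """📨 Track the alert and log it; real routing would notify by severity"""
        self.active_alerts[alert.alert_id] = alert
        self.logger.warning(f"🚨 [{alert.severity.value.upper()}] {alert.title}")

# 🏭 Usage example (threshold keys match the __init__ defaults above)
def run_alert_checks():
    config = {
        'accuracy_drop_threshold': 0.05,
        'data_drift_threshold': 0.3,
        'error_rate_threshold': 0.1,
    }
    manager = AlertManager(config)
    metrics = MonitoringMetrics(
        timestamp=datetime.now(),
        model_version="v1.3.0",
        accuracy=0.86,            # 0.08 below the 0.94 baseline -> triggers an alert
        data_drift_score=0.45,    # Above the 0.3 threshold -> triggers an alert
        prediction_count=120,
        error_rate=0.02,
        avg_response_time=180.0,
    )
    alerts = manager.check_and_generate_alerts(metrics)
    print(f"Generated {len(alerts)} alert(s)")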
📊 Dashboard & Visualization System
# dashboards/grafana_dashboards.py
import json
import requests
import logging
from typing import Dict, List, Any
class GrafanaDashboardManager:
"""
📊 Automated Grafana dashboard management
Creates and updates monitoring dashboards for the MLOps system
"""
def __init__(self, grafana_url: str, api_key: str):
self.grafana_url = grafana_url.rstrip('/')
self.api_key = api_key
self.headers = {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json'
}
self.logger = logging.getLogger(__name__)
def create_churn_monitoring_dashboard(self) -> Dict[str, Any]:
"""🎨 Create comprehensive churn monitoring dashboard"""
dashboard_config = {
"dashboard": {
"id": None,
"title": "🎯 Customer Churn Prediction - MLOps Monitoring",
"tags": ["mlops", "churn", "monitoring"],
"timezone": "browser",
"refresh": "30s",
"time": {
"from": "now-3h",
"to": "now"
},
"panels": [
# Model Performance Panel
{
"id": 1,
"title": "📈 Model Performance Metrics",
"type": "stat",
"gridPos": {"h": 8, "w": 12, "x": 0, "y": 0},
"targets": [{
"expr": "churn_model_accuracy",
"legendFormat": "Accuracy",
"refId": "A"
}, {
"expr": "churn_model_roc_auc",
"legendFormat": "ROC-AUC",
"refId": "B"
}],
"fieldConfig": {
"defaults": {
"color": {"mode": "palette-classic"},
"mappings": [],
"thresholds": {
"steps": [
{"color": "red", "value": 0},
{"color": "yellow", "value": 0.7},
{"color": "green", "value": 0.8}
]
},
"unit": "percentunit"
}
}
},
# Prediction Volume Panel
{
"id": 2,
"title": "📊 Prediction Volume & Distribution",
"type": "timeseries",
"gridPos": {"h": 8, "w": 12, "x": 12, "y": 0},
"targets": [{
"expr": "rate(churn_predictions_total[5m])",
"legendFormat": "Predictions/sec",
"refId": "A"
}],
"fieldConfig": {
"defaults": {
"color": {"mode": "palette-classic"},
"custom": {
"drawStyle": "line",
"lineInterpolation": "linear",
"barAlignment": 0,
"lineWidth": 1,
"fillOpacity": 10,
"gradientMode": "none"
}
}
}
},
# Data Drift Panel
{
"id": 3,
"title": "🔄 Data Drift Detection",
"type": "gauge",
"gridPos": {"h": 8, "w": 8, "x": 0, "y": 8},
"targets": [{
"expr": "churn_data_drift_score",
"legendFormat": "Drift Score",
"refId": "A"
}],
"fieldConfig": {
"defaults": {
"color": {"mode": "thresholds"},
"mappings": [],
"thresholds": {
"steps": [
{"color": "green", "value": 0},
{"color": "yellow", "value": 0.3},
{"color": "red", "value": 0.6}
]
},
"unit": "short",
"min": 0,
"max": 1
}
},
"options": {
"orientation": "auto",
"reduceOptions": {
"values": False,
"calcs": ["lastNotNull"],
"fields": ""
},
"showThresholdLabels": False,
"showThresholdMarkers": True
}
},
# System Health Panel
{
"id": 4,
"title": "🖥️ System Health",
"type": "stat",
"gridPos": {"h": 8, "w": 8, "x": 8, "y": 8},
"targets": [{
"expr": "system_cpu_usage_percent",
"legendFormat": "CPU",
"refId": "A"
}, {
"expr": "system_memory_usage_percent",
"legendFormat": "Memory",
"refId": "B"
}, {
"expr": "system_error_rate",
"legendFormat": "Error Rate",
"refId": "C"
}],
"fieldConfig": {
"defaults": {
"color": {"mode": "thresholds"},
"mappings": [],
"thresholds": {
"steps": [
{"color": "green", "value": 0},
{"color": "yellow", "value": 70},
{"color": "red", "value": 90}
]
},
"unit": "percent"
}
}
},
# Business Impact Panel
{
"id": 5,
"title": "💰 Business Impact",
"type": "timeseries",
"gridPos": {"h": 8, "w": 8, "x": 16, "y": 8},
"targets": [{
"expr": "churn_business_impact_revenue",
"legendFormat": "Revenue Impact",
"refId": "A"
}, {
"expr": "churn_business_impact_costs",
"legendFormat": "Retention Costs",
"refId": "B"
}],
"fieldConfig": {
"defaults": {
"color": {"mode": "palette-classic"},
"unit": "currencyUSD"
}
}
},
# Alert Status Panel
{
"id": 6,
"title": "🚨 Active Alerts",
"type": "table",
"gridPos": {"h": 8, "w": 24, "x": 0, "y": 16},
"targets": [{
"expr": "ALERTS{alertname=~'.*churn.*'}",
"legendFormat": "",
"refId": "A"
}],
"fieldConfig": {
"defaults": {
"color": {"mode": "thresholds"},
"mappings": [],
"thresholds": {
"steps": [
{"color": "green", "value": 0},
{"color": "red", "value": 1}
]
}
}
},
"options": {
"showHeader": True
}
}
],
"templating": {
"list": [
{
"name": "model_version",
"type": "query",
"query": "label_values(churn_model_version)",
"refresh": 1,
"includeAll": True,
"multi": False
}
]
},
"annotations": {
"list": [
{
"name": "Model Deployments",
"datasource": "prometheus",
"enable": True,
"expr": "changes(churn_model_version[1d])",
"iconColor": "blue",
"titleFormat": "Model {{model_version}} deployed"
}
]
}
},
"overwrite": True
}
try:
response = requests.post(
f"{self.grafana_url}/api/dashboards/db",
headers=self.headers,
data=json.dumps(dashboard_config)
)
if response.status_code == 200:
self.logger.info("✅ Grafana dashboard created successfully")
return response.json()
else:
self.logger.error(f"❌ Failed to create dashboard: {response.text}")
return {}
except Exception as e:
self.logger.error(f"❌ Dashboard creation error: {str(e)}")
return {}
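Two notes on wiring this up. First, the panels above query Prometheus metrics such as churn_model_accuracy; those metric names are assumptions about your exporter, and the short prometheus_client sketch below shows how they might be published. Second, the usage example uses a placeholder Grafana URL and reads the API key from an environment variable rather than hard-coding it.
# Minimal metrics-export sketch (metric names match the panel queries above)
from prometheus_client import Gauge, start_http_server
import os

model_accuracy = Gauge('churn_model_accuracy', 'Current model accuracy')
drift_score = Gauge('churn_data_drift_score', 'Current data drift score')
start_http_server(8000)   # Expose /metrics for Prometheus to scrape
model_accuracy.set(0.942)
drift_score.set(0.12)

# 🏭 Usage example (placeholder URL; key comes from the environment)
manager = GrafanaDashboardManager(
    grafana_url="http://localhost:3000",  # Grafana's default port
    api_key=os.environ.get("GRAFANA_API_KEY", "")
)
result = manager.create_churn_monitoring_dashboard()
if result:
    print(f"Dashboard URL: {manager.grafana_url}{result.get('url', '')}")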
Step 5: Automated Reporting System (The Daily Briefing)
What we're doing: Generating daily HTML monitoring reports, complete with charts and an executive summary, that can be emailed to stakeholders automatically.
# dashboards/report_generator.py
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import sqlite3
import logging
from typing import Dict, List, Any
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
import io
import base64
import os
class AutomatedReportGenerator:
"""
📊 Automated monitoring report generation system
Generates comprehensive monitoring reports with visualizations
and performance summaries for stakeholders.
"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.db_path = config.get('monitoring_db', 'monitoring/churn_monitoring.db')
self.logger = logging.getLogger(__name__)
# Setup plotting style
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
def generate_daily_report(self) -> str:
"""📋 Generate daily monitoring report"""
self.logger.info("📋 Generating daily monitoring report...")
try:
# Get data for the last 24 hours
end_time = datetime.now()
start_time = end_time - timedelta(days=1)
# Fetch monitoring data
metrics_data = self._fetch_monitoring_data(start_time, end_time)
predictions_data = self._fetch_predictions_data(start_time, end_time)
# Generate report sections
summary = self._generate_summary_section(metrics_data, predictions_data)
performance_analysis = self._generate_performance_section(metrics_data)
business_impact = self._generate_business_section(predictions_data)
alerts_summary = self._generate_alerts_section(start_time, end_time)
# Create visualizations
charts = self._generate_report_charts(metrics_data, predictions_data)
# Compile HTML report
html_report = self._compile_html_report(
summary, performance_analysis, business_impact,
alerts_summary, charts, start_time, end_time
)
# Save report
report_filename = f"reports/churn_monitoring_daily_{end_time.strftime('%Y%m%d')}.html"
with open(report_filename, 'w') as f:
f.write(html_report)
self.logger.info(f"✅ Daily report generated: {report_filename}")
# Send email if configured
if self.config.get('email_reports', False):
self._send_email_report(html_report, "Daily Churn Monitoring Report")
return report_filename
except Exception as e:
self.logger.error(f"❌ Report generation failed: {str(e)}")
return ""
def _generate_summary_section(self, metrics_data: pd.DataFrame,
predictions_data: pd.DataFrame) -> str:
"""📊 Generate executive summary section"""
if len(metrics_data) == 0:
return "<p>No monitoring data available for the reporting period.</p>"
# Calculate key metrics
avg_accuracy = metrics_data['accuracy'].mean()
avg_roc_auc = metrics_data['roc_auc'].mean()
total_predictions = predictions_data['prediction_id'].nunique() if len(predictions_data) > 0 else 0
avg_response_time = metrics_data['avg_response_time'].mean()
max_drift_score = metrics_data['data_drift_score'].max()
# Determine overall health status
health_status = "🟢 Healthy"
if avg_accuracy < 0.8 or max_drift_score > 0.5 or avg_response_time > 1000:
health_status = "🟡 Attention Needed"
if avg_accuracy < 0.7 or max_drift_score > 0.7 or avg_response_time > 2000:
health_status = "🔴 Critical"
summary_html = f"""
<div class="summary-section">
<h2>📊 Executive Summary</h2>
<div class="summary-grid">
<div class="summary-card">
<h3>Overall System Health</h3>
<div class="metric-value">{health_status}</div>
</div>
<div class="summary-card">
<h3>Model Performance</h3>
<div class="metric-value">Accuracy: {avg_accuracy:.3f}</div>
<div class="metric-sub">ROC-AUC: {avg_roc_auc:.3f}</div>
</div>
<div class="summary-card">
<h3>Prediction Volume</h3>
<div class="metric-value">{total_predictions:,}</div>
<div class="metric-sub">predictions in 24h</div>
</div>
<div class="summary-card">
<h3>System Performance</h3>
<div class="metric-value">{avg_response_time:.0f}ms</div>
<div class="metric-sub">avg response time</div>
</div>
<div class="summary-card">
<h3>Data Quality</h3>
<div class="metric-value">Drift: {max_drift_score:.3f}</div>
<div class="metric-sub">max drift score</div>
</div>
</div>
</div>
"""
return summary_html
def _generate_report_charts(self, metrics_data: pd.DataFrame,
predictions_data: pd.DataFrame) -> Dict[str, str]:
"""📈 Generate charts for the report"""
charts = {}
if len(metrics_data) == 0:
return charts
# Convert timestamp column to datetime
metrics_data['timestamp'] = pd.to_datetime(metrics_data['timestamp'])
# Model Performance Over Time Chart
plt.figure(figsize=(12, 6))
plt.subplot(2, 2, 1)
plt.plot(metrics_data['timestamp'], metrics_data['accuracy'], marker='o', label='Accuracy')
plt.plot(metrics_data['timestamp'], metrics_data['roc_auc'], marker='s', label='ROC-AUC')
plt.title('Model Performance Over Time')
plt.xlabel('Time')
plt.ylabel('Score')
plt.legend()
plt.xticks(rotation=45)
# Data Drift Chart
plt.subplot(2, 2, 2)
plt.plot(metrics_data['timestamp'], metrics_data['data_drift_score'],
marker='o', color='orange', label='Data Drift')
plt.axhline(y=0.3, color='red', linestyle='--', label='Alert Threshold')
plt.title('Data Drift Detection')
plt.xlabel('Time')
plt.ylabel('Drift Score')
plt.legend()
plt.xticks(rotation=45)
# System Performance Chart
plt.subplot(2, 2, 3)
plt.plot(metrics_data['timestamp'], metrics_data['avg_response_time'],
marker='o', color='green', label='Response Time')
plt.title('System Response Time')
plt.xlabel('Time')
plt.ylabel('Time (ms)')
plt.xticks(rotation=45)
# Prediction Volume Chart
plt.subplot(2, 2, 4)
plt.bar(range(len(metrics_data)), metrics_data['prediction_count'],
color='skyblue', label='Predictions')
plt.title('Prediction Volume')
plt.xlabel('Time Period')
plt.ylabel('Count')
plt.tight_layout()
# Save chart as base64 string
img_buffer = io.BytesIO()
plt.savefig(img_buffer, format='png', dpi=150, bbox_inches='tight')
img_buffer.seek(0)
chart_b64 = base64.b64encode(img_buffer.getvalue()).decode()
charts['main_dashboard'] = chart_b64
plt.close()
return charts
def _compile_html_report(self, summary: str, performance: str, business: str,
alerts: str, charts: Dict[str, str],
start_time: datetime, end_time: datetime) -> str:
"""📄 Compile complete HTML report"""
chart_html = ""
if 'main_dashboard' in charts:
chart_html = f'<img src="data:image/png;base64,{charts["main_dashboard"]}" alt="Dashboard Charts" style="width:100%; max-width:800px;">'
html_template = f"""
<!DOCTYPE html>
<html>
<head>
<title>Customer Churn Prediction - Daily Monitoring Report</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
.header {{ background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white; padding: 20px; border-radius: 10px; }}
.summary-grid {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
gap: 15px; margin: 20px 0; }}
.summary-card {{ background: #f8f9fa; padding: 15px; border-radius: 8px; border-left: 4px solid #007bff; }}
.metric-value {{ font-size: 24px; font-weight: bold; color: #007bff; }}
.metric-sub {{ font-size: 14px; color: #6c757d; }}
.section {{ margin: 30px 0; padding: 20px; border: 1px solid #dee2e6; border-radius: 8px; }}
.chart-container {{ text-align: center; margin: 20px 0; }}
</style>
</head>
<body>
<div class="header">
<h1>🎯 Customer Churn Prediction - Daily Monitoring Report</h1>
<p>Report Period: {start_time.strftime('%Y-%m-%d %H:%M')} - {end_time.strftime('%Y-%m-%d %H:%M')}</p>
<p>Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
</div>
{summary}
<div class="section">
<h2>📈 Performance Analysis</h2>
{performance}
</div>
<div class="section">
<h2>💼 Business Impact</h2>
{business}
</div>
<div class="section">
<h2>🚨 Alerts Summary</h2>
{alerts}
</div>
<div class="chart-container">
<h2>📊 Monitoring Dashboard</h2>
{chart_html}
</div>
<div class="section">
<h2>🔧 Recommendations</h2>
<ul>
<li>Monitor data drift scores - trigger retraining if consistently above 0.3</li>
<li>Investigate any accuracy drops below 0.8</li>
<li>Optimize system performance if response times exceed 1000ms</li>
<li>Review business impact metrics for optimization opportunities</li>
</ul>
</div>
</body>
</html>
"""
return html_template
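    # The fetch helpers referenced in generate_daily_report() are not part of
    # the original listing; this sketch assumes a `metrics` table in the SQLite
    # monitoring database with columns matching the fields used above.
    def _fetch_monitoring_data(self, start_time: datetime,
                               end_time: datetime) -> pd.DataFrame:
        """🗄️ Load monitoring metrics for the reporting window"""
        query = """
            SELECT timestamp, accuracy, roc_auc, data_drift_score,
                   avg_response_time, prediction_count
            FROM metrics
            WHERE timestamp BETWEEN ? AND ?
            ORDER BY timestamp
        """
        with sqlite3.connect(self.db_path) as conn:
            return pd.read_sql_query(
                query, conn,
                params=(start_time.isoformat(), end_time.isoformat())
            )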
This completes your ML mission control center. Together, the interactive dashboards, business intelligence reports, intelligent alerting, Grafana visualizations, and automated reports give you the visibility and control needed to run ML in production successfully, maximizing the value of your ML investments while maintaining reliability.