Complete hands-on implementation tutorial - Build a production-ready monitoring system from scratch with step-by-step code examples using Python and Evidently
Introduction: From Concepts to Code
In the companion article "MLOps ZoomCamp Week 05 Monitoring Notes: Understanding ML Model Monitoring Concepts," we explored why monitoring is crucial and the different types of drift that can affect ML models. Now it's time to get hands-on and build a complete monitoring system.
This practical guide will walk you through implementing a real-world monitoring solution using Python and the Evidently library. By the end, you'll have a working system that can detect drift, generate reports, store metrics in a database, and send automated alerts.
What you'll build:
- A drift detection system using real NYC taxi data
- Interactive HTML reports for stakeholders
- Database integration for historical tracking
- Automated alerting via email and Slack
- Production-ready monitoring pipeline
Meet Evidently: Your ML Monitoring Toolkit
Evidently is an open-source Python library specifically designed for ML model monitoring. Think of it as your monitoring Swiss Army knife - it provides everything you need out of the box.
Why Choose Evidently?
✅ Pre-built metrics for common monitoring tasks
✅ Interactive reports that look professional
✅ Easy integration with existing ML pipelines
✅ Comprehensive documentation and active community
✅ Free and open-source with commercial support available
Key Features
- Data drift detection using multiple statistical tests
- Model performance monitoring for regression and classification
- Data quality checks for missing values, duplicates, etc.
- Interactive HTML reports perfect for stakeholders
- JSON export for automated systems and databases (see the quick sketch below)
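As a taste of that last point, exporting results for automation is a one-liner on top of any report. Here's a minimal sketch using Evidently's legacy 0.4.x Report API (the version this tutorial pins) with two toy DataFrames standing in for your own data:

import numpy as np
import pandas as pd
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

# Toy stand-ins for your baseline and production data
rng = np.random.default_rng(0)
df_reference = pd.DataFrame({'x': rng.normal(0, 1, 500)})
df_current = pd.DataFrame({'x': rng.normal(1, 1, 500)})

report = Report(metrics=[DataDriftPreset()])
report.run(reference_data=df_reference, current_data=df_current)

report.save_html('drift_report.html')  # interactive report for humans
report.save_json('drift_report.json')  # machine-readable export for pipelines
result = report.as_dict()              # plain dict for custom processing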
Environment Setup: Getting Your Tools Ready
Let's set up our monitoring environment step by step.
Step 1: Create Your Project Environment
# Create project directory
mkdir ml_monitoring_tutorial
cd ml_monitoring_tutorial
# Create virtual environment
python -m venv monitoring_env
# Activate environment
source monitoring_env/bin/activate # On Windows: monitoring_env\Scripts\activate
Step 2: Install Required Libraries
# Install core monitoring tools
pip install evidently pandas numpy scikit-learn
# Install database and visualization tools
pip install psycopg2-binary plotly
# Install additional ML tools
pip install joblib requests
Step 3: Create Requirements File
# Save this as requirements.txt
evidently==0.4.22
pandas>=1.5.0
numpy>=1.21.0
scikit-learn>=1.2.0
psycopg2-binary>=2.9.0
plotly>=5.0.0
joblib>=1.2.0
requests>=2.28.0
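With the file in place, anyone can recreate the environment in one step:

pip install -r requirements.txt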
Your First Monitoring Script: Hello Drift Detection
Let's start with a simple example to see Evidently in action.
"""
BEGINNER'S FIRST MONITORING SCRIPT
Copy this code and run it to see drift detection in action!
"""
import pandas as pd
import numpy as np
from sklearn.datasets import make_classification
from evidently.report import Report
from evidently.metrics import DatasetDriftMetric, DataDriftTable
print("Creating sample data...")
# Generate synthetic dataset (simulating a real ML project)
X, y = make_classification(
    n_samples=10000,
    n_features=10,
    n_informative=5,
    n_redundant=2,
    random_state=42
)
# Convert to DataFrame for easier handling
feature_names = [f'feature_{i}' for i in range(10)]
data = pd.DataFrame(X, columns=feature_names)
data['target'] = y
print(f"Created dataset with {len(data)} samples and {len(feature_names)} features")
# Split into reference and current data
print("Splitting data...")
# Reference data (first 7000 samples) - represents your training data
reference_data = data[:7000].copy()
# Current data (last 3000 samples) - represents new production data
current_data = data[7000:].copy()
# Introduce artificial drift to demonstrate monitoring
print("Introducing artificial drift...")
current_data['feature_0'] = current_data['feature_0'] + 2  # Shift one feature
# Use positional indexing here: current_data keeps the original row labels
# (7000+), so label-based .loc[:500] would select no rows at all
current_data.iloc[:500, current_data.columns.get_loc('feature_1')] = np.nan  # Add missing values
print(f"Reference data: {reference_data.shape}")
print(f"Current data: {current_data.shape}")
# Create and run monitoring report
print("Running drift detection...")
# This is where the magic happens!
report = Report(metrics=[
    DatasetDriftMetric(),  # Overall drift check
    DataDriftTable()       # Column-by-column analysis
])
# Run the analysis
report.run(reference_data=reference_data, current_data=current_data)
# Save results
report.save_html("my_first_monitoring_report.html")
print("Report saved as 'my_first_monitoring_report.html'")
print("Open this file in your web browser!")
# Extract key metrics programmatically
result = report.as_dict()
dataset_drift = result['metrics'][0]['result']['dataset_drift']
print(f"\nRESULTS SUMMARY:")
print(f"Dataset drift detected: {dataset_drift}")
if dataset_drift:
print("DRIFT DETECTED! Your data has changed.")
print("Check the HTML report to see which features drifted.")
else:
print("NO DRIFT DETECTED! Your data looks stable.")
print("\nCongratulations! You just ran your first ML monitoring analysis!")
Understanding Your First Report
When you open the HTML file in your browser, you'll see:
1. Summary Section
- Overall drift status (Detected/Not Detected)
- Number of drifted features
- Key statistics comparison
2. Drift Details Table
- List of all features
- Drift status for each (Detected/Not Detected)
- Drift scores from the underlying statistical test (for distance-based tests, higher means more drift; for p-value-based tests, lower does)
- Statistical test results
3. Visual Charts
- Distribution plots showing before/after
- Missing values analysis
- Feature-by-feature comparisons
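You can also pull those per-feature numbers programmatically (for dashboards or automated checks). Here's a sketch against the legacy 0.4.x dict layout; the exact key names are worth verifying against your own report.as_dict() output:

# Read per-column drift results out of the report dict
result = report.as_dict()

drift_table = next(
    m for m in result['metrics'] if m['metric'] == 'DataDriftTable'
)
for column, info in drift_table['result']['drift_by_columns'].items():
    print(f"{column}: drift_detected={info['drift_detected']}, "
          f"score={info['drift_score']:.3f} ({info['stattest_name']})")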
Understanding Evidently's Building Blocks
Let's break down how Evidently works under the hood.
Core Components
# Essential imports
from evidently.report import Report
from evidently.metrics import DatasetDriftMetric, DataDriftTable
from evidently import ColumnMapping
# Basic report structure
report = Report(metrics=[
    DatasetDriftMetric(),  # Overall drift check
    DataDriftTable()       # Column-by-column details
])

# Run analysis
report.run(
    reference_data=df_reference,  # Your "good" baseline data
    current_data=df_current       # New data to check
)

# View results
report.show()                    # Interactive (Jupyter only)
report.save_html('report.html')  # Standalone file
result = report.as_dict()        # Raw data for code
Column Mapping: Teaching Evidently About Your Data
Column mapping tells Evidently what type of data each column contains, enabling appropriate statistical tests.
from evidently import ColumnMapping

# Define your data structure
column_mapping = ColumnMapping(
    target='target_column',          # What you're predicting
    prediction='prediction_column',  # Model output
    numerical_features=[             # Continuous numbers
        'age', 'income', 'score'
    ],
    categorical_features=[           # Categories/labels
        'category', 'region', 'type'
    ],
    datetime_features=['timestamp'],  # Date/time columns
    text_features=['description'],    # Text content
    id='customer_id'                  # Unique identifier column
)

# Use in reports
report.run(
    reference_data=df_old,
    current_data=df_new,
    column_mapping=column_mapping  # Enables the right statistical tests
)
Available Metrics Quick Reference
# Dataset-level metrics
DatasetDriftMetric()             # Overall drift detection (result includes the drifted-column count)
DataDriftTable()                 # Per-column drift analysis
DatasetMissingValuesMetric()     # Missing values across the dataset

# Column-specific metrics
ColumnDriftMetric(column_name='age')                      # Single-column drift
ColumnSummaryMetric(column_name='income')                 # Detailed statistics
ColumnQuantileMetric(column_name='score', quantile=0.95)  # Percentile tracking

# Model performance metrics
RegressionQualityMetric()        # Regression model performance
ClassificationQualityMetric()    # Classification model performance

# Preset collections (bundles of related metrics, from evidently.metric_preset)
DataDriftPreset()                # Complete drift analysis
DataQualityPreset()              # Data quality checks
RegressionPreset()               # Full regression monitoring
ClassificationPreset()           # Full classification monitoring
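If you just want broad coverage without listing metrics one by one, presets bundle them for you. A minimal sketch, assuming the same 0.4.x API and the reference_data/current_data frames from the first script:

from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset

# One preset expands into a whole family of related metrics
preset_report = Report(metrics=[
    DataDriftPreset(),   # drift checks for every column
    DataQualityPreset()  # missing values and other data quality checks
])
preset_report.run(reference_data=reference_data, current_data=current_data)
preset_report.save_html("preset_report.html")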
Real-World Example: NYC Taxi Trip Duration Monitoring
Now let's build a complete monitoring system using real data. We'll create a taxi trip duration prediction model and monitor it for drift.
Step 1: Data Preparation and Model Training
import requests
import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error
from joblib import dump, load
import os
def download_taxi_data():
    """Download NYC taxi data for our monitoring example"""
    # Create data directory
    os.makedirs('data', exist_ok=True)

    # Files to download
    files = [
        'green_tripdata_2024-03.parquet',  # March 2024 data
        'green_tripdata_2024-04.parquet'   # April 2024 data
    ]

    base_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/"

    for file in files:
        if not os.path.exists(f'data/{file}'):
            print(f"Downloading {file}...")
            response = requests.get(f"{base_url}{file}")
            with open(f'data/{file}', 'wb') as f:
                f.write(response.content)
            print(f"Downloaded {file}")
        else:
            print(f"{file} already exists")
def prepare_taxi_data(file_path):
    """Clean and prepare taxi data for machine learning"""
    print(f"Loading and cleaning {file_path}...")

    # Load the raw data
    data = pd.read_parquet(file_path)

    # Calculate trip duration in minutes
    data['duration'] = (
        data.lpep_dropoff_datetime - data.lpep_pickup_datetime
    ).dt.total_seconds() / 60

    # Filter out unrealistic trips
    data = data[
        (data.duration >= 1) &        # At least 1 minute
        (data.duration <= 60) &       # At most 60 minutes
        (data.passenger_count > 0) &  # At least 1 passenger
        (data.trip_distance > 0)      # Some distance traveled
    ]

    print(f"Cleaned data shape: {data.shape}")
    return data
def create_features(df):
    """Select and prepare features for machine learning"""

    # Numerical features
    numerical_features = [
        'passenger_count',  # How many people
        'trip_distance',    # How far
        'fare_amount',      # Base fare
        'total_amount'      # Total paid
    ]

    # Categorical features (location codes)
    categorical_features = [
        'PULocationID',  # Pickup location
        'DOLocationID'   # Dropoff location
    ]

    # Handle missing values in categorical features
    for col in categorical_features:
        df[col] = df[col].fillna(-1)  # -1 means "unknown"

    return numerical_features + categorical_features
def train_baseline_model(data):
    """Train a simple model to predict trip duration"""
    print("Training baseline model...")

    # Get features
    feature_columns = create_features(data)

    # Prepare training data
    X = data[feature_columns].fillna(0)  # Replace NaN with 0
    y = data['duration']                 # Target variable

    # Train model
    model = LinearRegression()
    model.fit(X, y)

    # Check performance on the training data
    predictions = model.predict(X)
    mae = mean_absolute_error(y, predictions)

    print(f"Model trained! MAE: {mae:.2f} minutes")
    return model, feature_columns
# Run the data preparation
print("Starting taxi monitoring example...")
# Download data
download_taxi_data()
# Prepare March data (our "reference" data)
march_data = prepare_taxi_data('data/green_tripdata_2024-03.parquet')
# Train model
model, feature_columns = train_baseline_model(march_data)
# Create reference dataset
print("Creating reference dataset...")
reference_data = march_data.sample(n=5000, random_state=42).copy()
# Add predictions to reference data
reference_data['prediction'] = model.predict(
    reference_data[feature_columns].fillna(0)
)
# Save model and reference data
os.makedirs('models', exist_ok=True)
dump(model, 'models/taxi_duration_model.pkl')
reference_data.to_parquet('data/reference_data.parquet')
print("Setup complete! Model and reference data saved.")
Step 2: Production Monitoring Setup
from evidently.report import Report
from evidently.metrics import (
    DatasetDriftMetric, DataDriftTable,
    ColumnDriftMetric, DatasetMissingValuesMetric
)
from evidently import ColumnMapping
def create_column_mapping():
    """Define column types for better monitoring"""
    return ColumnMapping(
        target='duration',        # What we're predicting
        prediction='prediction',  # Model output
        numerical_features=[      # Continuous features
            'passenger_count', 'trip_distance',
            'fare_amount', 'total_amount'
        ],
        categorical_features=[    # Categorical features
            'PULocationID', 'DOLocationID'
        ]
    )

def create_monitoring_report():
    """Create a comprehensive monitoring report (0.4.x metric names)"""
    return Report(metrics=[
        DatasetDriftMetric(),                         # Overall drift check (also counts drifted columns)
        DataDriftTable(),                             # Column details
        ColumnDriftMetric(column_name='prediction'),  # Model output drift
        ColumnDriftMetric(column_name='duration'),    # Target drift
        DatasetMissingValuesMetric()                  # Data quality
    ])
def monitor_batch(current_data, reference_data, model, feature_columns):
    """Main monitoring function"""
    print("Starting batch monitoring...")

    # Generate predictions for current data
    current_data = current_data.copy()
    current_data['prediction'] = model.predict(
        current_data[feature_columns].fillna(0)
    )

    # Create monitoring report
    monitoring_report = create_monitoring_report()
    column_mapping = create_column_mapping()

    # Run analysis
    monitoring_report.run(
        reference_data=reference_data,
        current_data=current_data,
        column_mapping=column_mapping
    )

    print("Monitoring complete!")
    return monitoring_report
def extract_key_metrics(report):
    """Extract important metrics from the report dict.

    Key names follow the Evidently 0.4.x result layout; verify them
    against report.as_dict() if you use a different version.
    """
    result = report.as_dict()
    metrics = {}

    for metric in result['metrics']:
        metric_type = metric['metric']

        if metric_type == 'DatasetDriftMetric':
            metrics['dataset_drift_detected'] = metric['result']['dataset_drift']
            metrics['dataset_drift_score'] = metric['result']['share_of_drifted_columns']
            metrics['num_drifted_columns'] = metric['result']['number_of_drifted_columns']

        elif metric_type == 'ColumnDriftMetric':
            column = metric['result']['column_name']
            metrics[f'{column}_drift_detected'] = metric['result']['drift_detected']
            metrics[f'{column}_drift_score'] = metric['result']['drift_score']

    return metrics
# Load our saved data and model
print("Loading saved model and reference data...")
model = load('models/taxi_duration_model.pkl')
reference_data = pd.read_parquet('data/reference_data.parquet')
# Prepare April data (simulating new production data)
april_data = prepare_taxi_data('data/green_tripdata_2024-04.parquet')
# Sample first week of April
april_week1 = april_data[
    april_data.lpep_pickup_datetime.dt.day <= 7
].sample(n=5000, random_state=42)
print("Monitoring April data vs March reference...")
# Run monitoring
report = monitor_batch(april_week1, reference_data, model, feature_columns)
# Save report
report.save_html('taxi_monitoring_report.html')
print("Detailed report saved as 'taxi_monitoring_report.html'")
# Extract and display key metrics
metrics = extract_key_metrics(report)
print("\nKEY MONITORING RESULTS:")
print("=" * 40)
for key, value in metrics.items():
    if 'detected' in key:
        status = "DETECTED" if value else "OK"
        print(f"{key}: {status}")
    elif 'score' in key:
        level = "HIGH" if value > 0.5 else "MEDIUM" if value > 0.1 else "LOW"
        print(f"{key}: {value:.3f} ({level})")
    else:
        print(f"{key}: {value}")
print("\nInterpretation:")
print("- Drift scores closer to 0 = less drift (good)")
print("- Drift scores closer to 1 = more drift (concerning)")
print("- Focus on columns with detected drift")
Database Integration: Storing Metrics for Historical Analysis
To build a production monitoring system, we need to store metrics over time. Let's set up PostgreSQL integration.
Database Setup
import psycopg2
import pandas as pd
from datetime import datetime
# Database configuration
DB_CONFIG = {
    'host': 'localhost',
    'port': 5432,
    'user': 'postgres',
    'password': 'your_password',
    'database': 'monitoring_db'
}
# Table creation SQL
CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS monitoring_metrics (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
timestamp TIMESTAMP NOT NULL,
model_name VARCHAR(100) NOT NULL,
dataset_drift_detected BOOLEAN,
dataset_drift_score FLOAT,
prediction_drift_detected BOOLEAN,
prediction_drift_score FLOAT,
target_drift_detected BOOLEAN,
target_drift_score FLOAT,
num_drifted_columns INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""
def setup_database():
    """Initialize monitoring database"""
    print("Setting up monitoring database...")

    try:
        conn = psycopg2.connect(**DB_CONFIG)
        cursor = conn.cursor()
        cursor.execute(CREATE_TABLE_SQL)
        conn.commit()
        cursor.close()
        conn.close()
        print("Database setup complete!")
    except psycopg2.Error as e:
        print(f"Database setup failed: {e}")
        print("Make sure PostgreSQL is running")
def store_monitoring_metrics(metrics, model_name):
    """Store metrics in database"""
    conn = psycopg2.connect(**DB_CONFIG)
    cursor = conn.cursor()

    insert_sql = """
        INSERT INTO monitoring_metrics
        (timestamp, model_name, dataset_drift_detected, dataset_drift_score,
         prediction_drift_detected, prediction_drift_score,
         target_drift_detected, target_drift_score, num_drifted_columns)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
    """

    cursor.execute(insert_sql, (
        datetime.now(),
        model_name,
        metrics.get('dataset_drift_detected'),
        metrics.get('dataset_drift_score'),
        metrics.get('prediction_drift_detected'),
        metrics.get('prediction_drift_score'),
        metrics.get('duration_drift_detected'),  # duration is our target column
        metrics.get('duration_drift_score'),
        metrics.get('num_drifted_columns')
    ))

    conn.commit()
    cursor.close()
    conn.close()
    print("Metrics stored in database")
def get_monitoring_history(model_name, days=30):
    """Retrieve monitoring history from database"""
    conn = psycopg2.connect(**DB_CONFIG)

    # Multiply a 1-day interval rather than putting %s inside a quoted
    # string, which psycopg2 placeholders cannot substitute safely
    query = """
        SELECT timestamp, dataset_drift_score, prediction_drift_score,
               num_drifted_columns, dataset_drift_detected
        FROM monitoring_metrics
        WHERE model_name = %s
          AND timestamp >= CURRENT_DATE - INTERVAL '1 day' * %s
        ORDER BY timestamp DESC
    """

    df = pd.read_sql(query, conn, params=[model_name, days])
    conn.close()
    return df
# Usage example
if __name__ == "__main__":
    # Set up database (run once)
    setup_database()

    # Store our taxi monitoring results
    store_monitoring_metrics(metrics, 'taxi_duration_model')

    # Retrieve history
    history = get_monitoring_history('taxi_duration_model')
    print(f"Retrieved {len(history)} historical records")
Automated Alerting: Never Miss a Problem
Let's add intelligent alerting to our monitoring system.
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime

import requests
class MonitoringAlerter:
    """Handles alerts for monitoring system"""

    def __init__(self, email_config=None, slack_webhook=None):
        self.email_config = email_config
        self.slack_webhook = slack_webhook

        # Alert thresholds
        self.thresholds = {
            'drift_score': 0.1,      # Drift score threshold
            'drifted_columns': 3,    # Max drifted columns
            'missing_values': 0.05   # Max missing value ratio
        }

    def check_alerts(self, metrics):
        """Check metrics against thresholds and generate alerts"""
        alerts = []

        # Check dataset drift
        if metrics.get('dataset_drift_detected'):
            score = metrics.get('dataset_drift_score', 0)
            if score > self.thresholds['drift_score']:
                alerts.append({
                    'type': 'dataset_drift',
                    'severity': 'high' if score > 0.5 else 'medium',
                    'message': f"Dataset drift detected (score: {score:.3f})",
                    'details': metrics
                })

        # Check prediction drift
        if metrics.get('prediction_drift_detected'):
            score = metrics.get('prediction_drift_score', 0)
            if score > self.thresholds['drift_score']:
                alerts.append({
                    'type': 'prediction_drift',
                    'severity': 'high' if score > 0.3 else 'medium',
                    'message': f"Model prediction drift detected (score: {score:.3f})",
                    'details': metrics
                })

        # Check number of drifted columns
        num_drifted = metrics.get('num_drifted_columns', 0)
        if num_drifted > self.thresholds['drifted_columns']:
            alerts.append({
                'type': 'multiple_drift',
                'severity': 'high',
                'message': f"Multiple columns drifted ({num_drifted} columns)",
                'details': metrics
            })

        return alerts
    def send_email_alert(self, alert):
        """Send email alert"""
        if not self.email_config:
            print("Email not configured, skipping email alert")
            return

        msg = MIMEMultipart()
        msg['From'] = self.email_config['from_email']
        msg['To'] = self.email_config['to_email']
        msg['Subject'] = f"ML Monitoring Alert: {alert['type']}"

        body = f"""
        ML Monitoring Alert

        Type: {alert['type']}
        Severity: {alert['severity']}
        Message: {alert['message']}
        Time: {datetime.now()}

        Details:
        {alert['details']}

        Please investigate immediately.
        """
        msg.attach(MIMEText(body, 'plain'))

        try:
            server = smtplib.SMTP(self.email_config['smtp_server'], 587)
            server.starttls()
            server.login(self.email_config['username'], self.email_config['password'])
            server.send_message(msg)
            server.quit()
            print("Email alert sent successfully")
        except Exception as e:
            print(f"Failed to send email: {e}")
    def send_slack_alert(self, alert):
        """Send Slack alert"""
        if not self.slack_webhook:
            print("Slack not configured, skipping Slack alert")
            return

        # Format message for Slack
        color = "danger" if alert['severity'] == 'high' else "warning"
        payload = {
            "attachments": [
                {
                    "color": color,
                    "title": f"ML Monitoring Alert: {alert['type']}",
                    "text": alert['message'],
                    "fields": [
                        {
                            "title": "Severity",
                            "value": alert['severity'],
                            "short": True
                        },
                        {
                            "title": "Time",
                            "value": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                            "short": True
                        }
                    ]
                }
            ]
        }

        try:
            response = requests.post(self.slack_webhook, json=payload)
            if response.status_code == 200:
                print("Slack alert sent successfully")
            else:
                print(f"Failed to send Slack alert: {response.status_code}")
        except Exception as e:
            print(f"Failed to send Slack alert: {e}")
    def process_alerts(self, metrics):
        """Main alert processing function"""
        alerts = self.check_alerts(metrics)

        if not alerts:
            print("No alerts triggered")
            return

        print(f"{len(alerts)} alert(s) triggered!")
        for alert in alerts:
            print(f"- {alert['severity'].upper()}: {alert['message']}")

            # Send notifications
            self.send_email_alert(alert)
            self.send_slack_alert(alert)
# Usage example
def automated_monitoring_pipeline():
    """Complete monitoring pipeline with alerting"""
    print("Running automated monitoring pipeline...")

    # Load model and data
    model = load('models/taxi_duration_model.pkl')
    reference_data = pd.read_parquet('data/reference_data.parquet')

    # Get new data (in production, this would come from your data pipeline)
    april_data = prepare_taxi_data('data/green_tripdata_2024-04.parquet')
    feature_columns = create_features(april_data)  # same feature list used in training
    current_batch = april_data.sample(n=1000, random_state=42)

    # Run monitoring
    report = monitor_batch(current_batch, reference_data, model, feature_columns)
    metrics = extract_key_metrics(report)

    # Store metrics
    store_monitoring_metrics(metrics, 'taxi_duration_model')

    # Check for alerts
    alerter = MonitoringAlerter()
    alerter.process_alerts(metrics)

    print("Monitoring pipeline complete!")

# Run the complete pipeline
if __name__ == "__main__":
    automated_monitoring_pipeline()
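To wire up real notifications, pass configuration when constructing the alerter. A sketch that reads settings from environment variables (all variable names here are placeholders for your own secrets management):

import os

email_config = {
    'smtp_server': os.environ.get('SMTP_SERVER', 'smtp.example.com'),
    'from_email': os.environ.get('ALERT_FROM_EMAIL'),
    'to_email': os.environ.get('ALERT_TO_EMAIL'),
    'username': os.environ.get('SMTP_USERNAME'),
    'password': os.environ.get('SMTP_PASSWORD'),
}
slack_webhook = os.environ.get('SLACK_WEBHOOK_URL')  # incoming-webhook URL

alerter = MonitoringAlerter(email_config=email_config, slack_webhook=slack_webhook)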
Production Deployment: Docker and Scheduling
Docker Setup
Create a Dockerfile:
FROM python:3.9-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    postgresql-client \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Run the monitoring system
CMD ["python", "monitoring_pipeline.py"]
Create a docker-compose.yml:
version: '3.8'

services:
  db:
    image: postgres:13
    environment:
      POSTGRES_DB: monitoring_db
      POSTGRES_PASSWORD: example
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  monitoring:
    build: .
    environment:
      - DB_HOST=db
      - DB_PASSWORD=example
    depends_on:
      - db
    volumes:
      - ./data:/app/data
      - ./models:/app/models

volumes:
  postgres_data:
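With both files in place, one command builds the image and starts Postgres plus the monitoring service (assuming Docker Compose v2; older installs use docker-compose instead):

docker compose up --build -d
docker compose logs -f monitoring   # follow the pipeline's output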
Automated Scheduling
import schedule
import time
from datetime import datetime, timedelta
class ProductionMonitoringSystem:
    """Complete production monitoring system.

    Note: get_production_data(), run_monitoring() and weekly_summary()
    are placeholders you implement against your own data sources.
    """

    def __init__(self, model_path, reference_data_path, db_config):
        self.model = load(model_path)
        self.reference_data = pd.read_parquet(reference_data_path)
        self.db_config = db_config
        self.alerter = MonitoringAlerter()

    def daily_monitoring(self):
        """Daily monitoring job"""
        print(f"Running daily monitoring - {datetime.now()}")

        try:
            # Get yesterday's data
            yesterday = datetime.now() - timedelta(days=1)
            current_data = self.get_production_data(yesterday)

            if len(current_data) < 100:
                print("Insufficient data for monitoring")
                return

            # Run monitoring
            report = self.run_monitoring(current_data)

            # Extract metrics
            metrics = extract_key_metrics(report)

            # Store results
            store_monitoring_metrics(metrics, 'production_model')

            # Check alerts
            self.alerter.process_alerts(metrics)

            # Save report
            report_name = f"monitoring_report_{yesterday.strftime('%Y%m%d')}.html"
            report.save_html(f"reports/{report_name}")

            print("Daily monitoring complete")

        except Exception as e:
            print(f"Monitoring failed: {e}")

            # Send failure alert
            self.alerter.send_email_alert({
                'type': 'monitoring_failure',
                'severity': 'high',
                'message': f"Monitoring pipeline failed: {e}",
                'details': {}
            })
    def start_monitoring(self):
        """Start automated monitoring schedule"""
        print("Starting production monitoring system...")

        # Schedule daily monitoring at 9 AM
        schedule.every().day.at("09:00").do(self.daily_monitoring)

        # Schedule weekly summary on Sundays at 10 AM
        schedule.every().sunday.at("10:00").do(self.weekly_summary)

        print("Monitoring scheduled:")
        print("- Daily monitoring: 9:00 AM")
        print("- Weekly summary: Sunday 10:00 AM")

        # Run monitoring loop
        while True:
            schedule.run_pending()
            time.sleep(60)  # Check every minute
# Start the production system
if __name__ == "__main__":
    monitoring_system = ProductionMonitoringSystem(
        model_path='models/taxi_duration_model.pkl',
        reference_data_path='data/reference_data.parquet',
        db_config=DB_CONFIG
    )

    # Run once for testing
    monitoring_system.daily_monitoring()

    # Uncomment to start automated monitoring
    # monitoring_system.start_monitoring()
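If you'd rather not keep a long-lived Python process around, cron can drive the same daily job; a sketch with placeholder paths:

# crontab entry: run the monitoring pipeline every day at 9:00 AM
0 9 * * * /path/to/monitoring_env/bin/python /path/to/monitoring_pipeline.py >> /var/log/ml_monitoring.log 2>&1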
Best Practices and Performance Tips
1. Start Simple, Scale Smart
# Phase 1: Basic monitoring
def basic_monitoring():
    # Simple drift check only
    return Report(metrics=[DatasetDriftMetric()])

# Phase 2: Add more metrics
def intermediate_monitoring():
    return Report(metrics=[
        DatasetDriftMetric(),
        DataDriftTable()
    ])

# Phase 3: Comprehensive monitoring
def advanced_monitoring():
    return Report(metrics=[
        DatasetDriftMetric(),
        DataDriftTable(),
        ColumnDriftMetric(column_name='prediction'),
        RegressionQualityMetric(),  # If regression model
        ColumnQuantileMetric(column_name='important_feature', quantile=0.95)
    ])
2. Performance Optimization
def efficient_monitoring(large_dataset, reference_data, sample_size=5000):
    """Handle large datasets efficiently"""

    # Sample for performance
    if len(large_dataset) > sample_size:
        sampled_data = large_dataset.sample(n=sample_size, random_state=42)
    else:
        sampled_data = large_dataset

    # Use lightweight metrics; skip detailed per-column tables for large data
    report = Report(metrics=[
        DatasetDriftMetric()  # Fast overall check
    ])
    report.run(reference_data=reference_data, current_data=sampled_data)

    return report
3. Adaptive Thresholds
def adaptive_threshold_monitoring(historical_metrics, current_metric):
    """Use historical data to set dynamic thresholds"""
    if len(historical_metrics) < 30:
        return False  # Not enough history

    # Calculate rolling statistics over the last 30 observations
    mean_metric = np.mean(historical_metrics[-30:])
    std_metric = np.std(historical_metrics[-30:])

    # Dynamic threshold (2-sigma rule)
    threshold = mean_metric + 2 * std_metric

    return current_metric > threshold
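Plugged into the history we stored earlier, the adaptive check might look like this (a sketch that assumes the get_monitoring_history helper and the metrics dict from the taxi example):

# Compare today's dataset drift score against the recent history
history = get_monitoring_history('taxi_duration_model', days=60)
past_scores = history['dataset_drift_score'].dropna().tolist()

current_score = metrics['dataset_drift_score']
if adaptive_threshold_monitoring(past_scores, current_score):
    print(f"Drift score {current_score:.3f} is unusually high vs. recent history")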
Troubleshooting Common Issues
Issue 1: "No module named 'evidently'"
Solution:
pip install evidently==0.4.22  # pin the version used throughout this tutorial
Issue 2: "Memory error with large datasets"
Solution:
# Sample your data first
large_df_sampled = large_df.sample(n=10000, random_state=42)
report.run(reference_data=ref_sample, current_data=large_df_sampled)
Issue 3: "All features show drift"
Problem: Reference and current data are too different
Solution:
# Check your data split
print("Reference period:", reference_data['date'].min(), "to", reference_data['date'].max())
print("Current period:", current_data['date'].min(), "to", current_data['date'].max())
# Ensure reasonable time gaps
# Use overlapping periods for validation
Issue 4: "No drift detected but model performance dropped"
Problem: Monitoring setup might be incomplete
Solution:
# Add model performance monitoring alongside drift checks
from evidently.metrics import RegressionQualityMetric, ColumnDriftMetric

enhanced_report = Report(metrics=[
    DatasetDriftMetric(),
    RegressionQualityMetric(),                    # Track model performance
    ColumnDriftMetric(column_name='prediction'),  # Model output drift
    ColumnDriftMetric(column_name='duration')     # Target drift (use your target column)
])
Next Steps: Scaling Your Monitoring System
Congratulations! You now have a complete ML monitoring system. Here are ways to extend it:
1. Advanced Features
- Custom metrics for business-specific KPIs
- A/B testing integration for model comparisons
- Fairness monitoring for bias detection
- Explainability tracking with SHAP or LIME
2. Infrastructure Scaling
- Kubernetes deployment for high availability
- Apache Kafka for real-time data streaming
- Apache Airflow for complex monitoring workflows
- MLflow integration for experiment tracking
3. Advanced Analytics
- Anomaly detection in monitoring metrics
- Predictive maintenance for models
- Root cause analysis for drift sources
- Automated retraining triggers (sketched below)
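As a flavor of that last item, a drift-based retraining trigger can be a simple rule over the metrics you already extract; the thresholds below are illustrative placeholders, not recommendations:

def should_retrain(metrics, drift_threshold=0.5, max_drifted_columns=4):
    """Rule-of-thumb trigger: tune thresholds against your own history."""
    too_much_drift = metrics.get('dataset_drift_score', 0) > drift_threshold
    too_many_columns = metrics.get('num_drifted_columns', 0) > max_drifted_columns
    return too_much_drift or too_many_columns

if should_retrain(metrics):
    print("Drift exceeds thresholds - kick off the retraining pipeline")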
Key Takeaways
✅ Evidently makes monitoring accessible - you can start with just a few lines of code
✅ Real-world monitoring requires multiple components - detection, storage, alerting, and response
✅ Start simple and iterate - basic drift detection is better than no monitoring
✅ Automate everything - manual monitoring doesn't scale in production
✅ Monitor what matters - focus on metrics that impact your business
✅ Have a response plan - knowing what to do when alerts fire is crucial
Remember: The goal isn't perfect monitoring - it's effective monitoring that helps you maintain reliable ML systems.
Conclusion
You've built a comprehensive monitoring system that can:
- Detect various types of drift in real-time
- Generate professional reports for stakeholders
- Store historical metrics for trend analysis
- Send automated alerts when problems occur
- Scale to production workloads
The most important step is to start monitoring your models today. Even basic drift detection is infinitely better than flying blind in production.
Your monitoring journey doesn't end here - it evolves with your systems, your data, and your understanding of what matters most for your specific use cases.