DEV Community

Abdelrahman Adnan


MLOps ZoomCamp Week 05 Monitoring Notes: Practical Implementation Guide

Complete hands-on implementation tutorial - Build a production-ready monitoring system from scratch with step-by-step code examples using Python and Evidently


Introduction: From Concepts to Code

In the companion article "MLOps ZoomCamp Week 05 Monitoring Notes: Understanding ML Model Monitoring Concepts," we explored why monitoring is crucial and the different types of drift that can affect ML models. Now it's time to get hands-on and build a complete monitoring system.

This practical guide will walk you through implementing a real-world monitoring solution using Python and the Evidently library. By the end, you'll have a working system that can detect drift, generate reports, store metrics in a database, and send automated alerts.

What you'll build:

  • A drift detection system using real NYC taxi data
  • Interactive HTML reports for stakeholders
  • Database integration for historical tracking
  • Automated alerting via email and Slack
  • Production-ready monitoring pipeline

Meet Evidently: Your ML Monitoring Toolkit

Evidently is an open-source Python library specifically designed for ML model monitoring. Think of it as your monitoring Swiss Army knife - it provides everything you need out of the box.

Why Choose Evidently?

  • Pre-built metrics for common monitoring tasks
  • Interactive reports that look professional
  • Easy integration with existing ML pipelines
  • Comprehensive documentation and active community
  • Free and open-source with commercial support available

Key Features

  • Data drift detection using multiple statistical tests
  • Model performance monitoring for regression and classification
  • Data quality checks for missing values, duplicates, etc.
  • Interactive HTML reports perfect for stakeholders
  • JSON export for automated systems and databases

Environment Setup: Getting Your Tools Ready

Let's set up our monitoring environment step by step.

Step 1: Create Your Project Environment

# Create project directory
mkdir ml_monitoring_tutorial
cd ml_monitoring_tutorial

# Create virtual environment
python -m venv monitoring_env

# Activate environment
source monitoring_env/bin/activate  # On Windows: monitoring_env\Scripts\activate

Step 2: Install Required Libraries

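# Install core monitoring tools (Evidently pinned to the release this guide targets)
pip install evidently==0.4.22 pandas numpy scikit-learn

# Install database and visualization tools  
pip install psycopg2-binary plotly

# Install additional tools (pyarrow reads the Parquet files, schedule runs the jobs)
pip install joblib requests pyarrow schedule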

Step 3: Create Requirements File

# Save this as requirements.txt
evidently==0.4.22
pandas>=1.5.0
numpy>=1.21.0
scikit-learn>=1.2.0
psycopg2-binary>=2.9.0
plotly>=5.0.0
joblib>=1.2.0
requests>=2.28.0
pyarrow>=10.0.0
schedule>=1.2.0

Your First Monitoring Script: Hello Drift Detection

Let's start with a simple example to see Evidently in action.

"""
BEGINNER'S FIRST MONITORING SCRIPT
Copy this code and run it to see drift detection in action!
"""

import pandas as pd
import numpy as np
from sklearn.datasets import make_classification
from evidently.report import Report
from evidently.metrics import DatasetDriftMetric, DataDriftTable

print("Creating sample data...")

# Generate synthetic dataset (simulating a real ML project)
X, y = make_classification(
    n_samples=10000, 
    n_features=10, 
    n_informative=5,
    n_redundant=2, 
    random_state=42
)

# Convert to DataFrame for easier handling
feature_names = [f'feature_{i}' for i in range(10)]
data = pd.DataFrame(X, columns=feature_names)
data['target'] = y

print(f"Created dataset with {len(data)} samples and {len(feature_names)} features")

# Split into reference and current data
print("Splitting data...")

# Reference data (first 7000 samples) - represents your training data
reference_data = data[:7000].copy()

# Current data (last 3000 samples) - represents new production data
current_data = data[7000:].copy()

# Introduce artificial drift to demonstrate monitoring
print("Introducing artificial drift...")
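current_data['feature_0'] = current_data['feature_0'] + 2  # Shift one feature
# current_data keeps index labels 7000-9999, so pick the first 500 rows by position
current_data.loc[current_data.index[:500], 'feature_1'] = np.nan  # Add missing values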

print(f"Reference data: {reference_data.shape}")
print(f"Current data: {current_data.shape}")

# Create and run monitoring report
print("Running drift detection...")

# This is where the magic happens!
report = Report(metrics=[
    DatasetDriftMetric(),    # Overall drift check
    DataDriftTable()         # Column-by-column analysis
])

# Run the analysis
report.run(reference_data=reference_data, current_data=current_data)

# Save results
report.save_html("my_first_monitoring_report.html")
print("Report saved as 'my_first_monitoring_report.html'")
print("Open this file in your web browser!")

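# Extract key metrics programmatically
# (Evidently 0.4.x returns results via as_dict(), under each metric's 'result' key)
result = report.as_dict()
drift_result = result['metrics'][0]['result']
dataset_drift = drift_result['dataset_drift']

print(f"\nRESULTS SUMMARY:")
# Dataset-level drift is flagged only when at least half of the columns drift (the default)
print(f"Dataset drift detected: {dataset_drift}")
print(f"Drifted columns: {drift_result['number_of_drifted_columns']} of {drift_result['number_of_columns']}")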

if dataset_drift:
    print("DRIFT DETECTED! Your data has changed.")
    print("Check the HTML report to see which features drifted.")
else:
    print("NO DRIFT DETECTED! Your data looks stable.")

print("\nCongratulations! You just ran your first ML monitoring analysis!")

Understanding Your First Report

When you open the HTML file in your browser, you'll see:

1. Summary Section

  • Overall drift status (Detected/Not Detected)
  • Number of drifted features
  • Key statistics comparison

2. Drift Details Table

  • List of all features
  • Drift status for each (Detected/Not Detected)
  • Drift scores (a p-value or a distance, depending on the statistical test chosen for the column)
  • Statistical test results

3. Visual Charts

  • Distribution plots showing before/after
  • Missing values analysis
  • Feature-by-feature comparisons
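
To make the drift score column less abstract, here is a minimal sketch of one such score, the two-sample Kolmogorov-Smirnov statistic, written with the standard library only. It is an illustration, not Evidently's implementation: Evidently picks a test per column based on its type and size, and some of its tests report a p-value (lower means more drift) rather than a distance. The function name `ks_statistic` and the synthetic samples are assumptions made for this example.

```python
import random
from bisect import bisect_right


def ks_statistic(reference, current):
    """Two-sample Kolmogorov-Smirnov statistic: largest gap between the empirical CDFs."""
    ref_sorted = sorted(reference)
    cur_sorted = sorted(current)
    max_gap = 0.0
    # The largest gap is always reached at one of the observed values
    for x in ref_sorted + cur_sorted:
        ref_cdf = bisect_right(ref_sorted, x) / len(ref_sorted)
        cur_cdf = bisect_right(cur_sorted, x) / len(cur_sorted)
        max_gap = max(max_gap, abs(ref_cdf - cur_cdf))
    return max_gap


rng = random.Random(42)
reference = [rng.gauss(0, 1) for _ in range(2000)]
same = [rng.gauss(0, 1) for _ in range(2000)]      # Same distribution as reference
shifted = [rng.gauss(2, 1) for _ in range(2000)]   # Mean moved by +2

stable_score = ks_statistic(reference, same)
drifted_score = ks_statistic(reference, shifted)

print(f"Stable batch score:  {stable_score:.3f}")
print(f"Shifted batch score: {drifted_score:.3f}")
```

The shifted sample mirrors the `+ 2` shift applied to `feature_0` in the first script, which is why that feature stands out in the report.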

Understanding Evidently's Building Blocks

Let's break down how Evidently works under the hood.

Core Components

# Essential imports
from evidently.report import Report
from evidently.metrics import DatasetDriftMetric, DataDriftTable
from evidently import ColumnMapping

# Basic report structure
report = Report(metrics=[
    DatasetDriftMetric(),    # Overall drift check
    DataDriftTable()         # Column-by-column details
])

# Run analysis
report.run(
    reference_data=df_reference,  # Your "good" baseline data
    current_data=df_current       # New data to check
)

# View results
report.show()                    # Interactive (Jupyter only)
report.save_html('report.html')  # Standalone file
result = report.as_dict()        # Raw data for code (Evidently 0.4.x)

Column Mapping: Teaching Evidently About Your Data

Column mapping tells Evidently what type of data each column contains, enabling appropriate statistical tests.

from evidently import ColumnMapping

# Define your data structure
column_mapping = ColumnMapping(
    target='target_column',              # What you're predicting
    prediction='prediction_column',      # Model output
    numerical_features=[                 # Continuous numbers
        'age', 'income', 'score'
    ],
    categorical_features=[               # Categories/labels
        'category', 'region', 'type'
    ],
    datetime_features=['timestamp'],     # Date/time columns
    text_features=['description'],       # Text content
    id='customer_id'                    # Unique identifiers
)

# Use in reports
report.run(
    reference_data=df_old,
    current_data=df_new,
    column_mapping=column_mapping  # Better analysis!
)

Available Metrics Quick Reference

# Names follow the Evidently 0.4.x API pinned in requirements.txt

# Dataset-level metrics (from evidently.metrics)
DatasetDriftMetric()                    # Overall drift detection, incl. number of drifted columns
DataDriftTable()                        # Per-column drift analysis
DatasetMissingValuesMetric()           # Missing values across dataset

# Column-specific metrics (from evidently.metrics)
ColumnDriftMetric(column_name='age')        # Single column drift
ColumnSummaryMetric(column_name='income')   # Detailed statistics
ColumnQuantileMetric(column_name='score', quantile=0.95)  # Percentile tracking
ColumnMissingValuesMetric(column_name='feature')  # Missing values in one column

# Model performance metrics (from evidently.metrics)
RegressionQualityMetric()              # Regression model performance
ClassificationQualityMetric()          # Classification model performance

# Preset collections (from evidently.metric_preset; bundles of related metrics)
DataDriftPreset()                      # Complete drift analysis
DataQualityPreset()                    # Data quality checks
RegressionPreset()                     # Full regression monitoring
ClassificationPreset()                 # Full classification monitoring

Real-World Example: NYC Taxi Trip Duration Monitoring

Now let's build a complete monitoring system using real data. We'll create a taxi trip duration prediction model and monitor it for drift.

Step 1: Data Preparation and Model Training

import requests
import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error
from joblib import dump, load
import os

def download_taxi_data():
    """Download NYC taxi data for our monitoring example"""

    # Create data directory
    os.makedirs('data', exist_ok=True)

    # Files to download
    files = [
        'green_tripdata_2024-03.parquet',  # March 2024 data
        'green_tripdata_2024-04.parquet'   # April 2024 data
    ]

    base_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/"

    for file in files:
        if not os.path.exists(f'data/{file}'):
            print(f"Downloading {file}...")
            response = requests.get(f"{base_url}{file}", timeout=60)
            response.raise_for_status()  # Fail loudly instead of saving an error page

            with open(f'data/{file}', 'wb') as f:
                f.write(response.content)
            print(f"Downloaded {file}")
        else:
            print(f"{file} already exists")

def prepare_taxi_data(file_path):
    """Clean and prepare taxi data for machine learning"""

    print(f"Loading and cleaning {file_path}...")

    # Load the raw data
    data = pd.read_parquet(file_path)

    # Calculate trip duration in minutes
    data['duration'] = (
        data.lpep_dropoff_datetime - data.lpep_pickup_datetime
    ).dt.total_seconds() / 60

    # Filter out unrealistic trips
    data = data[
        (data.duration >= 1) &      # At least 1 minute
        (data.duration <= 60) &     # At most 60 minutes  
        (data.passenger_count > 0) & # At least 1 passenger
        (data.trip_distance > 0)    # Some distance traveled
    ].copy()  # Copy so later column assignments don't write to a view

    print(f"Cleaned data shape: {data.shape}")
    return data

def create_features(df):
    """Select and prepare features for machine learning"""

    # Numerical features
    numerical_features = [
        'passenger_count',  # How many people
        'trip_distance',    # How far
        'fare_amount',      # Base fare
        'total_amount'      # Total paid
    ]

    # Categorical features (location codes)
    categorical_features = [
        'PULocationID',     # Pickup location
        'DOLocationID'      # Dropoff location
    ]

    # Handle missing values in categorical features
    for col in categorical_features:
        df[col] = df[col].fillna(-1)  # -1 means "unknown"

    return numerical_features + categorical_features

def train_baseline_model(data):
    """Train a simple model to predict trip duration"""

    print("Training baseline model...")

    # Get features
    feature_columns = create_features(data)

    # Prepare training data
    X = data[feature_columns].fillna(0)  # Replace NaN with 0
    y = data['duration']                 # Target variable

    # Train model
    model = LinearRegression()
    model.fit(X, y)

    # Check performance
    predictions = model.predict(X)
    mae = mean_absolute_error(y, predictions)

    print(f"Model trained! MAE: {mae:.2f} minutes")

    return model, feature_columns

# Run the data preparation
print("Starting taxi monitoring example...")

# Download data
download_taxi_data()

# Prepare March data (our "reference" data)
march_data = prepare_taxi_data('data/green_tripdata_2024-03.parquet')

# Train model
model, feature_columns = train_baseline_model(march_data)

# Create reference dataset
print("Creating reference dataset...")
reference_data = march_data.sample(n=5000, random_state=42).copy()

# Add predictions to reference data
reference_data['prediction'] = model.predict(
    reference_data[feature_columns].fillna(0)
)

# Save model and reference data
os.makedirs('models', exist_ok=True)
dump(model, 'models/taxi_duration_model.pkl')
reference_data.to_parquet('data/reference_data.parquet')

print("Setup complete! Model and reference data saved.")

Step 2: Production Monitoring Setup

from evidently.report import Report
from evidently.metrics import (
    DatasetDriftMetric, DataDriftTable, ColumnDriftMetric,
    ColumnMissingValuesMetric
)
from evidently import ColumnMapping

def create_column_mapping():
    """Define column types for better monitoring"""

    return ColumnMapping(
        target='duration',                    # What we're predicting
        prediction='prediction',              # Model output
        numerical_features=[                  # Continuous features
            'passenger_count', 'trip_distance', 
            'fare_amount', 'total_amount'
        ],
        categorical_features=[                # Categorical features
            'PULocationID', 'DOLocationID'
        ]
    )

def create_monitoring_report():
    """Create comprehensive monitoring report"""

    return Report(metrics=[
        DatasetDriftMetric(),                                # Overall drift check + drifted column count
        DataDriftTable(),                                    # Column details
        ColumnDriftMetric(column_name='prediction'),         # Model output drift
        ColumnDriftMetric(column_name='duration'),           # Target drift
        ColumnMissingValuesMetric(column_name='prediction')  # Data quality
    ])

def monitor_batch(current_data, reference_data, model, feature_columns):
    """Main monitoring function"""

    print("Starting batch monitoring...")

    # Generate predictions for current data
    current_data = current_data.copy()
    current_data['prediction'] = model.predict(
        current_data[feature_columns].fillna(0)
    )

    # Create monitoring report
    monitoring_report = create_monitoring_report()
    column_mapping = create_column_mapping()

    # Run analysis
    monitoring_report.run(
        reference_data=reference_data,
        current_data=current_data,
        column_mapping=column_mapping
    )

    print("Monitoring complete!")
    return monitoring_report

def extract_key_metrics(report):
    """Extract important metrics from report"""

    # Evidently 0.4.x returns results via as_dict(), under each metric's 'result' key
    result = report.as_dict()

    metrics = {}

    for metric in result['metrics']:
        metric_type = metric['metric']
        values = metric['result']

        if metric_type == 'DatasetDriftMetric':
            metrics['dataset_drift_detected'] = values['dataset_drift']
            # The share of drifted columns serves as the dataset-level score
            metrics['dataset_drift_score'] = values['share_of_drifted_columns']
            metrics['num_drifted_columns'] = values['number_of_drifted_columns']

        elif metric_type == 'ColumnDriftMetric':
            column = values['column_name']
            metrics[f'{column}_drift_detected'] = values['drift_detected']
            metrics[f'{column}_drift_score'] = values['drift_score']

    return metrics

# Load our saved data and model
print("Loading saved model and reference data...")
model = load('models/taxi_duration_model.pkl')
reference_data = pd.read_parquet('data/reference_data.parquet')

# Prepare April data (simulating new production data)
april_data = prepare_taxi_data('data/green_tripdata_2024-04.parquet')

# Sample first week of April
april_week1 = april_data[
    april_data.lpep_pickup_datetime.dt.day <= 7
].sample(n=5000, random_state=42)

print("Monitoring April data vs March reference...")

# Run monitoring
report = monitor_batch(april_week1, reference_data, model, feature_columns)

# Save report
report.save_html('taxi_monitoring_report.html')
print("Detailed report saved as 'taxi_monitoring_report.html'")

# Extract and display key metrics
metrics = extract_key_metrics(report)

print("\nKEY MONITORING RESULTS:")
print("=" * 40)

for key, value in metrics.items():
    if 'detected' in key:
        status = "DETECTED" if value else "OK"
        print(f"{key}: {status}")
    elif 'score' in key:
        level = "HIGH" if value > 0.5 else "MEDIUM" if value > 0.1 else "LOW"
        print(f"{key}: {value:.3f} ({level})")
    else:
        print(f"{key}: {value}")

print("\nInterpretation:")
print("- How to read a score depends on the statistical test behind it")
print("- Distance-based scores grow with drift; p-value scores shrink with drift")
print("- Focus on columns with detected drift")

Database Integration: Storing Metrics for Historical Analysis

To build a production monitoring system, we need to store metrics over time. Let's set up PostgreSQL integration.

Database Setup

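import os
import psycopg2
from datetime import datetime

# Database configuration. Environment variables take precedence, so the same code
# works locally and inside Docker, where the host is the "db" service.
# DB_HOST and DB_PASSWORD match docker-compose.yml; the other variables are optional.
DB_CONFIG = {
    'host': os.getenv('DB_HOST', 'localhost'),
    'port': int(os.getenv('DB_PORT', '5432')),
    'user': os.getenv('DB_USER', 'postgres'), 
    'password': os.getenv('DB_PASSWORD', 'your_password'),
    'database': os.getenv('DB_NAME', 'monitoring_db')
}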

# Table creation SQL
CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS monitoring_metrics (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    timestamp TIMESTAMP NOT NULL,
    model_name VARCHAR(100) NOT NULL,
    dataset_drift_detected BOOLEAN,
    dataset_drift_score FLOAT,
    prediction_drift_detected BOOLEAN,
    prediction_drift_score FLOAT,
    target_drift_detected BOOLEAN, 
    target_drift_score FLOAT,
    num_drifted_columns INTEGER,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""

def setup_database():
    """Initialize monitoring database"""

    print("Setting up monitoring database...")

    try:
        conn = psycopg2.connect(**DB_CONFIG)
        cursor = conn.cursor()

        cursor.execute(CREATE_TABLE_SQL)
        conn.commit()

        cursor.close()
        conn.close()

        print("Database setup complete!")

    except psycopg2.Error as e:
        print(f"Database setup failed: {e}")
        print("Make sure PostgreSQL is running")

def store_monitoring_metrics(metrics, model_name):
    """Store metrics in database"""

    conn = psycopg2.connect(**DB_CONFIG)
    cursor = conn.cursor()

    insert_sql = """
    INSERT INTO monitoring_metrics 
    (timestamp, model_name, dataset_drift_detected, dataset_drift_score,
     prediction_drift_detected, prediction_drift_score,
     target_drift_detected, target_drift_score, num_drifted_columns)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
    """

    cursor.execute(insert_sql, (
        datetime.now(),
        model_name,
        metrics.get('dataset_drift_detected'),
        metrics.get('dataset_drift_score'),
        metrics.get('prediction_drift_detected'),
        metrics.get('prediction_drift_score'),
        metrics.get('duration_drift_detected'), 
        metrics.get('duration_drift_score'),
        metrics.get('num_drifted_columns')
    ))

    conn.commit()
    cursor.close()
    conn.close()

    print("Metrics stored in database")

def get_monitoring_history(model_name, days=30):
    """Retrieve monitoring history from database"""

    conn = psycopg2.connect(**DB_CONFIG)

    query = """
    SELECT timestamp, dataset_drift_score, prediction_drift_score,
           num_drifted_columns, dataset_drift_detected
    FROM monitoring_metrics 
    WHERE model_name = %s 
    AND timestamp >= CURRENT_DATE - %s * INTERVAL '1 day'
    ORDER BY timestamp DESC
    """

    df = pd.read_sql(query, conn, params=[model_name, days])
    conn.close()

    return df

# Usage example
if __name__ == "__main__":
    # Set up database (run once)
    setup_database()

    # Store our taxi monitoring results
    store_monitoring_metrics(metrics, 'taxi_duration_model')

    # Retrieve history
    history = get_monitoring_history('taxi_duration_model')
    print(f"Retrieved {len(history)} historical records")
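
If you want to try the store-and-query pattern before provisioning PostgreSQL, the sketch below does the same thing against an in-memory SQLite database from the standard library. SQLite is a substitution made for this example, the table is a trimmed-down version of `monitoring_metrics`, and the helper names `store_metrics` and `get_history` are hypothetical.

```python
import sqlite3
from datetime import datetime, timedelta

# Trimmed-down monitoring_metrics table using SQLite column types
SCHEMA = """
CREATE TABLE IF NOT EXISTS monitoring_metrics (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp TEXT NOT NULL,
    model_name TEXT NOT NULL,
    dataset_drift_detected INTEGER,
    dataset_drift_score REAL,
    num_drifted_columns INTEGER
)
"""

INSERT_SQL = """
INSERT INTO monitoring_metrics
(timestamp, model_name, dataset_drift_detected, dataset_drift_score, num_drifted_columns)
VALUES (?, ?, ?, ?, ?)
"""

HISTORY_SQL = """
SELECT timestamp, dataset_drift_score, num_drifted_columns
FROM monitoring_metrics
WHERE model_name = ? AND timestamp >= ?
ORDER BY timestamp DESC
"""


def store_metrics(conn, metrics, model_name, when):
    """Insert one monitoring run using parameterized SQL."""
    conn.execute(INSERT_SQL, (
        when.isoformat(),
        model_name,
        int(bool(metrics.get("dataset_drift_detected"))),
        metrics.get("dataset_drift_score"),
        metrics.get("num_drifted_columns"),
    ))
    conn.commit()


def get_history(conn, model_name, since):
    """Return runs for one model at or after `since`, newest first."""
    return conn.execute(HISTORY_SQL, (model_name, since.isoformat())).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)

now = datetime(2024, 4, 8, 9, 0, 0)
old_run = {"dataset_drift_detected": False, "dataset_drift_score": 0.0, "num_drifted_columns": 0}
new_run = {"dataset_drift_detected": True, "dataset_drift_score": 0.5, "num_drifted_columns": 3}

store_metrics(conn, old_run, "taxi_duration_model", now - timedelta(days=40))
store_metrics(conn, new_run, "taxi_duration_model", now - timedelta(days=1))

# Only the run from the last 30 days comes back
recent = get_history(conn, "taxi_duration_model", now - timedelta(days=30))
print(recent)
```

Passing the cutoff as a bound parameter keeps the query free of string formatting, which is the same reason the PostgreSQL version uses `%s` placeholders.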

Automated Alerting: Never Miss a Problem

Let's add intelligent alerting to our monitoring system.

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import requests
from datetime import datetime  # Used for alert timestamps

class MonitoringAlerter:
    """Handles alerts for monitoring system"""

    def __init__(self, email_config=None, slack_webhook=None):
        self.email_config = email_config
        self.slack_webhook = slack_webhook

        # Alert thresholds
        self.thresholds = {
            'drift_score': 0.1,           # Drift score threshold
            'drifted_columns': 3,         # Max drifted columns
            'missing_values': 0.05        # Max missing value ratio
        }

    def check_alerts(self, metrics):
        """Check metrics against thresholds and generate alerts"""

        alerts = []

        # Check dataset drift
        if metrics.get('dataset_drift_detected'):
            score = metrics.get('dataset_drift_score', 0)
            if score > self.thresholds['drift_score']:
                alerts.append({
                    'type': 'dataset_drift',
                    'severity': 'high' if score > 0.5 else 'medium',
                    'message': f"Dataset drift detected (score: {score:.3f})",
                    'details': metrics
                })

        # Check prediction drift
        if metrics.get('prediction_drift_detected'):
            score = metrics.get('prediction_drift_score', 0)
            if score > self.thresholds['drift_score']:
                alerts.append({
                    'type': 'prediction_drift', 
                    'severity': 'high' if score > 0.3 else 'medium',
                    'message': f"Model prediction drift detected (score: {score:.3f})",
                    'details': metrics
                })

        # Check number of drifted columns
        num_drifted = metrics.get('num_drifted_columns', 0)
        if num_drifted > self.thresholds['drifted_columns']:
            alerts.append({
                'type': 'multiple_drift',
                'severity': 'high',
                'message': f"Multiple columns drifted ({num_drifted} columns)",
                'details': metrics
            })

        return alerts

    def send_email_alert(self, alert):
        """Send email alert"""

        if not self.email_config:
            print("Email not configured, skipping email alert")
            return

        msg = MIMEMultipart()
        msg['From'] = self.email_config['from_email']
        msg['To'] = self.email_config['to_email']
        msg['Subject'] = f"ML Monitoring Alert: {alert['type']}"

        body = f"""
        ML Monitoring Alert

        Type: {alert['type']}
        Severity: {alert['severity']}
        Message: {alert['message']}

        Time: {datetime.now()}

        Details:
        {alert['details']}

        Please investigate immediately.
        """

        msg.attach(MIMEText(body, 'plain'))

        try:
            server = smtplib.SMTP(self.email_config['smtp_server'], 587)
            server.starttls()
            server.login(self.email_config['username'], self.email_config['password'])
            server.send_message(msg)
            server.quit()

            print("Email alert sent successfully")

        except Exception as e:
            print(f"Failed to send email: {e}")

    def send_slack_alert(self, alert):
        """Send Slack alert"""

        if not self.slack_webhook:
            print("Slack not configured, skipping Slack alert")
            return

        # Format message for Slack
        color = "danger" if alert['severity'] == 'high' else "warning"

        payload = {
            "attachments": [
                {
                    "color": color,
                    "title": f"ML Monitoring Alert: {alert['type']}",
                    "text": alert['message'],
                    "fields": [
                        {
                            "title": "Severity",
                            "value": alert['severity'],
                            "short": True
                        },
                        {
                            "title": "Time", 
                            "value": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                            "short": True
                        }
                    ]
                }
            ]
        }

        try:
            response = requests.post(self.slack_webhook, json=payload, timeout=10)
            if response.status_code == 200:
                print("Slack alert sent successfully")
            else:
                print(f"Failed to send Slack alert: {response.status_code}")

        except Exception as e:
            print(f"Failed to send Slack alert: {e}")

    def process_alerts(self, metrics):
        """Main alert processing function"""

        alerts = self.check_alerts(metrics)

        if not alerts:
            print("No alerts triggered")
            return

        print(f"{len(alerts)} alert(s) triggered!")

        for alert in alerts:
            print(f"- {alert['severity'].upper()}: {alert['message']}")

            # Send notifications
            self.send_email_alert(alert)
            self.send_slack_alert(alert)

# Usage example
def automated_monitoring_pipeline():
    """Complete monitoring pipeline with alerting"""

    print("Running automated monitoring pipeline...")

    # Load model and data
    model = load('models/taxi_duration_model.pkl')
    reference_data = pd.read_parquet('data/reference_data.parquet')

    # Get new data (in production, this would be from your data pipeline)
    april_data = prepare_taxi_data('data/green_tripdata_2024-04.parquet')
    current_batch = april_data.sample(n=1000, random_state=42)

    # Run monitoring
    report = monitor_batch(current_batch, reference_data, model, feature_columns)
    metrics = extract_key_metrics(report)

    # Store metrics
    store_monitoring_metrics(metrics, 'taxi_duration_model')

    # Check for alerts
    alerter = MonitoringAlerter()
    alerter.process_alerts(metrics)

    print("Monitoring pipeline complete!")

# Run the complete pipeline
if __name__ == "__main__":
    automated_monitoring_pipeline()
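
Note that the pipeline creates `MonitoringAlerter()` without arguments, so both notification channels are skipped. One way to supply the settings without hardcoding credentials is to read them from environment variables. The sketch below assumes variable names such as `ALERT_SMTP_SERVER` and `ALERT_SLACK_WEBHOOK`; these names and the two helper functions are hypothetical, while the dictionary keys are the ones `send_email_alert` reads.

```python
import os

# Keys that MonitoringAlerter.send_email_alert reads from email_config
EMAIL_KEYS = ("from_email", "to_email", "smtp_server", "username", "password")


def load_email_config(environ=os.environ, prefix="ALERT_"):
    """Build email_config from environment variables, or return None if any is missing."""
    config = {}
    for key in EMAIL_KEYS:
        value = environ.get(f"{prefix}{key.upper()}")
        if not value:
            return None  # Incomplete configuration: email alerts stay disabled
        config[key] = value
    return config


def load_slack_webhook(environ=os.environ, name="ALERT_SLACK_WEBHOOK"):
    """Return the Slack webhook URL, or None if it is not set."""
    return environ.get(name) or None


# Demonstration with an explicit mapping instead of the real environment
example_env = {
    "ALERT_FROM_EMAIL": "monitoring@example.com",
    "ALERT_TO_EMAIL": "team@example.com",
    "ALERT_SMTP_SERVER": "smtp.example.com",
    "ALERT_USERNAME": "monitoring",
    "ALERT_PASSWORD": "not-a-real-password",
}

email_config = load_email_config(example_env)
slack_webhook = load_slack_webhook(example_env)

print("Email configured:", email_config is not None)
print("Slack configured:", slack_webhook is not None)
```

With these helpers, the alerter could be created as `MonitoringAlerter(email_config=load_email_config(), slack_webhook=load_slack_webhook())`; a channel whose variables are missing is simply skipped, as the class already handles `None`.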

Production Deployment: Docker and Scheduling

Docker Setup

Create a Dockerfile:

FROM python:3.9-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    postgresql-client \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Run monitoring system
CMD ["python", "monitoring_pipeline.py"]

Create a docker-compose.yml:

version: '3.8'
services:
  db:
    image: postgres:13
    environment:
      POSTGRES_DB: monitoring_db
      POSTGRES_PASSWORD: example
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  monitoring:
    build: .
    environment:
      - DB_HOST=db
      - DB_PASSWORD=example
    depends_on:
      - db
    volumes:
      - ./data:/app/data
      - ./models:/app/models

volumes:
  postgres_data:
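
One caveat with this compose file: `depends_on` only controls start order. It does not wait until PostgreSQL accepts connections, so the monitoring container can try to connect too early. A small retry loop around the connection call is one way to handle that. The helper `wait_for` and its parameters are assumptions for this sketch, and `flaky_connect` stands in for `psycopg2.connect` so the example runs without a database.

```python
import time


def wait_for(connect, attempts=10, delay_seconds=2.0, sleep=time.sleep):
    """Call connect() until it succeeds or attempts run out, then re-raise the last error."""
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return connect()
        except Exception as error:  # In real code, catch psycopg2.OperationalError instead
            last_error = error
            print(f"Attempt {attempt}/{attempts} failed: {error}")
            if attempt < attempts:
                sleep(delay_seconds)
    raise last_error


# Stand-in for psycopg2.connect that fails twice before succeeding
calls = {"count": 0}


def flaky_connect():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("database not ready")
    return "connection"


result = wait_for(flaky_connect, attempts=5, delay_seconds=0.0)
print(f"Connected after {calls['count']} attempts")
```

In the monitoring code this could wrap the real call, for example `conn = wait_for(lambda: psycopg2.connect(**DB_CONFIG))`.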

Automated Scheduling

import schedule
import time
from datetime import datetime, timedelta

class ProductionMonitoringSystem:
    """Complete production monitoring system"""

    def __init__(self, model_path, reference_data_path, db_config):
        self.model = load(model_path)
        self.reference_data = pd.read_parquet(reference_data_path)
        self.db_config = db_config
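        self.alerter = MonitoringAlerter()

    def get_production_data(self, day):
        """Placeholder: load the batch of production records for the given day"""
        # Replace with a query against your own data store; this tutorial
        # does not define where production data lives.
        raise NotImplementedError("Connect this to your production data source")

    def run_monitoring(self, current_data):
        """Run the drift report for one batch using the helpers defined earlier"""
        return monitor_batch(current_data, self.reference_data, self.model, feature_columns)

    def weekly_summary(self):
        """Print a short summary of the last seven days of stored metrics"""
        history = get_monitoring_history('production_model', days=7)
        print(f"Weekly summary: {len(history)} monitoring runs recorded")
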
    def daily_monitoring(self):
        """Daily monitoring job"""

        print(f"Running daily monitoring - {datetime.now()}")

        try:
            # Get yesterday's data
            yesterday = datetime.now() - timedelta(days=1)
            current_data = self.get_production_data(yesterday)

            if len(current_data) < 100:
                print("Insufficient data for monitoring")
                return

            # Run monitoring
            report = self.run_monitoring(current_data)

            # Extract metrics
            metrics = extract_key_metrics(report)

            # Store results
            store_monitoring_metrics(metrics, 'production_model')

            # Check alerts
            self.alerter.process_alerts(metrics)

            # Save report (create the output directory on first run)
            os.makedirs('reports', exist_ok=True)
            report_name = f"monitoring_report_{yesterday.strftime('%Y%m%d')}.html"
            report.save_html(f"reports/{report_name}")

            print("Daily monitoring complete")

        except Exception as e:
            print(f"Monitoring failed: {e}")
            # Send failure alert
            self.alerter.send_email_alert({
                'type': 'monitoring_failure',
                'severity': 'high', 
                'message': f"Monitoring pipeline failed: {e}",
                'details': {}
            })

    def start_monitoring(self):
        """Start automated monitoring schedule"""

        print("Starting production monitoring system...")

        # Schedule daily monitoring at 9 AM
        schedule.every().day.at("09:00").do(self.daily_monitoring)

        # Schedule weekly summary on Sundays at 10 AM
        schedule.every().sunday.at("10:00").do(self.weekly_summary)

        print("Monitoring scheduled:")
        print("- Daily monitoring: 9:00 AM")
        print("- Weekly summary: Sunday 10:00 AM")

        # Run monitoring loop
        while True:
            schedule.run_pending()
            time.sleep(60)  # Check every minute

# Start the production system
if __name__ == "__main__":
    monitoring_system = ProductionMonitoringSystem(
        model_path='models/taxi_duration_model.pkl',
        reference_data_path='data/reference_data.parquet',
        db_config=DB_CONFIG
    )

    # Run once for testing
    monitoring_system.daily_monitoring()

    # Uncomment to start automated monitoring
    # monitoring_system.start_monitoring()

Best Practices and Performance Tips

1. Start Simple, Scale Smart

# Phase 1: Basic monitoring
def basic_monitoring():
    # Simple drift check only
    return Report(metrics=[DatasetDriftMetric()])

# Phase 2: Add more metrics
def intermediate_monitoring():
    return Report(metrics=[
        DatasetDriftMetric(),   # Also reports the number of drifted columns
        DataDriftTable()
    ])

# Phase 3: Comprehensive monitoring
def advanced_monitoring():
    return Report(metrics=[
        DatasetDriftMetric(),
        DataDriftTable(), 
        ColumnDriftMetric(column_name='prediction'),
        RegressionQualityMetric(),  # If regression model
        ColumnQuantileMetric(column_name='important_feature', quantile=0.95)
    ])

2. Performance Optimization

def efficient_monitoring(large_dataset, reference_data, sample_size=5000):
    """Handle large datasets efficiently"""

    # Sample for performance
    if len(large_dataset) > sample_size:
        sampled_data = large_dataset.sample(n=sample_size, random_state=42)
    else:
        sampled_data = large_dataset

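    # Use appropriate metrics
    report = Report(metrics=[
        DatasetDriftMetric(),  # Fast overall check
        # Skip detailed metrics for large datasets
    ])

    # Run the report on the sampled batch so the caller gets results back
    report.run(reference_data=reference_data, current_data=sampled_data)

    return report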

3. Adaptive Thresholds

def adaptive_threshold_monitoring(historical_metrics, current_metric):
    """Use historical data to set dynamic thresholds"""

    if len(historical_metrics) < 30:
        return False  # Not enough history

    # Calculate rolling statistics
    mean_metric = np.mean(historical_metrics[-30:])
    std_metric = np.std(historical_metrics[-30:])

    # Dynamic threshold (2-sigma rule)
    threshold = mean_metric + 2 * std_metric

    return current_metric > threshold
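
A small worked example may help show how the 2-sigma rule behaves. The sketch below restates the same logic with the standard library (`statistics.pstdev` matches the default behavior of `np.std`) and applies it to a made-up history of drift scores. The function name and the sample values are assumptions for illustration.

```python
import statistics


def exceeds_adaptive_threshold(history, current, window=30, sigmas=2.0):
    """Return True when current is above mean + sigmas * std of the last `window` values."""
    if len(history) < window:
        return False  # Not enough history to estimate a baseline
    recent = history[-window:]
    threshold = statistics.mean(recent) + sigmas * statistics.pstdev(recent)
    return current > threshold


# Hypothetical history: 30 daily drift scores alternating between 0.04 and 0.06,
# which gives a mean of about 0.05, a standard deviation of about 0.01,
# and therefore a threshold of about 0.07
history = [0.04, 0.06] * 15

normal_day = exceeds_adaptive_threshold(history, 0.065)
unusual_day = exceeds_adaptive_threshold(history, 0.09)
too_little_history = exceeds_adaptive_threshold(history[:10], 0.09)

print(f"0.065 triggers alert: {normal_day}")
print(f"0.090 triggers alert: {unusual_day}")
print(f"Alert with only 10 days of history: {too_little_history}")
```

Compared with the fixed `drift_score` threshold of 0.1 used by the alerter, this approach adapts to how noisy each metric normally is, at the cost of needing 30 stored runs before it can fire.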

Troubleshooting Common Issues

Issue 1: "No module named 'evidently'"

Solution:

pip install evidently

Issue 2: "Memory error with large datasets"

Solution:

# Sample your data first
large_df_sampled = large_df.sample(n=10000, random_state=42)
report.run(reference_data=ref_sample, current_data=large_df_sampled)

Issue 3: "All features show drift"

Problem: Reference and current data are too different

Solution:

# Check your data split
print("Reference period:", reference_data['date'].min(), "to", reference_data['date'].max())
print("Current period:", current_data['date'].min(), "to", current_data['date'].max())

# Ensure reasonable time gaps
# Use overlapping periods for validation
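
A quick way to tell a real shift from an oversensitive setup is an A/A check: compare two random halves of the reference data with each other. If even that comparison reports drift, the thresholds or the reference window need attention. The sketch below uses a deliberately simple stand-in score (difference in means, in units of the reference standard deviation), not one of Evidently's tests, and all names and data in it are made up for illustration.

```python
import random
import statistics


def mean_shift(reference, current):
    """Absolute difference in means, in units of the reference standard deviation."""
    gap = abs(statistics.mean(current) - statistics.mean(reference))
    return gap / statistics.pstdev(reference)


rng = random.Random(0)
reference = [rng.gauss(10, 2) for _ in range(4000)]

# A/A check: shuffle the reference and compare one half against the other
shuffled = reference[:]
rng.shuffle(shuffled)
half_a, half_b = shuffled[:2000], shuffled[2000:]
aa_score = mean_shift(half_a, half_b)

# A/B check: a batch whose mean really moved from 10 to 13
current = [rng.gauss(13, 2) for _ in range(2000)]
ab_score = mean_shift(reference, current)

print(f"A/A score (should be near 0): {aa_score:.3f}")
print(f"A/B score (real shift):       {ab_score:.3f}")
```

The same idea carries over to Evidently: run the report with one half of the reference data as `reference_data` and the other half as `current_data`, and expect little or no drift.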

Issue 4: "No drift detected but model performance dropped"

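Problem: The monitoring setup might be incomplete. The relationship between features and target can change (concept drift) while the input distributions stay the same, and input drift checks alone cannot see that.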

Solution:

# Add model performance monitoring
from evidently.metrics import ColumnDriftMetric, RegressionQualityMetric

enhanced_report = Report(metrics=[
    DatasetDriftMetric(),
    RegressionQualityMetric(),  # Track model performance (needs target and prediction columns)
    ColumnDriftMetric(column_name='prediction'),
    ColumnDriftMetric(column_name='target')
])
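
The situation in Issue 4 can be reproduced with a toy example: the inputs keep the same distribution, but the relationship between input and target changes, so the error grows while a feature drift check would stay quiet. Everything in this sketch (the 3 minutes per mile "model", the synthetic trips, the numbers) is invented for illustration and uses only the standard library.

```python
import random


def mean_absolute_error(y_true, y_pred):
    """Average absolute difference between actual and predicted values."""
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)


def predict(distance):
    """Toy model learned on the reference period: 3 minutes per mile."""
    return 3.0 * distance


rng = random.Random(7)

# Both batches draw trip distances from the same distribution (no feature drift)
ref_distance = [rng.uniform(1, 10) for _ in range(1000)]
cur_distance = [rng.uniform(1, 10) for _ in range(1000)]

# The true relationship changes, e.g. heavier traffic: 3 -> 4.5 minutes per mile
ref_duration = [3.0 * d + rng.gauss(0, 1) for d in ref_distance]
cur_duration = [4.5 * d + rng.gauss(0, 1) for d in cur_distance]

ref_mae = mean_absolute_error(ref_duration, [predict(d) for d in ref_distance])
cur_mae = mean_absolute_error(cur_duration, [predict(d) for d in cur_distance])

print(f"Reference MAE: {ref_mae:.2f} minutes")
print(f"Current MAE:   {cur_mae:.2f} minutes")
```

This is why the report above tracks model quality and target drift next to input drift: performance metrics catch what feature distributions cannot, provided the true target values arrive in time to compute them.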

Next Steps: Scaling Your Monitoring System

Congratulations! You now have a complete ML monitoring system. Here are ways to extend it:

1. Advanced Features

  • Custom metrics for business-specific KPIs
  • A/B testing integration for model comparisons
  • Fairness monitoring for bias detection
  • Explainability tracking with SHAP or LIME

2. Infrastructure Scaling

  • Kubernetes deployment for high availability
  • Apache Kafka for real-time data streaming
  • Apache Airflow for complex monitoring workflows
  • MLflow integration for experiment tracking

3. Advanced Analytics

  • Anomaly detection in monitoring metrics
  • Predictive maintenance for models
  • Root cause analysis for drift sources
  • Automated retraining triggers

Key Takeaways

  • Evidently makes monitoring accessible - you can start with just a few lines of code
  • Real-world monitoring requires multiple components - detection, storage, alerting, and response
  • Start simple and iterate - basic drift detection is better than no monitoring
  • Automate everything - manual monitoring doesn't scale in production
  • Monitor what matters - focus on metrics that impact your business
  • Have a response plan - knowing what to do when alerts fire is crucial

Remember: The goal isn't perfect monitoring - it's effective monitoring that helps you maintain reliable ML systems.


Conclusion

You've built a comprehensive monitoring system that can:

  • Detect data, prediction, and target drift on each new batch of data
  • Generate professional reports for stakeholders
  • Store historical metrics for trend analysis
  • Send automated alerts when problems occur
  • Scale to production workloads

The most important step is to start monitoring your models today. Even basic drift detection is infinitely better than flying blind in production.

Your monitoring journey doesn't end here - it evolves with your systems, your data, and your understanding of what matters most for your specific use cases.

