πŸ§ͺ MLOps Zoomcamp Module 6: Chapter 1 - Testing and Quality Assurance

Master the fundamentals of testing ML systems and ensuring code quality in production

Welcome to Chapter 1 of the MLOps Zoomcamp Module 6 Best Practices! This chapter focuses entirely on testing strategies and code quality practices essential for robust machine learning systems. 🎯

πŸ“‹ Table of Contents

  • 🎯 Introduction to Testing in MLOps
  • πŸ”¬ Unit Testing with pytest
  • πŸ”— Integration Testing
  • βœ… Code Quality and Formatting
  • πŸ“Š Testing ML-Specific Components
  • πŸ” Advanced Testing Strategies

🎯 Introduction to Testing in MLOps {#introduction}

Testing in MLOps is more complex than traditional software testing because we deal with:

  • Data dependencies πŸ“Š - Models depend on data quality and distribution
  • Model behavior πŸ€– - Non-deterministic outputs and performance degradation
  • Pipeline complexity πŸ”„ - Multiple interconnected components
  • Infrastructure dependencies ☁️ - Cloud services, databases, and external APIs

πŸ—οΈ Types of Testing in MLOps

  1. Unit Tests πŸ”¬

    • Test individual functions and components
    • Fast execution and isolated testing
    • Example: Testing a feature engineering function
  2. Integration Tests πŸ”—

    • Test how components work together
    • End-to-end pipeline validation
    • Example: Testing the entire prediction pipeline
  3. Model Tests πŸ€–

    • Validate model behavior and performance
    • Test model loading and inference
    • Example: Testing prediction accuracy on known data
  4. Data Tests πŸ“Š

    • Validate data quality and schema
    • Test data preprocessing steps
    • Example: Checking for missing values or outliers

πŸ”¬ Unit Testing with pytest {#unit-testing}

πŸ“¦ Setting Up Your Testing Environment

Step 1: Install pytest

pipenv install --dev pytest

Step 2: Create test directory structure

your-ml-project/
β”œβ”€β”€ src/
β”‚   β”œβ”€β”€ __init__.py
β”‚   β”œβ”€β”€ model.py
β”‚   └── preprocessing.py
β”œβ”€β”€ tests/
β”‚   β”œβ”€β”€ __init__.py
β”‚   β”œβ”€β”€ test_model.py
β”‚   └── test_preprocessing.py
β”œβ”€β”€ data/
└── models/

Step 3: Configure pytest (optional)
Create pytest.ini:

[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts = -v --tb=short

✨ Writing Effective Unit Tests

🎯 Testing Data Preprocessing

# tests/test_preprocessing.py
import pytest
import pandas as pd
from src.preprocessing import prepare_features, validate_data

def test_prepare_features():
    """Test feature preparation with valid input"""
    # Arrange
    raw_data = {
        'PULocationID': 43,
        'DOLocationID': 215,
        'trip_distance': 14.5,
        'datetime': '2021-01-01 00:15:56'
    }

    # Act
    features = prepare_features(raw_data)

    # Assert
    assert isinstance(features, dict)
    assert 'PULocationID' in features
    assert 'DOLocationID' in features
    assert 'trip_distance' in features
    assert features['trip_distance'] == 14.5

def test_prepare_features_missing_data():
    """Test feature preparation with missing required fields"""
    # Arrange
    incomplete_data = {
        'PULocationID': 43,
        # Missing DOLocationID and trip_distance
    }

    # Act & Assert
    with pytest.raises(KeyError):
        prepare_features(incomplete_data)

def test_validate_data_schema():
    """Test data validation against expected schema"""
    # Arrange
    valid_df = pd.DataFrame({
        'PULocationID': [1, 2, 3],
        'DOLocationID': [10, 20, 30],
        'trip_distance': [1.5, 2.3, 4.1]
    })

    # Act
    is_valid = validate_data(valid_df)

    # Assert
    assert is_valid is True
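
The tests above import prepare_features and validate_data from src/preprocessing. That module isn't shown in this post, so here is a minimal sketch of what it could look like, just enough to make the tests runnable (your real feature logic will differ):

# src/preprocessing.py (minimal sketch -- adapt to your real feature logic)
import pandas as pd

REQUIRED_COLUMNS = ['PULocationID', 'DOLocationID', 'trip_distance']

def prepare_features(raw_data: dict) -> dict:
    """Build the model's feature dict from a raw ride record.

    A missing required field raises KeyError, which is exactly what
    test_prepare_features_missing_data expects.
    """
    return {
        'PULocationID': raw_data['PULocationID'],
        'DOLocationID': raw_data['DOLocationID'],
        'trip_distance': float(raw_data['trip_distance']),
    }

def validate_data(df: pd.DataFrame) -> bool:
    """Check that the dataframe has the expected columns and no nulls."""
    if not all(col in df.columns for col in REQUIRED_COLUMNS):
        return False
    return not df[REQUIRED_COLUMNS].isnull().any().any()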

πŸ€– Testing Model Components

# tests/test_model.py
import pytest
import joblib
from unittest.mock import Mock, patch
from src.model import ModelService, load_model, predict_duration

class TestModelService:
    """Test suite for ModelService class"""

    def setup_method(self):
        """Set up test fixtures before each test method"""
        self.mock_model = Mock()
        self.mock_model.predict.return_value = [25.5]
        self.service = ModelService(self.mock_model)

    def test_predict_single_ride(self):
        """Test prediction for a single ride"""
        # Arrange
        ride_data = {
            'PULocationID': 43,
            'DOLocationID': 215,
            'trip_distance': 14
        }

        # Act
        prediction = self.service.predict(ride_data)

        # Assert
        assert prediction is not None
        assert isinstance(prediction, (int, float))
        self.mock_model.predict.assert_called_once()

    def test_predict_batch_rides(self):
        """Test batch prediction"""
        # Arrange
        self.mock_model.predict.return_value = [25.5, 30.2, 15.8]
        rides_data = [
            {'PULocationID': 43, 'DOLocationID': 215, 'trip_distance': 14},
            {'PULocationID': 50, 'DOLocationID': 100, 'trip_distance': 8},
            {'PULocationID': 25, 'DOLocationID': 150, 'trip_distance': 22}
        ]

        # Act
        predictions = self.service.predict_batch(rides_data)

        # Assert
        assert len(predictions) == 3
        assert all(isinstance(p, (int, float)) for p in predictions)

@patch('src.model.joblib.load')
def test_load_model_success(mock_joblib_load):
    """Test successful model loading"""
    # Arrange
    mock_model = Mock()
    mock_joblib_load.return_value = mock_model
    model_path = '/path/to/model.pkl'

    # Act
    loaded_model = load_model(model_path)

    # Assert
    assert loaded_model == mock_model
    mock_joblib_load.assert_called_once_with(model_path)

def test_load_model_file_not_found():
    """Test model loading with non-existent file"""
    # Arrange
    non_existent_path = '/path/to/non_existent_model.pkl'

    # Act & Assert
    with pytest.raises(FileNotFoundError):
        load_model(non_existent_path)
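
For context, a minimal src/model.py that satisfies these tests could look like the sketch below. It assumes ModelService wraps an already-loaded model object (as the tests do) and that load_model simply delegates to joblib, which raises FileNotFoundError on a missing path:

# src/model.py (minimal sketch matching the tests above)
import joblib

def load_model(model_path):
    """Load a pickled model from disk; a missing file raises FileNotFoundError."""
    return joblib.load(model_path)

class ModelService:
    """Thin wrapper around a loaded model object."""

    def __init__(self, model):
        self.model = model

    def predict(self, ride):
        # A real service would turn the ride dict into model features first
        return float(self.model.predict([ride])[0])

    def predict_batch(self, rides):
        return [self.predict(ride) for ride in rides]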

πŸ”Œ Testing API Handlers

# tests/test_handlers.py
import json
import base64
from src.lambda_handler import lambda_handler

def test_lambda_handler_valid_request():
    """Test Lambda handler with valid input"""
    # Arrange
    ride_data = {
        'ride': {
            'PULocationID': 130,
            'DOLocationID': 205,
            'trip_distance': 3.66
        },
        'ride_id': 156
    }

    encoded_data = base64.b64encode(
        json.dumps(ride_data).encode('utf-8')
    ).decode('utf-8')

    event = {
        'Records': [{
            'kinesis': {
                'data': encoded_data
            }
        }]
    }

    # Act
    response = lambda_handler(event, None)

    # Assert
    assert response['statusCode'] == 200
    response_body = json.loads(response['body'])
    assert 'predictions' in response_body
    assert len(response_body['predictions']) > 0

def test_lambda_handler_malformed_data():
    """Test Lambda handler with malformed input"""
    # Arrange
    malformed_event = {
        'Records': [{
            'kinesis': {
                'data': 'invalid_base64_data'
            }
        }]
    }

    # Act
    response = lambda_handler(malformed_event, None)

    # Assert
    assert response['statusCode'] == 400
    assert 'error' in json.loads(response['body'])
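
The Lambda handler itself isn't shown in this post. A minimal sketch that decodes the Kinesis records, produces predictions, and returns the status codes the tests expect might look like this (the prediction is stubbed out; a real handler would call the model service):

# src/lambda_handler.py (sketch -- prediction logic is a placeholder)
import base64
import json

def lambda_handler(event, context):
    predictions = []
    try:
        for record in event['Records']:
            # Kinesis delivers record data base64-encoded
            payload = base64.b64decode(record['kinesis']['data'], validate=True)
            ride_event = json.loads(payload)
            predictions.append({
                'ride_id': ride_event['ride_id'],
                'predicted_duration': 25.5,  # placeholder value
            })
    except (ValueError, KeyError) as exc:
        # Covers bad base64 (binascii.Error), bad JSON, and missing fields
        return {
            'statusCode': 400,
            'body': json.dumps({'error': str(exc)}),
        }
    return {
        'statusCode': 200,
        'body': json.dumps({'predictions': predictions}),
    }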

πŸ† Unit Testing Best Practices

βœ… The AAA Pattern (Arrange, Act, Assert)

def test_feature_engineering():
    # Arrange - Set up test data and expected results
    input_data = {'trip_distance': 10.5, 'duration': 25}
    expected_speed = 25.2  # 10.5 miles / (25/60) hours

    # Act - Execute the function under test
    result = calculate_speed(input_data)

    # Assert - Verify the results
    assert abs(result - expected_speed) < 0.1

🎭 Using Mocks Effectively

import pytest
from unittest.mock import Mock, patch, MagicMock

class TestExternalDependencies:

    @patch('src.model.requests.get')
    def test_model_download_success(self, mock_get):
        """Test successful model download from external service"""
        # Arrange
        mock_response = Mock()
        mock_response.status_code = 200
        mock_response.content = b'fake_model_data'
        mock_get.return_value = mock_response

        # Act
        result = download_model_from_url('http://example.com/model.pkl')

        # Assert
        assert result == b'fake_model_data'
        mock_get.assert_called_once_with('http://example.com/model.pkl')

    @patch('src.database.connect')
    def test_save_predictions_db_error(self, mock_connect):
        """Test handling of database connection errors"""
        # Arrange
        mock_connect.side_effect = ConnectionError("Database unavailable")
        predictions = [{'ride_id': 1, 'duration': 25.5}]

        # Act & Assert
        with pytest.raises(ConnectionError):
            save_predictions_to_db(predictions)
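
Note that the patch target is the module where the dependency is looked up, not where it is defined: @patch('src.model.requests.get') works because the function under test lives in src/model.py and calls requests.get from there. A sketch of such a helper (its module placement is an assumption chosen to match the patch target):

# src/model.py (illustrative helper -- matches the patch target 'src.model.requests.get')
import requests

def download_model_from_url(url):
    """Fetch a serialized model over HTTP and return its raw bytes."""
    response = requests.get(url)
    response.raise_for_status()
    return response.content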

πŸ”§ Fixtures for Reusable Test Data

# conftest.py
import pytest
from unittest.mock import Mock

@pytest.fixture
def sample_ride_data():
    """Fixture providing sample ride data for tests"""
    return {
        'PULocationID': 43,
        'DOLocationID': 215,
        'trip_distance': 14.2,
        'datetime': '2021-01-01 00:15:56'
    }

@pytest.fixture
def trained_model():
    """Fixture providing a mock trained model"""
    model = Mock()
    model.predict.return_value = [25.5]
    return model

# tests/test_with_fixtures.py
from src.model import ModelService

def test_prediction_with_fixtures(sample_ride_data, trained_model):
    """Test using fixtures for common test data"""
    service = ModelService(trained_model)
    prediction = service.predict(sample_ride_data)

    assert prediction is not None
    assert isinstance(prediction, (int, float))

πŸ§ͺ Parameterized Testing

import pytest
from src.feature_engineering import categorize_trip_distance

@pytest.mark.parametrize("distance,expected_category", [
    (0.5, "short"),
    (5.0, "medium"), 
    (15.0, "long"),
    (50.0, "very_long")
])
def test_trip_categorization(distance, expected_category):
    """Test trip distance categorization with multiple inputs"""
    result = categorize_trip_distance(distance)
    assert result == expected_category

@pytest.mark.parametrize("invalid_input", [
    None,
    -5.0,
    "not_a_number",
    []
])
def test_trip_categorization_invalid_inputs(invalid_input):
    """Test trip categorization with invalid inputs"""
    with pytest.raises((ValueError, TypeError)):
        categorize_trip_distance(invalid_input)
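
A function that satisfies all of these parameterized cases can be very small; the sketch below is one possibility, with thresholds chosen purely to match the test cases above:

# src/feature_engineering.py (sketch -- thresholds are illustrative)
def categorize_trip_distance(distance):
    """Bucket a trip distance in miles into a coarse category."""
    if isinstance(distance, bool) or not isinstance(distance, (int, float)):
        raise TypeError(f"distance must be numeric, got {type(distance).__name__}")
    if distance <= 0:
        raise ValueError("distance must be positive")
    if distance < 2:
        return "short"
    if distance < 10:
        return "medium"
    if distance < 30:
        return "long"
    return "very_long"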

πŸ”— Integration Testing {#integration-testing}

Integration testing ensures that different components of your ML system work correctly together. This is crucial for catching issues that might not appear in isolated unit tests.

🐳 Docker-Based Integration Testing

Setting Up Docker for Testing

Step 1: Create a test Dockerfile

FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY src/ ./src/
COPY tests/ ./tests/
COPY models/ ./models/

CMD ["python", "-m", "pytest", "tests/integration/"]

Step 2: Docker Compose for test environment

# docker-compose.test.yml
version: '3.8'

services:
  ml-service:
    build: .
    ports:
      - "8080:8080"
    environment:
      - MODEL_LOCATION=/app/models
      - TEST_RUN=true
    volumes:
      - ./models:/app/models
      - ./test-data:/app/test-data
    depends_on:
      - redis-cache
      - test-db

  redis-cache:
    image: redis:6-alpine
    ports:
      - "6379:6379"

  test-db:
    image: postgres:13-alpine
    environment:
      POSTGRES_DB: test_mlops
      POSTGRES_USER: test_user
      POSTGRES_PASSWORD: test_pass
    ports:
      - "5432:5432"

  localstack:
    image: localstack/localstack:latest
    ports:
      - "4566:4566"
    environment:
      - SERVICES=s3,kinesis,lambda
      - DEBUG=1
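
One convenient way to wire this compose file into pytest is a session-scoped fixture that starts the stack before the integration tests and tears it down afterwards. The sketch below shells out to Docker Compose; the file name and service URL match the compose file above, everything else (flags, timeouts) is an assumption:

# tests/integration/conftest.py (sketch -- assumes Docker Compose v2 is available)
import subprocess
import time

import pytest
import requests

COMPOSE_FILE = "docker-compose.test.yml"

def _wait_for(url, timeout=60):
    """Poll a URL until it returns 200 or the timeout expires."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            if requests.get(url, timeout=2).status_code == 200:
                return
        except requests.exceptions.RequestException:
            pass
        time.sleep(1)
    raise TimeoutError(f"{url} did not become healthy within {timeout}s")

@pytest.fixture(scope="session", autouse=True)
def docker_test_stack():
    """Start the test stack once per session and always clean it up."""
    subprocess.run(
        ["docker", "compose", "-f", COMPOSE_FILE, "up", "-d", "--build"],
        check=True,
    )
    try:
        _wait_for("http://localhost:8080/health")
        yield
    finally:
        subprocess.run(
            ["docker", "compose", "-f", COMPOSE_FILE, "down", "-v"],
            check=True,
        )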

πŸ§ͺ Integration Test Examples

# tests/integration/test_end_to_end.py
import requests
import time
import json
import pytest
from deepdiff import DeepDiff

class TestEndToEndPipeline:

    @classmethod
    def setup_class(cls):
        """Set up test environment before running tests"""
        # Wait for services to be ready
        cls.wait_for_service("http://localhost:8080/health", timeout=60)
        cls.service_url = "http://localhost:8080"

    @staticmethod
    def wait_for_service(url, timeout=30):
        """Wait for a service to become available"""
        start_time = time.time()
        while time.time() - start_time < timeout:
            try:
                response = requests.get(url)
                if response.status_code == 200:
                    return True
            except requests.exceptions.ConnectionError:
                pass
            time.sleep(1)
        raise TimeoutError(f"Service at {url} did not become available")

    def test_health_endpoint(self):
        """Test that the service health endpoint works"""
        response = requests.get(f"{self.service_url}/health")

        assert response.status_code == 200
        health_data = response.json()
        assert health_data['status'] == 'healthy'
        assert 'model_loaded' in health_data['checks']
        assert health_data['checks']['model_loaded'] is True

    def test_single_prediction(self):
        """Test single ride prediction through the API"""
        # Arrange
        ride_data = {
            'PULocationID': 43,
            'DOLocationID': 215,
            'trip_distance': 14.2,
            'datetime': '2021-01-01 00:15:56'
        }

        expected_response = {
            'ride_id': 12345,
            'predicted_duration': 25.5,
            'model_version': '1.0.0'
        }

        # Act
        response = requests.post(
            f"{self.service_url}/predict",
            json=ride_data
        )

        # Assert
        assert response.status_code == 200
        actual_response = response.json()

        # Use DeepDiff for a detailed comparison of the exact fields,
        # then check the numeric prediction separately with a tolerance
        diff = DeepDiff(
            expected_response,
            actual_response,
            exclude_paths=["root['predicted_duration']"],
        )
        assert not diff, f"Response differs from expected: {diff}"
        assert actual_response['predicted_duration'] == pytest.approx(
            expected_response['predicted_duration'], abs=2.0
        )

    def test_batch_prediction(self):
        """Test batch prediction functionality"""
        # Arrange
        batch_data = {
            'rides': [
                {
                    'ride_id': 1,
                    'PULocationID': 43,
                    'DOLocationID': 215,
                    'trip_distance': 14.2
                },
                {
                    'ride_id': 2, 
                    'PULocationID': 68,
                    'DOLocationID': 170,
                    'trip_distance': 8.5
                }
            ]
        }

        # Act
        response = requests.post(
            f"{self.service_url}/predict/batch",
            json=batch_data
        )

        # Assert
        assert response.status_code == 200
        result = response.json()
        assert 'predictions' in result
        assert len(result['predictions']) == 2

        # Verify each prediction has required fields
        for prediction in result['predictions']:
            assert 'ride_id' in prediction
            assert 'predicted_duration' in prediction
            assert isinstance(prediction['predicted_duration'], (int, float))

    def test_prediction_caching(self):
        """Test that identical requests are cached"""
        ride_data = {
            'PULocationID': 43, 
            'DOLocationID': 215,
            'trip_distance': 14.2
        }

        # First request
        start_time = time.time()
        response1 = requests.post(f"{self.service_url}/predict", json=ride_data)
        first_duration = time.time() - start_time

        # Second identical request (should be cached)
        start_time = time.time()
        response2 = requests.post(f"{self.service_url}/predict", json=ride_data)
        second_duration = time.time() - start_time

        # Assertions
        assert response1.status_code == 200
        assert response2.status_code == 200
        assert response1.json() == response2.json()

        # Second request should be significantly faster (cached)
        assert second_duration < first_duration * 0.5
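
These tests assume a small web service in front of the model that exposes /health, /predict, and /predict/batch and caches identical requests. Purely as a sketch (framework choice, route names, and cache-key scheme are assumptions reverse-engineered from the tests above), such a service could look like this:

# src/app.py (sketch of the service exercised by the end-to-end tests)
import json

import redis
from flask import Flask, jsonify, request

from src.model import ModelService, load_model

app = Flask(__name__)
model_service = ModelService(load_model("models/model.pkl"))
cache = redis.Redis(host="redis-cache", port=6379, decode_responses=True)

@app.route("/health")
def health():
    return jsonify({"status": "healthy", "checks": {"model_loaded": True}})

@app.route("/predict", methods=["POST"])
def predict():
    ride = request.get_json()
    cache_key = json.dumps(ride, sort_keys=True)
    try:
        cached = cache.get(cache_key)
        if cached:
            return jsonify(json.loads(cached))
    except redis.RedisError:
        pass  # fall back to a direct prediction if the cache is unavailable
    result = {
        "ride_id": ride.get("ride_id"),
        "predicted_duration": model_service.predict(ride),
        "model_version": "1.0.0",
    }
    try:
        cache.set(cache_key, json.dumps(result), ex=300)
    except redis.RedisError:
        pass
    return jsonify(result)

@app.route("/predict/batch", methods=["POST"])
def predict_batch():
    rides = request.get_json()["rides"]
    predictions = [
        {"ride_id": r.get("ride_id"), "predicted_duration": model_service.predict(r)}
        for r in rides
    ]
    return jsonify({"predictions": predictions})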

☁️ Testing Cloud Service Integrations

AWS Services with LocalStack

# tests/integration/test_aws_integration.py
import json

import boto3

class TestAWSIntegration:

    def setup_method(self):
        """Set up AWS clients for LocalStack"""
        self.kinesis_client = boto3.client(
            'kinesis',
            endpoint_url='http://localhost:4566',
            region_name='us-east-1',
            aws_access_key_id='test',
            aws_secret_access_key='test'
        )

        self.s3_client = boto3.client(
            's3',
            endpoint_url='http://localhost:4566',
            region_name='us-east-1',
            aws_access_key_id='test',
            aws_secret_access_key='test'
        )

    def test_kinesis_stream_integration(self):
        """Test Kinesis stream creation and data flow"""
        stream_name = 'ride-predictions-test'

        # Create stream
        self.kinesis_client.create_stream(
            StreamName=stream_name,
            ShardCount=1
        )

        # Wait for stream to be active
        waiter = self.kinesis_client.get_waiter('stream_exists')
        waiter.wait(StreamName=stream_name)

        # Put record
        test_data = {
            'ride_id': 123,
            'predicted_duration': 25.5,
            'timestamp': '2021-01-01T00:15:56Z'
        }

        response = self.kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(test_data),
            PartitionKey='test-partition'
        )

        assert 'SequenceNumber' in response
        assert response['ResponseMetadata']['HTTPStatusCode'] == 200

    def test_s3_model_storage(self):
        """Test model storage and retrieval from S3"""
        bucket_name = 'test-models-bucket'
        model_key = 'models/v1/model.pkl'

        # Create bucket
        self.s3_client.create_bucket(Bucket=bucket_name)

        # Upload test model file
        test_model_data = b'fake_model_pickle_data'
        self.s3_client.put_object(
            Bucket=bucket_name,
            Key=model_key,
            Body=test_model_data
        )

        # Download and verify
        response = self.s3_client.get_object(
            Bucket=bucket_name,
            Key=model_key
        )

        downloaded_data = response['Body'].read()
        assert downloaded_data == test_model_data
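
To make the Kinesis test fully end to end, you can also read the record back from the stream and check its payload. A possible follow-up test (standard boto3 shard-iterator calls; the stream setup mirrors the test above) goes inside the same TestAWSIntegration class:

    def test_kinesis_read_back(self):
        """Read records back from the test stream and verify the payload."""
        stream_name = 'ride-predictions-test'
        sent = {'ride_id': 123, 'predicted_duration': 25.5}

        try:
            self.kinesis_client.create_stream(StreamName=stream_name, ShardCount=1)
            self.kinesis_client.get_waiter('stream_exists').wait(StreamName=stream_name)
        except self.kinesis_client.exceptions.ResourceInUseException:
            pass  # stream already exists from an earlier test

        self.kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(sent),
            PartitionKey='test-partition'
        )

        shard_id = self.kinesis_client.describe_stream(
            StreamName=stream_name
        )['StreamDescription']['Shards'][0]['ShardId']

        shard_iterator = self.kinesis_client.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard_id,
            ShardIteratorType='TRIM_HORIZON'
        )['ShardIterator']

        records = self.kinesis_client.get_records(ShardIterator=shard_iterator)['Records']
        payloads = [json.loads(record['Data']) for record in records]

        assert sent in payloads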

Google Cloud Functions Testing

# tests/integration/test_gcp_functions.py
import subprocess
import requests
import json
import base64
import time
from pathlib import Path

class TestGCPFunctions:

    def setup_method(self):
        """Start Functions Framework for testing"""
        self.port = 8888
        self.base_url = f'http://localhost:{self.port}'

        # Start functions framework
        current_path = Path(__file__).parent
        project_root = current_path.parent.parent

        self.process = subprocess.Popen([
            'functions-framework',
            '--target', 'predict_duration',
            '--signature-type', 'event',
            '--port', str(self.port)
        ], cwd=project_root)

        # Wait for function to be ready
        self._wait_for_function_ready()

    def teardown_method(self):
        """Clean up after tests"""
        if hasattr(self, 'process'):
            self.process.terminate()
            self.process.wait()

    def _wait_for_function_ready(self, timeout=30):
        """Wait for the function to be ready to receive requests"""
        start_time = time.time()
        while time.time() - start_time < timeout:
            try:
                # Try a health check or simple request
                response = requests.get(self.base_url)
                if response.status_code in [200, 405]:  # 405 is OK for wrong method
                    return True
            except requests.exceptions.ConnectionError:
                pass
            time.sleep(1)
        raise TimeoutError("Function did not become ready in time")

    def test_pubsub_function_processing(self):
        """Test Cloud Function with PubSub event"""
        # Arrange
        ride_data = {
            'ride': {
                'PULocationID': 43,
                'DOLocationID': 215,
                'trip_distance': 14.2
            },
            'ride_id': 12345
        }

        # Encode data as base64 (PubSub format)
        ride_json = json.dumps(ride_data)
        encoded_data = base64.b64encode(ride_json.encode('utf-8')).decode('utf-8')

        # Create PubSub message format
        pubsub_event = {
            'data': {'data': encoded_data},
            'attributes': {
                'timestamp': '2021-01-01T00:15:56Z'
            }
        }

        # Act
        response = requests.post(self.base_url, json=pubsub_event)

        # Assert
        assert response.status_code == 200

        # Check logs for expected output
        time.sleep(1)  # Allow time for processing
        self.process.poll()

        # You would typically check logs or output streams here
        # For this example, we're just verifying the function responds
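
The target function itself is not shown here; a minimal main.py compatible with functions-framework --signature-type event could look like the sketch below (the prediction is stubbed with a simple formula, and the Pub/Sub payload format matches the test above):

# main.py (sketch -- run with: functions-framework --target predict_duration --signature-type event)
import base64
import json

def predict_duration(data, context):
    """Background-function entry point for Pub/Sub ride events."""
    # Pub/Sub delivers the message payload base64-encoded under the 'data' key
    ride_event = json.loads(base64.b64decode(data['data']).decode('utf-8'))
    ride = ride_event['ride']

    # Placeholder prediction -- a real function would load and call the trained model
    predicted_duration = 10.0 + 1.5 * ride['trip_distance']

    print(json.dumps({
        'ride_id': ride_event['ride_id'],
        'predicted_duration': predicted_duration,
    }))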

βœ… Code Quality and Formatting {#code-quality}

Code quality tools help maintain consistent, readable, and error-free code across your ML project.

🎨 Automated Code Formatting

Black: The Uncompromising Code Formatter

# Install Black
pipenv install --dev black

# Format all Python files
black .

# Check what would be changed without making changes
black --diff .

# Format specific files
black src/model.py tests/test_model.py

Black Configuration (pyproject.toml):

[tool.black]
line-length = 88
target-version = ['py38']
include = '\.pyi?$'
extend-exclude = '''
/(
  # directories
  \.eggs
  | \.git
  | \.hg
  | \.mypy_cache
  | \.tox
  | \.venv
  | build
  | dist
)/
'''

isort: Import Statement Organizer

# Install isort
pipenv install --dev isort

# Sort imports in all files
isort .

# Check what would be changed
isort --diff .

# Configure isort to work with Black

isort Configuration (.isort.cfg):

[settings]
profile = black
multi_line_output = 3
line_length = 88
known_first_party = src
known_third_party = pandas,numpy,sklearn,mlflow
sections = FUTURE,STDLIB,THIRDPARTY,FIRSTPARTY,LOCALFOLDER

πŸ” Static Code Analysis

pylint: Code Quality Checker

# Install pylint
pipenv install --dev pylint

# Analyze a specific file
pylint src/model.py

# Analyze all Python files
pylint **/*.py

# Generate a detailed report
pylint --output-format=json src/ > pylint_report.json

pylint Configuration (.pylintrc):

[MASTER]
init-hook='import sys; sys.path.append("src")'

[MESSAGES CONTROL]
disable=C0114,  # missing-module-docstring
        C0115,  # missing-class-docstring
        C0116,  # missing-function-docstring
        R0903,  # too-few-public-methods
        R0913   # too-many-arguments

[FORMAT]
max-line-length=88

[DESIGN]
max-args=7
max-locals=15
max-branches=12

Handling pylint in Code:

# Disable specific warnings for a code block
# pylint: disable=import-error,wrong-import-position
import sys
import os
sys.path.append(os.path.dirname(__file__))
from .model import MLModel
# pylint: enable=import-error,wrong-import-position

class ModelService:
    """ML Model service for predictions"""

    def __init__(self, model_path):
        # Disable warning for this specific line
        self.model = self._load_model(model_path)  # pylint: disable=no-member

    def _load_model(self, path):
        """Load model from path"""
        # Implementation here
        pass

mypy: Static Type Checking

# Install mypy
pipenv install --dev mypy

# Type check your code
mypy src/

# Generate HTML report
mypy --html-report mypy_report src/

Type Annotations Example:

from typing import Dict, List, Optional, Union
import pandas as pd
import numpy as np

class ModelService:
    """Type-annotated model service"""

    def __init__(self, model_path: str) -> None:
        self.model_path = model_path
        self.model: Optional[object] = None

    def predict(self, features: Dict[str, Union[int, float, str]]) -> float:
        """Make a prediction for a single ride"""
        if self.model is None:
            raise ValueError("Model not loaded")

        # Process features and make prediction
        processed_features = self._process_features(features)
        prediction = self.model.predict([processed_features])[0]

        return float(prediction)

    def predict_batch(self, 
                     features_list: List[Dict[str, Union[int, float, str]]]
                     ) -> List[float]:
        """Make predictions for multiple rides"""
        predictions = []
        for features in features_list:
            prediction = self.predict(features)
            predictions.append(prediction)

        return predictions

    def _process_features(self, 
                         features: Dict[str, Union[int, float, str]]
                         ) -> np.ndarray:
        """Process raw features into model input format"""
        # Implementation here
        return np.array([1, 2, 3])  # Placeholder

πŸ›‘οΈ Security Scanning

bandit: Security Issue Detection

# Install bandit
pipenv install --dev bandit

# Scan for security issues
bandit -r src/

# Generate JSON report
bandit -r src/ -f json -o security_report.json

safety: Dependency Vulnerability Scanner

# Install safety
pipenv install --dev safety

# Check for known security vulnerabilities
safety check

# Check specific requirements file
safety check --file requirements.txt

πŸ“Š Testing ML-Specific Components {#ml-testing}

Machine learning systems have unique testing requirements beyond traditional software testing.

πŸ€– Model Behavior Testing

Testing Model Invariants

# tests/test_model_behavior.py
import pytest
import numpy as np
from src.model import MLModel

class TestModelBehavior:

    @pytest.fixture
    def trained_model(self):
        """Load a trained model for testing"""
        model = MLModel()
        model.load('/path/to/trained/model.pkl')
        return model

    def test_model_output_range(self, trained_model):
        """Test that model outputs are within expected range"""
        # Test with various inputs
        test_inputs = [
            {'trip_distance': 1.0, 'PULocationID': 43, 'DOLocationID': 215},
            {'trip_distance': 10.0, 'PULocationID': 100, 'DOLocationID': 50},
            {'trip_distance': 25.0, 'PULocationID': 200, 'DOLocationID': 150}
        ]

        for input_data in test_inputs:
            prediction = trained_model.predict(input_data)

            # Duration should be reasonable (between 1 minute and 2 hours)
            assert 1.0 <= prediction <= 120.0, f"Prediction {prediction} out of range"

    def test_model_monotonicity(self, trained_model):
        """Test that longer trips generally take more time"""
        base_input = {'PULocationID': 43, 'DOLocationID': 215}

        distances = [1.0, 5.0, 10.0, 20.0]
        predictions = []

        for distance in distances:
            input_data = {**base_input, 'trip_distance': distance}
            prediction = trained_model.predict(input_data)
            predictions.append(prediction)

        # Check that predictions generally increase with distance
        # Allow some flexibility for noise
        increasing_pairs = sum(1 for i in range(len(predictions)-1) 
                              if predictions[i+1] >= predictions[i])

        # At least 70% of adjacent pairs should be increasing
        assert increasing_pairs >= (len(predictions) - 1) * 0.7

    def test_model_consistency(self, trained_model):
        """Test that identical inputs produce identical outputs"""
        input_data = {
            'trip_distance': 5.0,
            'PULocationID': 43,
            'DOLocationID': 215,
            'day_of_week': 1,
            'hour_of_day': 14
        }

        # Make multiple predictions with same input
        predictions = [trained_model.predict(input_data) for _ in range(5)]

        # All predictions should be identical
        assert all(p == predictions[0] for p in predictions)

    def test_model_robustness_to_outliers(self, trained_model):
        """Test model behavior with extreme/outlier inputs"""
        outlier_inputs = [
            {'trip_distance': 0.1, 'PULocationID': 1, 'DOLocationID': 2},   # Very short trip
            {'trip_distance': 100.0, 'PULocationID': 1, 'DOLocationID': 263}, # Very long trip
        ]

        for input_data in outlier_inputs:
            prediction = trained_model.predict(input_data)

            # Model should still produce reasonable outputs
            assert 0.5 <= prediction <= 300.0  # Extended range for outliers
            assert not np.isnan(prediction)
            assert not np.isinf(prediction)

Performance Testing

# tests/test_model_performance.py
import time

import numpy as np
import pandas as pd
import pytest

from src.model import MLModel

# NOTE: the `trained_model` fixture used below is assumed to be defined in conftest.py

class TestModelPerformance:

    @pytest.fixture
    def performance_test_data(self):
        """Generate test data for performance testing"""
        np.random.seed(42)
        n_samples = 1000

        data = pd.DataFrame({
            'trip_distance': np.random.exponential(5, n_samples),
            'PULocationID': np.random.randint(1, 264, n_samples),
            'DOLocationID': np.random.randint(1, 264, n_samples),
            'day_of_week': np.random.randint(0, 7, n_samples),
            'hour_of_day': np.random.randint(0, 24, n_samples)
        })

        return data

    def test_single_prediction_latency(self, trained_model):
        """Test latency for single predictions"""
        input_data = {
            'trip_distance': 5.0,
            'PULocationID': 43,
            'DOLocationID': 215
        }

        # Warm up
        trained_model.predict(input_data)

        # Measure latency
        start_time = time.time()
        prediction = trained_model.predict(input_data)
        latency = time.time() - start_time

        # Should predict within 100ms
        assert latency < 0.1, f"Prediction took {latency:.3f}s, expected < 0.1s"
        assert prediction is not None

    def test_batch_prediction_throughput(self, trained_model, performance_test_data):
        """Test throughput for batch predictions"""
        batch_size = 100
        test_batch = performance_test_data.head(batch_size).to_dict('records')

        # Warm up
        trained_model.predict_batch(test_batch[:10])

        # Measure throughput
        start_time = time.time()
        predictions = trained_model.predict_batch(test_batch)
        total_time = time.time() - start_time

        throughput = len(predictions) / total_time

        # Should process at least 500 predictions per second
        assert throughput >= 500, f"Throughput {throughput:.1f} preds/s, expected >= 500"
        assert len(predictions) == batch_size

    def test_memory_usage(self, trained_model, performance_test_data):
        """Test memory usage during batch processing"""
        import psutil
        import os

        process = psutil.Process(os.getpid())
        initial_memory = process.memory_info().rss / 1024 / 1024  # MB

        # Process large batch
        large_batch = performance_test_data.to_dict('records')
        predictions = trained_model.predict_batch(large_batch)

        final_memory = process.memory_info().rss / 1024 / 1024  # MB
        memory_increase = final_memory - initial_memory

        # Memory increase should be reasonable (< 100MB for 1000 predictions)
        assert memory_increase < 100, f"Memory increased by {memory_increase:.1f}MB"
        assert len(predictions) == len(large_batch)

πŸ“Š Data Quality Testing

# tests/test_data_quality.py
import pandas as pd
import pytest
from src.data_validation import validate_input_data, check_data_drift

class TestDataQuality:

    @pytest.fixture
    def reference_data(self):
        """Reference data for comparison"""
        return pd.DataFrame({
            'trip_distance': [1.2, 3.5, 7.8, 2.1, 5.0],
            'PULocationID': [43, 100, 215, 68, 170],
            'DOLocationID': [215, 50, 150, 100, 200],
            'fare_amount': [8.5, 12.0, 25.5, 9.8, 18.0]
        })

    def test_data_schema_validation(self):
        """Test that input data has correct schema"""
        # Valid data
        valid_data = pd.DataFrame({
            'trip_distance': [5.0, 3.2],
            'PULocationID': [43, 100],
            'DOLocationID': [215, 50],
            'fare_amount': [18.0, 12.5]
        })

        assert validate_input_data(valid_data) is True

        # Invalid data - missing column
        invalid_data = pd.DataFrame({
            'trip_distance': [5.0, 3.2],
            'PULocationID': [43, 100],
            # Missing DOLocationID and fare_amount
        })

        assert validate_input_data(invalid_data) is False

    def test_data_range_validation(self):
        """Test data values are within expected ranges"""
        # Out of range data
        invalid_data = pd.DataFrame({
            'trip_distance': [-1.0, 1000.0],  # Negative and extremely large
            'PULocationID': [0, 500],  # Outside valid location range
            'DOLocationID': [215, 50],
            'fare_amount': [-5.0, 10.0]  # Negative fare
        })

        validation_results = validate_input_data(invalid_data, check_ranges=True)
        assert validation_results is False

    def test_data_drift_detection(self, reference_data):
        """Test detection of data drift"""
        # Similar data (no drift)
        similar_data = pd.DataFrame({
            'trip_distance': [1.5, 3.0, 8.0, 2.5, 4.8],
            'PULocationID': [50, 95, 220, 70, 165],
            'DOLocationID': [200, 55, 145, 105, 195],
            'fare_amount': [9.0, 11.5, 26.0, 10.0, 17.5]
        })

        drift_score = check_data_drift(reference_data, similar_data)
        assert drift_score < 0.1  # Low drift score

        # Drifted data
        drifted_data = pd.DataFrame({
            'trip_distance': [20.0, 25.0, 30.0, 18.0, 22.0],  # Much longer trips
            'PULocationID': [1, 2, 3, 4, 5],  # Different location pattern
            'DOLocationID': [260, 261, 262, 263, 1],
            'fare_amount': [50.0, 60.0, 70.0, 45.0, 55.0]  # Much higher fares
        })

        drift_score = check_data_drift(reference_data, drifted_data)
        assert drift_score > 0.3  # High drift score

    def test_missing_value_handling(self):
        """Test handling of missing values"""
        data_with_nulls = pd.DataFrame({
            'trip_distance': [5.0, None, 3.2],
            'PULocationID': [43, 100, None],
            'DOLocationID': [215, None, 50],
            'fare_amount': [18.0, 12.5, 15.0]
        })

        # Should detect missing values
        validation_result = validate_input_data(data_with_nulls, allow_nulls=False)
        assert validation_result is False

        # Should allow missing values when configured
        validation_result = validate_input_data(data_with_nulls, allow_nulls=True)
        assert validation_result is True
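
Both helpers come from src/data_validation.py, which isn't shown in the post. A compact sketch that behaves the way these tests expect (the column list, value ranges, and drift metric are all assumptions chosen to mirror the tests) might be:

# src/data_validation.py (sketch -- ranges and drift metric are illustrative)
import pandas as pd

REQUIRED_COLUMNS = ['trip_distance', 'PULocationID', 'DOLocationID', 'fare_amount']
VALUE_RANGES = {
    'trip_distance': (0.0, 200.0),
    'PULocationID': (1, 265),
    'DOLocationID': (1, 265),
    'fare_amount': (0.0, 500.0),
}

def validate_input_data(df: pd.DataFrame, check_ranges: bool = False,
                        allow_nulls: bool = True) -> bool:
    """Validate schema, optionally value ranges, and optionally a no-nulls policy."""
    if not all(col in df.columns for col in REQUIRED_COLUMNS):
        return False
    if not allow_nulls and df[REQUIRED_COLUMNS].isnull().any().any():
        return False
    if check_ranges:
        for col, (low, high) in VALUE_RANGES.items():
            values = df[col].dropna()
            if ((values < low) | (values > high)).any():
                return False
    return True

def check_data_drift(reference: pd.DataFrame, current: pd.DataFrame) -> float:
    """Crude drift score: average relative shift of the column means (0 = identical)."""
    columns = [c for c in REQUIRED_COLUMNS if c in reference.columns and c in current.columns]
    shifts = [
        abs(current[col].mean() - reference[col].mean()) / (abs(reference[col].mean()) + 1e-9)
        for col in columns
    ]
    return float(sum(shifts) / len(shifts)) if shifts else 0.0

In a real project you would typically reach for a dedicated library such as Evidently or Great Expectations rather than a hand-rolled drift score, but the sketch shows the shape of the interface the tests rely on.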

πŸ” Advanced Testing Strategies {#advanced-testing}

🎭 Testing with Mock Services

# tests/test_external_services.py
from unittest.mock import Mock, patch, MagicMock
import pytest
import requests
from src.external_services import ModelRepository, PredictionLogger

class TestExternalServices:

    @patch('src.external_services.requests.get')
    def test_model_download_retry_logic(self, mock_get):
        """Test retry logic for model downloads"""
        # Simulate intermittent failures
        mock_get.side_effect = [
            requests.exceptions.ConnectionError("Network error"),
            requests.exceptions.Timeout("Request timeout"),
            Mock(status_code=200, content=b'model_data')  # Success on 3rd try
        ]

        repo = ModelRepository("http://model-store.com")
        model_data = repo.download_model("model_v1.pkl")

        assert model_data == b'model_data'
        assert mock_get.call_count == 3

    @patch('src.external_services.boto3.client')
    def test_prediction_logging_s3_failure(self, mock_boto_client):
        """Test handling of S3 logging failures"""
        # Mock S3 client that fails
        mock_s3 = Mock()
        mock_s3.put_object.side_effect = Exception("S3 Error")
        mock_boto_client.return_value = mock_s3

        logger = PredictionLogger()

        # Should not raise exception even if S3 fails
        result = logger.log_prediction("ride_123", 25.5, {"model": "v1"})

        # Should indicate failure but not crash
        assert result['success'] is False
        assert 'error' in result

    @patch('src.external_services.redis.Redis')
    def test_cache_fallback_behavior(self, mock_redis):
        """Test behavior when cache is unavailable"""
        # Mock Redis that fails to connect
        mock_redis_instance = Mock()
        mock_redis_instance.get.side_effect = Exception("Redis connection failed")
        mock_redis.return_value = mock_redis_instance

        from src.model_service import CachedModelService

        service = CachedModelService()

        # Should fall back to direct computation
        result = service.predict_with_cache("ride_data_key", {"distance": 5.0})

        assert result is not None
        # Should have attempted cache but continued without it
        mock_redis_instance.get.assert_called_once()

πŸ“ˆ Property-Based Testing

# tests/test_property_based.py
from hypothesis import given, strategies as st, assume
import pytest
from src.feature_engineering import calculate_speed, normalize_coordinates

class TestPropertyBased:

    @given(
        distance=st.floats(min_value=0.1, max_value=100.0),
        duration=st.floats(min_value=0.1, max_value=300.0)
    )
    def test_speed_calculation_properties(self, distance, duration):
        """Test properties of speed calculation"""
        speed = calculate_speed(distance, duration)

        # Speed should always be positive
        assert speed > 0

        # Speed should be proportional to distance
        double_distance_speed = calculate_speed(distance * 2, duration)
        assert double_distance_speed > speed

        # Speed should be inversely proportional to duration
        double_duration_speed = calculate_speed(distance, duration * 2)
        assert double_duration_speed < speed

    @given(
        lat=st.floats(min_value=-90.0, max_value=90.0),
        lon=st.floats(min_value=-180.0, max_value=180.0)
    )
    def test_coordinate_normalization_properties(self, lat, lon):
        """Test properties of coordinate normalization"""
        normalized_lat, normalized_lon = normalize_coordinates(lat, lon)

        # Normalized coordinates should be in [0, 1] range
        assert 0.0 <= normalized_lat <= 1.0
        assert 0.0 <= normalized_lon <= 1.0

        # Function should be deterministic
        norm_lat2, norm_lon2 = normalize_coordinates(lat, lon)
        assert normalized_lat == norm_lat2
        assert normalized_lon == norm_lon2

    @given(
        features=st.dictionaries(
            keys=st.sampled_from(['trip_distance', 'PULocationID', 'DOLocationID']),
            values=st.floats(min_value=0.1, max_value=100.0),
            min_size=3,
            max_size=3
        )
    )
    def test_feature_processing_invariants(self, features):
        """Test that feature processing maintains invariants"""
        from src.preprocessing import process_ride_features

        processed = process_ride_features(features)

        # Output should always be a dictionary
        assert isinstance(processed, dict)

        # Should contain all required features
        required_features = ['trip_distance_normalized', 'pickup_zone', 'dropoff_zone']
        for feature in required_features:
            assert feature in processed

        # Normalized distance should be in reasonable range
        assert 0.0 <= processed['trip_distance_normalized'] <= 1.0

πŸ§ͺ Mutation Testing

Mutation testing helps evaluate the quality of your tests by introducing small changes (mutations) to your code and checking if your tests catch them.

# Install mutmut for mutation testing
pipenv install --dev mutmut

# Run mutation testing
mutmut run

# Show results
mutmut results

# Show specific mutations
mutmut show 1
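
To make the idea concrete: a mutation is a tiny code change such as flipping a comparison operator, and your tests "kill" the mutant if at least one of them fails against the changed code. The example below is hypothetical (module and function names are made up for illustration):

# src/trip_rules.py (original code)
def is_airport_trip(distance_miles):
    """Heuristic: trips longer than 15 miles are flagged as airport runs."""
    return distance_miles > 15

# A mutant that mutmut could generate: `>` becomes `>=`
#     return distance_miles >= 15

# tests/test_trip_rules.py -- this boundary test kills that mutant,
# because is_airport_trip(15) is False for the original but True for the mutant
def test_is_airport_trip_boundary():
    assert is_airport_trip(15) is False
    assert is_airport_trip(15.1) is True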

πŸ“Š Test Coverage Analysis

# Install coverage tools
pipenv install --dev coverage pytest-cov

# Run tests with coverage
pytest --cov=src --cov-report=html --cov-report=term

# Generate detailed HTML report
coverage html

# Set coverage thresholds in pytest.ini

pytest.ini with coverage configuration:

[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts = 
    -v 
    --tb=short
    --cov=src
    --cov-report=html
    --cov-report=term-missing
    --cov-fail-under=80

🎯 Summary

Congratulations! πŸŽ‰ You've completed Chapter 1 of the MLOps Best Practices guide. You now have a solid foundation in:

βœ… Key Takeaways

  • Testing Pyramid πŸ”Ί - Unit tests form the base, integration tests in the middle, end-to-end tests at the top
  • ML-Specific Testing πŸ€– - Model behavior, performance, and data quality testing
  • Code Quality ✨ - Automated formatting, linting, and static analysis
  • Test Automation πŸ”„ - Fixtures, parametrization, and property-based testing

πŸ› οΈ Tools Mastered

  • pytest for comprehensive testing
  • Black and isort for code formatting
  • pylint and mypy for code quality
  • Docker for integration testing
  • LocalStack for cloud service testing

🎯 Best Practices Applied

  • βœ… Write tests first (TDD approach)
  • βœ… Use mocks to isolate dependencies
  • βœ… Test edge cases and error conditions
  • βœ… Maintain high test coverage (>80%)
  • βœ… Automate quality checks in your workflow

πŸ”œ What's Next?

In Chapter 2, we'll explore automation and deployment practices including:

  • Pre-commit hooks and CI/CD pipelines
  • Makefiles and automation scripts
  • Docker deployment and orchestration
  • Monitoring and logging strategies
