Master the fundamentals of testing ML systems and ensuring code quality in production
Welcome to Chapter 1 of the MLOps Zoomcamp Module 6 Best Practices! This chapter focuses entirely on testing strategies and code quality practices essential for robust machine learning systems.
Table of Contents
- Introduction to Testing in MLOps
- Unit Testing with pytest
- Integration Testing
- Code Quality and Formatting
- Testing ML-Specific Components
- Advanced Testing Strategies
Introduction to Testing in MLOps {#introduction}
Testing in MLOps is more complex than traditional software testing because we deal with:
- Data dependencies - Models depend on data quality and distribution
- Model behavior - Non-deterministic outputs and performance degradation
- Pipeline complexity - Multiple interconnected components
- Infrastructure dependencies - Cloud services, databases, and external APIs
Types of Testing in MLOps
- Unit Tests
  - Test individual functions and components
  - Fast execution and isolated testing
  - Example: Testing a feature engineering function
- Integration Tests
  - Test how components work together
  - End-to-end pipeline validation
  - Example: Testing the entire prediction pipeline
- Model Tests
  - Validate model behavior and performance
  - Test model loading and inference
  - Example: Testing prediction accuracy on known data
- Data Tests
  - Validate data quality and schema
  - Test data preprocessing steps
  - Example: Checking for missing values or outliers (see the sketch below)
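To make that last example concrete, here is a minimal sketch of a data test; the column names and checks are illustrative assumptions, not code from the course repository.
# tests/test_data_quality_sketch.py (illustrative)
import pandas as pd

def check_ride_batch(df: pd.DataFrame) -> list:
    """Return a list of data-quality problems found in a batch of rides."""
    problems = []
    for col in ["PULocationID", "DOLocationID", "trip_distance"]:
        if col not in df.columns:
            problems.append(f"missing column: {col}")
        elif df[col].isna().any():
            problems.append(f"null values in column: {col}")
    if "trip_distance" in df.columns and (df["trip_distance"] < 0).any():
        problems.append("negative trip_distance values")
    return problems

def test_clean_batch_has_no_problems():
    df = pd.DataFrame({
        "PULocationID": [43, 100],
        "DOLocationID": [215, 50],
        "trip_distance": [1.5, 7.2],
    })
    assert check_ride_batch(df) == []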
Unit Testing with pytest {#unit-testing}
Setting Up Your Testing Environment
Step 1: Install pytest
pipenv install --dev pytest
Step 2: Create test directory structure
your-ml-project/
├── src/
│   ├── __init__.py
│   ├── model.py
│   └── preprocessing.py
├── tests/
│   ├── __init__.py
│   ├── test_model.py
│   └── test_preprocessing.py
├── data/
└── models/
Step 3: Configure pytest (optional)
Create pytest.ini:
[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts = -v --tb=short
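With this configuration in place, running pytest (or pipenv run pytest) from the project root discovers every test_*.py file under tests/ and prints verbose output with short tracebacks.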
Writing Effective Unit Tests
Testing Data Preprocessing
# tests/test_preprocessing.py
import pytest
import pandas as pd
from src.preprocessing import prepare_features, validate_data
def test_prepare_features():
"""Test feature preparation with valid input"""
# Arrange
raw_data = {
'PULocationID': 43,
'DOLocationID': 215,
'trip_distance': 14.5,
'datetime': '2021-01-01 00:15:56'
}
# Act
features = prepare_features(raw_data)
# Assert
assert isinstance(features, dict)
assert 'PULocationID' in features
assert 'DOLocationID' in features
assert 'trip_distance' in features
assert features['trip_distance'] == 14.5
def test_prepare_features_missing_data():
"""Test feature preparation with missing required fields"""
# Arrange
incomplete_data = {
'PULocationID': 43,
# Missing DOLocationID and trip_distance
}
# Act & Assert
with pytest.raises(KeyError):
prepare_features(incomplete_data)
def test_validate_data_schema():
"""Test data validation against expected schema"""
# Arrange
valid_df = pd.DataFrame({
'PULocationID': [1, 2, 3],
'DOLocationID': [10, 20, 30],
'trip_distance': [1.5, 2.3, 4.1]
})
# Act
is_valid = validate_data(valid_df)
# Assert
assert is_valid is True
Testing Model Components
# tests/test_model.py
import pytest
import joblib
from unittest.mock import Mock, patch
from src.model import ModelService, load_model, predict_duration
class TestModelService:
"""Test suite for ModelService class"""
def setup_method(self):
"""Set up test fixtures before each test method"""
self.mock_model = Mock()
self.mock_model.predict.return_value = [25.5]
self.service = ModelService(self.mock_model)
def test_predict_single_ride(self):
"""Test prediction for a single ride"""
# Arrange
ride_data = {
'PULocationID': 43,
'DOLocationID': 215,
'trip_distance': 14
}
# Act
prediction = self.service.predict(ride_data)
# Assert
assert prediction is not None
assert isinstance(prediction, (int, float))
self.mock_model.predict.assert_called_once()
def test_predict_batch_rides(self):
"""Test batch prediction"""
# Arrange
self.mock_model.predict.return_value = [25.5, 30.2, 15.8]
rides_data = [
{'PULocationID': 43, 'DOLocationID': 215, 'trip_distance': 14},
{'PULocationID': 50, 'DOLocationID': 100, 'trip_distance': 8},
{'PULocationID': 25, 'DOLocationID': 150, 'trip_distance': 22}
]
# Act
predictions = self.service.predict_batch(rides_data)
# Assert
assert len(predictions) == 3
assert all(isinstance(p, (int, float)) for p in predictions)
@patch('src.model.joblib.load')
def test_load_model_success(mock_joblib_load):
"""Test successful model loading"""
# Arrange
mock_model = Mock()
mock_joblib_load.return_value = mock_model
model_path = '/path/to/model.pkl'
# Act
loaded_model = load_model(model_path)
# Assert
assert loaded_model == mock_model
mock_joblib_load.assert_called_once_with(model_path)
def test_load_model_file_not_found():
"""Test model loading with non-existent file"""
# Arrange
non_existent_path = '/path/to/non_existent_model.pkl'
# Act & Assert
with pytest.raises(FileNotFoundError):
load_model(non_existent_path)
Testing API Handlers
# tests/test_handlers.py
import json
import base64
from src.lambda_handler import lambda_handler
def test_lambda_handler_valid_request():
"""Test Lambda handler with valid input"""
# Arrange
ride_data = {
'ride': {
'PULocationID': 130,
'DOLocationID': 205,
'trip_distance': 3.66
},
'ride_id': 156
}
encoded_data = base64.b64encode(
json.dumps(ride_data).encode('utf-8')
).decode('utf-8')
event = {
'Records': [{
'kinesis': {
'data': encoded_data
}
}]
}
# Act
response = lambda_handler(event, None)
# Assert
assert response['statusCode'] == 200
response_body = json.loads(response['body'])
assert 'predictions' in response_body
assert len(response_body['predictions']) > 0
def test_lambda_handler_malformed_data():
"""Test Lambda handler with malformed input"""
# Arrange
malformed_event = {
'Records': [{
'kinesis': {
'data': 'invalid_base64_data'
}
}]
}
# Act
response = lambda_handler(malformed_event, None)
# Assert
assert response['statusCode'] == 400
assert 'error' in json.loads(response['body'])
Unit Testing Best Practices
The AAA Pattern (Arrange, Act, Assert)
def test_feature_engineering():
# Arrange - Set up test data and expected results
input_data = {'trip_distance': 10.5, 'duration': 25}
expected_speed = 25.2 # 10.5 miles / (25/60) hours
# Act - Execute the function under test
result = calculate_speed(input_data)
# Assert - Verify the results
assert abs(result - expected_speed) < 0.1
Using Mocks Effectively
import pytest
from unittest.mock import Mock, patch

# assumed locations of the helpers under test
from src.model import download_model_from_url
from src.database import save_predictions_to_db
class TestExternalDependencies:
@patch('src.model.requests.get')
def test_model_download_success(self, mock_get):
"""Test successful model download from external service"""
# Arrange
mock_response = Mock()
mock_response.status_code = 200
mock_response.content = b'fake_model_data'
mock_get.return_value = mock_response
# Act
result = download_model_from_url('http://example.com/model.pkl')
# Assert
assert result == b'fake_model_data'
mock_get.assert_called_once_with('http://example.com/model.pkl')
@patch('src.database.connect')
def test_save_predictions_db_error(self, mock_connect):
"""Test handling of database connection errors"""
# Arrange
mock_connect.side_effect = ConnectionError("Database unavailable")
predictions = [{'ride_id': 1, 'duration': 25.5}]
# Act & Assert
with pytest.raises(ConnectionError):
save_predictions_to_db(predictions)
Fixtures for Reusable Test Data
# conftest.py
import pytest
from unittest.mock import Mock
@pytest.fixture
def sample_ride_data():
"""Fixture providing sample ride data for tests"""
return {
'PULocationID': 43,
'DOLocationID': 215,
'trip_distance': 14.2,
'datetime': '2021-01-01 00:15:56'
}
@pytest.fixture
def trained_model():
"""Fixture providing a mock trained model"""
model = Mock()
model.predict.return_value = [25.5]
return model
# tests/test_with_fixtures.py
from src.model import ModelService
def test_prediction_with_fixtures(sample_ride_data, trained_model):
"""Test using fixtures for common test data"""
service = ModelService(trained_model)
prediction = service.predict(sample_ride_data)
assert prediction is not None
assert isinstance(prediction, (int, float))
Parameterized Testing
import pytest

from src.preprocessing import categorize_trip_distance  # assumed location of the function under test
@pytest.mark.parametrize("distance,expected_category", [
(0.5, "short"),
(5.0, "medium"),
(15.0, "long"),
(50.0, "very_long")
])
def test_trip_categorization(distance, expected_category):
"""Test trip distance categorization with multiple inputs"""
result = categorize_trip_distance(distance)
assert result == expected_category
@pytest.mark.parametrize("invalid_input", [
None,
-5.0,
"not_a_number",
[]
])
def test_trip_categorization_invalid_inputs(invalid_input):
"""Test trip categorization with invalid inputs"""
with pytest.raises((ValueError, TypeError)):
categorize_trip_distance(invalid_input)
Integration Testing {#integration-testing}
Integration testing ensures that different components of your ML system work correctly together. This is crucial for catching issues that might not appear in isolated unit tests.
Docker-Based Integration Testing
Setting Up Docker for Testing
Step 1: Create a test Dockerfile
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY src/ ./src/
COPY tests/ ./tests/
COPY models/ ./models/
CMD ["python", "-m", "pytest", "tests/integration/"]
Step 2: Docker Compose for test environment
# docker-compose.test.yml
version: '3.8'
services:
ml-service:
build: .
ports:
- "8080:8080"
environment:
- MODEL_LOCATION=/app/models
- TEST_RUN=true
volumes:
- ./models:/app/models
- ./test-data:/app/test-data
depends_on:
- redis-cache
- test-db
redis-cache:
image: redis:6-alpine
ports:
- "6379:6379"
test-db:
image: postgres:13-alpine
environment:
POSTGRES_DB: test_mlops
POSTGRES_USER: test_user
POSTGRES_PASSWORD: test_pass
ports:
- "5432:5432"
localstack:
image: localstack/localstack:latest
ports:
- "4566:4566"
environment:
- SERVICES=s3,kinesis,lambda
- DEBUG=1
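One convenient way to drive this stack from pytest is a session-scoped fixture that shells out to Docker Compose. The sketch below assumes the Compose v2 CLI (docker compose) and the docker-compose.test.yml file shown above; it is one possible setup, not the only one.
# tests/integration/conftest.py (sketch)
import subprocess

import pytest

COMPOSE_FILE = "docker-compose.test.yml"

@pytest.fixture(scope="session", autouse=True)
def test_stack():
    """Start the test services once per session and always tear them down."""
    subprocess.run(
        ["docker", "compose", "-f", COMPOSE_FILE, "up", "-d", "--build"],
        check=True,
    )
    try:
        yield
    finally:
        subprocess.run(
            ["docker", "compose", "-f", COMPOSE_FILE, "down", "-v"],
            check=True,
        )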
Integration Test Examples
# tests/integration/test_end_to_end.py
import requests
import time
import json
import pytest
from deepdiff import DeepDiff
class TestEndToEndPipeline:
@classmethod
def setup_class(cls):
"""Set up test environment before running tests"""
# Wait for services to be ready
cls.wait_for_service("http://localhost:8080/health", timeout=60)
cls.service_url = "http://localhost:8080"
@staticmethod
def wait_for_service(url, timeout=30):
"""Wait for a service to become available"""
start_time = time.time()
while time.time() - start_time < timeout:
try:
response = requests.get(url)
if response.status_code == 200:
return True
except requests.exceptions.ConnectionError:
pass
time.sleep(1)
raise TimeoutError(f"Service at {url} did not become available")
def test_health_endpoint(self):
"""Test that the service health endpoint works"""
response = requests.get(f"{self.service_url}/health")
assert response.status_code == 200
health_data = response.json()
assert health_data['status'] == 'healthy'
assert 'model_loaded' in health_data['checks']
assert health_data['checks']['model_loaded'] is True
def test_single_prediction(self):
"""Test single ride prediction through the API"""
# Arrange
ride_data = {
'PULocationID': 43,
'DOLocationID': 215,
'trip_distance': 14.2,
'datetime': '2021-01-01 00:15:56'
}
expected_response = {
'predicted_duration': 25.5,
'model_version': '1.0.0'
}
# Act
response = requests.post(
f"{self.service_url}/predict",
json=ride_data
)
# Assert
assert response.status_code == 200
actual_response = response.json()
# Use DeepDiff for a detailed comparison; significant_digits tolerates small
# numeric differences in the predicted duration instead of requiring an exact match
diff = DeepDiff(expected_response, actual_response, ignore_order=True, significant_digits=1)
assert not diff, f"Response differs from expected: {diff}"
def test_batch_prediction(self):
"""Test batch prediction functionality"""
# Arrange
batch_data = {
'rides': [
{
'ride_id': 1,
'PULocationID': 43,
'DOLocationID': 215,
'trip_distance': 14.2
},
{
'ride_id': 2,
'PULocationID': 68,
'DOLocationID': 170,
'trip_distance': 8.5
}
]
}
# Act
response = requests.post(
f"{self.service_url}/predict/batch",
json=batch_data
)
# Assert
assert response.status_code == 200
result = response.json()
assert 'predictions' in result
assert len(result['predictions']) == 2
# Verify each prediction has required fields
for prediction in result['predictions']:
assert 'ride_id' in prediction
assert 'predicted_duration' in prediction
assert isinstance(prediction['predicted_duration'], (int, float))
def test_prediction_caching(self):
"""Test that identical requests are cached"""
ride_data = {
'PULocationID': 43,
'DOLocationID': 215,
'trip_distance': 14.2
}
# First request
start_time = time.time()
response1 = requests.post(f"{self.service_url}/predict", json=ride_data)
first_duration = time.time() - start_time
# Second identical request (should be cached)
start_time = time.time()
response2 = requests.post(f"{self.service_url}/predict", json=ride_data)
second_duration = time.time() - start_time
# Assertions
assert response1.status_code == 200
assert response2.status_code == 200
assert response1.json() == response2.json()
# Second request should be significantly faster (cached)
assert second_duration < first_duration * 0.5
Testing Cloud Service Integrations
AWS Services with LocalStack
# tests/integration/test_aws_integration.py
import json

import boto3
class TestAWSIntegration:
def setup_method(self):
"""Set up AWS clients for LocalStack"""
self.kinesis_client = boto3.client(
'kinesis',
endpoint_url='http://localhost:4566',
region_name='us-east-1',
aws_access_key_id='test',
aws_secret_access_key='test'
)
self.s3_client = boto3.client(
's3',
endpoint_url='http://localhost:4566',
region_name='us-east-1',
aws_access_key_id='test',
aws_secret_access_key='test'
)
def test_kinesis_stream_integration(self):
"""Test Kinesis stream creation and data flow"""
stream_name = 'ride-predictions-test'
# Create stream
self.kinesis_client.create_stream(
StreamName=stream_name,
ShardCount=1
)
# Wait for stream to be active
waiter = self.kinesis_client.get_waiter('stream_exists')
waiter.wait(StreamName=stream_name)
# Put record
test_data = {
'ride_id': 123,
'predicted_duration': 25.5,
'timestamp': '2021-01-01T00:15:56Z'
}
response = self.kinesis_client.put_record(
StreamName=stream_name,
Data=json.dumps(test_data),
PartitionKey='test-partition'
)
assert 'SequenceNumber' in response
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
def test_s3_model_storage(self):
"""Test model storage and retrieval from S3"""
bucket_name = 'test-models-bucket'
model_key = 'models/v1/model.pkl'
# Create bucket
self.s3_client.create_bucket(Bucket=bucket_name)
# Upload test model file
test_model_data = b'fake_model_pickle_data'
self.s3_client.put_object(
Bucket=bucket_name,
Key=model_key,
Body=test_model_data
)
# Download and verify
response = self.s3_client.get_object(
Bucket=bucket_name,
Key=model_key
)
downloaded_data = response['Body'].read()
assert downloaded_data == test_model_data
Google Cloud Functions Testing
# tests/integration/test_gcp_functions.py
import subprocess
import requests
import json
import base64
import time
from pathlib import Path
class TestGCPFunctions:
def setup_method(self):
"""Start Functions Framework for testing"""
self.port = 8888
self.base_url = f'http://localhost:{self.port}'
# Start functions framework
current_path = Path(__file__).parent
project_root = current_path.parent.parent
self.process = subprocess.Popen([
'functions-framework',
'--target', 'predict_duration',
'--signature-type', 'event',
'--port', str(self.port)
], cwd=project_root)
# Wait for function to be ready
self._wait_for_function_ready()
def teardown_method(self):
"""Clean up after tests"""
if hasattr(self, 'process'):
self.process.terminate()
self.process.wait()
def _wait_for_function_ready(self, timeout=30):
"""Wait for the function to be ready to receive requests"""
start_time = time.time()
while time.time() - start_time < timeout:
try:
# Try a health check or simple request
response = requests.get(self.base_url)
if response.status_code in [200, 405]: # 405 is OK for wrong method
return True
except requests.exceptions.ConnectionError:
pass
time.sleep(1)
raise TimeoutError("Function did not become ready in time")
def test_pubsub_function_processing(self):
"""Test Cloud Function with PubSub event"""
# Arrange
ride_data = {
'ride': {
'PULocationID': 43,
'DOLocationID': 215,
'trip_distance': 14.2
},
'ride_id': 12345
}
# Encode data as base64 (PubSub format)
ride_json = json.dumps(ride_data)
encoded_data = base64.b64encode(ride_json.encode('utf-8')).decode('utf-8')
# Create PubSub message format
pubsub_event = {
'data': {'data': encoded_data},
'attributes': {
'timestamp': '2021-01-01T00:15:56Z'
}
}
# Act
response = requests.post(self.base_url, json=pubsub_event)
# Assert
assert response.status_code == 200
# Check logs for expected output
time.sleep(1) # Allow time for processing
self.process.poll()
# You would typically check logs or output streams here
# For this example, we're just verifying the function responds
Code Quality and Formatting {#code-quality}
Code quality tools help maintain consistent, readable, and error-free code across your ML project.
Automated Code Formatting
Black: The Uncompromising Code Formatter
# Install Black
pipenv install --dev black
# Format all Python files
black .
# Check what would be changed without making changes
black --diff .
# Format specific files
black src/model.py tests/test_model.py
Black Configuration (pyproject.toml):
[tool.black]
line-length = 88
target-version = ['py38']
include = '\.pyi?$'
extend-exclude = '''
/(
# directories
\.eggs
| \.git
| \.hg
| \.mypy_cache
| \.tox
| \.venv
| build
| dist
)/
'''
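As a quick illustration of the effect (the function itself is made up), Black turns inconsistent quoting, spacing, and wrapping into one canonical style, roughly like this:
# Before Black: mixed quotes, uneven spacing, an over-long line
def prepare(ride):
    return {'PU_DO': "%s_%s" % (ride[ 'PULocationID' ],ride['DOLocationID']), 'trip_distance': ride['trip_distance']}

# After Black: double quotes, normalized spacing, the dict exploded to respect the line length
def prepare(ride):
    return {
        "PU_DO": "%s_%s" % (ride["PULocationID"], ride["DOLocationID"]),
        "trip_distance": ride["trip_distance"],
    }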
isort: Import Statement Organizer
# Install isort
pipenv install --dev isort
# Sort imports in all files
isort .
# Check what would be changed
isort --diff .
isort Configuration (.isort.cfg), using the black profile so isort and Black agree:
[settings]
profile = black
multi_line_output = 3
line_length = 88
known_first_party = src
known_third_party = pandas,numpy,sklearn,mlflow
sections = FUTURE,STDLIB,THIRDPARTY,FIRSTPARTY,LOCALFOLDER
Static Code Analysis
pylint: Code Quality Checker
# Install pylint
pipenv install --dev pylint
# Analyze a specific file
pylint src/model.py
# Analyze all Python files
pylint **/*.py
# Generate a detailed report
pylint --output-format=json src/ > pylint_report.json
pylint Configuration (.pylintrc):
[MASTER]
init-hook='import sys; sys.path.append("src")'
[MESSAGES CONTROL]
disable=C0114, # missing-module-docstring
C0115, # missing-class-docstring
C0116, # missing-function-docstring
R0903, # too-few-public-methods
R0913 # too-many-arguments
[FORMAT]
max-line-length=88
[DESIGN]
max-args=7
max-locals=15
max-branches=12
Handling pylint in Code:
# Disable specific warnings for a code block
# pylint: disable=import-error,wrong-import-position
import sys
import os
sys.path.append(os.path.dirname(__file__))
from .model import MLModel
# pylint: enable=import-error,wrong-import-position
class ModelService:
"""ML Model service for predictions"""
def __init__(self, model_path):
# Disable warning for this specific line
self.model = self._load_model(model_path) # pylint: disable=no-member
def _load_model(self, path):
"""Load model from path"""
# Implementation here
pass
mypy: Static Type Checking
# Install mypy
pipenv install --dev mypy
# Type check your code
mypy src/
# Generate HTML report
mypy --html-report mypy_report src/
Type Annotations Example:
from typing import Dict, List, Optional, Union
import pandas as pd
import numpy as np
class ModelService:
"""Type-annotated model service"""
def __init__(self, model_path: str) -> None:
self.model_path = model_path
self.model: Optional[object] = None
def predict(self, features: Dict[str, Union[int, float, str]]) -> float:
"""Make a prediction for a single ride"""
if self.model is None:
raise ValueError("Model not loaded")
# Process features and make prediction
processed_features = self._process_features(features)
prediction = self.model.predict([processed_features])[0]
return float(prediction)
def predict_batch(self,
features_list: List[Dict[str, Union[int, float, str]]]
) -> List[float]:
"""Make predictions for multiple rides"""
predictions = []
for features in features_list:
prediction = self.predict(features)
predictions.append(prediction)
return predictions
def _process_features(self,
features: Dict[str, Union[int, float, str]]
) -> np.ndarray:
"""Process raw features into model input format"""
# Implementation here
return np.array([1, 2, 3]) # Placeholder
Security Scanning
bandit: Security Issue Detection
# Install bandit
pipenv install --dev bandit
# Scan for security issues
bandit -r src/
# Generate JSON report
bandit -r src/ -f json -o security_report.json
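For a sense of what bandit reports, the deliberately insecure snippet below (not from this project) triggers three common findings; the B-codes are bandit's test IDs.
# examples/insecure_snippets.py (deliberately bad code, for illustration only)
import pickle
import subprocess

DB_PASSWORD = "super-secret"  # B105: hardcoded password string

def load_model_unsafely(path):
    with open(path, "rb") as f_in:
        return pickle.load(f_in)  # B301: pickle can execute arbitrary code on load

def run_command(user_input):
    # B602: subprocess call with shell=True is open to shell injection
    subprocess.run(f"echo {user_input}", shell=True, check=True)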
safety: Dependency Vulnerability Scanner
# Install safety
pipenv install --dev safety
# Check for known security vulnerabilities
safety check
# Check specific requirements file
safety check --file requirements.txt
Testing ML-Specific Components {#ml-testing}
Machine learning systems have unique testing requirements beyond traditional software testing.
Model Behavior Testing
Testing Model Invariants
# tests/test_model_behavior.py
import pytest
import numpy as np
from src.model import MLModel
class TestModelBehavior:
@pytest.fixture
def trained_model(self):
"""Load a trained model for testing"""
model = MLModel()
model.load('/path/to/trained/model.pkl')
return model
def test_model_output_range(self, trained_model):
"""Test that model outputs are within expected range"""
# Test with various inputs
test_inputs = [
{'trip_distance': 1.0, 'PULocationID': 43, 'DOLocationID': 215},
{'trip_distance': 10.0, 'PULocationID': 100, 'DOLocationID': 50},
{'trip_distance': 25.0, 'PULocationID': 200, 'DOLocationID': 150}
]
for input_data in test_inputs:
prediction = trained_model.predict(input_data)
# Duration should be reasonable (between 1 minute and 2 hours)
assert 1.0 <= prediction <= 120.0, f"Prediction {prediction} out of range"
def test_model_monotonicity(self, trained_model):
"""Test that longer trips generally take more time"""
base_input = {'PULocationID': 43, 'DOLocationID': 215}
distances = [1.0, 5.0, 10.0, 20.0]
predictions = []
for distance in distances:
input_data = {**base_input, 'trip_distance': distance}
prediction = trained_model.predict(input_data)
predictions.append(prediction)
# Check that predictions generally increase with distance
# Allow some flexibility for noise
increasing_pairs = sum(1 for i in range(len(predictions)-1)
if predictions[i+1] >= predictions[i])
# At least 70% of pairs should be increasing
assert increasing_pairs >= len(distances) * 0.7
def test_model_consistency(self, trained_model):
"""Test that identical inputs produce identical outputs"""
input_data = {
'trip_distance': 5.0,
'PULocationID': 43,
'DOLocationID': 215,
'day_of_week': 1,
'hour_of_day': 14
}
# Make multiple predictions with same input
predictions = [trained_model.predict(input_data) for _ in range(5)]
# All predictions should be identical
assert all(p == predictions[0] for p in predictions)
def test_model_robustness_to_outliers(self, trained_model):
"""Test model behavior with extreme/outlier inputs"""
outlier_inputs = [
{'trip_distance': 0.1, 'PULocationID': 1, 'DOLocationID': 2}, # Very short trip
{'trip_distance': 100.0, 'PULocationID': 1, 'DOLocationID': 263}, # Very long trip
]
for input_data in outlier_inputs:
prediction = trained_model.predict(input_data)
# Model should still produce reasonable outputs
assert 0.5 <= prediction <= 300.0 # Extended range for outliers
assert not np.isnan(prediction)
assert not np.isinf(prediction)
Performance Testing
# tests/test_model_performance.py
import time

import numpy as np
import pandas as pd
import pytest

from src.model import MLModel
class TestModelPerformance:
@pytest.fixture
def performance_test_data(self):
"""Generate test data for performance testing"""
np.random.seed(42)
n_samples = 1000
data = pd.DataFrame({
'trip_distance': np.random.exponential(5, n_samples),
'PULocationID': np.random.randint(1, 264, n_samples),
'DOLocationID': np.random.randint(1, 264, n_samples),
'day_of_week': np.random.randint(0, 7, n_samples),
'hour_of_day': np.random.randint(0, 24, n_samples)
})
return data
def test_single_prediction_latency(self, trained_model):
"""Test latency for single predictions"""
input_data = {
'trip_distance': 5.0,
'PULocationID': 43,
'DOLocationID': 215
}
# Warm up
trained_model.predict(input_data)
# Measure latency
start_time = time.time()
prediction = trained_model.predict(input_data)
latency = time.time() - start_time
# Should predict within 100ms
assert latency < 0.1, f"Prediction took {latency:.3f}s, expected < 0.1s"
assert prediction is not None
def test_batch_prediction_throughput(self, trained_model, performance_test_data):
"""Test throughput for batch predictions"""
batch_size = 100
test_batch = performance_test_data.head(batch_size).to_dict('records')
# Warm up
trained_model.predict_batch(test_batch[:10])
# Measure throughput
start_time = time.time()
predictions = trained_model.predict_batch(test_batch)
total_time = time.time() - start_time
throughput = len(predictions) / total_time
# Should process at least 500 predictions per second
assert throughput >= 500, f"Throughput {throughput:.1f} preds/s, expected >= 500"
assert len(predictions) == batch_size
def test_memory_usage(self, trained_model, performance_test_data):
"""Test memory usage during batch processing"""
import psutil
import os
process = psutil.Process(os.getpid())
initial_memory = process.memory_info().rss / 1024 / 1024 # MB
# Process large batch
large_batch = performance_test_data.to_dict('records')
predictions = trained_model.predict_batch(large_batch)
final_memory = process.memory_info().rss / 1024 / 1024 # MB
memory_increase = final_memory - initial_memory
# Memory increase should be reasonable (< 100MB for 1000 predictions)
assert memory_increase < 100, f"Memory increased by {memory_increase:.1f}MB"
assert len(predictions) == len(large_batch)
Data Quality Testing
# tests/test_data_quality.py
import pandas as pd
import pytest
from src.data_validation import validate_input_data, check_data_drift
class TestDataQuality:
@pytest.fixture
def reference_data(self):
"""Reference data for comparison"""
return pd.DataFrame({
'trip_distance': [1.2, 3.5, 7.8, 2.1, 5.0],
'PULocationID': [43, 100, 215, 68, 170],
'DOLocationID': [215, 50, 150, 100, 200],
'fare_amount': [8.5, 12.0, 25.5, 9.8, 18.0]
})
def test_data_schema_validation(self):
"""Test that input data has correct schema"""
# Valid data
valid_data = pd.DataFrame({
'trip_distance': [5.0, 3.2],
'PULocationID': [43, 100],
'DOLocationID': [215, 50],
'fare_amount': [18.0, 12.5]
})
assert validate_input_data(valid_data) is True
# Invalid data - missing column
invalid_data = pd.DataFrame({
'trip_distance': [5.0, 3.2],
'PULocationID': [43, 100],
# Missing DOLocationID and fare_amount
})
assert validate_input_data(invalid_data) is False
def test_data_range_validation(self):
"""Test data values are within expected ranges"""
# Out of range data
invalid_data = pd.DataFrame({
'trip_distance': [-1.0, 1000.0], # Negative and extremely large
'PULocationID': [0, 500], # Outside valid location range
'DOLocationID': [215, 50],
'fare_amount': [-5.0, 10.0] # Negative fare
})
validation_results = validate_input_data(invalid_data, check_ranges=True)
assert validation_results is False
def test_data_drift_detection(self, reference_data):
"""Test detection of data drift"""
# Similar data (no drift)
similar_data = pd.DataFrame({
'trip_distance': [1.5, 3.0, 8.0, 2.5, 4.8],
'PULocationID': [50, 95, 220, 70, 165],
'DOLocationID': [200, 55, 145, 105, 195],
'fare_amount': [9.0, 11.5, 26.0, 10.0, 17.5]
})
drift_score = check_data_drift(reference_data, similar_data)
assert drift_score < 0.1 # Low drift score
# Drifted data
drifted_data = pd.DataFrame({
'trip_distance': [20.0, 25.0, 30.0, 18.0, 22.0], # Much longer trips
'PULocationID': [1, 2, 3, 4, 5], # Different location pattern
'DOLocationID': [260, 261, 262, 263, 1],
'fare_amount': [50.0, 60.0, 70.0, 45.0, 55.0] # Much higher fares
})
drift_score = check_data_drift(reference_data, drifted_data)
assert drift_score > 0.3 # High drift score
def test_missing_value_handling(self):
"""Test handling of missing values"""
data_with_nulls = pd.DataFrame({
'trip_distance': [5.0, None, 3.2],
'PULocationID': [43, 100, None],
'DOLocationID': [215, None, 50],
'fare_amount': [18.0, 12.5, 15.0]
})
# Should detect missing values
validation_result = validate_input_data(data_with_nulls, allow_nulls=False)
assert validation_result is False
# Should allow missing values when configured
validation_result = validate_input_data(data_with_nulls, allow_nulls=True)
assert validation_result is True
Advanced Testing Strategies {#advanced-testing}
Testing with Mock Services
# tests/test_external_services.py
from unittest.mock import Mock, patch, MagicMock
import pytest
import requests
from src.external_services import ModelRepository, PredictionLogger
class TestExternalServices:
@patch('src.external_services.requests.get')
def test_model_download_retry_logic(self, mock_get):
"""Test retry logic for model downloads"""
# Simulate intermittent failures
mock_get.side_effect = [
requests.exceptions.ConnectionError("Network error"),
requests.exceptions.Timeout("Request timeout"),
Mock(status_code=200, content=b'model_data') # Success on 3rd try
]
repo = ModelRepository("http://model-store.com")
model_data = repo.download_model("model_v1.pkl")
assert model_data == b'model_data'
assert mock_get.call_count == 3
@patch('src.external_services.boto3.client')
def test_prediction_logging_s3_failure(self, mock_boto_client):
"""Test handling of S3 logging failures"""
# Mock S3 client that fails
mock_s3 = Mock()
mock_s3.put_object.side_effect = Exception("S3 Error")
mock_boto_client.return_value = mock_s3
logger = PredictionLogger()
# Should not raise exception even if S3 fails
result = logger.log_prediction("ride_123", 25.5, {"model": "v1"})
# Should indicate failure but not crash
assert result['success'] is False
assert 'error' in result
@patch('src.external_services.redis.Redis')
def test_cache_fallback_behavior(self, mock_redis):
"""Test behavior when cache is unavailable"""
# Mock Redis that fails to connect
mock_redis_instance = Mock()
mock_redis_instance.get.side_effect = Exception("Redis connection failed")
mock_redis.return_value = mock_redis_instance
from src.model_service import CachedModelService
service = CachedModelService()
# Should fall back to direct computation
result = service.predict_with_cache("ride_data_key", {"distance": 5.0})
assert result is not None
# Should have attempted cache but continued without it
mock_redis_instance.get.assert_called_once()
Property-Based Testing
# tests/test_property_based.py
from hypothesis import given, strategies as st, assume
import pytest
from src.feature_engineering import calculate_speed, normalize_coordinates
class TestPropertyBased:
@given(
distance=st.floats(min_value=0.1, max_value=100.0),
duration=st.floats(min_value=0.1, max_value=300.0)
)
def test_speed_calculation_properties(self, distance, duration):
"""Test properties of speed calculation"""
speed = calculate_speed(distance, duration)
# Speed should always be positive
assert speed > 0
# Speed should be proportional to distance
double_distance_speed = calculate_speed(distance * 2, duration)
assert double_distance_speed > speed
# Speed should be inversely proportional to duration
double_duration_speed = calculate_speed(distance, duration * 2)
assert double_duration_speed < speed
@given(
lat=st.floats(min_value=-90.0, max_value=90.0),
lon=st.floats(min_value=-180.0, max_value=180.0)
)
def test_coordinate_normalization_properties(self, lat, lon):
"""Test properties of coordinate normalization"""
normalized_lat, normalized_lon = normalize_coordinates(lat, lon)
# Normalized coordinates should be in [0, 1] range
assert 0.0 <= normalized_lat <= 1.0
assert 0.0 <= normalized_lon <= 1.0
# Function should be deterministic
norm_lat2, norm_lon2 = normalize_coordinates(lat, lon)
assert normalized_lat == norm_lat2
assert normalized_lon == norm_lon2
@given(
features=st.dictionaries(
keys=st.sampled_from(['trip_distance', 'PULocationID', 'DOLocationID']),
values=st.floats(min_value=0.1, max_value=100.0),
min_size=3,
max_size=3
)
)
def test_feature_processing_invariants(self, features):
"""Test that feature processing maintains invariants"""
from src.preprocessing import process_ride_features
processed = process_ride_features(features)
# Output should always be a dictionary
assert isinstance(processed, dict)
# Should contain all required features
required_features = ['trip_distance_normalized', 'pickup_zone', 'dropoff_zone']
for feature in required_features:
assert feature in processed
# Normalized distance should be in reasonable range
assert 0.0 <= processed['trip_distance_normalized'] <= 1.0
Mutation Testing
Mutation testing helps evaluate the quality of your tests by introducing small changes (mutations) to your code and checking if your tests catch them.
# Install mutmut for mutation testing
pipenv install --dev mutmut
# Run mutation testing
mutmut run
# Show results
mutmut results
# Show specific mutations
mutmut show 1
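To make the idea concrete, here is a hand-written sketch of a typical mutation and a boundary test that kills it; categorize_trip_distance and its thresholds are assumed for illustration and are not mutmut output.
# Original code (illustrative thresholds)
def categorize_trip_distance(distance):
    if distance < 2:  # a typical mutant changes this to: distance <= 2
        return "short"
    if distance < 10:
        return "medium"
    return "long"

# This boundary test passes on the original code but fails on the "<=" mutant,
# so the mutation is reported as killed rather than surviving.
def test_boundary_between_short_and_medium():
    assert categorize_trip_distance(1.99) == "short"
    assert categorize_trip_distance(2.0) == "medium"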
Test Coverage Analysis
# Install coverage tools
pipenv install --dev coverage pytest-cov
# Run tests with coverage
pytest --cov=src --cov-report=html --cov-report=term
# Generate detailed HTML report
coverage html
# Set coverage thresholds in pytest.ini
pytest.ini with coverage configuration:
[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts =
-v
--tb=short
--cov=src
--cov-report=html
--cov-report=term-missing
--cov-fail-under=80
Summary
Congratulations! You've completed Chapter 1 of the MLOps Best Practices guide. You now have a solid foundation in:
Key Takeaways
- Testing Pyramid - Unit tests form the base, integration tests in the middle, end-to-end tests at the top
- ML-Specific Testing - Model behavior, performance, and data quality testing
- Code Quality - Automated formatting, linting, and static analysis
- Test Automation - Fixtures, parametrization, and property-based testing
Tools Mastered
- pytest for comprehensive testing
- Black and isort for code formatting
- pylint and mypy for code quality
- Docker for integration testing
- LocalStack for cloud service testing
Best Practices Applied
- Write tests first (TDD approach)
- Use mocks to isolate dependencies
- Test edge cases and error conditions
- Maintain high test coverage (>80%)
- Automate quality checks in your workflow
What's Next?
In Chapter 2, we'll explore automation and deployment practices including:
- Pre-commit hooks and CI/CD pipelines
- Makefiles and automation scripts
- Docker deployment and orchestration
- Monitoring and logging strategies