Streamline your ML workflows with automation, CI/CD, and production deployment strategies
Welcome to Chapter 2 of the MLOps Zoomcamp Module 6 Best Practices! This chapter focuses on automating your ML workflows and deploying models to production with confidence.
Table of Contents
- Git Pre-commit Hooks
- Makefiles for Automation
- Test Automation Scripts
- Cloud Service Testing
- Docker and Containerization
- Monitoring and Logging
Git Pre-commit Hooks {#pre-commit-hooks}
What are Pre-commit Hooks?
Pre-commit hooks automatically run tests, formatting, and linting before each git commit, ensuring code quality and preventing broken code from entering your repository.
Setting Up Pre-commit
# Initialize git repository
git init
# Install pre-commit
pipenv install --dev pre-commit
# Create sample configuration
pre-commit sample-config > .pre-commit-config.yaml
# Install the hooks
pre-commit install
Configuring Pre-commit Hooks
Edit .pre-commit-config.yaml:
repos:
  # Code formatting
  - repo: https://github.com/psf/black
    rev: 23.3.0
    hooks:
      - id: black
        name: black (python formatter)
        language_version: python3

  # Import sorting
  - repo: https://github.com/pycqa/isort
    rev: 5.12.0
    hooks:
      - id: isort
        name: isort (import sorter)
        args: ["--profile", "black"]

  # Local hooks for project-specific tools
  - repo: local
    hooks:
      - id: pytest
        name: pytest (run tests)
        entry: pytest
        language: system
        types: [python]
        pass_filenames: false
        always_run: true
        args: ["-v", "--tb=short"]
      - id: pylint
        name: pylint (code quality)
        entry: pylint
        language: system
        types: [python]
        args:
          - "-rn" # Only display messages
          - "-sn" # Don't display the score
          - "--fail-under=8.0" # Minimum score required

  # Security checks
  - repo: https://github.com/PyCQA/bandit
    rev: 1.7.5
    hooks:
      - id: bandit
        name: bandit (security linter)
        args: ["-r", "src/"]

  # General code quality
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.4.0
    hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer
      - id: check-yaml
      - id: check-added-large-files
        args: ['--maxkb=1000']
Running Pre-commit Hooks
# Run hooks on all files manually
pre-commit run --all-files
# Run specific hook
pre-commit run black
# Skip hooks for a specific commit (use sparingly!)
git commit -m "Emergency fix" --no-verify
# Update hook versions
pre-commit autoupdate
Troubleshooting Common Issues
- Hook Installation Failures
# Clear cache and reinstall
pre-commit clean
pre-commit install
- Version Conflicts
# Check specific versions in your environment
pip list | grep black
# Update .pre-commit-config.yaml to match
- Permission Issues
# Fix permissions for hook files
chmod +x .git/hooks/pre-commit
Makefiles for Automation {#makefiles}
What is Make?
Make is a build automation tool that lets you wrap complex sequences of commands behind short, named targets, which makes it a natural fit for MLOps workflows.
Creating a Comprehensive Makefile
Create a file named Makefile (no extension):
# Variables
PYTHON_VERSION := 3.9
PROJECT_NAME := mlops-ride-duration
LOCAL_TAG := $(shell date +"%Y-%m-%d-%H-%M")
DOCKER_IMAGE := $(PROJECT_NAME):$(LOCAL_TAG)

# Environment setup
.PHONY: install
install:
	@echo "Installing dependencies..."
	pipenv install --dev
	pipenv run pre-commit install

.PHONY: install-prod
install-prod:
	@echo "Installing production dependencies..."
	pipenv install --deploy --system

# Code quality targets
.PHONY: format
format:
	@echo "Formatting code..."
	pipenv run black .
	pipenv run isort .

.PHONY: lint
lint:
	@echo "Linting code..."
	pipenv run pylint src/ tests/
	pipenv run mypy src/

.PHONY: security
security:
	@echo "Running security checks..."
	pipenv run bandit -r src/
	pipenv run safety check

# Testing targets
.PHONY: test
test:
	@echo "Running unit tests..."
	pipenv run pytest tests/unit/ -v

.PHONY: test-integration
test-integration:
	@echo "Running integration tests..."
	pipenv run pytest tests/integration/ -v

.PHONY: test-all
test-all: test test-integration
	@echo "All tests completed!"

.PHONY: test-coverage
test-coverage:
	@echo "Running tests with coverage..."
	pipenv run pytest --cov=src --cov-report=html --cov-report=term

# Quality checks (combines formatting, linting, and testing)
.PHONY: quality-checks
quality-checks: format lint security test
	@echo "Quality checks completed!"

# Docker targets
.PHONY: docker-build
docker-build:
	@echo "Building Docker image..."
	docker build -t $(DOCKER_IMAGE) .
	@echo "Built image: $(DOCKER_IMAGE)"

.PHONY: docker-run
docker-run: docker-build
	@echo "Running Docker container..."
	docker run -p 8080:8080 \
		-e MODEL_LOCATION=/app/models \
		-e TEST_RUN=false \
		$(DOCKER_IMAGE)

.PHONY: docker-test
docker-test: docker-build
	@echo "Running tests in Docker..."
	docker run --rm \
		-e TEST_RUN=true \
		$(DOCKER_IMAGE) \
		python -m pytest tests/ -v

# Integration testing with LocalStack
.PHONY: localstack-start
localstack-start:
	@echo "Starting LocalStack..."
	docker-compose -f docker-compose.test.yml up -d localstack
	@echo "Waiting for LocalStack to be ready..."
	sleep 10

.PHONY: localstack-stop
localstack-stop:
	@echo "Stopping LocalStack..."
	docker-compose -f docker-compose.test.yml down

.PHONY: test-cloud
test-cloud: localstack-start
	@echo "Running cloud integration tests..."
	pipenv run pytest tests/cloud/ -v
	$(MAKE) localstack-stop

# Model management
.PHONY: download-model
download-model:
	@echo "Downloading model from S3..."
	aws s3 cp --recursive \
		s3://$(S3_BUCKET)/models/$(RUN_ID)/artifacts/model/ \
		./models/

.PHONY: validate-model
validate-model:
	@echo "Validating model..."
	pipenv run python scripts/validate_model.py ./models/

# Deployment targets
.PHONY: deploy-staging
deploy-staging: quality-checks docker-build
	@echo "Deploying to staging..."
	./scripts/deploy_staging.sh $(DOCKER_IMAGE)

.PHONY: deploy-prod
deploy-prod: quality-checks docker-build
	@echo "Deploying to production..."
	@read -p "Are you sure you want to deploy to production? [y/N] " confirm; \
	if [ "$$confirm" = "y" ] || [ "$$confirm" = "Y" ]; then \
		./scripts/deploy_production.sh $(DOCKER_IMAGE); \
	else \
		echo "Deployment cancelled."; \
	fi

# Cleanup targets
.PHONY: clean
clean:
	@echo "Cleaning up..."
	find . -type f -name "*.pyc" -delete
	find . -type d -name "__pycache__" -delete
	find . -type d -name "*.egg-info" -exec rm -rf {} +
	rm -rf .coverage htmlcov/ .pytest_cache/

.PHONY: clean-docker
clean-docker:
	@echo "Cleaning Docker images..."
	docker image prune -f
	docker container prune -f

# Development workflow
.PHONY: dev-setup
dev-setup: install
	@echo "Setting up development environment..."
	$(MAKE) download-model
	$(MAKE) validate-model

.PHONY: ci
ci: quality-checks test-coverage docker-test
	@echo "CI pipeline completed successfully!"

# Help target
.PHONY: help
help:
	@echo "Available targets:"
	@echo "  install          - Install all dependencies"
	@echo "  format           - Format code with black and isort"
	@echo "  lint             - Run linting with pylint and mypy"
	@echo "  test             - Run unit tests"
	@echo "  test-integration - Run integration tests"
	@echo "  quality-checks   - Run all quality checks"
	@echo "  docker-build     - Build Docker image"
	@echo "  docker-run       - Run application in Docker"
	@echo "  deploy-staging   - Deploy to staging environment"
	@echo "  clean            - Clean temporary files"
	@echo "  help             - Show this help message"
Makefile Best Practices
- Use .PHONY for targets that don't create files
- Print progress with @echo messages
- Use variables for repeated values
- Chain related shell commands with && so the recipe stops at the first failure
- Include a help target for documentation
Common Make Commands
# Run all quality checks
make quality-checks
# Full CI pipeline
make ci
# Development setup
make dev-setup
# Deploy to staging
make deploy-staging
# Clean everything
make clean clean-docker
Test Automation Scripts {#test-automation}
Creating Automated Test Scripts
Main Test Runner Script (run_tests.sh)
#!/usr/bin/env bash
set -e  # Exit on error

# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color

# Configuration
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(dirname "$SCRIPT_DIR")"
LOCAL_TAG=$(date +"%Y-%m-%d-%H-%M")
LOCAL_IMAGE_NAME="mlops-service:${LOCAL_TAG}"

echo -e "${YELLOW}Starting MLOps Test Pipeline${NC}"
echo -e "${YELLOW}Project: ${PROJECT_ROOT}${NC}"
echo -e "${YELLOW}Image: ${LOCAL_IMAGE_NAME}${NC}"

# Functions to print status
print_status() {
    echo -e "${GREEN}$1${NC}"
}

print_error() {
    echo -e "${RED}$1${NC}"
}

# Cleanup function
cleanup() {
    echo -e "${YELLOW}Cleaning up...${NC}"
    docker-compose -f docker-compose.test.yml down --remove-orphans
    exit $1
}

# Set trap for cleanup
trap 'cleanup $?' EXIT

# Change to project directory
cd "$PROJECT_ROOT"

# Step 1: Install dependencies
echo -e "${YELLOW}Installing dependencies...${NC}"
pipenv install --dev
print_status "Dependencies installed"

# Step 2: Run unit tests
echo -e "${YELLOW}Running unit tests...${NC}"
pipenv run pytest tests/unit/ -v --tb=short
print_status "Unit tests passed"

# Step 3: Code quality checks
echo -e "${YELLOW}Running code quality checks...${NC}"
pipenv run black --check .
pipenv run isort --check-only .
pipenv run pylint src/ --fail-under=8.0
print_status "Code quality checks passed"

# Step 4: Build Docker image
echo -e "${YELLOW}Building Docker image...${NC}"
export LOCAL_IMAGE_NAME
docker build -t "${LOCAL_IMAGE_NAME}" .
print_status "Docker image built: ${LOCAL_IMAGE_NAME}"

# Step 5: Start test environment
echo -e "${YELLOW}Starting test environment...${NC}"
docker-compose -f docker-compose.test.yml up -d
sleep 10  # Wait for services to be ready
print_status "Test environment started"

# Step 6: Run integration tests
# Because of `set -e`, capture the exit code with `|| ERROR_CODE=$?` so the
# script can dump logs before cleaning up instead of exiting immediately.
echo -e "${YELLOW}Running integration tests...${NC}"
ERROR_CODE=0
pipenv run python tests/integration/test_docker.py || ERROR_CODE=$?
if [ $ERROR_CODE != 0 ]; then
    print_error "Integration tests failed"
    docker-compose -f docker-compose.test.yml logs
    cleanup $ERROR_CODE
fi
print_status "Integration tests passed"

# Step 7: Run cloud service tests (with LocalStack)
echo -e "${YELLOW}Running cloud service tests...${NC}"
ERROR_CODE=0
pipenv run python tests/integration/test_kinesis.py || ERROR_CODE=$?
if [ $ERROR_CODE != 0 ]; then
    print_error "Cloud service tests failed"
    docker-compose -f docker-compose.test.yml logs
    cleanup $ERROR_CODE
fi
print_status "Cloud service tests passed"

# Step 8: Performance tests
echo -e "${YELLOW}Running performance tests...${NC}"
pipenv run python tests/performance/test_load.py
print_status "Performance tests passed"

# Success!
echo -e "${GREEN}All tests passed successfully!${NC}"
echo -e "${GREEN}Image ready for deployment: ${LOCAL_IMAGE_NAME}${NC}"
Docker Compose for Testing (docker-compose.test.yml)
version: '3.8'

services:
  ml-service:
    image: ${LOCAL_IMAGE_NAME}
    ports:
      - "8080:8080"
    environment:
      - PREDICTIONS_STREAM_NAME=ride_predictions
      - TEST_RUN=true
      - RUN_ID=test_run_123
      - AWS_DEFAULT_REGION=us-east-1
      - MODEL_LOCATION=/app/models
      - AWS_ACCESS_KEY_ID=test
      - AWS_SECRET_ACCESS_KEY=test
      - AWS_ENDPOINT_URL=http://localstack:4566
    volumes:
      - "./models:/app/models"
    depends_on:
      - redis
      - postgres
      - localstack
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  redis:
    image: redis:6-alpine
    ports:
      - "6379:6379"
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 3s
      retries: 5

  postgres:
    image: postgres:13-alpine
    environment:
      POSTGRES_DB: test_mlops
      POSTGRES_USER: test_user
      POSTGRES_PASSWORD: test_pass
    ports:
      - "5432:5432"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U test_user -d test_mlops"]
      interval: 5s
      timeout: 5s
      retries: 5

  localstack:
    image: localstack/localstack:latest
    ports:
      - "4566:4566"
    environment:
      - SERVICES=kinesis,s3,lambda
      - DEBUG=1
      - DATA_DIR=/tmp/localstack/data
      - DOCKER_HOST=unix:///var/run/docker.sock
    volumes:
      - "/var/run/docker.sock:/var/run/docker.sock"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:4566/health"]
      interval: 10s
      timeout: 5s
      retries: 3
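The Makefile and run_tests.sh both rely on a fixed sleep 10 to wait for this stack. A more robust option is to poll LocalStack's health endpoint until the services you need are reported as available. Here is a small sketch of such a helper; the health path is /health on older LocalStack releases and /_localstack/health on newer ones, so it tries both:

```python
"""Wait for LocalStack services to be ready (illustrative helper, not part of the original setup)."""
import time

import requests

HEALTH_URLS = [
    "http://localhost:4566/_localstack/health",  # newer LocalStack releases
    "http://localhost:4566/health",              # older releases
]


def wait_for_localstack(services=("kinesis", "s3"), timeout: int = 60) -> None:
    """Poll the health endpoint until all requested services report as available."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        for url in HEALTH_URLS:
            try:
                status = requests.get(url, timeout=2).json().get("services", {})
            except (requests.exceptions.RequestException, ValueError):
                continue  # LocalStack not up yet, or response not JSON
            if all(status.get(s) in ("available", "running") for s in services):
                print(f"LocalStack ready: {[f'{s}={status.get(s)}' for s in services]}")
                return
        time.sleep(1)
    raise TimeoutError(f"LocalStack not ready after {timeout}s")


if __name__ == "__main__":
    wait_for_localstack()
```

Calling this in place of sleep 10 makes the pipeline faster when LocalStack starts quickly and more reliable when it starts slowly.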
Performance Testing Script (test_load.py)
#!/usr/bin/env python3
"""
Performance testing script for ML service
"""
import time
import requests
import concurrent.futures
import statistics
from typing import List, Dict
def make_prediction_request(ride_data: Dict) -> Dict:
"""Make a single prediction request"""
try:
start_time = time.time()
response = requests.post(
"http://localhost:8080/predict",
json=ride_data,
timeout=5.0
)
end_time = time.time()
return {
'status_code': response.status_code,
'response_time': end_time - start_time,
'success': response.status_code == 200
}
except Exception as e:
return {
'status_code': 0,
'response_time': float('inf'),
'success': False,
'error': str(e)
}
def load_test(num_requests: int = 100, concurrent_workers: int = 10) -> Dict:
"""Run load test with specified parameters"""
# Sample ride data for testing
ride_data = {
'PULocationID': 43,
'DOLocationID': 215,
'trip_distance': 5.5,
'datetime': '2021-01-01 14:30:00'
}
print(f"π Starting load test with {num_requests} requests, {concurrent_workers} workers")
start_time = time.time()
results = []
# Use ThreadPoolExecutor for concurrent requests
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_workers) as executor:
futures = [
executor.submit(make_prediction_request, ride_data)
for _ in range(num_requests)
]
for future in concurrent.futures.as_completed(futures):
results.append(future.result())
end_time = time.time()
total_time = end_time - start_time
# Calculate statistics
successful_requests = [r for r in results if r['success']]
response_times = [r['response_time'] for r in successful_requests]
if not response_times:
raise Exception("β No successful requests!")
stats = {
'total_requests': num_requests,
'successful_requests': len(successful_requests),
'failed_requests': num_requests - len(successful_requests),
'success_rate': len(successful_requests) / num_requests * 100,
'total_time': total_time,
'requests_per_second': num_requests / total_time,
'avg_response_time': statistics.mean(response_times),
'min_response_time': min(response_times),
'max_response_time': max(response_times),
'p95_response_time': statistics.quantiles(response_times, n=20)[18], # 95th percentile
'p99_response_time': statistics.quantiles(response_times, n=100)[98] # 99th percentile
}
return stats
def main():
"""Main performance testing function"""
print("β‘ MLOps Service Performance Testing")
print("=" * 50)
# Wait for service to be ready
max_retries = 30
for i in range(max_retries):
try:
response = requests.get("http://localhost:8080/health", timeout=2)
if response.status_code == 200:
print("β
Service is ready!")
break
except requests.exceptions.RequestException:
pass
if i == max_retries - 1:
raise Exception("β Service not ready after 30 attempts")
print(f"β³ Waiting for service... ({i+1}/{max_retries})")
time.sleep(1)
# Run load tests with different configurations
test_configs = [
{'num_requests': 50, 'concurrent_workers': 5},
{'num_requests': 100, 'concurrent_workers': 10},
{'num_requests': 200, 'concurrent_workers': 20}
]
for config in test_configs:
print(f"\nπ Test: {config['num_requests']} requests, {config['concurrent_workers']} workers")
print("-" * 50)
try:
stats = load_test(**config)
print(f"β
Success Rate: {stats['success_rate']:.1f}%")
print(f"π Requests/sec: {stats['requests_per_second']:.1f}")
print(f"β±οΈ Avg Response Time: {stats['avg_response_time']*1000:.1f}ms")
print(f"π 95th Percentile: {stats['p95_response_time']*1000:.1f}ms")
print(f"π 99th Percentile: {stats['p99_response_time']*1000:.1f}ms")
# Performance assertions
assert stats['success_rate'] >= 95, f"Success rate too low: {stats['success_rate']:.1f}%"
assert stats['avg_response_time'] < 1.0, f"Average response time too high: {stats['avg_response_time']:.3f}s"
assert stats['p95_response_time'] < 2.0, f"95th percentile too high: {stats['p95_response_time']:.3f}s"
print("β
Performance test passed!")
except Exception as e:
print(f"β Performance test failed: {e}")
raise
print("\nπ All performance tests completed successfully!")
if __name__ == "__main__":
main()
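The Makefile's CI targets run pytest, while test_load.py is written as a standalone script invoked with python. If you also want the load test to run under pytest, a thin wrapper is enough. This is a hypothetical file (tests/performance/test_load_pytest.py) and assumes your tests directories are importable as packages; the thresholds and the custom performance marker are illustrative and should match your own SLOs:

```python
# tests/performance/test_load_pytest.py (hypothetical wrapper around the script above)
import pytest

from tests.performance.test_load import load_test


@pytest.mark.performance
def test_smoke_load():
    """Small load test suitable for CI; tune the numbers for your service."""
    stats = load_test(num_requests=25, concurrent_workers=5)
    assert stats["success_rate"] >= 95
    assert stats["avg_response_time"] < 1.0
```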
Cloud Service Testing {#cloud-testing}
Testing with LocalStack
Setting Up Kinesis Streams for Testing
#!/bin/bash
# setup_kinesis.sh - Set up Kinesis streams for testing

ENDPOINT_URL="http://localhost:4566"
STREAM_NAME="ride_predictions"
REGION="us-east-1"

echo "Setting up Kinesis streams for testing..."

# Create Kinesis stream
aws --endpoint-url=$ENDPOINT_URL \
    --region=$REGION \
    kinesis create-stream \
    --stream-name $STREAM_NAME \
    --shard-count 1

echo "Waiting for stream to become active..."
sleep 5

# Verify stream is created
aws --endpoint-url=$ENDPOINT_URL \
    --region=$REGION \
    kinesis describe-stream \
    --stream-name $STREAM_NAME

echo "Kinesis stream '$STREAM_NAME' is ready!"
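For the stream to carry anything in tests, something has to write to it. A sketch of the producer side is shown below; it assumes the same environment variables used in docker-compose.test.yml (PREDICTIONS_STREAM_NAME, AWS_ENDPOINT_URL, RUN_ID), and the event fields are illustrative rather than a fixed schema:

```python
"""Publish a prediction event to Kinesis (illustrative producer sketch)."""
import json
import os

import boto3


def create_kinesis_client():
    # AWS_ENDPOINT_URL points at LocalStack in tests and is unset in production
    endpoint_url = os.getenv("AWS_ENDPOINT_URL")
    if endpoint_url:
        return boto3.client("kinesis", endpoint_url=endpoint_url)
    return boto3.client("kinesis")


def publish_prediction(ride_id: int, predicted_duration: float) -> str:
    """Put a single prediction record onto the stream and return its sequence number."""
    client = create_kinesis_client()
    stream_name = os.getenv("PREDICTIONS_STREAM_NAME", "ride_predictions")
    event = {
        "ride_id": ride_id,
        "predicted_duration": predicted_duration,
        "model_version": os.getenv("RUN_ID", "unknown"),
    }
    response = client.put_record(
        StreamName=stream_name,
        Data=json.dumps(event),
        PartitionKey=str(ride_id),
    )
    return response["SequenceNumber"]


if __name__ == "__main__":
    print(publish_prediction(ride_id=12345, predicted_duration=25.5))
```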
Kinesis Integration Test
# tests/integration/test_kinesis.py
import json
import base64
import boto3
import pytest
from typing import Dict, Any
class TestKinesisIntegration:
def setup_method(self):
"""Set up Kinesis client for testing"""
self.kinesis_client = boto3.client(
'kinesis',
endpoint_url='http://localhost:4566',
region_name='us-east-1',
aws_access_key_id='test',
aws_secret_access_key='test'
)
self.stream_name = 'ride_predictions'
def test_kinesis_stream_exists(self):
"""Test that the Kinesis stream exists and is active"""
try:
response = self.kinesis_client.describe_stream(
StreamName=self.stream_name
)
assert response['StreamDescription']['StreamName'] == self.stream_name
assert response['StreamDescription']['StreamStatus'] == 'ACTIVE'
print("β
Kinesis stream is active")
except Exception as e:
pytest.fail(f"Kinesis stream test failed: {e}")
def test_put_record_to_stream(self):
"""Test putting a record to the Kinesis stream"""
# Prepare test data
test_data = {
'ride_id': 12345,
'predicted_duration': 25.5,
'model_version': 'v1.0.0',
'timestamp': '2021-01-01T14:30:00Z'
}
try:
# Put record to stream
response = self.kinesis_client.put_record(
StreamName=self.stream_name,
Data=json.dumps(test_data),
PartitionKey=str(test_data['ride_id'])
)
assert 'SequenceNumber' in response
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
print(f"β
Record put to stream with sequence: {response['SequenceNumber']}")
except Exception as e:
pytest.fail(f"Failed to put record to Kinesis: {e}")
def test_consume_records_from_stream(self):
"""Test consuming records from the Kinesis stream"""
try:
# Get shard iterator
shard_response = self.kinesis_client.list_shards(
StreamName=self.stream_name
)
shard_id = shard_response['Shards'][0]['ShardId']
iterator_response = self.kinesis_client.get_shard_iterator(
StreamName=self.stream_name,
ShardId=shard_id,
ShardIteratorType='TRIM_HORIZON'
)
shard_iterator = iterator_response['ShardIterator']
# Get records
records_response = self.kinesis_client.get_records(
ShardIterator=shard_iterator
)
assert 'Records' in records_response
print(f"β
Retrieved {len(records_response['Records'])} records from stream")
except Exception as e:
pytest.fail(f"Failed to consume from Kinesis: {e}")
S3 Model Storage Testing
# tests/integration/test_s3_storage.py
import boto3
import pytest
import tempfile
import joblib
from sklearn.linear_model import LinearRegression
import numpy as np
class TestS3ModelStorage:
def setup_method(self):
"""Set up S3 client for testing"""
self.s3_client = boto3.client(
's3',
endpoint_url='http://localhost:4566',
region_name='us-east-1',
aws_access_key_id='test',
aws_secret_access_key='test'
)
self.bucket_name = 'test-mlops-models'
self.model_key = 'models/v1/model.pkl'
def test_create_s3_bucket(self):
"""Test creating S3 bucket for model storage"""
try:
self.s3_client.create_bucket(Bucket=self.bucket_name)
# Verify bucket exists
response = self.s3_client.list_buckets()
bucket_names = [bucket['Name'] for bucket in response['Buckets']]
assert self.bucket_name in bucket_names
print(f"β
S3 bucket '{self.bucket_name}' created successfully")
except Exception as e:
pytest.fail(f"Failed to create S3 bucket: {e}")
def test_upload_model_to_s3(self):
"""Test uploading a model to S3"""
try:
# Create a simple model for testing
X = np.array([[1, 2], [3, 4], [5, 6]])
y = np.array([1, 2, 3])
model = LinearRegression().fit(X, y)
# Save model to temporary file
with tempfile.NamedTemporaryFile(suffix='.pkl', delete=False) as tmp_file:
joblib.dump(model, tmp_file.name)
# Upload to S3
with open(tmp_file.name, 'rb') as f:
self.s3_client.put_object(
Bucket=self.bucket_name,
Key=self.model_key,
Body=f.read()
)
# Verify upload
response = self.s3_client.head_object(
Bucket=self.bucket_name,
Key=self.model_key
)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
print(f"β
Model uploaded to S3: s3://{self.bucket_name}/{self.model_key}")
except Exception as e:
pytest.fail(f"Failed to upload model to S3: {e}")
def test_download_model_from_s3(self):
"""Test downloading a model from S3"""
try:
# Download model
with tempfile.NamedTemporaryFile(suffix='.pkl') as tmp_file:
self.s3_client.download_file(
self.bucket_name,
self.model_key,
tmp_file.name
)
# Load and test model
downloaded_model = joblib.load(tmp_file.name)
# Test prediction
test_input = np.array([[2, 3]])
prediction = downloaded_model.predict(test_input)
assert prediction is not None
assert len(prediction) == 1
print(f"β
Model downloaded and tested successfully: prediction = {prediction[0]:.2f}")
except Exception as e:
pytest.fail(f"Failed to download model from S3: {e}")
Docker and Containerization {#docker-deployment}
Production Dockerfile
# Production Dockerfile
FROM python:3.9-slim
# Set working directory
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
curl \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements first for better caching
COPY requirements.txt .
# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Create non-root user for security
RUN useradd --create-home --shell /bin/bash mlops
RUN chown -R mlops:mlops /app
USER mlops
# Copy application code
COPY --chown=mlops:mlops src/ ./src/
COPY --chown=mlops:mlops models/ ./models/
# Expose port
EXPOSE 8080
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=30s --retries=3 \
CMD curl -f http://localhost:8080/health || exit 1
# Run application
CMD ["python", "-m", "src.app"]
Multi-stage Production Build
# Multi-stage Dockerfile for production
FROM python:3.9 as builder
# Install build dependencies
RUN pip install pipenv
# Copy Pipfile
COPY Pipfile Pipfile.lock ./
# Install dependencies to a virtual environment
RUN pipenv install --deploy --system
# Production stage
FROM python:3.9-slim as production
# Install runtime dependencies only
RUN apt-get update && apt-get install -y \
curl \
&& rm -rf /var/lib/apt/lists/* \
&& apt-get clean
# Copy Python packages from builder
COPY --from=builder /usr/local/lib/python3.9/site-packages /usr/local/lib/python3.9/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
# Set working directory
WORKDIR /app
# Create non-root user
RUN useradd --create-home --shell /bin/bash mlops
RUN chown -R mlops:mlops /app
USER mlops
# Copy application
COPY --chown=mlops:mlops src/ ./src/
COPY --chown=mlops:mlops models/ ./models/
# Expose port
EXPOSE 8080
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=30s --retries=3 \
CMD curl -f http://localhost:8080/health || exit 1
# Run application
CMD ["gunicorn", "--bind", "0.0.0.0:8080", "--workers", "4", "src.app:app"]
Docker Compose for Production
# docker-compose.prod.yml
version: '3.8'

services:
  ml-service:
    build:
      context: .
      dockerfile: Dockerfile.prod
    ports:
      - "8080:8080"
    environment:
      - ENVIRONMENT=production
      - MODEL_LOCATION=/app/models
      - REDIS_URL=redis://redis:6379
      - DATABASE_URL=postgresql://mlops:${DB_PASSWORD}@postgres:5432/mlops_prod
      - AWS_DEFAULT_REGION=${AWS_REGION}
      - PREDICTIONS_STREAM_NAME=${KINESIS_STREAM}
    volumes:
      - model_cache:/app/models
    depends_on:
      - redis
      - postgres
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s

  redis:
    image: redis:6-alpine
    volumes:
      - redis_data:/data
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 3s
      retries: 5

  postgres:
    image: postgres:13-alpine
    environment:
      POSTGRES_DB: mlops_prod
      POSTGRES_USER: mlops
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
    restart: unless-stopped
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U mlops -d mlops_prod"]
      interval: 5s
      timeout: 5s
      retries: 5

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - ml-service
    restart: unless-stopped

volumes:
  model_cache:
  redis_data:
  postgres_data:
Monitoring and Logging {#monitoring}
Application Health Monitoring
# src/monitoring.py
import time
import psutil
import logging
from typing import Dict, Any
from datetime import datetime
class HealthMonitor:
"""Monitor application health and performance"""
def __init__(self):
self.start_time = time.time()
self.request_count = 0
self.error_count = 0
def get_health_status(self) -> Dict[str, Any]:
"""Get comprehensive health status"""
current_time = time.time()
uptime = current_time - self.start_time
# System metrics
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
# Application metrics
error_rate = (self.error_count / max(self.request_count, 1)) * 100
health_status = {
'status': 'healthy' if error_rate < 5.0 and cpu_percent < 80 else 'unhealthy',
'timestamp': datetime.utcnow().isoformat(),
'uptime_seconds': uptime,
'system': {
'cpu_percent': cpu_percent,
'memory_percent': memory.percent,
'memory_available_gb': memory.available / (1024**3),
'disk_percent': disk.percent,
'disk_free_gb': disk.free / (1024**3)
},
'application': {
'total_requests': self.request_count,
'error_count': self.error_count,
'error_rate_percent': error_rate
},
'checks': {
'model_loaded': self._check_model_loaded(),
'database_connected': self._check_database(),
'external_services': self._check_external_services()
}
}
return health_status
def _check_model_loaded(self) -> bool:
"""Check if ML model is loaded"""
try:
# Add your model loading check logic here
return True
except Exception:
return False
def _check_database(self) -> bool:
"""Check database connectivity"""
try:
# Add your database check logic here
return True
except Exception:
return False
def _check_external_services(self) -> bool:
"""Check external service connectivity"""
try:
# Add checks for AWS, Redis, etc.
return True
except Exception:
return False
def record_request(self, success: bool = True):
"""Record a request for monitoring"""
self.request_count += 1
if not success:
self.error_count += 1
# Logging configuration
def setup_logging():
"""Set up structured logging"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('/app/logs/application.log')
]
)
# Add structured logging for JSON format
import json
class JSONFormatter(logging.Formatter):
def format(self, record):
log_entry = {
'timestamp': self.formatTime(record),
'level': record.levelname,
'logger': record.name,
'message': record.getMessage(),
'module': record.module,
'function': record.funcName,
'line': record.lineno
}
if hasattr(record, 'request_id'):
log_entry['request_id'] = record.request_id
if hasattr(record, 'user_id'):
log_entry['user_id'] = record.user_id
return json.dumps(log_entry)
# Apply JSON formatter to file handler
file_handler = logging.FileHandler('/app/logs/application.json')
file_handler.setFormatter(JSONFormatter())
logging.getLogger().addHandler(file_handler)
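A short usage sketch ties this module back to the Flask app from the Docker section: the /health endpoint delegates to HealthMonitor, and every request is counted so the error rate stays meaningful. The import path src.monitoring matches the filename above; everything else is illustrative wiring:

```python
# Example wiring (sketch): expose HealthMonitor through the Flask app's /health endpoint
from flask import Flask, jsonify

from src.monitoring import HealthMonitor, setup_logging

app = Flask(__name__)
setup_logging()              # install the plain-text and JSON log handlers
monitor = HealthMonitor()    # tracks uptime, request and error counts


@app.route("/health")
def health():
    status = monitor.get_health_status()
    code = 200 if status["status"] == "healthy" else 503
    return jsonify(status), code


@app.after_request
def count_requests(response):
    # Record every request so error_rate_percent reflects real traffic
    monitor.record_request(success=response.status_code < 500)
    return response
```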
Performance Metrics Collection
# src/metrics.py
import time
import functools
from typing import Callable, Any
from dataclasses import dataclass
from collections import defaultdict, deque
import threading
@dataclass
class MetricPoint:
"""Single metric measurement"""
timestamp: float
value: float
labels: dict
class MetricsCollector:
"""Collect and store application metrics"""
def __init__(self, max_points: int = 1000):
self.max_points = max_points
self.metrics = defaultdict(lambda: deque(maxlen=max_points))
self.lock = threading.Lock()
def record_metric(self, name: str, value: float, labels: dict = None):
"""Record a metric point"""
labels = labels or {}
point = MetricPoint(
timestamp=time.time(),
value=value,
labels=labels
)
with self.lock:
self.metrics[name].append(point)
def get_metrics(self, name: str, since: float = None) -> list:
"""Get metrics since a timestamp"""
with self.lock:
points = list(self.metrics[name])
if since:
points = [p for p in points if p.timestamp >= since]
return points
def get_metric_summary(self, name: str, window_seconds: int = 300) -> dict:
"""Get metric summary for a time window"""
since = time.time() - window_seconds
points = self.get_metrics(name, since)
if not points:
return {'count': 0, 'avg': 0, 'min': 0, 'max': 0}
values = [p.value for p in points]
return {
'count': len(values),
'avg': sum(values) / len(values),
'min': min(values),
'max': max(values),
'latest': values[-1] if values else 0
}
# Global metrics collector
metrics = MetricsCollector()
def measure_time(metric_name: str, labels: dict = None):
"""Decorator to measure function execution time"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs) -> Any:
start_time = time.time()
try:
result = func(*args, **kwargs)
success = True
except Exception as e:
success = False
raise
finally:
execution_time = time.time() - start_time
metric_labels = (labels or {}).copy()
metric_labels['success'] = success
metric_labels['function'] = func.__name__
metrics.record_metric(metric_name, execution_time, metric_labels)
return result
return wrapper
return decorator
# Usage examples
@measure_time('prediction_time', {'model': 'v1'})
def predict_ride_duration(features):
"""Predict ride duration with timing"""
# Your prediction logic here
time.sleep(0.1) # Simulate processing
return 25.5
@measure_time('database_query_time', {'table': 'predictions'})
def save_prediction(prediction_data):
"""Save prediction with timing"""
# Your database logic here
time.sleep(0.05) # Simulate database operation
return True
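To see the collector in action, call the decorated functions a few times and read back a summary. This usage sketch imports from src.metrics (the filename above); the printed values are just whatever timings your machine produces:

```python
# Usage sketch: exercise the decorated functions and inspect the collected timings
from src.metrics import metrics, predict_ride_duration, save_prediction

for _ in range(5):
    predict_ride_duration({"trip_distance": 5.5})
    save_prediction({"ride_id": 1, "duration": 25.5})

# Summaries over the last 5 minutes (window_seconds defaults to 300)
print("prediction_time:", metrics.get_metric_summary("prediction_time"))
print("database_query_time:", metrics.get_metric_summary("database_query_time"))
```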
Summary
Congratulations! You've completed Chapter 2 of the MLOps Best Practices guide. You now have comprehensive automation and deployment capabilities:
Key Achievements
- Pre-commit Automation - Automated quality gates before code commits
- Make-based Workflows - Streamlined build and deployment processes
- Test Automation - Comprehensive test suites with Docker integration
- Cloud Service Testing - LocalStack integration for AWS services
- Production Deployment - Docker containerization with monitoring
- Health & Metrics - Application monitoring and performance tracking
Tools Mastered
- Pre-commit hooks for automated quality checks
- Make for build automation and task management
- Docker & Docker Compose for containerization
- LocalStack for cloud service testing
- Shell scripting for test automation
- Health monitoring and metrics collection
Production Readiness Checklist
- Automated testing pipeline
- Code quality enforcement
- Containerized deployment
- Health monitoring endpoints
- Performance metrics collection
- Error handling and logging
- Security scanning integration
Next Steps
- Implement CI/CD pipelines with GitHub Actions or GitLab CI
- Set up monitoring with Prometheus and Grafana
- Add alerting for production issues
- Implement blue-green or canary deployments
- Scale with Kubernetes for container orchestration
Complete MLOps Best Practices Summary
By completing both chapters, you now have a comprehensive MLOps best practices foundation:
Chapter 1 Recap: Testing & Quality
- Unit testing with pytest
- Integration testing strategies
- Code quality with linting and formatting
- ML-specific testing approaches
Chapter 2 Recap: Automation & Deployment
- Pre-commit hooks for quality gates
- Automated testing and build pipelines
- Docker containerization
- Production monitoring and metrics