Abdelrahman Adnan

πŸ”§ MLOps Zoomcamp Module 6: Chapter 2 - Automation and Deployment

Streamline your ML workflows with automation, CI/CD, and production deployment strategies

Welcome to Chapter 2 of the MLOps Zoomcamp Module 6 Best Practices! This chapter focuses on automating your ML workflows and deploying models to production with confidence. πŸš€

πŸ“‹ Table of Contents

  β€’ πŸ”„ Git Pre-commit Hooks
  β€’ πŸ› οΈ Makefiles for Automation
  β€’ πŸ€– Test Automation Scripts
  β€’ ☁️ Cloud Service Testing
  β€’ 🐳 Docker and Containerization
  β€’ πŸ“Š Monitoring and Logging

πŸ”„ Git Pre-commit Hooks {#pre-commit-hooks}

What are Pre-commit Hooks?
Pre-commit hooks automatically run tests, formatting, and linting before each git commit, ensuring code quality and preventing broken code from entering your repository.

πŸš€ Setting Up Pre-commit

# Initialize git repository
git init

# Install pre-commit
pipenv install --dev pre-commit

# Create sample configuration
pre-commit sample-config > .pre-commit-config.yaml

# Install the hooks
pre-commit install

βš™οΈ Configuring Pre-commit Hooks

Edit .pre-commit-config.yaml:

repos:
  # Code formatting
  - repo: https://github.com/psf/black
    rev: 23.3.0
    hooks:
      - id: black
        name: black (python formatter)
        language_version: python3

  # Import sorting
  - repo: https://github.com/pycqa/isort
    rev: 5.12.0
    hooks:
      - id: isort
        name: isort (import sorter)
        args: ["--profile", "black"]

  # Local hooks for project-specific tools
  - repo: local
    hooks:
      - id: pytest
        name: pytest (run tests)
        entry: pytest
        language: system
        types: [python]
        pass_filenames: false
        always_run: true
        args: ["-v", "--tb=short"]

      - id: pylint
        name: pylint (code quality)
        entry: pylint
        language: system
        types: [python]
        args:
          - "-rn"  # Only display messages
          - "-sn"  # Don't display the score
          - "--fail-under=8.0"  # Minimum score required

  # Security checks
  - repo: https://github.com/PyCQA/bandit
    rev: 1.7.5
    hooks:
      - id: bandit
        name: bandit (security linter)
        args: ["-r", "src/"]

  # General code quality
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.4.0
    hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer
      - id: check-yaml
      - id: check-added-large-files
        args: ['--maxkb=1000']

🎯 Running Pre-commit Hooks

# Run hooks on all files manually
pre-commit run --all-files

# Run specific hook
pre-commit run black

# Skip hooks for a specific commit (use sparingly!)
git commit -m "Emergency fix" --no-verify

# Update hook versions
pre-commit autoupdate

⚠️ Troubleshooting Common Issues

  1. Hook Installation Failures πŸ”§
   # Clear cache and reinstall
   pre-commit clean
   pre-commit install
  2. Version Conflicts πŸ“¦
   # Check specific versions in your environment
   pip list | grep black
   # Update .pre-commit-config.yaml to match
  3. Permission Issues πŸ”’
   # Fix permissions for hook files
   chmod +x .git/hooks/pre-commit

πŸ› οΈ Makefiles for Automation {#makefiles}

What is Make?
Make is a build automation tool that lets you run complex sequences of commands through simple, named targets, which makes it a great fit for MLOps workflows.

πŸ“ Creating a Comprehensive Makefile

Create a file named Makefile (no extension). Note that make requires each recipe (command) line to be indented with a tab character, not spaces:

# Variables
# Use bash so "read -p" in the deploy-prod target works
SHELL := /bin/bash
PYTHON_VERSION := 3.9
PROJECT_NAME := mlops-ride-duration
LOCAL_TAG := $(shell date +"%Y-%m-%d-%H-%M")
DOCKER_IMAGE := $(PROJECT_NAME):$(LOCAL_TAG)

# Environment setup
.PHONY: install
install:
    @echo "πŸ”§ Installing dependencies..."
    pipenv install --dev
    pipenv run pre-commit install

.PHONY: install-prod
install-prod:
    @echo "🏭 Installing production dependencies..."
    pipenv install --deploy --system

# Code quality targets
.PHONY: format
format:
    @echo "🎨 Formatting code..."
    pipenv run black .
    pipenv run isort .

.PHONY: lint
lint:
    @echo "πŸ” Linting code..."
    pipenv run pylint src/ tests/
    pipenv run mypy src/

.PHONY: security
security:
    @echo "πŸ›‘οΈ Running security checks..."
    pipenv run bandit -r src/
    pipenv run safety check

# Testing targets
.PHONY: test
test:
    @echo "πŸ§ͺ Running unit tests..."
    pipenv run pytest tests/unit/ -v

.PHONY: test-integration
test-integration:
    @echo "πŸ”— Running integration tests..."
    pipenv run pytest tests/integration/ -v

.PHONY: test-all
test-all: test test-integration
    @echo "βœ… All tests completed!"

.PHONY: test-coverage
test-coverage:
    @echo "πŸ“Š Running tests with coverage..."
    pipenv run pytest --cov=src --cov-report=html --cov-report=term

# Quality checks (combines formatting, linting, and testing)
.PHONY: quality-checks
quality-checks: format lint security test
    @echo "✨ Quality checks completed!"

# Docker targets
.PHONY: docker-build
docker-build:
    @echo "🐳 Building Docker image..."
    docker build -t $(DOCKER_IMAGE) .
    @echo "πŸ“¦ Built image: $(DOCKER_IMAGE)"

.PHONY: docker-run
docker-run: docker-build
    @echo "πŸš€ Running Docker container..."
    docker run -p 8080:8080 \
        -e MODEL_LOCATION=/app/models \
        -e TEST_RUN=false \
        $(DOCKER_IMAGE)

.PHONY: docker-test
docker-test: docker-build
    @echo "πŸ§ͺ Running tests in Docker..."
    docker run --rm \
        -e TEST_RUN=true \
        $(DOCKER_IMAGE) \
        python -m pytest tests/ -v

# Integration testing with LocalStack
.PHONY: localstack-start
localstack-start:
    @echo "☁️ Starting LocalStack..."
    docker-compose -f docker-compose.test.yml up -d localstack
    @echo "⏳ Waiting for LocalStack to be ready..."
    sleep 10

.PHONY: localstack-stop
localstack-stop:
    @echo "πŸ›‘ Stopping LocalStack..."
    docker-compose -f docker-compose.test.yml down

.PHONY: test-cloud
test-cloud: localstack-start
    @echo "☁️ Running cloud integration tests..."
    pipenv run pytest tests/cloud/ -v
    $(MAKE) localstack-stop

# Model management
.PHONY: download-model
download-model:
    @echo "πŸ“₯ Downloading model from S3..."
    aws s3 cp --recursive \
        s3://$(S3_BUCKET)/models/$(RUN_ID)/artifacts/model/ \
        ./models/

.PHONY: validate-model
validate-model:
    @echo "πŸ” Validating model..."
    pipenv run python scripts/validate_model.py ./models/

# Deployment targets
.PHONY: deploy-staging
deploy-staging: quality-checks docker-build
    @echo "πŸš€ Deploying to staging..."
    ./scripts/deploy_staging.sh $(DOCKER_IMAGE)

.PHONY: deploy-prod
deploy-prod: quality-checks docker-build
    @echo "🏭 Deploying to production..."
    @read -p "Are you sure you want to deploy to production? [y/N] " confirm; \
    if [ "$$confirm" = "y" ] || [ "$$confirm" = "Y" ]; then \
        ./scripts/deploy_production.sh $(DOCKER_IMAGE); \
    else \
        echo "Deployment cancelled."; \
    fi

# Cleanup targets
.PHONY: clean
clean:
    @echo "🧹 Cleaning up..."
    find . -type f -name "*.pyc" -delete
    find . -type d -name "__pycache__" -delete
    find . -type d -name "*.egg-info" -exec rm -rf {} +
    rm -rf .coverage htmlcov/ .pytest_cache/

.PHONY: clean-docker
clean-docker:
    @echo "🐳 Cleaning Docker images..."
    docker image prune -f
    docker container prune -f

# Development workflow
.PHONY: dev-setup
dev-setup: install
    @echo "πŸ› οΈ Setting up development environment..."
    $(MAKE) download-model
    $(MAKE) validate-model

.PHONY: ci
ci: quality-checks test-coverage docker-test
    @echo "πŸ”„ CI pipeline completed successfully!"

# Help target
.PHONY: help
help:
    @echo "πŸ“š Available targets:"
    @echo "  install          - Install all dependencies"
    @echo "  format           - Format code with black and isort"
    @echo "  lint             - Run linting with pylint and mypy"
    @echo "  test             - Run unit tests"
    @echo "  test-integration - Run integration tests"
    @echo "  quality-checks   - Run all quality checks"
    @echo "  docker-build     - Build Docker image"
    @echo "  docker-run       - Run application in Docker"
    @echo "  deploy-staging   - Deploy to staging environment"
    @echo "  clean            - Clean temporary files"
    @echo "  help             - Show this help message"

🎯 Makefile Best Practices

  • βœ… Use .PHONY for targets that don't create files
  • βœ… Add descriptions with @echo commands
  • βœ… Use variables for repeated values
  • βœ… Chain commands with && for error handling
  • βœ… Include help target for documentation

πŸ“‹ Common Make Commands

# Run all quality checks
make quality-checks

# Full CI pipeline
make ci

# Development setup
make dev-setup

# Deploy to staging
make deploy-staging

# Clean everything
make clean clean-docker

πŸ€– Test Automation Scripts {#test-automation}

πŸ“œ Creating Automated Test Scripts

Main Test Runner Script (run_tests.sh)

#!/usr/bin/env bash
set -e  # Exit on error

# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color

# Configuration
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(dirname "$SCRIPT_DIR")"
LOCAL_TAG=$(date +"%Y-%m-%d-%H-%M")
LOCAL_IMAGE_NAME="mlops-service:${LOCAL_TAG}"

echo -e "${YELLOW}πŸš€ Starting MLOps Test Pipeline${NC}"
echo -e "${YELLOW}Project: ${PROJECT_ROOT}${NC}"
echo -e "${YELLOW}Image: ${LOCAL_IMAGE_NAME}${NC}"

# Function to print status
print_status() {
    echo -e "${GREEN}βœ… $1${NC}"
}

print_error() {
    echo -e "${RED}❌ $1${NC}"
}

# Cleanup function
cleanup() {
    echo -e "${YELLOW}🧹 Cleaning up...${NC}"
    docker-compose -f docker-compose.test.yml down --remove-orphans
    exit $1
}

# Set trap for cleanup
trap 'cleanup $?' EXIT

# Change to project directory
cd "$PROJECT_ROOT"

# Step 1: Install dependencies
echo -e "${YELLOW}πŸ“¦ Installing dependencies...${NC}"
pipenv install --dev
print_status "Dependencies installed"

# Step 2: Run unit tests
echo -e "${YELLOW}πŸ§ͺ Running unit tests...${NC}"
pipenv run pytest tests/unit/ -v --tb=short
print_status "Unit tests passed"

# Step 3: Code quality checks
echo -e "${YELLOW}πŸ” Running code quality checks...${NC}"
pipenv run black --check .
pipenv run isort --check-only .
pipenv run pylint src/ --fail-under=8.0
print_status "Code quality checks passed"

# Step 4: Build Docker image
echo -e "${YELLOW}🐳 Building Docker image...${NC}"
export LOCAL_IMAGE_NAME
docker build -t "${LOCAL_IMAGE_NAME}" .
print_status "Docker image built: ${LOCAL_IMAGE_NAME}"

# Step 5: Start test environment
echo -e "${YELLOW}πŸ—οΈ Starting test environment...${NC}"
docker-compose -f docker-compose.test.yml up -d
sleep 10  # Wait for services to be ready
print_status "Test environment started"

# Step 6: Run integration tests
echo -e "${YELLOW}πŸ”— Running integration tests...${NC}"
# The "|| ERROR_CODE=$?" keeps "set -e" from aborting before we can show logs
ERROR_CODE=0
pipenv run python tests/integration/test_docker.py || ERROR_CODE=$?

if [ ${ERROR_CODE} -ne 0 ]; then
    print_error "Integration tests failed"
    docker-compose -f docker-compose.test.yml logs
    cleanup ${ERROR_CODE}
fi

print_status "Integration tests passed"

# Step 7: Run cloud service tests (with LocalStack)
echo -e "${YELLOW}☁️ Running cloud service tests...${NC}"
ERROR_CODE=0
pipenv run python tests/integration/test_kinesis.py || ERROR_CODE=$?

if [ ${ERROR_CODE} -ne 0 ]; then
    print_error "Cloud service tests failed"
    docker-compose -f docker-compose.test.yml logs
    cleanup ${ERROR_CODE}
fi

print_status "Cloud service tests passed"

# Step 8: Performance tests
echo -e "${YELLOW}⚑ Running performance tests...${NC}"
pipenv run python tests/performance/test_load.py
print_status "Performance tests passed"

# Success!
echo -e "${GREEN}πŸŽ‰ All tests passed successfully!${NC}"
echo -e "${GREEN}πŸ“¦ Image ready for deployment: ${LOCAL_IMAGE_NAME}${NC}"

Docker Compose for Testing (docker-compose.test.yml)

version: '3.8'

services:
  ml-service:
    image: ${LOCAL_IMAGE_NAME}
    ports:
      - "8080:8080"
    environment:
      - PREDICTIONS_STREAM_NAME=ride_predictions
      - TEST_RUN=true
      - RUN_ID=test_run_123
      - AWS_DEFAULT_REGION=us-east-1
      - MODEL_LOCATION=/app/models
      - AWS_ACCESS_KEY_ID=test
      - AWS_SECRET_ACCESS_KEY=test
      - AWS_ENDPOINT_URL=http://localstack:4566
    volumes:
      - "./models:/app/models"
    depends_on:
      - redis
      - postgres
      - localstack
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  redis:
    image: redis:6-alpine
    ports:
      - "6379:6379"
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 3s
      retries: 5

  postgres:
    image: postgres:13-alpine
    environment:
      POSTGRES_DB: test_mlops
      POSTGRES_USER: test_user
      POSTGRES_PASSWORD: test_pass
    ports:
      - "5432:5432"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U test_user -d test_mlops"]
      interval: 5s
      timeout: 5s
      retries: 5

  localstack:
    image: localstack/localstack:latest
    ports:
      - "4566:4566"
    environment:
      - SERVICES=kinesis,s3,lambda
      - DEBUG=1
      - DATA_DIR=/tmp/localstack/data
      - DOCKER_HOST=unix:///var/run/docker.sock
    volumes:
      - "/var/run/docker.sock:/var/run/docker.sock"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:4566/health"]
      interval: 10s
      timeout: 5s
      retries: 3
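
The pipeline above calls tests/integration/test_docker.py, which isn't shown in this chapter. Here is a minimal sketch of what that script could look like, assuming the service exposes the /predict endpoint used elsewhere in this setup (the payload fields and response keys are assumptions, not the course's reference implementation):

# tests/integration/test_docker.py (minimal sketch)
import sys

import requests

SERVICE_URL = "http://localhost:8080/predict"

# Sample ride matching the fields used by the load test below
ride_event = {
    "PULocationID": 43,
    "DOLocationID": 215,
    "trip_distance": 5.5,
    "datetime": "2021-01-01 14:30:00",
}


def main() -> int:
    response = requests.post(SERVICE_URL, json=ride_event, timeout=10)

    if response.status_code != 200:
        print(f"❌ Unexpected status code: {response.status_code}")
        return 1

    body = response.json()
    # The response key is an assumption; adjust it to match your service
    if "predicted_duration" not in body:
        print(f"❌ Missing 'predicted_duration' in response: {body}")
        return 1

    print(f"βœ… Integration test passed: {body}")
    return 0


if __name__ == "__main__":
    sys.exit(main())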

Performance Testing Script (test_load.py)

#!/usr/bin/env python3
"""
Performance testing script for ML service
"""
import time
import requests
import concurrent.futures
import statistics
from typing import List, Dict

def make_prediction_request(ride_data: Dict) -> Dict:
    """Make a single prediction request"""
    try:
        start_time = time.time()
        response = requests.post(
            "http://localhost:8080/predict",
            json=ride_data,
            timeout=5.0
        )
        end_time = time.time()

        return {
            'status_code': response.status_code,
            'response_time': end_time - start_time,
            'success': response.status_code == 200
        }
    except Exception as e:
        return {
            'status_code': 0,
            'response_time': float('inf'),
            'success': False,
            'error': str(e)
        }

def load_test(num_requests: int = 100, concurrent_workers: int = 10) -> Dict:
    """Run load test with specified parameters"""

    # Sample ride data for testing
    ride_data = {
        'PULocationID': 43,
        'DOLocationID': 215,
        'trip_distance': 5.5,
        'datetime': '2021-01-01 14:30:00'
    }

    print(f"πŸš€ Starting load test with {num_requests} requests, {concurrent_workers} workers")

    start_time = time.time()
    results = []

    # Use ThreadPoolExecutor for concurrent requests
    with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_workers) as executor:
        futures = [
            executor.submit(make_prediction_request, ride_data)
            for _ in range(num_requests)
        ]

        for future in concurrent.futures.as_completed(futures):
            results.append(future.result())

    end_time = time.time()
    total_time = end_time - start_time

    # Calculate statistics
    successful_requests = [r for r in results if r['success']]
    response_times = [r['response_time'] for r in successful_requests]

    if not response_times:
        raise Exception("❌ No successful requests!")

    stats = {
        'total_requests': num_requests,
        'successful_requests': len(successful_requests),
        'failed_requests': num_requests - len(successful_requests),
        'success_rate': len(successful_requests) / num_requests * 100,
        'total_time': total_time,
        'requests_per_second': num_requests / total_time,
        'avg_response_time': statistics.mean(response_times),
        'min_response_time': min(response_times),
        'max_response_time': max(response_times),
        'p95_response_time': statistics.quantiles(response_times, n=20)[18],  # 95th percentile
        'p99_response_time': statistics.quantiles(response_times, n=100)[98]  # 99th percentile
    }

    return stats

def main():
    """Main performance testing function"""
    print("⚑ MLOps Service Performance Testing")
    print("=" * 50)

    # Wait for service to be ready
    max_retries = 30
    for i in range(max_retries):
        try:
            response = requests.get("http://localhost:8080/health", timeout=2)
            if response.status_code == 200:
                print("βœ… Service is ready!")
                break
        except requests.exceptions.RequestException:
            pass

        if i == max_retries - 1:
            raise Exception("❌ Service not ready after 30 attempts")

        print(f"⏳ Waiting for service... ({i+1}/{max_retries})")
        time.sleep(1)

    # Run load tests with different configurations
    test_configs = [
        {'num_requests': 50, 'concurrent_workers': 5},
        {'num_requests': 100, 'concurrent_workers': 10},
        {'num_requests': 200, 'concurrent_workers': 20}
    ]

    for config in test_configs:
        print(f"\nπŸ“Š Test: {config['num_requests']} requests, {config['concurrent_workers']} workers")
        print("-" * 50)

        try:
            stats = load_test(**config)

            print(f"βœ… Success Rate: {stats['success_rate']:.1f}%")
            print(f"πŸš€ Requests/sec: {stats['requests_per_second']:.1f}")
            print(f"⏱️  Avg Response Time: {stats['avg_response_time']*1000:.1f}ms")
            print(f"πŸ“ˆ 95th Percentile: {stats['p95_response_time']*1000:.1f}ms")
            print(f"πŸ” 99th Percentile: {stats['p99_response_time']*1000:.1f}ms")

            # Performance assertions
            assert stats['success_rate'] >= 95, f"Success rate too low: {stats['success_rate']:.1f}%"
            assert stats['avg_response_time'] < 1.0, f"Average response time too high: {stats['avg_response_time']:.3f}s"
            assert stats['p95_response_time'] < 2.0, f"95th percentile too high: {stats['p95_response_time']:.3f}s"

            print("βœ… Performance test passed!")

        except Exception as e:
            print(f"❌ Performance test failed: {e}")
            raise

    print("\nπŸŽ‰ All performance tests completed successfully!")

if __name__ == "__main__":
    main()

☁️ Cloud Service Testing {#cloud-testing}

🌊 Testing with LocalStack

Setting Up Kinesis Streams for Testing

#!/bin/bash
# setup_kinesis.sh - Set up Kinesis streams for testing

ENDPOINT_URL="http://localhost:4566"
STREAM_NAME="ride_predictions"
REGION="us-east-1"

echo "🌊 Setting up Kinesis streams for testing..."

# Create Kinesis stream
aws --endpoint-url=$ENDPOINT_URL \
    --region=$REGION \
    kinesis create-stream \
    --stream-name $STREAM_NAME \
    --shard-count 1

echo "⏳ Waiting for stream to become active..."
sleep 5

# Verify stream is created
aws --endpoint-url=$ENDPOINT_URL \
    --region=$REGION \
    kinesis describe-stream \
    --stream-name $STREAM_NAME

echo "βœ… Kinesis stream '$STREAM_NAME' is ready!"

Kinesis Integration Test

# tests/integration/test_kinesis.py
import json
import base64
import boto3
import pytest
from typing import Dict, Any

class TestKinesisIntegration:

    def setup_method(self):
        """Set up Kinesis client for testing"""
        self.kinesis_client = boto3.client(
            'kinesis',
            endpoint_url='http://localhost:4566',
            region_name='us-east-1',
            aws_access_key_id='test',
            aws_secret_access_key='test'
        )
        self.stream_name = 'ride_predictions'

    def test_kinesis_stream_exists(self):
        """Test that the Kinesis stream exists and is active"""
        try:
            response = self.kinesis_client.describe_stream(
                StreamName=self.stream_name
            )

            assert response['StreamDescription']['StreamName'] == self.stream_name
            assert response['StreamDescription']['StreamStatus'] == 'ACTIVE'
            print("βœ… Kinesis stream is active")

        except Exception as e:
            pytest.fail(f"Kinesis stream test failed: {e}")

    def test_put_record_to_stream(self):
        """Test putting a record to the Kinesis stream"""
        # Prepare test data
        test_data = {
            'ride_id': 12345,
            'predicted_duration': 25.5,
            'model_version': 'v1.0.0',
            'timestamp': '2021-01-01T14:30:00Z'
        }

        try:
            # Put record to stream
            response = self.kinesis_client.put_record(
                StreamName=self.stream_name,
                Data=json.dumps(test_data),
                PartitionKey=str(test_data['ride_id'])
            )

            assert 'SequenceNumber' in response
            assert response['ResponseMetadata']['HTTPStatusCode'] == 200
            print(f"βœ… Record put to stream with sequence: {response['SequenceNumber']}")

        except Exception as e:
            pytest.fail(f"Failed to put record to Kinesis: {e}")

    def test_consume_records_from_stream(self):
        """Test consuming records from the Kinesis stream"""
        try:
            # Get shard iterator
            shard_response = self.kinesis_client.list_shards(
                StreamName=self.stream_name
            )
            shard_id = shard_response['Shards'][0]['ShardId']

            iterator_response = self.kinesis_client.get_shard_iterator(
                StreamName=self.stream_name,
                ShardId=shard_id,
                ShardIteratorType='TRIM_HORIZON'
            )

            shard_iterator = iterator_response['ShardIterator']

            # Get records
            records_response = self.kinesis_client.get_records(
                ShardIterator=shard_iterator
            )

            assert 'Records' in records_response
            print(f"βœ… Retrieved {len(records_response['Records'])} records from stream")

        except Exception as e:
            pytest.fail(f"Failed to consume from Kinesis: {e}")
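
On the application side, the service would publish its predictions to this same stream. Below is a hedged sketch of such a helper, driven by the PREDICTIONS_STREAM_NAME, TEST_RUN, and AWS_ENDPOINT_URL variables from docker-compose.test.yml; the module path and event fields are illustrative, not the course's exact implementation:

# src/stream.py (illustrative sketch)
import json
import os

import boto3

STREAM_NAME = os.getenv("PREDICTIONS_STREAM_NAME", "ride_predictions")
TEST_RUN = os.getenv("TEST_RUN", "false").lower() == "true"
# When AWS_ENDPOINT_URL is set (e.g. to LocalStack), point boto3 at it
ENDPOINT_URL = os.getenv("AWS_ENDPOINT_URL")

kinesis_client = boto3.client("kinesis", endpoint_url=ENDPOINT_URL)


def publish_prediction(ride_id: int, predicted_duration: float, model_version: str) -> None:
    """Send a prediction event to Kinesis unless running in test mode."""
    if TEST_RUN:
        return  # Skip external calls during local test runs

    event = {
        "ride_id": ride_id,
        "predicted_duration": predicted_duration,
        "model_version": model_version,
    }
    kinesis_client.put_record(
        StreamName=STREAM_NAME,
        Data=json.dumps(event),
        PartitionKey=str(ride_id),
    )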

πŸ—„οΈ S3 Model Storage Testing

# tests/integration/test_s3_storage.py
import boto3
import pytest
import tempfile
import joblib
from sklearn.linear_model import LinearRegression
import numpy as np

class TestS3ModelStorage:

    def setup_method(self):
        """Set up S3 client for testing"""
        self.s3_client = boto3.client(
            's3',
            endpoint_url='http://localhost:4566',
            region_name='us-east-1',
            aws_access_key_id='test',
            aws_secret_access_key='test'
        )
        self.bucket_name = 'test-mlops-models'
        self.model_key = 'models/v1/model.pkl'

    def test_create_s3_bucket(self):
        """Test creating S3 bucket for model storage"""
        try:
            self.s3_client.create_bucket(Bucket=self.bucket_name)

            # Verify bucket exists
            response = self.s3_client.list_buckets()
            bucket_names = [bucket['Name'] for bucket in response['Buckets']]

            assert self.bucket_name in bucket_names
            print(f"βœ… S3 bucket '{self.bucket_name}' created successfully")

        except Exception as e:
            pytest.fail(f"Failed to create S3 bucket: {e}")

    def test_upload_model_to_s3(self):
        """Test uploading a model to S3"""
        try:
            # Create a simple model for testing
            X = np.array([[1, 2], [3, 4], [5, 6]])
            y = np.array([1, 2, 3])
            model = LinearRegression().fit(X, y)

            # Save model to temporary file
            with tempfile.NamedTemporaryFile(suffix='.pkl', delete=False) as tmp_file:
                joblib.dump(model, tmp_file.name)

                # Upload to S3
                with open(tmp_file.name, 'rb') as f:
                    self.s3_client.put_object(
                        Bucket=self.bucket_name,
                        Key=self.model_key,
                        Body=f.read()
                    )

            # Verify upload
            response = self.s3_client.head_object(
                Bucket=self.bucket_name,
                Key=self.model_key
            )

            assert response['ResponseMetadata']['HTTPStatusCode'] == 200
            print(f"βœ… Model uploaded to S3: s3://{self.bucket_name}/{self.model_key}")

        except Exception as e:
            pytest.fail(f"Failed to upload model to S3: {e}")

    def test_download_model_from_s3(self):
        """Test downloading a model from S3"""
        try:
            # Download model
            with tempfile.NamedTemporaryFile(suffix='.pkl') as tmp_file:
                self.s3_client.download_file(
                    self.bucket_name,
                    self.model_key,
                    tmp_file.name
                )

                # Load and test model
                downloaded_model = joblib.load(tmp_file.name)

                # Test prediction
                test_input = np.array([[2, 3]])
                prediction = downloaded_model.predict(test_input)

                assert prediction is not None
                assert len(prediction) == 1
                print(f"βœ… Model downloaded and tested successfully: prediction = {prediction[0]:.2f}")

        except Exception as e:
            pytest.fail(f"Failed to download model from S3: {e}")
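
Tying this back to the service: at startup the application needs to resolve its model either from the local MODEL_LOCATION path (as in the test compose file) or from S3 using the same bucket/run-id layout the Makefile's download-model target uses. A small sketch under those assumptions (the helper name and exact key layout are illustrative):

# src/model_loader.py (illustrative sketch)
import os
import tempfile

import boto3
import joblib


def load_model(run_id: str):
    """Load the model from MODEL_LOCATION if set, otherwise fetch it from S3."""
    model_location = os.getenv("MODEL_LOCATION")
    if model_location:
        return joblib.load(os.path.join(model_location, "model.pkl"))

    bucket = os.getenv("S3_BUCKET", "test-mlops-models")
    key = f"models/{run_id}/artifacts/model/model.pkl"

    # AWS_ENDPOINT_URL lets the same code talk to LocalStack in tests
    s3_client = boto3.client("s3", endpoint_url=os.getenv("AWS_ENDPOINT_URL"))
    with tempfile.NamedTemporaryFile(suffix=".pkl") as tmp_file:
        s3_client.download_file(bucket, key, tmp_file.name)
        return joblib.load(tmp_file.name)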

🐳 Docker and Containerization {#docker-deployment}

πŸ“¦ Production Dockerfile

# Production Dockerfile
FROM python:3.9-slim

# Set working directory
WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements first for better caching
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Create non-root user for security
RUN useradd --create-home --shell /bin/bash mlops
RUN chown -R mlops:mlops /app
USER mlops

# Copy application code
COPY --chown=mlops:mlops src/ ./src/
COPY --chown=mlops:mlops models/ ./models/

# Expose port
EXPOSE 8080

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=30s --retries=3 \
    CMD curl -f http://localhost:8080/health || exit 1

# Run application
CMD ["python", "-m", "src.app"]

🏭 Multi-stage Production Build

# Multi-stage Dockerfile for production
FROM python:3.9 as builder

# Install build dependencies
RUN pip install pipenv

# Copy Pipfile
COPY Pipfile Pipfile.lock ./

# Install dependencies into the system site-packages (copied into the runtime stage below)
RUN pipenv install --deploy --system

# Production stage
FROM python:3.9-slim as production

# Install runtime dependencies only
RUN apt-get update && apt-get install -y \
    curl \
    && rm -rf /var/lib/apt/lists/* \
    && apt-get clean

# Copy Python packages from builder
COPY --from=builder /usr/local/lib/python3.9/site-packages /usr/local/lib/python3.9/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin

# Set working directory
WORKDIR /app

# Create non-root user
RUN useradd --create-home --shell /bin/bash mlops
RUN chown -R mlops:mlops /app
USER mlops

# Copy application
COPY --chown=mlops:mlops src/ ./src/
COPY --chown=mlops:mlops models/ ./models/

# Expose port
EXPOSE 8080

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=30s --retries=3 \
    CMD curl -f http://localhost:8080/health || exit 1

# Run application
CMD ["gunicorn", "--bind", "0.0.0.0:8080", "--workers", "4", "src.app:app"]
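
Both Dockerfiles assume an application entrypoint in src/app.py that exposes the /health and /predict endpoints used by the health checks and tests. That file isn't shown in this chapter; a minimal Flask sketch (the dummy prediction and response keys are placeholders) could look like this:

# src/app.py (minimal sketch -- replace the constant with your real model call)
import os

from flask import Flask, jsonify, request

app = Flask(__name__)


@app.route("/health", methods=["GET"])
def health():
    """Liveness endpoint used by the Docker HEALTHCHECK."""
    return jsonify({"status": "ok"}), 200


@app.route("/predict", methods=["POST"])
def predict():
    """Return a ride-duration prediction for the posted ride features."""
    ride = request.get_json()
    prediction = 25.5  # placeholder for the loaded model's prediction
    return jsonify({
        "predicted_duration": prediction,
        "model_version": os.getenv("RUN_ID", "local"),
        "ride": ride,
    }), 200


if __name__ == "__main__":
    # "python -m src.app" in the first Dockerfile lands here;
    # gunicorn in the multi-stage image imports "src.app:app" directly.
    app.run(host="0.0.0.0", port=8080)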

πŸ”§ Docker Compose for Production

# docker-compose.prod.yml
version: '3.8'

services:
  ml-service:
    build: 
      context: .
      dockerfile: Dockerfile.prod
    ports:
      - "8080:8080"
    environment:
      - ENVIRONMENT=production
      - MODEL_LOCATION=/app/models
      - REDIS_URL=redis://redis:6379
      - DATABASE_URL=postgresql://mlops:${DB_PASSWORD}@postgres:5432/mlops_prod
      - AWS_DEFAULT_REGION=${AWS_REGION}
      - PREDICTIONS_STREAM_NAME=${KINESIS_STREAM}
    volumes:
      - model_cache:/app/models
    depends_on:
      - redis
      - postgres
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s

  redis:
    image: redis:6-alpine
    volumes:
      - redis_data:/data
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 3s
      retries: 5

  postgres:
    image: postgres:13-alpine
    environment:
      POSTGRES_DB: mlops_prod
      POSTGRES_USER: mlops
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
    restart: unless-stopped
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U mlops -d mlops_prod"]
      interval: 5s
      timeout: 5s
      retries: 5

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - ml-service
    restart: unless-stopped

volumes:
  model_cache:
  redis_data:
  postgres_data:

πŸ“Š Monitoring and Logging {#monitoring}

πŸ“ˆ Application Health Monitoring

# src/monitoring.py
import time
import psutil
import logging
from typing import Dict, Any
from datetime import datetime

class HealthMonitor:
    """Monitor application health and performance"""

    def __init__(self):
        self.start_time = time.time()
        self.request_count = 0
        self.error_count = 0

    def get_health_status(self) -> Dict[str, Any]:
        """Get comprehensive health status"""
        current_time = time.time()
        uptime = current_time - self.start_time

        # System metrics
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')

        # Application metrics
        error_rate = (self.error_count / max(self.request_count, 1)) * 100

        health_status = {
            'status': 'healthy' if error_rate < 5.0 and cpu_percent < 80 else 'unhealthy',
            'timestamp': datetime.utcnow().isoformat(),
            'uptime_seconds': uptime,
            'system': {
                'cpu_percent': cpu_percent,
                'memory_percent': memory.percent,
                'memory_available_gb': memory.available / (1024**3),
                'disk_percent': disk.percent,
                'disk_free_gb': disk.free / (1024**3)
            },
            'application': {
                'total_requests': self.request_count,
                'error_count': self.error_count,
                'error_rate_percent': error_rate
            },
            'checks': {
                'model_loaded': self._check_model_loaded(),
                'database_connected': self._check_database(),
                'external_services': self._check_external_services()
            }
        }

        return health_status

    def _check_model_loaded(self) -> bool:
        """Check if ML model is loaded"""
        try:
            # Add your model loading check logic here
            return True
        except Exception:
            return False

    def _check_database(self) -> bool:
        """Check database connectivity"""
        try:
            # Add your database check logic here
            return True
        except Exception:
            return False

    def _check_external_services(self) -> bool:
        """Check external service connectivity"""
        try:
            # Add checks for AWS, Redis, etc.
            return True
        except Exception:
            return False

    def record_request(self, success: bool = True):
        """Record a request for monitoring"""
        self.request_count += 1
        if not success:
            self.error_count += 1

# Logging configuration
def setup_logging():
    """Set up structured logging"""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.StreamHandler(),
            logging.FileHandler('/app/logs/application.log')
        ]
    )

    # Add structured logging for JSON format
    import json

    class JSONFormatter(logging.Formatter):
        def format(self, record):
            log_entry = {
                'timestamp': self.formatTime(record),
                'level': record.levelname,
                'logger': record.name,
                'message': record.getMessage(),
                'module': record.module,
                'function': record.funcName,
                'line': record.lineno
            }

            if hasattr(record, 'request_id'):
                log_entry['request_id'] = record.request_id

            if hasattr(record, 'user_id'):
                log_entry['user_id'] = record.user_id

            return json.dumps(log_entry)

    # Apply JSON formatter to file handler
    file_handler = logging.FileHandler('/app/logs/application.json')
    file_handler.setFormatter(JSONFormatter())
    logging.getLogger().addHandler(file_handler)
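
A quick usage sketch for the monitor above, wiring it into a stand-in request handler and printing the resulting status (the handler and its constant prediction are placeholders):

# Example wiring for HealthMonitor (illustrative sketch)
import json

from src.monitoring import HealthMonitor

monitor = HealthMonitor()


def handle_prediction_request(ride: dict) -> float:
    """Stand-in handler that records each request's outcome on the monitor."""
    try:
        prediction = 25.5  # replace with the real model call
        monitor.record_request(success=True)
        return prediction
    except Exception:
        monitor.record_request(success=False)
        raise


handle_prediction_request({"PULocationID": 43, "DOLocationID": 215})

# A /health endpoint can simply return this dictionary as JSON
print(json.dumps(monitor.get_health_status(), indent=2))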

πŸ” Performance Metrics Collection

# src/metrics.py
import time
import functools
from typing import Callable, Any
from dataclasses import dataclass
from collections import defaultdict, deque
import threading

@dataclass
class MetricPoint:
    """Single metric measurement"""
    timestamp: float
    value: float
    labels: dict

class MetricsCollector:
    """Collect and store application metrics"""

    def __init__(self, max_points: int = 1000):
        self.max_points = max_points
        self.metrics = defaultdict(lambda: deque(maxlen=max_points))
        self.lock = threading.Lock()

    def record_metric(self, name: str, value: float, labels: dict = None):
        """Record a metric point"""
        labels = labels or {}
        point = MetricPoint(
            timestamp=time.time(),
            value=value,
            labels=labels
        )

        with self.lock:
            self.metrics[name].append(point)

    def get_metrics(self, name: str, since: float = None) -> list:
        """Get metrics since a timestamp"""
        with self.lock:
            points = list(self.metrics[name])

        if since:
            points = [p for p in points if p.timestamp >= since]

        return points

    def get_metric_summary(self, name: str, window_seconds: int = 300) -> dict:
        """Get metric summary for a time window"""
        since = time.time() - window_seconds
        points = self.get_metrics(name, since)

        if not points:
            return {'count': 0, 'avg': 0, 'min': 0, 'max': 0}

        values = [p.value for p in points]
        return {
            'count': len(values),
            'avg': sum(values) / len(values),
            'min': min(values),
            'max': max(values),
            'latest': values[-1] if values else 0
        }

# Global metrics collector
metrics = MetricsCollector()

def measure_time(metric_name: str, labels: dict = None):
    """Decorator to measure function execution time"""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
                success = True
            except Exception as e:
                success = False
                raise
            finally:
                execution_time = time.time() - start_time
                metric_labels = (labels or {}).copy()
                metric_labels['success'] = success
                metric_labels['function'] = func.__name__

                metrics.record_metric(metric_name, execution_time, metric_labels)

            return result
        return wrapper
    return decorator

# Usage examples
@measure_time('prediction_time', {'model': 'v1'})
def predict_ride_duration(features):
    """Predict ride duration with timing"""
    # Your prediction logic here
    time.sleep(0.1)  # Simulate processing
    return 25.5

@measure_time('database_query_time', {'table': 'predictions'})
def save_prediction(prediction_data):
    """Save prediction with timing"""
    # Your database logic here
    time.sleep(0.05)  # Simulate database operation
    return True
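
Once the decorated functions have been called, the collector's summaries can back a lightweight metrics endpoint or a periodic log line. A small sketch of querying it (assuming the module lives at src/metrics.py as above):

# Querying the collector after a few decorated calls (illustrative sketch)
from src.metrics import metrics, predict_ride_duration

for _ in range(5):
    predict_ride_duration({"PULocationID": 43, "DOLocationID": 215})

# Summary over the last 5 minutes: count, avg, min, max, latest
summary = metrics.get_metric_summary("prediction_time", window_seconds=300)
print(f"avg prediction time: {summary['avg'] * 1000:.1f}ms over {summary['count']} calls")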

🎯 Summary

Congratulations! πŸŽ‰ You've completed Chapter 2 of the MLOps Best Practices guide. You now have comprehensive automation and deployment capabilities:

βœ… Key Achievements

  • Pre-commit Automation πŸ”„ - Automated quality gates before code commits
  • Make-based Workflows πŸ› οΈ - Streamlined build and deployment processes
  • Test Automation πŸ€– - Comprehensive test suites with Docker integration
  • Cloud Service Testing ☁️ - LocalStack integration for AWS services
  • Production Deployment 🐳 - Docker containerization with monitoring
  • Health & Metrics πŸ“Š - Application monitoring and performance tracking

πŸ› οΈ Tools Mastered

  • Pre-commit hooks for automated quality checks
  • Make for build automation and task management
  • Docker & Docker Compose for containerization
  • LocalStack for cloud service testing
  • Shell scripting for test automation
  • Health monitoring and metrics collection

🎯 Production Readiness Checklist

  • βœ… Automated testing pipeline
  • βœ… Code quality enforcement
  • βœ… Containerized deployment
  • βœ… Health monitoring endpoints
  • βœ… Performance metrics collection
  • βœ… Error handling and logging
  • βœ… Security scanning integration

πŸ”œ Next Steps

  1. Implement CI/CD pipelines with GitHub Actions or GitLab CI
  2. Set up monitoring with Prometheus and Grafana
  3. Add alerting for production issues
  4. Implement blue-green or canary deployments
  5. Scale with Kubernetes for container orchestration

πŸŽ“ Complete MLOps Best Practices Summary

By completing both chapters, you now have a comprehensive MLOps best practices foundation:

πŸ“š Chapter 1 Recap: Testing & Quality

  • Unit testing with pytest
  • Integration testing strategies
  • Code quality with linting and formatting
  • ML-specific testing approaches

πŸ”§ Chapter 2 Recap: Automation & Deployment

  • Pre-commit hooks for quality gates
  • Automated testing and build pipelines
  • Docker containerization
  • Production monitoring and metrics
