End-to-End MLOps Project: Customer Churn Prediction

Koteswararao J

Project Overview

Build a complete machine learning pipeline to predict customer churn with automated training, versioning, testing, and deployment.

Tech Stack

  • ML Framework: Scikit-learn, XGBoost
  • Experiment Tracking: MLflow
  • Version Control: Git, DVC (Data Version Control)
  • CI/CD: GitHub Actions
  • Containerization: Docker
  • Deployment: FastAPI + Docker
  • Monitoring: Prometheus + Grafana (optional)
  • Cloud: AWS/GCP/Azure (we'll use AWS)

Project Structure

customer-churn-mlops/
├── data/
│   ├── raw/
│   └── processed/
├── notebooks/
│   └── exploratory_analysis.ipynb
├── src/
│   ├── __init__.py
│   ├── data_preprocessing.py
│   ├── feature_engineering.py
│   ├── model_training.py
│   └── model_evaluation.py
├── api/
│   ├── __init__.py
│   ├── main.py
│   └── schemas.py
├── tests/
│   ├── test_preprocessing.py
│   └── test_model.py
├── models/
├── .github/
│   └── workflows/
│       ├── ci.yml
│       └── cd.yml
├── Dockerfile
├── requirements.txt
├── config.yaml
├── dvc.yaml
└── README.md

Step 1: Setup & Data Preparation

1.1 Initialize Project

# Create project directory
mkdir customer-churn-mlops
cd customer-churn-mlops

# Initialize git
git init

# Create virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install dependencies
pip install pandas numpy scikit-learn xgboost mlflow dvc fastapi uvicorn pytest
pip freeze > requirements.txt

1.2 Data Preprocessing

This walkthrough assumes a churn CSV (for example, the widely used Telco Customer Churn dataset from Kaggle) has been downloaded to data/raw/churn.csv.

# src/data_preprocessing.py
import pandas as pd
from sklearn.model_selection import train_test_split
import yaml

class DataPreprocessor:
    def __init__(self, config_path='config.yaml'):
        with open(config_path, 'r') as f:
            self.config = yaml.safe_load(f)

    def load_data(self, path):
        """Load customer churn dataset"""
        df = pd.read_csv(path)
        return df

    def preprocess(self, df, target_col='Churn'):
        """Clean and preprocess data"""
        # Handle missing values
        df = df.dropna()

        # Encode the target first so get_dummies doesn't turn it into a
        # dummy column (in most churn datasets it is a Yes/No string)
        if df[target_col].dtype == 'object':
            df[target_col] = df[target_col].map({'Yes': 1, 'No': 0})

        # Encode the remaining categorical variables
        categorical_cols = df.select_dtypes(include=['object']).columns
        df = pd.get_dummies(df, columns=categorical_cols, drop_first=True)

        return df

    def split_data(self, df, target_col='Churn'):
        """Split data into train and test sets"""
        X = df.drop(target_col, axis=1)
        y = df[target_col]

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, 
            test_size=self.config['data']['test_size'],
            random_state=self.config['data']['random_state']
        )

        return X_train, X_test, y_train, y_test
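The DVC stage in Step 5 runs this module as a script and expects the processed splits as outputs, so a small entry point at the bottom of src/data_preprocessing.py ties the steps together. A minimal sketch (paths follow config.yaml from Step 4):

# Run preprocessing end-to-end when invoked as a script
if __name__ == "__main__":
    import os

    preprocessor = DataPreprocessor()
    df = preprocessor.load_data(preprocessor.config['data']['raw_path'])
    df = preprocessor.preprocess(df)
    X_train, X_test, y_train, y_test = preprocessor.split_data(df)

    # Write the splits that downstream stages depend on
    os.makedirs('data/processed', exist_ok=True)
    X_train.assign(Churn=y_train).to_csv('data/processed/train.csv', index=False)
    X_test.assign(Churn=y_test).to_csv('data/processed/test.csv', index=False)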

Step 2: Feature Engineering

# src/feature_engineering.py
from sklearn.preprocessing import StandardScaler
import joblib
import os

class FeatureEngineering:
    def __init__(self):
        self.scaler = StandardScaler()

    def create_features(self, X_train, X_test):
        """Create and scale features"""
        # Scale numerical features
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        # Save scaler so the API applies the same transformation at inference
        os.makedirs('models', exist_ok=True)
        joblib.dump(self.scaler, 'models/scaler.pkl')

        return X_train_scaled, X_test_scaled
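A quick usage sketch, assuming X_train and X_test come from the split in Step 1:

from src.feature_engineering import FeatureEngineering

fe = FeatureEngineering()
X_train_scaled, X_test_scaled = fe.create_features(X_train, X_test)
# models/scaler.pkl saved here is reloaded by the API in Step 6, so
# serving-time inputs are scaled exactly like the training data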

Step 3: Model Training with MLflow

# src/model_training.py
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import yaml

class ModelTrainer:
    def __init__(self, config_path='config.yaml'):
        with open(config_path, 'r') as f:
            self.config = yaml.safe_load(f)

        mlflow.set_tracking_uri(self.config['mlflow']['tracking_uri'])
        mlflow.set_experiment(self.config['mlflow']['experiment_name'])

    def train_model(self, X_train, y_train, X_test, y_test):
        """Train model with MLflow tracking"""
        with mlflow.start_run():
            # Initialize model
            model = XGBClassifier(
                n_estimators=self.config['model']['n_estimators'],
                max_depth=self.config['model']['max_depth'],
                learning_rate=self.config['model']['learning_rate']
            )

            # Train model
            model.fit(X_train, y_train)

            # Make predictions
            y_pred = model.predict(X_test)

            # Calculate metrics
            accuracy = accuracy_score(y_test, y_pred)
            precision = precision_score(y_test, y_pred)
            recall = recall_score(y_test, y_pred)
            f1 = f1_score(y_test, y_pred)

            # Log parameters
            mlflow.log_params(self.config['model'])

            # Log metrics
            mlflow.log_metric("accuracy", accuracy)
            mlflow.log_metric("precision", precision)
            mlflow.log_metric("recall", recall)
            mlflow.log_metric("f1_score", f1)

            # Log model
            mlflow.sklearn.log_model(model, "model")

            print(f"Accuracy: {accuracy:.4f}")
            print(f"Precision: {precision:.4f}")
            print(f"Recall: {recall:.4f}")
            print(f"F1 Score: {f1:.4f}")

            return model
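As with preprocessing, the DVC train stage in Step 5 runs this file directly and declares models/model.pkl and metrics.json as outputs. A minimal sketch of the entry point, appended to src/model_training.py:

# Train from the processed splits when invoked as a script
if __name__ == "__main__":
    import json
    import os
    import joblib
    import pandas as pd
    from sklearn.metrics import accuracy_score

    train_df = pd.read_csv('data/processed/train.csv')
    test_df = pd.read_csv('data/processed/test.csv')
    X_train, y_train = train_df.drop('Churn', axis=1), train_df['Churn']
    X_test, y_test = test_df.drop('Churn', axis=1), test_df['Churn']

    trainer = ModelTrainer()
    model = trainer.train_model(X_train, y_train, X_test, y_test)

    # Persist the artifacts the dvc.yaml stage declares
    os.makedirs('models', exist_ok=True)
    joblib.dump(model, 'models/model.pkl')
    with open('metrics.json', 'w') as f:
        json.dump({'accuracy': float(accuracy_score(y_test, model.predict(X_test)))}, f)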

Step 4: Configuration File

# config.yaml
data:
  raw_path: 'data/raw/churn.csv'
  processed_path: 'data/processed/'
  test_size: 0.2
  random_state: 42

model:
  n_estimators: 100
  max_depth: 6
  learning_rate: 0.1

mlflow:
  tracking_uri: 'http://localhost:5000'
  experiment_name: 'customer-churn-prediction'

api:
  host: '0.0.0.0'
  port: 8000

Step 5: DVC Setup (Data Version Control)

# Initialize DVC
dvc init

# Add data to DVC tracking
dvc add data/raw/churn.csv

# Create DVC pipeline
# dvc.yaml
stages:
  preprocess:
    cmd: python src/data_preprocessing.py
    deps:
      - data/raw/churn.csv
      - src/data_preprocessing.py
    outs:
      - data/processed/train.csv
      - data/processed/test.csv

  train:
    cmd: python src/model_training.py
    deps:
      - data/processed/train.csv
      - src/model_training.py
    outs:
      - models/model.pkl
    metrics:
      - metrics.json:
          cache: false
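With the stages defined, DVC can run the whole pipeline and skip stages whose dependencies haven't changed:

# Reproduce the pipeline (re-runs only stages with changed deps)
dvc repro

# Version the pipeline state alongside the code
git add dvc.yaml dvc.lock data/raw/churn.csv.dvc data/raw/.gitignore
git commit -m "Add DVC pipeline"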

Step 6: FastAPI Deployment

# api/main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import joblib
import numpy as np
import mlflow.sklearn

app = FastAPI(title="Customer Churn Prediction API")

# Load model and scaler once at startup. mlflow.sklearn.load_model expects a
# directory created by mlflow.sklearn.save_model; if the model was exported
# with joblib instead (as in the DVC pipeline), use joblib.load("models/model.pkl")
model = mlflow.sklearn.load_model("models/model")
scaler = joblib.load("models/scaler.pkl")

class CustomerData(BaseModel):
    tenure: float
    monthly_charges: float
    total_charges: float
    # Add other features as needed

@app.get("/")
def home():
    return {"message": "Customer Churn Prediction API", "status": "running"}

@app.post("/predict")
def predict_churn(data: CustomerData):
    try:
        # Convert to array (feature order must match the training columns)
        features = np.array([[
            data.tenure,
            data.monthly_charges,
            data.total_charges
        ]])

        # Scale features
        features_scaled = scaler.transform(features)

        # Make prediction
        prediction = model.predict(features_scaled)
        probability = model.predict_proba(features_scaled)

        return {
            "churn_prediction": int(prediction[0]),
            "churn_probability": float(probability[0][1])
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
def health_check():
    return {"status": "healthy"}

Step 7: Dockerfile

# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# Copy requirements
COPY requirements.txt .

# Install dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy application files
COPY . .

# Expose port
EXPOSE 8000

# Run the application
CMD ["uvicorn", "api.main:app", "--host", "0.0.0.0", "--port", "8000"]
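Because COPY . . copies the entire project into the image, a .dockerignore keeps heavyweight or irrelevant paths out of the build context; a suggested minimal version:

# .dockerignore
venv/
data/
notebooks/
mlruns/
tests/
.git/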

Step 8: CI/CD with GitHub Actions

# .github/workflows/ci.yml
name: CI Pipeline

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
    - uses: actions/checkout@v4

    - name: Set up Python
      uses: actions/setup-python@v5
      with:
        python-version: "3.9"

    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
        pip install pytest

    - name: Run tests
      run: |
        pytest tests/

    - name: Lint code
      run: |
        pip install flake8
        flake8 src/ --max-line-length=120
# .github/workflows/cd.yml
name: CD Pipeline

on:
  push:
    branches: [ main ]

jobs:
  deploy:
    runs-on: ubuntu-latest

    steps:
    - uses: actions/checkout@v4

    - name: Configure AWS credentials
      uses: aws-actions/configure-aws-credentials@v4
      with:
        aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
        aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
        aws-region: us-east-1

    - name: Build Docker image
      run: |
        docker build -t churn-prediction:latest .

    - name: Push to ECR
      run: |
        aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin ${{ secrets.ECR_REGISTRY }}
        docker tag churn-prediction:latest ${{ secrets.ECR_REGISTRY }}/churn-prediction:latest
        docker push ${{ secrets.ECR_REGISTRY }}/churn-prediction:latest

    - name: Deploy to ECS
      # Assumes an ECS cluster/service already exists and pulls this image
      run: |
        aws ecs update-service --cluster churn-cluster --service churn-service --force-new-deployment

Step 9: Testing

# tests/test_model.py
import pytest
import numpy as np
from src.model_training import ModelTrainer

def test_model_training():
    """Test model training pipeline"""
    # Note: ModelTrainer reads config.yaml and logs to the MLflow tracking
    # URI configured there, so point that URI at a local path for CI runs.
    # Create dummy data
    X_train = np.random.rand(100, 10)
    y_train = np.random.randint(0, 2, 100)
    X_test = np.random.rand(20, 10)
    y_test = np.random.randint(0, 2, 20)

    trainer = ModelTrainer()
    model = trainer.train_model(X_train, y_train, X_test, y_test)

    assert model is not None
    assert hasattr(model, 'predict')

def test_prediction_shape():
    """Test prediction output shape"""
    # Importing api.main loads the saved model and scaler from models/,
    # so run this only after a training run has produced those artifacts.
    from api.main import model

    sample_data = np.random.rand(1, 10)
    prediction = model.predict(sample_data)

    assert prediction.shape == (1,)
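The project layout also lists tests/test_preprocessing.py; a minimal sketch that exercises the preprocessing logic on an in-memory frame (the inline config is just enough for the class to initialize):

# tests/test_preprocessing.py
import pandas as pd
from src.data_preprocessing import DataPreprocessor

def test_preprocess_encodes_and_drops_na(tmp_path):
    # Minimal config so DataPreprocessor can load it
    config = tmp_path / "config.yaml"
    config.write_text("data:\n  test_size: 0.2\n  random_state: 42\n")

    df = pd.DataFrame({
        "tenure": [1.0, 2.0, None, 4.0],
        "contract": ["Month", "Year", "Month", "Year"],
        "Churn": ["Yes", "No", "Yes", "No"],
    })

    preprocessor = DataPreprocessor(config_path=str(config))
    out = preprocessor.preprocess(df)

    assert len(out) == 3                    # NA row dropped
    assert out["Churn"].isin([0, 1]).all()  # target encoded to 0/1
    assert "contract_Year" in out.columns   # categoricals one-hot encoded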

Step 10: Running the Complete Pipeline

Local Development

# 1. Start the MLflow tracking server first (config.yaml points at it)
mlflow ui --port 5000

# 2. Train the model (in a second terminal)
python src/model_training.py

# 3. Run the API
uvicorn api.main:app --reload

# 4. Test the API
curl -X POST "http://localhost:8000/predict" \
  -H "Content-Type: application/json" \
  -d '{"tenure": 12, "monthly_charges": 50.0, "total_charges": 600.0}'

Docker Deployment

# Build image
docker build -t churn-prediction:latest .

# Run container
docker run -p 8000:8000 churn-prediction:latest

# Test
curl http://localhost:8000/health

Production Deployment (AWS)

# Create ECR repository
aws ecr create-repository --repository-name churn-prediction

# Authenticate Docker to ECR, then build and push
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin <account-id>.dkr.ecr.us-east-1.amazonaws.com
docker build -t churn-prediction:latest .
docker tag churn-prediction:latest <account-id>.dkr.ecr.us-east-1.amazonaws.com/churn-prediction:latest
docker push <account-id>.dkr.ecr.us-east-1.amazonaws.com/churn-prediction:latest

# Deploy to ECS/EKS or EC2

Step 11: Monitoring & Logging

# Add to api/main.py (this version of /predict replaces the earlier handler)
import logging
from prometheus_client import Counter, Histogram
import time

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Prometheus metrics
PREDICTIONS = Counter('predictions_total', 'Total predictions made')
PREDICTION_TIME = Histogram('prediction_duration_seconds', 'Prediction duration')

@app.post("/predict")
def predict_churn(data: CustomerData):
    start_time = time.time()

    try:
        # make_prediction stands in for the original /predict body above
        result = make_prediction(data)

        # Update metrics
        PREDICTIONS.inc()
        PREDICTION_TIME.observe(time.time() - start_time)

        # Log prediction
        logger.info(f"Prediction made: {result}")

        return result
    except Exception as e:
        logger.error(f"Prediction failed: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
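Prometheus still needs an HTTP endpoint to scrape; prometheus_client ships an ASGI app that can be mounted straight onto FastAPI:

# Expose the counters/histograms defined above at /metrics
from prometheus_client import make_asgi_app

app.mount("/metrics", make_asgi_app())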

Project Completion Checklist

  • [ ] Data preprocessing pipeline
  • [ ] Feature engineering
  • [ ] Model training with MLflow tracking
  • [ ] Model versioning with DVC
  • [ ] Unit tests
  • [ ] FastAPI deployment
  • [ ] Docker containerization
  • [ ] CI/CD pipeline
  • [ ] Cloud deployment
  • [ ] Monitoring setup
  • [ ] Documentation

Key Learning Points

  1. Version Control: Git for code, DVC for data and models
  2. Experiment Tracking: MLflow for model parameters and metrics
  3. API Development: FastAPI for serving predictions
  4. Containerization: Docker for reproducible environments
  5. Automation: GitHub Actions for CI/CD
  6. Testing: Pytest for quality assurance
  7. Monitoring: Logging and metrics for production

Next Steps & Improvements

  • Add A/B testing framework
  • Implement model retraining pipeline
  • Add data drift detection
  • Set up Grafana dashboards
  • Implement feature store
  • Add model explainability (SHAP/LIME)
  • Set up alerting system
  • Add load testing

This project demonstrates a production-ready MLOps pipeline suitable for real-world applications!
