End-to-End MLOps Project: Customer Churn Prediction
Project Overview
Build a complete machine learning pipeline to predict customer churn with automated training, versioning, testing, and deployment.
Tech Stack
- ML Framework: Scikit-learn, XGBoost
- Experiment Tracking: MLflow
- Version Control: Git, DVC (Data Version Control)
- CI/CD: GitHub Actions
- Containerization: Docker
- Deployment: FastAPI + Docker
- Monitoring: Prometheus + Grafana (optional)
- Cloud: AWS/GCP/Azure (we'll use AWS)
Project Structure
customer-churn-mlops/
├── data/
│ ├── raw/
│ └── processed/
├── notebooks/
│ └── exploratory_analysis.ipynb
├── src/
│ ├── __init__.py
│ ├── data_preprocessing.py
│ ├── feature_engineering.py
│ ├── model_training.py
│ └── model_evaluation.py
├── api/
│ ├── __init__.py
│ ├── main.py
│ └── schemas.py
├── tests/
│ ├── test_preprocessing.py
│ └── test_model.py
├── models/
├── .github/
│ └── workflows/
│ ├── ci.yml
│ └── cd.yml
├── Dockerfile
├── requirements.txt
├── config.yaml
├── dvc.yaml
└── README.md
Step 1: Setup & Data Preparation
1.1 Initialize Project
# Create project directory
mkdir customer-churn-mlops
cd customer-churn-mlops
# Initialize git
git init
# Create virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install dependencies
pip install pandas numpy scikit-learn xgboost mlflow dvc fastapi uvicorn pytest
pip freeze > requirements.txt
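To keep the repository clean, a .gitignore should exclude the virtual environment and local MLflow runs (DVC manages ignore rules for the data it tracks). A minimal example:

# .gitignore (minimal example)
venv/
__pycache__/
*.pyc
mlruns/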
1.2 Data Loading & Preprocessing
# src/data_preprocessing.py
import pandas as pd
from sklearn.model_selection import train_test_split
import yaml
class DataPreprocessor:
def __init__(self, config_path='config.yaml'):
with open(config_path, 'r') as f:
self.config = yaml.safe_load(f)
def load_data(self, path):
"""Load customer churn dataset"""
df = pd.read_csv(path)
return df
    def preprocess(self, df, target_col='Churn'):
        """Clean and preprocess data"""
        # Handle missing values
        df = df.dropna()
        # Encode the binary target first so it keeps its column name
        # (Churn is Yes/No in the standard Telco churn dataset)
        if df[target_col].dtype == 'object':
            df[target_col] = df[target_col].map({'Yes': 1, 'No': 0})
        # One-hot encode the remaining categorical variables
        categorical_cols = df.select_dtypes(include=['object']).columns
        df = pd.get_dummies(df, columns=categorical_cols, drop_first=True)
        return df
def split_data(self, df, target_col='Churn'):
"""Split data into train and test sets"""
X = df.drop(target_col, axis=1)
y = df[target_col]
X_train, X_test, y_train, y_test = train_test_split(
X, y,
test_size=self.config['data']['test_size'],
random_state=self.config['data']['random_state']
)
return X_train, X_test, y_train, y_test
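A quick usage check of the preprocessor (the path and target column follow the config below and the standard Telco churn dataset; adjust if your data differs):

# Example usage (sketch)
preprocessor = DataPreprocessor()
df = preprocessor.load_data('data/raw/churn.csv')
df = preprocessor.preprocess(df)
X_train, X_test, y_train, y_test = preprocessor.split_data(df)
print(X_train.shape, X_test.shape)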
Step 2: Feature Engineering
# src/feature_engineering.py
from sklearn.preprocessing import StandardScaler
import joblib
class FeatureEngineering:
def __init__(self):
self.scaler = StandardScaler()
def create_features(self, X_train, X_test):
"""Create and scale features"""
# Scale numerical features
X_train_scaled = self.scaler.fit_transform(X_train)
X_test_scaled = self.scaler.transform(X_test)
# Save scaler
joblib.dump(self.scaler, 'models/scaler.pkl')
return X_train_scaled, X_test_scaled
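One detail worth handling here: pd.get_dummies produces one column per category, and the serving code must send features in exactly the same order. A simple safeguard (sketch; the feature_columns.pkl file name is an assumption, not part of the original pipeline) is to persist the training column list alongside the scaler inside create_features:

# Save the training feature order so the API can align incoming data
joblib.dump(list(X_train.columns), 'models/feature_columns.pkl')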
Step 3: Model Training with MLflow
# src/model_training.py
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import yaml
class ModelTrainer:
def __init__(self, config_path='config.yaml'):
with open(config_path, 'r') as f:
self.config = yaml.safe_load(f)
mlflow.set_tracking_uri(self.config['mlflow']['tracking_uri'])
mlflow.set_experiment(self.config['mlflow']['experiment_name'])
def train_model(self, X_train, y_train, X_test, y_test):
"""Train model with MLflow tracking"""
with mlflow.start_run():
# Initialize model
model = XGBClassifier(
n_estimators=self.config['model']['n_estimators'],
max_depth=self.config['model']['max_depth'],
learning_rate=self.config['model']['learning_rate']
)
# Train model
model.fit(X_train, y_train)
# Make predictions
y_pred = model.predict(X_test)
# Calculate metrics
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
# Log parameters
mlflow.log_params(self.config['model'])
# Log metrics
mlflow.log_metric("accuracy", accuracy)
mlflow.log_metric("precision", precision)
mlflow.log_metric("recall", recall)
mlflow.log_metric("f1_score", f1)
# Log model
mlflow.sklearn.log_model(model, "model")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
return model
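The dvc.yaml pipeline in Step 5 and the commands in Step 10 run this file directly, so it needs an entry point. A minimal sketch, assuming the classes from Steps 1–2 and the config paths below:

# Example entry point for `python src/model_training.py` (sketch)
if __name__ == "__main__":
    from data_preprocessing import DataPreprocessor
    from feature_engineering import FeatureEngineering

    preprocessor = DataPreprocessor()
    df = preprocessor.preprocess(preprocessor.load_data(preprocessor.config['data']['raw_path']))
    X_train, X_test, y_train, y_test = preprocessor.split_data(df)

    fe = FeatureEngineering()
    X_train_scaled, X_test_scaled = fe.create_features(X_train, X_test)

    trainer = ModelTrainer()
    trainer.train_model(X_train_scaled, y_train, X_test_scaled, y_test)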
Step 4: Configuration File
# config.yaml
data:
raw_path: 'data/raw/churn.csv'
processed_path: 'data/processed/'
test_size: 0.2
random_state: 42
model:
n_estimators: 100
max_depth: 6
learning_rate: 0.1
mlflow:
tracking_uri: 'http://localhost:5000'
experiment_name: 'customer-churn-prediction'
api:
host: '0.0.0.0'
port: 8000
Step 5: DVC Setup (Data Version Control)
# Initialize DVC
dvc init
# Add data to DVC tracking
dvc add data/raw/churn.csv
# Create DVC pipeline
# dvc.yaml
stages:
preprocess:
cmd: python src/data_preprocessing.py
deps:
- data/raw/churn.csv
- src/data_preprocessing.py
outs:
- data/processed/train.csv
- data/processed/test.csv
train:
cmd: python src/model_training.py
deps:
- data/processed/train.csv
- src/model_training.py
outs:
- models/model.pkl
metrics:
- metrics.json:
cache: false
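Note that the train stage declares models/model.pkl and metrics.json as outputs, while the training script in Step 3 only logs to MLflow. For `dvc repro` to succeed, the script also needs to write those files; a minimal sketch of what to add at the end of train_model (imports would normally live at the top of the module):

# Persist artifacts for DVC (sketch)
import json
import joblib

joblib.dump(model, 'models/model.pkl')
with open('metrics.json', 'w') as f:
    json.dump({"accuracy": accuracy, "precision": precision,
               "recall": recall, "f1_score": f1}, f, indent=2)

With that in place, `dvc repro` runs both stages end to end and `dvc metrics show` displays the tracked metrics.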
Step 6: FastAPI Deployment
# api/main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import joblib
import numpy as np
import mlflow.sklearn
app = FastAPI(title="Customer Churn Prediction API")
# Load model and scaler
# (assumes the trained model has been exported to models/model and the fitted
#  scaler to models/scaler.pkl; adjust the paths or load from the MLflow Model
#  Registry if models are tracked remotely)
model = mlflow.sklearn.load_model("models/model")
scaler = joblib.load("models/scaler.pkl")
class CustomerData(BaseModel):
tenure: float
monthly_charges: float
total_charges: float
# Add other features as needed
@app.get("/")
def home():
return {"message": "Customer Churn Prediction API", "status": "running"}
@app.post("/predict")
def predict_churn(data: CustomerData):
try:
# Convert to array
features = np.array([[
data.tenure,
data.monthly_charges,
data.total_charges
]])
# Scale features
features_scaled = scaler.transform(features)
# Make prediction
prediction = model.predict(features_scaled)
probability = model.predict_proba(features_scaled)
return {
"churn_prediction": int(prediction[0]),
"churn_probability": float(probability[0][1])
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
def health_check():
return {"status": "healthy"}
Step 7: Dockerfile
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
# Copy requirements
COPY requirements.txt .
# Install dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy application files
COPY . .
# Expose port
EXPOSE 8000
# Run the application
CMD ["uvicorn", "api.main:app", "--host", "0.0.0.0", "--port", "8000"]
Step 8: CI/CD with GitHub Actions
# .github/workflows/ci.yml
name: CI Pipeline
on:
push:
branches: [ main, develop ]
pull_request:
branches: [ main ]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: 3.9
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install pytest
- name: Run tests
run: |
pytest tests/
- name: Lint code
run: |
pip install flake8
flake8 src/ --max-line-length=120
# .github/workflows/cd.yml
name: CD Pipeline
on:
push:
branches: [ main ]
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: us-east-1
- name: Build Docker image
run: |
docker build -t churn-prediction:latest .
- name: Push to ECR
run: |
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin ${{ secrets.ECR_REGISTRY }}
docker tag churn-prediction:latest ${{ secrets.ECR_REGISTRY }}/churn-prediction:latest
docker push ${{ secrets.ECR_REGISTRY }}/churn-prediction:latest
- name: Deploy to ECS
run: |
aws ecs update-service --cluster churn-cluster --service churn-service --force-new-deployment
Step 9: Testing
# tests/test_model.py
import pytest
import numpy as np
from src.model_training import ModelTrainer
def test_model_training():
"""Test model training pipeline"""
# Create dummy data
X_train = np.random.rand(100, 10)
y_train = np.random.randint(0, 2, 100)
X_test = np.random.rand(20, 10)
y_test = np.random.randint(0, 2, 20)
    # ModelTrainer reads config.yaml and connects to the MLflow tracking URI;
    # in CI, point MLflow at a local file store (e.g. file:./mlruns) so the test
    # does not depend on a running tracking server
    trainer = ModelTrainer()
model = trainer.train_model(X_train, y_train, X_test, y_test)
assert model is not None
assert hasattr(model, 'predict')
def test_prediction_shape():
"""Test prediction output shape"""
    # Importing api.main loads the saved model at import time; the number of
    # columns here must match the features the model was trained on
    from api.main import model
sample_data = np.random.rand(1, 10)
prediction = model.predict(sample_data)
assert prediction.shape == (1,)
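tests/test_preprocessing.py from the project layout is not shown above; a minimal version, assuming the Churn column uses Yes/No labels as in the Telco dataset, could look like:

# tests/test_preprocessing.py (sketch)
import pandas as pd
from src.data_preprocessing import DataPreprocessor

def test_preprocess_encodes_target_and_categoricals():
    df = pd.DataFrame({
        "tenure": [1, 12, 24, 48],
        "Contract": ["Month-to-month", "One year", "Two year", "One year"],
        "Churn": ["Yes", "No", "No", "Yes"],
    })
    processed = DataPreprocessor().preprocess(df)
    # Target kept as a 0/1 column, categoricals one-hot encoded
    assert set(processed["Churn"].unique()) <= {0, 1}
    assert not (processed.dtypes == "object").any()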
Step 10: Running the Complete Pipeline
Local Development
# 1. Train the model
python src/model_training.py
# 2. Start MLflow UI
mlflow ui --port 5000
# 3. Run the API
uvicorn api.main:app --reload
# 4. Test the API
curl -X POST "http://localhost:8000/predict" \
-H "Content-Type: application/json" \
-d '{"tenure": 12, "monthly_charges": 50.0, "total_charges": 600.0}'
Docker Deployment
# Build image
docker build -t churn-prediction:latest .
# Run container
docker run -p 8000:8000 churn-prediction:latest
# Test
curl http://localhost:8000/health
Production Deployment (AWS)
# Create ECR repository
aws ecr create-repository --repository-name churn-prediction
# Build and push
docker build -t churn-prediction:latest .
docker tag churn-prediction:latest <account-id>.dkr.ecr.us-east-1.amazonaws.com/churn-prediction:latest
docker push <account-id>.dkr.ecr.us-east-1.amazonaws.com/churn-prediction:latest
# Deploy to ECS/EKS or EC2
Step 11: Monitoring & Logging
# Add to api/main.py
import logging
from prometheus_client import Counter, Histogram
import time
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Prometheus metrics
PREDICTIONS = Counter('predictions_total', 'Total predictions made')
PREDICTION_TIME = Histogram('prediction_duration_seconds', 'Prediction duration')
@app.post("/predict")
def predict_churn(data: CustomerData):
start_time = time.time()
try:
        # Existing prediction logic from Step 6; make_prediction(data) stands in
        # for the scaling + model.predict steps shown earlier
        result = make_prediction(data)
# Update metrics
PREDICTIONS.inc()
PREDICTION_TIME.observe(time.time() - start_time)
# Log prediction
logger.info(f"Prediction made: {result}")
return result
except Exception as e:
logger.error(f"Prediction failed: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
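Prometheus needs an endpoint to scrape, and prometheus_client is not in the base requirements, so install it (`pip install prometheus-client`) and expose the metrics, for example:

# Expose the metrics for Prometheus to scrape
from fastapi import Response
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST

@app.get("/metrics")
def metrics():
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)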
Project Completion Checklist
- [ ] Data preprocessing pipeline
- [ ] Feature engineering
- [ ] Model training with MLflow tracking
- [ ] Model versioning with DVC
- [ ] Unit tests
- [ ] FastAPI deployment
- [ ] Docker containerization
- [ ] CI/CD pipeline
- [ ] Cloud deployment
- [ ] Monitoring setup
- [ ] Documentation
Key Learning Points
- Version Control: Git for code, DVC for data and models
- Experiment Tracking: MLflow for model parameters and metrics
- API Development: FastAPI for serving predictions
- Containerization: Docker for reproducible environments
- Automation: GitHub Actions for CI/CD
- Testing: Pytest for quality assurance
- Monitoring: Logging and metrics for production
Next Steps & Improvements
- Add A/B testing framework
- Implement model retraining pipeline
- Add data drift detection
- Set up Grafana dashboards
- Implement feature store
- Add model explainability (SHAP/LIME)
- Set up alerting system
- Add load testing
This project demonstrates a production-ready MLOps pipeline suitable for real-world applications!