# Web App Operations & Deployment Tutorial

## What You'll Learn
In this tutorial, you'll discover how to:
- Deploy your ML web application to production
- Manage different deployment environments
- Monitor application performance and health
- Handle scaling and maintenance operations
- Implement security best practices

## Why Do We Need Operations Management?
Imagine you've built an amazing restaurant (your ML app), but you still need to:
- Open multiple locations (different environments)
- Ensure consistent food quality (reliable deployments)
- Handle busy nights (scaling under load)
- Keep the kitchen clean (maintenance and updates)
- Ensure customer safety (security)

Operations management is what keeps your ML application running smoothly in the real world!
## Understanding Deployment Environments
Think of deployment environments like different stages of a restaurant business:

### Development Environment (Your Home Kitchen)
Development
├── Rapid experimentation
├── Bug testing and fixes
├── Fast iteration cycles
└── Feature development
What happens here:
- Developers test new features
- Quick fixes and experiments
- No real customer data
- Frequent restarts and changes
### Staging Environment (Test Restaurant)
Staging
├── Production-like testing
├── Performance validation
├── Integration testing
└── Final quality checks
What happens here:
- Exact copy of production environment
- Real-world testing without real customers
- Performance and load testing
- Final approval before going live
### Production Environment (Real Restaurant)
Production
├── Real customers served
├── Revenue generation
├── High security standards
└── Performance monitoring
What happens here:
- Serving real customers
- Maximum uptime required
- Careful change management
- Comprehensive monitoring
## Container-Based Deployment
What are containers? Think of containers like food trucks - they have everything needed to serve customers and can be deployed anywhere!

### Step 1: Building Your Food Truck (Docker Container)
# Dockerfile - the blueprint for our food truck
FROM python:3.9-slim

# Add labels (like painting the truck name)
LABEL maintainer="Your MLOps Team"
LABEL version="2.0.0"
LABEL description="Churn Prediction Web Application"

# Install curl - the slim base image doesn't include it, and the HEALTHCHECK below needs it
RUN apt-get update && apt-get install -y --no-install-recommends curl && \
    rm -rf /var/lib/apt/lists/*

# Set up the kitchen workspace
WORKDIR /app

# Copy the recipe book first (requirements.txt)
COPY requirements.txt .

# Install all cooking tools (pip install)
RUN pip install --no-cache-dir -r requirements.txt

# Copy all the application files (your recipes and tools)
COPY . .

# Create a non-root user (safety first!)
RUN useradd --create-home --shell /bin/bash app_user
RUN chown -R app_user:app_user /app
USER app_user

# Open the service door (expose port)
EXPOSE 8000

# Set up a health check (is the kitchen working?)
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Start the service (open for business!)
CMD ["uvicorn", "fastapi_app:app", "--host", "0.0.0.0", "--port", "8000"]
Why each part matters:
- FROM python:3.9-slim: Use a lightweight, secure base image
- WORKDIR /app: Organize files in a clean directory structure
- COPY requirements.txt first: Docker layer optimization for faster builds (see the .dockerignore sketch below)
- RUN pip install: Install dependencies in a separate layer
- USER app_user: Security - never run as root in production
- HEALTHCHECK: Automatic monitoring of container health
- CMD: Define how to start the application
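Layer caching pays off most when the build context stays small. A .dockerignore along these lines helps; the entries are illustrative, so adjust them to your repository layout:

```
# .dockerignore - keep the build context lean (illustrative)
__pycache__/
*.pyc
.venv/
.git/
.mypy_cache/
mlruns/
logs/
*.ipynb_checkpoints/
```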
### Step 2: Creating a Fleet of Food Trucks (Docker Compose)
# docker-compose.yml - Managing multiple trucks at once
version: '3.8'
services:
# ๐ Main API Service (FastAPI truck)
api:
build:
context: .
dockerfile: Dockerfile
container_name: churn_api
ports:
- "8000:8000" # Map truck door to street address
environment:
- MLFLOW_TRACKING_URI=http://mlflow:5000
- REDIS_URL=redis://redis:6379
- LOG_LEVEL=INFO
volumes:
- ./logs:/app/logs # Share log files with host
- ./models:/app/models # Share models folder
- ./artifacts:/app/artifacts # Share preprocessing artifacts
depends_on:
- redis # Wait for Redis to start first
- mlflow # Wait for MLflow to start first
restart: unless-stopped # Auto-restart if crashed
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s
# ๐ Dashboard Service (Streamlit truck)
dashboard:
build:
context: .
dockerfile: Dockerfile.streamlit
container_name: churn_dashboard
ports:
- "8501:8501" # Dashboard on different street
environment:
- API_URL=http://api:8000
depends_on:
- api # Dashboard needs API to work
restart: unless-stopped
# ๐๏ธ Cache Service (Redis - like a fast storage unit)
redis:
image: redis:7-alpine
container_name: churn_redis
ports:
- "6379:6379"
volumes:
- redis_data:/data # Persistent storage
restart: unless-stopped
command: redis-server --appendonly yes # Enable data persistence
# ๐ MLflow Tracking (Recipe book management)
mlflow:
image: python:3.9-slim
container_name: churn_mlflow
ports:
- "5000:5000"
volumes:
- ./mlruns:/mlflow/mlruns
- ./mlflow.db:/mlflow/mlflow.db
command: >
bash -c "pip install mlflow &&
mlflow server
--backend-store-uri sqlite:////mlflow/mlflow.db
--default-artifact-root /mlflow/mlruns
--host 0.0.0.0
--port 5000"
restart: unless-stopped
# ๐๏ธ Persistent storage (like warehouse space)
volumes:
redis_data:
driver: local
# ๐ Network setup (how trucks communicate)
networks:
default:
name: churn_network
driver: bridge
What this composition does:
- api: Your main FastAPI service
- dashboard: Streamlit interface for business users (built from a separate Dockerfile.streamlit; a sketch follows this list)
- redis: Fast caching for better performance
- mlflow: Model tracking and management
- volumes: Persistent data storage
- networks: Secure communication between services
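The dashboard service references a Dockerfile.streamlit that isn't shown above. A minimal sketch might look like this, assuming streamlit and its dependencies are listed in requirements.txt:

```dockerfile
# Dockerfile.streamlit - hypothetical image for the dashboard service
FROM python:3.9-slim
WORKDIR /app

# Install dependencies first for better layer caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application and run as a non-root user
COPY . .
RUN useradd --create-home --shell /bin/bash app_user && chown -R app_user:app_user /app
USER app_user

EXPOSE 8501
CMD ["streamlit", "run", "streamlit_app.py", "--server.address=0.0.0.0", "--server.port=8501"]
```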
### Step 3: Starting Your Food Truck Fleet
# Build all containers (prepare all trucks)
docker-compose build

# Start all services (open for business!)
docker-compose up -d

# Check that everything is running (inspect the fleet)
docker-compose ps

# View logs from a specific service (listen to kitchen chatter)
docker-compose logs -f api

# Monitor all services at once
docker-compose logs -f

# Stop everything gracefully (close for the day)
docker-compose down

# Clean up everything including data (deep cleaning)
docker-compose down -v --remove-orphans
## Load Balancing and Scaling
What is load balancing? Imagine having multiple food trucks serving the same menu - when one gets busy, customers are automatically sent to a less busy truck.

### Understanding the Need for Scaling
One truck (no scaling):

Customer 1  ──┐
Customer 2  ──┼──► Single API Server
Customer 3  ──┘
...
Customer 100 ───► long wait times!

Multiple trucks (with scaling):

Customer 1  ──► Load Balancer ──► API Server 1
Customer 2  ──► Load Balancer ──► API Server 2
Customer 3  ──► Load Balancer ──► API Server 3
...
Customer 100 ──► Load Balancer ──► least-busy server
### Setting Up Load Balancing with Nginx
# nginx.conf - Traffic director configuration
upstream api_servers {
# ๐ฏ List of API servers (our truck fleet)
server api1:8000 weight=3; # Stronger truck, handle more customers
server api2:8000 weight=2; # Medium capacity
server api3:8000 weight=1; # Backup truck
# ๐ Load balancing method
least_conn; # Send to least busy truck
}
server {
listen 80; # Listen on port 80 (main street)
server_name your-domain.com; # Your website address
# ๐ Access logs (customer visit records)
access_log /var/log/nginx/api_access.log;
error_log /var/log/nginx/api_error.log;
# ๐ฏ Route all API requests to our truck fleet
location /api/ {
proxy_pass http://api_servers/;
# ๐ง Headers for proper communication
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# โฑ๏ธ Timeouts (don't wait forever)
proxy_connect_timeout 30s;
proxy_send_timeout 30s;
proxy_read_timeout 30s;
# ๐ช Retry logic (if one truck is down, try another)
proxy_next_upstream error timeout invalid_header http_500 http_502 http_503;
}
# ๐ Dashboard routing (separate from API)
location /dashboard/ {
proxy_pass http://dashboard:8501/;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
# ๐ฅ Health check endpoint (is everything working?)
location /health {
access_log off; # Don't log health checks
proxy_pass http://api_servers/health;
proxy_connect_timeout 5s;
proxy_read_timeout 5s;
}
}
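The upstream block above assumes three API containers reachable as api1, api2, and api3. One way to wire that up with Compose is to define the instances explicitly and mount the config into an nginx container; this is an illustrative sketch, not the project's actual file:

```yaml
# docker-compose.override.yml - illustrative nginx + multi-API wiring
services:
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/conf.d/default.conf:ro
    depends_on:
      - api1
      - api2
      - api3
  api1:
    build: .
    environment:
      - MLFLOW_TRACKING_URI=http://mlflow:5000
  api2:
    build: .
    environment:
      - MLFLOW_TRACKING_URI=http://mlflow:5000
  api3:
    build: .
    environment:
      - MLFLOW_TRACKING_URI=http://mlflow:5000
```

Alternatively, keep a single api service, scale it with `docker-compose up -d --scale api=3`, and point the upstream at the one service name so Docker's DNS round-robins across replicas.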
### Auto-Scaling with Docker Swarm
# Initialize Docker Swarm (create truck fleet management)
docker swarm init

# Deploy as a stack (organized fleet deployment)
docker stack deploy -c docker-compose.yml churn_app

# Scale a specific service up (add more trucks)
docker service scale churn_app_api=5

# Check service status (fleet inspection)
docker service ls
docker service ps churn_app_api

# Scale down when load decreases (reduce fleet size)
docker service scale churn_app_api=2

# Update a service with zero downtime (upgrade trucks one by one)
docker service update --image myapp:v2.0 churn_app_api
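Instead of scaling manually, the replica count and rolling-update behaviour can live in the Compose file's deploy section, which `docker stack deploy` honours. A sketch (values are illustrative; note that swarm ignores `build:` and needs a pre-built image):

```yaml
# docker-compose.yml (api service) - illustrative swarm deploy settings
services:
  api:
    image: churn-app:latest
    deploy:
      replicas: 3
      update_config:
        parallelism: 1
        delay: 10s
        order: start-first
      restart_policy:
        condition: on-failure
      resources:
        limits:
          cpus: "0.50"
          memory: 512M
```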
## Health Monitoring and Observability
Why monitor? Like having security cameras and temperature sensors in your restaurant to ensure everything runs smoothly.

### Step 1: Application Health Checks
# fastapi_app.py - Adding health monitoring to your app
from fastapi import FastAPI, status
from pydantic import BaseModel
import psutil
import time
from datetime import datetime
import logging
app = FastAPI()
class HealthStatus(BaseModel):
"""Health check response model"""
status: str
timestamp: str
version: str
uptime_seconds: float
memory_usage_mb: float
cpu_usage_percent: float
model_loaded: bool
database_connected: bool
# ๐ Track when the application started
start_time = time.time()
@app.get("/health", response_model=HealthStatus)
async def health_check():
"""
๐ฅ Comprehensive health check endpoint
This is like a doctor's checkup for your application.
It reports on all vital signs.
"""
try:
# โฑ๏ธ Calculate uptime
uptime = time.time() - start_time
# ๐พ Memory usage
memory_info = psutil.virtual_memory()
memory_usage = psutil.Process().memory_info().rss / 1024 / 1024 # MB
# ๐ฅ CPU usage
cpu_usage = psutil.cpu_percent(interval=1)
# ๐ค Check if model is loaded
model_loaded = model_manager.model is not None
# ๐๏ธ Check database connection (if applicable)
database_connected = await check_database_connection()
# ๐ฏ Determine overall status
status = "healthy"
if memory_usage > 1000: # > 1GB memory usage
status = "warning"
if not model_loaded or not database_connected:
status = "unhealthy"
return HealthStatus(
status=status,
timestamp=datetime.now().isoformat(),
version="2.0.0",
uptime_seconds=round(uptime, 2),
memory_usage_mb=round(memory_usage, 2),
cpu_usage_percent=round(cpu_usage, 2),
model_loaded=model_loaded,
database_connected=database_connected
)
except Exception as e:
logging.error(f"Health check failed: {str(e)}")
return HealthStatus(
status="unhealthy",
timestamp=datetime.now().isoformat(),
version="2.0.0",
uptime_seconds=0,
memory_usage_mb=0,
cpu_usage_percent=0,
model_loaded=False,
database_connected=False
)
async def check_database_connection():
"""Check if database is accessible"""
try:
# Add your database ping logic here
# For example, with SQLAlchemy:
# result = await database.fetch_one("SELECT 1")
return True
except:
return False
@app.get("/metrics")
async def get_metrics():
"""
๐ Detailed metrics endpoint for monitoring systems
This provides detailed performance data for monitoring tools
like Prometheus or Grafana.
"""
return {
"predictions_total": prediction_counter.get_count(),
"predictions_success": success_counter.get_count(),
"predictions_error": error_counter.get_count(),
"average_response_time": response_time_tracker.get_average(),
"memory_usage_bytes": psutil.Process().memory_info().rss,
"cpu_usage_percent": psutil.cpu_percent(),
"disk_usage_percent": psutil.disk_usage('/').percent,
"model_version": model_manager.model_version,
"uptime_seconds": time.time() - start_time
}
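The /metrics endpoint above assumes small helper objects (prediction_counter, success_counter, error_counter, response_time_tracker) that aren't defined in the snippet. A minimal, thread-safe sketch of what they could look like:

```python
# utils/simple_metrics.py - hypothetical in-process counters used by /metrics
import threading

class SimpleCounter:
    """Thread-safe monotonically increasing counter."""
    def __init__(self):
        self._lock = threading.Lock()
        self._count = 0

    def increment(self, amount: int = 1):
        with self._lock:
            self._count += amount

    def get_count(self) -> int:
        with self._lock:
            return self._count

class ResponseTimeTracker:
    """Tracks a running average of response times in seconds."""
    def __init__(self):
        self._lock = threading.Lock()
        self._total = 0.0
        self._samples = 0

    def record(self, seconds: float):
        with self._lock:
            self._total += seconds
            self._samples += 1

    def get_average(self) -> float:
        with self._lock:
            return self._total / self._samples if self._samples else 0.0

prediction_counter = SimpleCounter()
success_counter = SimpleCounter()
error_counter = SimpleCounter()
response_time_tracker = ResponseTimeTracker()
```

The /predict handler would then call `prediction_counter.increment()` on each request and `response_time_tracker.record(elapsed)` once a prediction finishes.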
### Step 2: Centralized Logging
# config/logging.py - Setting up comprehensive logging
import logging
import logging.handlers
import json
from datetime import datetime
import os
class JSONFormatter(logging.Formatter):
"""Custom formatter for structured JSON logs"""
def format(self, record):
"""Convert log record to JSON format"""
log_entry = {
"timestamp": datetime.utcnow().isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno
}
# Add exception info if present
if record.exc_info:
log_entry["exception"] = self.formatException(record.exc_info)
# Add extra fields
for key, value in record.__dict__.items():
if key not in ['name', 'msg', 'args', 'levelname', 'levelno',
'pathname', 'filename', 'module', 'lineno',
'funcName', 'created', 'msecs', 'relativeCreated',
'thread', 'threadName', 'processName', 'process']:
log_entry[key] = value
return json.dumps(log_entry)
def setup_logging():
"""Configure application logging"""
# ๐ Ensure logs directory exists
os.makedirs("logs", exist_ok=True)
# ๐ฏ Create root logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# ๐ File handler for all logs
file_handler = logging.handlers.RotatingFileHandler(
"logs/app.log",
maxBytes=10*1024*1024, # 10MB per file
backupCount=5 # Keep 5 old files
)
file_handler.setFormatter(JSONFormatter())
logger.addHandler(file_handler)
# ๐ฅ๏ธ Console handler for development
console_handler = logging.StreamHandler()
console_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
console_handler.setFormatter(console_formatter)
logger.addHandler(console_handler)
# ๐จ Error-only file for critical issues
error_handler = logging.handlers.RotatingFileHandler(
"logs/errors.log",
maxBytes=5*1024*1024, # 5MB per file
backupCount=3 # Keep 3 old files
)
error_handler.setLevel(logging.ERROR)
error_handler.setFormatter(JSONFormatter())
logger.addHandler(error_handler)
return logger
# Usage in your application
logger = setup_logging()
# ๐ Example of structured logging
@app.post("/predict")
async def predict_churn(customer_data: CustomerData):
request_id = str(uuid.uuid4())
logger.info("Prediction request started", extra={
"request_id": request_id,
"customer_tenure": customer_data.tenure,
"monthly_charges": customer_data.MonthlyCharges
})
try:
# Your prediction logic here
result = make_prediction(customer_data)
logger.info("Prediction completed successfully", extra={
"request_id": request_id,
"churn_probability": result.churn_probability,
"processing_time": processing_time
})
return result
except Exception as e:
logger.error("Prediction failed", extra={
"request_id": request_id,
"error": str(e),
"customer_data": customer_data.dict()
})
raise
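With the JSON formatter above, a structured log line for the /predict example might look like this (values are illustrative, wrapped here for readability):

```json
{"timestamp": "2024-05-01T12:34:56.789012", "level": "INFO", "logger": "root",
 "message": "Prediction request started", "module": "fastapi_app",
 "function": "predict_churn", "line": 431,
 "request_id": "4f1c2e9a-8b1d-4a7e-9c3f-2d5e6f7a8b9c",
 "customer_tenure": 12, "monthly_charges": 70.0}
```

Because every line is a self-contained JSON object with a request_id, log aggregators can group all events for one request and filter on any field.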
## Streamlit Dashboard Operations
What is Streamlit? Think of it as creating a beautiful, interactive restaurant menu that customers can use to place orders easily.

### Building the User Interface
# streamlit_app.py - Your customer-facing dashboard
import streamlit as st
import requests
import pandas as pd
import plotly.express as px
# ๐จ Page setup (restaurant ambiance)
st.set_page_config(
page_title="๐ฏ Churn Prediction Dashboard",
page_icon="๐",
layout="wide",
initial_sidebar_state="expanded"
)
# ๐ Main welcome area
st.title("๐ฏ Customer Churn Prediction Dashboard")
st.markdown("### Predict customer churn risk and get actionable insights")
# ๐ Customer input form (order form)
with st.sidebar:
st.header("๐ Customer Information")
# ๐ฅ Demographics section
st.subheader("๐ฅ Demographics")
gender = st.selectbox("Gender", ["Male", "Female"])
senior_citizen = st.selectbox("Senior Citizen", ["No", "Yes"])
partner = st.selectbox("Has Partner", ["No", "Yes"])
dependents = st.selectbox("Has Dependents", ["No", "Yes"])
# ๐ Service information
st.subheader("๐ Services")
tenure = st.slider("Tenure (months)", 0, 72, 12)
phone_service = st.selectbox("Phone Service", ["No", "Yes"])
internet_service = st.selectbox("Internet Service",
["No", "DSL", "Fiber optic"])
# ๐ฐ Billing information
st.subheader("๐ฐ Billing")
monthly_charges = st.number_input("Monthly Charges ($)",
min_value=0.0, max_value=200.0, value=70.0)
total_charges = st.number_input("Total Charges ($)",
min_value=0.0, value=monthly_charges * tenure)
contract = st.selectbox("Contract Type",
["Month-to-month", "One year", "Two year"])
payment_method = st.selectbox("Payment Method",
["Electronic check", "Mailed check",
"Bank transfer (automatic)", "Credit card (automatic)"])
# ๐ฏ Prediction button (place order)
if st.button("๐ฎ Predict Churn Risk", type="primary"):
with st.spinner("๐ค Analyzing customer data..."):
# ๐ฆ Prepare data for API
customer_data = {
"gender": gender,
"SeniorCitizen": 1 if senior_citizen == "Yes" else 0,
"Partner": partner,
"Dependents": dependents,
"tenure": tenure,
"PhoneService": phone_service,
"InternetService": internet_service,
"Contract": contract,
"PaymentMethod": payment_method,
"MonthlyCharges": monthly_charges,
"TotalCharges": str(total_charges)
}
try:
# ๐ก Call API (send order to kitchen)
response = requests.post(
"http://localhost:8000/predict",
json=customer_data,
timeout=30
)
if response.status_code == 200:
result = response.json()
# ๐ Display results (serve the meal)
col1, col2, col3 = st.columns(3)
with col1:
churn_prob = result["churn_probability"]
st.metric(
"๐ฏ Churn Probability",
f"{churn_prob:.1%}",
delta=None
)
with col2:
risk_level = result["risk_level"]
color = "๐ด" if "High" in risk_level else "๐ก" if "Medium" in risk_level else "๐ข"
st.metric("โ ๏ธ Risk Level", f"{color} {risk_level}")
with col3:
confidence = result["confidence"]
st.metric("๐ฏ Confidence", f"{confidence:.1%}")
# ๐ก Recommendations (suggested actions)
st.subheader("๐ก Recommended Actions")
for i, recommendation in enumerate(result["recommendations"], 1):
st.write(f"{i}. {recommendation}")
# ๐ Visual analysis (charts and graphs)
st.subheader("๐ Risk Analysis")
# Create a risk gauge chart
import plotly.graph_objects as go
fig = go.Figure(go.Indicator(
mode = "gauge+number",
value = churn_prob * 100,
title = {'text': "Churn Risk %"},
domain = {'x': [0, 1], 'y': [0, 1]},
gauge = {
'axis': {'range': [None, 100]},
'bar': {'color': "darkblue"},
'steps': [
{'range': [0, 40], 'color': "lightgreen"},
{'range': [40, 70], 'color': "yellow"},
{'range': [70, 100], 'color': "red"}
],
'threshold': {
'line': {'color': "red", 'width': 4},
'thickness': 0.75,
'value': 70
}
}
))
st.plotly_chart(fig, use_container_width=True)
else:
st.error(f"โ Prediction failed: {response.text}")
except requests.exceptions.RequestException as e:
st.error(f"โ Connection error: {str(e)}")
st.info("๐ก Make sure the API server is running on http://localhost:8000")
# ๐ Additional dashboard features
st.subheader("๐ Customer Profile Analysis")
# Create customer profile visualization
profile_data = {
"Attribute": ["Tenure", "Monthly Charges", "Total Charges", "Services"],
"Value": [tenure, monthly_charges, total_charges,
len([x for x in [phone_service, internet_service] if x != "No"])],
"Max_Value": [72, 200, 10000, 2]
}
df_profile = pd.DataFrame(profile_data)
df_profile["Percentage"] = (df_profile["Value"] / df_profile["Max_Value"]) * 100
fig_profile = px.bar(
df_profile,
x="Attribute",
y="Percentage",
title="Customer Profile Overview",
color="Percentage",
color_continuous_scale="viridis"
)
st.plotly_chart(fig_profile, use_container_width=True)
Key Components Explained:
- Page Configuration: Sets up the dashboard appearance and layout
- Input Forms: Collects customer data in an organized way
- API Integration: Connects to your FastAPI backend for predictions
- Results Display: Shows predictions in a user-friendly format
- Visualizations: Creates charts and graphs for better understanding
- Error Handling: Gracefully handles connection issues and errors
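To try the dashboard locally against the API, something like the following works, assuming both files sit in the project root and the dependencies are installed:

```bash
# Terminal 1: start the FastAPI backend
uvicorn fastapi_app:app --host 0.0.0.0 --port 8000

# Terminal 2: start the Streamlit dashboard
streamlit run streamlit_app.py --server.port 8501
```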
## Deployment Best Practices
### Environment-Specific Configurations
# .env.development - Development settings
DEBUG=True
LOG_LEVEL=DEBUG
API_HOST=localhost
API_PORT=8000
MLFLOW_TRACKING_URI=http://localhost:5000
# .env.staging - Staging settings
DEBUG=False
LOG_LEVEL=INFO
API_HOST=0.0.0.0
API_PORT=8000
MLFLOW_TRACKING_URI=http://mlflow-staging:5000
# .env.production - Production settings
DEBUG=False
LOG_LEVEL=WARNING
API_HOST=0.0.0.0
API_PORT=8000
MLFLOW_TRACKING_URI=http://mlflow-prod:5000
CORS_ORIGINS=["https://your-domain.com"]
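One way to pick the right file at startup is to key off an APP_ENV variable. A minimal sketch, assuming python-dotenv is installed and APP_ENV is set to development, staging, or production (the module name and defaults are illustrative):

```python
# config/settings.py - hypothetical loader for the .env.* files above
import os
from dotenv import load_dotenv  # pip install python-dotenv

def load_settings() -> dict:
    """Load environment-specific settings from the matching .env file."""
    env = os.getenv("APP_ENV", "development")
    load_dotenv(f".env.{env}")  # e.g. .env.production
    return {
        "debug": os.getenv("DEBUG", "False") == "True",
        "log_level": os.getenv("LOG_LEVEL", "INFO"),
        "api_host": os.getenv("API_HOST", "localhost"),
        "api_port": int(os.getenv("API_PORT", "8000")),
        "mlflow_tracking_uri": os.getenv("MLFLOW_TRACKING_URI", ""),
    }

settings = load_settings()
```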
### Zero-Downtime Deployment Strategy
#!/bin/bash
# deploy.sh - smart deployment script

echo "Starting zero-downtime deployment..."

# 1. Build the new version
echo "Building new application version..."
docker build -t churn-app:new .

# 2. Smoke-test the new version on a spare port
echo "Testing new version..."
docker run -d --name test-container -p 8001:8000 churn-app:new
sleep 30

# Check whether the new version is healthy
if curl -f http://localhost:8001/health; then
    echo "New version is healthy"
    docker stop test-container
    docker rm test-container
else
    echo "New version failed health check"
    docker stop test-container
    docker rm test-container
    exit 1
fi

# 3. Retag images for the rolling update
echo "Performing rolling update..."
docker tag churn-app:current churn-app:backup
docker tag churn-app:new churn-app:current

# 4. Deploy the new version
docker-compose up -d --force-recreate api

# 5. Verify the deployment
echo "Verifying deployment..."
sleep 30
if curl -f http://localhost:8000/health; then
    echo "Deployment successful!"
    docker rmi churn-app:backup
else
    echo "Deployment failed, rolling back..."
    docker tag churn-app:backup churn-app:current
    docker-compose up -d --force-recreate api
    exit 1
fi
This operations tutorial covers the essential aspects of deploying and managing your ML web application in production: containerization, monitoring, and disciplined deployment practices keep it running reliably and scaling effectively.

The remaining sections dig deeper: an extended, multi-tab version of the Streamlit dashboard, the web-app service's Docker configuration, monitoring utilities, and a Kubernetes deployment. The extended dashboard below assumes helper functions such as batch_prediction_interface, analytics_dashboard, and model_information_panel are defined elsewhere in streamlit_app.py.
def main():
"""Main dashboard application"""
# ๐ท๏ธ Header Section
st.markdown('<h1 class="main-header">๐ฏ Customer Churn Prediction Dashboard</h1>',
unsafe_allow_html=True)
# ๐ Sidebar Configuration
with st.sidebar:
st.header("๐ง Configuration")
# API endpoint configuration
api_base_url = st.text_input(
"๐ API Base URL",
value="http://localhost:8000",
help="FastAPI service endpoint"
)
# Display model information
if st.button("๐ Check Model Status"):
check_model_status(api_base_url)
# ๐ Main content tabs
tab1, tab2, tab3, tab4 = st.tabs([
"๐ฏ Single Prediction",
"๐ Batch Analysis",
"๐ Analytics Dashboard",
"๐ง Model Information"
])
with tab1:
single_prediction_interface(api_base_url)
with tab2:
batch_prediction_interface(api_base_url)
with tab3:
analytics_dashboard()
with tab4:
model_information_panel(api_base_url)
def single_prediction_interface(api_base_url: str):
"""Interface for single customer churn prediction"""
st.header("๐ฏ Individual Customer Analysis")
# ๐ Input form in columns
col1, col2, col3 = st.columns(3)
with col1:
st.subheader("๐ค Demographics")
gender = st.selectbox("Gender", ["Male", "Female"])
senior_citizen = st.selectbox("Senior Citizen", [0, 1],
format_func=lambda x: "Yes" if x else "No")
partner = st.selectbox("Has Partner", ["Yes", "No"])
dependents = st.selectbox("Has Dependents", ["Yes", "No"])
with col2:
st.subheader("๐ Account Information")
tenure = st.slider("Tenure (months)", 0, 100, 12)
contract = st.selectbox("Contract Type",
["Month-to-month", "One year", "Two year"])
paperless_billing = st.selectbox("Paperless Billing", ["Yes", "No"])
payment_method = st.selectbox("Payment Method",
["Electronic check", "Mailed check",
"Bank transfer (automatic)",
"Credit card (automatic)"])
with col3:
st.subheader("๐ฐ Charges")
monthly_charges = st.number_input("Monthly Charges ($)",
min_value=0.0, value=70.0, step=0.01)
total_charges = st.number_input("Total Charges ($)",
min_value=0.0, value=840.0, step=0.01)
# ๐ Services Section
st.subheader("๐ Services")
service_col1, service_col2 = st.columns(2)
with service_col1:
phone_service = st.selectbox("Phone Service", ["Yes", "No"])
multiple_lines = st.selectbox("Multiple Lines",
["No", "Yes", "No phone service"])
internet_service = st.selectbox("Internet Service",
["DSL", "Fiber optic", "No"])
online_security = st.selectbox("Online Security",
["No", "Yes", "No internet service"])
with service_col2:
online_backup = st.selectbox("Online Backup",
["No", "Yes", "No internet service"])
device_protection = st.selectbox("Device Protection",
["No", "Yes", "No internet service"])
tech_support = st.selectbox("Tech Support",
["No", "Yes", "No internet service"])
streaming_tv = st.selectbox("Streaming TV",
["No", "Yes", "No internet service"])
streaming_movies = st.selectbox("Streaming Movies",
["No", "Yes", "No internet service"])
# ๐ฏ Prediction Button
if st.button("๐ฎ Predict Churn", type="primary"):
# ๐ฆ Prepare data payload
customer_data = {
"gender": gender,
"SeniorCitizen": senior_citizen,
"Partner": partner,
"Dependents": dependents,
"tenure": tenure,
"Contract": contract,
"PaperlessBilling": paperless_billing,
"PaymentMethod": payment_method,
"PhoneService": phone_service,
"MultipleLines": multiple_lines,
"InternetService": internet_service,
"OnlineSecurity": online_security,
"OnlineBackup": online_backup,
"DeviceProtection": device_protection,
"TechSupport": tech_support,
"StreamingTV": streaming_tv,
"StreamingMovies": streaming_movies,
"MonthlyCharges": monthly_charges,
"TotalCharges": str(total_charges)
}
# ๐ Make API call
with st.spinner("๐ Analyzing customer data..."):
try:
response = requests.post(
f"{api_base_url}/predict",
json=customer_data,
headers={"Content-Type": "application/json"}
)
if response.status_code == 200:
result = response.json()
display_prediction_results(result)
else:
st.error(f"โ API Error: {response.status_code} - {response.text}")
except requests.exceptions.RequestException as e:
st.error(f"๐ Connection Error: {str(e)}")
def display_prediction_results(result: dict):
"""Display prediction results with visualizations"""
# ๐ Main metrics
col1, col2, col3 = st.columns(3)
with col1:
churn_prob = result['churn_probability']
st.metric(
label="๐ฏ Churn Probability",
value=f"{churn_prob:.1%}",
delta=f"Confidence: {result.get('confidence', 0.8):.1%}"
)
with col2:
risk_level = result['risk_level']
risk_color = {"High Risk ๐ด": "๐ด", "Medium Risk ๐ก": "๐ก", "Low Risk ๐ข": "๐ข"}
st.metric(
label="โ ๏ธ Risk Level",
value=risk_level
)
with col3:
model_version = result.get('model_version', 'Unknown')
st.metric(
label="๐ค Model Version",
value=model_version
)
# ๐ Probability Gauge Chart
fig = go.Figure(go.Indicator(
mode = "gauge+number+delta",
value = churn_prob * 100,
domain = {'x': [0, 1], 'y': [0, 1]},
title = {'text': "Churn Probability (%)"},
delta = {'reference': 50},
gauge = {
'axis': {'range': [None, 100]},
'bar': {'color': "darkblue"},
'steps': [
{'range': [0, 40], 'color': "lightgreen"},
{'range': [40, 70], 'color': "yellow"},
{'range': [70, 100], 'color': "red"}],
'threshold': {
'line': {'color': "red", 'width': 4},
'thickness': 0.75,
'value': 70
}
}
))
fig.update_layout(height=400)
st.plotly_chart(fig, use_container_width=True)
# ๐ก Recommendations
st.subheader("๐ก Retention Recommendations")
recommendations = result.get('recommendations', [])
for i, rec in enumerate(recommendations, 1):
st.write(f"{i}. {rec}")
# ๐ Risk Breakdown
st.subheader("๐ Risk Analysis")
risk_factors = analyze_risk_factors(result, churn_prob)
# Create risk factor chart
factors_df = pd.DataFrame(risk_factors)
fig_factors = px.bar(
factors_df,
x='impact',
y='factor',
orientation='h',
title="Key Risk Factors",
color='impact',
color_continuous_scale='RdYlGn_r'
)
fig_factors.update_layout(height=300)
st.plotly_chart(fig_factors, use_container_width=True)
def analyze_risk_factors(result: dict, churn_prob: float) -> list:
"""Analyze key risk factors contributing to churn probability"""
factors = []
# High churn probability factors
if churn_prob > 0.7:
factors.extend([
{"factor": "High Churn Probability", "impact": 0.9},
{"factor": "Immediate Attention Required", "impact": 0.85},
{"factor": "Revenue at Risk", "impact": 0.8}
])
elif churn_prob > 0.4:
factors.extend([
{"factor": "Moderate Risk", "impact": 0.6},
{"factor": "Monitoring Recommended", "impact": 0.5},
{"factor": "Engagement Opportunity", "impact": 0.4}
])
else:
factors.extend([
{"factor": "Low Risk Customer", "impact": 0.2},
{"factor": "Stable Relationship", "impact": 0.1},
{"factor": "Upsell Potential", "impact": 0.3}
])
return factors
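The tabs above also call helpers like check_model_status that aren't shown here. As an illustration only, a minimal check_model_status could query the /health endpoint described earlier:

```python
# streamlit_app.py - hypothetical sidebar helper (requests and streamlit are
# already imported at the top of the file)
import requests
import streamlit as st

def check_model_status(api_base_url: str):
    """Query the API's health endpoint and show model status in the sidebar."""
    try:
        resp = requests.get(f"{api_base_url}/health", timeout=5)
        if resp.status_code == 200:
            info = resp.json()
            st.success(f"API status: {info.get('status', 'unknown')}")
            st.caption(f"Model loaded: {info.get('model_loaded', 'unknown')}")
        else:
            st.warning(f"Health check returned HTTP {resp.status_code}")
    except requests.exceptions.RequestException as exc:
        st.error(f"Could not reach API: {exc}")
```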
## Docker Configuration
The Web App service is containerized for consistent deployment across environments:
```dockerfile
# services/web-app/Dockerfile
FROM python:3.9-slim
# Metadata
LABEL maintainer="mlops-team@company.com"
LABEL version="2.0.0"
LABEL description="Customer Churn Prediction Web Application"

# System dependencies (curl is needed for the health check)
RUN apt-get update && apt-get install -y \
curl \
gcc \
g++ \
&& rm -rf /var/lib/apt/lists/*
# Working directory
WORKDIR /app

# Copy requirements first for better layer caching
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir --upgrade pip && \
    pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create a non-root user for security
RUN useradd --create-home --shell /bin/bash app && \
chown -R app:app /app
USER app
# Expose FastAPI and Streamlit ports
EXPOSE 8000 8501

# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Startup script
COPY --chown=app:app startup.sh /app/
RUN chmod +x /app/startup.sh
CMD ["/app/startup.sh"]
```

```bash
#!/bin/bash
# services/web-app/startup.sh - multi-service startup script
echo "Starting Customer Churn Prediction Web Services..."

# Check if models exist
if [ ! -f "/app/models/best_model.pkl" ]; then
    echo "Models not found, downloading from MLflow..."
python /app/utils/download_models.py
fi
# Start FastAPI in the background
echo "Starting FastAPI server..."
uvicorn fastapi_app:app \
--host 0.0.0.0 \
--port 8000 \
--workers 4 \
--log-level info &
# Wait for FastAPI to be ready
echo "Waiting for FastAPI to be ready..."
while ! curl -s http://localhost:8000/health > /dev/null; do
    sleep 2
done
echo "FastAPI is ready!"

# Start Streamlit in the foreground (this keeps the container alive)
echo "Starting Streamlit dashboard..."
streamlit run streamlit_app.py \
--server.address 0.0.0.0 \
--server.port 8501 \
--server.headless true \
--server.fileWatcherType none \
--browser.gatherUsageStats false
```

## Environment Configuration
```yaml
# docker-compose.yml (web-app section)
version: '3.8'
services:
web-app:
build:
context: ./services/web-app
dockerfile: Dockerfile
container_name: churn-web-app
ports:
- "8000:8000" # FastAPI
- "8501:8501" # Streamlit
environment:
- MLFLOW_TRACKING_URI=http://mlflow:5000
- DATABASE_URL=postgresql://user:pass@postgres:5432/churn_db
- REDIS_URL=redis://redis:6379
- LOG_LEVEL=INFO
- API_WORKERS=4
volumes:
- ./models:/app/models:ro
- ./artifacts:/app/artifacts:ro
- ./logs:/app/logs
depends_on:
- mlflow
- postgres
- redis
networks:
- churn-network
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s
# ๐ง Supporting services
postgres:
image: postgres:13
environment:
POSTGRES_DB: churn_db
POSTGRES_USER: churn_user
POSTGRES_PASSWORD: secure_password
volumes:
- postgres_data:/var/lib/postgresql/data
networks:
- churn-network
redis:
image: redis:alpine
command: redis-server --appendonly yes
volumes:
- redis_data:/data
networks:
- churn-network
volumes:
postgres_data:
redis_data:
networks:
churn-network:
driver: bridge
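```

The Postgres password above is hardcoded purely for illustration. In practice it is better to pull credentials from the environment or Docker secrets; for example, with Compose variable substitution (values come from the shell or an untracked .env file next to docker-compose.yml):

```yaml
# docker-compose.yml - illustrative: read credentials from the environment
services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_DB: churn_db
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
```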
## Monitoring & Observability
### Application Metrics
```python
# utils/monitoring.py
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
import functools
import logging
# Metrics definitions
prediction_counter = Counter(
'churn_predictions_total',
'Total number of churn predictions made',
['risk_level', 'model_version']
)
prediction_duration = Histogram(
'churn_prediction_duration_seconds',
'Time spent on churn predictions',
buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)
model_accuracy_gauge = Gauge(
'churn_model_accuracy',
'Current model accuracy score'
)
active_users_gauge = Gauge(
'active_dashboard_users',
'Number of active dashboard users'
)
def track_prediction_metrics(func):
"""Decorator to track prediction metrics"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
# Record successful prediction
duration = time.time() - start_time
prediction_duration.observe(duration)
# Extract risk level and model version
risk_level = result.get('risk_level', 'unknown')
model_version = result.get('model_version', 'unknown')
prediction_counter.labels(
risk_level=risk_level,
model_version=model_version
).inc()
logging.info(f"Prediction completed: {duration:.3f}s | Risk: {risk_level}")
return result
except Exception as e:
# Record failed prediction
prediction_counter.labels(
risk_level='error',
model_version='unknown'
).inc()
logging.error(f"Prediction failed: {str(e)}")
raise
return wrapper
# Start the Prometheus metrics server
def start_metrics_server(port: int = 8080):
"""Start Prometheus metrics server"""
start_http_server(port)
logging.info(f"๐ Metrics server started on port {port}")
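```

A sketch of how these utilities could be wired into the FastAPI app; the names make_prediction and the payload shape are placeholders, not the project's real handler:

```python
# fastapi_app.py - illustrative wiring of the monitoring helpers
from fastapi import FastAPI
from utils.monitoring import track_prediction_metrics, start_metrics_server

app = FastAPI()

@app.on_event("startup")
async def startup_event():
    # Expose Prometheus metrics on a separate port
    start_metrics_server(port=8080)

@track_prediction_metrics
def make_prediction(payload: dict) -> dict:
    # Placeholder for the real model call
    return {"churn_probability": 0.42, "risk_level": "Medium Risk", "model_version": "2.0.0"}

@app.post("/predict")
async def predict(payload: dict):
    return make_prediction(payload)
```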
### Health Checks & Status Endpoints
```python
@app.get("/health")
async def health_check():
"""
๐ฅ Comprehensive health check endpoint
Checks the status of all critical components:
- Model availability
- Database connectivity
- External service dependencies
"""
health_status = {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"version": settings.APP_VERSION,
"checks": {}
}
    # Check model availability
    try:
        if model_manager.model is not None:
            health_status["checks"]["model"] = "Available"
        else:
            health_status["checks"]["model"] = "Not loaded"
            health_status["status"] = "degraded"
    except Exception as e:
        health_status["checks"]["model"] = f"Error: {str(e)}"
        health_status["status"] = "unhealthy"

    # Check MLflow connectivity
    try:
        mlflow.get_tracking_uri()
        health_status["checks"]["mlflow"] = "Connected"
    except Exception as e:
        health_status["checks"]["mlflow"] = f"Warning: {str(e)}"
        if health_status["status"] == "healthy":
            health_status["status"] = "degraded"

    # Check database connectivity (if configured)
    if settings.DATABASE_URL:
        try:
            # Database connection check logic here
            health_status["checks"]["database"] = "Connected"
        except Exception as e:
            health_status["checks"]["database"] = f"Error: {str(e)}"
            health_status["status"] = "unhealthy"

    return health_status
@app.get("/metrics")
async def get_metrics():
"""๐ Application metrics endpoint"""
return {
"predictions_total": prediction_counter._value.sum(),
"average_response_time": prediction_duration._sum.sum() / max(prediction_counter._value.sum(), 1),
"model_version": model_manager.model_version,
"uptime_seconds": time.time() - app_start_time
}
@app.get("/status")
async def get_status():
"""๐ Detailed application status"""
return {
"service": "Churn Prediction Web App",
"version": settings.APP_VERSION,
"model_info": {
"version": model_manager.model_version,
"loaded_at": model_manager.load_timestamp,
"model_type": type(model_manager.model).__name__
},
"configuration": {
"debug_mode": settings.DEBUG,
"workers": settings.API_WORKERS,
"mlflow_uri": settings.MLFLOW_TRACKING_URI
},
"system_info": {
"python_version": sys.version,
"platform": platform.platform(),
"cpu_count": os.cpu_count(),
"memory_usage": psutil.virtual_memory().percent
}
}
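```

If Prometheus scrapes the separate metrics server started on port 8080 (see start_metrics_server above), the scrape job might look like this; the job name and target are illustrative:

```yaml
# prometheus.yml - illustrative scrape config for the metrics server on :8080
scrape_configs:
  - job_name: "churn-web-app"
    scrape_interval: 15s
    static_configs:
      - targets: ["web-app:8080"]
```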
## Deployment & Scaling
### Kubernetes Deployment
```yaml
# k8s/web-app-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: churn-web-app
labels:
app: churn-web-app
tier: frontend
spec:
replicas: 3
selector:
matchLabels:
app: churn-web-app
template:
metadata:
labels:
app: churn-web-app
spec:
containers:
- name: web-app
image: churn-prediction/web-app:latest
ports:
- containerPort: 8000
name: fastapi
- containerPort: 8501
name: streamlit
env:
- name: MLFLOW_TRACKING_URI
value: "http://mlflow-service:5000"
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: app-secrets
key: database-url
resources:
requests:
memory: "512Mi"
cpu: "250m"
limits:
memory: "1Gi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: churn-web-app-service
spec:
selector:
app: churn-web-app
ports:
    - name: fastapi
      port: 8000
      targetPort: 8000
    - name: streamlit
      port: 8501
      targetPort: 8501
  type: LoadBalancer
```
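To scale beyond the fixed three replicas, a HorizontalPodAutoscaler can be layered on top of the deployment. This sketch relies on the CPU requests defined above; the thresholds and replica bounds are illustrative:

```yaml
# k8s/web-app-hpa.yaml - illustrative autoscaling for the deployment above
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: churn-web-app-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: churn-web-app
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
```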
#mlopszoomcamp