# Comprehensive Monitoring & Observability #llmszoomcamp

This document provides detailed coverage of the monitoring, metrics, logging, and observability systems that ensure reliable operation of the medical RAG assistant.

## 1. Multi-Layer Observability Architecture

### System Overview

```
[User Interactions] → [API Metrics] → [RAG Performance] → [Infrastructure Health]
       ↓                   ↓                ↓                      ↓
[PostgreSQL] ←→ [Grafana Dashboards] ←→ [S3 Audit Logs] ←→ [Health Checks]
```

### Observability Pillars

- Metrics: Quantitative performance and business metrics
- Logs: Detailed audit trails and debugging information
- Traces: End-to-end request flow tracking
- Health: System component availability and reliability

## 2. PostgreSQL Persistence Layer

### Comprehensive Conversation Storage

```python
# src/database/db.py
def save_conversation(conversation_id, question, answer_data, timestamp=None):
    """Store complete conversation with comprehensive metadata"""
    if timestamp is None:
        timestamp = datetime.now(tz)

    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO conversations
                (id, question, answer, model_used, response_time, relevance,
                relevance_explanation, prompt_tokens, completion_tokens, total_tokens,
                eval_prompt_tokens, eval_completion_tokens, eval_total_tokens, 
                openai_cost, timestamp)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """,
                (
                    conversation_id,
                    question,
                    answer_data["answer"],
                    answer_data["model_used"],
                    answer_data["response_time"],
                    answer_data["relevance"],
                    answer_data["relevance_explanation"],
                    answer_data["token_stats"]["rag_tokens"]["prompt_tokens"],
                    answer_data["token_stats"]["rag_tokens"]["completion_tokens"],
                    answer_data["token_stats"]["rag_tokens"]["total_tokens"],
                    answer_data["token_stats"]["evaluation_tokens"]["prompt_tokens"],
                    answer_data["token_stats"]["evaluation_tokens"]["completion_tokens"],
                    answer_data["token_stats"]["evaluation_tokens"]["total_tokens"],
                    answer_data["total_cost"],
                    timestamp,
                ),
            )
        conn.commit()
    finally:
        conn.close()
```
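
The INSERT above assumes the `conversations` and `feedback` tables already exist. For reference, a minimal schema sketch inferred from the columns used in this post (the exact types and constraints are assumptions, not the project's actual DDL):

```python
# src/database/db.py (sketch) -- schema inferred from the INSERT statements above
def init_db():
    """Create the tables that the queries in this post rely on."""
    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(
                """
                CREATE TABLE IF NOT EXISTS conversations (
                    id TEXT PRIMARY KEY,
                    question TEXT NOT NULL,
                    answer TEXT NOT NULL,
                    model_used TEXT,
                    response_time DOUBLE PRECISION,
                    relevance TEXT,
                    relevance_explanation TEXT,
                    prompt_tokens INTEGER,
                    completion_tokens INTEGER,
                    total_tokens INTEGER,
                    eval_prompt_tokens INTEGER,
                    eval_completion_tokens INTEGER,
                    eval_total_tokens INTEGER,
                    openai_cost NUMERIC(10, 6),
                    timestamp TIMESTAMPTZ NOT NULL
                );
                CREATE TABLE IF NOT EXISTS feedback (
                    id SERIAL PRIMARY KEY,
                    conversation_id TEXT REFERENCES conversations(id),
                    feedback INTEGER NOT NULL,  -- +1 thumbs up, -1 thumbs down
                    timestamp TIMESTAMPTZ NOT NULL
                );
                """
            )
        conn.commit()
    finally:
        conn.close()
```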

### User Feedback Analytics

```python
# src/database/db.py
def save_feedback(conversation_id, feedback, timestamp=None):
    """Store user feedback with timestamp and conversation linkage"""
    if timestamp is None:
        timestamp = datetime.now(tz)

    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO feedback (conversation_id, feedback, timestamp) 
                VALUES (%s, %s, COALESCE(%s, CURRENT_TIMESTAMP))
                """,
                (conversation_id, feedback, timestamp),
            )
        conn.commit()
    finally:
        conn.close()

def get_feedback_stats():
    """Generate comprehensive feedback analytics"""
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=DictCursor) as cur:
            cur.execute(
                """
                SELECT
                    COUNT(*) as total_feedback,
                    SUM(CASE WHEN feedback > 0 THEN 1 ELSE 0 END) as thumbs_up,
                    SUM(CASE WHEN feedback < 0 THEN 1 ELSE 0 END) as thumbs_down,
                    ROUND(AVG(CASE WHEN feedback != 0 THEN feedback END), 3) as avg_rating,
                    COUNT(DISTINCT conversation_id) as unique_conversations
                FROM feedback
                WHERE timestamp >= NOW() - INTERVAL '24 hours'
                """
            )
            return cur.fetchone()
    finally:
        conn.close()
```

### Advanced Analytics Queries

```python
# src/database/db.py
def get_performance_metrics(hours=24):
    """Generate comprehensive performance analytics"""
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=DictCursor) as cur:
            cur.execute(
                """
                SELECT 
                    COUNT(*) as query_count,
                    AVG(response_time) as avg_response_time,
                    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY response_time) as p95_response_time,
                    AVG(openai_cost) as avg_cost,
                    SUM(openai_cost) as total_cost,
                    AVG(total_tokens) as avg_tokens,
                    COUNT(CASE WHEN relevance = 'RELEVANT' THEN 1 END) as relevant_answers,
                    COUNT(CASE WHEN relevance = 'PARTLY_RELEVANT' THEN 1 END) as partly_relevant_answers,
                    COUNT(CASE WHEN relevance = 'NON_RELEVANT' THEN 1 END) as non_relevant_answers
                FROM conversations
                -- make_interval keeps the hours value parameterized instead of
                -- splicing it into a string literal
                WHERE timestamp >= NOW() - make_interval(hours => %s)
                """,
                (hours,),
            )
            return cur.fetchone()
    finally:
        conn.close()
```

## 3. Advanced Metrics API

### Comprehensive Metrics Endpoint

```python
# src/api/main_api.py
@app.get("/metrics")
async def get_metrics():
    """Comprehensive system metrics for monitoring dashboards"""
    try:
        # Performance metrics
        perf_metrics = get_performance_metrics(24)  # Last 24 hours

        # Feedback analytics
        feedback_stats = get_feedback_stats()

        # System health
        health_status = {
            "qdrant_status": check_qdrant_connection(),
            "postgres_status": check_postgres_connection(),
            "s3_status": check_s3_availability(),
        }

        return {
            "timestamp": datetime.utcnow().isoformat(),
            "performance": {
                "total_queries": perf_metrics.get("query_count", 0),
                "avg_response_time": float(perf_metrics.get("avg_response_time", 0)),
                "p95_response_time": float(perf_metrics.get("p95_response_time", 0)),
                "success_rate": calculate_success_rate(perf_metrics),
                "avg_cost_per_query": float(perf_metrics.get("avg_cost", 0)),
                "total_cost_24h": float(perf_metrics.get("total_cost", 0)),
            },
            "quality": {
                "relevance_rate": calculate_relevance_rate(perf_metrics),
                "avg_tokens_per_response": int(perf_metrics.get("avg_tokens", 0)),
            },
            "user_satisfaction": {
                "total_feedback": feedback_stats.get("total_feedback", 0),
                "positive_ratio": calculate_positive_ratio(feedback_stats),
                "avg_rating": float(feedback_stats.get("avg_rating", 0)),
            },
            "system_health": health_status
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Metrics unavailable: {str(e)}")

def calculate_success_rate(metrics):
    """Calculate system success rate based on error logs"""
    total = metrics.get("query_count", 0)
    if total == 0:
        return 1.0
    relevant = metrics.get("relevant_answers", 0)
    partly_relevant = metrics.get("partly_relevant_answers", 0)
    return (relevant + partly_relevant) / total

def calculate_relevance_rate(metrics):
    """Calculate answer relevance rate"""
    total = metrics.get("query_count", 0)
    if total == 0:
        return 1.0
    relevant = metrics.get("relevant_answers", 0)
    return relevant / total
```
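
`calculate_positive_ratio` is referenced in the endpoint above but not shown. A minimal sketch consistent with `get_feedback_stats` (the empty-window guard returning 0.0 is an assumption):

```python
def calculate_positive_ratio(feedback_stats):
    """Share of feedback events that were thumbs up (sketch)."""
    total = feedback_stats.get("total_feedback") or 0
    if total == 0:
        return 0.0
    return (feedback_stats.get("thumbs_up") or 0) / total
```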

## 4. Enterprise S3 Logging Architecture

### Multi-Purpose S3 Service

```python
# src/services/s3_service.py
class S3Service:
    """Enterprise-grade S3 service for comprehensive logging and audit trails"""

    def __init__(self):
        self.bucket_name = os.getenv("S3_BUCKET_NAME", "medical-rag-logs")
        self.aws_region = os.getenv("AWS_REGION", "us-east-1")
        self._initialize_client()

    def upload_logs(self, log_data: List[Dict[str, Any]], log_type: str) -> bool:
        """Upload structured logs with compression and metadata"""
        timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
        key = f"logs/{log_type}/{timestamp}.json"

        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "log_type": log_type,
            "count": len(log_data),
            "metadata": {
                "service_version": "1.0.0",
                "environment": os.getenv("ENVIRONMENT", "production"),
                "region": self.aws_region,
            },
            "logs": log_data,
        }

        return self.upload_json(log_entry, key, compress=True)
```
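
`_initialize_client` and `upload_json` are used throughout this section but not shown. A minimal boto3-based sketch of what they might look like (the gzip handling and error policy are assumptions):

```python
# src/services/s3_service.py (sketch) -- assumed shape of the helpers used above
import gzip
import json

import boto3
from botocore.exceptions import BotoCoreError, ClientError

class S3Service:
    def _initialize_client(self):
        # Credentials come from the environment (AWS_ACCESS_KEY_ID, etc.)
        self.client = boto3.client("s3", region_name=self.aws_region)

    def upload_json(self, data: dict, key: str, compress: bool = False) -> bool:
        """Serialize to JSON (optionally gzipped) and upload; return success."""
        try:
            body = json.dumps(data, default=str).encode("utf-8")
            extra = {"ContentType": "application/json"}
            if compress:
                body = gzip.compress(body)
                extra["ContentEncoding"] = "gzip"
            self.client.put_object(
                Bucket=self.bucket_name, Key=key, Body=body, **extra
            )
            return True
        except (BotoCoreError, ClientError):
            return False
```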

### Specialized Logging Functions

```python
# src/services/s3_service.py
def upload_conversation_to_s3(conversation_data: Dict[str, Any]) -> bool:
    """Upload individual conversation with medical compliance metadata"""
    conversation_id = conversation_data.get("conversation_id", "unknown")
    timestamp = datetime.utcnow().strftime("%Y/%m/%d")
    key = f"conversations/{timestamp}/{conversation_id}.json"

    # Add compliance and audit metadata
    enhanced_data = {
        **conversation_data,
        "audit_info": {
            "logged_at": datetime.utcnow().isoformat(),
            "retention_policy": "7_years",  # Medical record retention
            "compliance": ["HIPAA", "SOC2"],
            "data_classification": "medical_phi"
        }
    }

    return s3_service.upload_json(enhanced_data, key)

def upload_feedback_to_s3(feedback_data: Dict[str, Any]) -> bool:
    """Upload user feedback with analytics metadata"""
    timestamp = datetime.utcnow().strftime("%Y/%m/%d")
    feedback_id = f"{feedback_data.get('conversation_id', 'unknown')}_{int(datetime.utcnow().timestamp())}"
    key = f"feedback/{timestamp}/{feedback_id}.json"

    return s3_service.upload_json(feedback_data, key)

def upload_system_metrics_snapshot(metrics_data: Dict[str, Any]) -> bool:
    """Upload periodic system metrics for long-term trend analysis"""
    timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
    key = f"metrics/system_snapshots/{timestamp}.json"

    metrics_entry = {
        "timestamp": datetime.utcnow().isoformat(),
        "snapshot_type": "system_metrics",
        "metrics": metrics_data,
        "system_info": {
            "cpu_usage": get_cpu_usage(),
            "memory_usage": get_memory_usage(),
            "disk_usage": get_disk_usage(),
        }
    }

    return s3_service.upload_json(metrics_entry, key)
```
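
`get_cpu_usage`, `get_memory_usage`, and `get_disk_usage` aren't defined in the post; one way to implement them is with `psutil` (returning utilization percentages is an assumption):

```python
import psutil

def get_cpu_usage() -> float:
    # System-wide CPU utilization since the previous call, as a percentage
    return psutil.cpu_percent(interval=None)

def get_memory_usage() -> float:
    # Percentage of physical memory currently in use
    return psutil.virtual_memory().percent

def get_disk_usage() -> float:
    # Percentage of the root filesystem currently in use
    return psutil.disk_usage("/").percent
```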

### Daily Data Export and Archival

```python
# src/services/s3_service.py
def export_daily_medical_data(conversations: List[Dict], feedback: List[Dict]) -> bool:
    """Export daily aggregated medical data with compliance reporting"""
    date_str = datetime.utcnow().strftime("%Y%m%d")

    daily_export = {
        "export_date": date_str,
        "timestamp": datetime.utcnow().isoformat(),
        "summary": {
            "total_conversations": len(conversations),
            "total_feedback": len(feedback),
            "positive_feedback": len([f for f in feedback if f.get("feedback", 0) > 0]),
            "negative_feedback": len([f for f in feedback if f.get("feedback", 0) < 0]),
            "avg_response_time": calculate_avg_response_time(conversations),
            "total_cost": sum(c.get("cost", 0) for c in conversations),
            "unique_users": len(set(c.get("user_id") for c in conversations if c.get("user_id"))),
        },
        "quality_metrics": {
            "relevance_distribution": calculate_relevance_distribution(conversations),
            "model_usage": calculate_model_usage(conversations),
            "department_queries": calculate_department_distribution(conversations),
        },
        "compliance": {
            "phi_handling": "compliant",
            "retention_status": "active",
            "audit_trail": "complete"
        },
        "data": {
            "conversations": conversations,
            "feedback": feedback,
        }
    }

    key = f"daily_exports/{date_str}_medical_data.json"
    return s3_service.upload_json(daily_export, key, compress=True)
```
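
The summary helpers (`calculate_avg_response_time`, `calculate_relevance_distribution`, and friends) follow the same aggregate-over-a-list pattern; two of them as sketches (field names are assumptions):

```python
from collections import Counter

def calculate_relevance_distribution(conversations):
    """Count conversations per relevance label, e.g. {'RELEVANT': 42, ...}."""
    return dict(Counter(c.get("relevance", "UNKNOWN") for c in conversations))

def calculate_avg_response_time(conversations):
    """Mean response time across the day's conversations (0.0 if empty)."""
    if not conversations:
        return 0.0
    return sum(c.get("response_time", 0) for c in conversations) / len(conversations)
```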

## 5. Comprehensive API Observability

### Request/Response Logging Pipeline

```python
# src/api/main_api.py
async def log_api_call_to_s3(endpoint: str, conversation_id: str, request_data: str,
                             response_data: Optional[str], start_time: datetime,
                             end_time: datetime, status: str, error: Optional[str] = None,
                             request: Optional[Request] = None):
    """Comprehensive API call logging for audit, compliance, and analytics"""
    try:
        if is_s3_available():
            log_data = {
                "endpoint": endpoint,
                "conversation_id": conversation_id,
                "request_data": request_data,
                "response_data": response_data,
                "timing": {
                    "start_time": start_time.isoformat(),
                    "end_time": end_time.isoformat(),
                    "duration_ms": (end_time - start_time).total_seconds() * 1000,
                },
                "status": status,
                "error": error,
                "metadata": {
                    "timestamp": datetime.utcnow().isoformat(),
                    "service_version": "1.0.0",
                    "user_agent": request.headers.get("user-agent"),
                    "ip_address": request.client.host if hasattr(request, 'client') else None,
                },
                "compliance": {
                    "logged_for": "audit_and_analytics",
                    "retention_period": "7_years",
                    "data_classification": "medical_operational"
                }
            }

            # Background upload to prevent API latency
            upload_logs_to_s3([log_data], f"api_{endpoint}")
    except Exception as e:
        # Log locally if S3 fails
        logger.error(f"Failed to log API call to S3: {e}")

# Enhanced endpoint with comprehensive logging
@app.post("/question")
async def handle_question_with_logging(request: QuestionRequest, 
                                     background_tasks: BackgroundTasks,
                                     http_request: Request):
    """Enhanced question endpoint with full observability"""
    start_time = datetime.utcnow()
    conversation_id = str(uuid.uuid4())

    try:
        # Core RAG processing
        answer_data = rag(request.question, model=request.model)

        # Success logging
        if os.getenv("ENABLE_S3_LOGGING", "false").lower() == "true":
            background_tasks.add_task(
                log_api_call_to_s3,
                "question", conversation_id, request.question,
                answer_data["answer"], start_time, datetime.utcnow(), "success"
            )

        return QuestionResponse(...)

    except Exception as e:
        # Error logging with full context
        if os.getenv("ENABLE_S3_LOGGING", "false").lower() == "true":
            background_tasks.add_task(
                log_api_call_to_s3,
                "question", conversation_id, request.question,
                None, start_time, datetime.utcnow(), "error", str(e),
                request=http_request,
            )

        # Enhanced error response
        raise HTTPException(
            status_code=500,
            detail={
                "message": "Medical RAG processing failed",
                "conversation_id": conversation_id,
                "error_type": type(e).__name__,
                "timestamp": datetime.utcnow().isoformat()
            }
        )
```
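
With `ENABLE_S3_LOGGING=true`, each call to the endpoint produces an audit record in the background. A quick smoke test against a local instance (the URL and payload fields are assumptions based on the request model above):

```python
import requests

resp = requests.post(
    "http://localhost:8000/question",
    json={"question": "What are the common symptoms of hypertension?"},
    timeout=60,
)
resp.raise_for_status()
print(resp.json())

# The aggregated view of these calls is served by the metrics endpoint:
print(requests.get("http://localhost:8000/metrics", timeout=10).json())
```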

## 6. Docker Compose Orchestration & Service Monitoring

### Complete Service Stack

```yaml
# config/docker-compose.yaml
services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_DB: ${POSTGRES_DB}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    ports:
      - "5433:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER}"]
      interval: 30s
      timeout: 10s
      retries: 3

  qdrant:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333"    # HTTP API
      - "6334:6334"    # gRPC API
    volumes:
      - qdrant_data:/qdrant/storage
    environment:
      - QDRANT__SERVICE__HTTP_PORT=6333
      - QDRANT__SERVICE__GRPC_PORT=6334
    healthcheck:
      # Qdrant's HTTP health endpoint is /healthz; note this check also
      # assumes curl is available inside the qdrant image
      test: ["CMD", "curl", "-f", "http://localhost:6333/healthz"]
      interval: 30s
      timeout: 10s
      retries: 3

  app:
    build:
      context: ..
      dockerfile: Dockerfile
    environment:
      # Core application settings
      OPENAI_API_KEY: ${OPENAI_API_KEY}
      APP_TYPE: ${APP_TYPE:-fastapi}

      # Database connections
      QDRANT_HOST: "qdrant"
      QDRANT_PORT: "6333"
      POSTGRES_HOST: "postgres"
      POSTGRES_DB: ${POSTGRES_DB}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}

      # Monitoring and logging
      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
      S3_BUCKET_NAME: ${S3_BUCKET_NAME}
      ENABLE_S3_LOGGING: ${ENABLE_S3_LOGGING:-true}

    ports:
      - "8000:8000"   # FastAPI
      - "8501:8501"   # Streamlit
    depends_on:
      postgres:
        condition: service_healthy
      qdrant:
        condition: service_healthy
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 15s
      retries: 3

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    volumes:
      - grafana_data:/var/lib/grafana
      # note: Grafana only auto-loads dashboards from disk when a dashboard
      # provisioning file points at this directory
      - ./grafana_dashboard.json:/var/lib/grafana/dashboards/medical-rag.json
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_ADMIN_PASSWORD:-admin}
      - GF_SECURITY_ADMIN_USER=${GRAFANA_ADMIN_USER:-admin}
      - GF_AUTH_ANONYMOUS_ENABLED=false
      - GF_SECURITY_ALLOW_EMBEDDING=true
    depends_on:
      - postgres

volumes:
  postgres_data:
  grafana_data:
  qdrant_data:
```
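
`depends_on` with `condition: service_healthy` gates container startup order, but a deploy script may still want to block until the app itself reports healthy. A small polling sketch (the endpoint matches the app healthcheck above; the timeout values are arbitrary):

```python
import time

import requests

def wait_for_health(url="http://localhost:8000/health", timeout=120, interval=5):
    """Poll the app's health endpoint until it responds OK or the deadline passes."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            if requests.get(url, timeout=5).status_code == 200:
                return True
        except requests.RequestException:
            pass  # app not up yet; keep polling
        time.sleep(interval)
    return False

if __name__ == "__main__":
    raise SystemExit(0 if wait_for_health() else 1)
```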

## 7. Comprehensive Monitoring Strategy

### Key Performance Indicators (KPIs)

| Category | Metric | Target | Monitoring Method |
| --- | --- | --- | --- |
| Retrieval Quality | Hit Rate | >88% | PostgreSQL analytics |
| Retrieval Quality | Mean Reciprocal Rank | >0.94 | Ground truth evaluation |
| Performance | P95 Response Time | <30s | API timing logs |
| Performance | API Error Rate | <1% | Error tracking |
| Cost Efficiency | Cost per Query | <$0.005 | Token usage monitoring |
| User Satisfaction | Positive Feedback Ratio | >75% | Feedback analytics |
| System Health | Service Uptime | >99.5% | Health check endpoints |
| Data Quality | Relevance Rate | >90% | LLM evaluation |
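
Several of these targets can be checked automatically against the `/metrics` endpoint from section 3. A minimal sketch (the thresholds mirror the table; the base URL is an assumption):

```python
import requests

# (section, field, predicate) per KPI, matching the /metrics response shape
KPI_TARGETS = {
    "p95_response_time": ("performance", "p95_response_time", lambda v: v < 30),
    "cost_per_query": ("performance", "avg_cost_per_query", lambda v: v < 0.005),
    "relevance_rate": ("quality", "relevance_rate", lambda v: v > 0.90),
    "positive_feedback": ("user_satisfaction", "positive_ratio", lambda v: v > 0.75),
}

def check_kpis(base_url="http://localhost:8000"):
    metrics = requests.get(f"{base_url}/metrics", timeout=10).json()
    return {
        name: {"value": metrics[section][field],
               "within_target": ok(metrics[section][field])}
        for name, (section, field, ok) in KPI_TARGETS.items()
    }
```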

### Advanced Monitoring Dashboards

```python
# Grafana dashboard configuration for medical RAG metrics
GRAFANA_DASHBOARD_PANELS = {
    "system_overview": {
        "queries_per_hour": "SELECT COUNT(*) FROM conversations WHERE timestamp >= NOW() - INTERVAL '1 hour'",
        "avg_response_time": "SELECT AVG(response_time) FROM conversations WHERE timestamp >= NOW() - INTERVAL '24 hours'",
        "success_rate": "SELECT (COUNT(CASE WHEN relevance IN ('RELEVANT', 'PARTLY_RELEVANT') THEN 1 END) * 100.0 / COUNT(*)) FROM conversations WHERE timestamp >= NOW() - INTERVAL '24 hours'"
    },
    "cost_analysis": {
        "daily_cost": "SELECT DATE(timestamp), SUM(openai_cost) FROM conversations WHERE timestamp >= NOW() - INTERVAL '7 days' GROUP BY DATE(timestamp)",
        "cost_per_model": "SELECT model_used, AVG(openai_cost) FROM conversations WHERE timestamp >= NOW() - INTERVAL '24 hours' GROUP BY model_used"
    },
    "quality_metrics": {
        "relevance_distribution": "SELECT relevance, COUNT(*) FROM conversations WHERE timestamp >= NOW() - INTERVAL '24 hours' GROUP BY relevance",
        "feedback_trends": "SELECT DATE(timestamp), AVG(CASE WHEN feedback > 0 THEN 1 WHEN feedback < 0 THEN 0 ELSE 0.5 END) FROM feedback WHERE timestamp >= NOW() - INTERVAL '7 days' GROUP BY DATE(timestamp)"
    }
}
```

### Alert Configuration

```python
# Automated alerting thresholds
ALERT_THRESHOLDS = {
    "high_error_rate": {
        "condition": "error_rate > 5%",
        "window": "15 minutes",
        "action": "send_email_alert"
    },
    "slow_response_time": {
        "condition": "p95_response_time > 45 seconds",
        "window": "10 minutes", 
        "action": "send_slack_alert"
    },
    "cost_anomaly": {
        "condition": "hourly_cost > 3 * avg_hourly_cost",
        "window": "1 hour",
        "action": "send_urgent_alert"
    },
    "low_relevance_rate": {
        "condition": "relevance_rate < 85%",
        "window": "2 hours",
        "action": "trigger_model_review"
    }
}
```
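
The thresholds are declarative, so something has to evaluate them on a schedule. A minimal evaluation-loop sketch (the metric keys are assumptions; in production this job is usually delegated to Grafana alerting or Prometheus Alertmanager):

```python
def evaluate_alerts(current_metrics: dict, thresholds: dict = ALERT_THRESHOLDS):
    """Return the alert actions whose conditions currently hold."""
    # Predicates mirror the human-readable conditions in ALERT_THRESHOLDS
    checks = {
        "high_error_rate": lambda m: m.get("error_rate", 0) > 0.05,
        "slow_response_time": lambda m: m.get("p95_response_time", 0) > 45,
        "cost_anomaly": lambda m: m.get("hourly_cost", 0)
                        > 3 * m.get("avg_hourly_cost", float("inf")),
        "low_relevance_rate": lambda m: m.get("relevance_rate", 1.0) < 0.85,
    }
    return [
        {"alert": name, "action": thresholds[name]["action"]}
        for name, predicate in checks.items()
        if predicate(current_metrics)
    ]
```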

## 8. Production Monitoring Best Practices

### Health Check Implementation

```python
# Comprehensive health checking
async def check_system_health():
    """Multi-component health validation"""
    health_status = {
        "overall": "healthy",
        "timestamp": datetime.utcnow().isoformat(),
        "components": {}
    }

    try:
        # PostgreSQL health
        health_status["components"]["postgresql"] = await check_postgres_health()

        # Qdrant health  
        health_status["components"]["qdrant"] = await check_qdrant_health()

        # S3 connectivity
        health_status["components"]["s3"] = await check_s3_health()

        # OpenAI API
        health_status["components"]["openai"] = await check_openai_health()

        # Overall health determination
        unhealthy_components = [k for k, v in health_status["components"].items() 
                              if v.get("status") != "healthy"]

        if unhealthy_components:
            health_status["overall"] = "degraded" if len(unhealthy_components) < 3 else "unhealthy"
            health_status["issues"] = unhealthy_components

    except Exception as e:
        health_status["overall"] = "error"
        health_status["error"] = str(e)

    return health_status
```
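
Each `check_*_health` call returns a per-component status dict. As an example, a sketch of `check_postgres_health` (the latency field and error shape are assumptions; note the psycopg2 calls are synchronous and would block the event loop in a busy service):

```python
import time

async def check_postgres_health():
    """Run a trivial query and report status plus round-trip latency."""
    started = time.perf_counter()
    try:
        conn = get_db_connection()
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT 1")
                cur.fetchone()
        finally:
            conn.close()
        return {
            "status": "healthy",
            "latency_ms": round((time.perf_counter() - started) * 1000, 1),
        }
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}
```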

## 9. Future Monitoring Enhancements

### Immediate Improvements

- Real-time Dashboards: Live system status and performance monitoring
- Predictive Alerting: Machine learning-based anomaly detection
- Cost Optimization: Automated model selection based on query complexity
- Performance Profiling: Detailed latency breakdown analysis

### Advanced Features

- Distributed Tracing: End-to-end request flow visualization
- Custom Metrics: Medical domain-specific KPI tracking
- User Journey Analytics: Multi-session user behavior analysis
- Compliance Reporting: Automated regulatory compliance reports

### Integration Opportunities

- Prometheus Integration: Export metrics to the Prometheus/Grafana ecosystem (see the sketch after this list)
- ELK Stack: Centralized log management and analysis
- APM Tools: Integration with DataDog, New Relic, or similar platforms
- Medical Compliance: HIPAA audit trail automation and reporting
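
As a starting point for the Prometheus item above, a sketch that re-exports the PostgreSQL-derived numbers as Prometheus gauges using `prometheus_client` (the port, metric names, and refresh interval are assumptions):

```python
import time

from prometheus_client import Gauge, start_http_server

AVG_RESPONSE_TIME = Gauge("rag_avg_response_time_seconds", "Average response time, last 24h")
TOTAL_COST = Gauge("rag_openai_cost_dollars_24h", "Total OpenAI spend, last 24h")
QUERY_COUNT = Gauge("rag_queries_24h", "Queries answered, last 24h")

def run_exporter(port=9100, refresh_seconds=60):
    """Expose /metrics for Prometheus scraping and refresh gauges from PostgreSQL."""
    start_http_server(port)
    while True:
        m = get_performance_metrics(24)  # from src/database/db.py
        AVG_RESPONSE_TIME.set(float(m.get("avg_response_time") or 0))
        TOTAL_COST.set(float(m.get("total_cost") or 0))
        QUERY_COUNT.set(m.get("query_count") or 0)
        time.sleep(refresh_seconds)
```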
