This document provides detailed coverage of the monitoring, metrics, logging, and observability systems that ensure reliable operation of the medical RAG assistant.
1. Multi-Layer Observability Architecture
System Overview
[User Interactions] → [API Metrics] → [RAG Performance] → [Infrastructure Health]
↓ ↓ ↓ ↓
[PostgreSQL] ←→ [Grafana Dashboards] ←→ [S3 Audit Logs] ←→ [Health Checks]
Observability Pillars
- Metrics: Quantitative performance and business metrics
- Logs: Detailed audit trails and debugging information
- Traces: End-to-end request flow tracking
- Health: System component availability and reliability
2. PostgreSQL Persistence Layer
Comprehensive Conversation Storage
# src/database/db.py
def save_conversation(conversation_id, question, answer_data, timestamp=None):
    """Persist one Q&A exchange together with its evaluation metadata.

    Args:
        conversation_id: Unique identifier stored as the row id.
        question: Raw user question text.
        answer_data: Mapping holding the answer plus model, timing,
            relevance, token-usage and cost details (see the column
            list in the INSERT below for the expected keys).
        timestamp: Optional datetime; defaults to "now" in the module
            timezone ``tz``.
    """
    if timestamp is None:
        timestamp = datetime.now(tz)

    # Pull the nested token-usage sections out once rather than
    # re-indexing the same dicts six times in the parameter tuple.
    rag_tokens = answer_data["token_stats"]["rag_tokens"]
    eval_tokens = answer_data["token_stats"]["evaluation_tokens"]

    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO conversations
                (id, question, answer, model_used, response_time, relevance,
                relevance_explanation, prompt_tokens, completion_tokens, total_tokens,
                eval_prompt_tokens, eval_completion_tokens, eval_total_tokens,
                openai_cost, timestamp)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """,
                (
                    conversation_id,
                    question,
                    answer_data["answer"],
                    answer_data["model_used"],
                    answer_data["response_time"],
                    answer_data["relevance"],
                    answer_data["relevance_explanation"],
                    rag_tokens["prompt_tokens"],
                    rag_tokens["completion_tokens"],
                    rag_tokens["total_tokens"],
                    eval_tokens["prompt_tokens"],
                    eval_tokens["completion_tokens"],
                    eval_tokens["total_tokens"],
                    answer_data["total_cost"],
                    timestamp,
                ),
            )
        conn.commit()
    finally:
        # Always release the connection, even if execute/commit raised.
        conn.close()
User Feedback Analytics
# src/database/db.py
def save_feedback(conversation_id, feedback, timestamp=None):
    """Record a user's feedback vote, linked to its conversation.

    Args:
        conversation_id: Conversation the vote refers to.
        feedback: Numeric vote (positive = thumbs up, negative = down).
        timestamp: Optional datetime; defaults to "now" in ``tz``.

    NOTE(review): because the Python default always fills ``timestamp``
    before the INSERT runs, the SQL COALESCE fallback to
    CURRENT_TIMESTAMP can never fire as called from here.
    """
    ts = datetime.now(tz) if timestamp is None else timestamp
    insert_sql = """
        INSERT INTO feedback (conversation_id, feedback, timestamp)
        VALUES (%s, %s, COALESCE(%s, CURRENT_TIMESTAMP))
        """
    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(insert_sql, (conversation_id, feedback, ts))
        conn.commit()
    finally:
        conn.close()
def get_feedback_stats():
    """Return feedback aggregates for the trailing 24 hours.

    Returns a single dict-like row with total votes, thumbs up/down
    counts, average non-zero rating and distinct conversation count.
    """
    stats_sql = """
        SELECT
        COUNT(*) as total_feedback,
        SUM(CASE WHEN feedback > 0 THEN 1 ELSE 0 END) as thumbs_up,
        SUM(CASE WHEN feedback < 0 THEN 1 ELSE 0 END) as thumbs_down,
        ROUND(AVG(CASE WHEN feedback != 0 THEN feedback END), 3) as avg_rating,
        COUNT(DISTINCT conversation_id) as unique_conversations
        FROM feedback
        WHERE timestamp >= NOW() - INTERVAL '24 hours'
        """
    conn = get_db_connection()
    try:
        # DictCursor lets callers read the row by column name.
        with conn.cursor(cursor_factory=DictCursor) as cur:
            cur.execute(stats_sql)
            return cur.fetchone()
    finally:
        conn.close()
Advanced Analytics Queries
# src/database/db.py
def get_performance_metrics(hours=24):
    """Aggregate conversation performance stats over a trailing window.

    Args:
        hours: Lookback window size in hours (default 24).

    Returns:
        A dict-like row with query volume, latency (avg and p95), cost,
        token and relevance aggregates. Aggregates are SQL NULL (None)
        when no rows fall inside the window.
    """
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=DictCursor) as cur:
            # BUG FIX: the previous query used INTERVAL '%s hours'.
            # psycopg2 substitutes placeholders even inside quoted
            # literals, yielding invalid SQL such as INTERVAL ''24' hours'.
            # make_interval() takes the hour count as a proper bind
            # parameter instead.
            cur.execute(
                """
                SELECT
                COUNT(*) as query_count,
                AVG(response_time) as avg_response_time,
                PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY response_time) as p95_response_time,
                AVG(openai_cost) as avg_cost,
                SUM(openai_cost) as total_cost,
                AVG(total_tokens) as avg_tokens,
                COUNT(CASE WHEN relevance = 'RELEVANT' THEN 1 END) as relevant_answers,
                COUNT(CASE WHEN relevance = 'NON_RELEVANT' THEN 1 END) as non_relevant_answers
                FROM conversations
                WHERE timestamp >= NOW() - make_interval(hours => %s)
                """,
                (hours,),
            )
            return cur.fetchone()
    finally:
        conn.close()
3. Advanced Metrics API
Comprehensive Metrics Endpoint
# src/api/main_api.py
@app.get("/metrics")
async def get_metrics():
    """Comprehensive system metrics for monitoring dashboards.

    Returns a JSON document with four sections: performance, quality,
    user_satisfaction and system_health. Responds 500 with a summary
    message if any backing store is unreachable.
    """
    try:
        # Performance metrics
        perf_metrics = get_performance_metrics(24)  # Last 24 hours
        # Feedback analytics
        feedback_stats = get_feedback_stats()
        # System health
        health_status = {
            "qdrant_status": check_qdrant_connection(),
            "postgres_status": check_postgres_connection(),
            "s3_status": check_s3_availability(),
        }
        # BUG FIX: SQL aggregates (AVG/SUM/PERCENTILE_CONT) return NULL
        # when no rows match, so the keys exist but hold None.
        # ``.get(key, 0)`` does NOT cover that case and float(None)
        # raised a TypeError on an empty window — coerce with ``or 0``.
        return {
            "timestamp": datetime.utcnow().isoformat(),
            "performance": {
                "total_queries": perf_metrics.get("query_count", 0) or 0,
                "avg_response_time": float(perf_metrics.get("avg_response_time") or 0),
                "p95_response_time": float(perf_metrics.get("p95_response_time") or 0),
                "success_rate": calculate_success_rate(perf_metrics),
                "avg_cost_per_query": float(perf_metrics.get("avg_cost") or 0),
                "total_cost_24h": float(perf_metrics.get("total_cost") or 0),
            },
            "quality": {
                "relevance_rate": calculate_relevance_rate(perf_metrics),
                "avg_tokens_per_response": int(perf_metrics.get("avg_tokens") or 0),
            },
            "user_satisfaction": {
                "total_feedback": feedback_stats.get("total_feedback", 0) or 0,
                "positive_ratio": calculate_positive_ratio(feedback_stats),
                "avg_rating": float(feedback_stats.get("avg_rating") or 0),
            },
            "system_health": health_status
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Metrics unavailable: {str(e)}")
def calculate_success_rate(metrics):
    """Fraction of answers judged RELEVANT or PARTLY_RELEVANT.

    Treats missing keys and NULL SQL aggregates (which surface as None)
    as 0. Returns 1.0 for an empty window so dashboards don't show a
    false alarm when there is no traffic.

    NOTE(review): ``partly_relevant_answers`` is not produced by
    get_performance_metrics() as written, so it currently contributes 0;
    confirm whether that query should also count 'PARTLY_RELEVANT'.
    """
    # ``or 0`` (not a .get default) also covers keys present with None.
    total = metrics.get("query_count") or 0
    if total == 0:
        return 1.0
    relevant = metrics.get("relevant_answers") or 0
    partly_relevant = metrics.get("partly_relevant_answers") or 0
    return (relevant + partly_relevant) / total
def calculate_relevance_rate(metrics):
    """Fraction of answers judged strictly RELEVANT.

    Missing keys and None values (NULL SQL aggregates) count as 0;
    an empty window reports 1.0 rather than a spurious failure.
    """
    # ``or 0`` also covers keys present with a None value.
    total = metrics.get("query_count") or 0
    if total == 0:
        return 1.0
    return (metrics.get("relevant_answers") or 0) / total
4. Enterprise S3 Logging Architecture
Multi-Purpose S3 Service
# src/services/s3_service.py
class S3Service:
    """Enterprise-grade S3 service for comprehensive logging and audit trails."""

    def __init__(self):
        # Bucket and region come from the environment, with defaults
        # suitable for the production deployment.
        self.bucket_name = os.getenv("S3_BUCKET_NAME", "medical-rag-logs")
        self.aws_region = os.getenv("AWS_REGION", "us-east-1")
        self._initialize_client()

    def upload_logs(self, log_data: List[Dict[str, Any]], log_type: str) -> bool:
        """Wrap a batch of log records in a metadata envelope and upload it.

        Args:
            log_data: Structured log records to ship.
            log_type: Category used in the S3 key prefix (logs/<type>/...).

        Returns:
            True when the (compressed) upload succeeded.
        """
        stamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
        object_key = f"logs/{log_type}/{stamp}.json"
        envelope = {
            "timestamp": datetime.utcnow().isoformat(),
            "log_type": log_type,
            "count": len(log_data),
            "metadata": {
                "service_version": "1.0.0",
                "environment": os.getenv("ENVIRONMENT", "production"),
                "region": self.aws_region,
            },
            "logs": log_data,
        }
        return self.upload_json(envelope, object_key, compress=True)
Specialized Logging Functions
# src/services/s3_service.py
def upload_conversation_to_s3(conversation_data: Dict[str, Any]) -> bool:
    """Upload one conversation record stamped with audit metadata.

    Objects are date-partitioned: conversations/YYYY/MM/DD/<id>.json.
    Returns True when the upload succeeded.
    """
    conv_id = conversation_data.get("conversation_id", "unknown")
    date_prefix = datetime.utcnow().strftime("%Y/%m/%d")
    object_key = f"conversations/{date_prefix}/{conv_id}.json"

    # Attach compliance/audit metadata without mutating the caller's dict.
    payload = dict(conversation_data)
    payload["audit_info"] = {
        "logged_at": datetime.utcnow().isoformat(),
        "retention_policy": "7_years",  # Medical record retention
        "compliance": ["HIPAA", "SOC2"],
        "data_classification": "medical_phi",
    }
    return s3_service.upload_json(payload, object_key)
def upload_feedback_to_s3(feedback_data: Dict[str, Any]) -> bool:
    """Upload a single feedback record under a date-partitioned, unique key.

    The key embeds the conversation id plus an epoch-second suffix to
    keep repeated votes on the same conversation from colliding.
    """
    date_prefix = datetime.utcnow().strftime("%Y/%m/%d")
    unique_id = "{}_{}".format(
        feedback_data.get("conversation_id", "unknown"),
        int(datetime.utcnow().timestamp()),
    )
    return s3_service.upload_json(feedback_data, f"feedback/{date_prefix}/{unique_id}.json")
def upload_system_metrics_snapshot(metrics_data: Dict[str, Any]) -> bool:
    """Persist a point-in-time metrics + host-resource snapshot to S3.

    Combines the caller's metrics with current CPU/memory/disk readings
    so long-term trends can be analyzed offline.
    """
    stamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
    snapshot = {
        "timestamp": datetime.utcnow().isoformat(),
        "snapshot_type": "system_metrics",
        "metrics": metrics_data,
        # Host resource usage captured alongside the app metrics.
        "system_info": {
            "cpu_usage": get_cpu_usage(),
            "memory_usage": get_memory_usage(),
            "disk_usage": get_disk_usage(),
        },
    }
    return s3_service.upload_json(snapshot, f"metrics/system_snapshots/{stamp}.json")
Daily Data Export and Archival
# src/services/s3_service.py
def export_daily_medical_data(conversations: List[Dict], feedback: List[Dict]) -> bool:
    """Bundle one day's conversations and feedback into a compressed S3 export.

    The export carries a computed summary, quality metrics, a compliance
    stanza and the raw records themselves. Returns True on success.
    """
    date_str = datetime.utcnow().strftime("%Y%m%d")

    # Pre-compute the summary inputs so the export literal stays readable.
    positive_votes = [f for f in feedback if f.get("feedback", 0) > 0]
    negative_votes = [f for f in feedback if f.get("feedback", 0) < 0]
    # NOTE(review): costs are summed from a "cost" key here while the DB
    # layer stores "openai_cost" — confirm which key the inputs carry.
    total_spend = sum(c.get("cost", 0) for c in conversations)
    distinct_users = {c.get("user_id") for c in conversations if c.get("user_id")}

    daily_export = {
        "export_date": date_str,
        "timestamp": datetime.utcnow().isoformat(),
        "summary": {
            "total_conversations": len(conversations),
            "total_feedback": len(feedback),
            "positive_feedback": len(positive_votes),
            "negative_feedback": len(negative_votes),
            "avg_response_time": calculate_avg_response_time(conversations),
            "total_cost": total_spend,
            "unique_users": len(distinct_users),
        },
        "quality_metrics": {
            "relevance_distribution": calculate_relevance_distribution(conversations),
            "model_usage": calculate_model_usage(conversations),
            "department_queries": calculate_department_distribution(conversations),
        },
        "compliance": {
            "phi_handling": "compliant",
            "retention_status": "active",
            "audit_trail": "complete",
        },
        "data": {
            "conversations": conversations,
            "feedback": feedback,
        },
    }
    return s3_service.upload_json(
        daily_export, f"daily_exports/{date_str}_medical_data.json", compress=True
    )
5. Comprehensive API Observability
Request/Response Logging Pipeline
# src/api/main_api.py
async def log_api_call_to_s3(endpoint: str, conversation_id: str, request_data: str,
                             response_data: Optional[str], start_time: datetime,
                             end_time: datetime, status: str, error: Optional[str] = None,
                             http_request=None):
    """Comprehensive API call logging for audit, compliance, and analytics.

    Args:
        endpoint: Logical endpoint name (used in the S3 log-type suffix).
        conversation_id: Correlation id for this request.
        request_data: Raw request text.
        response_data: Raw response text, or None on failure.
        start_time / end_time: Request timing bounds.
        status: "success" or "error".
        error: Optional error description for failed calls.
        http_request: Optional incoming request object exposing
            ``.headers`` and ``.client`` (e.g. fastapi.Request).
            BUG FIX: the original body referenced a bare ``request`` name
            that this function never received, raising NameError whenever
            logging ran. Callers may now pass the request explicitly;
            client metadata degrades to None when they don't.
    """
    try:
        if is_s3_available():
            # Client metadata is best-effort: only present when the
            # caller handed us the HTTP request object.
            user_agent = None
            ip_address = None
            if http_request is not None:
                user_agent = http_request.headers.get("user-agent")
                ip_address = http_request.client.host if hasattr(http_request, 'client') else None
            log_data = {
                "endpoint": endpoint,
                "conversation_id": conversation_id,
                "request_data": request_data,
                "response_data": response_data,
                "timing": {
                    "start_time": start_time.isoformat(),
                    "end_time": end_time.isoformat(),
                    "duration_ms": (end_time - start_time).total_seconds() * 1000,
                },
                "status": status,
                "error": error,
                "metadata": {
                    "timestamp": datetime.utcnow().isoformat(),
                    "service_version": "1.0.0",
                    "user_agent": user_agent,
                    "ip_address": ip_address,
                },
                "compliance": {
                    "logged_for": "audit_and_analytics",
                    "retention_period": "7_years",
                    "data_classification": "medical_operational"
                }
            }
            # Background upload to prevent API latency
            upload_logs_to_s3([log_data], f"api_{endpoint}")
    except Exception as e:
        # Log locally if S3 fails — audit logging must never break the API.
        logger.error(f"Failed to log API call to S3: {e}")
# Enhanced endpoint with comprehensive logging
@app.post("/question")
async def handle_question_with_logging(request: QuestionRequest,
                                       background_tasks: BackgroundTasks,
                                       http_request: Request):
    """Enhanced question endpoint with full observability"""
    # Capture timing and a fresh correlation id up front so both the
    # success and error paths can tie their logs to this request.
    start_time = datetime.utcnow()
    conversation_id = str(uuid.uuid4())
    try:
        # Core RAG processing
        answer_data = rag(request.question, model=request.model)
        # Success logging
        # S3 audit logging is opt-in via ENABLE_S3_LOGGING; it runs as a
        # background task so it adds no user-visible latency.
        if os.getenv("ENABLE_S3_LOGGING", "false").lower() == "true":
            background_tasks.add_task(
                log_api_call_to_s3,
                "question", conversation_id, request.question,
                answer_data["answer"], start_time, datetime.utcnow(), "success"
            )
        # NOTE(review): placeholder in this excerpt — the real mapping of
        # answer_data fields into the response model is elided.
        return QuestionResponse(...)
    except Exception as e:
        # Error logging with full context
        if os.getenv("ENABLE_S3_LOGGING", "false").lower() == "true":
            background_tasks.add_task(
                log_api_call_to_s3,
                "question", conversation_id, request.question,
                None, start_time, datetime.utcnow(), "error", str(e)
            )
        # Enhanced error response
        # 500 with structured detail so clients can correlate failures
        # by conversation_id and error type.
        raise HTTPException(
            status_code=500,
            detail={
                "message": "Medical RAG processing failed",
                "conversation_id": conversation_id,
                "error_type": type(e).__name__,
                "timestamp": datetime.utcnow().isoformat()
            }
        )
6. Docker Compose Orchestration & Service Monitoring
Complete Service Stack
# config/docker-compose.yaml
# Full service stack: PostgreSQL (persistence), Qdrant (vector search),
# the RAG application, and Grafana for dashboards.
services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_DB: ${POSTGRES_DB}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    ports:
      # Host port 5433 avoids clashing with a locally running postgres.
      - "5433:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER}"]
      interval: 30s
      timeout: 10s
      retries: 3
  qdrant:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333" # HTTP API
      - "6334:6334" # gRPC API
    volumes:
      - qdrant_data:/qdrant/storage
    environment:
      - QDRANT__SERVICE__HTTP_PORT=6333
      - QDRANT__SERVICE__GRPC_PORT=6334
    # NOTE(review): recent Qdrant images expose /healthz rather than /health
    # and may not ship curl — verify this check actually passes with the
    # pinned image before relying on it.
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:6333/health"]
      interval: 30s
      timeout: 10s
      retries: 3
  app:
    build:
      context: ..
      dockerfile: Dockerfile
    environment:
      # Core application settings
      OPENAI_API_KEY: ${OPENAI_API_KEY}
      APP_TYPE: ${APP_TYPE:-fastapi}
      # Database connections (service names resolve on the compose network)
      QDRANT_HOST: "qdrant"
      QDRANT_PORT: "6333"
      POSTGRES_HOST: "postgres"
      POSTGRES_DB: ${POSTGRES_DB}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      # Monitoring and logging
      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
      S3_BUCKET_NAME: ${S3_BUCKET_NAME}
      ENABLE_S3_LOGGING: ${ENABLE_S3_LOGGING:-true}
    ports:
      - "8000:8000" # FastAPI
      - "8501:8501" # Streamlit
    # NOTE(review): depends_on only orders startup; it does not wait for the
    # healthchecks above unless "condition: service_healthy" is used.
    depends_on:
      - postgres
      - qdrant
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 15s
      retries: 3
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    volumes:
      - grafana_data:/var/lib/grafana
      # Pre-provisioned medical RAG dashboard definition.
      - ./grafana_dashboard.json:/var/lib/grafana/dashboards/medical-rag.json
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_ADMIN_PASSWORD:-admin}
      - GF_SECURITY_ADMIN_USER=${GRAFANA_ADMIN_USER:-admin}
      - GF_AUTH_ANONYMOUS_ENABLED=false
      - GF_SECURITY_ALLOW_EMBEDDING=true
    depends_on:
      - postgres
# Named volumes keep data across container recreation.
volumes:
  postgres_data:
  grafana_data:
  qdrant_data:
7. Comprehensive Monitoring Strategy
Key Performance Indicators (KPIs)
Category | Metric | Target | Monitoring Method |
---|---|---|---|
Retrieval Quality | Hit Rate | >88% | PostgreSQL analytics |
Retrieval Quality | Mean Reciprocal Rank | >0.94 | Ground truth evaluation |
Performance | P95 Response Time | <30s | API timing logs |
Performance | API Error Rate | <1% | Error tracking |
Cost Efficiency | Cost per Query | <$0.005 | Token usage monitoring |
User Satisfaction | Positive Feedback Ratio | >75% | Feedback analytics |
System Health | Service Uptime | >99.5% | Health check endpoints |
Data Quality | Relevance Rate | >90% | LLM evaluation |
Advanced Monitoring Dashboards
# Grafana dashboard configuration for medical RAG metrics
# Grafana dashboard configuration for medical RAG metrics.
# Maps each dashboard panel group to the raw SQL its panels run against
# the conversations/feedback tables in PostgreSQL.
GRAFANA_DASHBOARD_PANELS = {
    # High-level traffic, latency and success overview.
    "system_overview": {
        "queries_per_hour": "SELECT COUNT(*) FROM conversations WHERE timestamp >= NOW() - INTERVAL '1 hour'",
        "avg_response_time": "SELECT AVG(response_time) FROM conversations WHERE timestamp >= NOW() - INTERVAL '24 hours'",
        "success_rate": "SELECT (COUNT(CASE WHEN relevance IN ('RELEVANT', 'PARTLY_RELEVANT') THEN 1 END) * 100.0 / COUNT(*)) FROM conversations WHERE timestamp >= NOW() - INTERVAL '24 hours'"
    },
    # OpenAI spend broken down by day and by model.
    "cost_analysis": {
        "daily_cost": "SELECT DATE(timestamp), SUM(openai_cost) FROM conversations WHERE timestamp >= NOW() - INTERVAL '7 days' GROUP BY DATE(timestamp)",
        "cost_per_model": "SELECT model_used, AVG(openai_cost) FROM conversations WHERE timestamp >= NOW() - INTERVAL '24 hours' GROUP BY model_used"
    },
    # Answer-quality distribution and user-feedback trend lines.
    "quality_metrics": {
        "relevance_distribution": "SELECT relevance, COUNT(*) FROM conversations WHERE timestamp >= NOW() - INTERVAL '24 hours' GROUP BY relevance",
        "feedback_trends": "SELECT DATE(timestamp), AVG(CASE WHEN feedback > 0 THEN 1 WHEN feedback < 0 THEN 0 ELSE 0.5 END) FROM feedback WHERE timestamp >= NOW() - INTERVAL '7 days' GROUP BY DATE(timestamp)"
    }
}
Alert Configuration
# Automated alerting thresholds
# Automated alerting thresholds.
# Each entry names a monitored condition, the evaluation window over which
# it must hold, and the action fired when it trips.
ALERT_THRESHOLDS = {
    # Elevated API failure rate — page via email.
    "high_error_rate": {
        "condition": "error_rate > 5%",
        "window": "15 minutes",
        "action": "send_email_alert"
    },
    # Tail latency regression — notify the team channel.
    "slow_response_time": {
        "condition": "p95_response_time > 45 seconds",
        "window": "10 minutes",
        "action": "send_slack_alert"
    },
    # Spend spike versus the historical hourly average.
    "cost_anomaly": {
        "condition": "hourly_cost > 3 * avg_hourly_cost",
        "window": "1 hour",
        "action": "send_urgent_alert"
    },
    # Sustained drop in answer quality — escalate to model review.
    "low_relevance_rate": {
        "condition": "relevance_rate < 85%",
        "window": "2 hours",
        "action": "trigger_model_review"
    }
}
8. Production Monitoring Best Practices
Health Check Implementation
# Comprehensive health checking
async def check_system_health():
    """Probe every backing service and roll the results into one report.

    Returns a dict with ``overall`` set to "healthy", "degraded"
    (one or two components failing), "unhealthy" (three or more), or
    "error" when the probing itself raised; failing component names are
    listed under ``issues``.
    """
    report = {
        "overall": "healthy",
        "timestamp": datetime.utcnow().isoformat(),
        "components": {},
    }
    components = report["components"]
    try:
        # Probe each dependency in turn; each helper returns a dict with
        # at least a "status" field.
        components["postgresql"] = await check_postgres_health()
        components["qdrant"] = await check_qdrant_health()
        components["s3"] = await check_s3_health()
        components["openai"] = await check_openai_health()

        failing = [
            name
            for name, result in components.items()
            if result.get("status") != "healthy"
        ]
        if failing:
            report["overall"] = "unhealthy" if len(failing) >= 3 else "degraded"
            report["issues"] = failing
    except Exception as e:
        report["overall"] = "error"
        report["error"] = str(e)
    return report
9. Future Monitoring Enhancements
Immediate Improvements
- Real-time Dashboards: Live system status and performance monitoring
- Predictive Alerting: Machine learning-based anomaly detection
- Cost Optimization: Automated model selection based on query complexity
- Performance Profiling: Detailed latency breakdown analysis
Advanced Features
- Distributed Tracing: End-to-end request flow visualization
- Custom Metrics: Medical domain-specific KPI tracking
- User Journey Analytics: Multi-session user behavior analysis
- Compliance Reporting: Automated regulatory compliance reports
Integration Opportunities
- Prometheus Integration: Export metrics to Prometheus/Grafana ecosystem
- ELK Stack: Centralized log management and analysis
- APM Tools: Integration with DataDog, New Relic, or similar platforms
- Medical Compliance: HIPAA audit trail automation and reporting
Top comments (0)