🎯 The Challenge of Building a Production API
Imagine you have a complete RAG system up and running:
- ✅ 413 legal articles vectorized in Qdrant
- ✅ Embeddings from state-of-the-art models
- ✅ LLMs configured (OpenAI and Gemini)
- ✅ Reranking to improve precision
Now comes the real challenge: how do you expose this functionality to the world in a secure, scalable, and maintainable way?
You need an API that:
- 🔒 Protects its endpoints with JWT authentication
- ⚡ Adds only milliseconds of its own overhead under high load
- 📊 Documents its endpoints automatically
- 🔍 Monitors every request with full observability
- 🛡️ Validates inputs and outputs with type safety
- 🔄 Handles errors gracefully with retry logic
- 📈 Scales horizontally without friction
📊 The Scale of the Problem
Requirements of a Production API for RAG
- 🏗️ Modular architecture: Clear separation of responsibilities
- 🔐 Security: RSA-signed JWT, CORS, rate limiting
- 📝 Automatic documentation: OpenAPI/Swagger with no manual effort
- ⚙️ Flexible configuration: Environment variables for every service
- 🎯 Strict validation: Pydantic models for requests and responses
- 🔄 Complex orchestration: Coordinating 6+ services (Qdrant, embeddings, LLM, reranking, Phoenix, GCP)
- 📊 Observability: End-to-end tracing with OpenTelemetry
- 🚀 Performance: Async/await for I/O-bound operations
- 🛡️ Resilience: Retry logic, timeouts, circuit breakers
- 📦 Deployment-ready: Docker, health checks, structured logs
Specific Technical Challenges
- 🔍 Dependency injection: Managing shared singleton services
- ⏱️ Rate limiting: Preventing abuse without affecting legitimate users
- 🔒 Flexible authentication: Public and private endpoints side by side
- 📊 Middleware stack: CORS, TrustedHost, exception handlers
- 🎯 Multi-level validation: Request body, query params, headers
- 🔄 Lifecycle management: Graceful initialization and shutdown of services
💡 The Solution: FastAPI
FastAPI is a modern Python framework that offers:
- ⚡ High performance: Built on Starlette (async) and Pydantic (validation)
- 📝 Automatic documentation: OpenAPI + Swagger UI out of the box
- 🎯 Native type hints: Validation and editor autocompletion driven by standard Python type hints (this project targets Python 3.13)
- 🔄 Async-first: Native support for async/await
- 🛡️ Dependency injection: A robust and flexible system
- 📊 Standards-based: OpenAPI, JSON Schema, OAuth2
Why FastAPI Over the Alternatives?
| Feature | FastAPI | Flask | Django REST | Express.js |
|---|---|---|---|---|
| Performance | ⚡⚡⚡ | ⚡ | ⚡⚡ | ⚡⚡⚡ |
| Type Safety | ✅ | ❌ | ⚠️ | ❌ |
| Async Native | ✅ | ⚠️ | ⚠️ | ✅ |
| Auto Docs | ✅ | ❌ | ⚠️ | ❌ |
| Data Validation | ✅ Pydantic | ❌ | ✅ Serializers | ❌ |
| Learning Curve | Low | Very low | High | Low |
| Ecosystem | Growing | Mature | Very mature | Very mature |
| Best for | Modern APIs | Prototypes | Full-stack apps | Node.js devs |
Our choice: FastAPI, for its balance of performance, developer experience, and production features.
🏗️ API Architecture
Folder Structure
src/lus_laboris_api/
├── api/
│   ├── main.py                    # Application factory, middleware
│   ├── config.py                  # Settings with Pydantic
│   │
│   ├── auth/                      # Authentication
│   │   ├── jwt_handler.py         # JWT validation
│   │   └── dependencies.py        # Auth dependencies
│   │
│   ├── endpoints/                 # Routers by domain
│   │   ├── health.py              # Health checks
│   │   ├── status.py              # Status and root
│   │   ├── rag.py                 # RAG queries
│   │   └── vectorstore.py         # Collection management
│   │
│   ├── models/                    # Pydantic models
│   │   ├── requests.py            # Request schemas
│   │   └── responses.py           # Response schemas
│   │
│   └── services/                  # Business logic
│       ├── rag_service.py         # RAG orchestration
│       ├── qdrant_service.py      # Qdrant client
│       ├── embedding_service.py   # Embeddings
│       ├── reranking_service.py   # Reranking
│       ├── evaluation_service.py  # LLM evaluations
│       ├── phoenix_service.py     # Observability
│       └── gcp_service.py         # GCP integration
│
├── Dockerfile                     # Container for the API
├── docker-compose.yml             # Full stack (API + Qdrant + Phoenix)
├── start_api_dev.sh               # Development script
└── pyproject.toml                 # Dependencies managed with UV
Design principles:
- ✅ Separation of concerns: Routers → Services → External APIs
- ✅ Single responsibility: Each service has one clear responsibility
- ✅ Dependency injection: Shared singleton services
- ✅ Configuration as code: Everything is configurable via environment variables
🚀 Configuration and Setup
1. Settings with Pydantic
The heart of the configuration is config.py, built on pydantic-settings:
# src/lus_laboris_api/api/config.py
from pathlib import Path

from pydantic import Field
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    """Application settings with environment variable support"""

    # API Configuration
    api_host: str = "0.0.0.0"
    api_port: int = 8000
    api_reload: bool = False
    api_log_level: str = "info"

    # Security
    api_allowed_origins: list[str] = Field(default=["*"])
    api_allowed_hosts: list[str] = Field(default=["*"])
    api_jwt_public_key_path: str | None = None
    api_jwt_aud: str = "lus-laboris-client"
    api_jwt_iss: str = "lus-laboris-api"

    # Qdrant Configuration (no defaults: must come from the environment)
    api_qdrant_url: str | None = None
    api_qdrant_api_key: str | None = None
    api_qdrant_collection_name: str | None = None
    api_qdrant_grpc_port: int = 6334
    api_qdrant_prefer_grpc: bool = True  # 2-3x faster

    # Embedding Configuration
    api_embedding_model: str | None = None
    api_embedding_batch_size: int = 100

    # Reranking Configuration
    api_reranking_model: str | None = None
    api_use_reranking: bool = False

    # RAG Configuration (no defaults: must come from the environment)
    api_rag_top_k: int | None = None
    api_llm_provider: str | None = None  # 'openai' or 'gemini'
    api_llm_model: str | None = None

    # LLM API Keys
    openai_api_key: str | None = None
    gemini_api_key: str | None = None

    # Rate Limiting
    api_rate_limit_requests: int = 10
    api_rate_limit_window: str = "1 minute"

    # Phoenix Monitoring
    api_phoenix_enabled: bool = True
    api_phoenix_endpoint: str | None = None
    api_phoenix_api_key: str | None = None
    api_phoenix_project_name: str = "lus-laboris-api"

    # Environment
    api_environment: str = "development"

    class Config:
        # Automatic .env resolution
        project_root = Path(__file__).parent.parent.parent.parent
        env_file = project_root / ".env"
        env_file_encoding = "utf-8"
        case_sensitive = False
        extra = "ignore"


# Global singleton
settings = Settings()
Key features:
- ✅ Type safety: Automatic type validation
- ✅ Sensible defaults: Default values suited to development
- ✅ Path resolution: The .env path is resolved relative to the project root
- ✅ Flexible .env: pydantic-settings also accepts several .env files, although this configuration reads a single one
- ✅ Secrets management: API keys are loaded from the environment rather than hardcoded
2. Application Factory with Lifespan
main.py defines the application's full lifecycle:
# src/lus_laboris_api/api/main.py
import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware

from .config import settings
from .endpoints import health, rag, status, vectorstore
from .services.embedding_service import embedding_service
from .services.qdrant_service import qdrant_service
from .services.rag_service import rag_service
from .services.evaluation_service import evaluation_service

logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager - startup and shutdown logic"""
    # === STARTUP ===
    logger.info("Starting Lus Laboris API...")
    try:
        # 1. Initialize Qdrant connection
        qdrant_status = qdrant_service.health_check()
        logger.info(f"Qdrant status: {qdrant_status.get('status')}")
        if qdrant_status.get('status') != 'healthy':
            logger.warning("Qdrant is not healthy, but API will continue")

        # 2. Initialize embedding service (load models)
        embedding_status = embedding_service.health_check()
        logger.info(f"Embedding service status: {embedding_status.get('status')}")

        # 3. Initialize RAG service
        rag_status = rag_service.health_check()
        logger.info(f"RAG service status: {rag_status.get('status')}")

        # 4. Initialize evaluation service (async)
        eval_status = evaluation_service.health_check()
        logger.info(f"Evaluation service status: {eval_status.get('status')}")

        logger.info("All services initialized successfully")
    except Exception:
        logger.exception("Failed to initialize services")
        # Continue anyway - health endpoints will report failures

    yield  # API runs here

    # === SHUTDOWN ===
    logger.info("Shutting down Lus Laboris API...")
    try:
        # Graceful shutdown of evaluation service
        evaluation_service.shutdown()
        logger.info("Evaluation service shut down successfully")
    except Exception:
        logger.exception("Error shutting down evaluation service")


# Create FastAPI application
app = FastAPI(
    title="Lus Laboris API",
    description="API for semantic search and retrieval of Paraguayan labor law information",
    version="1.0.0",
    docs_url="/docs",  # Swagger UI
    redoc_url="/redoc",  # ReDoc
    openapi_url="/openapi.json",
    lifespan=lifespan,
)

# Add CORS middleware
# (restrict origins in production: "*" together with credentials is too permissive)
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.api_allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Add trusted host middleware
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=settings.api_allowed_hosts,
)

# Include routers
app.include_router(status.router)
app.include_router(health.router)
app.include_router(vectorstore.router)
app.include_router(rag.router)

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        "main:app",
        host=settings.api_host,
        port=settings.api_port,
        reload=settings.api_reload,
        log_level=settings.api_log_level,
    )
Advantages of the lifespan handler:
- ✅ Predictable startup: Services are initialized in the right order
- ✅ Early health checks: Problems surface before the first request arrives
- ✅ Graceful shutdown: Connections and threads are closed cleanly
- ✅ Resource management: ML models are loaded only once
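The startup and shutdown ordering that a lifespan handler provides can be seen in isolation with a minimal sketch that uses only the standard library. `FakeService` and `run_app` are hypothetical stand-ins, not part of the project or of FastAPI. The point is only that everything before `yield` runs once before requests are served, and everything after it runs once on shutdown.

```python
import asyncio
from contextlib import asynccontextmanager

events: list[str] = []


class FakeService:
    """Hypothetical stand-in for a real service such as qdrant_service."""

    def __init__(self, name: str) -> None:
        self.name = name

    def health_check(self) -> dict[str, str]:
        events.append(f"startup:{self.name}")
        return {"status": "healthy"}

    def shutdown(self) -> None:
        events.append(f"shutdown:{self.name}")


@asynccontextmanager
async def lifespan(services: list[FakeService]):
    # Startup: runs once, in order, before the body of `async with`.
    for service in services:
        service.health_check()
    try:
        yield
    finally:
        # Shutdown: runs once, in reverse order, even if the body raised.
        for service in reversed(services):
            service.shutdown()


async def run_app() -> None:
    services = [FakeService("qdrant"), FakeService("embeddings")]
    async with lifespan(services):
        events.append("serving")  # the application would handle requests here


asyncio.run(run_app())
print(events)
```

Wrapping `yield` in `try`/`finally` is a design choice of this sketch: it guarantees that the shutdown steps run even when the serving phase ends with an exception.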
🔐 JWT Authentication with RSA
1. JWT Validator
Authentication uses JWTs signed with RSA (the RS256 algorithm):
# src/lus_laboris_api/api/auth/jwt_handler.py
import logging
import os
from pathlib import Path
from typing import Any

import jwt
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization

from ..config import settings

logger = logging.getLogger(__name__)


class JWTValidator:
    """JWT validator using RSA public key"""

    def __init__(self):
        self.public_key = None
        self.algorithm = "RS256"
        self._load_public_key()

    def _load_public_key(self):
        """Load RSA public key for token validation"""
        public_key_path = settings.api_jwt_public_key_path
        if not public_key_path:
            raise ValueError("JWT public key path is not configured")
        # Resolve relative paths against the project root
        if not os.path.isabs(public_key_path):
            project_root = Path(__file__).parent.parent.parent.parent
            public_key_path = project_root / public_key_path
        try:
            with open(public_key_path, 'rb') as f:
                self.public_key = serialization.load_pem_public_key(
                    f.read(),
                    backend=default_backend(),
                )
            logger.info(f"JWT public key loaded from {public_key_path}")
        except FileNotFoundError:
            logger.error(f"JWT public key not found at {public_key_path}")
            raise ValueError("JWT public key not found")

    def validate_token(self, token: str) -> dict[str, Any]:
        """Validate JWT token and return payload"""
        if not self.public_key:
            raise ValueError("Public key not available")
        try:
            payload = jwt.decode(
                token,
                self.public_key,
                algorithms=[self.algorithm],
                audience=settings.api_jwt_aud,  # 'lus-laboris-client'
                issuer=settings.api_jwt_iss,  # 'lus-laboris-api'
                options={
                    'verify_exp': True,  # Verify expiration
                    'verify_iat': True,  # Verify issued-at
                    'verify_aud': True,  # Verify audience
                    'verify_iss': True,  # Verify issuer
                },
            )
            logger.info(f"JWT validated for: {payload.get('sub', 'unknown')}")
            return payload
        except jwt.ExpiredSignatureError:
            logger.warning("JWT token expired")
            raise ValueError("Token expired")
        except jwt.InvalidAudienceError:
            logger.warning("Invalid JWT audience")
            raise ValueError(f"Invalid audience. Expected: {settings.api_jwt_aud}")
        except jwt.InvalidIssuerError:
            logger.warning("Invalid JWT issuer")
            raise ValueError(f"Invalid issuer. Expected: {settings.api_jwt_iss}")
        except jwt.InvalidTokenError as e:
            logger.warning(f"Invalid JWT token: {e}")
            raise ValueError(f"Invalid token: {e}")


# Global singleton
jwt_validator = JWTValidator()
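When a token is rejected, it helps to look at the claims that `validate_token` checks (`aud`, `iss`, `exp`). The sketch below decodes the payload segment of a JWT using only the standard library. It does not verify the signature, so it is a debugging aid and not a substitute for the validator. `peek_claims` and the sample token are hypothetical and exist only for illustration.

```python
import base64
import json
from typing import Any


def _b64url_decode(segment: str) -> bytes:
    # JWT segments are base64url-encoded with the '=' padding stripped.
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def _b64url_encode(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def peek_claims(token: str) -> dict[str, Any]:
    """Return the UNVERIFIED payload of a JWT (header.payload.signature)."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("A JWT must have exactly three dot-separated segments")
    return json.loads(_b64url_decode(parts[1]))


# Build an unsigned sample token purely for illustration.
header = _b64url_encode(json.dumps({"alg": "RS256", "typ": "JWT"}).encode())
payload = _b64url_encode(
    json.dumps(
        {
            "sub": "admin",
            "aud": "lus-laboris-client",
            "iss": "lus-laboris-api",
            "exp": 1900000000,
        }
    ).encode()
)
sample_token = f"{header}.{payload}.signature-placeholder"

claims = peek_claims(sample_token)
print(claims["aud"], claims["iss"])
```

Comparing the printed `aud` and `iss` values with `api_jwt_aud` and `api_jwt_iss` is usually enough to explain an "Invalid audience" or "Invalid issuer" error.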
2. Authentication Dependencies
# src/lus_laboris_api/api/auth/dependencies.py
from typing import Any

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

from .jwt_handler import jwt_validator

# Security scheme for Swagger UI
security = HTTPBearer()


def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> dict[str, Any]:
    """Dependency for endpoints that require authentication"""
    token = credentials.credentials
    try:
        payload = jwt_validator.validate_token(token)
        return payload
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=str(e),
            headers={"WWW-Authenticate": "Bearer"},
        )


def optional_auth(
    credentials: HTTPAuthorizationCredentials | None = Depends(
        HTTPBearer(auto_error=False)
    ),
) -> dict[str, Any] | None:
    """Dependency for endpoints with optional authentication"""
    if not credentials:
        return None
    try:
        payload = jwt_validator.validate_token(credentials.credentials)
        return payload
    except ValueError:
        return None  # Silent: an invalid token does not block access
Usage in endpoints:
from fastapi import APIRouter, Depends

from .auth.dependencies import get_current_user, optional_auth

router = APIRouter()


# Protected endpoint (requires a JWT)
@router.get("/admin/metrics")
async def admin_metrics(
    token_payload: dict = Depends(get_current_user),
):
    user = token_payload.get('sub')
    return {"metrics": "...", "requested_by": user}


# Endpoint with optional auth (more information when authenticated)
@router.get("/health/detailed")
async def detailed_health(
    token_payload: dict | None = Depends(optional_auth),
):
    is_authenticated = token_payload is not None
    if is_authenticated:
        return {"status": "healthy", "details": "..."}
    else:
        return {"status": "healthy"}  # Limited information
📝 Pydantic Models
Request Models
# src/lus_laboris_api/api/models/requests.py
from pydantic import BaseModel, Field


class QuestionRequest(BaseModel):
    """Request model for RAG queries"""

    question: str = Field(
        ...,
        description="Question about Paraguayan labor law",
        min_length=5,
        max_length=1000,
    )

    class Config:
        json_schema_extra = {
            "example": {
                "question": "¿Cuáles son los derechos del trabajador en caso de despido?"
            }
        }


class LoadToVectorstoreLocalRequest(BaseModel):
    """Request for loading data from local files"""

    filename: str = Field(
        ...,
        description="JSON file name (without path)",
        min_length=1,
    )
    local_data_path: str | None = Field(
        None,
        description="Path to data directory (relative to project root). Default: 'data/processed'",
    )
    replace_collection: bool = Field(
        False,
        description="Replace collection if exists",
    )

    class Config:
        json_schema_extra = {
            "example": {
                "filename": "codigo_trabajo_articulos.json",
                "local_data_path": "data/processed",
                "replace_collection": False,
            }
        }
Response Models
# src/lus_laboris_api/api/models/responses.py
from typing import Any

from pydantic import BaseModel, Field


class BaseResponse(BaseModel):
    """Base response model"""

    success: bool
    message: str


class QuestionResponse(BaseResponse):
    """Response model for RAG queries"""

    question: str
    answer: str | None = None
    error: str | None = None
    processing_time_seconds: float
    documents_retrieved: int | None = None
    top_k: int | None = None
    reranking_applied: bool | None = None
    documents: list[dict[str, Any]] | None = None
    session_id: str | None = None

    class Config:
        json_schema_extra = {
            "example": {
                "success": True,
                "message": "Question answered successfully",
                "question": "¿Cuántos días de vacaciones corresponden?",
                "answer": "Según el Artículo 218...",
                "processing_time_seconds": 1.234,
                "documents_retrieved": 5,
                "top_k": 5,
                "reranking_applied": True,
                "documents": [...],
                "session_id": "session_20241016_123456",
            }
        }
Benefits of Pydantic:
- ✅ Automatic validation: Type checking at runtime
- ✅ Automatic documentation: The OpenAPI schema is generated from the models
- ✅ Serialization: Automatic JSON encoding and decoding
- ✅ IDE support: Autocompletion and type hints
- ✅ Error messages: Detailed, readable validation errors
🎯 Main Endpoints
1. Health Check Endpoint
# src/lus_laboris_api/api/endpoints/health.py
from typing import Any

from fastapi import APIRouter, Depends

from ..auth.dependencies import optional_auth
from ..services.qdrant_service import qdrant_service
from ..services.embedding_service import embedding_service
from ..services.rag_service import rag_service

router = APIRouter(prefix="/api/health", tags=["Health"])


def _sanitize_health_response(
    status: dict[str, Any],
    is_authenticated: bool,
) -> dict[str, Any]:
    """Sanitize health response based on authentication"""
    if is_authenticated:
        return status  # Full details
    else:
        # Only return status for unauthenticated users
        return {"status": status.get("status")}


@router.get("/")
async def health_check():
    """Basic health check - always public"""
    return {
        "status": "healthy",
        "api": "Lus Laboris API",
        "version": "1.0.0",
    }


@router.get("/qdrant")
async def qdrant_health_check(
    token_payload: dict[str, Any] | None = Depends(optional_auth),
):
    """Qdrant health check with smart info filtering"""
    is_authenticated = token_payload is not None
    status = qdrant_service.health_check()
    return _sanitize_health_response(status, is_authenticated)


@router.get("/embeddings")
async def embeddings_health_check(
    token_payload: dict[str, Any] | None = Depends(optional_auth),
):
    """Embeddings service health check"""
    is_authenticated = token_payload is not None
    status = embedding_service.health_check()
    return _sanitize_health_response(status, is_authenticated)


@router.get("/rag")
async def rag_health_check(
    token_payload: dict[str, Any] | None = Depends(optional_auth),
):
    """RAG service health check"""
    is_authenticated = token_payload is not None
    status = rag_service.health_check()
    return _sanitize_health_response(status, is_authenticated)
Example response without authentication:
{
  "status": "healthy"
}
Example response with a JWT:
{
  "status": "healthy",
  "url": "http://35.123.45.67:6333",
  "connection_type": "gRPC",
  "collections_count": 3,
  "collection": "labor_law_articles",
  "documents_count": 413
}
2. RAG Query Endpoint
# src/lus_laboris_api/api/endpoints/rag.py
import logging

from fastapi import APIRouter, HTTPException, Request, status
from slowapi import Limiter
from slowapi.util import get_remote_address

from ..models.requests import QuestionRequest
from ..models.responses import QuestionResponse
from ..services.rag_service import rag_service
from ..services.phoenix_service import phoenix_service

logger = logging.getLogger(__name__)

# Rate limiter
limiter = Limiter(key_func=get_remote_address)

router = APIRouter(prefix="/api/rag", tags=["RAG"])


@router.post(
    "/ask",
    response_model=QuestionResponse,
    summary="Ask a question about Paraguayan labor law",
    description="Uses RAG (Retrieval-Augmented Generation) to answer legal questions",
)
@limiter.limit("10/minute")  # 10 requests per minute per IP
async def ask_question(
    request: Request,  # Required for rate limiting
    question_data: QuestionRequest,
) -> QuestionResponse:
    """
    Ask a question using RAG pipeline:
    1. Generate embedding for question
    2. Search relevant documents in Qdrant
    3. Optional: Rerank results
    4. Generate answer with LLM
    5. Track everything with Phoenix
    """
    session_id = None
    try:
        logger.info(f"Received question: {question_data.question[:100]}...")

        # Create monitoring session
        session_id = phoenix_service.create_session()

        # Answer using RAG service (async)
        result = await rag_service.answer_question(
            question_data.question,
            session_id,
        )

        # Build response
        response = QuestionResponse(
            success=result["success"],
            message="Question answered successfully" if result["success"]
            else "Failed to answer question",
            question=result["question"],
            answer=result.get("answer"),
            error=result.get("error"),
            processing_time_seconds=result["processing_time_seconds"],
            documents_retrieved=result.get("documents_retrieved"),
            top_k=result.get("top_k"),
            reranking_applied=result.get("reranking_applied"),
            documents=result.get("documents"),
            session_id=result.get("session_id"),
        )

        if result["success"]:
            logger.info(
                f"Question answered in {result['processing_time_seconds']:.3f}s "
                f"for session {session_id}"
            )
        else:
            logger.error(f"Failed to answer: {result.get('error')}")

        return response
    except Exception:
        # Log the full traceback, but do not leak internals to the client
        logger.exception("Unexpected error while answering question")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal server error",
        )
    finally:
        # End monitoring session
        if session_id:
            phoenix_service.end_session(session_id)
Example request:
curl -X POST "http://localhost:8000/api/rag/ask" \
  -H "Content-Type: application/json" \
  -d '{
    "question": "¿Cuántos días de vacaciones corresponden a un trabajador?"
  }'
Example response:
{
  "success": true,
  "message": "Question answered successfully",
  "question": "¿Cuántos días de vacaciones corresponden a un trabajador?",
  "answer": "Según el Artículo 218 del Código del Trabajo de Paraguay, todo trabajador que cumpla un año de trabajo continuo al servicio del mismo empleador tiene derecho a un período de vacaciones anuales remuneradas. La duración específica depende de varios factores establecidos en el código.",
  "processing_time_seconds": 2.145,
  "documents_retrieved": 5,
  "top_k": 5,
  "reranking_applied": true,
  "documents": [
    {
      "articulo_numero": 218,
      "articulo": "todo trabajador que cumpla un año...",
      "score": 0.912,
      "rerank_score": 0.987,
      "capitulo": "capitulo ii - de las vacaciones"
    }
  ],
  "session_id": "session_20241016_143022"
}
3. Vectorstore Management Endpoint
# src/lus_laboris_api/api/endpoints/vectorstore.py
import logging
from datetime import datetime

from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, status

from ..auth.dependencies import get_current_user
from ..models.requests import LoadToVectorstoreLocalRequest
from ..models.responses import BaseResponse

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api/vectorstore", tags=["Vectorstore"])


@router.post(
    "/load/local",
    response_model=BaseResponse,
    summary="Load data to vectorstore from local files",
    dependencies=[Depends(get_current_user)],  # Requires JWT
)
async def load_to_vectorstore_local(
    request: LoadToVectorstoreLocalRequest,
    background_tasks: BackgroundTasks,
    token_payload: dict = Depends(get_current_user),
):
    """
    Load legal documents to Qdrant from local JSON files.
    Requires authentication.

    Process:
    1. Load JSON from local filesystem
    2. Generate embeddings
    3. Create/update Qdrant collection
    4. Insert documents with metadata

    This is a long-running operation executed in background.
    """
    try:
        current_user = token_payload.get('sub', 'unknown')
        logger.info(f"Load request from user: {current_user}")

        # Generate job ID (one-second resolution)
        job_id = f"job_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        # Run in background
        background_tasks.add_task(
            _load_to_vectorstore_background,
            job_id=job_id,
            request=request,
            current_user=current_user,
            token_payload=token_payload,
        )
        return BaseResponse(
            success=True,
            message=f"Loading job started with ID: {job_id}",
        )
    except Exception as e:
        logger.error(f"Failed to start loading job: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        )


async def _load_to_vectorstore_background(
    job_id: str,
    request: LoadToVectorstoreLocalRequest,
    current_user: str,
    token_payload: dict,
):
    """Background task for loading data to vectorstore"""
    logger.info(f"[{job_id}] Starting background load task for user: {current_user}")
    try:
        # 1. Load JSON data
        # 2. Generate embeddings
        # 3. Insert to Qdrant
        # ... (implementation details)
        logger.info(f"[{job_id}] Load completed successfully")
    except Exception as e:
        logger.error(f"[{job_id}] Load failed: {e}")
⚙️ Services
RAGService: Pipeline Orchestration
# src/lus_laboris_api/api/services/rag_service.py
import logging
import time
from typing import Any

from openai import AsyncOpenAI
from google import genai

from ..config import settings
from .embedding_service import embedding_service
from .qdrant_service import qdrant_service
from .reranking_service import reranking_service
from .phoenix_service import phoenix_service

logger = logging.getLogger(__name__)


class RAGService:
    """Service for RAG-based question answering"""

    def __init__(self):
        self.llm_provider = settings.api_llm_provider.lower()
        self.llm_model = settings.api_llm_model
        self.collection_name = settings.api_qdrant_collection_name
        self.top_k = settings.api_rag_top_k
        # Initialize LLM clients
        self._initialize_llm_clients()

    def _initialize_llm_clients(self):
        """Initialize async LLM clients"""
        if self.llm_provider == "openai":
            self.openai_client = AsyncOpenAI(
                api_key=settings.openai_api_key
            )
            logger.info("OpenAI async client initialized")
        elif self.llm_provider == "gemini":
            # The google-genai SDK is used through a Client object
            self.gemini_client = genai.Client(api_key=settings.gemini_api_key)
            logger.info("Gemini client initialized")

    async def answer_question(
        self,
        query: str,
        session_id: str,
    ) -> dict[str, Any]:
        """
        Answer question using RAG pipeline:
        1. Retrieve relevant documents
        2. Build context from documents
        3. Generate answer with LLM
        4. Track everything with Phoenix
        """
        start_time = time.time()
        try:
            # 1. Retrieve documents (with optional reranking)
            documents, retrieval_metadata = self._retrieve_documents(
                query, session_id
            )
            if not documents:
                return {
                    "success": False,
                    "question": query,
                    "error": "No relevant documents found",
                    "processing_time_seconds": time.time() - start_time,
                }

            # 2. Build context from documents
            context = self._build_context(documents)

            # 3. Generate answer with LLM (async)
            answer = await self._generate_answer(query, context, session_id)

            # 4. Track complete RAG span
            phoenix_service.track_rag_complete(
                session_id=session_id,
                query=query,
                answer=answer,
                documents=documents,
                processing_time=time.time() - start_time,
            )
            return {
                "success": True,
                "question": query,
                "answer": answer,
                "documents_retrieved": len(documents),
                "top_k": self.top_k,
                "reranking_applied": settings.api_use_reranking,
                "documents": documents,
                "session_id": session_id,
                "processing_time_seconds": time.time() - start_time,
            }
        except Exception as e:
            logger.exception(f"Failed to answer question: {e}")
            return {
                "success": False,
                "question": query,
                "error": str(e),
                "processing_time_seconds": time.time() - start_time,
            }

    def _retrieve_documents(
        self, query: str, session_id: str
    ) -> tuple[list[dict], dict]:
        """Retrieve documents with embeddings and optional reranking"""
        # Generate embedding
        query_embedding = embedding_service.generate_single_embedding(
            query, model_name=settings.api_embedding_model
        )

        # Search in Qdrant (fetch twice as many candidates when reranking)
        search_limit = self.top_k * 2 if settings.api_use_reranking else self.top_k
        search_results = qdrant_service.search_documents(
            collection_name=self.collection_name,
            query_vector=query_embedding,
            limit=search_limit,
        )

        # Optional reranking
        if settings.api_use_reranking and search_results:
            reranked_docs, rerank_metadata = reranking_service.rerank_documents(
                query=query,
                documents=search_results,
                top_k=self.top_k,
            )
            return reranked_docs, rerank_metadata
        return search_results, {}

    def _build_context(self, documents: list[dict]) -> str:
        """Build context string from documents"""
        context_parts = []
        for doc in documents:
            payload = doc.get('payload', {})
            articulo = payload.get('articulo', '')
            articulo_num = payload.get('articulo_numero', '?')
            context_parts.append(
                f"Artículo {articulo_num}: {articulo}"
            )
        return "\n\n".join(context_parts)

    async def _generate_answer(
        self, query: str, context: str, session_id: str
    ) -> str:
        """Generate answer using LLM (async)"""
        system_prompt = """Eres un asistente legal experto en derecho laboral paraguayo.
Tu rol es responder preguntas sobre el Código del Trabajo de Paraguay basándote
ÚNICAMENTE en el contexto proporcionado.
Reglas:
1. Responde en español de forma clara y profesional
2. Cita los artículos relevantes
3. Si no tienes información suficiente, indícalo claramente
4. No inventes información que no esté en el contexto
"""
        user_prompt = f"""Contexto legal:
{context}
Pregunta: {query}
Respuesta:"""

        # Generate with OpenAI or Gemini
        if self.llm_provider == "openai":
            response = await self.openai_client.chat.completions.create(
                model=self.llm_model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt},
                ],
                temperature=0.3,
                max_tokens=500,
            )
            answer = response.choices[0].message.content
        elif self.llm_provider == "gemini":
            # Gemini async call
            full_prompt = f"{system_prompt}\n\n{user_prompt}"
            response = await self.gemini_client.aio.models.generate_content(
                model=self.llm_model,
                contents=full_prompt,
            )
            answer = response.text
        else:
            raise ValueError(f"Unsupported LLM provider: {self.llm_provider}")
        return answer


# Global singleton
rag_service = RAGService()
RAGService features:
- ✅ Async-first: Uses AsyncOpenAI for performance
- ✅ Multi-provider: Supports OpenAI and Gemini
- ✅ Observability: Full tracking with Phoenix
- ✅ Optional reranking: Configurable precision improvement
- ✅ Error handling: Failures are caught and returned as structured error results
- ✅ Structured prompts: Separate system and user prompts
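The requirements list mentions retry logic and timeouts, which the `RAGService` code shown here does not include. A minimal sketch of how an async call such as the LLM request could be wrapped follows. `call_with_retry` and `flaky_llm_call` are hypothetical names, and the attempt count, timeout, and backoff values are illustrative assumptions, not the project's settings.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")


async def call_with_retry(
    operation: Callable[[], Awaitable[T]],
    *,
    attempts: int = 3,
    timeout_seconds: float = 10.0,
    base_delay_seconds: float = 0.5,
) -> T:
    """Run `operation` with a per-attempt timeout and exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            # Each attempt gets its own timeout budget.
            return await asyncio.wait_for(operation(), timeout=timeout_seconds)
        except Exception:  # also covers the timeout raised by wait_for
            if attempt == attempts - 1:
                raise  # out of attempts: propagate the last error
            # Back off: base, 2x base, 4x base, ...
            await asyncio.sleep(base_delay_seconds * (2**attempt))
    raise RuntimeError("unreachable")


calls = {"count": 0}


async def flaky_llm_call() -> str:
    # Hypothetical operation that fails twice before succeeding.
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "answer"


result = asyncio.run(
    call_with_retry(flaky_llm_call, attempts=3, base_delay_seconds=0.01)
)
```

The sketch retries on any exception to stay short. In practice it is usually better to retry only errors that are likely to be transient (timeouts, connection errors, rate-limit responses) and to let validation or authentication errors fail immediately.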
🚀 Deployment and Execution
1. Local Development
# Install dependencies with UV
cd src/lus_laboris_api
uv sync
# Configure environment variables
cp .env.example .env
# Edit .env with your settings
# Start the API in development mode (with reload)
./start_api_dev.sh
# Or manually:
uv run uvicorn api.main:app --reload --host 0.0.0.0 --port 8000
2. Docker
# src/lus_laboris_api/Dockerfile
FROM python:3.13-slim

# Install UV
COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv

WORKDIR /app

# Copy dependency files
COPY pyproject.toml uv.lock ./

# Install dependencies (no dev dependencies in production)
RUN uv sync --frozen --no-dev

# Copy application code
COPY api/ ./api/

# Create non-root user
RUN useradd -m -u 1000 apiuser && chown -R apiuser:apiuser /app
USER apiuser

# Health check (the slim base image does not ship curl, so use the Python standard library)
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/api/health', timeout=5)" || exit 1

# Expose port
EXPOSE 8000

# Run API
CMD ["uv", "run", "uvicorn", "api.main:app", "--host", "0.0.0.0", "--port", "8000"]
Build and run:
# Build image
docker build -t lus-laboris-api:latest .
# Run container
docker run -p 8000:8000 \
  --env-file .env \
  lus-laboris-api:latest
3. Docker Compose (Full Stack)
# src/lus_laboris_api/docker-compose.yml
services:
  # Qdrant vector database
  qdrant:
    image: qdrant/qdrant:latest
    container_name: qdrant
    ports:
      - "6333:6333"
      - "6334:6334"
    volumes:
      - qdrant_storage:/qdrant/storage
    environment:
      - QDRANT__SERVICE__HTTP_PORT=6333
      - QDRANT__SERVICE__GRPC_PORT=6334
    networks:
      - api-network

  # Phoenix observability
  phoenix:
    image: arizephoenix/phoenix:latest
    container_name: phoenix
    ports:
      - "6006:6006"
      - "4317:4317"
    environment:
      - PHOENIX_PORT=6006
      - PHOENIX_GRPC_PORT=4317
    networks:
      - api-network

  # Lus Laboris API
  api:
    build: .
    container_name: lus-laboris-api
    ports:
      - "8000:8000"
    env_file:
      - .env
    environment:
      - API_QDRANT_URL=http://qdrant:6333
      - API_PHOENIX_ENDPOINT=http://phoenix:6006
      - API_PHOENIX_GRPC_ENDPOINT=phoenix:4317
    depends_on:
      - qdrant
      - phoenix
    networks:
      - api-network

volumes:
  qdrant_storage:

networks:
  api-network:
    driver: bridge
Start the full stack:
docker-compose up -d
# View logs
docker-compose logs -f api
# Check health
curl http://localhost:8000/api/health
4. Build and Push to Docker Hub (Bash Script)
The project includes a script that builds the image and publishes it to Docker Hub:
# src/lus_laboris_api/docker_build_push.sh
#!/bin/bash
set -e
# Load variables from .env (two levels up)
if [[ -f "../../.env" ]]; then
    set -o allexport
    source ../../.env
    set +o allexport
else
    echo "⚠️ WARNING: .env not found, falling back to system environment variables"
fi
# Validate required variables
if [[ -z "$DOCKER_HUB_USERNAME" || -z "$DOCKER_HUB_PASSWORD" || -z "$DOCKER_IMAGE_NAME_RAG_API" ]]; then
    echo "❌ ERROR: Make sure DOCKER_HUB_USERNAME, DOCKER_HUB_PASSWORD and DOCKER_IMAGE_NAME_RAG_API are defined in .env"
    exit 1
fi
# Login to Docker Hub (password is read from stdin, so it never appears in the process list)
echo "$DOCKER_HUB_PASSWORD" | docker login --username "$DOCKER_HUB_USERNAME" --password-stdin
# Define tags
DATE_TAG=$(date +%Y%m%d)
LATEST_TAG="latest"
# Build image
docker build -t "$DOCKER_HUB_USERNAME/$DOCKER_IMAGE_NAME_RAG_API:$DATE_TAG" .
docker tag "$DOCKER_HUB_USERNAME/$DOCKER_IMAGE_NAME_RAG_API:$DATE_TAG" "$DOCKER_HUB_USERNAME/$DOCKER_IMAGE_NAME_RAG_API:$LATEST_TAG"
# Push both tags
docker push "$DOCKER_HUB_USERNAME/$DOCKER_IMAGE_NAME_RAG_API:$DATE_TAG"
docker push "$DOCKER_HUB_USERNAME/$DOCKER_IMAGE_NAME_RAG_API:$LATEST_TAG"
echo "✅ Images pushed to Docker Hub:"
echo " $DOCKER_HUB_USERNAME/$DOCKER_IMAGE_NAME_RAG_API:$DATE_TAG"
echo " $DOCKER_HUB_USERNAME/$DOCKER_IMAGE_NAME_RAG_API:$LATEST_TAG"
Required variables in .env:
# Docker Hub Configuration
DOCKER_HUB_USERNAME=your_username
DOCKER_HUB_PASSWORD=your_password_or_token
DOCKER_IMAGE_NAME_RAG_API=lus-laboris-api
Run the script:
cd src/lus_laboris_api
chmod +x docker_build_push.sh
./docker_build_push.sh
Example output (abridged):
Logging in to Docker Hub...
Building image...
[+] Building 45.2s (12/12) FINISHED
Tagging images...
Pushing jesusoviedo/lus-laboris-api:20241016...
Pushing jesusoviedo/lus-laboris-api:latest...
✅ Images pushed to Docker Hub:
jesusoviedo/lus-laboris-api:20241016
jesusoviedo/lus-laboris-api:latest
Script features:
- ✅ Auto-tagging: Generates a date tag (20241016) plus latest
- ✅ Validation: Checks that the required variables are set before doing any work
- ✅ Idempotent: Can be run repeatedly; a second run on the same day rebuilds and overwrites the same two tags
- ✅ Secure: Reads credentials from .env (not hardcoded)
5. GitHub Actions: Automatic Image Build
The workflow .github/workflows/docker-api-build-publish.yml automates the build and push to Docker Hub:
name: Build & Publish Docker Image (API)
on:
  workflow_dispatch: # Manual trigger
  push:
    paths: # Automatic trigger when any of these files change
      - src/lus_laboris_api/Dockerfile
      - src/lus_laboris_api/pyproject.toml
      - src/lus_laboris_api/uv.lock
      - src/lus_laboris_api/api/**
jobs:
  build-and-push-api:
    runs-on: ubuntu-latest
    env:
      DOCKER_HUB_USERNAME: ${{ secrets.DOCKER_HUB_USERNAME }}
      DOCKER_HUB_PASSWORD: ${{ secrets.DOCKER_HUB_PASSWORD }}
      DOCKER_IMAGE_NAME_RAG_API: ${{ vars.DOCKER_IMAGE_NAME_RAG_API }}
    steps:
      - name: Checkout code
        uses: actions/checkout@v5
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3
      - name: Log in to Docker Hub
        uses: docker/login-action@v3
        with:
          username: ${{ env.DOCKER_HUB_USERNAME }}
          password: ${{ env.DOCKER_HUB_PASSWORD }}
      - name: Get date tag
        id: date_tag
        run: echo "tag=$(date +'%Y%m%d')" >> $GITHUB_OUTPUT
      - name: Build and push Docker image
        uses: docker/build-push-action@v6
        with:
          context: ./src/lus_laboris_api
          push: true
          tags: |
            ${{ env.DOCKER_HUB_USERNAME }}/${{ env.DOCKER_IMAGE_NAME_RAG_API }}:latest
            ${{ env.DOCKER_HUB_USERNAME }}/${{ env.DOCKER_IMAGE_NAME_RAG_API }}:${{ steps.date_tag.outputs.tag }}
      - name: Output image details
        run: |
          echo "✅ Docker image built and pushed successfully!"
          echo "Image: ${{ env.DOCKER_HUB_USERNAME }}/${{ env.DOCKER_IMAGE_NAME_RAG_API }}:latest"
          echo "Image: ${{ env.DOCKER_HUB_USERNAME }}/${{ env.DOCKER_IMAGE_NAME_RAG_API }}:${{ steps.date_tag.outputs.tag }}"
Required GitHub secrets:
- DOCKER_HUB_USERNAME: Your Docker Hub username
- DOCKER_HUB_PASSWORD: Docker Hub access token
Required GitHub variables:
- DOCKER_IMAGE_NAME_RAG_API: Image name (e.g. lus-laboris-api)
Triggers:
- ✅ Manual: Via the GitHub Actions UI (workflow_dispatch)
- ✅ Automatic: On a push to any branch that changes the Dockerfile, pyproject.toml, uv.lock, or code under api/
6. GitHub Actions: Deploy to Cloud Run with Secrets
The workflow .github/workflows/update-api-secrets-deploy.yml updates the secrets and deploys to Cloud Run:
name: Update API Secrets & Deploy
on:
  workflow_dispatch:
jobs:
  update-and-deploy:
    name: Update Secrets & Deploy API
    runs-on: ubuntu-latest
    env:
      # Secrets from GitHub
      GSA_KEY: ${{ secrets.GSA_KEY }}
      GCP_PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }}
      GCP_REGION: ${{ secrets.GCP_REGION }}
      API_ENV_FILE: ${{ secrets.API_ENV_FILE }}
      JWT_PUBLIC_KEY: ${{ secrets.JWT_PUBLIC_KEY }}
      DOCKER_HUB_USERNAME: ${{ secrets.DOCKER_HUB_USERNAME }}
      # Variables from GitHub
      GCP_CLOUD_RUN_API_SERVICE_NAME: ${{ vars.GCP_CLOUD_RUN_API_SERVICE_NAME }}
      DOCKER_IMAGE_NAME_RAG_API: ${{ vars.DOCKER_IMAGE_NAME_RAG_API }}
      GCP_CLOUD_RUN_API_IMAGE_TAG: ${{ vars.GCP_CLOUD_RUN_API_IMAGE_TAG }}
      GCP_CLOUD_SECRETS_UPDATE: ${{ vars.GCP_CLOUD_SECRETS_UPDATE }}
      GCP_CLOUD_RUN_API_CPU: ${{ vars.GCP_CLOUD_RUN_API_CPU }}
      GCP_CLOUD_RUN_API_MEMORY: ${{ vars.GCP_CLOUD_RUN_API_MEMORY }}
      GCP_CLOUD_RUN_API_MIN_INSTANCES: ${{ vars.GCP_CLOUD_RUN_API_MIN_INSTANCES }}
      GCP_CLOUD_RUN_API_MAX_INSTANCES: ${{ vars.GCP_CLOUD_RUN_API_MAX_INSTANCES }}
      # Secret Manager secret IDs used by the steps below.
      # Assumption: the excerpt never defines them; they are mapped here as repository variables.
      GCP_CLOUD_SECRETS_API_ENV_ID: ${{ vars.GCP_CLOUD_SECRETS_API_ENV_ID }}
      GCP_CLOUD_SECRETS_JWT_KEY_ID: ${{ vars.GCP_CLOUD_SECRETS_JWT_KEY_ID }}
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
      - name: Authenticate to GCP
        uses: google-github-actions/auth@v2
        with:
          credentials_json: ${{ env.GSA_KEY }}
      - name: Set up Cloud SDK
        uses: google-github-actions/setup-gcloud@v2
      - name: Update Secrets in Secret Manager
        if: ${{ env.GCP_CLOUD_SECRETS_UPDATE == 'true' }}
        run: |
          echo "📝 Updating secrets in Secret Manager..."
          # Update .env file secret
          echo "${API_ENV_FILE}" | \
            gcloud secrets versions add ${GCP_CLOUD_SECRETS_API_ENV_ID} \
              --data-file=- \
              --project=${GCP_PROJECT_ID}
          echo "✅ .env secret updated"
          # Update JWT public key secret
          echo "${JWT_PUBLIC_KEY}" | \
            gcloud secrets versions add ${GCP_CLOUD_SECRETS_JWT_KEY_ID} \
              --data-file=- \
              --project=${GCP_PROJECT_ID}
          echo "✅ JWT public key secret updated"
      - name: Deploy API to Cloud Run
        run: |
          IMAGE="${DOCKER_HUB_USERNAME}/${DOCKER_IMAGE_NAME_RAG_API}:${GCP_CLOUD_RUN_API_IMAGE_TAG}"
          echo "🚀 Deploying ${IMAGE} to ${GCP_CLOUD_RUN_API_SERVICE_NAME}..."
          gcloud run deploy ${GCP_CLOUD_RUN_API_SERVICE_NAME} \
            --image=${IMAGE} \
            --region=${GCP_REGION} \
            --project=${GCP_PROJECT_ID} \
            --port=8000 \
            --set-env-vars="API_HOST=0.0.0.0,API_PORT=8000,API_RELOAD=false,API_JWT_PUBLIC_KEY_PATH=/app/secrets/jwt/public_key.pem,API_ENV_FILE_PATH=/app/secrets/env/.env" \
            --update-secrets="/app/secrets/env/.env=${GCP_CLOUD_SECRETS_API_ENV_ID}:latest,/app/secrets/jwt/public_key.pem=${GCP_CLOUD_SECRETS_JWT_KEY_ID}:latest" \
            --cpu=${GCP_CLOUD_RUN_API_CPU} \
            --memory=${GCP_CLOUD_RUN_API_MEMORY} \
            --min-instances=${GCP_CLOUD_RUN_API_MIN_INSTANCES} \
            --max-instances=${GCP_CLOUD_RUN_API_MAX_INSTANCES} \
            --timeout=300 \
            --no-cpu-throttling \
            --allow-unauthenticated
          echo "✅ API deployed successfully!"
      - name: Get Service URL
        run: |
          URL=$(gcloud run services describe ${GCP_CLOUD_RUN_API_SERVICE_NAME} \
            --region=${GCP_REGION} \
            --format='value(status.url)' \
            --project=${GCP_PROJECT_ID})
          echo "🌐 Service URL: ${URL}"
          echo "📋 Health Check: ${URL}/api/health"
          echo "📊 Swagger UI: ${URL}/docs"
Required GitHub secrets:
Secret | Description |
---|---|
GSA_KEY | Service Account JSON key for GCP |
GCP_PROJECT_ID | GCP project ID |
GCP_REGION | Cloud Run region (e.g. us-central1) |
API_ENV_FILE | Full contents of the .env file |
JWT_PUBLIC_KEY | RSA public key for JWT |
DOCKER_HUB_USERNAME | Docker Hub username |
Required GitHub variables:
Variable | Description | Example |
---|---|---|
GCP_CLOUD_RUN_API_SERVICE_NAME | Cloud Run service name | lus-laboris-api |
DOCKER_IMAGE_NAME_RAG_API | Docker image name | lus-laboris-api |
GCP_CLOUD_RUN_API_IMAGE_TAG | Image tag to deploy | latest or 20241016 |
GCP_CLOUD_SECRETS_UPDATE | Update secrets before deploying | true or false |
GCP_CLOUD_RUN_API_CPU | Allocated CPUs | 2 |
GCP_CLOUD_RUN_API_MEMORY | Allocated memory | 2Gi |
GCP_CLOUD_RUN_API_MIN_INSTANCES | Minimum instances (kept warm) | 1 |
GCP_CLOUD_RUN_API_MAX_INSTANCES | Maximum instances (auto-scaling cap) | 10 |
Full deployment flow:
- Update code → Push to GitHub
- Automatic build → The docker-api-build-publish.yml workflow runs
- New image → Pushed to Docker Hub with the tags latest and 20241016
- Update variables → Set GCP_CLOUD_RUN_API_IMAGE_TAG to 20241016 in GitHub
- Manual deploy → Run the update-api-secrets-deploy.yml workflow from the GitHub Actions UI
- Secrets updated → .env and the JWT public key are updated in Secret Manager
- Deploy to Cloud Run → New version deployed
- Verification → Check the health endpoint at the service URL
Example run:
🚀 Running workflow: Update API Secrets & Deploy
📝 Updating secrets in Secret Manager...
✅ .env secret updated
✅ JWT public key secret updated
🚀 Deploying jesusoviedo/lus-laboris-api:20241016 to lus-laboris-api...
Deploying container to Cloud Run service [lus-laboris-api]...
✓ Deploying new service... Done.
✓ Creating Revision...
✓ Routing traffic...
✓ Setting IAM Policy...
✅ API deployed successfully!
🌐 Service URL: https://lus-laboris-api-abc123-uc.a.run.app
📋 Health Check: https://lus-laboris-api-abc123-uc.a.run.app/api/health
📊 Swagger UI: https://lus-laboris-api-abc123-uc.a.run.app/docs
Post-deployment verification:
# Health check
curl https://lus-laboris-api-abc123-uc.a.run.app/api/health
# Response:
# {
# "success": true,
# "service": "lus-laboris-api",
# "status": "healthy",
# "dependencies": {
# "qdrant": "connected",
# "embedding_service": "healthy",
# "rag_service": "healthy"
# },
# "uptime_seconds": 45.2
# }
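The same verification can be scripted so that a deploy job fails when a dependency is down. This is a minimal sketch that assumes the response shape shown above (`status` plus a `dependencies` map). The helper names and the set of acceptable dependency states are illustrative assumptions, not part of the project.

```python
import json


def unhealthy_dependencies(payload: dict) -> list[str]:
    """Return the names of dependencies that are neither 'connected' nor 'healthy'."""
    ok_states = {"connected", "healthy"}
    dependencies = payload.get("dependencies", {})
    return sorted(name for name, state in dependencies.items() if state not in ok_states)


def is_deployment_healthy(raw_body: str) -> bool:
    """True when the overall status is healthy and every dependency is fine."""
    payload = json.loads(raw_body)
    return payload.get("status") == "healthy" and not unhealthy_dependencies(payload)


# Sample body copied from the health check response above.
sample_body = """
{
  "success": true,
  "service": "lus-laboris-api",
  "status": "healthy",
  "dependencies": {
    "qdrant": "connected",
    "embedding_service": "healthy",
    "rag_service": "healthy"
  },
  "uptime_seconds": 45.2
}
"""
print(is_deployment_healthy(sample_body))  # True
```

A CI step could pipe the curl output into a script like this and exit non-zero when it returns `False`.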
Advantages of the workflow:
- ✅ Automated secrets management: Secrets live in GitHub Secrets and Secret Manager, not in code
- ✅ Zero-downtime deployment: Cloud Run routes traffic to the new revision once it is ready
- ✅ Easy rollback: Set GCP_CLOUD_RUN_API_IMAGE_TAG back to a previous tag and re-run the workflow
- ✅ Configurable scaling: Min/max instances sized to the expected load
- ✅ Cost optimization: min-instances=1 avoids cold starts, max-instances=10 caps cost
- ✅ No CPU throttling: --no-cpu-throttling keeps CPU allocated between requests for consistent performance
📊 Automatic Documentation
FastAPI generates interactive documentation automatically:
Swagger UI
Available at: http://localhost:8000/docs
Features:
- ✅ Try it out: Run requests directly from the browser
- ✅ Schema visualization: Inspect request/response models
- ✅ Authentication: Enter a JWT token for protected endpoints
- ✅ Examples: See sample payloads
- ✅ Response codes: Documentation for every HTTP status code
ReDoc
Available at: http://localhost:8000/redoc
Features:
- ✅ Clean UI: A cleaner interface for reading
- ✅ Full schema: All the documentation on a single page
- ✅ Search: Search endpoints and schemas
- ✅ Export: Download the OpenAPI spec as JSON
OpenAPI JSON
Available at: http://localhost:8000/openapi.json
The full spec in JSON format, useful for:
- Client generation (Python, TypeScript, etc.)
- Integration with testing tools
- Importing into Postman/Insomnia
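Because the spec is plain JSON, tooling can consume it with a few lines of code. The sketch below lists every operation in a spec, which is the kind of input a client generator or a smoke-test script starts from. The function name `list_operations` and the embedded sample spec (including its summaries) are illustrative assumptions; only the two paths come from this post.

```python
HTTP_METHODS = {"get", "post", "put", "patch", "delete", "head", "options", "trace"}


def list_operations(spec: dict) -> list[tuple[str, str, str]]:
    """Return (METHOD, path, summary) for every operation in an OpenAPI spec."""
    operations = []
    for path, item in spec.get("paths", {}).items():
        for method, operation in item.items():
            if method in HTTP_METHODS:  # skip non-operation keys such as "parameters"
                operations.append((method.upper(), path, operation.get("summary", "")))
    return sorted(operations, key=lambda op: (op[1], op[0]))


# Hand-written sample standing in for the document served at /openapi.json.
sample_spec = {
    "openapi": "3.1.0",
    "paths": {
        "/api/rag/ask": {"post": {"summary": "Ask a question"}},
        "/api/health": {"get": {"summary": "Health check"}},
    },
}

for method, path, summary in list_operations(sample_spec):
    print(f"{method:6} {path}  {summary}")
```

Against a running instance, the dictionary could be loaded with `json.load(urllib.request.urlopen("http://localhost:8000/openapi.json"))` instead of the hand-written sample.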
🎯 Real-World Use Cases
For Frontend Developers:
"I need a clear, well-documented API for my web application"
Solution: Swagger UI + Pydantic models
// Plain fetch call; a typed client can also be generated from the OpenAPI spec
const response = await fetch('http://api.luslaboris.com/api/rag/ask', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    question: '¿Cuántos días de vacaciones?'
  })
});
const data = await response.json();
console.log(data.answer);
For Mobile Developers:
"I need rate limiting to prevent abuse"
Solution: SlowAPI with per-IP limits
# 10 requests per minute per IP
@limiter.limit("10/minute")
async def ask_question(request: Request, ...):
    ...
# Response when the limit is exceeded:
# HTTP 429 Too Many Requests
# Retry-After: 42
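To make the "10/minute" rule and the `Retry-After` value concrete, here is a minimal sliding-window limiter written with the standard library only. It is a sketch of the general technique, not SlowAPI's implementation (SlowAPI delegates to its own storage and strategies). The class name and the injectable `clock` parameter are assumptions added for illustration and testing.

```python
import math
import time
from collections import defaultdict, deque


class SlidingWindowLimiter:
    """Allow at most `limit` requests per client within any `window_seconds` span."""

    def __init__(self, limit: int, window_seconds: float, clock=time.monotonic):
        self.limit = limit
        self.window = window_seconds
        self.clock = clock
        self.hits = defaultdict(deque)  # client IP -> timestamps of accepted requests

    def check(self, client_ip: str) -> tuple[bool, int]:
        """Return (allowed, retry_after_seconds) for one incoming request."""
        now = self.clock()
        hits = self.hits[client_ip]
        # Forget requests that have fallen out of the window.
        while hits and now - hits[0] >= self.window:
            hits.popleft()
        if len(hits) < self.limit:
            hits.append(now)
            return True, 0
        # The oldest accepted request leaves the window at hits[0] + window.
        return False, math.ceil(hits[0] + self.window - now)
```

With `SlidingWindowLimiter(10, 60)`, a client that sends ten requests at once and an eleventh 18 seconds later is rejected with a retry hint of 42 seconds, which is the value a server would place in the `Retry-After` header of the 429 response. State here is per process, so several API instances would each keep their own counters unless a shared store is used.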
For Data Scientists:
"I need to experiment with different LLM models without changing code"
Solution: Configuration through environment variables
# Try OpenAI GPT-4
export API_LLM_PROVIDER=openai
export API_LLM_MODEL=gpt-4
# Try Gemini
export API_LLM_PROVIDER=gemini
export API_LLM_MODEL=gemini-1.5-flash
# Enable reranking
export API_USE_RERANKING=true
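On the application side, these variables have to be read and validated once at startup. The sketch below shows one way to do that with the standard library. The three variable names come from the exports above, while the `LLMSettings` class, the defaults, and the accepted truthy strings are assumptions for illustration (the project's own config.py may well use Pydantic settings instead).

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class LLMSettings:
    provider: str
    model: str
    use_reranking: bool


def load_llm_settings(env: Mapping[str, str] = os.environ) -> LLMSettings:
    """Build the LLM settings from environment variables, failing fast on bad values."""
    provider = env.get("API_LLM_PROVIDER", "openai").strip().lower()  # default is an assumption
    if provider not in {"openai", "gemini"}:
        raise ValueError(f"Unsupported API_LLM_PROVIDER: {provider!r}")
    model = env.get("API_LLM_MODEL", "").strip()
    if not model:
        raise ValueError("API_LLM_MODEL must be set")
    # Treat a few common spellings as "on"; anything else means "off".
    use_reranking = env.get("API_USE_RERANKING", "false").strip().lower() in {"1", "true", "yes"}
    return LLMSettings(provider, model, use_reranking)


settings = load_llm_settings(
    {"API_LLM_PROVIDER": "gemini", "API_LLM_MODEL": "gemini-1.5-flash", "API_USE_RERANKING": "true"}
)
print(settings)
```

Passing the mapping explicitly keeps the function easy to test; in the running service it would be called with no arguments so that it reads `os.environ`.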
For DevOps:
"I need health checks for monitoring and auto-scaling"
Solution: Multiple health endpoints
# Basic health check (for the load balancer)
curl http://api/health
# {"status": "healthy"}
# Detailed health (for alerting)
curl -H "Authorization: Bearer $JWT" http://api/status
# {
#   "status": "healthy",
#   "services": {
#     "qdrant": {"status": "connected", "collections": 3},
#     "embeddings": {"status": "healthy", "models_loaded": ["e5-small"]},
#     ...
#   }
# }
🚀 The Transformative Impact
Before FastAPI:
- ⏱️ Slow development: Routes, validation, and docs written by hand
- 🐛 Frequent errors: No automatic type validation
- 📝 Outdated docs: Manual documentation that drifts from the code
- 🔒 Weak security: Auth implemented ad hoc
- 📊 No observability: Debugging blind
After FastAPI:
- ⚡ Fast development: Type hints = validation + automatic docs
- ✅ Fewer bugs: Runtime validation + IDE autocomplete
- 📝 Docs always up to date: OpenAPI generated from the code
- 🔒 Robust security: JWT + dependencies + middleware
- 📊 Full observability: Phoenix + OpenTelemetry integrated
🔧 Notable Technical Features
1. Async/Await Performance
# ❌ Synchronous - blocks the thread
def answer_question(query: str):
    embedding = generate_embedding(query)  # Blocks
    documents = search_qdrant(embedding)  # Blocks
    answer = call_openai(documents)  # Blocks
    return answer

# ✅ Asynchronous - non-blocking (each helper must itself be a coroutine)
async def answer_question(query: str):
    embedding = await generate_embedding(query)  # Does not block
    documents = await search_qdrant(embedding)  # Does not block
    answer = await call_openai(documents)  # Does not block
    return answer
# Gain: roughly 3-10x more throughput under I/O-bound load
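The throughput gain comes from overlapping the waiting time of many requests on one thread. The following self-contained sketch simulates five slow LLM calls with `asyncio.sleep` and runs them concurrently. The function names and the 0.2 s latency are made up for the demonstration; no real API is called.

```python
import asyncio
import time


async def fake_llm_call(question: str, latency: float = 0.2) -> str:
    """Stand-in for a network round trip to an LLM provider."""
    await asyncio.sleep(latency)  # yields control to the event loop while "waiting"
    return f"answer to {question!r}"


async def handle_concurrently(questions: list[str]) -> tuple[list[str], float]:
    """Answer all questions at once and report the wall-clock time taken."""
    start = time.perf_counter()
    answers = await asyncio.gather(*(fake_llm_call(q) for q in questions))
    return list(answers), time.perf_counter() - start


answers, elapsed = asyncio.run(handle_concurrently(["q1", "q2", "q3", "q4", "q5"]))
print(f"{len(answers)} answers in {elapsed:.2f}s")
```

Run sequentially, five 0.2 s calls would take about one second; run concurrently they finish in roughly the time of a single call, because the event loop serves the other requests while each one is waiting.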
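2. Dependency Injection
from functools import lru_cache
from fastapi import APIRouter, Depends

router = APIRouter()

# Share one service instance across requests
# (QdrantService is the project's own service class)
@lru_cache()
def get_qdrant_service():
    return QdrantService()  # Created on first call, then cached (singleton)

# Use it in an endpoint
@router.get("/search")
async def search(
    query: str,
    qdrant: QdrantService = Depends(get_qdrant_service),
):
    return qdrant.search(query)
# Benefit: a single Qdrant connection shared by the whole app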
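The singleton behaviour comes entirely from `functools.lru_cache`: a zero-argument function is executed once and its return value is reused afterwards. The short sketch below demonstrates this without FastAPI or Qdrant; `FakeQdrantService` is a hypothetical stand-in for the real service class.

```python
from functools import lru_cache


class FakeQdrantService:
    """Hypothetical stand-in that counts how many times it is constructed."""

    instances_created = 0

    def __init__(self) -> None:
        FakeQdrantService.instances_created += 1


@lru_cache()
def get_qdrant_service() -> FakeQdrantService:
    return FakeQdrantService()  # runs only on the first call


first = get_qdrant_service()
second = get_qdrant_service()
print(first is second, FakeQdrantService.instances_created)  # True 1
```

The cache is per process, so each worker or Cloud Run instance ends up with its own instance. In tests, `get_qdrant_service.cache_clear()` forces a fresh one.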
3. Middleware Stack
# Execution order (top-down on the request, bottom-up on the response):
# 1. TrustedHostMiddleware - validates the Host header
# 2. CORSMiddleware - handles CORS
# 3. Custom exception handlers
# 4. Router logic
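The "top-down on the request, bottom-up on the response" behaviour is easier to see with a toy model. The sketch below wraps a route in two logging layers named after the middlewares above. It is plain Python rather than Starlette code, and `make_middleware` and `build_stack` are hypothetical helpers for illustration.

```python
from typing import Callable

Handler = Callable[[str], str]


def make_middleware(name: str, log: list[str]) -> Callable[[Handler], Handler]:
    """Create a layer that records when the request and the response pass through it."""

    def wrap(next_handler: Handler) -> Handler:
        def handler(request: str) -> str:
            log.append(f"{name}: request")
            response = next_handler(request)
            log.append(f"{name}: response")
            return response

        return handler

    return wrap


def build_stack(route: Handler, middlewares: list[Callable[[Handler], Handler]]) -> Handler:
    """Wrap `route` so that middlewares[0] ends up outermost."""
    app = route
    for middleware in reversed(middlewares):
        app = middleware(app)
    return app


log: list[str] = []


def route(request: str) -> str:
    log.append("route")
    return "200 OK"


app = build_stack(route, [make_middleware("TrustedHost", log), make_middleware("CORS", log)])
app("GET /api/health")
print(log)
# ['TrustedHost: request', 'CORS: request', 'route', 'CORS: response', 'TrustedHost: response']
```

One detail worth checking in a real application: Starlette's `add_middleware` makes the most recently added middleware the outermost layer, so the registration order in the code is the reverse of the execution order on the request.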
4. Background Tasks
from fastapi import APIRouter, BackgroundTasks

router = APIRouter()

@router.post("/process")
async def process_data(background_tasks: BackgroundTasks):
    # Schedule the work and respond immediately
    background_tasks.add_task(long_running_task)
    return {"status": "processing"}
# The client gets a fast response
# The task runs after the response has been sent
📊 Performance Metrics
Latency:
- Health check: 5-10ms
- RAG query (without LLM): 80-150ms
  - Embedding: 30ms
  - Qdrant search (gRPC): 30ms
  - Reranking: 20ms
- RAG query (with LLM): 1-3 seconds
  - Retrieval: 150ms
  - LLM generation: 800-2500ms
Throughput:
- Health endpoints: >1000 req/s
- RAG endpoints: 50-100 req/s (limited by the LLM API)
- Vectorstore load: 100 docs/second
Scalability:
- Horizontal scaling: Stateless, so throughput grows roughly linearly with instances until the LLM API or Qdrant becomes the bottleneck
- Vertical scaling: CPU-bound on embeddings, I/O-bound on LLM calls
- Cloud Run: Auto-scaling from 0 to N instances
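These numbers can be turned into a rough sizing estimate with Little's law: requests in flight = throughput × latency. The sketch below applies it to the RAG figures above. The per-instance concurrency of 80 is an assumption for illustration (it should be replaced with the value configured on the service), and the helper names are hypothetical.

```python
import math


def in_flight_requests(throughput_rps: float, latency_seconds: float) -> float:
    """Little's law: average number of requests being processed at any moment."""
    return throughput_rps * latency_seconds


def instances_needed(in_flight: float, concurrency_per_instance: int) -> int:
    """Instances required if each one handles `concurrency_per_instance` requests at once."""
    return max(1, math.ceil(in_flight / concurrency_per_instance))


low = in_flight_requests(50, 1.0)    # 50 req/s at 1 s per request
high = in_flight_requests(100, 3.0)  # 100 req/s at 3 s per request
print(low, high)  # 50.0 300.0

# Assumed concurrency of 80 requests per instance.
print(instances_needed(low, 80), instances_needed(high, 80))  # 1 4
```

Under these assumptions the stated RAG throughput needs between one and four instances, comfortably inside a max-instances cap of 10. Because most of that time is spent waiting on the LLM API, the per-instance concurrency is bounded by memory and connection limits rather than CPU.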
💡 Lessons Learned
1. Async Is Critical for I/O-Bound Work
Calls to LLM APIs can take 1-3 seconds. With async, the event loop can serve other requests on the same thread while it waits.
2. Pydantic v2 Is Much Faster
According to the Pydantic team, v2 (with its core written in Rust) is 5-50x faster than v1. That matters under high load.
3. Rate Limiting Is Non-Negotiable
Without rate limiting, an attacker can burn through your OpenAI credits in minutes.
4. Health Checks Must Be Granular
A single /health endpoint is not enough. You need per-service health checks for effective debugging.
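One way to get that granularity is to run a small check per dependency, in parallel and with a timeout, and report each result separately. The sketch below uses only asyncio. The function names, the `degraded` label, and the three demo checks are illustrative assumptions; real checks would ping Qdrant, the embedding service, and so on.

```python
import asyncio
from typing import Awaitable, Callable

Check = Callable[[], Awaitable[bool]]


async def run_check(name: str, check: Check, timeout: float) -> tuple[str, str]:
    """Run one dependency check and translate the outcome into a status string."""
    try:
        healthy = await asyncio.wait_for(check(), timeout=timeout)
        return name, "healthy" if healthy else "unhealthy"
    except asyncio.TimeoutError:
        return name, "timeout"
    except Exception as exc:  # a failing check must not break the health endpoint
        return name, f"error: {exc}"


async def aggregate_health(checks: dict[str, Check], timeout: float = 1.0) -> dict:
    """Run all checks concurrently and combine them into one report."""
    pairs = await asyncio.gather(*(run_check(n, c, timeout) for n, c in checks.items()))
    results = dict(pairs)
    overall = "healthy" if all(s == "healthy" for s in results.values()) else "degraded"
    return {"status": overall, "dependencies": results}


async def ok() -> bool:
    return True


async def slow() -> bool:
    await asyncio.sleep(1)  # simulates a dependency that hangs
    return True


async def failing() -> bool:
    raise RuntimeError("boom")


report = asyncio.run(
    aggregate_health({"qdrant": ok, "embedding_service": slow, "rag_service": failing}, timeout=0.05)
)
print(report)
```

Because the checks run concurrently, the endpoint's latency is bounded by the timeout rather than by the sum of all checks, and the report says exactly which dependency is misbehaving.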
5. Dependency Injection Simplifies Testing
# Swap a real service for a mock in tests
def get_mock_qdrant():
    return MockQdrantService()

app.dependency_overrides[get_qdrant_service] = get_mock_qdrant
6. Background Tasks for Long-Running Operations
Never block an HTTP request for minutes. Use background tasks, and expose status through polling or webhooks.
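The polling half of that pattern needs a place to record job status. The sketch below keeps it in an in-memory dictionary and uses `asyncio.create_task` where a FastAPI endpoint would call `background_tasks.add_task`. All names here (`jobs`, `submit_job`, `get_status`) are hypothetical, and the in-memory store only works within a single process; several instances would need a shared store.

```python
import asyncio
import uuid

jobs: dict[str, dict] = {}            # job_id -> status record
background: set[asyncio.Task] = set()  # keeps references so tasks are not garbage collected


async def long_running_task(job_id: str, steps: int = 3) -> None:
    """Do the slow work and record the outcome under the job id."""
    try:
        for _ in range(steps):
            await asyncio.sleep(0.01)  # stands in for real work
        jobs[job_id].update(status="done", result=f"processed {steps} steps")
    except Exception as exc:
        jobs[job_id].update(status="failed", error=str(exc))


def submit_job() -> str:
    """Register a job and return its id immediately (needs a running event loop)."""
    job_id = uuid.uuid4().hex
    jobs[job_id] = {"status": "processing"}
    task = asyncio.create_task(long_running_task(job_id))
    background.add(task)
    task.add_done_callback(background.discard)
    return job_id


def get_status(job_id: str) -> dict:
    """What a GET status endpoint would return for this job."""
    return jobs.get(job_id, {"status": "unknown"})


async def demo() -> tuple[dict, dict]:
    job_id = submit_job()
    first = dict(get_status(job_id))  # snapshot taken right after submitting
    while get_status(job_id)["status"] == "processing":
        await asyncio.sleep(0.01)     # the client would poll the status endpoint
    return first, get_status(job_id)


first, final = asyncio.run(demo())
print(first["status"], "->", final["status"])  # processing -> done
```

The submit call returns before any work has happened, which is exactly what lets the HTTP response go out quickly; the client then polls with the job id until the status changes.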
🎯 The Bigger Purpose
FastAPI is not just a web framework: it is the gateway that makes our RAG system accessible to the world. It provides:
- 🌐 Accessibility: Any HTTP client can consume the API
- 🔒 Security: JWT + CORS + rate limiting = production-ready
- 📊 Observability: Every request traced with Phoenix
- 📝 Documentation: Swagger UI = frictionless onboarding
- ⚡ Performance: Async + gRPC = retrieval in about 150ms and full answers in 1-3 seconds
- 🔄 Scalability: Stateless, so it scales horizontally until the LLM API becomes the limit
We are democratizing access to legal knowledge with a production-grade REST API built on the same practices used by leading technology companies.
🔗 Resources and Links
Project Repository
- GitHub: lus-laboris-py
Technical Documentation
- Main App: src/lus_laboris_api/api/main.py
- Config: src/lus_laboris_api/api/config.py
- RAG Endpoint: src/lus_laboris_api/api/endpoints/rag.py
- RAG Service: src/lus_laboris_api/api/services/rag_service.py
- JWT Handler: src/lus_laboris_api/api/auth/jwt_handler.py
- README API: src/lus_laboris_api/README.md
External Resources
- FastAPI Docs: fastapi.tiangolo.com
- Pydantic Docs: docs.pydantic.dev
- Uvicorn Docs: uvicorn.org
- Starlette Docs: starlette.io
Next Post: LLPY-07 - Integrating LLMs: OpenAI and Google Gemini
In the next post we will dig into the integration with multiple LLM providers, the automatic fallback system, prompt engineering for legal queries, and a comparison of models.