Jesus Oviedo Riquelme

LLPY-06: FastAPI - Building a Robust REST API for RAG

🎯 The Challenge of Building a Production API

Imagine you have a complete RAG system up and running:

  • 413 legal articles vectorized in Qdrant
  • Embeddings from state-of-the-art models
  • Configured LLMs (OpenAI and Gemini)
  • Reranking to improve precision

Now comes the real challenge: how do you expose this functionality to the world in a way that is secure, scalable, and maintainable?

You need an API that:

  • 🔒 Protects endpoints with JWT authentication
  • Responds in milliseconds under high load
  • 📊 Documents its endpoints automatically
  • 🔍 Monitors every request with full observability
  • 🛡️ Validates inputs and outputs with type safety
  • 🔄 Handles errors gracefully with retry logic
  • 📈 Scales horizontally without friction

📊 The Scale of the Problem

Requirements for a Production RAG API

  1. 🏗️ Modular architecture: clear separation of responsibilities
  2. 🔐 Security: JWT with RSA, CORS, rate limiting
  3. 📝 Automatic documentation: OpenAPI/Swagger with no manual effort
  4. ⚙️ Flexible configuration: environment variables for every service
  5. 🎯 Strict validation: Pydantic models for requests and responses
  6. 🔄 Complex orchestration: coordinating 6+ services (Qdrant, embeddings, LLM, reranking, Phoenix, GCP)
  7. 📊 Observability: full tracing with OpenTelemetry
  8. 🚀 Performance: async/await for I/O-bound operations
  9. 🛡️ Resilience: retry logic, timeouts, circuit breakers
  10. 📦 Deployment-ready: Docker, health checks, structured logs
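
To make the resilience requirement more concrete, here is a minimal sketch of retry logic with a per-attempt timeout and exponential backoff. It is not taken from the project: `call_with_retry` and `flaky_search` are hypothetical names, and the set of retried exceptions is an assumption you would adapt to your own clients.

```python
import asyncio
import random


async def call_with_retry(func, *, attempts=3, timeout=2.0, base_delay=0.1):
    """Await func() with a per-attempt timeout, retrying transient failures."""
    last_error = None
    for attempt in range(attempts):
        try:
            return await asyncio.wait_for(func(), timeout=timeout)
        except (asyncio.TimeoutError, ConnectionError) as exc:
            last_error = exc
            if attempt < attempts - 1:
                # Exponential backoff plus a little jitter to avoid retry storms
                delay = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
                await asyncio.sleep(delay)
    raise last_error


# Hypothetical dependency that fails twice before succeeding
calls = {"count": 0}


async def flaky_search():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return ["doc-1", "doc-2"]


result = asyncio.run(call_with_retry(flaky_search, base_delay=0.01))
```

Only errors that are plausibly transient are retried; anything else propagates immediately so that real bugs are not hidden behind retries.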

Specific Technical Challenges

  1. 🔍 Dependency injection: managing shared singleton services
  2. ⏱️ Rate limiting: preventing abuse without affecting legitimate users
  3. 🔒 Flexible authentication: public and private endpoints side by side
  4. 📊 Middleware stack: CORS, TrustedHost, exception handlers
  5. 🎯 Multi-level validation: request body, query params, headers
  6. 🔄 Lifecycle management: graceful startup and shutdown of services

💡 The Solution: FastAPI

FastAPI is a modern Python framework that offers:

  • High performance: built on Starlette (async) and Pydantic (validation)
  • 📝 Automatic documentation: OpenAPI + Swagger UI out of the box
  • 🎯 Native type hints: validation and autocompletion with Python 3.13+
  • 🔄 Async-first: native support for async/await
  • 🛡️ Dependency injection: a robust, flexible system
  • 📊 Standards-based: OpenAPI, JSON Schema, OAuth2
Why FastAPI over the Alternatives?

Feature FastAPI Flask Django REST Express.js
Performance ⚡⚡⚡ ⚡⚡ ⚡⚡⚡
Type Safety ⚠️
Async Native ⚠️ ⚠️
Auto Docs ⚠️
Data Validation ✅ Pydantic ✅ Serializers
Learning Curve Low Very Low High Low
Ecosystem Growing Mature Very Mature Very Mature
Ideal for Modern APIs Prototypes Full-stack apps Node.js devs

Our choice: FastAPI, for its balance of performance, developer experience, and production features.

🏗️ API Architecture

Folder Structure

src/lus_laboris_api/
├── api/
│   ├── main.py                 # Application factory, middleware
│   ├── config.py               # Settings with Pydantic
│   │
│   ├── auth/                   # Authentication
│   │   ├── jwt_handler.py      # JWT validation
│   │   └── dependencies.py     # Auth dependencies
│   │
│   ├── endpoints/              # Routers by domain
│   │   ├── health.py           # Health checks
│   │   ├── status.py           # Status and root
│   │   ├── rag.py              # RAG queries
│   │   └── vectorstore.py      # Collection management
│   │
│   ├── models/                 # Pydantic models
│   │   ├── requests.py         # Request schemas
│   │   └── responses.py        # Response schemas
│   │
│   └── services/               # Business logic
│       ├── rag_service.py      # RAG orchestration
│       ├── qdrant_service.py   # Qdrant client
│       ├── embedding_service.py # Embeddings
│       ├── reranking_service.py # Reranking
│       ├── evaluation_service.py # LLM evaluations
│       ├── phoenix_service.py  # Observability
│       └── gcp_service.py      # GCP integration
│
├── Dockerfile                  # Container for the API
├── docker-compose.yml          # Full stack (API + Qdrant + Phoenix)
├── start_api_dev.sh            # Development script
└── pyproject.toml              # Dependencies managed with UV

Design principles:

  • Separation of concerns: Routers → Services → External APIs
  • Single responsibility: each service has one clear responsibility
  • Dependency injection: shared singleton services
  • Configuration as code: everything is configurable via environment variables

🚀 Configuration and Setup

1. Settings with Pydantic

The heart of the configuration is config.py, built on pydantic-settings:

# src/lus_laboris_api/api/config.py

from pathlib import Path
from pydantic import Field
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    """Application settings with environment variable support"""

    # API Configuration
    api_host: str = "0.0.0.0"
    api_port: int = 8000
    api_reload: bool = False
    api_log_level: str = "info"

    # Security
    api_allowed_origins: list[str] = Field(default=["*"])
    api_allowed_hosts: list[str] = Field(default=["*"])
    api_jwt_public_key_path: str | None = None
    api_jwt_aud: str = "lus-laboris-client"
    api_jwt_iss: str = "lus-laboris-api"

    # Qdrant Configuration
    api_qdrant_url: str | None = None
    api_qdrant_api_key: str | None = None
    api_qdrant_collection_name: str | None = None
    api_qdrant_grpc_port: int = 6334
    api_qdrant_prefer_grpc: bool = True  # 2-3x faster

    # Embedding Configuration
    api_embedding_model: str | None = None
    api_embedding_batch_size: int = 100

    # Reranking Configuration
    api_reranking_model: str | None = None
    api_use_reranking: bool = False

    # RAG Configuration
    api_rag_top_k: int | None = None
    api_llm_provider: str | None = None  # 'openai' or 'gemini'
    api_llm_model: str | None = None

    # LLM API Keys
    openai_api_key: str | None = None
    gemini_api_key: str | None = None

    # Rate Limiting
    api_rate_limit_requests: int = 10
    api_rate_limit_window: str = "1 minute"

    # Phoenix Monitoring
    api_phoenix_enabled: bool = True
    api_phoenix_endpoint: str | None = None
    api_phoenix_api_key: str | None = None
    api_phoenix_project_name: str = "lus-laboris-api"

    # Environment
    api_environment: str = "development"

    class Config:
        # Resolve the .env file relative to the project root
        project_root = Path(__file__).parent.parent.parent.parent
        env_file = project_root / ".env"
        env_file_encoding = "utf-8"
        case_sensitive = False
        extra = "ignore"

# Global singleton
settings = Settings()

Key features:

  • Type safety: automatic type validation
  • Sensible defaults: default values suited to development
  • Path resolution: paths are resolved relative to the project root
  • Flexible .env: supports multiple .env files
  • Secrets management: API keys are read from the environment instead of being hard-coded

2. Application Factory with Lifespan

main.py defines the application's full lifecycle:

# src/lus_laboris_api/api/main.py

import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware

from .config import settings
from .endpoints import health, rag, status, vectorstore
from .services.embedding_service import embedding_service
from .services.qdrant_service import qdrant_service
from .services.rag_service import rag_service
from .services.evaluation_service import evaluation_service

logger = logging.getLogger(__name__)

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager - startup and shutdown logic"""

    # === STARTUP ===
    logger.info("Starting Lus Laboris API...")

    try:
        # 1. Initialize Qdrant connection
        qdrant_status = qdrant_service.health_check()
        logger.info(f"Qdrant status: {qdrant_status.get('status')}")

        if qdrant_status.get('status') != 'healthy':
            logger.warning("Qdrant is not healthy, but API will continue")

        # 2. Initialize embedding service (load models)
        embedding_status = embedding_service.health_check()
        logger.info(f"Embedding service status: {embedding_status.get('status')}")

        # 3. Initialize RAG service
        rag_status = rag_service.health_check()
        logger.info(f"RAG service status: {rag_status.get('status')}")

        # 4. Initialize evaluation service (async)
        eval_status = evaluation_service.health_check()
        logger.info(f"Evaluation service status: {eval_status.get('status')}")

        logger.info("All services initialized successfully")

    except Exception as e:
        logger.exception("Failed to initialize services")
        # Continue anyway - health endpoints will report failures

    yield  # API runs here

    # === SHUTDOWN ===
    logger.info("Shutting down Lus Laboris API...")

    try:
        # Graceful shutdown of evaluation service
        evaluation_service.shutdown()
        logger.info("Evaluation service shut down successfully")
    except Exception as e:
        logger.exception("Error shutting down evaluation service")

# Create FastAPI application
app = FastAPI(
    title="Lus Laboris API",
    description="API for semantic search and retrieval of Paraguayan labor law information",
    version="1.0.0",
    docs_url="/docs",          # Swagger UI
    redoc_url="/redoc",        # ReDoc
    openapi_url="/openapi.json",
    lifespan=lifespan,
)

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.api_allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Add trusted host middleware
app.add_middleware(
    TrustedHostMiddleware, 
    allowed_hosts=settings.api_allowed_hosts
)

# Include routers
app.include_router(status.router)
app.include_router(health.router)
app.include_router(vectorstore.router)
app.include_router(rag.router)

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        "main:app",
        host=settings.api_host,
        port=settings.api_port,
        reload=settings.api_reload,
        log_level=settings.api_log_level,
    )

Advantages of lifespan:

  • Predictable startup: services are initialized in the right order
  • Early health checks: problems surface before the API receives requests
  • Graceful shutdown: connections and threads are closed cleanly
  • Resource management: ML models are loaded only once
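
The ordering guarantee is easier to see in isolation. The sketch below drives an async context manager by hand, which is roughly what FastAPI does with the function passed as `lifespan`; the `events` list and `run_app` are hypothetical helpers for the demonstration.

```python
import asyncio
from contextlib import asynccontextmanager

events: list[str] = []


@asynccontextmanager
async def lifespan(app):
    # Everything before `yield` runs once, before the first request
    events.append("startup")
    try:
        yield
    finally:
        # Everything after `yield` runs once, when the server stops
        events.append("shutdown")


async def run_app():
    # Stand-in for the server: enter the lifespan, serve, then exit
    async with lifespan(app=None):
        events.append("serving")


asyncio.run(run_app())
```

Wrapping the `yield` in `try`/`finally` is a design choice that makes the shutdown code run even if the serving phase ends with an error.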

🔐 JWT Authentication with RSA

1. JWT Validator

Authentication uses JWTs signed with RSA (the RS256 algorithm):

# src/lus_laboris_api/api/auth/jwt_handler.py

import logging
import os
from pathlib import Path
from typing import Any

import jwt
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization

from ..config import settings

logger = logging.getLogger(__name__)

class JWTValidator:
    """JWT validator using RSA public key"""

    def __init__(self):
        self.public_key = None
        self.algorithm = "RS256"
        self._load_public_key()

    def _load_public_key(self):
        """Load RSA public key for token validation"""
        public_key_path = settings.api_jwt_public_key_path

        # Resolve relative paths
        if not os.path.isabs(public_key_path):
            project_root = Path(__file__).parent.parent.parent.parent
            public_key_path = project_root / public_key_path

        try:
            with open(public_key_path, 'rb') as f:
                self.public_key = serialization.load_pem_public_key(
                    f.read(), 
                    backend=default_backend()
                )
            logger.info(f"JWT public key loaded from {public_key_path}")
        except FileNotFoundError:
            logger.error(f"JWT public key not found at {public_key_path}")
            raise ValueError("JWT public key not found")

    def validate_token(self, token: str) -> dict[str, Any]:
        """Validate JWT token and return payload"""
        if not self.public_key:
            raise ValueError("Public key not available")

        try:
            payload = jwt.decode(
                token,
                self.public_key,
                algorithms=[self.algorithm],
                audience=settings.api_jwt_aud,  # 'lus-laboris-client'
                issuer=settings.api_jwt_iss,    # 'lus-laboris-api'
                options={
                    'verify_exp': True,   # Verify expiration
                    'verify_iat': True,   # Verify issued-at
                    'verify_aud': True,   # Verify audience
                    'verify_iss': True,   # Verify issuer
                },
            )

            logger.info(f"JWT validated for: {payload.get('sub', 'unknown')}")
            return payload

        except jwt.ExpiredSignatureError:
            logger.warning("JWT token expired")
            raise ValueError("Token expired")
        except jwt.InvalidAudienceError:
            logger.warning("Invalid JWT audience")
            raise ValueError(f"Invalid audience. Expected: {settings.api_jwt_aud}")
        except jwt.InvalidIssuerError:
            logger.warning("Invalid JWT issuer")
            raise ValueError(f"Invalid issuer. Expected: {settings.api_jwt_iss}")
        except jwt.InvalidTokenError as e:
            logger.warning(f"Invalid JWT token: {e}")
            raise ValueError(f"Invalid token: {e}")

# Global singleton
jwt_validator = JWTValidator()
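
To try the validator locally you need a token signed with the matching private key. The sketch below generates a throwaway RSA key pair in memory and issues a token with the audience and issuer used above. The `demo-user` subject and the 15-minute lifetime are assumptions for the example; a real deployment would load its keys from files or a secret manager.

```python
from datetime import datetime, timedelta, timezone

import jwt
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa

# Throwaway key pair for local experiments only
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
public_pem = private_key.public_key().public_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PublicFormat.SubjectPublicKeyInfo,
)

now = datetime.now(timezone.utc)
claims = {
    "sub": "demo-user",  # hypothetical subject
    "aud": "lus-laboris-client",
    "iss": "lus-laboris-api",
    "iat": now,
    "exp": now + timedelta(minutes=15),
}
token = jwt.encode(claims, private_key, algorithm="RS256")

# Verification needs only the public key
decoded = jwt.decode(
    token,
    public_pem,
    algorithms=["RS256"],
    audience="lus-laboris-client",
    issuer="lus-laboris-api",
)

# A token presented to the wrong audience is rejected
try:
    jwt.decode(
        token,
        public_pem,
        algorithms=["RS256"],
        audience="another-client",
        issuer="lus-laboris-api",
    )
    wrong_audience_rejected = False
except jwt.InvalidAudienceError:
    wrong_audience_rejected = True
```

Because RS256 is asymmetric, the API only ever holds the public key: it can verify tokens but cannot mint them.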

2. Authentication Dependencies

# src/lus_laboris_api/api/auth/dependencies.py

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from typing import Any
from .jwt_handler import jwt_validator

# Security scheme for Swagger UI
security = HTTPBearer()

def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> dict[str, Any]:
    """Dependency for endpoints that require authentication"""
    token = credentials.credentials

    try:
        payload = jwt_validator.validate_token(token)
        return payload
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=str(e),
            headers={"WWW-Authenticate": "Bearer"},
        ) from e

def optional_auth(
    credentials: HTTPAuthorizationCredentials | None = Depends(
        HTTPBearer(auto_error=False)
    )
) -> dict[str, Any] | None:
    """Dependency for endpoints with optional authentication"""
    if not credentials:
        return None

    try:
        payload = jwt_validator.validate_token(credentials.credentials)
        return payload
    except ValueError:
        return None  # Silent: an invalid token is treated as anonymous access

Usage in endpoints:

from fastapi import APIRouter, Depends
from .auth.dependencies import get_current_user, optional_auth

router = APIRouter()

# Protected endpoint (requires a JWT)
@router.get("/admin/metrics")
async def admin_metrics(
    token_payload: dict = Depends(get_current_user)
):
    user = token_payload.get('sub')
    return {"metrics": "...", "requested_by": user}

# Endpoint with optional auth (more detail when authenticated)
@router.get("/health/detailed")
async def detailed_health(
    token_payload: dict | None = Depends(optional_auth)
):
    is_authenticated = token_payload is not None

    if is_authenticated:
        return {"status": "healthy", "details": "..."}
    else:
        return {"status": "healthy"}  # Limited info

📝 Pydantic Models

Request Models

# src/lus_laboris_api/api/models/requests.py

from pydantic import BaseModel, Field

class QuestionRequest(BaseModel):
    """Request model for RAG queries"""

    question: str = Field(
        ...,
        description="Question about Paraguayan labor law",
        min_length=5,
        max_length=1000,
    )

    class Config:
        json_schema_extra = {
            "example": {
                "question": "¿Cuáles son los derechos del trabajador en caso de despido?"
            }
        }

class LoadToVectorstoreLocalRequest(BaseModel):
    """Request for loading data from local files"""

    filename: str = Field(
        ..., 
        description="JSON file name (without path)",
        min_length=1
    )

    local_data_path: str | None = Field(
        None,
        description="Path to data directory (relative to project root). Default: 'data/processed'"
    )

    replace_collection: bool = Field(
        False,
        description="Replace collection if exists"
    )

    class Config:
        json_schema_extra = {
            "example": {
                "filename": "codigo_trabajo_articulos.json",
                "local_data_path": "data/processed",
                "replace_collection": False
            }
        }

Response Models

# src/lus_laboris_api/api/models/responses.py

from pydantic import BaseModel, Field
from typing import Any

class BaseResponse(BaseModel):
    """Base response model"""
    success: bool
    message: str

class QuestionResponse(BaseResponse):
    """Response model for RAG queries"""

    question: str
    answer: str | None = None
    error: str | None = None
    processing_time_seconds: float
    documents_retrieved: int | None = None
    top_k: int | None = None
    reranking_applied: bool | None = None
    documents: list[dict[str, Any]] | None = None
    session_id: str | None = None

    class Config:
        json_schema_extra = {
            "example": {
                "success": True,
                "message": "Question answered successfully",
                "question": "¿Cuántos días de vacaciones corresponden?",
                "answer": "Según el Artículo 218...",
                "processing_time_seconds": 1.234,
                "documents_retrieved": 5,
                "top_k": 5,
                "reranking_applied": True,
                "documents": [...],
                "session_id": "session_20241016_123456"
            }
        }

Benefits of Pydantic:

  • Automatic validation: type checking at runtime
  • Automatic documentation: the OpenAPI schema is generated from the models
  • Serialization: automatic JSON encoding and decoding
  • IDE support: autocompletion and type hints
  • Error messages: detailed, readable validation errors
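
Here is what that validation looks like in practice, using a trimmed copy of the `QuestionRequest` model with the same length constraints. The sample inputs are made up for the example.

```python
from pydantic import BaseModel, Field, ValidationError


class QuestionRequest(BaseModel):
    question: str = Field(..., min_length=5, max_length=1000)


# A well-formed question passes validation
valid = QuestionRequest(question="¿Cuántos días de vacaciones corresponden?")

# A question shorter than min_length is rejected before any endpoint code runs
try:
    QuestionRequest(question="Hi")
    errors = []
except ValidationError as exc:
    errors = exc.errors()
```

Each entry in `errors` carries the location of the offending field and a human-readable message; FastAPI returns the same structure to the client in a 422 response.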

🎯 Main Endpoints

1. Health Check Endpoint

# src/lus_laboris_api/api/endpoints/health.py

from typing import Any

from fastapi import APIRouter, Depends

from ..auth.dependencies import optional_auth
from ..services.qdrant_service import qdrant_service
from ..services.embedding_service import embedding_service
from ..services.rag_service import rag_service

router = APIRouter(prefix="/api/health", tags=["Health"])

def _sanitize_health_response(
    status: dict[str, Any], 
    is_authenticated: bool
) -> dict[str, Any]:
    """Sanitize health response based on authentication"""
    if is_authenticated:
        return status  # Full details
    else:
        # Only return status for unauthenticated users
        return {"status": status.get("status")}

@router.get("/")
async def health_check():
    """Basic health check - always public"""
    return {
        "status": "healthy",
        "api": "Lus Laboris API",
        "version": "1.0.0"
    }

@router.get("/qdrant")
async def qdrant_health_check(
    token_payload: dict[str, Any] | None = Depends(optional_auth)
):
    """Qdrant health check with smart info filtering"""
    is_authenticated = token_payload is not None

    status = qdrant_service.health_check()

    return _sanitize_health_response(status, is_authenticated)

@router.get("/embeddings")
async def embeddings_health_check(
    token_payload: dict[str, Any] | None = Depends(optional_auth)
):
    """Embeddings service health check"""
    is_authenticated = token_payload is not None

    status = embedding_service.health_check()

    return _sanitize_health_response(status, is_authenticated)

@router.get("/rag")
async def rag_health_check(
    token_payload: dict[str, Any] | None = Depends(optional_auth)
):
    """RAG service health check"""
    is_authenticated = token_payload is not None

    status = rag_service.health_check()

    return _sanitize_health_response(status, is_authenticated)

Example response without authentication:

{
  "status": "healthy"
}

Example response with a JWT:

{
  "status": "healthy",
  "url": "http://35.123.45.67:6333",
  "connection_type": "gRPC",
  "collections_count": 3,
  "collection": "labor_law_articles",
  "documents_count": 413
}

2. RAG Query Endpoint

# src/lus_laboris_api/api/endpoints/rag.py

import logging
from fastapi import APIRouter, HTTPException, Request, status
from slowapi import Limiter
from slowapi.util import get_remote_address

from ..models.requests import QuestionRequest
from ..models.responses import QuestionResponse
from ..services.rag_service import rag_service
from ..services.phoenix_service import phoenix_service

logger = logging.getLogger(__name__)

# Rate limiter
limiter = Limiter(key_func=get_remote_address)

router = APIRouter(prefix="/api/rag", tags=["RAG"])

@router.post(
    "/ask",
    response_model=QuestionResponse,
    summary="Ask a question about Paraguayan labor law",
    description="Uses RAG (Retrieval-Augmented Generation) to answer legal questions"
)
@limiter.limit("10/minute")  # 10 requests per minute per IP
async def ask_question(
    request: Request,  # Required for rate limiting
    question_data: QuestionRequest,
) -> QuestionResponse:
    """
    Ask a question using RAG pipeline:
    1. Generate embedding for question
    2. Search relevant documents in Qdrant
    3. Optional: Rerank results
    4. Generate answer with LLM
    5. Track everything with Phoenix
    """
    session_id = None

    try:
        logger.info(f"Received question: {question_data.question[:100]}...")

        # Create monitoring session
        session_id = phoenix_service.create_session()

        # Answer using RAG service (async)
        result = await rag_service.answer_question(
            question_data.question, 
            session_id
        )

        # Build response
        response = QuestionResponse(
            success=result["success"],
            message="Question answered successfully" if result["success"] 
                    else "Failed to answer question",
            question=result["question"],
            answer=result.get("answer"),
            error=result.get("error"),
            processing_time_seconds=result["processing_time_seconds"],
            documents_retrieved=result.get("documents_retrieved"),
            top_k=result.get("top_k"),
            reranking_applied=result.get("reranking_applied"),
            documents=result.get("documents"),
            session_id=result.get("session_id"),
        )

        if result["success"]:
            logger.info(
                f"Question answered in {result['processing_time_seconds']:.3f}s "
                f"for session {session_id}"
            )
        else:
            logger.error(f"Failed to answer: {result.get('error')}")

        return response

    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Internal server error: {e}"
        )
    finally:
        # End monitoring session
        if session_id:
            phoenix_service.end_session(session_id)

Example request:

curl -X POST "http://localhost:8000/api/rag/ask" \
  -H "Content-Type: application/json" \
  -d '{
    "question": "¿Cuántos días de vacaciones corresponden a un trabajador?"
  }'

Example response:

{
  "success": true,
  "message": "Question answered successfully",
  "question": "¿Cuántos días de vacaciones corresponden a un trabajador?",
  "answer": "Según el Artículo 218 del Código del Trabajo de Paraguay, todo trabajador que cumpla un año de trabajo continuo al servicio del mismo empleador tiene derecho a un período de vacaciones anuales remuneradas. La duración específica depende de varios factores establecidos en el código.",
  "processing_time_seconds": 2.145,
  "documents_retrieved": 5,
  "top_k": 5,
  "reranking_applied": true,
  "documents": [
    {
      "articulo_numero": 218,
      "articulo": "todo trabajador que cumpla un año...",
      "score": 0.912,
      "rerank_score": 0.987,
      "capitulo": "capitulo ii - de las vacaciones"
    }
  ],
  "session_id": "session_20241016_143022"
}
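
The "10 requests per minute per IP" rule is handled by slowapi in the endpoint above. To show the idea behind it, here is a self-contained sliding-window limiter. It is a stand-in written for illustration, not slowapi's implementation, and `SlidingWindowLimiter` and the sample IP address are hypothetical.

```python
import time
from collections import defaultdict, deque


class SlidingWindowLimiter:
    """Allow at most max_requests per key within the last window_seconds."""

    def __init__(self, max_requests: int, window_seconds: float, clock=time.monotonic):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.clock = clock  # injectable so the example can control time
        self._hits = defaultdict(deque)

    def allow(self, key: str) -> bool:
        now = self.clock()
        hits = self._hits[key]
        # Forget requests that have fallen out of the window
        while hits and now - hits[0] >= self.window_seconds:
            hits.popleft()
        if len(hits) >= self.max_requests:
            return False
        hits.append(now)
        return True


# Simulated clock: 11 requests at t=0, then one more after the window has passed
fake_now = [0.0]
limiter = SlidingWindowLimiter(10, 60, clock=lambda: fake_now[0])
results = [limiter.allow("203.0.113.7") for _ in range(11)]
fake_now[0] = 61.0
allowed_after_window = limiter.allow("203.0.113.7")
```

An in-memory limiter like this only sees the requests of its own process, so a horizontally scaled deployment would need a shared store behind it.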

3. Vectorstore Management Endpoint

# src/lus_laboris_api/api/endpoints/vectorstore.py

import logging
from datetime import datetime

from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, status

from ..auth.dependencies import get_current_user
from ..models.requests import LoadToVectorstoreLocalRequest
from ..models.responses import BaseResponse

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api/vectorstore", tags=["Vectorstore"])

@router.post(
    "/load/local",
    response_model=BaseResponse,
    summary="Load data to vectorstore from local files",
    dependencies=[Depends(get_current_user)]  # Requires JWT
)
async def load_to_vectorstore_local(
    request: LoadToVectorstoreLocalRequest,
    background_tasks: BackgroundTasks,
    token_payload: dict = Depends(get_current_user),
):
    """
    Load legal documents to Qdrant from local JSON files.
    Requires authentication.

    Process:
    1. Load JSON from local filesystem
    2. Generate embeddings
    3. Create/update Qdrant collection
    4. Insert documents with metadata

    This is a long-running operation executed in background.
    """
    try:
        current_user = token_payload.get('sub', 'unknown')
        logger.info(f"Load request from user: {current_user}")

        # Generate unique job ID
        job_id = f"job_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        # Run in background
        background_tasks.add_task(
            _load_to_vectorstore_background,
            job_id=job_id,
            request=request,
            current_user=current_user,
            token_payload=token_payload,
        )

        return BaseResponse(
            success=True,
            message=f"Loading job started with ID: {job_id}"
        )

    except Exception as e:
        logger.error(f"Failed to start loading job: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e)
        )

async def _load_to_vectorstore_background(
    job_id: str,
    request: LoadToVectorstoreLocalRequest,
    current_user: str,
    token_payload: dict,
):
    """Background task for loading data to vectorstore"""
    logger.info(f"[{job_id}] Starting background load task for user: {current_user}")

    try:
        # 1. Load JSON data
        # 2. Generate embeddings
        # 3. Insert to Qdrant
        # ... (implementation details)

        logger.info(f"[{job_id}] Load completed successfully")

    except Exception as e:
        logger.error(f"[{job_id}] Load failed: {e}")

⚙️ Services

RAGService - Pipeline Orchestration

# src/lus_laboris_api/api/services/rag_service.py

import logging
import time
from typing import Any
from openai import AsyncOpenAI
from google import genai

from ..config import settings
from .embedding_service import embedding_service
from .qdrant_service import qdrant_service
from .reranking_service import reranking_service
from .phoenix_service import phoenix_service

logger = logging.getLogger(__name__)

class RAGService:
    """Service for RAG-based question answering"""

    def __init__(self):
        self.llm_provider = settings.api_llm_provider.lower()
        self.llm_model = settings.api_llm_model
        self.collection_name = settings.api_qdrant_collection_name
        self.top_k = settings.api_rag_top_k

        # Initialize LLM clients
        self._initialize_llm_clients()

    def _initialize_llm_clients(self):
        """Initialize async LLM clients"""
        if self.llm_provider == "openai":
            self.openai_client = AsyncOpenAI(
                api_key=settings.openai_api_key
            )
            logger.info("OpenAI async client initialized")
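        elif self.llm_provider == "gemini":
            # The google-genai SDK is client-based; it has no module-level configure()
            self.gemini_client = genai.Client(api_key=settings.gemini_api_key)
            logger.info("Gemini client initialized")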

    async def answer_question(
        self, 
        query: str, 
        session_id: str
    ) -> dict[str, Any]:
        """
        Answer question using RAG pipeline:
        1. Retrieve relevant documents
        2. Build context from documents
        3. Generate answer with LLM
        4. Track everything with Phoenix
        """
        start_time = time.time()

        try:
            # 1. Retrieve documents (with optional reranking)
            documents, retrieval_metadata = self._retrieve_documents(
                query, session_id
            )

            if not documents:
                return {
                    "success": False,
                    "question": query,
                    "error": "No relevant documents found",
                    "processing_time_seconds": time.time() - start_time,
                }

            # 2. Build context from documents
            context = self._build_context(documents)

            # 3. Generate answer with LLM (async)
            answer = await self._generate_answer(query, context, session_id)

            # 4. Track complete RAG span
            phoenix_service.track_rag_complete(
                session_id=session_id,
                query=query,
                answer=answer,
                documents=documents,
                processing_time=time.time() - start_time,
            )

            return {
                "success": True,
                "question": query,
                "answer": answer,
                "documents_retrieved": len(documents),
                "top_k": self.top_k,
                "reranking_applied": settings.api_use_reranking,
                "documents": documents,
                "session_id": session_id,
                "processing_time_seconds": time.time() - start_time,
            }

        except Exception as e:
            logger.exception(f"Failed to answer question: {e}")
            return {
                "success": False,
                "question": query,
                "error": str(e),
                "processing_time_seconds": time.time() - start_time,
            }

    def _retrieve_documents(
        self, query: str, session_id: str
    ) -> tuple[list[dict], dict]:
        """Retrieve documents with embeddings and optional reranking"""

        # Generate embedding
        query_embedding = embedding_service.generate_single_embedding(
            query, model_name=settings.api_embedding_model
        )

        # Search in Qdrant
        search_limit = self.top_k * 2 if settings.api_use_reranking else self.top_k
        search_results = qdrant_service.search_documents(
            collection_name=self.collection_name,
            query_vector=query_embedding,
            limit=search_limit,
        )

        # Optional reranking
        if settings.api_use_reranking and search_results:
            reranked_docs, rerank_metadata = reranking_service.rerank_documents(
                query=query, 
                documents=search_results, 
                top_k=self.top_k
            )
            return reranked_docs, rerank_metadata

        return search_results, {}

    def _build_context(self, documents: list[dict]) -> str:
        """Build context string from documents"""
        context_parts = []
        for doc in documents:
            payload = doc.get('payload', {})
            articulo = payload.get('articulo', '')
            articulo_num = payload.get('articulo_numero', '?')

            context_parts.append(
                f"Artículo {articulo_num}: {articulo}"
            )

        return "\n\n".join(context_parts)

    async def _generate_answer(
        self, query: str, context: str, session_id: str
    ) -> str:
        """Generate answer using LLM (async)"""

        system_prompt = """Eres un asistente legal experto en derecho laboral paraguayo.
Tu rol es responder preguntas sobre el Código del Trabajo de Paraguay basándote
ÚNICAMENTE en el contexto proporcionado.

Reglas:
1. Responde en español de forma clara y profesional
2. Cita los artículos relevantes
3. Si no tienes información suficiente, indícalo claramente
4. No inventes información que no esté en el contexto
"""

        user_prompt = f"""Contexto legal:
{context}

Pregunta: {query}

Respuesta:"""

        # Generate with OpenAI or Gemini
        if self.llm_provider == "openai":
            response = await self.openai_client.chat.completions.create(
                model=self.llm_model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt},
                ],
                temperature=0.3,
                max_tokens=500,
            )
            answer = response.choices[0].message.content

        elif self.llm_provider == "gemini":
            # Gemini async call
            client = genai.Client(api_key=settings.gemini_api_key)
            full_prompt = f"{system_prompt}\n\n{user_prompt}"

            response = await client.aio.models.generate_content(
                model=self.llm_model,
                contents=full_prompt,
            )
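            answer = response.text

        else:
            # Fail loudly instead of reaching `return answer` with `answer` unbound
            raise ValueError(f"Unsupported LLM provider: {self.llm_provider}")

        return answer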

# Global singleton
rag_service = RAGService()

RAGService characteristics:

  • Async-first: Uses AsyncOpenAI so LLM calls do not block the event loop
  • Multi-provider: Supports OpenAI and Gemini
  • Observability: End-to-end tracking with Phoenix
  • Optional reranking: Configurable precision improvement
  • Error handling: Failures are logged and returned as a structured error response
  • Structured prompts: Separate system and user prompts
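One detail worth checking in a service like this: `_retrieve_documents` is a regular synchronous method, so if it is called directly from the async path, the event loop waits while the embedding and vector search run. Below is a minimal sketch of one way to offload such a call, assuming Python 3.9+. `slow_retrieve` is a hypothetical stand-in, not the project's actual retrieval code.

```python
import asyncio
import time


def slow_retrieve(query: str) -> list[str]:
    """Hypothetical stand-in for a blocking embedding + vector search call."""
    time.sleep(0.05)  # simulates work that blocks the calling thread
    return [f"doc for {query}"]


async def retrieve_async(query: str) -> list[str]:
    # asyncio.to_thread runs the blocking function in a worker thread,
    # so the event loop stays free to serve other requests meanwhile.
    return await asyncio.to_thread(slow_retrieve, query)


async def demo() -> list[list[str]]:
    # Three retrievals overlap instead of running back to back.
    return await asyncio.gather(
        retrieve_async("vacaciones"),
        retrieve_async("aguinaldo"),
        retrieve_async("preaviso"),
    )


results = asyncio.run(demo())
```

Whether this pays off depends on the workload: threads help with blocking I/O, while heavy CPU-bound embedding work may still compete for the interpreter.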

🚀 Deployment and Execution

1. Local Development

# Install dependencies with UV
cd src/lus_laboris_api
uv sync

# Configure environment variables
cp .env.example .env
# Edit .env with your own settings

# Start the API in development mode (with reload)
./start_api_dev.sh

# Or manually:
uv run uvicorn api.main:app --reload --host 0.0.0.0 --port 8000

2. Docker

# src/lus_laboris_api/Dockerfile

FROM python:3.13-slim

# Install UV
COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv

WORKDIR /app

# Copy dependency files
COPY pyproject.toml uv.lock ./

# Install dependencies (no dev dependencies in production)
RUN uv sync --frozen --no-dev

# Copy application code
COPY api/ ./api/

# Create non-root user
RUN useradd -m -u 1000 apiuser && chown -R apiuser:apiuser /app
USER apiuser

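# Health check (python:3.13-slim does not ship curl, so use the Python stdlib)
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
  CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/api/health', timeout=5)" || exit 1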

# Expose port
EXPOSE 8000

# Run API
CMD ["uv", "run", "uvicorn", "api.main:app", "--host", "0.0.0.0", "--port", "8000"]

Build and run:

# Build image
docker build -t lus-laboris-api:latest .

# Run container
docker run -p 8000:8000 \
  --env-file .env \
  lus-laboris-api:latest

3. Docker Compose (Full Stack)

# src/lus_laboris_api/docker-compose.yml

services:
  # Qdrant vector database
  qdrant:
    image: qdrant/qdrant:latest
    container_name: qdrant
    ports:
      - "6333:6333"
      - "6334:6334"
    volumes:
      - qdrant_storage:/qdrant/storage
    environment:
      - QDRANT__SERVICE__HTTP_PORT=6333
      - QDRANT__SERVICE__GRPC_PORT=6334
    networks:
      - api-network

  # Phoenix observability
  phoenix:
    image: arizephoenix/phoenix:latest
    container_name: phoenix
    ports:
      - "6006:6006"
      - "4317:4317"
    environment:
      - PHOENIX_PORT=6006
      - PHOENIX_GRPC_PORT=4317
    networks:
      - api-network

  # Lus Laboris API
  api:
    build: .
    container_name: lus-laboris-api
    ports:
      - "8000:8000"
    env_file:
      - .env
    environment:
      - API_QDRANT_URL=http://qdrant:6333
      - API_PHOENIX_ENDPOINT=http://phoenix:6006
      - API_PHOENIX_GRPC_ENDPOINT=phoenix:4317
    depends_on:
      - qdrant
      - phoenix
    networks:
      - api-network

volumes:
  qdrant_storage:

networks:
  api-network:
    driver: bridge

Start the full stack:

docker-compose up -d

# View logs
docker-compose logs -f api

# Check health
curl http://localhost:8000/api/health

4. Build and Push to Docker Hub (Bash Script)

The project includes an automated script that builds the image and publishes it to Docker Hub:

# src/lus_laboris_api/docker_build_push.sh

#!/bin/bash
set -e

# Load variables from .env (two levels up)
if [[ -f "../../.env" ]]; then
    set -o allexport
    source ../../.env
    set +o allexport
else
    echo "⚠️  WARNING: No se encontró .env, usando variables de entorno del sistema"
fi

# Validate required variables
if [[ -z "$DOCKER_HUB_USERNAME" || -z "$DOCKER_HUB_PASSWORD" || -z "$DOCKER_IMAGE_NAME_RAG_API" ]]; then
  echo "❌ ERROR: Asegurate de definir DOCKER_HUB_USERNAME, DOCKER_HUB_PASSWORD e DOCKER_IMAGE_NAME_RAG_API en .env"
  exit 1
fi

# Login to Docker Hub
echo "$DOCKER_HUB_PASSWORD" | docker login --username "$DOCKER_HUB_USERNAME" --password-stdin

# Define tags
DATE_TAG=$(date +%Y%m%d)
LATEST_TAG="latest"

# Build image
docker build -t "$DOCKER_HUB_USERNAME/$DOCKER_IMAGE_NAME_RAG_API:$DATE_TAG" .
docker tag "$DOCKER_HUB_USERNAME/$DOCKER_IMAGE_NAME_RAG_API:$DATE_TAG" "$DOCKER_HUB_USERNAME/$DOCKER_IMAGE_NAME_RAG_API:$LATEST_TAG"

# Push both images
docker push "$DOCKER_HUB_USERNAME/$DOCKER_IMAGE_NAME_RAG_API:$DATE_TAG"
docker push "$DOCKER_HUB_USERNAME/$DOCKER_IMAGE_NAME_RAG_API:$LATEST_TAG"

echo "✅ Imagenes subidas a Docker Hub:"
echo "   $DOCKER_HUB_USERNAME/$DOCKER_IMAGE_NAME_RAG_API:$DATE_TAG"
echo "   $DOCKER_HUB_USERNAME/$DOCKER_IMAGE_NAME_RAG_API:$LATEST_TAG"

Required variables in .env:

# Docker Hub Configuration
DOCKER_HUB_USERNAME=tu_usuario
DOCKER_HUB_PASSWORD=tu_password_o_token
DOCKER_IMAGE_NAME_RAG_API=lus-laboris-api

Run the script:

cd src/lus_laboris_api
chmod +x docker_build_push.sh
./docker_build_push.sh

Output:

Logging in to Docker Hub...
Building image...
[+] Building 45.2s (12/12) FINISHED
Tagging images...
Pushing jesusoviedo/lus-laboris-api:20241016...
Pushing jesusoviedo/lus-laboris-api:latest...
✅ Imagenes subidas a Docker Hub:
   jesusoviedo/lus-laboris-api:20241016
   jesusoviedo/lus-laboris-api:latest

Script features:

  • Auto-tagging: Generates a date tag (e.g. 20241016) plus latest
  • Validation: Checks that the required variables are set
  • Idempotent: Can be run multiple times without issues
  • Secure: Reads credentials from .env (not hardcoded)

5. GitHub Actions: Automated Image Build

The workflow .github/workflows/docker-api-build-publish.yml automates the build and push to Docker Hub:

name: Build & Publish Docker Image (API)

on:
  workflow_dispatch:  # Manual trigger
  push:
    paths:  # Automatic trigger when these files change
    - src/lus_laboris_api/Dockerfile
    - src/lus_laboris_api/pyproject.toml
    - src/lus_laboris_api/uv.lock
    - src/lus_laboris_api/api/**

jobs:
  build-and-push-api:
    runs-on: ubuntu-latest
    env:
      DOCKER_HUB_USERNAME: ${{ secrets.DOCKER_HUB_USERNAME }}
      DOCKER_HUB_PASSWORD: ${{ secrets.DOCKER_HUB_PASSWORD }}
      DOCKER_IMAGE_NAME_RAG_API: ${{ vars.DOCKER_IMAGE_NAME_RAG_API }}

    steps:
    - name: Checkout code
      uses: actions/checkout@v5

    - name: Set up Docker Buildx
      uses: docker/setup-buildx-action@v3

    - name: Log in to Docker Hub
      uses: docker/login-action@v3
      with:
        username: ${{ env.DOCKER_HUB_USERNAME }}
        password: ${{ env.DOCKER_HUB_PASSWORD }}

    - name: Get date tag
      id: date_tag
      run: echo "tag=$(date +'%Y%m%d')" >> $GITHUB_OUTPUT

    - name: Build and push Docker image
      uses: docker/build-push-action@v6
      with:
        context: ./src/lus_laboris_api
        push: true
        tags: |
          ${{ env.DOCKER_HUB_USERNAME }}/${{ env.DOCKER_IMAGE_NAME_RAG_API }}:latest
          ${{ env.DOCKER_HUB_USERNAME }}/${{ env.DOCKER_IMAGE_NAME_RAG_API }}:${{ steps.date_tag.outputs.tag }}

    - name: Output image details
      run: |
        echo "✅ Docker image built and pushed successfully!"
        echo "Image: ${{ env.DOCKER_HUB_USERNAME }}/${{ env.DOCKER_IMAGE_NAME_RAG_API }}:latest"
        echo "Image: ${{ env.DOCKER_HUB_USERNAME }}/${{ env.DOCKER_IMAGE_NAME_RAG_API }}:${{ steps.date_tag.outputs.tag }}"

Required GitHub secrets:

  • DOCKER_HUB_USERNAME: Your Docker Hub username
  • DOCKER_HUB_PASSWORD: Docker Hub access token

Required GitHub variables:

  • DOCKER_IMAGE_NAME_RAG_API: Image name (e.g. lus-laboris-api)

Triggers:

  • Manual: Via the GitHub Actions UI (workflow_dispatch)
  • Automatic: When the Dockerfile, pyproject.toml, uv.lock, or code under api/ changes

6. GitHub Actions: Deploy to Cloud Run with Secrets

The workflow .github/workflows/update-api-secrets-deploy.yml updates the secrets and deploys to Cloud Run:

name: Update API Secrets & Deploy

on:
  workflow_dispatch:

jobs:
  update-and-deploy:
    name: Update Secrets & Deploy API
    runs-on: ubuntu-latest
    env:
      # Secrets from GitHub
      GSA_KEY: ${{ secrets.GSA_KEY }}
      GCP_PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }}
      GCP_REGION: ${{ secrets.GCP_REGION }}
      API_ENV_FILE: ${{ secrets.API_ENV_FILE }}
      JWT_PUBLIC_KEY: ${{ secrets.JWT_PUBLIC_KEY }}
      DOCKER_HUB_USERNAME: ${{ secrets.DOCKER_HUB_USERNAME }}

      # Variables from GitHub
      GCP_CLOUD_RUN_API_SERVICE_NAME: ${{ vars.GCP_CLOUD_RUN_API_SERVICE_NAME }}
      DOCKER_IMAGE_NAME_RAG_API: ${{ vars.DOCKER_IMAGE_NAME_RAG_API }}
      GCP_CLOUD_RUN_API_IMAGE_TAG: ${{ vars.GCP_CLOUD_RUN_API_IMAGE_TAG }}
      GCP_CLOUD_SECRETS_UPDATE: ${{ vars.GCP_CLOUD_SECRETS_UPDATE }}
      GCP_CLOUD_RUN_API_CPU: ${{ vars.GCP_CLOUD_RUN_API_CPU }}
      GCP_CLOUD_RUN_API_MEMORY: ${{ vars.GCP_CLOUD_RUN_API_MEMORY }}
      GCP_CLOUD_RUN_API_MIN_INSTANCES: ${{ vars.GCP_CLOUD_RUN_API_MIN_INSTANCES }}
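      GCP_CLOUD_RUN_API_MAX_INSTANCES: ${{ vars.GCP_CLOUD_RUN_API_MAX_INSTANCES }}
      # Assumption: the Secret Manager IDs used in the steps below are stored as repository variables
      GCP_CLOUD_SECRETS_API_ENV_ID: ${{ vars.GCP_CLOUD_SECRETS_API_ENV_ID }}
      GCP_CLOUD_SECRETS_JWT_KEY_ID: ${{ vars.GCP_CLOUD_SECRETS_JWT_KEY_ID }}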

    steps:
    - name: Checkout code
      uses: actions/checkout@v4

    - name: Authenticate to GCP
      uses: google-github-actions/auth@v2
      with:
        credentials_json: ${{ env.GSA_KEY }}

    - name: Set up Cloud SDK
      uses: google-github-actions/setup-gcloud@v2

    - name: Update Secrets in Secret Manager
      if: ${{ env.GCP_CLOUD_SECRETS_UPDATE == 'true' }}
      run: |
        echo "📝 Updating secrets in Secret Manager..."

        # Update .env file secret
        echo "${API_ENV_FILE}" | \
          gcloud secrets versions add ${GCP_CLOUD_SECRETS_API_ENV_ID} \
          --data-file=- \
          --project=${GCP_PROJECT_ID}

        echo "✅ .env secret updated"

        # Update JWT public key secret
        echo "${JWT_PUBLIC_KEY}" | \
          gcloud secrets versions add ${GCP_CLOUD_SECRETS_JWT_KEY_ID} \
          --data-file=- \
          --project=${GCP_PROJECT_ID}

        echo "✅ JWT public key secret updated"

    - name: Deploy API to Cloud Run
      run: |
        IMAGE="${DOCKER_HUB_USERNAME}/${DOCKER_IMAGE_NAME_RAG_API}:${GCP_CLOUD_RUN_API_IMAGE_TAG}"

        echo "🚀 Deploying ${IMAGE} to ${GCP_CLOUD_RUN_API_SERVICE_NAME}..."

        gcloud run deploy ${GCP_CLOUD_RUN_API_SERVICE_NAME} \
          --image=${IMAGE} \
          --region=${GCP_REGION} \
          --project=${GCP_PROJECT_ID} \
          --port=8000 \
          --set-env-vars="API_HOST=0.0.0.0,API_PORT=8000,API_RELOAD=false,API_JWT_PUBLIC_KEY_PATH=/app/secrets/jwt/public_key.pem,API_ENV_FILE_PATH=/app/secrets/env/.env" \
          --update-secrets="/app/secrets/env/.env=${GCP_CLOUD_SECRETS_API_ENV_ID}:latest,/app/secrets/jwt/public_key.pem=${GCP_CLOUD_SECRETS_JWT_KEY_ID}:latest" \
          --cpu=${GCP_CLOUD_RUN_API_CPU} \
          --memory=${GCP_CLOUD_RUN_API_MEMORY} \
          --min-instances=${GCP_CLOUD_RUN_API_MIN_INSTANCES} \
          --max-instances=${GCP_CLOUD_RUN_API_MAX_INSTANCES} \
          --timeout=300 \
          --no-cpu-throttling \
          --allow-unauthenticated

        echo "✅ API deployed successfully!"

    - name: Get Service URL
      run: |
        URL=$(gcloud run services describe ${GCP_CLOUD_RUN_API_SERVICE_NAME} \
          --region=${GCP_REGION} \
          --format='value(status.url)' \
          --project=${GCP_PROJECT_ID})

        echo "🌐 Service URL: ${URL}"
        echo "📋 Health Check: ${URL}/api/health"
        echo "📊 Swagger UI: ${URL}/docs"

Required GitHub secrets:

| Secret | Description |
|---|---|
| GSA_KEY | Service Account JSON key for GCP |
| GCP_PROJECT_ID | GCP project ID |
| GCP_REGION | Cloud Run region (e.g. us-central1) |
| API_ENV_FILE | Full contents of the .env file |
| JWT_PUBLIC_KEY | RSA public key for JWT |
| DOCKER_HUB_USERNAME | Docker Hub username |

Required GitHub variables:

| Variable | Description | Example |
|---|---|---|
| GCP_CLOUD_RUN_API_SERVICE_NAME | Cloud Run service name | lus-laboris-api |
| DOCKER_IMAGE_NAME_RAG_API | Docker image name | lus-laboris-api |
| GCP_CLOUD_RUN_API_IMAGE_TAG | Image tag to deploy | latest or 20241016 |
| GCP_CLOUD_SECRETS_UPDATE | Update secrets before deploying | true or false |
| GCP_CLOUD_RUN_API_CPU | Allocated CPUs | 2 |
| GCP_CLOUD_RUN_API_MEMORY | Allocated memory | 2Gi |
| GCP_CLOUD_RUN_API_MIN_INSTANCES | Minimum instances (warm start) | 1 |
| GCP_CLOUD_RUN_API_MAX_INSTANCES | Maximum instances (auto-scaling) | 10 |

Complete deployment flow:

  1. Update code → Push to GitHub
  2. Automatic build → The docker-api-build-publish.yml workflow runs
  3. New image → Pushed to Docker Hub with the latest tag and a date tag (e.g. 20241016)
  4. Update variables → Set GCP_CLOUD_RUN_API_IMAGE_TAG to 20241016 in GitHub
  5. Manual deploy → Run the update-api-secrets-deploy.yml workflow from the GitHub Actions UI
  6. Secrets updated → .env and JWT public key updated in Secret Manager
  7. Deploy to Cloud Run → New version deployed
  8. Verification → Check the health endpoint at the service URL

Example run:

🚀 Running workflow: Update API Secrets & Deploy

📝 Updating secrets in Secret Manager...
✅ .env secret updated
✅ JWT public key secret updated

🚀 Deploying jesusoviedo/lus-laboris-api:20241016 to lus-laboris-api...
Deploying container to Cloud Run service [lus-laboris-api]...
✓ Deploying new service... Done.
  ✓ Creating Revision...
  ✓ Routing traffic...
  ✓ Setting IAM Policy...
✅ API deployed successfully!

🌐 Service URL: https://lus-laboris-api-abc123-uc.a.run.app
📋 Health Check: https://lus-laboris-api-abc123-uc.a.run.app/api/health
📊 Swagger UI: https://lus-laboris-api-abc123-uc.a.run.app/docs

Post-deployment verification:

# Health check
curl https://lus-laboris-api-abc123-uc.a.run.app/api/health

# Response:
# {
#   "success": true,
#   "service": "lus-laboris-api",
#   "status": "healthy",
#   "dependencies": {
#     "qdrant": "connected",
#     "embedding_service": "healthy",
#     "rag_service": "healthy"
#   },
#   "uptime_seconds": 45.2
# }
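A freshly deployed revision may need a few seconds before it answers, so a single curl can fail even when the deploy is fine. Here is a minimal sketch of a polling check with retries, using only the standard library. The function names, retry counts, and the `status` field lookup are illustrative assumptions, not part of the project.

```python
import json
import time
import urllib.request
from typing import Callable


def http_status_probe(url: str) -> Callable[[], str]:
    """Build a probe that returns the "status" field of a JSON health response."""
    def probe() -> str:
        with urllib.request.urlopen(url, timeout=5) as resp:
            return json.load(resp).get("status", "")
    return probe


def wait_until_healthy(
    fetch_status: Callable[[], str],
    attempts: int = 5,
    delay_seconds: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll a health probe until it reports "healthy" or attempts run out."""
    for attempt in range(1, attempts + 1):
        try:
            if fetch_status() == "healthy":
                return True
        except OSError:
            # Connection refused or timeout while the new revision starts up
            pass
        if attempt < attempts:
            sleep(delay_seconds)
    return False
```

Usage would look like `wait_until_healthy(http_status_probe(f"{url}/api/health"))`, where `url` is the service URL printed by the workflow.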

Workflow advantages:

  • Automated secrets management: Secrets live in GitHub Secrets, not in code
  • Zero-downtime deployment: Cloud Run routes traffic to the new revision once it is ready
  • Easy rollback: Set GCP_CLOUD_RUN_API_IMAGE_TAG to a previous version and re-run
  • Configurable scaling: Min/max instances sized to the expected load
  • Cost optimization: min-instances=1 avoids cold starts; max-instances=10 caps cost
  • No CPU throttling: --no-cpu-throttling for consistent performance

📊 Automatic Documentation

FastAPI generates interactive documentation automatically:

Swagger UI

Available at: http://localhost:8000/docs

Features:

  • Try it out: Run requests directly from the browser
  • Schema visualization: View request/response models
  • Authentication: Enter a JWT token for protected endpoints
  • Examples: View example payloads
  • Response codes: Documentation of the HTTP status codes each endpoint declares

ReDoc

Available at: http://localhost:8000/redoc

Features:

  • Clean UI: A cleaner interface for reading
  • Full schema: All the documentation on a single page
  • Search: Search endpoints and schemas
  • Export: Download the OpenAPI spec as JSON

OpenAPI JSON

Available at: http://localhost:8000/openapi.json

The complete spec in JSON format, for:

  • Client generation (Python, TypeScript, etc.)
  • Integration with testing tools
  • Import into Postman/Insomnia
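To give a feel for what tooling does with that JSON, here is a small sketch that lists the operations in an OpenAPI document. The sample dictionary is hand-written in the shape of a spec, not the API's real output; only the two paths come from this post.

```python
def list_operations(spec: dict) -> list[tuple[str, str]]:
    """Return sorted (METHOD, path) pairs from an OpenAPI spec dict."""
    http_methods = {"get", "post", "put", "patch", "delete", "head", "options"}
    operations = []
    for path, item in spec.get("paths", {}).items():
        for method in item:
            if method in http_methods:  # skip non-operation keys like "parameters"
                operations.append((method.upper(), path))
    return sorted(operations)


# Hand-written sample in the shape of an OpenAPI document (not the real spec)
sample_spec = {
    "openapi": "3.1.0",
    "paths": {
        "/api/health": {"get": {}},
        "/api/rag/ask": {"post": {}, "parameters": []},
    },
}
operations = list_operations(sample_spec)
```

Against a running instance, the same function could be fed the parsed body of /openapi.json.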

🎯 Real-World Use Cases

For Frontend Developers:

"I need a clear, well-documented API for my web application"

Solution: Swagger UI + Pydantic models

// TypeScript client auto-generated from the OpenAPI spec
const response = await fetch('http://api.luslaboris.com/api/rag/ask', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    question: '¿Cuántos días de vacaciones?'
  })
});

const data = await response.json();
console.log(data.answer);

For Mobile Developers:

"I need rate limiting to prevent abuse"

Solution: SlowAPI with per-IP limits

# 10 requests per minute per IP
@limiter.limit("10/minute")
async def ask_question(request: Request, ...):
    ...

# Response when the limit is exceeded:
# HTTP 429 Too Many Requests
# Retry-After: 42
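To make the "10/minute per IP" rule concrete, here is a standard-library sketch of a sliding-window limiter. This is not how SlowAPI is implemented; the class name and the injectable clock are assumptions made for illustration and testing.

```python
import time
from collections import defaultdict, deque


class SlidingWindowLimiter:
    """Allow at most `limit` requests per `window_seconds` for each key (e.g. client IP)."""

    def __init__(self, limit: int = 10, window_seconds: float = 60.0, clock=time.monotonic):
        self.limit = limit
        self.window_seconds = window_seconds
        self.clock = clock
        self._hits = defaultdict(deque)  # key -> timestamps of accepted requests

    def check(self, key: str) -> tuple[bool, float]:
        """Return (allowed, retry_after_seconds)."""
        now = self.clock()
        hits = self._hits[key]
        # Drop timestamps that have fallen out of the window
        while hits and now - hits[0] >= self.window_seconds:
            hits.popleft()
        if len(hits) < self.limit:
            hits.append(now)
            return True, 0.0
        # The oldest accepted request leaves the window at hits[0] + window_seconds
        return False, hits[0] + self.window_seconds - now
```

The second element of the result is what a server would send in the Retry-After header alongside HTTP 429.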

For Data Scientists:

"I need to experiment with different LLM models without changing code"

Solution: Configuration via environment variables

# Try OpenAI GPT-4
export API_LLM_PROVIDER=openai
export API_LLM_MODEL=gpt-4

# Try Gemini
export API_LLM_PROVIDER=gemini
export API_LLM_MODEL=gemini-1.5-flash

# Enable reranking
export API_USE_RERANKING=true
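On the application side, those variables have to be read and validated somewhere. The project's real settings class is not shown in this section, so the following is only a standard-library sketch of the idea; the dataclass, the defaults, and the accepted boolean spellings are all assumptions.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    api_llm_provider: str
    api_llm_model: str
    api_use_reranking: bool


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build settings from API_* variables; the defaults here are illustrative."""
    provider = env.get("API_LLM_PROVIDER", "openai").lower()
    if provider not in {"openai", "gemini"}:
        raise ValueError(f"Unsupported API_LLM_PROVIDER: {provider}")
    reranking_raw = env.get("API_USE_RERANKING", "false").strip().lower()
    return Settings(
        api_llm_provider=provider,
        api_llm_model=env.get("API_LLM_MODEL", "gpt-4"),
        api_use_reranking=reranking_raw in {"1", "true", "yes"},
    )
```

Validating the provider at startup means a typo in the environment fails fast instead of surfacing on the first request.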

For DevOps:

"I need health checks for monitoring and auto-scaling"

Solution: Multiple health endpoints

# Basic health check (for the load balancer)
curl http://api/health
# {"status": "healthy"}

# Detailed health (for alerting)
curl -H "Authorization: Bearer $JWT" http://api/status
# {
#   "status": "healthy",
#   "services": {
#     "qdrant": {"status": "connected", "collections": 3},
#     "embeddings": {"status": "healthy", "models_loaded": ["e5-small"]},
#     ...
#   }
# }

🚀 The Transformative Impact

Before FastAPI:

  • ⏱️ Slow development: Routes, validation, and docs written by hand
  • 🐛 Frequent errors: No automatic type validation
  • 📝 Outdated docs: Hand-written documentation that drifts from the code
  • 🔒 Weak security: Auth implemented ad hoc
  • 📊 No observability: Debugging blind

After FastAPI:

  • Fast development: Type hints = automatic validation + docs
  • Fewer bugs: Runtime validation + IDE autocomplete
  • 📝 Docs always up to date: OpenAPI generated from the code
  • 🔒 Robust security: JWT + dependencies + middleware
  • 📊 Full observability: Phoenix + OpenTelemetry integrated

🔧 Technical Highlights

1. Async/Await Performance

# ❌ Synchronous - blocks the thread
def answer_question(query: str):
    embedding = generate_embedding(query)  # Blocks
    documents = search_qdrant(embedding)   # Blocks
    answer = call_openai(documents)        # Blocks
    return answer

# ✅ Asynchronous - non-blocking
async def answer_question(query: str):
    embedding = await generate_embedding(query)   # Does not block
    documents = await search_qdrant(embedding)    # Does not block
    answer = await call_openai(documents)         # Does not block
    return answer

# Improvement: 3-10x more throughput under I/O-bound load
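The gain comes from overlapping waits, not from making any single call faster. Here is a self-contained sketch that measures the difference, with `asyncio.sleep` standing in for an I/O-bound call. `fake_llm_call` and the delays are illustrative, and real numbers depend on the workload.

```python
import asyncio
import time


async def fake_llm_call(delay: float) -> str:
    """Stand-in for an I/O-bound call such as an LLM API request."""
    await asyncio.sleep(delay)  # yields control to the event loop while waiting
    return "ok"


async def sequential(n: int, delay: float) -> float:
    """Run n calls one after another and return the elapsed seconds."""
    start = time.perf_counter()
    for _ in range(n):
        await fake_llm_call(delay)
    return time.perf_counter() - start


async def concurrent(n: int, delay: float) -> float:
    """Run n calls at once with gather and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(fake_llm_call(delay) for _ in range(n)))
    return time.perf_counter() - start


sequential_seconds = asyncio.run(sequential(5, 0.05))
concurrent_seconds = asyncio.run(concurrent(5, 0.05))
```

With five simulated calls, the sequential run takes roughly five delays and the concurrent run roughly one. The same effect lets one worker serve many requests while each waits on the LLM.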

2. Dependency Injection

from functools import lru_cache

from fastapi import Depends

# Share connections across requests
@lru_cache()
def get_qdrant_service():
    return QdrantService()  # Singleton: created once, then cached

# Use in an endpoint
@router.get("/search")
async def search(
    query: str,
    qdrant: QdrantService = Depends(get_qdrant_service)
):
    return qdrant.search(query)

# Benefit: one Qdrant connection per process, shared by every request
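The singleton behavior comes entirely from `functools.lru_cache` and can be checked without FastAPI. In this sketch, `FakeQdrantService` is a hypothetical stand-in for a service that holds an expensive connection.

```python
from functools import lru_cache


class FakeQdrantService:
    """Hypothetical stand-in for a service that holds an expensive connection."""

    instances_created = 0

    def __init__(self) -> None:
        FakeQdrantService.instances_created += 1


@lru_cache()
def get_service() -> FakeQdrantService:
    # The body runs only on the first call; later calls return the cached object
    return FakeQdrantService()


first = get_service()
second = get_service()
same_instance = first is second

# In tests, clearing the cache forces a fresh instance on the next call
get_service.cache_clear()
third = get_service()
```

Note that the cache is per process: with several worker processes or container instances, each one builds its own service object.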

3. Middleware Stack

# Execution order (top-down on request, bottom-up on response):
# 1. TrustedHostMiddleware - validates the Host header
# 2. CORSMiddleware - handles CORS
# 3. Custom exception handlers
# 4. Router logic
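The "top-down on request, bottom-up on response" order follows from middleware being nested wrappers. Here is a framework-free sketch of that onion, using plain functions. The names are illustrative; this is not Starlette's implementation.

```python
from typing import Callable

Handler = Callable[[str], str]


def make_middleware(name: str, log: list) -> Callable[[Handler], Handler]:
    """Wrap a handler so it records when the request enters and the response leaves."""
    def wrap(next_handler: Handler) -> Handler:
        def handler(request: str) -> str:
            log.append(f"{name}: request")
            response = next_handler(request)
            log.append(f"{name}: response")
            return response
        return handler
    return wrap


log = []


def router(request: str) -> str:
    log.append("router")
    return f"handled {request}"


# Build inside-out: the last wrapper applied becomes the outermost layer
app = make_middleware("trusted_host", log)(make_middleware("cors", log)(router))
response = app("GET /api/health")
```

After the call, `log` shows trusted_host and cors entering in that order, then the router, then cors and trusted_host leaving in reverse. In Starlette-based apps, middleware registered later wraps middleware registered earlier, so it is worth confirming the registration order against the framework version in use.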

4. Background Tasks

@router.post("/process")
async def process_data(
    background_tasks: BackgroundTasks
):
    # Schedule the task; the response is returned immediately
    background_tasks.add_task(long_running_task)

    return {"status": "processing"}

# The client gets a fast response
# The task runs after the response has been sent

📊 Performance Metrics

Latency:

  • Health check: 5-10ms
  • RAG query (without LLM): 80-150ms
    • Embedding: 30ms
    • Qdrant search (gRPC): 30ms
    • Reranking: 20ms
  • RAG query (with LLM): 1-3 seconds
    • Retrieval: 150ms
    • LLM generation: 800-2500ms

Throughput:

  • Health endpoints: >1000 req/s
  • RAG endpoints: 50-100 req/s (limited by the LLM API)
  • Vectorstore load: 100 docs/second

Scalability:

  • Horizontal scaling: Stateless - scales linearly
  • Vertical scaling: CPU-bound on embeddings, I/O-bound on LLM calls
  • Cloud Run: Auto-scaling from 0 to N instances

💡 Lessons Learned

1. Async Is Critical for I/O-Bound Work

LLM API calls can take 1-3 seconds. With async, the event loop can serve other requests while it waits.

2. Pydantic v2 Is Much Faster

Pydantic v2 (core written in Rust) is 5-50x faster than v1, which matters under high load.

3. Rate Limiting Is Non-Negotiable

Without rate limiting, an attacker can burn through your OpenAI credits in minutes.

4. Health Checks Must Be Granular

A single /health endpoint is not enough. You need per-service health checks for effective debugging.
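One way to get that granularity is to run a probe per dependency and report each result next to an overall status. Here is a minimal sketch with plain callables as probes. The "degraded" label and the function name are assumptions, not the project's actual health schema.

```python
from typing import Callable


def aggregate_health(checks: dict[str, Callable[[], bool]]) -> dict:
    """Run each dependency probe and report per-service plus overall status."""
    dependencies = {}
    for name, check in checks.items():
        try:
            dependencies[name] = "healthy" if check() else "unhealthy"
        except Exception as exc:  # a failing probe must not crash the endpoint
            dependencies[name] = f"error: {exc}"
    all_ok = all(value == "healthy" for value in dependencies.values())
    return {"status": "healthy" if all_ok else "degraded", "dependencies": dependencies}
```

The per-service entries are what make an alert actionable: "rag_service: error: timeout" points at the failing piece, while a bare "unhealthy" does not.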

5. Dependency Injection Simplifies Testing

# Easy mocking of services in tests
def get_mock_qdrant():
    return MockQdrantService()

app.dependency_overrides[get_qdrant_service] = get_mock_qdrant

6. Background Tasks for Long-Running Operations

Never block an HTTP request for minutes. Use background tasks, with polling or webhooks for status.
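The polling half of that pattern needs somewhere to record job status. Here is a small asyncio sketch of the idea: submit returns a job id immediately, and a later poll reads the status. `JobRegistry` is a hypothetical in-memory helper; since the API is stateless and may run on several instances, a real deployment would need shared storage instead.

```python
import asyncio
import uuid


class JobRegistry:
    """In-memory job tracker; a real deployment would need shared storage."""

    def __init__(self) -> None:
        self._status = {}  # job_id -> "processing" | "done" | "failed"
        self._tasks = {}   # job_id -> asyncio.Task

    def submit(self, coro_factory) -> str:
        """Start the work in the background and return a job id right away."""
        job_id = uuid.uuid4().hex
        self._status[job_id] = "processing"
        self._tasks[job_id] = asyncio.create_task(self._run(job_id, coro_factory))
        return job_id

    async def _run(self, job_id: str, coro_factory) -> None:
        try:
            await coro_factory()
            self._status[job_id] = "done"
        except Exception:
            self._status[job_id] = "failed"

    def status(self, job_id: str) -> str:
        return self._status.get(job_id, "unknown")

    async def wait(self, job_id: str) -> None:
        await self._tasks[job_id]


async def demo() -> tuple:
    registry = JobRegistry()

    async def long_running_task() -> None:
        await asyncio.sleep(0.05)  # stands in for minutes of real work

    job_id = registry.submit(long_running_task)
    before = registry.status(job_id)  # what an immediate poll would see
    await registry.wait(job_id)
    after = registry.status(job_id)   # what a later poll would see
    return before, after


status_before, status_after = asyncio.run(demo())
```

In an endpoint, `submit` would back the POST that starts the work, and `status` would back a GET that clients poll with the returned id.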

🎯 The Bigger Purpose

FastAPI is not just a web framework; it is the gateway that makes our RAG system accessible to the world. It provides:

  • 🌐 Accessibility: Any HTTP client can consume the API
  • 🔒 Security: JWT + CORS + rate limiting = production-ready
  • 📊 Observability: Every request tracked with Phoenix
  • 📝 Documentation: Swagger UI = frictionless onboarding
  • ⚡ Performance: Async + gRPC = real-time responses
  • 🔄 Scalability: Stateless = scales horizontally without friction

With these, we are democratizing access to legal knowledge through a world-class REST API, comparable to the APIs of the best technology companies in the world.



Next Post: LLPY-07 - Integrating LLMs: OpenAI and Google Gemini

In the next post we will dig into integration with multiple LLM providers, the automatic fallback system, prompt engineering for legal queries, and model comparison.
