DEV Community

Jesus Oviedo Riquelme
Jesus Oviedo Riquelme

Posted on

LLPY-10: Autenticación JWT con RSA - Seguridad Stateless

🎯 El Desafío de la Autenticación en APIs

Imagina que tu API RAG está en producción:

  • ✅ Endpoints públicos: /api/health, /api/rag/ask
  • ⚠️ Endpoints sensibles: /api/vectorstore/load, /api/status (detalles completos)
  • ⚠️ Endpoints administrativos: /api/vectorstore/delete

El problema: ¿Cómo proteges los endpoints sensibles sin comprometer la performance o escalabilidad?

Requisitos de Autenticación

  1. 🔒 Seguro: Tokens no falsificables
  2. ⚡ Rápido: Validación en <5ms
  3. 📈 Escalable: Sin estado compartido entre instancias
  4. 🔄 Stateless: No requiere base de datos de sesiones
  5. 🌐 Estándar: Compatible con cualquier cliente HTTP
  6. 🔑 Rotable: Cambio de claves sin downtime
  7. ⏰ Temporal: Tokens con expiración automática

Opciones de Autenticación

Método Pros Contras Escalabilidad
Session cookies Familiar, fácil Requiere DB/Redis Limitada ⚠️
API Keys Simple No expira, difícil rotación Media ⚠️
OAuth 2.0 Estándar, robusto Complejo setup Alta ✅
JWT (HS256) Stateless, rápido Secret compartido Alta ✅
JWT (RS256) Stateless, secure Requiere key pair Muy Alta ✅✅

Nuestra elección: JWT con RS256 (RSA)

📊 La Magnitud del Problema

Desafíos Técnicos

  1. 🔐 Generación de Claves: ¿Cómo generar par RSA seguro?
  2. 📝 Claims Management: ¿Qué información incluir en el token?
  3. ✅ Validación: ¿Cómo validar firma sin secret compartido?
  4. 🔄 Distribución: ¿Cómo distribuir public key a la API?
  5. ⏰ Expiración: ¿Cómo manejar tokens expirados?
  6. 🔁 Renovación: ¿Cómo implementar refresh tokens?
  7. 🚫 Revocación: ¿Cómo invalidar tokens antes de expiración?

JWT vs Traditional Sessions

Traditional Sessions:

Client → API → Check session in Redis/DB (20-50ms)
         ↓
       Response

Scaling: N servers = N connections to Redis
Cost: Redis instance + network latency
SPOF: Redis down = all auth fails
Enter fullscreen mode Exit fullscreen mode

JWT (Stateless):

Client → API → Validate signature locally (<5ms)
         ↓
       Response

Scaling: N servers = 0 shared state
Cost: CPU for validation (negligible)
SPOF: None (cada server valida independientemente)
Enter fullscreen mode Exit fullscreen mode

💡 La Solución: JWT con RSA (RS256)

¿Qué es JWT?

JWT (JSON Web Token) es un estándar (RFC 7519) para tokens de acceso:

  • Header: Algoritmo y tipo
  • Payload: Claims (datos del usuario)
  • Signature: Firma criptográfica

¿Por Qué RS256 (RSA)?

HS256 (HMAC) vs RS256 (RSA):

Aspecto HS256 RS256
Algoritmo HMAC + SHA256 (symmetric) RSA + SHA256 (asymmetric)
Claves 1 secret compartido Private + Public key
Seguridad Secret debe estar en todos los servers Solo public key en API
Compromiso Si leak = regenerar tokens de todos Si leak public key = no importa
Uso Pequeña escala, single app Multi-service, microservices
Nuestra elección RS256

RS256 permite:

  • Private key en un solo lugar (token generation service)
  • Public key en múltiples APIs (stateless validation)
  • Zero trust: APIs solo validan, nunca generan tokens

🚀 Implementación Paso a Paso

1. Generación de Claves RSA

Script utils/generate_jwt_keys.sh:

#!/bin/bash

# Colores para output
GREEN='\033[0;32m'
BLUE='\033[0;34m'
RED='\033[0;31m'
NC='\033[0m' # No Color

# Configuración por defecto
KEY_SIZE=2048
OUTPUT_DIR="keys"
PRIVATE_KEY="private_key.pem"
PUBLIC_KEY="public_key.pem"

# Función para generar las claves
generate_keys() {
    local output_dir="$1"
    local key_size="$2"
    local private_key="$3"
    local public_key="$4"

    local private_path="$output_dir/$private_key"
    local public_path="$output_dir/$public_key"

    # Crear directorio si no existe
    mkdir -p "$output_dir"

    echo -e "${BLUE}ℹ️  Generando clave privada RSA de $key_size bits...${NC}"
    openssl genrsa -out "$private_path" "$key_size"

    if [[ $? -eq 0 ]]; then
        echo -e "${GREEN}✅ Clave privada generada: $private_path${NC}"
    else
        echo -e "${RED}❌ Error al generar la clave privada${NC}"
        exit 1
    fi

    echo -e "${BLUE}ℹ️  Generando clave pública RSA...${NC}"
    openssl rsa -pubout -in "$private_path" -out "$public_path"

    if [[ $? -eq 0 ]]; then
        echo -e "${GREEN}✅ Clave pública generada: $public_path${NC}"
    else
        echo -e "${RED}❌ Error al generar la clave pública${NC}"
        exit 1
    fi

    # Establecer permisos seguros
    chmod 600 "$private_path"  # Solo lectura/escritura para el propietario
    chmod 644 "$public_path"   # Lectura para todos

    echo -e "${GREEN}✅ Permisos de archivos configurados correctamente${NC}"
}

# Ejecutar generación
generate_keys "$OUTPUT_DIR" "$KEY_SIZE" "$PRIVATE_KEY" "$PUBLIC_KEY"

echo ""
echo -e "${GREEN}========================================${NC}"
echo -e "${GREEN}    Claves RSA generadas exitosamente${NC}"
echo -e "${GREEN}========================================${NC}"
echo ""
echo -e "📁 Ubicación: $OUTPUT_DIR/"
echo -e "🔒 Clave privada: $PRIVATE_KEY (permisos: 600)"
echo -e "🔓 Clave pública: $PUBLIC_KEY (permisos: 644)"
echo ""
echo -e "${BLUE}⚠️  IMPORTANTE:${NC}"
echo -e "   - La clave privada NUNCA debe compartirse"
echo -e "   - La clave privada NO debe versionarse en Git"
echo -e "   - La clave pública puede distribuirse libremente"
Enter fullscreen mode Exit fullscreen mode

Ejecutar:

cd utils
chmod +x generate_jwt_keys.sh
./generate_jwt_keys.sh
Enter fullscreen mode Exit fullscreen mode

Output:

ℹ️  Generando clave privada RSA de 2048 bits...
✅ Clave privada generada: keys/private_key.pem
ℹ️  Generando clave pública RSA...
✅ Clave pública generada: keys/public_key.pem
✅ Permisos de archivos configurados correctamente

========================================
    Claves RSA generadas exitosamente
========================================

📁 Ubicación: keys/
🔒 Clave privada: private_key.pem (permisos: 600)
🔓 Clave pública: public_key.pem (permisos: 644)

⚠️  IMPORTANTE:
   - La clave privada NUNCA debe compartirse
   - La clave privada NO debe versionarse en Git
   - La clave pública puede distribuirse libremente
Enter fullscreen mode Exit fullscreen mode

Estructura generada:

keys/
├── private_key.pem    # 🔒 1679 bytes, permisos: 600
└── public_key.pem     # 🔓 451 bytes, permisos: 644
Enter fullscreen mode Exit fullscreen mode

Contenido de las claves:

# Private key (2048 bits)
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEA2Z7X3Y+9QvH5xK...
... (24 líneas de base64)
-----END RSA PRIVATE KEY-----

# Public key
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ...
... (6 líneas de base64)
-----END PUBLIC KEY-----
Enter fullscreen mode Exit fullscreen mode

2. Generación de Tokens JWT

Script utils/generate_jwt_token.py:

#!/usr/bin/env python3
"""
JWT Token Generator using RSA keys
"""

import os
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any

import jwt
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization

class JWTTokenGenerator:
    """JWT token generator using RSA keys"""

    def __init__(self):
        # Get project root directory
        self.script_dir = Path(__file__).parent
        self.project_root = self.script_dir.parent

        # Get key paths from environment or defaults
        private_path = os.getenv("JWT_PRIVATE_KEY_PATH", "keys/private_key.pem")
        public_path = os.getenv("JWT_PUBLIC_KEY_PATH", "keys/public_key.pem")

        # Convert to absolute paths
        self.private_key_path = (
            Path(private_path) 
            if Path(private_path).is_absolute() 
            else self.project_root / private_path
        )
        self.public_key_path = (
            Path(public_path)
            if Path(public_path).is_absolute()
            else self.project_root / public_path
        )

        self.default_expiry_minutes = int(os.getenv("JWT_TOKEN_EXPIRY_MINUTES", "60"))
        self.algorithm = "RS256"

        # Keys loaded on demand
        self.private_key = None
        self.public_key = None

    def _load_private_key(self):
        """Load RSA private key from file"""
        if self.private_key is not None:
            return self.private_key

        try:
            with open(self.private_key_path, "rb") as f:
                self.private_key = serialization.load_pem_private_key(
                    f.read(),
                    password=None,
                    backend=default_backend()
                )
            print(f"✅ Clave privada cargada desde: {self.private_key_path}")
            return self.private_key
        except FileNotFoundError:
            print(f"❌ Error: Archivo de clave privada no encontrado: {self.private_key_path}")
            sys.exit(1)
        except Exception as e:
            print(f"❌ Error al cargar clave privada: {e}")
            sys.exit(1)

    def _load_public_key(self):
        """Load RSA public key from file"""
        if self.public_key is not None:
            return self.public_key

        try:
            with open(self.public_key_path, "rb") as f:
                self.public_key = serialization.load_pem_public_key(
                    f.read(),
                    backend=default_backend()
                )
            print(f"✅ Clave pública cargada desde: {self.public_key_path}")
            return self.public_key
        except FileNotFoundError:
            print(f"❌ Error: Archivo de clave pública no encontrado: {self.public_key_path}")
            sys.exit(1)

    def generate_token(
        self,
        username: str,
        expiry_minutes: int | None = None,
        additional_claims: dict[str, Any] | None = None,
    ) -> str:
        """
        Generate a JWT token

        Args:
            username: Username/email for the token
            expiry_minutes: Token expiration in minutes (default: 60)
            additional_claims: Additional claims to include

        Returns:
            JWT token string
        """
        if expiry_minutes is None:
            expiry_minutes = self.default_expiry_minutes

        # Create timestamps
        now = datetime.now(timezone.utc)
        expiry = now + timedelta(minutes=expiry_minutes)

        # Build payload with standard claims
        payload = {
            "sub": username,              # Subject (username/email)
            "iss": "lus-laboris-api",     # Issuer
            "aud": "lus-laboris-client",  # Audience
            "exp": int(expiry.timestamp()),       # Expiration time
            "iat": int(now.timestamp()),          # Issued at
            "jti": f"{username}_{int(now.timestamp())}",  # JWT ID (unique)
        }

        # Add additional claims if provided
        if additional_claims:
            payload.update(additional_claims)

        # Generate token
        try:
            private_key = self._load_private_key()
            token = jwt.encode(payload, private_key, algorithm=self.algorithm)

            print(f"✅ Token generado exitosamente para usuario: {username}")
            print(f"ℹ️  Token expira en: {expiry.strftime('%Y-%m-%d %H:%M:%S UTC')} ({expiry_minutes} minutos)")

            return token
        except Exception as e:
            print(f"❌ Error al generar token: {e}")
            sys.exit(1)

    def validate_token(self, token: str) -> dict[str, Any]:
        """
        Validate a JWT token using the public key

        Args:
            token: JWT token string

        Returns:
            Decoded token payload
        """
        try:
            public_key = self._load_public_key()

            # Decode and validate token
            payload = jwt.decode(
                token,
                public_key,
                algorithms=[self.algorithm],
                audience="lus-laboris-client",
                issuer="lus-laboris-api",
            )

            print("✅ Validación de token exitosa")
            return payload

        except jwt.ExpiredSignatureError:
            print("❌ Token expirado")
            sys.exit(1)
        except jwt.InvalidAudienceError:
            print("❌ Audience inválido")
            sys.exit(1)
        except jwt.InvalidIssuerError:
            print("❌ Issuer inválido")
            sys.exit(1)
        except jwt.InvalidTokenError as e:
            print(f"❌ Token inválido: {e}")
            sys.exit(1)


def main():
    """Main function for CLI usage"""
    import argparse

    parser = argparse.ArgumentParser(description="Generate or validate JWT tokens")
    parser.add_argument("action", choices=["generate", "validate"], help="Action to perform")
    parser.add_argument("--username", "-u", help="Username for token generation")
    parser.add_argument("--expiry", "-e", type=int, help="Expiration time in minutes")
    parser.add_argument("--token", "-t", help="Token to validate")

    args = parser.parse_args()

    generator = JWTTokenGenerator()

    if args.action == "generate":
        if not args.username:
            print("❌ Error: --username es requerido para generar token")
            sys.exit(1)

        token = generator.generate_token(
            username=args.username,
            expiry_minutes=args.expiry
        )

        print("\n" + "="*80)
        print("TOKEN GENERADO:")
        print("="*80)
        print(token)
        print("="*80)
        print("\nUso:")
        print(f'  curl -H "Authorization: Bearer {token}" http://localhost:8000/api/status')
        print()

    elif args.action == "validate":
        if not args.token:
            print("❌ Error: --token es requerido para validar")
            sys.exit(1)

        payload = generator.validate_token(args.token)

        print("\n" + "="*80)
        print("PAYLOAD DECODIFICADO:")
        print("="*80)
        import json
        print(json.dumps(payload, indent=2))
        print("="*80)


if __name__ == "__main__":
    main()
Enter fullscreen mode Exit fullscreen mode

Uso:

# Generar token (expira en 60 minutos por defecto)
python utils/generate_jwt_token.py generate --username admin@example.com

# Generar token con expiración custom
python utils/generate_jwt_token.py generate --username admin@example.com --expiry 1440

# Validar token
python utils/generate_jwt_token.py validate --token "eyJhbGc..."
Enter fullscreen mode Exit fullscreen mode

Output (generación):

✅ Clave privada cargada desde: /path/to/keys/private_key.pem
✅ Token generado exitosamente para usuario: admin@example.com
ℹ️  Token expira en: 2024-10-17 15:30:00 UTC (60 minutos)

================================================================================
TOKEN GENERADO:
================================================================================
eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbkBleGFtcGxlLmNvbSIsImlzcyI6Imx1cy1sYWJvcmlzLWFwaSIsImF1ZCI6Imx1cy1sYWJvcmlzLWNsaWVudCIsImV4cCI6MTcyOTI3MTgwMCwiaWF0IjoxNzI5MjY4MjAwLCJqdGkiOiJhZG1pbkBleGFtcGxlLmNvbV8xNzI5MjY4MjAwIn0.kR4X_3hN8vY2gK5pQ...
================================================================================

Uso:
  curl -H "Authorization: Bearer eyJhbGc..." http://localhost:8000/api/status
Enter fullscreen mode Exit fullscreen mode

Estructura del Token:

Header (base64):
{
  "alg": "RS256",
  "typ": "JWT"
}

Payload (base64):
{
  "sub": "admin@example.com",         // Subject (username)
  "iss": "lus-laboris-api",           // Issuer
  "aud": "lus-laboris-client",        // Audience
  "exp": 1729271800,                  // Expiration (Unix timestamp)
  "iat": 1729268200,                  // Issued at
  "jti": "admin@example.com_1729268200"  // JWT ID (unique)
}

Signature (RSA):
RS256(base64UrlEncode(header) + "." + base64UrlEncode(payload), private_key)
Enter fullscreen mode Exit fullscreen mode

3. Validación en FastAPI

Archivo src/lus_laboris_api/api/auth/jwt_handler.py:

"""
JWT token validation using public key authentication
"""

import logging
import os
from typing import Any

import jwt
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization

from ..config import settings

logger = logging.getLogger(__name__)


class JWTValidator:
    """JWT validator for token validation using RSA public key"""

    def __init__(self):
        self.public_key = None
        self.algorithm = "RS256"

        # Load public key for validation
        self._load_public_key()

    def _load_public_key(self):
        """Load public key from file for token validation"""
        public_key_path = settings.api_jwt_public_key_path

        # Resolve path relative to project root if needed
        if not os.path.isabs(public_key_path):
            # Get project root (assuming this file is in src/lus_laboris_api/api/auth/)
            project_root = os.path.abspath(
                os.path.join(os.path.dirname(__file__), "../../../..")
            )
            public_key_path = os.path.join(project_root, public_key_path)

        try:
            with open(public_key_path, "rb") as f:
                self.public_key = serialization.load_pem_public_key(
                    f.read(),
                    backend=default_backend()
                )
            logger.info(f"JWT public key loaded successfully from {public_key_path}")
        except FileNotFoundError:
            logger.error(f"JWT public key not found at {public_key_path}")
            raise ValueError(
                f"JWT public key not found. Please ensure the key file exists at {public_key_path}"
            )
        except Exception as e:
            logger.error(f"Failed to load JWT public key: {e}")
            raise ValueError(f"Failed to load JWT public key: {e}")

    def validate_token(self, token: str) -> dict[str, Any]:
        """Validate a JWT token and return its payload"""
        if not self.public_key:
            raise ValueError("Public key not available for token validation")

        try:
            # Get expected issuer and audience from settings
            expected_issuer = settings.api_jwt_iss
            expected_audience = settings.api_jwt_aud

            payload = jwt.decode(
                token,
                self.public_key,
                algorithms=[self.algorithm],
                audience=expected_audience,  # Validate audience from config
                issuer=expected_issuer,      # Validate issuer from config
                options={
                    "verify_exp": True,   # Verify expiration
                    "verify_iat": True,   # Verify issued at
                    "verify_aud": True,   # Verify audience
                    "verify_iss": True,   # Verify issuer
                },
            )

            logger.info(f"JWT token validated for subject: {payload.get('sub', 'unknown')}")
            return payload

        except jwt.ExpiredSignatureError:
            logger.warning("JWT token has expired")
            raise ValueError("Token has expired")
        except jwt.InvalidAudienceError:
            logger.warning(f"Invalid JWT audience. Expected: {expected_audience}")
            raise ValueError(f"Invalid audience. Expected: {expected_audience}")
        except jwt.InvalidIssuerError:
            logger.warning(f"Invalid JWT issuer. Expected: {expected_issuer}")
            raise ValueError(f"Invalid issuer. Expected: {expected_issuer}")
        except jwt.InvalidTokenError as e:
            logger.warning(f"Invalid JWT token: {e}")
            raise ValueError(f"Invalid token: {e}")

    def is_token_valid(self, token: str) -> bool:
        """Check if a token is valid without raising exceptions"""
        try:
            self.validate_token(token)
            return True
        except ValueError:
            return False


# Global instance
jwt_validator = JWTValidator()
Enter fullscreen mode Exit fullscreen mode

4. Dependencies para FastAPI

Archivo src/lus_laboris_api/api/auth/dependencies.py:

"""
Authentication dependencies for FastAPI endpoints
"""

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from typing import Any

from .jwt_handler import jwt_validator

# Security scheme for Swagger UI
security = HTTPBearer()


def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> dict[str, Any]:
    """
    Dependency for endpoints that require authentication

    Usage:
        @router.get("/protected")
        async def protected_endpoint(user: dict = Depends(get_current_user)):
            return {"message": f"Hello {user['sub']}"}
    """
    token = credentials.credentials

    try:
        payload = jwt_validator.validate_token(token)
        return payload
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=str(e),
            headers={"WWW-Authenticate": "Bearer"},
        )


def optional_auth(
    credentials: HTTPAuthorizationCredentials | None = Depends(
        HTTPBearer(auto_error=False)
    )
) -> dict[str, Any] | None:
    """
    Dependency for endpoints with optional authentication

    Returns user payload if token is valid, None otherwise.
    Does not raise exceptions for missing/invalid tokens.

    Usage:
        @router.get("/health/detailed")
        async def detailed_health(user: dict | None = Depends(optional_auth)):
            is_authenticated = user is not None
            if is_authenticated:
                return {"status": "healthy", "details": "..."}
            else:
                return {"status": "healthy"}
    """
    if not credentials:
        return None

    try:
        payload = jwt_validator.validate_token(credentials.credentials)
        return payload
    except ValueError:
        return None  # Silently fail - don't block access
Enter fullscreen mode Exit fullscreen mode

5. Uso en Endpoints

from fastapi import APIRouter, Depends
from ..auth.dependencies import get_current_user, optional_auth

router = APIRouter(prefix="/api", tags=["API"])


@router.get("/public")
async def public_endpoint():
    """Public endpoint - no authentication required"""
    return {"message": "This is public"}


@router.get("/protected")
async def protected_endpoint(user: dict = Depends(get_current_user)):
    """Protected endpoint - JWT required"""
    return {
        "message": f"Hello {user['sub']}",
        "user_id": user["sub"],
        "issued_at": user["iat"],
        "expires_at": user["exp"],
    }


@router.get("/optional-auth")
async def optional_auth_endpoint(user: dict | None = Depends(optional_auth)):
    """Endpoint with optional authentication"""
    if user:
        return {
            "message": f"Authenticated as {user['sub']}",
            "authenticated": True
        }
    else:
        return {
            "message": "Not authenticated",
            "authenticated": False
        }


@router.delete("/vectorstore/collections/{collection_name}")
async def delete_collection(
    collection_name: str,
    user: dict = Depends(get_current_user)
):
    """Admin endpoint - requires authentication"""
    # Only authenticated users can delete collections
    logger.info(f"User {user['sub']} deleting collection {collection_name}")

    # Delete logic here...

    return {
        "message": f"Collection {collection_name} deleted",
        "deleted_by": user["sub"]
    }
Enter fullscreen mode Exit fullscreen mode

Swagger UI Integration:

La autenticación aparece automáticamente en Swagger UI:

  1. Click en el botón "Authorize" 🔓
  2. Ingresa el token en el campo "Value"
  3. Click en "Authorize"
  4. Ahora puedes probar endpoints protegidos

🔐 Integración con GCP Secret Manager

1. Almacenar Public Key en Secret Manager

# Crear secret con la public key
gcloud secrets create jwt-public-key \
    --data-file=keys/public_key.pem \
    --project=tu-proyecto-gcp \
    --replication-policy="automatic"

# Verificar
gcloud secrets versions list jwt-public-key --project=tu-proyecto-gcp
Enter fullscreen mode Exit fullscreen mode

2. Montar Secret en Cloud Run

En el deployment de Cloud Run:

gcloud run deploy lus-laboris-api \
  --image=tu-imagen \
  --update-secrets="/app/secrets/jwt/public_key.pem=jwt-public-key:latest" \
  --set-env-vars="API_JWT_PUBLIC_KEY_PATH=/app/secrets/jwt/public_key.pem"
Enter fullscreen mode Exit fullscreen mode

3. GitHub Actions Workflow

- name: Update JWT Public Key Secret
  run: |
    echo "${JWT_PUBLIC_KEY}" | \
      gcloud secrets versions add jwt-public-key \
      --data-file=- \
      --project=${GCP_PROJECT_ID}

    echo "✅ JWT public key secret updated"
Enter fullscreen mode Exit fullscreen mode

🎯 Casos de Uso Reales

Para APIs Públicas con Endpoints Protegidos:

"Quiero que /health sea público pero /status sea protegido"

Solución:

@router.get("/health")
async def health():
    """Public - no auth"""
    return {"status": "healthy"}

@router.get("/status")
async def status(user: dict = Depends(get_current_user)):
    """Protected - JWT required"""
    return {
        "status": "healthy",
        "details": {...},  # Info sensible
        "requested_by": user["sub"]
    }
Enter fullscreen mode Exit fullscreen mode

Para Integración con Aplicaciones Externas:

"Otra aplicación necesita cargar datos al vectorstore"

Solución:

# Generar token para la aplicación externa
python generate_jwt_token.py generate \
  --username external-app@company.com \
  --expiry 10080  # 7 días

# Compartir token (de forma segura)
# La app externa puede hacer requests:
curl -X POST http://api/vectorstore/load \
  -H "Authorization: Bearer eyJhbGc..." \
  -H "Content-Type: application/json" \
  -d '{"filename": "data.json"}'
Enter fullscreen mode Exit fullscreen mode

Para Diferentes Niveles de Acceso:

"Admin puede delete, user puede read"

Solución:

def require_admin(user: dict = Depends(get_current_user)) -> dict:
    """Dependency que requiere rol admin"""
    if user.get("role") != "admin":
        raise HTTPException(status_code=403, detail="Admin access required")
    return user

@router.delete("/collections/{name}")
async def delete_collection(
    name: str,
    admin: dict = Depends(require_admin)
):
    """Solo admins pueden eliminar"""
    ...
Enter fullscreen mode Exit fullscreen mode

Para Debugging y Testing:

"Quiero probar endpoints protegidos en desarrollo"

Solución:

# Generar token de desarrollo (expira en 24 horas)
python generate_jwt_token.py generate \
  --username dev@localhost \
  --expiry 1440 > dev_token.txt

# Usar en requests
export TOKEN=$(cat dev_token.txt)
curl -H "Authorization: Bearer $TOKEN" http://localhost:8000/api/status
Enter fullscreen mode Exit fullscreen mode

🚀 El Impacto Transformador

Antes de JWT:

  • 🔑 Session management: Redis/DB requerido para sesiones
  • 📊 Scaling issues: N servers = N connections to session store
  • ⏱️ Latency: 20-50ms para check session
  • 💰 Cost: Redis instance ($50-100/mes)
  • 🚨 SPOF: Redis down = all auth fails

Después de JWT (RS256):

  • 🔑 Stateless: Cero dependencias externas
  • 📊 Infinite scaling: N servers = 0 shared state
  • ⏱️ Ultra-fast: <5ms validation (CPU-bound)
  • 💰 Zero cost: CPU cost negligible
  • 🚨 No SPOF: Cada server valida independientemente

Métricas de Mejora:

Aspecto Sin JWT Con JWT (RS256) Mejora
Latency de validación 20-50ms <5ms -90%
External dependencies Redis/DB None -100%
Cost (auth infra) $50-100/mes $0 -100%
Scaling complexity High Zero N/A
SPOF risk Yes No N/A

💡 Lecciones Aprendidas

1. RS256 > HS256 para Multi-Service

Si tu API se comunica con otros servicios, RS256 es mejor porque solo necesitas compartir la public key (no sensitive).

2. Expiry Time es Crítico

  • Corto (15-60 min): Más seguro, pero requiere refresh frecuente
  • Largo (24 horas+): Más convenient, pero mayor ventana de compromiso
  • Nuestro sweet spot: 60 minutos para API calls, 7 días para batch jobs

3. Claims Mínimos son Mejores

Solo incluir en el payload lo que realmente necesitas validar. No uses JWT como session storage.

4. Public Key en Secret Manager

Aunque la public key no es sensible, almacenarla en Secret Manager facilita rotación sin re-deploy.

5. optional_auth es Poderoso

Permite endpoints que dan más info si estás autenticado, pero no bloquean acceso público.

6. Swagger UI con JWT es UX++

El botón "Authorize" en Swagger UI hace testing de endpoints protegidos trivial.

🔐 Seguridad Best Practices

1. Never Version Private Key

# .gitignore
keys/private_key.pem
*.pem
!public_key.pem  # Public key is safe
Enter fullscreen mode Exit fullscreen mode

2. Secure Key Permissions

chmod 600 keys/private_key.pem  # Owner read/write only
chmod 644 keys/public_key.pem   # Read for everyone (safe)
Enter fullscreen mode Exit fullscreen mode

3. Rotate Keys Periodically

# Generate new key pair
./generate_jwt_keys.sh --force

# Deploy new public key to API
gcloud secrets versions add jwt-public-key --data-file=keys/public_key.pem

# Gradually phase out old tokens (based on expiry)
Enter fullscreen mode Exit fullscreen mode

4. Use Short-Lived Tokens

# For interactive users: 15-60 minutes
token = generator.generate_token(username, expiry_minutes=60)

# For service accounts: 7 days max
token = generator.generate_token(username, expiry_minutes=10080)
Enter fullscreen mode Exit fullscreen mode

5. Validate All Standard Claims

# Always verify: exp, iat, iss, aud
payload = jwt.decode(
    token,
    public_key,
    options={
        "verify_exp": True,
        "verify_iat": True,
        "verify_aud": True,
        "verify_iss": True,
    }
)
Enter fullscreen mode Exit fullscreen mode

🎯 El Propósito Más Grande

JWT con RS256 no es solo autenticación - es libertad arquitectural. Al eliminar la necesidad de estado compartido:

  • 🚀 Scaling: Cada instancia de la API es independiente
  • 🌐 Multi-region: Tokens válidos en cualquier región
  • 🔄 Microservices: Diferentes servicios validan con la misma public key
  • 💰 Cost: Zero infrastructure para auth
  • ⚡ Performance: Validación local ultra-rápida
  • 🔒 Security: Public key leak = no problem

Estamos construyendo una API que escala infinitamente sin necesidad de coordinación entre instancias, sin single points of failure, y sin costo adicional de infraestructura.


🔗 Recursos y Enlaces

Repositorio del Proyecto

Documentación Técnica

Recursos Externos


Próximo Post: LLPY-11 - Terraform: Infraestructura como Código

En el siguiente post exploraremos cómo gestionar toda la infraestructura de GCP con Terraform, desde VMs para Qdrant hasta Cloud Run para la API, con módulos reutilizables y CI/CD automatizado.

Top comments (0)