DEV Community

Cover image for Auth con JWT, RBAC y token blacklist sin pegar 5 librerías: Fitz vs FastAPI + python-jose + passlib + Redis + RBAC casero
Martin Palopoli
Martin Palopoli

Posted on

Auth con JWT, RBAC y token blacklist sin pegar 5 librerías: Fitz vs FastAPI + python-jose + passlib + Redis + RBAC casero

El flow completo de auth — login, hashing Argon2id, JWT firma y verificación, RBAC custom apilable, logout con blacklist persistente — lado a lado en FastAPI y en Fitz. Cero librerías nuevas, todo built-in al lenguaje.

La auth de verdad necesita más cosas de las que uno cree

Empezás con "necesito login con JWT". Después aparece:

  • Hashing de passwords con un algoritmo bueno (Argon2id, no bcrypt para proyectos nuevos).
  • RBAC porque no alcanza con "logueado/no logueado" — hay roles, hay endpoints de admin.
  • Logout que de verdad invalide el token (no solo "olvidate de él del lado del cliente").
  • Refresh tokens para no obligar al usuario a re-loguearse cada hora.
  • Cleanup de la blacklist para no llenar Redis con tokens vencidos.

En FastAPI, cada una de esas cosas es una librería distinta más código pegamento. En Fitz, son parte del lenguaje.

El stack típico de Python

pip install python-jose[cryptography] passlib[argon2] argon2-cffi redis python-multipart
Enter fullscreen mode Exit fullscreen mode

auth.py:

from datetime import datetime, timedelta, timezone
from jose import jwt, JWTError
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
import redis.asyncio as redis
import uuid
import time

SECRET_KEY = os.environ["JWT_SECRET"]
ALGORITHM = "HS256"

pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
redis_client = redis.from_url(os.environ["REDIS_URL"])

def hash_password(password: str) -> str:
    return pwd_context.hash(password)

def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)

def create_access_token(data: dict, expires_delta: timedelta = timedelta(hours=1)):
    to_encode = data.copy()
    jti = str(uuid.uuid4())
    exp = datetime.now(tz=timezone.utc) + expires_delta
    to_encode.update({"jti": jti, "exp": exp})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

async def is_blacklisted(jti: str) -> bool:
    return await redis_client.exists(f"blacklist:{jti}") > 0

async def blacklist_token(jti: str, exp: int):
    ttl = exp - int(time.time())
    if ttl > 0:
        await redis_client.set(f"blacklist:{jti}", "1", ex=ttl)

async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(status_code=401, detail="invalid token")
    jti = payload.get("jti")
    if jti and await is_blacklisted(jti):
        raise HTTPException(status_code=401, detail="token revoked")
    user = await db.fetch_user(payload["sub"])
    if not user:
        raise HTTPException(status_code=401, detail="user not found")
    return user

def require_role(*roles):
    async def checker(user: User = Depends(get_current_user)) -> User:
        if user.role not in roles:
            raise HTTPException(status_code=403, detail="forbidden")
        return user
    return checker
Enter fullscreen mode Exit fullscreen mode

endpoints.py:

@app.post("/login")
async def login(creds: Credentials):
    user = await db.fetch_user_by_email(creds.email)
    if not user or not verify_password(creds.password, user.password_hash):
        raise HTTPException(status_code=401, detail="invalid credentials")
    token = create_access_token({"sub": user.email, "role": user.role})
    return {"token": token}

@app.get("/me")
async def me(user: User = Depends(get_current_user)) -> User:
    return user

@app.get("/admin/users")
async def admin_list(user: User = Depends(require_role("admin"))) -> list[User]:
    return await db.fetch_all_users()

@app.post("/articles")
async def create_article(
    body: ArticleIn,
    user: User = Depends(require_role("editor", "publisher")),
) -> Article:
    return await db.create_article(body, user.id)

@app.post("/auth/logout")
async def logout(user_and_jti: tuple[User, str] = Depends(get_user_and_jti)):
    user, jti = user_and_jti
    payload = jwt.decode(user.token, SECRET_KEY, algorithms=[ALGORITHM])
    await blacklist_token(jti, payload["exp"])
    return {"ok": True}
Enter fullscreen mode Exit fullscreen mode

Ahí va. Cinco librerías, ~100 líneas de boilerplate antes de escribir lógica de negocio, una conexión Redis adicional, y la responsabilidad de mantenerlo sincronizado.

Lo mismo en Fitz

type User { id: Int, email: Str, name: Str, role: Str, password_hash: Str }
type Credentials { email: Str, password: Str }
type LoginResponse { token: Str }
type Article { id: Int, title: Str, body: Str, user_id: Int }
type ArticleIn { title: Str, body: Str }

let SECRET = secret("JWT_SECRET")

@auth_provider
async fn check_token(headers: Map<Str, Str>, db: DbConn) -> Result<User> {
    let raw = match headers.get("authorization") {
        Ok(v) => v,
        Err(_) => return Err("falta header Authorization"),
    }
    let parts = raw.split(" ")
    if (parts.len() != 2 or parts[0] != "Bearer") {
        return Err("se esperaba 'Bearer <token>'")
    }
    let claims = jwt.decode(parts[1], SECRET.expose())?
    let jti = match claims.get("jti") {
        Ok(v) => v,
        Err(_) => return Err("token sin jti"),
    }
    if (auth.is_blacklisted(db, jti).await?) {
        return Err("token revocado")
    }
    return User.where(fn(u) => u.email == claims["sub"]).first(db).await
}

@post("/login")
async fn login(db: DbConn, creds: Credentials) -> LoginResponse {
    let user: User = match User.where(fn(u) => u.email == creds.email).first(db).await {
        Ok(u) => u,
        Err(_) => return 401 { "error": "credenciales inválidas" },
    }
    if (not hash.verify(creds.password, user.password_hash)) {
        return 401 { "error": "credenciales inválidas" }
    }
    let exp = DateTime.now().timestamp() + 86400  // 24 h de validez
    // El payload del JWT es `Map<Str, Str>` en `fitz build` MVP — los
    // numéricos se serializan a string. En `fitz run` la heterogeneidad
    // pasa, pero usamos el shape estricto para mantener paridad bit-a-bit.
    let claims = {
        "sub": user.email,
        "role": user.role,
        "jti": Uuid.v4().to_str(),
        "exp": "{exp}",
    }
    return LoginResponse { token: jwt.encode(claims, SECRET.expose()) }
}

@authenticated @get("/me")
fn me(user: User) -> User => user

@admin @get("/admin/users")
async fn admin_list(db: DbConn, user: User) -> List<User> {
    return User.all(db).await
}

@requires("editor") @requires("publisher")
@post("/articles")
async fn create_article(db: DbConn, body: ArticleIn, user: User) -> Article {
    return Article.insert(db, Article { id: 0, title: body.title, body: body.body, user_id: user.id }).await?
}

@authenticated @post("/auth/logout")
async fn logout(db: DbConn, user: User, headers: Map<Str, Str>) -> Map<Str, Bool> {
    // Re-decodificamos el token sólo para sacar jti del claims.
    let raw = headers["authorization"]
    let token = raw.split(" ")[1]
    let claims = jwt.decode(token, SECRET.expose())?
    let jti = claims.get("jti")?
    // TTL fresca de 24h. Cuando el JWT expira, `jwt.decode` ya lo rechaza
    // por expirado; la blacklist solo previene reuse del jti antes de exp.
    // No leemos exp del payload (vendría como Str y `auth.blacklist` exige Int).
    let ttl = DateTime.now().timestamp() + 86400
    auth.blacklist(db, jti, ttl).await?
    return { "ok": true }
}

@cron("0 0 3 * * *")
async fn cleanup_blacklist(db: DbConn) {
    auth.cleanup_expired(db).await
}
Enter fullscreen mode Exit fullscreen mode

Cero pip install. Cero archivo auth.py aparte. El compilador valida cada decorador estáticamente.

La tabla cruda

Pieza Python (FastAPI + 5 libs) Fitz
Hash de passwords passlib[argon2] + CryptContext(schemes=["argon2"]) hash.password(...) built-in
Verificar password pwd_context.verify(...) hash.verify(...)
Firmar JWT jose.jwt.encode(..., SECRET, algorithm="HS256") jwt.encode(claims, SECRET)
Decodificar JWT jose.jwt.decode(...) + try/except jwt.decode(token, SECRET)?
Auth scheme OAuth2PasswordBearer + Depends(...) @auth_provider fn check_token(...)
Endpoint protegido Depends(get_current_user) en cada uno @authenticated
Admin only RBAC casero con require_role("admin") @admin
RBAC custom Helper require_role(*roles) + Depends @requires("editor") apilable
Token blacklist Redis + lógica manual + TTL calculado a mano auth.blacklist(db, jti, exp)
Cleanup vencidos Redis SET TTL (auto) o tú manual si DB auth.cleanup_expired(db) + @cron
Validación estática Ninguna — error en runtime cuando llega req @authenticated sin @auth_provider → error de compilación
Schema OpenAPI con bearerAuth Tenés que declararlo en app = FastAPI(...) Automático cuando hay @auth_provider

Por partes

Hash de passwords

Python:

from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")

hashed = pwd_context.hash("password123")
ok = pwd_context.verify("password123", hashed)
Enter fullscreen mode Exit fullscreen mode

Fitz:

let hashed = hash.password("password123")
let ok = hash.verify("password123", hashed)
Enter fullscreen mode Exit fullscreen mode

Argon2id por defecto (recomendación OWASP). Sin elegir esquemas, sin configurar deprecated="auto". Lo que querés con buenos defaults.

JWT

Python:

from jose import jwt, JWTError
from datetime import datetime, timezone, timedelta

exp = datetime.now(tz=timezone.utc) + timedelta(hours=24)
token = jwt.encode({"sub": "ada", "exp": exp}, SECRET, algorithm="HS256")

try:
    claims = jwt.decode(token, SECRET, algorithms=["HS256"])
except JWTError as e:
    raise HTTPException(401, f"invalid token: {e}")
Enter fullscreen mode Exit fullscreen mode

Fitz:

let exp = DateTime.now().timestamp() + 86400
let token = jwt.encode({ "sub": "ada", "exp": "{exp}" }, SECRET)

match jwt.decode(token, SECRET) {
    Ok(claims) => process(claims),
    Err(e) => return 401 { "error": "{e}" },
}
Enter fullscreen mode Exit fullscreen mode

HS256 por default. HS384/HS512 disponibles con kwarg. Excepción → Result::Err automático, manejado con ? o match.

RBAC custom apilable

Acá es donde Fitz claramente diferencia. En FastAPI, RBAC más allá de "admin sí / no" significa escribir un helper:

def require_role(*roles):
    async def checker(user: User = Depends(get_current_user)) -> User:
        if user.role not in roles:
            raise HTTPException(403, "forbidden")
        return user
    return checker

@app.post("/articles")
async def create_article(
    body: ArticleIn,
    user: User = Depends(require_role("editor", "publisher")),
):
    ...
Enter fullscreen mode Exit fullscreen mode

En Fitz:

@requires("editor") @requires("publisher")
@post("/articles")
async fn create_article(db: DbConn, body: ArticleIn, user: User) -> Article {
    ...
}
Enter fullscreen mode Exit fullscreen mode

Multi-decorator es OR (el user pertenece a uno u otro role). El checker valida estáticamente:

  • Que User tenga campo role: Str no nullable.
  • Que no haya duplicados (@requires("editor") @requires("editor") → error compile-time).
  • Que el handler tenga un @auth_provider activo en el archivo.
  • Que el último param del handler que matchee con el type del provider sea el inyectado.

Errores que en Python aparecen en runtime cuando un user le pega al endpoint, en Fitz aparecen en fitz check antes de commitear.

Token blacklist persistente

Esta es la pieza que muchos proyectos en FastAPI nunca terminan de hacer bien — porque significa sumar Redis aunque tu app no la necesite para nada más, escribir el client async, calcular TTLs a mano, decidir si la falla de Redis tira el request o lo deja pasar...

async def blacklist_token(jti: str, exp: int):
    ttl = exp - int(time.time())
    if ttl > 0:
        await redis_client.set(f"blacklist:{jti}", "1", ex=ttl)

async def is_blacklisted(jti: str) -> bool:
    return await redis_client.exists(f"blacklist:{jti}") > 0
Enter fullscreen mode Exit fullscreen mode

Plus mantener Redis vivo, montar el client, manejar reconexiones.

En Fitz, usás el Postgres que ya tenés:

@authenticated
@post("/auth/logout")
async fn logout(db: DbConn, user: User) -> Map<Str, Bool> {
    auth.blacklist(db, user.jti, user.exp).await?
    return { "ok": true }
}

@cron("0 0 3 * * *")  // 3 AM diario
async fn cleanup_blacklist(db: DbConn) {
    auth.cleanup_expired(db).await
}
Enter fullscreen mode Exit fullscreen mode

Tabla fitz_token_blacklist(jti TEXT PRIMARY KEY, expires_at BIGINT NOT NULL) se auto-crea con CREATE TABLE IF NOT EXISTS al primer call. Auto-filtro expires_at > now() cuando checkea (tokens vencidos no necesitan seguir bloqueando — jwt.decode los rechaza primero por exp).

El patrón /auth/logout y /auth/refresh no se auto-monta (intencional — el flow exacto varía por proyecto), pero los tres builtins (auth.blacklist/auth.is_blacklisted/auth.cleanup_expired) te lo dan hecho en ~10 líneas de tu código.

OpenAPI con bearerAuth automático

En FastAPI tenés que declararlo:

app = FastAPI(
    swagger_ui_init_oauth={...},
)
# y la security scheme en cada endpoint manualmente o con dependencies
Enter fullscreen mode Exit fullscreen mode

En Fitz, cuando hay @auth_provider en el programa, el schema OpenAPI se emite con:

{
  "components": {
    "securitySchemes": {
      "bearerAuth": {
        "type": "http",
        "scheme": "bearer",
        "bearerFormat": "JWT"
      }
    }
  },
  "paths": {
    "/me": {
      "get": {
        "security": [{"bearerAuth": []}],
        "responses": {
          "200": {...},
          "401": {"description": "auth"},
        }
      }
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

La UI de Scalar en /docs te muestra el botón "Authorize" funcional. Auto-mounted. Sin configurar.

El checker hace gran parte del trabajo

Antes de levantar el server, fitz check valida:

  • @authenticated sin @auth_provider → "se usa @authenticated pero no hay @auth_provider declarado".
  • @admin sobre handler cuyo User no tiene role: Str no nullable → "para usar @admin el type User debe tener campo role: Str".
  • @requires("X") con typo → no chequea valores (el role es un Str arbitrario), pero detecta apilados duplicados.
  • JWT decode return typeResult<Map<Str, Str>>, exige manejo de Err.
  • El param inyectado por @auth_provider → su tipo debe matchear el del User retornado por el provider.

Errores que en FastAPI/Flask típicamente se descubren cuando el request llega a producción, en Fitz aparecen en CI.

Decisiones de diseño que vale la pena entender

auth.blacklist exige DbConn explícita

En lugar de un singleton global, los 3 builtins (auth.blacklist/is_blacklisted/cleanup_expired) toman DbConn como primer arg. Razón: hace explícito el costo en cada call site, y testeable contra una DB de test.

Mensaje de 403 enriquecido para @requires

Cuando el role no matchea:

{
  "error": "forbidden",
  "current_role": "viewer",
  "required_roles": ["editor", "publisher"]
}
Enter fullscreen mode Exit fullscreen mode

Habilita debugging serio y observability (los logs estructurados con log.info("auth.denied", { user_id, role, required }) te dicen exactamente qué falló).

@requires implica auth

No necesitás apilar @authenticated @requires("editor") — el @requires ya corre el provider. Apilarlo es no-op (y el checker lo flagea).

Lo que Fitz NO te da (todavía)

  • OAuth2 social login (Google/GitHub/etc.) — el flow OAuth contra el provider lo hacés a mano contra los endpoints conocidos.
  • Multi-role (user.roles: List<Str>) — hoy es single role. Roles compuestos: deuda residual de 9.w.1.iter2.
  • Asymmetric JWT (RS256/ES256 con PEM y rotación) — solo HS256/384/512.
  • Role hierarchy (admin > editor > viewer) — no se modela. Workaround: si admin debe poder hacer lo que editor, @requires("admin") @requires("editor") (apilado).
  • Session cookie-based alternativo a JWT.
  • /auth/refresh y /auth/logout auto-mounted — el patrón canónico (~10 LoC con los builtins) está en el cap 28 de la guía pero queda manual. Auto-mount con @server(auto_auth_endpoints=true) es deuda visible.

Cierre

Lo que más me cansa de hacer auth en Python no es escribir el primer endpoint — es mantener consistencia entre tres lugares: el Depends(...) de FastAPI, la lib de JWT, y el RBAC casero. Cuando agregás @requires("editor") y te olvidaste de actualizar el helper del RBAC, no te enterás hasta que un user te reporta un bug.

En Fitz el checker estático cierra ese loop. El @auth_provider es la fuente de verdad del tipo User. @authenticated/@admin/@requires consumen ese tipo. La OpenAPI lo refleja. La blacklist usa la misma DB. Cero deps externas para auth.

Si tu proyecto está en uno de los casos del MVP — JWT, roles simples, blacklist en Postgres — Fitz te ahorra el día de pegar librerías. Si necesitás OAuth social y multi-role, todavía es Python (o tomalo como invitación a abrir un issue).


Próximo post de la serie: "Tracing distribuido, métricas Prometheus y logs estructurados con dos decoradores: @trace/@metric en Fitz vs el setup de OpenTelemetry en FastAPI" — observability lado a lado.

Repo: github.com/Thegreekman76/fitz
Capítulo 28 de la guía (Auth nativa): docs/guide.md

Top comments (0)