El flow completo de auth — login, hashing Argon2id, JWT firma y verificación, RBAC custom apilable, logout con blacklist persistente — lado a lado en FastAPI y en Fitz. Cero librerías nuevas, todo built-in al lenguaje.
La auth de verdad necesita más cosas de las que uno cree
Empezás con "necesito login con JWT". Después aparece:
- Hashing de passwords con un algoritmo bueno (Argon2id, no bcrypt para proyectos nuevos).
- RBAC porque no alcanza con "logueado/no logueado" — hay roles, hay endpoints de admin.
- Logout que de verdad invalide el token (no solo "olvidate de él del lado del cliente").
- Refresh tokens para no obligar al usuario a re-loguearse cada hora.
- Cleanup de la blacklist para no llenar Redis con tokens vencidos.
En FastAPI, cada una de esas cosas es una librería distinta más código pegamento. En Fitz, son parte del lenguaje.
El stack típico de Python
pip install python-jose[cryptography] passlib[argon2] argon2-cffi redis python-multipart
auth.py:
from datetime import datetime, timedelta, timezone
from jose import jwt, JWTError
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
import redis.asyncio as redis
import uuid
import time
SECRET_KEY = os.environ["JWT_SECRET"]
ALGORITHM = "HS256"
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
redis_client = redis.from_url(os.environ["REDIS_URL"])
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
def create_access_token(data: dict, expires_delta: timedelta = timedelta(hours=1)):
to_encode = data.copy()
jti = str(uuid.uuid4())
exp = datetime.now(tz=timezone.utc) + expires_delta
to_encode.update({"jti": jti, "exp": exp})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def is_blacklisted(jti: str) -> bool:
return await redis_client.exists(f"blacklist:{jti}") > 0
async def blacklist_token(jti: str, exp: int):
ttl = exp - int(time.time())
if ttl > 0:
await redis_client.set(f"blacklist:{jti}", "1", ex=ttl)
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
except JWTError:
raise HTTPException(status_code=401, detail="invalid token")
jti = payload.get("jti")
if jti and await is_blacklisted(jti):
raise HTTPException(status_code=401, detail="token revoked")
user = await db.fetch_user(payload["sub"])
if not user:
raise HTTPException(status_code=401, detail="user not found")
return user
def require_role(*roles):
async def checker(user: User = Depends(get_current_user)) -> User:
if user.role not in roles:
raise HTTPException(status_code=403, detail="forbidden")
return user
return checker
endpoints.py:
@app.post("/login")
async def login(creds: Credentials):
user = await db.fetch_user_by_email(creds.email)
if not user or not verify_password(creds.password, user.password_hash):
raise HTTPException(status_code=401, detail="invalid credentials")
token = create_access_token({"sub": user.email, "role": user.role})
return {"token": token}
@app.get("/me")
async def me(user: User = Depends(get_current_user)) -> User:
return user
@app.get("/admin/users")
async def admin_list(user: User = Depends(require_role("admin"))) -> list[User]:
return await db.fetch_all_users()
@app.post("/articles")
async def create_article(
body: ArticleIn,
user: User = Depends(require_role("editor", "publisher")),
) -> Article:
return await db.create_article(body, user.id)
@app.post("/auth/logout")
async def logout(user_and_jti: tuple[User, str] = Depends(get_user_and_jti)):
user, jti = user_and_jti
payload = jwt.decode(user.token, SECRET_KEY, algorithms=[ALGORITHM])
await blacklist_token(jti, payload["exp"])
return {"ok": True}
Ahí va. Cinco librerías, ~100 líneas de boilerplate antes de escribir lógica de negocio, una conexión Redis adicional, y la responsabilidad de mantenerlo sincronizado.
Lo mismo en Fitz
type User { id: Int, email: Str, name: Str, role: Str, password_hash: Str }
type Credentials { email: Str, password: Str }
type LoginResponse { token: Str }
type Article { id: Int, title: Str, body: Str, user_id: Int }
type ArticleIn { title: Str, body: Str }
let SECRET = secret("JWT_SECRET")
@auth_provider
async fn check_token(headers: Map<Str, Str>, db: DbConn) -> Result<User> {
let raw = match headers.get("authorization") {
Ok(v) => v,
Err(_) => return Err("falta header Authorization"),
}
let parts = raw.split(" ")
if (parts.len() != 2 or parts[0] != "Bearer") {
return Err("se esperaba 'Bearer <token>'")
}
let claims = jwt.decode(parts[1], SECRET.expose())?
let jti = match claims.get("jti") {
Ok(v) => v,
Err(_) => return Err("token sin jti"),
}
if (auth.is_blacklisted(db, jti).await?) {
return Err("token revocado")
}
return User.where(fn(u) => u.email == claims["sub"]).first(db).await
}
@post("/login")
async fn login(db: DbConn, creds: Credentials) -> LoginResponse {
let user: User = match User.where(fn(u) => u.email == creds.email).first(db).await {
Ok(u) => u,
Err(_) => return 401 { "error": "credenciales inválidas" },
}
if (not hash.verify(creds.password, user.password_hash)) {
return 401 { "error": "credenciales inválidas" }
}
let exp = DateTime.now().timestamp() + 86400 // 24 h de validez
// El payload del JWT es `Map<Str, Str>` en `fitz build` MVP — los
// numéricos se serializan a string. En `fitz run` la heterogeneidad
// pasa, pero usamos el shape estricto para mantener paridad bit-a-bit.
let claims = {
"sub": user.email,
"role": user.role,
"jti": Uuid.v4().to_str(),
"exp": "{exp}",
}
return LoginResponse { token: jwt.encode(claims, SECRET.expose()) }
}
@authenticated @get("/me")
fn me(user: User) -> User => user
@admin @get("/admin/users")
async fn admin_list(db: DbConn, user: User) -> List<User> {
return User.all(db).await
}
@requires("editor") @requires("publisher")
@post("/articles")
async fn create_article(db: DbConn, body: ArticleIn, user: User) -> Article {
return Article.insert(db, Article { id: 0, title: body.title, body: body.body, user_id: user.id }).await?
}
@authenticated @post("/auth/logout")
async fn logout(db: DbConn, user: User, headers: Map<Str, Str>) -> Map<Str, Bool> {
// Re-decodificamos el token sólo para sacar jti del claims.
let raw = headers["authorization"]
let token = raw.split(" ")[1]
let claims = jwt.decode(token, SECRET.expose())?
let jti = claims.get("jti")?
// TTL fresca de 24h. Cuando el JWT expira, `jwt.decode` ya lo rechaza
// por expirado; la blacklist solo previene reuse del jti antes de exp.
// No leemos exp del payload (vendría como Str y `auth.blacklist` exige Int).
let ttl = DateTime.now().timestamp() + 86400
auth.blacklist(db, jti, ttl).await?
return { "ok": true }
}
@cron("0 0 3 * * *")
async fn cleanup_blacklist(db: DbConn) {
auth.cleanup_expired(db).await
}
Cero pip install. Cero archivo auth.py aparte. El compilador valida cada decorador estáticamente.
La tabla cruda
| Pieza | Python (FastAPI + 5 libs) | Fitz |
|---|---|---|
| Hash de passwords |
passlib[argon2] + CryptContext(schemes=["argon2"])
|
hash.password(...) built-in |
| Verificar password | pwd_context.verify(...) |
hash.verify(...) |
| Firmar JWT | jose.jwt.encode(..., SECRET, algorithm="HS256") |
jwt.encode(claims, SECRET) |
| Decodificar JWT |
jose.jwt.decode(...) + try/except |
jwt.decode(token, SECRET)? |
| Auth scheme |
OAuth2PasswordBearer + Depends(...)
|
@auth_provider fn check_token(...) |
| Endpoint protegido |
Depends(get_current_user) en cada uno |
@authenticated |
| Admin only | RBAC casero con require_role("admin")
|
@admin |
| RBAC custom | Helper require_role(*roles) + Depends |
@requires("editor") apilable |
| Token blacklist | Redis + lógica manual + TTL calculado a mano | auth.blacklist(db, jti, exp) |
| Cleanup vencidos | Redis SET TTL (auto) o tú manual si DB |
auth.cleanup_expired(db) + @cron
|
| Validación estática | Ninguna — error en runtime cuando llega req |
@authenticated sin @auth_provider → error de compilación |
Schema OpenAPI con bearerAuth
|
Tenés que declararlo en app = FastAPI(...)
|
Automático cuando hay @auth_provider
|
Por partes
Hash de passwords
Python:
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")
hashed = pwd_context.hash("password123")
ok = pwd_context.verify("password123", hashed)
Fitz:
let hashed = hash.password("password123")
let ok = hash.verify("password123", hashed)
Argon2id por defecto (recomendación OWASP). Sin elegir esquemas, sin configurar deprecated="auto". Lo que querés con buenos defaults.
JWT
Python:
from jose import jwt, JWTError
from datetime import datetime, timezone, timedelta
exp = datetime.now(tz=timezone.utc) + timedelta(hours=24)
token = jwt.encode({"sub": "ada", "exp": exp}, SECRET, algorithm="HS256")
try:
claims = jwt.decode(token, SECRET, algorithms=["HS256"])
except JWTError as e:
raise HTTPException(401, f"invalid token: {e}")
Fitz:
let exp = DateTime.now().timestamp() + 86400
let token = jwt.encode({ "sub": "ada", "exp": "{exp}" }, SECRET)
match jwt.decode(token, SECRET) {
Ok(claims) => process(claims),
Err(e) => return 401 { "error": "{e}" },
}
HS256 por default. HS384/HS512 disponibles con kwarg. Excepción → Result::Err automático, manejado con ? o match.
RBAC custom apilable
Acá es donde Fitz claramente diferencia. En FastAPI, RBAC más allá de "admin sí / no" significa escribir un helper:
def require_role(*roles):
async def checker(user: User = Depends(get_current_user)) -> User:
if user.role not in roles:
raise HTTPException(403, "forbidden")
return user
return checker
@app.post("/articles")
async def create_article(
body: ArticleIn,
user: User = Depends(require_role("editor", "publisher")),
):
...
En Fitz:
@requires("editor") @requires("publisher")
@post("/articles")
async fn create_article(db: DbConn, body: ArticleIn, user: User) -> Article {
...
}
Multi-decorator es OR (el user pertenece a uno u otro role). El checker valida estáticamente:
- Que
Usertenga camporole: Strno nullable. - Que no haya duplicados (
@requires("editor") @requires("editor")→ error compile-time). - Que el handler tenga un
@auth_provideractivo en el archivo. - Que el último param del handler que matchee con el type del provider sea el inyectado.
Errores que en Python aparecen en runtime cuando un user le pega al endpoint, en Fitz aparecen en fitz check antes de commitear.
Token blacklist persistente
Esta es la pieza que muchos proyectos en FastAPI nunca terminan de hacer bien — porque significa sumar Redis aunque tu app no la necesite para nada más, escribir el client async, calcular TTLs a mano, decidir si la falla de Redis tira el request o lo deja pasar...
async def blacklist_token(jti: str, exp: int):
ttl = exp - int(time.time())
if ttl > 0:
await redis_client.set(f"blacklist:{jti}", "1", ex=ttl)
async def is_blacklisted(jti: str) -> bool:
return await redis_client.exists(f"blacklist:{jti}") > 0
Plus mantener Redis vivo, montar el client, manejar reconexiones.
En Fitz, usás el Postgres que ya tenés:
@authenticated
@post("/auth/logout")
async fn logout(db: DbConn, user: User) -> Map<Str, Bool> {
auth.blacklist(db, user.jti, user.exp).await?
return { "ok": true }
}
@cron("0 0 3 * * *") // 3 AM diario
async fn cleanup_blacklist(db: DbConn) {
auth.cleanup_expired(db).await
}
Tabla fitz_token_blacklist(jti TEXT PRIMARY KEY, expires_at BIGINT NOT NULL) se auto-crea con CREATE TABLE IF NOT EXISTS al primer call. Auto-filtro expires_at > now() cuando checkea (tokens vencidos no necesitan seguir bloqueando — jwt.decode los rechaza primero por exp).
El patrón /auth/logout y /auth/refresh no se auto-monta (intencional — el flow exacto varía por proyecto), pero los tres builtins (auth.blacklist/auth.is_blacklisted/auth.cleanup_expired) te lo dan hecho en ~10 líneas de tu código.
OpenAPI con bearerAuth automático
En FastAPI tenés que declararlo:
app = FastAPI(
swagger_ui_init_oauth={...},
)
# y la security scheme en cada endpoint manualmente o con dependencies
En Fitz, cuando hay @auth_provider en el programa, el schema OpenAPI se emite con:
{
"components": {
"securitySchemes": {
"bearerAuth": {
"type": "http",
"scheme": "bearer",
"bearerFormat": "JWT"
}
}
},
"paths": {
"/me": {
"get": {
"security": [{"bearerAuth": []}],
"responses": {
"200": {...},
"401": {"description": "auth"},
}
}
}
}
}
La UI de Scalar en /docs te muestra el botón "Authorize" funcional. Auto-mounted. Sin configurar.
El checker hace gran parte del trabajo
Antes de levantar el server, fitz check valida:
-
@authenticatedsin@auth_provider→ "se usa @authenticated pero no hay @auth_provider declarado". -
@adminsobre handler cuyo User no tienerole: Strno nullable → "para usar @admin el type User debe tener camporole: Str". -
@requires("X")con typo → no chequea valores (el role es un Str arbitrario), pero detecta apilados duplicados. -
JWT decode return type →
Result<Map<Str, Str>>, exige manejo de Err. -
El param inyectado por
@auth_provider→ su tipo debe matchear el del User retornado por el provider.
Errores que en FastAPI/Flask típicamente se descubren cuando el request llega a producción, en Fitz aparecen en CI.
Decisiones de diseño que vale la pena entender
auth.blacklist exige DbConn explícita
En lugar de un singleton global, los 3 builtins (auth.blacklist/is_blacklisted/cleanup_expired) toman DbConn como primer arg. Razón: hace explícito el costo en cada call site, y testeable contra una DB de test.
Mensaje de 403 enriquecido para @requires
Cuando el role no matchea:
{
"error": "forbidden",
"current_role": "viewer",
"required_roles": ["editor", "publisher"]
}
Habilita debugging serio y observability (los logs estructurados con log.info("auth.denied", { user_id, role, required }) te dicen exactamente qué falló).
@requires implica auth
No necesitás apilar @authenticated @requires("editor") — el @requires ya corre el provider. Apilarlo es no-op (y el checker lo flagea).
Lo que Fitz NO te da (todavía)
- OAuth2 social login (Google/GitHub/etc.) — el flow OAuth contra el provider lo hacés a mano contra los endpoints conocidos.
-
Multi-role (
user.roles: List<Str>) — hoy es single role. Roles compuestos: deuda residual de 9.w.1.iter2. - Asymmetric JWT (RS256/ES256 con PEM y rotación) — solo HS256/384/512.
-
Role hierarchy (admin > editor > viewer) — no se modela. Workaround: si admin debe poder hacer lo que editor,
@requires("admin") @requires("editor")(apilado). - Session cookie-based alternativo a JWT.
-
/auth/refreshy/auth/logoutauto-mounted — el patrón canónico (~10 LoC con los builtins) está en el cap 28 de la guía pero queda manual. Auto-mount con@server(auto_auth_endpoints=true)es deuda visible.
Cierre
Lo que más me cansa de hacer auth en Python no es escribir el primer endpoint — es mantener consistencia entre tres lugares: el Depends(...) de FastAPI, la lib de JWT, y el RBAC casero. Cuando agregás @requires("editor") y te olvidaste de actualizar el helper del RBAC, no te enterás hasta que un user te reporta un bug.
En Fitz el checker estático cierra ese loop. El @auth_provider es la fuente de verdad del tipo User. @authenticated/@admin/@requires consumen ese tipo. La OpenAPI lo refleja. La blacklist usa la misma DB. Cero deps externas para auth.
Si tu proyecto está en uno de los casos del MVP — JWT, roles simples, blacklist en Postgres — Fitz te ahorra el día de pegar librerías. Si necesitás OAuth social y multi-role, todavía es Python (o tomalo como invitación a abrir un issue).
Próximo post de la serie: "Tracing distribuido, métricas Prometheus y logs estructurados con dos decoradores: @trace/@metric en Fitz vs el setup de OpenTelemetry en FastAPI" — observability lado a lado.
Repo: github.com/Thegreekman76/fitz
Capítulo 28 de la guía (Auth nativa): docs/guide.md
Top comments (0)