DEV Community

Cover image for JWT auth, RBAC, and token blacklist without gluing 5 libraries: Fitz vs FastAPI + python-jose + passlib + Redis + home-grown RBAC
Martin Palopoli
Martin Palopoli

Posted on

JWT auth, RBAC, and token blacklist without gluing 5 libraries: Fitz vs FastAPI + python-jose + passlib + Redis + home-grown RBAC

The full auth flow — login, Argon2id hashing, JWT sign and verify, stackable custom RBAC, logout with persistent blacklist — side by side in FastAPI and in Fitz. Zero new libraries, everything built into the language.

Real auth needs more pieces than you'd think

You start with "I need login with JWT". Then comes:

  • Password hashing with a good algorithm (Argon2id, not bcrypt for new projects).
  • RBAC because "logged in / not logged in" isn't enough — there are roles, there are admin endpoints.
  • Logout that actually invalidates the token (not just "forget about it on the client side").
  • Refresh tokens so users don't have to re-log every hour.
  • Cleanup of the blacklist so you don't fill Redis with expired tokens.

In FastAPI, each of those is a separate library plus glue code. In Fitz, they're part of the language.

The typical Python stack

pip install python-jose[cryptography] passlib[argon2] argon2-cffi redis python-multipart
Enter fullscreen mode Exit fullscreen mode

auth.py:

from datetime import datetime, timedelta, timezone
from jose import jwt, JWTError
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
import redis.asyncio as redis
import uuid
import time

SECRET_KEY = os.environ["JWT_SECRET"]
ALGORITHM = "HS256"

pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
redis_client = redis.from_url(os.environ["REDIS_URL"])

def hash_password(password: str) -> str:
    return pwd_context.hash(password)

def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)

def create_access_token(data: dict, expires_delta: timedelta = timedelta(hours=1)):
    to_encode = data.copy()
    jti = str(uuid.uuid4())
    exp = datetime.now(tz=timezone.utc) + expires_delta
    to_encode.update({"jti": jti, "exp": exp})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

async def is_blacklisted(jti: str) -> bool:
    return await redis_client.exists(f"blacklist:{jti}") > 0

async def blacklist_token(jti: str, exp: int):
    ttl = exp - int(time.time())
    if ttl > 0:
        await redis_client.set(f"blacklist:{jti}", "1", ex=ttl)

async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(status_code=401, detail="invalid token")
    jti = payload.get("jti")
    if jti and await is_blacklisted(jti):
        raise HTTPException(status_code=401, detail="token revoked")
    user = await db.fetch_user(payload["sub"])
    if not user:
        raise HTTPException(status_code=401, detail="user not found")
    return user

def require_role(*roles):
    async def checker(user: User = Depends(get_current_user)) -> User:
        if user.role not in roles:
            raise HTTPException(status_code=403, detail="forbidden")
        return user
    return checker
Enter fullscreen mode Exit fullscreen mode

endpoints.py:

@app.post("/login")
async def login(creds: Credentials):
    user = await db.fetch_user_by_email(creds.email)
    if not user or not verify_password(creds.password, user.password_hash):
        raise HTTPException(status_code=401, detail="invalid credentials")
    token = create_access_token({"sub": user.email, "role": user.role})
    return {"token": token}

@app.get("/me")
async def me(user: User = Depends(get_current_user)) -> User:
    return user

@app.get("/admin/users")
async def admin_list(user: User = Depends(require_role("admin"))) -> list[User]:
    return await db.fetch_all_users()

@app.post("/articles")
async def create_article(
    body: ArticleIn,
    user: User = Depends(require_role("editor", "publisher")),
) -> Article:
    return await db.create_article(body, user.id)

@app.post("/auth/logout")
async def logout(user_and_jti: tuple[User, str] = Depends(get_user_and_jti)):
    user, jti = user_and_jti
    payload = jwt.decode(user.token, SECRET_KEY, algorithms=[ALGORITHM])
    await blacklist_token(jti, payload["exp"])
    return {"ok": True}
Enter fullscreen mode Exit fullscreen mode

There it is. Five libraries, ~100 lines of boilerplate before you write any business logic, an extra Redis connection, and the responsibility to keep it in sync.

The same thing in Fitz

type User { id: Int, email: Str, name: Str, role: Str, password_hash: Str }
type Credentials { email: Str, password: Str }
type LoginResponse { token: Str }
type Article { id: Int, title: Str, body: Str, user_id: Int }
type ArticleIn { title: Str, body: Str }

let SECRET = secret("JWT_SECRET")

@auth_provider
async fn check_token(headers: Map<Str, Str>, db: DbConn) -> Result<User> {
    let raw = match headers.get("authorization") {
        Ok(v) => v,
        Err(_) => return Err("missing Authorization header"),
    }
    let parts = raw.split(" ")
    if (parts.len() != 2 or parts[0] != "Bearer") {
        return Err("expected 'Bearer <token>'")
    }
    let claims = jwt.decode(parts[1], SECRET.expose())?
    let jti = match claims.get("jti") {
        Ok(v) => v,
        Err(_) => return Err("token missing jti"),
    }
    if (auth.is_blacklisted(db, jti).await?) {
        return Err("token revoked")
    }
    return User.where(fn(u) => u.email == claims["sub"]).first(db).await
}

@post("/login")
async fn login(db: DbConn, creds: Credentials) -> LoginResponse {
    let user: User = match User.where(fn(u) => u.email == creds.email).first(db).await {
        Ok(u) => u,
        Err(_) => return 401 { "error": "invalid credentials" },
    }
    if (not hash.verify(creds.password, user.password_hash)) {
        return 401 { "error": "invalid credentials" }
    }
    let exp = DateTime.now().timestamp() + 86400  // 24h validity
    // The JWT payload is `Map<Str, Str>` in the `fitz build` MVP — numerics
    // serialize to string. In `fitz run` heterogeneity passes, but we use
    // the strict shape to preserve bit-for-bit parity.
    let claims = {
        "sub": user.email,
        "role": user.role,
        "jti": Uuid.v4().to_str(),
        "exp": "{exp}",
    }
    return LoginResponse { token: jwt.encode(claims, SECRET.expose()) }
}

@authenticated @get("/me")
fn me(user: User) -> User => user

@admin @get("/admin/users")
async fn admin_list(db: DbConn, user: User) -> List<User> {
    return User.all(db).await
}

@requires("editor") @requires("publisher")
@post("/articles")
async fn create_article(db: DbConn, body: ArticleIn, user: User) -> Article {
    return Article.insert(db, Article { id: 0, title: body.title, body: body.body, user_id: user.id }).await?
}

@authenticated @post("/auth/logout")
async fn logout(db: DbConn, user: User, headers: Map<Str, Str>) -> Map<Str, Bool> {
    // We re-decode the token just to pull jti from the claims.
    let raw = headers["authorization"]
    let token = raw.split(" ")[1]
    let claims = jwt.decode(token, SECRET.expose())?
    let jti = claims.get("jti")?
    // Fresh 24h TTL. When the JWT expires, `jwt.decode` rejects it as
    // expired; the blacklist only prevents reuse of the jti before exp.
    // We don't read exp from the payload (it would come back as Str and
    // `auth.blacklist` requires Int).
    let ttl = DateTime.now().timestamp() + 86400
    auth.blacklist(db, jti, ttl).await?
    return { "ok": true }
}

@cron("0 0 3 * * *")
async fn cleanup_blacklist(db: DbConn) {
    auth.cleanup_expired(db).await
}
Enter fullscreen mode Exit fullscreen mode

Zero pip install. Zero separate auth.py file. The compiler validates every decorator statically.

The raw table

Piece Python (FastAPI + 5 libs) Fitz
Password hashing passlib[argon2] + CryptContext(schemes=["argon2"]) hash.password(...) built-in
Verify password pwd_context.verify(...) hash.verify(...)
Sign JWT jose.jwt.encode(..., SECRET, algorithm="HS256") jwt.encode(claims, SECRET)
Decode JWT jose.jwt.decode(...) + try/except jwt.decode(token, SECRET)?
Auth scheme OAuth2PasswordBearer + Depends(...) @auth_provider fn check_token(...)
Protected endpoint Depends(get_current_user) on each one @authenticated
Admin only Home-grown RBAC with require_role("admin") @admin
Custom RBAC require_role(*roles) helper + Depends @requires("editor") stackable
Token blacklist Redis + manual logic + manually computed TTL auth.blacklist(db, jti, exp)
Expired cleanup Redis SET TTL (auto) or manual if DB auth.cleanup_expired(db) + @cron
Static validation None — runtime error when a request hits @authenticated without @auth_provider → compile error
OpenAPI bearerAuth schema Declare in app = FastAPI(...) Automatic when @auth_provider is present

Piece by piece

Password hashing

Python:

from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")

hashed = pwd_context.hash("password123")
ok = pwd_context.verify("password123", hashed)
Enter fullscreen mode Exit fullscreen mode

Fitz:

let hashed = hash.password("password123")
let ok = hash.verify("password123", hashed)
Enter fullscreen mode Exit fullscreen mode

Argon2id by default (OWASP recommendation). No picking schemes, no configuring deprecated="auto". What you want with good defaults.

JWT

Python:

from jose import jwt, JWTError
from datetime import datetime, timezone, timedelta

exp = datetime.now(tz=timezone.utc) + timedelta(hours=24)
token = jwt.encode({"sub": "ada", "exp": exp}, SECRET, algorithm="HS256")

try:
    claims = jwt.decode(token, SECRET, algorithms=["HS256"])
except JWTError as e:
    raise HTTPException(401, f"invalid token: {e}")
Enter fullscreen mode Exit fullscreen mode

Fitz:

let exp = DateTime.now().timestamp() + 86400
let token = jwt.encode({ "sub": "ada", "exp": "{exp}" }, SECRET)

match jwt.decode(token, SECRET) {
    Ok(claims) => process(claims),
    Err(e) => return 401 { "error": "{e}" },
}
Enter fullscreen mode Exit fullscreen mode

HS256 by default. HS384/HS512 available via kwarg. Exception → Result::Err automatic, handled with ? or match.

Stackable custom RBAC

This is where Fitz clearly stands apart. In FastAPI, RBAC beyond "admin yes / no" means writing a helper:

def require_role(*roles):
    async def checker(user: User = Depends(get_current_user)) -> User:
        if user.role not in roles:
            raise HTTPException(403, "forbidden")
        return user
    return checker

@app.post("/articles")
async def create_article(
    body: ArticleIn,
    user: User = Depends(require_role("editor", "publisher")),
):
    ...
Enter fullscreen mode Exit fullscreen mode

In Fitz:

@requires("editor") @requires("publisher")
@post("/articles")
async fn create_article(db: DbConn, body: ArticleIn, user: User) -> Article {
    ...
}
Enter fullscreen mode Exit fullscreen mode

Multi-decorator is OR (the user belongs to one or the other role). The checker validates statically:

  • That User has a non-nullable role: Str field.
  • That there are no duplicates (@requires("editor") @requires("editor") → compile-time error).
  • That the handler has an active @auth_provider in the file.
  • That the handler's last param matching the provider's return type is the injected one.

Errors that in Python appear at runtime when a user hits the endpoint, in Fitz appear in fitz check before you commit.

Persistent token blacklist

This is the piece many FastAPI projects never finish properly — because it means adding Redis even though your app doesn't need it for anything else, writing the async client, computing TTLs by hand, deciding whether a Redis failure should fail the request or let it through...

async def blacklist_token(jti: str, exp: int):
    ttl = exp - int(time.time())
    if ttl > 0:
        await redis_client.set(f"blacklist:{jti}", "1", ex=ttl)

async def is_blacklisted(jti: str) -> bool:
    return await redis_client.exists(f"blacklist:{jti}") > 0
Enter fullscreen mode Exit fullscreen mode

Plus keeping Redis alive, mounting the client, handling reconnects.

In Fitz, you use the Postgres you already have:

@authenticated
@post("/auth/logout")
async fn logout(db: DbConn, user: User) -> Map<Str, Bool> {
    auth.blacklist(db, user.jti, user.exp).await?
    return { "ok": true }
}

@cron("0 0 3 * * *")  // 3 AM daily
async fn cleanup_blacklist(db: DbConn) {
    auth.cleanup_expired(db).await
}
Enter fullscreen mode Exit fullscreen mode

The fitz_token_blacklist(jti TEXT PRIMARY KEY, expires_at BIGINT NOT NULL) table auto-creates with CREATE TABLE IF NOT EXISTS on the first call. Auto-filters expires_at > now() on check (expired tokens don't need to keep blocking — jwt.decode rejects them first via exp).

The /auth/logout and /auth/refresh patterns are not auto-mounted (intentional — the exact flow varies by project), but the three builtins (auth.blacklist/auth.is_blacklisted/auth.cleanup_expired) give you the work in ~10 lines of your code.

Automatic bearerAuth in OpenAPI

In FastAPI you have to declare it:

app = FastAPI(
    swagger_ui_init_oauth={...},
)
# and the security scheme on each endpoint manually or via dependencies
Enter fullscreen mode Exit fullscreen mode

In Fitz, when the program has @auth_provider, the OpenAPI schema emits:

{
  "components": {
    "securitySchemes": {
      "bearerAuth": {
        "type": "http",
        "scheme": "bearer",
        "bearerFormat": "JWT"
      }
    }
  },
  "paths": {
    "/me": {
      "get": {
        "security": [{"bearerAuth": []}],
        "responses": {
          "200": {...},
          "401": {"description": "auth"}
        }
      }
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

The Scalar UI at /docs shows you a working "Authorize" button. Auto-mounted. No config.

The checker does much of the work

Before bringing up the server, fitz check validates:

  • @authenticated without @auth_provider → "uses @authenticated but no @auth_provider declared".
  • @admin over a handler whose User has no role: Str non-nullable → "to use @admin the User type must have a role: Str field".
  • @requires("X") with a typo → doesn't check values (role is an arbitrary Str), but detects duplicate stacks.
  • JWT decode return typeResult<Map<Str, Str>>, requires handling Err.
  • The param injected by @auth_provider → its type must match the User returned by the provider.

Errors that in FastAPI/Flask typically get discovered when the request lands in production, in Fitz show up in CI.

Design decisions worth understanding

auth.blacklist requires an explicit DbConn

Instead of a global singleton, the 3 builtins (auth.blacklist/is_blacklisted/cleanup_expired) take DbConn as the first arg. Reason: it makes the cost explicit at each call site, and testable against a test DB.

Enriched 403 message for @requires

When the role doesn't match:

{
  "error": "forbidden",
  "current_role": "viewer",
  "required_roles": ["editor", "publisher"]
}
Enter fullscreen mode Exit fullscreen mode

Enables serious debugging and observability (structured logs with log.info("auth.denied", { user_id, role, required }) tell you exactly what failed).

@requires implies auth

You don't need to stack @authenticated @requires("editor")@requires already runs the provider. Stacking it is a no-op (and the checker flags it).

What Fitz does NOT give you (yet)

  • OAuth2 social login (Google/GitHub/etc.) — the OAuth flow against the provider you do by hand against the known endpoints.
  • Multi-role (user.roles: List<Str>) — today it's single role. Composite roles: residual debt of 9.w.1.iter2.
  • Asymmetric JWT (RS256/ES256 with PEM and rotation) — only HS256/384/512.
  • Role hierarchy (admin > editor > viewer) — not modeled. Workaround: if admin should also be able to do what editor does, @requires("admin") @requires("editor") (stacked).
  • Cookie-based sessions as an alternative to JWT.
  • /auth/refresh and /auth/logout auto-mounted — the canonical pattern (~10 LoC with the builtins) is in chapter 28 of the guide but stays manual. Auto-mount with @server(auto_auth_endpoints=true) is visible debt.

Closing

What tires me most about doing auth in Python isn't writing the first endpoint — it's keeping consistency across three places: FastAPI's Depends(...), the JWT library, and the home-grown RBAC. When you add @requires("editor") and you forgot to update the RBAC helper, you don't find out until a user reports a bug.

In Fitz the static checker closes that loop. The @auth_provider is the source of truth for the User type. @authenticated/@admin/@requires consume that type. OpenAPI reflects it. The blacklist uses the same DB. Zero external deps for auth.

If your project is in one of the MVP cases — JWT, simple roles, blacklist in Postgres — Fitz saves you the day of gluing libraries together. If you need OAuth social and multi-role, it's still Python (or take it as an invitation to open an issue).


Next post in the series: "Distributed tracing, Prometheus metrics, and structured logs with two decorators: @trace/@metric in Fitz vs the OpenTelemetry setup in FastAPI" — observability side by side.

Repo: github.com/Thegreekman76/fitz
Chapter 28 of the guide (Native auth): docs/guide.md

Top comments (0)