The full auth flow — login, Argon2id hashing, JWT sign and verify, stackable custom RBAC, logout with persistent blacklist — side by side in FastAPI and in Fitz. Zero new libraries, everything built into the language.
Real auth needs more pieces than you'd think
You start with "I need login with JWT". Then comes:
- Password hashing with a good algorithm (Argon2id, not bcrypt for new projects).
- RBAC because "logged in / not logged in" isn't enough — there are roles, there are admin endpoints.
- Logout that actually invalidates the token (not just "forget about it on the client side").
- Refresh tokens so users don't have to re-log every hour.
- Cleanup of the blacklist so you don't fill Redis with expired tokens.
In FastAPI, each of those is a separate library plus glue code. In Fitz, they're part of the language.
The typical Python stack
pip install python-jose[cryptography] passlib[argon2] argon2-cffi redis python-multipart
auth.py:
import os
import time
import uuid
from datetime import datetime, timedelta, timezone

import redis.asyncio as redis
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from passlib.context import CryptContext

# User and db come from your application's own models and data layer.

SECRET_KEY = os.environ["JWT_SECRET"]
ALGORITHM = "HS256"
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
redis_client = redis.from_url(os.environ["REDIS_URL"])


def hash_password(password: str) -> str:
    return pwd_context.hash(password)


def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)


def create_access_token(data: dict, expires_delta: timedelta = timedelta(hours=1)):
    to_encode = data.copy()
    jti = str(uuid.uuid4())
    exp = datetime.now(tz=timezone.utc) + expires_delta
    to_encode.update({"jti": jti, "exp": exp})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def is_blacklisted(jti: str) -> bool:
    return await redis_client.exists(f"blacklist:{jti}") > 0


async def blacklist_token(jti: str, exp: int):
    # Keep the entry only until the token would have expired anyway.
    ttl = exp - int(time.time())
    if ttl > 0:
        await redis_client.set(f"blacklist:{jti}", "1", ex=ttl)


async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(status_code=401, detail="invalid token")
    jti = payload.get("jti")
    if jti and await is_blacklisted(jti):
        raise HTTPException(status_code=401, detail="token revoked")
    user = await db.fetch_user(payload["sub"])
    if not user:
        raise HTTPException(status_code=401, detail="user not found")
    return user


def require_role(*roles):
    async def checker(user: User = Depends(get_current_user)) -> User:
        if user.role not in roles:
            raise HTTPException(status_code=403, detail="forbidden")
        return user
    return checker
endpoints.py:
@app.post("/login")
async def login(creds: Credentials):
    user = await db.fetch_user_by_email(creds.email)
    if not user or not verify_password(creds.password, user.password_hash):
        raise HTTPException(status_code=401, detail="invalid credentials")
    token = create_access_token({"sub": user.email, "role": user.role})
    return {"token": token}


@app.get("/me")
async def me(user: User = Depends(get_current_user)) -> User:
    return user


@app.get("/admin/users")
async def admin_list(user: User = Depends(require_role("admin"))) -> list[User]:
    return await db.fetch_all_users()


@app.post("/articles")
async def create_article(
    body: ArticleIn,
    user: User = Depends(require_role("editor", "publisher")),
) -> Article:
    return await db.create_article(body, user.id)


@app.post("/auth/logout")
async def logout(
    token: str = Depends(oauth2_scheme),
    user: User = Depends(get_current_user),
):
    # get_current_user has already validated the token; decode it again
    # only to read the jti and exp claims.
    payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    await blacklist_token(payload["jti"], payload["exp"])
    return {"ok": True}
There it is. Five libraries, ~100 lines of boilerplate before you write any business logic, an extra Redis connection, and the responsibility to keep it in sync.
The same thing in Fitz
type User { id: Int, email: Str, name: Str, role: Str, password_hash: Str }
type Credentials { email: Str, password: Str }
type LoginResponse { token: Str }
type Article { id: Int, title: Str, body: Str, user_id: Int }
type ArticleIn { title: Str, body: Str }

let SECRET = secret("JWT_SECRET")

@auth_provider
async fn check_token(headers: Map<Str, Str>, db: DbConn) -> Result<User> {
    let raw = match headers.get("authorization") {
        Ok(v) => v,
        Err(_) => return Err("missing Authorization header"),
    }
    let parts = raw.split(" ")
    if (parts.len() != 2 or parts[0] != "Bearer") {
        return Err("expected 'Bearer <token>'")
    }
    let claims = jwt.decode(parts[1], SECRET.expose())?
    let jti = match claims.get("jti") {
        Ok(v) => v,
        Err(_) => return Err("token missing jti"),
    }
    if (auth.is_blacklisted(db, jti).await?) {
        return Err("token revoked")
    }
    return User.where(fn(u) => u.email == claims["sub"]).first(db).await
}

@post("/login")
async fn login(db: DbConn, creds: Credentials) -> LoginResponse {
    let user: User = match User.where(fn(u) => u.email == creds.email).first(db).await {
        Ok(u) => u,
        Err(_) => return 401 { "error": "invalid credentials" },
    }
    if (not hash.verify(creds.password, user.password_hash)) {
        return 401 { "error": "invalid credentials" }
    }
    let exp = DateTime.now().timestamp() + 86400 // 24h validity
    // The JWT payload is `Map<Str, Str>` in the `fitz build` MVP, so numerics
    // serialize to string. In `fitz run` heterogeneity passes, but we use
    // the strict shape to preserve bit-for-bit parity.
    let claims = {
        "sub": user.email,
        "role": user.role,
        "jti": Uuid.v4().to_str(),
        "exp": "{exp}",
    }
    return LoginResponse { token: jwt.encode(claims, SECRET.expose()) }
}

@authenticated @get("/me")
fn me(user: User) -> User => user

@admin @get("/admin/users")
async fn admin_list(db: DbConn, user: User) -> List<User> {
    return User.all(db).await
}

@requires("editor") @requires("publisher")
@post("/articles")
async fn create_article(db: DbConn, body: ArticleIn, user: User) -> Article {
    return Article.insert(db, Article { id: 0, title: body.title, body: body.body, user_id: user.id }).await?
}
@authenticated @post("/auth/logout")
async fn logout(db: DbConn, user: User, headers: Map<Str, Str>) -> Map<Str, Bool> {
    // We re-decode the token just to pull jti from the claims.
    let raw = headers["authorization"]
    let token = raw.split(" ")[1]
    let claims = jwt.decode(token, SECRET.expose())?
    let jti = claims.get("jti")?
    // Blacklist the jti for a fresh 24h window, passed as an absolute expiry
    // time rather than a duration. Once the JWT itself expires, `jwt.decode`
    // rejects it; the blacklist only has to block reuse of the jti before exp.
    // We don't read exp from the payload (it would come back as Str and
    // `auth.blacklist` requires Int).
    let expires_at = DateTime.now().timestamp() + 86400
    auth.blacklist(db, jti, expires_at).await?
    return { "ok": true }
}

@cron("0 0 3 * * *")
async fn cleanup_blacklist(db: DbConn) {
    auth.cleanup_expired(db).await
}
Zero pip install. Zero separate auth.py file. The compiler validates every decorator statically.
The raw table
| Piece | Python (FastAPI + 5 libs) | Fitz |
|---|---|---|
| Password hashing | passlib[argon2] + CryptContext(schemes=["argon2"]) | hash.password(...) built-in |
| Verify password | pwd_context.verify(...) | hash.verify(...) |
| Sign JWT | jose.jwt.encode(..., SECRET, algorithm="HS256") | jwt.encode(claims, SECRET) |
| Decode JWT | jose.jwt.decode(...) + try/except | jwt.decode(token, SECRET)? |
| Auth scheme | OAuth2PasswordBearer + Depends(...) | @auth_provider fn check_token(...) |
| Protected endpoint | Depends(get_current_user) on each one | @authenticated |
| Admin only | Home-grown RBAC with require_role("admin") | @admin |
| Custom RBAC | require_role(*roles) helper + Depends | @requires("editor") stackable |
| Token blacklist | Redis + manual logic + manually computed TTL | auth.blacklist(db, jti, exp) |
| Expired cleanup | Redis SET TTL (auto) or manual if DB | auth.cleanup_expired(db) + @cron |
| Static validation | None (runtime error when a request hits) | @authenticated without @auth_provider → compile error |
| OpenAPI bearerAuth schema | Declare in app = FastAPI(...) | Automatic when @auth_provider is present |
Piece by piece
Password hashing
Python:
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")
hashed = pwd_context.hash("password123")
ok = pwd_context.verify("password123", hashed)
Fitz:
let hashed = hash.password("password123")
let ok = hash.verify("password123", hashed)
Argon2id by default (OWASP recommendation). No picking schemes, no configuring deprecated="auto". You get sensible defaults without asking for them.
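To make concrete what a hash/verify pair does behind either API, here is a minimal standard-library sketch. It swaps in scrypt instead of Argon2id, purely because scrypt ships with Python's hashlib; the function names, the salt$digest storage format, and the cost parameters are assumptions for illustration, not what passlib or Fitz do internally.

```python
import base64
import hashlib
import hmac
import os

# Illustrative cost parameters (assumption); tune them for your hardware.
SCRYPT_N, SCRYPT_R, SCRYPT_P = 2**14, 8, 1


def hash_password(password: str) -> str:
    """Return 'salt$digest' (both base64), using a fresh random salt."""
    salt = os.urandom(16)
    digest = hashlib.scrypt(
        password.encode("utf-8"), salt=salt, n=SCRYPT_N, r=SCRYPT_R, p=SCRYPT_P
    )
    return base64.b64encode(salt).decode() + "$" + base64.b64encode(digest).decode()


def verify_password(password: str, stored: str) -> bool:
    """Recompute the digest with the stored salt and compare in constant time."""
    salt_b64, digest_b64 = stored.split("$", 1)
    salt = base64.b64decode(salt_b64)
    expected = base64.b64decode(digest_b64)
    candidate = hashlib.scrypt(
        password.encode("utf-8"), salt=salt, n=SCRYPT_N, r=SCRYPT_R, p=SCRYPT_P
    )
    return hmac.compare_digest(candidate, expected)
```

The two properties worth noticing are the per-password random salt (hashing the same password twice gives different strings) and the constant-time comparison.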
JWT
Python:
from jose import jwt, JWTError
from datetime import datetime, timezone, timedelta
exp = datetime.now(tz=timezone.utc) + timedelta(hours=24)
token = jwt.encode({"sub": "ada", "exp": exp}, SECRET, algorithm="HS256")
try:
    claims = jwt.decode(token, SECRET, algorithms=["HS256"])
except JWTError as e:
    raise HTTPException(401, f"invalid token: {e}")
Fitz:
let exp = DateTime.now().timestamp() + 86400
let token = jwt.encode({ "sub": "ada", "exp": "{exp}" }, SECRET)
match jwt.decode(token, SECRET) {
    Ok(claims) => process(claims),
    Err(e) => return 401 { "error": "{e}" },
}
HS256 by default; HS384 and HS512 are available via kwarg. Failures come back as Result::Err instead of exceptions, so you handle them with ? or match.
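For readers who want to see what sign and verify actually involve, the following is a rough standard-library sketch of HS256. The names encode_hs256 and decode_hs256 are hypothetical, and this is not how python-jose or Fitz are implemented; it only shows the three-part token layout, the HMAC check, and the exp check. It accepts exp as either a number or a numeric string, since the Fitz example above serializes it as a string.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    """Base64url without padding, the encoding JWTs use."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def encode_hs256(claims: dict, secret: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = (
        _b64url(json.dumps(header, separators=(",", ":")).encode())
        + "."
        + _b64url(json.dumps(claims, separators=(",", ":")).encode())
    )
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return signing_input + "." + _b64url(signature)


def decode_hs256(token: str, secret: str) -> dict:
    """Return the claims, or raise ValueError if the token is not acceptable."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("malformed token")
    header_b64, payload_b64, signature_b64 = parts
    expected = hmac.new(
        secret.encode(), f"{header_b64}.{payload_b64}".encode(), hashlib.sha256
    ).digest()
    # Verify the signature before trusting anything inside the token.
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    if json.loads(_b64url_decode(header_b64)).get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    claims = json.loads(_b64url_decode(payload_b64))
    if "exp" in claims and int(claims["exp"]) <= time.time():
        raise ValueError("token expired")
    return claims
```

In production you would still reach for a maintained library; the sketch is only meant to demystify the moving parts.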
Stackable custom RBAC
This is where Fitz clearly stands apart. In FastAPI, RBAC beyond "admin yes / no" means writing a helper:
def require_role(*roles):
    async def checker(user: User = Depends(get_current_user)) -> User:
        if user.role not in roles:
            raise HTTPException(403, "forbidden")
        return user
    return checker

@app.post("/articles")
async def create_article(
    body: ArticleIn,
    user: User = Depends(require_role("editor", "publisher")),
):
    ...
In Fitz:
@requires("editor") @requires("publisher")
@post("/articles")
async fn create_article(db: DbConn, body: ArticleIn, user: User) -> Article {
    ...
}
Stacked @requires decorators combine with OR (the user needs to belong to only one of the listed roles). The checker validates statically:
- That User has a non-nullable role: Str field.
- That there are no duplicates (@requires("editor") @requires("editor") → compile-time error).
- That the handler has an active @auth_provider in the file.
- That the handler's last param matching the provider's return type is the injected one.
Errors that in Python appear at runtime when a user hits the endpoint, in Fitz appear in fitz check before you commit.
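The OR semantics of stacking can be mimicked in plain Python with a small decorator. This is only an analogy under stated assumptions: the requires function below is hypothetical, it takes the role as the handler's first argument for simplicity, and it checks at call time, whereas the Fitz checker described above works at compile time.

```python
def requires(role):
    """Stackable role guard: each use adds one accepted role (OR semantics)."""

    def wrap(handler):
        # If the handler is already guarded, extend its role list
        # instead of nesting a second, stricter check around it.
        roles = getattr(handler, "required_roles", ()) + (role,)
        inner = getattr(handler, "unguarded", handler)

        def guarded(user_role, *args, **kwargs):
            if user_role not in roles:
                raise PermissionError(f"forbidden: {user_role!r} not in {roles}")
            return inner(user_role, *args, **kwargs)

        guarded.required_roles = roles
        guarded.unguarded = inner
        return guarded

    return wrap


@requires("editor")
@requires("publisher")
def create_article(user_role, title):
    return f"{title} ({user_role})"
```

Naively nesting two independent checks would give AND semantics, which a single-role user could never satisfy; accumulating the roles into one membership test is what makes stacking behave as OR.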
Persistent token blacklist
This is the piece many FastAPI projects never finish properly — because it means adding Redis even though your app doesn't need it for anything else, writing the async client, computing TTLs by hand, deciding whether a Redis failure should fail the request or let it through...
async def blacklist_token(jti: str, exp: int):
    ttl = exp - int(time.time())
    if ttl > 0:
        await redis_client.set(f"blacklist:{jti}", "1", ex=ttl)

async def is_blacklisted(jti: str) -> bool:
    return await redis_client.exists(f"blacklist:{jti}") > 0
Plus keeping Redis alive, mounting the client, handling reconnects.
In Fitz, you use the Postgres you already have:
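@authenticated
@post("/auth/logout")
async fn logout(db: DbConn, user: User) -> Map<Str, Bool> {
    // Condensed for brevity: the User type shown earlier has no jti/exp
    // fields, so the full example pulls jti from the decoded claims instead.
    auth.blacklist(db, user.jti, user.exp).await?
    return { "ok": true }
}

@cron("0 0 3 * * *") // 3 AM daily
async fn cleanup_blacklist(db: DbConn) {
    auth.cleanup_expired(db).await
}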
The fitz_token_blacklist(jti TEXT PRIMARY KEY, expires_at BIGINT NOT NULL) table is created automatically with CREATE TABLE IF NOT EXISTS on the first call. Lookups filter on expires_at > now(), so expired entries stop blocking on their own; they don't need to keep blocking, because jwt.decode already rejects an expired token via exp.
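The behaviour described here can be approximated in a few lines of Python. The sketch below uses an in-memory SQLite database as a stand-in for Postgres so it runs anywhere; the table definition is the one quoted above, while the three function names, the now parameter, and the SQL statements are assumptions about what such builtins might do, not the Fitz implementation.

```python
import sqlite3
import time

SCHEMA = """
CREATE TABLE IF NOT EXISTS fitz_token_blacklist (
    jti TEXT PRIMARY KEY,
    expires_at BIGINT NOT NULL
)
"""


def _now(now):
    return int(time.time()) if now is None else now


def blacklist(conn, jti, expires_at):
    conn.execute(SCHEMA)  # idempotent, so safe to run on every call
    # SQLite syntax; on Postgres this would be INSERT ... ON CONFLICT.
    conn.execute(
        "INSERT OR REPLACE INTO fitz_token_blacklist (jti, expires_at) VALUES (?, ?)",
        (jti, expires_at),
    )
    conn.commit()


def is_blacklisted(conn, jti, now=None):
    conn.execute(SCHEMA)
    row = conn.execute(
        "SELECT 1 FROM fitz_token_blacklist WHERE jti = ? AND expires_at > ?",
        (jti, _now(now)),
    ).fetchone()
    return row is not None


def cleanup_expired(conn, now=None):
    """Delete entries whose expiry has passed; return how many were removed."""
    conn.execute(SCHEMA)
    cur = conn.execute(
        "DELETE FROM fitz_token_blacklist WHERE expires_at <= ?", (_now(now),)
    )
    conn.commit()
    return cur.rowcount
```

Passing the connection explicitly also means a test can hand in sqlite3.connect(":memory:") and pin now to a fixed value, with no shared state to reset.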
The /auth/logout and /auth/refresh patterns are not auto-mounted (intentional — the exact flow varies by project), but the three builtins (auth.blacklist/auth.is_blacklisted/auth.cleanup_expired) give you the work in ~10 lines of your code.
Automatic bearerAuth in OpenAPI
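In FastAPI the security scheme is picked up from the security dependency you wire in (OAuth2PasswordBearer above), and any Swagger UI auth options are declared on the app: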
app = FastAPI(
    swagger_ui_init_oauth={...},
)
# and the security scheme on each endpoint manually or via dependencies
In Fitz, when the program has @auth_provider, the OpenAPI schema emits:
{
  "components": {
    "securitySchemes": {
      "bearerAuth": {
        "type": "http",
        "scheme": "bearer",
        "bearerFormat": "JWT"
      }
    }
  },
  "paths": {
    "/me": {
      "get": {
        "security": [{"bearerAuth": []}],
        "responses": {
          "200": {...},
          "401": {"description": "auth"}
        }
      }
    }
  }
}
The Scalar UI at /docs shows you a working "Authorize" button. Auto-mounted. No config.
The checker does much of the work
Before bringing up the server, fitz check validates:
- @authenticated without @auth_provider → "uses @authenticated but no @auth_provider declared".
- @admin over a handler whose User has no non-nullable role: Str field → "to use @admin the User type must have a role: Str field".
- @requires("X") with a typo → doesn't check values (role is an arbitrary Str), but detects duplicate stacks.
- JWT decode return type → Result<Map<Str, Str>>, requires handling Err.
- The param injected by @auth_provider → its type must match the User returned by the provider.
Errors that in FastAPI/Flask typically get discovered when the request lands in production, in Fitz show up in CI.
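As a toy model of what such a pre-flight check amounts to, the sketch below walks a list of handler descriptions and reports three of the problems listed above. Everything in it is hypothetical: the Handler dataclass, the check_program function, and the wording of the messages are illustrations, not the real fitz check.

```python
from dataclasses import dataclass, field


@dataclass
class Handler:
    name: str
    authenticated: bool = False
    admin: bool = False
    requires: list = field(default_factory=list)


def check_program(handlers, has_auth_provider, user_fields):
    """Return a list of error strings; an empty list means the program passes."""
    errors = []
    for h in handlers:
        uses_auth = h.authenticated or h.admin or bool(h.requires)
        if uses_auth and not has_auth_provider:
            errors.append(f"{h.name}: auth decorator used but no @auth_provider declared")
        # @admin and @requires both read the role, so the User type must carry one.
        if (h.admin or h.requires) and user_fields.get("role") != "Str":
            errors.append(f"{h.name}: the User type must have a role: Str field")
        if len(set(h.requires)) != len(h.requires):
            errors.append(f"{h.name}: duplicate @requires")
    return errors
```

Because the check only needs the declarations and never a live request, it can run in CI before the server starts.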
Design decisions worth understanding
auth.blacklist requires an explicit DbConn
Instead of relying on a global singleton, the 3 builtins (auth.blacklist/is_blacklisted/cleanup_expired) take DbConn as the first arg. Reason: it makes the cost explicit at each call site and lets you test them against a test DB.
Enriched 403 message for @requires
When the role doesn't match:
{
  "error": "forbidden",
  "current_role": "viewer",
  "required_roles": ["editor", "publisher"]
}
This makes debugging and observability practical: structured logs with log.info("auth.denied", { user_id, role, required }) tell you exactly what failed.
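If you wanted the same shape from a Python service, a helper along these lines would do it. The function name forbidden_response and the choice to log the event as a JSON string are assumptions for illustration; only the response fields come from the payload shown above.

```python
import json
import logging

logger = logging.getLogger("auth")


def forbidden_response(user_id, current_role, required_roles):
    """Build the 403 body and emit a matching structured log line."""
    required = list(required_roles)
    logger.info(
        "auth.denied %s",
        json.dumps({"user_id": user_id, "role": current_role, "required": required}),
    )
    body = {
        "error": "forbidden",
        "current_role": current_role,
        "required_roles": required,
    }
    return 403, body
```

Keeping the log fields and the response fields in one place means the two cannot drift apart.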
@requires implies auth
You don't need to stack @authenticated @requires("editor") — @requires already runs the provider. Stacking it is a no-op (and the checker flags it).
What Fitz does NOT give you (yet)
- OAuth2 social login (Google/GitHub/etc.): you implement the OAuth flow by hand against the provider's known endpoints.
- Multi-role (user.roles: List<Str>): today it's single role. Composite roles: residual debt of 9.w.1.iter2.
- Asymmetric JWT (RS256/ES256 with PEM and rotation): only HS256/384/512.
- Role hierarchy (admin > editor > viewer): not modeled. Workaround: if admin should also be able to do what editor does, stack @requires("admin") @requires("editor").
- Cookie-based sessions as an alternative to JWT.
- /auth/refresh and /auth/logout auto-mounted: the canonical pattern (~10 LoC with the builtins) is in chapter 28 of the guide but stays manual. Auto-mount with @server(auto_auth_endpoints=true) is visible debt.
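For the role-hierarchy workaround in the list above, a small helper can tell you which roles to stack on an endpoint. This is a hedged sketch: the HIERARCHY ordering and the roles_at_or_above name are assumptions for illustration, and nothing like this exists in Fitz according to the post.

```python
# Lowest to highest privilege (illustrative ordering).
HIERARCHY = ["viewer", "editor", "admin"]


def roles_at_or_above(role, hierarchy=HIERARCHY):
    """Return the role plus every higher role, i.e. the set to stack with @requires."""
    return hierarchy[hierarchy.index(role):]


def stacked_decorators(role, hierarchy=HIERARCHY):
    """Render the decorators you would write by hand on the handler."""
    return " ".join(f'@requires("{r}")' for r in roles_at_or_above(role, hierarchy))
```

For an editor-level endpoint this yields the editor and admin roles, which matches the stacked pair described in the workaround.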
Closing
What tires me most about doing auth in Python isn't writing the first endpoint; it's keeping three places consistent: FastAPI's Depends(...), the JWT library, and the home-grown RBAC. When you add a new role requirement to an endpoint and forget to update the RBAC helper, you don't find out until a user reports a bug.
In Fitz the static checker closes that loop. The @auth_provider is the source of truth for the User type. @authenticated/@admin/@requires consume that type. OpenAPI reflects it. The blacklist uses the same DB. Zero external deps for auth.
If your project is in one of the MVP cases — JWT, simple roles, blacklist in Postgres — Fitz saves you the day of gluing libraries together. If you need OAuth social and multi-role, it's still Python (or take it as an invitation to open an issue).
Next post in the series: "Distributed tracing, Prometheus metrics, and structured logs with two decorators: @trace/@metric in Fitz vs the OpenTelemetry setup in FastAPI" — observability side by side.
Repo: github.com/Thegreekman76/fitz
Chapter 28 of the guide (Native auth): docs/guide.md