DEV Community

郑沛沛
郑沛沛

Posted on

Authentication Done Right: JWT, OAuth2, and Session Management

Authentication is the most security-critical part of your app. Here's how to implement it properly without reinventing the wheel.

JWT Authentication with FastAPI

import os
from datetime import datetime, timedelta, timezone

from fastapi import Depends, HTTPException, Request, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext

# Key used to sign and verify JWTs. It is read from the SECRET_KEY environment
# variable so the real secret never has to live in source control; the literal
# fallback is a placeholder for local experiments only and must be overridden
# in any real deployment.
SECRET_KEY = os.environ.get("SECRET_KEY", "your-secret-key-from-env")
# Algorithm name passed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# Lifetime, in minutes, of the access tokens issued by the login endpoint.
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Password-hashing context shared by hash_password and verify_password;
# bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Dependency that supplies the bearer token to get_current_user;
# tokenUrl names the URL where clients obtain a token.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

def hash_password(password: str) -> str:
    """Return the hash of ``password`` produced by the module's ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed

def verify_password(plain: str, hashed: str) -> bool:
    """Check the plaintext password ``plain`` against ``hashed`` via ``pwd_context``."""
    is_match = pwd_context.verify(plain, hashed)
    return is_match

def create_access_token(data: dict, expires_delta: timedelta = None) -> str:
    """Encode ``data`` as a signed JWT that carries an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The mapping is copied, so the
            caller's dict is not modified.
        expires_delta: Lifetime of the token. ``None`` selects the 15-minute
            default; any explicit value, including ``timedelta(0)``, is
            honored as given.

    Returns:
        The encoded token, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Compare against None explicitly: timedelta(0) is falsy, so an `or`
    # fallback would silently turn a zero lifetime into 15 minutes.
    if expires_delta is None:
        expires_delta = timedelta(minutes=15)
    # Timezone-aware "now" instead of the deprecated, naive datetime.utcnow().
    expire = datetime.now(timezone.utc) + expires_delta
    claims = {**data, "exp": expire}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)

async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the request's bearer token to a user.

    Args:
        token: Bearer token supplied by the ``oauth2_scheme`` dependency.

    Returns:
        The object returned by ``get_user_by_id`` for the token's ``sub`` claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            token fails to decode, is a refresh token, has no ``sub`` claim,
            or names a user that ``get_user_by_id`` cannot find.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    # Keep the try body to the one call that can raise JWTError.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise credentials_exception

    # Refresh tokens are signed with the same key and also carry "sub".
    # Without this check a long-lived refresh token would be accepted
    # wherever an access token is required.
    if payload.get("type") == "refresh":
        raise credentials_exception

    user_id = payload.get("sub")
    if user_id is None:
        raise credentials_exception

    user = await get_user_by_id(user_id)
    if user is None:
        raise credentials_exception
    return user
Enter fullscreen mode Exit fullscreen mode

Login Endpoint

from fastapi import APIRouter
from fastapi.security import OAuth2PasswordRequestForm

# Router on which the endpoints below are registered.
router = APIRouter()

@router.post("/token")
async def login(form: OAuth2PasswordRequestForm = Depends()):
    """Exchange a username and password for a bearer access token.

    Args:
        form: OAuth2 password form carrying ``username`` and ``password``.

    Returns:
        A dict with ``access_token`` (a JWT whose ``sub`` is the user's id as
        a string, valid for ``ACCESS_TOKEN_EXPIRE_MINUTES``) and
        ``token_type`` set to ``"bearer"``.

    Raises:
        HTTPException: 401 when ``authenticate_user`` returns a falsy value.
    """
    user = await authenticate_user(form.username, form.password)
    if not user:
        # A 401 response should name the expected auth scheme; this matches
        # the header sent by get_current_user's 401.
        raise HTTPException(
            status_code=401,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token = create_access_token(
        data={"sub": str(user.id)},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return {"access_token": access_token, "token_type": "bearer"}

@router.get("/me")
async def read_users_me(current_user = Depends(get_current_user)):
    """Return the user resolved by the ``get_current_user`` dependency."""
    authenticated_user = current_user
    return authenticated_user
Enter fullscreen mode Exit fullscreen mode

Refresh Tokens

# Lifetime, in days, of the tokens produced by create_refresh_token.
REFRESH_TOKEN_EXPIRE_DAYS = 7

def create_refresh_token(user_id: str) -> str:
    """Build a refresh token for ``user_id``.

    The token is produced by ``create_access_token`` with a ``type`` claim of
    ``"refresh"`` and a lifetime of ``REFRESH_TOKEN_EXPIRE_DAYS`` days.
    """
    refresh_claims = {"sub": user_id, "type": "refresh"}
    lifetime = timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
    return create_access_token(data=refresh_claims, expires_delta=lifetime)

@router.post("/refresh")
async def refresh_token(refresh_token: str):
    """Exchange a valid refresh token for a new access token.

    Args:
        refresh_token: Encoded JWT whose ``type`` claim must be ``"refresh"``.

    Returns:
        A dict with ``access_token`` and ``token_type`` set to ``"bearer"``.

    Raises:
        HTTPException: 401 when the token fails to decode, is not a refresh
            token, or has no ``sub`` claim.
    """
    # Keep the try body to the one call that can raise JWTError.
    try:
        payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid refresh token")

    if payload.get("type") != "refresh":
        raise HTTPException(status_code=401, detail="Invalid token type")

    user_id = payload.get("sub")
    # Without a subject the new token would carry sub=None and could never
    # resolve to a user, so reject it here.
    if user_id is None:
        raise HTTPException(status_code=401, detail="Invalid refresh token")

    # Use the configured access-token lifetime so refreshed tokens expire on
    # the same schedule as the ones issued at login.
    new_access = create_access_token(
        data={"sub": user_id},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return {"access_token": new_access, "token_type": "bearer"}
Enter fullscreen mode Exit fullscreen mode

Role-Based Access Control

from enum import Enum
from functools import wraps

class Role(str, Enum):
    """Roles a user can hold.

    Members are also ``str`` instances, so each compares equal to its
    plain-string value.
    """

    USER = "user"
    ADMIN = "admin"
    MODERATOR = "moderator"

def require_role(required_role: Role):
    """Build a dependency that admits only users whose role is ``required_role``.

    The returned callable resolves the current user through
    ``get_current_user``, returns that user when ``user.role`` equals
    ``required_role``, and raises a 403 ``HTTPException`` otherwise.
    """
    def _enforce_role(current_user = Depends(get_current_user)):
        if current_user.role == required_role:
            return current_user
        raise HTTPException(status_code=403, detail="Insufficient permissions")

    return _enforce_role

@router.delete("/users/{user_id}")
async def delete_user(user_id: int, admin = Depends(require_role(Role.ADMIN))):
    """Delete the user identified by ``user_id``; callers must hold the ADMIN role.

    ``admin`` exists only to enforce the role check and is not used in the body.
    """
    await user_service.delete(user_id)
    confirmation = {"message": "User deleted"}
    return confirmation
Enter fullscreen mode Exit fullscreen mode

Rate Limiting

from slowapi import Limiter
from slowapi.util import get_remote_address

# Rate limiter whose per-client key is computed by get_remote_address.
limiter = Limiter(key_func=get_remote_address)

@router.post("/login")
@limiter.limit("5/minute")
async def login(request: Request, form: OAuth2PasswordRequestForm = Depends()):
    """Rate-limited login endpoint stub.

    The body is a placeholder: it performs no authentication and returns
    ``None``.
    """
    # The limiter is keyed by get_remote_address, so the "5/minute" limit
    # applies per client address.
    # NOTE(review): `request` is unused in the body; presumably the limiter
    # needs it in the signature to identify the client - confirm against the
    # slowapi documentation.
    # NOTE(review): this rebinds the module-level name `login`, which is also
    # used by the /token handler.
    pass
Enter fullscreen mode Exit fullscreen mode

Security Checklist

  1. Hash passwords with bcrypt (never store plaintext)
  2. Use short-lived access tokens (15-30 min)
  3. Store refresh tokens securely (httpOnly cookies)
  4. Rate limit authentication endpoints
  5. Validate JWT signature AND expiration
  6. Use HTTPS everywhere

Key Takeaways

  1. JWT for stateless auth, sessions for server-side state
  2. Always hash passwords with bcrypt or argon2
  3. Implement refresh token rotation (issue a new refresh token on every refresh and invalidate the old one)
  4. Role-based access control with dependency injection
  5. Rate limit login endpoints to prevent brute force

  6. Never store secrets in code — use environment variables

🚀 Level up your AI workflow! Check out my AI Developer Mega Prompt Pack — 80 battle-tested prompts for developers. $9.99

Top comments (0)