DEV Community

Cover image for Implémentation d'une Authentification JWT Complète en FastAPI
PEMPEME MOHAMED CHAMSOUDINE
PEMPEME MOHAMED CHAMSOUDINE

Posted on

Implémentation d'une Authentification JWT Complète en FastAPI

Introduction

Dans cet article, nous allons explorer comment mettre en place une authentification JWT complète en utilisant FastAPI, incluant la gestion des tokens d'accès et de rafraîchissement, des rôles d'utilisateurs, et la sécurisation des données sensibles avec des variables d'environnement.
le code completest disponible sur le auth-jwt-fastapi

  • Pré-requis
  • Python 3.8+
  • FastAPI
  • SQLAlchemy pour la gestion de la base de données
  • Pydantic pour la modélisation des données
  • python-jose pour JWT
  • passlib pour le hachage des mots de passe
  • python-dotenv pour charger les variables d'environnement
  1. Installation des Dépendances

pip install fastapi[all] "python-jose[cryptography]" "passlib[bcrypt]" python-multipart email-validator python-dotenv

  1. Configuration avec Variables d'Environnement Pour sécuriser les données sensibles:

-Créer un fichier .env:

SECRET_KEY=your-very-secure-secret-key
MAIL_USERNAME=your-mail-username
MAIL_PASSWORD=your-mail-password
MAIL_FROM=your-from-email@example.com
Enter fullscreen mode Exit fullscreen mode
  • Fichier config.py pour gérer les configurations:
from pydantic_settings import BaseSettings
from dotenv import load_dotenv

load_dotenv()

class Settings(BaseSettings):
    SECRET_KEY: str  
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 15
    REFRESH_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 7
    MAIL_USERNAME: str 
    MAIL_PASSWORD: str
    MAIL_FROM: str 
    MAIL_PORT: int = 587
    MAIL_SERVER: str = "smtp.gmail.com"
    MAIL_TLS: bool = True
    MAIL_SSL: bool = False
    USE_CREDENTIALS: bool = True
    VALIDATE_CERTS: bool = True
    DATABASE_URL: str = ""

    class Config:
        env_file = ".env"

settings = Settings()
Enter fullscreen mode Exit fullscreen mode
  1. Modélisation des Données Les modeles
from pydantic import BaseModel
from typing import List
from passlib.context import CryptContext
from sqlalchemy import Column, Integer, String, Boolean, DateTime
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from db import Base

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

class Role(BaseModel):
    name: str

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True, nullable=False)
    email = Column(String, unique=True, index=True, nullable=False)
    hashed_password = Column(String, nullable=False)
    is_active = Column(Boolean, default=True)
    roles = relationship("Role", secondary="user_roles")
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

    def verify_password(self, plain_password):
        return pwd_context.verify(plain_password, self.hashed_password)

    def get_password_hash(password):
        return pwd_context.hash(password)

class UserInDB(User):
    hashed_password: str

class UserCreate(BaseModel):
    username: str
    email: str
    password: str

class Token(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str

class UserRoles(BaseModel):
    roles: List[Role] = []
Enter fullscreen mode Exit fullscreen mode
  1. Gestion des Tokens JWT Génération et Décodage des Tokens
from jose import JWTError, jwt
from datetime import datetime, timedelta
from config import settings
from typing import Optional
from fastapi import HTTPException

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
    return encoded_jwt



def create_refresh_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=settings.REFRESH_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
    return encoded_jwt



def decode_token(token: str):
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
        return payload
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")

Enter fullscreen mode Exit fullscreen mode
  1. Implémentation des Routes avec FastAPI
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from db import SessionLocal, engine
from models import User,UserRoles
from db import Base
import crud
from jose import JWTError
from models import UserCreate, Token
from tokens import decode_token,create_access_token,create_refresh_token

app = FastAPI()
Base.metadata.create_all(bind=engine)

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    payload = decode_token(token)
    user = crud.get_user_by_username(db, username=payload.get("sub"))
    if not user:
        raise HTTPException(status_code=401, detail="User not found")
    return user

async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if not current_user.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    user = crud.authenticate_user(db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token = create_access_token(data={"sub": user.username})
    refresh_token = create_refresh_token(data={"sub": user.username})
    return {"access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer"}

@app.post("/refresh", response_model=Token)
async def refresh_token(refresh_token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    try:
        payload = decode_token(refresh_token)
        user = crud.get_user_by_username(db, username=payload.get("sub"))
        if not user:
            raise HTTPException(status_code=401, detail="User not found")
        access_token = create_access_token(data={"sub": user.username})
        return {"access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer"}
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid refresh token")

@app.post("/users/", response_model=User)
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    db_user = crud.create_user(db=db, user=user)
    return db_user

@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

@app.get("/users/me/roles/", response_model=UserRoles)
async def read_user_roles(current_user: User = Depends(get_current_active_user)):
    return {"roles": current_user.roles}
Enter fullscreen mode Exit fullscreen mode
  1. CRUD Operations
from sqlalchemy.orm import Session
from models import User
from models import UserCreate
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def create_user(db: Session, user: UserCreate):
    hashed_password = pwd_context.hash(user.password)
    db_user = User(username=user.username, email=user.email, hashed_password=hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

def get_user_by_username(db: Session, username: str):
    return db.query(User).filter(User.username == username).first()

def authenticate_user(db: Session, username: str, password: str):
    user = get_user_by_username(db, username)
    if not user or not user.verify_password(password):
        return False
    return user
Enter fullscreen mode Exit fullscreen mode
  1. database
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from app.config import settings

SQLALCHEMY_DATABASE_URL = settings.DATABASE_URL

engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()
Enter fullscreen mode Exit fullscreen mode

Conclusion
Ce tutoriel vous a guidé à travers une implémentation détaillée d'une authentification JWT en FastAPI, couvrant la gestion sécurisée des configurations, la modélisation des utilisateurs et des rôles, et les opérations de base sur ces données. En utilisant ces concepts, vous pouvez construire des applications web sécurisées et évolutives.

Points Clés
Utilisation de variables d'environnement pour sécuriser les données sensibles.
Gestion des tokens d'accès et de rafraîchissement pour une expérience utilisateur améliorée.
Prise en charge des rôles pour une gestion fine des permissions.

Tags: #Python #FastAPI #JWT #Authentication #Security #Pydantic

Top comments (0)