Introduction
Dans cet article, nous allons explorer comment mettre en place une authentification JWT complète en utilisant FastAPI, incluant la gestion des tokens d'accès et de rafraîchissement, des rôles d'utilisateurs, et la sécurisation des données sensibles avec des variables d'environnement.
Le code complet est disponible sur le dépôt auth-jwt-fastapi.
- Pré-requis
- Python 3.8+
- FastAPI
- SQLAlchemy pour la gestion de la base de données
- Pydantic pour la modélisation des données
- python-jose pour JWT
- passlib pour le hachage des mots de passe
- python-dotenv pour charger les variables d'environnement
- Installation des Dépendances
pip install "fastapi[all]" sqlalchemy pydantic-settings "python-jose[cryptography]" "passlib[bcrypt]" python-multipart email-validator python-dotenv
- Configuration avec Variables d'Environnement. Pour sécuriser les données sensibles :
- Créer un fichier .env :
SECRET_KEY=your-very-secure-secret-key
MAIL_USERNAME=your-mail-username
MAIL_PASSWORD=your-mail-password
MAIL_FROM=your-from-email@example.com
- Fichier config.py pour gérer les configurations:
from dotenv import load_dotenv
from pydantic_settings import BaseSettings, SettingsConfigDict
# python-dotenv: read a .env file (if one is found) into the process environment
# before the Settings class below is instantiated.
load_dotenv()
class Settings(BaseSettings):
    """Application configuration read from environment variables and ``.env``.

    Fields declared without a default (``SECRET_KEY``, ``MAIL_USERNAME``,
    ``MAIL_PASSWORD``, ``MAIL_FROM``) are required: instantiating the class
    fails validation when they are missing from the environment.
    """

    # pydantic-settings (Pydantic v2) configuration. The nested ``class Config``
    # is the deprecated v1 spelling of the same option.
    model_config = SettingsConfigDict(env_file=".env")

    # --- JWT ---
    SECRET_KEY: str  # key passed to jwt.encode / jwt.decode
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 15
    REFRESH_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 7  # 7 days, in minutes

    # --- Mail ---
    MAIL_USERNAME: str
    MAIL_PASSWORD: str
    MAIL_FROM: str
    MAIL_PORT: int = 587
    MAIL_SERVER: str = "smtp.gmail.com"
    MAIL_TLS: bool = True
    MAIL_SSL: bool = False
    USE_CREDENTIALS: bool = True
    VALIDATE_CERTS: bool = True

    # --- Database ---
    # NOTE(review): the empty default is handed straight to create_engine() by
    # the database module; confirm a real URL is always supplied via the env.
    DATABASE_URL: str = ""
# Module-level settings instance; other modules import it with
# ``from config import settings``.
settings = Settings()
- Modélisation des Données : les modèles
from pydantic import BaseModel
from typing import List
from passlib.context import CryptContext
from sqlalchemy import Column, Integer, String, Boolean, DateTime
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from db import Base
# Shared passlib context used for hashing and verifying passwords; bcrypt is
# the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Pydantic schema describing a role by its name.
# NOTE(review): User.roles points relationship() at a class called "Role",
# which must be a SQLAlchemy-mapped class; this one is a Pydantic model.
# Confirm that an ORM Role (and the "user_roles" table) exist elsewhere.
class Role(BaseModel):
    name: str
class User(Base):
    """SQLAlchemy model mapped to the ``users`` table.

    NOTE(review): ``roles`` targets a mapped class named "Role" through a
    "user_roles" association table. Neither is defined in this module (the
    ``Role`` declared here is a Pydantic model) -- confirm they exist elsewhere.
    """

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, nullable=False, unique=True, index=True)
    email = Column(String, nullable=False, unique=True, index=True)
    hashed_password = Column(String, nullable=False)
    is_active = Column(Boolean, default=True)
    roles = relationship("Role", secondary="user_roles")
    # created_at is filled by the database on insert; updated_at is only set
    # when the row is updated.
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

    def verify_password(self, plain_password):
        """Return whether ``plain_password`` matches the stored password hash."""
        stored_hash = self.hashed_password
        return pwd_context.verify(plain_password, stored_hash)
# NOTE(review): the function takes no ``self``, so it is laid out here as a
# module-level helper; confirm it was not meant to be a static method of User.
def get_password_hash(password):
    """Hash ``password`` with the shared passlib context and return the hash."""
    hashed = pwd_context.hash(password)
    return hashed
# Subclass of the SQLAlchemy ``User`` model that only adds a
# ``hashed_password: str`` annotation.
# NOTE(review): ``User`` already defines ``hashed_password`` as a column and is
# an ORM model rather than a Pydantic one -- confirm this class is still needed.
class UserInDB(User):
    hashed_password: str
# Request body accepted when registering a user. ``password`` is the plain-text
# value; crud.create_user hashes it before it is stored.
class UserCreate(BaseModel):
    username: str
    email: str
    password: str
# Response body of the /token and /refresh routes: both JWTs plus the token
# type (the routes return "bearer").
class Token(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str
# Response body of /users/me/roles/: the list of roles, empty by default.
class UserRoles(BaseModel):
    roles: List[Role] = []
- Gestion des Tokens JWT : génération et décodage des tokens
from jose import JWTError, jwt
from datetime import datetime, timedelta
from config import settings
from typing import Optional
from fastapi import HTTPException
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Build a signed JWT access token.

    Args:
        data: Claims to embed (the routes pass ``{"sub": username}``). The
            dict is copied, never mutated.
        expires_delta: Lifetime of the token. When ``None``, the lifetime is
            ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The encoded token returned by ``jwt.encode``.
    """
    # Local import: the module only imports ``datetime`` and ``timedelta``.
    from datetime import timezone

    # Compare against None so an explicit zero delta is honoured instead of
    # being silently replaced by the default lifetime.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)

    # datetime.utcnow() is deprecated and returns a naive value; use an
    # timezone-aware UTC timestamp instead.
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = data.copy()
    to_encode["exp"] = expire
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
def create_refresh_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Build a signed JWT refresh token.

    Args:
        data: Claims to embed (the routes pass ``{"sub": username}``). The
            dict is copied, never mutated.
        expires_delta: Lifetime of the token. When ``None``, the lifetime is
            ``settings.REFRESH_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The encoded token returned by ``jwt.encode``.
    """
    # Local import: the module only imports ``datetime`` and ``timedelta``.
    from datetime import timezone

    # Compare against None so an explicit zero delta is honoured instead of
    # being silently replaced by the default lifetime.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.REFRESH_TOKEN_EXPIRE_MINUTES)

    # datetime.utcnow() is deprecated and returns a naive value; use an
    # timezone-aware UTC timestamp instead.
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = data.copy()
    to_encode["exp"] = expire
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
def decode_token(token: str):
    """Verify ``token`` and return its decoded payload.

    The token is checked against ``settings.SECRET_KEY`` and only the
    algorithm named in ``settings.ALGORITHM`` is accepted.

    Raises:
        HTTPException: 401 "Invalid token" when jose raises ``JWTError``.
    """
    try:
        return jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
    except JWTError as exc:
        # Chain the original error for debugging and send the same Bearer
        # challenge header as the login route does on its 401.
        raise HTTPException(
            status_code=401,
            detail="Invalid token",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
- Implémentation des Routes avec FastAPI
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from db import SessionLocal, engine
from models import User,UserRoles
from db import Base
import crud
from jose import JWTError
from models import UserCreate, Token
from tokens import decode_token,create_access_token,create_refresh_token
# FastAPI application on which the routes below are registered.
app = FastAPI()
# Create the tables of every model registered on Base, using the shared engine.
Base.metadata.create_all(bind=engine)
# Dependency that reads the bearer token from the request; "token" is the path
# of the login route declared below.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def get_db():
    """Yield a database session and close it once the consumer is finished.

    Written as a generator so it can be used with ``Depends``: the ``finally``
    clause closes the session even when the consumer raises.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    """Resolve the bearer token to the user named in its ``sub`` claim.

    Raises:
        HTTPException: 401 when the token is invalid (raised by
            ``decode_token``), carries no usable ``sub`` claim, or names a
            user that does not exist.
    """
    payload = decode_token(token)
    username = payload.get("sub")
    # Reject tokens without a usable subject instead of querying the database
    # with ``None``.
    if not isinstance(username, str) or not username:
        raise HTTPException(
            status_code=401,
            detail="Invalid token",
            headers={"WWW-Authenticate": "Bearer"},
        )
    user = crud.get_user_by_username(db, username=username)
    if not user:
        raise HTTPException(status_code=401, detail="User not found")
    return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
    """Return the authenticated user, rejecting inactive accounts with HTTP 400."""
    if current_user.is_active:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    """Check the submitted credentials and issue an access/refresh token pair.

    Raises:
        HTTPException: 401 with a Bearer challenge when authentication fails.
    """
    user = crud.authenticate_user(db, form_data.username, form_data.password)
    if user:
        # Both tokens carry the same subject claim; each creator copies the dict.
        claims = {"sub": user.username}
        return {
            "access_token": create_access_token(data=claims),
            "refresh_token": create_refresh_token(data=claims),
            "token_type": "bearer",
        }
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers={"WWW-Authenticate": "Bearer"},
    )
@app.post("/refresh", response_model=Token)
async def refresh_token(refresh_token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    """Issue a new access token for the user named in the presented token.

    The presented token is returned unchanged as ``refresh_token``.

    Raises:
        HTTPException: 401 when the token is invalid (raised by
            ``decode_token``), has no usable ``sub`` claim, or names a user
            that does not exist.
    """
    # decode_token already turns every JWTError into a 401 HTTPException, so
    # the former ``except JWTError`` around this body could never run.
    payload = decode_token(refresh_token)
    username = payload.get("sub")
    if not isinstance(username, str) or not username:
        raise HTTPException(status_code=401, detail="Invalid refresh token")
    user = crud.get_user_by_username(db, username=username)
    if not user:
        raise HTTPException(status_code=401, detail="User not found")
    access_token = create_access_token(data={"sub": user.username})
    # NOTE(review): nothing distinguishes a refresh token from an access token,
    # so an access token is accepted here as well -- consider a token-type claim.
    return {"access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer"}
# ``response_model`` must be a Pydantic type. The SQLAlchemy ``User`` class is
# rejected by FastAPI when the route is registered, and serialising it directly
# would also expose ``hashed_password``. The schema below lists only the fields
# that are safe to return.
from pydantic import BaseModel  # kept local to this block; ideally lives with the other schemas


class UserOut(BaseModel):
    # Pydantic v2: allow the schema to be populated from ORM object attributes.
    model_config = {"from_attributes": True}

    id: int
    username: str
    email: str
    is_active: bool


@app.post("/users/", response_model=UserOut)
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    """Register a new user and return its public fields."""
    db_user = crud.create_user(db=db, user=user)
    return db_user


@app.get("/users/me/", response_model=UserOut)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    """Return the public fields of the authenticated, active user."""
    return current_user
@app.get("/users/me/roles/", response_model=UserRoles)
async def read_user_roles(current_user: User = Depends(get_current_active_user)):
    """Return the roles attached to the authenticated, active user."""
    assigned_roles = current_user.roles
    return {"roles": assigned_roles}
- CRUD Operations
from sqlalchemy.orm import Session
from models import User
from models import UserCreate
from passlib.context import CryptContext
# passlib context used by create_user to hash passwords; bcrypt is the only
# configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_user(db: Session, user: UserCreate):
    """Store a new user with a hashed password and return the refreshed row.

    Only the hash of ``user.password`` is persisted, never the plain text.
    """
    new_user = User(
        username=user.username,
        email=user.email,
        hashed_password=pwd_context.hash(user.password),
    )
    db.add(new_user)
    db.commit()
    # Reload the row so database-generated values are available to the caller.
    db.refresh(new_user)
    return new_user
def get_user_by_username(db: Session, username: str):
    """Return the first ``User`` whose username equals ``username``, or ``None``."""
    matching_users = db.query(User).filter(User.username == username)
    return matching_users.first()
def authenticate_user(db: Session, username: str, password: str):
    """Return the user when the credentials are valid, otherwise ``False``.

    Args:
        db: Session used to look the user up.
        username: Username to look up.
        password: Plain-text password to check against the stored hash.
    """
    user = get_user_by_username(db, username)
    if not user:
        # Spend about as long as a real verification would, so that response
        # timing does not reveal whether the username exists.
        pwd_context.dummy_verify()
        return False
    if not user.verify_password(password):
        return False
    return user
- Base de données
from sqlalchemy import create_engine
# declarative_base lives in sqlalchemy.orm since SQLAlchemy 1.4; the
# sqlalchemy.ext.declarative location is deprecated.
from sqlalchemy.orm import declarative_base, sessionmaker

# Same flat import path as the token module (``from config import settings``);
# ``app.config`` only resolves when the code is packaged under ``app/``.
from config import settings

# Connection URL taken from the application settings.
SQLALCHEMY_DATABASE_URL = settings.DATABASE_URL

# Engine shared by every session and by Base.metadata.create_all().
engine = create_engine(SQLALCHEMY_DATABASE_URL)

# Session factory: sessions commit and flush only when asked to.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Declarative base class that the ORM models inherit from.
Base = declarative_base()
Conclusion
Ce tutoriel vous a guidé à travers une implémentation détaillée d'une authentification JWT en FastAPI, couvrant la gestion sécurisée des configurations, la modélisation des utilisateurs et des rôles, et les opérations de base sur ces données. En utilisant ces concepts, vous pouvez construire des applications web sécurisées et évolutives.
Points Clés
Utilisation de variables d'environnement pour sécuriser les données sensibles.
Gestion des tokens d'accès et de rafraîchissement pour une expérience utilisateur améliorée.
Prise en charge des rôles pour une gestion fine des permissions.
Tags: #Python #FastAPI #JWT #Authentication #Security #Pydantic
Top comments (0)