In this article, I'll guide you through designing and implementing a modern user permission management system using the latest versions of FastAPI, SQLAlchemy 2.0, and MariaDB. This comprehensive guide covers database schema design, contemporary permission models, and practical implementation using current best practices.
Recent Trends in Permission Models
While traditional Role-Based Access Control (RBAC) is still widely used, modern applications often require more sophisticated permission management approaches. Here are some key trends:
1. Attribute-Based Access Control (ABAC)
ABAC determines access rights based on user attributes, resource attributes, and environmental conditions. This approach enables highly flexible access control policies that can consider factors such as user department, resource sensitivity, and time-based restrictions.
2. Policy-Based Access Control (PBAC)
PBAC manages access through centralized policies that define specific conditions. These policies can include complex rules like "allow access to financial data only for finance department employees during business hours."
3. JSON-Based Permission Management
Storing permissions in JSON format allows for flexible, hierarchical structures that can represent complex permission sets and condition-based rules.
4. User Groups and Hierarchies
Incorporating user groups and hierarchical structures streamlines permission management by allowing administrators to assign permissions to entire groups rather than individual users.
5. Integration with External Authentication
Using standards like OAuth 2.0 and OpenID Connect enables Single Sign-On (SSO) and consistent authentication across multiple systems.
Database Design
Let's design a database schema that supports these modern approaches.
Core Tables
CREATE TABLE users (
id INT PRIMARY KEY AUTO_INCREMENT,
username VARCHAR(50) NOT NULL UNIQUE,
password_hash VARCHAR(255) NOT NULL,
email VARCHAR(100) NOT NULL UNIQUE,
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE roles (
id INT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(50) NOT NULL UNIQUE,
description TEXT
);
CREATE TABLE permissions (
id INT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(100) NOT NULL UNIQUE,
resource VARCHAR(100) NOT NULL,
action VARCHAR(50) NOT NULL,
conditions JSON,
description TEXT,
UNIQUE KEY resource_action (resource, action)
);
CREATE TABLE groups (
id INT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(100) NOT NULL UNIQUE,
description TEXT
);
Relationship Tables
CREATE TABLE user_roles (
user_id INT,
role_id INT,
PRIMARY KEY (user_id, role_id),
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
FOREIGN KEY (role_id) REFERENCES roles(id) ON DELETE CASCADE
);
CREATE TABLE role_permissions (
role_id INT,
permission_id INT,
PRIMARY KEY (role_id, permission_id),
FOREIGN KEY (role_id) REFERENCES roles(id) ON DELETE CASCADE,
FOREIGN KEY (permission_id) REFERENCES permissions(id) ON DELETE CASCADE
);
CREATE TABLE user_permissions (
user_id INT,
permission_id INT,
PRIMARY KEY (user_id, permission_id),
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
FOREIGN KEY (permission_id) REFERENCES permissions(id) ON DELETE CASCADE
);
CREATE TABLE user_groups (
user_id INT,
group_id INT,
PRIMARY KEY (user_id, group_id),
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
FOREIGN KEY (group_id) REFERENCES groups(id) ON DELETE CASCADE
);
CREATE TABLE group_roles (
group_id INT,
role_id INT,
PRIMARY KEY (group_id, role_id),
FOREIGN KEY (group_id) REFERENCES groups(id) ON DELETE CASCADE,
FOREIGN KEY (role_id) REFERENCES roles(id) ON DELETE CASCADE
);
CREATE TABLE audit_logs (
id INT PRIMARY KEY AUTO_INCREMENT,
user_id INT,
action VARCHAR(50) NOT NULL,
resource_type VARCHAR(50) NOT NULL,
resource_id INT,
details JSON,
ip_address VARCHAR(45),
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE SET NULL
);
Setting Up FastAPI and SQLAlchemy 2.0
Database Models with SQLAlchemy 2.0
Let's define our models using SQLAlchemy 2.0's latest patterns:
from datetime import datetime
from typing import Any, Dict, List, Optional
from sqlalchemy import ForeignKey, String, Boolean, JSON, Integer, Text, TIMESTAMP, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
# Base class for SQLAlchemy models using 2.0 style
class Base(DeclarativeBase):
pass
# Association tables for many-to-many relationships
from sqlalchemy import Table, Column
user_roles = Table(
'user_roles',
Base.metadata,
Column('user_id', Integer, ForeignKey('users.id', ondelete='CASCADE')),
Column('role_id', Integer, ForeignKey('roles.id', ondelete='CASCADE')),
)
role_permissions = Table(
'role_permissions',
Base.metadata,
Column('role_id', Integer, ForeignKey('roles.id', ondelete='CASCADE')),
Column('permission_id', Integer, ForeignKey('permissions.id', ondelete='CASCADE')),
)
user_permissions = Table(
'user_permissions',
Base.metadata,
Column('user_id', Integer, ForeignKey('users.id', ondelete='CASCADE')),
Column('permission_id', Integer, ForeignKey('permissions.id', ondelete='CASCADE')),
)
user_groups = Table(
'user_groups',
Base.metadata,
Column('user_id', Integer, ForeignKey('users.id', ondelete='CASCADE')),
Column('group_id', Integer, ForeignKey('groups.id', ondelete='CASCADE')),
)
group_roles = Table(
'group_roles',
Base.metadata,
Column('group_id', Integer, ForeignKey('groups.id', ondelete='CASCADE')),
Column('role_id', Integer, ForeignKey('roles.id', ondelete='CASCADE')),
)
class User(Base):
__tablename__ = 'users'
id: Mapped[int] = mapped_column(primary_key=True)
username: Mapped[str] = mapped_column(String(50), unique=True, index=True)
password_hash: Mapped[str] = mapped_column(String(255))
email: Mapped[str] = mapped_column(String(100), unique=True, index=True)
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
created_at: Mapped[datetime] = mapped_column(TIMESTAMP, server_default=func.now())
roles: Mapped[List["Role"]] = relationship(secondary=user_roles, back_populates="users")
direct_permissions: Mapped[List["Permission"]] = relationship(secondary=user_permissions, back_populates="users")
groups: Mapped[List["Group"]] = relationship(secondary=user_groups, back_populates="users")
audit_logs: Mapped[List["AuditLog"]] = relationship(back_populates="user")
class Role(Base):
__tablename__ = 'roles'
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), unique=True, index=True)
description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
users: Mapped[List["User"]] = relationship(secondary=user_roles, back_populates="roles")
permissions: Mapped[List["Permission"]] = relationship(secondary=role_permissions, back_populates="roles")
groups: Mapped[List["Group"]] = relationship(secondary=group_roles, back_populates="roles")
class Permission(Base):
__tablename__ = 'permissions'
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(100), unique=True, index=True)
resource: Mapped[str] = mapped_column(String(100))
action: Mapped[str] = mapped_column(String(50))
conditions: Mapped[Optional[Dict[str, Any]]] = mapped_column(JSON, nullable=True)
description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
roles: Mapped[List["Role"]] = relationship(secondary=role_permissions, back_populates="permissions")
users: Mapped[List["User"]] = relationship(secondary=user_permissions, back_populates="direct_permissions")
__table_args__ = (
{'sqlite_autoincrement': True}, # For SQLite, if used
)
class Group(Base):
__tablename__ = 'groups'
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(100), unique=True, index=True)
description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
users: Mapped[List["User"]] = relationship(secondary=user_groups, back_populates="groups")
roles: Mapped[List["Role"]] = relationship(secondary=group_roles, back_populates="groups")
class AuditLog(Base):
__tablename__ = 'audit_logs'
id: Mapped[int] = mapped_column(primary_key=True)
user_id: Mapped[Optional[int]] = mapped_column(ForeignKey('users.id', ondelete='SET NULL'), nullable=True)
action: Mapped[str] = mapped_column(String(50))
resource_type: Mapped[str] = mapped_column(String(50))
resource_id: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
details: Mapped[Optional[Dict[str, Any]]] = mapped_column(JSON, nullable=True)
ip_address: Mapped[Optional[str]] = mapped_column(String(45), nullable=True)
timestamp: Mapped[datetime] = mapped_column(TIMESTAMP, server_default=func.now())
user: Mapped[Optional["User"]] = relationship(back_populates="audit_logs")
Database Setup with Async Support
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
# For async database operations
DATABASE_URL = "mariadb+aiomysql://user:password@localhost/dbname"
async_engine = create_async_engine(
DATABASE_URL,
echo=True,
future=True,
)
AsyncSessionLocal = sessionmaker(
async_engine,
class_=AsyncSession,
expire_on_commit=False
)
# For sync operations (if needed)
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
SYNC_DATABASE_URL = "mariadb+pymysql://user:password@localhost/dbname"
sync_engine = create_engine(
SYNC_DATABASE_URL,
echo=True,
future=True,
)
SyncSessionLocal = sessionmaker(
sync_engine,
class_=Session,
expire_on_commit=False
)
# Creating tables
async def create_tables():
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
Pydantic Models with Pydantic v2
from pydantic import BaseModel, EmailStr, field_validator, Field, ConfigDict
from datetime import datetime
from typing import Dict, Any, List, Optional
# User models
class UserBase(BaseModel):
username: str
email: EmailStr
class UserCreate(UserBase):
password: str
@field_validator('password')
@classmethod
def password_strength(cls, v: str) -> str:
if len(v) < 8:
raise ValueError('Password must be at least 8 characters')
if not any(char.isdigit() for char in v):
raise ValueError('Password must contain at least one digit')
if not any(char.isupper() for char in v):
raise ValueError('Password must contain at least one uppercase letter')
return v
class UserResponse(UserBase):
id: int
is_active: bool
created_at: datetime
model_config = ConfigDict(from_attributes=True)
class UserDetails(UserResponse):
roles: List["RoleResponse"] = []
groups: List["GroupResponse"] = []
model_config = ConfigDict(from_attributes=True)
# Role models
class RoleBase(BaseModel):
name: str
description: Optional[str] = None
class RoleCreate(RoleBase):
pass
class RoleResponse(RoleBase):
id: int
model_config = ConfigDict(from_attributes=True)
class RoleDetails(RoleResponse):
permissions: List["PermissionResponse"] = []
model_config = ConfigDict(from_attributes=True)
# Permission models
class PermissionBase(BaseModel):
name: str
resource: str
action: str
description: Optional[str] = None
conditions: Optional[Dict[str, Any]] = None
class PermissionCreate(PermissionBase):
pass
class PermissionResponse(PermissionBase):
id: int
model_config = ConfigDict(from_attributes=True)
# Group models
class GroupBase(BaseModel):
name: str
description: Optional[str] = None
class GroupCreate(GroupBase):
pass
class GroupResponse(GroupBase):
id: int
model_config = ConfigDict(from_attributes=True)
class GroupDetails(GroupResponse):
users: List[UserResponse] = []
roles: List[RoleResponse] = []
model_config = ConfigDict(from_attributes=True)
# Update models
class UserUpdate(BaseModel):
username: Optional[str] = None
email: Optional[EmailStr] = None
is_active: Optional[bool] = None
class PasswordChange(BaseModel):
current_password: str
new_password: str
@field_validator('new_password')
@classmethod
def password_strength(cls, v: str) -> str:
if len(v) < 8:
raise ValueError('Password must be at least 8 characters')
if not any(char.isdigit() for char in v):
raise ValueError('Password must contain at least one digit')
if not any(char.isupper() for char in v):
raise ValueError('Password must contain at least one uppercase letter')
return v
# Token models
class Token(BaseModel):
access_token: str
token_type: str
class TokenPayload(BaseModel):
sub: Optional[str] = None
exp: Optional[int] = None
# Fix circular references
UserDetails.model_rebuild()
RoleDetails.model_rebuild()
GroupDetails.model_rebuild()
Authentication and Authorization Utilities
from datetime import datetime, timedelta
from typing import Any, Dict, Optional, Union
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
# Security configuration
SECRET_KEY = "your-secret-key-here" # In production, use environment variables
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Database dependency
async def get_async_db():
async with AsyncSessionLocal() as session:
yield session
# Password utilities
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
# JWT token functions
def create_access_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# User authentication
async def authenticate_user(db: AsyncSession, username: str, password: str) -> Union[User, bool]:
stmt = select(User).where(User.username == username)
result = await db.execute(stmt)
user = result.scalars().first()
if not user or not verify_password(password, user.password_hash):
return False
if not user.is_active:
return False
return user
# Get current user from token
async def get_current_user(
db: AsyncSession = Depends(get_async_db),
token: str = Depends(oauth2_scheme)
) -> User:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: Optional[str] = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
stmt = select(User).where(User.username == username)
result = await db.execute(stmt)
user = result.scalars().first()
if user is None:
raise credentials_exception
if not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return user
# Get current active user
async def get_current_active_user(current_user: User = Depends(get_current_user)) -> User:
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
# ABAC Permission verification
async def has_permission(
user: User,
resource: str,
action: str,
resource_id: Optional[int] = None,
context: Optional[Dict[str, Any]] = None
) -> bool:
"""
Check if user has permission to perform an action on a resource.
Implements Attribute-Based Access Control (ABAC) by considering:
- Direct user permissions
- Role-based permissions
- Group-based permissions
- Contextual conditions
"""
context = context or {}
# Helper function to check conditions
def evaluate_conditions(conditions: Optional[Dict[str, Any]], context: Dict[str, Any]) -> bool:
if not conditions:
return True
# Example condition: {"time_between": ["09:00", "17:00"]}
for condition_key, condition_value in conditions.items():
if condition_key == "time_between":
current_time = context.get("current_time", datetime.now().time())
start_time = datetime.strptime(condition_value[0], "%H:%M").time()
end_time = datetime.strptime(condition_value[1], "%H:%M").time()
if not (start_time <= current_time <= end_time):
return False
elif condition_key == "ip_range":
ip = context.get("ip_address")
if not ip or ip not in condition_value:
return False
# Add more condition types as needed
return True
# Check direct user permissions
for permission in user.direct_permissions:
if (permission.resource == resource and
permission.action == action and
evaluate_conditions(permission.conditions, context)):
return True
# Check role-based permissions
for role in user.roles:
for permission in role.permissions:
if (permission.resource == resource and
permission.action == action and
evaluate_conditions(permission.conditions, context)):
return True
# Check group-based permissions (through roles)
for group in user.groups:
for role in group.roles:
for permission in role.permissions:
if (permission.resource == resource and
permission.action == action and
evaluate_conditions(permission.conditions, context)):
return True
return False
# Permission dependency for FastAPI routes
def require_permission(resource: str, action: str):
async def permission_dependency(
current_user: User = Depends(get_current_active_user),
request: Request = None
):
context = {
"current_time": datetime.now().time(),
"ip_address": request.client.host if request else None
}
if not await has_permission(current_user, resource, action, context=context):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Permission denied: {action} on {resource}"
)
return current_user
return permission_dependency
# Audit logging utility
async def create_audit_log(
db: AsyncSession,
user_id: Optional[int],
action: str,
resource_type: str,
resource_id: Optional[int] = None,
details: Optional[Dict[str, Any]] = None,
ip_address: Optional[str] = None
) -> AuditLog:
audit_log = AuditLog(
user_id=user_id,
action=action,
resource_type=resource_type,
resource_id=resource_id,
details=details,
ip_address=ip_address
)
db.add(audit_log)
await db.commit()
await db.refresh(audit_log)
return audit_log
FastAPI Endpoints with Modern Patterns
from fastapi import FastAPI, Depends, HTTPException, status, Request, Response, BackgroundTasks
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.routing import APIRouter
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import IntegrityError
from sqlalchemy import select, update, delete
from typing import List, Optional
from datetime import timedelta
app = FastAPI(
title="Permission Management API",
description="Modern API for user permission management",
version="2.0.0"
)
# Routers for better organization
auth_router = APIRouter(prefix="/auth", tags=["Authentication"])
user_router = APIRouter(prefix="/users", tags=["User Management"])
role_router = APIRouter(prefix="/roles", tags=["Role Management"])
permission_router = APIRouter(prefix="/permissions", tags=["Permission Management"])
group_router = APIRouter(prefix="/groups", tags=["Group Management"])
# Authentication endpoints
@auth_router.post("/token", response_model=Token)
async def login_for_access_token(
background_tasks: BackgroundTasks,
form_data: OAuth2PasswordRequestForm = Depends(),
db: AsyncSession = Depends(get_async_db),
request: Request = None
):
user = await authenticate_user(db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
# Log successful login in background
background_tasks.add_task(
create_audit_log,
db=db,
user_id=user.id,
action="login",
resource_type="auth",
details={"success": True},
ip_address=request.client.host if request else None
)
return {"access_token": access_token, "token_type": "bearer"}
# User endpoints
@user_router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
user: UserCreate,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_async_db),
request: Request = None,
current_user: User = Depends(require_permission("users", "create"))
):
try:
hashed_password = get_password_hash(user.password)
db_user = User(
username=user.username,
email=user.email,
password_hash=hashed_password
)
db.add(db_user)
await db.commit()
await db.refresh(db_user)
# Log user creation in background
background_tasks.add_task(
create_audit_log,
db=db,
user_id=current_user.id,
action="create",
resource_type="users",
resource_id=db_user.id,
details={"username": user.username, "email": user.email},
ip_address=request.client.host if request else None
)
return db_user
except IntegrityError:
await db.rollback()
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Username or email already exists"
)
@user_router.get("/", response_model=List[UserResponse])
async def get_users(
skip: int = 0,
limit: int = 100,
db: AsyncSession = Depends(get_async_db),
current_user: User = Depends(require_permission("users", "list"))
):
stmt = select(User).offset(skip).limit(limit)
result = await db.execute(stmt)
users = result.scalars().all()
return users
@user_router.get("/{user_id}", response_model=UserDetails)
async def get_user(
user_id: int,
db: AsyncSession = Depends(get_async_db),
current_user: User = Depends(require_permission("users", "read"))
):
stmt = select(User).where(User.id == user_id)
result = await db.execute(stmt)
db_user = result.scalars().first()
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@user_router.put("/{user_id}", response_model=UserResponse)
async def update_user(
user_id: int,
user_update: UserUpdate,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_async_db),
request: Request = None,
current_user: User = Depends(require_permission("users", "update"))
):
stmt = select(User).where(User.id == user_id)
result = await db.execute(stmt)
db_user = result.scalars().first()
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
update_data = user_update.model_dump(exclude_unset=True)
try:
if update_data:
stmt = (
update(User)
.where(User.id == user_id)
.values(**update_data)
)
await db.execute(stmt)
await db.commit()
# Refresh user data
stmt = select(User).where(User.id == user_id)
result = await db.execute(stmt)
db_user = result.scalars().first()
# Log user update in background
background_tasks.add_task(
create_audit_log,
db=db,
user_id=current_user.id,
action="update",
resource_type="users",
resource_id=user_id,
details=update_data,
ip_address=request.client.host if request else None
)
return db_user
except IntegrityError:
await db.rollback()
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Username or email already exists"
)
@user_router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
user_id: int,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_async_db),
request: Request = None,
current_user: User = Depends(require_permission("users", "delete"))
):
stmt = select(User).where(User.id == user_id)
result = await db.execute(stmt)
db_user = result.scalars().first()
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
# Store username for audit log
username = db_user.username
stmt = delete(User).where(User.id == user_id)
await db.execute(stmt)
await db.commit()
# Log user deletion in background
background_tasks.add_task(
create_audit_log,
db=db,
user_id=current_user.id,
action="delete",
resource_type="users",
resource_id=user_id,
details={"username": username},
ip_address=request.client.host if request else None
)
return Response(status_code=status.HTTP_204_NO_CONTENT)
# Role endpoints
@role_router.post("/", response_model=RoleResponse, status_code=status.HTTP_201_CREATED)
async def create_role(
role: RoleCreate,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_async_db),
request: Request = None,
current_user: User = Depends(require_permission("roles", "create"))
):
try:
db_role = Role(**role.model_dump())
db.add(db_role)
await db.commit()
await db.refresh(db_role)
# Log role creation in background
background_tasks.add_task(
create_audit_log,
db=db,
user_id=current_user.id,
action="create",
resource_type="roles",
resource_id=db_role.id,
details=role.model_dump(),
ip_address=request.client.host if request else None
)
return db_role
except IntegrityError:
await db.rollback()
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Role name already exists"
)
# Permission check endpoint (useful for frontend authorization)
@auth_router.get("/check-permission")
async def check_user_permission(
resource: str,
action: str,
resource_id: Optional[int] = None,
db: AsyncSession = Depends(get_async_db),
current_user: User = Depends(get_current_active_user),
request: Request = None
):
context = {
"current_time": datetime.now().time(),
"ip_address": request.client.host if request else None
}
has_access = await has_permission(
user=current_user,
resource=resource,
action=action,
resource_id=resource_id,
context=context
)
return {"has_permission": has_access}
# Register routers
app.include_router(auth_router)
app.include_router(user_router)
app.include_router(role_router)
app.include_router(permission_router)
app.include_router(group_router)
# Lifespan for database setup
@app.on_event("startup")
async def startup():
await create_tables()
Implementation of Modern Permission Models
ABAC Implementation
Our permission system implements Attribute-Based Access Control through:
-
Contextual Permission Checking: The
has_permission
function evaluates permissions based on:- User attributes (roles, group memberships)
- Environmental factors (time of day, IP address)
- Resource-specific attributes
JSON Conditions:
{
"time_between": ["09:00", "17:00"],
"ip_range": ["192.168.1.0/24", "10.0.0.0/8"],
"department": "finance"
}
- Dynamic Context Evaluation:
# Example usage in an API endpoint
@app.get("/reports/{report_id}")
async def get_report(
report_id: int,
request: Request,
db: AsyncSession = Depends(get_async_db),
current_user: User = Depends(get_current_active_user)
):
# Create context with current time and IP
context = {
"current_time": datetime.now().time(),
"ip_address": request.client.host,
"department": current_user.department # Assuming user has department attribute
}
# Check permission with context
if not await has_permission(
user=current_user,
resource="reports",
action="read",
resource_id=report_id,
context=context
):
raise HTTPException(status_code=403, detail="Permission denied")
# Retrieve and return report...
Policy-Based Access Control
Our system implements PBAC through centralized permission policies:
-
Hierarchical Permission Inheritance:
- Users inherit permissions from roles
- Roles can be assigned to groups
- Groups can have multiple roles
-
Policy Centralization:
- Permissions are defined once and reused
- Conditions are stored in the database
- Changes to policies are immediately reflected
Performance Optimization
For large-scale applications, we've incorporated several performance enhancements:
Async Database Operations: Using SQLAlchemy's async capabilities with
asyncio
Background Tasks: Moving audit logging to background tasks
Efficient Queries: Using SQLAlchemy 2.0's more efficient query patterns
Pydantic v2: Utilizing the significantly faster Pydantic v2 for validation
Security Best Practices
Our implementation incorporates several security best practices:
-
Password Handling:
- Secure hashing with bcrypt
- Password strength validation
- Separate password change endpoint
-
API Security:
- JWT authentication with expiration
- CORS protection
- Input validation
-
Data Protection:
- Separate request/response models
- No sensitive data in responses
- Audit logging for all sensitive operations
Conclusion
This article has demonstrated how to build a modern user permission management system using the latest versions of FastAPI, SQLAlchemy 2.0, and MariaDB. By following current best practices and incorporating recent advancements in these technologies, you can create a secure, scalable, and maintainable permission system.
Key advantages of this approach include:
- Type Safety: Using SQLAlchemy 2.0's typed attributes and Python's type hints
- Performance: Async operations and modern optimizations
- Flexibility: Supporting complex permission models like ABAC and PBAC
- Maintainability: Clear code organization with routers and dependency injection
- Security: Comprehensive security controls and audit logging
For production environments, consider these additional enhancements:
- Implementing rate limiting
- Adding two-factor authentication
- Setting up permission caching for frequently checked permissions
- Integrating with external identity providers like Auth0 or Okta
The code provided in this article serves as a solid foundation that you can extend and customize to meet your specific requirements.
Top comments (0)