DEV Community

Geoffrey Kim

Building a Modern User Permission Management System with FastAPI, SQLAlchemy 2.0, and MariaDB

In this article, I'll guide you through designing and implementing a modern user permission management system using the latest versions of FastAPI, SQLAlchemy 2.0, and MariaDB. This comprehensive guide covers database schema design, contemporary permission models, and practical implementation using current best practices.

Recent Trends in Permission Models

While traditional Role-Based Access Control (RBAC) is still widely used, modern applications often require more sophisticated permission management approaches. Here are some key trends:

1. Attribute-Based Access Control (ABAC)

ABAC determines access rights based on user attributes, resource attributes, and environmental conditions. This approach enables highly flexible access control policies that can consider factors such as user department, resource sensitivity, and time-based restrictions.

2. Policy-Based Access Control (PBAC)

PBAC manages access through centralized policies that define specific conditions. These policies can include complex rules like "allow access to financial data only for finance department employees during business hours."
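
As a minimal sketch of the rule quoted above, the policy below is plain data plus one evaluation function. The `Policy` class, its field names, and the 09:00 to 17:00 window are illustrative assumptions; they are not part of any library or of the schema designed later in this article.

```python
from dataclasses import dataclass
from datetime import time


@dataclass(frozen=True)
class Policy:
    """Hypothetical policy record: who may do what, and when."""
    resource: str
    action: str
    department: str
    start: time
    end: time


def is_allowed(policy: Policy, *, resource: str, action: str,
               department: str, now: time) -> bool:
    """Return True only if every part of the policy matches the request."""
    return (
        policy.resource == resource
        and policy.action == action
        and policy.department == department
        and policy.start <= now <= policy.end
    )


# "Allow access to financial data only for finance employees during business hours."
finance_policy = Policy("financial_data", "read", "finance", time(9, 0), time(17, 0))

# Mid-morning request from the finance department
print(is_allowed(finance_policy, resource="financial_data", action="read",
                 department="finance", now=time(10, 30)))
# Same request in the evening
print(is_allowed(finance_policy, resource="financial_data", action="read",
                 department="finance", now=time(20, 0)))
```

The first call prints True and the second prints False, because only the time of day differs between them.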

3. JSON-Based Permission Management

Storing permissions in JSON format allows for flexible, hierarchical structures that can represent complex permission sets and condition-based rules.
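
To make the idea concrete, here is a small sketch of one possible hierarchical layout: resource, then action, then an object of conditions. The document structure and the `find_conditions` helper are assumptions made for illustration, not a standard format.

```python
import json
from typing import Any, Dict, Optional

# Hypothetical permission document: resource -> action -> conditions.
PERMISSIONS_JSON = """
{
  "reports": {
    "read": {"time_between": ["09:00", "17:00"]},
    "export": {"department": "finance"}
  },
  "users": {
    "list": {}
  }
}
"""


def find_conditions(document: Dict[str, Any], resource: str,
                    action: str) -> Optional[Dict[str, Any]]:
    """Return the conditions for resource/action, or None if it is not granted."""
    actions = document.get(resource)
    if actions is None or action not in actions:
        return None
    return actions[action]


permissions = json.loads(PERMISSIONS_JSON)
print(find_conditions(permissions, "reports", "read"))
print(find_conditions(permissions, "users", "list"))
print(find_conditions(permissions, "users", "delete"))
```

In this layout an empty object means the action is granted without conditions, while None means the action is not granted at all.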

4. User Groups and Hierarchies

Incorporating user groups and hierarchical structures streamlines permission management by allowing administrators to assign permissions to entire groups rather than individual users.

5. Integration with External Authentication

Using standards like OAuth 2.0 and OpenID Connect enables Single Sign-On (SSO) and consistent authentication across multiple systems.

Database Design

Let's design a database schema that supports these modern approaches.

Core Tables

CREATE TABLE users (
    id INT PRIMARY KEY AUTO_INCREMENT,
    username VARCHAR(50) NOT NULL UNIQUE,
    password_hash VARCHAR(255) NOT NULL,
    email VARCHAR(100) NOT NULL UNIQUE,
    is_active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE roles (
    id INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(50) NOT NULL UNIQUE,
    description TEXT
);

CREATE TABLE permissions (
    id INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(100) NOT NULL UNIQUE,
    resource VARCHAR(100) NOT NULL,
    action VARCHAR(50) NOT NULL,
    conditions JSON,
    description TEXT,
    UNIQUE KEY resource_action (resource, action)
);

CREATE TABLE groups (
    id INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(100) NOT NULL UNIQUE,
    description TEXT
);

Relationship Tables

CREATE TABLE user_roles (
    user_id INT,
    role_id INT,
    PRIMARY KEY (user_id, role_id),
    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
    FOREIGN KEY (role_id) REFERENCES roles(id) ON DELETE CASCADE
);

CREATE TABLE role_permissions (
    role_id INT,
    permission_id INT,
    PRIMARY KEY (role_id, permission_id),
    FOREIGN KEY (role_id) REFERENCES roles(id) ON DELETE CASCADE,
    FOREIGN KEY (permission_id) REFERENCES permissions(id) ON DELETE CASCADE
);

CREATE TABLE user_permissions (
    user_id INT,
    permission_id INT,
    PRIMARY KEY (user_id, permission_id),
    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
    FOREIGN KEY (permission_id) REFERENCES permissions(id) ON DELETE CASCADE
);

CREATE TABLE user_groups (
    user_id INT,
    group_id INT,
    PRIMARY KEY (user_id, group_id),
    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
    FOREIGN KEY (group_id) REFERENCES groups(id) ON DELETE CASCADE
);

CREATE TABLE group_roles (
    group_id INT,
    role_id INT,
    PRIMARY KEY (group_id, role_id),
    FOREIGN KEY (group_id) REFERENCES groups(id) ON DELETE CASCADE,
    FOREIGN KEY (role_id) REFERENCES roles(id) ON DELETE CASCADE
);

CREATE TABLE audit_logs (
    id INT PRIMARY KEY AUTO_INCREMENT,
    user_id INT,
    action VARCHAR(50) NOT NULL,
    resource_type VARCHAR(50) NOT NULL,
    resource_id INT,
    details JSON,
    ip_address VARCHAR(45),
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE SET NULL
);
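
To see how the relationship tables combine, the sketch below loads a trimmed-down copy of the schema into an in-memory SQLite database and collects a user's effective permissions with one UNION query. SQLite (via the standard library) stands in for MariaDB here, the tables keep only the columns the query needs, and the sample rows are invented for the demonstration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE permissions (id INTEGER PRIMARY KEY, name TEXT, resource TEXT, action TEXT);
CREATE TABLE user_roles (user_id INTEGER, role_id INTEGER);
CREATE TABLE role_permissions (role_id INTEGER, permission_id INTEGER);
CREATE TABLE user_permissions (user_id INTEGER, permission_id INTEGER);
CREATE TABLE user_groups (user_id INTEGER, group_id INTEGER);
CREATE TABLE group_roles (group_id INTEGER, role_id INTEGER);

-- Invented sample data: user 1 holds role 10 directly, role 20 through group 5,
-- and permission 3 as a direct grant. Nobody holds permission 4.
INSERT INTO permissions VALUES
    (1, 'users.list', 'users', 'list'),
    (2, 'reports.read', 'reports', 'read'),
    (3, 'roles.create', 'roles', 'create'),
    (4, 'users.delete', 'users', 'delete');
INSERT INTO user_roles VALUES (1, 10);
INSERT INTO role_permissions VALUES (10, 1), (20, 2);
INSERT INTO user_groups VALUES (1, 5);
INSERT INTO group_roles VALUES (5, 20);
INSERT INTO user_permissions VALUES (1, 3);
""")

# Three paths to a permission: direct grant, own role, role of a group.
EFFECTIVE_PERMISSIONS = """
SELECT p.name FROM permissions p
JOIN user_permissions up ON up.permission_id = p.id
WHERE up.user_id = :uid
UNION
SELECT p.name FROM permissions p
JOIN role_permissions rp ON rp.permission_id = p.id
JOIN user_roles ur ON ur.role_id = rp.role_id
WHERE ur.user_id = :uid
UNION
SELECT p.name FROM permissions p
JOIN role_permissions rp ON rp.permission_id = p.id
JOIN group_roles gr ON gr.role_id = rp.role_id
JOIN user_groups ug ON ug.group_id = gr.group_id
WHERE ug.user_id = :uid
ORDER BY 1
"""

rows = conn.execute(EFFECTIVE_PERMISSIONS, {"uid": 1}).fetchall()
effective = [name for (name,) in rows]
print(effective)
```

For user 1 the query returns reports.read (through the group), roles.create (direct grant), and users.list (own role); users.delete is absent because no path leads to it.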

Setting Up FastAPI and SQLAlchemy 2.0

Database Models with SQLAlchemy 2.0

Let's define our models using SQLAlchemy 2.0's latest patterns:

from datetime import datetime
from typing import Any, Dict, List, Optional
from sqlalchemy import ForeignKey, String, Boolean, JSON, Integer, Text, TIMESTAMP, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship

# Base class for SQLAlchemy models using 2.0 style
class Base(DeclarativeBase):
    pass

# Association tables for many-to-many relationships
from sqlalchemy import Table, Column

# Both columns are primary keys so each table matches the composite
# PRIMARY KEY declared in the SQL schema above
user_roles = Table(
    'user_roles',
    Base.metadata,
    Column('user_id', Integer, ForeignKey('users.id', ondelete='CASCADE'), primary_key=True),
    Column('role_id', Integer, ForeignKey('roles.id', ondelete='CASCADE'), primary_key=True),
)

role_permissions = Table(
    'role_permissions',
    Base.metadata,
    Column('role_id', Integer, ForeignKey('roles.id', ondelete='CASCADE'), primary_key=True),
    Column('permission_id', Integer, ForeignKey('permissions.id', ondelete='CASCADE'), primary_key=True),
)

user_permissions = Table(
    'user_permissions',
    Base.metadata,
    Column('user_id', Integer, ForeignKey('users.id', ondelete='CASCADE'), primary_key=True),
    Column('permission_id', Integer, ForeignKey('permissions.id', ondelete='CASCADE'), primary_key=True),
)

user_groups = Table(
    'user_groups',
    Base.metadata,
    Column('user_id', Integer, ForeignKey('users.id', ondelete='CASCADE'), primary_key=True),
    Column('group_id', Integer, ForeignKey('groups.id', ondelete='CASCADE'), primary_key=True),
)

group_roles = Table(
    'group_roles',
    Base.metadata,
    Column('group_id', Integer, ForeignKey('groups.id', ondelete='CASCADE'), primary_key=True),
    Column('role_id', Integer, ForeignKey('roles.id', ondelete='CASCADE'), primary_key=True),
)

class User(Base):
    __tablename__ = 'users'

    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column(String(50), unique=True, index=True)
    password_hash: Mapped[str] = mapped_column(String(255))
    email: Mapped[str] = mapped_column(String(100), unique=True, index=True) 
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    created_at: Mapped[datetime] = mapped_column(TIMESTAMP, server_default=func.now())

    roles: Mapped[List["Role"]] = relationship(secondary=user_roles, back_populates="users")
    direct_permissions: Mapped[List["Permission"]] = relationship(secondary=user_permissions, back_populates="users")
    groups: Mapped[List["Group"]] = relationship(secondary=user_groups, back_populates="users")
    audit_logs: Mapped[List["AuditLog"]] = relationship(back_populates="user")

class Role(Base):
    __tablename__ = 'roles'

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True, index=True)
    description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

    users: Mapped[List["User"]] = relationship(secondary=user_roles, back_populates="roles")
    permissions: Mapped[List["Permission"]] = relationship(secondary=role_permissions, back_populates="roles")
    groups: Mapped[List["Group"]] = relationship(secondary=group_roles, back_populates="roles")

from sqlalchemy import UniqueConstraint

class Permission(Base):
    __tablename__ = 'permissions'

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100), unique=True, index=True)
    resource: Mapped[str] = mapped_column(String(100))
    action: Mapped[str] = mapped_column(String(50))
    conditions: Mapped[Optional[Dict[str, Any]]] = mapped_column(JSON, nullable=True)
    description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

    roles: Mapped[List["Role"]] = relationship(secondary=role_permissions, back_populates="permissions")
    users: Mapped[List["User"]] = relationship(secondary=user_permissions, back_populates="direct_permissions")

    __table_args__ = (
        # Mirrors UNIQUE KEY resource_action (resource, action) in the SQL schema
        UniqueConstraint('resource', 'action', name='resource_action'),
        {'sqlite_autoincrement': True},  # For SQLite, if used
    )

class Group(Base):
    __tablename__ = 'groups'

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100), unique=True, index=True)
    description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

    users: Mapped[List["User"]] = relationship(secondary=user_groups, back_populates="groups")
    roles: Mapped[List["Role"]] = relationship(secondary=group_roles, back_populates="groups")

class AuditLog(Base):
    __tablename__ = 'audit_logs'

    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[Optional[int]] = mapped_column(ForeignKey('users.id', ondelete='SET NULL'), nullable=True)
    action: Mapped[str] = mapped_column(String(50))
    resource_type: Mapped[str] = mapped_column(String(50))
    resource_id: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
    details: Mapped[Optional[Dict[str, Any]]] = mapped_column(JSON, nullable=True)
    ip_address: Mapped[Optional[str]] = mapped_column(String(45), nullable=True)
    timestamp: Mapped[datetime] = mapped_column(TIMESTAMP, server_default=func.now())

    user: Mapped[Optional["User"]] = relationship(back_populates="audit_logs")

Database Setup with Async Support

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import sessionmaker

# For async database operations
DATABASE_URL = "mariadb+aiomysql://user:password@localhost/dbname"

async_engine = create_async_engine(
    DATABASE_URL,
    echo=True,  # Logs every SQL statement; turn off in production
)

# async_sessionmaker is the SQLAlchemy 2.0 factory for AsyncSession objects
AsyncSessionLocal = async_sessionmaker(
    async_engine,
    expire_on_commit=False
)

# For sync operations (if needed)
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

SYNC_DATABASE_URL = "mariadb+pymysql://user:password@localhost/dbname"

sync_engine = create_engine(
    SYNC_DATABASE_URL,
    echo=True,
    future=True,
)

SyncSessionLocal = sessionmaker(
    sync_engine, 
    class_=Session, 
    expire_on_commit=False
)

# Creating tables
async def create_tables():
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

Pydantic Models with Pydantic v2

from pydantic import BaseModel, EmailStr, field_validator, Field, ConfigDict
from datetime import datetime
from typing import Dict, Any, List, Optional

# User models
class UserBase(BaseModel):
    username: str
    email: EmailStr

class UserCreate(UserBase):
    password: str

    @field_validator('password')
    @classmethod
    def password_strength(cls, v: str) -> str:
        if len(v) < 8:
            raise ValueError('Password must be at least 8 characters')
        if not any(char.isdigit() for char in v):
            raise ValueError('Password must contain at least one digit')
        if not any(char.isupper() for char in v):
            raise ValueError('Password must contain at least one uppercase letter')
        return v

class UserResponse(UserBase):
    id: int
    is_active: bool
    created_at: datetime

    model_config = ConfigDict(from_attributes=True)

class UserDetails(UserResponse):
    roles: List["RoleResponse"] = []
    groups: List["GroupResponse"] = []

    model_config = ConfigDict(from_attributes=True)

# Role models
class RoleBase(BaseModel):
    name: str
    description: Optional[str] = None

class RoleCreate(RoleBase):
    pass

class RoleResponse(RoleBase):
    id: int

    model_config = ConfigDict(from_attributes=True)

class RoleDetails(RoleResponse):
    permissions: List["PermissionResponse"] = []

    model_config = ConfigDict(from_attributes=True)

# Permission models
class PermissionBase(BaseModel):
    name: str
    resource: str
    action: str
    description: Optional[str] = None
    conditions: Optional[Dict[str, Any]] = None

class PermissionCreate(PermissionBase):
    pass

class PermissionResponse(PermissionBase):
    id: int

    model_config = ConfigDict(from_attributes=True)

# Group models
class GroupBase(BaseModel):
    name: str
    description: Optional[str] = None

class GroupCreate(GroupBase):
    pass

class GroupResponse(GroupBase):
    id: int

    model_config = ConfigDict(from_attributes=True)

class GroupDetails(GroupResponse):
    users: List[UserResponse] = []
    roles: List[RoleResponse] = []

    model_config = ConfigDict(from_attributes=True)

# Update models
class UserUpdate(BaseModel):
    username: Optional[str] = None
    email: Optional[EmailStr] = None
    is_active: Optional[bool] = None

class PasswordChange(BaseModel):
    current_password: str
    new_password: str

    @field_validator('new_password')
    @classmethod
    def password_strength(cls, v: str) -> str:
        if len(v) < 8:
            raise ValueError('Password must be at least 8 characters')
        if not any(char.isdigit() for char in v):
            raise ValueError('Password must contain at least one digit')
        if not any(char.isupper() for char in v):
            raise ValueError('Password must contain at least one uppercase letter')
        return v

# Token models
class Token(BaseModel):
    access_token: str
    token_type: str

class TokenPayload(BaseModel):
    sub: Optional[str] = None
    exp: Optional[int] = None

# Fix circular references
UserDetails.model_rebuild()
RoleDetails.model_rebuild()
GroupDetails.model_rebuild()

Authentication and Authorization Utilities

from datetime import datetime, timedelta
from typing import Any, Dict, Optional, Union
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

# Security configuration
SECRET_KEY = "your-secret-key-here"  # In production, use environment variables
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Database dependency
async def get_async_db():
    async with AsyncSessionLocal() as session:
        yield session

# Password utilities
def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password: str) -> str:
    return pwd_context.hash(password)

# JWT token functions
def create_access_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

# User authentication
async def authenticate_user(db: AsyncSession, username: str, password: str) -> Union[User, bool]:
    stmt = select(User).where(User.username == username)
    result = await db.execute(stmt)
    user = result.scalars().first()

    if not user or not verify_password(password, user.password_hash):
        return False
    if not user.is_active:
        return False
    return user

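# Get current user from token
from sqlalchemy.orm import selectinload

async def get_current_user(
    db: AsyncSession = Depends(get_async_db), 
    token: str = Depends(oauth2_scheme)
) -> User:
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: Optional[str] = payload.get("sub")
        if username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception

    # Eager-load every relationship that has_permission() walks. An AsyncSession
    # cannot lazy-load relationships on attribute access, so they are loaded here.
    stmt = (
        select(User)
        .where(User.username == username)
        .options(
            selectinload(User.direct_permissions),
            selectinload(User.roles).selectinload(Role.permissions),
            selectinload(User.groups)
            .selectinload(Group.roles)
            .selectinload(Role.permissions),
        )
    )
    result = await db.execute(stmt)
    user = result.scalars().first()

    if user is None:
        raise credentials_exception
    if not user.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")
    return user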

# Get current active user
async def get_current_active_user(current_user: User = Depends(get_current_user)) -> User:
    if not current_user.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

# ABAC Permission verification
import ipaddress

async def has_permission(
    user: User,
    resource: str,
    action: str,
    resource_id: Optional[int] = None,
    context: Optional[Dict[str, Any]] = None
) -> bool:
    """
    Check if user has permission to perform an action on a resource.
    Implements Attribute-Based Access Control (ABAC) by considering:
    - Direct user permissions
    - Role-based permissions
    - Group-based permissions
    - Contextual conditions
    """
    context = context or {}

    # Helper function to check conditions
    def evaluate_conditions(conditions: Optional[Dict[str, Any]], context: Dict[str, Any]) -> bool:
        if not conditions:
            return True

        # Example condition: {"time_between": ["09:00", "17:00"]}
        for condition_key, condition_value in conditions.items():
            if condition_key == "time_between":
                current_time = context.get("current_time", datetime.now().time())
                start_time = datetime.strptime(condition_value[0], "%H:%M").time()
                end_time = datetime.strptime(condition_value[1], "%H:%M").time()
                if not (start_time <= current_time <= end_time):
                    return False
            elif condition_key == "ip_range":
                # Entries are CIDR blocks such as "10.0.0.0/8", so test network
                # membership instead of comparing strings
                try:
                    ip = ipaddress.ip_address(context.get("ip_address"))
                    in_range = any(ip in ipaddress.ip_network(cidr) for cidr in condition_value)
                except ValueError:
                    in_range = False  # Missing or malformed address
                if not in_range:
                    return False
            elif condition_key == "department":
                if context.get("department") != condition_value:
                    return False
            else:
                # Deny on unknown condition types instead of silently ignoring them
                return False
            # Add more condition types as needed

        return True

    # Check direct user permissions
    for permission in user.direct_permissions:
        if (permission.resource == resource and 
            permission.action == action and 
            evaluate_conditions(permission.conditions, context)):
            return True

    # Check role-based permissions
    for role in user.roles:
        for permission in role.permissions:
            if (permission.resource == resource and 
                permission.action == action and 
                evaluate_conditions(permission.conditions, context)):
                return True

    # Check group-based permissions (through roles)
    for group in user.groups:
        for role in group.roles:
            for permission in role.permissions:
                if (permission.resource == resource and 
                    permission.action == action and 
                    evaluate_conditions(permission.conditions, context)):
                    return True

    return False

# Permission dependency for FastAPI routes
def require_permission(resource: str, action: str):
    async def permission_dependency(
        request: Request,
        current_user: User = Depends(get_current_active_user)
    ):
        context = {
            "current_time": datetime.now().time(),
            # request.client is None when the server supplies no client address
            "ip_address": request.client.host if request.client else None
        }

        if not await has_permission(current_user, resource, action, context=context):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Permission denied: {action} on {resource}"
            )
        return current_user
    return permission_dependency

# Audit logging utility
async def create_audit_log(
    db: AsyncSession,
    user_id: Optional[int],
    action: str,
    resource_type: str,
    resource_id: Optional[int] = None,
    details: Optional[Dict[str, Any]] = None,
    ip_address: Optional[str] = None
) -> AuditLog:
    audit_log = AuditLog(
        user_id=user_id,
        action=action,
        resource_type=resource_type,
        resource_id=resource_id,
        details=details,
        ip_address=ip_address
    )
    db.add(audit_log)
    await db.commit()
    await db.refresh(audit_log)
    return audit_log
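
To show what `create_access_token` and `jwt.decode` do conceptually, here is a standard-library sketch of HS256 signing and verification. It is for illustration only and is not a replacement for python-jose: `sign_token` and `verify_token` are hypothetical helpers, and the sketch checks only the signature and the `exp` claim (a token without `exp` is treated as expired).

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Any, Dict


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Decode base64url, restoring the stripped padding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_token(claims: Dict[str, Any], secret: str) -> str:
    """Build header.payload.signature using HMAC-SHA256 (HS256)."""
    header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = _b64url(json.dumps(claims).encode())
    signing_input = f"{header}.{payload}".encode()
    signature = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    return f"{header}.{payload}.{_b64url(signature)}"


def verify_token(token: str, secret: str) -> Dict[str, Any]:
    """Return the claims if the signature is valid and the token has not expired."""
    try:
        header, payload, signature = token.split(".")
    except ValueError:
        raise ValueError("malformed token") from None
    expected = hmac.new(
        secret.encode(), f"{header}.{payload}".encode(), hashlib.sha256
    ).digest()
    # compare_digest avoids leaking information through comparison timing
    if not hmac.compare_digest(expected, _b64url_decode(signature)):
        raise ValueError("bad signature")
    claims = json.loads(_b64url_decode(payload))
    if claims.get("exp", 0) < time.time():
        raise ValueError("token expired")
    return claims


token = sign_token({"sub": "alice", "exp": int(time.time()) + 1800}, "demo-secret")
print(verify_token(token, "demo-secret")["sub"])
```

Verifying the same token with a different secret raises ValueError, which is the situation `get_current_user` turns into a 401 response.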

FastAPI Endpoints with Modern Patterns

from fastapi import FastAPI, Depends, HTTPException, status, Request, Response, BackgroundTasks
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.routing import APIRouter
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import IntegrityError
from sqlalchemy import select, update, delete
from typing import List, Optional
from datetime import datetime, timedelta

app = FastAPI(
    title="Permission Management API",
    description="Modern API for user permission management",
    version="2.0.0"
)

# Routers for better organization
auth_router = APIRouter(prefix="/auth", tags=["Authentication"])
user_router = APIRouter(prefix="/users", tags=["User Management"])
role_router = APIRouter(prefix="/roles", tags=["Role Management"])
permission_router = APIRouter(prefix="/permissions", tags=["Permission Management"])
group_router = APIRouter(prefix="/groups", tags=["Group Management"])

# Authentication endpoints
@auth_router.post("/token", response_model=Token)
async def login_for_access_token(
    background_tasks: BackgroundTasks,
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_async_db),
    request: Request = None
):
    user = await authenticate_user(db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )

    # Log successful login in background
    background_tasks.add_task(
        create_audit_log,
        db=db,
        user_id=user.id,
        action="login",
        resource_type="auth",
        details={"success": True},
        ip_address=request.client.host if request else None
    )

    return {"access_token": access_token, "token_type": "bearer"}

# User endpoints
@user_router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
    user: UserCreate,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_async_db),
    request: Request = None,
    current_user: User = Depends(require_permission("users", "create"))
):
    try:
        hashed_password = get_password_hash(user.password)
        db_user = User(
            username=user.username,
            email=user.email,
            password_hash=hashed_password
        )
        db.add(db_user)
        await db.commit()
        await db.refresh(db_user)

        # Log user creation in background
        background_tasks.add_task(
            create_audit_log,
            db=db,
            user_id=current_user.id,
            action="create",
            resource_type="users",
            resource_id=db_user.id,
            details={"username": user.username, "email": user.email},
            ip_address=request.client.host if request else None
        )

        return db_user
    except IntegrityError:
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Username or email already exists"
        )

@user_router.get("/", response_model=List[UserResponse])
async def get_users(
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(require_permission("users", "list"))
):
    stmt = select(User).offset(skip).limit(limit)
    result = await db.execute(stmt)
    users = result.scalars().all()
    return users

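from sqlalchemy.orm import selectinload

@user_router.get("/{user_id}", response_model=UserDetails)
async def get_user(
    user_id: int,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(require_permission("users", "read"))
):
    # UserDetails serializes roles and groups, so load them eagerly;
    # an AsyncSession cannot lazy-load them while the response is built
    stmt = (
        select(User)
        .where(User.id == user_id)
        .options(selectinload(User.roles), selectinload(User.groups))
    )
    result = await db.execute(stmt)
    db_user = result.scalars().first()

    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user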

@user_router.put("/{user_id}", response_model=UserResponse)
async def update_user(
    user_id: int,
    user_update: UserUpdate,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_async_db),
    request: Request = None,
    current_user: User = Depends(require_permission("users", "update"))
):
    stmt = select(User).where(User.id == user_id)
    result = await db.execute(stmt)
    db_user = result.scalars().first()

    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")

    update_data = user_update.model_dump(exclude_unset=True)

    try:
        if update_data:
            stmt = (
                update(User)
                .where(User.id == user_id)
                .values(**update_data)
            )
            await db.execute(stmt)
            await db.commit()

        # Refresh user data
        stmt = select(User).where(User.id == user_id)
        result = await db.execute(stmt)
        db_user = result.scalars().first()

        # Log user update in background
        background_tasks.add_task(
            create_audit_log,
            db=db,
            user_id=current_user.id,
            action="update",
            resource_type="users",
            resource_id=user_id,
            details=update_data,
            ip_address=request.client.host if request else None
        )

        return db_user
    except IntegrityError:
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Username or email already exists"
        )

@user_router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
    user_id: int,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_async_db),
    request: Request = None,
    current_user: User = Depends(require_permission("users", "delete"))
):
    stmt = select(User).where(User.id == user_id)
    result = await db.execute(stmt)
    db_user = result.scalars().first()

    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")

    # Store username for audit log
    username = db_user.username

    stmt = delete(User).where(User.id == user_id)
    await db.execute(stmt)
    await db.commit()

    # Log user deletion in background
    background_tasks.add_task(
        create_audit_log,
        db=db,
        user_id=current_user.id,
        action="delete",
        resource_type="users",
        resource_id=user_id,
        details={"username": username},
        ip_address=request.client.host if request else None
    )

    return Response(status_code=status.HTTP_204_NO_CONTENT)

# Role endpoints
@role_router.post("/", response_model=RoleResponse, status_code=status.HTTP_201_CREATED)
async def create_role(
    role: RoleCreate,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_async_db),
    request: Request = None,
    current_user: User = Depends(require_permission("roles", "create"))
):
    try:
        db_role = Role(**role.model_dump())
        db.add(db_role)
        await db.commit()
        await db.refresh(db_role)

        # Log role creation in background
        background_tasks.add_task(
            create_audit_log,
            db=db,
            user_id=current_user.id,
            action="create",
            resource_type="roles",
            resource_id=db_role.id,
            details=role.model_dump(),
            ip_address=request.client.host if request else None
        )

        return db_role
    except IntegrityError:
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Role name already exists"
        )

# Permission check endpoint (useful for frontend authorization)
@auth_router.get("/check-permission")
async def check_user_permission(
    resource: str,
    action: str,
    resource_id: Optional[int] = None,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(get_current_active_user),
    request: Request = None
):
    context = {
        "current_time": datetime.now().time(),
        "ip_address": request.client.host if request else None
    }

    has_access = await has_permission(
        user=current_user,
        resource=resource,
        action=action,
        resource_id=resource_id,
        context=context
    )

    return {"has_permission": has_access}

# Register routers
app.include_router(auth_router)
app.include_router(user_router)
app.include_router(role_router)
app.include_router(permission_router)
app.include_router(group_router)

# Create tables at startup
# Note: on_event is deprecated in recent FastAPI releases in favor of a lifespan handler
@app.on_event("startup")
async def startup():
    await create_tables()

Implementation of Modern Permission Models

ABAC Implementation

Our permission system implements Attribute-Based Access Control through:

  1. Contextual Permission Checking: The has_permission function evaluates permissions based on:

    • User attributes (roles, group memberships)
    • Environmental factors (time of day, IP address)
    • Resource-specific attributes (has_permission accepts a resource_id, although the checks shown here do not use it yet)
  2. JSON Conditions:

{
  "time_between": ["09:00", "17:00"],
  "ip_range": ["192.168.1.0/24", "10.0.0.0/8"],
  "department": "finance"
}
  3. Dynamic Context Evaluation:

# Example usage in an API endpoint
@app.get("/reports/{report_id}")
async def get_report(
    report_id: int,
    request: Request,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(get_current_active_user)
):
    # Create context with current time and IP
    context = {
        "current_time": datetime.now().time(),
        "ip_address": request.client.host,
        "department": current_user.department  # Assuming user has department attribute
    }

    # Check permission with context
    if not await has_permission(
        user=current_user,
        resource="reports",
        action="read",
        resource_id=report_id,
        context=context
    ):
        raise HTTPException(status_code=403, detail="Permission denied")

    # Retrieve and return report...
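
The JSON conditions shown earlier can be evaluated with the standard library alone. The sketch below is one possible evaluator under a few assumptions of mine: `conditions_met` is a hypothetical name, `ip_range` entries are CIDR blocks checked with the `ipaddress` module, and any condition key the function does not recognize denies access.

```python
import ipaddress
from datetime import time
from typing import Any, Dict


def conditions_met(conditions: Dict[str, Any], context: Dict[str, Any]) -> bool:
    """Evaluate every condition; any failure or unknown key denies access."""
    for key, expected in conditions.items():
        if key == "time_between":
            start, end = (time.fromisoformat(value) for value in expected)
            now = context.get("current_time")
            if now is None or not (start <= now <= end):
                return False
        elif key == "ip_range":
            try:
                client = ipaddress.ip_address(context.get("ip_address"))
            except ValueError:
                return False  # missing or malformed client address
            if not any(client in ipaddress.ip_network(cidr) for cidr in expected):
                return False
        elif key == "department":
            if context.get("department") != expected:
                return False
        else:
            return False  # fail closed on condition types this sketch does not know
    return True


conditions = {
    "time_between": ["09:00", "17:00"],
    "ip_range": ["192.168.1.0/24", "10.0.0.0/8"],
    "department": "finance",
}
office = {"current_time": time(10, 0), "ip_address": "10.1.2.3", "department": "finance"}

print(conditions_met(conditions, office))
print(conditions_met(conditions, {**office, "ip_address": "203.0.113.7"}))
```

The first call prints True because all three conditions hold; the second prints False because 203.0.113.7 falls outside both listed networks.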

Policy-Based Access Control

Our system implements PBAC through centralized permission policies:

  1. Hierarchical Permission Inheritance:

    • Users inherit permissions from roles
    • Roles can be assigned to groups
    • Groups can have multiple roles
  2. Policy Centralization:

    • Permissions are defined once and reused
    • Conditions are stored in the database
    • Changes to policies are immediately reflected
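
The inheritance and centralization points above can be sketched with plain dictionaries standing in for the relationship tables. The names (`permissions_by_role`, `roles_by_group`, and so on) and the sample users are hypothetical; the real system reads the same information from the database.

```python
from typing import Dict, Set

# Hypothetical in-memory stand-ins for role_permissions, user_roles,
# user_groups, and group_roles.
permissions_by_role: Dict[str, Set[str]] = {
    "viewer": {"reports:read"},
    "editor": {"reports:read", "reports:update"},
}
roles_by_user: Dict[str, Set[str]] = {"bob": {"editor"}}
groups_by_user: Dict[str, Set[str]] = {"alice": {"content_team"}}
roles_by_group: Dict[str, Set[str]] = {"content_team": {"viewer", "editor"}}


def effective_permissions(user: str) -> Set[str]:
    """Union of permissions from the user's own roles and their groups' roles."""
    roles = set(roles_by_user.get(user, set()))
    for group in groups_by_user.get(user, set()):
        roles |= roles_by_group.get(group, set())
    permissions: Set[str] = set()
    for role in roles:
        permissions |= permissions_by_role.get(role, set())
    return permissions


print(sorted(effective_permissions("alice")))

# One central change to the "editor" role reaches every holder of that role,
# whether they hold it directly (bob) or through a group (alice).
permissions_by_role["editor"].add("reports:publish")
print("reports:publish" in effective_permissions("alice"))
print("reports:publish" in effective_permissions("bob"))
```

Because the role is defined once, nothing has to be updated per user: both lookups after the change print True.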

Performance Optimization

For large-scale applications, we've incorporated several performance enhancements:

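  1. Async Database Operations: Using SQLAlchemy's asyncio extension (AsyncSession with the aiomysql driver) so database calls do not block the event loop

  2. Background Tasks: Moving audit logging to FastAPI background tasks, which run after the response has been sent

  3. Efficient Queries: Using SQLAlchemy 2.0-style select(), update(), and delete() statements, with offset/limit pagination on list endpoints

  4. Pydantic v2: Using Pydantic v2, whose validation core is written in Rust and is considerably faster than v1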

Security Best Practices

Our implementation incorporates several security best practices:

  1. Password Handling:

    • Secure hashing with bcrypt
    • Password strength validation
    • A PasswordChange model for a separate password change endpoint (the endpoint itself is not shown here)
  2. API Security:

    • JWT authentication with expiration
    • CORS protection (not configured in the code above; it can be added with FastAPI's CORSMiddleware)
    • Input validation
  3. Data Protection:

    • Separate request/response models
    • No sensitive data in responses
    • Audit logging for all sensitive operations

Conclusion

This article has demonstrated how to build a modern user permission management system using the latest versions of FastAPI, SQLAlchemy 2.0, and MariaDB. By following current best practices and incorporating recent advancements in these technologies, you can create a secure, scalable, and maintainable permission system.

Key advantages of this approach include:

  1. Type Safety: Using SQLAlchemy 2.0's typed attributes and Python's type hints
  2. Performance: Async operations and modern optimizations
  3. Flexibility: Supporting complex permission models like ABAC and PBAC
  4. Maintainability: Clear code organization with routers and dependency injection
  5. Security: Comprehensive security controls and audit logging

For production environments, consider these additional enhancements:

  • Implementing rate limiting
  • Adding two-factor authentication
  • Setting up permission caching for frequently checked permissions
  • Integrating with external identity providers like Auth0 or Okta
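
As one possible starting point for the caching idea above, here is a small time-to-live cache keyed by user, resource, and action. `PermissionCache` and its methods are hypothetical names. Note the assumption that only unconditional results are cached: permissions with time or IP conditions depend on the request context and should still be evaluated on every request.

```python
import time
from typing import Callable, Dict, Tuple

CacheKey = Tuple[int, str, str]


class PermissionCache:
    """Tiny TTL cache keyed by (user_id, resource, action)."""

    def __init__(self, ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        # key -> (expiry timestamp, cached decision)
        self._entries: Dict[CacheKey, Tuple[float, bool]] = {}

    def get_or_compute(self, user_id: int, resource: str, action: str,
                       compute: Callable[[], bool]) -> bool:
        """Return a fresh cached decision, or call compute() and store its result."""
        key = (user_id, resource, action)
        now = self._clock()
        cached = self._entries.get(key)
        if cached is not None and cached[0] > now:
            return cached[1]
        allowed = compute()
        self._entries[key] = (now + self._ttl, allowed)
        return allowed

    def invalidate_user(self, user_id: int) -> None:
        """Drop all entries for a user, e.g. after their roles change."""
        for key in [k for k in self._entries if k[0] == user_id]:
            del self._entries[key]


calls = []


def slow_check() -> bool:
    """Stand-in for a database-backed permission check."""
    calls.append(1)
    return True


cache = PermissionCache(ttl_seconds=60)
cache.get_or_compute(1, "users", "list", slow_check)
cache.get_or_compute(1, "users", "list", slow_check)
print(len(calls))  # the second lookup is served from the cache
```

Calling `invalidate_user` whenever a user's roles, groups, or direct permissions change keeps a cached decision from outliving the grant it was based on.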

The code provided in this article serves as a solid foundation that you can extend and customize to meet your specific requirements.
