In this article, I will guide you through designing and implementing a user permission management system using FastAPI, SQLAlchemy, and MariaDB. I'll cover the database schema, recent trends in permission models, and how to set up API endpoints to manage users, roles, permissions, and groups effectively.
Recent Trends in Permission Models
Traditional Role-Based Access Control (RBAC) is still widely used, but newer permission models offer more flexible and granular control. Here are some of the notable trends:
1. Attribute-Based Access Control (ABAC)
ABAC determines access rights based on user attributes, resource attributes, and environmental conditions. It allows for highly flexible and detailed access control policies, considering factors such as user roles, departments, and working hours.
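As a rough illustration of the idea, here is a minimal sketch of a single ABAC rule in plain Python. The attribute names, the "manager" role, and the 09:00 to 18:00 working window are assumptions made up for this example; they are not part of the schema used later in this article.

```python
from dataclasses import dataclass
from datetime import time


@dataclass
class AccessRequest:
    user_role: str            # user attribute
    user_department: str      # user attribute
    resource_department: str  # resource attribute
    request_time: time        # environmental condition


def can_edit(req: AccessRequest) -> bool:
    """Allow edits by managers of the owning department during working hours."""
    in_working_hours = time(9, 0) <= req.request_time < time(18, 0)
    same_department = req.user_department == req.resource_department
    return req.user_role == "manager" and same_department and in_working_hours


allowed = can_edit(AccessRequest("manager", "sales", "sales", time(10, 30)))
denied = can_edit(AccessRequest("manager", "sales", "sales", time(22, 0)))
```

The decision depends on all three kinds of attributes at once, which is what separates ABAC from a plain role lookup.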
2. Policy-Based Access Control (PBAC)
PBAC manages access through policies that define specific conditions for access. These policies are centrally managed and can include complex rules, making it suitable for organizations with intricate access control requirements.
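One way to picture this is a central list of policies that every access decision is checked against. The sketch below is an assumption-heavy toy: the policy names, the context keys (`role`, `action`, `status`), and the "deny overrides allow" rule are all illustrative choices, not a standard.

```python
from typing import Callable, Dict, List, NamedTuple

Context = Dict[str, object]


class Policy(NamedTuple):
    name: str
    effect: str                           # "allow" or "deny"
    condition: Callable[[Context], bool]  # True when the policy applies


# Central policy store; a real system might load this from a database or file.
POLICIES: List[Policy] = [
    Policy("admins-can-do-anything", "allow", lambda c: c["role"] == "admin"),
    Policy("editors-can-write-drafts", "allow",
           lambda c: c["role"] == "editor" and c["action"] == "write" and c["status"] == "draft"),
    Policy("nobody-deletes-archived", "deny",
           lambda c: c["action"] == "delete" and c["status"] == "archived"),
]


def is_allowed(context: Context) -> bool:
    """Deny by default; any matching deny policy overrides matching allows."""
    matched = [p.effect for p in POLICIES if p.condition(context)]
    return "allow" in matched and "deny" not in matched
```

Because the rules live in one place, changing who may do what means editing the policy list rather than touching every endpoint.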
3. JSON-Based Permission Management
Storing permissions in JSON format allows for a more flexible and hierarchical structure, making it easier to manage complex permission sets and conditions.
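To make this concrete, here is a small sketch of what one JSON permission document and a check against it might look like. The document shape (`resource`, `actions`, `formats`) is a hypothetical layout chosen for the example; this article does not prescribe a structure for the JSON column.

```python
import json

# Hypothetical contents of a single JSON permission document
permission_details = json.loads("""
{
  "resource": "reports",
  "actions": {
    "read": true,
    "delete": false,
    "export": {"formats": ["csv", "pdf"]}
  }
}
""")


def action_allowed(details: dict, action: str) -> bool:
    """A missing or false entry denies; true or a nested condition object allows."""
    return bool(details.get("actions", {}).get(action, False))


def export_format_allowed(details: dict, fmt: str) -> bool:
    """Check the nested condition attached to the export action."""
    export_rule = details.get("actions", {}).get("export", False)
    if isinstance(export_rule, dict):
        return fmt in export_rule.get("formats", [])
    return bool(export_rule)
```

The nested `export` entry shows the hierarchical part: an action can carry its own conditions instead of a bare yes or no.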
4. User Groups and Hierarchies
Incorporating user groups and hierarchical structures (e.g., teams, departments, projects) can streamline permission management by grouping users with similar access needs.
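A hierarchy usually means that a group inherits whatever its parent groups are allowed to do. The sketch below shows that resolution with in-memory dictionaries. The group names, permission names, and the parent map are assumptions for illustration; the Groups table later in this article is flat and would need something like a parent column to support this.

```python
from typing import Dict, Optional, Set

# Hypothetical hierarchy: each group points at its parent (None for the root).
PARENT: Dict[str, Optional[str]] = {
    "company": None,
    "engineering": "company",
    "backend-team": "engineering",
}

# Permissions granted directly to each group.
GROUP_PERMISSIONS: Dict[str, Set[str]] = {
    "company": {"wiki:read"},
    "engineering": {"repo:read"},
    "backend-team": {"repo:write"},
}


def inherited_permissions(group: str) -> Set[str]:
    """Union of the group's own permissions and those of all its ancestors."""
    result: Set[str] = set()
    seen: Set[str] = set()
    current: Optional[str] = group
    while current is not None and current not in seen:  # guard against cycles
        seen.add(current)
        result |= GROUP_PERMISSIONS.get(current, set())
        current = PARENT.get(current)
    return result
```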
5. Integration with External Authentication and Authorization
Delegating to an external identity provider through standards such as OAuth 2.0 (authorization) and OpenID Connect (authentication) enables Single Sign-On (SSO) and consistent authentication and authorization across multiple systems.
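When an external provider handles login, the application still has to translate what the provider says about a user into its own roles. The sketch below assumes the token has already been validated by a proper library and that its claims arrive as a dictionary. The `groups` claim and the group names are assumptions: providers differ in what they call this claim and how they shape it.

```python
from typing import Dict, Set

# Hypothetical mapping from identity-provider group names to local role names.
CLAIM_GROUP_TO_ROLE: Dict[str, str] = {
    "idp-admins": "admin",
    "idp-staff": "editor",
}


def roles_from_claims(claims: dict) -> Set[str]:
    """Map already-validated token claims to local role names; unknown groups are ignored."""
    groups = claims.get("groups", [])
    return {CLAIM_GROUP_TO_ROLE[g] for g in groups if g in CLAIM_GROUP_TO_ROLE}
```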
Database Design
Let's start by designing a database schema that incorporates some of these trends, namely JSON-based permission details and user groups.
Basic Tables
Users Table
Stores basic user information.
CREATE TABLE Users (
    UserID INT PRIMARY KEY AUTO_INCREMENT,
    Username VARCHAR(50) NOT NULL UNIQUE,
    PasswordHash VARCHAR(255) NOT NULL,
    Email VARCHAR(100) NOT NULL UNIQUE,
    CreatedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Roles Table
Defines roles for users.
CREATE TABLE Roles (
    RoleID INT PRIMARY KEY AUTO_INCREMENT,
    RoleName VARCHAR(50) NOT NULL UNIQUE
);
Permissions Table
Defines specific permissions.
CREATE TABLE Permissions (
    PermissionID INT PRIMARY KEY AUTO_INCREMENT,
    PermissionName VARCHAR(100) NOT NULL UNIQUE,
    PermissionDetails JSON NOT NULL
);
Relationship Tables
UserRoles Table
Manages many-to-many relationships between users and roles.
CREATE TABLE UserRoles (
    UserID INT,
    RoleID INT,
    PRIMARY KEY (UserID, RoleID),
    FOREIGN KEY (UserID) REFERENCES Users(UserID),
    FOREIGN KEY (RoleID) REFERENCES Roles(RoleID)
);
RolePermissions Table
Manages many-to-many relationships between roles and permissions.
CREATE TABLE RolePermissions (
    RoleID INT,
    PermissionID INT,
    PRIMARY KEY (RoleID, PermissionID),
    FOREIGN KEY (RoleID) REFERENCES Roles(RoleID),
    FOREIGN KEY (PermissionID) REFERENCES Permissions(PermissionID)
);
UserPermissions Table
Allows direct assignment of permissions to users.
CREATE TABLE UserPermissions (
    UserID INT,
    PermissionID INT,
    PRIMARY KEY (UserID, PermissionID),
    FOREIGN KEY (UserID) REFERENCES Users(UserID),
    FOREIGN KEY (PermissionID) REFERENCES Permissions(PermissionID)
);
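With both role-based and direct assignments in place, a user's effective permissions are the union of the two. The sketch below shows one way to query that. To stay runnable without a database server, it uses SQLite from the Python standard library as a stand-in for MariaDB, with simplified table definitions and made-up sample rows; the `SELECT` itself is plain SQL, but treat its use on MariaDB as an untested assumption.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE Permissions (PermissionID INTEGER PRIMARY KEY, PermissionName TEXT UNIQUE);
CREATE TABLE UserRoles (UserID INT, RoleID INT, PRIMARY KEY (UserID, RoleID));
CREATE TABLE RolePermissions (RoleID INT, PermissionID INT, PRIMARY KEY (RoleID, PermissionID));
CREATE TABLE UserPermissions (UserID INT, PermissionID INT, PRIMARY KEY (UserID, PermissionID));

INSERT INTO Permissions VALUES (1, 'report:read'), (2, 'report:export'), (3, 'user:delete');
INSERT INTO UserRoles VALUES (10, 100);       -- user 10 has role 100
INSERT INTO RolePermissions VALUES (100, 1);  -- role 100 grants report:read
INSERT INTO UserPermissions VALUES (10, 2);   -- user 10 is granted report:export directly
""")

# Permissions reached through roles, combined with permissions assigned directly.
EFFECTIVE_PERMISSIONS = """
SELECT p.PermissionName
FROM UserRoles ur
JOIN RolePermissions rp ON rp.RoleID = ur.RoleID
JOIN Permissions p ON p.PermissionID = rp.PermissionID
WHERE ur.UserID = ?
UNION
SELECT p.PermissionName
FROM UserPermissions up
JOIN Permissions p ON p.PermissionID = up.PermissionID
WHERE up.UserID = ?
ORDER BY 1
"""


def effective_permissions(user_id: int) -> list:
    """Return the sorted, de-duplicated permission names for one user."""
    rows = conn.execute(EFFECTIVE_PERMISSIONS, (user_id, user_id)).fetchall()
    return [name for (name,) in rows]
```

`UNION` (rather than `UNION ALL`) removes duplicates, so a permission granted both directly and through a role appears once.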
Groups Table
Manages user groups.
CREATE TABLE Groups (
    GroupID INT PRIMARY KEY AUTO_INCREMENT,
    GroupName VARCHAR(100) NOT NULL UNIQUE
);
UserGroups Table
Manages many-to-many relationships between users and groups.
CREATE TABLE UserGroups (
    UserID INT,
    GroupID INT,
    PRIMARY KEY (UserID, GroupID),
    FOREIGN KEY (UserID) REFERENCES Users(UserID),
    FOREIGN KEY (GroupID) REFERENCES Groups(GroupID)
);
AuditLogs Table
Tracks permission changes.
CREATE TABLE AuditLogs (
    AuditID INT PRIMARY KEY AUTO_INCREMENT,
    UserID INT,
    ChangeType VARCHAR(50),
    ChangeDetails JSON,
    ChangedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (UserID) REFERENCES Users(UserID)
);
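The schema leaves the shape of `ChangeDetails` open. As one possibility, the sketch below builds a JSON payload describing a role change. The helper name and the field names (`target_user_id`, `added_role_ids`, `removed_role_ids`) are hypothetical and only meant to show the kind of data such a column could hold.

```python
import json
from typing import Iterable


def role_change_details(target_user_id: int,
                        added_role_ids: Iterable[int],
                        removed_role_ids: Iterable[int]) -> str:
    """Serialize a role change as JSON suitable for a ChangeDetails column."""
    details = {
        "target_user_id": target_user_id,
        "added_role_ids": sorted(added_role_ids),
        "removed_role_ids": sorted(removed_role_ids),
    }
    # sort_keys keeps the output stable, which makes log entries easier to compare
    return json.dumps(details, sort_keys=True)


payload = role_change_details(42, added_role_ids=[3, 1], removed_role_ids=[])
```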
Setting Up FastAPI and SQLAlchemy
Database Models
Define the database models using SQLAlchemy.
from sqlalchemy import Column, Integer, String, ForeignKey, Table, create_engine, JSON, TIMESTAMP
from sqlalchemy.orm import relationship, declarative_base, sessionmaker
from sqlalchemy.sql import func
Base = declarative_base()
# Association tables for many-to-many relationships.
# Both columns form a composite primary key (as in the SQL schema above),
# so the database rejects duplicate assignments.
user_roles = Table(
    'user_roles', Base.metadata,
    Column('user_id', Integer, ForeignKey('users.id'), primary_key=True),
    Column('role_id', Integer, ForeignKey('roles.id'), primary_key=True)
)
role_permissions = Table(
    'role_permissions', Base.metadata,
    Column('role_id', Integer, ForeignKey('roles.id'), primary_key=True),
    Column('permission_id', Integer, ForeignKey('permissions.id'), primary_key=True)
)
user_permissions = Table(
    'user_permissions', Base.metadata,
    Column('user_id', Integer, ForeignKey('users.id'), primary_key=True),
    Column('permission_id', Integer, ForeignKey('permissions.id'), primary_key=True)
)
user_groups = Table(
    'user_groups', Base.metadata,
    Column('user_id', Integer, ForeignKey('users.id'), primary_key=True),
    Column('group_id', Integer, ForeignKey('groups.id'), primary_key=True)
)
# Models
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(50), unique=True, index=True, nullable=False)
    password_hash = Column(String(255), nullable=False)
    email = Column(String(100), unique=True, index=True, nullable=False)
    roles = relationship('Role', secondary=user_roles, back_populates='users')
    permissions = relationship('Permission', secondary=user_permissions, back_populates='users')
    groups = relationship('Group', secondary=user_groups, back_populates='users')

class Role(Base):
    __tablename__ = 'roles'
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(50), unique=True, index=True, nullable=False)
    users = relationship('User', secondary=user_roles, back_populates='roles')
    permissions = relationship('Permission', secondary=role_permissions, back_populates='roles')

class Permission(Base):
    __tablename__ = 'permissions'
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(100), unique=True, index=True, nullable=False)
    details = Column(JSON, nullable=False)
    roles = relationship('Role', secondary=role_permissions, back_populates='permissions')
    users = relationship('User', secondary=user_permissions, back_populates='permissions')

class Group(Base):
    __tablename__ = 'groups'
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(100), unique=True, index=True, nullable=False)
    users = relationship('User', secondary=user_groups, back_populates='groups')

class AuditLog(Base):
    __tablename__ = 'audit_logs'
    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    change_type = Column(String(50))
    change_details = Column(JSON)
    changed_at = Column(TIMESTAMP, server_default=func.now())
    user = relationship('User')
# Database setup
DATABASE_URL = "mariadb+mariadbconnector://user:password@localhost/dbname"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base.metadata.create_all(bind=engine)
FastAPI Endpoints
Set up the API endpoints using FastAPI.
from fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy.orm import Session
from pydantic import BaseModel
import bcrypt
app = FastAPI()
# Dependency to get DB session
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# Pydantic models for request bodies
class UserCreate(BaseModel):
    username: str
    email: str
    password: str

class RoleCreate(BaseModel):
    name: str

class PermissionCreate(BaseModel):
    name: str
    details: dict

class GroupCreate(BaseModel):
    name: str

# Utility functions
def get_password_hash(password: str) -> str:
    return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
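# Response model for users: the password must never be echoed back to the client
class UserOut(BaseModel):
    id: int
    username: str
    email: str

# Endpoints
@app.post("/users/", response_model=UserOut)
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    hashed_password = get_password_hash(user.password)
    db_user = User(username=user.username, email=user.email, password_hash=hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    # Build the response explicitly: the ORM object has no `password` attribute,
    # so validating it against UserCreate would fail
    return UserOut(id=db_user.id, username=db_user.username, email=db_user.email)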
@app.post("/roles/", response_model=RoleCreate)
def create_role(role: RoleCreate, db: Session = Depends(get_db)):
    db_role = Role(name=role.name)
    db.add(db_role)
    db.commit()
    db.refresh(db_role)
    return db_role

@app.post("/permissions/", response_model=PermissionCreate)
def create_permission(permission: PermissionCreate, db: Session = Depends(get_db)):
    db_permission = Permission(name=permission.name, details=permission.details)
    db.add(db_permission)
    db.commit()
    db.refresh(db_permission)
    return db_permission

@app.post("/groups/", response_model=GroupCreate)
def create_group(group: GroupCreate, db: Session = Depends(get_db)):
    db_group = Group(name=group.name)
    db.add(db_group)
    db.commit()
    db.refresh(db_group)
    return db_group

@app.post("/users/{user_id}/roles/{role_id}")
def assign_role_to_user(user_id: int, role_id: int, db: Session = Depends(get_db)):
    user = db.query(User).filter(User.id == user_id).first()
    role = db.query(Role).filter(Role.id == role_id).first()
    if not user or not role:
        raise HTTPException(status_code=404, detail="User or Role not found")
    user.roles.append(role)
    db.commit()
    return {"message": "Role assigned to user"}

@app.post("/users/{user_id}/permissions/{permission_id}")
def assign_permission_to_user(user_id: int, permission_id: int, db: Session = Depends(get_db)):
    user = db.query(User).filter(User.id == user_id).first()
    permission = db.query(Permission).filter(Permission.id == permission_id).first()
    if not user or not permission:
        raise HTTPException(status_code=404, detail="User or Permission not found")
    user.permissions.append(permission)
    db.commit()
    return {"message": "Permission assigned to user"}

@app.post("/roles/{role_id}/permissions/{permission_id}")
def assign_permission_to_role(role_id: int, permission_id: int, db: Session = Depends(get_db)):
    role = db.query(Role).filter(Role.id == role_id).first()
    permission = db.query(Permission).filter(Permission.id == permission_id).first()
    if not role or not permission:
        raise HTTPException(status_code=404, detail="Role or Permission not found")
    role.permissions.append(permission)
    db.commit()
    return {"message": "Permission assigned to role"}

@app.post("/users/{user_id}/groups/{group_id}")
def assign_user_to_group(user_id: int, group_id: int, db: Session = Depends(get_db)):
    user = db.query(User).filter(User.id == user_id).first()
    group = db.query(Group).filter(Group.id == group_id).first()
    if not user or not group:
        raise HTTPException(status_code=404, detail="User or Group not found")
    user.groups.append(group)
    db.commit()
    return {"message": "User assigned to group"}

@app.post("/audit/", response_model=dict)
def create_audit_log(user_id: int, change_type: str, change_details: dict, db: Session = Depends(get_db)):
    db_audit = AuditLog(user_id=user_id, change_type=change_type, change_details=change_details)
    db.add(db_audit)
    db.commit()
    db.refresh(db_audit)
    return {"message": "Audit log created"}
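The endpoints above assign roles and permissions, but none of them checks a permission yet. One possible check is sketched below with plain stand-in objects so it runs without a database. The function name `has_permission` and the permission names are illustrative assumptions; the attribute names (`roles`, `permissions`, `name`) follow the models defined earlier.

```python
from types import SimpleNamespace


def has_permission(user, permission_name: str) -> bool:
    """True if the permission is granted directly or through any of the user's roles."""
    if any(p.name == permission_name for p in user.permissions):
        return True
    return any(
        p.name == permission_name
        for role in user.roles
        for p in role.permissions
    )


# Stand-ins that mimic the attributes of the User, Role and Permission models
read = SimpleNamespace(name="report:read")
export = SimpleNamespace(name="report:export")
analyst = SimpleNamespace(name="analyst", permissions=[read])
alice = SimpleNamespace(username="alice", roles=[analyst], permissions=[export])
```

In a FastAPI app, a check like this would typically sit inside a dependency that raises `HTTPException(status_code=403)` when it returns `False`.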
Conclusion
This article provided a comprehensive guide to building a user permission management system using FastAPI, SQLAlchemy, and MariaDB. By incorporating recent trends in permission models, I designed a flexible and scalable system. Remember, for a complete production-ready implementation, consider adding further security measures, error handling, validation, and logging.