DEV Community

Cover image for Role-based access control using FastApi
Moad Ennagi
Moad Ennagi

Posted on

Role-based access control using FastApi

This is a very minimalist example of how role-based access control could be implemented in FastApi by using dependency injection. The main idea is to have a dependency that acts as authorization: the endpoint function would then either respond with the resource or with an unauthorized response.

Lets start by having route that logs the user in and responds with a token.

import datetime
from typing import Any

import bcrypt
import jwt
from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
from fastapi.testclient import TestClient
from pydantic import BaseModel

fake_users = [
    # password foo
    {'id': 1, 'username': 'admin', 'password': '$2b$12$N.i74Kle18n5Toxhas.rVOjZreVC2WM34fCidNDyhSNgxVlbKwX7i',
     'permissions': ['items:read', 'items:write', 'users:read', 'users:write']
    },
    # password bar
    {'id': 2, 'username': 'client', 'password': '$2b$12$KUgpw1m0LF/s9NS1ZB5rRO2cA5D13MqRm56ab7ik2ixftXW/aqEyq',
     'permissions': ['items:read']}
]

class UserBase(BaseModel):
    username: str
    password: str

class LoginData(UserBase):
    pass

class PyUser(UserBase):
    id: int
    permissions: list[str] = []

class Token(BaseModel):
    access_token: str
    token_type: str

app = FastAPI()

oauth_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={'items': 'permissions to access items'}
)

def authenticate_user(username: str, password: str) -> PyUser:
    exception = HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail='Invalid credentials'
                )
    for obj in fake_users:
        if obj['username'] == username:
            if not bcrypt.checkpw(password.encode(), obj['password'].encode()):
                raise exception
            user = PyUser(**obj)
            return user
    raise exception

def get_current_user(
    token: str = Depends(oauth_scheme)
) -> PyUser:
    decoded = jwt.decode(token, 'secret', algorithms=['HS256'])
    username = decoded['sub']
    for obj in fake_users:
        if obj['username'] == username:
            user = PyUser(**obj)
            return user
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail='Invalid credentials'
    )

def create_token(user: PyUser) -> str:
    payload = {'sub': user.username, 'iat': datetime.datetime.utcnow(),
               'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=90)}
    token = jwt.encode(payload, key='secret')
    return token

@app.post('/token')
def login(login_data: LoginData) -> Token:
    user = authenticate_user(**login_data.dict())
    token_str = create_token(user)
    token = Token(access_token=token_str, token_type='bearer')
    return token

@app.get('/users/me')
def get_user(current_user: PyUser = Depends(get_current_user)):
    return current_user
Enter fullscreen mode Exit fullscreen mode

Lets go over this code (which is very similar what FastApi uses in the documentation examples).

oauth_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={'items': 'permissions to access items'}
)
Enter fullscreen mode Exit fullscreen mode

This code creates a dependency OAuth2PasswordBearer passing 'token' as tokenUrl, this is the url used to get the token.

def get_current_user(
    token: str = Depends(oauth_scheme)
) -> PyUser:
    decoded = jwt.decode(token, 'secret', algorithms=['HS256'])
    username = decoded['sub']
    for obj in fake_users:
        if obj['username'] == username:
            user = PyUser(**obj)
            return user
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail='Invalid credentials'
    )
Enter fullscreen mode Exit fullscreen mode

This dependency takes an argument of type str which defaults to Depends(oauth_scheme), the oauth_scheme dependency will return the token, the token is then decoded and the dependency returns an instance of PyUser which is a pydantic BaseModel.

Now lets add 2 routes with different access control requirements.

@app.get('/items')
def items(
    authorize: bool = Depends(PermissionChecker(required_permissions=['items:read',]))
):
    return 'items'

@app.get('/users')
def users(
    authorize: bool = Depends(PermissionChecker(required_permissions=['users:read',]))
):
    return 'users'
Enter fullscreen mode Exit fullscreen mode

We now need the PermissionChecker dependency, this dependency should check the user permissions against the route required permissions.

class PermissionChecker:

    def __init__(self, required_permissions: list[str]) -> None:
        self.required_permissions = required_permissions

    def __call__(self, user: PyUser = Depends(get_current_user)) -> bool:
        for r_perm in self.required_permissions:
            if r_perm not in user.permissions:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail='Permissions'
                )
        return True
Enter fullscreen mode Exit fullscreen mode

Top comments (1)

Collapse
 
chrisk824 profile image
Chris Karvouniaris

Wow, I was looking for similar articles like mine and it seems I just found one pretty much the same. Nice work mate, this is my take in case you find a trick or two medium.com/me/stats/post/a42457fc85ca