This is a very minimalist example of how role-based access control could be implemented in FastAPI by using dependency injection. The main idea is to have a dependency that acts as the authorization check: the endpoint function then either responds with the resource, or the request is rejected with an unauthorized response.
Let's start with a route that logs the user in and responds with a token.
import datetime
from typing import Any
import bcrypt
import jwt
from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
from fastapi.testclient import TestClient
from pydantic import BaseModel
# In-memory stand-in for a user store. Each record holds a bcrypt password
# hash and the permission strings granted to that user.
fake_users = [
    {
        'id': 1,
        'username': 'admin',
        # password foo
        'password': '$2b$12$N.i74Kle18n5Toxhas.rVOjZreVC2WM34fCidNDyhSNgxVlbKwX7i',
        'permissions': ['items:read', 'items:write', 'users:read', 'users:write'],
    },
    {
        'id': 2,
        'username': 'client',
        # password bar
        'password': '$2b$12$KUgpw1m0LF/s9NS1ZB5rRO2cA5D13MqRm56ab7ik2ixftXW/aqEyq',
        'permissions': ['items:read'],
    },
]
class UserBase(BaseModel):
    """Fields shared by the login payload and the stored user record."""

    username: str
    # Plain text in a login request; a bcrypt hash in the stored user records.
    password: str
class LoginData(UserBase):
    """Request body accepted by the login route: a username and a password."""
class PyUser(UserBase):
    """A user record together with the permissions granted to that user."""

    id: int
    # Permission strings such as 'items:read'; empty unless provided.
    permissions: list[str] = []
class Token(BaseModel):
    """Response body returned by the login route."""

    # The encoded JWT.
    access_token: str
    # The login route sets this to 'bearer'.
    token_type: str
app = FastAPI()

# Dependency that hands the request's bearer token to whoever depends on it.
# `tokenUrl` is the URL used to obtain a token.
# NOTE(review): the declared scope name 'items' does not match the permission
# strings used in this file (e.g. 'items:read') - confirm this is intended.
oauth_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={'items': 'permissions to access items'},
)
def authenticate_user(username: str, password: str) -> PyUser:
    """Return the stored user matching *username* if *password* is correct.

    Args:
        username: Name to look up in ``fake_users``.
        password: Plain-text password to verify against the stored bcrypt hash.

    Returns:
        The matching record as a ``PyUser``.

    Raises:
        HTTPException: 401 'Invalid credentials' when the username is unknown
            or the password does not match.
    """
    invalid_credentials = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail='Invalid credentials'
    )

    # Only the first record with a matching username is ever considered.
    record = next((u for u in fake_users if u['username'] == username), None)
    if record is None:
        raise invalid_credentials

    password_ok = bcrypt.checkpw(password.encode(), record['password'].encode())
    if not password_ok:
        raise invalid_credentials

    return PyUser(**record)
def get_current_user(
    token: str = Depends(oauth_scheme)
) -> PyUser:
    """Resolve the request's bearer token to the user it was issued for.

    Args:
        token: Encoded JWT supplied by the ``oauth_scheme`` dependency.

    Returns:
        The ``PyUser`` whose username equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 'Invalid credentials' when the token cannot be
            decoded or verified, carries no ``sub`` claim, or names a user
            that does not exist.
    """
    credentials_error = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail='Invalid credentials'
    )

    try:
        decoded = jwt.decode(token, 'secret', algorithms=['HS256'])
    except jwt.InvalidTokenError:
        # A malformed, tampered or expired token is a client error, not a
        # server crash: answer 401 instead of letting the exception escape.
        raise credentials_error from None

    # .get() so that a token without a subject falls through to the 401 below.
    username = decoded.get('sub')
    for obj in fake_users:
        if obj['username'] == username:
            return PyUser(**obj)
    raise credentials_error
def create_token(user: PyUser) -> str:
    """Create a signed JWT identifying *user*.

    The token carries the username as ``sub``, the issue time as ``iat`` and
    an expiry ``exp`` 90 minutes after the issue time.

    Args:
        user: The user the token is issued for.

    Returns:
        The encoded token.
    """
    # Take one timezone-aware timestamp so that `exp` is exactly 90 minutes
    # after `iat`; datetime.utcnow() is naive and deprecated.
    issued_at = datetime.datetime.now(tz=datetime.timezone.utc)
    payload = {
        'sub': user.username,
        'iat': issued_at,
        'exp': issued_at + datetime.timedelta(minutes=90),
    }
    # Name the algorithm explicitly: get_current_user only accepts HS256.
    # NOTE(review): the signing key is hard-coded here and in
    # get_current_user; the two must stay identical.
    token = jwt.encode(payload, key='secret', algorithm='HS256')
    return token
@app.post('/token')
def login(login_data: LoginData) -> Token:
    """Authenticate the posted credentials and respond with a bearer token.

    Authentication failures surface as the HTTPException raised by
    ``authenticate_user``.
    """
    user = authenticate_user(
        username=login_data.username,
        password=login_data.password,
    )
    return Token(access_token=create_token(user), token_type='bearer')
@app.get('/users/me')
def get_user(
    current_user: PyUser = Depends(get_current_user),
):
    """Return the user identified by the request's bearer token."""
    # NOTE(review): the returned model includes the `password` field, which
    # for the users in this file is the bcrypt hash - confirm that exposing
    # it in the response is intended.
    return current_user
Let's go over this code (which is very similar to what FastAPI uses in its documentation examples).
# `tokenUrl` is the URL used to obtain a token.
oauth_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={'items': 'permissions to access items'},
)
This code creates an `OAuth2PasswordBearer` dependency, passing `'token'` as `tokenUrl`; this is the URL used to get the token.
def get_current_user(
    token: str = Depends(oauth_scheme)
) -> PyUser:
    """Resolve the request's bearer token to the user it was issued for.

    Args:
        token: Encoded JWT supplied by the ``oauth_scheme`` dependency.

    Returns:
        The ``PyUser`` whose username equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 'Invalid credentials' when the token cannot be
            decoded or verified, carries no ``sub`` claim, or names a user
            that does not exist.
    """
    credentials_error = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail='Invalid credentials'
    )

    try:
        decoded = jwt.decode(token, 'secret', algorithms=['HS256'])
    except jwt.InvalidTokenError:
        # A malformed, tampered or expired token is a client error, not a
        # server crash: answer 401 instead of letting the exception escape.
        raise credentials_error from None

    # .get() so that a token without a subject falls through to the 401 below.
    username = decoded.get('sub')
    for obj in fake_users:
        if obj['username'] == username:
            return PyUser(**obj)
    raise credentials_error
This dependency takes an argument of type `str` which defaults to `Depends(oauth_scheme)`. The `oauth_scheme` dependency returns the token; the token is then decoded, and the dependency returns an instance of `PyUser`, which is a pydantic `BaseModel`.
Now let's add two routes with different access control requirements.
@app.get('/items')
def items(
    authorize: bool = Depends(
        PermissionChecker(required_permissions=['items:read'])
    ),
):
    """Return the items resource; requires the 'items:read' permission."""
    # `authorize` is not used in the body; it is declared so that the
    # permission check runs for this route.
    return 'items'
@app.get('/users')
def users(
    authorize: bool = Depends(
        PermissionChecker(required_permissions=['users:read'])
    ),
):
    """Return the users resource; requires the 'users:read' permission."""
    # `authorize` is not used in the body; it is declared so that the
    # permission check runs for this route.
    return 'users'
We now need the `PermissionChecker` dependency. This dependency checks the user's permissions against the permissions required by the route.
class PermissionChecker:
    """Callable dependency that enforces a route's required permissions.

    Instantiate it with the permissions a route needs and pass the instance
    to ``Depends``; calling it checks the current user against that list.
    """

    def __init__(self, required_permissions: list[str]) -> None:
        """Store the permissions a user must hold to pass the check."""
        self.required_permissions = required_permissions

    def __call__(self, user: PyUser = Depends(get_current_user)) -> bool:
        """Check that *user* holds every required permission.

        Returns:
            True when no required permission is missing.

        Raises:
            HTTPException: 403 'Permissions' when at least one required
                permission is absent from ``user.permissions``.
        """
        missing = [
            perm for perm in self.required_permissions
            if perm not in user.permissions
        ]
        if missing:
            # The user is already authenticated at this point, so the correct
            # answer is 403 Forbidden; 401 would signal missing or invalid
            # credentials and invite a pointless re-login.
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail='Permissions'
            )
        return True
Top comments (1)
Wow, I was looking for similar articles like mine and it seems I just found one pretty much the same. Nice work mate, this is my take in case you find a trick or two medium.com/me/stats/post/a42457fc85ca