Teresa N. Fontanella De Santis

Authorization on FastAPI with Casbin

Nowadays, tons of APIs (both external and internal) are created and used every day. With methods like authentication or firewall restrictions, we can identify who is invoking an API or restrict where requests may come from. But can we authorize specific users to invoke some methods and not others? This tutorial covers how to authorize different users to execute certain REST API methods in an easy and straightforward way.

Situation

In this case, we have an Items REST API implemented in Python 3.10 with the FastAPI framework. It lets clients list all items and get, create and delete a single item. All of these operations must be performed by authenticated users. For the sake of simplicity, the following users can be used for authentication:

User     Password  Role
alice    secret2   Admin
johndoe  secret    User

The application consists of three files, main.py, utils.py and requirements.txt, with the following code.

main.py

from fastapi import Depends, FastAPI, HTTPException, status, Request, Response
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from starlette.responses import PlainTextResponse, RedirectResponse
from pydantic import BaseModel
from utils import ItemsDAO, UsersDAO, UserInDB, User
from utils import Item

app = FastAPI()
items_dao = ItemsDAO()
users_dao = UsersDAO()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = users_dao.decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user

async def get_current_active_user(curr_user: User = Depends(get_current_user)):
    if curr_user.disabled:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Inactive user")
    return curr_user

@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user = users_dao.get_user(form_data.username)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Incorrect username or password")
    hashed_password = users_dao.hash_password(form_data.password)
    if not hashed_password == user.hashed_password:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Incorrect username or password")
    return {"access_token": user.username, "token_type": "bearer"}

@app.get("/", include_in_schema=False)
async def redirect_to_docs():
    response = RedirectResponse(url='/docs')
    return response

@app.get("/items")
async def read_all_items(req: Request, curr_user: User = Depends(get_current_active_user)):
    return items_dao.get_all_items()

@app.get("/items/{item_id}")
async def read_item(item_id: int, req: Request, curr_user: User = Depends(get_current_active_user)):
    return items_dao.get_item(item_id)

@app.post("/items/")
async def create_item(item: Item, req: Request, curr_user: User = Depends(get_current_active_user)):
    answer = items_dao.create_item(item)
    if not(answer):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Item with given id already exists")
    else:
        return answer

@app.delete("/items/{item_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_item(item_id: int, req: Request, curr_user: User = Depends(get_current_active_user)):
    items_dao.delete_item(item_id)
    return Response(status_code=status.HTTP_204_NO_CONTENT)

utils.py

from typing import Optional
from itertools import filterfalse
from pydantic import BaseModel, Field

class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None

class UserInDB(User):
    hashed_password: str

class UsersDAO():
    def __init__(self):
        self.users_db = {
            "johndoe": {
                "username": "johndoe",
                "full_name": "John Doe",
                "email": "johndoe@example.com",
                "hashed_password": "fakehashedsecret",
                "disabled": False
            },
            "alice": {
                "username": "alice",
                "full_name": "Alice Wonderson",
                "email": "alice@example.com",
                "hashed_password": "fakehashedsecret2",
                "disabled": False
            }
        }

    def get_user(self, username: str):
        if username in self.users_db:
            user_dict = self.users_db[username]
            return UserInDB(**user_dict)

    def hash_password(self, password: str):
        return "fakehashed" + password

    def decode_token(self, token):
        # This doesn't provide any security at all
        # Check the next version
        user = self.get_user(token)
        return user

class Item(BaseModel):
    id: int=Field(1)
    item_name: str

class ItemsDAO():
    def __init__(self):
        self.items_db = [
            Item(id=1, item_name="Foo"),
            Item(id=2, item_name="Bar"),
            Item(id=3, item_name="Baz")
        ]

    def create_item(self, item):
        if isinstance(item, Item):
            # Use a separate name for the lookup so the new item is not overwritten
            existing = [i for i in self.items_db if i.id == item.id]
            if len(existing) > 0:
                return None
            else:
                self.items_db.append(item)
                return item.dict()

    def delete_item(self, item_id):
        self.items_db[:] = filterfalse(lambda x: x.id == item_id, self.items_db)

    def get_all_items(self):
        items = [i.dict() for i in self.items_db]
        return items

    def get_item(self, item_id):
        item = [i for i in self.items_db if i.id == item_id]
        if len(item) > 0:
            return item[0].dict()
        else:
            return {}

requirements.txt

pydantic
fastapi
uvicorn
python-multipart

You can create a conda environment, install required packages and run the api with:



conda create -n itemsapi pip
conda activate itemsapi
pip install -r requirements.txt
python3 -m uvicorn main:app --reload 



After the API is up and running, let's follow these steps to test it:

  1. Open http://127.0.0.1:8000/docs in a browser.
  2. Click on "Authorize" and log in with a username and password (as per the Users table shown before).
  3. To list all items, select the GET /items method. Then click the "Try it out" button, followed by the "Execute" button.
  4. To delete the item with id = 1, select the DELETE /items/{item_id} method, click "Try it out" and execute the method with item_id = 1. The response is 204 and the item is deleted successfully.
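
The same steps can also be scripted instead of clicked through. The following is a minimal sketch using only the Python standard library; it assumes the API is running locally on port 8000, and the helper names (build_token_request, build_api_request, run_smoke_test) are made up for this illustration.

```python
import json
import urllib.parse
import urllib.request

BASE_URL = "http://127.0.0.1:8000"  # assumption: uvicorn running locally on its default port


def build_token_request(username: str, password: str) -> urllib.request.Request:
    """Build the form-encoded POST /token request that the "Authorize" dialog sends."""
    form = urllib.parse.urlencode({"username": username, "password": password})
    return urllib.request.Request(
        f"{BASE_URL}/token",
        data=form.encode("utf-8"),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


def build_api_request(method: str, path: str, token: str) -> urllib.request.Request:
    """Build a request that carries the bearer token in the Authorization header."""
    return urllib.request.Request(
        f"{BASE_URL}{path}",
        headers={"Authorization": f"Bearer {token}"},
        method=method,
    )


def run_smoke_test() -> None:
    """Repeat steps 2 to 4 against a running API (hypothetical helper)."""
    # Step 2: log in and read the access token from the JSON response.
    with urllib.request.urlopen(build_token_request("johndoe", "secret")) as resp:
        token = json.load(resp)["access_token"]
    # Step 3: list all items.
    with urllib.request.urlopen(build_api_request("GET", "/items", token)) as resp:
        print(resp.status, json.load(resp))
    # Step 4: delete item 1; the response has no body, only a status code.
    with urllib.request.urlopen(build_api_request("DELETE", "/items/1", token)) as resp:
        print(resp.status)
```

Calling run_smoke_test() while the server is up should mirror what the Swagger page shows. Nothing is sent until that function is called.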


Goal

Although only registered users (johndoe and alice in this case) can perform item operations, every one of them can delete items. According to their roles, alice should be able to delete items, but johndoe should not. To achieve this, we will implement authorization at the REST API method level, in an easy and extensible way, with Casbin.

Implementation

Overview

Casbin is an open source authorization library with support for many models (such as Access Control Lists or ACLs, Role Based Access Control or RBAC, RESTful, etc.) and with implementations in several programming languages (e.g. Python, Go, Java, Rust, Ruby, etc.).
It is configured through two files:

  • A model file: a CONF file (with .conf extension) which specifies the authorization model to be applied (in this case we will use the RESTful one).
  • A policy file: a CSV file that lists the API method permissions for each user.

Steps

1) Install the casbin Python package with pip:
pip install casbin
2) Define the model file
Create a new file called model.conf with the following content:

[request_definition]
r = sub, obj, act
[policy_definition]
p = sub, obj, act
[policy_effect]
e = some(where (p.eft == allow))
[matchers]
m = r.sub == p.sub && keyMatch(r.obj, p.obj) && regexMatch(r.act, p.act)

You can find more details about the Casbin model syntax in the Casbin documentation.
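
To get a feel for what the matcher line does, here is a minimal sketch that emulates it in plain Python. The functions key_match, regex_match and matches are simplified stand-ins written for this illustration, not Casbin's own code, so edge cases may behave differently. The sample rule is likewise only an example.

```python
import re


def key_match(request_path: str, pattern: str) -> bool:
    """Approximate keyMatch: a '*' in the pattern stands for any remaining path."""
    star = pattern.find("*")
    if star == -1:
        return request_path == pattern
    if len(request_path) > star:
        return request_path[:star] == pattern[:star]
    return request_path == pattern[:star]


def regex_match(method: str, pattern: str) -> bool:
    """Approximate regexMatch: the HTTP method must match the rule's regular expression."""
    return re.match(pattern, method) is not None


def matches(request: tuple, rule: tuple) -> bool:
    """Emulate: r.sub == p.sub && keyMatch(r.obj, p.obj) && regexMatch(r.act, p.act)."""
    r_sub, r_obj, r_act = request
    p_sub, p_obj, p_act = rule
    return r_sub == p_sub and key_match(r_obj, p_obj) and regex_match(r_act, p_act)


# Sample rule (illustrative): johndoe may GET or POST anything under /items/
rule = ("johndoe", "/items/*", "(GET)|(POST)")

print(matches(("johndoe", "/items/1", "GET"), rule))     # True
print(matches(("johndoe", "/items/1", "DELETE"), rule))  # False: DELETE is not in the regex
print(matches(("johndoe", "/items", "GET"), rule))       # False: "/items" lacks the trailing slash
```

The last case suggests why the collection URL /items needs a rule of its own in addition to /items/*.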

3) Define Policy file

Create a new CSV file called policy.csv and paste the following:

p, alice, /items/*, (GET)|(DELETE)|(POST)
p, alice, /items, (GET)
p, johndoe, /items/*, (GET)|(POST)
p, johndoe, /items, (GET)

Each row is an allowed permission with the following values: the first column (p) marks the row as a policy rule, the second column is the user, the third is the API resource or URL, and the last one is a regular expression listing the allowed HTTP methods. In this case, alice can list, get, create and delete items, while johndoe can list, get and create items but not delete them.
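
As a quick sanity check of the policy, the rows can be read with Python's csv module and summarized per user. This is only a sketch: it assumes the rows are comma-separated, and summarize_policy is a hypothetical helper that splits the method expression on "|", which works for simple rules like these but not for arbitrary regular expressions.

```python
import csv
import io

POLICY_CSV = """\
p, alice, /items/*, (GET)|(DELETE)|(POST)
p, alice, /items, (GET)
p, johndoe, /items/*, (GET)|(POST)
p, johndoe, /items, (GET)
"""


def summarize_policy(text: str) -> dict:
    """Map each user to {resource: sorted list of allowed HTTP methods}."""
    summary = {}
    for row in csv.reader(io.StringIO(text), skipinitialspace=True):
        if not row or row[0] != "p":
            continue  # skip blank lines and anything that is not a policy rule
        _, user, resource, methods = row
        # "(GET)|(POST)" -> ["GET", "POST"]
        allowed = sorted(m.strip("()") for m in methods.split("|"))
        summary.setdefault(user, {})[resource] = allowed
    return summary


for user, resources in summarize_policy(POLICY_CSV).items():
    for resource, methods in resources.items():
        print(f"{user:8} {resource:10} {', '.join(methods)}")
```

The output makes it easy to spot that DELETE appears only in alice's rules.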

4) Update the Python API code to enforce authorization.
In the main.py file, add the following lines:



import casbin
...
async def get_current_user_authorization(req: Request, curr_user: User = Depends(get_current_active_user)):
    e = casbin.Enforcer("model.conf", "policy.csv")
    sub = curr_user.username
    obj = req.url.path
    act = req.method
    if not(e.enforce(sub, obj, act)):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Method not authorized for this user")
    return curr_user



This imports the casbin module and creates a new authorization function that reads the configuration files with casbin.Enforcer and enforces that the user has the required permission.
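
Note that the function builds a new enforcer, and so re-reads both files, on every request. One possible refinement is to build it once and reuse it. The sketch below assumes that sharing a single enforcer is acceptable for this demo; build_enforcer and get_enforcer are hypothetical helper names, and the factory parameter exists only so a stub can be swapped in for testing.

```python
from functools import lru_cache


def build_enforcer():
    """Load model.conf and policy.csv into a Casbin enforcer."""
    import casbin  # imported lazily so this module can be imported before casbin is installed

    return casbin.Enforcer("model.conf", "policy.csv")


@lru_cache(maxsize=1)
def get_enforcer(factory=build_enforcer):
    """Return a shared enforcer, building it only on the first call."""
    return factory()
```

With this in place, the dependency could call e = get_enforcer() instead of constructing casbin.Enforcer directly. The trade-off is that edits to policy.csv are not picked up until the process restarts or the enforcer's load_policy() method is called.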

Then, in the API methods defined earlier, replace the old dependency get_current_active_user with the new get_current_user_authorization:



@app.get("/items/{item_id}")
async def read_item(item_id: int, req: Request, curr_user: User = Depends(get_current_user_authorization)):
    return items_dao.get_item(item_id)

@app.post("/items/")
async def create_item(item: Item, req: Request, curr_user: User = Depends(get_current_user_authorization)):
    answer = items_dao.create_item(item)
    if not(answer):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Item with given id already exists")
    else:
        return answer

@app.delete("/items/{item_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_item(item_id: int, req: Request, curr_user: User = Depends(get_current_user_authorization)):
    items_dao.delete_item(item_id)
    return Response(status_code=status.HTTP_204_NO_CONTENT)





Test

  1. Start the updated API
  2. Open http://127.0.0.1:8000/docs in a browser.
  3. Click on "Authorize" and log in as "johndoe".
  4. Try to delete the item with id=1. The request is rejected with a 401 Unauthorized error.
  5. Log out from that user. Then log in as "alice".
  6. Try to delete the item with id=1. The request works fine, returns 204 and the item is deleted.
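
The two delete attempts can also be checked from a script. The sketch below relies on the fact that, in this demo, the bearer token returned by /token is simply the username. It assumes the API runs locally on port 8000; delete_status and run_checks are hypothetical helpers, and the opener parameter is there only so the function can be exercised without a server.

```python
import urllib.error
import urllib.request

BASE_URL = "http://127.0.0.1:8000"  # assumption: uvicorn running locally on its default port


def delete_status(username: str, item_id: int, opener=urllib.request.urlopen) -> int:
    """Send DELETE /items/{item_id} as `username` and return the HTTP status code."""
    request = urllib.request.Request(
        f"{BASE_URL}/items/{item_id}",
        # In this demo the access token is the username itself.
        headers={"Authorization": f"Bearer {username}"},
        method="DELETE",
    )
    try:
        with opener(request) as response:
            return response.status
    except urllib.error.HTTPError as error:
        # urllib raises for 4xx/5xx responses; the status code is on the exception.
        return error.code


def run_checks() -> None:
    """Call this while the updated API is running."""
    print("johndoe:", delete_status("johndoe", 1))  # the policy should reject this request
    print("alice:", delete_status("alice", 1))      # the policy should allow this request
```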


Conclusion

In this post we saw how to use Casbin to implement authorization on REST APIs. Keep in mind that this example can be extended by combining it with other authorization models like RBAC, changing only the model and policy configuration files.
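
To illustrate the RBAC idea, the sketch below attaches rules to roles instead of users, with role names taken from the Users table. It is a plain-Python illustration of the concept, not Casbin code: ROLES, ROLE_POLICY, path_matches and is_allowed are all hypothetical. In Casbin itself, this would typically be expressed with a [role_definition] section (g = _, _) and a matcher that uses g(r.sub, p.sub) in place of r.sub == p.sub.

```python
import re

# Hypothetical role assignments, following the Users table (alice: Admin, johndoe: User).
ROLES = {"alice": "admin", "johndoe": "user"}

# Hypothetical role-based rules: (role, resource pattern, allowed-method regex).
ROLE_POLICY = [
    ("admin", "/items/*", "(GET)|(DELETE)|(POST)"),
    ("admin", "/items", "(GET)"),
    ("user", "/items/*", "(GET)|(POST)"),
    ("user", "/items", "(GET)"),
]


def path_matches(path: str, pattern: str) -> bool:
    """Simplified wildcard match: a trailing '*' accepts any suffix."""
    if pattern.endswith("*"):
        return path.startswith(pattern[:-1])
    return path == pattern


def is_allowed(username: str, path: str, method: str) -> bool:
    """Resolve the user's role first, then look for any rule that allows the request."""
    role = ROLES.get(username)
    return any(
        role == p_role
        and path_matches(path, p_path)
        and re.fullmatch(p_methods, method) is not None
        for p_role, p_path, p_methods in ROLE_POLICY
    )


print(is_allowed("alice", "/items/1", "DELETE"))    # True
print(is_allowed("johndoe", "/items/1", "DELETE"))  # False
```

With roles in place, granting a new user the same rights as alice would only require one more role assignment rather than a new set of rules.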


