In the previous-post, I already went over how to set up and create a new user account API service.
In this post, I will show how to do an authenticate API using FastAPI.
I. Hash the real password before save to database:
Previously, we’re doing like this :
def create_user(db: Session, user: schemas.UserCreate): | |
fake_hashed_password = user.password + "notreallyhashed" | |
db_user = models.UserInfo(username=user.username, password=fake_hashed_password, fullname=user.fullname) | |
db.add(db_user) | |
db.commit() | |
db.refresh(db_user) | |
return db_user |
Now, we will hash the password using the bycrypt library.
bcrypt is a password hashing function designed by Niels Provos and David Mazières, based on the Blowfish cipher, and presented at USENIX in 1999.[1] Besides incorporating a salt to protect against rainbow table attacks, bcrypt is an adaptive function: over time, the iteration count can be increased to make it slower, so it remains resistant to brute-force search attacks even with increasing computation power.
To install bcrypt library in python, simply
pipenv install bcrypt
And now the code for create new user will look like:
def create_user(db: Session, user: schemas.UserCreate): | |
hashed_password = bcrypt.hashpw(user.password.encode('utf-8'), bcrypt.gensalt()) | |
db_user = models.UserInfo(username=user.username, password=hashed_password, fullname=user.fullname) | |
db.add(db_user) | |
db.commit() | |
db.refresh(db_user) | |
return db_user |
II.Check the input password for authenticate API matching
def check_username_password(db: Session, user: schemas.UserAuthenticate): | |
db_user_info: models.UserInfo = get_user_by_username(db, username=user.username) | |
return bcrypt.checkpw(user.password.encode('utf-8'), db_user_info.password.encode('utf-8')) |
To be able to do this, we use bcrypt.checkpw from the bcrypt library.
III.Create a token object response if the username and password is correct
1.Define Token Schemas in schemas.py
class Token(BaseModel): | |
access_token: str | |
token_type: str |
2.Create an access token by using jwt library
Utilize the jwt library in python by installing it:
pipenv install pyjwt
Then define a method that create an access token from the hashed_password in the database
def create_access_token(*, data: dict, expires_delta: timedelta = None): | |
secret_key = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" | |
algorithm = "HS256" | |
to_encode = data.copy() | |
if expires_delta: | |
expire = datetime.utcnow() + expires_delta | |
else: | |
expire = datetime.utcnow() + timedelta(minutes=15) | |
to_encode.update({"exp": expire}) | |
encoded_jwt = jwt.encode(to_encode, secret_key, algorithm=algorithm) | |
return encoded_jwt |
IV.Wrap altogether and define the authenticate api from main.py
Wrap it altogether to create authenticate API
The below method will validate the username and password, then return the access_token in the response if the username and password is correct.
@app.post("/authenticate", response_model=schemas.Token) | |
def authenticate_user(user: schemas.UserAuthenticate, db: Session = Depends(get_db)): | |
db_user = crud.get_user_by_username(db, username=user.username) | |
if db_user is None: | |
raise HTTPException(status_code=400, detail="Username not existed") | |
else: | |
is_password_correct = crud.check_username_password(db, user) | |
if is_password_correct is False: | |
raise HTTPException(status_code=400, detail="Password is not correct") | |
else: | |
from datetime import timedelta | |
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) | |
from sql_app.app_utils import create_access_token | |
access_token = create_access_token( | |
data={"sub": user.username}, expires_delta=access_token_expires) | |
return {"access_token": access_token, "token_type": "Bearer"} |
To run the server, simply click on the run button from IDE, or by command line:
uvicorn main:app --reload
Please check the full sourcecode from github .
Happy coding~~~
Top comments (1)
Where is UserAuthenticate?