This document explains the complete authentication design, with a strong focus on security controls and implementation details.
What This Auth System Covers
- User registration with hashed password storage
- Email OTP verification with anti-abuse controls
- Login gated by email verification
- Password reset using OTP + short-lived purpose-scoped token
- Logout with JWT revocation (token blacklist)
Security Goals
The flow is designed to prevent:
- Unauthorized login before email ownership is verified
- OTP brute-force and OTP resend spam
- Reset-token misuse outside password reset flow
- Continued use of access tokens after logout
High-Level Flow
- Register: create user (
verified=False) - Send OTP:
POST /auth/otp/send - Verify OTP:
POST /auth/otp/verify-> marks user verified - Login:
POST /auth/login-> JWT only for verified users - Forgot password:
POST /auth/otp/send-
POST /auth/verify-reset-> short-lived token (purpose=password_reset) -
POST /auth/reset-passwordwith reset token
- Logout:
POST /auth/logout-> token added to blacklist
Architecture Diagram
Registration Flow
Password Reset Flow
Core Endpoints
# backend/app/routers/auth_router.py
@router.post("/register")
def register(request: UserRegister, service: AuthService):
return service.register(request)
@router.post("/login")
def login(request: UserLogin, service: AuthService):
return service.login(request)
@router.post("/otp/send")
async def send_otp(request: OTPRequest, service: AuthService, background_tasks: BackgroundTasks):
return service.send_otp(email=request.email, background_tasks=background_tasks)
@router.post("/otp/verify")
def verify_otp(request: OTPVerifyRequest, service: AuthService):
return service.verify_otp(email=request.email, input_code=request.code)
@router.post("/verify-reset")
def forgot_password(request: OTPVerifyRequest, service: AuthService):
return service.verify_reset_otp(email=request.email, input_code=request.code)
@router.post("/reset-password")
def reset_password(request: PasswordResetRequest, user: PasswordResetUser, service: AuthService):
return service.reset_password(email=user, password=request.new_password)
@router.post("/logout")
def logout(block_token: BlockToken):
return {"message": "Logout successful"}
Request Validation
# backend/app/schemas/user_schema.py
class UserRegister(BaseModel):
email: EmailStr
password: str
confirm_password: str = Field(alias="confirmPassword")
# backend/app/schemas/otp_schema.py
class OTPRequest(BaseModel):
email: EmailStr
class OTPVerifyRequest(BaseModel):
email: EmailStr
code: str
class PasswordResetRequest(BaseModel):
new_password: str = Field(alias="newPassword")
OTP Security Model
# backend/app/otp/otp_manager.py
class OTPManager:
OTP_LENGTH = 6
OTP_EXPIRY_MINUTES = 10
MAX_ATTEMPTS = 5
MAX_RESENDS = 3
LOCKOUT_MINUTES = 15
RATE_LIMIT_SECONDS = 60
Controls implemented:
- Cryptographically secure OTP generation (
secrets) - OTP hash storage (
SHA-256) instead of plaintext - 10-minute OTP TTL
- Resend rate limit (60s)
- Max resend cap (3)
- Max failed attempts (5) + temporary lockout (15m)
Registration Logic
def register(self, user: UserRegister) -> UserResponse:
if len(user.password) < 8:
raise HTTPException(status_code=400, detail="Password must be at least 8 characters")
is_valid, error_msg = validate_email_address(user.email)
if not is_valid:
raise HTTPException(status_code=400, detail=error_msg)
if self.user_repo.is_user_exists(user.email):
raise HTTPException(status_code=400, detail="Account with this email already exists")
if user.password != user.confirm_password:
raise HTTPException(status_code=400, detail="Password and confirm password do not match")
hashed_password = hash_password(user.password)
return self.user_repo.create_user(user.email, hashed_password)
Security notes:
- Password is never stored plaintext
- User remains unverified until OTP verification succeeds
OTP Send + Verify Logic
def send_otp(self, email: EmailStr, background_tasks: BackgroundTasks) -> dict:
if self.otp_repo.is_rate_limited(email):
raise HTTPException(status_code=429, detail="Please wait before requesting another code")
if self.otp_repo.is_locked(email):
raise HTTPException(status_code=403, detail="Account locked temporarily")
existing = self.otp_repo.get_otp(email)
if existing and existing.resend_count >= OTPManager.MAX_RESENDS:
raise HTTPException(status_code=429, detail="Maximum OTP requests exceeded")
otp_code = OTPManager.generate_otp()
code_hash = OTPManager.hash_otp(otp_code)
self.otp_repo.create_or_update_otp(email, code_hash)
background_tasks.add_task(self.email_service.send_email, email_data)
return {"message": "OTP sent successfully"}
def _verify_otp_core(self, email: EmailStr, input_code: str):
if self.otp_repo.is_locked(email):
raise HTTPException(status_code=403, detail="Account is temporarily locked")
otp_record = self.otp_repo.get_otp(email)
if not otp_record:
raise HTTPException(status_code=404, detail="No OTP found for this email")
if datetime.now(timezone.utc) > otp_record.expires_at:
raise HTTPException(status_code=400, detail="OTP has expired")
if not OTPManager.verify_otp(input_code, otp_record.code_hash):
self.otp_repo.increment_attempts(email)
raise HTTPException(status_code=400, detail="Invalid OTP")
self.otp_repo.mark_verified(email)
self.otp_repo.delete_otp(email)
verify_otp uses this core and marks user email verified.
Login Logic
def login(self, user_data: UserLogin) -> TokenResponse:
user = self.user_repo.get_user_by_email(user_data.email)
if user and not user.verified:
raise HTTPException(status_code=401, detail="Email address not verified")
if not user or not verify_password(user_data.password, user.password):
raise HTTPException(status_code=401, detail="Incorrect email or password")
access_token = create_access_token(data={
"sub": user.email,
"user_id": str(user.id),
"subscription": {...}
})
return {"access_token": access_token, "token_type": "bearer", "user": user}
Security behavior:
- Unverified users are denied
- Password verification uses Argon2 helper
- JWT includes expiry claim (
exp)
Password Reset (Scoped Token)
def verify_reset_otp(self, email: EmailStr, input_code: str):
self._verify_otp_core(email, input_code)
access_token = create_access_token(
data={"email": email, "purpose": "password_reset"},
expires_delta=timedelta(minutes=10)
)
return {"message": "OTP verified", "access_token": access_token}
def get_password_reset_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
payload = decode_access_token(credentials.credentials)
if payload.get("purpose") != "password_reset":
raise HTTPException(status_code=401, detail="Invalid token purpose")
return payload.get("email")
This prevents normal access tokens from being used to reset passwords.
Logout + Token Revocation
def block_token_logout(credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db)) -> bool:
token = credentials.credentials
payload = decode_access_token(token)
expires_at = datetime.fromtimestamp(payload["exp"], tz=timezone.utc)
TokenBlacklistRepository(db).add_token(token, payload["user_id"], expires_at)
return True
def get_current_user(...):
payload = decode_access_token(token)
if TokenBlacklistRepository(db).is_blacklisted(token):
raise HTTPException(status_code=401, detail="Token has been revoked")
This gives immediate logout invalidation, even for unexpired JWTs.
Data Model Notes
OTP Table
-
email(PK) code_hashexpires_atlast_sent_atattemptsresend_countverifiedlocked_until- optional audit fields (
ip_address,user_agent)
Users Table
email-
password(hashed) verified
Final Takeaway
This auth flow is a strong security baseline: verification-first onboarding, abuse-resistant OTP lifecycle, scoped password reset, and revocable JWT sessions. With secret management and a few operational hardening steps, it is production-ready for security-sensitive workloads.


Top comments (0)