What is so special about FastAPI?
It is the level of freedom!
- free to choose the project structure: modular, layered, or even use-case based.
- free to use SQLModel, SQLAlchemy, SQLObject, Tortoise-ORM, or even Django ORM!
- free to use Pydantic, DataClasses, Marshmallow, or even Cerberus.
- free to use Alembic, Liquibase, or maybe Jetbase.
- free to use sync or async operations.
What sucks about FastAPI?
It is the level of freedom!
You might work on more than 10 FastAPI projects, and each is unique in its own way 😁
The Frankenstein I'm building
I believe you can see why I call it Frankenstein: you build your project from a choice of libraries and structures, ending up with either a well-shaped body or an ill-shaped one.
The power also comes from there; you can add as many hands, legs, and heads as you want, it is yours to build as you see fit.
I'll try to build my own Frankenstein with the best choices based on the latest FastAPI documentation and my experience 🤓
Attention: before following these choices, make sure to check the article date. If it is older than 6 months, look for a newer guide.
The choices in this article:
- Modular project structure
- SQLModel
- Alembic
- Async operations
I'll use a UV Python virtual environment and write some tests.
This article will cover only project initialization; later articles will cover aspects such as authentication and module addition.
Create UV virtual environment
I believe everyone is familiar with the traditional
python3 -m venv .venv
source .venv/bin/activate
So feel free to use it if you wish.
For UV, start by installing it for your operating system.
Then create the project folder (I named mine franky) and, inside it, initialize the project
cd franky
uv init
Now you should have main.py, README.md, and pyproject.toml, alongside a git repository, which you can remove if you want.
Installing the libraries
uv add 'fastapi[standard]'
uv add pydantic-settings
uv add python-dotenv
uv add sqlmodel
uv add uvicorn
uv add alembic
uv add httpx
uv add pytest
uv add pytest-asyncio
uv add greenlet
uv add aiosqlite
Most of the library names explain their role, but a few words on some of them:
- fastapi[standard]: installs FastAPI with its standard optional dependencies; don't drop the quotes, ''
- uvicorn: the ASGI server that runs the app
- alembic: for database migrations
- httpx: for the async test client
- aiosqlite: the async SQLite driver used in the database URL
- greenlet: required by SQLAlchemy's async support, which SQLModel builds on
Now you should have a pyproject.toml file similar to mine
franky/pyproject.toml
[project]
name = "franky"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"aiosqlite>=0.22.1",
"alembic>=1.18.4",
"fastapi[standard]>=0.136.0",
"greenlet>=3.4.0",
"httpx>=0.28.1",
"pydantic-settings>=2.13.1",
"pytest>=9.0.3",
"pytest-asyncio>=1.3.0",
"python-dotenv>=1.2.2",
"sqlmodel>=0.0.38",
"uvicorn>=0.44.0",
]
To test our environment, let's steal the core FastAPI script from the docs and update our main.py
franky/main.py
from fastapi import FastAPI

app = FastAPI()


@app.get("/")
async def root():
    return {"message": "Hello World"}
and test it by running
uv run fastapi dev
Core Project config, dependencies, and logging
We will create a config file that loads settings from the .env file and exposes them to the whole application through a pydantic-settings class
franky/src/core/config.py
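import os

from dotenv import load_dotenv
from pydantic_settings import BaseSettings

# Load variables from .env into the process environment (existing variables win).
load_dotenv()


class Config(BaseSettings):
    app_name: str = "Franky"
    debug: bool = True
    # Fall back to db.sqlite3 when DB_NAME is not set, so db_url never ends in "None".
    db_name: str = os.getenv("DB_NAME", "db.sqlite3")

    @property
    def db_url(self) -> str:
        return f"sqlite+aiosqlite:///./{self.db_name}"


config = Config()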
and create a .env file in the main project folder with the DB_NAME variable
franky/.env
DB_NAME=db.sqlite3
Notice that we are using the sqlite+aiosqlite driver to run async operations with SQLite; the driver depends on your database choice.
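As a rough, standard-library-only sketch of what the config class does (`SketchConfig` is a hypothetical stand-in, not the pydantic-settings class, and the fallback file name is an assumption), the database name is read from the environment when the object is created and the URL is derived from it:

```python
import os
from dataclasses import dataclass, field


@dataclass(frozen=True)
class SketchConfig:
    """Hypothetical stand-in for Config, built only on the standard library."""

    # Read when the object is created; the fallback value is an assumption.
    db_name: str = field(default_factory=lambda: os.getenv("DB_NAME", "db.sqlite3"))

    @property
    def db_url(self) -> str:
        # "///./" makes the path relative to the current working directory.
        return f"sqlite+aiosqlite:///./{self.db_name}"


os.environ["DB_NAME"] = "example.sqlite3"
print(SketchConfig().db_url)
```

The value is captured at creation time, so anything that wants a different database (a test run, for instance) has to set DB_NAME before the config object is built.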
Now for the database dependency
franky/src/core/dependencies.py
from collections.abc import AsyncGenerator
from typing import Annotated

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

from src.core.config import config

engine = create_async_engine(config.db_url, connect_args={"check_same_thread": False})
SessionLocal = async_sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=engine,
    class_=AsyncSession,
)


async def get_session() -> AsyncGenerator[AsyncSession, None]:
    # The context manager closes the session once the request is done.
    async with SessionLocal() as session:
        yield session


SessionDep = Annotated[AsyncSession, Depends(get_session)]
We are creating an async session to communicate with the database.
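The yield-based dependency follows a "one session per request, always cleaned up" pattern. Here is a minimal sketch of that pattern with the standard library only; `FakeSession`, `session_scope`, and `handle_request` are hypothetical names for illustration and do not touch a real database:

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for AsyncSession that only tracks its state."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


@asynccontextmanager
async def session_scope() -> AsyncIterator[FakeSession]:
    session = FakeSession()
    try:
        # The caller works with the session while we are suspended here.
        yield session
    finally:
        # Runs even if the caller raised, so the session never leaks.
        await session.close()


async def handle_request() -> tuple[bool, bool]:
    async with session_scope() as session:
        open_during_request = not session.closed
    return open_during_request, session.closed


print(asyncio.run(handle_request()))
```

The session is open while the "request" runs and closed right after, which is the behavior the dependency relies on.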
To set up the logging level and format
franky/src/core/logging.py
import logging


def setup_logging():
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s [%(name)s] %(message)s",
    )
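To see what such a format string produces without starting the server, here is a small sketch that logs into an in-memory buffer. The logger name `franky.demo` and the helper `format_demo` are made up for illustration, and the timestamp field is left out so the output is stable:

```python
import io
import logging


def format_demo() -> str:
    """Log one INFO and one DEBUG message, return what was actually written."""
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    # Same fields as the format above, minus %(asctime)s.
    handler.setFormatter(logging.Formatter("%(levelname)s [%(name)s] %(message)s"))

    logger = logging.getLogger("franky.demo")  # hypothetical logger name
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep the demo output out of the root logger
    logger.addHandler(handler)
    try:
        logger.info("Application startup complete.")
        logger.debug("This is below INFO and is dropped.")
    finally:
        logger.removeHandler(handler)
    return stream.getvalue()


print(format_demo(), end="")
```

Only the INFO line is written, prefixed with the level and the logger name in brackets.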
Updating the main app to use our new configuration
setup_logging()
app = FastAPI(title=config.app_name)
# ...
Logging change
Old logging format
INFO Started reloader process [37345] using WatchFiles
INFO Started server process [37348]
INFO Waiting for application startup.
INFO Application startup complete.
new logging format
INFO Started reloader process [37384] using WatchFiles
2026-04-17 22:05:47,964 INFO [uvicorn.error] Started reloader process [37384] using WatchFiles
INFO Started server process [37388]
2026-04-17 22:05:52,371 INFO [uvicorn.error] Started server process [37388]
INFO Waiting for application startup.
2026-04-17 22:05:52,373 INFO [uvicorn.error] Waiting for application startup.
INFO Application startup complete.
2026-04-17 22:05:52,374 INFO [uvicorn.error] Application startup complete.
The database dependency will be used later when integrating with the DB.
Exception handlers and unified response schema
The idea is to unify the response schema to something like
{
"success": true,
"message": "Operation successful",
"data": {
...
}
}
and errors to appear like
{
"success": false,
"message": "serious error occurred",
"data": null
}
We will create a middleware to unify responses. Let's begin by creating response schemas
franky/src/core/models.py
from typing import Generic, TypeVar

from pydantic import BaseModel

T = TypeVar("T")


class IResponse(BaseModel, Generic[T]):
    success: bool = True
    message: str = "Operation successful"
    data: T | None = None
IResponse is the generic schema for all responses from the system.
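The same envelope idea can be sketched without Pydantic, using a generic dataclass. `Envelope`, `ok`, and `fail` are hypothetical names used only for this illustration:

```python
from dataclasses import asdict, dataclass
from typing import Any, Generic, Optional, TypeVar

T = TypeVar("T")


@dataclass
class Envelope(Generic[T]):
    """Hypothetical dataclass version of the response envelope."""

    success: bool = True
    message: str = "Operation successful"
    data: Optional[T] = None


def ok(data: Any) -> dict:
    # Successful responses keep the default flag and message.
    return asdict(Envelope(data=data))


def fail(message: str) -> dict:
    # Errors carry a message and no data.
    return asdict(Envelope(success=False, message=message))


print(ok({"id": 1}))
print(fail("serious error occurred"))
```

Both shapes share the same three keys, so a client can always check success first and then read either data or message.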
The middleware
franky/src/core/middlewares.py
import json

from fastapi import Request, Response
from starlette.concurrency import iterate_in_threadpool
from starlette.middleware.base import BaseHTTPMiddleware

from src.core.models import IResponse


class UnifiedResponseMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # 1. Skip wrapping for docs
        if request.url.path in ["/openapi.json", "/docs", "/redoc"]:
            return await call_next(request)

        response = await call_next(request)

        # 2. Only wrap JSON
        content_type = response.headers.get("content-type", "")
        if response.status_code < 400 and "application/json" in content_type:
            body = b"".join([section async for section in response.body_iterator])
            if not body:
                return response
            try:
                data = json.loads(body.decode("utf-8"))
                # Check if already wrapped
                if isinstance(data, dict) and "success" in data:
                    response.body_iterator = iterate_in_threadpool(iter([body]))
                    return response
                # 3. Wrap
                wrapped_data = IResponse(data=data).model_dump_json()
                # 4. Clean Headers
                headers = dict(response.headers)
                headers.pop("Content-Length", None)
                headers.pop("content-length", None)
                return Response(
                    content=wrapped_data,
                    status_code=response.status_code,
                    headers=headers,
                    media_type="application/json",
                )
            except (json.JSONDecodeError, UnicodeDecodeError):
                response.body_iterator = iterate_in_threadpool(iter([body]))
                return response
        return response
Installing a global middleware intercepts all requests and responses in the system, including those from Swagger. That is why we check the request path first
if request.url.path in ["/openapi.json", "/docs", "/redoc"]:
A common issue when rewriting responses is the Content-Length header: the wrapped body is longer than the original, so we drop the stale header and let the new Response compute it again.
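Stripped of the HTTP plumbing, the wrapping decision is a small bytes-in, bytes-out function. The sketch below mirrors the middleware's branches with the standard library only; `wrap_json_body` is a hypothetical helper, not part of the project:

```python
import json


def wrap_json_body(body: bytes) -> bytes:
    """Wrap a JSON body in the unified envelope, or return it untouched."""
    if not body:
        return body  # nothing to wrap
    try:
        data = json.loads(body.decode("utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError):
        return body  # not JSON after all, pass it through
    if isinstance(data, dict) and "success" in data:
        return body  # already wrapped
    wrapped = {"success": True, "message": "Operation successful", "data": data}
    return json.dumps(wrapped).encode("utf-8")


original = b'{"message": "Hello World"}'
wrapped = wrap_json_body(original)
print(wrapped.decode("utf-8"))
print(len(original), len(wrapped))
```

The two lengths differ, which is exactly why a Content-Length header copied from the original response would be wrong for the wrapped one.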
We need an app-level error handler to wrap all errors in this common response.
franky/src/core/exceptions.py
from fastapi import FastAPI, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from starlette.exceptions import HTTPException as StarletteHTTPException


async def common_exception_handler(request: Request, exc: Exception):
    status_code = 400
    message = str(exc)
    # Check if it's a Starlette/FastAPI HTTPException
    if isinstance(exc, StarletteHTTPException):
        status_code = exc.status_code
        message = exc.detail
    return JSONResponse(
        status_code=status_code,
        content={
            "success": False,
            "message": message,
            "data": None,
        },
    )


def setup_exception_handlers(app: FastAPI):
    app.add_exception_handler(StarletteHTTPException, common_exception_handler)
    app.add_exception_handler(RequestValidationError, common_exception_handler)
    app.add_exception_handler(Exception, common_exception_handler)
setup_exception_handlers wraps StarletteHTTPException, RequestValidationError, and generic Exception with our generic response.
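The mapping from exception to response can be sketched on its own. `SketchHTTPError` is a hypothetical stand-in for the HTTP exception class, and `to_error_response` is an illustrative helper that returns the status code and payload instead of a real response object:

```python
class SketchHTTPError(Exception):
    """Hypothetical stand-in for an HTTP exception with a status and detail."""

    def __init__(self, status_code: int, detail: str) -> None:
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail


def to_error_response(exc: Exception) -> tuple[int, dict]:
    # Default for anything that is not an HTTP error, as in the handler above.
    status_code = 400
    message = str(exc)
    if isinstance(exc, SketchHTTPError):
        status_code = exc.status_code
        message = exc.detail
    return status_code, {"success": False, "message": message, "data": None}


print(to_error_response(SketchHTTPError(404, "Appointment not found")))
print(to_error_response(Exception("serious error occurred")))
```

HTTP errors keep their own status code, while every other exception falls back to 400 with its string form as the message.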
Updating main.py
Adding the error handler and middleware on the app level
franky/main.py
from fastapi import FastAPI

from src.core.config import config
from src.core.exceptions import setup_exception_handlers
from src.core.logging import setup_logging
from src.core.middlewares import UnifiedResponseMiddleware

setup_logging()
app = FastAPI(title=config.app_name)
setup_exception_handlers(app)
app.add_middleware(UnifiedResponseMiddleware)


@app.get("/")
async def root():
    return {"message": "Hello World"}


# Named differently from root() so the second handler does not shadow the first.
@app.get("/error")
async def error():
    raise Exception("serious error occurred")
Test / and /error to see the responses
Simple module structure
I suggest this simple and clear modular app structure
--src
----module1
------__init__.py
------dependencies.py
------models.py
------router.py
------service.py
...
I'll create a simple appointments module as an example.
Starting with the models
franky/src/appointments/models.py
from datetime import datetime, UTC
from enum import Enum
from typing import Optional

from sqlmodel import SQLModel, Field


class AppointmentStatus(str, Enum):
    scheduled = "scheduled"
    completed = "completed"
    cancelled = "cancelled"


class AppointmentBase(SQLModel):
    title: str = Field(min_length=1, max_length=255)
    description: Optional[str] = Field(default=None, max_length=1000)
    start_time: datetime
    end_time: datetime
    location: Optional[str] = Field(default=None, max_length=500)
    status: AppointmentStatus = AppointmentStatus.scheduled


class Appointment(AppointmentBase, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))


class AppointmentCreate(AppointmentBase):
    pass


class AppointmentUpdate(SQLModel):
    title: Optional[str] = Field(default=None, min_length=1, max_length=255)
    description: Optional[str] = Field(default=None, max_length=1000)
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    location: Optional[str] = Field(default=None, max_length=500)
    status: Optional[AppointmentStatus] = None


class AppointmentRead(AppointmentBase):
    id: int
    created_at: datetime
    updated_at: datetime
After the models, it is time for the DB service, which just provides basic CRUD
franky/src/appointments/service.py
from datetime import datetime, UTC
from typing import Optional, Sequence

from sqlalchemy.ext.asyncio import AsyncSession
from sqlmodel import select

from src.appointments.models import Appointment, AppointmentCreate, AppointmentUpdate


class AppointmentService:
    def __init__(self, session: AsyncSession) -> None:
        self.session = session

    async def create(self, data: AppointmentCreate) -> Appointment:
        appointment = Appointment.model_validate(data)
        self.session.add(appointment)
        await self.session.commit()
        await self.session.refresh(appointment)
        return appointment

    async def get(self, appointment_id: int) -> Optional[Appointment]:
        return await self.session.get(Appointment, appointment_id)

    async def list(self, offset: int = 0, limit: int = 20) -> Sequence[Appointment]:
        result = await self.session.execute(
            select(Appointment).offset(offset).limit(limit)
        )
        return result.scalars().all()

    async def update(
        self, appointment_id: int, data: AppointmentUpdate
    ) -> Optional[Appointment]:
        appointment = await self.session.get(Appointment, appointment_id)
        if not appointment:
            return None
        # Only the fields the client actually sent are applied.
        updates = data.model_dump(exclude_unset=True)
        for key, value in updates.items():
            setattr(appointment, key, value)
        appointment.updated_at = datetime.now(UTC)
        self.session.add(appointment)
        await self.session.commit()
        await self.session.refresh(appointment)
        return appointment

    async def delete(self, appointment_id: int) -> bool:
        appointment = await self.session.get(Appointment, appointment_id)
        if not appointment:
            return False
        await self.session.delete(appointment)
        await self.session.commit()
        return True
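The update method relies on model_dump(exclude_unset=True) to tell "field not sent" apart from "field sent as null". A minimal sketch of that distinction with a sentinel value, using only the standard library (`UNSET`, `AppointmentPatch`, and `apply_patch` are hypothetical names for this illustration):

```python
from dataclasses import dataclass, fields
from typing import Any

# Sentinel meaning "the client did not send this field at all".
UNSET: Any = object()


@dataclass
class AppointmentPatch:
    """Hypothetical partial-update payload with two fields."""

    title: Any = UNSET
    location: Any = UNSET


def apply_patch(record: dict, patch: AppointmentPatch) -> dict:
    updated = dict(record)  # leave the original record untouched
    for f in fields(patch):
        value = getattr(patch, f.name)
        if value is not UNSET:
            # An explicit None is still applied: it clears the field.
            updated[f.name] = value
    return updated


record = {"title": "Annual checkup", "location": "Main clinic"}
print(apply_patch(record, AppointmentPatch(title="Updated title")))
print(apply_patch(record, AppointmentPatch(location=None)))
```

In the first call the location survives because it was never sent; in the second it is cleared because the client sent it explicitly as None.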
And the dependency after the service
franky/src/appointments/dependencies.py
from typing import Annotated

from fastapi import Depends

from src.appointments.service import AppointmentService
from src.core.dependencies import SessionDep


def get_appointment_service(session: SessionDep) -> AppointmentService:
    return AppointmentService(session)


AppointmentServiceDep = Annotated[AppointmentService, Depends(get_appointment_service)]
and finally the router.
franky/src/appointments/router.py
from fastapi import APIRouter, HTTPException, Query

from src.appointments.dependencies import AppointmentServiceDep
from src.appointments.models import AppointmentCreate, AppointmentRead, AppointmentUpdate

router = APIRouter(prefix="/appointments", tags=["appointments"])


@router.post("/", response_model=AppointmentRead, status_code=201)
async def create_appointment(data: AppointmentCreate, service: AppointmentServiceDep):
    return await service.create(data)


@router.get("/", response_model=list[AppointmentRead])
async def list_appointments(
    service: AppointmentServiceDep,
    offset: int = Query(0, ge=0),
    limit: int = Query(20, ge=1, le=100),
):
    return await service.list(offset=offset, limit=limit)


@router.get("/{appointment_id}", response_model=AppointmentRead)
async def get_appointment(appointment_id: int, service: AppointmentServiceDep):
    appointment = await service.get(appointment_id)
    if not appointment:
        raise HTTPException(status_code=404, detail="Appointment not found")
    return appointment


@router.patch("/{appointment_id}", response_model=AppointmentRead)
async def update_appointment(
    appointment_id: int,
    data: AppointmentUpdate,
    service: AppointmentServiceDep,
):
    appointment = await service.update(appointment_id, data)
    if not appointment:
        raise HTTPException(status_code=404, detail="Appointment not found")
    return appointment


@router.delete("/{appointment_id}", status_code=204)
async def delete_appointment(appointment_id: int, service: AppointmentServiceDep):
    deleted = await service.delete(appointment_id)
    if not deleted:
        raise HTTPException(status_code=404, detail="Appointment not found")
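The offset and limit parameters on the list endpoint behave like a slice over the full result set. A tiny in-memory sketch, with the same bounds as the Query constraints (`paginate` is a hypothetical helper, and raising ValueError is an assumption standing in for the framework's validation):

```python
def paginate(items: list, offset: int = 0, limit: int = 20) -> list:
    """Return one page of items, mirroring offset >= 0 and 1 <= limit <= 100."""
    if offset < 0:
        raise ValueError("offset must be >= 0")
    if not 1 <= limit <= 100:
        raise ValueError("limit must be between 1 and 100")
    # Slicing past the end simply yields a shorter (or empty) page.
    return items[offset : offset + limit]


appointments = ["Appt 0", "Appt 1", "Appt 2"]
print(paginate(appointments, limit=100))
print(paginate(appointments, offset=1, limit=2))
print(paginate(appointments, offset=5))
```

An offset beyond the last item returns an empty page rather than an error, which matches how OFFSET and LIMIT behave in SQL.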
Great, now that we have our module, let's register it in the main app.
franky/main.py
...
from contextlib import asynccontextmanager

from sqlmodel import SQLModel

from src.appointments.router import router as appointments_router
from src.core.dependencies import engine

setup_logging()


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Temporary: create any missing tables at startup.
    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)
    yield


app = FastAPI(title=config.app_name, lifespan=lifespan)
setup_exception_handlers(app)
app.add_middleware(UnifiedResponseMiddleware)
app.include_router(appointments_router)
...
We have it set. You should be able to find the appointments endpoints with Swagger.
Notice that we added an async function, lifespan, that creates the database tables if they don't already exist. This is temporary until we set up database migrations in the next step 😁
FastAPI database migration with Alembic
I'll cover Alembic with an async database; there might be differences when dealing with a sync one.
uv run alembic init -t async migrations
With Alembic initialized, we need to update the template for generated migration files so that they import sqlmodel
franky/migrations/script.py.mako
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
import sqlmodel # NEW
${imports if imports else ""}
...
and point Alembic at the models it should track
franky/migrations/env.py
import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from sqlmodel import SQLModel #new
from alembic import context
from src.appointments.models import Appointment #new
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = SQLModel.metadata #updated
...
The final config part is updating alembic.ini to point to the database
...
# updated
sqlalchemy.url = sqlite+aiosqlite:///./db.sqlite3
...
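One detail worth checking in .ini files is where comments go. Python's configparser, in its default mode, does not strip a `#` comment that shares a line with a value. The sketch below shows the difference; that Alembic's own loader is configured the same way is an assumption here, and `read_url` is a hypothetical helper:

```python
import configparser

INI_WITH_INLINE_COMMENT = """
[alembic]
sqlalchemy.url = sqlite+aiosqlite:///./db.sqlite3 #updated
"""

INI_WITH_COMMENT_ABOVE = """
[alembic]
# updated
sqlalchemy.url = sqlite+aiosqlite:///./db.sqlite3
"""


def read_url(ini_text: str) -> str:
    # Default ConfigParser: only full-line comments are recognized.
    parser = configparser.ConfigParser()
    parser.read_string(ini_text)
    return parser["alembic"]["sqlalchemy.url"]


print(repr(read_url(INI_WITH_INLINE_COMMENT)))
print(repr(read_url(INI_WITH_COMMENT_ABOVE)))
```

With the inline form, the trailing text becomes part of the URL, so keeping comments on their own line is the safer habit.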
With all configurations ready, we can create a migration using
uv run alembic revision --autogenerate -m "init"
Now we have our first migration 🤩, congrats. You should be able to find it under migrations/versions/, in a file named with a generated revision id followed by _init.py
You can edit the migration manually if required, but the autogenerated one is usually good enough.
To apply a migration
uv run alembic upgrade head
Great, now we can remove SQLModel.metadata.create_all (and the lifespan function around it) from the main file and let the migration create the tables instead
franky/main.py
from src.appointments.router import router as appointments_router
setup_logging()
app = FastAPI(title=config.app_name)
setup_exception_handlers(app)
app.add_middleware(UnifiedResponseMiddleware)
app.include_router(appointments_router)
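And make sure to run uv run alembic upgrade head before using the app.
FastAPI tests
I'll be quick on this. We start by setting up the test config. The async tests and fixtures below assume that pytest-asyncio runs in auto mode (asyncio_mode = "auto" under [tool.pytest.ini_options] in pyproject.toml) and that the repo root is on pytest's pythonpath, so that from main import app resolves.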
franky/tests/conftest.py
from __future__ import annotations

import os
import uuid
from collections.abc import AsyncIterator

# Relative path: `sqlite+aiosqlite:///./{name}` in config breaks for absolute paths.
_test_db_file = f".pytest_franky_{uuid.uuid4().hex}.sqlite"
# Must be set before `main` (and therefore the config) is imported below.
os.environ["DB_NAME"] = _test_db_file

import pytest
from httpx import ASGITransport, AsyncClient
from sqlmodel import SQLModel

from main import app
from src.core.dependencies import engine


def pytest_sessionfinish(session: pytest.Session, exitstatus: int) -> None:
    # Remove the temporary database file once the whole test run is over.
    try:
        os.unlink(_test_db_file)
    except OSError:
        pass


@pytest.fixture(autouse=True)
async def reset_database() -> AsyncIterator[None]:
    # Every test starts from an empty schema.
    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.drop_all)
        await conn.run_sync(SQLModel.metadata.create_all)
    yield


@pytest.fixture
async def client() -> AsyncIterator[AsyncClient]:
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac
Then, a simple test file for each module
franky/tests/test_appointments.py
from __future__ import annotations

from httpx import AsyncClient


def _sample_create_payload() -> dict:
    return {
        "title": "Annual checkup",
        "description": "Routine visit",
        "start_time": "2026-06-01T10:00:00+00:00",
        "end_time": "2026-06-01T11:00:00+00:00",
        "location": "Main clinic",
        "status": "scheduled",
    }


def _unwrap_success_json(data: dict) -> dict | list:
    assert data.get("success") is True
    assert "data" in data
    return data["data"]


async def test_list_appointments_empty(client: AsyncClient) -> None:
    response = await client.get("/appointments/")
    assert response.status_code == 200
    body = response.json()
    assert _unwrap_success_json(body) == []


async def test_create_and_get_appointment(client: AsyncClient) -> None:
    payload = _sample_create_payload()
    create_res = await client.post("/appointments/", json=payload)
    assert create_res.status_code == 201
    created = _unwrap_success_json(create_res.json())
    assert created["title"] == payload["title"]
    assert created["id"] is not None
    appointment_id = created["id"]
    get_res = await client.get(f"/appointments/{appointment_id}")
    assert get_res.status_code == 200
    fetched = _unwrap_success_json(get_res.json())
    assert fetched["id"] == appointment_id
    assert fetched["title"] == payload["title"]


async def test_get_appointment_not_found(client: AsyncClient) -> None:
    response = await client.get("/appointments/99999")
    assert response.status_code == 404
    body = response.json()
    assert body["success"] is False
    assert body["data"] is None


async def test_list_pagination(client: AsyncClient) -> None:
    for i in range(3):
        await client.post(
            "/appointments/",
            json={
                **_sample_create_payload(),
                "title": f"Appt {i}",
            },
        )
    all_res = await client.get("/appointments/", params={"limit": 100})
    assert len(_unwrap_success_json(all_res.json())) == 3
    page = await client.get("/appointments/", params={"offset": 1, "limit": 2})
    assert len(_unwrap_success_json(page.json())) == 2


async def test_update_appointment(client: AsyncClient) -> None:
    create_res = await client.post("/appointments/", json=_sample_create_payload())
    appointment_id = _unwrap_success_json(create_res.json())["id"]
    patch_res = await client.patch(
        f"/appointments/{appointment_id}",
        json={"title": "Updated title", "status": "completed"},
    )
    assert patch_res.status_code == 200
    updated = _unwrap_success_json(patch_res.json())
    assert updated["title"] == "Updated title"
    assert updated["status"] == "completed"


async def test_update_not_found(client: AsyncClient) -> None:
    response = await client.patch(
        "/appointments/99999",
        json={"title": "Nope"},
    )
    assert response.status_code == 404
    assert response.json()["success"] is False


async def test_delete_appointment(client: AsyncClient) -> None:
    create_res = await client.post("/appointments/", json=_sample_create_payload())
    appointment_id = _unwrap_success_json(create_res.json())["id"]
    delete_res = await client.delete(f"/appointments/{appointment_id}")
    assert delete_res.status_code == 204
    assert delete_res.content == b""
    get_res = await client.get(f"/appointments/{appointment_id}")
    assert get_res.status_code == 404


async def test_delete_not_found(client: AsyncClient) -> None:
    response = await client.delete("/appointments/99999")
    assert response.status_code == 404
    assert response.json()["success"] is False


async def test_create_validation_error(client: AsyncClient) -> None:
    response = await client.post(
        "/appointments/",
        json={"title": ""},
    )
    # The common exception handler maps validation errors to 400.
    assert response.status_code == 400
    body = response.json()
    assert body["success"] is False
    assert body["data"] is None
and run them with
uv run pytest .
It was a long one, but we have our project ready. Build a wonderful app on this 😁
I'll build a user management module in the next article, so stay tuned.
The final project structure should look like
franky/
├── main.py # FastAPI app, lifespan, route registration
├── README.md
├── alembic.ini # Alembic configuration (DB URL for CLI migrations)
├── pyproject.toml # Dependencies, pytest options, and project metadata
├── uv.lock # Locked dependency versions
├── migrations/ # Alembic migration environment
│ ├── env.py # Async engine; SQLModel metadata for autogenerate
│ ├── script.py.mako # Template for new revision files
│ ├── README # Alembic-generated readme
│ └── versions/ # Revision scripts (e.g. initial schema)
├── tests/ # Pytest suite (`pythonpath` includes repo root)
│ ├── conftest.py # Test DB env, schema reset, async HTTP client fixture
│ └── test_appointments.py
└── src/
    ├── core/ # Shared infrastructure
    │   ├── config.py # Settings (env / `.env`)
    │   ├── dependencies.py # Async SQLAlchemy engine and session
    │   ├── exceptions.py # Exception handlers
    │   ├── logging.py
    │   ├── middlewares.py # Unified JSON response wrapper
    │   └── models.py # `IResponse`, pagination types
    └── appointments/ # Appointments domain
        ├── dependencies.py # Service DI
        ├── models.py # SQLModel entities and request/response schemas
        ├── router.py # HTTP routes under `/appointments`
        └── service.py # Business logic