Introduction
Ever wanted to build your own microblogging service like Twitter? In this blog post, I'll walk you through the creation of MicroTwitter, a backend project powered by FastAPI. Whether you're a developer looking to learn more about FastAPI or simply curious about how social media platforms work under the hood, this post has something for you. Let's dive in!
Prerequisites
Virtual environment
Create or navigate to the root of the project, for example:
mkdir twitter && cd twitter
python -m venv venv
source venv/bin/activate #Linux
venv\Scripts\activate #Windows
Keep the virtual environment activated throughout the blog post, as you will need to install multiple packages.
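If you are unsure whether the environment is really active, a quick check from Python can help. This is a minimal sketch; the function name in_virtualenv is my own and not part of the project.

```python
import sys


def in_virtualenv() -> bool:
    """Return True when the interpreter runs inside a venv-created environment."""
    # Inside a venv, sys.prefix points at the environment, while
    # sys.base_prefix still points at the base Python installation.
    return sys.prefix != sys.base_prefix


if __name__ == "__main__":
    print("virtual environment active:", in_virtualenv())
```

Run it with the same python executable you plan to use for pip install.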
Project Architecture
To get started, we need to define a solid project architecture that keeps everything separate, from our SQLAlchemy 2.0 models to our actual FastAPI routes.
.
├── docker-compose.yaml
├── Dockerfile
├── init.sql
├── README.md
├── requirements.dev.txt
├── requirements.txt
└── src
    ├── alembic
    │   ├── env.py
    │   ├── README
    │   ├── script.py.mako
    │   └── versions
    │       ├── 65272ae975a2_add_test_user.py
    │       └── fa5378814a19_initial_migration.py
    ├── alembic.ini
    ├── database
    │   ├── database.py
    │   ├── __init__.py
    │   └── utils.py
    ├── __init__.py
    ├── main.py
    ├── models
    │   ├── base.py
    │   ├── __init__.py
    │   ├── likes.py
    │   ├── media.py
    │   ├── tweets.py
    │   └── users.py
    ├── routes
    │   ├── __init__.py
    │   ├── media_route.py
    │   ├── tweet_route.py
    │   └── user_route.py
    ├── schemas
    │   ├── base_schema.py
    │   ├── exception_schema.py
    │   ├── __init__.py
    │   ├── media_schema.py
    │   ├── tweet_schema.py
    │   └── user_schema.py
    ├── setup.cfg
    ├── tests
    │   ├── conftest.py
    │   ├── __init__.py
    │   ├── test_media_route.py
    │   ├── test_tweet_route.py
    │   └── test_user_route.py
    └── utils
        ├── auth.py
        ├── exceptions.py
        ├── file_utils.py
        ├── __init__.py
        ├── loggerconf.py
        └── settings.py
10 directories, 45 files
The tree might seem large, but trust me, it's better to split the project into small, focused parts. It's far less confusing, and it lets us use absolute imports, which is good practice. Separating the code into modules brings benefits such as:
- Scalability - easier to modify and add new features to the codebase
- Modularity - breaking things down into different modules, where each part has a clear function
- Crippling depression and not leaving the house for days - you already know the benefits!
Anyway, let's move on to our models.
In particular, we are going to create a folder called models at src/models/:
.
├── base.py
├── __init__.py
├── likes.py
├── media.py
├── tweets.py
└── users.py
1 directory, 6 files
Create these files.
Let's start by breaking down base.py.
Models
Install the required packages
pip install sqlalchemy[asyncio] asyncpg
asyncpg is a database interface library designed specifically for connecting to PostgreSQL asynchronously.
Base Model
from typing import Any

from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm.exc import DetachedInstanceError


class Base(AsyncAttrs, DeclarativeBase):
    def __repr__(self) -> str:
        return self._repr(id=self.id)

    def _repr(self, **fields: Any) -> str:
        """
        Helper for __repr__
        """
        field_strings = list()
        at_least_one_attached_attribute = False
        for key, field in fields.items():
            try:
                field_strings.append(f"{key}={field!r}")
            except DetachedInstanceError:
                field_strings.append(f"{key}=DetachedInstanceError")
            else:
                at_least_one_attached_attribute = True
        if at_least_one_attached_attribute:
            return f"<{self.__class__.__name__}({','.join(field_strings)})>"
        return f"<{self.__class__.__name__} {id(self)}>"
Here, we just define a helper that makes writing __repr__ methods easy. You will see why I added it later on.
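To see the output format without a database, here is a plain-Python sketch of the same idea. PlainBase and PlainMedia are hypothetical stand-ins I made up for illustration; they do not use SQLAlchemy and skip the DetachedInstanceError handling.

```python
from typing import Any


class PlainBase:
    """Stand-in for the SQLAlchemy Base above, without any ORM machinery."""

    def _repr(self, **fields: Any) -> str:
        # Build "key=value" pairs using each value's own repr()
        parts = [f"{key}={value!r}" for key, value in fields.items()]
        return f"<{self.__class__.__name__}({','.join(parts)})>"


class PlainMedia(PlainBase):
    def __init__(self, id: int, media_path: str) -> None:
        self.id = id
        self.media_path = media_path

    def __repr__(self) -> str:
        # Each model only lists the fields it wants to show
        return self._repr(id=self.id, media_path=self.media_path)


print(repr(PlainMedia(1, "images/cat.png")))
# <PlainMedia(id=1,media_path='images/cat.png')>
```

Every model gets a readable repr by naming its fields once, which is handy when objects show up in logs.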
Media Model
from sqlalchemy import ForeignKey
from sqlalchemy.orm import Mapped, mapped_column

from src.models.base import Base


class Media(Base):
    __tablename__ = "media"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True, index=True)
    media_path: Mapped[str]
    tweet_id: Mapped[int] = mapped_column(ForeignKey("tweets.id"), nullable=True)

    def __repr__(self):
        return self._repr(
            id=self.id,
            media_path=self.media_path,
            tweet_id=self.tweet_id,
        )
This is a simple Media model. It could be customized further, for example with the @validates decorator on media_path to filter out unwanted file types.
tweet_id is a foreign key that can be set to NULL.
And as you can see, the _repr helper gives us an easy way to write __repr__.
Like Model
from sqlalchemy import ForeignKey
from sqlalchemy.orm import Mapped, mapped_column

from src.models.base import Base


class Like(Base):
    __tablename__ = "likes"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True, index=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False)
    tweet_id: Mapped[int] = mapped_column(ForeignKey("tweets.id"), nullable=False)

    def __repr__(self):
        return self._repr(
            id=self.id,
            user_id=self.user_id,
            tweet_id=self.tweet_id,  # key now matches the column name
        )
A very simple model with two foreign keys that cannot be set to NULL.
Tweet Model
from datetime import datetime
from typing import List

from sqlalchemy import ForeignKey, String, func
from sqlalchemy.orm import Mapped, mapped_column, relationship

from src.models.base import Base


class Tweet(Base):
    __tablename__ = "tweets"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True, index=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    create_date: Mapped[datetime] = mapped_column(server_default=func.now())
    tweet_data: Mapped[str] = mapped_column(String(2500))
    media: Mapped[List["Media"]] = relationship(backref="tweets", cascade="all, delete")
    likes: Mapped[List["Like"]] = relationship(backref="tweets", cascade="all, delete")

    def __repr__(self):
        return self._repr(
            id=self.id,
            user_id=self.user_id,
            create_date=self.create_date,
            tweet_data=self.tweet_data,
        )
This model is more interesting, as we have new fields like create_date and relationships that we have not seen before.
Let's break things down:
- create_date is set automatically by the database when a Tweet is created (server_default=func.now()). An example value is: 2023-09-23 20:37:26.444296
- tweet_data is simple text limited to 2500 characters.
- media and likes are relationships to the Media and Like models, so we can access the Tweet.media and Tweet.likes attributes. The backref also adds a reverse tweets attribute on Media and Like.
- cascade="all, delete" means that all Likes and Media associated with a Tweet are deleted when the Tweet is deleted.
User Model
from typing import List

from sqlalchemy import Column, ForeignKey, Integer, String, Table
from sqlalchemy.orm import Mapped, mapped_column, relationship

from src.models.base import Base
from src.models.likes import Like

# Needed import for creating the media model, sqlalchemy doesn't recognize other models otherwise
from src.models.media import Media
from src.models.tweets import Tweet

# Association table: one row per "follower follows following" pair
user_to_user = Table(
    "user_to_user",
    Base.metadata,
    Column("follower_id", Integer, ForeignKey("users.id"), primary_key=True),
    Column("following_id", Integer, ForeignKey("users.id"), primary_key=True),
)


class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True, index=True)
    api_key: Mapped[str] = mapped_column(String(255))
    username: Mapped[str] = mapped_column(String(50), unique=True, index=True)
    tweets: Mapped[List["Tweet"]] = relationship(
        backref="user", cascade="all, delete-orphan"
    )
    likes: Mapped[List["Like"]] = relationship(
        backref="user", cascade="all, delete-orphan"
    )
    # The collection holds User objects, so annotate it as List["User"]
    following: Mapped[List["User"]] = relationship(
        "User",
        secondary=user_to_user,
        primaryjoin=lambda: User.id == user_to_user.c.follower_id,
        secondaryjoin=lambda: User.id == user_to_user.c.following_id,
        backref="followers",
        lazy="selectin",
    )

    def __repr__(self):
        return self._repr(
            id=self.id,
            api_key=self.api_key,
            username=self.username,
        )
The last model, and probably the hardest one to create.
What we are doing here is establishing a self-referential many-to-many relationship. The SQLAlchemy documentation covers this pattern in more detail.
Let's break things down:
- api_key is a string limited to 255 characters.
- username is a unique, indexed string limited to 50 characters.
- tweets and likes use a cascade like before; the only difference is the word orphan. delete-orphan indicates that a child object must always belong to a parent: a child that is removed from its parent is deleted too. If a user gets deleted, it makes sense to delete all of their Tweets, Likes and Media as well.
- following is an explicit definition of a relationship through the user_to_user table. We need to specify primaryjoin and secondaryjoin because both columns, follower_id and following_id, point at users.id, so SQLAlchemy is unable to work out which column belongs to which side of the join. The backref attribute specifies that we also want a reverse relationship, so that we can access User.followers, and lazy="selectin" loads the related users with a second SELECT statement, so that all members of the related collection are loaded at once!
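To make the two joins less abstract, here is a rough sketch of the equivalent SQL using the standard library's sqlite3 instead of PostgreSQL and SQLAlchemy. The table and column names follow the models above; the third follow row (2 follows 1) and the function names are my own additions for illustration.

```python
import sqlite3
from typing import List

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT UNIQUE);
    CREATE TABLE user_to_user (
        follower_id INTEGER REFERENCES users(id),
        following_id INTEGER REFERENCES users(id),
        PRIMARY KEY (follower_id, following_id)
    );
    INSERT INTO users (id, username) VALUES
        (1, 'testuser1'), (2, 'testuser2'), (3, 'testuser3');
    INSERT INTO user_to_user VALUES (1, 3), (3, 1), (2, 1);
    """
)


def following(user_id: int) -> List[str]:
    # primaryjoin side: our user is the follower_id;
    # secondaryjoin side: the target user is the following_id
    rows = conn.execute(
        """
        SELECT target.username
        FROM user_to_user AS link
        JOIN users AS target ON target.id = link.following_id
        WHERE link.follower_id = ?
        ORDER BY target.id
        """,
        (user_id,),
    )
    return [name for (name,) in rows]


def followers(user_id: int) -> List[str]:
    # The reverse direction (what the backref gives us) swaps the two columns
    rows = conn.execute(
        """
        SELECT source.username
        FROM user_to_user AS link
        JOIN users AS source ON source.id = link.follower_id
        WHERE link.following_id = ?
        ORDER BY source.id
        """,
        (user_id,),
    )
    return [name for (name,) in rows]


print(following(1))  # ['testuser3']
print(followers(1))  # ['testuser2', 'testuser3']
```

The only difference between the two queries is which column filters and which column joins, and that is exactly the information primaryjoin and secondaryjoin hand to SQLAlchemy.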
Database package and setting up .env variables
Packages
pydantic-settings==2.0.3
loguru==0.7.2
asyncpg==0.28.0
Setting up .env variables
To keep our secrets safe and avoid hard-coding any of them into our application, we will be using pydantic_settings.
Create a Python file at src/utils/settings.py with the following code:
from functools import lru_cache
from pathlib import Path

from pydantic_settings import BaseSettings, SettingsConfigDict

BASE_DIR = Path(__file__).resolve().parent.parent.parent
ENV_PATH = BASE_DIR / ".env"
MEDIA_PATH = BASE_DIR / "media"


class PostgresSettings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=ENV_PATH,
        env_prefix="POSTGRES_",
        env_file_encoding="utf-8",
        case_sensitive=True,
    )

    HOST: str
    PORT: str
    DB_NAME: str
    USER: str
    PASSWORD: str


class ServerSettings(BaseSettings):
    """
    Server-Side configuration for uvicorn
    """

    model_config = SettingsConfigDict(
        env_file=ENV_PATH,
        env_prefix="SERVER_",
        env_file_encoding="utf-8",
        case_sensitive=True,
    )

    LOG_LEVEL: str
    HOST: str
    PORT: str
    DEBUG: bool
    PRODUCTION: bool


class TestSettings(BaseSettings):
    """
    Testing configuration
    """

    model_config = SettingsConfigDict(
        env_file=ENV_PATH,
        env_prefix="TEST_",
        env_file_encoding="utf-8",
        case_sensitive=True,
    )

    DB_NAME: str
    API_KEY: str
    USERNAME: str


class LoggerSettings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=ENV_PATH,
        env_prefix="LOGGER_",
        env_file_encoding="utf-8",
        case_sensitive=True,
        extra="allow",
    )

    LEVEL: str
    ROTATION: str
    COMPRESSION: str
    SERIALIZE: bool
    BACKTRACE: bool


@lru_cache()
def get_pg_settings():
    return PostgresSettings()


@lru_cache()
def get_server_settings():
    return ServerSettings()


@lru_cache()
def get_test_settings():
    return TestSettings()


@lru_cache()
def get_logger_settings():
    return LoggerSettings()
Here, BASE_DIR points at the root of our project, so that we can find the .env file and set up MEDIA_PATH, which we'll need for storing user media later on.
Here's what the .env should look like. Note that it's case sensitive:
SERVER_HOST=localhost
SERVER_PORT=8000
SERVER_LOG_LEVEL=debug
SERVER_DEBUG=True
SERVER_PRODUCTION=False
LOGGER_LEVEL=DEBUG
LOGGER_ROTATION='1 week'
LOGGER_COMPRESSION=zip
LOGGER_BACKTRACE=True
LOGGER_SERIALIZE=True
POSTGRES_HOST=database
POSTGRES_PORT=5432
POSTGRES_DB_NAME=twitter
POSTGRES_USER=admin
POSTGRES_PASSWORD=admin
TEST_DB_NAME=test
TEST_API_KEY=APITEST
TEST_USERNAME=testuser
These variables gave me flexibility while building the project and allowed me to separate the production and testing environments.
Great news: we probably won't have to worry about this file ever again after this configuration.
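If the prefix mechanism feels like magic, the core idea can be sketched in a few lines of plain Python. This is not how pydantic-settings is implemented (it also validates and converts types such as bool); load_prefixed, get_pg_values and SAMPLE_ENV are hypothetical names used only for this illustration.

```python
from functools import lru_cache
from typing import Dict, Mapping

# A few values copied from the .env above, plus one with the wrong case
SAMPLE_ENV = {
    "SERVER_HOST": "localhost",
    "SERVER_PORT": "8000",
    "POSTGRES_HOST": "database",
    "POSTGRES_PORT": "5432",
    "postgres_user": "admin",  # lowercase prefix, so it will not match
}


def load_prefixed(env: Mapping[str, str], prefix: str) -> Dict[str, str]:
    """Collect the variables that start with prefix and strip the prefix."""
    return {
        key[len(prefix):]: value
        for key, value in env.items()
        if key.startswith(prefix)  # str.startswith is case sensitive
    }


@lru_cache()
def get_pg_values() -> Dict[str, str]:
    # lru_cache means the values are built once and the same object is reused
    return load_prefixed(SAMPLE_ENV, "POSTGRES_")


print(get_pg_values())                     # {'HOST': 'database', 'PORT': '5432'}
print(get_pg_values() is get_pg_values())  # True
```

The second print shows why the getters in settings.py are wrapped in lru_cache: every caller gets the same settings object instead of re-reading the file.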
Database Folder Setup
Create a folder named database with the path of `src/database`
with the following tree:
.
├── database.py
├── __init__.py
└── utils.py
1 directory, 3 files
Do not worry about the utils.py file; it will come in handy later on.
For now, just open up the database.py file.
from typing import AsyncGenerator

from loguru import logger
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

from src.utils.settings import PostgresSettings, get_pg_settings

settings: PostgresSettings = get_pg_settings()

DATABASE_URL = (
    f"postgresql+asyncpg://{settings.USER}:{settings.PASSWORD}@"
    f"{settings.HOST}:{settings.PORT}/{settings.DB_NAME}"
)

engine = create_async_engine(DATABASE_URL, echo=True)
async_session = async_sessionmaker(engine, expire_on_commit=False)


async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
    async with async_session() as session:
        try:
            yield session
        except SQLAlchemyError as error:
            await session.rollback()
            logger.exception(error)
Here's where pydantic_settings pays off. It allowed me to easily separate the POSTGRES, SERVER, LOGGER and TEST .env variables, which means that everything is nice and tidy.
Little breakdown:
- get_db_session is a generator that hands a session to our routes. In case of an SQLAlchemy error, it rolls the session back and logs the exception, which lets us debug more effectively.
- DATABASE_URL is needed for the engine and for the Alembic migrations, which are coming up now.
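The rollback-on-error flow is easier to follow with the database taken out of the picture. The sketch below uses only asyncio and made-up stand-ins (FakeSession, FakeDBError, get_fake_session); it simulates a caller that throws an error back into the generator at the yield, which is roughly what happens when a route fails.

```python
import asyncio
from typing import AsyncGenerator


class FakeDBError(Exception):
    """Stand-in (hypothetical) for sqlalchemy.exc.SQLAlchemyError."""


class FakeSession:
    """Stand-in for AsyncSession that only records what happened to it."""

    def __init__(self) -> None:
        self.rolled_back = False
        self.closed = False

    async def __aenter__(self) -> "FakeSession":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        self.closed = True
        return False  # never swallow exceptions here

    async def rollback(self) -> None:
        self.rolled_back = True


async def get_fake_session(session: FakeSession) -> AsyncGenerator[FakeSession, None]:
    # Same shape as get_db_session: open, yield, roll back on a database error
    async with session as active:
        try:
            yield active
        except FakeDBError:
            await active.rollback()


async def simulate_failing_request() -> FakeSession:
    session = FakeSession()
    gen = get_fake_session(session)
    await gen.__anext__()  # run the generator up to the yield
    try:
        # Throw the error into the generator at the point where it yielded
        await gen.athrow(FakeDBError("insert failed"))
    except StopAsyncIteration:
        pass  # the generator handled the error and finished normally
    return session


result = asyncio.run(simulate_failing_request())
print(result.rolled_back, result.closed)  # True True
```

Note that the session is closed either way, because leaving the async with block always runs __aexit__.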
Alembic migrations
Make sure your virtual env is activated
Setting up
Install alembic via:
pip install alembic==1.12.0
Generate an async template inside the src/ folder via:
alembic init --template async ./alembic
You can change the name of the folder to anything, e.g. ./migrations
Now that we have our alembic folder and the alembic.ini file,
let's move on to modifying them.
Open the alembic.ini file and change prepend_sys_path to:
prepend_sys_path = ..
Done!
Let's move on to the alembic/env.py file.
Here's what we have to set up for our first automatic migration.
import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)
from src.database.database import DATABASE_URL
config.set_main_option("sqlalchemy.url", DATABASE_URL)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
from src.models.base import Base
from src.models.likes import Like
from src.models.media import Media
from src.models.tweets import Tweet
from src.models.users import User
target_metadata = Base.metadata
We need DATABASE_URL here, which is why we had to set up the .env settings first.
We import all the models here because, unfortunately, Alembic was generating empty migrations if I didn't.
Next run:
alembic revision --autogenerate -m "Initial migrations"
which will generate a migration file under alembic/versions/.
That's done, so let's move on to a migration that is not auto-generated.
You can put any message after -m.
- Run
alembic revision -m "Add Test Users"
- Modify the empty migration, which will be stored at
alembic/versions/number_add_test_user.py
to:
"""add_test_user
Revision ID: 65272ae975a2
Revises: fa5378814a19
Create Date: 2023-09-19 14:17:16.495358
"""
from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision: str = "65272ae975a2"
down_revision: Union[str, None] = "fa5378814a19"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    users_table = sa.table(
        "users", sa.column("username", sa.String), sa.column("api_key", sa.String)
    )
    op.bulk_insert(
        users_table,
        [
            {"username": "testuser1", "api_key": "test"},
            {"username": "testuser2", "api_key": "test2"},
            {"username": "testuser3", "api_key": "test3"},
        ],
    )
    users_to_users_table = sa.table(
        "user_to_user",
        sa.column("follower_id", sa.Integer),
        sa.column("following_id", sa.Integer),
    )
    op.bulk_insert(
        users_to_users_table,
        [
            {"follower_id": 1, "following_id": 3},
            {"follower_id": 3, "following_id": 1},
        ],
    )


def downgrade() -> None:
    # Delete the follow rows first: they reference users.id through foreign
    # keys, so removing the users before them would violate the constraint.
    users_to_users_table = sa.table(
        "user_to_user",
        sa.column("follower_id", sa.Integer),
        sa.column("following_id", sa.Integer),
    )
    op.execute(
        users_to_users_table.delete().where(
            users_to_users_table.c.follower_id.in_([1, 3])
        )
    )
    users_table = sa.table(
        "users", sa.column("username", sa.String), sa.column("api_key", sa.String)
    )
    op.execute(
        users_table.delete().where(
            users_table.c.username.in_(["testuser1", "testuser2", "testuser3"])
        )
    )
This is a basic migration where we insert some test users:
[
{"username": "testuser1", "api_key": "test"},
{"username": "testuser2", "api_key": "test2"},
{"username": "testuser3", "api_key": "test3"},
],
and then make them follow each other here:
{"follower_id": 1, "following_id": 3},
{"follower_id": 3, "following_id": 1},
which gives us enough test data to work with and continue further on our journey!
P.S. Run the upgrade via:
alembic upgrade head
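If you are wondering what "head" refers to: each migration file points at its predecessor through down_revision, so the files form a chain. The toy model below walks that chain using the two revision IDs from this post. It is only an illustration of the idea, not Alembic's actual implementation, and upgrade_order is a name I made up.

```python
from typing import Dict, List, Optional

# revision id -> down_revision, taken from the two migrations in this post
REVISIONS: Dict[str, Optional[str]] = {
    "fa5378814a19": None,            # initial migration
    "65272ae975a2": "fa5378814a19",  # add test users
}


def upgrade_order(revisions: Dict[str, Optional[str]]) -> List[str]:
    """Return the revisions from base to head by following down_revision."""
    # The head is the one revision that no other revision points back to
    parents = set(revisions.values())
    heads = [rev for rev in revisions if rev not in parents]
    if len(heads) != 1:
        raise ValueError("expected exactly one head in a linear history")

    order: List[str] = []
    current: Optional[str] = heads[0]
    while current is not None:
        order.append(current)
        current = revisions[current]
    return list(reversed(order))


print(upgrade_order(REVISIONS))  # ['fa5378814a19', '65272ae975a2']
```

So upgrading to head applies the initial migration first and the test-user migration second.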
Moving on: if you are not depressed already, prepare for the punch line. We are only getting started.
Auth
Since I didn't want to use JWT auth or OAuth2 and deal with usernames and passwords, I just created an API key that can be given out to your friends, family, or dog, who knows.
Anyway, create a file at the following path: src/utils/auth.py
from fastapi import Depends, HTTPException, Security, status
from fastapi.security import APIKeyHeader
from sqlalchemy.ext.asyncio import AsyncSession

from src.database.database import get_db_session
from src.database.utils import get_user_by_api_key

API_KEY_HEADER = APIKeyHeader(name="api-key")


async def authenticate_user(
    api_key: str = Security(API_KEY_HEADER),
    session: AsyncSession = Depends(get_db_session),
):
    """Check if user exists otherwise raise errors"""
    user = await get_user_by_api_key(api_key, session)
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="API key authentication failed",
            headers={"api-key": ""},
        )
    return user
That is the auth.py file. Now let me show you the code in src/database/utils.py for the get_user_by_api_key function:
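from fastapi import Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from src.database.database import get_db_session
from src.models.users import User


async def get_user_by_api_key(
    api_key: str, session: AsyncSession = Depends(get_db_session)
):
    # Load the user together with both sides of the follow relationship
    query = (
        select(User)
        .where(User.api_key == api_key)
        .options(
            selectinload(User.following),
            selectinload(User.followers),
        )
    )
    user = await session.execute(query)
    return user.scalar_one_or_none()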
Nothing too hard: we check whether the api_key matches any user and return that user if it does; otherwise we return None.
And on that note, our "authentication" process is done.
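The post uses short hand-written keys like "test" for the test users. If you want to hand out real keys, one option is to generate them with the standard library's secrets module. This is a suggestion of mine rather than something the project does, and generate_api_key is a hypothetical helper name.

```python
import secrets


def generate_api_key(num_bytes: int = 32) -> str:
    """Return a random, URL-safe API key."""
    # 32 random bytes encode to roughly 43 characters, which fits
    # comfortably inside the String(255) api_key column.
    return secrets.token_urlsafe(num_bytes)


if __name__ == "__main__":
    print(generate_api_key())
```

Store the returned value in the api_key column and send it in the api-key header.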
Want to break your computer yet?
Yeah, we are only about 20% of the way through.
Moving on:
First Route (Media)
Packages
fastapi==0.103.1
aiofiles==23.2.1
Media schema
In FastAPI, we use schemas to serialize and validate the data, where Django would use serializers.
- Create the following files: src/schemas/base_schema.py and src/schemas/media_schema.py
Base schema
from pydantic import BaseModel, ConfigDict


class DefaultSchema(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    result: bool = True
I just wanted result: True in every response. You don't technically need it, but I'll include it here anyway.
Media Schema
from pydantic import ConfigDict, Field

from src.schemas.base_schema import DefaultSchema


class MediaUpload(DefaultSchema):
    # Serialized as "media_id"; populate_by_name also accepts "id" as input
    id: int = Field(alias="media_id")

    model_config = ConfigDict(from_attributes=True, populate_by_name=True)
File Utils
src/utils/file_utils.py
from pathlib import Path

from aiofiles import open
from fastapi import UploadFile

from src.utils.settings import MEDIA_PATH


async def check_or_get_filename(path: Path) -> Path:
    """
    Adds a numerical suffix to the filename if a file with the same name already exists.

    :param path: The path to check and modify.
    :return: The modified path with a numerical suffix.
    """
    original_path = path
    counter = 0
    while path.exists():
        counter += 1
        filename = f"{original_path.stem} ({counter}){original_path.suffix}"
        path = original_path.with_name(filename)
    return path


async def save_uploaded_file(uploaded_file: UploadFile) -> str:
    """
    Uploads a file and returns the relative path

    :param uploaded_file: The FastAPI UploadFile object representing the uploaded file.
    :return: The relative path to the saved file.
    :raises: Any exceptions that may occur during file upload and storage.
    """
    MEDIA_PATH.mkdir(parents=True, exist_ok=True)
    file_path = MEDIA_PATH / uploaded_file.filename
    filename = await check_or_get_filename(path=file_path)
    img_path = f"images/{filename.stem}{filename.suffix}"
    # Use UploadFile's async read instead of the blocking file.read()
    content = await uploaded_file.read()
    async with open(filename, "wb") as file:
        await file.write(content)
    return img_path
The code probably speaks for itself, but let me break down one thing:
- check_or_get_filename essentially checks whether a file with that name already exists and, if so, returns a path with a numeric suffix:
- 1.py already exists
- Somebody else uploads 1.py to our server
- The function returns 1 (1).py to keep content with the same filename separate
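You can try the naming logic on its own, without FastAPI or aiofiles. The sketch below is a synchronous copy of the loop from check_or_get_filename, run against a temporary directory; next_free_path and demo are names I chose for this example.

```python
import tempfile
from pathlib import Path
from typing import List


def next_free_path(path: Path) -> Path:
    """Synchronous copy of the suffix logic in check_or_get_filename."""
    original = path
    counter = 0
    while path.exists():
        counter += 1
        path = original.with_name(f"{original.stem} ({counter}){original.suffix}")
    return path


def demo() -> List[str]:
    # "Upload" a file named 1.py three times into an empty directory
    with tempfile.TemporaryDirectory() as tmp:
        target = Path(tmp) / "1.py"
        names = []
        for _ in range(3):
            free = next_free_path(target)
            free.write_text("print('hello')\n")
            names.append(free.name)
        return names


print(demo())  # ['1.py', '1 (1).py', '1 (2).py']
```

The first upload keeps its name, and each later upload gets the next free counter.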
Anyways that's done.
Creating the actual media route!
Well with all of that done, we can actually create the media route now.
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException, UploadFile, status
from loguru import logger
from sqlalchemy.ext.asyncio import AsyncSession

from src.database.database import get_db_session
from src.models.media import Media
from src.models.users import User
from src.schemas.media_schema import MediaUpload
from src.utils.auth import authenticate_user
from src.utils.file_utils import save_uploaded_file

router = APIRouter(prefix="/api", tags=["media_v1"])


@router.post("/medias", status_code=status.HTTP_201_CREATED, response_model=MediaUpload)
async def upload_media(
    file: UploadFile,
    user: Annotated[User, "User model obtained from the api key"] = Depends(
        authenticate_user
    ),
    session: AsyncSession = Depends(get_db_session),
):
    try:
        # Store the upload on disk and keep only its relative path
        media_path = await save_uploaded_file(file)
        new_media = Media(media_path=media_path)
        session.add(new_media)
        await session.commit()
        return new_media
    except ValueError as exc:
        logger.exception(exc)
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc))
WTF is this?
- We save the file and perform the filename check from the File Utils section.
- We create a new Media object, add it to our session and commit.
- We catch any ValueError, log it with logger.exception() and return a 400 response.
We still have a lot to do, but I have already spent too much time writing this.
So stay tuned!
I gotta go Leetcode, man.
Let me know what I should improve in the next one.
Source Code
My Github