DEV Community

Cover image for From Zero to MicroTwitter: A FastAPI Journey
r00t
r00t

Posted on

From Zero to MicroTwitter: A FastAPI Journey

Introduction


Ever wanted to build your own microblogging service like Twitter? In this blog post, I'll walk you through the creation of MicroTwitter, a backend project powered by . Whether you're a developer looking to learn more about FastAPI or simply curious about how social media platforms work under the hood, this post has something for you. Let's dive in!

Prerequisites

Virtual environment

Create or navigate to the root of our project.
EG

mkdir twitter && cd twitter
Enter fullscreen mode Exit fullscreen mode
python -m venv venv
Enter fullscreen mode Exit fullscreen mode
source venv/bin/activate #Linux
venv\Scripts\activate  #Windows
Enter fullscreen mode Exit fullscreen mode

Keep the virtual environment activated throughout the blog post as you will need to install multiple packages


Project Architechture

To get started, we will need to define a solid project architechture to separate everything, from our SQLALCHEMY 2.0 to our actual FASTAPI routes

.
├── docker-compose.yaml
├── Dockerfile
├── init.sql
├── README.md
├── requirements.dev.txt
├── requirements.txt
└── src
    ├── alembic
    │   ├── env.py
    │   ├── README
    │   ├── script.py.mako
    │   └── versions
    │       ├── 65272ae975a2_add_test_user.py
    │       └── fa5378814a19_initial_migration.py
    ├── alembic.ini
    ├── database
    │   ├── database.py
    │   ├── __init__.py
    │   └── utils.py
    ├── __init__.py
    ├── main.py
    ├── models
    │   ├── base.py
    │   ├── __init__.py
    │   ├── likes.py
    │   ├── media.py
    │   ├── tweets.py
    │   └── users.py
    ├── routes
    │   ├── __init__.py
    │   ├── media_route.py
    │   ├── tweet_route.py
    │   └── user_route.py
    ├── schemas
    │   ├── base_schema.py
    │   ├── exception_schema.py
    │   ├── __init__.py
    │   ├── media_schema.py
    │   ├── tweet_schema.py
    │   └── user_schema.py
    ├── setup.cfg
    ├── tests
    │   ├── conftest.py
    │   ├── __init__.py
    │   ├── test_media_route.py
    │   ├── test_tweet_route.py
    │   └── test_user_route.py
    └── utils
        ├── auth.py
        ├── exceptions.py
        ├── file_utils.py
        ├── __init__.py
        ├── loggerconf.py
        └── settings.py

10 directories, 45 files


Enter fullscreen mode Exit fullscreen mode

Although, the tree might seem large. Trust me, its better to separate each little part into another little part. It's way less confusing and makes our imports absolute which is good practice. By separating each module into different parts. we allow benefits into our life such as:

  1. Scalibity - Easier to modify and add new features to the codebase
  2. Modularity - breaking things down into different modules, where each part has a clear function
  3. Crippling Depression and not leaving the house for days - You already know the benefits!

Anyways. lets move on to our models.
In particular, we are going to create a folder called models
with the path of src/models/

.
├── base.py
├── __init__.py
├── likes.py
├── media.py
├── tweets.py
└── users.py

1 directory, 6 files
Enter fullscreen mode Exit fullscreen mode

And then create these files.
Lets start by breaking down the base.py first.

Models

Install the required packages

pip install sqlalchemy[asyncio] asyncpg
Enter fullscreen mode Exit fullscreen mode

Asyncpg is just a database interface library that is specifically designed to connect to PostgreSQL asyncronously.

Base Model

from typing import Any

from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm.exc import DetachedInstanceError


class Base(AsyncAttrs, DeclarativeBase):
    def __repr__(self) -> str:
        return self._repr(id=self.id)

    def _repr(self, **fields: Any) -> str:
        """
        Helper for __repr__
        """
        field_strings = list()
        at_least_one_attached_attribute = False
        for key, field in fields.items():
            try:
                field_strings.append(f"{key}={field!r}")
            except DetachedInstanceError:
                field_strings.append(f"{key}=DetachedInstanceError")
            else:
                at_least_one_attached_attribute = True
        if at_least_one_attached_attribute:
            return f"<{self.__class__.__name__}({','.join(field_strings)})>"
        return f"<{self.__class__.__name__} {id(self)}>"

Enter fullscreen mode Exit fullscreen mode

Here, we just define a way of using the repr method with ease. You will see why i put in this method later on.

Media Model

from sqlalchemy import ForeignKey
from sqlalchemy.orm import Mapped, mapped_column

from src.models.base import Base


class Media(Base):
    __tablename__ = "media"
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True, index=True)

    media_path: Mapped[str]
    tweet_id: Mapped[int] = mapped_column(ForeignKey("tweets.id"), nullable=True)

    def __repr__(self):
        return self._repr(
            id=self.id,
            media_path=self.media_path,
            tweet_id=self.tweet_id,
        )
Enter fullscreen mode Exit fullscreen mode

This a simple Media Model that could be customized further with, the @validates tag in the media_path to filter out any unwanted file types.
tweet_id is just a foreign key that can be set to Null.
And as you can see here we have an easy way to use the repr method.

Like Model

from sqlalchemy import ForeignKey
from sqlalchemy.orm import Mapped, mapped_column

from src.models.base import Base


class Like(Base):
    __tablename__ = "likes"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True, index=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False)
    tweet_id: Mapped[int] = mapped_column(ForeignKey("tweets.id"), nullable=False)

    def __repr__(self):
        return self._repr(
            id=self.id,
            user_id=self.user_id,
            tweets_id=self.tweet_id,
        )

Enter fullscreen mode Exit fullscreen mode

Very Simple Model, that has two foreign keys that cannot be set to Null

Tweet Model

from datetime import datetime
from typing import List

from sqlalchemy import ForeignKey, String, func
from sqlalchemy.orm import Mapped, mapped_column, relationship

from src.models.base import Base


class Tweet(Base):
    __tablename__ = "tweets"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True, index=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    create_date: Mapped[datetime] = mapped_column(server_default=func.now())
    tweet_data: Mapped[str] = mapped_column(String(2500))
    media: Mapped[List["Media"]] = relationship(backref="tweets", cascade="all, delete")
    likes: Mapped[List["Like"]] = relationship(backref="tweets", cascade="all, delete")

    def __repr__(self):
        return self._repr(
            id=self.id,
            user_id=self.user_id,
            create_date=self.create_date,
            tweet_data=self.tweet_data,
        )

Enter fullscreen mode Exit fullscreen mode

This model is more interesting as we have new variables like create_date and new relationships that we did not see before.
Lets break things down:

  • create_date is a field that will automatically be created during the creation of a Tweet. An example is: 2023-09-23 20:37:26.444296
    • tweet_data is simple text limited to 2500 characters.
    • media and likes establish a reverse relationship to the Tweet model, so we can access the Tweet.media and Tweet.likes attributes.
    • Cascade, all - means that all Likes and Media that is associated with the Tweet will be deleted if a Tweet is deleted.

User Model

from typing import List

from sqlalchemy import Column, ForeignKey, Integer, String, Table
from sqlalchemy.orm import Mapped, mapped_column, relationship

from src.models.base import Base
from src.models.likes import Like

# Needed import for creating the media model, sqlalchemy doesn't recognize other models otherwise
from src.models.media import Media
from src.models.tweets import Tweet

user_to_user = Table(
    "user_to_user",
    Base.metadata,
    Column("follower_id", Integer, ForeignKey("users.id"), primary_key=True),
    Column("following_id", Integer, ForeignKey("users.id"), primary_key=True),
)


class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True, index=True)
    api_key: Mapped[str] = mapped_column(String(255))
    username: Mapped[str] = mapped_column(String(50), unique=True, index=True)

    tweets: Mapped[List["Tweet"]] = relationship(
        backref="user", cascade="all, delete-orphan"
    )
    likes: Mapped[List["Like"]] = relationship(
        backref="user", cascade="all, delete-orphan"
    )

    following: Mapped[List["None"]] = relationship(
        "User",
        secondary=user_to_user,
        primaryjoin=lambda: User.id == user_to_user.c.follower_id,
        secondaryjoin=lambda: User.id == user_to_user.c.following_id,
        backref="followers",
        lazy="selectin",
    )

    def __repr__(self):
        return self._repr(
            id=self.id,
            api_key=self.api_key,
            username=self.username,
        )

Enter fullscreen mode Exit fullscreen mode

The last model and probably the hardest to create out of all.
What we are doing here is establishinng a self-referential many-to-many relationship
Read more Here

Lets break things down:

  1. api_key is a string limited to 255 characters
  2. username is a string that is unique and and index limited to 50 characters.
  3. tweets and likes the only difference is the word orphan which is indicates that the child object is to follow the Parent object at all times. If a user gets deleted, then it makes sense to delete all his Tweets, Likes and Media, as well
  4. following is an explicit definition of a relationship, user_to_user, we need to specify primaryjoin and secondaryjoin because sqlalchemy is unable to which columns to connect with, eg follower_id or following_id, therefore we explicitly have to state the joins. The backref attribute specifies that we also want to create a reverse relationship to be able to access, User.followers and lazy=selectin allows us to have a second SELECT statement, so that all members of our related models are loaded at once!

Database package and setting up .env variables

Packages

pydantic-settings==2.0.3
loguru==0.7.2
asyncpg==0.28.0
Enter fullscreen mode Exit fullscreen mode

Setting up .env variables

To keep our secrets safe and avoid hard coding any of them into our application, we will be using pydantic_settings
Create a python file with the following path of src/settings with the following code:

from functools import lru_cache
from pathlib import Path

from pydantic_settings import BaseSettings, SettingsConfigDict

BASE_DIR = Path(__file__).resolve().parent.parent.parent
ENV_PATH = BASE_DIR / ".env"
MEDIA_PATH = BASE_DIR / "media"


class PostgresSettings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=ENV_PATH,
        env_prefix="POSTGRES_",
        env_file_encoding="utf-8",
        case_sensitive=True,
    )
    HOST: str
    PORT: str
    DB_NAME: str
    USER: str
    PASSWORD: str


class ServerSettings(BaseSettings):
    """
    Server-Side configuration for uvicorn
    """

    model_config = SettingsConfigDict(
        env_file=ENV_PATH,
        env_prefix="SERVER_",
        env_file_encoding="utf-8",
        case_sensitive=True,
    )
    LOG_LEVEL: str
    HOST: str
    PORT: str
    DEBUG: bool
    PRODUCTION: bool


class TestSettings(BaseSettings):
    """
    Testing configuration
    """

    model_config = SettingsConfigDict(
        env_file=ENV_PATH,
        env_prefix="TEST_",
        env_file_encoding="utf-8",
        case_sensitive=True,
    )

    DB_NAME: str
    API_KEY: str
    USERNAME: str


class LoggerSettings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=ENV_PATH,
        env_prefix="LOGGER_",
        env_file_encoding="utf-8",
        case_sensitive=True,
        extra="allow",
    )

    LEVEL: str
    ROTATION: str
    COMPRESSION: str
    SERIALIZE: bool
    BACKTRACE: bool


@lru_cache()
def get_pg_settings():
    return PostgresSettings()


@lru_cache()
def get_server_settings():
    return ServerSettings()


@lru_cache()
def get_test_settings():
    return TestSettings()


@lru_cache()
def get_logger_settings():
    return LoggerSettings()

Enter fullscreen mode Exit fullscreen mode

Here, BASE_DIR is pointing at the root of our project, so that we are able to collect the .env file, as well as setup the MEDIA_PATH which we'll need for storing user media later on.

Here's what the .env should look like, its case sensitive:

SERVER_HOST=localhost
SERVER_PORT=8000
SERVER_LOG_LEVEL=debug
SERVER_DEBUG=True
SERVER_PRODUCTION=False


LOGGER_LEVEL=DEBUG
LOGGER_ROTATION='1 week'
LOGGER_COMPRESSION=zip
LOGGER_BACKTRACE=True
LOGGER_SERIALIZE=True

POSTGRES_HOST=database
POSTGRES_PORT=5432
POSTGRES_DB_NAME=twitter
POSTGRES_USER=admin
POSTGRES_PASSWORD=admin

TEST_DB_NAME=test
TEST_API_KEY=APITEST
TEST_USERNAME=testuser

Enter fullscreen mode Exit fullscreen mode

These are the variables that allowed for flexibility during the creation of the project and allowed me to separate production environments and testing environments.
Great news, we probably won't have to worry about this file ever again after this configuration.

Database Folder Setup

Create a folder named database with the path of `src/database`
with the following tree:
.
├── database.py
├── __init__.py
└── utils.py

1 directory, 3 files

Enter fullscreen mode Exit fullscreen mode

Do not worry about the utils.py file, this will come in handy later on
For now, just open up the database.py file.

from typing import AsyncGenerator

from loguru import logger
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

from src.utils.settings import PostgresSettings, get_pg_settings

settings: PostgresSettings = get_pg_settings()

DATABASE_URL = (
    f"postgresql+asyncpg://{settings.USER}:{settings.PASSWORD}@"
    f"{settings.HOST}:{settings.PORT}/{settings.DB_NAME}"
)


engine = create_async_engine(DATABASE_URL, echo=True)
async_session = async_sessionmaker(engine, expire_on_commit=False)


async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
    async with async_session() as session:
        try:
            yield session
        except SQLAlchemyError as error:
            await session.rollback()
            logger.exception(error)

Enter fullscreen mode Exit fullscreen mode

Here's where the pydantic_settings are useful. It allowed me to easily separate POSTGRES, SERVER, LOGGING and TESTING .env variables, which means that everything is nice and tidy.
Little breakdown:

  1. get_db_session is just a generator that gives our session to us for usage in our routes and in case of an error, it rolls it back and log's the exception, therefore allowing us to debug more effectively.
    1. DATABASE_URL is needed for the engine and the alembic migrations which are coming. Now.

Alembic migrations

Make sure your virtual env is activated

Setting up

Install alembic via

pip install alembic
pip install alembic==1.12.0
Enter fullscreen mode Exit fullscreen mode

Generate an async template inside the src/ folder via:

alembic init --template async ./alembic
Enter fullscreen mode Exit fullscreen mode

*You can change the name of the folder to anything, eg ./migrations

Now that we have our alembic folder and the alembic.ini file,
let's move on to modifying them.
Open the alembic.ini file and change this prepend_sys_path
to:

prepend_sys_path = ..
Enter fullscreen mode Exit fullscreen mode

Done!
Let's move onto the alembic/env.py file.

here's what we have to set up for our first automatic migration.

import asyncio
from logging.config import fileConfig

from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config

from alembic import context

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

from src.database.database import DATABASE_URL

config.set_main_option("sqlalchemy.url", DATABASE_URL)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata

from src.models.base import Base
from src.models.likes import Like
from src.models.media import Media
from src.models.tweets import Tweet
from src.models.users import User

target_metadata = Base.metadata

Enter fullscreen mode Exit fullscreen mode

We needed the DATABASE_URL, so we had to setup the .env SETTINGS!
We import all the models here, because unfortunately It was generating empty migrations if i didn't.
Next run:
alembic revision --autogenerate -m "Initial migrations"
which will generate somthing like This

That's done, so let's move onto the the migration that is not auto generated:
You can obviously put any msg into there via -m

  1. Run alembic revision -m "Add Test Users"
  2. Modify the empty migrations which will be stored at alembic/versions/number_add_test_user.py to:
"""add_test_user

Revision ID: 65272ae975a2
Revises: fa5378814a19
Create Date: 2023-09-19 14:17:16.495358

"""
from typing import Sequence, Union

import sqlalchemy as sa

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "65272ae975a2"
down_revision: Union[str, None] = "fa5378814a19"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    users_table = sa.table(
        "users", sa.column("username", sa.String), sa.column("api_key", sa.String)
    )

    op.bulk_insert(
        users_table,
        [
            {"username": "testuser1", "api_key": "test"},
            {"username": "testuser2", "api_key": "test2"},
            {"username": "testuser3", "api_key": "test3"},
        ],
    )

    users_to_users_table = sa.table(
        "user_to_user",
        sa.column("follower_id", sa.Integer),
        sa.column("following_id", sa.Integer),
    )

    op.bulk_insert(
        users_to_users_table,
        [
            {"follower_id": 1, "following_id": 3},
            {"follower_id": 3, "following_id": 1},
        ],
    )


def downgrade() -> None:
    users_table = sa.table(
        "users", sa.column("username", sa.String), sa.column("api_key", sa.String)
    )
    op.execute(
        users_table.delete().where(
            users_table.c.username.in_(["testuser1", "testuser2", "testuser3"])
        )
    )

    users_to_users_table = sa.table(
        "user_to_user",
        sa.column("follower_id", sa.Integer),
        sa.column("following_id", sa.Integer),
    )
    op.execute(
        users_to_users_table.delete().where(
            users_to_users_table.c.follower_id.in_([1, 3])
        )
    )

Enter fullscreen mode Exit fullscreen mode

This is a basic migration where we insert some test users:

  [
            {"username": "testuser1", "api_key": "test"},
            {"username": "testuser2", "api_key": "test2"},
            {"username": "testuser3", "api_key": "test3"},
        ],
Enter fullscreen mode Exit fullscreen mode

and their make them follow each other here:

            {"follower_id": 1, "following_id": 3},
            {"follower_id": 3, "following_id": 1},
Enter fullscreen mode Exit fullscreen mode

which gives us enough test data to work with and continue further on our journey!

P.S run migrations upgrade via:
alembic upgrade head

Movin on, if you are not depressed already, prepare for the punch line. We are only getting started.

Meme Image

Auth

Since, I didn't want to use Jwt auth or Oauth2 and deal with the usernames and passswords, i just created an api-key that is can be given out to your friends, family or dog, fuck knows.

Anyways, create a file in the following path: src/utils/auth.py

from fastapi import Depends, HTTPException, Security, status
from fastapi.security import APIKeyHeader
from sqlalchemy.ext.asyncio import AsyncSession

from src.database.database import get_db_session
from src.database.utils import get_user_by_api_key

API_KEY_HEADER = APIKeyHeader(name="api-key")


async def authenticate_user(
    api_key: str = Security(API_KEY_HEADER),
    session: AsyncSession = Depends(get_db_session),
):
    """Check if user exists otherwise raise errors"""
    user = await get_user_by_api_key(api_key, session)

    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="API key authentication failed",
            headers={"api-key": ""},
        )

    return user

Enter fullscreen mode Exit fullscreen mode

That is the auth.py file and let me show you the code at src/database/utils/
for the the get_user_by_api_key function:

async def get_user_by_api_key(
    api_key: str, session: AsyncSession = Depends(get_db_session)
):
    query = (
        select(User)
        .where(User.api_key == api_key)
        .options(
            selectinload(User.following),
            selectinload(User.followers),
        )
    )
    user = await session.execute(query)

    return user.scalar_one_or_none()

Enter fullscreen mode Exit fullscreen mode

Nothing too hard, we just check and see if the api_key matches any, and return the object if it matches, otherwise return nothing.

And with that note our "authentication" process is done.
Want to break your computer yet?
Yeah we are about 20% through the way:
Moving on:


First Route(Media)

Packages

fastapi==0.103.1
aiofiles==23.2.1
Enter fullscreen mode Exit fullscreen mode

Media schema

In FastAPI, we use schemas to serialize and validate the data unlike django serializers.

  1. Create the following src/schemas/base_schema.py and src/schemas/media_schema.py

Base schema

from pydantic import BaseModel, ConfigDict


class DefaultSchema(BaseModel):
    model_config = ConfigDict(from_attributes=True)
    result: bool = True

Enter fullscreen mode Exit fullscreen mode

I just wanted to put result: True in the code, you don't technically need to include this but i'll just add it here anyway.
Media Schema

 from pydantic import BaseModel, Field

 from src.schemas.base_schema import ConfigDict, DefaultSchema


 class MediaUpload(DefaultSchema):
     id: int = Field(alias="media_id")
     model_config = ConfigDict(from_attributes=True,   populate_by_name=True)

Enter fullscreen mode Exit fullscreen mode

File Utils

src/utils/file_utils.py

ffrom pathlib import Path
from aiofiles import open
from fastapi import UploadFile

from src.utils.settings import MEDIA_PATH


async def check_or_get_filename(path: Path) -> Path:
    """
    Adds a numerical suffix to the filename if a file with the same name already exists.
    :param path: The path to check and modify.
    :return: The modified path with a numerical suffix.
    """
    original_path = path
    counter = 0

    while path.exists():
        counter += 1
        filename = f"{original_path.stem} ({counter}){original_path.suffix}"
        path = original_path.with_name(filename)
    return path


async def save_uploaded_file(uploaded_file: UploadFile) -> str:
    """
    Uploads a file and returns the relative path
    :param uploaded_file: The FastAPI UploadFile object representing the uploaded file.
    :return: The relative path to the saved file.
    :raises: Any exceptions that may occur during file upload and storage.
    """

    MEDIA_PATH.mkdir(parents=True, exist_ok=True)

    file_path = MEDIA_PATH / uploaded_file.filename
    filename = await check_or_get_filename(path=file_path)
    img_path = f"images/{filename.stem}{filename.suffix}"
    content = uploaded_file.file.read()
    async with open(filename, "wb") as file:
        await file.write(content)
    return img_path

Enter fullscreen mode Exit fullscreen mode

Code probabbly speaks for itself, but lemme just break down some things:

  • check_or_get_filename esentially performs a check and if a file exists with that username it will return a path like this:
    • 1.py already exists
    • Somebody else uploads 1.py to our server
    • Function returns 1 (1).py to separate the content with the same filenames

Anyways that's done.
Here's another meme.
Image

Creating the actual media route!

Well with all of that done, we can actually create the media route now.

from typing import Annotated, Union

from fastapi import APIRouter, Depends, HTTPException, UploadFile, status
from loguru import logger
from sqlalchemy.ext.asyncio import AsyncSession

from src.database.database import get_db_session
from src.models.media import Media
from src.models.users import User
from src.schemas.media_schema import MediaUpload
from src.utils.auth import authenticate_user
from src.utils.file_utils import save_uploaded_file

router = APIRouter(prefix="/api", tags=["media_v1"])


@router.post("/medias", status_code=status.HTTP_201_CREATED, response_model=MediaUpload)
async def upload_media(
    file: UploadFile,
    user: Annotated[User, "User model obtained from the api key"] = Depends(
        authenticate_user
    ),
    session: AsyncSession = Depends(get_db_session),
):
    try:
        file = await save_uploaded_file(file)
        new_media = Media(media_path=file)
        session.add(new_media)
        await session.commit()

        return new_media
    except ValueError as exc:
        logger.exception(exc)
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc))

Enter fullscreen mode Exit fullscreen mode

WTf is this?:

  • We just save the file and perform the check from the File Utils section.
  • We create a new Media object and save it to our session.
    • Catch any exceptions with the logger.exception()

Route

It will end up looking like this :p
but we still have a lot to do, and i spent too much time on writing this.
So stay tuned!
I gotta go Leetcode man.
Let me know on what i should improve in the next one.
Source Code
My Github

Top comments (0)