Kajiru

Simple Ranking with SQLAlchemy

In this article, we will create a simple ranking system using SQLAlchemy, with SQLite as the database.

SQLite stores an entire database in a single file.

To inspect the generated database file, a dedicated SQLite viewer application is convenient.
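
If no viewer is installed, Python's standard sqlite3 module can also peek into the file. The sketch below is only an illustration: the helper name list_tables and the file name demo.sqlite are made up for this example, and a throwaway database is created first so that the snippet runs on its own.

```python
import os
import sqlite3
import tempfile

def list_tables(db_file):
    """Return the names of all tables stored in a SQLite file."""
    conn = sqlite3.connect(db_file)
    try:
        rows = conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
        ).fetchall()
        return [name for (name,) in rows]
    finally:
        conn.close()

# Build a throwaway database so there is something to inspect.
demo_dir = tempfile.mkdtemp()
demo_file = os.path.join(demo_dir, "demo.sqlite")
conn = sqlite3.connect(demo_file)
conn.execute("CREATE TABLE ranking (uid INTEGER PRIMARY KEY, score INTEGER)")
conn.commit()
conn.close()

tables = list_tables(demo_file)
print(tables)
```

Pointing list_tables() at the data.sqlite file generated later in this article should work the same way.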

The complete source code is shown at the end.

1. Preparing the Modules

In your working folder, prepare main.py for execution and util_db.py to collect all database-related logic.

The folder structure is as follows:

# Folder structure
working_folder/
 ├ main.py      # Main program
 └ util_db.py   # Database utility

2. Installing SQLAlchemy

SQLAlchemy is a third-party module, so it must be installed separately.

# Command (Mac)
$ python3 -m pip install sqlalchemy

# Command (Windows)
$ python -m pip install sqlalchemy

3. CRUD Pattern with SQLAlchemy

CRUD stands for Create, Read, Update, and Delete, the four basic database operations.

In util_db.py, import the required modules and define the MyDB class.

In the constructor, the database file name is received as an argument, and create_engine() is used to create the SQLAlchemy engine.

(If echo=True is specified, executed SQL statements will be printed to the console.)

Next, sessionmaker() prepares a session factory for database access.

Finally, passing the engine to Base.metadata.create_all() creates the tables for every model class derived from Base (Base is defined in section 4).

(Only tables that do not already exist in the database will be created.)

# util_db.py (excerpt)
def __init__(self, path):
    """ Constructor """
    self.dir_base = os.path.dirname(__file__)
    self.db_path = "sqlite:///" + os.path.join(self.dir_base, path)
    # Engine, Session
    self.db_engine = create_engine(self.db_path, echo=True)
    self.db_session = sessionmaker(bind=self.db_engine)
    Base.metadata.create_all(self.db_engine)  # Create tables
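
The note that create_all() only creates missing tables can be checked with a small experiment. This is a minimal sketch, not part of util_db.py: the Demo class and its demo table are hypothetical, and an in-memory SQLite database is assumed so that no file is written.

```python
from sqlalchemy import Column, Integer, create_engine, inspect
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Demo(Base):
    """Hypothetical one-column table, used only for this illustration."""
    __tablename__ = "demo"
    uid = Column(Integer, primary_key=True)

# An in-memory database keeps the example from touching the disk.
engine = create_engine("sqlite:///:memory:")

Base.metadata.create_all(engine)  # creates the "demo" table
Base.metadata.create_all(engine)  # table already exists, so nothing is created

table_names = inspect(engine).get_table_names()
print(table_names)
```

Calling create_all() twice raises no error, which is why the constructor can safely call it on every start.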

The initial state of the MyDB class is shown below.

# util_db.py (excerpt)
import datetime
import os
from sqlalchemy import (
    create_engine,
    Column,
    Integer,
    select,
    String
)
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import sessionmaker

class MyDB():

    def __init__(self, path):
        """ Constructor """
        self.dir_base = os.path.dirname(__file__)
        self.db_path = "sqlite:///" + os.path.join(self.dir_base, path)
        # Engine, Session
        self.db_engine = create_engine(self.db_path, echo=True)
        self.db_session = sessionmaker(bind=self.db_engine)
        Base.metadata.create_all(self.db_engine)  # Create tables

    # CRUD(Create)
    def insert_record(self, name, comment, score):
        """ Insert a record """
        pass

    # CRUD(Read)
    def read_records(self, limit=10):
        """ Read records (up to limit) """
        pass

    def read_record(self, uid):
        """ Read one record """
        pass

    # CRUD(Update)
    def update_record(self, uid, name, comment, score):
        """ Update one record """
        pass

    # CRUD(Delete)
    def delete_record(self, uid):
        """ Delete one record """
        pass

    # Time
    def get_time(self):
        """ Get current datetime """
        return datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
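
As a side note on get_time(), the timestamp is stored as text rather than as a date type. The sketch below, which uses invented sample dates, suggests why this format is still practical: its zero-padded, year-first layout sorts chronologically as plain strings and can be parsed back when needed. The constant name TIME_FORMAT is an assumption for this example.

```python
import datetime

TIME_FORMAT = "%Y/%m/%d %H:%M:%S"  # same format string as get_time()

earlier = datetime.datetime(2026, 1, 9, 23, 59, 59).strftime(TIME_FORMAT)
later = datetime.datetime(2026, 1, 10, 0, 1, 19).strftime(TIME_FORMAT)

# Zero-padded, most-significant-first fields sort correctly as text.
sorted_stamps = sorted([later, earlier])

# The string can be turned back into a datetime when arithmetic is needed.
parsed = datetime.datetime.strptime(later, TIME_FORMAT)
print(sorted_stamps, parsed)
```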

4. Defining the ORM

SQLAlchemy can access a database through an ORM (Object-Relational Mapping), a mechanism that maps Python classes to tables and their instances to rows.

(It is also possible to use raw SQL.)

First, create a base class with declarative_base() in util_db.py, then define a Record class that inherits from it.

This class is designed for a simple ranking system, with the table name set to ranking and the following simple columns:

| Column name | Meaning | Example |
| --- | --- | --- |
| uid | Sequential ID | 1, 2, 3, ... |
| name | User name | Alex |
| comment | Comment | I'm champion!! |
| score | Score | 123 |
| time_stamp | Created time | 1970/01/01 00:00:00 |

# util_db.py (excerpt)
# ORM
Base = declarative_base()
class Record(Base):

    # Table
    __tablename__ = "ranking"
    # ID
    uid = Column(Integer, primary_key=True, autoincrement=True)
    # Name
    name = Column(String(12), nullable=False)
    # Comment
    comment = Column(String(24), nullable=False)
    # Score
    score = Column(Integer, nullable=False)
    # Timestamp
    time_stamp = Column(String(24), nullable=False)

    def __str__(self):
        return "uid:{0}, name:{1}, comment:{2}, score:{3}, time_stamp:{4}".format(
            self.uid, self.name, self.comment, self.score, self.time_stamp)
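
To see what this class means on the database side, the CREATE TABLE statement can be rendered without running it. The following is a sketch under the assumption that an in-memory SQLite engine is acceptable for compiling the statement; the Record class is repeated (without __str__) so that the snippet is self-contained.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base
from sqlalchemy.schema import CreateTable

Base = declarative_base()

class Record(Base):
    __tablename__ = "ranking"
    uid = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(12), nullable=False)
    comment = Column(String(24), nullable=False)
    score = Column(Integer, nullable=False)
    time_stamp = Column(String(24), nullable=False)

# The engine is only used to pick the SQLite dialect for rendering.
engine = create_engine("sqlite:///:memory:")
ddl = str(CreateTable(Record.__table__).compile(engine))
print(ddl)
```

Note that SQLite itself does not enforce the length given in String(12), so over-long names are not rejected at the database level.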

5. Implementing CRUD Operations

Now we implement the CRUD operations in the MyDB class.

5-1. Inserting Data (Create)

This method inserts data into the table.

Note the with self.db_session() as session: block.

It creates a temporary session; the record is added and committed inside it, and the session is closed automatically when the block exits.

# util_db.py (excerpt)
# CRUD(Create)
def insert_record(self, name, comment, score):
    """ Insert a record """
    print("insert_record:", name, comment)
    record = Record(
        name=name,
        comment=comment,
        score=score,
        time_stamp=self.get_time())
    with self.db_session() as session:
        session.add(record)
        session.commit()
        return record.uid
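
The session lifecycle described above can be observed in isolation. This is a minimal sketch, assuming an in-memory database and a hypothetical Player class that is smaller than Record; it shows that uid is empty before the insert and filled in after commit().

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Player(Base):
    """Hypothetical table, smaller than Record, for illustration only."""
    __tablename__ = "player"
    uid = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(12), nullable=False)

engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

player = Player(name="Alex")
uid_before = player.uid          # None: the row does not exist yet

with Session() as session:
    session.add(player)
    session.commit()             # the INSERT is sent and committed here
    uid_after = player.uid       # read while the session is still open

print(uid_before, uid_after)
```

With the default expire_on_commit=True, commit() expires the loaded attributes, so reading uid for the first time after the with block has closed the session would typically fail with DetachedInstanceError. That is a good reason to return record.uid from inside the block, as insert_record() does.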

5-2. Reading Data (Read)

This method retrieves data from the table.

In read_records(), up to limit records are retrieved, sorted by score in descending order.

# util_db.py (excerpt)
# CRUD(Read)
def read_records(self, limit=10):
    """ Read records (up to limit) """
    print("read_records:", limit)
    with self.db_session() as session:
        stmt = select(Record).order_by(Record.score.desc()).limit(limit)
        return session.scalars(stmt).all()
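
For comparison, here is roughly the same ranking query written as raw SQL with the standard sqlite3 module instead of SQLAlchemy. It is a sketch only: the table is reduced to three columns and the sample rows are invented for this example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ranking ("
    "uid INTEGER PRIMARY KEY AUTOINCREMENT, "
    "name TEXT NOT NULL, score INTEGER NOT NULL)"
)
# Sample rows invented for this example.
conn.executemany(
    "INSERT INTO ranking (name, score) VALUES (?, ?)",
    [("Alex", 120), ("Becky", 256), ("Christy", 88), ("David", 291)],
)

limit = 3
# ORDER BY ... DESC LIMIT ? mirrors order_by(Record.score.desc()).limit(limit).
top = conn.execute(
    "SELECT name, score FROM ranking ORDER BY score DESC LIMIT ?",
    (limit,),
).fetchall()
conn.close()
print(top)
```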

In read_record(), a single record is retrieved.

As shown below, session.get(Record, uid) fetches the record whose primary key equals uid, or returns None if no such record exists.

# util_db.py (excerpt)
def read_record(self, uid):
    """ Read one record """
    print("read_record:", uid)
    with self.db_session() as session:
        return session.get(Record, uid)
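
Callers of read_record() should be prepared for a missing record. The sketch below illustrates how session.get() behaves for an existing and a non-existing primary key; the Entry class and the in-memory database are assumptions made for this example.

```python
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Entry(Base):
    """Hypothetical table for illustration only."""
    __tablename__ = "entry"
    uid = Column(Integer, primary_key=True, autoincrement=True)
    score = Column(Integer, nullable=False)

engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

with Session() as session:
    session.add(Entry(score=100))
    session.commit()

with Session() as session:
    found = session.get(Entry, 1)      # row exists
    missing = session.get(Entry, 999)  # no such primary key
    found_score = found.score if found is not None else None

print(found_score, missing)
```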

5-3. Updating Data (Update)

This method updates an existing record.

First, the record is retrieved using session.get(Record, uid), then the object’s attributes are updated directly.

After updating, commit() writes the changes to the database. If no record with the given uid exists, the method returns -1 instead.

# util_db.py (excerpt)
# CRUD(Update)
def update_record(self, uid, name, comment, score):
    """ Update one record """
    print("update_record:", uid)
    with self.db_session() as session:
        record = session.get(Record, uid)
        if record is None:
            return -1
        record.name = name
        record.comment = comment
        record.score = score
        record.time_stamp = self.get_time()
        session.commit()
        return record.uid

5-4. Deleting Data (Delete)

This method deletes a record from the table.

session.delete(record) removes the record, and commit() finalizes the deletion.

# util_db.py (excerpt)
# CRUD(Delete)
def delete_record(self, uid):
    """ Delete one record """
    print("delete_record:", uid)
    with self.db_session() as session:
        record = session.get(Record, uid)
        if record is None:
            return -1
        session.delete(record)
        session.commit()
        return uid  # return the argument rather than reading the deleted object

Testing

The test procedure is as follows.

Here, a random name, comment, and score are generated and inserted into the table.

(By uncommenting some parts, you can also test update and delete operations.)

# main.py
import random
import util_db

# Members
names = ["Alex", "Becky", "Christy", "David"]
comments = ["Champion!!", "Nice!!", "Good!!", "OMG...!!"]

# MyDB
my_db = util_db.MyDB("data.sqlite")

# Insert (add one random record)
name = random.choice(names)
comment = random.choice(comments)
score = random.randint(30, 300)
result = my_db.insert_record(name, comment, score)
print("insert:", result)

# Update (a hacker appears and tampers with data)
# my_db.update_record(1, "Thomas", "Hi, I'm hacker!!", 777)

# Delete (delete record with uid=4)
# my_db.delete_record(4)

# Read (get one record with uid=1)
# record = my_db.read_record(1)
# print("Record:", record)

# Read (get top 5 scores)
records = my_db.read_records(limit=5)
print("= Ranking =")
for record in records:
    print("Record:", record)

Execution Result

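After several runs, including one with the update line uncommented, the ranking portion of the output looks like this (the SQL statements logged by echo=True are omitted). A hacker has taken first place!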

= Ranking =
Record: uid:1, name:Thomas, comment:Hi, I'm hacker!!, score:777, time_stamp:2026/01/10 01:57:14
Record: uid:8, name:Alex, comment:Champion!!, score:291, time_stamp:2026/01/10 00:01:19
Record: uid:33, name:Christy, comment:OMG...!!, score:263, time_stamp:2026/01/10 01:56:35
Record: uid:16, name:Becky, comment:Nice!!, score:256, time_stamp:2026/01/10 00:01:27
Record: uid:18, name:Christy, comment:OMG...!!, score:252, time_stamp:2026/01/10 00:01:29
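
The output lists records in score order but does not print rank numbers. One possible way to add them is sketched below; the function name assign_ranks and the tie rule (equal scores share a rank, and the next rank skips ahead) are assumptions for this example, not part of the article's code.

```python
def assign_ranks(scores):
    """Return (rank, score) pairs for scores sorted in descending order.

    Equal scores share a rank, and the next rank skips ahead
    (standard competition ranking: 1, 2, 2, 4).
    """
    ranked = []
    for position, score in enumerate(scores, start=1):
        if ranked and ranked[-1][1] == score:
            rank = ranked[-1][0]      # tie: reuse the previous rank
        else:
            rank = position
        ranked.append((rank, score))
    return ranked

# Sample scores invented for this example, already sorted high to low.
ranks = assign_ranks([777, 291, 291, 256])
print(ranks)
```

The same idea could be applied to the list returned by read_records() by passing in each record's score.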

Complete Code

Below is the complete implementation.

# util_db.py
import datetime
import os
from sqlalchemy import (
    create_engine,
    Column,
    Integer,
    select,
    String
)
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import sessionmaker

class MyDB():

    def __init__(self, path):
        """ Constructor """
        self.dir_base = os.path.dirname(__file__)
        self.db_path = "sqlite:///" + os.path.join(self.dir_base, path)
        self.db_engine = create_engine(self.db_path, echo=True)
        self.db_session = sessionmaker(bind=self.db_engine)
        Base.metadata.create_all(self.db_engine)

    def insert_record(self, name, comment, score):
        """ Insert a record """
        print("insert_record:", name, comment)
        record = Record(
            name=name,
            comment=comment,
            score=score,
            time_stamp=self.get_time())
        with self.db_session() as session:
            session.add(record)
            session.commit()
            return record.uid

    def read_records(self, limit=10):
        """ Read records """
        print("read_records:", limit)
        with self.db_session() as session:
            stmt = select(Record).order_by(Record.score.desc()).limit(limit)
            return session.scalars(stmt).all()

    def read_record(self, uid):
        """ Read one record """
        print("read_record:", uid)
        with self.db_session() as session:
            return session.get(Record, uid)

    def update_record(self, uid, name, comment, score):
        """ Update one record """
        print("update_record:", uid)
        with self.db_session() as session:
            record = session.get(Record, uid)
            if record is None:
                return -1
            record.name = name
            record.comment = comment
            record.score = score
            record.time_stamp = self.get_time()
            session.commit()
            return record.uid

    def delete_record(self, uid):
        """ Delete one record """
        print("delete_record:", uid)
        with self.db_session() as session:
            record = session.get(Record, uid)
            if record is None:
                return -1
            session.delete(record)
            session.commit()
            return uid  # return the argument rather than reading the deleted object

    def get_time(self):
        """ Get current datetime """
        return datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")

Base = declarative_base()
class Record(Base):

    __tablename__ = "ranking"
    uid = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(12), nullable=False)
    comment = Column(String(24), nullable=False)
    score = Column(Integer, nullable=False)
    time_stamp = Column(String(24), nullable=False)

    def __str__(self):
        return "uid:{0}, name:{1}, comment:{2}, score:{3}, time_stamp:{4}".format(
            self.uid, self.name, self.comment, self.score, self.time_stamp)

# main.py
import random
import util_db

def main():

    names = ["Alex", "Becky", "Christy", "David"]
    comments = ["Champion!!", "Nice!!", "Good!!", "OMG...!!"]

    my_db = util_db.MyDB("data.sqlite")

    name = random.choice(names)
    comment = random.choice(comments)
    score = random.randint(30, 300)
    result = my_db.insert_record(name, comment, score)
    print("insert:", result)

    records = my_db.read_records(limit=5)
    print("= Ranking =")
    for record in records:
        print("Record:", record)

if __name__ == "__main__":
    main()

Final Notes

Thank you very much for reading until the end.

SQLAlchemy is a simple and easy-to-use library, and it is definitely worth learning.

If you found this article helpful, a 👍 would be greatly appreciated!
