DEV Community

Joseph utuedeye

What's an ORM? (Your Database Translator!) 🗣️

You've learned that SQL is the language to talk to relational databases like MySQL. You've also seen how NoSQL databases like MongoDB store data differently.

Writing raw SQL or specific NoSQL queries in your Python code can get repetitive and complex, especially as your application grows. This is where an ORM (Object-Relational Mapper) comes in handy!

An ORM acts as a translator and an abstraction layer between your Python code (your "objects") and your database (your "relational tables" or "NoSQL documents"). Instead of writing SQL or low-level NoSQL commands, you interact with your database using familiar Python objects and methods.
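To make the "translator" idea concrete, here is a minimal sketch of the manual mapping work an ORM takes off your hands. It uses Python's built-in sqlite3 module instead of MySQL so it runs anywhere, and the User dataclass and fetch_users helper are illustrative names, not part of any library.

```python
import sqlite3
from dataclasses import dataclass


@dataclass
class User:
    """Plain Python object we want to work with instead of raw rows."""
    id: int
    name: str
    email: str


def fetch_users(conn: sqlite3.Connection) -> list[User]:
    """Hand-written mapping: run SQL, then turn each row tuple into a User."""
    rows = conn.execute("SELECT id, name, email FROM users ORDER BY id").fetchall()
    return [User(*row) for row in rows]


# In-memory database, used here only so the sketch is self-contained.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users ("
    "id INTEGER PRIMARY KEY, name TEXT NOT NULL, email TEXT NOT NULL UNIQUE)"
)
# Values are passed as parameters (?), never formatted into the SQL string.
conn.execute(
    "INSERT INTO users (name, email) VALUES (?, ?)",
    ("Alice Smith", "alice@example.com"),
)
conn.commit()

users = fetch_users(conn)
print(users)
conn.close()
```

Every table needs its own version of this SQL-plus-conversion code. An ORM generates it from a single class definition.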

Benefits of using an ORM:

  • Pythonic Code: You write Python code instead of mixing Python and SQL/NoSQL queries.
  • Reduced Boilerplate: ORMs often handle common tasks (like converting Python data types to database types) automatically.
  • Database Agnostic (for some ORMs): Some ORMs (like SQLAlchemy) allow you to switch between different SQL databases (MySQL, PostgreSQL, SQLite) with minimal code changes.
  • Improved Readability: Your code often becomes cleaner and easier to understand.
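  • Error Prevention: ORMs send values to the database as bound parameters instead of splicing them into SQL strings, which guards against SQL injection, and they can help catch common database-related errors during development.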
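As a rough illustration of the database-agnostic point, the sketch below parses three connection URLs with SQLAlchemy's make_url helper. The URLs, credentials, and database names are placeholders. Nothing is connected, so parsing them should not need any database driver installed.

```python
from sqlalchemy.engine import make_url

# Placeholder connection strings: only the URL changes between databases.
DATABASE_URLS = [
    "mysql+pymysql://root:password@localhost:3306/mydatabase",
    "postgresql+psycopg2://root:password@localhost:5432/mydatabase",
    "sqlite:///mydatabase.db",
]

backends = []
for raw_url in DATABASE_URLS:
    url = make_url(raw_url)  # parses the string; does not open a connection
    backends.append(url.get_backend_name())
    # hide_password=True masks the password when the URL is rendered
    print(url.get_backend_name(), "->", url.render_as_string(hide_password=True))
```

In a real app you would pass whichever URL you need to create_engine and keep the model classes unchanged, although database-specific features can still require tweaks.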

ORMs for MySQL: SQLAlchemy 🐘

For relational databases like MySQL, SQLAlchemy is arguably the most widely used and powerful ORM in Python. It's known for its flexibility, allowing you to use it at a high level (ORM style) or a lower level (SQL Expression Language).

Setting Up SQLAlchemy for MySQL

First, you'll need to install SQLAlchemy and a MySQL driver (like mysql-connector-python or PyMySQL). We'll use PyMySQL here.

pip install SQLAlchemy pymysql

SQLAlchemy Example: A Simple User Table

Let's imagine we want to store User information (id, name, email).

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker, declarative_base

# 1. Connect to the Database
# Replace 'root:password@localhost:3306/mydatabase' with your MySQL connection string
# 'mysql+pymysql' specifies the dialect and driver
DATABASE_URL = "mysql+pymysql://root:password@localhost:3306/mydatabase"
engine = create_engine(DATABASE_URL)

# Base class for our declarative models
Base = declarative_base()

# 2. Define Your Model (Table Structure)
class User(Base):
    __tablename__ = "users" # This is the name of your table in MySQL

    id = Column(Integer, primary_key=True)
    name = Column(String(50), nullable=False) # String with max length 50, cannot be NULL
    email = Column(String(100), unique=True, nullable=False) # Unique email

    def __repr__(self):
        return f"<User(id={self.id}, name='{self.name}', email='{self.email}')>"

# 3. Create the Table (if it doesn't exist)
# This command goes to MySQL and creates the 'users' table based on our User class
Base.metadata.create_all(engine)

# 4. Create a Session (Your conversation with the DB)
# A Session is how you perform database operations (add, query, update, delete)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# --- How to Use It (CRUD Operations) ---

def create_user(name: str, email: str):
    db = SessionLocal() # Get a new session
    try:
        new_user = User(name=name, email=email)
        db.add(new_user) # Add the new user object to the session
        db.commit() # Commit the transaction (save changes to DB)
        db.refresh(new_user) # Refresh the object to get its generated ID
        print(f"Created user: {new_user}")
        return new_user
    finally:
        db.close() # Always close the session

def get_users():
    db = SessionLocal()
    try:
        users = db.query(User).all() # Query all users
        print("All users:")
        for user in users:
            print(f"- {user.name} ({user.email})")
        return users
    finally:
        db.close()

def update_user_email(user_id: int, new_email: str):
    db = SessionLocal()
    try:
        user = db.query(User).filter(User.id == user_id).first() # Find user by ID
        if user:
            user.email = new_email # Update the attribute
            db.commit()
            db.refresh(user)
            print(f"Updated user {user.name}'s email to {user.email}")
            return user
        else:
            print(f"User with ID {user_id} not found.")
            return None
    finally:
        db.close()

def delete_user(user_id: int):
    db = SessionLocal()
    try:
        user = db.query(User).filter(User.id == user_id).first()
        if user:
            db.delete(user) # Delete the user object
            db.commit()
            print(f"Deleted user: {user.name}")
        else:
            print(f"User with ID {user_id} not found.")
    finally:
        db.close()

# --- Run the examples ---
if __name__ == "__main__":
    print("--- Creating Users ---")
    user1 = create_user("Alice Smith", "alice@example.com")
    user2 = create_user("Bob Johnson", "bob@example.com")

    print("\n--- Getting All Users ---")
    get_users()

    print("\n--- Updating a User ---")
    if user1: # Check if user1 was successfully created
        update_user_email(user1.id, "alice.new@example.com")

    print("\n--- Getting All Users After Update ---")
    get_users()

    print("\n--- Deleting a User ---")
    if user2:
        delete_user(user2.id)

    print("\n--- Getting All Users After Delete ---")
    get_users()

Explanation:

  • create_engine: Creates the Engine, the object that manages connections to your MySQL database. No connection is actually opened until the engine is first used.
  • Base = declarative_base(): Creates a base class for your models.
  • class User(Base): Defines your Python class that maps to a MySQL table. Each attribute (id, name, email) maps to a column.
  • Base.metadata.create_all(engine): This is the magic that tells SQLAlchemy to create the users table in your MySQL database if it doesn't already exist, based on your User model.
  • SessionLocal(): Creates a "session." This is your temporary workspace where you stage changes (add, update, delete) before committing them to the actual database.
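  • db.add(), db.commit(), db.delete(), db.query().all(), db.query().filter().first(): These are the ORM methods that SQLAlchemy translates into SQL. db.add() leads to an INSERT, db.query() to a SELECT, changing an attribute on a loaded object to an UPDATE, and db.delete() to a DELETE. The writes are sent when the session flushes, which db.commit() does for you.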
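The points above can be sketched in a few lines. This example prints the SQL that SQLAlchemy would generate for the User model, then shows how a session stages changes until commit. It assumes SQLAlchemy 1.4 or newer, and it swaps in an in-memory SQLite database for MySQL purely so it runs without a server.

```python
from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.dialects import mysql
from sqlalchemy.orm import Session, declarative_base
from sqlalchemy.schema import CreateTable

Base = declarative_base()


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    name = Column(String(50), nullable=False)
    email = Column(String(100), unique=True, nullable=False)


# Render SQL as text without touching any database.
ddl = str(CreateTable(User.__table__).compile(dialect=mysql.dialect()))
query_sql = str(select(User).where(User.id == 1))
print(ddl)        # the CREATE TABLE statement create_all() would emit for MySQL
print(query_sql)  # the SELECT behind a filter on User.id

# Assumption: in-memory SQLite stands in for the MySQL server.
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

with Session(engine) as session:
    alice = User(name="Alice Smith", email="alice@example.com")
    session.add(alice)
    pending_id = alice.id  # still None: the INSERT has not been sent yet
    session.commit()       # flushes the INSERT and commits the transaction
    alice_id = alice.id    # now holds the ID generated by the database

    session.add(User(name="Bob Johnson", email="bob@example.com"))
    session.rollback()     # discards the staged Bob; nothing is written

    names = session.execute(select(User.name)).scalars().all()

print(pending_id, alice_id, names)
```

Using the session as a context manager closes it automatically, which is an alternative to the try/finally pattern used in the functions above.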

ORMs for MongoDB: MongoEngine 🌿

MongoDB is a NoSQL database that stores data in flexible, JSON-like documents within collections, not rigid tables. While PyMongo is the official low-level driver, MongoEngine provides an ORM-like experience (strictly speaking it is an ODM, an Object-Document Mapper), letting you define your document structure using Python classes.

Setting Up MongoEngine for MongoDB

First, install MongoEngine:

pip install mongoengine

You'll also need a running MongoDB server (e.g., local installation or a cloud service like MongoDB Atlas).

MongoEngine Example: A Simple Post Document

Let's define a Post document (like a blog post) that has a title, content, and a list of tags.

from mongoengine import Document, StringField, ListField, connect

# 1. Connect to the Database
# Connect to a MongoDB database named 'mydatabase' running on localhost:27017
# Replace with your MongoDB connection string if it's different
connect(db="mydatabase", host="mongodb://localhost:27017/mydatabase")

# 2. Define Your Document (Collection Structure)
class Post(Document):
    # This class maps to a collection named 'post' in MongoDB by default
    title = StringField(required=True, max_length=200) # Required string, max length 200
    content = StringField()
    tags = ListField(StringField()) # A list where each item is a string

    def __repr__(self):
        return f"<Post(title='{self.title}')>"

# --- How to Use It (CRUD Operations) ---

def create_post(title: str, content: str, tags: list = None):
    new_post = Post(title=title, content=content, tags=tags or [])
    new_post.save() # Saves the document to the MongoDB collection
    print(f"Created post: {new_post.title} (ID: {new_post.id})")
    return new_post

def get_all_posts():
    posts = Post.objects() # 'objects()' is like a query manager for the Post collection
    print("All posts:")
    for post in posts:
        print(f"- {post.title} (Tags: {', '.join(post.tags)})")
    return list(posts)

def find_posts_by_tag(tag: str):
    posts = Post.objects(tags=tag) # Filter documents where 'tags' list contains 'tag'
    print(f"Posts with tag '{tag}':")
    for post in posts:
        print(f"- {post.title}")
    return list(posts)

def update_post_content(post_id, new_content: str):
    # Retrieve by ID. MongoEngine stores IDs as ObjectId values, and the lookup
    # accepts either an ObjectId or its 24-character hex string.
    # A malformed ID string raises ValidationError rather than DoesNotExist.
    try:
        post = Post.objects.get(id=post_id)
        post.content = new_content
        post.save() # Saves the changes to the document
        print(f"Updated post '{post.title}' content.")
        return post
    except Post.DoesNotExist:
        print(f"Post with ID {post_id} not found.")
        return None

def delete_post(post_id):
    try:
        post = Post.objects.get(id=post_id)
        post.delete() # Deletes the document
        print(f"Deleted post: '{post.title}'")
    except Post.DoesNotExist:
        print(f"Post with ID {post_id} not found.")

# --- Run the examples ---
if __name__ == "__main__":
    print("--- Creating Posts ---")
    post1 = create_post("My First Blog", "This is the content of my first post.", ["python", "beginners"])
    post2 = create_post("Database Basics", "All about SQL and NoSQL.", ["databases", "education"])
    post3 = create_post("FastAPI Magic", "Using decorators effectively.", ["fastapi", "python"])

    print("\n--- Getting All Posts ---")
    get_all_posts()

    print("\n--- Finding Posts by Tag ---")
    find_posts_by_tag("python")

    print("\n--- Updating a Post ---")
    if post1:
        # post1.id is an ObjectId; its string form (e.g. from a client request)
        # would work here as well.
        update_post_content(post1.id, "This is the *updated* content of my first post!")

    print("\n--- Deleting a Post ---")
    if post3:
        delete_post(post3.id)

    print("\n--- Getting All Posts After Updates and Deletes ---")
    get_all_posts()

Explanation:

  • connect(): Establishes the connection to your MongoDB server.
  • class Post(Document): Defines your Python class that maps to a MongoDB collection. Each attribute (title, content, tags) maps to a field in a document.
  • new_post.save(): Saves a new document or updates an existing one if the id is present.
  • Post.objects(): This is the entry point for querying the Post collection.
  • Post.objects.get(id=...): Retrieves a single document by its ID.
  • post.delete(): Deletes the specific document object.

Conclusion: Making Database Interactions Pythonic 🐍

Mapping libraries like SQLAlchemy for relational databases and MongoEngine for MongoDB are incredibly valuable tools in backend development, and PyMongo remains available when you want direct, low-level access. They allow you to work with your database using the familiar concepts of Python classes and objects, rather than raw query languages. This makes your code cleaner, more robust, and faster to develop.

While there's a small learning curve, the benefits of using an ORM quickly outweigh the initial effort, especially as your application grows!
