DEV Community

Cover image for **8 Python Database Techniques Every Developer Needs for Scalable, Reliable Applications**
Nithin Bharadwaj
Nithin Bharadwaj

Posted on

**8 Python Database Techniques Every Developer Needs for Scalable, Reliable Applications**

As a best-selling author, I invite you to explore my books on Amazon. Don't forget to follow me on Medium and show your support. Thank you! Your support means the world!

Let me talk to you about how Python works with databases. Imagine you’re running a busy coffee shop. You have a counter where orders are taken, a barista who makes the coffee, and a system to track everything. Databases are like the organized storage room for all your coffee beans, cups, and records. Python provides the tools—the counter system, the barista's instructions, and the inventory logs—to manage that storage room efficiently and reliably. I've spent a lot of time building these systems, and I want to share some clear, practical methods that make this process smooth.

We'll look at eight specific techniques. The goal is to make your application talk to the database quickly, safely, and without confusion, whether you're handling ten users or ten thousand. I'll explain each one with straightforward examples, the kind of code I write and use myself.

First, let's discuss managing multiple conversations with your database. A web application doesn't have just one user; it has many, all asking for data at the same time. Opening a brand new connection to the database for every single request is slow and wasteful, like hiring a new barista for every customer. Instead, we use a connection pool. Think of it as a team of baristas ready at the counter. When a customer (a user request) arrives, a free barista (a connection) helps them. When done, the barista goes back to the team, ready for the next customer.

This pool manages the team. It sets how many baristas can be active at once, how long they wait for a customer, and when they should take a break to stay sharp. Here's a concrete way to set this up. We create a manager that handles the pool, can run queries, and even test how the team performs under pressure.

from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool
import threading
import time

class ConnectionPoolManager:
    def __init__(self, db_url, pool_size=5, max_spare=10):
        self.engine = create_engine(
            db_url,
            poolclass=QueuePool,
            pool_size=pool_size,
            max_overflow=max_spare,
            pool_timeout=30,
            pool_recycle=1800,
            echo_pool='debug'
        )
        self.metrics = {'active': 0, 'idle': 0}

    def run_safe_query(self, sql_statement, data=None):
        with self.engine.connect() as conn:
            if data:
                outcome = conn.execute(text(sql_statement), data)
            else:
                outcome = conn.execute(text(sql_statement))

            if sql_statement.strip().upper().startswith('SELECT'):
                return outcome.fetchall()
            return outcome.rowcount

    def stress_test(self, concurrent_users=20, requests_per_user=25):
        report = {'ok': 0, 'errors': 0, 'total_duration': 0.0}
        def simulated_user(user_id):
            for req in range(requests_per_user):
                try:
                    start = time.perf_counter()
                    with self.engine.connect() as c:
                        c.execute(text("SELECT 1 + %s"), (req,))
                        time.sleep(0.005)
                    duration = time.perf_counter() - start
                    with threading.Lock():
                        report['ok'] += 1
                        report['total_duration'] += duration
                except Exception:
                    with threading.Lock():
                        report['errors'] += 1

        threads = []
        for i in range(concurrent_users):
            t = threading.Thread(target=simulated_user, args=(i,))
            threads.append(t)
            t.start()
        for t in threads:
            t.join()

        avg_time = (report['total_duration'] / report['ok'] * 1000) if report['ok'] > 0 else 0
        return {
            'successful_requests': report['ok'],
            'failed_requests': report['errors'],
            'average_ms': avg_time
        }

# Using it
manager = ConnectionPoolManager('postgresql://user:pass@localhost/dbname')
print("Creating a test table...")
manager.run_safe_query("CREATE TABLE IF NOT EXISTS test_items (id SERIAL, name TEXT);")

print("Adding sample items...")
for n in range(50):
    manager.run_safe_query("INSERT INTO test_items (name) VALUES (%s)", (f'Item_{n}',))

print("Running a simulated load test...")
load_results = manager.stress_test(concurrent_users=15, requests_per_user=30)
print(f"Results: {load_results}")
Enter fullscreen mode Exit fullscreen mode

Next, we need to build our questions for the database. You wouldn't shout a jumbled list of ingredients at your barista; you give a clear order. In code, we don't just paste raw text into a query; we build it carefully. This approach, called query building, lets us create precise, dynamic questions safely. It stops bad actors from injecting their own commands, which is a common security problem. We can add filters, sort orders, and combine data from different tables logically.

Here is a builder that constructs different types of queries. You tell it what you're looking for, and it writes the correct, safe SQL for you.

from sqlalchemy import MetaData, Table, Column, Integer, String, Float, DateTime, ForeignKey, select, and_, or_, func, desc
from datetime import datetime, timedelta

metadata = MetaData()

# Define what our tables look like in code
products = Table('products', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(100), nullable=False),
    Column('category', String(50)),
    Column('price', Float),
    Column('stock', Integer),
    Column('added_on', DateTime, default=datetime.utcnow)
)

sales = Table('sales', metadata,
    Column('id', Integer, primary_key=True),
    Column('product_id', ForeignKey('products.id')),
    Column('quantity', Integer),
    Column('sale_price', Float),
    Column('sold_at', DateTime, default=datetime.utcnow)
)

class DynamicQueryTool:
    @staticmethod
    def find_products(**filters):
        query = select([products])
        conditions = []
        if 'category' in filters:
            conditions.append(products.c.category == filters['category'])
        if 'min_price' in filters:
            conditions.append(products.c.price >= filters['min_price'])
        if 'max_price' in filters:
            conditions.append(products.c.price <= filters['max_price'])
        if 'in_stock' in filters and filters['in_stock']:
            conditions.append(products.c.stock > 0)
        if 'name_like' in filters:
            conditions.append(products.c.name.ilike(f'%{filters["name_like"]}%'))
        if conditions:
            query = query.where(and_(*conditions))
        if 'order_by' in filters:
            order_field = filters['order_by']
            if order_field == 'price_high':
                query = query.order_by(desc(products.c.price))
            elif order_field == 'price_low':
                query = query.order_by(products.c.price)
            elif order_field == 'newest':
                query = query.order_by(desc(products.c.added_on))
        if 'limit' in filters:
            query = query.limit(filters['limit'])
        return query

    @staticmethod
    def get_daily_sales_report(days=7):
        week_ago = datetime.utcnow() - timedelta(days=days)
        query = select([
            func.date(sales.c.sold_at).label('date'),
            func.count(sales.c.id).label('total_sales'),
            func.sum(sales.c.quantity).label('items_sold'),
            func.sum(sales.c.sale_price * sales.c.quantity).label('revenue')
        ]).where(
            sales.c.sold_at >= week_ago
        ).group_by(
            func.date(sales.c.sold_at)
        ).order_by(
            desc('date')
        )
        return query

    @staticmethod
    def get_top_performing_products(limit=10):
        query = select([
            products.c.name,
            products.c.category,
            func.sum(sales.c.quantity).label('total_units_sold'),
            func.sum(sales.c.sale_price * sales.c.quantity).label('total_revenue')
        ]).select_from(
            products.join(sales, products.c.id == sales.c.product_id)
        ).group_by(
            products.c.id, products.c.name, products.c.category
        ).order_by(
            desc('total_revenue')
        ).limit(limit)
        return query

# Using the tool
print("Query for electronics under $500, in stock:")
query1 = DynamicQueryTool.find_products(category='Electronics', max_price=500.0, in_stock=True, order_by='price_low', limit=5)
print(str(query1))
print("\n---\n")

print("Sales report for the last 7 days:")
query2 = DynamicQueryTool.get_daily_sales_report(days=7)
print(str(query2))
print("\n---\n")

print("Top 5 best-selling products:")
query3 = DynamicQueryTool.get_top_performing_products(limit=5)
print(str(query3))
Enter fullscreen mode Exit fullscreen mode

The third technique is about ensuring reliability: transaction management. A transaction is a group of operations that must all succeed or all fail together. Think of it as a customer order for a latte and a muffin. You either charge their card, prepare the latte, and hand over the muffin, or if the oven breaks and you can't get the muffin, you cancel the whole order and refund the charge. You don't want to charge them for just the latte if the muffin isn't available.

Databases use transactions for this atomicity. In Python, we use context managers to make this easy and safe. The code below shows a manager that handles transactions, can retry them if something temporary goes wrong, and even processes work in batches, making sure each batch is fully completed before moving to the next.

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager
import logging
import random
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SafeTransactionHandler:
    def __init__(self, database_url):
        self.engine = create_engine(database_url)
        self.Session = sessionmaker(bind=self.engine)
        self.retry_delay = 0.1

    @contextmanager
    def atomic_block(self):
        """A context manager that ensures all operations complete or none do."""
        session = self.Session()
        try:
            yield session
            session.commit()
            logger.info("Transaction committed.")
        except Exception as e:
            session.rollback()
            logger.error(f"Transaction rolled back due to: {e}")
            raise
        finally:
            session.close()

    def execute_with_retry(self, operation_func, max_attempts=3):
        """Tries an operation multiple times if it fails, useful for network glitches."""
        last_error = None
        for attempt in range(1, max_attempts + 1):
            try:
                with self.atomic_block() as session:
                    return operation_func(session)
            except Exception as e:
                last_error = e
                logger.warning(f"Attempt {attempt} failed: {e}")
                if attempt < max_attempts:
                    sleep_time = self.retry_delay * (2 ** (attempt - 1))
                    time.sleep(sleep_time)
        logger.error(f"All {max_attempts} attempts failed.")
        raise last_error

    def process_batch_safely(self, list_of_items, chunk_size=50):
        """Processes a large list by breaking it into chunks, each in its own transaction."""
        for i in range(0, len(list_of_items), chunk_size):
            chunk = list_of_items[i:i + chunk_size]
            try:
                with self.atomic_block() as session:
                    for item in chunk:
                        # Example: Mark each item as processed
                        session.execute(
                            "UPDATE data_queue SET status = 'processed' WHERE id = :id",
                            {'id': item['id']}
                        )
                logger.info(f"Successfully processed chunk starting at index {i}")
            except Exception as e:
                logger.error(f"Failed to process chunk starting at {i}: {e}")
                # You might decide to log the failed chunk and continue, or stop.
                # For this example, we'll re-raise to stop the whole process.
                raise

# Example setup and usage
print("Initializing transaction handler...")
handler = SafeTransactionHandler('sqlite:///test_transactions.db')

# Create a simple table for demonstration
setup_sql = """
CREATE TABLE IF NOT EXISTS accounts (
    id INTEGER PRIMARY KEY,
    username TEXT UNIQUE,
    balance INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS transfers (
    id INTEGER PRIMARY KEY,
    from_account INTEGER,
    to_account INTEGER,
    amount INTEGER,
    status TEXT
);
"""
handler.engine.execute(setup_sql)
handler.engine.execute("DELETE FROM accounts")
handler.engine.execute("INSERT INTO accounts (id, username, balance) VALUES (1, 'alice', 1000), (2, 'bob', 500)")

# Define a money transfer function that must be atomic
def transfer_money(session, from_id, to_id, amount):
    # Check balance
    result = session.execute("SELECT balance FROM accounts WHERE id = :id", {'id': from_id}).fetchone()
    if not result or result[0] < amount:
        raise ValueError("Insufficient funds or account not found.")
    # Deduct from sender
    session.execute("UPDATE accounts SET balance = balance - :amt WHERE id = :id", {'amt': amount, 'id': from_id})
    # Simulate a potential random failure (like a network hiccup)
    if random.random() < 0.3:  # 30% chance to fail for demonstration
        raise RuntimeError("Simulated random system fault!")
    # Add to receiver
    session.execute("UPDATE accounts SET balance = balance + :amt WHERE id = :id", {'amt': amount, 'id': to_id})
    # Log the transfer
    session.execute(
        "INSERT INTO transfers (from_account, to_account, amount, status) VALUES (:from, :to, :amt, 'completed')",
        {'from': from_id, 'to': to_id, 'amt': amount}
    )
    print(f"  Transfer of {amount} from {from_id} to {to_id} recorded in session.")

print("\n--- Attempting a transfer with retry logic ---")
try:
    # This will retry up to 3 times if our simulated random fault occurs
    handler.execute_with_retry(lambda session: transfer_money(session, 1, 2, 200), max_attempts=3)
    print("Transfer succeeded with retry handler.")
except Exception as e:
    print(f"Transfer ultimately failed: {e}")

# Check final balances
final_balances = handler.engine.execute("SELECT username, balance FROM accounts ORDER BY id").fetchall()
print(f"\nFinal account states: {final_balances}")
Enter fullscreen mode Exit fullscreen mode

Now, let's move to the fourth technique: implementing the ORM pattern. ORM stands for Object-Relational Mapping. It's a way to let you work with database records as if they were normal Python objects. Instead of writing SQL to insert a new row, you create a Python object, set its attributes, and tell the ORM to save it. It translates your objects into database operations. This makes the code cleaner and more intuitive.

However, a common mistake is to let the ORM generate very inefficient queries, especially when fetching lots of related data. The key is to be explicit about what you need. Here's a pattern I often use with SQLAlchemy's ORM. It defines models, but also includes specific, optimized query methods.

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, create_engine
from sqlalchemy.orm import relationship, sessionmaker, joinedload, selectinload
from datetime import datetime

Base = declarative_base()

# Define our data models as Python classes
class Author(Base):
    __tablename__ = 'authors'
    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    bio = Column(String(500))
    books = relationship("Book", back_populates="author", lazy='dynamic')  # Use 'dynamic' for large collections

class Book(Base):
    __tablename__ = 'books'
    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    published_year = Column(Integer)
    author_id = Column(Integer, ForeignKey('authors.id'))
    author = relationship("Author", back_populates="books")
    reviews = relationship("Review", back_populates="book")

class Review(Base):
    __tablename__ = 'reviews'
    id = Column(Integer, primary_key=True)
    book_id = Column(Integer, ForeignKey('books.id'))
    reviewer_name = Column(String(100))
    content = Column(String(1000))
    rating = Column(Integer)
    book = relationship("Book", back_populates="reviews")

engine = create_engine('sqlite:///library_orm.db')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

class ORMRepository:
    """A repository class to handle all database operations for our models."""
    def __init__(self):
        self.session_factory = Session

    def add_author_with_books(self, author_name, book_titles):
        """Creates an author and their books in one transaction."""
        with self.session_factory() as session:
            new_author = Author(name=author_name)
            session.add(new_author)
            session.flush()  # Gets the author's ID
            for title in book_titles:
                new_book = Book(title=title, author_id=new_author.id)
                session.add(new_book)
            session.commit()
            print(f"Added author '{author_name}' with {len(book_titles)} books.")

    def get_author_with_books_optimized(self, author_id):
        """Fetches an author and all their books efficiently in one query."""
        with self.session_factory() as session:
            # Using joinedload to get author and books together
            author = session.query(Author).\
                options(joinedload(Author.books)).\
                filter(Author.id == author_id).\
                one()
            return author

    def get_books_with_reviews(self):
        """Fetches all books and their reviews efficiently, avoiding the N+1 query problem."""
        with self.session_factory() as session:
            # selectinload is great for loading collections (like reviews for many books)
            books = session.query(Book).\
                options(selectinload(Book.reviews)).\
                order_by(Book.title).\
                all()
            return books

    def search_books(self, title_fragment=None, min_year=None, author_name_fragment=None):
        """Builds a dynamic search query using the ORM's query builder."""
        with self.session_factory() as session:
            query = session.query(Book).join(Author)
            if title_fragment:
                query = query.filter(Book.title.ilike(f'%{title_fragment}%'))
            if min_year:
                query = query.filter(Book.published_year >= min_year)
            if author_name_fragment:
                query = query.filter(Author.name.ilike(f'%{author_name_fragment}%'))
            query = query.options(joinedload(Book.author))  # Load author data too
            return query.all()

# Example usage
print("Setting up ORM example...")
repo = ORMRepository()

print("\n1. Adding data...")
repo.add_author_with_books("Jane Austen", ["Pride and Prejudice", "Sense and Sensibility"])
repo.add_author_with_books("George Orwell", ["1984", "Animal Farm"])

print("\n2. Fetching an author with books (optimized query):")
author = repo.get_author_with_books_optimized(1)
print(f"Author: {author.name}")
for book in author.books:
    print(f"  - {book.title}")

print("\n3. Searching for books...")
found_books = repo.search_books(title_fragment="Pride", author_name_fragment="Austen")
for b in found_books:
    print(f"Found: '{b.title}' by {b.author.name}")
Enter fullscreen mode Exit fullscreen mode

The fifth technique is managing changes to your database structure over time, known as migrations. As your application grows, you might need to add a new column, remove an old one, or change a data type. You can't just delete the database and start over in production. Migration tools let you write these changes as scripts that can be applied in order, like a version history for your database schema. They also keep track of which changes have already been run.

Alembic is a popular tool for this with SQLAlchemy. The code below shows a simplified manager that orchestrates migrations, ensuring they run in the correct order and only once.

import os
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, inspect
from datetime import datetime

class SimpleMigrationManager:
    def __init__(self, db_url, migrations_dir='./migrations'):
        self.engine = create_engine(db_url)
        self.metadata = MetaData()
        self.migrations_dir = migrations_dir
        self._ensure_migrations_table()

    def _ensure_migrations_table(self):
        """Creates a table to track which migrations have been applied."""
        with self.engine.connect() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS _schema_migrations (
                    id INTEGER PRIMARY KEY,
                    version VARCHAR(50) UNIQUE NOT NULL,
                    applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

    def _get_applied_versions(self):
        with self.engine.connect() as conn:
            result = conn.execute("SELECT version FROM _schema_migrations ORDER BY id")
            return {row[0] for row in result}

    def create_migration(self, description, upgrade_sql, downgrade_sql=None):
        """Creates a new migration file."""
        os.makedirs(self.migrations_dir, exist_ok=True)
        timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
        filename = f"{timestamp}_{description}.py"
        filepath = os.path.join(self.migrations_dir, filename)
        with open(filepath, 'w') as f:
            f.write(f'''
"""
Migration: {description}
Created: {timestamp}
"""

def upgrade(connection):
    """Apply the migration."""
{self._indent_sql(upgrade_sql)}

def downgrade(connection):
    """Revert the migration."""
{self._indent_sql(downgrade_sql if downgrade_sql else 'pass')}
''')
        print(f"Created migration file: {filepath}")
        return filepath

    @staticmethod
    def _indent_sql(sql_text, indent_level=4):
        indent = ' ' * indent_level
        lines = sql_text.strip().split('\n')
        indented_lines = [f'{indent}{line}' for line in lines]
        return '\n'.join(indented_lines)

    def run_migrations(self):
        """Finds and runs all new migration files in order."""
        applied = self._get_applied_versions()
        migration_files = sorted([
            f for f in os.listdir(self.migrations_dir)
            if f.endswith('.py') and not f.startswith('__')
        ])
        print(f"Found {len(migration_files)} migration files on disk.")
        with self.engine.connect() as conn:
            for filename in migration_files:
                version = filename[:-3]
                if version in applied:
                    print(f"  [SKIP] {version} - already applied.")
                    continue
                print(f"  [APPLY] {version}...")
                filepath = os.path.join(self.migrations_dir, filename)
                # Load the migration module dynamically
                import importlib.util
                spec = importlib.util.spec_from_file_location(version, filepath)
                mod = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(mod)
                # Run upgrade within a transaction
                trans = conn.begin()
                try:
                    mod.upgrade(conn)
                    conn.execute(
                        "INSERT INTO _schema_migrations (version) VALUES (%s)",
                        (version,)
                    )
                    trans.commit()
                    print(f"    -> Success.")
                except Exception as e:
                    trans.rollback()
                    print(f"    -> FAILED: {e}")
                    raise
        print("All pending migrations complete.")

# Example: Creating and running migrations
print("Initializing migration manager...")
manager = SimpleMigrationManager('sqlite:///migration_test.db', migrations_dir='./my_migrations')

print("\n--- Creating first migration: Add users table ---")
mig1_sql = """
CREATE TABLE users (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    email TEXT UNIQUE NOT NULL,
    full_name TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""
manager.create_migration("create_users_table", mig1_sql, "DROP TABLE users;")

print("\n--- Creating second migration: Add posts table ---")
mig2_sql = """
CREATE TABLE posts (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER NOT NULL,
    title TEXT NOT NULL,
    content TEXT,
    FOREIGN KEY (user_id) REFERENCES users (id)
);
CREATE INDEX idx_posts_user ON posts(user_id);
"""
manager.create_migration("create_posts_table", mig2_sql, "DROP TABLE posts;")

print("\n--- Running all migrations ---")
manager.run_migrations()

# Verify the schema
inspector = inspect(manager.engine)
print("\nCurrent tables in database:", inspector.get_table_names())
Enter fullscreen mode Exit fullscreen mode

The sixth technique is building resilience through connection retry logic. Networks and databases are not perfectly reliable. A connection might drop for a moment, or a database might be temporarily busy. Instead of crashing, your application should pause and try again a few times. This is different from transaction retries; this is about simply establishing the initial connection or re-establishing it after a failure.

Here's a connection class that wraps the standard engine with retry logic and exponential backoff (waiting longer between each attempt).

import time
from sqlalchemy import create_engine, exc
from sqlalchemy.orm import sessionmaker
from random import random

class ResilientDatabaseConnection:
    def __init__(self, connection_string, max_retries=5, base_delay=1.0):
        self.connection_string = connection_string
        self.max_retries = max_retries
        self.base_delay = base_delay
        self._engine = None
        self._connect()

    def _connect(self):
        """Attempts to create the engine with retries."""
        last_exception = None
        for attempt in range(self.max_retries):
            try:
                self._engine = create_engine(self.connection_string, echo=False)
                # Test the connection immediately
                with self._engine.connect() as test_conn:
                    test_conn.execute("SELECT 1")
                print(f"Database connection established on attempt {attempt + 1}.")
                return
            except exc.DBAPIError as e:
                last_exception = e
                if attempt == self.max_retries - 1:
                    break
                # Calculate delay with exponential backoff and a little randomness
                delay = self.base_delay * (2 ** attempt) + (random() * 0.5)
                print(f"Connection attempt {attempt + 1} failed. Retrying in {delay:.2f}s...")
                time.sleep(delay)
        print(f"Failed to connect after {self.max_retries} attempts.")
        raise last_exception or ConnectionError("Could not connect to database")

    def get_session(self):
        """Provides a new session from the resilient engine."""
        if self._engine is None:
            self._connect()
        Session = sessionmaker(bind=self._engine)
        return Session()

    def execute_with_connection_retry(self, operation):
        """
        Executes an operation that requires a raw connection.
        If the connection is found to be invalid, it will try to reconnect once.
        """
        try:
            with self._engine.connect() as conn:
                return operation(conn)
        except exc.DBAPIError as e:
            # Check if error suggests a broken connection
            if 'connection' in str(e).lower() or 'closed' in str(e).lower():
                print("Detected a broken connection. Attempting to reconnect...")
                self._connect()
                # Retry the operation once with the new connection
                with self._engine.connect() as new_conn:
                    return operation(new_conn)
            else:
                # It's a different error, re-raise it
                raise

# Simulating an unreliable database for demonstration
print("Testing resilient connection (simulating initial failures)...")

# This is a mock. In reality, you'd point to a real, temporarily unavailable DB.
# For the example, we'll simulate failure by using a dummy string and catching the error.
try:
    # This will fail to connect, demonstrating the retry loop
    resilient_db = ResilientDatabaseConnection('postgresql://bad_host:9999/nonexistent', max_retries=3, base_delay=0.5)
except Exception as e:
    print(f"Expected final failure: {type(e).__name__}")

print("\n--- Simulating a recovery scenario ---")
# Let's create a valid connection to an in-memory SQLite DB
print("Now connecting to a valid database...")
good_db = ResilientDatabaseConnection('sqlite:///recovery_test.db', max_retries=2)
good_db.execute_with_connection_retry(lambda conn: conn.execute("CREATE TABLE test (id INT, val TEXT)"))

# Simulate a mid-application failure (e.g., someone restarts the DB)
print("\nManually invalidating the engine to simulate a restart...")
good_db._engine.dispose()  # This closes all connections

print("Attempting to use the connection after invalidation...")
# This call should trigger the reconnection logic inside execute_with_connection_retry
good_db.execute_with_connection_retry(lambda conn: conn.execute("INSERT INTO test (id, val) VALUES (1, 'reconnected')"))
print("Operation succeeded after automatic reconnection.")
Enter fullscreen mode Exit fullscreen mode

The seventh technique is caching query results. Some data doesn't change very often, like a list of product categories or a user's country list. Running the same database query thousands of times for this static data is wasteful. We can store the result in a fast, in-memory cache after the first fetch. The next time we need it, we check the cache first. This dramatically reduces load on the database.

It's important to invalidate or expire the cache when the underlying data does change. This example shows a simple but effective cache decorator and a more advanced version that uses a central cache store.

import functools
import time
import pickle
from typing import Any, Optional, Callable
from datetime import datetime, timedelta

class QueryResultCache:
    """A simple in-memory cache for database query results."""
    def __init__(self, default_ttl_seconds=300):
        self._store = {}
        self.default_ttl = default_ttl_seconds
        print(f"Cache initialized with default TTL of {default_ttl_seconds}s.")

    def get(self, key: str) -> Optional[Any]:
        """Retrieves a value from the cache if it exists and is not expired."""
        if key not in self._store:
            return None
        value, expiry = self._store[key]
        if expiry and datetime.utcnow() > expiry:
            print(f"Cache entry expired for key: {key}")
            del self._store[key]
            return None
        print(f"Cache hit for key: {key}")
        return value

    def set(self, key: str, value: Any, ttl_seconds: Optional[int] = None):
        """Stores a value in the cache with a time-to-live."""
        ttl = ttl_seconds if ttl_seconds is not None else self.default_ttl
        expiry = datetime.utcnow() + timedelta(seconds=ttl) if ttl > 0 else None
        self._store[key] = (value, expiry)
        print(f"Cache set for key: {key} (expires: {expiry})")

    def invalidate(self, key: str):
        """Forcibly removes an item from the cache."""
        if key in self._store:
            del self._store[key]
            print(f"Cache invalidated for key: {key}")

    def clear(self):
        """Clears the entire cache."""
        self._store.clear()
        print("Cache cleared.")

def cache_query(ttl_seconds=60):
    """
    A decorator to cache the results of a function that performs a database query.
    The function's arguments are used to create a unique cache key.
    """
    def decorator(func: Callable):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Create a simple key based on function name and arguments
            key_parts = [func.__name__] + [str(arg) for arg in args] + [f"{k}:{v}" for k, v in sorted(kwargs.items())]
            cache_key = "|".join(key_parts)
            # Try to get from global cache
            cache = wrapper.cache_store
            cached_result = cache.get(cache_key)
            if cached_result is not None:
                return cached_result
            # Not in cache, execute the function
            print(f"Cache miss for '{func.__name__}'. Executing query...")
            result = func(*args, **kwargs)
            # Store the result
            cache.set(cache_key, result, ttl_seconds)
            return result
        # Attach a cache instance to the wrapper function
        wrapper.cache_store = QueryResultCache()
        return wrapper
    return decorator

# --- Example Usage ---
print("Demonstrating query result caching...")

# Simulate a slow database query function
@cache_query(ttl_seconds=10)  # Cache results for 10 seconds
def get_active_users(region: str, minimum_orders: int):
    """Simulates a slow database query fetching active users."""
    time.sleep(1)  # Simulate network/database latency
    print(f"  [DB QUERY EXECUTED] for region='{region}', min_orders={minimum_orders}")
    # Simulate a result
    return [
        {"id": 101, "name": f"User_A_{region}"},
        {"id": 102, "name": f"User_B_{region}"},
    ]

print("\nFirst call (will be slow, hits the DB):")
start = time.time()
result1 = get_active_users("US", 5)
print(f"  Result: {result1}. Took {time.time()-start:.2f}s")

print("\nSecond call with same args (should be instant, from cache):")
start = time.time()
result2 = get_active_users("US", 5)
print(f"  Result: {result2}. Took {time.time()-start:.2f}s")

print("\nCall with different args (will hit DB again):")
start = time.time()
result3 = get_active_users("EU", 3)
print(f"  Result: {result3}. Took {time.time()-start:.2f}s")

print("\nWaiting 11 seconds for cache to expire...")
time.sleep(11)
print("Calling again with original args after expiration:")
start = time.time()
result4 = get_active_users("US", 5)
print(f"  Result: {result4}. Took {time.time()-start:.2f}s")
Enter fullscreen mode Exit fullscreen mode

The eighth and final technique is monitoring connection health. In a long-running application, connections in a pool can go stale—the database server might close them due to inactivity or a timeout. If your application tries to use a dead connection, it will get an error. Health checks periodically verify that connections are still alive before handing them out, or right before they are used. This is often called "pre-ping."

SQLAlchemy has a pool_pre_ping flag that does this automatically. But sometimes you need more control, like logging health stats or taking action if too many connections are failing. The class below extends our connection pool manager to include active health monitoring and reporting.

import threading
import time
from sqlalchemy import create_engine, text, event
from sqlalchemy.pool import QueuePool
from collections import deque
import statistics

class MonitoredConnectionPool:
    def __init__(self, db_url, pool_size=5, health_check_interval=30):
        self.engine = create_engine(
            db_url,
            poolclass=QueuePool,
            pool_size=pool_size,
            max_overflow=10,
            pool_timeout=5,
            pool_pre_ping=True,  # Basic health check on checkout
            echo=False
        )
        self.health_interval = health_check_interval
        self.health_history = deque(maxlen=100)  # Store last 100 check times
        self._monitor_active = False
        self._monitor_thread = None
        self.latency_samples = deque(maxlen=50)
        self._setup_event_listeners()

    def _setup_event_listeners(self):
        """Listen to SQLAlchemy events to gather metrics."""
        @event.listens_for(self.engine, "before_cursor_execute")
        def before_execute(conn, cursor, statement, parameters, context, executemany):
            conn.info.setdefault('query_start_time', []).append(time.perf_counter())

        @event.listens_for(self.engine, "after_cursor_execute")
        def after_execute(conn, cursor, statement, parameters, context, executemany):
            start = conn.info['query_start_time'].pop()
            total = time.perf_counter() - start
            self.latency_samples.append(total * 1000)  # Store in ms

    def start_health_monitor(self):
        """Starts a background thread that periodically checks pool health."""
        if self._monitor_active:
            return
        self._monitor_active = True
        def monitor_loop():
            while self._monitor_active:
                time.sleep(self.health_check_interval)
                self._run_health_check()
        self._monitor_thread = threading.Thread(target=monitor_loop, daemon=True)
        self._monitor_thread.start()
        print("Health monitor started.")

    def stop_health_monitor(self):
        """Stops the health monitoring thread."""
        self._monitor_active = False
        if self._monitor_thread:
            self._monitor_thread.join(timeout=2)
        print("Health monitor stopped.")

    def _run_health_check(self):
        """Performs a health check by borrowing and testing a connection."""
        try:
            start_time = time.perf_counter()
            with self.engine.connect() as conn:
                # Execute a simple query to test responsiveness
                conn.execute(text("SELECT 1"))
            check_time = (time.perf_counter() - start_time) * 1000  # ms
            self.health_history.append((time.time(), check_time, 'ok'))
            status = 'OK'
        except Exception as e:
            self.health_history.append((time.time(), 0, f'error: {e}'))
            status = f'ERROR: {e}'
            check_time = 0

        pool = self.engine.pool
        stats = {
            'timestamp': time.strftime('%H:%M:%S'),
            'status': status,
            'check_duration_ms': check_time,
            'connections_in_use': pool.checkedout(),
            'connections_idle': pool.checkedin(),
            'pool_size': pool.size(),
            'recent_avg_latency_ms': statistics.mean(self.latency_samples) if self.latency_samples else 0,
        }
        print(f"[Health Check] {stats}")

    def get_health_report(self):
        """Generates a summary health report."""
        if not self.health_history:
            return {"status": "No checks performed yet."}
        recent_checks = [h for h in self.health_history if time.time() - h[0] < 300]  # Last 5 min
        ok_checks = [h for h in recent_checks if h[2] == 'ok']
        error_checks = [h for h in recent_checks if h[2] != 'ok']
        latency_list = [h[1] for h in ok_checks]
        return {
            "checks_last_5min": len(recent_checks),
            "success_rate": len(ok_checks) / len(recent_checks) if recent_checks else 0,
            "avg_latency_ms": statistics.mean(latency_list) if latency_list else 0,
            "recent_errors": [h[2] for h in error_checks[-3:]],  # Last 3 errors
            "current_pool_utilization": self.engine.pool.checkedout() / self.engine.pool.size() if self.engine.pool.size() > 0 else 0
        }

    def execute_operation(self, query, params=None):
        """A simple method to execute a query, now with built-in monitoring."""
        with self.engine.connect() as conn:
            if params:
                result = conn.execute(text(query), params)
            else:
                result = conn.execute(text(query))
            if query.strip().upper().startswith('SELECT'):
                return result.fetchall()
            return result

# Example Usage
print("Starting monitored connection pool example...")
monitored_pool = MonitoredConnectionPool('sqlite:///health_monitor.db', pool_size=3, health_check_interval=10)

# Create a test table
monitored_pool.execute_operation("""
CREATE TABLE IF NOT EXISTS system_log (
    id INTEGER PRIMARY KEY,
    message TEXT,
    log_level TEXT,
    recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")

print("\nStarting background health monitor...")
monitored_pool.start_health_monitor()

print("\nSimulating some application activity for 35 seconds...")
activity_start = time.time()
while time.time() - activity_start < 35:
    # Simulate occasional inserts and selects
    monitored_pool.execute_operation(
        "INSERT INTO system_log (message, log_level) VALUES (:msg, :lvl)",
        {'msg': f'Test log at {time.time()}', 'lvl': 'INFO'}
    )
    time.sleep(0.5)
    if random.random() > 0.7:  # Occasionally do a select
        results = monitored_pool.execute_operation("SELECT COUNT(*) FROM system_log")
        print(f"  Current log count: {results[0][0]}")
    time.sleep(1)

print("\nGenerating final health report...")
report = monitored_pool.get_health_report()
for key, value in report.items():
    print(f"  {key}: {value}")

print("\nStopping health monitor...")
monitored_pool.stop_health_monitor()
print("Example complete.")
Enter fullscreen mode Exit fullscreen mode

These eight techniques—connection pooling, query building, transaction management, ORM patterns, schema migrations, connection retries, query caching, and health monitoring—form a strong foundation for building Python applications that interact with databases in a way that is efficient, reliable, and maintainable. Each one addresses a specific challenge you'll face as your application grows from a simple script to a service that many people depend on.

I find that starting with a clear mental model, like the coffee shop analogy, helps in understanding why these patterns exist. They're not just academic concepts; they solve real, practical problems of speed, safety, and clarity. By implementing them gradually, you can build systems that are a pleasure to work on and robust enough to handle the unexpected. Remember, the goal isn't to use the most complex tool, but to use the right tool to make your data layer simple and solid.

📘 Checkout my latest ebook for free on my channel!

Be sure to like, share, comment, and subscribe to the channel!


101 Books

101 Books is an AI-driven publishing company co-founded by author Aarav Joshi. By leveraging advanced AI technology, we keep our publishing costs incredibly low—some books are priced as low as $4—making quality knowledge accessible to everyone.

Check out our book Golang Clean Code available on Amazon.

Stay tuned for updates and exciting news. When shopping for books, search for Aarav Joshi to find more of our titles. Use the provided link to enjoy special discounts!

Our Creations

Be sure to check out our creations:

Investor Central | Investor Central Spanish | Investor Central German | Smart Living | Epochs & Echoes | Puzzling Mysteries | Hindutva | Elite Dev | Java Elite Dev | Golang Elite Dev | Python Elite Dev | JS Elite Dev | JS Schools


We are on Medium

Tech Koala Insights | Epochs & Echoes World | Investor Central Medium | Puzzling Mysteries Medium | Science & Epochs Medium | Modern Hindutva

Top comments (0)