Let me talk to you about how Python works with databases. Imagine you’re running a busy coffee shop. You have a counter where orders are taken, a barista who makes the coffee, and a system to track everything. Databases are like the organized storage room for all your coffee beans, cups, and records. Python provides the tools—the counter system, the barista's instructions, and the inventory logs—to manage that storage room efficiently and reliably. I've spent a lot of time building these systems, and I want to share some clear, practical methods that make this process smooth.
We'll look at eight specific techniques. The goal is to make your application talk to the database quickly, safely, and without confusion, whether you're handling ten users or ten thousand. I'll explain each one with straightforward examples, the kind of code I write and use myself.
First, let's discuss managing multiple conversations with your database. A web application doesn't have just one user; it has many, all asking for data at the same time. Opening a brand new connection to the database for every single request is slow and wasteful, like hiring a new barista for every customer. Instead, we use a connection pool. Think of it as a team of baristas ready at the counter. When a customer (a user request) arrives, a free barista (a connection) helps them. When done, the barista goes back to the team, ready for the next customer.
This pool manages the team. It sets how many baristas can be active at once, how long they wait for a customer, and when they should take a break to stay sharp. Here's a concrete way to set this up. We create a manager that handles the pool, can run queries, and even test how the team performs under pressure.
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool
import threading
import time

class ConnectionPoolManager:
    def __init__(self, db_url, pool_size=5, max_spare=10):
        self.engine = create_engine(
            db_url,
            poolclass=QueuePool,
            pool_size=pool_size,
            max_overflow=max_spare,
            pool_timeout=30,
            pool_recycle=1800,
            echo_pool='debug'
        )
        self.metrics = {'active': 0, 'idle': 0}

    def run_safe_query(self, sql_statement, data=None):
        # engine.begin() commits automatically on success, so writes persist
        with self.engine.begin() as conn:
            if data:
                outcome = conn.execute(text(sql_statement), data)
            else:
                outcome = conn.execute(text(sql_statement))
            if sql_statement.strip().upper().startswith('SELECT'):
                return outcome.fetchall()
            return outcome.rowcount

    def stress_test(self, concurrent_users=20, requests_per_user=25):
        report = {'ok': 0, 'errors': 0, 'total_duration': 0.0}
        report_lock = threading.Lock()  # one shared lock guards the report dict

        def simulated_user(user_id):
            for req in range(requests_per_user):
                try:
                    start = time.perf_counter()
                    with self.engine.connect() as c:
                        # text() statements take named parameters as a dict
                        c.execute(text("SELECT 1 + :n"), {'n': req})
                        time.sleep(0.005)
                    duration = time.perf_counter() - start
                    with report_lock:
                        report['ok'] += 1
                        report['total_duration'] += duration
                except Exception:
                    with report_lock:
                        report['errors'] += 1

        threads = []
        for i in range(concurrent_users):
            t = threading.Thread(target=simulated_user, args=(i,))
            threads.append(t)
            t.start()
        for t in threads:
            t.join()
        avg_time = (report['total_duration'] / report['ok'] * 1000) if report['ok'] > 0 else 0
        return {
            'successful_requests': report['ok'],
            'failed_requests': report['errors'],
            'average_ms': avg_time
        }

# Using it
manager = ConnectionPoolManager('postgresql://user:pass@localhost/dbname')
print("Creating a test table...")
manager.run_safe_query("CREATE TABLE IF NOT EXISTS test_items (id SERIAL, name TEXT);")
print("Adding sample items...")
for n in range(50):
    manager.run_safe_query("INSERT INTO test_items (name) VALUES (:name)", {'name': f'Item_{n}'})
print("Running a simulated load test...")
load_results = manager.stress_test(concurrent_users=15, requests_per_user=30)
print(f"Results: {load_results}")
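If you want to see the borrow-and-return mechanic with nothing but the standard library, here is a minimal hand-rolled sketch: a queue of pre-opened sqlite3 connections. The `SimplePool` class and its method names are my own illustration, not part of any library; a real pool also needs recycling and health checks.

```python
import sqlite3
import queue

class SimplePool:
    """A toy connection pool: pre-open N connections, borrow and return them."""
    def __init__(self, db_path, size=3):
        self._pool = queue.Queue(maxsize=size)
        for _ in range(size):
            # check_same_thread=False lets a connection be used from other threads
            self._pool.put(sqlite3.connect(db_path, check_same_thread=False))

    def acquire(self, timeout=5):
        # Blocks until a connection is free, like pool_timeout in SQLAlchemy
        return self._pool.get(timeout=timeout)

    def release(self, conn):
        self._pool.put(conn)

# Borrow a connection, use it, and hand it back to the pool
pool = SimplePool(":memory:", size=2)  # note: each :memory: conn is its own private DB
conn = pool.acquire()
result = conn.execute("SELECT 1 + 1").fetchone()[0]
pool.release(conn)
print(result)  # 2
```

The key property is that `acquire` blocks instead of opening a new connection, which is exactly what bounds the load on the database server.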
Next, we need to build our questions for the database. You wouldn't shout a jumbled list of ingredients at your barista; you give a clear order. In code, we don't just paste raw text into a query; we build it carefully. This approach, called query building, lets us create precise, dynamic questions safely. It stops bad actors from injecting their own commands, which is a common security problem. We can add filters, sort orders, and combine data from different tables logically.
Here is a builder that constructs different types of queries. You tell it what you're looking for, and it writes the correct, safe SQL for you.
from sqlalchemy import MetaData, Table, Column, Integer, String, Float, DateTime, ForeignKey, select, and_, func, desc
from datetime import datetime, timedelta

metadata = MetaData()

# Define what our tables look like in code
products = Table('products', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(100), nullable=False),
    Column('category', String(50)),
    Column('price', Float),
    Column('stock', Integer),
    Column('added_on', DateTime, default=datetime.utcnow)
)

sales = Table('sales', metadata,
    Column('id', Integer, primary_key=True),
    Column('product_id', ForeignKey('products.id')),
    Column('quantity', Integer),
    Column('sale_price', Float),
    Column('sold_at', DateTime, default=datetime.utcnow)
)

class DynamicQueryTool:
    @staticmethod
    def find_products(**filters):
        # select() takes columns/tables directly in SQLAlchemy 1.4+
        query = select(products)
        conditions = []
        if 'category' in filters:
            conditions.append(products.c.category == filters['category'])
        if 'min_price' in filters:
            conditions.append(products.c.price >= filters['min_price'])
        if 'max_price' in filters:
            conditions.append(products.c.price <= filters['max_price'])
        if 'in_stock' in filters and filters['in_stock']:
            conditions.append(products.c.stock > 0)
        if 'name_like' in filters:
            conditions.append(products.c.name.ilike(f'%{filters["name_like"]}%'))
        if conditions:
            query = query.where(and_(*conditions))
        if 'order_by' in filters:
            order_field = filters['order_by']
            if order_field == 'price_high':
                query = query.order_by(desc(products.c.price))
            elif order_field == 'price_low':
                query = query.order_by(products.c.price)
            elif order_field == 'newest':
                query = query.order_by(desc(products.c.added_on))
        if 'limit' in filters:
            query = query.limit(filters['limit'])
        return query

    @staticmethod
    def get_daily_sales_report(days=7):
        week_ago = datetime.utcnow() - timedelta(days=days)
        query = select(
            func.date(sales.c.sold_at).label('date'),
            func.count(sales.c.id).label('total_sales'),
            func.sum(sales.c.quantity).label('items_sold'),
            func.sum(sales.c.sale_price * sales.c.quantity).label('revenue')
        ).where(
            sales.c.sold_at >= week_ago
        ).group_by(
            func.date(sales.c.sold_at)
        ).order_by(
            desc('date')
        )
        return query

    @staticmethod
    def get_top_performing_products(limit=10):
        query = select(
            products.c.name,
            products.c.category,
            func.sum(sales.c.quantity).label('total_units_sold'),
            func.sum(sales.c.sale_price * sales.c.quantity).label('total_revenue')
        ).select_from(
            products.join(sales, products.c.id == sales.c.product_id)
        ).group_by(
            products.c.id, products.c.name, products.c.category
        ).order_by(
            desc('total_revenue')
        ).limit(limit)
        return query

# Using the tool
print("Query for electronics under $500, in stock:")
query1 = DynamicQueryTool.find_products(category='Electronics', max_price=500.0, in_stock=True, order_by='price_low', limit=5)
print(str(query1))
print("\n---\n")
print("Sales report for the last 7 days:")
query2 = DynamicQueryTool.get_daily_sales_report(days=7)
print(str(query2))
print("\n---\n")
print("Top 5 best-selling products:")
query3 = DynamicQueryTool.get_top_performing_products(limit=5)
print(str(query3))
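To make the safety argument concrete, here is a small standard-library sketch of why bound parameters defeat SQL injection. The table and the "malicious" input are invented for illustration; the same principle applies to the builder above, which always emits bound parameters rather than splicing values into the SQL text.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT)")
conn.execute("INSERT INTO users VALUES ('alice')")

# Attacker-controlled input that would subvert a string-concatenated query
malicious = "nobody' OR '1'='1"

# Unsafe: string formatting splices the input into the SQL itself
unsafe_sql = f"SELECT name FROM users WHERE name = '{malicious}'"
print(conn.execute(unsafe_sql).fetchall())  # [('alice',)] -- the injection worked

# Safe: the driver sends the value separately; it is never parsed as SQL
safe_rows = conn.execute("SELECT name FROM users WHERE name = ?", (malicious,)).fetchall()
print(safe_rows)  # [] -- the whole string is treated as one literal name
```

The difference is not the query text but the transport: a bound parameter never passes through the SQL parser, so it cannot change the query's structure.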
The third technique is about ensuring reliability: transaction management. A transaction is a group of operations that must all succeed or all fail together. Think of it as a customer order for a latte and a muffin. You either charge their card, prepare the latte, and hand over the muffin, or if the oven breaks and you can't get the muffin, you cancel the whole order and refund the charge. You don't want to charge them for just the latte if the muffin isn't available.
Databases use transactions for this atomicity. In Python, we use context managers to make this easy and safe. The code below shows a manager that handles transactions, can retry them if something temporary goes wrong, and even processes work in batches, making sure each batch is fully completed before moving to the next.
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager
import logging
import random
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SafeTransactionHandler:
    def __init__(self, database_url):
        self.engine = create_engine(database_url)
        self.Session = sessionmaker(bind=self.engine)
        self.retry_delay = 0.1

    @contextmanager
    def atomic_block(self):
        """A context manager that ensures all operations complete or none do."""
        session = self.Session()
        try:
            yield session
            session.commit()
            logger.info("Transaction committed.")
        except Exception as e:
            session.rollback()
            logger.error(f"Transaction rolled back due to: {e}")
            raise
        finally:
            session.close()

    def execute_with_retry(self, operation_func, max_attempts=3):
        """Tries an operation multiple times if it fails, useful for network glitches."""
        last_error = None
        for attempt in range(1, max_attempts + 1):
            try:
                with self.atomic_block() as session:
                    return operation_func(session)
            except Exception as e:
                last_error = e
                logger.warning(f"Attempt {attempt} failed: {e}")
                if attempt < max_attempts:
                    sleep_time = self.retry_delay * (2 ** (attempt - 1))
                    time.sleep(sleep_time)
        logger.error(f"All {max_attempts} attempts failed.")
        raise last_error

    def process_batch_safely(self, list_of_items, chunk_size=50):
        """Processes a large list by breaking it into chunks, each in its own transaction."""
        for i in range(0, len(list_of_items), chunk_size):
            chunk = list_of_items[i:i + chunk_size]
            try:
                with self.atomic_block() as session:
                    for item in chunk:
                        # Example: Mark each item as processed
                        session.execute(
                            text("UPDATE data_queue SET status = 'processed' WHERE id = :id"),
                            {'id': item['id']}
                        )
                logger.info(f"Successfully processed chunk starting at index {i}")
            except Exception as e:
                logger.error(f"Failed to process chunk starting at {i}: {e}")
                # You might decide to log the failed chunk and continue, or stop.
                # For this example, we'll re-raise to stop the whole process.
                raise

# Example setup and usage
print("Initializing transaction handler...")
handler = SafeTransactionHandler('sqlite:///test_transactions.db')

# Create simple tables for demonstration (one statement per execute call)
with handler.engine.begin() as conn:
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS accounts (
            id INTEGER PRIMARY KEY,
            username TEXT UNIQUE,
            balance INTEGER DEFAULT 0
        )
    """))
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS transfers (
            id INTEGER PRIMARY KEY,
            from_account INTEGER,
            to_account INTEGER,
            amount INTEGER,
            status TEXT
        )
    """))
    conn.execute(text("DELETE FROM accounts"))
    conn.execute(text("INSERT INTO accounts (id, username, balance) VALUES (1, 'alice', 1000), (2, 'bob', 500)"))

# Define a money transfer function that must be atomic
def transfer_money(session, from_id, to_id, amount):
    # Check balance
    result = session.execute(text("SELECT balance FROM accounts WHERE id = :id"), {'id': from_id}).fetchone()
    if not result or result[0] < amount:
        raise ValueError("Insufficient funds or account not found.")
    # Deduct from sender
    session.execute(text("UPDATE accounts SET balance = balance - :amt WHERE id = :id"), {'amt': amount, 'id': from_id})
    # Simulate a potential random failure (like a network hiccup)
    if random.random() < 0.3:  # 30% chance to fail for demonstration
        raise RuntimeError("Simulated random system fault!")
    # Add to receiver
    session.execute(text("UPDATE accounts SET balance = balance + :amt WHERE id = :id"), {'amt': amount, 'id': to_id})
    # Log the transfer
    session.execute(
        text("INSERT INTO transfers (from_account, to_account, amount, status) VALUES (:from_id, :to_id, :amt, 'completed')"),
        {'from_id': from_id, 'to_id': to_id, 'amt': amount}
    )
    print(f"  Transfer of {amount} from {from_id} to {to_id} recorded in session.")

print("\n--- Attempting a transfer with retry logic ---")
try:
    # This will retry up to 3 times if our simulated random fault occurs
    handler.execute_with_retry(lambda session: transfer_money(session, 1, 2, 200), max_attempts=3)
    print("Transfer succeeded with retry handler.")
except Exception as e:
    print(f"Transfer ultimately failed: {e}")

# Check final balances
with handler.engine.connect() as conn:
    final_balances = conn.execute(text("SELECT username, balance FROM accounts ORDER BY id")).fetchall()
print(f"\nFinal account states: {final_balances}")
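The all-or-nothing guarantee doesn't depend on SQLAlchemy. Here is the same idea boiled down to the standard library's sqlite3 module, with a deliberate failure injected between the two writes; the table and amounts are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER)")
conn.execute("INSERT INTO accounts VALUES (1, 1000), (2, 500)")
conn.commit()

def transfer(conn, from_id, to_id, amount, fail=True):
    try:
        conn.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?", (amount, from_id))
        if fail:
            raise RuntimeError("oven broke mid-order")  # simulate a crash between the writes
        conn.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?", (amount, to_id))
        conn.commit()
    except Exception:
        conn.rollback()  # undo the half-finished work

transfer(conn, 1, 2, 200)  # fails halfway, gets rolled back
balances = [row[1] for row in conn.execute("SELECT id, balance FROM accounts ORDER BY id")]
print(balances)  # [1000, 500] -- the partial deduction was undone
```

Without the rollback, the sender would have been debited 200 with nothing credited to the receiver; atomicity is what prevents that half-state from ever being visible.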
Now, let's move to the fourth technique: implementing the ORM pattern. ORM stands for Object-Relational Mapping. It's a way to let you work with database records as if they were normal Python objects. Instead of writing SQL to insert a new row, you create a Python object, set its attributes, and tell the ORM to save it. It translates your objects into database operations. This makes the code cleaner and more intuitive.
However, a common mistake is to let the ORM generate very inefficient queries, especially when fetching lots of related data. The key is to be explicit about what you need. Here's a pattern I often use with SQLAlchemy's ORM. It defines models, but also includes specific, optimized query methods.
from sqlalchemy import Column, Integer, String, ForeignKey, create_engine
from sqlalchemy.orm import declarative_base, relationship, sessionmaker, joinedload, selectinload

Base = declarative_base()

# Define our data models as Python classes
class Author(Base):
    __tablename__ = 'authors'
    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    bio = Column(String(500))
    # Default lazy loading; the eager-loading options below override it per query.
    # (A lazy='dynamic' relationship cannot be combined with joinedload.)
    books = relationship("Book", back_populates="author")

class Book(Base):
    __tablename__ = 'books'
    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    published_year = Column(Integer)
    author_id = Column(Integer, ForeignKey('authors.id'))
    author = relationship("Author", back_populates="books")
    reviews = relationship("Review", back_populates="book")

class Review(Base):
    __tablename__ = 'reviews'
    id = Column(Integer, primary_key=True)
    book_id = Column(Integer, ForeignKey('books.id'))
    reviewer_name = Column(String(100))
    content = Column(String(1000))
    rating = Column(Integer)
    book = relationship("Book", back_populates="reviews")

engine = create_engine('sqlite:///library_orm.db')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

class ORMRepository:
    """A repository class to handle all database operations for our models."""
    def __init__(self):
        self.session_factory = Session

    def add_author_with_books(self, author_name, book_titles):
        """Creates an author and their books in one transaction."""
        with self.session_factory() as session:
            new_author = Author(name=author_name)
            session.add(new_author)
            session.flush()  # Gets the author's ID
            for title in book_titles:
                new_book = Book(title=title, author_id=new_author.id)
                session.add(new_book)
            session.commit()
            print(f"Added author '{author_name}' with {len(book_titles)} books.")

    def get_author_with_books_optimized(self, author_id):
        """Fetches an author and all their books efficiently in one query."""
        with self.session_factory() as session:
            # Using joinedload to get author and books together
            author = session.query(Author).\
                options(joinedload(Author.books)).\
                filter(Author.id == author_id).\
                one()
            return author

    def get_books_with_reviews(self):
        """Fetches all books and their reviews efficiently, avoiding the N+1 query problem."""
        with self.session_factory() as session:
            # selectinload is great for loading collections (like reviews for many books)
            books = session.query(Book).\
                options(selectinload(Book.reviews)).\
                order_by(Book.title).\
                all()
            return books

    def search_books(self, title_fragment=None, min_year=None, author_name_fragment=None):
        """Builds a dynamic search query using the ORM's query builder."""
        with self.session_factory() as session:
            query = session.query(Book).join(Author)
            if title_fragment:
                query = query.filter(Book.title.ilike(f'%{title_fragment}%'))
            if min_year:
                query = query.filter(Book.published_year >= min_year)
            if author_name_fragment:
                query = query.filter(Author.name.ilike(f'%{author_name_fragment}%'))
            query = query.options(joinedload(Book.author))  # Load author data too
            return query.all()

# Example usage
print("Setting up ORM example...")
repo = ORMRepository()
print("\n1. Adding data...")
repo.add_author_with_books("Jane Austen", ["Pride and Prejudice", "Sense and Sensibility"])
repo.add_author_with_books("George Orwell", ["1984", "Animal Farm"])
print("\n2. Fetching an author with books (optimized query):")
author = repo.get_author_with_books_optimized(1)
print(f"Author: {author.name}")
for book in author.books:
    print(f"  - {book.title}")
print("\n3. Searching for books...")
found_books = repo.search_books(title_fragment="Pride", author_name_fragment="Austen")
for b in found_books:
    print(f"Found: '{b.title}' by {b.author.name}")
The fifth technique is managing changes to your database structure over time, known as migrations. As your application grows, you might need to add a new column, remove an old one, or change a data type. You can't just delete the database and start over in production. Migration tools let you write these changes as scripts that can be applied in order, like a version history for your database schema. They also keep track of which changes have already been run.
Alembic is a popular tool for this with SQLAlchemy. The code below shows a simplified manager that orchestrates migrations, ensuring they run in the correct order and only once.
import importlib.util
import os
from datetime import datetime
from sqlalchemy import create_engine, text, inspect

class SimpleMigrationManager:
    def __init__(self, db_url, migrations_dir='./migrations'):
        self.engine = create_engine(db_url)
        self.migrations_dir = migrations_dir
        self._ensure_migrations_table()

    def _ensure_migrations_table(self):
        """Creates a table to track which migrations have been applied."""
        with self.engine.begin() as conn:
            conn.execute(text("""
                CREATE TABLE IF NOT EXISTS _schema_migrations (
                    id INTEGER PRIMARY KEY,
                    version VARCHAR(50) UNIQUE NOT NULL,
                    applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """))

    def _get_applied_versions(self):
        with self.engine.connect() as conn:
            result = conn.execute(text("SELECT version FROM _schema_migrations ORDER BY id"))
            return {row[0] for row in result}

    def create_migration(self, description, upgrade_sql, downgrade_sql=None):
        """Creates a new migration file whose upgrade/downgrade run each SQL statement."""
        os.makedirs(self.migrations_dir, exist_ok=True)
        timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
        filename = f"{timestamp}_{description}.py"
        filepath = os.path.join(self.migrations_dir, filename)
        with open(filepath, 'w') as f:
            f.write(f'''"""
Migration: {description}
Created: {timestamp}
"""
from sqlalchemy import text

UPGRADE_SQL = """{upgrade_sql}"""
DOWNGRADE_SQL = """{downgrade_sql or ''}"""

def _run(connection, sql):
    for statement in sql.split(';'):
        if statement.strip():
            connection.execute(text(statement))

def upgrade(connection):
    """Apply the migration."""
    _run(connection, UPGRADE_SQL)

def downgrade(connection):
    """Revert the migration."""
    _run(connection, DOWNGRADE_SQL)
''')
        print(f"Created migration file: {filepath}")
        return filepath

    def run_migrations(self):
        """Finds and runs all new migration files in order."""
        os.makedirs(self.migrations_dir, exist_ok=True)
        applied = self._get_applied_versions()
        migration_files = sorted([
            f for f in os.listdir(self.migrations_dir)
            if f.endswith('.py') and not f.startswith('__')
        ])
        print(f"Found {len(migration_files)} migration files on disk.")
        with self.engine.connect() as conn:
            for filename in migration_files:
                version = filename[:-3]
                if version in applied:
                    print(f"  [SKIP] {version} - already applied.")
                    continue
                print(f"  [APPLY] {version}...")
                filepath = os.path.join(self.migrations_dir, filename)
                # Load the migration module dynamically
                spec = importlib.util.spec_from_file_location(version, filepath)
                mod = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(mod)
                # Run upgrade within a transaction
                trans = conn.begin()
                try:
                    mod.upgrade(conn)
                    conn.execute(
                        text("INSERT INTO _schema_migrations (version) VALUES (:version)"),
                        {'version': version}
                    )
                    trans.commit()
                    print("    -> Success.")
                except Exception as e:
                    trans.rollback()
                    print(f"    -> FAILED: {e}")
                    raise
        print("All pending migrations complete.")

# Example: Creating and running migrations
print("Initializing migration manager...")
manager = SimpleMigrationManager('sqlite:///migration_test.db', migrations_dir='./my_migrations')

print("\n--- Creating first migration: Add users table ---")
mig1_sql = """
CREATE TABLE users (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    email TEXT UNIQUE NOT NULL,
    full_name TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
manager.create_migration("create_users_table", mig1_sql, "DROP TABLE users")

print("\n--- Creating second migration: Add posts table ---")
mig2_sql = """
CREATE TABLE posts (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER NOT NULL,
    title TEXT NOT NULL,
    content TEXT,
    FOREIGN KEY (user_id) REFERENCES users (id)
);
CREATE INDEX idx_posts_user ON posts(user_id)
"""
manager.create_migration("create_posts_table", mig2_sql, "DROP TABLE posts")

print("\n--- Running all migrations ---")
manager.run_migrations()

# Verify the schema
inspector = inspect(manager.engine)
print("\nCurrent tables in database:", inspector.get_table_names())
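Stripped of the file handling, the apply-once guarantee is just a bookkeeping table plus a sorted list of pending changes. Here is that core loop as a standard-library sketch; the in-memory dict of migrations and the version names are invented for illustration (real tools like Alembic keep these as files and track revisions the same way).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE _schema_migrations (version TEXT PRIMARY KEY)")

# Ordered migrations; in real life these would be scripts on disk
MIGRATIONS = {
    "0001_create_users": "CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)",
    "0002_add_posts": "CREATE TABLE posts (id INTEGER PRIMARY KEY, user_id INTEGER)",
}

def run_pending(conn):
    applied = {row[0] for row in conn.execute("SELECT version FROM _schema_migrations")}
    ran = []
    for version in sorted(MIGRATIONS):
        if version in applied:
            continue  # already applied: skip, never run twice
        conn.execute(MIGRATIONS[version])
        conn.execute("INSERT INTO _schema_migrations (version) VALUES (?)", (version,))
        ran.append(version)
    conn.commit()
    return ran

print(run_pending(conn))  # both migrations run, in version order
print(run_pending(conn))  # [] -- a second call is a safe no-op
```

That idempotence is the whole point: you can run the migration step on every deploy without worrying about double-applying a change.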
The sixth technique is building resilience through connection retry logic. Networks and databases are not perfectly reliable. A connection might drop for a moment, or a database might be temporarily busy. Instead of crashing, your application should pause and try again a few times. This is different from transaction retries; this is about simply establishing the initial connection or re-establishing it after a failure.
Here's a connection class that wraps the standard engine with retry logic and exponential backoff (waiting longer between each attempt).
import time
from random import random
from sqlalchemy import create_engine, exc, text
from sqlalchemy.orm import sessionmaker

class ResilientDatabaseConnection:
    def __init__(self, connection_string, max_retries=5, base_delay=1.0):
        self.connection_string = connection_string
        self.max_retries = max_retries
        self.base_delay = base_delay
        self._engine = None
        self._connect()

    def _connect(self):
        """Attempts to create the engine with retries."""
        last_exception = None
        for attempt in range(self.max_retries):
            try:
                self._engine = create_engine(self.connection_string, echo=False)
                # Test the connection immediately
                with self._engine.connect() as test_conn:
                    test_conn.execute(text("SELECT 1"))
                print(f"Database connection established on attempt {attempt + 1}.")
                return
            except exc.DBAPIError as e:
                last_exception = e
                if attempt == self.max_retries - 1:
                    break
                # Calculate delay with exponential backoff and a little randomness
                delay = self.base_delay * (2 ** attempt) + (random() * 0.5)
                print(f"Connection attempt {attempt + 1} failed. Retrying in {delay:.2f}s...")
                time.sleep(delay)
        print(f"Failed to connect after {self.max_retries} attempts.")
        raise last_exception or ConnectionError("Could not connect to database")

    def get_session(self):
        """Provides a new session from the resilient engine."""
        if self._engine is None:
            self._connect()
        Session = sessionmaker(bind=self._engine)
        return Session()

    def execute_with_connection_retry(self, operation):
        """
        Executes an operation inside a transaction (engine.begin commits on success).
        If the connection is found to be invalid, it will try to reconnect once.
        """
        try:
            with self._engine.begin() as conn:
                return operation(conn)
        except exc.DBAPIError as e:
            # Check if the error suggests a broken connection
            if 'connection' in str(e).lower() or 'closed' in str(e).lower():
                print("Detected a broken connection. Attempting to reconnect...")
                self._connect()
                # Retry the operation once with the new connection
                with self._engine.begin() as new_conn:
                    return operation(new_conn)
            else:
                # It's a different error, re-raise it
                raise

# Simulating an unreliable database for demonstration
print("Testing resilient connection (simulating initial failures)...")
# In reality, you'd point to a real, temporarily unavailable DB.
try:
    # This will fail to connect, demonstrating the retry loop
    resilient_db = ResilientDatabaseConnection('postgresql://bad_host:9999/nonexistent', max_retries=3, base_delay=0.5)
except Exception as e:
    print(f"Expected final failure: {type(e).__name__}")

print("\n--- Simulating a recovery scenario ---")
# Let's create a valid connection to a file-based SQLite DB
print("Now connecting to a valid database...")
good_db = ResilientDatabaseConnection('sqlite:///recovery_test.db', max_retries=2)
good_db.execute_with_connection_retry(lambda conn: conn.execute(text("CREATE TABLE IF NOT EXISTS test (id INT, val TEXT)")))

# Simulate a mid-application failure (e.g., someone restarts the DB)
print("\nManually invalidating the engine to simulate a restart...")
good_db._engine.dispose()  # This closes all pooled connections
print("Attempting to use the connection after invalidation...")
# The pool transparently opens fresh connections here; a dropped server-side
# connection would instead trigger the reconnect path above.
good_db.execute_with_connection_retry(lambda conn: conn.execute(text("INSERT INTO test (id, val) VALUES (1, 'reconnected')")))
print("Operation succeeded after automatic reconnection.")
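The backoff schedule itself (base_delay * 2^attempt, plus a little jitter) is easy to verify in isolation. Here is a minimal, database-free sketch of the same retry loop; the function names and the flaky stand-in operation are illustrative, and the sleep function is injectable so tests don't actually wait.

```python
import random
import time

def retry_with_backoff(operation, max_retries=5, base_delay=1.0, sleep=time.sleep):
    """Call operation(); on failure wait base_delay * 2**attempt (+ jitter) and retry."""
    last_error = None
    for attempt in range(max_retries):
        try:
            return operation()
        except ConnectionError as e:
            last_error = e
            if attempt == max_retries - 1:
                break
            delay = base_delay * (2 ** attempt) + random.random() * 0.5
            sleep(delay)  # injectable for testing; defaults to time.sleep
    raise last_error

# A flaky operation that fails twice, then succeeds
attempts = {"n": 0}
def flaky():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("still down")
    return "connected"

waits = []  # capture the computed delays instead of sleeping
result = retry_with_backoff(flaky, base_delay=1.0, sleep=waits.append)
print(result)      # connected
print(len(waits))  # 2 -- it slept before attempts 2 and 3
```

Capturing the delays shows the doubling directly: the first wait falls in [1.0, 1.5) and the second in [2.0, 2.5).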
The seventh technique is caching query results. Some data doesn't change very often, like a list of product categories or countries. Running the same database query thousands of times for this static data is wasteful. We can store the result in a fast, in-memory cache after the first fetch. The next time we need it, we check the cache first. This dramatically reduces load on the database.
It's important to invalidate or expire the cache when the underlying data does change. This example shows a simple but effective cache class, plus a decorator that gives each query function its own cache store with a time-to-live.
import functools
import time
from typing import Any, Optional, Callable
from datetime import datetime, timedelta

class QueryResultCache:
    """A simple in-memory cache for database query results."""
    def __init__(self, default_ttl_seconds=300):
        self._store = {}
        self.default_ttl = default_ttl_seconds
        print(f"Cache initialized with default TTL of {default_ttl_seconds}s.")

    def get(self, key: str) -> Optional[Any]:
        """Retrieves a value from the cache if it exists and is not expired."""
        if key not in self._store:
            return None
        value, expiry = self._store[key]
        if expiry and datetime.utcnow() > expiry:
            print(f"Cache entry expired for key: {key}")
            del self._store[key]
            return None
        print(f"Cache hit for key: {key}")
        return value

    def set(self, key: str, value: Any, ttl_seconds: Optional[int] = None):
        """Stores a value in the cache with a time-to-live."""
        ttl = ttl_seconds if ttl_seconds is not None else self.default_ttl
        expiry = datetime.utcnow() + timedelta(seconds=ttl) if ttl > 0 else None
        self._store[key] = (value, expiry)
        print(f"Cache set for key: {key} (expires: {expiry})")

    def invalidate(self, key: str):
        """Forcibly removes an item from the cache."""
        if key in self._store:
            del self._store[key]
            print(f"Cache invalidated for key: {key}")

    def clear(self):
        """Clears the entire cache."""
        self._store.clear()
        print("Cache cleared.")

def cache_query(ttl_seconds=60):
    """
    A decorator to cache the results of a function that performs a database query.
    The function's arguments are used to create a unique cache key.
    """
    def decorator(func: Callable):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Create a simple key based on function name and arguments
            key_parts = [func.__name__] + [str(arg) for arg in args] + [f"{k}:{v}" for k, v in sorted(kwargs.items())]
            cache_key = "|".join(key_parts)
            # Try to get from this function's cache store
            cache = wrapper.cache_store
            cached_result = cache.get(cache_key)
            if cached_result is not None:
                return cached_result
            # Not in cache, execute the function
            print(f"Cache miss for '{func.__name__}'. Executing query...")
            result = func(*args, **kwargs)
            # Store the result
            cache.set(cache_key, result, ttl_seconds)
            return result
        # Attach a cache instance to the wrapper function
        wrapper.cache_store = QueryResultCache()
        return wrapper
    return decorator

# --- Example Usage ---
print("Demonstrating query result caching...")

# Simulate a slow database query function
@cache_query(ttl_seconds=10)  # Cache results for 10 seconds
def get_active_users(region: str, minimum_orders: int):
    """Simulates a slow database query fetching active users."""
    time.sleep(1)  # Simulate network/database latency
    print(f"  [DB QUERY EXECUTED] for region='{region}', min_orders={minimum_orders}")
    # Simulate a result
    return [
        {"id": 101, "name": f"User_A_{region}"},
        {"id": 102, "name": f"User_B_{region}"},
    ]

print("\nFirst call (will be slow, hits the DB):")
start = time.time()
result1 = get_active_users("US", 5)
print(f"  Result: {result1}. Took {time.time()-start:.2f}s")

print("\nSecond call with same args (should be instant, from cache):")
start = time.time()
result2 = get_active_users("US", 5)
print(f"  Result: {result2}. Took {time.time()-start:.2f}s")

print("\nCall with different args (will hit DB again):")
start = time.time()
result3 = get_active_users("EU", 3)
print(f"  Result: {result3}. Took {time.time()-start:.2f}s")

print("\nWaiting 11 seconds for cache to expire...")
time.sleep(11)
print("Calling again with original args after expiration:")
start = time.time()
result4 = get_active_users("US", 5)
print(f"  Result: {result4}. Took {time.time()-start:.2f}s")
The eighth and final technique is monitoring connection health. In a long-running application, connections in a pool can go stale—the database server might close them due to inactivity or a timeout. If your application tries to use a dead connection, it will get an error. Health checks periodically verify that connections are still alive before handing them out, or right before they are used. This is often called "pre-ping."
SQLAlchemy has a pool_pre_ping flag that does this automatically. But sometimes you need more control, like logging health stats or taking action if too many connections are failing. The class below takes the same connection pool idea as technique one and adds active health monitoring and reporting.
import threading
import time
from sqlalchemy import create_engine, text, event
from sqlalchemy.pool import QueuePool
from collections import deque
import statistics
class MonitoredConnectionPool:
def __init__(self, db_url, pool_size=5, health_check_interval=30):
self.engine = create_engine(
db_url,
poolclass=QueuePool,
pool_size=pool_size,
max_overflow=10,
pool_timeout=5,
pool_pre_ping=True, # Basic health check on checkout
echo=False
)
self.health_interval = health_check_interval
self.health_history = deque(maxlen=100) # Store last 100 check times
self._monitor_active = False
self._monitor_thread = None
self.latency_samples = deque(maxlen=50)
self._setup_event_listeners()
def _setup_event_listeners(self):
"""Listen to SQLAlchemy events to gather metrics."""
@event.listens_for(self.engine, "before_cursor_execute")
def before_execute(conn, cursor, statement, parameters, context, executemany):
conn.info.setdefault('query_start_time', []).append(time.perf_counter())
@event.listens_for(self.engine, "after_cursor_execute")
def after_execute(conn, cursor, statement, parameters, context, executemany):
start = conn.info['query_start_time'].pop()
total = time.perf_counter() - start
self.latency_samples.append(total * 1000) # Store in ms
    def start_health_monitor(self):
        """Starts a background thread that periodically checks pool health."""
        if self._monitor_active:
            return
        self._monitor_active = True

        def monitor_loop():
            while self._monitor_active:
                time.sleep(self.health_check_interval)
                self._run_health_check()

        self._monitor_thread = threading.Thread(target=monitor_loop, daemon=True)
        self._monitor_thread.start()
        print("Health monitor started.")

    def stop_health_monitor(self):
        """Stops the health monitoring thread."""
        self._monitor_active = False
        if self._monitor_thread:
            self._monitor_thread.join(timeout=2)
        print("Health monitor stopped.")
    def _run_health_check(self):
        """Performs a health check by borrowing and testing a connection."""
        check_time = 0
        try:
            start_time = time.perf_counter()
            with self.engine.connect() as conn:
                # Execute a simple query to test responsiveness
                conn.execute(text("SELECT 1"))
            check_time = (time.perf_counter() - start_time) * 1000  # ms
            self.health_history.append((time.time(), check_time, 'ok'))
            status = 'OK'
        except Exception as e:
            self.health_history.append((time.time(), 0, f'error: {e}'))
            status = f'ERROR: {e}'

        pool = self.engine.pool
        stats = {
            'timestamp': time.strftime('%H:%M:%S'),
            'status': status,
            'check_duration_ms': check_time,
            'connections_in_use': pool.checkedout(),
            'connections_idle': pool.checkedin(),
            'pool_size': pool.size(),
            'recent_avg_latency_ms': statistics.mean(self.latency_samples) if self.latency_samples else 0,
        }
        print(f"[Health Check] {stats}")
    def get_health_report(self):
        """Generates a summary health report."""
        if not self.health_history:
            return {"status": "No checks performed yet."}
        recent_checks = [h for h in self.health_history if time.time() - h[0] < 300]  # Last 5 min
        ok_checks = [h for h in recent_checks if h[2] == 'ok']
        error_checks = [h for h in recent_checks if h[2] != 'ok']
        latency_list = [h[1] for h in ok_checks]
        pool = self.engine.pool
        return {
            "checks_last_5min": len(recent_checks),
            "success_rate": len(ok_checks) / len(recent_checks) if recent_checks else 0,
            "avg_latency_ms": statistics.mean(latency_list) if latency_list else 0,
            "recent_errors": [h[2] for h in error_checks[-3:]],  # Last 3 errors
            "current_pool_utilization": pool.checkedout() / pool.size() if pool.size() > 0 else 0
        }
    def execute_operation(self, query, params=None):
        """Executes a query through the pool; writes are committed automatically."""
        # engine.begin() opens a transaction and commits on success, so
        # INSERTs are not silently rolled back under SQLAlchemy 2.0.
        with self.engine.begin() as conn:
            result = conn.execute(text(query), params or {})
            if result.returns_rows:
                return result.fetchall()
            return result
# Example Usage
print("Starting monitored connection pool example...")
monitored_pool = MonitoredConnectionPool(
    'sqlite:///health_monitor.db', pool_size=3, health_check_interval=10
)

# Create a test table
monitored_pool.execute_operation("""
    CREATE TABLE IF NOT EXISTS system_log (
        id INTEGER PRIMARY KEY,
        message TEXT,
        log_level TEXT,
        recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
""")

print("\nStarting background health monitor...")
monitored_pool.start_health_monitor()

print("\nSimulating some application activity for 35 seconds...")
activity_start = time.time()
while time.time() - activity_start < 35:
    # Simulate occasional inserts and selects
    monitored_pool.execute_operation(
        "INSERT INTO system_log (message, log_level) VALUES (:msg, :lvl)",
        {'msg': f'Test log at {time.time()}', 'lvl': 'INFO'}
    )
    time.sleep(0.5)
    if random.random() > 0.7:  # Occasionally do a select
        results = monitored_pool.execute_operation("SELECT COUNT(*) FROM system_log")
        print(f"  Current log count: {results[0][0]}")
    time.sleep(1)

print("\nGenerating final health report...")
report = monitored_pool.get_health_report()
for key, value in report.items():
    print(f"  {key}: {value}")

print("\nStopping health monitor...")
monitored_pool.stop_health_monitor()
print("Example complete.")
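One related knob worth knowing about: instead of (or in addition to) pinging on checkout, the pool can proactively retire connections past a certain age, so a stale one is never handed out in the first place. Here's a brief sketch; the 1800-second lifetime is an assumed value you'd tune to your server's idle timeout, and the in-memory SQLite URL is just a placeholder:

```python
from sqlalchemy import create_engine, text

# pool_recycle retires any connection older than the given number of
# seconds. Combined with pool_pre_ping, the pool rarely hands out a
# connection the server has already closed.
engine = create_engine(
    'sqlite://',         # placeholder URL for illustration
    pool_recycle=1800,   # assumed 30-minute lifetime; match your server
    pool_pre_ping=True,  # keep the checkout ping as a safety net
)

with engine.connect() as conn:
    value = conn.execute(text("SELECT 1")).scalar()
print(value)  # 1
```

Recycling is cheap insurance for databases like MySQL that close idle connections aggressively, while pre-ping catches anything recycling misses.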
These eight techniques—connection pooling, query building, transaction management, ORM patterns, schema migrations, connection retries, query caching, and health monitoring—form a strong foundation for building Python applications that interact with databases in a way that is efficient, reliable, and maintainable. Each one addresses a specific challenge you'll face as your application grows from a simple script to a service that many people depend on.
I find that starting with a clear mental model, like the coffee shop analogy, helps in understanding why these patterns exist. They're not just academic concepts; they solve real, practical problems of speed, safety, and clarity. By implementing them gradually, you can build systems that are a pleasure to work on and robust enough to handle the unexpected. Remember, the goal isn't to use the most complex tool, but to use the right tool to make your data layer simple and solid.