PostgreSQL Performance Tuning for Python/FastAPI APIs: Indexes, Query Optimization & Connection Pooling
I spent three months debugging slow endpoints on CitizenApp before realizing the bottleneck wasn't my FastAPI code—it was my database. A single N+1 query was making 12,000 queries per request. I wasn't monitoring slow queries. I wasn't using connection pooling properly. I wasn't indexing strategically.
This post is what I wish I'd read back then.
Why SQLAlchemy Makes You Lazy (And That's Dangerous)
SQLAlchemy's ORM is fantastic for rapid development. But it abstracts away the SQL your code actually generates, and that abstraction hides performance disasters.
# This looks clean. It's also a ticking time bomb.
@router.get("/users/{user_id}/posts")
async def get_user_posts(user_id: int, db: AsyncSession = Depends(get_db)):
user = await db.execute(
select(User).where(User.id == user_id)
)
user = user.scalars().first()
# Each post query triggers a lazy load of comments
posts = user.posts # N+1 query incoming
return {
"user": user,
"posts": [
{
"id": p.id,
"title": p.title,
"comments": [c.text for c in p.comments] # Query per post
}
for p in posts
]
}
This endpoint makes 1 + N + N*M queries. With 50 posts and 10 comments each, that's 551 database hits.
Here's the fix—use selectinload and explicit eager loading:
from sqlalchemy.orm import selectinload
@router.get("/users/{user_id}/posts")
async def get_user_posts(user_id: int, db: AsyncSession = Depends(get_db)):
user = await db.execute(
select(User)
.where(User.id == user_id)
.options(
selectinload(User.posts).selectinload(Post.comments)
)
)
user = user.scalars().first()
return {
"user": user,
"posts": [
{
"id": p.id,
"title": p.title,
"comments": [c.text for c in p.comments]
}
for p in user.posts
]
}
Now it's 2 queries total. Load time dropped from 3.2 seconds to 45ms on production data.
Why I prefer eager loading over lazy loading: The ORM can't know which relationships you'll access later. Lazy loading feels convenient until it destroys your database at 2 AM.
Strategic Indexing—Know What Actually Gets Queried
Most developers index primary keys and call it a day. That's not enough.
Profile your slow queries first. In PostgreSQL, enable log_min_duration_statement:
-- In postgresql.conf or via ALTER SYSTEM
ALTER SYSTEM SET log_min_duration_statement = 500; -- Log queries > 500ms
SELECT pg_reload_conf();
-- Then check logs
tail -f /var/log/postgresql/postgresql.log | grep "duration:"
Once you identify slow queries, index the right columns. For CitizenApp, this query was killing us:
-- Before: 2.8 seconds
SELECT * FROM posts
WHERE user_id = $1 AND status = 'published'
ORDER BY created_at DESC
LIMIT 10;
-- Add a composite index
CREATE INDEX idx_posts_user_status_created
ON posts(user_id, status, created_at DESC);
-- After: 3ms
Why composite indexes matter: PostgreSQL can use a single index for filtering AND sorting, avoiding table scans and sort operations.
But I made a mistake early on—I over-indexed. I had 47 indexes on a 15-column table. Insert performance tanked. Index only what gets queried frequently. Use pg_stat_user_indexes to find unused indexes:
SELECT
schemaname,
tablename,
indexname,
idx_scan,
idx_tup_read,
idx_tup_fetch
FROM pg_stat_user_indexes
WHERE idx_scan = 0
ORDER BY pg_relation_size(indexrelid) DESC;
Drop indexes with zero scans. On CitizenApp, that freed up 850MB of disk space and improved write performance by 22%.
Connection Pooling: The Invisible Killer
FastAPI doesn't magically manage database connections efficiently. Without proper pooling, each request creates a new connection. That's expensive—TCP handshake, authentication, initialization.
I prefer asyncpg with sqlalchemy.ext.asyncio:
from sqlalchemy.ext.asyncio import (
create_async_engine,
AsyncSession,
async_sessionmaker
)
from sqlalchemy.pool import NullPool, QueuePool
# Development: NullPool (no pooling, debug-friendly)
# Production: QueuePool with size + overflow settings
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/db"
engine = create_async_engine(
DATABASE_URL,
echo=False,
poolclass=QueuePool,
pool_size=20, # Idle connections to keep
max_overflow=10, # Extra connections when pool exhausted
pool_recycle=3600, # Recycle connections after 1 hour
pool_pre_ping=True, # Check connection health before use
)
async_session = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
async def get_db():
async with async_session() as session:
yield session
Pool size tuning:
- pool_size = number of concurrent users you expect
- max_overflow = buffer for traffic spikes
- pool_recycle = prevent "server closed connection" errors (AWS RDS closes idle connections after 15 minutes)
For CitizenApp running on Render with ~500 concurrent users, I use pool_size=25, max_overflow=15. This prevents "QueuePool limit exceeded" errors while keeping connections efficient.
Monitor pool health with:
# Add to your monitoring/health endpoint
@router.get("/health/db")
async def db_health(db: AsyncSession = Depends(get_db)):
pool = db.sync_session_factory.kw['bind'].pool
return {
"checkedout": pool.checkedout(),
"size": pool.size(),
"overflow": pool.overflow(),
}
Query Profiling in Production
You can't optimize what you don't measure. I use django-silk equivalent: SQLAlchemy's event system + structured logging.
import logging
from sqlalchemy import event
from sqlalchemy.engine import Engine
logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
conn.info.setdefault('query_start_time', []).append(time.time())
logging.info(f"Query: {statement}")
@event.listens_for(Engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
total_time = time.time() - conn.info['query_start_time'].pop(-1)
logging.info(f"Execution time: {total_time:.4f}s")
But honestly? For production, use Sentry with SQLAlchemy integration. It captures slow queries automatically and integrates with your error tracking.
Gotcha: VACUUM and ANALYZE
PostgreSQL doesn't automatically clean up dead rows. Without VACUUM, your table bloats and queries slow down. Worse, the query planner uses stale statistics, choosing bad execution plans.
Set up autovacuum properly:
ALTER TABLE posts SET (autovacuum_vacuum_scale_factor = 0.05);
ALTER TABLE posts SET (autovacuum_analyze_scale_factor = 0.02);
The defaults are 10% and 5%, which is fine for small tables but terrible for high-volume ones like events or logs.
Run ANALYZE after bulk inserts:
# After bulk insert/migration
await db.execute(text("ANALYZE posts"))
await db.commit()
I missed this on CitizenApp for months. A table grew to 80GB bloated. Queries on an otherwise well-indexed table took 8+ seconds. One VACUUM FULL later: 4GB, queries in milliseconds.
The Real Win
Optimization isn't sexy code—it's boring SQL profiles and composite indexes. But when your API goes from p95 latency of 3.2 seconds to 120ms, your users notice. Your database costs drop 40%. Your on-call doesn't get paged at 3 AM.
Invest in monitoring from day one. Profile before optimizing. Use eager loading. Index thoughtfully. Pool connections properly. Your future self will thank you.
Top comments (0)