Caching is one of the highest-impact optimizations available to most applications. Here's a practical guide to caching strategies, patterns, and pitfalls.
Why Cache?
A database query takes 5-50ms. A cache hit takes 0.1-1ms. For read-heavy workloads, caching can reduce latency by 10-100x and cut database load dramatically.
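Those ranges translate into end-to-end latency like this; a quick back-of-the-envelope sketch (the 10 ms query, 0.5 ms cache hit, and 95% hit rate are illustrative assumptions, not measurements):

```python
db_ms = 10.0      # assumed database query latency
cache_ms = 0.5    # assumed cache hit latency
hit_rate = 0.95   # assumed fraction of reads served from cache

# Expected latency = hits served by cache + misses that pay both costs
expected_ms = hit_rate * cache_ms + (1 - hit_rate) * (cache_ms + db_ms)
print(f"{expected_ms:.2f} ms average")  # prints "1.00 ms average", a 10x win
```

Note the asymmetry: even a modest hit rate shifts most requests onto the fast path, which is why read-heavy workloads benefit so dramatically.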
Cache-Aside (Lazy Loading)
The most common pattern. Application checks cache first, falls back to database:
```python
import redis
import json

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def get_user(user_id: int) -> dict:
    cache_key = f"user:{user_id}"
    # Check cache first
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)
    # Cache miss: fetch from DB
    user = db.query("SELECT * FROM users WHERE id = %s", user_id)
    # Store in cache with TTL
    r.setex(cache_key, 3600, json.dumps(user))  # 1 hour TTL
    return user
```
Write-Through Cache
Write to cache and database simultaneously:
```python
def update_user(user_id: int, data: dict):
    # Update database first; if this fails, the cache is never touched
    db.execute("UPDATE users SET name=%s WHERE id=%s", data["name"], user_id)
    # Update cache immediately (note: this assumes `data` holds the full
    # user record, not just the changed fields)
    cache_key = f"user:{user_id}"
    r.setex(cache_key, 3600, json.dumps(data))
```
Write-Behind (Write-Back)
Write to cache immediately, sync to database asynchronously:
```python
import json
import queue
import threading

# A thread-safe queue avoids races between writers and the sync worker
write_queue: queue.Queue = queue.Queue()

def update_user_fast(user_id: int, data: dict):
    cache_key = f"user:{user_id}"
    r.setex(cache_key, 3600, json.dumps(data))
    write_queue.put((user_id, data))  # DB write happens later

def db_sync_worker():
    while True:
        user_id, data = write_queue.get()  # blocks until work arrives
        db.execute("UPDATE users SET data=%s WHERE id=%s",
                   json.dumps(data), user_id)
        write_queue.task_done()

threading.Thread(target=db_sync_worker, daemon=True).start()
```

The trade-off: writes are fast, but an unsynced update is lost if the process dies before the worker drains the queue.
Cache Invalidation Patterns
```python
# Pattern 1: TTL-based (simplest)
r.setex("key", 300, "value")  # expires in 5 minutes

# Pattern 2: Event-based invalidation
def on_user_updated(user_id):
    r.delete(f"user:{user_id}")
    r.delete(f"user_profile:{user_id}")
    r.delete("user_list")  # invalidate list cache too

# Pattern 3: Versioned keys
def get_config(version: int):
    return r.get(f"config:v{version}")
```
Multi-Level Caching
```python
from functools import lru_cache

# L1: in-process memory (fastest, per-instance; note lru_cache has no TTL,
# so entries live until evicted or the process restarts)
@lru_cache(maxsize=1000)
def get_setting(key: str) -> str:
    # L2: Redis (shared across instances)
    cached = r.get(f"setting:{key}")
    if cached:
        return cached
    # L3: database (source of truth)
    value = db.query("SELECT value FROM settings WHERE key=%s", key)
    r.setex(f"setting:{key}", 600, value)
    return value
```
Cache Stampede Prevention
When a popular cache key expires, hundreds of requests hit the database simultaneously:
```python
import json
import time

def get_with_lock(key: str, fetch_fn, ttl=3600, retries=50):
    for _ in range(retries):
        cached = r.get(key)
        if cached:
            return json.loads(cached)
        lock_key = f"lock:{key}"
        if r.set(lock_key, "1", nx=True, ex=10):  # acquire rebuild lock
            try:
                data = fetch_fn()
                r.setex(key, ttl, json.dumps(data))
                return data
            finally:
                r.delete(lock_key)
        # Another process is rebuilding; wait briefly and re-check
        time.sleep(0.1)
    # Retries exhausted; fall back to fetching directly
    return fetch_fn()
```

Only one process rebuilds the value; the rest wait for the cache to repopulate. The retry loop is bounded so a stuck lock holder can't stall readers forever.
Key Takeaways
- Cache-aside is the safest default pattern
- Always set TTLs: unbounded staleness is worse than slow data
- Use multi-level caching for hot data
- Prevent cache stampedes with locking
- Monitor hit rates; as a rough rule of thumb, below ~80% your strategy may need rework
- Cache invalidation is hard: prefer TTL-based expiry when possible
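The hit-rate point is directly measurable: Redis tracks cumulative `keyspace_hits` and `keyspace_misses` counters, which a small helper can turn into a ratio (a sketch; these counters are server-wide, not per-key):

```python
def cache_hit_rate(client) -> float:
    # The "stats" section of INFO exposes cumulative hit/miss counters
    stats = client.info("stats")
    hits = stats["keyspace_hits"]
    misses = stats["keyspace_misses"]
    total = hits + misses
    return hits / total if total else 0.0

# Usage: cache_hit_rate(r) with the Redis client from earlier
```

Because the counters are cumulative since server start, sample them periodically and compute the rate over the delta if you want a recent-window view.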