Redis is more than a cache. It's a Swiss Army knife for distributed systems. Here are 7 patterns that solve real problems.
1. Rate Limiting
import redis
import time
r = redis.Redis()
def is_rate_limited(user_id: str, max_requests: int = 100, window: int = 60) -> bool:
key = f"rate:{user_id}:{int(time.time()) // window}"
current = r.incr(key)
if current == 1:
r.expire(key, window)
return current > max_requests
# Usage
if is_rate_limited("user_123"):
raise HTTPException(status_code=429, detail="Too many requests")
2. Distributed Locking
Prevent race conditions across multiple servers:
import uuid
def acquire_lock(lock_name: str, timeout: int = 10) -> str | None:
lock_id = str(uuid.uuid4())
if r.set(f"lock:{lock_name}", lock_id, nx=True, ex=timeout):
return lock_id
return None
def release_lock(lock_name: str, lock_id: str) -> bool:
pipe = r.pipeline(True)
key = f"lock:{lock_name}"
try:
pipe.watch(key)
if pipe.get(key).decode() == lock_id:
pipe.multi()
pipe.delete(key)
pipe.execute()
return True
except redis.WatchError:
pass
return False
# Usage
lock_id = acquire_lock("process_payment")
if lock_id:
try:
process_payment(order_id)
finally:
release_lock("process_payment", lock_id)
3. Leaderboards with Sorted Sets
# Add scores
r.zadd("leaderboard", {"alice": 2500, "bob": 1800, "charlie": 3200})
# Top 10 players
top_10 = r.zrevrange("leaderboard", 0, 9, withscores=True)
for rank, (player, score) in enumerate(top_10, 1):
print(f"#{rank} {player.decode()}: {int(score)}")
# Player's rank
rank = r.zrevrank("leaderboard", "alice") # 0-indexed
# Increment score
r.zincrby("leaderboard", 100, "alice")
4. Session Storage
import json
def create_session(user_id: str, data: dict, ttl: int = 3600) -> str:
session_id = str(uuid.uuid4())
r.setex(f"session:{session_id}", ttl, json.dumps({"user_id": user_id, **data}))
return session_id
def get_session(session_id: str) -> dict | None:
data = r.get(f"session:{session_id}")
if data:
r.expire(f"session:{session_id}", 3600) # refresh TTL
return json.loads(data)
return None
def destroy_session(session_id: str):
r.delete(f"session:{session_id}")
5. Pub/Sub for Real-Time Events
# Publisher
def publish_event(channel: str, event: dict):
r.publish(channel, json.dumps(event))
publish_event("notifications", {"type": "new_order", "order_id": 123})
# Subscriber
def listen_events(channel: str):
pubsub = r.pubsub()
pubsub.subscribe(channel)
for message in pubsub.listen():
if message["type"] == "message":
event = json.loads(message["data"])
handle_event(event)
6. Queue with Reliability (BRPOPLPUSH)
def enqueue(queue: str, task: dict):
r.lpush(queue, json.dumps(task))
def dequeue_reliable(queue: str, processing_queue: str, timeout: int = 0):
data = r.brpoplpush(queue, processing_queue, timeout=timeout)
if data:
return json.loads(data)
return None
def complete_task(processing_queue: str, task: dict):
r.lrem(processing_queue, 1, json.dumps(task))
# Worker
while True:
task = dequeue_reliable("tasks", "tasks:processing")
if task:
try:
process(task)
complete_task("tasks:processing", task)
except Exception:
# Task stays in processing queue for retry
pass
7. Counting Unique Items with HyperLogLog
# Count unique visitors (uses only 12KB regardless of count!)
r.pfadd("visitors:2024-01-15", "user_1", "user_2", "user_3")
r.pfadd("visitors:2024-01-15", "user_1", "user_4") # user_1 not counted again
unique_count = r.pfcount("visitors:2024-01-15") # ~4
# Merge multiple days
r.pfmerge("visitors:week", "visitors:2024-01-15", "visitors:2024-01-16")
weekly_unique = r.pfcount("visitors:week")
Key Takeaways
- Rate limiting with INCR + EXPIRE is simple and effective
- Distributed locks prevent race conditions across servers
- Sorted sets are perfect for leaderboards and rankings
- Pub/Sub enables real-time event broadcasting
- HyperLogLog counts unique items with minimal memory
6. Redis is a data structure server, not just a key-value store
🚀 Level up your AI workflow! Check out my AI Developer Mega Prompt Pack — 80 battle-tested prompts for developers. $9.99
Top comments (0)