DEV Community

郑沛沛
郑沛沛

Posted on

Redis Beyond Caching: 7 Powerful Patterns You Should Know

Redis is more than a cache. It's a Swiss Army knife for distributed systems. Here are 7 patterns that solve real problems.

1. Rate Limiting

import redis
import time

# Shared Redis client reused by every snippet below.
# NOTE(review): connects with library defaults — confirm host/port/db in real code.
r = redis.Redis()

def is_rate_limited(user_id: str, max_requests: int = 100, window: int = 60) -> bool:
    """Fixed-window rate limiter backed by a Redis counter.

    Args:
        user_id: Identifier whose requests are being counted.
        max_requests: Allowed requests per window.
        window: Window length in seconds.

    Returns:
        True if this request exceeds the limit and should be rejected.
    """
    # Bucket the key by window number so each window starts a fresh counter.
    key = f"rate:{user_id}:{int(time.time()) // window}"
    # INCR and EXPIRE together in one pipeline round trip.  The original set
    # the TTL only when the counter was created; a crash between INCR and
    # EXPIRE left a key with no expiry, leaking memory forever.  Setting the
    # TTL on every request closes that gap — the key is window-scoped, so
    # refreshing its TTL does not lengthen the rate-limit window itself.
    pipe = r.pipeline()
    pipe.incr(key)
    pipe.expire(key, window)
    current, _ = pipe.execute()
    return current > max_requests

# Usage
# NOTE(review): HTTPException is not imported in this snippet — it comes from a
# web framework (e.g. FastAPI); confirm the import exists in real code.
if is_rate_limited("user_123"):
    raise HTTPException(status_code=429, detail="Too many requests")
Enter fullscreen mode Exit fullscreen mode

2. Distributed Locking

Prevent race conditions across multiple servers:

import uuid

def acquire_lock(lock_name: str, timeout: int = 10) -> str | None:
    """Try to take a distributed lock.

    SET NX EX is atomic: the key is created only if absent, with an expiry
    so a crashed holder cannot wedge the lock forever.

    Returns:
        A unique token on success (required later to release the lock
        safely), or None if another client currently holds it.
    """
    token = str(uuid.uuid4())
    acquired = r.set(f"lock:{lock_name}", token, nx=True, ex=timeout)
    return token if acquired else None

def release_lock(lock_name: str, lock_id: str) -> bool:
    """Release a lock, but only if we still own it.

    WATCH/MULTI makes the check-and-delete atomic: if another client touches
    the key between the ownership check and the DELETE, the transaction
    aborts instead of deleting a lock someone else has since acquired.

    Args:
        lock_name: Name passed to acquire_lock.
        lock_id: Token returned by acquire_lock.

    Returns:
        True if this call released the lock, False otherwise.
    """
    key = f"lock:{lock_name}"
    pipe = r.pipeline(True)
    try:
        pipe.watch(key)
        current = pipe.get(key)
        # The original called .decode() unconditionally and crashed with
        # AttributeError when the lock had already expired (GET -> None);
        # treat a missing key as "nothing to release".
        if current is not None and current.decode() == lock_id:
            pipe.multi()
            pipe.delete(key)
            pipe.execute()
            return True
    except redis.WatchError:
        # Key changed between WATCH and EXEC — someone else owns it now.
        pass
    finally:
        # Always clear WATCH state and return the connection to the pool;
        # the original leaked the watched connection on the failure paths.
        pipe.reset()
    return False

# Usage
# NOTE(review): process_payment and order_id are not defined in this snippet;
# they stand in for application code.  If the lock is not acquired, the work
# is silently skipped — real code should retry or report the contention.
lock_id = acquire_lock("process_payment")
if lock_id:
    try:
        process_payment(order_id)
    finally:
        release_lock("process_payment", lock_id)
Enter fullscreen mode Exit fullscreen mode

3. Leaderboards with Sorted Sets

# Add scores (ZADD takes a {member: score} mapping; the set is created lazily)
r.zadd("leaderboard", {"alice": 2500, "bob": 1800, "charlie": 3200})

# Top 10 players, highest score first
top_10 = r.zrevrange("leaderboard", 0, 9, withscores=True)
for rank, (player, score) in enumerate(top_10, 1):
    # redis-py returns members as bytes and scores as floats
    print(f"#{rank} {player.decode()}: {int(score)}")

# Player's rank in descending score order
rank = r.zrevrank("leaderboard", "alice")  # 0-indexed

# Atomically add 100 to alice's score
r.zincrby("leaderboard", 100, "alice")
Enter fullscreen mode Exit fullscreen mode

4. Session Storage

import json

def create_session(user_id: str, data: dict, ttl: int = 3600) -> str:
    """Store a new session under a random id and return that id.

    The session payload is the caller's data merged with the user id,
    serialized to JSON, and written with an expiry of `ttl` seconds.
    """
    session_id = str(uuid.uuid4())
    payload = json.dumps({"user_id": user_id, **data})
    r.setex(f"session:{session_id}", ttl, payload)
    return session_id

def get_session(session_id: str, ttl: int = 3600) -> dict | None:
    """Fetch a session and refresh its TTL (sliding expiration).

    Args:
        session_id: Id returned by create_session.
        ttl: Seconds to extend the session by on each access.  The original
            hard-coded 3600 here, silently overriding any custom TTL chosen
            at create_session time; it is now a parameter with the same
            default, so existing callers are unaffected.

    Returns:
        The stored session dict, or None if the session is missing/expired.
    """
    key = f"session:{session_id}"
    data = r.get(key)
    if data is None:
        return None
    r.expire(key, ttl)  # activity keeps the session alive
    return json.loads(data)

def destroy_session(session_id: str):
    """Delete a session immediately (e.g. on logout); a no-op if absent."""
    key = f"session:{session_id}"
    r.delete(key)
Enter fullscreen mode Exit fullscreen mode

5. Pub/Sub for Real-Time Events

# Publisher
def publish_event(channel: str, event: dict):
    """Broadcast `event` as JSON to every current subscriber of `channel`."""
    payload = json.dumps(event)
    r.publish(channel, payload)

publish_event("notifications", {"type": "new_order", "order_id": 123})

# Subscriber
def listen_events(channel: str):
    """Block forever, dispatching each JSON message on `channel` to handle_event.

    NOTE(review): handle_event is not defined in this snippet — it stands in
    for application code.
    """
    pubsub = r.pubsub()
    pubsub.subscribe(channel)
    try:
        for message in pubsub.listen():
            # listen() also yields subscribe/unsubscribe confirmations;
            # only real payloads carry type == "message".
            if message["type"] == "message":
                event = json.loads(message["data"])
                handle_event(event)
    finally:
        # The original leaked the pubsub connection when the loop was broken
        # by an exception; always close it on the way out.
        pubsub.close()
Enter fullscreen mode Exit fullscreen mode

6. Queue with Reliability (BRPOPLPUSH)

def enqueue(queue: str, task: dict):
    """Push a task onto the left end of `queue` as a JSON string."""
    payload = json.dumps(task)
    r.lpush(queue, payload)

def dequeue_reliable(queue: str, processing_queue: str, timeout: int = 0):
    """Atomically move one task from `queue` to `processing_queue` and return it.

    Blocks for up to `timeout` seconds (0 = wait forever).  The task sits in
    the processing queue until complete_task removes it, so a crashed worker
    does not lose it.

    Returns:
        The decoded task dict, or None if the wait timed out.
    """
    raw = r.brpoplpush(queue, processing_queue, timeout=timeout)
    return json.loads(raw) if raw else None

def complete_task(processing_queue: str, task: dict):
    """Acknowledge a finished task by removing it from the processing queue.

    NOTE(review): LREM matches by value, so this relies on json.dumps(task)
    reproducing the exact bytes that were enqueued — do not mutate the task
    dict between dequeue and completion.
    """
    serialized = json.dumps(task)
    r.lrem(processing_queue, 1, serialized)

# Worker
# NOTE(review): process() is not defined in this snippet; it stands in for
# application code.
while True:
    task = dequeue_reliable("tasks", "tasks:processing")
    if task:
        try:
            process(task)
            complete_task("tasks:processing", task)
        except Exception:
            # Task stays in processing queue for retry
            # NOTE(review): nothing shown here ever moves failed tasks back to
            # "tasks" — a separate reaper/requeue process is required, and the
            # swallowed exception should at least be logged.
            pass
Enter fullscreen mode Exit fullscreen mode

7. Counting Unique Items with HyperLogLog

# Count unique visitors — a HyperLogLog uses ~12KB per key regardless of
# cardinality, trading exactness for a small standard error (~0.81% per the
# Redis docs).
r.pfadd("visitors:2024-01-15", "user_1", "user_2", "user_3")
r.pfadd("visitors:2024-01-15", "user_1", "user_4")  # user_1 not counted again

unique_count = r.pfcount("visitors:2024-01-15")  # ~4 (approximate by design)

# Merge multiple days into a weekly estimate (union of the underlying sets)
r.pfmerge("visitors:week", "visitors:2024-01-15", "visitors:2024-01-16")
weekly_unique = r.pfcount("visitors:week")
Enter fullscreen mode Exit fullscreen mode

Key Takeaways

  1. Rate limiting with INCR + EXPIRE is simple and effective
  2. Distributed locks prevent race conditions across servers
  3. Sorted sets are perfect for leaderboards and rankings
  4. Pub/Sub enables real-time event broadcasting
  5. HyperLogLog counts unique items with minimal memory
  6. Redis is a data structure server, not just a key-value store

🚀 Level up your AI workflow! Check out my AI Developer Mega Prompt Pack — 80 battle-tested prompts for developers. $9.99

Top comments (0)