Redis 8.0 Advanced Patterns: Stream, Distributed Locks & Cache Strategies
Redis 8.0 has arrived with significant performance improvements. In production, Redis is far more than a simple key-value cache — it serves as a message queue, distributed coordinator, and real-time data processing engine. This article explores Redis in four core scenarios.
1. Redis Stream: Building Reliable Message Queues
Redis Stream (introduced in 5.0, enhanced in 8.0) provides message persistence, Consumer Groups, and ACK mechanisms — making it ideal for lightweight message queues.
Python Producer/Consumer
import redis
import json
import time
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# Producer
class OrderProducer:
def __init__(self, stream_key='order:stream'):
self.stream_key = stream_key
def send_order(self, order_id: str, user_id: str, amount: float):
message = {
'order_id': order_id,
'user_id': user_id,
'amount': str(amount),
'timestamp': str(int(time.time() * 1000))
}
result = r.xadd(self.stream_key, message, maxlen=10000, approximate=True)
print(f"[Producer] Order {order_id} sent, msg_id={result}")
return result
# Consumer Group
class OrderConsumer:
def __init__(self, stream_key='order:stream', group='order-processors', consumer_name=None):
self.stream_key = stream_key
self.group = group
self.consumer = consumer_name or 'worker-default'
try:
r.xgroup_create(stream_key, group, id='0', mkstream=True)
except redis.ResponseError as e:
if "BUSYGROUP" not in str(e):
raise
def process_orders(self, batch_size=10, block_ms=5000):
while True:
messages = r.xreadgroup(
groupname=self.group,
consumername=self.consumer,
streams={self.stream_key: '>'},
count=batch_size,
block=block_ms
)
if not messages:
continue
for stream, msg_list in messages:
for msg_id, fields in msg_list:
try:
self._handle_message(msg_id, fields)
r.xack(stream, self.group, msg_id)
except Exception as e:
print(f"[Consumer] Failed: {msg_id}: {e}")
def _handle_message(self, msg_id, fields):
order_id = fields['order_id']
amount = float(fields['amount'])
print(f"[Consumer] Processing order: {order_id}, amount: {amount}")
Go Consumer
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/redis/go-redis/v9"
)
type OrderConsumer struct {
client *redis.Client
stream string
group string
consumer string
}
func NewOrderConsumer(addr string) *OrderConsumer {
client := redis.NewClient(&redis.Options{Addr: addr})
return &OrderConsumer{
client: client,
stream: "order:stream",
group: "order-processors",
consumer: fmt.Sprintf("go-worker-%d", time.Now().UnixNano()),
}
}
func (c *OrderConsumer) Start(ctx context.Context) error {
err := c.client.XGroupCreateMkStream(ctx, c.stream, c.group, "0").Err()
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
return err
}
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
streams, err := c.client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: c.group,
Consumer: c.consumer,
Streams: []string{c.stream, ">"},
Count: 10,
Block: 5 * time.Second,
}).Result()
if err != nil {
continue
}
for _, stream := range streams {
for _, msg := range stream.Messages {
orderID := msg.Values["order_id"].(string)
log.Printf("Processing order: %s", orderID)
c.client.XAck(ctx, c.stream, c.group, msg.ID)
}
}
}
}
}
2. Redlock: Distributed Locks in Go
Single-node Redis locks carry single-point-of-failure risk. Redlock acquires locks across multiple independent Redis instances.
package redlock
import (
"context"
"crypto/rand"
"errors"
"fmt"
"math/big"
"time"
"github.com/redis/go-redis/v9"
)
var (
ErrLockNotAcquired = errors.New("lock not acquired")
)
type DistributedLock struct {
clients []*redis.Client
quorum int
key string
value string
expiration time.Duration
}
func NewLock(clients []*redis.Client, key string, ttl time.Duration) *DistributedLock {
quorum := len(clients)/2 + 1
n, _ := rand.Int(rand.Reader, big.NewInt(1<<62))
return &DistributedLock{
clients: clients,
quorum: quorum,
key: key,
value: n.String(),
expiration: ttl,
}
}
func (dl *DistributedLock) Lock(ctx context.Context) error {
acquired := 0
start := time.Now()
for _, client := range dl.clients {
ok, err := client.SetNX(ctx, dl.key, dl.value, dl.expiration).Result()
if err != nil {
continue
}
if ok {
acquired++
}
}
validity := dl.expiration - time.Since(start) - 10*time.Millisecond
if acquired >= dl.quorum && validity > 0 {
return nil
}
dl.Unlock(ctx)
return ErrLockNotAcquired
}
var unlockScript = redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`)
func (dl *DistributedLock) Unlock(ctx context.Context) {
for _, client := range dl.clients {
unlockScript.Run(ctx, client, []string{dl.key}, dl.value)
}
}
3. Cache Penetration, Breakdown & Avalanche
Cache Penetration — Cache null values with Bloom filters
import redis
import hashlib
import json
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
def cache_with_protection(prefix, ttl=3600, null_cache_ttl=300):
def decorator(func):
def wrapper(*args, **kwargs):
key_parts = [prefix] + [str(a) for a in args]
cache_key = "cache:" + hashlib.md5(":".join(key_parts).encode()).hexdigest()
cached = r.get(cache_key)
if cached is not None:
if cached == "__NULL__":
return None
return json.loads(cached)
result = func(*args, **kwargs)
if result is None:
r.setex(cache_key, null_cache_ttl, "__NULL__")
else:
r.setex(cache_key, ttl, json.dumps(result, ensure_ascii=False))
return result
return wrapper
return decorator
@cache_with_protection(prefix="user:info", ttl=1800, null_cache_ttl=120)
def get_user_info(user_id: int):
return {"id": user_id, "name": "Zhang San"}
Cache Breakdown — Mutex lock for hot keys
import time
def get_with_mutex(cache_key: str, db_query_fn, ttl: int = 3600):
data = r.get(cache_key)
if data is not None:
return json.loads(data)
lock_key = f"lock:{cache_key}"
acquired = r.set(lock_key, "1", nx=True, ex=10)
if acquired:
try:
data = r.get(cache_key) # Double-check
if data is not None:
return json.loads(data)
result = db_query_fn()
if result is not None:
r.setex(cache_key, ttl, json.dumps(result, ensure_ascii=False))
return result
finally:
r.delete(lock_key)
else:
time.sleep(0.1)
data = r.get(cache_key)
return json.loads(data) if data else db_query_fn()
Cache Avalanche — TTL jitter + multi-level cache
import random
class AntiAvalancheCache:
def __init__(self):
self.local_cache = {}
def get(self, cache_key: str, db_query_fn, base_ttl: int = 3600):
if cache_key in self.local_cache:
return self.local_cache[cache_key]
data = r.get(cache_key)
if data is not None:
result = json.loads(data)
self.local_cache[cache_key] = result
return result
result = db_query_fn()
if result is not None:
jitter = random.randint(int(base_ttl * 0.8), int(base_ttl * 1.2))
r.setex(cache_key, jitter, json.dumps(result, ensure_ascii=False))
self.local_cache[cache_key] = result
return result
4. Redis Module Ecosystem
Redis modules extend Redis with new data structures and commands:
| Module | Purpose |
|---|---|
| RediSearch | Full-text search with Chinese tokenization |
| RedisJSON | Native JSON processing with JSONPath |
| RedisBloom | Bloom filters, Cuckoo filters |
| RedisTimeSeries | Time-series data with downsampling |
# RedisJSON + RediSearch
import redis.commands.search.field as search_field
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
doc = {
"title": "Redis 8.0 Advanced Patterns",
"author": "WD Tech",
"tags": ["redis", "cache", "distributed"],
"views": 1500
}
r.json().set("article:1001", "$", doc)
r.ft("article_idx").create_index(
fields=[
search_field.TextField("$.title", as_name="title"),
search_field.TagField("$.tags", as_name="tags"),
search_field.NumericField("$.views", as_name="views"),
],
definition=IndexDefinition(prefix=["article:"], index_type=IndexType.JSON)
)
result = r.ft("article_idx").search("Redis cache")
for doc in result.docs:
print(f"Title: {doc.title}, Views: {doc.views}")
Summary
Redis 8.0 is a versatile multi-tool: Stream for lightweight message queues, Redlock for distributed mutual exclusion, layered cache defenses for reliability, and the Module ecosystem for extending beyond key-value storage. Choose the right pattern for your scale — Redis's simplicity and efficiency remain its greatest strengths.
This is a condensed version. The full article includes exclusive tool recommendations and in-depth analysis — visit WD Tech Blog for the complete version!
Follow my blog for the latest tech news, AI tutorials, and productivity tool recommendations!
Top comments (0)