DEV Community

WDSEGA
WDSEGA

Posted on

Redis 8.0 Advanced Patterns: Stream, Distributed Locks & Cache Strategies

Redis 8.0 Advanced Patterns: Stream, Distributed Locks & Cache Strategies

Redis 8.0 has arrived with significant performance improvements. In production, Redis is far more than a simple key-value cache — it serves as a message queue, distributed coordinator, and real-time data processing engine. This article explores Redis in four core scenarios.

1. Redis Stream: Building Reliable Message Queues

Redis Stream (introduced in 5.0, enhanced in 8.0) provides message persistence, Consumer Groups, and ACK mechanisms — making it ideal for lightweight message queues.

Python Producer/Consumer

import redis
import json
import time

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Producer
class OrderProducer:
    """Publishes order events to a Redis Stream through the module-level client ``r``."""

    def __init__(self, stream_key='order:stream'):
        # Key of the stream that every order entry is appended to.
        self.stream_key = stream_key

    def send_order(self, order_id: str, user_id: str, amount: float):
        """Append one order entry to the stream and return the id given by XADD.

        ``amount`` and the millisecond wall-clock timestamp are stringified
        before being sent; ``order_id`` and ``user_id`` are passed through.
        """
        now_ms = int(time.time() * 1000)
        entry = dict(
            order_id=order_id,
            user_id=user_id,
            amount=str(amount),
            timestamp=str(now_ms),
        )
        # Approximate trimming keeps the stream at roughly 10000 entries.
        msg_id = r.xadd(self.stream_key, entry, maxlen=10000, approximate=True)
        print(f"[Producer] Order {order_id} sent, msg_id={msg_id}")
        return msg_id

# Consumer Group
class OrderConsumer:
    """Consumer-group reader for the order stream with at-least-once handling.

    Uses the module-level Redis client ``r``. A message is acknowledged only
    after ``_handle_message`` returns without raising. A message whose handler
    raised stays in this consumer's pending list and is replayed the next time
    ``process_orders`` starts under the same consumer name.
    """

    def __init__(self, stream_key='order:stream', group='order-processors', consumer_name=None):
        """Store the stream/group/consumer names and make sure the group exists.

        Args:
            stream_key: Stream to consume from (created if missing).
            group: Consumer group name.
            consumer_name: Name of this consumer inside the group; falls back
                to ``'worker-default'`` when empty or ``None``.

        Raises:
            redis.ResponseError: If creating the group fails for any reason
                other than the group already existing (BUSYGROUP).
        """
        self.stream_key = stream_key
        self.group = group
        self.consumer = consumer_name or 'worker-default'
        try:
            r.xgroup_create(stream_key, group, id='0', mkstream=True)
        except redis.ResponseError as e:
            # BUSYGROUP only means the group was created earlier; anything
            # else is a real error and must propagate.
            if "BUSYGROUP" not in str(e):
                raise

    def process_orders(self, batch_size=10, block_ms=5000):
        """Consume messages forever, acknowledging each one after success.

        The loop first replays entries that were delivered to this consumer
        earlier but never acknowledged (for example because the handler raised
        or the process crashed), then switches to new messages. Without that
        replay a failed message would never be read again, because the ``'>'``
        id only returns never-delivered entries.

        Args:
            batch_size: Maximum number of messages requested per read.
            block_ms: How long a read for new messages blocks, in milliseconds.
        """
        # '0' asks for this consumer's pending entries from the beginning;
        # '>' asks for messages never delivered to any consumer in the group.
        cursor = '0'
        while True:
            messages = r.xreadgroup(
                groupname=self.group,
                consumername=self.consumer,
                streams={self.stream_key: cursor},
                count=batch_size,
                block=block_ms,
            )
            if not messages:
                # Blocking read timed out (or, defensively, a backlog read
                # returned nothing at all): make sure we are on live reads.
                cursor = '>'
                continue
            for stream, msg_list in messages:
                if cursor != '>':
                    # Backlog phase: step past the entries just returned so a
                    # message that keeps failing cannot trap the loop; an
                    # empty batch means the backlog is drained.
                    cursor = (msg_list[-1][0] if msg_list else None) or '>'
                for msg_id, fields in msg_list:
                    try:
                        self._handle_message(msg_id, fields)
                        r.xack(stream, self.group, msg_id)
                    except Exception as e:
                        # Deliberately not acknowledged: the entry stays
                        # pending so it can be replayed or inspected later.
                        print(f"[Consumer] Failed: {msg_id}: {e}")

    def _handle_message(self, msg_id, fields):
        """Process a single order message.

        Raises:
            KeyError: If ``order_id`` or ``amount`` is missing from ``fields``.
            ValueError: If ``amount`` cannot be parsed as a float.
        """
        order_id = fields['order_id']
        amount = float(fields['amount'])
        print(f"[Consumer] Processing order: {order_id}, amount: {amount}")
Enter fullscreen mode Exit fullscreen mode

Go Consumer

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/redis/go-redis/v9"
)

// OrderConsumer reads order messages from a Redis stream as one named
// consumer inside a consumer group.
type OrderConsumer struct {
    // client is the Redis connection used for every stream command.
    client   *redis.Client
    // stream is the key of the stream being consumed.
    stream   string
    // group is the consumer group this consumer belongs to.
    group    string
    // consumer is this consumer's name within the group.
    consumer string
}

// NewOrderConsumer builds a consumer for the "order:stream" stream in the
// "order-processors" group, connected to the Redis server at addr. The
// consumer name embeds the current Unix time in nanoseconds.
func NewOrderConsumer(addr string) *OrderConsumer {
    opts := &redis.Options{Addr: addr}

    oc := new(OrderConsumer)
    oc.client = redis.NewClient(opts)
    oc.stream = "order:stream"
    oc.group = "order-processors"
    oc.consumer = fmt.Sprintf("go-worker-%d", time.Now().UnixNano())
    return oc
}

// Start makes sure the consumer group exists and then reads and processes
// messages until ctx is cancelled, at which point it returns ctx.Err().
//
// It returns early only if creating the group fails for a reason other than
// the group already existing. Read errors are logged and retried after a
// short pause instead of being silently retried in a tight loop. A message
// without a string "order_id" field is logged and left unacknowledged rather
// than crashing the consumer.
func (c *OrderConsumer) Start(ctx context.Context) error {
    err := c.client.XGroupCreateMkStream(ctx, c.stream, c.group, "0").Err()
    if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
        return err
    }

    for {
        // Stop promptly once the caller cancels.
        if err := ctx.Err(); err != nil {
            return err
        }

        streams, err := c.client.XReadGroup(ctx, &redis.XReadGroupArgs{
            Group:    c.group,
            Consumer: c.consumer,
            Streams:  []string{c.stream, ">"},
            Count:    10,
            Block:    5 * time.Second,
        }).Result()
        if err == redis.Nil {
            // Block timeout with no new messages: just poll again.
            continue
        }
        if err != nil {
            if ctx.Err() != nil {
                return ctx.Err()
            }
            // Real failure (e.g. connection lost): report it and back off so
            // an unavailable server does not turn this loop into a busy spin.
            log.Printf("XReadGroup failed: %v", err)
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-time.After(time.Second):
            }
            continue
        }

        for _, stream := range streams {
            for _, msg := range stream.Messages {
                // Checked assertion: a missing or non-string field must not
                // panic the whole consumer.
                orderID, ok := msg.Values["order_id"].(string)
                if !ok {
                    log.Printf("Skipping message %s: missing or non-string order_id", msg.ID)
                    continue
                }
                log.Printf("Processing order: %s", orderID)
                if err := c.client.XAck(ctx, c.stream, c.group, msg.ID).Err(); err != nil {
                    // The entry stays pending; surface the failure.
                    log.Printf("XAck failed for message %s: %v", msg.ID, err)
                }
            }
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

2. Redlock: Distributed Locks in Go

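A lock held on a single Redis node is a single point of failure. Redlock instead tries to acquire the same lock on several independent Redis instances and treats it as held only when a majority of them grant it before the lock's validity time runs out.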

package redlock

import (
    "context"
    "crypto/rand"
    "errors"
    "fmt"
    "math/big"
    "time"

    "github.com/redis/go-redis/v9"
)

var (
    // ErrLockNotAcquired is returned by Lock when the lock was not obtained
    // on enough instances, or not within its validity time.
    ErrLockNotAcquired = errors.New("lock not acquired")
)

// DistributedLock is a lock on one key that is acquired across several Redis
// clients and counts as held only when at least quorum of them accept it.
type DistributedLock struct {
    // clients are the Redis instances the lock is attempted on.
    clients    []*redis.Client
    // quorum is the minimum number of successful acquisitions required.
    quorum     int
    // key is the Redis key that represents the lock.
    key        string
    // value is the token stored under key; the unlock script deletes the key
    // only when the stored value equals this token.
    value      string
    // expiration is the TTL set on the key when acquiring.
    expiration time.Duration
}

// NewLock prepares (but does not acquire) a lock on key across clients with
// the given ttl. The quorum is a strict majority of clients.
//
// Each lock gets its own token so that only the holder can release it. The
// token normally comes from crypto/rand. If the random source fails, a
// timestamp-based token is used instead of the previous behavior, where the
// ignored error left a nil integer and every lock shared the token "<nil>".
func NewLock(clients []*redis.Client, key string, ttl time.Duration) *DistributedLock {
    var token string
    if n, err := rand.Int(rand.Reader, big.NewInt(1<<62)); err == nil {
        token = n.String()
    } else {
        // Best-effort fallback: weaker than a random token, but still
        // distinct per call, which is what the owner check relies on.
        token = fmt.Sprintf("fallback-%d", time.Now().UnixNano())
    }

    return &DistributedLock{
        clients:    clients,
        quorum:     len(clients)/2 + 1,
        key:        key,
        value:      token,
        expiration: ttl,
    }
}

// Lock tries to set the lock key on every client and succeeds when at least
// quorum of them accepted it and some validity time is still left.
//
// On failure it releases whatever was acquired and returns
// ErrLockNotAcquired. The release runs on a context detached from ctx: if the
// attempt failed because ctx was cancelled or timed out, reusing ctx would
// make every release call fail too and leave the partial locks in place until
// their TTL.
func (dl *DistributedLock) Lock(ctx context.Context) error {
    acquired := 0
    start := time.Now()

    for _, client := range dl.clients {
        // No point contacting further instances once the caller gave up.
        if ctx.Err() != nil {
            break
        }
        ok, err := client.SetNX(ctx, dl.key, dl.value, dl.expiration).Result()
        if err != nil {
            // An unreachable instance simply does not count toward quorum.
            continue
        }
        if ok {
            acquired++
        }
    }

    // Remaining lifetime of the lock: TTL minus the time spent acquiring,
    // minus a fixed 10ms safety margin.
    validity := dl.expiration - time.Since(start) - 10*time.Millisecond
    if acquired >= dl.quorum && validity > 0 {
        return nil
    }

    // Bound the cleanup by the TTL: after that the keys have expired anyway.
    // A non-positive TTL would yield an already-expired context, so use a
    // short fixed budget in that case.
    releaseTimeout := dl.expiration
    if releaseTimeout <= 0 {
        releaseTimeout = time.Second
    }
    releaseCtx, cancel := context.WithTimeout(context.Background(), releaseTimeout)
    defer cancel()
    dl.Unlock(releaseCtx)
    return ErrLockNotAcquired
}

// unlockScript deletes KEYS[1] only when its current value equals ARGV[1],
// and returns 0 otherwise. Doing the comparison and the delete inside one
// script means a caller cannot delete a key whose value is not its own token.
var unlockScript = redis.NewScript(`
    if redis.call("GET", KEYS[1]) == ARGV[1] then
        return redis.call("DEL", KEYS[1])
    else
        return 0
    end
`)

// Unlock runs the compare-and-delete script against every client, passing the
// lock key and this lock's token. Results and errors of the individual calls
// are not inspected.
func (dl *DistributedLock) Unlock(ctx context.Context) {
    keys := []string{dl.key}
    for i := range dl.clients {
        unlockScript.Run(ctx, dl.clients[i], keys, dl.value)
    }
}
Enter fullscreen mode Exit fullscreen mode

3. Cache Penetration, Breakdown & Avalanche

Cache Penetration — Cache null values (a Bloom filter can be added in front for further protection)

import redis
import hashlib
import json

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def cache_with_protection(prefix, ttl=3600, null_cache_ttl=300):
    """Decorator factory that caches a function's JSON-serializable result in Redis.

    A ``None`` result is cached too, as the marker ``"__NULL__"`` with its own
    shorter TTL, so repeated lookups of a missing item do not reach the
    wrapped function every time.

    The cache key is built from ``prefix``, the positional arguments and the
    keyword arguments. Keyword arguments are included (sorted by name) because
    otherwise ``f(user_id=1)`` and ``f(user_id=2)`` would share one cache
    entry. Calls that use only positional arguments produce the same keys as
    before.

    Args:
        prefix: Namespace mixed into every cache key.
        ttl: Lifetime in seconds of a cached non-``None`` result.
        null_cache_ttl: Lifetime in seconds of a cached ``None`` result.
    """
    import functools

    def decorator(func):
        @functools.wraps(func)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            key_parts = [prefix] + [str(a) for a in args]
            key_parts += [f"{name}={kwargs[name]}" for name in sorted(kwargs)]
            cache_key = "cache:" + hashlib.md5(":".join(key_parts).encode()).hexdigest()

            cached = r.get(cache_key)
            if cached is not None:
                if cached == "__NULL__":
                    return None
                return json.loads(cached)

            result = func(*args, **kwargs)
            if result is None:
                r.setex(cache_key, null_cache_ttl, "__NULL__")
            else:
                r.setex(cache_key, ttl, json.dumps(result, ensure_ascii=False))
            return result
        return wrapper
    return decorator

@cache_with_protection(prefix="user:info", ttl=1800, null_cache_ttl=120)
def get_user_info(user_id: int):
    """Demo lookup: return a record with the given id and a fixed name."""
    record = {"id": user_id}
    record["name"] = "Zhang San"
    return record
Enter fullscreen mode Exit fullscreen mode

Cache Breakdown — Mutex lock for hot keys

import time

def get_with_mutex(cache_key: str, db_query_fn, ttl: int = 3600):
    """Return the cached value for ``cache_key``, rebuilding it under a short-lived lock.

    On a cache miss one caller takes the lock ``lock:<cache_key>`` (10 second
    expiry), re-checks the cache, runs ``db_query_fn`` and stores a
    non-``None`` result. Callers that do not get the lock wait 0.1s, re-read
    the cache and fall back to ``db_query_fn`` if it is still empty.

    The lock value is a token unique to this call and is released with a
    compare-and-delete script. With a constant value and a plain DELETE, a
    holder whose query outlived the 10 second expiry would delete a lock that
    by then belongs to another caller.

    Args:
        cache_key: Redis key of the cached JSON value.
        db_query_fn: Zero-argument callable that loads the value from the source of truth.
        ttl: Lifetime in seconds of the cached value.
    """
    import uuid

    # Delete the lock only if it still carries our token.
    release_script = (
        'if redis.call("GET", KEYS[1]) == ARGV[1] then '
        'return redis.call("DEL", KEYS[1]) '
        'else return 0 end'
    )

    data = r.get(cache_key)
    if data is not None:
        return json.loads(data)

    lock_key = f"lock:{cache_key}"
    token = uuid.uuid4().hex
    acquired = r.set(lock_key, token, nx=True, ex=10)
    if acquired:
        try:
            # Another caller may have filled the cache between our first read
            # and taking the lock.
            data = r.get(cache_key)
            if data is not None:
                return json.loads(data)
            result = db_query_fn()
            if result is not None:
                r.setex(cache_key, ttl, json.dumps(result, ensure_ascii=False))
            return result
        finally:
            r.eval(release_script, 1, lock_key, token)
    else:
        time.sleep(0.1)
        data = r.get(cache_key)
        return json.loads(data) if data else db_query_fn()
Enter fullscreen mode Exit fullscreen mode

Cache Avalanche — TTL jitter + multi-level cache

import random

class AntiAvalancheCache:
    """Two-level cache: an in-process dict in front of Redis, with jittered Redis TTLs.

    Entries written to the local level by this class expire after
    ``local_ttl`` seconds. Previously they never expired, so a process kept
    serving its first value forever, even after the Redis entry had expired or
    changed. Expired local entries are evicted when they are next looked up.
    Entries placed into ``local_cache`` directly, without an expiry record,
    are treated as non-expiring, as before.
    """

    def __init__(self, local_ttl: float = 60.0):
        """Create an empty cache.

        Args:
            local_ttl: Lifetime in seconds of an entry in the in-process level.
        """
        self.local_cache = {}
        self.local_ttl = local_ttl
        # Monotonic deadline per key for entries stored by _local_put.
        self._local_expiry = {}

    def _local_fresh(self, cache_key: str) -> bool:
        """Return True if ``cache_key`` has a usable local entry; evict it if expired."""
        import time

        if cache_key not in self.local_cache:
            return False
        expires_at = self._local_expiry.get(cache_key)
        if expires_at is None or time.monotonic() < expires_at:
            return True
        del self.local_cache[cache_key]
        del self._local_expiry[cache_key]
        return False

    def _local_put(self, cache_key: str, value) -> None:
        """Store ``value`` in the local level with a fresh expiry deadline."""
        import time

        self.local_cache[cache_key] = value
        self._local_expiry[cache_key] = time.monotonic() + self.local_ttl

    def get(self, cache_key: str, db_query_fn, base_ttl: int = 3600):
        """Return the value for ``cache_key`` from local cache, Redis, or ``db_query_fn``.

        A non-``None`` value loaded through ``db_query_fn`` is written to Redis
        with a TTL drawn uniformly from 80%-120% of ``base_ttl`` (at least one
        second), so that keys cached together do not all expire together.

        Args:
            cache_key: Key used in both cache levels.
            db_query_fn: Zero-argument callable that loads the value on a full miss.
            base_ttl: Nominal Redis TTL in seconds before jitter.
        """
        if self._local_fresh(cache_key):
            return self.local_cache[cache_key]

        data = r.get(cache_key)
        if data is not None:
            result = json.loads(data)
            self._local_put(cache_key, result)
            return result

        result = db_query_fn()
        if result is not None:
            # Clamp so a tiny base_ttl cannot produce a zero TTL, which SETEX rejects.
            low = max(1, int(base_ttl * 0.8))
            high = max(low, int(base_ttl * 1.2))
            jitter = random.randint(low, high)
            r.setex(cache_key, jitter, json.dumps(result, ensure_ascii=False))
            self._local_put(cache_key, result)
        return result
Enter fullscreen mode Exit fullscreen mode

4. Redis Module Ecosystem

Redis modules extend Redis with new data structures and commands:

| Module | Purpose |
| --- | --- |
| RediSearch | Full-text search with Chinese tokenization |
| RedisJSON | Native JSON processing with JSONPath |
| RedisBloom | Bloom filters, Cuckoo filters |
| RedisTimeSeries | Time-series data with downsampling |
# RedisJSON + RediSearch
import redis.commands.search.field as search_field
from redis.commands.search.indexDefinition import IndexDefinition, IndexType

# Sample article to store as a JSON document.
doc = {
    "title": "Redis 8.0 Advanced Patterns",
    "author": "WD Tech",
    "tags": ["redis", "cache", "distributed"],
    "views": 1500
}
# Write the whole document at the JSON root ("$") of key "article:1001".
r.json().set("article:1001", "$", doc)

# Define the index "article_idx" over JSON keys starting with "article:".
# Each field is addressed by JSONPath and exposed under a short alias.
# NOTE(review): presumably this raises if the index already exists, so the
# snippet may not be re-runnable as-is; confirm.
r.ft("article_idx").create_index(
    fields=[
        search_field.TextField("$.title", as_name="title"),
        search_field.TagField("$.tags", as_name="tags"),
        search_field.NumericField("$.views", as_name="views"),
    ],
    definition=IndexDefinition(prefix=["article:"], index_type=IndexType.JSON)
)

# Run a query against the index and print the aliased fields of each hit.
# NOTE: the loop variable reuses the name `doc`, so after the loop `doc` no
# longer refers to the dictionary defined above.
result = r.ft("article_idx").search("Redis cache")
for doc in result.docs:
    print(f"Title: {doc.title}, Views: {doc.views}")
Enter fullscreen mode Exit fullscreen mode

Summary

Redis 8.0 is a versatile multi-tool: Stream for lightweight message queues, Redlock for distributed mutual exclusion, layered cache defenses for reliability, and the Module ecosystem for extending beyond key-value storage. Choose the right pattern for your scale — Redis's simplicity and efficiency remain its greatest strengths.


This is a condensed version. The full article includes exclusive tool recommendations and in-depth analysis — visit WD Tech Blog for the complete version!

Follow my blog for the latest tech news, AI tutorials, and productivity tool recommendations!

Top comments (0)