DEV Community

Rueian

Using Redis 7's New Sharded Pub/Sub in Go

Redis 7.0 RC1 was released on 2022/1/31, and it comes with a new Sharded Pub/Sub feature:

Sharded pubsub helps to scale the usage of pubsub in cluster mode. It restricts the propagation of message to be within the shard of a cluster. Hence, the amount of data passing through the cluster bus is limited in comparison to global pubsub where each message propagates to each node in the cluster. This allows users to horizontally scale the pubsub usage by adding more shards.
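
To make "within the shard" more concrete, here is a minimal sketch of how a channel name can be mapped to a hash slot, and therefore to the shard that owns it. It assumes sharded channels are assigned to slots with the same algorithm Redis Cluster uses for keys (CRC16 of the name modulo 16384, with hash tag support). The helpers crc16 and slotFor are names made up for this illustration and are not part of any client library:

```go
package main

import (
	"fmt"
	"strings"
)

// crc16 computes the CRC16-CCITT (XMODEM) checksum that Redis Cluster
// uses for key hashing: polynomial 0x1021, initial value 0.
func crc16(data []byte) uint16 {
	var crc uint16
	for _, b := range data {
		crc ^= uint16(b) << 8
		for i := 0; i < 8; i++ {
			if crc&0x8000 != 0 {
				crc = crc<<1 ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// slotFor is a hypothetical helper that returns the hash slot (0..16383)
// for a channel name. If the name contains a non-empty {hash tag}, only
// the tag is hashed, as with regular cluster keys.
func slotFor(channel string) uint16 {
	if open := strings.IndexByte(channel, '{'); open >= 0 {
		if n := strings.IndexByte(channel[open+1:], '}'); n > 0 {
			channel = channel[open+1 : open+1+n]
		}
	}
	return crc16([]byte(channel)) % 16384
}

func main() {
	// Channels that land in slots owned by the same shard are served by
	// the same set of nodes; others are handled by different shards.
	for _, ch := range []string{"ch", "foo", "{foo}.bar"} {
		fmt.Printf("%-10s -> slot %d\n", ch, slotFor(ch))
	}
}
```

Under this assumption, a message published to a channel only needs to reach the nodes that serve that channel's slot, which is why adding shards spreads the pub/sub load.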

Users can now publish messages with the new SPUBLISH command without worrying about overwhelming the whole Redis cluster, and receive those messages with the SSUBSCRIBE command.

Here is an example showing how to use this new feature with rueidis, a high-performance RESP3 Go client library:

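package main

import (
    "context"
    "fmt"
    "time"

    "github.com/rueian/rueidis"
)

func main() {
    // Connect using the addresses of a few Redis cluster nodes.
    client, err := rueidis.NewClient(rueidis.ClientOption{
        InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"},
    })
    if err != nil {
        panic(err)
    }
    defer client.Close()

    ctx := context.Background()

    // Publish "hi" to the sharded channel "ch" once per second.
    go func() {
        for {
            if err := client.Do(ctx, client.B().Spublish().Channel("ch").Message("hi").Build()).Error(); err != nil {
                fmt.Println("spublish failed:", err)
            }
            time.Sleep(time.Second)
        }
    }()

    // Receive blocks and invokes the callback for every message on "ch".
    err = client.Receive(ctx, client.B().Ssubscribe().Channel("ch").Build(), func(msg rueidis.PubSubMessage) {
        fmt.Println(msg.Channel, msg.Message)
    })
    if err != nil {
        fmt.Println("receive stopped:", err)
    }
}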

The above snippet does the following:

  1. Initialize the client with Redis cluster addresses.
  2. Start a goroutine to do SPUBLISH every second.
  3. Use client.Receive with an SSUBSCRIBE command and a callback function to keep processing received messages.

client.Receive blocks until the ctx is done or the client is unsubscribed from the channel.

That's it! If you have any questions or issues with rueidis, please contact me on GitHub.
