Pub/Sub (publish/subscribe) is a messaging pattern in which the publisher does not send a message (payload) to a specific receiver. Instead, publishers send messages to named channels, and receivers subscribe to one or more channels to consume them.
Imagine you have a monolithic backend and want to add a new feature to it, such as sending emails. Instead of making this backend responsible for sending the emails, you can make it a publisher that sends the email data to a channel, where it is consumed by another backend (the receiver) that actually sends the emails (newsletters, for example).
The implementation is quite simple. In today's example I'll create a small API that receives the body of a request and publishes it to a specific channel; a receiver then consumes it and prints it to the console.
The framework I decided to use today is fiber, whose API is similar to Express. I have no specific reason for choosing it, and the example code is easily adapted to other frameworks.
The Redis client I'm going to use today is go-redis, because its API is intuitive and minimalist and it performs well.
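To make the decoupling concrete before bringing Redis in, here is a minimal in-memory sketch of the pattern using only the standard library. The Broker type and its methods are hypothetical stand-ins written for this illustration; they are not part of Redis or go-redis.

```go
package main

import (
	"fmt"
	"sync"
)

// Broker is a hypothetical in-memory stand-in for Redis, used only to
// illustrate the pattern.
type Broker struct {
	mu   sync.Mutex
	subs map[string][]chan string
}

func NewBroker() *Broker {
	return &Broker{subs: make(map[string][]chan string)}
}

// Subscribe registers a new receiver on a channel name and returns the
// Go channel on which its messages will arrive.
func (b *Broker) Subscribe(name string) <-chan string {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan string, 8) // small buffer so Publish does not block in this demo
	b.subs[name] = append(b.subs[name], ch)
	return ch
}

// Publish delivers msg to every current subscriber of the channel and
// reports how many receivers there were. The publisher never learns who
// they are.
func (b *Broker) Publish(name, msg string) int {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.subs[name] {
		ch <- msg
	}
	return len(b.subs[name])
}

func main() {
	b := NewBroker()
	newsletter := b.Subscribe("emails")
	audit := b.Subscribe("emails")

	fmt.Println("receivers:", b.Publish("emails", "welcome@example.com"))
	fmt.Println("newsletter got:", <-newsletter)
	fmt.Println("audit got:", <-audit)

	// Nobody is subscribed to "sms", so this message is simply dropped.
	fmt.Println("receivers on sms:", b.Publish("sms", "hi"))
}
```

The last call hints at a property Redis Pub/Sub shares: a message published while no one is subscribed is not stored for later, and the Redis PUBLISH command likewise replies with the number of clients that received the message.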
Let's code
As you may have already understood, we are going to have two backends. One we will call pub, which will be our API. The other will be sub, which will be our receiver.
First and foremost, let's install our dependencies:
go get github.com/gofiber/fiber/v2
go get github.com/go-redis/redis/v8
Now let's create a simple API:
// @/pub/main.go
package main

import "github.com/gofiber/fiber/v2"

func main() {
	app := fiber.New()

	app.Get("/", func(c *fiber.Ctx) error {
		return c.SendString("Hello there 👋")
	})

	app.Listen(":3000")
}
Now we can import go-redis into our project and create our client.
// @/pub/main.go
package main

import (
	"github.com/go-redis/redis/v8"
	"github.com/gofiber/fiber/v2"
)

var redisClient = redis.NewClient(&redis.Options{
	Addr: "localhost:6379",
})

func main() {
	app := fiber.New()

	app.Get("/", func(c *fiber.Ctx) error {
		return c.SendString("Hello there 👋")
	})

	app.Listen(":3000")
}
Now we have to create a struct, which I'll name User, with the following fields:
// @/pub/main.go
// ...
type User struct {
	Name  string `json:"name"`
	Email string `json:"email"`
}
// ...
Now we can start working on our endpoint. First, let's change the HTTP verb from Get to Post, because we're going to receive data in the request body.
Next, we will read that data from the body using the c.BodyParser() function and then serialize it to JSON using the json.Marshal() function, which returns the encoded bytes.
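// @/pub/main.go
app.Post("/", func(c *fiber.Ctx) error {
	user := new(User)
	// Reject bodies that cannot be parsed instead of crashing the server.
	if err := c.BodyParser(user); err != nil {
		return fiber.NewError(fiber.StatusBadRequest, err.Error())
	}
	// Serialize the user so it can be published as a message.
	payload, err := json.Marshal(user)
	if err != nil {
		return err
	}
	// ...
})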
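To see what the payload will look like once it is serialized, here is a small standalone sketch using only the standard library. The encodeUser helper is a hypothetical name introduced for this illustration; the struct is the same User as above, and its json tags are what produce the lowercase keys.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type User struct {
	Name  string `json:"name"`
	Email string `json:"email"`
}

// encodeUser is a hypothetical helper: it returns the JSON text that
// would be published for the given user.
func encodeUser(u User) (string, error) {
	b, err := json.Marshal(u) // json.Marshal returns a []byte
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	s, err := encodeUser(User{Name: "Ana", Email: "ana@example.com"})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(s) // {"name":"Ana","email":"ana@example.com"}
}
```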
Now, in our endpoint, we will publish the message using the redisClient.Publish() function. This function takes three arguments: the first is the context, the second is the name of the channel to which we want to send the message, and the third is the message itself.
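// @/pub/main.go
var ctx = context.Background()
// ...
app.Post("/", func(c *fiber.Ctx) error {
	user := new(User)
	// Reject bodies that cannot be parsed instead of crashing the server.
	if err := c.BodyParser(user); err != nil {
		return fiber.NewError(fiber.StatusBadRequest, err.Error())
	}
	payload, err := json.Marshal(user)
	if err != nil {
		return err
	}
	// Publish the serialized user to the "send-user-data" channel.
	if err := redisClient.Publish(ctx, "send-user-data", payload).Err(); err != nil {
		return err
	}
	return c.SendStatus(200)
})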
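Since whatever reaches the channel will be consumed by another backend, it can be worth checking the body before publishing it. The following is only a sketch of one way to do that with the standard library; validateUser is a hypothetical helper, not something fiber or go-redis provides, and the rules it enforces are assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
	"net/mail"
	"strings"
)

type User struct {
	Name  string `json:"name"`
	Email string `json:"email"`
}

// validateUser is a hypothetical helper that rejects users that the
// receiver could not do anything useful with.
func validateUser(u User) error {
	if strings.TrimSpace(u.Name) == "" {
		return errors.New("name is required")
	}
	// mail.ParseAddress is lenient: it also accepts forms such as
	// "Ana <ana@example.com>", so this is a sanity check, not strict validation.
	if _, err := mail.ParseAddress(u.Email); err != nil {
		return errors.New("email is not a valid address")
	}
	return nil
}

func main() {
	users := []User{
		{Name: "Ana", Email: "ana@example.com"},
		{Name: "", Email: "ana@example.com"},
		{Name: "Ana", Email: "not-an-email"},
	}
	for _, u := range users {
		if err := validateUser(u); err != nil {
			fmt.Printf("%+v rejected: %v\n", u, err)
			continue
		}
		fmt.Printf("%+v accepted\n", u)
	}
}
```

In the handler, a check like this would sit between the c.BodyParser() call and the json.Marshal() call, returning a 400 response when it fails.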
Your API code should look similar to this:
// @/pub/main.go
package main

import (
	"context"
	"encoding/json"

	"github.com/go-redis/redis/v8"
	"github.com/gofiber/fiber/v2"
)

type User struct {
	Name  string `json:"name"`
	Email string `json:"email"`
}

var ctx = context.Background()

var redisClient = redis.NewClient(&redis.Options{
	Addr: "localhost:6379",
})

func main() {
	app := fiber.New()

	app.Post("/", func(c *fiber.Ctx) error {
		user := new(User)
		// Reject bodies that cannot be parsed instead of crashing the server.
		if err := c.BodyParser(user); err != nil {
			return fiber.NewError(fiber.StatusBadRequest, err.Error())
		}
		payload, err := json.Marshal(user)
		if err != nil {
			return err
		}
		// Publish the serialized user to the "send-user-data" channel.
		if err := redisClient.Publish(ctx, "send-user-data", payload).Err(); err != nil {
			return err
		}
		return c.SendStatus(200)
	})

	app.Listen(":3000")
}
With this, our pub is finished and we can start working on our sub.
Let's import go-redis into our project and create our client.
// @/sub/main.go
package main

import "github.com/go-redis/redis/v8"

var redisClient = redis.NewClient(&redis.Options{
	Addr: "localhost:6379",
})
// ...
Now let's create the same User struct again:
// @/sub/main.go
package main

import "github.com/go-redis/redis/v8"

type User struct {
	Name  string `json:"name"`
	Email string `json:"email"`
}

var redisClient = redis.NewClient(&redis.Options{
	Addr: "localhost:6379",
})
// ...
Since we are now working on our sub, we are working on our receiver. So we have to create a subscriber using the redisClient.Subscribe() function. In this example we will pass only two arguments: the first is the context and the second is the channel we want to subscribe to.
// @/sub/main.go
package main

import (
	"context"

	"github.com/go-redis/redis/v8"
)

type User struct {
	Name  string `json:"name"`
	Email string `json:"email"`
}

var ctx = context.Background()

var redisClient = redis.NewClient(&redis.Options{
	Addr: "localhost:6379",
})

func main() {
	subscriber := redisClient.Subscribe(ctx, "send-user-data")
	// ...
}
Then let's create a for loop that prints each of the messages we receive via our subscriber. To receive each message we will use the subscriber.ReceiveMessage() function, which takes the context as its only argument.
// @/sub/main.go
package main

import (
	"context"

	"github.com/go-redis/redis/v8"
)

type User struct {
	Name  string `json:"name"`
	Email string `json:"email"`
}

var ctx = context.Background()

var redisClient = redis.NewClient(&redis.Options{
	Addr: "localhost:6379",
})

func main() {
	subscriber := redisClient.Subscribe(ctx, "send-user-data")
	user := User{}

	for {
		msg, err := subscriber.ReceiveMessage(ctx)
		if err != nil {
			panic(err)
		}
		// ...
	}
}
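The loop above runs forever, which is fine for this example. If you ever need the receiver to stop cleanly, one common shape is to select between a context and a channel of messages. The sketch below shows that shape with a plain Go channel standing in for the subscription, so it runs without Redis; consume, runDemo and runCancelled are hypothetical names. go-redis also exposes a subscription as a Go channel through subscriber.Channel(), which is where such a channel could come from.

```go
package main

import (
	"context"
	"fmt"
)

// consume handles messages until the context is cancelled or the
// channel is closed, and returns how many messages it handled.
func consume(ctx context.Context, msgs <-chan string, handle func(string)) int {
	n := 0
	for {
		select {
		case <-ctx.Done():
			return n
		case m, ok := <-msgs:
			if !ok {
				return n
			}
			handle(m)
			n++
		}
	}
}

// runDemo feeds three messages through consume and then closes the channel.
func runDemo() int {
	msgs := make(chan string, 3)
	msgs <- "first"
	msgs <- "second"
	msgs <- "third"
	close(msgs)
	return consume(context.Background(), msgs, func(m string) {
		fmt.Println("received:", m)
	})
}

// runCancelled shows that an already cancelled context stops the loop
// even though the channel never delivers anything.
func runCancelled() int {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return consume(ctx, make(chan string), func(string) {})
}

func main() {
	fmt.Println("handled:", runDemo())
	fmt.Println("handled after cancel:", runCancelled())
}
```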
Since each message payload is a JSON string, we have to decode it back into our struct. For this we will use the json.Unmarshal() function, which takes two arguments: the first is a byte slice (in this example I create one from the message payload) and the second is a pointer to the value to fill in (in our case, the user).
// @/sub/main.go
package main

import (
	"context"
	"encoding/json"

	"github.com/go-redis/redis/v8"
)

type User struct {
	Name  string `json:"name"`
	Email string `json:"email"`
}

var ctx = context.Background()

var redisClient = redis.NewClient(&redis.Options{
	Addr: "localhost:6379",
})

func main() {
	subscriber := redisClient.Subscribe(ctx, "send-user-data")
	user := User{}

	for {
		msg, err := subscriber.ReceiveMessage(ctx)
		if err != nil {
			panic(err)
		}
		if err := json.Unmarshal([]byte(msg.Payload), &user); err != nil {
			panic(err)
		}
		// ...
	}
}
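One detail worth knowing about the loop above: because user is declared once outside the loop, json.Unmarshal() only overwrites the fields present in each message, so a payload without an email would keep the email of the previous message. The sketch below, which uses only the standard library, decodes into a fresh value each time and reports malformed payloads instead of panicking. decodeUser is a hypothetical helper introduced for this illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type User struct {
	Name  string `json:"name"`
	Email string `json:"email"`
}

// decodeUser is a hypothetical helper: it decodes a message payload
// into a fresh User so that no fields leak over from earlier messages.
func decodeUser(payload string) (User, error) {
	var u User
	if err := json.Unmarshal([]byte(payload), &u); err != nil {
		return User{}, err
	}
	return u, nil
}

func main() {
	payloads := []string{
		`{"name":"Ana","email":"ana@example.com"}`,
		`{"name":"Bob"}`, // no email: the field stays empty
		`not json`,       // malformed: skipped instead of crashing
	}
	for _, p := range payloads {
		u, err := decodeUser(p)
		if err != nil {
			fmt.Println("skipping malformed message:", err)
			continue
		}
		fmt.Printf("%+v\n", u)
	}
}
```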
Finally, just print the channel each message comes from, as well as the user's data (which is the decoded message).
// @/sub/main.go
package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/go-redis/redis/v8"
)

type User struct {
	Name  string `json:"name"`
	Email string `json:"email"`
}

var ctx = context.Background()

var redisClient = redis.NewClient(&redis.Options{
	Addr: "localhost:6379",
})

func main() {
	subscriber := redisClient.Subscribe(ctx, "send-user-data")
	user := User{}

	for {
		msg, err := subscriber.ReceiveMessage(ctx)
		if err != nil {
			panic(err)
		}
		if err := json.Unmarshal([]byte(msg.Payload), &user); err != nil {
			panic(err)
		}
		fmt.Println("Received message from " + msg.Channel + " channel.")
		fmt.Printf("%+v\n", user)
	}
}
Now, when testing our API with a tool such as Postman, you can send a JSON object in the request body with the same properties that are defined in our User struct.
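If you would rather script the request than click through a GUI, a few lines of Go with the standard library can do the same job. This is only a sketch: newUserRequest is a hypothetical helper, and the URL assumes the pub is running locally on port 3000 as configured above. The Content-Type header matters because c.BodyParser() uses it to decide how to decode the body.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

type User struct {
	Name  string `json:"name"`
	Email string `json:"email"`
}

// newUserRequest is a hypothetical helper that builds the POST request
// carrying the user as a JSON body.
func newUserRequest(url string, u User) (*http.Request, error) {
	body, err := json.Marshal(u)
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

func main() {
	// Assumes the pub from this article is listening on localhost:3000.
	req, err := newUserRequest("http://localhost:3000/", User{Name: "Ana", Email: "ana@example.com"})
	if err != nil {
		fmt.Println("could not build request:", err)
		return
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		fmt.Println("request failed (is the pub running?):", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}
```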
The terminal running the sub should then print the name of the channel, followed by the decoded user.
Conclusion
As always, I hope you found it interesting. If you noticed any errors in this article, please mention them in the comments. 🧑🏻‍💻
Hope you have a great day! 🪗 ☺️