DEV Community

Cover image for Creating a RabbitMQ PubSub system with Dapr
Sergio Méndez
Sergio Méndez

Posted on

Creating a RabbitMQ PubSub system with Dapr

Hi, readers community, to continue with this series of blog posts, we are going to learn about how to use Dapr. Darp gives you a set of tools to write a event-driven runtime focusing on building resilient, stateless, and stateful applications that run on the cloud and edge. Also supports a diversity of languages and developer frameworks. Thats more or less what Darp is based in the official documentation. So we can see it as a set of building blocks to build your event-driven system. The idea in this blog post, is to give a quick tutorial about, how you can use Dapr to write a Publisher Subscriber system really fast and simple, without writing a lot of code. So Dapr, can speed your workday. This blog post will focus on:

  • Create a PubSub System using Dapr SDK and RabbitMQ

What you will learn

In this blog post you will learn how to:

  • Install Dapr in your cluster
  • Install RabbitMQ to manage messages with Dapr
  • Create a Publisher using the Golang Dapr SDK
  • Create a Subscriber using the Golang Dapr SDK

These are the topics that you will learn in this blog post.

Requirements

  • A Kubernetes cluster
  • Access to the cluster using kubectl
  • Internet Access

So let's get started with this tutorial. :)

Install Dapr in your cluster:

1. Install the DAPR CLI with the following command:

wget -q https://raw.githubusercontent.com/dapr/cli/master/install/install.sh -O - | /bin/bash
Enter fullscreen mode Exit fullscreen mode

2. Verify that the CLI was installed:

dapr -h
Enter fullscreen mode Exit fullscreen mode

3. Install Dapr in Kubernetes

dapr init --kubernetes
dapr init --slim # to install without default configurations
Enter fullscreen mode Exit fullscreen mode

4. Verify the installation:

dapr status -k
Enter fullscreen mode Exit fullscreen mode

5. Access the Dashboard with the following command

kubectl port-forward svc/dapr-dashboard -n dapr-system 8080:8080 --address 0.0.0.0
Enter fullscreen mode Exit fullscreen mode

6. Access the Dashboard in http://localhost:8080

Install RabbitMQ to manage messages with Dapr

1. Install the RabbitMQ Operator:

kubectl apply -f "https://github.com/rabbitmq/cluster-operator/releases/latest/download/cluster-operator.yml"
Enter fullscreen mode Exit fullscreen mode

2. Create a cluster with the following YAML:

echo "apiVersion: rabbitmq.com/v1beta1
kind: RabbitmqCluster
metadata:
  name: mycluster" | kubectl apply -f -
Enter fullscreen mode Exit fullscreen mode

Note: If you want labels you can use something like this:

echo "apiVersion: rabbitmq.com/v1beta1
kind: RabbitmqCluster
metadata:
  labels:
    app: rabbitmq
  annotations:
    some: annotation
  name: mycluster" | kubectl apply -f -
Enter fullscreen mode Exit fullscreen mode

3. Get the user password for this RabbitMQ instance:

#To get the user
export USERNAME=$(kubectl -n default get secret mycluster-default-user -o jsonpath="{.data.username}" | base64 --decode)

#To get the password
export PASSWORD=$(kubectl -n default get secret mycluster-default-user -o jsonpath="{.data.password}" | base64 --decode)
Enter fullscreen mode Exit fullscreen mode

4. Now create the component to connect RabbitMQ to the Dapr installation using the previos USERNAME and PASSWORD:

echo 'apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: message-pub-sub
spec:
  type: pubsub.rabbitmq
  version: v1
  metadata:
  - name: connectionString
    value: "amqp://'$USERNAME:$PASSWORD@'mycluster:5672"
  - name: protocol
    value: amqp  
  - name: username
    value: '$USERNAME'
  - name: password
    value: '$PASSWORD'
  - name: hostname
    value: mycluster
  - name: durable
    value: "true"
  - name: maxLen
    value: 20
  - name: deletedWhenUnused
    value: "false"
  - name: autoAck
    value: "false"
  - name: reconnectWait
    value: "0"
  - name: concurrency
    value: parallel
scopes:
  - publisher
  - subscriber' | kubectl apply -f -
Enter fullscreen mode Exit fullscreen mode

Create a Publisher using the Golang Dapr SDK

Now its time to create the publisher code and create a deployment with this code.
1. Create a container with the following code:

package main

import (
    "net/http"   
    "context"
    "os"
    "log"
    "github.com/gin-gonic/gin"
    dapr "github.com/dapr/go-sdk/client"
)

type Message struct {
  Data     string    `json:"data"`
}

var (
    AppPort = getEnv("APP_PORT", "3500")
    PubsubName   = getEnv("PUBSUB_NAME", "mypubsub")
    TopicName    = getEnv("TOPIC_NAME", "mytopic")
)

func getEnv(key, fallback string) string {
    if value, ok := os.LookupEnv(key); ok {
        return value
    }
    return fallback
}

func publishHandler(c *gin.Context) {
    var message Message

    if err := c.ShouldBind(&message); err != nil {
        c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
        return
    }

    log.Println("Published message with data: " + message.Data)
    PublishEvent(message.Data)

    c.JSON(http.StatusOK, gin.H{
        "message_data":     message.Data,
    })
}

func PublishEvent(data string) error {
    client, err := dapr.NewClient()
    if err != nil {
        panic(err)
    }
    defer client.Close()
    ctx := context.Background()
    //Using Dapr SDK to publish a topic
    if err := client.PublishEvent(ctx, PubsubName, TopicName, []byte(data)); err != nil {
        panic(err)
    }

    log.Println("Published %s to topic %s\n", data, TopicName)
    return nil
}

func main() {
    router := gin.Default()
    gin.SetMode(gin.DebugMode)
    router.Use(gin.Logger())
    router.POST("/publish", publishHandler)
    // Listen and serve on 0.0.0.0:<AppPort>
    router.Run(":"+AppPort)
    log.Println("Running Publisher Service")
}
Enter fullscreen mode Exit fullscreen mode

2. Now create the deployment using the container that contains the previous code and applying the following YAML file:

apiVersion: apps/v1
kind: Deployment
metadata:
  creationTimestamp: null
  labels:
    app: publisher
  name: publisher
spec:
  replicas: 1
  selector:
    matchLabels:
      app: publisher
  strategy: {}
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: publisher
      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "publisher"
        dapr.io/app-port: "4500"
        dapr.io/enable-api-logging: "true"
    spec:
      containers:
      - image: <REGISTRY_USER>/<IMAGE>:TAG
        name: dapr-go-publisher
        ports:
        - containerPort: 4500
        env:
        - name: APP_PORT
          value: "4500"
        - name: PUBSUB_NAME
          value: "message-pub-sub"
        - name: TOPIC_NAME
          value: "messages"
        resources: {}
status: {}
---
kind: Service
apiVersion: v1
metadata:
  name: publisher
  labels:
    app: publisher
spec:
  selector:
    app: publisher
  ports:
  - protocol: TCP
    port: 4500
    targetPort: 4500
  type: ClusterIP
Enter fullscreen mode Exit fullscreen mode

Note: This code creates the deployment and the service need to run the publisher. Take a look that we use the following annotations:

      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "publisher"
        dapr.io/app-port: "4500"
        dapr.io/enable-api-logging: "true"
Enter fullscreen mode Exit fullscreen mode

These annotations enable the integration of Dapr in your deployment without coding anything. Now your Publisher is ready and running. Let's continue with the Subscriber deployment.

Create a Subscriber using the Golang Dapr SDK

In this part you are going to run the subscriber. We also use the Dapr SDK to create a subscription using code and the SDK and not a declarative YAML file.
1. Create the subscriber container using the following code:

package main

import (
    "log"
    "net/http"
    "context"
    "os"
    "github.com/dapr/go-sdk/service/common"
    daprd "github.com/dapr/go-sdk/service/http"
)


var sub = &common.Subscription{
    PubsubName: getEnv("PUBSUB_NAME", "mypubsub"),
    Topic:      getEnv("TOPIC_NAME", "mytopic"),
    Route:      "/subscriber",
}

func getEnv(key, fallback string) string {
    if value, ok := os.LookupEnv(key); ok {
        return value
    }
    return fallback
}

func main() {
    s := daprd.NewService(":6002")
   //Subscribe to a topic
    if err := s.AddTopicEventHandler(sub, eventHandler); err != nil {
        log.Fatalf("error adding topic subscription: %v", err)
    }
    log.Printf("Subscriber listening")
    if err := s.Start(); err != nil && err != http.ErrServerClosed {
        log.Fatalf("error listenning: %v", err)
    }
}

func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
    log.Printf("Subscriber message received: %s", e.Data)
    return false, nil
}
Enter fullscreen mode Exit fullscreen mode

2. Now create a deployment using the following YAML file:

apiVersion: apps/v1
kind: Deployment
metadata:
  creationTimestamp: null
  labels:
    app: subscriber
  name: subscriber
spec:
  replicas: 1
  selector:
    matchLabels:
      app: subscriber
  strategy: {}
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: subscriber
      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "subscriber"
        dapr.io/app-port: "6002"
        dapr.io/enable-api-logging: "true"
    spec:
      containers:
      - image: <REGISTRY_USER>/<IMAGE>:TAG
        name: dapr-go-subscriber
        ports:
        - containerPort: 6002
        env:
        - name: PUBSUB_NAME
          value: "message-pub-sub"
        - name: TOPIC_NAME
          value: "messages"
        resources: {}
status: {}
---
kind: Service
apiVersion: v1
metadata:
  name: subscriber
  labels:
    app: subscriber
spec:
  selector:
    app: subscriber
  ports:
  - protocol: TCP
    port: 80
    targetPort: 6002
  type: ClusterIP
Enter fullscreen mode Exit fullscreen mode

Note: This code create the subscriber you can modify the code to change the topic and the pubsub name.

Code Repository

If you can check the containers code, you can check the repo in the following link:

Conclusion about Dapr

Dapr is an interesting project that can speed your work when you are designing and implementing an event-driven system. Also support different languages that developer can use to implement this kind of systems. Dapr can simplify the adoption of different technologies and solve architecture design and implementation to implement a robust system for your needs. Also I think that there is some lack of documentation for Kubernetes, but if you can take time to play with it you can achieve robust and nice implementation. At the end of this post there are some links including ones in the official documentation that I used to create this blog post. I want to mention that my students in USAC University in Guatemala in the operating systems courses are going to use this tutorial, that's so cool. Maybe my next blog post will be related to microcks to continue with the journey of cloud native stuff.

See you in my next blog post.

Follow me

These are my social networks:

https://www.linkedin.com/in/sergioarmgpl
https://sergiops.xyz
https://x.com/sergioarmgpl
https://www.instagram.com/sergioarmgpl/

Please contribute to this awesome project:

Useful links:

Top comments (0)