DEV Community

Lane Wagner

Originally published at qvault.io

Connecting To RabbitMQ In Golang

RabbitMQ is a message broker that’s great for pub-sub systems in a microservices architecture. At my current day job, we use RabbitMQ in our data ingest pipeline. All the services are written in Go, and they all push data through hundreds of RabbitMQ queues. Let’s take a look at how to efficiently publish and subscribe to Rabbit while Go-ing as fast as possible.

Brief Overview On Rabbit

The two main entities to be aware of with Rabbit are routing keys and queues. A service publishes a message (JSON in our case) to an exchange with a routing key. RabbitMQ then copies that message into each queue that is bound to that routing key.

The subscribing service (the consumer) can pull messages off of a queue one at a time. It is worthwhile to note that a queue can also receive messages from multiple routing keys, but we won’t be diving into that here.
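To make the fan-out behavior concrete, here is a toy in-memory model of it. This is only a sketch: the exchange type and its bind and publish methods are made up for illustration and are not part of any RabbitMQ client library. It shows one message being copied into every queue bound to its routing key, and one queue receiving messages from two routing keys.

```go
package main

import "fmt"

// exchange is a toy, in-memory stand-in for a RabbitMQ exchange.
// It is NOT the amqp API; it only illustrates how bindings fan messages out.
type exchange struct {
	bindings map[string][]string // routing key -> names of bound queues
	queues   map[string][]string // queue name -> messages waiting in the queue
}

func newExchange() *exchange {
	return &exchange{
		bindings: map[string][]string{},
		queues:   map[string][]string{},
	}
}

// bind subscribes a queue to a routing key.
func (e *exchange) bind(queue, routingKey string) {
	e.bindings[routingKey] = append(e.bindings[routingKey], queue)
}

// publish copies the message into every queue bound to the routing key.
// A message whose routing key has no binding is simply dropped.
func (e *exchange) publish(routingKey, msg string) {
	for _, q := range e.bindings[routingKey] {
		e.queues[q] = append(e.queues[q], msg)
	}
}

func main() {
	ex := newExchange()
	ex.bind("billing", "user.created")
	ex.bind("email", "user.created")
	ex.bind("email", "user.deleted") // one queue, two routing keys

	ex.publish("user.created", `{"id":1}`)
	ex.publish("user.deleted", `{"id":1}`)

	// billing saw one message, email saw both
	fmt.Println(len(ex.queues["billing"]), len(ex.queues["email"])) // prints "1 2"
}
```

The queue and routing key names here are invented for the example.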

Connecting With Go

First things first, there is no reason to reinvent the wheel. We will use the amqp package provided by streadway (github.com/streadway/amqp) to handle the nitty-gritty of the connection details.

In most of our projects we build a small rabbit package in the internal folder of the project. It exposes only the rabbit functionality that our project cares about.

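package rabbit

import "github.com/streadway/amqp"

// Conn wraps the AMQP channel used for publishing and consuming.
type Conn struct {
    Channel *amqp.Channel
}

// GetConn dials RabbitMQ at rabbitURL and opens a channel on the connection.
func GetConn(rabbitURL string) (Conn, error) {
    conn, err := amqp.Dial(rabbitURL)
    if err != nil {
        return Conn{}, err
    }

    ch, err := conn.Channel()
    if err != nil {
        // don't leak the connection if the channel can't be opened
        conn.Close()
        return Conn{}, err
    }
    return Conn{Channel: ch}, nil
}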

The “Conn” struct simply holds a channel on a connection to Rabbit. We also expose a function to get a new connection using just a connection URI, for example amqp://username:password@localhost
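One practical wrinkle with connection URIs is that credentials containing characters such as "@" or "/" need to be percent-encoded. The sketch below builds the URI with the standard library instead of string concatenation. buildAMQPURL is a hypothetical helper, not something the amqp package provides, and it assumes the client decodes percent-encoded credentials when it parses the URI.

```go
package main

import (
	"fmt"
	"net/url"
)

// buildAMQPURL is a hypothetical helper (not part of the amqp package).
// It assembles a connection URI and percent-encodes the credentials.
func buildAMQPURL(user, password, host string) string {
	u := url.URL{
		Scheme: "amqp",
		User:   url.UserPassword(user, password),
		Host:   host,
	}
	return u.String()
}

func main() {
	uri := buildAMQPURL("guest", "p@ss/word", "localhost:5672")
	fmt.Println(uri)

	// Round-trip through url.Parse to confirm the password survives encoding.
	parsed, err := url.Parse(uri)
	if err != nil {
		panic(err)
	}
	pw, _ := parsed.User.Password()
	fmt.Println(pw == "p@ss/word")
}
```

The resulting string can then be handed to GetConn in place of a hand-written URI.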

Publishing

Publishing is quite easy, and is thread-safe. We expose a method that publishes using the connection; the application just provides the routing key and a payload:

// Publish sends data to the "events" exchange using the given routing key.
func (conn Conn) Publish(routingKey string, data []byte) error {
    return conn.Channel.Publish(
        // exchange - yours may be different
        "events",
        routingKey,
        // mandatory - we don't care if there is no queue
        false,
        // immediate - we don't care if there is no consumer on the queue
        false,
        amqp.Publishing{
            ContentType:  "application/json",
            Body:         data,
            DeliveryMode: amqp.Persistent,
        })
}

One thing to note here.

One purpose of this package we are building for our app is to set the defaults for the more powerful AMQP package and control which functionality is exposed to our app. For example, we know that our app will always use the “events” exchange, and we know that we don’t want the mandatory or immediate flags set for our use case.
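Since our payloads are JSON, the same "narrow the API" idea can be taken one step further. The sketch below is an assumption on my part, not code from our package: publisher is a one-method interface that the Conn type above would satisfy, publishJSON is a hypothetical helper that marshals a value before publishing, and fakePublisher exists only so the example runs without a broker.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// publisher is the single method the helper needs.
// The Conn type from this post has a matching Publish method.
type publisher interface {
	Publish(routingKey string, data []byte) error
}

// publishJSON is a hypothetical helper: it marshals v and publishes the bytes.
func publishJSON(p publisher, routingKey string, v interface{}) error {
	data, err := json.Marshal(v)
	if err != nil {
		return fmt.Errorf("marshal payload for %q: %w", routingKey, err)
	}
	return p.Publish(routingKey, data)
}

// fakePublisher records what was published so the helper can run without a broker.
type fakePublisher struct {
	keys   []string
	bodies []string
}

func (f *fakePublisher) Publish(routingKey string, data []byte) error {
	f.keys = append(f.keys, routingKey)
	f.bodies = append(f.bodies, string(data))
	return nil
}

// event is an illustrative payload type.
type event struct {
	Message string `json:"message"`
}

func main() {
	fake := &fakePublisher{}
	if err := publishJSON(fake, "test-key", event{Message: "test"}); err != nil {
		panic(err)
	}
	fmt.Println(fake.keys[0], fake.bodies[0]) // prints: test-key {"message":"test"}
}
```

Accepting an interface rather than Conn directly also makes application code easy to unit test, since a fake like the one above can stand in for Rabbit.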

Consuming

Consuming is a bit trickier than publishing. We use a simple pattern here where the app supplies a handler function, a queue name, the routing key that the queue binds to, and the number of goroutines that should run the handler concurrently.

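// StartConsumer declares the queue, binds it to routingKey, and runs
// handler in `concurrency` goroutines. It needs the fmt and os imports.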
func (conn Conn) StartConsumer(
    queueName,
    routingKey string,
    handler func(d amqp.Delivery) bool,
    concurrency int) error {

    // create the queue if it doesn't already exist
    _, err := conn.Channel.QueueDeclare(queueName, true, false, false, false, nil)
    if err != nil {
        return err
    }

    // bind the queue to the routing key
    err = conn.Channel.QueueBind(queueName, routingKey, "events", false, nil)
    if err != nil {
        return err
    }

    // prefetch 4x as many messages as we can handle at once
    prefetchCount := concurrency * 4
    err = conn.Channel.Qos(prefetchCount, 0, false)
    if err != nil {
        return err
    }

    msgs, err := conn.Channel.Consume(
        queueName, // queue
        "", // consumer
        false, // auto-ack
        false, // exclusive
        false, // no-local
        false, // no-wait
        nil, // args
    )
    if err != nil {
        return err
    }

    // start one goroutine per requested unit of concurrency
    for i := 0; i < concurrency; i++ {
        fmt.Printf("Processing messages on thread %v...\n", i)
        go func() {
            for msg := range msgs {
                // if the handler returns true then ACK, else NACK
                // the message back into the rabbit queue for
                // another round of processing
                if handler(msg) {
                    msg.Ack(false)
                } else {
                    msg.Nack(false, true)
                }
            }
            fmt.Println("Rabbit consumer closed - critical Error")
            os.Exit(1)
        }()
    }
    return nil
}


Noteworthy items:

If you are concerned with speed then don’t be afraid to run with a concurrency of at least 100. Assuming your handler is written in a thread-safe way, this is a good way to ensure that your app uses all of its available CPU without being bottlenecked by I/O.

If your app’s handler is very fast (perhaps no network or disk involved) you may need to change the prefetch multiplier from 4 to something higher. The prefetch count tells RabbitMQ how many unacknowledged messages it may deliver to the consumer at a time. The higher the number, the less time is spent waiting on the network for each message.

Our programs are ephemeral: we don’t mind if they restart from time to time when bad things happen. For this reason, if the rabbit consumer fails, we call os.Exit(1). Our logs pick it up and we just restart. If this doesn’t work for your use case, you may want to handle that more elegantly.
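If exiting the process is too blunt for your use case, one possible alternative is to let the caller find out when the consumer has stopped. The sketch below is not what our package does. It uses a stand-in delivery type instead of amqp.Delivery so it runs without a broker, and runWorkers is a hypothetical name. It returns a channel that is closed once the message channel has closed and every worker has finished, so the caller can log, reconnect, or shut down cleanly.

```go
package main

import (
	"fmt"
	"sync"
)

// delivery is a stand-in for amqp.Delivery so the sketch runs without a broker.
type delivery struct {
	Body []byte
}

// runWorkers starts `concurrency` goroutines that drain msgs.
// The returned channel is closed once msgs is closed and every worker has
// returned, so the caller decides what happens next instead of os.Exit.
func runWorkers(msgs <-chan delivery, concurrency int, handler func(delivery) bool) <-chan struct{} {
	done := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		go func() {
			defer wg.Done()
			for msg := range msgs {
				// A real consumer would Ack or Nack here based on the result.
				handler(msg)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(done)
	}()
	return done
}

func main() {
	msgs := make(chan delivery, 8)
	var mu sync.Mutex
	handled := 0

	done := runWorkers(msgs, 2, func(d delivery) bool {
		mu.Lock()
		handled++
		mu.Unlock()
		return true
	})

	for i := 0; i < 5; i++ {
		msgs <- delivery{Body: []byte(`{"message":"test"}`)}
	}
	close(msgs) // simulates the broker closing the delivery channel

	<-done // all workers have finished at this point
	fmt.Println("handled:", handled) // prints "handled: 5"
}
```

With the real library, the loop body would keep the Ack and Nack calls from StartConsumer, and the caller waiting on the done channel would choose between restarting the consumer and exiting.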

Testing The Package

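package main

import (
    "fmt"
    "time"
    "github.com/streadway/amqp"
    // placeholder import path; use your own module's internal rabbit package
    "example.com/yourapp/internal/rabbit"
)

func main() {
    conn, err := rabbit.GetConn("amqp://guest:guest@localhost")
    if err != nil {
        panic(err)
    }

    go func() {
        for {
            time.Sleep(time.Second)
            if err := conn.Publish("test-key", []byte(`{"message":"test"}`)); err != nil {
                fmt.Println("publish failed:", err)
            }
        }
    }()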

    err = conn.StartConsumer("test-queue", "test-key", handler, 2)

    if err != nil {
        panic(err)
    }

    forever := make(chan bool)
    <-forever
}

func handler(d amqp.Delivery) bool {
    if d.Body == nil {
        fmt.Println("Error, no message body!")
        return false
    }
    fmt.Println(string(d.Body))
    return true
}

Thanks For Reading

Hit me up on twitter @wagslane if you have any questions or comments.

Follow me on Dev.to: wagslane

The post Connecting To RabbitMQ In Golang appeared first on Qvault.

Top comments (1)

Luqman Jr

How do you manage connection reconnecting?