
Working With RabbitMQ in Golang

Olushola Karokatose ・8 min read

Introduction

Hi There! I am Olushola! I am a Backend Software Engineer. I recently picked up Golang and I have been really excited about its capabilities, mostly in terms of speed. In this post, I'll be setting up a simple Golang project with RabbitMQ.

What is RabbitMQ?

RabbitMQ is an Open Source Message Broker that facilitates communication between services using the Publish/Subscribe Pattern with AMQP.

If you are familiar with web programming, there's a good chance you've come across REST (Representational State Transfer), which is simply a way for different services to communicate and exchange information via HTTP (Hypertext Transfer Protocol) with the Request/Response model.

Another way for services to communicate with each other is through the Publish/Subscribe pattern using a message broker like RabbitMQ. Publishers publish messages, and subscribers, as you might guess, subscribe to them.

One beautiful thing about the Publish/Subscribe architecture is that publishers and subscribers are loosely coupled: publishers do not need to concern themselves with the subscribers, and vice versa. Messages are typically routed by topic.
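To make the loose coupling concrete, here is a minimal in-memory sketch of the pattern. It is not RabbitMQ and not how a real broker works internally; the Broker type and its Subscribe and Publish methods are hypothetical names made up for this illustration.

```go
package main

import "fmt"

// Broker is a hypothetical in-memory stand-in for a message broker.
// It only illustrates the pattern; it is not RabbitMQ.
type Broker struct {
	subscribers map[string][]func(msg string)
}

// NewBroker returns an empty broker.
func NewBroker() *Broker {
	return &Broker{subscribers: make(map[string][]func(msg string))}
}

// Subscribe registers a handler for a topic.
func (b *Broker) Subscribe(topic string, handler func(msg string)) {
	b.subscribers[topic] = append(b.subscribers[topic], handler)
}

// Publish hands msg to every handler registered for topic and
// returns how many handlers received it.
func (b *Broker) Publish(topic, msg string) int {
	handlers := b.subscribers[topic]
	for _, handler := range handlers {
		handler(msg)
	}
	return len(handlers)
}

func main() {
	broker := NewBroker()

	// Two independent subscribers; neither knows about the publisher.
	broker.Subscribe("orders", func(msg string) { fmt.Println("billing got:", msg) })
	broker.Subscribe("orders", func(msg string) { fmt.Println("shipping got:", msg) })

	// The publisher only knows the topic name, not who is listening.
	delivered := broker.Publish("orders", "order created")
	fmt.Println("delivered to", delivered, "subscribers")

	// Nobody subscribed to this topic, so nothing is delivered.
	fmt.Println("delivered to", broker.Publish("payments", "payment received"), "subscribers")
}
```

Notice that adding or removing a subscriber never requires touching the publishing code, which is the property the paragraph above describes.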

GoLang Workspace Setup

To work with RabbitMQ in our Golang project, we'll need some basic setup.

To get started, we will download Golang on our development machine and set up our workspace.

  • Download GoLang here and install.
  • Verify the installation by running go version in your terminal; you should get a response with your version of Golang.
➜ go version
go version go1.12.7 darwin/amd64

Go is pretty opinionated about the structure of our workspace, but setting it up is pretty straightforward.

  • First, run go env in your terminal and find the variable named GOPATH; it shows where your Go workspace is located (the Go installation itself lives under GOROOT).
➜ go env
...
GOPATH="/Users/olushola/go"
...
  • Navigate to the directory and create two subfolders, src and bin.
  • Within the src directory, create a subfolder named after the service that hosts your code; I'll be using GitHub, so I'll create a folder called github.com (for Bitbucket or GitLab, use bitbucket.org or gitlab.com respectively).
  • Navigate into the github.com directory and create a subfolder named after your username on that service; for me it's shola-0507. (We will create our projects within this directory.)
  • For this tutorial, we'll create a directory named go_rabbit.

Our Go Workspace structure should look like this.

-go
    -src
        -github.com
            -shola-0507
                -go_rabbit
    -bin

RabbitMQ Setup with Docker

We can set up RabbitMQ in our development environment in a couple of ways; in this tutorial we'll be using Docker. If you don't have Docker installed, you can create an account and download it here.

In your terminal, run

➜ docker run --detach --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

This will create a Docker container with a RabbitMQ instance. Port 5672 is used for AMQP connections and port 15672 for the management UI.

In your browser, navigate to localhost:15672. You should see the RabbitMQ UI; log in with guest as both the username and the password.

RabbitMQ User Interface

Connect and Publish Messages with RabbitMQ

Now that we have our project set up and RabbitMQ running, we'll connect to RabbitMQ from our Golang project.

To perform the connection, we'll need the github.com/streadway/amqp library installed. To do that, run go get -u github.com/streadway/amqp in your terminal. (Go import paths are case-sensitive, so keep github.com in lowercase.)

➜ go get -u github.com/streadway/amqp

In the go_rabbit directory, create a main.go file; this is where we will put all the Golang code.

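First, import the dependencies we need to connect, including the amqp library, and define a main function. Go refuses to compile unused imports, so this file will only build once the next steps use os and amqp; we'll import fmt later, when we start printing messages.

package main

import (
    "os"

    "github.com/streadway/amqp"
)

func main() {}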

Next, we'll connect to the RabbitMQ broker using the amqp.Dial() function, which returns a connection and an error; the error is nil if the connection succeeds.

func main() {
    // Get the connection string from the environment variable
    url := os.Getenv("AMQP_URL")

    // If it doesn't exist, use the default connection string.
    if url == "" {
        // Don't do this in production, this is for testing purposes only.
        url = "amqp://guest:guest@localhost:5672"
    }

    // Connect to the RabbitMQ instance
    connection, err := amqp.Dial(url)

    if err != nil {
        panic("could not establish connection with RabbitMQ: " + err.Error())
    }

    // Close the connection when main returns. Deferring right after a
    // successful Dial registers the cleanup before any blocking work starts.
    defer connection.Close()
}
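If you are curious what the connection string contains, the sketch below separates the "environment variable or default" decision from the parsing, using only the standard library. The helper names resolveURL and describe are hypothetical and not part of the amqp library, which does its own parsing inside Dial.

```go
package main

import (
	"fmt"
	"net/url"
	"os"
)

// defaultAMQPURL matches the local Docker setup used in this post.
// It is for local testing only.
const defaultAMQPURL = "amqp://guest:guest@localhost:5672"

// resolveURL returns value when it is set and the local default otherwise.
// (Hypothetical helper for illustration.)
func resolveURL(value string) string {
	if value == "" {
		return defaultAMQPURL
	}
	return value
}

// describe splits a connection string into host, port and username.
// (Hypothetical helper for illustration.)
func describe(raw string) (host, port, user string, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", "", "", err
	}
	return u.Hostname(), u.Port(), u.User.Username(), nil
}

func main() {
	raw := resolveURL(os.Getenv("AMQP_URL"))

	host, port, user, err := describe(raw)
	if err != nil {
		fmt.Println("invalid AMQP URL:", err)
		return
	}
	fmt.Printf("connecting to %s on port %s as %s\n", host, port, user)
}
```

Keeping the default in one place makes it easy to spot and remove before the code goes anywhere near production.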

In AMQP, it isn't good practice to perform operations directly on the connection. This is because some applications require many simultaneous connections to the broker, and keeping many TCP connections open at the same time consumes system resources and makes setting up firewalls difficult.

Instead, we use channels. Channels are lightweight connections that share a single TCP connection.

To create a channel, we call the Channel method on the connection.

func main() {
    ...

    // Create a channel from the connection. We'll use channels to access the data in the queue rather than the connection itself.
    channel, err := connection.Channel()

    if err != nil {
        panic("could not open RabbitMQ channel:" + err.Error())
    }
}
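One way to picture channels sharing a single connection: every frame on the wire carries a channel number, and the receiving side sorts frames by that number. The sketch below is only a mental model under that assumption; the frame type and the demux function are hypothetical and are not the amqp library's internals.

```go
package main

import "fmt"

// frame is a simplified, hypothetical stand-in for an AMQP frame:
// a channel number plus some payload.
type frame struct {
	channelID uint16
	payload   string
}

// demux groups the payloads arriving on one connection by channel number,
// preserving the order in which each channel's frames arrived.
func demux(frames []frame) map[uint16][]string {
	byChannel := make(map[uint16][]string)
	for _, f := range frames {
		byChannel[f.channelID] = append(byChannel[f.channelID], f.payload)
	}
	return byChannel
}

func main() {
	// Frames from two channels interleaved on a single connection.
	wire := []frame{
		{channelID: 1, payload: "publish A"},
		{channelID: 2, payload: "consume X"},
		{channelID: 1, payload: "publish B"},
	}

	byChannel := demux(wire)
	fmt.Println("channel 1:", byChannel[1])
	fmt.Println("channel 2:", byChannel[2])
}
```

Because the channel number is just a field on each frame, opening another channel costs far less than opening another TCP connection.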

Next, we use the ExchangeDeclare method to declare a topic exchange named events, which we'll publish messages to.

func main() {
    ...

    // We create a durable topic exchange; the queue will be bound to it later.
    // Arguments: name, kind, durable, autoDelete, internal, noWait, args
    err = channel.ExchangeDeclare("events", "topic", true, false, false, false, nil)

    if err != nil {
        panic("error declaring the exchange: " + err.Error())
    }
}

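We then declare a queue named test, bind it to the events exchange we declared earlier, and publish a message to test via events. The queue has to be bound before we publish: because we publish with the mandatory flag set to false, the broker silently drops any message it cannot route to a queue.

The message sent to the exchange has to be a value of the amqp.Publishing struct.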

func main() {
    ...

    // We create a durable queue named test
    // Arguments: name, durable, autoDelete, exclusive, noWait, args
    _, err = channel.QueueDeclare("test", true, false, false, false, nil)

    if err != nil {
        panic("error declaring the queue: " + err.Error())
    }

    // We bind the queue to the exchange. The binding key "#" matches every
    // routing key, so all messages published to events reach test.
    err = channel.QueueBind("test", "#", "events", false, nil)

    if err != nil {
        panic("error binding to the queue: " + err.Error())
    }

    // We create a message to be sent to the exchange.
    // It has to be a value of the amqp.Publishing struct
    message := amqp.Publishing{
        Body: []byte("Hello World"),
    }

    // We publish the message to the exchange we created earlier.
    // Arguments: exchange, routing key, mandatory, immediate, message
    err = channel.Publish("events", "random-key", false, false, message)

    if err != nil {
        panic("error publishing a message to the queue: " + err.Error())
    }
}
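Why does a binding key of "#" catch a message published with the routing key random-key? On a topic exchange, keys are split into words on dots; * stands for exactly one word and # for zero or more words. The sketch below is my own small re-implementation of that rule for illustration; topicMatch is a hypothetical helper, not a function from the amqp library, and RabbitMQ's real matcher is implemented differently.

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatch reports whether a routing key satisfies a topic binding pattern.
// Words are separated by dots; "*" matches exactly one word and "#" matches
// zero or more words. (Hypothetical helper for illustration.)
func topicMatch(pattern, key string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(key, "."))
}

func matchWords(pattern, key []string) bool {
	if len(pattern) == 0 {
		return len(key) == 0
	}
	switch pattern[0] {
	case "#":
		// Let "#" absorb 0, 1, 2, ... words and try to match the rest.
		for i := 0; i <= len(key); i++ {
			if matchWords(pattern[1:], key[i:]) {
				return true
			}
		}
		return false
	case "*":
		return len(key) > 0 && matchWords(pattern[1:], key[1:])
	default:
		return len(key) > 0 && pattern[0] == key[0] && matchWords(pattern[1:], key[1:])
	}
}

func main() {
	checks := []struct{ pattern, key string }{
		{"#", "random-key"},
		{"orders.*", "orders.created"},
		{"orders.*", "orders.created.eu"},
		{"orders.#", "orders.created.eu"},
	}
	for _, c := range checks {
		fmt.Printf("pattern %q, key %q: %v\n", c.pattern, c.key, topicMatch(c.pattern, c.key))
	}
}
```

A narrower binding key such as orders.* would let the test queue receive only a subset of what is published to events.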

We have successfully set up our publisher. At this point, our main.go file should look like this.

package main

// "fmt" is not imported yet: Go rejects unused imports, and nothing is
// printed until the consumer step.
import (
    "os"

    "github.com/streadway/amqp"
)

func main() {
    // Get the connection string from the environment variable
    url := os.Getenv("AMQP_URL")

    // If it doesn't exist, use the default connection string
    if url == "" {
        url = "amqp://guest:guest@localhost:5672"
    }

    // Connect to the RabbitMQ instance
    connection, err := amqp.Dial(url)

    if err != nil {
        panic("could not establish connection with RabbitMQ: " + err.Error())
    }

    // Close the connection when main returns
    defer connection.Close()

    // Create a channel from the connection. We'll use channels to access the data in the queue rather than the
    // connection itself
    channel, err := connection.Channel()

    if err != nil {
        panic("could not open RabbitMQ channel: " + err.Error())
    }

    // We create a durable topic exchange; the queue is bound to it below
    err = channel.ExchangeDeclare("events", "topic", true, false, false, false, nil)

    if err != nil {
        panic("error declaring the exchange: " + err.Error())
    }

    // We create a durable queue named test
    _, err = channel.QueueDeclare("test", true, false, false, false, nil)

    if err != nil {
        panic("error declaring the queue: " + err.Error())
    }

    // We bind the queue to the exchange before publishing; otherwise the
    // message would have nowhere to go and the broker would drop it
    err = channel.QueueBind("test", "#", "events", false, nil)

    if err != nil {
        panic("error binding to the queue: " + err.Error())
    }

    // We create a message to be sent to the exchange.
    // It has to be a value of the amqp.Publishing struct
    message := amqp.Publishing{
        Body: []byte("Hello World"),
    }

    // We publish the message to the exchange we created earlier
    err = channel.Publish("events", "random-key", false, false, message)

    if err != nil {
        panic("error publishing a message to the queue: " + err.Error())
    }
}

We can test our code by running go run main.go within the project directory.

➜ go run main.go

If everything works as expected, we should have a test queue containing a Hello World message.

Test Queue With Published Data

Consume the Message in a Queue

Now that we have data in test, we'll consume the data in the queue and log it to the console.

In our main.go file, we'll use the Consume method to get the messages (data) in test, loop through the messages, print each message body to the console and acknowledge it. Printing needs the fmt package in the import block.

func main() {
    ...

    // We consume data from the queue named test using the AMQP channel we created.
    // Arguments: queue, consumer, autoAck, exclusive, noLocal, noWait, args
    msgs, err := channel.Consume("test", "", false, false, false, false, nil)

    if err != nil {
        panic("error consuming the queue: " + err.Error())
    }

    // We loop through the messages in the queue and print them to the console.
    // The msgs will be a go channel, not an amqp channel
    for msg := range msgs {
        // Print the message to the console
        fmt.Println("message received: " + string(msg.Body))
        // Acknowledge that we have received the message so it can be removed from the queue
        msg.Ack(false)
    }

    // The loop above only ends when the delivery channel is closed, so a
    // statement placed here (including a defer) runs late or never.
    // Register `defer connection.Close()` right after amqp.Dial succeeds instead.
}
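The behaviour of that loop is easier to see without a broker. In the sketch below, a plain Go channel stands in for the one returned by Consume; the delivery type and consumeAll function are hypothetical stand-ins I made up for illustration, not the library's amqp.Delivery.

```go
package main

import "fmt"

// delivery is a hypothetical stand-in for amqp.Delivery: a body plus a
// callback that plays the role of Ack.
type delivery struct {
	Body []byte
	ack  func()
}

// consumeAll handles deliveries until the channel is closed and returns
// the bodies it processed, in order.
func consumeAll(deliveries <-chan delivery) []string {
	var handled []string
	for d := range deliveries {
		handled = append(handled, string(d.Body))
		d.ack() // acknowledge only after the message has been handled
	}
	return handled
}

func main() {
	deliveries := make(chan delivery, 2)
	acks := 0

	for _, body := range []string{"Hello World", "Goodbye World"} {
		deliveries <- delivery{Body: []byte(body), ack: func() { acks++ }}
	}

	// Closing the channel is what lets the range loop finish. A live
	// consumer's channel stays open, so its loop keeps waiting.
	close(deliveries)

	handled := consumeAll(deliveries)
	fmt.Println("handled:", handled)
	fmt.Println("acknowledged:", acks)
}
```

In the real program nothing closes the delivery channel while the connection is healthy, which is why the consumer keeps waiting for new messages instead of exiting.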

Now the main.go file should look like this.

package main

import (
    "fmt"
    "os"

    "github.com/streadway/amqp"
)

func main() {
    // Get the connection string from the environment variable
    url := os.Getenv("AMQP_URL")

    // If it doesn't exist, use the default connection string
    if url == "" {
        url = "amqp://guest:guest@localhost:5672"
    }

    // Connect to the RabbitMQ instance
    connection, err := amqp.Dial(url)

    if err != nil {
        panic("could not establish connection with RabbitMQ: " + err.Error())
    }

    // Close the connection when main returns. This has to be registered
    // before the consume loop below, which blocks until the delivery channel closes.
    defer connection.Close()

    // Create a channel from the connection. We'll use channels to access the data in the queue rather than the
    // connection itself
    channel, err := connection.Channel()

    if err != nil {
        panic("could not open RabbitMQ channel: " + err.Error())
    }

    // We create a durable topic exchange; the queue is bound to it below
    err = channel.ExchangeDeclare("events", "topic", true, false, false, false, nil)

    if err != nil {
        panic("error declaring the exchange: " + err.Error())
    }

    // We create a durable queue named test
    _, err = channel.QueueDeclare("test", true, false, false, false, nil)

    if err != nil {
        panic("error declaring the queue: " + err.Error())
    }

    // We bind the queue to the exchange before publishing; otherwise the
    // message would have nowhere to go and the broker would drop it
    err = channel.QueueBind("test", "#", "events", false, nil)

    if err != nil {
        panic("error binding to the queue: " + err.Error())
    }

    // We create a message to be sent to the exchange.
    // It has to be a value of the amqp.Publishing struct
    message := amqp.Publishing{
        Body: []byte("Hello World"),
    }

    // We publish the message to the exchange we created earlier
    err = channel.Publish("events", "random-key", false, false, message)

    if err != nil {
        panic("error publishing a message to the queue: " + err.Error())
    }

    // We consume data from the queue named test using the AMQP channel we created.
    msgs, err := channel.Consume("test", "", false, false, false, false, nil)

    if err != nil {
        panic("error consuming the queue: " + err.Error())
    }

    // We loop through the messages in the queue and print them to the console.
    // The msgs will be a go channel, not an amqp channel
    for msg := range msgs {
        fmt.Println("message received: " + string(msg.Body))
        // Acknowledge the message so it can be removed from the queue
        msg.Ack(false)
    }
}

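With all that set up, we can test our code by running go run main.go within the project directory, and expect a "message received: Hello World" response in the console. The program then keeps waiting for more messages, so press Ctrl+C to stop it. If earlier runs left messages in the queue, the line is printed once for each of them.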

➜ go run main.go
message received: Hello World

We now have a working Golang project that communicates with RabbitMQ!

Thanks for reading!


Olushola Karokatose

@olushola_k

I'm a backend Software Developer at Paystack

Discussion

 

I tried using amqp by itself and lost interest when I realised the amount of boilerplate involved for less than trivial implementations.

Below someone mentioned fluent-amqp as their own library. I went with cony: github.com/assembla/cony

Seems to be the most popular one at the moment.

 

Nice explanation! For a big application you will probably meet another problem: recovery/redeclare all channels, bindings and queues after reconnect.
I can offer github.com/reddec/fluent-amqp . I made it to provide more flexible and clean way to interact with amqp. It's just a wrapper of library in the article

 

Thanks! I'll be sure to check the library out!

 

Hey I've been using cony, which I'm sure you're aware of, but it's really good to see signing and verification upfront in the README! Good stuff.

 

Great article with examples 👏

 
 

great post!
Thanks! looking forward for more go-channels tutorials...

 

Great article! Thank you. The whole CODE from this article: github.com/shola-0507/golang-rabbi...

 

Good intro to rabbit in go. To avoid boilerplate code and complexity of handling errors I use go-mq package: github.com/cheshir/go-mq/blob/mast...

 

Great article . Would have been even better if there were diagrams and a more iterative example of the consumer.