DEV Community

dikac for Tentang Anak Tech Team

Handling RabbitMQ Reconnections With Go and amqp091-go

In a distributed system, stable connections between components are crucial for reliable communication. This article walks through a strategy for reconnecting to RabbitMQ so that your application can recover from network disruptions or broker restarts.

Setting Up

Importing Dependencies

import (
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    amqp "github.com/rabbitmq/amqp091-go"
)

We use amqp091-go as the RabbitMQ client library.

Preparing Options

type Config struct {
    username string
    password string
    host     string
    port     int
}

type Container struct {
    connection *amqp.Connection
}

type Channel struct {
    name       string
    kind       string
    durable    bool
    autoDelete bool
    internal   bool
    noWait     bool
    args       amqp.Table
}
  • Config holds the connection configuration.
  • Container holds the connection to RabbitMQ. Each application should have a wrapper like this, so that the connection can be swapped for a new one after a reconnect.
  • Channel holds the parameters of an exchange to declare. Its fields map directly to the arguments of ExchangeDeclare.

Connection Creation

// dial
// create connection and declare the configured exchanges
func dial(config Config, channels []Channel) (*amqp.Connection, error) {

    // create connection
    conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d", config.username, config.password, config.host, config.port))
    if err != nil {
        return nil, err
    }
    log.Print("rabbitmq connect success")

    // open a temporary channel for the exchange declarations
    channel, err := conn.Channel()
    if err != nil {
        _ = conn.Close() // do not leak the connection on failure
        return nil, err
    }
    defer channel.Close()

    // register channel
    for _, c := range channels {
        err = channel.ExchangeDeclare(c.name, c.kind, c.durable, c.autoDelete, c.internal, c.noWait, c.args)
        if err != nil {
            _ = conn.Close() // do not leak the connection on failure
            return nil, err
        }
    }
    log.Print("rabbitmq channels register success")

    return conn, nil
}

We create a Go function dial to establish a connection to RabbitMQ. It takes a Config struct containing the connection details (username, password, host, and port) and a slice of Channel structs describing the exchanges to declare on the broker.

Connection Status

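// check logs the connection status once per second.
// Note: container.connection is also written by the reconnect goroutine,
// so production code should guard it with a mutex.
func check(container *Container) {
    for {
        time.Sleep(time.Second)
        log.Printf("rabbitmq connected: %v", !container.connection.IsClosed())
    }
}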

We create a Go function check that logs the connection status once per second, which gives us visibility into whether the connection is up.
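Because check reads container.connection from one goroutine while the reconnect logic replaces it from another, the field is shared without synchronization. A minimal sketch of a mutex-guarded wrapper follows. The names SafeContainer, Get, Set, and fakeConn are hypothetical, and the small connection interface stands in for *amqp.Connection (which provides IsClosed) so that the example runs without a broker.

```go
package main

import (
	"fmt"
	"sync"
)

// connection is a stand-in for the single method check needs.
// *amqp.Connection from amqp091-go provides IsClosed() bool.
type connection interface {
	IsClosed() bool
}

// SafeContainer is a hypothetical variant of Container that guards the
// connection with a read/write mutex.
type SafeContainer struct {
	mu   sync.RWMutex
	conn connection
}

// Get returns the current connection under a read lock.
func (c *SafeContainer) Get() connection {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.conn
}

// Set replaces the connection under a write lock.
func (c *SafeContainer) Set(conn connection) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.conn = conn
}

// fakeConn lets the sketch run without a broker.
type fakeConn struct{ closed bool }

func (f fakeConn) IsClosed() bool { return f.closed }

func main() {
	container := &SafeContainer{}

	container.Set(fakeConn{closed: true})
	fmt.Println("connected:", !container.Get().IsClosed())

	// What a reconnect loop would do after a successful dial.
	container.Set(fakeConn{closed: false})
	fmt.Println("connected:", !container.Get().IsClosed())
}
```

With a wrapper like this, check would call Get and the reconnect loop would call Set, instead of both touching the field directly.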

Reconnection Code

Below is the full reconnection code. We will walk through it step by step to understand how reconnection works in Go with RabbitMQ.

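// reconnect watches the connection in a background goroutine and dials again
// whenever the connection closes unexpectedly.
func reconnect(container *Container, config Config, channels []Channel) {

    go func() {
        for {
            // the notify channel is buffered so the library never blocks on the send
            reason, ok := <-container.connection.NotifyClose(make(chan *amqp.Error, 1))
            if !ok {
                // on a graceful Close the notify channel is closed without an error
                log.Print("rabbitmq connection closed")
                break
            }
            log.Printf("rabbitmq connection closed unexpectedly, reason: %v", reason)

            // retry until a new connection is established
            for {

                time.Sleep(time.Second)

                connection, err := dial(config, channels)

                if err == nil {
                    container.connection = connection
                    log.Print("rabbitmq reconnect success")
                    break
                }

                log.Printf("rabbitmq reconnect failed, err: %v", err)
            }

        }
    }()
}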

This function runs a reconnect loop for a RabbitMQ client. Because reconnecting creates a brand-new connection, reconnect needs the same configuration and channel list that were used for the initial connection.

  1. It starts a goroutine, so that the caller is not blocked, which waits for the connection's close notification.
  2. When the connection closes unexpectedly, it enters a loop that attempts to reconnect. On a graceful close, the goroutine logs a message and exits instead.
  3. It sleeps for 1 second (time.Sleep) between reconnection attempts.
  4. It attempts to reconnect by calling the dial function with the provided configuration and channels.
  5. If the reconnection is successful, it updates container.connection with the new connection, and the outer loop starts listening for close notifications on it.
  6. If the reconnection fails, it logs the error and continues the loop.

Stitching Together

func main() {

    config := Config{
        username: "guest",
        password: "guest",
        host:     "localhost",
        port:     5672,
    }

    channels := []Channel{
        {
            name:       "reconnection-exchange",
            kind:       "direct",
            durable:    true,
            autoDelete: false,
            internal:   false,
            noWait:     false,
            args:       nil,
        },
    }

    container := &Container{}

    // initial connection; log.Fatal exits the program on failure
    connection, err := dial(config, channels)
    if err != nil {
        log.Fatal(err)
    }
    container.connection = connection
    go check(container)

    // close whichever connection is current when main returns
    defer func() {
        if err := container.connection.Close(); err != nil {
            log.Print(err)
        }
        log.Print("rabbitmq connection closed")

    }()

    reconnect(container, config, channels)

    {
        // block until an interrupt (os.Interrupt is SIGINT) or SIGTERM arrives
        quit := make(chan os.Signal, 1)
        signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
        <-quit

    }
}

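Now we combine everything in the main function. It dials once at startup, starts the status check and the reconnect loop, and then blocks until it receives an interrupt or termination signal.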

Testing

Running

Before running the code, make sure RabbitMQ is running and accessible. When you run the code, you should see output like the following.

rabbitmq connect success
rabbitmq channels register success
rabbitmq connected: true
rabbitmq connected: true
rabbitmq connected: true
rabbitmq connected: true
rabbitmq connected: true
...

This output indicates that the connection to RabbitMQ is up.

Disconnection & Down

Once the application is running and connected, you can emulate a disconnection or a broker outage by manually shutting down the RabbitMQ server. After the server stops, you should see output like the following.

rabbitmq connection closed unexpectedly, reason: Exception (320) Reason: "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"
rabbitmq connected: false
rabbitmq connected: false
rabbitmq connected: false
rabbitmq connected: false
rabbitmq reconnect failed, err: dial tcp [::1]:5672: connectex: No connection could be made because the target machine actively refused it.
rabbitmq connected: false
rabbitmq connected: false
rabbitmq connected: false
rabbitmq reconnect failed, err: dial tcp [::1]:5672: connectex: No connection could be made because the target machine actively refused it.
rabbitmq connected: false
...

This output indicates that the connection to RabbitMQ has been lost and that the reconnection attempts are failing. You should set up an alert for this event.

Reconnection

After the disconnection, you can emulate recovery by starting the RabbitMQ server again. Once the server is up and the reconnection succeeds, you should see output like the following.

rabbitmq connect success
rabbitmq channels register success
rabbitmq reconnect success
rabbitmq connected: true
rabbitmq connected: true
rabbitmq connected: true
rabbitmq connected: true
rabbitmq connected: true
rabbitmq connected: true

Conclusion

This article demonstrated a strategy for handling RabbitMQ reconnections in Go. By running a reconnection loop and monitoring the connection status, the application can recover from network disruptions or broker restarts without being restarted itself, which makes the system more resilient to temporary connection issues. The amqp091-go library provides the pieces needed for this: NotifyClose to detect a lost connection and Dial to establish a new one.
