In a distributed system, maintaining a stable connection between components is crucial for reliable communication. This article explores a strategy for reconnecting to RabbitMQ from Go, so that your application can recover from network disruptions or broker restarts on its own instead of staying disconnected.
Setting Up
Importing Dependencies
import (
    "fmt"
    amqp "github.com/rabbitmq/amqp091-go"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"
)
We use amqp091-go, the Go AMQP 0.9.1 client maintained by the RabbitMQ team, to connect to RabbitMQ.
Preparing Options
type Config struct {
    username string
    password string
    host     string
    port     int
}
type Container struct {
    connection *amqp.Connection
}
type Channel struct {
    name       string
    kind       string
    durable    bool
    autoDelete bool
    internal   bool
    noWait     bool
    args       amqp.Table
}
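- Config holds the connection configuration.
- Container holds the connection to RabbitMQ. Each application should have a wrapper like this to hold its connection, so that the connection can be replaced after a reconnect.
- Channel holds the configuration of an exchange to declare: its name, kind, and the flags passed to ExchangeDeclare.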
Connection Creation
// dial creates a connection to RabbitMQ and declares the given exchanges on it.
// On any error it closes the connection and returns nil.
func dial(config Config, channels []Channel) (*amqp.Connection, error) {
    // create connection
    conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d", config.username, config.password, config.host, config.port))
    if err != nil {
        return nil, err
    }
    log.Print("rabbitmq connect success")
    // open a temporary channel for declaring the exchanges
    channel, err := conn.Channel()
    if err != nil {
        conn.Close()
        return nil, err
    }
    defer channel.Close()
    // declare every configured exchange
    for _, c := range channels {
        if err := channel.ExchangeDeclare(c.name, c.kind, c.durable, c.autoDelete, c.internal, c.noWait, c.args); err != nil {
            conn.Close()
            return nil, err
        }
    }
    log.Print("rabbitmq channels register success")
    return conn, nil
}
The dial function establishes the connection to RabbitMQ. It takes a Config struct with the connection details (username, password, host, and port) and a slice of Channel structs describing the exchanges to register. After connecting, it opens a channel on the connection and declares each exchange with ExchangeDeclare.
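dial builds the URL with fmt.Sprintf, which can produce a malformed URL if the username or password contains reserved characters such as / or ?. Below is a minimal sketch, assuming you keep the same Config fields, that builds the URL with the standard library's net/url so the credentials are percent-escaped. amqpURL is a hypothetical helper, not part of amqp091-go.

```go
package main

import (
	"net"
	"net/url"
	"strconv"
)

// amqpURL is a hypothetical helper that builds an AMQP URL from the same
// fields as Config, percent-escaping the username and password.
func amqpURL(username, password, host string, port int) string {
	u := url.URL{
		Scheme: "amqp",
		User:   url.UserPassword(username, password),
		// JoinHostPort also brackets IPv6 literals, e.g. [::1]:5672
		Host: net.JoinHostPort(host, strconv.Itoa(port)),
	}
	return u.String()
}
```

For the default credentials this returns amqp://guest:guest@localhost:5672, the same string as the fmt.Sprintf version, so inside dial the call could become amqp.Dial(amqpURL(config.username, config.password, config.host, config.port)).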
Connection Status
// check logs the connection status once per second, forever.
func check(container *Container) {
    for {
        time.Sleep(time.Second)
        log.Printf("rabbitmq connected: %v", !container.connection.IsClosed())
    }
}
The check function logs the connection status once per second, which gives visibility into whether the application is currently connected.
Reconnection Code
Below is the reconnection code. We will walk through it step by step to understand how reconnection works in Go with RabbitMQ.
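// reconnect watches the connection held by container and, whenever it is
// closed unexpectedly, redials until it succeeds.
func reconnect(container *Container, config Config, channels []Channel) {
    go func() {
        for {
            // NotifyClose sends an *amqp.Error on an unexpected close and closes the
            // channel without a value on a graceful Close(); the buffer of 1 lets the
            // library deliver the error without blocking.
            reason, ok := <-container.connection.NotifyClose(make(chan *amqp.Error, 1))
            if !ok {
                // graceful close, e.g. the deferred Close() in main: stop watching
                log.Print("rabbitmq connection closed")
                break
            }
            log.Printf("rabbitmq connection closed unexpectedly, reason: %v", reason)
            // retry once per second until dial succeeds
            for {
                time.Sleep(time.Second)
                connection, err := dial(config, channels)
                if err == nil {
                    container.connection = connection
                    log.Print("rabbitmq reconnect success")
                    break
                }
                log.Printf("rabbitmq reconnect failed, err: %v", err)
            }
        }
    }()
}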
This function runs a reconnect loop for the RabbitMQ client. Because reconnecting creates a brand-new connection, reconnect needs the same configuration and channel list that were used for the initial connection.
- It starts a goroutine that listens for the connection closing, so the caller is not blocked.
- If the close-notification channel is closed without an error, the connection was closed gracefully (for example by the deferred Close in main), so the goroutine logs it and exits.
- If an error is received, the connection was closed unexpectedly, and it enters a loop that attempts to reconnect.
- It waits 1 second between reconnection attempts.
- Each attempt calls the dial function with the provided configuration and channels.
- If reconnection succeeds, it stores the new connection in container.connection and goes back to waiting for the next close notification on that connection.
- If reconnection fails, it logs the error and tries again.
 
Stitching Together
func main() {
    config := Config{
        username: "guest",
        password: "guest",
        host:     "localhost",
        port:     5672,
    }
    channels := []Channel{
        {
            name:       "reconnection-exchange",
            kind:       "direct",
            durable:    true,
            autoDelete: false,
            internal:   false,
            noWait:     false,
            args:       nil,
        },
    }
    container := &Container{}
    connection, err := dial(config, channels)
    if err != nil {
        log.Fatal(err)
    }
    container.connection = connection
    go check(container)
    defer func() {
        if err := container.connection.Close(); err != nil {
            log.Print(err)
        }
        log.Print("rabbitmq connection closed")
    }()
    reconnect(container, config, channels)
    {
        quit := make(chan os.Signal, 1)
        signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
        <-quit
    }
}
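Finally, the main function ties everything together: it dials RabbitMQ, starts the status check, starts the reconnect loop, and then blocks until the process receives an interrupt or termination signal. The deferred function closes the connection on the way out.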
Testing
Running
Before running the code, make sure RabbitMQ is running and reachable. When you run the program, you should see output like the following.
rabbitmq connect success
rabbitmq channels register success
rabbitmq connected: true
rabbitmq connected: true
rabbitmq connected: true
rabbitmq connected: true
rabbitmq connected: true
...
This output indicates that the connection to RabbitMQ is established and healthy.
Disconnection and Broker Shutdown
Once the application is connected, you can simulate a disconnection by shutting down the RabbitMQ server manually. After the server stops, you should see output like the following.
rabbitmq connection closed unexpectedly, reason: Exception (320) Reason: "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"
rabbitmq connected: false
rabbitmq connected: false
rabbitmq connected: false
rabbitmq connected: false
rabbitmq reconnect failed, err: dial tcp [::1]:5672: connectex: No connection could be made because the target machine actively refused it.
rabbitmq connected: false
rabbitmq connected: false
rabbitmq connected: false
rabbitmq reconnect failed, err: dial tcp [::1]:5672: connectex: No connection could be made because the target machine actively refused it.
rabbitmq connected: false
...
This output shows that the application has lost its connection to RabbitMQ and is retrying every second. In production, this is a good event to alert on.
Reconnection
After the disconnection, you can simulate recovery by starting the RabbitMQ server again. Once the server is up and the reconnection succeeds, you should see output like the following.
rabbitmq connect success
rabbitmq channels register success
rabbitmq reconnect success
rabbitmq connected: true
rabbitmq connected: true
rabbitmq connected: true
rabbitmq connected: true
rabbitmq connected: true
rabbitmq connected: true
Conclusion
In conclusion, this article demonstrates a strategy for handling RabbitMQ reconnections in Go. By running a reconnection loop and monitoring the connection status, the application can recover on its own from network disruptions or broker restarts, which makes the system more resilient to temporary connection issues. Keep in mind that only the connection is restored: channels opened on the old connection are closed along with it, and messages published while the broker was unreachable are not resent automatically, so publishers and consumers must be set up again on the new connection. The amqp091-go library provides the building blocks for this mechanism (Dial, NotifyClose, and IsClosed), making it straightforward to improve the reliability of RabbitMQ-based applications.
    