DEV Community

Igor Diniz
Igor Diniz

Posted on

12 2 2 1 1

Como utilizar o Ack do RabbitMQ de forma eficiente

A Motivação por Trás Deste Artigo

Recentemente, participei de um debate no ambiente de trabalho sobre como lidar com o reconhecimento da leitura de mensagens no RabbitMQ.

Alguns colegas argumentaram que baseados em experiências anteriores a melhor abordagem para essa aplicação seria usar o auto-ack. Eles relataram que, ao utilizar o ack manual, um erro inesperado ocorreu durante o processamento da mensagem, resultando na sua volta para a fila e em seu reprocessamento várias vezes.

Inspirado por essa discussão e por este post do blog de Luiz Carlos Faria, decidi escrever sobre por que provavelmente esse comportamento ocorreu e qual, na minha visão, seria a melhor maneira de lidar com o reconhecimento de leitura dessas mensagens e o tratamento adequado em caso de erros.

Para chegar ao assunto principal, preciso explicar o que é, e o que o RabbitMQ nos oferece

O RabbitMQ é um poderoso message broker que facilita a comunicação entre diferentes partes de um software. O RabbitMQ oferece diversos recursos para lidar com a publicação e o recebimento de mensagens. Os principais recursos abordados neste artigo são:

  • Queues
  • Exchanges
  • Acknowledgement
  • Dead Letter Exchange

Queues:

A principal função de uma fila é evitar a execução imediata de uma tarefa que consome muitos recursos e evitar a necessidade de esperar que ela seja concluída.

No modelo de mensagens do RabbitMQ, o produtor (producer) nunca envia uma mensagem diretamente para uma fila.

Para uma mensagem chegar a uma fila no RabbitMQ, ela precisa passar por um recurso chamado Exchange. Mesmo ao enviar uma mensagem para uma fila sem definir uma exchange, como mostrado no exemplo abaixo, a mensagem passará por uma exchange conhecida como default exchange.

package main

func main() {
    conn, _ := amqp.Dial("amqp://guest:guest@localhost:5672/")
    defer conn.Close()
    ch, _ := conn.Channel()
    defer ch.Close()
    queue, _ := ch.QueueDeclare(
        "TransactionCompleted",
        true,
        false,
        false,
        false,
        nil,
    )
    message := message{
        ID: "1",
    }
    payload, _ := json.Marshal(message)
    ch.Publish(
        "",
        queue.Name,
        false,
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(payload),
        },
    )
}
Enter fullscreen mode Exit fullscreen mode

Exchanges:

As exchanges são um recurso que tem como propósito gerenciar e direcionar as mensagens publicadas no RabbitMQ.

Existem alguns tipos de exchanges disponíveis: direct, topic, headers e fanout. Caso queira saber mais, este artigo pode ajudar.

declare

ch.ExchangeDeclare(
    "Transaction",
    "topic",
    true,
    false,
    false,
    false,
    nil,
)
queue, _ := ch.QueueDeclare(
    "ProcessTransaction",
    true,
    false,
    false,
    false,
    nil,
)
eventName := "out.requested"
ch.QueueBind(
    queue.Name, // name
    eventName, // key
    "Transaction", // exchange
    false, // noWait
    nil, // args
)
Enter fullscreen mode Exit fullscreen mode

publish

func (r *RabbitMQAdapter) Publish(eventName string, data any) error {
    payload, err := json.Marshal(data)
    if err != nil {
        return err
    }
    return r.channel.Publish(
        r.exchange, // exchange
        eventName,  // key
        false,      // mandatory
        false,      // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(payload),
        }, //  msg
    )
}

func main() {
    // ...
    rabbitmq := RabbitMQAdapter{
        connection: conn,
        channel:    ch,
    }
    message := message{
        ID: "1",
    }
    rabbitmq.Publish(eventName, message)
}

Enter fullscreen mode Exit fullscreen mode

consume

func (r *RabbitMQAdapter) Consume(queueName string, callback func(data any) error) error {
    msgCh, _ := r.channel.Consume(
        queueName,
        "",    // consumer
        true,  // auto-ack
        false, // exclusive
        false, // no-local
        false, // no-wait
        nil,   // args
    )
    for msg := range msgCh {
        callback(msg.Body)
    }
    return nil
}

func main() {
    // ...
    rabbitmq.Consume("ProcessTransaction", func(data any) error {
        // process transaction ...
        return nil
    })
}
Enter fullscreen mode Exit fullscreen mode

Acknowledgement:

Quando o RabbitMQ entrega uma mensagem a um consumidor, ele precisa saber quando considerar que a mensagem foi enviada com sucesso. O RabbitMQ nos oferece duas formas de lidar com isso: auto-ack e ack manual.

No auto-ack, o RabbitMQ descarta a mensagem assim que ela é consumida.

No ack manual, temos o controle de quando emitir essa confirmação e se ela é positiva ou negativa.

exemplo de ack manual

func (r *RabbitMQAdapter) Consume(queueName string, callback func(data any) error) error {
    msgCh, _ := r.channel.Consume(
        queueName,
        "",    // consumer
        false, // auto-ack
        false, // exclusive
        false, // no-local
        false, // no-wait
        nil,   // args
    )
    for msg := range msgCh {
        if err := callback(msg.Body); err != nil {
            msg.Nack(
                false, // multiple
                true, // requeue
            )
        } else {
            msg.Ack(
                false, // multiple
            )
        }
    }
    return nil
}
Enter fullscreen mode Exit fullscreen mode

Dead Letter Exchange:

As mensagens de uma fila podem ser "dead-lettered", o que significa que essas mensagens são republicadas em uma exchange quando qualquer um dos quatro eventos a seguir ocorrer:

  • A mensagem é reconhecida negativamente por um consumidor usando basic.reject ou basic.nack com requeue definido como false.
  • A mensagem expira devido ao TTL (tempo de vida) por mensagem.
  • A mensagem é descartada porque sua fila excedeu um limite de comprimento.
  • A mensagem é retornada mais vezes para uma fila quorum do que o limite de entrega.

Como evitar o reprocessamento de mensagens

Agora que entendemos os recursos oferecidos pelo RabbitMQ, vamos analisar o possível motivo do comportamento mencionado no debate e como isso poderia ter sido evitado.

É muito provável que a opção de requeue ao chamar o método nack ou reject estivesse ativa.

msg.Nack(
    false, // multiple
    true, // requeue
)
// ou...
msg.Reject(
    true, // requeue
)
Enter fullscreen mode Exit fullscreen mode

Tendo isso em vista, para evitar o reprocessamento da mensagem, poderíamos simplesmente colocar a flag de requeue da mensagem como false.

msg.Nack(
    false, // multiple
    false, // requeue
)
// ou...
msg.Reject(
    false, // requeue
)
Enter fullscreen mode Exit fullscreen mode

Dessa forma, a mensagem não voltaria para a fila e seria descartada, assim como acontece no auto-ack.

Como utilizar o ack manual de forma eficiente

Por padrão, ao utilizar Nack ou Reject, a mensagem é perdida, porém, nem sempre é o que queremos. Para esse caso, o RabbitMQ nos oferece a Dead Letter Exchange. Tendo esse recurso ativo em uma fila, ao rejeitarmos uma mensagem, ela é encaminhada para essa exchange, que por sua vez direciona a mensagem para os interessados.

dlxName := "ProcessingError"
ch.ExchangeDeclare(
    dlxName,  // name
    "fanout", // kind
    true,     // durable
    false,    // autoDelete
    false,    // internal
    false,    // noWait
    nil,      // args
)
dlq, _ := ch.QueueDeclare(
    "NotifyCustomer",
    true,
    false,
    false,
    false,
    nil,
)
ch.QueueBind(dlq.Name, "", dlxName, false, nil)
queue, _ := ch.QueueDeclare(
    "ProcessTransaction",
    true,
    false,
    false,
    false,
    amqp.Table{
        "x-dead-letter-exchange": dlxName,
    },
)
Enter fullscreen mode Exit fullscreen mode

Conclusão

Com os recursos apresentados acima, o problema inicialmente discutido neste artigo, que é o reprocessamento indevido das mensagens, pode ser solucionado.

O ack manual com a opção requeue desativada, juntamente com a Dead Letter Exchange, formam uma poderosa combinação para lidar com erros na sua aplicação.

Sentry image

See why 4M developers consider Sentry, “not bad.”

Fixing code doesn’t have to be the worst part of your day. Learn how Sentry can help.

Learn more

Top comments (1)

Collapse
 
ahosall profile image
Feh's

👏👏

Heroku

Simplify your DevOps and maximize your time.

Since 2007, Heroku has been the go-to platform for developers as it monitors uptime, performance, and infrastructure concerns, allowing you to focus on writing code.

Learn More

👋 Kindness is contagious

Please leave a ❤️ or a friendly comment on this post if you found it helpful!

Okay