DEV Community

Igor Diniz
Igor Diniz

Posted on

Como utilizar o Ack do RabbitMQ de forma eficiente

A Motivação por Trás Deste Artigo

Recentemente, participei de um debate no ambiente de trabalho sobre como lidar com o reconhecimento da leitura de mensagens no RabbitMQ.

Alguns colegas argumentaram que baseados em experiências anteriores a melhor abordagem para essa aplicação seria usar o auto-ack. Eles relataram que, ao utilizar o ack manual, um erro inesperado ocorreu durante o processamento da mensagem, resultando na sua volta para a fila e em seu reprocessamento várias vezes.

Inspirado por essa discussão e por este post do blog de Luiz Carlos Faria, decidi escrever sobre por que provavelmente esse comportamento ocorreu e qual, na minha visão, seria a melhor maneira de lidar com o reconhecimento de leitura dessas mensagens e o tratamento adequado em caso de erros.

Para chegar ao assunto principal, preciso explicar o que é, e o que o RabbitMQ nos oferece

O RabbitMQ é um poderoso message broker que facilita a comunicação entre diferentes partes de um software. O RabbitMQ oferece diversos recursos para lidar com a publicação e o recebimento de mensagens. Os principais recursos abordados neste artigo são:

  • Queues
  • Exchanges
  • Acknowledgement
  • Dead Letter Exchange

Queues:

A principal função de uma fila é evitar a execução imediata de uma tarefa que consome muitos recursos e evitar a necessidade de esperar que ela seja concluída.

No modelo de mensagens do RabbitMQ, o produtor (producer) nunca envia uma mensagem diretamente para uma fila.

Para uma mensagem chegar a uma fila no RabbitMQ, ela precisa passar por um recurso chamado Exchange. Mesmo ao enviar uma mensagem para uma fila sem definir uma exchange, como mostrado no exemplo abaixo, a mensagem passará por uma exchange conhecida como default exchange.

package main

func main() {
    conn, _ := amqp.Dial("amqp://guest:guest@localhost:5672/")
    defer conn.Close()
    ch, _ := conn.Channel()
    defer ch.Close()
    queue, _ := ch.QueueDeclare(
        "TransactionCompleted",
        true,
        false,
        false,
        false,
        nil,
    )
    message := message{
        ID: "1",
    }
    payload, _ := json.Marshal(message)
    ch.Publish(
        "",
        queue.Name,
        false,
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(payload),
        },
    )
}
Enter fullscreen mode Exit fullscreen mode

Exchanges:

As exchanges são um recurso que tem como propósito gerenciar e direcionar as mensagens publicadas no RabbitMQ.

Existem alguns tipos de exchanges disponíveis: direct, topic, headers e fanout. Caso queira saber mais, este artigo pode ajudar.

declare

ch.ExchangeDeclare(
    "Transaction",
    "topic",
    true,
    false,
    false,
    false,
    nil,
)
queue, _ := ch.QueueDeclare(
    "ProcessTransaction",
    true,
    false,
    false,
    false,
    nil,
)
eventName := "out.requested"
ch.QueueBind(
    queue.Name, // name
    eventName, // key
    "Transaction", // exchange
    false, // noWait
    nil, // args
)
Enter fullscreen mode Exit fullscreen mode

publish

func (r *RabbitMQAdapter) Publish(eventName string, data any) error {
    payload, err := json.Marshal(data)
    if err != nil {
        return err
    }
    return r.channel.Publish(
        r.exchange, // exchange
        eventName,  // key
        false,      // mandatory
        false,      // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(payload),
        }, //  msg
    )
}

func main() {
    // ...
    rabbitmq := RabbitMQAdapter{
        connection: conn,
        channel:    ch,
    }
    message := message{
        ID: "1",
    }
    rabbitmq.Publish(eventName, message)
}

Enter fullscreen mode Exit fullscreen mode

consume

func (r *RabbitMQAdapter) Consume(queueName string, callback func(data any) error) error {
    msgCh, _ := r.channel.Consume(
        queueName,
        "",    // consumer
        true,  // auto-ack
        false, // exclusive
        false, // no-local
        false, // no-wait
        nil,   // args
    )
    for msg := range msgCh {
        callback(msg.Body)
    }
    return nil
}

func main() {
    // ...
    rabbitmq.Consume("ProcessTransaction", func(data any) error {
        // process transaction ...
        return nil
    })
}
Enter fullscreen mode Exit fullscreen mode

Acknowledgement:

Quando o RabbitMQ entrega uma mensagem a um consumidor, ele precisa saber quando considerar que a mensagem foi enviada com sucesso. O RabbitMQ nos oferece duas formas de lidar com isso: auto-ack e ack manual.

No auto-ack, o RabbitMQ descarta a mensagem assim que ela é consumida.

No ack manual, temos o controle de quando emitir essa confirmação e se ela é positiva ou negativa.

exemplo de ack manual

func (r *RabbitMQAdapter) Consume(queueName string, callback func(data any) error) error {
    msgCh, _ := r.channel.Consume(
        queueName,
        "",    // consumer
        false, // auto-ack
        false, // exclusive
        false, // no-local
        false, // no-wait
        nil,   // args
    )
    for msg := range msgCh {
        if err := callback(msg.Body); err != nil {
            msg.Nack(
                false, // multiple
                true, // requeue
            )
        } else {
            msg.Ack(
                false, // multiple
            )
        }
    }
    return nil
}
Enter fullscreen mode Exit fullscreen mode

Dead Letter Exchange:

As mensagens de uma fila podem ser "dead-lettered", o que significa que essas mensagens são republicadas em uma exchange quando qualquer um dos quatro eventos a seguir ocorrer:

  • A mensagem é reconhecida negativamente por um consumidor usando basic.reject ou basic.nack com requeue definido como false.
  • A mensagem expira devido ao TTL (tempo de vida) por mensagem.
  • A mensagem é descartada porque sua fila excedeu um limite de comprimento.
  • A mensagem é retornada mais vezes para uma fila quorum do que o limite de entrega.

Como evitar o reprocessamento de mensagens

Agora que entendemos os recursos oferecidos pelo RabbitMQ, vamos analisar o possível motivo do comportamento mencionado no debate e como isso poderia ter sido evitado.

É muito provável que a opção de requeue ao chamar o método nack ou reject estivesse ativa.

msg.Nack(
    false, // multiple
    true, // requeue
)
// ou...
msg.Reject(
    true, // requeue
)
Enter fullscreen mode Exit fullscreen mode

Tendo isso em vista, para evitar o reprocessamento da mensagem, poderíamos simplesmente colocar a flag de requeue da mensagem como false.

msg.Nack(
    false, // multiple
    false, // requeue
)
// ou...
msg.Reject(
    false, // requeue
)
Enter fullscreen mode Exit fullscreen mode

Dessa forma, a mensagem não voltaria para a fila e seria descartada, assim como acontece no auto-ack.

Como utilizar o ack manual de forma eficiente

Por padrão, ao utilizar Nack ou Reject, a mensagem é perdida, porém, nem sempre é o que queremos. Para esse caso, o RabbitMQ nos oferece a Dead Letter Exchange. Tendo esse recurso ativo em uma fila, ao rejeitarmos uma mensagem, ela é encaminhada para essa exchange, que por sua vez direciona a mensagem para os interessados.

dlxName := "ProcessingError"
ch.ExchangeDeclare(
    dlxName,  // name
    "fanout", // kind
    true,     // durable
    false,    // autoDelete
    false,    // internal
    false,    // noWait
    nil,      // args
)
dlq, _ := ch.QueueDeclare(
    "NotifyCustomer",
    true,
    false,
    false,
    false,
    nil,
)
ch.QueueBind(dlq.Name, "", dlxName, false, nil)
queue, _ := ch.QueueDeclare(
    "ProcessTransaction",
    true,
    false,
    false,
    false,
    amqp.Table{
        "x-dead-letter-exchange": dlxName,
    },
)
Enter fullscreen mode Exit fullscreen mode

Conclusão

Com os recursos apresentados acima, o problema inicialmente discutido neste artigo, que é o reprocessamento indevido das mensagens, pode ser solucionado.

O ack manual com a opção requeue desativada, juntamente com a Dead Letter Exchange, formam uma poderosa combinação para lidar com erros na sua aplicação.

Top comments (1)

Collapse
 
ahosall profile image
Feh's

👏👏