DEV Community

Mensageria com RabbitMQ e Golang - Conceitos e Primeiro Contato

Introdução

A comunicação entre sistemas mudou bastante nos últimos anos. Antes, era comum que uma aplicação chamasse outra diretamente e esperasse sua resposta. Isso funcionava, mas só até certo ponto. Quanto maior o sistema, maior o acoplamento e menor a tolerância a falhas. Hoje, quando falamos em microsserviços, escalabilidade e resiliência, surge uma alternativa que atende diversos cenários: a mensageria.

A ideia central é simples. Em vez de depender de uma resposta imediata, o serviço envia uma mensagem para uma fila e segue sua vida. Outro serviço, quando puder, lê essa mensagem e faz o trabalho necessário. Isso torna a aplicação mais leve, mais modular e mais preparada para lidar com períodos de carga alta.

Neste artigo, vamos entender o básico sobre mensageria, aprender como usar o RabbitMQ para organizar o fluxo de mensagens e criar exemplos práticos em Go.


O que é Mensageria e por que ela é tão útil

Mensageria é o padrão de comunicação onde sistemas conversam através do envio e consumo de mensagens, e não por chamadas diretas. Isso resolve alguns problemas comuns em arquiteturas tradicionais.

Imagine um serviço de API que precisa realizar diversas etapas a partir de uma única requisição, sendo que uma delas (ou mais) pode sofrer com lentidão devido a conexão instável, grande volume de dados ou processamento mais pesado. Se for feito de forma síncrona, basta um desses passos ficar lento para o serviço todo travar, ou até mesmo ocasionar em erros.

A mensageria permite deslocar essas tarefas para outro componente. Assim, a API recebe a requisição, separa o que precisa ser feito naquele momento (processamento síncrono) e o que não precisa ser concluído antes do retorno da API (processamento assíncrono).

Por exemplo, se houver a necessidade dessa requisição enviar um email ao final, a API pode retornar uma mensagem informando que o email será enviado, e designar o disparo desse email a um serviço de mensageria e que será feito de forma independente.

Uma cenário comum desse exemplo seria o enviar o e-mail de boas-vindas após um cadastro. Ao invés de ser feito de forma assíncrona na requisição, pode ser criada uma estrutura similar a esta:

Mensagem:
{
  "usuario_id": 123,
  "acao": "enviar_email_boas_vindas"
}
Enter fullscreen mode Exit fullscreen mode

Isso vai para uma fila de mensagens. Depois, um outro serviço lê essa fila e manda o e-mail.


O que é RabbitMQ e como ele funciona

RabbitMQ é um message broker. Ele recebe mensagens, organiza essas mensagens em filas e garante que consumidores possam acessá-las com segurança. RabbitMQ implementa o protocolo AMQP, que define regras claras para envio, roteamento e entrega de mensagens.

O RabbitMQ se destaca pela simplicidade, estabilidade e pela facilidade de adaptação a diferentes cenários — desde pequenas aplicações até sistemas de alta disponibilidade. Dentre alternativas conhecidas estão Kafka, Redis Streams e SQS.

Alguns elementos básicos e fundamentais do RabbitMQ são:
Produtor (Producer): Quem envia a mensagem
Consumidor (Consumer): Quem lê e processa a mensagem
Exchange: Recebe a mensagen e a redireciona
Fila (Queue): Onde a mensagem fica armazenada
Binding: Associação, ou vínculo, entre uma exchange e uma fila
Routing key: Chave usada para auxiliar no roteamento

O Exchange é a entidade na qual o Produtor publica mensagens, que são então roteadas para um conjunto de Filas. O objetivo do Exchange é encaminhar todas as mensagens que passam por ele para uma ou mais Filas ou outros Exchanges. O tipo de Exchange e as propriedades do Binding são usados ​​para implementar a lógica de roteamento.

Uma Fila no RabbitMQ é uma coleção ordenada de mensagens. As mensagens são enfileiradas (criadas pelo Produtor) e desenfileiradas (entregues aos Consumidores) de maneira FIFO ("primeiro a entrar, primeiro a sair").


Exemplo 1 - Conectando ao RabbitMQ com Go

Esse artigo não tem como foco ensinar Go, ou apresentar a linguagem para quem não a conhece. Ou seja, não irei detalhar a estrutura de um projeto em Go, arquivos de configuração, dependências ou similares. Parto do princípio que se você quiser replicar os códigos mostrados aqui, saberá como criar uma estrutura básica para executar o código.

Também não irei entrar em detalhes de como instalar RabbitMQ e Go, afinal existem diferentes maneiras de como você pode querer fazer (Linux, Mac, Windows, Docker, etc), para isso recomendo os links abaixo das documentações oficiais. Após instalar ambos, podemos dar continuidade:

Vamos começar criando um projeto em Go. O comando abaixo irá criar o(s) arquivo(s) necessário(s) dentro da pasta que você o executar:

go mod init exemplo/mensageria

Próximo passo é criar um arquivo (fiz um chamado teste.go) e colar o código abaixo. Esse exemplo é algo simples, que apenas abre uma conexão com o RabbitMQ e garante que está funcionando:

package main

import (
    "log"

    amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Erro ao conectar: %s", err)
    }
    defer conn.Close()

    log.Println("Conexão bem-sucedida com RabbitMQ!")
}
Enter fullscreen mode Exit fullscreen mode

Como criamos o projeto agora, precisamos baixar as dependências e depois executar o código. Para isso devemos executar os comandos abaixo dentro da pasta do projeto, que foi criada anteriormente:

go get
go run .

Se o serviço do RabbitMQ estiver rodando, teremos a mensagem de sucesso no log:

Certifique-se de utilizar os parâmetros corretos para conectar ao RabbitMQ pela função amqp.Dial("amqp://guest:guest@localhost:5672"), caso você tenha mudado algum durante ou após a instalação, onde:

  • amqp - Protocolo usado pelo RabbitMQ. AMQP é o default e mais moderno
  • guest:guest - Usuário e senha do RabbitMQ. Valores usados são os default
  • localhost - URL de hospedagem do serviço RabbitMQ, nesse caso local
  • 5672 - Porta usada pelo serviço RabbitMQ, também é usado um valor default aqui

Exemplo 2 - Produção e Consumo de uma Mensagem

Agora vamos para um exemplo mais próximo da realidade: criar um produtor, que irá criar uma mensagem e colocá-la em uma fila, e um consumidor para ler essa mensagem e exibir uma mensagem no log da aplicação.

Para isso teremos 2 pastas diferentes, para simular um cenário onde existem 2 aplicações distintas e que são executadas de forma independente. Podemos repetir o passo do exemplo anterior para criar a estrutura, além de aproveitar também o código usado para conexão com o serviço do RabbitMQ. Lembrando de rodar os comandos abaixo em pastas diferentes e preferencialmente, para facilitar a localização, no mesmo nível de hierarquia de pastas:

go mod init exemplo/produtor
go mod init exemplo/consumidor

Novamente, seguindo os passos do exemplo anterior, dentro de cada projeto vamos criar um arquivo com o código Go. Abaixo está o código completo do produtor, o qual explicarei algumas partes na sequência:

package main

import (
    "context"
    "encoding/json"
    "log"
    "time"

    amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Panicf("%s: %s", msg, err)
    }
}

type User struct {
    UserType int    `json:"user_type"`
    Id       int    `json:"id"`
    Name     string `json:"name"`
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "my_topic", // name
        "topic",    // type
        true,       // durable
        false,      // auto-deleted
        false,      // internal
        false,      // no-wait
        nil,        // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    user1 := User{
        UserType: 1,
        Id:       10,
        Name:     "Paulo",
    }
    user1Json, _ := json.Marshal(user1)

    user2 := User{
        UserType: 2,
        Id:       20,
        Name:     "Maria",
    }
    user2Json, _ := json.Marshal(user2)

    // Envio de json
    err = ch.PublishWithContext(ctx,
        "my_topic", // exchange
        "type_1",   // routing key
        false,      // mandatory
        false,      // immediate
        amqp.Publishing{
            ContentType:  "application/json",
            Body:         []byte(user1Json),
        })

    failOnError(err, "Failed to publish a message for User 1")
    log.Printf(" [x] Sent %s\n", user1Json)

    err = ch.PublishWithContext(ctx,
        "my_topic", // exchange
        "type_2",   // routing key
        false,      // mandatory
        false,      // immediate
        amqp.Publishing{
            ContentType:  "application/json",
            Body:         []byte(user2Json),
        })

    failOnError(err, "Failed to publish a message for User 2")
    log.Printf(" [x] Sent %s\n", user2Json)
}

Enter fullscreen mode Exit fullscreen mode

O primeiro ponto é entender, de forma macro, o que esse código faz. Há uma struct chamada User com 3 campos: UserType, Id e Name. Nosso produtor irá instanciar variáveis do tipo User e, para cada variável, produzir uma mensagem no nosso serviço do RabbitMQ. Ao longo do código há também algumas checagens para analisar se houveram erros em diferentes etapas (conectar com RabbitMQ, declarar uma Exchange, publicar uma mensagem, etc). Vamos ver alguns trechos referente à produção da mensagem:

err = ch.ExchangeDeclare(
    "my_topic", // name
    "topic",    // type
    true,       // durable
    false,      // auto-deleted
    false,      // internal
    false,      // no-wait
    nil,        // arguments
)
Enter fullscreen mode Exit fullscreen mode

A função ExchangeDeclare() irá criar uma Exchange com o nome my_topic. Caso já exista uma com esse nome, é verificado se ambas possuem os mesmos parâmetros e, se sim, ela é utilizada ao invés de criar uma duplicata.

O tipo da Exchange foi declarado como topic. Existem diferentes tipos, cada um com sua distinção e contextos mais adequados para uso. O tipo topic permite que possamos fazer o vínculo dessa mensagem com uma routing key ao produzir uma mensagem. Isso é útil, pois podemos produzir mensagens dentro de uma mesma Fila e Exchange, mas separá-las por routing key diferentes e as ler por diferentes consumidores (ou pelo mesmo consumidor em situações diferentes).

Por exemplo, estamos produzindo mensagens com 2 usuários, cada um com o UserType diferente. Podemos querer/precisar aplicar lógicas diferentes para cada tipo. Usar uma routing key diferente para cada UserType permite que a implementação fique mais desacoplada no consumidor, usando a mesma Exchange e Fila já que estamos criando um usuário em ambos os casos. Veremos isso na prática mais a frente.

durable true e auto-deleted false permitem que a Exchange continue existindo mesmo após o servidor do RabbitMQ for reiniciado e também quando não houverem mais Filas vinculadas a ela. internal false não permite que a Exchange aceite mensagens publicadas para ela. Mais útil para contextos onde a Exchange será para uso interno ou privado, não ficando pública/visível para usuários do serviço. no-wait true Permite criar a Exchange sem esperar a confirmação do servidor se a mesma foi criada com sucesso (no exemplo deixamos como false, pois queremos o retorno do servidor). arguments permite o envio de parâmetros adicionais e opcionais, geralmente usado quando o tipo da Exchange demanda essas informações.

err = ch.PublishWithContext(ctx,
        "my_topic", // exchange
        "type_1",   // routing key
        false,      // mandatory
        false,      // immediate
        amqp.Publishing{
            ContentType:  "application/json",
            Body:         []byte(user1Json),
        })
Enter fullscreen mode Exit fullscreen mode

A função PublishWithContext() vai ser responsável por publicar a mensagem no servidor do RabbitMQ. O parâmetro exchange informa o nome da Exchange, que criamos anteriormente. routing key é a chave que vai vincular a mensagem publicada com a Exchange informada. Se o parâmetro mandatory for true, a mensagem não será entregue ao RabbitMQ caso não haja uma Fila para consumí-la. O campo immediate tem o mesmo propósito do mandatory, mas funciona para caso não haja um consumidor associado a uma Fila que corresponda a essa mensagem. ContentType informa a estrutura de dados que vai ser usada no conteúdo da mensagem, que no nosso caso é uma instância da struct User convertida para json. Body são os dados em si que serão enviados.

Foram publicadas 2 mensagens na mesma Exchange, cada uma routing key diferente: type_1 e type_2. No código do consumidor veremos que mesmo na mesma Exchange e Fila, podemos consumir apenas uma routing key ou ambas. Também são exibidas mensagens no log após a publicação de cada mensagem:

Vamos agora ao código do consumidor:

package main

import (
    "encoding/json"
    "log"

    amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Panicf("%s: %s", msg, err)
    }
}

type User struct {
    UserType int    `json:"user_type"`
    Id       int    `json:"id"`
    Name     string `json:"name"`
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "my_topic", // name
        "topic",    // type
        true,       // durable
        false,      // auto-deleted
        false,      // internal
        false,      // no-wait
        nil,        // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    q, err := ch.QueueDeclare(
        "my_topic", // name
        true,       // durable
        false,      // auto-deleted
        false,      // exclusive
        false,      // no-wait
        nil,        // arguments
    )
    failOnError(err, "Failed to declare a queue")

    log.Printf(" [*] Queue name: %s", q.Name)

    err = ch.QueueBind(
        q.Name,     // queue name
        "type_1",   // routing key
        "my_topic", // exchange
        false,      // no-wait
        nil,        // arguments
    )
    failOnError(err, "Failed to bind a queue")

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    log.Printf(" [*] Reading message")

    for d := range msgs {
        var user User
        json.Unmarshal(d.Body, &user)
        log.Printf("Received a message in queue %s: %v", q.Name, user)
    }

    ch.Close()
}

Enter fullscreen mode Exit fullscreen mode

O início do código é similar ao do produtor: definição da mesma struct usada, estabelecimento de conexão com o serviço do RabbitMQ e declaração da Exchange. Em seguida temos a declaração da Fila:

q, err := ch.QueueDeclare(
    "my_topic", // name
    true,       // durable
    false,      // auto-deleted
    false,      // exclusive
    false,      // no-wait
    nil,        // arguments
)
Enter fullscreen mode Exit fullscreen mode

Nossa Fila foi criada com o mesmo name que a Exchange, e também com os mesmos valores para os parâmetros durable, auto-deleted, no-wait e arguments já que servem para os mesmos propósitos. O campo exclusive, se for true, serve para informar se a Fila será acessível apenas pela conexão que a criou e irá deletar a Fila quando essa mesma conexão for fechada, que fazemos ao final do código com a função Close().

Seguindo para o próximo trecho, temos a função QueueBind(). Ela é responsável por vincular a Fila que criamos com a Exchange declarada anteriormente. Essa mesma Exchange foi criada no código do produtor (caso o mesmo seja executado primeiro) e, como foi explicado na parte do código do produtor, por ela ainda existir no serviço do RabbitMQ será reutilizada aqui:

err = ch.QueueBind(
    q.Name,     // queue name
    "type_1",   // routing key
    "my_topic", // exchange
    false,      // no-wait
    nil,        // arguments
)
Enter fullscreen mode Exit fullscreen mode

Os parâmetros no-wait e arguments têm a mesma lógica já vista anteriormente. queue name e exchange correspondem aos nomes da Fila e Exchange, respectivamente. O parâmetro routing key indica qual chave, que foi usada ao produzir a mensagem para a Exchange, iremos vincular a nossa Fila.

Voltando ao código do produtor, vemos que 2 mensagens foram criadas: uma com a routing key = "type_1" e outra com routing key = "type_2". Ou seja, aqui nossa Fila está vinculada apenas com a routing key = "type_1" e iremos consumir apenas as mensagens da Exchange my_topic que usaram essa routing key. A mensagem que produzimos com a routing key = "type_2" ficará na Exchange sem nenhuma Fila para consumí-la nesse momento.

Por fim temos o último trecho para analisar. A função Consume(), que é responsável por consumir as mensagens da Fila que criamos e vinculamos com a Exchange:

msgs, err := ch.Consume(
    q.Name, // queue
    "",     // consumer
    true,   // auto-ack
    false,  // exclusive
    false,  // no-local
    false,  // no-wait
    nil,    // args
)
Enter fullscreen mode Exit fullscreen mode

O parâmetro queue informa o nome da Fila. O consumer serve para atribuir uma tag (ou identificador) ao consumidor da Fila, que estamos criando. exclusive quando true serve para informar ao servidor que somente este consumidor irá acessar esta Fila. no-local é um parâmetro existente no protocolo AMQP, porém o RabbitMQ não dá suporte ao mesmo, portanto o valor é ignorado.

auto-ack true serve para informar ao serviço do RabbitMQ, de modo automático, que a mensagem foi entregue ao consumidor. Isso faz com que ela seja removida da Fila de mensagem no momento que o consumidor a lê. A depender do contexto, isso pode gerar problemas. Por exemplo, o serviço de envio de email ficou fora do ar no meio do processamento. Nesse caso a mensagem será removida da Fila sem que possamos enviar o email. Uma solução seria deixar parâmetro como false e, após processar com sucesso tudo que for necessário, informar manualmente ao servidor que a mensagem foi entregue e ela já pode ser apagada da Fila.

Após declarar o consumidor, temos um loop para exibir no log os dados do usuário que foram publicados na Fila. Ao executar esse código devemos ter uma saída similar a esta:

Note que foi exibido apenas os dados do usuário Paulo. Isso porque a outra mensagem, com os dados da usuária Maria, foi publicada com a routing key = "type_2" e a Fila que criamos foi vinculada a routing key = "type_1". Se quisermos consumir essa outra mensagem, poderíamos ter um outro consumidor com outra Fila vinculada a ela, ou então criar um outro QueueBind com a mesma Fila mudando o routing key. Assim o nosso consumidor conseguiria ler mensagens de ambos os routing key.


Conclusão

Nesse texto vimos uma introdução à mensageria, entendemos como funciona o RabbitMQ e como integrar ele com Go. Apesar de não ser possível abordar todas as combinações e funcionalidades que ambos oferecem, pudemos constatar que essa combinação atende bem a diversos contextos e diferentes cenários.


Referências

Top comments (0)