DEV Community

Cover image for Como usar Kafka num projeto Quarkus
Aleatório
Aleatório

Posted on • Edited on • Originally published at aleatorio.dev.br

4 1

Como usar Kafka num projeto Quarkus

Kafka é uma ferramenta muito boa para troca de mensagens entre diferentes aplicações, porém, ele é bem intimidador e com um de detalhes para começar a consumir as mensagens.

E agora, quem poderá nos defender?

Alt Text

O Que faremos?

Vamos alterar a nossa gloriosa fábrica de bolo que já está persistindo os dados. Após fazer a operação será enviado uma mensagem para o broker Kafka com a entidade transformada em JSON.

Depois disso, por fim de simplicidade, vamos criar um consumidor para essas mensagens no mesmo projeto. Como vamos enviar as mensagens ao broker Kafka é possível consumir essas mensagens em algum outro projeto.

Para esse artigo é necessário saber um pouco sobre Kafka e ter o feito projeto da fábrica de bolo com banco de dados.

É hora da ação

Caso você não tenha o projeto da fábrica, você pode pegar lá no meu git. Tendo o código em mãos, vamos adicionar as dependências do Kafka e do Jackson (para mandar a mensagem em JSON). Isso é feito colocando o código abaixo na parte de dependências do arquivo pom.xml:

    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-jackson</artifactId>
    </dependency>
Enter fullscreen mode Exit fullscreen mode

Escrevendo a mensagem

Vamos começar escrevendo a mensagem. Lá no nosso arquivo BoloResource, vamos adicionar os emitters. O código para isso é:

  @Inject //1
  @Channel("add-bolo") //2
  Emitter<Bolo> addEmitter; //3

  @Inject
  @Channel("delete-bolo")
  Emitter<String> deleteEmitter;
Enter fullscreen mode Exit fullscreen mode
  1. @Inject é utilizado para fazer a inejção do nosso emitter
  2. O @Channel vem do import org.eclipse.microprofile.reactive.messaging.Channel ele é responsável por identificar para onde vamos enviar as mensagens e configurar tudo corretamente (vamos ver mais sobre a configuração lá pra frente)
  3. A classe Emitter vem do import org.eclipse.microprofile.reactive.messaging.Emitter e é ele quem envia a mensagem para o Kafka*

O Envio de mensagem é feito através do método Emitter#send. Vamos alterar BoloResource adicionando esse método logo antes do return. O código vai ficar assim:

  @POST
  @Transactional
  public List<Bolo> add(Bolo bolo) {
    bolo.id = null; //coisa feia, não façam isso em casa
    bolo.persist();
    addEmitter.send(bolo); //1
    return list();
  }

  @DELETE
  @Path("/{nome}")
  @Transactional
  public List<Bolo> delete(String nome) {
    Bolo.delete("nome", nome);
    deleteEmitter.send(nome); //2
    return Bolo.listAll();
  }
Enter fullscreen mode Exit fullscreen mode
  1. Estamos enviando para o kafka todo o bolo
  2. Estamos enviando para o kafka apenas o nome do bolo

Além disso, é necessário descrever e envio de mensagens no arquivo application.properties. Para tanto, basta adicionar as seguintes linhas nele.

mp.messaging.outgoing.add-bolo.connector=smallrye-kafka
mp.messaging.outgoing.add-bolo.topic=add-bolo
mp.messaging.outgoing.add-bolo.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

mp.messaging.outgoing.delete-bolo.connector=smallrye-kafka
mp.messaging.outgoing.delete-bolo.topic=delete-bolo
mp.messaging.outgoing.delete-bolo.value.serializer=org.apache.kafka.common.serialization.StringSerializer
Enter fullscreen mode Exit fullscreen mode

Todas as configurações tem o mesmo formato mp.messaging.outgoing.<nome do canal>.<nome da propriedade>. As propriedades que nós definimos servem para dizer que vamos nos comunicar com Kafka e que vamos serializar a mensagem mensagem como JSON usando o Jackson ou enviando uma String normal.
Qualquer propriedade dos produtores do kafka podem ser utilizados.

Lendo a mensagem

A classe abaixo pode estar tanto no mesmo projeto quanto em um outro projeto (desde que tenha as dependências corretas).

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class BoloKafkaConsumer {

  @Incoming("add-bolo-consumer")
  public void consumeAdd(String bolo) {
    System.out.println("Bolo adicionado: " + bolo);
  }

  @Incoming("delete-bolo-consumer")
  public void consumeDelete(String nome) {
    System.out.println("Nome do bolo deletado: " + nome);
  }
}
Enter fullscreen mode Exit fullscreen mode

Os dois métodos recebem o JSON do Bolo e escrevem na saída padrão. Notem que, apesar de termos enviado o bolo como objeto, estamos lendo como String. É possível configurar para receber um Bolo ou fazer qualquer transformação que seja interessante para nós.

Assim como no envio, também temos que configurar a leitura no application.properties. Isso pode ser feito através do código:

mp.messaging.incoming.add-bolo-consumer.connector=smallrye-kafka
mp.messaging.incoming.add-bolo-consumer.topic=add-bolo
mp.messaging.incoming.add-bolo-consumer.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

mp.messaging.incoming.delete-bolo-consumer.connector=smallrye-kafka
mp.messaging.incoming.delete-bolo-consumer.topic=delete-bolo
mp.messaging.incoming.delete-bolo-consumer.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
Enter fullscreen mode Exit fullscreen mode

Semelhante ao produtor, no consumidor as propriedades tem o formato mp.messaging.outgoing.<nome do canal>.<nome da propriedade> e todas as propriedades podem ser encontradas no site da confluent.

Preparar para rodar

Para rodar essa aplicação, nós vamos precisar do docker-compose. Para isso, basta usar o seguinte arquivo docker-compose.yml:

version: '2'
services:
  postgres:
    image: postgres:12-alpine
    ports: 
      - "5432:5432"
    environment:
      POSTGRES_USER: Sarah
      POSTGRES_PASSWORD: Connor
      POSTGRES_DB: skynet
  zookeeper:
    image: strimzi/kafka:0.19.0-kafka-2.5.0
    command: [
      "sh", "-c",
      "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs

  kafka:
    image: strimzi/kafka:0.19.0-kafka-2.5.0
    command: [
      "sh", "-c",
      "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
Enter fullscreen mode Exit fullscreen mode

Esse docker-compose.yml instância e configura um banco de dados, o kafka e o zookeeper (que é um requisito para o Kafka funcionar).

Hora do teste

Alt Text

Depois de levantar o docker-compose (docker-compose up no mesmo diretório que está o arquivo docker-compose.yml), levantar o projeto (mvn quarkus:dev no diretório base do projeto), entrar na url http://localhost:8080/swagger-ui e fazer a cadastro de um bolo.

Alt Text

Depois de apertar o botão do execute. É só olhar no terminal e vamos ver a saída da aplicação:

Alt Text

E é isso, agora é possível brincar enviando novos bolos, excluíndo, etc...

Considerações

Nossa fábrica de bolo que já salva as coisas no banco de dados está evoluindo e está se comunicando via mensagens <3

Nos próximos episódios, vamos garantir a atualização de banco de dados com flyway, adicionar rastreabilidade, resiliência e mais um monte de coisa massa =D

Outra coisa muito massa é que conseguimos enviar mensagens para o MQTT, AMQP ou JMS sem mexer no código. Só ajustando o pom.xml e o application.properties. Isso é mesmo é algo supimpa.

Ah, e o código desse post pode ser encontrado no github.

*Na realidade, ele a função do emitter é colocar a mensagem dentro de um fluxo reativo do Smallrye. O Smallrye possui um conector que acaba fazendo o processo de enviar a mensagem para o kafka. Porém, é possível utilizar esses fluxos sem enviar para o kafka ou para qualquer outro broker de mensageria.

Sentry image

Hands-on debugging session: instrument, monitor, and fix

Join Lazar for a hands-on session where you’ll build it, break it, debug it, and fix it. You’ll set up Sentry, track errors, use Session Replay and Tracing, and leverage some good ol’ AI to find and fix issues fast.

RSVP here →

Top comments (0)

A Workflow Copilot. Tailored to You.

Pieces.app image

Our desktop app, with its intelligent copilot, streamlines coding by generating snippets, extracting code from screenshots, and accelerating problem-solving.

Read the docs

👋 Kindness is contagious

Please leave a ❤️ or a friendly comment on this post if you found it helpful!

Okay