DEV Community

Cover image for Recebendo Mensagens
Victor Osório
Victor Osório

Posted on • Edited on

1

Recebendo Mensagens

Ao iniciar o post Enviando Mensagens, falei que enviar é simples. Se você não achou simples, depois de ver a complexidade de receber uma mensagem verá que sim, é simples!

Parâmetros comuns

Para configurar um consumer são necessários mais parâmetros. Os parâmetros abaixo são similares ao do producer com excessão da troca de Serializador por Desserializador, visto que a mensagem deve ser agora transformada de byte[] para um objeto.

  • BOOTSTRAP_SERVERS: Conjunto de pares IP:PORTA para acessar o cluster. Por exemplo, kafka-1:9092,kafka-2:9092 irá acessar o cluster que contem os duas instâncias kafka-1:9092 e kafka-2:9092
  • KEY_DESERIALIZER_CLASS: É a classe que fará a desserialização da Chave. O que é e como é usada a chave, vamos explicar posteriormente. Deve ser uma implementação da interface org.apache.kafka.common.serialization.Deserializer.
  • VALUE_DESERIALIZER_CLASS: É a class que fará a desserialização do Valor. Não há mensagem sem um valor. Deve ser uma implementação da interface org.apache.kafka.common.serialization.Deserializer.

Dados os parametros abaixo, agora é preciso configurar o GROUP_ID. Antes de escolher um, é necessário pensar em como será a leitura dessa mensagem.

Read once

Um Producer, escreve uma mensagem por tópico. Ele não se preocupa como essa mensagem será lida. Já o Consumer deve se preocupar para que a mensagem não apenas uma vez, não menos que isso.

No Kafka, uma mensagem é recebida apenas uma vez por um Consumer de um GROUP_ID. Isso que dizer que, se dois processos diferentes tiverem consumers com o mesmo GROUP_ID, essa mensagem será consumida por apenas um.

Essa funcionalidade pode ajudar muito, mas pode também atrapalhar. O cuidado que deve ser tomado é:

  • Para cada tipo de consumer, sempre escolher um GROUP_ID por tipo
  • Tratar as mensagens como idempotentes

Instanciando um Consumer

Para instanciar um consumer, então, são necessários os 4 parâmetros: BOOTSTRAP_SERVERS, KEY_DESERIALIZER_CLASS, VALUE_DESERIALIZER_CLASS e GROUP_ID.

package io.vepo.kafka.articles;
import static java.util.Arrays.asList;
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class SimpleConsumer {
public static void main(String[] argv) throws Exception {
if (argv.length != 1) {
System.err.println("Please specify 1 parameters: topic-name");
System.exit(-1);
}
String topicName = argv[0];
System.out.println("Reading messages from Topic: " + topicName);
// Configure the Consumer
Properties configProperties = new Properties();
configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "SimpleConsumerGroup");
configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
try (Consumer<String, String> consumer = new KafkaConsumer<>(configProperties)) {
consumer.subscribe(asList(topicName));
mainLoop: while (true) {
for (ConsumerRecord<String, String> message : consumer.poll(Duration.ofSeconds(1))) {
System.out.println("Message received from partition=" + message.partition() + " with offset="
+ message.offset());
System.out.println("Key=" + message.key() + "\t value=" + message.value());
if ("exit".equals(message.value())) {
break mainLoop;
}
}
}
}
System.out.println("Exiting...");
}
}

Pronto, criado o Consumer, deve-se fazer o pull e processar as mensagens.

No exemplo acima, há uma condição de saida, mas isso é apenas para exemplo. Normalmente um programa que processas mensagens opera até ser interrompido.

Conclusão

Criar um Consumer para um Tópico Kafka é simples, não tão simples quanto um Producer. Deve-se atentar para quantos outros consumers vão concorrer pelas mensagens, se uma mensagem deve ser lida por um ou mais consumer. E ainda deve-se tratar a mensagem como se ela fosse repetida.

Ficou com dúvida... alguns pontos ainda serão tradados. Mas pode perguntar!

Image of Timescale

🚀 pgai Vectorizer: SQLAlchemy and LiteLLM Make Vector Search Simple

We built pgai Vectorizer to simplify embedding management for AI applications—without needing a separate database or complex infrastructure. Since launch, developers have created over 3,000 vectorizers on Timescale Cloud, with many more self-hosted.

Read full post →

Top comments (0)

Billboard image

The Next Generation Developer Platform

Coherence is the first Platform-as-a-Service you can control. Unlike "black-box" platforms that are opinionated about the infra you can deploy, Coherence is powered by CNC, the open-source IaC framework, which offers limitless customization.

Learn more

AWS GenAI Live!

GenAI LIVE! is a dynamic live-streamed show exploring how AWS and our partners are helping organizations unlock real value with generative AI.

Tune in to the full event

DEV is partnering to bring live events to the community. Join us or dismiss this billboard if you're not interested. ❤️