DEV Community

Cover image for Recebendo Mensagens
Victor Osório
Victor Osório

Posted on • Edited on

1

Recebendo Mensagens

Ao iniciar o post Enviando Mensagens, falei que enviar é simples. Se você não achou simples, depois de ver a complexidade de receber uma mensagem verá que sim, é simples!

Parâmetros comuns

Para configurar um consumer são necessários mais parâmetros. Os parâmetros abaixo são similares ao do producer com excessão da troca de Serializador por Desserializador, visto que a mensagem deve ser agora transformada de byte[] para um objeto.

  • BOOTSTRAP_SERVERS: Conjunto de pares IP:PORTA para acessar o cluster. Por exemplo, kafka-1:9092,kafka-2:9092 irá acessar o cluster que contem os duas instâncias kafka-1:9092 e kafka-2:9092
  • KEY_DESERIALIZER_CLASS: É a classe que fará a desserialização da Chave. O que é e como é usada a chave, vamos explicar posteriormente. Deve ser uma implementação da interface org.apache.kafka.common.serialization.Deserializer.
  • VALUE_DESERIALIZER_CLASS: É a class que fará a desserialização do Valor. Não há mensagem sem um valor. Deve ser uma implementação da interface org.apache.kafka.common.serialization.Deserializer.

Dados os parametros abaixo, agora é preciso configurar o GROUP_ID. Antes de escolher um, é necessário pensar em como será a leitura dessa mensagem.

Read once

Um Producer, escreve uma mensagem por tópico. Ele não se preocupa como essa mensagem será lida. Já o Consumer deve se preocupar para que a mensagem não apenas uma vez, não menos que isso.

No Kafka, uma mensagem é recebida apenas uma vez por um Consumer de um GROUP_ID. Isso que dizer que, se dois processos diferentes tiverem consumers com o mesmo GROUP_ID, essa mensagem será consumida por apenas um.

Essa funcionalidade pode ajudar muito, mas pode também atrapalhar. O cuidado que deve ser tomado é:

  • Para cada tipo de consumer, sempre escolher um GROUP_ID por tipo
  • Tratar as mensagens como idempotentes

Instanciando um Consumer

Para instanciar um consumer, então, são necessários os 4 parâmetros: BOOTSTRAP_SERVERS, KEY_DESERIALIZER_CLASS, VALUE_DESERIALIZER_CLASS e GROUP_ID.

package io.vepo.kafka.articles;
import static java.util.Arrays.asList;
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class SimpleConsumer {
public static void main(String[] argv) throws Exception {
if (argv.length != 1) {
System.err.println("Please specify 1 parameters: topic-name");
System.exit(-1);
}
String topicName = argv[0];
System.out.println("Reading messages from Topic: " + topicName);
// Configure the Consumer
Properties configProperties = new Properties();
configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "SimpleConsumerGroup");
configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
try (Consumer<String, String> consumer = new KafkaConsumer<>(configProperties)) {
consumer.subscribe(asList(topicName));
mainLoop: while (true) {
for (ConsumerRecord<String, String> message : consumer.poll(Duration.ofSeconds(1))) {
System.out.println("Message received from partition=" + message.partition() + " with offset="
+ message.offset());
System.out.println("Key=" + message.key() + "\t value=" + message.value());
if ("exit".equals(message.value())) {
break mainLoop;
}
}
}
}
System.out.println("Exiting...");
}
}

Pronto, criado o Consumer, deve-se fazer o pull e processar as mensagens.

No exemplo acima, há uma condição de saida, mas isso é apenas para exemplo. Normalmente um programa que processas mensagens opera até ser interrompido.

Conclusão

Criar um Consumer para um Tópico Kafka é simples, não tão simples quanto um Producer. Deve-se atentar para quantos outros consumers vão concorrer pelas mensagens, se uma mensagem deve ser lida por um ou mais consumer. E ainda deve-se tratar a mensagem como se ela fosse repetida.

Ficou com dúvida... alguns pontos ainda serão tradados. Mas pode perguntar!

AWS Security LIVE!

Join us for AWS Security LIVE!

Discover the future of cloud security. Tune in live for trends, tips, and solutions from AWS and AWS Partners.

Learn More

Top comments (0)

Postmark Image

Speedy emails, satisfied customers

Are delayed transactional emails costing you user satisfaction? Postmark delivers your emails almost instantly, keeping your customers happy and connected.

Sign up

👋 Kindness is contagious

Please leave a ❤️ or a friendly comment on this post if you found it helpful!

Okay