Ao iniciar o post Enviando Mensagens, falei que enviar é simples. Se você não achou simples, depois de ver a complexidade de receber uma mensagem verá que sim, é simples!
Parâmetros comuns
Para configurar um consumer são necessários mais parâmetros. Os parâmetros abaixo são similares ao do producer com excessão da troca de Serializador por Desserializador, visto que a mensagem deve ser agora transformada de byte[] para um objeto.
-
BOOTSTRAP_SERVERS: Conjunto de pares IP:PORTA para acessar o cluster. Por exemplo,
kafka-1:9092,kafka-2:9092
irá acessar o cluster que contem os duas instânciaskafka-1:9092
ekafka-2:9092
-
KEY_DESERIALIZER_CLASS: É a classe que fará a desserialização da Chave. O que é e como é usada a chave, vamos explicar posteriormente. Deve ser uma implementação da interface
org.apache.kafka.common.serialization.Deserializer
. -
VALUE_DESERIALIZER_CLASS: É a class que fará a desserialização do Valor. Não há mensagem sem um valor. Deve ser uma implementação da interface
org.apache.kafka.common.serialization.Deserializer
.
Dados os parametros abaixo, agora é preciso configurar o GROUP_ID. Antes de escolher um, é necessário pensar em como será a leitura dessa mensagem.
Read once
Um Producer, escreve uma mensagem por tópico. Ele não se preocupa como essa mensagem será lida. Já o Consumer deve se preocupar para que a mensagem não apenas uma vez, não menos que isso.
No Kafka, uma mensagem é recebida apenas uma vez por um Consumer de um GROUP_ID. Isso que dizer que, se dois processos diferentes tiverem consumers com o mesmo GROUP_ID, essa mensagem será consumida por apenas um.
Essa funcionalidade pode ajudar muito, mas pode também atrapalhar. O cuidado que deve ser tomado é:
- Para cada tipo de consumer, sempre escolher um GROUP_ID por tipo
- Tratar as mensagens como idempotentes
Instanciando um Consumer
Para instanciar um consumer, então, são necessários os 4 parâmetros: BOOTSTRAP_SERVERS, KEY_DESERIALIZER_CLASS, VALUE_DESERIALIZER_CLASS e GROUP_ID.
package io.vepo.kafka.articles; | |
import static java.util.Arrays.asList; | |
import java.time.Duration; | |
import java.util.Properties; | |
import org.apache.kafka.clients.consumer.Consumer; | |
import org.apache.kafka.clients.consumer.ConsumerConfig; | |
import org.apache.kafka.clients.consumer.ConsumerRecord; | |
import org.apache.kafka.clients.consumer.KafkaConsumer; | |
import org.apache.kafka.common.serialization.StringDeserializer; | |
public class SimpleConsumer { | |
public static void main(String[] argv) throws Exception { | |
if (argv.length != 1) { | |
System.err.println("Please specify 1 parameters: topic-name"); | |
System.exit(-1); | |
} | |
String topicName = argv[0]; | |
System.out.println("Reading messages from Topic: " + topicName); | |
// Configure the Consumer | |
Properties configProperties = new Properties(); | |
configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); | |
configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "SimpleConsumerGroup"); | |
configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); | |
configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); | |
try (Consumer<String, String> consumer = new KafkaConsumer<>(configProperties)) { | |
consumer.subscribe(asList(topicName)); | |
mainLoop: while (true) { | |
for (ConsumerRecord<String, String> message : consumer.poll(Duration.ofSeconds(1))) { | |
System.out.println("Message received from partition=" + message.partition() + " with offset=" | |
+ message.offset()); | |
System.out.println("Key=" + message.key() + "\t value=" + message.value()); | |
if ("exit".equals(message.value())) { | |
break mainLoop; | |
} | |
} | |
} | |
} | |
System.out.println("Exiting..."); | |
} | |
} |
Pronto, criado o Consumer, deve-se fazer o pull e processar as mensagens.
No exemplo acima, há uma condição de saida, mas isso é apenas para exemplo. Normalmente um programa que processas mensagens opera até ser interrompido.
Conclusão
Criar um Consumer para um Tópico Kafka é simples, não tão simples quanto um Producer. Deve-se atentar para quantos outros consumers vão concorrer pelas mensagens, se uma mensagem deve ser lida por um ou mais consumer. E ainda deve-se tratar a mensagem como se ela fosse repetida.
Ficou com dúvida... alguns pontos ainda serão tradados. Mas pode perguntar!
Top comments (0)