Enviar mensagens no Kafka é bastante simples.
Instanciando um Producer
Para configurar, é preciso basicamente apenas 3 parametros:
-
BOOTSTRAP_SERVERS: Conjunto de pares IP:PORTA para acessar o cluster. Por exemplo,
kafka-1:9092,kafka-2:9092
irá acessar o cluster que contem os duas instânciaskafka-1:9092
ekafka-2:9092
-
KEY_SERIALIZER_CLASS: É a classe que fará a serialização da Chave. O que é e como é usada a chave, vamos explicar posteriormente. Deve ser uma implementação da interface
org.apache.kafka.common.serialization.Serializer
. -
VALUE_SERIALIZER_CLASS: É a class que fará a serialização do Valor. Não há mensagem sem um valor. Deve ser uma implementação da interface
org.apache.kafka.common.serialization.Serializer
.
package io.vepo.kafka.articles; | |
import java.util.Properties; | |
import java.util.Scanner; | |
import java.util.concurrent.Future; | |
import org.apache.kafka.clients.producer.KafkaProducer; | |
import org.apache.kafka.clients.producer.Producer; | |
import org.apache.kafka.clients.producer.ProducerConfig; | |
import org.apache.kafka.clients.producer.ProducerRecord; | |
import org.apache.kafka.clients.producer.RecordMetadata; | |
import org.apache.kafka.common.serialization.StringSerializer; | |
public class SimpleProducer { | |
public static void main(String[] argv) throws Exception { | |
if (argv.length != 1) { | |
System.err.println("Please specify 1 parameters: topic-name"); | |
System.exit(-1); | |
} | |
String topicName = argv[0]; | |
System.out.println("Enter message(type exit to quit)"); | |
// Configure the Producer | |
Properties configProperties = new Properties(); | |
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); | |
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); | |
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); | |
try (Scanner in = new Scanner(System.in); | |
Producer<String, String> producer = new KafkaProducer<>(configProperties)) { | |
String line; | |
do { | |
line = in.nextLine(); | |
ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName, line); | |
Future<RecordMetadata> results = producer.send(rec); | |
RecordMetadata metadata = results.get(); | |
System.out | |
.println("Message sent on partition=" + metadata.partition() + " with offset=" + metadata.offset()); | |
} while (!line.equals("exit")); | |
System.out.print("Exiting..."); | |
} | |
} | |
} |
Pronto, configurado o Producer, basta criar o objeto ProducerRecord
, que irá associar mensagem/chave/tópico, e enviar ela.
No código acima, eu criei um simples cliente que lê todas as mensagens do STDIN e envia para o tópico em questão.
Recomendações
- O envio é assíncrono, verifique se a mensagem foi realmente enviada e tente novamente caso negativo. Normalmente o cliente Kafka já faz algumas tentativas.
- Particione tipos de mensagens por chave, a ordem das entregas de mensagens só é garantida para mensagens com mesma chave.
Conclusão
O envio de mensagens Kafka é simples e descomplicado, mas mesmo assim deve ser feito de forma cuidadosa para se garantir a entrega. Tendo a entrega da mensagem garantida, é certo que teremos o recebimento dela garantido.
Mais informações sobre Chave serão dadas quando formos falar de ordenação.
Referências
Todos os códigos estão no repositório vepo/kafka-articles
Top comments (0)