This utility class can be used to read one specific message from a Kafka topic, given its partition number and offset.
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaPicker implements AutoCloseable {
private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
private String topicName;
private KafkaConsumer<Object, Object> consumer;
public KafkaPicker(String topicName, Properties properties) {
this.topicName = topicName;
consumer = new KafkaConsumer<>(properties);
}
public Object pick(Long offset, Integer partition) {
TopicPartition topicPartition = new TopicPartition(topicName, partition);
consumer.assign(Collections.singletonList(topicPartition));
consumer.seek(topicPartition, offset);
ConsumerRecords<Object, Object> records = consumer.poll(POLL_TIMEOUT);
return records.iterator().next().value();
}
public void close() {
consumer.close();
}
}
In the example below, you will need a Properties
object to connect to Kafka with at least three mandatory properties: broker. bootstrap.servers
, key.deserializer
and value.deserializer
.
I this example, the spring.json.trusted.packages
property is used by the Spring JsonDeserializer.
As we will be picking specific messages and not be reading in batch, it is recommended do set max.poll.records
to 1, so unnecessary messages will not be read.
Properties properties = new Properties();
properties.put("bootstrap.servers", "kafka:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.springframework.kafka.support.serializer.JsonDeserializer");
properties.put("spring.json.trusted.packages", "*");
properties.put("max.poll.records", "1");
try (KafkaPicker kafkaPicker = new KafkaPicker("mytopic", properties)) {
System.out.println("msg = " + kafkaPicker.pick(4L, 0));
System.out.println("msg = " + kafkaPicker.pick(0L, 0));
}
Top comments (2)
This helped me, thank you
Hi,can U give me this code or give github link?