DEV Community

Eduardo Issao Ito
Reading one specific message from a Kafka topic

This utility class can be used to read one specific message from a Kafka topic, given its partition number and offset.

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaPicker implements AutoCloseable {

    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);

    private String topicName;
    private KafkaConsumer<Object, Object> consumer;

    public KafkaPicker(String topicName, Properties properties) {
        this.topicName = topicName;
        consumer = new KafkaConsumer<>(properties);
    }

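    /**
     * Returns the value of the first record polled after seeking to the given
     * offset of the given partition, or null if the poll returns no records.
     */
    public Object pick(Long offset, Integer partition) {
        TopicPartition topicPartition = new TopicPartition(topicName, partition);
        // Assign the partition manually and position the consumer on the offset.
        consumer.assign(Collections.singletonList(topicPartition));
        consumer.seek(topicPartition, offset);

        ConsumerRecords<Object, Object> records = consumer.poll(POLL_TIMEOUT);
        // poll() can come back empty (for example, when the timeout expires first),
        // so avoid calling next() on an empty iterator.
        return records.isEmpty() ? null : records.iterator().next().value();
    }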

    @Override
    public void close() {
        consumer.close();
    }

}
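A single call to poll() can return no records if the timeout expires before the fetch completes, so a caller may want to retry for a bounded time instead of giving up after one attempt. The following is a minimal, Kafka-independent sketch of such a retry loop, not part of the original class. The class name `Retry`, the method `until` and its parameters are hypothetical; the supplier stands in for one seek-and-poll attempt.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helper: the names Retry and until are illustrative only.
public class Retry {

    /**
     * Calls attempt repeatedly until it yields a value or maxWait has elapsed.
     * At least one attempt is always made. There is no sleep between attempts:
     * the sketch assumes each attempt blocks for a while, as a poll with a timeout does.
     */
    public static <T> Optional<T> until(Supplier<Optional<T>> attempt, Duration maxWait) {
        long deadline = System.nanoTime() + maxWait.toNanos();
        do {
            Optional<T> result = attempt.get();
            if (result.isPresent()) {
                return result;
            }
            // Comparing the difference with zero stays correct if nanoTime wraps around.
        } while (System.nanoTime() - deadline < 0);
        return Optional.empty();
    }
}
```

Returning an Optional keeps "no record yet" distinct from a record that was found, and the deadline bounds the total wait regardless of how many attempts fit inside it.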

In the example below, you will need a Properties object to connect to Kafka, with at least three mandatory properties: bootstrap.servers (the broker address), key.deserializer and value.deserializer.

In this example, the spring.json.trusted.packages property is used by the Spring JsonDeserializer.

As we will be picking specific messages rather than reading in batches, it is recommended to set max.poll.records to 1, so that each poll returns only the requested message.

    Properties properties = new Properties();
    properties.put("bootstrap.servers", "kafka:9092");
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.springframework.kafka.support.serializer.JsonDeserializer");
    properties.put("spring.json.trusted.packages", "*");
    properties.put("max.poll.records", "1");

    try (KafkaPicker kafkaPicker = new KafkaPicker("mytopic", properties)) {
        System.out.println("msg = " + kafkaPicker.pick(4L, 0));
        System.out.println("msg = " + kafkaPicker.pick(0L, 0));
    }
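Because a missing mandatory property only surfaces when the consumer is constructed, it can help to check the Properties object first. The sketch below uses only java.util and is an illustration, not part of the original utility. The class `PickerProperties` and its methods are hypothetical names; the property keys are the ones listed above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: class and method names are illustrative only.
public class PickerProperties {

    // The three properties the article lists as mandatory.
    private static final String[] REQUIRED = {
        "bootstrap.servers", "key.deserializer", "value.deserializer"
    };

    /** Returns the names of the mandatory properties that are not set, in declaration order. */
    public static List<String> missing(Properties properties) {
        List<String> missing = new ArrayList<>();
        for (String name : REQUIRED) {
            // containsKey also accepts non-String values, such as a Class object.
            if (!properties.containsKey(name)) {
                missing.add(name);
            }
        }
        return missing;
    }

    /** Returns a copy with max.poll.records set to "1" unless the caller already chose a value. */
    public static Properties withSingleRecordPolls(Properties properties) {
        Properties copy = new Properties();
        copy.putAll(properties);
        copy.putIfAbsent("max.poll.records", "1");
        return copy;
    }
}
```

A caller could reject the configuration when `missing(properties)` is not empty, then pass the result of `withSingleRecordPolls(properties)` to the KafkaPicker constructor.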

Top comments (2)

Artem Ptushkin

This helped me, thank you

ali rezvani

Hi, can you give me this code or a GitHub link?