DEV Community

Reading one specific message from a Kafka topic

Eduardo Issao Ito (adzubla) ・1 min read

This utility class can be used to read one specific message from a Kafka topic, given its partition number and offset.

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaPicker implements AutoCloseable {

    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);

    private String topicName;
    private KafkaConsumer<Object, Object> consumer;

    public KafkaPicker(String topicName, Properties properties) {
        this.topicName = topicName;
        consumer = new KafkaConsumer<>(properties);
    }

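    public Object pick(Long offset, Integer partition) {
        TopicPartition topicPartition = new TopicPartition(topicName, partition);
        // Assign the partition manually and move the consumer to the requested offset.
        consumer.assign(Collections.singletonList(topicPartition));
        consumer.seek(topicPartition, offset);

        ConsumerRecords<Object, Object> records = consumer.poll(POLL_TIMEOUT);
        // poll() can return nothing if the timeout expires before data arrives
        // or if the offset is beyond the end of the partition.
        if (records.isEmpty()) {
            throw new IllegalStateException("No record received from " + topicPartition
                    + " at offset " + offset + " within " + POLL_TIMEOUT);
        }
        return records.iterator().next().value();
    }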

    public void close() {
        consumer.close();
    }

}
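
A single poll with a one-second timeout may come back empty on a slow connection, and on a compacted topic the first record returned after a seek can have a higher offset than the one requested. The sketch below is one possible way to handle both cases, not part of the original class: it polls until a deadline and only accepts a record whose offset matches exactly. The names BoundedPick and pickExact are hypothetical, and the poll call is passed in as a function so the logic can be tried without a broker.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.ToLongFunction;

// Hypothetical helper, not part of the Kafka client API.
public final class BoundedPick {

    private BoundedPick() {
    }

    /**
     * Polls repeatedly until a record with exactly wantedOffset shows up
     * or maxWait has elapsed.
     *
     * With a KafkaConsumer<Object, Object> that has already been assigned
     * and positioned with seek(), a call might look like:
     *
     *   BoundedPick.<ConsumerRecord<Object, Object>>pickExact(
     *       consumer::poll, ConsumerRecord::offset,
     *       offset, POLL_TIMEOUT, Duration.ofSeconds(10));
     */
    public static <R> Optional<R> pickExact(
            Function<Duration, ? extends Iterable<R>> poll,
            ToLongFunction<R> offsetOf,
            long wantedOffset,
            Duration pollTimeout,
            Duration maxWait) {
        long deadline = System.nanoTime() + maxWait.toNanos();
        do {
            for (R record : poll.apply(pollTimeout)) {
                long offset = offsetOf.applyAsLong(record);
                if (offset == wantedOffset) {
                    return Optional.of(record);
                }
                if (offset > wantedOffset) {
                    // Already past the requested offset: treat it as unavailable.
                    return Optional.empty();
                }
            }
        } while (System.nanoTime() - deadline < 0);
        // Deadline reached without seeing the requested offset.
        return Optional.empty();
    }
}
```

Returning an Optional of the whole record, rather than its value, keeps a missing record distinguishable from a record whose value is null.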

In the example below, you will need a Properties object to connect to Kafka, with at least three mandatory properties: bootstrap.servers, key.deserializer and value.deserializer.

In this example, the spring.json.trusted.packages property is used by the Spring JsonDeserializer.

As we will be picking specific messages rather than reading in batches, it is recommended to set max.poll.records to 1, so that unnecessary messages are not read.

    Properties properties = new Properties();
    properties.put("bootstrap.servers", "kafka:9092");
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.springframework.kafka.support.serializer.JsonDeserializer");
    properties.put("spring.json.trusted.packages", "*");
    properties.put("max.poll.records", "1");

    try (KafkaPicker kafkaPicker = new KafkaPicker("mytopic", properties)) {
        System.out.println("msg = " + kafkaPicker.pick(4L, 0));
        System.out.println("msg = " + kafkaPicker.pick(0L, 0));
    }
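
Since the three mandatory properties are easy to forget, they can be checked before the consumer is created. The following is a minimal sketch under that assumption. The class PickerProperties and its methods are hypothetical names, and it uses only java.util, so it does not validate the values themselves.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of the Kafka client API.
public final class PickerProperties {

    // The three properties the article lists as mandatory.
    private static final List<String> MANDATORY = Arrays.asList(
            "bootstrap.servers", "key.deserializer", "value.deserializer");

    private PickerProperties() {
    }

    /** Returns the mandatory keys that are absent or blank, in a fixed order. */
    public static List<String> missingMandatory(Properties properties) {
        List<String> missing = new ArrayList<>();
        for (String key : MANDATORY) {
            // get() is used because a value may be a Class object rather than a String.
            Object value = properties.get(key);
            if (value == null || value.toString().trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Returns the same properties, or throws if a mandatory key is missing. */
    public static Properties requireMandatory(Properties properties) {
        List<String> missing = missingMandatory(properties);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Missing Kafka consumer properties: " + missing);
        }
        return properties;
    }
}
```

With this in place, the picker could be created as new KafkaPicker("mytopic", PickerProperties.requireMandatory(properties)), which fails with a readable message before any connection is attempted.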
