DEV Community

Eduardo Issao Ito
Eduardo Issao Ito

Posted on

4 1

Reading one specific message from a Kafka topic

This utility class can be used to read one specific message from a Kafka topic, given its partition number and offset.

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaPicker implements AutoCloseable {

    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);

    private String topicName;
    private KafkaConsumer<Object, Object> consumer;

    public KafkaPicker(String topicName, Properties properties) {
        this.topicName = topicName;
        consumer = new KafkaConsumer<>(properties);
    }

    public Object pick(Long offset, Integer partition) {
        TopicPartition topicPartition = new TopicPartition(topicName, partition);
        consumer.assign(Collections.singletonList(topicPartition));
        consumer.seek(topicPartition, offset);

        ConsumerRecords<Object, Object> records = consumer.poll(POLL_TIMEOUT);
        return records.iterator().next().value();
    }

    public void close() {
        consumer.close();
    }

}

In the example below, you will need a Properties object to connect to Kafka with at least three mandatory properties: broker. bootstrap.servers, key.deserializer and value.deserializer.

I this example, the spring.json.trusted.packages property is used by the Spring JsonDeserializer.

As we will be picking specific messages and not be reading in batch, it is recommended do set max.poll.records to 1, so unnecessary messages will not be read.

    Properties properties = new Properties();
    properties.put("bootstrap.servers", "kafka:9092");
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.springframework.kafka.support.serializer.JsonDeserializer");
    properties.put("spring.json.trusted.packages", "*");
    properties.put("max.poll.records", "1");

    try (KafkaPicker kafkaPicker = new KafkaPicker("mytopic", properties)) {
        System.out.println("msg = " + kafkaPicker.pick(4L, 0));
        System.out.println("msg = " + kafkaPicker.pick(0L, 0));
    }

Billboard image

The Next Generation Developer Platform

Coherence is the first Platform-as-a-Service you can control. Unlike "black-box" platforms that are opinionated about the infra you can deploy, Coherence is powered by CNC, the open-source IaC framework, which offers limitless customization.

Learn more

Top comments (2)

Collapse
 
art_ptushkin profile image
Artem Ptushkin

This helped me, thank you

Collapse
 
fcairib76 profile image
ali rezvani

Hi,can U give me this code or give github link?

AWS Security LIVE!

Tune in for AWS Security LIVE!

Join AWS Security LIVE! for expert insights and actionable tips to protect your organization and keep security teams prepared.

Learn More