Nucu Labs

Apache Kafka: How to set offsets to a fixed time

Hello 👋,

This is a short article about setting offsets in Apache Kafka for a consumer group.

Normally, to reset offsets in Kafka you need to use the kafka-consumer-groups.sh tool, which means downloading the Kafka release archive and setting up Java. All of Kafka's tools depend on Java, which isn't that nice or developer friendly...

Sometimes, even after setting up Java correctly and getting the tools to run, they don't work 🤷🏻‍♂️. Either the tool version is incompatible with the Kafka version on the server, or the command executes successfully but doesn't seem to do anything...

Another way to set offsets for a consumer group is to use a Kafka client library and do it through code.

I already have Python installed, so all I need to do is install the confluent-kafka library:

pip install confluent-kafka

And then run the following code snippet to reset the consumer's offsets to a specific timestamp:



from confluent_kafka import Consumer, TopicPartition
import time

# Configuration
consumer_config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-consumer-group',
    'auto.offset.reset': 'earliest',
    'enable.partition.eof': True
}

topic = 'my-topic'
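# Target time; time.mktime() interprets it in this machine's local timezone.
# Alternatively, set timestamp_ms directly to a Unix time in milliseconds.
timestamp_ms = int(time.mktime(time.strptime("2025-04-01 12:00:00", "%Y-%m-%d %H:%M:%S")) * 1000)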

# Create consumer
consumer = Consumer(consumer_config)

# Get metadata to discover partitions
metadata = consumer.list_topics(topic)
partitions = [TopicPartition(topic, p.id, timestamp_ms) for p in metadata.topics[topic].partitions.values()]

# Lookup offsets for the timestamp
offsets = consumer.offsets_for_times(partitions, timeout=10.0)

# Assign partitions with correct offsets
consumer.assign(offsets)

# Start consuming from the looked-up offsets; the loop stops after the first message
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print("Error:", msg.error())
            continue

        print(f"{msg.topic()} [{msg.partition()}] at offset {msg.offset()}: {msg.value().decode('utf-8')}")
        break

except KeyboardInterrupt:
    pass
finally:
    consumer.close()
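
The time.mktime() call in the snippet interprets the date string in the local timezone of the machine running the script, so the same string can resolve to different offsets on different machines. Here is a minimal sketch of a timezone-explicit alternative using only the standard library. The helper name to_epoch_ms is hypothetical, and defaulting to UTC is an assumption:

```python
from datetime import datetime, timezone


def to_epoch_ms(text: str, tz: timezone = timezone.utc) -> int:
    """Parse 'YYYY-MM-DD HH:MM:SS' as a wall-clock time in `tz` and return Unix milliseconds."""
    parsed = datetime.strptime(text, "%Y-%m-%d %H:%M:%S").replace(tzinfo=tz)
    return int(parsed.timestamp() * 1000)


# Noon UTC on 2025-04-01, usable in place of the time.mktime() expression
timestamp_ms = to_epoch_ms("2025-04-01 12:00:00")
print(timestamp_ms)  # 1743508800000
```

Passing a different tz, for example timezone(timedelta(hours=2)), gives the same wall-clock time in that fixed offset instead.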
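
As far as I can tell, the snippet persists the new position only through the client's automatic offset commit, which covers just the partitions a message was actually read from. If the goal is to reset the whole consumer group, one option is to commit the looked-up offsets explicitly. This is a sketch under assumptions: reset_group_offsets is a hypothetical helper, the group has no other active members while it runs, and partitions with no message at or after the timestamp are left untouched:

```python
def reset_group_offsets(consumer, topic, timestamp_ms, timeout=10.0):
    """Commit, for each partition of `topic`, the first offset at or after `timestamp_ms`.

    `consumer` is expected to be a confluent_kafka.Consumer configured with the
    target group.id. Returns the list of TopicPartition objects that were committed.
    """
    # Imported lazily so the helper can be defined without the client installed
    from confluent_kafka import TopicPartition

    # Discover the topic's partitions and ask the broker for offsets by timestamp
    metadata = consumer.list_topics(topic, timeout=timeout)
    lookups = [
        TopicPartition(topic, partition_id, timestamp_ms)
        for partition_id in metadata.topics[topic].partitions
    ]
    resolved = consumer.offsets_for_times(lookups, timeout=timeout)

    # Partitions without a message at or after the timestamp come back with a
    # negative offset; skip them instead of committing a placeholder value
    to_commit = [tp for tp in resolved if tp.offset >= 0]
    if to_commit:
        # Synchronous commit so a failure raises here instead of being lost
        consumer.commit(offsets=to_commit, asynchronous=False)
    return to_commit
```

With the names from the snippet above, it could be called as reset_group_offsets(Consumer(consumer_config), topic, timestamp_ms), followed by close() on the consumer.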

Thanks for reading!