DEV Community

Dev Cookies

🚀 Building a Pub/Sub System in Java from Scratch (with Offset Management)

📌 Introduction

Many modern distributed systems rely on message brokers such as Kafka, RabbitMQ, and Pulsar. Brokers decouple producers from consumers and help with reliability, fault tolerance, and scalability.

But have you ever wondered what goes on under the hood? 🤔

In this blog, we’ll implement a mini Pub/Sub system in Java from scratch. We’ll cover:

  • Topics, Messages, Subscribers
  • Offset tracking per subscriber
  • Worker threads for consumption
  • Resetting subscriber offsets (like Kafka seek)

This won’t replace Kafka in production, but it’s a great way to understand core concepts.


🏗️ Low-Level Design (LLD)

Here’s the breakdown of components:

  • Message → simple wrapper holding message text.
  • Topic → holds a list of messages and its subscribers.
  • TopicSubscriber → binds a subscriber to a topic with its own offset.
  • ISubscriber → interface defining how a subscriber consumes messages.
  • Queue → manager that creates topics, subscribes consumers, publishes messages, and resets offsets.
  • TopicHandler → manages worker threads for subscribers of a topic.
  • SubscriberWorker → background thread that delivers messages sequentially to a subscriber.

👉 Think of it as:
Queue → Topic → TopicHandler → SubscriberWorker → ISubscriber


🧩 Code Walkthrough

1️⃣ Message Model

import lombok.AllArgsConstructor;
import lombok.Getter;

@AllArgsConstructor
@Getter
public class Message {
    private final String msg;
}

A simple immutable wrapper around the message text. The Lombok annotations generate the constructor and the getter.


2️⃣ Topic & Subscriber

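import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import lombok.AllArgsConstructor;
import lombok.Getter;

@AllArgsConstructor // only id and name become parameters; initialized final fields are skipped
@Getter
public class Topic {
    private final String id;
    private final String name;
    // Publishers append while worker threads read, so both lists must be thread-safe.
    private final List<Message> messages = new CopyOnWriteArrayList<>();
    private final List<TopicSubscriber> subscribers = new CopyOnWriteArrayList<>();

    public void addMessage(Message message) {
        messages.add(message);
    }

    public void addSubscriber(TopicSubscriber subscriber) {
        subscribers.add(subscriber);
    }
}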

Each Topic maintains:

  • A list of messages
  • A list of subscribers

3️⃣ TopicSubscriber (with Offset)

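import java.util.concurrent.atomic.AtomicInteger;

import lombok.Getter;
import lombok.NonNull;

@Getter
public class TopicSubscriber {
    // Index of the next message this subscriber should receive.
    private final AtomicInteger offset;
    private final ISubscriber subscriber;

    public TopicSubscriber(@NonNull final ISubscriber subscriber) {
        this.subscriber = subscriber;
        this.offset = new AtomicInteger(0);
    }
}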

Here, each subscriber has its own offset, so subscribers progress through the topic independently (much as each Kafka consumer group tracks its own position).
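To see why a per-subscriber offset matters, here is a minimal single-threaded sketch of one shared log read through independent cursors. The names `MiniLog` and `Cursor` are hypothetical and exist only for this illustration; they are not part of the classes above, and the sketch leaves out all threading.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical names: MiniLog and Cursor are illustrative, not part of the article's code.
class MiniLog {
    private final List<String> entries = new ArrayList<>();

    void append(String entry) {
        entries.add(entry);
    }

    // Each call returns a new cursor with its own offset, starting at 0.
    Cursor newCursor() {
        return new Cursor();
    }

    class Cursor {
        private final AtomicInteger offset = new AtomicInteger(0);

        // Returns the next unread entry, or null when this cursor has caught up.
        String poll() {
            int cur = offset.get();
            if (cur >= entries.size()) {
                return null;
            }
            offset.set(cur + 1);
            return entries.get(cur);
        }

        // Moves this cursor only; other cursors are unaffected.
        void seek(int newOffset) {
            offset.set(newOffset);
        }

        int position() {
            return offset.get();
        }
    }
}
```

Two cursors created from the same `MiniLog` each see every entry, and calling `seek(0)` on one of them replays the log for that cursor without moving the other.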


4️⃣ Subscriber Interface + SleepingSubscriber

public interface ISubscriber {
    String getId();
    void consume(Message message) throws InterruptedException;
}

public class SleepingSubscriber implements ISubscriber {
    private final String id;
    private final int sleepTimeInMillis;

    public SleepingSubscriber(String id, int sleepTimeInMillis) {
        this.id = id;
        this.sleepTimeInMillis = sleepTimeInMillis;
    }

    @Override
    public String getId() { return id; }

    @Override
    public void consume(Message message) throws InterruptedException {
        System.out.println("Subscriber: " + id + " started consuming: " + message.getMsg());
        Thread.sleep(sleepTimeInMillis);
        System.out.println("Subscriber: " + id + " done consuming: " + message.getMsg());
    }
}

This subscriber simulates real-world delays by sleeping while processing.


5️⃣ Queue Manager

public class Queue {
    private final Map<String, TopicHandler> topicHandlers = new HashMap<>();

    public Topic createTopic(String topicName) {
        Topic topic = new Topic(UUID.randomUUID().toString(), topicName);
        TopicHandler handler = new TopicHandler(topic);
        topicHandlers.put(topic.getId(), handler);
        return topic;
    }

    public void subscribe(final Topic topic, ISubscriber subscriber) {
        topic.addSubscriber(new TopicSubscriber(subscriber));
    }

    public void sendMessage(Message message, Topic topic) {
        topic.addMessage(message);
        new Thread(() -> topicHandlers.get(topic.getId()).publish()).start();
    }

    public void resetOffset(@NonNull final Topic topic,
                            @NonNull final ISubscriber subscriber,
                            @NonNull final Integer newOffset) {
        for (TopicSubscriber topicSubscriber : topic.getSubscribers()) {
            if (topicSubscriber.getSubscriber().equals(subscriber)) {
                topicSubscriber.getOffset().set(newOffset);
                System.out.println(subscriber.getId() + " offset reset to: " + newOffset);
                new Thread(() -> topicHandlers.get(topic.getId())
                        .startSubscriberWorker(topicSubscriber)).start();
                break;
            }
        }
    }
}

The Queue manages:

  • Topic creation
  • Subscriptions
  • Message publishing
  • Offset reset

6️⃣ TopicHandler

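import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TopicHandler {
    private final Topic topic;
    // publish() can run on several threads at once, so worker creation must be atomic per subscriber.
    private final Map<String, SubscriberWorker> subscriberWorkers = new ConcurrentHashMap<>();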

    public TopicHandler(Topic topic) {
        this.topic = topic;
    }

    public void publish() {
        for (TopicSubscriber subscriber : topic.getSubscribers()) {
            startSubscriberWorker(subscriber);
        }
    }

    public void startSubscriberWorker(TopicSubscriber topicSubscriber) {
        final String subscriberId = topicSubscriber.getSubscriber().getId();

        subscriberWorkers.computeIfAbsent(subscriberId, id -> {
            SubscriberWorker worker = new SubscriberWorker(topic, topicSubscriber);
            new Thread(worker).start();
            return worker;
        });

        subscriberWorkers.get(subscriberId).wakeUpIfNeeded();
    }
}

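The handler ensures each subscriber has exactly one worker thread, created lazily the first time a message is published or the subscriber's offset is reset.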
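The one-worker guarantee depends on the look-up-and-create step being atomic when several threads call `startSubscriberWorker` at once (each `sendMessage` spawns its own publishing thread). A minimal sketch of that idea, assuming a `ConcurrentHashMap` and a hypothetical `WorkerRegistry` class that stands in for the handler:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class for illustration; it is not part of the article's code.
class WorkerRegistry {
    private final Map<String, Object> workers = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();

    // Returns the worker for this id, creating it at most once even under contention.
    Object workerFor(String subscriberId) {
        return workers.computeIfAbsent(subscriberId, id -> {
            created.incrementAndGet();
            return new Object(); // stand-in for a real SubscriberWorker
        });
    }

    int createdCount() {
        return created.get();
    }

    // Calls workerFor("S1") from several threads at once and reports how many workers were created.
    static int race(int threads) {
        WorkerRegistry registry = new WorkerRegistry();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] callers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            callers[i] = new Thread(() -> {
                try {
                    start.await(); // line all callers up behind the same gate
                    registry.workerFor("S1");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            callers[i].start();
        }
        start.countDown();
        for (Thread t : callers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for callers", e);
            }
        }
        return registry.createdCount();
    }
}
```

`ConcurrentHashMap.computeIfAbsent` applies the mapping function at most once per key, so `WorkerRegistry.race(8)` creates a single worker. A plain `HashMap` gives no such guarantee under concurrent access.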


7️⃣ SubscriberWorker

@Getter
public class SubscriberWorker implements Runnable {
    private final Topic topic;
    private final TopicSubscriber topicSubscriber;

    public SubscriberWorker(@NonNull final Topic topic, @NonNull final TopicSubscriber topicSubscriber) {
        this.topic = topic;
        this.topicSubscriber = topicSubscriber;
    }

    @Override
    public void run() {
        synchronized (topicSubscriber) {
            while (true) {
                try {
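                    int curOffset = topicSubscriber.getOffset().get();

                    // Re-read the offset after every wake-up so a reset is noticed.
                    while (curOffset >= topic.getMessages().size()) {
                        topicSubscriber.wait();
                        curOffset = topicSubscriber.getOffset().get();
                    }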

                    Message message = topic.getMessages().get(curOffset);
                    topicSubscriber.getSubscriber().consume(message);

                    // Advance only if the offset is unchanged, so a reset made during consume() is kept.
                    topicSubscriber.getOffset().compareAndSet(curOffset, curOffset + 1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }

    public void wakeUpIfNeeded() {
        synchronized (topicSubscriber) {
            topicSubscriber.notify();
        }
    }
}

This worker:

  • Waits for new messages
  • Delivers messages sequentially
  • Updates the subscriber’s offset safely
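The "safely" part rests on `compareAndSet`: the worker advances the offset only if it still holds the value the worker read before consuming. If a reset lands in the meantime, the advance is skipped and the reset value survives. A small sketch of just that step, using a hypothetical `OffsetAdvance` helper:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration; the names are not from the article's code.
class OffsetAdvance {
    // Advances the offset by one only if nobody changed it since we read `seen`.
    // Returns true if the advance happened.
    static boolean advance(AtomicInteger offset, int seen) {
        return offset.compareAndSet(seen, seen + 1);
    }

    public static void main(String[] args) {
        AtomicInteger offset = new AtomicInteger(1);
        int seen = offset.get(); // worker reads offset 1 and starts consuming
        offset.set(0);           // a reset arrives while the message is being processed
        boolean advanced = advance(offset, seen);
        System.out.println(advanced + " " + offset.get()); // prints "false 0"
    }
}
```

With a plain `set(seen + 1)` the worker would overwrite the reset and the replay would be lost.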

🎬 Example Usage

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Queue queue = new Queue();

        Topic topic = queue.createTopic("orders");

        ISubscriber s1 = new SleepingSubscriber("S1", 1000);
        ISubscriber s2 = new SleepingSubscriber("S2", 2000);

        queue.subscribe(topic, s1);
        queue.subscribe(topic, s2);

        queue.sendMessage(new Message("order-1"), topic);
        queue.sendMessage(new Message("order-2"), topic);

        Thread.sleep(5000);

        // Reset offset of S1 back to 0 (reconsume all messages)
        queue.resetOffset(topic, s1, 0);
    }
}

✅ Output Snapshot

Subscriber: S1 started consuming: order-1
Subscriber: S2 started consuming: order-1
Subscriber: S1 done consuming: order-1
Subscriber: S1 started consuming: order-2
Subscriber: S2 done consuming: order-1
Subscriber: S2 started consuming: order-2
Subscriber: S1 done consuming: order-2
Subscriber: S2 done consuming: order-2
S1 offset reset to: 0
Subscriber: S1 started consuming: order-1
...

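Notice how resetting the offset replays messages from that position. The exact interleaving of the S1 and S2 lines can vary between runs, since the two workers run on separate threads.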


📌 Key Takeaways

  • Each subscriber maintains its own offset.
  • Subscribers consume asynchronously via worker threads.
  • Offset can be reset to replay messages.
  • This design mimics the core mechanics of Kafka, but is much simpler.

🚀 Next Steps (if you want to extend)

  • Add consumer groups (multiple subscribers share the same offset).
  • Add acknowledgements (at-least-once vs at-most-once).
  • Add persistence (write messages to disk).
  • Add multi-topic routing.
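As a starting point for the first item, here is one possible sketch of a group whose members share a single offset. The `SharedOffsetGroup` class is hypothetical, works on a fixed list of messages, and claims a message before it is processed, which makes it at-most-once. Real brokers such as Kafka divide partitions among group members rather than sharing one counter, so treat this as an illustration of the idea only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class for illustration; it is not part of the article's code.
class SharedOffsetGroup {
    private final List<String> messages;
    private final AtomicInteger sharedOffset = new AtomicInteger(0);

    SharedOffsetGroup(List<String> messages) {
        this.messages = new ArrayList<>(messages); // fixed snapshot for this sketch
    }

    // Any member of the group calls this; each message is handed to exactly one caller.
    // Returns null once the group has claimed every message.
    String claimNext() {
        int claimed = sharedOffset.getAndIncrement();
        return claimed < messages.size() ? messages.get(claimed) : null;
    }
}
```

Because `getAndIncrement` is atomic, two members calling `claimNext()` concurrently never receive the same message.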

👉 With this foundation, you have a clearer picture of the core ideas behind log-based messaging systems like Kafka: an append-only log, per-consumer offsets, and replay by seeking.