DEV Community

pratik wayase
pratik wayase

Posted on

thread safe pub - sub

import java.util.concurrent.*;
import java.util.function.Consumer;

public class AdvancedEventBus {
    private final ConcurrentHashMap<String, CopyOnWriteArraySet<Consumer<Object>>> topicSubscribers = 
        new ConcurrentHashMap<>();
    private final Executor executor;

    public AdvancedEventBus(Executor executor) {
        this.executor = executor != null ? executor : Runnable::run;
    }

    public AdvancedEventBus() {
        this(ForkJoinPool.commonPool());
    }

    public void subscribe(String topic, Consumer<Object> subscriber) {
        topicSubscribers.computeIfAbsent(topic, k -> new CopyOnWriteArraySet<>()).add(subscriber);
    }

    public void unsubscribe(String topic, Consumer<Object> subscriber) {
        topicSubscribers.computeIfPresent(topic, (k, v) -> {
            v.remove(subscriber);
            return v.isEmpty() ? null : v;
        });
    }

    public void publish(String topic, Object message) {
        CopyOnWriteArraySet<Consumer<Object>> subscribers = topicSubscribers.get(topic);
        if (subscribers != null) {
            subscribers.forEach(subscriber -> 
                executor.execute(() -> {
                    try {
                        subscriber.accept(message);
                    } catch (Exception e) {
                        System.err.println("Error in subscriber for topic " + topic + ": " + e.getMessage());
                    }
                })
            );
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

ConcurrentHashMap: Used for storing topic-subscriber mappings to ensure thread-safe access

CopyOnWriteArraySet: Used for subscriber lists to allow safe iteration while subscribers can be added/removed

Executor: In the advanced version, allows control over how subscriber callbacks are executed

Atomic operations: Methods like computeIfAbsent ensure atomic updates to the data structures

example

public class PubSubExample {
    public static void main(String[] args) {
        EventBus eventBus = new EventBus();

        // Create subscribers
        Consumer<Object> subscriber1 = message -> 
            System.out.println("Subscriber1 received: " + message);
        Consumer<Object> subscriber2 = message -> 
            System.out.println("Subscriber2 received: " + message);

        // Subscribe to topics
        eventBus.subscribe("news", subscriber1);
        eventBus.subscribe("news", subscriber2);
        eventBus.subscribe("weather", subscriber1);

        // Publish messages
        eventBus.publish("news", "Latest headlines...");
        eventBus.publish("weather", "Sunny today!");

        // Unsubscribe
        eventBus.unsubscribe("news", subscriber2);
        eventBus.publish("news", "Breaking news!");
    }
}
Enter fullscreen mode Exit fullscreen mode

Top comments (0)