import java.util.concurrent.*;
import java.util.function.Consumer;
/**
 * Topic-based publish/subscribe bus that hands every delivery to an {@link Executor}.
 *
 * <p>Subscribers are kept per topic in a {@link ConcurrentHashMap} whose values are
 * {@link CopyOnWriteArraySet}s. Adding and removing a subscriber both happen inside the
 * map's atomic per-key compute functions, so a subscription cannot be lost to a concurrent
 * unsubscribe that discards the topic's now-empty set.
 */
public class AdvancedEventBus {
    private final ConcurrentHashMap<String, CopyOnWriteArraySet<Consumer<Object>>> topicSubscribers =
            new ConcurrentHashMap<>();
    private final Executor executor;

    /**
     * Creates a bus that runs subscriber callbacks on the given executor.
     *
     * @param executor executor used for every callback; {@code null} falls back to running
     *                 the callback directly on the publishing thread
     */
    public AdvancedEventBus(Executor executor) {
        this.executor = executor != null ? executor : Runnable::run;
    }

    /** Creates a bus that runs subscriber callbacks on {@link ForkJoinPool#commonPool()}. */
    public AdvancedEventBus() {
        this(ForkJoinPool.commonPool());
    }

    /**
     * Registers {@code subscriber} for {@code topic}. Registering the same subscriber twice
     * for a topic has no additional effect because subscribers are held in a set.
     *
     * @param topic      topic name
     * @param subscriber callback invoked with each message published to the topic
     * @throws NullPointerException if {@code topic} or {@code subscriber} is {@code null}
     */
    public void subscribe(String topic, Consumer<Object> subscriber) {
        java.util.Objects.requireNonNull(topic, "topic");
        // Reject null here: the set would accept it and every later publish would fail on it.
        java.util.Objects.requireNonNull(subscriber, "subscriber");
        // The add must happen inside compute(): adding after the lookup returns would let a
        // concurrent unsubscribe() drop the emptied set from the map in between, leaving this
        // subscriber in a set that publish() can no longer reach.
        topicSubscribers.compute(topic, (key, existing) -> {
            CopyOnWriteArraySet<Consumer<Object>> target =
                    existing != null ? existing : new CopyOnWriteArraySet<>();
            target.add(subscriber);
            return target;
        });
    }

    /**
     * Removes {@code subscriber} from {@code topic}; does nothing if the topic has no
     * subscribers or the subscriber is not registered. The topic entry is dropped once its
     * last subscriber is removed.
     *
     * @param topic      topic name
     * @param subscriber callback to remove
     */
    public void unsubscribe(String topic, Consumer<Object> subscriber) {
        topicSubscribers.computeIfPresent(topic, (key, current) -> {
            current.remove(subscriber);
            // Returning null removes the mapping, so empty topics do not accumulate.
            return current.isEmpty() ? null : current;
        });
    }

    /**
     * Submits one executor task per subscriber currently registered for {@code topic}; each
     * task passes {@code message} to that subscriber. An {@link Exception} thrown by a
     * subscriber is reported on {@code System.err} and is not propagated.
     *
     * @param topic   topic name
     * @param message payload handed to each subscriber as-is
     */
    public void publish(String topic, Object message) {
        CopyOnWriteArraySet<Consumer<Object>> subscribers = topicSubscribers.get(topic);
        if (subscribers == null) {
            return;
        }
        for (Consumer<Object> subscriber : subscribers) {
            executor.execute(() -> {
                try {
                    subscriber.accept(message);
                } catch (Exception e) {
                    // Print the exception itself (type + message): getMessage() alone is
                    // frequently null and hides what actually failed.
                    System.err.println("Error in subscriber for topic " + topic + ": " + e);
                }
            });
        }
    }
}
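ConcurrentHashMap: stores the topic-to-subscribers mapping so that it can be read and updated safely from multiple threads.
CopyOnWriteArraySet: holds each topic's subscribers, so the set can be iterated safely while subscribers are being added or removed.
Executor: in the advanced version, lets the caller control how (and on which threads) subscriber callbacks are executed.
Atomic operations: ConcurrentHashMap's compute-style methods (e.g. computeIfAbsent, computeIfPresent) perform their per-key map update atomically; work done on the returned value after such a call returns is not covered by that guarantee.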
Example usage:
/**
 * Small demo of the event bus API: registers two printing subscribers, publishes to two
 * topics, then removes one subscriber and publishes again.
 *
 * <p>NOTE(review): {@code EventBus} is not defined in this part of the file; how and in what
 * order it delivers messages should be confirmed against its definition.
 */
public class PubSubExample {
    public static void main(String[] args) {
        final EventBus bus = new EventBus();

        // Two subscribers that print whatever they are handed, each with its own label.
        final Consumer<Object> firstListener = payload -> {
            System.out.println("Subscriber1 received: " + payload);
        };
        final Consumer<Object> secondListener = payload -> {
            System.out.println("Subscriber2 received: " + payload);
        };

        // "news" gets both subscribers; "weather" gets only the first.
        bus.subscribe("news", firstListener);
        bus.subscribe("news", secondListener);
        bus.subscribe("weather", firstListener);

        // One message per topic.
        bus.publish("news", "Latest headlines...");
        bus.publish("weather", "Sunny today!");

        // Drop the second subscriber from "news", then publish to that topic once more.
        bus.unsubscribe("news", secondListener);
        bus.publish("news", "Breaking news!");
    }
}
Top comments (0)