https://grokonez.com/java/java-9/java-9-flow-api-example-publisher-and-subscriber
Java 9 Flow API example – Publisher and Subscriber
In the previous post, we covered the basics of Reactive Streams and the components and behaviour of the Java 9 Flow API. In this tutorial, we look at an example that implements a Publisher and a Subscriber for reactive programming.
Related Articles:
- Java 9 Flow API – Reactive Streams
- Java 9 Flow API example – Processor
- Java 9 Flow SubmissionPublisher – A Concrete Publisher
I. Technologies
- Java 9
- Eclipse with Java 9 Support for Oxygen (4.7)
II. Project Overview
We will create a Publisher that is subscribed to by two Subscribers.
- The Publisher maintains a list of Subscriptions, one for each Subscriber.
- The Publisher uses a Subscription to push items to the corresponding Subscriber through the Subscriber::onNext() method.
- A Subscriber uses its Subscription to request items from the Publisher through the Subscription::request() method.
- The Publisher defines an Executor for multi-threading, so request() and onNext() work asynchronously, and items are delivered to each Subscriber through its Subscription asynchronously as well.
- After receiving all the items it requested, a Subscriber either requests more data or cancels its Subscription, chosen at random.
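The request()/onNext() handshake described above can be tried out in isolation. The following is a minimal sketch that uses the JDK's SubmissionPublisher in place of the custom Publisher built later in this tutorial. The class names HandshakeSketch and OneAtATimeSubscriber are made up for illustration, and pulling one item at a time is just one possible demand strategy.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class HandshakeSketch {

    /** Hypothetical subscriber: pulls one item at a time and records what it sees. */
    static class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);   // signal demand for the first item
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1);   // ask for the next item only after handling this one
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes 1..count and returns the items the subscriber received. */
    static List<Integer> run(int count) throws InterruptedException {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i);   // blocks if the subscriber's buffer is full
            }
        }                              // close() signals onComplete to the subscriber
        subscriber.done.await(5, TimeUnit.SECONDS);
        return subscriber.received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(5));
    }
}
```

The publisher never calls onNext() more often than the subscriber has asked for through request(), which is the back-pressure contract the project in this tutorial implements by hand.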
III. Practice
To understand how Publisher, Subscriber and Subscription behave and how to implement them, please visit: Java 9 Flow API – Reactive Streams
1. Create implementation of Publisher
package com.javasampleapproach.java9flow.pubsub;

import static java.lang.Thread.currentThread;
import static java.util.concurrent.Executors.newSingleThreadExecutor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class MyPublisher implements Publisher<Integer> {

    private static final String LOG_MESSAGE_FORMAT = "Publisher >> [%s] %s%n";

    // Shared pool: request() hands work to it, so onNext() runs asynchronously.
    final ExecutorService executor = Executors.newFixedThreadPool(4);
    private List<MySubscription> subscriptions = Collections.synchronizedList(new ArrayList<MySubscription>());

    // Completed once the last subscription has been cancelled.
    private final CompletableFuture<Void> terminated = new CompletableFuture<>();

    @Override
    public void subscribe(Subscriber<? super Integer> subscriber) {
        MySubscription subscription = new MySubscription(subscriber, executor);
        subscriptions.add(subscription);
        subscriber.onSubscribe(subscription);
    }

    public void waitUntilTerminated() throws InterruptedException {
        try {
            terminated.get();
        } catch (ExecutionException e) {
            System.out.println(e);
        }
    }

    private class MySubscription implements Subscription {

        private final ExecutorService executor;
        private Subscriber<? super Integer> subscriber;
        private final AtomicInteger value;
        private AtomicBoolean isCanceled;

        public MySubscription(Subscriber<? super Integer> subscriber, ExecutorService executor) {
            this.subscriber = subscriber;
            this.executor = executor;
            value = new AtomicInteger();
            isCanceled = new AtomicBoolean(false);
        }

        @Override
        public void request(long n) {
            if (isCanceled.get())
                return;

            // A non-positive demand is an error: Flow.Subscription requires n > 0.
            if (n <= 0)
                executor.execute(() -> subscriber.onError(new IllegalArgumentException()));
            else
                publishItems(n);
        }

        @Override
        public void cancel() {
            isCanceled.set(true);

            synchronized (subscriptions) {
                subscriptions.remove(this);
                if (subscriptions.size() == 0)
                    shutdown();
            }
        }

        private void publishItems(long n) {
            // long counter: an int would overflow for very large demands.
            for (long i = 0; i < n; i++) {
                executor.execute(() -> {
                    int v = value.incrementAndGet();
                    log("publish item: [" + v + "] ...");
                    subscriber.onNext(v);
                });
            }
        }

        private void shutdown() {
            log("Shut down executor...");
            executor.shutdown();

            // Signal termination from a helper thread, then let that thread exit too.
            ExecutorService notifier = newSingleThreadExecutor();
            notifier.submit(() -> {
                log("Shutdown complete.");
                terminated.complete(null);
            });
            notifier.shutdown();
        }
    }

    private void log(String message, Object... args) {
        String fullMessage = String.format(LOG_MESSAGE_FORMAT, currentThread().getName(), message);
        System.out.printf(fullMessage, args);
    }
}
2. Create implementation of Subscriber
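As a rough illustration of this step, here is a minimal sketch of a Subscriber that follows the behaviour outlined in the Project Overview: it requests a batch of items and, once the batch has arrived, either requests another batch or cancels. This is not the original article's listing. The class name SketchSubscriber, the batch size of 3, the injectable requestMore decision and the demo() helper with its synchronous stub Subscription are all assumptions made so the sketch can run on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BooleanSupplier;

/** Hypothetical subscriber sketch; names and batch size are assumptions. */
class SketchSubscriber implements Subscriber<Integer> {

    private static final int DEMAND = 3;         // assumed batch size

    private final String name;
    private final BooleanSupplier requestMore;   // true: request another batch, false: cancel
    private final List<Integer> received = new ArrayList<>();
    private Subscription subscription;
    private int remaining;                       // items still expected in the current batch

    SketchSubscriber(String name) {
        // Default decision is random, as described in the overview.
        this(name, () -> ThreadLocalRandom.current().nextBoolean());
    }

    SketchSubscriber(String name, BooleanSupplier requestMore) {
        this.name = name;
        this.requestMore = requestMore;
    }

    @Override
    public synchronized void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        remaining = DEMAND;                      // set before request(): delivery may start at once
        subscription.request(DEMAND);
    }

    // synchronized because a multi-threaded publisher may deliver items from several threads
    @Override
    public synchronized void onNext(Integer item) {
        received.add(item);
        System.out.printf("Subscriber %s received item: %d%n", name, item);
        if (--remaining > 0) {
            return;                              // batch not complete yet
        }
        if (requestMore.getAsBoolean()) {
            remaining = DEMAND;
            subscription.request(DEMAND);
        } else {
            System.out.printf("Subscriber %s cancels its subscription%n", name);
            subscription.cancel();
        }
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.printf("Subscriber %s got an error: %s%n", name, throwable);
    }

    @Override
    public void onComplete() {
        System.out.printf("Subscriber %s is complete%n", name);
    }

    synchronized List<Integer> received() {
        return List.copyOf(received);
    }

    /** Demo helper: drives the subscriber with a synchronous stub Subscription. */
    static List<Integer> demo(int batches) {
        int[] extraBatches = {batches - 1};      // request again this many times, then cancel
        SketchSubscriber subscriber = new SketchSubscriber("A", () -> extraBatches[0]-- > 0);
        subscriber.onSubscribe(new Subscription() {
            private int next = 1;
            private boolean cancelled;

            @Override
            public void request(long n) {
                for (long i = 0; i < n && !cancelled; i++) {
                    subscriber.onNext(next++);
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
        return subscriber.received();
    }

    public static void main(String[] args) {
        System.out.println(demo(2));
    }
}
```

With the publisher from step 1, an instance created with the one-argument constructor could be passed to MyPublisher.subscribe(); the stub Subscription in demo() only exists to make the sketch runnable without it.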
More at:
https://grokonez.com/java/java-9/java-9-flow-api-example-publisher-and-subscriber