Java 9 Flow API
Flow API is Java's official support for Reactive Streams Specification. It is a combination of both Iterator(Pull) and Observer(Push) patterns.
The Flow API is an inter operation specification and not an end-user API like RxJava.
If you are a developer(even an outdated one :D), you must have surely heard about reactive-streams, non blocking IO, asynchronous calls, etc.
So why Reactive?
Consider it as a secretary in a office -
- If they makes you wait for meeting their boss - that's normal way.
- If they asks you to go do something else and they contact you when their boss is free for meeting - that's the reactive way.
People generally tend to think any reactive programming as improvement in speed, but it is not true. Considering both scenarios above you have to spend equal time for meeting her boss (so no improvement in speed), but you can be more productive in the second approach.
Reactive programming increases scalability and stability but not speed.
Flow API
Flow API consists of 4 basic interfaces -
- Subscriber - The Subscriber subscribes to the Publisher for the callbacks.
public static interface Subscriber<T> { | |
public void onSubscribe(Subscription subscription); | |
public void onNext(T item); | |
public void onError(Throwable throwable); | |
public void onComplete(); | |
} |
- Publisher - The publisher publishes the stream of data items to the registered subscribers.
public static interface Publisher<T> { | |
public void subscribe(Subscriber<? super T> subscriber); | |
} |
- Subscription - Link between publisher and subscriber.
public static interface Subscription { | |
public void request(long n); | |
public void cancel(); | |
} |
- Processor - The processor sits between the Publisher and Subscriber, and transforms one stream to another.
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> { | |
} |
Now it's time for some hands-on with our new Flow API
Let's create a basic subscriber which asks for one data object, prints it and asks for one more.
package com.java.flow.api; | |
import java.util.concurrent.Flow; | |
public class MySubscriber<T> implements Flow.Subscriber<T> { | |
private Flow.Subscription subscription; | |
@Override | |
public void onSubscribe(Flow.Subscription subscription) { | |
this.subscription = subscription; | |
this.subscription.request(1); // Ask for initial one data object. | |
} | |
@Override | |
public void onNext(T item) { | |
System.out.println(item); // Print it. | |
subscription.request(1); // Ask for one more. | |
} | |
@Override | |
public void onError(Throwable throwable) { | |
throwable.printStackTrace(); | |
} | |
@Override | |
public void onComplete() { | |
System.out.println("DONE"); // Done with the stream of data. | |
} | |
} |
Now let's quickly use a publisher implementation provided by Java (SubmissionPublisher) to complete our session.
package com.java.flow.api; | |
import java.util.List; | |
import java.util.concurrent.SubmissionPublisher; | |
public class Main { | |
public static void main(String[] args) { | |
List<String> items = List.of("1", "2", "3", "4", "5", "6", "7", "8", "9"); | |
SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); | |
publisher.subscribe(new MySubscriber<>()); | |
items.forEach(s -> { | |
try { | |
Thread.sleep(1000); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
publisher.submit(s); | |
}); | |
publisher.close(); | |
} | |
} |
And here is our sweet output -
Here you can easily see how the submit(T item) function of publisher invokes onNext(T item) function of subscriber.
But it wouldn't have been possible without request(long n) function of the subscription.
The delay of 1 sec helps us see the reactive magic happening. Although it's no magic, just some code written by really smart people.
Top comments (1)
Ajitesh - This is a great write-up, especially the secretary-boss example.
Flux architecture is taking over the programming world like a storm.