Salad Lam

Project Reactor: About Fuseable interface ASYNC mode under the same thread

Last time I introduced how to use the SYNC mode of the Fuseable interface to retrieve signals from a pipeline. The disadvantage of SYNC mode is that the signal sequence must already exist in the source (for example, a source created by Flux.just() or Flux.generate()). To retrieve signals that are generated at an indefinite time, ASYNC mode must be used. Below I introduce ASYNC mode in the case where the subscriber is notified by the thread of the upstream operator.

Example of ASYNC mode implementation under the same thread.

Below is an example subscriber that displays the signals it receives. It implements both Fuseable ASYNC mode (in which the signal is triggered by the thread of the parent operator) and the traditional Reactive Streams request and onNext cycle. Project Reactor 3.6.3 is used.

package example;

import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncDisplaySubscriber<T> implements CoreSubscriber<T> {

    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncDisplaySubscriber.class);
    private Subscription subscription;

    private final AtomicInteger called = new AtomicInteger(0);

    @Override
    public void onSubscribe(Subscription s) {
        LOGGER.info("onSubscribe(): s={}", s);
        subscription = s;
        if (s instanceof Fuseable.QueueSubscription) { // (1)
            @SuppressWarnings("unchecked")
            Fuseable.QueueSubscription<T> qs = (Fuseable.QueueSubscription<T>) s;
            int mode = qs.requestFusion(Fuseable.ASYNC); // (2)
            LOGGER.info("onSubscribe(): requestFusion()={}", mode);
            if (mode == Fuseable.ASYNC) { // (3)
                return;
            }
        }
        s.request(1L);
    }

    @Override
    public void onNext(T t) {
        if (Objects.nonNull(t)) {
            LOGGER.info("onNext(): t={}", t);
            subscription.request(1L);
        } else {
            LOGGER.info("onNext(): can poll"); // (4)
            drain();
        }
    }

    @Override
    public void onError(Throwable t) { // (10)
        LOGGER.info("onError()", t);
    }

    @Override
    public void onComplete() { // (10)
        LOGGER.info("onComplete()");
    }

    // only one thread can run the polling loop at a time
    private void drain() {
        LOGGER.info("drain(): start drain");
        if (called.getAndIncrement() > 0) { // (5)
            LOGGER.info("drain(): not 1st thread detected, end drain");
            return;
        }

        @SuppressWarnings("unchecked")
        Fuseable.QueueSubscription<T> qs = (Fuseable.QueueSubscription<T>) subscription;
        T next;
        do {
            do {
                next = qs.poll(); // (6)
                if (Objects.nonNull(next)) {
                    LOGGER.info("drain(): next={}", next); // (7)
                }
            } while (Objects.nonNull(next)); // (8)
        } while (called.decrementAndGet() > 0); // (9)
        LOGGER.info("drain(): end drain");
    }

}

(1): The Subscription from the upstream operator must implement Fuseable.QueueSubscription; otherwise fusion is not supported.

(2): Call Fuseable.QueueSubscription.requestFusion(Fuseable.ASYNC) to check whether ASYNC mode is supported.

(3): ASYNC mode is in effect only if the return value is Fuseable.ASYNC. In that case this subscriber returns without calling Subscription.request(long), and, as the output below shows, the signals still arrive. If Fuseable.NONE is returned, the subscriber gets data through the Reactive Streams request and onNext cycle.

(4): When onNext(null) is received, the values of the onNext signals can be collected by calling Fuseable.QueueSubscription.poll(). Note that only one thread may call poll() at a time.

(5): Acts as a lock that checks whether another thread is already running the loop, and also records the number of times this method has been called.

(6): Get the next value.

(7): A non-null return value from poll() means that an onNext signal carrying that value has been received.

(8): Null means that no signal is available. The loop should end and wait for the next onNext(null) event.

(9): The number of times the loop above runs equals the number of onNext(null) events received.

(10): The onComplete and onError signals are received through these methods.
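
The guard described in notes (5), (8) and (9) can be tried out without Reactor. The following is only a minimal sketch, not the code of any Reactor operator: a ConcurrentLinkedQueue stands in for Fuseable.QueueSubscription, and the class WipDrainSketch with all of its members is a hypothetical name made up for this illustration. Several threads store a value and then call drain(), just as several onNext(null) events could arrive close together, and the counter lets only one of them run the polling loop.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the subscriber above: a plain queue replaces
// Fuseable.QueueSubscription so that the sketch has no Reactor dependency.
class WipDrainSketch {

    private final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    private final Queue<Integer> received = new ConcurrentLinkedQueue<>();
    private final AtomicInteger called = new AtomicInteger(0);
    private final AtomicInteger insideLoop = new AtomicInteger(0);
    private volatile boolean overlapDetected = false;

    // Producer side: store the value, then signal that something may be
    // available (the role played by onNext(null) in ASYNC mode).
    void offerAndSignal(int value) {
        queue.offer(value);
        drain();
    }

    void drain() {
        if (called.getAndIncrement() > 0) {
            return; // another thread is draining and will loop once more for this call
        }
        do {
            if (insideLoop.incrementAndGet() > 1) {
                overlapDetected = true; // would mean two threads polling at once
            }
            Integer next;
            while ((next = queue.poll()) != null) {
                received.add(next);
            }
            insideLoop.decrementAndGet();
        } while (called.decrementAndGet() > 0);
    }

    int receivedCount() {
        return received.size();
    }

    boolean overlapDetected() {
        return overlapDetected;
    }

    // Starts `threads` producers that each emit `perThread` values, then waits for them.
    static WipDrainSketch run(int threads, int perThread) {
        WipDrainSketch sketch = new WipDrainSketch();
        Thread[] producers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            producers[i] = new Thread(() -> {
                for (int v = 0; v < perThread; v++) {
                    sketch.offerAndSignal(v);
                }
            });
            producers[i].start();
        }
        try {
            for (Thread producer : producers) {
                producer.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sketch;
    }

    public static void main(String[] args) {
        WipDrainSketch sketch = run(4, 1000);
        System.out.println("received=" + sketch.receivedCount()
                + " overlap=" + sketch.overlapDetected());
    }
}
```

A thread that finds the counter already above zero leaves immediately, but its increment makes the draining thread go around the outer loop once more, so a value stored just before that call is not missed.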

Below is the code to test the subscriber. The main() method simulates a blocking operation that generates signals.

package example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

public class FuseableAsyncModeTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(FuseableAsyncModeTest.class);

    public static void main(String[] args) throws InterruptedException {
        Sinks.Many<Integer> sink = Sinks.many().unicast().onBackpressureBuffer();
        Flux<Integer> flux = sink.asFlux()
                .publishOn(Schedulers.parallel());
        flux.subscribe(new AsyncDisplaySubscriber<>());

        LOGGER.info("main(): sleep");
        Thread.sleep(1000L);
        LOGGER.info("main(): wake up");
        sink.tryEmitNext(1);
        sink.tryEmitNext(2);
        sink.tryEmitNext(3);
        sink.tryEmitNext(4);
        sink.tryEmitNext(5);
        LOGGER.info("main(): sleep");
        Thread.sleep(1000L);
        LOGGER.info("main(): wake up");
        sink.tryEmitNext(6);
        sink.tryEmitNext(7);
        sink.tryEmitNext(8);
        sink.tryEmitNext(9);
        sink.tryEmitNext(10);
        LOGGER.info("main(): sleep");
        Thread.sleep(1000L);
        LOGGER.info("main(): wake up");
        sink.tryEmitComplete();
    }

}

The output is

13:30:16.031 [main] INFO  example.AsyncDisplaySubscriber -- onSubscribe(): s=reactor.core.publisher.FluxPublishOn$PublishOnSubscriber@11c9af63
13:30:16.032 [main] INFO  example.AsyncDisplaySubscriber -- onSubscribe(): requestFusion()=2
13:30:16.034 [main] INFO  example.FuseableAsyncModeTest -- main(): sleep
13:30:16.035 [parallel-1] INFO  example.AsyncDisplaySubscriber -- onNext(): can poll
13:30:16.035 [parallel-1] INFO  example.AsyncDisplaySubscriber -- drain(): start drain
13:30:16.035 [parallel-1] INFO  example.AsyncDisplaySubscriber -- drain(): end drain
13:30:17.035 [main] INFO  example.FuseableAsyncModeTest -- main(): wake up
13:30:17.036 [parallel-1] INFO  example.AsyncDisplaySubscriber -- onNext(): can poll
13:30:17.036 [parallel-1] INFO  example.AsyncDisplaySubscriber -- drain(): start drain
13:30:17.036 [parallel-1] INFO  example.AsyncDisplaySubscriber -- drain(): next=1
13:30:17.036 [parallel-1] INFO  example.AsyncDisplaySubscriber -- drain(): end drain
13:30:17.036 [main] INFO  example.FuseableAsyncModeTest -- main(): sleep
13:30:17.036 [parallel-1] INFO  example.AsyncDisplaySubscriber -- onNext(): can poll
13:30:17.036 [parallel-1] INFO  example.AsyncDisplaySubscriber -- drain(): start drain
13:30:17.036 [parallel-1] INFO  example.AsyncDisplaySubscriber -- drain(): next=2
13:30:17.036 [parallel-1] INFO  example.AsyncDisplaySubscriber -- drain(): next=3
13:30:17.036 [parallel-1] INFO  example.AsyncDisplaySubscriber -- drain(): next=4
13:30:17.036 [parallel-1] INFO  example.AsyncDisplaySubscriber -- drain(): next=5
13:30:17.036 [parallel-1] INFO  example.AsyncDisplaySubscriber -- drain(): end drain
13:30:17.036 [parallel-1] INFO  example.AsyncDisplaySubscriber -- onNext(): can poll
13:30:17.036 [parallel-1] INFO  example.AsyncDisplaySubscriber -- drain(): start drain
13:30:17.036 [parallel-1] INFO  example.AsyncDisplaySubscriber -- drain(): end drain
13:30:18.036 [main] INFO  example.FuseableAsyncModeTest -- main(): wake up
13:30:18.036 [main] INFO  example.FuseableAsyncModeTest -- main(): sleep
13:30:18.036 [parallel-1] INFO  example.AsyncDisplaySubscriber -- onNext(): can poll
13:30:18.037 [parallel-1] INFO  example.AsyncDisplaySubscriber -- drain(): start drain
13:30:18.037 [parallel-1] INFO  example.AsyncDisplaySubscriber -- drain(): next=6
13:30:18.037 [parallel-1] INFO  example.AsyncDisplaySubscriber -- drain(): next=7
13:30:18.037 [parallel-1] INFO  example.AsyncDisplaySubscriber -- drain(): next=8
13:30:18.037 [parallel-1] INFO  example.AsyncDisplaySubscriber -- drain(): next=9
13:30:18.037 [parallel-1] INFO  example.AsyncDisplaySubscriber -- drain(): next=10
13:30:18.037 [parallel-1] INFO  example.AsyncDisplaySubscriber -- drain(): end drain
13:30:18.037 [parallel-1] INFO  example.AsyncDisplaySubscriber -- onNext(): can poll
13:30:18.037 [parallel-1] INFO  example.AsyncDisplaySubscriber -- drain(): start drain
13:30:18.037 [parallel-1] INFO  example.AsyncDisplaySubscriber -- drain(): end drain
13:30:19.038 [main] INFO  example.FuseableAsyncModeTest -- main(): wake up
13:30:19.038 [parallel-1] INFO  example.AsyncDisplaySubscriber -- onNext(): can poll
13:30:19.038 [parallel-1] INFO  example.AsyncDisplaySubscriber -- drain(): start drain
13:30:19.038 [parallel-1] INFO  example.AsyncDisplaySubscriber -- drain(): end drain
13:30:19.038 [parallel-1] INFO  example.AsyncDisplaySubscriber -- onComplete()

From the output above, the "parallel-1" thread comes from the Schedulers.parallel() scheduler given to publishOn(), and it calls the methods of the subscriber in the following order repeatedly until the terminal signal is received.

  1. onNext(): can poll
  2. drain(): start drain
  3. (If a signal exists) drain(): next=N
  4. drain(): end drain
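
The cycle above, including the rounds that find nothing to poll, can be reproduced in a small sketch without Reactor. It is a simplification under stated assumptions, not how publishOn() is implemented: the class SignalCycleSketch and its methods are hypothetical, a single-thread executor plays the role of "parallel-1", and every emitted value schedules exactly one onNext(null)-style signal (the real operator may merge signals, as the output above suggests).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical miniature of the hand-off: the producer thread only stores a
// value and signals; one worker thread runs every "can poll" cycle.
class SignalCycleSketch {

    private final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final List<String> log = Collections.synchronizedList(new ArrayList<>());

    // Producer side: store the value, then schedule one signal on the worker.
    void emit(int value) {
        queue.offer(value);
        worker.execute(this::onNextNull);
    }

    // Worker side: one cycle of "can poll", poll until null, "end drain".
    private void onNextNull() {
        log.add("can poll");
        Integer next;
        while ((next = queue.poll()) != null) {
            log.add("next=" + next);
        }
        log.add("end drain");
    }

    // Lets the already scheduled signals run, stops the worker, returns the log.
    List<String> finish() {
        worker.shutdown();
        try {
            if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker did not stop in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    // Emits the values 1..count from the calling thread and returns the recorded cycles.
    static List<String> run(int count) {
        SignalCycleSketch sketch = new SignalCycleSketch();
        for (int i = 1; i <= count; i++) {
            sketch.emit(i);
        }
        return sketch.finish();
    }

    public static void main(String[] args) {
        run(5).forEach(System.out::println);
    }
}
```

Depending on timing, an early cycle may collect several values at once and leave later cycles with nothing to poll, which corresponds to the "start drain" and "end drain" pairs without any next=N line in the output above.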
