DEV Community

Salad Lam

Project Reactor: About Fuseable interface ASYNC mode under different threads

One use case for running ASYNC mode under different threads is a subscriber that takes a long time to process each signal, or that calls a blocking method. Draining on a separate thread keeps that delay from holding up the upstream pipeline.

Example of ASYNC mode implementation under different threads.

Below is an example subscriber that logs the signals it receives. It implements both the Fuseable ASYNC mode under different threads (the draining work is done by a separate thread) and the traditional Reactive Streams request and onNext cycle, which is used when fusion is not granted. Project Reactor 3.6.3 is used.

package example;

import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.scheduler.Scheduler;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncNewThreadDisplaySubscriber<T> implements CoreSubscriber<T>, Runnable { // (1)

    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncNewThreadDisplaySubscriber.class);
    private final Scheduler scheduler;
    private Subscription subscription;

    private final AtomicInteger called = new AtomicInteger(0);

    public AsyncNewThreadDisplaySubscriber(Scheduler scheduler) {
        this.scheduler = scheduler; // (2)
    }

    @Override
    public void onSubscribe(Subscription s) {
        LOGGER.info("onSubscribe(): s={}", s);
        subscription = s;
        if (s instanceof Fuseable.QueueSubscription) {
            @SuppressWarnings("unchecked")
            Fuseable.QueueSubscription<T> qs = (Fuseable.QueueSubscription<T>) s;
            int mode = qs.requestFusion(Fuseable.ASYNC | Fuseable.THREAD_BARRIER); // (3)
            LOGGER.info("onSubscribe(): requestFusion()={}", mode);
            if (mode == Fuseable.ASYNC) {
                return;
            }
        }
        s.request(1L);
    }

    @Override
    public void onNext(T t) {
        if (Objects.nonNull(t)) {
            LOGGER.info("onNext(): t={}", t);
            subscription.request(1L);
        } else {
            LOGGER.info("onNext(): can poll");
            scheduler.schedule(this); // (4)
        }
    }

    @Override
    public void onError(Throwable t) {
        LOGGER.info("onError()", t);
    }

    @Override
    public void onComplete() {
        LOGGER.info("onComplete()");
    }

    // only one thread runs the drain loop at a time
    private void drain() {
        if (called.getAndIncrement() > 0) {
            LOGGER.info("drain(): not 1st thread detected");
            return;
        }

        @SuppressWarnings("unchecked")
        Fuseable.QueueSubscription<T> qs = (Fuseable.QueueSubscription<T>) subscription;
        T next;
        do {
            do {
                next = qs.poll();
                if (Objects.nonNull(next)) {
                    LOGGER.info("drain(): next={}", next);
                }
            } while (Objects.nonNull(next));
        } while (called.decrementAndGet() > 0);
    }

    @Override
    public void run() {
        LOGGER.info("run(): start");
        drain(); // (5)
        LOGGER.info("run(): end");
    }

}

The differences from the ASYNC mode implementation under the same thread are as follows.

(1): The subscriber class implements the Runnable interface so that it can be submitted to the scheduler.

(2): A reactor.core.scheduler.Scheduler instance is required to run the draining work.

(3): In the onSubscribe(Subscription) method, Fuseable.QueueSubscription.requestFusion(Fuseable.ASYNC | Fuseable.THREAD_BARRIER) is called to ask whether the requested mode is supported. Fuseable.THREAD_BARRIER indicates that poll() will be called from a separate thread. If the mode is supported, Fuseable.ASYNC is returned.

(4): When onNext(null) is received, values are available to be polled. The draining work is submitted to the scheduler instead of being run directly.

(5): The draining work runs on a separate thread. Only one thread can perform the draining work at a time, and the number of times the draining task is run equals the number of times onNext(null) is called.
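
The guard in drain() described in (5) can be tried without Reactor. Below is a minimal JDK-only sketch under stated assumptions: the class WipDrainSketch, its fields and the runDemo helper are hypothetical names introduced for illustration, a ConcurrentLinkedQueue stands in for the QueueSubscription, and a fixed thread pool stands in for the Scheduler.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only illustration of the drain() guard; this is not Reactor code.
final class WipDrainSketch {

    private final Queue<Integer> queue = new ConcurrentLinkedQueue<>(); // stands in for QueueSubscription
    private final AtomicInteger called = new AtomicInteger();           // same role as "called" in the subscriber
    private final AtomicInteger active = new AtomicInteger();           // threads currently inside the loop
    final AtomicInteger maxActive = new AtomicInteger();                // highest value "active" ever reached
    final AtomicInteger drained = new AtomicInteger();                  // items taken from the queue

    // Callable from any thread, but only one thread runs the loop at a time.
    void drain() {
        if (called.getAndIncrement() > 0) {
            return; // another thread owns the loop and will run one more pass for this call
        }
        do {
            maxActive.accumulateAndGet(active.incrementAndGet(), Math::max);
            while (queue.poll() != null) {
                drained.incrementAndGet();
            }
            active.decrementAndGet();
        } while (called.decrementAndGet() > 0);
    }

    // Offers each item, then schedules one drain task per item, like one onNext(null) per signal.
    static WipDrainSketch runDemo(int items) {
        WipDrainSketch sketch = new WipDrainSketch();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            for (int i = 1; i <= items; i++) {
                sketch.queue.offer(i);
                pool.execute(sketch::drain);
            }
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("drain tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return sketch;
    }

    public static void main(String[] args) {
        WipDrainSketch sketch = runDemo(1000);
        System.out.println("drained=" + sketch.drained.get() + ", maxActive=" + sketch.maxActive.get());
    }
}
```

In this sketch every offered item is counted once and maxActive stays at 1, because a thread that finds the counter already above zero returns and leaves the extra pass to the thread that owns the loop.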

Below is the code to test the subscriber. It is nearly the same as the same-thread version, except for how the subscriber instance is created.

package example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

public class FuseableAsyncNewThreadModeTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(FuseableAsyncNewThreadModeTest.class);

    public static void main(String[] args) throws InterruptedException {
        Sinks.Many<Integer> sink = Sinks.many().unicast().onBackpressureBuffer();
        Flux<Integer> flux = sink.asFlux()
                .publishOn(Schedulers.parallel());
        flux.subscribe(new AsyncNewThreadDisplaySubscriber<>(Schedulers.newParallel("subscribe")));

        LOGGER.info("main(): sleep");
        Thread.sleep(1000L);
        LOGGER.info("main(): wake up");
        sink.tryEmitNext(1);
        sink.tryEmitNext(2);
        sink.tryEmitNext(3);
        sink.tryEmitNext(4);
        sink.tryEmitNext(5);
        LOGGER.info("main(): sleep");
        Thread.sleep(1000L);
        LOGGER.info("main(): wake up");
        sink.tryEmitNext(6);
        sink.tryEmitNext(7);
        sink.tryEmitNext(8);
        sink.tryEmitNext(9);
        sink.tryEmitNext(10);
        LOGGER.info("main(): sleep");
        Thread.sleep(1000L);
        LOGGER.info("main(): wake up");
        sink.tryEmitComplete();
    }

}

The output is:

21:21:48.720 [main] INFO  e.AsyncNewThreadDisplaySubscriber -- onSubscribe(): s=reactor.core.publisher.FluxPublishOn$PublishOnSubscriber@11c9af63
21:21:48.720 [main] INFO  e.AsyncNewThreadDisplaySubscriber -- onSubscribe(): requestFusion()=2
21:21:48.720 [main] INFO  e.FuseableAsyncNewThreadModeTest -- main(): sleep
21:21:48.720 [parallel-1] INFO  e.AsyncNewThreadDisplaySubscriber -- onNext(): can poll
21:21:48.725 [subscribe-2] INFO  e.AsyncNewThreadDisplaySubscriber -- run(): start
21:21:48.725 [subscribe-2] INFO  e.AsyncNewThreadDisplaySubscriber -- run(): end
21:21:49.725 [main] INFO  e.FuseableAsyncNewThreadModeTest -- main(): wake up
21:21:49.725 [parallel-1] INFO  e.AsyncNewThreadDisplaySubscriber -- onNext(): can poll
21:21:49.728 [main] INFO  e.FuseableAsyncNewThreadModeTest -- main(): sleep
21:21:49.728 [subscribe-3] INFO  e.AsyncNewThreadDisplaySubscriber -- run(): start
21:21:49.728 [parallel-1] INFO  e.AsyncNewThreadDisplaySubscriber -- onNext(): can poll
21:21:49.728 [subscribe-3] INFO  e.AsyncNewThreadDisplaySubscriber -- drain(): next=1
21:21:49.728 [subscribe-3] INFO  e.AsyncNewThreadDisplaySubscriber -- drain(): next=2
21:21:49.728 [subscribe-3] INFO  e.AsyncNewThreadDisplaySubscriber -- drain(): next=3
21:21:49.728 [subscribe-3] INFO  e.AsyncNewThreadDisplaySubscriber -- drain(): next=4
21:21:49.728 [subscribe-3] INFO  e.AsyncNewThreadDisplaySubscriber -- drain(): next=5
21:21:49.728 [subscribe-3] INFO  e.AsyncNewThreadDisplaySubscriber -- run(): end
21:21:49.728 [parallel-1] INFO  e.AsyncNewThreadDisplaySubscriber -- onNext(): can poll
21:21:49.728 [subscribe-4] INFO  e.AsyncNewThreadDisplaySubscriber -- run(): start
21:21:49.728 [subscribe-4] INFO  e.AsyncNewThreadDisplaySubscriber -- run(): end
21:21:49.728 [subscribe-5] INFO  e.AsyncNewThreadDisplaySubscriber -- run(): start
21:21:49.728 [subscribe-5] INFO  e.AsyncNewThreadDisplaySubscriber -- run(): end
21:21:50.739 [main] INFO  e.FuseableAsyncNewThreadModeTest -- main(): wake up
21:21:50.740 [parallel-1] INFO  e.AsyncNewThreadDisplaySubscriber -- onNext(): can poll
21:21:50.740 [main] INFO  e.FuseableAsyncNewThreadModeTest -- main(): sleep
21:21:50.740 [parallel-1] INFO  e.AsyncNewThreadDisplaySubscriber -- onNext(): can poll
21:21:50.740 [subscribe-6] INFO  e.AsyncNewThreadDisplaySubscriber -- run(): start
21:21:50.740 [subscribe-6] INFO  e.AsyncNewThreadDisplaySubscriber -- drain(): next=6
21:21:50.740 [subscribe-6] INFO  e.AsyncNewThreadDisplaySubscriber -- drain(): next=7
21:21:50.740 [subscribe-6] INFO  e.AsyncNewThreadDisplaySubscriber -- drain(): next=8
21:21:50.740 [subscribe-7] INFO  e.AsyncNewThreadDisplaySubscriber -- run(): start
21:21:50.740 [subscribe-6] INFO  e.AsyncNewThreadDisplaySubscriber -- drain(): next=9
21:21:50.742 [subscribe-7] INFO  e.AsyncNewThreadDisplaySubscriber -- drain(): not 1st thread detected
21:21:50.742 [subscribe-7] INFO  e.AsyncNewThreadDisplaySubscriber -- run(): end
21:21:50.742 [subscribe-6] INFO  e.AsyncNewThreadDisplaySubscriber -- drain(): next=10
21:21:50.742 [subscribe-6] INFO  e.AsyncNewThreadDisplaySubscriber -- run(): end
21:21:51.754 [main] INFO  e.FuseableAsyncNewThreadModeTest -- main(): wake up
21:21:51.754 [parallel-1] INFO  e.AsyncNewThreadDisplaySubscriber -- onNext(): can poll
21:21:51.754 [parallel-1] INFO  e.AsyncNewThreadDisplaySubscriber -- onComplete()

From the output above, the draining work is run by the "subscribe-X" threads, while the onNext() method is run by the separate "parallel-1" thread. Also note that the message "drain(): not 1st thread detected" shows a second thread entering drain() and returning immediately, so only one thread runs the draining work at a time.
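
The value 2 logged for requestFusion() in the output is the integer value of Fuseable.ASYNC, which is why the subscriber returned from onSubscribe() without calling request(). The negotiation can be sketched without Reactor on the classpath. This is a simplified, hypothetical model: the class FusionModeSketch and its methods are invented for illustration, the constants are local copies of the reactor.core.Fuseable values, and the two upstream behaviours are assumptions modelled on a thread-boundary operator such as publishOn and on an operator that runs user callbacks inside poll().

```java
// Hypothetical JDK-only model of fusion mode negotiation; this is not Reactor code.
final class FusionModeSketch {

    // Local copies of the reactor.core.Fuseable constants.
    static final int NONE = 0;
    static final int SYNC = 1;
    static final int ASYNC = 2;
    static final int THREAD_BARRIER = 4;

    // Assumed behaviour of a thread-boundary upstream:
    // grant ASYNC whenever it is among the requested bits.
    static int boundaryUpstream(int requestedMode) {
        return (requestedMode & ASYNC) != 0 ? ASYNC : NONE;
    }

    // Assumed behaviour of an upstream that runs user callbacks inside poll():
    // refuse fusion when the consumer declares that poll() crosses a thread boundary.
    static int callbackUpstream(int requestedMode) {
        if ((requestedMode & THREAD_BARRIER) != 0) {
            return NONE;
        }
        return boundaryUpstream(requestedMode);
    }

    // Human-readable name for a granted mode.
    static String name(int mode) {
        switch (mode) {
            case NONE: return "NONE";
            case SYNC: return "SYNC";
            case ASYNC: return "ASYNC";
            default: return "UNKNOWN(" + mode + ")";
        }
    }

    public static void main(String[] args) {
        int requested = ASYNC | THREAD_BARRIER; // the same bits the subscriber passes to requestFusion()
        System.out.println("boundary upstream: " + name(boundaryUpstream(requested)));
        System.out.println("callback upstream: " + name(callbackUpstream(requested)));
        System.out.println("callback upstream, no barrier: " + name(callbackUpstream(ASYNC)));
    }
}
```

The point of the extra THREAD_BARRIER bit is that the consumer is honest about where poll() will run, so an upstream that cannot tolerate being polled from another thread can decline, and the subscriber then falls back to the request and onNext cycle.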
