DEV Community

Sylvain Magnier
Sylvain Magnier

Posted on • Updated on

Easy time tracing in RxJava and Reactor

There's a lot of ways to track performance issues and unexpectedly time consuming methods, from the quick and simple time measurement from the start to the end of the method, to the more complete (and complex) CPU flamegraph, or even the classics as the thread dump, the cpu profiling, etc.

In many cases just working with the trick of timestamps and comparison of them will do the job for debugging.
But with the asynchronous paradigm, this may be a too naive way.

For instance, in a RxJava context, this kind of statements in a method:

Instant start = Instant.now();
Flowable<Item> promise = itemRepository.findAllById(idList);
log.info("{}", Duration.between(start, Instant.now());
return promise;
Enter fullscreen mode Exit fullscreen mode

won't really logs the time it took to fetch the repository.

Make old new again

Fortunately, RxJava come with a convenient operator to measure how much time an observable took since its subscription until its completion: timeInterval()
(Reactor has a similar operator that I'll detail later)

timeInterval() doc explains that it will measure the duration in milliseconds and wrap the previously emitted value in an object containing both the value and the measured delay. With the previous example, Flowable<Item> would become Flowable<Timed<Item>>.

Thus we can write our code like this:

return itemRepository.findAllById(idList)
                .timeInterval()
                .map(timed -> {
                    log.info("{}", timed.time());
                    timed.value();
                });
Enter fullscreen mode Exit fullscreen mode

This time, we effectively log the duration to fetch the items.

But there's still a subtility: doing so on a Flowable may fatally give you a log per element, not for the overall. What if I perform API calls for a bunch of item ?
That's it:

Flowable.fromIterable(idList)
            .map(apiClient::getItemInfo)
            .timeInterval()
            .map(timed -> {
                log.info("{}", timed.time());
                return timed.value();
            });
Enter fullscreen mode Exit fullscreen mode

We'll get a log per id as we have many API call and as much subscription of the .map(apiClient::getItemInfo).

You talked about Reactor too

Reactor provides its own operator called elapsed().
It pretty similar as its return a Flux<Tuple2<Long,T>, the Tuple2 having the methods getT1() (get left value, here the duration in milliseconds) and getT2() (get right value, the value emitted by the previous call).

Thus the above code would look like this:

Flux.fromIterable(idList)
            .map(apiClient::getItemInfo)
            .elapsed()
            .map(tuple -> {
                log.info("{}", tuple.getT1());
                return tuple.getT2();
            });
Enter fullscreen mode Exit fullscreen mode

Now imagine that you put some static LongSummaryStatistics somewhere and you're ready to quickly export min/max, average and counts regarding a complex set of operations like, for example, calling two APIs, combining their results and putting some info in a Redis cache.

Top comments (0)