In mid-January, I gave a talk at Kotlin.amsterdam based on my post Migrating from Imperative to Reactive (a Spring Boot application). Because it was a Kotlin meetup, I demoed Kotlin code, and I added a step: migrating the codebase to coroutines. During Q&A, somebody asked whether coroutines implemented backpressure. I admit I was not sure of the answer, so I did a bit of research.
This post provides information on backpressure in general and how RxJava (v3), Project Reactor and Kotlin's Coroutines handle it.
What is backpressure?
Back pressure (or backpressure) is a resistance or force opposing the desired flow of fluid through pipes, leading to friction loss and pressure drop.
The term back pressure is a misnomer, as pressure is a scalar quantity, so it has a magnitude but no direction.
-- Wikipedia
In software, backpressure has a slightly related but still different meaning: considering a fast data producer and a slow data consumer, backpressure is the mechanism that "pushes back" on the producer not to be overwhelmed by data.
Whether based on reactivestreams.org or Java's java.util.concurrent.Flow, Reactive Streams provides four building blocks:
- A Publisher that emits elements
- A Subscriber that reacts when elements are received
- A Subscription that binds a Publisher and a Subscriber
- And a Processor
Here's the class diagram:
The Subscription is at the root of backpressure via its request() method.
The specifications are pretty straightforward:
A Subscriber MUST signal demand via Subscription.request(long n) to receive onNext signals.
The intent of this rule is to establish that it is the responsibility of the Subscriber to decide when and how many elements it is able and willing to receive. To avoid signal reordering caused by reentrant Subscription methods, it is strongly RECOMMENDED for synchronous Subscriber implementations to invoke Subscription methods at the very end of any signal processing. It is RECOMMENDED that Subscribers request the upper limit of what they are able to process, as requesting only one element at a time results in an inherently inefficient "stop-and-wait" protocol.
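To make the rule concrete, here is a minimal sketch built only on the JDK's java.util.concurrent.Flow interfaces. RequestDemo, RangePublisher, and the batch size are hypothetical names introduced for illustration, and the publisher is deliberately synchronous and single-threaded, so it is not a spec-complete implementation. The subscriber requests items in batches and calls request() at the very end of onNext, as the rule recommends.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class RequestDemo {

    /** Hypothetical synchronous publisher: emits 1..count, but only as demand allows. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand;
                private boolean emitting;
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("n must be positive"));
                        return;
                    }
                    demand += n;
                    if (emitting) {
                        return; // re-entrant call from onNext: the loop below picks up the new demand
                    }
                    emitting = true;
                    while (demand > 0 && !done && next <= count) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (!done && next > count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Subscribes with a batch-requesting subscriber and returns the ordered log of signals. */
    static List<String> run(int count, int batch) {
        List<String> log = new ArrayList<>();
        new RangePublisher(count).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int received;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                log.add("request(" + batch + ")");
                s.request(batch);
            }

            @Override
            public void onNext(Integer item) {
                log.add("onNext(" + item + ")");
                // Signal new demand only once the current batch is fully processed
                if (++received % batch == 0) {
                    log.add("request(" + batch + ")");
                    subscription.request(batch);
                }
            }

            @Override
            public void onError(Throwable t) {
                log.add("onError");
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        run(5, 2).forEach(System.out::println);
    }
}
```

With five items and a batch of two, the log shows that the publisher never emits more than the subscriber has asked for: each request(2) is followed by at most two onNext signals.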
Reactive Streams' specifications are pretty solid. They also come with a Java-based TCK.
But it falls outside the specifications' scope to define how to manage items emitted by the producer that cannot be handled downstream. While the problem is pretty simple, different solutions are possible. Each Reactive framework provides some options, so let's see them in turn.
Backpressure in RxJava 3
RxJava v3 provides several base classes:
Class | Description
---|---
Flowable | A flow of 0..N items. It supports Reactive-Streams and backpressure.
Observable | A flow of 0..N items. It doesn't support backpressure.
Single | A flow of exactly:
Among these classes, Flowable is the only class that implements Reactive Streams - and backpressure. Yet, providing backpressure is not the only issue. As RxJava's wiki states:
Backpressure doesn’t make the problem of an overproducing Observable or an underconsuming Subscriber go away. It just moves the problem up the chain of operators to a point where it can be handled better.
To cope with that, RxJava offers two main strategies to handle "overproduced" items:
- Store items in a buffer. Note that if you set no upper bound to the buffer, it might cause OutOfMemoryError.
- Drop items
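The two strategies can be sketched without any Reactive library. The following is a simplified, single-threaded model, not RxJava's implementation: OverflowStrategies, Strategy, and deliver() are hypothetical names, and the model assumes the consumer grants a fixed initial demand, the producer emits a burst, and the consumer then catches up on whatever was retained. The LATEST variant is a flavor of dropping that keeps only the most recent item.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class OverflowStrategies {

    /** Hypothetical names that loosely mirror the buffer, drop, and latest behaviors. */
    enum Strategy { BUFFER, DROP, LATEST }

    /**
     * The producer emits 1..produced in a burst while the consumer has only asked
     * for initialDemand items. Returns what the consumer has seen once it catches up.
     */
    static List<Integer> deliver(int produced, int initialDemand, Strategy strategy, int capacity) {
        List<Integer> received = new ArrayList<>();
        Deque<Integer> pending = new ArrayDeque<>();
        int demand = initialDemand;

        for (int item = 1; item <= produced; item++) {
            if (demand > 0) {
                // Demand is available: deliver straight to the consumer
                demand--;
                received.add(item);
            } else if (strategy == Strategy.BUFFER) {
                // Bounded buffer: fail loudly instead of growing without limit
                if (pending.size() >= capacity) {
                    throw new IllegalStateException("Buffer overflow at item " + item);
                }
                pending.addLast(item);
            } else if (strategy == Strategy.LATEST) {
                // Keep only the most recent overproduced item
                pending.clear();
                pending.addLast(item);
            }
            // Strategy.DROP: the overproduced item is simply discarded
        }

        // The consumer catches up and receives whatever was retained
        received.addAll(pending);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(deliver(6, 2, Strategy.BUFFER, 10)); // [1, 2, 3, 4, 5, 6]
        System.out.println(deliver(6, 2, Strategy.DROP, 10));   // [1, 2]
        System.out.println(deliver(6, 2, Strategy.LATEST, 10)); // [1, 2, 6]
    }
}
```

Calling deliver(6, 2, Strategy.BUFFER, 2) throws instead, which is the bounded counterpart of the OutOfMemoryError risk of an unbounded buffer.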
The following diagram summarizes the different methods that implement those strategies:
Note that the onBackpressureLatest operator is similar to using onBackpressureBuffer(1):
Note that I took the above Marble diagrams from RxJava's wiki.
Compared to other frameworks, RxJava offers methods to send an overflow exception signal after sending all items. These allow the consumer to receive items and still be notified that the producer has dropped items.
Backpressure in Project Reactor
The strategies offered by Project Reactor are similar to RxJava's.
The APIs have some slight differences, though. For example, Project Reactor offers a convenient method to throw an exception if the producer overflows:
    var stream = Stream.generate(Math::random);

    // RxJava
    Flowable.fromStream(stream)       // 1
            .onBackpressureBuffer(0); // 2

    // Project Reactor
    Flux.fromStream(stream)           // 1
        .onBackpressureError();       // 2

1. Create the Reactive Stream
2. Throw if the producer overflows
Here's the Flux class diagram that highlights backpressure capabilities:
Compared to other frameworks, Project Reactor offers methods to set a TTL on buffered items, so that stale items are evicted before the buffer overflows.
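The idea behind a TTL on buffered items can be sketched in plain Java. This is an illustration of the concept under stated assumptions, not Project Reactor's implementation: TtlBuffer and its methods are hypothetical, timestamps are passed in explicitly to keep the example deterministic, and the choice to evict the oldest item when the buffer is still full is an assumption of this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class TtlBuffer<T> {

    /** A buffered item together with the time at which it was stored. */
    private static final class Entry<T> {
        final T item;
        final long storedAtMillis;

        Entry(T item, long storedAtMillis) {
            this.item = item;
            this.storedAtMillis = storedAtMillis;
        }
    }

    private final Deque<Entry<T>> entries = new ArrayDeque<>();
    private final List<T> evicted = new ArrayList<>();
    private final long ttlMillis;
    private final int maxSize;

    TtlBuffer(long ttlMillis, int maxSize) {
        this.ttlMillis = ttlMillis;
        this.maxSize = maxSize;
    }

    /** Stores an item, first evicting expired entries, then the oldest one if still full. */
    void offer(T item, long nowMillis) {
        evictExpired(nowMillis);
        if (entries.size() >= maxSize) {
            evicted.add(entries.removeFirst().item);
        }
        entries.addLast(new Entry<>(item, nowMillis));
    }

    /** Returns the oldest item that has not expired, or null if there is none. */
    T poll(long nowMillis) {
        evictExpired(nowMillis);
        Entry<T> entry = entries.pollFirst();
        return entry == null ? null : entry.item;
    }

    /** Items removed without ever reaching the consumer, in eviction order. */
    List<T> evicted() {
        return evicted;
    }

    private void evictExpired(long nowMillis) {
        // Entries are ordered by insertion time, so expired ones are always at the head
        while (!entries.isEmpty() && nowMillis - entries.peekFirst().storedAtMillis >= ttlMillis) {
            evicted.add(entries.removeFirst().item);
        }
    }

    public static void main(String[] args) {
        TtlBuffer<String> buffer = new TtlBuffer<>(100, 2);
        buffer.offer("a", 0);
        buffer.offer("b", 50);
        buffer.offer("c", 120); // "a" is older than 100 ms and is evicted here
        System.out.println(buffer.poll(160)); // "b" has expired too, so this prints c
        System.out.println(buffer.evicted()); // [a, b]
    }
}
```

A slow consumer therefore only ever sees items that are at most ttlMillis old, and stale items free up room before the size limit is reached.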
Backpressure in coroutines
Coroutines offer the same buffering and dropping capabilities. The base type in coroutines is Flow.
You can use it like this:

    flow {                                 // 1
        while (true) emit(Math.random())   // 2
    }.buffer(10)                           // 3

1. Create a Flow whose content is defined by the next block
2. Define the Flow content
3. Set the buffer's capacity to 10
Conclusion
All in all, RxJava, Project Reactor, and Kotlin coroutines all provide backpressure capabilities. All cope with a producer that is faster than its subscriber by offering two strategies: either buffer items or drop them.
Thanks to my friend Oleh Dokuka for his kind review.
To go further:
- Reactive Streams JVM specifications
- How (not) to use Reactive Streams in Java 9+
- RxJava Backpressure
Originally published at A Java Geek on March 14th 2021