RxJava, a story about delay and schedulers

Daniele Bottillo

RxJava is a beast. It has a steep learning curve, but it lets you do complicated things in very few lines of code. That comes with a price: you need to understand how the internals work, otherwise it will bite you very hard!

Recently I found a very strange behaviour with the delay operator.

Let’s assume we have an observable that we want to delay before it finishes:

Single.fromCallable {
   Thread.sleep(DELAY)
   "done!"
} 
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnSubscribe {
   view.showText("started!")
}
.doOnSuccess {
   view.showText(it)
}.subscribe()

Nothing strange here: the callable sleeps for the delay on the io scheduler and then returns the “done!” string, which is shown on the main thread.

But we can write a better version with the delay operator:

Single.fromCallable {
   "done!"
}
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.delay(DELAY, TimeUnit.MILLISECONDS)
.doOnSubscribe {
   view.showText("started!")
}.doOnSuccess {
   view.showText(it)
}
.subscribe()

Seems legit, we have removed that ugly Thread.sleep() inside fromCallable and now we use the delay operator, nice!

But if you run that code it will crash badly:

io.reactivex.exceptions.OnErrorNotImplementedException: 
Only the original thread that created a view hierarchy can touch its views.

Wait, what? It seems that it is trying to update the view from the wrong thread! I usually check the documentation in this case, but it’s not very helpful:

http://reactivex.io/documentation/operators/delay.html 

It doesn’t mention anything about threads. But you can always read the implementation!

If you follow the delay operator into the source code, you get to:

/**
* Delays the emission of the success signal from the current Single by the specified amount.
* An error signal will not be delayed.
*
* Scheduler:
* {@code delay} operates by default on the {@code computation} {@link Scheduler}.
* 
* @param time the amount of time the success signal should be delayed for
* @param unit the time unit
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Single<T> delay(long time, TimeUnit unit) {
    return delay(time, unit, Schedulers.computation(), false);
}

Wait, it says that by default it operates on the computation scheduler! That’s it: delay re-emits the success value on a computation thread, and because it comes after observeOn in the chain, doOnSuccess runs on that thread too. So we are trying to update the view from the wrong thread.
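To see the thread hop without an Android device, here is a minimal sketch that should run on a plain JVM with RxJava 2 on the classpath. The names fakeMain and threadAfterDefaultDelay are hypothetical: fakeMain is only a stand-in for AndroidSchedulers.mainThread(), built from a single named thread, and the 100 ms delay is an arbitrary value.

```kotlin
import io.reactivex.Scheduler
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for AndroidSchedulers.mainThread():
// a scheduler backed by one daemon thread named "fake-main".
val fakeMain: Scheduler = Schedulers.from(
    Executors.newSingleThreadExecutor { r ->
        Thread(r, "fake-main").apply { isDaemon = true }
    }
)

// Same operator order as the crashing snippet. Returns the name of the
// thread on which the success value arrives after delay.
fun threadAfterDefaultDelay(): String =
    Single.fromCallable { "done!" }
        .observeOn(fakeMain)
        .subscribeOn(Schedulers.io())
        .delay(100, TimeUnit.MILLISECONDS) // defaults to Schedulers.computation()
        .map { Thread.currentThread().name }
        .blockingGet()

fun main() {
    // Expect a computation thread name here, not "fake-main".
    println(threadAfterDefaultDelay())
}
```

The map call plays the role of doOnSuccess: whatever sits after delay in the chain runs on the scheduler delay used, regardless of the observeOn above it.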

Luckily there is another method that we can use:

public final Single<T> delay(final long time, final TimeUnit unit, final Scheduler scheduler)

Perfect, we can rewrite our code using this alternative delay operator:

Single.fromCallable {
   "done!"
}
.subscribeOn(Schedulers.io())
.delay(DELAY, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
.doOnSubscribe {
   view.showText("started!")
}
.doOnSuccess {
   view.showText(it)
}
.subscribe()

Notice that you don’t need observeOn anymore, because the delay operator now delivers the result on the scheduler we pass to it.
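As a quick check of that claim, the sketch below compares two options on a plain JVM with RxJava 2: passing the scheduler to delay, and, as an alternative not covered above, keeping the default delay and moving observeOn after it. The names fakeMain, threadWithSchedulerInDelay and threadWithObserveOnAfterDelay are hypothetical, and fakeMain only stands in for AndroidSchedulers.mainThread().

```kotlin
import io.reactivex.Scheduler
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for AndroidSchedulers.mainThread():
// a scheduler backed by one daemon thread named "fake-main".
val fakeMain: Scheduler = Schedulers.from(
    Executors.newSingleThreadExecutor { r ->
        Thread(r, "fake-main").apply { isDaemon = true }
    }
)

// Option 1: tell delay which scheduler to emit on.
fun threadWithSchedulerInDelay(): String =
    Single.fromCallable { "done!" }
        .subscribeOn(Schedulers.io())
        .delay(100, TimeUnit.MILLISECONDS, fakeMain)
        .map { Thread.currentThread().name }
        .blockingGet()

// Option 2: keep the default delay, then switch threads after it.
fun threadWithObserveOnAfterDelay(): String =
    Single.fromCallable { "done!" }
        .subscribeOn(Schedulers.io())
        .delay(100, TimeUnit.MILLISECONDS)
        .observeOn(fakeMain)
        .map { Thread.currentThread().name }
        .blockingGet()

fun main() {
    println(threadWithSchedulerInDelay())    // prints "fake-main"
    println(threadWithObserveOnAfterDelay()) // prints "fake-main"
}
```

Option 1 saves one thread hop, while option 2 keeps the timer on the computation scheduler and makes the final switch explicit. Which one reads better is a matter of taste.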

You can find a working example on my github: https://github.com/dbottillo/Blog/blob/rxjava_delay

Happy RxJava!

Discussion


Thanks. I started learning Rxjava. And this kind of small things makes my day. It's quite helpful for learning. Keep posting stuff like this.