Fixed-rate vs. fixed-delay - RxJava FAQ
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import static java.util.concurrent.TimeUnit.SECONDS;

ScheduledExecutorService scheduler =
        Executors.newScheduledThreadPool(10);

ScheduledExecutorService basically supports two types of periodic operations:

scheduler.scheduleAtFixedRate(() -> doStuff(), 2, 1, SECONDS);
scheduler.scheduleWithFixedDelay(() -> doStuff(), 2, 1, SECONDS);
scheduleAtFixedRate() will make sure doStuff() is invoked precisely every second, with an initial delay of two seconds. Of course garbage collection, context switching, etc. can still affect the precision. scheduleWithFixedDelay() is seemingly similar, however it starts counting only after doStuff() has finished, so the processing time adds to the interval between invocations. For example, if doStuff() runs for 200ms, fixed rate will wait only 800ms until the next run. scheduleWithFixedDelay(), on the other hand, always waits for the same amount of time (1 second in our case) between the end of one run and the start of the next.

Both behaviours are of course desirable under different circumstances. Just remember that when doStuff() is slower than 1 second, scheduleAtFixedRate() will not preserve the desired frequency. Even though our ScheduledExecutorService has 10 threads, doStuff() will never be invoked concurrently or overlap with the previous execution. Therefore, in this case, the rate will actually be lower than configured.

Scheduling in RxJava
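As a baseline for the RxJava versions, the ScheduledExecutorService behaviour described above can be checked with a small sketch. It is only an illustration: the class name RateVsDelayDemo, the 500ms period and the 200ms simulated doStuff() are assumptions chosen so the demo finishes quickly, not values from the original setup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class RateVsDelayDemo {

    static final long PERIOD_MS = 500; // assumed period, shortened from 1 second
    static final long WORK_MS = 200;   // simulated duration of doStuff()

    /** Returns the start time (ms since scheduling) of the first {@code runs} executions. */
    static List<Long> startTimes(boolean fixedRate, int runs) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        List<Long> starts = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(runs);
        long t0 = System.nanoTime();

        Runnable doStuff = () -> {
            if (done.getCount() == 0) {
                return; // enough samples collected, ignore late executions
            }
            starts.add(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0));
            try {
                Thread.sleep(WORK_MS); // pretend to do some blocking work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            done.countDown();
        };

        if (fixedRate) {
            scheduler.scheduleAtFixedRate(doStuff, 0, PERIOD_MS, TimeUnit.MILLISECONDS);
        } else {
            scheduler.scheduleWithFixedDelay(doStuff, 0, PERIOD_MS, TimeUnit.MILLISECONDS);
        }
        try {
            done.await(); // wait until the requested number of runs has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return new ArrayList<>(starts);
    }

    public static void main(String[] args) {
        System.out.println("fixed rate starts (ms):  " + startTimes(true, 3));
        System.out.println("fixed delay starts (ms): " + startTimes(false, 3));
    }
}
```

With these numbers the fixed-rate starts should be roughly 500ms apart, while the fixed-delay starts should be roughly 700ms apart (500ms pause plus 200ms of work). Exact values will vary with thread scheduling.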
Simulating scheduleAtFixedRate() with RxJava is very simple with the interval() operator. With a few caveats:

Flowable
        .interval(2, 1, SECONDS)
        .subscribe(i -> doStuff());

If doStuff() is slower than 1 second, bad things happen. First of all, we are using the Schedulers.computation() thread pool, the default one inherited from the interval() operator. It's a bad idea: this thread pool should only be used for CPU-intensive tasks and is shared across the whole of RxJava. A better idea is to use your own scheduler (or at least io()):

Flowable
        .interval(2, 1, SECONDS)
        .observeOn(Schedulers.io())
        .subscribe(i -> doStuff());
observeOn() switches from the computation() scheduler used by interval() to the io() scheduler. Because the callback passed to subscribe() is never invoked concurrently by design, doStuff() is never invoked concurrently, just like with scheduleAtFixedRate(). However, the interval() operator tries very hard to keep a constant frequency. This means that if doStuff() is slower than 1 second, after a while we should expect MissingBackpressureException... RxJava basically tells us that our subscriber is too slow, but interval() (by design) can't slow down.

If you tolerate (or even expect) overlapping concurrent executions of doStuff(), it's very simple to fix. First, you must wrap blocking doStuff() with a non-blocking Completable. Technically, Flowable, Single or Maybe would work just as well, but since doStuff() is void, Completable
sounds fine:

import io.reactivex.Completable;
import io.reactivex.schedulers.Schedulers;

Completable doStuffAsync() {
    return Completable
            .fromRunnable(this::doStuff)
            .subscribeOn(Schedulers.io())
            .doOnError(e -> log.error("Stuff failed", e))
            .onErrorComplete();
}

It's important to catch and swallow exceptions, otherwise a single error will cause the whole interval() to be interrupted. doOnError() allows logging, but it passes the exception through downstream. onErrorComplete(), on the other hand, simply swallows the exception. We can now simply run this operation at each interval event:

Flowable
        .interval(2, 1, SECONDS)
        .flatMapCompletable(i -> doStuffAsync())
        .subscribe();

If you don't subscribe(), the loop will never start - but that's RxJava 101. Notice that if doStuffAsync() takes more than one second to complete we will get overlapping, concurrent executions. There is nothing wrong with that, you just have to be aware of it. But what if what you really need is a fixed delay?

Fixed delays in RxJava
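To see the overlap that a fixed delay is meant to avoid, here is a rough sketch that counts how many copies of a slow task run at the same time when driven by interval() and flatMapCompletable(). The class name OverlapDemo, the helper maxConcurrency() and the timings are hypothetical; the task is a plain Thread.sleep() standing in for doStuff(), and RxJava 2 (io.reactivex) is assumed to be on the classpath.

```java
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class OverlapDemo {

    /** Runs {@code runs} interval ticks and returns the highest number of tasks seen running at once. */
    static int maxConcurrency(long periodMs, long workMs, int runs) {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();

        // Stand-in for doStuffAsync(): blocking work moved to the io() scheduler
        Completable slowStuff = Completable
                .fromRunnable(() -> {
                    int now = running.incrementAndGet();
                    max.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(workMs);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    running.decrementAndGet();
                })
                .subscribeOn(Schedulers.io());

        Flowable
                .interval(0, periodMs, TimeUnit.MILLISECONDS)
                .take(runs)                        // bounded only so the demo terminates
                .flatMapCompletable(i -> slowStuff)
                .blockingAwait();                  // block the caller until all tasks finish

        return max.get();
    }

    public static void main(String[] args) {
        // Work (350ms) is slower than the period (100ms), so executions pile up
        System.out.println("max concurrent tasks: " + maxConcurrency(100, 350, 5));
    }
}
```

When the work takes longer than the period, the reported maximum should be greater than one, which is exactly the overlapping behaviour described above.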
In some cases you need a fixed delay: tasks should not overlap and we should keep some breathing time between executions. No matter how slow the periodic task is, there should always be a constant pause between runs. The interval() operator is not suitable for implementing this requirement. However, it turns out the solution in RxJava is embarrassingly simple. Think about it: you need to sleep for a while, run some task and, when this task completes, repeat. Let me say it again:
- sleep for a while (have some sort of a timer())
- run some task and wait for it to complete()
- repeat()
That's it!
Flowable
        .timer(1, SECONDS)
        .flatMapCompletable(i -> doStuffAsync())
        .repeat()
        .subscribe();
The timer() operator emits a single event (0 of type Long) after a second. We use this event to trigger doStuffAsync(). When our stuff is done, the whole stream completes - but we would like to repeat! Well, the repeat() operator does just that: when it receives a completion notification from upstream, it resubscribes. Resubscription basically means: wait 1 more second, fire doStuffAsync() - and so on.
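The timer() plus repeat() approach can be tried out with a bounded sketch like the one below. The class name FixedDelayDemo, the helper startTimes() and the timings (300ms pause, 200ms of simulated work) are assumptions for illustration, and repeat(runs) is used instead of the unbounded repeat() only so that the demo terminates. RxJava 2 (io.reactivex) is assumed.

```java
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

public class FixedDelayDemo {

    /** Runs the task {@code runs} times with a fixed pause and returns each start time in ms. */
    static List<Long> startTimes(long delayMs, long workMs, int runs) {
        List<Long> starts = new CopyOnWriteArrayList<>();
        long t0 = System.nanoTime();

        // Stand-in for doStuffAsync(): records when it started, then blocks for workMs
        Completable stuff = Completable
                .fromRunnable(() -> {
                    starts.add(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0));
                    try {
                        Thread.sleep(workMs);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                })
                .subscribeOn(Schedulers.io());

        Flowable
                .timer(delayMs, TimeUnit.MILLISECONDS) // sleep for a while
                .flatMapCompletable(i -> stuff)        // run the task and wait for completion
                .repeat(runs)                          // resubscribe, i.e. sleep and run again
                .blockingAwait();

        return new ArrayList<>(starts);
    }

    public static void main(String[] args) {
        System.out.println("start times (ms): " + startTimes(300, 200, 3));
    }
}
```

Because each resubscription starts a fresh timer() only after the previous task has completed, consecutive start times should be at least the pause plus the work duration apart (about 500ms here), and executions never overlap.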