Detecting and testing stalled streams - RxJava FAQ
Topiło Lake, Białowieża Forest |
- clients are still alive
- let clients know we are still alive
Flowable<String>
stream that produces some events. When there is no event for more than one second, we should send a placeholder "PING"
message. When the silence is even longer, there should be a "PING"
message every second. How can we implement such a requirement in RxJava? The most obvious, but incorrect solution is to merge original stream with pings:Flowable<String> events = //...
Flowable<String> pings = Flowable
.interval(1, SECONDS)
.map(x -> "PING");
Flowable<String> eventsWithPings = events.mergeWith(pings);
mergeWith()
operator is crucial: it takes genuine events
and combines them with a constant stream of pings. Surely, when no genuine events are presents, "PING"
messages will appear. Unfortunately they are entirely unrelated to original stream. This means we keep sending pings even when there are plenty of normal events. Moreover when the silence begins we do not send "PING"
precisely after one second. If you are OK with such mechanism, you may stop reading here.debounce()
operator
A more sophisticated approach requires discovering silence that lasts for more than 1 second. We can use timeout()
operator for that. Unfortunately it yields TimeoutException
and unsubscribes from upstream - way too aggressive behaviour. We just want to get some sort of notification. Turns out debounce()
operator can be used for that. Normally this operator postpones emission of new events just in case new events arrive, overriding the old ones. So if I say:Flowable<String> events = //...This means
Flowable<String> delayed = events.debounce(1, SECONDS);
delayed
stream will only emit an event if it was not followed by another event within 1 second. Technically delayed
may never emit anything if events
stream keeps producing events fast enough. We will use the delayed
stream to discover silence in the following way:Flowable<String> events = //...Keep in mind that there is no difference between
Flowable<String> delayed = events.debounce(1, SECONDS);
Flowable<String> pings = delayed.map(ev -> "PING");
Flowable<String> eventsWithPings = Flowable.merge(events, pings);
mergeWith()
and its static
merge()
counterpart. So we are getting somewhere. If the stream is busy, delayed
stream never receives any events, therefore no "PING"
messages are sent. However when original stream does not send any event for more than 1 second, delayed
receives the last seen event, ignores it and transforms into "PING"
. Clever, but broken. This implementation only sends one "PING"
after discovering stall, as opposed to sending periodic pings every second. Fairly easy to fix! Rather than transforming the last seen event into single "PING"
we can transform it into a sequence of periodic pings:Flowable<String> events = //...Can you see where the flaw is? Every time a bit of silence appears in the original stream, we start emitting pings every second. However we should stop doing so once some genuine events appear. We don't. Every stall in the upstream causes new infinite stream of pings to appear on the final merged stream. We must somehow tell the the
Flowable<String> delayed = events.debounce(1, SECONDS);
Flowable<String> pings = delayed
.flatMap(x -> Flowable
.interval(0, 1, SECONDS)
.map(e -> "PING")
);
Flowable<String> eventsWithPings = Flowable.merge(events, pings);
pings
stream that it should stop emitting pings because the original stream emitted genuine event. Guess what, there is takeUntil()
operator that does just that!Flowable<String> events = //...Take a moment to fully grasp the above code snippet.
Flowable<String> delayed = events.debounce(1, SECONDS);
Flowable<String> pings = delayed
.flatMap(x -> Flowable
.interval(0, 1, SECONDS)
.map(e -> "PING")
.takeUntil(events)
);
Flowable<String> eventsWithPings = Flowable.merge(events, pings);
delayed
stream emits an event every time nothing happens on the original stream for more than 1 second. pings
stream emits a sequence of "PING"
events every second for each event emitted from delayed
. However pings
stream is terminated the moment an event appears on the events
stream. You can even define all of this as a single expression:Flowable<String> events = //...
Flowable<String> eventsWithPings = events
.mergeWith(
events
.debounce(1, SECONDS)
.flatMap(x1 -> Flowable
.interval(0, 1, SECONDS)
.map(e -> "PING")
.takeUntil(events)
));
Testability
All right, we wrote all of this, but how are we suppose to test this triple-nested blob of event-driven code? How do we make sure that pings appear at the right moment and stop when silence is over? How to simulate various time-related scenarios? RxJava has many killer features but testing how time passes through is probably the biggest one. First of all let's make our pinging code a little bit more testable and generic:<T> Flowable<T> withPings(Flowable<T> events, Scheduler clock, T ping) {This utility method takes arbitrary stream of
return events
.mergeWith(
events
.debounce(1, SECONDS, clock)
.flatMap(x1 -> Flowable
.interval(0, 1, SECONDS, clock)
.map(e -> ping)
.takeUntil(events)
));
}
T
and adds pings in case the stream doesn't produce any events for a longer period of time. We use it like this in our test:PublishProcessor<String> events = PublishProcessor.create();Oh boy,
TestScheduler clock = new TestScheduler();
Flowable<String> eventsWithPings = withPings(events, clock, "PING");
PublishProcessor
, TestScheduler
? PublishProcessor
is an interesting class that is a subtype of Flowable
(so we can use it as an ordinary stream). On the other hand we can imperatively emit events using its onNext()
method:events.onNext("A");If someone listens to
events
stream, he will receive "A"
event straight away. And what's with this clock
thing? Every single operator in RxJava that deals with time in any way (e.g. debounce()
, interval()
, timeout()
, window()
) can take an optional Scheduler
argument. It serves as an external source of time. Special TestScheduler
is an artificial source of time which we have full control of. I.e. time stands still as long as we don't call advanceTimeBy()
explicitly:clock.advanceTimeBy(999, MILLISECONDS);999 milliseconds is not a coincidence. Pings start to appear precisely after 1 second so they should not be visible after 999 milliseconds. Now it's about time to reveal full test case:
@TestLooks like a wall of text but it's actually a complete testing scenario of our logic. It makes sure pings appear precisely after 1000 milliseconds, are repeated when silence is very long and quite down when genuine events appear. But the most important part: the test is 100% predictable and blazingly fast. No Awaitility, busy waiting, polling, intermittent test failures and slowness. Artificial clock that we have full control of makes sure all these combined streams work exactly as expected.
public void shouldAddPings() throws Exception {
PublishProcessor<String> events = PublishProcessor.create();
final TestScheduler clock = new TestScheduler();
final Flowable<String> eventsWithPings = withPings(events, clock, "PING");
final TestSubscriber<String> test = eventsWithPings.test();
events.onNext("A");
test.assertValues("A");
clock.advanceTimeBy(999, MILLISECONDS);
events.onNext("B");
test.assertValues("A", "B");
clock.advanceTimeBy(999, MILLISECONDS);
test.assertValues("A", "B");
clock.advanceTimeBy(1, MILLISECONDS);
test.assertValues("A", "B", "PING");
clock.advanceTimeBy(999, MILLISECONDS);
test.assertValues("A", "B", "PING");
events.onNext("C");
test.assertValues("A", "B", "PING", "C");
clock.advanceTimeBy(1000, MILLISECONDS);
test.assertValues("A", "B", "PING", "C", "PING");
clock.advanceTimeBy(999, MILLISECONDS);
test.assertValues("A", "B", "PING", "C", "PING");
clock.advanceTimeBy(1, MILLISECONDS);
test.assertValues("A", "B", "PING", "C", "PING", "PING");
clock.advanceTimeBy(999, MILLISECONDS);
test.assertValues("A", "B", "PING", "C", "PING", "PING");
events.onNext("D");
test.assertValues("A", "B", "PING", "C", "PING", "PING", "D");
clock.advanceTimeBy(999, MILLISECONDS);
events.onNext("E");
test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E");
clock.advanceTimeBy(999, MILLISECONDS);
test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E");
clock.advanceTimeBy(1, MILLISECONDS);
test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E", "PING");
clock.advanceTimeBy(3_000, MILLISECONDS);
test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E", "PING", "PING", "PING", "PING");
}
Tags: rxjava