flatMap() and the order of events - RxJava FAQ
flatMap() does not preserve the order of the original stream. Let's illustrate this using the GeoNames API example from the previous article:

public interface GeoNames {
    Flowable<Long> populationOf(String city);
}

By requesting the population of multiple cities using flatMap() we have no guarantee that the results will arrive in order:

Flowable<String> cities = Flowable.just("Warsaw", "Paris", "London", "Madrid");
cities
    .flatMap(geoNames::populationOf)
    .subscribe(response -> log.info("Population: {}", response));

The output is somewhat surprising:
17:09:49.838 | Rx-3 | --> GET .../searchJSON?q=London http/1.1
17:09:49.838 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
17:09:49.838 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1
17:09:49.838 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1
17:09:49.939 | Rx-4 | <-- 200 OK .../searchJSON?q=Madrid (98ms)
17:09:49.939 | Rx-3 | <-- 200 OK .../searchJSON?q=London (98ms)
17:09:49.956 | Rx-3 | Population: 7556900
17:09:49.958 | Rx-3 | Population: 3255944
17:09:51.099 | Rx-2 | <-- 200 OK .../searchJSON?q=Paris (1258ms)
17:09:51.100 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (1259ms)
17:09:51.100 | Rx-2 | Population: 2138551
17:09:51.100 | Rx-2 | Population: 1702139

After some time we receive the response for Madrid followed by London, and both are then delivered to the subscriber: 7556900 (population of London) and 3255944 (Madrid) come first. After a while Paris and Warsaw arrive as well. On the one hand, it's good that we can proceed with each population as soon as it arrives. This makes the system seem more responsive. But we lost something. The input stream was "Warsaw", "Paris", "London", "Madrid", whereas the resulting stream contains the populations of "London", "Madrid", "Paris", "Warsaw". How can we tell which number represents which city?

Obviously the following solution is plain wrong, yet it's not unheard of in real code bases:
Flowable<Long> populations = cities.flatMap(geoNames::populationOf);
cities
    .zipWith(populations, Pair::of)
    .subscribe(response -> log.info("Population: {}", response));

It compiles, it runs, it even produces some results. Unfortunately these results are entirely wrong:
17:20:03.778 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1
17:20:03.778 | Rx-3 | --> GET .../searchJSON?q=London http/1.1
17:20:03.778 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1
17:20:03.778 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
17:20:03.953 | Rx-4 | <-- 200 OK .../searchJSON?q=Madrid (172ms)
17:20:03.959 | Rx-2 | <-- 200 OK .../searchJSON?q=Paris (179ms)
17:20:03.975 | Rx-2 | Population: (Warsaw,2138551)
17:20:03.976 | Rx-2 | Population: (Paris,3255944)
17:20:03.988 | Rx-3 | <-- 200 OK .../searchJSON?q=London (207ms)
17:20:03.988 | Rx-3 | Population: (London,7556900)
17:20:04.080 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (299ms)
17:20:04.080 | Rx-1 | Population: (Madrid,1702139)

We combine cities with some random permutation of their populations. To make matters worse, I only managed to get wrong results after maybe a dozen attempts. For some reason this program was working on my machine most of the time. The worst kind of bug you can imagine.
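To see the mismatch without waiting for an unlucky run, here is a minimal sketch that replaces the HTTP calls with fixed, artificial delays. It assumes RxJava 2 (io.reactivex; under RxJava 3 the import would be io.reactivex.rxjava3.core.Flowable). The class name, the latency values and the plain string used instead of Pair are made up for illustration; only the population figures come from the logs above.

```java
import io.reactivex.Flowable;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original example.
class ZipAfterFlatMapSketch {

    // Population figures taken from the logs in this article.
    static final Map<String, Long> POPULATION = new HashMap<>();
    // Assumed response times in milliseconds, standing in for real HTTP latency.
    static final Map<String, Long> LATENCY_MS = new HashMap<>();

    static {
        POPULATION.put("Warsaw", 1702139L); LATENCY_MS.put("Warsaw", 400L);
        POPULATION.put("Paris", 2138551L);  LATENCY_MS.put("Paris", 300L);
        POPULATION.put("London", 7556900L); LATENCY_MS.put("London", 100L);
        POPULATION.put("Madrid", 3255944L); LATENCY_MS.put("Madrid", 200L);
    }

    // Stand-in for GeoNames.populationOf(): emits the value after the simulated latency.
    static Flowable<Long> populationOf(String city) {
        return Flowable.just(POPULATION.get(city))
                .delay(LATENCY_MS.get(city), TimeUnit.MILLISECONDS);
    }

    static Flowable<String> cities() {
        return Flowable.just("Warsaw", "Paris", "London", "Madrid");
    }

    // Populations in the order flatMap() emits them, i.e. the order the responses arrive.
    static List<Long> arrivalOrder() {
        return cities()
                .flatMap(ZipAfterFlatMapSketch::populationOf)
                .toList()
                .blockingGet();
    }

    // The broken approach: pair the n-th city with the n-th population to arrive.
    static List<String> zippedByPosition() {
        Flowable<Long> populations = cities().flatMap(ZipAfterFlatMapSketch::populationOf);
        return cities()
                .zipWith(populations, (city, pop) -> city + "=" + pop)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println("Arrival order: " + arrivalOrder());
        System.out.println("Zipped:        " + zippedByPosition());
    }
}
```

With these delays the populations should arrive as London, Madrid, Paris, Warsaw, so zipping by position attaches London's population to Warsaw, Madrid's to Paris, and so on. When the slowest response happens to belong to the first city and the fastest to the last, the pairs come out right by accident, which is why the bug can hide for many runs.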
The problem with flatMap() is that it loses the original request. Imagine an asynchronous system where you receive a response on some sort of queue but have no idea what the request was. An obvious solution is to attach some sort of correlation ID, or even the whole request, to the response. Unfortunately populationOf(String city) doesn't return the original request (city), only the response (population). It would be so much easier if it returned something like a CityWithPopulation value object or even Pair<String, Long>. So now imagine we are enhancing the original method by attaching the request (city):

Flowable<Pair<String, Long>> populationOfCity(String city) {
    Flowable<Long> population = geoNames.populationOf(city);
    return population.map(p -> Pair.of(city, p));
}

We can now take advantage of this method for a larger stream of cities:
cities
    .flatMap(this::populationOfCity)
    .subscribe(response -> log.info("Population: {}", response));

...or to avoid the extra helper method:
cities
    .flatMap(city -> geoNames
        .populationOf(city)
        .map(p -> Pair.of(city, p))
    )
    .subscribe(response -> log.info("Population: {}", response));

The response variable this time is a Pair<String, Long>, but you are encouraged to use a more expressive value object.

17:20:03.778 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1
17:20:03.778 | Rx-3 | --> GET .../searchJSON?q=London http/1.1
17:20:03.778 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1
17:20:03.778 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
17:20:03.953 | Rx-4 | <-- 200 OK .../searchJSON?q=Madrid (172ms)
17:20:03.959 | Rx-2 | <-- 200 OK .../searchJSON?q=Paris (179ms)
17:20:03.975 | Rx-2 | Population: (Paris,2138551)
17:20:03.976 | Rx-2 | Population: (Madrid,3255944)
17:20:03.988 | Rx-3 | <-- 200 OK .../searchJSON?q=London (207ms)
17:20:03.988 | Rx-3 | Population: (London,7556900)
17:20:04.080 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (299ms)
17:20:04.080 | Rx-1 | Population: (Warsaw,1702139)

I found flatMap() with a nested map() adding additional context to be the most effective way of dealing with out-of-order results. Admittedly it's not the most readable piece of reactive code, so make sure you hide this complexity behind some facade.

UPDATE
As noted by Dávid Karnok in his comment to this post, the map() operator inside flatMap() is such a common idiom that a specialized flatMap() overload exists. Apart from the standard transformation function (in our case String -> Flowable<Long>) it also takes a combiner bi-function (e.g. (String, Long) -> SomeType). The purpose of this function is to combine the input item with each output item generated by the transformation. This is precisely what we did with the nested map() (enriching the population with the name of the city it corresponds to), but much shorter:

Flowable<Pair<String, Long>> populations = cities
    .flatMap(city -> geoNames.populationOf(city), (city, pop) -> Pair.of(city, pop));

The second lambda expression ((city, pop) -> Pair.of(city, pop)) is executed for every downstream event produced by populationOf(). If you go to the extreme, you can use method references:

Flowable<Pair<String, Long>> populations = cities
    .flatMap(geoNames::populationOf, Pair::of);

Take a moment to study the last example; it's actually beautifully simple once you grasp it:
- for each city find its population pop
- for each population combine it with city by forming a Pair<String, Long>
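The two steps above can also be combined with the more expressive value object suggested earlier. What follows is only a sketch under a few assumptions: RxJava 2 (io.reactivex), a hypothetical CityWithPopulation class, and a fake populationOf() that uses made-up delays instead of calling GeoNames. The population figures are the ones from the logs.

```java
import io.reactivex.Flowable;

import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original example.
class FlatMapCombinerSketch {

    // Hypothetical value object replacing Pair<String, Long>.
    static final class CityWithPopulation {
        final String city;
        final long population;

        CityWithPopulation(String city, long population) {
            this.city = city;
            this.population = population;
        }

        @Override
        public String toString() {
            return city + ": " + population;
        }
    }

    // Emits a single value after an artificial delay (assumed latency, not a real HTTP call).
    static Flowable<Long> delayed(long value, long millis) {
        return Flowable.just(value).delay(millis, TimeUnit.MILLISECONDS);
    }

    // Stand-in for GeoNames.populationOf(); the slowest response belongs to the first city.
    static Flowable<Long> populationOf(String city) {
        switch (city) {
            case "Warsaw": return delayed(1702139L, 400);
            case "Paris":  return delayed(2138551L, 300);
            case "London": return delayed(7556900L, 100);
            case "Madrid": return delayed(3255944L, 200);
            default:
                return Flowable.error(new IllegalArgumentException("Unknown city: " + city));
        }
    }

    // flatMap(mapper, combiner): the combiner receives the original city together
    // with each population emitted for it, so the request is never lost.
    static List<CityWithPopulation> citiesWithPopulation() {
        Flowable<CityWithPopulation> populations = Flowable
                .just("Warsaw", "Paris", "London", "Madrid")
                .flatMap(FlatMapCombinerSketch::populationOf, CityWithPopulation::new);
        return populations.toList().blockingGet();
    }

    public static void main(String[] args) {
        citiesWithPopulation().forEach(System.out::println);
    }
}
```

The results still arrive in completion order rather than input order, but each one carries its own city, so the order no longer matters for telling them apart.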
PS: This was the 200th post in 9 years!