whoami
These slides are rough notes I made during several 3-day Reactor and WebFlux workshops. They are far from a complete tutorial. Many code snippets were written by hand.
//REST, DB, file system: 100ms, 200ms, 2s
String s = blockingOp();
Future
String s1 = future1.get(); //poor practice
String s2 = future1.get(1, SECONDS);
boolean done = future1.isDone();
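A bounded `get()` usually needs a fallback for the timeout case. The sketch below is one way to wrap it; the class `FutureTimeoutDemo`, the helper `getOrFallback` and the fallback value are hypothetical names introduced only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FutureTimeoutDemo {

    // Hypothetical helper: waits at most timeoutMillis, then gives up and returns the fallback.
    static String getOrFallback(Future<String> future, long timeoutMillis, String fallback) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // nobody is waiting for the result any more
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return fallback;
        } catch (ExecutionException e) {
            return fallback; // the asynchronous operation itself failed
        }
    }

    public static void main(String[] args) {
        Future<String> done = CompletableFuture.completedFuture("hello");
        Future<String> never = new CompletableFuture<>(); // never completes
        System.out.println(getOrFallback(done, 100, "fallback"));
        System.out.println(getOrFallback(never, 100, "fallback"));
    }
}
```

The calling thread is still blocked for up to the timeout, which is why this style remains a stopgap rather than a non-blocking solution.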
Future<String> future1 = asyncOp1();
Future<BigDecimal> future2 = asyncOp2();
while (true) {
    if (future1.isDone()) {
        log.info(future1.get());
        break;
    } else if (future2.isDone()) {
        log.info(future2.get());
        break;
    }
    TimeUnit.MILLISECONDS.sleep(50);
}
CompletableFuture
CompletableFuture<String> future1 = asyncOp1();
CompletableFuture<BigDecimal> future2 = asyncOp2();
log.info("Done");
CompletableFuture
CompletableFuture<BlockingResult> f =
CompletableFuture.supplyAsync(() -> blockingOp())
Watch out! ForkJoinPool.commonPool()
ExecutorService
Future<BlockingResult> x = executorService.submit(() -> blockingOp());
CompletableFuture<BlockingResult> f =
CompletableFuture.supplyAsync(() -> blockingOp(), executorService);
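The warning about `ForkJoinPool.commonPool()` matters because that pool is shared by the whole JVM and sized for CPU-bound work, so blocking calls parked there can starve unrelated tasks. A minimal sketch of the dedicated-pool variant follows. It assumes a stand-in `blockingOp()` that only reports the thread it ran on, an arbitrary pool size of 10, and a made-up thread name prefix `blocking-io-`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class DedicatedPoolDemo {

    // Stand-in for a blocking call; returns the name of the thread that ran it.
    static String blockingOp() {
        return Thread.currentThread().getName();
    }

    // Runs blockingOp() on a dedicated, named pool and returns the worker thread's name.
    static String runOnDedicatedPool() {
        AtomicInteger counter = new AtomicInteger();
        ExecutorService executorService = Executors.newFixedThreadPool(10, runnable -> {
            Thread thread = new Thread(runnable, "blocking-io-" + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        });
        try {
            return CompletableFuture
                    .supplyAsync(DedicatedPoolDemo::blockingOp, executorService)
                    .join();
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnDedicatedPool());
    }
}
```

Naming the threads makes it obvious in logs and thread dumps which pool a blocking call actually ran on.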
CompletableFuture<String> future1 = asyncOp1();
CompletableFuture<BigDecimal> future2 = asyncOp2();
CompletableFuture<Integer> future3 =
    future1.thenApply((String s) -> s.length());
CompletableFuture<Double> future4 =
    future3.thenApply((Integer x) -> x * 2.0d);
CompletableFuture<Object> f =
CompletableFuture.anyOf(future1, future2);
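`anyOf()` does the job of the earlier polling loop without sleeping, at the cost of losing the static type of the result. A small sketch, assuming one future that never completes and one that is already done so the outcome is deterministic; `AnyOfDemo` and `firstOf` are illustrative names.

```java
import java.math.BigDecimal;
import java.util.concurrent.CompletableFuture;

final class AnyOfDemo {

    // Returns whichever result arrives first; the static type is lost, hence Object.
    static Object firstOf(CompletableFuture<String> future1,
                          CompletableFuture<BigDecimal> future2) {
        return CompletableFuture.anyOf(future1, future2).join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completes
        CompletableFuture<BigDecimal> ready = CompletableFuture.completedFuture(BigDecimal.TEN);
        System.out.println(firstOf(pending, ready));
    }
}
```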
Mono
and Flux
//≈ CompletableFuture<Optional<T>>
reactor.core.publisher.Mono<T> mono
//≈ CompletableFuture<List<T>>
reactor.core.publisher.Flux<T> flux
RxJava 2+ | Reactor | Java 8+ |
---|---|---|
Maybe<T> | Mono<T> | |
Single<T> | Mono<T> | CompletableFuture<T> |
Completable | Mono<Void> | |
Flowable<T> | Flux<T> | |
Observable<T> | Flux<T> w/o backpressure | CompletableFuture<List<T>> * |
Flux vs Java's Stream
Flux multiple times
parallelStream() is a joke
zip(), window(), etc.)
lazy / lateinit
Lazy<T>
Flux
Flux use cases
Mono vs. CompletableFuture
Mono is lazy
Mono can complete without an answer
Mono
vavr.Future)
vavr.Option)
vavr.Lazy)
vavr.Try)
Mono to Flux
Mono<T> mono;
Flux<T> flux = mono.flux();
Flux
to Mono
Flux<T> flux;
Mono<List<T>> mono = flux.collectList();
Mono<T> first = flux.next();
Mono<T> last = flux.last();
Mono<T> single = flux.single();
Mono<T> singleOrEmpty = flux.singleOrEmpty();
Mono<T> nth = flux.elementAt(n);
Mono<Long> count = flux.count();
Mono<T> reduced = flux.reduce(accumulator); //BiFunction<T, T, T>
publish()
/ connect()
/ autoConnect()
.
block()
Mono<String> mono;
//DON'T!
String s = mono.block();
Mono
and Flux
can complete without result:
Mono<User> findById(int id);
just()
Mono.just(restTemplate.getForEntity("/url"));
nextOperation1();
nextOperation2();
just()
in zip()
Better:
defer()
Better:
collectList()
Flux<String> flux;
Mono<List<String>> list = flux.collectList();
collectList()
OutOfMemoryError
buffer()
cache()
distinct()
(but not distinctUntilChanged())
collectList()
collectMap()
flatMapSequential()
sort()
map()
OUTPUT fun(INPUT in)
Flux<INPUT> flux;
Flux<OUTPUT> result = flux.map(fun);
save
)
users //x1000
.map(user -> jpaRepository.save(user))
users //x1000
.subscribe(user -> jpaRepository.save(user))
flatMap()
Optional<Integer> tryParse(String s);
Optional<String> s = Optional.of("42");
Optional<Optional<Integer>> x = s.map(this::tryParse);
Optional<Integer> y = s.flatMap(this::tryParse);
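To make the nesting visible, here is a runnable version of the `Optional` case. The body of `tryParse` is an assumption (the slide only gives its signature), and `OptionalFlatMapDemo` is an illustrative class name.

```java
import java.util.Optional;

final class OptionalFlatMapDemo {

    // One possible tryParse: empty when the input is not a valid integer.
    static Optional<Integer> tryParse(String s) {
        try {
            return Optional.of(Integer.parseInt(s));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // map() wraps the Optional returned by tryParse in another Optional
        Optional<Optional<Integer>> nested = Optional.of("42").map(OptionalFlatMapDemo::tryParse);
        // flatMap() removes the extra level
        Optional<Integer> flat = Optional.of("42").flatMap(OptionalFlatMapDemo::tryParse);
        Optional<Integer> broken = Optional.of("forty-two").flatMap(OptionalFlatMapDemo::tryParse);
        System.out.println(nested + " " + flat + " " + broken);
    }
}
```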
flatMap()
Stream<Integer> tryParse(String s);
Stream<String> s = Stream.of("42");
Stream<Stream<Integer>> x = s.map(this::tryParse);
Stream<Integer> y = s.flatMap(this::tryParse);
flatMap()
CompletableFuture<Integer> tryParse(String s);
CompletableFuture<String> s =
CompletableFuture.completedFuture("42");
CompletableFuture<CompletableFuture<Integer>> x =
s.thenApply(this::tryParse);
CompletableFuture<Integer> y =
s.thenCompose(this::tryParse);
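The same flattening, as a runnable sketch with `CompletableFuture`. The asynchronous `tryParse` body is assumed (it simply parses on the default async executor), and `ThenComposeDemo` and `parseAsync` are names made up for this example.

```java
import java.util.concurrent.CompletableFuture;

final class ThenComposeDemo {

    // Assumed asynchronous parser; here it simply runs on the default async executor.
    static CompletableFuture<Integer> tryParse(String s) {
        return CompletableFuture.supplyAsync(() -> Integer.parseInt(s));
    }

    static CompletableFuture<Integer> parseAsync(CompletableFuture<String> input) {
        // thenCompose flattens CompletableFuture<CompletableFuture<Integer>> into one level
        return input.thenCompose(ThenComposeDemo::tryParse);
    }

    public static void main(String[] args) {
        CompletableFuture<String> s = CompletableFuture.completedFuture("42");
        // thenApply keeps both levels, so the caller has to join twice
        CompletableFuture<CompletableFuture<Integer>> nested =
                s.thenApply(ThenComposeDemo::tryParse);
        System.out.println(nested.join().join());
        System.out.println(parseAsync(s).join());
    }
}
```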
flatMap()
Mono<Integer> tryParse(String s);
Mono<String> s = Mono.just("42");
Mono<Mono<Integer>> x =
s.map(this::tryParse);
Mono<Integer> y =
s.flatMap(this::tryParse);
flatMap
= map
+ merge
Flux<Integer> userIds; //Integer x1000
Mono<User> loadUser(int id); //Mono<User> x1000
Flux<User> x = userIds.flatMap(this::loadUser);
flatMap
pseudocode
flatMap(f) {
return merge(map(f));
}
zip()
Mono<Ads> loadAds();
Mono<Article> fetchArticle();
Mono<Header> composeHeader();
Mono<Footer> composeFooter();
Mono<Tags> guessTags();
Mono<Next> recommendNext();
zip()
Page makePage(Ads ads, Article article, Header header,
Footer footer, Tags tags, Next next);
Mono<Page> x = Mono.zip(
loadAds(),
fetchArticle(),
composeHeader(),
composeFooter(),
guessTags(),
recommendNext(),
(ads, art, head, foot, tags, next) ->
makePage(ads, art, head, foot, tags, next)
)
zip()
Mono
!
Mono<Page> x = Mono.zip(
loadAds(),
fetchArticle(),
composeHeader(),
composeFooter(),
guessTags()
.switchIfEmpty(Mono.just(new Tags()))
.timeout(ofMillis(500)),
recommendNext(),
this::makePage)
zip()
Mono<Statement> fooAsync(Order order, Customer customer);
Mono<Mono<Statement>> x = Mono.zip(
order, customer,
(Order o, Customer c) -> fooAsync(o, c));
Mono<Statement> x = Mono
    .zip(order, customer, (Order o, Customer c) -> fooAsync(o, c))
    .flatMap(statement -> statement);
zip()
?
timeout()
Mono<Localization> loc =
Mono.fromCallable(() -> slowGpsTracking())
.timeout(ofSeconds(1),
Mono.fromCallable(() -> fastBtsTracking())
);
onError*()
users
.flatMap(user ->
lastOrderOf(user)
.onErrorReturn(Order.FAILED)
)
.onErrorReturn(Order.FAILED)
.subscribe(order1, order2, FAILED, order4, order5, ...)
filterWhen()
Flux<User> users;
Mono<Boolean> isVip(User user);
users.filterWhen(this::isVip);
users.filter(this::isVip); //does not compile
doOnSubscribe()
Mono<Email> sendEmail(String address) {
    log.info("Sending email to {}. O RLY?", address); //this log lies
    return Mono.fromCallable(() -> mailingApi.send("...", address))
        .doOnSubscribe(s -> log.info("Sending email to {}", address));
}
doOnNext()
Equivalent to Stream.peek()
in Java.
Good for:
Flux<Article> loadAndSaveArticles();
Flux
.interval(ofMinutes(5))
.subscribe(num -> loadAndSaveArticles());
Flux<Article> loadAndSaveArticles();
Flux
.interval(ofMinutes(5))
.doOnNext(num -> loadAndSaveArticles())
.subscribe();
Flux<Article> loadAndSaveArticles();
Flux
.interval(ofMinutes(5))
.doOnNext(num -> loadAndSaveArticles().blockLast())
.subscribe();
Flux<Article> loadAndSaveArticles();
Flux
.interval(ofMinutes(5))
.doOnNext(num -> loadAndSaveArticles().subscribe())
.subscribe();
Flux<Article> loadAndSaveArticles();
Flux
.interval(ofMinutes(5))
.flatMap(n -> loadAndSaveArticles())
.subscribe();
window()
Flux<Email> emailsToSend;
Mono<Result> send(Email email);
Flux<Result> x = emailsToSend.flatMap(this::send);
Flux<Result> sendMany(List<Email> emails);
Flux<Result> results = emailsToSend.collectList()
    .flatMapMany(list -> sendMany(list));
buffer()
)
Flux<Result> sendMany(List<Email> emails);
Flux<List<Email>> batches = emailsToSend.buffer(100);
batches.flatMap(this::sendMany);
window()
)
Flux<Result> sendMany(List<Email> emails);
Flux<Flux<Email>> batches2 = emailsToSend.window(100);
batches2.flatMap(
    (Flux<Email> batch) -> batch.collectList()
        .flatMapMany(batchList -> sendMany(batchList)));
window()
vs. buffer
flux
.window(Duration.ofSeconds(1))
.flatMap(Flux::count);
vs.
flux
.buffer(Duration.ofSeconds(1))
.map(List::size);
window()
buffer()
interval()
Flux
.interval(ofHours(3))
.subscribe(x -> everyThreeHours());
Flux
.interval(ofHours(3))
.flatMap(x -> everyThreeHours())
.subscribe();
@Scheduled
in Spring
subscribeOn()
Mono
.fromCallable(() -> reliable.findBlocking(41))
.subscribeOn(Schedulers.newBoundedElastic(10, 100, "A"))
.doOnNext(x -> log.info("Received {}", x))
.publishOn(Schedulers.newBoundedElastic(10, 100, "B"))
.map(x -> ...)
.publishOn(Schedulers.newBoundedElastic(10, 100, "C"))
.filter(x -> ...)
.publishOn(Schedulers.newBoundedElastic(10, 100, "D"))
.doOnNext(x -> log.info("Still here {}", x))
.publishOn(Schedulers.newBoundedElastic(10, 100, "E"))
.subscribe(x -> log.info("Finally received {}", x));
domains
.flatMap(domain ->
download(domain) //Mono
.subscribeOn(newBoundedElastic(50, 1000, "Crawler"))
);
publishOn()
map
is long-running):
result
.map(result -> result.getId())
.map(id -> cpuIntensive(id))
publishOn()
map
is single-threaded anyway):
result
.map(result -> result.getId())
.publishOn(cpuScheduler)
.map(id -> cpuIntensive(id))
publishOn()
result
.map(result -> result.getId())
.flatMap(id -> Mono.fromCallable(() ->
cpuIntensiveAsync(id))
.subscribeOn(cpuScheduler)
)
publishOn()
Mono.fromCallable()
and publishOn()
flatMap()
-- G - Y - F ---------------------------------------> (t)
| | |
+--------------------------------------|
+---------|
+--------------------|
---------------- Y.html ------- F.html -- G.html ---> (t)
flatMapSequential()
-- G - Y - F ---------------------------------------> (t)
| | |
+--------------------------------------|
+---------|........................
+--------------------|.........
----------------------------------------- G.html Y.html F.html ---> (t)
flatMapSequential()
Mono<LargeImage> download(URL url);
Flux<URL> thousandUrls;
Flux<LargeImage> x = thousandUrls.flatMapSequential(this::download);
flatMapSequential()
What if the first element fails?
See: flatMapSequentialDelayError()
concatMap
Basically equivalent to
parallel()
+ -- A D ...
/
---- A B C D E F ... -+ ---- B E ...
\
+ -- C F ...
parallel()
flux
.flatMap(x ->
Mono.fromCallable(() -> blockingOperation(x))
.subscribeOn(scheduler))
parallel()
flux
.parallel(3)
.map(x -> blockingOperation(x))
expand()
Flux<File> listFiles(File parent);
Flux<File> start = Flux.just(new File("/"));
Flux<File> files = start.flatMap(this::listFiles);
Flux<File> files = start.expand(this::listFiles);
curl 'https://api.twitter.com/live_pipeline/events?topic=%2Ftweet_engagement%2F012345678901234567890' \
-H 'authority: api.twitter.com' \
-H 'accept: text/event-stream'
data: {"topic":"/system/config","payload":{"config":{"session_id":"...","subscription_ttl_millis":120000,"heartbeat_millis":25000}}}
data: {"topic":"/system/subscriptions","payload":{"subscriptions":{"errors":[]}}}
data: {"topic":"/tweet_engagement/012345678901234567890","payload":{"tweet_engagement":{"retweet_count":"20"}}}
data: {"topic":"/tweet_engagement/012345678901234567890","payload":{"tweet_engagement":{"like_count":"30"}}}
data: {"topic":"/tweet_engagement/012345678901234567890","payload":{"tweet_engagement":{"retweet_count":"21"}}}
data: {"topic":"/tweet_engagement/012345678901234567890","payload":{"tweet_engagement":{"retweet_count":"22"}}}
data: {"topic":"/tweet_engagement/012345678901234567890","payload":{"tweet_engagement":{"quote_count":"1"}}}
data: {"topic":"/tweet_engagement/012345678901234567890","payload":{"tweet_engagement":{"like_count":"30"}}}
data: {"topic":"/tweet_engagement/012345678901234567890","payload":{"tweet_engagement":{"retweet_count":"23"}}}
Implementations:
Sources you can't slow down:
Flux.interval()
reactor.core.Exceptions$OverflowException:
Could not emit tick 256 due to lack of requests
(interval doesn't support small downstream requests that replenish slower than the ticks)
RestTemplate.getForEntity()
, Socket.read()
.
See also Playtika/feign-reactive
CompletableFuture
, AsyncRestTemplate
WebClient
Threads | Avg. resp. | rps |
---|---|---|
1 | 0.2s | 5 |
100 | 0.5s | 200 |
100 | 0.1s | 1000 |
200 | 0.2s | 1000 |
100 | 2.0s | 50 |
2000 | 2.0s | 1000 |
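Every row in the table is consistent with rps = threads / average response time, which is Little's law rearranged for a fully busy pool where each request blocks one thread. The arithmetic can be sketched as follows; `ThroughputDemo`, `maxRps` and `threadsNeeded` are illustrative names, not part of any library.

```java
final class ThroughputDemo {

    // Upper bound on requests per second when each request blocks one thread.
    static double maxRps(int threads, double avgResponseSeconds) {
        return threads / avgResponseSeconds;
    }

    // Threads needed to sustain a target rate at a given latency, rounded up.
    static int threadsNeeded(double targetRps, double avgResponseSeconds) {
        return (int) Math.ceil(targetRps * avgResponseSeconds);
    }

    public static void main(String[] args) {
        System.out.println(maxRps(100, 0.5));         // second row of the table
        System.out.println(threadsNeeded(1000, 2.0)); // last row of the table
    }
}
```

Read the other way round, the last two rows show the cost of slow dependencies: at 2 s per response, keeping 1000 rps takes 2000 blocked threads.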
reactor-netty
➡ WebFlux
WebClient
WebClient
RestTemplate
AsyncRestTemplate
RestTemplate
with a thread pool
Person p = db.query();
User u = rest.get();
Person p = Mono.fromCallable(() -> db.query())
.subscribeOn(myScheduler)
.block(ofMillis(500));
User u = Mono.fromCallable(() -> rest.get())
.subscribeOn(myScheduler)
.block(ofMillis(500));
Person p = db.queryAsync()
.block(ofMillis(500));
User u = rest.getAsync()
.block(ofMillis(500));
Tuple2<Person, User> t = Mono.zip(
db.queryAsync(),
rest.getAsync()
).block(ofMillis(500));
return Mono.zip(
db.queryAsync(),
rest.getAsync()
);
//finite
Flux<Document> findAll();
//infinite
@Tailable
Flux<Document> streamAll();
import reactor.tools.agent.ReactorDebugAgent;
@BeforeClass
public static void setup() {
ReactorDebugAgent.init();
ReactorDebugAgent.processExistingClasses();
}
Flux
.interval(Duration.ofMillis(10), Schedulers.parallel())
.map(x -> {
TimeUnit.SECONDS.sleep(1);
return x;
})
When returning a reactive type, Micronaut subscribes to the returned reactive type on the same thread as the request (a Netty Event Loop thread).
[...] if you perform any blocking operations, you offload those operations to an appropriately configured thread pool, for example using the Project Reactor [...] subscribeOn(..) facility or @ExecuteOn.
ab
wrk
and wrk2