Reactor workshop

Exercises: github.com/nurkiewicz/reactor-workshop

  • 9:00 - 9:50
  • 10:00 - 10:50
  • 11:00 - 11:50
  • 12:00 - 13:00
  • 13:00 - 13:45 (lunch)
  • 13:45 - 14:30
  • 14:40 - 15:25
  • 15:35 - 16:15
  • 16:25 - 17:00

Quick links

whoami

Tomasz Nurkiewicz

Java Champion

Disclaimer

These slides are rough notes I made during several 3-day Reactor and WebFlux workshops. They are far from a complete tutorial. Many code snippets were written by hand.

I had a problem so I thought to use #Reactor. Now I have a Flux<Problem>

twitter.com/tnurkiewicz/status/1099021813152124928

Blocking code

						
							//REST, DB, file system: 100ms, 200ms, 2s
							String s = blockingOp();
						
					

Classic Future

						
							String s1 = future1.get();  //poor practice
							String s2 = future1.get(1, SECONDS);
							boolean done = future1.isDone();							
						
					

Wait for first 🤦‍♂️

						
								Future<String>     future1 = asyncOp1();
								Future<BigDecimal> future2 = asyncOp2();
								
								while(true) {
									if(future1.isDone()) {
										log.info(future1.get());
										break;
									} else {
										if(future2.isDone()) {
											log.info("{}", future2.get());
											break;
										}
									}
									TimeUnit.MILLISECONDS.sleep(50);
								}
						
					

CompletableFuture

						
								CompletableFuture<String>     future1 = asyncOp1();
								CompletableFuture<BigDecimal> future2 = asyncOp2();
								log.info("Done");
						
					

How to create CompletableFuture

					
						CompletableFuture<BlockingResult> f = 
								CompletableFuture.supplyAsync(() -> blockingOp());
					
				
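Watch out! Without an explicit executor, supplyAsync() runs on ForkJoinPool.commonPool(), which is shared with parallel streams and the rest of the JVM.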

Custom ExecutorService

					
						Future<BlockingResult> x = executorService.submit(() -> blockingOp());

						CompletableFuture<BlockingResult> f = 
							CompletableFuture.supplyAsync(() -> blockingOp(), executorService);						
					
				
						
							CompletableFuture<String>     future1 = asyncOp1();
							CompletableFuture<BigDecimal> future2 = asyncOp2();
							
							CompletableFuture<Integer> future3 = 
								future1.thenApply((String s) -> s.length());
							
							CompletableFuture<Double> future4 = 
								future3.thenApply((Integer x) -> x * 2.0d);
						
					

Wait for the first one

						
							CompletableFuture<Object> f =
								CompletableFuture.anyOf(future1, future2);
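For contrast with the polling loop in "Wait for first", here is a minimal JDK-only sketch of anyOf(). It assumes Java 9+ (for CompletableFuture.delayedExecutor()); the delayed() helper and its values are hypothetical stand-ins for asyncOp1()/asyncOp2(), and join() is used only to print the result.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AnyOfDemo {

    // Hypothetical stand-in for asyncOp1()/asyncOp2(): completes with value after a delay
    static CompletableFuture<String> delayed(String value, long millis) {
        return CompletableFuture.supplyAsync(
                () -> value,
                CompletableFuture.delayedExecutor(millis, TimeUnit.MILLISECONDS));
    }

    static Object firstOf() {
        CompletableFuture<String> slow = delayed("slow", 1_000);
        CompletableFuture<String> fast = delayed("fast", 50);
        // anyOf() completes with the result of whichever future finishes first,
        // no polling loop and no sleep() needed
        CompletableFuture<Object> first = CompletableFuture.anyOf(slow, fast);
        return first.join(); // join() only to print the result in this demo
    }

    public static void main(String[] args) {
        System.out.println(firstOf()); // fast
    }
}
```

In real code you would rather attach a callback, e.g. first.thenAccept(...), than join().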
						
					

Mono and Flux

						
							//≈ CompletableFuture<Optional<T>>
							reactor.core.publisher.Mono<T> mono  

							//≈ CompletableFuture<List<T>>
							reactor.core.publisher.Flux<T> flux  
						
					
RxJava 2+        Reactor                      Java 8+
Maybe<T>         Mono<T>
Single<T>        Mono<T>                      CompletableFuture<T>
Completable      Mono<Void>
Flowable<T>      Flux<T>
Observable<T>    Flux<T> (w/o backpressure)   CompletableFuture<List<T>>*

Flux vs Java's Stream

  • You can subscribe to Flux multiple times (a Stream can be consumed only once)
  • parallelStream() is a joke (by default it runs on the shared ForkJoinPool.commonPool())
  • Much richer API in Reactor (zip(), window(), etc.)

Laziness

  • Default in Haskell
  • In Kotlin: lazy / lateinit
  • In Vavr: Lazy<T>

Flux

  • Asynchronous event stream
  • May be infinite
  • May terminate with success or error
  • May be empty

Flux use cases

  • Loading many records
  • Streaming changes
  • Event/command queue
  • Infinite updates

Mono vs. CompletableFuture

  • Mono is lazy
  • Mono can complete without an answer
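A small sketch of the laziness difference, assuming reactor-core 3.x on the classpath. An AtomicInteger counts how often the work runs; block() and join() appear only to keep the demo short.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class LazinessDemo {

    // Returns {calls before subscribing, calls after subscribing twice}
    static int[] monoCalls() {
        AtomicInteger calls = new AtomicInteger();
        Mono<Integer> mono = Mono.fromCallable(calls::incrementAndGet); // only a recipe, nothing runs
        int before = calls.get();
        mono.block(); // block() used only to keep the demo short
        mono.block(); // every subscription runs the callable again
        return new int[] {before, calls.get()};
    }

    // Returns the number of calls after waiting for the same future twice
    static int futureCalls() {
        AtomicInteger calls = new AtomicInteger();
        CompletableFuture<Integer> future =
                CompletableFuture.supplyAsync(calls::incrementAndGet); // starts immediately
        future.join();
        future.join(); // the result is remembered, the supplier does not run again
        return calls.get();
    }

    public static void main(String[] args) {
        int[] mono = monoCalls();
        System.out.println("Mono: " + mono[0] + " before, " + mono[1] + " after"); // 0 before, 2 after
        System.out.println("CompletableFuture: " + futureCalls());                  // 1
    }
}
```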

4 aspects of Mono

Mono to Flux

					
						Mono<T> mono;
						Flux<T> flux = mono.flux();
					
				

Flux to Mono

					
						Flux<T> flux;
						Mono<List<T>> mono = flux.collectList();
						Mono<T>      first = flux.next();
						Mono<T>       last = flux.last();
						Mono<T>     single = flux.single();         //error if not exactly one
						Mono<T>   maybeOne = flux.singleOrEmpty();  //error if more than one
						Mono<T>        nth = flux.elementAt(n);
						Mono<Long>   count = flux.count();
						Mono<T>    reduced = flux.reduce(accumulator);  //BiFunction<T, T, T>
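To make the conversions above concrete, here is a runnable sketch on Flux.range(1, 5), assuming reactor-core 3.x. Integer::sum is just an example accumulator for reduce(), and block() is used only to inspect results.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class FluxToMonoDemo {

    static String summary() {
        Flux<Integer> flux = Flux.range(1, 5); // 1, 2, 3, 4, 5
        // block() only to inspect the results in this demo
        List<Integer> list = flux.collectList().block();
        Integer first = flux.next().block();
        Integer last = flux.last().block();
        Integer third = flux.elementAt(2).block(); // zero-based index
        Long count = flux.count().block();
        Integer sum = flux.reduce(Integer::sum).block();
        return list + " " + first + " " + last + " " + third + " " + count + " " + sum;
    }

    public static void main(String[] args) {
        System.out.println(summary()); // [1, 2, 3, 4, 5] 1 5 3 5 15
    }
}
```

Note that every block() re-subscribes, so the range is generated once per conversion: the Flux is cold.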
					
				

History and ports

Hot vs. cold

publish() / connect() / autoConnect().
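A minimal sketch of cold vs. hot, assuming reactor-core 3.x. doOnSubscribe() counts how many times the upstream Flux.range() is actually subscribed; the counter and class names are made up for the demo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class HotColdDemo {

    static final AtomicInteger upstreamSubscriptions = new AtomicInteger();

    // A cold source: every subscription triggers a fresh run of Flux.range()
    static Flux<Integer> source() {
        return Flux.range(1, 3)
                .doOnSubscribe(s -> upstreamSubscriptions.incrementAndGet());
    }

    // Two subscribers to a cold Flux: two independent upstream runs
    static int cold() {
        upstreamSubscriptions.set(0);
        Flux<Integer> cold = source();
        cold.subscribe();
        cold.subscribe();
        return upstreamSubscriptions.get();
    }

    // publish() returns a ConnectableFlux; autoConnect(2) connects to the
    // upstream once, as soon as the second subscriber arrives
    static String hot() {
        upstreamSubscriptions.set(0);
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        Flux<Integer> hot = source().publish().autoConnect(2);
        hot.subscribe(first::add);
        hot.subscribe(second::add);
        return upstreamSubscriptions.get() + " " + first + " " + second;
    }

    public static void main(String[] args) {
        System.out.println(cold()); // 2
        System.out.println(hot());  // 1 [1, 2, 3] [1, 2, 3]
    }
}
```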

block()

					
						Mono<String> mono;

						//DON'T!
						String s = mono.block();
					
				

Cardinality

Mono and Flux can complete without a result (empty):
						
							Mono<User> findById(int id);
						
					

just()

Antipattern (the blocking call runs immediately, before anyone subscribes):
						
							Mono.just(restTemplate.getForEntity("/url", String.class));
							nextOperation1();
							nextOperation2();
						
					

just() in zip()

						
							
						
					
Better:
						
							
						
					

defer()

Make eager things lazy:
					
						
					
				
Better:
					
						
					
				
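The "make eager things lazy" idea can be sketched as follows, assuming reactor-core 3.x. expensive() is a hypothetical stand-in for a blocking call such as the restTemplate request above; the counter shows whether it ran without any subscriber.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class DeferDemo {

    static final AtomicInteger calls = new AtomicInteger();

    // Hypothetical expensive call, e.g. a blocking HTTP request
    static String expensive() {
        calls.incrementAndGet();
        return "result";
    }

    // Calls made by merely creating the Mono, without subscribing
    static int eagerCalls() {
        calls.set(0);
        Mono<String> mono = Mono.just(expensive()); // argument evaluated right here
        return calls.get();
    }

    static int lazyCalls() {
        calls.set(0);
        Mono<String> deferred = Mono.defer(() -> Mono.just(expensive())); // evaluated on subscribe
        Mono<String> callable = Mono.fromCallable(DeferDemo::expensive);  // same effect
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(eagerCalls() + " " + lazyCalls()); // 1 0
    }
}
```

defer() fits when you already have a method returning Mono; fromCallable() fits a plain blocking method.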

collectList()

							
								Flux<String> flux;
								Mono<List<String>> list = flux.collectList();
							
						

collectList()

Disadvantages

  • Must wait for the last element (completion)
  • May lead to OutOfMemoryError
  • One error discards all elements collected so far
  • Doesn't work with infinite streams

Operators that buffer extensively

  • buffer()
  • cache()
  • distinct() (but not distinctUntilChanged())
  • collectList()
  • collectMap()
  • flatMapSequential()
  • sort()

map()

						
							OUTPUT fun(INPUT in)

							Flux<INPUT> flux;
							Flux<OUTPUT> result = flux.map(fun);
						
					

Operators are single-threaded

Antipattern 1:

(blocking save)
					
						users  //x1000
							.map(user -> jpaRepository.save(user))
					
				

Antipattern 2:

					
						users  //x1000
							.subscribe(user -> jpaRepository.save(user))
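One common way out of both antipatterns, sketched under assumptions: reactor-core 3.4+, saveBlocking() is a hypothetical stand-in for jpaRepository.save(user), and 10 is an arbitrary concurrency limit. Each blocking call is wrapped in fromCallable(), moved to Schedulers.boundedElastic(), and the results stay in the pipeline instead of being lost in subscribe().

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BlockingSaveDemo {

    // Hypothetical blocking call, standing in for jpaRepository.save(user)
    static String saveBlocking(String user) {
        try {
            Thread.sleep(10); // simulate I/O latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "saved:" + user;
    }

    static List<String> saveAll(Flux<String> users) {
        return users
                .flatMap(user -> Mono.fromCallable(() -> saveBlocking(user))
                                .subscribeOn(Schedulers.boundedElastic()),
                        10) // at most 10 saves in flight
                .collectList()
                .block(); // block() only to make the demo testable
    }

    public static void main(String[] args) {
        System.out.println(saveAll(Flux.range(1, 100).map(i -> "user" + i)).size()); // 100
    }
}
```

The order of saved results is not guaranteed, because saves complete concurrently.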
					
				

flatMap()

							
								Optional<Integer> tryParse(String s);
								
								Optional<String> s = Optional.of("42");
	
								Optional<Optional<Integer>> x = s.map(this::tryParse);
								
								Optional<Integer> y = s.flatMap(this::tryParse);
							
						

flatMap()

							
								Stream<Integer> tryParse(String s);
								
								Stream<String> s = Stream.of("42");

								Stream<Stream<Integer>> x = s.map(this::tryParse);

								Stream<Integer> y = s.flatMap(this::tryParse);
							
						

flatMap()

							
								CompletableFuture<Integer> tryParse(String s);

								CompletableFuture<String> s = 
									CompletableFuture.completedFuture("42");

								CompletableFuture<CompletableFuture<Integer>> x = 
									s.thenApply(this::tryParse);

								CompletableFuture<Integer> y = 
									s.thenCompose(this::tryParse);
							
						

flatMap()

							
								Mono<Integer> tryParse(String s);
								
								Mono<String> s = Mono.just("42");
								
								Mono<Mono<Integer>> x = 
									s.map(this::tryParse);
								
								Mono<Integer> y = 
									s.flatMap(this::tryParse);
							
						

flatMap = map + merge

							
								Flux<Integer> userIds;  //Integer x1000

								Mono<User> loadUser(int id); //Mono<User> x1000
								
								Flux<User> x = userIds.flatMap(this::loadUser);
							
						

flatMap pseudocode

							
								flatMap(f) {
									return merge(map(f));
								}
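The pseudocode above can be checked with real operators, assuming reactor-core 3.x: flatMap() gives the same elements as map() followed by Flux.merge(). loadUser() is a hypothetical synchronous stand-in, so the order happens to match here; with real asynchronous calls the order may differ.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapAsMergeDemo {

    // Hypothetical async lookup standing in for loadUser(int id)
    static Mono<String> loadUser(int id) {
        return Mono.just("user" + id);
    }

    static List<String> viaFlatMap() {
        return Flux.range(1, 3)
                .flatMap(FlatMapAsMergeDemo::loadUser)
                .collectList()
                .block();
    }

    // map() alone yields a stream of streams; merge() flattens it
    static List<String> viaMergeOfMap() {
        Flux<Mono<String>> nested = Flux.range(1, 3).map(FlatMapAsMergeDemo::loadUser);
        return Flux.merge(nested)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(viaFlatMap());    // [user1, user2, user3]
        System.out.println(viaMergeOfMap()); // [user1, user2, user3]
    }
}
```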
							
						

zip()

							
								Mono<Ads> loadAds();
								Mono<Article> fetchArticle();
								Mono<Header> composeHeader();
								Mono<Footer> composeFooter();
								Mono<Tags> guessTags();
								Mono<Next> recommendNext();
							
						

zip()

							
								Page makePage(Ads ads, Article article, Header header, 
												Footer footer, Tags tags, Next next);

								Mono<Page> x = Mono.zip(
									loadAds(),
									fetchArticle(),
									composeHeader(),
									composeFooter(),
									guessTags(),
									recommendNext()
								).map(t -> makePage(  //t is a Tuple6
										t.getT1(), t.getT2(), t.getT3(),
										t.getT4(), t.getT5(), t.getT6()));

							
						

zip()

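Watch out for empty Mono! If any source completes empty, zip() completes empty too:
							
								Mono<Page> x = Mono.zip(
									loadAds(),
									fetchArticle(),
									composeHeader(),
									composeFooter(),
									guessTags()
										.switchIfEmpty(Mono.just(new Tags()))
										.timeout(ofMillis(500)),
									recommendNext()
								).map(t -> makePage(
										t.getT1(), t.getT2(), t.getT3(),
										t.getT4(), t.getT5(), t.getT6()));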
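A two-source sketch of the empty-Mono pitfall, assuming reactor-core 3.x. loadAds() and guessTags() are hypothetical stand-ins returning plain strings; defaultIfEmpty() is used here as a shorter equivalent of switchIfEmpty(Mono.just(...)).

```java
import reactor.core.publisher.Mono;

public class ZipEmptyDemo {

    // Hypothetical sources: ads always load, tags come back empty
    static Mono<String> loadAds() { return Mono.just("ads"); }
    static Mono<String> guessTags() { return Mono.empty(); }

    // One empty source makes the whole zip() complete empty
    static String withoutDefault() {
        return Mono.zip(loadAds(), guessTags(), (ads, tags) -> ads + "+" + tags)
                .block(); // block() returns null for an empty Mono
    }

    // defaultIfEmpty() substitutes a value, so zip() has something to combine
    static String withDefault() {
        return Mono.zip(loadAds(), guessTags().defaultIfEmpty("no-tags"), (ads, tags) -> ads + "+" + tags)
                .block();
    }

    public static void main(String[] args) {
        System.out.println(withoutDefault()); // null
        System.out.println(withDefault());    // ads+no-tags
    }
}
```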

							
						

Asynchronous zip()

						
							Mono<Statement> fooAsync(Order order, Customer customer);

							Mono<Mono<Statement>> x = Mono.zip(
									order, customer, 
									(Order o, Customer c) -> fooAsync(o, c));
							
							Mono<Statement> y = Mono
									.zip(order, customer, (Order o, Customer c) -> fooAsync(o, c))
									.flatMap(m -> m);  //flatten Mono<Mono<Statement>>
							
						
					

Which thread runs after zip()?

						
							
						
					

timeout()


					Mono<Localization> loc =
						Mono.fromCallable(() -> slowGpsTracking())
								.timeout(ofSeconds(1),
										Mono.fromCallable(() -> fastBtsTracking())
								);
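A runnable sketch of timeout() with a fallback, assuming reactor-core 3.x. Unlike the slide, the slow source is simulated with Mono.delay() (a hypothetical stand-in for slowGpsTracking()), so no thread is blocked while waiting.

```java
import java.time.Duration;
import reactor.core.publisher.Mono;

public class TimeoutDemo {

    // Hypothetical stand-ins for slowGpsTracking() and fastBtsTracking()
    static Mono<String> slowGps() { return Mono.delay(Duration.ofSeconds(2)).map(tick -> "GPS"); }
    static Mono<String> fastBts() { return Mono.just("BTS"); }

    static String locate() {
        return slowGps()
                .timeout(Duration.ofMillis(100), fastBts()) // cancel GPS, switch to BTS after 100 ms
                .block(); // block() only for the demo
    }

    public static void main(String[] args) {
        System.out.println(locate()); // BTS
    }
}
```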
				

onError*()

					
						users
							.flatMap(user -> 
								lastOrderOf(user)
									.onErrorReturn(Order.FAILED)
							)
							.onErrorReturn(Order.FAILED)
							.subscribe(order -> log.info("{}", order));  //order1, order2, FAILED, order4, order5, ...
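Why the inner onErrorReturn() matters, as a sketch with integers instead of orders (assumes reactor-core 3.x; 10 / 0 plays the role of a failing lastOrderOf()). A fallback inside flatMap() replaces only the failed element; a fallback on the outer stream replaces the error but the stream still terminates.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class OnErrorDemo {

    // Fallback per element: one failure does not stop the stream
    static List<Integer> innerFallback() {
        return Flux.just(1, 2, 0, 4)
                .flatMap(i -> Mono.fromCallable(() -> 10 / i) // 10 / 0 throws ArithmeticException
                        .onErrorReturn(-1))
                .collectList()
                .block();
    }

    // Fallback on the whole stream: the error terminates it, later elements are lost
    static List<Integer> outerFallback() {
        return Flux.just(1, 2, 0, 4)
                .map(i -> 10 / i)
                .onErrorReturn(-1)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(innerFallback()); // [10, 5, -1, 2]
        System.out.println(outerFallback()); // [10, 5, -1]
    }
}
```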
					
				

filterWhen()

						
							Flux<User> users;

							Mono<Boolean> isVip(User user);
							
							users.filterWhen(this::isVip);
							users.filter(this::isVip);  //does not compile
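A runnable sketch of filterWhen(), assuming reactor-core 3.x. isEven() is a hypothetical asynchronous predicate standing in for isVip(User); filterWhen() subscribes to each Mono<Boolean> and keeps the source order.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FilterWhenDemo {

    // Hypothetical asynchronous predicate, standing in for isVip(User)
    static Mono<Boolean> isEven(int n) {
        return Mono.just(n % 2 == 0);
    }

    static List<Integer> evens() {
        return Flux.range(1, 6)
                .filterWhen(FilterWhenDemo::isEven) // waits for each Mono<Boolean>
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(evens()); // [2, 4, 6]
    }
}
```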
						
					

doOnSubscribe()

This is misleading:
						
							Mono<Email> sendEmail(String address) {
								log.info("Sending email to {}. O RLY?", address);  //this log lies
								return Mono.fromCallable(() -> mailingApi.send("...", address))
										.doOnSubscribe(s -> log.info("Sending email to {}", address));
							}
						
					

doOnNext()

Equivalent to Stream.peek() in Java.

Good for:

Side effects 1/5

❌ Wrong:
						
							Flux<Article> loadAndSaveArticles();
						
					
						
							Flux
								.interval(ofMinutes(5))
								.subscribe(num -> loadAndSaveArticles());
						
					

Side effects 2/5

❌ Wrong:
						
							Flux<Article> loadAndSaveArticles();
						
					
						
							Flux
								.interval(ofMinutes(5))
								.doOnNext(num -> loadAndSaveArticles())
								.subscribe();
						
					

Side effects 3/5

❌ Wrong:
						
							Flux<Article> loadAndSaveArticles();
						
					
						
							Flux
								.interval(ofMinutes(5))
								.doOnNext(num -> loadAndSaveArticles().blockLast())
								.subscribe();
						
					

Side effects 4/5

❌ Wrong:
						
							Flux<Article> loadAndSaveArticles();
						
					
						
							Flux
								.interval(ofMinutes(5))
								.doOnNext(num -> loadAndSaveArticles().subscribe())
								.subscribe();
						
					

Side effects 5/5

✅ Correct:
						
							Flux<Article> loadAndSaveArticles();
						
					
						
							Flux
								.interval(ofMinutes(5))
								.flatMap(n -> loadAndSaveArticles())
								.subscribe();
							
						
					

window()

						
							Flux<Email> emailsToSend;

							Mono<Result> send(Email email);
							
							Flux<Result> x = emailsToSend.flatMap(this::send);
						
					

Run all

						
							Flux<Result> sendMany(List<Email> emails);

							Flux<Result> results = emailsToSend.collectList()
								.flatMapMany(list -> sendMany(list));  //Mono<List<Email>> -> Flux<Result>
						
					

Run in batches (buffer())

						
							Flux<Result> sendMany(List<Email> emails);

							Flux<List<Email>> batches = emailsToSend.buffer(100);
							batches.flatMap(this::sendMany);
						
					

Run in batches (window())

						
							Flux<Result> sendMany(List<Email> emails);

							Flux<Flux<Email>> batches2 = emailsToSend.window(100);
							batches2.flatMap(
									(Flux<Email> batch) -> batch.collectList()
										.flatMap(batchList -> sendMany(batchList)));
						
					

window() vs. buffer()

						
							flux
								.window(Duration.ofSeconds(1))
								.flatMap(Flux::count);
						
					
vs.
						
							flux
								.buffer(Duration.ofSeconds(1))
								.map(List::size);
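The same comparison with count-based batches instead of one-second windows, so the output is deterministic (an assumption made for the demo; reactor-core 3.x). Both pipelines report batch sizes; buffer() materializes each batch as a List, window() exposes it as a Flux.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class WindowVsBufferDemo {

    // buffer(): collects each batch into a List, emitted when the batch is full
    static List<Integer> bufferSizes() {
        return Flux.range(1, 10)
                .buffer(3)
                .map(List::size)
                .collectList()
                .block();
    }

    // window(): each batch is itself a Flux, elements flow through as they arrive
    static List<Long> windowCounts() {
        return Flux.range(1, 10)
                .window(3)
                .flatMap(Flux::count)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(bufferSizes());  // [3, 3, 3, 1]
        System.out.println(windowCounts()); // [3, 3, 3, 1]
    }
}
```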
						
					

interval()

						
							Flux
								.interval(ofHours(3))
								.subscribe(x -> everyThreeHours());
						
					

Better

						
							Flux
								.interval(ofHours(3))
								.flatMap(x -> everyThreeHours())
								.subscribe();
						
					

Alternatives

  • Quartz scheduler
  • @Scheduled in Spring

Schedulers

newBoundedElastic()
elastic, limited pool
newParallel()
CPU workloads
boundedElastic()
singleton, don't use in production
parallel()
used by Reactor itself, don't use

subscribeOn()

					
						Mono
							.fromCallable(() -> reliable.findBlocking(41))
							.subscribeOn(Schedulers.newBoundedElastic(10, 100, "A"))
							.doOnNext(x -> log.info("Received {}", x))
							.publishOn(Schedulers.newBoundedElastic(10, 100, "B"))
							.map(x -> ...)
							.publishOn(Schedulers.newBoundedElastic(10, 100, "C"))
							.filter(x -> ...)
							.publishOn(Schedulers.newBoundedElastic(10, 100, "D"))
							.doOnNext(x -> log.info("Still here {}", x))
							.publishOn(Schedulers.newBoundedElastic(10, 100, "E"))
							.subscribe(x -> log.info("Finally received {}", x));
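A shortened, runnable version of the chain above, assuming reactor-core 3.4+. It records the thread name in two stages: subscribeOn() decides where fromCallable() runs, and publishOn() moves everything below it to another scheduler. Thread names are prefixed with the scheduler name ("A", "B"); the exact suffix is a Reactor implementation detail.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class PublishOnDemo {

    // Records which thread runs each stage; returns the thread names in order
    static List<String> threads() {
        Scheduler a = Schedulers.newBoundedElastic(10, 100, "A");
        Scheduler b = Schedulers.newBoundedElastic(10, 100, "B");
        List<String> seen = new CopyOnWriteArrayList<>();
        try {
            Mono.fromCallable(() -> {
                        seen.add(Thread.currentThread().getName()); // runs on an "A" thread
                        return 41;
                    })
                    .subscribeOn(a)   // where the subscription (and fromCallable) happens
                    .publishOn(b)     // everything below switches to a "B" thread
                    .map(x -> {
                        seen.add(Thread.currentThread().getName());
                        return x + 1;
                    })
                    .block(); // block() only for the demo
            return seen;
        } finally {
            a.dispose(); // schedulers created with new*() own threads; release them
            b.dispose();
        }
    }

    public static void main(String[] args) {
        System.out.println(threads());
    }
}
```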
					
				

What's wrong here?

					
						domains
								.flatMap(domain ->
										download(domain) //Mono
												.subscribeOn(newBoundedElastic(50, 1000, "Crawler"))
								);
					
				

publishOn()

❌ Wrong (map is long-running):
						
							results
								.map(result -> result.getId())
								.map(id -> cpuIntensive(id))
						
					

publishOn()

❌ Wrong (map is single-threaded anyway):
						
							results
								.map(result -> result.getId())
								.publishOn(cpuScheduler)
								.map(id -> cpuIntensive(id))
						
					

publishOn()

✅ Correct:
						
							results
								.map(result -> result.getId())
								.flatMap(id -> Mono.fromCallable(() -> 
										cpuIntensive(id))
												.subscribeOn(cpuScheduler)
								)
						
					

Weird behaviour of publishOn()

						
							
						
					
Mono.fromCallable() and publishOn()

flatMap()

					
						-- G - Y - F ---------------------------------------> (t)
						   |   |   |
						   +--------------------------------------|
						       +---------|
						           +--------------------|

						---------------- Y.html ------- F.html -- G.html ---> (t)
					
				

flatMapSequential()

						
							-- G - Y - F ---------------------------------------> (t)
							   |   |   |
							   +--------------------------------------|
							       +---------|........................
							           +--------------------|.........
						
							----------------------------------------- G.html Y.html F.html ---> (t)
						
					

flatMapSequential()

							
								Mono<LargeImage> download(URL url);

								Flux<URL> thousandUrls;
								
								Flux<LargeImage> x = thousandUrls.flatMapSequential(this::download);
							
						

flatMapSequential()

What if the first element fails?

See: flatMapSequentialDelayError()
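A runnable sketch of the G/Y/F diagrams, assuming reactor-core 3.x. download() is hypothetical: it completes after the given number of milliseconds, so the first element is the slowest. flatMapSequential() still emits in source order; flatMap() emits in completion order.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapSequentialDemo {

    // Hypothetical download(): completes after the given number of milliseconds
    static Mono<Integer> download(int millis) {
        return Mono.delay(Duration.ofMillis(millis)).thenReturn(millis);
    }

    // All downloads run concurrently, results are re-ordered to match the source
    static List<Integer> sequential() {
        return Flux.just(300, 100, 200)
                .flatMapSequential(FlatMapSequentialDemo::download)
                .collectList()
                .block();
    }

    // Results arrive in completion order (most likely 100, 200, 300)
    static List<Integer> interleaved() {
        return Flux.just(300, 100, 200)
                .flatMap(FlatMapSequentialDemo::download)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(sequential());  // [300, 100, 200]
        System.out.println(interleaved());
    }
}
```

Faster results wait in a buffer until the slow first one arrives, which is why flatMapSequential() appears in the list of operators that buffer extensively.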

concatMap

Basically equivalent to

					
						
					
				

parallel()

						
													+ -- A D ...
												/
							---- A B C D E F ... -+ ---- B E ...
												\
													+ -- C F ...					
						
					

parallel()

From:
						
							flux
								.flatMap(x -> 
										Mono.fromCallable(() -> blockingOperation(x))
											.subscribeOn(scheduler))
						
						
					

parallel()

To (incomplete):
						
							flux
								.parallel(3)
								.map(x -> blockingOperation(x))
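One way to complete the snippet, assuming reactor-core 3.4+: parallel() only splits the Flux into rails, runOn() is what actually moves each rail to a scheduler, and sequential() merges the rails back into a Flux. blockingOperation() is hypothetical, and boundedElastic() is chosen because it is blocking work.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class ParallelDemo {

    // Hypothetical blocking operation
    static int blockingOperation(int x) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return x * 2;
    }

    static List<Integer> run() {
        List<Integer> results = Flux.range(1, 9)
                .parallel(3)                          // split into 3 rails
                .runOn(Schedulers.boundedElastic())   // without runOn() rails stay on the emitting thread
                .map(ParallelDemo::blockingOperation)
                .sequential()                         // merge rails back into a single Flux
                .collectList()
                .block(); // block() only for the demo
        List<Integer> sorted = new ArrayList<>(results);
        sorted.sort(null); // order across rails is not preserved
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [2, 4, 6, 8, 10, 12, 14, 16, 18]
    }
}
```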
						
					

expand()

					
						Flux<File> listFiles(File parent);

						Flux<File> start = Flux.just(new File("/"));
						
						//direct children of "/" only
						Flux<File> children = start.flatMap(this::listFiles);
						
						//"/" and all its descendants, recursively
						Flux<File> allFiles = start.expand(this::listFiles);
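The same recursion on a small in-memory tree, so it runs without touching the file system (assumes reactor-core 3.x). children() is a hypothetical stand-in for listFiles(): node n has children 2n and 2n+1, up to 7. expand() traverses breadth-first; expandDeep() traverses depth-first.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class ExpandDemo {

    // Hypothetical tree standing in for listFiles(): node n has children 2n and 2n+1, up to 7
    static Flux<Integer> children(int node) {
        return node < 4 ? Flux.just(2 * node, 2 * node + 1) : Flux.empty();
    }

    static List<Integer> breadthFirst() {
        return Flux.just(1).expand(ExpandDemo::children).collectList().block();
    }

    static List<Integer> depthFirst() {
        return Flux.just(1).expandDeep(ExpandDemo::children).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(breadthFirst()); // [1, 2, 3, 4, 5, 6, 7]
        System.out.println(depthFirst());   // [1, 2, 4, 5, 3, 6, 7]
    }
}
```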
					
				

SSE

					
						curl 'https://api.twitter.com/live_pipeline/events?topic=%2Ftweet_engagement%2F012345678901234567890' \
						-H 'authority: api.twitter.com' \
						-H 'accept: text/event-stream'
								data: {"topic":"/system/config","payload":{"config":{"session_id":"...","subscription_ttl_millis":120000,"heartbeat_millis":25000}}}
								data: {"topic":"/system/subscriptions","payload":{"subscriptions":{"errors":[]}}}
								data: {"topic":"/tweet_engagement/012345678901234567890","payload":{"tweet_engagement":{"retweet_count":"20"}}}
								data: {"topic":"/tweet_engagement/012345678901234567890","payload":{"tweet_engagement":{"like_count":"30"}}}
								data: {"topic":"/tweet_engagement/012345678901234567890","payload":{"tweet_engagement":{"retweet_count":"21"}}}
								data: {"topic":"/tweet_engagement/012345678901234567890","payload":{"tweet_engagement":{"retweet_count":"22"}}}
								data: {"topic":"/tweet_engagement/012345678901234567890","payload":{"tweet_engagement":{"quote_count":"1"}}}
								data: {"topic":"/tweet_engagement/012345678901234567890","payload":{"tweet_engagement":{"like_count":"30"}}}
								data: {"topic":"/tweet_engagement/012345678901234567890","payload":{"tweet_engagement":{"retweet_count":"23"}}}
						
					

Reactive streams

Implementations:

  • RxJava 2+
  • Reactor
  • Akka Streams
  • Vert.x
  • Smallrye Mutiny

Backpressure

Sources you can't slow down:

  • Flux.interval()
  • Mouse moves (!)
  • Network packets (?)
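For sources that can be slowed down, the subscriber controls the pace through request(n). A minimal sketch, assuming reactor-core 3.x: a BaseSubscriber asks for one element at a time, and doOnRequest() records every request reaching Flux.range().

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class BackpressureDemo {

    // Subscriber that controls the pace: it asks for one element at a time
    static class OneAtATime extends BaseSubscriber<Integer> {
        final List<Integer> received = new ArrayList<>();

        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            request(1); // initial demand
        }

        @Override
        protected void hookOnNext(Integer value) {
            received.add(value);
            request(1); // ready for the next one
        }
    }

    // Returns the received elements and whether every request was for exactly 1 element
    static String run() {
        List<Long> requests = new ArrayList<>();
        OneAtATime subscriber = new OneAtATime();
        Flux.range(1, 5)
                .doOnRequest(n -> requests.add(n)) // log every request(n) reaching the source
                .subscribe(subscriber);
        return subscriber.received + " " + requests.stream().allMatch(n -> n == 1);
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1, 2, 3, 4, 5] true
    }
}
```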

Ignoring backpressure

						
							
						
					

Improved

						
							
						
					

Missing backpressure

						
							reactor.core.Exceptions$OverflowException: 
							Could not emit tick 256 due to lack of requests 
							(interval doesn't support small downstream requests that replenish slower than the ticks)
						
					

WebFlux

Terminology

blocking
the calling thread waits for a function to return. E.g. RestTemplate.getForEntity(), Socket.read(). See also Playtika/feign-reactive
asynchronous
happens in the background, probably in another thread. E.g. CompletableFuture, AsyncRestTemplate
non-blocking
event-driven, does not block any thread. E.g. Netty, WebClient
reactive
asynchronous, usually non-blocking, streaming, backpressure-aware

Terminology

concurrent
threads are preempted and share the same core, not really running at the same time
parallel
threads running at the same time, on different cores/CPUs

Little's Law

Thread per request
Threads   Avg. response time   Throughput (rps)
1         0.2 s                5
100       0.5 s                200
100       0.1 s                1000
200       0.2 s                1000
100       2.0 s                50
2000      2.0 s                1000
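Every row follows Little's Law: with one thread per request, the number of busy threads L equals throughput λ times average response time W, so the maximum throughput is the thread count divided by W. Worked for the last row:

```latex
L = \lambda W
\quad\Longrightarrow\quad
\lambda_{\max} = \frac{\text{threads}}{W} = \frac{2000}{2.0\,\text{s}} = 1000\ \text{rps}
```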

Netty

Netty ➡ reactor-netty ➡ WebFlux

WebClient

WebClient
Non-blocking, uses Netty underneath
RestTemplate
Blocking, in maintenance mode
AsyncRestTemplate
Deprecated, wraps RestTemplate with a thread pool

Refactoring toward Reactor

Step 0:

						
							Person p = db.query();
							User u = rest.get();
						
					

Step 1:

						
							Person p = Mono.fromCallable(() -> db.query())
										.subscribeOn(myScheduler)
										.block(ofMillis(500));
							User u = Mono.fromCallable(() -> rest.get())
										.subscribeOn(myScheduler)
										.block(ofMillis(500));
						
					

Step 2:

						
							Person p = db.queryAsync()
										.block(ofMillis(500));
							User u = rest.getAsync()
										.block(ofMillis(500));
						
					

Step 3:

						
							Tuple2<Person, User> t = Mono.zip(
								db.queryAsync(),
								rest.getAsync()
							).block(ofMillis(500));
						
					

Step 4:

						
							return Mono.zip(
								db.queryAsync(),
								rest.getAsync()
							);
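A self-contained sketch of Step 4, assuming reactor-core 3.x. queryAsync() and getAsync() are hypothetical non-blocking replacements for db.query() and rest.get(), simulated with 300 ms delays. Because zip() subscribes to both at once, the two delays overlap.

```java
import java.time.Duration;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

public class RefactoringDemo {

    // Hypothetical non-blocking replacements for db.query() and rest.get()
    static Mono<String> queryAsync() {
        return Mono.delay(Duration.ofMillis(300)).thenReturn("person");
    }

    static Mono<String> getAsync() {
        return Mono.delay(Duration.ofMillis(300)).thenReturn("user");
    }

    // Step 4: no block(); the caller (e.g. a WebFlux controller) subscribes
    static Mono<Tuple2<String, String>> load() {
        return Mono.zip(queryAsync(), getAsync());
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        Tuple2<String, String> t = load().block(Duration.ofSeconds(1)); // demo only
        long millis = (System.nanoTime() - start) / 1_000_000;
        // both delays overlap, so expect roughly 300 ms rather than 600 ms
        System.out.println(t.getT1() + " " + t.getT2() + " after " + millis + " ms");
    }
}
```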
						
					

Reactive databases

Transactions

Reactive MongoDB

					
						//finite
						Flux<Document> findAll();
						
						//infinite
						@Tailable
						Flux<Document> streamAll();
						
					
				

ReactorDebugAgent

					
						import reactor.tools.agent.ReactorDebugAgent;

						@BeforeClass
						public static void setup() {
							ReactorDebugAgent.init();
							ReactorDebugAgent.processExistingClasses();
						}
		
					
				

BlockHound

					
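						BlockHound.install();
						Flux
								.interval(Duration.ofMillis(10), Schedulers.parallel())
								.map(x -> {
									try {
										TimeUnit.SECONDS.sleep(1);  //blocking on a non-blocking thread
									} catch (InterruptedException e) {
										throw new IllegalStateException(e);
									}
									return x;
								})
								.subscribe();  //BlockHound raises BlockingOperationError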
					
				

When to use WebFlux?

  • proxy/gateway/edge/API mashup type of services
  • Backend for frontend
  • crawler, fetching data from tons of sources
  • ETL, batch processing
  • A lot of open connections (WebSocket, SSE)
  • A lot of traffic

Micronaut

When returning a reactive type, Micronaut subscribes to the returned reactive type on the same thread as the request (a Netty Event Loop thread). [...] if you perform any blocking operations, you offload those operations to an appropriately configured thread pool, for example using the Project Reactor [...] subscribeOn(..) facility or @ExecuteOn.

Performance benchmarking

  • JMeter
  • Gatling
  • ab
  • wrk and wrk2

Reference materials

Beyond

Coroutines

Miscellaneous

My articles

Presentations

Other tools