Small scale stream processing kata. Part 2: RxJava 1.x/2.x
A system delivers around one thousand events per second. Each Event has at least two attributes:
- clientId - we expect up to a few events per second for one client
- UUID - globally unique
We need a consumer of this stream that meets the following requirements:
- allows processing events in real time
- events related to one client should be processed sequentially and in order, i.e. you cannot parallelize events for the same clientId
- if a duplicated UUID appears within 10 seconds, drop it. Assume duplicates will not appear after 10 seconds
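
For reference, the Event described above can be pictured as a small immutable class. This is only a sketch: the constructor shape and the getClientId()/getUuid() accessors are inferred from how Event is used later in this article, while the int type for clientId and everything else here are assumptions. The real class from part 1 may look different.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical sketch of the event; the real class from part 1 may differ.
final class Event {

    private final int clientId; // assumed to be an int, a few events per second per client
    private final UUID uuid;    // globally unique identifier of this event

    Event(int clientId, UUID uuid) {
        this.clientId = clientId;
        this.uuid = Objects.requireNonNull(uuid, "uuid");
    }

    int getClientId() {
        return clientId;
    }

    UUID getUuid() {
        return uuid;
    }

    @Override
    public String toString() {
        return "Event(clientId=" + clientId + ", uuid=" + uuid + ")";
    }
}
```
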
What we came up with so far was a combination of thread pools and a shared cache. This time we will implement the solution using RxJava. First of all, I never revealed how EventStream is implemented, only giving the API:

    interface EventStream {
        void consume(EventConsumer consumer);
    }

In fact, for manual testing I built a simple RxJava stream that behaves like the system from the requirements:

    @Slf4j
    class EventStream {

        void consume(EventConsumer consumer) {
            observe()
                .subscribe(
                    consumer::consume,
                    e -> log.error("Error emitting event", e)
                );
        }

        Observable<Event> observe() {
            return Observable
                .interval(1, TimeUnit.MILLISECONDS)
                .delay(x -> Observable.timer(RandomUtils.nextInt(0, 1_000), TimeUnit.MICROSECONDS))
                .map(x -> new Event(RandomUtils.nextInt(1_000, 1_100), UUID.randomUUID()))
                .flatMap(this::occasionallyDuplicate, 100)
                .observeOn(Schedulers.io());
        }

        private Observable<Event> occasionallyDuplicate(Event x) {
            final Observable<Event> event = Observable.just(x);
            // in roughly 99% of the cases emit the event just once
            if (Math.random() >= 0.01) {
                return event;
            }
            // otherwise emit it again after a random delay
            final Observable<Event> duplicated =
                event.delay(RandomUtils.nextInt(10, 5_000), TimeUnit.MILLISECONDS);
            return event.concatWith(duplicated);
        }
    }

Understanding how this simulator works is not essential, but quite interesting. First we generate a steady stream of Long values (0, 1, 2...) every millisecond (a thousand events per second) using the interval() operator. Then we delay each event by a random amount of time between 0 and 1_000 microseconds with the delay() operator. This way events will appear at less predictable moments in time, a slightly more realistic situation. Finally we map (using, ahem, the map() operator) each Long value to a random Event with clientId somewhere between 1_000 and 1_100 (inclusive-exclusive).

The last bit is interesting. We would like to simulate occasional duplicates. In order to do so we map every event (using flatMap()) to itself (in 99% of the cases). However, in 1% of the cases we return this event twice, where the second occurrence happens between 10 milliseconds and 5 seconds later. In practice the duplicated instance of the event will appear after hundreds of other events, which makes the stream behave really realistically.

There are two ways to interact with the EventStream - callback based via consume() and stream based via observe(). We can take advantage of Observable<Event> to quickly build a processing pipeline very similar in functionality to part 1 but much simpler.

Missing backpressure
The first naive approach to take advantage of RxJava falls short very quickly:

    EventStream es = new EventStream();
    EventConsumer clientProjection = new ClientProjection(
        new ProjectionMetrics(
            new MetricRegistry()));

    es.observe()
        .subscribe(
            clientProjection::consume,
            e -> log.error("Fatal error", e)
        );

(ClientProjection, ProjectionMetrics et al. come from part 1). We get MissingBackpressureException almost instantaneously and that was expected. Remember how our first solution was lagging by handling events with more and more latency? RxJava tries to avoid that, as well as avoiding overflow of queues. MissingBackpressureException is thrown because the consumer (ClientProjection) is incapable of handling events in real time. This is fail-fast behavior. The quickest solution is to move consumption to a separate thread pool, just like before, but using RxJava's facilities:

    EventStream es = new EventStream();
    EventConsumer clientProjection = new FailOnConcurrentModification(
        new ClientProjection(
            new ProjectionMetrics(
                new MetricRegistry())));

    es.observe()
        .flatMap(e -> clientProjection.consume(e, Schedulers.io()))
        .window(1, TimeUnit.SECONDS)
        .flatMap(Observable::count)
        .subscribe(
            c -> log.info("Processed {} events/s", c),
            e -> log.error("Fatal error", e)
        );

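
To make the fail-fast idea mentioned above a bit more tangible, here is a plain Java analogy with no RxJava involved. It is only a sketch and says nothing about how RxJava is implemented internally: FailFastBuffer is a hypothetical name, and the bounded ArrayBlockingQueue merely stands in for "a queue that refuses to overflow". When the consumer does not keep up, the producer gets an exception immediately instead of the latency growing silently.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration only: a bounded hand-off buffer that fails fast.
final class FailFastBuffer<T> {

    private final BlockingQueue<T> queue;

    FailFastBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Producer side: add() throws IllegalStateException as soon as the buffer is full,
    // instead of blocking the producer or growing without bound.
    void emit(T item) {
        queue.add(item);
    }

    // Consumer side: returns the oldest buffered item, or null if nothing is waiting.
    T poll() {
        return queue.poll();
    }

    int size() {
        return queue.size();
    }
}
```

With a capacity of 2, the third emit() without an intervening poll() fails right away, which is the same kind of early, loud signal that the exception above gives us.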
The EventConsumer interface has a helper method that can consume events asynchronously on a supplied Scheduler:

    @FunctionalInterface
    interface EventConsumer {
        Event consume(Event event);

        default Observable<Event> consume(Event event, Scheduler scheduler) {
            return Observable
                .fromCallable(() -> this.consume(event))
                .subscribeOn(scheduler);
        }
    }

By consuming events using flatMap() in a separate Schedulers.io() each consumption is invoked asynchronously. This time events are processed in near real time, but there is a bigger problem. I decorated ClientProjection with FailOnConcurrentModification for a reason. Events are consumed independently from each other so it may happen that two events for the same clientId are processed concurrently. Not good. Luckily in RxJava solving this problem is much easier than with plain threads:

    es.observe()
        .groupBy(Event::getClientId)
        .flatMap(byClient -> byClient
            .observeOn(Schedulers.io())
            .map(clientProjection::consume))
        .window(1, TimeUnit.SECONDS)
        .flatMap(Observable::count)
        .subscribe(
            c -> log.info("Processed {} events/s", c),
            e -> log.error("Fatal error", e)
        );

A little bit has changed. First of all we group events by clientId. This splits the single Observable stream into a stream of streams. Each substream named byClient represents all events related to the same clientId. Now if we map over this substream we can be sure that events related to the same clientId are never processed concurrently. The outer stream is lazy so we must subscribe to it. Rather than subscribing to every event separately we collect events every second and count them. This way we receive a single event of type Integer every second representing the number of events consumed per second.

Impure, non-idiomatic, error-prone, unsafe solution of deduplication using global state
Now we must drop duplicate UUIDs. The simplest, yet very foolish way of discarding duplicates is by taking advantage of global state. We can simply filter out duplicates by looking them up in a cache available outside of the filter() operator:

    final Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder()
        .expireAfterWrite(10, TimeUnit.SECONDS)
        .build();

    es.observe()
        .filter(e -> seenUuids.getIfPresent(e.getUuid()) == null)
        .doOnNext(e -> seenUuids.put(e.getUuid(), e.getUuid()))
        .subscribe(
            clientProjection::consume,
            e -> log.error("Fatal error", e)
        );

If you want to monitor the usage of this mechanism simply add a metric:

    Meter duplicates = metricRegistry.meter("duplicates");

    es.observe()
        .filter(e -> {
            if (seenUuids.getIfPresent(e.getUuid()) != null) {
                duplicates.mark();
                return false;
            } else {
                return true;
            }
        })

Accessing global, especially mutable state from inside of operators is very dangerous and undermines the sole purpose of RxJava - simplifying concurrency. Obviously we use the thread-safe Cache from Guava, but in many cases it's easy to miss places where shared global mutable state is accessed from multiple threads. If you find yourself mutating some variable outside of the operator chain, be very careful.

Custom distinct() operator in RxJava 1.x
RxJava 1.x has a distinct() operator that presumably does the job:

    es.observe()
        .distinct(Event::getUuid)
        .groupBy(Event::getClientId)

Unfortunately distinct() stores all keys (UUIDs) internally in an ever-growing HashSet. But we only care about duplicates in the last 10 seconds! By copy-pasting the implementation of DistinctOperator I created a DistinctEvent operator that takes advantage of Guava's cache to only store the last 10 seconds' worth of UUIDs. I intentionally hard-coded Event in this operator rather than making it more generic to keep the code easier to understand:

    class DistinctEvent implements Observable.Operator<Event, Event> {
        private final Duration duration;

        DistinctEvent(Duration duration) {
            this.duration = duration;
        }

        @Override
        public Subscriber<? super Event> call(Subscriber<? super Event> child) {
            return new Subscriber<Event>(child) {
                // keys expire after the configured duration, so memory stays bounded
                final Map<UUID, Boolean> keyMemory = CacheBuilder.newBuilder()
                    .expireAfterWrite(duration.toMillis(), TimeUnit.MILLISECONDS)
                    .<UUID, Boolean>build().asMap();

                @Override
                public void onNext(Event event) {
                    if (keyMemory.put(event.getUuid(), true) == null) {
                        child.onNext(event);
                    } else {
                        // duplicate dropped: request one more item from upstream to replace it
                        request(1);
                    }
                }

                @Override
                public void onError(Throwable e) {
                    child.onError(e);
                }

                @Override
                public void onCompleted() {
                    child.onCompleted();
                }
            };
        }
    }

The usage is fairly simple and the whole implementation (plus the custom operator) is as short as:

    es.observe()
        .lift(new DistinctEvent(Duration.ofSeconds(10)))
        .groupBy(Event::getClientId)
        .flatMap(byClient -> byClient
            .observeOn(Schedulers.io())
            .map(clientProjection::consume)
        )
        .window(1, TimeUnit.SECONDS)
        .flatMap(Observable::count)
        .subscribe(
            c -> log.info("Processed {} events/s", c),
            e -> log.error("Fatal error", e)
        );

Actually it can be even shorter if you skip logging every second:

    es.observe()
        .lift(new DistinctEvent(Duration.ofSeconds(10)))
        .groupBy(Event::getClientId)
        .flatMap(byClient -> byClient
            .observeOn(Schedulers.io())
            .map(clientProjection::consume)
        )
        .subscribe(
            e -> {},
            e -> log.error("Fatal error", e)
        );

This solution is much shorter than the previous one based on thread pools and decorators. The only awkward part is the custom operator that avoids a memory leak when storing too many historic UUIDs. Luckily RxJava 2 to the rescue!

RxJava 2.x and more powerful built-in distinct()
I was actually this close to submitting a PR to RxJava with a more powerful implementation of the distinct() operator. But before doing so I checked the 2.x branch and there it was: distinct() that allows providing a custom Collection as opposed to the hard-coded HashSet. Believe it or not, dependency inversion is not only about Spring framework or Java EE. When a library allows you to provide a custom implementation of its internal data structure, this is also DI. First I create a helper method that can build a Set<UUID> backed by a Map<UUID, Boolean> backed by a Cache<UUID, Boolean>. We sure like delegation!

    private Set<UUID> recentUuids() {
        return Collections.newSetFromMap(
            CacheBuilder.newBuilder()
                .expireAfterWrite(10, TimeUnit.SECONDS)
                .<UUID, Boolean>build()
                .asMap()
        );
    }

Having this method we can implement the whole task using this expression:

    es.observe()
        .distinct(Event::getUuid, this::recentUuids)
        .groupBy(Event::getClientId)
        .flatMap(byClient -> byClient
            .observeOn(Schedulers.io())
            .map(clientProjection::consume)
        )
        .subscribe(
            e -> {},
            e -> log.error("Fatal error", e)
        );

The elegance, the simplicity, the clarity! It reads almost like the problem itself:
- observe a stream of events
- take only distinct UUIDs into account
- group events by client
- for each client consume them (sequentially)
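
The four steps above can also be mimicked on a plain in-memory list, which may help to see what each stage contributes. This is a rough sketch under stated assumptions and not a replacement for the RxJava pipeline: PipelineSketch and TimedEvent are hypothetical names, arrival times are passed in explicitly (so the example is deterministic), and nothing here is concurrent.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

final class PipelineSketch {

    // Hypothetical event with an explicit arrival time, used only by this sketch.
    static final class TimedEvent {
        final int clientId;
        final UUID uuid;
        final long arrivalMillis;

        TimedEvent(int clientId, UUID uuid, long arrivalMillis) {
            this.clientId = clientId;
            this.uuid = uuid;
            this.arrivalMillis = arrivalMillis;
        }
    }

    // Steps 1-2: walk events in arrival order, keeping only UUIDs not written within the window.
    static List<TimedEvent> distinctWithin(List<TimedEvent> events, long windowMillis) {
        Map<UUID, Long> lastWrite = new HashMap<>(); // uuid -> time of the most recent write
        List<TimedEvent> result = new ArrayList<>();
        for (TimedEvent e : events) {
            // Forget entries whose last write is at least windowMillis old.
            lastWrite.values().removeIf(t -> e.arrivalMillis - t >= windowMillis);
            // put() returns null only when the UUID was not remembered; a duplicate refreshes its entry.
            if (lastWrite.put(e.uuid, e.arrivalMillis) == null) {
                result.add(e);
            }
        }
        return result;
    }

    // Steps 3-4: group by clientId; inside each group events keep their arrival order,
    // so consuming one group front to back is sequential and in order for that client.
    static Map<Integer, List<TimedEvent>> groupByClient(List<TimedEvent> events) {
        Map<Integer, List<TimedEvent>> byClient = new LinkedHashMap<>();
        for (TimedEvent e : events) {
            byClient.computeIfAbsent(e.clientId, id -> new ArrayList<>()).add(e);
        }
        return byClient;
    }
}
```

Calling groupByClient(distinctWithin(events, 10_000)) gives one ordered list per client with recent duplicates removed. What the RxJava version adds on top is that the groups are consumed concurrently with each other, while each single group is still processed sequentially.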
See also:
- Small scale stream processing kata. Part 1: thread pools
- Small scale stream processing kata. Part 2: RxJava 1.x/2.x