Small scale stream processing kata. Part 1: thread pools
A system delivers around one thousand events per second. Each Event has at least two attributes:

- clientId - we expect up to a few events per second for one client
- UUID - globally unique

Consuming one event takes about 10 milliseconds. Design a consumer of such a stream that:

- allows processing events in real time
- events related to one client should be processed sequentially and in order, i.e. you can not parallelize events for the same clientId
- if a duplicated UUID appeared within 10 seconds, drop it. Assume duplicates will not appear after 10 seconds
There are a few important details in these requirements:

- 1000 events/s and 10 ms to consume one event. Clearly we need at least 10 concurrent consumers in order to consume in near real time.
- Events have a natural aggregate ID (clientId). During one second we can expect a few events for a given client and we are not allowed to process them concurrently or out of order.
- We must somehow ignore duplicated messages, most likely by remembering all unique IDs from the last 10 seconds. This gives about 10 thousand UUIDs to keep temporarily.
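The first bullet is really just Little's law: with around 1000 events/s arriving and roughly 10 ms of service time per event, about 1000 × 0.010 = 10 events are in flight at any instant, so at least 10 concurrent consumers are needed. A quick back-of-the-envelope sketch (the class and method names below are mine, purely for illustration):

```java
// Back-of-the-envelope sizing using Little's law: L = lambda * S.
// All names here are illustrative, not part of the kata's API.
class CapacityEstimate {

    /** Minimum number of concurrent consumers needed to keep up. */
    static int requiredConsumers(int eventsPerSecond, int serviceTimeMillis) {
        // Average number of events "in service" at any instant:
        double inFlight = eventsPerSecond * (serviceTimeMillis / 1000.0);
        return (int) Math.ceil(inFlight);
    }

    /** How many UUIDs we must remember to deduplicate over a time window. */
    static int uuidsToRemember(int eventsPerSecond, int windowSeconds) {
        return eventsPerSecond * windowSeconds;
    }

    public static void main(String[] args) {
        System.out.println(requiredConsumers(1000, 10)); // prints 10
        System.out.println(uuidsToRemember(1000, 10));   // prints 10000
    }
}
```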
Naive sequential processing

Let's tackle this problem in iterations. First we must make some assumptions about the API. Imagine it looks like this:

interface EventStream {

    void consume(EventConsumer consumer);

}

@FunctionalInterface
interface EventConsumer {
    Event consume(Event event);
}

@Value
class Event {

    private final Instant created = Instant.now();
    private final int clientId;
    private final UUID uuid;

}

A typical push-based API, similar to JMS. An important note is that EventConsumer is blocking, meaning it won't deliver a new Event until the previous one was consumed by EventConsumer. This is just an assumption I made that does not drastically change the requirements. This is also how message listeners work in JMS. The naive implementation simply attaches a listener that takes around 10 milliseconds to complete:

class ClientProjection implements EventConsumer {

    @Override
    public Event consume(Event event) {
        Sleeper.randSleep(10, 1);
        return event;
    }

}

Of course in real life this consumer would store something in a database, make a remote call, etc. I add a bit of randomness to the sleep time distribution to make manual testing more realistic:
class Sleeper {

    private static final Random RANDOM = new Random();

    static void randSleep(double mean, double stdDev) {
        final double micros = 1_000 * (mean + RANDOM.nextGaussian() * stdDev);
        try {
            TimeUnit.MICROSECONDS.sleep((long) micros);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

}

//...

EventStream es = new EventStream();  //some real implementation here
es.consume(new ClientProjection());

It compiles and runs, but in order to figure out that the requirements aren't met we must plug in a few metrics. The most important metric is the latency of message consumption, measured as the time between message creation and the start of processing. We'll use Dropwizard Metrics for that:
class ClientProjection implements EventConsumer {

    private final ProjectionMetrics metrics;

    ClientProjection(ProjectionMetrics metrics) {
        this.metrics = metrics;
    }

    @Override
    public Event consume(Event event) {
        metrics.latency(Duration.between(event.getCreated(), Instant.now()));
        Sleeper.randSleep(10, 1);
        return event;
    }

}

The ProjectionMetrics class was extracted to separate responsibilities:

import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Slf4jReporter;
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

@Slf4j
class ProjectionMetrics {

    private final Histogram latencyHist;

    ProjectionMetrics(MetricRegistry metricRegistry) {
        final Slf4jReporter reporter = Slf4jReporter.forRegistry(metricRegistry)
                .outputTo(log)
                .convertRatesTo(TimeUnit.SECONDS)
                .convertDurationsTo(TimeUnit.MILLISECONDS)
                .build();
        reporter.start(1, TimeUnit.SECONDS);
        latencyHist = metricRegistry.histogram(MetricRegistry.name(ProjectionMetrics.class, "latency"));
    }

    void latency(Duration duration) {
        latencyHist.update(duration.toMillis());
    }

}

Now when you run the naive solution, you'll quickly discover that the median latency as well as the 99.9th percentile keep growing without bound:
type=HISTOGRAM, [...] count=84, min=0, max=795, mean=404.88540608274104, [...]
    median=414.0, p75=602.0, p95=753.0, p98=783.0, p99=795.0, p999=795.0

type=HISTOGRAM, [...] count=182, min=0, max=1688, mean=861.1706371990878, [...]
    median=869.0, p75=1285.0, p95=1614.0, p98=1659.0, p99=1678.0, p999=1688.0

[...30 seconds later...]

type=HISTOGRAM, [...] count=2947, min=14, max=26945, mean=15308.138585757424, [...]
    median=16150.0, p75=21915.0, p95=25978.0, p98=26556.0, p99=26670.0, p999=26945.0

After 30 seconds our application processes events with an average delay of 15 seconds. Not entirely real time. Obviously the complete lack of concurrency is the reason. Our ClientProjection event consumer takes around 10 ms to complete, so it can handle up to 100 events per second, whereas we need an order of magnitude more. We must scale ClientProjection somehow. And we haven't even touched the other requirements!

Naive thread pool
The most obvious solution is to invoke EventConsumer from multiple threads. The easiest way to do this is by taking advantage of ExecutorService:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class NaivePool implements EventConsumer, Closeable {

    private final EventConsumer downstream;
    private final ExecutorService executorService;

    NaivePool(int size, EventConsumer downstream) {
        this.executorService = Executors.newFixedThreadPool(size);
        this.downstream = downstream;
    }

    @Override
    public Event consume(Event event) {
        executorService.submit(() -> downstream.consume(event));
        return event;
    }

    @Override
    public void close() throws IOException {
        executorService.shutdown();
    }

}

We use the decorator pattern here. The original ClientProjection, implementing EventConsumer, was correct. However we wrap it with another implementation of EventConsumer that adds concurrency. This allows us to compose complex behaviors without changing ClientProjection itself. Such a design promotes:

- loose coupling: various EventConsumers don't know about each other and can be combined freely
- single responsibility: each does one job and delegates to the next component
- open/closed principle: we can change the behavior of the system without modifying existing implementations
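To make the composition idea concrete, here is a minimal, dependency-free sketch of the same decorator shape. The Event and EventConsumer types are stripped-down stand-ins for the article's types, and CountingConsumer is a hypothetical extra decorator of my own, added only to show how one concern wraps another:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of the decorator composition described above.
// All types here are simplified stand-ins, not the article's real classes.
class DecoratorDemo {

    interface EventConsumer {
        Event consume(Event event);
    }

    static class Event {
        final int clientId;
        Event(int clientId) { this.clientId = clientId; }
    }

    // Decorator: adds exactly one concern (counting), then delegates unchanged.
    static class CountingConsumer implements EventConsumer {
        final AtomicInteger seen = new AtomicInteger();
        final EventConsumer downstream;

        CountingConsumer(EventConsumer downstream) {
            this.downstream = downstream;
        }

        @Override
        public Event consume(Event event) {
            seen.incrementAndGet();           // this decorator's single job
            return downstream.consume(event); // delegate to the next stage
        }
    }

    static int countAfterConsuming(int events) {
        EventConsumer terminal = event -> event;  // stand-in "business logic"
        CountingConsumer counting = new CountingConsumer(terminal);
        for (int i = 0; i < events; i++) {
            counting.consume(new Event(i));
        }
        return counting.seen.get();
    }

    public static void main(String[] args) {
        System.out.println(countAfterConsuming(5)); // prints 5
    }
}
```

Because each decorator only knows about the EventConsumer interface, any of them can be inserted, removed, or reordered without touching the others — exactly the property the bullet points above describe.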
MetricRegistry metricRegistry =
        new MetricRegistry();
ProjectionMetrics metrics =
        new ProjectionMetrics(metricRegistry);
ClientProjection clientProjection =
        new ClientProjection(metrics);
NaivePool naivePool =
        new NaivePool(10, clientProjection);
EventStream es = new EventStream();
es.consume(naivePool);

Our carefully crafted metrics reveal that the situation is indeed much better:

type=HISTOGRAM, count=838, min=1, max=422, mean=38.80768197277468, [...]
    median=37.0, p75=45.0, p95=51.0, p98=52.0, p99=52.0, p999=422.0

type=HISTOGRAM, count=1814, min=1, max=281, mean=47.82642776789085, [...]
    median=51.0, p75=57.0, p95=61.0, p98=62.0, p99=63.0, p999=65.0

[...30 seconds later...]

type=HISTOGRAM, count=30564, min=5, max=3838, mean=364.2904915942238, [...]
    median=352.0, p75=496.0, p95=568.0, p98=574.0, p99=1251.0, p999=3531.0

Yet we still see a growing delay, on a much smaller scale: after 30 seconds the latency reached 364 milliseconds. It keeps growing, so the problem is systematic. We... need... more... metrics. Notice that NaivePool (you'll soon see why it's naive) has exactly 10 threads at its disposal. This should be just about enough to handle a thousand events per second, each taking 10 ms to process. In reality we need a little bit of extra processing power to avoid issues after garbage collection or during small load spikes. To prove that the thread pool is actually our bottleneck, it's best to monitor its internal queue. This requires a little bit of work:

class NaivePool implements EventConsumer, Closeable {

    private final EventConsumer downstream;
    private final ExecutorService executorService;

    NaivePool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        String name = MetricRegistry.name(ProjectionMetrics.class, "queue");
        Gauge<Integer> gauge = queue::size;
        metricRegistry.register(name, gauge);
        this.executorService =
                new ThreadPoolExecutor(
                        size, size, 0L, TimeUnit.MILLISECONDS, queue);
        this.downstream = downstream;
    }

    @Override
    public Event consume(Event event) {
        executorService.submit(() -> downstream.consume(event));
        return event;
    }

    @Override
    public void close() throws IOException {
        executorService.shutdown();
    }

}

The idea here is to create a ThreadPoolExecutor manually in order to provide a custom LinkedBlockingQueue instance. We can later use that queue to monitor its length (see: ExecutorService - 10 tips and tricks). The Gauge will periodically invoke queue::size and report it wherever you need it. Metrics confirm that the thread pool size was indeed a problem:

type=GAUGE, name=[...].queue, value=35
type=GAUGE, name=[...].queue, value=52

[...30 seconds later...]

type=GAUGE, name=[...].queue, value=601

The ever-growing size of the queue holding pending tasks hurts latency. Increasing the thread pool size from 10 to 20 finally reports decent results and no stalls. However we still haven't addressed duplicates, nor protected against concurrent modification of events for the same clientId.

Obscure locking
Let's start by avoiding concurrent processing of events for the same clientId. If two events come very quickly one after another, both related to the same clientId, NaivePool will pick both of them and start processing them concurrently. First we'll at least discover such a situation by having a Lock for each clientId:

@Slf4j
class FailOnConcurrentModification implements EventConsumer {

    private final ConcurrentMap<Integer, Lock> clientLocks = new ConcurrentHashMap<>();
    private final EventConsumer downstream;

    FailOnConcurrentModification(EventConsumer downstream) {
        this.downstream = downstream;
    }

    @Override
    public Event consume(Event event) {
        Lock lock = findClientLock(event);
        if (lock.tryLock()) {
            try {
                downstream.consume(event);
            } finally {
                lock.unlock();
            }
        } else {
            log.error("Client {} already being modified by another thread", event.getClientId());
        }
        return event;
    }

    private Lock findClientLock(Event event) {
        return clientLocks.computeIfAbsent(
                event.getClientId(),
                clientId -> new ReentrantLock());
    }

}

This is definitely going in the wrong direction. The amount of complexity is overwhelming, but running this code at least reveals there is an issue. The event processing pipeline looks as follows, with one decorator wrapping another:

ClientProjection clientProjection =
        new ClientProjection(new ProjectionMetrics(metricRegistry));
FailOnConcurrentModification failOnConcurrentModification =
        new FailOnConcurrentModification(clientProjection);
NaivePool naivePool =
        new NaivePool(10, failOnConcurrentModification, metricRegistry);
EventStream es = new EventStream();

es.consume(naivePool);

Once in a while the error message will pop up, telling us that some other thread is already processing an event for the same clientId. With each clientId we associate a Lock that we examine in order to figure out whether another thread is processing that client at the moment. As ugly as it gets, we are actually quite close to a brutal solution. Rather than failing when the Lock cannot be obtained because another thread is already processing some event, let's wait a little bit, hoping the Lock will get released:

@Slf4j
class WaitOnConcurrentModification implements EventConsumer {

    private final ConcurrentMap<Integer, Lock> clientLocks = new ConcurrentHashMap<>();
    private final EventConsumer downstream;
    private final Timer lockWait;

    WaitOnConcurrentModification(EventConsumer downstream, MetricRegistry metricRegistry) {
        this.downstream = downstream;
        lockWait = metricRegistry.timer(MetricRegistry.name(WaitOnConcurrentModification.class, "lockWait"));
    }

    @Override
    public Event consume(Event event) {
        try {
            final Lock lock = findClientLock(event);
            final Timer.Context time = lockWait.time();
            final boolean locked = lock.tryLock(1, TimeUnit.SECONDS);
            time.stop();
            if (locked) {
                try {
                    downstream.consume(event);
                } finally {
                    //only unlock if the lock was actually acquired,
                    //otherwise unlock() throws IllegalMonitorStateException
                    lock.unlock();
                }
            }
        } catch (InterruptedException e) {
            log.warn("Interrupted", e);
        }
        return event;
    }

    private Lock findClientLock(Event event) {
        return clientLocks.computeIfAbsent(
                event.getClientId(),
                clientId -> new ReentrantLock());
    }

}

The idea is very similar. But instead of failing, tryLock() waits up to 1 second, hoping the Lock for the given client will be released. If two events come in very quick succession, one will obtain the Lock and proceed, whereas the other will block, waiting for unlock() to happen.

Not only is this code really convoluted, it is probably also broken in many subtle ways. For example, what if two events for the same clientId came almost exactly at the same time, but obviously one was first? Both events will ask for the Lock at the same time and we have no guarantee which event will obtain the non-fair Lock first, possibly consuming events out of order. There must be a better way...

Dedicated threads
Let's take a step back and a very deep breath. How do you ensure things aren't happening concurrently? Well, just use one thread! As a matter of fact that's what we did in the very beginning, but the throughput was unsatisfactory. But we don't care about concurrency across different clientIds; we just have to make sure events with the same clientId are always processed by the same thread!

Maybe creating a map from clientId to Thread comes to your mind? Well, this would be overly simplistic. We would create thousands of threads, each idle most of the time as per the requirements (only a few events per second for a given clientId). A good compromise is a fixed-size pool of threads, each thread responsible for a well-known subset of clientIds. This way two different clientIds may end up on the same thread, but the same clientId will always be handled by the same thread. If two events for the same clientId appear, they will both be routed to the same thread, thus avoiding concurrent processing. The implementation is embarrassingly simple:

class SmartPool implements EventConsumer, Closeable {

    private final List<ExecutorService> threadPools;
    private final EventConsumer downstream;

    SmartPool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {
        this.downstream = downstream;
        List<ExecutorService> list = IntStream
                .range(0, size)
                .mapToObj(i -> Executors.newSingleThreadExecutor())
                .collect(Collectors.toList());
        this.threadPools = new CopyOnWriteArrayList<>(list);
    }

    @Override
    public void close() throws IOException {
        threadPools.forEach(ExecutorService::shutdown);
    }

    @Override
    public Event consume(Event event) {
        final int threadIdx = event.getClientId() % threadPools.size();
        final ExecutorService executor = threadPools.get(threadIdx);
        executor.submit(() -> downstream.consume(event));
        return event;
    }

}

The crucial part is right at the end:

final int threadIdx = event.getClientId() % threadPools.size();
final ExecutorService executor = threadPools.get(threadIdx);

This simple algorithm will always use the same single-threaded ExecutorService for the same clientId. Different IDs may end up in the same pool; for example, when the pool size is 20, clients 7, 27, 47, etc. will use the same thread. But this is OK, as long as one clientId always uses the same thread. At this point no locking is necessary and sequential invocation is guaranteed, because events for the same client are always executed by the same thread. Side note: one thread per clientId would not scale, but one actor per clientId (e.g. in Akka) is a great idea that simplifies a lot.

By the way, to be extra safe I plugged in metrics for the average queue size in each and every thread pool, which made the implementation longer:
class SmartPool implements EventConsumer, Closeable {

    private final List<LinkedBlockingQueue<Runnable>> queues;
    private final List<ExecutorService> threadPools;
    private final EventConsumer downstream;

    SmartPool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {
        this.downstream = downstream;
        this.queues = IntStream
                .range(0, size)
                .mapToObj(i -> new LinkedBlockingQueue<Runnable>())
                .collect(Collectors.toList());
        List<ThreadPoolExecutor> list = queues
                .stream()
                .map(q -> new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, q))
                .collect(Collectors.toList());
        this.threadPools = new CopyOnWriteArrayList<>(list);
        metricRegistry.register(MetricRegistry.name(ProjectionMetrics.class, "queue"), (Gauge<Double>) this::averageQueueLength);
    }

    private double averageQueueLength() {
        double totalLength =
                queues
                        .stream()
                        .mapToDouble(LinkedBlockingQueue::size)
                        .sum();
        return totalLength / queues.size();
    }

    //...

}

If you are paranoid you can even create one metric per queue.
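The routing invariant behind SmartPool — the same clientId always lands on the same thread, while different clientIds may legitimately share one — can be checked in isolation. The sketch below mirrors the modulo line from consume(); the class and method names are mine, purely for illustration:

```java
// Sanity check of SmartPool's routing: clientId % poolSize picks the executor.
// RoutingDemo and threadIdx are illustrative names, not part of the article's code.
class RoutingDemo {

    static int threadIdx(int clientId, int poolSize) {
        return clientId % poolSize;
    }

    public static void main(String[] args) {
        // The same clientId always maps to the same single-threaded executor...
        System.out.println(threadIdx(7, 20));   // prints 7
        // ...and different clients may share one thread, which is fine:
        System.out.println(threadIdx(27, 20));  // prints 7
        System.out.println(threadIdx(47, 20));  // prints 7
    }
}
```

One caveat worth knowing: Java's % operator returns a negative result for negative operands, so if clientId could ever be negative, Math.floorMod(clientId, poolSize) is the safer choice for the index.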
Deduplication and idempotency
In a distributed environment it's quite common to receive duplicated events when your producer has at-least-once delivery guarantees. The reasons behind such behavior are beyond the scope of this article, but we must learn how to live with that issue. One way is to attach a globally unique identifier (UUID) to every message and make sure on the consumer side that messages with the same identifier aren't processed twice. Each Event has such a UUID. The most straightforward solution under our requirements is to simply store all seen UUIDs and verify on arrival that the received UUID was never seen before. Using ConcurrentHashMap<UUID, UUID> (there is no ConcurrentHashSet in the JDK) as-is would lead to a memory leak, as we would keep accumulating more and more IDs over time. That's why we only look for duplicates within the last 10 seconds. You could technically have a ConcurrentHashMap<UUID, Instant> that maps from UUID to the timestamp when it was encountered, and use a background thread to remove elements older than 10 seconds. But if you are a happy Guava user, Cache<UUID, UUID> with a declarative eviction policy will do the trick:

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

class IgnoreDuplicates implements EventConsumer {

    private final EventConsumer downstream;

    private final Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder()
            .expireAfterWrite(10, TimeUnit.SECONDS)
            .build();

    IgnoreDuplicates(EventConsumer downstream) {
        this.downstream = downstream;
    }

    @Override
    public Event consume(Event event) {
        final UUID uuid = event.getUuid();
        if (seenUuids.asMap().putIfAbsent(uuid, uuid) == null) {
            return downstream.consume(event);
        } else {
            return event;
        }
    }

}

Once again, to be safe in production, there are at least two metrics I can think of that might become useful: cache size and number of duplicates discovered. Let's plug in these metrics as well:
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

class IgnoreDuplicates implements EventConsumer {

    private final EventConsumer downstream;
    private final Meter duplicates;

    private final Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder()
            .expireAfterWrite(10, TimeUnit.SECONDS)
            .build();

    IgnoreDuplicates(EventConsumer downstream, MetricRegistry metricRegistry) {
        this.downstream = downstream;
        duplicates = metricRegistry.meter(MetricRegistry.name(IgnoreDuplicates.class, "duplicates"));
        metricRegistry.register(MetricRegistry.name(IgnoreDuplicates.class, "cacheSize"), (Gauge<Long>) seenUuids::size);
    }

    @Override
    public Event consume(Event event) {
        final UUID uuid = event.getUuid();
        if (seenUuids.asMap().putIfAbsent(uuid, uuid) == null) {
            return downstream.consume(event);
        } else {
            duplicates.mark();
            return event;
        }
    }

}

Finally we have all the pieces to build our solution. The idea is to compose a pipeline from EventConsumer instances wrapping each other:

- First we apply IgnoreDuplicates to reject duplicates
- Then we call SmartPool that always pins a given clientId to the same thread and executes the next stage in that thread
- Finally ClientProjection is invoked, which does the real business logic

We can optionally place a FailOnConcurrentModification step between SmartPool and ClientProjection for extra safety (concurrent modification shouldn't happen by design):

ClientProjection clientProjection =
        new ClientProjection(new ProjectionMetrics(metricRegistry));
FailOnConcurrentModification concurrentModification =
        new FailOnConcurrentModification(clientProjection);
SmartPool smartPool =
        new SmartPool(12, concurrentModification, metricRegistry);
IgnoreDuplicates withoutDuplicates =
        new IgnoreDuplicates(smartPool, metricRegistry);
EventStream es = new EventStream();

es.consume(withoutDuplicates);

It took us a lot of work to come up with a relatively simple and well-structured (I hope you agree) solution. In the end, the best way to tackle concurrency issues is to... avoid concurrency and run code that is subject to race conditions in one thread. This is also the idea behind Akka actors (a single message processed per actor) and RxJava (one message processed by a Subscriber). In the next installment we will see a declarative solution in RxJava.

See also:
- Small scale stream processing kata. Part 1: thread pools
- Small scale stream processing kata. Part 2: RxJava 1.x/2.x