Reactive emoji tracker with WebClient and Reactor: aggregating data
The raw stream of server-sent events looks like this:

    data:{"1F60D":1}
    data:{"1F3A8":1,"1F48B":1,"1F499":1,"1F602":1,"2764":1}
    data:{"1F607":1,"2764":2}

Each message represents the number of various emojis that appeared on Twitter since the previous message. After a few transformations, we got a stream of hexadecimal Unicode values for each emoji. E.g. for {"1F607":1,"2764":2} we produce three events: "1F607", "2764", "2764". This is how we achieved it:

    import java.util.Map;
    import org.springframework.http.codec.ServerSentEvent;
    import org.springframework.web.reactive.function.client.WebClient;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    final Flux<String> stream = WebClient
            .create("http://emojitrack-gostreamer.herokuapp.com")
            .get().uri("/subscribe/eps")
            .retrieve()
            .bodyToFlux(ServerSentEvent.class)
            // skip events that carry no data payload
            .flatMap(e -> Mono.justOrEmpty(e.data()))
            // each payload maps an emoji code to its count
            .map(x -> (Map<String, Integer>) x)
            .flatMapIterable(Map::entrySet)
            // repeat each code according to its count
            .flatMap(entry -> Flux.just(entry.getKey()).repeat(entry.getValue()));

Our next goal is to show the top 50 most popular emojis since we started the application. But first let's convert these boring Unicode hexadecimal values into, you know, emojis! This is pretty simple:

    String hexToEmoji(String hex) {
        // parse the hexadecimal code point and convert it to UTF-16 chars
        return new String(Character.toChars(Integer.parseInt(hex, 16)));
    }

Seems to work:

    @Unroll
    class EmojiTrackerSpec extends Specification {

        def 'translate #hex to #emoji'() {
            expect:
                hexToEmoji(hex) == emoji
            where:
                hex     || emoji
                '2611'  || '☑'
                '263A'  || '☺'
                '2764'  || '❤'
                '1F440' || '👀'
                '1F49E' || '💞'
                '1F605' || '😅'
                '1F60A' || '😊'
                '1F60D' || '😍'
                '1F60E' || '😎'
                '1F60F' || '😏'
                '1F61E' || '😞'
                '1F62D' || '😭'
                '1F646' || '🙆'
                '1F6B6' || '🚶'
        }

    }

Apparently, this is the weirdest test case I ever wrote. Let's plug in hexToEmoji():

    final Flux<String> stream = WebClient
            //...see above for missing lines...
            .map(ServerSentEvent::data)
            .map(x -> (Map<String, Integer>) x)
            .flatMapIterable(Map::entrySet)
            .flatMap(entry -> Flux.just(entry.getKey()).repeat(entry.getValue()))
            .map(Tracker::hexToEmoji);

My terminal exploded with happy faces, hearts and other emojis:

    Received: 🎥
    Received: 💙
    Received: 😍
    Received: 🚑
    Received: 😂
    Received: 😒
    Received: 🎉
    Received: 😉
    Received: ❤
    Received: ⚽
    Received: 👐
    Received: 😍
    Received: ♻
    Received: ♻
    Received: 💙
    Received: 🔥
    Received: 😂
    Received: 😅
    Received: 😘
    Received: 💪
    Received: 😉
    Received: ♻
    Received: 😪
    Received: 😃
    Received: 🙏
    Received: 💔
    Received: 😂
    Received: 😍
    Received: 🎶
    Received: 🎹
    Received: 👍
    Received: 🔥
    Received: 😎

Then it exploded for real with:

    NumberFormatException: For input string: "1F1F5-1F1F1"

Turns out some emojis are bigger than others. For example, two individual characters, 🇵 and 🇱, when placed next to each other (🇵🇱) may be rendered as a flag. The Polish flag in this case. A single emoji formed from two emojis. We need to enhance our parsing logic by parsing each hexadecimal number separated by a dash (-) individually and concatenating the resulting characters. To be honest I started with something quite complex:

    import java.util.Arrays;
    import static java.util.stream.Collectors.joining;

    private static String codeToEmoji(String hex) {
        // convert every dash-separated code point and glue the results together
        return Arrays
                .stream(hex.split("-"))
                .map(Tracker::hexToEmoji)
                .collect(joining());
    }

    private static String hexToEmoji(String hex) {
        return new String(Character.toChars(Integer.parseInt(hex, 16)));
    }

But it turns out the more straightforward solution is both more readable and most likely faster:

    private static String codeToEmoji(String hex) {
        final String[] codes = hex.split("-");
        if (codes.length == 2) {
            // two code points, e.g. a flag built from two regional indicators
            return hexToEmoji(codes[0]) + hexToEmoji(codes[1]);
        } else {
            return hexToEmoji(hex);
        }
    }

Maybe not as impressive, but I like it more. A few more test cases and we are free to go:

    '1F1F5-1F1F1' || '🇵🇱'
    '1F1FA-1F1E6' || '🇺🇦'
    '1F1FA-1F1F8' || '🇺🇸'

OK, we are finally ready to aggregate individual events. We must somehow aggregate individual emojis into some sort of histogram (occurrence map). Basically, we want a Map<String, Long> of all emojis since the very beginning. The worst way to do this is global, mutable state:

    final ConcurrentHashMap<String, Long> histogram = new ConcurrentHashMap<>();

    final Flux<String> stream = WebClient
            //...see above for missing lines...
            .map(Tracker::codeToEmoji)
            // side effect: update the shared map for every emoji
            .doOnNext(emoji -> histogram.merge(emoji, 1L, Math::addExact));

If you are still not aware of the Map.merge() method, it does something quite common that can be expressed like this:

    if (histogram.containsKey(emoji)) {
        histogram.put(emoji, Math.addExact(histogram.get(emoji), 1L));
    } else {
        histogram.put(emoji, 1L);
    }

After five seconds the histogram
may look for example like this:{💸=1, ☀=1, ☁=1, ✅=2, ⬅=1, ✈=1, 💯=3, 🚮=1, ✋=2, ✌=2, 💲=1, 🚨=1, 💨=1, 🏆=1, 🚧=1, 💥=1, ✔=4, ☕=1, 💪=2, 🎼=1, 💡=1, 🏀=1, 📚=1, ✨=7, 📅=1, 📌=2, 🏨=1, ☺=6, ‼=2, 📷=5, 🌚=2, 📹=3, 📰=1, 🌍=1, 🌆=1, 🌊=1, ❗=1, 📞=2, 📝=1, 🇺🇸=2, 😘=5, 🌷=1, 😖=2, 😕=2, ❤=42, 😜=3, ♥=7, 😛=1, ♦=1, 🌹=3, 😚=1, 🌸=2, 😙=1, 🐐=2, 😐=2, 😏=11, 😎=2, 😍=12, 🔔=1, 😔=1, 🐓=1, 🌱=1, 😒=8, 🐒=1, 🔑=1, 😑=4, 🔈=1, 😈=1, 😇=2, 😆=2, 🐆=1, 😅=3, 😌=3, 😋=2, 😊=11, 😉=9, 🔉=1, 🌟=2, 😀=4, 🌞=2, ♻=9, 😄=8, 😃=1, 😂=75, 😁=9, 😸=1, 🐶=1, 😶=1, 😵=1, 😻=3, 😰=1, 😮=2, 😭=18, 🔴=2, 😴=3, 😳=1, 🐳=1, 😲=2, 😱=4, 😨=2, 🍄=1, 🔥=4, 😥=4, 😬=2, 🔫=1, 😫=2, ➕=1, 😩=7, 🌿=1, 😠=1, 😞=3, 🔞=1, 😝=2, 🌼=1, 😤=1, 😣=1, 😢=5, 🍁=1, 😡=4, ⚠=1, ➡=4, ⚡=2, 🍺=1, ©=1, 👏=6, 🙏=2, 👎=1, 👍=10, 👓=1, ®=1, 👑=2, 🙈=4, 👇=3, 🍫=1, 🙌=5, 👌=2, 👋=2, 🙋=1, 🙊=2, ▶=2, 👊=1, 👉=2, 👀=3, ⚽=3, ◀=1, 9⃣=1, 🆒=1, 🎉=2, 💘=3, 🎶=2, 💗=2, 🚗=1, 🚖=2, 🎵=1, 💕=6, 💜=2, 💛=7, 💙=8, 💎=1, 🎬=1, 💔=11, 🎲=1, 💓=2, 🎧=2, 💋=5, 💀=4, 💄=1, 💃=1}
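Before moving on, here is a minimal, self-contained sketch for trying Map.merge() in isolation. It counts a small hand-written sample with merge() and with the expanded if/else form and checks that both agree. The class name MergeDemo, the method names and the sample list are made up for illustration, and a plain HashMap is used because no concurrency is involved here.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MergeDemo {

    // Counts occurrences with Map.merge(): stores 1L for a new key,
    // otherwise combines the old value and 1L with Math.addExact.
    static Map<String, Long> countWithMerge(List<String> emojis) {
        Map<String, Long> histogram = new HashMap<>();
        for (String emoji : emojis) {
            histogram.merge(emoji, 1L, Math::addExact);
        }
        return histogram;
    }

    // The same logic spelled out with containsKey/get/put.
    static Map<String, Long> countExpanded(List<String> emojis) {
        Map<String, Long> histogram = new HashMap<>();
        for (String emoji : emojis) {
            if (histogram.containsKey(emoji)) {
                histogram.put(emoji, Math.addExact(histogram.get(emoji), 1L));
            } else {
                histogram.put(emoji, 1L);
            }
        }
        return histogram;
    }

    public static void main(String[] args) {
        // hypothetical sample input, not taken from the live stream
        List<String> sample = Arrays.asList("😂", "❤", "😂", "😍", "😂");
        System.out.println(countWithMerge(sample));
        // both variants build equal maps, so this prints true
        System.out.println(countWithMerge(sample).equals(countExpanded(sample)));
    }
}
```

The merge() variant is shorter and does the lookup and update in one call, which is why the stream code above prefers it.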
Within 5 seconds the 😂 emoji was sent to Twitter 75 times! So why is this solution bad? Modifying global mutable state from within your reactive stream invites race conditions and synchronization problems. A much better solution is to aggregate events within the stream itself. It's a bit mind-bending. Basically, we turn a stream of individual events into a stream of a gradually built aggregate. Every event is applied to our histogram, and the updated histogram is passed further downstream. Look:

    final Flux<HashMap<String, Long>> stream = WebClient
            //...see above for missing lines...
            .map(Tracker::codeToEmoji)
            // fold every emoji into the histogram and emit the running result
            .scan(new HashMap<String, Long>(), (histogram, emoji) -> {
                histogram.merge(emoji, 1L, Math::addExact);
                return histogram;
            });

See how a single emoji in the input stream (for example "💖") results in a histogram of "{💖=1}" on the output stream:

    💖 -> {💖=1}
    🔝 -> {💖=1, 🔝=1}
    😂 -> {💖=1, 🔝=1, 😂=1}
    👀 -> {💖=1, 👀=1, 🔝=1, 😂=1}
    😍 -> {💖=1, 👀=1, 😍=1, 🔝=1, 😂=1}
    😂 -> {💖=1, 👀=1, 😍=1, 🔝=1, 😂=2}
    😀 -> {💖=1, 😀=1, 👀=1, 😍=1, 🔝=1, 😂=2}
    😂 -> {💖=1, 😀=1, 👀=1, 😍=1, 🔝=1, 😂=3}

Notice how each individual emoji is either added to the map or increments an existing entry. Theoretically, the occurrence map (histogram) can grow quite large. However, the number of different emojis is fixed and not that large (2666 as of this writing).

Now we'd like to find the top 50 emojis, that is, the 50 map entries with the highest occurrence count. This can easily be done with the JDK 8 Stream API:

    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.Map;
    import static java.util.Comparator.comparing;
    import static java.util.stream.Collectors.joining;

    String topEmojis(HashMap<String, Long> histogram, int max) {
        return histogram
                .entrySet()
                .stream()
                // highest occurrence count first
                .sorted(comparing(Map.Entry<String, Long>::getValue).reversed())
                .limit(max)
                .map(Map.Entry::getKey)
                .collect(joining(" "));
    }

In the end we generate a String containing the top 50 emojis, separated by spaces. We don't want to see the outcome after each and every emoji. Instead, let's sample the results 10 times a second:

    final Flux<String> stream = WebClient
            //...see above for missing lines...
            .scan(new HashMap<String, Long>(), (histogram, emoji) -> {
                histogram.merge(emoji, 1L, Math::addExact);
                return histogram;
            })
            .map(hist -> topEmojis(hist, 50))
            // keep only the latest value in every 100 ms window
            .sample(Duration.ofMillis(100));

The full source code looks as follows:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.http.codec.ServerSentEvent;
    import org.springframework.web.reactive.function.client.WebClient;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    import java.time.Duration;
    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    import static java.util.stream.Collectors.joining;
    import static java.util.Comparator.comparing;

    public class Tracker {

        private static final Logger log = LoggerFactory.getLogger(Tracker.class);

        public static void main(String[] args) throws InterruptedException {
            final Flux<String> stream = WebClient
                    .create("http://emojitrack-gostreamer.herokuapp.com")
                    .get().uri("/subscribe/eps")
                    .retrieve()
                    .bodyToFlux(ServerSentEvent.class)
                    .flatMap(e -> Mono.justOrEmpty(e.data()))
                    .map(x -> (Map<String, Integer>) x)
                    .flatMapIterable(Map::entrySet)
                    .flatMap(entry -> Flux.just(entry.getKey()).repeat(entry.getValue()))
                    .map(Tracker::codeToEmoji)
                    .scan(new HashMap<String, Long>(), (histogram, emoji) -> {
                        histogram.merge(emoji, 1L, Math::addExact);
                        return histogram;
                    })
                    .map(hist -> topEmojis(hist, 50))
                    .sample(Duration.ofMillis(100));

            stream.subscribe(sse -> log.info("Received: {}", sse));

            // keep the JVM alive while the stream is being consumed
            TimeUnit.MINUTES.sleep(10);
        }

        private static String topEmojis(HashMap<String, Long> histogram, int max) {
            return histogram
                    .entrySet()
                    .stream()
                    .sorted(comparing(Map.Entry<String, Long>::getValue).reversed())
                    .limit(max)
                    .map(Map.Entry::getKey)
                    .collect(joining(" "));
        }

        private static String codeToEmoji(String hex) {
            final String[] codes = hex.split("-");
            if (codes.length == 2) {
                return hexToEmoji(codes[0]) + hexToEmoji(codes[1]);
            } else {
                return hexToEmoji(hex);
            }
        }

        private static String hexToEmoji(String hex) {
            return new String(Character.toChars(Integer.parseInt(hex, 16)));
        }

    }

And the results are adorable.
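To experiment with the parsing, aggregation and ranking steps without connecting to the live stream, a Reactor-free sketch along the following lines may help. It is not the article's pipeline: a plain loop stands in for scan(), the input list is hand-written, and the class name OfflineTracker and the method histogram() are hypothetical. For codeToEmoji it uses the stream-based variant shown earlier, which accepts any number of dash-separated code points.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class OfflineTracker {

    // Converts a single hexadecimal code point such as "1F602" to a String.
    static String hexToEmoji(String hex) {
        return new String(Character.toChars(Integer.parseInt(hex, 16)));
    }

    // Converts one or more dash-separated code points, e.g. "1F1F5-1F1F1".
    static String codeToEmoji(String code) {
        return Arrays.stream(code.split("-"))
                .map(OfflineTracker::hexToEmoji)
                .collect(Collectors.joining());
    }

    // Folds all codes into an occurrence map, one merge per event.
    static Map<String, Long> histogram(List<String> codes) {
        Map<String, Long> result = new HashMap<>();
        for (String code : codes) {
            result.merge(codeToEmoji(code), 1L, Math::addExact);
        }
        return result;
    }

    // Returns up to max emojis with the highest counts, separated by spaces.
    static String topEmojis(Map<String, Long> histogram, int max) {
        return histogram.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder()))
                .limit(max)
                .map(Map.Entry::getKey)
                .collect(Collectors.joining(" "));
    }

    public static void main(String[] args) {
        // hypothetical input: 😂 three times, ❤ twice, the Polish flag once
        List<String> codes = Arrays.asList(
                "1F602", "2764", "1F602", "1F1F5-1F1F1", "1F602", "2764");
        // prints the two most frequent emojis: 😂 ❤
        System.out.println(topEmojis(histogram(codes), 2));
    }
}
```

Entries with equal counts come out in an unspecified order here, just as in the original topEmojis(), because HashMap does not define an iteration order.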
You might think this and the previous article aren't very practical. On the surface, yes. But we learned a few techniques that can be really valuable when dealing with real streams of data. Also, producing and consuming an SSE stream is the easiest way to enable a streaming architecture in your ecosystem.

Tags: emoji, emojitracker, reactor, spring, webclient