Turning Twitter4J into RxJava's Observable
Each tweet is represented as a Status object in the API. The question is, how would you design a Java API for streaming purposes? No surprise here: callbacks, callbacks everywhere!

import twitter4j.*;

// OAuth credentials are typically read from twitter4j.properties on the classpath
TwitterStream twitter = new TwitterStreamFactory().getInstance();
twitter.addListener(new StatusAdapter() {
    @Override
    public void onStatus(Status status) {
        // Called for every incoming tweet
        System.out.println(status.getUser().getName() + " : " + status.getText());
    }
});
// Connect to the public sample stream; events arrive on a background thread
twitter.sample();

Say that on top of this API we would like to count how many messages we receive per second. A lot of accidental complexity sneaks in:
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.concurrent.TimeUnit.SECONDS;

// Incremented on Twitter4J's thread, read and reset on the scheduler thread
final AtomicInteger countPerSecond = new AtomicInteger();
twitter.addListener(new StatusAdapter() {
    @Override
    public void onStatus(Status status) {
        countPerSecond.incrementAndGet();
    }
});
twitter.sample();
// Once per second, atomically read and reset the counter
// (log is assumed to be an SLF4J Logger)
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        final int count = countPerSecond.getAndSet(0);
        log.debug("Tweets/second: {}", count);
    }
}, 1, 1, SECONDS);

We need a ScheduledExecutorService and must be very careful about thread safety. Moreover, this approach doesn't scale, as it requires hand-crafted code for every use case we can imagine, like throttling, combining or accumulating. It turns out that bridging the Twitter4J streaming API (and virtually any callback-based API, for that matter) to RxJava's Observable is quite straightforward and will greatly simplify further solutions.

Before we explore how to create a new Observable representing a stream of Twitter messages on top of the Twitter4J API, let's assume that we already have one:

Observable<Status> twitter = twitterObservable(); //to be implemented
Observable<Status> twitter is a stream of Status objects, where each object is one tweet. How do we solve our initial problem of counting tweets per second (tps)?

Observable<Integer> tpsStream = twitter
        .buffer(1, TimeUnit.SECONDS)
        .map(list -> list.size());

That was easy! We take the initial stream of tweets and buffer them every second. When one second elapses, only a single event is emitted, containing a List<Status> with the tweets received within that time frame. Later on we transform the List into an Integer by taking its size(). And that's it! tpsStream will yield one number per second, representing the count of tweets received in that second. If we suddenly realize that our system is overloaded by that many tweets, we can easily sample the stream and pick just a subset of them. For example, to get at most one tweet every 100 milliseconds:

twitter.sample(100, MILLISECONDS)

There are more than a hundred operators similar to buffer() and sample(), but I hope you get the idea. Now that we see how useful an Observable<Status> is, let's implement it. When defining an Observable we need to supply two handlers: one describing what happens when a client subscribes to a given Observable and, optionally, one describing how to handle unsubscribing:

import rx.Observable;
import rx.subscriptions.Subscriptions;
import twitter4j.*;

public Observable<Status> twitterObservable() {
    // Early RxJava (0.x) style: the subscribe function returns a Subscription
    return Observable.create(subscriber -> {
        // Runs once per subscriber, so every subscription opens its own stream
        final TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
        twitterStream.addListener(new StatusAdapter() {
            @Override
            public void onStatus(Status status) {
                subscriber.onNext(status); // push each tweet downstream
            }

            @Override
            public void onException(Exception ex) {
                subscriber.onError(ex); // report stream failures as an error event
            }
        });
        twitterStream.sample();
        // Unsubscribe handler: shut the stream down once the client loses interest
        return Subscriptions.create(() -> {
            twitterStream.cleanUp();
        });
    });
}

Quite a bit of code, written in Java 8 (Scala and Groovy work equally well with RxJava). The callback provided to Observable.create() is executed every time someone subscribes to the Observable. It turns out that none of the examples so far trigger this handler, because RxJava is very lazy in nature: it won't connect to Twitter unless absolutely required. For example, twitter.filter(...) returns a new Observable with only the subset of tweets matching certain criteria. But as long as you don't actually subscribe to that Observable (by calling subscribe() on it), nothing will really happen. In the example below the connection is postponed until we call subscribe(). After that, the text of each encountered tweet is extracted and, if it contains the #java hashtag, printed. All of this happens asynchronously and the whole statement is non-blocking:

twitter
        .map(Status::getText)
        .filter(text -> text.contains("#java"))
        .subscribe(System.out::println);

The Subscriptions.create() call also takes a handler and, as you can guess, it tells what should happen when a client is no longer interested in the Observable<Status>.

Twitter4J is just one example of how you can adapt a callback-based API into an Observable. Other examples include incoming network packets, JMS messages or file system changes. In all cases the scenario is the same.
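Because tpsStream depends on RxJava's scheduler, its semantics are easiest to check against a recorded trace. The following is a minimal JDK-only sketch, not RxJava's implementation: the class TpsSketch and its countPerWindow() method are hypothetical names, and arrival times are assumed to be non-negative milliseconds since subscription. It mirrors buffer(1, TimeUnit.SECONDS).map(list -> list.size()) by producing one count per complete window, including zero for a quiet second (RxJava's time-based buffer() emits an empty list for such a window).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of RxJava: replays recorded arrival times
// (milliseconds since subscription) through fixed-size time windows.
final class TpsSketch {

    // Mimics buffer(windowMillis, MILLISECONDS).map(list -> list.size()):
    // one count per complete window, including windows with no events.
    static List<Integer> countPerWindow(long[] arrivalMillis, long windowMillis, long durationMillis) {
        int windows = (int) (durationMillis / windowMillis);
        List<Integer> counts = new ArrayList<>();
        for (int i = 0; i < windows; i++) {
            counts.add(0); // every window produces a value, even an empty one
        }
        for (long t : arrivalMillis) {
            int window = (int) (t / windowMillis); // window this event falls into
            if (window < windows) { // events after the last complete window are ignored
                counts.set(window, counts.get(window) + 1);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] trace = {100, 200, 950, 1001, 2500, 2999};
        System.out.println(countPerWindow(trace, 1000, 3000)); // prints [3, 1, 2]
    }
}
```

The live version differs mainly in who drives the windows: RxJava closes each buffer on a timer, whereas this sketch derives the window index from the timestamp.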
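The difference between sample() and buffer() can be sketched the same way. Under the same assumptions (a sorted, recorded trace of non-negative timestamps, with items[i] arriving at arrivalMillis[i]; SampleSketch and sampleLatest() are hypothetical names), the helper below keeps only the most recent tweet seen in each 100 ms period and emits nothing for periods without tweets, which is how sample() is documented to behave. It ignores the exact moment each tick fires, so treat it as an approximation rather than a re-implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of RxJava: approximates
// sample(periodMillis, MILLISECONDS) on a recorded trace.
final class SampleSketch {

    // Assumes arrivalMillis is sorted ascending and has the same length as items.
    static List<String> sampleLatest(long[] arrivalMillis, String[] items, long periodMillis) {
        List<String> emitted = new ArrayList<>();
        long currentPeriod = -1;
        for (int i = 0; i < items.length; i++) {
            long period = arrivalMillis[i] / periodMillis;
            if (period == currentPeriod) {
                emitted.set(emitted.size() - 1, items[i]); // newer item replaces the pending one
            } else {
                emitted.add(items[i]); // first item seen in a new period
                currentPeriod = period;
            }
        }
        return emitted; // periods without any item contribute nothing
    }

    public static void main(String[] args) {
        long[] trace = {10, 50, 90, 150, 420, 430};
        String[] tweets = {"a", "b", "c", "d", "e", "f"};
        System.out.println(sampleLatest(trace, tweets, 100)); // prints [c, d, f]
    }
}
```

Unlike buffer(), which hands over everything collected in a window, sample() discards all but the latest item per period, which is what makes it a cheap way to shed load.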
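Finally, the bridging pattern behind twitterObservable(), together with the laziness described above, can be demonstrated without Twitter4J or RxJava. Everything in this sketch is hypothetical and heavily simplified: CallbackSource stands in for TwitterStream, and MiniObservable is a toy with no error, completion or threading support. As with Observable.create(), the subscribe handler registers a listener and returns an unsubscribe action (the role Subscriptions.create() plays in the real code), while map() and filter() merely wrap that handler, so nothing connects until subscribe() is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for a callback-based API such as TwitterStream.
final class CallbackSource {
    private final List<Consumer<String>> listeners = new ArrayList<>();
    int connections = 0; // number of times connect() has been called

    void addListener(Consumer<String> listener) { listeners.add(listener); }
    void removeListener(Consumer<String> listener) { listeners.remove(listener); }
    void connect() { connections++; }

    // Simulates the library pushing one event to every registered listener.
    void push(String event) {
        for (Consumer<String> listener : new ArrayList<>(listeners)) {
            listener.accept(event);
        }
    }
}

// Hypothetical subscribe handler: wires a subscriber up and returns its unsubscribe action.
interface OnSubscribe<T> {
    Runnable call(Consumer<? super T> subscriber);
}

// Hypothetical toy Observable: no errors, completion, backpressure or threading.
final class MiniObservable<T> {
    private final OnSubscribe<T> onSubscribe;

    private MiniObservable(OnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }

    static <T> MiniObservable<T> create(OnSubscribe<T> onSubscribe) {
        return new MiniObservable<>(onSubscribe);
    }

    // Operators only wrap the upstream handler; nothing runs until subscribe().
    <R> MiniObservable<R> map(Function<? super T, ? extends R> fn) {
        return MiniObservable.<R>create(sub -> onSubscribe.call(t -> sub.accept(fn.apply(t))));
    }

    MiniObservable<T> filter(Predicate<? super T> predicate) {
        return create(sub -> onSubscribe.call(t -> {
            if (predicate.test(t)) {
                sub.accept(t);
            }
        }));
    }

    Runnable subscribe(Consumer<? super T> subscriber) {
        return onSubscribe.call(subscriber);
    }

    // The bridge, analogous to twitterObservable(): register a listener and
    // connect on subscribe, remove the listener on unsubscribe.
    static MiniObservable<String> from(CallbackSource source) {
        return create(subscriber -> {
            Consumer<String> listener = subscriber::accept;
            source.addListener(listener);
            source.connect();
            return () -> source.removeListener(listener);
        });
    }
}

final class BridgeDemo {
    public static void main(String[] args) {
        CallbackSource source = new CallbackSource();
        MiniObservable<String> javaTweets = MiniObservable.from(source)
                .filter(text -> text.contains("#java"))
                .map(String::toUpperCase);
        System.out.println("connections: " + source.connections); // prints "connections: 0"
        Runnable unsubscribe = javaTweets.subscribe(System.out::println);
        source.push("hello #java");  // delivered as HELLO #JAVA
        source.push("hello #scala"); // dropped by filter()
        unsubscribe.run();
        source.push("bye #java");    // not delivered: the listener was removed
        System.out.println("connections: " + source.connections); // prints "connections: 1"
    }
}
```

Running BridgeDemo shows zero connections before subscribe() and exactly one after it. Each further subscriber would trigger another connect(), just as each subscription to twitterObservable() creates a new TwitterStream.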