Accessing Meetup's streaming API with RxNetty
    { "id" : "219088449",
      "name" : "Silver Wings Brunch",
      "time" : 1421609400000,
      "mtime" : 1417814004321,
      "duration" : 900000,
      "rsvp_limit" : 0,
      "status" : "upcoming",
      "event_url" : "http://www.meetup.com/Laguna-Niguel-Social-Networking-Meetup/events/219088449/",
      "group" : { "name" : "Former Flight Attendants South Orange and North San Diego Co",
                  "state" : "CA"
                  ...
      },
      "venue" : { "address_1" : "26860 Ortega Highway",
                  "city" : "San Juan Capistrano",
                  "country" : "US"
                  ...
      },
      "venue_visibility" : "public",
      "visibility" : "public",
      "yes_rsvp_count" : 1
      ...
    }

Every time our long-polling HTTP connection (with the Transfer-Encoding: chunked response header) pushes such a piece of JSON, we want to parse it and pass it on for further processing. We hate callbacks, so RxJava seems like a reasonable alternative (think: Observable<Event>).

Step 1: Receiving raw data with RxNetty
We can't use an ordinary HTTP client, as they are focused on request-response semantics. There is no complete response here; we simply leave the connection open forever and consume data as it arrives. RxJava has an out-of-the-box RxApacheHttp library, but it assumes the text/event-stream content type. Instead we will use the quite low-level, versatile RxNetty library. It's a wrapper around Netty (duh!) and is capable of implementing arbitrary TCP/IP (including HTTP) and UDP clients and servers. If you don't know Netty, it's packet- rather than stream-oriented, so at first glance we can expect one Netty event per Meetup push. The API certainly isn't straightforward, but it makes sense once you grok it:

    // HTTPS client for the streaming endpoint; trustAll() skips certificate validation
    HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder("stream.meetup.com", 443)
            .pipelineConfigurator(new HttpClientPipelineConfigurator<>())
            .withSslEngineFactory(DefaultFactories.trustAll())
            .build();

    // The response never completes; its content keeps arriving as ByteBufs
    final Observable<HttpClientResponse<ByteBuf>> responses =
            httpClient.submit(HttpClientRequest.createGet("/2/open_events"));
    final Observable<ByteBuf> byteBufs =
            responses.flatMap(AbstractHttpContentHolder::getContent);
    // Decode every received buffer into text
    final Observable<String> chunks =
            byteBufs.map(content -> content.toString(StandardCharsets.UTF_8));

First we create an HttpClient and set up SSL (keep in mind that trustAll() with regard to server certificates is probably not the best production setting). Later we submit() a GET request and receive Observable<HttpClientResponse<ByteBuf>> in return. ByteBuf is Netty's abstraction over a bunch of bytes sent or received over the wire. This observable will tell us immediately about every piece of data received from Meetup. After extracting the ByteBuf from the response we turn it into a String containing the aforementioned JSON. So far so good, it works.

Step 2: Aligning packets with JSON documents
Netty is very powerful because it doesn't hide inherent complexity behind leaky abstractions. Every time something is received over the TCP/IP wire, we are notified. You might believe that when the server sends 100 bytes, Netty on the client side will notify us about these 100 bytes received. However, the TCP/IP stack is free to split and merge the data you send over the wire, especially since it is supposed to be a stream, so how it is split into packets should be irrelevant. This caveat is explained very well in Netty's documentation. What does it mean to us? When Meetup sends a single event, we might receive just one String in the chunks observable. But just as well it can be divided into an arbitrary number of packets, so chunks will emit multiple Strings. Even worse, if Meetup sends two events one right after another, they might fit in one packet. In that case chunks will emit one String with two independent JSON documents. As a matter of fact, we can't assume any alignment between JSON strings and the network packets received. All we know is that individual JSON documents representing events are separated by newlines. Amazingly, the official RxJavaString add-on has a method precisely for that:

    Observable<String> jsonChunks = StringObservable.split(chunks, "\n");

Actually there is an even simpler StringObservable.byLine(chunks), but it uses the platform-dependent end-of-line. What split() does is best explained in the official documentation. Each String emitted by jsonChunks is now one complete JSON document, ready to be parsed.
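
To make the re-alignment concrete, here is a minimal sketch in plain Java, without RxJava, of the buffering that any newline-based split has to perform. The LineAssembler class and its methods are hypothetical names made up for this illustration and are not part of RxJavaString. Flushing the unterminated remainder at the end of the input is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only: re-aligns arbitrary chunks
// with newline-separated documents.
class LineAssembler {

    // Holds the trailing fragment that has not been terminated by '\n' yet.
    private final StringBuilder pending = new StringBuilder();

    // Feeds one chunk and returns every line that this chunk completed.
    List<String> feed(String chunk) {
        List<String> complete = new ArrayList<>();
        pending.append(chunk);
        int newline;
        while ((newline = pending.indexOf("\n")) >= 0) {
            complete.add(pending.substring(0, newline));
            pending.delete(0, newline + 1);
        }
        return complete;
    }

    // Returns whatever is still buffered (possibly an empty string).
    String remainder() {
        return pending.toString();
    }

    // Convenience method: runs all chunks through one assembler and
    // appends the unterminated remainder, if any (assumption of this sketch).
    static List<String> splitLines(List<String> chunks) {
        LineAssembler assembler = new LineAssembler();
        List<String> lines = new ArrayList<>();
        for (String chunk : chunks) {
            lines.addAll(assembler.feed(chunk));
        }
        if (!assembler.remainder().isEmpty()) {
            lines.add(assembler.remainder());
        }
        return lines;
    }

    public static void main(String[] args) {
        // Chunk boundaries deliberately do not match document boundaries.
        List<String> chunks = Arrays.asList("{\"id\":1}\n{\"i", "d\":2}\n", "{\"id\":3}");
        splitLines(chunks).forEach(System.out::println);
    }
}
```

The first chunk carries a whole document plus the beginning of the next one, so packets and documents do not line up, yet the sketch still yields three complete documents, one per line.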
Step 3: Parsing JSON
Interestingly, this step is not so straightforward. I admit, I sort of enjoyed the WSDL times because I could easily and predictably generate a Java model that follows the web service's contract. JSON, especially given the marginal market penetration of JSON Schema, is basically the Wild West of integration. Typically you are left with informal documentation or samples of requests and responses. There is no type information or format, no indication of whether fields are mandatory, etc. Moreover, because I am reluctant to work with maps of maps (hi there, fellow Clojure programmers), I have to write mapping POJOs myself in order to work with JSON-based REST services. Well, there are workarounds. First I took one representative example of JSON produced by the Meetup streaming API and placed it in src/main/json/meetup/event.json. Then I used jsonschema2pojo-maven-plugin (Gradle and Ant versions exist as well). The plugin's name is confusing: it can also work with a JSON example, not only a schema, to produce Java models:

    <plugin>
        <groupId>org.jsonschema2pojo</groupId>
        <artifactId>jsonschema2pojo-maven-plugin</artifactId>
        <version>0.4.7</version>
        <configuration>
            <sourceDirectory>${basedir}/src/main/json/meetup</sourceDirectory>
            <targetPackage>com.nurkiewicz.meetup.generated</targetPackage>
            <includeHashcodeAndEquals>true</includeHashcodeAndEquals>
            <includeToString>true</includeToString>
            <initializeCollections>true</initializeCollections>
            <sourceType>JSON</sourceType>
            <useCommonsLang3>true</useCommonsLang3>
            <useJodaDates>true</useJodaDates>
            <useLongIntegers>true</useLongIntegers>
            <outputDirectory>target/generated-sources</outputDirectory>
        </configuration>
        <executions>
            <execution>
                <id>generate-sources</id>
                <phase>generate-sources</phase>
                <goals>
                    <goal>generate</goal>
                </goals>
            </execution>
        </executions>
    </plugin>

At this point Maven will create Event.java, Venue.java, Group.java, etc. compatible with Jackson:

    // objectMapper is a Jackson ObjectMapper held by the enclosing class
    private Event parseEventJson(String jsonStr) {
        try {
            return objectMapper.readValue(jsonStr, Event.class);
        } catch (IOException e) {
            // rethrow unchecked so the method can be used as a method reference in map()
            throw new UncheckedIOException(e);
        }
    }

It just works, sweet:

    final Observable<Event> events = jsonChunks.map(this::parseEventJson);
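
The reason parseEventJson() wraps IOException is that functional interfaces accepted by map() cannot throw checked exceptions. Below is a rough, generalized sketch of the same pattern using only the JDK. IoFunction, unchecked() and the stand-in parse() method are hypothetical names invented for this illustration; parse() merely pretends to be a parser and has nothing to do with Jackson.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

class UncheckedParsing {

    // Hypothetical counterpart of Function that is allowed to throw IOException.
    @FunctionalInterface
    interface IoFunction<T, R> {
        R apply(T input) throws IOException;
    }

    // Adapts a throwing function to a plain Function by rethrowing
    // IOException as UncheckedIOException.
    static <T, R> Function<T, R> unchecked(IoFunction<T, R> function) {
        return input -> {
            try {
                return function.apply(input);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
    }

    // Stand-in for a real parser: accepts only text that looks like a JSON object.
    static String parse(String json) throws IOException {
        String trimmed = json.trim();
        if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) {
            throw new IOException("Not a JSON object: " + json);
        }
        return trimmed;
    }

    public static void main(String[] args) {
        List<String> parsed = Arrays.asList(" {\"id\":1} ", "{\"id\":2}").stream()
                .map(unchecked(UncheckedParsing::parse))
                .collect(Collectors.toList());
        System.out.println(parsed);
    }
}
```

With such an adapter the try/catch lives in one place, at the price of one extra wrapper call at each call site. For a single parsing method, the hand-written version shown earlier is arguably just as readable.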
Step 4: ???[1]
Step 5: PROFIT!!!
Having Observable<Event>, we can implement some really interesting use cases. Want to find the names of all meetups in Poland that were just created? Sure!

    events
            .filter(event -> event.getVenue() != null)
            // null-safe and case-insensitive: the sample above reports the country in upper case
            .filter(event -> "pl".equalsIgnoreCase(event.getVenue().getCountry()))
            .map(Event::getName)
            .forEach(System.out::println);

Looking for statistics on how many events are created per minute? No problem!

    events
            .buffer(1, TimeUnit.MINUTES)
            .map(List::size)
            .forEach(count -> log.info("Count: {}", count));

Or maybe you want to continually search for meetups furthest in the future, skipping those closer than ones already found?

    events
            .filter(event -> event.getTime() != null)
            .scan(this::laterEventFrom)
            .distinct()
            .map(Event::getTime)
            .map(Instant::ofEpochMilli)
            .forEach(System.out::println);

    //...

    private Event laterEventFrom(Event first, Event second) {
        return first.getTime() > second.getTime() ?
                first :
                second;
    }

This code filters out events without a known time, emits either the current event or the previous one (scan()), depending on which one was later, filters out duplicates and displays the time. This tiny program, after running for a few minutes, already found one just-created meetup scheduled for November 2015, and it's December 2014 as of this writing. Possibilities are endless.

I hope I gave you a good grasp of how you can easily mash up various technologies: reactive programming to write super fast networking code, type-safe JSON parsing without boilerplate code and RxJava to quickly process streams of events. Enjoy!