Spring Boot 2: Fluxes, from Elasticsearch to controller
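import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;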
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

@Component
@RequiredArgsConstructor
class ElasticAdapter {

    private final RestHighLevelClient client;
    private final ObjectMapper objectMapper;

    Mono<IndexResponse> index(Person doc) {
        return indexDoc(doc);
    }

    // Bridges the callback-based client call to a Mono
    private Mono<IndexResponse> indexDoc(Person doc) {
        return Mono.create(sink -> {
            try {
                doIndex(doc, listenerToSink(sink));
            } catch (JsonProcessingException e) {
                sink.error(e);
            }
        });
    }

    // Serializes the document to JSON and issues the asynchronous index request
    private void doIndex(Person doc, ActionListener<IndexResponse> listener) throws JsonProcessingException {
        final IndexRequest indexRequest = new IndexRequest("people", "person", doc.getUsername());
        final String json = objectMapper.writeValueAsString(doc);
        indexRequest.source(json, XContentType.JSON);
        client.indexAsync(indexRequest, listener);
    }

    // Forwards both listener outcomes to the sink
    private <T> ActionListener<T> listenerToSink(MonoSink<T> sink) {
        return new ActionListener<T>() {
            @Override
            public void onResponse(T response) {
                sink.success(response);
            }

            @Override
            public void onFailure(Exception e) {
                sink.error(e);
            }
        };
    }
}

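The listenerToSink() helper above is an instance of a general pattern: turning a callback-style API into a value that represents the eventual result. As a rough illustration only, here is the same idea using nothing but the JDK, with CompletableFuture standing in for Mono. The Listener interface and the adapt() method are hypothetical names made up for this sketch; they are not part of Elasticsearch or Reactor. One important difference: a CompletableFuture starts eagerly, whereas Mono.create() runs its callback only when someone subscribes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class CallbackAdapterSketch {

    // Hypothetical callback interface, shaped like Elasticsearch's ActionListener<T>
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    // JDK-only counterpart of listenerToSink(): forwards both outcomes to the future
    static <T> Listener<T> listenerToFuture(CompletableFuture<T> future) {
        return new Listener<T>() {
            @Override
            public void onResponse(T response) {
                future.complete(response);
            }

            @Override
            public void onFailure(Exception e) {
                future.completeExceptionally(e);
            }
        };
    }

    // Wraps any callback-accepting call, similar in spirit to Mono.create(sink -> ...)
    static <T> CompletableFuture<T> adapt(Consumer<Listener<T>> asyncCall) {
        CompletableFuture<T> future = new CompletableFuture<>();
        try {
            asyncCall.accept(listenerToFuture(future));
        } catch (RuntimeException e) {
            // A failure while issuing the request is reported the same way as an async failure
            future.completeExceptionally(e);
        }
        return future;
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok =
                CallbackAdapterSketch.<String>adapt(listener -> listener.onResponse("created"));
        System.out.println(ok.join()); // prints created

        CompletableFuture<String> failed =
                CallbackAdapterSketch.<String>adapt(listener ->
                        listener.onFailure(new IllegalStateException("cluster down")));
        System.out.println(failed.isCompletedExceptionally()); // prints true
    }
}
```

Note how the exception thrown while issuing the call is routed into the result, just like the catch block in indexDoc() routes JsonProcessingException to sink.error().
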
The index() method takes a strongly typed Person object and sends it over to Elasticsearch. First, the doIndex() method makes the actual call to Elasticsearch, marshalling Person to JSON. Elasticsearch delivers its result through an ActionListener<IndexResponse>, which we adapt to a Mono<IndexResponse> using the listenerToSink() helper method. The sequence of compose() calls is an elegant way to apply a series of metrics:

return indexDoc(doc)
        .compose(this::countSuccFail)
        .compose(this::countConcurrent)
        .compose(this::measureTime)
        .doOnError(e -> log.error("Unable to index {}", doc, e));

These methods are defined as follows:

private final Timer indexTimer = Metrics.timer("es.timer");
private final LongAdder concurrent = Metrics.gauge("es.concurrent", new LongAdder());
private final Counter successes = Metrics.counter("es.index", "result", "success");
private final Counter failures = Metrics.counter("es.index", "result", "failure");

// Counts successful and failed index operations
private Mono<IndexResponse> countSuccFail(Mono<IndexResponse> mono) {
    return mono
            .doOnError(e -> failures.increment())
            .doOnSuccess(response -> successes.increment());
}

// Tracks how many index operations are in flight
private Mono<IndexResponse> countConcurrent(Mono<IndexResponse> mono) {
    return mono
            .doOnSubscribe(s -> concurrent.increment())
            .doOnTerminate(concurrent::decrement);
}

// Records the duration of successful index operations
private Mono<IndexResponse> measureTime(Mono<IndexResponse> mono) {
    return Mono
            .fromCallable(System::currentTimeMillis)
            .flatMap(time ->
                    mono.doOnSuccess(response ->
                            indexTimer.record(System.currentTimeMillis() - time, TimeUnit.MILLISECONDS))
            );
}

We could technically apply these metrics without the compose() operator, like this:

measureTime(
    countConcurrent(
        countSuccFail(
            indexDoc(doc)
        )
    )
)

But having a flat sequence of Mono<T> -> Mono<T> transformers seems much easier to read. Anyway, this was the write side; let's implement the read side.

Mono<Person> findByUserName(String userName) {
    return Mono
            .<GetResponse>create(sink ->
                    client.getAsync(new GetRequest("people", "person", userName), listenerToSink(sink))
            )
            .filter(GetResponse::isExists)
            .map(GetResponse::getSource)
            .map(map -> objectMapper.convertValue(map, Person.class));
}

The procedure is pretty much the same:
- make an Elasticsearch request
- adapt it to Mono<GetResponse>
- verify the result and unmarshall it from a Map to a Person object
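
The three steps above can be tried out without an Elasticsearch cluster. The following is only a JDK-only sketch under stated assumptions: CompletableFuture<Optional<Person>> stands in for Mono<Person> (an empty Optional plays the role of an empty Mono), FakeGetResponse is a hypothetical stand-in for GetResponse, the in-memory store map replaces the real client, and the trimmed-down Person is converted by hand instead of via ObjectMapper.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

class ReadSideSketch {

    // Hypothetical stand-in for GetResponse: an "exists" flag plus the document source
    static final class FakeGetResponse {
        final boolean exists;
        final Map<String, Object> source;

        FakeGetResponse(boolean exists, Map<String, Object> source) {
            this.exists = exists;
            this.source = source;
        }
    }

    // Hypothetical, trimmed-down Person with a single field
    static final class Person {
        final String username;

        Person(String username) {
            this.username = username;
        }
    }

    // Steps 1 and 2: pretend to query the store and expose the result as a future
    static CompletableFuture<FakeGetResponse> fetch(
            Map<String, Map<String, Object>> store, String userName) {
        Map<String, Object> source = store.get(userName);
        return CompletableFuture.completedFuture(new FakeGetResponse(source != null, source));
    }

    // Step 3: keep only existing documents and convert the source map to a Person
    static CompletableFuture<Optional<Person>> findByUserName(
            Map<String, Map<String, Object>> store, String userName) {
        return fetch(store, userName)
                .thenApply(response -> Optional.of(response)
                        .filter(r -> r.exists)
                        .map(r -> r.source)
                        .map(map -> new Person((String) map.get("username"))));
    }

    public static void main(String[] args) {
        Map<String, Map<String, Object>> store =
                Map.of("alice", Map.<String, Object>of("username", "alice"));
        System.out.println(findByUserName(store, "alice").join().isPresent()); // prints true
        System.out.println(findByUserName(store, "bob").join().isPresent());   // prints false
    }
}
```

The filter step matters: a missing document is not an error, so it is dropped from the pipeline and the caller decides what an empty result means.
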
ObjectMapper can also convert from a Map, not only from a JSON string. With this layer in place, we can use it directly in our brand new controller:

import com.google.common.collect.ImmutableMap;
import lombok.RequiredArgsConstructor;
import org.elasticsearch.action.index.IndexResponse;
import org.springframework.http.*;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;
import javax.validation.Valid;
import java.util.Map;

@RequiredArgsConstructor
@RestController
@RequestMapping("/person")
class PersonController {

    private static final Mono<ResponseEntity<Person>> NOT_FOUND =
            Mono.just(ResponseEntity.notFound().build());

    private final ElasticAdapter elasticAdapter;

    @GetMapping("/{userName}")
    Mono<ResponseEntity<Person>> get(@PathVariable("userName") String userName) {
        return elasticAdapter
                .findByUserName(userName)
                .map(ResponseEntity::ok)
                .switchIfEmpty(NOT_FOUND);
    }

    @PutMapping
    Mono<ResponseEntity<Map<String, Object>>> put(@Valid @RequestBody Person person) {
        return elasticAdapter
                .index(person)
                .map(this::toMap)
                .map(m -> ResponseEntity.status(HttpStatus.CREATED).body(m));
    }

    private ImmutableMap<String, Object> toMap(IndexResponse response) {
        return ImmutableMap
                .<String, Object>builder()
                .put("id", response.getId())
                .put("index", response.getIndex())
                .put("type", response.getType())
                .put("version", response.getVersion())
                .put("result", response.getResult().getLowercase())
                .put("seqNo", response.getSeqNo())
                .put("primaryTerm", response.getPrimaryTerm())
                .build();
    }
}

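The get() handler above relies on an empty Mono meaning "not found": map() only runs when a Person was emitted, and switchIfEmpty() supplies the 404 otherwise. The same decision can be shown in isolation with plain JDK types. This is a sketch under assumptions, not WebFlux code: Optional stands in for the possibly empty Mono, and the Response class is a hypothetical, minimal substitute for ResponseEntity.

```java
import java.util.Optional;

class NotFoundMappingSketch {

    // Hypothetical, minimal stand-in for ResponseEntity<T>: a status code and a nullable body
    static final class Response<T> {
        final int status;
        final T body;

        Response(int status, T body) {
            this.status = status;
            this.body = body;
        }
    }

    // Present value -> 200 with body (like map(ResponseEntity::ok)),
    // empty -> 404 without body (like switchIfEmpty(NOT_FOUND))
    static <T> Response<T> toResponse(Optional<T> found) {
        return found
                .map(body -> new Response<T>(200, body))
                .orElseGet(() -> new Response<T>(404, null));
    }

    public static void main(String[] args) {
        System.out.println(toResponse(Optional.of("alice")).status);       // prints 200
        System.out.println(toResponse(Optional.<String>empty()).status);   // prints 404
    }
}
```
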
The get() method tries to find a document in Elasticsearch by "userName". Newcomers to RxJava or Reactor are very eager to call subscribe() or block*(). Interestingly, neither is needed in Spring WebFlux. You create a bunch of Monos or Fluxes, pass them through a series of transformations and return them from your controller. Just works™.

The put() method is equally simple. For debugging purposes I convert IndexResponse to a map (rendered as JSON) in the toMap() method, but this isn't necessary. As you can see, building reactive applications in Spring WebFlux is quite simple. We no longer need any adapting layers or blocking code. Everything is fully asynchronous and event-driven. Moreover, in this setup (see source code) there are no servlets or Jetty/Tomcat on the CLASSPATH!

Spring has built-in reactive support for some databases like MongoDB. In these blog posts I gave you an overview of how to integrate Reactor with Spring and other databases that provide a non-blocking API. You can easily adjust the code samples to use them with other sources and data stores.

This is part of a longer series:
- Spring, Reactor and ElasticSearch: from callbacks to reactive streams
- Spring, Reactor and ElasticSearch: benchmarking with fake test data
- Monitoring and measuring reactive application with Dropwizard Metrics
- Spring Boot 2: Migrating from Dropwizard metrics to Micrometer
- Spring Boot 2: Fluxes, from Elasticsearch to controller