DEV Community

eidher

Reactive Spring Applications

Reactive programming is programming with asynchronous data streams. A stream is a sequence of ongoing events ordered in time (messages, variables, data structures, etc.). An event can be almost anything: a function returning a result, a row returned from a database query, a call to a web server, and so on. The approach is used in distributed, multi-process applications to improve latency, redundancy, recovery, and scale-out.
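
As a rough illustration of an asynchronous stream, the sketch below pushes three events through the JDK's java.util.concurrent.Flow API (the JDK's copy of the Reactive Streams interfaces) rather than Reactor, so it runs without extra dependencies. The names EventStreamSketch and publishAndCollect are made up for this example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class EventStreamSketch {

    // Publishes the events asynchronously and returns what the subscriber saw,
    // upper-cased, in arrival order.
    static List<String> publishAndCollect(List<String> events) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    this.subscription = subscription;
                    subscription.request(1); // ask for the first event
                }

                @Override
                public void onNext(String event) {
                    received.add(event.toUpperCase());
                    subscription.request(1); // ask for the next event
                }

                @Override
                public void onError(Throwable error) {
                    done.countDown(); // the stream failed
                }

                @Override
                public void onComplete() {
                    done.countDown(); // the stream completed
                }
            });
            events.forEach(publisher::submit);
        } // closing the publisher signals onComplete once pending events are delivered

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(received);
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(List.of("red", "green", "blue")));
    }
}
```

The subscriber never pulls a whole collection. It is handed one event at a time on another thread and is told separately when the stream completes or fails, which is the same contract Reactor and RxJava build on.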

Spring supports Reactor and RxJava. There are three types of streams:

  1. A sequence of zero or more events (Flux in Reactor, Observable in RxJava): emits a continuous stream of events. You can apply operations to every item in the stream, and the resulting stream may complete or fail.
  2. A stream of zero or one event (Mono in Reactor; RxJava's closest match is Maybe, since Single always emits exactly one item or an error): emits at most one result, or fails. The item is processed by one or more operations.
  3. Publishers: the common abstraction behind 1 and 2. Both Flux and Mono implement the Reactive Streams Publisher interface.

Example:

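import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Sequential: every item is upper-cased and printed in order
Flux.just("red", "green", "blue")
        .log()
        .map(String::toUpperCase)
        .subscribe(System.out::println);

// Using concurrency: each inner Mono is subscribed on the parallel scheduler,
// with at most 4 inner Monos in flight, so output order is not guaranteed
Flux.just("red", "green", "blue")
        .log()
        .flatMap(value -> Mono.just(value.toUpperCase()).subscribeOn(Schedulers.parallel()), 4)
        .subscribe(System.out::println);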


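Reactive Features in Spring

  • Reactive repositories: Spring Data's ReactiveCrudRepository returns Flux and Mono instead of collections and single objects.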

@Repository
public interface AccountCrudRepository 
  extends ReactiveCrudRepository<Account, String> {

    Flux<Account> findAllByValue(String value);
    Mono<Account> findFirstByOwner(Mono<String> owner);
}
  • Web Client: Reactive alternative to RestTemplate (See Spring MVC REST). WebClient is non-blocking, so several requests can be in flight at the same time, which usually makes it faster when a caller needs many responses. RestTemplate blocks the calling thread until each response arrives, so calls issued from one thread run sequentially. See Spring 5 WebClient
WebClient client = WebClient.create("http://localhost:8080");
Mono<Account> result = client.get()
        .uri("/accounts/{id}", id)
        .accept(MediaType.APPLICATION_JSON)
        .retrieve()
        .bodyToMono(Account.class);
// Blocks the calling thread until the account is returned
Account account = result.block();

// Alternatively, subscribe without blocking and react to the outcome:
result.doOnSuccess(a -> {
    ...
}).doOnError(e -> {
    System.out.println(e.getMessage());
}).subscribe();

Flux<Account> result2 = client.get()
        .uri("/accounts")
        .accept(MediaType.APPLICATION_JSON)
        .retrieve()
        .bodyToFlux(Account.class);

// We will count the number of responses (Flux items)
final AtomicInteger counter = new AtomicInteger(0);

// Process each item in the Flux as it arrives, numbering it with
// the value returned by counter.incrementAndGet().
result2.subscribe(a -> {
    int n = counter.incrementAndGet();
    System.out.println("  Account:" + n + " " + a.getName());
});
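
The difference between calls that run back to back and calls that overlap can be sketched without Spring. The example below is only a stand-in: it uses CompletableFuture with a small thread pool instead of WebClient's non-blocking I/O, and fetchAccount is a hypothetical method that sleeps for 200 ms to simulate network latency.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class ConcurrentCallsSketch {

    static final long SIMULATED_LATENCY_MS = 200;

    // Hypothetical stand-in for a remote call; sleeps instead of doing real I/O.
    static String fetchAccount(String id) {
        try {
            Thread.sleep(SIMULATED_LATENCY_MS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "account-" + id;
    }

    // Blocking style: each call starts only after the previous one has returned.
    static List<String> fetchSequentially(List<String> ids) {
        return ids.stream()
                .map(ConcurrentCallsSketch::fetchAccount)
                .collect(Collectors.toList());
    }

    // Concurrent style: start every call first, then wait for all the results.
    static List<String> fetchConcurrently(List<String> ids) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, ids.size()));
        try {
            List<CompletableFuture<String>> pending = ids.stream()
                    .map(id -> CompletableFuture.supplyAsync(() -> fetchAccount(id), pool))
                    .collect(Collectors.toList());
            return pending.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    // Measures how long the given work takes, in milliseconds.
    static long elapsedMillis(Runnable work) {
        long start = System.nanoTime();
        work.run();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        List<String> ids = List.of("1", "2", "3");
        System.out.println("sequential ms: " + elapsedMillis(() -> fetchSequentially(ids)));
        System.out.println("concurrent ms: " + elapsedMillis(() -> fetchConcurrently(ids)));
    }
}
```

With three simulated calls, the sequential version takes roughly three times the latency of one call, while the concurrent version takes roughly the latency of the slowest call. WebClient gets a similar overlap without dedicating a thread to each request.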
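  • Reactive controllers: Spring WebFlux handler methods can return Mono or Flux directly, and the framework subscribes to them when a request arrives.
// Returns at most one employee
@GetMapping("/{id}")
public Mono<Employee> getEmployeeById(@PathVariable String id) {
    return employeeRepository.findEmployeeById(id);
}

// Streams every employee
@GetMapping
public Flux<Employee> getAllEmployees() {
    return employeeRepository.findAllEmployees();
}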
