Reactive Programming is programming with asynchronous data streams. Streams are ongoing events ordered in time (messages, variables, data structures, etc). Events are almost anything (functions returning results, rows returned from a database query, calls to a web server, etc). It is used for distributed multi-process applications to improve the latency, redundancy, recovery, and scale-out.
Spring supports Reactor and RxJava. There are three types of streams:
- A sequence of zero or more events (Flux or Observable): Returns a continuous stream of events where you can apply operation(s) to all the items in the stream and the new stream may complete or fail.
- Stream of zero or one event (Mono or Single): Returns a single result (Mono or Fail). The item is processed by one or more operations.
- Publishers (1 and 2)
Example:
Flux.just("red", "green", "blue")
.log()
.map(String::toUpperCase)
.subscribe(System.out::println);
// Using concurrency
Flux.just("red", "green", "blue")
.log()
.flatMap(value -> Mono.just(value.toUpperCase()).subscribeOn(Schedulers.parallel()), 4) //
.subscribe(System.out::println);
Reactive Features in Spring
- Spring Data Reactive Repositories: Returns query results as streams. See Spring Data Reactive Repositories with MongoDB
@Repository
public interface AccountCrudRepository
extends ReactiveCrudRepository<Account, String> {
Flux<Account> findAllByValue(String value);
Mono<Account> findFirstByOwner(Mono<String> owner);
}
- Web Client: Reactive alternative to RestTemplate (See Spring MVC REST). The WebClient is faster because it can run multiple requests in parallel. Meanwhile, the RestTemplate has to wait for each to finish because it runs sequentially. See Spring 5 WebClient
WebClient client = WebClient.create("http://localhost:8080");
Mono<Account> result = client.get()
.uri("/accounts/{id}", id)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(Account.class);
// Wait for account to be returned
Account account = result.block();
// Alternatively, you can do something like this:
result.doOnSuccess(a -> {
...
}).doOnError(e -> {
System.out.println(e.getMessage());
}).subscribe();
Flux<Account> result2 = client.get()
.uri("/accounts")
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToFlux(Account.class);
// We will count the number of responses (Flux items)
final AtomicInteger counter = new AtomicInteger(0);
// Process all the items in the Flux by counting each one using
// counter.incrementAndGet().
result2.subscribe(a -> {
counter.incrementAndGet();
System.out.println(" Account:" + counter + " " + a.getName());
});
- WebFlux: Reactive @Controllers. See Guide to Spring 5 WebFlux
@GetMapping("/{id}")
private Mono<Employee> getEmployeeById(@PathVariable String id) {
return employeeRepository.findEmployeeById(id);
}
@GetMapping
private Flux<Employee> getAllEmployees() {
return employeeRepository.findAllEmployees();
}
Top comments (0)