DEV Community

eidher

Reactive Spring Applications

Reactive programming is programming with asynchronous data streams. A stream is a sequence of ongoing events ordered in time (messages, variable changes, data structure updates, etc.). An event can be almost anything: a function returning a result, a row returned from a database query, a response from a web server, and so on. It is typically used in distributed, multi-process applications to improve latency, redundancy, recovery, and scale-out.

Spring supports Reactor and RxJava. There are three types of streams:

  1. A sequence of zero or more events (Flux in Reactor, Observable in RxJava): a continuous stream of events. You can apply operations to every item in the stream, and the resulting stream may complete or fail.
  2. A stream of at most one event (Mono in Reactor; in RxJava, Single for exactly one item or Maybe for zero or one): yields a single result or a failure. The item can be processed by one or more operations.
  3. Publisher: the Reactive Streams interface that Flux and Mono both implement, so it covers cases 1 and 2.

Example:

Flux.just("red", "green", "blue")
        .log()
        .map(String::toUpperCase)
        .subscribe(System.out::println);

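// Using concurrency: each item is upper-cased on the parallel scheduler,
// with at most 4 inner publishers in flight at once. Output order is not guaranteed.
Flux.just("red", "green", "blue")
        .log()
        .flatMap(value -> Mono.just(value.toUpperCase()).subscribeOn(Schedulers.parallel()), 4)
        .subscribe(System.out::println);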
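
The same publish, transform, subscribe shape can be sketched without Reactor, using the JDK's own java.util.concurrent.Flow API (Java 9+). This is only an illustration of the signals a subscriber receives (onSubscribe, onNext, onError, onComplete), not how Reactor is implemented. The names FlowSketch, UpperCaseCollector, and publishAndCollect are made up for this example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class FlowSketch {

    /** Hypothetical subscriber: upper-cases each item and records the terminal signal. */
    static final class UpperCaseCollector implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // back-pressure: ask for one item at a time
        }

        @Override
        public void onNext(String item) {
            received.add(item.toUpperCase());
            subscription.request(1); // ready for the next item
        }

        @Override
        public void onError(Throwable error) {
            done.countDown(); // the stream failed
        }

        @Override
        public void onComplete() {
            done.countDown(); // the stream completed normally
        }
    }

    /** Publishes the items asynchronously and waits (up to 5 s) for the terminal signal. */
    static List<String> publishAndCollect(List<String> items) {
        UpperCaseCollector collector = new UpperCaseCollector();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(collector);
            items.forEach(publisher::submit);
        } // close() signals onComplete once the submitted items are delivered
        try {
            collector.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(collector.received);
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(List.of("red", "green", "blue")));
    }
}
```

The subscriber asks for one item at a time through request(1); Reactor's operators manage this demand for you.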


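Reactive Features in Spring

  • Reactive repositories: Spring Data repositories extend ReactiveCrudRepository and return Flux or Mono instead of collections or single objects.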
@Repository
public interface AccountCrudRepository 
  extends ReactiveCrudRepository<Account, String> {

    Flux<Account> findAllByValue(String value);
    Mono<Account> findFirstByOwner(Mono<String> owner);
}
  • Web Client: Reactive, non-blocking alternative to RestTemplate (See Spring MVC REST). WebClient can keep multiple requests in flight at the same time without blocking a thread on each one, whereas RestTemplate blocks the calling thread until each response arrives, so calls made from one thread run sequentially. See Spring 5 WebClient
WebClient client = WebClient.create("http://localhost:8080");
Mono<Account> result = client.get()
        .uri("/accounts/{id}", id)
        .accept(MediaType.APPLICATION_JSON)
        .retrieve()
        .bodyToMono(Account.class);
// Block the calling thread until the account is returned
Account account = result.block();

// Alternatively, react to the result without blocking:
result.doOnSuccess(a -> {
    ...
}).doOnError(e -> {
    System.out.println(e.getMessage());
}).subscribe();

Flux<Account> result2 = client.get()
        .uri("/accounts")
        .accept(MediaType.APPLICATION_JSON)
        .retrieve()
        .bodyToFlux(Account.class);

// We will count the number of responses (Flux items)
final AtomicInteger counter = new AtomicInteger(0);

// Process all the items in the Flux by counting each one using
// counter.incrementAndGet().
result2.subscribe(a -> {
    counter.incrementAndGet();
    System.out.println("  Account:" + counter + " " + a.getName());
});
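
To make the sequential versus concurrent difference concrete, here is a minimal JDK-only sketch. It assumes a made-up fetchAccount method that sleeps to simulate network latency, and it uses CompletableFuture with a thread pool, which is not how WebClient works internally (WebClient uses non-blocking I/O rather than a thread per request). It only shows why starting all the calls before waiting on any of them shortens the total time.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class ConcurrentRequestsSketch {

    /** Hypothetical stand-in for a remote call: sleeps to simulate latency. */
    static String fetchAccount(String id, long latencyMillis) {
        try {
            Thread.sleep(latencyMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "account-" + id;
    }

    /** Blocking style: each call must finish before the next one starts. */
    static List<String> fetchSequentially(List<String> ids, long latencyMillis) {
        return ids.stream()
                .map(id -> fetchAccount(id, latencyMillis))
                .collect(Collectors.toList());
    }

    /** Asynchronous style: start every call first, then gather the results in order. */
    static List<String> fetchConcurrently(List<String> ids, long latencyMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, ids.size()));
        try {
            // Collect the futures first so that all calls are started before any join().
            List<CompletableFuture<String>> pending = ids.stream()
                    .map(id -> CompletableFuture.supplyAsync(() -> fetchAccount(id, latencyMillis), pool))
                    .collect(Collectors.toList());
            return pending.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> ids = List.of("1", "2", "3");

        long start = System.nanoTime();
        List<String> sequential = fetchSequentially(ids, 200);
        System.out.println(sequential + " took ~" + (System.nanoTime() - start) / 1_000_000 + " ms");

        start = System.nanoTime();
        List<String> concurrent = fetchConcurrently(ids, 200);
        System.out.println(concurrent + " took ~" + (System.nanoTime() - start) / 1_000_000 + " ms");
    }
}
```

With three simulated calls of 200 ms each, the sequential version needs roughly the sum of the latencies, while the concurrent version needs roughly the longest single latency.
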
  • Reactive controllers: Spring WebFlux handler methods can return Mono or Flux directly.
@GetMapping("/{id}")
public Mono<Employee> getEmployeeById(@PathVariable String id) {
    return employeeRepository.findEmployeeById(id);
}

@GetMapping
public Flux<Employee> getAllEmployees() {
    return employeeRepository.findAllEmployees();
}