Project Reactor in Java

Project Reactor is an implementation of the Reactive Streams specification.

Its core library, Reactor Core, requires Java 8 or later and implements the reactive programming model.

In the previous lesson, you already saw that the Reactive Streams specification provides four interfaces (Subscriber, Publisher, Subscription, and Processor).

Project Reactor offers two implementations of the Publisher interface: Mono and Flux.

Mono

Mono emits at most one item via the onNext signal and then terminates with an onComplete signal, or it emits only a single onError signal.

If the Mono (the Publisher) emits its value without an error, the onComplete signal follows.
There are cases when the Mono has no data to send. In that case, it emits the onComplete signal immediately.

If the Mono cannot emit the value for some reason, it sends the onError signal to the Subscriber, telling it that an error occurred.

Since a Mono can emit at most one value, a Subscriber that requests more than one item still receives only one, followed by the onComplete signal.
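
The three outcomes described above can be observed with a small sketch. It assumes reactor-core 3.4.x is on the classpath; the class name MonoSignals and the helper signalsOf are illustrative names, not part of Reactor.

```java
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;

class MonoSignals {

  // Subscribes to the given Mono and records every signal it receives, in order.
  // (Hypothetical helper for this demo.)
  static <T> List<String> signalsOf(Mono<T> mono) {
    List<String> signals = new ArrayList<>();
    mono.subscribe(
        value -> signals.add("onNext(" + value + ")"),
        error -> signals.add("onError(" + error.getMessage() + ")"),
        () -> signals.add("onComplete"));
    return signals;
  }

  public static void main(String[] args) {
    // One value: onNext followed by onComplete
    System.out.println(signalsOf(Mono.just(1)));
    // No value: onComplete only
    System.out.println(signalsOf(Mono.empty()));
    // Failure: onError only
    System.out.println(signalsOf(Mono.error(new IllegalStateException("boom"))));
  }
}
```

With these in-memory sources the signals arrive synchronously, so the program should print [onNext(1), onComplete], then [onComplete], then [onError(boom)].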


Flux

Flux can emit 0…n items via the onNext signal followed by an onComplete or onError signal.

If the Subscriber requests, for example, 10 items, the Flux (the Publisher) starts emitting them one by one via onNext signals, and once it has no more data, it sends the onComplete signal.
After receiving the requested items, the Subscriber can ask for more data or cancel the subscription.

If the Flux has only 3 items rather than 10, it sends those 3 and then the onComplete signal.
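
To make the request mechanics concrete, here is a minimal sketch using Reactor's BaseSubscriber, which lets a Subscriber control how many items it requests. It assumes reactor-core 3.4.x; the class FluxDemand and the helper signalsFor are hypothetical names chosen for the demo.

```java
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;

class FluxDemand {

  // Requests `demand` items from the Flux and records the signals received.
  // (Hypothetical helper for this demo.)
  static List<String> signalsFor(Flux<Integer> flux, long demand) {
    List<String> signals = new ArrayList<>();
    flux.subscribe(new BaseSubscriber<Integer>() {
      @Override
      protected void hookOnSubscribe(Subscription subscription) {
        request(demand); // ask for a bounded number of items
      }

      @Override
      protected void hookOnNext(Integer value) {
        signals.add("onNext(" + value + ")");
      }

      @Override
      protected void hookOnComplete() {
        signals.add("onComplete");
      }
    });
    return signals;
  }

  public static void main(String[] args) {
    // 10 requested, only 3 available: all 3 arrive, then onComplete
    System.out.println(signalsFor(Flux.just(12, 14, 9), 10));
    // 2 requested, 3 available: 2 arrive and the Flux waits for more demand
    System.out.println(signalsFor(Flux.just(12, 14, 9), 2));
  }
}
```

In the second call no onComplete is recorded, because the third item was never requested and the Flux has not finished.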

If the Flux cannot emit the data for some reason, it sends the onError signal to the Subscriber, indicating that an error occurred.
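
The error case can be sketched as follows. The example assumes reactor-core 3.4.x and simulates a failure by appending Flux.error to a two-item Flux; the class name FluxErrorDemo and the "boom" exception are made up for illustration.

```java
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;

class FluxErrorDemo {

  // Emits two items and then fails; records the signals the Subscriber sees.
  static List<String> signalsOfFailingFlux() {
    List<String> signals = new ArrayList<>();
    Flux<Integer> failing = Flux.just(1, 2)
        .concatWith(Flux.error(new IllegalStateException("boom")));

    failing.subscribe(
        value -> signals.add("onNext(" + value + ")"),
        error -> signals.add("onError(" + error.getMessage() + ")"),
        () -> signals.add("onComplete"));
    return signals;
  }

  public static void main(String[] args) {
    System.out.println(signalsOfFailingFlux());
  }
}
```

This should print [onNext(1), onNext(2), onError(boom)]. Note that onComplete never appears: a stream ends with either onComplete or onError, not both.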


Working with Project Reactor in Java

To work with Project Reactor in Java, add the following Maven dependency to your project:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.4.9</version>
</dependency>

Create a Mono

We create a Mono via the static just(T data) method. The data is the value that the Mono will emit.

Mono<Integer> mono = Mono.just(1);
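
Besides just, Mono has a few other factory methods that are often useful. The sketch below assumes reactor-core 3.4.x; the class name MonoFactories and the variable names are illustrative. It uses block() only to print the results in a demo.

```java
import reactor.core.publisher.Mono;

class MonoFactories {

  public static void main(String[] args) {
    Integer missing = null; // stands in for a value that may be absent

    Mono<Integer> one = Mono.just(1);                        // emits 1, then completes
    Mono<Integer> none = Mono.empty();                       // completes without a value
    Mono<Integer> maybe = Mono.justOrEmpty(missing);         // empty because the value is null
    Mono<Integer> computed = Mono.fromSupplier(() -> 2 + 3); // value computed on subscription

    // block() subscribes and waits for the result (null for an empty Mono).
    // It is handy in demos but generally avoided in reactive code.
    System.out.println(one.block());      // 1
    System.out.println(none.block());     // null
    System.out.println(maybe.block());    // null
    System.out.println(computed.block()); // 5
  }
}
```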


Create a Flux

Flux has a similar static just(T… data) method, which accepts any number of values.

Flux<Integer> flux = Flux.just(12, 14, 9, 11);
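
A Flux can also be created from an existing collection or a numeric range. This is a small sketch, assuming reactor-core 3.4.x; the class name FluxFactories is illustrative, and collectList().block() is used only to print the contents.

```java
import reactor.core.publisher.Flux;

import java.util.Arrays;

class FluxFactories {

  public static void main(String[] args) {
    Flux<Integer> fromList = Flux.fromIterable(Arrays.asList(12, 14, 9, 11));
    Flux<Integer> range = Flux.range(1, 5); // 5 values starting at 1
    Flux<Integer> none = Flux.empty();      // completes without emitting

    // collectList() gathers all items into a Mono<List<T>>;
    // block() waits for that list (demo use only).
    System.out.println(fromList.collectList().block()); // [12, 14, 9, 11]
    System.out.println(range.collectList().block());    // [1, 2, 3, 4, 5]
    System.out.println(none.collectList().block());     // []
  }
}
```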


Since both Mono and Flux implement the Publisher interface, we can also assign them to variables of type Publisher:

Publisher<Integer> mono = Mono.just(2);

Publisher<Integer> flux = Flux.just(12, 14, 9, 11);
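
One consequence worth noting: the Publisher interface declares only subscribe(Subscriber), so Reactor operators such as map are not available on a Publisher-typed variable. A possible way to get them back is to wrap the Publisher with Flux.from or Mono.from. The sketch assumes reactor-core 3.4.x; the class PublisherWrapping and the helper doubled are hypothetical.

```java
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;

class PublisherWrapping {

  // Wraps any Publisher in a Flux so that operators can be applied.
  // (Hypothetical helper for this demo.)
  static List<Integer> doubled(Publisher<Integer> publisher) {
    return Flux.from(publisher).map(n -> n * 2).collectList().block();
  }

  public static void main(String[] args) {
    Publisher<Integer> flux = Flux.just(12, 14, 9, 11);
    System.out.println(doubled(flux)); // [24, 28, 18, 22]

    Publisher<Integer> mono = Mono.just(2);
    System.out.println(Mono.from(mono).map(n -> n + 1).block()); // 3
  }
}
```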

Mono and Flux are lazy.
This means that whatever operators you call on the stream, nothing happens until something consumes it.

In Project Reactor, we consume the stream by subscribing to it.
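
Laziness is easy to verify with a counter. The following sketch assumes reactor-core 3.4.x; the class LazyDemo and the helper countsBeforeAndAfterSubscribe are illustrative names. It counts how many times the function passed to map has run before and after subscribing.

```java
import reactor.core.publisher.Flux;

import java.util.concurrent.atomic.AtomicInteger;

class LazyDemo {

  // Returns {calls to the map function before subscribe, calls after subscribe}.
  static int[] countsBeforeAndAfterSubscribe() {
    AtomicInteger processed = new AtomicInteger();

    // Building the pipeline runs nothing: the map function is only registered.
    Flux<Integer> pipeline = Flux.just(1, 2, 3)
        .map(n -> {
          processed.incrementAndGet();
          return n * 10;
        });
    int before = processed.get();

    // Subscribing triggers the actual processing of the three items.
    pipeline.subscribe();
    int after = processed.get();

    return new int[] {before, after};
  }

  public static void main(String[] args) {
    int[] counts = countsBeforeAndAfterSubscribe();
    System.out.println("before subscribe: " + counts[0]); // 0
    System.out.println("after subscribe: " + counts[1]);  // 3
  }
}
```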

Subscribing to Mono and Flux

Below is a simple example of subscribing to Mono and Flux.

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;

class Test {

  public static void main(String[] args) {
    List<Integer> itemsFromMono = new ArrayList<>();
    List<Integer> itemsFromFlux = new ArrayList<>();

    // Create a Mono
    Mono<Integer> mono = Mono.just(121);

    // Create a Flux
    Flux<Integer> flux = Flux.just(12, 14, 9, 11, 12, 14, 9, 11, 12, 14, 9, 11, 12, 14);

    // Subscribe to Mono
    mono.subscribe(itemsFromMono::add);

    // Subscribe to Flux
    flux.subscribe(itemsFromFlux::add);

    System.out.println(itemsFromMono);
    System.out.println(itemsFromFlux);
  }
}
Output:
[121]
[12, 14, 9, 11, 12, 14, 9, 11, 12, 14, 9, 11, 12, 14]
 
This was an introduction to Project Reactor and its two Publishers, Mono and Flux. In the upcoming lessons, we will work with them in more detail.
 
Happy coding!

 
