Hot and Cold Publishers in Project Reactor

In one of the previous lessons, you saw that we have two types of Publishers in Project Reactor: Mono and Flux.

Mono is a Publisher that can emit 0 or 1 items, and Flux can emit 0 to N items.
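As a quick refresher, here is a minimal sketch of both types. The class name MonoFluxBasics and its run method are hypothetical. collectList() and block() are used only to gather the emitted items into lists so they are easy to print and compare.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MonoFluxBasics {

    // Returns the items emitted by an empty Mono, a one-item Mono and a five-item Flux
    public static List<List<String>> run() {
        // Mono: at most one item; an empty Mono completes without emitting anything
        List<String> emptyMono = Mono.<String>empty().flux().collectList().block();
        List<String> singleMono = Mono.just("video").flux().collectList().block();

        // Flux: zero to N items
        List<String> parts = Flux.just("part 1", "part 2", "part 3", "part 4", "part 5")
                .collectList()
                .block();

        return List.of(emptyMono, singleMono, parts);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running main prints an empty list, then [video], then the five parts.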

Publishers can emit data asynchronously, and based on how they produce data for their Subscribers, we can divide them into Hot and Cold Publishers.

Let’s start with the Cold Publishers first.

Cold Publishers in Project Reactor

A Cold Publisher does not start emitting data until a Subscriber subscribes to it, and it creates a new data producer for each new subscription.

Let’s take YouTube as an example. One user clicks on a video, and the streaming starts. After a while, another user clicks on the same video, and the streaming starts from the beginning for them.

It does not matter that the first user was already watching when the second one joined: the second user gets a separate data producer.

Example

import java.time.Duration;
import java.util.stream.Stream;

import reactor.core.publisher.Flux;

class ReactiveJavaTutorial {

  public static void main(String[] args) throws InterruptedException {

    // Cold publisher: getVideo() is called again for every new Subscriber
    Flux<String> netFlux = Flux.fromStream(ReactiveJavaTutorial::getVideo)
            .delayElements(Duration.ofSeconds(2)); // each part will play for 2 seconds

    // First Subscriber
    netFlux.subscribe(part -> System.out.println("Subscriber 1: " + part));

    // wait 5 seconds before the next Subscriber joins
    Thread.sleep(5000);

    // Second Subscriber
    netFlux.subscribe(part -> System.out.println("Subscriber 2: " + part));

    // delayElements emits on Reactor's parallel scheduler (daemon threads),
    // so keep the main thread alive long enough to see all the output
    Thread.sleep(60000);
  }

  private static Stream<String> getVideo() {
    System.out.println("Request for the video streaming received.");
    return Stream.of("part 1", "part 2", "part 3", "part 4", "part 5");
  }
}
The output is:

Request for the video streaming received.
Subscriber 1: part 1
Subscriber 1: part 2
Request for the video streaming received.
Subscriber 1: part 3
Subscriber 2: part 1
Subscriber 1: part 4
Subscriber 2: part 2
Subscriber 1: part 5
Subscriber 2: part 3
Subscriber 2: part 4
Subscriber 2: part 5

It does not matter that the first user was in the middle of the video. The second user’s subscription created a separate data producer (note the second "Request for the video streaming received." line), so the second user watched the video from the beginning, independently of the first. Both users watched the same video in parallel.
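The same behaviour can be checked without timers. This is a minimal sketch, not the lesson’s code: it swaps Flux.fromStream for Flux.defer, which likewise calls its supplier once per subscription, and uses a hypothetical counter (producersCreated) to count how many data producers were created. ColdPublisherSketch, video and run are illustrative names only.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class ColdPublisherSketch {

    // Builds a cold "video": the producer is created anew for every subscription.
    // The counter records how many producers were created.
    static Flux<String> video(AtomicInteger producersCreated) {
        // Flux.defer invokes the supplier once per Subscriber
        return Flux.defer(() -> {
            producersCreated.incrementAndGet();
            return Flux.just("part 1", "part 2", "part 3", "part 4", "part 5");
        });
    }

    public static List<List<String>> run(AtomicInteger producersCreated) {
        Flux<String> video = video(producersCreated);
        // Two subscriptions to the same Flux; block() waits for each one to complete
        List<String> first = video.collectList().block();
        List<String> second = video.collectList().block();
        return List.of(first, second);
    }

    public static void main(String[] args) {
        AtomicInteger producers = new AtomicInteger();
        System.out.println(run(producers));
        System.out.println("producers created: " + producers.get());
    }
}
```

Both Subscribers receive all five parts, and the counter ends at 2: one producer per subscription, just like the two "Request for the video streaming received." lines above.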

Hot Publishers in Project Reactor

A Hot Publisher has only one data producer. All Subscribers receive their data from that single producer, so the data is shared.

Imagine a TV station. It broadcasts its program whether or not anyone is watching. Viewers can tune in whenever they want, but all viewers see the same content at any given moment.

Viewers who tune in late miss whatever has already aired. The same holds for Hot Publishers: a late Subscriber misses the items emitted before it subscribed.
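One way to see the TV-station behaviour directly is a sketch built on Reactor’s Sinks API, assuming Reactor 3.4 or later. This is a swapped-in technique, not something the lesson itself uses: Sinks.many().multicast().directBestEffort() pushes items only to the Subscribers that are currently subscribed, so anything emitted while nobody is watching is not delivered to anyone. TvStationSketch and the program names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Sinks;

public class TvStationSketch {

    public static List<String> run() {
        // Hot source: pushes to current subscribers only, ignores items when nobody is subscribed
        Sinks.Many<String> station = Sinks.many().multicast().directBestEffort();

        // Broadcast while nobody is watching: this item reaches no one
        station.tryEmitNext("news at 8:00");

        // A viewer tunes in late
        List<String> viewer = new ArrayList<>();
        station.asFlux().subscribe(viewer::add);

        // Everything broadcast from now on reaches the viewer
        station.tryEmitNext("news at 9:00");
        station.tryEmitNext("news at 10:00");
        station.tryEmitComplete();
        return viewer;
    }

    public static void main(String[] args) {
        System.out.println(run()); // the viewer missed the 8:00 news
    }
}
```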

Let’s transform the Cold Publisher from the above example into a Hot Publisher. We can do that with the share() method. The first Subscriber triggers the subscription to the source, and further Subscribers share that same subscription, so from the moment they join they receive the same items as everyone else.

Flux<String> netFlux = Flux.fromStream(ReactiveJavaTutorial::getVideo)
    .delayElements(Duration.ofSeconds(2))
    .share(); // turn the cold publisher into a hot publisher


And the output is:

Request for the video streaming received.
Subscriber 1: part 1
Subscriber 1: part 2
Subscriber 1: part 3
Subscriber 2: part 3
Subscriber 1: part 4
Subscriber 2: part 4
Subscriber 1: part 5
Subscriber 2: part 5

The second Subscriber missed the first two parts of the video, but from part 3 onward it watched along with the first one. Note also that the request for the video was received only once.
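To make the shared-subscription behaviour testable without sleeps, here is a minimal sketch that replaces the timed video with a manually driven Sinks.Many (an assumption for illustration; the lesson uses Flux.fromStream with delayElements). The hypothetical counter upstreamSubscriptions counts how many times the source is actually subscribed to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class ShareSketch {

    public static List<List<String>> run(AtomicInteger upstreamSubscriptions) {
        // Manually driven stand-in for the timed video source
        Sinks.Many<String> source = Sinks.many().unicast().onBackpressureBuffer();
        Flux<String> shared = source.asFlux()
                .doOnSubscribe(s -> upstreamSubscriptions.incrementAndGet())
                .share(); // equivalent to publish().refCount()

        List<String> subscriber1 = new ArrayList<>();
        List<String> subscriber2 = new ArrayList<>();

        shared.subscribe(subscriber1::add); // first Subscriber connects to the source
        source.tryEmitNext("part 1");
        source.tryEmitNext("part 2");

        shared.subscribe(subscriber2::add); // joins late and reuses the same subscription
        source.tryEmitNext("part 3");
        source.tryEmitNext("part 4");
        source.tryEmitNext("part 5");
        source.tryEmitComplete();

        return List.of(subscriber1, subscriber2);
    }

    public static void main(String[] args) {
        AtomicInteger upstream = new AtomicInteger();
        System.out.println(run(upstream));
        System.out.println("upstream subscriptions: " + upstream.get());
    }
}
```

The first Subscriber gets all five parts, the second only parts 3 to 5, and the source is subscribed to exactly once, mirroring the timed output above.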

Using the refCount() method

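refCount() is a method of ConnectableFlux, which is what publish() returns. With refCount(int minSubscribers), we can set how many Subscribers must be subscribed before the Hot Publisher connects to its source and starts emitting data; when all Subscribers cancel, it disconnects from the source again. share() is equivalent to publish().refCount(), and the no-argument refCount() behaves like refCount(1): a single Subscriber is enough.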

If we call publish().refCount(1), the output is the same as in the share() example above. But what happens if we pass 2? Let’s try it:

Flux<String> netFlux = Flux.fromStream(ReactiveJavaTutorial::getVideo)
    .delayElements(Duration.ofSeconds(2))
    .publish()
    .refCount(2); // minSubscribers


In this case, the output is:

Request for the video streaming received.
Subscriber 1: part 1
Subscriber 2: part 1
Subscriber 1: part 2
Subscriber 2: part 2
Subscriber 1: part 3
Subscriber 2: part 3
Subscriber 1: part 4
Subscriber 2: part 4
Subscriber 1: part 5
Subscriber 2: part 5

Here, the Hot Publisher waited until the second Subscriber subscribed (5 seconds in) and only then requested the video, emitting each part to both Subscribers at the same time.
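The same waiting behaviour can be observed in a small deterministic sketch, again assuming a manually driven Sinks.Many in place of the timed video. The hypothetical list upstreamSnapshots records how many times the source had been subscribed to right after each Subscriber joined.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class RefCountSketch {

    public static List<List<String>> run(List<Integer> upstreamSnapshots) {
        AtomicInteger upstream = new AtomicInteger();
        // Manually driven stand-in for the timed video source
        Sinks.Many<String> source = Sinks.many().unicast().onBackpressureBuffer();
        Flux<String> hot = source.asFlux()
                .doOnSubscribe(s -> upstream.incrementAndGet())
                .publish()
                .refCount(2); // connect only once two Subscribers are present

        List<String> subscriber1 = new ArrayList<>();
        List<String> subscriber2 = new ArrayList<>();

        hot.subscribe(subscriber1::add);
        upstreamSnapshots.add(upstream.get()); // not connected yet

        hot.subscribe(subscriber2::add);
        upstreamSnapshots.add(upstream.get()); // the second Subscriber triggered the connection

        for (int i = 1; i <= 5; i++) {
            source.tryEmitNext("part " + i);
        }
        source.tryEmitComplete();
        return List.of(subscriber1, subscriber2);
    }

    public static void main(String[] args) {
        List<Integer> snapshots = new ArrayList<>();
        System.out.println(run(snapshots));
        System.out.println("upstream subscriptions after each join: " + snapshots);
    }
}
```

The snapshots read 0 and then 1, and both Subscribers receive all five parts.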

Using the cache() method

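With the cache() method, the first Subscriber triggers the subscription to the source, and every element is stored in a cache as it is emitted. Any later Subscriber first receives the cached elements (replayed from the cache), then the remaining elements as they arrive if the source has not completed yet. The source is not subscribed to again.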

Let’s modify the previous example and add the cache() method:

Flux<String> netFlux = Flux.fromStream(ReactiveJavaTutorial::getVideo)
    .delayElements(Duration.ofSeconds(2))
    .cache();

The output is:

Request for the video streaming received.
Subscriber 1: part 1
Subscriber 1: part 2
Subscriber 1: part 3
Subscriber 1: part 4
Subscriber 1: part 5
Subscriber 2: part 1
Subscriber 2: part 2
Subscriber 2: part 3
Subscriber 2: part 4
Subscriber 2: part 5

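There was only one request for the video streaming: the second Subscriber received the parts from the cache, and the source was not subscribed to a second time. This output corresponds to a second Subscriber that joins after the first one has received all five parts (more than 10 seconds in). With the 5-second wait from the first example, the second Subscriber would receive part 1 and part 2 from the cache immediately on subscribing, then parts 3 to 5 live, together with the first Subscriber.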
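To see how cache() treats a Subscriber that joins while the source is still emitting, here is a minimal sketch with a manually driven Sinks.Many standing in for the timed video (an assumption for illustration). The hypothetical counter upstreamSubscriptions confirms that the source is subscribed to only once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class CacheSketch {

    public static List<List<String>> run(AtomicInteger upstreamSubscriptions) {
        // Manually driven stand-in for the timed video source
        Sinks.Many<String> source = Sinks.many().unicast().onBackpressureBuffer();
        Flux<String> cached = source.asFlux()
                .doOnSubscribe(s -> upstreamSubscriptions.incrementAndGet())
                .cache(); // store every element and replay it to later Subscribers

        List<String> subscriber1 = new ArrayList<>();
        List<String> subscriber2 = new ArrayList<>();

        cached.subscribe(subscriber1::add); // first Subscriber connects to the source
        source.tryEmitNext("part 1");
        source.tryEmitNext("part 2");

        // Joins mid-stream: parts 1 and 2 are replayed from the cache right away
        cached.subscribe(subscriber2::add);
        source.tryEmitNext("part 3");
        source.tryEmitNext("part 4");
        source.tryEmitNext("part 5");
        source.tryEmitComplete();

        // A Subscriber arriving after completion gets everything from the cache
        List<String> subscriber3 = cached.collectList().block();
        return List.of(subscriber1, subscriber2, subscriber3);
    }

    public static void main(String[] args) {
        AtomicInteger upstream = new AtomicInteger();
        System.out.println(run(upstream));
        System.out.println("upstream subscriptions: " + upstream.get());
    }
}
```

All three Subscribers end up with the same five parts, while the source is subscribed to a single time.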

If you do not want to cache all the items, you can use cache(int history), where history is the maximum number of most recent elements kept in the cache and replayed to later Subscribers.
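A minimal sketch of a bounded cache, assuming a history of 2 chosen only for illustration. The class name CacheHistorySketch and the counter upstreamSubscriptions are hypothetical. Because the late Subscriber arrives after the source has completed, it receives only what is still in the cache.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class CacheHistorySketch {

    public static List<List<String>> run(AtomicInteger upstreamSubscriptions) {
        Flux<String> video = Flux.just("part 1", "part 2", "part 3", "part 4", "part 5")
                .doOnSubscribe(s -> upstreamSubscriptions.incrementAndGet())
                .cache(2); // keep only the two most recent elements

        // The first Subscriber triggers the only subscription to the source
        List<String> first = video.collectList().block();
        // A late Subscriber receives just the elements still held in the cache
        List<String> late = video.collectList().block();
        return List.of(first, late);
    }

    public static void main(String[] args) {
        AtomicInteger upstream = new AtomicInteger();
        System.out.println(run(upstream));
        System.out.println("upstream subscriptions: " + upstream.get());
    }
}
```

The first Subscriber sees all five parts, the late one only part 4 and part 5, and the source is still subscribed to just once.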

That was all about Hot and Cold Publishers in Project Reactor. Proceed to the next lesson.

Happy coding!
