Implementing Backpressure in Project Reactor

What is a Backpressure in Reactive Programming

In Reactive Programming, we are dealing with Publishers and Subscribers. The Publisher publishes the data to the Subscriber. It is usually a data source. It can be a database, a remote service or anything which holds some data. In the Project Reactor, we have two Publishers: Mono and Flux

Subscriber is some app that consumes data from the Publisher.

The interaction between these two types starts when the Subscriber subscribes to the Publisher. Then Publisher sends out a Subscription and starts sending data concurrently, as soon as it is available, in the form of a stream of events.

When the Publisher finishes with emitting data, it sends the onComplete signal to notify the Subscriber that there will be no more data. The whole process in detail is explained in this post: Introduction to Reactive Streams in Java.

Let’s see one example:

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    Flux<String> cities = Flux.fromIterable(
           new ArrayList<>(Arrays.asList("New York", "London", "Paris", "Toronto", "Rome")));

    cities.log().subscribe();

  }
}
Output: INFO 6832 — [ main] reactor.Flux.Iterable.1 : | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription) INFO 6832 — [ main] reactor.Flux.Iterable.1 : | request(unbounded) INFO 6832 — [ main] reactor.Flux.Iterable.1 : | onNext(New York) INFO 6832 — [ main] reactor.Flux.Iterable.1 : | onNext(London) INFO 6832 — [ main] reactor.Flux.Iterable.1 : | onNext(Paris) INFO 6832 — [ main] reactor.Flux.Iterable.1 : | onNext(Toronto) INFO 6832 — [ main] reactor.Flux.Iterable.1 : | onNext(Rome) INFO 6832 — [ main] reactor.Flux.Iterable.1 : | onComplete()
 
You can see from the logs that after we subscribed to Flux, the request() method got invoked, and Flux started sending data with the onNext signals. The Flux will emit all the elements, and in such cases, it can easily overwhelm the Consumer.
 
To avoid this problem, the Consumer should be able to tell the Producer how much data to send. This is called a Backpressure.
 
We use the request() method of Subscription interface to tell the Publisher how much data we want.

Implementing the Backpressure in Project Reactor

In the example above, since we didn’t modify the Subscription, the default behavior is to request unbounded data from the Publisher, which means that the Subscriber is expecting the Publisher to emit all the values.

Now, let’s implement the Backpressure by telling the Publisher how much data to send.

For this, we will use the version of the subscribe() method that accepts a BaseSubscriber. This class lets the user to perform a request() and cancel() operations directly on it. Using it, we can override the default behavior.

class ReactiveProgrammingTutorial {

  public static void main(String[] args) {

    Flux<Integer> publisher = Flux.range(1, 100).log();

    publisher.subscribe(new BaseSubscriber<>() {
      @Override
      protected void hookOnSubscribe(Subscription subscription) {
        request(5); // request only 5 elements
      }
    });
  }
}


Here, we override the hookOnSubcribe() method invoked upon a subscription. Inside, we call the request() method of the Subscription object and tell the Publisher to send only 5 values.

Let’s see the output:

INFO 7224 --- [main] reactor.Flux.Range.1  : | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
INFO 7224 --- [main] reactor.Flux.Range.1  : | request(5)
INFO 7224 --- [main] reactor.Flux.Range.1  : | onNext(1)
INFO 7224 --- [main] reactor.Flux.Range.1  : | onNext(2)
INFO 7224 --- [main] reactor.Flux.Range.1  : | onNext(3)
INFO 7224 --- [main] reactor.Flux.Range.1  : | onNext(4)
INFO 7224 --- [main] reactor.Flux.Range.1  : | onNext(5)


As you can see, Flux sent as much data as requested. 

It is always good to cancel a Subscription after receiving the last requested element.mFor this, we will override the onNext() method invoked on every element emitted. Let’s extend the previous example:

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    Flux<Integer> publisher = Flux.range(1, 100).log();

    publisher.subscribe(new BaseSubscriber<Integer>() {
      @Override
      protected void hookOnSubscribe(Subscription subscription) {
        request(5);
      }

      @Override
      protected void hookOnNext(Integer value) {
        if(value == 5) { // we know that the last element is 5
          cancel();
        }
      }
    });
  }
}
Output: INFO 8308 — [main] reactor.Flux.Range.1 : | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) INFO 8308 — [main] reactor.Flux.Range.1 : | request(5) INFO 8308 — [main] reactor.Flux.Range.1 : | onNext(1) INFO 8308 — [main] reactor.Flux.Range.1 : | onNext(2) INFO 8308 — [main] reactor.Flux.Range.1 : | onNext(3) INFO 8308 — [main] reactor.Flux.Range.1 : | onNext(4) INFO 8308 — [main] reactor.Flux.Range.1 : | onNext(5) INFO 8308 — [main] reactor.Flux.Range.1 : | cancel()
 
The cancel() request got executed after the last element.
 
That was all about implementing Backpressure in Project Reactor. Proceed to the next lesson.
 
Happy coding!

Leave a Reply

Your email address will not be published. Required fields are marked *