Reactive Programming: Creating Publishers and Subscribers in Java

This tutorial is part of our series on Reactive Programming in Java and the Spring Framework. If you have not read the previous article, I strongly advise you to do so first, since this tutorial builds on it.

In the previous blog post, we discussed publishers and subscribers in Java. This post is a coding tutorial that puts them into practice. To follow along, you need Java 9 or later, because the Flow API was introduced in Java 9.

Interface Representations

The Publisher, Subscriber, and Subscription interfaces (along with Processor) are nested in the java.util.concurrent.Flow class. The Flow class is just a holder for these interfaces: its constructor is private, so it cannot be instantiated. Below is its source, showing the definitions of these interfaces:

public final class Flow {

    private Flow() {} // uninstantiable

    /**
     * A producer of items (and related control messages) received by
     * Subscribers.  Each current {@link Subscriber} receives the same
     * items (via method {@code onNext}) in the same order, unless
     * drops or errors are encountered. If a Publisher encounters an
     * error that does not allow items to be issued to a Subscriber,
     * that Subscriber receives {@code onError}, and then receives no
     * further messages.  Otherwise, when it is known that no further
     * messages will be issued to it, a subscriber receives {@code
     * onComplete}.  Publishers ensure that Subscriber method
     * invocations for each subscription are strictly ordered in <a
     * href="package-summary.html#MemoryVisibility"><i>happens-before</i></a>
     * order.
     *
     * <p>Publishers may vary in policy about whether drops (failures
     * to issue an item because of resource limitations) are treated
     * as unrecoverable errors.  Publishers may also vary about
     * whether Subscribers receive items that were produced or
     * available before they subscribed.
     *
     * @param <T> the published item type
     */
    @FunctionalInterface
    public static interface Publisher<T> {
        /**
         * Adds the given Subscriber if possible.  If already
         * subscribed, or the attempt to subscribe fails due to policy
         * violations or errors, the Subscriber's {@code onError}
         * method is invoked with an {@link IllegalStateException}.
         * Otherwise, the Subscriber's {@code onSubscribe} method is
         * invoked with a new {@link Subscription}.  Subscribers may
         * enable receiving items by invoking the {@code request}
         * method of this Subscription, and may unsubscribe by
         * invoking its {@code cancel} method.
         *
         * @param subscriber the subscriber
         * @throws NullPointerException if subscriber is null
         */
        public void subscribe(Subscriber<? super T> subscriber);
    }

    /**
     * A receiver of messages.  The methods in this interface are
     * invoked in strict sequential order for each {@link
     * Subscription}.
     *
     * @param <T> the subscribed item type
     */
    public static interface Subscriber<T> {
        /**
         * Method invoked prior to invoking any other Subscriber
         * methods for the given Subscription. If this method throws
         * an exception, resulting behavior is not guaranteed, but may
         * cause the Subscription not to be established or to be cancelled.
         *
         * <p>Typically, implementations of this method invoke {@code
         * subscription.request} to enable receiving items.
         *
         * @param subscription a new subscription
         */
        public void onSubscribe(Subscription subscription);

        /**
         * Method invoked with a Subscription's next item.  If this
         * method throws an exception, resulting behavior is not
         * guaranteed, but may cause the Subscription to be cancelled.
         *
         * @param item the item
         */
        public void onNext(T item);

        /**
         * Method invoked upon an unrecoverable error encountered by a
         * Publisher or Subscription, after which no other Subscriber
         * methods are invoked by the Subscription.  If this method
         * itself throws an exception, resulting behavior is
         * undefined.
         *
         * @param throwable the exception
         */
        public void onError(Throwable throwable);

        /**
         * Method invoked when it is known that no additional
         * Subscriber method invocations will occur for a Subscription
         * that is not already terminated by error, after which no
         * other Subscriber methods are invoked by the Subscription.
         * If this method throws an exception, resulting behavior is
         * undefined.
         */
        public void onComplete();
    }

    /**
     * Message control linking a {@link Publisher} and {@link
     * Subscriber}.  Subscribers receive items only when requested,
     * and may cancel at any time. The methods in this interface are
     * intended to be invoked only by their Subscribers; usages in
     * other contexts have undefined effects.
     */
    public static interface Subscription {
        /**
         * Adds the given number {@code n} of items to the current
         * unfulfilled demand for this subscription.  If {@code n} is
         * less than or equal to zero, the Subscriber will receive an
         * {@code onError} signal with an {@link
         * IllegalArgumentException} argument.  Otherwise, the
         * Subscriber will receive up to {@code n} additional {@code
         * onNext} invocations (or fewer if terminated).
         *
         * @param n the increment of demand; a value of {@code
         * Long.MAX_VALUE} may be considered as effectively unbounded
         */
        public void request(long n);

        /**
         * Causes the Subscriber to (eventually) stop receiving
         * messages.  Implementation is best-effort -- additional
         * messages may be received after invoking this method.
         * A cancelled subscription need not ever receive an
         * {@code onComplete} or {@code onError} signal.
         */
        public void cancel();
    }

    /**
     * A component that acts as both a Subscriber and Publisher.
     *
     * @param <T> the subscribed item type
     * @param <R> the published item type
     */
    public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
    }

    static final int DEFAULT_BUFFER_SIZE = 256;

    /**
     * Returns a default value for Publisher or Subscriber buffering,
     * that may be used in the absence of other constraints.
     *
     * @implNote
     * The current value returned is 256.
     *
     * @return the buffer size value
     */
    public static int defaultBufferSize() {
        return DEFAULT_BUFFER_SIZE;
    }

}
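To make the contract between these interfaces more concrete, here is a minimal sketch of a hand-written Publisher. It is not how the JDK implements anything: the class name FlowInterfacesSketch and the helpers fromList and collect are hypothetical, and the publisher is single-threaded and synchronous, so it illustrates only the request/onNext/onComplete handshake rather than a production-ready implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

public class FlowInterfacesSketch {

    /** Hypothetical helper: replays a fixed list to each subscriber on the calling thread. */
    static <T> Flow.Publisher<T> fromList(List<T> items) {
        // Publisher is a functional interface, so a lambda can implement subscribe().
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private final Iterator<T> it = items.iterator();
            private long demand;      // items requested but not yet delivered
            private boolean emitting; // guards against re-entrant request() calls
            private boolean done;     // set after cancel, error, or completion

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("n must be positive"));
                    return;
                }
                // Add to the outstanding demand, capping at Long.MAX_VALUE on overflow.
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;
                if (emitting) return; // called from inside onNext: the outer loop continues
                emitting = true;
                while (!done && demand > 0 && it.hasNext()) {
                    demand--;
                    subscriber.onNext(it.next());
                }
                emitting = false;
                if (!done && !it.hasNext()) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Hypothetical helper: subscribes, requests one item at a time, and records every signal. */
    static List<String> collect(Flow.Publisher<String> publisher) {
        List<String> seen = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;

            @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
            @Override public void onNext(String item) { seen.add(item); subscription.request(1); }
            @Override public void onError(Throwable t) { seen.add("error"); }
            @Override public void onComplete() { seen.add("done"); }
        });
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collect(fromList(List.of("a", "b"))));
    }
}
```

Nothing is emitted until the subscriber calls request, and onComplete arrives only after the last item. That is the same ordering the Javadoc above describes.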

The Situation We Will Simulate

We will create a simple publisher that emits letters and a subscriber that receives and prints each of them. The key sections of the code are commented to help you understand what is happening.

Creating a Publisher

For the publisher, we will use SubmissionPublisher, an implementation that ships with the JDK in java.util.concurrent. It delivers each submitted item, in order, to every current subscriber. In this tutorial it will emit the letters A to Z. How SubmissionPublisher works internally is outside the scope of this post.

SubmissionPublisher<Character> publisher = new SubmissionPublisher<>();
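The no-argument constructor picks defaults for the executor and the buffer size. As a rough sketch of what can be configured, the example below passes both explicitly through the two-argument constructor. The class name PublisherSetupSketch and the helper describe are hypothetical, and the single-thread executor is just one possible choice.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class PublisherSetupSketch {

    /** Hypothetical helper: returns {max buffer capacity, number of subscribers}. */
    static int[] describe() {
        // Assumption for illustration: deliver items on one dedicated thread.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // SubmissionPublisher is AutoCloseable, so try-with-resources calls close() for us.
        try (SubmissionPublisher<Character> publisher =
                 new SubmissionPublisher<>(executor, Flow.defaultBufferSize())) {
            return new int[] {
                publisher.getMaxBufferCapacity(),   // per-subscriber buffer limit
                publisher.getNumberOfSubscribers()  // nobody has subscribed yet
            };
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] info = describe();
        System.out.println("Max buffer capacity: " + info[0]);
        System.out.println("Subscribers: " + info[1]);
    }
}
```

When the buffer of a slow subscriber is full, submit blocks until space is available, which is one way the publisher applies backpressure.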

Creating a Subscriber

We will implement a Subscriber class ourselves. This Subscriber will receive the letters from the Publisher to which it is subscribed and print each one. It requests one item at a time, so the Publisher never sends more than the Subscriber has asked for.

Please note the comments added above each method. They will help you understand what each method is responsible for.

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

class MySubscriber implements Subscriber<Character> {

    private Subscription subscription;

    /**
     * This method is called once, when the Publisher accepts this Subscriber.
     * We keep the Subscription and request the first item.
     */
    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    /**
     * This method is triggered when the Subscriber receives an item
     * from the Publisher. The item is simply printed here, and then
     * the next item is requested.
     */
    @Override
    public void onNext(Character item) {
        System.out.println("Received Letter: " + item);
        subscription.request(1);
    }

    /**
     * This method is triggered when the Subscriber receives an error event.
     * In our case we just print the error message.
     */
    @Override
    public void onError(Throwable error) {
        System.out.println("Error Occurred: " + error.getMessage());
    }

    /**
     * This method is triggered when the Subscriber receives the completion signal.
     * This means it has already received all items from the Publisher to which it
     * is subscribed, and no further items will arrive.
     */
    @Override
    public void onComplete() {
        System.out.println("Every Element has been received");
    }
}
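When a subscriber only needs to process each item, SubmissionPublisher also offers the consume method, which builds the Subscriber for you and returns a CompletableFuture that completes when the publisher signals onComplete. The sketch below uses it to upper-case incoming letters. The class name ConsumeSketch and the helper collectUpperCase are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;

public class ConsumeSketch {

    /** Hypothetical helper: publishes each character and collects the upper-cased result. */
    static String collectUpperCase(String letters) {
        StringBuilder received = new StringBuilder();
        CompletableFuture<Void> done;
        try (SubmissionPublisher<Character> publisher = new SubmissionPublisher<>()) {
            // consume() subscribes an internal Subscriber that calls this lambda for every item.
            done = publisher.consume(letter -> received.append(Character.toUpperCase(letter)));
            for (char c : letters.toCharArray()) {
                publisher.submit(c);
            }
        } // leaving the try block calls close(), which signals onComplete
        done.join(); // wait until every item has been processed
        return received.toString();
    }

    public static void main(String[] args) {
        System.out.println(collectUpperCase("abc"));
    }
}
```

A hand-written Subscriber such as MySubscriber is still the better fit when you want to control how many items are requested at a time.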

The Complete Code

Here is the complete code in the main method.

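import java.util.concurrent.SubmissionPublisher;

public class ReactiveStreamsExample {
    public static void main(String[] args) throws InterruptedException {
        SubmissionPublisher<Character> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new MySubscriber());

        System.out.println("Submitting Items...");
        for (char i = 'A'; i <= 'Z'; i++) {
            publisher.submit(i);
        }

        // close() signals onComplete, which is delivered after the items already submitted
        publisher.close();

        // Delivery happens on background threads, so keep the JVM alive long enough to see the output
        Thread.sleep(1500);
    }
}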

It is worth explaining why the example needs the Thread.sleep() call. A SubmissionPublisher delivers items asynchronously: by default it runs the subscriber's methods on the threads of ForkJoinPool.commonPool(), not on the main thread. Those are daemon threads, so they do not keep the JVM alive once main() returns. If we remove the Thread.sleep() call, the program will most likely exit before the subscriber has printed anything. You can try it out!
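A fixed sleep is a guess about how long delivery takes. One alternative, sketched here under the assumption that the subscriber can expose a latch, is to wait on a CountDownLatch that is released in onComplete or onError. The names LatchSketch, LatchSubscriber, and run are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class LatchSketch {

    /** Hypothetical subscriber that records items and releases a latch when the stream ends. */
    static class LatchSubscriber implements Flow.Subscriber<Character> {
        final CountDownLatch finished = new CountDownLatch(1);
        final StringBuilder received = new StringBuilder();
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(Character item) {
            received.append(item);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable error) {
            finished.countDown();
        }

        @Override
        public void onComplete() {
            finished.countDown();
        }
    }

    /** Publishes A to E and blocks until the subscriber has seen the end of the stream. */
    static String run() {
        LatchSubscriber subscriber = new LatchSubscriber();
        try (SubmissionPublisher<Character> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (char c = 'A'; c <= 'E'; c++) {
                publisher.submit(c);
            }
        } // close() signals onComplete after the submitted items
        try {
            // Wait for completion instead of sleeping; the timeout is only a safety net.
            if (!subscriber.finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for onComplete");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        return subscriber.received.toString();
    }

    public static void main(String[] args) {
        System.out.println("Received: " + run());
    }
}
```

With this approach the main thread resumes as soon as the last signal has been handled, rather than after an arbitrary delay.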

Conclusion

I hope this tutorial was helpful to you. In the next tutorial, we will talk about reactive streams in the Spring Framework using the Spring WebFlux project.

