Peek into Reactive Spring Java | Part-1

Expectations from modern app:- Lets start with understanding whats expected out of a application being powered by bunch of micro-services :-

  • Eventual Scaling based on the load.
  • Efficient usage of resources.
  • Response-time should be faster.

How does Spring-boot serves the request traditionally (Thread per request Model) :-

When the HTTP request comes , it comes through Servlet-container, which is Tomcat in this case. Each request is assigned a Thread. This is also known as ‘Thread Per Request’ model. This thread is responsible for interacting with the downstream APIs or databases.

How does Spring-boot handles the concurrency today :-

Tomcat internally maintains the Thread-Pool and for every request, its going to take a thread from the ThreadPool and assign the thread to that request. Each thread is now responsible to handle the lifecycle of that http-request. This count can be controlled with this property: ‘server.tomcat.max-threads’. There is a limit to what value can be set for this property.

Let’s understand its mechanics :-Each thread takes up some memory. General common stack size is 1 MB. Higher the thread-pool size, higher shall be the memory-usage. In case, sufficient memory is not available, the application shall perform poorly. How do we handle the increased load then. Answer is Horizontal-Scaling. Below is how we would achieve :-

Pl be cognisant of the fact that, with horizontal scaling as well, there is alway a limit too, as the below setup would incur cost, maintenance nightmares, etc. Bottom-line is, with this way of scaling, there do exists limitation on concurrency that can be achieved.

Imperative style of programming:- Below we show, how an API has been designed using traditional RESTful API pattern. Its a top-down approach. It works in Blocking and synchronous fashion.

As demonstrated above, the call to DB shown here is blocking and only after getting response from the same, further execution of code shall be done. During this time, the tomcat-thread is just sitting idle OR blocked, till the data is returned from the database/remote-api-call.

The time taken by this API to serve response(i.e. ResponseTime) is sum total of times of all the interactions done by the Thread to serve this request end-to-end. To improvise on this issue, possible option is to make the aforesaid calls Asynchronous and Non-blocking. We have below possible options :-

  • Callbacks → These doesn’t returns any value, but it takes the object of callback as parameter. These are not very great, as we might end-up with state of system called as ‘Callback-Hell’.
  • Future → Here, we basically returns the Future instance, which can be used to check if data is available. Its hard to compose multiple Asynchronous options.
  • Completable Future → Introduced as part of Java8 and supports functional style of programming and multiple Asynchronous operations. This option is not suitable for operation, where request contains multiple items. Lot of plumbing is required to handle the error-handling as well.

Idea behind the Reactive-way of programming is to move away from the “Thread-Per-Request” model. Reactive-Spring would not eliminate the traditional RESTful APIs totally. It depends upon the use-case to use-case basis. At some places, it can make sense to eliminate the RESTful APIs whereas at some places, it may makes sense to continue with this Thread-Per-Request model as well.

Foundations of Reactive way of Programming :-

  • New Programming paradigm.
  • Asynchronous and Non-Blocking.
  • Supports Back-pressure on data-streams. If the data-source is producing more data than the expected, there exists a way to provide feedback to the data-source indicating that, Hey Data-source you need to slow-down, until I catch-up.
  • Embraces Functional Style of coding. Its similar to Streams-API and Lambdas.
  • Data flow as an Event/Message driven stream.

Flow of Data as an Event-Stream with Reactive-Programming :-

In reactive way of programming, Data flow as an event / message stream. The Application requests for the data from the Database and the data flows to the application as event-stream. One event is sent for every result-item from data-source. Please note that, Data-Source can be either a DB or an External-Service. Another event is being received on completion or error. Lets see an example for the same below. Here call to database returns immediately. That’s how, this call becomes Asynchronous & Non-blocking. Now, as and when each item is being fetched from the DB, same can be pushed to the application, until all items are being fetched. This paradigm is called as Event-driven Stream i.e. the application is not blocked and data is being returned back to the application. Once all the items are being successfully sent to the application, an event ‘onComplete()’ shall also be received.

In case, some error comes while fetching this data, an event called ‘onError’ is being received by the application. The logic to deal with this event can then be written, depending upon business requirement.

Conclusion with Data flow as an Event-Stream :-

  • onNext(item) → Data stream events.
  • onComplete() → Completion / Success events.
  • onError() → Error events.

Reactive-Stream Specifications :- Its the set of Rules and Specifications for a Reactive Stream. This has been created by the joint consortium of companies like Pivotal, Twitter, Netflix, LightBend, etc. This Specification has following 4 interfaces. All of these interfaces talks to each other in order for the Reactive-Streams to work together.

  • Publisher :- It has single method called as ‘subscribe’, which takes in ‘Subscriber’ instance. By making this call, the Subscriber registers to the Publisher. Publishers are generally the Data Producers, e.g. → DataBase, Remote APIs.
  • Subscriber :- This is nothing but the application in our above example. Application is a subscriber and have subscribed to the RDBMS data-source. It has 4 methods defined in it.
  • Subscription :- It has 2 methods defined in it.
  • Processor :- It has got NO method.

Event flow diagram in Reactive world of Programming :-

  • Subscriber is going to invoke the ‘subscribe()’ method of the Publisher and pass on the instance of Subscriber as an input.
  • Publisher now sends the ‘Subscription’ object back to the Subscriber, confirming that subscription is successful.
  • Now, Subscriber calls the ‘request’ method from Subscription object with an intent to get all the data from the data-source. The Subscriber can also invoke the ‘cancel’ method from Subscription object, if it wants to cancel the subscription itself. In case of ‘cancel’ method, the Publisher is not going to publish any data to the subscriber.
  • The Publisher would then send all the data-items individually one by one (each row/record represented as individual item). For each item, onNext() event shall be triggered.
  • Once all the items are received, we are going to get the ‘onComplete()’ event.

There is an option with Subscriber to request for specific number of data. The ‘request’ method of ‘Subscription’ interface takes in a parameter ’n’. Basically, Subscriber has control to decide, how much data it wants to receive from the Publisher.

Reactive-Library :- Its the implementation of Reactive Streams Specifications i.e. 4 interfaces (Publisher, Subscriber, Subscription and processor)that we saw as a part of Reactive stream specifications. We have 3 types of Reactive Libraries in general :-

  • Project Reactor.
  • RxJava.
  • Flow Class — JDK9.

Project Reactor :→ Built and maintained by Pivotal (i.e. Team behind the Spring boot). This comes by default with Spring-Boot. Project-Reactor is a fourth-generation reactive library, based on the Reactive Streams
specification, for building non-blocking applications on the JVM.

Reactor Core : → Reactor is core library of Project-Reactor. It provides implementation of the Reactive Streams specifications. We need JDK8 as the bare-minimum version to work with it. Flux and Mono are the 2 classes which have the implementation of Reactive Streams specifications. Both of these are also called as ‘Reactive Types’ of Project Reactor.

Flux represents an class with 0 to N elements. Lets see some of the examples of using the Flux now :-

Example.1 of Flux → To subscribe to the Flux, we can use it’s ‘subscribe’ method, which can consume from the Flux :-

@Test
public void fluxTestingConcat() {
Flux<String> firstFluxOfProject = Flux.just("Honesty", "is")
.concatWith(Flux.just("best", "policy")).log();

firstFluxOfProject.subscribe(System.out::println,
e -> System.err.println("Exception got is :" + e),
() -> System.out.println(" Finished the First-Flux."));
}

Example.2 of Flux → Once all the events are sent, the onComplete method is then invoked on the Subscriber.

@Test
public void fluxTesting() {
Flux<String> firstFluxOfProject = Flux.just("Honesty", "is", "best", "policy")
.concatWith(Flux.just("Thanks"))
.log();

firstFluxOfProject.subscribe(System.out::println,
e -> System.err.println("Exception got is :" + e),
() -> System.out.println(" Our First Flux Completed."));
}

Here is how the output looks like for Flux :-

Example.3 of Flux → Here is how, we can perform the unit-test with the Flux. ‘StepVerifier’ plays key role of subscriber here. Also, ‘verifyComplete’ method starts the whole flow from Subscriber to the flux.

@Test
public void fluxTesting() {
Flux<String> secondFluxOfProject = Flux.just("Behind", "the", "Clouds", "Sun", "still", "shines")
.log();

StepVerifier.create(secondFluxOfProject)
.expectNext("Behind")
.expectNext("the")
.expectNext("Clouds")
.expectNext("Sun")
.expectNext("still")
.expectNext("shines")
.verifyComplete();
}

Example.4 of Flux → Lets now see an example to assert, the count of messages produced by producer. Also, we shall verify the exact error message that comes with Exception :-

@Test
public void fluxTestingWithCountOfElements() {
Flux<String> secondFluxOfProject = Flux.just("Behind", "the", "Clouds", "Sun", "still", "shines")
.concatWith(Flux.error(new RuntimeException("Exception with Data-Producer")))
.log();

StepVerifier.create(secondFluxOfProject)
.expectNextCount(6)
.expectErrorMessage("Exception with Data-Producer")
.verify();
}

Example.5 of Flux→ We can form a flux from List / Iterable-Entity:-

@Test
public void fluxFromListOfElements() {
List<String> listOfValues = Arrays.asList("This", "too", "shall", "pass");

Flux<String> fluxfromlist = Flux.fromIterable(listOfValues);

StepVerifier.create(fluxfromlist)
.expectNextCount(4)
.verifyComplete();
}

Example.6 of Flux → We can form a flux from Stream as well:-

@Test
public void fluxFromStreamOfElements() {
List<String> listOfValues = Arrays.asList("This", "too", "shall", "pass");
Flux<String> fluxfromStream = Flux.fromStream(listOfValues.stream());
StepVerifier.create(fluxfromStream)
.expectNext("This", "too", "shall", "pass")
.verifyComplete();
}

Example.7 of Flux → We can form a Flux using range of values as well :-

@Test
public void fluxUsingRange() {
Flux<Integer> fluxWithRange = Flux.range(1, 10);
StepVerifier.create(fluxWithRange.log())
.expectNextCount(10)
.verifyComplete();
}

And here is how output looks like for this :-

Example.8 of Flux → We can form a Flux and then can apply filtering on the same :-

@Test
public void fluxTestingWithFiltering() {
Flux<String> fluxOfProjectWithFilter = Flux.just("Behind", "the", "Clouds", "sun", "still", "shines")
.filter(s -> s.startsWith("s"));

StepVerifier.create(fluxOfProjectWithFilter.log())
.expectNext("sun", "still", "shines")
.verifyComplete();
}

Example.9 of Flux → We can transform a Flux using ‘map’ method :-

@Test
public void fluxWithTransformation() {
List<String> listOfValues = Arrays.asList("This", "too", "shall", "pass");
Flux<String> fluxfromlist = Flux.fromIterable(listOfValues)
.map(s -> s.toUpperCase())
.log();

StepVerifier.create(fluxfromlist)
.expectNextCount(4)
.verifyComplete();
}

Example.10 of Flux → Just like the ‘concatWith()’ method provided by Flux, there is also a ‘repeat()’ method which would actually repeat the values of the flux for specified number of times. Below an example of the same also demonstrates the ‘map’ method :-

@Test
public void fluxWithTransformationAndRepeat() {
List<String> listOfValues = Arrays.asList("This", "too", "shall", "pass");
Flux<Integer> fluxfromlist = Flux.fromIterable(listOfValues)
.map(s -> s.length())
.repeat(2)
.log();

StepVerifier.create(fluxfromlist)
.expectNext(4,3,5,4)
.expectNext(4,3,5,4)
.expectNext(4,3,5,4)
.verifyComplete();
}

Example.11 of Flux → Below is how we can form a pipeline with Flux. Pipeline is nothing but a series of operations combined together, as we demonstrated here in below example. Please note that, operations like ‘filter’, ‘map’, ‘repeat’ have been combined in below pipeline :-

@Test
public void fluxWithPipeline() {
List<String> listOfValues = Arrays.asList("This", "too", "shall", "pass");
Flux<Integer> fluxfromlist = Flux.fromIterable(listOfValues)
.filter(s -> (s.length() > 3))
.map(s -> s.length())
.repeat(1)
.log();

StepVerifier.create(fluxfromlist)
.expectNext(4,5,4)
.expectNext(4,5,4)
.verifyComplete();
}

Example.12 of Flux → Lets now make the flux a parallel-stream, in order to speed-it-up. We have used the ‘subscribeOn()’ method by passing ‘parallel()’ as parameter.

@Test
public void fluxWithflatMapFunctionWithParallel() {
List<String> listOfValues = Arrays.asList("This", "too", "shall", "pass");
Flux<String> fluxfromlist = Flux.fromIterable(listOfValues)
.window(2)
.flatMapSequential(s -> s.map(g -> {
try {
System.out.println("Sleeping for 2 seconds in order to process this message.");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Arrays.asList(g, "additional-baggage");
}).subscribeOn(parallel()))
.flatMap(s -> Flux.fromIterable(s))
.log();

StepVerifier.create(fluxfromlist)
.expectNextCount(8)
.verifyComplete();

}

Example.13 of Flux → Lets now see an example of merging 2 different Flux. In real-world example, it might happen that, Flux1 is flowing through service call and Flux2 is originating from DB call. Now, we can merge both of these different Fluxes. Also, note that, we have introduced the intentional delay with every element being read from the Flux, thus the order in which elements finally reaches to the merged-flux would be shuffled.

@Test
public void fluxMergeExample() {
List<String> list1OfValues = Arrays.asList("This", "time", "too", "shall");
Flux<String> flux1fromlist = Flux.fromIterable(list1OfValues).delayElements(Duration.ofMillis(3000));
List<String> list2OfValues = Arrays.asList("pass", "Have", "Faith", "in", "Almighty");
Flux<String> flux2fromlist = Flux.fromIterable(list2OfValues).delayElements(Duration.ofMillis(3000));

Flux<String> mergedFlux = Flux.merge(flux1fromlist, flux2fromlist);

StepVerifier.create(mergedFlux.log())
.expectNextCount(9)
.verifyComplete();
}

Here, Please note that, the command ‘Flux.merge(flux1fromlist, flux2fromlist)’ would try to subscribe to both of the fluxes in parallel. It’s not going to wait for flux1 to finish, before it starts consuming elements from Flux2.

Example.14 of Flux → Lets now see an example of merging 2 different Flux using ‘zip()’ method. Here, we supply the lambda function as well, which defines the business logic for merging 2 Flux.

@Test
public void fluxZipExample() {
List<String> list1OfValues = Arrays.asList("This", "time", "too", "shall", "smoothly");
Flux<String> flux1fromlist = Flux.fromIterable(list1OfValues);
List<String> list2OfValues = Arrays.asList("pass", "Have", "Faith", "in", "Almighty");
Flux<String> flux2fromlist = Flux.fromIterable(list2OfValues);

Flux<String> zippedFlux = Flux.zip(flux1fromlist, flux2fromlist, (t1, t2) -> t1.concat("_" + t2));

StepVerifier.create(zippedFlux.log())
.expectNext("This_pass", "time_Have", "too_Faith", "shall_in", "smoothly_Almighty")
.verifyComplete();
}

Example.15 of Flux → The ‘subscribe’ method subscribes the subscriber to the Flux producer. Now, there may come some exception in this process and no new event can come further to the consumer. Using the ‘log’ method, we can see the producer calling the onNext, onError methods of the Subscriber.

@Test
public void fluxTesting() {
Flux<String> firstFluxOfProject = Flux.just("Honesty", "is", "best", "policy")
.concatWith(Flux.error(new RuntimeException("Exception with Data-Producer")))
.concatWith(Flux.just("Thanks"))
.log();
firstFluxOfProject.subscribe(System.out::println,
e -> System.err.println("Exception got is :" + e),
() -> System.out.println(" Completed"));
}

Below is how the output looks like for Flux.

Example.16 of Flux → Lets now see an example to assert, when some exception comes-in. Also note that, with some error coming in, ‘verifyComplete’ method doesn’t works and we should be using the ‘verify’ method for starting the whole flow from Subscriber to the flux. Using ‘expectError’ method, we can verify, if the exception really comes-in :-

@Test
public void fluxTestingWithError() {
Flux<String> secondFluxOfProject = Flux.just("Behind", "the", "Clouds", "Sun", "still", "shines")
.concatWith(Flux.error(new RuntimeException("Exception with Data-Producer")))
.log();
StepVerifier.create(secondFluxOfProject)
.expectNext("Behind")
.expectNext("the")
.expectNext("Clouds")
.expectNext("Sun")
.expectNext("still")
.expectNext("shines")
.expectError(RuntimeException.class)
.verify();
}

Example.17 of Flux → Apart from verifying the exception, we can also handle and recover from error situation. On any error in reactive-sequence-flow, the ‘onErrorResume’ method would get invoked. Below we show the same :-

@Test
public void fluxTestingWithErrorAndThenResume() {
Flux<String> secondFluxOfProject = Flux.just("Behind", "the", "Clouds", "Sun", "still", "shines")
.concatWith(Flux.error(new RuntimeException("Exception with Data-Producer")))
.concatWith(Flux.just("In", "the", "Open", "Sky"))
.onErrorResume(e -> {
System.out.println("Here exception received is : " + e);
return Flux.just("Post", "Resume-Error");
})
.log();

StepVerifier.create(secondFluxOfProject)
.expectNext("Behind", "the", "Clouds", "Sun", "still", "shines")
.expectNext("Post", "Resume-Error")
.verifyComplete();
}

Example.18 of Flux → We can also do the retry, if there comes some exception. Below we show the same :-

@Test
public void fluxTestingWithRetry() {
Flux<String> secondFluxOfProject = Flux.just("Behind", "the", "Clouds")
.concatWith(Flux.error(new RuntimeException("Exception with Data-Producer")))
.concatWith(Flux.just("In", "the", "Open", "Sky"))
.retry(2)
.log();

StepVerifier.create(secondFluxOfProject)
.expectNext("Behind", "the", "Clouds")
.expectNext("Behind", "the", "Clouds")
.expectNext("Behind", "the", "Clouds")
.expectError(RuntimeException.class)
.verify();
}

Example.19 of Flux → We can also have the infinite stream of messages being generated through Flux. Below example, we generated the flux of ‘Long’ numbers at an interval of 2 second each. Important point to note here is that, we have explicitly put the sleeper into the main thread and reason for the same is : The infinite stream thus gets generated, happens on the parallel-asynchronous thread and therefore, the main thread has to be stopped somehow. Please note here that, Although we have used Thread.sleep(), but there are other elegant ways as well in order to verify the behaviour :-

@Test
public void fluxInfiniteSequence() {
Flux<Long> infiniteFlux = Flux.interval(Duration.ofMillis(2000)).log();
infiniteFlux.subscribe(element -> System.out.println("Value thus received is :" + element));
try {
System.out.println("Main thread entering into Sleep-State now.");
Thread.sleep(10000);
System.out.println("Main thread finished from Sleep-State now.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}

Example.20 of Flux → Following example demonstrates the flux of finite numbers. The count of numbers has been specified as 3 using ‘take’ method. This shall consume only 3 items from the Flux of Stream.

@Test
public void fluxInfiniteSequenceTest() {
Flux<Long> finiteFluxOfSpecifiedcountOfNumbers = Flux.interval(Duration.ofMillis(2000))
.take(3)
.log();

StepVerifier.create(finiteFluxOfSpecifiedcountOfNumbers)
.expectNext(0L, 1L, 2L)
.verifyComplete();
}

Example.21 of Flux → So far, we have seen the examples, in which we don’t have any control on the rate at which producer is publishing the data. We shall now explore the scenarios where consumer now also controls the rate at which data should be emitted by the Producer. This behaviour is called as Back-pressure. In below example, our consumer expected only 2 elements and then consumer itself cancels the subscription. Further, the story so far can be verified using ‘StepVerifier’. The method ‘thenRequest’ instructs the flux to emit only 1 element and then we expect this next element. Thats how, the subscriber takes control of the data being emitted from the producer’s end.

@Test
public void fluxUsingRangeAndBackPressure() {
Flux<Integer> fluxWithRange = Flux.range(1, 10)
.log();

StepVerifier.create(fluxWithRange)
.thenRequest(1)
.expectNext(1)
.thenRequest(1)
.expectNext(2)
.thenCancel()
.verify();
}

Example.22 of Flux → Lets now see the back-pressure in action from actual subscriber’s point of view. Here, we have used the overloaded ‘subscribe()’ method, where the fourth event tells about the number of messages to subscribe to i.e. here consumer controls the count(rate) of the messages, that producer should be emitting. Please note here that, the 3rd parameter to the ‘subscribe’ method concerns about the ‘onCompleted’ event. Since, In this example, we wouldn’t receive the onCompleted event, hence the message in 3rd parameter wouldn’t get printed.

@Test
public void fluxTestingBackPressure_2() {
Flux<String> firstFluxOfProject = Flux.just("Honesty", "is")
.concatWith(Flux.just("best", "policy")).log();

firstFluxOfProject.subscribe(
element -> System.out.println("Element thus received is: " + element),
e -> System.err.println("Exception got is :" + e),
() -> System.out.println(" Finished all the elements from the Flux."),
subscribtionEvent -> subscribtionEvent.request(3));
}

Example.23 of Flux → There also exists an another way of taking control of back-pressure as below. Here, the ‘hookOnNext()’ method is invoked upon every element thus received. We are also cancelling the subscription upon a random check :-

@Test
public void fluxTestingBackPressure_3() {
Flux<String> firstFluxOfProject = Flux.just("Honesty", "is")
.concatWith(Flux.just("best", "policy")).log();

firstFluxOfProject.subscribe(new BaseSubscriber<String>() {
@Override
protected void hookOnNext(String value) {
request(1);
System.out.println("Element thus received is: " + value);
if(value.equalsIgnoreCase("best")) {
cancel();
}
}
});
}

Example.24 of Flux → So far, whatever examples we saw are termed as “Cold- Streams” i.e. all of elements are subscribed from beginning. Lets now see an example of “Hot-Stream” where there is no ordering of how the elements shall be consumed :-

@Test
public void fluxTesting_hot_stream() throws InterruptedException {
Flux<String> firstFluxOfProject = Flux.just("Honesty", "is")
.concatWith(Flux.just("best", "policy"))
.delayElements(Duration.ofSeconds(1)).log();

ConnectableFlux<String> connectableFlux = firstFluxOfProject.publish();
connectableFlux.connect();
connectableFlux.subscribe(element -> System.out.println("Subscriber 1:" + element));
Thread.sleep(3000);

connectableFlux.subscribe(element -> System.out.println("Subscriber 2:" + element));
Thread.sleep(2000);
}

Mono represents an class with 0 to 1 element. Whenever we have only 1 data element, we use MONO. Lets now, see some of the examples of using Mono :-

Example.1 of Mono → With Mono, we can have only 1 message. Lets see an working example of asserting with Mono :-

@Test
public void monoTesting() {
Mono<String> firstMono = Mono.just("God_is_Great")
.log();
StepVerifier.create(firstMono.log())
.expectNext("God_is_Great")
.verifyComplete();
}

Example.2 of Mono → With Mono, we can even have the exception. Below is how, we shall be asserting error with Mono :-

@Test
public void monoTestingWithError() {
Mono<String> firstMono = Mono.error(new RuntimeException("Exception with MONO-Producer"));
StepVerifier.create(firstMono.log())
.expectError(RuntimeException.class)
.verify();
}

Example.3 of Mono → We can form a Mono using ‘Supplier’ functional-interface as well. Please note that, ‘Supplier’ interface have only 1 method ‘get()’ which takes no argument and returns output.

@Test
public void monoFromSupplierfunctionalInterface() {
Mono<String> monofromStream = Mono.fromSupplier(() -> "krishna");
StepVerifier.create(monofromStream.log())
.expectNext("krishna")
.verifyComplete();
}

References :-