Peek into Reactive Spring Java | Part-1

  • Eventual Scaling based on the load.
  • Efficient usage of resources.
  • Response-time should be faster.
  • Callbacks → These doesn’t returns any value, but it takes the object of callback as parameter. These are not very great, as we might end-up with state of system called as ‘Callback-Hell’.
  • Future → Here, we basically returns the Future instance, which can be used to check if data is available. Its hard to compose multiple Asynchronous options.
  • Completable Future → Introduced as part of Java8 and supports functional style of programming and multiple Asynchronous operations. This option is not suitable for operation, where request contains multiple items. Lot of plumbing is required to handle the error-handling as well.
  • New Programming paradigm.
  • Asynchronous and Non-Blocking.
  • Supports Back-pressure on data-streams. If the data-source is producing more data than the expected, there exists a way to provide feedback to the data-source indicating that, Hey Data-source you need to slow-down, until I catch-up.
  • Embraces Functional Style of coding. Its similar to Streams-API and Lambdas.
  • Data flow as an Event/Message driven stream.
  • onNext(item) → Data stream events.
  • onComplete() → Completion / Success events.
  • onError() → Error events.
  • Publisher :- It has single method called as ‘subscribe’, which takes in ‘Subscriber’ instance. By making this call, the Subscriber registers to the Publisher. Publishers are generally the Data Producers, e.g. → DataBase, Remote APIs.
  • Subscriber :- This is nothing but the application in our above example. Application is a subscriber and have subscribed to the RDBMS data-source. It has 4 methods defined in it.
  • Subscription :- It has 2 methods defined in it.
  • Processor :- It has got NO method.
  • Subscriber is going to invoke the ‘subscribe()’ method of the Publisher and pass on the instance of Subscriber as an input.
  • Publisher now sends the ‘Subscription’ object back to the Subscriber, confirming that subscription is successful.
  • Now, Subscriber calls the ‘request’ method from Subscription object with an intent to get all the data from the data-source. The Subscriber can also invoke the ‘cancel’ method from Subscription object, if it wants to cancel the subscription itself. In case of ‘cancel’ method, the Publisher is not going to publish any data to the subscriber.
  • The Publisher would then send all the data-items individually one by one (each row/record represented as individual item). For each item, onNext() event shall be triggered.
  • Once all the items are received, we are going to get the ‘onComplete()’ event.
  • Project Reactor.
  • RxJava.
  • Flow Class — JDK9.
@Test
public void fluxTestingConcat() {
Flux<String> firstFluxOfProject = Flux.just("Honesty", "is")
.concatWith(Flux.just("best", "policy")).log();

firstFluxOfProject.subscribe(System.out::println,
e -> System.err.println("Exception got is :" + e),
() -> System.out.println(" Finished the First-Flux."));
}
@Test
public void fluxTesting() {
Flux<String> firstFluxOfProject = Flux.just("Honesty", "is", "best", "policy")
.concatWith(Flux.just("Thanks"))
.log();

firstFluxOfProject.subscribe(System.out::println,
e -> System.err.println("Exception got is :" + e),
() -> System.out.println(" Our First Flux Completed."));
}
@Test
public void fluxTesting() {
Flux<String> secondFluxOfProject = Flux.just("Behind", "the", "Clouds", "Sun", "still", "shines")
.log();

StepVerifier.create(secondFluxOfProject)
.expectNext("Behind")
.expectNext("the")
.expectNext("Clouds")
.expectNext("Sun")
.expectNext("still")
.expectNext("shines")
.verifyComplete();
}
@Test
public void fluxTestingWithCountOfElements() {
Flux<String> secondFluxOfProject = Flux.just("Behind", "the", "Clouds", "Sun", "still", "shines")
.concatWith(Flux.error(new RuntimeException("Exception with Data-Producer")))
.log();

StepVerifier.create(secondFluxOfProject)
.expectNextCount(6)
.expectErrorMessage("Exception with Data-Producer")
.verify();
}
@Test
public void fluxFromListOfElements() {
List<String> listOfValues = Arrays.asList("This", "too", "shall", "pass");

Flux<String> fluxfromlist = Flux.fromIterable(listOfValues);

StepVerifier.create(fluxfromlist)
.expectNextCount(4)
.verifyComplete();
}
@Test
public void fluxFromStreamOfElements() {
List<String> listOfValues = Arrays.asList("This", "too", "shall", "pass");
Flux<String> fluxfromStream = Flux.fromStream(listOfValues.stream());
StepVerifier.create(fluxfromStream)
.expectNext("This", "too", "shall", "pass")
.verifyComplete();
}
@Test
public void fluxUsingRange() {
Flux<Integer> fluxWithRange = Flux.range(1, 10);
StepVerifier.create(fluxWithRange.log())
.expectNextCount(10)
.verifyComplete();
}
@Test
public void fluxTestingWithFiltering() {
Flux<String> fluxOfProjectWithFilter = Flux.just("Behind", "the", "Clouds", "sun", "still", "shines")
.filter(s -> s.startsWith("s"));

StepVerifier.create(fluxOfProjectWithFilter.log())
.expectNext("sun", "still", "shines")
.verifyComplete();
}
@Test
public void fluxWithTransformation() {
List<String> listOfValues = Arrays.asList("This", "too", "shall", "pass");
Flux<String> fluxfromlist = Flux.fromIterable(listOfValues)
.map(s -> s.toUpperCase())
.log();

StepVerifier.create(fluxfromlist)
.expectNextCount(4)
.verifyComplete();
}
@Test
public void fluxWithTransformationAndRepeat() {
List<String> listOfValues = Arrays.asList("This", "too", "shall", "pass");
Flux<Integer> fluxfromlist = Flux.fromIterable(listOfValues)
.map(s -> s.length())
.repeat(2)
.log();

StepVerifier.create(fluxfromlist)
.expectNext(4,3,5,4)
.expectNext(4,3,5,4)
.expectNext(4,3,5,4)
.verifyComplete();
}
@Test
public void fluxWithPipeline() {
List<String> listOfValues = Arrays.asList("This", "too", "shall", "pass");
Flux<Integer> fluxfromlist = Flux.fromIterable(listOfValues)
.filter(s -> (s.length() > 3))
.map(s -> s.length())
.repeat(1)
.log();

StepVerifier.create(fluxfromlist)
.expectNext(4,5,4)
.expectNext(4,5,4)
.verifyComplete();
}
@Test
public void fluxWithflatMapFunctionWithParallel() {
List<String> listOfValues = Arrays.asList("This", "too", "shall", "pass");
Flux<String> fluxfromlist = Flux.fromIterable(listOfValues)
.window(2)
.flatMapSequential(s -> s.map(g -> {
try {
System.out.println("Sleeping for 2 seconds in order to process this message.");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Arrays.asList(g, "additional-baggage");
}).subscribeOn(parallel()))
.flatMap(s -> Flux.fromIterable(s))
.log();

StepVerifier.create(fluxfromlist)
.expectNextCount(8)
.verifyComplete();

}
@Test
public void fluxMergeExample() {
List<String> list1OfValues = Arrays.asList("This", "time", "too", "shall");
Flux<String> flux1fromlist = Flux.fromIterable(list1OfValues).delayElements(Duration.ofMillis(3000));
List<String> list2OfValues = Arrays.asList("pass", "Have", "Faith", "in", "Almighty");
Flux<String> flux2fromlist = Flux.fromIterable(list2OfValues).delayElements(Duration.ofMillis(3000));

Flux<String> mergedFlux = Flux.merge(flux1fromlist, flux2fromlist);

StepVerifier.create(mergedFlux.log())
.expectNextCount(9)
.verifyComplete();
}
@Test
public void fluxZipExample() {
List<String> list1OfValues = Arrays.asList("This", "time", "too", "shall", "smoothly");
Flux<String> flux1fromlist = Flux.fromIterable(list1OfValues);
List<String> list2OfValues = Arrays.asList("pass", "Have", "Faith", "in", "Almighty");
Flux<String> flux2fromlist = Flux.fromIterable(list2OfValues);

Flux<String> zippedFlux = Flux.zip(flux1fromlist, flux2fromlist, (t1, t2) -> t1.concat("_" + t2));

StepVerifier.create(zippedFlux.log())
.expectNext("This_pass", "time_Have", "too_Faith", "shall_in", "smoothly_Almighty")
.verifyComplete();
}
@Test
public void fluxTesting() {
Flux<String> firstFluxOfProject = Flux.just("Honesty", "is", "best", "policy")
.concatWith(Flux.error(new RuntimeException("Exception with Data-Producer")))
.concatWith(Flux.just("Thanks"))
.log();
firstFluxOfProject.subscribe(System.out::println,
e -> System.err.println("Exception got is :" + e),
() -> System.out.println(" Completed"));
}
@Test
public void fluxTestingWithError() {
Flux<String> secondFluxOfProject = Flux.just("Behind", "the", "Clouds", "Sun", "still", "shines")
.concatWith(Flux.error(new RuntimeException("Exception with Data-Producer")))
.log();
StepVerifier.create(secondFluxOfProject)
.expectNext("Behind")
.expectNext("the")
.expectNext("Clouds")
.expectNext("Sun")
.expectNext("still")
.expectNext("shines")
.expectError(RuntimeException.class)
.verify();
}
@Test
public void fluxTestingWithErrorAndThenResume() {
Flux<String> secondFluxOfProject = Flux.just("Behind", "the", "Clouds", "Sun", "still", "shines")
.concatWith(Flux.error(new RuntimeException("Exception with Data-Producer")))
.concatWith(Flux.just("In", "the", "Open", "Sky"))
.onErrorResume(e -> {
System.out.println("Here exception received is : " + e);
return Flux.just("Post", "Resume-Error");
})
.log();

StepVerifier.create(secondFluxOfProject)
.expectNext("Behind", "the", "Clouds", "Sun", "still", "shines")
.expectNext("Post", "Resume-Error")
.verifyComplete();
}
@Test
public void fluxTestingWithRetry() {
Flux<String> secondFluxOfProject = Flux.just("Behind", "the", "Clouds")
.concatWith(Flux.error(new RuntimeException("Exception with Data-Producer")))
.concatWith(Flux.just("In", "the", "Open", "Sky"))
.retry(2)
.log();

StepVerifier.create(secondFluxOfProject)
.expectNext("Behind", "the", "Clouds")
.expectNext("Behind", "the", "Clouds")
.expectNext("Behind", "the", "Clouds")
.expectError(RuntimeException.class)
.verify();
}
@Test
public void fluxInfiniteSequence() {
Flux<Long> infiniteFlux = Flux.interval(Duration.ofMillis(2000)).log();
infiniteFlux.subscribe(element -> System.out.println("Value thus received is :" + element));
try {
System.out.println("Main thread entering into Sleep-State now.");
Thread.sleep(10000);
System.out.println("Main thread finished from Sleep-State now.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void fluxInfiniteSequenceTest() {
Flux<Long> finiteFluxOfSpecifiedcountOfNumbers = Flux.interval(Duration.ofMillis(2000))
.take(3)
.log();

StepVerifier.create(finiteFluxOfSpecifiedcountOfNumbers)
.expectNext(0L, 1L, 2L)
.verifyComplete();
}
@Test
public void fluxUsingRangeAndBackPressure() {
Flux<Integer> fluxWithRange = Flux.range(1, 10)
.log();

StepVerifier.create(fluxWithRange)
.thenRequest(1)
.expectNext(1)
.thenRequest(1)
.expectNext(2)
.thenCancel()
.verify();
}
@Test
public void fluxTestingBackPressure_2() {
Flux<String> firstFluxOfProject = Flux.just("Honesty", "is")
.concatWith(Flux.just("best", "policy")).log();

firstFluxOfProject.subscribe(
element -> System.out.println("Element thus received is: " + element),
e -> System.err.println("Exception got is :" + e),
() -> System.out.println(" Finished all the elements from the Flux."),
subscribtionEvent -> subscribtionEvent.request(3));
}
@Test
public void fluxTestingBackPressure_3() {
Flux<String> firstFluxOfProject = Flux.just("Honesty", "is")
.concatWith(Flux.just("best", "policy")).log();

firstFluxOfProject.subscribe(new BaseSubscriber<String>() {
@Override
protected void hookOnNext(String value) {
request(1);
System.out.println("Element thus received is: " + value);
if(value.equalsIgnoreCase("best")) {
cancel();
}
}
});
}
@Test
public void fluxTesting_hot_stream() throws InterruptedException {
Flux<String> firstFluxOfProject = Flux.just("Honesty", "is")
.concatWith(Flux.just("best", "policy"))
.delayElements(Duration.ofSeconds(1)).log();

ConnectableFlux<String> connectableFlux = firstFluxOfProject.publish();
connectableFlux.connect();
connectableFlux.subscribe(element -> System.out.println("Subscriber 1:" + element));
Thread.sleep(3000);

connectableFlux.subscribe(element -> System.out.println("Subscriber 2:" + element));
Thread.sleep(2000);
}
@Test
public void monoTesting() {
Mono<String> firstMono = Mono.just("God_is_Great")
.log();
StepVerifier.create(firstMono.log())
.expectNext("God_is_Great")
.verifyComplete();
}
@Test
public void monoTestingWithError() {
Mono<String> firstMono = Mono.error(new RuntimeException("Exception with MONO-Producer"));
StepVerifier.create(firstMono.log())
.expectError(RuntimeException.class)
.verify();
}
@Test
public void monoFromSupplierfunctionalInterface() {
Mono<String> monofromStream = Mono.fromSupplier(() -> "krishna");
StepVerifier.create(monofromStream.log())
.expectNext("krishna")
.verifyComplete();
}

--

--

--

Software Engineer for Big Data distributed systems

Love podcasts or audiobooks? Learn on the go with our new app.

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
aditya goel

aditya goel

Software Engineer for Big Data distributed systems

More from Medium

Full-stack application development with Inverno

Generate P12 file for spring boot HTTPS configuration using OpenSSL

Compiling and running Java Source code using GraalVM and Oracle Java Development Kit

How To Manage Multi JAVA Versions On Mac