Skip to content

Commit 373165a

Browse files
authored
BAEL-7538 - Mono.fromCallable vs Mono.justOrEmpty in Spring Reactive (#17921)
* BAEL-7538 - Mono.fromCallable vs Mono.justOrEmpty in Spring Reactive
1 parent f6240fd commit 373165a

File tree

1 file changed

+97
-0
lines changed

1 file changed

+97
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package com.baeldung.reactor.fromcallable.justempty;
2+
3+
import java.util.List;
4+
import java.util.Optional;
5+
import java.util.concurrent.TimeUnit;
6+
import java.util.concurrent.atomic.AtomicLong;
7+
8+
import static org.assertj.core.api.Assertions.assertThat;
9+
10+
import org.assertj.core.data.Offset;
11+
import org.junit.jupiter.api.Test;
12+
13+
import lombok.extern.slf4j.Slf4j;
14+
import reactor.core.publisher.Mono;
15+
import reactor.test.StepVerifier;
16+
17+
@Slf4j
18+
public class FromCallableJustEmptyUnitTest {
19+
20+
@Test
21+
public void givenDataAvailable_whenCallingFromCallable_thenLazyEvaluation() {
22+
AtomicLong timeTakenForCompletion = new AtomicLong();
23+
Mono<String> dataFetched = Mono.fromCallable(this::fetchData)
24+
.doOnSubscribe(subscription -> timeTakenForCompletion.set(-1 * System.nanoTime()))
25+
.doFinally(consumer -> timeTakenForCompletion.addAndGet(System.nanoTime()));
26+
27+
StepVerifier.create(dataFetched)
28+
.expectNext("Data Fetched")
29+
.verifyComplete();
30+
31+
log.debug("Time Taken to Retrieve Data with Lazy Execution with Subscription");
32+
assertThat(TimeUnit.NANOSECONDS.toMillis(timeTakenForCompletion.get())).isCloseTo(5000L, Offset.offset(50L));
33+
}
34+
35+
@Test
36+
public void givenExceptionThrown_whenCallingFromCallable_thenFromCallableCapturesError() {
37+
Mono<String> dataFetched = Mono.fromCallable(() -> {
38+
String data = fetchData();
39+
if (data.equals("Data Fetched")) {
40+
throw new RuntimeException("ERROR");
41+
}
42+
return data;
43+
})
44+
.onErrorResume(error -> Mono.just("COMPLETED"));
45+
46+
StepVerifier.create(dataFetched)
47+
.expectNext("COMPLETED")
48+
.verifyComplete();
49+
}
50+
51+
@Test
52+
public void givenDataAvailable_whenCallingJustOrEmpty_thenEagerEvaluation() {
53+
AtomicLong timeTakenToReceiveOnCompleteSignalAfterSubscription = new AtomicLong();
54+
AtomicLong timeTakenForMethodCompletion = new AtomicLong(-1 * System.nanoTime());
55+
Mono<String> dataFetched = Mono.justOrEmpty(fetchData())
56+
.doOnSubscribe(subscription -> timeTakenToReceiveOnCompleteSignalAfterSubscription.set(-1 * System.nanoTime()))
57+
.doFinally(consumer -> timeTakenToReceiveOnCompleteSignalAfterSubscription.addAndGet(System.nanoTime()));
58+
59+
timeTakenForMethodCompletion.addAndGet(System.nanoTime());
60+
61+
StepVerifier.create(dataFetched)
62+
.expectNext("Data Fetched")
63+
.verifyComplete();
64+
65+
assertThat(TimeUnit.NANOSECONDS.toMillis(timeTakenToReceiveOnCompleteSignalAfterSubscription.get())).isCloseTo(1L, Offset.offset(1L));
66+
assertThat(TimeUnit.NANOSECONDS.toMillis(timeTakenForMethodCompletion.get())).isCloseTo(5000L, Offset.offset(50L));
67+
}
68+
69+
@Test
70+
public void givenLatestStatusIsEmpty_thenCallingFromCallableForEagerEvaluation() {
71+
Optional<String> latestStatus = fetchLatestStatus();
72+
String updatedStatus = "ACTIVE";
73+
Mono<String> currentStatus = Mono.justOrEmpty(latestStatus)
74+
.switchIfEmpty(Mono.fromCallable(() -> updatedStatus));
75+
76+
StepVerifier.create(currentStatus)
77+
.expectNext(updatedStatus)
78+
.verifyComplete();
79+
}
80+
81+
private Optional<String> fetchLatestStatus() {
82+
List<String> activeStatusList = List.of("ARCHIVED", "ACTIVE");
83+
if (activeStatusList.contains("ARCHIVED")) {
84+
return Optional.empty();
85+
}
86+
return Optional.of(activeStatusList.get(0));
87+
}
88+
89+
private String fetchData() {
90+
try {
91+
Thread.sleep(5000);
92+
} catch (InterruptedException e) {
93+
throw new RuntimeException(e);
94+
}
95+
return "Data Fetched";
96+
}
97+
}

0 commit comments

Comments
 (0)