|
| 1 | +package io.github.toquery.example.spring.reactive; |
| 2 | + |
| 3 | +import io.github.toquery.example.spring.reactive.model.Feed; |
| 4 | +import io.github.toquery.example.spring.reactive.model.FeedPhoto; |
| 5 | +import org.junit.jupiter.api.Test; |
| 6 | +import reactor.core.publisher.Flux; |
| 7 | +import reactor.core.publisher.Mono; |
| 8 | +import reactor.core.scheduler.Schedulers; |
| 9 | +import reactor.test.StepVerifier; |
| 10 | +import reactor.util.function.Tuple2; |
| 11 | + |
| 12 | +import java.time.Duration; |
| 13 | +import java.time.LocalDateTime; |
| 14 | +import java.util.Collection; |
| 15 | +import java.util.Map; |
| 16 | + |
| 17 | +/** |
| 18 | + * |
| 19 | + */ |
| 20 | +public class ExampleReactiveTests { |
| 21 | + |
| 22 | + @Test |
| 23 | + public void mergeFluxes() { |
| 24 | + Flux<String> characterFlux = Flux |
| 25 | + .just("Garfield", "Kojak", "Barbossa") |
| 26 | + .delayElements(Duration.ofMillis(500)); |
| 27 | + |
| 28 | + Flux<String> foodFlux = Flux |
| 29 | + .just("Lasagna", "Lollipops", "Apples") |
| 30 | + .delaySubscription(Duration.ofMillis(250)) |
| 31 | + .delayElements(Duration.ofMillis(500)); |
| 32 | + |
| 33 | + Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux); |
| 34 | + |
| 35 | + StepVerifier.create(mergedFlux) |
| 36 | + .expectNext("Garfield") |
| 37 | + .expectNext("Lasagna") |
| 38 | + .expectNext("Kojak") |
| 39 | + .expectNext("Lollipops") |
| 40 | + .expectNext("Barbossa") |
| 41 | + .expectNext("Apples") |
| 42 | + .verifyComplete(); |
| 43 | + } |
| 44 | + |
| 45 | + @Test |
| 46 | + public void zipFluxes() { |
| 47 | + Flux<String> characterFlux = Flux.just("Garfield", "Kojak"); |
| 48 | + Flux<String> foodFlux = Flux.just("Lasagna", "Lollipops", "Apples"); |
| 49 | + |
| 50 | + Flux<Tuple2<String, String>> zippedFlux = Flux.zip(characterFlux, foodFlux); |
| 51 | + |
| 52 | + zippedFlux.log().subscribe(System.out::println); |
| 53 | + |
| 54 | +// StepVerifier.create(zippedFlux) |
| 55 | +// .expectNextMatches(p -> |
| 56 | +// p.getT1().equals("Garfield") && |
| 57 | +// p.getT2().equals("Lasagna")) |
| 58 | +// .expectNextMatches(p -> |
| 59 | +// p.getT1().equals("Kojak") && |
| 60 | +// p.getT2().equals("Lollipops")) |
| 61 | +// .expectNextMatches(p -> |
| 62 | +// p.getT1().equals("Barbossa") && |
| 63 | +// p.getT2().equals("Apples")) |
| 64 | +// .verifyComplete(); |
| 65 | + } |
| 66 | + |
| 67 | + @Test |
| 68 | + public void zipFluxesToObject() { |
| 69 | + Flux<String> characterFlux = Flux.just("Garfield", "Kojak"); |
| 70 | + Flux<String> foodFlux = Flux.just("Lasagna", "Lollipops", "Apples"); |
| 71 | + |
| 72 | + Flux<String> zippedFlux = Flux.zip(characterFlux, foodFlux, |
| 73 | + (c, f) -> c + " eats " + f); |
| 74 | + |
| 75 | + zippedFlux.log().subscribe(System.out::println); |
| 76 | +// StepVerifier.create(zippedFlux) |
| 77 | +// .expectNext("Garfield eats Lasagna") |
| 78 | +// .expectNext("Kojak eats Lollipops") |
| 79 | +// .expectNext("Barbossa eats Apples") |
| 80 | +// .verifyComplete(); |
| 81 | + } |
| 82 | + |
| 83 | + @Test |
| 84 | + public void testZipWith() { |
| 85 | + Flux.just("a", "b").zipWith(Flux.just("c", "d", "e")).subscribe(System.out::println); |
| 86 | + |
| 87 | + Flux.just("a", "b").zipWith(Flux.just("c", "d", "e"), (s1, s2) -> String.format("%s-%s", s1, s2)).subscribe(System.out::println); |
| 88 | + |
| 89 | +// [a,c] |
| 90 | +// [b,d] |
| 91 | +// a-c |
| 92 | +// b-d |
| 93 | + } |
| 94 | + @Test |
| 95 | + public void combineLatest(){ |
| 96 | + Flux.combineLatest(Flux.just(1, 2), Flux.just("c", "d", "d"), (s1, s2) -> String.format("%s-%s", s1, s2)).subscribe(System.out::println); |
| 97 | + } |
| 98 | + |
| 99 | + @Test |
| 100 | + public void testReduce() { |
| 101 | + Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println); |
| 102 | + Flux.range(1, 100).reduceWith(() -> 100, (x, y) -> x + y).subscribe(System.out::println); |
| 103 | + } |
| 104 | + |
| 105 | + |
| 106 | + @Test |
| 107 | + public void testMerge() { |
| 108 | + Flux.merge(Flux.just(1, 2), Flux.just("a", "b", "c")).log().subscribe(System.out::println); |
| 109 | + Flux.mergeSequential(Flux.just(1, 2), Flux.just("a", "b", "c")).log().subscribe(System.out::println); |
| 110 | + } |
| 111 | + |
| 112 | + @Test |
| 113 | + public void testFlatMap() { |
| 114 | + Flux.just(1, 2).flatMap(x -> Flux.just(x + "3", "4")).toStream().forEach(System.out::println); |
| 115 | + } |
| 116 | + |
| 117 | + @Test |
| 118 | + public void testConcatMap() { |
| 119 | + Flux.just(1, 2).concatMap(x -> Flux.just(x + "3", "4")).toStream().forEach(System.out::println); |
| 120 | + } |
| 121 | + |
| 122 | + @Test |
| 123 | + public void mergeStream() { |
| 124 | + Flux<Feed> feedFlux = Flux.just( |
| 125 | + Feed.builder().id("1").feedText("feed1").createTime(LocalDateTime.now()).build(), |
| 126 | + Feed.builder().id("2").feedText("feed2").createTime(LocalDateTime.now()).build(), |
| 127 | + Feed.builder().id("3").feedText("feed3").createTime(LocalDateTime.now()).build() |
| 128 | + ); |
| 129 | + |
| 130 | + Flux<FeedPhoto> feedPhotoFlux = Flux.just( |
| 131 | + FeedPhoto.builder().id("1").feedId("1").url("url1").build(), |
| 132 | + FeedPhoto.builder().id("2").feedId("1").url("url2").build(), |
| 133 | + FeedPhoto.builder().id("3").feedId("1").url("url3").build(), |
| 134 | + FeedPhoto.builder().id("4").feedId("2").url("url4").build(), |
| 135 | + FeedPhoto.builder().id("5").feedId("2").url("url5").build(), |
| 136 | + FeedPhoto.builder().id("6").feedId("3").url("url6").build() |
| 137 | + ); |
| 138 | + |
| 139 | + |
| 140 | + Mono<Map<String, Collection<FeedPhoto>>> collectMultiMap = feedPhotoFlux.collectMultimap(FeedPhoto::getFeedId); |
| 141 | +// feedPhotoFlux.collectMultimap(FeedPhoto::getFeedId).doOnNext(item -> item.get("")).subscribe(System.out::println); |
| 142 | + Map<String, Collection<FeedPhoto>> map = collectMultiMap.block(); |
| 143 | + |
| 144 | + feedFlux.map(feed -> { |
| 145 | +// collectMultiMap.doOnNext(map -> feed.setPhotos(map.get(feed.getId()))).subscribe(); |
| 146 | + feed.setPhotos(map.get(feed.getId())); |
| 147 | + |
| 148 | + return feed; |
| 149 | + }).subscribe(System.out::println); |
| 150 | + |
| 151 | +// Flux.concat(feedPhotoFlux).w, (feed, map) -> { |
| 152 | +// Collection<FeedPhoto> feedPhotos = map.get(feed.getId()); |
| 153 | +// feed.setPhotos(feedPhotos); |
| 154 | +// return feed; |
| 155 | +// }).subscribe(System.out::println); |
| 156 | + |
| 157 | +// Flux<Tuple2<Integer, String>> correctWayOfZippingFluxToMono = feedFlux |
| 158 | +// .flatMap(userId -> Mono.just(userId) |
| 159 | +// .zipWith(collectMultiMap, (feed, photos) -> new Tuple2(feed.getId(), photos.get(feed.getId()))))); |
| 160 | + |
| 161 | + |
| 162 | +// feedFlux.flatMap(feed -> Flux.zip(feed, collectMultiMap) |
| 163 | +// .flatMapMany(tuple -> Flux.fromIterable(numbers).map(i -> Tuples.of(i,tuple)))) |
| 164 | + |
| 165 | +// feedFlux.zipWith(collectMultiMap).subscribe(System.out::println); |
| 166 | +// |
| 167 | +// feedFlux.zipWith(collectMultiMap, (feedResponse, feedPhotoMap) -> { |
| 168 | +// feedResponse.setPhotos(feedPhotoMap.get(feedResponse.getId())); |
| 169 | +// return feedResponse; |
| 170 | +// }).log().subscribe(System.out::println); |
| 171 | + |
| 172 | + } |
| 173 | +} |
0 commit comments