|
6 | 6 | import io.projectriff.invoker.rpc.OutputFrame;
|
7 | 7 | import io.projectriff.invoker.rpc.OutputSignal;
|
8 | 8 | import io.projectriff.invoker.rpc.ReactorRiffGrpc;
|
| 9 | +import org.reactivestreams.Publisher; |
| 10 | +import org.reactivestreams.Subscription; |
9 | 11 | import org.springframework.cloud.function.context.FunctionCatalog;
|
10 | 12 | import org.springframework.cloud.function.context.catalog.FunctionInspector;
|
11 | 13 | import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
|
|
14 | 16 | import org.springframework.messaging.support.GenericMessage;
|
15 | 17 | import org.springframework.messaging.support.MessageBuilder;
|
16 | 18 | import org.springframework.util.MimeType;
|
| 19 | +import reactor.core.CoreSubscriber; |
17 | 20 | import reactor.core.publisher.Flux;
|
18 | 21 | import reactor.core.publisher.GroupedFlux;
|
| 22 | +import reactor.core.publisher.Operators; |
19 | 23 | import reactor.core.publisher.Signal;
|
20 | 24 | import reactor.util.function.Tuple2;
|
21 | 25 | import reactor.util.function.Tuples;
|
22 | 26 |
|
23 | 27 | import java.lang.reflect.Type;
|
24 | 28 | import java.util.Comparator;
|
| 29 | +import java.util.concurrent.atomic.AtomicBoolean; |
25 | 30 | import java.util.function.Function;
|
26 | 31 |
|
27 | 32 | /**
|
@@ -111,6 +116,9 @@ private Function<Flux<Tuple2<Integer, Message<byte[]>>>, Flux<Tuple2<Integer, Me
|
111 | 116 | return
|
112 | 117 | // stick dummy messages in front to force the creation of each arg-index group
|
113 | 118 | flux -> flux.startWith(startTuples)
|
| 119 | + // Work around bug in reactive-grpc which freaks out on cancels happening after complete when |
| 120 | + // it shouldn't. Those cancels are a consequence of FluxGroupBy.complete() |
| 121 | + .transform(ignoreCancelsAfterComplete()) |
114 | 122 | // group by arg index (ie de-mux)
|
115 | 123 | .groupBy(Tuple2::getT1, Tuple2::getT2)
|
116 | 124 | // chop the outer flux. We know there will ever be exactly that many groups
|
@@ -138,6 +146,47 @@ private Function<Flux<Tuple2<Integer, Message<byte[]>>>, Flux<Tuple2<Integer, Me
|
138 | 146 | ;
|
139 | 147 | }
|
140 | 148 |
|
| 149 | + // Used to transform the publisher chain into one that doesn't forward cancel() calls once it has complete()d. |
| 150 | + private Function<? super Publisher<Tuple2<Integer, Message<byte[]>>>, ? extends Publisher<Tuple2<Integer, Message<byte[]>>>> ignoreCancelsAfterComplete() { |
| 151 | + return Operators.lift((f, actual) -> |
| 152 | + new CoreSubscriber<>() { |
| 153 | + AtomicBoolean completed = new AtomicBoolean(); |
| 154 | + |
| 155 | + @Override |
| 156 | + public void onSubscribe(Subscription s) { |
| 157 | + actual.onSubscribe(new Subscription() { |
| 158 | + @Override |
| 159 | + public void request(long n) { |
| 160 | + s.request(n); |
| 161 | + } |
| 162 | + |
| 163 | + @Override |
| 164 | + public void cancel() { |
| 165 | + if (!completed.get()) { |
| 166 | + s.cancel(); |
| 167 | + } |
| 168 | + } |
| 169 | + }); |
| 170 | + } |
| 171 | + |
| 172 | + @Override |
| 173 | + public void onNext(Tuple2<Integer, Message<byte[]>> objects) { |
| 174 | + actual.onNext(objects); |
| 175 | + } |
| 176 | + |
| 177 | + @Override |
| 178 | + public void onError(Throwable t) { |
| 179 | + actual.onError(t); |
| 180 | + } |
| 181 | + |
| 182 | + @Override |
| 183 | + public void onComplete() { |
| 184 | + completed.compareAndSet(false, true); |
| 185 | + actual.onComplete(); |
| 186 | + } |
| 187 | + }); |
| 188 | + } |
| 189 | + |
141 | 190 | private Flux<Message<byte[]>>[] promoteToArray(Object result) {
|
142 | 191 | if (result instanceof Tuple2) {
|
143 | 192 | Object[] objects = ((Tuple2) result).toArray();
|
|
0 commit comments