Skip to content
This repository was archived by the owner on Mar 13, 2021. It is now read-only.

Work around cancel() after complete() wrong behavior in reactive-grpc #143

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>11</java.version>
<grpc.version>1.23.0</grpc.version>
<reactive-grpc.version>0.10.0-RC1</reactive-grpc.version>
<reactive-grpc.version>1.0.0</reactive-grpc.version>
<spring-cloud-function.version>3.0.0.BUILD-SNAPSHOT</spring-cloud-function.version>
<reactor.version>3.2.5.RELEASE</reactor.version>
<protoc.version>3.6.1</protoc.version>
<reactor.version>3.2.12.RELEASE</reactor.version>
<protoc.version>3.7.1</protoc.version>
</properties>

<dependencyManagement>
Expand Down
55 changes: 55 additions & 0 deletions src/main/java/io/projectriff/invoker/server/GrpcServerAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import io.projectriff.invoker.rpc.OutputFrame;
import io.projectriff.invoker.rpc.OutputSignal;
import io.projectriff.invoker.rpc.ReactorRiffGrpc;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
Expand All @@ -14,14 +16,18 @@
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeType;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Signal;
import reactor.util.context.Context;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

import java.lang.reflect.Type;
import java.util.Comparator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

/**
Expand Down Expand Up @@ -113,6 +119,9 @@ private Function<Flux<Tuple2<Integer, Message<byte[]>>>, Flux<Tuple2<Integer, Me
return
// stick dummy messages in front to force the creation of each arg-index group
flux -> flux.startWith(startTuples)
// Work around bug in reactive-grpc which freaks out on cancels happening after complete when
// it shouldn't. Those cancels are a consequence of FluxGroupBy.complete()
.transform(ignoreCancelsAfterComplete())
// group by arg index (ie de-mux)
.groupBy(Tuple2::getT1, Tuple2::getT2)
// chop the outer flux. We know there will ever be exactly that many groups
Expand Down Expand Up @@ -140,6 +149,52 @@ private Function<Flux<Tuple2<Integer, Message<byte[]>>>, Flux<Tuple2<Integer, Me
;
}

// Used to transform the publisher chain into one that doesn't forward cancel() calls once it has complete()d.
private Function<? super Publisher<Tuple2<Integer, Message<byte[]>>>, ? extends Publisher<Tuple2<Integer, Message<byte[]>>>> ignoreCancelsAfterComplete() {
return Operators.lift((f, actual) ->
new CoreSubscriber<Tuple2<Integer, Message<byte[]>>>() {
private volatile boolean completed;

@Override
public void onSubscribe(Subscription s) {
actual.onSubscribe(new Subscription() {
@Override
public void request(long n) {
s.request(n);
}

@Override
public void cancel() {
if (!completed) {
s.cancel();
}
}
});
}

@Override
public void onNext(Tuple2<Integer, Message<byte[]>> objects) {
actual.onNext(objects);
}

@Override
public void onError(Throwable t) {
actual.onError(t);
}

@Override
public void onComplete() {
completed = true;
actual.onComplete();
}

@Override
public Context currentContext() {
return actual.currentContext();
}
});
}

private Flux<Message<byte[]>>[] promoteToArray(Object result) {
if (result instanceof Tuple2) {
Object[] objects = ((Tuple2) result).toArray();
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</encoder>
</appender>

<root level="debug">
<root level="info">
<appender-ref ref="STDOUT" />
</root>
</configuration>