Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: udsink bidirectional streaming #141

Merged
merged 2 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/io/numaproj/numaflow/sinker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void start() throws Exception {
log.info(
"Server started, listening on {}",
grpcConfig.isLocal() ?
"localhost:" + grpcConfig.getPort():grpcConfig.getSocketPath());
"localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());

// register shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Expand Down
136 changes: 75 additions & 61 deletions src/main/java/io/numaproj/numaflow/sinker/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@

import java.time.Instant;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static io.numaproj.numaflow.sink.v1.SinkGrpc.getSinkFnMethod;

@Slf4j
class Service extends SinkGrpc.SinkImplBase {
// sinkTaskExecutor is the executor for the sinker. It is a fixed size thread pool
Expand All @@ -24,12 +23,6 @@
private final ExecutorService sinkTaskExecutor = Executors
.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

// SHUTDOWN_TIME is the time to wait for the sinker to shut down, in seconds.
// We use 30 seconds as the default value because it provides a balance between giving tasks enough time to complete
// and not delaying program termination unduly.
private final long SHUTDOWN_TIME = 30;


private final Sinker sinker;

public Service(Sinker sinker) {
Expand All @@ -41,25 +34,58 @@
*/
@Override
public StreamObserver<SinkOuterClass.SinkRequest> sinkFn(StreamObserver<SinkOuterClass.SinkResponse> responseObserver) {
if (this.sinker == null) {
return io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall(
getSinkFnMethod(),
responseObserver);
}
return new StreamObserver<>() {
private boolean startOfStream = true;
private CompletableFuture<ResponseList> result;
private DatumIteratorImpl datumStream;
private boolean handshakeDone = false;

DatumIteratorImpl datumStream = new DatumIteratorImpl();
@Override
public void onNext(SinkOuterClass.SinkRequest request) {
// make sure the handshake is done before processing the messages
if (!handshakeDone) {
if (!request.getHandshake().getSot()) {
responseObserver.onError(new Exception("Handshake request not received"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please cover L48-49

return;

Check warning on line 49 in src/main/java/io/numaproj/numaflow/sinker/Service.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/Service.java#L48-L49

Added lines #L48 - L49 were not covered by tests
}
responseObserver.onNext(SinkOuterClass.SinkResponse.newBuilder()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In go sdk, we also set result status to success.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need to, I will change that in go sdk.

.setHandshake(request.getHandshake())
.build());
handshakeDone = true;
return;
}

Future<ResponseList> result = sinkTaskExecutor.submit(() -> this.sinker.processMessages(
datumStream));
// Create a DatumIterator to write the messages to the sinker
// and start the sinker if it is the start of the stream
if (startOfStream) {
datumStream = new DatumIteratorImpl();
result = CompletableFuture.supplyAsync(
() -> sinker.processMessages(datumStream),
RohanAshar marked this conversation as resolved.
Show resolved Hide resolved
sinkTaskExecutor);
startOfStream = false;
}

return new StreamObserver<SinkOuterClass.SinkRequest>() {
@Override
public void onNext(SinkOuterClass.SinkRequest d) {
try {
datumStream.writeMessage(constructHandlerDatum(d));
} catch (InterruptedException e) {
Thread.interrupted();
onError(e);
if (request.getStatus().getEot()) {
// End of transmission, write EOF datum to the stream
// Wait for the result and send the response back to the client
datumStream.writeMessage(HandlerDatum.EOF_DATUM);

ResponseList responses = result.join();
responses.getResponses().forEach(response -> {
SinkOuterClass.SinkResponse sinkResponse = buildResponse(response);
responseObserver.onNext(sinkResponse);
});

// reset the startOfStream flag, since the stream has ended
// so that the next request will be treated as the start of the stream
startOfStream = true;
} else {
datumStream.writeMessage(constructHandlerDatum(request));
}
} catch (Exception e) {
log.error("Encountered error in sinkFn - {}", e.getMessage());
responseObserver.onError(e);
}
}

Expand All @@ -71,26 +97,23 @@

@Override
public void onCompleted() {
SinkOuterClass.SinkResponse response = SinkOuterClass.SinkResponse
.newBuilder()
.build();
try {
datumStream.writeMessage(HandlerDatum.EOF_DATUM);
// wait until the sink handler returns, result.get() is a blocking call
ResponseList responses = result.get();
// construct responseList from responses
response = buildResponseList(responses);

} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
onError(e);
}
responseObserver.onNext(response);
responseObserver.onCompleted();
}
};
}

private SinkOuterClass.SinkResponse buildResponse(Response response) {
SinkOuterClass.Status status = response.getFallback() ? SinkOuterClass.Status.FALLBACK :
response.getSuccess() ? SinkOuterClass.Status.SUCCESS : SinkOuterClass.Status.FAILURE;
return SinkOuterClass.SinkResponse.newBuilder()
.setResult(SinkOuterClass.SinkResponse.Result.newBuilder()
.setId(response.getId() == null ? "" : response.getId())
.setErrMsg(response.getErr() == null ? "" : response.getErr())
.setStatus(status)
.build())
.build();
}

/**
* IsReady is the heartbeat endpoint for gRPC.
*/
Expand All @@ -104,37 +127,28 @@

private HandlerDatum constructHandlerDatum(SinkOuterClass.SinkRequest d) {
return new HandlerDatum(
d.getKeysList().toArray(new String[0]),
d.getValue().toByteArray(),
d.getRequest().getKeysList().toArray(new String[0]),
d.getRequest().getValue().toByteArray(),
Instant.ofEpochSecond(
d.getWatermark().getSeconds(),
d.getWatermark().getNanos()),
d.getRequest().getWatermark().getSeconds(),
d.getRequest().getWatermark().getNanos()),
Instant.ofEpochSecond(
d.getEventTime().getSeconds(),
d.getEventTime().getNanos()),
d.getId(),
d.getHeadersMap()
d.getRequest().getEventTime().getSeconds(),
d.getRequest().getEventTime().getNanos()),
d.getRequest().getId(),
d.getRequest().getHeadersMap()
);
}

public SinkOuterClass.SinkResponse buildResponseList(ResponseList responses) {
var responseBuilder = SinkOuterClass.SinkResponse.newBuilder();
responses.getResponses().forEach(response -> {
SinkOuterClass.Status status = response.getFallback() ? SinkOuterClass.Status.FALLBACK :
response.getSuccess() ? SinkOuterClass.Status.SUCCESS : SinkOuterClass.Status.FAILURE;
responseBuilder.addResults(SinkOuterClass.SinkResponse.Result.newBuilder()
.setId(response.getId() == null ? "" : response.getId())
.setErrMsg(response.getErr() == null ? "" : response.getErr())
.setStatus(status)
.build());
});
return responseBuilder.build();
}

// shuts down the executor service which is used for reduce
public void shutDown() {
this.sinkTaskExecutor.shutdown();
try {
// SHUTDOWN_TIME is the time to wait for the sinker to shut down, in seconds.
// We use 30 seconds as the default value because it provides a balance between giving tasks enough time to complete
// and not delaying program termination unduly.
long SHUTDOWN_TIME = 30;

if (!sinkTaskExecutor.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS)) {
log.error("Sink executor did not terminate in the specified time.");
List<Runnable> droppedTasks = sinkTaskExecutor.shutdownNow();
Expand Down
49 changes: 33 additions & 16 deletions src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,13 @@
* @return response from the server as a ResponseList
*/
public ResponseList sendRequest(DatumIterator datumIterator) {
CompletableFuture<SinkOuterClass.SinkResponse> future = new CompletableFuture<>();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just curious, as to why SinkerTestKit lives in this package?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is for enabling users do "Component testing".

List<SinkOuterClass.SinkResponse> responses = new ArrayList<>();
CompletableFuture<List<SinkOuterClass.SinkResponse>> future = new CompletableFuture<>();

Check warning on line 117 in src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java#L116-L117

Added lines #L116 - L117 were not covered by tests

StreamObserver<SinkOuterClass.SinkResponse> responseObserver = new StreamObserver<>() {
@Override
public void onNext(SinkOuterClass.SinkResponse response) {
future.complete(response);
responses.add(response);

Check warning on line 122 in src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java#L122

Added line #L122 was not covered by tests
}

@Override
Expand All @@ -127,16 +129,19 @@

@Override
public void onCompleted() {
if (!future.isDone()) {
future.completeExceptionally(new RuntimeException(
"Server completed without a response"));
}
future.complete(responses);

Check warning on line 132 in src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java#L132

Added line #L132 was not covered by tests
}
};

StreamObserver<SinkOuterClass.SinkRequest> requestObserver = sinkStub.sinkFn(
responseObserver);

// send handshake request
requestObserver.onNext(SinkOuterClass.SinkRequest.newBuilder()
.setHandshake(SinkOuterClass.Handshake.newBuilder().setSot(true).build())
.build());

Check warning on line 142 in src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java#L140-L142

Added lines #L140 - L142 were not covered by tests

// send actual requests
while (true) {
Datum datum = null;
try {
Expand All @@ -148,7 +153,8 @@
if (datum == null) {
break;
}
SinkOuterClass.SinkRequest request = SinkOuterClass.SinkRequest.newBuilder()
SinkOuterClass.SinkRequest.Request request = SinkOuterClass.SinkRequest.Request
.newBuilder()

Check warning on line 157 in src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java#L157

Added line #L157 was not covered by tests
.addAllKeys(
datum.getKeys()
== null ? new ArrayList<>() : List.of(datum.getKeys()))
Expand All @@ -168,28 +174,39 @@
.putAllHeaders(
datum.getHeaders() == null ? new HashMap<>() : datum.getHeaders())
.build();
requestObserver.onNext(request);
requestObserver.onNext(SinkOuterClass.SinkRequest
.newBuilder()
.setRequest(request)
.build());

Check warning on line 180 in src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java#L177-L180

Added lines #L177 - L180 were not covered by tests
}
// send end of transmission message
requestObserver.onNext(SinkOuterClass.SinkRequest.newBuilder().setStatus(
SinkOuterClass.SinkRequest.Status.newBuilder().setEot(true)).build());

Check warning on line 184 in src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java#L183-L184

Added lines #L183 - L184 were not covered by tests

requestObserver.onCompleted();

SinkOuterClass.SinkResponse response;
List<SinkOuterClass.SinkResponse> outputResponses;
try {
response = future.get();
outputResponses = future.get();

Check warning on line 190 in src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java#L190

Added line #L190 was not covered by tests
} catch (Exception e) {
throw new RuntimeException(e);
}

ResponseList.ResponseListBuilder responseListBuilder = ResponseList.newBuilder();
for (SinkOuterClass.SinkResponse.Result result : response.getResultsList()) {
if (result.getStatus() == SinkOuterClass.Status.SUCCESS) {
responseListBuilder.addResponse(Response.responseOK(result.getId()));
} else if (result.getStatus() == SinkOuterClass.Status.FALLBACK) {
for (SinkOuterClass.SinkResponse result : outputResponses) {
if (result.getHandshake().getSot()) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does getSot do?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sot means start of the transmission. when set to true, it means the handshake is successful.

continue;

Check warning on line 198 in src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java#L198

Added line #L198 was not covered by tests
}
if (result.getResult().getStatus() == SinkOuterClass.Status.SUCCESS) {
responseListBuilder.addResponse(Response.responseOK(result
.getResult()
.getId()));

Check warning on line 203 in src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java#L201-L203

Added lines #L201 - L203 were not covered by tests
} else if (result.getResult().getStatus() == SinkOuterClass.Status.FALLBACK) {
responseListBuilder.addResponse(Response.responseFallback(
result.getId()));
result.getResult().getId()));

Check warning on line 206 in src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java#L206

Added line #L206 was not covered by tests
} else {
responseListBuilder.addResponse(Response.responseFailure(
result.getId(), result.getErrMsg()));
result.getResult().getId(), result.getResult().getErrMsg()));

Check warning on line 209 in src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java#L209

Added line #L209 was not covered by tests
}
}

Expand Down
27 changes: 17 additions & 10 deletions src/main/java/io/numaproj/numaflow/sourcer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,22 @@
@Override
public StreamObserver<SourceOuterClass.ReadRequest> readFn(final StreamObserver<SourceOuterClass.ReadResponse> responseObserver) {
return new StreamObserver<>() {
private boolean handshakeDone = false;
@Override
public void onNext(SourceOuterClass.ReadRequest request) {
// if the request is a handshake, send handshake response.
if (request.hasHandshake() && request.getHandshake().getSot()) {
// make sure that the handshake is done before processing the read requests
if (!handshakeDone) {
if (!request.getHandshake().getSot()) {
responseObserver.onError(new Exception("Handshake request not received"));
return;
}
responseObserver.onNext(SourceOuterClass.ReadResponse.newBuilder()
.setHandshake(request.getHandshake())
.setStatus(SourceOuterClass.ReadResponse.Status.newBuilder()
.setCode(SourceOuterClass.ReadResponse.Status.Code.SUCCESS)
.build())
.build());
handshakeDone = true;
return;
}

ReadRequestImpl readRequest = new ReadRequestImpl(
request.getRequest().getNumRecords(),
Duration.ofMillis(request.getRequest().getTimeoutInMs()));
Expand Down Expand Up @@ -89,16 +93,19 @@
@Override
public StreamObserver<SourceOuterClass.AckRequest> ackFn(final StreamObserver<SourceOuterClass.AckResponse> responseObserver) {
return new StreamObserver<>() {
private boolean handshakeDone = false;
@Override
public void onNext(SourceOuterClass.AckRequest request) {
// if the request is a handshake, send a handshake response
if (request.hasHandshake() && request.getHandshake().getSot()) {
// make sure that the handshake is done before processing the ack requests
if (!handshakeDone) {
if (!request.getHandshake().getSot()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we keep the hasHandshake check? it could be null. Same for sink.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added it back.

responseObserver.onError(new Exception("Handshake request not received"));
return;

Check warning on line 103 in src/main/java/io/numaproj/numaflow/sourcer/Service.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sourcer/Service.java#L102-L103

Added lines #L102 - L103 were not covered by tests
}
responseObserver.onNext(SourceOuterClass.AckResponse.newBuilder()
.setHandshake(request.getHandshake())
.setResult(SourceOuterClass.AckResponse.Result.newBuilder().setSuccess(
Empty.newBuilder().build()))
.build());
return;
handshakeDone = true;
}

SourceOuterClass.Offset offset = request.getRequest().getOffset();
Expand Down
Loading
Loading