From 5137d16b23c2bf89d6e1996c478ec7804b93d442 Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Mon, 4 Nov 2024 11:37:22 -0500 Subject: [PATCH] . Signed-off-by: Keran Yang --- examples/simple-pipeline.yaml | 33 ++++++++++++++ .../examples/sink/simple/SimpleSink.java | 2 + .../io/numaproj/numaflow/sinker/Server.java | 3 +- .../io/numaproj/numaflow/sinker/Service.java | 45 +++++++++++++++++-- 4 files changed, 78 insertions(+), 5 deletions(-) create mode 100644 examples/simple-pipeline.yaml diff --git a/examples/simple-pipeline.yaml b/examples/simple-pipeline.yaml new file mode 100644 index 0000000..4aefe0e --- /dev/null +++ b/examples/simple-pipeline.yaml @@ -0,0 +1,33 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: Pipeline +metadata: + name: simple-pipeline +spec: + vertices: + - name: in + scale: + min: 1 + source: + generator: + rpu: 5 + duration: 1s + - name: cat + scale: + min: 1 + udf: + builtin: + name: cat + - name: java-sink + scale: + min: 1 + sink: + udsink: + container: + image: quay.io/numaio/numaflow-java/simple-sink:keran-3 + imagePullPolicy: Always + edges: + - from: in + to: cat + - from: cat + to: java-sink + diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/sink/simple/SimpleSink.java b/examples/src/main/java/io/numaproj/numaflow/examples/sink/simple/SimpleSink.java index e2a4224..9301990 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/sink/simple/SimpleSink.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/sink/simple/SimpleSink.java @@ -24,6 +24,8 @@ public static void main(String[] args) throws Exception { // wait for the server to shut down server.awaitTermination(); + + log.info("Server stopped."); } @Override diff --git a/src/main/java/io/numaproj/numaflow/sinker/Server.java b/src/main/java/io/numaproj/numaflow/sinker/Server.java index 9b0edb0..9599d96 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Server.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Server.java @@ -38,7 +38,7 @@ public Server(Sinker sinker) { * @param sinker sink to process the message */ public Server(Sinker sinker, GRPCConfig grpcConfig) { - this.service = new Service(sinker); + this.service = new Service(sinker, this); this.grpcConfig = grpcConfig; } @@ -121,6 +121,7 @@ public void awaitTermination() throws InterruptedException { // what the difference between this method and awaitTermination? public void stop() throws InterruptedException { log.info("Server.stop started. Shutting down sink service"); + // TODO - should server shutdown take care of service shutdown? this.service.shutDown(); log.info("sink service is successfully shut down"); if (server != null) { diff --git a/src/main/java/io/numaproj/numaflow/sinker/Service.java b/src/main/java/io/numaproj/numaflow/sinker/Service.java index 7cb9e3c..b8cd595 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Service.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Service.java @@ -10,6 +10,7 @@ import java.time.Instant; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -23,9 +24,11 @@ class Service extends SinkGrpc.SinkImplBase { .newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); private final Sinker sinker; + private final Server server; - public Service(Sinker sinker) { + public Service(Sinker sinker, Server server) { this.sinker = sinker; + this.server = server; } /** @@ -49,6 +52,12 @@ public void onNext(SinkOuterClass.SinkRequest request) { responseObserver.onError(Status.INVALID_ARGUMENT .withDescription("Handshake request not received") .asException()); + try { + server.stop(); + } catch (InterruptedException e) { + log.error("Error while stopping the server - {}", e.getMessage()); + throw new RuntimeException(e); + } return; } log.info("Handshake request received, sending handshake response"); @@ -77,11 +86,14 @@ public void onNext(SinkOuterClass.SinkRequest request) { // Wait for the result and send the response back to the client datumStream.write(HandlerDatum.EOF_DATUM); + log.info("Collecting responses from the CompletableFuture - result"); ResponseList responses = result.join(); + log.info("Responses collected from the CompletableFuture - result"); SinkOuterClass.SinkResponse.Builder responseBuilder = SinkOuterClass.SinkResponse.newBuilder(); for (Response response : responses.getResponses()) { responseBuilder.addResults(buildResult(response)); } + log.info("Sending response back to the client - {}", responseBuilder.build()); responseObserver.onNext(responseBuilder.build()); // send eot response to indicate end of transmission for the batch @@ -92,6 +104,7 @@ public void onNext(SinkOuterClass.SinkRequest request) { .setEot(true) .build()) .build(); + log.info("Sending EOT response back to the client {}", eotResponse); responseObserver.onNext(eotResponse); log.info("All responses sent back to the client, including the EOT response"); @@ -102,9 +115,27 @@ public void onNext(SinkOuterClass.SinkRequest request) { } else { datumStream.write(constructHandlerDatum(request)); } + } catch (CompletionException ce) { + Throwable cause = ce.getCause(); // Get the original RuntimeException, if any. + log.error("Error occurred while processing messages: {}", cause.getMessage()); + responseObserver.onError(cause); // Pass the error back to the client. + + // Initiate server shutdown. + try { + server.stop(); + } catch (InterruptedException ex) { + log.error("Error while stopping the server: {}", ex.getMessage()); + } } catch (Exception e) { log.error("Encountered error in sink onNext - {}", e.getMessage()); responseObserver.onError(e); + // Initiate server shutdown. + try { + server.stop(); + } catch (InterruptedException ex) { + log.error("Error while stopping the server - {}", ex.getMessage()); + throw new RuntimeException(ex); + } } } @@ -112,6 +143,12 @@ public void onNext(SinkOuterClass.SinkRequest request) { public void onError(Throwable throwable) { log.error("Encountered error in sinkFn - {}", throwable.getMessage()); responseObserver.onError(throwable); + try { + server.stop(); + } catch (InterruptedException e) { + log.info("Error while stopping the server - {}", e.getMessage()); + throw new RuntimeException(e); + } } @Override @@ -131,9 +168,9 @@ private SinkOuterClass.SinkResponse.Result buildResult(Response response) { .build(); } - /** - * IsReady is the heartbeat endpoint for gRPC. - */ + /** + * IsReady is the heartbeat endpoint for gRPC. + */ @Override public void isReady( Empty request,