Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Nov 4, 2024
1 parent d33ae72 commit 5137d16
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 5 deletions.
33 changes: 33 additions & 0 deletions examples/simple-pipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: simple-pipeline
spec:
vertices:
- name: in
scale:
min: 1
source:
generator:
rpu: 5
duration: 1s
- name: cat
scale:
min: 1
udf:
builtin:
name: cat
- name: java-sink
scale:
min: 1
sink:
udsink:
container:
image: quay.io/numaio/numaflow-java/simple-sink:keran-3
imagePullPolicy: Always
edges:
- from: in
to: cat
- from: cat
to: java-sink

Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public static void main(String[] args) throws Exception {

// wait for the server to shut down
server.awaitTermination();

log.info("Server stopped.");
}

@Override
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/numaproj/numaflow/sinker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public Server(Sinker sinker) {
* @param sinker sink to process the message
*/
public Server(Sinker sinker, GRPCConfig grpcConfig) {
this.service = new Service(sinker);
this.service = new Service(sinker, this);
this.grpcConfig = grpcConfig;
}

Expand Down Expand Up @@ -121,6 +121,7 @@ public void awaitTermination() throws InterruptedException {
// what the difference between this method and awaitTermination?
public void stop() throws InterruptedException {
log.info("Server.stop started. Shutting down sink service");
// TODO - should server shutdown take care of service shutdown?
this.service.shutDown();
log.info("sink service is successfully shut down");
if (server != null) {
Expand Down
45 changes: 41 additions & 4 deletions src/main/java/io/numaproj/numaflow/sinker/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.time.Instant;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand All @@ -23,9 +24,11 @@ class Service extends SinkGrpc.SinkImplBase {
.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

private final Sinker sinker;
private final Server server;

public Service(Sinker sinker) {
public Service(Sinker sinker, Server server) {
this.sinker = sinker;
this.server = server;
}

/**
Expand All @@ -49,6 +52,12 @@ public void onNext(SinkOuterClass.SinkRequest request) {
responseObserver.onError(Status.INVALID_ARGUMENT
.withDescription("Handshake request not received")
.asException());
try {
server.stop();
} catch (InterruptedException e) {
log.error("Error while stopping the server - {}", e.getMessage());
throw new RuntimeException(e);

Check warning on line 59 in src/main/java/io/numaproj/numaflow/sinker/Service.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/Service.java#L57-L59

Added lines #L57 - L59 were not covered by tests
}
return;
}
log.info("Handshake request received, sending handshake response");
Expand Down Expand Up @@ -77,11 +86,14 @@ public void onNext(SinkOuterClass.SinkRequest request) {
// Wait for the result and send the response back to the client
datumStream.write(HandlerDatum.EOF_DATUM);

log.info("Collecting responses from the CompletableFuture - result");
ResponseList responses = result.join();
log.info("Responses collected from the CompletableFuture - result");
SinkOuterClass.SinkResponse.Builder responseBuilder = SinkOuterClass.SinkResponse.newBuilder();
for (Response response : responses.getResponses()) {
responseBuilder.addResults(buildResult(response));
}
log.info("Sending response back to the client - {}", responseBuilder.build());
responseObserver.onNext(responseBuilder.build());

// send eot response to indicate end of transmission for the batch
Expand All @@ -92,6 +104,7 @@ public void onNext(SinkOuterClass.SinkRequest request) {
.setEot(true)
.build())
.build();
log.info("Sending EOT response back to the client {}", eotResponse);
responseObserver.onNext(eotResponse);
log.info("All responses sent back to the client, including the EOT response");

Expand All @@ -102,16 +115,40 @@ public void onNext(SinkOuterClass.SinkRequest request) {
} else {
datumStream.write(constructHandlerDatum(request));
}
} catch (CompletionException ce) {
Throwable cause = ce.getCause(); // Get the original RuntimeException, if any.
log.error("Error occurred while processing messages: {}", cause.getMessage());
responseObserver.onError(cause); // Pass the error back to the client.

// Initiate server shutdown.
try {
server.stop();
} catch (InterruptedException ex) {
log.error("Error while stopping the server: {}", ex.getMessage());

Check warning on line 127 in src/main/java/io/numaproj/numaflow/sinker/Service.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/Service.java#L126-L127

Added lines #L126 - L127 were not covered by tests
}
} catch (Exception e) {
log.error("Encountered error in sink onNext - {}", e.getMessage());

Check warning on line 130 in src/main/java/io/numaproj/numaflow/sinker/Service.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/Service.java#L130

Added line #L130 was not covered by tests
responseObserver.onError(e);
// Initiate server shutdown.
try {
server.stop();
} catch (InterruptedException ex) {
log.error("Error while stopping the server - {}", ex.getMessage());
throw new RuntimeException(ex);
}

Check warning on line 138 in src/main/java/io/numaproj/numaflow/sinker/Service.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/Service.java#L134-L138

Added lines #L134 - L138 were not covered by tests
}
}

@Override
public void onError(Throwable throwable) {
log.error("Encountered error in sinkFn - {}", throwable.getMessage());
responseObserver.onError(throwable);
try {
server.stop();
} catch (InterruptedException e) {
log.info("Error while stopping the server - {}", e.getMessage());
throw new RuntimeException(e);
}

Check warning on line 151 in src/main/java/io/numaproj/numaflow/sinker/Service.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/Service.java#L147-L151

Added lines #L147 - L151 were not covered by tests
}

@Override
Expand All @@ -131,9 +168,9 @@ private SinkOuterClass.SinkResponse.Result buildResult(Response response) {
.build();
}

/**
* IsReady is the heartbeat endpoint for gRPC.
*/
/**
* IsReady is the heartbeat endpoint for gRPC.
*/
@Override
public void isReady(
Empty request,
Expand Down

0 comments on commit 5137d16

Please sign in to comment.