From a5cdc00e0dadea386fdae709e3d240fae6e352d7 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Sun, 13 Oct 2024 19:43:47 +0530 Subject: [PATCH] chore: send eot response for batchmap operations (#145) Signed-off-by: Yashash H L --- .../io/numaproj/numaflow/batchmapper/Service.java | 5 +++++ src/main/proto/map/v1/map.proto | 11 ++++++++--- .../numaproj/numaflow/batchmapper/ServerErrTest.java | 2 +- .../io/numaproj/numaflow/batchmapper/ServerTest.java | 6 +++--- 4 files changed, 17 insertions(+), 7 deletions(-) diff --git a/src/main/java/io/numaproj/numaflow/batchmapper/Service.java b/src/main/java/io/numaproj/numaflow/batchmapper/Service.java index f61c3d2..92b80dc 100644 --- a/src/main/java/io/numaproj/numaflow/batchmapper/Service.java +++ b/src/main/java/io/numaproj/numaflow/batchmapper/Service.java @@ -146,6 +146,11 @@ private void buildAndStreamResponse( .build(); responseObserver.onNext(singleRequestResponse); }); + // Send an EOT message to indicate the end of the transmission for the batch. + MapOuterClass.MapResponse eotResponse = MapOuterClass.MapResponse + .newBuilder() + .setStatus(MapOuterClass.Status.newBuilder().setEot(true).build()).build(); + responseObserver.onNext(eotResponse); responseObserver.onCompleted(); } diff --git a/src/main/proto/map/v1/map.proto b/src/main/proto/map/v1/map.proto index f702cc3..e64dba2 100644 --- a/src/main/proto/map/v1/map.proto +++ b/src/main/proto/map/v1/map.proto @@ -26,9 +26,6 @@ message MapRequest { google.protobuf.Timestamp watermark = 4; map headers = 5; } - message Status { - bool eot = 1; - } Request request = 1; // This ID is used to uniquely identify a map request string id = 2; @@ -44,6 +41,13 @@ message Handshake { bool sot = 1; } +/* + * Status message to indicate the status of the message. + */ +message Status { + bool eot = 1; +} + /** * MapResponse represents a response element. */ @@ -57,6 +61,7 @@ message MapResponse { // This ID is used to refer the responses to the request it corresponds to. string id = 2; optional Handshake handshake = 3; + optional Status status = 4; } /** diff --git a/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java index 937031d..44a2100 100644 --- a/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java @@ -127,7 +127,7 @@ public void testErrorFromUDF() { inputStreamObserver.onNext(request); inputStreamObserver.onNext(MapOuterClass.MapRequest .newBuilder() - .setStatus(MapOuterClass.MapRequest.Status.newBuilder().setEot(true)) + .setStatus(MapOuterClass.Status.newBuilder().setEot(true)) .build()); inputStreamObserver.onCompleted(); try { diff --git a/src/test/java/io/numaproj/numaflow/batchmapper/ServerTest.java b/src/test/java/io/numaproj/numaflow/batchmapper/ServerTest.java index a139e8b..bae49bb 100644 --- a/src/test/java/io/numaproj/numaflow/batchmapper/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/batchmapper/ServerTest.java @@ -63,7 +63,7 @@ public void tearDown() throws Exception { @Test public void testBatchMapHappyPath() { - BatchMapOutputStreamObserver outputStreamObserver = new BatchMapOutputStreamObserver(11); + BatchMapOutputStreamObserver outputStreamObserver = new BatchMapOutputStreamObserver(12); StreamObserver inputStreamObserver = MapGrpc .newStub(inProcessChannel) .mapFn(outputStreamObserver); @@ -90,7 +90,7 @@ public void testBatchMapHappyPath() { inputStreamObserver.onNext(MapOuterClass.MapRequest .newBuilder() - .setStatus(MapOuterClass.MapRequest.Status.newBuilder().setEot(true)) + .setStatus(MapOuterClass.Status.newBuilder().setEot(true)) .build()); inputStreamObserver.onCompleted(); @@ -100,7 +100,7 @@ public void testBatchMapHappyPath() { fail("Error in getting done signal from the observer " + e.getMessage()); } List result = outputStreamObserver.getMapResponses(); - assertEquals(11, result.size()); + assertEquals(12, result.size()); // first response is handshake assertTrue(result.get(0).hasHandshake());