Skip to content

Commit

Permalink
chore: send eot response for batchmap operations (#145)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Oct 13, 2024
1 parent fa2f746 commit a5cdc00
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 7 deletions.
5 changes: 5 additions & 0 deletions src/main/java/io/numaproj/numaflow/batchmapper/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ private void buildAndStreamResponse(
.build();
responseObserver.onNext(singleRequestResponse);
});
// Send an EOT message to indicate the end of the transmission for the batch.
MapOuterClass.MapResponse eotResponse = MapOuterClass.MapResponse
.newBuilder()
.setStatus(MapOuterClass.Status.newBuilder().setEot(true).build()).build();
responseObserver.onNext(eotResponse);
responseObserver.onCompleted();
}

Expand Down
11 changes: 8 additions & 3 deletions src/main/proto/map/v1/map.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ message MapRequest {
google.protobuf.Timestamp watermark = 4;
map<string, string> headers = 5;
}
message Status {
bool eot = 1;
}
Request request = 1;
// This ID is used to uniquely identify a map request
string id = 2;
Expand All @@ -44,6 +41,13 @@ message Handshake {
bool sot = 1;
}

/*
* Status message to indicate the status of the message.
*/
message Status {
bool eot = 1;
}

/**
* MapResponse represents a response element.
*/
Expand All @@ -57,6 +61,7 @@ message MapResponse {
// This ID is used to refer the responses to the request it corresponds to.
string id = 2;
optional Handshake handshake = 3;
optional Status status = 4;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void testErrorFromUDF() {
inputStreamObserver.onNext(request);
inputStreamObserver.onNext(MapOuterClass.MapRequest
.newBuilder()
.setStatus(MapOuterClass.MapRequest.Status.newBuilder().setEot(true))
.setStatus(MapOuterClass.Status.newBuilder().setEot(true))
.build());
inputStreamObserver.onCompleted();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void tearDown() throws Exception {

@Test
public void testBatchMapHappyPath() {
BatchMapOutputStreamObserver outputStreamObserver = new BatchMapOutputStreamObserver(11);
BatchMapOutputStreamObserver outputStreamObserver = new BatchMapOutputStreamObserver(12);
StreamObserver<MapOuterClass.MapRequest> inputStreamObserver = MapGrpc
.newStub(inProcessChannel)
.mapFn(outputStreamObserver);
Expand All @@ -90,7 +90,7 @@ public void testBatchMapHappyPath() {

inputStreamObserver.onNext(MapOuterClass.MapRequest
.newBuilder()
.setStatus(MapOuterClass.MapRequest.Status.newBuilder().setEot(true))
.setStatus(MapOuterClass.Status.newBuilder().setEot(true))
.build());
inputStreamObserver.onCompleted();

Expand All @@ -100,7 +100,7 @@ public void testBatchMapHappyPath() {
fail("Error in getting done signal from the observer " + e.getMessage());
}
List<MapOuterClass.MapResponse> result = outputStreamObserver.getMapResponses();
assertEquals(11, result.size());
assertEquals(12, result.size());

// first response is handshake
assertTrue(result.get(0).hasHandshake());
Expand Down

0 comments on commit a5cdc00

Please sign in to comment.