Skip to content

Commit

Permalink
rename and add eot to sink
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Oct 14, 2024
1 parent 0f0ec35 commit 4b35fa4
Show file tree
Hide file tree
Showing 10 changed files with 40 additions and 15 deletions.
3 changes: 2 additions & 1 deletion src/main/java/io/numaproj/numaflow/batchmapper/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ private void buildAndStreamResponse(
// Send an EOT message to indicate the end of the transmission for the batch.
MapOuterClass.MapResponse eotResponse = MapOuterClass.MapResponse
.newBuilder()
.setStatus(MapOuterClass.Status.newBuilder().setEot(true).build()).build();
.setStatus(MapOuterClass.TransmissionStatus.newBuilder().setEot(true).build())
.build();
responseObserver.onNext(eotResponse);
responseObserver.onCompleted();
}
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/io/numaproj/numaflow/mapstreamer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ public void onNext(MapOuterClass.MapRequest request) {
// Send an EOT message to indicate the end of the transmission for the batch.
MapOuterClass.MapResponse eotResponse = MapOuterClass.MapResponse
.newBuilder()
.setStatus(MapOuterClass.Status.newBuilder().setEot(true).build()).build();
.setStatus(MapOuterClass.TransmissionStatus
.newBuilder()
.setEot(true)
.build()).build();
responseObserver.onNext(eotResponse);
}

Expand Down
10 changes: 10 additions & 0 deletions src/main/java/io/numaproj/numaflow/sinker/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@ public void onNext(SinkOuterClass.SinkRequest request) {
responseObserver.onNext(sinkResponse);
});

// send eot response to indicate end of transmission for the batch
SinkOuterClass.SinkResponse eotResponse = SinkOuterClass.SinkResponse
.newBuilder()
.setStatus(SinkOuterClass.TransmissionStatus
.newBuilder()
.setEot(true)
.build())
.build();
responseObserver.onNext(eotResponse);

// reset the startOfStream flag, since the stream has ended
// so that the next request will be treated as the start of the stream
startOfStream = true;
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public void onCompleted() {
}
// send end of transmission message
requestObserver.onNext(SinkOuterClass.SinkRequest.newBuilder().setStatus(
SinkOuterClass.SinkRequest.Status.newBuilder().setEot(true)).build());
SinkOuterClass.TransmissionStatus.newBuilder().setEot(true)).build());

Check warning on line 184 in src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java#L184

Added line #L184 was not covered by tests

requestObserver.onCompleted();

Expand All @@ -197,6 +197,9 @@ public void onCompleted() {
if (result.getHandshake().getSot()) {
continue;
}
if (result.hasStatus() && result.getStatus().getEot()) {
continue;

Check warning on line 201 in src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java#L201

Added line #L201 was not covered by tests
}
if (result.getResult().getStatus() == SinkOuterClass.Status.SUCCESS) {
responseListBuilder.addResponse(Response.responseOK(result
.getResult()
Expand Down
6 changes: 3 additions & 3 deletions src/main/proto/map/v1/map.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ message MapRequest {
// This ID is used to uniquely identify a map request
string id = 2;
optional Handshake handshake = 3;
optional Status status = 4;
optional TransmissionStatus status = 4;
}

/*
Expand All @@ -44,7 +44,7 @@ message Handshake {
/*
* Status message to indicate the status of the message.
*/
message Status {
message TransmissionStatus {
bool eot = 1;
}

Expand All @@ -61,7 +61,7 @@ message MapResponse {
// This ID is used to refer the responses to the request it corresponds to.
string id = 2;
optional Handshake handshake = 3;
optional Status status = 4;
optional TransmissionStatus status = 4;
}

/**
Expand Down
13 changes: 9 additions & 4 deletions src/main/proto/sink/v1/sink.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,11 @@ message SinkRequest {
string id = 5;
map<string, string> headers = 6;
}
message Status {
bool eot = 1;
}
// Required field indicating the request.
Request request = 1;
// Required field indicating the status of the request.
// If eot is set to true, it indicates the end of transmission.
Status status = 2;
TransmissionStatus status = 2;
// optional field indicating the handshake message.
optional Handshake handshake = 3;
}
Expand All @@ -54,6 +51,13 @@ message ReadyResponse {
bool ready = 1;
}

/**
* TransmissionStatus is the status of the transmission.
*/
message TransmissionStatus {
bool eot = 1;
}

/*
* Status is the status of the response.
*/
Expand All @@ -77,4 +81,5 @@ message SinkResponse {
}
Result result = 1;
optional Handshake handshake = 2;
optional TransmissionStatus status = 3;
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void testErrorFromUDF() {
inputStreamObserver.onNext(request);
inputStreamObserver.onNext(MapOuterClass.MapRequest
.newBuilder()
.setStatus(MapOuterClass.Status.newBuilder().setEot(true))
.setStatus(MapOuterClass.TransmissionStatus.newBuilder().setEot(true))
.build());
inputStreamObserver.onCompleted();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void testBatchMapHappyPath() {

inputStreamObserver.onNext(MapOuterClass.MapRequest
.newBuilder()
.setStatus(MapOuterClass.Status.newBuilder().setEot(true))
.setStatus(MapOuterClass.TransmissionStatus.newBuilder().setEot(true))
.build());
inputStreamObserver.onCompleted();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void sinkerException() {

// send eot message
inputStreamObserver.onNext(SinkOuterClass.SinkRequest.newBuilder()
.setStatus(SinkOuterClass.SinkRequest.Status.newBuilder().setEot(true)).build());
.setStatus(SinkOuterClass.TransmissionStatus.newBuilder().setEot(true)).build());

inputStreamObserver.onCompleted();

Expand Down
7 changes: 5 additions & 2 deletions src/test/java/io/numaproj/numaflow/sinker/ServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void sinkerSuccess() {
// If it's the end of the batch, send an EOT message
if (i % 10 == 0) {
SinkOuterClass.SinkRequest eotRequest = SinkOuterClass.SinkRequest.newBuilder()
.setStatus(SinkOuterClass.SinkRequest.Status
.setStatus(SinkOuterClass.TransmissionStatus
.newBuilder()
.setEot(true)
.build())
Expand All @@ -114,12 +114,15 @@ public void sinkerSuccess() {

while (!outputStreamObserver.completed.get()) ;
List<SinkOuterClass.SinkResponse> responseList = outputStreamObserver.getSinkResponse();
assertEquals(101, responseList.size());
assertEquals(111, responseList.size());
// first response is the handshake response
assertTrue(responseList.get(0).getHandshake().getSot());

responseList = responseList.subList(1, responseList.size());
responseList.forEach(response -> {
if (response.hasStatus() && response.getStatus().getEot()) {
return;
}
assertEquals(response.getResult().getId(), expectedId);
if (response.getResult().getStatus() == SinkOuterClass.Status.FAILURE) {
assertEquals(response.getResult().getErrMsg(), "error message");
Expand Down

0 comments on commit 4b35fa4

Please sign in to comment.