Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Unify MapStream and Unary Map Operations Using a Shared gRPC Protocol #146

Merged
merged 2 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/main/java/io/numaproj/numaflow/batchmapper/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ private void buildAndStreamResponse(
// Send an EOT message to indicate the end of the transmission for the batch.
MapOuterClass.MapResponse eotResponse = MapOuterClass.MapResponse
.newBuilder()
.setStatus(MapOuterClass.Status.newBuilder().setEot(true).build()).build();
.setStatus(MapOuterClass.TransmissionStatus.newBuilder().setEot(true).build())
.build();
responseObserver.onNext(eotResponse);
responseObserver.onCompleted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.mapstream.v1.Mapstream;
import io.numaproj.numaflow.map.v1.MapOuterClass;
import lombok.AllArgsConstructor;

import java.util.ArrayList;
Expand All @@ -14,17 +14,17 @@
*/
@AllArgsConstructor
class OutputObserverImpl implements OutputObserver {
StreamObserver<Mapstream.MapStreamResponse> responseObserver;
StreamObserver<MapOuterClass.MapResponse> responseObserver;

@Override
public void send(Message message) {
Mapstream.MapStreamResponse response = buildResponse(message);
MapOuterClass.MapResponse response = buildResponse(message);
responseObserver.onNext(response);
}

private Mapstream.MapStreamResponse buildResponse(Message message) {
return Mapstream.MapStreamResponse.newBuilder()
.setResult(Mapstream.MapStreamResponse.Result.newBuilder()
private MapOuterClass.MapResponse buildResponse(Message message) {
return MapOuterClass.MapResponse.newBuilder()
.addResults(MapOuterClass.MapResponse.Result.newBuilder()
.setValue(
message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(
message.getValue()))
Expand All @@ -33,6 +33,5 @@ private Mapstream.MapStreamResponse buildResponse(Message message) {
.addAllTags(message.getTags()
== null ? new ArrayList<>() : List.of(message.getTags()))
.build()).build();

}
}
117 changes: 79 additions & 38 deletions src/main/java/io/numaproj/numaflow/mapstreamer/Service.java
Original file line number Diff line number Diff line change
@@ -1,64 +1,105 @@
package io.numaproj.numaflow.mapstreamer;

import com.google.protobuf.Empty;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.mapstream.v1.MapStreamGrpc;
import io.numaproj.numaflow.mapstream.v1.Mapstream;
import io.numaproj.numaflow.map.v1.MapGrpc;
import io.numaproj.numaflow.map.v1.MapOuterClass;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.time.Instant;

import static io.numaproj.numaflow.mapstream.v1.MapStreamGrpc.getMapStreamFnMethod;

import static io.numaproj.numaflow.map.v1.MapGrpc.getMapFnMethod;

@Slf4j
@AllArgsConstructor
class Service extends MapStreamGrpc.MapStreamImplBase {
class Service extends MapGrpc.MapImplBase {

private final MapStreamer mapStreamer;

/**
* Applies a map stream function to each request.
*/
@Override
public void mapStreamFn(
Mapstream.MapStreamRequest request,
StreamObserver<Mapstream.MapStreamResponse> responseObserver) {
public StreamObserver<MapOuterClass.MapRequest> mapFn(StreamObserver<MapOuterClass.MapResponse> responseObserver) {

if (this.mapStreamer == null) {
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(
getMapStreamFnMethod(),
return io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall(
getMapFnMethod(),

Check warning on line 25 in src/main/java/io/numaproj/numaflow/mapstreamer/Service.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapstreamer/Service.java#L24-L25

Added lines #L24 - L25 were not covered by tests
responseObserver);
return;
}

HandlerDatum handlerDatum = new HandlerDatum(
request.getValue().toByteArray(),
Instant.ofEpochSecond(
request.getWatermark().getSeconds(),
request.getWatermark().getNanos()),
Instant.ofEpochSecond(
request.getEventTime().getSeconds(),
request.getEventTime().getNanos()),
request.getHeadersMap()
);
return new StreamObserver<>() {
private boolean handshakeDone = false;

// process Datum
this.mapStreamer.processMessage(request
.getKeysList()
.toArray(new String[0]), handlerDatum, new OutputObserverImpl(responseObserver));
@Override
public void onNext(MapOuterClass.MapRequest request) {
// make sure the handshake is done before processing the messages
if (!handshakeDone) {
if (!request.hasHandshake() || !request.getHandshake().getSot()) {
responseObserver.onError(Status.INVALID_ARGUMENT
.withDescription("Handshake request not received")
.asException());
return;

Check warning on line 40 in src/main/java/io/numaproj/numaflow/mapstreamer/Service.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapstreamer/Service.java#L37-L40

Added lines #L37 - L40 were not covered by tests
}
responseObserver.onNext(MapOuterClass.MapResponse.newBuilder()
.setHandshake(request.getHandshake())
.build());
handshakeDone = true;
return;
}

responseObserver.onCompleted();
}
try {
// process the message
mapStreamer.processMessage(
request
.getRequest()
.getKeysList()
.toArray(new String[0]),
constructHandlerDatum(request),
new OutputObserverImpl(responseObserver));
} catch (Exception e) {
log.error("Error processing message", e);
responseObserver.onError(Status.UNKNOWN
.withDescription(e.getMessage())
.asException());
return;
}

/**
* IsReady is the heartbeat endpoint for gRPC.
*/
@Override
public void isReady(Empty request, StreamObserver<Mapstream.ReadyResponse> responseObserver) {
responseObserver.onNext(Mapstream.ReadyResponse.newBuilder().setReady(true).build());
responseObserver.onCompleted();
// Send an EOT message to indicate the end of the transmission for the batch.
MapOuterClass.MapResponse eotResponse = MapOuterClass.MapResponse
.newBuilder()
.setStatus(MapOuterClass.TransmissionStatus
.newBuilder()
.setEot(true)
.build()).build();
responseObserver.onNext(eotResponse);
}

@Override
public void onError(Throwable throwable) {
log.error("Error Encountered in mapStream Stream", throwable);
var status = Status.UNKNOWN
.withDescription(throwable.getMessage())
.withCause(throwable);
responseObserver.onError(status.asException());
}

Check warning on line 83 in src/main/java/io/numaproj/numaflow/mapstreamer/Service.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapstreamer/Service.java#L78-L83

Added lines #L78 - L83 were not covered by tests

@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}

// Construct a HandlerDatum from a MapRequest
private HandlerDatum constructHandlerDatum(MapOuterClass.MapRequest d) {
return new HandlerDatum(
d.getRequest().getValue().toByteArray(),
Instant.ofEpochSecond(
d.getRequest().getWatermark().getSeconds(),
d.getRequest().getWatermark().getNanos()),
Instant.ofEpochSecond(
d.getRequest().getEventTime().getSeconds(),
d.getRequest().getEventTime().getNanos()),
d.getRequest().getHeadersMap()
);
}
}
10 changes: 10 additions & 0 deletions src/main/java/io/numaproj/numaflow/sinker/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@ public void onNext(SinkOuterClass.SinkRequest request) {
responseObserver.onNext(sinkResponse);
});

// send eot response to indicate end of transmission for the batch
SinkOuterClass.SinkResponse eotResponse = SinkOuterClass.SinkResponse
.newBuilder()
.setStatus(SinkOuterClass.TransmissionStatus
.newBuilder()
.setEot(true)
.build())
.build();
responseObserver.onNext(eotResponse);

// reset the startOfStream flag, since the stream has ended
// so that the next request will be treated as the start of the stream
startOfStream = true;
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@
}
// send end of transmission message
requestObserver.onNext(SinkOuterClass.SinkRequest.newBuilder().setStatus(
SinkOuterClass.SinkRequest.Status.newBuilder().setEot(true)).build());
SinkOuterClass.TransmissionStatus.newBuilder().setEot(true)).build());

Check warning on line 184 in src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java#L184

Added line #L184 was not covered by tests

requestObserver.onCompleted();

Expand All @@ -197,6 +197,9 @@
if (result.getHandshake().getSot()) {
continue;
}
if (result.hasStatus() && result.getStatus().getEot()) {
continue;

Check warning on line 201 in src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java#L201

Added line #L201 was not covered by tests
}
if (result.getResult().getStatus() == SinkOuterClass.Status.SUCCESS) {
responseListBuilder.addResponse(Response.responseOK(result
.getResult()
Expand Down
6 changes: 3 additions & 3 deletions src/main/proto/map/v1/map.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ message MapRequest {
// This ID is used to uniquely identify a map request
string id = 2;
optional Handshake handshake = 3;
optional Status status = 4;
optional TransmissionStatus status = 4;
}

/*
Expand All @@ -44,7 +44,7 @@ message Handshake {
/*
* Status message to indicate the status of the message.
*/
message Status {
message TransmissionStatus {
bool eot = 1;
}

Expand All @@ -61,7 +61,7 @@ message MapResponse {
// This ID is used to refer the responses to the request it corresponds to.
string id = 2;
optional Handshake handshake = 3;
optional Status status = 4;
optional TransmissionStatus status = 4;
}

/**
Expand Down
46 changes: 0 additions & 46 deletions src/main/proto/mapstream/v1/mapstream.proto

This file was deleted.

13 changes: 9 additions & 4 deletions src/main/proto/sink/v1/sink.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,11 @@ message SinkRequest {
string id = 5;
map<string, string> headers = 6;
}
message Status {
bool eot = 1;
}
// Required field indicating the request.
Request request = 1;
// Required field indicating the status of the request.
// If eot is set to true, it indicates the end of transmission.
Status status = 2;
TransmissionStatus status = 2;
// optional field indicating the handshake message.
optional Handshake handshake = 3;
}
Expand All @@ -54,6 +51,13 @@ message ReadyResponse {
bool ready = 1;
}

/**
* TransmissionStatus is the status of the transmission.
*/
message TransmissionStatus {
bool eot = 1;
}

/*
* Status is the status of the response.
*/
Expand All @@ -77,4 +81,5 @@ message SinkResponse {
}
Result result = 1;
optional Handshake handshake = 2;
optional TransmissionStatus status = 3;
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void testErrorFromUDF() {
inputStreamObserver.onNext(request);
inputStreamObserver.onNext(MapOuterClass.MapRequest
.newBuilder()
.setStatus(MapOuterClass.Status.newBuilder().setEot(true))
.setStatus(MapOuterClass.TransmissionStatus.newBuilder().setEot(true))
.build());
inputStreamObserver.onCompleted();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void testBatchMapHappyPath() {

inputStreamObserver.onNext(MapOuterClass.MapRequest
.newBuilder()
.setStatus(MapOuterClass.Status.newBuilder().setEot(true))
.setStatus(MapOuterClass.TransmissionStatus.newBuilder().setEot(true))
.build());
inputStreamObserver.onCompleted();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.numaproj.numaflow.mapstreamer;

import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.map.v1.MapOuterClass;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class MapStreamOutputStreamObserver implements StreamObserver<MapOuterClass.MapResponse> {
List<MapOuterClass.MapResponse> mapResponses = new ArrayList<>();
CompletableFuture<Void> done = new CompletableFuture<>();
Integer responseCount;

public MapStreamOutputStreamObserver(Integer responseCount) {
this.responseCount = responseCount;
}

@Override
public void onNext(MapOuterClass.MapResponse mapResponse) {
System.out.println("Received response: " + mapResponse);
mapResponses.add(mapResponse);
if (mapResponses.size() == responseCount) {
done.complete(null);
}
}

@Override
public void onError(Throwable throwable) {
done.completeExceptionally(throwable);
}

@Override
public void onCompleted() {
done.complete(null);
}

public List<MapOuterClass.MapResponse> getMapResponses() {
return mapResponses;
}
}
Loading
Loading