Skip to content

Commit

Permalink
Merge branch 'main' into source-transformer-streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
BulkBeing authored Sep 27, 2024
2 parents 8d52eb8 + d998ab2 commit c08137a
Show file tree
Hide file tree
Showing 5 changed files with 417 additions and 158 deletions.
5 changes: 3 additions & 2 deletions examples/simple-source/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ update:

.PHONY: image
image: update
cd ../../ && docker buildx build \
cd ../../ && docker build \
-f ${DOCKER_FILE_PATH} \
-t ${IMAGE_REGISTRY} . --platform linux/amd64,linux/arm64 --push
-t ${IMAGE_REGISTRY} .
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi

.PHONY: clean
clean:
Expand Down
2 changes: 1 addition & 1 deletion examples/sink-log/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ WORKDIR /numaflow-rs/examples/sink-log
RUN cargo build --release

# our final base
FROM debian:bookworm AS simple-source
FROM debian:bookworm AS sink-log

# copy the build artifact from the build stage
COPY --from=build /numaflow-rs/examples/sink-log/target/release/server .
Expand Down
55 changes: 38 additions & 17 deletions proto/sink.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package sink.v1;

service Sink {
// SinkFn writes the request to a user defined sink.
rpc SinkFn(stream SinkRequest) returns (SinkResponse);
rpc SinkFn(stream SinkRequest) returns (stream SinkResponse);

// IsReady is the heartbeat endpoint for gRPC.
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
Expand All @@ -17,12 +17,32 @@ service Sink {
* SinkRequest represents a request element.
*/
message SinkRequest {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
string id = 5;
map<string, string> headers = 6;
message Request {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
string id = 5;
map<string, string> headers = 6;
}
message Status {
bool eot = 1;
}
// Required field indicating the request.
Request request = 1;
// Required field indicating the status of the request.
// If eot is set to true, it indicates the end of transmission.
Status status = 2;
// optional field indicating the handshake message.
optional Handshake handshake = 3;
}

/*
* Handshake message between client and server to indicate the start of transmission.
*/
message Handshake {
// Required field indicating the start of transmission.
bool sot = 1;
}

/**
Expand All @@ -32,6 +52,15 @@ message ReadyResponse {
bool ready = 1;
}

/*
* Status is the status of the response.
*/
enum Status {
SUCCESS = 0;
FAILURE = 1;
FALLBACK = 2;
}

/**
* SinkResponse is the individual response of each message written to the sink.
*/
Expand All @@ -44,14 +73,6 @@ message SinkResponse {
// err_msg is the error message, set it if success is set to false.
string err_msg = 3;
}
repeated Result results = 1;
}

/*
* Status is the status of the response.
*/
enum Status {
SUCCESS = 0;
FAILURE = 1;
FALLBACK = 2;
Result result = 1;
optional Handshake handshake = 2;
}
Loading

0 comments on commit c08137a

Please sign in to comment.