Skip to content

Commit

Permalink
update proto
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Sep 20, 2024
1 parent baecc88 commit 86bec02
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
11 changes: 7 additions & 4 deletions proto/source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ package source.v1;

service Source {
// Read returns a stream of datum responses.
// The size of the returned ReadResponse is less than or equal to the num_records specified in each ReadRequest.
// If the request timeout is reached on the server side, the returned ReadResponse will contain all the datum that have been read (which could be an empty list).
// The size of the returned responses is less than or equal to the num_records specified in each ReadRequest.
// If the request timeout is reached on the server side, the returned responses will contain all the datum that have been read (which could be an empty list).
// The server will continue to read and respond to subsequent ReadRequests until the client closes the stream.
// Once it has sent all the datum, the server will send a ReadResponse with the end of transmission flag set to true.
rpc ReadFn(stream ReadRequest) returns (stream ReadResponse);

// AckFn acknowledges a stream of datum offsets.
// When AckFn is called, it implicitly indicates that the datum stream has been processed by the source vertex.
// The caller (numa) expects the AckFn to be successful, and it does not expect any errors.
// If there are some irrecoverable errors when the callee (UDSource) is processing the AckFn request,
// then it is best to crash because there are no other retry mechanisms possible.
// Clients sends n requests and expects n responses.
rpc AckFn(stream AckRequest) returns (stream AckResponse);

// PendingFn returns the number of pending records at the user defined source.
Expand Down Expand Up @@ -97,8 +99,8 @@ message ReadResponse {
// Required field holding the result.
Result result = 1;
// Status of the response. Holds the end of transmission flag and the status code.
//
Status status = 2;
// Handshake message between client and server to indicate the start of transmission.
optional Handshake handshake = 3;
}

Expand Down Expand Up @@ -133,6 +135,7 @@ message AckResponse {
}
// Required field holding the result.
Result result = 1;
// Handshake message between client and server to indicate the start of transmission.
optional Handshake handshake = 2;
}

Expand Down Expand Up @@ -182,4 +185,4 @@ message Offset {
// It is useful for sources that have multiple partitions. e.g. Kafka.
// If the partition_id is not specified, it is assumed that the source has a single partition.
int32 partition_id = 2;
}
}
2 changes: 1 addition & 1 deletion src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ where

let cln_token = self.cancellation_token.clone();

// Handle the handshake first
// do the handshake first to let the client know that we are ready to receive read requests.
let handshake_request = sr
.message()
.await
Expand Down

0 comments on commit 86bec02

Please sign in to comment.