diff --git a/proto/source.proto b/proto/source.proto index e32b63c..8878ac6 100644 --- a/proto/source.proto +++ b/proto/source.proto @@ -7,9 +7,10 @@ package source.v1; service Source { // Read returns a stream of datum responses. - // The size of the returned ReadResponse is less than or equal to the num_records specified in each ReadRequest. - // If the request timeout is reached on the server side, the returned ReadResponse will contain all the datum that have been read (which could be an empty list). + // The size of the returned responses is less than or equal to the num_records specified in each ReadRequest. + // If the request timeout is reached on the server side, the returned responses will contain all the datum that have been read (which could be an empty list). // The server will continue to read and respond to subsequent ReadRequests until the client closes the stream. + // Once it has sent all the datum, the server will send a ReadResponse with the end of transmission flag set to true. rpc ReadFn(stream ReadRequest) returns (stream ReadResponse); // AckFn acknowledges a stream of datum offsets. @@ -17,6 +18,7 @@ service Source { // The caller (numa) expects the AckFn to be successful, and it does not expect any errors. // If there are some irrecoverable errors when the callee (UDSource) is processing the AckFn request, // then it is best to crash because there are no other retry mechanisms possible. + // Clients sends n requests and expects n responses. rpc AckFn(stream AckRequest) returns (stream AckResponse); // PendingFn returns the number of pending records at the user defined source. @@ -97,8 +99,8 @@ message ReadResponse { // Required field holding the result. Result result = 1; // Status of the response. Holds the end of transmission flag and the status code. - // Status status = 2; + // Handshake message between client and server to indicate the start of transmission. optional Handshake handshake = 3; } @@ -133,6 +135,7 @@ message AckResponse { } // Required field holding the result. Result result = 1; + // Handshake message between client and server to indicate the start of transmission. optional Handshake handshake = 2; } @@ -182,4 +185,4 @@ message Offset { // It is useful for sources that have multiple partitions. e.g. Kafka. // If the partition_id is not specified, it is assumed that the source has a single partition. int32 partition_id = 2; -} +} \ No newline at end of file diff --git a/src/source.rs b/src/source.rs index 2187275..977ec15 100644 --- a/src/source.rs +++ b/src/source.rs @@ -190,7 +190,7 @@ where let cln_token = self.cancellation_token.clone(); - // Handle the handshake first + // do the handshake first to let the client know that we are ready to receive read requests. let handshake_request = sr .message() .await