From 58bf81071e728370b97553d50c85bfadc35eaaa6 Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Fri, 20 Sep 2024 08:36:54 -0700 Subject: [PATCH] feat: streaming source Signed-off-by: Vigith Maurice --- examples/simple-source/src/main.rs | 2 +- src/source.rs | 9 +++------ 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/examples/simple-source/src/main.rs b/examples/simple-source/src/main.rs index f90694e..8e78970 100644 --- a/examples/simple-source/src/main.rs +++ b/examples/simple-source/src/main.rs @@ -42,7 +42,7 @@ pub(crate) mod simple_source { let mut message_offsets = Vec::with_capacity(request.count); for i in 0..request.count { let offset = format!("{}-{}", event_time.timestamp_nanos_opt().unwrap(), i); - let payload = self.counter.fetch_add(1, Ordering::SeqCst).to_string(); + let payload = self.counter.fetch_add(1, Ordering::Relaxed).to_string(); transmitter .send(Message { value: payload.into_bytes(), diff --git a/src/source.rs b/src/source.rs index 977ec15..a36ca96 100644 --- a/src/source.rs +++ b/src/source.rs @@ -201,12 +201,7 @@ where grpc_tx .send(Ok(ReadResponse { result: None, - status: Some(proto::read_response::Status { - eot: false, - code: 0, - error: None, - msg: None, - }), + status: None, handshake: Some(handshake), })) .await @@ -329,6 +324,8 @@ where }) .await; + // the return of handler_fn implicitly means that the ack is successful; hence + // we are able to send success. There is no path for failure. ack_resp_tx .send(Ok(AckResponse { result: Some(proto::ack_response::Result { success: Some(()) }),