Skip to content

Commit

Permalink
feat: streaming source
Browse files Browse the repository at this point in the history
Signed-off-by: Vigith Maurice <[email protected]>
  • Loading branch information
vigith committed Sep 20, 2024
1 parent 86bec02 commit 58bf810
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 7 deletions.
2 changes: 1 addition & 1 deletion examples/simple-source/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub(crate) mod simple_source {
let mut message_offsets = Vec::with_capacity(request.count);
for i in 0..request.count {
let offset = format!("{}-{}", event_time.timestamp_nanos_opt().unwrap(), i);
let payload = self.counter.fetch_add(1, Ordering::SeqCst).to_string();
let payload = self.counter.fetch_add(1, Ordering::Relaxed).to_string();
transmitter
.send(Message {
value: payload.into_bytes(),
Expand Down
9 changes: 3 additions & 6 deletions src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,7 @@ where
grpc_tx
.send(Ok(ReadResponse {
result: None,
status: Some(proto::read_response::Status {
eot: false,
code: 0,
error: None,
msg: None,
}),
status: None,
handshake: Some(handshake),
}))
.await
Expand Down Expand Up @@ -329,6 +324,8 @@ where
})
.await;

// the return of handler_fn implicitly means that the ack is successful; hence
// we are able to send success. There is no path for failure.
ack_resp_tx
.send(Ok(AckResponse {
result: Some(proto::ack_response::Result { success: Some(()) }),
Expand Down

0 comments on commit 58bf810

Please sign in to comment.