diff --git a/examples/simple-source/Makefile b/examples/simple-source/Makefile index b7769e8..80ba511 100644 --- a/examples/simple-source/Makefile +++ b/examples/simple-source/Makefile @@ -10,10 +10,9 @@ update: .PHONY: image image: update - cd ../../ && docker build \ + cd ../../ && docker buildx build \ -f ${DOCKER_FILE_PATH} \ - -t ${IMAGE_REGISTRY} . - @if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi + -t ${IMAGE_REGISTRY} . --platform linux/amd64,linux/arm64 --push .PHONY: clean clean: diff --git a/examples/simple-source/src/main.rs b/examples/simple-source/src/main.rs index 9127211..9e828e0 100644 --- a/examples/simple-source/src/main.rs +++ b/examples/simple-source/src/main.rs @@ -2,14 +2,13 @@ #[tokio::main] async fn main() -> Result<(), Box> { - let source_handle = simple_source::SimpleSource::new("Hello World!".to_string()); + let source_handle = simple_source::SimpleSource::new(); numaflow::source::Server::new(source_handle).start().await } pub(crate) mod simple_source { - use std::{collections::HashSet, sync::RwLock}; - use chrono::Utc; + use std::{collections::HashSet, sync::RwLock}; use tokio::sync::mpsc::Sender; use numaflow::source::{Message, Offset, SourceReadRequest, Sourcer}; @@ -18,14 +17,12 @@ pub(crate) mod simple_source { /// or Atomics to provide concurrent access. Numaflow actually does not require concurrent access but we are forced to do this because the SDK /// does not provide a mutable reference as explained in [`Sourcer`] pub(crate) struct SimpleSource { - payload: String, yet_to_ack: RwLock>, } impl SimpleSource { - pub(crate) fn new(payload: String) -> Self { + pub(crate) fn new() -> Self { Self { - payload, yet_to_ack: RwLock::new(HashSet::new()), } } @@ -44,7 +41,7 @@ pub(crate) mod simple_source { let offset = format!("{}-{}", event_time.timestamp_nanos_opt().unwrap(), i); transmitter .send(Message { - value: format!("{}-{}", self.payload, event_time).into_bytes(), + value: format!("{}", i).into_bytes(), event_time, offset: Offset { offset: offset.clone().into_bytes(), diff --git a/proto/source.proto b/proto/source.proto index dcaf253..4c31e15 100644 --- a/proto/source.proto +++ b/proto/source.proto @@ -29,6 +29,14 @@ service Source { rpc IsReady(google.protobuf.Empty) returns (ReadyResponse); } +/* + * Handshake message between client and server to indicate the start of transmission. + */ +message Handshake { + // Required field indicating the start of transmission. + bool sot = 1; +} + /* * ReadRequest is the request for reading datum stream from user defined source. */ @@ -43,6 +51,7 @@ message ReadRequest { } // Required field indicating the request. Request request = 1; + optional Handshake handshake = 2; } /* @@ -82,7 +91,7 @@ message ReadResponse { // End of transmission flag. bool eot = 1; Code code = 2; - Error error = 3; + optional Error error = 3; optional string msg = 4; } // Required field holding the result. @@ -90,6 +99,7 @@ message ReadResponse { // Status of the response. Holds the end of transmission flag and the status code. // Status status = 2; + optional Handshake handshake = 3; } /* diff --git a/src/source.rs b/src/source.rs index 94cc841..f472eeb 100644 --- a/src/source.rs +++ b/src/source.rs @@ -107,6 +107,7 @@ where headers: Default::default(), }), status: None, + handshake: None, })) .await .map_err(|e| SourceError(ErrorKind::InternalError(e.to_string())))?; @@ -119,9 +120,10 @@ where status: Some(proto::read_response::Status { eot: true, code: 0, - error: 0, + error: None, msg: None, }), + handshake: None, })) .await .map_err(|e| SourceError(ErrorKind::InternalError(e.to_string())))?; @@ -140,7 +142,7 @@ where let (stx, srx) = mpsc::channel::(DEFAULT_CHANNEL_SIZE); // spawn the rx side so that when the handler is invoked, we can stream the handler's read data - // to the gprc response stream. + // to the grpc response stream. let grpc_writer_handle: JoinHandle> = tokio::spawn(async move { Self::write_a_batch(grpc_resp_tx, srx).await }); @@ -189,6 +191,33 @@ where let cln_token = self.cancellation_token.clone(); + // Handle the handshake first + let handshake_request = sr + .message() + .await + .map_err(|e| Status::internal(format!("handshake failed {}", e)))? + .ok_or_else(|| Status::internal("stream closed before handshake"))?; + + if let Some(handshake) = handshake_request.handshake { + grpc_tx + .send(Ok(ReadResponse { + result: None, + status: Some(proto::read_response::Status { + eot: false, + code: 0, + error: None, + msg: None, + }), + handshake: Some(handshake), + })) + .await + .map_err(|e| { + Status::internal(format!("failed to send handshake response {}", e)) + })?; + } else { + return Err(Status::invalid_argument("Handshake not present")); + } + // this is the top-level stream consumer and this task will only exit when stream is closed (which // will happen when server and client are shutting down). let grpc_read_handle: JoinHandle> = tokio::spawn(async move { @@ -528,11 +557,8 @@ mod tests { tokio::time::sleep(Duration::from_millis(50)).await; - // https://github.com/hyperium/tonic/blob/master/examples/src/uds/client.rs - // https://github.com/hyperium/tonic/blob/master/examples/src/uds/client.rs let channel = tonic::transport::Endpoint::try_from("http://[::]:50051")? .connect_with_connector(service_fn(move |_: Uri| { - // https://rust-lang.github.io/async-book/03_async_await/01_chapter.html#async-lifetimes let sock_file = sock_file.clone(); async move { Ok::<_, std::io::Error>(hyper_util::rt::TokioIo::new( @@ -546,11 +572,18 @@ mod tests { // Test read_fn with bidirectional streaming let (read_tx, read_rx) = mpsc::channel(4); + let handshake_request = proto::ReadRequest { + request: None, + handshake: Some(proto::Handshake { sot: true }), + }; + read_tx.send(handshake_request).await.unwrap(); + let read_request = proto::ReadRequest { request: Some(proto::read_request::Request { num_records: 5, timeout_in_ms: 1000, }), + handshake: None, }; read_tx.send(read_request).await.unwrap(); drop(read_tx); // Close the sender to indicate no more requests