From 9ca9362ad511084501520e5a37d40cdcd0cdc9d9 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Thu, 19 Dec 2024 20:23:44 +0530 Subject: [PATCH] send eof response Signed-off-by: Yashash H L --- numaflow/src/mapstream.rs | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/numaflow/src/mapstream.rs b/numaflow/src/mapstream.rs index 5d11eca..6619f83 100644 --- a/numaflow/src/mapstream.rs +++ b/numaflow/src/mapstream.rs @@ -14,6 +14,7 @@ use tracing::{error, info}; use crate::error::{Error, ErrorKind}; use crate::servers::map as proto; +use crate::servers::map::TransmissionStatus; use crate::shared::{self, shutdown_signal, ContainerType}; const DEFAULT_CHANNEL_SIZE: usize = 1000; @@ -356,8 +357,8 @@ async fn run_map_stream( } }); - // spawn a task to catch the panics from the map_stream task - tokio::spawn({ + // Wait for the map_stream_task to complete and handle any errors + let panic_listener = tokio::spawn({ let error_tx = error_tx.clone(); async move { if let Err(e) = map_stream_task.await { @@ -367,9 +368,12 @@ async fn run_map_stream( )))) .await .expect("Sending error on channel"); + return Err(e); } + Ok(()) } - }); + }) + .await; while let Some(message) = rx.recv().await { let send_response_result = stream_response_tx @@ -391,6 +395,22 @@ async fn run_map_stream( return; } } + + // we should not end eof message if the map stream panicked + if panic_listener.is_err() { + return; + } + + // send eof message to indicate end of stream + stream_response_tx + .send(Ok(proto::MapResponse { + results: vec![], + id: message_id, + handshake: None, + status: Some(TransmissionStatus { eot: true }), + })) + .await + .expect("Sending eof message to gRPC response channel"); } /// Manages the gRPC stream. If the request handler is finished, it cancels the token. @@ -759,6 +779,11 @@ mod tests { assert_eq!(msg.value, expected_value.as_bytes()); } + // read eof response + let eof_response = resp.message().await?; + assert!(eof_response.is_some()); + assert!(eof_response.unwrap().status.is_some()); + drop(tx); shutdown_tx .send(())