Skip to content

Commit

Permalink
send eof response
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Dec 19, 2024
1 parent 00db37e commit 9ca9362
Showing 1 changed file with 28 additions and 3 deletions.
31 changes: 28 additions & 3 deletions numaflow/src/mapstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tracing::{error, info};

use crate::error::{Error, ErrorKind};
use crate::servers::map as proto;
use crate::servers::map::TransmissionStatus;
use crate::shared::{self, shutdown_signal, ContainerType};

const DEFAULT_CHANNEL_SIZE: usize = 1000;
Expand Down Expand Up @@ -356,8 +357,8 @@ async fn run_map_stream<T>(
}
});

// spawn a task to catch the panics from the map_stream task
tokio::spawn({
// Wait for the map_stream_task to complete and handle any errors
let panic_listener = tokio::spawn({
let error_tx = error_tx.clone();
async move {
if let Err(e) = map_stream_task.await {
Expand All @@ -367,9 +368,12 @@ async fn run_map_stream<T>(
))))
.await
.expect("Sending error on channel");
return Err(e);
}
Ok(())
}
});
})
.await;

while let Some(message) = rx.recv().await {
let send_response_result = stream_response_tx
Expand All @@ -391,6 +395,22 @@ async fn run_map_stream<T>(
return;
}
}

// we should not end eof message if the map stream panicked
if panic_listener.is_err() {
return;
}

// send eof message to indicate end of stream
stream_response_tx
.send(Ok(proto::MapResponse {
results: vec![],
id: message_id,
handshake: None,
status: Some(TransmissionStatus { eot: true }),
}))
.await
.expect("Sending eof message to gRPC response channel");
}

/// Manages the gRPC stream. If the request handler is finished, it cancels the token.
Expand Down Expand Up @@ -759,6 +779,11 @@ mod tests {
assert_eq!(msg.value, expected_value.as_bytes());
}

// read eof response
let eof_response = resp.message().await?;
assert!(eof_response.is_some());
assert!(eof_response.unwrap().status.is_some());

drop(tx);
shutdown_tx
.send(())
Expand Down

0 comments on commit 9ca9362

Please sign in to comment.