Skip to content

Commit

Permalink
Handle stream errors and shutdown gRPC server
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <[email protected]>
  • Loading branch information
BulkBeing committed Sep 25, 2024
1 parent d98dd6d commit 8d52eb8
Showing 1 changed file with 66 additions and 50 deletions.
116 changes: 66 additions & 50 deletions src/sourcetransform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::CancellationToken;
use tonic::{async_trait, Request, Response, Status, Streaming};
use tracing::info;
use tracing::{error, info};

const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/sourcetransform.sock";
Expand Down Expand Up @@ -237,7 +237,6 @@ where
let mut stream = request.into_inner();
let handler = Arc::clone(&self.handler);

// Channel backing the gRPC response stream: results are sent on tx, and rx is returned to the client as the response stream.
let (tx, rx) =
mpsc::channel::<Result<SourceTransformResponse, Status>>(DEFAULT_CHANNEL_SIZE);

Expand All @@ -260,60 +259,77 @@ where
return Err(Status::invalid_argument("Handshake not present"));
}

let shutdown_tx = self.shutdown_tx.clone();
let cancellation_token = self.cancellation_token.clone();

let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
loop {
tokio::select! {
transform_request = stream.message() => {
let transform_request = transform_request.map_err(|e| SourceTransformerError(ErrorKind::InternalError(e.to_string())))?
.ok_or_else(||SourceTransformerError(ErrorKind::InternalError("Stream closed".to_string())))?;

let Some(request) = transform_request.request else {
return Err(SourceTransformerError(ErrorKind::InternalError("Transform request can not be none".to_string())));
};

let message_id = request.id.clone();
let handler_input = SourceTransformRequest{
keys: request.keys,
value: request.value,
watermark: utc_from_timestamp(request.watermark),
eventtime: utc_from_timestamp(request.event_time),
headers: request.headers
};

let handler = handler.clone();
// let messages = handler.transform(handler_input).await;
let udf_tranform_task = tokio::spawn(async move { handler.transform(handler_input).await });
let messages = tokio::select! {
result = udf_tranform_task => {
match result {
Ok(messages) => messages,
Err(e) => {
tracing::error!("Failed to run transform function: {e:?}");
// Send a shutdown signal to the server to do a graceful shutdown because there was
// a panic in the handler.
shutdown_tx.send(()).await.expect("Sending shutdown signal to gRPC server");
return Err(SourceTransformerError(ErrorKind::UserDefinedError("panic in transform UDF".to_string())));
let handle: JoinHandle<Result<(), Error>> = tokio::spawn({
let shutdown_tx = self.shutdown_tx.clone();
let cancellation_token = self.cancellation_token.clone();
let tx = tx.clone();
async move {
loop {
tokio::select! {
transform_request = stream.message() => {
let transform_request = transform_request.map_err(|e| SourceTransformerError(ErrorKind::InternalError(e.to_string())))?
.ok_or_else(||SourceTransformerError(ErrorKind::InternalError("Stream closed".to_string())))?;

let Some(request) = transform_request.request else {
return Err(SourceTransformerError(ErrorKind::InternalError("Transform request can not be none".to_string())));
};

let message_id = request.id.clone();
let handler_input = SourceTransformRequest{
keys: request.keys,
value: request.value,
watermark: utc_from_timestamp(request.watermark),
eventtime: utc_from_timestamp(request.event_time),
headers: request.headers
};

let handler = handler.clone();
// let messages = handler.transform(handler_input).await;
let udf_tranform_task = tokio::spawn(async move { handler.transform(handler_input).await });
let messages = tokio::select! {
result = udf_tranform_task => {
match result {
Ok(messages) => messages,
Err(e) => {
tracing::error!("Failed to run transform function: {e:?}");
// Send a shutdown signal to the server to do a graceful shutdown because there was
// a panic in the handler.
shutdown_tx.send(()).await.expect("Sending shutdown signal to gRPC server");
return Err(SourceTransformerError(ErrorKind::UserDefinedError("panic in transform UDF".to_string())));
}
}
}
}
};
tx.send(Ok(SourceTransformResponse{
results: messages.into_iter().map(|msg| msg.into()).collect(),
id: message_id,
handshake: None,
})).await.expect("sending messages to the client over gRPC channel");
};
tx.send(Ok(SourceTransformResponse{
results: messages.into_iter().map(|msg| msg.into()).collect(),
id: message_id,
handshake: None,
})).await.expect("sending messages to the client over gRPC channel");

}
_ = cancellation_token.cancelled() => {
info!("Cancellation token is cancelled, shutting down");
break;
}
_ = cancellation_token.cancelled() => {
info!("Cancellation token is cancelled, shutting down");
break;
}
}
}
Ok(())
}
Ok(())
});

let shutdown_tx = self.shutdown_tx.clone();
tokio::spawn(async move {
let Err(e) = handle.await else {
return;
};
error!("Shutting down gRPC channel: {e:?}");
tx.send(Err(Status::internal(e.to_string())))
.await
.expect("Sending error message to gRPC response channel");
shutdown_tx
.send(())
.await
.expect("Writing to shutdown channel");
});

Ok(Response::new(ReceiverStream::new(rx)))
Expand Down

0 comments on commit 8d52eb8

Please sign in to comment.