Skip to content

Commit

Permalink
fix error handling for transformer
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Feb 19, 2025
1 parent e5d6c83 commit 95d85c6
Show file tree
Hide file tree
Showing 13 changed files with 148 additions and 76 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ ui-test: ui-build
./hack/test-ui.sh

.PHONY: image
image: clean dist/$(BINARY_NAME)-linux-$(HOST_ARCH)
image: clean ui-build dist/$(BINARY_NAME)-linux-$(HOST_ARCH)
ifdef GITHUB_ACTIONS
# The binary will be built in a separate Github Actions job
cp -pv numaflow-rs-linux-amd64 dist/numaflow-rs-linux-amd64
Expand Down
16 changes: 5 additions & 11 deletions examples/1-simple-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,21 @@ kind: Pipeline
metadata:
name: simple-pipeline
spec:
watermark:
disabled: true
vertices:
- name: in
scale:
min: 1
# A self data generating source
source:
udsource:
container:
image: quay.io/numaio/numaflow-go/source-simple-source:stable
imagePullPolicy: Never
generator:
rpu: 5
duration: 1s
- name: cat
partitions: 2
scale:
min: 1
max: 1
udf:
container:
image: quay.io/numaio/numaflow-go/map-forward-message:stable
imagePullPolicy: Never
builtin:
name: cat # A built-in UDF which simply cats the message
- name: out
scale:
min: 1
Expand Down
1 change: 0 additions & 1 deletion pkg/isb/stores/jetstream/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ func (jr *jetStreamReader) NoAck(ctx context.Context, offsets []isb.Offset) {
if err := o.NoAck(); err != nil {
jr.log.Errorw("Failed to nak JetStream msg", zap.Error(err))
}
jr.log.Infow("nack message with offset", zap.String("offset", o.String()))
}(o)
}
wg.Wait()
Expand Down
1 change: 0 additions & 1 deletion pkg/udf/forward/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) error {
var readOffsets = make([]isb.Offset, len(readMessages))
for idx, m := range readMessages {
readOffsets[idx] = m.ReadOffset
isdf.opts.logger.Infow("Read message with offset", zap.String("offset", m.ReadOffset.String()))
totalBytes += len(m.Payload)
if m.Kind == isb.Data {
dataMessages = append(dataMessages, m)
Expand Down
4 changes: 2 additions & 2 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ verbose_file_reads = "warn"
# This profile optimizes for runtime performance and small binary size at the expense of longer build times.
# Compared to default release profile, this profile reduced binary size from 29MB to 21MB
# and increased build time (with only one line change in code) from 12 seconds to 133 seconds (tested on Mac M2 Max).
#[profile.release]
#lto = "fat"
[profile.release]
lto = "fat"

# This profile optimizes for short build times at the expense of larger binary size and slower runtime performance.
# If you have to rebuild image often, in Dockerfile you may replace `--release` passed to cargo command with `--profile quick-release`
Expand Down
8 changes: 1 addition & 7 deletions rust/numaflow-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub enum Error {
Connection(String),

#[error("gRPC Error - {0}")]
Grpc(String),
Grpc(tonic::Status),

#[error("Config Error - {0}")]
Config(String),
Expand Down Expand Up @@ -59,9 +59,3 @@ pub enum Error {
#[error("Watermark Error - {0}")]
Watermark(String),
}

impl From<tonic::Status> for Error {
fn from(status: tonic::Status) -> Self {
Error::Grpc(status.to_string())
}
}
13 changes: 9 additions & 4 deletions rust/numaflow-core/src/mapper/map/user_defined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,17 @@ async fn create_response_stream(

let mut resp_stream = client
.map_fn(Request::new(ReceiverStream::new(read_rx)))
.await?
.await
.map_err(|e| Error::Grpc(e))?
.into_inner();

let handshake_response = resp_stream.message().await?.ok_or(Error::Mapper(
"failed to receive handshake response".to_string(),
))?;
let handshake_response = resp_stream
.message()
.await
.map_err(|e| Error::Grpc(e))?
.ok_or(Error::Mapper(
"failed to receive handshake response".to_string(),
))?;

if handshake_response.handshake.map_or(true, |h| !h.sot) {
return Err(Error::Mapper("invalid handshake response".to_string()));
Expand Down
16 changes: 11 additions & 5 deletions rust/numaflow-core/src/sink/user_defined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,19 @@ impl UserDefinedSink {

let mut resp_stream = client
.sink_fn(Request::new(sink_stream))
.await?
.await
.map_err(|e| Error::Grpc(e))?
.into_inner();

// First response from the server will be the handshake response. We need to check if the
// server has accepted the handshake.
let handshake_response = resp_stream.message().await?.ok_or(Error::Sink(
"failed to receive handshake response".to_string(),
))?;
let handshake_response = resp_stream
.message()
.await
.map_err(|e| Error::Grpc(e))?
.ok_or(Error::Sink(
"failed to receive handshake response".to_string(),
))?;

// Handshake cannot be None during the initial phase, and it has to set `sot` to true.
if handshake_response.handshake.map_or(true, |h| !h.sot) {
Expand Down Expand Up @@ -112,7 +117,8 @@ impl Sink for UserDefinedSink {
let response = self
.resp_stream
.message()
.await?
.await
.map_err(|e| Error::Grpc(e))?
.ok_or(Error::Sink("failed to receive response".to_string()))?;

if response.status.is_some_and(|s| s.eot) {
Expand Down
52 changes: 42 additions & 10 deletions rust/numaflow-core/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
//! [Watermark]: https://numaflow.numaproj.io/core-concepts/watermarks/
use std::sync::Arc;
use tracing::warn;

use numaflow_pulsar::source::PulsarSource;
use numaflow_sqs::source::SQSSource;
Expand Down Expand Up @@ -322,7 +323,7 @@ impl Source {
// this semaphore is used only if read-ahead is disabled. we hold this semaphore to
// make sure we can read only if the current inflight ones are ack'ed.
let semaphore = Arc::new(Semaphore::new(1));

let mut result = Ok(());
loop {
if cln_token.is_cancelled() {
info!("Cancellation token is cancelled. Stopping the source.");
Expand All @@ -343,7 +344,8 @@ impl Source {
Ok(messages) => messages,
Err(e) => {
error!("Error while reading messages: {:?}", e);
return Err(e);
result = Err(e);
break;
}
};

Expand All @@ -356,12 +358,15 @@ impl Source {
if let Some(watermark_handle) = self.watermark_handle.as_mut() {
watermark_handle
.publish_source_idle_watermark(
Self::partitions(self.sender.clone()).await?,
Self::partitions(self.sender.clone())
.await
.unwrap_or_default(),
)
.await;
}
}

let mut offsets = vec![];
let mut ack_batch = Vec::with_capacity(msgs_len);
for message in messages.iter() {
let (resp_ack_tx, resp_ack_rx) = oneshot::channel();
Expand All @@ -371,7 +376,8 @@ impl Source {
self.tracker_handle.insert(message, resp_ack_tx).await?;

// store the ack one shot in the batch to invoke ack later.
ack_batch.push((offset, resp_ack_rx));
ack_batch.push((offset.clone(), resp_ack_rx));
offsets.push(offset);
}

// start a background task to invoke ack on the source for the offsets that are acked.
Expand All @@ -392,7 +398,26 @@ impl Source {
// be streaming because transformation should be a fast operation.
let messages = match self.transformer.as_mut() {
None => messages,
Some(transformer) => transformer.transform_batch(messages).await?,
Some(transformer) => match transformer
.transform_batch(messages, cln_token.clone())
.await
{
Ok(messages) => messages,
Err(e) => {
error!(
?e,
"Error while transforming messages, sending nack to the batch"
);
for offset in offsets {
self.tracker_handle
.discard(offset)
.await
.expect("tracker operations should never fail");
}
result = Err(e);
break;
}
},
};

if let Some(watermark_handle) = self.watermark_handle.as_mut() {
Expand All @@ -403,12 +428,19 @@ impl Source {

// write the messages to downstream.
for message in messages {
messages_tx.send(message).await.map_err(|e| {
Error::Source(format!("failed to send message to downstream {:?}", e))
})?;
messages_tx
.send(message)
.await
.expect("send should not fail");
}
}
Ok(())
info!(status=?result, "Source stopped, waiting for inflight messages to be acked");
let _permit = Arc::clone(&semaphore)
.acquire_owned()
.await
.expect("acquiring permit should not fail");
info!("All inflight messages are acked. Source stopped.");
result
});
Ok((ReceiverStream::new(messages_rx), handle))
}
Expand All @@ -429,7 +461,7 @@ impl Source {
offsets_to_ack.push(offset);
}
Ok(ReadAck::Nak) => {
error!(?offset, "Nak received for offset");
warn!(?offset, "Nak received for offset");
}
Err(e) => {
error!(?offset, err=?e, "Error receiving ack for offset");
Expand Down
44 changes: 32 additions & 12 deletions rust/numaflow-core/src/source/user_defined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,19 @@ impl UserDefinedSourceRead {

let mut resp_stream = client
.read_fn(Request::new(read_stream))
.await?
.await
.map_err(|e| Error::Grpc(e))?
.into_inner();

// first response from the server will be the handshake response. We need to check if the
// server has accepted the handshake.
let handshake_response = resp_stream.message().await?.ok_or(Error::Source(
"failed to receive handshake response".to_string(),
))?;
let handshake_response = resp_stream
.message()
.await
.map_err(|e| Error::Grpc(e))?
.ok_or(Error::Source(
"failed to receive handshake response".to_string(),
))?;
// handshake cannot be None during the initial phase, and it has to set `sot` to true.
if handshake_response.handshake.map_or(true, |h| !h.sot) {
return Err(Error::Source("invalid handshake response".to_string()));
Expand Down Expand Up @@ -178,7 +183,12 @@ impl SourceReader for UserDefinedSourceRead {

let mut messages = Vec::with_capacity(self.num_records);

while let Some(response) = self.resp_stream.message().await? {
while let Some(response) = self
.resp_stream
.message()
.await
.map_err(|e| Error::Grpc(e))?
{
if response.status.is_some_and(|status| status.eot) {
break;
}
Expand Down Expand Up @@ -234,14 +244,22 @@ impl UserDefinedSourceAck {
.await
.map_err(|e| Error::Source(format!("failed to send ack handshake request: {}", e)))?;

let mut ack_resp_stream = client.ack_fn(Request::new(ack_stream)).await?.into_inner();
let mut ack_resp_stream = client
.ack_fn(Request::new(ack_stream))
.await
.map_err(|e| Error::Grpc(e))?
.into_inner();

// first response from the server will be the handshake response. We need to check if the
// server has accepted the handshake.
let ack_handshake_response = ack_resp_stream.message().await?.ok_or(Error::Source(
"failed to receive ack handshake response".to_string(),
))?;
// handshake cannot to None during the initial phase and it has to set `sot` to true.
let ack_handshake_response = ack_resp_stream
.message()
.await
.map_err(|e| Error::Grpc(e))?
.ok_or(Error::Source(
"failed to receive ack handshake response".to_string(),
))?;
// handshake cannot be None during the initial phase, and it has to set `sot` to true.
if ack_handshake_response.handshake.map_or(true, |h| !h.sot) {
return Err(Error::Source("invalid ack handshake response".to_string()));
}
Expand All @@ -268,7 +286,8 @@ impl SourceAcker for UserDefinedSourceAck {
let _ = self
.ack_resp_stream
.message()
.await?
.await
.map_err(|e| Error::Grpc(e))?
.ok_or(Error::Source("failed to receive ack response".to_string()))?;

Ok(())
Expand All @@ -291,7 +310,8 @@ impl LagReader for UserDefinedSourceLagReader {
Ok(self
.source_client
.pending_fn(Request::new(()))
.await?
.await
.map_err(|e| Error::Grpc(e))?
.into_inner()
.result
.map(|r| r.count as usize))
Expand Down
2 changes: 1 addition & 1 deletion rust/numaflow-core/src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use chrono::{DateTime, Utc};
use serving::callback::CallbackHandler;
use serving::{DEFAULT_CALLBACK_URL_HEADER_KEY, DEFAULT_ID_HEADER};
use tokio::sync::{mpsc, oneshot};
use tracing::{error, info};
use tracing::error;

/// TrackerEntry represents the state of a tracked message.
#[derive(Debug)]
Expand Down
Loading

0 comments on commit 95d85c6

Please sign in to comment.