Skip to content

Commit

Permalink
chore: avoid panics because of watermark code (#2393)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Feb 12, 2025
1 parent 8d51c66 commit c888818
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 92 deletions.
3 changes: 3 additions & 0 deletions rust/numaflow-core/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub(crate) async fn start_forwarder(
js_context.clone(),
&config.to_vertex_config,
source_config,
cln_token.clone(),
)
.await?,
)
Expand Down Expand Up @@ -85,6 +86,7 @@ pub(crate) async fn start_forwarder(
js_context.clone(),
edge_config,
&config.to_vertex_config,
cln_token.clone(),
)
.await?,
)
Expand Down Expand Up @@ -119,6 +121,7 @@ pub(crate) async fn start_forwarder(
js_context.clone(),
edge_config,
&config.to_vertex_config,
cln_token.clone(),
)
.await?,
)
Expand Down
2 changes: 1 addition & 1 deletion rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ impl JetStreamReader {
}

if let Some(watermark_handle) = watermark_handle.as_ref() {
let watermark = watermark_handle.fetch_watermark(message.offset.clone()).await?;
let watermark = watermark_handle.fetch_watermark(message.offset.clone()).await;
message.watermark = Some(watermark);
}

Expand Down
6 changes: 1 addition & 5 deletions rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,11 +469,7 @@ impl JetstreamWriter {
) {
match watermark_handle {
WatermarkHandle::ISB(handle) => {
handle
.publish_watermark(stream, offset)
.await
.map_err(|e| Error::ISB(format!("Failed to update watermark: {:?}", e)))
.expect("Failed to publish watermark");
handle.publish_watermark(stream, offset).await;
}
WatermarkHandle::Source(handle) => {
let input_partition = match &message.offset {
Expand Down
2 changes: 1 addition & 1 deletion rust/numaflow-core/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ impl Source {
if let Some(watermark_handle) = watermark_handle.as_mut() {
watermark_handle
.generate_and_publish_source_watermark(&messages)
.await?;
.await;
}

// write the messages to downstream.
Expand Down
15 changes: 3 additions & 12 deletions rust/numaflow-core/src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,7 @@ impl Tracker {
);

if let Some(watermark_handle) = &self.watermark_handle {
watermark_handle
.insert_offset(offset, watermark)
.await
.expect("Failed to insert offset");
watermark_handle.insert_offset(offset, watermark).await;
}
}

Expand Down Expand Up @@ -282,10 +279,7 @@ impl Tracker {
.send(ReadAck::Nak)
.expect("Failed to send nak");
if let Some(watermark_handle) = &self.watermark_handle {
watermark_handle
.remove_offset(offset)
.await
.expect("Failed to remove offset");
watermark_handle.remove_offset(offset).await;
}
}

Expand Down Expand Up @@ -314,10 +308,7 @@ impl Tracker {
ack_send.send(ReadAck::Ack).expect("Failed to send ack");

if let Some(watermark_handle) = &self.watermark_handle {
watermark_handle
.remove_offset(offset)
.await
.expect("Failed to remove offset");
watermark_handle.remove_offset(offset).await;
}

let Some(ref callback_handler) = self.serving_callback_handler else {
Expand Down
120 changes: 65 additions & 55 deletions rust/numaflow-core/src/watermark/isb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::collections::{BTreeSet, HashMap};
use std::time::Duration;

use tokio::sync::mpsc::Receiver;
use tokio_util::sync::CancellationToken;
use tracing::error;

use crate::config::pipeline::isb::Stream;
Expand Down Expand Up @@ -239,6 +240,7 @@ impl ISBWatermarkHandle {
js_context: async_nats::jetstream::Context,
config: &EdgeWatermarkConfig,
to_vertex_configs: &[ToVertexConfig],
cln_token: CancellationToken,
) -> Result<Self> {
let (sender, receiver) = tokio::sync::mpsc::channel(100);

Expand Down Expand Up @@ -272,70 +274,84 @@ impl ISBWatermarkHandle {
tokio::spawn({
let isb_watermark_handle = isb_watermark_handle.clone();
let mut interval_ticker = tokio::time::interval(idle_timeout);
let cln_token = cln_token.clone();
async move {
loop {
// TODO(idle): add cancellation token.
interval_ticker.tick().await;
isb_watermark_handle
.publish_idle_watermark()
.await
.expect("failed to publish idle watermark");
tokio::select! {
_ = interval_ticker.tick() => {
isb_watermark_handle.publish_idle_watermark().await;
}
_ = cln_token.cancelled() => {
break;
}
}
}
}
});

Ok(isb_watermark_handle)
}

/// Fetches the watermark for the given offset.
pub(crate) async fn fetch_watermark(&self, offset: Offset) -> Result<Watermark> {
/// Fetches the watermark for the given offset. If the watermark cannot be computed, a
/// watermark with timestamp -1 is returned instead of an error.
pub(crate) async fn fetch_watermark(&self, offset: Offset) -> Watermark {
if let Offset::Int(offset) = offset {
let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
self.sender
if let Err(e) = self
.sender
.send(ISBWaterMarkActorMessage::FetchWatermark { offset, oneshot_tx })
.await
.map_err(|_| Error::Watermark("failed to send message".to_string()))?;
{
error!(?e, "Failed to send message");
return Watermark::from_timestamp_millis(-1).expect("failed to parse time");
}

oneshot_rx
.await
.map_err(|_| Error::Watermark("failed to receive response".to_string()))?
match oneshot_rx.await {
Ok(watermark) => watermark.unwrap_or_else(|e| {
error!(?e, "Failed to fetch watermark");
Watermark::from_timestamp_millis(-1).expect("failed to parse time")
}),
Err(e) => {
error!(?e, "Failed to receive response");
Watermark::from_timestamp_millis(-1).expect("failed to parse time")
}
}
} else {
Err(Error::Watermark("invalid offset type".to_string()))
error!(?offset, "Invalid offset type, cannot compute watermark");
Watermark::from_timestamp_millis(-1).expect("failed to parse time")
}
}

/// publish_watermark publishes the watermark for the given stream and offset.
pub(crate) async fn publish_watermark(&self, stream: Stream, offset: Offset) -> Result<()> {
pub(crate) async fn publish_watermark(&self, stream: Stream, offset: Offset) {
if let Offset::Int(offset) = offset {
self.sender
.send(ISBWaterMarkActorMessage::PublishWatermark { offset, stream })
.await
.map_err(|_| Error::Watermark("failed to send message".to_string()))?;
Ok(())
.unwrap_or_else(|e| {
error!("Failed to send message: {:?}", e);
});
} else {
Err(Error::Watermark("invalid offset type".to_string()))
error!(?offset, "Invalid offset type, cannot publish watermark");
}
}

/// remove_offset removes the offset from the tracked offsets.
pub(crate) async fn remove_offset(&self, offset: Offset) -> Result<()> {
pub(crate) async fn remove_offset(&self, offset: Offset) {
if let Offset::Int(offset) = offset {
self.sender
.send(ISBWaterMarkActorMessage::RemoveOffset(offset))
.await
.map_err(|_| Error::Watermark("failed to send message".to_string()))?;
Ok(())
.unwrap_or_else(|e| {
error!("Failed to send message: {:?}", e);
});
} else {
Err(Error::Watermark("invalid offset type".to_string()))
error!(?offset, "Invalid offset type, cannot remove offset");
}
}

/// insert_offset inserts the offset to the tracked offsets.
pub(crate) async fn insert_offset(
&self,
offset: Offset,
watermark: Option<Watermark>,
) -> Result<()> {
pub(crate) async fn insert_offset(&self, offset: Offset, watermark: Option<Watermark>) {
if let Offset::Int(offset) = offset {
self.sender
.send(ISBWaterMarkActorMessage::InsertOffset {
Expand All @@ -345,19 +361,22 @@ impl ISBWatermarkHandle {
),
})
.await
.map_err(|_| Error::Watermark("failed to send message".to_string()))?;
Ok(())
.unwrap_or_else(|e| {
error!("Failed to send message: {:?}", e);
});
} else {
Err(Error::Watermark("invalid offset type".to_string()))
error!(?offset, "Invalid offset type, cannot insert offset");
}
}

pub(crate) async fn publish_idle_watermark(&self) -> Result<()> {
/// publish_idle_watermark publishes the idle watermark for the downstream idle partitions.
pub(crate) async fn publish_idle_watermark(&self) {
self.sender
.send(ISBWaterMarkActorMessage::CheckAndPublishIdleWatermark)
.await
.map_err(|_| Error::Watermark("failed to send message".to_string()))?;
Ok(())
.unwrap_or_else(|e| {
error!("Failed to send message: {:?}", e);
});
}
}

Expand Down Expand Up @@ -457,6 +476,7 @@ mod tests {
},
conditions: None,
}],
CancellationToken::new(),
)
.await
.expect("Failed to create ISBWatermarkHandle");
Expand All @@ -469,8 +489,7 @@ mod tests {
}),
Some(Watermark::from_timestamp_millis(100).unwrap()),
)
.await
.expect("Failed to insert offset");
.await;

handle
.insert_offset(
Expand All @@ -480,8 +499,7 @@ mod tests {
}),
Some(Watermark::from_timestamp_millis(200).unwrap()),
)
.await
.expect("Failed to insert offset");
.await;

handle
.publish_watermark(
Expand All @@ -495,8 +513,7 @@ mod tests {
partition_idx: 0,
}),
)
.await
.expect("Failed to publish watermark");
.await;

let ot_bucket = js_context
.get_key_value(ot_bucket_name)
Expand Down Expand Up @@ -528,8 +545,7 @@ mod tests {
offset: 1,
partition_idx: 0,
}))
.await
.expect("Failed to remove offset");
.await;

handle
.publish_watermark(
Expand All @@ -543,8 +559,7 @@ mod tests {
partition_idx: 0,
}),
)
.await
.unwrap();
.await;

let mut wmb_found = true;
for _ in 0..10 {
Expand Down Expand Up @@ -641,6 +656,7 @@ mod tests {
},
conditions: None,
}],
CancellationToken::new(),
)
.await
.expect("Failed to create ISBWatermarkHandle");
Expand All @@ -658,8 +674,7 @@ mod tests {
offset.clone(),
Some(Watermark::from_timestamp_millis(i * 100).unwrap()),
)
.await
.expect("Failed to insert offset");
.await;

handle
.publish_watermark(
Expand All @@ -673,26 +688,21 @@ mod tests {
partition_idx: 0,
}),
)
.await
.expect("Failed to publish watermark");
.await;

let watermark = handle
.fetch_watermark(Offset::Int(IntOffset {
offset: 3,
partition_idx: 0,
}))
.await
.expect("Failed to fetch watermark");
.await;

if watermark.timestamp_millis() != -1 {
fetched_watermark = watermark.timestamp_millis();
break;
}
sleep(Duration::from_millis(10)).await;
handle
.remove_offset(offset.clone())
.await
.expect("Failed to insert offset");
handle.remove_offset(offset.clone()).await;
}

assert_ne!(fetched_watermark, -1);
Expand Down Expand Up @@ -792,6 +802,7 @@ mod tests {
},
conditions: None,
}],
CancellationToken::new(),
)
.await
.expect("Failed to create ISBWatermarkHandle");
Expand All @@ -806,8 +817,7 @@ mod tests {
}),
Some(Watermark::from_timestamp_millis(i * 100).unwrap()),
)
.await
.expect("Failed to insert offset");
.await;
}

// Wait for the idle timeout to trigger
Expand Down
Loading

0 comments on commit c888818

Please sign in to comment.