Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: avoid panics because of watermark code #2393

Merged
merged 3 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rust/numaflow-core/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ pub(crate) async fn start_forwarder(
js_context.clone(),
edge_config,
&config.to_vertex_config,
cln_token.clone(),
)
.await?,
)
Expand Down Expand Up @@ -119,6 +120,7 @@ pub(crate) async fn start_forwarder(
js_context.clone(),
edge_config,
&config.to_vertex_config,
cln_token.clone(),
)
.await?,
)
Expand Down
2 changes: 1 addition & 1 deletion rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ impl JetStreamReader {
}

if let Some(watermark_handle) = watermark_handle.as_ref() {
let watermark = watermark_handle.fetch_watermark(message.offset.clone()).await?;
let watermark = watermark_handle.fetch_watermark(message.offset.clone()).await;
message.watermark = Some(watermark);
}

Expand Down
6 changes: 1 addition & 5 deletions rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,11 +469,7 @@ impl JetstreamWriter {
) {
match watermark_handle {
WatermarkHandle::ISB(handle) => {
handle
.publish_watermark(stream, offset)
.await
.map_err(|e| Error::ISB(format!("Failed to update watermark: {:?}", e)))
.expect("Failed to publish watermark");
handle.publish_watermark(stream, offset).await;
}
WatermarkHandle::Source(handle) => {
let input_partition = match &message.offset {
Expand Down
2 changes: 1 addition & 1 deletion rust/numaflow-core/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ impl Source {
if let Some(watermark_handle) = watermark_handle.as_mut() {
watermark_handle
.generate_and_publish_source_watermark(&messages)
.await?;
.await;
}

// write the messages to downstream.
Expand Down
15 changes: 3 additions & 12 deletions rust/numaflow-core/src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,7 @@ impl Tracker {
);

if let Some(watermark_handle) = &self.watermark_handle {
watermark_handle
.insert_offset(offset, watermark)
.await
.expect("Failed to insert offset");
watermark_handle.insert_offset(offset, watermark).await;
}
}

Expand Down Expand Up @@ -282,10 +279,7 @@ impl Tracker {
.send(ReadAck::Nak)
.expect("Failed to send nak");
if let Some(watermark_handle) = &self.watermark_handle {
watermark_handle
.remove_offset(offset)
.await
.expect("Failed to remove offset");
watermark_handle.remove_offset(offset).await;
}
}

Expand Down Expand Up @@ -314,10 +308,7 @@ impl Tracker {
ack_send.send(ReadAck::Ack).expect("Failed to send ack");

if let Some(watermark_handle) = &self.watermark_handle {
watermark_handle
.remove_offset(offset)
.await
.expect("Failed to remove offset");
watermark_handle.remove_offset(offset).await;
}

let Some(ref callback_handler) = self.serving_callback_handler else {
Expand Down
120 changes: 65 additions & 55 deletions rust/numaflow-core/src/watermark/isb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::collections::{BTreeSet, HashMap};
use std::time::Duration;

use tokio::sync::mpsc::Receiver;
use tokio_util::sync::CancellationToken;
use tracing::error;

use crate::config::pipeline::isb::Stream;
Expand Down Expand Up @@ -239,6 +240,7 @@ impl ISBWatermarkHandle {
js_context: async_nats::jetstream::Context,
config: &EdgeWatermarkConfig,
to_vertex_configs: &[ToVertexConfig],
cln_token: CancellationToken,
) -> Result<Self> {
let (sender, receiver) = tokio::sync::mpsc::channel(100);

Expand Down Expand Up @@ -272,70 +274,84 @@ impl ISBWatermarkHandle {
tokio::spawn({
let isb_watermark_handle = isb_watermark_handle.clone();
let mut interval_ticker = tokio::time::interval(idle_timeout);
let cln_token = cln_token.clone();
async move {
loop {
// TODO(idle): add cancellation token.
interval_ticker.tick().await;
isb_watermark_handle
.publish_idle_watermark()
.await
.expect("failed to publish idle watermark");
tokio::select! {
_ = interval_ticker.tick() => {
isb_watermark_handle.publish_idle_watermark().await;
}
_ = cln_token.cancelled() => {
break;
}
}
}
}
});

Ok(isb_watermark_handle)
}

/// Fetches the watermark for the given offset.
pub(crate) async fn fetch_watermark(&self, offset: Offset) -> Result<Watermark> {
/// Fetches the watermark for the given offset, if we are not able to compute the watermark we
/// return -1.
pub(crate) async fn fetch_watermark(&self, offset: Offset) -> Watermark {
if let Offset::Int(offset) = offset {
let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
self.sender
if let Err(e) = self
.sender
.send(ISBWaterMarkActorMessage::FetchWatermark { offset, oneshot_tx })
.await
.map_err(|_| Error::Watermark("failed to send message".to_string()))?;
{
error!(?e, "Failed to send message");
return Watermark::from_timestamp_millis(-1).expect("failed to parse time");
}

oneshot_rx
.await
.map_err(|_| Error::Watermark("failed to receive response".to_string()))?
match oneshot_rx.await {
Ok(watermark) => watermark.unwrap_or_else(|e| {
error!(?e, "Failed to fetch watermark");
Watermark::from_timestamp_millis(-1).expect("failed to parse time")
}),
Err(e) => {
error!(?e, "Failed to receive response");
Watermark::from_timestamp_millis(-1).expect("failed to parse time")
}
}
} else {
Err(Error::Watermark("invalid offset type".to_string()))
error!(?offset, "Invalid offset type, cannot compute watermark");
Watermark::from_timestamp_millis(-1).expect("failed to parse time")
}
}

/// publish_watermark publishes the watermark for the given stream and offset.
pub(crate) async fn publish_watermark(&self, stream: Stream, offset: Offset) -> Result<()> {
pub(crate) async fn publish_watermark(&self, stream: Stream, offset: Offset) {
if let Offset::Int(offset) = offset {
self.sender
.send(ISBWaterMarkActorMessage::PublishWatermark { offset, stream })
.await
.map_err(|_| Error::Watermark("failed to send message".to_string()))?;
Ok(())
.unwrap_or_else(|e| {
error!("Failed to send message: {:?}", e);
});
} else {
Err(Error::Watermark("invalid offset type".to_string()))
error!(?offset, "Invalid offset type, cannot publish watermark");
}
}

/// remove_offset removes the offset from the tracked offsets.
pub(crate) async fn remove_offset(&self, offset: Offset) -> Result<()> {
pub(crate) async fn remove_offset(&self, offset: Offset) {
if let Offset::Int(offset) = offset {
self.sender
.send(ISBWaterMarkActorMessage::RemoveOffset(offset))
.await
.map_err(|_| Error::Watermark("failed to send message".to_string()))?;
Ok(())
.unwrap_or_else(|e| {
error!("Failed to send message: {:?}", e);
});
} else {
Err(Error::Watermark("invalid offset type".to_string()))
error!(?offset, "Invalid offset type, cannot remove offset");
}
}

/// insert_offset inserts the offset to the tracked offsets.
pub(crate) async fn insert_offset(
&self,
offset: Offset,
watermark: Option<Watermark>,
) -> Result<()> {
pub(crate) async fn insert_offset(&self, offset: Offset, watermark: Option<Watermark>) {
if let Offset::Int(offset) = offset {
self.sender
.send(ISBWaterMarkActorMessage::InsertOffset {
Expand All @@ -345,19 +361,22 @@ impl ISBWatermarkHandle {
),
})
.await
.map_err(|_| Error::Watermark("failed to send message".to_string()))?;
Ok(())
.unwrap_or_else(|e| {
error!("Failed to send message: {:?}", e);
});
} else {
Err(Error::Watermark("invalid offset type".to_string()))
error!(?offset, "Invalid offset type, cannot insert offset");
}
}

pub(crate) async fn publish_idle_watermark(&self) -> Result<()> {
/// publishes the idle watermark for the downstream idle partitions.
pub(crate) async fn publish_idle_watermark(&self) {
self.sender
.send(ISBWaterMarkActorMessage::CheckAndPublishIdleWatermark)
.await
.map_err(|_| Error::Watermark("failed to send message".to_string()))?;
Ok(())
.unwrap_or_else(|e| {
error!("Failed to send message: {:?}", e);
});
}
}

Expand Down Expand Up @@ -457,6 +476,7 @@ mod tests {
},
conditions: None,
}],
CancellationToken::new(),
)
.await
.expect("Failed to create ISBWatermarkHandle");
Expand All @@ -469,8 +489,7 @@ mod tests {
}),
Some(Watermark::from_timestamp_millis(100).unwrap()),
)
.await
.expect("Failed to insert offset");
.await;

handle
.insert_offset(
Expand All @@ -480,8 +499,7 @@ mod tests {
}),
Some(Watermark::from_timestamp_millis(200).unwrap()),
)
.await
.expect("Failed to insert offset");
.await;

handle
.publish_watermark(
Expand All @@ -495,8 +513,7 @@ mod tests {
partition_idx: 0,
}),
)
.await
.expect("Failed to publish watermark");
.await;

let ot_bucket = js_context
.get_key_value(ot_bucket_name)
Expand Down Expand Up @@ -528,8 +545,7 @@ mod tests {
offset: 1,
partition_idx: 0,
}))
.await
.expect("Failed to remove offset");
.await;

handle
.publish_watermark(
Expand All @@ -543,8 +559,7 @@ mod tests {
partition_idx: 0,
}),
)
.await
.unwrap();
.await;

let mut wmb_found = true;
for _ in 0..10 {
Expand Down Expand Up @@ -641,6 +656,7 @@ mod tests {
},
conditions: None,
}],
CancellationToken::new(),
)
.await
.expect("Failed to create ISBWatermarkHandle");
Expand All @@ -658,8 +674,7 @@ mod tests {
offset.clone(),
Some(Watermark::from_timestamp_millis(i * 100).unwrap()),
)
.await
.expect("Failed to insert offset");
.await;

handle
.publish_watermark(
Expand All @@ -673,26 +688,21 @@ mod tests {
partition_idx: 0,
}),
)
.await
.expect("Failed to publish watermark");
.await;

let watermark = handle
.fetch_watermark(Offset::Int(IntOffset {
offset: 3,
partition_idx: 0,
}))
.await
.expect("Failed to fetch watermark");
.await;

if watermark.timestamp_millis() != -1 {
fetched_watermark = watermark.timestamp_millis();
break;
}
sleep(Duration::from_millis(10)).await;
handle
.remove_offset(offset.clone())
.await
.expect("Failed to insert offset");
handle.remove_offset(offset.clone()).await;
}

assert_ne!(fetched_watermark, -1);
Expand Down Expand Up @@ -792,6 +802,7 @@ mod tests {
},
conditions: None,
}],
CancellationToken::new(),
)
.await
.expect("Failed to create ISBWatermarkHandle");
Expand All @@ -806,8 +817,7 @@ mod tests {
}),
Some(Watermark::from_timestamp_millis(i * 100).unwrap()),
)
.await
.expect("Failed to insert offset");
.await;
}

// Wait for the idle timeout to trigger
Expand Down
Loading
Loading