diff --git a/rust/numaflow-core/src/pipeline.rs b/rust/numaflow-core/src/pipeline.rs index 9029775e15..1f8bebe6e7 100644 --- a/rust/numaflow-core/src/pipeline.rs +++ b/rust/numaflow-core/src/pipeline.rs @@ -51,6 +51,7 @@ pub(crate) async fn start_forwarder( js_context.clone(), &config.to_vertex_config, source_config, + cln_token.clone(), ) .await?, ) @@ -85,6 +86,7 @@ pub(crate) async fn start_forwarder( js_context.clone(), edge_config, &config.to_vertex_config, + cln_token.clone(), ) .await?, ) @@ -119,6 +121,7 @@ pub(crate) async fn start_forwarder( js_context.clone(), edge_config, &config.to_vertex_config, + cln_token.clone(), ) .await?, ) diff --git a/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs b/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs index 036e0741b6..ae9b22d8ad 100644 --- a/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs +++ b/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs @@ -220,7 +220,7 @@ impl JetStreamReader { } if let Some(watermark_handle) = watermark_handle.as_ref() { - let watermark = watermark_handle.fetch_watermark(message.offset.clone()).await?; + let watermark = watermark_handle.fetch_watermark(message.offset.clone()).await; message.watermark = Some(watermark); } diff --git a/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs b/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs index 6067ec3565..03252f1d5f 100644 --- a/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs +++ b/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs @@ -469,11 +469,7 @@ impl JetstreamWriter { ) { match watermark_handle { WatermarkHandle::ISB(handle) => { - handle - .publish_watermark(stream, offset) - .await - .map_err(|e| Error::ISB(format!("Failed to update watermark: {:?}", e))) - .expect("Failed to publish watermark"); + handle.publish_watermark(stream, offset).await; } WatermarkHandle::Source(handle) => { let input_partition = match &message.offset { diff --git a/rust/numaflow-core/src/source.rs b/rust/numaflow-core/src/source.rs index f7e1cc1fc9..40e3e49a66 100644 --- a/rust/numaflow-core/src/source.rs +++ b/rust/numaflow-core/src/source.rs @@ -398,7 +398,7 @@ impl Source { if let Some(watermark_handle) = watermark_handle.as_mut() { watermark_handle .generate_and_publish_source_watermark(&messages) - .await?; + .await; } // write the messages to downstream. diff --git a/rust/numaflow-core/src/tracker.rs b/rust/numaflow-core/src/tracker.rs index 286b91ae61..3637708f1e 100644 --- a/rust/numaflow-core/src/tracker.rs +++ b/rust/numaflow-core/src/tracker.rs @@ -217,10 +217,7 @@ impl Tracker { ); if let Some(watermark_handle) = &self.watermark_handle { - watermark_handle - .insert_offset(offset, watermark) - .await - .expect("Failed to insert offset"); + watermark_handle.insert_offset(offset, watermark).await; } } @@ -282,10 +279,7 @@ impl Tracker { .send(ReadAck::Nak) .expect("Failed to send nak"); if let Some(watermark_handle) = &self.watermark_handle { - watermark_handle - .remove_offset(offset) - .await - .expect("Failed to remove offset"); + watermark_handle.remove_offset(offset).await; } } @@ -314,10 +308,7 @@ impl Tracker { ack_send.send(ReadAck::Ack).expect("Failed to send ack"); if let Some(watermark_handle) = &self.watermark_handle { - watermark_handle - .remove_offset(offset) - .await - .expect("Failed to remove offset"); + watermark_handle.remove_offset(offset).await; } let Some(ref callback_handler) = self.serving_callback_handler else { diff --git a/rust/numaflow-core/src/watermark/isb.rs b/rust/numaflow-core/src/watermark/isb.rs index b1581d9d6e..4467e97c17 100644 --- a/rust/numaflow-core/src/watermark/isb.rs +++ b/rust/numaflow-core/src/watermark/isb.rs @@ -24,6 +24,7 @@ use std::collections::{BTreeSet, HashMap}; use std::time::Duration; use tokio::sync::mpsc::Receiver; +use tokio_util::sync::CancellationToken; use tracing::error; use crate::config::pipeline::isb::Stream; @@ -239,6 +240,7 @@ impl ISBWatermarkHandle { js_context: async_nats::jetstream::Context, config: &EdgeWatermarkConfig, to_vertex_configs: &[ToVertexConfig], + cln_token: CancellationToken, ) -> Result { let (sender, receiver) = tokio::sync::mpsc::channel(100); @@ -272,14 +274,17 @@ impl ISBWatermarkHandle { tokio::spawn({ let isb_watermark_handle = isb_watermark_handle.clone(); let mut interval_ticker = tokio::time::interval(idle_timeout); + let cln_token = cln_token.clone(); async move { loop { - // TODO(idle): add cancellation token. - interval_ticker.tick().await; - isb_watermark_handle - .publish_idle_watermark() - .await - .expect("failed to publish idle watermark"); + tokio::select! { + _ = interval_ticker.tick() => { + isb_watermark_handle.publish_idle_watermark().await; + } + _ = cln_token.cancelled() => { + break; + } + } } } }); @@ -287,55 +292,66 @@ impl ISBWatermarkHandle { Ok(isb_watermark_handle) } - /// Fetches the watermark for the given offset. - pub(crate) async fn fetch_watermark(&self, offset: Offset) -> Result { + /// Fetches the watermark for the given offset, if we are not able to compute the watermark we + /// return -1. + pub(crate) async fn fetch_watermark(&self, offset: Offset) -> Watermark { if let Offset::Int(offset) = offset { let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); - self.sender + if let Err(e) = self + .sender .send(ISBWaterMarkActorMessage::FetchWatermark { offset, oneshot_tx }) .await - .map_err(|_| Error::Watermark("failed to send message".to_string()))?; + { + error!(?e, "Failed to send message"); + return Watermark::from_timestamp_millis(-1).expect("failed to parse time"); + } - oneshot_rx - .await - .map_err(|_| Error::Watermark("failed to receive response".to_string()))? + match oneshot_rx.await { + Ok(watermark) => watermark.unwrap_or_else(|e| { + error!(?e, "Failed to fetch watermark"); + Watermark::from_timestamp_millis(-1).expect("failed to parse time") + }), + Err(e) => { + error!(?e, "Failed to receive response"); + Watermark::from_timestamp_millis(-1).expect("failed to parse time") + } + } } else { - Err(Error::Watermark("invalid offset type".to_string())) + error!(?offset, "Invalid offset type, cannot compute watermark"); + Watermark::from_timestamp_millis(-1).expect("failed to parse time") } } /// publish_watermark publishes the watermark for the given stream and offset. - pub(crate) async fn publish_watermark(&self, stream: Stream, offset: Offset) -> Result<()> { + pub(crate) async fn publish_watermark(&self, stream: Stream, offset: Offset) { if let Offset::Int(offset) = offset { self.sender .send(ISBWaterMarkActorMessage::PublishWatermark { offset, stream }) .await - .map_err(|_| Error::Watermark("failed to send message".to_string()))?; - Ok(()) + .unwrap_or_else(|e| { + error!("Failed to send message: {:?}", e); + }); } else { - Err(Error::Watermark("invalid offset type".to_string())) + error!(?offset, "Invalid offset type, cannot publish watermark"); } } /// remove_offset removes the offset from the tracked offsets. - pub(crate) async fn remove_offset(&self, offset: Offset) -> Result<()> { + pub(crate) async fn remove_offset(&self, offset: Offset) { if let Offset::Int(offset) = offset { self.sender .send(ISBWaterMarkActorMessage::RemoveOffset(offset)) .await - .map_err(|_| Error::Watermark("failed to send message".to_string()))?; - Ok(()) + .unwrap_or_else(|e| { + error!("Failed to send message: {:?}", e); + }); } else { - Err(Error::Watermark("invalid offset type".to_string())) + error!(?offset, "Invalid offset type, cannot remove offset"); } } /// insert_offset inserts the offset to the tracked offsets. - pub(crate) async fn insert_offset( - &self, - offset: Offset, - watermark: Option, - ) -> Result<()> { + pub(crate) async fn insert_offset(&self, offset: Offset, watermark: Option) { if let Offset::Int(offset) = offset { self.sender .send(ISBWaterMarkActorMessage::InsertOffset { @@ -345,19 +361,22 @@ impl ISBWatermarkHandle { ), }) .await - .map_err(|_| Error::Watermark("failed to send message".to_string()))?; - Ok(()) + .unwrap_or_else(|e| { + error!("Failed to send message: {:?}", e); + }); } else { - Err(Error::Watermark("invalid offset type".to_string())) + error!(?offset, "Invalid offset type, cannot insert offset"); } } - pub(crate) async fn publish_idle_watermark(&self) -> Result<()> { + /// publishes the idle watermark for the downstream idle partitions. + pub(crate) async fn publish_idle_watermark(&self) { self.sender .send(ISBWaterMarkActorMessage::CheckAndPublishIdleWatermark) .await - .map_err(|_| Error::Watermark("failed to send message".to_string()))?; - Ok(()) + .unwrap_or_else(|e| { + error!("Failed to send message: {:?}", e); + }); } } @@ -457,6 +476,7 @@ mod tests { }, conditions: None, }], + CancellationToken::new(), ) .await .expect("Failed to create ISBWatermarkHandle"); @@ -469,8 +489,7 @@ mod tests { }), Some(Watermark::from_timestamp_millis(100).unwrap()), ) - .await - .expect("Failed to insert offset"); + .await; handle .insert_offset( @@ -480,8 +499,7 @@ mod tests { }), Some(Watermark::from_timestamp_millis(200).unwrap()), ) - .await - .expect("Failed to insert offset"); + .await; handle .publish_watermark( @@ -495,8 +513,7 @@ mod tests { partition_idx: 0, }), ) - .await - .expect("Failed to publish watermark"); + .await; let ot_bucket = js_context .get_key_value(ot_bucket_name) @@ -528,8 +545,7 @@ mod tests { offset: 1, partition_idx: 0, })) - .await - .expect("Failed to remove offset"); + .await; handle .publish_watermark( @@ -543,8 +559,7 @@ mod tests { partition_idx: 0, }), ) - .await - .unwrap(); + .await; let mut wmb_found = true; for _ in 0..10 { @@ -641,6 +656,7 @@ mod tests { }, conditions: None, }], + CancellationToken::new(), ) .await .expect("Failed to create ISBWatermarkHandle"); @@ -658,8 +674,7 @@ mod tests { offset.clone(), Some(Watermark::from_timestamp_millis(i * 100).unwrap()), ) - .await - .expect("Failed to insert offset"); + .await; handle .publish_watermark( @@ -673,26 +688,21 @@ mod tests { partition_idx: 0, }), ) - .await - .expect("Failed to publish watermark"); + .await; let watermark = handle .fetch_watermark(Offset::Int(IntOffset { offset: 3, partition_idx: 0, })) - .await - .expect("Failed to fetch watermark"); + .await; if watermark.timestamp_millis() != -1 { fetched_watermark = watermark.timestamp_millis(); break; } sleep(Duration::from_millis(10)).await; - handle - .remove_offset(offset.clone()) - .await - .expect("Failed to insert offset"); + handle.remove_offset(offset.clone()).await; } assert_ne!(fetched_watermark, -1); @@ -792,6 +802,7 @@ mod tests { }, conditions: None, }], + CancellationToken::new(), ) .await .expect("Failed to create ISBWatermarkHandle"); @@ -806,8 +817,7 @@ mod tests { }), Some(Watermark::from_timestamp_millis(i * 100).unwrap()), ) - .await - .expect("Failed to insert offset"); + .await; } // Wait for the idle timeout to trigger diff --git a/rust/numaflow-core/src/watermark/source.rs b/rust/numaflow-core/src/watermark/source.rs index 760ff78bba..0ee978765c 100644 --- a/rust/numaflow-core/src/watermark/source.rs +++ b/rust/numaflow-core/src/watermark/source.rs @@ -19,6 +19,7 @@ use std::collections::HashMap; use std::time::Duration; use tokio::sync::mpsc::Receiver; +use tokio_util::sync::CancellationToken; use tracing::error; use crate::config::pipeline::isb::Stream; @@ -249,6 +250,7 @@ impl SourceWatermarkHandle { js_context: async_nats::jetstream::Context, to_vertex_configs: &[ToVertexConfig], config: &SourceWatermarkConfig, + cln_token: CancellationToken, ) -> Result { let (sender, receiver) = tokio::sync::mpsc::channel(100); let processor_manager = @@ -284,8 +286,14 @@ impl SourceWatermarkHandle { let mut interval_ticker = tokio::time::interval(idle_timeout); async move { loop { - interval_ticker.tick().await; - source_watermark_handle.publish_isb_idle_watermark().await; + tokio::select! { + _ = interval_ticker.tick() => { + source_watermark_handle.publish_isb_idle_watermark().await; + } + _ = cln_token.cancelled() => { + break; + } + } } } }); @@ -294,10 +302,7 @@ impl SourceWatermarkHandle { } /// Generates and Publishes the source watermark for the given messages. - pub(crate) async fn generate_and_publish_source_watermark( - &self, - messages: &[Message], - ) -> Result<()> { + pub(crate) async fn generate_and_publish_source_watermark(&self, messages: &[Message]) { // we need to build a hash-map of the lowest event time for each partition let partition_to_lowest_event_time = messages.iter().fold(HashMap::new(), |mut acc, message| { @@ -320,9 +325,7 @@ impl SourceWatermarkHandle { map: partition_to_lowest_event_time, }) .await - .map_err(|_| Error::Watermark("failed to send message".to_string()))?; - - Ok(()) + .unwrap_or_else(|e| error!("failed to send message: {:?}", e)); } /// Publishes the watermark for the given input partition on to the ISB of the next vertex. @@ -426,6 +429,7 @@ mod tests { js_context.clone(), Default::default(), &source_config, + CancellationToken::new(), ) .await .expect("Failed to create source watermark handle"); @@ -451,8 +455,7 @@ mod tests { handle .generate_and_publish_source_watermark(&messages) - .await - .expect("Failed to publish source watermark"); + .await; // try getting the value for the processor from the ot bucket to make sure // the watermark is getting published(min event time in the batch), wait until @@ -574,6 +577,7 @@ mod tests { partitions: 1, }], &source_config, + CancellationToken::new(), ) .await .expect("Failed to create source watermark handle"); @@ -613,8 +617,7 @@ mod tests { handle .generate_and_publish_source_watermark(&messages) - .await - .expect("Failed to publish source watermark"); + .await; let offset = Offset::Int(IntOffset { offset: i, @@ -763,6 +766,7 @@ mod tests { to_vertex_bucket_config: vec![to_vertex_bucket_config], idle_config: Some(source_idle_config), }, + CancellationToken::new(), ) .await .expect("Failed to create SourceWatermarkHandle"); @@ -953,6 +957,7 @@ mod tests { to_vertex_bucket_config: vec![to_vertex_bucket_config], idle_config: Some(source_idle_config), }, + CancellationToken::new(), ) .await .expect("Failed to create SourceWatermarkHandle"); @@ -969,8 +974,7 @@ mod tests { // generate some watermarks to make partition active handle .generate_and_publish_source_watermark(&messages) - .await - .expect("Failed to publish source watermark"); + .await; // get ot and hb buckets for source and publish some wmb and heartbeats let ot_bucket = js_context diff --git a/rust/serving/src/config.rs b/rust/serving/src/config.rs index 6b94cb28b5..87585d065c 100644 --- a/rust/serving/src/config.rs +++ b/rust/serving/src/config.rs @@ -9,6 +9,7 @@ use rcgen::{generate_simple_self_signed, Certificate, CertifiedKey, KeyPair}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fmt::Debug; +use std::time::Duration; const ENV_NUMAFLOW_SERVING_HOST_IP: &str = "NUMAFLOW_SERVING_HOST_IP"; const ENV_NUMAFLOW_SERVING_APP_PORT: &str = "NUMAFLOW_SERVING_APP_LISTEN_PORT"; @@ -17,6 +18,7 @@ const ENV_VERTEX_OBJ: &str = "NUMAFLOW_VERTEX_OBJECT"; pub const DEFAULT_ID_HEADER: &str = "X-Numaflow-Id"; pub const DEFAULT_CALLBACK_URL_HEADER_KEY: &str = "X-Numaflow-Callback-Url"; +pub const DEFAULT_REDIS_TTL_IN_SECS: u32 = 86400; pub fn generate_certs() -> std::result::Result<(Certificate, KeyPair), String> { let CertifiedKey { cert, key_pair } = generate_simple_self_signed(vec!["localhost".into()]) @@ -41,7 +43,7 @@ impl Default for RedisConfig { retries: 5, retries_duration_millis: 100, // TODO: we might need an option type here. Zero value of u32 can be used instead of None - ttl_secs: Some(86400), + ttl_secs: Some(DEFAULT_REDIS_TTL_IN_SECS), } } } @@ -176,7 +178,10 @@ impl TryFrom> for Settings { // Update redis.addr from source_spec, currently we only support redis as callback storage settings.redis.addr = serving_spec.store.url; - settings.redis.ttl_secs = settings.redis.ttl_secs; + settings.redis.ttl_secs = match serving_spec.store.ttl { + Some(ttl) => Some(Duration::from(ttl).as_secs() as u32), + None => Some(DEFAULT_REDIS_TTL_IN_SECS), + }; if let Some(auth) = serving_spec.auth { let token = auth.token.unwrap(); @@ -247,7 +252,7 @@ mod tests { max_tasks: 50, retries: 5, retries_duration_millis: 100, - ttl_secs: Some(86400), + ttl_secs: Some(DEFAULT_REDIS_TTL_IN_SECS), }, host_ip: "10.2.3.5".into(), api_auth_token: None,