From 6f34d25e7b2e631ac7a132af9d7e038e6b92a521 Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Mon, 10 Feb 2025 21:37:17 -0800 Subject: [PATCH] chore: code review 35/39 Signed-off-by: Vigith Maurice --- rust/numaflow-core/src/watermark/idle/isb.rs | 6 +++--- rust/numaflow-core/src/watermark/isb.rs | 4 +++- rust/numaflow-core/src/watermark/source.rs | 12 +++++++++--- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/rust/numaflow-core/src/watermark/idle/isb.rs b/rust/numaflow-core/src/watermark/idle/isb.rs index 3fef002221..eddd2d5325 100644 --- a/rust/numaflow-core/src/watermark/idle/isb.rs +++ b/rust/numaflow-core/src/watermark/idle/isb.rs @@ -88,7 +88,7 @@ impl ISBIdleDetector { } } - /// resets the stream's idle metadata by updating the last published time and resets the ctrl + /// resets the stream's idle metadata by updating the last published time and resets the ctrl /// message offset. It implicitly marks that stream as active. pub(crate) async fn reset_idle(&mut self, stream: &Stream) { let mut write_guard = self @@ -106,8 +106,8 @@ impl ISBIdleDetector { } /// fetches the offset to be used for publishing the idle watermark. Only a WMB can be used - /// to send idle watermark, hence if not WMB's are published, we publish an WMB and return its - /// offset, or return the current "active" WMB's offset. + /// to send idle watermark, hence if not WMB's are published, we publish an WMB and return its + /// offset, or return the current "active" WMB's offset. pub(crate) async fn fetch_idle_offset(&self, stream: &Stream) -> crate::error::Result { let idle_state = { let read_guard = self diff --git a/rust/numaflow-core/src/watermark/isb.rs b/rust/numaflow-core/src/watermark/isb.rs index 5cbf0c85b5..1af4dfa290 100644 --- a/rust/numaflow-core/src/watermark/isb.rs +++ b/rust/numaflow-core/src/watermark/isb.rs @@ -186,7 +186,9 @@ impl ISBWatermarkActor { self.publisher .publish_watermark(stream, offset, min_wm.timestamp_millis(), true) .await; - self.idle_manager.update_idle_metadata(&stream, offset).await; + self.idle_manager + .update_idle_metadata(&stream, offset) + .await; } } } diff --git a/rust/numaflow-core/src/watermark/source.rs b/rust/numaflow-core/src/watermark/source.rs index 5bf5d6bfc4..f84d1b0926 100644 --- a/rust/numaflow-core/src/watermark/source.rs +++ b/rust/numaflow-core/src/watermark/source.rs @@ -179,7 +179,9 @@ impl SourceWatermarkActor { } // mark the vertex and partition as idle, since we published the idle watermark - self.isb_idle_manager.update_idle_metadata(&stream, offset).await; + self.isb_idle_manager + .update_idle_metadata(&stream, offset) + .await; } } @@ -217,7 +219,9 @@ impl SourceWatermarkActor { ) .await; } - self.isb_idle_manager.update_idle_metadata(stream, offset).await; + self.isb_idle_manager + .update_idle_metadata(stream, offset) + .await; } // clear the cache since we published the idle watermarks self.active_input_partitions.clear(); @@ -340,7 +344,9 @@ impl SourceWatermarkHandle { pub(crate) async fn publish_source_idle_watermark(&self, partitions: Vec) { self.sender - .send(SourceActorMessage::CheckAndPublishSourceIdleWatermark { source_partitions: partitions }) + .send(SourceActorMessage::CheckAndPublishSourceIdleWatermark { + source_partitions: partitions, + }) .await .map_err(|_| Error::Watermark("failed to send message".to_string())) .expect("failed to send message");