Skip to content

Commit

Permalink
chore: code review 35/39
Browse files Browse the repository at this point in the history
Signed-off-by: Vigith Maurice <[email protected]>
  • Loading branch information
vigith committed Feb 11, 2025
1 parent cd512cb commit 6f34d25
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 7 deletions.
6 changes: 3 additions & 3 deletions rust/numaflow-core/src/watermark/idle/isb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl ISBIdleDetector {
}
}

/// resets the stream's idle metadata by updating the last published time and resets the ctrl
/// resets the stream's idle metadata by updating the last published time and resets the ctrl
/// message offset. It implicitly marks that stream as active.
pub(crate) async fn reset_idle(&mut self, stream: &Stream) {
let mut write_guard = self
Expand All @@ -106,8 +106,8 @@ impl ISBIdleDetector {
}

/// fetches the offset to be used for publishing the idle watermark. Only a WMB can be used
/// to send idle watermark, hence if not WMB's are published, we publish an WMB and return its
/// offset, or return the current "active" WMB's offset.
/// to send idle watermark, hence if not WMB's are published, we publish an WMB and return its
/// offset, or return the current "active" WMB's offset.
pub(crate) async fn fetch_idle_offset(&self, stream: &Stream) -> crate::error::Result<i64> {
let idle_state = {
let read_guard = self
Expand Down
4 changes: 3 additions & 1 deletion rust/numaflow-core/src/watermark/isb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ impl ISBWatermarkActor {
self.publisher
.publish_watermark(stream, offset, min_wm.timestamp_millis(), true)
.await;
self.idle_manager.update_idle_metadata(&stream, offset).await;
self.idle_manager
.update_idle_metadata(&stream, offset)
.await;
}
}
}
Expand Down
12 changes: 9 additions & 3 deletions rust/numaflow-core/src/watermark/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ impl SourceWatermarkActor {
}

// mark the vertex and partition as idle, since we published the idle watermark
self.isb_idle_manager.update_idle_metadata(&stream, offset).await;
self.isb_idle_manager
.update_idle_metadata(&stream, offset)
.await;
}
}

Expand Down Expand Up @@ -217,7 +219,9 @@ impl SourceWatermarkActor {
)
.await;
}
self.isb_idle_manager.update_idle_metadata(stream, offset).await;
self.isb_idle_manager
.update_idle_metadata(stream, offset)
.await;
}
// clear the cache since we published the idle watermarks
self.active_input_partitions.clear();
Expand Down Expand Up @@ -340,7 +344,9 @@ impl SourceWatermarkHandle {

pub(crate) async fn publish_source_idle_watermark(&self, partitions: Vec<u16>) {
self.sender
.send(SourceActorMessage::CheckAndPublishSourceIdleWatermark { source_partitions: partitions })
.send(SourceActorMessage::CheckAndPublishSourceIdleWatermark {
source_partitions: partitions,
})
.await
.map_err(|_| Error::Watermark("failed to send message".to_string()))
.expect("failed to send message");
Expand Down

0 comments on commit 6f34d25

Please sign in to comment.