Skip to content

Commit

Permalink
chore: code review 39/39
Browse files Browse the repository at this point in the history
Signed-off-by: Vigith Maurice <[email protected]>
  • Loading branch information
vigith committed Feb 11, 2025
1 parent 3d28526 commit 10e997a
Show file tree
Hide file tree
Showing 12 changed files with 63 additions and 43 deletions.
5 changes: 3 additions & 2 deletions rust/numaflow-core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;

use crate::shared::grpc::prost_timestamp_from_utc;
use crate::Error;
use bytes::{Bytes, BytesMut};
use chrono::{DateTime, Utc};
use prost::Message as ProtoMessage;
use serde::{Deserialize, Serialize};

use crate::shared::grpc::prost_timestamp_from_utc;
use crate::Error;

// NOTE(review): presumably the reserved tag value that marks a message to be dropped rather than
// forwarded — confirm against the usages of `DROP` elsewhere in this module.
const DROP: &str = "U+005C__DROP__";

/// The message that is passed from the source to the sink.
Expand Down
3 changes: 2 additions & 1 deletion rust/numaflow-core/src/shared/forward.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use numaflow_models::models::ForwardConditions;
use std::hash::{DefaultHasher, Hasher};
use std::sync::Arc;

use numaflow_models::models::ForwardConditions;

/// Checks if the message should be written to the downstream vertex based on the conditions
/// and message tags. If no tags are provided but there are edge conditions present, we will
/// still forward to all vertices.
Expand Down
20 changes: 12 additions & 8 deletions rust/numaflow-core/src/watermark/idle/isb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@
//! Fetch the `min(wm(Head Idle Offset), wm(smallest offset of inflight messages))` and publish as
//! idle.
use crate::config::pipeline::isb::Stream;
use crate::config::pipeline::ToVertexConfig;
use bytes::BytesMut;
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::Duration;

use bytes::BytesMut;
use chrono::{DateTime, Utc};

use crate::config::pipeline::isb::Stream;
use crate::config::pipeline::ToVertexConfig;

/// State of each partition in the ISB. It has the information required to identify whether the
/// partition is idling or not.
#[derive(Clone)]
Expand Down Expand Up @@ -191,14 +193,16 @@ impl ISBIdleDetector {

#[cfg(test)]
mod tests {
use std::time::Duration;

use async_nats::jetstream;
use async_nats::jetstream::stream;
use tokio::time::sleep;

use super::*;
use crate::config::pipeline::isb::BufferWriterConfig;
use crate::config::pipeline::isb::Stream;
use crate::config::pipeline::ToVertexConfig;
use async_nats::jetstream;
use async_nats::jetstream::stream;
use std::time::Duration;
use tokio::time::sleep;

#[tokio::test]
async fn test_mark_active() {
Expand Down
8 changes: 5 additions & 3 deletions rust/numaflow-core/src/watermark/idle/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
//! The current watermark + increment_by (provided by the user). We will ensure that the
//! increment will never cross `(time.now() - max_delay)`.
use crate::config::pipeline::watermark::IdleConfig;
use chrono::{DateTime, Utc};
use tracing::warn;

use crate::config::pipeline::watermark::IdleConfig;

/// Responsible for detecting the idle state of the source and publishing idle watermarks.
pub(crate) struct SourceIdleDetector {
config: IdleConfig,
Expand Down Expand Up @@ -76,7 +77,7 @@ impl SourceIdleDetector {
// this could happen if step interval and increment-by are set aggressively
let now = Utc::now().timestamp_millis();
if idle_wm > now {
warn!(?idle_wm, "idle config is aggressive (reduce step/increment-by), wm > now(), resetting to now");
warn!(?idle_wm, "idle config is aggressive (reduce step/increment-by), wm > now(), resetting to now()");
idle_wm = now;
}

Expand All @@ -87,9 +88,10 @@ impl SourceIdleDetector {

#[cfg(test)]
mod tests {
use std::time::Duration;

use super::*;
use crate::config::pipeline::watermark::IdleConfig;
use std::time::Duration;

#[test]
fn test_is_source_idling() {
Expand Down
12 changes: 7 additions & 5 deletions rust/numaflow-core/src/watermark/isb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@
//! ```text
//! (Write to ISB) -------> (Publish Watermark) ------> (Remove tracked Offset)
//! ```
use std::cmp::Ordering;
use std::collections::{BTreeSet, HashMap};
use std::time::Duration;

use tokio::sync::mpsc::Receiver;
use tracing::error;

use crate::config::pipeline::isb::Stream;
use crate::config::pipeline::watermark::EdgeWatermarkConfig;
use crate::config::pipeline::ToVertexConfig;
Expand All @@ -29,11 +36,6 @@ use crate::watermark::isb::wm_fetcher::ISBWatermarkFetcher;
use crate::watermark::isb::wm_publisher::ISBWatermarkPublisher;
use crate::watermark::processor::manager::ProcessorManager;
use crate::watermark::wmb::Watermark;
use std::cmp::Ordering;
use std::collections::{BTreeSet, HashMap};
use std::time::Duration;
use tokio::sync::mpsc::Receiver;
use tracing::error;

pub(crate) mod wm_fetcher;
pub(crate) mod wm_publisher;
Expand Down
5 changes: 3 additions & 2 deletions rust/numaflow-core/src/watermark/isb/wm_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
//! last fetched watermark per partition and returns the smallest watermark among all the last fetched
//! watermarks across the partitions; this is to make sure the watermark is min across all the incoming
//! partitions.
use std::collections::HashMap;

use crate::config::pipeline::watermark::BucketConfig;
use crate::error::Result;
use crate::watermark::processor::manager::ProcessorManager;
use crate::watermark::wmb::Watermark;
use std::collections::HashMap;

/// ISBWatermarkFetcher is the watermark fetcher for the incoming edges.
pub(crate) struct ISBWatermarkFetcher {
Expand Down Expand Up @@ -171,9 +172,9 @@ impl ISBWatermarkFetcher {
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;

use bytes::Bytes;
use std::sync::RwLock;

use super::*;
use crate::watermark::processor::manager::{Processor, Status};
Expand Down
7 changes: 4 additions & 3 deletions rust/numaflow-core/src/watermark/isb/wm_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ use std::collections::HashMap;
use std::time::UNIX_EPOCH;
use std::time::{Duration, SystemTime};

use bytes::BytesMut;
use prost::Message;
use tracing::{debug, error, info};

use crate::config::pipeline::isb::Stream;
use crate::config::pipeline::watermark::BucketConfig;
use crate::error::{Error, Result};
use crate::watermark::wmb::WMB;
use bytes::BytesMut;
use prost::Message;
use tracing::{debug, error, info};

/// Interval at which the pod sends heartbeats.
const DEFAULT_POD_HEARTBEAT_INTERVAL: u16 = 5;
Expand Down
8 changes: 5 additions & 3 deletions rust/numaflow-core/src/watermark/processor/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::sync::RwLock;
use std::time::Duration;
use std::time::SystemTime;

Expand All @@ -14,7 +15,6 @@ use backoff::strategy::fixed;
use bytes::Bytes;
use futures::{StreamExt, TryStreamExt};
use prost::Message as ProtoMessage;
use std::sync::RwLock;
use tracing::{debug, error, info, warn};

use crate::config::pipeline::watermark::BucketConfig;
Expand Down Expand Up @@ -112,7 +112,8 @@ impl Drop for ProcessorManager {
}

impl ProcessorManager {
/// Creates a new ProcessorManager.
/// Creates a new ProcessorManager. It prepopulates the processor-map with previous data
/// fetched from the OT and HB buckets.
pub(crate) async fn new(
js_context: async_nats::jetstream::Context,
bucket_config: &BucketConfig,
Expand All @@ -137,9 +138,10 @@ impl ProcessorManager {
))
})?;

// fetch old data
let (processors_map, heartbeats_map) =
Self::prepopulate_processors(&hb_bucket, &ot_bucket, bucket_config).await;

// point to populated data
let processors = Arc::new(RwLock::new(processors_map));
let heartbeats = Arc::new(RwLock::new(heartbeats_map));

Expand Down
2 changes: 1 addition & 1 deletion rust/numaflow-core/src/watermark/processor/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use std::cmp::Ordering;
use std::collections::VecDeque;
use std::fmt;
use std::sync::Arc;

use std::sync::RwLock;

use tracing::{debug, error};

use crate::watermark::wmb::WMB;
Expand Down
21 changes: 12 additions & 9 deletions rust/numaflow-core/src/watermark/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
//! [Source]: https://numaflow.numaproj.io/user-guide/sources/overview/
//! [ISB]: https://numaflow.numaproj.io/core-concepts/inter-step-buffer/
use std::collections::HashMap;
use std::time::Duration;

use tokio::sync::mpsc::Receiver;
use tracing::error;

use crate::config::pipeline::isb::Stream;
use crate::config::pipeline::watermark::SourceWatermarkConfig;
use crate::config::pipeline::ToVertexConfig;
Expand All @@ -25,10 +31,6 @@ use crate::watermark::idle::source::SourceIdleDetector;
use crate::watermark::processor::manager::ProcessorManager;
use crate::watermark::source::source_wm_fetcher::SourceWatermarkFetcher;
use crate::watermark::source::source_wm_publisher::SourceWatermarkPublisher;
use std::collections::HashMap;
use std::time::Duration;
use tokio::sync::mpsc::Receiver;
use tracing::error;

/// fetcher for fetching the source watermark
pub(crate) mod source_wm_fetcher;
Expand Down Expand Up @@ -364,11 +366,6 @@ impl SourceWatermarkHandle {

#[cfg(test)]
mod tests {
use super::*;
use crate::config::pipeline::isb::BufferWriterConfig;
use crate::config::pipeline::watermark::{BucketConfig, IdleConfig};
use crate::message::{IntOffset, Message};
use crate::watermark::wmb::WMB;
use async_nats::jetstream;
use async_nats::jetstream::kv::Config;
use async_nats::jetstream::stream;
Expand All @@ -378,6 +375,12 @@ mod tests {
use prost::Message as _;
use tokio::time::sleep;

use super::*;
use crate::config::pipeline::isb::BufferWriterConfig;
use crate::config::pipeline::watermark::{BucketConfig, IdleConfig};
use crate::message::{IntOffset, Message};
use crate::watermark::wmb::WMB;

#[cfg(feature = "nats-tests")]
#[tokio::test]
async fn test_publish_source_watermark() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ impl SourceWatermarkFetcher {
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;

use bytes::Bytes;
use std::sync::RwLock;

use super::*;
use crate::watermark::processor::manager::{Processor, Status};
Expand Down
13 changes: 8 additions & 5 deletions rust/numaflow-core/src/watermark/source/source_wm_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@
//! the watermark across the source partitions. Since we write the messages to the ISB, we will also publish
//! the watermark to the ISB. Unlike other vertices, we don't use the pod as the processing entity for
//! publishing the watermark; we use the partition (watermark originates here).
use std::collections::HashMap;
use std::time::Duration;

use chrono::Utc;
use tracing::info;

use crate::config::pipeline::isb::Stream;
use crate::config::pipeline::watermark::BucketConfig;
use crate::error;
use crate::watermark::isb::wm_publisher::ISBWatermarkPublisher;
use chrono::Utc;
use std::collections::HashMap;
use std::time::Duration;
use tracing::info;

/// SourceWatermarkPublisher is the watermark publisher for the source vertex.
pub(crate) struct SourceWatermarkPublisher {
Expand Down Expand Up @@ -117,9 +119,10 @@ impl SourceWatermarkPublisher {

#[cfg(test)]
mod tests {
use std::time::Duration;

use async_nats::jetstream;
use async_nats::jetstream::kv::Config;
use std::time::Duration;

use crate::config::pipeline::isb::Stream;
use crate::watermark::source::source_wm_publisher::{BucketConfig, SourceWatermarkPublisher};
Expand Down

0 comments on commit 10e997a

Please sign in to comment.