diff --git a/rust/numaflow-core/src/config/pipeline.rs b/rust/numaflow-core/src/config/pipeline.rs index 0a5ba67508..45d1f59c66 100644 --- a/rust/numaflow-core/src/config/pipeline.rs +++ b/rust/numaflow-core/src/config/pipeline.rs @@ -353,8 +353,6 @@ mod tests { reader_config: BufferReaderConfig { partitions: 1, streams: vec![("default-simple-pipeline-out-0".into(), 0)], - batch_size: 500, - read_timeout: Duration::from_secs(1), wip_ack_interval: Duration::from_secs(1), }, partitions: 0, diff --git a/rust/numaflow-core/src/config/pipeline/isb.rs b/rust/numaflow-core/src/config/pipeline/isb.rs index 704d19de08..c010f9a15d 100644 --- a/rust/numaflow-core/src/config/pipeline/isb.rs +++ b/rust/numaflow-core/src/config/pipeline/isb.rs @@ -3,7 +3,6 @@ use std::fmt; use std::time::Duration; const DEFAULT_PARTITION_IDX: u16 = 0; -const DEFAULT_BATCH_SIZE: usize = 500; const DEFAULT_PARTITIONS: u16 = 1; const DEFAULT_MAX_LENGTH: usize = 30000; const DEFAULT_USAGE_LIMIT: f64 = 0.8; @@ -11,7 +10,6 @@ const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 1; const DEFAULT_BUFFER_FULL_STRATEGY: BufferFullStrategy = BufferFullStrategy::RetryUntilSuccess; const DEFAULT_RETRY_INTERVAL_MILLIS: u64 = 10; const DEFAULT_WIP_ACK_INTERVAL_MILLIS: u64 = 1000; -const DEFAULT_READ_TIMEOUT_MILLIS: u64 = 1000; pub(crate) mod jetstream { const DEFAULT_URL: &str = "localhost:4222"; @@ -78,8 +76,6 @@ impl fmt::Display for BufferFullStrategy { pub(crate) struct BufferReaderConfig { pub(crate) partitions: u16, pub(crate) streams: Vec<(String, u16)>, - pub(crate) batch_size: usize, - pub(crate) read_timeout: Duration, pub(crate) wip_ack_interval: Duration, } @@ -88,9 +84,7 @@ impl Default for BufferReaderConfig { BufferReaderConfig { partitions: DEFAULT_PARTITIONS, streams: vec![("default-0".to_string(), DEFAULT_PARTITION_IDX)], - batch_size: DEFAULT_BATCH_SIZE, wip_ack_interval: Duration::from_millis(DEFAULT_WIP_ACK_INTERVAL_MILLIS), - read_timeout: Duration::from_millis(DEFAULT_READ_TIMEOUT_MILLIS), } } } @@ -145,9 +139,7 @@ mod tests { let expected = BufferReaderConfig { partitions: DEFAULT_PARTITIONS, streams: vec![("default-0".to_string(), DEFAULT_PARTITION_IDX)], - batch_size: DEFAULT_BATCH_SIZE, wip_ack_interval: Duration::from_millis(DEFAULT_WIP_ACK_INTERVAL_MILLIS), - read_timeout: Duration::from_millis(DEFAULT_READ_TIMEOUT_MILLIS), }; let config = BufferReaderConfig::default(); assert_eq!(config, expected); diff --git a/rust/numaflow-core/src/pipeline.rs b/rust/numaflow-core/src/pipeline.rs index a9724780ea..f5896b3cc7 100644 --- a/rust/numaflow-core/src/pipeline.rs +++ b/rust/numaflow-core/src/pipeline.rs @@ -1,11 +1,11 @@ -use std::collections::HashMap; - -use async_nats::jetstream; use async_nats::jetstream::Context; +use async_nats::{jetstream, ConnectOptions}; use futures::future::try_join_all; use numaflow_pb::clients::sink::sink_client::SinkClient; use numaflow_pb::clients::source::source_client::SourceClient; use numaflow_pb::clients::sourcetransformer::source_transform_client::SourceTransformClient; +use std::collections::HashMap; +use std::time::Duration; use tokio_util::sync::CancellationToken; use tonic::transport::Channel; @@ -289,17 +289,22 @@ async fn create_transformer( /// Creates a jetstream context based on the provided configuration async fn create_js_context(config: pipeline::isb::jetstream::ClientConfig) -> Result { - let js_client = match (config.user, config.password) { - (Some(user), Some(password)) => { - async_nats::connect_with_options( - config.url, - async_nats::ConnectOptions::with_user_and_password(user, password), - ) - .await - } - _ => async_nats::connect(config.url).await, + // TODO: make these configurable. today this is hardcoded on Golang code too. + let mut opts = ConnectOptions::new() + .max_reconnects(None) // -1 for unlimited reconnects + .ping_interval(Duration::from_secs(3)) + .max_reconnects(None) + .ping_interval(Duration::from_secs(3)) + .retry_on_initial_connect(); + + if let (Some(user), Some(password)) = (config.user, config.password) { + opts = opts.user_and_password(user, password); } - .map_err(|e| error::Error::Connection(e.to_string()))?; + + let js_client = async_nats::connect_with_options(&config.url, opts) + .await + .map_err(|e| error::Error::Connection(e.to_string()))?; + Ok(jetstream::new(js_client)) } @@ -562,8 +567,6 @@ mod tests { .enumerate() .map(|(i, key)| (key.to_string(), i as u16)) .collect(), - batch_size: 500, - read_timeout: Duration::from_secs(1), wip_ack_interval: Duration::from_secs(1), }, partitions: 0, diff --git a/rust/numaflow-core/src/pipeline/forwarder/source_forwarder.rs b/rust/numaflow-core/src/pipeline/forwarder/source_forwarder.rs index 5dd94290de..9ba2ba94fd 100644 --- a/rust/numaflow-core/src/pipeline/forwarder/source_forwarder.rs +++ b/rust/numaflow-core/src/pipeline/forwarder/source_forwarder.rs @@ -162,6 +162,7 @@ impl Forwarder { /// Writes messages to the jetstream, it writes to all the downstream buffers. async fn write_to_jetstream(&mut self, messages: Vec) -> Result<(), Error> { + let start_time = tokio::time::Instant::now(); if messages.is_empty() { return Ok(()); } @@ -186,6 +187,11 @@ impl Forwarder { .await .map_err(|e| Error::Forwarder(format!("Failed to write to jetstream {:?}", e)))??; } + debug!( + len = messages.len(), + elapsed_ms = start_time.elapsed().as_millis(), + "Wrote messages to jetstream", + ); Ok(()) } } diff --git a/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs b/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs index 5f3d2926bf..46faf2e95b 100644 --- a/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs +++ b/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs @@ -3,13 +3,14 @@ use std::time::Duration; use async_nats::jetstream::{ consumer::PullConsumer, AckKind, Context, Message as JetstreamMessage, }; + use tokio::sync::mpsc::Receiver; use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinHandle; use tokio::time::{self, Instant}; use tokio_stream::StreamExt; use tokio_util::sync::CancellationToken; -use tracing::{error, warn}; +use tracing::{debug, error, info, warn}; use crate::config::pipeline::isb::BufferReaderConfig; use crate::config::pipeline::PipelineConfig; @@ -72,7 +73,8 @@ impl JetstreamReader { cancel_token: CancellationToken, pipeline_config: &PipelineConfig, ) -> Result<(Receiver, JoinHandle>)> { - let (messages_tx, messages_rx) = mpsc::channel(2 * self.config.batch_size); + // FIXME: factor of 2 should be configurable, at the least a const + let (messages_tx, messages_rx) = mpsc::channel(2 * pipeline_config.batch_size); let handle: JoinHandle> = tokio::spawn({ let this = self.clone(); @@ -104,41 +106,44 @@ impl JetstreamReader { .messages() .await .unwrap() - .chunks_timeout(this.config.batch_size, this.config.read_timeout); + .chunks_timeout(pipeline_config.batch_size, pipeline_config.read_timeout); tokio::pin!(chunk_stream); // The .next() call will not return if there is no data even if read_timeout is // reached. + let mut total_messages = 0; + let mut chunk_time = Instant::now(); + let mut start_time = Instant::now(); while let Some(messages) = chunk_stream.next().await { + debug!( + len = messages.len(), + elapsed_ms = chunk_time.elapsed().as_millis(), + "Received messages from Jetstream", + ); + total_messages += messages.len(); for message in messages { - let jetstream_message = match message { - Ok(message) => message, - Err(e) => { - error!(?e, "Failed to fetch messages from the Jetstream"); - continue; - } - }; - - let msg_info = match jetstream_message.info() { - Ok(info) => info, - Err(e) => { - error!(?e, "Failed to get message info from Jetstream"); - continue; - } - }; + let jetstream_message = message.map_err(|e| { + Error::ISB(format!( + "Error while fetching message from Jetstream: {:?}", + e + )) + })?; + + let msg_info = jetstream_message.info().map_err(|e| { + Error::ISB(format!( + "Error while fetching message info from Jetstream: {:?}", + e + )) + })?; let mut message: Message = - match jetstream_message.payload.clone().try_into() { - Ok(message) => message, - Err(e) => { - error!( - ?e, - "Failed to parse message payload received from Jetstream" - ); - continue; - } - }; + jetstream_message.payload.clone().try_into().map_err(|e| { + Error::ISB(format!( + "Error while converting Jetstream message to Message: {:?}", + e + )) + })?; message.offset = Some(Offset::Int(IntOffset::new( msg_info.stream_sequence, @@ -158,21 +163,31 @@ impl JetstreamReader { ack: ack_tx, }; - if messages_tx.send(read_message).await.is_err() { - error!("Failed to send message to the channel"); - return Ok(()); - } + messages_tx.send(read_message).await.map_err(|e| { + Error::ISB(format!("Error while sending message to channel: {:?}", e)) + })?; forward_pipeline_metrics() .forwarder .data_read .get_or_create(labels) .inc(); + + if start_time.elapsed() >= Duration::from_millis(1000) { + info!( + len = total_messages, + elapsed_ms = start_time.elapsed().as_millis(), + "Total messages read from Jetstream" + ); + start_time = Instant::now(); + total_messages = 0; + } } if cancel_token.is_cancelled() { warn!("Cancellation token is cancelled. Exiting JetstreamReader"); break; } + chunk_time = Instant::now(); } Ok(()) } @@ -279,8 +294,6 @@ mod tests { let buf_reader_config = BufferReaderConfig { partitions: 0, streams: vec![], - batch_size: 2, - read_timeout: Duration::from_millis(1000), wip_ack_interval: Duration::from_millis(5), }; let js_reader = JetstreamReader::new( diff --git a/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs b/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs index 65d10963ca..9fbc7603a9 100644 --- a/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs +++ b/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs @@ -12,7 +12,7 @@ use tokio::sync::mpsc::Receiver; use tokio::sync::{mpsc, oneshot}; use tokio::time::sleep; use tokio_util::sync::CancellationToken; -use tracing::{error, info, warn}; +use tracing::{debug, error, info, warn}; use crate::config::pipeline::isb::BufferWriterConfig; use crate::error::Error; @@ -207,7 +207,7 @@ impl JetstreamWriter { /// an error it means it is fatal non-retryable error. pub(super) async fn blocking_write(&self, payload: Vec) -> Result { let js_ctx = self.js_ctx.clone(); - + let start_time = tokio::time::Instant::now(); loop { match js_ctx .publish(self.stream_name.clone(), Bytes::from(payload.clone())) @@ -219,8 +219,12 @@ impl JetstreamWriter { // should we return an error here? Because duplicate messages are not fatal // But it can mess up the watermark progression because the offset will be // same as the previous message offset - warn!("Duplicate message detected, ignoring {:?}", ack); + warn!(ack = ?ack, "Duplicate message detected, ignoring"); } + debug!( + elapsed_ms = start_time.elapsed().as_millis(), + "Blocking write successful in", + ); return Ok(ack); } Err(e) => {