From 1606bec29aae64391e69429009f5084715460f4e Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Wed, 19 Feb 2025 23:36:52 +0530 Subject: [PATCH] fix unit tests Signed-off-by: Yashash H L --- rust/numaflow-core/src/config/pipeline.rs | 17 +--- rust/numaflow-core/src/mapper/map.rs | 99 ++++++++++--------- .../src/mapper/map/user_defined.rs | 17 ++-- .../src/pipeline/isb/jetstream/reader.rs | 11 ++- rust/numaflow-core/src/sink/user_defined.rs | 19 ++-- rust/numaflow-core/src/source.rs | 13 ++- rust/numaflow-core/src/source/user_defined.rs | 32 +++--- rust/numaflow-core/src/tracker.rs | 2 +- .../src/transformer/user_defined.rs | 17 ++-- 9 files changed, 117 insertions(+), 110 deletions(-) diff --git a/rust/numaflow-core/src/config/pipeline.rs b/rust/numaflow-core/src/config/pipeline.rs index 7696bc87f2..1d272ae9cc 100644 --- a/rust/numaflow-core/src/config/pipeline.rs +++ b/rust/numaflow-core/src/config/pipeline.rs @@ -733,22 +733,7 @@ mod tests { transformer_config: None, }), metrics_config: Default::default(), - watermark_config: Some(WatermarkConfig::Source(SourceWatermarkConfig { - max_delay: Default::default(), - source_bucket_config: BucketConfig { - vertex: "in", - partitions: 1, - ot_bucket: "default-simple-pipeline-in_SOURCE_OT", - hb_bucket: "default-simple-pipeline-in_SOURCE_PROCESSORS", - }, - to_vertex_bucket_config: vec![BucketConfig { - vertex: "out", - partitions: 1, - ot_bucket: "default-simple-pipeline-in-out_OT", - hb_bucket: "default-simple-pipeline-in-out_PROCESSORS", - }], - idle_config: None, - })), + watermark_config: None, ..Default::default() }; diff --git a/rust/numaflow-core/src/mapper/map.rs b/rust/numaflow-core/src/mapper/map.rs index 3364f93548..8145d2d3e6 100644 --- a/rust/numaflow-core/src/mapper/map.rs +++ b/rust/numaflow-core/src/mapper/map.rs @@ -247,7 +247,7 @@ impl MapHandle { Some(error) = error_rx.recv() => { // when we get an error we cancel the token to signal the upstream to stop // sending new messages, and we empty the input stream and return the error. - if !self.final_result.is_err() { + if self.final_result.is_ok() { error!(?error, "error received while performing unary map operation"); cln_token.cancel(); self.final_result = Err(error); @@ -325,7 +325,7 @@ impl MapHandle { Some(error) = error_rx.recv() => { // when we get an error we cancel the token to signal the upstream to stop // sending new messages, and we empty the input stream and return the error. - if !self.final_result.is_err() { + if self.final_result.is_ok() { error!(?error, "error received while performing stream map operation"); cln_token.cancel(); self.final_result = Err(error); @@ -577,7 +577,6 @@ impl MapHandle { output_tx.send(mapped_message).await.expect("failed to send response"); } Some(Err(e)) => { - error!(?e, "failed to map message"); tracker_handle .discard(read_msg.offset) .await @@ -614,17 +613,17 @@ impl MapHandle { mod tests { use std::time::Duration; - use numaflow::{batchmap, map, mapstream}; - use numaflow_pb::clients::map::map_client::MapClient; - use tempfile::TempDir; - use tokio::sync::{mpsc::Sender, oneshot}; - use super::*; use crate::{ message::{MessageID, Offset, StringOffset}, shared::grpc::create_rpc_channel, Result, }; + use numaflow::{batchmap, map, mapstream}; + use numaflow_pb::clients::map::map_client::MapClient; + use tempfile::TempDir; + use tokio::sync::{mpsc::Sender, oneshot}; + use tokio::time::sleep; struct SimpleMapper; @@ -857,29 +856,33 @@ mod tests { let (input_tx, input_rx) = mpsc::channel(10); let input_stream = ReceiverStream::new(input_rx); - let message = Message { - typ: Default::default(), - keys: Arc::from(vec!["first".into()]), - tags: None, - value: "hello".into(), - offset: Offset::String(StringOffset::new("0".to_string(), 0)), - event_time: chrono::Utc::now(), - watermark: None, - id: MessageID { - vertex_name: "vertex_name".to_string().into(), - offset: "0".to_string().into(), - index: 0, - }, - headers: Default::default(), - metadata: None, - }; - - input_tx.send(message).await.unwrap(); - let (_output_stream, map_handle) = mapper .streaming_map(input_stream, CancellationToken::new()) .await?; + // send 10 requests to the mapper + for i in 0..10 { + let message = Message { + typ: Default::default(), + keys: Arc::from(vec![format!("key_{}", i)]), + tags: None, + value: format!("value_{}", i).into(), + offset: Offset::String(StringOffset::new(i.to_string(), 0)), + event_time: chrono::Utc::now(), + watermark: None, + id: MessageID { + vertex_name: "vertex_name".to_string().into(), + offset: i.to_string().into(), + index: i, + }, + headers: Default::default(), + metadata: None, + }; + input_tx.send(message).await.unwrap(); + sleep(Duration::from_millis(10)).await; + } + + drop(input_tx); // Await the join handle and expect an error due to the panic let result = map_handle.await.unwrap(); assert!(result.is_err(), "Expected an error due to panic"); @@ -1277,32 +1280,36 @@ mod tests { ) .await?; - let message = Message { - typ: Default::default(), - keys: Arc::from(vec!["first".into()]), - tags: None, - value: "panic".into(), - offset: Offset::String(StringOffset::new("0".to_string(), 0)), - event_time: chrono::Utc::now(), - watermark: None, - id: MessageID { - vertex_name: "vertex_name".to_string().into(), - offset: "0".to_string().into(), - index: 0, - }, - headers: Default::default(), - metadata: None, - }; - let (input_tx, input_rx) = mpsc::channel(10); let input_stream = ReceiverStream::new(input_rx); - input_tx.send(message).await.unwrap(); - let (_output_stream, map_handle) = mapper .streaming_map(input_stream, CancellationToken::new()) .await?; + // send 10 requests to the mapper + for i in 0..10 { + let message = Message { + typ: Default::default(), + keys: Arc::from(vec![format!("key_{}", i)]), + tags: None, + value: format!("value_{}", i).into(), + offset: Offset::String(StringOffset::new(i.to_string(), 0)), + event_time: chrono::Utc::now(), + watermark: None, + id: MessageID { + vertex_name: "vertex_name".to_string().into(), + offset: i.to_string().into(), + index: i, + }, + headers: Default::default(), + metadata: None, + }; + input_tx.send(message).await.unwrap(); + sleep(Duration::from_millis(10)).await; + } + + drop(input_tx); // Await the join handle and expect an error due to the panic let result = map_handle.await.unwrap(); assert!(result.is_err(), "Expected an error due to panic"); diff --git a/rust/numaflow-core/src/mapper/map/user_defined.rs b/rust/numaflow-core/src/mapper/map/user_defined.rs index bbfb52f4b8..b4b2106690 100644 --- a/rust/numaflow-core/src/mapper/map/user_defined.rs +++ b/rust/numaflow-core/src/mapper/map/user_defined.rs @@ -274,16 +274,17 @@ async fn create_response_stream( let mut resp_stream = client .map_fn(Request::new(ReceiverStream::new(read_rx))) .await - .map_err(|e| Error::Grpc(e))? + .map_err(Error::Grpc)? .into_inner(); - let handshake_response = resp_stream - .message() - .await - .map_err(|e| Error::Grpc(e))? - .ok_or(Error::Mapper( - "failed to receive handshake response".to_string(), - ))?; + let handshake_response = + resp_stream + .message() + .await + .map_err(Error::Grpc)? + .ok_or(Error::Mapper( + "failed to receive handshake response".to_string(), + ))?; if handshake_response.handshake.map_or(true, |h| !h.sot) { return Err(Error::Mapper("invalid handshake response".to_string())); diff --git a/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs b/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs index fb4b7ae04f..d421403521 100644 --- a/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs +++ b/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs @@ -457,12 +457,13 @@ mod tests { streams: vec![], wip_ack_interval: Duration::from_millis(5), }; + let tracker = TrackerHandle::new(None, None); let js_reader = JetStreamReader::new( "Map".to_string(), stream.clone(), context.clone(), buf_reader_config, - TrackerHandle::new(None, None), + tracker.clone(), 500, None, ) @@ -475,13 +476,16 @@ mod tests { .await .unwrap(); + let mut offsets = vec![]; for i in 0..10 { + let offset = Offset::Int(IntOffset::new(i + 1, 0)); + offsets.push(offset.clone()); let message = Message { typ: Default::default(), keys: Arc::from(vec![format!("key_{}", i)]), tags: None, value: format!("message {}", i).as_bytes().to_vec().into(), - offset: Offset::Int(IntOffset::new(i, 0)), + offset, event_time: Utc::now(), watermark: None, id: MessageID { @@ -513,6 +517,9 @@ mod tests { "Expected 10 messages from the jetstream reader" ); + for offset in offsets { + tracker.discard(offset).await.unwrap(); + } reader_cancel_token.cancel(); js_reader_task.await.unwrap().unwrap(); diff --git a/rust/numaflow-core/src/sink/user_defined.rs b/rust/numaflow-core/src/sink/user_defined.rs index c70c6eeb4a..7152f37657 100644 --- a/rust/numaflow-core/src/sink/user_defined.rs +++ b/rust/numaflow-core/src/sink/user_defined.rs @@ -57,18 +57,19 @@ impl UserDefinedSink { let mut resp_stream = client .sink_fn(Request::new(sink_stream)) .await - .map_err(|e| Error::Grpc(e))? + .map_err(Error::Grpc)? .into_inner(); // First response from the server will be the handshake response. We need to check if the // server has accepted the handshake. - let handshake_response = resp_stream - .message() - .await - .map_err(|e| Error::Grpc(e))? - .ok_or(Error::Sink( - "failed to receive handshake response".to_string(), - ))?; + let handshake_response = + resp_stream + .message() + .await + .map_err(Error::Grpc)? + .ok_or(Error::Sink( + "failed to receive handshake response".to_string(), + ))?; // Handshake cannot be None during the initial phase, and it has to set `sot` to true. if handshake_response.handshake.map_or(true, |h| !h.sot) { @@ -118,7 +119,7 @@ impl Sink for UserDefinedSink { .resp_stream .message() .await - .map_err(|e| Error::Grpc(e))? + .map_err(Error::Grpc)? .ok_or(Error::Sink("failed to receive response".to_string()))?; if response.status.is_some_and(|s| s.eot) { diff --git a/rust/numaflow-core/src/source.rs b/rust/numaflow-core/src/source.rs index af041a8a49..fe1c421046 100644 --- a/rust/numaflow-core/src/source.rs +++ b/rust/numaflow-core/src/source.rs @@ -371,6 +371,7 @@ impl Source { for message in messages.iter() { let (resp_ack_tx, resp_ack_rx) = oneshot::channel(); let offset = message.offset.clone(); + println!("offset: {:?}", offset); // insert the offset and the ack one shot in the tracker. self.tracker_handle.insert(message, resp_ack_tx).await?; @@ -660,10 +661,11 @@ mod tests { .map_err(|e| panic!("failed to create source reader: {:?}", e)) .unwrap(); + let tracker = TrackerHandle::new(None, None); let source = Source::new( 5, SourceType::UserDefinedSource(src_read, src_ack, lag_reader), - TrackerHandle::new(None, None), + tracker.clone(), true, None, None, @@ -683,7 +685,11 @@ mod tests { } // ack all the messages - Source::ack(sender.clone(), offsets).await.unwrap(); + Source::ack(sender.clone(), offsets.clone()).await.unwrap(); + + for offset in offsets { + tracker.discard(offset).await.unwrap(); + } // since we acked all the messages, pending should be 0 let pending = source.pending().await.unwrap(); @@ -692,6 +698,9 @@ mod tests { let partitions = Source::partitions(sender.clone()).await.unwrap(); assert_eq!(partitions, vec![1, 2]); + drop(source); + drop(sender); + cln_token.cancel(); let _ = handle.await.unwrap(); let _ = shutdown_tx.send(()); diff --git a/rust/numaflow-core/src/source/user_defined.rs b/rust/numaflow-core/src/source/user_defined.rs index 91318632b6..4e2e635449 100644 --- a/rust/numaflow-core/src/source/user_defined.rs +++ b/rust/numaflow-core/src/source/user_defined.rs @@ -90,18 +90,19 @@ impl UserDefinedSourceRead { let mut resp_stream = client .read_fn(Request::new(read_stream)) .await - .map_err(|e| Error::Grpc(e))? + .map_err(Error::Grpc)? .into_inner(); // first response from the server will be the handshake response. We need to check if the // server has accepted the handshake. - let handshake_response = resp_stream - .message() - .await - .map_err(|e| Error::Grpc(e))? - .ok_or(Error::Source( - "failed to receive handshake response".to_string(), - ))?; + let handshake_response = + resp_stream + .message() + .await + .map_err(Error::Grpc)? + .ok_or(Error::Source( + "failed to receive handshake response".to_string(), + ))?; // handshake cannot to None during the initial phase, and it has to set `sot` to true. if handshake_response.handshake.map_or(true, |h| !h.sot) { return Err(Error::Source("invalid handshake response".to_string())); @@ -183,12 +184,7 @@ impl SourceReader for UserDefinedSourceRead { let mut messages = Vec::with_capacity(self.num_records); - while let Some(response) = self - .resp_stream - .message() - .await - .map_err(|e| Error::Grpc(e))? - { + while let Some(response) = self.resp_stream.message().await.map_err(Error::Grpc)? { if response.status.is_some_and(|status| status.eot) { break; } @@ -247,7 +243,7 @@ impl UserDefinedSourceAck { let mut ack_resp_stream = client .ack_fn(Request::new(ack_stream)) .await - .map_err(|e| Error::Grpc(e))? + .map_err(Error::Grpc)? .into_inner(); // first response from the server will be the handshake response. We need to check if the @@ -255,7 +251,7 @@ impl UserDefinedSourceAck { let ack_handshake_response = ack_resp_stream .message() .await - .map_err(|e| Error::Grpc(e))? + .map_err(Error::Grpc)? .ok_or(Error::Source( "failed to receive ack handshake response".to_string(), ))?; @@ -287,7 +283,7 @@ impl SourceAcker for UserDefinedSourceAck { .ack_resp_stream .message() .await - .map_err(|e| Error::Grpc(e))? + .map_err(Error::Grpc)? .ok_or(Error::Source("failed to receive ack response".to_string()))?; Ok(()) @@ -311,7 +307,7 @@ impl LagReader for UserDefinedSourceLagReader { .source_client .pending_fn(Request::new(())) .await - .map_err(|e| Error::Grpc(e))? + .map_err(Error::Grpc)? .into_inner() .result .map(|r| r.count as usize)) diff --git a/rust/numaflow-core/src/tracker.rs b/rust/numaflow-core/src/tracker.rs index bc5cbf886b..1307c55d2f 100644 --- a/rust/numaflow-core/src/tracker.rs +++ b/rust/numaflow-core/src/tracker.rs @@ -131,7 +131,7 @@ impl TryFrom<&Message> for ServingCallbackInfo { impl Drop for Tracker { fn drop(&mut self) { - if self.entries.len() > 0 { + if !self.entries.is_empty() { error!("Tracker dropped with non-empty entries: {:?}", self.entries); } } diff --git a/rust/numaflow-core/src/transformer/user_defined.rs b/rust/numaflow-core/src/transformer/user_defined.rs index a0a36407b9..237e52e557 100644 --- a/rust/numaflow-core/src/transformer/user_defined.rs +++ b/rust/numaflow-core/src/transformer/user_defined.rs @@ -77,16 +77,17 @@ impl UserDefinedTransformer { let mut resp_stream = client .source_transform_fn(Request::new(read_stream)) .await - .map_err(|e| Error::Grpc(e))? + .map_err(Error::Grpc)? .into_inner(); - let handshake_response = resp_stream - .message() - .await - .map_err(|e| Error::Grpc(e))? - .ok_or(Error::Transformer( - "failed to receive handshake response".to_string(), - ))?; + let handshake_response = + resp_stream + .message() + .await + .map_err(Error::Grpc)? + .ok_or(Error::Transformer( + "failed to receive handshake response".to_string(), + ))?; if handshake_response.handshake.map_or(true, |h| !h.sot) { return Err(Error::Transformer("invalid handshake response".to_string()));