From c2ff43f47a74728e31584e9268eb0647c868f223 Mon Sep 17 00:00:00 2001 From: Shrivardhan Rao Date: Mon, 10 Feb 2025 19:45:44 -0800 Subject: [PATCH] fix: watermark in sqs Signed-off-by: Shrivardhan Rao --- rust/extns/numaflow-sqs/src/source.rs | 107 ++++++++++++-------------- rust/numaflow-core/src/source/sqs.rs | 44 ++++------- 2 files changed, 61 insertions(+), 90 deletions(-) diff --git a/rust/extns/numaflow-sqs/src/source.rs b/rust/extns/numaflow-sqs/src/source.rs index 3fe9c2e7c7..8da838ec5a 100644 --- a/rust/extns/numaflow-sqs/src/source.rs +++ b/rust/extns/numaflow-sqs/src/source.rs @@ -22,16 +22,6 @@ use crate::{Error, Result}; pub const SQS_DEFAULT_REGION: &str = "us-west-2"; -/// Returns the name of the SQS message system attribute for the sent timestamp. -/// -/// This method is needed to retrieve the `SentTimestamp` attribute from SQS messages, -/// which is used to set the watermark for message processing. The watermark helps -/// in tracking the event time of messages and ensuring that messages are processed -/// in the correct order. -pub fn sent_timestamp_message_system_attribute_name() -> &'static str { - MessageSystemAttributeName::SentTimestamp.as_str() -} - /// Configuration for an SQS message source. /// /// Used to initialize the SQS client with region and queue settings. @@ -199,6 +189,9 @@ impl SqsActor { let key = msg.message_id.clone().unwrap_or_default(); let payload = Bytes::from(msg.body.clone().unwrap_or_default()); let offset = msg.receipt_handle.clone().unwrap_or_default(); + + // event_time is set to match the SentTimestamp attribute if available + // see: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html#API_ReceiveMessage_RequestSyntax let event_time = msg .attributes .as_ref() @@ -338,17 +331,17 @@ pub struct SqsSourceBuilder { impl Default for SqsSourceBuilder { fn default() -> Self { - Self::new() + Self::new(SqsSourceConfig { + region: SQS_DEFAULT_REGION.to_string(), + queue_name: "".to_string(), + }) } } impl SqsSourceBuilder { - pub fn new() -> Self { + pub fn new(config: SqsSourceConfig) -> Self { Self { - config: SqsSourceConfig { - region: SQS_DEFAULT_REGION.to_string(), - queue_name: "".to_string(), - }, + config, batch_size: 1, timeout: Duration::from_secs(1), client: None, @@ -540,17 +533,16 @@ mod tests { let sqs_mock_client = Client::from_conf(get_test_config_with_interceptor(sqs_operation_mocks)); - let source = SqsSourceBuilder::new() - .config(SqsSourceConfig { - region: SQS_DEFAULT_REGION.to_string(), - queue_name: "test-q".to_string(), - }) - .batch_size(1) - .timeout(Duration::from_secs(0)) - .client(sqs_mock_client) - .build() - .await - .unwrap(); + let source = SqsSourceBuilder::new(SqsSourceConfig { + region: SQS_DEFAULT_REGION.to_string(), + queue_name: "test-q".to_string(), + }) + .batch_size(1) + .timeout(Duration::from_secs(0)) + .client(sqs_mock_client) + .build() + .await + .unwrap(); // Read messages from the source let messages = source.read_messages().await.unwrap(); @@ -581,17 +573,16 @@ mod tests { let sqs_mock_client = Client::from_conf(get_test_config_with_interceptor(sqs_operation_mocks)); - let source = SqsSourceBuilder::new() - .config(SqsSourceConfig { - region: SQS_DEFAULT_REGION.to_string(), - queue_name: "test-q".to_string(), - }) - .batch_size(1) - .timeout(Duration::from_secs(0)) - .client(sqs_mock_client) - .build() - .await - .unwrap(); + let source = SqsSourceBuilder::new(SqsSourceConfig { + region: SQS_DEFAULT_REGION.to_string(), + queue_name: "test-q".to_string(), + }) + .batch_size(1) + .timeout(Duration::from_secs(0)) + .client(sqs_mock_client) + .build() + .await + .unwrap(); // Test acknowledgment let offset = "AQEBaZ+j5qUoOAoxlmrCQPkBm9njMWXqemmIG6shMHCO6fV20JrQYg/AiZ8JELwLwOu5U61W+aIX5Qzu7GGofxJuvzymr4Ph53RiR0mudj4InLSgpSspYeTRDteBye5tV/txbZDdNZxsi+qqZA9xPnmMscKQqF6pGhnGIKrnkYGl45Nl6GPIZv62LrIRb6mSqOn1fn0yqrvmWuuY3w2UzQbaYunJWGxpzZze21EOBtywknU3Je/g7G9is+c6K9hGniddzhLkK1tHzZKjejOU4jokaiB4nmi0dF3JqLzDsQuPF0Gi8qffhEvw56nl8QCbluSJScFhJYvoagGnDbwOnd9z50L239qtFIgETdpKyirlWwl/NGjWJ45dqWpiW3d2Ws7q"; @@ -612,17 +603,16 @@ mod tests { let sqs_mock_client = Client::from_conf(get_test_config_with_interceptor(sqs_operation_mocks)); - let source = SqsSourceBuilder::new() - .config(SqsSourceConfig { - region: SQS_DEFAULT_REGION.to_string(), - queue_name: "test-q".to_string(), - }) - .batch_size(1) - .timeout(Duration::from_secs(0)) - .client(sqs_mock_client) - .build() - .await - .unwrap(); + let source = SqsSourceBuilder::new(SqsSourceConfig { + region: SQS_DEFAULT_REGION.to_string(), + queue_name: "test-q".to_string(), + }) + .batch_size(1) + .timeout(Duration::from_secs(0)) + .client(sqs_mock_client) + .build() + .await + .unwrap(); let count = source.pending_count().await; assert_eq!(count, Some(0)); @@ -638,16 +628,15 @@ mod tests { let sqs_mock_client = Client::from_conf(get_test_config_with_interceptor(sqs_operation_mocks)); - let source = SqsSourceBuilder::new() - .config(SqsSourceConfig { - region: "invalid-region".to_string(), - queue_name: "test-q".to_string(), - }) - .batch_size(1) - .timeout(Duration::from_secs(0)) - .client(sqs_mock_client) - .build() - .await; + let source = SqsSourceBuilder::new(SqsSourceConfig { + region: SQS_DEFAULT_REGION.to_string(), + queue_name: "test-q".to_string(), + }) + .batch_size(1) + .timeout(Duration::from_secs(0)) + .client(sqs_mock_client) + .build() + .await; assert!(source.is_err()); } diff --git a/rust/numaflow-core/src/source/sqs.rs b/rust/numaflow-core/src/source/sqs.rs index 35e7856186..3de406ed3d 100644 --- a/rust/numaflow-core/src/source/sqs.rs +++ b/rust/numaflow-core/src/source/sqs.rs @@ -1,11 +1,7 @@ use std::sync::Arc; use std::time::Duration; -use chrono::DateTime; -use numaflow_sqs::source::{ - sent_timestamp_message_system_attribute_name, SqsMessage, SqsSource, SqsSourceBuilder, - SqsSourceConfig, -}; +use numaflow_sqs::source::{SqsMessage, SqsSource, SqsSourceBuilder, SqsSourceConfig}; use crate::config::{get_vertex_name, get_vertex_replica}; use crate::error::Error; @@ -18,25 +14,13 @@ impl TryFrom for Message { fn try_from(message: SqsMessage) -> crate::Result { let offset = Offset::String(StringOffset::new(message.offset, *get_vertex_replica())); - // find the SentTimestamp header and convert it to a chrono::DateTime - let sent_timestamp = match message - .headers - .get(sent_timestamp_message_system_attribute_name()) - { - Some(sent_timestamp_str) => sent_timestamp_str - .parse::() - .ok() - .and_then(DateTime::from_timestamp_millis), - None => None, - }; - Ok(Message { keys: Arc::from(vec![message.key]), tags: None, value: message.payload, offset: offset.clone(), event_time: message.event_time, - watermark: sent_timestamp, + watermark: Some(message.event_time), id: MessageID { vertex_name: get_vertex_name().to_string().into(), offset: offset.to_string().into(), @@ -65,8 +49,7 @@ pub(crate) async fn new_sqs_source( batch_size: usize, timeout: Duration, ) -> crate::Result { - Ok(SqsSourceBuilder::new() - .config(cfg) + Ok(SqsSourceBuilder::new(cfg) .batch_size(batch_size) .timeout(timeout) .build() @@ -179,17 +162,16 @@ pub mod tests { let sqs_client = aws_sdk_sqs::Client::from_conf(get_test_config_with_interceptor(sqs_operation_mocks)); - let sqs_source = SqsSourceBuilder::new() - .config(SqsSourceConfig { - region: SQS_DEFAULT_REGION.to_string(), - queue_name: "test-q".to_string(), - }) - .batch_size(1) - .timeout(Duration::from_secs(1)) - .client(sqs_client) - .build() - .await - .unwrap(); + let sqs_source = SqsSourceBuilder::new(SqsSourceConfig { + region: SQS_DEFAULT_REGION.to_string(), + queue_name: "test-q".to_string(), + }) + .batch_size(1) + .timeout(Duration::from_secs(1)) + .client(sqs_client) + .build() + .await + .unwrap(); // create SQS source with test client use crate::tracker::TrackerHandle;