Skip to content

Commit

Permalink
fix: watermark in sqs
Browse files Browse the repository at this point in the history
Signed-off-by: Shrivardhan Rao <[email protected]>
  • Loading branch information
cosmic-chichu committed Feb 11, 2025
1 parent 4d4e9d6 commit c2ff43f
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 90 deletions.
107 changes: 48 additions & 59 deletions rust/extns/numaflow-sqs/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,6 @@ use crate::{Error, Result};

pub const SQS_DEFAULT_REGION: &str = "us-west-2";

/// Returns the name of the SQS message system attribute for the sent timestamp.
///
/// This function is needed to retrieve the `SentTimestamp` attribute from SQS messages,
/// which is used to set the watermark for message processing. The watermark helps
/// in tracking the event time of messages and ensuring that messages are processed
/// in the correct order.
pub fn sent_timestamp_message_system_attribute_name() -> &'static str {
MessageSystemAttributeName::SentTimestamp.as_str()
}

/// Configuration for an SQS message source.
///
/// Used to initialize the SQS client with region and queue settings.
Expand Down Expand Up @@ -199,6 +189,9 @@ impl SqsActor {
let key = msg.message_id.clone().unwrap_or_default();
let payload = Bytes::from(msg.body.clone().unwrap_or_default());
let offset = msg.receipt_handle.clone().unwrap_or_default();

// event_time is taken from the SentTimestamp system attribute (epoch time in milliseconds), if available
// see: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html#API_ReceiveMessage_RequestSyntax
let event_time = msg
.attributes
.as_ref()
Expand Down Expand Up @@ -338,17 +331,17 @@ pub struct SqsSourceBuilder {

impl Default for SqsSourceBuilder {
fn default() -> Self {
Self::new()
Self::new(SqsSourceConfig {
region: SQS_DEFAULT_REGION.to_string(),
queue_name: "".to_string(),
})
}
}

impl SqsSourceBuilder {
pub fn new() -> Self {
pub fn new(config: SqsSourceConfig) -> Self {
Self {
config: SqsSourceConfig {
region: SQS_DEFAULT_REGION.to_string(),
queue_name: "".to_string(),
},
config,
batch_size: 1,
timeout: Duration::from_secs(1),
client: None,
Expand Down Expand Up @@ -540,17 +533,16 @@ mod tests {
let sqs_mock_client =
Client::from_conf(get_test_config_with_interceptor(sqs_operation_mocks));

let source = SqsSourceBuilder::new()
.config(SqsSourceConfig {
region: SQS_DEFAULT_REGION.to_string(),
queue_name: "test-q".to_string(),
})
.batch_size(1)
.timeout(Duration::from_secs(0))
.client(sqs_mock_client)
.build()
.await
.unwrap();
let source = SqsSourceBuilder::new(SqsSourceConfig {
region: SQS_DEFAULT_REGION.to_string(),
queue_name: "test-q".to_string(),
})
.batch_size(1)
.timeout(Duration::from_secs(0))
.client(sqs_mock_client)
.build()
.await
.unwrap();

// Read messages from the source
let messages = source.read_messages().await.unwrap();
Expand Down Expand Up @@ -581,17 +573,16 @@ mod tests {
let sqs_mock_client =
Client::from_conf(get_test_config_with_interceptor(sqs_operation_mocks));

let source = SqsSourceBuilder::new()
.config(SqsSourceConfig {
region: SQS_DEFAULT_REGION.to_string(),
queue_name: "test-q".to_string(),
})
.batch_size(1)
.timeout(Duration::from_secs(0))
.client(sqs_mock_client)
.build()
.await
.unwrap();
let source = SqsSourceBuilder::new(SqsSourceConfig {
region: SQS_DEFAULT_REGION.to_string(),
queue_name: "test-q".to_string(),
})
.batch_size(1)
.timeout(Duration::from_secs(0))
.client(sqs_mock_client)
.build()
.await
.unwrap();

// Test acknowledgment
let offset = "AQEBaZ+j5qUoOAoxlmrCQPkBm9njMWXqemmIG6shMHCO6fV20JrQYg/AiZ8JELwLwOu5U61W+aIX5Qzu7GGofxJuvzymr4Ph53RiR0mudj4InLSgpSspYeTRDteBye5tV/txbZDdNZxsi+qqZA9xPnmMscKQqF6pGhnGIKrnkYGl45Nl6GPIZv62LrIRb6mSqOn1fn0yqrvmWuuY3w2UzQbaYunJWGxpzZze21EOBtywknU3Je/g7G9is+c6K9hGniddzhLkK1tHzZKjejOU4jokaiB4nmi0dF3JqLzDsQuPF0Gi8qffhEvw56nl8QCbluSJScFhJYvoagGnDbwOnd9z50L239qtFIgETdpKyirlWwl/NGjWJ45dqWpiW3d2Ws7q";
Expand All @@ -612,17 +603,16 @@ mod tests {
let sqs_mock_client =
Client::from_conf(get_test_config_with_interceptor(sqs_operation_mocks));

let source = SqsSourceBuilder::new()
.config(SqsSourceConfig {
region: SQS_DEFAULT_REGION.to_string(),
queue_name: "test-q".to_string(),
})
.batch_size(1)
.timeout(Duration::from_secs(0))
.client(sqs_mock_client)
.build()
.await
.unwrap();
let source = SqsSourceBuilder::new(SqsSourceConfig {
region: SQS_DEFAULT_REGION.to_string(),
queue_name: "test-q".to_string(),
})
.batch_size(1)
.timeout(Duration::from_secs(0))
.client(sqs_mock_client)
.build()
.await
.unwrap();

let count = source.pending_count().await;
assert_eq!(count, Some(0));
Expand All @@ -638,16 +628,15 @@ mod tests {
let sqs_mock_client =
Client::from_conf(get_test_config_with_interceptor(sqs_operation_mocks));

let source = SqsSourceBuilder::new()
.config(SqsSourceConfig {
region: "invalid-region".to_string(),
queue_name: "test-q".to_string(),
})
.batch_size(1)
.timeout(Duration::from_secs(0))
.client(sqs_mock_client)
.build()
.await;
let source = SqsSourceBuilder::new(SqsSourceConfig {
region: SQS_DEFAULT_REGION.to_string(),
queue_name: "test-q".to_string(),
})
.batch_size(1)
.timeout(Duration::from_secs(0))
.client(sqs_mock_client)
.build()
.await;
assert!(source.is_err());
}

Expand Down
44 changes: 13 additions & 31 deletions rust/numaflow-core/src/source/sqs.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
use std::sync::Arc;
use std::time::Duration;

use chrono::DateTime;
use numaflow_sqs::source::{
sent_timestamp_message_system_attribute_name, SqsMessage, SqsSource, SqsSourceBuilder,
SqsSourceConfig,
};
use numaflow_sqs::source::{SqsMessage, SqsSource, SqsSourceBuilder, SqsSourceConfig};

use crate::config::{get_vertex_name, get_vertex_replica};
use crate::error::Error;
Expand All @@ -18,25 +14,13 @@ impl TryFrom<SqsMessage> for Message {
fn try_from(message: SqsMessage) -> crate::Result<Self> {
let offset = Offset::String(StringOffset::new(message.offset, *get_vertex_replica()));

// find the SentTimestamp header and convert it to a chrono::DateTime<Utc>
let sent_timestamp = match message
.headers
.get(sent_timestamp_message_system_attribute_name())
{
Some(sent_timestamp_str) => sent_timestamp_str
.parse::<i64>()
.ok()
.and_then(DateTime::from_timestamp_millis),
None => None,
};

Ok(Message {
keys: Arc::from(vec![message.key]),
tags: None,
value: message.payload,
offset: offset.clone(),
event_time: message.event_time,
watermark: sent_timestamp,
watermark: Some(message.event_time),
id: MessageID {
vertex_name: get_vertex_name().to_string().into(),
offset: offset.to_string().into(),
Expand Down Expand Up @@ -65,8 +49,7 @@ pub(crate) async fn new_sqs_source(
batch_size: usize,
timeout: Duration,
) -> crate::Result<SqsSource> {
Ok(SqsSourceBuilder::new()
.config(cfg)
Ok(SqsSourceBuilder::new(cfg)
.batch_size(batch_size)
.timeout(timeout)
.build()
Expand Down Expand Up @@ -179,17 +162,16 @@ pub mod tests {
let sqs_client =
aws_sdk_sqs::Client::from_conf(get_test_config_with_interceptor(sqs_operation_mocks));

let sqs_source = SqsSourceBuilder::new()
.config(SqsSourceConfig {
region: SQS_DEFAULT_REGION.to_string(),
queue_name: "test-q".to_string(),
})
.batch_size(1)
.timeout(Duration::from_secs(1))
.client(sqs_client)
.build()
.await
.unwrap();
let sqs_source = SqsSourceBuilder::new(SqsSourceConfig {
region: SQS_DEFAULT_REGION.to_string(),
queue_name: "test-q".to_string(),
})
.batch_size(1)
.timeout(Duration::from_secs(1))
.client(sqs_client)
.build()
.await
.unwrap();

// create SQS source with test client
use crate::tracker::TrackerHandle;
Expand Down

0 comments on commit c2ff43f

Please sign in to comment.