Skip to content

Commit

Permalink
feat: add sqs source (#2355)
Browse files Browse the repository at this point in the history
Signed-off-by: Vigith Maurice <[email protected]>
Signed-off-by: Shrivardhan Rao <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
cosmic-chichu and vigith authored Feb 11, 2025
1 parent 72a11e9 commit bf0f9db
Show file tree
Hide file tree
Showing 9 changed files with 1,874 additions and 25 deletions.
687 changes: 665 additions & 22 deletions rust/Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ members = [
"numaflow-core",
"numaflow-pb",
"extns/numaflow-pulsar",
"numaflow",
"extns/numaflow-sqs",
"numaflow"
]

[workspace.lints.rust]
Expand Down Expand Up @@ -59,6 +60,7 @@ numaflow-models = { path = "numaflow-models" }
backoff = { path = "backoff" }
numaflow-pb = { path = "numaflow-pb" }
numaflow-pulsar = { path = "extns/numaflow-pulsar" }
numaflow-sqs = { path = "extns/numaflow-sqs" }
tokio = "1.43.0"
bytes = "1.9.0"
tracing = "0.1.41"
Expand Down
25 changes: 25 additions & 0 deletions rust/extns/numaflow-sqs/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[package]
name = "numaflow-sqs"
version = "0.1.0"
edition = "2021"

[lints]
workspace = true

[dependencies]
tokio.workspace = true
tracing.workspace = true
bytes.workspace = true
serde.workspace = true
aws-config = "1.5.11"
aws-sdk-sqs = "1.51.0"
aws-smithy-runtime = { version = "1.7.6", features = ["test-util"] }
aws-smithy-types = "1.2.11"
chrono = "0.4.38"
tonic = "0.12.3"
prost = "0.11.9"
thiserror = "1.0.69"


[dev-dependencies]
aws-smithy-mocks-experimental = "0.2.1"
100 changes: 100 additions & 0 deletions rust/extns/numaflow-sqs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
//! Library for robust SQS message handling using an actor-based architecture.
//!
//! This module provides a fault-tolerant interface for interacting with Amazon SQS,
//! with a focus on:
//! - Error propagation and handling for AWS SDK errors
//! - Actor-based concurrency model for thread safety
//! - Clean abstraction of SQS operations
use tokio::sync::oneshot;
pub mod source;

/// Custom error types for the SQS client library.
///
/// Design goals:
/// - Ergonomic error handling with thiserror
/// - Clear error propagation from AWS SDK
/// - Explicit handling of actor communication failures
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Failed with SQS error - {0}")]
Sqs(#[from] aws_sdk_sqs::Error),

#[error("Failed to receive message from channel. Actor task is terminated: {0:?}")]
ActorTaskTerminated(oneshot::error::RecvError),

#[error("{0}")]
Other(String),
}

pub type Result<T> = core::result::Result<T, Error>;

impl From<String> for Error {
fn from(value: String) -> Self {
Error::Other(value)
}
}

#[cfg(test)]
mod tests {
use aws_config::BehaviorVersion;
use aws_smithy_mocks_experimental::{mock, MockResponseInterceptor, RuleMode};
use aws_smithy_types::error::ErrorMetadata;

use super::*;

#[tokio::test]
async fn test_sqs_error_conversion() {
let modeled_error = mock!(aws_sdk_sqs::Client::get_queue_url).then_error(|| {
aws_sdk_sqs::operation::get_queue_url::GetQueueUrlError::generic(
ErrorMetadata::builder().code("InvalidAddress").build(),
)
});

let get_object_mocks = MockResponseInterceptor::new()
.rule_mode(RuleMode::MatchAny)
.with_rule(&modeled_error);

let sqs = aws_sdk_sqs::Client::from_conf(
aws_sdk_sqs::Config::builder()
.behavior_version(BehaviorVersion::latest())
.region(aws_sdk_sqs::config::Region::new("us-east-1"))
.credentials_provider(make_sqs_test_credentials())
.interceptor(get_object_mocks)
.build(),
);
let err = sqs.get_queue_url().send().await.unwrap_err();

let converted_error = Error::Sqs(err.into());
assert!(matches!(converted_error, Error::Sqs(_)));
assert!(converted_error
.to_string()
.contains("Failed with SQS error"));
}

#[test]
fn test_string_error_conversion() {
let str_err = "custom error message".to_string();
let err: Error = str_err.into();
assert!(matches!(err, Error::Other(_)));
assert_eq!(err.to_string(), "custom error message");
}

#[tokio::test]
async fn test_actor_task_terminated() {
let (tx, rx) = oneshot::channel::<()>();
drop(tx); // Force the error
let err = Error::ActorTaskTerminated(rx.await.unwrap_err());
assert!(matches!(err, Error::ActorTaskTerminated(_)));
assert!(err.to_string().contains("Actor task is terminated"));
}

fn make_sqs_test_credentials() -> aws_sdk_sqs::config::Credentials {
aws_sdk_sqs::config::Credentials::new(
"ATESTCLIENT",
"astestsecretkey",
Some("atestsessiontoken".to_string()),
None,
"",
)
}
}
Loading

0 comments on commit bf0f9db

Please sign in to comment.