diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 2961bf4934..ccdd13a3c7 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -53,6 +53,17 @@ version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" +[[package]] +name = "assert-json-diff" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4259cbe96513d2f1073027a259fc2ca917feb3026a5a8d984e3628e490255cc0" +dependencies = [ + "extend", + "serde", + "serde_json", +] + [[package]] name = "async-nats" version = "0.38.0" @@ -134,6 +145,48 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" +[[package]] +name = "aws-config" +version = "1.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f40e82e858e02445402906e454a73e244c7f501fcae198977585946c48e8697" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-sdk-sso", + "aws-sdk-ssooidc", + "aws-sdk-sts", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "hex", + "http 0.2.12", + "ring", + "time", + "tokio", + "tracing", + "url", + "zeroize", +] + +[[package]] +name = "aws-credential-types" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60e8f6b615cb5fc60a98132268508ad104310f0cfb25a1c22eee76efdf9154da" +dependencies = [ + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "zeroize", +] + [[package]] name = "aws-lc-rs" version = "1.12.2" @@ -159,6 +212,320 @@ dependencies = [ "paste", ] +[[package]] +name = "aws-runtime" +version = "1.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bee7643696e7fdd74c10f9eb42848a87fe469d35eae9c3323f80aa98f350baac" +dependencies = [ + "aws-credential-types", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "http-body 0.4.6", + "once_cell", + "percent-encoding", + "pin-project-lite", + "tracing", + "uuid", +] + +[[package]] +name = "aws-sdk-sqs" +version = "1.54.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebfafe6700b71329b055d118c97ec00c6d569a5a56fd2a07d8b2e13d7ace32a5" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "http 0.2.12", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sso" +version = "1.54.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "921a13ed6aabe2d1258f65ef7804946255c799224440774c30e1a2c65cdf983a" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "http 0.2.12", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-ssooidc" +version = "1.55.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "196c952738b05dfc917d82a3e9b5ba850822a6d6a86d677afda2a156cc172ceb" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "http 0.2.12", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sts" +version = "1.55.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ef5b73a927ed80b44096f8c20fb4abae65469af15198367e179ae267256e9d" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-query", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "http 0.2.12", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sigv4" +version = "1.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "690118821e46967b3c4501d67d7d52dd75106a9c54cf36cefa1985cedbe94e05" +dependencies = [ + "aws-credential-types", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "form_urlencoded", + "hex", + "hmac", + "http 0.2.12", + "http 1.2.0", + "once_cell", + "percent-encoding", + "sha2", + "time", + "tracing", +] + +[[package]] +name = "aws-smithy-async" +version = "1.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa59d1327d8b5053c54bf2eaae63bf629ba9e904434d0835a28ed3c0ed0a614e" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "aws-smithy-http" +version = "0.60.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7809c27ad8da6a6a68c454e651d4962479e81472aa19ae99e59f9aba1f9713cc" +dependencies = [ + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "bytes-utils", + "futures-core", + "http 0.2.12", + "http-body 0.4.6", + "once_cell", + "percent-encoding", + "pin-project-lite", + "pin-utils", + "tracing", +] + +[[package]] +name = "aws-smithy-json" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "623a51127f24c30776c8b374295f2df78d92517386f77ba30773f15a30ce1422" +dependencies = [ + "aws-smithy-types", +] + +[[package]] +name = "aws-smithy-mocks-experimental" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e1069164f54cd37cdcf67e30f77ed996ccd71ad85344b9bb0412a1ca224617b" +dependencies = [ + "aws-smithy-runtime-api", + "aws-smithy-types", +] + +[[package]] +name = "aws-smithy-protocol-test" +version = "0.63.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b92b62199921f10685c6b588fdbeb81168ae4e7950ae3e5f50145a01bb5f1ad" +dependencies = [ + "assert-json-diff", + "aws-smithy-runtime-api", + "base64-simd", + "cbor-diag", + "ciborium", + "http 0.2.12", + "pretty_assertions", + "regex-lite", + "roxmltree", + "serde_json", + "thiserror 1.0.69", +] + +[[package]] +name = "aws-smithy-query" +version = "0.60.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2fbd61ceb3fe8a1cb7352e42689cec5335833cd9f94103a61e98f9bb61c64bb" +dependencies = [ + "aws-smithy-types", + "urlencoding", +] + +[[package]] +name = "aws-smithy-runtime" +version = "1.7.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "865f7050bbc7107a6c98a397a9fcd9413690c27fa718446967cf03b2d3ac517e" +dependencies = [ + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-protocol-test", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "fastrand", + "h2 0.3.26", + "http 0.2.12", + "http-body 0.4.6", + "http-body 1.0.1", + "httparse", + "hyper 0.14.31", + "hyper-rustls 0.24.2", + "indexmap 2.7.0", + "once_cell", + "pin-project-lite", + "pin-utils", + "rustls 0.21.12", + "serde", + "serde_json", + "tokio", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "aws-smithy-runtime-api" +version = "1.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92165296a47a812b267b4f41032ff8069ab7ff783696d217f0994a0d7ab585cd" +dependencies = [ + "aws-smithy-async", + "aws-smithy-types", + "bytes", + "http 0.2.12", + "http 1.2.0", + "pin-project-lite", + "tokio", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-smithy-types" +version = "1.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a28f6feb647fb5e0d5b50f0472c19a7db9462b74e2fec01bb0b44eedcc834e97" +dependencies = [ + "base64-simd", + "bytes", + "bytes-utils", + "futures-core", + "http 0.2.12", + "http 1.2.0", + "http-body 0.4.6", + "http-body 1.0.1", + "http-body-util", + "itoa", + "num-integer", + "pin-project-lite", + "pin-utils", + "ryu", + "serde", + "time", + "tokio", + "tokio-util", +] + +[[package]] +name = "aws-smithy-xml" +version = "0.60.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab0b0166827aa700d3dc519f72f8b3a91c35d0b8d042dc5d643a91e6f80648fc" +dependencies = [ + "xmlparser", +] + +[[package]] +name = "aws-types" +version = "1.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0df5a18c4f951c645300d365fec53a61418bcf4650f604f85fe2a665bfaa0c2" +dependencies = [ + "aws-credential-types", + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "rustc_version", + "tracing", +] + [[package]] name = "axum" version = "0.7.9" @@ -169,7 +536,7 @@ dependencies = [ "axum-core 0.4.5", "bytes", "futures-util", - "http 1.1.0", + "http 1.2.0", "http-body 1.0.1", "http-body-util", "itoa", @@ -196,7 +563,7 @@ dependencies = [ "bytes", "form_urlencoded", "futures-util", - "http 1.1.0", + "http 1.2.0", "http-body 1.0.1", "http-body-util", "hyper 1.5.1", @@ -229,7 +596,7 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http 1.1.0", + "http 1.2.0", "http-body 1.0.1", "http-body-util", "mime", @@ -248,7 +615,7 @@ checksum = "eab1b0df7cded837c40dacaa2e1c33aa17c84fc3356ae67b5645f1e83190753e" dependencies = [ "bytes", "futures-core", - "http 1.1.0", + "http 1.2.0", "http-body 1.0.1", "http-body-util", "mime", @@ -280,7 +647,7 @@ dependencies = [ "arc-swap", "bytes", "futures-util", - "http 1.1.0", + "http 1.2.0", "http-body 1.0.1", "http-body-util", "hyper 1.5.1", @@ -330,6 +697,16 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "base64-simd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "339abbe78e73178762e23bea9dfd08e697eb3f3301cd4be981c0f78ba5859195" +dependencies = [ + "outref", + "vsimd", +] + [[package]] name = "base64ct" version = "1.6.0" @@ -386,6 +763,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bs58" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf88ba1141d185c399bee5288d850d63b8369520c1eafc32a0430b5b6c287bf4" +dependencies = [ + "tinyvec", +] + [[package]] name = "bumpalo" version = "3.16.0" @@ -407,6 +793,35 @@ dependencies = [ "serde", ] +[[package]] +name = "bytes-utils" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dafe3a8757b027e2be6e4e5601ed563c55989fcf1546e933c66c8eb3a058d35" +dependencies = [ + "bytes", + "either", +] + +[[package]] +name = "cbor-diag" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc245b6ecd09b23901a4fbad1ad975701fd5061ceaef6afa93a2d70605a64429" +dependencies = [ + "bs58", + "chrono", + "data-encoding", + "half", + "nom", + "num-bigint", + "num-rational", + "num-traits", + "separator", + "url", + "uuid", +] + [[package]] name = "cc" version = "1.2.2" @@ -454,6 +869,33 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half", +] + [[package]] name = "clang-sys" version = "1.8.1" @@ -544,6 +986,12 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crunchy" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" + [[package]] name = "crypto-common" version = "0.1.6" @@ -607,6 +1055,12 @@ dependencies = [ "serde", ] +[[package]] +name = "diff" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8" + [[package]] name = "digest" version = "0.10.7" @@ -615,6 +1069,7 @@ checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", "crypto-common", + "subtle", ] [[package]] @@ -693,6 +1148,18 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "extend" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f47da3a72ec598d9c8937a7ebca8962a5c7a1f28444e38c2b33c771ba3f55f05" +dependencies = [ + "proc-macro-error", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "fastrand" version = "2.2.0" @@ -892,7 +1359,7 @@ dependencies = [ "fnv", "futures-core", "futures-sink", - "http 1.1.0", + "http 1.2.0", "indexmap 2.7.0", "slab", "tokio", @@ -900,6 +1367,16 @@ dependencies = [ "tracing", ] +[[package]] +name = "half" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888" +dependencies = [ + "cfg-if", + "crunchy", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -921,7 +1398,7 @@ dependencies = [ "base64 0.21.7", "bytes", "headers-core", - "http 1.1.0", + "http 1.2.0", "httpdate", "mime", "sha1", @@ -933,7 +1410,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" dependencies = [ - "http 1.1.0", + "http 1.2.0", ] [[package]] @@ -948,6 +1425,21 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest", +] + [[package]] name = "home" version = "0.5.9" @@ -970,9 +1462,9 @@ dependencies = [ [[package]] name = "http" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +checksum = "f16ca2af56261c99fba8bac40a10251ce8188205a4c448fbb745a2e4daa76fea" dependencies = [ "bytes", "fnv", @@ -997,7 +1489,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "http 1.1.0", + "http 1.2.0", ] [[package]] @@ -1008,7 +1500,7 @@ checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" dependencies = [ "bytes", "futures-util", - "http 1.1.0", + "http 1.2.0", "http-body 1.0.1", "pin-project-lite", ] @@ -1059,7 +1551,7 @@ dependencies = [ "futures-channel", "futures-util", "h2 0.4.7", - "http 1.1.0", + "http 1.2.0", "http-body 1.0.1", "httparse", "httpdate", @@ -1079,7 +1571,7 @@ dependencies = [ "bytes", "futures-util", "headers", - "http 1.1.0", + "http 1.2.0", "hyper 1.5.1", "hyper-rustls 0.27.3", "hyper-util", @@ -1099,7 +1591,9 @@ dependencies = [ "futures-util", "http 0.2.12", "hyper 0.14.31", + "log", "rustls 0.21.12", + "rustls-native-certs 0.6.3", "tokio", "tokio-rustls 0.24.1", ] @@ -1111,7 +1605,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" dependencies = [ "futures-util", - "http 1.1.0", + "http 1.2.0", "hyper 1.5.1", "hyper-util", "log", @@ -1146,7 +1640,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "http 1.1.0", + "http 1.2.0", "http-body 1.0.1", "hyper 1.5.1", "pin-project-lite", @@ -1336,6 +1830,7 @@ checksum = "62f822373a4fe84d4bb149bf54e584a7f4abec90e072ed49cda0edea5b95471f" dependencies = [ "equivalent", "hashbrown 0.15.2", + "serde", ] [[package]] @@ -1447,7 +1942,7 @@ dependencies = [ "either", "futures", "home", - "http 1.1.0", + "http 1.2.0", "http-body 1.0.1", "http-body-util", "hyper 1.5.1", @@ -1481,7 +1976,7 @@ checksum = "f3030bd91c9db544a50247e7d48d7db9cf633c172732dce13351854526b1e666" dependencies = [ "chrono", "form_urlencoded", - "http 1.1.0", + "http 1.2.0", "k8s-openapi", "serde", "serde-value", @@ -1695,6 +2190,17 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-rational" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824" +dependencies = [ + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -1768,6 +2274,11 @@ name = "numaflow-core" version = "0.1.0" dependencies = [ "async-nats", + "aws-config", + "aws-sdk-sqs", + "aws-smithy-mocks-experimental", + "aws-smithy-runtime", + "aws-smithy-types", "axum 0.8.2", "axum-server", "backoff", @@ -1775,12 +2286,14 @@ dependencies = [ "bytes", "chrono", "futures", + "http 1.2.0", "hyper-util", "kube", "numaflow 0.2.1", "numaflow-models", "numaflow-pb", "numaflow-pulsar", + "numaflow-sqs", "parking_lot", "pep440_rs", "pin-project", @@ -1847,6 +2360,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "numaflow-sqs" +version = "0.1.0" +dependencies = [ + "aws-config", + "aws-sdk-sqs", + "aws-smithy-mocks-experimental", + "aws-smithy-runtime", + "aws-smithy-types", + "bytes", + "chrono", + "prost 0.11.9", + "serde", + "thiserror 1.0.69", + "tokio", + "tonic", + "tracing", +] + [[package]] name = "object" version = "0.36.5" @@ -1877,6 +2409,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "outref" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" + [[package]] name = "overload" version = "0.1.1" @@ -2066,6 +2604,16 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "pretty_assertions" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ae130e2f271fbc2ac3a40fb1d07180839cdbbe443c7a27e1e3c13c5cac0116d" +dependencies = [ + "diff", + "yansi", +] + [[package]] name = "prettyplease" version = "0.1.25" @@ -2086,6 +2634,30 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn 1.0.109", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + [[package]] name = "proc-macro2" version = "1.0.92" @@ -2426,6 +2998,12 @@ dependencies = [ "regex-syntax 0.8.5", ] +[[package]] +name = "regex-lite" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" + [[package]] name = "regex-syntax" version = "0.6.29" @@ -2491,7 +3069,7 @@ dependencies = [ "futures-core", "futures-util", "h2 0.4.7", - "http 1.1.0", + "http 1.2.0", "http-body 1.0.1", "http-body-util", "hyper 1.5.1", @@ -2539,6 +3117,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "roxmltree" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "921904a62e410e37e215c40381b7117f830d9d89ba60ab5236170541dd25646b" +dependencies = [ + "xmlparser", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -2607,6 +3194,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +dependencies = [ + "openssl-probe", + "rustls-pemfile 1.0.4", + "schannel", + "security-framework 2.11.1", +] + [[package]] name = "rustls-native-certs" version = "0.7.3" @@ -2770,6 +3369,12 @@ version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" +[[package]] +name = "separator" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f97841a747eef040fcd2e7b3b9a220a7205926e60488e673d9e4926d27772ce5" + [[package]] name = "serde" version = "1.0.217" @@ -2806,6 +3411,7 @@ version = "1.0.133" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377" dependencies = [ + "indexmap 2.7.0", "itoa", "memchr", "ryu", @@ -3317,7 +3923,7 @@ dependencies = [ "bytes", "futures-core", "futures-sink", - "http 1.1.0", + "http 1.2.0", "httparse", "rand", "ring", @@ -3340,7 +3946,7 @@ dependencies = [ "base64 0.22.1", "bytes", "h2 0.4.7", - "http 1.1.0", + "http 1.2.0", "http-body 1.0.1", "http-body-util", "hyper 1.5.1", @@ -3416,7 +4022,7 @@ dependencies = [ "base64 0.21.7", "bitflags 2.6.0", "bytes", - "http 1.1.0", + "http 1.2.0", "http-body 1.0.1", "http-body-util", "mime", @@ -3483,6 +4089,16 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-serde" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" +dependencies = [ + "serde", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.19" @@ -3493,12 +4109,15 @@ dependencies = [ "nu-ansi-term", "once_cell", "regex", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", "tracing", "tracing-core", "tracing-log", + "tracing-serde", ] [[package]] @@ -3588,6 +4207,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf16_iter" version = "1.0.5" @@ -3623,6 +4248,12 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "vsimd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" + [[package]] name = "want" version = "0.3.1" @@ -3984,6 +4615,18 @@ version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" +[[package]] +name = "xmlparser" +version = "0.13.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" + +[[package]] +name = "yansi" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049" + [[package]] name = "yasna" version = "0.5.2" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 48f9e31dd5..da5a3f74c4 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -8,7 +8,8 @@ members = [ "numaflow-core", "numaflow-pb", "extns/numaflow-pulsar", - "numaflow", + "extns/numaflow-sqs", + "numaflow" ] [workspace.lints.rust] @@ -59,6 +60,7 @@ numaflow-models = { path = "numaflow-models" } backoff = { path = "backoff" } numaflow-pb = { path = "numaflow-pb" } numaflow-pulsar = { path = "extns/numaflow-pulsar" } +numaflow-sqs = { path = "extns/numaflow-sqs" } tokio = "1.43.0" bytes = "1.9.0" tracing = "0.1.41" diff --git a/rust/extns/numaflow-sqs/Cargo.toml b/rust/extns/numaflow-sqs/Cargo.toml new file mode 100644 index 0000000000..3d44f22b5a --- /dev/null +++ b/rust/extns/numaflow-sqs/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "numaflow-sqs" +version = "0.1.0" +edition = "2021" + +[lints] +workspace = true + +[dependencies] +tokio.workspace = true +tracing.workspace = true +bytes.workspace = true +serde.workspace = true +aws-config = "1.5.11" +aws-sdk-sqs = "1.51.0" +aws-smithy-runtime = { version = "1.7.6", features = ["test-util"] } +aws-smithy-types = "1.2.11" +chrono = "0.4.38" +tonic = "0.12.3" +prost = "0.11.9" +thiserror = "1.0.69" + + +[dev-dependencies] +aws-smithy-mocks-experimental = "0.2.1" \ No newline at end of file diff --git a/rust/extns/numaflow-sqs/src/lib.rs b/rust/extns/numaflow-sqs/src/lib.rs new file mode 100644 index 0000000000..87eeee5fbb --- /dev/null +++ b/rust/extns/numaflow-sqs/src/lib.rs @@ -0,0 +1,100 @@ +//! Library for robust SQS message handling using an actor-based architecture. +//! +//! This module provides a fault-tolerant interface for interacting with Amazon SQS, +//! with a focus on: +//! - Error propagation and handling for AWS SDK errors +//! - Actor-based concurrency model for thread safety +//! - Clean abstraction of SQS operations +use tokio::sync::oneshot; +pub mod source; + +/// Custom error types for the SQS client library. +/// +/// Design goals: +/// - Ergonomic error handling with thiserror +/// - Clear error propagation from AWS SDK +/// - Explicit handling of actor communication failures +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("Failed with SQS error - {0}")] + Sqs(#[from] aws_sdk_sqs::Error), + + #[error("Failed to receive message from channel. Actor task is terminated: {0:?}")] + ActorTaskTerminated(oneshot::error::RecvError), + + #[error("{0}")] + Other(String), +} + +pub type Result = core::result::Result; + +impl From for Error { + fn from(value: String) -> Self { + Error::Other(value) + } +} + +#[cfg(test)] +mod tests { + use aws_config::BehaviorVersion; + use aws_smithy_mocks_experimental::{mock, MockResponseInterceptor, RuleMode}; + use aws_smithy_types::error::ErrorMetadata; + + use super::*; + + #[tokio::test] + async fn test_sqs_error_conversion() { + let modeled_error = mock!(aws_sdk_sqs::Client::get_queue_url).then_error(|| { + aws_sdk_sqs::operation::get_queue_url::GetQueueUrlError::generic( + ErrorMetadata::builder().code("InvalidAddress").build(), + ) + }); + + let get_object_mocks = MockResponseInterceptor::new() + .rule_mode(RuleMode::MatchAny) + .with_rule(&modeled_error); + + let sqs = aws_sdk_sqs::Client::from_conf( + aws_sdk_sqs::Config::builder() + .behavior_version(BehaviorVersion::latest()) + .region(aws_sdk_sqs::config::Region::new("us-east-1")) + .credentials_provider(make_sqs_test_credentials()) + .interceptor(get_object_mocks) + .build(), + ); + let err = sqs.get_queue_url().send().await.unwrap_err(); + + let converted_error = Error::Sqs(err.into()); + assert!(matches!(converted_error, Error::Sqs(_))); + assert!(converted_error + .to_string() + .contains("Failed with SQS error")); + } + + #[test] + fn test_string_error_conversion() { + let str_err = "custom error message".to_string(); + let err: Error = str_err.into(); + assert!(matches!(err, Error::Other(_))); + assert_eq!(err.to_string(), "custom error message"); + } + + #[tokio::test] + async fn test_actor_task_terminated() { + let (tx, rx) = oneshot::channel::<()>(); + drop(tx); // Force the error + let err = Error::ActorTaskTerminated(rx.await.unwrap_err()); + assert!(matches!(err, Error::ActorTaskTerminated(_))); + assert!(err.to_string().contains("Actor task is terminated")); + } + + fn make_sqs_test_credentials() -> aws_sdk_sqs::config::Credentials { + aws_sdk_sqs::config::Credentials::new( + "ATESTCLIENT", + "astestsecretkey", + Some("atestsessiontoken".to_string()), + None, + "", + ) + } +} diff --git a/rust/extns/numaflow-sqs/src/source.rs b/rust/extns/numaflow-sqs/src/source.rs new file mode 100644 index 0000000000..f0f05a63ab --- /dev/null +++ b/rust/extns/numaflow-sqs/src/source.rs @@ -0,0 +1,730 @@ +//! Implementation of the SQS message source using an actor-based architecture. +//! +//! Key design features: +//! - Actor model for thread-safe state management +//! - Batched message handling for efficiency +//! - Robust error handling and retry logic +//! - Configurable timeouts and batch sizes +use std::collections::HashMap; +use std::time::Duration; + +use aws_config::{meta::region::RegionProviderChain, BehaviorVersion}; +use aws_sdk_sqs::config::Region; +use aws_sdk_sqs::types::{MessageSystemAttributeName, QueueAttributeName}; +use aws_sdk_sqs::Client; +use aws_smithy_types::timeout::TimeoutConfig; +use bytes::Bytes; +use chrono::{DateTime, TimeZone, Utc}; +use tokio::sync::{mpsc, oneshot}; +use tokio::time::Instant; + +use crate::Error::ActorTaskTerminated; +use crate::{Error, Result}; + +pub const SQS_DEFAULT_REGION: &str = "us-west-2"; + +/// Configuration for an SQS message source. +/// +/// Used to initialize the SQS client with region and queue settings. +/// Implements serde::Deserialize to support loading from configuration files. +/// TODO: add support for all sqs configs and different ways to authenticate +#[derive(serde::Deserialize, Clone, PartialEq)] +pub struct SqsSourceConfig { + pub region: String, + pub queue_name: String, +} + +/// Internal message types for the actor implementation. +/// +/// The actor pattern is used to: +/// - Ensure thread-safe access to the SQS client +/// - Manage connection state and retries +/// - Handle concurrent requests without locks +enum SqsActorMessage { + Receive { + respond_to: oneshot::Sender>>, + count: i32, + timeout_at: Instant, + }, + Delete { + respond_to: oneshot::Sender>, + offsets: Vec, + }, + GetPending { + respond_to: oneshot::Sender>>, + }, +} + +/// Represents a message received from SQS with metadata. +/// +/// Design choices: +/// - Uses Bytes for efficient handling of message payloads +/// - Includes full message metadata for debugging +/// - Maintains original SQS attributes and headers +#[derive(Debug)] +pub struct SqsMessage { + pub key: String, + pub payload: Bytes, + pub offset: String, + pub event_time: DateTime, + pub attributes: Option>, + pub headers: HashMap, +} + +/// Internal actor implementation for managing SQS interactions. +/// +/// The actor maintains: +/// - Single SQS client instance +/// - Message channel for handling concurrent requests +struct SqsActor { + handler_rx: mpsc::Receiver, + client: Client, + queue_url: String, +} + +impl SqsActor { + fn new(handler_rx: mpsc::Receiver, client: Client, queue_url: String) -> Self { + Self { + handler_rx, + client, + queue_url, + } + } + + async fn create_sqs_client(config: Option) -> Client { + let region = match config { + Some(config) => config.region.clone(), + None => SQS_DEFAULT_REGION.to_string(), + }; + + tracing::info!(region = region.clone(), "Creating SQS client in region"); + + // read aws config + let region_provider = RegionProviderChain::first_try(Region::new(region.clone())) + .or_default_provider() + .or_else(Region::new(SQS_DEFAULT_REGION)); + + let shared_config = aws_config::defaults(BehaviorVersion::v2024_03_28()) + .timeout_config( + TimeoutConfig::builder() + .operation_attempt_timeout(Duration::from_secs(10)) + .build(), + ) + .region(region_provider) + .load() + .await; + + // create sqs client + Client::new(&shared_config) + } + + async fn run(&mut self) { + while let Some(msg) = self.handler_rx.recv().await { + self.handle_message(msg).await; + } + } + + async fn handle_message(&mut self, msg: SqsActorMessage) { + match msg { + SqsActorMessage::Receive { + respond_to, + count, + timeout_at, + } => { + let messages = self.get_messages(count, timeout_at).await; + respond_to + .send(messages) + .expect("failed to send response from SqsActorMessage::Receive"); + } + SqsActorMessage::Delete { + respond_to, + offsets, + } => { + let status = self.delete_messages(offsets).await; + respond_to + .send(status) + .expect("failed to send response from SqsActorMessage::Delete"); + } + SqsActorMessage::GetPending { respond_to } => { + let status = self.get_pending_messages().await; + respond_to + .send(status) + .expect("failed to send response from SqsActorMessage::GetPending"); + } + } + } + + /// Retrieves messages from SQS with timeout and batching. + /// + /// Implementation details: + /// - Respects timeout for long polling + /// - Processes message attributes and system metadata + /// - Returns messages in a normalized format + async fn get_messages(&mut self, count: i32, timeout_at: Instant) -> Result> { + let remaining_time = timeout_at - Instant::now(); + + // default to one second if remaining time is less than one second + // as sqs sdk requires wait_time_seconds to be at least 1 + // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html#sqs-short-long-polling-differences + // TODO: find a better way to handle user input timeout. should allow the users + // to choose long/short polling. For now, we default to 1 second (long polling). + let remaining_time = if remaining_time.as_millis() < 1000 { + Duration::from_secs(1) + } else { + remaining_time + }; + + let sdk_response = self + .client + .receive_message() + .queue_url(self.queue_url.clone()) + .max_number_of_messages(count) + .message_attribute_names("All") + .message_system_attribute_names(MessageSystemAttributeName::All) + .wait_time_seconds(remaining_time.as_secs() as i32) + .send() + .await; + + let receive_message_output = match sdk_response { + Ok(output) => output, + Err(err) => { + tracing::error!( + ?err, + queue_url = self.queue_url, + "failed to receive messages from SQS" + ); + return Err(Error::Sqs(err.into())); + } + }; + + let messages = receive_message_output + .messages + .unwrap_or_default() + .iter() + .map(|msg| { + let key = msg.message_id.clone().unwrap_or_default(); + let payload = Bytes::from(msg.body.clone().unwrap_or_default()); + let offset = msg.receipt_handle.clone().unwrap_or_default(); + + // event_time is set to match the SentTimestamp attribute if available + // see: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html#API_ReceiveMessage_RequestSyntax + let event_time = msg + .attributes + .as_ref() + .and_then(|attrs| attrs.get(&MessageSystemAttributeName::SentTimestamp)) + .and_then(|timestamp| timestamp.parse::().ok()) + .and_then(|timestamp| Utc.timestamp_millis_opt(timestamp).single()) + .unwrap_or_else(Utc::now); + + let attributes = msg.message_attributes.as_ref().map(|attrs| { + attrs + .iter() + .map(|(k, v)| (k.clone(), v.string_value.clone().unwrap_or_default())) + .collect() + }); + + let headers = msg + .attributes + .as_ref() + .map(|attrs| { + attrs + .iter() + .map(|(k, v)| (k.to_string().clone(), v.to_string().clone())) + .collect() + }) + .unwrap_or_default(); + + SqsMessage { + key, + payload, + offset, + event_time, + attributes, + headers, + } + }) + .collect(); + + Ok(messages) + } + + // delete message from SQS, serves as Numaflow source ack. + async fn delete_messages(&mut self, offsets: Vec) -> Result<()> { + for offset in offsets { + let offset = match std::str::from_utf8(&offset) { + Ok(offset) => offset, + Err(err) => { + tracing::error!(?err, "failed to parse offset"); + return Err(Error::Other("failed to parse offset".to_string())); + } + }; + if let Err(err) = self + .client + .delete_message() + .queue_url(self.queue_url.clone()) + .receipt_handle(offset) + .send() + .await + { + tracing::error!( + ?err, + "{} {}", + self.queue_url.clone(), + "Error while deleting message from SQS" + ); + return Err(Error::Sqs(err.into())); + } + } + Ok(()) + } + + // get the pending message count from SQS using the ApproximateNumberOfMessages attribute + // Note: The ApproximateNumberOfMessages metrics may not achieve consistency until at least + // 1 minute after the producers stop sending messages. + // This period is required for the queue metadata to reach eventual consistency. + async fn get_pending_messages(&mut self) -> Result> { + let sdk_response = self + .client + .get_queue_attributes() + .queue_url(self.queue_url.clone()) + .attribute_names(aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessages) + .send() + .await; + + let get_queue_attributes_output = match sdk_response { + Ok(output) => output, + Err(err) => { + tracing::error!( + ?err, + queue_url = self.queue_url, + "failed to get queue attributes from SQS" + ); + return Err(Error::Sqs(err.into())); + } + }; + + let attributes = match get_queue_attributes_output.attributes { + Some(attributes) => attributes, + None => return Ok(None), + }; + + let value = match attributes.get(&QueueAttributeName::ApproximateNumberOfMessages) { + Some(value) => value, + None => return Ok(None), + }; + + let approx_pending_messages_count = match value.parse::() { + Ok(count) => count, + Err(err) => { + tracing::error!(?err, "failed to parse ApproximateNumberOfMessages"); + return Err(Error::Other( + "failed to parse ApproximateNumberOfMessages".to_string(), + )); + } + }; + Ok(Some(approx_pending_messages_count)) + } +} + +/// Public interface for interacting with SQS queues. +/// +/// Design principles: +/// - Thread-safe through actor model +/// - Configurable batch sizes and timeouts +/// - Clean abstraction of SQS complexity +/// - Efficient message processing +#[derive(Clone)] +pub struct SqsSource { + batch_size: usize, + /// timeout for each batch read request + timeout: Duration, + actor_tx: mpsc::Sender, +} + +/// Builder for creating an `SqsSource`. +/// +/// This builder allows for configuring the SQS source with various parameters +/// such as region, queue name, batch size, timeout, and an optional SQS client. +#[derive(Clone)] +pub struct SqsSourceBuilder { + config: SqsSourceConfig, + batch_size: usize, + timeout: Duration, + client: Option, +} + +impl Default for SqsSourceBuilder { + fn default() -> Self { + Self::new(SqsSourceConfig { + region: SQS_DEFAULT_REGION.to_string(), + queue_name: "".to_string(), + }) + } +} + +impl SqsSourceBuilder { + pub fn new(config: SqsSourceConfig) -> Self { + Self { + config, + batch_size: 1, + timeout: Duration::from_secs(1), + client: None, + } + } + pub fn config(mut self, config: SqsSourceConfig) -> Self { + self.config = config; + self + } + + pub fn batch_size(mut self, batch_size: usize) -> Self { + self.batch_size = batch_size; + self + } + + pub fn timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } + + pub fn client(mut self, client: Client) -> Self { + self.client = Some(client); + self + } + + /// Builds an `SqsSource` instance with the provided configuration. + /// + /// This method consumes `self` and initializes the SQS client, retrieves the queue URL, + /// and spawns an actor to handle SQS interactions. It returns a `Result` containing + /// the constructed `SqsSource` or an error if the initialization fails. + /// + /// # Returns + /// - `Ok(SqsSource)` if the source is successfully built. + /// - `Err(Error)` if there is an error during the initialization process. + pub async fn build(self) -> Result { + let sqs_client = match self.client { + Some(client) => client, + None => SqsActor::create_sqs_client(Some(self.config.clone())).await, + }; + + let queue_name = self.config.queue_name.clone(); + + let get_queue_url_output = sqs_client + .get_queue_url() + .queue_name(queue_name) + .send() + .await + .map_err(|err| Error::Sqs(err.into()))?; + + let queue_url = get_queue_url_output + .queue_url + .ok_or_else(|| Error::Other("Queue URL not found".to_string()))?; + + tracing::info!(queue_url = queue_url.clone(), "Queue URL found"); + + let (handler_tx, handler_rx) = mpsc::channel(10); + tokio::spawn(async move { + let mut actor = SqsActor::new(handler_rx, sqs_client, queue_url); + actor.run().await; + }); + + Ok(SqsSource { + batch_size: self.batch_size, + timeout: self.timeout, + actor_tx: handler_tx, + }) + } +} + +impl SqsSource { + // read messages from SQS, corresponding sqs sdk method is receive_message + pub async fn read_messages(&self) -> Result> { + tracing::debug!("Reading messages from SQS"); + let start = Instant::now(); + let (tx, rx) = oneshot::channel(); + + let msg = SqsActorMessage::Receive { + respond_to: tx, + count: self.batch_size as i32, + timeout_at: start + self.timeout, + }; + + let _ = self.actor_tx.send(msg).await; + let messages = rx.await.map_err(ActorTaskTerminated)??; + tracing::debug!( + count = messages.len(), + requested_count = self.batch_size, + time_taken_ms = start.elapsed().as_millis(), + "Got messages from sqs" + ); + Ok(messages) + } + + // acknowledge the offsets of the messages read from SQS + // corresponding sqs sdk method is delete_message + pub async fn ack_offsets(&self, offsets: Vec) -> Result<()> { + tracing::debug!(offsets = ?offsets, "Acknowledging offsets"); + let (tx, rx) = oneshot::channel(); + let msg = SqsActorMessage::Delete { + offsets, + respond_to: tx, + }; + let _ = self.actor_tx.send(msg).await; + rx.await.map_err(Error::ActorTaskTerminated)? + } + + // get the pending message count from SQS + // corresponding sqs sdk method is get_queue_attributes + // with the attribute name ApproximateNumberOfMessages + pub async fn pending_count(&self) -> Option { + let (tx, rx) = oneshot::channel(); + let msg = SqsActorMessage::GetPending { respond_to: tx }; + let _ = self.actor_tx.send(msg).await; + + let actor_result = rx.await.map_err(Error::ActorTaskTerminated); + + match actor_result { + Ok(Ok(Some(count))) => { + tracing::debug!(pending_count = count, "Pending message count retrieved"); + Some(count) + } + _ => None, + } + } + + /// Returns the partitions for the SQS source. + /// + /// This method is currently unimplemented in this module. + /// Note: It is implemented in the core to return the current vertex replica. + /// See `numaflow-core/src/source/sqs.rs` for the implementation. + pub fn partitions(&self) -> Vec { + unimplemented!() + } +} + +#[cfg(test)] +mod tests { + use aws_sdk_sqs::Config; + use aws_smithy_mocks_experimental::{mock, MockResponseInterceptor, Rule, RuleMode}; + use aws_smithy_types::error::ErrorMetadata; + + use super::*; + + #[tokio::test] + async fn test_sqssourcehandle_read() { + let queue_url_output = get_queue_url_output(); + + let receive_message_output = get_receive_message_output(); + + let sqs_operation_mocks = MockResponseInterceptor::new() + .rule_mode(RuleMode::MatchAny) + .with_rule(&queue_url_output) + .with_rule(&receive_message_output); + + let sqs_mock_client = + Client::from_conf(get_test_config_with_interceptor(sqs_operation_mocks)); + + let source = SqsSourceBuilder::new(SqsSourceConfig { + region: SQS_DEFAULT_REGION.to_string(), + queue_name: "test-q".to_string(), + }) + .batch_size(1) + .timeout(Duration::from_secs(0)) + .client(sqs_mock_client) + .build() + .await + .unwrap(); + + // Read messages from the source + let messages = source.read_messages().await.unwrap(); + + // Assert we got the expected number of messages + assert_eq!(messages.len(), 1, "Should receive exactly 1 message"); + + // Verify first message + let msg1 = &messages[0]; + assert_eq!(msg1.key, "219f8380-5770-4cc2-8c3e-5c715e145f5e"); + assert_eq!(msg1.payload, "This is a test message"); + assert_eq!( + msg1.offset, + "AQEBaZ+j5qUoOAoxlmrCQPkBm9njMWXqemmIG6shMHCO6fV20JrQYg/AiZ8JELwLwOu5U61W+aIX5Qzu7GGofxJuvzymr4Ph53RiR0mudj4InLSgpSspYeTRDteBye5tV/txbZDdNZxsi+qqZA9xPnmMscKQqF6pGhnGIKrnkYGl45Nl6GPIZv62LrIRb6mSqOn1fn0yqrvmWuuY3w2UzQbaYunJWGxpzZze21EOBtywknU3Je/g7G9is+c6K9hGniddzhLkK1tHzZKjejOU4jokaiB4nmi0dF3JqLzDsQuPF0Gi8qffhEvw56nl8QCbluSJScFhJYvoagGnDbwOnd9z50L239qtFIgETdpKyirlWwl/NGjWJ45dqWpiW3d2Ws7q" + ); + } + + #[tokio::test] + async fn test_sqssource_ack() { + let queue_url_output = get_queue_url_output(); + let delete_message_output = get_delete_message_output(); + + let sqs_operation_mocks = MockResponseInterceptor::new() + .rule_mode(RuleMode::MatchAny) + .with_rule(&queue_url_output) + .with_rule(&delete_message_output); + + let sqs_mock_client = + Client::from_conf(get_test_config_with_interceptor(sqs_operation_mocks)); + + let source = SqsSourceBuilder::new(SqsSourceConfig { + region: SQS_DEFAULT_REGION.to_string(), + queue_name: "test-q".to_string(), + }) + .batch_size(1) + .timeout(Duration::from_secs(0)) + .client(sqs_mock_client) + .build() + .await + .unwrap(); + + // Test acknowledgment + let offset = "AQEBaZ+j5qUoOAoxlmrCQPkBm9njMWXqemmIG6shMHCO6fV20JrQYg/AiZ8JELwLwOu5U61W+aIX5Qzu7GGofxJuvzymr4Ph53RiR0mudj4InLSgpSspYeTRDteBye5tV/txbZDdNZxsi+qqZA9xPnmMscKQqF6pGhnGIKrnkYGl45Nl6GPIZv62LrIRb6mSqOn1fn0yqrvmWuuY3w2UzQbaYunJWGxpzZze21EOBtywknU3Je/g7G9is+c6K9hGniddzhLkK1tHzZKjejOU4jokaiB4nmi0dF3JqLzDsQuPF0Gi8qffhEvw56nl8QCbluSJScFhJYvoagGnDbwOnd9z50L239qtFIgETdpKyirlWwl/NGjWJ45dqWpiW3d2Ws7q"; + let result = source.ack_offsets(vec![Bytes::from(offset)]).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_sqssource_pending_count() { + let queue_url_output = get_queue_url_output(); + let queue_attrs_output = get_queue_attributes_output(); + + let sqs_operation_mocks = MockResponseInterceptor::new() + .rule_mode(RuleMode::MatchAny) + .with_rule(&queue_url_output) + .with_rule(&queue_attrs_output); + + let sqs_mock_client = + Client::from_conf(get_test_config_with_interceptor(sqs_operation_mocks)); + + let source = SqsSourceBuilder::new(SqsSourceConfig { + region: SQS_DEFAULT_REGION.to_string(), + queue_name: "test-q".to_string(), + }) + .batch_size(1) + .timeout(Duration::from_secs(0)) + .client(sqs_mock_client) + .build() + .await + .unwrap(); + + let count = source.pending_count().await; + assert_eq!(count, Some(0)); + } + + #[tokio::test] + async fn test_error_cases() { + // Test invalid region error + let sqs_operation_mocks = MockResponseInterceptor::new() + .rule_mode(RuleMode::MatchAny) + .with_rule(&get_queue_url_output_err()); + + let sqs_mock_client = + Client::from_conf(get_test_config_with_interceptor(sqs_operation_mocks)); + + let source = SqsSourceBuilder::new(SqsSourceConfig { + region: SQS_DEFAULT_REGION.to_string(), + queue_name: "test-q".to_string(), + }) + .batch_size(1) + .timeout(Duration::from_secs(0)) + .client(sqs_mock_client) + .build() + .await; + assert!(source.is_err()); + } + + #[tokio::test] + #[should_panic(expected = "not implemented")] + async fn test_partitions_unimplemented() { + let source = SqsSource { + batch_size: 1, + timeout: Duration::from_secs(0), + actor_tx: mpsc::channel(1).0, + }; + source.partitions(); + } + + fn get_queue_attributes_output() -> Rule { + let queue_attributes_output = mock!(aws_sdk_sqs::Client::get_queue_attributes) + .match_requests(|inp| { + inp.queue_url().unwrap() + == "https://sqs.us-west-2.amazonaws.com/926113353675/test-q/" + }) + .then_output(|| { + aws_sdk_sqs::operation::get_queue_attributes::GetQueueAttributesOutput::builder() + .attributes( + aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessages, + "0", + ) + .build() + }); + queue_attributes_output + } + + fn get_delete_message_output() -> Rule { + let delete_message_output = mock!(aws_sdk_sqs::Client::delete_message) + .match_requests(|inp| { + inp.queue_url().unwrap() == "https://sqs.us-west-2.amazonaws.com/926113353675/test-q/" + && inp.receipt_handle().unwrap() == "AQEBaZ+j5qUoOAoxlmrCQPkBm9njMWXqemmIG6shMHCO6fV20JrQYg/AiZ8JELwLwOu5U61W+aIX5Qzu7GGofxJuvzymr4Ph53RiR0mudj4InLSgpSspYeTRDteBye5tV/txbZDdNZxsi+qqZA9xPnmMscKQqF6pGhnGIKrnkYGl45Nl6GPIZv62LrIRb6mSqOn1fn0yqrvmWuuY3w2UzQbaYunJWGxpzZze21EOBtywknU3Je/g7G9is+c6K9hGniddzhLkK1tHzZKjejOU4jokaiB4nmi0dF3JqLzDsQuPF0Gi8qffhEvw56nl8QCbluSJScFhJYvoagGnDbwOnd9z50L239qtFIgETdpKyirlWwl/NGjWJ45dqWpiW3d2Ws7q" + }) + .then_output(|| { + aws_sdk_sqs::operation::delete_message::DeleteMessageOutput::builder().build() + }); + delete_message_output + } + + fn get_receive_message_output() -> Rule { + let receive_message_output = mock!(aws_sdk_sqs::Client::receive_message) + .match_requests(|inp| { + inp.queue_url().unwrap() == "https://sqs.us-west-2.amazonaws.com/926113353675/test-q/" + }) + .then_output(|| { + aws_sdk_sqs::operation::receive_message::ReceiveMessageOutput::builder() + .messages( + aws_sdk_sqs::types::Message::builder() + .message_id("219f8380-5770-4cc2-8c3e-5c715e145f5e") + .body("This is a test message") + .receipt_handle("AQEBaZ+j5qUoOAoxlmrCQPkBm9njMWXqemmIG6shMHCO6fV20JrQYg/AiZ8JELwLwOu5U61W+aIX5Qzu7GGofxJuvzymr4Ph53RiR0mudj4InLSgpSspYeTRDteBye5tV/txbZDdNZxsi+qqZA9xPnmMscKQqF6pGhnGIKrnkYGl45Nl6GPIZv62LrIRb6mSqOn1fn0yqrvmWuuY3w2UzQbaYunJWGxpzZze21EOBtywknU3Je/g7G9is+c6K9hGniddzhLkK1tHzZKjejOU4jokaiB4nmi0dF3JqLzDsQuPF0Gi8qffhEvw56nl8QCbluSJScFhJYvoagGnDbwOnd9z50L239qtFIgETdpKyirlWwl/NGjWJ45dqWpiW3d2Ws7q") + .attributes(MessageSystemAttributeName::SentTimestamp, "1677112427387") + .build() + ) + .build() + }); + receive_message_output + } + + fn get_queue_url_output() -> Rule { + let queue_url_output = mock!(aws_sdk_sqs::Client::get_queue_url) + .match_requests(|inp| inp.queue_name().unwrap() == "test-q") + .then_output(|| { + aws_sdk_sqs::operation::get_queue_url::GetQueueUrlOutput::builder() + .queue_url("https://sqs.us-west-2.amazonaws.com/926113353675/test-q/") + .build() + }); + queue_url_output + } + + fn get_queue_url_output_err() -> Rule { + mock!(aws_sdk_sqs::Client::get_queue_url).then_error(|| { + aws_sdk_sqs::operation::get_queue_url::GetQueueUrlError::generic( + ErrorMetadata::builder().code("InvalidAddress").build(), + ) + }) + } + + fn get_test_config_with_interceptor(interceptor: MockResponseInterceptor) -> Config { + aws_sdk_sqs::Config::builder() + .behavior_version(BehaviorVersion::latest()) + .credentials_provider(make_sqs_test_credentials()) + .region(aws_sdk_sqs::config::Region::new(SQS_DEFAULT_REGION)) + .interceptor(interceptor) + .build() + } + + fn make_sqs_test_credentials() -> aws_sdk_sqs::config::Credentials { + aws_sdk_sqs::config::Credentials::new( + "ATESTCLIENT", + "astestsecretkey", + Some("atestsessiontoken".to_string()), + None, + "", + ) + } +} diff --git a/rust/numaflow-core/Cargo.toml b/rust/numaflow-core/Cargo.toml index f22424a5fd..46c544e84b 100644 --- a/rust/numaflow-core/Cargo.toml +++ b/rust/numaflow-core/Cargo.toml @@ -7,7 +7,8 @@ edition = "2021" nats-tests = [] pulsar-tests = [] redis-tests = [] -all-tests = ["nats-tests", "pulsar-tests", "redis-tests"] +sqs-tests = [] +all-tests = ["nats-tests", "pulsar-tests", "redis-tests", "sqs-tests"] [lints] workspace = true @@ -16,6 +17,7 @@ workspace = true tokio.workspace = true tracing.workspace = true numaflow-pulsar.workspace = true +numaflow-sqs.workspace = true numaflow-models.workspace = true numaflow-pb.workspace = true serving.workspace = true @@ -54,6 +56,12 @@ numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "9ca9362 pulsar = { version = "6.3.0", default-features = false, features = [ "tokio-rustls-runtime", ] } +aws-smithy-runtime = { version = "1.7.6", features = ["test-util"] } +aws-smithy-types = "1.2.11" +http = "1.2.0" +aws-config = "1.5.11" +aws-sdk-sqs = "1.51.0" reqwest = { workspace = true, features = ["json"] } +aws-smithy-mocks-experimental = "0.2.1" [build-dependencies] diff --git a/rust/numaflow-core/src/monovertex.rs b/rust/numaflow-core/src/monovertex.rs index 92ca072287..01ca35742d 100644 --- a/rust/numaflow-core/src/monovertex.rs +++ b/rust/numaflow-core/src/monovertex.rs @@ -18,7 +18,7 @@ use crate::{metrics, shared}; /// - Invokes the SourceTransformer concurrently /// - Calls the Sinker to write the batch to the Sink /// - Send Acknowledgement back to the Source -mod forwarder; +pub(crate) mod forwarder; pub(crate) async fn start_forwarder( cln_token: CancellationToken, diff --git a/rust/numaflow-core/src/source.rs b/rust/numaflow-core/src/source.rs index 27f594f5f5..33f9def62c 100644 --- a/rust/numaflow-core/src/source.rs +++ b/rust/numaflow-core/src/source.rs @@ -7,6 +7,7 @@ use std::sync::Arc; use numaflow_pulsar::source::PulsarSource; +use numaflow_sqs::source::SqsSource; use tokio::sync::OwnedSemaphorePermit; use tokio::sync::Semaphore; use tokio::sync::{mpsc, oneshot}; @@ -45,6 +46,8 @@ pub(crate) mod generator; pub(crate) mod pulsar; pub(crate) mod serving; +pub(crate) mod sqs; + use serving::ServingSource; use crate::transformer::Transformer; @@ -81,6 +84,7 @@ pub(crate) enum SourceType { generator::GeneratorLagReader, ), Pulsar(PulsarSource), + Sqs(SqsSource), Serving(ServingSource), } @@ -202,6 +206,17 @@ impl Source { actor.run().await; }); } + SourceType::Sqs(sqs_source) => { + tokio::spawn(async move { + let actor = SourceActor::new( + receiver, + sqs_source.clone(), + sqs_source.clone(), + sqs_source, + ); + actor.run().await; + }); + } SourceType::Serving(serving) => { tokio::spawn(async move { let actor = diff --git a/rust/numaflow-core/src/source/sqs.rs b/rust/numaflow-core/src/source/sqs.rs new file mode 100644 index 0000000000..3de406ed3d --- /dev/null +++ b/rust/numaflow-core/src/source/sqs.rs @@ -0,0 +1,326 @@ +use std::sync::Arc; +use std::time::Duration; + +use numaflow_sqs::source::{SqsMessage, SqsSource, SqsSourceBuilder, SqsSourceConfig}; + +use crate::config::{get_vertex_name, get_vertex_replica}; +use crate::error::Error; +use crate::message::{Message, MessageID, Offset, StringOffset}; +use crate::source; + +impl TryFrom for Message { + type Error = Error; + + fn try_from(message: SqsMessage) -> crate::Result { + let offset = Offset::String(StringOffset::new(message.offset, *get_vertex_replica())); + + Ok(Message { + keys: Arc::from(vec![message.key]), + tags: None, + value: message.payload, + offset: offset.clone(), + event_time: message.event_time, + watermark: Some(message.event_time), + id: MessageID { + vertex_name: get_vertex_name().to_string().into(), + offset: offset.to_string().into(), + index: 0, + }, + headers: message.headers, + metadata: None, + }) + } +} + +impl From for Error { + fn from(value: numaflow_sqs::Error) -> Self { + match value { + numaflow_sqs::Error::Sqs(e) => Error::Source(e.to_string()), + numaflow_sqs::Error::ActorTaskTerminated(_) => { + Error::ActorPatternRecv(value.to_string()) + } + numaflow_sqs::Error::Other(e) => Error::Source(e), + } + } +} + +pub(crate) async fn new_sqs_source( + cfg: SqsSourceConfig, + batch_size: usize, + timeout: Duration, +) -> crate::Result { + Ok(SqsSourceBuilder::new(cfg) + .batch_size(batch_size) + .timeout(timeout) + .build() + .await?) +} + +impl source::SourceReader for SqsSource { + fn name(&self) -> &'static str { + "Sqs" + } + + async fn read(&mut self) -> crate::Result> { + self.read_messages() + .await? + .into_iter() + .map(|msg| msg.try_into()) + .collect() + } + + // if source doesn't support partitions, we should return the vec![vertex_replica] + fn partitions(&self) -> Vec { + vec![*get_vertex_replica()] + } +} + +impl source::SourceAcker for SqsSource { + async fn ack(&mut self, offsets: Vec) -> crate::error::Result<()> { + let mut sqs_offsets = Vec::with_capacity(offsets.len()); + for offset in offsets { + let Offset::String(string_offset) = offset else { + return Err(Error::Source(format!( + "Expected Offset::String type for SQS. offset={offset:?}" + ))); + }; + sqs_offsets.push(string_offset.offset); + } + self.ack_offsets(sqs_offsets).await.map_err(Into::into) + } +} + +impl source::LagReader for SqsSource { + async fn pending(&mut self) -> crate::error::Result> { + Ok(self.pending_count().await) + } +} + +#[cfg(feature = "sqs-tests")] +#[cfg(test)] +pub mod tests { + use std::collections::HashMap; + + use aws_sdk_sqs::config::BehaviorVersion; + use aws_sdk_sqs::types::MessageSystemAttributeName; + use aws_sdk_sqs::Config; + use aws_smithy_mocks_experimental::{mock, MockResponseInterceptor, Rule, RuleMode}; + use bytes::Bytes; + use chrono::Utc; + use numaflow_sqs::source::{SqsSourceBuilder, SQS_DEFAULT_REGION}; + use tokio::task::JoinHandle; + use tokio_util::sync::CancellationToken; + + use super::*; + use crate::source::{Source, SourceType}; + + #[tokio::test] + async fn test_sqs_message_conversion() { + let ts = Utc::now(); + let mut headers = HashMap::new(); + headers.insert("foo".to_string(), "bar".to_string()); + + let sqs_message = SqsMessage { + key: "key".to_string(), + payload: Bytes::from("value".to_string()), + offset: "offset".to_string(), + event_time: ts, + attributes: None, + headers: headers.clone(), + }; + + let message: Message = sqs_message.try_into().unwrap(); + + assert_eq!(message.keys.len(), 1); + assert_eq!(message.keys[0], "key"); + assert_eq!(message.value, "value"); + assert_eq!( + message.offset, + Offset::String(StringOffset::new("offset".to_string(), 0)), + ); + assert_eq!(message.event_time, ts); + assert_eq!(message.headers, headers); + } + + #[tokio::test] + async fn test_sqs_e2e() { + let queue_url_output = get_queue_url_output(); + + let receive_message_output = get_receive_message_output(); + + let delete_message_output = get_delete_message_output(); + + let queue_attributes_output = get_queue_attributes_output(); + + let sqs_operation_mocks = MockResponseInterceptor::new() + .rule_mode(RuleMode::MatchAny) + .with_rule(&queue_url_output) + .with_rule(&receive_message_output) + .with_rule(&delete_message_output) + .with_rule(&queue_attributes_output); + + let sqs_client = + aws_sdk_sqs::Client::from_conf(get_test_config_with_interceptor(sqs_operation_mocks)); + + let sqs_source = SqsSourceBuilder::new(SqsSourceConfig { + region: SQS_DEFAULT_REGION.to_string(), + queue_name: "test-q".to_string(), + }) + .batch_size(1) + .timeout(Duration::from_secs(1)) + .client(sqs_client) + .build() + .await + .unwrap(); + + // create SQS source with test client + use crate::tracker::TrackerHandle; + let tracker_handle = TrackerHandle::new(None, None); + let source = Source::new( + 1, + SourceType::Sqs(sqs_source), + tracker_handle.clone(), + true, + None, + None, + ); + + // create sink writer + use crate::sink::{SinkClientType, SinkWriterBuilder}; + let sink_writer = SinkWriterBuilder::new( + 10, + Duration::from_millis(100), + SinkClientType::Log, + tracker_handle.clone(), + ) + .build() + .await + .unwrap(); + + // create the forwarder with the source and sink writer + let cln_token = CancellationToken::new(); + let forwarder = crate::monovertex::forwarder::Forwarder::new( + source.clone(), + sink_writer, + cln_token.clone(), + ); + + let _forwarder_handle: JoinHandle> = tokio::spawn(async move { + forwarder.start().await?; + Ok(()) + }); + + tokio::time::sleep(Duration::from_millis(1000)).await; + + // wait for one sec to check if the pending becomes zero, because all the messages + // should be read and acked; if it doesn't, then fail the test + let tokio_result = tokio::time::timeout(Duration::from_secs(5), async move { + loop { + let pending = source.pending().await.unwrap(); + if pending == Some(0) { + break; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await; + + assert!( + tokio_result.is_ok(), + "Timeout occurred before pending became zero" + ); + + tracing::info!("queue url output calls: {}", queue_url_output.num_calls()); + tracing::info!( + "receive message output calls: {}", + receive_message_output.num_calls() + ); + tracing::info!( + "delete message output calls: {}", + delete_message_output.num_calls() + ); + tracing::info!( + "queue attributes output calls: {}", + queue_attributes_output.num_calls() + ); + } + + fn get_queue_attributes_output() -> Rule { + let queue_attributes_output = mock!(aws_sdk_sqs::Client::get_queue_attributes) + .match_requests(|inp| { + inp.queue_url().unwrap() + == "https://sqs.us-west-2.amazonaws.com/926113353675/test-q/" + }) + .then_output(|| { + aws_sdk_sqs::operation::get_queue_attributes::GetQueueAttributesOutput::builder() + .attributes( + aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessages, + "0", + ) + .build() + }); + queue_attributes_output + } + + fn get_delete_message_output() -> Rule { + let delete_message_output = mock!(aws_sdk_sqs::Client::delete_message) + .match_requests(|inp| { + inp.queue_url().unwrap() == "https://sqs.us-west-2.amazonaws.com/926113353675/test-q/" + && inp.receipt_handle().unwrap() == "AQEBaZ+j5qUoOAoxlmrCQPkBm9njMWXqemmIG6shMHCO6fV20JrQYg/AiZ8JELwLwOu5U61W+aIX5Qzu7GGofxJuvzymr4Ph53RiR0mudj4InLSgpSspYeTRDteBye5tV/txbZDdNZxsi+qqZA9xPnmMscKQqF6pGhnGIKrnkYGl45Nl6GPIZv62LrIRb6mSqOn1fn0yqrvmWuuY3w2UzQbaYunJWGxpzZze21EOBtywknU3Je/g7G9is+c6K9hGniddzhLkK1tHzZKjejOU4jokaiB4nmi0dF3JqLzDsQuPF0Gi8qffhEvw56nl8QCbluSJScFhJYvoagGnDbwOnd9z50L239qtFIgETdpKyirlWwl/NGjWJ45dqWpiW3d2Ws7q" + }) + .then_output(|| { + aws_sdk_sqs::operation::delete_message::DeleteMessageOutput::builder().build() + }); + delete_message_output + } + + fn get_receive_message_output() -> Rule { + let receive_message_output = mock!(aws_sdk_sqs::Client::receive_message) + .match_requests(|inp| { + inp.queue_url().unwrap() == "https://sqs.us-west-2.amazonaws.com/926113353675/test-q/" + }) + .then_output(|| { + aws_sdk_sqs::operation::receive_message::ReceiveMessageOutput::builder() + .messages( + aws_sdk_sqs::types::Message::builder() + .message_id("219f8380-5770-4cc2-8c3e-5c715e145f5e") + .body("This is a test message") + .receipt_handle("AQEBaZ+j5qUoOAoxlmrCQPkBm9njMWXqemmIG6shMHCO6fV20JrQYg/AiZ8JELwLwOu5U61W+aIX5Qzu7GGofxJuvzymr4Ph53RiR0mudj4InLSgpSspYeTRDteBye5tV/txbZDdNZxsi+qqZA9xPnmMscKQqF6pGhnGIKrnkYGl45Nl6GPIZv62LrIRb6mSqOn1fn0yqrvmWuuY3w2UzQbaYunJWGxpzZze21EOBtywknU3Je/g7G9is+c6K9hGniddzhLkK1tHzZKjejOU4jokaiB4nmi0dF3JqLzDsQuPF0Gi8qffhEvw56nl8QCbluSJScFhJYvoagGnDbwOnd9z50L239qtFIgETdpKyirlWwl/NGjWJ45dqWpiW3d2Ws7q") + .attributes(MessageSystemAttributeName::SentTimestamp, "1677112427387") + .build() + ) + .build() + }); + receive_message_output + } + + fn get_queue_url_output() -> Rule { + let queue_url_output = mock!(aws_sdk_sqs::Client::get_queue_url) + .match_requests(|inp| inp.queue_name().unwrap() == "test-q") + .then_output(|| { + aws_sdk_sqs::operation::get_queue_url::GetQueueUrlOutput::builder() + .queue_url("https://sqs.us-west-2.amazonaws.com/926113353675/test-q/") + .build() + }); + queue_url_output + } + + fn get_test_config_with_interceptor(interceptor: MockResponseInterceptor) -> Config { + aws_sdk_sqs::Config::builder() + .behavior_version(BehaviorVersion::latest()) + .credentials_provider(make_sqs_test_credentials()) + .region(aws_sdk_sqs::config::Region::new("us-west-2")) + .interceptor(interceptor) + .build() + } + + fn make_sqs_test_credentials() -> aws_sdk_sqs::config::Credentials { + aws_sdk_sqs::config::Credentials::new( + "ATESTCLIENT", + "astestsecretkey", + Some("atestsessiontoken".to_string()), + None, + "", + ) + } +}