Skip to content

Commit 9c221e9

Browse files
refs #313: Add implementation for listening to FIFO SQS queues (#335)
1 parent 9f87385 commit 9c221e9

File tree

40 files changed

+3431
-117
lines changed

40 files changed

+3431
-117
lines changed

Diff for: README.md

+65
Original file line numberDiff line numberDiff line change
@@ -659,6 +659,71 @@ prefetchingMessageListener("identifier", sqsAsyncClient, "${insert.queue.url.her
659659
}
660660
```
661661
662+
### Listening to a FIFO SQS Queue
663+
664+
FIFO SQS Queues can be used when the order of the SQS messages are important. The FIFO message listener guarantees messages
665+
in the same message group run in the order they are generated and two messages in the same group executed concurrently. For more information about
666+
the configuration options for the message listener take a look at the
667+
[FifoMessageListenerContainerProperties](core/src/main/java/com/jashmore/sqs/container/fifo/FifoMessageListenerContainerProperties.java) or the specific
668+
annotation or DSL builder for the framework implementation.
669+
670+
### Java
671+
672+
```java
673+
public class Main {
674+
675+
public static void main(String[] args) throws InterruptedException {
676+
final SqsAsyncClient sqsAsyncClient = SqsAsyncClient.create(); // or your own custom client
677+
final QueueProperties queueProperties = QueueProperties.builder().queueUrl("${insert.queue.url.here}").build();
678+
final MessageListenerContainer container = new FifoMessageListenerContainer(
679+
queueProperties,
680+
sqsAsyncClient,
681+
() ->
682+
new LambdaMessageProcessor(
683+
sqsAsyncClient,
684+
queueProperties,
685+
message -> {
686+
// process the message here
687+
}
688+
),
689+
ImmutableFifoMessageListenerContainerProperties.builder().identifier("listener-identifier").concurrencyLevel(10).build()
690+
);
691+
container.start();
692+
Runtime.getRuntime().addShutdownHook(new Thread(container::stop));
693+
Thread.currentThread().join();
694+
}
695+
}
696+
697+
```
698+
699+
### Spring Boot
700+
701+
```java
702+
@Component
703+
class MessageListeners {
704+
705+
@FifoQueueListener(value = "${insert.queue.url.here}", concurrencyLevel = 10)
706+
public void fifoListener(@Payload final String body) {
707+
// process message here
708+
}
709+
}
710+
711+
```
712+
713+
### Kotlin DSL/Ktor
714+
715+
```kotlin
716+
fifoMessageListener("identifier", sqsAsyncClient, "${insert.queue.url.here}") {
717+
concurrencyLevel = { 10 }
718+
719+
processor = lambdaProcessor {
720+
method { message ->
721+
// process the message payload here
722+
}
723+
}
724+
}
725+
```
726+
662727
### Comparing other SQS Libraries
663728
664729
If you want to see the difference in usage between this library and others like the

Diff for: build.gradle.kts

+2-1
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ subprojects {
117117
violationRules {
118118
rule {
119119
excludes = listOf(
120-
"com.jashmore.sqs.examples*"
120+
"com.jashmore.sqs.examples*",
121+
"com.jashmore.sqs.container.fifo*" // these classes are better handled by integration tests
121122
)
122123
element = "PACKAGE"
123124
limit {

Diff for: core/build.gradle.kts

+7-2
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,21 @@
11

22
description = "Contains the core functionality for the library"
33

4+
val immutablesVersion: String by project
45
val jacksonVersion: String by project
56
val slf4jVersion: String by project
67

78
dependencies {
9+
// External dependencies
10+
implementation("org.slf4j:slf4j-api:$slf4jVersion")
11+
api("com.fasterxml.jackson.core:jackson-databind:$jacksonVersion")
12+
annotationProcessor("org.immutables:value:$immutablesVersion")
13+
api("org.immutables:value-annotations:$immutablesVersion")
14+
815
api(project(":java-dynamic-sqs-listener-api"))
916
implementation(project(":annotation-utils"))
1017
compileOnly(project(":documentation-annotations"))
1118
implementation(project(":common-utils"))
12-
implementation("org.slf4j:slf4j-api:$slf4jVersion")
13-
api("com.fasterxml.jackson.core:jackson-databind:$jacksonVersion")
1419

1520
testImplementation(project(":elasticmq-sqs-client"))
1621
testImplementation(project(":proxy-method-interceptor"))

0 commit comments

Comments
 (0)