Skip to content

Commit 0f4482a

Browse files
closes #410: reduce scope of spring package by pulling logic to core and new annotations package (#411)
1 parent 5630a7a commit 0f4482a

File tree

110 files changed

+1340
-1269
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

110 files changed

+1340
-1269
lines changed

README.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ The following provides some examples using the library with different languages
3232
```
3333

3434
1. In one of your beans, attach a
35-
[@QueueListener](./spring/spring-core/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListener.java)
35+
[@QueueListener](./annotations/src/main/java/com/jashmore/sqs/annotations/core/basic/QueueListener.java)
3636
annotation to a method indicating that it should process messages from a queue.
3737

3838
```java
@@ -572,7 +572,7 @@ lambdaProcessor {
572572
The [Spring Cloud AWS Messaging](https://github.com/spring-cloud/spring-cloud-aws/tree/master/spring-cloud-aws-messaging) `@SqsListener` works by requesting
573573
a set of messages from the SQS and when they are done it will request some more. There is one disadvantage with this approach in that if 9/10 of the messages
574574
finish in 10 milliseconds but one takes 10 seconds no other messages will be picked up until that last message is complete. The
575-
[@QueueListener](./spring/spring-core/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListener.java)
575+
[@QueueListener](./annotations/src/main/java/com/jashmore/sqs/annotations/core/basic/QueueListener.java)
576576
provides the same basic functionality, but it also provides a timeout where it will eventually request for more messages when there are threads that are
577577
ready for another message.
578578
@@ -619,7 +619,7 @@ not prefetch anymore._
619619
620620
#### Spring Boot
621621
622-
The [@PrefetchingQueueListener](./spring/spring-core/src/main/java/com/jashmore/sqs/spring/container/prefetch/PrefetchingQueueListener.java)
622+
The [@PrefetchingQueueListener](./annotations/src/main/java/com/jashmore/sqs/annotations/core/prefetch/PrefetchingQueueListener.java)
623623
annotation can be used to prefetch messages in a background thread while processing the existing messages. The usage is something like this:
624624
625625
```java

annotations/README.md

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# Java Dynamic SQS Listener Annotations
2+
3+
Wrapper around the core library that allows for setting up using annotations to simplify the usage.
4+
5+
## More Information
6+
7+
For more information you can look at the root project [README.md](../README.md) which provides more information about the architecture
8+
of the application. The [API](../api) is also a good location to find more information about what each part of the framework is how
9+
they interact with each other.

annotations/build.gradle.kts

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
2+
description = "Contains a way to attach message listeners via annotations"
3+
4+
dependencies {
5+
api(project(":java-dynamic-sqs-listener-core"))
6+
implementation(project(":common-utils"))
7+
implementation(project(":annotation-utils"))
8+
compileOnly(project(":documentation-annotations"))
9+
10+
testImplementation(project(":elasticmq-sqs-client"))
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package com.jashmore.sqs.annotations.container;
2+
3+
import com.jashmore.sqs.QueueProperties;
4+
import com.jashmore.sqs.argument.ArgumentResolverService;
5+
import com.jashmore.sqs.client.QueueResolver;
6+
import com.jashmore.sqs.client.SqsAsyncClientProvider;
7+
import com.jashmore.sqs.container.MessageListenerContainer;
8+
import com.jashmore.sqs.container.MessageListenerContainerFactory;
9+
import com.jashmore.sqs.container.MessageListenerContainerInitialisationException;
10+
import com.jashmore.sqs.processor.CoreMessageProcessor;
11+
import com.jashmore.sqs.processor.DecoratingMessageProcessorFactory;
12+
import com.jashmore.sqs.processor.MessageProcessor;
13+
import com.jashmore.sqs.util.annotation.AnnotationUtils;
14+
import com.jashmore.sqs.util.identifier.IdentifierUtils;
15+
import com.jashmore.sqs.util.string.StringUtils;
16+
import java.lang.annotation.Annotation;
17+
import java.lang.reflect.Method;
18+
import java.util.Optional;
19+
import java.util.function.Function;
20+
import java.util.function.Supplier;
21+
import lombok.Builder;
22+
import org.immutables.value.Value;
23+
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
24+
25+
/**
26+
* {@link MessageListenerContainerFactory} that can be used to build against an annotated method.
27+
*
28+
* @param <A> annotation that is applied on the method
29+
*/
30+
public class AnnotationMessageListenerContainerFactory<A extends Annotation> implements MessageListenerContainerFactory {
31+
32+
private final Class<A> annotationClass;
33+
private final Function<A, String> identifierMapper;
34+
private final Function<A, String> sqsClientIdentifier;
35+
private final Function<A, String> queueNameOrUrlMapper;
36+
private final QueueResolver queueResolver;
37+
private final SqsAsyncClientProvider sqsAsyncClientProvider;
38+
private final DecoratingMessageProcessorFactory decoratingMessageProcessorFactory;
39+
private final ArgumentResolverService argumentResolverService;
40+
private final Function<AnnotationDetails<A>, MessageListenerContainer> containerFactory;
41+
42+
/**
43+
* Constructor.
44+
*
45+
* @param annotationClass the class instance of the annotation
46+
* @param identifierMapper to convert an annotation to the identifier of the listener
47+
* @param sqsClientIdentifierMapper to convert an annotation to the SQS Client identifier
48+
* @param queueNameOrUrlMapper to convert an annotation to the Queue URL or name
49+
* @param queueResolver to resolve queue names to a URL
50+
* @param sqsAsyncClientProvider the method for obtaining a SQS client from the identifier
51+
* @param decoratingMessageProcessorFactory to wrap the message processing with any decorators
52+
* @param argumentResolverService to map the parameters of the method to values in the message
53+
* @param containerFactory converts details about the annotation to the final {@link MessageListenerContainer}
54+
*/
55+
public AnnotationMessageListenerContainerFactory(
56+
final Class<A> annotationClass,
57+
final Function<A, String> identifierMapper,
58+
final Function<A, String> sqsClientIdentifierMapper,
59+
final Function<A, String> queueNameOrUrlMapper,
60+
final QueueResolver queueResolver,
61+
final SqsAsyncClientProvider sqsAsyncClientProvider,
62+
final DecoratingMessageProcessorFactory decoratingMessageProcessorFactory,
63+
final ArgumentResolverService argumentResolverService,
64+
final Function<AnnotationDetails<A>, MessageListenerContainer> containerFactory
65+
) {
66+
this.annotationClass = annotationClass;
67+
this.identifierMapper = identifierMapper;
68+
this.sqsClientIdentifier = sqsClientIdentifierMapper;
69+
this.queueNameOrUrlMapper = queueNameOrUrlMapper;
70+
this.queueResolver = queueResolver;
71+
this.sqsAsyncClientProvider = sqsAsyncClientProvider;
72+
this.decoratingMessageProcessorFactory = decoratingMessageProcessorFactory;
73+
this.argumentResolverService = argumentResolverService;
74+
this.containerFactory = containerFactory;
75+
}
76+
77+
@Override
78+
public Optional<MessageListenerContainer> buildContainer(final Object bean, final Method method)
79+
throws MessageListenerContainerInitialisationException {
80+
return AnnotationUtils
81+
.findMethodAnnotation(method, this.annotationClass)
82+
.map(annotation -> {
83+
final SqsAsyncClient sqsAsyncClient = getSqsAsyncClient(annotation);
84+
final QueueProperties queueProperties = QueueProperties
85+
.builder()
86+
.queueUrl(queueResolver.resolveQueueUrl(sqsAsyncClient, queueNameOrUrlMapper.apply(annotation)))
87+
.build();
88+
final String identifier = IdentifierUtils.buildIdentifierForMethod(
89+
identifierMapper.apply(annotation),
90+
bean.getClass(),
91+
method
92+
);
93+
94+
final Supplier<MessageProcessor> messageProcessorSupplier = () ->
95+
decoratingMessageProcessorFactory.decorateMessageProcessor(
96+
sqsAsyncClient,
97+
identifier,
98+
queueProperties,
99+
bean,
100+
method,
101+
new CoreMessageProcessor(argumentResolverService, queueProperties, sqsAsyncClient, method, bean)
102+
);
103+
104+
return containerFactory.apply(
105+
AnnotationDetails
106+
.<A>builder()
107+
.identifier(identifier)
108+
.queueProperties(queueProperties)
109+
.sqsAsyncClient(sqsAsyncClient)
110+
.messageProcessorSupplier(messageProcessorSupplier)
111+
.annotation(annotation)
112+
.build()
113+
);
114+
});
115+
}
116+
117+
private SqsAsyncClient getSqsAsyncClient(final A annotation) {
118+
final String sqsClient = sqsClientIdentifier.apply(annotation);
119+
120+
if (!StringUtils.hasText(sqsClient)) {
121+
return sqsAsyncClientProvider
122+
.getDefaultClient()
123+
.orElseThrow(() -> new MessageListenerContainerInitialisationException("Expected the default SQS Client but there is none")
124+
);
125+
}
126+
127+
return sqsAsyncClientProvider
128+
.getClient(sqsClient)
129+
.orElseThrow(() ->
130+
new MessageListenerContainerInitialisationException("Expected a client with id '" + sqsClient + "' but none were found")
131+
);
132+
}
133+
134+
@Value
135+
@Builder
136+
public static class AnnotationDetails<A extends Annotation> {
137+
138+
public String identifier;
139+
public SqsAsyncClient sqsAsyncClient;
140+
public QueueProperties queueProperties;
141+
public Supplier<MessageProcessor> messageProcessorSupplier;
142+
public A annotation;
143+
}
144+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package com.jashmore.sqs.annotations.core.basic;
2+
3+
import com.jashmore.sqs.annotations.container.AnnotationMessageListenerContainerFactory;
4+
import com.jashmore.sqs.argument.ArgumentResolverService;
5+
import com.jashmore.sqs.client.QueueResolver;
6+
import com.jashmore.sqs.client.SqsAsyncClientProvider;
7+
import com.jashmore.sqs.container.MessageListenerContainer;
8+
import com.jashmore.sqs.container.MessageListenerContainerFactory;
9+
import com.jashmore.sqs.container.MessageListenerContainerInitialisationException;
10+
import com.jashmore.sqs.container.batching.BatchingMessageListenerContainer;
11+
import com.jashmore.sqs.container.batching.BatchingMessageListenerContainerProperties;
12+
import com.jashmore.sqs.processor.DecoratingMessageProcessorFactory;
13+
import java.lang.reflect.Method;
14+
import java.util.Optional;
15+
16+
/**
17+
* {@link MessageListenerContainerFactory} that will wrap methods annotated with
18+
* {@link QueueListener @QueueListener} with some predefined implementations of the framework.
19+
*/
20+
public class BasicAnnotationMessageListenerContainerFactory implements MessageListenerContainerFactory {
21+
22+
private final AnnotationMessageListenerContainerFactory<QueueListener> delegate;
23+
24+
public BasicAnnotationMessageListenerContainerFactory(
25+
final ArgumentResolverService argumentResolverService,
26+
final SqsAsyncClientProvider sqsAsyncClientProvider,
27+
final QueueResolver queueResolver,
28+
final QueueListenerParser queueListenerParser,
29+
final DecoratingMessageProcessorFactory decoratingMessageProcessorFactory
30+
) {
31+
this.delegate =
32+
new AnnotationMessageListenerContainerFactory<>(
33+
QueueListener.class,
34+
QueueListener::identifier,
35+
QueueListener::sqsClient,
36+
QueueListener::value,
37+
queueResolver,
38+
sqsAsyncClientProvider,
39+
decoratingMessageProcessorFactory,
40+
argumentResolverService,
41+
details -> {
42+
final BatchingMessageListenerContainerProperties properties = queueListenerParser.parse(details.annotation);
43+
return new BatchingMessageListenerContainer(
44+
details.identifier,
45+
details.queueProperties,
46+
details.sqsAsyncClient,
47+
details.messageProcessorSupplier,
48+
properties
49+
);
50+
}
51+
);
52+
}
53+
54+
@Override
55+
public Optional<MessageListenerContainer> buildContainer(final Object bean, final Method method)
56+
throws MessageListenerContainerInitialisationException {
57+
return this.delegate.buildContainer(bean, method);
58+
}
59+
}

spring/spring-core/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListener.java annotations/src/main/java/com/jashmore/sqs/annotations/core/basic/QueueListener.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.jashmore.sqs.spring.container.basic;
1+
package com.jashmore.sqs.annotations.core.basic;
22

33
import static java.lang.annotation.ElementType.METHOD;
44
import static java.lang.annotation.RetentionPolicy.RUNTIME;
@@ -7,23 +7,23 @@
77
import com.jashmore.sqs.aws.AwsConstants;
88
import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBroker;
99
import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBrokerProperties;
10+
import com.jashmore.sqs.client.SqsAsyncClientProvider;
1011
import com.jashmore.sqs.container.MessageListenerContainer;
12+
import com.jashmore.sqs.placeholder.PlaceholderResolver;
1113
import com.jashmore.sqs.processor.CoreMessageProcessor;
1214
import com.jashmore.sqs.retriever.batching.BatchingMessageRetriever;
1315
import com.jashmore.sqs.retriever.batching.BatchingMessageRetrieverProperties;
14-
import com.jashmore.sqs.spring.client.SqsAsyncClientProvider;
1516
import java.lang.annotation.Retention;
1617
import java.lang.annotation.Target;
17-
import org.springframework.core.env.Environment;
1818
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
1919

2020
/**
2121
* Wrap a method with a {@link MessageListenerContainer} that will execute the method whenever a message is received on the provided queue.
2222
*
2323
* <p>This is a simplified annotation that uses the {@link ConcurrentMessageBroker}, {@link BatchingMessageRetriever} and {@link CoreMessageProcessor}
24-
* for the implementations of the framework. Not all of the properties for each implementation are available to simplify this usage.
24+
* for the implementations of the framework. Not all the properties for each implementation are available to simplify this usage.
2525
*
26-
* @see BasicMessageListenerContainerFactory for what processes this annotation
26+
* @see BasicAnnotationMessageListenerContainerFactory for what processes this annotation
2727
*/
2828
@Retention(RUNTIME)
2929
@Target(METHOD)
@@ -40,7 +40,7 @@
4040
* </ul>
4141
*
4242
* @return the queue name or URL of the queue
43-
* @see Environment#resolveRequiredPlaceholders(String) for how the placeholders are resolved
43+
* @see PlaceholderResolver#resolvePlaceholders(String) for how the placeholders are resolved
4444
* @see QueueProperties#getQueueUrl() for how the URL of the queue is resolved if a queue name is supplied here
4545
*/
4646
String value();

spring/spring-core/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListenerParser.java annotations/src/main/java/com/jashmore/sqs/annotations/core/basic/QueueListenerParser.java

+14-13
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,27 @@
1-
package com.jashmore.sqs.spring.container.basic;
1+
package com.jashmore.sqs.annotations.core.basic;
22

33
import com.jashmore.documentation.annotations.Max;
44
import com.jashmore.documentation.annotations.Nullable;
55
import com.jashmore.documentation.annotations.Positive;
66
import com.jashmore.documentation.annotations.PositiveOrZero;
77
import com.jashmore.sqs.aws.AwsConstants;
88
import com.jashmore.sqs.container.batching.BatchingMessageListenerContainerProperties;
9-
import com.jashmore.sqs.spring.container.CoreAnnotationParser;
9+
import com.jashmore.sqs.placeholder.PlaceholderResolver;
10+
import com.jashmore.sqs.util.string.StringUtils;
1011
import java.time.Duration;
1112
import java.util.function.Supplier;
12-
import org.springframework.core.env.Environment;
13-
import org.springframework.util.StringUtils;
1413

1514
/**
1615
* Parser that is used to transform a {@link QueueListener} annotation to a {@link BatchingMessageListenerContainerProperties}.
1716
*/
18-
public class QueueListenerParser implements CoreAnnotationParser<QueueListener, BatchingMessageListenerContainerProperties> {
17+
public class QueueListenerParser {
1918

20-
private final Environment environment;
19+
private final PlaceholderResolver placeholderResolver;
2120

22-
public QueueListenerParser(final Environment environment) {
23-
this.environment = environment;
21+
public QueueListenerParser(final PlaceholderResolver placeholderResolver) {
22+
this.placeholderResolver = placeholderResolver;
2423
}
2524

26-
@Override
2725
public BatchingMessageListenerContainerProperties parse(QueueListener annotation) {
2826
final Supplier<Integer> concurrencySupplier = concurrencySupplier(annotation);
2927
final Supplier<Duration> concurrencyPollingRateSupplier = concurrencyPollingRateSupplier(annotation);
@@ -102,7 +100,7 @@ protected Supplier<Integer> concurrencySupplier(final QueueListener annotation)
102100
if (!StringUtils.hasText(annotation.concurrencyLevelString())) {
103101
concurrencyLevel = annotation.concurrencyLevel();
104102
} else {
105-
concurrencyLevel = Integer.parseInt(environment.resolvePlaceholders(annotation.concurrencyLevelString()));
103+
concurrencyLevel = Integer.parseInt(placeholderResolver.resolvePlaceholders(annotation.concurrencyLevelString()));
106104
}
107105
return () -> concurrencyLevel;
108106
}
@@ -134,7 +132,7 @@ protected Supplier<Integer> batchSizeSupplier(final QueueListener annotation) {
134132
if (!StringUtils.hasText(annotation.batchSizeString())) {
135133
batchSize = annotation.batchSize();
136134
} else {
137-
batchSize = Integer.parseInt(environment.resolvePlaceholders(annotation.batchSizeString()));
135+
batchSize = Integer.parseInt(placeholderResolver.resolvePlaceholders(annotation.batchSizeString()));
138136
}
139137

140138
return () -> batchSize;
@@ -154,7 +152,8 @@ protected Supplier<Duration> batchingPeriodSupplier(final QueueListener annotati
154152
if (!StringUtils.hasText(annotation.batchingPeriodInMsString())) {
155153
batchingPeriod = Duration.ofMillis(annotation.batchingPeriodInMs());
156154
} else {
157-
batchingPeriod = Duration.ofMillis(Integer.parseInt(environment.resolvePlaceholders(annotation.batchingPeriodInMsString())));
155+
batchingPeriod =
156+
Duration.ofMillis(Integer.parseInt(placeholderResolver.resolvePlaceholders(annotation.batchingPeriodInMsString())));
158157
}
159158
return () -> batchingPeriod;
160159
}
@@ -193,7 +192,9 @@ protected Supplier<Duration> messageVisibilityTimeoutSupplier(final QueueListene
193192
}
194193
} else {
195194
messageVisibilityTimeout =
196-
Duration.ofSeconds(Integer.parseInt(environment.resolvePlaceholders(annotation.messageVisibilityTimeoutInSecondsString())));
195+
Duration.ofSeconds(
196+
Integer.parseInt(placeholderResolver.resolvePlaceholders(annotation.messageVisibilityTimeoutInSecondsString()))
197+
);
197198
}
198199

199200
return () -> messageVisibilityTimeout;

0 commit comments

Comments
 (0)