Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refs #410: pull out annotations to own package #411

Merged
merged 3 commits into from
Oct 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ The following provides some examples using the library with different languages
```

1. In one of your beans, attach a
[@QueueListener](./spring/spring-core/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListener.java)
[@QueueListener](./annotations/src/main/java/com/jashmore/sqs/annotations/core/basic/QueueListener.java)
annotation to a method indicating that it should process messages from a queue.

```java
Expand Down Expand Up @@ -572,7 +572,7 @@ lambdaProcessor {
The [Spring Cloud AWS Messaging](https://github.com/spring-cloud/spring-cloud-aws/tree/master/spring-cloud-aws-messaging) `@SqsListener` works by requesting
a set of messages from the SQS and when they are done it will request some more. There is one disadvantage with this approach in that if 9/10 of the messages
finish in 10 milliseconds but one takes 10 seconds no other messages will be picked up until that last message is complete. The
[@QueueListener](./spring/spring-core/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListener.java)
[@QueueListener](./annotations/src/main/java/com/jashmore/sqs/annotations/core/basic/QueueListener.java)
provides the same basic functionality, but it also provides a timeout where it will eventually request for more messages when there are threads that are
ready for another message.

Expand Down Expand Up @@ -619,7 +619,7 @@ not prefetch anymore._

#### Spring Boot

The [@PrefetchingQueueListener](./spring/spring-core/src/main/java/com/jashmore/sqs/spring/container/prefetch/PrefetchingQueueListener.java)
The [@PrefetchingQueueListener](./annotations/src/main/java/com/jashmore/sqs/annotations/core/prefetch/PrefetchingQueueListener.java)
annotation can be used to prefetch messages in a background thread while processing the existing messages. The usage is something like this:

```java
Expand Down
9 changes: 9 additions & 0 deletions annotations/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Java Dynamic SQS Listener Annotations

Wrapper around the core library that allows for setting up using annotations to simplify the usage.

## More Information

For more information you can look at the root project [README.md](../README.md) which provides more information about the architecture
of the application. The [API](../api) is also a good location to find more information about what each part of the framework is how
they interact with each other.
11 changes: 11 additions & 0 deletions annotations/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@

description = "Contains a way to attach message listeners via annotations"

dependencies {
api(project(":java-dynamic-sqs-listener-core"))
implementation(project(":common-utils"))
implementation(project(":annotation-utils"))
compileOnly(project(":documentation-annotations"))

testImplementation(project(":elasticmq-sqs-client"))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package com.jashmore.sqs.annotations.container;

import com.jashmore.sqs.QueueProperties;
import com.jashmore.sqs.argument.ArgumentResolverService;
import com.jashmore.sqs.client.QueueResolver;
import com.jashmore.sqs.client.SqsAsyncClientProvider;
import com.jashmore.sqs.container.MessageListenerContainer;
import com.jashmore.sqs.container.MessageListenerContainerFactory;
import com.jashmore.sqs.container.MessageListenerContainerInitialisationException;
import com.jashmore.sqs.processor.CoreMessageProcessor;
import com.jashmore.sqs.processor.DecoratingMessageProcessorFactory;
import com.jashmore.sqs.processor.MessageProcessor;
import com.jashmore.sqs.util.annotation.AnnotationUtils;
import com.jashmore.sqs.util.identifier.IdentifierUtils;
import com.jashmore.sqs.util.string.StringUtils;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import lombok.Builder;
import org.immutables.value.Value;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

/**
* {@link MessageListenerContainerFactory} that can be used to build against an annotated method.
*
* @param <A> annotation that is applied on the method
*/
public class AnnotationMessageListenerContainerFactory<A extends Annotation> implements MessageListenerContainerFactory {

private final Class<A> annotationClass;
private final Function<A, String> identifierMapper;
private final Function<A, String> sqsClientIdentifier;
private final Function<A, String> queueNameOrUrlMapper;
private final QueueResolver queueResolver;
private final SqsAsyncClientProvider sqsAsyncClientProvider;
private final DecoratingMessageProcessorFactory decoratingMessageProcessorFactory;
private final ArgumentResolverService argumentResolverService;
private final Function<AnnotationDetails<A>, MessageListenerContainer> containerFactory;

/**
* Constructor.
*
* @param annotationClass the class instance of the annotation
* @param identifierMapper to convert an annotation to the identifier of the listener
* @param sqsClientIdentifierMapper to convert an annotation to the SQS Client identifier
* @param queueNameOrUrlMapper to convert an annotation to the Queue URL or name
* @param queueResolver to resolve queue names to a URL
* @param sqsAsyncClientProvider the method for obtaining a SQS client from the identifier
* @param decoratingMessageProcessorFactory to wrap the message processing with any decorators
* @param argumentResolverService to map the parameters of the method to values in the message
* @param containerFactory converts details about the annotation to the final {@link MessageListenerContainer}
*/
public AnnotationMessageListenerContainerFactory(
final Class<A> annotationClass,
final Function<A, String> identifierMapper,
final Function<A, String> sqsClientIdentifierMapper,
final Function<A, String> queueNameOrUrlMapper,
final QueueResolver queueResolver,
final SqsAsyncClientProvider sqsAsyncClientProvider,
final DecoratingMessageProcessorFactory decoratingMessageProcessorFactory,
final ArgumentResolverService argumentResolverService,
final Function<AnnotationDetails<A>, MessageListenerContainer> containerFactory
) {
this.annotationClass = annotationClass;
this.identifierMapper = identifierMapper;
this.sqsClientIdentifier = sqsClientIdentifierMapper;
this.queueNameOrUrlMapper = queueNameOrUrlMapper;
this.queueResolver = queueResolver;
this.sqsAsyncClientProvider = sqsAsyncClientProvider;
this.decoratingMessageProcessorFactory = decoratingMessageProcessorFactory;
this.argumentResolverService = argumentResolverService;
this.containerFactory = containerFactory;
}

@Override
public Optional<MessageListenerContainer> buildContainer(final Object bean, final Method method)
throws MessageListenerContainerInitialisationException {
return AnnotationUtils
.findMethodAnnotation(method, this.annotationClass)
.map(annotation -> {
final SqsAsyncClient sqsAsyncClient = getSqsAsyncClient(annotation);
final QueueProperties queueProperties = QueueProperties
.builder()
.queueUrl(queueResolver.resolveQueueUrl(sqsAsyncClient, queueNameOrUrlMapper.apply(annotation)))
.build();
final String identifier = IdentifierUtils.buildIdentifierForMethod(
identifierMapper.apply(annotation),
bean.getClass(),
method
);

final Supplier<MessageProcessor> messageProcessorSupplier = () ->
decoratingMessageProcessorFactory.decorateMessageProcessor(
sqsAsyncClient,
identifier,
queueProperties,
bean,
method,
new CoreMessageProcessor(argumentResolverService, queueProperties, sqsAsyncClient, method, bean)
);

return containerFactory.apply(
AnnotationDetails
.<A>builder()
.identifier(identifier)
.queueProperties(queueProperties)
.sqsAsyncClient(sqsAsyncClient)
.messageProcessorSupplier(messageProcessorSupplier)
.annotation(annotation)
.build()
);
});
}

private SqsAsyncClient getSqsAsyncClient(final A annotation) {
final String sqsClient = sqsClientIdentifier.apply(annotation);

if (!StringUtils.hasText(sqsClient)) {
return sqsAsyncClientProvider
.getDefaultClient()
.orElseThrow(() -> new MessageListenerContainerInitialisationException("Expected the default SQS Client but there is none")
);
}

return sqsAsyncClientProvider
.getClient(sqsClient)
.orElseThrow(() ->
new MessageListenerContainerInitialisationException("Expected a client with id '" + sqsClient + "' but none were found")
);
}

@Value
@Builder
public static class AnnotationDetails<A extends Annotation> {

public String identifier;
public SqsAsyncClient sqsAsyncClient;
public QueueProperties queueProperties;
public Supplier<MessageProcessor> messageProcessorSupplier;
public A annotation;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.jashmore.sqs.annotations.core.basic;

import com.jashmore.sqs.annotations.container.AnnotationMessageListenerContainerFactory;
import com.jashmore.sqs.argument.ArgumentResolverService;
import com.jashmore.sqs.client.QueueResolver;
import com.jashmore.sqs.client.SqsAsyncClientProvider;
import com.jashmore.sqs.container.MessageListenerContainer;
import com.jashmore.sqs.container.MessageListenerContainerFactory;
import com.jashmore.sqs.container.MessageListenerContainerInitialisationException;
import com.jashmore.sqs.container.batching.BatchingMessageListenerContainer;
import com.jashmore.sqs.container.batching.BatchingMessageListenerContainerProperties;
import com.jashmore.sqs.processor.DecoratingMessageProcessorFactory;
import java.lang.reflect.Method;
import java.util.Optional;

/**
* {@link MessageListenerContainerFactory} that will wrap methods annotated with
* {@link QueueListener @QueueListener} with some predefined implementations of the framework.
*/
public class BasicAnnotationMessageListenerContainerFactory implements MessageListenerContainerFactory {

private final AnnotationMessageListenerContainerFactory<QueueListener> delegate;

public BasicAnnotationMessageListenerContainerFactory(
final ArgumentResolverService argumentResolverService,
final SqsAsyncClientProvider sqsAsyncClientProvider,
final QueueResolver queueResolver,
final QueueListenerParser queueListenerParser,
final DecoratingMessageProcessorFactory decoratingMessageProcessorFactory
) {
this.delegate =
new AnnotationMessageListenerContainerFactory<>(
QueueListener.class,
QueueListener::identifier,
QueueListener::sqsClient,
QueueListener::value,
queueResolver,
sqsAsyncClientProvider,
decoratingMessageProcessorFactory,
argumentResolverService,
details -> {
final BatchingMessageListenerContainerProperties properties = queueListenerParser.parse(details.annotation);
return new BatchingMessageListenerContainer(
details.identifier,
details.queueProperties,
details.sqsAsyncClient,
details.messageProcessorSupplier,
properties
);
}
);
}

@Override
public Optional<MessageListenerContainer> buildContainer(final Object bean, final Method method)
throws MessageListenerContainerInitialisationException {
return this.delegate.buildContainer(bean, method);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.jashmore.sqs.spring.container.basic;
package com.jashmore.sqs.annotations.core.basic;

import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
Expand All @@ -7,23 +7,23 @@
import com.jashmore.sqs.aws.AwsConstants;
import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBroker;
import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBrokerProperties;
import com.jashmore.sqs.client.SqsAsyncClientProvider;
import com.jashmore.sqs.container.MessageListenerContainer;
import com.jashmore.sqs.placeholder.PlaceholderResolver;
import com.jashmore.sqs.processor.CoreMessageProcessor;
import com.jashmore.sqs.retriever.batching.BatchingMessageRetriever;
import com.jashmore.sqs.retriever.batching.BatchingMessageRetrieverProperties;
import com.jashmore.sqs.spring.client.SqsAsyncClientProvider;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import org.springframework.core.env.Environment;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

/**
* Wrap a method with a {@link MessageListenerContainer} that will execute the method whenever a message is received on the provided queue.
*
* <p>This is a simplified annotation that uses the {@link ConcurrentMessageBroker}, {@link BatchingMessageRetriever} and {@link CoreMessageProcessor}
* for the implementations of the framework. Not all of the properties for each implementation are available to simplify this usage.
* for the implementations of the framework. Not all the properties for each implementation are available to simplify this usage.
*
* @see BasicMessageListenerContainerFactory for what processes this annotation
* @see BasicAnnotationMessageListenerContainerFactory for what processes this annotation
*/
@Retention(RUNTIME)
@Target(METHOD)
Expand All @@ -40,7 +40,7 @@
* </ul>
*
* @return the queue name or URL of the queue
* @see Environment#resolveRequiredPlaceholders(String) for how the placeholders are resolved
* @see PlaceholderResolver#resolvePlaceholders(String) for how the placeholders are resolved
* @see QueueProperties#getQueueUrl() for how the URL of the queue is resolved if a queue name is supplied here
*/
String value();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,27 @@
package com.jashmore.sqs.spring.container.basic;
package com.jashmore.sqs.annotations.core.basic;

import com.jashmore.documentation.annotations.Max;
import com.jashmore.documentation.annotations.Nullable;
import com.jashmore.documentation.annotations.Positive;
import com.jashmore.documentation.annotations.PositiveOrZero;
import com.jashmore.sqs.aws.AwsConstants;
import com.jashmore.sqs.container.batching.BatchingMessageListenerContainerProperties;
import com.jashmore.sqs.spring.container.CoreAnnotationParser;
import com.jashmore.sqs.placeholder.PlaceholderResolver;
import com.jashmore.sqs.util.string.StringUtils;
import java.time.Duration;
import java.util.function.Supplier;
import org.springframework.core.env.Environment;
import org.springframework.util.StringUtils;

/**
* Parser that is used to transform a {@link QueueListener} annotation to a {@link BatchingMessageListenerContainerProperties}.
*/
public class QueueListenerParser implements CoreAnnotationParser<QueueListener, BatchingMessageListenerContainerProperties> {
public class QueueListenerParser {

private final Environment environment;
private final PlaceholderResolver placeholderResolver;

public QueueListenerParser(final Environment environment) {
this.environment = environment;
public QueueListenerParser(final PlaceholderResolver placeholderResolver) {
this.placeholderResolver = placeholderResolver;
}

@Override
public BatchingMessageListenerContainerProperties parse(QueueListener annotation) {
final Supplier<Integer> concurrencySupplier = concurrencySupplier(annotation);
final Supplier<Duration> concurrencyPollingRateSupplier = concurrencyPollingRateSupplier(annotation);
Expand Down Expand Up @@ -102,7 +100,7 @@ protected Supplier<Integer> concurrencySupplier(final QueueListener annotation)
if (!StringUtils.hasText(annotation.concurrencyLevelString())) {
concurrencyLevel = annotation.concurrencyLevel();
} else {
concurrencyLevel = Integer.parseInt(environment.resolvePlaceholders(annotation.concurrencyLevelString()));
concurrencyLevel = Integer.parseInt(placeholderResolver.resolvePlaceholders(annotation.concurrencyLevelString()));
}
return () -> concurrencyLevel;
}
Expand Down Expand Up @@ -134,7 +132,7 @@ protected Supplier<Integer> batchSizeSupplier(final QueueListener annotation) {
if (!StringUtils.hasText(annotation.batchSizeString())) {
batchSize = annotation.batchSize();
} else {
batchSize = Integer.parseInt(environment.resolvePlaceholders(annotation.batchSizeString()));
batchSize = Integer.parseInt(placeholderResolver.resolvePlaceholders(annotation.batchSizeString()));
}

return () -> batchSize;
Expand All @@ -154,7 +152,8 @@ protected Supplier<Duration> batchingPeriodSupplier(final QueueListener annotati
if (!StringUtils.hasText(annotation.batchingPeriodInMsString())) {
batchingPeriod = Duration.ofMillis(annotation.batchingPeriodInMs());
} else {
batchingPeriod = Duration.ofMillis(Integer.parseInt(environment.resolvePlaceholders(annotation.batchingPeriodInMsString())));
batchingPeriod =
Duration.ofMillis(Integer.parseInt(placeholderResolver.resolvePlaceholders(annotation.batchingPeriodInMsString())));
}
return () -> batchingPeriod;
}
Expand Down Expand Up @@ -193,7 +192,9 @@ protected Supplier<Duration> messageVisibilityTimeoutSupplier(final QueueListene
}
} else {
messageVisibilityTimeout =
Duration.ofSeconds(Integer.parseInt(environment.resolvePlaceholders(annotation.messageVisibilityTimeoutInSecondsString())));
Duration.ofSeconds(
Integer.parseInt(placeholderResolver.resolvePlaceholders(annotation.messageVisibilityTimeoutInSecondsString()))
);
}

return () -> messageVisibilityTimeout;
Expand Down
Loading
Loading