Support passing a List of batched SQS Messages as an argument #337
Comments
Hey, thanks for the suggestion! The whole framework has been written around the concept of moving a single message through each component, so updating it to cover both use cases, while still being performant with thread usage, would be a considerable effort/redesign. E.g. the flow MessageRetriever -> MessageBroker -> MessageProcessor deals with a single message at every step, and supporting this use case would involve complete redesigns of those interfaces. Also, as you said, it would

You can of course provide your own batching logic in your listener, e.g. the very high level pseudo code
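For illustration only, listener-side batching in which each listener thread blocks until its batch has been processed might be sketched as follows. This is not the maintainer's pseudo code; `BlockingBatcher`, `submitAndWait` and the batch handler are hypothetical names and are not part of the library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical helper, not part of the library: each listener thread hands over
// its message and blocks until the batch containing it has been processed.
class BlockingBatcher<T> {
    private final int batchSize;
    private final Consumer<List<T>> batchHandler;
    private List<T> pending = new ArrayList<>();
    private CompletableFuture<Void> pendingResult = new CompletableFuture<>();

    BlockingBatcher(int batchSize, Consumer<List<T>> batchHandler) {
        this.batchSize = batchSize;
        this.batchHandler = batchHandler;
    }

    // Call this from the listener method; it returns once the batch is done.
    void submitAndWait(T message) {
        final CompletableFuture<Void> result;
        List<T> fullBatch = null;
        synchronized (this) {
            pending.add(message);
            result = pendingResult;
            if (pending.size() >= batchSize) {
                // This thread completed the batch, so start a fresh one for later messages.
                fullBatch = pending;
                pending = new ArrayList<>();
                pendingResult = new CompletableFuture<>();
            }
        }
        if (fullBatch != null) {
            // The thread that filled the batch processes it on behalf of the waiting threads.
            try {
                batchHandler.accept(fullBatch);
                result.complete(null);
            } catch (RuntimeException e) {
                result.completeExceptionally(e);
            }
        }
        // Blocks the listener thread; rethrows (wrapped) if the batch handler failed.
        result.join();
    }
}
```

A listener method would call `batcher.submitAndWait(event)`. If fewer than `batchSize` messages arrive, the waiting threads never wake up, so a real version would also need a timed flush.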
But at this point, considering how much effort it would be on your side to implement this, and the lack of performance due to the framework's threads sitting idle waiting for the batch to complete, I am not sure if there is much value in that. I guess I am still not seeing the use case for this, so happy for you to enlighten me. What were you wanting to gain from doing the batch of messages? E.g. are you trying to reduce the number of threads running, or are certain messages related to each other, so that by handling them in a batch you can reduce the number of messages actually processed, etc.?
Yeah - this library has been great so far for simplifying a lot of common things, like injecting message attributes and converting the payloads, and passing in the batches would break this (unless everything was modified to return a

Your pseudo code is pretty much what we have implemented 😝 - for our use case, we have lots of consumer services that listen for events on SQS, do something with the payload and persist it somewhere. In some cases, we've noticed that having lots of consuming threads persisting small batches does not perform as well as having 1 consumer thread persisting larger batches (mostly database transactions, where multiple threads writing increases the small chance of deadlocks etc.), so we were going to buffer the messages and have another thread, running as a task every X seconds, that would consume and process the batched messages - roughly something like

    @QueueListener(...)
    public void consumer(@Payload final EventType event) {
        // Buffer the event together with the callbacks to run once its batch completes
        buffer.add(BatchedEvent.builder(event)
            .onSuccess(processed -> { /* delete the message */ })
            .onFailure((failed, exception) -> { /* perform some back-off logic and log the error */ })
            .build()
        );
    }

    @Scheduled
    public void batchProcessor() {
        final List<BatchedEvent> events = new ArrayList<>();
        buffer.drainTo(events);
        try {
            // flatten and process events
            events.forEach(event -> event.onSuccess(event));
        } catch (Exception e) {
            events.forEach(event -> event.onFailure(e));
        }
    }

Since we can request up to 10 messages at a time from SQS (and for the busier services there will always be enough messages), I was wondering if there was a way we could set the argument on the consumer method to be a

After thinking about it more I can close this issue if you want?
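A self-contained version of the buffering approach described in this comment could look roughly like the sketch below. It is an illustration under assumptions, not the commenter's actual code: `BufferedBatchConsumer`, its nested `BatchedEvent` and the `batchWriter` are hypothetical stand-ins, and the `@QueueListener` and `@Scheduled` wiring is left out so the class runs on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical sketch: buffers payloads from listener threads and lets a
// single scheduled thread persist them as one larger batch.
class BufferedBatchConsumer<T> {
    // Stand-in for the BatchedEvent above: a payload plus completion callbacks.
    static final class BatchedEvent<T> {
        final T payload;
        final Consumer<T> onSuccess;
        final BiConsumer<T, Exception> onFailure;

        BatchedEvent(T payload, Consumer<T> onSuccess, BiConsumer<T, Exception> onFailure) {
            this.payload = payload;
            this.onSuccess = onSuccess;
            this.onFailure = onFailure;
        }
    }

    private final BlockingQueue<BatchedEvent<T>> buffer = new LinkedBlockingQueue<>();
    private final Consumer<List<T>> batchWriter;

    BufferedBatchConsumer(Consumer<List<T>> batchWriter) {
        this.batchWriter = batchWriter;
    }

    // What the listener method would call for every message.
    void consume(T payload, Consumer<T> onSuccess, BiConsumer<T, Exception> onFailure) {
        buffer.add(new BatchedEvent<>(payload, onSuccess, onFailure));
    }

    // What the scheduled method would call every X seconds.
    void processBatch() {
        List<BatchedEvent<T>> events = new ArrayList<>();
        buffer.drainTo(events);
        if (events.isEmpty()) {
            return;
        }
        List<T> payloads = new ArrayList<>();
        for (BatchedEvent<T> event : events) {
            payloads.add(event.payload);
        }
        try {
            batchWriter.accept(payloads);
        } catch (Exception e) {
            // The whole batch failed, so every buffered event is reported as failed.
            events.forEach(event -> event.onFailure.accept(event.payload, e));
            return;
        }
        // Success callbacks run outside the try so their own errors are not treated as batch failures.
        events.forEach(event -> event.onSuccess.accept(event.payload));
    }
}
```

Because the listener method returns before the batch is written, the success callback is where the message would have to be deleted, and the failure callback is where back-off would go.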
Okay cool, I see the desire for it. Thinking about this more, it might be possible to semi-support this with only implementation changes instead of needing to modify the API of this library. I have included a basic high level design below. Unfortunately, due to the time it would take to do this, I don't think I would get around to doing it (or at least not in the next few months, so that wouldn't help you anyway).

For now, I would say to just keep doing the batching in your listener. There could be tweaks that you could do to reduce the number of thread switches to improve performance, e.g. make concurrency size 1 and use the

If that is not performant enough you could even just use the raw AWS SDK directly instead of using this library. There are lots of references in this codebase for how to do that, but I am happy to point you in a direction if you need help with the SDK.

I still think we should keep this issue open though, so if others find it and desire the feature they can bump it, which could kick me into gear to do it. I am also happy to take PRs for this if someone takes the initiative to do it; you can see the sort of changes that I did for the

High Level Design
Core Changes
MessageBroker
We create a new broker, pretty much the same as the ConcurrentMessageBroker, that will take a message from the MessageRetriever and, on the same broker thread, send it to the MessageProcessor. This is in comparison to the ConcurrentMessageBroker, which will call into the MessageProcessor with each message on its own message processing thread.

MessageProcessor
We implement a new processor that will collect these messages into a batch and, after a limit is hit, say 10 messages, call the method that accepts a list of messages via reflection or a lambda, etc.

Argument resolution
We would need to modify how the argument resolution works for this use case, e.g. one approach we could take is making sure all arguments have an annotation on a list parameter.
Unfortunately, due to type erasure, nothing would stop us in the above supplying a

MessageListenerContainer
We implement our own container that will use these new components, and maybe the PrefetchingMessageRetriever.

Spring Changes
We create a new annotation and factory that builds the container above. I don't think this would be too significant effort, similar to what we did for the

Would look like:
Improvements
Problems
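Going back to the MessageProcessor part of the design above, the idea of collecting messages and then calling a list-accepting method via reflection could be sketched as below. This is only a rough illustration under assumptions: `BatchingProcessorSketch` and `ExampleListener` are hypothetical and do not implement the library's real MessageProcessor interface. It also shows the type erasure caveat, since the reflective call cannot check the element type of the list.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the library's MessageProcessor interface.
class BatchingProcessorSketch {
    // Example listener with a public method that accepts a whole batch.
    static class ExampleListener {
        final List<List<String>> batches = new ArrayList<>();

        public void onBatch(List<String> payloads) {
            batches.add(payloads);
        }
    }

    private final Object listener;
    private final Method batchMethod;
    private final int batchSize;
    private final List<Object> pending = new ArrayList<>();

    BatchingProcessorSketch(Object listener, String methodName, int batchSize) {
        Method method;
        try {
            method = listener.getClass().getMethod(methodName, List.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No public " + methodName + "(List) method", e);
        }
        this.listener = listener;
        this.batchMethod = method;
        this.batchSize = batchSize;
    }

    // Called for every retrieved payload; flushes once the limit is reached.
    synchronized void process(Object payload) {
        pending.add(payload);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    // Hands the collected payloads to the listener method as one list.
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<Object> batch = new ArrayList<>(pending);
        pending.clear();
        try {
            // Erasure caveat: invoke() only checks that the argument is a List,
            // so a batch of non-String payloads is accepted here without complaint.
            batchMethod.invoke(listener, batch);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Batch listener failed", e);
        }
    }
}
```

A mismatched element would only surface later, as a `ClassCastException` when the listener reads it as a `String`, which is why the argument resolution would need its own checks. A timed `flush()` would also be needed so a partially filled batch does not wait forever.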
Another edge case that I think could be useful for others 😄
From what I understand - at the minute the listener container requests up to 10 messages from a queue and passes each message to an available consumer thread for processing. We can accept the message, or the message payload, as an argument to a consumer method.
Would it be possible to support passing all the messages from the receive message request to a consumer method so they can be processed in a batch? So if we have something like
If the container gets 10 messages from the request, a single thread will get those 10, then the next available thread gets the next batch, etc., allowing the consumer method to process things in a batch.
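The behaviour asked for here, where everything returned by one receive request goes to a single thread as a unit, could be sketched like this. It is only an illustration under assumptions: `BatchDispatcher`, the `receiver` supplier (standing in for one SQS receive request of up to 10 messages) and the `batchListener` are hypothetical and are not part of the library.

```java
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch: every receive result is handed to one worker as a whole batch.
class BatchDispatcher<T> {
    private final Supplier<List<T>> receiver;       // stand-in for one receive request
    private final Consumer<List<T>> batchListener;  // consumer method taking the whole batch
    private final Executor workers;                 // e.g. a fixed thread pool

    BatchDispatcher(Supplier<List<T>> receiver, Consumer<List<T>> batchListener, Executor workers) {
        this.receiver = receiver;
        this.batchListener = batchListener;
        this.workers = workers;
    }

    // Performs one receive and, if anything came back, gives the full batch to one worker.
    // Returns the number of messages dispatched.
    int pollOnce() {
        List<T> batch = receiver.get();
        if (batch.isEmpty()) {
            return 0;
        }
        workers.execute(() -> batchListener.accept(batch));
        return batch.size();
    }
}
```

With a thread pool as the `Executor`, the next receive result would go to the next free thread, while each batch is still processed by exactly one thread.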
I believe this could break a lot of things - for example, I'm not sure how you could pass MessageAttributes as a method parameter without having to do additional mapping, but (for myself at least 😛 ) I think this would be an acceptable trade-off for increased throughput.
Let me know if I can explain things a bit better. I had attempted to write my own argument resolver but was unable to do so; I can share some of the code if you want.