Skip to content

Commit 6a775f3

Browse files
authored
Concurrency issue when task executor is provided. (#123)
* Concurrency issue when task executor is provided. * remove exit * Removed unused property * increase timeout
1 parent d5520ae commit 6a775f3

File tree

12 files changed

+79
-58
lines changed

12 files changed

+79
-58
lines changed

CHANGELOG.md

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# [Rqueue] New and Notable Changes
22

3+
### [2.10.1] - 18-Oct-2021
4+
5+
* Fixes for concurrency when task executor is provided see issue #[122]
6+
37
### [2.10.0] - 10-Oct-2021
48

59
### Fixes
@@ -8,13 +12,14 @@
812
* Fixes message move message count (by default 1000 messages are moved)
913
* Potential issue in rename collection
1014
* More than one (-) sign in the dashboard
11-
* Fixes for server context path. Rqueue end points would be served relative to x-forwarded-prefix/server.servlet.context-path
15+
* Fixes for server context path. Rqueue end points would be served relative to
16+
x-forwarded-prefix/server.servlet.context-path
1217

1318
### Features
1419

1520
* Display completed jobs in the dashboard
1621
* Option to choose number of days in the chart
17-
ReactiveWebViewTest
22+
1823
### [2.9.0] - 30-Jul-2021
1924

2025
### Fixes
@@ -273,3 +278,7 @@ Fixes:
273278
[2.9.0]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue-core/2.9.0-RELEASE
274279

275280
[2.10.0]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue-core/2.10.0-RELEASE
281+
282+
[2.10.1]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue-core/2.10.1-RELEASE
283+
284+
[122]: https://github.com/sonus21/rqueue/issues/122

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,14 +71,14 @@ Release Version: [Maven central](https://search.maven.org/search?q=g:com.github.
7171
* Add dependency
7272
* Gradle
7373
```groovy
74-
implementation 'com.github.sonus21:rqueue-spring-boot-starter:2.10.0-RELEASE'
74+
implementation 'com.github.sonus21:rqueue-spring-boot-starter:2.10.1-RELEASE'
7575
```
7676
* Maven
7777
```xml
7878
<dependency>
7979
<groupId>com.github.sonus21</groupId>
8080
<artifactId>rqueue-spring-boot-starter</artifactId>
81-
<version>2.10.0-RELEASE</version>
81+
<version>2.10.1-RELEASE</version>
8282
</dependency>
8383
```
8484
@@ -91,14 +91,14 @@ Release Version: [Maven central](https://search.maven.org/search?q=g:com.github.
9191
* Add Dependency
9292
* Gradle
9393
```groovy
94-
implementation 'com.github.sonus21:rqueue-spring:2.10.0-RELEASE'
94+
implementation 'com.github.sonus21:rqueue-spring:2.10.1-RELEASE'
9595
```
9696
* Maven
9797
```xml
9898
<dependency>
9999
<groupId>com.github.sonus21</groupId>
100100
<artifactId>rqueue-spring</artifactId>
101-
<version>2.10.0-RELEASE</version>
101+
<version>2.10.1-RELEASE</version>
102102
</dependency>
103103
```
104104

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ ext {
7070

7171
subprojects {
7272
group = 'com.github.sonus21'
73-
version = '2.10.0-RELEASE'
73+
version = '2.10.1-RELEASE'
7474

7575
dependencies {
7676
// https://mvnrepository.com/artifact/org.springframework/spring-messaging

rqueue-core/src/main/java/com/github/sonus21/rqueue/annotation/RqueueListener.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@
144144
* have the same concurrency.
145145
*
146146
* @return concurrency for this worker.
147+
* @see #priority()
147148
*/
148149
String concurrency() default "-1";
149150

@@ -158,7 +159,7 @@
158159
*
159160
* <p>Priority can be any number. There are two priority control modes. 1. Strict 2. Weighted, in
160161
* strict priority mode queue with higher priority is preferred over other queues. In case of
161-
* weighted a round robin approach is used, and weight is followed.
162+
* weighted a round-robin approach is used, and weight is followed.
162163
*
163164
* @return the priority for this listener.
164165
* @see #priorityGroup()

rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -113,12 +113,6 @@ public class RqueueConfig {
113113
@Value("${rqueue.retry.per.poll:1}")
114114
private int retryPerPoll;
115115

116-
@Value("${rqueue.add.default.queue.with.queue.level.priority:true}")
117-
private boolean addDefaultQueueWithQueueLevelPriority;
118-
119-
@Value("${rqueue.default.queue.with.queue.level.priority:-1}")
120-
private int defaultQueueWithQueueLevelPriority;
121-
122116
@Value("${rqueue.net.proxy.host:}")
123117
private String proxyHost;
124118

rqueue-core/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -211,16 +211,17 @@ public Integer getMaxNumWorkers() {
211211
* for every queue.
212212
*
213213
* <p>When you're using custom executor then you should set this number as (thread pool max size -
214-
* number of queues) given executor is not shared. The maxNumWorkers tells how many workers you
215-
* want to run in parallel for all listeners, for example if you have 3 listeners, and you have
216-
* set this as 10 then all 3 listeners would be running maximum **combined 10 jobs** at any point
217-
* of time.
214+
* number of queues) given executor is not shared with other application component. The
215+
* maxNumWorkers tells how many workers you want to run in parallel for all listeners those are
216+
* not having configured concurrency. For example if you have 3 queues without concurrency, and
217+
* you have set this as 10 then all 3 listeners would be running maximum **combined 10 jobs** at
218+
* any point of time. Queues having concurrency will be running at the configured concurrency.
218219
*
219220
* <p>What would happen if I set this to very high value while using custom executor? <br>
220221
* 1. Task(s) would be rejected by the executor unless queue size is non-zero <br>
221222
* 2. When queue size is non-zero then it can create duplicate message problem, since the polled
222223
* message has not been processed yet. This will happen when {@link
223-
* RqueueListener#visibilityTimeout()} is smaller than the time a task took to execute from the
224+
* RqueueListener#visibilityTimeout()} is smaller than the time a task takes to execute from the
224225
* time of polling to final execution.
225226
*
226227
* @param maxNumWorkers Maximum number of workers.

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/QueueDetail.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public QueueConfig toConfig() {
101101
List<QueueDetail> expandQueueDetail(boolean addDefault, int priority) {
102102
List<QueueDetail> queueDetails = new ArrayList<>();
103103
for (Entry<String, Integer> entry : getPriority().entrySet()) {
104-
QueueDetail cloneQueueDetail = cloneQueueDetail(entry.getKey(), entry.getValue(), true, name);
104+
QueueDetail cloneQueueDetail = cloneQueueDetail(entry.getKey(), entry.getValue(), name);
105105
queueDetails.add(cloneQueueDetail);
106106
}
107107
if (addDefault) {
@@ -121,7 +121,7 @@ List<QueueDetail> expandQueueDetail(boolean addDefault, int priority) {
121121
}
122122

123123
private QueueDetail cloneQueueDetail(
124-
String priorityName, Integer priority, boolean systemGenerated, String priorityGroup) {
124+
String priorityName, Integer priority, String priorityGroup) {
125125
if (priority == null || priorityName == null) {
126126
throw new IllegalStateException("priority name is null");
127127
}
@@ -140,7 +140,7 @@ private QueueDetail cloneQueueDetail(
140140
.completedQueueName(completedQueueName + suffix)
141141
.active(active)
142142
.batchSize(batchSize)
143-
.systemGenerated(systemGenerated)
143+
.systemGenerated(true)
144144
.priorityGroup(priorityGroup)
145145
.concurrency(concurrency)
146146
.priority(Collections.singletonMap(Constants.DEFAULT_PRIORITY_KEY, priority))

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -255,11 +255,29 @@ private void initializeQueue() {
255255
defaultTaskExecutor = true;
256256
taskExecutor = createDefaultTaskExecutor(queueDetails);
257257
} else {
258-
initializeThreadMap(queueDetails, taskExecutor, false, queueDetails.size());
258+
initializeThreadMapForNonDefaultExecutor(queueDetails);
259259
}
260260
initializeRunningQueueState();
261261
}
262262

263+
private void initializeThreadMapForNonDefaultExecutor(
264+
List<QueueDetail> registeredActiveQueueDetail) {
265+
List<QueueDetail> queueDetails =
266+
registeredActiveQueueDetail.stream()
267+
.filter(e -> !e.isSystemGenerated())
268+
.collect(Collectors.toList());
269+
List<QueueDetail> withoutConcurrency = new ArrayList<>();
270+
for (QueueDetail queueDetail : queueDetails) {
271+
if (queueDetail.getConcurrency().isValid()) {
272+
addExecutorForConcurrencyBasedQueue(queueDetail, taskExecutor, false);
273+
} else {
274+
withoutConcurrency.add(queueDetail);
275+
}
276+
}
277+
initializeThreadMap(
278+
withoutConcurrency, taskExecutor, false, getWorkersCount(withoutConcurrency.size()));
279+
}
280+
263281
private void initialize() {
264282
initializeQueue();
265283
this.postProcessingHandler =
@@ -296,9 +314,12 @@ private void initializeThreadMap(
296314
AsyncTaskExecutor taskExecutor,
297315
boolean defaultExecutor,
298316
int workersCount) {
317+
if (queueDetails.isEmpty()) {
318+
return;
319+
}
320+
QueueThreadPool pool = new QueueThreadPool(taskExecutor, defaultExecutor, workersCount);
299321
for (QueueDetail queueDetail : queueDetails) {
300-
queueThreadMap.put(
301-
queueDetail.getName(), new QueueThreadPool(taskExecutor, defaultExecutor, workersCount));
322+
queueThreadMap.put(queueDetail.getName(), pool);
302323
}
303324
}
304325

@@ -332,16 +353,19 @@ private AsyncTaskExecutor createNonConcurrencyBasedExecutor(
332353
return executor;
333354
}
334355

356+
private void addExecutorForConcurrencyBasedQueue(
357+
QueueDetail queueDetail, AsyncTaskExecutor executor, boolean defaultTaskExecutor) {
358+
int maxJobs = queueDetail.getConcurrency().getMax();
359+
QueueThreadPool threadPool = new QueueThreadPool(executor, defaultTaskExecutor, maxJobs);
360+
queueThreadMap.put(queueDetail.getName(), threadPool);
361+
}
362+
335363
private void createExecutor(QueueDetail queueDetail) {
336364
Concurrency concurrency = queueDetail.getConcurrency();
337-
int queueCapacity = 0;
338-
int maxJobs = concurrency.getMax();
339365
int corePoolSize = concurrency.getMin();
340366
int maxPoolSize = concurrency.getMax();
341-
AsyncTaskExecutor executor =
342-
createTaskExecutor(queueDetail, corePoolSize, maxPoolSize, queueCapacity);
343-
QueueThreadPool threadPool = new QueueThreadPool(executor, true, maxJobs);
344-
queueThreadMap.put(queueDetail.getName(), threadPool);
367+
AsyncTaskExecutor executor = createTaskExecutor(queueDetail, corePoolSize, maxPoolSize);
368+
addExecutorForConcurrencyBasedQueue(queueDetail, executor, true);
345369
}
346370

347371
public AsyncTaskExecutor createDefaultTaskExecutor(
@@ -362,15 +386,14 @@ public AsyncTaskExecutor createDefaultTaskExecutor(
362386
}
363387

364388
private AsyncTaskExecutor createTaskExecutor(
365-
QueueDetail queueDetail, int corePoolSize, int maxPoolSize, int queueCapacity) {
389+
QueueDetail queueDetail, int corePoolSize, int maxPoolSize) {
366390
String name = ThreadUtils.getWorkerName(queueDetail.getName());
367-
return ThreadUtils.createTaskExecutor(
368-
name, name + "-", corePoolSize, maxPoolSize, queueCapacity);
391+
return ThreadUtils.createTaskExecutor(name, name + "-", corePoolSize, maxPoolSize, 0);
369392
}
370393

371394
private List<QueueDetail> getQueueDetail(String queue, MappingInformation mappingInformation) {
372395
int numRetry = mappingInformation.getNumRetry();
373-
if (!mappingInformation.getDeadLetterQueueName().isEmpty() && numRetry == -1) {
396+
if (!StringUtils.isEmpty(mappingInformation.getDeadLetterQueueName()) && numRetry == -1) {
374397
log.warn(
375398
"Dead letter queue {} is set but retry is not set",
376399
mappingInformation.getDeadLetterQueueName());
@@ -403,12 +426,13 @@ private List<QueueDetail> getQueueDetail(String queue, MappingInformation mappin
403426
.priority(priority)
404427
.priorityGroup(priorityGroup)
405428
.build();
429+
List<QueueDetail> queueDetails;
406430
if (queueDetail.getPriority().size() <= 1) {
407-
return Collections.singletonList(queueDetail);
431+
queueDetails = Collections.singletonList(queueDetail);
432+
} else {
433+
queueDetails = queueDetail.expandQueueDetail(true, -1);
408434
}
409-
return queueDetail.expandQueueDetail(
410-
rqueueConfig.isAddDefaultQueueWithQueueLevelPriority(),
411-
rqueueConfig.getDefaultQueueWithQueueLevelPriority());
435+
return queueDetails;
412436
}
413437

414438
@Override
@@ -450,6 +474,7 @@ protected void doStart() {
450474

451475
private Map<String, QueueThreadPool> getQueueThreadMap(
452476
String groupName, List<QueueDetail> queueDetails) {
477+
// this happens only for queue having priorities like critical:10,high:5,low:3
453478
QueueThreadPool queueThreadPool = queueThreadMap.get(groupName);
454479
if (queueThreadPool != null) {
455480
return queueDetails.stream()

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessagePoller.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ private void execute(
8282
queueThreadPool));
8383
} catch (Exception e) {
8484
if (e instanceof TaskRejectedException) {
85-
queueThreadPool.taskRejected();
85+
queueThreadPool.taskRejected(queueDetail, message);
8686
}
8787
log(Level.WARN, "Execution failed Msg: {}", e, message);
8888
release(postProcessingHandler, queueThreadPool, queueDetail, message);

rqueue-core/src/main/java/com/github/sonus21/rqueue/utils/QueueThreadPool.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package com.github.sonus21.rqueue.utils;
1818

19+
import com.github.sonus21.rqueue.core.RqueueMessage;
20+
import com.github.sonus21.rqueue.listener.QueueDetail;
1921
import java.util.concurrent.Semaphore;
2022
import java.util.concurrent.TimeUnit;
2123
import lombok.extern.slf4j.Slf4j;
@@ -81,7 +83,10 @@ public String destroy() {
8183
return null;
8284
}
8385

84-
public void taskRejected() {
85-
log.warn("Task rejected by executor");
86+
public void taskRejected(QueueDetail queueDetail, RqueueMessage message) {
87+
log.warn(
88+
"Task rejected by executor Queue: {}, Message: {}",
89+
queueDetail.getName(),
90+
message.getMessage());
8691
}
8792
}

0 commit comments

Comments
 (0)