Skip to content

Commit b43e2f8

Browse files
committed
Make sure to close executor services in tests
1 parent fc48aab commit b43e2f8

File tree

8 files changed

+99
-13
lines changed

8 files changed

+99
-13
lines changed

src/main/java/com/rabbitmq/stream/impl/ScheduledExecutorServiceWrapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public void shutdown() {
128128

129129
@Override
130130
public List<Runnable> shutdownNow() {
131-
this.delegate.shutdownNow();
131+
this.scheduler.shutdownNow();
132132
return this.delegate.shutdownNow();
133133
}
134134

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright (c) 2025 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
5+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
6+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
package com.rabbitmq.stream;
16+
17+
import java.util.ArrayList;
18+
import java.util.Collection;
19+
import org.junit.jupiter.api.extension.AfterEachCallback;
20+
import org.junit.jupiter.api.extension.BeforeEachCallback;
21+
import org.junit.jupiter.api.extension.ExtensionContext;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
public class StreamClientTestExtension implements BeforeEachCallback, AfterEachCallback {
26+
27+
private static final Logger LOGGER = LoggerFactory.getLogger(StreamClientTestExtension.class);
28+
private static final ExtensionContext.Namespace NS =
29+
ExtensionContext.Namespace.create(StreamClientTestExtension.class);
30+
31+
@Override
32+
public void beforeEach(ExtensionContext ctx) {
33+
ExtensionContext.Store store = store(ctx);
34+
store.put("threads", threads());
35+
}
36+
37+
@Override
38+
public void afterEach(ExtensionContext ctx) {
39+
@SuppressWarnings("unchecked")
40+
Collection<Thread> initialThreads = (Collection<Thread>) store(ctx).remove("threads");
41+
if (initialThreads != null) {
42+
Collection<Thread> threads = threads();
43+
if (threads.size() > initialThreads.size()) {
44+
Collection<Thread> diff = new ArrayList<>(threads);
45+
diff.removeAll(initialThreads);
46+
LOGGER.warn(
47+
"[{}] There should be no new threads, initial {}, current {} (diff: {})",
48+
ctx.getTestMethod().get().getName(),
49+
initialThreads.size(),
50+
threads.size(),
51+
diff);
52+
}
53+
} else {
54+
LOGGER.warn("No threads in test context");
55+
}
56+
}
57+
58+
private static ExtensionContext.Store store(ExtensionContext ctx) {
59+
return ctx.getStore(NS);
60+
}
61+
62+
private Collection<Thread> threads() {
63+
return Thread.getAllStackTraces().keySet();
64+
}
65+
}

src/test/java/com/rabbitmq/stream/impl/AsyncRetryTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ void callbackCalledIfCompletedImmediately(ScheduledExecutorService scheduler) th
6161
completableFuture.thenAccept(result::set);
6262
assertThat(result.get()).isEqualTo(42);
6363
verify(task, times(1)).call();
64+
scheduler.shutdownNow();
6465
}
6566

6667
@ParameterizedTest
@@ -82,6 +83,7 @@ void shouldRetryWhenExecutionFails(ScheduledExecutorService scheduler) throws Ex
8283
assertThat(latch.await(1, TimeUnit.SECONDS)).isTrue();
8384
assertThat(result.get()).isEqualTo(42);
8485
verify(task, times(3)).call();
86+
scheduler.shutdownNow();
8587
}
8688

8789
@ParameterizedTest
@@ -112,6 +114,7 @@ void shouldTimeoutWhenExecutionFailsForTooLong(ScheduledExecutorService schedule
112114
assertThat(acceptCalled.get()).isFalse();
113115
assertThat(exceptionallyCalled.get()).isTrue();
114116
verify(task, atLeast(5)).call();
117+
scheduler.shutdownNow();
115118
}
116119

117120
@ParameterizedTest
@@ -137,6 +140,7 @@ void shouldRetryWhenPredicateAllowsIt(ScheduledExecutorService scheduler) throws
137140
assertThat(latch.await(1, TimeUnit.SECONDS)).isTrue();
138141
assertThat(result.get()).isEqualTo(42);
139142
verify(task, times(3)).call();
143+
scheduler.shutdownNow();
140144
}
141145

142146
@ParameterizedTest
@@ -171,6 +175,7 @@ void shouldFailWhenPredicateDoesNotAllowRetry(ScheduledExecutorService scheduler
171175
assertThat(acceptCalled.get()).isFalse();
172176
assertThat(exceptionallyCalled.get()).isTrue();
173177
verify(task, times(3)).call();
178+
scheduler.shutdownNow();
174179
}
175180

176181
@ParameterizedTest
@@ -193,6 +198,7 @@ void completeExceptionally(ScheduledExecutorService scheduler) throws Exception
193198
},
194199
scheduler);
195200
assertThat(sync).completes();
201+
scheduler.shutdownNow();
196202
}
197203

198204
static List<ScheduledExecutorService> schedulers() {

src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.junit.jupiter.api.AfterEach;
6161
import org.junit.jupiter.api.BeforeEach;
6262
import org.junit.jupiter.api.Test;
63+
import org.junit.jupiter.api.TestInfo;
6364
import org.junit.jupiter.params.ParameterizedTest;
6465
import org.junit.jupiter.params.provider.MethodSource;
6566
import org.junit.jupiter.params.provider.ValueSource;
@@ -74,6 +75,7 @@ public class ConsumersCoordinatorTest {
7475
private static final SubscriptionListener NO_OP_SUBSCRIPTION_LISTENER = subscriptionContext -> {};
7576
private static final Runnable NO_OP_TRACKING_CLOSING_CALLBACK = () -> {};
7677

78+
TestInfo info;
7779
@Mock StreamEnvironment environment;
7880
@Mock StreamConsumer consumer;
7981
@Mock Client locator;
@@ -113,7 +115,8 @@ static Stream<Consumer<ConsumersCoordinatorTest>> disruptionArguments() {
113115
}
114116

115117
@BeforeEach
116-
void init() {
118+
void init(TestInfo info) {
119+
this.info = info;
117120
Client.ClientParameters clientParameters =
118121
new Client.ClientParameters() {
119122
@Override
@@ -169,6 +172,7 @@ public Client.ClientParameters shutdownListener(
169172

170173
@AfterEach
171174
void tearDown() throws Exception {
175+
this.info = null;
172176
if (coordinator != null) {
173177
// just taking the opportunity to check toString() generates valid JSON
174178
MonitoringTestUtils.extract(coordinator);
@@ -2155,15 +2159,16 @@ private MessageListener lastMessageListener() {
21552159
return this.messageListeners.get(messageListeners.size() - 1);
21562160
}
21572161

2158-
private static ScheduledExecutorService createScheduledExecutorService() {
2162+
private ScheduledExecutorService createScheduledExecutorService() {
21592163
return createScheduledExecutorService(1);
21602164
}
21612165

2162-
private static ScheduledExecutorService createScheduledExecutorService(int nbThreads) {
2166+
private ScheduledExecutorService createScheduledExecutorService(int nbThreads) {
2167+
ThreadFactory tf = ThreadUtils.threadFactory(info.getTestMethod().get().getName() + "-");
21632168
return new ScheduledExecutorServiceWrapper(
21642169
nbThreads == 1
2165-
? Executors.newSingleThreadScheduledExecutor()
2166-
: Executors.newScheduledThreadPool(nbThreads));
2170+
? Executors.newSingleThreadScheduledExecutor(tf)
2171+
: Executors.newScheduledThreadPool(nbThreads, tf));
21672172
}
21682173

21692174
private static Response responseOk() {

src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import java.net.ConnectException;
6060
import java.nio.charset.StandardCharsets;
6161
import java.time.Duration;
62+
import java.util.ArrayList;
6263
import java.util.Arrays;
6364
import java.util.Collection;
6465
import java.util.Collections;
@@ -140,11 +141,18 @@ void environmentCreationShouldFailWithUrlUsingWrongPort() throws Exception {
140141
// no thread leak
141142
waitAtMost(
142143
Duration.ofSeconds(20),
143-
() -> threads().size() == initialThreads.size(),
144-
() ->
145-
String.format(
146-
"There should be no new threads, initial %s, current %s",
147-
initialThreads, threads()));
144+
() -> threads().size() <= initialThreads.size(),
145+
() -> {
146+
Collection<Thread> current = threads();
147+
Collection<Thread> diff = Collections.emptySet();
148+
if (current.size() > initialThreads.size()) {
149+
diff = new ArrayList<>(current);
150+
diff.removeAll(initialThreads);
151+
}
152+
return String.format(
153+
"There should be no new threads, initial %d, current %d (diff: %s)",
154+
initialThreads.size(), current.size(), diff);
155+
});
148156
}
149157

150158
@Test

src/test/java/com/rabbitmq/stream/impl/TestUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -714,7 +714,6 @@ public void afterAll(ExtensionContext context) {
714714
try {
715715
eventLoopGroup.shutdownGracefully(0, 0, SECONDS).get(10, SECONDS);
716716
} catch (InterruptedException e) {
717-
// happens at the end of the test suite
718717
LOGGER.debug("Error while asynchronously closing Netty event loop group", e);
719718
Thread.currentThread().interrupt();
720719
} catch (Exception e) {
@@ -740,7 +739,8 @@ private static class ExecutorServiceCloseableResourceWrapper implements AutoClos
740739
private final ExecutorService executorService;
741740

742741
private ExecutorServiceCloseableResourceWrapper() {
743-
this.executorService = Executors.newCachedThreadPool();
742+
ThreadFactory tf = ThreadUtils.threadFactory("closing-resource-");
743+
this.executorService = Executors.newCachedThreadPool(tf);
744744
}
745745

746746
@Override
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
com.rabbitmq.stream.StreamClientTestExtension
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
junit.jupiter.extensions.autodetection.enabled=false

0 commit comments

Comments
 (0)