Skip to content

Commit 14bb250

Browse files
3pacccccclhotari
authored andcommitted
[fix][test]fix flaky SimpleProducerConsumerTest.testReceiveAsyncCompletedWhenClosing (#24858)
(cherry picked from commit 1ca1797)
1 parent 86f7363 commit 14bb250

File tree

1 file changed

+19
-6
lines changed

1 file changed

+19
-6
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4043,14 +4043,18 @@ public void testReceiveAsyncCompletedWhenClosing() throws Exception {
40434043
// 1) Test receiveAsync is interrupted
40444044
CountDownLatch countDownLatch = new CountDownLatch(1);
40454045
new Thread(() -> {
4046+
CountDownLatch subCountDownLatch = new CountDownLatch(1);
40464047
try {
40474048
new Thread(() -> {
40484049
try {
4050+
subCountDownLatch.await();
40494051
consumer.close();
4050-
} catch (PulsarClientException ignore) {
4052+
} catch (PulsarClientException | InterruptedException ignore) {
40514053
}
40524054
}).start();
4053-
consumer.receiveAsync().get();
4055+
CompletableFuture<Message<String>> futhre = consumer.receiveAsync();
4056+
subCountDownLatch.countDown();
4057+
futhre.get();
40544058
Assert.fail("should be interrupted");
40554059
} catch (Exception e) {
40564060
Assert.assertTrue(e.getMessage().contains(errorMsg));
@@ -4067,13 +4071,17 @@ public void testReceiveAsyncCompletedWhenClosing() throws Exception {
40674071
.batchReceivePolicy(batchReceivePolicy).subscribe();
40684072
new Thread(() -> {
40694073
try {
4074+
CountDownLatch subCountDownLatch = new CountDownLatch(1);
40704075
new Thread(() -> {
40714076
try {
4077+
subCountDownLatch.await();
40724078
consumer2.close();
4073-
} catch (PulsarClientException ignore) {
4079+
} catch (PulsarClientException | InterruptedException ignore) {
40744080
}
40754081
}).start();
4076-
consumer2.batchReceiveAsync().get();
4082+
CompletableFuture<Messages<String>> future = consumer2.batchReceiveAsync();
4083+
subCountDownLatch.countDown();
4084+
future.get();
40774085
Assert.fail("should be interrupted");
40784086
} catch (Exception e) {
40794087
Assert.assertTrue(e.getMessage().contains(errorMsg));
@@ -4090,13 +4098,18 @@ public void testReceiveAsyncCompletedWhenClosing() throws Exception {
40904098
.batchReceivePolicy(batchReceivePolicy).subscribe();
40914099
new Thread(() -> {
40924100
try {
4101+
CountDownLatch subCountDownLatch = new CountDownLatch(1);
40934102
new Thread(() -> {
40944103
try {
4104+
subCountDownLatch.await();
40954105
partitionedTopicConsumer.close();
4096-
} catch (PulsarClientException ignore) {
4106+
} catch (PulsarClientException | InterruptedException ignore) {
40974107
}
40984108
}).start();
4099-
partitionedTopicConsumer.batchReceiveAsync().get();
4109+
CompletableFuture<Messages<String>> future =
4110+
partitionedTopicConsumer.batchReceiveAsync();
4111+
subCountDownLatch.countDown();
4112+
future.get();
41004113
Assert.fail("should be interrupted");
41014114
} catch (Exception e) {
41024115
Assert.assertTrue(e.getMessage().contains(errorMsg));

0 commit comments

Comments
 (0)