Skip to content

Commit 803bc7c

Browse files
authored
Merge pull request #1656 from rabbitmq/call-consumer-shutdown-listener-when-close-times-out
Call consumer shutdown callback even if close times out
2 parents d292064 + f9b69f3 commit 803bc7c

File tree

4 files changed

+45
-28
lines changed

4 files changed

+45
-28
lines changed

src/main/java/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.concurrent.CopyOnWriteArrayList;
3434
import java.util.concurrent.CountDownLatch;
3535
import java.util.concurrent.TimeoutException;
36+
import java.util.concurrent.atomic.AtomicBoolean;
3637

3738
/**
3839
* Main interface to AMQP protocol functionality. Public API -
@@ -605,10 +606,13 @@ protected void close(int closeCode,
605606
signal.initCause(cause);
606607
}
607608

609+
AtomicBoolean finishProcessShutdownSignalCalled = new AtomicBoolean(false);
608610
BlockingRpcContinuation<AMQCommand> k = new BlockingRpcContinuation<AMQCommand>(){
609611
@Override
610612
public AMQCommand transformReply(AMQCommand command) {
611-
ChannelN.this.finishProcessShutdownSignal();
613+
if (finishProcessShutdownSignalCalled.compareAndSet(false, true)) {
614+
ChannelN.this.finishProcessShutdownSignal();
615+
}
612616
return command;
613617
}};
614618
boolean notify = false;
@@ -639,6 +643,13 @@ public AMQCommand transformReply(AMQCommand command) {
639643
if (!abort)
640644
throw ioe;
641645
} finally {
646+
if (finishProcessShutdownSignalCalled.compareAndSet(false, true)) {
647+
try {
648+
ChannelN.this.finishProcessShutdownSignal();
649+
} catch (Exception e) {
650+
LOGGER.info("Error while processing shutdown signal: {}", e.getMessage());
651+
}
652+
}
642653
if (abort || notify) {
643654
// Now we know everything's been cleaned up and there should
644655
// be no more surprises arriving on the wire. Release the

src/test/java/com/rabbitmq/client/test/BlockedConnectionTest.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,25 +20,16 @@
2020
import static com.rabbitmq.client.test.TestUtils.waitAtMost;
2121
import static org.assertj.core.api.Assertions.assertThat;
2222
import static org.assertj.core.api.Assertions.assertThatThrownBy;
23-
import static org.assertj.core.api.Assertions.fail;
2423

2524
import com.rabbitmq.client.Channel;
2625
import com.rabbitmq.client.Connection;
2726
import com.rabbitmq.client.ConnectionFactory;
2827

29-
import java.io.IOException;
3028
import java.time.Duration;
3129
import java.util.concurrent.CountDownLatch;
32-
import java.util.concurrent.TimeUnit;
3330
import java.util.concurrent.TimeoutException;
3431

35-
import com.rabbitmq.client.ConsumerShutdownSignalCallback;
36-
import com.rabbitmq.client.DeliverCallback;
37-
import com.rabbitmq.client.Delivery;
3832
import com.rabbitmq.client.MessageProperties;
39-
import com.rabbitmq.client.ShutdownListener;
40-
import com.rabbitmq.client.ShutdownSignalException;
41-
import org.assertj.core.api.Assertions;
4233
import org.junit.jupiter.api.Test;
4334
import org.junit.jupiter.params.ParameterizedTest;
4435
import org.junit.jupiter.params.provider.ValueSource;
@@ -86,6 +77,9 @@ void shutdownListenerShouldBeCalledWhenChannelDies() throws Exception {
8677
CountDownLatch blockedLatch = new CountDownLatch(1);
8778
c.addBlockedListener(reason -> blockedLatch.countDown(), () -> {});
8879
Channel ch = c.createChannel();
80+
String q = ch.queueDeclare().getQueue();
81+
CountDownLatch consShutdownLatch = new CountDownLatch(1);
82+
ch.basicConsume(q, (ctag, msg) -> { }, (ctag, r) -> consShutdownLatch.countDown());
8983
CountDownLatch chShutdownLatch = new CountDownLatch(1);
9084
ch.addShutdownListener(cause -> chShutdownLatch.countDown());
9185
ch.confirmSelect();
@@ -98,6 +92,7 @@ void shutdownListenerShouldBeCalledWhenChannelDies() throws Exception {
9892
ch.basicPublish("", "", MessageProperties.BASIC, "".getBytes());
9993
assertThatThrownBy(() -> ch.waitForConfirmsOrDie(confirmTimeout))
10094
.isInstanceOf(TimeoutException.class);
95+
assertThat(consShutdownLatch).is(completed());
10196
assertThat(chShutdownLatch).is(completed());
10297
} finally {
10398
if (blocked) {

src/test/java/com/rabbitmq/client/test/functional/ConsumerCancelNotification.java renamed to src/test/java/com/rabbitmq/client/test/functional/ConsumerNotifications.java

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,11 @@
2828
import java.util.concurrent.CountDownLatch;
2929
import java.util.concurrent.TimeUnit;
3030

31+
import static org.assertj.core.api.Assertions.assertThat;
3132
import static org.junit.jupiter.api.Assertions.assertTrue;
3233
import static org.junit.jupiter.api.Assertions.fail;
3334

34-
public class ConsumerCancelNotification extends BrokerTestCase {
35+
public class ConsumerNotifications extends BrokerTestCase {
3536

3637
private final String queue = "cancel_notification_queue";
3738

@@ -42,7 +43,7 @@ public class ConsumerCancelNotification extends BrokerTestCase {
4243
channel.queueDeclare(queue, false, true, false, null);
4344
Consumer consumer = new DefaultConsumer(channel) {
4445
@Override
45-
public void handleCancel(String consumerTag) throws IOException {
46+
public void handleCancel(String consumerTag) {
4647
try {
4748
result.put(true);
4849
} catch (InterruptedException e) {
@@ -55,7 +56,31 @@ public void handleCancel(String consumerTag) throws IOException {
5556
assertTrue(result.take());
5657
}
5758

58-
class AlteringConsumer extends DefaultConsumer {
59+
@Test public void consumerCancellationHandlerUsesBlockingOperations()
60+
throws IOException, InterruptedException {
61+
final String altQueue = "basic.cancel.fallback";
62+
channel.queueDeclare(queue, false, true, false, null);
63+
64+
CountDownLatch latch = new CountDownLatch(1);
65+
final AlteringConsumer consumer = new AlteringConsumer(channel, altQueue, latch);
66+
67+
channel.basicConsume(queue, consumer);
68+
channel.queueDelete(queue);
69+
70+
latch.await(2, TimeUnit.SECONDS);
71+
}
72+
73+
@Test
74+
void handleShutdownShouldBeCalledWhenChannelIsClosed() throws Exception {
75+
Channel ch = connection.createChannel();
76+
String q = ch.queueDeclare().getQueue();
77+
CountDownLatch latch = new CountDownLatch(1);
78+
ch.basicConsume(q, (ctag, msg) -> {}, (ctag, r) -> latch.countDown());
79+
ch.close();
80+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
81+
}
82+
83+
private static class AlteringConsumer extends DefaultConsumer {
5984
private final String altQueue;
6085
private final CountDownLatch latch;
6186

@@ -81,18 +106,4 @@ public void handleCancel(String consumerTag) {
81106
}
82107
}
83108
}
84-
85-
@Test public void consumerCancellationHandlerUsesBlockingOperations()
86-
throws IOException, InterruptedException {
87-
final String altQueue = "basic.cancel.fallback";
88-
channel.queueDeclare(queue, false, true, false, null);
89-
90-
CountDownLatch latch = new CountDownLatch(1);
91-
final AlteringConsumer consumer = new AlteringConsumer(channel, altQueue, latch);
92-
93-
channel.basicConsume(queue, consumer);
94-
channel.queueDelete(queue);
95-
96-
latch.await(2, TimeUnit.SECONDS);
97-
}
98109
}

src/test/java/com/rabbitmq/client/test/functional/FunctionalTestSuite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
DefaultExchange.class,
5454
UnbindAutoDeleteExchange.class,
5555
Confirm.class,
56-
ConsumerCancelNotification.class,
56+
ConsumerNotifications.class,
5757
UnexpectedFrames.class,
5858
PerQueueTTL.class,
5959
PerMessageTTL.class,

0 commit comments

Comments
 (0)