Skip to content

Commit b9da90a

Browse files
STAR-1690 Stop message processing after MessagingService shutdown (apache#581)
* STAR-1690 Stop message processing after MessagingService shutdown The RequestCallbacks class now stops adding callbacks after it is shutdown by the MessagingService when that is shutdown. * Make new log statements trace
1 parent 882cea0 commit b9da90a

File tree

1 file changed

+16
-0
lines changed

1 file changed

+16
-0
lines changed

src/java/org/apache/cassandra/net/RequestCallbacks.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ public class RequestCallbacks implements OutboundMessageCallbacks
6767
private final ScheduledExecutorService executor = new DebuggableScheduledThreadPoolExecutor("Callback-Map-Reaper");
6868
private final ConcurrentMap<CallbackKey, CallbackInfo> callbacks = new ConcurrentHashMap<>();
6969

70+
private volatile boolean shutdown;
71+
7072
RequestCallbacks(MessagingService messagingService)
7173
{
7274
this.messagingService = messagingService;
@@ -100,6 +102,12 @@ void addWithExpiration(RequestCallback cb, Message message, InetAddressAndPort t
100102
{
101103
// mutations need to call the overload with a ConsistencyLevel
102104
assert message.verb() != Verb.MUTATION_REQ && message.verb() != Verb.COUNTER_MUTATION_REQ && message.verb() != Verb.PAXOS_COMMIT_REQ;
105+
if (shutdown)
106+
{
107+
if (logger.isTraceEnabled())
108+
logger.trace("Received request after messaging service shutdown so ignoring it");
109+
return;
110+
}
103111
CallbackInfo previous = callbacks.put(key(message.id(), to), new CallbackInfo(message, to, cb));
104112
assert previous == null : format("Callback already exists for id %d/%s! (%s)", message.id(), to, previous);
105113
}
@@ -112,6 +120,12 @@ public void addWithExpiration(AbstractWriteResponseHandler<?> cb,
112120
boolean allowHints)
113121
{
114122
assert message.verb() == Verb.MUTATION_REQ || message.verb() == Verb.COUNTER_MUTATION_REQ || message.verb() == Verb.PAXOS_COMMIT_REQ;
123+
if (shutdown)
124+
{
125+
if (logger.isTraceEnabled())
126+
logger.trace("Received request after messaging service shutdown so ignoring it");
127+
return;
128+
}
115129
CallbackInfo previous = callbacks.put(key(message.id(), to.endpoint()), new WriteCallbackInfo(message, to, cb, consistencyLevel, allowHints));
116130
assert previous == null : format("Callback already exists for id %d/%s! (%s)", message.id(), to.endpoint(), previous);
117131
}
@@ -181,13 +195,15 @@ private void onExpired(CallbackInfo info)
181195

182196
void shutdownNow(boolean expireCallbacks)
183197
{
198+
shutdown = true;
184199
executor.shutdownNow();
185200
if (expireCallbacks)
186201
forceExpire();
187202
}
188203

189204
void shutdownGracefully()
190205
{
206+
shutdown = true;
191207
expire();
192208
if (!callbacks.isEmpty())
193209
executor.schedule(this::shutdownGracefully, 100L, MILLISECONDS);

0 commit comments

Comments
 (0)