Open
Description
It seems that in some cases refreshSchema()
can hang the calling thread and never recover. An easy way to trigger this is to call it right after the connection to the cluster has been lost. It is expected that the refresh won't succeed in such a case, but it arguably should not hang the thread.
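In the meantime, the only workaround I see is to bound the wait myself via the asynchronous API. A minimal sketch, assuming the public refreshSchemaAsync() method on CqlSession (the helper class name and the 30-second timeout are just illustrative, not anything the driver prescribes):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.metadata.Metadata;

class SchemaRefreshWorkaround {
  // Refresh the schema but give up after a fixed timeout instead of
  // blocking the caller indefinitely like the synchronous refreshSchema().
  static Metadata refreshSchemaBounded(CqlSession session) throws Exception {
    try {
      return session.refreshSchemaAsync().toCompletableFuture().get(30, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
      throw new IllegalStateException("Schema refresh did not complete within 30s", e);
    }
  }
}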
Steps to reproduce
Create the following test method (it is easiest to express as a test because the reproducer uses the test-infra dependency):
@Test
public void reproducer() {
  DriverConfigLoader loader =
      new DefaultProgrammaticDriverConfigLoaderBuilder()
          .withBoolean(TypedDriverOption.RECONNECT_ON_INIT.getRawOption(), true)
          .withStringList(
              TypedDriverOption.CONTACT_POINTS.getRawOption(),
              Collections.singletonList("127.0.1.1:9042"))
          .build();
  CqlSessionBuilder builder = new CqlSessionBuilder().withConfigLoader(loader);
  CqlSession session = null;
  Collection<Node> nodes;
  int counter = 0;
  while (counter < 2) {
    counter++;
    // Bring up a fresh 3-node cluster on a new IP prefix; the previous cluster
    // (if any) is torn down when the try-with-resources block closes.
    try (CcmBridge ccmBridge =
        CcmBridge.builder().withNodes(3).withIpPrefix("127.0." + counter + ".").build()) {
      ccmBridge.create();
      ccmBridge.start();
      if (session == null) {
        session = builder.build();
      }
      // CLUSTER_WAIT_SECONDS is a constant defined elsewhere in the test class
      // (how long to wait for all nodes to be reported as up).
      long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000;
      while (System.currentTimeMillis() < endTime) {
        try {
          nodes = session.getMetadata().getNodes().values();
          int upNodes = 0;
          for (Node node : nodes) {
            if (node.getUpSinceMillis() > 0) {
              upNodes++;
            }
          }
          if (upNodes == 3) {
            break;
          }
          System.out.println("Before refreshSchema call (n==" + counter + ")");
          session.refreshSchema();
          System.out.println("# After refreshSchema call (n==" + counter + ")");
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          break;
        }
      }
    }
  }
}
Add the following logger to logback-test.xml to see the exception:
<logger name="com.datastax.oss.driver.internal.core.metadata" level="DEBUG"/>
Then run the method.
Result
At some point the test hangs on the refreshSchema()
call. The following exception should be visible in the logs:
12:06:23.280 [main] INFO c.d.o.d.api.testinfra.ccm.CcmBridge - Executing: [ccm, remove, --config-dir=/tmp/ccm447078114819940528]
12:06:23.586 [s0-admin-1] DEBUG c.d.o.d.i.c.m.NodeStateManager - [s0] Processing ChannelEvent(CLOSED, Node(endPoint=/127.0.1.2:9042, hostId=ef860132-71e5-4d28-b89b-77ce6003e37f, hashCode=7811d31e))
12:06:23.587 [s0-admin-1] DEBUG c.d.o.d.i.c.m.NodeStateManager - [s0] Processing ChannelEvent(RECONNECTION_STARTED, Node(endPoint=/127.0.1.2:9042, hostId=ef860132-71e5-4d28-b89b-77ce6003e37f, hashCode=7811d31e))
12:06:23.588 [s0-admin-1] DEBUG c.d.o.d.i.c.m.NodeStateManager - [s0] Processing ChannelEvent(CLOSED, Node(endPoint=/127.0.1.2:9042, hostId=ef860132-71e5-4d28-b89b-77ce6003e37f, hashCode=7811d31e))
12:06:23.588 [s0-admin-1] DEBUG c.d.o.d.i.c.m.NodeStateManager - [s0] Transitioning Node(endPoint=/127.0.1.2:9042, hostId=ef860132-71e5-4d28-b89b-77ce6003e37f, hashCode=7811d31e) UP=>DOWN (because it was reconnecting and lost its last connection)
12:06:23.594 [s0-admin-1] DEBUG c.d.o.d.i.c.m.NodeStateManager - [s0] Processing ChannelEvent(CLOSED, Node(endPoint=/127.0.1.3:9042, hostId=d6fd5558-7343-4368-9b90-9448a66e57a1, hashCode=2bd8a170))
12:06:23.595 [s0-admin-1] DEBUG c.d.o.d.i.c.m.NodeStateManager - [s0] Processing ChannelEvent(RECONNECTION_STARTED, Node(endPoint=/127.0.1.3:9042, hostId=d6fd5558-7343-4368-9b90-9448a66e57a1, hashCode=2bd8a170))
12:06:23.595 [s0-admin-1] DEBUG c.d.o.d.i.c.m.NodeStateManager - [s0] Processing ChannelEvent(CLOSED, Node(endPoint=/127.0.1.3:9042, hostId=d6fd5558-7343-4368-9b90-9448a66e57a1, hashCode=2bd8a170))
12:06:23.596 [s0-admin-1] DEBUG c.d.o.d.i.c.m.NodeStateManager - [s0] Transitioning Node(endPoint=/127.0.1.3:9042, hostId=d6fd5558-7343-4368-9b90-9448a66e57a1, hashCode=2bd8a170) UP=>DOWN (because it was reconnecting and lost its last connection)
12:06:23.596 [s0-admin-1] DEBUG c.d.o.d.i.c.m.NodeStateManager - [s0] Processing ChannelEvent(CLOSED, Node(endPoint=/127.0.1.1:9042, hostId=2e908145-3172-46e6-9206-a27583e1f867, hashCode=3a40b8e2))
12:06:23.596 [s0-admin-1] DEBUG c.d.o.d.i.c.m.NodeStateManager - [s0] Processing ChannelEvent(RECONNECTION_STARTED, Node(endPoint=/127.0.1.1:9042, hostId=2e908145-3172-46e6-9206-a27583e1f867, hashCode=3a40b8e2))
12:06:23.596 [s0-admin-1] DEBUG c.d.o.d.i.c.m.NodeStateManager - [s0] Processing ChannelEvent(CLOSED, Node(endPoint=/127.0.1.1:9042, hostId=2e908145-3172-46e6-9206-a27583e1f867, hashCode=3a40b8e2))
12:06:23.597 [s0-admin-1] DEBUG c.d.o.d.i.c.m.NodeStateManager - [s0] Processing ChannelEvent(CLOSED, Node(endPoint=/127.0.1.1:9042, hostId=2e908145-3172-46e6-9206-a27583e1f867, hashCode=3a40b8e2))
12:06:23.597 [s0-admin-1] DEBUG c.d.o.d.i.c.m.NodeStateManager - [s0] Transitioning Node(endPoint=/127.0.1.1:9042, hostId=2e908145-3172-46e6-9206-a27583e1f867, hashCode=3a40b8e2) UP=>DOWN (because it was reconnecting and lost its last connection)
12:06:24.649 [main] INFO c.d.o.d.api.testinfra.ccm.CcmBridge - Executing: [ccm, create, ccm_1, -i, 127.0.2., -n, 3:0, -v, release:6.0.1, --scylla, --config-dir=/tmp/ccm7441250503244669492]
12:06:25.089 [Exec Stream Pumper] INFO c.d.o.d.api.testinfra.ccm.CcmBridge - ccmout> Current cluster is now: ccm_1
12:06:25.154 [main] INFO c.d.o.d.api.testinfra.ccm.CcmBridge - Executing: [ccm, updateconf, auto_snapshot:false, --config-dir=/tmp/ccm7441250503244669492]
12:06:25.568 [main] INFO c.d.o.d.api.testinfra.ccm.CcmBridge - Executing: [ccm, start, --wait-for-binary-proto, --wait-other-notice, --config-dir=/tmp/ccm7441250503244669492]
Before refreshSchema call (n==2)
12:06:32.124 [s0-admin-0] DEBUG c.d.o.d.i.c.metadata.MetadataManager - [s0] Starting schema refresh
12:06:32.124 [s0-admin-0] DEBUG c.d.o.d.i.c.m.SchemaAgreementChecker - [s0] Checking schema agreement
12:06:32.126 [s0-io-3] DEBUG c.d.o.d.i.c.m.SchemaAgreementChecker - [s0] Error while checking schema agreement, completing now (false)
java.util.concurrent.CompletionException: io.netty.channel.StacklessClosedChannelException
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1102)
at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1084)
at java.util.concurrent.CompletableFuture$CoCompletion.tryFire(CompletableFuture.java:1034)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at com.datastax.oss.driver.internal.core.adminrequest.AdminRequestHandler.setFinalError(AdminRequestHandler.java:206)
at com.datastax.oss.driver.internal.core.adminrequest.AdminRequestHandler.onWriteComplete(AdminRequestHandler.java:150)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:557)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:629)
at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:118)
at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:999)
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:860)
at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:889)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:875)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:984)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:868)
at io.netty.channel.DefaultChannelPipeline.write(DefaultChannelPipeline.java:1015)
at io.netty.channel.AbstractChannel.write(AbstractChannel.java:301)
at com.datastax.oss.driver.internal.core.channel.DefaultWriteCoalescer$Flusher.runOnEventLoop(DefaultWriteCoalescer.java:102)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:750)
Caused by: io.netty.channel.StacklessClosedChannelException: null
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object, ChannelPromise)(Unknown Source)
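As far as I can tell from the 4.x Session API, the blocking refreshSchema() simply waits on the future returned by refreshSchemaAsync(), so the hang suggests that future never completes even after the SchemaAgreementChecker error above is logged. A minimal sketch to confirm that from the reproducer, as a drop-in replacement for the session.refreshSchema() line above (the 30-second timeout is an arbitrary value picked for illustration; requires java.util.concurrent.{CompletionStage, ExecutionException, TimeUnit, TimeoutException} and com.datastax.oss.driver.api.core.metadata.Metadata imports):

// Bounded wait on the async refresh; InterruptedException is handled by the
// reproducer's surrounding catch block.
CompletionStage<Metadata> refresh = session.refreshSchemaAsync();
try {
  refresh.toCompletableFuture().get(30, TimeUnit.SECONDS);
  System.out.println("# refresh completed (n==" + counter + ")");
} catch (TimeoutException e) {
  // If the hang is a never-completing future, this branch should be hit.
  System.out.println("# refresh still pending after 30s (n==" + counter + ")");
} catch (ExecutionException e) {
  // A prompt failure here (rather than a hang) would be the expected behavior.
  System.out.println("# refresh failed: " + e.getCause());
}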