Skip to content

Commit 27e32b1

Browse files
ejona86 authored and larry-safran committed
xds: Fix fallback test FakeClock TSAN failure
d65d394 increased the test speed of connect_then_mainServerDown_fallbackServerUp by using FakeClock. However, it introduced a data race because FakeClock is not thread-safe.

This change injects a single thread for gRPC callbacks such that syncContext is run on a thread under the test's control.

A simpler approach would be to expose syncContext from XdsClientImpl for testing. However, this test is in a different package and I wanted to avoid adding a public method.

```
Read of size 8 at 0x00008dec9d50 by thread T25:
  #0 io.grpc.internal.FakeClock$ScheduledExecutorImpl.schedule(Lio/grpc/internal/FakeClock$ScheduledTask;JLjava/util/concurrent/TimeUnit;)V FakeClock.java:140
  #1 io.grpc.internal.FakeClock$ScheduledExecutorImpl.schedule(Ljava/lang/Runnable;JLjava/util/concurrent/TimeUnit;)Ljava/util/concurrent/ScheduledFuture; FakeClock.java:150
  #2 io.grpc.SynchronizationContext.schedule(Ljava/lang/Runnable;JLjava/util/concurrent/TimeUnit;Ljava/util/concurrent/ScheduledExecutorService;)Lio/grpc/SynchronizationContext$ScheduledHandle; SynchronizationContext.java:153
  #3 io.grpc.xds.client.ControlPlaneClient$AdsStream.handleRpcStreamClosed(Lio/grpc/Status;)V ControlPlaneClient.java:491
  #4 io.grpc.xds.client.ControlPlaneClient$AdsStream.lambda$onStatusReceived$0(Lio/grpc/Status;)V ControlPlaneClient.java:429
  #5 io.grpc.xds.client.ControlPlaneClient$AdsStream$$Lambda+0x00000001004a95d0.run()V ??
  #6 io.grpc.SynchronizationContext.drain()V SynchronizationContext.java:96
  #7 io.grpc.SynchronizationContext.execute(Ljava/lang/Runnable;)V SynchronizationContext.java:128
  #8 io.grpc.xds.client.ControlPlaneClient$AdsStream.onStatusReceived(Lio/grpc/Status;)V ControlPlaneClient.java:428
  #9 io.grpc.xds.GrpcXdsTransportFactory$EventHandlerToCallListenerAdapter.onClose(Lio/grpc/Status;Lio/grpc/Metadata;)V GrpcXdsTransportFactory.java:149
  #10 io.grpc.PartialForwardingClientCallListener.onClose(Lio/grpc/Status;Lio/grpc/Metadata;)V PartialForwardingClientCallListener.java:39
  ...

Previous write of size 8 at 0x00008dec9d50 by thread T4 (mutexes: write M0, write M1, write M2, write M3):
  #0 io.grpc.internal.FakeClock.forwardTime(JLjava/util/concurrent/TimeUnit;)I FakeClock.java:368
  #1 io.grpc.xds.XdsClientFallbackTest.connect_then_mainServerDown_fallbackServerUp()V XdsClientFallbackTest.java:358
  ...
```
1 parent d4e5064 commit 27e32b1

File tree

1 file changed

+21
-2
lines changed

1 file changed

+21
-2
lines changed

xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java

Lines changed: 21 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,8 @@
3131

3232
import com.google.common.collect.ImmutableList;
3333
import com.google.common.collect.ImmutableMap;
34+
import io.grpc.ChannelCredentials;
35+
import io.grpc.Grpc;
3436
import io.grpc.MetricRecorder;
3537
import io.grpc.Status;
3638
import io.grpc.internal.ExponentialBackoffPolicy;
@@ -43,11 +45,14 @@
4345
import io.grpc.xds.client.XdsClientImpl;
4446
import io.grpc.xds.client.XdsClientMetricReporter;
4547
import io.grpc.xds.client.XdsInitializationException;
48+
import io.grpc.xds.client.XdsTransportFactory;
4649
import java.net.InetSocketAddress;
4750
import java.util.Arrays;
4851
import java.util.Collections;
4952
import java.util.Map;
5053
import java.util.UUID;
54+
import java.util.concurrent.ExecutorService;
55+
import java.util.concurrent.Executors;
5156
import java.util.concurrent.TimeUnit;
5257
import java.util.logging.Level;
5358
import java.util.logging.Logger;
@@ -338,9 +343,21 @@ private static void verifyNoSubscribers(ControlPlaneRule rule) {
338343
public void connect_then_mainServerDown_fallbackServerUp() throws Exception {
339344
mainXdsServer.restartXdsServer();
340345
fallbackServer.restartXdsServer();
346+
ExecutorService executor = Executors.newFixedThreadPool(1);
347+
XdsTransportFactory xdsTransportFactory = new XdsTransportFactory() {
348+
@Override
349+
public XdsTransport create(Bootstrapper.ServerInfo serverInfo) {
350+
ChannelCredentials channelCredentials =
351+
(ChannelCredentials) serverInfo.implSpecificConfig();
352+
return new GrpcXdsTransportFactory.GrpcXdsTransport(
353+
Grpc.newChannelBuilder(serverInfo.target(), channelCredentials)
354+
.executor(executor)
355+
.build());
356+
}
357+
};
341358
XdsClientImpl xdsClient = CommonBootstrapperTestUtils.createXdsClient(
342359
new GrpcBootstrapperImpl().bootstrap(defaultBootstrapOverride()),
343-
DEFAULT_XDS_TRANSPORT_FACTORY, fakeClock, new ExponentialBackoffPolicy.Provider(),
360+
xdsTransportFactory, fakeClock, new ExponentialBackoffPolicy.Provider(),
344361
MessagePrinter.INSTANCE, xdsClientMetricReporter);
345362

346363
xdsClient.watchXdsResource(XdsListenerResource.getInstance(), MAIN_SERVER, ldsWatcher);
@@ -355,7 +372,8 @@ public void connect_then_mainServerDown_fallbackServerUp() throws Exception {
355372
// Sleep for the ADS stream disconnect to be processed and for the retry to fail. Between those
356373
// two sleeps we need the fakeClock to progress by 1 second to restart the ADS stream.
357374
for (int i = 0; i < 5; i++) {
358-
fakeClock.forwardTime(1000, TimeUnit.MILLISECONDS);
375+
// FakeClock is not thread-safe, and the retry scheduling is concurrent to this test thread
376+
executor.submit(() -> fakeClock.forwardTime(1000, TimeUnit.MILLISECONDS)).get();
359377
TimeUnit.SECONDS.sleep(1);
360378
}
361379

@@ -393,6 +411,7 @@ public void connect_then_mainServerDown_fallbackServerUp() throws Exception {
393411
fakeClock.forwardTime(15000, TimeUnit.MILLISECONDS); // Does not exist timer
394412
verify(cdsWatcher2, timeout(5000)).onResourceDoesNotExist(eq(CLUSTER_NAME));
395413
xdsClient.shutdown();
414+
executor.shutdown();
396415
}
397416

398417
@Test

0 commit comments

Comments
 (0)