WIP netty: composite cumulator benchmarks #6

Draft: wants to merge 2 commits into base: master

sergiitk
Owner

No description provided.

sergiitk marked this pull request as draft on September 28, 2020, 18:44
sergiitk force-pushed the netty-adaptive-cumulator-wip-1 branch 2 times, most recently from 07252ee to bdd41fd, on October 1, 2020, 20:31
sergiitk force-pushed the netty-adaptive-cumulator-wip-1 branch from bdd41fd to 5514b3d on October 1, 2020, 20:39
sergiitk changed the title from "WIP netty: create adaptive composite cumulator" to "WIP netty: composite cumulator benchmarks" on Oct 7, 2020
sergiitk pushed a commit that referenced this pull request Jul 5, 2023
The PipeSocket was convenient and avoided real I/O, but the
shutdown/close while connecting/handshaking tests were triggering a
Socket bug in Java (https://bugs.openjdk.org/browse/JDK-8278326). Using
a real socket doesn't trigger the bug because the test stops sharing
state with the code under test.

Fixes grpc#10228

```
==================
WARNING: ThreadSanitizer: data race (pid=4528)
  Write of size 1 at 0x0000cfb9d5f4 by thread T36 (mutexes: write M0):
    #0 java.net.Socket.setCreated()V Socket.java:687
    #1 java.net.AbstractPlainSocketImpl.create(Z)V AbstractPlainSocketImpl.java:149
    #2 java.net.Socket.createImpl(Z)V Socket.java:477
    #3 java.net.Socket.getImpl()Ljava/net/SocketImpl; Socket.java:540
    #4 java.net.Socket.setTcpNoDelay(Z)V Socket.java:998
    #5 io.grpc.okhttp.OkHttpServerTransport.startIo(Lio/grpc/internal/SerializingExecutor;)V OkHttpServerTransport.java:164
    #6 io.grpc.okhttp.OkHttpServerTransport.lambda$start$0(Lio/grpc/internal/SerializingExecutor;)V OkHttpServerTransport.java:159
    #7 io.grpc.okhttp.OkHttpServerTransport$$Lambda$56.run()V ??
    #8 io.grpc.internal.SerializingExecutor.run()V SerializingExecutor.java:133
    #9 java.util.concurrent.ThreadPoolExecutor.runWorker(Ljava/util/concurrent/ThreadPoolExecutor$Worker;)V ThreadPoolExecutor.java:1130
    #10 java.util.concurrent.ThreadPoolExecutor$Worker.run()V ThreadPoolExecutor.java:630
    #11 java.lang.Thread.run()V Thread.java:830
    #12 (Generated Stub) <null>

  Previous read of size 1 at 0x0000cfb9d5f4 by thread T35 (mutexes: write M1, write M2):
    #0 java.net.Socket.close()V Socket.java:1512
    #1 io.grpc.okhttp.OkHttpServerTransportTest$PipeSocket.close()V OkHttpServerTransportTest.java:1384
    #2 io.grpc.okhttp.OkHttpServerTransportTest.clientCloseDuringHandshake()V OkHttpServerTransportTest.java:290
```
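
To illustrate the "real socket" approach described in this commit, here is a minimal sketch of a test that talks to the code under test over a loopback connection instead of handing it a shared in-memory socket object. The class and method names (`LoopbackSocketSketch`, `serverSeesEofAfterClientClose`) are hypothetical and are not taken from `OkHttpServerTransportTest`; only standard `java.net` calls are used.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

/** Hypothetical sketch; not the actual OkHttpServerTransportTest code. */
final class LoopbackSocketSketch {

  /**
   * Connects a real client socket to a loopback listener, closes the client,
   * and reports whether the accepted (server-side) socket observed end-of-stream.
   */
  static boolean serverSeesEofAfterClientClose() {
    try (ServerSocket listener = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
      listener.setSoTimeout(5_000); // fail instead of hanging if accept() never returns
      Socket client = new Socket();
      client.connect(
          new InetSocketAddress(listener.getInetAddress(), listener.getLocalPort()), 5_000);
      try (Socket serverSide = listener.accept()) {
        // The code under test would own serverSide; the test only touches client,
        // so the two sides never operate on the same java.net.Socket instance.
        serverSide.setTcpNoDelay(true);
        serverSide.setSoTimeout(5_000); // fail instead of hanging if EOF never arrives
        client.close();
        return serverSide.getInputStream().read() == -1;
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("server saw EOF: " + serverSeesEofAfterClientClose());
  }
}
```

Because the test closes its own `Socket` and the transport configures a different one, the two threads no longer share the state that the trace above shows being written by `setCreated()` and read by `close()`.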
sergiitk pushed a commit that referenced this pull request Jan 12, 2024
6efa9ee added `volatile` to `attributes` after TSAN detected a data
race introduced in 91d15ce. The race occurred because `attributes` may be
read from another thread after `transportReady()`, while the
post-filtering assignment happened after `transportReady()`. The code
now filters the attributes separately so they are updated before calling
`transportReady()`.

Original TSAN failure:
```
  Read of size 4 at 0x0000cd44769c by thread T23:
    #0 io.grpc.netty.NettyClientHandler.getAttributes()Lio/grpc/Attributes; NettyClientHandler.java:327
    #1 io.grpc.netty.NettyClientTransport.getAttributes()Lio/grpc/Attributes; NettyClientTransport.java:363
    #2 io.grpc.netty.NettyClientTransport.newStream(Lio/grpc/MethodDescriptor;Lio/grpc/Metadata;Lio/grpc/CallOptions;[Lio/grpc/ClientStreamTracer;)Lio/grpc/internal/ClientStream; NettyClientTransport.java:183
    #3 io.grpc.internal.MetadataApplierImpl.apply(Lio/grpc/Metadata;)V MetadataApplierImpl.java:74
    #4 io.grpc.auth.GoogleAuthLibraryCallCredentials$1.onSuccess(Ljava/util/Map;)V GoogleAuthLibraryCallCredentials.java:141
    #5 com.google.auth.oauth2.OAuth2Credentials$FutureCallbackToMetadataCallbackAdapter.onSuccess(Lcom/google/auth/oauth2/OAuth2Credentials$OAuthValue;)V OAuth2Credentials.java:534
    #6 com.google.auth.oauth2.OAuth2Credentials$FutureCallbackToMetadataCallbackAdapter.onSuccess(Ljava/lang/Object;)V OAuth2Credentials.java:525
    ...

  Previous write of size 4 at 0x0000cd44769c by thread T24:
    #0 io.grpc.netty.NettyClientHandler$FrameListener.onSettingsRead(Lio/netty/channel/ChannelHandlerContext;Lio/netty/handler/codec/http2/Http2Settings;)V NettyClientHandler.java:920
    #1 io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onSettingsRead(Lio/netty/channel/ChannelHandlerContext;Lio/netty/handler/codec/http2/Http2Settings;)V DefaultHttp2ConnectionDecoder.java:515
    ...
```
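
The ordering this commit describes (assign the filtered value first, signal readiness second) can be sketched as follows. This is an illustration under assumptions, not the `NettyClientHandler` code: the `Listener` interface, its `filter` and `ready` methods, and the `Map`-based attributes are hypothetical stand-ins.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of "publish the filtered value, then signal ready". */
final class PublishBeforeReadySketch {

  /** Stand-in for a transport listener; both method names are illustrative. */
  interface Listener {
    Map<String, String> filter(Map<String, String> raw);

    void ready();
  }

  // volatile so that a reader on another thread sees the latest assignment.
  private volatile Map<String, String> attributes = Collections.emptyMap();
  private final Listener listener;

  PublishBeforeReadySketch(Listener listener) {
    this.listener = listener;
  }

  Map<String, String> getAttributes() {
    return attributes;
  }

  void onNegotiationComplete(Map<String, String> raw) {
    // 1. Filter and assign first, so the final value is already in place...
    attributes = listener.filter(raw);
    // 2. ...before anything is told it may start reading.
    listener.ready();
  }

  /** Returns what a reader that is triggered by ready() observes. */
  static Map<String, String> observedAtReady() {
    AtomicReference<Map<String, String>> seen = new AtomicReference<>();
    PublishBeforeReadySketch[] holder = new PublishBeforeReadySketch[1];
    holder[0] = new PublishBeforeReadySketch(new Listener() {
      @Override
      public Map<String, String> filter(Map<String, String> raw) {
        return Collections.singletonMap("filtered", raw.get("raw"));
      }

      @Override
      public void ready() {
        seen.set(holder[0].getAttributes());
      }
    });
    holder[0].onNegotiationComplete(Collections.singletonMap("raw", "value"));
    return seen.get();
  }

  public static void main(String[] args) {
    System.out.println(observedAtReady());
  }
}
```

With the assignment moved ahead of the readiness signal, any reader that starts only after `ready()` can no longer overlap with the write.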
sergiitk pushed a commit that referenced this pull request Feb 27, 2024
As discovered by TSAN, the adsStream field is not synchronized.
```
WARNING: ThreadSanitizer: data race (pid=1625)
  Read of size 4 at 0x00009b66fc88 by thread T23 (mutexes: write M0):
    #0 io.grpc.xds.ControlPlaneClient.isReady()Z ControlPlaneClient.java:203
    #1 io.grpc.xds.ControlPlaneClient.readyHandler()V ControlPlaneClient.java:211
    #2 io.grpc.xds.ControlPlaneClient$AdsStream.onReady()V ControlPlaneClient.java:328
    #3 io.grpc.xds.GrpcXdsTransportFactory$EventHandlerToCallListenerAdapter.onReady()V GrpcXdsTransportFactory.java:145
    #4 io.grpc.PartialForwardingClientCallListener.onReady()V PartialForwardingClientCallListener.java:44
    #5 io.grpc.ForwardingClientCallListener.onReady()V ForwardingClientCallListener.java:23
    #6 io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onReady()V ForwardingClientCallListener.java:40
    #7 io.grpc.PartialForwardingClientCallListener.onReady()V PartialForwardingClientCallListener.java:44
    #8 io.grpc.ForwardingClientCallListener.onReady()V ForwardingClientCallListener.java:23
    #9 io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onReady()V ForwardingClientCallListener.java:40
    #10 io.grpc.internal.DelayedClientCall$DelayedListener.onReady()V DelayedClientCall.java:497
    #11 io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamOnReady.runInternal()V ClientCallImpl.java:781
    #12 io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamOnReady.runInContext()V ClientCallImpl.java:772
    #13 io.grpc.internal.ContextRunnable.run()V ContextRunnable.java:37
    #14 io.grpc.internal.SerializingExecutor.run()V SerializingExecutor.java:133
    #15 java.util.concurrent.ThreadPoolExecutor.runWorker(Ljava/util/concurrent/ThreadPoolExecutor$Worker;)V ThreadPoolExecutor.java:1130
    #16 java.util.concurrent.ThreadPoolExecutor$Worker.run()V ThreadPoolExecutor.java:630
    #17 java.lang.Thread.run()V Thread.java:830
    #18 (Generated Stub) <null>

  Previous write of size 4 at 0x00009b66fc88 by thread T4 (mutexes: write M1, write M2, write M3, write M4, write M5):
    #0 io.grpc.xds.ControlPlaneClient$AdsStream.cleanUp()V ControlPlaneClient.java:424
    #1 io.grpc.xds.ControlPlaneClient$AdsStream.close(Ljava/lang/Exception;)V ControlPlaneClient.java:418
    #2 io.grpc.xds.ControlPlaneClient$1.run()V ControlPlaneClient.java:130
    #3 io.grpc.SynchronizationContext.drain()V SynchronizationContext.java:94
    #4 io.grpc.SynchronizationContext.execute(Ljava/lang/Runnable;)V SynchronizationContext.java:126
    #5 io.grpc.xds.XdsClientImpl.shutdown()V XdsClientImpl.java:207
    #6 io.grpc.xds.SharedXdsClientPoolProvider$RefCountedXdsClientObjectPool.returnObject(Ljava/lang/Object;)Lio/grpc/xds/XdsClient; SharedXdsClientPoolProvider.java:144
    #7 io.grpc.xds.SharedXdsClientPoolProvider$RefCountedXdsClientObjectPool.returnObject(Ljava/lang/Object;)Ljava/lang/Object; SharedXdsClientPoolProvider.java:102
    #8 io.grpc.xds.XdsClientFederationTest.cleanUp()V XdsClientFederationTest.java:86
```
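
The commit message does not say how the field was fixed, so the following is only one common remedy for this shape of race, not necessarily what the commit did: make the field `volatile` and read it once into a local before using it. `StreamHolderSketch` and its nested `Stream` type are hypothetical names that loosely mirror the `isReady()` read and `cleanUp()` write in the trace.

```java
/** Hypothetical sketch; not the actual ControlPlaneClient code. */
final class StreamHolderSketch {

  /** Stand-in for the stream object held in the field. */
  static final class Stream {
    boolean isReady() {
      return true;
    }
  }

  // volatile: writes from the shutdown path become visible to callback threads.
  private volatile Stream stream;

  void start() {
    stream = new Stream();
  }

  void cleanUp() {
    stream = null;
  }

  boolean isReady() {
    // Read the field exactly once; a second read could see null after a concurrent cleanUp().
    Stream current = stream;
    return current != null && current.isReady();
  }

  public static void main(String[] args) {
    StreamHolderSketch holder = new StreamHolderSketch();
    holder.start();
    System.out.println("ready while started: " + holder.isReady());
    holder.cleanUp();
    System.out.println("ready after cleanUp: " + holder.isReady());
  }
}
```

An alternative with the same effect is to confine every read and write of the field to a single serialized context, which avoids `volatile` at the cost of rescheduling the reader.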
sergiitk pushed a commit that referenced this pull request Jan 8, 2025
Since roughly the introduction of the LBv2 API (the current API), gRPC
won't use a transport until it is ready. Long ago, transports could be
used before they were ready, and these old tests did not wait for the
negotiator to complete before starting. We need them to wait for the
handshake to complete to avoid a test-only data race in getAttributes()
noticed by TSAN.

Throwing away data frames in the Noop handshaker is necessary for it to
act like a normal handshaker, which doesn't allow data frames to pass
until the handshake is complete. Without this handling, the tests go
through invalid code paths in NettyClientHandler where a terminated
transport becomes ready, and hit a similar data race.

```
  Write of size 4 at 0x00008db31e2c by thread T37:
    #0 io.grpc.netty.NettyClientHandler.handleProtocolNegotiationCompleted(Lio/grpc/Attributes;Lio/grpc/InternalChannelz$Security;)V NettyClientHandler.java:517
    #1 io.grpc.netty.ProtocolNegotiators$GrpcNegotiationHandler.userEventTriggered(Lio/netty/channel/ChannelHandlerContext;Ljava/lang/Object;)V ProtocolNegotiators.java:937
    #2 io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(Ljava/lang/Object;)V AbstractChannelHandlerContext.java:398
    #3 io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(Lio/netty/channel/AbstractChannelHandlerContext;Ljava/lang/Object;)V AbstractChannelHandlerContext.java:376
    #4 io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(Ljava/lang/Object;)Lio/netty/channel/ChannelHandlerContext; AbstractChannelHandlerContext.java:368
    #5 io.grpc.netty.ProtocolNegotiators$ProtocolNegotiationHandler.fireProtocolNegotiationEvent(Lio/netty/channel/ChannelHandlerContext;)V ProtocolNegotiators.java:1107
    #6 io.grpc.netty.ProtocolNegotiators$WaitUntilActiveHandler.channelActive(Lio/netty/channel/ChannelHandlerContext;)V ProtocolNegotiators.java:1011
    ...

  Previous read of size 4 at 0x00008db31e2c by thread T4 (mutexes: write M0, write M1, write M2, write M3):
    #0 io.grpc.netty.NettyClientHandler.getAttributes()Lio/grpc/Attributes; NettyClientHandler.java:345
    #1 io.grpc.netty.NettyClientTransport.getAttributes()Lio/grpc/Attributes; NettyClientTransport.java:387
    #2 io.grpc.netty.NettyClientTransport.newStream(Lio/grpc/MethodDescriptor;Lio/grpc/Metadata;Lio/grpc/CallOptions;[Lio/grpc/ClientStreamTracer;)Lio/grpc/internal/ClientStream; NettyClientTransport.java:198
    #3 io.grpc.netty.NettyClientTransportTest$Rpc.<init>(Lio/grpc/netty/NettyClientTransport;Lio/grpc/Metadata;)V NettyClientTransportTest.java:953
    #4 io.grpc.netty.NettyClientTransportTest.huffmanCodingShouldNotBePerformed()V NettyClientTransportTest.java:631
    ...
```

```
  Read of size 4 at 0x00008f983a3c by thread T4 (mutexes: write M0, write M1):
    #0 io.grpc.netty.NettyClientHandler.getAttributes()Lio/grpc/Attributes; NettyClientHandler.java:345
    #1 io.grpc.netty.NettyClientTransport.getAttributes()Lio/grpc/Attributes; NettyClientTransport.java:387
    #2 io.grpc.netty.NettyClientTransport.newStream(Lio/grpc/MethodDescriptor;Lio/grpc/Metadata;Lio/grpc/CallOptions;[Lio/grpc/ClientStreamTracer;)Lio/grpc/internal/ClientStream; NettyClientTransport.java:198
    #3 io.grpc.netty.NettyClientTransportTest$Rpc.<init>(Lio/grpc/netty/NettyClientTransport;Lio/grpc/Metadata;)V NettyClientTransportTest.java:973
    #4 io.grpc.netty.NettyClientTransportTest$Rpc.<init>(Lio/grpc/netty/NettyClientTransport;)V NettyClientTransportTest.java:969
    #5 io.grpc.netty.NettyClientTransportTest.handlerExceptionDuringNegotiatonPropagatesToStatus()V NettyClientTransportTest.java:425
    ...

  Previous write of size 4 at 0x00008f983a3c by thread T56:
    #0 io.grpc.netty.NettyClientHandler$FrameListener.onSettingsRead(Lio/netty/channel/ChannelHandlerContext;Lio/netty/handler/codec/http2/Http2Settings;)V NettyClientHandler.java:960
    ...
```
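
The "wait for the handshake before starting" rule from this commit can be sketched with plain `java.util.concurrent` primitives. This is an illustration under assumptions rather than the `NettyClientTransportTest` change itself: `FakeTransport`, `completeHandshake`, and `awaitReady` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch; not the actual NettyClientTransportTest code. */
final class WaitForHandshakeSketch {

  /** Stand-in transport whose attributes are written by an I/O thread. */
  static final class FakeTransport {
    private final CountDownLatch negotiated = new CountDownLatch(1);
    private String attributes = "pre-handshake"; // plain field, like the racy one

    /** Called on the I/O thread when negotiation finishes. */
    void completeHandshake() {
      attributes = "negotiated";
      negotiated.countDown(); // happens-before any await() that returns afterwards
    }

    boolean awaitReady(long timeout, TimeUnit unit) throws InterruptedException {
      return negotiated.await(timeout, unit);
    }

    String getAttributes() {
      return attributes;
    }
  }

  /** Test-side flow: wait for readiness, and only then read the attributes. */
  static String attributesAfterWaiting() {
    FakeTransport transport = new FakeTransport();
    Thread ioThread = new Thread(transport::completeHandshake, "fake-io");
    ioThread.start();
    try {
      if (!transport.awaitReady(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("handshake timed out");
      }
      return transport.getAttributes();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(attributesAfterWaiting());
  }
}
```

The latch gives the test thread a happens-before edge from the I/O thread's write, so the read is ordered after it even though the field itself is not `volatile`.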
sergiitk pushed a commit that referenced this pull request Jan 28, 2025
d65d394 increased the test speed of
connect_then_mainServerDown_fallbackServerUp by using FakeClock.
However, it introduced a data race because FakeClock is not thread-safe.
This change injects a single thread for gRPC callbacks such that
syncContext is run on a thread under the test's control.

A simpler approach would be to expose syncContext from XdsClientImpl for
testing. However, this test is in a different package and I wanted to
avoid adding a public method.

```
  Read of size 8 at 0x00008dec9d50 by thread T25:
    #0 io.grpc.internal.FakeClock$ScheduledExecutorImpl.schedule(Lio/grpc/internal/FakeClock$ScheduledTask;JLjava/util/concurrent/TimeUnit;)V FakeClock.java:140
    #1 io.grpc.internal.FakeClock$ScheduledExecutorImpl.schedule(Ljava/lang/Runnable;JLjava/util/concurrent/TimeUnit;)Ljava/util/concurrent/ScheduledFuture; FakeClock.java:150
    #2 io.grpc.SynchronizationContext.schedule(Ljava/lang/Runnable;JLjava/util/concurrent/TimeUnit;Ljava/util/concurrent/ScheduledExecutorService;)Lio/grpc/SynchronizationContext$ScheduledHandle; SynchronizationContext.java:153
    #3 io.grpc.xds.client.ControlPlaneClient$AdsStream.handleRpcStreamClosed(Lio/grpc/Status;)V ControlPlaneClient.java:491
    #4 io.grpc.xds.client.ControlPlaneClient$AdsStream.lambda$onStatusReceived$0(Lio/grpc/Status;)V ControlPlaneClient.java:429
    #5 io.grpc.xds.client.ControlPlaneClient$AdsStream$$Lambda+0x00000001004a95d0.run()V ??
    #6 io.grpc.SynchronizationContext.drain()V SynchronizationContext.java:96
    #7 io.grpc.SynchronizationContext.execute(Ljava/lang/Runnable;)V SynchronizationContext.java:128
    #8 io.grpc.xds.client.ControlPlaneClient$AdsStream.onStatusReceived(Lio/grpc/Status;)V ControlPlaneClient.java:428
    #9 io.grpc.xds.GrpcXdsTransportFactory$EventHandlerToCallListenerAdapter.onClose(Lio/grpc/Status;Lio/grpc/Metadata;)V GrpcXdsTransportFactory.java:149
    #10 io.grpc.PartialForwardingClientCallListener.onClose(Lio/grpc/Status;Lio/grpc/Metadata;)V PartialForwardingClientCallListener.java:39
    ...

  Previous write of size 8 at 0x00008dec9d50 by thread T4 (mutexes: write M0, write M1, write M2, write M3):
    #0 io.grpc.internal.FakeClock.forwardTime(JLjava/util/concurrent/TimeUnit;)I FakeClock.java:368
    #1 io.grpc.xds.XdsClientFallbackTest.connect_then_mainServerDown_fallbackServerUp()V XdsClientFallbackTest.java:358
    ...
```
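
A minimal sketch of the idea in this commit: when a fake clock is not thread-safe, run both the library callbacks and the test's time advances on one thread that the test controls. `SimpleFakeClock` below is a hypothetical stand-in and not gRPC's `FakeClock`; the single-thread executor plays the role of the injected callback thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch; SimpleFakeClock is not gRPC's FakeClock. */
final class SingleThreadClockSketch {

  /** Deliberately not thread-safe: callers must confine it to one thread. */
  static final class SimpleFakeClock {
    private static final class Task {
      final long dueNanos;
      final Runnable command;

      Task(long dueNanos, Runnable command) {
        this.dueNanos = dueNanos;
        this.command = command;
      }
    }

    private long nowNanos;
    private final List<Task> tasks = new ArrayList<>();

    void schedule(Runnable command, long delayNanos) {
      tasks.add(new Task(nowNanos + delayNanos, command));
    }

    /** Advances time and runs every task that became due; returns how many ran. */
    int forwardTime(long deltaNanos) {
      nowNanos += deltaNanos;
      List<Task> due = new ArrayList<>();
      for (Task task : tasks) {
        if (task.dueNanos <= nowNanos) {
          due.add(task);
        }
      }
      tasks.removeAll(due);
      for (Task task : due) {
        task.command.run();
      }
      return due.size();
    }
  }

  /** Schedules from a "callback" and advances time, both on the same thread. */
  static int runDemo() {
    ExecutorService callbackThread = Executors.newSingleThreadExecutor();
    SimpleFakeClock clock = new SimpleFakeClock();
    try {
      // A library callback schedules a retry; it runs on the injected thread.
      Runnable scheduleRetry = () -> clock.schedule(() -> { }, 10);
      callbackThread.submit(scheduleRetry).get(5, TimeUnit.SECONDS);
      // The test advances time on that same thread instead of its own.
      Callable<Integer> advance = () -> clock.forwardTime(10);
      return callbackThread.submit(advance).get(5, TimeUnit.SECONDS);
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      callbackThread.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println("tasks run: " + runDemo());
  }
}
```

Since every access to the clock is funneled through one thread, the clock itself needs no locking, which matches the commit's goal of avoiding a new public method just for tests.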