-
Notifications
You must be signed in to change notification settings - Fork 13
Replace ShutdownComplete by Closed and ShutdownRequested #2396
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 21 commits
d8c701f
50df1e7
ea4f976
7f6c289
ced41d2
3c75f19
f6aef04
7a7e7de
adb7bd7
e992028
5b3cdd8
09ffa7a
62e01b4
5e39e9c
ca0bc39
9327b68
a86b93b
9cbc9df
6f29dd9
9e9fffe
7ca378b
541c072
c42c36e
7f36cf3
b760eb3
6c7a278
d9eb2bb
48f2f4a
326090d
e90d3fd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,10 @@ public sealed class ConnectionCache : IInvoker, IAsyncDisposable | |
|
||
private readonly IClientProtocolConnectionFactory _connectionFactory; | ||
|
||
private readonly TimeSpan _connectTimeout; | ||
|
||
private Task? _disposeTask; | ||
|
||
private readonly object _mutex = new(); | ||
|
||
// New connections in the process of connecting. They can be returned only after ConnectAsync succeeds. | ||
|
@@ -33,6 +37,8 @@ public sealed class ConnectionCache : IInvoker, IAsyncDisposable | |
|
||
private readonly CancellationTokenSource _shutdownCts = new(); | ||
|
||
private readonly TimeSpan _shutdownTimeout; | ||
|
||
/// <summary>Constructs a connection cache.</summary> | ||
/// <param name="options">The connection cache options.</param> | ||
/// <param name="duplexClientTransport">The duplex transport used to create ice protocol connections.</param> | ||
|
@@ -52,6 +58,9 @@ public ConnectionCache( | |
multiplexedClientTransport, | ||
logger); | ||
|
||
_connectTimeout = options.ConnectionOptions.ConnectTimeout; | ||
_shutdownTimeout = options.ConnectionOptions.ShutdownTimeout; | ||
|
||
_preferExistingConnection = options.PreferExistingConnection; | ||
} | ||
|
||
|
@@ -63,36 +72,35 @@ public ConnectionCache() | |
|
||
/// <summary>Releases all resources allocated by this connection cache.</summary> | ||
/// <returns>A value task that completes when all connections managed by this cache are disposed.</returns> | ||
public async ValueTask DisposeAsync() | ||
public ValueTask DisposeAsync() | ||
{ | ||
lock (_mutex) | ||
{ | ||
// We always cancel _shutdownCts with _mutex locked. This way, when _mutex is locked, _shutdownCts.Token | ||
// does not change. | ||
try | ||
if (_disposeTask is null) | ||
{ | ||
_shutdownCts.Cancel(); | ||
if (_backgroundConnectionDisposeCount == 0) | ||
{ | ||
// There is no outstanding background dispose. | ||
_ = _backgroundConnectionDisposeTcs.TrySetResult(); | ||
} | ||
_disposeTask = PerformDisposeAsync(); | ||
} | ||
catch (ObjectDisposedException) | ||
{ | ||
// already disposed by a previous or concurrent call. | ||
} | ||
|
||
if (_backgroundConnectionDisposeCount == 0) | ||
{ | ||
// There is no outstanding background dispose. | ||
_ = _backgroundConnectionDisposeTcs.TrySetResult(); | ||
} | ||
return new(_disposeTask); | ||
} | ||
|
||
// Dispose all connections managed by this cache. | ||
IEnumerable<IProtocolConnection> allConnections = _pendingConnections.Values.Select(value => value.Connection) | ||
.Concat(_activeConnections.Values); | ||
async Task PerformDisposeAsync() | ||
{ | ||
await Task.Yield(); // exit mutex lock | ||
|
||
IEnumerable<IProtocolConnection> allConnections = | ||
_pendingConnections.Values.Select(value => value.Connection).Concat(_activeConnections.Values); | ||
|
||
await Task.WhenAll(allConnections.Select(connection => connection.DisposeAsync().AsTask()) | ||
.Append(_backgroundConnectionDisposeTcs.Task)).ConfigureAwait(false); | ||
await Task.WhenAll(allConnections.Select(connection => connection.DisposeAsync().AsTask()) | ||
.Append(_backgroundConnectionDisposeTcs.Task)).ConfigureAwait(false); | ||
|
||
_shutdownCts.Dispose(); | ||
_shutdownCts.Dispose(); | ||
} | ||
} | ||
|
||
/// <summary>Sends an outgoing request and returns the corresponding incoming response. If the request | ||
|
@@ -186,7 +194,9 @@ async Task<IncomingResponse> PerformInvokeAsync() | |
{ | ||
try | ||
{ | ||
connection = await ConnectAsync(mainServerAddress, cancellationToken).ConfigureAwait(false); | ||
// TODO: this code generates a UTE | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't it be fixed with this PR? Or is there an issue we could reference here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll create an issue. |
||
connection = await ConnectAsync(mainServerAddress).WaitAsync(cancellationToken) | ||
.ConfigureAwait(false); | ||
} | ||
catch (Exception) when (serverAddressFeature.AltServerAddresses.Count > 0) | ||
{ | ||
|
@@ -203,7 +213,7 @@ async Task<IncomingResponse> PerformInvokeAsync() | |
|
||
try | ||
{ | ||
connection = await ConnectAsync(mainServerAddress, cancellationToken) | ||
connection = await ConnectAsync(mainServerAddress).WaitAsync(cancellationToken) | ||
bernardnormier marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.ConfigureAwait(false); | ||
break; // for | ||
} | ||
|
@@ -238,52 +248,67 @@ public Task ShutdownAsync(CancellationToken cancellationToken = default) | |
{ | ||
lock (_mutex) | ||
{ | ||
// We always cancel _shutdownCts with _mutex lock. This way, when _mutex is locked, _shutdownCts.Token | ||
// does not change. | ||
try | ||
if (_disposeTask is not null) | ||
{ | ||
_shutdownCts.Cancel(); | ||
throw new ObjectDisposedException($"{typeof(ConnectionCache)}"); | ||
} | ||
catch (ObjectDisposedException) | ||
if (_shutdownCts.IsCancellationRequested) | ||
{ | ||
throw new ObjectDisposedException($"{typeof(ConnectionCache)}"); | ||
throw new InvalidOperationException($"The connection cache is already shut down or shutting down."); | ||
} | ||
|
||
// We always cancel _shutdownCts with _mutex locked. This way, when _mutex is locked, _shutdownCts.Token | ||
// does not change. | ||
_shutdownCts.Cancel(); | ||
bernardnormier marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
// Shut down all connections managed by this cache. | ||
IEnumerable<IProtocolConnection> allConnections = _pendingConnections.Values.Select(value => value.Connection) | ||
.Concat(_activeConnections.Values); | ||
return PerformShutdownAsync(); | ||
|
||
async Task PerformShutdownAsync() | ||
{ | ||
IEnumerable<IProtocolConnection> allConnections = | ||
_pendingConnections.Values.Select(value => value.Connection).Concat(_activeConnections.Values); | ||
|
||
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); | ||
cts.CancelAfter(_shutdownTimeout); | ||
|
||
return Task.WhenAll(allConnections.Select(connection => connection.ShutdownAsync(cancellationToken))); | ||
try | ||
{ | ||
// Note: this throws the first exception, not all of them. | ||
await Task.WhenAll(allConnections.Select(connection => connection.ShutdownAsync(cts.Token))) | ||
.ConfigureAwait(false); | ||
} | ||
catch (OperationCanceledException) | ||
{ | ||
cancellationToken.ThrowIfCancellationRequested(); | ||
throw new TimeoutException( | ||
$"The connection cache shutdown timed out after {_shutdownTimeout.TotalSeconds} s."); | ||
} | ||
} | ||
} | ||
|
||
/// <summary>Creates a connection and attempts to connect this connection unless there is an active or pending | ||
/// connection for the desired server address.</summary> | ||
/// <param name="serverAddress">The server address.</param> | ||
/// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param> | ||
/// <returns>A connected connection.</returns> | ||
private async ValueTask<IProtocolConnection> ConnectAsync( | ||
ServerAddress serverAddress, | ||
CancellationToken cancellationToken) | ||
private async Task<IProtocolConnection> ConnectAsync(ServerAddress serverAddress) | ||
{ | ||
(IProtocolConnection Connection, Task Task) pendingConnectionValue; | ||
|
||
CancellationToken shutdownCancellationToken; | ||
|
||
lock (_mutex) | ||
{ | ||
try | ||
{ | ||
shutdownCancellationToken = _shutdownCts.Token; | ||
} | ||
catch (ObjectDisposedException) | ||
if (_disposeTask is not null) | ||
{ | ||
throw new ObjectDisposedException($"{typeof(ConnectionCache)}"); | ||
} | ||
|
||
shutdownCancellationToken = _shutdownCts.Token; | ||
bernardnormier marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
if (shutdownCancellationToken.IsCancellationRequested) | ||
{ | ||
throw new InvalidOperationException("connection cache is shut down or shutting down"); | ||
throw new InvalidOperationException("Connection cache is shut down or shutting down."); | ||
} | ||
|
||
if (_activeConnections.TryGetValue(serverAddress, out IProtocolConnection? connection)) | ||
|
@@ -309,12 +334,21 @@ async Task PerformConnectAsync(IProtocolConnection connection) | |
{ | ||
await Task.Yield(); // exit mutex lock | ||
|
||
using var cts = CancellationTokenSource.CreateLinkedTokenSource( | ||
cancellationToken, | ||
shutdownCancellationToken); | ||
using var cts = new CancellationTokenSource(_connectTimeout); | ||
using CancellationTokenRegistration tokenRegistration = | ||
shutdownCancellationToken.UnsafeRegister(cts => ((CancellationTokenSource)cts!).Cancel(), cts); | ||
|
||
try | ||
{ | ||
_ = await connection.ConnectAsync(cts.Token).ConfigureAwait(false); | ||
try | ||
{ | ||
_ = await connection.ConnectAsync(cts.Token).ConfigureAwait(false); | ||
} | ||
catch (OperationCanceledException) when (!shutdownCancellationToken.IsCancellationRequested) | ||
{ | ||
throw new TimeoutException( | ||
$"The connection establishment timed out after {_connectTimeout.TotalSeconds} s."); | ||
} | ||
Comment on lines
+345
to
+353
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would get rid of the inner try/catch with something along these lines:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Your suggestion is not correct. Once we lock the mutex, we have to check |
||
} | ||
catch | ||
{ | ||
|
@@ -337,8 +371,6 @@ async Task PerformConnectAsync(IProtocolConnection connection) | |
} | ||
|
||
await connection.DisposeAsync().ConfigureAwait(false); | ||
|
||
cancellationToken.ThrowIfCancellationRequested(); // throws OCE | ||
throw; | ||
} | ||
|
||
|
@@ -365,24 +397,22 @@ async Task PerformConnectAsync(IProtocolConnection connection) | |
|
||
async Task RemoveFromActiveAsync(IProtocolConnection connection, CancellationToken shutdownCancellationToken) | ||
{ | ||
bool shutdownRequested; | ||
|
||
try | ||
{ | ||
await connection.ShutdownComplete.WaitAsync(shutdownCancellationToken).ConfigureAwait(false); | ||
shutdownRequested = await Task.WhenAny(connection.ShutdownRequested, connection.Closed) | ||
.WaitAsync(shutdownCancellationToken).ConfigureAwait(false) == connection.ShutdownRequested; | ||
} | ||
catch (OperationCanceledException exception) when (exception.CancellationToken == shutdownCancellationToken) | ||
{ | ||
// The connection cache is being shut down or disposed and cache's DisposeAsync is responsible to | ||
// DisposeAsync this connection. | ||
return; | ||
} | ||
catch | ||
{ | ||
// ignore and continue: the connection was aborted | ||
} | ||
|
||
lock (_mutex) | ||
{ | ||
// shutdownCancellationToken.IsCancellationRequested remains the same when _mutex is locked. | ||
if (shutdownCancellationToken.IsCancellationRequested) | ||
{ | ||
// ConnectionCache.DisposeAsync is responsible to dispose this connection. | ||
|
@@ -396,6 +426,28 @@ async Task RemoveFromActiveAsync(IProtocolConnection connection, CancellationTok | |
} | ||
} | ||
|
||
if (shutdownRequested) | ||
{ | ||
using var cts = CancellationTokenSource.CreateLinkedTokenSource(shutdownCancellationToken); | ||
cts.CancelAfter(_shutdownTimeout); | ||
|
||
try | ||
{ | ||
await connection.ShutdownAsync(cts.Token).ConfigureAwait(false); | ||
} | ||
catch (OperationCanceledException) | ||
{ | ||
} | ||
catch (IceRpcException) | ||
{ | ||
} | ||
catch (Exception exception) | ||
{ | ||
Debug.Fail($"Unexpected connection shutdown exception: {exception}"); | ||
throw; | ||
} | ||
} | ||
|
||
await connection.DisposeAsync().ConfigureAwait(false); | ||
|
||
lock (_mutex) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,23 +10,23 @@ namespace IceRpc; | |
/// </summary> | ||
public interface IProtocolConnection : IInvoker, IAsyncDisposable | ||
{ | ||
/// <summary>Gets a task that completes when the connection is closed.</summary> | ||
/// <value>A task that completes when the connection is closed. If the connection was shut down gracefully, this | ||
/// task completes with a null exception; otherwise, it completes with the exception that aborted the connection. | ||
/// </value> | ||
/// <remarks>This task is never faulted or canceled.</remarks> | ||
Task<Exception?> Closed { get; } | ||
|
||
/// <summary>Gets the server address of this connection.</summary> | ||
/// <value>The server address of this connection. Its <see cref="ServerAddress.Transport" /> property is always | ||
/// non-null.</value> | ||
ServerAddress ServerAddress { get; } | ||
|
||
/// <summary>Gets a task that completes when the connection is shut down or fails. The connection shutdown is | ||
/// initiated by any of the following events: | ||
/// <list type="bullet"> | ||
/// <item><description>The application calls <see cref="ShutdownAsync" /> on the connection.</description></item> | ||
/// <item><description>The connection shuts down itself because it remained idle for longer than its configured idle | ||
/// timeout.</description></item> | ||
/// <item><description>The peer shuts down the connection.</description></item> | ||
/// </list> | ||
/// <summary>Gets a task that completes when the peer or the idle monitor requests the shutdown of this connection. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doesn't it also complete when the connection is closed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No it does not. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So it relied on the peer requesting shutdown? Is it guaranteed to be completed even if the connection is lost with the peer? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, it can never complete, which is completely fine. It's like registering a callback that is never called. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not obvious that this can't create leaks since the task continuation will never run. If this continuation references objects, these might never be collected. It would be good to investigate this to verify that it's safe. |
||
/// </summary> | ||
/// <value>A task that completes when the connection is successfully shut down. It completes with an exception when | ||
/// the connection fails.</value> | ||
Task ShutdownComplete { get; } | ||
/// <remarks>This task is never faulted or canceled.</remarks> | ||
/// <seealso cref="ConnectionOptions.IdleTimeout" /> | ||
Task ShutdownRequested { get; } | ||
|
||
/// <summary>Establishes the connection to the peer.</summary> | ||
/// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param> | ||
|
@@ -37,33 +37,27 @@ public interface IProtocolConnection : IInvoker, IAsyncDisposable | |
/// </item> | ||
/// <item><description><see cref="OperationCanceledException" />if cancellation was requested through the | ||
/// cancellation token.</description></item> | ||
/// <item><description><see cref="TimeoutException" />if the connection establishment attempt exceeded <see | ||
/// cref="ConnectionOptions.ConnectTimeout" />.</description></item> | ||
/// </list> | ||
/// </returns> | ||
/// <exception cref="IceRpcException">Thrown if the connection is closed but not disposed yet.</exception> | ||
/// <exception cref="InvalidOperationException">Thrown if this method is called more than once.</exception> | ||
/// <exception cref="ObjectDisposedException">Thrown if this connection is disposed.</exception> | ||
Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken = default); | ||
|
||
/// <summary>Gracefully shuts down the connection. The shutdown waits for pending invocations and dispatches to | ||
/// complete. For a speedier graceful shutdown, call <see cref="IAsyncDisposable.DisposeAsync" /> instead. It will | ||
/// cancel pending invocations and dispatches.</summary> | ||
/// <summary>Gracefully shuts down the connection.</summary> | ||
/// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param> | ||
/// <returns>A task that completes once the shutdown is complete. This task can also complete with one of the | ||
/// following exceptions: | ||
/// <list type="bullet"> | ||
/// <item><description><see cref="IceRpcException" />if the connection shutdown failed.</description></item> | ||
/// <item><description><see cref="OperationCanceledException" />if cancellation was requested through the | ||
/// cancellation token.</description></item> | ||
/// <item><description><see cref="TimeoutException" />if this shutdown attempt or a previous attempt exceeded <see | ||
/// cref="ConnectionOptions.ShutdownTimeout" />.</description></item> | ||
/// </list> | ||
/// </returns> | ||
/// <exception cref="IceRpcException">Thrown if the connection is closed but not disposed yet.</exception> | ||
/// <exception cref="InvalidOperationException">Thrown if the connection was not connected successfully prior to | ||
/// call.</exception> | ||
/// this call, or if this method is called more than once.</exception> | ||
/// <exception cref="ObjectDisposedException">Thrown if this connection is disposed.</exception> | ||
/// <remarks>If shutdown is canceled, the protocol connection transitions to a faulted state and the disposal of the | ||
/// connection will abort the connection instead of performing a graceful speedy-shutdown.</remarks> | ||
/// <remarks>If cancellation token is canceled, the protocol connection is aborted.</remarks> | ||
Task ShutdownAsync(CancellationToken cancellationToken = default); | ||
} |
Uh oh!
There was an error while loading. Please reload this page.