@@ -23,6 +23,10 @@ public sealed class ConnectionCache : IInvoker, IAsyncDisposable
23
23
24
24
private readonly IClientProtocolConnectionFactory _connectionFactory ;
25
25
26
+ private readonly TimeSpan _connectTimeout ;
27
+
28
+ private Task ? _disposeTask ;
29
+
26
30
private readonly object _mutex = new ( ) ;
27
31
28
32
// New connections in the process of connecting. They can be returned only after ConnectAsync succeeds.
@@ -33,6 +37,8 @@ public sealed class ConnectionCache : IInvoker, IAsyncDisposable
33
37
34
38
private readonly CancellationTokenSource _shutdownCts = new ( ) ;
35
39
40
+ private readonly TimeSpan _shutdownTimeout ;
41
+
36
42
/// <summary>Constructs a connection cache.</summary>
37
43
/// <param name="options">The connection cache options.</param>
38
44
/// <param name="duplexClientTransport">The duplex transport used to create ice protocol connections.</param>
@@ -52,6 +58,9 @@ public ConnectionCache(
52
58
multiplexedClientTransport ,
53
59
logger ) ;
54
60
61
+ _connectTimeout = options . ConnectionOptions . ConnectTimeout ;
62
+ _shutdownTimeout = options . ConnectionOptions . ShutdownTimeout ;
63
+
55
64
_preferExistingConnection = options . PreferExistingConnection ;
56
65
}
57
66
@@ -63,36 +72,35 @@ public ConnectionCache()
63
72
64
73
/// <summary>Releases all resources allocated by this connection cache.</summary>
65
74
/// <returns>A value task that completes when all connections managed by this cache are disposed.</returns>
66
- public async ValueTask DisposeAsync ( )
75
+ public ValueTask DisposeAsync ( )
67
76
{
68
77
lock ( _mutex )
69
78
{
70
- // We always cancel _shutdownCts with _mutex locked. This way, when _mutex is locked, _shutdownCts.Token
71
- // does not change.
72
- try
79
+ if ( _disposeTask is null )
73
80
{
74
81
_shutdownCts . Cancel ( ) ;
82
+ if ( _backgroundConnectionDisposeCount == 0 )
83
+ {
84
+ // There is no outstanding background dispose.
85
+ _ = _backgroundConnectionDisposeTcs . TrySetResult ( ) ;
86
+ }
87
+ _disposeTask = PerformDisposeAsync ( ) ;
75
88
}
76
- catch ( ObjectDisposedException )
77
- {
78
- // already disposed by a previous or concurrent call.
79
- }
80
-
81
- if ( _backgroundConnectionDisposeCount == 0 )
82
- {
83
- // There is no outstanding background dispose.
84
- _ = _backgroundConnectionDisposeTcs . TrySetResult ( ) ;
85
- }
89
+ return new ( _disposeTask ) ;
86
90
}
87
91
88
- // Dispose all connections managed by this cache.
89
- IEnumerable < IProtocolConnection > allConnections = _pendingConnections . Values . Select ( value => value . Connection )
90
- . Concat ( _activeConnections . Values ) ;
92
+ async Task PerformDisposeAsync ( )
93
+ {
94
+ await Task . Yield ( ) ; // exit mutex lock
95
+
96
+ IEnumerable < IProtocolConnection > allConnections =
97
+ _pendingConnections . Values . Select ( value => value . Connection ) . Concat ( _activeConnections . Values ) ;
91
98
92
- await Task . WhenAll ( allConnections . Select ( connection => connection . DisposeAsync ( ) . AsTask ( ) )
93
- . Append ( _backgroundConnectionDisposeTcs . Task ) ) . ConfigureAwait ( false ) ;
99
+ await Task . WhenAll ( allConnections . Select ( connection => connection . DisposeAsync ( ) . AsTask ( ) )
100
+ . Append ( _backgroundConnectionDisposeTcs . Task ) ) . ConfigureAwait ( false ) ;
94
101
95
- _shutdownCts . Dispose ( ) ;
102
+ _shutdownCts . Dispose ( ) ;
103
+ }
96
104
}
97
105
98
106
/// <summary>Sends an outgoing request and returns the corresponding incoming response. If the request
@@ -186,7 +194,9 @@ async Task<IncomingResponse> PerformInvokeAsync()
186
194
{
187
195
try
188
196
{
189
- connection = await ConnectAsync ( mainServerAddress , cancellationToken ) . ConfigureAwait ( false ) ;
197
+ // TODO: this code generates a UTE
198
+ connection = await ConnectAsync ( mainServerAddress ) . WaitAsync ( cancellationToken )
199
+ . ConfigureAwait ( false ) ;
190
200
}
191
201
catch ( Exception ) when ( serverAddressFeature . AltServerAddresses . Count > 0 )
192
202
{
@@ -203,7 +213,7 @@ async Task<IncomingResponse> PerformInvokeAsync()
203
213
204
214
try
205
215
{
206
- connection = await ConnectAsync ( mainServerAddress , cancellationToken )
216
+ connection = await ConnectAsync ( mainServerAddress ) . WaitAsync ( cancellationToken )
207
217
. ConfigureAwait ( false ) ;
208
218
break ; // for
209
219
}
@@ -238,52 +248,69 @@ public Task ShutdownAsync(CancellationToken cancellationToken = default)
238
248
{
239
249
lock ( _mutex )
240
250
{
241
- // We always cancel _shutdownCts with _mutex lock. This way, when _mutex is locked, _shutdownCts.Token
242
- // does not change.
243
- try
251
+ if ( _disposeTask is not null )
244
252
{
245
- _shutdownCts . Cancel ( ) ;
253
+ throw new ObjectDisposedException ( $ " { typeof ( ConnectionCache ) } " ) ;
246
254
}
247
- catch ( ObjectDisposedException )
255
+ if ( _shutdownCts . IsCancellationRequested )
248
256
{
249
- throw new ObjectDisposedException ( $ "{ typeof ( ConnectionCache ) } ") ;
257
+ throw new InvalidOperationException ( $ "The connection cache is already shut down or shutting down. ") ;
250
258
}
259
+
260
+ // We always cancel _shutdownCts with _mutex locked. This way, when _mutex is locked, _shutdownCts.Token
261
+ // does not change.
262
+ _shutdownCts . Cancel ( ) ;
251
263
}
252
264
253
- // Shut down all connections managed by this cache.
254
- IEnumerable < IProtocolConnection > allConnections = _pendingConnections . Values . Select ( value => value . Connection )
255
- . Concat ( _activeConnections . Values ) ;
265
+ return PerformShutdownAsync ( ) ;
266
+
267
+ async Task PerformShutdownAsync ( )
268
+ {
269
+ IEnumerable < IProtocolConnection > allConnections =
270
+ _pendingConnections . Values . Select ( value => value . Connection ) . Concat ( _activeConnections . Values ) ;
271
+
272
+ using var cts = CancellationTokenSource . CreateLinkedTokenSource ( cancellationToken ) ;
273
+ cts . CancelAfter ( _shutdownTimeout ) ;
256
274
257
- return Task . WhenAll ( allConnections . Select ( connection => connection . ShutdownAsync ( cancellationToken ) ) ) ;
275
+ try
276
+ {
277
+ // Note: this throws the first exception, not all of them.
278
+ await Task . WhenAll ( allConnections . Select ( connection => connection . ShutdownAsync ( cts . Token ) ) )
279
+ . ConfigureAwait ( false ) ;
280
+ }
281
+ catch ( OperationCanceledException )
282
+ {
283
+ cancellationToken . ThrowIfCancellationRequested ( ) ;
284
+ throw new TimeoutException (
285
+ $ "The connection cache shutdown timed out after { _shutdownTimeout . TotalSeconds } s.") ;
286
+ }
287
+ }
258
288
}
259
289
260
290
/// <summary>Creates a connection and attempts to connect this connection unless there is an active or pending
261
291
/// connection for the desired server address.</summary>
262
292
/// <param name="serverAddress">The server address.</param>
263
- /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
264
293
/// <returns>A connected connection.</returns>
265
- private async ValueTask < IProtocolConnection > ConnectAsync (
266
- ServerAddress serverAddress ,
267
- CancellationToken cancellationToken )
294
+ private async Task < IProtocolConnection > ConnectAsync ( ServerAddress serverAddress )
268
295
{
269
296
( IProtocolConnection Connection , Task Task ) pendingConnectionValue ;
270
297
271
298
CancellationToken shutdownCancellationToken ;
272
299
273
300
lock ( _mutex )
274
301
{
275
- try
276
- {
277
- shutdownCancellationToken = _shutdownCts . Token ;
278
- }
279
- catch ( ObjectDisposedException )
302
+ if ( _disposeTask is not null )
280
303
{
281
304
throw new ObjectDisposedException ( $ "{ typeof ( ConnectionCache ) } ") ;
282
305
}
283
306
307
+ // We make a copy of _shutdownCts.Token with the mutex locked. This copy remains usable after we release
308
+ // the lock unlike _shutdownCts.Token that throws ObjectDisposedException once _shutdownCts is disposed.
309
+ shutdownCancellationToken = _shutdownCts . Token ;
310
+
284
311
if ( shutdownCancellationToken . IsCancellationRequested )
285
312
{
286
- throw new InvalidOperationException ( "connection cache is shut down or shutting down" ) ;
313
+ throw new InvalidOperationException ( "Connection cache is shut down or shutting down. " ) ;
287
314
}
288
315
289
316
if ( _activeConnections . TryGetValue ( serverAddress , out IProtocolConnection ? connection ) )
@@ -309,12 +336,21 @@ async Task PerformConnectAsync(IProtocolConnection connection)
309
336
{
310
337
await Task . Yield ( ) ; // exit mutex lock
311
338
312
- using var cts = CancellationTokenSource . CreateLinkedTokenSource (
313
- cancellationToken ,
314
- shutdownCancellationToken ) ;
339
+ using var cts = new CancellationTokenSource ( _connectTimeout ) ;
340
+ using CancellationTokenRegistration tokenRegistration =
341
+ shutdownCancellationToken . UnsafeRegister ( cts => ( ( CancellationTokenSource ) cts ! ) . Cancel ( ) , cts ) ;
342
+
315
343
try
316
344
{
317
- _ = await connection . ConnectAsync ( cts . Token ) . ConfigureAwait ( false ) ;
345
+ try
346
+ {
347
+ _ = await connection . ConnectAsync ( cts . Token ) . ConfigureAwait ( false ) ;
348
+ }
349
+ catch ( OperationCanceledException ) when ( ! shutdownCancellationToken . IsCancellationRequested )
350
+ {
351
+ throw new TimeoutException (
352
+ $ "The connection establishment timed out after { _connectTimeout . TotalSeconds } s.") ;
353
+ }
318
354
}
319
355
catch
320
356
{
@@ -337,8 +373,6 @@ async Task PerformConnectAsync(IProtocolConnection connection)
337
373
}
338
374
339
375
await connection . DisposeAsync ( ) . ConfigureAwait ( false ) ;
340
-
341
- cancellationToken . ThrowIfCancellationRequested ( ) ; // throws OCE
342
376
throw ;
343
377
}
344
378
@@ -365,24 +399,22 @@ async Task PerformConnectAsync(IProtocolConnection connection)
365
399
366
400
async Task RemoveFromActiveAsync ( IProtocolConnection connection , CancellationToken shutdownCancellationToken )
367
401
{
402
+ bool shutdownRequested ;
403
+
368
404
try
369
405
{
370
- await connection . ShutdownComplete . WaitAsync ( shutdownCancellationToken ) . ConfigureAwait ( false ) ;
371
- }
372
- catch ( OperationCanceledException exception ) when ( exception . CancellationToken == shutdownCancellationToken )
373
- {
374
- // The connection cache is being shut down or disposed and cache's DisposeAsync is responsible to
375
- // DisposeAsync this connection.
376
- return ;
406
+ shutdownRequested = await Task . WhenAny ( connection . ShutdownRequested , connection . Closed )
407
+ . WaitAsync ( shutdownCancellationToken ) . ConfigureAwait ( false ) == connection . ShutdownRequested ;
377
408
}
378
- catch
409
+ catch ( OperationCanceledException )
379
410
{
380
- // ignore and continue: the connection was aborted
411
+ // The connection cache is being shut down or disposed. We handle it below after locking the mutex.
412
+ shutdownRequested = false ;
381
413
}
414
+ // no other exception can be thrown
382
415
383
416
lock ( _mutex )
384
417
{
385
- // shutdownCancellationToken.IsCancellationRequested remains the same when _mutex is locked.
386
418
if ( shutdownCancellationToken . IsCancellationRequested )
387
419
{
388
420
// ConnectionCache.DisposeAsync is responsible to dispose this connection.
@@ -396,6 +428,27 @@ async Task RemoveFromActiveAsync(IProtocolConnection connection, CancellationTok
396
428
}
397
429
}
398
430
431
+ if ( shutdownRequested )
432
+ {
433
+ using var cts = new CancellationTokenSource ( _shutdownTimeout ) ;
434
+
435
+ try
436
+ {
437
+ await connection . ShutdownAsync ( cts . Token ) . ConfigureAwait ( false ) ;
438
+ }
439
+ catch ( OperationCanceledException )
440
+ {
441
+ }
442
+ catch ( IceRpcException )
443
+ {
444
+ }
445
+ catch ( Exception exception )
446
+ {
447
+ Debug . Fail ( $ "Unexpected connection shutdown exception: { exception } ") ;
448
+ throw ;
449
+ }
450
+ }
451
+
399
452
await connection . DisposeAsync ( ) . ConfigureAwait ( false ) ;
400
453
401
454
lock ( _mutex )
0 commit comments