Skip to content

Commit 51d1eeb

Browse files
Merge pull request #1775 from rabbitmq/rabbitmq-dotnet-client-1759
Create cancellation token from `timeout`
2 parents 21fb198 + fa9bc43 commit 51d1eeb

File tree

3 files changed

+51
-8
lines changed

3 files changed

+51
-8
lines changed

projects/RabbitMQ.Client/Impl/Connection.cs

+8-7
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,9 @@ public Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, b
320320
///</remarks>
321321
internal async Task CloseAsync(ShutdownEventArgs reason, bool abort, TimeSpan timeout, CancellationToken cancellationToken)
322322
{
323+
using var timeoutCts = new CancellationTokenSource(timeout);
324+
using var cts = CancellationTokenSource.CreateLinkedTokenSource(timeoutCts.Token, cancellationToken);
325+
323326
if (false == SetCloseReason(reason))
324327
{
325328
// close reason is already set
@@ -330,11 +333,9 @@ internal async Task CloseAsync(ShutdownEventArgs reason, bool abort, TimeSpan ti
330333
}
331334
else
332335
{
333-
cancellationToken.ThrowIfCancellationRequested();
334-
335336
await OnShutdownAsync(reason)
336337
.ConfigureAwait(false);
337-
await _session0.SetSessionClosingAsync(false, cancellationToken)
338+
await _session0.SetSessionClosingAsync(false, cts.Token)
338339
.ConfigureAwait(false);
339340

340341
try
@@ -343,7 +344,7 @@ await _session0.SetSessionClosingAsync(false, cancellationToken)
343344
if (false == _closed)
344345
{
345346
var method = new ConnectionClose(reason.ReplyCode, reason.ReplyText, 0, 0);
346-
await _session0.TransmitAsync(method, cancellationToken)
347+
await _session0.TransmitAsync(method, cts.Token)
347348
.ConfigureAwait(false);
348349
}
349350
}
@@ -392,14 +393,14 @@ await _session0.TransmitAsync(method, cancellationToken)
392393

393394
try
394395
{
395-
await _mainLoopTask.WaitAsync(timeout, cancellationToken)
396+
await _mainLoopTask.WaitAsync(timeout, cts.Token)
396397
.ConfigureAwait(false);
397398
}
398399
catch
399400
{
400401
try
401402
{
402-
await _frameHandler.CloseAsync(cancellationToken)
403+
await _frameHandler.CloseAsync(cts.Token)
403404
.ConfigureAwait(false);
404405
}
405406
catch
@@ -518,7 +519,6 @@ await this.AbortAsync()
518519
}
519520

520521
_session0.Dispose();
521-
_mainLoopCts.Dispose();
522522

523523
await _channel0.DisposeAsync()
524524
.ConfigureAwait(false);
@@ -529,6 +529,7 @@ await _channel0.DisposeAsync()
529529
}
530530
finally
531531
{
532+
_mainLoopCts.Dispose();
532533
_disposed = true;
533534
}
534535
}

projects/RabbitMQ.Client/Impl/MainSession.cs

+27-1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ internal sealed class MainSession : Session, IDisposable
4747
private volatile bool _closeIsServerInitiated;
4848
private volatile bool _closing;
4949
private readonly SemaphoreSlim _closingSemaphore = new SemaphoreSlim(1, 1);
50+
private bool _disposed = false;
5051

5152
public MainSession(Connection connection, uint maxBodyLength)
5253
: base(connection, 0, maxBodyLength)
@@ -83,6 +84,13 @@ public override Task HandleFrameAsync(InboundFrame frame, CancellationToken canc
8384

8485
public async Task SetSessionClosingAsync(bool closeIsServerInitiated, CancellationToken cancellationToken)
8586
{
87+
if (_disposed)
88+
{
89+
_closing = true;
90+
_closeIsServerInitiated = closeIsServerInitiated;
91+
return;
92+
}
93+
8694
if (await _closingSemaphore.WaitAsync(InternalConstants.DefaultConnectionAbortTimeout, cancellationToken)
8795
.ConfigureAwait(false))
8896
{
@@ -122,6 +130,24 @@ public override ValueTask TransmitAsync<T>(in T cmd, CancellationToken cancellat
122130
return base.TransmitAsync(in cmd, cancellationToken);
123131
}
124132

125-
public void Dispose() => ((IDisposable)_closingSemaphore).Dispose();
133+
public void Dispose()
134+
{
135+
if (_disposed)
136+
{
137+
return;
138+
}
139+
140+
try
141+
{
142+
_closingSemaphore.Dispose();
143+
}
144+
catch
145+
{
146+
}
147+
finally
148+
{
149+
_disposed = true;
150+
}
151+
}
126152
}
127153
}

projects/Test/Integration/GH/TestGitHubIssues.cs

+16
Original file line numberDiff line numberDiff line change
@@ -131,5 +131,21 @@ public async Task TestHeartbeatTimeoutValue_GH1756()
131131

132132
Assert.True(_conn.Heartbeat != default);
133133
}
134+
135+
[Fact]
136+
public async Task DisposeWhileCatchingTimeoutDeadlocksRepro_GH1759()
137+
{
138+
_connFactory = new ConnectionFactory();
139+
_conn = await _connFactory.CreateConnectionAsync();
140+
try
141+
{
142+
await _conn.CloseAsync(TimeSpan.Zero);
143+
}
144+
catch (Exception)
145+
{
146+
}
147+
148+
await _conn.DisposeAsync();
149+
}
134150
}
135151
}

0 commit comments

Comments
 (0)