Skip to content

Commit 17188c4

Browse files
committed
Replace lock with SemaphoreSlim
1 parent 7966dcb commit 17188c4

File tree

4 files changed

+119
-73
lines changed

4 files changed

+119
-73
lines changed

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 86 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,7 @@ internal abstract class ChannelBase : IChannel, IRecoverable
5959
private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue();
6060
private readonly ManualResetEventSlim _flowControlBlock = new ManualResetEventSlim(true);
6161

62-
// TODO replace with SemaphoreSlim
63-
private object _confirmLock;
62+
private SemaphoreSlim _confirmSemaphore;
6463
private readonly LinkedList<ulong> _pendingDeliveryTags = new LinkedList<ulong>();
6564

6665
private bool _onlyAcksReceived = true;
@@ -420,7 +419,7 @@ internal void FinishClose()
420419
m_connectionStartCell?.TrySetResult(null);
421420
}
422421

423-
private bool ConfirmsAreEnabled => _confirmLock != null;
422+
private bool ConfirmsAreEnabled => _confirmSemaphore != null;
424423

425424
private async Task HandleCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken)
426425
{
@@ -484,7 +483,8 @@ private void OnChannelShutdown(ShutdownEventArgs reason)
484483

485484
if (ConfirmsAreEnabled)
486485
{
487-
lock (_confirmLock)
486+
_confirmSemaphore.Wait();
487+
try
488488
{
489489
if (_confirmsTaskCompletionSources?.Count > 0)
490490
{
@@ -497,6 +497,10 @@ private void OnChannelShutdown(ShutdownEventArgs reason)
497497
_confirmsTaskCompletionSources.Clear();
498498
}
499499
}
500+
finally
501+
{
502+
_confirmSemaphore.Release();
503+
}
500504
}
501505

502506
_flowControlBlock.Set();
@@ -542,6 +546,7 @@ protected virtual void Dispose(bool disposing)
542546

543547
ConsumerDispatcher.Dispose();
544548
_rpcSemaphore.Dispose();
549+
_confirmSemaphore?.Dispose();
545550
}
546551
}
547552

@@ -596,7 +601,8 @@ protected void HandleAckNack(ulong deliveryTag, bool multiple, bool isNack)
596601
if (ConfirmsAreEnabled)
597602
{
598603
// let's take a lock so we can assume that deliveryTags are unique, never duplicated and always sorted
599-
lock (_confirmLock)
604+
_confirmSemaphore.Wait();
605+
try
600606
{
601607
// No need to do anything if there are no delivery tags in the list
602608
if (_pendingDeliveryTags.Count > 0)
@@ -633,6 +639,10 @@ protected void HandleAckNack(ulong deliveryTag, bool multiple, bool isNack)
633639
_onlyAcksReceived = true;
634640
}
635641
}
642+
finally
643+
{
644+
_confirmSemaphore.Release();
645+
}
636646
}
637647
}
638648

@@ -1054,10 +1064,16 @@ public async ValueTask BasicPublishAsync<TProperties>(string exchange, string ro
10541064
{
10551065
if (ConfirmsAreEnabled)
10561066
{
1057-
lock (_confirmLock)
1067+
await _confirmSemaphore.WaitAsync(cancellationToken)
1068+
.ConfigureAwait(false);
1069+
try
10581070
{
10591071
_pendingDeliveryTags.AddLast(NextPublishSeqNo++);
10601072
}
1073+
finally
1074+
{
1075+
_confirmSemaphore.Release();
1076+
}
10611077
}
10621078

10631079
try
@@ -1084,11 +1100,17 @@ await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken)
10841100
{
10851101
if (ConfirmsAreEnabled)
10861102
{
1087-
lock (_confirmLock)
1103+
await _confirmSemaphore.WaitAsync(cancellationToken)
1104+
.ConfigureAwait(false);
1105+
try
10881106
{
10891107
NextPublishSeqNo--;
10901108
_pendingDeliveryTags.RemoveLast();
10911109
}
1110+
finally
1111+
{
1112+
_confirmSemaphore.Release();
1113+
}
10921114
}
10931115

10941116
throw;
@@ -1102,10 +1124,16 @@ public async ValueTask BasicPublishAsync<TProperties>(CachedString exchange, Cac
11021124
{
11031125
if (ConfirmsAreEnabled)
11041126
{
1105-
lock (_confirmLock)
1127+
await _confirmSemaphore.WaitAsync(cancellationToken)
1128+
.ConfigureAwait(false);
1129+
try
11061130
{
11071131
_pendingDeliveryTags.AddLast(NextPublishSeqNo++);
11081132
}
1133+
finally
1134+
{
1135+
_confirmSemaphore.Release();
1136+
}
11091137
}
11101138

11111139
try
@@ -1133,11 +1161,17 @@ await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken)
11331161
{
11341162
if (ConfirmsAreEnabled)
11351163
{
1136-
lock (_confirmLock)
1164+
await _confirmSemaphore.WaitAsync(cancellationToken)
1165+
.ConfigureAwait(false);
1166+
try
11371167
{
11381168
NextPublishSeqNo--;
11391169
_pendingDeliveryTags.RemoveLast();
11401170
}
1171+
finally
1172+
{
1173+
_confirmSemaphore.Release();
1174+
}
11411175
}
11421176

11431177
throw;
@@ -1242,7 +1276,7 @@ await ModelSendAsync(method, k.CancellationToken)
12421276

12431277
// Note:
12441278
// Non-null means confirms are enabled
1245-
_confirmLock = new object();
1279+
_confirmSemaphore = new SemaphoreSlim(1, 1);
12461280

12471281
return;
12481282
}
@@ -1742,63 +1776,50 @@ await ModelSendAsync(method, k.CancellationToken)
17421776

17431777
private List<TaskCompletionSource<bool>> _confirmsTaskCompletionSources;
17441778

1745-
public Task<bool> WaitForConfirmsAsync(CancellationToken token = default)
1779+
public async Task<bool> WaitForConfirmsAsync(CancellationToken cancellationToken = default)
17461780
{
17471781
if (false == ConfirmsAreEnabled)
17481782
{
17491783
throw new InvalidOperationException("Confirms not selected");
17501784
}
17511785

17521786
TaskCompletionSource<bool> tcs;
1753-
lock (_confirmLock)
1787+
await _confirmSemaphore.WaitAsync(cancellationToken)
1788+
.ConfigureAwait(false);
1789+
try
17541790
{
17551791
if (_pendingDeliveryTags.Count == 0)
17561792
{
17571793
if (_onlyAcksReceived == false)
17581794
{
17591795
_onlyAcksReceived = true;
1760-
return Task.FromResult(false);
1796+
return false;
17611797
}
17621798

1763-
return Task.FromResult(true);
1799+
return true;
17641800
}
17651801

17661802
tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
17671803
_confirmsTaskCompletionSources.Add(tcs);
17681804
}
1769-
1770-
if (!token.CanBeCanceled)
1805+
finally
17711806
{
1772-
return tcs.Task;
1807+
_confirmSemaphore.Release();
17731808
}
17741809

1775-
return WaitForConfirmsWithTokenAsync(tcs, token);
1776-
}
1810+
bool rv;
17771811

1778-
private async Task<bool> WaitForConfirmsWithTokenAsync(TaskCompletionSource<bool> tcs, CancellationToken token)
1779-
{
1780-
CancellationTokenRegistration tokenRegistration =
1781-
#if NET6_0_OR_GREATER
1782-
token.UnsafeRegister(
1783-
state => ((TaskCompletionSource<bool>)state).TrySetCanceled(), tcs);
1784-
#else
1785-
token.Register(
1786-
state => ((TaskCompletionSource<bool>)state).TrySetCanceled(),
1787-
state: tcs, useSynchronizationContext: false);
1788-
#endif
1789-
try
1812+
if (false == cancellationToken.CanBeCanceled)
17901813
{
1791-
return await tcs.Task.ConfigureAwait(false);
1814+
rv = await tcs.Task.ConfigureAwait(false);
17921815
}
1793-
finally
1816+
else
17941817
{
1795-
#if NET6_0_OR_GREATER
1796-
await tokenRegistration.DisposeAsync()
1818+
rv = await WaitForConfirmsWithTokenAsync(tcs, cancellationToken)
17971819
.ConfigureAwait(false);
1798-
#else
1799-
tokenRegistration.Dispose();
1800-
#endif
18011820
}
1821+
1822+
return rv;
18021823
}
18031824

18041825
public async Task WaitForConfirmsOrDieAsync(CancellationToken token = default)
@@ -1830,6 +1851,33 @@ await CloseAsync(ea, false, token)
18301851
}
18311852
}
18321853

1854+
private async Task<bool> WaitForConfirmsWithTokenAsync(TaskCompletionSource<bool> tcs,
1855+
CancellationToken cancellationToken)
1856+
{
1857+
CancellationTokenRegistration tokenRegistration =
1858+
#if NET6_0_OR_GREATER
1859+
cancellationToken.UnsafeRegister(
1860+
state => ((TaskCompletionSource<bool>)state).TrySetCanceled(), tcs);
1861+
#else
1862+
cancellationToken.Register(
1863+
state => ((TaskCompletionSource<bool>)state).TrySetCanceled(),
1864+
state: tcs, useSynchronizationContext: false);
1865+
#endif
1866+
try
1867+
{
1868+
return await tcs.Task.ConfigureAwait(false);
1869+
}
1870+
finally
1871+
{
1872+
#if NET6_0_OR_GREATER
1873+
await tokenRegistration.DisposeAsync()
1874+
.ConfigureAwait(false);
1875+
#else
1876+
tokenRegistration.Dispose();
1877+
#endif
1878+
}
1879+
}
1880+
18331881
private static BasicProperties PopulateActivityAndPropagateTraceId<TProperties>(TProperties basicProperties,
18341882
Activity sendActivity) where TProperties : IReadOnlyBasicProperties, IAmqpHeader
18351883
{

projects/RabbitMQ.Client/client/impl/Connection.Commands.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ private Task NotifyCredentialRefreshed(bool succesfully)
204204
if (succesfully)
205205
{
206206
return UpdateSecretAsync(_config.CredentialsProvider.Password, "Token refresh",
207-
CancellationToken.None); // TODO
207+
CancellationToken.None); // TODO cancellation token
208208
}
209209
else
210210
{

projects/Test/Common/IntegrationFixture.cs

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,10 @@ public virtual async Task InitializeAsync()
144144
_channel = await _conn.CreateChannelAsync();
145145
}
146146

147-
AddCallbackHandlers();
147+
if (IsVerbose)
148+
{
149+
AddCallbackHandlers();
150+
}
148151
}
149152

150153
if (_connFactory.AutomaticRecoveryEnabled)
@@ -182,43 +185,40 @@ public virtual async Task DisposeAsync()
182185

183186
protected virtual void AddCallbackHandlers()
184187
{
185-
if (IsVerbose)
188+
if (_conn != null)
186189
{
187-
if (_conn != null)
190+
_conn.CallbackException += (o, ea) =>
188191
{
189-
_conn.CallbackException += (o, ea) =>
190-
{
191-
_output.WriteLine("{0} connection callback exception: {1}",
192-
_testDisplayName, ea.Exception);
193-
};
192+
_output.WriteLine("{0} connection callback exception: {1}",
193+
_testDisplayName, ea.Exception);
194+
};
194195

195-
_conn.ConnectionShutdown += (o, ea) =>
196+
_conn.ConnectionShutdown += (o, ea) =>
197+
{
198+
HandleConnectionShutdown(_conn, ea, (args) =>
196199
{
197-
HandleConnectionShutdown(_conn, ea, (args) =>
198-
{
199-
_output.WriteLine("{0} connection shutdown, args: {1}",
200-
_testDisplayName, args);
201-
});
202-
};
203-
}
200+
_output.WriteLine("{0} connection shutdown, args: {1}",
201+
_testDisplayName, args);
202+
});
203+
};
204+
}
204205

205-
if (_channel != null)
206+
if (_channel != null)
207+
{
208+
_channel.CallbackException += (o, ea) =>
206209
{
207-
_channel.CallbackException += (o, ea) =>
208-
{
209-
_output.WriteLine("{0} channel callback exception: {1}",
210-
_testDisplayName, ea.Exception);
211-
};
210+
_output.WriteLine("{0} channel callback exception: {1}",
211+
_testDisplayName, ea.Exception);
212+
};
212213

213-
_channel.ChannelShutdown += (o, ea) =>
214+
_channel.ChannelShutdown += (o, ea) =>
215+
{
216+
HandleChannelShutdown(_channel, ea, (args) =>
214217
{
215-
HandleChannelShutdown(_channel, ea, (args) =>
216-
{
217-
_output.WriteLine("{0} channel shutdown, args: {1}",
218-
_testDisplayName, args);
219-
});
220-
};
221-
}
218+
_output.WriteLine("{0} channel shutdown, args: {1}",
219+
_testDisplayName, args);
220+
});
221+
};
222222
}
223223
}
224224

projects/Test/Integration/TestAsyncConsumer.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ public async Task TestBasicAckAsync()
392392
var c = sender as AsyncEventingBasicConsumer;
393393
Assert.NotNull(c);
394394
await _channel.BasicAckAsync(args.DeliveryTag, false);
395-
messagesReceived++;
395+
Interlocked.Increment(ref messagesReceived);
396396
if (messagesReceived == messageCount)
397397
{
398398
publishSyncSource.SetResult(true);
@@ -413,14 +413,12 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false,
413413
{
414414
byte[] _body = _encoding.GetBytes(Guid.NewGuid().ToString());
415415
await _channel.BasicPublishAsync(string.Empty, queueName, _body);
416+
await _channel.WaitForConfirmsOrDieAsync();
416417
}
417418
});
418419

419-
await _channel.WaitForConfirmsOrDieAsync();
420420
Assert.True(await publishSyncSource.Task);
421-
422421
Assert.Equal(messageCount, messagesReceived);
423-
424422
await _channel.CloseAsync(_closeArgs, false, CancellationToken.None);
425423
}
426424

0 commit comments

Comments
 (0)