Skip to content

Commit 486d6bb

Browse files
committed
Merge pull request #882 from danielmarbach/consumer-channel-sync
Switch WorkPool of ConsumerWorkService to channels (cherry picked from commit c9eb9f1)
1 parent 35de9e9 commit 486d6bb

File tree

2 files changed

+32
-44
lines changed

2 files changed

+32
-44
lines changed

projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ static async Task HandleConcurrent(Work work, ModelBase model, SemaphoreSlim lim
137137
}
138138
catch (Exception)
139139
{
140-
140+
// ignored
141141
}
142142
finally
143143
{

projects/RabbitMQ.Client/client/impl/ConsumerWorkService.cs

+31-43
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Concurrent;
33
using System.Threading;
4+
using System.Threading.Channels;
45
using System.Threading.Tasks;
56

67
namespace RabbitMQ.Client.Impl
@@ -61,20 +62,16 @@ internal Task StopWorkAsync(IModel model)
6162

6263
class WorkPool
6364
{
64-
readonly ConcurrentQueue<Action> _actions;
65-
readonly CancellationTokenSource _tokenSource;
66-
readonly CancellationTokenRegistration _tokenRegistration;
67-
volatile TaskCompletionSource<bool> _syncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
65+
private readonly Channel<Action> _channel;
6866
private readonly int _concurrency;
6967
private Task _worker;
68+
CancellationTokenSource _tokenSource;
7069
private SemaphoreSlim _limiter;
7170

7271
public WorkPool(int concurrency)
7372
{
7473
_concurrency = concurrency;
75-
_actions = new ConcurrentQueue<Action>();
76-
_tokenSource = new CancellationTokenSource();
77-
_tokenRegistration = _tokenSource.Token.Register(() => _syncSource.TrySetCanceled());
74+
_channel = Channel.CreateUnbounded<Action>(new UnboundedChannelOptions { SingleReader = true, SingleWriter = false, AllowSynchronousContinuations = false });
7875
}
7976

8077
public void Start()
@@ -86,37 +83,27 @@ public void Start()
8683
else
8784
{
8885
_limiter = new SemaphoreSlim(_concurrency);
86+
_tokenSource = new CancellationTokenSource();
8987
_worker = Task.Run(() => LoopWithConcurrency(_tokenSource.Token), CancellationToken.None);
9088
}
9189
}
9290

9391
public void Enqueue(Action action)
9492
{
95-
_actions.Enqueue(action);
96-
_syncSource.TrySetResult(true);
93+
_channel.Writer.TryWrite(action);
9794
}
9895

9996
async Task Loop()
10097
{
101-
while (_tokenSource.IsCancellationRequested == false)
98+
while (await _channel.Reader.WaitToReadAsync().ConfigureAwait(false))
10299
{
103-
try
104-
{
105-
await _syncSource.Task.ConfigureAwait(false);
106-
_syncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
107-
}
108-
catch (TaskCanceledException)
109-
{
110-
// Swallowing the task cancellation exception for the semaphore in case we are stopping.
111-
}
112-
113-
while (_actions.TryDequeue(out Action action))
100+
while (_channel.Reader.TryRead(out Action work))
114101
{
115102
try
116103
{
117-
action();
104+
work();
118105
}
119-
catch (Exception)
106+
catch(Exception)
120107
{
121108
// ignored
122109
}
@@ -126,36 +113,37 @@ async Task Loop()
126113

127114
async Task LoopWithConcurrency(CancellationToken cancellationToken)
128115
{
129-
while (_tokenSource.IsCancellationRequested == false)
116+
try
130117
{
131-
try
132-
{
133-
await _syncSource.Task.ConfigureAwait(false);
134-
_syncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
135-
}
136-
catch (TaskCanceledException)
118+
while (await _channel.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
137119
{
138-
// Swallowing the task cancellation exception for the semaphore in case we are stopping.
139-
}
140-
141-
while (_actions.TryDequeue(out Action action))
142-
{
143-
// Do a quick synchronous check before we resort to async/await with the state-machine overhead.
144-
if(!_limiter.Wait(0))
120+
while (_channel.Reader.TryRead(out Action action))
145121
{
146-
await _limiter.WaitAsync(cancellationToken).ConfigureAwait(false);
147-
}
122+
// Do a quick synchronous check before we resort to async/await with the state-machine overhead.
123+
if(!_limiter.Wait(0))
124+
{
125+
await _limiter.WaitAsync(cancellationToken).ConfigureAwait(false);
126+
}
148127

149-
_ = OffloadToWorkerThreadPool(action, _limiter);
128+
_ = OffloadToWorkerThreadPool(action, _limiter);
129+
}
150130
}
151131
}
132+
catch (OperationCanceledException)
133+
{
134+
// ignored
135+
}
152136
}
153137

154138
static async Task OffloadToWorkerThreadPool(Action action, SemaphoreSlim limiter)
155139
{
156140
try
157141
{
158-
await Task.Run(() => action());
142+
// like Task.Run but doesn't closure allocate
143+
await Task.Factory.StartNew(state =>
144+
{
145+
((Action)state)();
146+
}, action, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
159147
}
160148
catch (Exception)
161149
{
@@ -169,8 +157,8 @@ static async Task OffloadToWorkerThreadPool(Action action, SemaphoreSlim limiter
169157

170158
public Task Stop()
171159
{
172-
_tokenSource.Cancel();
173-
_tokenRegistration.Dispose();
160+
_channel.Writer.Complete();
161+
_tokenSource?.Cancel();
174162
_limiter?.Dispose();
175163
return _worker;
176164
}

0 commit comments

Comments
 (0)