Skip to content

Batching tasks for the AsyncConsumerWorkService to increase throughput. #806

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions projects/RabbitMQ.Client/client/api/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IAsyncConnectionF
/// </summary>
public bool DispatchConsumersAsync { get; set; } = false;

/// <summary>
/// Set to true will run message handlers for asynchronous consumers in parallel. NOTE: This removes the guarantee that consumers handle messages in the order they receive them.
/// </summary>
public bool DispatcAsyncConsumersInParallel { get; set; } = false;

/// <summary>The host to connect to.</summary>
public string HostName { get; set; } = "localhost";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,10 @@ public interface IAsyncConnectionFactory : IConnectionFactory
/// </summary>
/// <value><see langword="true" /> if an asynchronous consumer dispatcher which is compatible with <see cref="IAsyncBasicConsumer"/> is used; otherwise, <see langword="false" />.</value>
bool DispatchConsumersAsync { get; set; }

/// <summary>
/// Set to true will run message handlers for asynchronous consumers in parallel. NOTE: This removes the guarantee that consumers handle messages in the order they receive them.
/// </summary>
bool DispatcAsyncConsumersInParallel { get; set; }
}
}
}
52 changes: 46 additions & 6 deletions projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -7,6 +8,12 @@ namespace RabbitMQ.Client.Impl
internal sealed class AsyncConsumerWorkService : ConsumerWorkService
{
private readonly ConcurrentDictionary<IModel, WorkPool> _workPools = new ConcurrentDictionary<IModel, WorkPool>();
private readonly bool _runInParallel;

internal AsyncConsumerWorkService(bool runInParallel)
{
_runInParallel = runInParallel;
}

public void Schedule<TWork>(ModelBase model, TWork work) where TWork : Work
{
Expand All @@ -15,7 +22,7 @@ public void Schedule<TWork>(ModelBase model, TWork work) where TWork : Work

private WorkPool StartNewWorkPool(IModel model)
{
var newWorkPool = new WorkPool(model as ModelBase);
var newWorkPool = new WorkPool(model as ModelBase, _runInParallel);
newWorkPool.Start();
return newWorkPool;
}
Expand All @@ -33,18 +40,30 @@ public Task Stop(IModel model)
class WorkPool
{
readonly ConcurrentQueue<Work> _workQueue;
readonly ConcurrentBag<Work> _workBag;
readonly CancellationTokenSource _tokenSource;
readonly ModelBase _model;
readonly CancellationTokenRegistration _tokenRegistration;
private readonly bool _runInParallel;
volatile TaskCompletionSource<bool> _syncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
private Task _worker;
private readonly List<Task> _workTasks = new List<Task>();
Copy link

@quixoticaxis quixoticaxis Apr 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_workTasks can be made into a local that is created per loop iteration. It is not used anywhere else as far as I can see. That way it can be even optimized a little by creating it with the default "predictable" capacity: var tasksToAwaitOnThisIteration = new List<Task>(_workBag.Count).
Is the idea to let the list be as big as the biggest encountered batch?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you do that than you are making a tradeoff between the list growing and the allocation per iteration. The GC costs can quickly become higher. If the suggestion of limiting the concurrency that I made are taken into account the list can have a fixed size and never need to grow plus you don't create Gen0 garbage


public WorkPool(ModelBase model)
public WorkPool(ModelBase model, bool runInParallel)
{
_model = model;
_workQueue = new ConcurrentQueue<Work>();
if (runInParallel)
{
_workBag = new ConcurrentBag<Work>();
}
else
{
_workQueue = new ConcurrentQueue<Work>();
}

_tokenSource = new CancellationTokenSource();
_tokenRegistration = _tokenSource.Token.Register(() => _syncSource.TrySetCanceled());
_runInParallel = runInParallel;
}

public void Start()
Expand All @@ -54,7 +73,15 @@ public void Start()

public void Enqueue(Work work)
{
_workQueue.Enqueue(work);
if (_runInParallel)
{
_workBag.Add(work);
}
else
{
_workQueue.Enqueue(work);
}

_syncSource.TrySetResult(true);
}

Expand All @@ -72,9 +99,22 @@ async Task Loop()
// Swallowing the task cancellation in case we are stopping work.
}

while (_workQueue.TryDequeue(out Work work))
if (_runInParallel)
{
while (_workBag.TryTake(out Work work))
{
_workTasks.Add(work.Execute(_model));
}

await Task.WhenAll(_workTasks).ConfigureAwait(false);
_workTasks.Clear();
}
else
{
await work.Execute(_model).ConfigureAwait(false);
while (_workQueue.TryDequeue(out Work work))
{
await work.Execute(_model).ConfigureAwait(false);
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public Connection(IConnectionFactory factory, bool insist, IFrameHandler frameHa

if (factory is IAsyncConnectionFactory asyncConnectionFactory && asyncConnectionFactory.DispatchConsumersAsync)
{
ConsumerWorkService = new AsyncConsumerWorkService();
ConsumerWorkService = new AsyncConsumerWorkService(asyncConnectionFactory.DispatcAsyncConsumersInParallel);
}
else
{
Expand Down
2 changes: 2 additions & 0 deletions projects/Unit/APIApproval.Approve.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ namespace RabbitMQ.Client
public System.Collections.Generic.IDictionary<string, object> ClientProperties { get; set; }
public string ClientProvidedName { get; set; }
public System.TimeSpan ContinuationTimeout { get; set; }
public bool DispatcAsyncConsumersInParallel { get; set; }
public bool DispatchConsumersAsync { get; set; }
public RabbitMQ.Client.AmqpTcpEndpoint Endpoint { get; set; }
public System.Func<System.Collections.Generic.IEnumerable<RabbitMQ.Client.AmqpTcpEndpoint>, RabbitMQ.Client.IEndpointResolver> EndpointResolverFactory { get; set; }
Expand Down Expand Up @@ -183,6 +184,7 @@ namespace RabbitMQ.Client
}
public interface IAsyncConnectionFactory : RabbitMQ.Client.IConnectionFactory
{
bool DispatcAsyncConsumersInParallel { get; set; }
bool DispatchConsumersAsync { get; set; }
}
public interface IAuthMechanism
Expand Down
62 changes: 62 additions & 0 deletions projects/Unit/TestAsyncConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,36 @@ public void TestBasicRoundtrip()
}
}

[Test]
public void TestBasicRoundtripParallel()
{
var cf = new ConnectionFactory { DispatchConsumersAsync = true, DispatcAsyncConsumersInParallel = true };
using (IConnection c = cf.CreateConnection())
using (IModel m = c.CreateModel())
{
QueueDeclareOk q = m.QueueDeclare();
IBasicProperties bp = m.CreateBasicProperties();
byte[] body = System.Text.Encoding.UTF8.GetBytes("async-hi");
m.BasicPublish("", q.QueueName, bp, body);
var consumer = new AsyncEventingBasicConsumer(m);
var are = new AutoResetEvent(false);
consumer.Received += async (o, a) =>
{
are.Set();
await Task.Yield();
};
string tag = m.BasicConsume(q.QueueName, true, consumer);
// ensure we get a delivery
bool waitRes = are.WaitOne(2000);
Assert.IsTrue(waitRes);
// unsubscribe and ensure no further deliveries
m.BasicCancel(tag);
m.BasicPublish("", q.QueueName, bp, body);
bool waitResFalse = are.WaitOne(2000);
Assert.IsFalse(waitResFalse);
}
}

[Test]
public void TestBasicRoundtripNoWait()
{
Expand Down Expand Up @@ -113,6 +143,38 @@ public void TestBasicRoundtripNoWait()
}
}

[Test]
public void TestBasicRoundtripNoWaitParallel()
{
var cf = new ConnectionFactory { DispatchConsumersAsync = true, DispatcAsyncConsumersInParallel = true };
using (IConnection c = cf.CreateConnection())
{
using (IModel m = c.CreateModel())
{
QueueDeclareOk q = m.QueueDeclare();
IBasicProperties bp = m.CreateBasicProperties();
byte[] body = System.Text.Encoding.UTF8.GetBytes("async-hi");
m.BasicPublish("", q.QueueName, bp, body);
var consumer = new AsyncEventingBasicConsumer(m);
var are = new AutoResetEvent(false);
consumer.Received += async (o, a) =>
{
are.Set();
await Task.Yield();
};
string tag = m.BasicConsume(q.QueueName, true, consumer);
// ensure we get a delivery
bool waitRes = are.WaitOne(2000);
Assert.IsTrue(waitRes);
// unsubscribe and ensure no further deliveries
m.BasicCancelNoWait(tag);
m.BasicPublish("", q.QueueName, bp, body);
bool waitResFalse = are.WaitOne(2000);
Assert.IsFalse(waitResFalse);
}
}
}

[Test]
public void NonAsyncConsumerShouldThrowInvalidOperationException()
{
Expand Down