Skip to content

Pipeline improvements #734

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 15 commits into from
27 changes: 27 additions & 0 deletions RabbitMQDotNetClient.sln
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{34486CC0-D61E-46BA-9E5E-6E8EFA7C34B5}"
ProjectSection(SolutionItems) = preProject
.editorconfig = .editorconfig
docs\specs\amqp0-9-1.xml = docs\specs\amqp0-9-1.xml
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQTest", "RabbitMQTest\RabbitMQTest.csproj", "{3CFAC019-8281-48AD-8925-16CE8EC3CE50}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -104,6 +107,30 @@ Global
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.SignedRelease|x64.Build.0 = Release|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.SignedRelease|x86.ActiveCfg = Release|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.SignedRelease|x86.Build.0 = Release|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.Debug|x64.ActiveCfg = Debug|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.Debug|x64.Build.0 = Debug|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.Debug|x86.ActiveCfg = Debug|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.Debug|x86.Build.0 = Debug|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.DebugNoTest|Any CPU.ActiveCfg = Debug|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.DebugNoTest|Any CPU.Build.0 = Debug|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.DebugNoTest|x64.ActiveCfg = Debug|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.DebugNoTest|x64.Build.0 = Debug|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.DebugNoTest|x86.ActiveCfg = Debug|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.DebugNoTest|x86.Build.0 = Debug|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.Release|Any CPU.Build.0 = Release|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.Release|x64.ActiveCfg = Release|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.Release|x64.Build.0 = Release|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.Release|x86.ActiveCfg = Release|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.Release|x86.Build.0 = Release|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.SignedRelease|Any CPU.ActiveCfg = Release|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.SignedRelease|Any CPU.Build.0 = Release|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.SignedRelease|x64.ActiveCfg = Release|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.SignedRelease|x64.Build.0 = Release|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.SignedRelease|x86.ActiveCfg = Release|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.SignedRelease|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
98 changes: 98 additions & 0 deletions RabbitMQTest/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
using System;
using System.Threading;
using System.Threading.Tasks;

using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace DeadlockRabbitMQ
{
class Program
{
private static int messagesSent = 0;
private static int messagesReceived = 0;
private static int batchesToSend = 100;
private static int itemsPerBatch = 500;
static async Task Main(string[] args)
{
Console.ReadLine();
var connectionString = new Uri("amqp://guest:guest@localhost/");

var connectionFactory = new ConnectionFactory() { DispatchConsumersAsync = true, Uri = connectionString };
var connection = connectionFactory.CreateConnection();
var connection2 = connectionFactory.CreateConnection();
var publisher = connection.CreateModel();
var subscriber = connection2.CreateModel();
publisher.ConfirmSelect();
//subscriber.ConfirmSelect();

publisher.ExchangeDeclare("test", ExchangeType.Topic, true);

subscriber.QueueDeclare("testqueue", true, false, true);
var asyncListener = new AsyncEventingBasicConsumer(subscriber);
asyncListener.Received += AsyncListener_Received;
subscriber.QueueBind("testqueue", "test", "myawesome.routing.key");
subscriber.BasicConsume("testqueue", false, "testconsumer", asyncListener);

byte[] payload = new byte[16384];
var batchPublish = Task.Run(() =>
{
while (messagesSent < batchesToSend * itemsPerBatch)
{
var batch = publisher.CreateBasicPublishBatch();
for (int i = 0; i < itemsPerBatch; i++)
{
var properties = publisher.CreateBasicProperties();
properties.AppId = "testapp";
properties.CorrelationId = Guid.NewGuid().ToString();
batch.Add("test", "myawesome.routing.key", false, properties, payload);
}
batch.Publish();
messagesSent += itemsPerBatch;
publisher.WaitForConfirmsOrDie();
}
});

var sentTask = Task.Run(async () =>
{
while (messagesSent < batchesToSend * itemsPerBatch)
{
Console.WriteLine($"Messages sent: {messagesSent}");

await Task.Delay(500);
}

Console.WriteLine("Done sending messages!");
});

var receivedTask = Task.Run(async () =>
{
while (messagesReceived < batchesToSend * itemsPerBatch)
{
Console.WriteLine($"Messages received: {messagesReceived}");

await Task.Delay(500);
}

Console.WriteLine("Done receiving all messages.");
});

await Task.WhenAll(sentTask, receivedTask);
Console.ReadLine();
}

private static ValueTask AsyncListener_Received(object sender, BasicDeliverEventArgs @event)
{
// Doing things in parallel here is what will eventually trigger the deadlock,
// probably due to a race condition in AsyncConsumerWorkService.Loop, although
// I've had trouble pinpointing it exactly, but due to how the code in there uses
// a TaskCompletionSource, and elsewhere overrides it, it might cause Enqueue and Loop
// to eventually be working with different references, or that's at least the current theory.
// Moving to better synchronization constructs solves the issue, and using the ThreadPool
// is standard practice as well to maximize core utilization and reduce overhead of Thread creation
Interlocked.Increment(ref messagesReceived);
(sender as AsyncDefaultBasicConsumer).Model.BasicAck(@event.DeliveryTag, true);
return default;
}
}
}
13 changes: 13 additions & 0 deletions RabbitMQTest/RabbitMQTest.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<DebugType>full</DebugType>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\projects\client\RabbitMQ.Client\RabbitMQ.Client.csproj" />
</ItemGroup>

</Project>
12 changes: 7 additions & 5 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ else
readonly script_dir="$(cd "$(dirname "$0")" && pwd)"
fi

dotnet restore "$script_dir/RabbitMQDotNetClient.sln"
dotnet run -p "$script_dir/projects/client/Apigen/Apigen.csproj" --apiName:AMQP_0_9_1 \
"$script_dir/docs/specs/amqp0-9-1.stripped.xml" \
"$script_dir/gensrc/autogenerated-api-0-9-1.cs"
dotnet build "$script_dir/RabbitMQDotNetClient.sln"
cd "$script_dir"

dotnet restore ./RabbitMQDotNetClient.sln
dotnet run -p ./projects/client/Apigen/Apigen.csproj --apiName:AMQP_0_9_1 \
./docs/specs/amqp0-9-1.stripped.xml \
./gensrc/autogenerated-api-0-9-1.cs
dotnet build ./RabbitMQDotNetClient.sln
15 changes: 8 additions & 7 deletions projects/client/Apigen/src/apigen/Apigen.cs
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ public void EmitClassProperties(AmqpClass c)
EmitLine(" public override ushort ProtocolClassId { get { return " + c.Index + "; } }");
EmitLine(" public override string ProtocolClassName { get { return \"" + c.Name + "\"; } }");
EmitLine("");
EmitLine(" public override void ReadPropertiesFrom(RabbitMQ.Client.Impl.ContentHeaderPropertyReader reader) {");
EmitLine(" public override void ReadPropertiesFrom(ref RabbitMQ.Client.Impl.ContentHeaderPropertyReader reader) {");
foreach (AmqpField f in c.m_Fields)
{
if (IsBoolean(f))
Expand All @@ -635,7 +635,7 @@ public void EmitClassProperties(AmqpClass c)
}
EmitLine(" }");
EmitLine("");
EmitLine(" public override void WritePropertiesTo(RabbitMQ.Client.Impl.ContentHeaderPropertyWriter writer) {");
EmitLine(" public override void WritePropertiesTo(ref RabbitMQ.Client.Impl.ContentHeaderPropertyWriter writer) {");
foreach (AmqpField f in c.m_Fields)
{
if (IsBoolean(f))
Expand Down Expand Up @@ -770,14 +770,14 @@ public void EmitClassMethodImplementations(AmqpClass c)
EmitLine(" public override bool HasContent { get { return "
+ (m.HasContent ? "true" : "false") + "; } }");
EmitLine("");
EmitLine(" public override void ReadArgumentsFrom(RabbitMQ.Client.Impl.MethodArgumentReader reader) {");
EmitLine(" public override void ReadArgumentsFrom(ref RabbitMQ.Client.Impl.MethodArgumentReader reader) {");
foreach (AmqpField f in m.m_Fields)
{
EmitLine(" m_" + MangleMethod(f.Name) + " = reader.Read" + MangleClass(ResolveDomain(f.Domain)) + "();");
}
EmitLine(" }");
EmitLine("");
EmitLine(" public override void WriteArgumentsTo(RabbitMQ.Client.Impl.MethodArgumentWriter writer) {");
EmitLine(" public override void WriteArgumentsTo(ref RabbitMQ.Client.Impl.MethodArgumentWriter writer) {");
foreach (AmqpField f in m.m_Fields)
{
EmitLine(" writer.Write" + MangleClass(ResolveDomain(f.Domain))
Expand Down Expand Up @@ -811,7 +811,7 @@ public void EmitClassMethodImplementations(AmqpClass c)

public void EmitMethodArgumentReader()
{
EmitLine(" public override RabbitMQ.Client.Impl.MethodBase DecodeMethodFrom(RabbitMQ.Util.NetworkBinaryReader reader) {");
EmitLine(" public override RabbitMQ.Client.Impl.MethodBase DecodeMethodFrom(RabbitMQ.Util.BinaryBufferReader reader) {");
EmitLine(" ushort classId = reader.ReadUInt16();");
EmitLine(" ushort methodId = reader.ReadUInt16();");
EmitLine("");
Expand All @@ -824,7 +824,8 @@ public void EmitMethodArgumentReader()
{
EmitLine(" case " + m.Index + ": {");
EmitLine(" " + ImplNamespaceBase + "." + MangleMethodClass(c, m) + " result = new " + ImplNamespaceBase + "." + MangleMethodClass(c, m) + "();");
EmitLine(" result.ReadArgumentsFrom(new RabbitMQ.Client.Impl.MethodArgumentReader(reader));");
EmitLine(" var methodReader = new RabbitMQ.Client.Impl.MethodArgumentReader(reader);");
EmitLine(" result.ReadArgumentsFrom(ref methodReader);");
EmitLine(" return result;");
EmitLine(" }");
}
Expand All @@ -841,7 +842,7 @@ public void EmitMethodArgumentReader()

public void EmitContentHeaderReader()
{
EmitLine(" public override RabbitMQ.Client.Impl.ContentHeaderBase DecodeContentHeaderFrom(RabbitMQ.Util.NetworkBinaryReader reader) {");
EmitLine(" public override RabbitMQ.Client.Impl.ContentHeaderBase DecodeContentHeaderFrom(RabbitMQ.Util.BinaryBufferReader reader) {");
EmitLine(" ushort classId = reader.ReadUInt16();");
EmitLine("");
EmitLine(" switch (classId) {");
Expand Down
18 changes: 15 additions & 3 deletions projects/client/RabbitMQ.Client/RabbitMQ.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<Description>The RabbitMQ .NET client is the official client library for C# (and, implicitly, other .NET languages)</Description>
<VersionPrefix>6.0.0</VersionPrefix>
<TargetFrameworks>net461;netstandard2.0</TargetFrameworks>
<TargetFrameworks>net461;netstandard2.0;netstandard2.1</TargetFrameworks>
<NoWarn>$(NoWarn);CS1591</NoWarn>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
Expand All @@ -19,8 +19,17 @@
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
<GenerateAssemblyCopyrightAttribute>false</GenerateAssemblyCopyrightAttribute>
<GenerateAssemblyVersionAttribute>false</GenerateAssemblyVersionAttribute>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
<IncludeSymbols>true</IncludeSymbols>
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
<AllowedOutputExtensionsInPackageBuildOutputFolder>$(AllowedOutputExtensionsInPackageBuildOutputFolder);.pdb</AllowedOutputExtensionsInPackageBuildOutputFolder>
</PropertyGroup>

<PropertyGroup Condition=" '$(TargetFramework)' == 'netstandard2.0' Or '$(TargetFramework)' == 'net461'">
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>

<ItemGroup>
<Compile Remove="build\**\*" />
<Compile Include="..\..\..\gensrc\autogenerated-api-0-9-1.cs" Exclude="build\**\*;bin\**;obj\**;**\*.xproj;packages\**" />
Expand All @@ -33,7 +42,7 @@
</None>
</ItemGroup>

<PropertyGroup Condition=" '$(TargetFramework)' == 'netstandard2.0' ">
<PropertyGroup Condition=" '$(TargetFramework)' == 'netstandard2.0' Or '$(TargetFramework)' == 'netstandard2.1'">
<DefineConstants>$(DefineConstants);CORECLR</DefineConstants>
</PropertyGroup>

Expand All @@ -46,7 +55,10 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.0" PrivateAssets="All" />
<PackageReference Include="Microsoft.Extensions.ObjectPool" Version="3.1.2" />
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="2.1.1" />
<PackageReference Include="System.Threading.Channels" Version="4.7.0" />
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.0" PrivateAssets="All" Condition="$('TargetFramework') == ('net461')" />
</ItemGroup>

<ItemGroup Condition=" '$(Configuration)' == 'SignedRelease' ">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using System.Linq;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;
using TaskExtensions = RabbitMQ.Client.Impl.TaskExtensions;

namespace RabbitMQ.Client
{
Expand Down Expand Up @@ -72,7 +71,7 @@ public string[] ConsumerTags
/// See <see cref="HandleBasicCancelOk"/> for notification of consumer cancellation due to basicCancel
/// </summary>
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
public virtual Task HandleBasicCancel(string consumerTag)
public virtual ValueTask HandleBasicCancel(string consumerTag)
{
return OnCancel(consumerTag);
}
Expand All @@ -81,7 +80,7 @@ public virtual Task HandleBasicCancel(string consumerTag)
/// Called upon successful deregistration of the consumer from the broker.
/// </summary>
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
public virtual Task HandleBasicCancelOk(string consumerTag)
public virtual ValueTask HandleBasicCancelOk(string consumerTag)
{
return OnCancel(consumerTag);
}
Expand All @@ -90,11 +89,11 @@ public virtual Task HandleBasicCancelOk(string consumerTag)
/// Called upon successful registration of the consumer with the broker.
/// </summary>
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
public virtual Task HandleBasicConsumeOk(string consumerTag)
public virtual ValueTask HandleBasicConsumeOk(string consumerTag)
{
m_consumerTags.Add(consumerTag);
IsRunning = true;
return TaskExtensions.CompletedTask;
return default;
}

/// <summary>
Expand All @@ -105,7 +104,7 @@ public virtual Task HandleBasicConsumeOk(string consumerTag)
/// Note that in particular, some delivered messages may require acknowledgement via <see cref="IModel.BasicAck"/>.
/// The implementation of this method in this class does NOT acknowledge such messages.
/// </remarks>
public virtual Task HandleBasicDeliver(string consumerTag,
public virtual ValueTask HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
Expand All @@ -114,15 +113,15 @@ public virtual Task HandleBasicDeliver(string consumerTag,
byte[] body)
{
// Nothing to do here.
return TaskExtensions.CompletedTask;
return default;
}

/// <summary>
/// Called when the model shuts down.
/// </summary>
/// <param name="model"> Common AMQP model.</param>
/// <param name="reason"> Information about the reason why a particular model, session, or connection was destroyed.</param>
public virtual Task HandleModelShutdown(object model, ShutdownEventArgs reason)
public virtual ValueTask HandleModelShutdown(object model, ShutdownEventArgs reason)
{
ShutdownReason = reason;
return OnCancel(m_consumerTags.ToArray());
Expand All @@ -135,7 +134,7 @@ public virtual Task HandleModelShutdown(object model, ShutdownEventArgs reason)
/// This default implementation simply sets the <see cref="IsRunning"/>
/// property to false, and takes no further action.
/// </remarks>
public virtual async Task OnCancel(params string[] consumerTags)
public virtual async ValueTask OnCancel(params string[] consumerTags)
{
IsRunning = false;
var handler = ConsumerCancelled;
Expand Down Expand Up @@ -184,4 +183,4 @@ void IBasicConsumer.HandleBasicCancel(string consumerTag)
throw new InvalidOperationException("Should never be called.");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class DefaultBasicConsumer : IBasicConsumer
{
private readonly object m_eventLock = new object();
private readonly HashSet<string> m_consumerTags = new HashSet<string>();
public EventHandler<ConsumerEventArgs> m_consumerCancelled;
private EventHandler<ConsumerEventArgs> m_consumerCancelled;

/// <summary>
/// Creates a new instance of an <see cref="DefaultBasicConsumer"/>.
Expand Down
Loading