Skip to content

Concurrent Publish using shared model fails #1036

Closed
@bard83

Description

@bard83

Hi,
one test in our code base that was previously working doesn't work anymore. N messages were published correctly over the MQ and then consumed by a consumer component (the component under test). After upgrading RabbitMQ and the client library, the messages are no longer published. Did #878 introduce a change? Am I doing something wrong?

Expectations: all messages are published and confirmed and available to be consumed.
Results: few messages are published and confirmed, few messages are not confirmed and many get exceptions on publish.

Here are the tests:

using NUnit.Framework;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

    public class RabbitMqTests
    {
        /// <summary>
        /// Publishes 500 messages over one shared channel, releasing all publishers
        /// through an <c>AsyncBarrier</c>, and asserts that every publish was confirmed.
        /// Uses the broker at localhost:5672 with the credentials configured below.
        /// </summary>
        [Test]
        public async Task TestMultithreadPublishWithBarrier()
        {
            var count = 500;
            var barrier = new AsyncBarrier(count);
            var exchangeName = nameof(TestMultithreadPublishWithBarrier);
            var factory = new ConnectionFactory
            {
                UserName = "rabbitmq",
                Password = "rabbitmq",
                HostName = "localhost",
                Port = 5672,
                VirtualHost = "/",
            };

            // Dispose the connection as well as the channel so the test does not
            // leave an open connection behind when it finishes or throws.
            using (var connection = factory.CreateConnection(nameof(TestMultithreadPublishWithBarrier)))
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, true);
                channel.ConfirmSelect();

                var tasksList = new List<Task<(int Code, string? Error)>>();
                for (int i = 0; i < count; i++)
                {
                    tasksList.Add(PublishAsync(channel, $"Message {i}", exchangeName, $"rabbit.test.{i}", barrier));
                }

                (int Code, string? Error)[] results = await Task.WhenAll(tasksList);

                // Result codes come from Publish: 0 = confirmed, 1 = publish threw,
                // 2 = wait-for-confirm threw, 3 = nack-ed, 4 = confirm wait timed out.
                Console.Error.WriteLine($"# publishing error: {results.Count(r => r.Code == 1)}");
                Console.Error.WriteLine($"# confirms error: {results.Count(r => r.Code == 2)}");
                Console.Error.WriteLine($"# nack-ed: {results.Count(r => r.Code == 3)}");
                Console.Error.WriteLine($"# not confirmed: {results.Count(r => r.Code == 4)}");

                // FirstOrDefault instead of First: when no publish failed there is no
                // element to pick, and First() would throw before the assertion runs.
                string? firstPublishError = results.Where(r => r.Code == 1).Select(r => r.Error).FirstOrDefault();
                if (firstPublishError != null)
                {
                    Console.Error.WriteLine($"# publishing error: {firstPublishError}");
                }

                Assert.IsTrue(results.Count(r => r.Code == 0) == count);
            }
        }

        /// <summary>
        /// Publishes 500 messages over one shared channel from <see cref="Task.Run(Func{ValueTuple{int, string}})"/>
        /// work items and asserts that every publish was confirmed.
        /// Uses the broker at localhost:5672 with the credentials configured below.
        /// </summary>
        [Test]
        public async Task TestMultithreadPublish()
        {
            var count = 500;
            var exchangeName = nameof(TestMultithreadPublish);
            var factory = new ConnectionFactory
            {
                UserName = "rabbitmq",
                Password = "rabbitmq",
                HostName = "localhost",
                Port = 5672,
                VirtualHost = "/",
            };

            // Dispose the connection as well as the channel so the test does not
            // leave an open connection behind when it finishes or throws.
            using (var connection = factory.CreateConnection(nameof(TestMultithreadPublish)))
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, true);
                channel.ConfirmSelect();

                var tasksList = new List<Task<(int Code, string? Error)>>();
                for (int i = 0; i < count; i++)
                {
                    // Build the per-message strings before queuing the work item. A lambda
                    // that reads the for-loop variable directly shares one `i` across all
                    // iterations, so late-starting tasks would publish the wrong index.
                    string message = $"Message {i}";
                    string routingKey = $"rabbit.test.{i}";
                    tasksList.Add(Task.Run(() => Publish(channel, message, exchangeName, routingKey)));
                }

                (int Code, string? Error)[] results = await Task.WhenAll(tasksList);

                // Result codes come from Publish: 0 = confirmed, 1 = publish threw,
                // 2 = wait-for-confirm threw, 3 = nack-ed, 4 = confirm wait timed out.
                Console.Error.WriteLine($"# publishing error: {results.Count(r => r.Code == 1)}");
                Console.Error.WriteLine($"# confirms error: {results.Count(r => r.Code == 2)}");
                Console.Error.WriteLine($"# nack-ed: {results.Count(r => r.Code == 3)}");
                Console.Error.WriteLine($"# not confirmed: {results.Count(r => r.Code == 4)}");
                Assert.IsTrue(results.Count(r => r.Code == 0) == count);
            }
        }

        /// <summary>
        /// Signals the barrier, waits for the task it hands back, then publishes
        /// the message synchronously via <c>Publish</c> and returns its result.
        /// </summary>
        private async Task<(int Code, string? Error)> PublishAsync(IModel channel, string message, string exchangeName, string routingKey, AsyncBarrier barrier)
        {
            Task releaseSignal = barrier.SignalAndWaitAsync();
            await releaseSignal;

            (int Code, string? Error) outcome = Publish(channel, message, exchangeName, routingKey);
            return outcome;
        }

        /// <summary>
        /// Publishes one UTF-8 encoded message and waits up to one minute for the confirm,
        /// holding a lock on <paramref name="channel"/> for the whole publish-and-wait step.
        /// </summary>
        /// <returns>
        /// <c>(0, null)</c> on success; otherwise a code and description:
        /// 1 = publish threw, 2 = waiting for confirms threw, 3 = not confirmed (nack),
        /// 4 = the wait timed out.
        /// </returns>
        private (int Code, string? Error) Publish(IModel channel, string message, string exchangeName, string routingKey)
        {
            byte[] payload = Encoding.UTF8.GetBytes(message);

            IBasicProperties properties = channel.CreateBasicProperties();
            properties.ContentType = "application/json";
            properties.Persistent = true;

            TimeSpan confirmTimeout = TimeSpan.FromMinutes(1);
            bool acked;
            bool waitExpired;

            // Publish and confirm-wait happen under the same lock on the channel instance.
            lock (channel)
            {
                try
                {
                    channel.BasicPublish(exchangeName, routingKey, properties, payload);
                }
                catch (Exception publishFailure)
                {
                    return (1, $"{message}: {publishFailure.ToString()}");
                }

                try
                {
                    acked = channel.WaitForConfirms(confirmTimeout, out waitExpired);
                }
                catch (Exception confirmFailure)
                {
                    return (2, $"{message}: {confirmFailure.ToString()}");
                }
            }

            // The nack check takes precedence over the timeout check.
            (int Code, string? Error) outcome = (0, null);
            if (!acked)
            {
                outcome = (3, $"{message}: got NACK-ed");
            }
            else if (waitExpired)
            {
                outcome = (4, $"{message}: got time-out on waiting for message confirmaiton");
            }

            return outcome;
        }

        /// <summary>
        /// Single-use asynchronous barrier: every caller of <see cref="SignalAndWaitAsync"/>
        /// receives the same task, which completes once the configured number of
        /// participants has signalled.
        /// </summary>
        private class AsyncBarrier
        {
            private readonly int _count;
            private readonly TaskCompletionSource<bool> _tcs;
            private int _waiting;

            /// <summary>Creates a barrier that opens after <paramref name="count"/> signals.</summary>
            /// <param name="count">Number of participants; must be at least 1.</param>
            /// <exception cref="ArgumentOutOfRangeException">
            /// <paramref name="count"/> is less than 1 (such a barrier could never open).
            /// </exception>
            public AsyncBarrier(int count)
            {
                if (count < 1)
                {
                    throw new ArgumentOutOfRangeException(nameof(count), count, "The barrier needs at least one participant.");
                }

                _count = count;
                _waiting = 0;

                // RunContinuationsAsynchronously keeps SetResult from running every waiter's
                // continuation inline on the last arriver's thread. Without it the waiters
                // are released one after another on that single thread instead of concurrently.
                _tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
            }

            /// <summary>
            /// Registers one participant and returns the task that completes when the
            /// last participant arrives.
            /// </summary>
            public Task SignalAndWaitAsync()
            {
                // Each increment yields a distinct value, so exactly one caller sees
                // _count and completes the task.
                if (Interlocked.Increment(ref _waiting) == _count)
                {
                    _tcs.SetResult(true);
                }

                return _tcs.Task;
            }
        }
    }

RabbitMQ instance

version: '2.4'

services:
  mq:
    image: rabbitmq:3.8.14-management-alpine
    ports:
    - '5672:5672'
    - '15672:15672'
    # Each KEY=value pair must be its own list item; without the leading "- "
    # the three lines fold into a single string, which is not a valid environment value.
    environment:
    - RABBITMQ_NODENAME=eventStoreMq
    - RABBITMQ_DEFAULT_USER=rabbitmq
    - RABBITMQ_DEFAULT_PASS=rabbitmq
    healthcheck:
      test: ["CMD-SHELL", "rabbitmq-diagnostics -q ping"]
      interval: 1s
      timeout: 5s
      retries: 5

RabbitMQ logs

mq_1                | 2021-04-13 18:49:07.840 [info] <0.1050.0> Connection <0.1050.0> (172.19.0.1:42592 -> 172.19.0.2:5672) has a client-provided name: TestConcurrentPublish
mq_1                | 2021-04-13 18:49:07.845 [info] <0.1050.0> connection <0.1050.0> (172.19.0.1:42592 -> 172.19.0.2:5672 - TestConcurrentPublish): user 'rabbitmq' authenticated and granted access to vhost '/'
mq_1                | 2021-04-13 18:54:07.849 [error] <0.1050.0> closing AMQP connection <0.1050.0> (172.19.0.1:42592 -> 172.19.0.2:5672 - TestConcurrentPublish):
mq_1                | missed heartbeats from client, timeout: 60s
mq_1                | 2021-04-13 18:54:07.849 [info] <0.1462.0> Closing all channels from connection '172.19.0.1:42592 -> 172.19.0.2:5672' because it has been closed

dotnet --info

.NET SDK (reflecting any global.json):
 Version:   5.0.202
 Commit:    db7cc87d51

Runtime Environment:
 OS Name:     ubuntu
 OS Version:  18.04
 OS Platform: Linux
 RID:         ubuntu.18.04-x64
 Base Path:   /snap/dotnet-sdk/120/sdk/5.0.202/

Host (useful for support):
  Version: 5.0.5
  Commit:  2f740adc14

.NET SDKs installed:
  5.0.202 [/snap/dotnet-sdk/120/sdk]

.NET runtimes installed:
  Microsoft.AspNetCore.App 5.0.5 [/snap/dotnet-sdk/120/shared/Microsoft.AspNetCore.App]
  Microsoft.NETCore.App 5.0.5 [/snap/dotnet-sdk/120/shared/Microsoft.NETCore.App]

To install additional .NET runtimes or SDKs:
  https://aka.ms/dotnet-download

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions