Skip to content

Unable to implement a consumer that also publishes messages #886

Closed
@Bibek-Singh9

Description

@Bibek-Singh9

I am trying to implement a consumer that also publishes message.
Usage case-

  1. Created an Exchange "A" and bind a queue "A-queue" and produced a message.

  2. Created a consumer, consuming message from "A-queue" of Exchange "A".

  3. Within the consumer's handler, I am trying to publish/produce message to a new Exchange "B", It works for once or twice out of 100 times and fails with exception -
    The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=505, text='UNEXPECTED FROM - expected content body, got non content body frame instead', class Id=60, methodId=40
    note: But the same piece of code works till version - RabbitMQ.Client 5.2.0, in the later versions - 6.0.0 and above it causes this issue.

                   cons.Consume("A", async m =>
                     {
                         await Task.Delay(1000);
                         prod.Produce(channel: "Status", message: "Hello");
                        
                     });
                     // Each time the user enters a key that is not the escape key.
                     while (Console.ReadKey(true).Key != ConsoleKey.Escape)
                     {
                         // Produce to the channel and wait for the Consumer as defined above to pick up on the Produced.                         
                         for (int i = 0; i <= 10; i++)
                         {
                             prod.Produce(channel: "A", message: "Hello");
                         }
                     }
    

The produce and consume code is encapsulated in methods.
Stacktrace-
at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body)
at RabbitMQ.Client.Framing.Impl.Model._Private_ExchangeDeclare(String exchange, String type, Boolean passive, Boolean durable, Boolean autoDelete, Boolean internal, Boolean nowait, IDictionary2 arguments) at RabbitMQ.Client.Impl.ModelBase.ExchangeDeclare(String exchange, String type, Boolean durable, Boolean autoDelete, IDictionary2 arguments)
at RabbitMQ.Client.Impl.AutorecoveringModel.ExchangeDeclare(String exchange, String type, Boolean durable, Boolean autoDelete, IDictionary2 arguments) at RabbitMQ.Client.IModelExensions.ExchangeDeclare(IModel model, String exchange, String type, Boolean durable, Boolean autoDelete, IDictionary2 arguments)
at Bentley.Extensions.MessageBus.RabbitMQ.RabbitMQProducer.d__4.MoveNext() in D:\Projects\MessageBus\src\NetCore\Bentley.Extensions.MessageBus.RabbitMQ\RabbitMQProducer.cs:line 54

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions