Closed
Description
The payload passed in the BasicPublish is modifyable after the call itself. This itself is an issue, but the biggest issue with it is if you use a memory from an ArrayPool that you'd return after the call. (Meaning it could be reused & modified before the payload is actually sent)
Repo case based on stebet test code.
using System;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace RabbitMQPlayground
{
class Program
{
private static int messagesSent = 0;
private static int messagesReceived = 0;
private static int messagesReceivedMod = 0;
private static int batchesToSend = 100;
private static int itemsPerBatch = 500;
static async Task Main(string[] args)
{
ThreadPool.SetMinThreads(16 * Environment.ProcessorCount, 16 * Environment.ProcessorCount);
var connectionString = new Uri("amqp://guest:guest@localhost/");
var connectionFactory = new ConnectionFactory() { DispatchConsumersAsync = true, Uri = connectionString };
var connection = connectionFactory.CreateConnection();
var publisher = connection.CreateModel();
var subscriber = connection.CreateModel();
publisher.ConfirmSelect();
publisher.ExchangeDeclare("test", ExchangeType.Topic, false, false);
subscriber.QueueDeclare("testqueue", false, false, true);
var asyncListener = new AsyncEventingBasicConsumer(subscriber);
asyncListener.Received += AsyncListener_Received;
subscriber.QueueBind("testqueue", "test", "myawesome.routing.key");
subscriber.BasicConsume("testqueue", true, "testconsumer", asyncListener);
//byte[] payload = new byte[512];
var batchPublish = Task.Run(() =>
{
while (messagesSent < batchesToSend * itemsPerBatch)
{
//var batch = publisher.CreateBasicPublishBatch();
for (int i = 0; i < itemsPerBatch; i++)
{
var properties = publisher.CreateBasicProperties();
properties.AppId = "testapp";
properties.CorrelationId = Guid.NewGuid().ToString();
//batch.Add("test", "myawesome.routing.key", false, properties, payload);
var payload = new byte[255];
payload.AsSpan().Fill(1);
publisher.BasicPublish("test", "myawesome.routing.key", false, properties, payload);
payload.AsSpan().Fill(byte.MaxValue);
}
//batch.Publish();
messagesSent += itemsPerBatch;
publisher.WaitForConfirmsOrDie();
}
});
var sentTask = Task.Run(async () =>
{
while (messagesSent < batchesToSend * itemsPerBatch)
{
Console.WriteLine($"Messages sent: {messagesSent}");
await Task.Delay(500);
}
Console.WriteLine("Done sending messages!");
});
var receivedTask = Task.Run(async () =>
{
while (messagesReceived < batchesToSend * itemsPerBatch)
{
Console.WriteLine($"Messages received: {messagesReceived}, modified {messagesReceivedMod}");
await Task.Delay(500);
}
Console.WriteLine($"Done receiving all messages. (modified {messagesReceivedMod})");
});
await Task.WhenAll(sentTask, receivedTask);
publisher.Dispose();
subscriber.Dispose();
connection.Dispose();
Console.ReadLine();
}
private static Task AsyncListener_Received(object sender, BasicDeliverEventArgs @event)
{
if (@event.Body.Span.IndexOf(byte.MaxValue) >= 0)
{
Interlocked.Increment(ref messagesReceivedMod);
}
Interlocked.Increment(ref messagesReceived);
return Task.CompletedTask;
}
}
Metadata
Metadata
Assignees
Labels
No labels