Skip to content

Commit fc636a2

Browse files
author
zavarkog
committed
Do not create consumerTag string in HandleBasicDeliver when conumer exists in the dictionary
1 parent 926b2f8 commit fc636a2

File tree

6 files changed

+94
-16
lines changed

6 files changed

+94
-16
lines changed

projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs

+3-2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ public class ConsumerDispatcherBase
1515
private protected IConsumerDispatcher _dispatcher;
1616
private protected readonly AsyncBasicConsumerFake _consumer = new AsyncBasicConsumerFake(_autoResetEvent);
1717
protected readonly string _consumerTag = "ConsumerTag";
18+
protected static readonly byte[] _consumerTagBytes = Encoding.UTF8.GetBytes("ConsumerTag");
1819
protected readonly ulong _deliveryTag = 500UL;
1920
protected static readonly byte[] _exchange = Encoding.UTF8.GetBytes("Exchange");
2021
protected static readonly byte[] _routingKey = Encoding.UTF8.GetBytes("RoutingKey");
@@ -43,7 +44,7 @@ public void AsyncConsumerDispatcher()
4344
{
4445
for (int i = 0; i < Count; i++)
4546
{
46-
_dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _method, _body);
47+
_dispatcher.HandleBasicDeliver(_consumerTagBytes, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _method, _body);
4748
}
4849
_autoResetEvent.Wait();
4950
_autoResetEvent.Reset();
@@ -61,7 +62,7 @@ public void ConsumerDispatcher()
6162
{
6263
for (int i = 0; i < Count; i++)
6364
{
64-
_dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _method, _body);
65+
_dispatcher.HandleBasicDeliver(_consumerTagBytes, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _method, _body);
6566
}
6667
_autoResetEvent.Wait();
6768
_autoResetEvent.Reset();

projects/RabbitMQ.Client/client/framing/BasicDeliver.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,15 @@ namespace RabbitMQ.Client.Framing.Impl
3838
{
3939
internal readonly struct BasicDeliver : IAmqpMethod
4040
{
41-
public readonly string _consumerTag;
41+
public readonly ReadOnlyMemory<byte> _consumerTag;
4242
public readonly ulong _deliveryTag;
4343
public readonly bool _redelivered;
4444
public readonly ReadOnlyMemory<byte> _exchange;
4545
public readonly ReadOnlyMemory<byte> _routingKey;
4646

4747
public BasicDeliver(ReadOnlyMemory<byte> data)
4848
{
49-
int offset = WireFormatting.ReadShortstr(data.Span, out _consumerTag);
49+
int offset = WireFormatting.ReadShortMemory(data, out _consumerTag);
5050
offset += WireFormatting.ReadLonglong(data.Span.Slice(offset), out _deliveryTag);
5151
offset += WireFormatting.ReadBits(data.Span.Slice(offset), out _redelivered);
5252
offset += WireFormatting.ReadShortMemory(data.Slice(offset), out _exchange);

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs

+44-9
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,88 @@
1-
using System.Collections.Generic;
1+
using System;
2+
using System.Buffers;
3+
using System.Collections.Generic;
24
using System.Runtime.CompilerServices;
5+
using System.Text;
36
using System.Threading.Tasks;
47

8+
using RabbitMQ.Util;
9+
510
namespace RabbitMQ.Client.ConsumerDispatching
611
{
712
#nullable enable
813
internal abstract class ConsumerDispatcherBase
914
{
1015
private static readonly FallbackConsumer fallbackConsumer = new FallbackConsumer();
11-
private readonly Dictionary<string, IBasicConsumer> _consumers;
16+
private readonly Dictionary<ReadOnlyMemory<byte>, (IBasicConsumer consumer, string consumerTag)> _consumers;
1217

1318
public IBasicConsumer? DefaultConsumer { get; set; }
1419

1520
protected ConsumerDispatcherBase()
1621
{
17-
_consumers = new Dictionary<string, IBasicConsumer>();
22+
_consumers = new Dictionary<ReadOnlyMemory<byte>, (IBasicConsumer, string)>(MemoryOfByteEqualityComparer.Instance);
1823
}
1924

2025
protected void AddConsumer(IBasicConsumer consumer, string tag)
2126
{
2227
lock (_consumers)
2328
{
24-
_consumers[tag] = consumer;
29+
var tagBytes = Encoding.UTF8.GetBytes(tag);
30+
_consumers[tagBytes] = (consumer, tag);
2531
}
2632
}
2733

28-
protected IBasicConsumer GetConsumerOrDefault(string tag)
34+
protected (IBasicConsumer consumer, string consumerTag) GetConsumerOrDefault(ReadOnlyMemory<byte> tag)
2935
{
3036
lock (_consumers)
3137
{
32-
return _consumers.TryGetValue(tag, out var consumer) ? consumer : GetDefaultOrFallbackConsumer();
38+
if (_consumers.TryGetValue(tag, out var consumerPair))
39+
{
40+
return consumerPair;
41+
}
42+
43+
#if !NETSTANDARD
44+
var consumerTag = Encoding.UTF8.GetString(tag.Span);
45+
#else
46+
string consumerTag;
47+
unsafe
48+
{
49+
fixed (byte* bytes = tag.Span)
50+
{
51+
consumerTag = Encoding.UTF8.GetString(bytes, tag.Length);
52+
}
53+
}
54+
#endif
55+
56+
return (GetDefaultOrFallbackConsumer(), consumerTag);
3357
}
3458
}
3559

3660
public IBasicConsumer GetAndRemoveConsumer(string tag)
3761
{
3862
lock (_consumers)
3963
{
40-
return _consumers.Remove(tag, out var consumer) ? consumer : GetDefaultOrFallbackConsumer();
64+
var utf8 = Encoding.UTF8;
65+
var pool = ArrayPool<byte>.Shared;
66+
var buf = pool.Rent(utf8.GetMaxByteCount(tag.Length));
67+
#if NETSTANDARD
68+
int count = utf8.GetBytes(tag, 0, tag.Length, buf, 0);
69+
#else
70+
int count = utf8.GetBytes(tag, buf);
71+
#endif
72+
var memory = buf.AsMemory(0, count);
73+
var result = _consumers.Remove(memory, out var consumerPair) ? consumerPair.consumer : GetDefaultOrFallbackConsumer();
74+
pool.Return(buf);
75+
return result;
4176
}
4277
}
4378

4479
public Task ShutdownAsync(ShutdownEventArgs reason)
4580
{
4681
lock (_consumers)
4782
{
48-
foreach (KeyValuePair<string, IBasicConsumer> pair in _consumers)
83+
foreach (KeyValuePair<ReadOnlyMemory<byte>, (IBasicConsumer consumer, string consumerTag)> pair in _consumers)
4984
{
50-
ShutdownConsumer(pair.Value, reason);
85+
ShutdownConsumer(pair.Value.consumer, reason);
5186
}
5287
_consumers.Clear();
5388
}

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs

+3-2
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,13 @@ public void HandleBasicConsumeOk(IBasicConsumer consumer, string consumerTag)
5353
}
5454
}
5555

56-
public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered,
56+
public void HandleBasicDeliver(ReadOnlyMemory<byte> consumerTag, ulong deliveryTag, bool redelivered,
5757
ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory<byte> body, byte[] rentedMethodArray, byte[] rentedArray)
5858
{
5959
if (!IsShutdown)
6060
{
61-
_writer.TryWrite(new WorkStruct(GetConsumerOrDefault(consumerTag), consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, rentedMethodArray, rentedArray));
61+
var consumerPair = GetConsumerOrDefault(consumerTag);
62+
_writer.TryWrite(new WorkStruct(consumerPair.consumer, consumerPair.consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, rentedMethodArray, rentedArray));
6263
}
6364
}
6465

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ internal interface IConsumerDispatcher
4545

4646
void HandleBasicConsumeOk(IBasicConsumer consumer, string consumerTag);
4747

48-
void HandleBasicDeliver(string consumerTag,
48+
void HandleBasicDeliver(ReadOnlyMemory<byte> consumerTag,
4949
ulong deliveryTag,
5050
bool redelivered,
5151
ReadOnlyMemory<byte> exchange,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Runtime.InteropServices;
4+
5+
namespace RabbitMQ.Util;
6+
7+
public sealed class MemoryOfByteEqualityComparer : IEqualityComparer<ReadOnlyMemory<byte>>
8+
{
9+
public static MemoryOfByteEqualityComparer Instance { get; } = new MemoryOfByteEqualityComparer();
10+
11+
public bool Equals(ReadOnlyMemory<byte> left, ReadOnlyMemory<byte> right)
12+
{
13+
return left.Span.SequenceEqual(right.Span);
14+
}
15+
16+
public int GetHashCode(ReadOnlyMemory<byte> value)
17+
{
18+
#if NETSTANDARD
19+
unchecked
20+
{
21+
int hashCode = 0;
22+
var longPart = MemoryMarshal.Cast<byte, long>(value.Span);
23+
foreach (long item in longPart)
24+
{
25+
hashCode = (hashCode * 397) ^ item.GetHashCode();
26+
}
27+
28+
foreach (int item in value.Span.Slice(longPart.Length * 8))
29+
{
30+
hashCode = (hashCode * 397) ^ item.GetHashCode();
31+
}
32+
33+
return hashCode;
34+
}
35+
#else
36+
HashCode result = default;
37+
result.AddBytes(value.Span);
38+
return result.ToHashCode();
39+
#endif
40+
}
41+
}

0 commit comments

Comments
 (0)