Skip to content

Commit 075f183

Browse files
author
Stefán Jökull Sigurðarson
committed
Adding pipelines
1 parent 9ccf87a commit 075f183

21 files changed

+726
-509
lines changed
+171
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
// We only need this if we aren't targeting .NET 6.0 or greater since it already exists there
2+
#if !NET6_0_OR_GREATER
3+
using System;
4+
using System.Diagnostics;
5+
using System.IO;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
9+
namespace System.Buffers
10+
{
11+
public class ArrayBufferWriter<T> : IBufferWriter<T>, IDisposable
12+
{
13+
private T[] _rentedBuffer;
14+
private int _written;
15+
private long _committed;
16+
17+
private const int MinimumBufferSize = 256;
18+
19+
public ArrayBufferWriter(int initialCapacity = MinimumBufferSize)
20+
{
21+
if (initialCapacity <= 0)
22+
{
23+
throw new ArgumentException(null, nameof(initialCapacity));
24+
}
25+
26+
_rentedBuffer = ArrayPool<T>.Shared.Rent(initialCapacity);
27+
_written = 0;
28+
_committed = 0;
29+
}
30+
31+
public Memory<T> WrittenMemory
32+
{
33+
get
34+
{
35+
CheckIfDisposed();
36+
37+
return _rentedBuffer.AsMemory(0, _written);
38+
}
39+
}
40+
41+
public Span<T> WrittenSpan
42+
{
43+
get
44+
{
45+
CheckIfDisposed();
46+
47+
return _rentedBuffer.AsSpan(0, _written);
48+
}
49+
}
50+
51+
public int BytesWritten
52+
{
53+
get
54+
{
55+
CheckIfDisposed();
56+
57+
return _written;
58+
}
59+
}
60+
61+
public long BytesCommitted
62+
{
63+
get
64+
{
65+
CheckIfDisposed();
66+
67+
return _committed;
68+
}
69+
}
70+
71+
public void Clear()
72+
{
73+
CheckIfDisposed();
74+
75+
ClearHelper();
76+
}
77+
78+
private void ClearHelper()
79+
{
80+
_rentedBuffer.AsSpan(0, _written).Clear();
81+
_written = 0;
82+
}
83+
84+
public void Advance(int count)
85+
{
86+
CheckIfDisposed();
87+
88+
if (count < 0)
89+
throw new ArgumentException(nameof(count));
90+
91+
if (_written > _rentedBuffer.Length - count)
92+
throw new InvalidOperationException("Cannot advance past the end of the buffer.");
93+
94+
_written += count;
95+
}
96+
97+
// Returns the rented buffer back to the pool
98+
public void Dispose()
99+
{
100+
if (_rentedBuffer == null)
101+
{
102+
return;
103+
}
104+
105+
ArrayPool<T>.Shared.Return(_rentedBuffer, clearArray: true);
106+
_rentedBuffer = null;
107+
_written = 0;
108+
}
109+
110+
private void CheckIfDisposed()
111+
{
112+
if (_rentedBuffer == null)
113+
throw new ObjectDisposedException(nameof(ArrayBufferWriter<T>));
114+
}
115+
116+
public Memory<T> GetMemory(int sizeHint = 0)
117+
{
118+
CheckIfDisposed();
119+
120+
if (sizeHint < 0)
121+
throw new ArgumentException(nameof(sizeHint));
122+
123+
CheckAndResizeBuffer(sizeHint);
124+
return _rentedBuffer.AsMemory(_written);
125+
}
126+
127+
public Span<T> GetSpan(int sizeHint = 0)
128+
{
129+
CheckIfDisposed();
130+
131+
if (sizeHint < 0)
132+
throw new ArgumentException(nameof(sizeHint));
133+
134+
CheckAndResizeBuffer(sizeHint);
135+
return _rentedBuffer.AsSpan(_written);
136+
}
137+
138+
private void CheckAndResizeBuffer(int sizeHint)
139+
{
140+
Debug.Assert(sizeHint >= 0);
141+
142+
if (sizeHint == 0)
143+
{
144+
sizeHint = MinimumBufferSize;
145+
}
146+
147+
int availableSpace = _rentedBuffer.Length - _written;
148+
149+
if (sizeHint > availableSpace)
150+
{
151+
int growBy = sizeHint > _rentedBuffer.Length ? sizeHint : _rentedBuffer.Length;
152+
153+
int newSize = checked(_rentedBuffer.Length + growBy);
154+
155+
T[] oldBuffer = _rentedBuffer;
156+
157+
_rentedBuffer = ArrayPool<T>.Shared.Rent(newSize);
158+
159+
Debug.Assert(oldBuffer.Length >= _written);
160+
Debug.Assert(_rentedBuffer.Length >= _written);
161+
162+
oldBuffer.AsSpan(0, _written).CopyTo(_rentedBuffer);
163+
ArrayPool<T>.Shared.Return(oldBuffer, clearArray: true);
164+
}
165+
166+
Debug.Assert(_rentedBuffer.Length - _written > 0);
167+
Debug.Assert(_rentedBuffer.Length - _written >= sizeHint);
168+
}
169+
}
170+
}
171+
#endif

projects/Benchmarks/WireFormatting/MethodFraming.cs

+31-5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Buffers;
23
using System.Text;
34

45
using BenchmarkDotNet.Attributes;
@@ -19,7 +20,12 @@ public class MethodFramingBasicAck
1920
public ushort Channel { get; set; }
2021

2122
[Benchmark]
22-
public ReadOnlyMemory<byte> BasicAckWrite() => Framing.SerializeToFrames(ref _basicAck, Channel);
23+
public ReadOnlyMemory<byte> BasicAckWrite()
24+
{
25+
ArrayBufferWriter<byte> _writer = new ArrayBufferWriter<byte>();
26+
Framing.SerializeToFrames(ref _basicAck, _writer, Channel);
27+
return _writer.WrittenMemory;
28+
}
2329
}
2430

2531
[Config(typeof(Config))]
@@ -41,13 +47,28 @@ public class MethodFramingBasicPublish
4147
public int FrameMax { get; set; }
4248

4349
[Benchmark]
44-
public ReadOnlyMemory<byte> BasicPublishWriteNonEmpty() => Framing.SerializeToFrames(ref _basicPublish, ref _properties, _body, Channel, FrameMax);
50+
public ReadOnlyMemory<byte> BasicPublishWriteNonEmpty()
51+
{
52+
ArrayBufferWriter<byte> _writer = new ArrayBufferWriter<byte>();
53+
Framing.SerializeToFrames(ref _basicPublish, ref _properties, _body, _writer, Channel, FrameMax);
54+
return _writer.WrittenMemory;
55+
}
4556

4657
[Benchmark]
47-
public ReadOnlyMemory<byte> BasicPublishWrite() => Framing.SerializeToFrames(ref _basicPublish, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
58+
public ReadOnlyMemory<byte> BasicPublishWrite()
59+
{
60+
ArrayBufferWriter<byte> _writer = new ArrayBufferWriter<byte>();
61+
Framing.SerializeToFrames(ref _basicPublish, ref _propertiesEmpty, _bodyEmpty, _writer, Channel, FrameMax);
62+
return _writer.WrittenMemory;
63+
}
4864

4965
[Benchmark]
50-
public ReadOnlyMemory<byte> BasicPublishMemoryWrite() => Framing.SerializeToFrames(ref _basicPublishMemory, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
66+
public ReadOnlyMemory<byte> BasicPublishMemoryWrite()
67+
{
68+
ArrayBufferWriter<byte> _writer = new ArrayBufferWriter<byte>();
69+
Framing.SerializeToFrames(ref _basicPublishMemory, ref _propertiesEmpty, _bodyEmpty, _writer, Channel, FrameMax);
70+
return _writer.WrittenMemory;
71+
}
5172
}
5273

5374
[Config(typeof(Config))]
@@ -60,6 +81,11 @@ public class MethodFramingChannelClose
6081
public ushort Channel { get; set; }
6182

6283
[Benchmark]
63-
public ReadOnlyMemory<byte> ChannelCloseWrite() => Framing.SerializeToFrames(ref _channelClose, Channel);
84+
public ReadOnlyMemory<byte> ChannelCloseWrite()
85+
{
86+
ArrayBufferWriter<byte> _writer = new ArrayBufferWriter<byte>();
87+
Framing.SerializeToFrames(ref _channelClose, _writer, Channel);
88+
return _writer.WrittenMemory;
89+
}
6490
}
6591
}

projects/RabbitMQ.Client/RabbitMQ.Client.csproj

+1
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
<PackageReference Include="MinVer" Version="3.1.0" PrivateAssets="All" />
6464
<PackageReference Include="System.Memory" Version="4.5.4" />
6565
<PackageReference Include="System.Threading.Channels" Version="6.0.0" />
66+
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="2.2.2" />
6667
</ItemGroup>
6768

6869
</Project>

projects/RabbitMQ.Client/client/api/ConnectionFactoryBase.cs

+5-6
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131

3232
using System;
3333
using System.Net.Sockets;
34+
35+
using Pipelines.Sockets.Unofficial;
36+
3437
using RabbitMQ.Client.Impl;
3538

3639
namespace RabbitMQ.Client
@@ -49,12 +52,8 @@ public class ConnectionFactoryBase
4952
/// <returns>New instance of a <see cref="TcpClient"/>.</returns>
5053
public static ITcpClient DefaultSocketFactory(AddressFamily addressFamily)
5154
{
52-
var socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp)
53-
{
54-
NoDelay = true,
55-
ReceiveBufferSize = 65536,
56-
SendBufferSize = 65536
57-
};
55+
var socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp);
56+
SocketConnection.SetRecommendedClientOptions(socket);
5857
return new TcpClientAdapter(socket);
5958
}
6059
}

0 commit comments

Comments
 (0)