Skip to content

Commit 28c4655

Browse files
author
Stefán Jökull Sigurðarson
committed
More pipelines cleanup.
1 parent c58234c commit 28c4655

13 files changed

+445
-222
lines changed
+171
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
// We only need this if we aren't targeting .NET 6.0 or greater since it already exists there
2+
#if !NET6_0_OR_GREATER
3+
using System;
4+
using System.Diagnostics;
5+
using System.IO;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
9+
namespace System.Buffers
10+
{
11+
public class ArrayBufferWriter<T> : IBufferWriter<T>, IDisposable
12+
{
13+
private T[] _rentedBuffer;
14+
private int _written;
15+
private long _committed;
16+
17+
private const int MinimumBufferSize = 256;
18+
19+
public ArrayBufferWriter(int initialCapacity = MinimumBufferSize)
20+
{
21+
if (initialCapacity <= 0)
22+
{
23+
throw new ArgumentException(null, nameof(initialCapacity));
24+
}
25+
26+
_rentedBuffer = ArrayPool<T>.Shared.Rent(initialCapacity);
27+
_written = 0;
28+
_committed = 0;
29+
}
30+
31+
public Memory<T> WrittenMemory
32+
{
33+
get
34+
{
35+
CheckIfDisposed();
36+
37+
return _rentedBuffer.AsMemory(0, _written);
38+
}
39+
}
40+
41+
public Span<T> WrittenSpan
42+
{
43+
get
44+
{
45+
CheckIfDisposed();
46+
47+
return _rentedBuffer.AsSpan(0, _written);
48+
}
49+
}
50+
51+
public int BytesWritten
52+
{
53+
get
54+
{
55+
CheckIfDisposed();
56+
57+
return _written;
58+
}
59+
}
60+
61+
public long BytesCommitted
62+
{
63+
get
64+
{
65+
CheckIfDisposed();
66+
67+
return _committed;
68+
}
69+
}
70+
71+
public void Clear()
72+
{
73+
CheckIfDisposed();
74+
75+
ClearHelper();
76+
}
77+
78+
private void ClearHelper()
79+
{
80+
_rentedBuffer.AsSpan(0, _written).Clear();
81+
_written = 0;
82+
}
83+
84+
public void Advance(int count)
85+
{
86+
CheckIfDisposed();
87+
88+
if (count < 0)
89+
throw new ArgumentException(nameof(count));
90+
91+
if (_written > _rentedBuffer.Length - count)
92+
throw new InvalidOperationException("Cannot advance past the end of the buffer.");
93+
94+
_written += count;
95+
}
96+
97+
// Returns the rented buffer back to the pool
98+
public void Dispose()
99+
{
100+
if (_rentedBuffer == null)
101+
{
102+
return;
103+
}
104+
105+
ArrayPool<T>.Shared.Return(_rentedBuffer, clearArray: true);
106+
_rentedBuffer = null;
107+
_written = 0;
108+
}
109+
110+
private void CheckIfDisposed()
111+
{
112+
if (_rentedBuffer == null)
113+
throw new ObjectDisposedException(nameof(ArrayBufferWriter<T>));
114+
}
115+
116+
public Memory<T> GetMemory(int sizeHint = 0)
117+
{
118+
CheckIfDisposed();
119+
120+
if (sizeHint < 0)
121+
throw new ArgumentException(nameof(sizeHint));
122+
123+
CheckAndResizeBuffer(sizeHint);
124+
return _rentedBuffer.AsMemory(_written);
125+
}
126+
127+
public Span<T> GetSpan(int sizeHint = 0)
128+
{
129+
CheckIfDisposed();
130+
131+
if (sizeHint < 0)
132+
throw new ArgumentException(nameof(sizeHint));
133+
134+
CheckAndResizeBuffer(sizeHint);
135+
return _rentedBuffer.AsSpan(_written);
136+
}
137+
138+
private void CheckAndResizeBuffer(int sizeHint)
139+
{
140+
Debug.Assert(sizeHint >= 0);
141+
142+
if (sizeHint == 0)
143+
{
144+
sizeHint = MinimumBufferSize;
145+
}
146+
147+
int availableSpace = _rentedBuffer.Length - _written;
148+
149+
if (sizeHint > availableSpace)
150+
{
151+
int growBy = sizeHint > _rentedBuffer.Length ? sizeHint : _rentedBuffer.Length;
152+
153+
int newSize = checked(_rentedBuffer.Length + growBy);
154+
155+
T[] oldBuffer = _rentedBuffer;
156+
157+
_rentedBuffer = ArrayPool<T>.Shared.Rent(newSize);
158+
159+
Debug.Assert(oldBuffer.Length >= _written);
160+
Debug.Assert(_rentedBuffer.Length >= _written);
161+
162+
oldBuffer.AsSpan(0, _written).CopyTo(_rentedBuffer);
163+
ArrayPool<T>.Shared.Return(oldBuffer, clearArray: true);
164+
}
165+
166+
Debug.Assert(_rentedBuffer.Length - _written > 0);
167+
Debug.Assert(_rentedBuffer.Length - _written >= sizeHint);
168+
}
169+
}
170+
}
171+
#endif

projects/Benchmarks/WireFormatting/MethodFraming.cs

+31-5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Buffers;
23
using System.Text;
34

45
using BenchmarkDotNet.Attributes;
@@ -19,7 +20,12 @@ public class MethodFramingBasicAck
1920
public ushort Channel { get; set; }
2021

2122
[Benchmark]
22-
public ReadOnlyMemory<byte> BasicAckWrite() => Framing.SerializeToFrames(ref _basicAck, Channel);
23+
public ReadOnlyMemory<byte> BasicAckWrite()
24+
{
25+
ArrayBufferWriter<byte> _writer = new ArrayBufferWriter<byte>();
26+
Framing.SerializeToFrames(ref _basicAck, _writer, Channel);
27+
return _writer.WrittenMemory;
28+
}
2329
}
2430

2531
[Config(typeof(Config))]
@@ -41,13 +47,28 @@ public class MethodFramingBasicPublish
4147
public int FrameMax { get; set; }
4248

4349
[Benchmark]
44-
public ReadOnlyMemory<byte> BasicPublishWriteNonEmpty() => Framing.SerializeToFrames(ref _basicPublish, ref _properties, _body, Channel, FrameMax);
50+
public ReadOnlyMemory<byte> BasicPublishWriteNonEmpty()
51+
{
52+
ArrayBufferWriter<byte> _writer = new ArrayBufferWriter<byte>();
53+
Framing.SerializeToFrames(ref _basicPublish, ref _properties, _body, _writer, Channel, FrameMax);
54+
return _writer.WrittenMemory;
55+
}
4556

4657
[Benchmark]
47-
public ReadOnlyMemory<byte> BasicPublishWrite() => Framing.SerializeToFrames(ref _basicPublish, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
58+
public ReadOnlyMemory<byte> BasicPublishWrite()
59+
{
60+
ArrayBufferWriter<byte> _writer = new ArrayBufferWriter<byte>();
61+
Framing.SerializeToFrames(ref _basicPublish, ref _propertiesEmpty, _bodyEmpty, _writer, Channel, FrameMax);
62+
return _writer.WrittenMemory;
63+
}
4864

4965
[Benchmark]
50-
public ReadOnlyMemory<byte> BasicPublishMemoryWrite() => Framing.SerializeToFrames(ref _basicPublishMemory, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
66+
public ReadOnlyMemory<byte> BasicPublishMemoryWrite()
67+
{
68+
ArrayBufferWriter<byte> _writer = new ArrayBufferWriter<byte>();
69+
Framing.SerializeToFrames(ref _basicPublishMemory, ref _propertiesEmpty, _bodyEmpty, _writer, Channel, FrameMax);
70+
return _writer.WrittenMemory;
71+
}
5172
}
5273

5374
[Config(typeof(Config))]
@@ -60,6 +81,11 @@ public class MethodFramingChannelClose
6081
public ushort Channel { get; set; }
6182

6283
[Benchmark]
63-
public ReadOnlyMemory<byte> ChannelCloseWrite() => Framing.SerializeToFrames(ref _channelClose, Channel);
84+
public ReadOnlyMemory<byte> ChannelCloseWrite()
85+
{
86+
ArrayBufferWriter<byte> _writer = new ArrayBufferWriter<byte>();
87+
Framing.SerializeToFrames(ref _channelClose, _writer, Channel);
88+
return _writer.WrittenMemory;
89+
}
6490
}
6591
}

projects/RabbitMQ.Client/client/framing/Protocol.cs

+4
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ internal sealed class Protocol : ProtocolBase
5050
///<summary>Protocol API name (= :AMQP_0_9_1)</summary>
5151
public override string ApiName => ":AMQP_0_9_1";
5252

53+
public override ReadOnlySpan<byte> Header => Amqp091Header;
54+
55+
private static ReadOnlySpan<byte> Amqp091Header => new byte[] { (byte)'A', (byte)'M', (byte)'Q', (byte)'P', 0, 0, 9, 1 };
56+
5357
///<summary>Default TCP port (= 5672)</summary>
5458
public override int DefaultPort => 5672;
5559

projects/RabbitMQ.Client/client/impl/Connection.Commands.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ private void StartAndTune()
8484
_model0.m_connectionStartCell = connectionStartCell;
8585
_model0.HandshakeContinuationTimeout = _config.HandshakeContinuationTimeout;
8686
_frameHandler.ReadTimeout = _config.HandshakeContinuationTimeout;
87-
_frameHandler.SendHeader();
87+
Write(Protocol.Header);
8888

8989
ConnectionStartDetails connectionStart = connectionStartCell.WaitForValue();
9090

projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ private void HeartbeatWriteTimerCallback(object? state)
149149
{
150150
if (!_closed)
151151
{
152-
Write(Client.Impl.Framing.Heartbeat.GetHeartbeatFrame());
152+
Write(Client.Impl.Framing.Heartbeat.Payload);
153153
_heartbeatWriteTimer?.Change((int)_heartbeatTimeSpan.TotalMilliseconds, Timeout.Infinite);
154154
}
155155
}

0 commit comments

Comments
 (0)