Skip to content

Commit 5e97e6b

Browse files
author
Stefán J. Sigurðarson
committed
Simplifying code, reducing blocking and threads by trying to inline writes and resulting to backlog writes if that fails.
1 parent 48ec4cc commit 5e97e6b

File tree

5 files changed

+202
-137
lines changed

5 files changed

+202
-137
lines changed

projects/RabbitMQ.Client/client/impl/Connection.cs

+29-30
Original file line numberDiff line numberDiff line change
@@ -593,38 +593,37 @@ public int MainLoopIteration(ref ReadOnlySequence<byte> buffer)
593593

594594
bool shallReturn = true;
595595
// We have received an actual frame.
596-
if (frame.Type == FrameType.FrameHeartbeat)
596+
if (frame.Type != FrameType.FrameHeartbeat)
597597
{
598-
// Ignore it: we've already just reset the heartbeat
599-
}
600-
else if (frame.Channel == 0)
601-
{
602-
// In theory, we could get non-connection.close-ok
603-
// frames here while we're quiescing (m_closeReason !=
604-
// null). In practice, there's a limited number of
605-
// things the server can ask of us on channel 0 -
606-
// essentially, just connection.close. That, combined
607-
// with the restrictions on pipelining, mean that
608-
// we're OK here to handle channel 0 traffic in a
609-
// quiescing situation, even though technically we
610-
// should be ignoring everything except
611-
// connection.close-ok.
612-
shallReturn = _session0.HandleFrame(in frame);
613-
}
614-
else
615-
{
616-
// If we're still m_running, but have a m_closeReason,
617-
// then we must be quiescing, which means any inbound
618-
// frames for non-zero channels (and any inbound
619-
// commands on channel zero that aren't
620-
// Connection.CloseOk) must be discarded.
621-
if (_closeReason is null)
598+
if (frame.Channel == 0)
622599
{
623-
// No close reason, not quiescing the
624-
// connection. Handle the frame. (Of course, the
625-
// Session itself may be quiescing this particular
626-
// channel, but that's none of our concern.)
627-
shallReturn = _sessionManager.Lookup(frame.Channel).HandleFrame(in frame);
600+
// In theory, we could get non-connection.close-ok
601+
// frames here while we're quiescing (m_closeReason !=
602+
// null). In practice, there's a limited number of
603+
// things the server can ask of us on channel 0 -
604+
// essentially, just connection.close. That, combined
605+
// with the restrictions on pipelining, mean that
606+
// we're OK here to handle channel 0 traffic in a
607+
// quiescing situation, even though technically we
608+
// should be ignoring everything except
609+
// connection.close-ok.
610+
shallReturn = _session0.HandleFrame(in frame);
611+
}
612+
else
613+
{
614+
// If we're still m_running, but have a m_closeReason,
615+
// then we must be quiescing, which means any inbound
616+
// frames for non-zero channels (and any inbound
617+
// commands on channel zero that aren't
618+
// Connection.CloseOk) must be discarded.
619+
if (_closeReason is null)
620+
{
621+
// No close reason, not quiescing the
622+
// connection. Handle the frame. (Of course, the
623+
// Session itself may be quiescing this particular
624+
// channel, but that's none of our concern.)
625+
shallReturn = _sessionManager.Lookup(frame.Channel).HandleFrame(in frame);
626+
}
628627
}
629628
}
630629

projects/RabbitMQ.Client/client/impl/Frame.cs

+67-53
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ internal static class Framing
4545
* +------------+---------+----------------+---------+------------------+
4646
* | 1 byte | 2 bytes | 4 bytes | x bytes | 1 byte |
4747
* +------------+---------+----------------+---------+------------------+ */
48-
private const int BaseFrameSize = 1 + 2 + 4 + 1;
49-
private const int StartPayload = 7;
48+
internal const int BaseFrameSize = 1 + 2 + 4 + 1;
49+
internal const int StartPayload = 7;
5050

5151
[MethodImpl(MethodImplOptions.AggressiveInlining)]
5252
private static int WriteBaseFrame(Span<byte> span, FrameType type, ushort channel, int payloadLength)
@@ -152,90 +152,104 @@ private InboundFrame(FrameType type, int channel, ReadOnlyMemory<byte> payload,
152152
_rentedArray = rentedArray;
153153
}
154154

155-
private static void ProcessProtocolHeader(ReadOnlySpan<byte> protocolError)
156-
{
157-
byte b1 = protocolError[0];
158-
byte b2 = protocolError[1];
159-
byte b3 = protocolError[2];
160-
if (b1 != 'M' || b2 != 'Q' || b3 != 'P')
161-
{
162-
throw new MalformedFrameException("Invalid AMQP protocol header from server");
163-
}
164-
165-
int transportHigh = protocolError[3];
166-
int transportLow = protocolError[4];
167-
int serverMajor = protocolError[5];
168-
int serverMinor = protocolError[6];
169-
throw new PacketNotRecognizedException(transportHigh, transportLow, serverMajor, serverMinor);
170-
}
171-
172155
internal static bool TryReadFrame(ref ReadOnlySequence<byte> buffer, out InboundFrame frame)
173156
{
174-
// We'll always need to read at least 8 bytes (type (1) + channel (2) + payloadSize (4) + end marker (1)) or (8 bytes of protocol error, see ProcessProtocolHeader).
175-
if (buffer.Length < 8)
157+
// We'll always need to read at least 8 bytes (the Framing.BaseFrameSize) or 8 bytes of protocol error (see ProcessProtocolHeader).
158+
if (buffer.Length < Framing.BaseFrameSize)
176159
{
177160
frame = default;
178161
return false;
179162
}
180163

164+
// Let's first check if we have a protocol header.
181165
if (buffer.First.Span[0] == 'A')
182166
{
183-
// Probably an AMQP protocol header, otherwise meaningless
184-
if (buffer.First.Length >= 8)
185-
{
186-
ProcessProtocolHeader(buffer.First.Span.Slice(1, 7));
187-
}
188-
else
189-
{
190-
Span<byte> protocolError = stackalloc byte[7];
191-
buffer.Slice(1, 7).CopyTo(protocolError);
192-
ProcessProtocolHeader(protocolError);
193-
}
167+
ProcessProtocolHeader(buffer);
194168
}
195169

196170
int type = buffer.First.Span[0];
197-
int channel;
198-
int payloadSize;
171+
ParseFrameHeader(buffer, out int channel, out int payloadSize);
199172

200-
if (buffer.First.Length >= 7)
201-
{
202-
channel = NetworkOrderDeserializer.ReadUInt16(buffer.First.Span.Slice(1, 2));
203-
payloadSize = NetworkOrderDeserializer.ReadInt32(buffer.First.Span.Slice(3, 4)); // FIXME - throw exn on unreasonable value
204-
}
205-
else
206-
{
207-
Span<byte> headerBytes = stackalloc byte[6];
208-
buffer.Slice(1, 6).CopyTo(headerBytes);
209-
channel = NetworkOrderDeserializer.ReadUInt16(headerBytes.Slice(0, 2));
210-
payloadSize = NetworkOrderDeserializer.ReadInt32(headerBytes.Slice(2, 4)); // FIXME - throw exn on unreasonable value
211-
}
173+
// We'll need to read the payloadSize + FrameEndMarker (1 byte)
174+
int readSize = payloadSize + 1;
212175

213-
const int EndMarkerLength = 1;
214-
int readSize = payloadSize + EndMarkerLength;
215-
216-
// Do we have enough bytes to read an entire frame (type + channel + payloadSize + payload + end marker)
217-
if (buffer.Length < (7 + readSize))
176+
// Do we have enough bytes to read the rest of the frame
177+
if (buffer.Length < (Framing.StartPayload + readSize))
218178
{
219179
frame = default;
220180
return false;
221181
}
222182

223-
// Is returned by InboundFrame.ReturnPayload in Connection.MainLoopIteration
183+
// The rented array is returned by InboundFrame.ReturnPayload in Connection.MainLoopIteration
224184
byte[] payloadBytes = ArrayPool<byte>.Shared.Rent(readSize);
225185
Memory<byte> payloadMemory = payloadBytes.AsMemory(0, readSize);
226186
ReadOnlySequence<byte> payloadSlice = buffer.Slice(7, readSize);
227187
payloadSlice.CopyTo(payloadMemory.Span);
188+
189+
// Let's validate that the frame contains a valid Frame End Marker
228190
if (payloadBytes[payloadSize] != Constants.FrameEnd)
229191
{
230192
ArrayPool<byte>.Shared.Return(payloadBytes);
231193
throw new MalformedFrameException($"Bad frame end marker: {payloadBytes[payloadSize]}");
232194
}
233195

234-
buffer = buffer.Slice(payloadSlice.End);
196+
// We have a frame!
235197
frame = new InboundFrame((FrameType)type, channel, payloadMemory.Slice(0, payloadSize), payloadBytes);
198+
199+
// Update the buffer
200+
buffer = buffer.Slice(payloadSlice.End);
236201
return true;
237202
}
238203

204+
private static void ProcessProtocolHeader(ReadOnlySequence<byte> buffer)
205+
{
206+
if (buffer.First.Length >= 8)
207+
{
208+
EvaluateProtocolError(buffer.First.Span.Slice(1, 7));
209+
}
210+
else
211+
{
212+
Span<byte> tempBuffer = stackalloc byte[7];
213+
buffer.Slice(1, 7).CopyTo(tempBuffer);
214+
EvaluateProtocolError(tempBuffer);
215+
}
216+
217+
void EvaluateProtocolError(ReadOnlySpan<byte> protocolError)
218+
{
219+
byte b1 = protocolError[0];
220+
byte b2 = protocolError[1];
221+
byte b3 = protocolError[2];
222+
if (b1 != 'M' || b2 != 'Q' || b3 != 'P')
223+
{
224+
throw new MalformedFrameException("Invalid AMQP protocol header from server");
225+
}
226+
227+
int transportHigh = protocolError[3];
228+
int transportLow = protocolError[4];
229+
int serverMajor = protocolError[5];
230+
int serverMinor = protocolError[6];
231+
throw new PacketNotRecognizedException(transportHigh, transportLow, serverMajor, serverMinor);
232+
}
233+
}
234+
235+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
236+
237+
private static void ParseFrameHeader(ReadOnlySequence<byte> buffer, out int channel, out int payloadSize)
238+
{
239+
if (buffer.First.Length >= 7)
240+
{
241+
channel = NetworkOrderDeserializer.ReadUInt16(buffer.First.Span.Slice(1, 2));
242+
payloadSize = NetworkOrderDeserializer.ReadInt32(buffer.First.Span.Slice(3, 4)); // FIXME - throw exn on unreasonable value
243+
}
244+
else
245+
{
246+
Span<byte> headerBytes = stackalloc byte[6];
247+
buffer.Slice(1, 6).CopyTo(headerBytes);
248+
channel = NetworkOrderDeserializer.ReadUInt16(headerBytes.Slice(0, 2));
249+
payloadSize = NetworkOrderDeserializer.ReadInt32(headerBytes.Slice(2, 4)); // FIXME - throw exn on unreasonable value
250+
}
251+
}
252+
239253
public byte[] TakeoverPayload()
240254
{
241255
return _rentedArray;

0 commit comments

Comments
 (0)