Skip to content

Commit 267fcd2

Browse files
committed
Merge pull request #878 from bollhals/868
fix issue 868 (cherry picked from commit a654b1e)
1 parent 19471e2 commit 267fcd2

28 files changed

+901
-593
lines changed

projects/Apigen/apigen/Apigen.cs

+24-5
Original file line numberDiff line numberDiff line change
@@ -855,9 +855,28 @@ public void EmitClassMethodImplementations(AmqpClass c)
855855
EmitLine("");
856856
EmitLine(" public override void WriteArgumentsTo(ref Client.Impl.MethodArgumentWriter writer)");
857857
EmitLine(" {");
858+
var lastWasBitClass = false;
858859
foreach (AmqpField f in m.m_Fields)
859860
{
860-
EmitLine($" writer.Write{MangleClass(ResolveDomain(f.Domain))}(_{MangleMethod(f.Name)});");
861+
string mangleClass = MangleClass(ResolveDomain(f.Domain));
862+
if (mangleClass != "Bit")
863+
{
864+
if (lastWasBitClass)
865+
{
866+
EmitLine($" writer.EndBits();");
867+
lastWasBitClass = false;
868+
}
869+
}
870+
else
871+
{
872+
lastWasBitClass = true;
873+
}
874+
875+
EmitLine($" writer.Write{mangleClass}(_{MangleMethod(f.Name)});");
876+
}
877+
if (lastWasBitClass)
878+
{
879+
EmitLine($" writer.EndBits();");
861880
}
862881
EmitLine(" }");
863882
EmitLine("");
@@ -933,14 +952,14 @@ public void EmitClassMethodImplementations(AmqpClass c)
933952

934953
public void EmitMethodArgumentReader()
935954
{
936-
EmitLine(" internal override Client.Impl.MethodBase DecodeMethodFrom(ReadOnlyMemory<byte> memory)");
955+
EmitLine(" internal override Client.Impl.MethodBase DecodeMethodFrom(ReadOnlySpan<byte> span)");
937956
EmitLine(" {");
938-
EmitLine(" ushort classId = Util.NetworkOrderDeserializer.ReadUInt16(memory.Span);");
939-
EmitLine(" ushort methodId = Util.NetworkOrderDeserializer.ReadUInt16(memory.Slice(2).Span);");
957+
EmitLine(" ushort classId = Util.NetworkOrderDeserializer.ReadUInt16(span);");
958+
EmitLine(" ushort methodId = Util.NetworkOrderDeserializer.ReadUInt16(span.Slice(2));");
940959
EmitLine(" Client.Impl.MethodBase result = DecodeMethodFrom(classId, methodId);");
941960
EmitLine(" if(result != null)");
942961
EmitLine(" {");
943-
EmitLine(" Client.Impl.MethodArgumentReader reader = new Client.Impl.MethodArgumentReader(memory.Slice(4));");
962+
EmitLine(" Client.Impl.MethodArgumentReader reader = new Client.Impl.MethodArgumentReader(span.Slice(4));");
944963
EmitLine(" result.ReadArgumentsFrom(ref reader);");
945964
EmitLine(" return result;");
946965
EmitLine(" }");

projects/RabbitMQ.Client/RabbitMQ.Client.csproj

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
<MinVerVerbosity>minimal</MinVerVerbosity>
2727
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
2828
<PackageOutputPath>..\..\packages</PackageOutputPath>
29+
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
2930
</PropertyGroup>
3031

3132
<PropertyGroup Condition="'$(CONCOURSE_CI_BUILD)' == 'true'">

projects/RabbitMQ.Client/client/impl/Command.cs

+42-30
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,7 @@
4040

4141
using System;
4242
using System.Buffers;
43-
using System.Collections.Generic;
4443
using System.Runtime.InteropServices;
45-
using RabbitMQ.Client.Exceptions;
4644
using RabbitMQ.Client.Framing.Impl;
4745

4846
namespace RabbitMQ.Client.Impl
@@ -57,11 +55,6 @@ class Command : IDisposable
5755
private const int EmptyFrameSize = 8;
5856
private readonly bool _returnBufferOnDispose;
5957

60-
static Command()
61-
{
62-
CheckEmptyFrameSize();
63-
}
64-
6558
internal Command(MethodBase method) : this(method, null, null, false)
6659
{
6760
}
@@ -80,38 +73,57 @@ public Command(MethodBase method, ContentHeaderBase header, ReadOnlyMemory<byte>
8073

8174
internal MethodBase Method { get; private set; }
8275

83-
public static void CheckEmptyFrameSize()
76+
internal void Transmit(ushort channelNumber, Connection connection)
8477
{
85-
var f = new EmptyOutboundFrame();
86-
byte[] b = new byte[f.GetMinimumBufferSize()];
87-
f.WriteTo(b);
88-
long actualLength = f.ByteCount;
78+
int maxBodyPayloadBytes = (int)(connection.FrameMax == 0 ? int.MaxValue : connection.FrameMax - EmptyFrameSize);
79+
var size = GetMaxSize(maxBodyPayloadBytes);
8980

90-
if (EmptyFrameSize != actualLength)
81+
// Will be returned by SocketFrameWriter.WriteLoop
82+
var memory = new Memory<byte>(ArrayPool<byte>.Shared.Rent(size), 0, size);
83+
var span = memory.Span;
84+
85+
var offset = Framing.Method.WriteTo(span, channelNumber, Method);
86+
if (Method.HasContent)
9187
{
92-
string message =
93-
string.Format("EmptyFrameSize is incorrect - defined as {0} where the computed value is in fact {1}.",
94-
EmptyFrameSize,
95-
actualLength);
96-
throw new ProtocolViolationException(message);
88+
int remainingBodyBytes = Body.Length;
89+
offset += Framing.Header.WriteTo(span.Slice(offset), channelNumber, Header, remainingBodyBytes);
90+
var bodySpan = Body.Span;
91+
while (remainingBodyBytes > 0)
92+
{
93+
int frameSize = remainingBodyBytes > maxBodyPayloadBytes ? maxBodyPayloadBytes : remainingBodyBytes;
94+
offset += Framing.BodySegment.WriteTo(span.Slice(offset), channelNumber, bodySpan.Slice(bodySpan.Length - remainingBodyBytes, frameSize));
95+
remainingBodyBytes -= frameSize;
96+
}
9797
}
98+
99+
if (offset != size)
100+
{
101+
throw new InvalidOperationException($"Serialized to wrong size, expect {size}, offset {offset}");
102+
}
103+
104+
connection.Write(memory);
98105
}
99106

100-
internal void Transmit(int channelNumber, Connection connection)
107+
private int GetMaxSize(int maxPayloadBytes)
101108
{
102-
connection.WriteFrame(new MethodOutboundFrame(channelNumber, Method));
103-
if (Method.HasContent)
109+
if (!Method.HasContent)
104110
{
105-
connection.WriteFrame(new HeaderOutboundFrame(channelNumber, Header, Body.Length));
106-
int frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax);
107-
int bodyPayloadMax = (frameMax == 0) ? Body.Length : frameMax - EmptyFrameSize;
108-
for (int offset = 0; offset < Body.Length; offset += bodyPayloadMax)
109-
{
110-
int remaining = Body.Length - offset;
111-
int count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
112-
connection.WriteFrame(new BodySegmentOutboundFrame(channelNumber, Body.Slice(offset, count)));
113-
}
111+
return Framing.Method.FrameSize + Method.GetRequiredBufferSize();
112+
}
113+
114+
return Framing.Method.FrameSize + Method.GetRequiredBufferSize() +
115+
Framing.Header.FrameSize + Header.GetRequiredPayloadBufferSize() +
116+
Framing.BodySegment.FrameSize * GetBodyFrameCount(maxPayloadBytes) + Body.Length;
117+
}
118+
119+
private int GetBodyFrameCount(int maxPayloadBytes)
120+
{
121+
if (maxPayloadBytes == int.MaxValue)
122+
{
123+
return 1;
114124
}
125+
126+
return (Body.Length + maxPayloadBytes - 1) / maxPayloadBytes;
115127
}
116128

117129
public void Dispose()

projects/RabbitMQ.Client/client/impl/CommandAssembler.cs

+8-3
Original file line numberDiff line numberDiff line change
@@ -81,22 +81,27 @@ public Command HandleFrame(in InboundFrame f)
8181
{
8282
throw new UnexpectedFrameException(f.Type);
8383
}
84-
m_method = m_protocol.DecodeMethodFrom(f.Payload);
84+
m_method = m_protocol.DecodeMethodFrom(f.Payload.Span);
8585
m_state = m_method.HasContent ? AssemblyState.ExpectingContentHeader : AssemblyState.Complete;
8686
return CompletedCommand();
8787
case AssemblyState.ExpectingContentHeader:
8888
if (!f.IsHeader())
8989
{
9090
throw new UnexpectedFrameException(f.Type);
9191
}
92-
m_header = m_protocol.DecodeContentHeaderFrom(NetworkOrderDeserializer.ReadUInt16(f.Payload.Span));
93-
ulong totalBodyBytes = m_header.ReadFrom(f.Payload.Slice(2));
92+
93+
ReadOnlySpan<byte> span = f.Payload.Span;
94+
m_header = m_protocol.DecodeContentHeaderFrom(NetworkOrderDeserializer.ReadUInt16(span));
95+
m_header.ReadFrom(span.Slice(12));
96+
ulong totalBodyBytes = NetworkOrderDeserializer.ReadUInt64(span.Slice(4));
9497
if (totalBodyBytes > MaxArrayOfBytesSize)
9598
{
9699
throw new UnexpectedFrameException(f.Type);
97100
}
98101

99102
m_remainingBodyBytes = (int)totalBodyBytes;
103+
104+
// Is returned by Command.Dispose in Session.HandleFrame
100105
byte[] bodyBytes = ArrayPool<byte>.Shared.Rent(m_remainingBodyBytes);
101106
m_body = new Memory<byte>(bodyBytes, 0, m_remainingBodyBytes);
102107
UpdateContentBodyState();

projects/RabbitMQ.Client/client/impl/Connection.cs

+4-4
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
//---------------------------------------------------------------------------
4040

4141
using System;
42+
using System.Buffers;
4243
using System.Collections.Generic;
4344
using System.IO;
4445
using System.Net;
@@ -62,7 +63,6 @@ internal sealed class Connection : IConnection
6263
private readonly object _eventLock = new object();
6364

6465
///<summary>Heartbeat frame for transmission. Reusable across connections.</summary>
65-
private readonly EmptyOutboundFrame _heartbeatFrame = new EmptyOutboundFrame();
6666

6767
private readonly ManualResetEventSlim _appContinuation = new ManualResetEventSlim(false);
6868

@@ -903,7 +903,7 @@ public void HeartbeatWriteTimerCallback(object state)
903903
{
904904
if (!_closed)
905905
{
906-
WriteFrame(_heartbeatFrame);
906+
Write(Client.Impl.Framing.Heartbeat.GetHeartbeatFrame());
907907
_heartbeatWriteTimer?.Change((int)_heartbeatTimeSpan.TotalMilliseconds, Timeout.Infinite);
908908
}
909909
}
@@ -940,9 +940,9 @@ public override string ToString()
940940
return string.Format("Connection({0},{1})", _id, Endpoint);
941941
}
942942

943-
public void WriteFrame(OutboundFrame f)
943+
public void Write(Memory<byte> memory)
944944
{
945-
_frameHandler.WriteFrame(f);
945+
_frameHandler.Write(memory);
946946
}
947947

948948
public void UpdateSecret(string newSecret, string reason)

projects/RabbitMQ.Client/client/impl/ContentHeaderBase.cs

+2-24
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@
4141
using System;
4242
using System.Text;
4343

44-
using RabbitMQ.Util;
45-
4644
namespace RabbitMQ.Client.Impl
4745
{
4846
abstract class ContentHeaderBase : IContentHeader
@@ -67,35 +65,15 @@ public virtual object Clone()
6765
///<summary>
6866
/// Fill this instance from the given byte buffer stream.
6967
///</summary>
70-
internal ulong ReadFrom(ReadOnlyMemory<byte> memory)
68+
internal void ReadFrom(ReadOnlySpan<byte> span)
7169
{
72-
// Skipping the first two bytes since they arent used (weight - not currently used)
73-
ulong bodySize = NetworkOrderDeserializer.ReadUInt64(memory.Slice(2).Span);
74-
ContentHeaderPropertyReader reader = new ContentHeaderPropertyReader(memory.Slice(10));
70+
ContentHeaderPropertyReader reader = new ContentHeaderPropertyReader(span);
7571
ReadPropertiesFrom(ref reader);
76-
return bodySize;
7772
}
7873

7974
internal abstract void ReadPropertiesFrom(ref ContentHeaderPropertyReader reader);
8075
internal abstract void WritePropertiesTo(ref ContentHeaderPropertyWriter writer);
8176

82-
private const ushort ZERO = 0;
83-
84-
internal int WriteTo(Memory<byte> memory, ulong bodySize)
85-
{
86-
NetworkOrderSerializer.WriteUInt16(memory.Span, ZERO); // Weight - not used
87-
NetworkOrderSerializer.WriteUInt64(memory.Slice(2).Span, bodySize);
88-
89-
ContentHeaderPropertyWriter writer = new ContentHeaderPropertyWriter(memory.Slice(10));
90-
WritePropertiesTo(ref writer);
91-
return 10 + writer.Offset;
92-
}
93-
public int GetRequiredBufferSize()
94-
{
95-
// The first 10 bytes are the Weight (2 bytes) + body size (8 bytes)
96-
return 10 + GetRequiredPayloadBufferSize();
97-
}
98-
9977
public abstract int GetRequiredPayloadBufferSize();
10078
}
10179
}

0 commit comments

Comments
 (0)