Skip to content

Reader / Writer improvements #857

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 24 additions & 5 deletions projects/Apigen/apigen/Apigen.cs
Original file line number Diff line number Diff line change
Expand Up @@ -855,9 +855,28 @@ public void EmitClassMethodImplementations(AmqpClass c)
EmitLine("");
EmitLine(" public override void WriteArgumentsTo(ref Client.Impl.MethodArgumentWriter writer)");
EmitLine(" {");
var lastWasBitClass = false;
foreach (AmqpField f in m.m_Fields)
{
EmitLine($" writer.Write{MangleClass(ResolveDomain(f.Domain))}(_{MangleMethod(f.Name)});");
string mangleClass = MangleClass(ResolveDomain(f.Domain));
if (mangleClass != "Bit")
{
if (lastWasBitClass)
{
EmitLine($" writer.EndBits();");
lastWasBitClass = false;
}
}
else
{
lastWasBitClass = true;
}

EmitLine($" writer.Write{mangleClass}(_{MangleMethod(f.Name)});");
}
if (lastWasBitClass)
{
EmitLine($" writer.EndBits();");
}
EmitLine(" }");
EmitLine("");
Expand Down Expand Up @@ -933,14 +952,14 @@ public void EmitClassMethodImplementations(AmqpClass c)

public void EmitMethodArgumentReader()
{
EmitLine(" internal override Client.Impl.MethodBase DecodeMethodFrom(ReadOnlyMemory<byte> memory)");
EmitLine(" internal override Client.Impl.MethodBase DecodeMethodFrom(ReadOnlySpan<byte> span)");
EmitLine(" {");
EmitLine(" ushort classId = Util.NetworkOrderDeserializer.ReadUInt16(memory.Span);");
EmitLine(" ushort methodId = Util.NetworkOrderDeserializer.ReadUInt16(memory.Slice(2).Span);");
EmitLine(" ushort classId = Util.NetworkOrderDeserializer.ReadUInt16(span);");
EmitLine(" ushort methodId = Util.NetworkOrderDeserializer.ReadUInt16(span.Slice(2));");
EmitLine(" Client.Impl.MethodBase result = DecodeMethodFrom(classId, methodId);");
EmitLine(" if(result != null)");
EmitLine(" {");
EmitLine(" Client.Impl.MethodArgumentReader reader = new Client.Impl.MethodArgumentReader(memory.Slice(4));");
EmitLine(" Client.Impl.MethodArgumentReader reader = new Client.Impl.MethodArgumentReader(span.Slice(4));");
EmitLine(" result.ReadArgumentsFrom(ref reader);");
EmitLine(" return result;");
EmitLine(" }");
Expand Down
1 change: 1 addition & 0 deletions projects/RabbitMQ.Client/RabbitMQ.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<MinVerVerbosity>minimal</MinVerVerbosity>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<PackageOutputPath>..\..\packages</PackageOutputPath>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>

<PropertyGroup Condition="'$(CONCOURSE_CI_BUILD)' == 'true'">
Expand Down
22 changes: 0 additions & 22 deletions projects/RabbitMQ.Client/client/impl/Command.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,6 @@ class Command : IDisposable
private const int EmptyFrameSize = 8;
private readonly bool _returnBufferOnDispose;

static Command()
{
CheckEmptyFrameSize();
}

internal Command(MethodBase method) : this(method, null, null, false)
{
}
Expand All @@ -80,23 +75,6 @@ public Command(MethodBase method, ContentHeaderBase header, ReadOnlyMemory<byte>

internal MethodBase Method { get; private set; }

public static void CheckEmptyFrameSize()
{
var f = new EmptyOutboundFrame();
byte[] b = new byte[f.GetMinimumBufferSize()];
f.WriteTo(b);
long actualLength = f.ByteCount;

if (EmptyFrameSize != actualLength)
{
string message =
string.Format("EmptyFrameSize is incorrect - defined as {0} where the computed value is in fact {1}.",
EmptyFrameSize,
actualLength);
throw new ProtocolViolationException(message);
}
}

internal void Transmit(int channelNumber, Connection connection)
{
connection.WriteFrame(new MethodOutboundFrame(channelNumber, Method));
Expand Down
8 changes: 5 additions & 3 deletions projects/RabbitMQ.Client/client/impl/CommandAssembler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,18 @@ public Command HandleFrame(in InboundFrame f)
{
throw new UnexpectedFrameException(f.Type);
}
m_method = m_protocol.DecodeMethodFrom(f.Payload);
m_method = m_protocol.DecodeMethodFrom(f.Payload.Span);
m_state = m_method.HasContent ? AssemblyState.ExpectingContentHeader : AssemblyState.Complete;
return CompletedCommand();
case AssemblyState.ExpectingContentHeader:
if (!f.IsHeader())
{
throw new UnexpectedFrameException(f.Type);
}
m_header = m_protocol.DecodeContentHeaderFrom(NetworkOrderDeserializer.ReadUInt16(f.Payload.Span));
ulong totalBodyBytes = m_header.ReadFrom(f.Payload.Slice(2));

ReadOnlySpan<byte> span = f.Payload.Span;
m_header = m_protocol.DecodeContentHeaderFrom(NetworkOrderDeserializer.ReadUInt16(span));
ulong totalBodyBytes = m_header.ReadFrom(span.Slice(2));
if (totalBodyBytes > MaxArrayOfBytesSize)
{
throw new UnexpectedFrameException(f.Type);
Expand Down
14 changes: 7 additions & 7 deletions projects/RabbitMQ.Client/client/impl/ContentHeaderBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ public virtual object Clone()
///<summary>
/// Fill this instance from the given byte buffer stream.
///</summary>
internal ulong ReadFrom(ReadOnlyMemory<byte> memory)
internal ulong ReadFrom(ReadOnlySpan<byte> span)
{
// Skipping the first two bytes since they arent used (weight - not currently used)
ulong bodySize = NetworkOrderDeserializer.ReadUInt64(memory.Slice(2).Span);
ContentHeaderPropertyReader reader = new ContentHeaderPropertyReader(memory.Slice(10));
ulong bodySize = NetworkOrderDeserializer.ReadUInt64(span.Slice(2));
ContentHeaderPropertyReader reader = new ContentHeaderPropertyReader(span.Slice(10));
ReadPropertiesFrom(ref reader);
return bodySize;
}
Expand All @@ -81,12 +81,12 @@ internal ulong ReadFrom(ReadOnlyMemory<byte> memory)

private const ushort ZERO = 0;

internal int WriteTo(Memory<byte> memory, ulong bodySize)
internal int WriteTo(Span<byte> span, ulong bodySize)
{
NetworkOrderSerializer.WriteUInt16(memory.Span, ZERO); // Weight - not used
NetworkOrderSerializer.WriteUInt64(memory.Slice(2).Span, bodySize);
NetworkOrderSerializer.WriteUInt16(span, ZERO); // Weight - not used
NetworkOrderSerializer.WriteUInt64(span.Slice(2), bodySize);

ContentHeaderPropertyWriter writer = new ContentHeaderPropertyWriter(memory.Slice(10));
ContentHeaderPropertyWriter writer = new ContentHeaderPropertyWriter(span.Slice(10));
WritePropertiesTo(ref writer);
return 10 + writer.Offset;
}
Expand Down
77 changes: 39 additions & 38 deletions projects/RabbitMQ.Client/client/impl/ContentHeaderPropertyReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,26 +45,28 @@

namespace RabbitMQ.Client.Impl
{
internal struct ContentHeaderPropertyReader
internal ref struct ContentHeaderPropertyReader
{
private ushort m_bitCount;
private ushort m_flagWord;
private int _memoryOffset;
private readonly ReadOnlyMemory<byte> _memory;
private const int StartBitMask = 0b1000_0000_0000_0000;
private const int EndBitMask = 0b0000_0000_0000_0001;

public ContentHeaderPropertyReader(ReadOnlyMemory<byte> memory)
{
_memory = memory;
_memoryOffset = 0;
m_flagWord = 1; // just the continuation bit
m_bitCount = 15; // the correct position to force a m_flagWord read
}
private readonly ReadOnlySpan<byte> _span;
private int _offset;
private int _bitMask;
private int _bits;

public bool ContinuationBitSet
private ReadOnlySpan<byte> Span => _span.Slice(_offset);

public ContentHeaderPropertyReader(ReadOnlySpan<byte> span)
{
get { return (m_flagWord & 1) != 0; }
_span = span;
_offset = 0;
_bitMask = EndBitMask; // force a flag read
_bits = 1; // just the continuation bit
}

private bool ContinuationBitSet => (_bits & EndBitMask) != 0;

public void FinishPresence()
{
if (ContinuationBitSet)
Expand All @@ -78,82 +80,81 @@ public bool ReadBit()
return ReadPresence();
}

public void ReadFlagWord()
private void ReadBits()
{
if (!ContinuationBitSet)
{
throw new MalformedFrameException("Attempted to read flag word when none advertised");
}
m_flagWord = NetworkOrderDeserializer.ReadUInt16(_memory.Slice(_memoryOffset).Span);
_memoryOffset += 2;
m_bitCount = 0;
_bits = NetworkOrderDeserializer.ReadUInt16(Span);
_offset += 2;
_bitMask = StartBitMask;
}

public uint ReadLong()
{
uint result = NetworkOrderDeserializer.ReadUInt32(_memory.Slice(_memoryOffset).Span);
_memoryOffset += 4;
uint result = NetworkOrderDeserializer.ReadUInt32(Span);
_offset += 4;
return result;
}

public ulong ReadLonglong()
{
ulong result = NetworkOrderDeserializer.ReadUInt64(_memory.Slice(_memoryOffset).Span);
_memoryOffset += 8;
ulong result = NetworkOrderDeserializer.ReadUInt64(Span);
_offset += 8;
return result;
}

public byte[] ReadLongstr()
{
byte[] result = WireFormatting.ReadLongstr(_memory.Slice(_memoryOffset));
_memoryOffset += 4 + result.Length;
byte[] result = WireFormatting.ReadLongstr(Span);
_offset += 4 + result.Length;
return result;
}

public byte ReadOctet()
{
return _memory.Span[_memoryOffset++];
return _span[_offset++];
}

public bool ReadPresence()
{
if (m_bitCount == 15)
if (_bitMask == EndBitMask)
{
ReadFlagWord();
ReadBits();
}

int bit = 15 - m_bitCount;
bool result = (m_flagWord & (1 << bit)) != 0;
m_bitCount++;
bool result = (_bits & _bitMask) != 0;
_bitMask >>= 1;
return result;
}

public ushort ReadShort()
{
ushort result = NetworkOrderDeserializer.ReadUInt16(_memory.Slice(_memoryOffset).Span);
_memoryOffset += 2;
ushort result = NetworkOrderDeserializer.ReadUInt16(Span);
_offset += 2;
return result;
}

public string ReadShortstr()
{
string result = WireFormatting.ReadShortstr(_memory.Slice(_memoryOffset), out int bytesRead);
_memoryOffset += bytesRead;
string result = WireFormatting.ReadShortstr(Span, out int bytesRead);
_offset += bytesRead;
return result;
}

/// <returns>A type of <seealso cref="System.Collections.Generic.IDictionary{TKey,TValue}"/>.</returns>
public Dictionary<string, object> ReadTable()
{
Dictionary<string, object> result = WireFormatting.ReadTable(_memory.Slice(_memoryOffset), out int bytesRead);
_memoryOffset += bytesRead;
Dictionary<string, object> result = WireFormatting.ReadTable(Span, out int bytesRead);
_offset += bytesRead;
return result;
}

public AmqpTimestamp ReadTimestamp()
{
AmqpTimestamp result = WireFormatting.ReadTimestamp(_memory.Slice(_memoryOffset));
_memoryOffset += 8;
AmqpTimestamp result = WireFormatting.ReadTimestamp(Span);
_offset += 8;
return result;
}
}
Expand Down
Loading