Skip to content

Commit e59b7f3

Browse files
Adding RSocketProtocol.All.WriteFlush() methods to help with proper pipeline flushing for single messages. Changing the non-mutating Protocol.ReadXXX methods from "ref SequenceReader" to "in SequenceReader" - the test methods want a mutating one, so this clarifies that they do not advance the SequenceReader. On the decoding-side, apparently Setup frames can have Metadata, so adding that. This must be from the very first days of the project to have been missed... Adding TODO to allow the ProtocolHandler to extract and pass those (but not in this commit as it's more invasive). Possibly should review the spec to ensure that all Data/Metadata containing messages have the appropriate dispatching. Adding back the ProtocolTests class with placeholders for other message types (more to come).
1 parent 7644d93 commit e59b7f3

File tree

5 files changed

+144
-57
lines changed

5 files changed

+144
-57
lines changed

RSocket.Core.Tests/ProtocolTests.cs

Lines changed: 86 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,27 @@
1+
using System;
2+
using System.Buffers;
3+
using System.IO.Pipelines;
4+
using System.Threading.Tasks;
15
using Microsoft.VisualStudio.TestTools.UnitTesting;
26

37
namespace RSocket.Tests
48
{
59
[TestClass]
610
public class ProtocolTests
711
{
12+
Pipe Pipe = new Pipe();
13+
ReadOnlySequence<byte> Encode(string value) => new ReadOnlySequence<byte>(System.Text.Encoding.UTF8.GetBytes(value));
14+
string Decode(ReadOnlySequence<byte> value) => System.Text.Encoding.UTF8.GetString(value.ToArray());
15+
void AssertReaderEmpty() => Assert.IsFalse(Pipe.Reader.TryRead(out var readresult), "Reader not empty!");
16+
17+
818
[TestMethod]
919
public void SimpleInvariantsTest()
1020
{
1121
Assert.AreEqual(0, RSocketProtocol.Header.DEFAULT_STREAM, "Default Stream should always be zero.");
12-
1322
Assert.AreEqual(1, RSocketProtocol.MAJOR_VERSION, nameof(RSocketProtocol.MAJOR_VERSION));
1423
Assert.AreEqual(0, RSocketProtocol.MINOR_VERSION, nameof(RSocketProtocol.MINOR_VERSION));
15-
}
16-
17-
[TestMethod]
18-
public void StateMachineBasicTest()
19-
{
20-
}
21-
22-
23-
[TestMethod]
24-
public void SetupValidationTest()
25-
{
24+
AssertReaderEmpty();
2625
}
2726

2827
[TestMethod]
@@ -84,5 +83,79 @@ public void ExtensionValidationTest()
8483
{
8584
}
8685

86+
[TestMethod]
87+
public void SetupValidationTest()
88+
{
89+
var message = Write(new RSocketProtocol.Setup(123, 456, nameof(RSocketProtocol.Setup.MetadataMimeType), nameof(RSocketProtocol.Setup.DataMimeType)));
90+
var actual = ReadSetup();
91+
Assert.AreEqual(message.MajorVersion, actual.MajorVersion, nameof(message.MajorVersion));
92+
Assert.AreEqual(message.MinorVersion, actual.MinorVersion, nameof(message.MinorVersion));
93+
Assert.AreEqual(message.KeepAlive, actual.KeepAlive, nameof(message.KeepAlive));
94+
Assert.AreEqual(message.Lifetime, actual.Lifetime, nameof(message.Lifetime));
95+
CollectionAssert.AreEqual((message.ResumeToken ?? new byte[0]), actual.ResumeToken, nameof(message.ResumeToken));
96+
Assert.AreEqual(message.MetadataMimeType, actual.MetadataMimeType, nameof(message.MetadataMimeType));
97+
Assert.AreEqual(message.DataMimeType, actual.DataMimeType, nameof(message.DataMimeType));
98+
AssertReaderEmpty();
99+
}
100+
101+
[TestMethod]
102+
public void SetupWithDataMetadataTest()
103+
{
104+
var test = (metadata: Encode("Test Metadata"), data: Encode("Test Data"));
105+
106+
Write(new RSocketProtocol.Setup(123, 456, nameof(RSocketProtocol.Setup.MetadataMimeType), nameof(RSocketProtocol.Setup.DataMimeType)));
107+
var Neither = ReadSetup(out var metadata, out var data);
108+
Assert.AreEqual(string.Empty, Decode(metadata), $"{nameof(Neither)} {nameof(metadata)} mismatch!");
109+
Assert.AreEqual(string.Empty, Decode(data), $"{nameof(Neither)} {nameof(data)} mismatch");
110+
AssertReaderEmpty();
111+
112+
Write(new RSocketProtocol.Setup(123, 456, nameof(RSocketProtocol.Setup.MetadataMimeType), nameof(RSocketProtocol.Setup.DataMimeType), data: test.data), data: test.data);
113+
var DataOnly = ReadSetup(out metadata, out data);
114+
Assert.AreEqual(string.Empty, Decode(metadata), $"{nameof(DataOnly)} {nameof(metadata)} mismatch!");
115+
Assert.AreEqual(Decode(test.data), Decode(data), $"{nameof(DataOnly)} {nameof(data)} mismatch");
116+
AssertReaderEmpty();
117+
118+
Write(new RSocketProtocol.Setup(123, 456, nameof(RSocketProtocol.Setup.MetadataMimeType), nameof(RSocketProtocol.Setup.DataMimeType), metadata: test.metadata), metadata: test.metadata);
119+
var MetadataOnly = ReadSetup(out metadata, out data);
120+
Assert.AreEqual(Decode(test.metadata), Decode(metadata), $"{nameof(MetadataOnly)} {nameof(metadata)} mismatch!");
121+
Assert.AreEqual(string.Empty, Decode(data), $"{nameof(MetadataOnly)} {nameof(data)} mismatch");
122+
AssertReaderEmpty();
123+
124+
Write(new RSocketProtocol.Setup(123, 456, nameof(RSocketProtocol.Setup.MetadataMimeType), nameof(RSocketProtocol.Setup.DataMimeType), metadata: test.metadata, data: test.data), metadata: test.metadata, data: test.data);
125+
var Both = ReadSetup(out metadata, out data);
126+
Assert.AreEqual(Decode(test.metadata), Decode(metadata), $"{nameof(Both)} {nameof(metadata)} mismatch!");
127+
Assert.AreEqual(Decode(test.data), Decode(data), $"{nameof(Both)} {nameof(data)} mismatch");
128+
AssertReaderEmpty();
129+
}
130+
131+
132+
#region Read/Write Helpers
133+
RSocketProtocol.Setup ReadSetup() => ReadSetup(out var metadata, out var data);
134+
RSocketProtocol.Setup ReadSetup(out ReadOnlySequence<byte> metadata, out ReadOnlySequence<byte> data) { var result = new RSocketProtocol.Setup(Read(out var reader, RSocketProtocol.Types.Setup), ref reader); result.Read(ref reader, out metadata, out data); Pipe.Reader.AdvanceTo(reader.Position); return result; }
135+
136+
RSocketProtocol.Header Read(out SequenceReader<byte> reader, RSocketProtocol.Types type)
137+
{
138+
Assert.IsTrue(Pipe.Reader.TryRead(out var result), "Failed to read Pipe.");
139+
var (Length, IsEndOfMessage) = RSocketProtocol.MessageFramePeek(result.Buffer);
140+
reader = new SequenceReader<byte>(result.Buffer.Slice(RSocketProtocol.MESSAGEFRAMESIZE, Length));
141+
var header = new RSocketProtocol.Header(ref reader, Length);
142+
Assert.AreEqual(type, header.Type, "Incorrect Message Type");
143+
return header;
144+
}
145+
146+
RSocketProtocol.Setup Write(RSocketProtocol.Setup message, ReadOnlySequence<byte> data = default, ReadOnlySequence<byte> metadata = default) { message.WriteFlush(Pipe.Writer, data: data, metadata: metadata).Wait(); return message; }
147+
//void Write(RSocketProtocol.Lease message) => message.WriteFlush(Writer).Wait();
148+
//void Write(RSocketProtocol.KeepAlive message) => message.WriteFlush(Writer).Wait();
149+
//void Write(RSocketProtocol.RequestResponse message) => message.WriteFlush(Writer).Wait();
150+
//void Write(RSocketProtocol.RequestFireAndForget message) => message.WriteFlush(Writer).Wait();
151+
//void Write(RSocketProtocol.RequestStream message) => message.WriteFlush(Writer).Wait();
152+
//void Write(RSocketProtocol.RequestChannel message) => message.WriteFlush(Writer).Wait();
153+
//void Write(RSocketProtocol.RequestN message) => message.WriteFlush(Writer).Wait();
154+
//void Write(RSocketProtocol.Cancel message) => message.WriteFlush(Writer).Wait();
155+
void Write(RSocketProtocol.Payload message) => message.WriteFlush(Pipe.Writer).Wait();
156+
//void Write(RSocketProtocol.Error message) => message.WriteFlush(Writer).Wait();
157+
//void Write(RSocketProtocol.MetadataPush message) => message.WriteFlush(Writer).Wait();
158+
//void Write(RSocketProtocol.Extension message) => message.WriteFlush(Writer).Wait();
159+
#endregion
87160
}
88-
}
161+
}

RSocket.Core.Tests/RSocket.Core.Tests.csproj

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,6 @@
88
<LangVersion>latest</LangVersion>
99
</PropertyGroup>
1010

11-
<ItemGroup>
12-
<Compile Remove="ProtocolTests.cs" />
13-
</ItemGroup>
14-
1511
<ItemGroup>
1612
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.9.0" />
1713
<PackageReference Include="MSTest.TestAdapter" Version="1.4.0" />

RSocket.Core/RSocketProtocol.Handler.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using System;
1+
using System;
22
using System.Buffers;
33
using System.Buffers.Binary;
44
using System.Collections.Generic;
@@ -62,7 +62,7 @@ Task Process(int framelength, ReadOnlySequence<byte> sequence)
6262
case Types.Reserved: throw new InvalidOperationException($"Protocol Reserved! [{header.Type}]");
6363
case Types.Setup:
6464
var setup = new Setup(header, ref reader);
65-
OnSetup(sink, setup);
65+
OnSetup(sink, setup); //TODO These can have metadata! , setup.ReadMetadata(ref reader), setup.ReadData(ref reader)););
6666
break;
6767
case Types.Lease:
6868
var lease = new Lease(header, ref reader);
@@ -72,19 +72,19 @@ Task Process(int framelength, ReadOnlySequence<byte> sequence)
7272
break;
7373
case Types.Request_Response:
7474
var requestresponse = new RequestResponse(header, ref reader);
75-
if (requestresponse.Validate()) { OnRequestResponse(sink, requestresponse, requestresponse.ReadMetadata(ref reader), requestresponse.ReadData(ref reader)); }
75+
if (requestresponse.Validate()) { OnRequestResponse(sink, requestresponse, requestresponse.ReadMetadata(reader), requestresponse.ReadData(reader)); }
7676
break;
7777
case Types.Request_Fire_And_Forget:
7878
var requestfireandforget = new RequestFireAndForget(header, ref reader);
79-
if (requestfireandforget.Validate()) { OnRequestFireAndForget(sink, requestfireandforget, requestfireandforget.ReadMetadata(ref reader), requestfireandforget.ReadData(ref reader)); }
79+
if (requestfireandforget.Validate()) { OnRequestFireAndForget(sink, requestfireandforget, requestfireandforget.ReadMetadata(reader), requestfireandforget.ReadData(reader)); }
8080
break;
8181
case Types.Request_Stream:
8282
var requeststream = new RequestStream(header, ref reader);
83-
if (requeststream.Validate()) { OnRequestStream(sink, requeststream, requeststream.ReadMetadata(ref reader), requeststream.ReadData(ref reader)); }
83+
if (requeststream.Validate()) { OnRequestStream(sink, requeststream, requeststream.ReadMetadata(reader), requeststream.ReadData(reader)); }
8484
break;
8585
case Types.Request_Channel:
8686
var requestchannel = new RequestChannel(header, ref reader);
87-
if (requestchannel.Validate()) { OnRequestChannel(sink, requestchannel, requestchannel.ReadMetadata(ref reader), requestchannel.ReadData(ref reader)); }
87+
if (requestchannel.Validate()) { OnRequestChannel(sink, requestchannel, requestchannel.ReadMetadata(reader), requestchannel.ReadData(reader)); }
8888
break;
8989
case Types.Request_N:
9090
var requestne = new RequestN(header, ref reader);
@@ -97,7 +97,7 @@ Task Process(int framelength, ReadOnlySequence<byte> sequence)
9797
Decoded(payload.ToString());
9898
if (payload.Validate())
9999
{
100-
OnPayload(sink, payload, payload.ReadMetadata(ref reader), payload.ReadData(ref reader));
100+
OnPayload(sink, payload, payload.ReadMetadata(reader), payload.ReadData(reader));
101101
//reader.Sequence.Slice(reader.Position, payload.MetadataLength), reader.Sequence.Slice(reader.Sequence.GetPosition(payload.MetadataLength, reader.Position), payload.DataLength));
102102
}
103103
break;

RSocket.Core/RSocketProtocol.StateMachine.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using System;
1+
using System;
22
using System.Buffers;
33
using System.Buffers.Binary;
44
using System.Collections.Generic;
@@ -85,19 +85,19 @@ void Process(int framelength, ReadOnlySequence<byte> sequence)
8585
break;
8686
case Types.Request_Response:
8787
var requestresponse = new RequestResponse(header, ref reader);
88-
if (requestresponse.Validate()) { OnRequestResponse(sink, requestresponse, requestresponse.ReadMetadata(ref reader), requestresponse.ReadData(ref reader)); }
88+
if (requestresponse.Validate()) { OnRequestResponse(sink, requestresponse, requestresponse.ReadMetadata(reader), requestresponse.ReadData(reader)); }
8989
break;
9090
case Types.Request_Fire_And_Forget:
9191
var requestfireandforget = new RequestFireAndForget(header, ref reader);
92-
if (requestfireandforget.Validate()) { OnRequestFireAndForget(sink, requestfireandforget, requestfireandforget.ReadMetadata(ref reader), requestfireandforget.ReadData(ref reader)); }
92+
if (requestfireandforget.Validate()) { OnRequestFireAndForget(sink, requestfireandforget, requestfireandforget.ReadMetadata(reader), requestfireandforget.ReadData(reader)); }
9393
break;
9494
case Types.Request_Stream:
9595
var requeststream = new RequestStream(header, ref reader);
96-
if (requeststream.Validate()) { OnRequestStream(sink, requeststream, requeststream.ReadMetadata(ref reader), requeststream.ReadData(ref reader)); }
96+
if (requeststream.Validate()) { OnRequestStream(sink, requeststream, requeststream.ReadMetadata(reader), requeststream.ReadData(reader)); }
9797
break;
9898
case Types.Request_Channel:
9999
var requestchannel = new RequestChannel(header, ref reader);
100-
if (requestchannel.Validate()) { OnRequestChannel(sink, requestchannel, requestchannel.ReadMetadata(ref reader), requestchannel.ReadData(ref reader)); }
100+
if (requestchannel.Validate()) { OnRequestChannel(sink, requestchannel, requestchannel.ReadMetadata(reader), requestchannel.ReadData(reader)); }
101101
break;
102102
case Types.Request_N:
103103
var requestne = new RequestN(header, ref reader);
@@ -110,7 +110,7 @@ void Process(int framelength, ReadOnlySequence<byte> sequence)
110110
Decoded(payload.ToString());
111111
if (payload.Validate())
112112
{
113-
OnPayload(sink, payload, payload.ReadMetadata(ref reader), payload.ReadData(ref reader));
113+
OnPayload(sink, payload, payload.ReadMetadata(reader), payload.ReadData(reader));
114114
//reader.Sequence.Slice(reader.Position, payload.MetadataLength), reader.Sequence.Slice(reader.Sequence.GetPosition(payload.MetadataLength, reader.Position), payload.DataLength));
115115
}
116116
break;

0 commit comments

Comments
 (0)