Skip to content

Commit 5f41d1a

Browse files
Implementing WriteFlush() in the main RSocket class. Adding helper method for the Setup() call in the main RSocket class (with protocol support in the native-typed version for metadata&data - must be old code). Augmenting the Client class to take a connection policy object (which was already implemented) and removed all dependency on the Protocol layer (which was an abstraction violation anyway).
1 parent e59b7f3 commit 5f41d1a

File tree

3 files changed

+14
-29
lines changed

3 files changed

+14
-29
lines changed

RSocket.Core/RSocket.cs

Lines changed: 6 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -61,16 +61,8 @@ public RSocket(IRSocketTransport transport, RSocketOptions options = default)
6161
/// <param name="cancel">Cancellation for the handler. Requesting cancellation will stop message handling.</param>
6262
/// <returns>The handler task.</returns>
6363
public Task Connect(CancellationToken cancel = default) => RSocketProtocol.Handler(this, Transport.Input, cancel);
64+
public Task Setup(TimeSpan keepalive, TimeSpan lifetime, string metadataMimeType = null, string dataMimeType = null, ReadOnlySequence<byte> data = default, ReadOnlySequence<byte> metadata = default) => new RSocketProtocol.Setup(keepalive, lifetime, metadataMimeType: metadataMimeType, dataMimeType: dataMimeType, data: data, metadata: metadata).WriteFlush(Transport.Output, data: data, metadata: metadata);
6465

65-
//public async Task<RSocket> ConnectAsync()
66-
//{
67-
// await Transport.StartAsync();
68-
// var server = RSocketProtocol.Handler(this, Transport.Input, CancellationToken.None, name: nameof(RSocketClient));
69-
// ////TODO Move defaults to policy object
70-
// new RSocketProtocol.Setup(keepalive: TimeSpan.FromSeconds(60), lifetime: TimeSpan.FromSeconds(180), metadataMimeType: "binary", dataMimeType: "binary").Write(Transport.Output);
71-
// await Transport.Output.FlushAsync();
72-
// return this;
73-
//}
7466

7567
//TODO SPEC: A requester MUST not send PAYLOAD frames after the REQUEST_CHANNEL frame until the responder sends a REQUEST_N frame granting credits for number of PAYLOADs able to be sent.
7668

@@ -82,8 +74,7 @@ public IAsyncEnumerable<T> RequestChannel<TSource, T>(IAsyncEnumerable<TSource>
8274
public async Task<IRSocketChannel> RequestChannel(IRSocketStream stream, ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default, int initial = RSocketOptions.INITIALDEFAULT)
8375
{
8476
var id = StreamDispatch(stream);
85-
new RSocketProtocol.RequestChannel(id, data, metadata, initialRequest: Options.GetInitialRequestSize(initial)).Write(Transport.Output, data, metadata);
86-
await Transport.Output.FlushAsync();
77+
await new RSocketProtocol.RequestChannel(id, data, metadata, initialRequest: Options.GetInitialRequestSize(initial)).WriteFlush(Transport.Output, data, metadata);
8778
var channel = new ChannelHandler(this, id);
8879
return channel;
8980
}
@@ -98,9 +89,7 @@ protected class ChannelHandler : IRSocketChannel //TODO hmmm...
9889
public Task Send((ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) value)
9990
{
10091
if (!Socket.Dispatcher.ContainsKey(Stream)) { throw new InvalidOperationException("Channel is closed"); }
101-
new RSocketProtocol.Payload(Stream, value.data, value.metadata, next: true).Write(Socket.Transport.Output, value.data, value.metadata);
102-
var result = Socket.Transport.Output.FlushAsync();
103-
return result.IsCompleted ? Task.CompletedTask : result.AsTask();
92+
return new RSocketProtocol.Payload(Stream, value.data, value.metadata, next: true).WriteFlush(Socket.Transport.Output, value.data, value.metadata);
10493
}
10594
}
10695

@@ -113,9 +102,7 @@ public Task RequestStream(IRSocketStream stream, ReadOnlySequence<byte> data, Re
113102
{
114103
if (initial <= INITIALDEFAULT) { initial = Options.InitialRequestSize; }
115104
var id = StreamDispatch(stream);
116-
new RSocketProtocol.RequestStream(id, data, metadata, initialRequest: Options.GetInitialRequestSize(initial)).Write(Transport.Output, data, metadata);
117-
var result = Transport.Output.FlushAsync();
118-
return result.IsCompleted ? Task.CompletedTask : result.AsTask();
105+
return new RSocketProtocol.RequestStream(id, data, metadata, initialRequest: Options.GetInitialRequestSize(initial)).WriteFlush(Transport.Output, data, metadata);
119106
}
120107

121108
public Task<T> RequestResponse<T>(Func<(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata), T> resultmapper,
@@ -125,9 +112,7 @@ public Task<T> RequestResponse<T>(Func<(ReadOnlySequence<byte> data, ReadOnlySeq
125112
public Task RequestResponse(IRSocketStream stream, ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default)
126113
{
127114
var id = StreamDispatch(stream);
128-
new RSocketProtocol.RequestResponse(id, data, metadata).Write(Transport.Output, data, metadata);
129-
var result = Transport.Output.FlushAsync();
130-
return result.IsCompleted ? Task.CompletedTask : result.AsTask();
115+
return new RSocketProtocol.RequestResponse(id, data, metadata).WriteFlush(Transport.Output, data, metadata);
131116
}
132117

133118

@@ -138,9 +123,7 @@ public Task RequestFireAndForget(
138123
public Task RequestFireAndForget(IRSocketStream stream, ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default)
139124
{
140125
var id = StreamDispatch(stream);
141-
new RSocketProtocol.RequestFireAndForget(id, data, metadata).Write(Transport.Output, data, metadata);
142-
var result = Transport.Output.FlushAsync();
143-
return result.IsCompleted ? Task.CompletedTask : result.AsTask();
126+
return new RSocketProtocol.RequestFireAndForget(id, data, metadata).WriteFlush(Transport.Output, data, metadata);
144127
}
145128

146129

RSocket.Core/RSocketClient.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,18 @@ public class RSocketClient : RSocket
2020

2121
public RSocketClient(IRSocketTransport transport, RSocketOptions options = default) : base(transport, options) { }
2222

23-
public async Task ConnectAsync()
23+
public Task ConnectAsync(RSocketOptions options = default, byte[] data = default, byte[] metadata = default) => ConnectAsync(options, data: data == default ? default : new ReadOnlySequence<byte>(data), metadata: metadata == default ? default : new ReadOnlySequence<byte>(metadata));
24+
25+
public async Task ConnectAsync(RSocketOptions options = default, ReadOnlySequence<byte> metadata = default, ReadOnlySequence<byte> data = default)
2426
{
27+
options = options ?? RSocketOptions.Default;
2528
await Transport.StartAsync();
2629
Handler = Connect(CancellationToken.None);
27-
////TODO Move defaults to policy object, also, maybe not inline, like prefer a method.
28-
new RSocketProtocol.Setup(keepalive: TimeSpan.FromSeconds(60), lifetime: TimeSpan.FromSeconds(180), metadataMimeType: "binary", dataMimeType: "binary").Write(Transport.Output);
29-
await Transport.Output.FlushAsync();
30+
await Setup(options.KeepAlive, options.Lifetime, options.MetadataMimeType, options.DataMimeType, data: data, metadata: metadata);
3031
}
3132

32-
//Ugh, these are all garbage. Remove in favor of the transformation ones.
33+
34+
//TODO Ugh, these are all garbage. Remove in favor of the transformation ones.
3335
public Task<IRSocketChannel> RequestChannel<TData>(IRSocketStream stream, TData data, ReadOnlySpan<byte> metadata = default, int initial = INITIALDEFAULT) => RequestChannel(stream, RequestDataSerializer.Serialize(data), metadata, initial);
3436
public Task<IRSocketChannel> RequestChannel<TMetadata>(IRSocketStream stream, ReadOnlySpan<byte> data, TMetadata metadata = default, int initial = INITIALDEFAULT) => RequestChannel(stream, data, RequestMetadataSerializer.Serialize(metadata), initial);
3537
public Task<IRSocketChannel> RequestChannel<TData, TMetadata>(IRSocketStream stream, TData data, TMetadata metadata = default, int initial = INITIALDEFAULT) => RequestChannel(stream, RequestDataSerializer.Serialize(data), RequestMetadataSerializer.Serialize(metadata), initial);

RSocket.Core/RSocketProtocol.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -765,7 +765,7 @@ public ref struct Setup
765765
public int Length => Header.Length + InnerLength + Header.MetadataHeaderLength + MetadataLength + DataLength;
766766

767767

768-
public Setup(TimeSpan keepalive, TimeSpan lifetime, string metadataMimeType = null, string dataMimeType = null) : this((int)keepalive.TotalMilliseconds, (int)lifetime.TotalMilliseconds, string.IsNullOrEmpty(metadataMimeType) ? string.Empty : metadataMimeType, string.IsNullOrEmpty(dataMimeType) ? string.Empty : dataMimeType) { }
768+
public Setup(TimeSpan keepalive, TimeSpan lifetime, string metadataMimeType = null, string dataMimeType = null, ReadOnlySequence<byte> data = default, ReadOnlySequence<byte> metadata = default) : this((int)keepalive.TotalMilliseconds, (int)lifetime.TotalMilliseconds, string.IsNullOrEmpty(metadataMimeType) ? string.Empty : metadataMimeType, string.IsNullOrEmpty(dataMimeType) ? string.Empty : dataMimeType, data: data, metadata: metadata) { }
769769

770770
public Setup(Int32 keepalive, Int32 lifetime, string metadataMimeType, string dataMimeType, byte[] resumeToken = default, ReadOnlySequence<byte> data = default, ReadOnlySequence<byte> metadata = default)
771771
{

0 commit comments

Comments
 (0)