Skip to content

Commit 646b7c7

Browse files
RPC Service updates. Integration with gRPC tooling in the samples and compatibility with the EchoServer. Skeletons are close (methodname might change), but coming together.
1 parent 11a968e commit 646b7c7

File tree

9 files changed

+199
-27
lines changed

9 files changed

+199
-27
lines changed

RSocket.RPC.Tests/RSocketServiceTests.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ public class RSocketServiceTests
2121
[TestMethod]
2222
public void ServerBasicTest()
2323
{
24-
var data = new TestData();
25-
var response = Service.RequestResponse(data).Result;
26-
var result = new TestData(response);
24+
//var data = new TestData();
25+
//var response = Service.RequestResponse(data).Result;
26+
//var result = new TestData(response);
2727

28-
Assert.AreEqual(data, result, $"{nameof(Service.RequestResponse)} did not round trip on bytes.");
28+
//Assert.AreEqual(data, result, $"{nameof(Service.RequestResponse)} did not round trip on bytes.");
2929

3030
//public Task<ReadOnlySequence<byte>> requestResponse(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default) => base.RequestResponse<ReadOnlySequence<byte>>(ServicePrefix + nameof(EchoService), nameof(requestResponse), data, metadata);
3131
//public Task<ReadOnlySequence<byte>> requestStream(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default) => base.RequestStream<ReadOnlySequence<byte>>(ServicePrefix + nameof(EchoService), nameof(requestStream), data, metadata);
@@ -41,8 +41,8 @@ public class TestService : RSocketService<TestService>
4141
private const string ServicePrefix = "";
4242
public TestService(RSocketClient client) : base(client) { }
4343
//public void fireAndForget(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default) { Client.RequestFireAndForget(null, data, metadata); }
44-
public Task<ReadOnlySequence<byte>> RequestResponse(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default) => base.__RequestResponse<ReadOnlySequence<byte>>(ServicePrefix + nameof(TestService), nameof(RequestResponse), data, metadata);
45-
public Task<ReadOnlySequence<byte>> RequestStream(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default) => base.__RequestStream<ReadOnlySequence<byte>>(ServicePrefix + nameof(TestService), nameof(RequestStream), data, metadata);
44+
//public Task<ReadOnlySequence<byte>> RequestResponse(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default) => base.__RequestResponse<ReadOnlySequence<byte>>(ServicePrefix + nameof(TestService), nameof(RequestResponse), data, metadata);
45+
//public Task<ReadOnlySequence<byte>> RequestStream(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default) => base.__RequestStream<ReadOnlySequence<byte>>(ServicePrefix + nameof(TestService), nameof(RequestStream), data, metadata);
4646
//public void requestChannel(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default) { Client.RequestChannel(null, data, metadata); }
4747
}
4848

RSocket.RPC/RSocketService.cs

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,25 @@ public abstract class RSocketService<T> : IRSocketStream
1515
protected void __RequestFireAndForget(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default) { Client.RequestFireAndForget(null, data, metadata); }
1616

1717

18+
protected async Task<TResult> __RequestResponse<TMessage, TResult>(string service, string method, TMessage message, Func<TMessage, byte[]> intransform, Func<byte[], TResult> outtransform, ReadOnlySequence<byte> metadata = default, ReadOnlySequence<byte> tracing = default) =>
19+
outtransform((await __RequestResponse(service, method, new ReadOnlySequence<byte>(intransform(message)), metadata, tracing)).ToArray());
1820

19-
protected async Task<TResult> __RequestResponse<TResult>(string service, string method, ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default, ReadOnlySequence<byte> tracing = default)
21+
protected async Task<TResult> __RequestResponse<TMessage, TResult>(string service, string method, TMessage message, Func<TMessage, ReadOnlySequence<byte>> intransform, Func<ReadOnlySequence<byte>, TResult> outtransform, ReadOnlySequence<byte> metadata = default, ReadOnlySequence<byte> tracing = default) =>
22+
outtransform(await __RequestResponse(service, method, intransform(message), metadata, tracing));
23+
24+
protected async Task<ReadOnlySequence<byte>> __RequestResponse(string service, string method, ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default, ReadOnlySequence<byte> tracing = default)
2025
{
21-
var receiver = new Receiver<TResult>();
26+
var receiver = new Receiver();
2227
await Client.RequestResponse(receiver, data, new RemoteProcedureCall.RemoteProcedureCallMetadata(service, method, metadata, tracing));
2328
return await receiver.Task.ConfigureAwait(false);
2429
}
2530

2631
//TODO Ask about semantics of this - should it execute the server call before subscription?
2732

28-
protected async Task<TResult> __RequestStream<TResult>(string service, string method, ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default, ReadOnlySequence<byte> tracing = default)
33+
protected async Task<ReadOnlySequence<byte>> __RequestStream<TResult>(string service, string method, ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default, ReadOnlySequence<byte> tracing = default)
2934
{
30-
var receiver = new Receiver<TResult>();
31-
await Client.RequestStream(receiver, data, new RemoteProcedureCall.RemoteProcedureCallMetadata(service, method, metadata, tracing), initial: 3);
35+
var receiver = new Receiver();
36+
await Client.RequestStream(receiver, data, new RemoteProcedureCall.RemoteProcedureCallMetadata(service, method, metadata, tracing), initial: 3); //TODO Policy!!
3237
return await receiver.Task.ConfigureAwait(false);
3338
}
3439

@@ -37,6 +42,16 @@ protected async Task<TResult> __RequestStream<TResult>(string service, string me
3742
//protected void RequestChannel(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default) { Client.RequestChannel(null, data, metadata); } //TODO Initial?
3843

3944

45+
//private class Receiver : Receiver<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>, IRSocketStream { }
46+
47+
private class Receiver : TaskCompletionSource<ReadOnlySequence<byte>>, IRSocketStream
48+
{
49+
public void OnCompleted() { }
50+
public void OnError(Exception error) => base.SetException(error);
51+
public void OnNext((ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) value) => base.SetResult(value.data);
52+
}
53+
54+
4055
public void Dispatch()
4156
{
4257
}
@@ -57,17 +72,6 @@ public void Dispatch()
5772
throw new NotImplementedException();
5873
}
5974

60-
private class Receiver<TResult> : TaskCompletionSource<TResult>, IRSocketStream
61-
{
62-
public Receiver() { }
63-
64-
public void OnCompleted() { }
65-
public void OnError(Exception error) => base.SetException(error);
66-
public void OnNext((ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) value)
67-
{
68-
Console.WriteLine($"OnNext: [{value.metadata.Length}]:[{value.data.Length}]");
69-
}
70-
}
7175

7276
private Lazy<List<System.Reflection.MethodInfo>> Methods = new Lazy<List<System.Reflection.MethodInfo>>(() => GetMethods());
7377

RSocketRPCSample/EchoService.cs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Threading.Tasks;
55
using RSocket;
66
using RSocket.RPC;
7+
using Google.Protobuf.WellKnownTypes;
78

89
[System.Runtime.CompilerServices.CompilerGenerated]
910
interface IEchoService
@@ -23,10 +24,15 @@ interface IEchoService
2324
[System.Runtime.CompilerServices.CompilerGenerated]
2425
public class EchoService : RSocketService<EchoService>, IEchoService
2526
{
26-
private const string ServicePrefix = "io.rsocket.rpc.echo.";
27+
private const string ServiceName = "io.rsocket.rpc.echo" + "." + nameof(EchoService);
2728

2829
public EchoService(RSocketClient client) : base(client) { }
2930

31+
//TODO Consider CallerMemberName because the servicename is basically fixed. Also, consider static class...
32+
public Task<BytesValue> requestResponse(BytesValue message, ReadOnlySequence<byte> metadata = default) => __RequestResponse(ServiceName, nameof(requestResponse), message, Google.Protobuf.MessageExtensions.ToByteArray, BytesValue.Parser.ParseFrom, metadata);
33+
34+
35+
3036
//async Task ASD()
3137
//{
3238
// var thing = new System.Buffers.ReadOnlySequence<byte>(new byte[0]);
@@ -36,8 +42,8 @@ public EchoService(RSocketClient client) : base(client) { }
3642

3743
//public void fireAndForget(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default) { Client.RequestFireAndForget(null, data, metadata); }
3844

39-
public Task<ReadOnlySequence<byte>> requestResponse(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default) =>
40-
base.__RequestResponse<ReadOnlySequence<byte>>(ServicePrefix + nameof(EchoService), nameof(requestResponse), data, metadata);
45+
//public Task<ReadOnlySequence<byte>> requestResponse(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default) =>
46+
// base.__RequestResponse(ServicePrefix + nameof(EchoService), nameof(requestResponse), data, metadata);
4147

4248
//public ReadOnlySequence<byte> requestResponse(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default) => requestResponseAsync(data, metadata).Result;
4349

RSocketRPCSample/EchoService.proto

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
syntax = "proto3";
2+
3+
package io.rsocket.rpc.echo;
4+
5+
import "google/protobuf/empty.proto";
6+
import "google/protobuf/wrappers.proto";
7+
import "rsocket/options.proto";
8+
9+
option java_package = "io.rsocket.rpc.echo";
10+
option java_outer_classname = "EchoServiceProto";
11+
option java_multiple_files = true;
12+
13+
service EchoService {
14+
rpc fireAndForget (google.protobuf.BytesValue) returns (google.protobuf.Empty) {
15+
option (io.rsocket.rpc.options) = {
16+
fire_and_forget: true
17+
};
18+
}
19+
rpc requestResponse (google.protobuf.BytesValue) returns (google.protobuf.BytesValue);
20+
rpc requestStream (google.protobuf.BytesValue) returns (stream google.protobuf.BytesValue);
21+
rpc requestChannel(stream google.protobuf.BytesValue) returns (stream google.protobuf.BytesValue);
22+
}

RSocketRPCSample/Program.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@
99

1010
namespace RSocketRPCSample
1111
{
12+
using Io.Rsocket.Rpc.Testing;
13+
using Google.Protobuf;
14+
using Google.Protobuf.WellKnownTypes;
15+
1216
class Program
1317
{
1418
//TODO Connection Cleanup on Unsubscribe/failure/etc
@@ -52,9 +56,9 @@ static async Task Main(string[] args)
5256

5357
var service = new EchoService(client);
5458

55-
var result = await service.requestResponse(data);
59+
var result = await service.requestResponse(new BytesValue() { Value = ByteString.CopyFromUtf8("TEST VALUES!!") });
5660

57-
Console.WriteLine($"Result: [{result.Length}]");
61+
Console.WriteLine($"Result: {result.ToString()}");
5862

5963
//var rpcclient = new RSocketRPCClient(client);
6064

RSocketRPCSample/RSocketRPCSample.csproj

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,28 @@
77
</PropertyGroup>
88

99
<ItemGroup>
10+
<None Remove="EchoService.proto" />
11+
<None Remove="rsocket\options.proto" />
12+
<None Remove="SimpleService.proto" />
13+
</ItemGroup>
14+
15+
<ItemGroup>
16+
<PackageReference Include="Google.Protobuf" Version="3.6.1" />
17+
<PackageReference Include="Grpc.Tools" Version="1.18.0">
18+
<PrivateAssets>all</PrivateAssets>
19+
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
20+
</PackageReference>
1021
<PackageReference Include="System.Reactive" Version="4.1.2" />
1122
</ItemGroup>
1223

1324
<ItemGroup>
1425
<ProjectReference Include="..\RSocket.RPC\RSocket.RPC.csproj" />
1526
</ItemGroup>
1627

28+
<ItemGroup>
29+
<ProtoBuf Include="SimpleService.proto" GrpcServices="None" />
30+
<ProtoBuf Include="EchoService.proto" GrpcServices="None" />
31+
<ProtoBuf Include="rsocket\options.proto" GrpcServices="None" />
32+
</ItemGroup>
33+
1734
</Project>

RSocketRPCSample/SimpleService.cs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
namespace RSocketRPCSample
2+
{
3+
using System.Buffers;
4+
using System.Threading.Tasks;
5+
using RSocket;
6+
using RSocket.RPC;
7+
using Io.Rsocket.Rpc.Testing;
8+
9+
[System.Runtime.CompilerServices.CompilerGenerated]
10+
interface ISimpleService
11+
{
12+
//String SERVICE = "io.rsocket.rpc.echo.EchoService";
13+
//String METHOD_FIRE_AND_FORGET = "fireAndForget";
14+
//String METHOD_REQUEST_RESPONSE = "requestResponse";
15+
//String METHOD_REQUEST_STREAM = "requestStream";
16+
//String METHOD_REQUEST_CHANNEL = "requestChannel";
17+
18+
//reactor.core.publisher.Mono<Void> fireAndForget(com.google.protobuf.BytesValue message, io.netty.buffer.ByteBuf metadata);
19+
//reactor.core.publisher.Mono<com.google.protobuf.BytesValue> requestResponse(com.google.protobuf.BytesValue message, io.netty.buffer.ByteBuf metadata);
20+
//reactor.core.publisher.Flux<com.google.protobuf.BytesValue> requestStream(com.google.protobuf.BytesValue message, io.netty.buffer.ByteBuf metadata);
21+
//reactor.core.publisher.Flux<com.google.protobuf.BytesValue> requestChannel(org.reactivestreams.Publisher<com.google.protobuf.BytesValue> messages, io.netty.buffer.ByteBuf metadata);
22+
23+
24+
//rpc RequestReply(SimpleRequest) returns(SimpleResponse) { }
25+
//rpc FireAndForget(SimpleRequest) returns(google.protobuf.Empty) { }
26+
//rpc RequestStream(SimpleRequest) returns(stream SimpleResponse) { }
27+
//rpc StreamingRequestSingleResponse(stream SimpleRequest) returns(SimpleResponse) { }
28+
//rpc StreamingRequestAndResponse(stream SimpleRequest) returns(stream SimpleResponse) { }
29+
}
30+
31+
[System.Runtime.CompilerServices.CompilerGenerated]
32+
public class SimpleService : RSocketService<SimpleService>, ISimpleService
33+
{
34+
private const string ServiceName = "io.rsocket.rpc.testing.protobuf" + "." + nameof(SimpleService);
35+
36+
public SimpleService(RSocketClient client) : base(client) { }
37+
38+
public Task<SimpleResponse> RequestReply(SimpleRequest message, ReadOnlySequence<byte> metadata = default) => __RequestResponse(ServiceName, nameof(RequestReply), message, Google.Protobuf.MessageExtensions.ToByteArray, SimpleResponse.Parser.ParseFrom, metadata);
39+
40+
41+
//rpc RequestReply(SimpleRequest) returns(SimpleResponse) { }
42+
//rpc FireAndForget(SimpleRequest) returns(google.protobuf.Empty) { }
43+
//rpc RequestStream(SimpleRequest) returns(stream SimpleResponse) { }
44+
//rpc StreamingRequestSingleResponse(stream SimpleRequest) returns(SimpleResponse) { }
45+
//rpc StreamingRequestAndResponse(stream SimpleRequest) returns(stream SimpleResponse) { }
46+
47+
//public void fireAndForget(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default) { Client.RequestFireAndForget(null, data, metadata); }
48+
49+
//public Task<SimpleResponse> RequestReply(SimpleRequest message, ReadOnlySequence<byte> metadata = default) =>
50+
// base.__RequestResponse(ServicePrefix + nameof(SimpleService), nameof(RequestReply), message, source => Google.Protobuf.MessageExtensions.ToByteArray(source), result => SimpleResponse.Parser.ParseFrom(result), metadata);
51+
52+
53+
54+
//public ReadOnlySequence<byte> requestResponse(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default) => requestResponseAsync(data, metadata).Result;
55+
56+
57+
//public Task<ReadOnlySequence<byte>> requestStreamAsync(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default) =>
58+
// base.__RequestStream<ReadOnlySequence<byte>>(ServicePrefix + nameof(EchoService), nameof(requestStream), data, metadata);
59+
60+
////Prefer this implementation.
61+
//public IObservable<ReadOnlySequence<byte>> requestStream(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default) =>
62+
// base.__RequestStream<ReadOnlySequence<byte>>(ServicePrefix + nameof(EchoService), nameof(requestStream), data, metadata);
63+
64+
65+
//Not real. Wrong signature.
66+
//public void requestChannel(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default) { Client.RequestChannel(null, data, metadata); }
67+
}
68+
}

RSocketRPCSample/SimpleService.proto

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
syntax = "proto3";
2+
3+
package io.rsocket.rpc.testing;
4+
5+
import "google/protobuf/empty.proto";
6+
7+
option java_package = "io.rsocket.rpc.testing.protobuf";
8+
option java_outer_classname = "SimpleServiceProto";
9+
option java_multiple_files = true;
10+
11+
service SimpleService {
12+
// Request / Response
13+
rpc RequestReply (SimpleRequest) returns (SimpleResponse) {}
14+
15+
// Fire-and-Forget
16+
rpc FireAndForget (SimpleRequest) returns (google.protobuf.Empty) {}
17+
18+
// Single Request / Streaming Response
19+
rpc RequestStream (SimpleRequest) returns (stream SimpleResponse) {}
20+
21+
// Streaming Request / Single Response
22+
rpc StreamingRequestSingleResponse (stream SimpleRequest) returns (SimpleResponse) {}
23+
24+
// Streaming Request / Streaming Response
25+
rpc StreamingRequestAndResponse (stream SimpleRequest) returns (stream SimpleResponse) {}
26+
}
27+
28+
message SimpleRequest {
29+
string requestMessage = 1;
30+
}
31+
32+
message SimpleResponse {
33+
string responseMessage = 1;
34+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
syntax = "proto3";
2+
3+
package io.rsocket.rpc;
4+
5+
import "google/protobuf/descriptor.proto";
6+
7+
option java_package = "io.rsocket.rpc";
8+
option java_outer_classname = "RSocketOptions";
9+
option java_multiple_files = true;
10+
11+
extend google.protobuf.MethodOptions {
12+
RSocketMethodOptions options = 1057;
13+
}
14+
15+
message RSocketMethodOptions {
16+
bool fire_and_forget = 1;
17+
}

0 commit comments

Comments
 (0)