Skip to content

Commit 1416650

Browse files
authored
add memory performance profiler test rig, and run; oops, missed one! (#342)
1 parent d90ecd4 commit 1416650

File tree

5 files changed

+130
-4
lines changed

5 files changed

+130
-4
lines changed

docs/releasenotes.md

+5-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@
22

33
## unreleased
44

5-
## 1.2.0
5+
## 1.2.2
6+
7+
- fix missing memory recycling in `Stream` scenario
8+
9+
## 1.2.1
610

711
- support `[Value]Task<Stream>` as a return value, rewriting via [`stream BytesValue`](https://github.com/protocolbuffers/protobuf/blob/main/src/google/protobuf/wrappers.proto) - first
812
step in [#340](https://github.com/protobuf-net/protobuf-net.Grpc/issues/340)

src/protobuf-net.Grpc/Internal/Reshape.ByteStream.cs

+1
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ async static Task ReadByteValueSequenceToPipeWriter(AsyncServerStreamingCall<Byt
105105
var chunk = source.Current;
106106
var result = await destination.WriteAsync(chunk.Memory, cancellationToken).ConfigureAwait(false);
107107
actualLength += chunk.Length;
108+
chunk.Recycle();
108109

109110
if (result.IsCanceled)
110111
{

toys/PlayClient/MyContracts.cs

+14-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
using Grpc.Core;
1+
using ProtoBuf;
22
using ProtoBuf.Grpc;
33
using ProtoBuf.Grpc.Configuration;
44
using System.Collections.Generic;
5+
using System.IO;
56
using System.Runtime.Serialization;
67
using System.ServiceModel;
78
using System.Threading.Tasks;
@@ -60,5 +61,17 @@ public class BidiStreamingResponse
6061
public interface IBidiStreamingService
6162
{
6263
IAsyncEnumerable<BidiStreamingResponse> TestAsync(IAsyncEnumerable<BidiStreamingRequest> request, CallContext options);
64+
65+
ValueTask<Stream> TestStreamAsync(TestStreamRequest request, CallContext options = default);
66+
}
67+
68+
[ProtoContract]
69+
public class TestStreamRequest
70+
{
71+
[ProtoMember(1)]
72+
public int Seed { get; set; }
73+
74+
[ProtoMember(2)]
75+
public long Length { get; set; }
6376
}
6477
}

toys/PlayClient/Program.cs

+61-2
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,15 @@ static class Program
1616
{
1717
static async Task Main()
1818
{
19-
await TestChannel();
19+
while (true)
20+
{
21+
long length = 1000000L;
22+
int seed = 12345;
23+
await TestStreamUnmanagedAsync(length, seed);
2024
#if HTTPCLIENT
21-
await TestHttpClient();
25+
await TestStreamManagedAsync(length, seed);
2226
#endif
27+
}
2328
}
2429

2530
static async Task TestCalculator(ICalculator calculator, [CallerMemberName] string? caller = null)
@@ -81,6 +86,60 @@ static async IAsyncEnumerable<MultiplyRequest> Rand(int count, TimeSpan delay, [
8186
Console.WriteLine("[client all done sending!]");
8287
}
8388

89+
90+
#if HTTPCLIENT
91+
static async Task TestStreamManagedAsync(long length, int seed)
92+
{
93+
GrpcClientFactory.AllowUnencryptedHttp2 = true;
94+
using var http = Grpc.Net.Client.GrpcChannel.ForAddress("http://localhost:10042");
95+
await TestStreamAsync(http, length, seed);
96+
}
97+
#endif
98+
99+
static async Task TestStreamUnmanagedAsync(long length, int seed)
100+
{
101+
var channel = new Grpc.Core.Channel("localhost", 10042, ChannelCredentials.Insecure);
102+
try
103+
{
104+
await TestStreamAsync(channel, length, seed);
105+
}
106+
finally
107+
{
108+
await channel.ShutdownAsync();
109+
}
110+
}
111+
static async Task TestStreamAsync(ChannelBase channel, long length, int seed)
112+
{
113+
var random = new Random(seed);
114+
Console.WriteLine("Creating proxy...");
115+
var proxy = channel.CreateGrpcService<IBidiStreamingService>();
116+
using var stream = await proxy.TestStreamAsync(new TestStreamRequest { Length = length, Seed = seed });
117+
118+
byte[] buffer = new byte[1024];
119+
long totalRead = 0;
120+
int read;
121+
Console.WriteLine("Initializing...");
122+
while ((read = await stream.ReadAsync(buffer, 0, buffer.Length)) > 0)
123+
{
124+
totalRead += Check(random, new ReadOnlySpan<byte>(buffer, 0, read));
125+
Console.WriteLine($"Bytes communicated: {totalRead}");
126+
}
127+
if (totalRead != length)
128+
{
129+
Throw();
130+
}
131+
132+
static int Check(Random random, ReadOnlySpan<byte> buffer)
133+
{
134+
foreach (byte b in buffer)
135+
{
136+
if (b != random.Next(256)) Throw();
137+
}
138+
return buffer.Length;
139+
}
140+
static void Throw() => throw new InvalidOperationException("data fail");
141+
}
142+
84143
static async Task TestChannel()
85144
{
86145
var channel = new Channel("localhost", 10042, ChannelCredentials.Insecure);

toys/PlayServer/Program.cs

+49
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
using Shared_CS;
55
using System;
66
using System.Collections.Generic;
7+
using System.IO;
8+
using System.IO.Pipelines;
79
using System.Runtime.CompilerServices;
810
using System.Threading;
911
using System.Threading.Tasks;
@@ -95,4 +97,51 @@ public async IAsyncEnumerable<BidiStreamingResponse> TestAsync(IAsyncEnumerable<
9597
//static bool Always() => true;
9698
yield break;
9799
}
100+
101+
public ValueTask<Stream> TestStreamAsync(TestStreamRequest request, CallContext options = default)
102+
{
103+
Console.WriteLine("Creating pipe...");
104+
var pipe = new Pipe();
105+
_ = Task.Run(async () =>
106+
{
107+
Exception? ex = null;
108+
try
109+
{
110+
Console.WriteLine($"Starting stream of length {request.Length}...");
111+
long remaining = request.Length;
112+
var rand = new Random(request.Seed);
113+
byte[] buffer = new byte[4096];
114+
115+
while (remaining > 0)
116+
{
117+
int chunkLen = (int)Math.Min(remaining, buffer.Length);
118+
var chunk = new Memory<byte>(buffer, 0, chunkLen);
119+
Console.WriteLine($"Sending {chunkLen}...");
120+
Fill(rand, chunk.Span);
121+
await pipe.Writer.WriteAsync(chunk, options.CancellationToken);
122+
remaining -= chunkLen;
123+
}
124+
}
125+
catch (Exception fault)
126+
{
127+
Console.WriteLine("Fault: " + fault.Message);
128+
ex = fault;
129+
}
130+
finally
131+
{
132+
Console.WriteLine("Completing...");
133+
await pipe.Writer.CompleteAsync(ex);
134+
}
135+
136+
});
137+
return new(pipe.Reader.AsStream());
138+
139+
static void Fill(Random rand, Span<byte> buffer)
140+
{
141+
foreach (ref byte b in buffer)
142+
{
143+
b = (byte)rand.Next(0, 256);
144+
}
145+
}
146+
}
98147
}

0 commit comments

Comments
 (0)