Skip to content

Commit ce03b33

Browse files
author
SirJosh3917
committed
get performance numbers for message pipe
1 parent e018d41 commit ce03b33

File tree

5 files changed

+71
-4
lines changed

5 files changed

+71
-4
lines changed

src/StringDB.PerformanceNumbers/InsertRangeFileSize.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ void Insert(IDatabase<string, string> db, int c)
2323
Console.WriteLine($"Size after 1 elements in an insert range: {size1}");
2424
Console.WriteLine($"Size after 50 elements in an insert range: {size2}");
2525
Console.WriteLine($"Size after 100 elements in an insert range: {size3}");
26-
27-
Console.ReadLine();
2826
}
2927

3028
public static long GetSizeAfter(int kvps, Action<IDatabase<string, string>, int> action)
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
using StringDB.Querying.Messaging;
2+
using System;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
6+
namespace StringDB.PerformanceNumbers
7+
{
8+
public class MessagePipeMessagesPerSecond
9+
{
10+
private readonly IMessagePipe<int> _pipe = new SimpleMessagePipe<int>();
11+
12+
public async Task Run()
13+
{
14+
var messagesPerSecond = 0;
15+
16+
var tell = Task.Run(() =>
17+
{
18+
Console.WriteLine("Tell started");
19+
while(true)
20+
{
21+
System.Threading.Thread.Sleep(1000);
22+
var loc = messagesPerSecond;
23+
messagesPerSecond = 0;
24+
Console.WriteLine($"{nameof(SimpleMessagePipe<int>)} messages per second: {loc:n0}");
25+
}
26+
});
27+
28+
var push = Task.Run(() =>
29+
{
30+
Console.WriteLine("Push started");
31+
for (var i = int.MinValue; i < int.MaxValue; i++)
32+
{
33+
_pipe.Enqueue(i);
34+
}
35+
});
36+
37+
var consume = ((Func<Task>)(async () =>
38+
{
39+
Console.WriteLine("Consume started");
40+
while (true)
41+
{
42+
try
43+
{
44+
var message = await _pipe.Dequeue();
45+
messagesPerSecond++;
46+
} catch (Exception ex)
47+
{
48+
Console.WriteLine("consuming ex: " + ex);
49+
}
50+
}
51+
}))();
52+
53+
await Task.WhenAll(tell, push, consume).ConfigureAwait(false);
54+
}
55+
}
56+
}

src/StringDB.PerformanceNumbers/Program.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ private static async Task Main()
1616
{
1717
new SingleInsertFileSize().Run();
1818
new InsertRangeFileSize().Run();
19+
await new MessagePipeMessagesPerSecond().Run();
1920

2021
const BenchmarkToRun benchmark = BenchmarkToRun.YieldOrLinq;
2122

src/StringDB.PerformanceNumbers/SingleInsertFileSize.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ public void Run()
1616
Console.WriteLine($"Size after 1 single insert: {size1}");
1717
Console.WriteLine($"Size after 50 single inserts: {size2}");
1818
Console.WriteLine($"Size after 100 single inserts: {size3}");
19-
20-
Console.ReadLine();
2119
}
2220

2321
public static long GetSizeAfter(int times, Action<IDatabase<string, string>> action)

src/StringDB/Querying/Messaging/SimpleMessagePipe.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Concurrent;
3+
using System.Runtime.CompilerServices;
34
using System.Threading;
45
using System.Threading.Tasks;
56

@@ -20,16 +21,23 @@ public void Dispose()
2021
_mres.Dispose();
2122
}
2223

24+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
2325
public void Enqueue(T message)
2426
{
2527
_queue.Enqueue(message);
28+
2629
_mres.Set();
2730
}
2831

2932
public Task<T> Dequeue(CancellationToken cancellationToken = default)
3033
{
3134
if (_queue.TryDequeue(out var result))
3235
{
36+
if (_mres.IsSet)
37+
{
38+
_mres.Reset();
39+
}
40+
3341
return Task.FromResult(result);
3442
}
3543

@@ -61,5 +69,11 @@ private Task<T> AsyncDequeue(CancellationToken cancellationToken)
6169

6270
return tcs.Task;
6371
}
72+
73+
[MethodImpl(MethodImplOptions.NoInlining)]
74+
private static void ThrowDispose()
75+
{
76+
throw new ObjectDisposedException(nameof(SimpleMessagePipe<T>));
77+
}
6478
}
6579
}

0 commit comments

Comments
 (0)