1
+ using Akka . Actor ;
2
+ using Akka . Persistence . EventStore . Query ;
3
+ using Akka . Persistence . Query ;
4
+ using Akka . Streams ;
5
+ using BenchmarkDotNet . Attributes ;
6
+ using FluentAssertions ;
7
+
8
+ namespace Akka . Persistence . EventStore . Benchmarks ;
9
+
10
+ [ Config ( typeof ( MicroBenchmarkConfig ) ) ]
11
+ public class EventStoreTagBenchmark
12
+ {
13
+ private IMaterializer ? _materializer ;
14
+ private IReadJournal ? _readJournal ;
15
+
16
+ private ActorSystem ? _sys ;
17
+
18
+ [ GlobalSetup ]
19
+ public async Task Setup ( )
20
+ {
21
+ _sys = await EventStoreBenchmarkFixture . CreateActorSystem ( "system" ) ;
22
+ _materializer = _sys . Materializer ( ) ;
23
+ _readJournal = _sys . ReadJournalFor < EventStoreReadJournal > ( "akka.persistence.query.journal.eventstore" ) ;
24
+ }
25
+
26
+ [ GlobalCleanup ]
27
+ public async Task Cleanup ( )
28
+ {
29
+ if ( _sys is not null )
30
+ await _sys . Terminate ( ) ;
31
+ }
32
+
33
+ [ Benchmark ]
34
+ public async Task QueryByTag10 ( )
35
+ {
36
+ var events = new List < EventEnvelope > ( ) ;
37
+ var source = ( ( ICurrentEventsByTagQuery ) _readJournal ! ) . CurrentEventsByTag ( Const . Tag10 , NoOffset . Instance ) ;
38
+ await source . RunForeach (
39
+ msg => { events . Add ( msg ) ; } ,
40
+ _materializer ) ;
41
+ events . Select ( e => e . SequenceNr ) . Should ( ) . BeEquivalentTo ( Enumerable . Range ( 2000001 , 10 ) ) ;
42
+ }
43
+
44
+ [ Benchmark ]
45
+ public async Task QueryByTag100 ( )
46
+ {
47
+ var events = new List < EventEnvelope > ( ) ;
48
+ var source = ( ( ICurrentEventsByTagQuery ) _readJournal ! ) . CurrentEventsByTag ( Const . Tag100 , NoOffset . Instance ) ;
49
+ await source . RunForeach (
50
+ msg => { events . Add ( msg ) ; } ,
51
+ _materializer ) ;
52
+ events . Select ( e => e . SequenceNr ) . Should ( ) . BeEquivalentTo ( Enumerable . Range ( 2000001 , 100 ) ) ;
53
+ }
54
+
55
+ [ Benchmark ]
56
+ public async Task QueryByTag1000 ( )
57
+ {
58
+ var events = new List < EventEnvelope > ( ) ;
59
+ var source = ( ( ICurrentEventsByTagQuery ) _readJournal ! ) . CurrentEventsByTag ( Const . Tag1000 , NoOffset . Instance ) ;
60
+ await source . RunForeach (
61
+ msg => { events . Add ( msg ) ; } ,
62
+ _materializer ) ;
63
+ events . Select ( e => e . SequenceNr ) . Should ( ) . BeEquivalentTo ( Enumerable . Range ( 2000001 , 1000 ) ) ;
64
+ }
65
+
66
+ [ Benchmark ]
67
+ public async Task QueryByTag10000 ( )
68
+ {
69
+ var events = new List < EventEnvelope > ( ) ;
70
+ var source = ( ( ICurrentEventsByTagQuery ) _readJournal ! ) . CurrentEventsByTag ( Const . Tag10000 , NoOffset . Instance ) ;
71
+ await source . RunForeach (
72
+ msg => { events . Add ( msg ) ; } ,
73
+ _materializer ) ;
74
+ events . Select ( e => e . SequenceNr ) . Should ( ) . BeEquivalentTo ( Enumerable . Range ( 2000001 , 10000 ) ) ;
75
+ }
76
+ }
0 commit comments