3
3
using System . Diagnostics ;
4
4
using System . IO ;
5
5
using System . Linq ;
6
+ using System . Reactive . Subjects ;
7
+ using System . Runtime . Serialization ;
6
8
using System . Threading . Tasks ;
7
9
using DotJEM . Json . Index2 . Management . Info ;
10
+ using DotJEM . Json . Index2 . Management . Observables ;
8
11
using DotJEM . Json . Index2 . Management . Source ;
9
12
using DotJEM . ObservableExtensions ;
10
13
using DotJEM . ObservableExtensions . InfoStreams ;
@@ -22,9 +25,11 @@ public enum IngestInitializationState
22
25
// ReSharper disable once PossibleInterfaceMemberAmbiguity -> Just dictates implementation must be explicit which is OK.
23
26
public interface IIngestProgressTracker :
24
27
IObserver < IJsonDocumentChange > ,
25
- IObserver < IInfoStreamEvent > , IObservable < ITrackerState >
28
+ IObserver < IInfoStreamEvent > ,
29
+ IObservable < ITrackerState >
26
30
{
27
- IngestInitializationState InitializationState { get ; }
31
+ IObservableValue < IngestInitializationState > InitializationState { get ; }
32
+
28
33
IInfoStream InfoStream { get ; }
29
34
StorageIngestState IngestState { get ; }
30
35
SnapshotRestoreState RestoreState { get ; }
@@ -41,35 +46,42 @@ public class IngestProgressTracker : BasicSubject<ITrackerState>, IIngestProgres
41
46
private readonly ConcurrentDictionary < string , StorageAreaIngestStateTracker > observerTrackers = new ( ) ;
42
47
private readonly ConcurrentDictionary < string , IndexFileRestoreStateTracker > restoreTrackers = new ( ) ;
43
48
private readonly IInfoStream < JsonIndexManager > infoStream = new InfoStream < JsonIndexManager > ( ) ;
44
- private IngestInitializationState initializationState ;
49
+ public IObservableValue < IngestInitializationState > InitializationState { get ; } = new ObservableValue < IngestInitializationState > ( ) ;
45
50
46
51
public IInfoStream InfoStream => infoStream ;
47
52
48
- public IngestInitializationState InitializationState
49
- {
50
- get => initializationState ;
51
- private set
52
- {
53
- if ( initializationState == value ) return ;
54
-
55
- initializationState = value ;
56
- TaskCompletionSource < bool > target = value switch
57
- {
58
- IngestInitializationState . Started => startedEntered ,
59
- IngestInitializationState . Restoring => restoringEntered ,
60
- IngestInitializationState . Ingesting => ingestingEntered ,
61
- IngestInitializationState . Initialized => initializedEntered ,
62
- _ => null
63
- } ;
64
- target ? . TrySetResult ( true ) ;
65
- }
66
- }
67
-
68
53
private readonly TaskCompletionSource < bool > startedEntered = new ( ) ;
69
54
private readonly TaskCompletionSource < bool > restoringEntered = new ( ) ;
70
55
private readonly TaskCompletionSource < bool > ingestingEntered = new ( ) ;
71
56
private readonly TaskCompletionSource < bool > initializedEntered = new ( ) ;
72
57
58
+ private void UpdateState ( IngestInitializationState newState )
59
+ {
60
+ switch ( newState )
61
+ {
62
+ case IngestInitializationState . Started :
63
+ startedEntered . TrySetResult ( true ) ;
64
+ break ;
65
+ case IngestInitializationState . Restoring :
66
+ startedEntered . TrySetResult ( true ) ;
67
+ restoringEntered . TrySetResult ( true ) ;
68
+ break ;
69
+ case IngestInitializationState . Ingesting :
70
+ startedEntered . TrySetResult ( true ) ;
71
+ restoringEntered . TrySetResult ( true ) ;
72
+ ingestingEntered . TrySetResult ( true ) ;
73
+ break ;
74
+ case IngestInitializationState . Initialized :
75
+ startedEntered . TrySetResult ( true ) ;
76
+ restoringEntered . TrySetResult ( true ) ;
77
+ ingestingEntered . TrySetResult ( true ) ;
78
+ initializedEntered . TrySetResult ( true ) ;
79
+ break ;
80
+ default :
81
+ throw new ArgumentOutOfRangeException ( nameof ( newState ) , newState , null ) ;
82
+ }
83
+ }
84
+
73
85
74
86
// TODO: We are adding a number of computational cycles here on each single update, this should be improved as well.
75
87
// So we don't have to do a loop on each turn, but later with that.
@@ -78,7 +90,7 @@ private set
78
90
79
91
public IngestProgressTracker ( )
80
92
{
81
- InitializationState = IngestInitializationState . Started ;
93
+ UpdateState ( IngestInitializationState . Started ) ;
82
94
}
83
95
84
96
public void OnNext ( IJsonDocumentChange value )
@@ -98,7 +110,7 @@ public void UpdateState(StorageAreaIngestState state)
98
110
99
111
public void SetInitialized ( bool initialized )
100
112
{
101
- if ( initialized ) this . InitializationState = IngestInitializationState . Initialized ;
113
+ if ( initialized ) UpdateState ( IngestInitializationState . Initialized ) ;
102
114
}
103
115
104
116
public void OnNext ( IInfoStreamEvent value )
@@ -182,7 +194,7 @@ private void OnStorageObserverInfoStreamEvent(StorageObserverInfoStreamEvent soe
182
194
183
195
private void InternalPublish ( ITrackerState state )
184
196
{
185
- if ( InitializationState == IngestInitializationState . Initialized )
197
+ if ( InitializationState . Value == IngestInitializationState . Initialized )
186
198
{
187
199
Publish ( state ) ;
188
200
return ;
@@ -191,16 +203,16 @@ private void InternalPublish(ITrackerState state)
191
203
switch ( state )
192
204
{
193
205
case SnapshotRestoreState :
194
- InitializationState = IngestInitializationState . Restoring ;
206
+ UpdateState ( IngestInitializationState . Restoring ) ;
195
207
break ;
196
208
197
209
case StorageIngestState storageIngestState :
198
210
JsonSourceEventType [ ] states = storageIngestState . Areas
199
211
. Select ( x => x . LastEvent )
200
212
. ToArray ( ) ;
201
- InitializationState = states . All ( state => state is JsonSourceEventType . Updated or JsonSourceEventType . Updating or JsonSourceEventType . Initialized )
213
+ UpdateState ( states . All ( state => state is JsonSourceEventType . Updated or JsonSourceEventType . Updating or JsonSourceEventType . Initialized )
202
214
? IngestInitializationState . Initialized
203
- : IngestInitializationState . Ingesting ;
215
+ : IngestInitializationState . Ingesting ) ;
204
216
break ;
205
217
206
218
}
0 commit comments