Skip to content

Commit 4745b80

Browse files
committed
added reset signal and made events readonly structs
1 parent dd1a9dc commit 4745b80

11 files changed

+87
-82
lines changed

src/DotJEM.Json.Index2.Management/IJsonIndexManager.cs

+17-14
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public interface IJsonIndexManager
2020
{
2121
IInfoStream InfoStream { get; }
2222
IIngestProgressTracker Tracker { get; }
23-
IObservable<IJsonDocumentChange> DocumentChanges { get; }
23+
IObservable<IJsonDocumentSourceEvent> DocumentChanges { get; }
2424
Task<bool> TakeSnapshotAsync();
2525
Task RunAsync();
2626
Task UpdateGenerationAsync(string area, long generation);
@@ -39,7 +39,7 @@ public class JsonIndexManager : IJsonIndexManager
3939

4040
public IInfoStream InfoStream => infoStream;
4141
public IIngestProgressTracker Tracker { get; }
42-
public IObservable<IJsonDocumentChange> DocumentChanges => changesStream;
42+
public IObservable<IJsonDocumentSourceEvent> DocumentChanges => changesStream;
4343

4444
public JsonIndexManager(
4545
IJsonDocumentSource jsonDocumentSource,
@@ -109,33 +109,36 @@ public async Task ResetIndexAsync()
109109
{
110110
index.Storage.Delete();
111111
await jsonDocumentSource.ResetAsync().ConfigureAwait(false);
112+
112113
}
113114

114-
private void CaptureChange(IJsonDocumentChange change)
115+
private void CaptureChange(IJsonDocumentSourceEvent sourceEvent)
115116
{
116117
try
117118
{
118-
switch (change.Type)
119+
switch (sourceEvent)
119120
{
120-
case JsonChangeType.Create:
121-
writer.Create(change.Entity);
121+
case JsonDocumentSourceDigestCompleted commitSignal:
122+
writer.Commit();
122123
break;
123-
case JsonChangeType.Update:
124-
writer.Update(change.Entity);
124+
case JsonDocumentCreated created:
125+
writer.Create(created.Document);
125126
break;
126-
case JsonChangeType.Delete:
127-
writer.Delete(change.Entity);
127+
case JsonDocumentDeleted deleted:
128+
writer.Delete(deleted.Document);
128129
break;
129-
case JsonChangeType.Commit:
130-
writer.Commit();
130+
case JsonDocumentUpdated updated:
131+
writer.Update(updated.Document);
131132
break;
133+
default:
134+
throw new ArgumentOutOfRangeException(nameof(sourceEvent));
132135
}
133136

134-
changesStream.Publish(change);
137+
changesStream.Publish(sourceEvent);
135138
}
136139
catch (Exception ex)
137140
{
138-
infoStream.WriteError($"Failed to ingest change from {change.Area}", ex);
141+
infoStream.WriteError($"Failed to ingest change from {sourceEvent.Area}", ex);
139142
}
140143
}
141144
}

src/DotJEM.Json.Index2.Management/Observables/DocumentChangesStream.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@
33

44
namespace DotJEM.Json.Index2.Management.Observables;
55

6-
public class DocumentChangesStream : BasicSubject<IJsonDocumentChange> { }
6+
public class DocumentChangesStream : BasicSubject<IJsonDocumentSourceEvent> { }

src/DotJEM.Json.Index2.Management/Source/IJsonDocumentChange.cs

-12
This file was deleted.

src/DotJEM.Json.Index2.Management/Source/IJsonDocumentSource.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ public interface IJsonDocumentSource
99
{
1010
IInfoStream InfoStream { get; }
1111

12-
IObservable<IJsonDocumentChange> DocumentChanges { get; }
12+
IObservable<IJsonDocumentSourceEvent> DocumentChanges { get; }
1313
IObservableValue<bool> Initialized { get; }
1414

1515
Task RunAsync();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
using Newtonsoft.Json.Linq;
2+
using System;
3+
4+
namespace DotJEM.Json.Index2.Management.Source;
5+
6+
public interface IJsonDocumentSourceEvent
7+
{
8+
string Area { get; }
9+
}
10+
11+
//public interface IJsonDocumentSourceChange
12+
//{
13+
// string Area { get; }
14+
//}
15+
16+
public readonly record struct JsonDocumentCreated(string Area, JObject Document, int Size, GenerationInfo Generation) : IJsonDocumentSourceEvent;
17+
18+
public readonly record struct JsonDocumentUpdated(string Area, JObject Document, int Size, GenerationInfo Generation): IJsonDocumentSourceEvent;
19+
20+
public readonly record struct JsonDocumentDeleted(string Area, JObject Document, int Size, GenerationInfo Generation): IJsonDocumentSourceEvent;
21+
22+
public readonly record struct JsonDocumentSourceDigestCompleted(string Area) : IJsonDocumentSourceEvent;
23+
public readonly record struct JsonDocumentSourceReset(string Area) : IJsonDocumentSourceEvent;
24+

src/DotJEM.Json.Index2.Management/Source/JsonChangeType.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,6 @@ public enum JsonChangeType
55
Create,
66
Update,
77
Delete,
8-
Commit
8+
Commit,
9+
Reset
910
}

src/DotJEM.Json.Index2.Management/Source/JsonDocumentChange.cs

-23
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
using System;
2+
using Newtonsoft.Json.Linq;
3+
4+
namespace DotJEM.Json.Index2.Management.Source;
5+

src/DotJEM.Json.Index2.Management/Tracking/IIngestProgressTracker.cs

+18-8
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public enum IngestInitializationState
2424

2525
// ReSharper disable once PossibleInterfaceMemberAmbiguity -> Just dictates implementation must be explicit which is OK.
2626
public interface IIngestProgressTracker :
27-
IObserver<IJsonDocumentChange>,
27+
IObserver<IJsonDocumentSourceEvent>,
2828
IObserver<IInfoStreamEvent>,
2929
IObservable<ITrackerState>
3030
{
@@ -93,12 +93,22 @@ public IngestProgressTracker()
9393
UpdateState(IngestInitializationState.Started);
9494
}
9595

96-
public void OnNext(IJsonDocumentChange value)
96+
public void OnNext(IJsonDocumentSourceEvent value)
9797
{
98-
if (value is not JsonDocumentChange)
99-
return;
100-
101-
observerTrackers.AddOrUpdate(value.Area, _ => throw new InvalidDataException(), (_, state) => state.UpdateState(value.Generation, value.Size));
98+
switch (value)
99+
{
100+
case JsonDocumentCreated created:
101+
observerTrackers.AddOrUpdate(value.Area, _ => throw new InvalidDataException(), (_, state) => state.UpdateState(created.Generation, created.Size));
102+
break;
103+
case JsonDocumentUpdated updated:
104+
observerTrackers.AddOrUpdate(value.Area, _ => throw new InvalidDataException(), (_, state) => state.UpdateState(updated.Generation, updated.Size));
105+
break;
106+
case JsonDocumentDeleted deleted:
107+
observerTrackers.AddOrUpdate(value.Area, _ => throw new InvalidDataException(), (_, state) => state.UpdateState(deleted.Generation, deleted.Size));
108+
break;
109+
default:
110+
return;
111+
}
102112
InternalPublish(IngestState);
103113
}
104114

@@ -222,8 +232,8 @@ private void InternalPublish(ITrackerState state)
222232

223233
void IObserver<IInfoStreamEvent>.OnError(Exception error) { }
224234
void IObserver<IInfoStreamEvent>.OnCompleted() { }
225-
void IObserver<IJsonDocumentChange>.OnError(Exception error) { }
226-
void IObserver<IJsonDocumentChange>.OnCompleted() { }
235+
void IObserver<IJsonDocumentSourceEvent>.OnError(Exception error) { }
236+
void IObserver<IJsonDocumentSourceEvent>.OnCompleted() { }
227237

228238
private class IndexFileRestoreStateTracker
229239
{

src/Stress/StressTester/Adapter/IJsonStorageAreaObserver.cs

+15-18
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public class JsonStorageAreaObserver : IJsonStorageAreaObserver
3030

3131
public string AreaName => StorageArea.Name;
3232
public IInfoStream InfoStream => infoStream;
33-
public IObservable<IJsonDocumentChange> DocumentChanges => observable;
33+
public IObservable<IJsonDocumentSourceEvent> DocumentChanges => observable;
3434
public IObservableValue<bool> Initialized { get; } = new ObservableValue<bool>();
3535

3636
public JsonStorageAreaObserver(IStorageArea storageArea, IWebTaskScheduler scheduler, string pollInterval = "10s")
@@ -69,6 +69,8 @@ public void UpdateGeneration(string area, long value)
6969
public async Task ResetAsync()
7070
{
7171
UpdateGeneration(AreaName, initialGeneration);
72+
observable.Publish(new JsonDocumentSourceReset(AreaName));
73+
task.Signal();
7274
}
7375

7476
public void RunUpdateCheck()
@@ -81,7 +83,7 @@ public void RunUpdateCheck()
8183
BeforeInitialize();
8284
infoStream.WriteJsonSourceEvent(JsonSourceEventType.Initializing, StorageArea.Name, $"Initializing for storageArea '{StorageArea.Name}'.");
8385
using IStorageAreaLogReader changes = log.OpenLogReader(generation, Initialized.Value );
84-
PublishChanges(changes, _ => JsonChangeType.Create);
86+
PublishChanges(changes, row => new JsonDocumentCreated(row.Area, row.CreateEntity(), row.Size, new GenerationInfo(row.Generation, latestGeneration)));
8587
Initialized.Value = true;
8688
infoStream.WriteJsonSourceEvent(JsonSourceEventType.Initialized, StorageArea.Name, $"Initialization complete for storageArea '{StorageArea.Name}' in {timer.Elapsed}.");
8789
AfterInitialize();
@@ -92,42 +94,37 @@ public void RunUpdateCheck()
9294
BeforeUpdate();
9395
infoStream.WriteJsonSourceEvent(JsonSourceEventType.Updating, StorageArea.Name, $"Checking updates for storageArea '{StorageArea.Name}'.");
9496
using IStorageAreaLogReader changes = log.OpenLogReader(generation, Initialized.Value );
95-
PublishChanges(changes, row => MapChange(row.Type));
97+
PublishChanges(changes, MapRow);
9698
infoStream.WriteJsonSourceEvent(JsonSourceEventType.Updated, StorageArea.Name, $"Done checking updates for storageArea '{StorageArea.Name}' in {timer.Elapsed}.");
9799
AfterUpdate();
98100
}
99101
PublishCommitSignal();
100102

101-
JsonChangeType MapChange(ChangeType type)
103+
IJsonDocumentSourceEvent MapRow(IChangeLogRow row)
102104
{
103-
return type switch
105+
return row.Type switch
104106
{
105-
ChangeType.Create => JsonChangeType.Create,
106-
ChangeType.Update => JsonChangeType.Update,
107-
ChangeType.Delete => JsonChangeType.Delete,
108-
_ => throw new ArgumentOutOfRangeException(nameof(type), type, null)
107+
ChangeType.Create => new JsonDocumentCreated(row.Area, row.CreateEntity(), row.Size, new GenerationInfo(row.Generation, latestGeneration)),
108+
ChangeType.Update => new JsonDocumentUpdated(row.Area, row.CreateEntity(), row.Size, new GenerationInfo(row.Generation, latestGeneration)),
109+
ChangeType.Delete => new JsonDocumentDeleted(row.Area, row.CreateEntity(), row.Size, new GenerationInfo(row.Generation, latestGeneration)),
110+
_ => throw new NotSupportedException()
109111
};
110112
}
111113

112114
void PublishCommitSignal()
113115
{
114-
observable.Publish(new CommitSignal(AreaName));
116+
observable.Publish(new JsonDocumentSourceDigestCompleted(AreaName));
115117
}
116118

117-
void PublishChanges(IStorageAreaLogReader changes, Func<IChangeLogRow, JsonChangeType> changeTypeGetter)
119+
void PublishChanges(IStorageAreaLogReader changes, Func<IChangeLogRow, IJsonDocumentSourceEvent> rowMapper)
118120
{
119121
foreach (IChangeLogRow change in changes)
120122
{
121123
generation = change.Generation;
122124
if (change.Type == ChangeType.Faulty)
123125
continue;
124-
125-
observable.Publish(new JsonDocumentChange(
126-
change.Area,
127-
changeTypeGetter(change),
128-
change.CreateEntity(),
129-
change.Size,
130-
new GenerationInfo(change.Generation, latestGeneration)));
126+
127+
observable.Publish(rowMapper(change));
131128
}
132129
}
133130
}

src/Stress/StressTester/Adapter/JsonStorageDocumentSource.cs

+4-4
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public class JsonStorageDocumentSource : IJsonDocumentSource
1818
private readonly DocumentChangesStream observable = new();
1919
private readonly InfoStream<JsonStorageDocumentSource> infoStream = new();
2020

21-
public IObservable<IJsonDocumentChange> DocumentChanges => observable;
21+
public IObservable<IJsonDocumentSourceEvent> DocumentChanges => observable;
2222
public IObservableValue<bool> Initialized { get; } = new ObservableValue<bool>();
2323
public IInfoStream InfoStream => infoStream;
2424

@@ -41,13 +41,13 @@ public JsonStorageDocumentSource(IJsonStorageAreaObserverFactory factory)
4141
.ToDictionary(x => x.AreaName);
4242
}
4343

44-
private void Forward(IJsonDocumentChange change)
44+
private void Forward(IJsonDocumentSourceEvent sourceEvent)
4545
{
4646
//Only pass a commit signal through once all are initialized.
47-
if(change.Type == JsonChangeType.Commit && !Initialized.Value)
47+
if(sourceEvent is JsonDocumentSourceDigestCompleted && !Initialized.Value)
4848
return;
4949

50-
observable.Publish(change);
50+
observable.Publish(sourceEvent);
5151
}
5252

5353
private void InitializedChanged()

0 commit comments

Comments
 (0)