Skip to content

Commit 8e7dd8e

Browse files
jmdjmd
jmd
authored and
jmd
committed
do not swallow digest complete or reset event
1 parent b7fb2ee commit 8e7dd8e

File tree

2 files changed

+20
-10
lines changed

2 files changed

+20
-10
lines changed

src/DotJEM.Json.Index2.Management/Tracking/IIngestProgressTracker.cs

+19-4
Original file line numberDiff line numberDiff line change
@@ -98,14 +98,27 @@ public void OnNext(IJsonDocumentSourceEvent value)
9898
switch (value)
9999
{
100100
case JsonDocumentCreated created:
101-
observerTrackers.AddOrUpdate(value.Area, _ => throw new InvalidDataException(), (_, state) => state.UpdateState(created.Generation, created.Size));
101+
observerTrackers.AddOrUpdate(value.Area,
102+
_ => throw new InvalidDataException(),
103+
(_, state) => state.UpdateState(created.Generation, created.Size));
102104
break;
103105
case JsonDocumentUpdated updated:
104-
observerTrackers.AddOrUpdate(value.Area, _ => throw new InvalidDataException(), (_, state) => state.UpdateState(updated.Generation, updated.Size));
106+
observerTrackers.AddOrUpdate(value.Area,
107+
_ => throw new InvalidDataException(),
108+
(_, state) => state.UpdateState(updated.Generation, updated.Size));
105109
break;
106110
case JsonDocumentDeleted deleted:
107-
observerTrackers.AddOrUpdate(value.Area, _ => throw new InvalidDataException(), (_, state) => state.UpdateState(deleted.Generation, deleted.Size));
111+
observerTrackers.AddOrUpdate(value.Area,
112+
_ => throw new InvalidDataException(),
113+
(_, state) => state.UpdateState(deleted.Generation, deleted.Size));
108114
break;
115+
case JsonDocumentSourceDigestCompleted:
116+
observerTrackers.AddOrUpdate(value.Area,
117+
_ => throw new InvalidDataException(), (_, state) => state);
118+
break;
119+
case JsonDocumentSourceReset:
120+
break;
121+
109122
default:
110123
return;
111124
}
@@ -114,8 +127,10 @@ public void OnNext(IJsonDocumentSourceEvent value)
114127

115128
public void UpdateState(StorageAreaIngestState state)
116129
{
117-
observerTrackers.AddOrUpdate(state.Area, s => new StorageAreaIngestStateTracker(s, JsonSourceEventType.Initialized).UpdateState(state)
130+
observerTrackers.AddOrUpdate(state.Area,
131+
s => new StorageAreaIngestStateTracker(s, JsonSourceEventType.Initialized).UpdateState(state)
118132
, (s, tracker) => tracker.UpdateState(state));
133+
InternalPublish(IngestState);
119134
}
120135

121136
public void SetInitialized(bool initialized)

src/Stress/StressTester/Program.cs

+1-6
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@
6868
IWebTaskScheduler scheduler = new WebTaskScheduler();
6969
IJsonIndexManager jsonIndexManager = new JsonIndexManager(
7070
new JsonStorageDocumentSource(new JsonStorageAreaObserverFactory(storage, scheduler,areas)),
71-
new JsonIndexSnapshotManager(index, new ZipSnapshotStrategy(".\\app_data\\snapshots"), scheduler, "10s"),
71+
new JsonIndexSnapshotManager(index, new ZipSnapshotStrategy(".\\app_data\\snapshots"), scheduler, "30m"),
7272
index
7373
);
7474

@@ -83,11 +83,6 @@
8383
{
8484
Console.Clear();
8585
Console.WriteLine("COMPLETED");
86-
Console.WriteLine("COMPLETED");
87-
Console.WriteLine("COMPLETED");
88-
Console.WriteLine("COMPLETED");
89-
Console.WriteLine("COMPLETED");
90-
Console.WriteLine("COMPLETED");
9186

9287
});
9388

0 commit comments

Comments
 (0)