Skip to content

Commit aefa97a

Browse files
jmdjmd
jmd
authored and
jmd
committed
improve reset. still observing a writer disposed.
1 parent 76fc9c7 commit aefa97a

File tree

5 files changed

+104
-41
lines changed

5 files changed

+104
-41
lines changed

src/DotJEM.Json.Index2.Management/IJsonIndexManager.cs

+12-3
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
using DotJEM.Json.Index2.Management.Writer;
1414
using DotJEM.ObservableExtensions.InfoStreams;
1515
using DotJEM.Web.Scheduler;
16+
using Lucene.Net.Search;
1617

1718
namespace DotJEM.Json.Index2.Management;
1819

@@ -75,7 +76,7 @@ public async Task RunAsync()
7576

7677
await Task
7778
.WhenAll(
78-
jsonDocumentSource.RunAsync(),
79+
jsonDocumentSource.StartAsync(),
7980
snapshots.RunAsync(Tracker, restoredFromSnapshot))
8081
.ConfigureAwait(false);
8182
}
@@ -101,17 +102,25 @@ public async Task<bool> RestoreSnapshotAsync()
101102
return true;
102103
}
103104

105+
//TODO: Area is a concept that belongs to the specific storage implementation.
106+
// So is the generation, How can we pass these in a decoupled way?
104107
public Task UpdateGenerationAsync(string area, long generation)
105108
{
106109
jsonDocumentSource.UpdateGeneration(area, generation);
107110
return Task.CompletedTask;
108111
}
109112

113+
114+
/// <summary>
115+
/// Stops the underlying <see cref="IJsonDocumentSource"/>, deletes it's storage,
116+
/// requests reset of the underlying <see cref="IJsonDocumentSource"/> and then starts it again.
117+
/// </summary>
110118
public async Task ResetIndexAsync()
111119
{
112-
120+
await jsonDocumentSource.StopAsync().ConfigureAwait(false);
113121
index.Storage.Delete();
114122
await jsonDocumentSource.ResetAsync().ConfigureAwait(false);
123+
await jsonDocumentSource.StartAsync().ConfigureAwait(false);
115124
}
116125

117126
private void CaptureChange(IJsonDocumentSourceEvent sourceEvent)
@@ -120,7 +129,7 @@ private void CaptureChange(IJsonDocumentSourceEvent sourceEvent)
120129
{
121130
switch (sourceEvent)
122131
{
123-
case JsonDocumentSourceDigestCompleted commitSignal:
132+
case JsonDocumentSourceDigestCompleted:
124133
writer.Commit();
125134
break;
126135
case JsonDocumentCreated created:

src/DotJEM.Json.Index2.Management/Source/IJsonDocumentSource.cs

+37-2
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,49 @@
55

66
namespace DotJEM.Json.Index2.Management.Source;
77

8+
/// <summary>
9+
///
10+
/// </summary>
811
public interface IJsonDocumentSource
912
{
13+
/// <summary>
14+
///
15+
/// </summary>
1016
IInfoStream InfoStream { get; }
1117

18+
/// <summary>
19+
///
20+
/// </summary>
1221
IObservable<IJsonDocumentSourceEvent> DocumentChanges { get; }
22+
23+
/// <summary>
24+
///
25+
/// </summary>
1326
IObservableValue<bool> Initialized { get; }
1427

15-
Task RunAsync();
28+
/// <summary>
29+
///
30+
/// </summary>
31+
/// <returns></returns>
32+
Task StartAsync();
33+
34+
/// <summary>
35+
///
36+
/// </summary>
37+
/// <returns></returns>
38+
Task StopAsync();
39+
40+
/// <summary>
41+
///
42+
/// </summary>
43+
/// <param name="area"></param>
44+
/// <param name="generation"></param>
1645
void UpdateGeneration(string area, long generation);
46+
47+
/// <summary>
48+
///
49+
/// </summary>
50+
/// <returns></returns>
1751
Task ResetAsync();
18-
}
52+
}
53+

src/Stress/StressTester/Adapter/IJsonStorageAreaObserver.cs

+44-15
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,47 @@ public interface IJsonStorageAreaObserver : IJsonDocumentSource
1515
string AreaName { get; }
1616
}
1717

18+
internal class Atomic<T>
19+
{
20+
private T value;
21+
private readonly object padlock = new ();
22+
23+
public Atomic(T value)
24+
{
25+
this.value = value;
26+
}
27+
28+
public T Read() {
29+
lock (padlock)
30+
{
31+
return value;
32+
}
33+
}
34+
35+
public T Exchange(T value)
36+
{
37+
lock (padlock)
38+
{
39+
T current = this.value;
40+
this.value = value;
41+
return current;
42+
}
43+
}
44+
45+
public static implicit operator Atomic<T>(T value) => new (value);
46+
public static implicit operator T(Atomic<T> value) => value.Read();
47+
}
48+
49+
1850
public class JsonStorageAreaObserver : IJsonStorageAreaObserver
1951
{
2052
private readonly string pollInterval;
2153
private readonly IWebTaskScheduler scheduler;
2254
private readonly IStorageAreaLog log;
2355
private readonly DocumentChangesStream observable = new();
2456
private readonly IInfoStream<JsonStorageAreaObserver> infoStream = new InfoStream<JsonStorageAreaObserver>();
25-
57+
private readonly Atomic<bool> started = false;
58+
2659
private long generation = 0;
2760
private long initialGeneration = 0;
2861
private IScheduledTask task;
@@ -41,19 +74,25 @@ public JsonStorageAreaObserver(IStorageArea storageArea, IWebTaskScheduler sched
4174
log = storageArea.Log;
4275
}
4376

44-
public async Task RunAsync()
77+
public async Task StartAsync()
4578
{
79+
if(started.Exchange(true))
80+
return;
81+
4682
infoStream.WriteJsonSourceEvent(JsonSourceEventType.Starting, StorageArea.Name, $"Ingest starting for storageArea '{StorageArea.Name}'.");
4783
task = scheduler.Schedule($"JsonStorageAreaObserver:{StorageArea.Name}", _ => RunUpdateCheck(), pollInterval);
4884
task.InfoStream.Subscribe(infoStream);
49-
await task.WhenCompleted();
85+
await task.Signal(true);
5086
}
5187

5288
public async Task StopAsync()
5389
{
90+
if(!started.Exchange(false))
91+
return;
92+
5493
task.Dispose();
55-
await task.WhenCompleted();
56-
infoStream.WriteJsonSourceEvent(JsonSourceEventType.Stopped, StorageArea.Name, $"Initializing for storageArea '{StorageArea.Name}'.");
94+
await task.WhenCompleted().ConfigureAwait(false);
95+
infoStream.WriteJsonSourceEvent(JsonSourceEventType.Stopped, StorageArea.Name, $"Stopping for storageArea '{StorageArea.Name}'.");
5796
}
5897

5998
public void UpdateGeneration(string area, long value)
@@ -70,7 +109,6 @@ public async Task ResetAsync()
70109
{
71110
UpdateGeneration(AreaName, initialGeneration);
72111
observable.Publish(new JsonDocumentSourceReset(AreaName));
73-
task.Signal();
74112
}
75113

76114
public void RunUpdateCheck()
@@ -80,23 +118,19 @@ public void RunUpdateCheck()
80118
Stopwatch timer = Stopwatch.StartNew();
81119
if (!Initialized.Value)
82120
{
83-
BeforeInitialize();
84121
infoStream.WriteJsonSourceEvent(JsonSourceEventType.Initializing, StorageArea.Name, $"Initializing for storageArea '{StorageArea.Name}'.");
85122
using IStorageAreaLogReader changes = log.OpenLogReader(generation, Initialized.Value );
86123
PublishChanges(changes, row => new JsonDocumentCreated(row.Area, row.CreateEntity(), row.Size, new GenerationInfo(row.Generation, latestGeneration)));
87124
Initialized.Value = true;
88125
infoStream.WriteJsonSourceEvent(JsonSourceEventType.Initialized, StorageArea.Name, $"Initialization complete for storageArea '{StorageArea.Name}' in {timer.Elapsed}.");
89-
AfterInitialize();
90126

91127
}
92128
else
93129
{
94-
BeforeUpdate();
95130
infoStream.WriteJsonSourceEvent(JsonSourceEventType.Updating, StorageArea.Name, $"Checking updates for storageArea '{StorageArea.Name}'.");
96131
using IStorageAreaLogReader changes = log.OpenLogReader(generation, Initialized.Value );
97132
PublishChanges(changes, MapRow);
98133
infoStream.WriteJsonSourceEvent(JsonSourceEventType.Updated, StorageArea.Name, $"Done checking updates for storageArea '{StorageArea.Name}' in {timer.Elapsed}.");
99-
AfterUpdate();
100134
}
101135
PublishCommitSignal();
102136

@@ -128,9 +162,4 @@ void PublishChanges(IStorageAreaLogReader changes, Func<IChangeLogRow, IJsonDocu
128162
}
129163
}
130164
}
131-
public virtual void BeforeInitialize() { }
132-
public virtual void AfterInitialize() { }
133-
134-
public virtual void BeforeUpdate() { }
135-
public virtual void AfterUpdate() { }
136165
}

src/Stress/StressTester/Adapter/JsonStorageDocumentSource.cs

+9-2
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,17 @@ private void InitializedChanged()
5555
this.Initialized.Value = observers.Values.All(observer => observer.Initialized.Value);
5656
}
5757

58-
public async Task RunAsync()
58+
public async Task StartAsync()
5959
{
6060
await Task.WhenAll(
61-
observers.Values.Select(async observer => await observer.RunAsync().ConfigureAwait(false))
61+
observers.Values.Select(async observer => await observer.StartAsync().ConfigureAwait(false))
62+
).ConfigureAwait(false);
63+
}
64+
65+
public async Task StopAsync()
66+
{
67+
await Task.WhenAll(
68+
observers.Values.Select(async observer => await observer.StopAsync().ConfigureAwait(false))
6269
).ConfigureAwait(false);
6370
}
6471

src/Stress/StressTester/Program.cs

+2-19
Original file line numberDiff line numberDiff line change
@@ -78,14 +78,6 @@
7878
//,genTask
7979
);
8080

81-
82-
jsonIndexManager.Tracker.WhenState(IngestInitializationState.Initialized).ContinueWith(task =>
83-
{
84-
Console.Clear();
85-
Console.WriteLine("COMPLETED");
86-
87-
});
88-
8981
while (true)
9082
{
9183
string? input = Console.ReadLine();
@@ -99,21 +91,12 @@
9991
await jsonIndexManager.TakeSnapshotAsync();
10092
break;
10193

102-
case 'C':
103-
jsonIndexManager.Tracker.WhenState(IngestInitializationState.Initialized).ContinueWith(task =>
104-
{
105-
Console.Clear();
106-
Console.WriteLine("COMPLETED");
107-
});
108-
109-
break;
110-
11194
case 'I':
11295
Console.Clear();
11396
break;
11497

115-
case 'L':
116-
Console.Clear();
98+
case 'R':
99+
jsonIndexManager.ResetIndexAsync();
117100
break;
118101

119102
case 'Q':

0 commit comments

Comments
 (0)