-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathIJsonIndexManager.cs
172 lines (152 loc) · 5.89 KB
/
IJsonIndexManager.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using DotJEM.Json.Index2.Management.Info;
using DotJEM.Json.Index2.Management.Observables;
using DotJEM.Json.Index2.Management.Snapshots;
using DotJEM.Json.Index2.Management.Source;
using DotJEM.Json.Index2.Management.Tracking;
using DotJEM.Json.Index2.Management.Writer;
using DotJEM.ObservableExtensions.InfoStreams;
using DotJEM.Web.Scheduler;
using Lucene.Net.Search;
namespace DotJEM.Json.Index2.Management;
public interface IJsonIndexManager
{
IInfoStream InfoStream { get; }
IIngestProgressTracker Tracker { get; }
IObservable<IJsonDocumentSourceEvent> DocumentChanges { get; }
Task<bool> TakeSnapshotAsync();
Task RunAsync();
Task UpdateGenerationAsync(string area, long generation);
Task ResetIndexAsync();
Task StopAsync();
}
public class JsonIndexManager : IJsonIndexManager
{
private readonly IJsonDocumentSource jsonDocumentSource;
private readonly IJsonIndexSnapshotManager snapshots;
private readonly IJsonIndexWriter writer;
private readonly IJsonIndex index;
private readonly IInfoStream<JsonIndexManager> infoStream = new InfoStream<JsonIndexManager>();
private readonly DocumentChangesStream changesStream = new();
public IInfoStream InfoStream => infoStream;
public IIngestProgressTracker Tracker { get; }
public IObservable<IJsonDocumentSourceEvent> DocumentChanges => changesStream;
public JsonIndexManager(
IJsonDocumentSource jsonDocumentSource,
IJsonIndexSnapshotManager snapshots,
//TODO: Allow multiple indexes and something that can shard
IJsonIndex index,
IJsonIndexWriter writer = null)
{
this.jsonDocumentSource = jsonDocumentSource;
this.snapshots = snapshots;
this.index = index;
this.writer = writer ?? new JsonIndexWriter(index);
this.writer.InfoStream.Subscribe(infoStream);
Tracker = new IngestProgressTracker();
jsonDocumentSource.DocumentChanges.ForEachAsync(CaptureChange);
jsonDocumentSource.InfoStream.Subscribe(infoStream);
jsonDocumentSource.Initialized.Subscribe(Tracker.SetInitialized);
snapshots.InfoStream.Subscribe(infoStream);
jsonDocumentSource.InfoStream.Subscribe(Tracker);
jsonDocumentSource.DocumentChanges.Subscribe(Tracker);
snapshots.InfoStream.Subscribe(Tracker);
Tracker.InfoStream.Subscribe(infoStream);
Tracker.ForEachAsync(state => infoStream.WriteTrackerStateEvent(state, "Tracker state updated"));
}
public async Task RunAsync()
{
bool restoredFromSnapshot = await RestoreSnapshotAsync().ConfigureAwait(false);
if (!restoredFromSnapshot)
{
index.Storage.Delete();
}
else
{
infoStream.WriteInfo($"Index was restored from snapshot.");
}
await Task
.WhenAll(
jsonDocumentSource.StartAsync(),
snapshots.RunAsync(Tracker, restoredFromSnapshot))
.ConfigureAwait(false);
}
public async Task<bool> TakeSnapshotAsync()
{
StorageIngestState state = Tracker.IngestState;
return await snapshots.TakeSnapshotAsync(state).ConfigureAwait(false);
}
public async Task<bool> RestoreSnapshotAsync()
{
RestoreSnapshotResult restoreResult = await snapshots.RestoreSnapshotAsync().ConfigureAwait(false);
if (!restoreResult.RestoredFromSnapshot)
return false;
foreach (StorageAreaIngestState state in restoreResult.State.Areas)
{
jsonDocumentSource.UpdateGeneration(state.Area, state.Generation.Current);
Tracker.UpdateState(state);
}
return true;
}
//TODO: Area is a concept that belongs to the specific storage implementation.
// So is the generation, How can we pass these in a decoupled way?
public Task UpdateGenerationAsync(string area, long generation)
{
jsonDocumentSource.UpdateGeneration(area, generation);
return Task.CompletedTask;
}
/// <summary>
/// Stops the underlying <see cref="IJsonDocumentSource"/>, deletes it's storage,
/// requests reset of the underlying <see cref="IJsonDocumentSource"/> and then starts it again.
/// </summary>
public async Task ResetIndexAsync()
{
await jsonDocumentSource.StopAsync().ConfigureAwait(false);
index.Storage.Delete();
//TODO: Force a commit after delete to see if that helps?
using (var lease = index.WriterManager.Lease())
{
lease.Value.Commit();
}
await jsonDocumentSource.ResetAsync().ConfigureAwait(false);
await jsonDocumentSource.StartAsync().ConfigureAwait(false);
}
public async Task StopAsync()
{
await jsonDocumentSource.StopAsync().ConfigureAwait(false);
}
private void CaptureChange(IJsonDocumentSourceEvent sourceEvent)
{
try
{
switch (sourceEvent)
{
case JsonDocumentSourceDigestCompleted:
if(jsonDocumentSource.Initialized.Value)
writer.Commit();
break;
case JsonDocumentCreated created:
writer.Create(created.Document);
break;
case JsonDocumentDeleted deleted:
writer.Delete(deleted.Document);
break;
case JsonDocumentUpdated updated:
writer.Update(updated.Document);
break;
}
changesStream.Publish(sourceEvent);
}
catch (Exception ex)
{
infoStream.WriteError($"Failed to ingest change from {sourceEvent.Area}", ex);
}
}
}