Skip to content

Commit af432bb

Browse files
authored
Merge pull request #7 from dotJEM/issues/GH-6
Issues/gh 6
2 parents 3618205 + 1619f34 commit af432bb

File tree

9 files changed

+221
-79
lines changed

9 files changed

+221
-79
lines changed

src/DotJEM.Json.Index2.Contexts/Searching/MultiIndexJsonSearcherManager.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ public MultiIndexJsonSearcherManager(IJsonIndex[] indicies, IJsonDocumentSeriali
2020

2121
public IIndexSearcherContext Acquire()
2222
{
23+
//TODO: Do we need to respect the Writer Lease here, or is this OK as we just get the reader?
2324
IndexReader[] readers = indicies
24-
.Select(idx => idx.WriterManager.Writer.GetReader(true))
25+
.Select(idx => idx.WriterManager.Lease().Value.GetReader(true))
2526
.Select(r => DirectoryReader.OpenIfChanged(r) ?? r)
2627
.Cast<IndexReader>()
2728
.ToArray();

src/DotJEM.Json.Index2.Management.Test/JsonIndexManagerTest.cs

+17-8
Original file line numberDiff line numberDiff line change
@@ -41,21 +41,28 @@ public async Task IndexWriterShouldNotBeDisposed()
4141
IJsonIndexManager manager = new JsonIndexManager(source, snapshots, index);
4242

4343
InfoStreamExceptionEvent? disposedEvent = null;
44+
InfoStreamExceptionEvent? exceptionEvent = null;
4445
manager.InfoStream
4546
.OfType<InfoStreamExceptionEvent>()
4647
.Where(@event => @event.Exception is ObjectDisposedException)
4748
.Subscribe(@event =>
4849
{
49-
Debug.WriteLine(@event.Exception);
5050
disposedEvent = @event;
5151
});
52+
manager.InfoStream
53+
.OfType<InfoStreamExceptionEvent>()
54+
.Where(@event => @event.Exception.Message != "Can't write to an existing snapshot.")
55+
.Subscribe(@event =>
56+
{
57+
exceptionEvent = @event;
58+
});
5259

5360
try
5461
{
5562
await manager.RunAsync();
5663
Debug.WriteLine("TEST STARTED");
5764
Stopwatch sw = Stopwatch.StartNew();
58-
while (sw.Elapsed < 10.Minutes() && disposedEvent == null)
65+
while (sw.Elapsed < 10.Minutes() && disposedEvent == null && exceptionEvent == null)
5966
{
6067
Task result = Random.Shared.Next(100) switch
6168
{
@@ -68,21 +75,23 @@ public async Task IndexWriterShouldNotBeDisposed()
6875

6976
async Task DoAfterDelay(Func<Task> action, TimeSpan? delay = null)
7077
{
71-
await Task.Delay(delay ?? Random.Shared.Next(1,5).Seconds());
78+
await Task.Delay(delay ?? Random.Shared.Next(1, 5).Seconds());
7279
await action();
7380
}
81+
7482
await manager.StopAsync();
75-
index.Close();
7683
}
77-
catch (Exception e)
84+
catch (TaskCanceledException)
85+
{
86+
//Ignore.
87+
}
88+
finally
7889
{
79-
Console.WriteLine(e);
80-
await manager.StopAsync();
8190
index.Close();
82-
throw;
8391
}
8492

8593
Assert.That(disposedEvent, Is.Null, () => disposedEvent?.Exception.ToString());
94+
Assert.That(exceptionEvent, Is.Null, () => exceptionEvent?.Exception.ToString());
8695
}
8796

8897

src/DotJEM.Json.Index2.Management/Writer/IJsonIndexWriter.cs

+40-26
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@
66
using System.Threading;
77
using DotJEM.Json.Index2.Documents;
88
using DotJEM.Json.Index2.Documents.Info;
9+
using DotJEM.Json.Index2.IO;
910
using DotJEM.ObservableExtensions.InfoStreams;
1011
using Lucene.Net.Index;
1112
using Newtonsoft.Json.Linq;
13+
using static Lucene.Net.Documents.Field;
1214

1315
namespace DotJEM.Json.Index2.Management.Writer;
1416

@@ -19,8 +21,8 @@ public interface IJsonIndexWriter
1921
void Create(IEnumerable<JObject> entities);
2022
void Update(JObject entity);
2123
void Delete(JObject entity);
22-
void Commit();
23-
void Flush(bool triggerMerge, bool applyAllDeletes);
24+
void Commit(bool force = false);
25+
void Flush(bool triggerMerge = false, bool applyAllDeletes = false);
2426
void MaybeMerge();
2527
}
2628

@@ -32,7 +34,7 @@ public class JsonIndexWriter : IJsonIndexWriter
3234
private readonly IInfoStream<JsonIndexManager> infoStream = new InfoStream<JsonIndexManager>();
3335
private readonly ThrottledCommit throttledCommit;
3436

35-
private IndexWriter Writer => index.WriterManager.Writer;
37+
private ILease<IndexWriter> WriterLease => index.WriterManager.Lease();
3638
public IInfoStream InfoStream => infoStream;
3739

3840
public JsonIndexWriter(IJsonIndex index)
@@ -46,52 +48,60 @@ public JsonIndexWriter(IJsonIndex index)
4648

4749
public void Update(JObject entity)
4850
{
51+
using ILease<IndexWriter> lease = WriterLease;
52+
4953
Term term = resolver.Resolver.Identity(entity);
5054
LuceneDocumentEntry doc = mapper.Create(entity);
51-
Writer.UpdateDocument(term, doc.Document);
55+
lease.Value.UpdateDocument(term, doc.Document);
5256
throttledCommit.Increment();
5357
DebugInfo($"Writer.UpdateDocument({term}, <doc>)");
5458
}
5559

5660
public void Create(JObject entity)
5761
{
62+
using ILease<IndexWriter> lease = WriterLease;
63+
5864
LuceneDocumentEntry doc = mapper.Create(entity);
59-
Writer.AddDocument(doc.Document);
65+
lease.Value.AddDocument(doc.Document);
6066
throttledCommit.Increment();
6167
DebugInfo($"Writer.AddDocument(<doc>)");
6268
}
6369

6470
public void Create(IEnumerable<JObject> entities)
6571
{
66-
Writer.AddDocuments(entities.Select(entity => mapper.Create(entity).Document));
72+
using ILease<IndexWriter> lease = WriterLease;
73+
lease.Value.AddDocuments(entities.Select(entity => mapper.Create(entity).Document));
6774
throttledCommit.Increment();
6875
DebugInfo($"Writer.AddDocuments(<doc>)");
6976
}
7077

7178
public void Delete(JObject entity)
7279
{
80+
using ILease<IndexWriter> lease = WriterLease;
7381
Term term = resolver.Resolver.Identity(entity);
74-
Writer.DeleteDocuments(term);
82+
lease.Value.DeleteDocuments(term);
7583
throttledCommit.Increment();
7684
DebugInfo($"Writer.UpdateDocuments({term})");
7785
}
7886

7987

80-
public void Commit()
88+
public void Commit(bool force = false)
8189
{
82-
throttledCommit.Invoke();
90+
throttledCommit.Invoke(force);
8391
DebugInfo($"Writer.Commit()");
8492
}
8593

86-
public void Flush(bool triggerMerge, bool applyAllDeletes)
94+
public void Flush(bool triggerMerge = false, bool applyAllDeletes = false)
8795
{
88-
Writer.Flush(triggerMerge, applyAllDeletes);
96+
using ILease<IndexWriter> lease = WriterLease;
97+
lease.Value.Flush(triggerMerge, applyAllDeletes);
8998
DebugInfo($"Writer.Flush({triggerMerge}, {applyAllDeletes})");
9099
}
91100

92101
public void MaybeMerge()
93102
{
94-
Writer.MaybeMerge();
103+
using ILease<IndexWriter> lease = WriterLease;
104+
lease.Value.MaybeMerge();
95105
DebugInfo($"Writer.MaybeMerge()");
96106
}
97107

@@ -113,20 +123,17 @@ public class ThrottledCommit
113123
public ThrottledCommit(JsonIndexWriter target)
114124
{
115125
this.target = target;
116-
ThreadPool.RegisterWaitForSingleObject(handle, (_,_)=>Tick(), null, 200, false);
126+
ThreadPool.RegisterWaitForSingleObject(handle, (_,_)=>Tick(false), null, 200, false);
117127
}
118128

119-
private void Tick()
129+
private void Tick(bool force)
120130
{
121131
long time = Stopwatch.GetTimestamp();
122-
if (time - lastInvocation > upperBound)
123-
{
124-
Commit();
125-
lastInvocation = time;
126-
return;
127-
}
128-
129-
if (time - lastRequest > lowerBound)
132+
// ReSharper disable once InvertIf
133+
if (force
134+
|| time - lastInvocation > upperBound
135+
|| time - lastRequest > lowerBound
136+
)
130137
{
131138
Commit();
132139
lastInvocation = time;
@@ -135,15 +142,18 @@ private void Tick()
135142

136143
private void Commit()
137144
{
138-
if(Interlocked.Exchange(ref writes, 0) < 1)
145+
long writesRead = Interlocked.Exchange(ref writes, 0);
146+
if (writesRead < 1)
139147
return;
140148

141-
if (Interlocked.Exchange(ref calls, 0) < 1)
149+
long callsRead = Interlocked.Exchange(ref calls, 0);
150+
if (callsRead < 1)
142151
return;
143152

144153
try
145154
{
146-
target.Writer.Commit();
155+
using ILease<IndexWriter> lease = target.WriterLease;
156+
lease.Value.Commit();
147157
}
148158
catch (Exception e)
149159
{
@@ -152,10 +162,14 @@ private void Commit()
152162
}
153163
}
154164

155-
public void Invoke()
165+
public void Invoke(bool force)
156166
{
157167
Interlocked.Increment(ref calls);
158168
lastRequest = Stopwatch.GetTimestamp();
169+
if (force)
170+
{
171+
Tick(force);
172+
}
159173
}
160174

161175
public void Increment()

src/DotJEM.Json.Index2.Snapshots/IndexSnapshotHandler.cs

+4-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Linq;
55
using System.Text.RegularExpressions;
66
using System.Threading.Tasks;
7+
using DotJEM.Json.Index2.IO;
78
using DotJEM.Json.Index2.Snapshots.Streams;
89
using Lucene.Net.Index;
910
using Lucene.Net.Store;
@@ -32,7 +33,9 @@ public class IndexSnapshotHandler : IIndexSnapshotHandler
3233
{
3334
public async Task<ISnapshot> TakeSnapshotAsync(IJsonIndex index, ISnapshotStorage storage)
3435
{
35-
IndexWriter writer = index.WriterManager.Writer;
36+
using ILease<IndexWriter> lease = index.WriterManager.Lease();
37+
38+
IndexWriter writer = lease.Value;
3639
SnapshotDeletionPolicy sdp = writer.Config.IndexDeletionPolicy as SnapshotDeletionPolicy
3740
?? throw new InvalidOperationException("Index must use an implementation of the SnapshotDeletionPolicy.");
3841
IndexCommit commit = null;

src/DotJEM.Json.Index2/IO/JsonIndexWriter.cs

+25-15
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class JsonIndexWriter : Disposable, IJsonIndexWriter
2727
private readonly ILuceneDocumentFactory factory;
2828

2929
public IJsonIndex Index { get; }
30-
public IndexWriter UnderlyingWriter => manager.Writer;
30+
//public IndexWriter UnderlyingWriter => manager.Writer;
3131

3232
public JsonIndexWriter(IJsonIndex index, ILuceneDocumentFactory factory, IIndexWriterManager manager)
3333
{
@@ -43,51 +43,61 @@ public void Create(IEnumerable<JObject> docs)
4343
IEnumerable<Document> documents = factory
4444
.Create(docs)
4545
.Select(tuple => tuple.Document);
46-
UnderlyingWriter.AddDocuments(documents);
46+
WithLease(writer => writer.AddDocuments(documents));
4747
}
4848

4949
public void Update(JObject doc) => Update(new[] { doc });
5050
public void Update(IEnumerable<JObject> docs)
5151
{
5252
IEnumerable<LuceneDocumentEntry> documents = factory.Create(docs);
53-
foreach ((Term key, Document doc) in documents)
54-
UnderlyingWriter.UpdateDocument(key, doc);
53+
WithLease(writer => {
54+
foreach ((Term key, Document doc) in documents)
55+
writer.UpdateDocument(key, doc);
56+
});
5557
}
5658

5759
public void Delete(JObject doc) => Delete(new[] { doc });
5860
public void Delete(IEnumerable<JObject> docs)
5961
{
6062
IEnumerable<LuceneDocumentEntry> documents = factory.Create(docs);
61-
foreach ((Term key, Document _) in documents)
62-
UnderlyingWriter.DeleteDocuments(key);
63+
WithLease(writer => {
64+
foreach ((Term key, Document _) in documents)
65+
writer.DeleteDocuments(key);
66+
});
6367
}
6468

6569
public void ForceMerge(int maxSegments)
66-
=> UnderlyingWriter.ForceMerge(maxSegments);
70+
=> WithLease(writer => writer.ForceMerge(maxSegments));
6771

6872
public void ForceMerge(int maxSegments, bool wait)
69-
=> UnderlyingWriter.ForceMerge(maxSegments, wait);
73+
=> WithLease(writer => writer.ForceMerge(maxSegments, wait));
7074

7175
public void ForceMergeDeletes()
72-
=> UnderlyingWriter.ForceMergeDeletes();
76+
=> WithLease(writer => writer.ForceMergeDeletes());
7377

7478
public void ForceMergeDeletes(bool wait)
75-
=> UnderlyingWriter.ForceMergeDeletes(wait);
79+
=> WithLease(writer => writer.ForceMergeDeletes(wait));
7680

7781
public void Rollback()
78-
=> UnderlyingWriter.Rollback();
82+
=> WithLease(writer => writer.Rollback());
7983

8084
public void Flush(bool triggerMerge, bool applyDeletes)
81-
=> UnderlyingWriter.Flush(triggerMerge, applyDeletes);
85+
=> WithLease(writer => writer.Flush(triggerMerge, applyDeletes));
8286

8387
public void Commit()
84-
=> UnderlyingWriter.Commit();
88+
=> WithLease(writer => writer.Commit());
8589

8690
public void PrepareCommit()
87-
=> UnderlyingWriter.PrepareCommit();
91+
=> WithLease(writer => writer.PrepareCommit());
8892

8993
public void SetCommitData(IDictionary<string, string> commitUserData)
90-
=> UnderlyingWriter.SetCommitData(commitUserData);
94+
=> WithLease(writer => writer.SetCommitData(commitUserData));
95+
96+
private void WithLease(Action<IndexWriter> action)
97+
{
98+
using ILease<IndexWriter> lease =manager.Lease();
99+
action(lease.Value);
100+
}
91101

92102
protected override void Dispose(bool disposing)
93103
{

0 commit comments

Comments
 (0)