Skip to content

Issues/gh 6 #7

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ public MultiIndexJsonSearcherManager(IJsonIndex[] indicies, IJsonDocumentSeriali

public IIndexSearcherContext Acquire()
{
//TODO: Do we need to respect the Writer Lease here, or is this OK as we just get the reader?
IndexReader[] readers = indicies
.Select(idx => idx.WriterManager.Writer.GetReader(true))
.Select(idx => idx.WriterManager.Lease().Value.GetReader(true))
.Select(r => DirectoryReader.OpenIfChanged(r) ?? r)
.Cast<IndexReader>()
.ToArray();
Expand Down
25 changes: 17 additions & 8 deletions src/DotJEM.Json.Index2.Management.Test/JsonIndexManagerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,28 @@ public async Task IndexWriterShouldNotBeDisposed()
IJsonIndexManager manager = new JsonIndexManager(source, snapshots, index);

InfoStreamExceptionEvent? disposedEvent = null;
InfoStreamExceptionEvent? exceptionEvent = null;
manager.InfoStream
.OfType<InfoStreamExceptionEvent>()
.Where(@event => @event.Exception is ObjectDisposedException)
.Subscribe(@event =>
{
Debug.WriteLine(@event.Exception);
disposedEvent = @event;
});
manager.InfoStream
.OfType<InfoStreamExceptionEvent>()
.Where(@event => @event.Exception.Message != "Can't write to an existing snapshot.")
.Subscribe(@event =>
{
exceptionEvent = @event;
});

try
{
await manager.RunAsync();
Debug.WriteLine("TEST STARTED");
Stopwatch sw = Stopwatch.StartNew();
while (sw.Elapsed < 10.Minutes() && disposedEvent == null)
while (sw.Elapsed < 10.Minutes() && disposedEvent == null && exceptionEvent == null)
{
Task result = Random.Shared.Next(100) switch
{
Expand All @@ -68,21 +75,23 @@ public async Task IndexWriterShouldNotBeDisposed()

async Task DoAfterDelay(Func<Task> action, TimeSpan? delay = null)
{
await Task.Delay(delay ?? Random.Shared.Next(1,5).Seconds());
await Task.Delay(delay ?? Random.Shared.Next(1, 5).Seconds());
await action();
}

await manager.StopAsync();
index.Close();
}
catch (Exception e)
catch (TaskCanceledException)
{
//Ignore.
}
finally
{
Console.WriteLine(e);
await manager.StopAsync();
index.Close();
throw;
}

Assert.That(disposedEvent, Is.Null, () => disposedEvent?.Exception.ToString());
Assert.That(exceptionEvent, Is.Null, () => exceptionEvent?.Exception.ToString());
}


Expand Down
66 changes: 40 additions & 26 deletions src/DotJEM.Json.Index2.Management/Writer/IJsonIndexWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
using System.Threading;
using DotJEM.Json.Index2.Documents;
using DotJEM.Json.Index2.Documents.Info;
using DotJEM.Json.Index2.IO;
using DotJEM.ObservableExtensions.InfoStreams;
using Lucene.Net.Index;
using Newtonsoft.Json.Linq;
using static Lucene.Net.Documents.Field;

namespace DotJEM.Json.Index2.Management.Writer;

Expand All @@ -19,8 +21,8 @@ public interface IJsonIndexWriter
void Create(IEnumerable<JObject> entities);
void Update(JObject entity);
void Delete(JObject entity);
void Commit();
void Flush(bool triggerMerge, bool applyAllDeletes);
void Commit(bool force = false);
void Flush(bool triggerMerge = false, bool applyAllDeletes = false);
void MaybeMerge();
}

Expand All @@ -32,7 +34,7 @@ public class JsonIndexWriter : IJsonIndexWriter
private readonly IInfoStream<JsonIndexManager> infoStream = new InfoStream<JsonIndexManager>();
private readonly ThrottledCommit throttledCommit;

private IndexWriter Writer => index.WriterManager.Writer;
private ILease<IndexWriter> WriterLease => index.WriterManager.Lease();
public IInfoStream InfoStream => infoStream;

public JsonIndexWriter(IJsonIndex index)
Expand All @@ -46,52 +48,60 @@ public JsonIndexWriter(IJsonIndex index)

public void Update(JObject entity)
{
using ILease<IndexWriter> lease = WriterLease;

Term term = resolver.Resolver.Identity(entity);
LuceneDocumentEntry doc = mapper.Create(entity);
Writer.UpdateDocument(term, doc.Document);
lease.Value.UpdateDocument(term, doc.Document);
throttledCommit.Increment();
DebugInfo($"Writer.UpdateDocument({term}, <doc>)");
}

public void Create(JObject entity)
{
using ILease<IndexWriter> lease = WriterLease;

LuceneDocumentEntry doc = mapper.Create(entity);
Writer.AddDocument(doc.Document);
lease.Value.AddDocument(doc.Document);
throttledCommit.Increment();
DebugInfo($"Writer.AddDocument(<doc>)");
}

public void Create(IEnumerable<JObject> entities)
{
Writer.AddDocuments(entities.Select(entity => mapper.Create(entity).Document));
using ILease<IndexWriter> lease = WriterLease;
lease.Value.AddDocuments(entities.Select(entity => mapper.Create(entity).Document));
throttledCommit.Increment();
DebugInfo($"Writer.AddDocuments(<doc>)");
}

public void Delete(JObject entity)
{
using ILease<IndexWriter> lease = WriterLease;
Term term = resolver.Resolver.Identity(entity);
Writer.DeleteDocuments(term);
lease.Value.DeleteDocuments(term);
throttledCommit.Increment();
DebugInfo($"Writer.UpdateDocuments({term})");
}


public void Commit()
public void Commit(bool force = false)
{
throttledCommit.Invoke();
throttledCommit.Invoke(force);
DebugInfo($"Writer.Commit()");
}

public void Flush(bool triggerMerge, bool applyAllDeletes)
public void Flush(bool triggerMerge = false, bool applyAllDeletes = false)
{
Writer.Flush(triggerMerge, applyAllDeletes);
using ILease<IndexWriter> lease = WriterLease;
lease.Value.Flush(triggerMerge, applyAllDeletes);
DebugInfo($"Writer.Flush({triggerMerge}, {applyAllDeletes})");
}

public void MaybeMerge()
{
Writer.MaybeMerge();
using ILease<IndexWriter> lease = WriterLease;
lease.Value.MaybeMerge();
DebugInfo($"Writer.MaybeMerge()");
}

Expand All @@ -113,20 +123,17 @@ public class ThrottledCommit
public ThrottledCommit(JsonIndexWriter target)
{
this.target = target;
ThreadPool.RegisterWaitForSingleObject(handle, (_,_)=>Tick(), null, 200, false);
ThreadPool.RegisterWaitForSingleObject(handle, (_,_)=>Tick(false), null, 200, false);
}

private void Tick()
private void Tick(bool force)
{
long time = Stopwatch.GetTimestamp();
if (time - lastInvocation > upperBound)
{
Commit();
lastInvocation = time;
return;
}

if (time - lastRequest > lowerBound)
// ReSharper disable once InvertIf
if (force
|| time - lastInvocation > upperBound
|| time - lastRequest > lowerBound
)
{
Commit();
lastInvocation = time;
Expand All @@ -135,15 +142,18 @@ private void Tick()

private void Commit()
{
if(Interlocked.Exchange(ref writes, 0) < 1)
long writesRead = Interlocked.Exchange(ref writes, 0);
if (writesRead < 1)
return;

if (Interlocked.Exchange(ref calls, 0) < 1)
long callsRead = Interlocked.Exchange(ref calls, 0);
if (callsRead < 1)
return;

try
{
target.Writer.Commit();
using ILease<IndexWriter> lease = target.WriterLease;
lease.Value.Commit();
}
catch (Exception e)
{
Expand All @@ -152,10 +162,14 @@ private void Commit()
}
}

public void Invoke()
public void Invoke(bool force)
{
Interlocked.Increment(ref calls);
lastRequest = Stopwatch.GetTimestamp();
if (force)
{
Tick(force);
}
}

public void Increment()
Expand Down
5 changes: 4 additions & 1 deletion src/DotJEM.Json.Index2.Snapshots/IndexSnapshotHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Linq;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using DotJEM.Json.Index2.IO;
using DotJEM.Json.Index2.Snapshots.Streams;
using Lucene.Net.Index;
using Lucene.Net.Store;
Expand Down Expand Up @@ -32,7 +33,9 @@ public class IndexSnapshotHandler : IIndexSnapshotHandler
{
public async Task<ISnapshot> TakeSnapshotAsync(IJsonIndex index, ISnapshotStorage storage)
{
IndexWriter writer = index.WriterManager.Writer;
using ILease<IndexWriter> lease = index.WriterManager.Lease();

IndexWriter writer = lease.Value;
SnapshotDeletionPolicy sdp = writer.Config.IndexDeletionPolicy as SnapshotDeletionPolicy
?? throw new InvalidOperationException("Index must use an implementation of the SnapshotDeletionPolicy.");
IndexCommit commit = null;
Expand Down
40 changes: 25 additions & 15 deletions src/DotJEM.Json.Index2/IO/JsonIndexWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class JsonIndexWriter : Disposable, IJsonIndexWriter
private readonly ILuceneDocumentFactory factory;

public IJsonIndex Index { get; }
public IndexWriter UnderlyingWriter => manager.Writer;
//public IndexWriter UnderlyingWriter => manager.Writer;

public JsonIndexWriter(IJsonIndex index, ILuceneDocumentFactory factory, IIndexWriterManager manager)
{
Expand All @@ -43,51 +43,61 @@ public void Create(IEnumerable<JObject> docs)
IEnumerable<Document> documents = factory
.Create(docs)
.Select(tuple => tuple.Document);
UnderlyingWriter.AddDocuments(documents);
WithLease(writer => writer.AddDocuments(documents));
}

public void Update(JObject doc) => Update(new[] { doc });
public void Update(IEnumerable<JObject> docs)
{
IEnumerable<LuceneDocumentEntry> documents = factory.Create(docs);
foreach ((Term key, Document doc) in documents)
UnderlyingWriter.UpdateDocument(key, doc);
WithLease(writer => {
foreach ((Term key, Document doc) in documents)
writer.UpdateDocument(key, doc);
});
}

public void Delete(JObject doc) => Delete(new[] { doc });
public void Delete(IEnumerable<JObject> docs)
{
IEnumerable<LuceneDocumentEntry> documents = factory.Create(docs);
foreach ((Term key, Document _) in documents)
UnderlyingWriter.DeleteDocuments(key);
WithLease(writer => {
foreach ((Term key, Document _) in documents)
writer.DeleteDocuments(key);
});
}

public void ForceMerge(int maxSegments)
=> UnderlyingWriter.ForceMerge(maxSegments);
=> WithLease(writer => writer.ForceMerge(maxSegments));

public void ForceMerge(int maxSegments, bool wait)
=> UnderlyingWriter.ForceMerge(maxSegments, wait);
=> WithLease(writer => writer.ForceMerge(maxSegments, wait));

public void ForceMergeDeletes()
=> UnderlyingWriter.ForceMergeDeletes();
=> WithLease(writer => writer.ForceMergeDeletes());

public void ForceMergeDeletes(bool wait)
=> UnderlyingWriter.ForceMergeDeletes(wait);
=> WithLease(writer => writer.ForceMergeDeletes(wait));

public void Rollback()
=> UnderlyingWriter.Rollback();
=> WithLease(writer => writer.Rollback());

public void Flush(bool triggerMerge, bool applyDeletes)
=> UnderlyingWriter.Flush(triggerMerge, applyDeletes);
=> WithLease(writer => writer.Flush(triggerMerge, applyDeletes));

public void Commit()
=> UnderlyingWriter.Commit();
=> WithLease(writer => writer.Commit());

public void PrepareCommit()
=> UnderlyingWriter.PrepareCommit();
=> WithLease(writer => writer.PrepareCommit());

public void SetCommitData(IDictionary<string, string> commitUserData)
=> UnderlyingWriter.SetCommitData(commitUserData);
=> WithLease(writer => writer.SetCommitData(commitUserData));

private void WithLease(Action<IndexWriter> action)
{
using ILease<IndexWriter> lease =manager.Lease();
action(lease.Value);
}

protected override void Dispose(bool disposing)
{
Expand Down
Loading
Loading