Skip to content

Commit 5251b88

Browse files
authored
Sqlite IPC Rework (#502)
* Use TEXT instead of VARCHAR * Force ascending aliased rowid values * Dispose rented connection and command early * Serialize to bytes early * Notify subscribers AFTER reading * Update test * Rework SqliteIPC * Update * Use Queue instead of Stack * Cleanup ReaderLoop * Seal SqliteIPC * Add WriterLoop * Publish jobs early when in-process * ReaderLoop is for other processes * Cleanup * Cleanup * Update messages * Add EnsureWrite
1 parent 893efda commit 5251b88

File tree

6 files changed

+586
-285
lines changed

6 files changed

+586
-285
lines changed

src/NexusMods.CLI/Types/IpcHandlers/NxmIpcProtocolHandler.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public class NxmIpcProtocolHandler : IIpcProtocolHandler
1313
/// <inheritdoc/>
1414
public string Protocol => "nxm";
1515

16-
private IMessageProducer<NXMUrlMessage> _messages;
16+
private readonly IMessageProducer<NXMUrlMessage> _messages;
1717

1818
/// <summary>
1919
/// constructor
@@ -27,6 +27,7 @@ public NxmIpcProtocolHandler(IMessageProducer<NXMUrlMessage> messages)
2727
public async Task Handle(string url, CancellationToken cancel)
2828
{
2929
await _messages.Write(new NXMUrlMessage { Value = NXMUrl.Parse(url) }, cancel);
30+
_messages.EnsureWrite(cancel);
3031
}
3132
}
3233

src/NexusMods.DataModel/Interprocess/IMessageProducer.cs

+8-1
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,19 @@ namespace NexusMods.DataModel.Interprocess;
44
/// A message producer for sending messages to other processes.
55
/// </summary>
66
/// <typeparam name="T">Message type to send, each message type gets its own queue</typeparam>
7-
public interface IMessageProducer<T> where T : IMessage
7+
public interface IMessageProducer<in T> where T : IMessage
88
{
99
/// <summary>
1010
/// Sends a message to the queue.
1111
/// </summary>
1212
/// <param name="message">The message to write.</param>
1313
/// <param name="token">Can be used to cancel this operation.</param>
1414
public ValueTask Write(T message, CancellationToken token);
15+
16+
/// <summary>
17+
/// Ensures all messages in-flight have been saved to the database.
18+
/// This is done using an <see cref="EventWaitHandle"/> and this call
19+
/// will block the entire thread.
20+
/// </summary>
21+
public void EnsureWrite(CancellationToken cancellationToken);
1522
}

src/NexusMods.DataModel/Interprocess/InterprocessProducer.cs

+20
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
using System.Diagnostics;
2+
using System.Reactive.Linq;
3+
14
namespace NexusMods.DataModel.Interprocess;
25

36
/// <summary>
@@ -48,4 +51,21 @@ private void WriteInner(T message)
4851
var used = message.Write(buffer);
4952
_sqliteIpc.Send(_queueName, buffer[..used]);
5053
}
54+
55+
/// <inheritdoc/>
56+
public void EnsureWrite(CancellationToken cancellationToken)
57+
{
58+
var timeout = SqliteIPC.WriterLoopInterval + SqliteIPC.SqliteDefaultTimeout;
59+
Thread.Sleep(timeout);
60+
61+
const int maxIterations = 10;
62+
var i = 0;
63+
64+
while (i < maxIterations && !cancellationToken.IsCancellationRequested)
65+
{
66+
var signaled = _sqliteIpc.WriterLoopFinished.WaitOne(timeout);
67+
if (signaled) break;
68+
i += 1;
69+
}
70+
}
5171
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
namespace NexusMods.DataModel.Interprocess;
2+
3+
internal readonly struct SemaphoreSlimWaiter : IDisposable
4+
{
5+
private readonly SemaphoreSlim _semaphoreSlim;
6+
7+
public bool HasEntered { get; }
8+
9+
internal SemaphoreSlimWaiter(SemaphoreSlim semaphoreSlim, bool entered)
10+
{
11+
_semaphoreSlim = semaphoreSlim;
12+
HasEntered = entered;
13+
}
14+
15+
public void Dispose()
16+
{
17+
if (!HasEntered) return;
18+
_semaphoreSlim.Release();
19+
}
20+
}
21+
22+
internal static class SemaphoreExtensions
23+
{
24+
public static SemaphoreSlimWaiter CustomWait(
25+
this SemaphoreSlim semaphoreSlim,
26+
TimeSpan timeout,
27+
CancellationToken cancellationToken = default)
28+
{
29+
var entered = semaphoreSlim.Wait(timeout, cancellationToken);
30+
return new SemaphoreSlimWaiter(semaphoreSlim, entered);
31+
}
32+
}

0 commit comments

Comments
 (0)