Skip to content

Commit

Permalink
refactor: update AckQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
WeihanLi committed Jan 4, 2025
1 parent e849c5e commit ba14da8
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 23 deletions.
56 changes: 42 additions & 14 deletions src/WeihanLi.Common/Event/AckQueue.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,35 @@
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using WeihanLi.Common.Helpers;

namespace WeihanLi.Common.Event;

public sealed class AckQueue
public sealed class AckQueueOptions
{
public TimeSpan AckTimeout { get; set; } = TimeSpan.FromMinutes(1);

public bool AutoRequeue { get; set; }

public TimeSpan Requeue { get; set; }
}

public sealed class AckQueue : DisposableBase
{
private readonly AckQueueOptions _options;
private readonly ConcurrentQueue<IEvent> _queue = new();
private readonly ConcurrentDictionary<string, IEvent> _unackedMessages = new();
private readonly TimeSpan _ackTimeout = TimeSpan.FromMinutes(1);
private readonly ConcurrentDictionary<string, IEvent> _unAckedMessages = new();
private readonly Timer? _timer;

public AckQueue() : this(new()) { }

public AckQueue(AckQueueOptions options)
{
_options = options;
if (options.AutoRequeue)
{
_timer = new Timer(_ => RequeueUnAckedMessages(), null, options.Requeue, options.Requeue);
}
}

public Task EnqueueAsync<TEvent>(TEvent @event, EventProperties? properties = null)
{
Expand All @@ -16,6 +38,7 @@ public Task EnqueueAsync<TEvent>(TEvent @event, EventProperties? properties = nu
{
properties.EventId = Guid.NewGuid().ToString();
}

if (properties.EventAt == default)
{
properties.EventAt = DateTimeOffset.Now;
Expand All @@ -35,7 +58,7 @@ public Task EnqueueAsync<TEvent>(TEvent @event, EventProperties? properties = nu
{
if (_queue.TryDequeue(out var eventWrapper))
{
_unackedMessages.TryAdd(eventWrapper.Properties.EventId, eventWrapper);
_unAckedMessages.TryAdd(eventWrapper.Properties.EventId, eventWrapper);
return Task.FromResult((IEvent<TEvent>?)eventWrapper);
}

Expand All @@ -44,38 +67,43 @@ public Task EnqueueAsync<TEvent>(TEvent @event, EventProperties? properties = nu

public Task AckMessageAsync(string eventId)
{
_unackedMessages.TryRemove(eventId, out _);
_unAckedMessages.TryRemove(eventId, out _);
return Task.CompletedTask;
}

public async Task RequeueUnackedMessagesAsync()
public void RequeueUnAckedMessages()
{
foreach (var unackedMessage in _unackedMessages)
foreach (var message in _unAckedMessages)
{
if (DateTimeOffset.Now - unackedMessage.Value.Properties.EventAt > _ackTimeout)
if (DateTimeOffset.Now - message.Value.Properties.EventAt > _options.AckTimeout)
{
_unackedMessages.TryRemove(unackedMessage.Key, out var eventWrapper);
if (eventWrapper != null)
if (_unAckedMessages.TryRemove(message.Key, out var eventWrapper)
&& eventWrapper != null)
{
_queue.Enqueue(eventWrapper);
}
}
}

await Task.CompletedTask;
}

public async IAsyncEnumerable<IEvent> ReadAllAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
public async IAsyncEnumerable<IEvent> ReadAllAsync(
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
while (!cancellationToken.IsCancellationRequested)
{
while (_queue.TryDequeue(out var eventWrapper))
{
_unackedMessages.TryAdd(eventWrapper.Properties.EventId, eventWrapper);
_unAckedMessages.TryAdd(eventWrapper.Properties.EventId, eventWrapper);
yield return eventWrapper;
}

await Task.Delay(200, cancellationToken);
}
}

protected override void Dispose(bool disposing)
{
_timer?.Dispose();
base.Dispose(disposing);
}
}
12 changes: 3 additions & 9 deletions test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
using System.Threading.Tasks;
using WeihanLi.Common.Event;
using Xunit;

namespace WeihanLi.Common.Test.EventsTest
{
public class AckQueueTest
{
private readonly AckQueue _ackQueue;

public AckQueueTest()
{
_ackQueue = new AckQueue();
}
private readonly AckQueue _ackQueue = new();

[Fact]
public async Task EnqueueAsync_ShouldAddMessageToQueue()
Expand Down Expand Up @@ -54,7 +48,7 @@ public async Task AckMessageAsync_ShouldAcknowledgeAndRemoveMessage()
}

[Fact]
public async Task RequeueUnackedMessagesAsync_ShouldRequeueUnackedMessagesAfterTimeout()
public async Task RequeueUnAckedMessagesAsync_ShouldRequeueUnAckedMessagesAfterTimeout()
{
var testEvent = new TestEvent { Message = "Test Message" };
await _ackQueue.EnqueueAsync(testEvent);
Expand All @@ -65,7 +59,7 @@ public async Task RequeueUnackedMessagesAsync_ShouldRequeueUnackedMessagesAfterT
// Simulate timeout
await Task.Delay(TimeSpan.FromMinutes(2));

await _ackQueue.RequeueUnackedMessagesAsync();
_ackQueue.RequeueUnAckedMessages();

var requeuedEvent = await _ackQueue.DequeueAsync<TestEvent>();
Assert.NotNull(requeuedEvent);
Expand Down

0 comments on commit ba14da8

Please sign in to comment.