-
Notifications
You must be signed in to change notification settings - Fork 87
Commit
Introduce a queue support ack
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
using System.Collections.Concurrent; | ||
using System.Runtime.CompilerServices; | ||
|
||
namespace WeihanLi.Common.Event; | ||
|
||
public sealed class AckQueue | ||
{ | ||
private readonly ConcurrentQueue<IEvent> _queue = new(); | ||
private readonly ConcurrentDictionary<string, IEvent> _unackedMessages = new(); | ||
private readonly TimeSpan _ackTimeout = TimeSpan.FromMinutes(1); | ||
|
||
public Task EnqueueAsync<TEvent>(TEvent @event, EventProperties? properties = null) | ||
{ | ||
properties ??= new EventProperties(); | ||
if (string.IsNullOrEmpty(properties.EventId)) | ||
{ | ||
properties.EventId = Guid.NewGuid().ToString(); | ||
} | ||
if (properties.EventAt == default) | ||
{ | ||
properties.EventAt = DateTimeOffset.Now; | ||
} | ||
|
||
var internalEvent = new EventWrapper<TEvent> | ||
{ | ||
Data = @event, | ||
Properties = properties | ||
}; | ||
|
||
_queue.Enqueue(internalEvent); | ||
return Task.CompletedTask; | ||
} | ||
|
||
public Task<IEvent<TEvent>?> DequeueAsync<TEvent>() | ||
{ | ||
if (_queue.TryDequeue(out var eventWrapper)) | ||
{ | ||
_unackedMessages.TryAdd(eventWrapper.Properties.EventId, eventWrapper); | ||
return Task.FromResult((IEvent<TEvent>?)eventWrapper); | ||
} | ||
|
||
return Task.FromResult<IEvent<TEvent>?>(null); | ||
} | ||
|
||
public Task AckMessageAsync(string eventId) | ||
{ | ||
_unackedMessages.TryRemove(eventId, out _); | ||
return Task.CompletedTask; | ||
} | ||
|
||
public async Task RequeueUnackedMessagesAsync() | ||
{ | ||
foreach (var unackedMessage in _unackedMessages) | ||
{ | ||
if (DateTimeOffset.Now - unackedMessage.Value.Properties.EventAt > _ackTimeout) | ||
{ | ||
_unackedMessages.TryRemove(unackedMessage.Key, out var eventWrapper); | ||
if (eventWrapper != null) | ||
{ | ||
_queue.Enqueue(eventWrapper); | ||
} | ||
} | ||
} | ||
|
||
await Task.CompletedTask; | ||
} | ||
|
||
public async IAsyncEnumerable<IEvent> ReadAllAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) | ||
{ | ||
while (!cancellationToken.IsCancellationRequested) | ||
{ | ||
while (_queue.TryDequeue(out var eventWrapper)) | ||
{ | ||
_unackedMessages.TryAdd(eventWrapper.Properties.EventId, eventWrapper); | ||
yield return eventWrapper; | ||
} | ||
|
||
await Task.Delay(200, cancellationToken); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
using System.Threading.Tasks; | ||
using WeihanLi.Common.Event; | ||
using Xunit; | ||
|
||
namespace WeihanLi.Common.Test.EventsTest | ||
{ | ||
public class AckQueueTest | ||
{ | ||
private readonly AckQueue _ackQueue; | ||
|
||
public AckQueueTest() | ||
{ | ||
_ackQueue = new AckQueue(); | ||
} | ||
|
||
[Fact] | ||
public async Task EnqueueAsync_ShouldAddMessageToQueue() | ||
{ | ||
var testEvent = new TestEvent { Message = "Test Message" }; | ||
await _ackQueue.EnqueueAsync(testEvent); | ||
|
||
var dequeuedEvent = await _ackQueue.DequeueAsync<TestEvent>(); | ||
Assert.NotNull(dequeuedEvent); | ||
Assert.Equal(testEvent.Message, dequeuedEvent?.Data.Message); | ||
} | ||
|
||
[Fact] | ||
public async Task DequeueAsync_ShouldRetrieveMessageWithoutRemoval() | ||
{ | ||
var testEvent = new TestEvent { Message = "Test Message" }; | ||
await _ackQueue.EnqueueAsync(testEvent); | ||
|
||
var dequeuedEvent1 = await _ackQueue.DequeueAsync<TestEvent>(); | ||
var dequeuedEvent2 = await _ackQueue.DequeueAsync<TestEvent>(); | ||
|
||
Assert.NotNull(dequeuedEvent1); | ||
Assert.Equal(testEvent.Message, dequeuedEvent1?.Data.Message); | ||
Assert.Null(dequeuedEvent2); | ||
} | ||
|
||
[Fact] | ||
public async Task AckMessageAsync_ShouldAcknowledgeAndRemoveMessage() | ||
{ | ||
var testEvent = new TestEvent { Message = "Test Message" }; | ||
await _ackQueue.EnqueueAsync(testEvent); | ||
|
||
var dequeuedEvent = await _ackQueue.DequeueAsync<TestEvent>(); | ||
Assert.NotNull(dequeuedEvent); | ||
|
||
await _ackQueue.AckMessageAsync(dequeuedEvent!.Properties.EventId); | ||
|
||
var dequeuedEventAfterAck = await _ackQueue.DequeueAsync<TestEvent>(); | ||
Assert.Null(dequeuedEventAfterAck); | ||
} | ||
|
||
[Fact] | ||
public async Task RequeueUnackedMessagesAsync_ShouldRequeueUnackedMessagesAfterTimeout() | ||
{ | ||
var testEvent = new TestEvent { Message = "Test Message" }; | ||
await _ackQueue.EnqueueAsync(testEvent); | ||
|
||
var dequeuedEvent = await _ackQueue.DequeueAsync<TestEvent>(); | ||
Assert.NotNull(dequeuedEvent); | ||
|
||
// Simulate timeout | ||
await Task.Delay(TimeSpan.FromMinutes(2)); | ||
|
||
await _ackQueue.RequeueUnackedMessagesAsync(); | ||
|
||
var requeuedEvent = await _ackQueue.DequeueAsync<TestEvent>(); | ||
Assert.NotNull(requeuedEvent); | ||
Assert.Equal(testEvent.Message, requeuedEvent?.Data.Message); | ||
} | ||
|
||
private class TestEvent | ||
{ | ||
public string Message { get; set; } | ||
Check warning on line 77 in test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs
|
||
} | ||
} | ||
} |