Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce a queue support ack #240

Merged
merged 1 commit into from
Jan 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions src/WeihanLi.Common/Event/AckQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;

namespace WeihanLi.Common.Event;

public sealed class AckQueue
{
private readonly ConcurrentQueue<IEvent> _queue = new();
private readonly ConcurrentDictionary<string, IEvent> _unackedMessages = new();
private readonly TimeSpan _ackTimeout = TimeSpan.FromMinutes(1);

public Task EnqueueAsync<TEvent>(TEvent @event, EventProperties? properties = null)
{
properties ??= new EventProperties();
if (string.IsNullOrEmpty(properties.EventId))
{
properties.EventId = Guid.NewGuid().ToString();
}
if (properties.EventAt == default)
{
properties.EventAt = DateTimeOffset.Now;
}

var internalEvent = new EventWrapper<TEvent>
{
Data = @event,
Properties = properties
};

_queue.Enqueue(internalEvent);
return Task.CompletedTask;
}

public Task<IEvent<TEvent>?> DequeueAsync<TEvent>()
{
if (_queue.TryDequeue(out var eventWrapper))
{
_unackedMessages.TryAdd(eventWrapper.Properties.EventId, eventWrapper);
return Task.FromResult((IEvent<TEvent>?)eventWrapper);
}

return Task.FromResult<IEvent<TEvent>?>(null);
}

public Task AckMessageAsync(string eventId)
{
_unackedMessages.TryRemove(eventId, out _);
return Task.CompletedTask;
}

public async Task RequeueUnackedMessagesAsync()
{
foreach (var unackedMessage in _unackedMessages)
{
if (DateTimeOffset.Now - unackedMessage.Value.Properties.EventAt > _ackTimeout)
{
_unackedMessages.TryRemove(unackedMessage.Key, out var eventWrapper);
if (eventWrapper != null)
{
_queue.Enqueue(eventWrapper);
}
}
}

await Task.CompletedTask;
}

public async IAsyncEnumerable<IEvent> ReadAllAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
while (!cancellationToken.IsCancellationRequested)
{
while (_queue.TryDequeue(out var eventWrapper))
{
_unackedMessages.TryAdd(eventWrapper.Properties.EventId, eventWrapper);
yield return eventWrapper;
}

await Task.Delay(200, cancellationToken);
}
}
}
80 changes: 80 additions & 0 deletions test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
using System.Threading.Tasks;
using WeihanLi.Common.Event;
using Xunit;

namespace WeihanLi.Common.Test.EventsTest
{
public class AckQueueTest
{
private readonly AckQueue _ackQueue;

public AckQueueTest()
{
_ackQueue = new AckQueue();
}

[Fact]
public async Task EnqueueAsync_ShouldAddMessageToQueue()
{
var testEvent = new TestEvent { Message = "Test Message" };
await _ackQueue.EnqueueAsync(testEvent);

var dequeuedEvent = await _ackQueue.DequeueAsync<TestEvent>();
Assert.NotNull(dequeuedEvent);
Assert.Equal(testEvent.Message, dequeuedEvent?.Data.Message);
}

[Fact]
public async Task DequeueAsync_ShouldRetrieveMessageWithoutRemoval()
{
var testEvent = new TestEvent { Message = "Test Message" };
await _ackQueue.EnqueueAsync(testEvent);

var dequeuedEvent1 = await _ackQueue.DequeueAsync<TestEvent>();
var dequeuedEvent2 = await _ackQueue.DequeueAsync<TestEvent>();

Assert.NotNull(dequeuedEvent1);
Assert.Equal(testEvent.Message, dequeuedEvent1?.Data.Message);
Assert.Null(dequeuedEvent2);
}

[Fact]
public async Task AckMessageAsync_ShouldAcknowledgeAndRemoveMessage()
{
var testEvent = new TestEvent { Message = "Test Message" };
await _ackQueue.EnqueueAsync(testEvent);

var dequeuedEvent = await _ackQueue.DequeueAsync<TestEvent>();
Assert.NotNull(dequeuedEvent);

await _ackQueue.AckMessageAsync(dequeuedEvent!.Properties.EventId);

var dequeuedEventAfterAck = await _ackQueue.DequeueAsync<TestEvent>();
Assert.Null(dequeuedEventAfterAck);
}

[Fact]
public async Task RequeueUnackedMessagesAsync_ShouldRequeueUnackedMessagesAfterTimeout()
{
var testEvent = new TestEvent { Message = "Test Message" };
await _ackQueue.EnqueueAsync(testEvent);

var dequeuedEvent = await _ackQueue.DequeueAsync<TestEvent>();
Assert.NotNull(dequeuedEvent);

// Simulate timeout
await Task.Delay(TimeSpan.FromMinutes(2));

await _ackQueue.RequeueUnackedMessagesAsync();

var requeuedEvent = await _ackQueue.DequeueAsync<TestEvent>();
Assert.NotNull(requeuedEvent);
Assert.Equal(testEvent.Message, requeuedEvent?.Data.Message);
}

private class TestEvent
{
public string Message { get; set; }

Check warning on line 77 in test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs

View workflow job for this annotation

GitHub Actions / Running tests on ubuntu-latest

Non-nullable property 'Message' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.

Check warning on line 77 in test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs

View workflow job for this annotation

GitHub Actions / Running tests on ubuntu-latest

Non-nullable property 'Message' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.

Check warning on line 77 in test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs

View workflow job for this annotation

GitHub Actions / Running tests on macOS-latest

Non-nullable property 'Message' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.

Check warning on line 77 in test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs

View workflow job for this annotation

GitHub Actions / Running tests on macOS-latest

Non-nullable property 'Message' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.

Check warning on line 77 in test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs

View workflow job for this annotation

GitHub Actions / Running tests on windows-latest

Non-nullable property 'Message' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.

Check warning on line 77 in test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs

View workflow job for this annotation

GitHub Actions / Running tests on windows-latest

Non-nullable property 'Message' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.
}
}
}
Loading