diff --git a/Directory.Packages.props b/Directory.Packages.props
index 54063c72..a4d07b3f 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -2,6 +2,13 @@
true
+
+ true
+
+ true
+ all
+
+ NU1901;NU1902;NU1903;NU1904
diff --git a/README.md b/README.md
index 2c8e1687..d454441c 100644
--- a/README.md
+++ b/README.md
@@ -1,11 +1,11 @@
# WeihanLi.Common
-## Build status
-
[![WeihanLi.Common Latest Stable](https://img.shields.io/nuget/v/WeihanLi.Common.svg)](https://www.nuget.org/packages/WeihanLi.Common/)
[![WeihanLi.Common Latest Preview](https://img.shields.io/nuget/vpre/WeihanLi.Common)](https://www.nuget.org/packages/WeihanLi.Common/absoluteLatest)
+## Build status
+
[![Azure Pipelines Build Status](https://weihanli.visualstudio.com/Pipelines/_apis/build/status/WeihanLi.WeihanLi.Common?branchName=master)](https://weihanli.visualstudio.com/Pipelines/_build/latest?definitionId=16&branchName=master)
[![Github Actions Build Status](https://github.com/WeihanLi/WeihanLi.Common/actions/workflows/default.yml/badge.svg)](https://github.com/WeihanLi/WeihanLi.Common/actions/workflows/default.yml)
diff --git a/build/version.props b/build/version.props
index c4bfd37d..cf2d6df7 100644
--- a/build/version.props
+++ b/build/version.props
@@ -2,7 +2,7 @@
1
0
- 73
+ 74
$(VersionMajor).$(VersionMinor).$(VersionPatch)
diff --git a/samples/AspNetCoreSample/Events/EventConsumer.cs b/samples/AspNetCoreSample/Events/EventConsumer.cs
index d244e9d7..d71e34f6 100644
--- a/samples/AspNetCoreSample/Events/EventConsumer.cs
+++ b/samples/AspNetCoreSample/Events/EventConsumer.cs
@@ -1,4 +1,5 @@
-using WeihanLi.Common.Event;
+using WeihanLi.Common;
+using WeihanLi.Common.Event;
using WeihanLi.Extensions;
namespace AspNetCoreSample.Events;
@@ -7,25 +8,24 @@ public class EventConsumer
(IEventQueue eventQueue, IEventHandlerFactory eventHandlerFactory)
: BackgroundService
{
- private readonly IEventQueue _eventQueue = eventQueue;
- private readonly IEventHandlerFactory _eventHandlerFactory = eventHandlerFactory;
-
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
- var queues = await _eventQueue.GetQueuesAsync();
+ var queues = await eventQueue.GetQueuesAsync();
if (queues.Count > 0)
{
await queues.Select(async q =>
{
- if (await _eventQueue.TryDequeueAsync(q, out var @event, out var properties))
+ await foreach (var e in eventQueue.ReadAllAsync(q, stoppingToken))
{
- var handlers = _eventHandlerFactory.GetHandlers(@event.GetType());
+ var @event = e.Data;
+ Guard.NotNull(@event);
+ var handlers = eventHandlerFactory.GetHandlers(@event.GetType());
if (handlers.Count > 0)
{
await handlers
- .Select(h => h.Handle(@event, properties))
+ .Select(h => h.Handle(@event, e.Properties))
.WhenAll()
;
}
diff --git a/samples/DotNetCoreSample/LoggerTest.cs b/samples/DotNetCoreSample/LoggerTest.cs
index 084db6bb..38e46b85 100644
--- a/samples/DotNetCoreSample/LoggerTest.cs
+++ b/samples/DotNetCoreSample/LoggerTest.cs
@@ -37,10 +37,24 @@ public static void MicrosoftLoggingTest()
var services = new ServiceCollection()
.AddLogging(builder =>
// builder.AddConsole()
- builder.AddFile()
+ builder.AddFile(options => options.LogFormatter = (category, level, exception, msg, timestamp) =>
+ $"{timestamp} - [{category}] {level} - {msg}\n{exception}")
)
.AddSingleton(typeof(GenericTest<>))
.BuildServiceProvider();
+
+ var logger = services.GetRequiredService()
+ .CreateLogger("test");
+ while (!ApplicationHelper.ExitToken.IsCancellationRequested)
+ {
+ logger.LogInformation("Echo time: {Time}", DateTimeOffset.Now);
+ Thread.Sleep(500);
+ }
+
+ ConsoleHelper.ReadKeyWithPrompt();
+ services.GetRequiredService()
+ .CreateLogger("test")
+ .LogInformation("test 123");
services.GetRequiredService>()
.Test();
services.GetRequiredService>()
@@ -60,8 +74,6 @@ public static void MicrosoftLoggingTest()
private class GenericTest(ILogger> logger)
{
- private readonly ILogger> _logger = logger;
-
- public void Test() => _logger.LogInformation("test");
+ public void Test() => logger.LogInformation("test");
}
}
diff --git a/samples/DotNetCoreSample/Program.cs b/samples/DotNetCoreSample/Program.cs
index fb24ff44..10b8ba91 100644
--- a/samples/DotNetCoreSample/Program.cs
+++ b/samples/DotNetCoreSample/Program.cs
@@ -347,8 +347,8 @@
// InvokeHelper.TryInvoke(() => throw null, 3);
-// InvokeHelper.TryInvoke(LoggerTest.MicrosoftLoggingTest);
-await InvokeHelper.TryInvokeAsync(InMemoryStreamTest.MainTest);
+InvokeHelper.TryInvoke(LoggerTest.MicrosoftLoggingTest);
+// await InvokeHelper.TryInvokeAsync(InMemoryStreamTest.MainTest);
ConsoleHelper.ReadKeyWithPrompt("Press any key to exit");
diff --git a/src/Directory.Build.props b/src/Directory.Build.props
index 5ca21a36..25c5caeb 100644
--- a/src/Directory.Build.props
+++ b/src/Directory.Build.props
@@ -25,6 +25,7 @@
MIT
preview
$(NoWarn);CS9216;
+ direct
diff --git a/src/WeihanLi.Common/Event/AckQueue.cs b/src/WeihanLi.Common/Event/AckQueue.cs
new file mode 100644
index 00000000..4aa96624
--- /dev/null
+++ b/src/WeihanLi.Common/Event/AckQueue.cs
@@ -0,0 +1,109 @@
+using System.Collections.Concurrent;
+using System.Runtime.CompilerServices;
+using WeihanLi.Common.Helpers;
+
+namespace WeihanLi.Common.Event;
+
+public sealed class AckQueueOptions
+{
+ public TimeSpan AckTimeout { get; set; } = TimeSpan.FromMinutes(1);
+
+ public bool AutoRequeue { get; set; }
+
+ public TimeSpan RequeuePeriod { get; set; } = TimeSpan.FromMinutes(1);
+}
+
+public sealed class AckQueue : DisposableBase
+{
+ private readonly AckQueueOptions _options;
+ private readonly ConcurrentQueue _queue = new();
+ private readonly ConcurrentDictionary _unAckedMessages = new();
+ private readonly Timer? _timer;
+
+ public AckQueue() : this(new()) { }
+
+ public AckQueue(AckQueueOptions options)
+ {
+ _options = options;
+ if (options.AutoRequeue)
+ {
+ _timer = new Timer(_ => RequeueUnAckedMessages(), null, options.RequeuePeriod, options.RequeuePeriod);
+ }
+ }
+
+ public Task EnqueueAsync(TEvent @event, EventProperties? properties = null)
+ {
+ properties ??= new EventProperties();
+ if (string.IsNullOrEmpty(properties.EventId))
+ {
+ properties.EventId = Guid.NewGuid().ToString();
+ }
+
+ if (properties.EventAt == default)
+ {
+ properties.EventAt = DateTimeOffset.Now;
+ }
+
+ var internalEvent = new EventWrapper
+ {
+ Data = @event,
+ Properties = properties
+ };
+
+ _queue.Enqueue(internalEvent);
+ return Task.CompletedTask;
+ }
+
+ public Task?> DequeueAsync()
+ {
+ if (_queue.TryDequeue(out var eventWrapper))
+ {
+ _unAckedMessages.TryAdd(eventWrapper.Properties.EventId, eventWrapper);
+ return Task.FromResult((IEvent?)eventWrapper);
+ }
+
+ return Task.FromResult?>(null);
+ }
+
+ public Task AckMessageAsync(string eventId)
+ {
+ _unAckedMessages.TryRemove(eventId, out _);
+ return Task.CompletedTask;
+ }
+
+ public void RequeueUnAckedMessages()
+ {
+ foreach (var message in _unAckedMessages)
+ {
+ if (DateTimeOffset.Now - message.Value.Properties.EventAt > _options.AckTimeout)
+ {
+ if (_unAckedMessages.TryRemove(message.Key, out var eventWrapper)
+ && eventWrapper != null)
+ {
+ _queue.Enqueue(eventWrapper);
+ }
+ }
+ }
+ }
+
+ public async IAsyncEnumerable ReadAllAsync(
+ [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ while (_queue.TryDequeue(out var eventWrapper))
+ {
+ _unAckedMessages.TryAdd(eventWrapper.Properties.EventId, eventWrapper);
+ yield return eventWrapper;
+ }
+
+ await Task.Delay(200, cancellationToken);
+ }
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ _timer?.Dispose();
+ base.Dispose(disposing);
+ }
+}
diff --git a/src/WeihanLi.Common/Event/EventBase.cs b/src/WeihanLi.Common/Event/EventBase.cs
index 1159f3d6..09ba4a74 100644
--- a/src/WeihanLi.Common/Event/EventBase.cs
+++ b/src/WeihanLi.Common/Event/EventBase.cs
@@ -64,29 +64,65 @@ public interface IEvent
T Data { get; }
}
-internal sealed class EventWrapper : IEvent, IEvent
+public class EventWrapper : IEvent, IEvent
{
public required T Data { get; init; }
- object? IEvent.Data => (object?)Data;
+ object? IEvent.Data => Data;
public required EventProperties Properties { get; init; }
}
-public static class EventBaseExtensions
+public static class EventExtensions
{
- private static readonly JsonSerializerSettings EventSerializerSettings = JsonSerializeExtension.SerializerSettingsWith(s =>
- {
- s.TypeNameHandling = TypeNameHandling.Objects;
- });
+ private static readonly JsonSerializerSettings EventSerializerSettings = JsonSerializeExtension
+ .SerializerSettingsWith(s =>
+ {
+ s.NullValueHandling = NullValueHandling.Ignore;
+ s.TypeNameHandling = TypeNameHandling.Objects;
+ });
- public static string ToEventMsg(this TEvent @event) where TEvent : class, IEventBase
+ public static string ToEventMsg(this TEvent @event)
+ {
+ Guard.NotNull(@event);
+ return GetEvent(@event).ToJson(EventSerializerSettings);
+ }
+
+ public static string ToEventRawMsg(this TEvent @event)
{
Guard.NotNull(@event);
return @event.ToJson(EventSerializerSettings);
}
- public static IEventBase ToEvent(this string eventMsg)
+ private static IEvent GetEvent(this TEvent @event)
+ {
+ if (@event is IEvent eventEvent)
+ return eventEvent;
+
+ if (@event is IEventBase eventBase)
+ return new EventWrapper()
+ {
+ Data = @event,
+ Properties = new()
+ {
+ EventAt = eventBase.EventAt,
+ EventId = eventBase.EventId
+ }
+ };
+
+ return new EventWrapper
+ {
+ Data = @event,
+ Properties = new EventProperties
+ {
+ EventAt = DateTimeOffset.Now
+ }
+ };
+ }
+
+ public static TEvent ToEvent(this string eventMsg)
{
Guard.NotNull(eventMsg);
- return eventMsg.JsonToObject(EventSerializerSettings);
+ return eventMsg.JsonToObject(EventSerializerSettings);
}
+
+ public static IEvent ToEvent(this string eventMsg) => ToEvent(eventMsg);
}
diff --git a/src/WeihanLi.Common/Event/EventHandlerFactory.cs b/src/WeihanLi.Common/Event/EventHandlerFactory.cs
index 6c4e4cfd..229b8eb7 100644
--- a/src/WeihanLi.Common/Event/EventHandlerFactory.cs
+++ b/src/WeihanLi.Common/Event/EventHandlerFactory.cs
@@ -7,12 +7,10 @@ namespace WeihanLi.Common.Event;
public sealed class DefaultEventHandlerFactory(IEventSubscriptionManager subscriptionManager) : IEventHandlerFactory
{
- private readonly IEventSubscriptionManager _subscriptionManager = subscriptionManager;
-
[RequiresUnreferencedCode("Unreferenced code may be used")]
public ICollection GetHandlers(Type eventType)
{
- var eventHandlers = _subscriptionManager.GetEventHandlers(eventType);
+ var eventHandlers = subscriptionManager.GetEventHandlers(eventType);
return eventHandlers;
}
}
diff --git a/src/WeihanLi.Common/Event/EventQueueInMemory.cs b/src/WeihanLi.Common/Event/EventQueueInMemory.cs
index cca4019e..c5f22e9a 100644
--- a/src/WeihanLi.Common/Event/EventQueueInMemory.cs
+++ b/src/WeihanLi.Common/Event/EventQueueInMemory.cs
@@ -3,21 +3,26 @@
using System.Collections.Concurrent;
using System.Diagnostics;
-using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
+#if NET
+using System.Threading.Channels;
+#endif
+
namespace WeihanLi.Common.Event;
public sealed class EventQueueInMemory : IEventQueue
{
+#if NET
+ private readonly ConcurrentDictionary> _eventQueues = new();
+#else
private readonly ConcurrentDictionary> _eventQueues = new();
-
+#endif
public ICollection GetQueues() => _eventQueues.Keys;
public Task> GetQueuesAsync() => Task.FromResult(GetQueues());
-
- public Task EnqueueAsync(string queueName, TEvent @event, EventProperties? properties = null)
+ public async Task EnqueueAsync(string queueName, TEvent @event, EventProperties? properties = null)
{
properties ??= new();
if (string.IsNullOrEmpty(properties.EventId))
@@ -37,46 +42,55 @@ public Task EnqueueAsync(string queueName, TEvent @event, EventPro
Data = @event,
Properties = properties
};
+#if NET
+ var queue = _eventQueues.GetOrAdd(queueName, _ => Channel.CreateUnbounded());
+ await queue.Writer.WriteAsync(internalEvent);
+#else
var queue = _eventQueues.GetOrAdd(queueName, _ => new ConcurrentQueue());
queue.Enqueue(internalEvent);
- return Task.FromResult(true);
+ await Task.CompletedTask;
+#endif
+ return true;
}
- public Task TryDequeueAsync(string queueName, [NotNullWhen(true)] out object? @event, [NotNullWhen(true)] out EventProperties? properties)
+ public Task?> DequeueAsync(string queueName)
{
- @event = default;
- properties = default;
-
if (_eventQueues.TryGetValue(queueName, out var queue))
{
+#if NET
+ if (queue.Reader.TryRead(out var eventWrapper))
+#else
if (queue.TryDequeue(out var eventWrapper))
+#endif
{
- @event = eventWrapper.Data;
- properties = eventWrapper.Properties;
- return Task.FromResult(true);
+ return Task.FromResult((IEvent?)eventWrapper);
}
}
- return Task.FromResult(false);
+ return Task.FromResult?>(null);
}
- internal async IAsyncEnumerable<(TEvent Event, EventProperties Properties)> ReadAllAsync(string queueName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ public async IAsyncEnumerable ReadAllAsync(string queueName,
+ [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
while (!cancellationToken.IsCancellationRequested)
{
if (_eventQueues.TryGetValue(queueName, out var queue))
{
+#if NET
+ await foreach (var @event in queue.Reader.ReadAllAsync(cancellationToken))
+ {
+ yield return @event;
+ }
+#else
while (queue.TryDequeue(out var eventWrapper))
{
- yield return ((TEvent)eventWrapper!.Data!, eventWrapper!.Properties);
+ yield return eventWrapper;
}
+#endif
}
- await Task.Delay(100);
+
+ await Task.Delay(200, cancellationToken);
}
}
-
- public bool TryRemoveQueue(string queueName)
- {
- return _eventQueues.TryRemove(queueName, out _);
- }
}
diff --git a/src/WeihanLi.Common/Event/IEventQueue.cs b/src/WeihanLi.Common/Event/IEventQueue.cs
index 7d09ef3b..7399e1da 100644
--- a/src/WeihanLi.Common/Event/IEventQueue.cs
+++ b/src/WeihanLi.Common/Event/IEventQueue.cs
@@ -1,19 +1,16 @@
// Copyright (c) Weihan Li. All rights reserved.
// Licensed under the Apache license.
-using System.Diagnostics.CodeAnalysis;
+using System.Runtime.CompilerServices;
namespace WeihanLi.Common.Event;
public interface IEventQueue
{
Task> GetQueuesAsync();
-
Task EnqueueAsync(string queueName, TEvent @event, EventProperties? properties = null);
-
- Task TryDequeueAsync(string queueName, [MaybeNullWhen(false)] out object @event, [MaybeNullWhen(false)] out EventProperties properties);
-
- // IAsyncEnumerable<(TEvent Event, EventProperties Properties)> ReadAllEvents(string queueName, CancellationToken cancellationToken = default);
+ Task?> DequeueAsync(string queueName);
+ IAsyncEnumerable ReadAllAsync(string queueName, CancellationToken cancellationToken = default);
}
public static class EventQueueExtensions
@@ -23,6 +20,19 @@ public static class EventQueueExtensions
public static Task EnqueueAsync(this IEventQueue eventQueue, TEvent @event, EventProperties? properties = null)
where TEvent : class
{
- return eventQueue.EnqueueAsync(DefaultQueueName, @event);
+ return eventQueue.EnqueueAsync(DefaultQueueName, @event, properties);
+ }
+
+ public static async IAsyncEnumerable> ReadEventsAsync(
+ this IEventQueue eventQueue,
+ string queueName,
+ [EnumeratorCancellation] CancellationToken cancellationToken = default
+ )
+ {
+ await foreach (var @event in eventQueue.ReadAllAsync(queueName, cancellationToken))
+ {
+ if(@event is IEvent eventEvent)
+ yield return eventEvent;
+ }
}
}
diff --git a/src/WeihanLi.Common/Helpers/CommandLineParser.cs b/src/WeihanLi.Common/Helpers/CommandLineParser.cs
index c8c2dd6b..dd73de60 100644
--- a/src/WeihanLi.Common/Helpers/CommandLineParser.cs
+++ b/src/WeihanLi.Common/Helpers/CommandLineParser.cs
@@ -128,16 +128,37 @@ public static IEnumerable ParseLine(string line, LineParseOptions? optio
return GetValueInternal(args, optionName) ?? defaultValue;
}
+ ///
+ /// Get boolean argument value from arguments
+ ///
+ /// argument name to get value
+ /// arguments
+ /// default argument value when not found
+ /// Boolean value
public static bool BooleanVal(string optionName, string[]? args = null, bool defaultValue = default)
{
return GetValueInternal(args ?? Environment.GetCommandLineArgs(), optionName).ToBoolean(defaultValue);
}
- public static bool BooleanVal(string optionName, bool defaultValue = default, string[]? args = null)
+ ///
+ /// Get boolean argument value from arguments
+ ///
+ /// argument name to get value
+ /// default argument value when not found
+ /// arguments
+ /// Boolean value
+ public static bool BooleanVal(string optionName, bool defaultValue, string[]? args = null)
{
return GetValueInternal(args ?? Environment.GetCommandLineArgs(), optionName).ToBoolean(defaultValue);
}
+ ///
+ /// Get boolean argument value from arguments
+ ///
+ /// arguments
+ /// argument name to get value
+ /// default argument value when not found
+ /// Boolean value
public static bool BooleanVal(string[] args, string optionName, bool defaultValue = default)
{
return GetValueInternal(args, optionName).ToBoolean(defaultValue);
diff --git a/src/WeihanLi.Common/Logging/FileLoggingProcessor.cs b/src/WeihanLi.Common/Logging/FileLoggingProcessor.cs
index 43a6f84e..720817cc 100644
--- a/src/WeihanLi.Common/Logging/FileLoggingProcessor.cs
+++ b/src/WeihanLi.Common/Logging/FileLoggingProcessor.cs
@@ -108,10 +108,7 @@ private void WriteLoggingEvent(string log, DateTimeOffset timestamp)
Guard.NotNull(_fileStream);
var bytes = Encoding.UTF8.GetBytes(log);
_fileStream.Write(bytes, 0, bytes.Length);
- if (SecurityHelper.Random.CoinToss())
- {
- _fileStream.Flush();
- }
+ _fileStream.Flush();
}
catch (Exception ex)
{
diff --git a/test/WeihanLi.Common.Test/AspectTest/ServiceContainerBuilderBuildTest.cs b/test/WeihanLi.Common.Test/AspectTest/ServiceContainerBuilderBuildTest.cs
index 8cfe4028..01d5d9b4 100644
--- a/test/WeihanLi.Common.Test/AspectTest/ServiceContainerBuilderBuildTest.cs
+++ b/test/WeihanLi.Common.Test/AspectTest/ServiceContainerBuilderBuildTest.cs
@@ -99,18 +99,6 @@ public async Task ClassTest()
await handTask;
}
- [Fact]
- public async Task GenericMethodTest()
- {
- var publisher = _serviceProvider.ResolveRequiredService();
- Assert.NotNull(publisher);
- var publisherType = publisher.GetType();
- Assert.True(publisherType.IsSealed);
- Assert.True(publisherType.Assembly.IsDynamic);
-
- await publisher.PublishAsync(new TestEvent());
- }
-
// not supported, will not intercept
[Fact]
public void OpenGenericTypeTest()
diff --git a/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs b/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs
new file mode 100644
index 00000000..6ff8f145
--- /dev/null
+++ b/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs
@@ -0,0 +1,105 @@
+using WeihanLi.Common.Event;
+using Xunit;
+
+namespace WeihanLi.Common.Test.EventsTest
+{
+ public class AckQueueTest
+ {
+ private readonly AckQueue _ackQueue = new(new()
+ {
+ AutoRequeue = false
+ });
+
+ [Fact]
+ public async Task EnqueueAsync_ShouldAddMessageToQueue()
+ {
+ var testEvent = new TestEvent { Message = "Test Message" };
+ await _ackQueue.EnqueueAsync(testEvent);
+
+ var dequeuedEvent = await _ackQueue.DequeueAsync();
+ Assert.NotNull(dequeuedEvent);
+ Assert.Equal(testEvent.Message, dequeuedEvent.Data.Message);
+ }
+
+ [Fact]
+ public async Task DequeueAsync_ShouldRetrieveMessageWithoutRemoval()
+ {
+ var testEvent = new TestEvent { Message = "Test Message" };
+ await _ackQueue.EnqueueAsync(testEvent);
+
+ var dequeuedEvent1 = await _ackQueue.DequeueAsync();
+ var dequeuedEvent2 = await _ackQueue.DequeueAsync();
+
+ Assert.NotNull(dequeuedEvent1);
+ Assert.Equal(testEvent.Message, dequeuedEvent1.Data.Message);
+ Assert.Null(dequeuedEvent2);
+ }
+
+ [Fact]
+ public async Task AckMessageAsync_ShouldAcknowledgeAndRemoveMessage()
+ {
+ var testEvent = new TestEvent { Message = "Test Message" };
+ await _ackQueue.EnqueueAsync(testEvent);
+
+ var dequeuedEvent = await _ackQueue.DequeueAsync();
+ Assert.NotNull(dequeuedEvent);
+
+ await _ackQueue.AckMessageAsync(dequeuedEvent.Properties.EventId);
+
+ var dequeuedEventAfterAck = await _ackQueue.DequeueAsync();
+ Assert.Null(dequeuedEventAfterAck);
+ }
+
+ [Fact]
+ public async Task RequeueUnAckedMessagesAsync_ShouldRequeueUnAckedMessagesAfterTimeout()
+ {
+ var testEvent = new TestEvent { Message = "Test Message" };
+ var ackQueue = new AckQueue(new()
+ {
+ AutoRequeue = false,
+ AckTimeout = TimeSpan.FromSeconds(3)
+ });
+ await ackQueue.EnqueueAsync(testEvent);
+
+ var dequeuedEvent = await ackQueue.DequeueAsync();
+ Assert.NotNull(dequeuedEvent);
+
+ // Simulate timeout
+ await Task.Delay(TimeSpan.FromSeconds(5));
+
+ ackQueue.RequeueUnAckedMessages();
+
+ var requeuedEvent = await ackQueue.DequeueAsync();
+ Assert.NotNull(requeuedEvent);
+ Assert.Equal(testEvent.Message, requeuedEvent.Data.Message);
+ }
+
+ [Fact]
+ public async Task AutoRequeueUnAckedMessagesAsync_ShouldRequeueUnAckedMessagesAfterTimeout()
+ {
+ var testEvent = new TestEvent { Message = "Test Message" };
+ await using var ackQueue = new AckQueue(new()
+ {
+ AutoRequeue = true,
+ AckTimeout = TimeSpan.FromSeconds(3),
+ RequeuePeriod = TimeSpan.FromSeconds(2)
+ });
+ await ackQueue.EnqueueAsync(testEvent);
+
+ var dequeuedEvent = await ackQueue.DequeueAsync();
+ Assert.NotNull(dequeuedEvent);
+
+ // Simulate timeout
+ await Task.Delay(TimeSpan.FromSeconds(5));
+
+ var requeuedEvent = await ackQueue.DequeueAsync();
+ Assert.NotNull(requeuedEvent);
+ Assert.Equal(testEvent.Message, requeuedEvent.Data.Message);
+ }
+
+ private class TestEvent
+ {
+ public string Message { get; set; }
+ }
+ }
+}
diff --git a/test/WeihanLi.Common.Test/EventsTest/EventBaseTest.cs b/test/WeihanLi.Common.Test/EventsTest/EventBaseTest.cs
index cd815875..9a686a3d 100644
--- a/test/WeihanLi.Common.Test/EventsTest/EventBaseTest.cs
+++ b/test/WeihanLi.Common.Test/EventsTest/EventBaseTest.cs
@@ -43,8 +43,8 @@ public void EventMessageExtensionsTest()
{
Name = "1213"
};
- var eventMsg = testEvent.ToEventMsg();
- var eventFromMsg = eventMsg.ToEvent();
+ var eventMsg = testEvent.ToEventRawMsg();
+ var eventFromMsg = eventMsg.ToEvent();
Assert.Equal(typeof(TestEvent), eventFromMsg.GetType());
var deserializedEvent = eventFromMsg as TestEvent;
@@ -53,4 +53,24 @@ public void EventMessageExtensionsTest()
Assert.Equal(testEvent.EventAt, deserializedEvent.EventAt);
Assert.Equal(testEvent.Name, deserializedEvent.Name);
}
+
+ [Fact]
+ public void EventMessageExtensions2Test()
+ {
+ var testEvent = new TestEvent()
+ {
+ Name = "1213"
+ };
+ var eventMsg = testEvent.ToEventMsg();
+ var eventFromMsg = eventMsg.ToEvent>();
+ Assert.Equal(typeof(EventWrapper), eventFromMsg.GetType());
+
+ var deserializedEvent = eventFromMsg.Data;
+ Assert.NotNull(deserializedEvent);
+ Assert.Equal(testEvent.EventId, deserializedEvent.EventId);
+ Assert.Equal(testEvent.EventAt, deserializedEvent.EventAt);
+ Assert.Equal(testEvent.Name, deserializedEvent.Name);
+ Assert.Equal(testEvent.EventId, eventFromMsg.Properties.EventId);
+ Assert.Equal(testEvent.EventAt, eventFromMsg.Properties.EventAt);
+ }
}