From ceefc5c3137ad7c452db8f6f03d0af30ff1d897c Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Mon, 16 Dec 2024 14:32:57 +0800 Subject: [PATCH 01/23] refactor: add comments and avoid conflict --- .../Helpers/CommandLineParser.cs | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/src/WeihanLi.Common/Helpers/CommandLineParser.cs b/src/WeihanLi.Common/Helpers/CommandLineParser.cs index c8c2dd6b..dd73de60 100644 --- a/src/WeihanLi.Common/Helpers/CommandLineParser.cs +++ b/src/WeihanLi.Common/Helpers/CommandLineParser.cs @@ -128,16 +128,37 @@ public static IEnumerable ParseLine(string line, LineParseOptions? optio return GetValueInternal(args, optionName) ?? defaultValue; } + /// + /// Get boolean argument value from arguments + /// + /// argument name to get value + /// arguments + /// default argument value when not found + /// Boolean value public static bool BooleanVal(string optionName, string[]? args = null, bool defaultValue = default) { return GetValueInternal(args ?? Environment.GetCommandLineArgs(), optionName).ToBoolean(defaultValue); } - public static bool BooleanVal(string optionName, bool defaultValue = default, string[]? args = null) + /// + /// Get boolean argument value from arguments + /// + /// argument name to get value + /// default argument value when not found + /// arguments + /// Boolean value + public static bool BooleanVal(string optionName, bool defaultValue, string[]? args = null) { return GetValueInternal(args ?? Environment.GetCommandLineArgs(), optionName).ToBoolean(defaultValue); } + /// + /// Get boolean argument value from arguments + /// + /// arguments + /// argument name to get value + /// default argument value when not found + /// Boolean value public static bool BooleanVal(string[] args, string optionName, bool defaultValue = default) { return GetValueInternal(args, optionName).ToBoolean(defaultValue); From 9529873d4d58ea2c0f8482bc64febe83c9a3f4f6 Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Mon, 16 Dec 2024 14:33:23 +0800 Subject: [PATCH 02/23] feat: bump package version --- build/version.props | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build/version.props b/build/version.props index c4bfd37d..cf2d6df7 100644 --- a/build/version.props +++ b/build/version.props @@ -2,7 +2,7 @@ 1 0 - 73 + 74 $(VersionMajor).$(VersionMinor).$(VersionPatch) From 7caff2f92673bc29539e3e28784e7549586591d9 Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Thu, 19 Dec 2024 08:37:40 +0800 Subject: [PATCH 03/23] refactor: use StreamWriter instead of FileStream --- src/WeihanLi.Common/Logging/FileLoggingProcessor.cs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/WeihanLi.Common/Logging/FileLoggingProcessor.cs b/src/WeihanLi.Common/Logging/FileLoggingProcessor.cs index 43a6f84e..03113440 100644 --- a/src/WeihanLi.Common/Logging/FileLoggingProcessor.cs +++ b/src/WeihanLi.Common/Logging/FileLoggingProcessor.cs @@ -15,7 +15,7 @@ internal sealed class FileLoggingProcessor : DisposableBase private readonly BlockingCollection<(string log, DateTimeOffset timestamp)> _messageQueue = []; private readonly Thread _outputThread; - private FileStream? _fileStream; + private StreamWriter? _fileStream; private string? _logFileName; public FileLoggingProcessor(FileLoggingOptions options) @@ -96,7 +96,7 @@ private void WriteLoggingEvent(string log, DateTimeOffset timestamp) if (_logFileName != previousFileName) { // file name changed - var fs = File.OpenWrite(fileInfo.FullName); + var fs = File.CreateText(fileInfo.FullName); var originalWriter = Interlocked.Exchange(ref _fileStream, fs); if (originalWriter is not null) { @@ -106,12 +106,8 @@ private void WriteLoggingEvent(string log, DateTimeOffset timestamp) } Guard.NotNull(_fileStream); - var bytes = Encoding.UTF8.GetBytes(log); - _fileStream.Write(bytes, 0, bytes.Length); - if (SecurityHelper.Random.CoinToss()) - { - _fileStream.Flush(); - } + _fileStream.Write(log); + _fileStream.Flush(); } catch (Exception ex) { From 581418b10fa432378a5ba88a025d6d13d867a943 Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Thu, 19 Dec 2024 08:39:18 +0800 Subject: [PATCH 04/23] revert: revert to FileStream since StreamWriter would override instead of append if exists --- src/WeihanLi.Common/Logging/FileLoggingProcessor.cs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/WeihanLi.Common/Logging/FileLoggingProcessor.cs b/src/WeihanLi.Common/Logging/FileLoggingProcessor.cs index 03113440..720817cc 100644 --- a/src/WeihanLi.Common/Logging/FileLoggingProcessor.cs +++ b/src/WeihanLi.Common/Logging/FileLoggingProcessor.cs @@ -15,7 +15,7 @@ internal sealed class FileLoggingProcessor : DisposableBase private readonly BlockingCollection<(string log, DateTimeOffset timestamp)> _messageQueue = []; private readonly Thread _outputThread; - private StreamWriter? _fileStream; + private FileStream? _fileStream; private string? _logFileName; public FileLoggingProcessor(FileLoggingOptions options) @@ -96,7 +96,7 @@ private void WriteLoggingEvent(string log, DateTimeOffset timestamp) if (_logFileName != previousFileName) { // file name changed - var fs = File.CreateText(fileInfo.FullName); + var fs = File.OpenWrite(fileInfo.FullName); var originalWriter = Interlocked.Exchange(ref _fileStream, fs); if (originalWriter is not null) { @@ -106,7 +106,8 @@ private void WriteLoggingEvent(string log, DateTimeOffset timestamp) } Guard.NotNull(_fileStream); - _fileStream.Write(log); + var bytes = Encoding.UTF8.GetBytes(log); + _fileStream.Write(bytes, 0, bytes.Length); _fileStream.Flush(); } catch (Exception ex) From 335fbcb06b6b7fcd8db9c663ef30ac9bfe2aa241 Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Wed, 25 Dec 2024 21:20:27 +0800 Subject: [PATCH 05/23] sample: update logger test sample --- samples/DotNetCoreSample/LoggerTest.cs | 20 ++++++++++++++++---- samples/DotNetCoreSample/Program.cs | 4 ++-- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/samples/DotNetCoreSample/LoggerTest.cs b/samples/DotNetCoreSample/LoggerTest.cs index 084db6bb..38e46b85 100644 --- a/samples/DotNetCoreSample/LoggerTest.cs +++ b/samples/DotNetCoreSample/LoggerTest.cs @@ -37,10 +37,24 @@ public static void MicrosoftLoggingTest() var services = new ServiceCollection() .AddLogging(builder => // builder.AddConsole() - builder.AddFile() + builder.AddFile(options => options.LogFormatter = (category, level, exception, msg, timestamp) => + $"{timestamp} - [{category}] {level} - {msg}\n{exception}") ) .AddSingleton(typeof(GenericTest<>)) .BuildServiceProvider(); + + var logger = services.GetRequiredService() + .CreateLogger("test"); + while (!ApplicationHelper.ExitToken.IsCancellationRequested) + { + logger.LogInformation("Echo time: {Time}", DateTimeOffset.Now); + Thread.Sleep(500); + } + + ConsoleHelper.ReadKeyWithPrompt(); + services.GetRequiredService() + .CreateLogger("test") + .LogInformation("test 123"); services.GetRequiredService>() .Test(); services.GetRequiredService>() @@ -60,8 +74,6 @@ public static void MicrosoftLoggingTest() private class GenericTest(ILogger> logger) { - private readonly ILogger> _logger = logger; - - public void Test() => _logger.LogInformation("test"); + public void Test() => logger.LogInformation("test"); } } diff --git a/samples/DotNetCoreSample/Program.cs b/samples/DotNetCoreSample/Program.cs index fb24ff44..10b8ba91 100644 --- a/samples/DotNetCoreSample/Program.cs +++ b/samples/DotNetCoreSample/Program.cs @@ -347,8 +347,8 @@ // InvokeHelper.TryInvoke(() => throw null, 3); -// InvokeHelper.TryInvoke(LoggerTest.MicrosoftLoggingTest); -await InvokeHelper.TryInvokeAsync(InMemoryStreamTest.MainTest); +InvokeHelper.TryInvoke(LoggerTest.MicrosoftLoggingTest); +// await InvokeHelper.TryInvokeAsync(InMemoryStreamTest.MainTest); ConsoleHelper.ReadKeyWithPrompt("Press any key to exit"); From 7185a473a36253b1e8c3316b9d0869ae758d1e25 Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Wed, 25 Dec 2024 21:21:46 +0800 Subject: [PATCH 06/23] feat: configure nuget audit --- Directory.Packages.props | 7 +++++++ src/Directory.Build.props | 1 + 2 files changed, 8 insertions(+) diff --git a/Directory.Packages.props b/Directory.Packages.props index 54063c72..a4d07b3f 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -2,6 +2,13 @@ true + + true + + true + all + + NU1901;NU1902;NU1903;NU1904 diff --git a/src/Directory.Build.props b/src/Directory.Build.props index 5ca21a36..25c5caeb 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -25,6 +25,7 @@ MIT preview $(NoWarn);CS9216; + direct From 981d0e404fc821941c90fad389b3226fe934788a Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Fri, 3 Jan 2025 22:48:35 +0800 Subject: [PATCH 07/23] feat: public EventWrapper --- src/WeihanLi.Common/Event/EventBase.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/WeihanLi.Common/Event/EventBase.cs b/src/WeihanLi.Common/Event/EventBase.cs index 1159f3d6..4db23817 100644 --- a/src/WeihanLi.Common/Event/EventBase.cs +++ b/src/WeihanLi.Common/Event/EventBase.cs @@ -64,7 +64,7 @@ public interface IEvent T Data { get; } } -internal sealed class EventWrapper : IEvent, IEvent +public sealed class EventWrapper : IEvent, IEvent { public required T Data { get; init; } object? IEvent.Data => (object?)Data; From fa3b184132260bea0add1405d52179ffd1a38e3e Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Fri, 3 Jan 2025 22:51:36 +0800 Subject: [PATCH 08/23] refactor: unsealed EventWrapper --- src/WeihanLi.Common/Event/EventBase.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/WeihanLi.Common/Event/EventBase.cs b/src/WeihanLi.Common/Event/EventBase.cs index 4db23817..1d9fb5ce 100644 --- a/src/WeihanLi.Common/Event/EventBase.cs +++ b/src/WeihanLi.Common/Event/EventBase.cs @@ -64,7 +64,7 @@ public interface IEvent T Data { get; } } -public sealed class EventWrapper : IEvent, IEvent +public class EventWrapper : IEvent, IEvent { public required T Data { get; init; } object? IEvent.Data => (object?)Data; From 6f2a52108d04f802c204f504a95ee93d3581ab85 Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Fri, 3 Jan 2025 23:01:26 +0800 Subject: [PATCH 09/23] refactor: remove type constraint for ToEventMsg --- src/WeihanLi.Common/Event/EventBase.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/WeihanLi.Common/Event/EventBase.cs b/src/WeihanLi.Common/Event/EventBase.cs index 1d9fb5ce..1ae94979 100644 --- a/src/WeihanLi.Common/Event/EventBase.cs +++ b/src/WeihanLi.Common/Event/EventBase.cs @@ -78,7 +78,7 @@ public static class EventBaseExtensions s.TypeNameHandling = TypeNameHandling.Objects; }); - public static string ToEventMsg(this TEvent @event) where TEvent : class, IEventBase + public static string ToEventMsg(this TEvent @event) { Guard.NotNull(@event); return @event.ToJson(EventSerializerSettings); From 07f9da5c10714671f8d010d8ec2c4a2e8f7a7fe2 Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Sat, 4 Jan 2025 00:46:54 +0800 Subject: [PATCH 10/23] refactor: update EventExtensions --- src/WeihanLi.Common/Event/EventBase.cs | 50 +++++++++++++++++++++----- 1 file changed, 42 insertions(+), 8 deletions(-) diff --git a/src/WeihanLi.Common/Event/EventBase.cs b/src/WeihanLi.Common/Event/EventBase.cs index 1ae94979..eb96d25b 100644 --- a/src/WeihanLi.Common/Event/EventBase.cs +++ b/src/WeihanLi.Common/Event/EventBase.cs @@ -67,26 +67,60 @@ public interface IEvent public class EventWrapper : IEvent, IEvent { public required T Data { get; init; } - object? IEvent.Data => (object?)Data; + object? IEvent.Data => Data; public required EventProperties Properties { get; init; } } -public static class EventBaseExtensions +public static class EventExtensions { - private static readonly JsonSerializerSettings EventSerializerSettings = JsonSerializeExtension.SerializerSettingsWith(s => - { - s.TypeNameHandling = TypeNameHandling.Objects; - }); + private static readonly JsonSerializerSettings EventSerializerSettings = JsonSerializeExtension + .SerializerSettingsWith(s => + { + s.NullValueHandling = NullValueHandling.Ignore; + s.TypeNameHandling = TypeNameHandling.Objects; + }); public static string ToEventMsg(this TEvent @event) { Guard.NotNull(@event); - return @event.ToJson(EventSerializerSettings); + return GetEvent(@event).ToJson(EventSerializerSettings); + } + + private static IEvent GetEvent(this TEvent @event) + { + if (@event is IEvent eventEvent) + return eventEvent; + + if (@event is IEventBase eventBase) + return new EventWrapper() + { + Data = @event, + Properties = new() + { + EventAt = eventBase.EventAt, + EventId = eventBase.EventId + } + }; + + return new EventWrapper + { + Data = @event, + Properties = new EventProperties + { + EventAt = DateTimeOffset.Now + } + }; } - public static IEventBase ToEvent(this string eventMsg) + public static IEventBase ToEventBase(this string eventMsg) { Guard.NotNull(eventMsg); return eventMsg.JsonToObject(EventSerializerSettings); } + + public static IEvent ToEvent(this string eventMsg) + { + Guard.NotNull(eventMsg); + return eventMsg.JsonToObject(EventSerializerSettings); + } } From 310abe4c03723d8220aef7740fbb98e19d2a8adb Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Sat, 4 Jan 2025 00:49:38 +0800 Subject: [PATCH 11/23] refactor: update ToEvent extension --- src/WeihanLi.Common/Event/EventBase.cs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/WeihanLi.Common/Event/EventBase.cs b/src/WeihanLi.Common/Event/EventBase.cs index eb96d25b..4b01df48 100644 --- a/src/WeihanLi.Common/Event/EventBase.cs +++ b/src/WeihanLi.Common/Event/EventBase.cs @@ -112,15 +112,11 @@ private static IEvent GetEvent(this TEvent @event) }; } - public static IEventBase ToEventBase(this string eventMsg) + public static TEvent ToEvent(this string eventMsg) { Guard.NotNull(eventMsg); - return eventMsg.JsonToObject(EventSerializerSettings); + return eventMsg.JsonToObject(EventSerializerSettings); } - public static IEvent ToEvent(this string eventMsg) - { - Guard.NotNull(eventMsg); - return eventMsg.JsonToObject(EventSerializerSettings); - } + public static IEvent ToEvent(this string eventMsg) => ToEvent(eventMsg); } From 815e08307093fb922512be96859f09b38636a21a Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Sat, 4 Jan 2025 01:05:22 +0800 Subject: [PATCH 12/23] refactor: update EventQueue --- .../AspNetCoreSample/Events/EventConsumer.cs | 16 ++++----- .../Event/EventQueueInMemory.cs | 35 +++++++++++-------- src/WeihanLi.Common/Event/IEventQueue.cs | 12 +++---- 3 files changed, 33 insertions(+), 30 deletions(-) diff --git a/samples/AspNetCoreSample/Events/EventConsumer.cs b/samples/AspNetCoreSample/Events/EventConsumer.cs index d244e9d7..99bfb45a 100644 --- a/samples/AspNetCoreSample/Events/EventConsumer.cs +++ b/samples/AspNetCoreSample/Events/EventConsumer.cs @@ -1,4 +1,5 @@ -using WeihanLi.Common.Event; +using WeihanLi.Common; +using WeihanLi.Common.Event; using WeihanLi.Extensions; namespace AspNetCoreSample.Events; @@ -7,25 +8,24 @@ public class EventConsumer (IEventQueue eventQueue, IEventHandlerFactory eventHandlerFactory) : BackgroundService { - private readonly IEventQueue _eventQueue = eventQueue; - private readonly IEventHandlerFactory _eventHandlerFactory = eventHandlerFactory; - protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) { - var queues = await _eventQueue.GetQueuesAsync(); + var queues = await eventQueue.GetQueuesAsync(); if (queues.Count > 0) { await queues.Select(async q => { - if (await _eventQueue.TryDequeueAsync(q, out var @event, out var properties)) + await foreach (var e in eventQueue.ReadAllEvents(q, stoppingToken)) { - var handlers = _eventHandlerFactory.GetHandlers(@event.GetType()); + var @event = e.Data; + Guard.NotNull(@event); + var handlers = eventHandlerFactory.GetHandlers(@event.GetType()); if (handlers.Count > 0) { await handlers - .Select(h => h.Handle(@event, properties)) + .Select(h => h.Handle(@event, e.Properties)) .WhenAll() ; } diff --git a/src/WeihanLi.Common/Event/EventQueueInMemory.cs b/src/WeihanLi.Common/Event/EventQueueInMemory.cs index cca4019e..8f24ca6c 100644 --- a/src/WeihanLi.Common/Event/EventQueueInMemory.cs +++ b/src/WeihanLi.Common/Event/EventQueueInMemory.cs @@ -42,25 +42,21 @@ public Task EnqueueAsync(string queueName, TEvent @event, EventPro return Task.FromResult(true); } - public Task TryDequeueAsync(string queueName, [NotNullWhen(true)] out object? @event, [NotNullWhen(true)] out EventProperties? properties) + public Task?> DequeueAsync(string queueName) { - @event = default; - properties = default; - if (_eventQueues.TryGetValue(queueName, out var queue)) { if (queue.TryDequeue(out var eventWrapper)) { - @event = eventWrapper.Data; - properties = eventWrapper.Properties; - return Task.FromResult(true); + return Task.FromResult((IEvent?)eventWrapper); } } - return Task.FromResult(false); + return Task.FromResult?>(null); } - internal async IAsyncEnumerable<(TEvent Event, EventProperties Properties)> ReadAllAsync(string queueName, [EnumeratorCancellation] CancellationToken cancellationToken = default) + public async IAsyncEnumerable ReadAllEvents(string queueName, + [EnumeratorCancellation] CancellationToken cancellationToken = default) { while (!cancellationToken.IsCancellationRequested) { @@ -68,15 +64,26 @@ public Task TryDequeueAsync(string queueName, [NotNullWhen(true)] out obje { while (queue.TryDequeue(out var eventWrapper)) { - yield return ((TEvent)eventWrapper!.Data!, eventWrapper!.Properties); + yield return eventWrapper; } } - await Task.Delay(100); + await Task.Delay(100, cancellationToken); } } - - public bool TryRemoveQueue(string queueName) + + public async IAsyncEnumerable> ReadAllEvents(string queueName, + [EnumeratorCancellation]CancellationToken cancellationToken = default) { - return _eventQueues.TryRemove(queueName, out _); + while (!cancellationToken.IsCancellationRequested) + { + if (_eventQueues.TryGetValue(queueName, out var queue)) + { + while (queue.TryDequeue(out var eventWrapper) && eventWrapper is IEvent @event) + { + yield return @event; + } + } + await Task.Delay(100, cancellationToken); + } } } diff --git a/src/WeihanLi.Common/Event/IEventQueue.cs b/src/WeihanLi.Common/Event/IEventQueue.cs index 7d09ef3b..6836b70b 100644 --- a/src/WeihanLi.Common/Event/IEventQueue.cs +++ b/src/WeihanLi.Common/Event/IEventQueue.cs @@ -1,19 +1,15 @@ // Copyright (c) Weihan Li. All rights reserved. // Licensed under the Apache license. -using System.Diagnostics.CodeAnalysis; - namespace WeihanLi.Common.Event; public interface IEventQueue { Task> GetQueuesAsync(); - Task EnqueueAsync(string queueName, TEvent @event, EventProperties? properties = null); - - Task TryDequeueAsync(string queueName, [MaybeNullWhen(false)] out object @event, [MaybeNullWhen(false)] out EventProperties properties); - - // IAsyncEnumerable<(TEvent Event, EventProperties Properties)> ReadAllEvents(string queueName, CancellationToken cancellationToken = default); + Task?> DequeueAsync(string queueName); + IAsyncEnumerable> ReadAllEvents(string queueName, CancellationToken cancellationToken = default); + IAsyncEnumerable ReadAllEvents(string queueName, CancellationToken cancellationToken = default); } public static class EventQueueExtensions @@ -23,6 +19,6 @@ public static class EventQueueExtensions public static Task EnqueueAsync(this IEventQueue eventQueue, TEvent @event, EventProperties? properties = null) where TEvent : class { - return eventQueue.EnqueueAsync(DefaultQueueName, @event); + return eventQueue.EnqueueAsync(DefaultQueueName, @event, properties); } } From f37b5548a6e2f80528ad11375aa24da9362ad896 Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Sat, 4 Jan 2025 01:20:14 +0800 Subject: [PATCH 13/23] refactor: refine events --- .../AspNetCoreSample/Events/EventConsumer.cs | 2 +- src/WeihanLi.Common/Event/EventBase.cs | 6 +++++ .../Event/EventHandlerFactory.cs | 4 +--- .../Event/EventQueueInMemory.cs | 4 ++-- src/WeihanLi.Common/Event/IEventQueue.cs | 4 ++-- .../ServiceContainerBuilderBuildTest.cs | 12 ---------- .../EventsTest/EventBaseTest.cs | 24 +++++++++++++++++-- 7 files changed, 34 insertions(+), 22 deletions(-) diff --git a/samples/AspNetCoreSample/Events/EventConsumer.cs b/samples/AspNetCoreSample/Events/EventConsumer.cs index 99bfb45a..dbe21da1 100644 --- a/samples/AspNetCoreSample/Events/EventConsumer.cs +++ b/samples/AspNetCoreSample/Events/EventConsumer.cs @@ -17,7 +17,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { await queues.Select(async q => { - await foreach (var e in eventQueue.ReadAllEvents(q, stoppingToken)) + await foreach (var e in eventQueue.ReadAll(q, stoppingToken)) { var @event = e.Data; Guard.NotNull(@event); diff --git a/src/WeihanLi.Common/Event/EventBase.cs b/src/WeihanLi.Common/Event/EventBase.cs index 4b01df48..09ba4a74 100644 --- a/src/WeihanLi.Common/Event/EventBase.cs +++ b/src/WeihanLi.Common/Event/EventBase.cs @@ -85,6 +85,12 @@ public static string ToEventMsg(this TEvent @event) Guard.NotNull(@event); return GetEvent(@event).ToJson(EventSerializerSettings); } + + public static string ToEventRawMsg(this TEvent @event) + { + Guard.NotNull(@event); + return @event.ToJson(EventSerializerSettings); + } private static IEvent GetEvent(this TEvent @event) { diff --git a/src/WeihanLi.Common/Event/EventHandlerFactory.cs b/src/WeihanLi.Common/Event/EventHandlerFactory.cs index 6c4e4cfd..229b8eb7 100644 --- a/src/WeihanLi.Common/Event/EventHandlerFactory.cs +++ b/src/WeihanLi.Common/Event/EventHandlerFactory.cs @@ -7,12 +7,10 @@ namespace WeihanLi.Common.Event; public sealed class DefaultEventHandlerFactory(IEventSubscriptionManager subscriptionManager) : IEventHandlerFactory { - private readonly IEventSubscriptionManager _subscriptionManager = subscriptionManager; - [RequiresUnreferencedCode("Unreferenced code may be used")] public ICollection GetHandlers(Type eventType) { - var eventHandlers = _subscriptionManager.GetEventHandlers(eventType); + var eventHandlers = subscriptionManager.GetEventHandlers(eventType); return eventHandlers; } } diff --git a/src/WeihanLi.Common/Event/EventQueueInMemory.cs b/src/WeihanLi.Common/Event/EventQueueInMemory.cs index 8f24ca6c..2d8738b4 100644 --- a/src/WeihanLi.Common/Event/EventQueueInMemory.cs +++ b/src/WeihanLi.Common/Event/EventQueueInMemory.cs @@ -55,7 +55,7 @@ public Task EnqueueAsync(string queueName, TEvent @event, EventPro return Task.FromResult?>(null); } - public async IAsyncEnumerable ReadAllEvents(string queueName, + public async IAsyncEnumerable ReadAll(string queueName, [EnumeratorCancellation] CancellationToken cancellationToken = default) { while (!cancellationToken.IsCancellationRequested) @@ -71,7 +71,7 @@ public async IAsyncEnumerable ReadAllEvents(string queueName, } } - public async IAsyncEnumerable> ReadAllEvents(string queueName, + public async IAsyncEnumerable> ReadEvents(string queueName, [EnumeratorCancellation]CancellationToken cancellationToken = default) { while (!cancellationToken.IsCancellationRequested) diff --git a/src/WeihanLi.Common/Event/IEventQueue.cs b/src/WeihanLi.Common/Event/IEventQueue.cs index 6836b70b..5d1ddf53 100644 --- a/src/WeihanLi.Common/Event/IEventQueue.cs +++ b/src/WeihanLi.Common/Event/IEventQueue.cs @@ -8,8 +8,8 @@ public interface IEventQueue Task> GetQueuesAsync(); Task EnqueueAsync(string queueName, TEvent @event, EventProperties? properties = null); Task?> DequeueAsync(string queueName); - IAsyncEnumerable> ReadAllEvents(string queueName, CancellationToken cancellationToken = default); - IAsyncEnumerable ReadAllEvents(string queueName, CancellationToken cancellationToken = default); + IAsyncEnumerable ReadAll(string queueName, CancellationToken cancellationToken = default); + IAsyncEnumerable> ReadEvents(string queueName, CancellationToken cancellationToken = default); } public static class EventQueueExtensions diff --git a/test/WeihanLi.Common.Test/AspectTest/ServiceContainerBuilderBuildTest.cs b/test/WeihanLi.Common.Test/AspectTest/ServiceContainerBuilderBuildTest.cs index 8cfe4028..01d5d9b4 100644 --- a/test/WeihanLi.Common.Test/AspectTest/ServiceContainerBuilderBuildTest.cs +++ b/test/WeihanLi.Common.Test/AspectTest/ServiceContainerBuilderBuildTest.cs @@ -99,18 +99,6 @@ public async Task ClassTest() await handTask; } - [Fact] - public async Task GenericMethodTest() - { - var publisher = _serviceProvider.ResolveRequiredService(); - Assert.NotNull(publisher); - var publisherType = publisher.GetType(); - Assert.True(publisherType.IsSealed); - Assert.True(publisherType.Assembly.IsDynamic); - - await publisher.PublishAsync(new TestEvent()); - } - // not supported, will not intercept [Fact] public void OpenGenericTypeTest() diff --git a/test/WeihanLi.Common.Test/EventsTest/EventBaseTest.cs b/test/WeihanLi.Common.Test/EventsTest/EventBaseTest.cs index cd815875..9a686a3d 100644 --- a/test/WeihanLi.Common.Test/EventsTest/EventBaseTest.cs +++ b/test/WeihanLi.Common.Test/EventsTest/EventBaseTest.cs @@ -43,8 +43,8 @@ public void EventMessageExtensionsTest() { Name = "1213" }; - var eventMsg = testEvent.ToEventMsg(); - var eventFromMsg = eventMsg.ToEvent(); + var eventMsg = testEvent.ToEventRawMsg(); + var eventFromMsg = eventMsg.ToEvent(); Assert.Equal(typeof(TestEvent), eventFromMsg.GetType()); var deserializedEvent = eventFromMsg as TestEvent; @@ -53,4 +53,24 @@ public void EventMessageExtensionsTest() Assert.Equal(testEvent.EventAt, deserializedEvent.EventAt); Assert.Equal(testEvent.Name, deserializedEvent.Name); } + + [Fact] + public void EventMessageExtensions2Test() + { + var testEvent = new TestEvent() + { + Name = "1213" + }; + var eventMsg = testEvent.ToEventMsg(); + var eventFromMsg = eventMsg.ToEvent>(); + Assert.Equal(typeof(EventWrapper), eventFromMsg.GetType()); + + var deserializedEvent = eventFromMsg.Data; + Assert.NotNull(deserializedEvent); + Assert.Equal(testEvent.EventId, deserializedEvent.EventId); + Assert.Equal(testEvent.EventAt, deserializedEvent.EventAt); + Assert.Equal(testEvent.Name, deserializedEvent.Name); + Assert.Equal(testEvent.EventId, eventFromMsg.Properties.EventId); + Assert.Equal(testEvent.EventAt, eventFromMsg.Properties.EventAt); + } } From 0efe26a32369804d15d3d9b4114aac2c267453fc Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Sat, 4 Jan 2025 11:14:02 +0800 Subject: [PATCH 14/23] refactor: update EventQueue --- .../AspNetCoreSample/Events/EventConsumer.cs | 2 +- .../Event/EventQueueInMemory.cs | 19 +------------------ src/WeihanLi.Common/Event/IEventQueue.cs | 18 ++++++++++++++++-- 3 files changed, 18 insertions(+), 21 deletions(-) diff --git a/samples/AspNetCoreSample/Events/EventConsumer.cs b/samples/AspNetCoreSample/Events/EventConsumer.cs index dbe21da1..d71e34f6 100644 --- a/samples/AspNetCoreSample/Events/EventConsumer.cs +++ b/samples/AspNetCoreSample/Events/EventConsumer.cs @@ -17,7 +17,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { await queues.Select(async q => { - await foreach (var e in eventQueue.ReadAll(q, stoppingToken)) + await foreach (var e in eventQueue.ReadAllAsync(q, stoppingToken)) { var @event = e.Data; Guard.NotNull(@event); diff --git a/src/WeihanLi.Common/Event/EventQueueInMemory.cs b/src/WeihanLi.Common/Event/EventQueueInMemory.cs index 2d8738b4..eebd01c2 100644 --- a/src/WeihanLi.Common/Event/EventQueueInMemory.cs +++ b/src/WeihanLi.Common/Event/EventQueueInMemory.cs @@ -16,7 +16,6 @@ public sealed class EventQueueInMemory : IEventQueue public Task> GetQueuesAsync() => Task.FromResult(GetQueues()); - public Task EnqueueAsync(string queueName, TEvent @event, EventProperties? properties = null) { properties ??= new(); @@ -55,7 +54,7 @@ public Task EnqueueAsync(string queueName, TEvent @event, EventPro return Task.FromResult?>(null); } - public async IAsyncEnumerable ReadAll(string queueName, + public async IAsyncEnumerable ReadAllAsync(string queueName, [EnumeratorCancellation] CancellationToken cancellationToken = default) { while (!cancellationToken.IsCancellationRequested) @@ -70,20 +69,4 @@ public async IAsyncEnumerable ReadAll(string queueName, await Task.Delay(100, cancellationToken); } } - - public async IAsyncEnumerable> ReadEvents(string queueName, - [EnumeratorCancellation]CancellationToken cancellationToken = default) - { - while (!cancellationToken.IsCancellationRequested) - { - if (_eventQueues.TryGetValue(queueName, out var queue)) - { - while (queue.TryDequeue(out var eventWrapper) && eventWrapper is IEvent @event) - { - yield return @event; - } - } - await Task.Delay(100, cancellationToken); - } - } } diff --git a/src/WeihanLi.Common/Event/IEventQueue.cs b/src/WeihanLi.Common/Event/IEventQueue.cs index 5d1ddf53..7399e1da 100644 --- a/src/WeihanLi.Common/Event/IEventQueue.cs +++ b/src/WeihanLi.Common/Event/IEventQueue.cs @@ -1,6 +1,8 @@ // Copyright (c) Weihan Li. All rights reserved. // Licensed under the Apache license. +using System.Runtime.CompilerServices; + namespace WeihanLi.Common.Event; public interface IEventQueue @@ -8,8 +10,7 @@ public interface IEventQueue Task> GetQueuesAsync(); Task EnqueueAsync(string queueName, TEvent @event, EventProperties? properties = null); Task?> DequeueAsync(string queueName); - IAsyncEnumerable ReadAll(string queueName, CancellationToken cancellationToken = default); - IAsyncEnumerable> ReadEvents(string queueName, CancellationToken cancellationToken = default); + IAsyncEnumerable ReadAllAsync(string queueName, CancellationToken cancellationToken = default); } public static class EventQueueExtensions @@ -21,4 +22,17 @@ public static Task EnqueueAsync(this IEventQueue eventQueue, TEven { return eventQueue.EnqueueAsync(DefaultQueueName, @event, properties); } + + public static async IAsyncEnumerable> ReadEventsAsync( + this IEventQueue eventQueue, + string queueName, + [EnumeratorCancellation] CancellationToken cancellationToken = default + ) + { + await foreach (var @event in eventQueue.ReadAllAsync(queueName, cancellationToken)) + { + if(@event is IEvent eventEvent) + yield return eventEvent; + } + } } From 8de85fe109e9b534f2fe5d0f9a577a8339dcce56 Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Sat, 4 Jan 2025 11:35:18 +0800 Subject: [PATCH 15/23] feat: use Channel for EventQueueInMemory --- .../Event/EventQueueInMemory.cs | 34 ++++++++++++++++--- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/src/WeihanLi.Common/Event/EventQueueInMemory.cs b/src/WeihanLi.Common/Event/EventQueueInMemory.cs index eebd01c2..c5f22e9a 100644 --- a/src/WeihanLi.Common/Event/EventQueueInMemory.cs +++ b/src/WeihanLi.Common/Event/EventQueueInMemory.cs @@ -3,20 +3,26 @@ using System.Collections.Concurrent; using System.Diagnostics; -using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; +#if NET +using System.Threading.Channels; +#endif + namespace WeihanLi.Common.Event; public sealed class EventQueueInMemory : IEventQueue { +#if NET + private readonly ConcurrentDictionary> _eventQueues = new(); +#else private readonly ConcurrentDictionary> _eventQueues = new(); - +#endif public ICollection GetQueues() => _eventQueues.Keys; public Task> GetQueuesAsync() => Task.FromResult(GetQueues()); - public Task EnqueueAsync(string queueName, TEvent @event, EventProperties? properties = null) + public async Task EnqueueAsync(string queueName, TEvent @event, EventProperties? properties = null) { properties ??= new(); if (string.IsNullOrEmpty(properties.EventId)) @@ -36,16 +42,26 @@ public Task EnqueueAsync(string queueName, TEvent @event, EventPro Data = @event, Properties = properties }; +#if NET + var queue = _eventQueues.GetOrAdd(queueName, _ => Channel.CreateUnbounded()); + await queue.Writer.WriteAsync(internalEvent); +#else var queue = _eventQueues.GetOrAdd(queueName, _ => new ConcurrentQueue()); queue.Enqueue(internalEvent); - return Task.FromResult(true); + await Task.CompletedTask; +#endif + return true; } public Task?> DequeueAsync(string queueName) { if (_eventQueues.TryGetValue(queueName, out var queue)) { +#if NET + if (queue.Reader.TryRead(out var eventWrapper)) +#else if (queue.TryDequeue(out var eventWrapper)) +#endif { return Task.FromResult((IEvent?)eventWrapper); } @@ -61,12 +77,20 @@ public async IAsyncEnumerable ReadAllAsync(string queueName, { if (_eventQueues.TryGetValue(queueName, out var queue)) { +#if NET + await foreach (var @event in queue.Reader.ReadAllAsync(cancellationToken)) + { + yield return @event; + } +#else while (queue.TryDequeue(out var eventWrapper)) { yield return eventWrapper; } +#endif } - await Task.Delay(100, cancellationToken); + + await Task.Delay(200, cancellationToken); } } } From b36feddf7d2f15800e46d3676547942ae8a26633 Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Sat, 4 Jan 2025 20:56:15 +0800 Subject: [PATCH 16/23] Introduce a queue support ack Related to #226 Introduce a new `AckQueue` class to support message acknowledgment without automatic removal from the queue. contributes to #226 --- src/WeihanLi.Common/Event/AckQueue.cs | 81 +++++++++++++++++++ .../EventsTest/AckQueueTest.cs | 80 ++++++++++++++++++ 2 files changed, 161 insertions(+) create mode 100644 src/WeihanLi.Common/Event/AckQueue.cs create mode 100644 test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs diff --git a/src/WeihanLi.Common/Event/AckQueue.cs b/src/WeihanLi.Common/Event/AckQueue.cs new file mode 100644 index 00000000..838cf0cc --- /dev/null +++ b/src/WeihanLi.Common/Event/AckQueue.cs @@ -0,0 +1,81 @@ +using System.Collections.Concurrent; +using System.Runtime.CompilerServices; + +namespace WeihanLi.Common.Event; + +public sealed class AckQueue +{ + private readonly ConcurrentQueue _queue = new(); + private readonly ConcurrentDictionary _unackedMessages = new(); + private readonly TimeSpan _ackTimeout = TimeSpan.FromMinutes(1); + + public Task EnqueueAsync(TEvent @event, EventProperties? properties = null) + { + properties ??= new EventProperties(); + if (string.IsNullOrEmpty(properties.EventId)) + { + properties.EventId = Guid.NewGuid().ToString(); + } + if (properties.EventAt == default) + { + properties.EventAt = DateTimeOffset.Now; + } + + var internalEvent = new EventWrapper + { + Data = @event, + Properties = properties + }; + + _queue.Enqueue(internalEvent); + return Task.CompletedTask; + } + + public Task?> DequeueAsync() + { + if (_queue.TryDequeue(out var eventWrapper)) + { + _unackedMessages.TryAdd(eventWrapper.Properties.EventId, eventWrapper); + return Task.FromResult((IEvent?)eventWrapper); + } + + return Task.FromResult?>(null); + } + + public Task AckMessageAsync(string eventId) + { + _unackedMessages.TryRemove(eventId, out _); + return Task.CompletedTask; + } + + public async Task RequeueUnackedMessagesAsync() + { + foreach (var unackedMessage in _unackedMessages) + { + if (DateTimeOffset.Now - unackedMessage.Value.Properties.EventAt > _ackTimeout) + { + _unackedMessages.TryRemove(unackedMessage.Key, out var eventWrapper); + if (eventWrapper != null) + { + _queue.Enqueue(eventWrapper); + } + } + } + + await Task.CompletedTask; + } + + public async IAsyncEnumerable ReadAllAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) + { + while (!cancellationToken.IsCancellationRequested) + { + while (_queue.TryDequeue(out var eventWrapper)) + { + _unackedMessages.TryAdd(eventWrapper.Properties.EventId, eventWrapper); + yield return eventWrapper; + } + + await Task.Delay(200, cancellationToken); + } + } +} diff --git a/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs b/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs new file mode 100644 index 00000000..0051b213 --- /dev/null +++ b/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs @@ -0,0 +1,80 @@ +using System.Threading.Tasks; +using WeihanLi.Common.Event; +using Xunit; + +namespace WeihanLi.Common.Test.EventsTest +{ + public class AckQueueTest + { + private readonly AckQueue _ackQueue; + + public AckQueueTest() + { + _ackQueue = new AckQueue(); + } + + [Fact] + public async Task EnqueueAsync_ShouldAddMessageToQueue() + { + var testEvent = new TestEvent { Message = "Test Message" }; + await _ackQueue.EnqueueAsync(testEvent); + + var dequeuedEvent = await _ackQueue.DequeueAsync(); + Assert.NotNull(dequeuedEvent); + Assert.Equal(testEvent.Message, dequeuedEvent?.Data.Message); + } + + [Fact] + public async Task DequeueAsync_ShouldRetrieveMessageWithoutRemoval() + { + var testEvent = new TestEvent { Message = "Test Message" }; + await _ackQueue.EnqueueAsync(testEvent); + + var dequeuedEvent1 = await _ackQueue.DequeueAsync(); + var dequeuedEvent2 = await _ackQueue.DequeueAsync(); + + Assert.NotNull(dequeuedEvent1); + Assert.Equal(testEvent.Message, dequeuedEvent1?.Data.Message); + Assert.Null(dequeuedEvent2); + } + + [Fact] + public async Task AckMessageAsync_ShouldAcknowledgeAndRemoveMessage() + { + var testEvent = new TestEvent { Message = "Test Message" }; + await _ackQueue.EnqueueAsync(testEvent); + + var dequeuedEvent = await _ackQueue.DequeueAsync(); + Assert.NotNull(dequeuedEvent); + + await _ackQueue.AckMessageAsync(dequeuedEvent!.Properties.EventId); + + var dequeuedEventAfterAck = await _ackQueue.DequeueAsync(); + Assert.Null(dequeuedEventAfterAck); + } + + [Fact] + public async Task RequeueUnackedMessagesAsync_ShouldRequeueUnackedMessagesAfterTimeout() + { + var testEvent = new TestEvent { Message = "Test Message" }; + await _ackQueue.EnqueueAsync(testEvent); + + var dequeuedEvent = await _ackQueue.DequeueAsync(); + Assert.NotNull(dequeuedEvent); + + // Simulate timeout + await Task.Delay(TimeSpan.FromMinutes(2)); + + await _ackQueue.RequeueUnackedMessagesAsync(); + + var requeuedEvent = await _ackQueue.DequeueAsync(); + Assert.NotNull(requeuedEvent); + Assert.Equal(testEvent.Message, requeuedEvent?.Data.Message); + } + + private class TestEvent + { + public string Message { get; set; } + } + } +} From ba14da84474d01a93f80ff6b7ad67ccbec217843 Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Sat, 4 Jan 2025 23:13:15 +0800 Subject: [PATCH 17/23] refactor: update AckQueue --- src/WeihanLi.Common/Event/AckQueue.cs | 56 ++++++++++++++----- .../EventsTest/AckQueueTest.cs | 12 +--- 2 files changed, 45 insertions(+), 23 deletions(-) diff --git a/src/WeihanLi.Common/Event/AckQueue.cs b/src/WeihanLi.Common/Event/AckQueue.cs index 838cf0cc..a5801bd1 100644 --- a/src/WeihanLi.Common/Event/AckQueue.cs +++ b/src/WeihanLi.Common/Event/AckQueue.cs @@ -1,13 +1,35 @@ using System.Collections.Concurrent; using System.Runtime.CompilerServices; +using WeihanLi.Common.Helpers; namespace WeihanLi.Common.Event; -public sealed class AckQueue +public sealed class AckQueueOptions { + public TimeSpan AckTimeout { get; set; } = TimeSpan.FromMinutes(1); + + public bool AutoRequeue { get; set; } + + public TimeSpan Requeue { get; set; } +} + +public sealed class AckQueue : DisposableBase +{ + private readonly AckQueueOptions _options; private readonly ConcurrentQueue _queue = new(); - private readonly ConcurrentDictionary _unackedMessages = new(); - private readonly TimeSpan _ackTimeout = TimeSpan.FromMinutes(1); + private readonly ConcurrentDictionary _unAckedMessages = new(); + private readonly Timer? _timer; + + public AckQueue() : this(new()) { } + + public AckQueue(AckQueueOptions options) + { + _options = options; + if (options.AutoRequeue) + { + _timer = new Timer(_ => RequeueUnAckedMessages(), null, options.Requeue, options.Requeue); + } + } public Task EnqueueAsync(TEvent @event, EventProperties? properties = null) { @@ -16,6 +38,7 @@ public Task EnqueueAsync(TEvent @event, EventProperties? properties = nu { properties.EventId = Guid.NewGuid().ToString(); } + if (properties.EventAt == default) { properties.EventAt = DateTimeOffset.Now; @@ -35,7 +58,7 @@ public Task EnqueueAsync(TEvent @event, EventProperties? properties = nu { if (_queue.TryDequeue(out var eventWrapper)) { - _unackedMessages.TryAdd(eventWrapper.Properties.EventId, eventWrapper); + _unAckedMessages.TryAdd(eventWrapper.Properties.EventId, eventWrapper); return Task.FromResult((IEvent?)eventWrapper); } @@ -44,38 +67,43 @@ public Task EnqueueAsync(TEvent @event, EventProperties? properties = nu public Task AckMessageAsync(string eventId) { - _unackedMessages.TryRemove(eventId, out _); + _unAckedMessages.TryRemove(eventId, out _); return Task.CompletedTask; } - public async Task RequeueUnackedMessagesAsync() + public void RequeueUnAckedMessages() { - foreach (var unackedMessage in _unackedMessages) + foreach (var message in _unAckedMessages) { - if (DateTimeOffset.Now - unackedMessage.Value.Properties.EventAt > _ackTimeout) + if (DateTimeOffset.Now - message.Value.Properties.EventAt > _options.AckTimeout) { - _unackedMessages.TryRemove(unackedMessage.Key, out var eventWrapper); - if (eventWrapper != null) + if (_unAckedMessages.TryRemove(message.Key, out var eventWrapper) + && eventWrapper != null) { _queue.Enqueue(eventWrapper); } } } - - await Task.CompletedTask; } - public async IAsyncEnumerable ReadAllAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) + public async IAsyncEnumerable ReadAllAsync( + [EnumeratorCancellation] CancellationToken cancellationToken = default) { while (!cancellationToken.IsCancellationRequested) { while (_queue.TryDequeue(out var eventWrapper)) { - _unackedMessages.TryAdd(eventWrapper.Properties.EventId, eventWrapper); + _unAckedMessages.TryAdd(eventWrapper.Properties.EventId, eventWrapper); yield return eventWrapper; } await Task.Delay(200, cancellationToken); } } + + protected override void Dispose(bool disposing) + { + _timer?.Dispose(); + base.Dispose(disposing); + } } diff --git a/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs b/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs index 0051b213..6951f734 100644 --- a/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs +++ b/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs @@ -1,4 +1,3 @@ -using System.Threading.Tasks; using WeihanLi.Common.Event; using Xunit; @@ -6,12 +5,7 @@ namespace WeihanLi.Common.Test.EventsTest { public class AckQueueTest { - private readonly AckQueue _ackQueue; - - public AckQueueTest() - { - _ackQueue = new AckQueue(); - } + private readonly AckQueue _ackQueue = new(); [Fact] public async Task EnqueueAsync_ShouldAddMessageToQueue() @@ -54,7 +48,7 @@ public async Task AckMessageAsync_ShouldAcknowledgeAndRemoveMessage() } [Fact] - public async Task RequeueUnackedMessagesAsync_ShouldRequeueUnackedMessagesAfterTimeout() + public async Task RequeueUnAckedMessagesAsync_ShouldRequeueUnAckedMessagesAfterTimeout() { var testEvent = new TestEvent { Message = "Test Message" }; await _ackQueue.EnqueueAsync(testEvent); @@ -65,7 +59,7 @@ public async Task RequeueUnackedMessagesAsync_ShouldRequeueUnackedMessagesAfterT // Simulate timeout await Task.Delay(TimeSpan.FromMinutes(2)); - await _ackQueue.RequeueUnackedMessagesAsync(); + _ackQueue.RequeueUnAckedMessages(); var requeuedEvent = await _ackQueue.DequeueAsync(); Assert.NotNull(requeuedEvent); From e074b86fba08f07fb1c5912397899c41a0015c44 Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Sat, 4 Jan 2025 23:17:21 +0800 Subject: [PATCH 18/23] refactor: update AckQueueOptions --- src/WeihanLi.Common/Event/AckQueue.cs | 4 ++-- test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/WeihanLi.Common/Event/AckQueue.cs b/src/WeihanLi.Common/Event/AckQueue.cs index a5801bd1..55f1b178 100644 --- a/src/WeihanLi.Common/Event/AckQueue.cs +++ b/src/WeihanLi.Common/Event/AckQueue.cs @@ -8,9 +8,9 @@ public sealed class AckQueueOptions { public TimeSpan AckTimeout { get; set; } = TimeSpan.FromMinutes(1); - public bool AutoRequeue { get; set; } + public bool AutoRequeue { get; set; } = true; - public TimeSpan Requeue { get; set; } + public TimeSpan Requeue { get; set; } = TimeSpan.FromMinutes(1); } public sealed class AckQueue : DisposableBase diff --git a/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs b/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs index 6951f734..8f679d66 100644 --- a/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs +++ b/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs @@ -5,7 +5,10 @@ namespace WeihanLi.Common.Test.EventsTest { public class AckQueueTest { - private readonly AckQueue _ackQueue = new(); + private readonly AckQueue _ackQueue = new(new() + { + AutoRequeue = false + }); [Fact] public async Task EnqueueAsync_ShouldAddMessageToQueue() From c689a003e9b8de5fd9c81f368d95e318b19593f2 Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Sat, 4 Jan 2025 23:21:56 +0800 Subject: [PATCH 19/23] refactor: fix warnings --- test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs b/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs index 8f679d66..5debe690 100644 --- a/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs +++ b/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs @@ -18,7 +18,7 @@ public async Task EnqueueAsync_ShouldAddMessageToQueue() var dequeuedEvent = await _ackQueue.DequeueAsync(); Assert.NotNull(dequeuedEvent); - Assert.Equal(testEvent.Message, dequeuedEvent?.Data.Message); + Assert.Equal(testEvent.Message, dequeuedEvent.Data.Message); } [Fact] @@ -31,7 +31,7 @@ public async Task DequeueAsync_ShouldRetrieveMessageWithoutRemoval() var dequeuedEvent2 = await _ackQueue.DequeueAsync(); Assert.NotNull(dequeuedEvent1); - Assert.Equal(testEvent.Message, dequeuedEvent1?.Data.Message); + Assert.Equal(testEvent.Message, dequeuedEvent1.Data.Message); Assert.Null(dequeuedEvent2); } @@ -44,7 +44,7 @@ public async Task AckMessageAsync_ShouldAcknowledgeAndRemoveMessage() var dequeuedEvent = await _ackQueue.DequeueAsync(); Assert.NotNull(dequeuedEvent); - await _ackQueue.AckMessageAsync(dequeuedEvent!.Properties.EventId); + await _ackQueue.AckMessageAsync(dequeuedEvent.Properties.EventId); var dequeuedEventAfterAck = await _ackQueue.DequeueAsync(); Assert.Null(dequeuedEventAfterAck); @@ -66,7 +66,7 @@ public async Task RequeueUnAckedMessagesAsync_ShouldRequeueUnAckedMessagesAfterT var requeuedEvent = await _ackQueue.DequeueAsync(); Assert.NotNull(requeuedEvent); - Assert.Equal(testEvent.Message, requeuedEvent?.Data.Message); + Assert.Equal(testEvent.Message, requeuedEvent.Data.Message); } private class TestEvent From 6a91e9f3d1fae0bde07bc5a247de80e5c20b21fc Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Sat, 4 Jan 2025 23:30:09 +0800 Subject: [PATCH 20/23] refactor: update AckQueue --- src/WeihanLi.Common/Event/AckQueue.cs | 4 +- .../EventsTest/AckQueueTest.cs | 40 ++++++++++++++++--- 2 files changed, 36 insertions(+), 8 deletions(-) diff --git a/src/WeihanLi.Common/Event/AckQueue.cs b/src/WeihanLi.Common/Event/AckQueue.cs index 55f1b178..4798bddc 100644 --- a/src/WeihanLi.Common/Event/AckQueue.cs +++ b/src/WeihanLi.Common/Event/AckQueue.cs @@ -10,7 +10,7 @@ public sealed class AckQueueOptions public bool AutoRequeue { get; set; } = true; - public TimeSpan Requeue { get; set; } = TimeSpan.FromMinutes(1); + public TimeSpan RequeuePeriod { get; set; } = TimeSpan.FromMinutes(1); } public sealed class AckQueue : DisposableBase @@ -27,7 +27,7 @@ public AckQueue(AckQueueOptions options) _options = options; if (options.AutoRequeue) { - _timer = new Timer(_ => RequeueUnAckedMessages(), null, options.Requeue, options.Requeue); + _timer = new Timer(_ => RequeueUnAckedMessages(), null, options.RequeuePeriod, options.RequeuePeriod); } } diff --git a/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs b/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs index 5debe690..3a104e79 100644 --- a/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs +++ b/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs @@ -54,17 +54,45 @@ public async Task AckMessageAsync_ShouldAcknowledgeAndRemoveMessage() public async Task RequeueUnAckedMessagesAsync_ShouldRequeueUnAckedMessagesAfterTimeout() { var testEvent = new TestEvent { Message = "Test Message" }; - await _ackQueue.EnqueueAsync(testEvent); - - var dequeuedEvent = await _ackQueue.DequeueAsync(); + var ackQueue = new AckQueue(new() + { + AutoRequeue = false, + AckTimeout = TimeSpan.FromSeconds(3) + }); + await ackQueue.EnqueueAsync(testEvent); + + var dequeuedEvent = await ackQueue.DequeueAsync(); Assert.NotNull(dequeuedEvent); // Simulate timeout - await Task.Delay(TimeSpan.FromMinutes(2)); + await Task.Delay(TimeSpan.FromSeconds(5)); + + ackQueue.RequeueUnAckedMessages(); - _ackQueue.RequeueUnAckedMessages(); + var requeuedEvent = await ackQueue.DequeueAsync(); + Assert.NotNull(requeuedEvent); + Assert.Equal(testEvent.Message, requeuedEvent.Data.Message); + } + + [Fact] + public async Task AutoRequeueUnAckedMessagesAsync_ShouldRequeueUnAckedMessagesAfterTimeout() + { + var testEvent = new TestEvent { Message = "Test Message" }; + var ackQueue = new AckQueue(new() + { + AutoRequeue = true, + AckTimeout = TimeSpan.FromSeconds(3), + RequeuePeriod = TimeSpan.FromSeconds(2) + }); + await ackQueue.EnqueueAsync(testEvent); + + var dequeuedEvent = await ackQueue.DequeueAsync(); + Assert.NotNull(dequeuedEvent); + + // Simulate timeout + await Task.Delay(TimeSpan.FromSeconds(5)); - var requeuedEvent = await _ackQueue.DequeueAsync(); + var requeuedEvent = await ackQueue.DequeueAsync(); Assert.NotNull(requeuedEvent); Assert.Equal(testEvent.Message, requeuedEvent.Data.Message); } From 8683101ec78f3e26273343b96ce4ee63e4d9266f Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Sat, 4 Jan 2025 23:35:36 +0800 Subject: [PATCH 21/23] refactor: disable AutoRequeue by default --- src/WeihanLi.Common/Event/AckQueue.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/WeihanLi.Common/Event/AckQueue.cs b/src/WeihanLi.Common/Event/AckQueue.cs index 4798bddc..4aa96624 100644 --- a/src/WeihanLi.Common/Event/AckQueue.cs +++ b/src/WeihanLi.Common/Event/AckQueue.cs @@ -8,7 +8,7 @@ public sealed class AckQueueOptions { public TimeSpan AckTimeout { get; set; } = TimeSpan.FromMinutes(1); - public bool AutoRequeue { get; set; } = true; + public bool AutoRequeue { get; set; } public TimeSpan RequeuePeriod { get; set; } = TimeSpan.FromMinutes(1); } From 83425b8f9de7a40055321cd227966af739902ab3 Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Sat, 4 Jan 2025 23:36:58 +0800 Subject: [PATCH 22/23] test: update test case add using for AutoRequeueUnAckedMessagesAsync_ShouldRequeueUnAckedMessagesAfterTimeout --- test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs b/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs index 3a104e79..6ff8f145 100644 --- a/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs +++ b/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs @@ -78,7 +78,7 @@ public async Task RequeueUnAckedMessagesAsync_ShouldRequeueUnAckedMessagesAfterT public async Task AutoRequeueUnAckedMessagesAsync_ShouldRequeueUnAckedMessagesAfterTimeout() { var testEvent = new TestEvent { Message = "Test Message" }; - var ackQueue = new AckQueue(new() + await using var ackQueue = new AckQueue(new() { AutoRequeue = true, AckTimeout = TimeSpan.FromSeconds(3), From 2aabd19ba60e8cea34645d1d102e7f9322a1e55c Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Sun, 5 Jan 2025 08:12:46 +0800 Subject: [PATCH 23/23] Update README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 2c8e1687..d454441c 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,11 @@ # WeihanLi.Common -## Build status - [![WeihanLi.Common Latest Stable](https://img.shields.io/nuget/v/WeihanLi.Common.svg)](https://www.nuget.org/packages/WeihanLi.Common/) [![WeihanLi.Common Latest Preview](https://img.shields.io/nuget/vpre/WeihanLi.Common)](https://www.nuget.org/packages/WeihanLi.Common/absoluteLatest) +## Build status + [![Azure Pipelines Build Status](https://weihanli.visualstudio.com/Pipelines/_apis/build/status/WeihanLi.WeihanLi.Common?branchName=master)](https://weihanli.visualstudio.com/Pipelines/_build/latest?definitionId=16&branchName=master) [![Github Actions Build Status](https://github.com/WeihanLi/WeihanLi.Common/actions/workflows/default.yml/badge.svg)](https://github.com/WeihanLi/WeihanLi.Common/actions/workflows/default.yml)