From 815e08307093fb922512be96859f09b38636a21a Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Sat, 4 Jan 2025 01:05:22 +0800 Subject: [PATCH] refactor: update EventQueue --- .../AspNetCoreSample/Events/EventConsumer.cs | 16 ++++----- .../Event/EventQueueInMemory.cs | 35 +++++++++++-------- src/WeihanLi.Common/Event/IEventQueue.cs | 12 +++---- 3 files changed, 33 insertions(+), 30 deletions(-) diff --git a/samples/AspNetCoreSample/Events/EventConsumer.cs b/samples/AspNetCoreSample/Events/EventConsumer.cs index d244e9d7..99bfb45a 100644 --- a/samples/AspNetCoreSample/Events/EventConsumer.cs +++ b/samples/AspNetCoreSample/Events/EventConsumer.cs @@ -1,4 +1,5 @@ -using WeihanLi.Common.Event; +using WeihanLi.Common; +using WeihanLi.Common.Event; using WeihanLi.Extensions; namespace AspNetCoreSample.Events; @@ -7,25 +8,24 @@ public class EventConsumer (IEventQueue eventQueue, IEventHandlerFactory eventHandlerFactory) : BackgroundService { - private readonly IEventQueue _eventQueue = eventQueue; - private readonly IEventHandlerFactory _eventHandlerFactory = eventHandlerFactory; - protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) { - var queues = await _eventQueue.GetQueuesAsync(); + var queues = await eventQueue.GetQueuesAsync(); if (queues.Count > 0) { await queues.Select(async q => { - if (await _eventQueue.TryDequeueAsync(q, out var @event, out var properties)) + await foreach (var e in eventQueue.ReadAllEvents(q, stoppingToken)) { - var handlers = _eventHandlerFactory.GetHandlers(@event.GetType()); + var @event = e.Data; + Guard.NotNull(@event); + var handlers = eventHandlerFactory.GetHandlers(@event.GetType()); if (handlers.Count > 0) { await handlers - .Select(h => h.Handle(@event, properties)) + .Select(h => h.Handle(@event, e.Properties)) .WhenAll() ; } diff --git a/src/WeihanLi.Common/Event/EventQueueInMemory.cs b/src/WeihanLi.Common/Event/EventQueueInMemory.cs index cca4019e..8f24ca6c 100644 --- a/src/WeihanLi.Common/Event/EventQueueInMemory.cs +++ b/src/WeihanLi.Common/Event/EventQueueInMemory.cs @@ -42,25 +42,21 @@ public Task EnqueueAsync(string queueName, TEvent @event, EventPro return Task.FromResult(true); } - public Task TryDequeueAsync(string queueName, [NotNullWhen(true)] out object? @event, [NotNullWhen(true)] out EventProperties? properties) + public Task?> DequeueAsync(string queueName) { - @event = default; - properties = default; - if (_eventQueues.TryGetValue(queueName, out var queue)) { if (queue.TryDequeue(out var eventWrapper)) { - @event = eventWrapper.Data; - properties = eventWrapper.Properties; - return Task.FromResult(true); + return Task.FromResult((IEvent?)eventWrapper); } } - return Task.FromResult(false); + return Task.FromResult?>(null); } - internal async IAsyncEnumerable<(TEvent Event, EventProperties Properties)> ReadAllAsync(string queueName, [EnumeratorCancellation] CancellationToken cancellationToken = default) + public async IAsyncEnumerable ReadAllEvents(string queueName, + [EnumeratorCancellation] CancellationToken cancellationToken = default) { while (!cancellationToken.IsCancellationRequested) { @@ -68,15 +64,26 @@ public Task TryDequeueAsync(string queueName, [NotNullWhen(true)] out obje { while (queue.TryDequeue(out var eventWrapper)) { - yield return ((TEvent)eventWrapper!.Data!, eventWrapper!.Properties); + yield return eventWrapper; } } - await Task.Delay(100); + await Task.Delay(100, cancellationToken); } } - - public bool TryRemoveQueue(string queueName) + + public async IAsyncEnumerable> ReadAllEvents(string queueName, + [EnumeratorCancellation]CancellationToken cancellationToken = default) { - return _eventQueues.TryRemove(queueName, out _); + while (!cancellationToken.IsCancellationRequested) + { + if (_eventQueues.TryGetValue(queueName, out var queue)) + { + while (queue.TryDequeue(out var eventWrapper) && eventWrapper is IEvent @event) + { + yield return @event; + } + } + await Task.Delay(100, cancellationToken); + } } } diff --git a/src/WeihanLi.Common/Event/IEventQueue.cs b/src/WeihanLi.Common/Event/IEventQueue.cs index 7d09ef3b..6836b70b 100644 --- a/src/WeihanLi.Common/Event/IEventQueue.cs +++ b/src/WeihanLi.Common/Event/IEventQueue.cs @@ -1,19 +1,15 @@ // Copyright (c) Weihan Li. All rights reserved. // Licensed under the Apache license. -using System.Diagnostics.CodeAnalysis; - namespace WeihanLi.Common.Event; public interface IEventQueue { Task> GetQueuesAsync(); - Task EnqueueAsync(string queueName, TEvent @event, EventProperties? properties = null); - - Task TryDequeueAsync(string queueName, [MaybeNullWhen(false)] out object @event, [MaybeNullWhen(false)] out EventProperties properties); - - // IAsyncEnumerable<(TEvent Event, EventProperties Properties)> ReadAllEvents(string queueName, CancellationToken cancellationToken = default); + Task?> DequeueAsync(string queueName); + IAsyncEnumerable> ReadAllEvents(string queueName, CancellationToken cancellationToken = default); + IAsyncEnumerable ReadAllEvents(string queueName, CancellationToken cancellationToken = default); } public static class EventQueueExtensions @@ -23,6 +19,6 @@ public static class EventQueueExtensions public static Task EnqueueAsync(this IEventQueue eventQueue, TEvent @event, EventProperties? properties = null) where TEvent : class { - return eventQueue.EnqueueAsync(DefaultQueueName, @event); + return eventQueue.EnqueueAsync(DefaultQueueName, @event, properties); } }