diff --git a/samples/AspNetCoreSample/Events/EventConsumer.cs b/samples/AspNetCoreSample/Events/EventConsumer.cs index dbe21da1..d71e34f6 100644 --- a/samples/AspNetCoreSample/Events/EventConsumer.cs +++ b/samples/AspNetCoreSample/Events/EventConsumer.cs @@ -17,7 +17,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { await queues.Select(async q => { - await foreach (var e in eventQueue.ReadAll(q, stoppingToken)) + await foreach (var e in eventQueue.ReadAllAsync(q, stoppingToken)) { var @event = e.Data; Guard.NotNull(@event); diff --git a/src/WeihanLi.Common/Event/EventQueueInMemory.cs b/src/WeihanLi.Common/Event/EventQueueInMemory.cs index 2d8738b4..eebd01c2 100644 --- a/src/WeihanLi.Common/Event/EventQueueInMemory.cs +++ b/src/WeihanLi.Common/Event/EventQueueInMemory.cs @@ -16,7 +16,6 @@ public sealed class EventQueueInMemory : IEventQueue public Task> GetQueuesAsync() => Task.FromResult(GetQueues()); - public Task EnqueueAsync(string queueName, TEvent @event, EventProperties? properties = null) { properties ??= new(); @@ -55,7 +54,7 @@ public Task EnqueueAsync(string queueName, TEvent @event, EventPro return Task.FromResult?>(null); } - public async IAsyncEnumerable ReadAll(string queueName, + public async IAsyncEnumerable ReadAllAsync(string queueName, [EnumeratorCancellation] CancellationToken cancellationToken = default) { while (!cancellationToken.IsCancellationRequested) @@ -70,20 +69,4 @@ public async IAsyncEnumerable ReadAll(string queueName, await Task.Delay(100, cancellationToken); } } - - public async IAsyncEnumerable> ReadEvents(string queueName, - [EnumeratorCancellation]CancellationToken cancellationToken = default) - { - while (!cancellationToken.IsCancellationRequested) - { - if (_eventQueues.TryGetValue(queueName, out var queue)) - { - while (queue.TryDequeue(out var eventWrapper) && eventWrapper is IEvent @event) - { - yield return @event; - } - } - await Task.Delay(100, cancellationToken); - } - } } diff --git a/src/WeihanLi.Common/Event/IEventQueue.cs b/src/WeihanLi.Common/Event/IEventQueue.cs index 5d1ddf53..7399e1da 100644 --- a/src/WeihanLi.Common/Event/IEventQueue.cs +++ b/src/WeihanLi.Common/Event/IEventQueue.cs @@ -1,6 +1,8 @@ // Copyright (c) Weihan Li. All rights reserved. // Licensed under the Apache license. +using System.Runtime.CompilerServices; + namespace WeihanLi.Common.Event; public interface IEventQueue @@ -8,8 +10,7 @@ public interface IEventQueue Task> GetQueuesAsync(); Task EnqueueAsync(string queueName, TEvent @event, EventProperties? properties = null); Task?> DequeueAsync(string queueName); - IAsyncEnumerable ReadAll(string queueName, CancellationToken cancellationToken = default); - IAsyncEnumerable> ReadEvents(string queueName, CancellationToken cancellationToken = default); + IAsyncEnumerable ReadAllAsync(string queueName, CancellationToken cancellationToken = default); } public static class EventQueueExtensions @@ -21,4 +22,17 @@ public static Task EnqueueAsync(this IEventQueue eventQueue, TEven { return eventQueue.EnqueueAsync(DefaultQueueName, @event, properties); } + + public static async IAsyncEnumerable> ReadEventsAsync( + this IEventQueue eventQueue, + string queueName, + [EnumeratorCancellation] CancellationToken cancellationToken = default + ) + { + await foreach (var @event in eventQueue.ReadAllAsync(queueName, cancellationToken)) + { + if(@event is IEvent eventEvent) + yield return eventEvent; + } + } }