Skip to content

Commit

Permalink
refactor: update EventQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
WeihanLi committed Jan 4, 2025
1 parent f37b554 commit 0efe26a
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 21 deletions.
2 changes: 1 addition & 1 deletion samples/AspNetCoreSample/Events/EventConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await queues.Select(async q =>
{
await foreach (var e in eventQueue.ReadAll(q, stoppingToken))
await foreach (var e in eventQueue.ReadAllAsync(q, stoppingToken))
{
var @event = e.Data;
Guard.NotNull(@event);
Expand Down
19 changes: 1 addition & 18 deletions src/WeihanLi.Common/Event/EventQueueInMemory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ public sealed class EventQueueInMemory : IEventQueue

public Task<ICollection<string>> GetQueuesAsync() => Task.FromResult(GetQueues());


public Task<bool> EnqueueAsync<TEvent>(string queueName, TEvent @event, EventProperties? properties = null)
{
properties ??= new();
Expand Down Expand Up @@ -55,7 +54,7 @@ public Task<bool> EnqueueAsync<TEvent>(string queueName, TEvent @event, EventPro
return Task.FromResult<IEvent<TEvent>?>(null);
}

public async IAsyncEnumerable<IEvent> ReadAll(string queueName,
public async IAsyncEnumerable<IEvent> ReadAllAsync(string queueName,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
while (!cancellationToken.IsCancellationRequested)
Expand All @@ -70,20 +69,4 @@ public async IAsyncEnumerable<IEvent> ReadAll(string queueName,
await Task.Delay(100, cancellationToken);
}
}

public async IAsyncEnumerable<IEvent<TEvent>> ReadEvents<TEvent>(string queueName,
[EnumeratorCancellation]CancellationToken cancellationToken = default)
{
while (!cancellationToken.IsCancellationRequested)
{
if (_eventQueues.TryGetValue(queueName, out var queue))
{
while (queue.TryDequeue(out var eventWrapper) && eventWrapper is IEvent<TEvent> @event)
{
yield return @event;
}
}
await Task.Delay(100, cancellationToken);
}
}
}
18 changes: 16 additions & 2 deletions src/WeihanLi.Common/Event/IEventQueue.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
// Copyright (c) Weihan Li. All rights reserved.
// Licensed under the Apache license.

using System.Runtime.CompilerServices;

namespace WeihanLi.Common.Event;

public interface IEventQueue
{
Task<ICollection<string>> GetQueuesAsync();
Task<bool> EnqueueAsync<TEvent>(string queueName, TEvent @event, EventProperties? properties = null);
Task<IEvent<TEvent>?> DequeueAsync<TEvent>(string queueName);
IAsyncEnumerable<IEvent> ReadAll(string queueName, CancellationToken cancellationToken = default);
IAsyncEnumerable<IEvent<TEvent>> ReadEvents<TEvent>(string queueName, CancellationToken cancellationToken = default);
IAsyncEnumerable<IEvent> ReadAllAsync(string queueName, CancellationToken cancellationToken = default);
}

public static class EventQueueExtensions
Expand All @@ -21,4 +22,17 @@ public static Task<bool> EnqueueAsync<TEvent>(this IEventQueue eventQueue, TEven
{
return eventQueue.EnqueueAsync(DefaultQueueName, @event, properties);
}

public static async IAsyncEnumerable<IEvent<TEvent>> ReadEventsAsync<TEvent>(
this IEventQueue eventQueue,
string queueName,
[EnumeratorCancellation] CancellationToken cancellationToken = default
)
{
await foreach (var @event in eventQueue.ReadAllAsync(queueName, cancellationToken))
{
if(@event is IEvent<TEvent> eventEvent)
yield return eventEvent;
}
}
}

0 comments on commit 0efe26a

Please sign in to comment.