Skip to content

Commit

Permalink
refactor: update EventQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
WeihanLi committed Jan 3, 2025
1 parent 310abe4 commit 815e083
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 30 deletions.
16 changes: 8 additions & 8 deletions samples/AspNetCoreSample/Events/EventConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using WeihanLi.Common.Event;
using WeihanLi.Common;
using WeihanLi.Common.Event;
using WeihanLi.Extensions;

namespace AspNetCoreSample.Events;
Expand All @@ -7,25 +8,24 @@ public class EventConsumer
(IEventQueue eventQueue, IEventHandlerFactory eventHandlerFactory)
: BackgroundService
{
private readonly IEventQueue _eventQueue = eventQueue;
private readonly IEventHandlerFactory _eventHandlerFactory = eventHandlerFactory;

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var queues = await _eventQueue.GetQueuesAsync();
var queues = await eventQueue.GetQueuesAsync();
if (queues.Count > 0)
{
await queues.Select(async q =>
{
if (await _eventQueue.TryDequeueAsync(q, out var @event, out var properties))
await foreach (var e in eventQueue.ReadAllEvents(q, stoppingToken))
{
var handlers = _eventHandlerFactory.GetHandlers(@event.GetType());
var @event = e.Data;
Guard.NotNull(@event);
var handlers = eventHandlerFactory.GetHandlers(@event.GetType());
if (handlers.Count > 0)
{
await handlers
.Select(h => h.Handle(@event, properties))
.Select(h => h.Handle(@event, e.Properties))
.WhenAll()
;
}
Expand Down
35 changes: 21 additions & 14 deletions src/WeihanLi.Common/Event/EventQueueInMemory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,41 +42,48 @@ public Task<bool> EnqueueAsync<TEvent>(string queueName, TEvent @event, EventPro
return Task.FromResult(true);
}

public Task<bool> TryDequeueAsync(string queueName, [NotNullWhen(true)] out object? @event, [NotNullWhen(true)] out EventProperties? properties)
public Task<IEvent<TEvent>?> DequeueAsync<TEvent>(string queueName)
{
@event = default;
properties = default;

if (_eventQueues.TryGetValue(queueName, out var queue))
{
if (queue.TryDequeue(out var eventWrapper))
{
@event = eventWrapper.Data;
properties = eventWrapper.Properties;
return Task.FromResult(true);
return Task.FromResult((IEvent<TEvent>?)eventWrapper);
}
}

return Task.FromResult(false);
return Task.FromResult<IEvent<TEvent>?>(null);
}

internal async IAsyncEnumerable<(TEvent Event, EventProperties Properties)> ReadAllAsync<TEvent>(string queueName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
public async IAsyncEnumerable<IEvent> ReadAllEvents(string queueName,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
while (!cancellationToken.IsCancellationRequested)
{
if (_eventQueues.TryGetValue(queueName, out var queue))
{
while (queue.TryDequeue(out var eventWrapper))
{
yield return ((TEvent)eventWrapper!.Data!, eventWrapper!.Properties);
yield return eventWrapper;
}
}
await Task.Delay(100);
await Task.Delay(100, cancellationToken);
}
}

public bool TryRemoveQueue(string queueName)

public async IAsyncEnumerable<IEvent<TEvent>> ReadAllEvents<TEvent>(string queueName,
[EnumeratorCancellation]CancellationToken cancellationToken = default)
{
return _eventQueues.TryRemove(queueName, out _);
while (!cancellationToken.IsCancellationRequested)
{
if (_eventQueues.TryGetValue(queueName, out var queue))
{
while (queue.TryDequeue(out var eventWrapper) && eventWrapper is IEvent<TEvent> @event)
{
yield return @event;
}
}
await Task.Delay(100, cancellationToken);
}
}
}
12 changes: 4 additions & 8 deletions src/WeihanLi.Common/Event/IEventQueue.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
// Copyright (c) Weihan Li. All rights reserved.
// Licensed under the Apache license.

using System.Diagnostics.CodeAnalysis;

namespace WeihanLi.Common.Event;

public interface IEventQueue
{
Task<ICollection<string>> GetQueuesAsync();

Task<bool> EnqueueAsync<TEvent>(string queueName, TEvent @event, EventProperties? properties = null);

Task<bool> TryDequeueAsync(string queueName, [MaybeNullWhen(false)] out object @event, [MaybeNullWhen(false)] out EventProperties properties);

// IAsyncEnumerable<(TEvent Event, EventProperties Properties)> ReadAllEvents<TEvent>(string queueName, CancellationToken cancellationToken = default);
Task<IEvent<TEvent>?> DequeueAsync<TEvent>(string queueName);
IAsyncEnumerable<IEvent<TEvent>> ReadAllEvents<TEvent>(string queueName, CancellationToken cancellationToken = default);
IAsyncEnumerable<IEvent> ReadAllEvents(string queueName, CancellationToken cancellationToken = default);
}

public static class EventQueueExtensions
Expand All @@ -23,6 +19,6 @@ public static class EventQueueExtensions
public static Task<bool> EnqueueAsync<TEvent>(this IEventQueue eventQueue, TEvent @event, EventProperties? properties = null)
where TEvent : class
{
return eventQueue.EnqueueAsync(DefaultQueueName, @event);
return eventQueue.EnqueueAsync(DefaultQueueName, @event, properties);
}
}

0 comments on commit 815e083

Please sign in to comment.