Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.0.74 preview 1 #238

Merged
merged 4 commits into from
Jan 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions samples/AspNetCoreSample/Events/EventConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using WeihanLi.Common.Event;
using WeihanLi.Common;
using WeihanLi.Common.Event;
using WeihanLi.Extensions;

namespace AspNetCoreSample.Events;
Expand All @@ -7,25 +8,24 @@ public class EventConsumer
(IEventQueue eventQueue, IEventHandlerFactory eventHandlerFactory)
: BackgroundService
{
private readonly IEventQueue _eventQueue = eventQueue;
private readonly IEventHandlerFactory _eventHandlerFactory = eventHandlerFactory;

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var queues = await _eventQueue.GetQueuesAsync();
var queues = await eventQueue.GetQueuesAsync();
if (queues.Count > 0)
{
await queues.Select(async q =>
{
if (await _eventQueue.TryDequeueAsync(q, out var @event, out var properties))
await foreach (var e in eventQueue.ReadAll(q, stoppingToken))
{
var handlers = _eventHandlerFactory.GetHandlers(@event.GetType());
var @event = e.Data;
Guard.NotNull(@event);
var handlers = eventHandlerFactory.GetHandlers(@event.GetType());
if (handlers.Count > 0)
{
await handlers
.Select(h => h.Handle(@event, properties))
.Select(h => h.Handle(@event, e.Properties))
.WhenAll()
;
}
Expand Down
52 changes: 44 additions & 8 deletions src/WeihanLi.Common/Event/EventBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,26 +67,62 @@ public interface IEvent<out T>
public class EventWrapper<T> : IEvent, IEvent<T>
{
public required T Data { get; init; }
object? IEvent.Data => (object?)Data;
object? IEvent.Data => Data;
public required EventProperties Properties { get; init; }
}

public static class EventBaseExtensions
public static class EventExtensions
{
private static readonly JsonSerializerSettings EventSerializerSettings = JsonSerializeExtension.SerializerSettingsWith(s =>
{
s.TypeNameHandling = TypeNameHandling.Objects;
});
private static readonly JsonSerializerSettings EventSerializerSettings = JsonSerializeExtension
.SerializerSettingsWith(s =>
{
s.NullValueHandling = NullValueHandling.Ignore;
s.TypeNameHandling = TypeNameHandling.Objects;
});

public static string ToEventMsg<TEvent>(this TEvent @event)
{
Guard.NotNull(@event);
return GetEvent(@event).ToJson(EventSerializerSettings);
}

public static string ToEventRawMsg<TEvent>(this TEvent @event)
{
Guard.NotNull(@event);
return @event.ToJson(EventSerializerSettings);
}

public static IEventBase ToEvent(this string eventMsg)
private static IEvent GetEvent<TEvent>(this TEvent @event)
{
if (@event is IEvent eventEvent)
return eventEvent;

if (@event is IEventBase eventBase)
return new EventWrapper<TEvent>()
{
Data = @event,
Properties = new()
{
EventAt = eventBase.EventAt,
EventId = eventBase.EventId
}
};

return new EventWrapper<TEvent>
{
Data = @event,
Properties = new EventProperties
{
EventAt = DateTimeOffset.Now
}
};
}

public static TEvent ToEvent<TEvent>(this string eventMsg)
{
Guard.NotNull(eventMsg);
return eventMsg.JsonToObject<IEventBase>(EventSerializerSettings);
return eventMsg.JsonToObject<TEvent>(EventSerializerSettings);
}

public static IEvent ToEvent(this string eventMsg) => ToEvent<IEvent>(eventMsg);
}
4 changes: 1 addition & 3 deletions src/WeihanLi.Common/Event/EventHandlerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@

public sealed class DefaultEventHandlerFactory(IEventSubscriptionManager subscriptionManager) : IEventHandlerFactory
{
private readonly IEventSubscriptionManager _subscriptionManager = subscriptionManager;

[RequiresUnreferencedCode("Unreferenced code may be used")]
public ICollection<IEventHandler> GetHandlers(Type eventType)

Check warning on line 11 in src/WeihanLi.Common/Event/EventHandlerFactory.cs

View workflow job for this annotation

GitHub Actions / Running tests on ubuntu-latest

Member 'WeihanLi.Common.Event.DefaultEventHandlerFactory.GetHandlers(Type)' with 'RequiresUnreferencedCodeAttribute' implements interface member 'WeihanLi.Common.Event.IEventHandlerFactory.GetHandlers(Type)' without 'RequiresUnreferencedCodeAttribute'. 'RequiresUnreferencedCodeAttribute' annotations must match across all interface implementations or overrides.

Check warning on line 11 in src/WeihanLi.Common/Event/EventHandlerFactory.cs

View workflow job for this annotation

GitHub Actions / Running tests on macOS-latest

Member 'WeihanLi.Common.Event.DefaultEventHandlerFactory.GetHandlers(Type)' with 'RequiresUnreferencedCodeAttribute' implements interface member 'WeihanLi.Common.Event.IEventHandlerFactory.GetHandlers(Type)' without 'RequiresUnreferencedCodeAttribute'. 'RequiresUnreferencedCodeAttribute' annotations must match across all interface implementations or overrides.

Check warning on line 11 in src/WeihanLi.Common/Event/EventHandlerFactory.cs

View workflow job for this annotation

GitHub Actions / Running tests on windows-latest

Member 'WeihanLi.Common.Event.DefaultEventHandlerFactory.GetHandlers(Type)' with 'RequiresUnreferencedCodeAttribute' implements interface member 'WeihanLi.Common.Event.IEventHandlerFactory.GetHandlers(Type)' without 'RequiresUnreferencedCodeAttribute'. 'RequiresUnreferencedCodeAttribute' annotations must match across all interface implementations or overrides.
{
var eventHandlers = _subscriptionManager.GetEventHandlers(eventType);
var eventHandlers = subscriptionManager.GetEventHandlers(eventType);
return eventHandlers;
}
}
35 changes: 21 additions & 14 deletions src/WeihanLi.Common/Event/EventQueueInMemory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,41 +42,48 @@ public Task<bool> EnqueueAsync<TEvent>(string queueName, TEvent @event, EventPro
return Task.FromResult(true);
}

public Task<bool> TryDequeueAsync(string queueName, [NotNullWhen(true)] out object? @event, [NotNullWhen(true)] out EventProperties? properties)
public Task<IEvent<TEvent>?> DequeueAsync<TEvent>(string queueName)
{
@event = default;
properties = default;

if (_eventQueues.TryGetValue(queueName, out var queue))
{
if (queue.TryDequeue(out var eventWrapper))
{
@event = eventWrapper.Data;
properties = eventWrapper.Properties;
return Task.FromResult(true);
return Task.FromResult((IEvent<TEvent>?)eventWrapper);
}
}

return Task.FromResult(false);
return Task.FromResult<IEvent<TEvent>?>(null);
}

internal async IAsyncEnumerable<(TEvent Event, EventProperties Properties)> ReadAllAsync<TEvent>(string queueName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
public async IAsyncEnumerable<IEvent> ReadAll(string queueName,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
while (!cancellationToken.IsCancellationRequested)
{
if (_eventQueues.TryGetValue(queueName, out var queue))
{
while (queue.TryDequeue(out var eventWrapper))
{
yield return ((TEvent)eventWrapper!.Data!, eventWrapper!.Properties);
yield return eventWrapper;
}
}
await Task.Delay(100);
await Task.Delay(100, cancellationToken);
}
}

public bool TryRemoveQueue(string queueName)

public async IAsyncEnumerable<IEvent<TEvent>> ReadEvents<TEvent>(string queueName,
[EnumeratorCancellation]CancellationToken cancellationToken = default)
{
return _eventQueues.TryRemove(queueName, out _);
while (!cancellationToken.IsCancellationRequested)
{
if (_eventQueues.TryGetValue(queueName, out var queue))
{
while (queue.TryDequeue(out var eventWrapper) && eventWrapper is IEvent<TEvent> @event)
{
yield return @event;
}
}
await Task.Delay(100, cancellationToken);
}
}
}
12 changes: 4 additions & 8 deletions src/WeihanLi.Common/Event/IEventQueue.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
// Copyright (c) Weihan Li. All rights reserved.
// Licensed under the Apache license.

using System.Diagnostics.CodeAnalysis;

namespace WeihanLi.Common.Event;

public interface IEventQueue
{
Task<ICollection<string>> GetQueuesAsync();

Task<bool> EnqueueAsync<TEvent>(string queueName, TEvent @event, EventProperties? properties = null);

Task<bool> TryDequeueAsync(string queueName, [MaybeNullWhen(false)] out object @event, [MaybeNullWhen(false)] out EventProperties properties);

// IAsyncEnumerable<(TEvent Event, EventProperties Properties)> ReadAllEvents<TEvent>(string queueName, CancellationToken cancellationToken = default);
Task<IEvent<TEvent>?> DequeueAsync<TEvent>(string queueName);
IAsyncEnumerable<IEvent> ReadAll(string queueName, CancellationToken cancellationToken = default);
IAsyncEnumerable<IEvent<TEvent>> ReadEvents<TEvent>(string queueName, CancellationToken cancellationToken = default);
}

public static class EventQueueExtensions
Expand All @@ -23,6 +19,6 @@ public static class EventQueueExtensions
public static Task<bool> EnqueueAsync<TEvent>(this IEventQueue eventQueue, TEvent @event, EventProperties? properties = null)
where TEvent : class
{
return eventQueue.EnqueueAsync(DefaultQueueName, @event);
return eventQueue.EnqueueAsync(DefaultQueueName, @event, properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,6 @@ public async Task ClassTest()
await handTask;
}

[Fact]
public async Task GenericMethodTest()
{
var publisher = _serviceProvider.ResolveRequiredService<IEventPublisher>();
Assert.NotNull(publisher);
var publisherType = publisher.GetType();
Assert.True(publisherType.IsSealed);
Assert.True(publisherType.Assembly.IsDynamic);

await publisher.PublishAsync(new TestEvent());
}

// not supported, will not intercept
[Fact]
public void OpenGenericTypeTest()
Expand Down
24 changes: 22 additions & 2 deletions test/WeihanLi.Common.Test/EventsTest/EventBaseTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public void EventMessageExtensionsTest()
{
Name = "1213"
};
var eventMsg = testEvent.ToEventMsg();
var eventFromMsg = eventMsg.ToEvent();
var eventMsg = testEvent.ToEventRawMsg();
var eventFromMsg = eventMsg.ToEvent<TestEvent>();
Assert.Equal(typeof(TestEvent), eventFromMsg.GetType());

var deserializedEvent = eventFromMsg as TestEvent;
Expand All @@ -53,4 +53,24 @@ public void EventMessageExtensionsTest()
Assert.Equal(testEvent.EventAt, deserializedEvent.EventAt);
Assert.Equal(testEvent.Name, deserializedEvent.Name);
}

[Fact]
public void EventMessageExtensions2Test()
{
var testEvent = new TestEvent()
{
Name = "1213"
};
var eventMsg = testEvent.ToEventMsg();
var eventFromMsg = eventMsg.ToEvent<EventWrapper<TestEvent>>();
Assert.Equal(typeof(EventWrapper<TestEvent>), eventFromMsg.GetType());

var deserializedEvent = eventFromMsg.Data;
Assert.NotNull(deserializedEvent);
Assert.Equal(testEvent.EventId, deserializedEvent.EventId);
Assert.Equal(testEvent.EventAt, deserializedEvent.EventAt);
Assert.Equal(testEvent.Name, deserializedEvent.Name);
Assert.Equal(testEvent.EventId, eventFromMsg.Properties.EventId);
Assert.Equal(testEvent.EventAt, eventFromMsg.Properties.EventAt);
}
}
Loading