-
Notifications
You must be signed in to change notification settings - Fork 87
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #242 from WeihanLi/dev
1.0.74
- Loading branch information
Showing
17 changed files
with
395 additions
and
77 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
using System.Collections.Concurrent; | ||
using System.Runtime.CompilerServices; | ||
using WeihanLi.Common.Helpers; | ||
|
||
namespace WeihanLi.Common.Event; | ||
|
||
public sealed class AckQueueOptions | ||
{ | ||
public TimeSpan AckTimeout { get; set; } = TimeSpan.FromMinutes(1); | ||
|
||
public bool AutoRequeue { get; set; } | ||
|
||
public TimeSpan RequeuePeriod { get; set; } = TimeSpan.FromMinutes(1); | ||
} | ||
|
||
public sealed class AckQueue : DisposableBase | ||
{ | ||
private readonly AckQueueOptions _options; | ||
private readonly ConcurrentQueue<IEvent> _queue = new(); | ||
private readonly ConcurrentDictionary<string, IEvent> _unAckedMessages = new(); | ||
private readonly Timer? _timer; | ||
|
||
public AckQueue() : this(new()) { } | ||
|
||
public AckQueue(AckQueueOptions options) | ||
{ | ||
_options = options; | ||
if (options.AutoRequeue) | ||
{ | ||
_timer = new Timer(_ => RequeueUnAckedMessages(), null, options.RequeuePeriod, options.RequeuePeriod); | ||
} | ||
} | ||
|
||
public Task EnqueueAsync<TEvent>(TEvent @event, EventProperties? properties = null) | ||
{ | ||
properties ??= new EventProperties(); | ||
if (string.IsNullOrEmpty(properties.EventId)) | ||
{ | ||
properties.EventId = Guid.NewGuid().ToString(); | ||
} | ||
|
||
if (properties.EventAt == default) | ||
{ | ||
properties.EventAt = DateTimeOffset.Now; | ||
} | ||
|
||
var internalEvent = new EventWrapper<TEvent> | ||
{ | ||
Data = @event, | ||
Properties = properties | ||
}; | ||
|
||
_queue.Enqueue(internalEvent); | ||
return Task.CompletedTask; | ||
} | ||
|
||
public Task<IEvent<TEvent>?> DequeueAsync<TEvent>() | ||
{ | ||
if (_queue.TryDequeue(out var eventWrapper)) | ||
{ | ||
_unAckedMessages.TryAdd(eventWrapper.Properties.EventId, eventWrapper); | ||
return Task.FromResult((IEvent<TEvent>?)eventWrapper); | ||
} | ||
|
||
return Task.FromResult<IEvent<TEvent>?>(null); | ||
} | ||
|
||
public Task AckMessageAsync(string eventId) | ||
{ | ||
_unAckedMessages.TryRemove(eventId, out _); | ||
return Task.CompletedTask; | ||
} | ||
|
||
public void RequeueUnAckedMessages() | ||
{ | ||
foreach (var message in _unAckedMessages) | ||
{ | ||
if (DateTimeOffset.Now - message.Value.Properties.EventAt > _options.AckTimeout) | ||
{ | ||
if (_unAckedMessages.TryRemove(message.Key, out var eventWrapper) | ||
&& eventWrapper != null) | ||
{ | ||
_queue.Enqueue(eventWrapper); | ||
} | ||
} | ||
} | ||
} | ||
|
||
public async IAsyncEnumerable<IEvent> ReadAllAsync( | ||
[EnumeratorCancellation] CancellationToken cancellationToken = default) | ||
{ | ||
while (!cancellationToken.IsCancellationRequested) | ||
{ | ||
while (_queue.TryDequeue(out var eventWrapper)) | ||
{ | ||
_unAckedMessages.TryAdd(eventWrapper.Properties.EventId, eventWrapper); | ||
yield return eventWrapper; | ||
} | ||
|
||
await Task.Delay(200, cancellationToken); | ||
} | ||
} | ||
|
||
protected override void Dispose(bool disposing) | ||
{ | ||
_timer?.Dispose(); | ||
base.Dispose(disposing); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.