Skip to content

Commit

Permalink
Remove ProcessedFlagEventArgs.
Browse files Browse the repository at this point in the history
  • Loading branch information
tosh-coding committed Jun 19, 2024
1 parent f53facf commit d823528
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 408 deletions.
2 changes: 0 additions & 2 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,6 @@ Drivers call their own Subscriber handlers. There are two types: timing notifica

* _[ActionDriver](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorks/Procedures/ActionDriver.cs)_ - Calls the subscriber's handler. It runs on one fiber. [Example](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorksTests/ActionDriverTests.cs).
* _[AsyncMessageDriver{TMessage}](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorks/MessageDrivers/AsyncMessageDriver.cs)_ - It distributes messages to subscribers. [Example](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorksTests/ActionDriverTests.cs#L68).
* _[AsyncProcessedFlagMessageDriver{TMessage}](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorks/MessageDrivers/AsyncProcessedFlagMessageDriver.cs)_ - It distributes messages to subscribers. [Example](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorksTests/AsyncActionDriverWithProcessedFlagEventArgsTests.cs#L16).
* _[AsyncProcessedFlagReverseOrderMessageDriver{TMessage}](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorks/MessageDrivers/AsyncProcessedFlagReverseOrderMessageDriver.cs)_ - It distributes messages to subscribers. [Example](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorksTests/AsyncActionDriverWithProcessedFlagEventArgsTests.cs#L50).

## Channels ##
This is a mechanism for parallel processing. If you do not need that much performance, `AsyncMessageDriver{T}` is recommended. It is easy to handle because it is serial.
Expand Down
20 changes: 0 additions & 20 deletions src/AsyncFiberWorks/Core/ProcessedFlagEventArgs.cs

This file was deleted.

2 changes: 1 addition & 1 deletion src/AsyncFiberWorks/Core/ToggleFilter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace AsyncFiberWorks.Core
/// <summary>
/// A filter that can be toggled to run or skip.
/// </summary>
internal class ToggleFilter
public class ToggleFilter
{
private bool _running = true;

Expand Down
77 changes: 0 additions & 77 deletions src/AsyncFiberWorks/MessageDrivers/AsyncActionListOfT.cs

This file was deleted.

46 changes: 42 additions & 4 deletions src/AsyncFiberWorks/MessageDrivers/AsyncMessageDriver.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using AsyncFiberWorks.Core;
using AsyncFiberWorks.MessageDrivers;

namespace AsyncFiberWorks.MessageFilters
Expand All @@ -13,7 +14,8 @@ namespace AsyncFiberWorks.MessageFilters
/// <typeparam name="TMessage">Message type.</typeparam>
public class AsyncMessageDriver<TMessage> : IAsyncMessageDriver<TMessage>
{
private readonly AsyncActionList<TMessage> _actions = new AsyncActionList<TMessage>();
private object _lock = new object();
private LinkedList<Func<TMessage, Task>> _actions = new LinkedList<Func<TMessage, Task>>();
private List<Func<TMessage, Task>> _copied = new List<Func<TMessage, Task>>();

/// <summary>
Expand All @@ -30,7 +32,31 @@ public AsyncMessageDriver()
/// <returns>Unsubscriber.</returns>
public IDisposable Subscribe(Func<TMessage, Task> action)
{
return _actions.AddHandler(action);
var maskableFilter = new ToggleFilter();
Func<TMessage, Task> safeAction = async (message) =>
{
var enabled = maskableFilter.IsEnabled;
if (enabled)
{
await action(message);
}
};

lock (_lock)
{
_actions.AddLast(safeAction);
}

var unsubscriber = new Unsubscriber(() =>
{
lock (_lock)
{
maskableFilter.IsEnabled = false;
_actions.Remove(safeAction);
}
});

return unsubscriber;
}

/// <summary>
Expand All @@ -40,7 +66,10 @@ public IDisposable Subscribe(Func<TMessage, Task> action)
/// <returns>A task that waits for actions to be performed.</returns>
public async Task InvokeAsync(TMessage message)
{
_actions.CopyTo(_copied);
lock (_lock)
{
_copied.AddRange(_actions);
}
foreach (var action in _copied)
{
await action(message).ConfigureAwait(false);
Expand All @@ -51,6 +80,15 @@ public async Task InvokeAsync(TMessage message)
///<summary>
/// Number of subscribers
///</summary>
public int NumSubscribers { get { return _actions.Count; } }
public int NumSubscribers
{
get
{
lock (_lock)
{
return _actions.Count;
}
}
}
}
}

This file was deleted.

This file was deleted.

Loading

0 comments on commit d823528

Please sign in to comment.