Skip to content

Commit

Permalink
Changed the name to make it easier to understand for async.
Browse files Browse the repository at this point in the history
  • Loading branch information
tosh-coding committed Jun 18, 2024
1 parent 1aa075e commit 3f8a436
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 123 deletions.
37 changes: 8 additions & 29 deletions src/AsyncFiberWorks/MessageDrivers/AsyncMessageDriver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public AsyncMessageDriver(IAsyncExecutor<TMessage> executorSingle)
/// Create a driver.
/// </summary>
public AsyncMessageDriver()
: this(null)
: this(new AsyncSimpleExecutor<TMessage>())
{
}

Expand All @@ -42,48 +42,27 @@ public AsyncMessageDriver()
/// <returns>Unsubscriber.</returns>
public IDisposable Subscribe(Func<TMessage, Task> action)
{
return _actions.AddHandler(action);
return _actions.AddHandler((message) => _executorSingle.Execute(message, action));
}

/// <summary>
/// Distribute one message.
/// </summary>
/// <param name="message">An message.</param>
public async Task Invoke(TMessage message)
/// <returns>A task that waits for actions to be performed.</returns>
public async Task InvokeAsync(TMessage message)
{
_actions.CopyTo(_copied);
await Execute(message, _copied, _executorSingle).ConfigureAwait(false);
foreach (var action in _copied)
{
await action(message).ConfigureAwait(false);
}
_copied.Clear();
}

///<summary>
/// Number of subscribers
///</summary>
public int NumSubscribers { get { return _actions.Count; } }

/// <summary>
/// Executes all actions.
/// </summary>
/// <param name="arg"></param>
/// <param name="actions"></param>
/// <param name="executorSingle"></param>
/// <returns></returns>
async Task Execute(TMessage arg, IReadOnlyList<Func<TMessage, Task>> actions, IAsyncExecutor<TMessage> executorSingle = null)
{
if (executorSingle == null)
{
foreach (var action in actions)
{
await action.Invoke(arg).ConfigureAwait(false);
}
}
else
{
foreach (var action in actions)
{
await executorSingle.Execute(arg, action).ConfigureAwait(false);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public AsyncProcessedFlagMessageDriver(IAsyncExecutor<ProcessedFlagEventArgs<TMe
/// Create a driver.
/// </summary>
public AsyncProcessedFlagMessageDriver()
: this(null)
: this(new AsyncSimpleExecutor<ProcessedFlagEventArgs<TMessage>>())
{
}

Expand All @@ -43,62 +43,33 @@ public AsyncProcessedFlagMessageDriver()
/// <returns>Unsubscriber.</returns>
public IDisposable Subscribe(Func<ProcessedFlagEventArgs<TMessage>, Task> action)
{
return _actions.AddHandler(action);
return _actions.AddHandler((e) => _executorSingle.Execute(e, action));
}

/// <summary>
/// Distribute one message.
/// </summary>
/// <param name="message">An message.</param>
public async Task Invoke(ProcessedFlagEventArgs<TMessage> message)
/// <returns>A task that waits for actions to be performed.</returns>
public async Task InvokeAsync(ProcessedFlagEventArgs<TMessage> message)
{
_actions.CopyTo(_copied);
await Execute(message, _copied, _executorSingle).ConfigureAwait(false);
foreach (var action in _copied)
{
await action(message).ConfigureAwait(false);

// If any action returns true, execution stops at that point.
if (message.Processed)
{
break;
}
}
_copied.Clear();
}

///<summary>
/// Number of subscribers
///</summary>
public int NumSubscribers { get { return _actions.Count; } }

/// <summary>
/// Executes actions.
/// If any action returns true, execution stops at that point.
/// </summary>
/// <param name="eventArgs">An argument.</param>
/// <param name="actions">A list of actions.</param>
/// <param name="executorSingle">Executor for one task. If null, the task is executed directly.</param>
/// <returns>A task that waits for actions to be performed.</returns>
async Task Execute(ProcessedFlagEventArgs<TMessage> eventArgs, IReadOnlyList<Func<ProcessedFlagEventArgs<TMessage>, Task>> actions, IAsyncExecutor<ProcessedFlagEventArgs<TMessage>> executorSingle)
{
if (actions == null)
{
return;
}

if (executorSingle != null)
{
foreach (var action in actions)
{
await executorSingle.Execute(eventArgs, action).ConfigureAwait(false);
if (eventArgs.Processed)
{
break;
}
}
}
else
{
foreach (var action in actions)
{
await action(eventArgs).ConfigureAwait(false);
if (eventArgs.Processed)
{
break;
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public AsyncProcessedFlagReverseOrderMessageDriver(IAsyncExecutor<ProcessedFlagE
/// Create a driver.
/// </summary>
public AsyncProcessedFlagReverseOrderMessageDriver()
: this(null)
: this(new AsyncSimpleExecutor<ProcessedFlagEventArgs<TMessage>>())
{
}

Expand All @@ -44,62 +44,33 @@ public AsyncProcessedFlagReverseOrderMessageDriver()
/// <returns>Unsubscriber.</returns>
public IDisposable Subscribe(Func<ProcessedFlagEventArgs<TMessage>, Task> action)
{
return _actions.AddHandler(action);
return _actions.AddHandler((e) => _executorSingle.Execute(e, action));
}

/// <summary>
/// Distribute one message.
/// </summary>
/// <param name="message">An message.</param>
public async Task Invoke(ProcessedFlagEventArgs<TMessage> message)
/// <returns>A task that waits for actions to be performed.</returns>
public async Task InvokeAsync(ProcessedFlagEventArgs<TMessage> message)
{
_actions.CopyTo(_copied);
await Execute(message, _copied, _executorSingle).ConfigureAwait(false);
foreach (var action in _copied.Reverse<Func<ProcessedFlagEventArgs<TMessage>, Task>>())
{
await action(message).ConfigureAwait(false);

// If any action is processed, execution stops at that point.
if (message.Processed)
{
break;
}
}
_copied.Clear();
}

///<summary>
/// Number of subscribers
///</summary>
public int NumSubscribers { get { return _actions.Count; } }

/// <summary>
/// Call the actions in the reverse order in which they were registered.
/// If any action is processed, execution stops at that point.
/// </summary>
/// <param name="eventArgs">An argument.</param>
/// <param name="actions">A list of actions.</param>
/// <param name="executorSingle">Executor for one task. If null, the task is executed directly.</param>
/// <returns>A task that waits for actions to be performed.</returns>
async Task Execute(ProcessedFlagEventArgs<TMessage> eventArgs, IReadOnlyList<Func<ProcessedFlagEventArgs<TMessage>, Task>> actions, IAsyncExecutor<ProcessedFlagEventArgs<TMessage>> executorSingle)
{
if (actions == null)
{
return;
}

if (executorSingle != null)
{
foreach (var action in actions.Reverse())
{
await executorSingle.Execute(eventArgs, action).ConfigureAwait(false);
if (eventArgs.Processed)
{
break;
}
}
}
else
{
foreach (var action in actions.Reverse())
{
await action(eventArgs).ConfigureAwait(false);
if (eventArgs.Processed)
{
break;
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ public interface IAsyncMessageDriverDistributor<TMessage>
/// </summary>
/// <param name="message">A message.</param>
/// <returns>Tasks waiting for call completion.</returns>
Task Invoke(TMessage message);
Task InvokeAsync(TMessage message);
}
}
4 changes: 2 additions & 2 deletions src/AsyncFiberWorksTests/ActionDriverTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ public async Task AsyncInvokingWithArgument()
var disposable1 = driver.Subscribe(action1);
var disposable2 = driver.Subscribe(action2);

await driver.Invoke(200);
await driver.InvokeAsync(200);
Assert.AreEqual(200 + 20, counter);
await driver.Invoke(10);
await driver.InvokeAsync(10);
Assert.AreEqual(200 + 20 + 10 + 1, counter);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public async Task ForwardOrder()

var eventArgs = new ProcessedFlagEventArgs<int>();
eventArgs.Arg = 123;
await driver.Invoke(eventArgs);
await driver.InvokeAsync(eventArgs);
Thread.Sleep(50);
Assert.AreEqual(301, counter);
}
Expand Down Expand Up @@ -75,7 +75,7 @@ public async Task ReverseOrder()

var arg = new ProcessedFlagEventArgs<int>();
arg.Arg = 123;
await driver.Invoke(arg).ConfigureAwait(false);
await driver.InvokeAsync(arg).ConfigureAwait(false);
Thread.Sleep(50);
Assert.AreEqual(300, counter);
}
Expand Down Expand Up @@ -106,7 +106,7 @@ public async Task DiscontinuedDuringInvoking()

var eventArgs = new ProcessedFlagEventArgs<int>();
eventArgs.Arg = 123;
await driver.Invoke(eventArgs);
await driver.InvokeAsync(eventArgs);
Thread.Sleep(50);
Assert.AreEqual(300, counter);
}
Expand Down
4 changes: 2 additions & 2 deletions src/AsyncFiberWorksTests/AsyncRegisterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public async Task WaitingOfAsyncRegisterOfT()

for (int i = 0; i < 10; i++)
{
await driver.Invoke(i + 1);
await driver.InvokeAsync(i + 1);
}

await Task.WhenAll(task1, task2);
Expand Down Expand Up @@ -146,7 +146,7 @@ public async Task WaitingOfProcessedFlagEventArgs()
for (int i = 0; i < 10; i++)
{
eventArgs.Arg = i + 1;
await driver.Invoke(eventArgs);
await driver.InvokeAsync(eventArgs);
}

cts.Cancel();
Expand Down

0 comments on commit 3f8a436

Please sign in to comment.