Skip to content

Commit 559ea8c

Browse files
committed
Integrate AsyncActionDriver and ActionDriver.
1 parent 252c43b commit 559ea8c

17 files changed

+147
-322
lines changed

src/AsyncFiberWorks/Core/AsyncToggleFilter.cs

Lines changed: 0 additions & 36 deletions
This file was deleted.

src/AsyncFiberWorks/Core/ToggleFilter.cs

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,6 @@ internal class ToggleFilter
99
{
1010
private bool _running = true;
1111

12-
///<summary>
13-
/// Executes a single action.
14-
///</summary>
15-
///<param name="toExecute"></param>
16-
public void Execute(Action toExecute)
17-
{
18-
if (_running)
19-
{
20-
toExecute();
21-
}
22-
}
23-
2412
/// <summary>
2513
/// When disabled, actions will be ignored by filter.
2614
/// The filter is typically disabled at shutdown
Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,20 @@
1+
using AsyncFiberWorks.Core;
2+
using AsyncFiberWorks.Fibers;
13
using System;
4+
using System.Collections.Generic;
25

36
namespace AsyncFiberWorks.Procedures
47
{
58
/// <summary>
6-
/// Default driver implementation. Invokes all of the subscriber's actions.
9+
/// Invokes all of the subscriber's actions.
710
/// </summary>
811
public class ActionDriver : IActionDriver
912
{
10-
private readonly ActionList _actions = new ActionList();
13+
private object _lock = new object();
14+
private LinkedList<Action<FiberExecutionEventArgs>> _actions = new LinkedList<Action<FiberExecutionEventArgs>>();
1115

1216
/// <summary>
13-
/// Create a driver.
17+
/// Create an action driver.
1418
/// </summary>
1519
public ActionDriver()
1620
{
@@ -23,20 +27,77 @@ public ActionDriver()
2327
/// <returns>Unsubscriber.</returns>
2428
public IDisposable Subscribe(Action action)
2529
{
26-
return _actions.AddHandler(action);
30+
return AddHandler((e) => action());
2731
}
2832

2933
/// <summary>
30-
/// <see cref="IActionInvoker.Invoke"/>
34+
/// Subscribe a channel.
35+
/// </summary>
36+
/// <param name="action">Subscriber.</param>
37+
/// <returns>Unsubscriber.</returns>
38+
public IDisposable Subscribe(Action<FiberExecutionEventArgs> action)
39+
{
40+
return AddHandler(action);
41+
}
42+
43+
/// <summary>
44+
/// Add an action.
45+
/// </summary>
46+
/// <param name="action">An action.</param>
47+
/// <returns>Function for removing the action.</returns>
48+
private IDisposable AddHandler(Action<FiberExecutionEventArgs> action)
49+
{
50+
var maskableFilter = new ToggleFilter();
51+
Action<FiberExecutionEventArgs> safeAction = (e) =>
52+
{
53+
var enabled = maskableFilter.IsEnabled;
54+
if (enabled)
55+
{
56+
action(e);
57+
}
58+
};
59+
60+
lock (_lock)
61+
{
62+
_actions.AddLast(safeAction);
63+
}
64+
65+
var unsubscriber = new Unsubscriber(() =>
66+
{
67+
maskableFilter.IsEnabled = false;
68+
_actions.Remove(safeAction);
69+
});
70+
71+
return unsubscriber;
72+
}
73+
74+
/// <summary>
75+
/// Invoke all actions.
3176
/// </summary>
32-
public void Invoke()
77+
/// <param name="fiber"></param>
78+
public void Invoke(IFiber fiber)
3379
{
34-
_actions.Invoke();
80+
lock (_lock)
81+
{
82+
foreach (var action in _actions)
83+
{
84+
fiber.Enqueue(action);
85+
}
86+
}
3587
}
3688

3789
///<summary>
3890
/// Number of subscribers
3991
///</summary>
40-
public int NumSubscribers { get { return _actions.Count; } }
92+
public int NumSubscribers
93+
{
94+
get
95+
{
96+
lock (_lock)
97+
{
98+
return _actions.Count;
99+
}
100+
}
101+
}
41102
}
42103
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
4+
namespace AsyncFiberWorks.Procedures
5+
{
6+
/// <summary>
7+
/// ActionDriver extension.
8+
/// </summary>
9+
public static class ActionDriverExtensions
10+
{
11+
/// <summary>
12+
/// Subscribe a channel.
13+
/// </summary>
14+
/// <param name="driver"></param>
15+
/// <param name="task">Subscriber.</param>
16+
/// <returns>Unsubscriber.</returns>
17+
public static IDisposable SubscribeAndReceiveAsTask(this IActionDriverSubscriber driver, Func<Task> task)
18+
{
19+
return driver.Subscribe((e) =>
20+
{
21+
e.PauseWhileRunning(task);
22+
});
23+
}
24+
}
25+
}

src/AsyncFiberWorks/Procedures/ActionList.cs

Lines changed: 0 additions & 59 deletions
This file was deleted.

src/AsyncFiberWorks/Procedures/AsyncActionDriver.cs

Lines changed: 0 additions & 43 deletions
This file was deleted.

src/AsyncFiberWorks/Procedures/AsyncActionList.cs

Lines changed: 0 additions & 91 deletions
This file was deleted.

src/AsyncFiberWorks/Procedures/AsyncRegister.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ public class AsyncRegister : IDisposable
2222
/// Subscribe a driver.
2323
/// </summary>
2424
/// <param name="subscribable"></param>
25-
public AsyncRegister(IAsyncActionSubscriber subscribable)
25+
public AsyncRegister(IActionDriverSubscriber subscribable)
2626
{
27-
_subscription = subscribable.Subscribe(async () =>
27+
_subscription = subscribable.SubscribeAndReceiveAsTask(async () =>
2828
{
2929
await SetFlagAndWaitClearing().ConfigureAwait(false);
3030
});

0 commit comments

Comments
 (0)