Skip to content

Commit fa3c64d

Browse files
committed
Changed fiber pause to use FiberExecutionEventArgs.
1 parent b05a290 commit fa3c64d

File tree

9 files changed

+174
-80
lines changed

9 files changed

+174
-80
lines changed

Readme.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ while (...)
163163
|:-|:-|
164164
| Threads | `Thread.Sleep()` |
165165
| Fiber on dedicated thread | `Thread.Sleep()` |
166-
| Fiber on shared threads | `fiber.Enqueue(Func<Task<Action>>) & await Task.Delay()` |
166+
| Fiber on shared threads | `fiber.Enqueue(Action<FiberExecutionEventArgs>) & FiberExecutionEventArgs.Pause()/Resume(Action)` |
167167
| Asynchronous control flow | `await Task.Deley()` |
168168

169169
## Fibers ##

src/AsyncFiberWorks/Channels/Channel.cs

+4-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@ public IDisposable Subscribe(IAsyncExecutionContext executionContext, Func<T, Ta
3333
{
3434
return this._channel.AddHandler((msg) =>
3535
{
36-
executionContext.Enqueue(() => receive.Invoke(msg));
36+
executionContext.Enqueue((e) =>
37+
{
38+
e.PauseWhileRunning(async () => await receive.Invoke(msg));
39+
});
3740
});
3841
}
3942

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
4+
namespace AsyncFiberWorks.Core
5+
{
6+
/// <summary>
7+
/// Fiber execution notification handler arguments.
8+
/// </summary>
9+
public class FiberExecutionEventArgs : EventArgs
10+
{
11+
private Action _pause;
12+
private Action<Action> _resume;
13+
14+
/// <summary>
15+
/// Constructor.
16+
/// </summary>
17+
/// <param name="pause"></param>
18+
/// <param name="resume"></param>
19+
public FiberExecutionEventArgs(Action pause, Action<Action> resume)
20+
{
21+
_pause = pause;
22+
_resume = resume;
23+
}
24+
25+
/// <summary>
26+
/// Pauses the consumption of the task queue.
27+
/// This is only called during an Execute in the fiber.
28+
/// </summary>
29+
public void Pause()
30+
{
31+
_pause();
32+
}
33+
34+
/// <summary>
35+
/// Resumes consumption of a paused task queue.
36+
/// </summary>
37+
/// <param name="action">The action to be taken immediately after the resume.</param>
38+
public void Resume(Action action)
39+
{
40+
_resume(action);
41+
}
42+
43+
/// <summary>
44+
/// Pause the fiber while the task is running.
45+
/// </summary>
46+
/// <param name="runningTask"></param>
47+
public void PauseWhileRunning(Func<Task<Action>> runningTask)
48+
{
49+
this.Pause();
50+
Task.Run(async () =>
51+
{
52+
Action resumingAction = default;
53+
try
54+
{
55+
resumingAction = await runningTask.Invoke();
56+
}
57+
finally
58+
{
59+
this.Resume(resumingAction);
60+
}
61+
});
62+
}
63+
}
64+
}

src/AsyncFiberWorks/Core/IAsyncExecutionContext.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ public interface IAsyncExecutionContext
1111
/// <summary>
1212
/// Enqueue a single action. It is executed sequentially.
1313
/// </summary>
14-
/// <param name="func">Task generator. This is done after a pause in the fiber. The generated task is monitored and takes action to resume after completion.</param>
15-
void Enqueue(Func<Task<Action>> func);
14+
/// <param name="action">Action to be executed.</param>
15+
void Enqueue(Action<FiberExecutionEventArgs> action);
1616
}
1717
}

src/AsyncFiberWorks/Fibers/PoolFiber.cs

+6-19
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ public sealed class PoolFiber : IFiber
1414
private readonly object _lock = new object();
1515
private readonly IThreadPool _pool;
1616
private readonly IExecutor _executor;
17+
private readonly FiberExecutionEventArgs _eventArgs;
1718

1819
private Queue<Action> _queue = new Queue<Action>();
1920
private Queue<Action> _toPass = new Queue<Action>();
@@ -32,6 +33,7 @@ public PoolFiber(IThreadPool pool, IExecutor executor)
3233
{
3334
_pool = pool;
3435
_executor = executor;
36+
_eventArgs = new FiberExecutionEventArgs(this.Pause, this.Resume);
3537
}
3638

3739
/// <summary>
@@ -203,27 +205,12 @@ private void Resume(Action action)
203205
}
204206

205207
/// <summary>
206-
/// Enqueue a single task.
208+
/// Enqueue a single action. It is executed sequentially.
207209
/// </summary>
208-
/// <param name="func">Task generator. This is done after a pause in the fiber. The generated task is monitored and takes action to resume after completion.</param>
209-
public void Enqueue(Func<Task<Action>> func)
210+
/// <param name="action">Action to be executed.</param>
211+
public void Enqueue(Action<FiberExecutionEventArgs> action)
210212
{
211-
this.Enqueue(() =>
212-
{
213-
this.Pause();
214-
Task.Run(async () =>
215-
{
216-
Action resumingAction = default;
217-
try
218-
{
219-
resumingAction = await func.Invoke();
220-
}
221-
finally
222-
{
223-
this.Resume(resumingAction);
224-
}
225-
});
226-
});
213+
this.Enqueue(() => action(_eventArgs));
227214
}
228215
}
229216
}

src/AsyncFiberWorks/Fibers/StubFiber.cs

+6-19
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public sealed class StubFiber : IFiber
1616
private readonly object _lock = new object();
1717
private readonly ConcurrentQueue<Action> _pending = new ConcurrentQueue<Action>();
1818
private readonly IExecutor _executor;
19+
private readonly FiberExecutionEventArgs _eventArgs;
1920

2021
private int _paused = 0;
2122
private Action _resumeAction = null;
@@ -35,6 +36,7 @@ public StubFiber()
3536
public StubFiber(IExecutor executor)
3637
{
3738
_executor = executor;
39+
_eventArgs = new FiberExecutionEventArgs(this.Pause, this.Resume);
3840
}
3941

4042
/// <summary>
@@ -172,27 +174,12 @@ private void ExecuteResumeProcess()
172174
}
173175

174176
/// <summary>
175-
/// Enqueue a single task.
177+
/// Enqueue a single action. It is executed sequentially.
176178
/// </summary>
177-
/// <param name="func">Task generator. This is done after a pause in the fiber. The generated task is monitored and takes action to resume after completion.</param>
178-
public void Enqueue(Func<Task<Action>> func)
179+
/// <param name="action">Action to be executed.</param>
180+
public void Enqueue(Action<FiberExecutionEventArgs> action)
179181
{
180-
this.Enqueue(() =>
181-
{
182-
this.Pause();
183-
Task.Run(async () =>
184-
{
185-
Action resumingAction = default;
186-
try
187-
{
188-
resumingAction = await func.Invoke();
189-
}
190-
finally
191-
{
192-
this.Resume(resumingAction);
193-
}
194-
});
195-
});
182+
this.Enqueue(() => action(_eventArgs));
196183
}
197184
}
198185
}

src/AsyncFiberWorks/Fibers/ThreadFiber.cs

+71-14
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,15 @@ namespace AsyncFiberWorks.Fibers
1111
/// </summary>
1212
public class ThreadFiber : IFiber, IDisposable
1313
{
14+
private readonly object _lock = new object();
1415
private readonly IDedicatedConsumerThreadWork _queue;
1516
private readonly UserWorkerThread _workerThread;
17+
private readonly FiberExecutionEventArgs _eventArgs;
1618
private bool _stopped = false;
19+
private bool _paused;
20+
private bool _resuming;
21+
private AutoResetEvent _autoReset = null;
22+
private Action _resumeAction = null;
1723

1824
/// <summary>
1925
/// Create a thread fiber with the default queue.
@@ -51,6 +57,7 @@ public ThreadFiber(string threadName)
5157
public ThreadFiber(IDedicatedConsumerThreadWork queue, string threadName, bool isBackground = true, ThreadPriority priority = ThreadPriority.Normal)
5258
{
5359
_queue = queue;
60+
_eventArgs = new FiberExecutionEventArgs(this.Pause, this.Resume);
5461
_workerThread = new UserWorkerThread(queue, threadName, isBackground, priority);
5562
_workerThread.Start();
5663
}
@@ -61,6 +68,7 @@ public ThreadFiber(IDedicatedConsumerThreadWork queue, string threadName, bool i
6168
/// <param name="consumerThread">A consumer thread.</param>
6269
public ThreadFiber(UserWorkerThread consumerThread)
6370
{
71+
_eventArgs = new FiberExecutionEventArgs(this.Pause, this.Resume);
6472
_workerThread = consumerThread;
6573
_workerThread.Start();
6674
}
@@ -85,6 +93,50 @@ public void Dispose()
8593
Stop();
8694
}
8795

96+
/// <summary>
97+
/// Pauses the consumption of the task queue.
98+
/// This is only called during an Execute in the fiber.
99+
/// </summary>
100+
/// <exception cref="InvalidOperationException">Pause was called twice.</exception>
101+
private void Pause()
102+
{
103+
lock (_lock)
104+
{
105+
if (_paused)
106+
{
107+
throw new InvalidOperationException("Pause was called twice.");
108+
}
109+
_paused = true;
110+
if (_autoReset == null)
111+
{
112+
_autoReset = new AutoResetEvent(false);
113+
}
114+
}
115+
}
116+
117+
/// <summary>
118+
/// Resumes consumption of a paused task queue.
119+
/// </summary>
120+
/// <param name="action">The action to be taken immediately after the resume.</param>
121+
/// <exception cref="InvalidOperationException">Resume was called in the unpaused state.</exception>
122+
private void Resume(Action action)
123+
{
124+
lock (_lock)
125+
{
126+
if (!_paused)
127+
{
128+
throw new InvalidOperationException("Resume was called in the unpaused state.");
129+
}
130+
if (_resuming)
131+
{
132+
throw new InvalidOperationException("Resume was called twice.");
133+
}
134+
_resuming = true;
135+
_resumeAction = action;
136+
_autoReset.Set();
137+
}
138+
}
139+
88140
/// <summary>
89141
/// Enqueue a single action.
90142
/// </summary>
@@ -95,31 +147,36 @@ public void Enqueue(Action action)
95147
}
96148

97149
/// <summary>
98-
/// Enqueue a single task.
150+
/// Enqueue a single action. It is executed sequentially.
99151
/// </summary>
100-
/// <param name="func">Task generator. This is done after a pause in the fiber. The generated task is monitored and takes action to resume after completion.</param>
101-
public void Enqueue(Func<Task<Action>> func)
152+
/// <param name="action">Action to be executed.</param>
153+
public void Enqueue(Action<FiberExecutionEventArgs> action)
102154
{
103155
this.Enqueue(() =>
104156
{
105-
var tcs = new TaskCompletionSource<Action>(TaskCreationOptions.RunContinuationsAsynchronously);
106-
Task.Run(async () =>
157+
action(_eventArgs);
158+
bool tmpPaused;
159+
lock (_lock)
107160
{
108-
Action resumingAction = default;
161+
tmpPaused = _paused;
162+
}
163+
if (tmpPaused)
164+
{
165+
_autoReset.WaitOne();
109166
try
110167
{
111-
resumingAction = await func.Invoke();
168+
_resumeAction();
112169
}
113170
finally
114171
{
115-
tcs.SetResult(resumingAction);
172+
lock (_lock)
173+
{
174+
_paused = false;
175+
_resuming = false;
176+
_resumeAction = null;
177+
}
116178
}
117-
});
118-
119-
// This is in a dedicated thread. Blocking OK.
120-
tcs.Task.Wait();
121-
var act = tcs.Task.Result;
122-
act?.Invoke();
179+
}
123180
});
124181
}
125182
}

0 commit comments

Comments
 (0)