Skip to content

Commit 649c6ea

Browse files
committed
Add IAsyncFiber.EnqueueAsync.
1 parent 6d708e6 commit 649c6ea

9 files changed

+270
-106
lines changed
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
4+
namespace AsyncFiberWorks.Core
5+
{
6+
/// <summary>
7+
/// Extensions to AsyncFiber.
8+
/// </summary>
9+
public static class AsyncFiberExtentions
10+
{
11+
/// <summary>
12+
/// Enqueue a task.
13+
/// </summary>
14+
/// <param name="fiber">An fiber.</param>
15+
/// <param name="func">A function that returns a task.</param>
16+
/// <returns>A task that waits until a given task is finished.</returns>
17+
public static async Task EnqueueAsync(this IAsyncFiber fiber, Func<Task> func)
18+
{
19+
var tcs = new TaskCompletionSource<byte>();
20+
fiber.Enqueue(async () =>
21+
{
22+
try
23+
{
24+
await func().ConfigureAwait(false);
25+
}
26+
finally
27+
{
28+
tcs.SetResult(0);
29+
}
30+
});
31+
await tcs.Task;
32+
}
33+
}
34+
}

src/AsyncFiberWorks/Core/AsyncNonReentrantExecutor.cs

Lines changed: 0 additions & 43 deletions
This file was deleted.
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
4+
namespace AsyncFiberWorks.Core
5+
{
6+
/// <summary>
7+
/// The same instance of this class will not be executed concurrently.
8+
/// The one executed later is skipped.
9+
/// </summary>
10+
public class AsyncNonReentrantFiberFilter : IAsyncFiber
11+
{
12+
private readonly object _lockObj = new object();
13+
private readonly IAsyncFiber _fiber;
14+
private bool _executing = false;
15+
16+
/// <summary>
17+
/// Create a filter.
18+
/// </summary>
19+
/// <param name="fiber"></param>
20+
public AsyncNonReentrantFiberFilter(IAsyncFiber fiber)
21+
{
22+
_fiber = fiber;
23+
}
24+
25+
/// <summary>
26+
/// Enqueue a task.
27+
/// </summary>
28+
/// <param name="func">A function that returns a task.</param>
29+
public void Enqueue(Func<Task> func)
30+
{
31+
lock (_lockObj)
32+
{
33+
if (_executing)
34+
{
35+
return;
36+
}
37+
_executing = true;
38+
}
39+
40+
_fiber.Enqueue(async () =>
41+
{
42+
try
43+
{
44+
await func().ConfigureAwait(false);
45+
}
46+
finally
47+
{
48+
lock (_lockObj)
49+
{
50+
_executing = false;
51+
}
52+
}
53+
});
54+
}
55+
}
56+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
using System;
2+
3+
namespace AsyncFiberWorks.Core
4+
{
5+
/// <summary>
6+
/// The same instance of this class will not be executed concurrently.
7+
/// The one executed later is skipped.
8+
/// </summary>
9+
public class NonReentrantFiberFilter : IExecutionContext
10+
{
11+
private readonly object _lockObj = new object();
12+
private readonly IExecutionContext _fiber;
13+
private bool _executing = false;
14+
15+
/// <summary>
16+
/// Create a filter.
17+
/// </summary>
18+
/// <param name="fiber"></param>
19+
public NonReentrantFiberFilter(IExecutionContext fiber)
20+
{
21+
_fiber = fiber;
22+
}
23+
24+
/// <summary>
25+
/// Enqueue an action.
26+
/// </summary>
27+
/// <param name="action">Action to be executed.</param>
28+
public void Enqueue(Action action)
29+
{
30+
lock (_lockObj)
31+
{
32+
if (_executing)
33+
{
34+
return;
35+
}
36+
_executing = true;
37+
}
38+
39+
_fiber.Enqueue(() =>
40+
{
41+
try
42+
{
43+
action();
44+
}
45+
finally
46+
{
47+
lock (_lockObj)
48+
{
49+
_executing = false;
50+
}
51+
}
52+
});
53+
}
54+
}
55+
}

src/AsyncFiberWorks/Procedures/ActionDriver.cs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using AsyncFiberWorks.Channels;
12
using AsyncFiberWorks.Core;
23
using System;
34

@@ -29,14 +30,23 @@ public IDisposable Subscribe(Action action)
2930
{
3031
if (_executor != null)
3132
{
32-
return _actions.AddHandler(() =>
33+
var maskableFilter = new MaskableExecutor();
34+
var disposable = _actions.AddHandler(() => maskableFilter.Execute(() => _executor.Execute(action)));
35+
return new Unsubscriber(() =>
3336
{
34-
_executor.Execute(action);
37+
maskableFilter.IsEnabled = false;
38+
disposable.Dispose();
3539
});
3640
}
3741
else
3842
{
39-
return _actions.AddHandler(action);
43+
var maskableFilter = new MaskableExecutor();
44+
var disposable = _actions.AddHandler(() => maskableFilter.Execute(action));
45+
return new Unsubscriber(() =>
46+
{
47+
maskableFilter.IsEnabled = false;
48+
disposable.Dispose();
49+
});
4050
}
4151
}
4252

src/AsyncFiberWorks/Procedures/AsyncActionDriver.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Threading.Tasks;
3+
using AsyncFiberWorks.Channels;
34
using AsyncFiberWorks.Core;
45

56
namespace AsyncFiberWorks.Procedures
@@ -43,7 +44,13 @@ public AsyncActionDriver()
4344
/// <returns>Unsubscriber.</returns>
4445
public IDisposable Subscribe(Func<Task> action)
4546
{
46-
return _actions.AddHandler(action);
47+
var maskableFilter = new AsyncMaskableExecutor();
48+
var disposable = _actions.AddHandler(() => maskableFilter.Execute(action));
49+
return new Unsubscriber(() =>
50+
{
51+
maskableFilter.IsEnabled = false;
52+
disposable.Dispose();
53+
});
4754
}
4855

4956
/// <summary>

src/AsyncFiberWorks/Procedures/AsyncActionInvokerRepeatInvoker.cs

Lines changed: 0 additions & 56 deletions
This file was deleted.

src/AsyncFiberWorksTests/AsyncFiberTests.cs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,11 @@ async Task AsyncFiberTest(IAsyncFiber fiber)
4444
counter += 100;
4545
await Task.Yield();
4646
});
47-
#pragma warning disable 1998
48-
fiber.Enqueue(async () =>
47+
fiber.Enqueue(() =>
4948
{
5049
tcs.SetResult(0);
50+
return Task.CompletedTask;
5151
});
52-
#pragma warning restore 1998
5352

5453
await tcs.Task;
5554
Assert.AreEqual(120, counter);
@@ -118,5 +117,29 @@ public async Task RepeatingTimer()
118117
Console.WriteLine($"as{sw.Elapsed}");
119118
Assert.AreEqual(3, counter);
120119
}
120+
121+
[Test]
122+
public async Task EnqueueAsyncTest()
123+
{
124+
var fiber = new ChainAsyncFiber();
125+
int counter = 0;
126+
var sw = Stopwatch.StartNew();
127+
var t1 = fiber.EnqueueAsync(async () =>
128+
{
129+
counter = 1;
130+
await Task.Delay(300).ConfigureAwait(false);
131+
});
132+
await Task.Delay(10).ConfigureAwait(false);
133+
fiber.Enqueue(() =>
134+
{
135+
counter = 2;
136+
return Task.CompletedTask;
137+
});
138+
Assert.AreEqual(1, counter);
139+
await t1;
140+
Assert.GreaterOrEqual(sw.Elapsed, TimeSpan.FromMilliseconds(300));
141+
await Task.Delay(10).ConfigureAwait(false);
142+
Assert.AreEqual(2, counter);
143+
}
121144
}
122145
}

0 commit comments

Comments
 (0)