Skip to content

Commit 939cf74

Browse files
committed
Use FiberExecutionEventArgs with ISubscriber.
1 parent fa3c64d commit 939cf74

File tree

4 files changed

+15
-17
lines changed

4 files changed

+15
-17
lines changed

src/AsyncFiberWorks/Channels/Channel.cs

+3-8
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,10 @@ public IDisposable Subscribe(IExecutionContext executionContext, Action<T> recei
2929
/// <param name="executionContext">The execution context of the message receive handler.</param>
3030
/// <param name="receive">Subscriber.</param>
3131
/// <returns>Unsubscriber.</returns>
32-
public IDisposable Subscribe(IAsyncExecutionContext executionContext, Func<T, Task<Action>> receive)
32+
public IDisposable Subscribe(IAsyncExecutionContext executionContext, Action<FiberExecutionEventArgs, T> receive)
3333
{
34-
return this._channel.AddHandler((msg) =>
35-
{
36-
executionContext.Enqueue((e) =>
37-
{
38-
e.PauseWhileRunning(async () => await receive.Invoke(msg));
39-
});
40-
});
34+
return this._channel.AddHandler(
35+
(msg) => executionContext.Enqueue((e) => receive(e, msg)));
4136
}
4237

4338
/// <summary>

src/AsyncFiberWorks/Channels/ISubscriber.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,6 @@ public interface ISubscriber<T>
2424
/// <param name="executionContext">The execution context of the message receive handler.</param>
2525
/// <param name="receive">Subscriber.</param>
2626
/// <returns>Unsubscriber.</returns>
27-
IDisposable Subscribe(IAsyncExecutionContext executionContext, Func<T, Task<Action>> receive);
27+
IDisposable Subscribe(IAsyncExecutionContext executionContext, Action<FiberExecutionEventArgs, T> receive);
2828
}
2929
}

src/AsyncFiberWorksTests/ChannelTests.cs

+9-6
Original file line numberDiff line numberDiff line change
@@ -142,14 +142,17 @@ public void AsyncHandler()
142142

143143
foreach (var node in nodeList)
144144
{
145-
channel.Subscribe(node.Fiber, async (msg) =>
145+
channel.Subscribe(node.Fiber, (e, msg) =>
146146
{
147-
if (msg > 0)
147+
e.PauseWhileRunning(async () =>
148148
{
149-
await Task.Delay(msg).ConfigureAwait(false);
150-
}
151-
node.ReceivedMessages.Add(msg);
152-
return () => { };
149+
if (msg > 0)
150+
{
151+
await Task.Delay(msg).ConfigureAwait(false);
152+
}
153+
node.ReceivedMessages.Add(msg);
154+
return () => { };
155+
});
153156
});
154157
}
155158

src/AsyncFiberWorksTests/Examples/BasicExamples.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public void PubSubWithPool()
2323

2424
var reset = new AutoResetEvent(false);
2525
var subscriptionFiber = subscriptions.BeginSubscription();
26-
var subscriptionChannel = channel.Subscribe(fiber, delegate { reset.Set(); });
26+
var subscriptionChannel = channel.Subscribe(fiber, (msg) => reset.Set());
2727
subscriptionFiber.AppendDisposable(subscriptionChannel);
2828
channel.Publish("hello");
2929

@@ -41,7 +41,7 @@ public void PubSubWithDedicatedThread()
4141

4242
var reset = new AutoResetEvent(false);
4343
var subscriptionFiber = subscriptions.BeginSubscription();
44-
var subscriptionChannel = channel.Subscribe(fiber, delegate { reset.Set(); });
44+
var subscriptionChannel = channel.Subscribe(fiber, (msg) => reset.Set());
4545
subscriptionFiber.AppendDisposable(subscriptionChannel);
4646
channel.Publish("hello");
4747

0 commit comments

Comments
 (0)