Skip to content

Commit

Permalink
Set CallbackOnReceive back to one shot.
Browse files Browse the repository at this point in the history
  • Loading branch information
tosh-coding committed Feb 25, 2024
1 parent d30cfcb commit 53288c8
Show file tree
Hide file tree
Showing 9 changed files with 295 additions and 195 deletions.
4 changes: 2 additions & 2 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,5 +129,5 @@ There are four channel types.

* _[Channel](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorks/Channels/Channel.cs)_ - Forward published messages to all subscribers. One-way. Used for 1:1 unicasting, 1:N broadcasting and N:1 message aggregation. [Example](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorksTests/Examples/BasicExamples.cs#L20).
* _[QueueChannel](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorks/Channels/QueueChannel.cs)_ - Forward a published message to only one of the subscribers. One-way. Used for 1:N/N:N load balancing. [Example](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorksTests/QueueChannelTests.cs#L22).
* _[RequestReplyChannel](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorks/Channels/RequestReplyChannel.cs)_ - Subscribers respond to requests from publishers. Two-way. Used for 1:1/N:1 request/reply messaging, 1:N/N:M bulk queries to multiple nodes. [Example](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorksTests/RequestReplyChannelTests.cs#L20).
* _[SnapshotChannel](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorks/Channels/SnapshotChannel.cs)_ - Subscribers are also notified when they start subscribing, and separately thereafter. One-way. Used for replication with incremental update notifications. Only one responder can be handled within a single channel. [Example](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorksTests/Examples/BasicExamples.cs#L181).
* _[RequestReplyChannel](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorks/Channels/RequestReplyChannel.cs)_ - Subscribers respond to requests from publishers. Two-way. Used for 1:1/N:1 request/reply messaging, 1:N/N:M bulk queries to multiple nodes. [Example](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorksTests/RequestReplyChannelTests.cs).
* _[SnapshotChannel](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorks/Channels/SnapshotChannel.cs)_ - Subscribers are also notified when they start subscribing, and separately thereafter. One-way. Used for replication with incremental update notifications. Only one responder can be handled within a single channel. [Example](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorksTests/Examples/BasicExamples.cs#L189).
30 changes: 30 additions & 0 deletions src/AsyncFiberWorks/Channels/ActionAndFiberExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using AsyncFiberWorks.Core;
using System;

namespace AsyncFiberWorks.Channels
{
/// <summary>
/// Extensions of actions and contexts.
/// </summary>
public static class ActionAndFiberExtensions
{
/// <summary>
/// Create an action to be executed on the specified context.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="fiber">Target fiber.</param>
/// <param name="action">Action.</param>
/// <returns>Action with enqueue.</returns>
public static Action<T> CreateAction<T>(this IExecutionContext fiber, Action<T> action)
{
if (fiber == null)
{
return action;
}
else
{
return (msg) => fiber.Enqueue(() => action(msg));
}
}
}
}
11 changes: 5 additions & 6 deletions src/AsyncFiberWorks/Channels/IReply.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,15 @@ public interface IReply<M> : IDisposable
/// <summary>
/// Receive a single response. Can be called repeatedly for multiple replies.
/// </summary>
/// <param name="result"></param>
/// <returns></returns>
/// <param name="result">A message.</param>
/// <returns>Returns false if the buffer is empty or the object has been disposed.</returns>
bool TryReceive(out M result);

/// <summary>
/// Set up on-receive callbacks.
/// Also called if timed out. In that case, TryReceive will fail.
/// Set up on-receive callbacks. It is a one-time call.
/// </summary>
/// <param name="callbackOnReceive"></param>
/// <returns></returns>
/// <param name="callbackOnReceive">Message receive handler.</param>
/// <returns>Returns false if it has already been disposed.</returns>
bool SetCallbackOnReceive(Action callbackOnReceive);
}
}
4 changes: 2 additions & 2 deletions src/AsyncFiberWorks/Channels/IRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ public interface IRequest<R, M>
/// <summary>
/// Send one or more responses.
/// </summary>
/// <param name="replyMsg"></param>
/// <returns></returns>
/// <param name="replyMsg">A message</param>
/// <returns>Returns false if it has already been disposed.</returns>
bool SendReply(M replyMsg);
}
}
28 changes: 17 additions & 11 deletions src/AsyncFiberWorks/Channels/RequestReplyChannelRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public TRequestMessage Request

public bool SendReply(TReplyMessage response)
{
Action action;
lock (_lock)
{
if (_disposed)
Expand All @@ -31,10 +32,11 @@ public bool SendReply(TReplyMessage response)
}
_resp.Enqueue(response);

var callbackOnReceive = _callbackOnReceive;
callbackOnReceive?.Invoke();
return true;
action = _callbackOnReceive;
_callbackOnReceive = null;
}
action?.Invoke();
return true;
}

public bool TryReceive(out TReplyMessage result)
Expand All @@ -53,24 +55,28 @@ public bool TryReceive(out TReplyMessage result)

public bool SetCallbackOnReceive(Action callbackOnReceive)
{
if (callbackOnReceive == null)
{
throw new ArgumentNullException(nameof(callbackOnReceive));
}
bool hasResponse = false;
lock (_lock)
{
if (_disposed)
{
return false;
}

_callbackOnReceive = callbackOnReceive;

if (_resp.Count > 0)
hasResponse = _resp.Count > 0;
if (hasResponse)
{
_callbackOnReceive = null;
}
else
{
callbackOnReceive();
_callbackOnReceive = callbackOnReceive;
}
}
if (hasResponse)
{
callbackOnReceive?.Invoke();
}
return true;
}

Expand Down
9 changes: 8 additions & 1 deletion src/AsyncFiberWorks/Core/ActionWithContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,19 @@
namespace AsyncFiberWorks.Core
{
/// <summary>
/// A pair of Action and Fiber.
/// An action with an execution context.
/// </summary>
/// <typeparam name="T"></typeparam>
public class ActionWithContext<T>
{
/// <summary>
/// Action.
/// </summary>
private readonly Action<T> _action;

/// <summary>
/// The context in which the action should be executed.
/// </summary>
private readonly IExecutionContext _fiber;

/// <summary>
Expand Down
41 changes: 41 additions & 0 deletions src/AsyncFiberWorks/Core/OneShotExecutor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System;
using System.Collections.Generic;

namespace AsyncFiberWorks.Core
{
/// <summary>
/// One shot executor. It is executed only once the first time.
/// </summary>
public class OneShotExecutor : IExecutor
{
private bool _fired = false;

/// <summary>
/// Execute only first one.
/// </summary>
/// <param name="toExecute"></param>
public void Execute(List<Action> toExecute)
{
if (_fired) return;
foreach (var action in toExecute)
{
_fired = true;
Execute(action);
break;
}
}

///<summary>
/// Executes a single action.
///</summary>
///<param name="toExecute"></param>
public void Execute(Action toExecute)
{
if (!_fired)
{
_fired = true;
toExecute();
}
}
}
}
8 changes: 4 additions & 4 deletions src/AsyncFiberWorksTests/Examples/BasicExamples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ public void RequestReply()
{
var channel = new RequestReplyChannel<string, string>();
var subscriptionFiber = fiber.BeginSubscription();
var actionWithContext = new ActionWithContext<IRequest<string, string>>(req => req.SendReply("bye"), fiber);
var subscriptionChannel = channel.AddResponder(actionWithContext.OnReceive);
var subscriptionChannel = channel.AddResponder(
fiber.CreateAction<IRequest<string, string>>(req => req.SendReply("bye")));
subscriptionFiber.AppendDisposable(subscriptionChannel);

var reply = channel.SendRequest("hello");
Expand Down Expand Up @@ -201,8 +201,8 @@ public void Snapshot()
}
};
var subscriptionFiber = fiberReply.BeginSubscription();
var actionWithContext = new ActionWithContext<IRequest<object, int>>(request => request.SendReply(reply()), fiberReply);
var subscriptionChannel = channel.ReplyToPrimingRequest(actionWithContext.OnReceive);
var subscriptionChannel = channel.ReplyToPrimingRequest(
fiberReply.CreateAction<IRequest<object, int>>(request => request.SendReply(reply())));
subscriptionFiber.AppendDisposable(subscriptionChannel);
Assert.AreEqual(1, channel.NumSubscribers);

Expand Down
Loading

0 comments on commit 53288c8

Please sign in to comment.