Skip to content

Commit 49eeb81

Browse files
committed
Add AcknowledgeChannel.
1 parent abff1c2 commit 49eeb81

11 files changed

+432
-0
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
4+
namespace AsyncFiberWorks.Channels
5+
{
6+
/// <summary>
7+
/// This channel is a publishing interface controlled by an acknowledgment.
8+
/// </summary>
9+
/// <typeparam name="TMessage"></typeparam>
10+
/// <typeparam name="TAck"></typeparam>
11+
public class AcknowledgementChannel<TMessage, TAck> : IAcknowledgementChannel<TMessage, TAck>
12+
{
13+
private readonly AcknowledgementMessageHandlerList<TMessage, TAck> _channel = new AcknowledgementMessageHandlerList<TMessage, TAck>();
14+
15+
/// <summary>
16+
/// Subscribe a channel.
17+
/// </summary>
18+
/// <param name="messageReceiver">Subscriber.</param>
19+
/// <returns>Unsubscriber.</returns>
20+
public IDisposable Subscribe(IAcknowledgeMessageReceiver<TMessage, TAck> messageReceiver)
21+
{
22+
return this._channel.AddHandler(messageReceiver.OnReceive);
23+
}
24+
25+
/// <summary>
26+
/// Publish a message to subscribers.
27+
/// </summary>
28+
/// <param name="msg">The message to send.</param>
29+
/// <param name="control">Publishing controller.</param>
30+
/// <returns>Waiting for the publishing process to complete.</returns>
31+
public async Task Publish(TMessage msg, IAcknowledgementControl<TMessage, TAck> control)
32+
{
33+
await _channel.Publish(msg, control);
34+
}
35+
36+
///<summary>
37+
/// Number of subscribers
38+
///</summary>
39+
public int NumSubscribers { get { return _channel.Count; } }
40+
}
41+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading.Tasks;
5+
6+
namespace AsyncFiberWorks.Channels
7+
{
8+
/// <summary>
9+
/// List of message handlers with acknowledgement.
10+
/// </summary>
11+
/// <typeparam name="TMessage"></typeparam>
12+
/// <typeparam name="TAck"></typeparam>
13+
internal sealed class AcknowledgementMessageHandlerList<TMessage, TAck>
14+
{
15+
private object _lock = new object();
16+
private LinkedList<Func<TMessage, Task<TAck>>> _handlers = new LinkedList<Func<TMessage, Task<TAck>>>();
17+
18+
/// <summary>
19+
/// Add a message handler.
20+
/// </summary>
21+
/// <param name="action">A message handler.</param>
22+
/// <returns>Function for removing the handler.</returns>
23+
public IDisposable AddHandler(Func<TMessage, Task<TAck>> action)
24+
{
25+
_handlers.AddLast(action);
26+
27+
var unsubscriber = new Unsubscriber(() => {
28+
lock (_lock)
29+
{
30+
this._handlers.Remove(action);
31+
}
32+
});
33+
34+
return unsubscriber;
35+
}
36+
37+
/// <summary>
38+
/// Send a message to receive handlers.
39+
/// </summary>
40+
/// <param name="msg">The message to send.</param>
41+
/// <param name="control">Publishing controller.</param>
42+
/// <returns>A task that waits for IAcknowledgeControl.OnPublish to complete.</returns>
43+
public async Task Publish(TMessage msg, IAcknowledgementControl<TMessage, TAck> control)
44+
{
45+
Func<TMessage, Task<TAck>>[] copied;
46+
lock (_lock)
47+
{
48+
copied = _handlers.ToArray();
49+
}
50+
await control.OnPublish(msg, copied).ConfigureAwait(false);
51+
}
52+
53+
/// <summary>
54+
/// Number of handlers.
55+
/// </summary>
56+
public int Count
57+
{
58+
get
59+
{
60+
lock (_lock)
61+
{
62+
return _handlers.Count;
63+
}
64+
}
65+
}
66+
}
67+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
4+
namespace AsyncFiberWorks.Channels
5+
{
6+
/// <summary>
7+
/// Subscription for actions on a acknowledgement channel.
8+
/// </summary>
9+
/// <typeparam name="TMessage"></typeparam>
10+
public class DefaultAcknowledgementChannelSubscription<TMessage> : IAcknowledgeMessageReceiver<TMessage, bool>
11+
{
12+
private readonly Func<TMessage, Task<bool>> _receiver;
13+
private readonly IMessageFilter<TMessage> _filter;
14+
15+
/// <summary>
16+
/// Construct the subscription
17+
/// </summary>
18+
/// <param name="receiver"></param>
19+
/// <param name="filter"></param>
20+
public DefaultAcknowledgementChannelSubscription(Func<TMessage, Task<bool>> receiver, IMessageFilter<TMessage> filter = null)
21+
{
22+
_receiver = receiver;
23+
_filter = filter;
24+
}
25+
26+
/// <summary>
27+
/// Message receive handler with acknowledgement.
28+
/// </summary>
29+
/// <param name="msg">A received message.</param>
30+
/// <returns>True if accepted, false if ignored.</returns>
31+
public async Task<bool> OnReceive(TMessage msg)
32+
{
33+
if (_filter?.PassesProducerThreadFilter(msg) ?? true)
34+
{
35+
return await _receiver(msg);
36+
}
37+
return false;
38+
}
39+
}
40+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
4+
namespace AsyncFiberWorks.Channels
5+
{
6+
/// <summary>
7+
/// Call all handlers in the order in which they are registered.
8+
/// Wait for the calls to complete one at a time before proceeding.
9+
/// If ACK is false, it moves on to the next subscriber. If true, publish will be terminated at that point.
10+
/// </summary>
11+
/// <typeparam name="TMessage"></typeparam>
12+
public class DefaultAcknowledgementControl<TMessage> : IAcknowledgementControl<TMessage, bool>
13+
{
14+
/// <summary>
15+
/// Publish a message and accept an ACK.
16+
/// </summary>
17+
/// <param name="msg">The message to send.</param>
18+
/// <param name="handlers">A list of message recipients.</param>
19+
/// <returns>Wait for the publishing process to complete.</returns>
20+
public async Task OnPublish(TMessage msg, Func<TMessage, Task<bool>>[] handlers)
21+
{
22+
if (handlers != null)
23+
{
24+
foreach (var h in handlers)
25+
{
26+
bool ack = await h(msg).ConfigureAwait(false);
27+
if (ack)
28+
{
29+
break;
30+
}
31+
}
32+
}
33+
}
34+
}
35+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
using System.Threading.Tasks;
2+
3+
namespace AsyncFiberWorks.Channels
4+
{
5+
/// <summary>
6+
/// Message receive handler with acknowledgement.
7+
/// </summary>
8+
/// <typeparam name="TMessage"></typeparam>
9+
/// <typeparam name="TAck"></typeparam>
10+
public interface IAcknowledgeMessageReceiver<TMessage, TAck>
11+
{
12+
/// <summary>
13+
/// Message receive handler with acknowledgement.
14+
/// </summary>
15+
/// <param name="msg">A received message.</param>
16+
/// <returns>Tasks waiting for subscribers to be received.</returns>
17+
Task<TAck> OnReceive(TMessage msg);
18+
}
19+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using System;
2+
3+
namespace AsyncFiberWorks.Channels
4+
{
5+
/// <summary>
6+
/// This channel is a publishing interface controlled by an acknowledgment.
7+
/// </summary>
8+
/// <typeparam name="TMessage"></typeparam>
9+
/// <typeparam name="TAck"></typeparam>
10+
public interface IAcknowledgementChannel<TMessage, TAck> : IAcknowledgementSubscriber<TMessage, TAck>, IAcknowledgementPublisher<TMessage, TAck>
11+
{
12+
}
13+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
4+
namespace AsyncFiberWorks.Channels
5+
{
6+
/// <summary>
7+
/// Control message transmission.
8+
/// </summary>
9+
/// <typeparam name="TMessage"></typeparam>
10+
/// <typeparam name="TAck"></typeparam>
11+
public interface IAcknowledgementControl<TMessage, TAck>
12+
{
13+
/// <summary>
14+
/// Handle the response from the notification destination.
15+
/// </summary>
16+
/// <param name="msg">The message to send.</param>
17+
/// <param name="handlers">A list of message recipients.</param>
18+
/// <returns>Wait for the publishing process to complete.</returns>
19+
Task OnPublish(TMessage msg, Func<TMessage, Task<TAck>>[] handlers);
20+
}
21+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
using System.Threading.Tasks;
2+
3+
namespace AsyncFiberWorks.Channels
4+
{
5+
/// <summary>
6+
/// Sender interface to channels with acknowledgments.
7+
/// </summary>
8+
/// <typeparam name="TMessage"></typeparam>
9+
/// <typeparam name="TAck"></typeparam>
10+
public interface IAcknowledgementPublisher<TMessage, TAck>
11+
{
12+
/// <summary>
13+
/// Publish a message to subscribers.
14+
/// </summary>
15+
/// <param name="msg">The message to send.</param>
16+
/// <param name="control">Publishing controller.</param>
17+
/// <returns>Waiting for the publishing process to complete.</returns>
18+
Task Publish(TMessage msg, IAcknowledgementControl<TMessage, TAck> control);
19+
}
20+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
using System;
2+
3+
namespace AsyncFiberWorks.Channels
4+
{
5+
/// <summary>
6+
/// Acknowledgement channel subscription interface.
7+
/// </summary>
8+
/// <typeparam name="TMessage"></typeparam>
9+
/// <typeparam name="TAck"></typeparam>
10+
public interface IAcknowledgementSubscriber<TMessage, TAck>
11+
{
12+
/// <summary>
13+
/// Subscribe a channel.
14+
/// </summary>
15+
/// <param name="messageReceiver">Subscriber.</param>
16+
/// <returns>Unsubscriber.</returns>
17+
IDisposable Subscribe(IAcknowledgeMessageReceiver<TMessage, TAck> messageReceiver);
18+
}
19+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
using System;
2+
using System.Linq;
3+
using System.Threading.Tasks;
4+
5+
namespace AsyncFiberWorks.Channels
6+
{
7+
/// <summary>
8+
/// Call the handlers in the reverse order in which they were registered.
9+
/// Wait for the calls to complete one at a time before proceeding.
10+
/// If ACK is false, it moves on to the next subscriber. If true, publish will be terminated at that point.
11+
/// </summary>
12+
/// <typeparam name="TMessage"></typeparam>
13+
public class ReverseOrderAcknowledgementControl<TMessage> : IAcknowledgementControl<TMessage, bool>
14+
{
15+
/// <summary>
16+
/// Publish a message and accept an ACK.
17+
/// </summary>
18+
/// <param name="msg">The message to send.</param>
19+
/// <param name="handlers">A list of message recipients.</param>
20+
/// <returns>Wait for the publishing process to complete.</returns>
21+
public async Task OnPublish(TMessage msg, Func<TMessage, Task<bool>>[] handlers)
22+
{
23+
if (handlers != null)
24+
{
25+
foreach (var h in handlers.Reverse())
26+
{
27+
bool ack = await h(msg).ConfigureAwait(false);
28+
if (ack)
29+
{
30+
break;
31+
}
32+
}
33+
}
34+
}
35+
}
36+
}

0 commit comments

Comments
 (0)