Skip to content

Commit

Permalink
Remove QueueChannel.
Browse files Browse the repository at this point in the history
  • Loading branch information
tosh-coding committed Feb 28, 2024
1 parent 2ac2833 commit 4802ed1
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 119 deletions.
5 changes: 2 additions & 3 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,17 @@ ThreadFiber does not support pause. It is specifically intended for performance-
* _[ThreadPoolAdaptorFromQueueForThread](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorks/Core/ThreadPoolAdaptorFromQueueForThread.cs)_ - A thread pool that uses a single existing thread as a worker thread. Convenient to combine with the main thread.

## Channels ##
The channel function has not changed much from the original Retlang design concept. The following explanation is quoted from Retlang.
A channel is a messaging mechanism that abstracts the communication destination. Channel functionality has not changed much from the original Retlang design concept. The following explanation is quoted from Retlang.

> Message based concurrency in .NET
> \[...\]
> The library is intended for use in [message based concurrency](http://en.wikipedia.org/wiki/Message_passing) similar to [event based actors in Scala](http://lampwww.epfl.ch/~phaller/doc/haller07actorsunify.pdf). The library does not provide remote messaging capabilities. It is designed specifically for high performance in-memory messaging.
(Quote from [Retlang page](https://code.google.com/archive/p/retlang/). Broken links were replaced.)

### Four channels ###
### Channel classes ###
There are four channel types.

* _[Channel](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorks/Channels/Channel.cs)_ - Forward published messages to all subscribers. One-way. Used for 1:1 unicasting, 1:N broadcasting and N:1 message aggregation. [Example](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorksTests/Examples/BasicExamples.cs#L20).
* _[QueueChannel](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorks/Channels/QueueChannel.cs)_ - Forward a published message to only one of the subscribers. One-way. Used for 1:N/N:N load balancing. [Example](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorksTests/QueueChannelTests.cs#L22).
* _[RequestReplyChannel](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorks/Channels/RequestReplyChannel.cs)_ - Subscribers respond to requests from publishers. Two-way. Used for 1:1/N:1 request/reply messaging, 1:N/N:M bulk queries to multiple nodes. [Example](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorksTests/RequestReplyChannelTests.cs).
* _[SnapshotChannel](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorks/Channels/SnapshotChannel.cs)_ - Subscribers are also notified when they start subscribing, and separately thereafter. One-way. Used for replication with incremental update notifications. Only one responder can be handled within a single channel. [Example](https://github.com/tosh-coding/AsyncFiberWorks/blob/main/src/AsyncFiberWorksTests/Examples/BasicExamples.cs#L189).
15 changes: 0 additions & 15 deletions src/AsyncFiberWorks/Channels/IPublisherQueueChannel.cs

This file was deleted.

14 changes: 0 additions & 14 deletions src/AsyncFiberWorks/Channels/IQueueChannel.cs

This file was deleted.

20 changes: 0 additions & 20 deletions src/AsyncFiberWorks/Channels/ISubscriberQueueChannel.cs

This file was deleted.

55 changes: 0 additions & 55 deletions src/AsyncFiberWorks/Channels/QueueChannel.cs

This file was deleted.

30 changes: 25 additions & 5 deletions src/AsyncFiberWorks/Channels/QueueConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,44 @@

namespace AsyncFiberWorks.Channels
{
internal class QueueConsumer<T>
/// <summary>
/// Queue incoming messages once. It is then dequeued from the callback processing performed on the fiber.
/// </summary>
/// <typeparam name="T"></typeparam>
public class QueueConsumer<T> : IMessageReceiver<T>
{
private readonly object _lock = new object();
private bool _flushPending;
private readonly IExecutionContext _fiber;
private readonly Action<T> _callback;
private readonly IMessageQueue<T> _queue;

public QueueConsumer(IExecutionContext fiber, Action<T> callback, IMessageQueue<T> queue)
/// <summary>
/// Constructor.
/// </summary>
/// <param name="fiber"></param>
/// <param name="callback"></param>
/// <param name="queue"></param>
public QueueConsumer(IExecutionContext fiber, Action<T> callback, IMessageQueue<T> queue = null)
{
if (queue == null)
{
queue = new InternalQueue<T>();
}
_fiber = fiber;
_callback = callback;
_queue = queue;
}

public void Signal(byte dummy)
/// <summary>
/// Message receiving function.
/// </summary>
/// <param name="msg">A message.</param>
public void ReceiveOnProducerThread(T msg)
{
lock (this)
lock (_lock)
{
_queue.Enqueue(msg);
if (_flushPending)
{
return;
Expand All @@ -42,7 +62,7 @@ private void ConsumeNext()
}
finally
{
lock (this)
lock (_lock)
{
if (_queue.IsEmpty)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
namespace AsyncFiberWorksTests
{
[TestFixture]
public class QueueChannelTests
public class QueueConsumerTests
{
[Test]
public void SingleConsumer()
Expand All @@ -19,7 +19,7 @@ public void SingleConsumer()
var reset = new AutoResetEvent(false);
using (one)
{
var channel = new QueueChannel<int>();
var channel = new Channel<int>();
Action<int> onMsg = delegate
{
oneConsumed++;
Expand All @@ -29,7 +29,8 @@ public void SingleConsumer()
}
};
var subscriptionFiber = one.BeginSubscription();
var subscriptionChannel = channel.Subscribe(one, onMsg);
var consumer = new QueueConsumer<int>(one, onMsg);
var subscriptionChannel = channel.Subscribe(consumer.ReceiveOnProducerThread);
subscriptionFiber.AppendDisposable(subscriptionChannel);
for (var i = 0; i < 20; i++)
{
Expand All @@ -47,7 +48,7 @@ public void SingleConsumerWithException()
var reset = new AutoResetEvent(false);
using (one)
{
var channel = new QueueChannel<int>();
var channel = new Channel<int>();
Action<int> onMsg = delegate(int num)
{
if (num == 0)
Expand All @@ -57,7 +58,8 @@ public void SingleConsumerWithException()
reset.Set();
};
var subscriptionFiber = one.BeginSubscription();
var subscriptionChannel = channel.Subscribe(one, onMsg);
var consumer = new QueueConsumer<int>(one, onMsg);
var subscriptionChannel = channel.Subscribe(consumer.ReceiveOnProducerThread);
subscriptionFiber.AppendDisposable(subscriptionChannel);
channel.Publish(0);
channel.Publish(1);
Expand All @@ -72,7 +74,7 @@ public void Multiple()
var queues = new List<IFiber>();
var receiveCount = 0;
var reset = new AutoResetEvent(false);
var channel = new QueueChannel<int>();
var channel = new Channel<int>();

var messageCount = 100;
var updateLock = new object();
Expand All @@ -93,7 +95,9 @@ public void Multiple()
var fiber = new PoolFiber();
queues.Add(fiber);
var subscriptionFiber = fiber.BeginSubscription();
var subscriptionChannel = channel.Subscribe(fiber, onReceive);

var consumer = new QueueConsumer<int>(fiber, onReceive);
var subscriptionChannel = channel.Subscribe(consumer.ReceiveOnProducerThread);
subscriptionFiber.AppendDisposable(subscriptionChannel);
}
for (var i = 0; i < messageCount; i++)
Expand Down

0 comments on commit 4802ed1

Please sign in to comment.