Skip to content

Commit abff1c2

Browse files
committed
Added some Channel tests.
1 parent 12f3fbf commit abff1c2

File tree

3 files changed

+180
-0
lines changed

3 files changed

+180
-0
lines changed

src/AsyncFiberWorks/Channels/Channel.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,16 @@ public class Channel<T> : IChannel<T>
1010
{
1111
private readonly MessageHandlerList<T> _channel = new MessageHandlerList<T>();
1212

13+
/// <summary>
14+
/// Subscribe a channel.
15+
/// </summary>
16+
/// <param name="receiveOnProducerThread">Subscriber.</param>
17+
/// <returns></returns>
18+
public IDisposable Subscribe(Action<T> receiveOnProducerThread)
19+
{
20+
return this._channel.AddHandler(receiveOnProducerThread);
21+
}
22+
1323
/// <summary>
1424
/// Subscribe a channel.
1525
/// </summary>

src/AsyncFiberWorks/Channels/ISubscriber.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,13 @@ namespace AsyncFiberWorks.Channels
88
/// <typeparam name="T"></typeparam>
99
public interface ISubscriber<T>
1010
{
11+
/// <summary>
12+
/// Subscribe a channel.
13+
/// </summary>
14+
/// <param name="receiveOnProducerThread">Subscriber.</param>
15+
/// <returns></returns>
16+
IDisposable Subscribe(Action<T> receiveOnProducerThread);
17+
1118
/// <summary>
1219
/// Subscribe a channel.
1320
/// </summary>
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading;
4+
using NUnit.Framework;
5+
using AsyncFiberWorks.Channels;
6+
using AsyncFiberWorks.Core;
7+
using AsyncFiberWorks.Fibers;
8+
using System.Linq;
9+
10+
namespace AsyncFiberWorksTests
11+
{
12+
[TestFixture]
13+
public class ChannelTests
14+
{
15+
[Test]
16+
public void BroadcastMessage()
17+
{
18+
var channel = new Channel<string>();
19+
20+
int multiCount = 5;
21+
var nodeList = CreateNodeList<string>(multiCount);
22+
23+
Action<Node<string>, string> receiveAction = (node, msg) =>
24+
{
25+
node.Fiber.Enqueue(() =>
26+
{
27+
node.ReceivedMessages.Add(msg);
28+
});
29+
};
30+
31+
foreach (var node in nodeList)
32+
{
33+
channel.Subscribe((msg) =>
34+
{
35+
receiveAction(node, msg);
36+
});
37+
}
38+
39+
channel.Publish("Hello");
40+
channel.Publish("World");
41+
42+
Thread.Sleep(1);
43+
44+
foreach (var node in nodeList)
45+
{
46+
Assert.AreEqual(node.ReceivedMessages.Count, 2);
47+
Assert.AreEqual("Hello", node.ReceivedMessages[0]);
48+
Assert.AreEqual("World", node.ReceivedMessages[1]);
49+
}
50+
}
51+
52+
Node<T>[] CreateNodeList<T>(int multiCount)
53+
{
54+
var nodeList = new Node<T>[multiCount];
55+
for (int i = 0; i < nodeList.Length; i++)
56+
{
57+
nodeList[i] = new Node<T>(i);
58+
}
59+
return nodeList;
60+
}
61+
62+
[Test]
63+
public void OneToMulti()
64+
{
65+
var channel = new Channel<MessageFrame>();
66+
67+
int multiCount = 5;
68+
var nodeList = CreateNodeList<MessageFrame>(multiCount);
69+
70+
foreach (var node in nodeList)
71+
{
72+
var filter = new MessageFilter<MessageFrame>();
73+
filter.AddFilterOnProducerThread((msg) =>
74+
{
75+
return msg.NodeId != node.NodeId;
76+
});
77+
var subscriber = new ChannelSubscription<MessageFrame>(filter, node.Fiber, (msg) =>
78+
{
79+
node.ReceivedMessages.Add(msg);
80+
});
81+
channel.Subscribe(subscriber);
82+
}
83+
84+
channel.Publish(new MessageFrame() { NodeId = 2, Message = "Hello" });
85+
channel.Publish(new MessageFrame() { NodeId = 2, Message = "World" });
86+
87+
Thread.Sleep(10);
88+
89+
foreach (var node in nodeList)
90+
{
91+
if (node.NodeId == 2)
92+
{
93+
Assert.AreEqual(0, node.ReceivedMessages.Count);
94+
}
95+
else
96+
{
97+
Assert.AreEqual(2, node.ReceivedMessages.Count);
98+
Assert.AreEqual("Hello", node.ReceivedMessages[0].Message);
99+
Assert.AreEqual("World", node.ReceivedMessages[1].Message);
100+
}
101+
}
102+
}
103+
104+
[Test]
105+
public void CallAndResponse()
106+
{
107+
var channelCall = new Channel<MessageFrame>();
108+
var channelResponse = new Channel<MessageFrame>();
109+
110+
int multiCount = 5;
111+
var nodeList = CreateNodeList<MessageFrame>(multiCount);
112+
113+
foreach (var node in nodeList)
114+
{
115+
channelCall.Subscribe(node.Fiber.CreateAction<MessageFrame>((msg) =>
116+
{
117+
channelResponse.Publish(new MessageFrame()
118+
{
119+
NodeId = node.NodeId,
120+
Message = msg.Message.Split(new char[] { ' ' }, 2)[1],
121+
});
122+
}));
123+
channelResponse.Subscribe(node.Fiber.CreateAction<MessageFrame>((msg) =>
124+
{
125+
node.ReceivedMessages.Add(msg);
126+
}));
127+
}
128+
129+
channelCall.Publish(new MessageFrame() { NodeId = 2, Message = "Say Ho" });
130+
channelCall.Publish(new MessageFrame() { NodeId = 2, Message = "Say Ho,Ho" });
131+
132+
Thread.Sleep(10);
133+
134+
foreach (var node in nodeList)
135+
{
136+
Assert.AreEqual(2 * multiCount, node.ReceivedMessages.Count);
137+
Assert.AreEqual(multiCount, node.ReceivedMessages.Count(x => x.Message == "Ho"));
138+
Assert.AreEqual(multiCount, node.ReceivedMessages.Count(x => x.Message == "Ho,Ho"));
139+
}
140+
}
141+
142+
143+
}
144+
145+
class Node<T>
146+
{
147+
public readonly int NodeId;
148+
public readonly PoolFiberSlim Fiber;
149+
public readonly List<T> ReceivedMessages = new List<T>();
150+
151+
public Node(int nodeId)
152+
{
153+
this.NodeId = nodeId;
154+
this.Fiber = new PoolFiberSlim();
155+
}
156+
}
157+
158+
class MessageFrame
159+
{
160+
public int NodeId;
161+
public string Message;
162+
}
163+
}

0 commit comments

Comments
 (0)