Skip to content

Commit 12f3fbf

Browse files
committed
Remove timeout function from SnapshotRequest.
1 parent 53288c8 commit 12f3fbf

File tree

4 files changed

+14
-35
lines changed

4 files changed

+14
-35
lines changed

src/AsyncFiberWorks/Channels/IRequesterSnapshotChannel.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ public interface IRequesterSnapshotChannel<T>
1515
/// <param name="fiber">the target executor to receive the message</param>
1616
/// <param name="control"></param>
1717
/// <param name="receive"></param>
18-
/// <param name="timeoutInMs">For initial snapshot</param>
19-
IDisposable PrimedSubscribe(IExecutionContext fiber, Action<SnapshotRequestControlEvent> control, Action<T> receive, int timeoutInMs);
18+
IDisposable PrimedSubscribe(IExecutionContext fiber, Action<SnapshotRequestControlEvent> control, Action<T> receive);
2019
}
2120
}

src/AsyncFiberWorks/Channels/SnapshotChannel.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,9 @@ public class SnapshotChannel<T> : ISnapshotChannel<T>
1919
/// <param name="fiber">the target executor to receive the message</param>
2020
/// <param name="control"></param>
2121
/// <param name="receive"></param>
22-
/// <param name="timeoutInMs">For initial snapshot</param>
23-
public IDisposable PrimedSubscribe(IExecutionContext fiber, Action<SnapshotRequestControlEvent> control, Action<T> receive, int timeoutInMs)
22+
public IDisposable PrimedSubscribe(IExecutionContext fiber, Action<SnapshotRequestControlEvent> control, Action<T> receive)
2423
{
25-
var requester = new SnapshotRequest<T>(fiber, control, receive, timeoutInMs);
24+
var requester = new SnapshotRequest<T>(fiber, control, receive);
2625
requester.StartSubscribe(_requestChannel, _updatesChannel);
2726
return requester;
2827
}

src/AsyncFiberWorks/Channels/SnapshotRequest.cs

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ internal class SnapshotRequest<T> : IDisposable
1414
private readonly IExecutionContext _fiber;
1515
private readonly Action<SnapshotRequestControlEvent> _control;
1616
private readonly Action<T> _receive;
17-
private readonly int _timeoutInMs;
1817

1918
private IReply<T> _reply;
2019
private bool _disposed = false;
@@ -26,13 +25,11 @@ internal class SnapshotRequest<T> : IDisposable
2625
/// <param name="fiber">the target executor to receive the message</param>
2726
/// <param name="control"></param>
2827
/// <param name="receive"></param>
29-
/// <param name="timeoutInMs">For initial snapshot</param>
30-
public SnapshotRequest(IExecutionContext fiber, Action<SnapshotRequestControlEvent> control, Action<T> receive, int timeoutInMs)
28+
public SnapshotRequest(IExecutionContext fiber, Action<SnapshotRequestControlEvent> control, Action<T> receive)
3129
{
3230
_fiber = fiber;
3331
_control = control;
3432
_receive = receive;
35-
_timeoutInMs = timeoutInMs;
3633
}
3734

3835
internal void StartSubscribe(RequestReplyChannel<object, T> requestChannel, MessageHandlerList<T> _updatesChannel)
@@ -44,21 +41,7 @@ internal void StartSubscribe(RequestReplyChannel<object, T> requestChannel, Mess
4441
}
4542
_reply = reply;
4643

47-
var workFiber = new PoolFiber();
48-
var timeoutTimer = workFiber.Schedule(() =>
49-
{
50-
lock (_lock)
51-
{
52-
if (_reply == null)
53-
{
54-
return;
55-
}
56-
_reply.Dispose();
57-
_reply = null;
58-
}
59-
_fiber.Enqueue(() => _control(SnapshotRequestControlEvent.Timeout));
60-
}, _timeoutInMs);
61-
reply.SetCallbackOnReceive(() => workFiber.Enqueue(() =>
44+
reply.SetCallbackOnReceive(() => DefaultThreadPool.Instance.Queue((_) =>
6245
{
6346
T result;
6447
bool successToFirstReceive = reply.TryReceive(out result);
@@ -71,7 +54,6 @@ internal void StartSubscribe(RequestReplyChannel<object, T> requestChannel, Mess
7154
_reply.Dispose();
7255
_reply = null;
7356
}
74-
timeoutTimer.Dispose();
7557

7658
if (!successToFirstReceive)
7759
{
@@ -137,11 +119,6 @@ public enum SnapshotRequestControlEvent : byte
137119
/// </summary>
138120
None = 0,
139121

140-
/// <summary>
141-
/// A timeout occurred.
142-
/// </summary>
143-
Timeout = 1,
144-
145122
/// <summary>
146123
/// The state changed during a connection attempt.
147124
/// </summary>

src/AsyncFiberWorksTests/Examples/BasicExamples.cs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -223,12 +223,10 @@ public void Snapshot()
223223
var requesterThread = new ThreadPoolAdaptorFromQueueForThread();
224224
var fiberRequest = new PoolFiber(requesterThread, new DefaultExecutor());
225225
var receivedValues = new List<int>();
226+
var timeoutTimerCancellation = new Unsubscriber();
226227
Action<SnapshotRequestControlEvent> actionControl = (controlEvent) =>
227228
{
228-
if (controlEvent == SnapshotRequestControlEvent.Timeout)
229-
{
230-
Assert.Fail("SnapshotRequestControlEvent.Timeout");
231-
}
229+
timeoutTimerCancellation.Dispose();
232230
if (controlEvent == SnapshotRequestControlEvent.Connecting)
233231
{
234232
return;
@@ -271,7 +269,13 @@ public void Snapshot()
271269
receivedValues.Add(v);
272270
Console.WriteLine("Received: " + v);
273271
};
274-
var handleReceive = channel.PrimedSubscribe(fiberRequest, actionControl, actionReceive, 5000);
272+
var handleReceive = channel.PrimedSubscribe(fiberRequest, actionControl, actionReceive);
273+
var timeoutTimer = fiberRequest.Schedule(() =>
274+
{
275+
handleReceive.Dispose();
276+
Assert.Fail("SnapshotRequestControlEvent.Timeout");
277+
}, 5000);
278+
timeoutTimerCancellation.AppendDisposable(timeoutTimer);
275279

276280
requesterThread.Run();
277281
handleReceive.Dispose();

0 commit comments

Comments
 (0)