Skip to content

Commit 0a3d223

Browse files
Ensure Call queue is closed when dispatch exits.
1 parent bd58818 commit 0a3d223

File tree

3 files changed

+40
-3
lines changed

3 files changed

+40
-3
lines changed

lib/async/container/supervisor/connection.rb

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,10 +147,13 @@ def forward(target_connection, operation)
147147
def self.dispatch(connection, target, id, message)
148148
Async do
149149
call = self.new(connection, id, message)
150+
# Track the call in the connection's calls hash:
150151
connection.calls[id] = call
151152

153+
# Dispatch the call to the target (synchronously):
152154
target.dispatch(call)
153155

156+
# Stream responses back to the connection (asynchronously):
154157
while response = call.pop
155158
connection.write(id: id, **response)
156159
end
@@ -160,6 +163,9 @@ def self.dispatch(connection, target, id, message)
160163

161164
# If the queue is closed, we don't need to send a finished message:
162165
unless call.closed?
166+
# Ensure the call is closed, to prevent messages being buffered:
167+
call.close
168+
163169
# If the above write failed, this is likely to fail too, and we can safely ignore it.
164170
connection.write(id: id, finished: true) rescue nil
165171
end
@@ -276,16 +282,19 @@ def call(...)
276282
#
277283
# @parameter target [Dispatchable] The target to dispatch calls to.
278284
def run(target)
285+
# Process incoming messages from the connection:
279286
self.each do |message|
287+
# If the message has an ID, it is a response to a call:
280288
if id = message.delete(:id)
289+
# Find the call in the connection's calls hash:
281290
if call = @calls[id]
282-
# Response to a call:
291+
# Enqueue the response for the call:
283292
call.push(**message)
284293
elsif message.key?(:do)
285-
# Incoming call:
294+
# Otherwise, if we couldn't find an existing call, it must be a new call:
286295
Call.dispatch(self, target, id, message)
287296
else
288-
# Likely a response to a timed-out call, ignore it:
297+
# Finally, if none of the above, it is likely a response to a timed-out call, so ignore it:
289298
Console.debug(self, "Ignoring message:", message)
290299
end
291300
else

releases.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Releases
22

3+
## Unreleased
4+
5+
- Close `Call` queue if asynchronous call fails during dispatch - further messages will fail with `ClosedQueueError`.
6+
37
## v0.9.0
48

59
- Better handling of write failures in `Connection::Call.dispatch`, ensuring we don't leak calls.

test/async/container/connection.rb

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,30 @@ def dispatch(call)
4040
expect(connection.calls).to be(:empty?)
4141
end
4242

43+
it "closes the queue when the connection fails" do
44+
stream.write(JSON.dump({id: 1, do: :test}) << "\n")
45+
stream.rewind
46+
47+
expect(stream).to receive(:write).and_raise(IOError, "Test error")
48+
49+
task = nil
50+
51+
target = TestTarget.new do |call|
52+
task = Async do
53+
while true
54+
call.push(status: "working")
55+
sleep(1) # Loop forever (until the queue is closed).
56+
end
57+
end
58+
end
59+
60+
connection.run(target)
61+
62+
expect(connection.calls).to be(:empty?)
63+
expect(task).to be(:failed?)
64+
expect{task.wait}.to raise_exception(ClosedQueueError)
65+
end
66+
4367
it "handles failed writes when making a call" do
4468
expect(stream).to receive(:write).and_raise(IOError, "Test error")
4569

0 commit comments

Comments
 (0)