@@ -263,6 +263,7 @@ def initialize(request, timeout, scheduler)
263263 @timeout = timeout
264264 @timed_out = false
265265 @scheduler = scheduler
266+ @lock = Mutex . new
266267 super ( )
267268 end
268269
@@ -275,21 +276,64 @@ def time_out!
275276 end
276277 end
277278
279+ def maybe_start_timer
280+ # This is more complicated than one would expect. First, we want to start a timer
281+ # if a timeout is set. But there is a race condition where send_request creates
282+ # a fresh promise and adds it to @promises, but another thread handles a socket
283+ # closure event and fails all known promises. When a promise fails, we want to cancel
284+ # the timer, if set. So, we synchronize access to @timer to be sure we don't set up
285+ # and cancel the timer at the same time. However, if promise.fail runs first, there
286+ # will be no timer to cancel, and then when maybe_start_timer gets called in the other
287+ # thread, it'll create a timer on a promise that no one is going to action on going forward.
288+ # So, that leads to leaking the timer until it times out. To avoid this, we want to
289+ # check that the future of the promise isn't completed before starting the timer.
290+ # Finally, we cancel a potentially existing timer before scheduling a new one. That's
291+ # just defensive programming -- that scenario shouldn't be possible right now.
292+ if @timeout
293+ @lock . lock
294+ begin
295+ @scheduler . cancel_timer ( @timer ) if @timer
296+ unless @future . completed?
297+ @timer = @scheduler . schedule_timer ( @timeout )
298+ @timer . on_value do
299+ time_out!
300+ end
301+ end
302+ ensure
303+ @lock . unlock
304+ end
305+ end
306+ end
307+
278308 def fulfill ( response )
279309 super
280310
281- if @timer
282- @scheduler . cancel_timer ( @timer )
283- @timer = nil
311+ if @timeout
312+ @lock . lock
313+ begin
314+ if @timer
315+ @scheduler . cancel_timer ( @timer )
316+ @timer = nil
317+ end
318+ ensure
319+ @lock . unlock
320+ end
284321 end
285322 end
286323
287324 def fail ( cause )
288325 super
289326
290- if @timer
291- @scheduler . cancel_timer ( @timer )
292- @timer = nil
327+ if @timeout
328+ @lock . lock
329+ begin
330+ if @timer
331+ @scheduler . cancel_timer ( @timer )
332+ @timer = nil
333+ end
334+ ensure
335+ @lock . unlock
336+ end
293337 end
294338 end
295339 end
@@ -332,12 +376,7 @@ def write_request(id, request_promise)
332376 @connection . write do |buffer |
333377 @frame_encoder . encode ( buffer , request_promise . request , id )
334378 end
335- if request_promise . timeout
336- request_promise . timer = @scheduler . schedule_timer ( request_promise . timeout )
337- request_promise . timer . on_value do
338- request_promise . time_out!
339- end
340- end
379+ request_promise . maybe_start_timer
341380 end
342381
343382 def socket_closed ( cause )
0 commit comments