Skip to content

Commit 4d088a0

Browse files
committed
Asynchronous pruning for RubyThreadPoolExecutor
1 parent c8f0bae commit 4d088a0

8 files changed

+162
-92
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
module Concurrent
2+
module Collection
3+
# @!visibility private
4+
# @!macro ruby_timeout_queue
5+
class RubyTimeoutQueue < ::Queue
6+
def initialize(*args)
7+
if RUBY_VERSION >= '3.2'
8+
raise "#{self.class.name} is not needed on Ruby 3.2 or later, use ::Queue instead"
9+
end
10+
11+
super(*args)
12+
13+
@mutex = Mutex.new
14+
@cond_var = ConditionVariable.new
15+
end
16+
17+
def push(obj)
18+
@mutex.synchronize do
19+
super(obj)
20+
@cond_var.signal
21+
end
22+
end
23+
alias_method :enq, :push
24+
alias_method :<<, :push
25+
26+
def pop(non_block = false, timeout: nil)
27+
if non_block && timeout
28+
raise ArgumentError, "can't set a timeout if non_block is enabled"
29+
end
30+
31+
if non_block
32+
super(true)
33+
elsif timeout
34+
@mutex.synchronize do
35+
deadline = Concurrent.monotonic_time + timeout
36+
while (now = Concurrent.monotonic_time) < deadline && empty?
37+
@cond_var.wait(@mutex, deadline - now)
38+
end
39+
begin
40+
return super(true)
41+
rescue ThreadError
42+
# still empty
43+
nil
44+
end
45+
end
46+
else
47+
super(false)
48+
end
49+
end
50+
alias_method :deq, :pop
51+
alias_method :shift, :pop
52+
end
53+
private_constant :RubyTimeoutQueue
54+
end
55+
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
module Concurrent
2+
module Collection
3+
# @!visibility private
4+
# @!macro internal_implementation_note
5+
TimeoutQueueImplementation = if RUBY_VERSION >= '3.2'
6+
::Queue
7+
else
8+
require 'concurrent/collection/ruby_timeout_queue'
9+
RubyTimeoutQueue
10+
end
11+
private_constant :TimeoutQueueImplementation
12+
13+
# @!visibility private
14+
# @!macro timeout_queue
15+
class TimeoutQueue < TimeoutQueueImplementation
16+
end
17+
end
18+
end

lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb

+2-4
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,8 @@ module Concurrent
8181
# What is being pruned is controlled by the min_threads and idletime
8282
# parameters passed at pool creation time
8383
#
84-
# This is a no-op on some pool implementation (e.g. the Java one). The Ruby
85-
# pool will auto-prune each time a new job is posted. You will need to call
86-
# this method explicitly in case your application post jobs in bursts (a
87-
# lot of jobs and then nothing for long periods)
84+
# This is a no-op on all pool implementations as they prune themselves
85+
# automatically, and has been deprecated.
8886

8987
# @!macro thread_pool_executor_public_api
9088
#

lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb

+2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ module Concurrent
88
# @!macro thread_pool_options
99
# @!visibility private
1010
class JavaThreadPoolExecutor < JavaExecutorService
11+
include Concern::Deprecation
1112

1213
# @!macro thread_pool_executor_constant_default_max_pool_size
1314
DEFAULT_MAX_POOL_SIZE = java.lang.Integer::MAX_VALUE # 2147483647
@@ -100,6 +101,7 @@ def running?
100101

101102
# @!macro thread_pool_executor_method_prune_pool
102103
def prune_pool
104+
deprecated "#prune_pool has no effect and will be removed in the next release."
103105
end
104106

105107
private

lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb

+54-31
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33
require 'concurrent/concern/logging'
44
require 'concurrent/executor/ruby_executor_service'
55
require 'concurrent/utility/monotonic_time'
6+
require 'concurrent/collection/timeout_queue'
67

78
module Concurrent
89

910
# @!macro thread_pool_executor
1011
# @!macro thread_pool_options
1112
# @!visibility private
1213
class RubyThreadPoolExecutor < RubyExecutorService
14+
include Concern::Deprecation
1315

1416
# @!macro thread_pool_executor_constant_default_max_pool_size
1517
DEFAULT_MAX_POOL_SIZE = 2_147_483_647 # java.lang.Integer::MAX_VALUE
@@ -94,9 +96,28 @@ def remaining_capacity
9496
end
9597
end
9698

99+
# removes the worker if it can be pruned
100+
#
101+
# @return [true, false] if the worker was pruned
102+
#
97103
# @!visibility private
98-
def remove_busy_worker(worker)
99-
synchronize { ns_remove_busy_worker worker }
104+
def prune_worker(worker)
105+
synchronize do
106+
if ns_prunable_capacity > 0
107+
remove_worker worker
108+
true
109+
else
110+
false
111+
end
112+
end
113+
end
114+
115+
# @!visibility private
116+
def remove_worker(worker)
117+
synchronize do
118+
ns_remove_ready_worker worker
119+
ns_remove_busy_worker worker
120+
end
100121
end
101122

102123
# @!visibility private
@@ -116,7 +137,7 @@ def worker_task_completed
116137

117138
# @!macro thread_pool_executor_method_prune_pool
118139
def prune_pool
119-
synchronize { ns_prune_pool }
140+
deprecated "#prune_pool has no effect and will be removed in next the release, see https://github.com/ruby-concurrency/concurrent-ruby/pull/1082."
120141
end
121142

122143
private
@@ -146,9 +167,6 @@ def ns_initialize(opts)
146167
@largest_length = 0
147168
@workers_counter = 0
148169
@ruby_pid = $$ # detects if Ruby has forked
149-
150-
@gc_interval = opts.fetch(:gc_interval, @idletime / 2.0).to_i # undocumented
151-
@next_gc_time = Concurrent.monotonic_time + @gc_interval
152170
end
153171

154172
# @!visibility private
@@ -162,12 +180,10 @@ def ns_execute(*args, &task)
162180

163181
if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task)
164182
@scheduled_task_count += 1
183+
nil
165184
else
166-
return fallback_action(*args, &task)
185+
fallback_action(*args, &task)
167186
end
168-
169-
ns_prune_pool if @next_gc_time < Concurrent.monotonic_time
170-
nil
171187
end
172188

173189
# @!visibility private
@@ -218,7 +234,7 @@ def ns_assign_worker(*args, &task)
218234
# @!visibility private
219235
def ns_enqueue(*args, &task)
220236
return false if @synchronous
221-
237+
222238
if !ns_limited_queue? || @queue.size < @max_queue
223239
@queue << [task, args]
224240
true
@@ -265,7 +281,7 @@ def ns_ready_worker(worker, last_message, success = true)
265281
end
266282
end
267283

268-
# removes a worker which is not in not tracked in @ready
284+
# removes a worker which is not tracked in @ready
269285
#
270286
# @!visibility private
271287
def ns_remove_busy_worker(worker)
@@ -274,25 +290,27 @@ def ns_remove_busy_worker(worker)
274290
true
275291
end
276292

277-
# try oldest worker if it is idle for enough time, it's returned back at the start
278-
#
279293
# @!visibility private
280-
def ns_prune_pool
281-
now = Concurrent.monotonic_time
282-
stopped_workers = 0
283-
while !@ready.empty? && (@pool.size - stopped_workers > @min_length)
284-
worker, last_message = @ready.first
285-
if now - last_message > self.idletime
286-
stopped_workers += 1
287-
@ready.shift
288-
worker << :stop
289-
else break
290-
end
294+
def ns_remove_ready_worker(worker)
295+
if index = @ready.index { |rw, _| rw == worker }
296+
@ready.delete_at(index)
291297
end
298+
true
299+
end
292300

293-
@next_gc_time = Concurrent.monotonic_time + @gc_interval
301+
# @return [Integer] number of excess idle workers which can be removed without
302+
# going below min_length, or all workers if not running
303+
#
304+
# @!visibility private
305+
def ns_prunable_capacity
306+
if running?
307+
[@pool.size - @min_length, @ready.size].min
308+
else
309+
@pool.size
310+
end
294311
end
295312

313+
# @!visibility private
296314
def ns_reset_if_forked
297315
if $$ != @ruby_pid
298316
@queue.clear
@@ -312,7 +330,7 @@ class Worker
312330

313331
def initialize(pool, id)
314332
# instance variables accessed only under pool's lock so no need to sync here again
315-
@queue = Queue.new
333+
@queue = Collection::TimeoutQueue.new
316334
@pool = pool
317335
@thread = create_worker @queue, pool, pool.idletime
318336

@@ -338,17 +356,22 @@ def kill
338356
def create_worker(queue, pool, idletime)
339357
Thread.new(queue, pool, idletime) do |my_queue, my_pool, my_idletime|
340358
catch(:stop) do
341-
loop do
359+
prunable = true
342360

343-
case message = my_queue.pop
361+
loop do
362+
timeout = prunable && my_pool.running? ? my_idletime : nil
363+
case message = my_queue.pop(timeout: timeout)
364+
when nil
365+
throw :stop if my_pool.prune_worker(self)
366+
prunable = false
344367
when :stop
345-
my_pool.remove_busy_worker(self)
368+
my_pool.remove_worker(self)
346369
throw :stop
347-
348370
else
349371
task, args = message
350372
run_task my_pool, task, args
351373
my_pool.ready_worker(self, Concurrent.monotonic_time)
374+
prunable = true
352375
end
353376
end
354377
end

spec/concurrent/executor/cached_thread_pool_spec.rb

+15-15
Original file line numberDiff line numberDiff line change
@@ -152,15 +152,13 @@ module Concurrent
152152

153153
context 'garbage collection' do
154154

155-
subject { described_class.new(idletime: 0.1, max_threads: 2, gc_interval: 0) }
155+
subject { described_class.new(idletime: 0.1, max_threads: 2) }
156156

157157
it 'removes from pool any thread that has been idle too long' do
158158
latch = Concurrent::CountDownLatch.new(4)
159159
4.times { subject.post { sleep 0.1; latch.count_down } }
160+
sleep 0.4
160161
expect(latch.wait(1)).to be true
161-
sleep 0.2
162-
subject.post {}
163-
sleep 0.2
164162
expect(subject.length).to be < 4
165163
end
166164

@@ -197,25 +195,27 @@ module Concurrent
197195
expect(subject.length).to be >= 5
198196
3.times { subject << proc { sleep(1) } }
199197
sleep(0.1)
200-
expect(subject.length).to be >= 5
198+
expect(subject.length).to be >= 3
201199
end
202200
end
203201
end
204202

205203
context 'stress' do
206204
configurations = [
207-
{ min_threads: 2,
208-
max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE,
209-
idletime: 0.1, # 1 minute
210-
max_queue: 0, # unlimited
205+
{
206+
min_threads: 2,
207+
max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE,
208+
idletime: 60, # 1 minute
209+
max_queue: 0, # unlimited
211210
fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue
212-
gc_interval: 0.1 },
213-
{ min_threads: 2,
214-
max_threads: 4,
215-
idletime: 0.1, # 1 minute
216-
max_queue: 0, # unlimited
211+
},
212+
{
213+
min_threads: 2,
214+
max_threads: 4,
215+
idletime: 60, # 1 minute
216+
max_queue: 0, # unlimited
217217
fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue
218-
gc_interval: 0.1 }
218+
}
219219
]
220220

221221
configurations.each do |config|

spec/concurrent/executor/java_thread_pool_executor_spec.rb

-7
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,6 @@ module Concurrent
2626

2727
it_should_behave_like :thread_pool_executor
2828

29-
context :prune do
30-
it "is a no-op, pruning is handled by the JVM" do
31-
executor = JavaThreadPoolExecutor.new
32-
executor.prune_pool
33-
end
34-
end
35-
3629
context '#overload_policy' do
3730

3831
specify ':abort maps to AbortPolicy' do

0 commit comments

Comments
 (0)