Skip to content

Commit 6e5e003

Browse files
committed
Less aggressive async pruning for RubyThreadPoolExecutor
1 parent dbfbc14 commit 6e5e003

File tree

5 files changed

+46
-33
lines changed

5 files changed

+46
-33
lines changed

lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ module Concurrent
1212
# @!macro thread_pool_executor_constant_default_max_queue_size
1313
# Default maximum number of tasks that may be added to the task queue.
1414

15-
# @!macro thread_pool_executor_constant_default_thread_timeout
15+
# @!macro thread_pool_executor_constant_default_thread_idle_timeout
1616
# Default maximum number of seconds a thread in the pool may remain idle
1717
# before being reclaimed.
1818

lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class JavaThreadPoolExecutor < JavaExecutorService
1818
# @!macro thread_pool_executor_constant_default_max_queue_size
1919
DEFAULT_MAX_QUEUE_SIZE = 0
2020

21-
# @!macro thread_pool_executor_constant_default_thread_timeout
21+
# @!macro thread_pool_executor_constant_default_thread_idle_timeout
2222
DEFAULT_THREAD_IDLETIMEOUT = 60
2323

2424
# @!macro thread_pool_executor_constant_default_synchronous

lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb

+25-3
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@ class RubyThreadPoolExecutor < RubyExecutorService
2020
# @!macro thread_pool_executor_constant_default_max_queue_size
2121
DEFAULT_MAX_QUEUE_SIZE = 0
2222

23-
# @!macro thread_pool_executor_constant_default_thread_timeout
23+
# @!macro thread_pool_executor_constant_default_thread_idle_timeout
2424
DEFAULT_THREAD_IDLETIMEOUT = 60
2525

26+
# @!macro thread_pool_executor_constant_default_pool_prune_timeout
27+
DEFAULT_POOL_PRUNETIMEOUT = 30
28+
2629
# @!macro thread_pool_executor_constant_default_synchronous
2730
DEFAULT_SYNCHRONOUS = false
2831

@@ -149,6 +152,8 @@ def ns_initialize(opts)
149152

150153
@gc_interval = opts.fetch(:gc_interval, @idletime / 2.0).to_i # undocumented
151154
@next_gc_time = Concurrent.monotonic_time + @gc_interval
155+
156+
ns_set_pruner
152157
end
153158

154159
# @!visibility private
@@ -166,7 +171,6 @@ def ns_execute(*args, &task)
166171
return fallback_action(*args, &task)
167172
end
168173

169-
ns_prune_pool if @next_gc_time < Concurrent.monotonic_time
170174
nil
171175
end
172176

@@ -183,6 +187,8 @@ def ns_shutdown_execution
183187
# no more tasks will be accepted, just stop all workers
184188
@pool.each(&:stop)
185189
end
190+
191+
ns_pruner&.kill
186192
end
187193

188194
# @!visibility private
@@ -218,7 +224,7 @@ def ns_assign_worker(*args, &task)
218224
# @!visibility private
219225
def ns_enqueue(*args, &task)
220226
return false if @synchronous
221-
227+
222228
if !ns_limited_queue? || @queue.size < @max_queue
223229
@queue << [task, args]
224230
true
@@ -303,8 +309,24 @@ def ns_reset_if_forked
303309
@largest_length = 0
304310
@workers_counter = 0
305311
@ruby_pid = $$
312+
313+
ns_set_pruner
314+
end
315+
end
316+
317+
def ns_pruner
318+
return if @min_length == @max_length
319+
320+
return @pruner if @pruner && @pruner.alive?
321+
322+
@pruner = Thread.new do
323+
until stopped_event&.set?
324+
sleep DEFAULT_POOL_PRUNETIMEOUT
325+
ns_prune_pool
326+
end
306327
end
307328
end
329+
alias_method :ns_set_pruner, :ns_pruner
308330

309331
# @!visibility private
310332
class Worker

spec/concurrent/executor/cached_thread_pool_spec.rb

+1-3
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,7 @@ module Concurrent
158158
latch = Concurrent::CountDownLatch.new(4)
159159
4.times { subject.post { sleep 0.1; latch.count_down } }
160160
expect(latch.wait(1)).to be true
161-
sleep 0.2
162-
subject.post {}
163-
sleep 0.2
161+
sleep 36
164162
expect(subject.length).to be < 4
165163
end
166164

spec/concurrent/executor/ruby_thread_pool_executor_spec.rb

+18-25
Original file line numberDiff line numberDiff line change
@@ -56,56 +56,49 @@ def wakeup_thread_group(group)
5656
end
5757

5858
before(:each) do
59-
@now = Concurrent.monotonic_time
60-
allow(Concurrent).to receive(:monotonic_time) { @now }
61-
6259
@group1 = prepare_thread_group(5)
6360
@group2 = prepare_thread_group(5)
6461
end
6562

6663
def eventually(mutex: nil, timeout: 5, &block)
67-
start = Time.now
68-
while Time.now - start < timeout
69-
begin
70-
if mutex
71-
mutex.synchronize do
72-
return yield
73-
end
74-
else
64+
start = Time.now
65+
while Time.now - start < timeout
66+
begin
67+
if mutex
68+
mutex.synchronize do
7569
return yield
7670
end
77-
rescue Exception => last_failure
71+
else
72+
return yield
7873
end
79-
Thread.pass
74+
rescue Exception => last_failure
8075
end
81-
raise last_failure
76+
Thread.pass
77+
end
78+
raise last_failure
8279
end
8380

84-
it "triggers pruning when posting work if the last prune happened more than gc_interval ago" do
81+
it "triggers pruning if the thread idletimes have elapsed and the prunetime has elapsed" do
8582
wakeup_thread_group(@group1)
86-
@now += 6
83+
sleep 36
8784
wakeup_thread_group(@group2)
88-
subject.post { }
8985

9086
eventually { expect(@group1.threads).to all(have_attributes(status: false)) }
9187
eventually { expect(@group2.threads).to all(have_attributes(status: 'sleep')) }
9288
end
9389

94-
it "does not trigger pruning when posting work if the last prune happened less than gc_interval ago" do
90+
it "does not trigger pruning if the thread idletimes have elapsed but the prunetime has not elapsed" do
9591
wakeup_thread_group(@group1)
96-
@now += 3
97-
subject.prune_pool
98-
@now += 3
92+
sleep 6
9993
wakeup_thread_group(@group2)
100-
subject.post { }
10194

102-
eventually { expect(@group1.threads).to all(have_attributes(status: false)) }
95+
eventually { expect(@group1.threads).to all(have_attributes(status: 'sleep')) }
10396
eventually { expect(@group2.threads).to all(have_attributes(status: 'sleep')) }
10497
end
10598

10699
it "reclaims threads that have been idle for more than idletime seconds" do
107100
wakeup_thread_group(@group1)
108-
@now += 6
101+
sleep 6
109102
wakeup_thread_group(@group2)
110103
subject.prune_pool
111104

@@ -116,7 +109,7 @@ def eventually(mutex: nil, timeout: 5, &block)
116109
it "keeps at least min_length workers" do
117110
wakeup_thread_group(@group1)
118111
wakeup_thread_group(@group2)
119-
@now += 12
112+
sleep 12
120113
subject.prune_pool
121114
all_threads = @group1.threads + @group2.threads
122115
eventually do

0 commit comments

Comments
 (0)