Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Less aggressive async pruning for RubyThreadPoolExecutor #1079

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module Concurrent
# @!macro thread_pool_executor_constant_default_max_queue_size
# Default maximum number of tasks that may be added to the task queue.

# @!macro thread_pool_executor_constant_default_thread_timeout
# @!macro thread_pool_executor_constant_default_thread_idle_timeout
# Default maximum number of seconds a thread in the pool may remain idle
# before being reclaimed.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class JavaThreadPoolExecutor < JavaExecutorService
# @!macro thread_pool_executor_constant_default_max_queue_size
DEFAULT_MAX_QUEUE_SIZE = 0

# @!macro thread_pool_executor_constant_default_thread_timeout
# @!macro thread_pool_executor_constant_default_thread_idle_timeout
DEFAULT_THREAD_IDLETIMEOUT = 60

# @!macro thread_pool_executor_constant_default_synchronous
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ class RubyThreadPoolExecutor < RubyExecutorService
# @!macro thread_pool_executor_constant_default_max_queue_size
DEFAULT_MAX_QUEUE_SIZE = 0

# @!macro thread_pool_executor_constant_default_thread_timeout
# @!macro thread_pool_executor_constant_default_thread_idle_timeout
DEFAULT_THREAD_IDLETIMEOUT = 60

# @!macro thread_pool_executor_constant_default_pool_prune_timeout
DEFAULT_POOL_PRUNETIMEOUT = 30
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A somewhat arbitrary number, but this is also what puma uses.


# @!macro thread_pool_executor_constant_default_synchronous
DEFAULT_SYNCHRONOUS = false

Expand Down Expand Up @@ -149,6 +152,8 @@ def ns_initialize(opts)

@gc_interval = opts.fetch(:gc_interval, @idletime / 2.0).to_i # undocumented
@next_gc_time = Concurrent.monotonic_time + @gc_interval

ns_set_pruner
end

# @!visibility private
Expand All @@ -166,7 +171,6 @@ def ns_execute(*args, &task)
return fallback_action(*args, &task)
end

ns_prune_pool if @next_gc_time < Concurrent.monotonic_time
nil
end

Expand All @@ -183,6 +187,8 @@ def ns_shutdown_execution
# no more tasks will be accepted, just stop all workers
@pool.each(&:stop)
end

ns_pruner&.kill
end

# @!visibility private
Expand Down Expand Up @@ -218,7 +224,7 @@ def ns_assign_worker(*args, &task)
# @!visibility private
def ns_enqueue(*args, &task)
return false if @synchronous

if !ns_limited_queue? || @queue.size < @max_queue
@queue << [task, args]
true
Expand Down Expand Up @@ -303,8 +309,24 @@ def ns_reset_if_forked
@largest_length = 0
@workers_counter = 0
@ruby_pid = $$

ns_set_pruner
end
end

def ns_pruner
return if @min_length == @max_length

return @pruner if @pruner && @pruner.alive?

@pruner = Thread.new do
until stopped_event&.set?
sleep DEFAULT_POOL_PRUNETIMEOUT
ns_prune_pool
end
end
end
alias_method :ns_set_pruner, :ns_pruner

# @!visibility private
class Worker
Expand Down
4 changes: 1 addition & 3 deletions spec/concurrent/executor/cached_thread_pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,7 @@ module Concurrent
latch = Concurrent::CountDownLatch.new(4)
4.times { subject.post { sleep 0.1; latch.count_down } }
expect(latch.wait(1)).to be true
sleep 0.2
subject.post {}
sleep 0.2
sleep 36
expect(subject.length).to be < 4
end

Expand Down
43 changes: 18 additions & 25 deletions spec/concurrent/executor/ruby_thread_pool_executor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,56 +56,49 @@ def wakeup_thread_group(group)
end

before(:each) do
@now = Concurrent.monotonic_time
allow(Concurrent).to receive(:monotonic_time) { @now }

@group1 = prepare_thread_group(5)
@group2 = prepare_thread_group(5)
end

def eventually(mutex: nil, timeout: 5, &block)
start = Time.now
while Time.now - start < timeout
begin
if mutex
mutex.synchronize do
return yield
end
else
start = Time.now
while Time.now - start < timeout
begin
if mutex
mutex.synchronize do
return yield
end
rescue Exception => last_failure
else
return yield
end
Thread.pass
rescue Exception => last_failure
end
raise last_failure
Thread.pass
end
raise last_failure
end

it "triggers pruning when posting work if the last prune happened more than gc_interval ago" do
it "triggers pruning if the thread idletimes have elapsed and the prunetime has elapsed" do
wakeup_thread_group(@group1)
@now += 6
sleep 36
wakeup_thread_group(@group2)
subject.post { }

eventually { expect(@group1.threads).to all(have_attributes(status: false)) }
eventually { expect(@group2.threads).to all(have_attributes(status: 'sleep')) }
end

it "does not trigger pruning when posting work if the last prune happened less than gc_interval ago" do
it "does not trigger pruning if the thread idletimes have elapsed but the prunetime has not elapsed" do
wakeup_thread_group(@group1)
@now += 3
subject.prune_pool
@now += 3
sleep 6
wakeup_thread_group(@group2)
subject.post { }

eventually { expect(@group1.threads).to all(have_attributes(status: false)) }
eventually { expect(@group1.threads).to all(have_attributes(status: 'sleep')) }
eventually { expect(@group2.threads).to all(have_attributes(status: 'sleep')) }
end

it "reclaims threads that have been idle for more than idletime seconds" do
wakeup_thread_group(@group1)
@now += 6
sleep 6
wakeup_thread_group(@group2)
subject.prune_pool

Expand All @@ -116,7 +109,7 @@ def eventually(mutex: nil, timeout: 5, &block)
it "keeps at least min_length workers" do
wakeup_thread_group(@group1)
wakeup_thread_group(@group2)
@now += 12
sleep 12
subject.prune_pool
all_threads = @group1.threads + @group2.threads
eventually do
Expand Down
Loading