Skip to content

Commit 904c94d

Browse files
authored
Expose the number of threads that are actively executing tasks with ThreadPoolExecutor#active_count. (#1002)
1 parent da2d27c commit 904c94d

File tree

4 files changed

+55
-8
lines changed

4 files changed

+55
-8
lines changed

lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ module Concurrent
3939
# The number of tasks that have been completed by the pool since construction.
4040
# @return [Integer] The number of tasks that have been completed by the pool since construction.
4141

42+
# @!macro thread_pool_executor_method_active_count
43+
# The number of threads that are actively executing tasks.
44+
# @return [Integer] The number of threads that are actively executing tasks.
45+
4246
# @!macro thread_pool_executor_attr_reader_idletime
4347
# The number of seconds that a thread may be idle before being reclaimed.
4448
# @return [Integer] The number of seconds that a thread may be idle before being reclaimed.

lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ def completed_task_count
7373
@executor.getCompletedTaskCount
7474
end
7575

76+
# @!macro thread_pool_executor_method_active_count
77+
def active_count
78+
@executor.getActiveCount
79+
end
80+
7681
# @!macro thread_pool_executor_attr_reader_idletime
7782
def idletime
7883
@executor.getKeepAliveTime(java.util.concurrent.TimeUnit::SECONDS)

lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,13 @@ def completed_task_count
6161
synchronize { @completed_task_count }
6262
end
6363

64+
# @!macro thread_pool_executor_method_active_count
65+
def active_count
66+
synchronize do
67+
@pool.length - @ready.length
68+
end
69+
end
70+
6471
# @!macro executor_service_method_can_overflow_question
6572
def can_overflow?
6673
synchronize { ns_limited_queue? }

spec/concurrent/executor/thread_pool_executor_shared.rb

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
require_relative 'thread_pool_shared'
22
require 'concurrent/atomic/atomic_fixnum'
3+
require 'concurrent/atomic/cyclic_barrier'
34

45
RSpec.shared_examples :thread_pool_executor do
56

@@ -258,6 +259,36 @@
258259
end
259260
end
260261

262+
context '#active_count' do
263+
subject do
264+
described_class.new(
265+
min_threads: 5,
266+
max_threads: 10,
267+
idletime: 60,
268+
max_queue: 0,
269+
fallback_policy: :discard
270+
)
271+
end
272+
273+
it 'returns the number of threads that are actively executing tasks.' do
274+
barrier = Concurrent::CyclicBarrier.new(4)
275+
latch = Concurrent::CountDownLatch.new(1)
276+
277+
3.times do
278+
subject.post do
279+
barrier.wait
280+
latch.wait
281+
end
282+
end
283+
barrier.wait
284+
285+
expect(subject.active_count).to eq 3
286+
287+
# release
288+
latch.count_down
289+
end
290+
end
291+
261292
context '#fallback_policy' do
262293

263294
let!(:min_threads){ 1 }
@@ -627,33 +658,33 @@
627658
max_threads: 1,
628659
max_queue: 1,
629660
fallback_policy: :caller_runs)
630-
661+
631662
worker_unblocker = Concurrent::CountDownLatch.new(1)
632663
executor_unblocker = Concurrent::CountDownLatch.new(1)
633664
queue_done = Concurrent::CountDownLatch.new(1)
634-
665+
635666
# Block the worker thread
636667
executor << proc { worker_unblocker.wait }
637-
668+
638669
# Fill the queue
639670
executor << proc { log.push :queued; queue_done.count_down }
640-
671+
641672
# Block in a caller_runs job
642673
caller_runs_thread = Thread.new {
643674
executor << proc { executor_unblocker.wait; log.push :unblocked }
644675
}
645-
676+
646677
# Wait until the caller_runs job is blocked
647678
Thread.pass until caller_runs_thread.status == 'sleep'
648-
679+
649680
# Now unblock the worker thread
650681
worker_unblocker.count_down
651682
queue_done.wait
652683
executor_unblocker.count_down
653-
684+
654685
# Tidy up
655686
caller_runs_thread.join
656-
687+
657688
# We will see the queued jobs run before the caller_runs job unblocks
658689
expect([log.pop, log.pop]).to eq [:queued, :unblocked]
659690
end

0 commit comments

Comments
 (0)