
Commit e80a9dd

implement pthread_workqueue within libdispatch
Provide an implementation of the pthread_workqueue functionality (libpwq) as a fully integrated component of libdispatch. Integrating the workqueue implementation into the libdispatch code base simplifies evolving the APIs between the two layers and thus prepares for future optimization and enhancement.

The overall design augments libdispatch's existing pthread_root_queue option with periodic user-space monitoring of the status of a root queue's worker threads, oversubscribing the workqueue when the queue has available work but too few workers are observed to be runnable.

Initially, the integrated pthread_workqueue is enabled by default only on Linux. The current monitoring implementation relies on Linux-specific code that dynamically estimates the number of runnable worker threads by reading /proc. Porting the monitoring code to non-Linux platforms would entail providing similar functionality for those platforms.
1 parent f84d21d commit e80a9dd
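
The user-space estimate described in the commit message boils down to reading the state field of /proc/[pid]/stat for each registered worker thread and counting how many are in state 'R' (running/runnable). A minimal standalone sketch of that probe, independent of libdispatch — the helper name thread_is_runnable and the use of the calling process's own id are illustrative, not part of this commit:

// Sketch of the /proc-based runnability probe (Linux only).
#include <fcntl.h>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>

// Returns 1 if the task identified by `tid` is runnable ('R'),
// 0 if it is in any other state, and -1 on error.
static int thread_is_runnable(pid_t tid)
{
    char path[64];
    char buf[4096];

    snprintf(path, sizeof(path), "/proc/%d/stat", (int)tid);
    int fd = open(path, O_RDONLY);
    if (fd == -1) return -1;                  // task is gone or /proc unavailable
    ssize_t n = read(fd, buf, sizeof(buf) - 1);
    close(fd);
    if (n <= 0) return -1;
    buf[n] = '\0';

    // Field 3 of /proc/[pid]/stat is the single-character state; skip the
    // pid and (comm) fields, as the commit does with "%*d %*s %c".
    char state;
    if (sscanf(buf, "%*d %*s %c", &state) != 1) return -1;
    return state == 'R';
}

int main(void)
{
    // A thread busy querying /proc is, of course, runnable itself.
    printf("self runnable: %d\n", thread_is_runnable(getpid()));
    return 0;
}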

File tree

8 files changed: +464, -36 lines


configure.ac (+32 -12)
@@ -320,21 +320,41 @@ AS_IF([test -n "$apple_libpthread_source_path" -a -n "$apple_xnu_source_osfmk_pa
 AC_CHECK_HEADERS([pthread_machdep.h pthread/qos.h])
 
 # pthread_workqueues.
-# Look for own version first, then system version.
-AS_IF([test -f $srcdir/libpwq/configure.ac],
-  [AC_DEFINE(BUILD_OWN_PTHREAD_WORKQUEUES, 1, [Define if building pthread work queues from source])
-   ac_configure_args="--disable-libpwq-install $ac_configure_args"
-   AC_CONFIG_SUBDIRS([libpwq])
-   build_own_pthread_workqueues=true
+# Look for own version first, then for libpwq in our source tree, finally system version.
+AC_ARG_ENABLE([internal-libpwq],
+  [AS_HELP_STRING([--enable-internal-libpwq],
+    [Use libdispatch's own implementation of pthread workqueues.])],,
+  [case $target_os in
+     linux*)
+       enable_internal_libpwq=yes
+       ;;
+     *)
+       enable_internal_libpwq=no
+   esac]
+)
+AS_IF([test "x$enable_internal_libpwq" = "xyes"],
+  [AC_DEFINE(HAVE_INTERNAL_PTHREAD_WORKQUEUE, 1, [Use libdispatch's own implementation of pthread_workqueue API])
   AC_DEFINE(HAVE_PTHREAD_WORKQUEUES, 1, [Define if pthread work queues are present])
-   have_pthread_workqueues=true],
-  [build_own_pthread_workqueues=false
-   AC_CHECK_HEADERS([pthread/workqueue_private.h pthread_workqueue.h],
-     [AC_DEFINE(HAVE_PTHREAD_WORKQUEUES, 1, [Define if pthread work queues are present])
+   have_internal_pthread_workqueues=true
+   have_pthread_workqueues=true
+   build_own_pthread_workqueues=false],
+  [have_internal_pthread_workqueues=false
+   AS_IF([test -f $srcdir/libpwq/configure.ac],
+     [AC_DEFINE(BUILD_OWN_PTHREAD_WORKQUEUES, 1, [Define if building pthread work queues from source])
+      ac_configure_args="--disable-libpwq-install $ac_configure_args"
+      AC_CONFIG_SUBDIRS([libpwq])
+      build_own_pthread_workqueues=true
+      AC_DEFINE(HAVE_PTHREAD_WORKQUEUES, 1, [Define if pthread work queues are present])
      have_pthread_workqueues=true],
-     [have_pthread_workqueues=false]
-   )]
+     [build_own_pthread_workqueues=false
+      AC_CHECK_HEADERS([pthread/workqueue_private.h pthread_workqueue.h],
+        [AC_DEFINE(HAVE_PTHREAD_WORKQUEUES, 1, [Define if pthread work queues are present])
+         have_pthread_workqueues=true],
+        [have_pthread_workqueues=false]
+      )]
+   )]
 )
+AM_CONDITIONAL(HAVE_INTERNAL_PTHREAD_WORKQUEUES, $have_internal_pthread_workqueues)
 AM_CONDITIONAL(BUILD_OWN_PTHREAD_WORKQUEUES, $build_own_pthread_workqueues)
 AM_CONDITIONAL(HAVE_PTHREAD_WORKQUEUES, $have_pthread_workqueues)

src/Makefile.am (+10 -1)
@@ -9,6 +9,12 @@ else
 lib_LTLIBRARIES=libdispatch.la
 endif
 
+if HAVE_INTERNAL_PTHREAD_WORKQUEUES
+INTERNAL_WORKQUEUE_SOURCES= \
+	event/workqueue.c \
+	event/workqueue_internal.h
+endif
+
 libdispatch_la_SOURCES= \
 	allocator.c \
 	apply.c \
@@ -60,7 +66,8 @@ libdispatch_la_SOURCES= \
 	shims/perfmon.h \
 	shims/time.h \
 	shims/tsd.h \
-	shims/yield.h
+	shims/yield.h \
+	$(INTERNAL_WORKQUEUE_SOURCES)
 
 EXTRA_libdispatch_la_SOURCES=
 EXTRA_libdispatch_la_DEPENDENCIES=
@@ -77,6 +84,7 @@ AM_OBJCFLAGS=$(DISPATCH_CFLAGS) $(CBLOCKS_FLAGS)
 AM_CXXFLAGS=$(PTHREAD_WORKQUEUE_CFLAGS) $(DISPATCH_CFLAGS) $(CXXBLOCKS_FLAGS)
 AM_OBJCXXFLAGS=$(DISPATCH_CFLAGS) $(CXXBLOCKS_FLAGS)
 
+if !HAVE_INTERNAL_PTHREAD_WORKQUEUES
 if BUILD_OWN_PTHREAD_WORKQUEUES
 PTHREAD_WORKQUEUE_LIBS=$(top_builddir)/libpwq/libpthread_workqueue.la
 PTHREAD_WORKQUEUE_CFLAGS=-I$(top_srcdir)/libpwq/include
@@ -85,6 +93,7 @@ if HAVE_PTHREAD_WORKQUEUES
 PTHREAD_WORKQUEUE_LIBS=-lpthread_workqueue
 endif
 endif
+endif
 
 if BUILD_OWN_BLOCKS_RUNTIME
 libdispatch_la_SOURCES+= BlocksRuntime/data.c BlocksRuntime/runtime.c

src/event/workqueue.c (+287, new file)
@@ -0,0 +1,287 @@
/*
 * Copyright (c) 2017-2017 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"

#if HAVE_INTERNAL_PTHREAD_WORKQUEUE

/*
 * dispatch_workq monitors the thread pool that is
 * executing the work enqueued on libdispatch's pthread
 * root queues and dynamically adjusts its size.
 *
 * The dynamic monitoring can be implemented using either
 *   (a) low-frequency user-level approximation of the number of runnable
 *       worker threads via reading the /proc file system
 *   (b) a Linux kernel extension that hooks the process change handler
 *       to accurately track the number of runnable normal worker threads
 * Using either form of monitoring, if (i) there appears to be
 * work available in the monitored pthread root queue, (ii) the
 * number of runnable workers is below the target size for the pool,
 * and (iii) the total number of worker threads is below an upper limit,
 * then an additional worker thread will be added to the pool.
 */

#pragma mark static data for management subsystem

#if DISPATCH_ENABLE_PWQ_KEXT
/* Are we using user-level or kext based management? */
static bool _dispatch_workq_kext_active;
#endif

/*
 * State for the user-level monitoring of a workqueue.
 */
typedef struct dispatch_workq_manager_s {
    /* The observed number of runnable registered workers */
    int32_t runnable_workers;

    /* The desired number of runnable registered workers */
    int32_t target_runnable_workers;

    /*
     * Tracking of registered workers; all accesses must hold lock.
     * Invariant: registered_workers[0]...registered_workers[num_registered_workers-1]
     * contain the pids of the workers that we are managing.
     */
    dispatch_unfair_lock_s registered_worker_lock;
    pid_t *registered_workers;
    int num_registered_workers;
} dispatch_workq_manager_s, *dispatch_workq_manager_t;

static dispatch_workq_manager_s _dispatch_workq_manager;

#pragma mark inline helper functions

DISPATCH_INLINE
dispatch_workq_manager_t
_dispatch_workq_get_default_manager(void)
{
    return &_dispatch_workq_manager;
}

DISPATCH_INLINE
dispatch_queue_t
_dispatch_workq_get_default_root_queue(void)
{
    return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS];
}

DISPATCH_INLINE
bool
_dispatch_workq_root_queue_has_work(dispatch_queue_t dq)
{
    return dq->dq_items_tail != NULL;
}

#pragma mark Implementation of the management subsystem.

#define WORKQ_MAX_TRACKED_WORKERS DISPATCH_WORKQ_MAX_PTHREAD_COUNT
#define WORKQ_OVERSUBSCRIBE_FACTOR 2

static void _dispatch_workq_init_once(void *context DISPATCH_UNUSED);
static dispatch_once_t _dispatch_workq_init_once_pred;

bool
dispatch_workq_worker_register(dispatch_queue_t root_q)
{
    if (root_q != _dispatch_workq_get_default_root_queue()) {
        // Not tracked.
        return false;
    }

    dispatch_once_f(&_dispatch_workq_init_once_pred, NULL, &_dispatch_workq_init_once);

#if DISPATCH_ENABLE_PWQ_KEXT
    if (_dispatch_workq_kext_active) {
        _dispatch_workq_worker_register_kext();
        return true;
    }
#endif
    dispatch_workq_manager_t mgr = _dispatch_workq_get_default_manager();
    bool rc;
    int tid = syscall(SYS_gettid);
    _dispatch_unfair_lock_lock(&mgr->registered_worker_lock);
    if (mgr->num_registered_workers < WORKQ_MAX_TRACKED_WORKERS - 1) {
        int worker_id = mgr->num_registered_workers++;
        mgr->registered_workers[worker_id] = tid;
        rc = true;
    } else {
        rc = false;
    }
    _dispatch_unfair_lock_unlock(&mgr->registered_worker_lock);

    return rc;
}

void
dispatch_workq_worker_unregister(dispatch_queue_t root_q)
{
    if (root_q != _dispatch_workq_get_default_root_queue()) {
        // Not tracked.
        return;
    }

#if DISPATCH_ENABLE_PWQ_KEXT
    if (_dispatch_workq_kext_active) {
        _dispatch_workq_worker_unregister_kext();
        return;
    }
#endif
    dispatch_workq_manager_t mgr = _dispatch_workq_get_default_manager();
    int tid = syscall(SYS_gettid);
    _dispatch_unfair_lock_lock(&mgr->registered_worker_lock);
    for (int i = 0; i < mgr->num_registered_workers; i++) {
        if (mgr->registered_workers[i] == tid) {
            int last = mgr->num_registered_workers - 1;
            mgr->registered_workers[i] = mgr->registered_workers[last];
            mgr->registered_workers[last] = 0;
            mgr->num_registered_workers--;
            break;
        }
    }
    _dispatch_unfair_lock_unlock(&mgr->registered_worker_lock);
}


/*
 * For each pid that is a registered worker, read /proc/[pid]/stat
 * to get a count of the number of them that are actually runnable.
 * See the proc(5) man page for the format of the contents of /proc/[pid]/stat.
 */
static void
_dispatch_workq_count_runnable_workers(dispatch_workq_manager_t mgr)
{
    char path[128];
    char buf[4096];
    int running_count = 0;

    memset(buf, 0, sizeof(buf));

    _dispatch_unfair_lock_lock(&mgr->registered_worker_lock);

    for (int i = 0; i < mgr->num_registered_workers; i++) {
        pid_t worker_pid = mgr->registered_workers[i];
        int fd;
        ssize_t bytes_read = -1;

        int r = snprintf(path, sizeof(path), "/proc/%d/stat", worker_pid);
        (void)dispatch_assume(r > 0 && r < sizeof(path));

        fd = open(path, O_RDONLY | O_NONBLOCK);
        if (unlikely(fd == -1)) {
            // Unable to open the file.
            // Must mean the worker exited uncleanly (without executing
            // dispatch_workq_worker_unregister()).
            // Clean up by removing the pid and decrementing the number of
            // registered workers.
            _dispatch_debug("workq: Unable to open /proc/%d/stat; removing worker from monitoring list", worker_pid);
            int last = mgr->num_registered_workers - 1;
            mgr->registered_workers[i] = mgr->registered_workers[last];
            mgr->registered_workers[last] = 0;
            mgr->num_registered_workers--;
        } else {
            bytes_read = read(fd, buf, sizeof(buf));
            (void)close(fd);
        }

        if (bytes_read > 0) {
            char state;
            if (sscanf(buf, "%*d %*s %c", &state) == 1) {
                // _dispatch_debug("workq: The state of worker %d is %c\n", worker_pid, state);
                if (state == 'R') {
                    running_count++;
                }
            } else {
                _dispatch_debug("workq: Failed to scan state for worker %d", worker_pid);
            }
            memset(buf, 0, bytes_read);
        } else {
            _dispatch_debug("workq: Failed to read %s", path);
        }
    }

    mgr->runnable_workers = running_count;

    _dispatch_unfair_lock_unlock(&mgr->registered_worker_lock);
}

static void
_dispatch_workq_monitor_thread_pool(void *context DISPATCH_UNUSED)
{
    dispatch_workq_manager_t mgr = _dispatch_workq_get_default_manager();
    dispatch_queue_t dq = _dispatch_workq_get_default_root_queue();
    bool work_available = _dispatch_workq_root_queue_has_work(dq);

    if (!work_available) {
        _dispatch_debug("workq: %s is empty; doing nothing", dq->dq_label);
        return;
    }

    _dispatch_workq_count_runnable_workers(mgr);
    int32_t count = _dispatch_pthread_root_queue_thread_pool_size(dq);

    _dispatch_debug("workq: %s is non-empty with pool count %d (%d runnable)",
            dq->dq_label, count, mgr->runnable_workers);

    if (mgr->runnable_workers < mgr->target_runnable_workers) {
        // If we are below target there are two cases to consider:
        //   (a) We are below target, but some workers are still runnable.
        //       We want to oversubscribe to hit the target, but this
        //       may be transitory so only go up to a small multiple
        //       of threads per core.
        //   (b) We are below target, and no worker is runnable.
        //       It is likely the program is stalled. Therefore treat
        //       this as if dq were an overcommit queue and create
        //       another worker unless we have already hit the hard
        //       limit on the maximum number of workers for dq.
        int32_t oversubscribe_limit = WORKQ_OVERSUBSCRIBE_FACTOR * mgr->target_runnable_workers;
        int32_t stalled_limit = WORKQ_MAX_TRACKED_WORKERS - mgr->target_runnable_workers;
        int32_t limit = mgr->runnable_workers == 0 ? stalled_limit : MIN(stalled_limit, oversubscribe_limit);
        if (count + limit > 0) {
            _dispatch_debug("workq: adding 1 additional worker to %s", dq->dq_label);
            _dispatch_pthread_root_queue_oversubscribe(dq, 1);
        } else {
            _dispatch_debug("workq: %s already oversubscribed by %d; taking no action",
                    dq->dq_label, -count);
        }
    }
}

static void
_dispatch_workq_init_once(void *context DISPATCH_UNUSED)
{
#if DISPATCH_ENABLE_PWQ_KEXT
    // Initialize kext subsystem.
    _dispatch_workq_kext_active = _dispatch_workq_management_init_kext();

    if (_dispatch_workq_kext_active) {
        return;
    }
#endif
    dispatch_workq_manager_t mgr = _dispatch_workq_get_default_manager();
    mgr->registered_workers = _dispatch_calloc(WORKQ_MAX_TRACKED_WORKERS, sizeof(pid_t));
    mgr->target_runnable_workers = dispatch_hw_config(active_cpus);

    // Create a monitoring timer that will periodically run on dispatch_mgr_q.
    dispatch_source_t ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, &_dispatch_mgr_q);
    dispatch_source_set_timer(ds, dispatch_time(DISPATCH_TIME_NOW, 0), NSEC_PER_SEC, 0);
    dispatch_source_set_event_handler_f(ds, _dispatch_workq_monitor_thread_pool);
    dispatch_activate(ds);
}

#endif // HAVE_INTERNAL_PTHREAD_WORKQUEUE
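
For reference, the oversubscription decision made by _dispatch_workq_monitor_thread_pool above reduces to a small piece of integer arithmetic over the observed runnable count, the target, and the remaining pool capacity. A standalone sketch of just that decision — the WORKQ_MAX_TRACKED_WORKERS value and the sample inputs in main() are made up for illustration; the formula mirrors the diff:

// Sketch of the monitor's "should we add a worker?" decision, extracted for
// illustration. Constants other than WORKQ_OVERSUBSCRIBE_FACTOR are stand-ins.
#include <stdbool.h>
#include <stdio.h>

#define WORKQ_MAX_TRACKED_WORKERS  64   // stand-in for DISPATCH_WORKQ_MAX_PTHREAD_COUNT
#define WORKQ_OVERSUBSCRIBE_FACTOR 2

#define MIN(a, b) ((a) < (b) ? (a) : (b))

// `pool_count` plays the role of _dispatch_pthread_root_queue_thread_pool_size(dq):
// how many more threads the root queue may still create (negative once the
// queue is already oversubscribed).
static bool should_add_worker(int runnable, int target, int pool_count)
{
    if (runnable >= target) return false;        // enough runnable workers already

    int oversubscribe_limit = WORKQ_OVERSUBSCRIBE_FACTOR * target;
    int stalled_limit = WORKQ_MAX_TRACKED_WORKERS - target;
    // Nothing runnable means the pool looks stalled: allow growth up to the
    // hard tracking limit. Otherwise cap growth at a small multiple of target.
    int limit = (runnable == 0) ? stalled_limit : MIN(stalled_limit, oversubscribe_limit);
    return pool_count + limit > 0;
}

int main(void)
{
    // target = 4 (e.g. 4 active CPUs)
    printf("%d\n", should_add_worker(0, 4, 4));   // stalled, capacity left  -> 1
    printf("%d\n", should_add_worker(2, 4, -8));  // below target, 8 over    -> 0
    printf("%d\n", should_add_worker(4, 4, 4));   // at target               -> 0
    return 0;
}

With a target of 4 runnable workers, a fully stalled pool is allowed to keep growing until the hard tracking limit is exhausted, whereas a pool that is merely below target stops growing once it is WORKQ_OVERSUBSCRIBE_FACTOR * target threads oversubscribed.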
