Skip to content

Commit af4a6d9

Browse files
committed
implement pthread_workqueue within libdispatch
Provide an implementation of the pthread_workqueue functionality (libpwq) as a fully integrated component of libdispatch. Integration of the workqueue implementation into the libdispatch code base simplifies the process of evolving the APIs between the two layers and thus prepares for future optimization and enhancements. The overall design is to augment libdispatch's existing pthread_root_queue option with periodic user-space monitoring of the status of a root queue's worker threads and to oversubscribe the workqueue when not enough workers are observed to be runnable when the queue has available work. Initially, the integrated pthread_workqueue is only enabled by default on Linux. The current monitoring implementation relies on Linux-specific code to dynamically estimate the number of runnable worker threads by reading /proc. Porting the monitoring code to non-Linux platforms would entail providing similar functionality for those platforms.
1 parent f84d21d commit af4a6d9

10 files changed

+443
-50
lines changed

configure.ac

+32-12
Original file line numberDiff line numberDiff line change
@@ -320,21 +320,41 @@ AS_IF([test -n "$apple_libpthread_source_path" -a -n "$apple_xnu_source_osfmk_pa
320320
AC_CHECK_HEADERS([pthread_machdep.h pthread/qos.h])
321321

322322
# pthread_workqueues.
323-
# Look for own version first, then system version.
324-
AS_IF([test -f $srcdir/libpwq/configure.ac],
325-
[AC_DEFINE(BUILD_OWN_PTHREAD_WORKQUEUES, 1, [Define if building pthread work queues from source])
326-
ac_configure_args="--disable-libpwq-install $ac_configure_args"
327-
AC_CONFIG_SUBDIRS([libpwq])
328-
build_own_pthread_workqueues=true
323+
# Look for own version first, then for libpwq in our source tree, finally system version.
324+
AC_ARG_ENABLE([internal-libpwq],
325+
[AS_HELP_STRING([--enable-internal-libpwq],
326+
[Use libdispatch's own implementation of pthread workqueues.])],,
327+
[case $target_os in
328+
linux*)
329+
enable_internal_libpwq=yes
330+
;;
331+
*)
332+
enable_internal_libpwq=no
333+
esac]
334+
)
335+
AS_IF([test "x$enable_internal_libpwq" = "xyes"],
336+
[AC_DEFINE(DISPATCH_USE_INTERNAL_WORKQUEUE, 1, [Use libdispatch's own implementation of pthread_workqueue API])
329337
AC_DEFINE(HAVE_PTHREAD_WORKQUEUES, 1, [Define if pthread work queues are present])
330-
have_pthread_workqueues=true],
331-
[build_own_pthread_workqueues=false
332-
AC_CHECK_HEADERS([pthread/workqueue_private.h pthread_workqueue.h],
333-
[AC_DEFINE(HAVE_PTHREAD_WORKQUEUES, 1, [Define if pthread work queues are present])
338+
dispatch_use_internal_workqueue=true
339+
have_pthread_workqueues=true
340+
build_own_pthread_workqueues=false],
341+
[dispatch_use_internal_workqueue=false
342+
AS_IF([test -f $srcdir/libpwq/configure.ac],
343+
[AC_DEFINE(BUILD_OWN_PTHREAD_WORKQUEUES, 1, [Define if building pthread work queues from source])
344+
ac_configure_args="--disable-libpwq-install $ac_configure_args"
345+
AC_CONFIG_SUBDIRS([libpwq])
346+
build_own_pthread_workqueues=true
347+
AC_DEFINE(HAVE_PTHREAD_WORKQUEUES, 1, [Define if pthread work queues are present])
334348
have_pthread_workqueues=true],
335-
[have_pthread_workqueues=false]
336-
)]
349+
[build_own_pthread_workqueues=false
350+
AC_CHECK_HEADERS([pthread/workqueue_private.h pthread_workqueue.h],
351+
[AC_DEFINE(HAVE_PTHREAD_WORKQUEUES, 1, [Define if pthread work queues are present])
352+
have_pthread_workqueues=true],
353+
[have_pthread_workqueues=false]
354+
)]
355+
)]
337356
)
357+
AM_CONDITIONAL(DISPATCH_USE_INTERNAL_WORKQUEUE, $dispatch_use_internal_workqueue)
338358
AM_CONDITIONAL(BUILD_OWN_PTHREAD_WORKQUEUES, $build_own_pthread_workqueues)
339359
AM_CONDITIONAL(HAVE_PTHREAD_WORKQUEUES, $have_pthread_workqueues)
340360

src/Makefile.am

+14-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@ else
99
lib_LTLIBRARIES=libdispatch.la
1010
endif
1111

12+
if DISPATCH_USE_INTERNAL_WORKQUEUE
13+
INTERNAL_WORKQUEUE_SOURCES= \
14+
event/workqueue.c \
15+
event/workqueue_internal.h
16+
endif
17+
1218
libdispatch_la_SOURCES= \
1319
allocator.c \
1420
apply.c \
@@ -60,7 +66,8 @@ libdispatch_la_SOURCES= \
6066
shims/perfmon.h \
6167
shims/time.h \
6268
shims/tsd.h \
63-
shims/yield.h
69+
shims/yield.h \
70+
$(INTERNAL_WORKQUEUE_SOURCES)
6471

6572
EXTRA_libdispatch_la_SOURCES=
6673
EXTRA_libdispatch_la_DEPENDENCIES=
@@ -77,12 +84,18 @@ AM_OBJCFLAGS=$(DISPATCH_CFLAGS) $(CBLOCKS_FLAGS)
7784
AM_CXXFLAGS=$(PTHREAD_WORKQUEUE_CFLAGS) $(DISPATCH_CFLAGS) $(CXXBLOCKS_FLAGS)
7885
AM_OBJCXXFLAGS=$(DISPATCH_CFLAGS) $(CXXBLOCKS_FLAGS)
7986

87+
if DISPATCH_USE_INTERNAL_WORKQUEUE
88+
PTHREAD_WORKQUEUE_LIBS=
89+
PTHREAD_WORKQUEUE_CFLAGS=
90+
else
8091
if BUILD_OWN_PTHREAD_WORKQUEUES
8192
PTHREAD_WORKQUEUE_LIBS=$(top_builddir)/libpwq/libpthread_workqueue.la
8293
PTHREAD_WORKQUEUE_CFLAGS=-I$(top_srcdir)/libpwq/include
8394
else
8495
if HAVE_PTHREAD_WORKQUEUES
8596
PTHREAD_WORKQUEUE_LIBS=-lpthread_workqueue
97+
PTHREAD_WORKQUEUE_CFLAGS=
98+
endif
8699
endif
87100
endif
88101

src/apply.c

+7-7
Original file line numberDiff line numberDiff line change
@@ -159,11 +159,11 @@ static inline void
159159
_dispatch_apply_f2(dispatch_queue_t dq, dispatch_apply_t da,
160160
dispatch_function_t func)
161161
{
162-
uint32_t i = 0;
162+
int32_t i = 0;
163163
dispatch_continuation_t head = NULL, tail = NULL;
164164

165165
// The current thread does not need a continuation
166-
uint32_t continuation_cnt = da->da_thr_cnt - 1;
166+
int32_t continuation_cnt = da->da_thr_cnt - 1;
167167

168168
dispatch_assert(continuation_cnt);
169169

@@ -192,14 +192,14 @@ static void
192192
_dispatch_apply_redirect(void *ctxt)
193193
{
194194
dispatch_apply_t da = (dispatch_apply_t)ctxt;
195-
uint32_t da_width = da->da_thr_cnt - 1;
195+
int32_t da_width = da->da_thr_cnt - 1;
196196
dispatch_queue_t dq = da->da_dc->dc_data, rq = dq, tq;
197197

198198
do {
199-
uint32_t width = _dispatch_queue_try_reserve_apply_width(rq, da_width);
199+
int32_t width = _dispatch_queue_try_reserve_apply_width(rq, da_width);
200200

201201
if (slowpath(da_width > width)) {
202-
uint32_t excess = da_width - width;
202+
int32_t excess = da_width - width;
203203
for (tq = dq; tq != rq; tq = tq->do_targetq) {
204204
_dispatch_queue_relinquish_width(tq, excess);
205205
}
@@ -234,7 +234,7 @@ dispatch_apply_f(size_t iterations, dispatch_queue_t dq, void *ctxt,
234234
if (slowpath(iterations == 0)) {
235235
return;
236236
}
237-
uint32_t thr_cnt = dispatch_hw_config(active_cpus);
237+
int32_t thr_cnt = dispatch_hw_config(active_cpus);
238238
dispatch_thread_context_t dtctxt = _dispatch_thread_context_find(_dispatch_apply_key);
239239
size_t nested = dtctxt ? dtctxt->dtc_apply_nesting : 0;
240240
dispatch_queue_t old_dq = _dispatch_queue_get_current();
@@ -247,7 +247,7 @@ dispatch_apply_f(size_t iterations, dispatch_queue_t dq, void *ctxt,
247247
? nested * iterations : DISPATCH_APPLY_MAX;
248248
}
249249
if (iterations < thr_cnt) {
250-
thr_cnt = (uint32_t)iterations;
250+
thr_cnt = iterations;
251251
}
252252
if (slowpath(dq == DISPATCH_APPLY_CURRENT_ROOT_QUEUE)) {
253253
dq = old_dq ? old_dq : _dispatch_get_root_queue(

0 commit comments

Comments
 (0)