Skip to content

Commit 703b1d5

Browse files
committedMay 18, 2017
implement pthread_workqueue within libdispatch
Provide an implementation of the pthread_workqueue functionality (libpwq) as a fully integrated component of libdispatch. Integration of the workqueue implementation into the libdispatch code base simplifies the process of evolving the APIs between the two layers and thus prepares for future optimization and enhancements. The overall design is to augment libdispatch's existing pthread_root_queue option with periodic user-space monitoring of the status of a root queue's worker threads and to oversubscribe the workqueue when not enough workers are observed to be runnable when the queue has available work. Initially, the integrated pthread_workqueue is only enabled by default on Linux. The current monitoring implementation relies on Linux-specific code to dynamically estimate the number of runnable worker threads by reading /proc. Porting the monitoring code to non-Linux platforms would entail providing similar functionality for those platforms. Remove libpwq git submodule and autotools support for building libpwq from source as part of building libdispatch.

13 files changed

+445
-60
lines changed
 

‎.gitmodules

-3
Original file line numberDiff line numberDiff line change
@@ -1,3 +0,0 @@
1-
[submodule "libpwq"]
2-
path = libpwq
3-
url = https://github.com/mheily/libpwq.git

‎Makefile.am

-5
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,12 @@
44

55
ACLOCAL_AMFLAGS = -I m4
66

7-
if BUILD_OWN_PTHREAD_WORKQUEUES
8-
MAYBE_PTHREAD_WORKQUEUES = libpwq
9-
endif
10-
117
if BUILD_TESTS
128
MAYBE_TESTS = tests
139
endif
1410

1511
SUBDIRS= \
1612
dispatch \
17-
$(MAYBE_PTHREAD_WORKQUEUES) \
1813
man \
1914
os \
2015
private \

‎configure.ac

+18-9
Original file line numberDiff line numberDiff line change
@@ -320,22 +320,31 @@ AS_IF([test -n "$apple_libpthread_source_path" -a -n "$apple_xnu_source_osfmk_pa
320320
AC_CHECK_HEADERS([pthread_machdep.h pthread/qos.h])
321321

322322
# pthread_workqueues.
323-
# Look for own version first, then system version.
324-
AS_IF([test -f $srcdir/libpwq/configure.ac],
325-
[AC_DEFINE(BUILD_OWN_PTHREAD_WORKQUEUES, 1, [Define if building pthread work queues from source])
326-
ac_configure_args="--disable-libpwq-install $ac_configure_args"
327-
AC_CONFIG_SUBDIRS([libpwq])
328-
build_own_pthread_workqueues=true
323+
# Look for own version first, than see if there is a system version.
324+
AC_ARG_ENABLE([internal-libpwq],
325+
[AS_HELP_STRING([--enable-internal-libpwq],
326+
[Use libdispatch's own implementation of pthread workqueues.])],,
327+
[case $target_os in
328+
linux*)
329+
enable_internal_libpwq=yes
330+
;;
331+
*)
332+
enable_internal_libpwq=no
333+
esac]
334+
)
335+
AS_IF([test "x$enable_internal_libpwq" = "xyes"],
336+
[AC_DEFINE(DISPATCH_USE_INTERNAL_WORKQUEUE, 1, [Use libdispatch's own implementation of pthread_workqueue API])
329337
AC_DEFINE(HAVE_PTHREAD_WORKQUEUES, 1, [Define if pthread work queues are present])
338+
dispatch_use_internal_workqueue=true
330339
have_pthread_workqueues=true],
331-
[build_own_pthread_workqueues=false
340+
[dispatch_use_internal_workqueue=false
332341
AC_CHECK_HEADERS([pthread/workqueue_private.h pthread_workqueue.h],
333342
[AC_DEFINE(HAVE_PTHREAD_WORKQUEUES, 1, [Define if pthread work queues are present])
334343
have_pthread_workqueues=true],
335344
[have_pthread_workqueues=false]
336-
)]
345+
)]
337346
)
338-
AM_CONDITIONAL(BUILD_OWN_PTHREAD_WORKQUEUES, $build_own_pthread_workqueues)
347+
AM_CONDITIONAL(DISPATCH_USE_INTERNAL_WORKQUEUE, $dispatch_use_internal_workqueue)
339348
AM_CONDITIONAL(HAVE_PTHREAD_WORKQUEUES, $have_pthread_workqueues)
340349

341350
AC_CHECK_HEADERS([libproc_internal.h], [], [], [#include <mach/mach.h>])

‎libpwq

-1
This file was deleted.

‎src/Makefile.am

+12-4
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@ else
99
lib_LTLIBRARIES=libdispatch.la
1010
endif
1111

12+
if DISPATCH_USE_INTERNAL_WORKQUEUE
13+
INTERNAL_WORKQUEUE_SOURCES= \
14+
event/workqueue.c \
15+
event/workqueue_internal.h
16+
endif
17+
1218
libdispatch_la_SOURCES= \
1319
allocator.c \
1420
apply.c \
@@ -60,7 +66,8 @@ libdispatch_la_SOURCES= \
6066
shims/perfmon.h \
6167
shims/time.h \
6268
shims/tsd.h \
63-
shims/yield.h
69+
shims/yield.h \
70+
$(INTERNAL_WORKQUEUE_SOURCES)
6471

6572
EXTRA_libdispatch_la_SOURCES=
6673
EXTRA_libdispatch_la_DEPENDENCIES=
@@ -77,12 +84,13 @@ AM_OBJCFLAGS=$(DISPATCH_CFLAGS) $(CBLOCKS_FLAGS)
7784
AM_CXXFLAGS=$(PTHREAD_WORKQUEUE_CFLAGS) $(DISPATCH_CFLAGS) $(CXXBLOCKS_FLAGS)
7885
AM_OBJCXXFLAGS=$(DISPATCH_CFLAGS) $(CXXBLOCKS_FLAGS)
7986

80-
if BUILD_OWN_PTHREAD_WORKQUEUES
81-
PTHREAD_WORKQUEUE_LIBS=$(top_builddir)/libpwq/libpthread_workqueue.la
82-
PTHREAD_WORKQUEUE_CFLAGS=-I$(top_srcdir)/libpwq/include
87+
if DISPATCH_USE_INTERNAL_WORKQUEUE
88+
PTHREAD_WORKQUEUE_LIBS=
89+
PTHREAD_WORKQUEUE_CFLAGS=
8390
else
8491
if HAVE_PTHREAD_WORKQUEUES
8592
PTHREAD_WORKQUEUE_LIBS=-lpthread_workqueue
93+
PTHREAD_WORKQUEUE_CFLAGS=
8694
endif
8795
endif
8896

‎src/apply.c

+7-7
Original file line numberDiff line numberDiff line change
@@ -159,11 +159,11 @@ static inline void
159159
_dispatch_apply_f2(dispatch_queue_t dq, dispatch_apply_t da,
160160
dispatch_function_t func)
161161
{
162-
uint32_t i = 0;
162+
int32_t i = 0;
163163
dispatch_continuation_t head = NULL, tail = NULL;
164164

165165
// The current thread does not need a continuation
166-
uint32_t continuation_cnt = da->da_thr_cnt - 1;
166+
int32_t continuation_cnt = da->da_thr_cnt - 1;
167167

168168
dispatch_assert(continuation_cnt);
169169

@@ -192,14 +192,14 @@ static void
192192
_dispatch_apply_redirect(void *ctxt)
193193
{
194194
dispatch_apply_t da = (dispatch_apply_t)ctxt;
195-
uint32_t da_width = da->da_thr_cnt - 1;
195+
int32_t da_width = da->da_thr_cnt - 1;
196196
dispatch_queue_t dq = da->da_dc->dc_data, rq = dq, tq;
197197

198198
do {
199-
uint32_t width = _dispatch_queue_try_reserve_apply_width(rq, da_width);
199+
int32_t width = _dispatch_queue_try_reserve_apply_width(rq, da_width);
200200

201201
if (slowpath(da_width > width)) {
202-
uint32_t excess = da_width - width;
202+
int32_t excess = da_width - width;
203203
for (tq = dq; tq != rq; tq = tq->do_targetq) {
204204
_dispatch_queue_relinquish_width(tq, excess);
205205
}
@@ -234,7 +234,7 @@ dispatch_apply_f(size_t iterations, dispatch_queue_t dq, void *ctxt,
234234
if (slowpath(iterations == 0)) {
235235
return;
236236
}
237-
uint32_t thr_cnt = dispatch_hw_config(active_cpus);
237+
int32_t thr_cnt = dispatch_hw_config(active_cpus);
238238
dispatch_thread_context_t dtctxt = _dispatch_thread_context_find(_dispatch_apply_key);
239239
size_t nested = dtctxt ? dtctxt->dtc_apply_nesting : 0;
240240
dispatch_queue_t old_dq = _dispatch_queue_get_current();
@@ -247,7 +247,7 @@ dispatch_apply_f(size_t iterations, dispatch_queue_t dq, void *ctxt,
247247
? nested * iterations : DISPATCH_APPLY_MAX;
248248
}
249249
if (iterations < thr_cnt) {
250-
thr_cnt = (uint32_t)iterations;
250+
thr_cnt = iterations;
251251
}
252252
if (slowpath(dq == DISPATCH_APPLY_CURRENT_ROOT_QUEUE)) {
253253
dq = old_dq ? old_dq : _dispatch_get_root_queue(

0 commit comments

Comments
 (0)
Please sign in to comment.