Skip to content

Commit 168dafb

Browse files
authored
[Proxying] Cancel proxied work when the worker thread dies (#18741)
Update the synchronous proxying APIs to return 0 when attempting to proxy to a thread that has already exited or has been canceled or exits or is canceled before the proxied work is completed. To implement cancellation, add an optional `cancel` function pointer to the tasks queued in `em_task_queue`s, which are the internal thread-specific queues used in the implementation of `em_proxying_queue`. This `cancel` function pointer is not controllable directly by users of the API, but is rather used internally to ensure that synchronous waiters are notified when the thread they are waiting on dies. This mechanism will scale to other forms of cancellation notification in the future. For example, it could be used to reject a promise associated with the proxied work. When a thread dies, it runs the cancellation handlers for each task in its mailbox. Those tasks each carry a pointer to some task queue owned by a proxying queue, so their cancellation handlers in turn run the cancellation handlers for all the tasks in those pointed-to queues. Task queue cancellation handlers do not handle the case of proxied work that has been started but not finished when the worker thread dies because that work has already been removed from the task queues. To cancel in-progress work as well, update the implementation of synchronous proxying to place `em_proxying_ctx` objects associated with active work in a thread-local doubly linked list that is traversed by a registered thread specific data destructor. The destructor takes care of canceling the active work. Implements the design sketched out in #18631.
1 parent 0e7b185 commit 168dafb

14 files changed

+356
-26
lines changed

ChangeLog.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ See docs/process.md for more on how version tagging works.
4545
SjLj, the combination we do not intend to support for the long term.
4646
- Added support for Wasm-based AudioWorklets for realtime audio processing
4747
(#16449)
48+
- Synchronous proxying functions in emscripten/proxying.h now return errors
49+
instead of hanging forever when the worker thread dies before the proxied work
50+
is finished.
4851

4952
3.1.31 - 01/26/23
5053
-----------------

site/source/docs/api_reference/proxying.h.rst

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,9 @@ Functions
9191
9292
Enqueue ``func`` to be called with argument ``arg`` on the given queue and
9393
thread then wait for ``func`` to be executed synchronously before returning.
94-
Returns 1 if the ``func`` was successfully completed and 0 otherwise.
94+
Returns 1 if the ``func`` was successfully completed and 0 otherwise,
95+
including if the target thread is canceled or exits before the work is
96+
completed.
9597
9698
.. c:function:: int emscripten_proxy_sync_with_ctx(em_proxying_queue* q, pthread_t target_thread, void (*func)(em_proxying_ctx*, void*), void* arg)
9799

system/include/emscripten/proxying.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ int emscripten_proxy_async_with_callback(em_proxying_queue* q,
7171

7272
// Enqueue `func` on the given queue and thread and wait for it to finish
7373
// executing before returning. Returns 1 if the task was successfully completed
74-
// and 0 otherwise.
74+
// and 0 otherwise, including if the target thread is canceled or exits before
75+
// the work is completed.
7576
int emscripten_proxy_sync(em_proxying_queue* q,
7677
pthread_t target_thread,
7778
void (*func)(void*),
@@ -82,7 +83,8 @@ int emscripten_proxy_sync(em_proxying_queue* q,
8283
// before returning. `func` need not call `emscripten_proxying_finish` itself;
8384
// it could instead store the context pointer and call
8485
// `emscripten_proxying_finish` at an arbitrary later time. Returns 1 if the
85-
// task was successfully completed and 0 otherwise.
86+
// task was successfully completed and 0 otherwise, including if the target
87+
// thread is canceled or exits before the work is completed.
8688
int emscripten_proxy_sync_with_ctx(em_proxying_queue* q,
8789
pthread_t target_thread,
8890
void (*func)(em_proxying_ctx*, void*),

system/lib/pthread/em_task_queue.c

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,23 @@ void em_task_queue_execute(em_task_queue* queue) {
180180
queue->processing = 0;
181181
}
182182

183+
void em_task_queue_cancel(em_task_queue* queue) {
184+
pthread_mutex_lock(&queue->mutex);
185+
while (!em_task_queue_is_empty(queue)) {
186+
task t = em_task_queue_dequeue(queue);
187+
if (t.cancel) {
188+
t.cancel(t.arg);
189+
}
190+
}
191+
pthread_mutex_unlock(&queue->mutex);
192+
// Any subsequent messages to this queue (for example if a pthread struct is
193+
// reused for a future thread, potentially on a different worker) will require
194+
// a new notification. Clearing the flag is safe here because in both the
195+
// proxying queue and mailbox cases, there are no more outstanding references
196+
// to the queue after thread shutdown.
197+
queue->notification = NOTIFICATION_NONE;
198+
}
199+
183200
int em_task_queue_enqueue(em_task_queue* queue, task t) {
184201
if (em_task_queue_is_full(queue) && !em_task_queue_grow(queue)) {
185202
return 0;
@@ -204,6 +221,11 @@ static void receive_notification(void* arg) {
204221
&tasks->notification, &expected, NOTIFICATION_NONE);
205222
}
206223

224+
static void cancel_notification(void* arg) {
225+
em_task_queue* tasks = arg;
226+
em_task_queue_cancel(tasks);
227+
}
228+
207229
int em_task_queue_send(em_task_queue* queue, task t) {
208230
// Ensure the target mailbox will remain open or detect that it is already
209231
// closed.
@@ -228,8 +250,10 @@ int em_task_queue_send(em_task_queue* queue, task t) {
228250
return 1;
229251
}
230252

231-
emscripten_thread_mailbox_send(
232-
queue->thread, (task){.func = receive_notification, .arg = queue});
253+
emscripten_thread_mailbox_send(queue->thread,
254+
(task){.func = receive_notification,
255+
.cancel = cancel_notification,
256+
.arg = queue});
233257
emscripten_thread_mailbox_unref(queue->thread);
234258
return 1;
235259
}

system/lib/pthread/em_task_queue.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
// A task is an arbitrary function combined with some arbitrary state.
1515
typedef struct task {
1616
void (*func)(void*);
17+
void (*cancel)(void*);
1718
void* arg;
1819
} task;
1920

@@ -53,6 +54,9 @@ void em_task_queue_destroy(em_task_queue* queue);
5354
// Execute tasks until an empty queue is observed. Internally locks the queue.
5455
void em_task_queue_execute(em_task_queue* queue);
5556

57+
// Cancel all tasks in the queue. Internally locks the queue.
58+
void em_task_queue_cancel(em_task_queue* queue);
59+
5660
// Not thread safe.
5761
static inline int em_task_queue_is_empty(em_task_queue* queue) {
5862
return queue->head == queue->tail;

system/lib/pthread/proxying.c

Lines changed: 105 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,7 @@ void emscripten_proxy_execute_queue(em_proxying_queue* q) {
135135
}
136136
}
137137

138-
int emscripten_proxy_async(em_proxying_queue* q,
139-
pthread_t target_thread,
140-
void (*func)(void*),
141-
void* arg) {
138+
static int do_proxy(em_proxying_queue* q, pthread_t target_thread, task t) {
142139
assert(q != NULL);
143140
pthread_mutex_lock(&q->mutex);
144141
em_task_queue* tasks = get_or_add_tasks_for_thread(q, target_thread);
@@ -147,26 +144,105 @@ int emscripten_proxy_async(em_proxying_queue* q,
147144
return 0;
148145
}
149146

150-
return em_task_queue_send(tasks, (task){func, arg});
147+
return em_task_queue_send(tasks, t);
148+
}
149+
150+
int emscripten_proxy_async(em_proxying_queue* q,
151+
pthread_t target_thread,
152+
void (*func)(void*),
153+
void* arg) {
154+
return do_proxy(q, target_thread, (task){func, NULL, arg});
151155
}
152156

157+
enum ctx_state { PENDING, DONE, CANCELED };
158+
153159
struct em_proxying_ctx {
154160
// The user-provided function and argument.
155161
void (*func)(em_proxying_ctx*, void*);
156162
void* arg;
157-
// Set `done` to 1 and signal the condition variable once the proxied task is
158-
// done.
159-
int done;
163+
// Update `state` and signal the condition variable once the proxied task is
164+
// done or canceled.
165+
enum ctx_state state;
160166
pthread_mutex_t mutex;
161167
pthread_cond_t cond;
168+
// A doubly linked list of contexts associated with active work on a single
169+
// thread. If the thread is canceled, it will traverse this list to find
170+
// contexts that need to be canceled.
171+
struct em_proxying_ctx* next;
172+
struct em_proxying_ctx* prev;
162173
};
163174

175+
// The key that `cancel_active_ctxs` is bound to so that it runs when a thread
176+
// is canceled or exits.
177+
static pthread_key_t active_ctxs;
178+
179+
static void cancel_ctx(void* arg);
180+
static void cancel_active_ctxs(void* arg);
181+
182+
static void init_active_ctxs(void) {
183+
int ret = pthread_key_create(&active_ctxs, cancel_active_ctxs);
184+
assert(ret == 0);
185+
(void)ret;
186+
}
187+
188+
static void add_active_ctx(em_proxying_ctx* ctx) {
189+
assert(ctx != NULL);
190+
em_proxying_ctx* head = pthread_getspecific(active_ctxs);
191+
if (head == NULL) {
192+
// This is the only active context; initialize the active contexts list.
193+
ctx->next = ctx->prev = ctx;
194+
pthread_setspecific(active_ctxs, ctx);
195+
} else {
196+
// Insert this context at the tail of the list just before `head`.
197+
ctx->next = head;
198+
ctx->prev = head->prev;
199+
ctx->next->prev = ctx;
200+
ctx->prev->next = ctx;
201+
}
202+
}
203+
204+
static void remove_active_ctx(em_proxying_ctx* ctx) {
205+
assert(ctx != NULL);
206+
assert(ctx->next != NULL);
207+
assert(ctx->prev != NULL);
208+
if (ctx->next == ctx) {
209+
// This is the only active context; clear the active contexts list.
210+
ctx->next = ctx->prev = NULL;
211+
pthread_setspecific(active_ctxs, NULL);
212+
return;
213+
}
214+
215+
// Update the list head if we are removing the current head.
216+
em_proxying_ctx* head = pthread_getspecific(active_ctxs);
217+
if (ctx == head) {
218+
pthread_setspecific(active_ctxs, head->next);
219+
}
220+
221+
// Remove the context from the list.
222+
ctx->prev->next = ctx->next;
223+
ctx->next->prev = ctx->prev;
224+
ctx->next = ctx->prev = NULL;
225+
}
226+
227+
static void cancel_active_ctxs(void* arg) {
228+
pthread_setspecific(active_ctxs, NULL);
229+
em_proxying_ctx* head = arg;
230+
em_proxying_ctx* curr = head;
231+
do {
232+
em_proxying_ctx* next = curr->next;
233+
cancel_ctx(curr);
234+
curr = next;
235+
} while (curr != head);
236+
}
237+
164238
static void em_proxying_ctx_init(em_proxying_ctx* ctx,
165239
void (*func)(em_proxying_ctx*, void*),
166240
void* arg) {
241+
static pthread_once_t once = PTHREAD_ONCE_INIT;
242+
pthread_once(&once, init_active_ctxs);
167243
*ctx = (em_proxying_ctx){.func = func,
168244
.arg = arg,
169-
.done = 0,
245+
.state = PENDING,
170246
.mutex = PTHREAD_MUTEX_INITIALIZER,
171247
.cond = PTHREAD_COND_INITIALIZER};
172248
}
@@ -178,14 +254,24 @@ static void em_proxying_ctx_deinit(em_proxying_ctx* ctx) {
178254

179255
void emscripten_proxy_finish(em_proxying_ctx* ctx) {
180256
pthread_mutex_lock(&ctx->mutex);
181-
ctx->done = 1;
257+
ctx->state = DONE;
258+
remove_active_ctx(ctx);
259+
pthread_mutex_unlock(&ctx->mutex);
260+
pthread_cond_signal(&ctx->cond);
261+
}
262+
263+
static void cancel_ctx(void* arg) {
264+
em_proxying_ctx* ctx = arg;
265+
pthread_mutex_lock(&ctx->mutex);
266+
ctx->state = CANCELED;
182267
pthread_mutex_unlock(&ctx->mutex);
183268
pthread_cond_signal(&ctx->cond);
184269
}
185270

186271
// Helper for wrapping the call with ctx as a `void (*)(void*)`.
187-
static void call_with_ctx(void* p) {
188-
em_proxying_ctx* ctx = (em_proxying_ctx*)p;
272+
static void call_with_ctx(void* arg) {
273+
em_proxying_ctx* ctx = arg;
274+
add_active_ctx(ctx);
189275
ctx->func(ctx, ctx->arg);
190276
}
191277

@@ -197,21 +283,23 @@ int emscripten_proxy_sync_with_ctx(em_proxying_queue* q,
197283
"Cannot synchronously wait for work proxied to the current thread");
198284
em_proxying_ctx ctx;
199285
em_proxying_ctx_init(&ctx, func, arg);
200-
if (!emscripten_proxy_async(q, target_thread, call_with_ctx, &ctx)) {
286+
if (!do_proxy(q, target_thread, (task){call_with_ctx, cancel_ctx, &ctx})) {
287+
em_proxying_ctx_deinit(&ctx);
201288
return 0;
202289
}
203290
pthread_mutex_lock(&ctx.mutex);
204-
while (!ctx.done) {
291+
while (ctx.state == PENDING) {
205292
pthread_cond_wait(&ctx.cond, &ctx.mutex);
206293
}
207294
pthread_mutex_unlock(&ctx.mutex);
295+
int ret = ctx.state == DONE;
208296
em_proxying_ctx_deinit(&ctx);
209-
return 1;
297+
return ret;
210298
}
211299

212300
// Helper for signaling the end of the task after the user function returns.
213301
static void call_then_finish(em_proxying_ctx* ctx, void* arg) {
214-
task* t = (task*)arg;
302+
task* t = arg;
215303
t->func(t->arg);
216304
emscripten_proxy_finish(ctx);
217305
}
@@ -220,7 +308,7 @@ int emscripten_proxy_sync(em_proxying_queue* q,
220308
pthread_t target_thread,
221309
void (*func)(void*),
222310
void* arg) {
223-
task t = {func, arg};
311+
task t = {.func = func, .arg = arg};
224312
return emscripten_proxy_sync_with_ctx(q, target_thread, call_then_finish, &t);
225313
}
226314

system/lib/pthread/pthread_create.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,7 @@ int __pthread_create(pthread_t* restrict res,
270270
self->next,
271271
self->prev,
272272
new);
273+
273274
*res = new;
274275
return 0;
275276
}

system/lib/pthread/thread_mailbox.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,12 @@ void _emscripten_thread_mailbox_shutdown(pthread_t thread) {
5858
emscripten_futex_wait(&thread->mailbox_refcount, count, INFINITY);
5959
count = thread->mailbox_refcount;
6060
}
61-
// TODO: Cancel tasks.
6261

63-
// The mailbox will not be accessed again after this point.
62+
// The mailbox is now closed. No more messages will be enqueued. Run the
63+
// shutdown handler for any message already in the queue.
64+
em_task_queue_cancel(thread->mailbox);
65+
66+
// The mailbox is now empty and will not be accessed again after this point.
6467
em_task_queue_destroy(thread->mailbox);
6568
}
6669

test/other/metadce/test_metadce_minimal_pthreads.funcs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ $__memcpy
44
$__pthread_mutex_lock
55
$__pthread_mutex_trylock
66
$__pthread_mutex_unlock
7+
$__pthread_rwlock_rdlock
8+
$__pthread_rwlock_timedrdlock
79
$__pthread_rwlock_tryrdlock
10+
$__pthread_rwlock_unlock
811
$__pthread_self_internal
912
$__pthread_setcancelstate
1013
$__set_thread_state
@@ -38,12 +41,15 @@ $a_inc
3841
$a_store
3942
$a_swap
4043
$add
44+
$cancel_notification
4145
$dispose_chunk
4246
$dlfree
4347
$dlmalloc
4448
$do_dispatch_to_thread
4549
$em_queued_call_malloc
50+
$em_task_queue_cancel
4651
$em_task_queue_create
52+
$em_task_queue_dequeue
4753
$em_task_queue_enqueue
4854
$em_task_queue_execute
4955
$em_task_queue_free
@@ -58,6 +64,7 @@ $init_file_lock
5864
$init_mparams
5965
$main
6066
$memset
67+
$nodtor
6168
$pthread_attr_destroy
6269
$receive_notification
6370
$sbrk
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
18494
1+
18911

test/pthread/test_pthread_proxying.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ void test_tasks_queue_growth(void) {
293293
printf("Testing tasks queue growth\n");
294294

295295
em_proxying_queue* queue = em_proxying_queue_create();
296-
assert(proxy_queue != NULL);
296+
assert(queue != NULL);
297297

298298
int incremented = 0;
299299

0 commit comments

Comments
 (0)