Skip to content

Commit 3442cda

Browse files
authored
[Proxying] Send messages via in-memory mailbox queues (#18852)
Threads were previously notified of new work via postMessage messages that carried pointers to the task queues to execute. There was no way to synchronously pump or inspect these pending messages however, and there is no central registry of all task queues for a thread, so this mechanism afforded no way to discover or cancel pending work when a thread dies. In preparation for implementing work cancellation, move the pending messages into userspace by giving each thread a "mailbox", which is an `em_task_queue` in the pthread struct. Instead of using `postMessage`, proxying queues now use the thread mailbox API to notify threads of new work. Internally, thread mailboxes still use postMessage to schedule work to be executed when a thread returns to its event loop. Since the only task queues involved in postMessages are now at known locations relative to the pthread struct, there is no longer any need to store pointers to them in the postMessage messages themselves. Removing these pointers works around tricky notification and lifetime management edge cases that would have caused problems such as dropped work or use-after-free bugs in future PRs. When a thread dies because it exits or is canceled, it "closes" its mailbox by decrementing a refcount and waiting to observe a refcount of 0. At this point, the thread mailbox API ensures that no new messages will be enqueued on the mailbox. Because the postMessage messages no longer contain task queue pointers, it is safe to destroy the mailbox immediately after it is closed.
1 parent 8778c72 commit 3442cda

20 files changed

+342
-90
lines changed

emcc.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1712,7 +1712,7 @@ def setup_pthreads(target):
17121712
'__emscripten_thread_crashed',
17131713
'__emscripten_tls_init',
17141714
'_pthread_self',
1715-
'executeNotifiedProxyingQueue',
1715+
'checkMailbox',
17161716
]
17171717
settings.EXPORTED_FUNCTIONS += worker_imports
17181718
building.user_requested_exports.update(worker_imports)

src/library_pthread.js

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -270,8 +270,8 @@ var LibraryPThread = {
270270
return;
271271
}
272272

273-
if (cmd === 'processProxyingQueue') {
274-
executeNotifiedProxyingQueue(d['queue']);
273+
if (cmd === 'checkMailbox') {
274+
checkMailbox();
275275
} else if (cmd === 'spawnThread') {
276276
spawnThread(d);
277277
} else if (cmd === 'cleanupThread') {
@@ -1212,29 +1212,22 @@ var LibraryPThread = {
12121212
},
12131213
#endif // MAIN_MODULE
12141214

1215-
$executeNotifiedProxyingQueue__deps: ['$callUserCallback'],
1216-
$executeNotifiedProxyingQueue: function(queue) {
1217-
// Set the notification state to processing.
1218-
Atomics.store(HEAP32, queue >> 2, {{{ cDefine('NOTIFICATION_RECEIVED') }}});
1219-
// Only execute the queue if we have a live pthread runtime. We
1220-
// implement pthread_self to return 0 if there is no live runtime.
1215+
$checkMailbox__deps: ['$callUserCallback'],
1216+
$checkMailbox: function() {
1217+
// Only check the mailbox if we have a live pthread runtime. We implement
1218+
// pthread_self to return 0 if there is no live runtime.
12211219
if (_pthread_self()) {
1222-
callUserCallback(() => __emscripten_proxy_execute_task_queue(queue));
1220+
callUserCallback(() => __emscripten_check_mailbox());
12231221
}
1224-
// Set the notification state to none as long as a new notification has not
1225-
// been sent while we were processing.
1226-
Atomics.compareExchange(HEAP32, queue >> 2,
1227-
{{{ cDefine('NOTIFICATION_RECEIVED') }}},
1228-
{{{ cDefine('NOTIFICATION_NONE') }}});
12291222
},
12301223

1231-
_emscripten_notify_task_queue__deps: ['$executeNotifiedProxyingQueue'],
1232-
_emscripten_notify_task_queue__sig: 'vpppp',
1233-
_emscripten_notify_task_queue: function(targetThreadId, currThreadId, mainThreadId, queue) {
1224+
_emscripten_notify_mailbox__deps: ['$checkMailbox'],
1225+
_emscripten_notify_mailbox__sig: 'vppp',
1226+
_emscripten_notify_mailbox: function(targetThreadId, currThreadId, mainThreadId) {
12341227
if (targetThreadId == currThreadId) {
1235-
setTimeout(() => executeNotifiedProxyingQueue(queue));
1228+
setTimeout(() => checkMailbox());
12361229
} else if (ENVIRONMENT_IS_PTHREAD) {
1237-
postMessage({'targetThread' : targetThreadId, 'cmd' : 'processProxyingQueue', 'queue' : queue});
1230+
postMessage({'targetThread' : targetThreadId, 'cmd' : 'checkMailbox'});
12381231
} else {
12391232
var worker = PThread.pthreads[targetThreadId];
12401233
if (!worker) {
@@ -1243,7 +1236,7 @@ var LibraryPThread = {
12431236
#endif
12441237
return /*0*/;
12451238
}
1246-
worker.postMessage({'cmd' : 'processProxyingQueue', 'queue': queue});
1239+
worker.postMessage({'cmd' : 'checkMailbox'});
12471240
}
12481241
}
12491242
};

src/worker.js

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,6 @@ if (ENVIRONMENT_IS_NODE) {
5252
// Thread-local guard variable for one-time init of the JS state
5353
var initializedJS = false;
5454

55-
// Proxying queues that were notified before the thread started and need to be
56-
// executed as part of startup.
57-
var pendingNotifiedProxyingQueues = [];
58-
5955
#if ASSERTIONS
6056
function assert(condition, text) {
6157
if (!condition) abort('Assertion failed: ' + text);
@@ -237,15 +233,6 @@ function handleMessage(e) {
237233
// We only do this once per worker since they get reused
238234
Module['__embind_initialize_bindings']();
239235
#endif // EMBIND
240-
241-
// Execute any proxied work that came in before the thread was
242-
// initialized. Only do this once because it is only possible for
243-
// proxying notifications to arrive before thread initialization on
244-
// fresh workers.
245-
pendingNotifiedProxyingQueues.forEach(queue => {
246-
Module['executeNotifiedProxyingQueue'](queue);
247-
});
248-
pendingNotifiedProxyingQueues = [];
249236
initializedJS = true;
250237
}
251238

@@ -268,12 +255,9 @@ function handleMessage(e) {
268255
}
269256
} else if (e.data.target === 'setimmediate') {
270257
// no-op
271-
} else if (e.data.cmd === 'processProxyingQueue') {
258+
} else if (e.data.cmd === 'checkMailbox') {
272259
if (initializedJS) {
273-
Module['executeNotifiedProxyingQueue'](e.data.queue);
274-
} else {
275-
// Defer executing this queue until the runtime is initialized.
276-
pendingNotifiedProxyingQueues.push(e.data.queue);
260+
Module['checkMailbox']();
277261
}
278262
} else if (e.data.cmd) {
279263
// The received message looks like something that should be handled by this message

system/lib/libc/musl/src/internal/pthread_impl.h

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@
1010
#include "syscall.h"
1111
#include "atomic.h"
1212
#ifdef __EMSCRIPTEN__
13-
#include <emscripten/threading.h>
13+
#include "em_task_queue.h"
14+
#include "thread_mailbox.h"
1415
#include "threading_internal.h"
16+
#include <emscripten/threading.h>
1517
#endif
1618
#include "futex.h"
1719

@@ -78,6 +80,23 @@ struct pthread {
7880
// The TLS base to use the main module TLS data. Secondary modules
7981
// still require dynamic allocation.
8082
void* tls_base;
83+
// The lowest level of the proxying system. Other threads can enqueue
84+
// messages on the mailbox and notify this thread to asynchronously
85+
// process them once it returns to its event loop. When this thread is
86+
// shut down, the mailbox is closed (see below) to prevent further
87+
// messages from being enqueued and all the remaining queued messages
88+
// are dequeued and their shutdown handlers are executed. This allows
89+
// other threads waiting for their messages to be processed to be
90+
// notified that their messages will not be processed after all.
91+
em_task_queue* mailbox;
92+
// To ensure that no other thread is concurrently enqueueing a message
93+
// when this thread shuts down, maintain an atomic refcount. Enqueueing
94+
// threads atomically increment the count from a nonzero number to
95+
// acquire the mailbox and decrement the count when they finish. When
96+
// this thread shuts down it will atomically decrement the count and
97+
// wait until it reaches 0, at which point the mailbox is considered
98+
// closed and no further messages will be enqueued.
99+
_Atomic int mailbox_refcount;
81100
#endif
82101
#if _REENTRANT
83102
_Atomic char sleeping;

system/lib/pthread/em_task_queue.c

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
#include "em_task_queue.h"
1515
#include "proxying_notification_state.h"
16+
#include "thread_mailbox.h"
1617

1718
#define EM_TASK_QUEUE_INITIAL_CAPACITY 128
1819

@@ -194,26 +195,41 @@ task em_task_queue_dequeue(em_task_queue* queue) {
194195
return t;
195196
}
196197

197-
// Send a postMessage notification containing the em_task_queue pointer to the
198-
// target thread so it will execute the queue when it returns to the event loop.
199-
// Also pass in the current thread and main thread ids to minimize calls back
200-
// into Wasm.
201-
void _emscripten_notify_task_queue(pthread_t target_thread,
202-
pthread_t curr_thread,
203-
pthread_t main_thread,
204-
em_task_queue* queue);
205-
206-
void em_task_queue_notify(em_task_queue* queue) {
207-
// If there is no pending notification for this queue, create one. If an old
208-
// notification is currently being processed, it may or may not execute this
209-
// work. In case it does not, the new notification will ensure the work is
210-
// still executed.
198+
static void receive_notification(void* arg) {
199+
em_task_queue* tasks = arg;
200+
tasks->notification = NOTIFICATION_RECEIVED;
201+
em_task_queue_execute(tasks);
202+
notification_state expected = NOTIFICATION_RECEIVED;
203+
atomic_compare_exchange_strong(
204+
&tasks->notification, &expected, NOTIFICATION_NONE);
205+
}
206+
207+
int em_task_queue_send(em_task_queue* queue, task t) {
208+
// Ensure the target mailbox will remain open or detect that it is already
209+
// closed.
210+
if (!emscripten_thread_mailbox_ref(queue->thread)) {
211+
return 0;
212+
}
213+
214+
pthread_mutex_lock(&queue->mutex);
215+
int enqueued = em_task_queue_enqueue(queue, t);
216+
pthread_mutex_unlock(&queue->mutex);
217+
if (!enqueued) {
218+
emscripten_thread_mailbox_unref(queue->thread);
219+
return 0;
220+
}
221+
222+
// We're done if there is already a pending notification for this task queue.
223+
// Otherwise, we will send one.
211224
notification_state previous =
212225
atomic_exchange(&queue->notification, NOTIFICATION_PENDING);
213-
if (previous != NOTIFICATION_PENDING) {
214-
_emscripten_notify_task_queue(queue->thread,
215-
pthread_self(),
216-
emscripten_main_runtime_thread_id(),
217-
queue);
226+
if (previous == NOTIFICATION_PENDING) {
227+
emscripten_thread_mailbox_unref(queue->thread);
228+
return 1;
218229
}
230+
231+
emscripten_thread_mailbox_send(
232+
queue->thread, (task){.func = receive_notification, .arg = queue});
233+
emscripten_thread_mailbox_unref(queue->thread);
234+
return 1;
219235
}

system/lib/pthread/em_task_queue.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ em_task_queue* em_task_queue_create(pthread_t thread);
5050

5151
void em_task_queue_destroy(em_task_queue* queue);
5252

53-
// Execute tasks until an empty queue is observed.
53+
// Execute tasks until an empty queue is observed. Internally locks the queue.
5454
void em_task_queue_execute(em_task_queue* queue);
5555

5656
// Not thread safe.
@@ -69,6 +69,7 @@ int em_task_queue_enqueue(em_task_queue* queue, task t);
6969
// Not thread safe. Assumes the queue is not empty.
7070
task em_task_queue_dequeue(em_task_queue* queue);
7171

72-
// Schedule the queue to be executed next time its owning thread returns to its
73-
// event loop.
74-
void em_task_queue_notify(em_task_queue* queue);
72+
// Atomically enqueue the task and schedule the queue to be executed next time
73+
// its owning thread returns to its event loop. Returns 1 on success and 0
74+
// otherwise. Internally locks the queue.
75+
int em_task_queue_send(em_task_queue* queue, task t);

system/lib/pthread/library_pthread.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,4 +595,6 @@ void __emscripten_init_main_thread(void) {
595595
// this is used by pthread_key_delete for deleting thread-specific data.
596596
__main_pthread.next = __main_pthread.prev = &__main_pthread;
597597
__main_pthread.tsd = (void **)__pthread_tsd_main;
598+
599+
_emscripten_thread_mailbox_init(&__main_pthread);
598600
}

system/lib/pthread/proxying.c

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@
99
#include <emscripten/proxying.h>
1010
#include <emscripten/threading.h>
1111
#include <pthread.h>
12+
#include <stdatomic.h>
1213
#include <stdlib.h>
1314
#include <string.h>
1415

1516
#include "em_task_queue.h"
16-
#include "proxying_notification_state.h"
17+
#include "thread_mailbox.h"
1718

1819
struct em_proxying_queue {
1920
// Protects all accesses to em_task_queues, size, and capacity.
@@ -103,17 +104,6 @@ static em_task_queue* get_or_add_tasks_for_thread(em_proxying_queue* q,
103104
return tasks;
104105
}
105106

106-
// Exported for use in worker.js, but otherwise an internal function.
107-
EMSCRIPTEN_KEEPALIVE
108-
void _emscripten_proxy_execute_task_queue(em_task_queue* tasks) {
109-
// Before we attempt to execute a request from another thread make sure we
110-
// are in sync with all the loaded code.
111-
// For example, in PROXY_TO_PTHREAD the atexit functions are called via
112-
// a proxied call, and without this call to syncronize we would crash if
113-
// any atexit functions were registered from a side module.
114-
em_task_queue_execute(tasks);
115-
}
116-
117107
void emscripten_proxy_execute_queue(em_proxying_queue* q) {
118108
assert(q != NULL);
119109
assert(pthread_self());
@@ -156,15 +146,8 @@ int emscripten_proxy_async(em_proxying_queue* q,
156146
if (tasks == NULL) {
157147
return 0;
158148
}
159-
pthread_mutex_lock(&tasks->mutex);
160-
int enqueued = em_task_queue_enqueue(tasks, (task){func, arg});
161-
pthread_mutex_unlock(&tasks->mutex);
162-
if (!enqueued) {
163-
return 0;
164-
}
165149

166-
em_task_queue_notify(tasks);
167-
return 1;
150+
return em_task_queue_send(tasks, (task){func, arg});
168151
}
169152

170153
struct em_proxying_ctx {

system/lib/pthread/pthread_create.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,8 @@ int __pthread_create(pthread_t* restrict res,
223223
_emscripten_thread_profiler_init(new);
224224
#endif
225225

226+
_emscripten_thread_mailbox_init(new);
227+
226228
struct pthread *self = __pthread_self();
227229
dbg("start __pthread_create: new=%p new_end=%p stack=%p->%p "
228230
"stack_size=%zu tls_base=%p",
@@ -303,6 +305,8 @@ void _emscripten_thread_exit(void* result) {
303305
self->cancelasync = PTHREAD_CANCEL_DEFERRED;
304306
self->result = result;
305307

308+
_emscripten_thread_mailbox_shutdown(self);
309+
306310
// Run any handlers registered with pthread_cleanup_push
307311
__run_cleanup_handlers();
308312

0 commit comments

Comments
 (0)