Skip to content

Commit f3a5481

Browse files
authored
Fix wasm workers under node (#22721)
- Use callUserCallback to invoke callback in _wasmWorkerRunPostMessage. Without this calls to exit/emscripten_force_exit within the callback don't work as expected (they cause unhandled exception errors). - Fix `onmessage` handling under node so that the message payload always arrives as the `data` member of the message. - Update a few of the wasm workers tests do they actually exit (required for running tests under node).
1 parent f88c213 commit f3a5481

8 files changed

+67
-36
lines changed

src/library_wasm_worker.js

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,18 +55,13 @@ addToLibrary({
5555
},
5656

5757
// Executes a wasm function call received via a postMessage.
58+
$_wasmWorkerRunPostMessage__deps: ['$callUserCallback'],
5859
$_wasmWorkerRunPostMessage: (e) => {
5960
// '_wsc' is short for 'wasm call', trying to use an identifier name that
6061
// will never conflict with user code
61-
#if ENVIRONMENT_MAY_BE_NODE
62-
// In Node.js environment, message event 'e' containing the actual data sent,
63-
// while in the browser environment it's contained by 'e.data'.
64-
let data = ENVIRONMENT_IS_NODE ? e : e.data;
65-
#else
6662
let data = e.data;
67-
#endif
6863
let wasmCall = data['_wsc'];
69-
wasmCall && getWasmTableEntry(wasmCall)(...data['x']);
64+
wasmCall && callUserCallback(() => getWasmTableEntry(wasmCall)(...data['x']));
7065
},
7166

7267
// src/postamble_minimal.js brings this symbol in to the build, and calls this
@@ -87,6 +82,11 @@ addToLibrary({
8782
assert(m['sz'] % 16 == 0);
8883
#endif
8984

85+
#if !MINIMAL_RUNTIME && isSymbolNeeded('$noExitRuntime')
86+
// Wasm workers basically never exit their runtime
87+
noExitRuntime = 1;
88+
#endif
89+
9090
#if STACK_OVERFLOW_CHECK >= 2
9191
// _emscripten_wasm_worker_initialize() initializes the stack for this
9292
// Worker, but it cannot call to extern __set_stack_limits() function, or
@@ -209,6 +209,12 @@ if (ENVIRONMENT_IS_WASM_WORKER
209209
#endif
210210
});
211211
worker.onmessage = _wasmWorkerRunPostMessage;
212+
#if ENVIRONMENT_MAY_BE_NODE
213+
if (ENVIRONMENT_IS_NODE) {
214+
/** @suppress {checkTypes} */
215+
worker.on('message', (msg) => worker.onmessage({ data: msg }));
216+
}
217+
#endif
212218
return _wasmWorkersID++;
213219
},
214220

@@ -226,9 +232,7 @@ if (ENVIRONMENT_IS_WASM_WORKER
226232
#if ASSERTIONS
227233
assert(!ENVIRONMENT_IS_WASM_WORKER, 'emscripten_terminate_all_wasm_workers() cannot be called from a Wasm Worker: only the main browser thread has visibility to terminate all Workers!');
228234
#endif
229-
Object.values(_wasmWorkers).forEach((worker) => {
230-
worker.terminate();
231-
});
235+
Object.values(_wasmWorkers).forEach((worker) => worker.terminate());
232236
_wasmWorkers = {};
233237
},
234238

src/runtime_pthread.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ if (ENVIRONMENT_IS_PTHREAD) {
2727
// Create as web-worker-like an environment as we can.
2828

2929
var parentPort = worker_threads['parentPort'];
30-
parentPort.on('message', (data) => onmessage({ data: data }));
30+
parentPort.on('message', (msg) => onmessage({ data: msg }));
3131

3232
Object.assign(globalThis, {
3333
self: global,

src/wasm_worker.js

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,19 @@ if (ENVIRONMENT_IS_NODE) {
1414

1515
var parentPort = nodeWorkerThreads.parentPort;
1616

17-
parentPort.on('message', (data) => typeof onmessage === "function" && onmessage({ data: data }));
17+
parentPort.on('message', (msg) => global.onmessage?.({ data: msg }));
18+
19+
// Weak map of handle functions to their wrapper. Used to implement
20+
// addEventListener/removeEventListener.
21+
var wrappedHandlers = new WeakMap();
22+
function wrapMsgHandler(h) {
23+
var f = wrappedHandlers.get(h)
24+
if (!f) {
25+
f = (msg) => h({data: msg});
26+
wrappedHandlers.set(h, f);
27+
}
28+
return f;
29+
}
1830

1931
var fs = require('fs');
2032
var vm = require('vm');
@@ -28,8 +40,8 @@ if (ENVIRONMENT_IS_NODE) {
2840
importScripts: (f) => vm.runInThisContext(fs.readFileSync(f, 'utf8'), {filename: f}),
2941
postMessage: (msg) => parentPort.postMessage(msg),
3042
performance: global.performance || { now: Date.now },
31-
addEventListener: (name, handler) => parentPort.on(name, handler),
32-
removeEventListener: (name, handler) => parentPort.off(name, handler),
43+
addEventListener: (name, handler) => parentPort.on(name, wrapMsgHandler(handler)),
44+
removeEventListener: (name, handler) => parentPort.off(name, wrapMsgHandler(handler)),
3345
});
3446
}
3547
#endif // ENVIRONMENT_MAY_BE_NODE

test/test_browser.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4981,21 +4981,21 @@ def test_system(self):
49814981
# Tests the hello_wasm_worker.c documentation example code.
49824982
@also_with_minimal_runtime
49834983
def test_wasm_worker_hello(self):
4984-
self.btest('wasm_worker/hello_wasm_worker.c', expected='0', args=['-sWASM_WORKERS'])
4984+
self.btest_exit('wasm_worker/hello_wasm_worker.c', args=['-sWASM_WORKERS'])
49854985

49864986
def test_wasm_worker_hello_minimal_runtime_2(self):
4987-
self.btest('wasm_worker/hello_wasm_worker.c', expected='0', args=['-sWASM_WORKERS', '-sMINIMAL_RUNTIME=2'])
4987+
self.btest_exit('wasm_worker/hello_wasm_worker.c', args=['-sWASM_WORKERS', '-sMINIMAL_RUNTIME=2'])
49884988

49894989
# Tests Wasm Workers build in Wasm2JS mode.
49904990
@requires_wasm2js
49914991
@also_with_minimal_runtime
49924992
def test_wasm_worker_hello_wasm2js(self):
4993-
self.btest('wasm_worker/hello_wasm_worker.c', expected='0', args=['-sWASM_WORKERS', '-sWASM=0'])
4993+
self.btest_exit('wasm_worker/hello_wasm_worker.c', args=['-sWASM_WORKERS', '-sWASM=0'])
49944994

49954995
# Tests the WASM_WORKERS=2 build mode, which embeds the Wasm Worker bootstrap JS script file to the main JS file.
49964996
@also_with_minimal_runtime
4997-
def test_wasm_worker_embedded(self):
4998-
self.btest('wasm_worker/hello_wasm_worker.c', expected='0', args=['-sWASM_WORKERS=2'])
4997+
def test_wasm_worker_hello_embedded(self):
4998+
self.btest_exit('wasm_worker/hello_wasm_worker.c', args=['-sWASM_WORKERS=2'])
49994999

50005000
# Tests that it is possible to call emscripten_futex_wait() in Wasm Workers.
50015001
@parameterized({
@@ -5059,7 +5059,7 @@ def test_wasm_worker_sleep(self):
50595059
# Tests emscripten_terminate_wasm_worker()
50605060
@also_with_minimal_runtime
50615061
def test_wasm_worker_terminate(self):
5062-
self.btest('wasm_worker/terminate_wasm_worker.c', expected='0', args=['-sWASM_WORKERS'])
5062+
self.btest_exit('wasm_worker/terminate_wasm_worker.c', args=['-sWASM_WORKERS'])
50635063

50645064
# Tests emscripten_terminate_all_wasm_workers()
50655065
@also_with_minimal_runtime
@@ -5133,7 +5133,7 @@ def test_wasm_worker_lock_wait2(self):
51335133
# Tests emscripten_lock_async_acquire() function.
51345134
@also_with_minimal_runtime
51355135
def test_wasm_worker_lock_async_acquire(self):
5136-
self.btest('wasm_worker/lock_async_acquire.c', expected='0', args=['--closure=1', '-sWASM_WORKERS'])
5136+
self.btest_exit('wasm_worker/lock_async_acquire.c', args=['--closure=1', '-sWASM_WORKERS'])
51375137

51385138
# Tests emscripten_lock_busyspin_wait_acquire() in Worker and main thread.
51395139
@also_with_minimal_runtime

test/test_other.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13904,6 +13904,12 @@ def test_debug_opt_warning(self, should_fail, args):
1390413904
else:
1390513905
self.run_process([EMCC, test_file('hello_world.c'), '-Werror'] + args)
1390613906

13907+
def test_wasm_worker_hello(self):
13908+
self.do_runf(test_file('wasm_worker/hello_wasm_worker.c'), emcc_args=['-sWASM_WORKERS'])
13909+
13910+
def test_wasm_worker_terminate(self):
13911+
self.do_runf(test_file('wasm_worker/terminate_wasm_worker.c'), emcc_args=['-sWASM_WORKERS'])
13912+
1390713913
@also_with_minimal_runtime
1390813914
def test_wasm_worker_closure(self):
1390913915
self.run_process([EMCC, test_file('wasm_worker/lock_async_acquire.c'), '-O2', '-sWASM_WORKERS', '--closure=1'])

test/wasm_worker/hello_wasm_worker.c

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,25 @@
1+
#include <emscripten/emscripten.h>
12
#include <emscripten/console.h>
23
#include <emscripten/em_asm.h>
34
#include <emscripten/wasm_worker.h>
45
#include <assert.h>
56

67
// This is the code example in site/source/docs/api_reference/wasm_workers.rst
8+
void do_exit() {
9+
emscripten_out("do_exit");
10+
emscripten_terminate_all_wasm_workers();
11+
emscripten_force_exit(0);
12+
}
713

814
void run_in_worker() {
915
emscripten_out("Hello from wasm worker!\n");
1016
EM_ASM(typeof checkStackCookie == 'function' && checkStackCookie());
11-
#ifdef REPORT_RESULT
12-
REPORT_RESULT(0);
13-
#endif
17+
emscripten_wasm_worker_post_function_v(EMSCRIPTEN_WASM_WORKER_ID_PARENT, do_exit);
1418
}
1519

1620
int main() {
1721
emscripten_wasm_worker_t worker = emscripten_malloc_wasm_worker(/*stack size: */1024);
1822
assert(worker);
1923
emscripten_wasm_worker_post_function_v(worker, run_in_worker);
24+
emscripten_exit_with_live_runtime();
2025
}

test/wasm_worker/lock_async_acquire.c

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@ bool testFinished = false;
1717
int numTimesMainThreadAcquiredLock = 0;
1818
int numTimesWasmWorkerAcquiredLock = 0;
1919

20+
void do_exit() {
21+
emscripten_out("do_exit");
22+
emscripten_terminate_all_wasm_workers();
23+
emscripten_force_exit(0);
24+
}
25+
2026
void work() {
2127
// emscripten_out("work");
2228
volatile int x = sharedState0;
@@ -37,18 +43,17 @@ void work() {
3743
sharedState0 = x;
3844
} else {
3945
y = x + 1;
40-
if (emscripten_current_thread_is_wasm_worker())
46+
if (emscripten_current_thread_is_wasm_worker()) {
4147
emscripten_wasm_worker_sleep(/*nsecs=*/(rand()%100000));
48+
}
4249
sharedState1 = y;
4350

4451
if (y > 100 && numTimesMainThreadAcquiredLock && numTimesWasmWorkerAcquiredLock) {
4552
if (!testFinished) {
53+
testFinished = true;
4654
emscripten_out("test finished");
47-
#ifdef REPORT_RESULT
48-
REPORT_RESULT(0);
49-
#endif
55+
emscripten_wasm_worker_post_function_v(EMSCRIPTEN_WASM_WORKER_ID_PARENT, do_exit);
5056
}
51-
testFinished = true;
5257
}
5358
}
5459
}
@@ -75,8 +80,9 @@ void schedule_work(void *userData) {
7580
// emscripten_out("sync lock acquired");
7681
work();
7782
emscripten_lock_release(&lock);
78-
if (!testFinished)
83+
if (!testFinished) {
7984
emscripten_set_timeout(schedule_work, 0, 0);
85+
}
8086
} else {
8187
emscripten_lock_async_acquire(&lock, lock_async_acquired, (void*)42, EMSCRIPTEN_WAIT_ASYNC_INFINITY);
8288
}
@@ -94,4 +100,5 @@ int main() {
94100
}
95101

96102
schedule_work(0);
103+
emscripten_exit_with_live_runtime();
97104
}

test/wasm_worker/terminate_wasm_worker.c

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include <emscripten/emscripten.h>
12
#include <emscripten/console.h>
23
#include <emscripten/em_asm.h>
34
#include <emscripten/em_js.h>
@@ -13,17 +14,13 @@ static volatile int worker_started = 0;
1314
void this_function_should_not_be_called(void *userData) {
1415
worker_started = -1;
1516
emscripten_err("this_function_should_not_be_called");
16-
#ifdef REPORT_RESULT
17-
REPORT_RESULT(1/*fail*/);
18-
#endif
17+
emscripten_force_exit(1);
1918
}
2019

2120
void test_passed(void *userData) {
2221
if (worker_started == 1) {
2322
emscripten_err("test_passed");
24-
#ifdef REPORT_RESULT
25-
REPORT_RESULT(0/*ok*/);
26-
#endif
23+
emscripten_force_exit(0);
2724
}
2825
}
2926

0 commit comments

Comments
 (0)