Skip to content

Commit efacd8b

Browse files
authored
Merge pull request #1514 from ibaryshnikov/threadsafe-futures
Threadsafe futures
2 parents 55b486a + b13f757 commit efacd8b

File tree

11 files changed

+654
-412
lines changed

11 files changed

+654
-412
lines changed

crates/futures/Cargo.toml

+9
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,22 @@ version = "0.3.25"
1111
edition = "2018"
1212

1313
[dependencies]
14+
cfg-if = "0.1.9"
1415
futures = "0.1.20"
1516
js-sys = { path = "../js-sys", version = '0.3.25' }
1617
wasm-bindgen = { path = "../..", version = '0.2.48' }
1718
futures-util-preview = { version = "0.3.0-alpha.15", optional = true }
1819
futures-channel-preview = { version = "0.3.0-alpha.15", optional = true }
1920
lazy_static = { version = "1.3.0", optional = true }
2021

22+
[target.'cfg(target_feature = "atomics")'.dependencies.web-sys]
23+
path = "../web-sys"
24+
version = "0.3.24"
25+
features = [
26+
"MessageEvent",
27+
"Worker",
28+
]
29+
2130
[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
2231
wasm-bindgen-test = { path = '../test', version = '0.2.48' }
2332

crates/futures/src/legacy.rs

+204
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
use futures::executor::{self, Notify, Spawn};
2+
use futures::prelude::*;
3+
use js_sys::{Function, Promise};
4+
use std::cell::{Cell, RefCell};
5+
use std::rc::Rc;
6+
use std::sync::Arc;
7+
use wasm_bindgen::prelude::*;
8+
9+
/// Converts a Rust `Future` into a JavaScript `Promise`.
10+
///
11+
/// This function will take any future in Rust and schedule it to be executed,
12+
/// returning a JavaScript `Promise` which can then be passed back to JavaScript
13+
/// to get plumbed into the rest of a system.
14+
///
15+
/// The `future` provided must adhere to `'static` because it'll be scheduled
16+
/// to run in the background and cannot contain any stack references. The
17+
/// returned `Promise` will be resolved or rejected when the future completes,
18+
/// depending on whether it finishes with `Ok` or `Err`.
19+
///
20+
/// # Panics
21+
///
22+
/// Note that in wasm panics are currently translated to aborts, but "abort" in
23+
/// this case means that a JavaScript exception is thrown. The wasm module is
24+
/// still usable (likely erroneously) after Rust panics.
25+
///
26+
/// If the `future` provided panics then the returned `Promise` **will not
27+
/// resolve**. Instead it will be a leaked promise. This is an unfortunate
28+
/// limitation of wasm currently that's hoped to be fixed one day!
29+
pub fn future_to_promise<F>(future: F) -> Promise
30+
where
31+
F: Future<Item = JsValue, Error = JsValue> + 'static,
32+
{
33+
_future_to_promise(Box::new(future))
34+
}
35+
36+
// Implementation of actually transforming a future into a JavaScript `Promise`.
37+
//
38+
// The only primitive we have to work with here is `Promise::new`, which gives
39+
// us two callbacks that we can use to either reject or resolve the promise.
40+
// It's our job to ensure that one of those callbacks is called at the
41+
// appropriate time.
42+
//
43+
// Now we know that JavaScript (in general) can't block and is largely
44+
// notification/callback driven. That means that our future must either have
45+
// synchronous computational work to do, or it's "scheduled a notification" to
46+
// happen. These notifications are likely callbacks to get executed when things
47+
// finish (like a different promise or something like `setTimeout`). The general
48+
// idea here is thus to do as much synchronous work as we can and then otherwise
49+
// translate notifications of a future's task into "let's poll the future!"
50+
//
51+
// This isn't necessarily the greatest future executor in the world, but it
52+
// should get the job done for now hopefully.
53+
fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>) -> Promise {
54+
let mut future = Some(executor::spawn(future));
55+
return Promise::new(&mut |resolve, reject| {
56+
Package::poll(&Arc::new(Package {
57+
spawn: RefCell::new(future.take().unwrap()),
58+
resolve,
59+
reject,
60+
notified: Cell::new(State::Notified),
61+
}));
62+
});
63+
64+
struct Package {
65+
// Our "spawned future". This'll have everything we need to poll the
66+
// future and continue to move it forward.
67+
spawn: RefCell<Spawn<Box<dyn Future<Item = JsValue, Error = JsValue>>>>,
68+
69+
// The current state of this future, expressed in an enum below. This
70+
// indicates whether we're currently polling the future, received a
71+
// notification and need to keep polling, or if we're waiting for a
72+
// notification to come in (and no one is polling).
73+
notified: Cell<State>,
74+
75+
// Our two callbacks connected to the `Promise` that we returned to
76+
// JavaScript. We'll be invoking one of these at the end.
77+
resolve: Function,
78+
reject: Function,
79+
}
80+
81+
// The possible states our `Package` (future) can be in, tracked internally
82+
// and used to guide what happens when polling a future.
83+
enum State {
84+
// This future is currently and actively being polled. Attempting to
85+
// access the future will result in a runtime panic and is considered a
86+
// bug.
87+
Polling,
88+
89+
// This future has been notified, while it was being polled. This marker
90+
// is used in the `Notify` implementation below, and indicates that a
91+
// notification was received that the future is ready to make progress.
92+
// If seen, however, it probably means that the future is also currently
93+
// being polled.
94+
Notified,
95+
96+
// The future is blocked, waiting for something to happen. Stored here
97+
// is a self-reference to the future itself so we can pull it out in
98+
// `Notify` and continue polling.
99+
//
100+
// Note that the self-reference here is an Arc-cycle that will leak
101+
// memory unless the future completes, but currently that should be ok
102+
// as we'll have to stick around anyway while the future is executing!
103+
//
104+
// This state is removed as soon as a notification comes in, so the leak
105+
// should only be "temporary"
106+
Waiting(Arc<Package>),
107+
}
108+
109+
// No shared memory right now, wasm is single threaded, no need to worry
110+
// about this!
111+
unsafe impl Send for Package {}
112+
unsafe impl Sync for Package {}
113+
114+
impl Package {
115+
// Move the future contained in `me` as far forward as we can. This will
116+
// do as much synchronous work as possible to complete the future,
117+
// ensuring that when it blocks we're scheduled to get notified via some
118+
// callback somewhere at some point (vague, right?)
119+
//
120+
// TODO: this probably shouldn't do as much synchronous work as possible
121+
// as it can starve other computations. Rather it should instead
122+
// yield every so often with something like `setTimeout` with the
123+
// timeout set to zero.
124+
fn poll(me: &Arc<Package>) {
125+
loop {
126+
match me.notified.replace(State::Polling) {
127+
// We received a notification while previously polling, or
128+
// this is the initial poll. We've got work to do below!
129+
State::Notified => {}
130+
131+
// We've gone through this loop once and no notification was
132+
// received while we were executing work. That means we got
133+
// `NotReady` below and we're scheduled to receive a
134+
// notification. Block ourselves and wait for later.
135+
//
136+
// When the notification comes in it'll notify our task, see
137+
// our `Waiting` state, and resume the polling process
138+
State::Polling => {
139+
me.notified.set(State::Waiting(me.clone()));
140+
break;
141+
}
142+
143+
State::Waiting(_) => panic!("shouldn't see waiting state!"),
144+
}
145+
146+
let (val, f) = match me.spawn.borrow_mut().poll_future_notify(me, 0) {
147+
// If the future is ready, immediately call the
148+
// resolve/reject callback and then return as we're done.
149+
Ok(Async::Ready(value)) => (value, &me.resolve),
150+
Err(value) => (value, &me.reject),
151+
152+
// Otherwise keep going in our loop, if we weren't notified
153+
// we'll break out and start waiting.
154+
Ok(Async::NotReady) => continue,
155+
};
156+
157+
drop(f.call1(&JsValue::undefined(), &val));
158+
break;
159+
}
160+
}
161+
}
162+
163+
impl Notify for Package {
164+
fn notify(&self, _id: usize) {
165+
let me = match self.notified.replace(State::Notified) {
166+
// we need to schedule polling to resume, so keep going
167+
State::Waiting(me) => me,
168+
169+
// we were already notified, and were just notified again;
170+
// having now coalesced the notifications we return as it's
171+
// still someone else's job to process this
172+
State::Notified => return,
173+
174+
// the future was previously being polled, and we've just
175+
// switched it to the "you're notified" state. We don't have
176+
// access to the future as it's being polled, so the future
177+
// polling process later sees this notification and will
178+
// continue polling. For us, though, there's nothing else to do,
179+
// so we bail out.
180+
// later see
181+
State::Polling => return,
182+
};
183+
184+
// Use `Promise.then` on a resolved promise to place our execution
185+
// onto the next turn of the microtask queue, enqueueing our poll
186+
// operation. We don't currently poll immediately as it turns out
187+
// `futures` crate adapters aren't compatible with it and it also
188+
// helps avoid blowing the stack by accident.
189+
//
190+
// Note that the `Rc`/`RefCell` trick here is basically to just
191+
// ensure that our `Closure` gets cleaned up appropriately.
192+
let promise = Promise::resolve(&JsValue::undefined());
193+
let slot = Rc::new(RefCell::new(None));
194+
let slot2 = slot.clone();
195+
let closure = Closure::wrap(Box::new(move |_| {
196+
let myself = slot2.borrow_mut().take();
197+
debug_assert!(myself.is_some());
198+
Package::poll(&me);
199+
}) as Box<dyn FnMut(JsValue)>);
200+
promise.then(&closure);
201+
*slot.borrow_mut() = Some(closure);
202+
}
203+
}
204+
}

crates/futures/src/legacy_atomics.rs

+166
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
use futures::executor::{self, Notify, Spawn};
2+
use futures::prelude::*;
3+
use js_sys::Function;
4+
use std::sync::atomic::{AtomicI32, Ordering};
5+
use std::sync::Arc;
6+
use wasm_bindgen::prelude::*;
7+
use wasm_bindgen::JsCast;
8+
9+
// Duplicate a bit here because `then` takes a `JsValue` instead of a `Closure`.
10+
#[wasm_bindgen]
11+
extern "C" {
12+
type Promise;
13+
#[wasm_bindgen(method)]
14+
fn then(this: &Promise, cb: &JsValue) -> Promise;
15+
16+
type Atomics;
17+
#[wasm_bindgen(static_method_of = Atomics, js_name = waitAsync)]
18+
fn wait_async(buf: &JsValue, index: i32, value: i32) -> js_sys::Promise;
19+
#[wasm_bindgen(static_method_of = Atomics, js_name = waitAsync, getter)]
20+
fn get_wait_async() -> JsValue;
21+
}
22+
23+
/// Converts a Rust `Future` into a JavaScript `Promise`.
24+
///
25+
/// This function will take any future in Rust and schedule it to be executed,
26+
/// returning a JavaScript `Promise` which can then be passed back to JavaScript
27+
/// to get plumbed into the rest of a system.
28+
///
29+
/// The `future` provided must adhere to `'static` because it'll be scheduled
30+
/// to run in the background and cannot contain any stack references. The
31+
/// returned `Promise` will be resolved or rejected when the future completes,
32+
/// depending on whether it finishes with `Ok` or `Err`.
33+
///
34+
/// # Panics
35+
///
36+
/// Note that in wasm panics are currently translated to aborts, but "abort" in
37+
/// this case means that a JavaScript exception is thrown. The wasm module is
38+
/// still usable (likely erroneously) after Rust panics.
39+
///
40+
/// If the `future` provided panics then the returned `Promise` **will not
41+
/// resolve**. Instead it will be a leaked promise. This is an unfortunate
42+
/// limitation of wasm currently that's hoped to be fixed one day!
43+
pub fn future_to_promise<F>(future: F) -> js_sys::Promise
44+
where
45+
F: Future<Item = JsValue, Error = JsValue> + 'static,
46+
{
47+
_future_to_promise(Box::new(future))
48+
}
49+
50+
// Implementation of actually transforming a future into a JavaScript `Promise`.
51+
//
52+
// The main primitives used here are `Promise::new` to actually create a JS
53+
// promise to return as well as `Atomics.waitAsync` to create a promise that we
54+
// can asynchronously wait on. The general idea here is that we'll create a
55+
// promise to return and schedule work to happen in `Atomics.waitAsync`
56+
// callbacks.
57+
//
58+
// After we've created a promise we start polling a future, and whenever it's
59+
// not ready we'll execute `Atomics.waitAsync`. When that resolves we'll keep
60+
// polling the future, and this happens until the future is done. Finally
61+
// when it's all finished we call either resolver or reject depending on the
62+
// result of the future.
63+
fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>) -> js_sys::Promise {
64+
let mut future = Some(executor::spawn(future));
65+
return js_sys::Promise::new(&mut |resolve, reject| {
66+
Package {
67+
spawn: future.take().unwrap(),
68+
resolve,
69+
reject,
70+
waker: Arc::new(Waker {
71+
value: AtomicI32::new(1), // 1 == "notified, ready to poll"
72+
}),
73+
}
74+
.poll();
75+
});
76+
77+
struct Package {
78+
// Our "spawned future". This'll have everything we need to poll the
79+
// future and continue to move it forward.
80+
spawn: Spawn<Box<dyn Future<Item = JsValue, Error = JsValue>>>,
81+
82+
// Our two callbacks connected to the `Promise` that we returned to
83+
// JavaScript. We'll be invoking one of these at the end.
84+
resolve: Function,
85+
reject: Function,
86+
87+
// Shared state used to communicate waking up this future, this is the
88+
// `Send + Sync` piece needed by the async task system.
89+
waker: Arc<Waker>,
90+
}
91+
92+
struct Waker {
93+
value: AtomicI32,
94+
};
95+
96+
impl Notify for Waker {
97+
fn notify(&self, _id: usize) {
98+
// Attempt to notify us by storing 1. If we're already 1 then we
99+
// were previously notified and there's nothing to do. Otherwise
100+
// we execute the native `notify` instruction to wake up the
101+
// corresponding `waitAsync` that was waiting for the transition
102+
// from 0 to 1.
103+
let prev = self.value.swap(1, Ordering::SeqCst);
104+
if prev == 1 {
105+
return;
106+
}
107+
debug_assert_eq!(prev, 0);
108+
unsafe {
109+
core::arch::wasm32::atomic_notify(
110+
&self.value as *const AtomicI32 as *mut i32,
111+
1, // number of threads to notify
112+
);
113+
}
114+
}
115+
}
116+
117+
impl Package {
118+
fn poll(mut self) {
119+
// Poll in a loop waiting for the future to become ready. Note that
120+
// we probably shouldn't maximize synchronous work here but rather
121+
// we should occasionally yield back to the runtime and schedule
122+
// ourselves to resume this future later on.
123+
//
124+
// Note that 0 here means "need a notification" and 1 means "we got
125+
// a notification". That means we're storing 0 into the `notified`
126+
// slot and we're trying to read 1 to keep on going.
127+
while self.waker.value.swap(0, Ordering::SeqCst) == 1 {
128+
let (val, f) = match self.spawn.poll_future_notify(&self.waker, 0) {
129+
// If the future is ready, immediately call the
130+
// resolve/reject callback and then return as we're done.
131+
Ok(Async::Ready(value)) => (value, &self.resolve),
132+
Err(value) => (value, &self.reject),
133+
134+
// ... otherwise let's break out and wait
135+
Ok(Async::NotReady) => break,
136+
};
137+
138+
// Call the resolution function, and then when we're done
139+
// destroy ourselves through `drop` since our future is no
140+
// longer needed.
141+
drop(f.call1(&JsValue::undefined(), &val));
142+
return;
143+
}
144+
145+
// Create a `js_sys::Promise` using `Atomics.waitAsync` (or our
146+
// polyfill) and then register its completion callback as simply
147+
// calling this function again.
148+
let promise = wait_async(&self.waker.value, 0).unchecked_into::<Promise>();
149+
let closure = Closure::once_into_js(move || {
150+
self.poll();
151+
});
152+
promise.then(&closure);
153+
}
154+
}
155+
}
156+
157+
fn wait_async(ptr: &AtomicI32, val: i32) -> js_sys::Promise {
158+
// If `Atomics.waitAsync` isn't defined (as it isn't defined anywhere today)
159+
// then we use our fallback, otherwise we use the native function.
160+
if Atomics::get_wait_async().is_undefined() {
161+
crate::wait_async_polyfill::wait_async(ptr, val)
162+
} else {
163+
let mem = wasm_bindgen::memory().unchecked_into::<js_sys::WebAssembly::Memory>();
164+
Atomics::wait_async(&mem.buffer(), ptr as *const AtomicI32 as i32 / 4, val)
165+
}
166+
}

0 commit comments

Comments
 (0)