Skip to content

Commit 65105ff

Browse files
authored
Delayed queuing of spawn_local tasks (#2854)
* add test case * delay scheduling of tasks spawned inside other tasks to the next tick * fix multithreading * no need for first_run flag * Use a single queue, always schedule for future ticks * pin wasmprinter
1 parent 1ceec1a commit 65105ff

File tree

5 files changed

+73
-25
lines changed

5 files changed

+73
-25
lines changed

crates/cli/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ diff = "0.1"
3535
predicates = "1.0.0"
3636
rayon = "1.0"
3737
tempfile = "3.0"
38+
wasmprinter = "0.2, <=0.2.33" # pinned for wit-printer
3839
wit-printer = "0.2"
3940
wit-text = "0.8"
4041
wit-validator = "0.2"

crates/futures/src/queue.rs

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,27 @@ use std::rc::Rc;
55
use wasm_bindgen::prelude::*;
66

77
struct QueueState {
8-
// The queue of Tasks which will be run in order. In practice this is all the
8+
// The queue of Tasks which are to be run in order. In practice this is all the
99
// synchronous work of futures, and each `Task` represents calling `poll` on
10-
// a future "at the right time"
10+
// a future "at the right time".
1111
tasks: RefCell<VecDeque<Rc<crate::task::Task>>>,
1212

13-
// This flag indicates whether we're currently executing inside of
14-
// `run_all` or have scheduled `run_all` to run in the future. This is
15-
// used to ensure that it's only scheduled once.
16-
is_spinning: Cell<bool>,
13+
// This flag indicates whether we've scheduled `run_all` to run in the future.
14+
// This is used to ensure that it's only scheduled once.
15+
is_scheduled: Cell<bool>,
1716
}
1817

1918
impl QueueState {
2019
fn run_all(&self) {
21-
debug_assert!(self.is_spinning.get());
20+
// "consume" the schedule
21+
let _was_scheduled = self.is_scheduled.replace(false);
22+
debug_assert!(_was_scheduled);
2223

23-
// Runs all Tasks until empty. This blocks the event loop if a Future is
24-
// stuck in an infinite loop, so we may want to yield back to the main
25-
// event loop occasionally. For now though greedy execution should get
26-
// the job done.
27-
loop {
24+
// Stop when all tasks that have been scheduled before this tick have been run.
25+
// Tasks that are scheduled while running tasks will run on the next tick.
26+
let mut task_count_left = self.tasks.borrow().len();
27+
while task_count_left > 0 {
28+
task_count_left -= 1;
2829
let task = match self.tasks.borrow_mut().pop_front() {
2930
Some(task) => task,
3031
None => break,
@@ -34,7 +35,6 @@ impl QueueState {
3435

3536
// All of the Tasks have been run, so it's now possible to schedule the
3637
// next tick again
37-
self.is_spinning.set(false);
3838
}
3939
}
4040

@@ -45,26 +45,28 @@ pub(crate) struct Queue {
4545
}
4646

4747
impl Queue {
48-
pub(crate) fn push_task(&self, task: Rc<crate::task::Task>) {
48+
// Schedule a task to run on the next tick
49+
pub(crate) fn schedule_task(&self, task: Rc<crate::task::Task>) {
4950
self.state.tasks.borrow_mut().push_back(task);
50-
51-
// If we're already inside the `run_all` loop then that'll pick up the
52-
// task we just enqueued. If we're not in `run_all`, though, then we need
53-
// to schedule a microtask.
54-
//
5551
// Note that we currently use a promise and a closure to do this, but
5652
// eventually we should probably use something like `queueMicrotask`:
5753
// https://developer.mozilla.org/en-US/docs/Web/API/WindowOrWorkerGlobalScope/queueMicrotask
58-
if !self.state.is_spinning.replace(true) {
54+
if !self.state.is_scheduled.replace(true) {
5955
let _ = self.promise.then(&self.closure);
6056
}
6157
}
58+
// Append a task to the currently running queue, or schedule it
59+
pub(crate) fn push_task(&self, task: Rc<crate::task::Task>) {
60+
// It would make sense to run this task on the same tick. For now, we
61+
// make the simplifying choice of always scheduling tasks for a future tick.
62+
self.schedule_task(task)
63+
}
6264
}
6365

6466
impl Queue {
6567
fn new() -> Self {
6668
let state = Rc::new(QueueState {
67-
is_spinning: Cell::new(false),
69+
is_scheduled: Cell::new(false),
6870
tasks: RefCell::new(VecDeque::new()),
6971
});
7072

crates/futures/src/task/multithread.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ impl Task {
102102
*this.inner.borrow_mut() = Some(Inner { future, closure });
103103

104104
// Queue up the Future's work to happen on the next microtask tick.
105-
crate::queue::QUEUE.with(move |queue| queue.push_task(this));
105+
crate::queue::QUEUE.with(move |queue| queue.schedule_task(this));
106106
}
107107

108108
pub(crate) fn run(&self) {

crates/futures/src/task/singlethread.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,14 @@ impl Task {
2525
pub(crate) fn spawn(future: Pin<Box<dyn Future<Output = ()> + 'static>>) {
2626
let this = Rc::new(Self {
2727
inner: RefCell::new(None),
28-
is_queued: Cell::new(false),
28+
is_queued: Cell::new(true),
2929
});
3030

3131
let waker = unsafe { Waker::from_raw(Task::into_raw_waker(Rc::clone(&this))) };
3232

3333
*this.inner.borrow_mut() = Some(Inner { future, waker });
3434

35-
Task::wake_by_ref(&this);
35+
crate::queue::QUEUE.with(|queue| queue.schedule_task(this));
3636
}
3737

3838
fn wake_by_ref(this: &Rc<Self>) {

crates/futures/tests/tests.rs

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
44

55
use futures_channel::oneshot;
6-
use wasm_bindgen::prelude::*;
6+
use js_sys::Promise;
7+
use std::ops::FnMut;
8+
use wasm_bindgen::{prelude::*, JsValue};
79
use wasm_bindgen_futures::{future_to_promise, spawn_local, JsFuture};
810
use wasm_bindgen_test::*;
911

@@ -68,6 +70,49 @@ async fn spawn_local_runs() {
6870
assert_eq!(rx.await.unwrap(), 42);
6971
}
7072

73+
#[wasm_bindgen_test]
74+
async fn spawn_local_nested() {
75+
let (ta, mut ra) = oneshot::channel::<u32>();
76+
let (ts, rs) = oneshot::channel::<u32>();
77+
let (tx, rx) = oneshot::channel::<u32>();
78+
// The order in which the various promises and tasks run is important!
79+
// We want, on different ticks each, the following things to happen
80+
// 1. A promise resolves, off of which we can spawn our inbetween assertion
81+
// 2. The outer task runs, spawns in the inner task, and the inbetween promise, then yields
82+
// 3. The inbetween promise runs and asserts that the inner task hasn't run
83+
// 4. The inner task runs
84+
// This depends crucially on two facts:
85+
// - JsFuture schedules on ticks independently from tasks
86+
// - The order of ticks is the same as the code flow
87+
let promise = Promise::resolve(&JsValue::null());
88+
89+
spawn_local(async move {
90+
// Create a closure that runs in between the two ticks and
91+
// assert that the inner task hasn't run yet
92+
let inbetween = Closure::wrap(Box::new(move |_| {
93+
assert_eq!(
94+
ra.try_recv().unwrap(),
95+
None,
96+
"Nested task should not have run yet"
97+
);
98+
}) as Box<dyn FnMut(JsValue)>);
99+
let inbetween = promise.then(&inbetween);
100+
spawn_local(async {
101+
ta.send(0xdead).unwrap();
102+
ts.send(0xbeaf).unwrap();
103+
});
104+
JsFuture::from(inbetween).await.unwrap();
105+
assert_eq!(
106+
rs.await.unwrap(),
107+
0xbeaf,
108+
"Nested task should run eventually"
109+
);
110+
tx.send(42).unwrap();
111+
});
112+
113+
assert_eq!(rx.await.unwrap(), 42);
114+
}
115+
71116
#[wasm_bindgen_test]
72117
async fn spawn_local_err_no_exception() {
73118
let (tx, rx) = oneshot::channel::<u32>();

0 commit comments

Comments
 (0)