Skip to content

Commit 74e7fa9

Browse files
committed
Fix waker behavior when invoked before a poll finishes
It's possible for a task's waker to be invoked in the middle of a call to that task's `poll` by the executor. We had accounted for that possibility if the task called *its own* waker, but that's not good enough: the waker can escape to other threads that can invoke it before `poll` finishes (e.g., if the task blocks to acquire a lock). This change fixes the waker behavior by clarifying the semantics of a call to `wake`: a task whose waker is invoked should not be blocked *when it next returns Pending to the executor*, and should be woken if that has already happened. To do this, we introduce a new `Sleeping` state for tasks, that has the same semantics as `Blocked` but that is recognized by waker invocations, which will only unblock a task in `Sleeping` state. This also removes the special case "woken by self" behavior -- being woken by *any* thread should be enough to trigger this sleep logic.
1 parent 31f169d commit 74e7fa9

File tree

6 files changed

+155
-50
lines changed

6 files changed

+155
-50
lines changed

src/asynch.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,7 @@ pub fn block_on<F: Future>(future: F) -> F::Output {
148148
match future.as_mut().poll(cx) {
149149
Poll::Ready(result) => break result,
150150
Poll::Pending => {
151-
ExecutionState::with(|state| {
152-
state.current_mut().block_unless_self_woken();
153-
});
151+
ExecutionState::with(|state| state.current_mut().sleep_unless_woken());
154152
}
155153
}
156154

src/runtime/execution.rs

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::runtime::failure::{init_panic_hook, persist_failure, persist_task_failure};
22
use crate::runtime::task::clock::VectorClock;
3-
use crate::runtime::task::{Task, TaskId, TaskState, DEFAULT_INLINE_TASKS};
3+
use crate::runtime::task::{Task, TaskId, DEFAULT_INLINE_TASKS};
44
use crate::runtime::thread::continuation::PooledContinuation;
55
use crate::scheduler::{Schedule, Scheduler};
66
use crate::{Config, MaxSteps};
@@ -61,7 +61,12 @@ impl Execution {
6161

6262
EXECUTION_STATE.set(&state, move || {
6363
// Spawn `f` as the first task
64-
ExecutionState::spawn_thread(f, config.stack_size, None, Some(VectorClock::new()));
64+
ExecutionState::spawn_thread(
65+
f,
66+
config.stack_size,
67+
Some("main-thread".to_string()),
68+
Some(VectorClock::new()),
69+
);
6570

6671
// Run the test to completion
6772
while self.step(config) {}
@@ -92,21 +97,29 @@ impl Execution {
9297
NextStep::Task(Rc::clone(&task.continuation))
9398
}
9499
ScheduledTask::Finished => {
95-
let task_states = state
96-
.tasks
97-
.iter()
98-
.map(|t| (t.id, t.state, t.detached))
99-
.collect::<SmallVec<[_; DEFAULT_INLINE_TASKS]>>();
100-
if task_states
101-
.iter()
102-
.any(|(_, s, detached)| !detached && *s == TaskState::Blocked)
103-
{
100+
// The scheduler decided we're finished, so there are either no runnable tasks,
101+
// or all runnable tasks are detached and there are no unfinished attached
102+
// tasks. Therefore, it's a deadlock if there are unfinished attached tasks.
103+
if state.tasks.iter().any(|t| !t.finished() && !t.detached) {
104+
let blocked_tasks = state
105+
.tasks
106+
.iter()
107+
.filter(|t| !t.finished())
108+
.map(|t| {
109+
format!(
110+
"{} (task {}{}{})",
111+
t.name().unwrap_or_else(|| "<unknown>".to_string()),
112+
t.id().0,
113+
if t.detached { ", detached" } else { "" },
114+
if t.sleeping() { ", pending future" } else { "" },
115+
)
116+
})
117+
.collect::<Vec<_>>();
104118
NextStep::Failure(
105-
format!("deadlock! runnable tasks: {:?}", task_states),
119+
format!("deadlock! blocked tasks: [{}]", blocked_tasks.join(", ")),
106120
state.current_schedule.clone(),
107121
)
108122
} else {
109-
debug_assert!(state.tasks.iter().all(|t| t.detached || t.finished()));
110123
NextStep::Finished
111124
}
112125
}
@@ -485,21 +498,21 @@ impl ExecutionState {
485498
_ => {}
486499
}
487500

488-
let mut blocked_attached = false;
501+
let mut unfinished_attached = false;
489502
let runnable = self
490503
.tasks
491504
.iter()
492-
.inspect(|t| blocked_attached = blocked_attached || (t.blocked() && !t.detached))
505+
.inspect(|t| unfinished_attached = unfinished_attached || (!t.finished() && !t.detached))
493506
.filter(|t| t.runnable())
494507
.map(|t| t.id)
495508
.collect::<SmallVec<[_; DEFAULT_INLINE_TASKS]>>();
496509

497510
// We should finish execution when either
498511
// (1) There are no runnable tasks, or
499-
// (2) All runnable tasks have been detached AND there are no blocked attached tasks
500-
// If there are some blocked attached tasks and all runnable tasks are detached,
501-
// we must run some detached task so that blocked attached tasks may become unblocked.
502-
if runnable.is_empty() || (!blocked_attached && runnable.iter().all(|id| self.get(*id).detached)) {
512+
// (2) All runnable tasks have been detached AND there are no unfinished attached tasks
513+
// If there are some unfinished attached tasks and all runnable tasks are detached, we must
514+
// run some detached task to give them a chance to unblock some unfinished attached task.
515+
if runnable.is_empty() || (!unfinished_attached && runnable.iter().all(|id| self.get(*id).detached)) {
503516
self.next_task = ScheduledTask::Finished;
504517
return Ok(());
505518
}

src/runtime/task/mod.rs

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ pub(crate) const DEFAULT_INLINE_TASKS: usize = 16;
3939

4040
/// A `Task` represents a user-level unit of concurrency. Each task has an `id` that is unique within
4141
/// the execution, and a `state` reflecting whether the task is runnable (enabled) or not.
42+
#[derive(Debug)]
4243
pub(crate) struct Task {
4344
pub(super) id: TaskId,
4445
pub(super) state: TaskState,
@@ -51,8 +52,8 @@ pub(crate) struct Task {
5152
waiter: Option<TaskId>,
5253

5354
waker: Waker,
54-
// Remember whether the waker was invoked while we were running so we don't re-block
55-
woken_by_self: bool,
55+
// Remember whether the waker was invoked while we were running
56+
woken: bool,
5657

5758
name: Option<String>,
5859

@@ -77,7 +78,7 @@ impl Task {
7778
clock,
7879
waiter: None,
7980
waker,
80-
woken_by_self: false,
81+
woken: false,
8182
detached: false,
8283
name,
8384
local_storage: LocalMap::new(),
@@ -107,10 +108,7 @@ impl Task {
107108
let waker = ExecutionState::with(|state| state.current_mut().waker());
108109
let cx = &mut Context::from_waker(&waker);
109110
while future.as_mut().poll(cx).is_pending() {
110-
ExecutionState::with(|state| {
111-
// We need to block before thread::switch() unless we woke ourselves up
112-
state.current_mut().block_unless_self_woken();
113-
});
111+
ExecutionState::with(|state| state.current_mut().sleep_unless_woken());
114112
thread::switch();
115113
}
116114
},
@@ -133,6 +131,10 @@ impl Task {
133131
self.state == TaskState::Blocked
134132
}
135133

134+
pub(crate) fn sleeping(&self) -> bool {
135+
self.state == TaskState::Sleeping
136+
}
137+
136138
pub(crate) fn finished(&self) -> bool {
137139
self.state == TaskState::Finished
138140
}
@@ -150,6 +152,11 @@ impl Task {
150152
self.state = TaskState::Blocked;
151153
}
152154

155+
pub(crate) fn sleep(&mut self) {
156+
assert!(self.state != TaskState::Finished);
157+
self.state = TaskState::Sleeping;
158+
}
159+
153160
pub(crate) fn unblock(&mut self) {
154161
// Note we don't assert the task is blocked here. For example, a task invoking its own waker
155162
// will not be blocked when this is called.
@@ -162,23 +169,25 @@ impl Task {
162169
self.state = TaskState::Finished;
163170
}
164171

165-
/// Potentially block this task after it was polled by the executor.
172+
/// Potentially put this task to sleep after it was polled by the executor, unless someone has
173+
/// called its waker first.
166174
///
167-
/// A synchronous Task should never call this, because we want threads to be
168-
/// enabled-by-default to avoid bugs where Shuttle incorrectly omits a potential execution.
169-
/// We also need to handle a special case where a task invoked its own waker, in which case
170-
/// we should not block the task.
171-
pub(crate) fn block_unless_self_woken(&mut self) {
172-
let was_woken_by_self = std::mem::replace(&mut self.woken_by_self, false);
173-
if !was_woken_by_self {
174-
self.block();
175+
/// A synchronous Task should never call this, because we want threads to be enabled-by-default
176+
/// to avoid bugs where Shuttle incorrectly omits a potential execution.
177+
pub(crate) fn sleep_unless_woken(&mut self) {
178+
let was_woken = std::mem::replace(&mut self.woken, false);
179+
if !was_woken {
180+
self.sleep();
175181
}
176182
}
177183

178-
/// Remember that we have been unblocked while we were currently running, and therefore should
179-
/// not be blocked again by `block_unless_self_woken`.
180-
pub(super) fn set_woken_by_self(&mut self) {
181-
self.woken_by_self = true;
184+
/// Remember that our waker has been called, and so we should not block the next time the
185+
/// executor tries to put us to sleep.
186+
pub(super) fn wake(&mut self) {
187+
self.woken = true;
188+
if self.state == TaskState::Sleeping {
189+
self.unblock();
190+
}
182191
}
183192

184193
/// Register a waiter for this thread to terminate. Returns a boolean indicating whether the
@@ -241,8 +250,13 @@ impl Task {
241250

242251
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
243252
pub(crate) enum TaskState {
253+
/// Available to be scheduled
244254
Runnable,
255+
/// Blocked in a synchronization operation
245256
Blocked,
257+
/// A `Future` that returned `Pending` is waiting to be woken up
258+
Sleeping,
259+
/// Task has finished
246260
Finished,
247261
}
248262

@@ -332,6 +346,7 @@ impl LocalKeyId {
332346
/// Values are Option<_> because we need to be able to incrementally destruct them, as it's valid
333347
/// for TLS destructors to initialize new TLS slots. When a slot is destructed, its key is removed
334348
/// from `order` and its value is replaced with None.
349+
#[derive(Debug)]
335350
struct LocalMap {
336351
locals: HashMap<LocalKeyId, Option<Box<dyn Any>>>,
337352
order: VecDeque<LocalKeyId>,

src/runtime/task/waker.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,7 @@ unsafe fn raw_waker_wake(data: *const ()) {
3838
return;
3939
}
4040

41-
waiter.unblock();
42-
43-
let current = state.current_mut();
44-
if current.id() == task_id {
45-
current.set_woken_by_self();
46-
}
41+
waiter.wake();
4742
});
4843
}
4944

src/runtime/thread/continuation.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,12 @@ impl DerefMut for PooledContinuation {
247247
}
248248
}
249249

250+
impl std::fmt::Debug for PooledContinuation {
251+
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
252+
f.debug_struct("PooledContinuation").finish()
253+
}
254+
}
255+
250256
// Safety: these aren't sent across real threads
251257
unsafe impl Send for PooledContinuation {}
252258

tests/asynch/waker.rs

Lines changed: 80 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1+
use futures::future::poll_fn;
2+
use shuttle::sync::atomic::{AtomicBool, Ordering};
3+
use shuttle::sync::Mutex;
4+
use shuttle::{asynch, check_dfs, thread};
15
use std::future::Future;
26
use std::pin::Pin;
7+
use std::sync::Arc;
38
use std::task::{Context, Poll, Waker};
4-
5-
use shuttle::{asynch, check_dfs};
69
use test_env_log::test;
710

811
#[test]
@@ -49,3 +52,78 @@ fn wake_after_finish() {
4952
None,
5053
)
5154
}
55+
56+
// Test that we can pass wakers across threads and have them work correctly
57+
#[test]
58+
fn wake_during_poll() {
59+
check_dfs(
60+
|| {
61+
let waker: Arc<Mutex<Option<Waker>>> = Arc::new(Mutex::new(None));
62+
let waker_clone = Arc::clone(&waker);
63+
let signal = Arc::new(AtomicBool::new(false));
64+
let signal_clone = Arc::clone(&signal);
65+
66+
// This thread might invoke `wake` before the other task finishes running a single
67+
// invocation of `poll`. If that happens, that task must not be blocked.
68+
thread::spawn(move || {
69+
signal_clone.store(true, Ordering::SeqCst);
70+
71+
if let Some(waker) = waker_clone.lock().unwrap().take() {
72+
waker.wake();
73+
}
74+
});
75+
76+
asynch::block_on(poll_fn(move |cx| {
77+
*waker.lock().unwrap() = Some(cx.waker().clone());
78+
79+
if signal.load(Ordering::SeqCst) {
80+
Poll::Ready(())
81+
} else {
82+
Poll::Pending
83+
}
84+
}));
85+
},
86+
None,
87+
);
88+
}
89+
90+
// Test that a waker invocation doesn't unblock a task that is blocked due to synchronization
91+
// operations
92+
#[test]
93+
fn wake_during_blocked_poll() {
94+
check_dfs(
95+
|| {
96+
let waker: Arc<Mutex<Option<Waker>>> = Arc::new(Mutex::new(None));
97+
let waker_clone = Arc::clone(&waker);
98+
let counter = Arc::new(Mutex::new(0));
99+
let counter_clone = Arc::clone(&counter);
100+
101+
thread::spawn(move || {
102+
let mut counter = counter_clone.lock().unwrap();
103+
thread::yield_now();
104+
*counter += 1;
105+
});
106+
107+
// If this `wake()` invocation happens while the thread above holds the `counter` lock
108+
// and the `block_on` task below is blocked waiting to acquire that same lock, then
109+
// `wake` must not unblock the `block_on` task. That is, `wake` should prevent the task
110+
// from being blocked *the next time it returns Pending*, not just any time it is
111+
// blocked.
112+
thread::spawn(move || {
113+
if let Some(waker) = waker_clone.lock().unwrap().take() {
114+
waker.wake();
115+
}
116+
});
117+
118+
asynch::block_on(poll_fn(move |cx| {
119+
*waker.lock().unwrap() = Some(cx.waker().clone());
120+
121+
let mut counter = counter.lock().unwrap();
122+
*counter += 1;
123+
124+
Poll::Ready(())
125+
}));
126+
},
127+
None,
128+
);
129+
}

0 commit comments

Comments
 (0)