Skip to content

Commit 0c216e8

Browse files
purplesyringanotgull
authored andcommitted
Ignore poisoning of active
Closes smol-rs#135. This enables the executor to be used in presence of panics in user callbacks, such as the iterator and `impl Extend` in `spawn_many`. Mutex poisoning is more of a lint than a safety requirement, as containers (such as `Slab`) and wakers have to be sound in presence of panics anyway. In this particular case, the exact behavior of `active` is not relied upon for soundness.
1 parent 9335b7e commit 0c216e8

File tree

2 files changed

+18
-10
lines changed

2 files changed

+18
-10
lines changed

src/lib.rs

+15-10
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use std::marker::PhantomData;
4444
use std::panic::{RefUnwindSafe, UnwindSafe};
4545
use std::rc::Rc;
4646
use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
47-
use std::sync::{Arc, Mutex, RwLock, TryLockError};
47+
use std::sync::{Arc, Mutex, MutexGuard, RwLock, TryLockError};
4848
use std::task::{Poll, Waker};
4949

5050
use async_task::{Builder, Runnable};
@@ -143,7 +143,7 @@ impl<'a> Executor<'a> {
143143
/// assert!(ex.is_empty());
144144
/// ```
145145
pub fn is_empty(&self) -> bool {
146-
self.state().active.lock().unwrap().is_empty()
146+
self.state().active().is_empty()
147147
}
148148

149149
/// Spawns a task onto the executor.
@@ -160,7 +160,7 @@ impl<'a> Executor<'a> {
160160
/// });
161161
/// ```
162162
pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
163-
let mut active = self.state().active.lock().unwrap();
163+
let mut active = self.state().active();
164164

165165
// SAFETY: `T` and the future are `Send`.
166166
unsafe { self.spawn_inner(future, &mut active) }
@@ -211,7 +211,7 @@ impl<'a> Executor<'a> {
211211
futures: impl IntoIterator<Item = F>,
212212
handles: &mut impl Extend<Task<F::Output>>,
213213
) {
214-
let mut active = Some(self.state().active.lock().unwrap());
214+
let mut active = Some(self.state().active());
215215

216216
// Convert the futures into tasks.
217217
let tasks = futures.into_iter().enumerate().map(move |(i, future)| {
@@ -221,7 +221,7 @@ impl<'a> Executor<'a> {
221221
// Yield the lock every once in a while to ease contention.
222222
if i.wrapping_sub(1) % 500 == 0 {
223223
drop(active.take());
224-
active = Some(self.state().active.lock().unwrap());
224+
active = Some(self.state().active());
225225
}
226226

227227
task
@@ -246,7 +246,7 @@ impl<'a> Executor<'a> {
246246
let index = entry.key();
247247
let state = self.state_as_arc();
248248
let future = async move {
249-
let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
249+
let _guard = CallOnDrop(move || drop(state.active().try_remove(index)));
250250
future.await
251251
};
252252

@@ -415,7 +415,7 @@ impl Drop for Executor<'_> {
415415
// via Arc::into_raw in state_ptr.
416416
let state = unsafe { Arc::from_raw(ptr) };
417417

418-
let mut active = state.active.lock().unwrap_or_else(|e| e.into_inner());
418+
let mut active = state.active();
419419
for w in active.drain() {
420420
w.wake();
421421
}
@@ -517,7 +517,7 @@ impl<'a> LocalExecutor<'a> {
517517
/// });
518518
/// ```
519519
pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
520-
let mut active = self.inner().state().active.lock().unwrap();
520+
let mut active = self.inner().state().active();
521521

522522
// SAFETY: This executor is not thread safe, so the future and its result
523523
// cannot be sent to another thread.
@@ -569,7 +569,7 @@ impl<'a> LocalExecutor<'a> {
569569
futures: impl IntoIterator<Item = F>,
570570
handles: &mut impl Extend<Task<F::Output>>,
571571
) {
572-
let mut active = self.inner().state().active.lock().unwrap();
572+
let mut active = self.inner().state().active();
573573

574574
// Convert all of the futures to tasks.
575575
let tasks = futures.into_iter().map(|future| {
@@ -694,6 +694,11 @@ impl State {
694694
}
695695
}
696696

697+
/// Returns a reference to currently active tasks.
698+
fn active(&self) -> MutexGuard<'_, Slab<Waker>> {
699+
self.active.lock().unwrap_or_else(|e| e.into_inner())
700+
}
701+
697702
/// Notifies a sleeping ticker.
698703
#[inline]
699704
fn notify(&self) {
@@ -1099,7 +1104,7 @@ fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Re
10991104
match self.0.try_lock() {
11001105
Ok(lock) => fmt::Debug::fmt(&lock.len(), f),
11011106
Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
1102-
Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
1107+
Err(TryLockError::Poisoned(err)) => fmt::Debug::fmt(&err.into_inner().len(), f),
11031108
}
11041109
}
11051110
}

tests/drop.rs

+3
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,9 @@ fn iterator_panics_mid_run() {
133133
)
134134
});
135135
assert!(panic.is_err());
136+
137+
let task = ex.spawn(future::ready(0));
138+
assert_eq!(future::block_on(ex.run(task)), 0);
136139
}
137140

138141
struct CallOnDrop<F: Fn()>(F);

0 commit comments

Comments
 (0)