Skip to content

Commit ed54517

Browse files
committed
fixed deadlocking problem with new WaiterSet type
1 parent 4ad7ee5 commit ed54517

File tree

3 files changed

+236
-121
lines changed

3 files changed

+236
-121
lines changed

futures-util/src/lock/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ mod rwlock;
1414
pub use self::rwlock::{
1515
RwLock, RwLockReadFuture, RwLockReadGuard, RwLockWriteFuture, RwLockWriteGuard,
1616
};
17+
#[cfg(feature = "std")]
18+
pub(crate) mod waiter;
1719

1820
#[cfg(any(feature = "bilock", feature = "sink", feature = "io"))]
1921
#[cfg_attr(not(feature = "bilock"), allow(unreachable_pub))]

futures-util/src/lock/rwlock.rs

+42-121
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,18 @@
1+
use crate::lock::waiter::WaiterSet;
12
use futures_core::future::{FusedFuture, Future};
2-
use futures_core::task::{Context, Poll, Waker};
3-
use slab::Slab;
3+
use futures_core::task::{Context, Poll};
44
use std::cell::UnsafeCell;
5+
use std::fmt;
56
use std::ops::{Deref, DerefMut};
67
use std::pin::Pin;
78
use std::process;
89
use std::sync::atomic::{AtomicUsize, Ordering};
9-
use std::sync::Mutex as StdMutex;
10-
use std::{fmt, mem};
1110

1211
/// A futures-aware read-write lock.
1312
pub struct RwLock<T: ?Sized> {
1413
state: AtomicUsize,
15-
read_waiters: StdMutex<Slab<Waiter>>,
16-
write_waiters: StdMutex<Slab<Waiter>>,
14+
readers: WaiterSet,
15+
writers: WaiterSet,
1716
value: UnsafeCell<T>,
1817
}
1918

@@ -22,49 +21,24 @@ impl<T: ?Sized> fmt::Debug for RwLock<T> {
2221
let state = self.state.load(Ordering::SeqCst);
2322
f.debug_struct("RwLock")
2423
.field("is_locked", &((state & IS_LOCKED) != 0))
25-
.field("has_writers", &((state & HAS_WRITERS) != 0))
26-
.field("has_readers", &((state & HAS_READERS) != 0))
27-
.field("active_readers", &((state & READ_COUNT_MASK) >> 3))
24+
.field("active_readers", &((state & READ_COUNT) >> 1))
2825
.finish()
2926
}
3027
}
3128

32-
enum Waiter {
33-
Waiting(Waker),
34-
Woken,
35-
}
36-
37-
impl Waiter {
38-
fn register(&mut self, waker: &Waker) {
39-
match self {
40-
Waiter::Waiting(w) if waker.will_wake(w) => {}
41-
_ => *self = Waiter::Waiting(waker.clone()),
42-
}
43-
}
44-
45-
fn wake(&mut self) {
46-
match mem::replace(self, Waiter::Woken) {
47-
Waiter::Waiting(waker) => waker.wake(),
48-
Waiter::Woken => {}
49-
}
50-
}
51-
}
52-
5329
#[allow(clippy::identity_op)]
5430
const IS_LOCKED: usize = 1 << 0;
55-
const HAS_WRITERS: usize = 1 << 1;
56-
const HAS_READERS: usize = 1 << 2;
57-
const ONE_READER: usize = 1 << 3;
58-
const READ_COUNT_MASK: usize = !(ONE_READER - 1);
59-
const MAX_READERS: usize = usize::max_value() >> 3;
31+
const ONE_READER: usize = 1 << 1;
32+
const READ_COUNT: usize = !(ONE_READER - 1);
33+
const MAX_READERS: usize = usize::max_value() >> 1;
6034

6135
impl<T> RwLock<T> {
6236
/// Creates a new futures-aware read-write lock.
6337
pub fn new(t: T) -> RwLock<T> {
6438
RwLock {
6539
state: AtomicUsize::new(0),
66-
read_waiters: StdMutex::new(Slab::new()),
67-
write_waiters: StdMutex::new(Slab::new()),
40+
readers: WaiterSet::new(),
41+
writers: WaiterSet::new(),
6842
value: UnsafeCell::new(t),
6943
}
7044
}
@@ -165,40 +139,6 @@ impl<T: ?Sized> RwLock<T> {
165139
pub fn get_mut(&mut self) -> &mut T {
166140
unsafe { &mut *self.value.get() }
167141
}
168-
169-
fn remove_reader(&self, wait_key: usize) {
170-
if wait_key != WAIT_KEY_NONE {
171-
let mut readers = self.read_waiters.lock().unwrap();
172-
// No need to check whether another waiter needs to be
173-
// woken up since no other readers depend on this.
174-
readers.remove(wait_key);
175-
if readers.is_empty() {
176-
self.state.fetch_and(!HAS_READERS, Ordering::Relaxed);
177-
}
178-
}
179-
}
180-
181-
fn remove_writer(&self, wait_key: usize, wake_another: bool) {
182-
if wait_key != WAIT_KEY_NONE {
183-
let mut writers = self.write_waiters.lock().unwrap();
184-
match writers.remove(wait_key) {
185-
Waiter::Waiting(_) => {}
186-
Waiter::Woken => {
187-
// We were awoken, but then dropped before we could
188-
// wake up to acquire the lock. Wake up another
189-
// waiter.
190-
if wake_another {
191-
if let Some((_, waiter)) = writers.iter_mut().next() {
192-
waiter.wake();
193-
}
194-
}
195-
}
196-
}
197-
if writers.is_empty() {
198-
self.state.fetch_and(!HAS_WRITERS, Ordering::Relaxed);
199-
}
200-
}
201-
}
202142
}
203143

204144
// Sentinel for when no slot in the `Slab` has been dedicated to this object.
@@ -244,27 +184,23 @@ impl<'a, T: ?Sized> Future for RwLockReadFuture<'a, T> {
244184
.expect("polled RwLockReadFuture after completion");
245185

246186
if let Some(lock) = rwlock.try_read() {
247-
rwlock.remove_reader(self.wait_key);
187+
if self.wait_key != WAIT_KEY_NONE {
188+
rwlock.readers.remove(self.wait_key);
189+
}
248190
self.rwlock = None;
249191
return Poll::Ready(lock);
250192
}
251193

252-
{
253-
let mut readers = rwlock.read_waiters.lock().unwrap();
254-
if self.wait_key == WAIT_KEY_NONE {
255-
self.wait_key = readers.insert(Waiter::Waiting(cx.waker().clone()));
256-
if readers.len() == 1 {
257-
rwlock.state.fetch_or(HAS_READERS, Ordering::Relaxed);
258-
}
259-
} else {
260-
readers[self.wait_key].register(cx.waker());
261-
}
194+
if self.wait_key == WAIT_KEY_NONE {
195+
self.wait_key = rwlock.readers.insert(cx.waker());
196+
} else {
197+
rwlock.readers.register(self.wait_key, cx.waker());
262198
}
263199

264200
// Ensure that we haven't raced `RwLockWriteGuard::drop`'s unlock path by
265201
// attempting to acquire the lock again.
266202
if let Some(lock) = rwlock.try_read() {
267-
rwlock.remove_reader(self.wait_key);
203+
rwlock.readers.remove(self.wait_key);
268204
self.rwlock = None;
269205
return Poll::Ready(lock);
270206
}
@@ -276,7 +212,9 @@ impl<'a, T: ?Sized> Future for RwLockReadFuture<'a, T> {
276212
impl<T: ?Sized> Drop for RwLockReadFuture<'_, T> {
277213
fn drop(&mut self) {
278214
if let Some(rwlock) = self.rwlock {
279-
rwlock.remove_reader(self.wait_key);
215+
if self.wait_key != WAIT_KEY_NONE {
216+
rwlock.readers.remove(self.wait_key);
217+
}
280218
}
281219
}
282220
}
@@ -320,28 +258,24 @@ impl<'a, T: ?Sized> Future for RwLockWriteFuture<'a, T> {
320258
.expect("polled RwLockWriteFuture after completion");
321259

322260
if let Some(lock) = rwlock.try_write() {
323-
rwlock.remove_writer(self.wait_key, false);
261+
if self.wait_key != WAIT_KEY_NONE {
262+
rwlock.writers.remove(self.wait_key);
263+
}
324264
self.rwlock = None;
325265
return Poll::Ready(lock);
326266
}
327267

328-
{
329-
let mut writers = rwlock.write_waiters.lock().unwrap();
330-
if self.wait_key == WAIT_KEY_NONE {
331-
self.wait_key = writers.insert(Waiter::Waiting(cx.waker().clone()));
332-
if writers.len() == 1 {
333-
rwlock.state.fetch_or(HAS_WRITERS, Ordering::Relaxed);
334-
}
335-
} else {
336-
writers[self.wait_key].register(cx.waker());
337-
}
268+
if self.wait_key == WAIT_KEY_NONE {
269+
self.wait_key = rwlock.writers.insert(cx.waker());
270+
} else {
271+
rwlock.writers.register(self.wait_key, cx.waker());
338272
}
339273

340274
// Ensure that we haven't raced `RwLockWriteGuard::drop` or
341275
// `RwLockReadGuard::drop`'s unlock path by attempting to acquire
342276
// the lock again.
343277
if let Some(lock) = rwlock.try_write() {
344-
rwlock.remove_writer(self.wait_key, false);
278+
rwlock.writers.remove(self.wait_key);
345279
self.rwlock = None;
346280
return Poll::Ready(lock);
347281
}
@@ -353,11 +287,13 @@ impl<'a, T: ?Sized> Future for RwLockWriteFuture<'a, T> {
353287
impl<T: ?Sized> Drop for RwLockWriteFuture<'_, T> {
354288
fn drop(&mut self) {
355289
if let Some(rwlock) = self.rwlock {
356-
// This future was dropped before it acquired the rwlock.
357-
//
358-
// Remove ourselves from the map, waking up another waiter if we
359-
// had been awoken to acquire the lock.
360-
rwlock.remove_writer(self.wait_key, true);
290+
if self.wait_key != WAIT_KEY_NONE {
291+
// This future was dropped before it acquired the rwlock.
292+
//
293+
// Remove ourselves from the map, waking up another waiter if we
294+
// had been awoken to acquire the lock.
295+
rwlock.writers.cancel(self.wait_key);
296+
}
361297
}
362298
}
363299
}
@@ -381,11 +317,8 @@ impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLockReadGuard<'_, T> {
381317
impl<T: ?Sized> Drop for RwLockReadGuard<'_, T> {
382318
fn drop(&mut self) {
383319
let old_state = self.rwlock.state.fetch_sub(ONE_READER, Ordering::SeqCst);
384-
if old_state & READ_COUNT_MASK == ONE_READER && old_state & HAS_WRITERS != 0 {
385-
let mut writers = self.rwlock.write_waiters.lock().unwrap();
386-
if let Some((_, waiter)) = writers.iter_mut().next() {
387-
waiter.wake();
388-
}
320+
if old_state & READ_COUNT == ONE_READER {
321+
self.rwlock.writers.notify_any();
389322
}
390323
}
391324
}
@@ -421,21 +354,9 @@ impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLockWriteGuard<'_, T> {
421354

422355
impl<T: ?Sized> Drop for RwLockWriteGuard<'_, T> {
423356
fn drop(&mut self) {
424-
let old_state = self.rwlock.state.fetch_and(!IS_LOCKED, Ordering::AcqRel);
425-
match (old_state & HAS_WRITERS, old_state & HAS_READERS) {
426-
(0, 0) => {}
427-
(0, _) => {
428-
let mut readers = self.rwlock.read_waiters.lock().unwrap();
429-
for (_, waiter) in readers.iter_mut() {
430-
waiter.wake();
431-
}
432-
}
433-
_ => {
434-
let mut writers = self.rwlock.write_waiters.lock().unwrap();
435-
if let Some((_, waiter)) = writers.iter_mut().next() {
436-
waiter.wake();
437-
}
438-
}
357+
self.rwlock.state.store(0, Ordering::SeqCst);
358+
if !self.rwlock.readers.notify_all() {
359+
self.rwlock.writers.notify_any();
439360
}
440361
}
441362
}

0 commit comments

Comments
 (0)