Skip to content

Commit cdbf2c9

Browse files
committed
added ticketing system
1 parent fb5ed11 commit cdbf2c9

File tree

2 files changed

+148
-119
lines changed

2 files changed

+148
-119
lines changed

futures-util/src/lock/rwlock.rs

+141-119
Original file line numberDiff line numberDiff line change
@@ -5,38 +5,49 @@ use std::cell::UnsafeCell;
55
use std::fmt;
66
use std::ops::{Deref, DerefMut};
77
use std::pin::Pin;
8-
use std::process;
98
use std::sync::atomic::{AtomicUsize, Ordering};
109

10+
struct State {
11+
ins: AtomicUsize,
12+
out: AtomicUsize,
13+
}
14+
1115
/// A futures-aware read-write lock.
1216
pub struct RwLock<T: ?Sized> {
13-
state: AtomicUsize,
17+
read_state: State,
18+
write_state: State,
1419
readers: WaiterSet,
1520
writers: WaiterSet,
1621
value: UnsafeCell<T>,
1722
}
1823

1924
impl<T: ?Sized> fmt::Debug for RwLock<T> {
2025
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
21-
let state = self.state.load(Ordering::SeqCst);
22-
f.debug_struct("RwLock")
23-
.field("is_locked", &((state & IS_LOCKED) != 0))
24-
.field("readers", &((state & READ_COUNT) >> 1))
25-
.finish()
26+
f.debug_struct("RwLock").finish()
2627
}
2728
}
2829

2930
#[allow(clippy::identity_op)]
30-
const IS_LOCKED: usize = 1 << 0;
31-
const ONE_READER: usize = 1 << 1;
32-
const READ_COUNT: usize = !(ONE_READER - 1);
33-
const MAX_READERS: usize = usize::max_value() >> 1;
31+
const PHASE: usize = 1 << 0;
32+
const ONE_WRITER: usize = 1 << 1;
33+
const ONE_READER: usize = 1 << 2;
34+
const WRITE_BITS: usize = ONE_WRITER | PHASE;
35+
36+
// Sentinel for when no slot in the `Slab` has been dedicated to this object.
37+
const WAIT_KEY_NONE: usize = usize::max_value();
3438

3539
impl<T> RwLock<T> {
3640
/// Creates a new futures-aware read-write lock.
3741
pub fn new(t: T) -> RwLock<T> {
3842
RwLock {
39-
state: AtomicUsize::new(0),
43+
read_state: State {
44+
ins: AtomicUsize::new(0),
45+
out: AtomicUsize::new(0),
46+
},
47+
write_state: State {
48+
ins: AtomicUsize::new(0),
49+
out: AtomicUsize::new(0),
50+
},
4051
readers: WaiterSet::new(),
4152
writers: WaiterSet::new(),
4253
value: UnsafeCell::new(t),
@@ -59,52 +70,14 @@ impl<T> RwLock<T> {
5970
}
6071

6172
impl<T: ?Sized> RwLock<T> {
62-
/// Attempt to acquire a lock with shared read access immediately.
63-
///
64-
/// If the lock is currently held by a writer, this will return `None`.
65-
pub fn try_read(&self) -> Option<RwLockReadGuard<'_, T>> {
66-
let mut state = self.state.load(Ordering::Acquire);
67-
68-
loop {
69-
if state & IS_LOCKED != 0 {
70-
return None;
71-
}
72-
73-
if state > MAX_READERS {
74-
process::abort();
75-
}
76-
77-
match self.state.compare_exchange_weak(
78-
state,
79-
state + ONE_READER,
80-
Ordering::SeqCst,
81-
Ordering::SeqCst,
82-
) {
83-
Ok(_) => return Some(RwLockReadGuard { rwlock: self }),
84-
Err(s) => state = s,
85-
}
86-
}
87-
}
88-
89-
/// Attempt to acquire a lock with exclusive write access immediately.
90-
///
91-
/// If there are any other locks, either for read or write access, this
92-
/// will return `None`.
93-
pub fn try_write(&self) -> Option<RwLockWriteGuard<'_, T>> {
94-
if self.state.compare_and_swap(0, IS_LOCKED, Ordering::SeqCst) == 0 {
95-
Some(RwLockWriteGuard { rwlock: self })
96-
} else {
97-
None
98-
}
99-
}
100-
10173
/// Acquire a read access lock asynchronously.
10274
///
10375
/// This method returns a future that will resolve once all write access
10476
/// locks have been dropped.
10577
pub fn read(&self) -> RwLockReadFuture<'_, T> {
10678
RwLockReadFuture {
10779
rwlock: Some(self),
80+
ticket: None,
10881
wait_key: WAIT_KEY_NONE,
10982
}
11083
}
@@ -116,6 +89,7 @@ impl<T: ?Sized> RwLock<T> {
11689
pub fn write(&self) -> RwLockWriteFuture<'_, T> {
11790
RwLockWriteFuture {
11891
rwlock: Some(self),
92+
ticket: None,
11993
wait_key: WAIT_KEY_NONE,
12094
}
12195
}
@@ -133,22 +107,35 @@ impl<T: ?Sized> RwLock<T> {
133107
///
134108
/// let mut rwlock = RwLock::new(0);
135109
/// *rwlock.get_mut() = 10;
136-
/// assert_eq!(*rwlock.lock().await, 10);
110+
/// assert_eq!(*rwlock.read().await, 10);
137111
/// # });
138112
/// ```
139113
pub fn get_mut(&mut self) -> &mut T {
140114
unsafe { &mut *self.value.get() }
141115
}
142116
}
143117

144-
// Sentinel for when no slot in the `Slab` has been dedicated to this object.
145-
const WAIT_KEY_NONE: usize = usize::max_value();
118+
#[derive(Debug)]
119+
enum Ticket {
120+
Read(usize),
121+
Write(usize),
122+
}
123+
124+
impl Ticket {
125+
fn value(&self) -> usize {
126+
match self {
127+
Ticket::Read(value) => *value,
128+
Ticket::Write(value) => *value,
129+
}
130+
}
131+
}
146132

147133
/// A future which resolves when the target read access lock has been successfully
148134
/// acquired.
149135
pub struct RwLockReadFuture<'a, T: ?Sized> {
150136
// `None` indicates that the mutex was successfully acquired.
151137
rwlock: Option<&'a RwLock<T>>,
138+
ticket: Option<Ticket>,
152139
wait_key: usize,
153140
}
154141

@@ -157,6 +144,7 @@ impl<T: ?Sized> fmt::Debug for RwLockReadFuture<'_, T> {
157144
f.debug_struct("RwLockReadFuture")
158145
.field("was_acquired", &self.rwlock.is_none())
159146
.field("rwlock", &self.rwlock)
147+
.field("ticket", &self.ticket)
160148
.field(
161149
"wait_key",
162150
&(if self.wait_key == WAIT_KEY_NONE {
@@ -183,37 +171,48 @@ impl<'a, T: ?Sized> Future for RwLockReadFuture<'a, T> {
183171
.rwlock
184172
.expect("polled RwLockReadFuture after completion");
185173

186-
if let Some(lock) = rwlock.try_read() {
174+
// The ticket is defined by the write bits stored within the read-in count
175+
let ticket = self
176+
.ticket
177+
.get_or_insert_with(|| {
178+
Ticket::Read(
179+
rwlock
180+
.read_state
181+
.ins
182+
.fetch_add(ONE_READER, Ordering::SeqCst)
183+
& WRITE_BITS,
184+
)
185+
})
186+
.value();
187+
188+
// Safe to create guard when either there are no writers (ticket == 0) or if
189+
// at least one of the two write bits change.
190+
// Writers always wait until the current reader phase completes before acquiring
191+
// the lock; thus the PHASE bit both maintains the read-write condition and
192+
// prevents deadlock in the case that this line isn't reached before a writer sets
193+
// the ONE_WRITER bit.
194+
if ticket == 0 || ticket != rwlock.read_state.ins.load(Ordering::Relaxed) & WRITE_BITS {
187195
if self.wait_key != WAIT_KEY_NONE {
188196
rwlock.readers.remove(self.wait_key);
189197
}
190198
self.rwlock = None;
191-
return Poll::Ready(lock);
192-
}
193-
194-
if self.wait_key == WAIT_KEY_NONE {
195-
self.wait_key = rwlock.readers.insert(cx.waker());
199+
Poll::Ready(RwLockReadGuard { rwlock })
196200
} else {
197-
rwlock.readers.register(self.wait_key, cx.waker());
198-
}
199-
200-
// Ensure that we haven't raced `RwLockWriteGuard::drop`'s unlock path by
201-
// attempting to acquire the lock again.
202-
if let Some(lock) = rwlock.try_read() {
203-
rwlock.readers.remove(self.wait_key);
204-
self.rwlock = None;
205-
return Poll::Ready(lock);
201+
if self.wait_key == WAIT_KEY_NONE {
202+
self.wait_key = rwlock.readers.insert(cx.waker());
203+
} else {
204+
rwlock.readers.register(self.wait_key, cx.waker());
205+
}
206+
Poll::Pending
206207
}
207-
208-
Poll::Pending
209208
}
210209
}
211210

212211
impl<T: ?Sized> Drop for RwLockReadFuture<'_, T> {
213212
fn drop(&mut self) {
214-
if let Some(rwlock) = self.rwlock {
213+
if let Some(_) = self.rwlock {
215214
if self.wait_key != WAIT_KEY_NONE {
216-
rwlock.readers.remove(self.wait_key);
215+
panic!("RwLockReadFuture dropped before completion");
217216
}
218217
}
219218
}
@@ -223,6 +222,7 @@ impl<T: ?Sized> Drop for RwLockReadFuture<'_, T> {
223222
/// acquired.
224223
pub struct RwLockWriteFuture<'a, T: ?Sized> {
225224
rwlock: Option<&'a RwLock<T>>,
225+
ticket: Option<Ticket>,
226226
wait_key: usize,
227227
}
228228

@@ -257,42 +257,69 @@ impl<'a, T: ?Sized> Future for RwLockWriteFuture<'a, T> {
257257
.rwlock
258258
.expect("polled RwLockWriteFuture after completion");
259259

260-
if let Some(lock) = rwlock.try_write() {
261-
if self.wait_key != WAIT_KEY_NONE {
262-
rwlock.writers.remove(self.wait_key);
260+
match self.ticket {
261+
None => {
262+
let ticket = rwlock.write_state.ins.fetch_add(1, Ordering::SeqCst);
263+
self.ticket = Some(Ticket::Write(ticket));
264+
if ticket == rwlock.write_state.out.load(Ordering::Relaxed) {
265+
// Note that the WRITE_BITS are always cleared at this point.
266+
let ticket = rwlock
267+
.read_state
268+
.ins
269+
.fetch_add(ONE_WRITER | (ticket & PHASE), Ordering::SeqCst);
270+
self.ticket = Some(Ticket::Read(ticket));
271+
if ticket == rwlock.read_state.out.load(Ordering::Relaxed) {
272+
self.rwlock = None;
273+
Poll::Ready(RwLockWriteGuard { rwlock })
274+
} else {
275+
self.wait_key = rwlock.writers.insert(cx.waker());
276+
Poll::Pending
277+
}
278+
} else {
279+
self.wait_key = rwlock.writers.insert(cx.waker());
280+
Poll::Pending
281+
}
282+
}
283+
Some(Ticket::Write(ticket)) => {
284+
if ticket == rwlock.write_state.out.load(Ordering::Relaxed) {
285+
// Note that the WRITE_BITS are always cleared at this point.
286+
let ticket = rwlock
287+
.read_state
288+
.ins
289+
.fetch_add(ONE_WRITER | (ticket & PHASE), Ordering::SeqCst);
290+
self.ticket = Some(Ticket::Read(ticket));
291+
if ticket == rwlock.read_state.out.load(Ordering::Relaxed) {
292+
rwlock.writers.remove(self.wait_key);
293+
self.rwlock = None;
294+
Poll::Ready(RwLockWriteGuard { rwlock })
295+
} else {
296+
rwlock.writers.register(self.wait_key, cx.waker());
297+
Poll::Pending
298+
}
299+
} else {
300+
rwlock.writers.register(self.wait_key, cx.waker());
301+
Poll::Pending
302+
}
303+
}
304+
Some(Ticket::Read(ticket)) => {
305+
if ticket == rwlock.read_state.out.load(Ordering::Relaxed) {
306+
rwlock.writers.remove(self.wait_key);
307+
self.rwlock = None;
308+
Poll::Ready(RwLockWriteGuard { rwlock })
309+
} else {
310+
rwlock.writers.register(self.wait_key, cx.waker());
311+
Poll::Pending
312+
}
263313
}
264-
self.rwlock = None;
265-
return Poll::Ready(lock);
266-
}
267-
268-
if self.wait_key == WAIT_KEY_NONE {
269-
self.wait_key = rwlock.writers.insert(cx.waker());
270-
} else {
271-
rwlock.writers.register(self.wait_key, cx.waker());
272-
}
273-
274-
// Ensure that we haven't raced `RwLockWriteGuard::drop` or
275-
// `RwLockReadGuard::drop`'s unlock path by attempting to acquire
276-
// the lock again.
277-
if let Some(lock) = rwlock.try_write() {
278-
rwlock.writers.remove(self.wait_key);
279-
self.rwlock = None;
280-
return Poll::Ready(lock);
281314
}
282-
283-
Poll::Pending
284315
}
285316
}
286317

287318
impl<T: ?Sized> Drop for RwLockWriteFuture<'_, T> {
288319
fn drop(&mut self) {
289-
if let Some(rwlock) = self.rwlock {
320+
if let Some(_) = self.rwlock {
290321
if self.wait_key != WAIT_KEY_NONE {
291-
// This future was dropped before it acquired the rwlock.
292-
//
293-
// Remove ourselves from the map, waking up another waiter if we
294-
// had been awoken to acquire the lock.
295-
rwlock.writers.cancel(self.wait_key);
322+
panic!("RwLockWriteFuture dropped before completion");
296323
}
297324
}
298325
}
@@ -316,15 +343,17 @@ impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLockReadGuard<'_, T> {
316343

317344
impl<T: ?Sized> Drop for RwLockReadGuard<'_, T> {
318345
fn drop(&mut self) {
319-
let old_state = self.rwlock.state.fetch_sub(ONE_READER, Ordering::SeqCst);
320-
if old_state & READ_COUNT == ONE_READER {
321-
self.rwlock.writers.notify_any();
322-
}
346+
self.rwlock
347+
.read_state
348+
.out
349+
.fetch_add(ONE_READER, Ordering::SeqCst);
350+
self.rwlock.writers.notify_all();
323351
}
324352
}
325353

326354
impl<T: ?Sized> Deref for RwLockReadGuard<'_, T> {
327355
type Target = T;
356+
328357
fn deref(&self) -> &T {
329358
unsafe { &*self.rwlock.value.get() }
330359
}
@@ -354,10 +383,13 @@ impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLockWriteGuard<'_, T> {
354383

355384
impl<T: ?Sized> Drop for RwLockWriteGuard<'_, T> {
356385
fn drop(&mut self) {
357-
self.rwlock.state.store(0, Ordering::SeqCst);
358-
if !self.rwlock.readers.notify_all() {
359-
self.rwlock.writers.notify_any();
360-
}
386+
self.rwlock
387+
.read_state
388+
.ins
389+
.fetch_and(!WRITE_BITS, Ordering::Relaxed);
390+
self.rwlock.write_state.out.fetch_add(1, Ordering::Relaxed);
391+
self.rwlock.writers.notify_all();
392+
self.rwlock.readers.notify_all();
361393
}
362394
}
363395

@@ -388,13 +420,3 @@ unsafe impl<T: ?Sized + Sync> Sync for RwLockReadGuard<'_, T> {}
388420

389421
unsafe impl<T: ?Sized + Send> Send for RwLockWriteGuard<'_, T> {}
390422
unsafe impl<T: ?Sized + Sync> Sync for RwLockWriteGuard<'_, T> {}
391-
392-
#[test]
393-
fn test_rwlock_guard_debug_not_recurse() {
394-
let rwlock = RwLock::new(42);
395-
let guard = rwlock.try_read().unwrap();
396-
let _ = format!("{:?}", guard);
397-
drop(guard);
398-
let guard = rwlock.try_write().unwrap();
399-
let _ = format!("{:?}", guard);
400-
}

0 commit comments

Comments
 (0)