
Commit 4584e50

MajorBreakfast authored and cramertj committed
Rename Inner to ReadyToRunQueue for clarity
1 parent 8be84ea commit 4584e50

3 files changed: +36 −36 lines changed

futures-util/src/stream/futures_unordered/mod.rs

Lines changed: 16 additions & 16 deletions
@@ -15,11 +15,11 @@ use futures_core::{Stream, Future, Poll};
 use futures_core::task::{self, AtomicWaker};
 
 mod abort;
-mod inner;
+mod ready_to_run_queue;
 mod iter_mut;
 mod node;
 
-use self::inner::{Inner, Dequeue};
+use self::ready_to_run_queue::{ReadyToRunQueue, Dequeue};
 use self::iter_mut::IterMut;
 use self::node::Node;
 
@@ -40,7 +40,7 @@ use self::node::Node;
 /// an empty set with the `FuturesUnordered::new` constructor.
 #[must_use = "streams do nothing unless polled"]
 pub struct FuturesUnordered<F> {
-    inner: Arc<Inner<F>>,
+    ready_to_run_queue: Arc<ReadyToRunQueue<F>>,
     len: usize,
     head_all: *const Node<F>,
 }
@@ -85,22 +85,22 @@ impl<T> FuturesUnordered<T>
             future: UnsafeCell::new(None),
             next_all: UnsafeCell::new(ptr::null()),
             prev_all: UnsafeCell::new(ptr::null()),
-            next_readiness: AtomicPtr::new(ptr::null_mut()),
+            next_ready_to_run: AtomicPtr::new(ptr::null_mut()),
             queued: AtomicBool::new(true),
-            queue: Weak::new(),
+            ready_to_run_queue: Weak::new(),
         });
         let stub_ptr = &*stub as *const Node<T>;
-        let inner = Arc::new(Inner {
+        let ready_to_run_queue = Arc::new(ReadyToRunQueue {
            parent: AtomicWaker::new(),
-            head_readiness: AtomicPtr::new(stub_ptr as *mut _),
-            tail_readiness: UnsafeCell::new(stub_ptr),
+            head: AtomicPtr::new(stub_ptr as *mut _),
+            tail: UnsafeCell::new(stub_ptr),
             stub: stub,
         });
 
         FuturesUnordered {
             len: 0,
             head_all: ptr::null_mut(),
-            inner: inner,
+            ready_to_run_queue: ready_to_run_queue,
         }
     }
 }
@@ -129,9 +129,9 @@ impl<T> FuturesUnordered<T> {
             future: UnsafeCell::new(Some(future)),
             next_all: UnsafeCell::new(ptr::null_mut()),
             prev_all: UnsafeCell::new(ptr::null_mut()),
-            next_readiness: AtomicPtr::new(ptr::null_mut()),
+            next_ready_to_run: AtomicPtr::new(ptr::null_mut()),
             queued: AtomicBool::new(true),
-            queue: Arc::downgrade(&self.inner),
+            ready_to_run_queue: Arc::downgrade(&self.ready_to_run_queue),
         });
 
         // Right now our node has a strong reference count of 1. We transfer
@@ -143,7 +143,7 @@ impl<T> FuturesUnordered<T> {
         // e.g. getting its unpark notifications going to us tracking which
         // futures are ready. To do that we unconditionally enqueue it for
         // polling here.
-        self.inner.enqueue(ptr);
+        self.ready_to_run_queue.enqueue(ptr);
     }
 
     /// Returns an iterator that allows modifying each future in the set.
@@ -232,12 +232,12 @@ impl<T> Stream for FuturesUnordered<T>
         -> Poll<Option<Self::Item>>
     {
         // Ensure `parent` is correctly set.
-        self.inner.parent.register(cx.waker());
+        self.ready_to_run_queue.parent.register(cx.waker());
 
         loop {
             // Safety: &mut self guarantees the mutual exclusion `dequeue`
             // expects
-            let node = match unsafe { self.inner.dequeue() } {
+            let node = match unsafe { self.ready_to_run_queue.dequeue() } {
                 Dequeue::Empty => {
                     if self.is_empty() {
                         return Poll::Ready(None);
@@ -255,7 +255,7 @@ impl<T> Stream for FuturesUnordered<T>
                 Dequeue::Data(node) => node,
             };
 
-            debug_assert!(node != self.inner.stub());
+            debug_assert!(node != self.ready_to_run_queue.stub());
 
             // Safety:
             // - Node is a valid pointer.
@@ -376,7 +376,7 @@ impl<T> Drop for FuturesUnordered<T> {
         // mpsc queue. None of those nodes, however, have futures associated
         // with them so they're safe to destroy on any thread. At this point
        // the `FuturesUnordered` struct, the owner of the one strong reference
-        // to `Inner<T>` will drop the strong reference. At that point
+        // to `MPSCQueue<T>` will drop the strong reference. At that point
         // whichever thread releases the strong refcount last (be it this
         // thread or some other thread as part of an `upgrade`) will clear out
         // the mpsc queue and free all remaining nodes.
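
For orientation (not part of the commit): a minimal usage sketch of the type whose internals are renamed above. It assumes the released futures 0.3 API, which may differ in detail from the pre-release snapshot this commit targets. The behavior it shows is the point of the ready-to-run queue: `next()` polls only futures whose wakers have fired, not the whole set.

```rust
// Hedged sketch; assumes the stable `futures` 0.3 crate.
use futures::executor::block_on;
use futures::stream::{FuturesUnordered, StreamExt};

fn main() {
    let mut set = FuturesUnordered::new();
    for i in 0..3u32 {
        // Each pushed future's node starts out queued (`queued: true`),
        // so it is guaranteed to be polled at least once.
        set.push(async move { i * 2 });
    }
    block_on(async {
        // Each `next().await` dequeues and polls only ready futures,
        // yielding outputs in completion order.
        while let Some(out) = set.next().await {
            println!("completed: {}", out);
        }
    });
}
```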

futures-util/src/stream/futures_unordered/node.rs

Lines changed: 4 additions & 4 deletions
@@ -8,7 +8,7 @@ use std::sync::atomic::Ordering::SeqCst;
 
 use futures_core::task::{UnsafeWake, Waker, LocalWaker};
 
-use super::Inner;
+use super::ReadyToRunQueue;
 use super::abort::abort;
 
 pub(super) struct Node<T> {
@@ -22,18 +22,18 @@ pub(super) struct Node<T> {
     pub(super) prev_all: UnsafeCell<*const Node<T>>,
 
     // Next pointer in readiness queue
-    pub(super) next_readiness: AtomicPtr<Node<T>>,
+    pub(super) next_ready_to_run: AtomicPtr<Node<T>>,
 
     // Queue that we'll be enqueued to when notified
-    pub(super) queue: Weak<Inner<T>>,
+    pub(super) ready_to_run_queue: Weak<ReadyToRunQueue<T>>,
 
     // Whether or not this node is currently in the mpsc queue.
     pub(super) queued: AtomicBool,
 }
 
 impl<T> Node<T> {
     pub(super) fn wake(self: &Arc<Node<T>>) {
-        let inner = match self.queue.upgrade() {
+        let inner = match self.ready_to_run_queue.upgrade() {
             Some(inner) => inner,
             None => return,
         };
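
The `Weak<ReadyToRunQueue<T>>` back-pointer renamed here is what lets a stray waker safely outlive the set: `wake` upgrades the `Weak` and simply returns if the queue is already gone. A self-contained sketch of that pattern, with made-up stand-in names (`Queue`, `Task`) rather than the crate's types:

```rust
// Illustrative only: the Weak back-pointer pattern used by `Node::wake`.
use std::sync::{Arc, Mutex, Weak};

struct Queue {
    ready: Mutex<Vec<usize>>, // stand-in for the lock-free ready list
}

struct Task {
    id: usize,
    // Weak, so an outstanding waker cannot keep the whole queue alive
    // after its owner is dropped.
    queue: Weak<Queue>,
}

impl Task {
    fn wake(&self) {
        // Mirrors `Node::wake`: upgrade, or silently bail if the queue
        // has already been destroyed.
        let queue = match self.queue.upgrade() {
            Some(queue) => queue,
            None => return,
        };
        queue.ready.lock().unwrap().push(self.id);
    }
}

fn main() {
    let queue = Arc::new(Queue { ready: Mutex::new(Vec::new()) });
    let task = Task { id: 7, queue: Arc::downgrade(&queue) };
    task.wake();
    assert_eq!(*queue.ready.lock().unwrap(), vec![7]);
    drop(queue);
    task.wake(); // no-op: the upgrade fails
}
```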

futures-util/src/stream/futures_unordered/inner.rs renamed to futures-util/src/stream/futures_unordered/ready_to_run_queue.rs

Lines changed: 16 additions & 16 deletions
@@ -15,29 +15,29 @@ pub(super) enum Dequeue<T> {
     Inconsistent,
 }
 
-pub(super) struct Inner<T> {
+pub(super) struct ReadyToRunQueue<T> {
     // The task using `FuturesUnordered`.
     pub(super) parent: AtomicWaker,
 
     // Head/tail of the readiness queue
-    pub(super) head_readiness: AtomicPtr<Node<T>>,
-    pub(super) tail_readiness: UnsafeCell<*const Node<T>>,
+    pub(super) head: AtomicPtr<Node<T>>,
+    pub(super) tail: UnsafeCell<*const Node<T>>,
     pub(super) stub: Arc<Node<T>>,
 }
 
-impl<T> Inner<T> {
+impl<T> ReadyToRunQueue<T> {
     /// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
     pub(super) fn enqueue(&self, node: *const Node<T>) {
         unsafe {
             debug_assert!((*node).queued.load(Relaxed));
 
             // This action does not require any coordination
-            (*node).next_readiness.store(ptr::null_mut(), Relaxed);
+            (*node).next_ready_to_run.store(ptr::null_mut(), Relaxed);
 
             // Note that these atomic orderings come from 1024cores
             let node = node as *mut _;
-            let prev = self.head_readiness.swap(node, AcqRel);
-            (*prev).next_readiness.store(node, Release);
+            let prev = self.head.swap(node, AcqRel);
+            (*prev).next_ready_to_run.store(node, Release);
         }
     }
 
@@ -46,35 +46,35 @@ impl<T> Inner<T> {
     /// Note that this is unsafe as it required mutual exclusion (only one
     /// thread can call this) to be guaranteed elsewhere.
     pub(super) unsafe fn dequeue(&self) -> Dequeue<T> {
-        let mut tail = *self.tail_readiness.get();
-        let mut next = (*tail).next_readiness.load(Acquire);
+        let mut tail = *self.tail.get();
+        let mut next = (*tail).next_ready_to_run.load(Acquire);
 
         if tail == self.stub() {
             if next.is_null() {
                 return Dequeue::Empty;
             }
 
-            *self.tail_readiness.get() = next;
+            *self.tail.get() = next;
             tail = next;
-            next = (*next).next_readiness.load(Acquire);
+            next = (*next).next_ready_to_run.load(Acquire);
         }
 
         if !next.is_null() {
-            *self.tail_readiness.get() = next;
+            *self.tail.get() = next;
             debug_assert!(tail != self.stub());
             return Dequeue::Data(tail);
         }
 
-        if self.head_readiness.load(Acquire) as *const _ != tail {
+        if self.head.load(Acquire) as *const _ != tail {
             return Dequeue::Inconsistent;
         }
 
         self.enqueue(self.stub());
 
-        next = (*tail).next_readiness.load(Acquire);
+        next = (*tail).next_ready_to_run.load(Acquire);
 
         if !next.is_null() {
-            *self.tail_readiness.get() = next;
+            *self.tail.get() = next;
             return Dequeue::Data(tail);
         }
 
@@ -86,7 +86,7 @@ impl<T> Inner<T> {
     }
 }
 
-impl<T> Drop for Inner<T> {
+impl<T> Drop for ReadyToRunQueue<T> {
     fn drop(&mut self) {
         // Once we're in the destructor for `Inner<T>` we need to clear out the
         // mpsc queue of nodes if there's anything left in there.
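
For readers unfamiliar with the 1024cores algorithm cited above, here is a self-contained sketch of the same MPSC queue design with the three dequeue outcomes. It is hedged in two ways: it uses the simpler non-intrusive variant (the value is read out of the node after the consumed sentinel, rather than returning the old tail node as the intrusive code above does), and `Slot`, `Mpsc`, and `Pop` are illustrative names, with popped nodes simply leaked instead of refcounted as in the real code.

```rust
// Simplified, standalone take on the 1024cores MPSC queue.
use std::ptr;
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};

enum Pop {
    Data(u32),
    Empty,
    Inconsistent, // a producer swapped `head` but hasn't linked `next` yet
}

struct Slot {
    value: u32,
    next: AtomicPtr<Slot>,
}

fn slot(value: u32) -> *mut Slot {
    Box::into_raw(Box::new(Slot { value, next: AtomicPtr::new(ptr::null_mut()) }))
}

struct Mpsc {
    head: AtomicPtr<Slot>, // push end, shared by all producers
    tail: *mut Slot,       // pop end, single consumer only
}

impl Mpsc {
    fn new() -> Mpsc {
        let stub = slot(0); // consumed sentinel, like `ReadyToRunQueue::stub`
        Mpsc { head: AtomicPtr::new(stub), tail: stub }
    }

    fn push(&self, node: *mut Slot) {
        unsafe {
            (*node).next.store(ptr::null_mut(), Relaxed);
            // Same ordering pair as `enqueue` above: AcqRel swap to claim
            // the head slot, Release store to publish the link.
            let prev = self.head.swap(node, AcqRel);
            (*prev).next.store(node, Release);
        }
    }

    // `&mut self` enforces the single-consumer rule that the real
    // `dequeue` documents as an unsafe precondition.
    fn pop(&mut self) -> Pop {
        unsafe {
            let tail = self.tail;
            let next = (*tail).next.load(Acquire);
            if !next.is_null() {
                self.tail = next; // `next` becomes the new consumed sentinel
                return Pop::Data((*next).value);
            }
            if self.head.load(Acquire) == tail {
                Pop::Empty
            } else {
                Pop::Inconsistent // retry once the producer finishes linking
            }
        }
    }
}

fn main() {
    let mut q = Mpsc::new();
    q.push(slot(1));
    q.push(slot(2));
    assert!(matches!(q.pop(), Pop::Data(1)));
    assert!(matches!(q.pop(), Pop::Data(2)));
    assert!(matches!(q.pop(), Pop::Empty));
}
```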
