Skip to content

Commit e648614

Browse files
taiki-eNemo157
authored andcommitted
Update to new futures_api (wake-by-ref)
1 parent 828a7d1 commit e648614

File tree

21 files changed

+75
-54
lines changed

21 files changed

+75
-54
lines changed

.travis.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ matrix:
2525

2626
# When updating this, the reminder to update the minimum required version in README.md.
2727
- name: cargo test (minimum required version)
28-
rust: nightly-2019-04-08
28+
rust: nightly-2019-04-13
2929

3030
- name: cargo clippy
3131
rust: nightly

futures-core/src/task/__internal/atomic_waker.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ impl AtomicWaker {
241241
// Currently in the process of waking the task, i.e.,
242242
// `wake` is currently being called on the old task handle.
243243
// So, we call wake on the new waker
244-
waker.wake();
244+
waker.wake_by_ref();
245245
}
246246
state => {
247247
// In this case, a concurrent thread is holding the

futures-executor/benches/thread_notify.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ fn thread_yield_single_thread_one_wait(b: &mut Bencher) {
2424
Poll::Ready(())
2525
} else {
2626
self.rem -= 1;
27-
cx.waker().wake();
27+
cx.waker().wake_by_ref();
2828
Poll::Pending
2929
}
3030
}
@@ -52,7 +52,7 @@ fn thread_yield_single_thread_many_wait(b: &mut Bencher) {
5252
Poll::Ready(())
5353
} else {
5454
self.rem -= 1;
55-
cx.waker().wake();
55+
cx.waker().wake_by_ref();
5656
Poll::Pending
5757
}
5858
}

futures-executor/src/local_pool.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ thread_local! {
5050
}
5151

5252
impl ArcWake for ThreadNotify {
53-
fn wake(arc_self: &Arc<Self>) {
53+
fn wake_by_ref(arc_self: &Arc<Self>) {
5454
arc_self.thread.unpark();
5555
}
5656
}

futures-executor/src/thread_pool.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ impl fmt::Debug for Task {
339339
}
340340

341341
impl ArcWake for WakeHandle {
342-
fn wake(arc_self: &Arc<Self>) {
342+
fn wake_by_ref(arc_self: &Arc<Self>) {
343343
match arc_self.mutex.notify() {
344344
Ok(task) => arc_self.exec.state.send(Message::Run(task)),
345345
Err(()) => {}

futures-executor/tests/local_pool.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ fn tasks_are_scheduled_fairly() {
147147
return Poll::Ready(());
148148
}
149149

150-
cx.waker().wake();
150+
cx.waker().wake_by_ref();
151151
Poll::Pending
152152
}
153153
}
@@ -167,4 +167,3 @@ fn tasks_are_scheduled_fairly() {
167167

168168
pool.run();
169169
}
170-

futures-io/src/lib.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ mod if_std {
100100
///
101101
/// If no data is available for reading, the method returns
102102
/// `Ok(Poll::Pending)` and arranges for the current task (via
103-
/// `cx.waker().wake()`) to receive a notification when the object becomes
103+
/// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
104104
/// readable or is closed.
105105
///
106106
/// # Implementation
@@ -122,7 +122,7 @@ mod if_std {
122122
///
123123
/// If no data is available for reading, the method returns
124124
/// `Ok(Poll::Pending)` and arranges for the current task (via
125-
/// `cx.waker().wake()`) to receive a notification when the object becomes
125+
/// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
126126
/// readable or is closed.
127127
/// By default, this method delegates to using `poll_read` on the first
128128
/// buffer in `vec`. Objects which support vectored IO should override
@@ -160,7 +160,7 @@ mod if_std {
160160
///
161161
/// If the object is not ready for writing, the method returns
162162
/// `Ok(Poll::Pending)` and arranges for the current task (via
163-
/// `cx.waker().wake()`) to receive a notification when the object becomes
163+
/// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
164164
/// readable or is closed.
165165
///
166166
/// # Implementation
@@ -182,7 +182,7 @@ mod if_std {
182182
///
183183
/// If the object is not ready for writing, the method returns
184184
/// `Ok(Poll::Pending)` and arranges for the current task (via
185-
/// `cx.waker().wake()`) to receive a notification when the object becomes
185+
/// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
186186
/// readable or is closed.
187187
///
188188
/// By default, this method delegates to using `poll_write` on the first
@@ -213,7 +213,7 @@ mod if_std {
213213
///
214214
/// If flushing cannot immediately complete, this method returns
215215
/// `Ok(Poll::Pending)` and arranges for the current task (via
216-
/// `cx.waker().wake()`) to receive a notification when the object can make
216+
/// `cx.waker().wake_by_ref()`) to receive a notification when the object can make
217217
/// progress towards flushing.
218218
///
219219
/// # Implementation
@@ -230,7 +230,7 @@ mod if_std {
230230
///
231231
/// If closing cannot immediately complete, this function returns
232232
/// `Ok(Poll::Pending)` and arranges for the current task (via
233-
/// `cx.waker().wake()`) to receive a notification when the object can make
233+
/// `cx.waker().wake_by_ref()`) to receive a notification when the object can make
234234
/// progress towards closing.
235235
///
236236
/// # Implementation

futures-sink/src/lib.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ pub trait Sink<Item> {
5959
///
6060
/// This method returns `Poll::Ready` once the underlying sink is ready to
6161
/// receive data. If this method returns `Poll::Pending`, the current task
62-
/// is registered to be notified (via `cx.waker().wake()`) when `poll_ready`
62+
/// is registered to be notified (via `cx.waker().wake_by_ref()`) when `poll_ready`
6363
/// should be called again.
6464
///
6565
/// In most cases, if the sink encounters an error, the sink will
@@ -95,7 +95,7 @@ pub trait Sink<Item> {
9595
/// via `start_send` have been flushed.
9696
///
9797
/// Returns `Ok(Poll::Pending)` if there is more work left to do, in which
98-
/// case the current task is scheduled (via `cx.waker().wake()`) to wake up when
98+
/// case the current task is scheduled (via `cx.waker().wake_by_ref()`) to wake up when
9999
/// `poll_flush` should be called again.
100100
///
101101
/// In most cases, if the sink encounters an error, the sink will
@@ -108,7 +108,7 @@ pub trait Sink<Item> {
108108
/// has been successfully closed.
109109
///
110110
/// Returns `Ok(Poll::Pending)` if there is more work left to do, in which
111-
/// case the current task is scheduled (via `cx.waker().wake()`) to wake up when
111+
/// case the current task is scheduled (via `cx.waker().wake_by_ref()`) to wake up when
112112
/// `poll_close` should be called again.
113113
///
114114
/// If this function encounters an error, the sink should be considered to

futures-test/src/future/pending_once.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ impl<Fut: Future> Future for PendingOnce<Fut> {
3939
self.as_mut().future().poll(cx)
4040
} else {
4141
*self.as_mut().polled_before() = true;
42-
cx.waker().wake();
42+
cx.waker().wake_by_ref();
4343
Poll::Pending
4444
}
4545
}

futures-test/src/task/panic_waker.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,20 @@ use futures_core::task::{Waker, RawWaker, RawWakerVTable};
22
use core::cell::UnsafeCell;
33
use core::ptr::null;
44

5-
unsafe fn clone_panic_waker(_data: *const()) -> RawWaker {
5+
unsafe fn clone_panic_waker(_data: *const ()) -> RawWaker {
66
raw_panic_waker()
77
}
88

9-
unsafe fn noop(_data: *const()) {
10-
}
9+
unsafe fn noop(_data: *const ()) {}
1110

12-
unsafe fn wake_panic(_data: *const()) {
11+
unsafe fn wake_panic(_data: *const ()) {
1312
panic!("should not be woken");
1413
}
1514

1615
const PANIC_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
1716
clone_panic_waker,
1817
wake_panic,
18+
wake_panic,
1919
noop,
2020
);
2121

@@ -37,7 +37,7 @@ fn raw_panic_waker() -> RawWaker {
3737
/// waker.wake(); // Will panic
3838
/// ```
3939
pub fn panic_waker() -> Waker {
40-
unsafe { Waker::new_unchecked(raw_panic_waker()) }
40+
unsafe { Waker::from_raw(raw_panic_waker()) }
4141
}
4242

4343
/// Get a global reference to a
@@ -52,7 +52,7 @@ pub fn panic_waker() -> Waker {
5252
/// use futures_test::task::panic_waker_ref;
5353
///
5454
/// let waker = panic_waker_ref();
55-
/// waker.wake(); // Will panic
55+
/// waker.wake_by_ref(); // Will panic
5656
/// ```
5757
pub fn panic_waker_ref() -> &'static Waker {
5858
thread_local! {

futures-test/src/task/wake_counter.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ struct WakerInner {
3030
}
3131

3232
impl ArcWake for WakerInner {
33-
fn wake(arc_self: &Arc<Self>) {
33+
fn wake_by_ref(arc_self: &Arc<Self>) {
3434
let _ = arc_self.count.fetch_add(1, Ordering::SeqCst);
3535
}
3636
}
@@ -49,7 +49,7 @@ impl ArcWake for WakerInner {
4949
///
5050
/// assert_eq!(count, 0);
5151
///
52-
/// waker.wake();
52+
/// waker.wake_by_ref();
5353
/// waker.wake();
5454
///
5555
/// assert_eq!(count, 2);

futures-util/src/compat/compat01as03.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ impl<'a> From<WakerToHandle<'a>> for NotifyHandle01 {
316316

317317
impl Notify01 for NotifyWaker {
318318
fn notify(&self, _: usize) {
319-
self.0.wake();
319+
self.0.wake_by_ref();
320320
}
321321
}
322322

futures-util/src/compat/compat03as01.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -174,15 +174,15 @@ impl Current {
174174
}
175175

176176
let ptr = current_to_ptr(self);
177-
let vtable = &RawWakerVTable::new(clone, wake, drop);
177+
let vtable = &RawWakerVTable::new(clone, wake, wake, drop);
178178
unsafe {
179-
WakerRef::new(task03::Waker::new_unchecked(RawWaker::new(ptr, vtable)))
179+
WakerRef::new(task03::Waker::from_raw(RawWaker::new(ptr, vtable)))
180180
}
181181
}
182182
}
183183

184184
impl ArcWake03 for Current {
185-
fn wake(arc_self: &Arc<Self>) {
185+
fn wake_by_ref(arc_self: &Arc<Self>) {
186186
arc_self.0.notify();
187187
}
188188
}

futures-util/src/future/shared.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ where
315315
}
316316

317317
impl ArcWake for Notifier {
318-
fn wake(arc_self: &Arc<Self>) {
318+
fn wake_by_ref(arc_self: &Arc<Self>) {
319319
arc_self.state.compare_and_swap(POLLING, REPOLL, SeqCst);
320320

321321
let wakers = &mut *arc_self.wakers.lock().unwrap();

futures-util/src/stream/futures_unordered/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
311311
// At this point, it may be worth yielding the thread &
312312
// spinning a few times... but for now, just yield using the
313313
// task system.
314-
cx.waker().wake();
314+
cx.waker().wake_by_ref();
315315
return Poll::Pending;
316316
}
317317
Dequeue::Data(task) => task,

futures-util/src/stream/futures_unordered/task.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ unsafe impl<Fut> Send for Task<Fut> {}
3636
unsafe impl<Fut> Sync for Task<Fut> {}
3737

3838
impl<Fut> ArcWake for Task<Fut> {
39-
fn wake(arc_self: &Arc<Self>) {
39+
fn wake_by_ref(arc_self: &Arc<Self>) {
4040
let inner = match arc_self.ready_to_run_queue.upgrade() {
4141
Some(inner) => inner,
4242
None => return,

futures-util/src/stream/select_next_some.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ impl<'a, St: Stream + FusedStream + Unpin> Future for SelectNextSome<'a, St> {
3434
Poll::Ready(item)
3535
} else {
3636
debug_assert!(self.stream.is_terminated());
37-
cx.waker().wake();
37+
cx.waker().wake_by_ref();
3838
Poll::Pending
3939
}
4040
}

futures-util/src/task/arc_wake.rs

+32-11
Original file line numberDiff line numberDiff line change
@@ -17,26 +17,41 @@ pub trait ArcWake {
1717
///
1818
/// Executors generally maintain a queue of "ready" tasks; `wake` should place
1919
/// the associated task onto this queue.
20-
fn wake(arc_self: &Arc<Self>);
20+
fn wake(self: Arc<Self>) {
21+
Self::wake_by_ref(&self)
22+
}
23+
24+
/// Indicates that the associated task is ready to make progress and should
25+
/// be `poll`ed.
26+
///
27+
/// This function can be called from an arbitrary thread, including threads which
28+
/// did not create the `ArcWake` based `Waker`.
29+
///
30+
/// Executors generally maintain a queue of "ready" tasks; `wake_by_ref` should place
31+
/// the associated task onto this queue.
32+
///
33+
/// This function is similar to `wake`, but must not consume the provided data
34+
/// pointer.
35+
fn wake_by_ref(arc_self: &Arc<Self>);
2136

2237
/// Creates a `Waker` from an Arc<T>, if T implements `ArcWake`.
2338
///
2439
/// If `wake()` is called on the returned `Waker`,
2540
/// the `wake()` function that is defined inside this trait will get called.
2641
fn into_waker(self: Arc<Self>) -> Waker where Self: Sized
2742
{
28-
let ptr = Arc::into_raw(self) as *const();
43+
let ptr = Arc::into_raw(self) as *const ();
2944

3045
unsafe {
31-
Waker::new_unchecked(RawWaker::new(ptr, waker_vtable!(Self)))
46+
Waker::from_raw(RawWaker::new(ptr, waker_vtable!(Self)))
3247
}
3348
}
3449
}
3550

3651
// FIXME: panics on Arc::clone / refcount changes could wreak havoc on the
3752
// code here. We should guard against this by aborting.
3853

39-
unsafe fn increase_refcount<T: ArcWake>(data: *const()) {
54+
unsafe fn increase_refcount<T: ArcWake>(data: *const ()) {
4055
// Retain Arc by creating a copy
4156
let arc: Arc<T> = Arc::from_raw(data as *const T);
4257
let arc_clone = arc.clone();
@@ -46,19 +61,25 @@ unsafe fn increase_refcount<T: ArcWake>(data: *const()) {
4661
}
4762

4863
// used by `waker_ref`
49-
pub(super) unsafe fn clone_arc_raw<T: ArcWake>(data: *const()) -> RawWaker {
64+
pub(super) unsafe fn clone_arc_raw<T: ArcWake>(data: *const ()) -> RawWaker {
5065
increase_refcount::<T>(data);
5166
RawWaker::new(data, waker_vtable!(T))
5267
}
5368

54-
unsafe fn drop_arc_raw<T: ArcWake>(data: *const()) {
69+
unsafe fn drop_arc_raw<T: ArcWake>(data: *const ()) {
5570
drop(Arc::<T>::from_raw(data as *const T))
5671
}
5772

5873
// used by `waker_ref`
59-
pub(super) unsafe fn wake_arc_raw<T: ArcWake>(data: *const()) {
74+
pub(super) unsafe fn wake_arc_raw<T: ArcWake>(data: *const ()) {
75+
let arc: Arc<T> = Arc::from_raw(data as *const T);
76+
ArcWake::wake(arc);
77+
}
78+
79+
// used by `waker_ref`
80+
pub(super) unsafe fn wake_by_ref_arc_raw<T: ArcWake>(data: *const ()) {
6081
let arc: Arc<T> = Arc::from_raw(data as *const T);
61-
ArcWake::wake(&arc);
82+
ArcWake::wake_by_ref(&arc);
6283
mem::forget(arc);
6384
}
6485

@@ -84,7 +105,7 @@ mod tests {
84105
}
85106

86107
impl ArcWake for CountingWaker {
87-
fn wake(arc_self: &Arc<Self>) {
108+
fn wake_by_ref(arc_self: &Arc<Self>) {
88109
let mut lock = arc_self.nr_wake.lock().unwrap();
89110
*lock += 1;
90111
}
@@ -96,13 +117,13 @@ mod tests {
96117

97118
let w1: Waker = ArcWake::into_waker(some_w.clone());
98119
assert_eq!(2, Arc::strong_count(&some_w));
99-
w1.wake();
120+
w1.wake_by_ref();
100121
assert_eq!(1, some_w.wakes());
101122

102123
let w2 = w1.clone();
103124
assert_eq!(3, Arc::strong_count(&some_w));
104125

105-
w2.wake();
126+
w2.wake_by_ref();
106127
assert_eq!(2, some_w.wakes());
107128

108129
drop(w2);

futures-util/src/task/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ cfg_target_has_atomic! {
99
&RawWakerVTable::new(
1010
clone_arc_raw::<$ty>,
1111
wake_arc_raw::<$ty>,
12+
wake_by_ref_arc_raw::<$ty>,
1213
drop_arc_raw::<$ty>,
1314
)
1415
};

0 commit comments

Comments
 (0)