Skip to content

Commit c2249c7

Browse files
committed
Update to new futures_api (cx)
1 parent dfb7210 commit c2249c7

File tree

170 files changed

+1022
-1009
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

170 files changed

+1022
-1009
lines changed

.travis.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ matrix:
2525

2626
# When updating this, the reminder to update the minimum required version in README.md.
2727
- name: cargo test (minimum required version)
28-
rust: nightly-2019-02-15
28+
rust: nightly-2019-04-08
2929

3030
- name: cargo clippy
3131
rust: nightly

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ members = [
55
"futures-channel",
66
"futures-executor",
77
"futures-io",
8+
"futures-select-macro",
89
"futures-sink",
910
"futures-util",
1011
"futures-test",

futures-channel/benches/sync_mpsc.rs

+20-20
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use {
99
ready,
1010
stream::{Stream, StreamExt},
1111
sink::Sink,
12-
task::{Waker, Poll},
12+
task::{Context, Poll},
1313
},
1414
futures_test::task::noop_waker_ref,
1515
std::pin::Pin,
@@ -18,7 +18,7 @@ use {
1818
/// Single producer, single consumer
1919
#[bench]
2020
fn unbounded_1_tx(b: &mut Bencher) {
21-
let waker = noop_waker_ref();
21+
let mut cx = Context::from_waker(noop_waker_ref());
2222
b.iter(|| {
2323
let (tx, mut rx) = mpsc::unbounded();
2424

@@ -27,20 +27,20 @@ fn unbounded_1_tx(b: &mut Bencher) {
2727
for i in 0..1000 {
2828

2929
// Poll, not ready, park
30-
assert_eq!(Poll::Pending, rx.poll_next_unpin(waker));
30+
assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));
3131

3232
UnboundedSender::unbounded_send(&tx, i).unwrap();
3333

3434
// Now poll ready
35-
assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(waker));
35+
assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
3636
}
3737
})
3838
}
3939

4040
/// 100 producers, single consumer
4141
#[bench]
4242
fn unbounded_100_tx(b: &mut Bencher) {
43-
let waker = noop_waker_ref();
43+
let mut cx = Context::from_waker(noop_waker_ref());
4444
b.iter(|| {
4545
let (tx, mut rx) = mpsc::unbounded();
4646

@@ -49,26 +49,26 @@ fn unbounded_100_tx(b: &mut Bencher) {
4949
// 1000 send/recv operations total, result should be divided by 1000
5050
for _ in 0..10 {
5151
for i in 0..tx.len() {
52-
assert_eq!(Poll::Pending, rx.poll_next_unpin(waker));
52+
assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));
5353

5454
UnboundedSender::unbounded_send(&tx[i], i).unwrap();
5555

56-
assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(waker));
56+
assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
5757
}
5858
}
5959
})
6060
}
6161

6262
#[bench]
6363
fn unbounded_uncontended(b: &mut Bencher) {
64-
let waker = noop_waker_ref();
64+
let mut cx = Context::from_waker(noop_waker_ref());
6565
b.iter(|| {
6666
let (tx, mut rx) = mpsc::unbounded();
6767

6868
for i in 0..1000 {
6969
UnboundedSender::unbounded_send(&tx, i).expect("send");
7070
// No need to create a task, because poll is not going to park.
71-
assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(waker));
71+
assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
7272
}
7373
})
7474
}
@@ -84,41 +84,41 @@ struct TestSender {
8484
impl Stream for TestSender {
8585
type Item = u32;
8686

87-
fn poll_next(mut self: Pin<&mut Self>, waker: &Waker)
87+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
8888
-> Poll<Option<Self::Item>>
8989
{
9090
let this = &mut *self;
9191
let mut tx = Pin::new(&mut this.tx);
9292

93-
ready!(tx.as_mut().poll_ready(waker)).unwrap();
93+
ready!(tx.as_mut().poll_ready(cx)).unwrap();
9494
tx.as_mut().start_send(this.last + 1).unwrap();
9595
this.last += 1;
96-
assert_eq!(Poll::Ready(Ok(())), tx.as_mut().poll_flush(waker));
96+
assert_eq!(Poll::Ready(Ok(())), tx.as_mut().poll_flush(cx));
9797
Poll::Ready(Some(this.last))
9898
}
9999
}
100100

101101
/// Single producers, single consumer
102102
#[bench]
103103
fn bounded_1_tx(b: &mut Bencher) {
104-
let waker = noop_waker_ref();
104+
let mut cx = Context::from_waker(noop_waker_ref());
105105
b.iter(|| {
106106
let (tx, mut rx) = mpsc::channel(0);
107107

108108
let mut tx = TestSender { tx, last: 0 };
109109

110110
for i in 0..1000 {
111-
assert_eq!(Poll::Ready(Some(i + 1)), tx.poll_next_unpin(waker));
112-
assert_eq!(Poll::Pending, tx.poll_next_unpin(waker));
113-
assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(waker));
111+
assert_eq!(Poll::Ready(Some(i + 1)), tx.poll_next_unpin(&mut cx));
112+
assert_eq!(Poll::Pending, tx.poll_next_unpin(&mut cx));
113+
assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx));
114114
}
115115
})
116116
}
117117

118118
/// 100 producers, single consumer
119119
#[bench]
120120
fn bounded_100_tx(b: &mut Bencher) {
121-
let waker = noop_waker_ref();
121+
let mut cx = Context::from_waker(noop_waker_ref());
122122
b.iter(|| {
123123
// Each sender can send one item after specified capacity
124124
let (tx, mut rx) = mpsc::channel(0);
@@ -133,11 +133,11 @@ fn bounded_100_tx(b: &mut Bencher) {
133133
for i in 0..10 {
134134
for j in 0..tx.len() {
135135
// Send an item
136-
assert_eq!(Poll::Ready(Some(i + 1)), tx[j].poll_next_unpin(waker));
136+
assert_eq!(Poll::Ready(Some(i + 1)), tx[j].poll_next_unpin(&mut cx));
137137
// Then block
138-
assert_eq!(Poll::Pending, tx[j].poll_next_unpin(waker));
138+
assert_eq!(Poll::Pending, tx[j].poll_next_unpin(&mut cx));
139139
// Recv the item
140-
assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(waker));
140+
assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx));
141141
}
142142
}
143143
})

futures-channel/src/mpsc/mod.rs

+12-12
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@
7979
// by the queue structure.
8080

8181
use futures_core::stream::{FusedStream, Stream};
82-
use futures_core::task::{Waker, Poll};
82+
use futures_core::task::{Context, Poll, Waker};
8383
use futures_core::task::__internal::AtomicWaker;
8484
use std::any::Any;
8585
use std::error::Error;
@@ -555,7 +555,7 @@ impl<T> SenderInner<T> {
555555
/// - `Err(SendError)` if the receiver has been dropped.
556556
fn poll_ready(
557557
&mut self,
558-
waker: &Waker
558+
cx: &mut Context<'_>,
559559
) -> Poll<Result<(), SendError>> {
560560
let state = decode_state(self.inner.state.load(SeqCst));
561561
if !state.is_open {
@@ -564,7 +564,7 @@ impl<T> SenderInner<T> {
564564
}));
565565
}
566566

567-
self.poll_unparked(Some(waker)).map(Ok)
567+
self.poll_unparked(Some(cx)).map(Ok)
568568
}
569569

570570
/// Returns whether this channel is closed without needing a context.
@@ -582,7 +582,7 @@ impl<T> SenderInner<T> {
582582
self.inner.recv_task.wake();
583583
}
584584

585-
fn poll_unparked(&mut self, waker: Option<&Waker>) -> Poll<()> {
585+
fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> {
586586
// First check the `maybe_parked` variable. This avoids acquiring the
587587
// lock in most cases
588588
if self.maybe_parked {
@@ -600,7 +600,7 @@ impl<T> SenderInner<T> {
600600
//
601601
// Update the task in case the `Sender` has been moved to another
602602
// task
603-
task.task = waker.cloned();
603+
task.task = cx.map(|cx| cx.waker().clone());
604604

605605
Poll::Pending
606606
} else {
@@ -649,12 +649,12 @@ impl<T> Sender<T> {
649649
/// - `Err(SendError)` if the receiver has been dropped.
650650
pub fn poll_ready(
651651
&mut self,
652-
waker: &Waker,
652+
cx: &mut Context<'_>,
653653
) -> Poll<Result<(), SendError>> {
654654
let inner = self.0.as_mut().ok_or(SendError {
655655
kind: SendErrorKind::Disconnected,
656656
})?;
657-
inner.poll_ready(waker)
657+
inner.poll_ready(cx)
658658
}
659659

660660
/// Returns whether this channel is closed without needing a context.
@@ -679,7 +679,7 @@ impl<T> UnboundedSender<T> {
679679
/// Check if the channel is ready to receive a message.
680680
pub fn poll_ready(
681681
&self,
682-
_: &Waker,
682+
_: &mut Context<'_>,
683683
) -> Poll<Result<(), SendError>> {
684684
let inner = self.0.as_ref().ok_or(SendError {
685685
kind: SendErrorKind::Disconnected,
@@ -904,7 +904,7 @@ impl<T> Stream for Receiver<T> {
904904

905905
fn poll_next(
906906
mut self: Pin<&mut Self>,
907-
waker: &Waker,
907+
cx: &mut Context<'_>,
908908
) -> Poll<Option<T>> {
909909
// Try to read a message off of the message queue.
910910
match self.next_message() {
@@ -916,7 +916,7 @@ impl<T> Stream for Receiver<T> {
916916
},
917917
Poll::Pending => {
918918
// There are no messages to read, in this case, park.
919-
self.inner.as_ref().unwrap().recv_task.register(waker);
919+
self.inner.as_ref().unwrap().recv_task.register(cx.waker());
920920
// Check queue again after parking to prevent race condition:
921921
// a message could be added to the queue after previous `next_message`
922922
// before `register` call.
@@ -971,9 +971,9 @@ impl<T> Stream for UnboundedReceiver<T> {
971971

972972
fn poll_next(
973973
mut self: Pin<&mut Self>,
974-
waker: &Waker,
974+
cx: &mut Context<'_>,
975975
) -> Poll<Option<T>> {
976-
Pin::new(&mut self.0).poll_next(waker)
976+
Pin::new(&mut self.0).poll_next(cx)
977977
}
978978
}
979979

futures-channel/src/oneshot.rs

+9-9
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! A channel for sending a single message between asynchronous tasks.
22
33
use futures_core::future::Future;
4-
use futures_core::task::{Waker, Poll};
4+
use futures_core::task::{Context, Poll, Waker};
55
use std::pin::Pin;
66
use std::sync::Arc;
77
use std::sync::atomic::AtomicBool;
@@ -154,7 +154,7 @@ impl<T> Inner<T> {
154154
}
155155
}
156156

157-
fn poll_cancel(&self, waker: &Waker) -> Poll<()> {
157+
fn poll_cancel(&self, cx: &mut Context<'_>) -> Poll<()> {
158158
// Fast path up first, just read the flag and see if our other half is
159159
// gone. This flag is set both in our destructor and the oneshot
160160
// destructor, but our destructor hasn't run yet so if it's set then the
@@ -176,7 +176,7 @@ impl<T> Inner<T> {
176176
// `Receiver` may have been dropped. The first thing it does is set the
177177
// flag, and if it fails to acquire the lock it assumes that we'll see
178178
// the flag later on. So... we then try to see the flag later on!
179-
let handle = waker.clone();
179+
let handle = cx.waker().clone();
180180
match self.tx_task.try_lock() {
181181
Some(mut p) => *p = Some(handle),
182182
None => return Poll::Ready(()),
@@ -249,7 +249,7 @@ impl<T> Inner<T> {
249249
}
250250
}
251251

252-
fn recv(&self, waker: &Waker) -> Poll<Result<T, Canceled>> {
252+
fn recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> {
253253
// Check to see if some data has arrived. If it hasn't then we need to
254254
// block our task.
255255
//
@@ -260,7 +260,7 @@ impl<T> Inner<T> {
260260
let done = if self.complete.load(SeqCst) {
261261
true
262262
} else {
263-
let task = waker.clone();
263+
let task = cx.waker().clone();
264264
match self.rx_task.try_lock() {
265265
Some(mut slot) => { *slot = Some(task); false },
266266
None => true,
@@ -348,8 +348,8 @@ impl<T> Sender<T> {
348348
/// alive and may be able to receive a message if sent. The current task,
349349
/// however, is scheduled to receive a notification if the corresponding
350350
/// `Receiver` goes away.
351-
pub fn poll_cancel(&mut self, waker: &Waker) -> Poll<()> {
352-
self.inner.poll_cancel(waker)
351+
pub fn poll_cancel(&mut self, cx: &mut Context<'_>) -> Poll<()> {
352+
self.inner.poll_cancel(cx)
353353
}
354354

355355
/// Tests to see whether this `Sender`'s corresponding `Receiver`
@@ -416,9 +416,9 @@ impl<T> Future for Receiver<T> {
416416

417417
fn poll(
418418
self: Pin<&mut Self>,
419-
waker: &Waker,
419+
cx: &mut Context<'_>,
420420
) -> Poll<Result<T, Canceled>> {
421-
self.inner.recv(waker)
421+
self.inner.recv(cx)
422422
}
423423
}
424424

futures-channel/tests/channel.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ async fn send_sequence(n: u32, mut sender: mpsc::Sender<u32>) {
3636
fn drop_sender() {
3737
let (tx, mut rx) = mpsc::channel::<u32>(1);
3838
drop(tx);
39-
let f = poll_fn(|lw| {
40-
rx.poll_next_unpin(lw)
39+
let f = poll_fn(|cx| {
40+
rx.poll_next_unpin(cx)
4141
});
4242
assert_eq!(block_on(f), None)
4343
}

0 commit comments

Comments
 (0)