Skip to content

Commit e36172e

Browse files
author
Abhishek C. Sharma
committed
Merge remote-tracking branch 'upstream/master'
2 parents a3e6870 + 20cdf73 commit e36172e

22 files changed

+816
-397
lines changed

Cargo.toml

+8
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,11 @@ futures-preview = { version = "=0.3.0-alpha.19", features = ["async-await"] }
5656
# These are used by the book for examples
5757
futures-channel-preview = "=0.3.0-alpha.19"
5858
futures-util-preview = "=0.3.0-alpha.19"
59+
60+
[[test]]
61+
name = "stream"
62+
required-features = ["unstable"]
63+
64+
[[example]]
65+
name = "tcp-ipv4-and-6-echo"
66+
required-features = ["unstable"]

src/future/future.rs renamed to src/future/future/mod.rs

+92
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
cfg_unstable! {
22
mod delay;
33
mod flatten;
4+
mod race;
5+
mod try_race;
46

57
use std::time::Duration;
68

79
use delay::DelayFuture;
810
use flatten::FlattenFuture;
911
use crate::future::IntoFuture;
12+
use race::Race;
13+
use try_race::TryRace;
1014
}
1115

1216
extension_trait! {
@@ -156,6 +160,94 @@ extension_trait! {
156160
{
157161
FlattenFuture::new(self)
158162
}
163+
164+
#[doc = r#"
165+
Waits for one of two similarly-typed futures to complete.
166+
167+
Awaits multiple futures simultaneously, returning the output of the
168+
first future that completes.
169+
170+
This function will return a new future which awaits for either one of both
171+
futures to complete. If multiple futures are completed at the same time,
172+
resolution will occur in the order that they have been passed.
173+
174+
Note that this macro consumes all futures passed, and once a future is
175+
completed, all other futures are dropped.
176+
177+
This macro is only usable inside of async functions, closures, and blocks.
178+
179+
# Examples
180+
181+
```
182+
# async_std::task::block_on(async {
183+
use async_std::prelude::*;
184+
use async_std::future;
185+
186+
let a = future::pending();
187+
let b = future::ready(1u8);
188+
let c = future::ready(2u8);
189+
190+
let f = a.race(b).race(c);
191+
assert_eq!(f.await, 1u8);
192+
# });
193+
```
194+
"#]
195+
#[cfg(any(feature = "unstable", feature = "docs"))]
196+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
197+
fn race<F>(
198+
self,
199+
other: F
200+
) -> impl Future<Output = <Self as std::future::Future>::Output> [Race<Self, F>]
201+
where
202+
Self: std::future::Future + Sized,
203+
F: std::future::Future<Output = <Self as std::future::Future>::Output>,
204+
{
205+
Race::new(self, other)
206+
}
207+
208+
#[doc = r#"
209+
Waits for one of two similarly-typed fallible futures to complete.
210+
211+
Awaits multiple futures simultaneously, returning all results once complete.
212+
213+
`try_race` is similar to [`race`], but keeps going if a future
214+
resolved to an error until all futures have been resolved. In which case
215+
an error is returned.
216+
217+
The ordering of which value is yielded when two futures resolve
218+
simultaneously is intentionally left unspecified.
219+
220+
# Examples
221+
222+
```
223+
# fn main() -> std::io::Result<()> { async_std::task::block_on(async {
224+
#
225+
use async_std::prelude::*;
226+
use async_std::future;
227+
use std::io::{Error, ErrorKind};
228+
229+
let a = future::pending::<Result<_, Error>>();
230+
let b = future::ready(Err(Error::from(ErrorKind::Other)));
231+
let c = future::ready(Ok(1u8));
232+
233+
let f = a.try_race(b).try_race(c);
234+
assert_eq!(f.await?, 1u8);
235+
#
236+
# Ok(()) }) }
237+
```
238+
"#]
239+
#[cfg(any(feature = "unstable", feature = "docs"))]
240+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
241+
fn try_race<F: std::future::Future, T, E>(
242+
self,
243+
other: F
244+
) -> impl Future<Output = <Self as std::future::Future>::Output> [TryRace<Self, F>]
245+
where
246+
Self: std::future::Future<Output = Result<T, E>> + Sized,
247+
F: std::future::Future<Output = <Self as std::future::Future>::Output>,
248+
{
249+
TryRace::new(self, other)
250+
}
159251
}
160252

161253
impl<F: Future + Unpin + ?Sized> Future for Box<F> {

src/future/future/race.rs

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
use std::pin::Pin;
2+
3+
use async_macros::MaybeDone;
4+
use pin_project_lite::pin_project;
5+
6+
use crate::task::{Context, Poll};
7+
use std::future::Future;
8+
9+
pin_project! {
10+
#[allow(missing_docs)]
11+
#[allow(missing_debug_implementations)]
12+
pub struct Race<L, R>
13+
where
14+
L: Future,
15+
R: Future<Output = L::Output>
16+
{
17+
#[pin] left: MaybeDone<L>,
18+
#[pin] right: MaybeDone<R>,
19+
}
20+
}
21+
22+
impl<L, R> Race<L, R>
23+
where
24+
L: Future,
25+
R: Future<Output = L::Output>,
26+
{
27+
pub(crate) fn new(left: L, right: R) -> Self {
28+
Self {
29+
left: MaybeDone::new(left),
30+
right: MaybeDone::new(right),
31+
}
32+
}
33+
}
34+
35+
impl<L, R> Future for Race<L, R>
36+
where
37+
L: Future,
38+
R: Future<Output = L::Output>,
39+
{
40+
type Output = L::Output;
41+
42+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
43+
let this = self.project();
44+
45+
let mut left = this.left;
46+
if Future::poll(Pin::new(&mut left), cx).is_ready() {
47+
return Poll::Ready(left.take().unwrap());
48+
}
49+
50+
let mut right = this.right;
51+
if Future::poll(Pin::new(&mut right), cx).is_ready() {
52+
return Poll::Ready(right.take().unwrap());
53+
}
54+
55+
Poll::Pending
56+
}
57+
}

src/future/future/try_race.rs

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use std::pin::Pin;
2+
3+
use async_macros::MaybeDone;
4+
use pin_project_lite::pin_project;
5+
6+
use crate::task::{Context, Poll};
7+
use std::future::Future;
8+
9+
pin_project! {
10+
#[allow(missing_docs)]
11+
#[allow(missing_debug_implementations)]
12+
pub struct TryRace<L, R>
13+
where
14+
L: Future,
15+
R: Future<Output = L::Output>
16+
{
17+
#[pin] left: MaybeDone<L>,
18+
#[pin] right: MaybeDone<R>,
19+
}
20+
}
21+
22+
impl<L, R> TryRace<L, R>
23+
where
24+
L: Future,
25+
R: Future<Output = L::Output>,
26+
{
27+
pub(crate) fn new(left: L, right: R) -> Self {
28+
Self {
29+
left: MaybeDone::new(left),
30+
right: MaybeDone::new(right),
31+
}
32+
}
33+
}
34+
35+
impl<L, R, T, E> Future for TryRace<L, R>
36+
where
37+
L: Future<Output = Result<T, E>>,
38+
R: Future<Output = L::Output>,
39+
{
40+
type Output = L::Output;
41+
42+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
43+
let this = self.project();
44+
let mut left_errored = false;
45+
46+
// Check if the left future is ready & successful. Continue if not.
47+
let mut left = this.left;
48+
if Future::poll(Pin::new(&mut left), cx).is_ready() {
49+
if left.as_ref().output().unwrap().is_ok() {
50+
return Poll::Ready(left.take().unwrap());
51+
} else {
52+
left_errored = true;
53+
}
54+
}
55+
56+
// Check if the right future is ready & successful. Return err if left
57+
// future also resolved to err. Continue if not.
58+
let mut right = this.right;
59+
let is_ready = Future::poll(Pin::new(&mut right), cx).is_ready();
60+
if is_ready && (right.as_ref().output().unwrap().is_ok() || left_errored) {
61+
return Poll::Ready(right.take().unwrap());
62+
}
63+
64+
Poll::Pending
65+
}
66+
}

src/future/mod.rs

+18-16
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,16 @@
44
//!
55
//! Often it's desireable to await multiple futures as if it was a single
66
//! future. The `join` family of operations converts multiple futures into a
7-
//! single future that returns all of their outputs. The `select` family of
7+
//! single future that returns all of their outputs. The `race` family of
88
//! operations converts multiple future into a single future that returns the
99
//! first output.
1010
//!
1111
//! For operating on futures the following macros can be used:
1212
//!
13-
//! | Name | Return signature | When does it return? |
14-
//! | --- | --- | --- |
15-
//! | `future::join` | `(T1, T2)` | Wait for all to complete
16-
//! | `future::select` | `T` | Return on first value
13+
//! | Name | Return signature | When does it return? |
14+
//! | --- | --- | --- |
15+
//! | [`future::join!`] | `(T1, T2)` | Wait for all to complete
16+
//! | [`Future::race`] | `T` | Return on first value
1717
//!
1818
//! ## Fallible Futures Concurrency
1919
//!
@@ -25,21 +25,26 @@
2525
//! futures are dropped and an error is returned. This is referred to as
2626
//! "short-circuiting".
2727
//!
28-
//! In the case of `try_select`, instead of returning the first future that
28+
//! In the case of `try_race`, instead of returning the first future that
2929
//! completes it returns the first future that _successfully_ completes. This
30-
//! means `try_select` will keep going until any one of the futures returns
30+
//! means `try_race` will keep going until any one of the futures returns
3131
//! `Ok`, or _all_ futures have returned `Err`.
3232
//!
3333
//! However sometimes it can be useful to use the base variants of the macros
3434
//! even on futures that return `Result`. Here is an overview of operations that
3535
//! work on `Result`, and their respective semantics:
3636
//!
37-
//! | Name | Return signature | When does it return? |
38-
//! | --- | --- | --- |
39-
//! | `future::join` | `(Result<T, E>, Result<T, E>)` | Wait for all to complete
40-
//! | `future::try_join` | `Result<(T1, T2), E>` | Return on first `Err`, wait for all to complete
41-
//! | `future::select` | `Result<T, E>` | Return on first value
42-
//! | `future::try_select` | `Result<T, E>` | Return on first `Ok`, reject on last Err
37+
//! | Name | Return signature | When does it return? |
38+
//! | --- | --- | --- |
39+
//! | [`future::join!`] | `(Result<T, E>, Result<T, E>)` | Wait for all to complete
40+
//! | [`future::try_join!`] | `Result<(T1, T2), E>` | Return on first `Err`, wait for all to complete
41+
//! | [`Future::race`] | `Result<T, E>` | Return on first value
42+
//! | [`Future::try_race`] | `Result<T, E>` | Return on first `Ok`, reject on last Err
43+
//!
44+
//! [`future::join!`]: macro.join.html
45+
//! [`future::try_join!`]: macro.try_join.html
46+
//! [`Future::race`]: trait.Future.html#method.race
47+
//! [`Future::try_race`]: trait.Future.html#method.try_race
4348
4449
#[doc(inline)]
4550
pub use async_macros::{join, try_join};
@@ -57,9 +62,6 @@ mod ready;
5762
mod timeout;
5863

5964
cfg_unstable! {
60-
#[doc(inline)]
61-
pub use async_macros::{select, try_select};
62-
6365
pub use into_future::IntoFuture;
6466
mod into_future;
6567
}

src/io/mod.rs

+9-3
Original file line numberDiff line numberDiff line change
@@ -282,9 +282,9 @@ pub use read::Read;
282282
pub use repeat::{repeat, Repeat};
283283
pub use seek::Seek;
284284
pub use sink::{sink, Sink};
285-
pub use stderr::{stderr, Stderr, StderrLock};
286-
pub use stdin::{stdin, Stdin, StdinLock};
287-
pub use stdout::{stdout, Stdout, StdoutLock};
285+
pub use stderr::{stderr, Stderr};
286+
pub use stdin::{stdin, Stdin};
287+
pub use stdout::{stdout, Stdout};
288288
pub use timeout::timeout;
289289
pub use write::Write;
290290

@@ -311,3 +311,9 @@ mod stdin;
311311
mod stdio;
312312
mod stdout;
313313
mod timeout;
314+
315+
cfg_unstable! {
316+
pub use stderr::StderrLock;
317+
pub use stdin::StdinLock;
318+
pub use stdout::StdoutLock;
319+
}

src/io/stderr.rs

+11-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use std::io::Write as StdWrite;
21
use std::pin::Pin;
32
use std::sync::Mutex;
43

@@ -8,6 +7,7 @@ use crate::task::{spawn_blocking, Context, JoinHandle, Poll};
87

98
cfg_unstable! {
109
use once_cell::sync::Lazy;
10+
use std::io::Write as _;
1111
}
1212

1313
/// Constructs a new handle to the standard error of the current process.
@@ -59,13 +59,19 @@ pub fn stderr() -> Stderr {
5959
pub struct Stderr(Mutex<State>);
6060

6161
/// A locked reference to the Stderr handle.
62-
/// This handle implements the [`Write`] traits, and is constructed via the [`Stderr::lock`] method.
62+
///
63+
/// This handle implements the [`Write`] traits, and is constructed via the [`Stderr::lock`]
64+
/// method.
6365
///
6466
/// [`Write`]: trait.Read.html
6567
/// [`Stderr::lock`]: struct.Stderr.html#method.lock
68+
#[cfg(feature = "unstable")]
69+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
6670
#[derive(Debug)]
6771
pub struct StderrLock<'a>(std::io::StderrLock<'a>);
6872

73+
#[cfg(feature = "unstable")]
74+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
6975
unsafe impl Send for StderrLock<'_> {}
7076

7177
/// The state of the asynchronous stderr.
@@ -234,7 +240,9 @@ cfg_windows! {
234240
}
235241
}
236242

237-
impl Write for StderrLock<'_> {
243+
#[cfg(feature = "unstable")]
244+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
245+
impl io::Write for StderrLock<'_> {
238246
fn poll_write(
239247
mut self: Pin<&mut Self>,
240248
_cx: &mut Context<'_>,

0 commit comments

Comments
 (0)