Skip to content

Commit cc19592

Browse files
authoredMar 12, 2020
Revert "Stabilize most stream method and remove unnecessary macros"
1 parent 61f9483 commit cc19592

34 files changed

+1192
-110
lines changed
 

‎Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ default = [
2626
"async-task",
2727
"crossbeam-channel",
2828
"crossbeam-deque",
29+
"futures-timer",
2930
"kv-log-macro",
3031
"log",
3132
"mio",
@@ -34,14 +35,13 @@ default = [
3435
"pin-project-lite",
3536
]
3637
docs = ["attributes", "unstable", "default"]
37-
unstable = ["std", "broadcaster"]
38+
unstable = ["std", "broadcaster", "futures-timer"]
3839
attributes = ["async-attributes"]
3940
std = [
4041
"alloc",
4142
"crossbeam-utils",
4243
"futures-core/std",
4344
"futures-io",
44-
"futures-timer",
4545
"memchr",
4646
"once_cell",
4747
"pin-utils",

‎examples/a-chat/client.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
use futures::select;
22
use futures::FutureExt;
3-
use std::io::{self, BufRead, BufReader as StdBufReader};
43

54
use async_std::{
6-
io::BufReader,
5+
io::{stdin, BufReader},
76
net::{TcpStream, ToSocketAddrs},
87
prelude::*,
9-
stream, task,
8+
task,
109
};
1110

1211
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
@@ -21,9 +20,8 @@ async fn try_main(addr: impl ToSocketAddrs) -> Result<()> {
2120
let reader = BufReader::new(reader);
2221
let mut lines_from_server = futures::StreamExt::fuse(reader.lines());
2322

24-
let stdin = StdBufReader::new(io::stdin());
25-
let mut lines_from_stdin = stream::from_iter(stdin.lines());
26-
23+
let stdin = BufReader::new(stdin());
24+
let mut lines_from_stdin = futures::StreamExt::fuse(stdin.lines());
2725
loop {
2826
select! {
2927
line = lines_from_server.next().fuse() => match line {

‎examples/print-file.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
//! Prints a file given as an argument to stdout.
22
33
use std::env::args;
4-
use std::io::Write;
54

65
use async_std::fs::File;
76
use async_std::io;
@@ -15,7 +14,7 @@ fn main() -> io::Result<()> {
1514

1615
task::block_on(async {
1716
let mut file = File::open(&path).await?;
18-
let mut stdout = std::io::stdout();
17+
let mut stdout = io::stdout();
1918
let mut buf = vec![0u8; LEN];
2019

2120
loop {
@@ -24,12 +23,12 @@ fn main() -> io::Result<()> {
2423

2524
// If this is the end of file, clean up and return.
2625
if n == 0 {
27-
stdout.flush()?;
26+
stdout.flush().await?;
2827
return Ok(());
2928
}
3029

3130
// Write the buffer into stdout.
32-
stdout.write_all(&buf[..n])?;
31+
stdout.write_all(&buf[..n]).await?;
3332
}
3433
})
3534
}

‎examples/stdin-echo.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,27 @@
11
//! Echoes lines read on stdin to stdout.
22
33
use async_std::io;
4+
use async_std::prelude::*;
45
use async_std::task;
5-
use std::io::Write;
66

77
fn main() -> io::Result<()> {
88
task::block_on(async {
9-
let stdin = std::io::stdin();
10-
let mut stdout = std::io::stdout();
9+
let stdin = io::stdin();
10+
let mut stdout = io::stdout();
1111
let mut line = String::new();
1212

1313
loop {
1414
// Read a line from stdin.
15-
let n = stdin.read_line(&mut line)?;
15+
let n = stdin.read_line(&mut line).await?;
1616

1717
// If this is the end of stdin, return.
1818
if n == 0 {
1919
return Ok(());
2020
}
2121

2222
// Write the line to stdout.
23-
stdout.write_all(line.as_bytes())?;
24-
stdout.flush()?;
23+
stdout.write_all(line.as_bytes()).await?;
24+
stdout.flush().await?;
2525
line.clear();
2626
}
2727
})

‎examples/stdin-timeout.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ use async_std::task;
88
fn main() -> io::Result<()> {
99
// This async scope times out after 5 seconds.
1010
task::block_on(io::timeout(Duration::from_secs(5), async {
11-
let stdin = std::io::stdin();
11+
let stdin = io::stdin();
1212

1313
// Read a line from the standard input and display it.
1414
let mut line = String::new();
15-
stdin.read_line(&mut line)?;
15+
stdin.read_line(&mut line).await?;
1616
dbg!(line);
1717

1818
Ok(())

‎src/future/into_future.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,9 @@ use std::future::Future;
55
/// # Examples
66
///
77
/// ```
8-
/// use std::pin::Pin;
9-
///
108
/// use async_std::future::{Future, IntoFuture};
119
/// use async_std::io;
10+
/// use async_std::pin::Pin;
1211
///
1312
/// struct Client;
1413
///

‎src/future/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,12 @@ cfg_std! {
6363

6464
cfg_default! {
6565
pub use timeout::{timeout, TimeoutError};
66-
6766
mod timeout;
6867
}
6968

7069
cfg_unstable! {
7170
pub use into_future::IntoFuture;
7271
pub(crate) use maybe_done::MaybeDone;
73-
7472
mod into_future;
7573
mod maybe_done;
7674
}

‎src/io/copy.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,13 @@ use crate::utils::Context as _;
3232
///
3333
/// # Examples
3434
///
35-
/// ```no_run
35+
/// ```
3636
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
3737
/// #
3838
/// use async_std::io;
39-
/// use async_std::fs::File;
4039
///
4140
/// let mut reader: &[u8] = b"hello";
42-
/// let mut writer = File::open("foo.txt").await?;
41+
/// let mut writer = io::stdout();
4342
///
4443
/// io::copy(&mut reader, &mut writer).await?;
4544
/// #
@@ -120,14 +119,13 @@ where
120119
///
121120
/// # Examples
122121
///
123-
/// ```no_run
122+
/// ```
124123
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
125124
/// #
126125
/// use async_std::io;
127-
/// use async_std::fs::File;
128126
///
129127
/// let mut reader: &[u8] = b"hello";
130-
/// let mut writer = File::open("foo.txt").await?;
128+
/// let mut writer = io::stdout();
131129
///
132130
/// io::copy(&mut reader, &mut writer).await?;
133131
/// #

‎src/io/mod.rs

Lines changed: 74 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,56 @@
122122
//! # Ok(()) }) }
123123
//! ```
124124
//!
125+
//! ## Standard input and output
126+
//!
127+
//! A very common source of input is standard input:
128+
//!
129+
//! ```no_run
130+
//! use async_std::io;
131+
//!
132+
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
133+
//! #
134+
//! let mut input = String::new();
135+
//!
136+
//! io::stdin().read_line(&mut input).await?;
137+
//!
138+
//! println!("You typed: {}", input.trim());
139+
//! #
140+
//! # Ok(()) }) }
141+
//! ```
142+
//!
143+
//! Note that you cannot use the [`?` operator] in functions that do not return
144+
//! a [`Result<T, E>`][`Result`]. Instead, you can call [`.unwrap()`]
145+
//! or `match` on the return value to catch any possible errors:
146+
//!
147+
//! ```no_run
148+
//! use async_std::io;
149+
//!
150+
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
151+
//! #
152+
//! let mut input = String::new();
153+
//!
154+
//! io::stdin().read_line(&mut input).await.unwrap();
155+
//! #
156+
//! # Ok(()) }) }
157+
//! ```
158+
//!
159+
//! And a very common source of output is standard output:
160+
//!
161+
//! ```no_run
162+
//! use async_std::io;
163+
//! use async_std::io::prelude::*;
164+
//!
165+
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
166+
//! #
167+
//! io::stdout().write(&[42]).await?;
168+
//! #
169+
//! # Ok(()) }) }
170+
//! ```
171+
//!
172+
//! Of course, using [`io::stdout`] directly is less common than something like
173+
//! [`println!`].
174+
//!
125175
//! ## Iterator types
126176
//!
127177
//! A large number of the structures provided by `std::io` are for various
@@ -154,14 +204,10 @@
154204
//!
155205
//! ```no_run
156206
//! use async_std::io;
157-
//! use async_std::fs::File;
158207
//!
159208
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
160209
//! #
161-
//! let mut reader: &[u8] = b"hello";
162-
//! let mut writer = File::open("foo.txt").await?;
163-
//!
164-
//! io::copy(&mut reader, &mut writer).await?;
210+
//! io::copy(&mut io::stdin(), &mut io::stdout()).await?;
165211
//! #
166212
//! # Ok(()) }) }
167213
//! ```
@@ -178,14 +224,13 @@
178224
//! ```
179225
//! #![allow(dead_code)]
180226
//! use async_std::io;
181-
//! use std::time::Duration;
182227
//!
183228
//! async fn read_input() -> io::Result<()> {
184-
//! let f = io::timeout(Duration::from_secs(5), async {
185-
//! Ok(())
186-
//! });
229+
//! let mut input = String::new();
230+
//!
231+
//! io::stdin().read_line(&mut input).await?;
187232
//!
188-
//! assert_eq!(f.await?, ());
233+
//! println!("You typed: {}", input.trim());
189234
//!
190235
//! Ok(())
191236
//! }
@@ -215,6 +260,8 @@
215260
//! [`BufReader`]: struct.BufReader.html
216261
//! [`BufWriter`]: struct.BufWriter.html
217262
//! [`Write::write`]: trait.Write.html#tymethod.write
263+
//! [`io::stdout`]: fn.stdout.html
264+
//! [`println!`]: ../macro.println.html
218265
//! [`Lines`]: struct.Lines.html
219266
//! [`io::Result`]: type.Result.html
220267
//! [`?` operator]: https://doc.rust-lang.org/stable/book/appendix-02-operators.html
@@ -258,7 +305,24 @@ cfg_std! {
258305
}
259306

260307
cfg_default! {
308+
// For use in the print macros.
309+
#[doc(hidden)]
310+
pub use stdio::{_eprint, _print};
311+
312+
pub use stderr::{stderr, Stderr};
313+
pub use stdin::{stdin, Stdin};
314+
pub use stdout::{stdout, Stdout};
261315
pub use timeout::timeout;
262316

263317
mod timeout;
318+
mod stderr;
319+
mod stdin;
320+
mod stdio;
321+
mod stdout;
322+
}
323+
324+
cfg_unstable_default! {
325+
pub use stderr::StderrLock;
326+
pub use stdin::StdinLock;
327+
pub use stdout::StdoutLock;
264328
}

‎src/io/stderr.rs

Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
use std::pin::Pin;
2+
use std::sync::Mutex;
3+
use std::future::Future;
4+
5+
use crate::io::{self, Write};
6+
use crate::task::{spawn_blocking, Context, JoinHandle, Poll};
7+
8+
cfg_unstable! {
9+
use once_cell::sync::Lazy;
10+
use std::io::Write as _;
11+
}
12+
13+
/// Constructs a new handle to the standard error of the current process.
14+
///
15+
/// This function is an async version of [`std::io::stderr`].
16+
///
17+
/// [`std::io::stderr`]: https://doc.rust-lang.org/std/io/fn.stderr.html
18+
///
19+
/// ### Note: Windows Portability Consideration
20+
///
21+
/// When operating in a console, the Windows implementation of this stream does not support
22+
/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return
23+
/// an error.
24+
///
25+
/// # Examples
26+
///
27+
/// ```no_run
28+
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
29+
/// #
30+
/// use async_std::io;
31+
/// use async_std::prelude::*;
32+
///
33+
/// let mut stderr = io::stderr();
34+
/// stderr.write_all(b"Hello, world!").await?;
35+
/// #
36+
/// # Ok(()) }) }
37+
/// ```
38+
pub fn stderr() -> Stderr {
39+
Stderr(Mutex::new(State::Idle(Some(Inner {
40+
stderr: std::io::stderr(),
41+
buf: Vec::new(),
42+
last_op: None,
43+
}))))
44+
}
45+
46+
/// A handle to the standard error of the current process.
47+
///
48+
/// This writer is created by the [`stderr`] function. See its documentation for
49+
/// more.
50+
///
51+
/// ### Note: Windows Portability Consideration
52+
///
53+
/// When operating in a console, the Windows implementation of this stream does not support
54+
/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return
55+
/// an error.
56+
///
57+
/// [`stderr`]: fn.stderr.html
58+
#[derive(Debug)]
59+
pub struct Stderr(Mutex<State>);
60+
61+
/// A locked reference to the Stderr handle.
62+
///
63+
/// This handle implements the [`Write`] traits, and is constructed via the [`Stderr::lock`]
64+
/// method.
65+
///
66+
/// [`Write`]: trait.Read.html
67+
/// [`Stderr::lock`]: struct.Stderr.html#method.lock
68+
#[cfg(feature = "unstable")]
69+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
70+
#[derive(Debug)]
71+
pub struct StderrLock<'a>(std::io::StderrLock<'a>);
72+
73+
#[cfg(feature = "unstable")]
74+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
75+
unsafe impl Send for StderrLock<'_> {}
76+
77+
/// The state of the asynchronous stderr.
78+
///
79+
/// The stderr can be either idle or busy performing an asynchronous operation.
80+
#[derive(Debug)]
81+
enum State {
82+
/// The stderr is idle.
83+
Idle(Option<Inner>),
84+
85+
/// The stderr is blocked on an asynchronous operation.
86+
///
87+
/// Awaiting this operation will result in the new state of the stderr.
88+
Busy(JoinHandle<State>),
89+
}
90+
91+
/// Inner representation of the asynchronous stderr.
92+
#[derive(Debug)]
93+
struct Inner {
94+
/// The blocking stderr handle.
95+
stderr: std::io::Stderr,
96+
97+
/// The write buffer.
98+
buf: Vec<u8>,
99+
100+
/// The result of the last asynchronous operation on the stderr.
101+
last_op: Option<Operation>,
102+
}
103+
104+
/// Possible results of an asynchronous operation on the stderr.
105+
#[derive(Debug)]
106+
enum Operation {
107+
Write(io::Result<usize>),
108+
Flush(io::Result<()>),
109+
}
110+
111+
impl Stderr {
112+
/// Locks this handle to the standard error stream, returning a writable guard.
113+
///
114+
/// The lock is released when the returned lock goes out of scope. The returned guard also implements the Write trait for writing data.
115+
///
116+
/// # Examples
117+
///
118+
/// ```no_run
119+
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
120+
/// #
121+
/// use async_std::io;
122+
/// use async_std::prelude::*;
123+
///
124+
/// let stderr = io::stderr();
125+
/// let mut handle = stderr.lock().await;
126+
///
127+
/// handle.write_all(b"hello world").await?;
128+
/// #
129+
/// # Ok(()) }) }
130+
/// ```
131+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
132+
#[cfg(any(feature = "unstable", feature = "docs"))]
133+
pub async fn lock(&self) -> StderrLock<'static> {
134+
static STDERR: Lazy<std::io::Stderr> = Lazy::new(std::io::stderr);
135+
136+
spawn_blocking(move || StderrLock(STDERR.lock())).await
137+
}
138+
}
139+
140+
impl Write for Stderr {
141+
fn poll_write(
142+
mut self: Pin<&mut Self>,
143+
cx: &mut Context<'_>,
144+
buf: &[u8],
145+
) -> Poll<io::Result<usize>> {
146+
let state = &mut *self.0.lock().unwrap();
147+
148+
loop {
149+
match state {
150+
State::Idle(opt) => {
151+
let inner = opt.as_mut().unwrap();
152+
153+
// Check if the operation has completed.
154+
if let Some(Operation::Write(res)) = inner.last_op.take() {
155+
let n = res?;
156+
157+
// If more data was written than is available in the buffer, let's retry
158+
// the write operation.
159+
if n <= buf.len() {
160+
return Poll::Ready(Ok(n));
161+
}
162+
} else {
163+
let mut inner = opt.take().unwrap();
164+
165+
// Set the length of the inner buffer to the length of the provided buffer.
166+
if inner.buf.len() < buf.len() {
167+
inner.buf.reserve(buf.len() - inner.buf.len());
168+
}
169+
unsafe {
170+
inner.buf.set_len(buf.len());
171+
}
172+
173+
// Copy the data to write into the inner buffer.
174+
inner.buf[..buf.len()].copy_from_slice(buf);
175+
176+
// Start the operation asynchronously.
177+
*state = State::Busy(spawn_blocking(move || {
178+
let res = std::io::Write::write(&mut inner.stderr, &inner.buf);
179+
inner.last_op = Some(Operation::Write(res));
180+
State::Idle(Some(inner))
181+
}));
182+
}
183+
}
184+
// Poll the asynchronous operation the stderr is currently blocked on.
185+
State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
186+
}
187+
}
188+
}
189+
190+
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
191+
let state = &mut *self.0.lock().unwrap();
192+
193+
loop {
194+
match state {
195+
State::Idle(opt) => {
196+
let inner = opt.as_mut().unwrap();
197+
198+
// Check if the operation has completed.
199+
if let Some(Operation::Flush(res)) = inner.last_op.take() {
200+
return Poll::Ready(res);
201+
} else {
202+
let mut inner = opt.take().unwrap();
203+
204+
// Start the operation asynchronously.
205+
*state = State::Busy(spawn_blocking(move || {
206+
let res = std::io::Write::flush(&mut inner.stderr);
207+
inner.last_op = Some(Operation::Flush(res));
208+
State::Idle(Some(inner))
209+
}));
210+
}
211+
}
212+
// Poll the asynchronous operation the stderr is currently blocked on.
213+
State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
214+
}
215+
}
216+
}
217+
218+
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
219+
self.poll_flush(cx)
220+
}
221+
}
222+
223+
cfg_unix! {
224+
use crate::os::unix::io::{AsRawFd, RawFd};
225+
226+
impl AsRawFd for Stderr {
227+
fn as_raw_fd(&self) -> RawFd {
228+
std::io::stderr().as_raw_fd()
229+
}
230+
}
231+
}
232+
233+
cfg_windows! {
234+
use crate::os::windows::io::{AsRawHandle, RawHandle};
235+
236+
impl AsRawHandle for Stderr {
237+
fn as_raw_handle(&self) -> RawHandle {
238+
std::io::stderr().as_raw_handle()
239+
}
240+
}
241+
}
242+
243+
#[cfg(feature = "unstable")]
244+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
245+
impl io::Write for StderrLock<'_> {
246+
fn poll_write(
247+
mut self: Pin<&mut Self>,
248+
_cx: &mut Context<'_>,
249+
buf: &[u8],
250+
) -> Poll<io::Result<usize>> {
251+
Poll::Ready(self.0.write(buf))
252+
}
253+
254+
fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
255+
Poll::Ready(self.0.flush())
256+
}
257+
258+
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
259+
self.poll_flush(cx)
260+
}
261+
}

‎src/io/stdin.rs

Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
use std::future::Future;
2+
use std::pin::Pin;
3+
use std::sync::Mutex;
4+
5+
use crate::future;
6+
use crate::io::{self, Read};
7+
use crate::task::{spawn_blocking, Context, JoinHandle, Poll};
8+
use crate::utils::Context as _;
9+
10+
cfg_unstable! {
11+
use once_cell::sync::Lazy;
12+
use std::io::Read as _;
13+
}
14+
15+
/// Constructs a new handle to the standard input of the current process.
16+
///
17+
/// This function is an async version of [`std::io::stdin`].
18+
///
19+
/// [`std::io::stdin`]: https://doc.rust-lang.org/std/io/fn.stdin.html
20+
///
21+
/// ### Note: Windows Portability Consideration
22+
///
23+
/// When operating in a console, the Windows implementation of this stream does not support
24+
/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return
25+
/// an error.
26+
///
27+
/// # Examples
28+
///
29+
/// ```no_run
30+
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
31+
/// #
32+
/// use async_std::io;
33+
///
34+
/// let stdin = io::stdin();
35+
/// let mut line = String::new();
36+
/// stdin.read_line(&mut line).await?;
37+
/// #
38+
/// # Ok(()) }) }
39+
/// ```
40+
pub fn stdin() -> Stdin {
41+
Stdin(Mutex::new(State::Idle(Some(Inner {
42+
stdin: std::io::stdin(),
43+
line: String::new(),
44+
buf: Vec::new(),
45+
last_op: None,
46+
}))))
47+
}
48+
49+
/// A handle to the standard input of the current process.
50+
///
51+
/// This reader is created by the [`stdin`] function. See its documentation for
52+
/// more.
53+
///
54+
/// ### Note: Windows Portability Consideration
55+
///
56+
/// When operating in a console, the Windows implementation of this stream does not support
57+
/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return
58+
/// an error.
59+
///
60+
/// [`stdin`]: fn.stdin.html
61+
#[derive(Debug)]
62+
pub struct Stdin(Mutex<State>);
63+
64+
/// A locked reference to the Stdin handle.
65+
///
66+
/// This handle implements the [`Read`] traits, and is constructed via the [`Stdin::lock`] method.
67+
///
68+
/// [`Read`]: trait.Read.html
69+
/// [`Stdin::lock`]: struct.Stdin.html#method.lock
70+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
71+
#[cfg(feature = "unstable")]
72+
#[derive(Debug)]
73+
pub struct StdinLock<'a>(std::io::StdinLock<'a>);
74+
75+
#[cfg(feature = "unstable")]
76+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
77+
unsafe impl Send for StdinLock<'_> {}
78+
79+
/// The state of the asynchronous stdin.
80+
///
81+
/// The stdin can be either idle or busy performing an asynchronous operation.
82+
#[derive(Debug)]
83+
enum State {
84+
/// The stdin is idle.
85+
Idle(Option<Inner>),
86+
87+
/// The stdin is blocked on an asynchronous operation.
88+
///
89+
/// Awaiting this operation will result in the new state of the stdin.
90+
Busy(JoinHandle<State>),
91+
}
92+
93+
/// Inner representation of the asynchronous stdin.
94+
#[derive(Debug)]
95+
struct Inner {
96+
/// The blocking stdin handle.
97+
stdin: std::io::Stdin,
98+
99+
/// The line buffer.
100+
line: String,
101+
102+
/// The write buffer.
103+
buf: Vec<u8>,
104+
105+
/// The result of the last asynchronous operation on the stdin.
106+
last_op: Option<Operation>,
107+
}
108+
109+
/// Possible results of an asynchronous operation on the stdin.
110+
#[derive(Debug)]
111+
enum Operation {
112+
ReadLine(io::Result<usize>),
113+
Read(io::Result<usize>),
114+
}
115+
116+
impl Stdin {
117+
/// Reads a line of input into the specified buffer.
118+
///
119+
/// # Examples
120+
///
121+
/// ```no_run
122+
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
123+
/// #
124+
/// use async_std::io;
125+
///
126+
/// let stdin = io::stdin();
127+
/// let mut line = String::new();
128+
/// stdin.read_line(&mut line).await?;
129+
/// #
130+
/// # Ok(()) }) }
131+
/// ```
132+
pub async fn read_line(&self, buf: &mut String) -> io::Result<usize> {
133+
future::poll_fn(|cx| {
134+
let state = &mut *self.0.lock().unwrap();
135+
136+
loop {
137+
match state {
138+
State::Idle(opt) => {
139+
let inner = opt.as_mut().unwrap();
140+
141+
// Check if the operation has completed.
142+
if let Some(Operation::ReadLine(res)) = inner.last_op.take() {
143+
let n = res?;
144+
145+
// Copy the read data into the buffer and return.
146+
buf.push_str(&inner.line);
147+
return Poll::Ready(Ok(n));
148+
} else {
149+
let mut inner = opt.take().unwrap();
150+
151+
// Start the operation asynchronously.
152+
*state = State::Busy(spawn_blocking(move || {
153+
inner.line.clear();
154+
let res = inner.stdin.read_line(&mut inner.line);
155+
inner.last_op = Some(Operation::ReadLine(res));
156+
State::Idle(Some(inner))
157+
}));
158+
}
159+
}
160+
// Poll the asynchronous operation the stdin is currently blocked on.
161+
State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
162+
}
163+
}
164+
})
165+
.await
166+
.context(|| String::from("could not read line on stdin"))
167+
}
168+
169+
/// Locks this handle to the standard input stream, returning a readable guard.
170+
///
171+
/// The lock is released when the returned lock goes out of scope. The returned guard also implements the Read trait for accessing the underlying data.
172+
///
173+
/// # Examples
174+
///
175+
/// ```no_run
176+
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
177+
/// #
178+
/// use async_std::io;
179+
/// use async_std::prelude::*;
180+
///
181+
/// let mut buffer = String::new();
182+
///
183+
/// let stdin = io::stdin();
184+
/// let mut handle = stdin.lock().await;
185+
///
186+
/// handle.read_to_string(&mut buffer).await?;
187+
/// #
188+
/// # Ok(()) }) }
189+
/// ```
190+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
191+
#[cfg(any(feature = "unstable", feature = "docs"))]
192+
pub async fn lock(&self) -> StdinLock<'static> {
193+
static STDIN: Lazy<std::io::Stdin> = Lazy::new(std::io::stdin);
194+
195+
spawn_blocking(move || StdinLock(STDIN.lock())).await
196+
}
197+
}
198+
199+
impl Read for Stdin {
200+
fn poll_read(
201+
mut self: Pin<&mut Self>,
202+
cx: &mut Context<'_>,
203+
buf: &mut [u8],
204+
) -> Poll<io::Result<usize>> {
205+
let state = &mut *self.0.lock().unwrap();
206+
207+
loop {
208+
match state {
209+
State::Idle(opt) => {
210+
let inner = opt.as_mut().unwrap();
211+
212+
// Check if the operation has completed.
213+
if let Some(Operation::Read(res)) = inner.last_op.take() {
214+
let n = res?;
215+
216+
// If more data was read than fits into the buffer, let's retry the read
217+
// operation.
218+
if n <= buf.len() {
219+
// Copy the read data into the buffer and return.
220+
buf[..n].copy_from_slice(&inner.buf[..n]);
221+
return Poll::Ready(Ok(n));
222+
}
223+
} else {
224+
let mut inner = opt.take().unwrap();
225+
226+
// Set the length of the inner buffer to the length of the provided buffer.
227+
if inner.buf.len() < buf.len() {
228+
inner.buf.reserve(buf.len() - inner.buf.len());
229+
}
230+
unsafe {
231+
inner.buf.set_len(buf.len());
232+
}
233+
234+
// Start the operation asynchronously.
235+
*state = State::Busy(spawn_blocking(move || {
236+
let res = std::io::Read::read(&mut inner.stdin, &mut inner.buf);
237+
inner.last_op = Some(Operation::Read(res));
238+
State::Idle(Some(inner))
239+
}));
240+
}
241+
}
242+
// Poll the asynchronous operation the stdin is currently blocked on.
243+
State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
244+
}
245+
}
246+
}
247+
}
248+
249+
cfg_unix! {
250+
use crate::os::unix::io::{AsRawFd, RawFd};
251+
252+
impl AsRawFd for Stdin {
253+
fn as_raw_fd(&self) -> RawFd {
254+
std::io::stdin().as_raw_fd()
255+
}
256+
}
257+
}
258+
259+
cfg_windows! {
260+
use crate::os::windows::io::{AsRawHandle, RawHandle};
261+
262+
impl AsRawHandle for Stdin {
263+
fn as_raw_handle(&self) -> RawHandle {
264+
std::io::stdin().as_raw_handle()
265+
}
266+
}
267+
}
268+
269+
#[cfg(feature = "unstable")]
270+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
271+
impl Read for StdinLock<'_> {
272+
fn poll_read(
273+
mut self: Pin<&mut Self>,
274+
_cx: &mut Context<'_>,
275+
buf: &mut [u8],
276+
) -> Poll<io::Result<usize>> {
277+
Poll::Ready(self.0.read(buf))
278+
}
279+
}

‎src/io/stdio.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
//! Internal types for stdio.
2+
//!
3+
//! This module is a port of `libstd/io/stdio.rs`,and contains internal types for `print`/`eprint`.
4+
5+
use crate::io::{stderr, stdout};
6+
use crate::prelude::*;
7+
use std::fmt;
8+
9+
#[doc(hidden)]
10+
pub async fn _print(args: fmt::Arguments<'_>) {
11+
if let Err(e) = stdout().write_fmt(args).await {
12+
panic!("failed printing to stdout: {}", e);
13+
}
14+
}
15+
16+
#[doc(hidden)]
17+
pub async fn _eprint(args: fmt::Arguments<'_>) {
18+
if let Err(e) = stderr().write_fmt(args).await {
19+
panic!("failed printing to stderr: {}", e);
20+
}
21+
}

‎src/io/stdout.rs

Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
use std::pin::Pin;
2+
use std::sync::Mutex;
3+
use std::future::Future;
4+
5+
use crate::io::{self, Write};
6+
use crate::task::{spawn_blocking, Context, JoinHandle, Poll};
7+
8+
cfg_unstable! {
9+
use once_cell::sync::Lazy;
10+
use std::io::Write as _;
11+
}
12+
13+
/// Constructs a new handle to the standard output of the current process.
14+
///
15+
/// This function is an async version of [`std::io::stdout`].
16+
///
17+
/// [`std::io::stdout`]: https://doc.rust-lang.org/std/io/fn.stdout.html
18+
///
19+
/// ### Note: Windows Portability Consideration
20+
///
21+
/// When operating in a console, the Windows implementation of this stream does not support
22+
/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return
23+
/// an error.
24+
///
25+
/// # Examples
26+
///
27+
/// ```no_run
28+
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
29+
/// #
30+
/// use async_std::io;
31+
/// use async_std::prelude::*;
32+
///
33+
/// let mut stdout = io::stdout();
34+
/// stdout.write_all(b"Hello, world!").await?;
35+
/// #
36+
/// # Ok(()) }) }
37+
/// ```
38+
pub fn stdout() -> Stdout {
39+
Stdout(Mutex::new(State::Idle(Some(Inner {
40+
stdout: std::io::stdout(),
41+
buf: Vec::new(),
42+
last_op: None,
43+
}))))
44+
}
45+
46+
/// A handle to the standard output of the current process.
47+
///
48+
/// This writer is created by the [`stdout`] function. See its documentation
49+
/// for more.
50+
///
51+
/// ### Note: Windows Portability Consideration
52+
///
53+
/// When operating in a console, the Windows implementation of this stream does not support
54+
/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return
55+
/// an error.
56+
///
57+
/// [`stdout`]: fn.stdout.html
58+
#[derive(Debug)]
59+
pub struct Stdout(Mutex<State>);
60+
61+
/// A locked reference to the Stderr handle.
62+
///
63+
/// This handle implements the [`Write`] traits, and is constructed via the [`Stdout::lock`]
64+
/// method.
65+
///
66+
/// [`Write`]: trait.Read.html
67+
/// [`Stdout::lock`]: struct.Stdout.html#method.lock
68+
#[cfg(feature = "unstable")]
69+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
70+
#[derive(Debug)]
71+
pub struct StdoutLock<'a>(std::io::StdoutLock<'a>);
72+
73+
#[cfg(feature = "unstable")]
74+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
75+
unsafe impl Send for StdoutLock<'_> {}
76+
77+
/// The state of the asynchronous stdout.
78+
///
79+
/// The stdout can be either idle or busy performing an asynchronous operation.
80+
#[derive(Debug)]
81+
enum State {
82+
/// The stdout is idle.
83+
Idle(Option<Inner>),
84+
85+
/// The stdout is blocked on an asynchronous operation.
86+
///
87+
/// Awaiting this operation will result in the new state of the stdout.
88+
Busy(JoinHandle<State>),
89+
}
90+
91+
/// Inner representation of the asynchronous stdout.
92+
#[derive(Debug)]
93+
struct Inner {
94+
/// The blocking stdout handle.
95+
stdout: std::io::Stdout,
96+
97+
/// The write buffer.
98+
buf: Vec<u8>,
99+
100+
/// The result of the last asynchronous operation on the stdout.
101+
last_op: Option<Operation>,
102+
}
103+
104+
/// Possible results of an asynchronous operation on the stdout.
105+
#[derive(Debug)]
106+
enum Operation {
107+
Write(io::Result<usize>),
108+
Flush(io::Result<()>),
109+
}
110+
111+
impl Stdout {
112+
/// Locks this handle to the standard error stream, returning a writable guard.
113+
///
114+
/// The lock is released when the returned lock goes out of scope. The returned guard also implements the Write trait for writing data.
115+
///
116+
/// # Examples
117+
///
118+
/// ```no_run
119+
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
120+
/// #
121+
/// use async_std::io;
122+
/// use async_std::prelude::*;
123+
///
124+
/// let stdout = io::stdout();
125+
/// let mut handle = stdout.lock().await;
126+
///
127+
/// handle.write_all(b"hello world").await?;
128+
/// #
129+
/// # Ok(()) }) }
130+
/// ```
131+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
132+
#[cfg(any(feature = "unstable", feature = "docs"))]
133+
pub async fn lock(&self) -> StdoutLock<'static> {
134+
static STDOUT: Lazy<std::io::Stdout> = Lazy::new(std::io::stdout);
135+
136+
spawn_blocking(move || StdoutLock(STDOUT.lock())).await
137+
}
138+
}
139+
140+
impl Write for Stdout {
141+
fn poll_write(
142+
mut self: Pin<&mut Self>,
143+
cx: &mut Context<'_>,
144+
buf: &[u8],
145+
) -> Poll<io::Result<usize>> {
146+
let state = &mut *self.0.lock().unwrap();
147+
148+
loop {
149+
match state {
150+
State::Idle(opt) => {
151+
let inner = opt.as_mut().unwrap();
152+
153+
// Check if the operation has completed.
154+
if let Some(Operation::Write(res)) = inner.last_op.take() {
155+
let n = res?;
156+
157+
// If more data was written than is available in the buffer, let's retry
158+
// the write operation.
159+
if n <= buf.len() {
160+
return Poll::Ready(Ok(n));
161+
}
162+
} else {
163+
let mut inner = opt.take().unwrap();
164+
165+
// Set the length of the inner buffer to the length of the provided buffer.
166+
if inner.buf.len() < buf.len() {
167+
inner.buf.reserve(buf.len() - inner.buf.len());
168+
}
169+
unsafe {
170+
inner.buf.set_len(buf.len());
171+
}
172+
173+
// Copy the data to write into the inner buffer.
174+
inner.buf[..buf.len()].copy_from_slice(buf);
175+
176+
// Start the operation asynchronously.
177+
*state = State::Busy(spawn_blocking(move || {
178+
let res = std::io::Write::write(&mut inner.stdout, &inner.buf);
179+
inner.last_op = Some(Operation::Write(res));
180+
State::Idle(Some(inner))
181+
}));
182+
}
183+
}
184+
// Poll the asynchronous operation the stdout is currently blocked on.
185+
State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
186+
}
187+
}
188+
}
189+
190+
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
191+
let state = &mut *self.0.lock().unwrap();
192+
193+
loop {
194+
match state {
195+
State::Idle(opt) => {
196+
let inner = opt.as_mut().unwrap();
197+
198+
// Check if the operation has completed.
199+
if let Some(Operation::Flush(res)) = inner.last_op.take() {
200+
return Poll::Ready(res);
201+
} else {
202+
let mut inner = opt.take().unwrap();
203+
204+
// Start the operation asynchronously.
205+
*state = State::Busy(spawn_blocking(move || {
206+
let res = std::io::Write::flush(&mut inner.stdout);
207+
inner.last_op = Some(Operation::Flush(res));
208+
State::Idle(Some(inner))
209+
}));
210+
}
211+
}
212+
// Poll the asynchronous operation the stdout is currently blocked on.
213+
State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
214+
}
215+
}
216+
}
217+
218+
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
219+
self.poll_flush(cx)
220+
}
221+
}
222+
223+
cfg_unix! {
224+
use crate::os::unix::io::{AsRawFd, RawFd};
225+
226+
impl AsRawFd for Stdout {
227+
fn as_raw_fd(&self) -> RawFd {
228+
std::io::stdout().as_raw_fd()
229+
}
230+
}
231+
}
232+
233+
cfg_windows! {
234+
use crate::os::windows::io::{AsRawHandle, RawHandle};
235+
236+
impl AsRawHandle for Stdout {
237+
fn as_raw_handle(&self) -> RawHandle {
238+
std::io::stdout().as_raw_handle()
239+
}
240+
}
241+
}
242+
243+
#[cfg(feature = "unstable")]
244+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
245+
impl Write for StdoutLock<'_> {
246+
fn poll_write(
247+
mut self: Pin<&mut Self>,
248+
_cx: &mut Context<'_>,
249+
buf: &[u8],
250+
) -> Poll<io::Result<usize>> {
251+
Poll::Ready(self.0.write(buf))
252+
}
253+
254+
fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
255+
Poll::Ready(self.0.flush())
256+
}
257+
258+
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
259+
self.poll_flush(cx)
260+
}
261+
}

‎src/io/timeout.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ use crate::io;
2323
/// use async_std::io;
2424
///
2525
/// io::timeout(Duration::from_secs(5), async {
26-
/// let stdin = std::io::stdin();
26+
/// let stdin = io::stdin();
2727
/// let mut line = String::new();
28-
/// let n = stdin.read_line(&mut line)?;
28+
/// let n = stdin.read_line(&mut line).await?;
2929
/// Ok(())
3030
/// })
3131
/// .await?;

‎src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,9 @@ cfg_default! {
273273
}
274274

275275
cfg_unstable! {
276+
pub mod pin;
277+
pub mod process;
278+
276279
mod unit;
277280
mod vec;
278281
mod result;

‎src/macros.rs

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,171 @@
1+
/// Prints to the standard output.
2+
///
3+
/// Equivalent to the [`println!`] macro except that a newline is not printed at
4+
/// the end of the message.
5+
///
6+
/// Note that stdout is frequently line-buffered by default so it may be
7+
/// necessary to use [`io::stdout().flush()`][flush] to ensure the output is emitted
8+
/// immediately.
9+
///
10+
/// Use `print!` only for the primary output of your program. Use
11+
/// [`eprint!`] instead to print error and progress messages.
12+
///
13+
/// [`println!`]: macro.println.html
14+
/// [flush]: io/trait.Write.html#tymethod.flush
15+
/// [`eprint!`]: macro.eprint.html
16+
///
17+
/// # Panics
18+
///
19+
/// Panics if writing to `io::stdout()` fails.
20+
///
21+
/// # Examples
22+
///
23+
/// ```
24+
/// # async_std::task::block_on(async {
25+
/// #
26+
/// use async_std::io;
27+
/// use async_std::prelude::*;
28+
/// use async_std::print;
29+
///
30+
/// print!("this ").await;
31+
/// print!("will ").await;
32+
/// print!("be ").await;
33+
/// print!("on ").await;
34+
/// print!("the ").await;
35+
/// print!("same ").await;
36+
/// print!("line ").await;
37+
///
38+
/// io::stdout().flush().await.unwrap();
39+
///
40+
/// print!("this string has a newline, why not choose println! instead?\n").await;
41+
///
42+
/// io::stdout().flush().await.unwrap();
43+
/// #
44+
/// # })
45+
/// ```
46+
#[cfg(feature = "unstable")]
47+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
48+
#[macro_export]
49+
macro_rules! print {
50+
($($arg:tt)*) => ($crate::io::_print(format_args!($($arg)*)))
51+
}
52+
53+
/// Prints to the standard output, with a newline.
54+
///
55+
/// On all platforms, the newline is the LINE FEED character (`\n`/`U+000A`) alone
56+
/// (no additional CARRIAGE RETURN (`\r`/`U+000D`)).
57+
///
58+
/// Use the [`format!`] syntax to write data to the standard output.
59+
/// See [`std::fmt`] for more information.
60+
///
61+
/// Use `println!` only for the primary output of your program. Use
62+
/// [`eprintln!`] instead to print error and progress messages.
63+
///
64+
/// [`format!`]: macro.format.html
65+
/// [`std::fmt`]: https://doc.rust-lang.org/std/fmt/index.html
66+
/// [`eprintln!`]: macro.eprintln.html
67+
/// # Panics
68+
///
69+
/// Panics if writing to `io::stdout` fails.
70+
///
71+
/// # Examples
72+
///
73+
/// ```
74+
/// # async_std::task::block_on(async {
75+
/// #
76+
/// use async_std::println;
77+
///
78+
/// println!().await; // prints just a newline
79+
/// println!("hello there!").await;
80+
/// println!("format {} arguments", "some").await;
81+
/// #
82+
/// # })
83+
/// ```
84+
#[cfg(feature = "unstable")]
85+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
86+
#[macro_export]
87+
macro_rules! println {
88+
() => ($crate::print!("\n"));
89+
($($arg:tt)*) => (async {
90+
$crate::io::_print(format_args!($($arg)*)).await;
91+
$crate::io::_print(format_args!("\n")).await;
92+
})
93+
}
94+
95+
/// Prints to the standard error.
96+
///
97+
/// Equivalent to the [`print!`] macro, except that output goes to
98+
/// [`io::stderr`] instead of `io::stdout`. See [`print!`] for
99+
/// example usage.
100+
///
101+
/// Use `eprint!` only for error and progress messages. Use `print!`
102+
/// instead for the primary output of your program.
103+
///
104+
/// [`io::stderr`]: io/struct.Stderr.html
105+
/// [`print!`]: macro.print.html
106+
///
107+
/// # Panics
108+
///
109+
/// Panics if writing to `io::stderr` fails.
110+
///
111+
/// # Examples
112+
///
113+
/// ```
114+
/// # async_std::task::block_on(async {
115+
/// #
116+
/// use async_std::eprint;
117+
///
118+
/// eprint!("Error: Could not complete task").await;
119+
/// #
120+
/// # })
121+
/// ```
122+
#[cfg(feature = "unstable")]
123+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
124+
#[macro_export]
125+
macro_rules! eprint {
126+
($($arg:tt)*) => ($crate::io::_eprint(format_args!($($arg)*)))
127+
}
128+
129+
/// Prints to the standard error, with a newline.
130+
///
131+
/// Equivalent to the [`println!`] macro, except that output goes to
132+
/// [`io::stderr`] instead of `io::stdout`. See [`println!`] for
133+
/// example usage.
134+
///
135+
/// Use `eprintln!` only for error and progress messages. Use `println!`
136+
/// instead for the primary output of your program.
137+
///
138+
/// [`io::stderr`]: io/struct.Stderr.html
139+
/// [`println!`]: macro.println.html
140+
///
141+
/// # Panics
142+
///
143+
/// Panics if writing to `io::stderr` fails.
144+
///
145+
/// # Examples
146+
///
147+
/// ```
148+
/// # async_std::task::block_on(async {
149+
/// #
150+
/// use async_std::eprintln;
151+
///
152+
/// eprintln!("Error: Could not complete task").await;
153+
/// #
154+
/// # })
155+
/// ```
156+
#[cfg(feature = "unstable")]
157+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
158+
#[macro_export]
159+
macro_rules! eprintln {
160+
() => (async { $crate::eprint!("\n").await; });
161+
($($arg:tt)*) => (
162+
async {
163+
$crate::io::_eprint(format_args!($($arg)*)).await;
164+
$crate::io::_eprint(format_args!("\n")).await;
165+
}
166+
);
167+
}
168+
1169
/// Declares task-local values.
2170
///
3171
/// The macro wraps any number of static declarations and makes them task-local. Attributes and

‎src/prelude.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,16 @@ cfg_std! {
3838
pub use crate::io::prelude::SeekExt as _;
3939
#[doc(no_inline)]
4040
pub use crate::io::prelude::WriteExt as _;
41-
42-
#[doc(no_inline)]
43-
pub use crate::stream::DoubleEndedStream;
44-
#[doc(no_inline)]
45-
pub use crate::stream::ExactSizeStream;
4641
}
4742

4843
cfg_default! {
4944
#[doc(no_inline)]
5045
pub use crate::task_local;
5146
}
47+
48+
cfg_unstable! {
49+
#[doc(no_inline)]
50+
pub use crate::stream::DoubleEndedStream;
51+
#[doc(no_inline)]
52+
pub use crate::stream::ExactSizeStream;
53+
}

‎src/stream/double_ended_stream/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::stream::Stream;
22

3-
use core::pin::Pin;
4-
use core::task::{Context, Poll};
3+
use std::pin::Pin;
4+
use std::task::{Context, Poll};
55

66
mod next_back;
77
mod nth_back;
@@ -22,6 +22,8 @@ use try_rfold::TryRFoldFuture;
2222
/// `Item`s from the back, as well as the front.
2323
///
2424
/// [`Stream`]: trait.Stream.html
25+
#[cfg(feature = "unstable")]
26+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
2527
pub trait DoubleEndedStream: Stream {
2628
#[doc = r#"
2729
Attempts to receive the next item from the back of the stream.

‎src/stream/exact_size_stream.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ pub use crate::stream::Stream;
7676
/// # });
7777
/// ```
7878
#[allow(clippy::len_without_is_empty)] // ExactSizeIterator::is_empty is unstable
79+
#[cfg(feature = "unstable")]
80+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
7981
pub trait ExactSizeStream: Stream {
8082
/// Returns the exact number of times the stream will iterate.
8183
///

‎src/stream/extend.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ use crate::stream::IntoStream;
2727
/// #
2828
/// # })
2929
/// ```
30+
#[cfg(feature = "unstable")]
31+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
3032
pub trait Extend<A> {
3133
/// Extends a collection with the contents of a stream.
3234
fn extend<'a, T: IntoStream<Item = A> + 'a>(

‎src/stream/fused_stream.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ use crate::stream::Stream;
1414
/// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
1515
/// [`Stream::fuse`]: trait.Stream.html#method.fuse
1616
/// [`Fuse`]: struct.Fuse.html
17+
#[cfg(feature = "unstable")]
18+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1719
pub trait FusedStream: Stream {}
1820

1921
impl<S: FusedStream + ?Sized + Unpin> FusedStream for &mut S {}

‎src/stream/interval.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ use futures_timer::Delay;
4141
/// #
4242
/// # Ok(()) }) }
4343
/// ```
44+
#[cfg(feature = "unstable")]
45+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
4446
pub fn interval(dur: Duration) -> Interval {
4547
Interval {
4648
delay: Delay::new(dur),
@@ -54,6 +56,8 @@ pub fn interval(dur: Duration) -> Interval {
5456
/// documentation for more.
5557
///
5658
/// [`interval`]: fn.interval.html
59+
#[cfg(feature = "unstable")]
60+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
5761
#[derive(Debug)]
5862
pub struct Interval {
5963
delay: Delay,

‎src/stream/mod.rs

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -317,32 +317,29 @@ mod once;
317317
mod repeat;
318318
mod repeat_with;
319319

320-
cfg_std! {
321-
pub use double_ended_stream::DoubleEndedStream;
322-
pub use exact_size_stream::ExactSizeStream;
323-
pub use fused_stream::FusedStream;
324-
pub use interval::{interval, Interval};
325-
pub use pending::{pending, Pending};
326-
pub use product::Product;
327-
pub use successors::{successors, Successors};
328-
pub use sum::Sum;
329-
320+
cfg_unstable! {
330321
mod double_ended_stream;
331322
mod exact_size_stream;
323+
mod extend;
324+
mod from_stream;
332325
mod fused_stream;
333326
mod interval;
327+
mod into_stream;
334328
mod pending;
335329
mod product;
336330
mod successors;
337331
mod sum;
338-
}
339-
340-
cfg_unstable! {
341-
mod from_stream;
342-
mod into_stream;
343-
mod extend;
344332

333+
pub use double_ended_stream::DoubleEndedStream;
334+
pub use exact_size_stream::ExactSizeStream;
345335
pub use extend::{extend, Extend};
346336
pub use from_stream::FromStream;
337+
pub use fused_stream::FusedStream;
338+
pub use interval::{interval, Interval};
347339
pub use into_stream::IntoStream;
340+
pub use pending::{pending, Pending};
341+
pub use product::Product;
342+
pub use stream::Merge;
343+
pub use successors::{successors, Successors};
344+
pub use sum::Sum;
348345
}

‎src/stream/once.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use pin_project_lite::pin_project;
55
use crate::stream::Stream;
66
use crate::task::{Context, Poll};
77

8-
#[cfg(feature = "std")]
8+
#[cfg(feature = "unstable")]
99
use crate::stream::DoubleEndedStream;
1010

1111
/// Creates a stream that yields a single item.
@@ -50,8 +50,8 @@ impl<T> Stream for Once<T> {
5050
}
5151
}
5252

53-
#[cfg(feature = "std")]
54-
impl<T> DoubleEndedStream for Once<T> {
53+
#[cfg(feature = "unstable")]
54+
impl <T> DoubleEndedStream for Once<T> {
5555
fn poll_next_back(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
5656
Poll::Ready(self.project().value.take())
5757
}

‎src/stream/product.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
use alloc::boxed::Box;
2-
use core::future::Future;
31
use core::pin::Pin;
2+
use core::future::Future;
43

54
use crate::stream::Stream;
65

@@ -14,6 +13,8 @@ use crate::stream::Stream;
1413
/// [`product`]: trait.Product.html#tymethod.product
1514
/// [`FromStream`]: trait.FromStream.html
1615
/// [`Stream::product`]: trait.Stream.html#method.product
16+
#[cfg(feature = "unstable")]
17+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1718
pub trait Product<A = Self>: Sized {
1819
/// Method which takes a stream and generates `Self` from the elements by
1920
/// multiplying the items.
@@ -22,9 +23,9 @@ pub trait Product<A = Self>: Sized {
2223
S: Stream<Item = A> + 'a;
2324
}
2425

25-
use crate::stream::stream::StreamExt;
26-
use core::num::Wrapping;
2726
use core::ops::Mul;
27+
use core::num::Wrapping;
28+
use crate::stream::stream::StreamExt;
2829

2930
macro_rules! integer_product {
3031
(@impls $one: expr, $($a:ty)*) => ($(
@@ -74,5 +75,5 @@ macro_rules! float_product {
7475
);
7576
}
7677

77-
integer_product! { i8 i16 i32 i64 i128 isize u8 u16 u32 u64 u128 usize }
78-
float_product! { f32 f64 }
78+
integer_product!{ i8 i16 i32 i64 i128 isize u8 u16 u32 u64 u128 usize }
79+
float_product!{ f32 f64 }

‎src/stream/stream/count.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ use crate::task::{Context, Poll};
99
pin_project! {
1010
#[doc(hidden)]
1111
#[allow(missing_debug_implementations)]
12+
#[cfg(feature = "unstable")]
13+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1214
pub struct CountFuture<S> {
1315
#[pin]
1416
stream: S,

‎src/stream/stream/merge.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ pin_project! {
1616
///
1717
/// [`merge`]: trait.Stream.html#method.merge
1818
/// [`Stream`]: trait.Stream.html
19+
#[cfg(feature = "unstable")]
20+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1921
#[derive(Debug)]
2022
pub struct Merge<L, R> {
2123
#[pin]

‎src/stream/stream/mod.rs

Lines changed: 33 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -112,43 +112,35 @@ pub use zip::Zip;
112112

113113
use core::cmp::Ordering;
114114

115-
cfg_std! {
116-
use core::time::Duration;
117-
use crate::stream::{Product, Sum};
118-
use alloc::boxed::Box;
115+
cfg_unstable! {
119116
use core::future::Future;
120117
use core::pin::Pin;
118+
use core::time::Duration;
121119

122-
use unzip::UnzipFuture;
123-
use count::CountFuture;
124-
125-
pub use throttle::Throttle;
126-
pub use merge::Merge;
127-
pub use delay::Delay;
128-
pub use timeout::{Timeout, TimeoutError};
129-
130-
mod timeout;
131-
mod throttle;
132-
mod merge;
133-
mod delay;
134-
mod unzip;
135-
mod count;
136-
}
137-
138-
cfg_unstable! {
139-
use crate::stream::FromStream;
140120
use crate::stream::into_stream::IntoStream;
121+
use crate::stream::{FromStream, Product, Sum};
141122
use crate::stream::Extend;
142123

124+
use count::CountFuture;
143125
use partition::PartitionFuture;
126+
use unzip::UnzipFuture;
144127

128+
pub use merge::Merge;
145129
pub use flatten::Flatten;
146130
pub use flat_map::FlatMap;
131+
pub use timeout::{TimeoutError, Timeout};
132+
pub use throttle::Throttle;
133+
pub use delay::Delay;
147134

148-
135+
mod count;
136+
mod merge;
149137
mod flatten;
150138
mod flat_map;
151139
mod partition;
140+
mod timeout;
141+
mod throttle;
142+
mod delay;
143+
mod unzip;
152144
}
153145

154146
extension_trait! {
@@ -363,7 +355,8 @@ extension_trait! {
363355
# }) }
364356
```
365357
"#]
366-
#[cfg(feature = "std")]
358+
#[cfg(feature = "unstable")]
359+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
367360
fn throttle(self, d: Duration) -> Throttle<Self>
368361
where
369362
Self: Sized,
@@ -605,7 +598,8 @@ extension_trait! {
605598
# }) }
606599
```
607600
"#]
608-
#[cfg(feature = "std")]
601+
#[cfg(any(feature = "unstable", feature = "docs"))]
602+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
609603
fn delay(self, dur: std::time::Duration) -> Delay<Self>
610604
where
611605
Self: Sized,
@@ -1517,6 +1511,8 @@ extension_trait! {
15171511
# }) }
15181512
```
15191513
"#]
1514+
#[cfg(feature = "unstable")]
1515+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
15201516
fn by_ref(&mut self) -> &mut Self {
15211517
self
15221518
}
@@ -1660,7 +1656,8 @@ extension_trait! {
16601656
# Ok(()) }) }
16611657
```
16621658
"#]
1663-
#[cfg(feature = "std")]
1659+
#[cfg(any(feature = "unstable", feature = "docs"))]
1660+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
16641661
fn timeout(self, dur: Duration) -> Timeout<Self>
16651662
where
16661663
Self: Stream + Sized,
@@ -1825,7 +1822,8 @@ extension_trait! {
18251822
# }) }
18261823
```
18271824
"#]
1828-
#[cfg(feature = "std")]
1825+
#[cfg(feature = "unstable")]
1826+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
18291827
fn unzip<A, B, FromA, FromB>(self) -> impl Future<Output = (FromA, FromB)> [UnzipFuture<Self, FromA, FromB>]
18301828
where
18311829
FromA: Default + Extend<A>,
@@ -1923,7 +1921,8 @@ extension_trait! {
19231921
# });
19241922
```
19251923
"#]
1926-
#[cfg(feature = "std")]
1924+
#[cfg(feature = "unstable")]
1925+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
19271926
fn merge<U>(self, other: U) -> Merge<Self, U>
19281927
where
19291928
Self: Sized,
@@ -2069,7 +2068,8 @@ extension_trait! {
20692068
# }) }
20702069
```
20712070
"#]
2072-
#[cfg(feature = "std")]
2071+
#[cfg(feature = "unstable")]
2072+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
20732073
fn count(self) -> impl Future<Output = usize> [CountFuture<Self>]
20742074
where
20752075
Self: Sized,
@@ -2330,7 +2330,8 @@ extension_trait! {
23302330
# }) }
23312331
```
23322332
"#]
2333-
#[cfg(feature = "std")]
2333+
#[cfg(feature = "unstable")]
2334+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
23342335
fn sum<'a, S>(
23352336
self,
23362337
) -> impl Future<Output = S> + 'a [Pin<Box<dyn Future<Output = S> + 'a>>]
@@ -2375,7 +2376,8 @@ extension_trait! {
23752376
# }) }
23762377
```
23772378
"#]
2378-
#[cfg(feature = "std")]
2379+
#[cfg(feature = "unstable")]
2380+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
23792381
fn product<'a, P>(
23802382
self,
23812383
) -> impl Future<Output = P> + 'a [Pin<Box<dyn Future<Output = P> + 'a>>]

‎src/stream/stream/timeout.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ impl<S: Stream> Stream for Timeout<S> {
4747
}
4848

4949
/// An error returned when a stream times out.
50+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
51+
#[cfg(any(feature = "unstable", feature = "docs"))]
5052
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
5153
pub struct TimeoutError {
5254
_private: (),

‎src/stream/stream/unzip.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ use crate::task::{Context, Poll};
88

99
pin_project! {
1010
#[derive(Clone, Debug)]
11+
#[cfg(feature = "unstable")]
12+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1113
pub struct UnzipFuture<S, FromA, FromB> {
1214
#[pin]
1315
stream: S,

‎src/stream/successors.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ use pin_project_lite::pin_project;
2727
/// #
2828
/// # }) }
2929
/// ```
30+
#[cfg(feature = "unstable")]
31+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
3032
pub fn successors<F, T>(first: Option<T>, succ: F) -> Successors<F, T>
3133
where
3234
F: FnMut(&T) -> Option<T>,
@@ -41,6 +43,8 @@ pin_project! {
4143
/// This stream is constructed by [`successors`] function
4244
///
4345
/// [`successors`]: fn.succssors.html
46+
#[cfg(feature = "unstable")]
47+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
4448
#[derive(Debug)]
4549
pub struct Successors<F, T>
4650
where

‎src/stream/sum.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use alloc::boxed::Box;
21
use core::future::Future;
32
use core::pin::Pin;
43

@@ -14,6 +13,8 @@ use crate::stream::Stream;
1413
/// [`sum`]: trait.Sum.html#tymethod.sum
1514
/// [`FromStream`]: trait.FromStream.html
1615
/// [`Stream::sum`]: trait.Stream.html#method.sum
16+
#[cfg(feature = "unstable")]
17+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1718
pub trait Sum<A = Self>: Sized {
1819
/// Method which takes a stream and generates `Self` from the elements by
1920
/// "summing up" the items.

‎src/utils.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ pub fn abort_on_panic<T>(f: impl FnOnce() -> T) -> T {
2121
}
2222

2323
/// Generates a random number in `0..n`.
24-
#[cfg(any(feature = "std", feature = "default"))]
24+
#[cfg(any(feature = "unstable", feature = "default"))]
2525
pub fn random(n: u32) -> u32 {
2626
use std::cell::Cell;
2727
use std::num::Wrapping;
@@ -257,6 +257,11 @@ macro_rules! extension_trait {
257257
$(#[cfg(feature = "docs")] $imp)*
258258
};
259259

260+
// Optimization: expand `$head` eagerly before starting a new method definition.
261+
(@ext ($($head:tt)*) #[doc = $d:literal] $($tail:tt)*) => {
262+
$($head)* extension_trait!(@ext (#[doc = $d]) $($tail)*);
263+
};
264+
260265
// Parse the return type in an extension method.
261266
(@doc ($($head:tt)*) -> impl Future<Output = $out:ty> $(+ $lt:lifetime)? [$f:ty] $($tail:tt)*) => {
262267
extension_trait!(@doc ($($head)* -> owned::ImplFuture<$out>) $($tail)*);

‎tests/io_timeout.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ use async_std::task;
77
#[should_panic(expected = "timed out")]
88
fn io_timeout_timedout() {
99
task::block_on(async {
10-
io::timeout(Duration::from_millis(100), async {
11-
task::sleep(Duration::from_secs(1)).await;
12-
10+
io::timeout(Duration::from_secs(1), async {
11+
let stdin = io::stdin();
12+
let mut line = String::new();
13+
let _n = stdin.read_line(&mut line).await?;
1314
Ok(())
1415
})
1516
.await

0 commit comments

Comments
 (0)
Please sign in to comment.