Skip to content

Commit 526c4da

Browse files
authored
Merge pull request #395 from async-rs/stream-docs
add stream mod docs
2 parents 997e811 + eb081b1 commit 526c4da

File tree

1 file changed

+288
-9
lines changed

1 file changed

+288
-9
lines changed

src/stream/mod.rs

+288-9
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,303 @@
22
//!
33
//! This module is an async version of [`std::iter`].
44
//!
5-
//! [`std::iter`]: https://doc.rust-lang.org/std/iter/index.html
5+
//! If you've found yourself with an asynchronous collection of some kind,
6+
//! and needed to perform an operation on the elements of said collection,
7+
//! you'll quickly run into 'streams'. Streams are heavily used in idiomatic
8+
//! asynchronous Rust code, so it's worth becoming familiar with them.
9+
//!
10+
//! Before explaining more, let's talk about how this module is structured:
11+
//!
12+
//! # Organization
13+
//!
14+
//! This module is largely organized by type:
15+
//!
16+
//! * [Traits] are the core portion: these traits define what kind of streams
17+
//! exist and what you can do with them. The methods of these traits are worth
18+
//! putting some extra study time into.
19+
//! * [Functions] provide some helpful ways to create some basic streams.
20+
//! * [Structs] are often the return types of the various methods on this
21+
//! module's traits. You'll usually want to look at the method that creates
22+
//! the `struct`, rather than the `struct` itself. For more detail about why,
23+
//! see '[Implementing Stream](#implementing-stream)'.
24+
//!
25+
//! [Traits]: #traits
26+
//! [Functions]: #functions
27+
//! [Structs]: #structs
28+
//!
29+
//! That's it! Let's dig into streams.
30+
//!
31+
//! # Stream
32+
//!
33+
//! The heart and soul of this module is the [`Stream`] trait. The core of
34+
//! [`Stream`] looks like this:
35+
//!
36+
//! ```
37+
//! # use async_std::task::{Context, Poll};
38+
//! # use std::pin::Pin;
39+
//! trait Stream {
40+
//! type Item;
41+
//! fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
42+
//! }
43+
//! ```
44+
//!
45+
//! A stream has a method, [`next`], which when called, returns an
46+
//! [`Poll`]<[`Option`]`<Item>>`. [`next`] will return `Ready(Some(Item))`
47+
//! as long as there are elements, and once they've all been exhausted, will
48+
//! return `None` to indicate that iteration is finished. If we're waiting on
49+
//! something asynchronous to resolve `Pending` is returned.
50+
//!
51+
//! Individual streams may choose to resume iteration, and so calling
52+
//! [`next`] again may or may not eventually start returning `Ready(Some(Item))`
53+
//! again at some point.
54+
//!
55+
//! [`Stream`]'s full definition includes a number of other methods as well,
56+
//! but they are default methods, built on top of [`next`], and so you get
57+
//! them for free.
58+
//!
59+
//! Streams are also composable, and it's common to chain them together to do
60+
//! more complex forms of processing. See the [Adapters](#adapters) section
61+
//! below for more details.
62+
//!
63+
//! [`Poll`]: ../task/enum.Poll.html
64+
//! [`Stream`]: trait.Stream.html
65+
//! [`next`]: trait.Stream.html#tymethod.next
66+
//! [`Option`]: ../../std/option/enum.Option.html
67+
//!
68+
//! # The three forms of streaming
69+
//!
70+
//! There are three common methods which can create streams from a collection:
71+
//!
72+
//! * `stream()`, which iterates over `&T`.
73+
//! * `stream_mut()`, which iterates over `&mut T`.
74+
//! * `into_stream()`, which iterates over `T`.
75+
//!
76+
//! Various things in async-std may implement one or more of the
77+
//! three, where appropriate.
78+
//!
79+
//! # Implementing Stream
80+
//!
81+
//! Creating a stream of your own involves two steps: creating a `struct` to
82+
//! hold the stream's state, and then `impl`ementing [`Stream`] for that
83+
//! `struct`. This is why there are so many `struct`s in this module: there is
84+
//! one for each stream and iterator adapter.
685
//!
7-
//! # Examples
86+
//! Let's make a stream named `Counter` which counts from `1` to `5`:
887
//!
988
//! ```
10-
//! # async_std::task::block_on(async {
89+
//! # use async_std::prelude::*;
90+
//! # use async_std::task::{Context, Poll};
91+
//! # use std::pin::Pin;
92+
//! // First, the struct:
93+
//!
94+
//! /// A stream which counts from one to five
95+
//! struct Counter {
96+
//! count: usize,
97+
//! }
98+
//!
99+
//! // we want our count to start at one, so let's add a new() method to help.
100+
//! // This isn't strictly necessary, but is convenient. Note that we start
101+
//! // `count` at zero, we'll see why in `next()`'s implementation below.
102+
//! impl Counter {
103+
//! fn new() -> Counter {
104+
//! Counter { count: 0 }
105+
//! }
106+
//! }
107+
//!
108+
//! // Then, we implement `Stream` for our `Counter`:
109+
//!
110+
//! impl Stream for Counter {
111+
//! // we will be counting with usize
112+
//! type Item = usize;
113+
//!
114+
//! // poll_next() is the only required method
115+
//! fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
116+
//! // Increment our count. This is why we started at zero.
117+
//! self.count += 1;
118+
//!
119+
//! // Check to see if we've finished counting or not.
120+
//! if self.count < 6 {
121+
//! Poll::Ready(Some(self.count))
122+
//! } else {
123+
//! Poll::Ready(None)
124+
//! }
125+
//! }
126+
//! }
127+
//!
128+
//! // And now we can use it!
129+
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
130+
//! #
131+
//! let mut counter = Counter::new();
132+
//!
133+
//! let x = counter.next().await.unwrap();
134+
//! println!("{}", x);
135+
//!
136+
//! let x = counter.next().await.unwrap();
137+
//! println!("{}", x);
138+
//!
139+
//! let x = counter.next().await.unwrap();
140+
//! println!("{}", x);
141+
//!
142+
//! let x = counter.next().await.unwrap();
143+
//! println!("{}", x);
144+
//!
145+
//! let x = counter.next().await.unwrap();
146+
//! println!("{}", x);
11147
//! #
12-
//! use async_std::prelude::*;
13-
//! use async_std::stream;
148+
//! # Ok(()) }) }
149+
//! ```
150+
//!
151+
//! This will print `1` through `5`, each on their own line.
14152
//!
15-
//! let mut s = stream::repeat(9).take(3);
153+
//! Calling `next().await` this way gets repetitive. Rust has a construct which
154+
//! can call `next()` on your stream, until it reaches `None`. Let's go over
155+
//! that next.
16156
//!
17-
//! while let Some(v) = s.next().await {
18-
//! assert_eq!(v, 9);
157+
//! # while let Loops and IntoStream
158+
//!
159+
//! Rust's `while let` loop syntax is an idiomatic way to iterate over streams. Here's a basic
160+
//! example of `while let`:
161+
//!
162+
//! ```
163+
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
164+
//! #
165+
//! # use async_std::prelude::*;
166+
//! # use async_std::stream;
167+
//! let mut values = stream::repeat(1u8).take(5);
168+
//!
169+
//! while let Some(x) = values.next().await {
170+
//! println!("{}", x);
19171
//! }
20172
//! #
21-
//! # })
173+
//! # Ok(()) }) }
22174
//! ```
175+
//!
176+
//! This will print the numbers one through five, each on their own line. But
177+
//! you'll notice something here: we never called anything on our vector to
178+
//! produce a stream. What gives?
179+
//!
180+
//! There's a trait in the standard library for converting something into an
181+
//! stream: [`IntoStream`]. This trait has one method, [`into_stream],
182+
//! which converts the thing implementing [`IntoStream`] into a stream.
183+
//!
184+
//! Unlike `std::iter::IntoIterator`, `IntoStream` does not have compiler
185+
//! support yet. This means that automatic conversions like with `for` loops
186+
//! doesn't occur yet, and `into_stream` will always have to be called manually.
187+
//!
188+
//! [`IntoStream`]: trait.IntoStream.html
189+
//! [`into_stream`]: trait.IntoStream.html#tymethod.into_stream
190+
//!
191+
//! # Adapters
192+
//!
193+
//! Functions which take an [`Stream`] and return another [`Stream`] are
194+
//! often called 'stream adapters', as they are a form of the 'adapter
195+
//! pattern'.
196+
//!
197+
//! Common stream adapters include [`map`], [`take`], and [`filter`].
198+
//! For more, see their documentation.
199+
//!
200+
//! [`map`]: trait.Stream.html#method.map
201+
//! [`take`]: trait.Stream.html#method.take
202+
//! [`filter`]: trait.Stream.html#method.filter
203+
//!
204+
//! # Laziness
205+
//!
206+
//! Streams (and stream [adapters](#adapters)) are *lazy*. This means that
207+
//! just creating a stream doesn't _do_ a whole lot. Nothing really happens
208+
//! until you call [`next`]. This is sometimes a source of confusion when
209+
//! creating a stream solely for its side effects. For example, the [`map`]
210+
//! method calls a closure on each element it iterates over:
211+
//!
212+
//! ```
213+
//! # #![allow(unused_must_use)]
214+
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
215+
//! #
216+
//! # use async_std::prelude::*;
217+
//! # use async_std::stream;
218+
//! let v = stream::repeat(1u8).take(5);
219+
//! v.map(|x| println!("{}", x));
220+
//! #
221+
//! # Ok(()) }) }
222+
//! ```
223+
//!
224+
//! This will not print any values, as we only created a stream, rather than
225+
//! using it. The compiler will warn us about this kind of behavior:
226+
//!
227+
//! ```text
228+
//! warning: unused result that must be used: streams are lazy and
229+
//! do nothing unless consumed
230+
//! ```
231+
//!
232+
//! The idiomatic way to write a [`map`] for its side effects is to use a
233+
//! `while let` loop instead:
234+
//!
235+
//! ```
236+
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
237+
//! #
238+
//! # use async_std::prelude::*;
239+
//! # use async_std::stream;
240+
//! let mut v = stream::repeat(1u8).take(5);
241+
//!
242+
//! while let Some(x) = &v.next().await {
243+
//! println!("{}", x);
244+
//! }
245+
//! #
246+
//! # Ok(()) }) }
247+
//! ```
248+
//!
249+
//! [`map`]: trait.Stream.html#method.map
250+
//!
251+
//! The two most common ways to evaluate a stream are to use a `while let` loop
252+
//! like this, or using the [`collect`] method to produce a new collection.
253+
//!
254+
//! [`collect`]: trait.Stream.html#method.collect
255+
//!
256+
//! # Infinity
257+
//!
258+
//! Streams do not have to be finite. As an example, an repeat stream is
259+
//! an infinite stream:
260+
//!
261+
//! ```
262+
//! # use async_std::stream;
263+
//! let numbers = stream::repeat(1u8);
264+
//! ```
265+
//!
266+
//! It is common to use the [`take`] stream adapter to turn an infinite
267+
//! stream into a finite one:
268+
//!
269+
//! ```
270+
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
271+
//! #
272+
//! # use async_std::prelude::*;
273+
//! # use async_std::stream;
274+
//! let numbers = stream::repeat(1u8);
275+
//! let mut five_numbers = numbers.take(5);
276+
//!
277+
//! while let Some(number) = five_numbers.next().await {
278+
//! println!("{}", number);
279+
//! }
280+
//! #
281+
//! # Ok(()) }) }
282+
//! ```
283+
//!
284+
//! This will print the numbers `0` through `4`, each on their own line.
285+
//!
286+
//! Bear in mind that methods on infinite streams, even those for which a
287+
//! result can be determined mathematically in finite time, may not terminate.
288+
//! Specifically, methods such as [`min`], which in the general case require
289+
//! traversing every element in the stream, are likely not to return
290+
//! successfully for any infinite streams.
291+
//!
292+
//! ```ignore
293+
//! let ones = async_std::stream::repeat(1);
294+
//! let least = ones.min().await.unwrap(); // Oh no! An infinite loop!
295+
//! // `ones.min()` causes an infinite loop, so we won't reach this point!
296+
//! println!("The smallest number one is {}.", least);
297+
//! ```
298+
//!
299+
//! [`std::iter`]: https://doc.rust-lang.org/std/iter/index.html
300+
//! [`take`]: trait.Stream.html#method.take
301+
//! [`min`]: trait.Stream.html#method.min
23302
24303
pub use empty::{empty, Empty};
25304
pub use from_fn::{from_fn, FromFn};

0 commit comments

Comments
 (0)