Skip to content

Commit ed944d0

Browse files
committed
adds stream::enumerate combinator
1 parent 6f9ec66 commit ed944d0

File tree

2 files changed

+70
-0
lines changed

2 files changed

+70
-0
lines changed

src/stream/stream/enumerate.rs

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
use crate::task::{Context, Poll};
2+
use std::pin::Pin;
3+
4+
#[doc(hidden)]
5+
#[allow(missing_debug_implementations)]
6+
pub struct Enumerate<S> {
7+
stream: S,
8+
i: usize,
9+
}
10+
11+
impl<S> Enumerate<S> {
12+
pin_utils::unsafe_pinned!(stream: S);
13+
pin_utils::unsafe_unpinned!(i: usize);
14+
15+
pub(super) fn new(stream: S) -> Self {
16+
Enumerate { stream, i: 0 }
17+
}
18+
}
19+
20+
impl<S> futures_core::stream::Stream for Enumerate<S>
21+
where
22+
S: futures_core::stream::Stream,
23+
{
24+
type Item = (usize, S::Item);
25+
26+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
27+
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
28+
29+
match next {
30+
Some(v) => {
31+
let ret = (self.i, v);
32+
*self.as_mut().i() += 1;
33+
Poll::Ready(Some(ret))
34+
}
35+
None => Poll::Ready(None),
36+
}
37+
}
38+
}

src/stream/stream/mod.rs

+32
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
2424
mod all;
2525
mod any;
26+
mod enumerate;
2627
mod filter_map;
2728
mod find_map;
2829
mod min_by;
@@ -34,6 +35,7 @@ pub use take::Take;
3435

3536
use all::AllFuture;
3637
use any::AnyFuture;
38+
use enumerate::Enumerate;
3739
use filter_map::FilterMap;
3840
use find_map::FindMapFuture;
3941
use min_by::MinByFuture;
@@ -136,6 +138,36 @@ pub trait Stream {
136138
}
137139
}
138140

141+
/// Creates a stream that gives the current element's count as well as the next value.
142+
///
143+
/// # Overflow behaviour.
144+
///
145+
/// This combinator does no guarding against overflows.
146+
///
147+
/// # Examples
148+
/// ```
149+
/// # fn main() { async_std::task::block_on(async {
150+
/// #
151+
/// use async_std::prelude::*;
152+
/// use async_std::stream;
153+
///
154+
/// let mut s = stream::repeat(9).take(4).enumerate();
155+
/// let mut c: usize = 0;
156+
///
157+
/// while let Some((i, v)) = s.next().await {
158+
/// assert_eq!(c, i);
159+
/// assert_eq!(v, 9);
160+
/// c += 1;
161+
/// }
162+
/// #
163+
/// # }) }
164+
fn enumerate(self) -> Enumerate<Self>
165+
where
166+
Self: Sized,
167+
{
168+
Enumerate::new(self)
169+
}
170+
139171
/// Both filters and maps a stream.
140172
///
141173
/// # Examples

0 commit comments

Comments
 (0)