Skip to content

Commit 8f2ed48

Browse files
committed
add stream::peekable
peek() implementation is still missing
1 parent ca80ca9 commit 8f2ed48

File tree

2 files changed

+82
-0
lines changed

2 files changed

+82
-0
lines changed

src/stream/stream/mod.rs

+33
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ mod min_by;
4444
mod next;
4545
mod nth;
4646
mod partial_cmp;
47+
mod peekable;
4748
mod scan;
4849
mod skip;
4950
mod skip_while;
@@ -80,6 +81,7 @@ pub use filter::Filter;
8081
pub use fuse::Fuse;
8182
pub use inspect::Inspect;
8283
pub use map::Map;
84+
pub use peekable::Peekable;
8385
pub use scan::Scan;
8486
pub use skip::Skip;
8587
pub use skip_while::SkipWhile;
@@ -984,6 +986,37 @@ extension_trait! {
984986
}
985987
}
986988

989+
#[doc = r#"
990+
## Examples
991+
992+
```
993+
# fn main() { async_std::task::block_on(async {
994+
#
995+
use std::collections::VecDeque;
996+
997+
use async_std::prelude::*;
998+
999+
let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
1000+
let mut s = s.peekable();
1001+
1002+
assert_eq!(s.next().await, Some(1));
1003+
assert_eq!(s.peek().await, Some(1));
1004+
assert_eq!(s.next().await, Some(2));
1005+
assert_eq!(s.next().await, Some(3));
1006+
assert_eq!(s.next().await, None);
1007+
#
1008+
# }) }
1009+
```
1010+
"#]
1011+
#[inline]
1012+
fn peekable(self) -> Peekable<Self>
1013+
where
1014+
Self: Sized,
1015+
{
1016+
Peekable::new(self)
1017+
}
1018+
1019+
9871020
#[doc = r#"
9881021
A stream adaptor similar to [`fold`] that holds internal state and produces a new
9891022
stream.

src/stream/stream/peekable.rs

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
use std::pin::Pin;
2+
3+
use crate::stream::Stream;
4+
use crate::task::{Context, Poll};
5+
6+
#[doc(hidden)]
7+
#[allow(missing_debug_implementations)]
8+
pub struct Peekable<S: Stream> {
9+
stream: S,
10+
peeked: Option<Option<S::Item>>,
11+
}
12+
13+
14+
impl<S> Peekable<S>
15+
where
16+
S: Stream,
17+
{
18+
pin_utils::unsafe_pinned!(stream: S);
19+
pin_utils::unsafe_unpinned!(peeked: Option<Option<S::Item>>);
20+
21+
pub(crate) fn new(stream: S) -> Self {
22+
Peekable{
23+
stream: stream,
24+
peeked: None,
25+
}
26+
}
27+
28+
pub fn peek(&mut self) -> Poll<Option<&S::Item>> {
29+
Poll::Ready(None)
30+
}
31+
32+
}
33+
34+
impl<S> Stream for Peekable<S>
35+
where
36+
S: Stream,
37+
{
38+
type Item = S::Item;
39+
40+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
41+
match &self.peeked {
42+
Some(_) => Poll::Ready(self.as_mut().peeked().take().unwrap()) ,
43+
None => {
44+
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
45+
Poll::Ready(next)
46+
}
47+
}
48+
}
49+
}

0 commit comments

Comments
 (0)