Skip to content

Commit 4e1d79a

Browse files
authored
Merge pull request #524 from yjhmelody/stream-max
Add Stream max
2 parents 338273e + 879af6d commit 4e1d79a

File tree

2 files changed

+95
-0
lines changed

2 files changed

+95
-0
lines changed

src/stream/stream/max.rs

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
use std::cmp::{Ord, Ordering};
2+
use std::marker::PhantomData;
3+
use std::pin::Pin;
4+
use std::future::Future;
5+
6+
use pin_project_lite::pin_project;
7+
8+
use crate::stream::Stream;
9+
use crate::task::{Context, Poll};
10+
11+
pin_project! {
12+
#[doc(hidden)]
13+
#[allow(missing_debug_implementations)]
14+
pub struct MaxFuture<S, F, T> {
15+
#[pin]
16+
stream: S,
17+
_compare: PhantomData<F>,
18+
max: Option<T>,
19+
}
20+
}
21+
22+
impl<S, F, T> MaxFuture<S, F, T> {
23+
pub(super) fn new(stream: S) -> Self {
24+
Self {
25+
stream,
26+
_compare: PhantomData,
27+
max: None,
28+
}
29+
}
30+
}
31+
32+
impl<S, F> Future for MaxFuture<S, F, S::Item>
33+
where
34+
S: Stream,
35+
S::Item: Ord,
36+
F: FnMut(&S::Item, &S::Item) -> Ordering,
37+
{
38+
type Output = Option<S::Item>;
39+
40+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
41+
let this = self.project();
42+
let next = futures_core::ready!(this.stream.poll_next(cx));
43+
44+
match next {
45+
Some(new) => {
46+
cx.waker().wake_by_ref();
47+
match this.max.take() {
48+
None => *this.max = Some(new),
49+
50+
Some(old) => match new.cmp(&old) {
51+
Ordering::Greater => *this.max = Some(new),
52+
_ => *this.max = Some(old),
53+
},
54+
}
55+
Poll::Pending
56+
}
57+
None => Poll::Ready(this.max.take()),
58+
}
59+
}
60+
}

src/stream/stream/mod.rs

+35
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ mod last;
4444
mod le;
4545
mod lt;
4646
mod map;
47+
mod max;
4748
mod max_by;
4849
mod max_by_key;
4950
mod min;
@@ -80,6 +81,7 @@ use gt::GtFuture;
8081
use last::LastFuture;
8182
use le::LeFuture;
8283
use lt::LtFuture;
84+
use max::MaxFuture;
8385
use max_by::MaxByFuture;
8486
use max_by_key::MaxByKeyFuture;
8587
use min::MinFuture;
@@ -965,6 +967,39 @@ extension_trait! {
965967
}
966968

967969
#[doc = r#"
970+
Returns the element that gives the maximum value. If several elements are equally maximum,
971+
the first element is returned. If the stream is empty, `None` is returned.
972+
973+
# Examples
974+
975+
```ignore
976+
# fn main() { async_std::task::block_on(async {
977+
#
978+
use async_std::prelude::*;
979+
use async_std::stream;
980+
981+
let s = stream::from_iter(vec![1usize, 2, 3]);
982+
983+
let max = s.clone().max().await;
984+
assert_eq!(max, Some(3));
985+
986+
let max = stream::empty::<usize>().max().await;
987+
assert_eq!(max, None);
988+
#
989+
# }) }
990+
```
991+
"#]
992+
fn max<F>(
993+
self,
994+
) -> impl Future<Output = Option<Self::Item>> [MaxFuture<Self, F, Self::Item>]
995+
where
996+
Self: Sized,
997+
F: FnMut(&Self::Item, &Self::Item) -> Ordering,
998+
{
999+
MaxFuture::new(self)
1000+
}
1001+
1002+
#[doc = r#"
9681003
Returns the element that gives the minimum value. If several elements are equally minimum,
9691004
the first element is returned. If the stream is empty, `None` is returned.
9701005

0 commit comments

Comments
 (0)