Skip to content

Commit eecdf8a

Browse files
committed
Add Stream count
1 parent f611cec commit eecdf8a

File tree

2 files changed

+78
-0
lines changed

2 files changed

+78
-0
lines changed

src/stream/stream/count.rs

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use std::pin::Pin;
2+
use std::future::Future;
3+
4+
use pin_project_lite::pin_project;
5+
6+
use crate::stream::Stream;
7+
use crate::task::{Context, Poll};
8+
9+
pin_project! {
10+
#[doc(hidden)]
11+
#[allow(missing_debug_implementations)]
12+
pub struct CountFuture<S> {
13+
#[pin]
14+
stream: S,
15+
count: usize,
16+
}
17+
}
18+
19+
impl<S> CountFuture<S> {
20+
pub(super) fn new(stream: S) -> Self {
21+
Self {
22+
stream,
23+
count: 0,
24+
}
25+
}
26+
}
27+
28+
impl<S> Future for CountFuture<S>
29+
where
30+
S: Stream,
31+
{
32+
type Output = usize;
33+
34+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
35+
let this = self.project();
36+
let next = futures_core::ready!(this.stream.poll_next(cx));
37+
38+
match next {
39+
Some(_) => {
40+
cx.waker().wake_by_ref();
41+
*this.count += 1;
42+
Poll::Pending
43+
}
44+
None => Poll::Ready(*this.count),
45+
}
46+
}
47+
}

src/stream/stream/mod.rs

+31
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ mod chain;
2727
mod cloned;
2828
mod cmp;
2929
mod copied;
30+
mod count;
3031
mod cycle;
3132
mod enumerate;
3233
mod eq;
@@ -67,6 +68,7 @@ mod zip;
6768
use all::AllFuture;
6869
use any::AnyFuture;
6970
use cmp::CmpFuture;
71+
use count::CountFuture;
7072
use cycle::Cycle;
7173
use enumerate::Enumerate;
7274
use eq::EqFuture;
@@ -912,6 +914,35 @@ extension_trait! {
912914
MinByFuture::new(self, compare)
913915
}
914916

917+
#[doc = r#"
918+
Counting the number of elements and returning it.
919+
920+
# Examples
921+
922+
```ignore
923+
# fn main() { async_std::task::block_on(async {
924+
#
925+
use async_std::prelude::*;
926+
use async_std::stream;
927+
928+
let s = stream::from_iter(vec![1, 2, 3]);
929+
930+
let count = s.count().await;
931+
assert_eq!(count, 3);
932+
933+
#
934+
# }) }
935+
```
936+
"#]
937+
fn count<F>(
938+
self,
939+
) -> impl Future<Output = usize> [CountFuture<Self>]
940+
where
941+
Self: Sized,
942+
{
943+
CountFuture::new(self)
944+
}
945+
915946
#[doc = r#"
916947
Returns the element that gives the minimum value. If several elements are equally minimum,
917948
the first element is returned. If the stream is empty, `None` is returned.

0 commit comments

Comments
 (0)