Skip to content

Commit 338273e

Browse files
authored
Merge pull request #356 from Wassasin/342-stream-throttle
Implemented StreamExt::throttle
2 parents 0c2282f + dda65cb commit 338273e

File tree

3 files changed

+149
-0
lines changed

3 files changed

+149
-0
lines changed

examples/throttle.rs

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
//! Spawns a timed task which gets throttled.
2+
3+
fn main() {
4+
#[cfg(feature = "unstable")]
5+
{
6+
use async_std::prelude::*;
7+
use async_std::task;
8+
9+
task::block_on(async {
10+
use async_std::stream;
11+
use std::time::Duration;
12+
13+
// emit value every 1 second
14+
let s = stream::interval(Duration::from_secs(1)).enumerate();
15+
16+
// throttle for 2 seconds
17+
let s = s.throttle(Duration::from_secs(2));
18+
19+
s.for_each(|(n, _)| {
20+
dbg!(n);
21+
})
22+
.await;
23+
// => 0 .. 1 .. 2 .. 3
24+
// with a pause of 2 seconds between each print
25+
})
26+
}
27+
}

src/stream/stream/mod.rs

+52
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,13 @@ cfg_unstable! {
123123
pub use flatten::Flatten;
124124
pub use flat_map::FlatMap;
125125
pub use timeout::{TimeoutError, Timeout};
126+
pub use throttle::Throttle;
126127

127128
mod merge;
128129
mod flatten;
129130
mod flat_map;
130131
mod timeout;
132+
mod throttle;
131133
}
132134

133135
extension_trait! {
@@ -313,6 +315,56 @@ extension_trait! {
313315
TakeWhile::new(self, predicate)
314316
}
315317

318+
#[doc = r#"
319+
Limit the amount of items yielded per timeslice in a stream.
320+
321+
This stream does not drop any items, but will only limit the rate at which items pass through.
322+
# Examples
323+
```
324+
# fn main() { async_std::task::block_on(async {
325+
#
326+
use async_std::prelude::*;
327+
use async_std::stream;
328+
use std::time::{Duration, Instant};
329+
330+
let start = Instant::now();
331+
332+
// emit value every 5 milliseconds
333+
let s = stream::interval(Duration::from_millis(5))
334+
.enumerate()
335+
.take(3);
336+
337+
// throttle for 10 milliseconds
338+
let mut s = s.throttle(Duration::from_millis(10));
339+
340+
assert_eq!(s.next().await, Some((0, ())));
341+
let duration_ms = start.elapsed().as_millis();
342+
assert!(duration_ms >= 5);
343+
344+
assert_eq!(s.next().await, Some((1, ())));
345+
let duration_ms = start.elapsed().as_millis();
346+
assert!(duration_ms >= 15);
347+
348+
assert_eq!(s.next().await, Some((2, ())));
349+
let duration_ms = start.elapsed().as_millis();
350+
assert!(duration_ms >= 25);
351+
352+
assert_eq!(s.next().await, None);
353+
let duration_ms = start.elapsed().as_millis();
354+
assert!(duration_ms >= 35);
355+
#
356+
# }) }
357+
```
358+
"#]
359+
#[cfg(all(feature = "default", feature = "unstable"))]
360+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
361+
fn throttle(self, d: Duration) -> Throttle<Self>
362+
where
363+
Self: Sized,
364+
{
365+
Throttle::new(self, d)
366+
}
367+
316368
#[doc = r#"
317369
Creates a stream that yields each `step`th element.
318370

src/stream/stream/throttle.rs

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
use std::future::Future;
2+
use std::pin::Pin;
3+
use std::time::{Duration, Instant};
4+
5+
use futures_timer::Delay;
6+
use pin_project_lite::pin_project;
7+
8+
use crate::stream::Stream;
9+
use crate::task::{Context, Poll};
10+
11+
pin_project! {
12+
/// A stream that only yields one element once every `duration`.
13+
///
14+
/// This `struct` is created by the [`throttle`] method on [`Stream`]. See its
15+
/// documentation for more.
16+
///
17+
/// [`throttle`]: trait.Stream.html#method.throttle
18+
/// [`Stream`]: trait.Stream.html
19+
#[doc(hidden)]
20+
#[allow(missing_debug_implementations)]
21+
pub struct Throttle<S> {
22+
#[pin]
23+
stream: S,
24+
duration: Duration,
25+
#[pin]
26+
blocked: bool,
27+
#[pin]
28+
delay: Delay,
29+
}
30+
}
31+
32+
impl<S: Stream> Throttle<S> {
33+
pub(super) fn new(stream: S, duration: Duration) -> Self {
34+
Throttle {
35+
stream,
36+
duration,
37+
blocked: false,
38+
delay: Delay::new(Duration::default()),
39+
}
40+
}
41+
}
42+
43+
impl<S: Stream> Stream for Throttle<S> {
44+
type Item = S::Item;
45+
46+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
47+
let mut this = self.project();
48+
if *this.blocked {
49+
let d = this.delay.as_mut();
50+
if d.poll(cx).is_ready() {
51+
*this.blocked = false;
52+
} else {
53+
return Poll::Pending;
54+
}
55+
}
56+
57+
match this.stream.poll_next(cx) {
58+
Poll::Pending => {
59+
cx.waker().wake_by_ref(); // Continue driving even though emitting Pending
60+
Poll::Pending
61+
}
62+
Poll::Ready(None) => Poll::Ready(None),
63+
Poll::Ready(Some(v)) => {
64+
*this.blocked = true;
65+
this.delay.reset(Instant::now() + *this.duration);
66+
Poll::Ready(Some(v))
67+
}
68+
}
69+
}
70+
}

0 commit comments

Comments
 (0)