Skip to content

Commit 10e48d8

Browse files
authored
Merge pull request #545 from ishitatsuyuki/stream_pollfn
Implement stream::poll_fn
2 parents b21e4f4 + cce0615 commit 10e48d8

File tree

3 files changed

+69
-3
lines changed

3 files changed

+69
-3
lines changed

src/stream/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ mod merge;
4343
mod once;
4444
mod or_else;
4545
mod peek;
46+
mod poll_fn;
4647
mod select;
4748
mod skip;
4849
mod skip_while;
@@ -70,6 +71,7 @@ pub use self::merge::{Merge, MergedItem};
7071
pub use self::once::{Once, once};
7172
pub use self::or_else::OrElse;
7273
pub use self::peek::Peekable;
74+
pub use self::poll_fn::{poll_fn, PollFn};
7375
pub use self::select::Select;
7476
pub use self::skip::Skip;
7577
pub use self::skip_while::SkipWhile;

src/stream/poll_fn.rs

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
//! Definition of the `PollFn` combinator
2+
3+
use {Stream, Poll};
4+
5+
/// A stream which adapts a function returning `Poll`.
6+
///
7+
/// Created by the `poll_fn` function.
8+
#[derive(Debug)]
9+
#[must_use = "streams do nothing unless polled"]
10+
pub struct PollFn<F> {
11+
inner: F,
12+
}
13+
14+
/// Creates a new stream wrapping around a function returning `Poll`.
15+
///
16+
/// Polling the returned stream delegates to the wrapped function.
17+
///
18+
/// # Examples
19+
///
20+
/// ```
21+
/// use futures::stream::poll_fn;
22+
/// use futures::{Async, Poll};
23+
///
24+
/// let mut counter = 1usize;
25+
///
26+
/// let read_stream = poll_fn(move || -> Poll<Option<String>, std::io::Error> {
27+
/// if counter == 0 { return Ok(Async::Ready(None)); }
28+
/// counter -= 1;
29+
/// Ok(Async::Ready(Some("Hello, World!".to_owned())))
30+
/// });
31+
/// ```
32+
pub fn poll_fn<T, E, F>(f: F) -> PollFn<F>
33+
where
34+
F: FnMut() -> Poll<Option<T>, E>,
35+
{
36+
PollFn { inner: f }
37+
}
38+
39+
impl<T, E, F> Stream for PollFn<F>
40+
where
41+
F: FnMut() -> Poll<Option<T>, E>,
42+
{
43+
type Item = T;
44+
type Error = E;
45+
46+
fn poll(&mut self) -> Poll<Option<T>, E> {
47+
(self.inner)()
48+
}
49+
}

tests/stream.rs

+18-3
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
#[macro_use]
22
extern crate futures;
33

4-
use futures::{Poll, Future, Stream, Sink};
4+
use futures::{Async, Future, Poll, Sink, Stream};
55
use futures::executor;
6-
use futures::future::{ok, err};
7-
use futures::stream::{empty, iter, Peekable, BoxStream};
6+
use futures::future::{err, ok};
7+
use futures::stream::{empty, iter, poll_fn, BoxStream, Peekable};
88
use futures::sync::oneshot;
99
use futures::sync::mpsc;
1010

@@ -356,3 +356,18 @@ fn concat2() {
356356
let c = empty::<Vec<()>, ()>();
357357
assert_done(move || c.concat2(), Ok(vec![]))
358358
}
359+
360+
#[test]
361+
fn stream_poll_fn() {
362+
let mut counter = 5usize;
363+
364+
let read_stream = poll_fn(move || -> Poll<Option<usize>, std::io::Error> {
365+
if counter == 0 {
366+
return Ok(Async::Ready(None));
367+
}
368+
counter -= 1;
369+
Ok(Async::Ready(Some(counter)))
370+
});
371+
372+
assert_eq!(read_stream.wait().count(), 5);
373+
}

0 commit comments

Comments
 (0)