Skip to content

Commit cbc2e3b

Browse files
authored
Merge pull request #557 from alexcrichton/add-iter-ok
Add an `iter_ok` adaptor for non-Result iterators
2 parents dfc40f3 + f62ddbe commit cbc2e3b

13 files changed

+157
-39
lines changed

src/stream/iter.rs

+3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
#![deprecated(note = "implemention moved to `iter_ok`")]
2+
#![allow(deprecated)]
3+
14
use {Async, Poll};
25
use stream::Stream;
36

src/stream/iter_ok.rs

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use core::marker;
2+
3+
use {Async, Poll};
4+
use stream::Stream;
5+
6+
/// A stream which is just a shim over an underlying instance of `Iterator`.
7+
///
8+
/// This stream will never block and is always ready.
9+
#[derive(Debug)]
10+
#[must_use = "streams do nothing unless polled"]
11+
pub struct IterOk<I, E> {
12+
iter: I,
13+
_marker: marker::PhantomData<fn() -> E>,
14+
}
15+
16+
/// Converts an `Iterator` over into a `Stream` which is always ready
17+
/// to yield the next value.
18+
///
19+
/// Iterators in Rust don't express the ability to block, so this adapter
20+
/// simply always calls `iter.next()` and returns that.
21+
///
22+
/// ```rust
23+
/// use futures::*;
24+
///
25+
/// let mut stream = stream::iter_ok::<_, ()>(vec![17, 19]);
26+
/// assert_eq!(Ok(Async::Ready(Some(17))), stream.poll());
27+
/// assert_eq!(Ok(Async::Ready(Some(19))), stream.poll());
28+
/// assert_eq!(Ok(Async::Ready(None)), stream.poll());
29+
/// ```
30+
pub fn iter_ok<I, E>(i: I) -> IterOk<I::IntoIter, E>
31+
where I: IntoIterator,
32+
{
33+
IterOk {
34+
iter: i.into_iter(),
35+
_marker: marker::PhantomData,
36+
}
37+
}
38+
39+
impl<I, E> Stream for IterOk<I, E>
40+
where I: Iterator,
41+
{
42+
type Item = I::Item;
43+
type Error = E;
44+
45+
fn poll(&mut self) -> Poll<Option<I::Item>, E> {
46+
Ok(Async::Ready(self.iter.next()))
47+
}
48+
}

src/stream/mod.rs

+4
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,13 @@
1818
use {IntoFuture, Poll};
1919

2020
mod iter;
21+
#[allow(deprecated)]
2122
pub use self::iter::{iter, Iter};
2223
#[cfg(feature = "with-deprecated")]
24+
#[allow(deprecated)]
2325
pub use self::Iter as IterStream;
26+
mod iter_ok;
27+
pub use self::iter_ok::{iter_ok, IterOk};
2428

2529
mod repeat;
2630
pub use self::repeat::{repeat, Repeat};

src/stream/once.rs

+8-7
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
1-
use core;
2-
3-
use Poll;
4-
use stream;
1+
use {Poll, Async};
52
use stream::Stream;
63

74
/// A stream which emits single element and then EOF.
85
///
96
/// This stream will never block and is always ready.
107
#[derive(Debug)]
118
#[must_use = "streams do nothing unless polled"]
12-
pub struct Once<T, E>(stream::Iter<core::iter::Once<Result<T, E>>>);
9+
pub struct Once<T, E>(Option<Result<T, E>>);
1310

1411
/// Creates a stream of single element
1512
///
@@ -21,14 +18,18 @@ pub struct Once<T, E>(stream::Iter<core::iter::Once<Result<T, E>>>);
2118
/// assert_eq!(Ok(Async::Ready(None)), stream.poll());
2219
/// ```
2320
pub fn once<T, E>(item: Result<T, E>) -> Once<T, E> {
24-
Once(stream::iter(core::iter::once(item)))
21+
Once(Some(item))
2522
}
2623

2724
impl<T, E> Stream for Once<T, E> {
2825
type Item = T;
2926
type Error = E;
3027

3128
fn poll(&mut self) -> Poll<Option<T>, E> {
32-
self.0.poll()
29+
match self.0.take() {
30+
Some(Ok(e)) => Ok(Async::Ready(Some(e))),
31+
Some(Err(e)) => Err(e),
32+
None => Ok(Async::Ready(None)),
33+
}
3334
}
3435
}

src/sync/mpsc/mod.rs

+18
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,24 @@ impl<T> Sender<T> {
484484
Async::Ready(())
485485
}
486486
}
487+
488+
/// Attempts to send a message on this `Sender<T>` without blocking.
489+
///
490+
/// This function, unlike `start_send`, is safe to call whether or not it's
491+
/// being called on a task or not. Note that this function, however, will
492+
/// *not* attempt to block the current task if the message cannot be sent.
493+
///
494+
/// It is not recommended to call this function from inside of a future,
495+
/// only from an external thread where you've otherwise arranged to be
496+
/// notified when the channel is no longer full.
497+
pub fn try_send(&self, msg: T) -> Result<(), SendError<T>> {
498+
match self.inc_num_messages(false) {
499+
Some(_park_self) => {}
500+
None => return Err(SendError(msg)),
501+
};
502+
self.queue_push_and_signal(Some(msg));
503+
Ok(())
504+
}
487505
}
488506

489507
impl<T> Sink for Sender<T> {

tests/bilock.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ fn concurrent() {
5959
a: Some(a),
6060
remaining: N,
6161
};
62-
let b = stream::iter((0..N).map(Ok::<_, ()>)).fold(b, |b, _n| {
62+
let b = stream::iter_ok::<_, ()>((0..N)).fold(b, |b, _n| {
6363
b.lock().map(|mut b| {
6464
*b += 1;
6565
b.unlock()

tests/future_flatten_stream.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,13 @@ use futures::stream;
99

1010
#[test]
1111
fn successful_future() {
12-
let stream_items = vec![Ok(17), Err(true), Ok(19)];
13-
let future_of_a_stream = ok::<_, bool>(stream::iter(stream_items));
12+
let stream_items = vec![17, 19];
13+
let future_of_a_stream = ok::<_, bool>(stream::iter_ok(stream_items));
1414

1515
let stream = future_of_a_stream.flatten_stream();
1616

1717
let mut iter = stream.wait();
1818
assert_eq!(Ok(17), iter.next().unwrap());
19-
assert_eq!(Err(true), iter.next().unwrap());
2019
assert_eq!(Ok(19), iter.next().unwrap());
2120
assert_eq!(None, iter.next());
2221
}

tests/mpsc.rs

+20
Original file line numberDiff line numberDiff line change
@@ -354,3 +354,23 @@ fn is_ready<T>(res: &AsyncSink<T>) -> bool {
354354
_ => false,
355355
}
356356
}
357+
358+
#[test]
359+
fn try_send() {
360+
const N: usize = 3000;
361+
let (tx, rx) = mpsc::channel(0);
362+
363+
let t = thread::spawn(move || {
364+
for i in 0..N {
365+
loop {
366+
if tx.try_send(i).is_ok() {
367+
break
368+
}
369+
}
370+
}
371+
});
372+
for (i, j) in rx.wait().enumerate() {
373+
assert_eq!(i, j.unwrap());
374+
}
375+
t.join().unwrap();
376+
}

tests/sink.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,14 @@ fn send() {
4444
fn send_all() {
4545
let v = Vec::new();
4646

47-
let (v, _) = v.send_all(stream::iter(vec![Ok(0), Ok(1)])).wait().unwrap();
47+
let (v, _) = v.send_all(stream::iter_ok(vec![0, 1])).wait().unwrap();
4848
assert_eq!(v, vec![0, 1]);
4949

50-
let (v, _) = v.send_all(stream::iter(vec![Ok(2), Ok(3)])).wait().unwrap();
50+
let (v, _) = v.send_all(stream::iter_ok(vec![2, 3])).wait().unwrap();
5151
assert_eq!(v, vec![0, 1, 2, 3]);
5252

5353
assert_done(
54-
move || v.send_all(stream::iter(vec![Ok(4), Ok(5)])).map(|(v, _)| v),
54+
move || v.send_all(stream::iter_ok(vec![4, 5])).map(|(v, _)| v),
5555
Ok(vec![0, 1, 2, 3, 4, 5]));
5656
}
5757

@@ -171,7 +171,7 @@ fn with_as_map() {
171171
// test simple use of with_flat_map
172172
fn with_flat_map() {
173173
let sink = Vec::new().with_flat_map(|item| {
174-
stream::iter(vec!(item; item).into_iter().map(Ok))
174+
stream::iter_ok(vec![item; item])
175175
});
176176
let sink = sink.send(0).wait().unwrap();
177177
let sink = sink.send(1).wait().unwrap();

tests/split.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
extern crate futures;
22

33
use futures::{Future, StartSend, Sink, Stream, Poll};
4-
use futures::stream::iter;
4+
use futures::stream::iter_ok;
55

66
struct Join<T, U>(T, U);
77

@@ -37,7 +37,7 @@ impl<T, U: Sink> Sink for Join<T, U> {
3737
fn test_split() {
3838
let mut dest = Vec::new();
3939
{
40-
let j = Join(iter(vec![Ok(10), Ok(20), Ok(30)]), &mut dest);
40+
let j = Join(iter_ok(vec![10, 20, 30]), &mut dest);
4141
let (sink, stream) = j.split();
4242
let j = sink.reunite(stream).expect("test_split: reunite error");
4343
let (sink, stream) = j.split();

tests/stream.rs

+41-14
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,39 @@ extern crate futures;
44
use futures::{Async, Future, Poll, Sink, Stream};
55
use futures::executor;
66
use futures::future::{err, ok};
7-
use futures::stream::{empty, iter, poll_fn, Peekable};
7+
use futures::stream::{empty, iter_ok, poll_fn, Peekable};
88
use futures::sync::oneshot;
99
use futures::sync::mpsc;
1010

1111
mod support;
1212
use support::*;
1313

14+
pub struct Iter<I> {
15+
iter: I,
16+
}
17+
18+
pub fn iter<J, T, E>(i: J) -> Iter<J::IntoIter>
19+
where J: IntoIterator<Item=Result<T, E>>,
20+
{
21+
Iter {
22+
iter: i.into_iter(),
23+
}
24+
}
25+
26+
impl<I, T, E> Stream for Iter<I>
27+
where I: Iterator<Item=Result<T, E>>,
28+
{
29+
type Item = T;
30+
type Error = E;
31+
32+
fn poll(&mut self) -> Poll<Option<T>, E> {
33+
match self.iter.next() {
34+
Some(Ok(e)) => Ok(Async::Ready(Some(e))),
35+
Some(Err(e)) => Err(e),
36+
None => Ok(Async::Ready(None)),
37+
}
38+
}
39+
}
1440

1541
fn list() -> Box<Stream<Item=i32, Error=u32> + Send> {
1642
let (tx, rx) = mpsc::channel(1);
@@ -310,44 +336,45 @@ fn chunks_panic_on_cap_zero() {
310336

311337
#[test]
312338
fn select() {
313-
let a = iter(vec![Ok::<_, u32>(1), Ok(2), Ok(3)]);
314-
let b = iter(vec![Ok(4), Ok(5), Ok(6)]);
339+
let a = iter_ok::<_, u32>(vec![1, 2, 3]);
340+
let b = iter_ok(vec![4, 5, 6]);
315341
assert_done(|| a.select(b).collect(), Ok(vec![1, 4, 2, 5, 3, 6]));
316342

317-
let a = iter(vec![Ok::<_, u32>(1), Ok(2), Ok(3)]);
318-
let b = iter(vec![Ok(1), Ok(2)]);
343+
let a = iter_ok::<_, u32>(vec![1, 2, 3]);
344+
let b = iter_ok(vec![1, 2]);
319345
assert_done(|| a.select(b).collect(), Ok(vec![1, 1, 2, 2, 3]));
320346

321-
let a = iter(vec![Ok(1), Ok(2)]);
322-
let b = iter(vec![Ok::<_, u32>(1), Ok(2), Ok(3)]);
347+
let a = iter_ok(vec![1, 2]);
348+
let b = iter_ok::<_, u32>(vec![1, 2, 3]);
323349
assert_done(|| a.select(b).collect(), Ok(vec![1, 1, 2, 2, 3]));
324350
}
325351

326352
#[test]
327353
fn forward() {
328354
let v = Vec::new();
329-
let v = iter(vec![Ok::<_, ()>(0), Ok(1)]).forward(v).wait().unwrap().1;
355+
let v = iter_ok::<_, ()>(vec![0, 1]).forward(v).wait().unwrap().1;
330356
assert_eq!(v, vec![0, 1]);
331357

332-
let v = iter(vec![Ok::<_, ()>(2), Ok(3)]).forward(v).wait().unwrap().1;
358+
let v = iter_ok::<_, ()>(vec![2, 3]).forward(v).wait().unwrap().1;
333359
assert_eq!(v, vec![0, 1, 2, 3]);
334360

335-
assert_done(move || iter(vec![Ok(4), Ok(5)]).forward(v).map(|(_, s)| s),
361+
assert_done(move || iter_ok(vec![4, 5]).forward(v).map(|(_, s)| s),
336362
Ok::<_, ()>(vec![0, 1, 2, 3, 4, 5]));
337363
}
338364

339365
#[test]
366+
#[allow(deprecated)]
340367
fn concat() {
341-
let a = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Ok(vec![4, 5, 6]), Ok(vec![7, 8, 9])]);
342-
assert_done(move || a.concat2(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]));
368+
let a = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]);
369+
assert_done(move || a.concat(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]));
343370

344371
let b = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]);
345-
assert_done(move || b.concat2(), Err(()));
372+
assert_done(move || b.concat(), Err(()));
346373
}
347374

348375
#[test]
349376
fn concat2() {
350-
let a = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Ok(vec![4, 5, 6]), Ok(vec![7, 8, 9])]);
377+
let a = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]);
351378
assert_done(move || a.concat2(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]));
352379

353380
let b = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]);

tests/stream_catch_unwind.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@ use futures::stream::Stream;
55

66
#[test]
77
fn panic_in_the_middle_of_the_stream() {
8-
let stream = stream::iter::<_, Option<i32>, bool>(vec![
9-
Some(10), None, Some(11)].into_iter().map(Ok));
8+
let stream = stream::iter_ok::<_, bool>(vec![Some(10), None, Some(11)]);
109

1110
// panic on second element
1211
let stream_panicking = stream.map(|o| o.unwrap());
@@ -19,8 +18,7 @@ fn panic_in_the_middle_of_the_stream() {
1918

2019
#[test]
2120
fn no_panic() {
22-
let stream = stream::iter::<_, _, bool>(vec![
23-
10, 11, 12].into_iter().map(Ok));
21+
let stream = stream::iter_ok::<_, bool>(vec![10, 11, 12]);
2422

2523
let mut iter = stream.catch_unwind().wait();
2624

tests/unsync.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use futures::{Future, Stream, Sink, Async};
88
use futures::unsync::oneshot;
99
use futures::unsync::mpsc::{self, SendError};
1010
use futures::future::lazy;
11-
use futures::stream::{iter, unfold};
11+
use futures::stream::{iter_ok, unfold};
1212

1313
use support::local_executor::Core;
1414

@@ -66,7 +66,7 @@ fn mpsc_tx_err() {
6666
fn mpsc_backpressure() {
6767
let (tx, rx) = mpsc::channel::<i32>(1);
6868
lazy(move || {
69-
iter(vec![1, 2, 3].into_iter().map(Ok))
69+
iter_ok(vec![1, 2, 3])
7070
.forward(tx)
7171
.map_err(|e: SendError<i32>| panic!("{}", e))
7272
.join(rx.take(3).collect().map(|xs| {
@@ -79,7 +79,7 @@ fn mpsc_backpressure() {
7979
fn mpsc_unbounded() {
8080
let (tx, rx) = mpsc::unbounded::<i32>();
8181
lazy(move || {
82-
iter(vec![1, 2, 3].into_iter().map(Ok))
82+
iter_ok(vec![1, 2, 3])
8383
.forward(tx)
8484
.map_err(|e: SendError<i32>| panic!("{}", e))
8585
.join(rx.take(3).collect().map(|xs| {
@@ -103,7 +103,7 @@ fn mpsc_send_unpark() {
103103
let core = Core::new();
104104
let (tx, rx) = mpsc::channel::<i32>(1);
105105
let (donetx, donerx) = oneshot::channel();
106-
core.spawn(iter(vec![1, 2].into_iter().map(Ok)).forward(tx)
106+
core.spawn(iter_ok(vec![1, 2]).forward(tx)
107107
.then(|x: Result<_, SendError<i32>>| {
108108
assert!(x.is_err());
109109
donetx.send(()).unwrap();

0 commit comments

Comments
 (0)