Skip to content

Commit 8a10de5

Browse files
authored
Merge pull request #546 from tmccombs/unsync-stream-spawn
Unsync stream spawn
2 parents 3c067a7 + ed239fa commit 8a10de5

File tree

5 files changed

+188
-20
lines changed

5 files changed

+188
-20
lines changed

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,8 @@ pub use future::{
200200
mod lock;
201201
mod task_impl;
202202

203+
mod resultstream;
204+
203205
pub mod task;
204206
pub mod executor;
205207
#[cfg(feature = "use_std")]

src/resultstream.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// This should really be in the the stream module,
2+
// but `pub(crate)` isn't available until Rust 1.18,
3+
// and pre-1.18 there isn't a really good way to have a sub-module
4+
// available to the crate, but not without it.
5+
use core::marker::PhantomData;
6+
7+
use {Poll, Async};
8+
use stream::Stream;
9+
10+
11+
/// A stream combinator used to convert a `Stream<Item=T,Error=E>`
12+
/// to a `Stream<Item=Result<T,E>>`.
13+
///
14+
/// A poll on this stream will never return an `Err`. As such the
15+
/// actual error type is parameterized, so it can match whatever error
16+
/// type is needed.
17+
///
18+
/// This structure is produced by the `Sream::results` method.
19+
#[derive(Debug)]
20+
#[must_use = "streams do nothing unless polled"]
21+
pub struct Results<S: Stream, E> {
22+
inner: S,
23+
phantom: PhantomData<E>
24+
}
25+
26+
pub fn new<S, E>(s: S) -> Results<S, E> where S: Stream {
27+
Results {
28+
inner: s,
29+
phantom: PhantomData
30+
}
31+
}
32+
33+
impl<S: Stream, E> Stream for Results<S, E> {
34+
type Item = Result<S::Item, S::Error>;
35+
type Error = E;
36+
37+
fn poll(&mut self) -> Poll<Option<Result<S::Item, S::Error>>, E> {
38+
match self.inner.poll() {
39+
Ok(Async::Ready(Some(item))) => Ok(Async::Ready(Some(Ok(item)))),
40+
Err(e) => Ok(Async::Ready(Some(Err(e)))),
41+
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
42+
Ok(Async::NotReady) => Ok(Async::NotReady)
43+
}
44+
}
45+
}
46+

src/sync/mpsc/mod.rs

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ use sync::mpsc::queue::{Queue, PopResult};
8080
use task::{self, Task};
8181
use future::Executor;
8282
use sink::SendAll;
83+
use resultstream::{self, Results};
8384
use {Async, AsyncSink, Future, Poll, StartSend, Sink, Stream};
8485

8586
mod queue;
@@ -841,11 +842,9 @@ pub struct SpawnHandle<Item, Error> {
841842

842843
/// Type of future which `Executor` instances must be able to execute for `spawn`.
843844
pub struct Execute<S: Stream> {
844-
inner: SendAll<Sender<Result<S::Item, S::Error>>, ResultStream<S>>
845+
inner: SendAll<Sender<Result<S::Item, S::Error>>, Results<S, SendError<Result<S::Item, S::Error>>>>
845846
}
846847

847-
struct ResultStream<S: Stream>(S);
848-
849848
/// Spawns a `stream` onto the instance of `Executor` provided, `executor`,
850849
/// returning a handle representing the remote stream.
851850
///
@@ -869,7 +868,7 @@ pub fn spawn<S, E>(stream: S, executor: &E, buffer: usize) -> SpawnHandle<S::Ite
869868
{
870869
let (tx, rx) = channel(buffer);
871870
executor.execute(Execute {
872-
inner: tx.send_all(ResultStream(stream))
871+
inner: tx.send_all(resultstream::new(stream))
873872
}).expect("failed to spawn stream");
874873
SpawnHandle {
875874
rx: rx
@@ -900,7 +899,7 @@ pub fn spawn_unbounded<S, E>(stream: S, executor: &E) -> SpawnHandle<S::Item, S:
900899
{
901900
let (tx, rx) = channel2(None);
902901
executor.execute(Execute {
903-
inner: tx.send_all(ResultStream(stream))
902+
inner: tx.send_all(resultstream::new(stream))
904903
}).expect("failed to spawn stream");
905904
SpawnHandle {
906905
rx: rx
@@ -948,19 +947,6 @@ impl<S: Stream> fmt::Debug for Execute<S> {
948947
}
949948
}
950949

951-
impl<S: Stream> Stream for ResultStream<S> {
952-
type Item = Result<S::Item, S::Error>;
953-
type Error = SendError<Self::Item>;
954-
fn poll(&mut self) -> Poll<Option<Result<S::Item, S::Error>>, Self::Error> {
955-
match self.0.poll() {
956-
Ok(Async::Ready(Some(item))) => Ok(Async::Ready(Some(Ok(item)))),
957-
Err(e) => Ok(Async::Ready(Some(Err(e)))),
958-
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
959-
Ok(Async::NotReady) => Ok(Async::NotReady)
960-
}
961-
}
962-
}
963-
964950
/*
965951
*
966952
* ===== impl Inner =====

src/unsync/mpsc.rs

Lines changed: 126 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ use std::mem;
1313
use std::rc::{Rc, Weak};
1414

1515
use task::{self, Task};
16-
use {Async, AsyncSink, Poll, StartSend, Sink, Stream};
16+
use future::Executor;
17+
use sink::SendAll;
18+
use resultstream::{self, Results};
19+
use {Async, AsyncSink, Future, Poll, StartSend, Sink, Stream};
1720

1821
/// Creates a bounded in-memory channel with buffered storage.
1922
///
@@ -330,3 +333,125 @@ impl<T> SendError<T> {
330333
self.0
331334
}
332335
}
336+
337+
/// Handle returned from the `spawn` function.
338+
///
339+
/// This handle is a stream that proxies a stream on a seperate `Executor`.
340+
/// Created through the `mpsc::spawn` function, this handle will produce
341+
/// the same values as the proxied stream, as they are produced in the executor,
342+
/// and uses a limited buffer to exert back-pressure on the remote stream.
343+
///
344+
/// If this handle is dropped, then the stream will no longer be polled and is
345+
/// scheduled to be dropped.
346+
pub struct SpawnHandle<Item, Error> {
347+
inner: Receiver<Result<Item, Error>>
348+
}
349+
350+
/// Type of future which `Executor` instances must be able to execut for `spawn`.
351+
pub struct Execute<S: Stream> {
352+
inner: SendAll<Sender<Result<S::Item, S::Error>>, Results<S, SendError<Result<S::Item, S::Error>>>>
353+
}
354+
355+
/// Spawns a `stream` onto the instance of `Executor` provided, `executor`,
356+
/// returning a handle representing the remote stream.
357+
///
358+
/// The `stream` will be canceled if the `SpawnHandle` is dropped.
359+
///
360+
/// The `SpawnHandle` returned is a stream that is a proxy for `stream` itself.
361+
/// When `stream` has additional items available, then the `SpawnHandle`
362+
/// will have those same items available.
363+
///
364+
/// At most `buffer + 1` elements will be buffered at a time. If the buffer
365+
/// is full, then `stream` will stop progressing until more space is available.
366+
/// This allows the `SpawnHandle` to exert backpressure on the `stream`.
367+
///
368+
/// # Panics
369+
///
370+
/// This function will panic if `executor` is unable spawn a `Future` containing
371+
/// the entirety of the `stream`.
372+
pub fn spawn<S, E>(stream: S, executor: &E, buffer: usize) -> SpawnHandle<S::Item, S::Error>
373+
where S: Stream,
374+
E: Executor<Execute<S>>
375+
{
376+
let (tx, rx) = channel(buffer);
377+
executor.execute(Execute {
378+
inner: tx.send_all(resultstream::new(stream))
379+
}).expect("failed to spawn stream");
380+
SpawnHandle {
381+
inner: rx
382+
}
383+
}
384+
385+
/// Spawns a `stream` onto the instance of `Executor` provided, `executor`,
386+
/// returning a handle representing the remote stream, with unbounded buffering.
387+
///
388+
/// The `stream` will be canceled if the `SpawnHandle` is dropped.
389+
///
390+
/// The `SpawnHandle` returned is a stream that is a proxy for `stream` itself.
391+
/// When `stream` has additional items available, then the `SpawnHandle`
392+
/// will have those same items available.
393+
///
394+
/// An unbounded buffer is used, which means that values will be buffered as
395+
/// fast as `stream` can produce them, without any backpressure. Therefore, if
396+
/// `stream` is an infinite stream, it can use an unbounded amount of memory, and
397+
/// potentially hog CPU resources. In particular, if `stream` is infinite
398+
/// and doesn't ever yield (by returnin `Async::NotReady` from `poll`), it
399+
/// will result in an infinite loop.
400+
///
401+
/// # Panics
402+
///
403+
/// This function will panic if `executor` is unable spawn a `Future` containing
404+
/// the entirety of the `stream`.
405+
pub fn spawn_unbounded<S,E>(stream: S, executor: &E) -> SpawnHandle<S::Item, S::Error>
406+
where S: Stream,
407+
E: Executor<Execute<S>>
408+
{
409+
let (tx, rx) = channel_(None);
410+
executor.execute(Execute {
411+
inner: tx.send_all(resultstream::new(stream))
412+
}).expect("failed to spawn stream");
413+
SpawnHandle {
414+
inner: rx
415+
}
416+
}
417+
418+
impl<I, E> Stream for SpawnHandle<I, E> {
419+
type Item = I;
420+
type Error = E;
421+
422+
fn poll(&mut self) -> Poll<Option<I>, E> {
423+
match self.inner.poll() {
424+
Ok(Async::Ready(Some(Ok(t)))) => Ok(Async::Ready(Some(t.into()))),
425+
Ok(Async::Ready(Some(Err(e)))) => Err(e),
426+
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
427+
Ok(Async::NotReady) => Ok(Async::NotReady),
428+
Err(_) => unreachable!("mpsc::Receiver should never return Err"),
429+
}
430+
}
431+
}
432+
433+
impl<I, E> fmt::Debug for SpawnHandle<I, E> {
434+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
435+
f.debug_struct("SpawnHandle")
436+
.finish()
437+
}
438+
}
439+
440+
impl<S: Stream> Future for Execute<S> {
441+
type Item = ();
442+
type Error = ();
443+
444+
fn poll(&mut self) -> Poll<(), ()> {
445+
match self.inner.poll() {
446+
Ok(Async::NotReady) => Ok(Async::NotReady),
447+
_ => Ok(Async::Ready(()))
448+
}
449+
}
450+
}
451+
452+
impl<S: Stream> fmt::Debug for Execute<S> {
453+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
454+
f.debug_struct("Execute")
455+
.finish()
456+
}
457+
}

tests/unsync.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use futures::{Future, Stream, Sink, Async};
88
use futures::unsync::oneshot;
99
use futures::unsync::mpsc::{self, SendError};
1010
use futures::future::lazy;
11-
use futures::stream::iter;
11+
use futures::stream::{iter, unfold};
1212

1313
use support::local_executor::Core;
1414

@@ -112,3 +112,12 @@ fn mpsc_send_unpark() {
112112
core.spawn(lazy(move || { let _ = rx; Ok(()) }));
113113
core.run(donerx).unwrap();
114114
}
115+
116+
#[test]
117+
fn spawn_sends_items() {
118+
let core = Core::new();
119+
let stream = unfold(0, |i| Some(Ok::<_,u8>((i, i + 1))));
120+
let rx = mpsc::spawn(stream, &core, 1);
121+
assert_eq!(core.run(rx.take(4).collect()).unwrap(),
122+
[0, 1, 2, 3]);
123+
}

0 commit comments

Comments
 (0)