Skip to content

Commit 2ddc413

Browse files
authored
Merge pull request #779 from aturon/join-handle
Introduce top-level `spawn` combinators
2 parents 3e5aa4b + 732dabb commit 2ddc413

File tree

3 files changed

+150
-0
lines changed

3 files changed

+150
-0
lines changed

futures-core/src/task/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ impl<'a> Context<'a> {
6969
}
7070

7171
if_std! {
72+
use std::boxed::Box;
73+
use Future;
74+
7275
impl<'a> Context<'a> {
7376
/// TODO: dox
7477
pub fn new(map: &'a mut LocalMap, waker: &'a Waker, executor: &'a mut Executor) -> Context<'a> {
@@ -79,6 +82,13 @@ if_std! {
7982
pub fn executor(&mut self) -> &mut Executor {
8083
self.executor
8184
}
85+
86+
/// TODO: dox
87+
pub fn spawn<F>(&mut self, f: F)
88+
where F: Future<Item = (), Error = ()> + 'static + Send
89+
{
90+
self.executor.spawn(Box::new(f)).unwrap()
91+
}
8292
}
8393
}
8494

futures-executor/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,7 @@ if_std! {
3232

3333
mod enter;
3434
pub use enter::{enter, Enter, EnterError};
35+
36+
mod spawn;
37+
pub use spawn::{spawn, Spawn, spawn_with_handle, SpawnWithHandle};
3538
}

futures-executor/src/spawn.rs

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
use futures_core::{Future, Async, Poll};
2+
use futures_core::never::Never;
3+
use futures_core::task::{self, Context};
4+
use futures_channel::oneshot::{channel, Sender, Receiver};
5+
use futures_util::FutureExt;
6+
7+
use std::thread;
8+
use std::sync::Arc;
9+
use std::sync::atomic::Ordering;
10+
use std::panic::{self, AssertUnwindSafe};
11+
use std::sync::atomic::AtomicBool;
12+
13+
/// dox
14+
#[derive(Debug)]
15+
pub struct Spawn<F>(Option<F>);
16+
17+
/// TODO: dox
18+
pub fn spawn<F>(f: F) -> Spawn<F>
19+
where F: Future<Item = (), Error = ()> + 'static + Send
20+
{
21+
Spawn(Some(f))
22+
}
23+
24+
impl<F: Future<Item = (), Error = ()> + Send + 'static> Future for Spawn<F> {
25+
type Item = ();
26+
type Error = Never;
27+
fn poll(&mut self, cx: &mut Context) -> Poll<(), Never> {
28+
cx.spawn(self.0.take().unwrap());
29+
Ok(Async::Ready(()))
30+
}
31+
}
32+
33+
/// dox
34+
#[derive(Debug)]
35+
pub struct SpawnWithHandle<F>(Option<F>);
36+
37+
/// TODO: dox
38+
pub fn spawn_with_handle<F>(f: F) -> SpawnWithHandle<F>
39+
where F: Future + 'static + Send, F::Item: Send, F::Error: Send
40+
{
41+
SpawnWithHandle(Some(f))
42+
}
43+
44+
impl<F> Future for SpawnWithHandle<F>
45+
where F: Future<Item = (), Error = ()> + Send + 'static,
46+
F::Item: Send,
47+
F::Error: Send,
48+
{
49+
type Item = JoinHandle<F::Item, F::Error>;
50+
type Error = Never;
51+
fn poll(&mut self, cx: &mut Context) -> Poll<Self::Item, Never> {
52+
let (tx, rx) = channel();
53+
let keep_running_flag = Arc::new(AtomicBool::new(false));
54+
// AssertUnwindSafe is used here because `Send + 'static` is basically
55+
// an alias for an implementation of the `UnwindSafe` trait but we can't
56+
// express that in the standard library right now.
57+
let sender = MySender {
58+
fut: AssertUnwindSafe(self.0.take().unwrap()).catch_unwind(),
59+
tx: Some(tx),
60+
keep_running_flag: keep_running_flag.clone(),
61+
};
62+
63+
cx.spawn(sender);
64+
Ok(Async::Ready(JoinHandle {
65+
inner: rx ,
66+
keep_running_flag: keep_running_flag.clone()
67+
}))
68+
}
69+
}
70+
71+
struct MySender<F, T> {
72+
fut: F,
73+
tx: Option<Sender<T>>,
74+
keep_running_flag: Arc<AtomicBool>,
75+
}
76+
77+
/// The type of future returned from the `ThreadPool::spawn` function, which
78+
/// proxies the futures running on the thread pool.
79+
///
80+
/// This future will resolve in the same way as the underlying future, and it
81+
/// will propagate panics.
82+
#[must_use]
83+
#[derive(Debug)]
84+
pub struct JoinHandle<T, E> {
85+
inner: Receiver<thread::Result<Result<T, E>>>,
86+
keep_running_flag: Arc<AtomicBool>,
87+
}
88+
89+
impl<T, E> JoinHandle<T, E> {
90+
/// Drop this future without canceling the underlying future.
91+
///
92+
/// When `JoinHandle` is dropped, `ThreadPool` will try to abort the underlying
93+
/// future. This function can be used when user wants to drop but keep
94+
/// executing the underlying future.
95+
pub fn forget(self) {
96+
self.keep_running_flag.store(true, Ordering::SeqCst);
97+
}
98+
}
99+
100+
impl<T: Send + 'static, E: Send + 'static> Future for JoinHandle<T, E> {
101+
type Item = T;
102+
type Error = E;
103+
104+
fn poll(&mut self, cx: &mut task::Context) -> Poll<T, E> {
105+
match self.inner.poll(cx).expect("cannot poll JoinHandle twice") {
106+
Async::Ready(Ok(Ok(e))) => Ok(e.into()),
107+
Async::Ready(Ok(Err(e))) => Err(e),
108+
Async::Ready(Err(e)) => panic::resume_unwind(e),
109+
Async::Pending => Ok(Async::Pending),
110+
}
111+
}
112+
}
113+
114+
impl<F: Future> Future for MySender<F, Result<F::Item, F::Error>> {
115+
type Item = ();
116+
type Error = ();
117+
118+
fn poll(&mut self, cx: &mut task::Context) -> Poll<(), ()> {
119+
if let Ok(Async::Ready(_)) = self.tx.as_mut().unwrap().poll_cancel(cx) {
120+
if !self.keep_running_flag.load(Ordering::SeqCst) {
121+
// Cancelled, bail out
122+
return Ok(().into())
123+
}
124+
}
125+
126+
let res = match self.fut.poll(cx) {
127+
Ok(Async::Ready(e)) => Ok(e),
128+
Ok(Async::Pending) => return Ok(Async::Pending),
129+
Err(e) => Err(e),
130+
};
131+
132+
// if the receiving end has gone away then that's ok, we just ignore the
133+
// send error here.
134+
drop(self.tx.take().unwrap().send(res));
135+
Ok(Async::Ready(()))
136+
}
137+
}

0 commit comments

Comments
 (0)