Skip to content

Introduce LocalTaskObj #1046

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions futures-core/src/executor.rs

This file was deleted.

2 changes: 0 additions & 2 deletions futures-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,3 @@ pub use stream::Stream;

pub mod task;
pub use task::Poll;

pub mod executor;
17 changes: 9 additions & 8 deletions futures-core/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,24 @@

use Future;

pub use core::task::{UnsafeWake, Waker, LocalWaker};
mod poll;
pub use self::poll::Poll;

pub use core::task::{
Context, Executor,
Waker, LocalWaker, UnsafeWake,
TaskObj, LocalTaskObj, UnsafeTask,
SpawnErrorKind, SpawnObjError, SpawnLocalObjError,
};

#[cfg(feature = "std")]
pub use std::task::{Wake, local_waker, local_waker_from_nonlocal};

pub use core::task::Context;

mod poll;
pub use self::poll::Poll;

#[cfg_attr(feature = "nightly", cfg(target_has_atomic = "ptr"))]
mod atomic_waker;
#[cfg_attr(feature = "nightly", cfg(target_has_atomic = "ptr"))]
pub use self::atomic_waker::AtomicWaker;

pub use core::task::{TaskObj, UnsafeTask};

if_std! {
use std::boxed::PinBox;

Expand Down
27 changes: 14 additions & 13 deletions futures-executor/src/local_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use std::sync::Arc;
use std::thread::{self, Thread};

use futures_core::{Future, Poll, Stream};
use futures_core::task::{self, Context, LocalWaker, TaskObj, Wake};
use futures_core::executor::{Executor, SpawnObjError, SpawnErrorKind};
use futures_core::task::{
self, Context, LocalWaker, TaskObj, LocalTaskObj, Wake,
Executor, SpawnObjError, SpawnLocalObjError, SpawnErrorKind};
use futures_util::stream::FuturesUnordered;
use futures_util::stream::StreamExt;

Expand All @@ -27,7 +28,7 @@ use ThreadPool;
/// single-threaded, it supports a special form of task spawning for non-`Send`
/// futures, via [`spawn_local`](LocalExecutor::spawn_local).
pub struct LocalPool {
pool: FuturesUnordered<TaskObj>,
pool: FuturesUnordered<LocalTaskObj>,
incoming: Rc<Incoming>,
}

Expand All @@ -38,7 +39,7 @@ pub struct LocalExecutor {
incoming: Weak<Incoming>,
}

type Incoming = RefCell<Vec<TaskObj>>;
type Incoming = RefCell<Vec<LocalTaskObj>>;

pub(crate) struct ThreadNotify {
thread: Thread
Expand Down Expand Up @@ -255,7 +256,7 @@ impl<S: Stream> Iterator for BlockingStream<S> where S: Unpin {
impl Executor for LocalExecutor {
fn spawn_obj(&mut self, task: TaskObj) -> Result<(), SpawnObjError> {
if let Some(incoming) = self.incoming.upgrade() {
incoming.borrow_mut().push(task);
incoming.borrow_mut().push(task.into());
Ok(())
} else {
Err(SpawnObjError{ task, kind: SpawnErrorKind::shutdown() })
Expand All @@ -272,15 +273,15 @@ impl Executor for LocalExecutor {
}

impl LocalExecutor {
/*
/// Spawn a non-`Send` future onto the associated [`LocalPool`](LocalPool).
pub fn spawn_local<F>(&mut self, f: F) -> Result<(), SpawnObjError>
where F: Future<Item = (), Error = Never> + 'static
pub fn spawn_local_obj(&mut self, task: LocalTaskObj)
-> Result<(), SpawnLocalObjError>
{
self.spawn_task(Task {
fut: Box::new(f),
map: LocalMap::new(),
})
if let Some(incoming) = self.incoming.upgrade() {
incoming.borrow_mut().push(task);
Ok(())
} else {
Err(SpawnLocalObjError{ task, kind: SpawnErrorKind::shutdown() })
}
}
*/
}
3 changes: 1 addition & 2 deletions futures-executor/src/thread_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ use std::thread;
use std::fmt;

use futures_core::*;
use futures_core::task::{self, Wake, TaskObj};
use futures_core::executor::{Executor, SpawnObjError};
use futures_core::task::{self, Wake, TaskObj, Executor, SpawnObjError};

use enter;
use num_cpus;
Expand Down
42 changes: 20 additions & 22 deletions futures-executor/tests/local_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@ extern crate futures;
extern crate futures_executor;
extern crate futures_channel;

use std::boxed::PinBox;
use std::cell::{Cell, RefCell};
use std::sync::Arc;
use std::mem::PinMut;
use std::rc::Rc;
use std::thread;
use std::time::Duration;
use std::mem::PinMut;

use futures::future::lazy;
use futures::prelude::*;
use futures::executor::Executor;
use futures::task;
use futures::task::{self, Executor};
use futures_executor::*;
use futures_channel::oneshot;

struct Pending(Arc<()>);
struct Pending(Rc<()>);

impl Future for Pending {
type Output = ();
Expand All @@ -30,7 +30,7 @@ impl Future for Pending {
}

fn pending() -> Pending {
Pending(Arc::new(()))
Pending(Rc::new(()))
}

#[test]
Expand All @@ -54,7 +54,7 @@ fn run_until_single_future() {
fn run_until_ignores_spawned() {
let mut pool = LocalPool::new();
let mut exec = pool.executor();
exec.spawn_obj(Box::new(pending()).into()).unwrap(); // This test used the currently not implemented spawn_local method before
exec.spawn_local_obj(PinBox::new(pending()).into()).unwrap();
assert_eq!(pool.run_until(lazy(|_| ()), &mut exec), ());
}

Expand All @@ -63,14 +63,13 @@ fn run_until_executes_spawned() {
let (tx, rx) = oneshot::channel();
let mut pool = LocalPool::new();
let mut exec = pool.executor();
exec.spawn_obj(Box::new(lazy(move |_| { // This test used the currently not implemented spawn_local method before
exec.spawn_local_obj(PinBox::new(lazy(move |_| {
tx.send(()).unwrap();
()
})).into()).unwrap();
pool.run_until(rx, &mut exec).unwrap();
}

/* // This test does not work because it relies on spawn_local which is not implemented
#[test]
fn run_executes_spawned() {
let cnt = Rc::new(Cell::new(0));
Expand All @@ -80,8 +79,8 @@ fn run_executes_spawned() {
let mut exec = pool.executor();
let mut exec2 = pool.executor();

exec.spawn_local(Box::new(lazy(move |_| {
exec2.spawn_local(Box::new(lazy(move |_| {
exec.spawn_local_obj(PinBox::new(lazy(move |_| {
exec2.spawn_local_obj(PinBox::new(lazy(move |_| {
cnt2.set(cnt2.get() + 1);
()
})).into()).unwrap();
Expand All @@ -105,10 +104,10 @@ fn run_spawn_many() {

for _ in 0..ITER {
let cnt = cnt.clone();
exec.spawn_local(Box::new(lazy(move |_| {
exec.spawn_local_obj(PinBox::new(lazy(move |_| {
cnt.set(cnt.get() + 1);
()
}))).unwrap();
})).into()).unwrap();
}

pool.run(&mut exec);
Expand All @@ -122,12 +121,11 @@ fn nesting_run() {
let mut pool = LocalPool::new();
let mut exec = pool.executor();

exec.spawn(Box::new(lazy(|_| {
exec.spawn_obj(PinBox::new(lazy(|_| {
let mut pool = LocalPool::new();
let mut exec = pool.executor();
pool.run(&mut exec);
Ok(())
}))).unwrap();
})).into()).unwrap();
pool.run(&mut exec);
}

Expand All @@ -143,7 +141,7 @@ fn tasks_are_scheduled_fairly() {
impl Future for Spin {
type Output = ();

fn poll(mut self: PinMut<Self>, cx: &mut task::Context) -> Poll<()> {
fn poll(self: PinMut<Self>, cx: &mut task::Context) -> Poll<()> {
let mut state = self.state.borrow_mut();

if self.idx == 0 {
Expand All @@ -170,16 +168,16 @@ fn tasks_are_scheduled_fairly() {
let mut pool = LocalPool::new();
let mut exec = pool.executor();

exec.spawn_local(Box::new(Spin {
exec.spawn_local_obj(PinBox::new(Spin {
state: state.clone(),
idx: 0,
})).unwrap();
}).into()).unwrap();

exec.spawn_local(Box::new(Spin {
exec.spawn_local_obj(PinBox::new(Spin {
state: state,
idx: 1,
})).unwrap();
}).into()).unwrap();

pool.run(&mut exec);
}
*/

2 changes: 1 addition & 1 deletion futures-util/src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ pub trait FutureExt: Future {
#[cfg(feature = "std")]
fn with_executor<E>(self, executor: E) -> WithExecutor<Self, E>
where Self: Sized,
E: ::futures_core::executor::Executor
E: ::futures_core::task::Executor
{
with_executor::new(self, executor)
}
Expand Down
3 changes: 1 addition & 2 deletions futures-util/src/future/with_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use core::marker::Unpin;
use core::mem::PinMut;

use futures_core::{Future, Poll};
use futures_core::task;
use futures_core::executor::Executor;
use futures_core::task::{self, Executor};

/// Future for the `with_executor` combinator, assigning an executor
/// to be used when spawning other futures.
Expand Down
9 changes: 4 additions & 5 deletions futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ pub mod executor {
ThreadPool, ThreadPoolBuilder, JoinHandle,
block_on, block_on_stream, enter, spawn, spawn_with_handle
};
pub use futures_core::executor::{SpawnObjError, Executor};
}

pub mod future {
Expand Down Expand Up @@ -254,9 +253,6 @@ pub mod prelude {
Future, TryFuture, Stream, Poll, task
};

#[cfg(feature = "std")]
pub use futures_core::executor::Executor;

#[cfg(feature = "nightly")]
pub use futures_stable::{
StableFuture,
Expand Down Expand Up @@ -381,7 +377,10 @@ pub mod task {
//! executors or dealing with synchronization issues around task wakeup.

pub use futures_core::task::{
Context, Waker, UnsafeWake
Context, Waker, UnsafeWake,
Executor,
TaskObj, LocalTaskObj,
SpawnErrorKind, SpawnObjError, SpawnLocalObjError,
};

#[cfg_attr(feature = "nightly", cfg(target_has_atomic = "ptr"))]
Expand Down