Skip to content

Commit eb10d6e

Browse files
MajorBreakfastcramertj
authored andcommitted
Introduce LocalTaskObj
1 parent 5c3cff4 commit eb10d6e

File tree

9 files changed

+50
-59
lines changed

9 files changed

+50
-59
lines changed

futures-core/src/executor.rs

Lines changed: 0 additions & 4 deletions
This file was deleted.

futures-core/src/lib.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,5 +83,3 @@ pub use stream::Stream;
8383

8484
pub mod task;
8585
pub use task::Poll;
86-
87-
pub mod executor;

futures-core/src/task/mod.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,24 @@
22
33
use Future;
44

5-
pub use core::task::{UnsafeWake, Waker, LocalWaker};
5+
mod poll;
6+
pub use self::poll::Poll;
7+
8+
pub use core::task::{
9+
Context, Executor,
10+
Waker, LocalWaker, UnsafeWake,
11+
TaskObj, LocalTaskObj, UnsafeTask,
12+
SpawnErrorKind, SpawnObjError, SpawnLocalObjError,
13+
};
614

715
#[cfg(feature = "std")]
816
pub use std::task::{Wake, local_waker, local_waker_from_nonlocal};
917

10-
pub use core::task::Context;
11-
12-
mod poll;
13-
pub use self::poll::Poll;
14-
1518
#[cfg_attr(feature = "nightly", cfg(target_has_atomic = "ptr"))]
1619
mod atomic_waker;
1720
#[cfg_attr(feature = "nightly", cfg(target_has_atomic = "ptr"))]
1821
pub use self::atomic_waker::AtomicWaker;
1922

20-
pub use core::task::{TaskObj, UnsafeTask};
21-
2223
if_std! {
2324
use std::boxed::PinBox;
2425

futures-executor/src/local_pool.rs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ use std::sync::Arc;
77
use std::thread::{self, Thread};
88

99
use futures_core::{Future, Poll, Stream};
10-
use futures_core::task::{self, Context, LocalWaker, TaskObj, Wake};
11-
use futures_core::executor::{Executor, SpawnObjError, SpawnErrorKind};
10+
use futures_core::task::{
11+
self, Context, LocalWaker, TaskObj, LocalTaskObj, Wake,
12+
Executor, SpawnObjError, SpawnLocalObjError, SpawnErrorKind};
1213
use futures_util::stream::FuturesUnordered;
1314
use futures_util::stream::StreamExt;
1415

@@ -27,7 +28,7 @@ use ThreadPool;
2728
/// single-threaded, it supports a special form of task spawning for non-`Send`
2829
/// futures, via [`spawn_local`](LocalExecutor::spawn_local).
2930
pub struct LocalPool {
30-
pool: FuturesUnordered<TaskObj>,
31+
pool: FuturesUnordered<LocalTaskObj>,
3132
incoming: Rc<Incoming>,
3233
}
3334

@@ -38,7 +39,7 @@ pub struct LocalExecutor {
3839
incoming: Weak<Incoming>,
3940
}
4041

41-
type Incoming = RefCell<Vec<TaskObj>>;
42+
type Incoming = RefCell<Vec<LocalTaskObj>>;
4243

4344
pub(crate) struct ThreadNotify {
4445
thread: Thread
@@ -255,7 +256,7 @@ impl<S: Stream> Iterator for BlockingStream<S> where S: Unpin {
255256
impl Executor for LocalExecutor {
256257
fn spawn_obj(&mut self, task: TaskObj) -> Result<(), SpawnObjError> {
257258
if let Some(incoming) = self.incoming.upgrade() {
258-
incoming.borrow_mut().push(task);
259+
incoming.borrow_mut().push(task.into());
259260
Ok(())
260261
} else {
261262
Err(SpawnObjError{ task, kind: SpawnErrorKind::shutdown() })
@@ -272,15 +273,15 @@ impl Executor for LocalExecutor {
272273
}
273274

274275
impl LocalExecutor {
275-
/*
276276
/// Spawn a non-`Send` future onto the associated [`LocalPool`](LocalPool).
277-
pub fn spawn_local<F>(&mut self, f: F) -> Result<(), SpawnObjError>
278-
where F: Future<Item = (), Error = Never> + 'static
277+
pub fn spawn_local_obj(&mut self, task: LocalTaskObj)
278+
-> Result<(), SpawnLocalObjError>
279279
{
280-
self.spawn_task(Task {
281-
fut: Box::new(f),
282-
map: LocalMap::new(),
283-
})
280+
if let Some(incoming) = self.incoming.upgrade() {
281+
incoming.borrow_mut().push(task);
282+
Ok(())
283+
} else {
284+
Err(SpawnLocalObjError{ task, kind: SpawnErrorKind::shutdown() })
285+
}
284286
}
285-
*/
286287
}

futures-executor/src/thread_pool.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@ use std::thread;
88
use std::fmt;
99

1010
use futures_core::*;
11-
use futures_core::task::{self, Wake, TaskObj};
12-
use futures_core::executor::{Executor, SpawnObjError};
11+
use futures_core::task::{self, Wake, TaskObj, Executor, SpawnObjError};
1312

1413
use enter;
1514
use num_cpus;

futures-executor/tests/local_pool.rs

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,20 @@ extern crate futures;
66
extern crate futures_executor;
77
extern crate futures_channel;
88

9+
use std::boxed::PinBox;
910
use std::cell::{Cell, RefCell};
10-
use std::sync::Arc;
11+
use std::mem::PinMut;
12+
use std::rc::Rc;
1113
use std::thread;
1214
use std::time::Duration;
13-
use std::mem::PinMut;
1415

1516
use futures::future::lazy;
1617
use futures::prelude::*;
17-
use futures::executor::Executor;
18-
use futures::task;
18+
use futures::task::{self, Executor};
1919
use futures_executor::*;
2020
use futures_channel::oneshot;
2121

22-
struct Pending(Arc<()>);
22+
struct Pending(Rc<()>);
2323

2424
impl Future for Pending {
2525
type Output = ();
@@ -30,7 +30,7 @@ impl Future for Pending {
3030
}
3131

3232
fn pending() -> Pending {
33-
Pending(Arc::new(()))
33+
Pending(Rc::new(()))
3434
}
3535

3636
#[test]
@@ -54,7 +54,7 @@ fn run_until_single_future() {
5454
fn run_until_ignores_spawned() {
5555
let mut pool = LocalPool::new();
5656
let mut exec = pool.executor();
57-
exec.spawn_obj(Box::new(pending()).into()).unwrap(); // This test used the currently not implemented spawn_local method before
57+
exec.spawn_local_obj(PinBox::new(pending()).into()).unwrap();
5858
assert_eq!(pool.run_until(lazy(|_| ()), &mut exec), ());
5959
}
6060

@@ -63,14 +63,13 @@ fn run_until_executes_spawned() {
6363
let (tx, rx) = oneshot::channel();
6464
let mut pool = LocalPool::new();
6565
let mut exec = pool.executor();
66-
exec.spawn_obj(Box::new(lazy(move |_| { // This test used the currently not implemented spawn_local method before
66+
exec.spawn_local_obj(PinBox::new(lazy(move |_| {
6767
tx.send(()).unwrap();
6868
()
6969
})).into()).unwrap();
7070
pool.run_until(rx, &mut exec).unwrap();
7171
}
7272

73-
/* // This test does not work because it relies on spawn_local which is not implemented
7473
#[test]
7574
fn run_executes_spawned() {
7675
let cnt = Rc::new(Cell::new(0));
@@ -80,8 +79,8 @@ fn run_executes_spawned() {
8079
let mut exec = pool.executor();
8180
let mut exec2 = pool.executor();
8281

83-
exec.spawn_local(Box::new(lazy(move |_| {
84-
exec2.spawn_local(Box::new(lazy(move |_| {
82+
exec.spawn_local_obj(PinBox::new(lazy(move |_| {
83+
exec2.spawn_local_obj(PinBox::new(lazy(move |_| {
8584
cnt2.set(cnt2.get() + 1);
8685
()
8786
})).into()).unwrap();
@@ -105,10 +104,10 @@ fn run_spawn_many() {
105104

106105
for _ in 0..ITER {
107106
let cnt = cnt.clone();
108-
exec.spawn_local(Box::new(lazy(move |_| {
107+
exec.spawn_local_obj(PinBox::new(lazy(move |_| {
109108
cnt.set(cnt.get() + 1);
110109
()
111-
}))).unwrap();
110+
})).into()).unwrap();
112111
}
113112

114113
pool.run(&mut exec);
@@ -122,12 +121,11 @@ fn nesting_run() {
122121
let mut pool = LocalPool::new();
123122
let mut exec = pool.executor();
124123

125-
exec.spawn(Box::new(lazy(|_| {
124+
exec.spawn_obj(PinBox::new(lazy(|_| {
126125
let mut pool = LocalPool::new();
127126
let mut exec = pool.executor();
128127
pool.run(&mut exec);
129-
Ok(())
130-
}))).unwrap();
128+
})).into()).unwrap();
131129
pool.run(&mut exec);
132130
}
133131

@@ -143,7 +141,7 @@ fn tasks_are_scheduled_fairly() {
143141
impl Future for Spin {
144142
type Output = ();
145143

146-
fn poll(mut self: PinMut<Self>, cx: &mut task::Context) -> Poll<()> {
144+
fn poll(self: PinMut<Self>, cx: &mut task::Context) -> Poll<()> {
147145
let mut state = self.state.borrow_mut();
148146

149147
if self.idx == 0 {
@@ -170,16 +168,16 @@ fn tasks_are_scheduled_fairly() {
170168
let mut pool = LocalPool::new();
171169
let mut exec = pool.executor();
172170

173-
exec.spawn_local(Box::new(Spin {
171+
exec.spawn_local_obj(PinBox::new(Spin {
174172
state: state.clone(),
175173
idx: 0,
176-
})).unwrap();
174+
}).into()).unwrap();
177175

178-
exec.spawn_local(Box::new(Spin {
176+
exec.spawn_local_obj(PinBox::new(Spin {
179177
state: state,
180178
idx: 1,
181-
})).unwrap();
179+
}).into()).unwrap();
182180

183181
pool.run(&mut exec);
184182
}
185-
*/
183+

futures-util/src/future/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -605,7 +605,7 @@ pub trait FutureExt: Future {
605605
#[cfg(feature = "std")]
606606
fn with_executor<E>(self, executor: E) -> WithExecutor<Self, E>
607607
where Self: Sized,
608-
E: ::futures_core::executor::Executor
608+
E: ::futures_core::task::Executor
609609
{
610610
with_executor::new(self, executor)
611611
}

futures-util/src/future/with_executor.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@ use core::marker::Unpin;
22
use core::mem::PinMut;
33

44
use futures_core::{Future, Poll};
5-
use futures_core::task;
6-
use futures_core::executor::Executor;
5+
use futures_core::task::{self, Executor};
76

87
/// Future for the `with_executor` combinator, assigning an executor
98
/// to be used when spawning other futures.

futures/src/lib.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,6 @@ pub mod executor {
175175
ThreadPool, ThreadPoolBuilder, JoinHandle,
176176
block_on, block_on_stream, enter, spawn, spawn_with_handle
177177
};
178-
pub use futures_core::executor::{SpawnObjError, Executor};
179178
}
180179

181180
pub mod future {
@@ -254,9 +253,6 @@ pub mod prelude {
254253
Future, TryFuture, Stream, Poll, task
255254
};
256255

257-
#[cfg(feature = "std")]
258-
pub use futures_core::executor::Executor;
259-
260256
#[cfg(feature = "nightly")]
261257
pub use futures_stable::{
262258
StableFuture,
@@ -381,7 +377,10 @@ pub mod task {
381377
//! executors or dealing with synchronization issues around task wakeup.
382378
383379
pub use futures_core::task::{
384-
Context, Waker, UnsafeWake
380+
Context, Waker, UnsafeWake,
381+
Executor,
382+
TaskObj, LocalTaskObj,
383+
SpawnErrorKind, SpawnObjError, SpawnLocalObjError,
385384
};
386385

387386
#[cfg_attr(feature = "nightly", cfg(target_has_atomic = "ptr"))]

0 commit comments

Comments
 (0)