Skip to content

Commit fc5f44c

Browse files
Marwescramertj
authored andcommitted
fix: [Local]SpawnExt should take &self as their base traits
Seems like an oversight in #1950 Technically a BREAKING CHANGE
1 parent 438e53a commit fc5f44c

File tree

9 files changed

+86
-82
lines changed

9 files changed

+86
-82
lines changed

futures-executor/src/local_pool.rs

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@ use crate::enter;
22
use futures_core::future::Future;
33
use futures_core::stream::Stream;
44
use futures_core::task::{Context, Poll};
5-
use futures_task::{FutureObj, LocalFutureObj, Spawn, LocalSpawn, SpawnError};
65
use futures_task::{waker_ref, ArcWake};
6+
use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
7+
use futures_util::pin_mut;
78
use futures_util::stream::FuturesUnordered;
89
use futures_util::stream::StreamExt;
9-
use futures_util::pin_mut;
10-
use std::cell::{RefCell};
10+
use std::cell::RefCell;
1111
use std::ops::{Deref, DerefMut};
1212
use std::rc::{Rc, Weak};
1313
use std::sync::Arc;
@@ -40,7 +40,7 @@ pub struct LocalSpawner {
4040
type Incoming = RefCell<Vec<LocalFutureObj<'static, ()>>>;
4141

4242
pub(crate) struct ThreadNotify {
43-
thread: Thread
43+
thread: Thread,
4444
}
4545

4646
thread_local! {
@@ -58,9 +58,10 @@ impl ArcWake for ThreadNotify {
5858
// Set up and run a basic single-threaded spawner loop, invoking `f` on each
5959
// turn.
6060
fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
61-
let _enter = enter()
62-
.expect("cannot execute `LocalPool` executor from within \
63-
another executor");
61+
let _enter = enter().expect(
62+
"cannot execute `LocalPool` executor from within \
63+
another executor",
64+
);
6465

6566
CURRENT_THREAD_NOTIFY.with(|thread_notify| {
6667
let waker = waker_ref(thread_notify);
@@ -75,9 +76,10 @@ fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
7576
}
7677

7778
fn poll_executor<T, F: FnMut(&mut Context<'_>) -> T>(mut f: F) -> T {
78-
let _enter = enter()
79-
.expect("cannot execute `LocalPool` executor from within \
80-
another executor");
79+
let _enter = enter().expect(
80+
"cannot execute `LocalPool` executor from within \
81+
another executor",
82+
);
8183

8284
CURRENT_THREAD_NOTIFY.with(|thread_notify| {
8385
let waker = waker_ref(thread_notify);
@@ -98,7 +100,7 @@ impl LocalPool {
98100
/// Get a clonable handle to the pool as a [`Spawn`].
99101
pub fn spawner(&self) -> LocalSpawner {
100102
LocalSpawner {
101-
incoming: Rc::downgrade(&self.incoming)
103+
incoming: Rc::downgrade(&self.incoming),
102104
}
103105
}
104106

@@ -164,7 +166,7 @@ impl LocalPool {
164166
/// use futures::future::{ready, pending};
165167
///
166168
/// let mut pool = LocalPool::new();
167-
/// let mut spawner = pool.spawner();
169+
/// let spawner = pool.spawner();
168170
///
169171
/// spawner.spawn_local(ready(())).unwrap();
170172
/// spawner.spawn_local(ready(())).unwrap();
@@ -212,7 +214,7 @@ impl LocalPool {
212214
/// use futures::future::{ready, pending};
213215
///
214216
/// let mut pool = LocalPool::new();
215-
/// let mut spawner = pool.spawner();
217+
/// let spawner = pool.spawner();
216218
///
217219
/// spawner.spawn_local(ready(())).unwrap();
218220
/// spawner.spawn_local(ready(())).unwrap();
@@ -229,7 +231,7 @@ impl LocalPool {
229231
/// of the pool's run or poll methods. While the function is running, all tasks
230232
/// in the pool will try to make progress.
231233
pub fn run_until_stalled(&mut self) {
232-
poll_executor(|ctx| {
234+
poll_executor(|ctx| {
233235
let _ = self.poll_pool(ctx);
234236
});
235237
}
@@ -297,7 +299,9 @@ pub fn block_on_stream<S: Stream + Unpin>(stream: S) -> BlockingStream<S> {
297299

298300
/// An iterator which blocks on values from a stream until they become available.
299301
#[derive(Debug)]
300-
pub struct BlockingStream<S: Stream + Unpin> { stream: S }
302+
pub struct BlockingStream<S: Stream + Unpin> {
303+
stream: S,
304+
}
301305

302306
impl<S: Stream + Unpin> Deref for BlockingStream<S> {
303307
type Target = S;
@@ -332,10 +336,7 @@ impl<S: Stream + Unpin> Iterator for BlockingStream<S> {
332336
}
333337

334338
impl Spawn for LocalSpawner {
335-
fn spawn_obj(
336-
&self,
337-
future: FutureObj<'static, ()>,
338-
) -> Result<(), SpawnError> {
339+
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
339340
if let Some(incoming) = self.incoming.upgrade() {
340341
incoming.borrow_mut().push(future.into());
341342
Ok(())
@@ -354,10 +355,7 @@ impl Spawn for LocalSpawner {
354355
}
355356

356357
impl LocalSpawn for LocalSpawner {
357-
fn spawn_local_obj(
358-
&self,
359-
future: LocalFutureObj<'static, ()>,
360-
) -> Result<(), SpawnError> {
358+
fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
361359
if let Some(incoming) = self.incoming.upgrade() {
362360
incoming.borrow_mut().push(future);
363361
Ok(())

futures-task/src/spawn.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,16 @@ impl SpawnError {
8383
}
8484
}
8585

86+
impl<Sp: ?Sized + Spawn> Spawn for &Sp {
87+
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
88+
Sp::spawn_obj(self, future)
89+
}
90+
91+
fn status(&self) -> Result<(), SpawnError> {
92+
Sp::status(self)
93+
}
94+
}
95+
8696
impl<Sp: ?Sized + Spawn> Spawn for &mut Sp {
8797
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
8898
Sp::spawn_obj(self, future)
@@ -103,6 +113,16 @@ impl<Sp: ?Sized + LocalSpawn> LocalSpawn for &Sp {
103113
}
104114
}
105115

116+
impl<Sp: ?Sized + LocalSpawn> LocalSpawn for &mut Sp {
117+
fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
118+
Sp::spawn_local_obj(self, future)
119+
}
120+
121+
fn status_local(&self) -> Result<(), SpawnError> {
122+
Sp::status_local(self)
123+
}
124+
}
125+
106126
#[cfg(feature = "alloc")]
107127
mod if_alloc {
108128
use super::*;

futures-test/src/task/noop_spawner.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use futures_task::{Spawn, SpawnError, FutureObj};
1+
use futures_task::{FutureObj, Spawn, SpawnError};
22

33
/// An implementation of [`Spawn`](futures_task::Spawn) that
44
/// discards spawned futures when used.
@@ -9,7 +9,7 @@ use futures_task::{Spawn, SpawnError, FutureObj};
99
/// use futures::task::SpawnExt;
1010
/// use futures_test::task::NoopSpawner;
1111
///
12-
/// let mut spawner = NoopSpawner::new();
12+
/// let spawner = NoopSpawner::new();
1313
/// spawner.spawn(async { }).unwrap();
1414
/// ```
1515
#[derive(Debug)]
@@ -25,10 +25,7 @@ impl NoopSpawner {
2525
}
2626

2727
impl Spawn for NoopSpawner {
28-
fn spawn_obj(
29-
&self,
30-
_future: FutureObj<'static, ()>,
31-
) -> Result<(), SpawnError> {
28+
fn spawn_obj(&self, _future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
3229
Ok(())
3330
}
3431
}

futures-test/src/task/panic_spawner.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use futures_task::{Spawn, SpawnError, FutureObj};
1+
use futures_task::{FutureObj, Spawn, SpawnError};
22

33
/// An implementation of [`Spawn`](futures_task::Spawn) that panics
44
/// when used.
@@ -9,7 +9,7 @@ use futures_task::{Spawn, SpawnError, FutureObj};
99
/// use futures::task::SpawnExt;
1010
/// use futures_test::task::PanicSpawner;
1111
///
12-
/// let mut spawn = PanicSpawner::new();
12+
/// let spawn = PanicSpawner::new();
1313
/// spawn.spawn(async { })?; // Will panic
1414
/// # Ok::<(), Box<dyn std::error::Error>>(())
1515
/// ```
@@ -26,10 +26,7 @@ impl PanicSpawner {
2626
}
2727

2828
impl Spawn for PanicSpawner {
29-
fn spawn_obj(
30-
&self,
31-
_future: FutureObj<'static, ()>,
32-
) -> Result<(), SpawnError> {
29+
fn spawn_obj(&self, _future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
3330
panic!("should not spawn")
3431
}
3532
}

futures-test/src/task/record_spawner.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use futures_task::{Spawn, SpawnError, FutureObj};
1+
use futures_task::{FutureObj, Spawn, SpawnError};
22
use std::cell::{Ref, RefCell};
33

44
/// An implementation of [`Spawn`](futures_task::Spawn) that records
@@ -10,7 +10,7 @@ use std::cell::{Ref, RefCell};
1010
/// use futures::task::SpawnExt;
1111
/// use futures_test::task::RecordSpawner;
1212
///
13-
/// let mut recorder = RecordSpawner::new();
13+
/// let recorder = RecordSpawner::new();
1414
/// recorder.spawn(async { }).unwrap();
1515
/// assert_eq!(recorder.spawned().len(), 1);
1616
/// ```
@@ -32,10 +32,7 @@ impl RecordSpawner {
3232
}
3333

3434
impl Spawn for RecordSpawner {
35-
fn spawn_obj(
36-
&self,
37-
future: FutureObj<'static, ()>,
38-
) -> Result<(), SpawnError> {
35+
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
3936
self.spawned.borrow_mut().push(future);
4037
Ok(())
4138
}

futures-util/src/compat/executor.rs

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,18 @@
1-
21
use super::{Compat, Future01CompatExt};
32
use crate::{
43
future::{FutureExt, TryFutureExt, UnitError},
54
task::SpawnExt,
65
};
6+
use futures_01::future::{ExecuteError as ExecuteError01, Executor as Executor01};
77
use futures_01::Future as Future01;
8-
use futures_01::future::{Executor as Executor01, ExecuteError as ExecuteError01};
98
use futures_task::{FutureObj, Spawn as Spawn03, SpawnError as SpawnError03};
109

1110
/// A future that can run on a futures 0.1
1211
/// [`Executor`](futures_01::future::Executor).
1312
pub type Executor01Future = Compat<UnitError<FutureObj<'static, ()>>>;
1413

1514
/// Extension trait for futures 0.1 [`Executor`](futures_01::future::Executor).
16-
pub trait Executor01CompatExt: Executor01<Executor01Future> +
17-
Clone + Send + 'static
18-
{
15+
pub trait Executor01CompatExt: Executor01<Executor01Future> + Clone + Send + 'static {
1916
/// Converts a futures 0.1 [`Executor`](futures_01::future::Executor) into a
2017
/// futures 0.3 [`Spawn`](futures_task::Spawn).
2118
///
@@ -27,7 +24,7 @@ pub trait Executor01CompatExt: Executor01<Executor01Future> +
2724
///
2825
/// # let (tx, rx) = futures::channel::oneshot::channel();
2926
///
30-
/// let mut spawner = DefaultExecutor::current().compat();
27+
/// let spawner = DefaultExecutor::current().compat();
3128
/// let future03 = async move {
3229
/// println!("Running on the pool");
3330
/// spawner.spawn(async {
@@ -42,39 +39,36 @@ pub trait Executor01CompatExt: Executor01<Executor01Future> +
4239
/// # futures::executor::block_on(rx).unwrap();
4340
/// ```
4441
fn compat(self) -> Executor01As03<Self>
45-
where Self: Sized;
42+
where
43+
Self: Sized;
4644
}
4745

4846
impl<Ex> Executor01CompatExt for Ex
49-
where Ex: Executor01<Executor01Future> + Clone + Send + 'static
47+
where
48+
Ex: Executor01<Executor01Future> + Clone + Send + 'static,
5049
{
5150
fn compat(self) -> Executor01As03<Self> {
52-
Executor01As03 {
53-
executor01: self,
54-
}
51+
Executor01As03 { executor01: self }
5552
}
5653
}
5754

5855
/// Converts a futures 0.1 [`Executor`](futures_01::future::Executor) into a
5956
/// futures 0.3 [`Spawn`](futures_task::Spawn).
6057
#[derive(Debug, Clone)]
6158
pub struct Executor01As03<Ex> {
62-
executor01: Ex
59+
executor01: Ex,
6360
}
6461

6562
impl<Ex> Spawn03 for Executor01As03<Ex>
6663
where
6764
Ex: Executor01<Executor01Future> + Clone + Send + 'static,
6865
{
69-
fn spawn_obj(
70-
&self,
71-
future: FutureObj<'static, ()>,
72-
) -> Result<(), SpawnError03> {
66+
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError03> {
7367
let future = future.unit_error().compat();
7468

75-
self.executor01.execute(future).map_err(|_|
76-
SpawnError03::shutdown()
77-
)
69+
self.executor01
70+
.execute(future)
71+
.map_err(|_| SpawnError03::shutdown())
7872
}
7973
}
8074

@@ -85,7 +79,8 @@ where
8579
Fut: Future01<Item = (), Error = ()> + Send + 'static,
8680
{
8781
fn execute(&self, future: Fut) -> Result<(), ExecuteError01<Fut>> {
88-
(&self.inner).spawn(future.compat().map(|_| ()))
82+
(&self.inner)
83+
.spawn(future.compat().map(|_| ()))
8984
.expect("unable to spawn future from Compat executor");
9085
Ok(())
9186
}

0 commit comments

Comments
 (0)