Skip to content

Commit 03f5022

Browse files
bors[bot]Stjepan Glavina
and
Stjepan Glavina
authored
Merge #195
195: Remove the Send bound from block_on r=stjepang a=stjepang Co-authored-by: Stjepan Glavina <[email protected]>
2 parents f4182ca + 1d862cf commit 03f5022

File tree

4 files changed

+113
-79
lines changed

4 files changed

+113
-79
lines changed

src/task/block_on.rs

+44-9
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ use std::sync::Arc;
66
use std::task::{RawWaker, RawWakerVTable};
77
use std::thread::{self, Thread};
88

9+
use super::log_utils;
910
use super::pool;
10-
use super::Builder;
11+
use super::task;
1112
use crate::future::Future;
1213
use crate::task::{Context, Poll, Waker};
14+
use crate::utils::abort_on_panic;
1315

1416
/// Spawns a task and blocks the current thread on its result.
1517
///
@@ -32,8 +34,7 @@ use crate::task::{Context, Poll, Waker};
3234
/// ```
3335
pub fn block_on<F, T>(future: F) -> T
3436
where
35-
F: Future<Output = T> + Send,
36-
T: Send,
37+
F: Future<Output = T>,
3738
{
3839
unsafe {
3940
// A place on the stack where the result will be stored.
@@ -51,17 +52,48 @@ where
5152
}
5253
};
5354

55+
// Create a tag for the task.
56+
let tag = task::Tag::new(None);
57+
58+
// Log this `block_on` operation.
59+
let child_id = tag.task_id().as_u64();
60+
let parent_id = pool::get_task(|t| t.id().as_u64()).unwrap_or(0);
61+
log_utils::print(
62+
format_args!("block_on"),
63+
log_utils::LogData {
64+
parent_id,
65+
child_id,
66+
},
67+
);
68+
69+
// Wrap the future into one that drops task-local variables on exit.
70+
let future = async move {
71+
let res = future.await;
72+
73+
// Abort on panic because thread-local variables behave the same way.
74+
abort_on_panic(|| pool::get_task(|task| task.metadata().local_map.clear()));
75+
76+
log_utils::print(
77+
format_args!("block_on completed"),
78+
log_utils::LogData {
79+
parent_id,
80+
child_id,
81+
},
82+
);
83+
res
84+
};
85+
5486
// Pin the future onto the stack.
5587
pin_utils::pin_mut!(future);
5688

57-
// Transmute the future into one that is static and sendable.
89+
// Transmute the future into one that is static.
5890
let future = mem::transmute::<
59-
Pin<&mut dyn Future<Output = ()>>,
60-
Pin<&'static mut (dyn Future<Output = ()> + Send)>,
91+
Pin<&'_ mut dyn Future<Output = ()>>,
92+
Pin<&'static mut dyn Future<Output = ()>>,
6193
>(future);
6294

63-
// Spawn the future and wait for it to complete.
64-
block(pool::spawn_with_builder(Builder::new(), future, "block_on"));
95+
// Block on the future and and wait for it to complete.
96+
pool::set_tag(&tag, || block(future));
6597

6698
// Take out the result.
6799
match (*out.get()).take().unwrap() {
@@ -87,7 +119,10 @@ impl<F: Future + UnwindSafe> Future for CatchUnwindFuture<F> {
87119
}
88120
}
89121

90-
fn block<F: Future>(f: F) -> F::Output {
122+
fn block<F, T>(f: F) -> T
123+
where
124+
F: Future<Output = T>,
125+
{
91126
thread_local! {
92127
static ARC_THREAD: Arc<Thread> = Arc::new(thread::current());
93128
}

src/task/log_utils.rs

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
use std::fmt::Arguments;
2+
3+
/// This struct only exists because kv logging isn't supported from the macros right now.
4+
pub(crate) struct LogData {
5+
pub parent_id: u64,
6+
pub child_id: u64,
7+
}
8+
9+
impl<'a> log::kv::Source for LogData {
10+
fn visit<'kvs>(
11+
&'kvs self,
12+
visitor: &mut dyn log::kv::Visitor<'kvs>,
13+
) -> Result<(), log::kv::Error> {
14+
visitor.visit_pair("parent_id".into(), self.parent_id.into())?;
15+
visitor.visit_pair("child_id".into(), self.child_id.into())?;
16+
Ok(())
17+
}
18+
}
19+
20+
pub fn print(msg: Arguments<'_>, key_values: impl log::kv::Source) {
21+
log::logger().log(
22+
&log::Record::builder()
23+
.args(msg)
24+
.key_values(&key_values)
25+
.level(log::Level::Trace)
26+
.target(module_path!())
27+
.module_path(Some(module_path!()))
28+
.file(Some(file!()))
29+
.line(Some(line!()))
30+
.build(),
31+
);
32+
}

src/task/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ pub use task::{JoinHandle, Task, TaskId};
3232

3333
mod block_on;
3434
mod local;
35+
mod log_utils;
3536
mod pool;
3637
mod sleep;
3738
mod task;

src/task/pool.rs

+36-70
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
use std::cell::Cell;
2-
use std::fmt::Arguments;
3-
use std::mem;
42
use std::ptr;
53
use std::thread;
64

75
use crossbeam_channel::{unbounded, Sender};
86
use lazy_static::lazy_static;
97

8+
use super::log_utils;
109
use super::task;
1110
use super::{JoinHandle, Task};
1211
use crate::future::Future;
1312
use crate::io;
13+
use crate::utils::abort_on_panic;
1414

1515
/// Returns a handle to the current task.
1616
///
@@ -64,7 +64,7 @@ where
6464
F: Future<Output = T> + Send + 'static,
6565
T: Send + 'static,
6666
{
67-
spawn_with_builder(Builder::new(), future, "spawn")
67+
spawn_with_builder(Builder::new(), future)
6868
}
6969

7070
/// Task builder that configures the settings of a new task.
@@ -91,15 +91,11 @@ impl Builder {
9191
F: Future<Output = T> + Send + 'static,
9292
T: Send + 'static,
9393
{
94-
Ok(spawn_with_builder(self, future, "spawn"))
94+
Ok(spawn_with_builder(self, future))
9595
}
9696
}
9797

98-
pub(crate) fn spawn_with_builder<F, T>(
99-
builder: Builder,
100-
future: F,
101-
fn_name: &'static str,
102-
) -> JoinHandle<T>
98+
pub(crate) fn spawn_with_builder<F, T>(builder: Builder, future: F) -> JoinHandle<T>
10399
where
104100
F: Future<Output = T> + Send + 'static,
105101
T: Send + 'static,
@@ -117,13 +113,9 @@ where
117113
thread::Builder::new()
118114
.name("async-task-driver".to_string())
119115
.spawn(|| {
120-
TAG.with(|tag| {
121-
for job in receiver {
122-
tag.set(job.tag());
123-
abort_on_panic(|| job.run());
124-
tag.set(ptr::null());
125-
}
126-
});
116+
for job in receiver {
117+
set_tag(job.tag(), || abort_on_panic(|| job.run()))
118+
}
127119
})
128120
.expect("cannot start a thread driving tasks");
129121
}
@@ -135,11 +127,12 @@ where
135127
let tag = task::Tag::new(name);
136128
let schedule = |job| QUEUE.send(job).unwrap();
137129

130+
// Log this `spawn` operation.
138131
let child_id = tag.task_id().as_u64();
139132
let parent_id = get_task(|t| t.id().as_u64()).unwrap_or(0);
140-
print(
141-
format_args!("{}", fn_name),
142-
LogData {
133+
log_utils::print(
134+
format_args!("spawn"),
135+
log_utils::LogData {
143136
parent_id,
144137
child_id,
145138
},
@@ -152,9 +145,9 @@ where
152145
// Abort on panic because thread-local variables behave the same way.
153146
abort_on_panic(|| get_task(|task| task.metadata().local_map.clear()));
154147

155-
print(
156-
format_args!("{} completed", fn_name),
157-
LogData {
148+
log_utils::print(
149+
format_args!("spawn completed"),
150+
log_utils::LogData {
158151
parent_id,
159152
child_id,
160153
},
@@ -171,61 +164,34 @@ thread_local! {
171164
static TAG: Cell<*const task::Tag> = Cell::new(ptr::null_mut());
172165
}
173166

174-
pub(crate) fn get_task<F: FnOnce(&Task) -> R, R>(f: F) -> Option<R> {
175-
let res = TAG.try_with(|tag| unsafe { tag.get().as_ref().map(task::Tag::task).map(f) });
176-
177-
match res {
178-
Ok(Some(val)) => Some(val),
179-
Ok(None) | Err(_) => None,
180-
}
181-
}
182-
183-
/// Calls a function and aborts if it panics.
184-
///
185-
/// This is useful in unsafe code where we can't recover from panics.
186-
#[inline]
187-
fn abort_on_panic<T>(f: impl FnOnce() -> T) -> T {
188-
struct Bomb;
167+
pub(crate) fn set_tag<F, R>(tag: *const task::Tag, f: F) -> R
168+
where
169+
F: FnOnce() -> R,
170+
{
171+
struct ResetTag<'a>(&'a Cell<*const task::Tag>);
189172

190-
impl Drop for Bomb {
173+
impl Drop for ResetTag<'_> {
191174
fn drop(&mut self) {
192-
std::process::abort();
175+
self.0.set(ptr::null());
193176
}
194177
}
195178

196-
let bomb = Bomb;
197-
let t = f();
198-
mem::forget(bomb);
199-
t
200-
}
179+
TAG.with(|t| {
180+
t.set(tag);
181+
let _guard = ResetTag(t);
201182

202-
/// This struct only exists because kv logging isn't supported from the macros right now.
203-
struct LogData {
204-
parent_id: u64,
205-
child_id: u64,
183+
f()
184+
})
206185
}
207186

208-
impl<'a> log::kv::Source for LogData {
209-
fn visit<'kvs>(
210-
&'kvs self,
211-
visitor: &mut dyn log::kv::Visitor<'kvs>,
212-
) -> Result<(), log::kv::Error> {
213-
visitor.visit_pair("parent_id".into(), self.parent_id.into())?;
214-
visitor.visit_pair("child_id".into(), self.child_id.into())?;
215-
Ok(())
216-
}
217-
}
187+
pub(crate) fn get_task<F, R>(f: F) -> Option<R>
188+
where
189+
F: FnOnce(&Task) -> R,
190+
{
191+
let res = TAG.try_with(|tag| unsafe { tag.get().as_ref().map(task::Tag::task).map(f) });
218192

219-
fn print(msg: Arguments<'_>, key_values: impl log::kv::Source) {
220-
log::logger().log(
221-
&log::Record::builder()
222-
.args(msg)
223-
.key_values(&key_values)
224-
.level(log::Level::Trace)
225-
.target(module_path!())
226-
.module_path(Some(module_path!()))
227-
.file(Some(file!()))
228-
.line(Some(line!()))
229-
.build(),
230-
);
193+
match res {
194+
Ok(Some(val)) => Some(val),
195+
Ok(None) | Err(_) => None,
196+
}
231197
}

0 commit comments

Comments
 (0)