Skip to content

Commit 1614bfc

Browse files
committed
Added droppable future for Ticked Async Executor
- Gives visibility into number of spawned/dropped tasks
1 parent 701c476 commit 1614bfc

File tree

4 files changed

+190
-98
lines changed

4 files changed

+190
-98
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ edition = "2021"
55

66
[dependencies]
77
async-task = "4.7"
8+
pin-project = "1"
89

910
[dev-dependencies]
1011
tokio = { version = "1", features = ["full"] }

src/droppable_future.rs

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
use std::{future::Future, pin::Pin};
2+
3+
use pin_project::{pin_project, pinned_drop};
4+
5+
#[pin_project(PinnedDrop)]
6+
pub struct DroppableFuture<F, D>
7+
where
8+
F: Future,
9+
D: Fn(),
10+
{
11+
#[pin]
12+
future: F,
13+
on_drop: D,
14+
}
15+
16+
impl<F, D> DroppableFuture<F, D>
17+
where
18+
F: Future,
19+
D: Fn(),
20+
{
21+
pub fn new(future: F, on_drop: D) -> Self {
22+
Self { future, on_drop }
23+
}
24+
}
25+
26+
impl<F, D> Future for DroppableFuture<F, D>
27+
where
28+
F: Future,
29+
D: Fn(),
30+
{
31+
type Output = F::Output;
32+
33+
fn poll(
34+
self: std::pin::Pin<&mut Self>,
35+
cx: &mut std::task::Context<'_>,
36+
) -> std::task::Poll<Self::Output> {
37+
let this = self.project();
38+
this.future.poll(cx)
39+
}
40+
}
41+
42+
#[pinned_drop]
43+
impl<F, D> PinnedDrop for DroppableFuture<F, D>
44+
where
45+
F: Future,
46+
D: Fn(),
47+
{
48+
fn drop(self: Pin<&mut Self>) {
49+
(self.on_drop)();
50+
}
51+
}

src/lib.rs

+4-98
Original file line numberDiff line numberDiff line change
@@ -1,99 +1,5 @@
1-
use std::{
2-
future::Future,
3-
sync::{
4-
atomic::{AtomicUsize, Ordering},
5-
mpsc, Arc,
6-
},
7-
};
1+
mod droppable_future;
2+
use droppable_future::*;
83

9-
use async_task::{Runnable, Task};
10-
11-
pub struct TickedAsyncLocalExecutor {
12-
channel: (mpsc::Sender<Runnable>, mpsc::Receiver<Runnable>),
13-
num_woken_tasks: Arc<AtomicUsize>,
14-
}
15-
16-
impl Default for TickedAsyncLocalExecutor {
17-
fn default() -> Self {
18-
Self::new()
19-
}
20-
}
21-
22-
impl TickedAsyncLocalExecutor {
23-
pub fn new() -> Self {
24-
Self {
25-
channel: mpsc::channel(),
26-
num_woken_tasks: Arc::new(AtomicUsize::new(0)),
27-
}
28-
}
29-
30-
pub fn spawn_local<T>(&self, future: impl Future<Output = T> + 'static) -> Task<T>
31-
where
32-
T: 'static,
33-
{
34-
let sender = self.channel.0.clone();
35-
let num_woken_tasks = self.num_woken_tasks.clone();
36-
let schedule = move |runnable| {
37-
sender.send(runnable).unwrap_or(());
38-
num_woken_tasks.fetch_add(1, Ordering::Relaxed);
39-
};
40-
let (runnable, task) = async_task::spawn_local(future, schedule);
41-
runnable.schedule();
42-
task
43-
}
44-
45-
/// Run the woken tasks once
46-
///
47-
/// NOTE: Will not run tasks that are woken/scheduled immediately after `Runnable::run`
48-
pub fn tick(&self) {
49-
let num_woken_tasks = self.num_woken_tasks.load(Ordering::Relaxed);
50-
self.channel
51-
.1
52-
.try_iter()
53-
.take(num_woken_tasks)
54-
.for_each(|runnable| {
55-
runnable.run();
56-
});
57-
self.num_woken_tasks
58-
.fetch_sub(num_woken_tasks, Ordering::Relaxed);
59-
}
60-
}
61-
62-
#[cfg(test)]
63-
mod tests {
64-
use super::*;
65-
66-
#[test]
67-
fn test_multiple_tasks() {
68-
let executor = TickedAsyncLocalExecutor::new();
69-
executor
70-
.spawn_local(async move {
71-
println!("A: Start");
72-
tokio::task::yield_now().await;
73-
println!("A: End");
74-
})
75-
.detach();
76-
77-
executor
78-
.spawn_local(async move {
79-
println!("B: Start");
80-
tokio::task::yield_now().await;
81-
println!("B: End");
82-
})
83-
.detach();
84-
85-
executor
86-
.spawn_local(async move {
87-
println!("C: Start");
88-
tokio::task::yield_now().await;
89-
println!("C: End");
90-
})
91-
.detach();
92-
93-
// A, B, C: Start
94-
executor.tick();
95-
96-
// A, B, C: End
97-
executor.tick();
98-
}
99-
}
4+
mod ticked_async_executor;
5+
pub use ticked_async_executor::*;

src/ticked_async_executor.rs

+134
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
use std::{
2+
future::Future,
3+
sync::{
4+
atomic::{AtomicUsize, Ordering},
5+
mpsc, Arc,
6+
},
7+
};
8+
9+
use async_task::{Runnable, Task};
10+
11+
use crate::DroppableFuture;
12+
13+
pub struct TickedAsyncExecutor {
14+
channel: (mpsc::Sender<Runnable>, mpsc::Receiver<Runnable>),
15+
num_woken_tasks: Arc<AtomicUsize>,
16+
num_spawned_tasks: Arc<AtomicUsize>,
17+
}
18+
19+
impl Default for TickedAsyncExecutor {
20+
fn default() -> Self {
21+
Self::new()
22+
}
23+
}
24+
25+
impl TickedAsyncExecutor {
26+
pub fn new() -> Self {
27+
Self {
28+
channel: mpsc::channel(),
29+
num_woken_tasks: Arc::new(AtomicUsize::new(0)),
30+
num_spawned_tasks: Arc::new(AtomicUsize::new(0)),
31+
}
32+
}
33+
34+
pub fn spawn<T>(&self, future: impl Future<Output = T> + Send + 'static) -> Task<T>
35+
where
36+
T: Send + 'static,
37+
{
38+
let future = self.droppable_future(future);
39+
let schedule = self.runnable_schedule_cb();
40+
let (runnable, task) = async_task::spawn(future, schedule);
41+
runnable.schedule();
42+
task
43+
}
44+
45+
pub fn spawn_local<T>(&self, future: impl Future<Output = T> + 'static) -> Task<T>
46+
where
47+
T: 'static,
48+
{
49+
let future = self.droppable_future(future);
50+
let schedule = self.runnable_schedule_cb();
51+
let (runnable, task) = async_task::spawn_local(future, schedule);
52+
runnable.schedule();
53+
task
54+
}
55+
56+
/// Run the woken tasks once
57+
///
58+
/// NOTE: Will not run tasks that are woken/scheduled immediately after `Runnable::run`
59+
pub fn tick(&self) {
60+
let num_woken_tasks = self.num_woken_tasks.load(Ordering::Relaxed);
61+
self.channel
62+
.1
63+
.try_iter()
64+
.take(num_woken_tasks)
65+
.for_each(|runnable| {
66+
runnable.run();
67+
});
68+
self.num_woken_tasks
69+
.fetch_sub(num_woken_tasks, Ordering::Relaxed);
70+
}
71+
72+
fn droppable_future<F>(&self, future: F) -> DroppableFuture<F, impl Fn()>
73+
where
74+
F: Future,
75+
{
76+
self.num_spawned_tasks.fetch_add(1, Ordering::Relaxed);
77+
let num_spawned_tasks = self.num_spawned_tasks.clone();
78+
DroppableFuture::new(future, move || {
79+
num_spawned_tasks.fetch_sub(1, Ordering::Relaxed);
80+
})
81+
}
82+
83+
fn runnable_schedule_cb(&self) -> impl Fn(Runnable) {
84+
let sender = self.channel.0.clone();
85+
let num_woken_tasks = self.num_woken_tasks.clone();
86+
move |runnable| {
87+
sender.send(runnable).unwrap_or(());
88+
num_woken_tasks.fetch_add(1, Ordering::Relaxed);
89+
}
90+
}
91+
}
92+
93+
#[cfg(test)]
94+
mod tests {
95+
use super::*;
96+
97+
#[test]
98+
fn test_multiple_tasks() {
99+
let executor = TickedAsyncExecutor::new();
100+
executor
101+
.spawn_local(async move {
102+
println!("A: Start");
103+
tokio::task::yield_now().await;
104+
println!("A: End");
105+
})
106+
.detach();
107+
108+
executor
109+
.spawn_local(async move {
110+
println!("B: Start");
111+
tokio::task::yield_now().await;
112+
println!("B: End");
113+
})
114+
.detach();
115+
116+
executor
117+
.spawn_local(async move {
118+
println!("C: Start");
119+
tokio::task::yield_now().await;
120+
println!("C: End");
121+
})
122+
.detach();
123+
124+
// A, B, C: Start
125+
executor.tick();
126+
127+
// A, B, C: End
128+
executor.tick();
129+
}
130+
131+
// TODO, Test Task cancellation
132+
// TODO, Test FallibleTasks
133+
// TODO, Test Edge cases
134+
}

0 commit comments

Comments
 (0)