Skip to content

Commit 701c476

Browse files
committed
Added TickedAsyncLocalExecutor implementation
1 parent 7e849e4 commit 701c476

File tree

2 files changed

+109
-0
lines changed

2 files changed

+109
-0
lines changed

Cargo.toml

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
[package]
2+
name = "ticked_async_executor"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
async-task = "4.7"
8+
9+
[dev-dependencies]
10+
tokio = { version = "1", features = ["full"] }

src/lib.rs

+99
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
use std::{
2+
future::Future,
3+
sync::{
4+
atomic::{AtomicUsize, Ordering},
5+
mpsc, Arc,
6+
},
7+
};
8+
9+
use async_task::{Runnable, Task};
10+
11+
pub struct TickedAsyncLocalExecutor {
12+
channel: (mpsc::Sender<Runnable>, mpsc::Receiver<Runnable>),
13+
num_woken_tasks: Arc<AtomicUsize>,
14+
}
15+
16+
impl Default for TickedAsyncLocalExecutor {
17+
fn default() -> Self {
18+
Self::new()
19+
}
20+
}
21+
22+
impl TickedAsyncLocalExecutor {
23+
pub fn new() -> Self {
24+
Self {
25+
channel: mpsc::channel(),
26+
num_woken_tasks: Arc::new(AtomicUsize::new(0)),
27+
}
28+
}
29+
30+
pub fn spawn_local<T>(&self, future: impl Future<Output = T> + 'static) -> Task<T>
31+
where
32+
T: 'static,
33+
{
34+
let sender = self.channel.0.clone();
35+
let num_woken_tasks = self.num_woken_tasks.clone();
36+
let schedule = move |runnable| {
37+
sender.send(runnable).unwrap_or(());
38+
num_woken_tasks.fetch_add(1, Ordering::Relaxed);
39+
};
40+
let (runnable, task) = async_task::spawn_local(future, schedule);
41+
runnable.schedule();
42+
task
43+
}
44+
45+
/// Run the woken tasks once
46+
///
47+
/// NOTE: Will not run tasks that are woken/scheduled immediately after `Runnable::run`
48+
pub fn tick(&self) {
49+
let num_woken_tasks = self.num_woken_tasks.load(Ordering::Relaxed);
50+
self.channel
51+
.1
52+
.try_iter()
53+
.take(num_woken_tasks)
54+
.for_each(|runnable| {
55+
runnable.run();
56+
});
57+
self.num_woken_tasks
58+
.fetch_sub(num_woken_tasks, Ordering::Relaxed);
59+
}
60+
}
61+
62+
#[cfg(test)]
63+
mod tests {
64+
use super::*;
65+
66+
#[test]
67+
fn test_multiple_tasks() {
68+
let executor = TickedAsyncLocalExecutor::new();
69+
executor
70+
.spawn_local(async move {
71+
println!("A: Start");
72+
tokio::task::yield_now().await;
73+
println!("A: End");
74+
})
75+
.detach();
76+
77+
executor
78+
.spawn_local(async move {
79+
println!("B: Start");
80+
tokio::task::yield_now().await;
81+
println!("B: End");
82+
})
83+
.detach();
84+
85+
executor
86+
.spawn_local(async move {
87+
println!("C: Start");
88+
tokio::task::yield_now().await;
89+
println!("C: End");
90+
})
91+
.detach();
92+
93+
// A, B, C: Start
94+
executor.tick();
95+
96+
// A, B, C: End
97+
executor.tick();
98+
}
99+
}

0 commit comments

Comments
 (0)