Skip to content

Split ticked async executor #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use droppable_future::*;
mod task_identifier;
pub use task_identifier::*;

mod split_ticked_async_executor;
pub use split_ticked_async_executor::*;

mod ticked_async_executor;
pub use ticked_async_executor::*;

Expand Down
165 changes: 165 additions & 0 deletions src/split_ticked_async_executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
use std::{
future::Future,
sync::{
atomic::{AtomicUsize, Ordering},
mpsc, Arc,
},
};

use crate::{DroppableFuture, TaskIdentifier, TickedTimer};

#[derive(Debug)]
pub enum TaskState {
Spawn(TaskIdentifier),
Wake(TaskIdentifier),
Tick(TaskIdentifier, f64),
Drop(TaskIdentifier),
}

pub type Task<T> = async_task::Task<T>;
type Payload = (TaskIdentifier, async_task::Runnable);

pub fn new_split_ticked_async_executor<O>(
observer: O,
) -> (TickedAsyncExecutorSpawner<O>, TickedAsyncExecutorTicker<O>)
where
O: Fn(TaskState) + Clone + Send + Sync + 'static,
{
let (tx_channel, rx_channel) = mpsc::channel();
let num_woken_tasks = Arc::new(AtomicUsize::new(0));
let num_spawned_tasks = Arc::new(AtomicUsize::new(0));
let (tx_tick_event, rx_tick_event) = tokio::sync::watch::channel(1.0);
let spawner = TickedAsyncExecutorSpawner {
tx_channel,
num_woken_tasks: num_woken_tasks.clone(),
num_spawned_tasks: num_spawned_tasks.clone(),
observer: observer.clone(),
rx_tick_event,
};
let ticker = TickedAsyncExecutorTicker {
rx_channel,
num_woken_tasks,
num_spawned_tasks,
observer,
tx_tick_event,
};
(spawner, ticker)
}

pub struct TickedAsyncExecutorSpawner<O> {
tx_channel: mpsc::Sender<Payload>,
num_woken_tasks: Arc<AtomicUsize>,

num_spawned_tasks: Arc<AtomicUsize>,
// TODO, Or we need a Single Producer - Multi Consumer channel i.e Broadcast channel
// Broadcast recv channel should be notified when there are new messages in the queue
// Broadcast channel must also be able to remove older/stale messages (like a RingBuffer)
observer: O,
rx_tick_event: tokio::sync::watch::Receiver<f64>,
}

impl<O> TickedAsyncExecutorSpawner<O>
where
O: Fn(TaskState) + Clone + Send + Sync + 'static,
{
pub fn spawn_local<T>(
&self,
identifier: impl Into<TaskIdentifier>,
future: impl Future<Output = T> + 'static,
) -> Task<T>
where
T: 'static,
{
let identifier = identifier.into();
let future = self.droppable_future(identifier.clone(), future);
let schedule = self.runnable_schedule_cb(identifier);
let (runnable, task) = async_task::spawn_local(future, schedule);
runnable.schedule();
task
}

pub fn create_timer(&self) -> TickedTimer {
let tick_recv = self.rx_tick_event.clone();
TickedTimer { tick_recv }
}

pub fn tick_channel(&self) -> tokio::sync::watch::Receiver<f64> {
self.rx_tick_event.clone()
}

pub fn num_tasks(&self) -> usize {
self.num_spawned_tasks.load(Ordering::Relaxed)
}

fn droppable_future<F>(
&self,
identifier: TaskIdentifier,
future: F,
) -> DroppableFuture<F, impl Fn()>
where
F: Future,
{
let observer = self.observer.clone();

// Spawn Task
self.num_spawned_tasks.fetch_add(1, Ordering::Relaxed);
observer(TaskState::Spawn(identifier.clone()));

// Droppable Future registering on_drop callback
let num_spawned_tasks = self.num_spawned_tasks.clone();
DroppableFuture::new(future, move || {
num_spawned_tasks.fetch_sub(1, Ordering::Relaxed);
observer(TaskState::Drop(identifier.clone()));
})
}

fn runnable_schedule_cb(&self, identifier: TaskIdentifier) -> impl Fn(async_task::Runnable) {
let sender = self.tx_channel.clone();
let num_woken_tasks = self.num_woken_tasks.clone();
let observer = self.observer.clone();
move |runnable| {
sender.send((identifier.clone(), runnable)).unwrap_or(());
num_woken_tasks.fetch_add(1, Ordering::Relaxed);
observer(TaskState::Wake(identifier.clone()));
}
}
}

pub struct TickedAsyncExecutorTicker<O> {
rx_channel: mpsc::Receiver<Payload>,
num_woken_tasks: Arc<AtomicUsize>,
num_spawned_tasks: Arc<AtomicUsize>,
observer: O,
tx_tick_event: tokio::sync::watch::Sender<f64>,
}

impl<O> TickedAsyncExecutorTicker<O>
where
O: Fn(TaskState),
{
pub fn tick(&self, delta: f64, limit: Option<usize>) {
let _r = self.tx_tick_event.send(delta);

let mut num_woken_tasks = self.num_woken_tasks.load(Ordering::Relaxed);
if let Some(limit) = limit {
// Woken tasks should not exceed the allowed limit
num_woken_tasks = num_woken_tasks.min(limit);

Check warning on line 146 in src/split_ticked_async_executor.rs

View check run for this annotation

Codecov / codecov/patch

src/split_ticked_async_executor.rs#L146

Added line #L146 was not covered by tests
}

self.rx_channel
.try_iter()
.take(num_woken_tasks)
.for_each(|(identifier, runnable)| {
(self.observer)(TaskState::Tick(identifier, delta));
runnable.run();
});
self.num_woken_tasks
.fetch_sub(num_woken_tasks, Ordering::Relaxed);
}

pub fn wait_till_completed(&self, constant_delta: f64) {
while self.num_spawned_tasks.load(Ordering::Relaxed) != 0 {
self.tick(constant_delta, None);
}
}
}
122 changes: 20 additions & 102 deletions src/ticked_async_executor.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,13 @@
use std::{
future::Future,
sync::{
atomic::{AtomicUsize, Ordering},
mpsc, Arc,
},
};

use crate::{DroppableFuture, TaskIdentifier, TickedTimer};

#[derive(Debug)]
pub enum TaskState {
Spawn(TaskIdentifier),
Wake(TaskIdentifier),
Tick(TaskIdentifier, f64),
Drop(TaskIdentifier),
}
use std::future::Future;

pub type Task<T> = async_task::Task<T>;
type Payload = (TaskIdentifier, async_task::Runnable);
use crate::{
new_split_ticked_async_executor, Task, TaskIdentifier, TaskState, TickedAsyncExecutorSpawner,
TickedAsyncExecutorTicker, TickedTimer,
};

pub struct TickedAsyncExecutor<O> {
channel: (mpsc::Sender<Payload>, mpsc::Receiver<Payload>),
num_woken_tasks: Arc<AtomicUsize>,

num_spawned_tasks: Arc<AtomicUsize>,

// TODO, Or we need a Single Producer - Multi Consumer channel i.e Broadcast channel
// Broadcast recv channel should be notified when there are new messages in the queue
// Broadcast channel must also be able to remove older/stale messages (like a RingBuffer)
observer: O,

tick_event: tokio::sync::watch::Sender<f64>,
spawner: TickedAsyncExecutorSpawner<O>,
ticker: TickedAsyncExecutorTicker<O>,
}

impl Default for TickedAsyncExecutor<fn(TaskState)> {
Expand All @@ -44,13 +21,8 @@ where
O: Fn(TaskState) + Clone + Send + Sync + 'static,
{
pub fn new(observer: O) -> Self {
Self {
channel: mpsc::channel(),
num_woken_tasks: Arc::new(AtomicUsize::new(0)),
num_spawned_tasks: Arc::new(AtomicUsize::new(0)),
observer,
tick_event: tokio::sync::watch::channel(1.0).0,
}
let (spawner, ticker) = new_split_ticked_async_executor(observer);
Self { spawner, ticker }
}

pub fn spawn_local<T>(
Expand All @@ -61,16 +33,11 @@ where
where
T: 'static,
{
let identifier = identifier.into();
let future = self.droppable_future(identifier.clone(), future);
let schedule = self.runnable_schedule_cb(identifier);
let (runnable, task) = async_task::spawn_local(future, schedule);
runnable.schedule();
task
self.spawner.spawn_local(identifier, future)
}

pub fn num_tasks(&self) -> usize {
self.num_spawned_tasks.load(Ordering::Relaxed)
self.spawner.num_tasks()
}

/// Run the woken tasks once
Expand All @@ -81,72 +48,25 @@ where
/// `limit` is used to limit the number of woken tasks run per tick
/// - None would imply that there is no limit (all woken tasks would run)
/// - Some(limit) would imply that [0..limit] woken tasks would run,
/// even if more tasks are woken.
/// even if more tasks are woken.
///
/// Tick is !Sync i.e cannot be invoked from multiple threads
///
/// NOTE: Will not run tasks that are woken/scheduled immediately after `Runnable::run`
pub fn tick(&self, delta: f64, limit: Option<usize>) {
let _r = self.tick_event.send(delta);

let mut num_woken_tasks = self.num_woken_tasks.load(Ordering::Relaxed);
if let Some(limit) = limit {
// Woken tasks should not exceed the allowed limit
num_woken_tasks = num_woken_tasks.min(limit);
}

self.channel
.1
.try_iter()
.take(num_woken_tasks)
.for_each(|(identifier, runnable)| {
(self.observer)(TaskState::Tick(identifier, delta));
runnable.run();
});
self.num_woken_tasks
.fetch_sub(num_woken_tasks, Ordering::Relaxed);
self.ticker.tick(delta, limit);
}

pub fn create_timer(&self) -> TickedTimer {
let tick_recv = self.tick_event.subscribe();
TickedTimer { tick_recv }
self.spawner.create_timer()
}

pub fn tick_channel(&self) -> tokio::sync::watch::Receiver<f64> {
self.tick_event.subscribe()
}

fn droppable_future<F>(
&self,
identifier: TaskIdentifier,
future: F,
) -> DroppableFuture<F, impl Fn()>
where
F: Future,
{
let observer = self.observer.clone();

// Spawn Task
self.num_spawned_tasks.fetch_add(1, Ordering::Relaxed);
observer(TaskState::Spawn(identifier.clone()));

// Droppable Future registering on_drop callback
let num_spawned_tasks = self.num_spawned_tasks.clone();
DroppableFuture::new(future, move || {
num_spawned_tasks.fetch_sub(1, Ordering::Relaxed);
observer(TaskState::Drop(identifier.clone()));
})
self.spawner.tick_channel()
}

fn runnable_schedule_cb(&self, identifier: TaskIdentifier) -> impl Fn(async_task::Runnable) {
let sender = self.channel.0.clone();
let num_woken_tasks = self.num_woken_tasks.clone();
let observer = self.observer.clone();
move |runnable| {
sender.send((identifier.clone(), runnable)).unwrap_or(());
num_woken_tasks.fetch_add(1, Ordering::Relaxed);
observer(TaskState::Wake(identifier.clone()));
}
pub fn wait_till_completed(&self, delta: f64) {
self.ticker.wait_till_completed(delta);
}
}

Expand Down Expand Up @@ -220,9 +140,7 @@ mod tests {
assert_eq!(executor.num_tasks(), 3);

// Since we have cancelled the tasks above, the loops should eventually end
while executor.num_tasks() != 0 {
executor.tick(DELTA, None);
}
executor.wait_till_completed(DELTA);
}

#[test]
Expand Down Expand Up @@ -311,8 +229,8 @@ mod tests {
}

for i in 0..10 {
let woken_tasks = executor.num_woken_tasks.load(Ordering::Relaxed);
assert_eq!(woken_tasks, 10 - i);
let num_tasks = executor.num_tasks();
assert_eq!(num_tasks, 10 - i);
executor.tick(0.1, Some(1));
}

Expand Down
Loading