Skip to content

Commit 005edae

Browse files
committed
Revert "Remove spawn API"
This reverts commit 339580d.
1 parent 44d5a32 commit 005edae

File tree

2 files changed

+34
-15
lines changed

2 files changed

+34
-15
lines changed

src/ticked_async_executor.rs

+30-11
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ type Payload = (TaskIdentifier, async_task::Runnable);
2222
pub struct TickedAsyncExecutor<O> {
2323
channel: (mpsc::Sender<Payload>, mpsc::Receiver<Payload>),
2424
num_woken_tasks: Arc<AtomicUsize>,
25-
2625
num_spawned_tasks: Arc<AtomicUsize>,
2726

2827
// TODO, Or we need a Single Producer - Multi Consumer channel i.e Broadcast channel
@@ -53,6 +52,22 @@ where
5352
}
5453
}
5554

55+
pub fn spawn<T>(
56+
&self,
57+
identifier: impl Into<TaskIdentifier>,
58+
future: impl Future<Output = T> + Send + 'static,
59+
) -> Task<T>
60+
where
61+
T: Send + 'static,
62+
{
63+
let identifier = identifier.into();
64+
let future = self.droppable_future(identifier.clone(), future);
65+
let schedule = self.runnable_schedule_cb(identifier);
66+
let (runnable, task) = async_task::spawn(future, schedule);
67+
runnable.schedule();
68+
task
69+
}
70+
5671
pub fn spawn_local<T>(
5772
&self,
5873
identifier: impl Into<TaskIdentifier>,
@@ -157,7 +172,7 @@ mod tests {
157172
fn test_multiple_tasks() {
158173
let executor = TickedAsyncExecutor::default();
159174
executor
160-
.spawn_local("A", async move {
175+
.spawn("A", async move {
161176
tokio::task::yield_now().await;
162177
})
163178
.detach();
@@ -211,6 +226,15 @@ mod tests {
211226
fn test_ticked_timer() {
212227
let executor = TickedAsyncExecutor::default();
213228

229+
for _ in 0..10 {
230+
let timer: TickedTimer = executor.create_timer();
231+
executor
232+
.spawn("ThreadedTimer", async move {
233+
timer.sleep_for(256.0).await;
234+
})
235+
.detach();
236+
}
237+
214238
for _ in 0..10 {
215239
let timer = executor.create_timer();
216240
executor
@@ -231,30 +255,25 @@ mod tests {
231255
let elapsed = now.elapsed();
232256
println!("Elapsed: {:?}", elapsed);
233257
println!("Total: {:?}", instances);
234-
println!(
235-
"Min: {:?}, Max: {:?}",
236-
instances.iter().min(),
237-
instances.iter().max()
238-
);
239258

240259
// Test Timer cancellation
241260
let timer = executor.create_timer();
242261
executor
243-
.spawn_local("LocalFuture1", async move {
262+
.spawn("ThreadedFuture", async move {
244263
timer.sleep_for(1000.0).await;
245264
})
246265
.detach();
247266

248267
let timer = executor.create_timer();
249268
executor
250-
.spawn_local("LocalFuture2", async move {
269+
.spawn_local("LocalFuture", async move {
251270
timer.sleep_for(1000.0).await;
252271
})
253272
.detach();
254273

255274
let mut tick_event = executor.tick_channel();
256275
executor
257-
.spawn_local("LocalTickFuture1", async move {
276+
.spawn("ThreadedTickFuture", async move {
258277
loop {
259278
let _r = tick_event.changed().await;
260279
if _r.is_err() {
@@ -266,7 +285,7 @@ mod tests {
266285

267286
let mut tick_event = executor.tick_channel();
268287
executor
269-
.spawn_local("LocalTickFuture2", async move {
288+
.spawn_local("LocalTickFuture", async move {
270289
loop {
271290
let _r = tick_event.changed().await;
272291
if _r.is_err() {

tests/tokio_tests.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ fn test_tokio_join() {
99
let (tx1, mut rx1) = tokio::sync::mpsc::channel::<usize>(1);
1010
let (tx2, mut rx2) = tokio::sync::mpsc::channel::<usize>(1);
1111
executor
12-
.spawn_local("LocalFuture1", async move {
12+
.spawn("ThreadedFuture", async move {
1313
let (a, b) = tokio::join!(rx1.recv(), rx2.recv());
1414
assert_eq!(a.unwrap(), 10);
1515
assert_eq!(b.unwrap(), 20);
@@ -19,7 +19,7 @@ fn test_tokio_join() {
1919
let (tx3, mut rx3) = tokio::sync::mpsc::channel::<usize>(1);
2020
let (tx4, mut rx4) = tokio::sync::mpsc::channel::<usize>(1);
2121
executor
22-
.spawn_local("LocalFuture2", async move {
22+
.spawn("LocalFuture", async move {
2323
let (a, b) = tokio::join!(rx3.recv(), rx4.recv());
2424
assert_eq!(a.unwrap(), 10);
2525
assert_eq!(b.unwrap(), 20);
@@ -46,7 +46,7 @@ fn test_tokio_select() {
4646
let (tx1, mut rx1) = tokio::sync::mpsc::channel::<usize>(1);
4747
let (_tx2, mut rx2) = tokio::sync::mpsc::channel::<usize>(1);
4848
executor
49-
.spawn_local("LocalFuture1", async move {
49+
.spawn("ThreadedFuture", async move {
5050
tokio::select! {
5151
data = rx1.recv() => {
5252
assert_eq!(data.unwrap(), 10);
@@ -59,7 +59,7 @@ fn test_tokio_select() {
5959
let (tx3, mut rx3) = tokio::sync::mpsc::channel::<usize>(1);
6060
let (_tx4, mut rx4) = tokio::sync::mpsc::channel::<usize>(1);
6161
executor
62-
.spawn_local("LocalFuture2", async move {
62+
.spawn("LocalFuture", async move {
6363
tokio::select! {
6464
data = rx3.recv() => {
6565
assert_eq!(data.unwrap(), 10);

0 commit comments

Comments
 (0)