Skip to content

Commit f342e3f

Browse files
committed
Add limit to tick
1 parent 51d01f7 commit f342e3f

File tree

2 files changed

+52
-14
lines changed

2 files changed

+52
-14
lines changed

src/ticked_async_executor.rs

+48-10
Original file line numberDiff line numberDiff line change
@@ -78,19 +78,23 @@ where
7878
/// `delta` is used for timing based operations
7979
/// - `TickedTimer` uses this delta value to tick till completion
8080
///
81-
/// `maybe_limit` is used to limit the number of woken tasks run per tick
81+
/// `limit` is used to limit the number of woken tasks run per tick
8282
/// - None would imply that there is no limit (all woken tasks would run)
8383
/// - Some(limit) would imply that [0..limit] woken tasks would run,
8484
/// even if more tasks are woken.
8585
///
8686
/// Tick is !Sync i.e cannot be invoked from multiple threads
8787
///
8888
/// NOTE: Will not run tasks that are woken/scheduled immediately after `Runnable::run`
89-
pub fn tick(&self, delta: f64) {
89+
pub fn tick(&self, delta: f64, limit: Option<usize>) {
9090
let _r = self.tick_event.send(delta);
9191

92-
// Clamp woken tasks to limit
93-
let num_woken_tasks = self.num_woken_tasks.load(Ordering::Relaxed);
92+
let mut num_woken_tasks = self.num_woken_tasks.load(Ordering::Relaxed);
93+
if let Some(limit) = limit {
94+
// Woken tasks should not exceed the allowed limit
95+
num_woken_tasks = num_woken_tasks.min(limit);
96+
}
97+
9498
self.channel
9599
.1
96100
.try_iter()
@@ -153,6 +157,20 @@ mod tests {
153157

154158
const DELTA: f64 = 1000.0 / 60.0;
155159

160+
#[test]
161+
fn test_one_task() {
162+
const DELTA: f64 = 1.0 / 60.0;
163+
const LIMIT: Option<usize> = None;
164+
165+
let executor = TickedAsyncExecutor::default();
166+
167+
executor.spawn_local("MyIdentifier", async move {}).detach();
168+
169+
// Make sure to tick your executor to run the tasks
170+
executor.tick(DELTA, LIMIT);
171+
assert_eq!(executor.num_tasks(), 0);
172+
}
173+
156174
#[test]
157175
fn test_multiple_tasks() {
158176
let executor = TickedAsyncExecutor::default();
@@ -168,10 +186,10 @@ mod tests {
168186
})
169187
.detach();
170188

171-
executor.tick(DELTA);
189+
executor.tick(DELTA, None);
172190
assert_eq!(executor.num_tasks(), 2);
173191

174-
executor.tick(DELTA);
192+
executor.tick(DELTA, None);
175193
assert_eq!(executor.num_tasks(), 0);
176194
}
177195

@@ -190,7 +208,7 @@ mod tests {
190208
}
191209
});
192210
assert_eq!(executor.num_tasks(), 2);
193-
executor.tick(DELTA);
211+
executor.tick(DELTA, None);
194212

195213
executor
196214
.spawn_local("CancelTasks", async move {
@@ -203,7 +221,7 @@ mod tests {
203221

204222
// Since we have cancelled the tasks above, the loops should eventually end
205223
while executor.num_tasks() != 0 {
206-
executor.tick(DELTA);
224+
executor.tick(DELTA, None);
207225
}
208226
}
209227

@@ -224,7 +242,7 @@ mod tests {
224242
let mut instances = vec![];
225243
while executor.num_tasks() != 0 {
226244
let current = Instant::now();
227-
executor.tick(DELTA);
245+
executor.tick(DELTA, None);
228246
instances.push(current.elapsed());
229247
std::thread::sleep(Duration::from_millis(16));
230248
}
@@ -276,8 +294,28 @@ mod tests {
276294
})
277295
.detach();
278296

279-
executor.tick(DELTA);
297+
executor.tick(DELTA, None);
280298
assert_eq!(executor.num_tasks(), 4);
281299
drop(executor);
282300
}
301+
302+
#[test]
303+
fn test_limit() {
304+
let executor = TickedAsyncExecutor::default();
305+
for i in 0..10 {
306+
executor
307+
.spawn_local(format!("{i}"), async move {
308+
println!("Finish {i}");
309+
})
310+
.detach();
311+
}
312+
313+
for i in 0..10 {
314+
let woken_tasks = executor.num_woken_tasks.load(Ordering::Relaxed);
315+
assert_eq!(woken_tasks, 10 - i);
316+
executor.tick(0.1, Some(1));
317+
}
318+
319+
assert_eq!(executor.num_tasks(), 0);
320+
}
283321
}

tests/tokio_tests.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,13 @@ fn test_tokio_join() {
2929
tx1.try_send(10).unwrap();
3030
tx3.try_send(10).unwrap();
3131
for _ in 0..10 {
32-
executor.tick(DELTA);
32+
executor.tick(DELTA, None);
3333
}
3434
tx2.try_send(20).unwrap();
3535
tx4.try_send(20).unwrap();
3636

3737
while executor.num_tasks() != 0 {
38-
executor.tick(DELTA);
38+
executor.tick(DELTA, None);
3939
}
4040
}
4141

@@ -70,12 +70,12 @@ fn test_tokio_select() {
7070
.detach();
7171

7272
for _ in 0..10 {
73-
executor.tick(DELTA);
73+
executor.tick(DELTA, None);
7474
}
7575

7676
tx1.try_send(10).unwrap();
7777
tx3.try_send(10).unwrap();
7878
while executor.num_tasks() != 0 {
79-
executor.tick(DELTA);
79+
executor.tick(DELTA, None);
8080
}
8181
}

0 commit comments

Comments
 (0)