@@ -21,16 +21,24 @@ impl<'a> DeadlineQueue<'a> {
2121 Self { queue : VecDeque :: with_capacity ( capacity) }
2222 }
2323
24+ fn now ( & self ) -> Instant {
25+ Instant :: now ( )
26+ }
27+
2428 pub ( crate ) fn push ( & mut self , id : TestId , test : & ' a CollectedTest ) {
25- let deadline = Instant :: now ( ) + Duration :: from_secs ( TEST_WARN_TIMEOUT_S ) ;
29+ let deadline = self . now ( ) + Duration :: from_secs ( TEST_WARN_TIMEOUT_S ) ;
30+ if let Some ( back) = self . queue . back ( ) {
31+ assert ! ( back. deadline <= deadline) ;
32+ }
2633 self . queue . push_back ( DeadlineEntry { id, test, deadline } ) ;
2734 }
2835
29- /// Equivalent to `rx.read ()`, except that if any test exceeds its deadline
36+ /// Equivalent to `rx.recv ()`, except that if a test exceeds its deadline
3037 /// during the wait, the given callback will also be called for that test.
3138 pub ( crate ) fn read_channel_while_checking_deadlines < T > (
3239 & mut self ,
3340 rx : & mpsc:: Receiver < T > ,
41+ is_running : impl Fn ( TestId ) -> bool ,
3442 mut on_deadline_passed : impl FnMut ( TestId , & CollectedTest ) ,
3543 ) -> Result < T , RecvError > {
3644 loop {
@@ -39,18 +47,18 @@ impl<'a> DeadlineQueue<'a> {
3947 // deadline, so do a normal receive.
4048 return rx. recv ( ) ;
4149 } ;
42- let wait_duration = next_deadline. saturating_duration_since ( Instant :: now ( ) ) ;
50+ let next_deadline_timeout = next_deadline. saturating_duration_since ( self . now ( ) ) ;
51+
52+ let recv_result = rx. recv_timeout ( next_deadline_timeout) ;
53+ // Process deadlines after every receive attempt, regardless of
54+ // outcome, so that we don't build up an unbounded backlog of stale
55+ // entries due to a constant stream of tests finishing.
56+ self . for_each_entry_past_deadline ( & is_running, & mut on_deadline_passed) ;
4357
44- let recv_result = rx. recv_timeout ( wait_duration) ;
4558 match recv_result {
4659 Ok ( value) => return Ok ( value) ,
47- Err ( RecvTimeoutError :: Timeout ) => {
48- // Notify the callback of tests that have exceeded their
49- // deadline, then loop and do annother channel read.
50- for DeadlineEntry { id, test, .. } in self . remove_tests_past_deadline ( ) {
51- on_deadline_passed ( id, test) ;
52- }
53- }
60+ // Deadlines have already been processed, so loop and do another receive.
61+ Err ( RecvTimeoutError :: Timeout ) => { }
5462 Err ( RecvTimeoutError :: Disconnected ) => return Err ( RecvError ) ,
5563 }
5664 }
@@ -60,14 +68,28 @@ impl<'a> DeadlineQueue<'a> {
6068 Some ( self . queue . front ( ) ?. deadline )
6169 }
6270
63- fn remove_tests_past_deadline ( & mut self ) -> Vec < DeadlineEntry < ' a > > {
64- let now = Instant :: now ( ) ;
65- let mut timed_out = vec ! [ ] ;
66- while let Some ( deadline_entry) = pop_front_if ( & mut self . queue , |entry| now < entry. deadline )
67- {
68- timed_out. push ( deadline_entry) ;
71+ fn for_each_entry_past_deadline (
72+ & mut self ,
73+ is_running : impl Fn ( TestId ) -> bool ,
74+ mut on_deadline_passed : impl FnMut ( TestId , & CollectedTest ) ,
75+ ) {
76+ let now = self . now ( ) ;
77+
78+ // Clear out entries that are past their deadline, but only invoke the
79+ // callback for tests that are still considered running.
80+ while let Some ( entry) = pop_front_if ( & mut self . queue , |entry| entry. deadline <= now) {
81+ if is_running ( entry. id ) {
82+ on_deadline_passed ( entry. id , entry. test ) ;
83+ }
84+ }
85+
86+ // Also clear out any leading entries that are no longer running, even
87+ // if their deadline hasn't been reached.
88+ while let Some ( _) = pop_front_if ( & mut self . queue , |entry| !is_running ( entry. id ) ) { }
89+
90+ if let Some ( front) = self . queue . front ( ) {
91+ assert ! ( now < front. deadline) ;
6992 }
70- timed_out
7193 }
7294}
7395
0 commit comments