3333// ///////////////////////////////////////////////////////////////////
3434#include " P_EventSystem.h"
3535#include " iocore/eventsystem/Lock.h"
36+ #include " tscore/ink_hrtime.h"
3637
3738#if HAVE_EVENTFD
3839#include < sys/eventfd.h>
@@ -53,6 +54,7 @@ char const *const EThread::Metrics::Slice::STAT_NAME[] = {
5354};
5455
5556int thread_max_heartbeat_mseconds = THREAD_MAX_HEARTBEAT_MSECONDS;
57+ int loop_time_update_probability = 10 ;
5658
5759// To define a class inherits from Thread:
5860// 1) Define an independent thread_local static member
@@ -144,26 +146,33 @@ EThread::set_event_type(EventType et)
144146 event_types |= (1 << static_cast <int >(et));
145147}
146148
147- void
148- EThread::process_event (Event *e, int calling_code)
149+ ink_hrtime
150+ EThread::process_event (Event *e, int calling_code, ink_hrtime event_time )
149151{
150152 ink_assert ((!e->in_the_prot_queue && !e->in_the_priority_queue ));
151153 WEAK_MUTEX_TRY_LOCK (lock, e->mutex , this );
152154 if (!lock.is_locked ()) {
153- e->timeout_at = ink_get_hrtime () + DELAY_FOR_RETRY;
155+ e->timeout_at = event_time + DELAY_FOR_RETRY;
154156 EventQueueExternal.enqueue_local (e);
155157 } else {
156158 if (e->cancelled ) {
157159 MUTEX_RELEASE (lock);
158160 free_event (e);
159- return ;
161+ return event_time ;
160162 }
161163 Continuation *c_temp = e->continuation ;
162164
163165 // Restore the client IP debugging flags
164166 set_cont_flags (e->continuation ->control_flags );
165167
166168 e->continuation ->handleEvent (calling_code, e);
169+ if (loop_time_update_probability == 100 ) {
170+ event_time = ink_get_hrtime ();
171+ } else if (loop_time_update_probability > 0 ) {
172+ if (static_cast <int >(generator.random () % 100 ) < loop_time_update_probability) {
173+ event_time = ink_get_hrtime ();
174+ }
175+ }
167176 ink_assert (!e->in_the_priority_queue );
168177 ink_assert (c_temp == e->continuation );
169178 MUTEX_RELEASE (lock);
@@ -172,18 +181,19 @@ EThread::process_event(Event *e, int calling_code)
172181 if (e->period < 0 ) {
173182 e->timeout_at = e->period ;
174183 } else {
175- e->timeout_at = ink_get_hrtime () + e->period ;
184+ e->timeout_at = event_time + e->period ;
176185 }
177186 EventQueueExternal.enqueue_local (e);
178187 }
179188 } else if (!e->in_the_prot_queue && !e->in_the_priority_queue ) {
180189 free_event (e);
181190 }
182191 }
192+ return event_time;
183193}
184194
185- void
186- EThread::process_queue (Que(Event, link) * NegativeQueue, int *ev_count, int *nq_count)
195+ ink_hrtime
196+ EThread::process_queue (Que(Event, link) * NegativeQueue, int *ev_count, int *nq_count, ink_hrtime event_time )
187197{
188198 Event *e;
189199
@@ -198,9 +208,9 @@ EThread::process_queue(Que(Event, link) * NegativeQueue, int *ev_count, int *nq_
198208 free_event (e);
199209 } else if (!e->timeout_at ) { // IMMEDIATE
200210 ink_assert (e->period == 0 );
201- process_event (e, e->callback_event );
211+ event_time = process_event (e, e->callback_event , event_time );
202212 } else if (e->timeout_at > 0 ) { // INTERVAL
203- EventQueue.enqueue (e, ink_get_hrtime () );
213+ EventQueue.enqueue (e, event_time );
204214 } else { // NEGATIVE
205215 Event *p = nullptr ;
206216 Event *a = NegativeQueue->head ;
@@ -216,6 +226,7 @@ EThread::process_queue(Que(Event, link) * NegativeQueue, int *ev_count, int *nq_
216226 }
217227 ++(*nq_count);
218228 }
229+ return event_time;
219230}
220231
221232void
@@ -228,8 +239,10 @@ EThread::execute_regular()
228239 ink_hrtime loop_start_time; // Time the loop started.
229240 ink_hrtime loop_finish_time; // Time at the end of the loop.
230241
242+ loop_start_time = ink_get_hrtime ();
243+
231244 // Track this so we can update on boundary crossing.
232- auto prev_slice = this ->metrics .prev_slice (metrics._slice .data () + (ink_get_hrtime () / HRTIME_SECOND) % Metrics::N_SLICES);
245+ auto prev_slice = this ->metrics .prev_slice (metrics._slice .data () + (loop_start_time / HRTIME_SECOND) % Metrics::N_SLICES);
233246
234247 int nq_count;
235248 int ev_count;
@@ -241,9 +254,8 @@ EThread::execute_regular()
241254
242255 // give priority to immediate events
243256 while (!TSSystemState::is_event_system_shut_down ()) {
244- loop_start_time = ink_get_hrtime ();
245- nq_count = 0 ; // count # of elements put on negative queue.
246- ev_count = 0 ; // # of events handled.
257+ nq_count = 0 ; // count # of elements put on negative queue.
258+ ev_count = 0 ; // # of events handled.
247259
248260 current_slice = metrics._slice .data () + (loop_start_time / HRTIME_SECOND) % Metrics::N_SLICES;
249261 metrics.current_slice .store (current_slice, std::memory_order_release);
@@ -256,37 +268,37 @@ EThread::execute_regular()
256268 }
257269 ++(current_slice->_count ); // loop started, bump count.
258270
259- process_queue (&NegativeQueue, &ev_count, &nq_count);
271+ ink_hrtime event_time = process_queue (&NegativeQueue, &ev_count, &nq_count, loop_start_time );
260272
261273 bool done_one;
262274 do {
263275 done_one = false ;
264276 // execute all the eligible internal events
265- EventQueue.check_ready (loop_start_time , this );
266- while ((e = EventQueue.dequeue_ready (ink_get_hrtime () ))) {
277+ EventQueue.check_ready (event_time , this );
278+ while ((e = EventQueue.dequeue_ready (event_time ))) {
267279 ink_assert (e);
268280 ink_assert (e->timeout_at > 0 );
269281 if (e->cancelled ) {
270282 free_event (e);
271283 } else {
272- done_one = true ;
273- process_event (e, e->callback_event );
284+ done_one = true ;
285+ event_time = process_event (e, e->callback_event , event_time );
274286 }
275287 }
276288 } while (done_one);
277289
278290 // execute any negative (poll) events
279291 if (NegativeQueue.head ) {
280- process_queue (&NegativeQueue, &ev_count, &nq_count);
292+ event_time = process_queue (&NegativeQueue, &ev_count, &nq_count, event_time );
281293
282294 // execute poll events
283295 while ((e = NegativeQueue.dequeue ())) {
284- process_event (e, EVENT_POLL);
296+ event_time = process_event (e, EVENT_POLL, event_time );
285297 }
286298 }
287299
288300 next_time = EventQueue.earliest_timeout ();
289- ink_hrtime sleep_time = next_time - ink_get_hrtime () ;
301+ ink_hrtime sleep_time = next_time - event_time ;
290302 if (sleep_time > 0 ) {
291303 if (EventQueueExternal.localQueue .empty ()) {
292304 sleep_time = std::min (sleep_time, HRTIME_MSECONDS (thread_max_heartbeat_mseconds));
@@ -301,7 +313,7 @@ EThread::execute_regular()
301313 }
302314
303315 // drained the queue by this point
304- ink_hrtime post_drain = ink_get_hrtime () ;
316+ ink_hrtime post_drain = event_time ;
305317 ink_hrtime drain_queue = post_drain - loop_start_time;
306318
307319 // watchdog kick - pre-sleep
@@ -320,7 +332,8 @@ EThread::execute_regular()
320332 // @a delta can be negative due to time of day adjustments (which apparently happen quite frequently). I
321333 // tried using the monotonic clock to get around this but it was *very* stuttery (up to hundreds
322334 // of milliseconds), far too much to be actually used.
323- delta = std::max<ink_hrtime>(0 , loop_finish_time - loop_start_time);
335+ delta = std::max<ink_hrtime>(0 , loop_finish_time - loop_start_time);
336+ loop_start_time = loop_finish_time;
324337
325338 metrics.decay ();
326339 metrics.record_loop_time (delta);
0 commit comments