28
28
from _tilebox .grpc .error import InternalServerError
29
29
from tilebox .datasets .sync .dataset import DatasetClient
30
30
from tilebox .workflows .cache import JobCache
31
- from tilebox .workflows .data import ComputedTask , NextTaskToRun , Task , TaskLease
31
+ from tilebox .workflows .data import ComputedTask , Idling , NextTaskToRun , Task , TaskLease
32
32
from tilebox .workflows .interceptors import Interceptor , InterceptorType
33
33
from tilebox .workflows .observability .logging import get_logger
34
34
from tilebox .workflows .observability .tracing import WorkflowTracer
37
37
from tilebox .workflows .task import FutureTask , RunnerContext , TaskMeta
38
38
from tilebox .workflows .task import Task as TaskInstance
39
39
40
- # In seconds
40
+ # The time we give a task to finish it's execution when a runner shutdown is requested before we forcefully stop it
41
41
_SHUTDOWN_GRACE_PERIOD = timedelta (seconds = 2 )
42
- _POLL_INTERVAL = timedelta ( seconds = 5 )
43
- _JITTER_INTERVAL = timedelta ( seconds = 5 )
42
+
43
+ # Retry configuration for retrying failed requests to the workflows API
44
44
_INITIAL_RETRY_BACKOFF = timedelta (seconds = 5 )
45
45
_MAX_RETRY_BACKOFF = timedelta (hours = 1 ) # 1 hour
46
46
47
+ # A maximum idling duration, as a safeguard to avoid way too long sleep times in case the suggested idling duration is
48
+ # ever too long. 5 minutes should be plenty of time to wait.
49
+ _MAX_IDLING_DURATION = timedelta (minutes = 5 )
50
+ # A minimum idling duration, as a safeguard to avoid too short sleep times in case the suggested idling duration is
51
+ # ever too short.
52
+ _MIN_IDLING_DURATION = timedelta (milliseconds = 1 )
53
+
54
+ # Fallback polling interval and jitter in case the workflows API fails to respond with a suggested idling duration
55
+ _FALLBACK_POLL_INTERVAL = timedelta (seconds = 5 )
56
+ _FALLBACK_JITTER_INTERVAL = timedelta (seconds = 5 )
57
+
47
58
WrappedFnReturnT = TypeVar ("WrappedFnReturnT" )
48
59
49
60
@@ -96,14 +107,14 @@ def _extend_lease_while_task_is_running(
96
107
97
108
break
98
109
99
- logger .info (f"Extending task lease for { task_id = } , { task_lease = } " )
110
+ logger .debug (f"Extending task lease for { task_id = } , { task_lease = } " )
100
111
try :
101
112
# The first time we call the function, we pass the argument we received
102
113
# After that, we call it with the result of the previous call
103
114
task_lease = service .extend_task_lease (task_id , 2 * task_lease .lease )
104
115
if task_lease .lease == 0 :
105
116
# The server did not return a lease extension, it means that there is no need in trying to extend the lease
106
- logger .info (f"task lease extension not granted for task { task_id } " )
117
+ logger .debug (f"task lease extension not granted for task { task_id } " )
107
118
# even though we failed to extend the lease, let's still wait till the task is done
108
119
# otherwise we might end up with a mismatch between the task currently being executed and the task
109
120
# that we extend leases for (and the runner can anyways only execute one task at a time)
@@ -331,41 +342,59 @@ def run_all(self) -> None:
331
342
"""
332
343
self ._run (stop_when_idling = True )
333
344
334
- def _run (self , stop_when_idling : bool = True ) -> None :
345
+ def _run (self , stop_when_idling : bool = True ) -> None : # noqa: C901
335
346
"""
336
347
Run the task runner forever. This will poll for new tasks and execute them as they come in.
337
348
If no tasks are available, it will sleep for a short time and then try again.
338
349
"""
339
- task : Task | None = None
350
+ work : Task | Idling | None = None
340
351
341
352
# capture interrupt signals and delay them by a grace period in order to shut down gracefully
342
353
with _GracefulShutdown (_SHUTDOWN_GRACE_PERIOD , self ._service ) as shutdown_context :
343
354
while True :
344
- if task is None : # if we don't have a task right now, let's try to work-steal one
345
- if shutdown_context .is_shutting_down ():
355
+ if not isinstance ( work , Task ) : # if we don't have a task right now, let's try to work-steal one
356
+ if shutdown_context .is_shutting_down (): # unless we received an interrupt, then we shut down
346
357
return
347
358
try :
348
- task = self ._service .next_task (task_to_run = self .tasks_to_run , computed_task = None )
359
+ work = self ._service .next_task (task_to_run = self .tasks_to_run , computed_task = None )
349
360
except InternalServerError as e :
350
361
# We do not need to retry here, since the task runner will sleep for a while and then anyways request this again.
351
362
self .logger .error (f"Failed to get next task with error { e } " )
352
363
353
- if task is not None : # we have a task to execute
364
+ if isinstance (work , Task ): # we received a task to execute
365
+ task = work
354
366
if task .retry_count > 0 :
355
367
self .logger .debug (f"Retrying task { task .id } that failed { task .retry_count } times" )
356
- task = self ._execute (task , shutdown_context ) # submitting the task gives us the next one
357
- else : # if we didn't get a task, let's sleep for a bit and try work-stealing again
358
- self .logger .debug ("No task to run" )
368
+ work = self ._execute (task , shutdown_context ) # submitting the task gives us the next work item
369
+ elif isinstance ( work , Idling ) : # we received an idling response, so let's sleep for a bit
370
+ self .logger .debug ("No task to run, idling " )
359
371
if stop_when_idling : # if stop_when_idling is set, we can just return
360
372
return
373
+
361
374
# now sleep for a bit and then try again, unless we receive an interrupt
362
- shutdown_context .sleep (
363
- _POLL_INTERVAL .total_seconds () + random .uniform (0 , _JITTER_INTERVAL .total_seconds ()) # noqa: S311
364
- )
375
+ idling_duration = work .suggested_idling_duration
376
+ idling_duration = min (idling_duration , _MAX_IDLING_DURATION )
377
+ idling_duration = max (idling_duration , _MIN_IDLING_DURATION )
378
+ shutdown_context .sleep (idling_duration .total_seconds ())
365
379
if shutdown_context .is_shutting_down ():
366
380
return
381
+ else : # work is None
382
+ # we didn't receive an idling response, but also not a task. This only happens if we didn't request
383
+ # a task to run, indicating that we are shutting down.
384
+ if shutdown_context .is_shutting_down ():
385
+ return
386
+
387
+ fallback_interval = _FALLBACK_POLL_INTERVAL .total_seconds () + random .uniform ( # noqa: S311
388
+ 0 , _FALLBACK_JITTER_INTERVAL .total_seconds ()
389
+ )
390
+ self .logger .debug (
391
+ f"Didn't receive a task to run, nor an idling response, but runner is not shutting down. "
392
+ f"Falling back to a default idling period of { fallback_interval :.2f} s"
393
+ )
394
+
395
+ shutdown_context .sleep (fallback_interval )
367
396
368
- def _execute (self , task : Task , shutdown_context : _GracefulShutdown ) -> Task | None :
397
+ def _execute (self , task : Task , shutdown_context : _GracefulShutdown ) -> Task | Idling | None :
369
398
try :
370
399
return self ._try_execute (task , shutdown_context )
371
400
except Exception as e :
@@ -380,7 +409,7 @@ def _execute(self, task: Task, shutdown_context: _GracefulShutdown) -> Task | No
380
409
task_failed_retry (task , e )
381
410
return None
382
411
383
- def _try_execute (self , task : Task , shutdown_context : _GracefulShutdown ) -> Task | None :
412
+ def _try_execute (self , task : Task , shutdown_context : _GracefulShutdown ) -> Task | Idling | None :
384
413
if task .job is None :
385
414
raise ValueError (f"Task { task .id } has no job associated with it." )
386
415
0 commit comments