2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -6,7 +6,7 @@ repos:
- id: end-of-file-fixer
- repo: https://github.com/charliermarsh/ruff-pre-commit
# keep the version here in sync with the version in uv.lock
rev: "v0.12.2"
rev: "v0.12.7"
hooks:
- id: ruff-check
args: [--fix, --exit-non-zero-on-fix]
13 changes: 12 additions & 1 deletion CHANGELOG.md
@@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.41.0] - 2025-08-01

### Added

- `tilebox-workflows`: Task runners now support receiving a suggested idling duration from the workflows API

### Fixed

- `tilebox-workflows`: Change task lease extension logging message to `DEBUG` level

## [0.40.0] - 2025-07-29

### Added
@@ -223,7 +233,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Released packages: `tilebox-datasets`, `tilebox-workflows`, `tilebox-storage`, `tilebox-grpc`


[Unreleased]: https://github.com/tilebox/tilebox-python/compare/v0.40.0...HEAD
[Unreleased]: https://github.com/tilebox/tilebox-python/compare/v0.41.0...HEAD
[0.41.0]: https://github.com/tilebox/tilebox-python/compare/v0.40.0...v0.41.0
[0.40.0]: https://github.com/tilebox/tilebox-python/compare/v0.39.0...v0.40.0
[0.39.0]: https://github.com/tilebox/tilebox-python/compare/v0.38.0...v0.39.0
[0.38.0]: https://github.com/tilebox/tilebox-python/compare/v0.37.1...v0.38.0
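Note on the "suggested idling duration" entry above: as a minimal sketch (not part of this diff) of how a runner might clamp and apply such a duration, using only the standard library; the clamp bounds mirror the `_MAX_IDLING_DURATION` / `_MIN_IDLING_DURATION` constants introduced in `task_runner.py` further down.

```python
import time
from datetime import timedelta

# Safeguard bounds, mirroring the constants added in task_runner.py below.
MAX_IDLING_DURATION = timedelta(minutes=5)
MIN_IDLING_DURATION = timedelta(milliseconds=1)


def idle_for(suggested: timedelta) -> None:
    """Sleep for a server-suggested duration, clamped to sane bounds."""
    duration = min(max(suggested, MIN_IDLING_DURATION), MAX_IDLING_DURATION)
    time.sleep(duration.total_seconds())


# e.g. the workflows API suggests idling for 2 seconds before polling again
idle_for(timedelta(seconds=2))
```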
Git LFS file not shown
Git LFS file not shown
13 changes: 13 additions & 0 deletions tilebox-workflows/tests/tasks_data.py
@@ -4,6 +4,7 @@

import json
import string
from datetime import timedelta

from hypothesis.strategies import (
DrawFn,
@@ -26,6 +27,7 @@
Cluster,
ComputedTask,
CronTrigger,
Idling,
Job,
JobState,
StorageEventTrigger,
@@ -71,6 +73,17 @@ def tasks(draw: DrawFn) -> Task:
return Task(task_id, identifier, state, task_input, display, job, parent_id, depends_on, lease, retry_count)


@composite
def idling_responses(draw: DrawFn) -> Idling:
"""A hypothesis strategy for generating random idling_responses"""
return Idling(
timedelta(
seconds=draw(integers(min_value=0, max_value=60 * 60)),
milliseconds=draw(integers(min_value=0, max_value=1000)),
)
)


@composite
def task_identifiers(draw: DrawFn) -> TaskIdentifier:
"""A hypothesis strategy for generating random task_identifiers"""
7 changes: 7 additions & 0 deletions tilebox-workflows/tests/test_data.py
@@ -4,6 +4,7 @@
automations,
clusters,
computed_tasks,
idling_responses,
jobs,
storage_locations,
task_identifiers,
@@ -15,6 +16,7 @@
AutomationPrototype,
Cluster,
ComputedTask,
Idling,
Job,
StorageLocation,
Task,
@@ -34,6 +36,11 @@ def test_tasks_to_message_and_back(task: Task) -> None:
assert Task.from_message(task.to_message()) == task


@given(idling_responses())
def test_idling_responses_to_message_and_back(idling: Idling) -> None:
assert Idling.from_message(idling.to_message()) == idling


@given(jobs())
def test_jobs_to_message_and_back(job: Job) -> None:
assert Job.from_message(job.to_message()) == job
24 changes: 22 additions & 2 deletions tilebox-workflows/tilebox/workflows/data.py
@@ -1,7 +1,7 @@
import re
import warnings
from dataclasses import dataclass, field
from datetime import datetime
from datetime import datetime, timedelta
from enum import Enum
from functools import lru_cache
from pathlib import Path
@@ -14,7 +14,13 @@

from tilebox.datasets.query.id_interval import IDInterval
from tilebox.datasets.query.pagination import Pagination
from tilebox.datasets.query.time_interval import TimeInterval, datetime_to_timestamp, timestamp_to_datetime
from tilebox.datasets.query.time_interval import (
TimeInterval,
datetime_to_timestamp,
duration_to_timedelta,
timedelta_to_duration,
timestamp_to_datetime,
)
from tilebox.datasets.uuid import uuid_message_to_optional_uuid, uuid_message_to_uuid, uuid_to_uuid_message

try:
@@ -149,6 +155,20 @@ def to_message(self) -> core_pb2.Task:
)


@dataclass(order=True)
class Idling:
suggested_idling_duration: timedelta

@classmethod
def from_message(cls, idling: task_pb2.IdlingResponse) -> "Idling":
"""Convert a Idling protobuf message to a Idling object."""
return cls(suggested_idling_duration=duration_to_timedelta(idling.suggested_idling_duration))

def to_message(self) -> task_pb2.IdlingResponse:
"""Convert a Idling object to a Idling protobuf message."""
return task_pb2.IdlingResponse(suggested_idling_duration=timedelta_to_duration(self.suggested_idling_duration))


class JobState(Enum):
UNSPECIFIED = 0
QUEUED = 1
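The `Idling` dataclass above relies on the `duration_to_timedelta` / `timedelta_to_duration` helpers imported from `tilebox.datasets.query.time_interval`, whose implementation is not shown in this diff. As a sketch of the assumed behavior, the protobuf well-known `Duration` type already round-trips with `datetime.timedelta`:

```python
from datetime import timedelta

from google.protobuf.duration_pb2 import Duration

# timedelta -> protobuf Duration (what timedelta_to_duration is assumed to do)
duration = Duration()
duration.FromTimedelta(timedelta(seconds=42, milliseconds=250))

# protobuf Duration -> timedelta (what duration_to_timedelta is assumed to do)
assert duration.ToTimedelta() == timedelta(seconds=42, milliseconds=250)
```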
69 changes: 49 additions & 20 deletions tilebox-workflows/tilebox/workflows/runner/task_runner.py
@@ -28,7 +28,7 @@
from _tilebox.grpc.error import InternalServerError
from tilebox.datasets.sync.dataset import DatasetClient
from tilebox.workflows.cache import JobCache
from tilebox.workflows.data import ComputedTask, NextTaskToRun, Task, TaskLease
from tilebox.workflows.data import ComputedTask, Idling, NextTaskToRun, Task, TaskLease
from tilebox.workflows.interceptors import Interceptor, InterceptorType
from tilebox.workflows.observability.logging import get_logger
from tilebox.workflows.observability.tracing import WorkflowTracer
@@ -37,13 +37,24 @@
from tilebox.workflows.task import FutureTask, RunnerContext, TaskMeta
from tilebox.workflows.task import Task as TaskInstance

# In seconds
# The time we give a task to finish its execution when a runner shutdown is requested before we forcefully stop it
_SHUTDOWN_GRACE_PERIOD = timedelta(seconds=2)
_POLL_INTERVAL = timedelta(seconds=5)
_JITTER_INTERVAL = timedelta(seconds=5)

# Retry configuration for retrying failed requests to the workflows API
_INITIAL_RETRY_BACKOFF = timedelta(seconds=5)
_MAX_RETRY_BACKOFF = timedelta(hours=1) # 1 hour

# A maximum idling duration, as a safeguard to avoid way too long sleep times in case the suggested idling duration is
# ever too long. 5 minutes should be plenty of time to wait.
_MAX_IDLING_DURATION = timedelta(minutes=5)
# A minimum idling duration, as a safeguard to avoid too short sleep times in case the suggested idling duration is
# ever too short.
_MIN_IDLING_DURATION = timedelta(milliseconds=1)

# Fallback polling interval and jitter in case the workflows API fails to respond with a suggested idling duration
_FALLBACK_POLL_INTERVAL = timedelta(seconds=5)
_FALLBACK_JITTER_INTERVAL = timedelta(seconds=5)

WrappedFnReturnT = TypeVar("WrappedFnReturnT")


@@ -96,14 +107,14 @@ def _extend_lease_while_task_is_running(

break

logger.info(f"Extending task lease for {task_id=}, {task_lease=}")
logger.debug(f"Extending task lease for {task_id=}, {task_lease=}")
try:
# The first time we call the function, we pass the argument we received
# After that, we call it with the result of the previous call
task_lease = service.extend_task_lease(task_id, 2 * task_lease.lease)
if task_lease.lease == 0:
# The server did not return a lease extension, which means there is no point in trying to extend the lease further
logger.info(f"task lease extension not granted for task {task_id}")
logger.debug(f"task lease extension not granted for task {task_id}")
# even though we failed to extend the lease, let's still wait till the task is done
# otherwise we might end up with a mismatch between the task currently being executed and the task
# that we extend leases for (and the runner can anyway only execute one task at a time)
@@ -331,41 +342,59 @@ def run_all(self) -> None:
"""
self._run(stop_when_idling=True)

def _run(self, stop_when_idling: bool = True) -> None:
def _run(self, stop_when_idling: bool = True) -> None: # noqa: C901
"""
Run the task runner forever. This will poll for new tasks and execute them as they come in.
If no tasks are available, it will sleep for a short time and then try again.
"""
task: Task | None = None
work: Task | Idling | None = None

# capture interrupt signals and delay them by a grace period in order to shut down gracefully
with _GracefulShutdown(_SHUTDOWN_GRACE_PERIOD, self._service) as shutdown_context:
while True:
if task is None: # if we don't have a task right now, let's try to work-steal one
if shutdown_context.is_shutting_down():
if not isinstance(work, Task): # if we don't have a task right now, let's try to work-steal one
if shutdown_context.is_shutting_down(): # unless we received an interrupt, then we shut down
return
try:
task = self._service.next_task(task_to_run=self.tasks_to_run, computed_task=None)
work = self._service.next_task(task_to_run=self.tasks_to_run, computed_task=None)
except InternalServerError as e:
# We do not need to retry here, since the task runner will sleep for a while and then request this again anyway.
self.logger.error(f"Failed to get next task with error {e}")

if task is not None: # we have a task to execute
if isinstance(work, Task): # we received a task to execute
task = work
if task.retry_count > 0:
self.logger.debug(f"Retrying task {task.id} that failed {task.retry_count} times")
task = self._execute(task, shutdown_context) # submitting the task gives us the next one
else: # if we didn't get a task, let's sleep for a bit and try work-stealing again
self.logger.debug("No task to run")
work = self._execute(task, shutdown_context) # submitting the task gives us the next work item
elif isinstance(work, Idling): # we received an idling response, so let's sleep for a bit
self.logger.debug("No task to run, idling")
if stop_when_idling: # if stop_when_idling is set, we can just return
return

# now sleep for a bit and then try again, unless we receive an interrupt
shutdown_context.sleep(
_POLL_INTERVAL.total_seconds() + random.uniform(0, _JITTER_INTERVAL.total_seconds()) # noqa: S311
)
idling_duration = work.suggested_idling_duration
idling_duration = min(idling_duration, _MAX_IDLING_DURATION)
idling_duration = max(idling_duration, _MIN_IDLING_DURATION)
shutdown_context.sleep(idling_duration.total_seconds())
if shutdown_context.is_shutting_down():
return
else: # work is None
# we received neither a task nor an idling response. This only happens if we didn't request
# a task to run, indicating that we are shutting down.
if shutdown_context.is_shutting_down():
return

fallback_interval = _FALLBACK_POLL_INTERVAL.total_seconds() + random.uniform( # noqa: S311
0, _FALLBACK_JITTER_INTERVAL.total_seconds()
)
self.logger.debug(
f"Didn't receive a task to run, nor an idling response, but runner is not shutting down. "
f"Falling back to a default idling period of {fallback_interval:.2f}s"
)

shutdown_context.sleep(fallback_interval)

def _execute(self, task: Task, shutdown_context: _GracefulShutdown) -> Task | None:
def _execute(self, task: Task, shutdown_context: _GracefulShutdown) -> Task | Idling | None:
try:
return self._try_execute(task, shutdown_context)
except Exception as e:
@@ -380,7 +409,7 @@ def _execute(self, task: Task, shutdown_context: _GracefulShutdown) -> Task | No
task_failed_retry(task, e)
return None

def _try_execute(self, task: Task, shutdown_context: _GracefulShutdown) -> Task | None:
def _try_execute(self, task: Task, shutdown_context: _GracefulShutdown) -> Task | Idling | None:
if task.job is None:
raise ValueError(f"Task {task.id} has no job associated with it.")

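Condensed, the reworked polling loop above handles three possible outcomes of `next_task`. The sketch below paraphrases the diff (it is not the actual implementation) and omits graceful shutdown, lease handling and error retries; `service` and `execute` stand in for the runner's task service and its `_execute` method:

```python
import random
import time
from datetime import timedelta

from tilebox.workflows.data import Idling, Task

FALLBACK_POLL = timedelta(seconds=5)
MAX_IDLE = timedelta(minutes=5)
MIN_IDLE = timedelta(milliseconds=1)


def run(service, tasks_to_run, execute) -> None:
    """Simplified polling loop: work-steal, execute, or idle as suggested."""
    work = None
    while True:
        if not isinstance(work, Task):  # no task in hand, try to work-steal one
            work = service.next_task(task_to_run=tasks_to_run, computed_task=None)
        if isinstance(work, Task):
            work = execute(work)  # submitting the result returns the next work item
        elif isinstance(work, Idling):
            # sleep for the API-suggested duration, clamped to the safeguard bounds
            idle = min(max(work.suggested_idling_duration, MIN_IDLE), MAX_IDLE)
            time.sleep(idle.total_seconds())
        else:
            # neither a task nor an idling hint: fall back to a fixed poll interval plus jitter
            time.sleep(FALLBACK_POLL.total_seconds() + random.uniform(0, 5))
```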
14 changes: 8 additions & 6 deletions tilebox-workflows/tilebox/workflows/runner/task_service.py
@@ -6,6 +6,7 @@
from _tilebox.grpc.error import with_pythonic_errors
from tilebox.workflows.data import (
ComputedTask,
Idling,
NextTaskToRun,
Task,
TaskLease,
@@ -32,18 +33,19 @@ def __init__(self, channel: Channel) -> None:
"""
self.service = with_pythonic_errors(TaskServiceStub(channel))

def next_task(self, task_to_run: NextTaskToRun | None, computed_task: ComputedTask | None) -> Task | None:
def next_task(self, task_to_run: NextTaskToRun | None, computed_task: ComputedTask | None) -> Task | Idling | None:
computed_task_message = None if computed_task is None else computed_task.to_message()
task_to_run_message = None if task_to_run is None else task_to_run.to_message()

response: NextTaskResponse = self.service.NextTask(
NextTaskRequest(computed_task=computed_task_message, next_task_to_run=task_to_run_message)
)
return (
Task.from_message(response.next_task)
if response.next_task is not None and response.next_task.id.uuid
else None
)

if response.next_task is not None and response.next_task.id.uuid:
return Task.from_message(response.next_task)
if response.idling is not None:
return Idling.from_message(response.idling)
return None

def task_failed(self, task: Task, error: Exception, cancel_job: bool = True) -> None:
# job output is limited to 1KB, so truncate the error message if necessary
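For completeness, the new `Idling` return value round-trips through its protobuf message just like the other data classes, mirroring the test added in `test_data.py` above:

```python
from datetime import timedelta

from tilebox.workflows.data import Idling

idling = Idling(suggested_idling_duration=timedelta(seconds=30))
assert Idling.from_message(idling.to_message()) == idling
```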