From 9390448830ff820c669ddb6b085748fc7c4636cf Mon Sep 17 00:00:00 2001 From: Brian Gunnarson <49216024+bgunnar5@users.noreply.github.com> Date: Thu, 22 Feb 2024 10:15:25 -0800 Subject: [PATCH] feature/retry_priority (#468) * remove a merge conflict statement that was missed * add a 'pip freeze' call in github workflow to view reqs versions * add new retry priority as highest task priority * update CHANGELOG * add in MID priority * change default priority to use priority map MID value --- .github/workflows/push-pr_workflow.yml | 1 + CHANGELOG.md | 4 +++ merlin/common/tasks.py | 4 +-- merlin/config/utils.py | 48 +++++++++++++++----------- 4 files changed, 34 insertions(+), 23 deletions(-) diff --git a/.github/workflows/push-pr_workflow.yml b/.github/workflows/push-pr_workflow.yml index eecbf3eeb..4b5de2373 100644 --- a/.github/workflows/push-pr_workflow.yml +++ b/.github/workflows/push-pr_workflow.yml @@ -95,6 +95,7 @@ jobs: python3 -m pip install --upgrade pip if [ -f requirements.txt ]; then pip install -r requirements.txt; fi pip3 install -r requirements/dev.txt + pip freeze - name: Install singularity run: | diff --git a/CHANGELOG.md b/CHANGELOG.md index c7862710a..9e198bcbc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ All notable changes to Merlin will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] +### Added +- New Priority.RETRY value for the Celery task priorities. This will be the new highest priority. 
+ ## [1.12.0] ### Added - A new command `merlin queue-info` that will print the status of your celery queues diff --git a/merlin/common/tasks.py b/merlin/common/tasks.py index 15be1182f..97ade4177 100644 --- a/merlin/common/tasks.py +++ b/merlin/common/tasks.py @@ -139,7 +139,7 @@ def merlin_step(self, *args: Any, **kwargs: Any) -> Optional[ReturnCode]: # noq f"Step '{step_name}' in '{step_dir}' is being restarted ({self.request.retries + 1}/{self.max_retries})..." ) step.mstep.mark_restart() - self.retry(countdown=step.retry_delay) + self.retry(countdown=step.retry_delay, priority=get_priority(Priority.RETRY)) except MaxRetriesExceededError: LOG.warning( f"""*** Step '{step_name}' in '{step_dir}' exited with a MERLIN_RESTART command, @@ -155,7 +155,7 @@ def merlin_step(self, *args: Any, **kwargs: Any) -> Optional[ReturnCode]: # noq f"Step '{step_name}' in '{step_dir}' is being retried ({self.request.retries + 1}/{self.max_retries})..." ) step.mstep.mark_restart() - self.retry(countdown=step.retry_delay) + self.retry(countdown=step.retry_delay, priority=get_priority(Priority.RETRY)) except MaxRetriesExceededError: LOG.warning( f"""*** Step '{step_name}' in '{step_dir}' exited with a MERLIN_RETRY command, diff --git a/merlin/config/utils.py b/merlin/config/utils.py index f0380b63c..bb0dcd58b 100644 --- a/merlin/config/utils.py +++ b/merlin/config/utils.py @@ -30,7 +30,7 @@ """This module contains priority handling""" import enum -from typing import List +from typing import Dict from merlin.config.configfile import CONFIG @@ -41,6 +41,7 @@ class Priority(enum.Enum): HIGH = 1 MID = 2 LOW = 3 + RETRY = 4 def is_rabbit_broker(broker: str) -> bool: @@ -53,26 +54,31 @@ def is_redis_broker(broker: str) -> bool: return broker in ["redis", "rediss", "redis+socket"] +def determine_priority_map(broker_name: str) -> Dict[Priority, int]: + """ + Returns the priority mapping for the given broker name. 
+ + :param broker_name: The name of the broker that we need the priority map for + :returns: The priority map associated with `broker_name` + """ + if is_rabbit_broker(broker_name): + return {Priority.LOW: 1, Priority.MID: 5, Priority.HIGH: 9, Priority.RETRY: 10} + if is_redis_broker(broker_name): + return {Priority.LOW: 10, Priority.MID: 5, Priority.HIGH: 2, Priority.RETRY: 1} + + raise ValueError(f"Unsupported broker name: {broker_name}") + + def get_priority(priority: Priority) -> int: """ - Get the priority based on the broker. For a rabbit broker - a low priority is 1 and high is 10. For redis it's the opposite. - :returns: An int representing the priority level + Gets the priority level as an integer based on the broker. + For a rabbit broker a low priority is 1 and the highest (retry) is 10. For redis it's the opposite. + + :param priority: The priority value that we want + :returns: The priority value as an integer """ - broker: str = CONFIG.broker.name.lower() - priorities: List[Priority] = [Priority.HIGH, Priority.MID, Priority.LOW] - if not isinstance(priority, Priority): - raise TypeError(f"Unrecognized priority '{priority}'! Priority enum options: {[x.name for x in priorities]}") - if priority == Priority.MID: - return 5 - if is_rabbit_broker(broker): - if priority == Priority.LOW: - return 1 - if priority == Priority.HIGH: - return 10 - if is_redis_broker(broker): - if priority == Priority.LOW: - return 10 - if priority == Priority.HIGH: - return 1 - raise ValueError(f"Function get_priority has reached unknown state! Maybe unsupported broker {broker}?") + if priority not in Priority: + raise ValueError(f"Invalid priority: {priority}") + + priority_map = determine_priority_map(CONFIG.broker.name.lower()) + return priority_map.get(priority, priority_map[Priority.MID]) # Default to MID priority for unknown priorities