Skip to content

Commit

Permalink
Move scheduler constants into their own module so that packages with …
Browse files Browse the repository at this point in the history
…native dependencies are not transitively included in the package start-up script. (y-scope#80)
  • Loading branch information
kirkrodrigues authored Nov 13, 2022
1 parent 6d35126 commit 54497a0
Show file tree
Hide file tree
Showing 10 changed files with 46 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from clp_py_utils.clp_config import Database
from clp_py_utils.core import read_yaml_config_file
from job_orchestration.scheduler.scheduler_data import JobStatus, TaskStatus
from job_orchestration.scheduler.constants import JobStatus, TaskStatus
from sql_adapter import SQL_Adapter

# Setup logging
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from clp_py_utils.pretty_size import pretty_size
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.job_config import PathsToCompress, InputConfig, OutputConfig, ClpIoConfig
from job_orchestration.scheduler.scheduler_data import JobStatus
from job_orchestration.scheduler.constants import JobStatus
from .utils.common import JobCompletionStatus

# Setup logging
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os

from job_orchestration.scheduler.scheduler_data import QueueName, TASK_QUEUE_HIGHEST_PRIORITY
from job_orchestration.scheduler.constants import QueueName, TASK_QUEUE_HIGHEST_PRIORITY

# Worker settings
# Force workers to consume only one task at a time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@
from job_orchestration.executor.celery import app
from job_orchestration.executor.utils import append_message_to_task_results_queue
from job_orchestration.job_config import ClpIoConfig, PathsToCompress
from job_orchestration.scheduler.constants import TaskStatus, TaskUpdateType
from job_orchestration.scheduler.scheduler_data import \
TaskStatus, \
TaskUpdateType, \
TaskUpdate, \
TaskFailureUpdate, \
CompressionTaskSuccessUpdate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from job_orchestration.job_config import SearchConfig
from job_orchestration.executor.celery import app
from job_orchestration.executor.utils import append_message_to_task_results_queue
from job_orchestration.scheduler.scheduler_data import TaskUpdate, TaskUpdateType, TaskStatus, \
TaskFailureUpdate
from job_orchestration.scheduler.constants import TaskUpdateType, TaskStatus
from job_orchestration.scheduler.scheduler_data import TaskUpdate, TaskFailureUpdate

# Setup logging
logger = get_task_logger(__name__)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
TASK_QUEUE_LOWEST_PRIORITY = 1
TASK_QUEUE_HIGHEST_PRIORITY = 3


class QueueName:
COMPRESSION = "compression"
SEARCH = "search"


class JobStatus:
SCHEDULING = 'SCHEDULING'
SCHEDULED = 'SCHEDULED'
SUCCEEDED = 'SUCCEEDED'
FAILED = 'FAILED'


class TaskUpdateType:
COMPRESSION = 'COMPRESSION'
SEARCH = 'SEARCH'


class TaskStatus:
SUBMITTED = 'SUBMITTED'
SCHEDULED = 'SCHEDULED'
IN_PROGRESS = 'IN_PROGRESS'
SUCCEEDED = 'SUCCEEDED'
FAILED = 'FAILED'
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,20 @@
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.executor.compression_task import compress
from job_orchestration.executor.search_task import search
from job_orchestration.scheduler.constants import \
QueueName, \
JobStatus, \
TaskUpdateType, \
TaskStatus
from job_orchestration.scheduler.results_consumer import ReconnectingResultsConsumer
from job_orchestration.scheduler.scheduler_data import \
CompressionJob, \
SearchJob, \
JobStatus, \
CompressionTask, \
SearchTask, \
TaskStatus, \
TaskUpdateType, \
TaskUpdate, \
TaskFailureUpdate, \
CompressionTaskSuccessUpdate, \
QueueName
CompressionTaskSuccessUpdate

# Setup logging
# Create logger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,11 @@
from celery.result import AsyncResult
from pydantic import BaseModel, validator

TASK_QUEUE_LOWEST_PRIORITY = 1
TASK_QUEUE_HIGHEST_PRIORITY = 3


class QueueName:
COMPRESSION = "compression"
SEARCH = "search"


class TaskUpdateType:
COMPRESSION = 'COMPRESSION'
SEARCH = 'SEARCH'


class TaskStatus:
SUBMITTED = 'SUBMITTED'
SCHEDULED = 'SCHEDULED'
IN_PROGRESS = 'IN_PROGRESS'
SUCCEEDED = 'SUCCEEDED'
FAILED = 'FAILED'
from .constants import \
TASK_QUEUE_LOWEST_PRIORITY, \
TASK_QUEUE_HIGHEST_PRIORITY, \
TaskStatus, \
TaskUpdateType


class TaskUpdate(BaseModel):
Expand Down Expand Up @@ -93,13 +78,6 @@ class Config:
arbitrary_types_allowed = True


class JobStatus:
SCHEDULING = 'SCHEDULING'
SCHEDULED = 'SCHEDULED'
SUCCEEDED = 'SUCCEEDED'
FAILED = 'FAILED'


class CompressionJob(BaseModel):
id: int
status: str
Expand Down
2 changes: 1 addition & 1 deletion components/package-template/src/sbin/native/search
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ from clp.package_utils import CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, validate_an
from clp_py_utils.clp_config import CLP_METADATA_TABLE_PREFIX, Database
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.job_config import SearchConfig
from job_orchestration.scheduler.scheduler_data import JobStatus
from job_orchestration.scheduler.constants import JobStatus


async def run_function_in_process(function, *args, initializer=None, init_args=None):
Expand Down
2 changes: 1 addition & 1 deletion components/package-template/src/sbin/start-clp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ from clp.package_utils import \
validate_queue_config, \
validate_worker_config
from clp_py_utils.clp_config import CLPConfig
from job_orchestration.scheduler.scheduler_data import QueueName
from job_orchestration.scheduler.constants import QueueName


def append_docker_port_settings_for_host_ips(hostname: str, host_port: int, container_port: int, cmd: [str]):
Expand Down

0 comments on commit 54497a0

Please sign in to comment.