diff --git a/tesk/api/ga4gh/tes/controllers.py b/tesk/api/ga4gh/tes/controllers.py index 1fdbba2..4be42c3 100644 --- a/tesk/api/ga4gh/tes/controllers.py +++ b/tesk/api/ga4gh/tes/controllers.py @@ -28,21 +28,20 @@ def CancelTask(id, *args, **kwargs) -> dict: # type: ignore # POST /tasks @log_traffic -def CreateTask(**kwargs) -> dict: # type: ignore +def CreateTask(**kwargs) -> dict[str, str]: # type: ignore """Create task. Args: - *args: Variable length argument list. **kwargs: Arbitrary keyword arguments. """ try: request_body = kwargs.get("body") if request_body is None: - logger("Nothing received in request body.") + logger.error("Nothing received in request body.") raise BadRequest("No request body received.") tes_task = TesTask(**request_body) namespace = "tesk" - CreateTesTask(tes_task, namespace).response() + return CreateTesTask(tes_task, namespace).response() except Exception as e: raise InternalServerError from e diff --git a/tesk/api/ga4gh/tes/models.py b/tesk/api/ga4gh/tes/models.py index c442e1e..54bb6ba 100644 --- a/tesk/api/ga4gh/tes/models.py +++ b/tesk/api/ga4gh/tes/models.py @@ -275,7 +275,7 @@ class TesResources(BaseModel): example={"VmSize": "Standard_D64_v3"}, ) backend_parameters_strict: Optional[bool] = Field( - False, + default=False, description="If set to true, backends should fail the task if any " "backend_parameters\nkey/values are unsupported, otherwise, backends should " "attempt to run the task", diff --git a/tesk/api/ga4gh/tes/task/create_task.py b/tesk/api/ga4gh/tes/task/create_task.py index 3fb29b3..abb2109 100644 --- a/tesk/api/ga4gh/tes/task/create_task.py +++ b/tesk/api/ga4gh/tes/task/create_task.py @@ -2,7 +2,7 @@ import logging -from tesk.api.ga4gh.tes.models import TesTask +from tesk.api.ga4gh.tes.models import TesResources, TesTask from tesk.api.kubernetes.client_wrapper import KubernetesClientWrapper from tesk.api.kubernetes.constants import Constants from tesk.api.kubernetes.convert.converter import TesKubernetesConverter @@ -30,7 +30,7 @@ def __init__(self, task: TesTask, namespace=TeskConstants.tesk_namespace): self.tes_kubernetes_converter = TesKubernetesConverter(self.namespace) self.constants = Constants() - def create_task(self): + def create_task(self) -> dict[str, str]: """Create TES task.""" attempts_no = 0 while attempts_no < self.constants.job_create_attempts_no: @@ -38,10 +38,11 @@ def create_task(self): attempts_no += 1 resources = self.task.resources - if resources and resources.ram_gb: - minimum_ram_gb = self.kubernetes_client_wrapper.minimum_ram_gb() - if resources.ram_gb < minimum_ram_gb: - self.task.resources.ram_gb = minimum_ram_gb + minimum_ram_gb = self.kubernetes_client_wrapper.minimum_ram_gb() + if not self.task.resources: + self.task.resources = TesResources(cpu_cores=int(minimum_ram_gb)) + if resources and resources.ram_gb and resources.ram_gb < minimum_ram_gb: + self.task.resources.ram_gb = minimum_ram_gb task_master_job = ( self.tes_kubernetes_converter.from_tes_task_to_k8s_job( @@ -54,16 +55,20 @@ def create_task(self): self.tes_kubernetes_converter.from_tes_task_to_k8s_config_map( self.task, task_master_job, - # user + # self.user ) ) + + # Create ConfigMap and Job _ = self.kubernetes_client_wrapper.create_config_map( task_master_config_map ) created_job = self.kubernetes_client_wrapper.create_job(task_master_job) - print(task_master_config_map) - print(task_master_job) - return created_job.metadata.name + + assert created_job.metadata is not None + assert created_job.metadata.name is not None + + return {"id": 
created_job.metadata.name} except KubernetesError as e: if ( @@ -75,7 +80,8 @@ def create_task(self): except Exception as exc: logging.error("ERROR: In createTask", exc_info=True) raise exc + return {} # Dummy return to silence mypy - def response(self) -> dict: + def response(self) -> dict[str, str]: """Create response.""" return self.create_task() diff --git a/tesk/api/kubernetes/client_wrapper.py b/tesk/api/kubernetes/client_wrapper.py index 01d25e5..de138cc 100644 --- a/tesk/api/kubernetes/client_wrapper.py +++ b/tesk/api/kubernetes/client_wrapper.py @@ -4,9 +4,16 @@ from typing import Optional from kubernetes import client, config -from kubernetes.client import V1ConfigMap, V1Job, V1LimitRangeList +from kubernetes.client import ( + V1ConfigMap, + V1Job, + V1LabelSelector, + V1LimitRangeList, + V1PodList, +) from kubernetes.utils.quantity import parse_quantity # type: ignore +from tesk.api.kubernetes.constants import Constants from tesk.constants import TeskConstants from tesk.exceptions import KubernetesError, NotFound @@ -26,6 +33,7 @@ def __init__(self, namespace=TeskConstants.tesk_namespace): self.batch_api = client.BatchV1Api() self.core_api = client.CoreV1Api() self.namespace = namespace + self.constant = Constants() def create_job(self, job: V1Job) -> V1Job: """Create a job in the Kubernetes cluster. @@ -73,9 +81,11 @@ def read_taskmaster_job(self, task_id: str) -> V1Job: name=task_id, namespace=self.namespace ) if ( - # TODO: Put all these string in constants.py - "job-type" in job.metadata.labels - and job.metadata.labels["job-type"] == "taskmaster" + job.metadata + and job.metadata.labels + and self.constant.label_jobtype_key in job.metadata.labels + and job.metadata.labels[self.constant.label_jobtype_key] + == self.constant.label_jobtype_value_taskm ): return job except KubernetesError as e: @@ -103,13 +113,14 @@ def list_limits(self, label_selector=None, limit=None) -> V1LimitRangeList: """List limit ranges in the Kubernetes cluster. Args: - label_selector: Label selector to filter limit ranges. - limit: Maximum number of limit ranges to return. + label_selector: Label selector to filter limit ranges. + limit: Maximum number of limit ranges to return. """ try: - return self.core_api.list_namespaced_limit_range( + limits: V1LimitRangeList = self.core_api.list_namespaced_limit_range( namespace=self.namespace, label_selector=label_selector, limit=limit ) + return limits except KubernetesError as e: logger.error(f"Exception when listing limits: {e}") raise @@ -118,17 +129,18 @@ def minimum_ram_gb(self) -> float: """Get the minimum amount of RAM in the cluster. Returns: - Minimum amount of RAM in the cluster in GB. + Minimum amount of RAM in the cluster in GB. 
""" try: min_ram = 0 limits = self.list_limits().items for limit in limits: - for item in limit.spec.limits: - if "memory" in item.min: - mem_quantity = item.min["memory"] - mem_bytes = self.quantity_to_bytes(mem_quantity) - min_ram = max(min_ram, mem_bytes) + if limit.spec: + for item in limit.spec.limits: + if item.min and "memory" in item.min: + mem_quantity = item.min["memory"] + mem_bytes = self.quantity_to_bytes(mem_quantity) + min_ram = max(min_ram, mem_bytes) return min_ram / (1024**3) except (ValueError, TypeError) as e: logger.error(f"Error in minimum_ram_gb: {e}") @@ -139,21 +151,32 @@ def minimum_ram_gb(self) -> float: def quantity_to_bytes(self, quantity: str) -> int: """Convert quantity(resource) to bytes.""" - return parse_quantity(quantity) + parsed_quantity: int = parse_quantity(quantity) + return parsed_quantity - def list_single_job_pods(self, job: V1Job): + def list_single_job_pods(self, job: V1Job) -> V1PodList: """List pods associated with a single job. Args: job: Job object to list pods for. """ - label_selector = ",".join( - f"{k}={v}" for k, v in job.spec.selector.match_labels.items() - ) try: - return self.core_api.list_namespaced_pod( - namespace=self.namespace, label_selector=label_selector - ) + if ( + job.spec + and job.spec.selector + and isinstance(job.spec.selector, V1LabelSelector) + and job.spec.selector.match_labels + ): + label_selector = ",".join( + f"{k}={v}" for k, v in job.spec.selector.match_labels.items() + ) + namespaced_pods: V1PodList = self.core_api.list_namespaced_pod( + namespace=self.namespace, label_selector=label_selector + ) + return namespaced_pods + else: + logger.error("Job spec, selector, or match_labels is None or invalid") + return V1PodList(items=[]) except KubernetesError as e: logger.error(f"Exception when listing pods: {e}") raise diff --git a/tesk/api/kubernetes/constants.py b/tesk/api/kubernetes/constants.py index 6163e57..82f8d86 100644 --- a/tesk/api/kubernetes/constants.py +++ b/tesk/api/kubernetes/constants.py @@ -16,7 +16,7 @@ class Constants(BaseModel): default="executors", description="Key in JSON taskmaster input, which holds list of executors", ) - volume_name: str = Field("PVC", description="Volume name") + volume_name: str = Field(default="PVC", description="Volume name") job_create_attempts_no: int = Field( default=5, description="Number of attempts of job creation in case of name collision", @@ -38,10 +38,8 @@ class Constants(BaseModel): ) job_name_exec_no_length: int = Field( default=2, - description=( - "No of digits reserved for executor number in executor's job name. Ends up " - "padded with '0' for numbers < 10", - ), + description="No of digits reserved for executor number in executor's job name." 
+ " Ends up padded with '0' for numbers < 10", ) job_name_filer_suf: str = Field( default="-outputs-filer", description="Output filer name suffix" @@ -55,23 +53,18 @@ class Constants(BaseModel): ) ann_json_input_key: str = Field( default="json-input", - description=( - "Key of the annotation, that stores whole input TES task serialized to " - "JSON", - ), + description="Key of the annotation, that stores whole input TES task serialized" + " to JSON", ) label_testask_id_key: str = Field( default="taskmaster-name", - description=( - "Key of the label, that stores taskmaster's name (==TES task generated ID)" - " in executor jobs", - ), + description="Key of the label, that stores taskmaster's name (==TES task " + "generated ID) in executor jobs", ) label_jobtype_key: str = Field( default="job-type", - description=( - "Key of the label, that stores type of a job (taskmaster or executor)", - ), + description="Key of the label, that stores type of a job (taskmaster or " + "executor)", ) label_jobtype_value_taskm: str = Field( default="taskmaster", @@ -104,30 +97,26 @@ class Constants(BaseModel): ) absolute_path_message: str = Field( default="must be an absolute path", - description=( - "Message for absolute path validation (to avoid message.properties)", - ), + description="Message for absolute path validation (to avoid " + "message.properties)", + ) + resource_disk_default: float = Field( + default=0.1, description="Default resource disk value" ) - resource_disk_default: float = Field(0.1, description="Default resource disk value") completed_states: Set[str] = Field( default={"CANCELED", "COMPLETE", "EXECUTOR_ERROR", "SYSTEM_ERROR"}, - description=( - "TES task states, indicating task is not running and cannot be cancelled", - ), + description="TES task states, indicating task is not running and cannot be " + "cancelled", ) ftp_secret_username_env: str = Field( default="TESK_FTP_USERNAME", - description=( - "Name of taskmaster's ENV variable with username of FTP account used for " - "storage", - ), + description="Name of taskmaster's ENV variable with username of FTP account " + "used for storage", ) ftp_secret_password_env: str = Field( default="TESK_FTP_PASSWORD", - description=( - "Name of taskmaster's ENV variable with password of FTP account used for " - "storage", - ), + description="Name of taskmaster's ENV variable with password of FTP account " + "used for storage", ) cancel_patch: str = Field( default='{"metadata":{"labels":{"task-status":"Cancelled"}}}', diff --git a/tesk/api/kubernetes/convert/converter.py b/tesk/api/kubernetes/convert/converter.py index a87273d..7908563 100644 --- a/tesk/api/kubernetes/convert/converter.py +++ b/tesk/api/kubernetes/convert/converter.py @@ -6,14 +6,17 @@ import logging from decimal import Decimal from io import BytesIO -from typing import Any +from typing import Any, Optional from kubernetes.client import ( V1ConfigMap, V1ConfigMapVolumeSource, V1Container, V1EnvVar, + V1JobSpec, V1ObjectMeta, + V1PodSpec, + V1PodTemplateSpec, V1ResourceRequirements, V1Volume, ) @@ -48,56 +51,62 @@ def __init__(self, namespace=TeskConstants.tesk_namespace): self.k8s_constants = K8sConstants() self.namespace = namespace - # TODO: Add user to the method when auth implemented in FOCA def from_tes_task_to_k8s_job(self, task: TesTask): """Convert TES task to Kubernetes job.""" - taskmsater_job: V1Job = KubernetesTemplateSupplier( + taskmaster_job: V1Job = KubernetesTemplateSupplier( self.namespace ).task_master_template() - if taskmsater_job.metadata is None: - 
taskmsater_job.metadata = V1ObjectMeta() + if taskmaster_job.metadata is None: + taskmaster_job.metadata = V1ObjectMeta() - if taskmsater_job.metadata.annotations is None: - taskmsater_job.metadata.annotations = {} + if taskmaster_job.metadata.annotations is None: + taskmaster_job.metadata.annotations = {} - if taskmsater_job.metadata.labels is None: - taskmsater_job.metadata.labels = {} + if taskmaster_job.metadata.labels is None: + taskmaster_job.metadata.labels = {} - # taskmsater_job.metadata.name = task.name - - taskmsater_job.metadata.annotations[self.constants.ann_testask_name_key] = ( - task.name - ) - # taskmsater_job.metadata.labels[self.constants.label_userid_key] = user[ + # taskmaster_job.metadata.name = task.name + if task.name: + taskmaster_job.metadata.annotations[self.constants.ann_testask_name_key] = ( + task.name + ) + # taskmaster_job.metadata.labels[self.constants.label_userid_key] = user[ # "username" # ] # if task.tags and "GROUP_NAME" in task.tags: - # taskmsater_job.metadata.labels[self.constants.label_userid_key] = task[ + # taskmaster_job.metadata.labels[self.constants.label_userid_key] = task[ # "tags" # ]["GROUP_NAME"] # elif user["is_member"]: - # taskmsater_job.metadata.labels[self.constants.label_groupname_key] = user[ + # taskmaster_job.metadata.labels[self.constants.label_groupname_key] = user[ # "any_group" # ] try: - taskmsater_job.metadata.annotations[self.constants.ann_json_input_key] = ( + taskmaster_job.metadata.annotations[self.constants.ann_json_input_key] = ( task.json() ) except Exception as ex: logger.info( - f"Serializing task {taskmsater_job.metadata.name} to JSON failed", ex + f"Serializing task {taskmaster_job.metadata.name} to JSON failed", ex ) volume = V1Volume( name="jsoninput", - config_map=V1ConfigMapVolumeSource(name=taskmsater_job.metadata.name), + config_map=V1ConfigMapVolumeSource(name=taskmaster_job.metadata.name), ) - taskmsater_job.spec.template.spec.volumes.append(volume) - return taskmsater_job + if taskmaster_job.spec is None: + taskmaster_job.spec = V1JobSpec(template=V1PodTemplateSpec()) + if taskmaster_job.spec.template.spec is None: + taskmaster_job.spec.template.spec = V1PodSpec(containers=[]) + if taskmaster_job.spec.template.spec.volumes is None: + taskmaster_job.spec.template.spec.volumes = [] + + taskmaster_job.spec.template.spec.volumes.append(volume) + return taskmaster_job def from_tes_task_to_k8s_config_map( self, @@ -106,10 +115,14 @@ def from_tes_task_to_k8s_config_map( # user, ) -> V1ConfigMap: """Create a Kubernetes ConfigMap from a TES task.""" + if job.metadata is None: + job.metadata = V1ObjectMeta() + task_master_config_map = V1ConfigMap( metadata=V1ObjectMeta(name=job.metadata.name) ) + assert task_master_config_map.metadata is not None task_master_config_map.metadata.labels = ( task_master_config_map.metadata.labels or {} ) @@ -117,14 +130,15 @@ def from_tes_task_to_k8s_config_map( task_master_config_map.metadata.annotations or {} ) - task_master_config_map.metadata.annotations[ - self.constants.ann_testask_name_key - ] = task.name + if task.name: + task_master_config_map.metadata.annotations[ + self.constants.ann_testask_name_key + ] = task.name # task_master_config_map.metadata.labels[self.constants.label_userid_key] # = user["username"] - if "tags" in task and "GROUP_NAME" in task.tags: + if task.tags and "GROUP_NAME" in task.tags: task_master_config_map.metadata.labels[ self.constants.label_groupname_key ] = task.tags["GROUP_NAME"] @@ -132,6 +146,9 @@ def from_tes_task_to_k8s_config_map( # 
task_master_config_map.metadata.labels[self.constants.label_groupname_key] # = user["any_group"] + assert task_master_config_map.metadata.name is not None + assert task.resources is not None + executors_as_jobs = [ self.from_tes_executor_to_k8s_job( task_master_config_map.metadata.name, @@ -145,10 +162,14 @@ def from_tes_task_to_k8s_config_map( ] task_master_input: dict[str, Any] = { - "inputs": pydantic_model_list_json(task.inputs) or [], - "outputs": pydantic_model_list_json(task.outputs) or [], + "inputs": pydantic_model_list_json(task.inputs) if task.inputs else [], + "outputs": pydantic_model_list_json(task.outputs) if task.outputs else [], "volumes": task.volumes or [], - "resources": {"disk_gb": float(task.resources.disk_gb) or 10.0}, + "resources": { + "disk_gb": float(task.resources.disk_gb) + if task.resources.disk_gb + else 10.0 + }, } task_master_input[self.constants.taskmaster_input_exec_key] = [ exec_job.to_dict() for exec_job in executors_as_jobs @@ -186,7 +207,7 @@ def decimal_to_float(obj): def from_tes_executor_to_k8s_job( # noqa: PLR0913 self, generated_task_id: str, - tes_task_name: str, + tes_task_name: Optional[str], executor: TesExecutor, executor_index: int, resources: TesResources, @@ -197,11 +218,15 @@ def from_tes_executor_to_k8s_job( # noqa: PLR0913 job: V1Job = self.template_supplier.executor_template() # Set executors name based on taskmaster's job name + # TODO: Fix me ASAP Job(job).change_job_name( # Task(job, generated_task_id).get_executor_name(executor_index) "newname" ) + if job.metadata is None: + job.metadata = V1ObjectMeta() + # Put arbitrary labels and annotations job.metadata.labels = job.metadata.labels or {} job.metadata.labels[self.constants.label_testask_id_key] = generated_task_id @@ -209,7 +234,15 @@ def from_tes_executor_to_k8s_job( # noqa: PLR0913 # job.metadata.labels[self.constants.label_userid_key] = user.username job.metadata.annotations = job.metadata.annotations or {} - job.metadata.annotations[self.constants.ann_testask_name_key] = tes_task_name + if tes_task_name: + job.metadata.annotations[self.constants.ann_testask_name_key] = ( + tes_task_name + ) + + if job.spec is None: + job.spec = V1JobSpec(template=V1PodTemplateSpec()) + if job.spec.template.spec is None: + job.spec.template.spec = V1PodSpec(containers=[]) container: V1Container = job.spec.template.spec.containers[0] @@ -235,9 +268,10 @@ def from_tes_executor_to_k8s_job( # noqa: PLR0913 container.env = [] container.working_dir = executor.workdir - container.resources = V1ResourceRequirements(requests={}) + assert container.resources.requests is not None + if resources.cpu_cores: container.resources.requests["cpu"] = parse_quantity( str(resources.cpu_cores) diff --git a/tesk/api/kubernetes/convert/data/job.py b/tesk/api/kubernetes/convert/data/job.py index 3fb1333..8a30907 100644 --- a/tesk/api/kubernetes/convert/data/job.py +++ b/tesk/api/kubernetes/convert/data/job.py @@ -6,7 +6,7 @@ from typing import List, Optional -from kubernetes.client import V1Job, V1Pod +from kubernetes.client import V1Job, V1ObjectMeta, V1Pod class Job: @@ -51,11 +51,22 @@ def change_job_name(self, new_name: str): Also the names in its metadata and container specs. 
""" - self.job.metadata.name = new_name - self.job.spec.template.metadata.name = new_name - if self.job.spec.template.spec.containers: - self.job.spec.template.spec.containers[0].name = new_name + if self.job.metadata is None: + self.job.metadata = V1ObjectMeta(name=new_name) + else: + self.job.metadata.name = new_name + + if ( + self.job is not None + and self.job.spec is not None + and self.job.spec.template is not None + and self.job.spec.template.metadata is not None + ): + self.job.spec.template.metadata.name = new_name + + if self.job.spec.template.spec and self.job.spec.template.spec.containers: + self.job.spec.template.spec.containers[0].name = new_name def get_job_name(self) -> Optional[str]: """Returns the job name.""" - return self.job.metadata.name + return self.job.metadata.name if self.job.metadata else None diff --git a/tesk/api/kubernetes/convert/data/task.py b/tesk/api/kubernetes/convert/data/task.py index ab49cf0..49fc5de 100644 --- a/tesk/api/kubernetes/convert/data/task.py +++ b/tesk/api/kubernetes/convert/data/task.py @@ -34,10 +34,17 @@ def __init__( self.executors_by_name: Dict[str, Job] = {} self.output_filer: Optional[Job] = None self.constants = Constants() + self.MAX_INT = 2**31 - 1 - def add_executor(self, executor: Job): + def add_executor(self, executor: Job) -> None: """Add executor to the task.""" - self.executors_by_name.setdefault(executor.get_job().metadata.name, executor) + metadata = executor.get_job().metadata + assert metadata is not None + + name = metadata.name + assert name is not None + + self.executors_by_name.setdefault(name, executor) def set_output_filer(self, filer: Job): """Set output filer for the task.""" @@ -65,11 +72,19 @@ def get_output_filer(self) -> Optional[Job]: def extract_executor_number(self, executor: Job) -> int: """Extract executor number from the executor's name.""" taskmaster_name = self.taskmaster.get_job_name() + assert taskmaster_name is not None + prefix = taskmaster_name + self.constants.job_name_exec_prefix - match = re.match(f"{re.escape(prefix)}(\d+)", executor.get_job_name()) + exec_name = executor.get_job_name() + + if not exec_name: + return self.MAX_INT + + match = re.match(f"{re.escape(prefix)}(\d+)", exec_name) if match: return int(match.group(1)) - return float("inf") + + return self.MAX_INT def get_executor_name(self, executor_index: int) -> str: """Get executor name based on the taskmaster's job name and executor index.""" diff --git a/tesk/api/kubernetes/convert/executor_command_wrapper.py b/tesk/api/kubernetes/convert/executor_command_wrapper.py index 5fde24c..08e515f 100644 --- a/tesk/api/kubernetes/convert/executor_command_wrapper.py +++ b/tesk/api/kubernetes/convert/executor_command_wrapper.py @@ -41,13 +41,14 @@ def get_commands_with_stream_redirects(self) -> List[str]: command_parts = [] for command_part in self.executor.command: + command = "" if self.SPECIAL_CHARS.search(command_part): # Replace single quotes with '"'"' if "'" in command_part: replace_command_part = command_part.replace("'", "'\"'\"'") # Quote the command part - quote_command_part = f"'{replace_command_part}'" - command_parts.append(quote_command_part) + command = f"'{replace_command_part}'" + command_parts.append(command) if self.executor.stdin: command_parts.append("<") diff --git a/tesk/api/kubernetes/convert/taskmaster_env_properties.py b/tesk/api/kubernetes/convert/taskmaster_env_properties.py index d0f7ef7..79d7df5 100644 --- a/tesk/api/kubernetes/convert/taskmaster_env_properties.py +++ 
b/tesk/api/kubernetes/convert/taskmaster_env_properties.py @@ -69,11 +69,6 @@ class TaskmasterEnvProperties(BaseModel): serviceAccountName: str = Field( default="default", description="Service Account name for taskmaster" ) - debug: bool = Field( - default=False, - description="If verbose (debug) mode of taskmaster is on (passes additional " - "flag to taskmaster and sets image pull policy to Always)", - ) executorSecret: Optional[ExecutorSecret] = Field( default=None, description="Executor secret configuration" ) diff --git a/tesk/api/kubernetes/convert/template.py b/tesk/api/kubernetes/convert/template.py index 8940c75..6457686 100644 --- a/tesk/api/kubernetes/convert/template.py +++ b/tesk/api/kubernetes/convert/template.py @@ -2,6 +2,7 @@ import logging import uuid +from typing import Iterable from kubernetes.client import ( V1Container, @@ -41,12 +42,18 @@ def __init__(self, namespace=TeskConstants.tesk_namespace, security_context=None def get_task_master_name(self) -> str: """Generate a unique name for the taskmaster job.""" - return self.constants.job_name_taskm_prefix + str(uuid.uuid4()) + name: str = self.constants.job_name_taskm_prefix + str(uuid.uuid4()) + return name def task_master_template(self) -> V1Job: """Create a template for the taskmaster job.""" job: V1Job = self.taskmaster_template + if job.spec is None: + job.spec = V1JobSpec(template=V1PodTemplateSpec()) + if job.spec.template.spec is None: + job.spec.template.spec = V1PodSpec(containers=[]) + job.spec.template.spec.service_account_name = ( self.taskmaster_env_properties.serviceAccountName ) @@ -56,6 +63,9 @@ def task_master_template(self) -> V1Job: f"{self.taskmaster_env_properties.imageName}:" f"{self.taskmaster_env_properties.imageVersion}" ) + + assert isinstance(container.args, Iterable) + container.args.extend( [ "-n", @@ -71,12 +81,19 @@ def task_master_template(self) -> V1Job: container.args.append("-d") container.image_pull_policy = "Always" + if job.metadata is None: + job.metadata = V1ObjectMeta(labels={}) + + assert job.metadata.labels is not None + job.metadata.labels[self.constants.label_jobtype_key] = ( self.constants.label_jobtype_value_taskm ) task_master_name = self.get_task_master_name() job.metadata.name = task_master_name + assert isinstance(container.env, Iterable) + container.env.extend( [ V1EnvVar(name=key.upper().replace(".", "_"), value=value) @@ -97,6 +114,8 @@ def task_master_template(self) -> V1Job: if self.taskmaster_env_properties.ftp.enabled: for env in container.env: if env.name in ftp_secrets: + assert env.value_from is not None + assert env.value_from.secret_key_ref is not None env.value_from.secret_key_ref.name = ( self.taskmaster_env_properties.ftp.secretName ) @@ -111,8 +130,10 @@ def executor_template(self) -> V1Job: container.volume_mounts = [ V1VolumeMount( read_only=True, - name=self.taskmaster_env_properties.executorSecret.name, - mount_path=self.taskmaster_env_properties.executorSecret.mountPath, + name=str(self.taskmaster_env_properties.executorSecret.name), + mount_path=str( + self.taskmaster_env_properties.executorSecret.mountPath + ), ) ] @@ -140,9 +161,14 @@ def executor_template(self) -> V1Job: ) if self.taskmaster_env_properties.executorSecret is not None: + if job.spec is None: + job.spec = V1JobSpec(template=V1PodTemplateSpec()) + if job.spec.template.spec is None: + job.spec.template.spec = V1PodSpec(containers=[]) + job.spec.template.spec.volumes = [ V1Volume( - name=self.taskmaster_env_properties.executorSecret.name, + 
name=str(self.taskmaster_env_properties.executorSecret.name), secret=V1SecretVolumeSource( secret_name=self.taskmaster_env_properties.executorSecret.name ), diff --git a/tesk/utils.py b/tesk/utils.py index a4a887f..b8ea88e 100644 --- a/tesk/utils.py +++ b/tesk/utils.py @@ -2,7 +2,7 @@ import os from pathlib import Path -from typing import Any, List, Optional +from typing import Any, List, Optional, Sequence from foca import Foca from kubernetes.client.models import ( @@ -193,7 +193,7 @@ def convert_job_metadata(job_metadata: JobMetadata) -> V1ObjectMeta: ) -def pydantic_model_list_json(model_list: List[BaseModel]) -> List[dict[str, Any]]: +def pydantic_model_list_json(model_list: Sequence[BaseModel]) -> List[dict[str, Any]]: """Convert a list of pydantic models to a list of JSON objects.""" json_list = [] for item in model_list:
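
Note: the changes above repeatedly apply the same null-guard pattern before mutating `metadata`, `spec`, and the nested pod template fields of Kubernetes client models (in converter.py, template.py, and data/job.py), because the generated client models default those attributes to `None`. The following is a minimal standalone sketch of that pattern for reference only; it is not part of the patch, and the helper name `ensure_job_skeleton` is illustrative.

# Sketch of the defensive-initialization pattern used throughout this PR.
# Assumes the standard `kubernetes` Python client; the helper name is hypothetical.
from kubernetes.client import (
    V1Job,
    V1JobSpec,
    V1ObjectMeta,
    V1PodSpec,
    V1PodTemplateSpec,
    V1Volume,
)


def ensure_job_skeleton(job: V1Job) -> V1Job:
    """Fill in any None metadata/spec fields so later attribute access is safe."""
    if job.metadata is None:
        job.metadata = V1ObjectMeta()
    if job.metadata.annotations is None:
        job.metadata.annotations = {}
    if job.metadata.labels is None:
        job.metadata.labels = {}
    if job.spec is None:
        job.spec = V1JobSpec(template=V1PodTemplateSpec())
    if job.spec.template.spec is None:
        job.spec.template.spec = V1PodSpec(containers=[])
    if job.spec.template.spec.volumes is None:
        job.spec.template.spec.volumes = []
    return job


# Usage: nested fields can now be mutated without AttributeError on None.
job = ensure_job_skeleton(V1Job())
job.spec.template.spec.volumes.append(V1Volume(name="jsoninput"))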