Skip to content

Commit

Permalink
make mypy happy
Browse files Browse the repository at this point in the history
  • Loading branch information
JaeAeich committed Aug 4, 2024
1 parent 89e2dfb commit f9b630b
Show file tree
Hide file tree
Showing 12 changed files with 222 additions and 123 deletions.
7 changes: 3 additions & 4 deletions tesk/api/ga4gh/tes/controllers.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,20 @@ def CancelTask(id, *args, **kwargs) -> dict: # type: ignore

# POST /tasks
@log_traffic
def CreateTask(**kwargs) -> dict: # type: ignore
def CreateTask(**kwargs) -> dict[str, str]: # type: ignore
"""Create task.
Args:
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
"""
try:
request_body = kwargs.get("body")
if request_body is None:
logger("Nothing received in request body.")
logger.error("Nothing received in request body.")
raise BadRequest("No request body received.")
tes_task = TesTask(**request_body)
namespace = "tesk"
CreateTesTask(tes_task, namespace).response()
return CreateTesTask(tes_task, namespace).response()
except Exception as e:
raise InternalServerError from e

Expand Down
2 changes: 1 addition & 1 deletion tesk/api/ga4gh/tes/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ class TesResources(BaseModel):
example={"VmSize": "Standard_D64_v3"},
)
backend_parameters_strict: Optional[bool] = Field(
False,
default=False,
description="If set to true, backends should fail the task if any "
"backend_parameters\nkey/values are unsupported, otherwise, backends should "
"attempt to run the task",
Expand Down
28 changes: 17 additions & 11 deletions tesk/api/ga4gh/tes/task/create_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import logging

from tesk.api.ga4gh.tes.models import TesTask
from tesk.api.ga4gh.tes.models import TesResources, TesTask
from tesk.api.kubernetes.client_wrapper import KubernetesClientWrapper
from tesk.api.kubernetes.constants import Constants
from tesk.api.kubernetes.convert.converter import TesKubernetesConverter
Expand Down Expand Up @@ -30,18 +30,19 @@ def __init__(self, task: TesTask, namespace=TeskConstants.tesk_namespace):
self.tes_kubernetes_converter = TesKubernetesConverter(self.namespace)
self.constants = Constants()

def create_task(self):
def create_task(self) -> dict[str, str]:
"""Create TES task."""
attempts_no = 0
while attempts_no < self.constants.job_create_attempts_no:
try:
attempts_no += 1
resources = self.task.resources

if resources and resources.ram_gb:
minimum_ram_gb = self.kubernetes_client_wrapper.minimum_ram_gb()
if resources.ram_gb < minimum_ram_gb:
self.task.resources.ram_gb = minimum_ram_gb
minimum_ram_gb = self.kubernetes_client_wrapper.minimum_ram_gb()
if not self.task.resources:
self.task.resources = TesResources(cpu_cores=int(minimum_ram_gb))
if resources and resources.ram_gb and resources.ram_gb < minimum_ram_gb:
self.task.resources.ram_gb = minimum_ram_gb

task_master_job = (
self.tes_kubernetes_converter.from_tes_task_to_k8s_job(
Expand All @@ -54,16 +55,20 @@ def create_task(self):
self.tes_kubernetes_converter.from_tes_task_to_k8s_config_map(
self.task,
task_master_job,
# user
# self.user
)
)

# Create ConfigMap and Job
_ = self.kubernetes_client_wrapper.create_config_map(
task_master_config_map
)
created_job = self.kubernetes_client_wrapper.create_job(task_master_job)
print(task_master_config_map)
print(task_master_job)
return created_job.metadata.name

assert created_job.metadata is not None
assert created_job.metadata.name is not None

return {"id": created_job.metadata.name}

except KubernetesError as e:
if (
Expand All @@ -75,7 +80,8 @@ def create_task(self):
except Exception as exc:
logging.error("ERROR: In createTask", exc_info=True)
raise exc
return {} # Dummy return to silence mypy

def response(self) -> dict:
def response(self) -> dict[str, str]:
"""Create response."""
return self.create_task()
65 changes: 44 additions & 21 deletions tesk/api/kubernetes/client_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,16 @@
from typing import Optional

from kubernetes import client, config
from kubernetes.client import V1ConfigMap, V1Job, V1LimitRangeList
from kubernetes.client import (
V1ConfigMap,
V1Job,
V1LabelSelector,
V1LimitRangeList,
V1PodList,
)
from kubernetes.utils.quantity import parse_quantity # type: ignore

from tesk.api.kubernetes.constants import Constants
from tesk.constants import TeskConstants
from tesk.exceptions import KubernetesError, NotFound

Expand All @@ -26,6 +33,7 @@ def __init__(self, namespace=TeskConstants.tesk_namespace):
self.batch_api = client.BatchV1Api()
self.core_api = client.CoreV1Api()
self.namespace = namespace
self.constant = Constants()

def create_job(self, job: V1Job) -> V1Job:
"""Create a job in the Kubernetes cluster.
Expand Down Expand Up @@ -73,9 +81,11 @@ def read_taskmaster_job(self, task_id: str) -> V1Job:
name=task_id, namespace=self.namespace
)
if (
# TODO: Put all these string in constants.py
"job-type" in job.metadata.labels
and job.metadata.labels["job-type"] == "taskmaster"
job.metadata
and job.metadata.labels
and self.constant.label_jobtype_key in job.metadata.labels
and job.metadata.labels[self.constant.label_jobtype_key]
== self.constant.label_jobtype_value_taskm
):
return job
except KubernetesError as e:
Expand Down Expand Up @@ -103,13 +113,14 @@ def list_limits(self, label_selector=None, limit=None) -> V1LimitRangeList:
"""List limit ranges in the Kubernetes cluster.
Args:
label_selector: Label selector to filter limit ranges.
limit: Maximum number of limit ranges to return.
label_selector: Label selector to filter limit ranges.
limit: Maximum number of limit ranges to return.
"""
try:
return self.core_api.list_namespaced_limit_range(
limits: V1LimitRangeList = self.core_api.list_namespaced_limit_range(
namespace=self.namespace, label_selector=label_selector, limit=limit
)
return limits
except KubernetesError as e:
logger.error(f"Exception when listing limits: {e}")
raise
Expand All @@ -118,17 +129,18 @@ def minimum_ram_gb(self) -> float:
"""Get the minimum amount of RAM in the cluster.
Returns:
Minimum amount of RAM in the cluster in GB.
Minimum amount of RAM in the cluster in GB.
"""
try:
min_ram = 0
limits = self.list_limits().items
for limit in limits:
for item in limit.spec.limits:
if "memory" in item.min:
mem_quantity = item.min["memory"]
mem_bytes = self.quantity_to_bytes(mem_quantity)
min_ram = max(min_ram, mem_bytes)
if limit.spec:
for item in limit.spec.limits:
if item.min and "memory" in item.min:
mem_quantity = item.min["memory"]
mem_bytes = self.quantity_to_bytes(mem_quantity)
min_ram = max(min_ram, mem_bytes)
return min_ram / (1024**3)
except (ValueError, TypeError) as e:
logger.error(f"Error in minimum_ram_gb: {e}")
Expand All @@ -139,21 +151,32 @@ def minimum_ram_gb(self) -> float:

def quantity_to_bytes(self, quantity: str) -> int:
"""Convert quantity(resource) to bytes."""
return parse_quantity(quantity)
parsed_quantity: int = parse_quantity(quantity)
return parsed_quantity

def list_single_job_pods(self, job: V1Job):
def list_single_job_pods(self, job: V1Job) -> V1PodList:
"""List pods associated with a single job.
Args:
job: Job object to list pods for.
"""
label_selector = ",".join(
f"{k}={v}" for k, v in job.spec.selector.match_labels.items()
)
try:
return self.core_api.list_namespaced_pod(
namespace=self.namespace, label_selector=label_selector
)
if (
job.spec
and job.spec.selector
and isinstance(job.spec.selector, V1LabelSelector)
and job.spec.selector.match_labels
):
label_selector = ",".join(
f"{k}={v}" for k, v in job.spec.selector.match_labels.items()
)
namespaced_pods: V1PodList = self.core_api.list_namespaced_pod(
namespace=self.namespace, label_selector=label_selector
)
return namespaced_pods
else:
logger.error("Job spec, selector, or match_labels is None or invalid")
return V1PodList(items=[])
except KubernetesError as e:
logger.error(f"Exception when listing pods: {e}")
raise
Expand Down
51 changes: 20 additions & 31 deletions tesk/api/kubernetes/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class Constants(BaseModel):
default="executors",
description="Key in JSON taskmaster input, which holds list of executors",
)
volume_name: str = Field("PVC", description="Volume name")
volume_name: str = Field(default="PVC", description="Volume name")
job_create_attempts_no: int = Field(
default=5,
description="Number of attempts of job creation in case of name collision",
Expand All @@ -38,10 +38,8 @@ class Constants(BaseModel):
)
job_name_exec_no_length: int = Field(
default=2,
description=(
"No of digits reserved for executor number in executor's job name. Ends up "
"padded with '0' for numbers < 10",
),
description="No of digits reserved for executor number in executor's job name."
" Ends up padded with '0' for numbers < 10",
)
job_name_filer_suf: str = Field(
default="-outputs-filer", description="Output filer name suffix"
Expand All @@ -55,23 +53,18 @@ class Constants(BaseModel):
)
ann_json_input_key: str = Field(
default="json-input",
description=(
"Key of the annotation, that stores whole input TES task serialized to "
"JSON",
),
description="Key of the annotation, that stores whole input TES task serialized"
" to JSON",
)
label_testask_id_key: str = Field(
default="taskmaster-name",
description=(
"Key of the label, that stores taskmaster's name (==TES task generated ID)"
" in executor jobs",
),
description="Key of the label, that stores taskmaster's name (==TES task "
"generated ID) in executor jobs",
)
label_jobtype_key: str = Field(
default="job-type",
description=(
"Key of the label, that stores type of a job (taskmaster or executor)",
),
description="Key of the label, that stores type of a job (taskmaster or "
"executor)",
)
label_jobtype_value_taskm: str = Field(
default="taskmaster",
Expand Down Expand Up @@ -104,30 +97,26 @@ class Constants(BaseModel):
)
absolute_path_message: str = Field(
default="must be an absolute path",
description=(
"Message for absolute path validation (to avoid message.properties)",
),
description="Message for absolute path validation (to avoid "
"message.properties)",
)
resource_disk_default: float = Field(
default=0.1, description="Default resource disk value"
)
resource_disk_default: float = Field(0.1, description="Default resource disk value")
completed_states: Set[str] = Field(
default={"CANCELED", "COMPLETE", "EXECUTOR_ERROR", "SYSTEM_ERROR"},
description=(
"TES task states, indicating task is not running and cannot be cancelled",
),
description="TES task states, indicating task is not running and cannot be "
"cancelled",
)
ftp_secret_username_env: str = Field(
default="TESK_FTP_USERNAME",
description=(
"Name of taskmaster's ENV variable with username of FTP account used for "
"storage",
),
description="Name of taskmaster's ENV variable with username of FTP account "
"used for storage",
)
ftp_secret_password_env: str = Field(
default="TESK_FTP_PASSWORD",
description=(
"Name of taskmaster's ENV variable with password of FTP account used for "
"storage",
),
description="Name of taskmaster's ENV variable with password of FTP account "
"used for storage",
)
cancel_patch: str = Field(
default='{"metadata":{"labels":{"task-status":"Cancelled"}}}',
Expand Down
Loading

0 comments on commit f9b630b

Please sign in to comment.