Skip to content

Commit

Permalink
chore: categorise consts
Browse files Browse the repository at this point in the history
  • Loading branch information
JaeAeich committed Aug 9, 2024
1 parent 40e2136 commit 3e86549
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 57 deletions.
75 changes: 57 additions & 18 deletions tesk/api/kubernetes/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from pydantic import BaseModel, Field


class Constants(BaseModel):
class JobConstants(BaseModel):
"""Constants related to job and tasks."""

taskmaster_input: str = Field(
Expand Down Expand Up @@ -45,6 +45,27 @@ class Constants(BaseModel):
job_name_filer_suf: str = Field(
default="-outputs-filer", description="Output filer name suffix"
)
resource_disk_default: float = Field(
default=0.1, description="Default resource disk value"
)
completed_states: Set[str] = Field(
default={"CANCELED", "COMPLETE", "EXECUTOR_ERROR", "SYSTEM_ERROR"},
description="TES task states, indicating task is not running and cannot be "
"cancelled",
)
executor_backoff_limit: str = Field(
default="EXECUTOR_BACKOFF_LIMIT",
description="Set a number of retries of a job execution.",
)
filer_backoff_limit: str = Field(
default="FILER_BACKOFF_LIMIT",
description="Set a number of retries of a filer job execution.",
)


class AnnotationConstants(BaseModel):
"""Constants related to Kubernetes annotations."""

ann_testask_name_key: str = Field(
default="tes-task-name",
description=(
Expand All @@ -57,6 +78,11 @@ class Constants(BaseModel):
description="Key of the annotation, that stores whole input TES task serialized"
" to JSON",
)


class LabelConstants(BaseModel):
"""Constants related to Kubernetes labels."""

label_testask_id_key: str = Field(
default="taskmaster-name",
description="Key of the label, that stores taskmaster's name (==TES task "
Expand Down Expand Up @@ -93,6 +119,11 @@ class Constants(BaseModel):
default="creator-group-name",
description="Key of the label, that holds user's group name",
)


class PathValidationConstants(BaseModel):
"""Constants related to path validation."""

absolute_path_regexp: str = Field(
default="^\\/.*", description="Pattern to validate paths"
)
Expand All @@ -101,14 +132,11 @@ class Constants(BaseModel):
description="Message for absolute path validation (to avoid "
"message.properties)",
)
resource_disk_default: float = Field(
default=0.1, description="Default resource disk value"
)
completed_states: Set[str] = Field(
default={"CANCELED", "COMPLETE", "EXECUTOR_ERROR", "SYSTEM_ERROR"},
description="TES task states, indicating task is not running and cannot be "
"cancelled",
)


class FTPConstants(BaseModel):
"""Constants related to FTP configuration."""

ftp_secret_username_env: str = Field(
default="TESK_FTP_USERNAME",
description="Name of taskmaster's ENV variable with username of FTP account "
Expand All @@ -119,18 +147,15 @@ class Constants(BaseModel):
description="Name of taskmaster's ENV variable with password of FTP account "
"used for storage",
)


class PatchConstants(BaseModel):
"""Constants related to patch operations."""

cancel_patch: str = Field(
default='{"metadata":{"labels":{"task-status":"Cancelled"}}}',
description="Patch object passed to job API, when cancelling task",
)
executor_backoff_limit: str = Field(
default="EXECUTOR_BACKOFF_LIMIT",
description="Set a number of retries of a job execution.",
)
filer_backoff_limit: str = Field(
default="FILER_BACKOFF_LIMIT",
description="Set a number of retries of a filer job execution.",
)


class K8sConstants(BaseModel):
Expand All @@ -145,7 +170,9 @@ class K8sConstants(BaseModel):
job_restart_policy: str = Field(
default="Never", description="Kubernetes Job restart policy"
)
resource_cpu_key: str = Field("cpu", description="Executor CPU resource label")
resource_cpu_key: str = Field(
default="cpu", description="Executor CPU resource label"
)
resource_mem_key: str = Field(
default="memory", description="Executor memory resource label"
)
Expand All @@ -164,3 +191,15 @@ class PodPhase(Enum):
def get_code(self) -> str:
"""Return the pod state."""
return self.value


class Constants(BaseModel):
"""All the constants related to k8s and job creation."""

job_constants: JobConstants = JobConstants()
annotation_constants: AnnotationConstants = AnnotationConstants()
label_constants: LabelConstants = LabelConstants()
path_validation_constants: PathValidationConstants = PathValidationConstants()
ftp_constants: FTPConstants = FTPConstants()
patch_constants: PatchConstants = PatchConstants()
k8s_constants: K8sConstants = K8sConstants()
62 changes: 23 additions & 39 deletions tesk/api/kubernetes/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,7 @@ class KubernetesClientWrapper:
"""Kubernetes client wrapper class."""

def __init__(self):
"""Initialize the Kubernetes client wrapper.
Args:
namespace: Namespace to use for Kubernetes.
"""
"""Initialize the Kubernetes client wrapper."""
config.load_kube_config()
self.batch_api = client.BatchV1Api()
self.core_api = client.CoreV1Api()
Expand All @@ -40,7 +36,7 @@ def create_job(self, job: V1Job) -> V1Job:
"""Create a job in the Kubernetes cluster.
Returns:
Job object created in the Kubernetes cluster.
Job object created in the Kubernetes cluster.
"""
try:
v1_job: V1Job = self.batch_api.create_namespaced_job(
Expand Down Expand Up @@ -72,10 +68,10 @@ def read_taskmaster_job(self, task_id: str) -> V1Job:
task_id: Task identifier.
Returns:
Job object read from the Kubernetes cluster
Job object read from the Kubernetes cluster
Raises:
Exception: If the task is not found.
Exception: If the task is not found.
"""
try:
job: V1Job = self.batch_api.read_namespaced_job(
Expand All @@ -84,9 +80,10 @@ def read_taskmaster_job(self, task_id: str) -> V1Job:
if (
job.metadata
and job.metadata.labels
and self.constant.label_jobtype_key in job.metadata.labels
and job.metadata.labels[self.constant.label_jobtype_key]
== self.constant.label_jobtype_value_taskm
and self.constant.label_constants.label_jobtype_key
in job.metadata.labels
and job.metadata.labels[self.constant.label_constants.label_jobtype_key]
== self.constant.label_constants.label_jobtype_value_taskm
):
return job
except KubernetesError as e:
Expand All @@ -101,10 +98,10 @@ def list_jobs(
"""List jobs in the Kubernetes cluster.
Args:
page_token: pageToken supplied by user (from previous result; points to
next page of results)
label_selector: Label selector to filter jobs.
limit: Maximum number of jobs to return.
page_token: pageToken supplied by user (from previous result; points to
next page of results)
label_selector: Label selector to filter jobs.
limit: Maximum number of jobs to return.
"""
try:
return self.batch_api.list_namespaced_job(
Expand Down Expand Up @@ -166,53 +163,40 @@ def list_all_taskmaster_jobs_for_user(
self,
page_token: str,
items_per_page: int,
# user: str
) -> V1JobList:
"""Gets all Taskmaster job objects, a User is allowed to see.
Args:
page_token: pageToken supplied by user (from previous result; points to
next page of results)
items_per_page: Value submitted by user, limiting number of results.
# user: User identifier.
Returns:
Job list of Taskmaster jobs that user is allowed to see.
"""
# TODO: Implement this method when auth is implemented in FOCA.
label_selector = (
f"{self.constant.label_jobtype_key}"
f"{self.constant.label_constants.label_jobtype_key}"
"="
f"{self.constant.label_jobtype_value_taskm}"
f"{self.constant.label_constants.label_jobtype_value_taskm}"
)
# if user.get_label_selector():
# label_selector += f",{user.get_label_selector()}"

result: V1JobList = self.list_jobs(page_token, label_selector, items_per_page)

# if user.is_member_in_non_managed_groups():
# filtered_job_list = [
# job for job in result.items
# if user.is_group_manager(
# job.metadata.labels.get(self.constant.label_groupname_key)
# ) or user.get_username()
# == job.metadata.labels.get(self.constant.label_userid_key)
# ]
# result.items = filtered_job_list

return result

def list_single_task_executor_jobs(self, task_id: str) -> V1JobList:
"""List single task executor job."""
label_selector = (self.constant.label_testask_id_key + "=" + task_id,)
label_selector = (
self.constant.label_constants.label_testask_id_key + "=" + task_id,
)
job_list: V1JobList = self.list_jobs(label_selector=label_selector)
return job_list

def get_single_task_output_filer_job(self, task_id: str) -> Optional[V1Job]:
"""Get single task output filer job."""
try:
job: V1Job = self.batch_api.read_namespaced_job(
name=task_id + self.constant.job_name_filer_suf,
name=task_id + self.constant.job_constants.job_name_filer_suf,
namespace=self.namespace,
)
return job
Expand All @@ -225,26 +209,26 @@ def get_single_task_output_filer_job(self, task_id: str) -> Optional[V1Job]:
def list_all_taskmaster_jobs(self) -> V1JobList:
"""List all taskmaster jobs in the Kubernetes cluster."""
label_selector = (
self.constant.label_jobtype_key
self.constant.label_constants.label_jobtype_key
+ "="
+ self.constant.label_jobtype_value_taskm
+ self.constant.label_constants.label_jobtype_value_taskm
)
job_list: V1JobList = self.list_jobs(label_selector=label_selector)
return job_list

def list_all_task_executor_jobs(self) -> V1JobList:
"""List all executor jobs in the Kubernetes cluster."""
label_selector = (
self.constant.label_jobtype_key
self.constant.label_constants.label_jobtype_key
+ "="
+ self.constant.label_jobtype_value_exec
+ self.constant.label_constants.label_jobtype_value_exec
)
job_list: V1JobList = self.list_jobs(label_selector=label_selector)
return job_list

def list_all_filer_jobs(self) -> V1JobList:
"""List all output filer jobs in the Kubernetes cluster."""
label_selector = "!" + self.constant.label_jobtype_key
label_selector = "!" + self.constant.label_constants.label_jobtype_key
job_list: V1JobList = self.list_jobs(label_selector=label_selector)
return job_list

Expand Down

0 comments on commit 3e86549

Please sign in to comment.