Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add k8s client wrapper and constants #201

Merged
merged 21 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 57 additions & 18 deletions tesk/api/kubernetes/constants.py
JaeAeich marked this conversation as resolved.
Show resolved Hide resolved
JaeAeich marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from pydantic import BaseModel, Field


class Constants(BaseModel):
class JobConstants(BaseModel):
"""Constants related to job and tasks."""

taskmaster_input: str = Field(
Expand Down Expand Up @@ -45,6 +45,27 @@ class Constants(BaseModel):
job_name_filer_suf: str = Field(
default="-outputs-filer", description="Output filer name suffix"
)
resource_disk_default: float = Field(
default=0.1, description="Default resource disk value"
)
completed_states: Set[str] = Field(
default={"CANCELED", "COMPLETE", "EXECUTOR_ERROR", "SYSTEM_ERROR"},
description="TES task states, indicating task is not running and cannot be "
"cancelled",
)
executor_backoff_limit: str = Field(
default="EXECUTOR_BACKOFF_LIMIT",
description="Set a number of retries of a job execution.",
)
filer_backoff_limit: str = Field(
default="FILER_BACKOFF_LIMIT",
description="Set a number of retries of a filer job execution.",
)


class AnnotationConstants(BaseModel):
"""Constants related to Kubernetes annotations."""

ann_testask_name_key: str = Field(
default="tes-task-name",
description=(
Expand All @@ -57,6 +78,11 @@ class Constants(BaseModel):
description="Key of the annotation, that stores whole input TES task serialized"
" to JSON",
)


class LabelConstants(BaseModel):
"""Constants related to Kubernetes labels."""

label_testask_id_key: str = Field(
default="taskmaster-name",
description="Key of the label, that stores taskmaster's name (==TES task "
Expand Down Expand Up @@ -93,6 +119,11 @@ class Constants(BaseModel):
default="creator-group-name",
description="Key of the label, that holds user's group name",
)


class PathValidationConstants(BaseModel):
"""Constants related to path validation."""

absolute_path_regexp: str = Field(
default="^\\/.*", description="Pattern to validate paths"
)
Expand All @@ -101,14 +132,11 @@ class Constants(BaseModel):
description="Message for absolute path validation (to avoid "
"message.properties)",
)
resource_disk_default: float = Field(
default=0.1, description="Default resource disk value"
)
completed_states: Set[str] = Field(
default={"CANCELED", "COMPLETE", "EXECUTOR_ERROR", "SYSTEM_ERROR"},
description="TES task states, indicating task is not running and cannot be "
"cancelled",
)


class FTPConstants(BaseModel):
"""Constants related to FTP configuration."""

ftp_secret_username_env: str = Field(
default="TESK_FTP_USERNAME",
description="Name of taskmaster's ENV variable with username of FTP account "
Expand All @@ -119,18 +147,15 @@ class Constants(BaseModel):
description="Name of taskmaster's ENV variable with password of FTP account "
"used for storage",
)


class PatchConstants(BaseModel):
"""Constants related to patch operations."""

cancel_patch: str = Field(
default='{"metadata":{"labels":{"task-status":"Cancelled"}}}',
description="Patch object passed to job API, when cancelling task",
)
executor_backoff_limit: str = Field(
default="EXECUTOR_BACKOFF_LIMIT",
description="Set a number of retries of a job execution.",
)
filer_backoff_limit: str = Field(
default="FILER_BACKOFF_LIMIT",
description="Set a number of retries of a filer job execution.",
)


class K8sConstants(BaseModel):
Expand All @@ -145,7 +170,9 @@ class K8sConstants(BaseModel):
job_restart_policy: str = Field(
default="Never", description="Kubernetes Job restart policy"
)
resource_cpu_key: str = Field("cpu", description="Executor CPU resource label")
resource_cpu_key: str = Field(
default="cpu", description="Executor CPU resource label"
)
resource_mem_key: str = Field(
default="memory", description="Executor memory resource label"
)
Expand All @@ -164,3 +191,15 @@ class PodPhase(Enum):
def get_code(self) -> str:
"""Return the pod state."""
return self.value


class Constants(BaseModel):
"""All the constants related to k8s and job creation."""

job_constants: JobConstants = JobConstants()
annotation_constants: AnnotationConstants = AnnotationConstants()
label_constants: LabelConstants = LabelConstants()
path_validation_constants: PathValidationConstants = PathValidationConstants()
ftp_constants: FTPConstants = FTPConstants()
patch_constants: PatchConstants = PatchConstants()
k8s_constants: K8sConstants = K8sConstants()
62 changes: 23 additions & 39 deletions tesk/api/kubernetes/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,7 @@ class KubernetesClientWrapper:
"""Kubernetes client wrapper class."""

def __init__(self):
"""Initialize the Kubernetes client wrapper.

Args:
namespace: Namespace to use for Kubernetes.
"""
"""Initialize the Kubernetes client wrapper."""
config.load_kube_config()
self.batch_api = client.BatchV1Api()
self.core_api = client.CoreV1Api()
Expand All @@ -40,7 +36,7 @@ def create_job(self, job: V1Job) -> V1Job:
"""Create a job in the Kubernetes cluster.

Returns:
Job object created in the Kubernetes cluster.
Job object created in the Kubernetes cluster.
"""
try:
v1_job: V1Job = self.batch_api.create_namespaced_job(
Expand Down Expand Up @@ -72,10 +68,10 @@ def read_taskmaster_job(self, task_id: str) -> V1Job:
task_id: Task identifier.

Returns:
Job object read from the Kubernetes cluster
Job object read from the Kubernetes cluster

Raises:
Exception: If the task is not found.
Exception: If the task is not found.
"""
try:
job: V1Job = self.batch_api.read_namespaced_job(
Expand All @@ -84,9 +80,10 @@ def read_taskmaster_job(self, task_id: str) -> V1Job:
if (
job.metadata
and job.metadata.labels
and self.constant.label_jobtype_key in job.metadata.labels
and job.metadata.labels[self.constant.label_jobtype_key]
== self.constant.label_jobtype_value_taskm
and self.constant.label_constants.label_jobtype_key
in job.metadata.labels
and job.metadata.labels[self.constant.label_constants.label_jobtype_key]
== self.constant.label_constants.label_jobtype_value_taskm
):
return job
except KubernetesError as e:
Expand All @@ -101,10 +98,10 @@ def list_jobs(
"""List jobs in the Kubernetes cluster.

Args:
page_token: pageToken supplied by user (from previous result; points to
next page of results)
label_selector: Label selector to filter jobs.
limit: Maximum number of jobs to return.
page_token: pageToken supplied by user (from previous result; points to
next page of results)
label_selector: Label selector to filter jobs.
limit: Maximum number of jobs to return.
"""
try:
return self.batch_api.list_namespaced_job(
Expand Down Expand Up @@ -166,53 +163,40 @@ def list_all_taskmaster_jobs_for_user(
self,
page_token: str,
items_per_page: int,
# user: str
) -> V1JobList:
"""Gets all Taskmaster job objects, a User is allowed to see.

Args:
page_token: pageToken supplied by user (from previous result; points to
next page of results)
items_per_page: Value submitted by user, limiting number of results.
# user: User identifier.

Returns:
Job list of Taskmaster jobs that user is allowed to see.
"""
# TODO: Implement this method when auth is implemented in FOCA.
label_selector = (
f"{self.constant.label_jobtype_key}"
f"{self.constant.label_constants.label_jobtype_key}"
"="
f"{self.constant.label_jobtype_value_taskm}"
f"{self.constant.label_constants.label_jobtype_value_taskm}"
)
# if user.get_label_selector():
# label_selector += f",{user.get_label_selector()}"

result: V1JobList = self.list_jobs(page_token, label_selector, items_per_page)

# if user.is_member_in_non_managed_groups():
# filtered_job_list = [
# job for job in result.items
# if user.is_group_manager(
# job.metadata.labels.get(self.constant.label_groupname_key)
# ) or user.get_username()
# == job.metadata.labels.get(self.constant.label_userid_key)
# ]
# result.items = filtered_job_list

return result

def list_single_task_executor_jobs(self, task_id: str) -> V1JobList:
"""List single task executor job."""
label_selector = (self.constant.label_testask_id_key + "=" + task_id,)
label_selector = (
self.constant.label_constants.label_testask_id_key + "=" + task_id,
)
job_list: V1JobList = self.list_jobs(label_selector=label_selector)
return job_list

def get_single_task_output_filer_job(self, task_id: str) -> Optional[V1Job]:
"""Get single task output filer job."""
try:
job: V1Job = self.batch_api.read_namespaced_job(
name=task_id + self.constant.job_name_filer_suf,
name=task_id + self.constant.job_constants.job_name_filer_suf,
namespace=self.namespace,
)
return job
Expand All @@ -225,26 +209,26 @@ def get_single_task_output_filer_job(self, task_id: str) -> Optional[V1Job]:
def list_all_taskmaster_jobs(self) -> V1JobList:
"""List all taskmaster jobs in the Kubernetes cluster."""
label_selector = (
self.constant.label_jobtype_key
self.constant.label_constants.label_jobtype_key
+ "="
+ self.constant.label_jobtype_value_taskm
+ self.constant.label_constants.label_jobtype_value_taskm
)
job_list: V1JobList = self.list_jobs(label_selector=label_selector)
return job_list

def list_all_task_executor_jobs(self) -> V1JobList:
"""List all executor jobs in the Kubernetes cluster."""
label_selector = (
self.constant.label_jobtype_key
self.constant.label_constants.label_jobtype_key
+ "="
+ self.constant.label_jobtype_value_exec
+ self.constant.label_constants.label_jobtype_value_exec
)
job_list: V1JobList = self.list_jobs(label_selector=label_selector)
return job_list

def list_all_filer_jobs(self) -> V1JobList:
"""List all output filer jobs in the Kubernetes cluster."""
label_selector = "!" + self.constant.label_jobtype_key
label_selector = "!" + self.constant.label_constants.label_jobtype_key
job_list: V1JobList = self.list_jobs(label_selector=label_selector)
return job_list

Expand Down
Loading