From 9f20f4aabc10c9692694224106bf193a20443cf1 Mon Sep 17 00:00:00 2001
From: Jeffrey Wu
Date: Thu, 14 Nov 2019 12:15:19 -0800
Subject: [PATCH] added kubernetes watch (#2848)

* added kubernetes watch

* updated timeout_seconds to maxWait

* Resolved confusion and merge conflict

* [WIP] deleting job after completion

* Return Result for failed jobs

* fixed jobID and return result and deleted job

* changed name job to pod and added _waitforjobdeath

* edited pod.status.phase to Failed to fix failing tests (All tests passing)

* checked for containerStatuses being None
---
 src/toil/batchSystems/kubernetes.py | 54 ++++++++++++++++-------------
 1 file changed, 30 insertions(+), 24 deletions(-)

diff --git a/src/toil/batchSystems/kubernetes.py b/src/toil/batchSystems/kubernetes.py
index e01b43429a..4c2da28f47 100644
--- a/src/toil/batchSystems/kubernetes.py
+++ b/src/toil/batchSystems/kubernetes.py
@@ -123,7 +123,7 @@ def __init__(self, config, maxCores, maxMemory, maxDisk):
         self.awsSecretName = os.environ.get("TOIL_AWS_SECRET_NAME", None)
 
         # Set this to True to enable the experimental wait-for-job-update code
-        self.enableWatching = False
+        self.enableWatching = True
 
         # Required APIs needed from kubernetes
         self.batchApi = kubernetes.client.BatchV1Api()
@@ -389,10 +389,6 @@ def _getIDForOurJob(self, jobObject):
 
         return int(jobObject.metadata.name[len(self.jobPrefix):])
 
-    def _genPods(self):
-        for j in _ourJobObjects:
-            yield self._getPodForJob(j)
-
     def getUpdatedBatchJob(self, maxWait):
 
         entry = datetime.datetime.now()
@@ -408,25 +404,35 @@ def getUpdatedBatchJob(self, maxWait):
 
         if self.enableWatching:
             # Try watching for something to happen and use that.
-            w = kubernetes.watch.Watch()
-            # TODO: block and wait for the jobs to update, until maxWait is hit
-            # For now just say we couldn't get anything
-            for pod in w.stream(self._genPods, timeout_seconds=maxwait):
-                # if pod status is terminated then check exit code
-                if pod.status.container_statuses[0].state is 'terminated':
-                    if pod.status.container_statues[0].state.exit_code == 0:
-                        return (pod.metadata.name,\
-                            pod.status.container_statues[0].state.exit_code, \
-                            (pod.status.container_statuses[0].state.finished_at - \
-                            pod.status.container_statues[0].state.started_at).total_seconds())
-                    # if job failed
-                    else:
-                        logger.warning(pod.status.container_status[0].state.reason,
-                            pod.status.container_statuses[0].state.exit_code)
-                        return None
-                else:
-                    continue
-            return None
+            w = kubernetes.watch.Watch()
+
+            if self.enableWatching:
+                for j in self._ourJobObjects():
+                    logger.debug(j.spec.template.metadata.labels[u'job-name'], type(j.spec.template.metadata.labels[u'job-name']))
+                for event in w.stream(self.coreApi.list_namespaced_pod, self.namespace, timeout_seconds=maxWait):
+                    pod = event['object']
+                    if pod.metadata.name.startswith(self.jobPrefix):
+                        logger.info("Event: %s %s %s" % (event['type'], event['object'].kind, event['object'].metadata.name))
+                        if pod.status.phase == 'Failed' or pod.status.phase == 'Succeeded':
+                            containerStatuses = pod.status.container_statuses
+                            logger.info("FINISHED")
+                            if containerStatuses is None or len(containerStatuses) == 0:
+                                logger.debug("No job container statuses for job %s" % (pod.metadata.owner_references[0].name))
+                                return (int(pod.metadata.owner_references[0].name[len(self.jobPrefix):]), -1, 0)
+                            logger.info("REASON: %s Eixt Code: %s" % (pod.status.container_statuses[0].state.terminated.reason,
+                                pod.status.container_statuses[0].state.terminated.exit_code))
+                            jobID = int(pod.metadata.owner_references[0].name[len(self.jobPrefix):])
+                            terminated = pod.status.container_statuses[0].state.terminated
+                            runtime = (terminated.finished_at - terminated.started_at).total_seconds()
+                            result = (jobID, terminated.exit_code, runtime)
+                            self.batchApi.delete_namespaced_job(pod.metadata.owner_references[0].name,
+                                                                self.namespace,
+                                                                propagation_policy='Foreground')
+
+                            self._waitForJobDeath(pod.metadata.owner_references[0].name)
+                            return result
+                        else:
+                            continue
         else:
             # Try polling instead
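Note (not part of the patch): for readers unfamiliar with the watch API the diff above adopts, the following is a minimal, standalone sketch of the same pattern: stream pod events with kubernetes.watch.Watch() until a pod whose name carries a given prefix reaches a terminal phase, then read its container exit code. The namespace argument, the pod-name prefix, and the wait_for_completion helper name are illustrative assumptions, not code from Toil.

    import kubernetes

    def wait_for_completion(namespace, prefix, max_wait):
        # Illustrative only: assumes kubeconfig-based access to the cluster.
        kubernetes.config.load_kube_config()
        core = kubernetes.client.CoreV1Api()
        w = kubernetes.watch.Watch()
        # Stream pod events in the namespace until timeout_seconds elapses.
        for event in w.stream(core.list_namespaced_pod, namespace, timeout_seconds=max_wait):
            pod = event['object']
            if not pod.metadata.name.startswith(prefix):
                continue
            if pod.status.phase in ('Succeeded', 'Failed'):
                statuses = pod.status.container_statuses
                if not statuses:
                    # Pod finished without reporting container state (e.g. it never started).
                    return None
                terminated = statuses[0].state.terminated
                w.stop()
                return terminated.exit_code
        return None  # Nothing finished before the timeout.

Passing the list function itself (core.list_namespaced_pod) to w.stream() is the client's documented way to turn a list call into an event stream; the patch does the same with self.coreApi and bounds the wait with the caller-supplied maxWait.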