Skip to content

Commit

Permalink
added kubernetes watch (#2848)
Browse files Browse the repository at this point in the history
* added kubernetes watch

* updated timeout_seconds to maxWait

* Resolved confusion and merge conflict

* [WIP] deleting job after completion

* Return Result for failed jobs

* fixed jobID and return result and deleted job

* renamed job to pod and added _waitForJobDeath

* edited pod.status.phase to Failed to fix failing tests (All tests passing)

* checked for containerStatuses being None
  • Loading branch information
jeffrey856 authored Nov 14, 2019
1 parent 9da9808 commit 9f20f4a
Showing 1 changed file with 30 additions and 24 deletions.
54 changes: 30 additions & 24 deletions src/toil/batchSystems/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def __init__(self, config, maxCores, maxMemory, maxDisk):
self.awsSecretName = os.environ.get("TOIL_AWS_SECRET_NAME", None)

# Set this to True to enable the experimental wait-for-job-update code
self.enableWatching = False
self.enableWatching = True

# Required APIs needed from kubernetes
self.batchApi = kubernetes.client.BatchV1Api()
Expand Down Expand Up @@ -389,10 +389,6 @@ def _getIDForOurJob(self, jobObject):
return int(jobObject.metadata.name[len(self.jobPrefix):])


def _genPods(self):
    """
    Lazily yield one pod lookup result per job object of this batch system.

    Iterates over the job objects produced by ``self._ourJobObjects()`` and
    yields whatever ``self._getPodForJob`` returns for each of them, in the
    same order.

    NOTE(review): whether ``_getPodForJob`` can return None for a job that
    has no pod yet is not visible here -- callers should confirm before
    dereferencing the yielded values.
    """
    # _ourJobObjects is a method on the batch system, so it must be looked
    # up on self and called; the bare name would raise NameError on the
    # first iteration.
    for jobObject in self._ourJobObjects():
        yield self._getPodForJob(jobObject)

def getUpdatedBatchJob(self, maxWait):

entry = datetime.datetime.now()
Expand All @@ -408,25 +404,35 @@ def getUpdatedBatchJob(self, maxWait):
if self.enableWatching:
# Try watching for something to happen and use that.

w = kubernetes.watch.Watch()
# TODO: block and wait for the jobs to update, until maxWait is hit
# For now just say we couldn't get anything
for pod in w.stream(self._genPods, timeout_seconds=maxwait):
# if pod status is terminated then check exit code
if pod.status.container_statuses[0].state is 'terminated':
if pod.status.container_statues[0].state.exit_code == 0:
return (pod.metadata.name,\
pod.status.container_statues[0].state.exit_code, \
(pod.status.container_statuses[0].state.finished_at - \
pod.status.container_statues[0].state.started_at).total_seconds())
# if job failed
else:
logger.warning(pod.status.container_status[0].state.reason,
pod.status.container_statuses[0].state.exit_code)
return None
else:
continue
return None
w = kubernetes.watch.Watch()

if self.enableWatching:
for j in self._ourJobObjects():
logger.debug(j.spec.template.metadata.labels[u'job-name'], type(j.spec.template.metadata.labels[u'job-name']))
for event in w.stream(self.coreApi.list_namespaced_pod, self.namespace, timeout_seconds=maxWait):
pod = event['object']
if pod.metadata.name.startswith(self.jobPrefix):
logger.info("Event: %s %s %s" % (event['type'],event['object'].kind, event['object'].metadata.name))
if pod.status.phase == 'Failed' or pod.status.phase == 'Succeeded':
containerStatuses = pod.status.container_statuses
logger.info("FINISHED")
if containerStatuses is None or len(containerStatuses) == 0:
logger.debug("No job container statuses for job %s" % (pod.metadata.owner_references[0].name))
return (int(pod.metadata.owner_references[0].name[len(self.jobPrefix):]), -1, 0)
logger.info("REASON: %s Eixt Code: %s" % (pod.status.container_statuses[0].state.terminated.reason,
pod.status.container_statuses[0].state.terminated.exit_code))
jobID = int(pod.metadata.owner_references[0].name[len(self.jobPrefix):])
terminated = pod.status.container_statuses[0].state.terminated
runtime = (terminated.finished_at - terminated.started_at).total_seconds()
result = (jobID, terminated.exit_code, runtime)
self.batchApi.delete_namespaced_job(pod.metadata.owner_references[0].name,
self.namespace,
propagation_policy='Foreground')

self._waitForJobDeath(pod.metadata.owner_references[0].name)
return result
else:
continue

else:
# Try polling instead
Expand Down

0 comments on commit 9f20f4a

Please sign in to comment.