-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathjob.py
89 lines (81 loc) · 4 KB
/
job.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import logging
import time
from datetime import datetime, timezone
from kubernetes import client, config
from kubernetes.client.rest import ApiException
from tesk_core.Util import pprint
from tesk_core.callback_sender import CallbackSender
# Root logger setup: INFO and above, printing only the bare message text.
logging.basicConfig(level=logging.INFO, format='%(message)s')
class Job:
    """Create a Kubernetes batch Job and poll it until it leaves the 'Running' state.

    Attributes:
        name: Job name; also written into ``body['metadata']['name']``.
        namespace: Namespace the job is created in and read from.
        status: Last status computed by ``get_status``: 'Initialized' until the
            first poll, then 'Running', 'Complete', 'Failed' or 'Error'.
        bv1: ``client.BatchV1Api`` used for job create/read/delete.
        cv1: ``client.CoreV1Api`` used to list the job's pods.
        timeout: Seconds an active job (and a pending pod) may run before pods
            are checked for image-pull failures; overwritten by
            ``run_to_completion``.
        body: Job manifest sent to the API (its metadata name is mutated in place).
        callback: Optional ``CallbackSender`` sent 'RUNNING' / 'CANCELED'.
    """

    # Container waiting reasons that indicate the image cannot be pulled.
    # Kubelet alternates between these two while retrying a failing pull.
    _IMAGE_PULL_FAILURES = ('ImagePullBackOff', 'ErrImagePull')

    def __init__(self, body, name='task-job', namespace='default', callback_url=None):
        """Prepare a job wrapper; nothing is sent to the cluster yet.

        Args:
            body: Job manifest; must contain a 'metadata' mapping, whose 'name'
                is overwritten with ``name`` (the caller's dict is mutated).
            name: Job name.
            namespace: Target namespace.
            callback_url: If truthy, a ``CallbackSender`` is created for it.
        """
        self.name = name
        self.namespace = namespace
        self.status = 'Initialized'
        self.bv1 = client.BatchV1Api()
        self.cv1 = client.CoreV1Api()
        self.timeout = 240
        self.body = body
        self.body['metadata']['name'] = self.name
        self.callback = None
        if callback_url:
            # The task name is the first two dash-separated parts of the job
            # name. NOTE(review): assumes names like '<task>-<id>-...'; confirm.
            task_name = '-'.join(name.split('-')[:2])
            self.callback = CallbackSender(task_name, callback_url)

    def run_to_completion(self, poll_interval, check_cancelled, pod_timeout):
        """Create the job (or attach to an existing one) and poll until it stops running.

        Args:
            poll_interval: Seconds to sleep between status polls.
            check_cancelled: Zero-argument callable. When it returns truthy, the
                job is deleted and 'Cancelled' is returned.
            pod_timeout: Stored in ``self.timeout``; see ``get_status``.

        Returns:
            'Cancelled', or the first status other than 'Running' reported by
            ``get_status`` ('Complete', 'Failed' or 'Error').

        Raises:
            ApiException: The original exception from job creation, re-raised,
                for any status other than 409 (already exists).
        """
        logging.debug("Creating job '{}'...".format(self.name))
        logging.debug(pprint(self.body))
        self.timeout = pod_timeout
        try:
            self.bv1.create_namespaced_job(self.namespace, self.body)
        except ApiException as ex:
            if ex.status != 409:
                logging.debug(ex.body)
                # Re-raise as-is so callers keep the response body, headers and traceback.
                raise
            # 409 Conflict: a job with this name already exists, so poll that one.
            # The read result is discarded; the call only touches the existing job.
            logging.debug(f"Reading existing job: {self.name} ")
            self.bv1.read_namespaced_job(self.name, self.namespace)

        status, is_all_pods_running = self.get_status(False)
        # Notify the callback receiver that the job is running.
        if self.callback and status == 'Running':
            self.callback.send('RUNNING')
        while status == 'Running':
            if check_cancelled():
                self.delete()
                # Notify the callback receiver that the task is cancelled.
                if self.callback:
                    self.callback.send('CANCELED')
                return 'Cancelled'
            time.sleep(poll_interval)
            status, is_all_pods_running = self.get_status(is_all_pods_running)
        return status

    def get_status(self, is_all_pods_runnning):
        """Read the job once and classify it, checking for stuck image pulls.

        Args:
            is_all_pods_runnning: True once an earlier pod inspection found no
                pending pod with a start time; further inspections are then
                skipped. (The spelling is kept for keyword-argument compatibility.)

        Returns:
            ``(status, is_all_pods_runnning)``. ``status`` is 'Running',
            'Complete', 'Failed' or 'Error'. It is 'Error' early when the job
            has been active longer than ``self.timeout`` and a pending pod,
            itself older than ``self.timeout``, is waiting on an image-pull
            failure.
        """
        job = self.bv1.read_namespaced_job(self.name, self.namespace)
        self.status = self._status_from_conditions(job.status.conditions)

        job_duration = 0
        if job.status.active and job.status.start_time:
            job_duration = (datetime.now(timezone.utc) - job.status.start_time).total_seconds()
        if job_duration > self.timeout and not is_all_pods_runnning:
            pods = self.cv1.list_namespaced_pod(
                self.namespace, label_selector='job-name={}'.format(self.name)).items
            is_all_pods_runnning = True
            for pod in pods:
                if pod.status.phase == "Pending" and pod.status.start_time:
                    is_all_pods_runnning = False
                    delta = (datetime.now(timezone.utc) - pod.status.start_time).total_seconds()
                    if delta > self.timeout:
                        waiting = self._first_container_waiting(pod)
                        if waiting is not None and waiting.reason in self._IMAGE_PULL_FAILURES:
                            logging.info(waiting)
                            return 'Error', is_all_pods_runnning
        return self.status, is_all_pods_runnning

    @staticmethod
    def _status_from_conditions(conditions):
        """Map a job's condition list to 'Running', 'Complete', 'Failed' or 'Error'."""
        if not conditions:
            # No conditions yet (None or empty): the job has not finished.
            return 'Running'
        for condition in conditions:
            # The API reports condition status as the string 'True'/'False'/'Unknown',
            # so compare explicitly instead of relying on truthiness. Scan every
            # condition because interim ones (e.g. FailureTarget) may come first.
            if condition.type in ('Complete', 'Failed') and str(condition.status) == 'True':
                return condition.type
        # Conditions exist but none is a true terminal one: keep the original 'Error'.
        return 'Error'

    @staticmethod
    def _first_container_waiting(pod):
        """Return the waiting state of the pod's first container, or None if unavailable."""
        statuses = pod.status.container_statuses
        # Pending pods may have no container statuses yet, and a container
        # that is running or terminated has no waiting state.
        if not statuses or statuses[0].state is None:
            return None
        return statuses[0].state.waiting

    def delete(self):
        """Delete the job using the 'Background' propagation policy for its dependents."""
        # NOTE(review): this message is also logged when a job is cancelled, not only when it failed.
        logging.info("Removing failed jobs")
        self.bv1.delete_namespaced_job(
            self.name, self.namespace, body=client.V1DeleteOptions(propagation_policy="Background"))