diff --git a/.gitignore b/.gitignore
index 1377554e..b9489855 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,2 @@
 *.swp
+*.pyc
diff --git a/cloudbuild_testing.yaml b/cloudbuild_testing.yaml
new file mode 100644
index 00000000..eb298198
--- /dev/null
+++ b/cloudbuild_testing.yaml
@@ -0,0 +1,6 @@
+steps:
+- name: 'gcr.io/cloud-builders/docker'
+  args: ['build', '-t', 'eu.gcr.io/tes-wes/taskmaster:testing', '-f', 'dockerfiles/taskmaster/Dockerfile', '.']
+- name: 'gcr.io/cloud-builders/docker'
+  args: ['build', '-t', 'eu.gcr.io/tes-wes/filer:testing', '-f', 'dockerfiles/filer/Dockerfile', '.']
+images: ['eu.gcr.io/tes-wes/taskmaster:testing', 'eu.gcr.io/tes-wes/filer:testing']
diff --git a/dockerfiles/taskmaster/Dockerfile b/dockerfiles/taskmaster/Dockerfile
index 55c64020..2da4bb77 100644
--- a/dockerfiles/taskmaster/Dockerfile
+++ b/dockerfiles/taskmaster/Dockerfile
@@ -3,8 +3,14 @@ FROM gliderlabs/alpine
 RUN apk add --no-cache python py-pip curl openssl
 RUN pip install kubernetes
-WORKDIR /root
+RUN adduser -S taskmaster
+
+USER taskmaster
+
+WORKDIR /home/taskmaster
 
 COPY scripts/taskmaster.py .
+COPY scripts/job.py .
+COPY scripts/pvc.py .
+COPY scripts/filer_class.py .
 
-ENTRYPOINT ["/root/taskmaster.py"]
-#CMD /root/taskmaster.py
+ENTRYPOINT ["/home/taskmaster/taskmaster.py"]
diff --git a/examples/success/inputoutput_taskmaster.json b/examples/success/inputoutput_taskmaster.json
new file mode 100644
index 00000000..13d85412
--- /dev/null
+++ b/examples/success/inputoutput_taskmaster.json
@@ -0,0 +1,107 @@
+{
+  "outputs": [
+    {
+      "url": "ftp://ftp-private.ebi.ac.uk/upload/mouselist.txt",
+      "path": "/tmp/vol1/mouselist.txt",
+      "type": "FILE"
+    },
+    {
+      "url": "ftp://ftp-private.ebi.ac.uk/upload/mouse_out",
+      "path": "/tmp/vol2/mouse",
+      "type": "DIRECTORY"
+    }
+  ],
+  "inputs": [
+    {
+      "url": "ftp://ftp-private.ebi.ac.uk/upload/input.txt",
+      "path": "/tmp/vol1/in.txt",
+      "type": "FILE"
+    },
+    {
+      "url": "ftp://ftp-private.ebi.ac.uk/upload/mouse",
+      "path": "/tmp/vol2/mouse",
+      "type": "DIRECTORY"
+    }
+  ],
+  "volumes": [
+    "/tmp/vol1",
+    "/tmp/vol2"
+  ],
+  "executors": [
+    {
+      "apiVersion": "batch/v1",
+      "kind": "Job",
+      "metadata": {
+        "annotations": {
+          "tes-task-name": "Hello Input"
+        },
+        "labels": {
+          "job-type": "executor",
+          "taskmaster-name": "task-7d5c53f4",
+          "executor-no": "0"
+        },
+        "name": "task-7d5c53f4-ex-00"
+      },
+      "spec": {
+        "template": {
+          "metadata": {
+            "name": "task-7d5c53f4-ex-00"
+          },
+          "spec": {
+            "containers": [
+              {
+                "command": [
+                  "cat",
+                  "/tmp/vol1/in.txt"
+                ],
+                "image": "alpine",
+                "name": "task-7d5c53f4-ex-00",
+                "resources": {}
+              }
+            ],
+            "restartPolicy": "Never"
+          }
+        }
+      }
+    },
+    {
+      "apiVersion": "batch/v1",
+      "kind": "Job",
+      "metadata": {
+        "annotations": {
+          "tes-task-name": "Hello Input"
+        },
+        "labels": {
+          "job-type": "executor",
+          "taskmaster-name": "task-7d5c53f4",
+          "executor-no": "1"
+        },
+        "name": "task-7d5c53f4-ex-01"
+      },
+      "spec": {
+        "template": {
+          "metadata": {
+            "name": "task-7d5c53f4-ex-01"
+          },
+          "spec": {
+            "containers": [
+              {
+                "command": [
+                  "sh", "-c",
+                  "find /tmp/vol2 > /tmp/vol1/mouselist.txt"
+                ],
+                "image": "alpine",
+                "name": "task-7d5c53f4-ex-01",
+                "resources": {}
+              }
+            ],
+            "restartPolicy": "Never"
+          }
+        }
+      }
+    }
+  ],
+  "resources": {
+    "disk_gb": 0.1
+  }
+}
diff --git a/scripts/filer.py b/scripts/filer.py
index 43e129d6..82d5543a 100755
--- a/scripts/filer.py
+++ b/scripts/filer.py
@@ -9,11 +9,16 @@
 import re
 import os
 import distutils.dir_util
 import requests
+import time
+import logging
+import traceback
+
 debug = True
 
 def download_ftp_file(source, target, ftp):
+    logging.debug('downloading ftp file: '+source+' target: '+target)
     basedir = os.path.dirname(target)
     distutils.dir_util.mkpath(basedir)
@@ -22,20 +26,36 @@ def download_ftp_file(source, target, ftp):
 def process_upload_dir(source, target, ftp):
     basename = os.path.basename(source)
+    logging.debug('processing upload dir src: '+source+' target: '+target)
+    logging.debug('dir basename: '+basename)
+    wd = ftp.pwd()
+
+    # does the parent dir exist?
     try:
-        print('trying to create dir: ' + '/'+target+'/'+basename, file=sys.stderr)
+        ftp.cwd('/'+target)
+    except ftplib.all_errors:
+        logging.error('Cannot stat parent dir: /'+target)
+        return 1
+
+    ftp.cwd(wd)
+
+    try:
+        logging.debug('trying to create dir: ' + '/'+target+'/'+basename)
         ftp.mkd('/'+target+'/'+basename)
     except ftplib.error_perm:
-        print('Directory exists, overwriting')
+        logging.debug('Directory exists, overwriting')
 
     for f in os.listdir(source):
-        if os.path.isdir(source+'/'+f):
-            process_upload_dir(source+'/'+f, target+'/'+basename+'/', ftp)
-        elif os.path.isfile(source+'/'+f):
-            ftp.storbinary("STOR "+target+'/'+basename+'/'+f, open(source+'/'+f, 'r'))
+        path = source+'/'+f
+        if os.path.isdir(path):
+            process_upload_dir(path, target+'/'+basename+'/', ftp)
+        elif os.path.isfile(path):
+            logging.debug('Trying to upload file: '+path+' to dest: '+target+'/'+basename+'/'+f)
+            ftp.storbinary("STOR "+target+'/'+basename+'/'+f, open(path, 'rb'))
 
     return 0
 
 def process_ftp_dir(source, target, ftp):
+    logging.debug('processing ftp dir: '+source+' target: '+target)
+    pwd = ftp.pwd()
     ftp.cwd('/'+source)
 
     ls = []
@@ -53,11 +73,14 @@ def process_ftp_dir(source, target, ftp):
         else:
             download_ftp_file(name, target+'/'+name, ftp)
 
+    ftp.cwd(pwd)
+
 def process_ftp_file(ftype, afile):
     p = re.compile('[a-z]+://([-a-z.]+)/(.*)')
     ftp_baseurl = p.match(afile['url']).group(1)
     ftp_path = p.match(afile['url']).group(2)
 
+    logging.debug('Connecting to FTP: '+ftp_baseurl)
     ftp = FTP(ftp_baseurl)
     if os.environ.get('TESK_FTP_USERNAME') is not None:
         try:
@@ -84,15 +107,20 @@ def process_ftp_file(ftype, afile):
         elif afile['type'] == 'DIRECTORY':
             return process_upload_dir(afile['path'], ftp_path, ftp)
         else:
-            print('Unknown file type: '+afile['type'])
+            logging.error('Unknown file type: '+afile['type'])
             return 1
     else:
-        print('Unknown file action: ' + ftype)
+        logging.error('Unknown file action: ' + ftype)
         return 1
 
 def process_http_file(ftype, afile):
     if ftype == 'inputs':
         r = requests.get(afile['url'])
+
+        if r.status_code != 200:
+            logging.error('Got status code: '+str(r.status_code))
+            return 1
+
         fp = open(afile['path'], 'wb')
         fp.write(r.content)
         fp.close
@@ -100,6 +128,11 @@ def process_http_file(ftype, afile):
     elif ftype == 'outputs':
         fp = open(afile['path'], 'r')
         r = requests.put(afile['url'], data=fp.read())
+
+        if r.status_code not in (200, 201):
+            logging.error('Got status code: '+str(r.status_code))
+            return 1
+
         fp.close
         return 0
     else:
@@ -109,7 +142,7 @@ def process_http_file(ftype, afile):
 def filefromcontent(afile):
     content = afile.get('content')
     if content is None:
-        print('Incorrect file spec format, no content or url specified', file=sys.stderr)
+        logging.error('Incorrect file spec format, no content or url specified')
         return 1
 
     fh = open(afile['path'], 'w')
@@ -125,7 +158,7 @@ def process_file(ftype, afile):
     p = re.compile('([a-z]+)://')
     protocol = p.match(url).group(1)
 
-    debug('protocol is: '+protocol)
+    logging.debug('protocol is: '+protocol)
 
     if protocol == 'ftp':
         return process_ftp_file(ftype, afile)
@@ -140,6 
+173,8 @@ def debug(msg): print(msg, file=sys.stderr) def main(argv): + logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', datefmt='%m/%d/%Y %I:%M:%S', level=logging.DEBUG) + logging.debug('Starting filer...') parser = argparse.ArgumentParser(description='Filer script for down- and uploading files') parser.add_argument('filetype', help='filetype to handle, either \'inputs\' or \'outputs\' ') parser.add_argument('data', help='file description data, see docs for structure') @@ -148,14 +183,15 @@ def main(argv): data = json.loads(args.data) for afile in data[args.filetype]: - debug('processing file: '+afile['path']) + logging.debug('processing file: '+afile['path']) if process_file(args.filetype, afile): - print('something went wrong', file=sys.stderr) + logging.error('something went wrong') return 1 # TODO a bit more detailed reporting else: - debug('Processed file: ' + afile['path']) + logging.debug('Processed file: ' + afile['path']) return 0 + if __name__ == "__main__": - main(sys.argv) + sys.exit(main(sys.argv)) diff --git a/scripts/filer_class.py b/scripts/filer_class.py new file mode 100644 index 00000000..f5a96c92 --- /dev/null +++ b/scripts/filer_class.py @@ -0,0 +1,50 @@ +import json + +class Filer: + def __init__(self, name, data, filer_version='v0.5', debug=False): + self.name = name + self.spec = { + "kind": "Job", + "apiVersion": "batch/v1", + "metadata": { "name": name }, + "spec": { + "template": { + "metadata": { "name": "tesk-filer" }, + "spec": { + "containers": [ { + "name": "filer", + "image": "eu.gcr.io/tes-wes/filer:"+filer_version, + "args": [], + "env": [], + "volumeMounts": [], + "imagePullPolicy": "IfNotPresent" + } + ], + "volumes": [], + "restartPolicy": "Never" + } + } + } + } + + if debug: + self.spec['spec']['template']['spec']['containers'][0]['imagePullPolicy'] = 'Always' + + container = self.spec['spec']['template']['spec']['containers'][0] + container['env'].append({ "name": "JSON_INPUT", "value": json.dumps(data) }) + #container['env'].append({ "name": "JSON_INPUT", "value": 'test' }) + + def set_ftp(self, user, pw): + env = self.spec['spec']['template']['spec']['containers'][0]['env'] + env.append({ "name": "TESK_FTP_USERNAME", "value": user }) + env.append({ "name": "TESK_FTP_PASSWORD", "value": pw }) + + def set_volume_mounts(self, pvc): + tempspec = self.spec['spec']['template']['spec'] + tempspec['containers'][0]['volumeMounts'] = pvc.volume_mounts + tempspec['volumes'] = [ { "name": "task-volume", "persistentVolumeClaim": { "claimName": pvc.name} } ] + + def get_spec(self, mode): + self.spec['spec']['template']['spec']['containers'][0]['args'] = [mode, "$(JSON_INPUT)"] + self.spec['spec']['template']['metadata']['name'] = self.name + return self.spec diff --git a/scripts/job.py b/scripts/job.py new file mode 100644 index 00000000..59d9c028 --- /dev/null +++ b/scripts/job.py @@ -0,0 +1,44 @@ +from kubernetes import client, config +import logging +import time + +class Job: + def __init__(self, body, name='task-job', namespace='default'): + self.name = name + self.namespace = namespace + self.status = 'Initialized' + self.bv1 = client.BatchV1Api() + self.body = body + self.body['metadata']['name'] = self.name + + def run_to_completion(self, poll_interval, check_cancelled): + logging.debug(self.body) + self.bv1.create_namespaced_job(self.namespace, self.body) + status = self.get_status() + while status == 'Running': + if check_cancelled(): + self.delete() + return 'Cancelled' + + time.sleep(poll_interval) + + status = self.get_status() 
+ + return status + + def get_status(self): + job = self.bv1.read_namespaced_job(self.name, self.namespace) + try: + if job.status.conditions[0].type == 'Complete' and job.status.conditions[0].status: + self.status = 'Complete' + elif job.status.conditions[0].type == 'Failed' and job.status.conditions[0].status: + self.status = 'Failed' + else: + self.status = 'Error' + except TypeError: # The condition is not initialized, so it is not complete yet, wait for it + self.status = 'Running' + + return self.status + + def delete(self): + self.bv1.delete_namespaced_job(self.name, self.namespace, client.V1DeleteOptions()) diff --git a/scripts/pvc.py b/scripts/pvc.py new file mode 100644 index 00000000..4f58e6b4 --- /dev/null +++ b/scripts/pvc.py @@ -0,0 +1,31 @@ +from kubernetes import client, config + +class PVC(): + def __init__(self, name='task-pvc', size_gb=1, namespace='default'): + self.name = name + self.spec = { 'apiVersion': 'v1', + 'kind': 'PersistentVolumeClaim', + 'metadata': { 'name': name }, + 'spec': { + 'accessModes': [ 'ReadWriteOnce'], + 'resources': { 'requests' : { 'storage': str(size_gb)+'Gi' } }, + #'storageClassName': 'gold' + } + } + + self.subpath_idx = 0 + self.namespace = namespace + self.cv1 = client.CoreV1Api() + self.cv1.create_namespaced_persistent_volume_claim(self.namespace, self.spec) + + def set_volume_mounts(self, mounts): + self.volume_mounts = mounts + + def get_subpath(self): + subpath = 'dir'+str(self.subpath_idx) + self.subpath_idx += 1 + return subpath + + def delete(self): + cv1 = client.CoreV1Api() + cv1.delete_namespaced_persistent_volume_claim(self.name, self.namespace, client.V1DeleteOptions()) diff --git a/scripts/taskmaster.py b/scripts/taskmaster.py index 217fc746..c0b935cf 100755 --- a/scripts/taskmaster.py +++ b/scripts/taskmaster.py @@ -8,223 +8,134 @@ import time import sys from kubernetes import client, config +import logging +from job import Job +from pvc import PVC +from filer_class import Filer +from multiprocessing import Process, Pool created_jobs = [] debug = False -polling_interval = 5 +poll_interval = 5 task_volume_basename = 'task-volume' -# translates TES JSON into 1 Job spec per executor -def generate_job_specs(tes): - specs = [] - name = re.sub(r" ", "-", tes['name'].lower()) - - for executor in tes['executors']: - descriptor = { - 'apiVersion': 'batch/v1', - 'kind': 'Job', - 'metadata': {'name': name}, - 'spec': { - 'template': { - 'metadata': {'name': name}, - 'spec': { - 'restartPolicy': 'Never', - 'containers': [{ - 'name': name + '-ex' + len(specs), - 'image': executor['image_name'], - 'command': executor['cmd'], - 'resources': { - 'requests': { - 'cpu': tes['resources']['cpu_cores'], - 'memory': str(tes['resources']['ram_gb']) + 'Gi' - } - } - }] - } - } - } - } - specs.append(descriptor) - - return specs - -def run_executors(specs, namespace, volume_mounts=None, pvc_name=None): - - # init Kubernetes Job API - v1 = client.BatchV1Api() - - for executor in specs: - jobname = executor['metadata']['name'] - spec = executor['spec']['template']['spec'] - - if volume_mounts is not None: - spec['containers'][0]['volumeMounts'] = volume_mounts - if pvc_name is not None: - spec['volumes'] = [{'name': task_volume_basename, 'persistentVolumeClaim': { 'readonly': False, 'claimName': pvc_name }}] - - job = v1.create_namespaced_job(body=executor, namespace=namespace) +def run_executor(executor, namespace, pvc=None): + jobname = executor['metadata']['name'] + spec = executor['spec']['template']['spec'] + + if pvc is not None: + 
spec['containers'][0]['volumeMounts'] = pvc.volume_mounts + spec['volumes'] = [{ 'name': task_volume_basename, 'persistentVolumeClaim': { 'readonly': False, 'claimName': pvc.name }}] + + logger.debug('Created job: '+jobname) + job = Job(executor, jobname, namespace) + logger.debug('Job spec: '+str(job.body)) + + global created_jobs + created_jobs.append(job) + + status = job.run_to_completion(poll_interval, check_cancelled) + if status != 'Complete': + exit_cancelled('Got status '+status) + +# TODO move this code to PVC class +def append_mount(volume_mounts, name, path, pvc): - global created_jobs - created_jobs.append(jobname) - - print("Created job with metadata='%s'" % str(job.metadata)) - if wait_for_job(jobname, namespace) == 'Failed': - return 'Failed' - - return 'Complete' - -def wait_for_job(jobname, namespace): - bv1 = client.BatchV1Api() - - finished = False - # Job polling loop - while not finished: - job = bv1.read_namespaced_job(jobname, namespace) - try: - #print("job.status.conditions[0].type: %s" % job.status.conditions[0].type) - if job.status.conditions[0].type == 'Complete' and job.status.conditions[0].status: - finished = True - elif job.status.conditions[0].type == 'Failed' and job.status.conditions[0].status: - finished = True - return 'Failed' # If an executor failed we break out of running the executors - else: - #print("hit else, failing") - return 'Failed' # something we don't know happened, fail - except TypeError: # The condition is not initialized, so it is not complete yet, wait for it - time.sleep(polling_interval) - - return 'Complete' # No failures, so return successful finish - -def create_pvc(data, namespace): - task_name = data['executors'][0]['metadata']['labels']['taskmaster-name'] - pvc_name = task_name + '-pvc' - size = data['resources']['disk_gb'] - - pvc = { 'apiVersion': 'v1', - 'kind': 'PersistentVolumeClaim', - 'metadata': { 'name': pvc_name }, - 'spec': { - 'accessModes': [ 'ReadWriteOnce'], - 'resources': { 'requests' : { 'storage': str(size)+'Gi' } }, - 'storageClassName': 'gold' - } - } - - cv1 = client.CoreV1Api() - cv1.create_namespaced_persistent_volume_claim(namespace, pvc) - return pvc_name - -def get_filer_template(filer_version, name): - filer = { - "kind": "Job", - "apiVersion": "batch/v1", - "metadata": { "name": name }, - "spec": { - "template": { - "metadata": { "name": "tesk-filer" }, - "spec": { - "containers": [ { - "name": "filer", - "image": "eu.gcr.io/tes-wes/filer:"+filer_version, - "args": [], - "env": [], - "volumeMounts": [] - } - ], - "volumes": [], - "restartPolicy": "Never" - } - } - } - } - - if os.environ.get('TESK_FTP_USERNAME') is not None: - env = filer['spec']['template']['spec']['containers'][0]['env'] - env.append({ "name": "TESK_FTP_USERNAME", "value": os.environ['TESK_FTP_USERNAME'] }) - env.append({ "name": "TESK_FTP_PASSWORD", "value": os.environ['TESK_FTP_PASSWORD'] }) - - return filer - -def append_mount(volume_mounts, name, path): # Checks all mount paths in volume_mounts if the path given is already in there duplicate = next((mount for mount in volume_mounts if mount['mountPath'] == path), None) # If not, add mount path if duplicate is None: - volume_mounts.append({ 'name': name, 'mountPath': path }) - -def populate_pvc(data, namespace, pvc_name, filer_version): + subpath = pvc.get_subpath() + logger.debug('appending '+name+' at path '+path+' with subPath: '+subpath) + volume_mounts.append({ 'name': name, 'mountPath': path , 'subPath': subpath}) + +def dirname(iodata): + if iodata['type'] == 'FILE': + 
# strip filename from path + r = '(.*)/' + dirname = re.match(r, iodata['path']).group(1) + logger.debug('dirname of '+iodata['path']+'is: '+dirname) + elif iodata['type'] == 'DIRECTORY': + dirname = iodata['path'] + + return dirname + +def generate_mounts(data, pvc): volume_mounts = [] # gather volumes that need to be mounted, without duplicates volume_name = task_volume_basename - for idx, volume in enumerate(data['volumes']): - append_mount(volume_mounts, volume_name, volume) + for volume in data['volumes']: + append_mount(volume_mounts, volume_name, volume, pvc) - # gather other paths that need to be mounted from inputs FILE and DIRECTORY entries - for idx, aninput in enumerate(data['inputs']): - if aninput['type'] == 'FILE': - # strip filename from path - p = re.compile('(.*)/') - basepath = p.match(aninput['path']).group(1) - elif aninput['type'] == 'DIRECTORY': - basepath = aninput['path'] + # gather other paths that need to be mounted from inputs/outputs FILE and DIRECTORY entries + for aninput in data['inputs']: + dirnm = dirname(aninput) + append_mount(volume_mounts, volume_name, dirnm, pvc) - append_mount(volume_mounts, volume_name, basepath) + for anoutput in data['outputs']: + dirnm = dirname(anoutput) + append_mount(volume_mounts, volume_name, dirnm, pvc) - # get name from taskmaster and create pretask template - name = data['executors'][0]['metadata']['labels']['taskmaster-name'] - pretask = get_filer_template(filer_version, name+'-inputs-filer') + return volume_mounts - # insert JSON input into job template - container = pretask['spec']['template']['spec']['containers'][0] - container['env'].append({ "name": "JSON_INPUT", "value": json.dumps(data) }) - container['args'] += ["inputs", "$(JSON_INPUT)"] +def init_pvc(data, filer): + task_name = data['executors'][0]['metadata']['labels']['taskmaster-name'] + pvc_name = task_name+'-pvc' + pvc_size = data['resources']['disk_gb'] + pvc = PVC(pvc_name, pvc_size, args.namespace) - # insert volume mounts and pvc into job template - container['volumeMounts'] = volume_mounts - pretask['spec']['template']['spec']['volumes'] = [ { "name": volume_name, "persistentVolumeClaim": { "claimName": pvc_name} } ] + # to global var for cleanup purposes + global created_pvc + created_pvc = pvc - #print(json.dumps(pretask, indent=2), file=sys.stderr) + mounts = generate_mounts(data, pvc) + logging.debug(mounts) + logging.debug(type(mounts)) + pvc.set_volume_mounts(mounts) - # Run pretask filer job - bv1 = client.BatchV1Api() - job = bv1.create_namespaced_job(body=pretask, namespace=namespace) + filer.set_volume_mounts(pvc) + filerjob = Job(filer.get_spec('inputs'), task_name+'-inputs-filer', args.namespace) - global created_jobs - created_jobs.append(pretask['metadata']['name']) + global created_jobs + created_jobs.append(filerjob) + #filerjob.run_to_completion(poll_interval) + status = filerjob.run_to_completion(poll_interval, check_cancelled) + if status != 'Complete': + exit_cancelled('Got status '+status) - wait_for_job(pretask['metadata']['name'], namespace) + return pvc - return volume_mounts +def run_task(data, filer_version): + task_name = data['executors'][0]['metadata']['labels']['taskmaster-name'] + pvc = None -def cleanup_pvc(data, namespace, volume_mounts, pvc_name, filer_version): - # get name from taskmaster and create posttask template - name = data['executors'][0]['metadata']['labels']['taskmaster-name'] - posttask = get_filer_template(filer_version, name+'-outputs-filer') + if data['volumes'] or data['inputs'] or data['outputs']: - 
-    # insert JSON input into posttask job template
-    container = posttask['spec']['template']['spec']['containers'][0]
-    container['env'].append({ "name": "JSON_INPUT", "value": json.dumps(data) })
-    container['args'] += ["outputs", "$(JSON_INPUT)"]
+        filer = Filer(task_name+'-filer', data, filer_version, args.debug)
+        if os.environ.get('TESK_FTP_USERNAME') is not None:
+            filer.set_ftp(os.environ['TESK_FTP_USERNAME'], os.environ['TESK_FTP_PASSWORD'])
 
-    # insert volume mounts and pvc into posttask job template
-    container['volumeMounts'] = volume_mounts
-    posttask['spec']['template']['spec']['volumes'] = [ { "name": task_volume_basename, "persistentVolumeClaim": { "claimName": pvc_name} } ]
-
-    # run posttask filer job
-    bv1 = client.BatchV1Api()
-    job = bv1.create_namespaced_job(body=posttask, namespace=namespace)
+        pvc = init_pvc(data, filer)
 
-    global created_jobs
-    created_jobs.append(pretask['metadata']['name'])
+    for executor in data['executors']:
+        run_executor(executor, args.namespace, pvc)
 
-    wait_for_job(posttask['metadata']['name'], namespace)
+    # run executors
+    logging.debug("Finished running executors")
 
-    #print(json.dumps(posttask, indent=2), file=sys.stderr)
+    # upload files and delete pvc
+    if data['volumes'] or data['inputs'] or data['outputs']:
+        filerjob = Job(filer.get_spec('outputs'), task_name+'-outputs-filer', args.namespace)
+
+        global created_jobs
+        created_jobs.append(filerjob)
 
-    # Delete pvc
-    cv1 = client.CoreV1Api()
-    cv1.delete_namespaced_persistent_volume_claim(pvc_name, namespace, client.V1DeleteOptions())
+        #filerjob.run_to_completion(poll_interval)
+        status = filerjob.run_to_completion(poll_interval, check_cancelled)
+        if status != 'Complete':
+            exit_cancelled('Got status '+status)
 
 def main(argv):
     parser = argparse.ArgumentParser(description='TaskMaster main module')
@@ -232,7 +143,7 @@
     group.add_argument('json', help='string containing json TES request, required if -f is not given', nargs='?')
     group.add_argument('-f', '--file', help='TES request as a file or \'-\' for stdin, required if json is not given')
 
-    parser.add_argument('-p', '--polling-interval', help='Job polling interval', default=5)
+    parser.add_argument('-p', '--poll-interval', help='Job polling interval', default=5, type=int)
     parser.add_argument('-fv', '--filer-version', help='Filer image version', default='v0.1.9')
     parser.add_argument('-n', '--namespace', help='Kubernetes namespace to run in', default='default')
     parser.add_argument('-s', '--state-file', help='State file for state.py script', default='/tmp/.teskstate')
@@ -241,9 +152,18 @@
     global args
     args = parser.parse_args()
 
-    polling_interval = args.polling_interval
+    global poll_interval
+    poll_interval = args.poll_interval
 
-    debug = args.debug
+    loglevel = logging.WARNING
+    if args.debug:
+        loglevel = logging.DEBUG
+
+    global logger
+    logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', datefmt='%m/%d/%Y %I:%M:%S', level=loglevel)
+    logging.getLogger('kubernetes.client').setLevel(logging.CRITICAL)
+    logger = logging.getLogger(__name__)
+    logger.debug('Starting taskmaster')
 
     # Get input JSON
     if args.file is None:
@@ -251,46 +171,48 @@
     elif args.file == '-':
         data = json.load(sys.stdin)
     else:
-        data = json.load(open(args.file))
-
-    #specs = generate_job_specs(tes)
+        with open(args.file) as fh:
+            data = json.load(fh)
 
     # Load kubernetes config file
-    if not debug:
-        config.load_incluster_config()
-    else:
+    if args.debug:
         config.load_kube_config()
-
-    # create and populate pvc only if volumes/inputs/outputs are given
-    if data['volumes'] or data['inputs'] or data['outputs']:
-        pvc_name = create_pvc(data, args.namespace)
-
-        # to global var for cleanup purposes
-        global created_pvc
-        created_pvc = pvc_name
-
-        volume_mounts = populate_pvc(data, args.namespace, pvc_name, args.filer_version)
-        state = run_executors(data['executors'], args.namespace, volume_mounts, pvc_name)
     else:
-        state = run_executors(data['executors'], args.namespace)
+        config.load_incluster_config()
 
-    # run executors
-    print("Finished with state %s" % state)
+    global created_pvc
+    created_pvc = None
 
-    # upload files and delete pvc
-    if data['volumes'] or data['inputs'] or data['outputs']:
-        cleanup_pvc(data, args.namespace, volume_mounts, pvc_name, args.filer_version)
+    # Check if we're cancelled during init
+    if check_cancelled():
+        exit_cancelled('Cancelled during init')
 
-def clean_on_interrupt():
-    print('Caught interrupt signal, deleting jobs and pvc', file=sys.stderr)
-    bv1 = client.BatchV1Api()
+    run_task(data, args.filer_version)
 
+def clean_on_interrupt():
+    logger.debug('Caught interrupt signal, deleting jobs and pvc')
+
     for job in created_jobs:
-        bv1.delete_namespaced_job(job, args.namespace, client.V1DeleteOptions())
+        job.delete()
+
+    if created_pvc:
+        created_pvc.delete()
 
+def exit_cancelled(reason='Unknown reason'):
     if created_pvc:
-        cv1 = client.CoreV1Api()
-        cv1.delete_namespaced_persistent_volume_claim(created_pvc, args.namespace, client.V1DeleteOptions())
+        created_pvc.delete()
+    logger.error('Cancelling taskmaster: '+reason)
+    sys.exit(0)
+
+def check_cancelled():
+    with open('/podinfo/labels') as fh:
+        for line in fh.readlines():
+            name, label = line.split('=', 1)
+            logging.debug('Got label: '+label)
+            if label.strip() == '"Cancelled"':
+                return True
+
+    return False
 
 if __name__ == "__main__":
     try: