From 3accdcee11f4e4b7f1db2f366a39f7d5ccc6b123 Mon Sep 17 00:00:00 2001 From: erikvdbergh Date: Fri, 16 Feb 2018 16:42:56 +0000 Subject: [PATCH 01/20] In the process of fixing, still output errors --- scripts/filer.py | 21 ++++++++++++++++----- scripts/taskmaster.py | 35 ++++++++++++++++++++++++++++------- 2 files changed, 44 insertions(+), 12 deletions(-) diff --git a/scripts/filer.py b/scripts/filer.py index bfeba853..b7db6308 100755 --- a/scripts/filer.py +++ b/scripts/filer.py @@ -10,10 +10,13 @@ import re import os import distutils.dir_util +import time +import logging debug = True def download_ftp_file(source, target, ftp): + logging.debug('downloading ftp file: '+source+' target: '+target) basedir = os.path.dirname(target) distutils.dir_util.mkpath(basedir) @@ -22,11 +25,12 @@ def download_ftp_file(source, target, ftp): def process_upload_dir(source, target, ftp): basename = os.path.basename(source) + logging.debug('processing upload dir, basename: '+basename) try: - print('trying to create dir: ' + '/'+target+'/'+basename, file=sys.stderr) + logging.debug('trying to create dir: ' + '/'+target+'/'+basename, file=sys.stderr) ftp.mkd('/'+target+'/'+basename) except ftplib.error_perm: - print('Directory exists, overwriting') + logging.debug('Directory exists, overwriting') for f in os.listdir(source): if os.path.isdir(source+'/'+f): @@ -36,6 +40,8 @@ def process_upload_dir(source, target, ftp): return 0 def process_ftp_dir(source, target, ftp): + logging.debug('processing ftp dir: '+source+' target: '+target) + pwd = ftp.pwd() ftp.cwd('/'+source) ls = [] @@ -53,11 +59,14 @@ def process_ftp_dir(source, target, ftp): else: download_ftp_file(name, target+'/'+name, ftp) + ftp.cwd(pwd) + def process_ftp_file(ftype, afile): p = re.compile('[a-z]+://([-a-z.]+)/(.*)') ftp_baseurl = p.match(afile['url']).group(1) ftp_path = p.match(afile['url']).group(2) + logging.debug('Connecting to FTP: '+ftp_baseurl) ftp = FTP(ftp_baseurl) if os.environ.get('TESK_FTP_USERNAME') is not None: try: @@ -125,7 +134,7 @@ def process_file(ftype, afile): p = re.compile('([a-z]+)://') protocol = p.match(url).group(1) - debug('protocol is: '+protocol) + logging.debug('protocol is: '+protocol) if protocol == 'ftp': return process_ftp_file(ftype, afile) @@ -140,6 +149,8 @@ def debug(msg): print(msg, file=sys.stderr) def main(argv): + logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', datefmt='%m/%d/%Y %I:%M:%S', level=logging.DEBUG) + logging.debug('Starting filer...') parser = argparse.ArgumentParser(description='Filer script for down- and uploading files') parser.add_argument('filetype', help='filetype to handle, either \'inputs\' or \'outputs\' ') parser.add_argument('data', help='file description data, see docs for structure') @@ -148,13 +159,13 @@ def main(argv): data = json.loads(args.data) for afile in data[args.filetype]: - debug('processing file: '+afile['path']) + logging.debug('processing file: '+afile['path']) if process_file(args.filetype, afile): print('something went wrong', file=sys.stderr) return 1 # TODO a bit more detailed reporting else: - debug('Processed file: ' + afile['path']) + logging.debug('Processed file: ' + afile['path']) return 0 if __name__ == "__main__": diff --git a/scripts/taskmaster.py b/scripts/taskmaster.py index aa9769ef..90e9922e 100755 --- a/scripts/taskmaster.py +++ b/scripts/taskmaster.py @@ -10,6 +10,8 @@ import sys from kubernetes import client, config from datetime import datetime +import logging + created_jobs = [] debug = False @@ -64,6 +66,7 @@ def 
run_executors(specs, namespace, volume_mounts=None, pvc_name=None): if pvc_name is not None: spec['volumes'] = [{ 'name': task_volume_basename, 'persistentVolumeClaim': { 'readonly': False, 'claimName': pvc_name }}] + logger.debug('Created job: '+jobname) job = v1.create_namespaced_job(body=executor, namespace=namespace) global created_jobs @@ -130,7 +133,8 @@ def get_filer_template(filer_version, name): "image": "eu.gcr.io/tes-wes/filer:"+filer_version, "args": [], "env": [], - "volumeMounts": [] + "volumeMounts": [], + "imagePullPolicy": "IfNotPresent" } ], "volumes": [], @@ -145,6 +149,11 @@ def get_filer_template(filer_version, name): env.append({ "name": "TESK_FTP_USERNAME", "value": os.environ['TESK_FTP_USERNAME'] }) env.append({ "name": "TESK_FTP_PASSWORD", "value": os.environ['TESK_FTP_PASSWORD'] }) + if args.debug: + filer['spec']['template']['spec']['containers'][0]['imagePullPolicy'] = 'Always' + + logging.debug(json.dumps(filer)) + return filer def append_mount(volume_mounts, name, path): @@ -152,6 +161,7 @@ def append_mount(volume_mounts, name, path): duplicate = next((mount for mount in volume_mounts if mount['mountPath'] == path), None) # If not, add mount path if duplicate is None: + logger.debug('appending '+name+' at path '+path) volume_mounts.append({ 'name': name, 'mountPath': path }) def populate_pvc(data, namespace, pvc_name, filer_version): @@ -168,6 +178,7 @@ def populate_pvc(data, namespace, pvc_name, filer_version): # strip filename from path p = re.compile('(.*)/') basepath = p.match(aninput['path']).group(1) + logger.debug('basepath is: '+basepath) elif aninput['type'] == 'DIRECTORY': basepath = aninput['path'] @@ -218,7 +229,7 @@ def cleanup_pvc(data, namespace, volume_mounts, pvc_name, filer_version): job = bv1.create_namespaced_job(body=posttask, namespace=namespace) global created_jobs - created_jobs.append(pretask['metadata']['name']) + created_jobs.append(posttask['metadata']['name']) wait_for_job(posttask['metadata']['name'], namespace) @@ -245,7 +256,15 @@ def main(argv): polling_interval = args.polling_interval - debug = args.debug + loglevel = logging.WARNING + if args.debug: + loglevel = logging.DEBUG + + global logger + logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', datefmt='%m/%d/%Y %I:%M:%S', level=loglevel) + logging.getLogger('kubernetes.client').setLevel(logging.CRITICAL) + logger = logging.getLogger(__name__) + logger.debug('Starting taskmaster') # Get input JSON if args.file is None: @@ -258,10 +277,10 @@ def main(argv): #specs = generate_job_specs(tes) # Load kubernetes config file - if not debug: - config.load_incluster_config() - else: + if args.debug: config.load_kube_config() + else: + config.load_incluster_config() # create and populate pvc only if volumes/inputs/outputs are given if data['volumes'] or data['inputs'] or data['outputs']: @@ -284,14 +303,16 @@ def main(argv): cleanup_pvc(data, args.namespace, volume_mounts, pvc_name, args.filer_version) def clean_on_interrupt(): - print('Caught interrupt signal, deleting jobs and pvc', file=sys.stderr) + logger.debug('Caught interrupt signal, deleting jobs and pvc') bv1 = client.BatchV1Api() for job in created_jobs: + logger.debug('Deleting job: '+job) bv1.delete_namespaced_job(job, args.namespace, client.V1DeleteOptions()) if created_pvc: cv1 = client.CoreV1Api() + logger.debug('Deleting pvc: '+created_pvc) cv1.delete_namespaced_persistent_volume_claim(created_pvc, args.namespace, client.V1DeleteOptions()) if __name__ == "__main__": From 
94d2343f3b6e9edde2f90e47af80542c4a87e463 Mon Sep 17 00:00:00 2001 From: erikvdbergh Date: Thu, 22 Feb 2018 15:32:29 +0000 Subject: [PATCH 02/20] still testing --- scripts/filer.py | 352 ++++++++++++++++++++++++----------------------- 1 file changed, 180 insertions(+), 172 deletions(-) diff --git a/scripts/filer.py b/scripts/filer.py index b7db6308..ffacce2c 100755 --- a/scripts/filer.py +++ b/scripts/filer.py @@ -1,172 +1,180 @@ -#!/usr/bin/python - -from __future__ import print_function -from ftplib import FTP -import ftplib -import argparse -import requests -import sys -import json -import re -import os -import distutils.dir_util -import time -import logging - -debug = True - -def download_ftp_file(source, target, ftp): - logging.debug('downloading ftp file: '+source+' target: '+target) - basedir = os.path.dirname(target) - distutils.dir_util.mkpath(basedir) - - ftp.retrbinary("RETR "+source, open(target, 'w').write) - return 0 - -def process_upload_dir(source, target, ftp): - basename = os.path.basename(source) - logging.debug('processing upload dir, basename: '+basename) - try: - logging.debug('trying to create dir: ' + '/'+target+'/'+basename, file=sys.stderr) - ftp.mkd('/'+target+'/'+basename) - except ftplib.error_perm: - logging.debug('Directory exists, overwriting') - - for f in os.listdir(source): - if os.path.isdir(source+'/'+f): - process_upload_dir(source+'/'+f, target+'/'+basename+'/', ftp) - elif os.path.isfile(source+'/'+f): - ftp.storbinary("STOR "+target+'/'+basename+'/'+f, open(source+'/'+f, 'r')) - return 0 - -def process_ftp_dir(source, target, ftp): - logging.debug('processing ftp dir: '+source+' target: '+target) - pwd = ftp.pwd() - ftp.cwd('/'+source) - - ls = [] - ftp.retrlines('LIST', ls.append) - - # This is horrible and I'm sorry but it works flawlessly. 
Credit to Chris Haas for writing this - # see https://stackoverflow.com/questions/966578/parse-response-from-ftp-list-command-syntax-variations for attribution - p = re.compile('^(?P[\-ld])(?P([\-r][\-w][\-xs]){3})\s+(?P\d+)\s+(?P\w+)\s+(?P\w+)\s+(?P\d+)\s+(?P((\w{3})\s+(\d{2})\s+(\d{1,2}):(\d{2}))|((\w{3})\s+(\d{1,2})\s+(\d{4})))\s+(?P.+)$') - for l in ls: - dirbit = p.match(l).group('dir') - name = p.match(l).group('name') - - if dirbit == 'd': - process_ftp_dir(source+'/'+name, target+'/'+name, ftp) - else: - download_ftp_file(name, target+'/'+name, ftp) - - ftp.cwd(pwd) - -def process_ftp_file(ftype, afile): - p = re.compile('[a-z]+://([-a-z.]+)/(.*)') - ftp_baseurl = p.match(afile['url']).group(1) - ftp_path = p.match(afile['url']).group(2) - - logging.debug('Connecting to FTP: '+ftp_baseurl) - ftp = FTP(ftp_baseurl) - if os.environ.get('TESK_FTP_USERNAME') is not None: - try: - user = os.environ['TESK_FTP_USERNAME'] - pw = os.environ['TESK_FTP_PASSWORD'] - ftp.login(user, pw) - except ftplib.error_perm: - ftp.login() - else: - ftp.login() - - if ftype == 'inputs': - if afile['type'] == 'FILE': - return download_ftp_file(ftp_path, afile['path'], ftp) - elif afile['type'] == 'DIRECTORY': - return process_ftp_dir(ftp_path, afile['path'], ftp) - else: - print('Unknown file type') - return 1 - elif ftype == 'outputs': - if afile['type'] == 'FILE': - ftp.storbinary("STOR "+ftp_path, open(afile['path'], 'r')) - return 0 - elif afile['type'] == 'DIRECTORY': - return process_upload_dir(afile['path'], ftp_path, ftp) - else: - print('Unknown file type: '+afile['type']) - return 1 - else: - print('Unknown file action: ' + ftype) - return 1 - -def process_http_file(ftype, afile): - if ftype == 'inputs': - r = requests.get(afile['url']) - fp = open(afile['path'], 'wb') - fp.write(r.content) - fp.close - return 0 - elif ftype == 'outputs': - fp = open(afile['path'], 'r') - r = requests.put(afile['url'], data=fp.read()) - fp.close - return 0 - else: - print('Unknown action') - return 1 - -def filefromcontent(afile): - content = afile.get('content') - if content is None: - print('Incorrect file spec format, no content or url specified', file=sys.stderr) - return 1 - - fh = open(afile['path'], 'w') - fh.write(str(afile['content'])) - fh.close() - return 0 - -def process_file(ftype, afile): - url = afile.get('url') - - if url is None: - return filefromcontent(afile) - - p = re.compile('([a-z]+)://') - protocol = p.match(url).group(1) - logging.debug('protocol is: '+protocol) - - if protocol == 'ftp': - return process_ftp_file(ftype, afile) - elif protocol == 'http' or protocol == 'https': - return process_http_file(ftype, afile) - else: - print('Unknown file protocol') - return 1 - -def debug(msg): - if debug: - print(msg, file=sys.stderr) - -def main(argv): - logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', datefmt='%m/%d/%Y %I:%M:%S', level=logging.DEBUG) - logging.debug('Starting filer...') - parser = argparse.ArgumentParser(description='Filer script for down- and uploading files') - parser.add_argument('filetype', help='filetype to handle, either \'inputs\' or \'outputs\' ') - parser.add_argument('data', help='file description data, see docs for structure') - args = parser.parse_args() - - data = json.loads(args.data) - - for afile in data[args.filetype]: - logging.debug('processing file: '+afile['path']) - if process_file(args.filetype, afile): - print('something went wrong', file=sys.stderr) - return 1 - # TODO a bit more detailed reporting - else: - logging.debug('Processed 
file: ' + afile['path']) - - return 0 -if __name__ == "__main__": - main(sys.argv) +#!/usr/bin/python + +from __future__ import print_function +from ftplib import FTP +import ftplib +import argparse +import requests +import sys +import json +import re +import os +import distutils.dir_util +import time +import logging +import traceback + +debug = True + +def download_ftp_file(source, target, ftp): + logging.debug('downloading ftp file: '+source+' target: '+target) + basedir = os.path.dirname(target) + distutils.dir_util.mkpath(basedir) + + ftp.retrbinary("RETR "+source, open(target, 'w').write) + return 0 + +def process_upload_dir(source, target, ftp): + basename = os.path.basename(source) + logging.debug('processing upload dir, basename: '+basename) + try: + logging.debug('trying to create dir: ' + '/'+target+'/'+basename) + ftp.mkd('/'+target+'/'+basename) + except ftplib.error_perm: + logging.debug('Directory exists, overwriting') + + for f in os.listdir(source): + if os.path.isdir(source+'/'+f): + process_upload_dir(source+'/'+f, target+'/'+basename+'/', ftp) + elif os.path.isfile(source+'/'+f): + ftp.storbinary("STOR "+target+'/'+basename+'/'+f, open(source+'/'+f, 'r')) + return 0 + +def process_ftp_dir(source, target, ftp): + logging.debug('processing ftp dir: '+source+' target: '+target) + pwd = ftp.pwd() + ftp.cwd('/'+source) + + ls = [] + ftp.retrlines('LIST', ls.append) + + # This is horrible and I'm sorry but it works flawlessly. Credit to Chris Haas for writing this + # see https://stackoverflow.com/questions/966578/parse-response-from-ftp-list-command-syntax-variations for attribution + p = re.compile('^(?P[\-ld])(?P([\-r][\-w][\-xs]){3})\s+(?P\d+)\s+(?P\w+)\s+(?P\w+)\s+(?P\d+)\s+(?P((\w{3})\s+(\d{2})\s+(\d{1,2}):(\d{2}))|((\w{3})\s+(\d{1,2})\s+(\d{4})))\s+(?P.+)$') + for l in ls: + dirbit = p.match(l).group('dir') + name = p.match(l).group('name') + + if dirbit == 'd': + process_ftp_dir(source+'/'+name, target+'/'+name, ftp) + else: + download_ftp_file(name, target+'/'+name, ftp) + + ftp.cwd(pwd) + +def process_ftp_file(ftype, afile): + p = re.compile('[a-z]+://([-a-z.]+)/(.*)') + ftp_baseurl = p.match(afile['url']).group(1) + ftp_path = p.match(afile['url']).group(2) + + logging.debug('Connecting to FTP: '+ftp_baseurl) + ftp = FTP(ftp_baseurl) + if os.environ.get('TESK_FTP_USERNAME') is not None: + try: + user = os.environ['TESK_FTP_USERNAME'] + pw = os.environ['TESK_FTP_PASSWORD'] + ftp.login(user, pw) + except ftplib.error_perm: + ftp.login() + else: + ftp.login() + + if ftype == 'inputs': + if afile['type'] == 'FILE': + return download_ftp_file(ftp_path, afile['path'], ftp) + elif afile['type'] == 'DIRECTORY': + return process_ftp_dir(ftp_path, afile['path'], ftp) + else: + print('Unknown file type') + return 1 + elif ftype == 'outputs': + if afile['type'] == 'FILE': + ftp.storbinary("STOR "+ftp_path, open(afile['path'], 'r')) + return 0 + elif afile['type'] == 'DIRECTORY': + return process_upload_dir(afile['path'], ftp_path, ftp) + else: + print('Unknown file type: '+afile['type']) + return 1 + else: + print('Unknown file action: ' + ftype) + return 1 + +def process_http_file(ftype, afile): + if ftype == 'inputs': + r = requests.get(afile['url']) + fp = open(afile['path'], 'wb') + fp.write(r.content) + fp.close + return 0 + elif ftype == 'outputs': + fp = open(afile['path'], 'r') + r = requests.put(afile['url'], data=fp.read()) + fp.close + return 0 + else: + print('Unknown action') + return 1 + +def filefromcontent(afile): + content = afile.get('content') + if content is 
None: + logging.error('Incorrect file spec format, no content or url specified') + return 1 + + fh = open(afile['path'], 'w') + fh.write(str(afile['content'])) + fh.close() + return 0 + +def process_file(ftype, afile): + url = afile.get('url') + + if url is None: + return filefromcontent(afile) + + p = re.compile('([a-z]+)://') + protocol = p.match(url).group(1) + logging.debug('protocol is: '+protocol) + + if protocol == 'ftp': + return process_ftp_file(ftype, afile) + elif protocol == 'http' or protocol == 'https': + return process_http_file(ftype, afile) + else: + print('Unknown file protocol') + return 1 + +def debug(msg): + if debug: + print(msg, file=sys.stderr) + +def main(argv): + logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', datefmt='%m/%d/%Y %I:%M:%S', level=logging.DEBUG) + logging.debug('Starting filer...') + parser = argparse.ArgumentParser(description='Filer script for down- and uploading files') + parser.add_argument('filetype', help='filetype to handle, either \'inputs\' or \'outputs\' ') + parser.add_argument('data', help='file description data, see docs for structure') + args = parser.parse_args() + + data = json.loads(args.data) + time.sleep(600) + + for afile in data[args.filetype]: + logging.debug('processing file: '+afile['path']) + if process_file(args.filetype, afile): + logging.error('something went wrong') + return 1 + # TODO a bit more detailed reporting + else: + logging.debug('Processed file: ' + afile['path']) + + return 0 + +if __name__ == "__main__": + try: + main(sys.argv) + except: + traceback.print_exc(file=sys.stderr) + time.sleep(600) + raise From d00c5f378554846a71a1d037d19c84ceef6f0734 Mon Sep 17 00:00:00 2001 From: erikvdbergh Date: Thu, 22 Feb 2018 15:37:16 +0000 Subject: [PATCH 03/20] now with unix line endings --- scripts/filer.py | 360 +++++++++++++++++++++++------------------------ 1 file changed, 180 insertions(+), 180 deletions(-) diff --git a/scripts/filer.py b/scripts/filer.py index ffacce2c..05c10836 100755 --- a/scripts/filer.py +++ b/scripts/filer.py @@ -1,180 +1,180 @@ -#!/usr/bin/python - -from __future__ import print_function -from ftplib import FTP -import ftplib -import argparse -import requests -import sys -import json -import re -import os -import distutils.dir_util -import time -import logging -import traceback - -debug = True - -def download_ftp_file(source, target, ftp): - logging.debug('downloading ftp file: '+source+' target: '+target) - basedir = os.path.dirname(target) - distutils.dir_util.mkpath(basedir) - - ftp.retrbinary("RETR "+source, open(target, 'w').write) - return 0 - -def process_upload_dir(source, target, ftp): - basename = os.path.basename(source) - logging.debug('processing upload dir, basename: '+basename) - try: - logging.debug('trying to create dir: ' + '/'+target+'/'+basename) - ftp.mkd('/'+target+'/'+basename) - except ftplib.error_perm: - logging.debug('Directory exists, overwriting') - - for f in os.listdir(source): - if os.path.isdir(source+'/'+f): - process_upload_dir(source+'/'+f, target+'/'+basename+'/', ftp) - elif os.path.isfile(source+'/'+f): - ftp.storbinary("STOR "+target+'/'+basename+'/'+f, open(source+'/'+f, 'r')) - return 0 - -def process_ftp_dir(source, target, ftp): - logging.debug('processing ftp dir: '+source+' target: '+target) - pwd = ftp.pwd() - ftp.cwd('/'+source) - - ls = [] - ftp.retrlines('LIST', ls.append) - - # This is horrible and I'm sorry but it works flawlessly. 
Credit to Chris Haas for writing this - # see https://stackoverflow.com/questions/966578/parse-response-from-ftp-list-command-syntax-variations for attribution - p = re.compile('^(?P[\-ld])(?P([\-r][\-w][\-xs]){3})\s+(?P\d+)\s+(?P\w+)\s+(?P\w+)\s+(?P\d+)\s+(?P((\w{3})\s+(\d{2})\s+(\d{1,2}):(\d{2}))|((\w{3})\s+(\d{1,2})\s+(\d{4})))\s+(?P.+)$') - for l in ls: - dirbit = p.match(l).group('dir') - name = p.match(l).group('name') - - if dirbit == 'd': - process_ftp_dir(source+'/'+name, target+'/'+name, ftp) - else: - download_ftp_file(name, target+'/'+name, ftp) - - ftp.cwd(pwd) - -def process_ftp_file(ftype, afile): - p = re.compile('[a-z]+://([-a-z.]+)/(.*)') - ftp_baseurl = p.match(afile['url']).group(1) - ftp_path = p.match(afile['url']).group(2) - - logging.debug('Connecting to FTP: '+ftp_baseurl) - ftp = FTP(ftp_baseurl) - if os.environ.get('TESK_FTP_USERNAME') is not None: - try: - user = os.environ['TESK_FTP_USERNAME'] - pw = os.environ['TESK_FTP_PASSWORD'] - ftp.login(user, pw) - except ftplib.error_perm: - ftp.login() - else: - ftp.login() - - if ftype == 'inputs': - if afile['type'] == 'FILE': - return download_ftp_file(ftp_path, afile['path'], ftp) - elif afile['type'] == 'DIRECTORY': - return process_ftp_dir(ftp_path, afile['path'], ftp) - else: - print('Unknown file type') - return 1 - elif ftype == 'outputs': - if afile['type'] == 'FILE': - ftp.storbinary("STOR "+ftp_path, open(afile['path'], 'r')) - return 0 - elif afile['type'] == 'DIRECTORY': - return process_upload_dir(afile['path'], ftp_path, ftp) - else: - print('Unknown file type: '+afile['type']) - return 1 - else: - print('Unknown file action: ' + ftype) - return 1 - -def process_http_file(ftype, afile): - if ftype == 'inputs': - r = requests.get(afile['url']) - fp = open(afile['path'], 'wb') - fp.write(r.content) - fp.close - return 0 - elif ftype == 'outputs': - fp = open(afile['path'], 'r') - r = requests.put(afile['url'], data=fp.read()) - fp.close - return 0 - else: - print('Unknown action') - return 1 - -def filefromcontent(afile): - content = afile.get('content') - if content is None: - logging.error('Incorrect file spec format, no content or url specified') - return 1 - - fh = open(afile['path'], 'w') - fh.write(str(afile['content'])) - fh.close() - return 0 - -def process_file(ftype, afile): - url = afile.get('url') - - if url is None: - return filefromcontent(afile) - - p = re.compile('([a-z]+)://') - protocol = p.match(url).group(1) - logging.debug('protocol is: '+protocol) - - if protocol == 'ftp': - return process_ftp_file(ftype, afile) - elif protocol == 'http' or protocol == 'https': - return process_http_file(ftype, afile) - else: - print('Unknown file protocol') - return 1 - -def debug(msg): - if debug: - print(msg, file=sys.stderr) - -def main(argv): - logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', datefmt='%m/%d/%Y %I:%M:%S', level=logging.DEBUG) - logging.debug('Starting filer...') - parser = argparse.ArgumentParser(description='Filer script for down- and uploading files') - parser.add_argument('filetype', help='filetype to handle, either \'inputs\' or \'outputs\' ') - parser.add_argument('data', help='file description data, see docs for structure') - args = parser.parse_args() - - data = json.loads(args.data) - time.sleep(600) - - for afile in data[args.filetype]: - logging.debug('processing file: '+afile['path']) - if process_file(args.filetype, afile): - logging.error('something went wrong') - return 1 - # TODO a bit more detailed reporting - else: - logging.debug('Processed 
file: ' + afile['path']) - - return 0 - -if __name__ == "__main__": - try: - main(sys.argv) - except: - traceback.print_exc(file=sys.stderr) - time.sleep(600) - raise +#!/usr/bin/python + +from __future__ import print_function +from ftplib import FTP +import ftplib +import argparse +import requests +import sys +import json +import re +import os +import distutils.dir_util +import time +import logging +import traceback + +debug = True + +def download_ftp_file(source, target, ftp): + logging.debug('downloading ftp file: '+source+' target: '+target) + basedir = os.path.dirname(target) + distutils.dir_util.mkpath(basedir) + + ftp.retrbinary("RETR "+source, open(target, 'w').write) + return 0 + +def process_upload_dir(source, target, ftp): + basename = os.path.basename(source) + logging.debug('processing upload dir, basename: '+basename) + try: + logging.debug('trying to create dir: ' + '/'+target+'/'+basename) + ftp.mkd('/'+target+'/'+basename) + except ftplib.error_perm: + logging.debug('Directory exists, overwriting') + + for f in os.listdir(source): + if os.path.isdir(source+'/'+f): + process_upload_dir(source+'/'+f, target+'/'+basename+'/', ftp) + elif os.path.isfile(source+'/'+f): + ftp.storbinary("STOR "+target+'/'+basename+'/'+f, open(source+'/'+f, 'r')) + return 0 + +def process_ftp_dir(source, target, ftp): + logging.debug('processing ftp dir: '+source+' target: '+target) + pwd = ftp.pwd() + ftp.cwd('/'+source) + + ls = [] + ftp.retrlines('LIST', ls.append) + + # This is horrible and I'm sorry but it works flawlessly. Credit to Chris Haas for writing this + # see https://stackoverflow.com/questions/966578/parse-response-from-ftp-list-command-syntax-variations for attribution + p = re.compile('^(?P[\-ld])(?P([\-r][\-w][\-xs]){3})\s+(?P\d+)\s+(?P\w+)\s+(?P\w+)\s+(?P\d+)\s+(?P((\w{3})\s+(\d{2})\s+(\d{1,2}):(\d{2}))|((\w{3})\s+(\d{1,2})\s+(\d{4})))\s+(?P.+)$') + for l in ls: + dirbit = p.match(l).group('dir') + name = p.match(l).group('name') + + if dirbit == 'd': + process_ftp_dir(source+'/'+name, target+'/'+name, ftp) + else: + download_ftp_file(name, target+'/'+name, ftp) + + ftp.cwd(pwd) + +def process_ftp_file(ftype, afile): + p = re.compile('[a-z]+://([-a-z.]+)/(.*)') + ftp_baseurl = p.match(afile['url']).group(1) + ftp_path = p.match(afile['url']).group(2) + + logging.debug('Connecting to FTP: '+ftp_baseurl) + ftp = FTP(ftp_baseurl) + if os.environ.get('TESK_FTP_USERNAME') is not None: + try: + user = os.environ['TESK_FTP_USERNAME'] + pw = os.environ['TESK_FTP_PASSWORD'] + ftp.login(user, pw) + except ftplib.error_perm: + ftp.login() + else: + ftp.login() + + if ftype == 'inputs': + if afile['type'] == 'FILE': + return download_ftp_file(ftp_path, afile['path'], ftp) + elif afile['type'] == 'DIRECTORY': + return process_ftp_dir(ftp_path, afile['path'], ftp) + else: + print('Unknown file type') + return 1 + elif ftype == 'outputs': + if afile['type'] == 'FILE': + ftp.storbinary("STOR "+ftp_path, open(afile['path'], 'r')) + return 0 + elif afile['type'] == 'DIRECTORY': + return process_upload_dir(afile['path'], ftp_path, ftp) + else: + print('Unknown file type: '+afile['type']) + return 1 + else: + print('Unknown file action: ' + ftype) + return 1 + +def process_http_file(ftype, afile): + if ftype == 'inputs': + r = requests.get(afile['url']) + fp = open(afile['path'], 'wb') + fp.write(r.content) + fp.close + return 0 + elif ftype == 'outputs': + fp = open(afile['path'], 'r') + r = requests.put(afile['url'], data=fp.read()) + fp.close + return 0 + else: + print('Unknown action') + 
return 1 + +def filefromcontent(afile): + content = afile.get('content') + if content is None: + logging.error('Incorrect file spec format, no content or url specified') + return 1 + + fh = open(afile['path'], 'w') + fh.write(str(afile['content'])) + fh.close() + return 0 + +def process_file(ftype, afile): + url = afile.get('url') + + if url is None: + return filefromcontent(afile) + + p = re.compile('([a-z]+)://') + protocol = p.match(url).group(1) + logging.debug('protocol is: '+protocol) + + if protocol == 'ftp': + return process_ftp_file(ftype, afile) + elif protocol == 'http' or protocol == 'https': + return process_http_file(ftype, afile) + else: + print('Unknown file protocol') + return 1 + +def debug(msg): + if debug: + print(msg, file=sys.stderr) + +def main(argv): + logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', datefmt='%m/%d/%Y %I:%M:%S', level=logging.DEBUG) + logging.debug('Starting filer...') + parser = argparse.ArgumentParser(description='Filer script for down- and uploading files') + parser.add_argument('filetype', help='filetype to handle, either \'inputs\' or \'outputs\' ') + parser.add_argument('data', help='file description data, see docs for structure') + args = parser.parse_args() + + data = json.loads(args.data) + time.sleep(60) + + for afile in data[args.filetype]: + logging.debug('processing file: '+afile['path']) + if process_file(args.filetype, afile): + logging.error('something went wrong') + return 1 + # TODO a bit more detailed reporting + else: + logging.debug('Processed file: ' + afile['path']) + + return 0 + +if __name__ == "__main__": + try: + main(sys.argv) + except: + traceback.print_exc(file=sys.stderr) + time.sleep(600) + raise From e49e1b39e455d9fe422da4962ee395fa3da42eec Mon Sep 17 00:00:00 2001 From: erikvdbergh Date: Thu, 22 Feb 2018 15:38:58 +0000 Subject: [PATCH 04/20] fixed to latest --- scripts/filer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/filer.py b/scripts/filer.py index 05c10836..252b157d 100755 --- a/scripts/filer.py +++ b/scripts/filer.py @@ -28,7 +28,7 @@ def process_upload_dir(source, target, ftp): basename = os.path.basename(source) logging.debug('processing upload dir, basename: '+basename) try: - logging.debug('trying to create dir: ' + '/'+target+'/'+basename) + logging.debug('trying to create dir: ' + '/'+target+'/'+basename, file=sys.stderr) ftp.mkd('/'+target+'/'+basename) except ftplib.error_perm: logging.debug('Directory exists, overwriting') From 213f0f2d1975ceea8404765053b628722cb9c419 Mon Sep 17 00:00:00 2001 From: erikvdbergh Date: Fri, 23 Feb 2018 16:50:28 +0000 Subject: [PATCH 05/20] OOP style overhaul of taskmaster Rewrote taskmaster with a more object oriented style. Job, Filer and PVC are now separate classes that get used in the taskmaster script. Also implemented label watching for cancellation and subPath style mounting to prevent recursive mounting. Also now uses logging iso plain old printing for debug messaging. 
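As a minimal sketch of the subPath idea (the /tmp/vol1 and /tmp/vol2 paths are taken from the example task JSON, the dirN subpath names come from PVC.get_subpath(), and the concrete values are otherwise illustrative), the mount entries the taskmaster now builds look like:

    # Both mounts share the single task PVC ("task-volume"), but each points
    # at its own subdirectory of that claim, so a mount path nested under
    # another volume's path can no longer re-mount the whole volume recursively.
    volume_mounts = [
        {'name': 'task-volume', 'mountPath': '/tmp/vol1', 'subPath': 'dir0'},
        {'name': 'task-volume', 'mountPath': '/tmp/vol2', 'subPath': 'dir1'},
    ]
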
--- dockerfiles/taskmaster/Dockerfile | 12 +- scripts/filer.py | 10 +- scripts/filer_class.py | 49 +++++ scripts/job.py | 44 +++++ scripts/pvc.py | 31 +++ scripts/taskmaster.py | 306 +++++++++--------------------- 6 files changed, 229 insertions(+), 223 deletions(-) create mode 100644 scripts/filer_class.py create mode 100644 scripts/job.py create mode 100644 scripts/pvc.py diff --git a/dockerfiles/taskmaster/Dockerfile b/dockerfiles/taskmaster/Dockerfile index 55c64020..2da4bb77 100644 --- a/dockerfiles/taskmaster/Dockerfile +++ b/dockerfiles/taskmaster/Dockerfile @@ -3,8 +3,14 @@ FROM gliderlabs/alpine RUN apk add --no-cache python py-pip curl openssl RUN pip install kubernetes -WORKDIR /root +RUN adduser -S taskmaster + +USER taskmaster + +WORKDIR /home/taskmaster COPY scripts/taskmaster.py . +COPY scripts/job.py . +COPY scripts/pvc.py . +COPY scripts/filer_class.py . -ENTRYPOINT ["/root/taskmaster.py"] -#CMD /root/taskmaster.py +ENTRYPOINT ["/home/taskmaster/taskmaster.py"] diff --git a/scripts/filer.py b/scripts/filer.py index 252b157d..ecb0c3d4 100755 --- a/scripts/filer.py +++ b/scripts/filer.py @@ -28,7 +28,7 @@ def process_upload_dir(source, target, ftp): basename = os.path.basename(source) logging.debug('processing upload dir, basename: '+basename) try: - logging.debug('trying to create dir: ' + '/'+target+'/'+basename, file=sys.stderr) + logging.debug('trying to create dir: ' + '/'+target+'/'+basename) ftp.mkd('/'+target+'/'+basename) except ftplib.error_perm: logging.debug('Directory exists, overwriting') @@ -158,7 +158,6 @@ def main(argv): args = parser.parse_args() data = json.loads(args.data) - time.sleep(60) for afile in data[args.filetype]: logging.debug('processing file: '+afile['path']) @@ -172,9 +171,4 @@ def main(argv): return 0 if __name__ == "__main__": - try: - main(sys.argv) - except: - traceback.print_exc(file=sys.stderr) - time.sleep(600) - raise + main(sys.argv) diff --git a/scripts/filer_class.py b/scripts/filer_class.py new file mode 100644 index 00000000..4e0903e2 --- /dev/null +++ b/scripts/filer_class.py @@ -0,0 +1,49 @@ +import json + +class Filer: + def __init__(self, name, data, filer_version='v0.5', debug=False): + self.name = name + self.spec = { + "kind": "Job", + "apiVersion": "batch/v1", + "spec": { + "template": { + "metadata": { "name": "tesk-filer" }, + "spec": { + "containers": [ { + "name": "filer", + "image": "eu.gcr.io/tes-wes/filer:"+filer_version, + "args": [], + "env": [], + "volumeMounts": [], + "imagePullPolicy": "IfNotPresent" + } + ], + "volumes": [], + "restartPolicy": "Never" + } + } + } + } + + if debug: + self.spec['spec']['template']['spec']['containers'][0]['imagePullPolicy'] = 'Always' + + container = self.spec['spec']['template']['spec']['containers'][0] + container['env'].append({ "name": "JSON_INPUT", "value": json.dumps(data) }) + #container['env'].append({ "name": "JSON_INPUT", "value": 'test' }) + + def set_ftp(self, user, pw): + env = self.spec['spec']['template']['spec']['containers'][0]['env'] + env.append({ "name": "TESK_FTP_USERNAME", "value": user }) + env.append({ "name": "TESK_FTP_PASSWORD", "value": pw }) + + def set_volume_mounts(self, pvc): + tempspec = self.spec['spec']['template']['spec'] + tempspec['containers'][0]['volumeMounts'] = pvc.volume_mounts + tempspec['volumes'] = [ { "name": "task-volume", "persistentVolumeClaim": { "claimName": pvc.name} } ] + + def get_spec(self, mode): + self.spec['spec']['template']['spec']['containers'][0]['args'] = [mode, "$(JSON_INPUT)"] + 
self.spec['spec']['template']['metadata']['name'] = self.name + return self.spec diff --git a/scripts/job.py b/scripts/job.py new file mode 100644 index 00000000..46428b17 --- /dev/null +++ b/scripts/job.py @@ -0,0 +1,44 @@ +from kubernetes import client, config +import logging +import time + +class Job: + def __init__(self, body, name='task-job', namespace='default'): + self.name = name + self.namespace = namespace + self.status = 'Initialized' + self.bv1 = client.BatchV1Api() + self.body = body + self.body['metadata'] = {'name': self.name } + + def run_to_completion(self, poll_interval, check_cancelled): + logging.debug(self.body) + self.bv1.create_namespaced_job(self.namespace, self.body) + status = self.get_status() + while status == 'Running': + if check_cancelled(): + self.delete() + return 'cancelled' + + time.sleep(poll_interval) + + status = self.get_status() + + return status + + def get_status(self): + job = self.bv1.read_namespaced_job(self.name, self.namespace) + try: + if job.status.conditions[0].type == 'Complete' and job.status.conditions[0].status: + self.status = 'Complete' + elif job.status.conditions[0].type == 'Failed' and job.status.conditions[0].status: + self.status = 'Failed' + else: + self.status = 'Error' + except TypeError: # The condition is not initialized, so it is not complete yet, wait for it + self.status = 'Running' + + return self.status + + def delete(self): + self.bv1.delete_namespaced_job(self.name, self.namespace, client.V1DeleteOptions()) diff --git a/scripts/pvc.py b/scripts/pvc.py new file mode 100644 index 00000000..bff5a588 --- /dev/null +++ b/scripts/pvc.py @@ -0,0 +1,31 @@ +from kubernetes import client, config + +class PVC(): + def __init__(self, name='task-pvc', size_gb=1, namespace='default'): + self.name = name + self.spec = { 'apiVersion': 'v1', + 'kind': 'PersistentVolumeClaim', + 'metadata': { 'name': name }, + 'spec': { + 'accessModes': [ 'ReadWriteOnce'], + 'resources': { 'requests' : { 'storage': str(size_gb)+'Gi' } }, + #'storageClassName': 'gold' + } + } + + self.subpath_idx = 0 + self.namespace='default' + self.cv1 = client.CoreV1Api() + self.cv1.create_namespaced_persistent_volume_claim(self.namespace, self.spec) + + def set_volume_mounts(self, mounts): + self.volume_mounts = mounts + + def get_subpath(self): + subpath = 'dir'+str(self.subpath_idx) + self.subpath_idx += 1 + return subpath + + def delete(self): + cv1 = client.CoreV1Api() + cv1.delete_namespaced_persistent_volume_claim(self.name, self.namespace, client.V1DeleteOptions()) diff --git a/scripts/taskmaster.py b/scripts/taskmaster.py index 90e9922e..43253911 100755 --- a/scripts/taskmaster.py +++ b/scripts/taskmaster.py @@ -4,173 +4,55 @@ import argparse import json import os -import binascii import re import time import sys from kubernetes import client, config -from datetime import datetime import logging - +from job import Job +from pvc import PVC +from filer_class import Filer +from multiprocessing import Process, Pool created_jobs = [] debug = False -polling_interval = 5 +poll_interval = 5 task_volume_basename = 'task-volume' -# translates TES JSON into 1 Job spec per executor -def generate_job_specs(tes): - specs = [] - name = re.sub(r" ", "-", tes['name'].lower()) - - for executor in tes['executors']: - descriptor = { - 'apiVersion': 'batch/v1', - 'kind': 'Job', - 'metadata': {'name': name}, - 'spec': { - 'template': { - 'metadata': {'name': name}, - 'spec': { - 'restartPolicy': 'Never', - 'containers': [{ - 'name': name + '-ex' + len(specs), - 'image': 
executor['image_name'], - 'command': executor['cmd'], - 'resources': { - 'requests': { - 'cpu': tes['resources']['cpu_cores'], - 'memory': str(tes['resources']['ram_gb']) + 'Gi' - } - } - }] - } - } - } - } - specs.append(descriptor) - - return specs - -def run_executors(specs, namespace, volume_mounts=None, pvc_name=None): - - # init Kubernetes Job API - v1 = client.BatchV1Api() - - for executor in specs: - jobname = executor['metadata']['name'] - spec = executor['spec']['template']['spec'] - - if volume_mounts is not None: - spec['containers'][0]['volumeMounts'] = volume_mounts - if pvc_name is not None: - spec['volumes'] = [{ 'name': task_volume_basename, 'persistentVolumeClaim': { 'readonly': False, 'claimName': pvc_name }}] - - logger.debug('Created job: '+jobname) - job = v1.create_namespaced_job(body=executor, namespace=namespace) - - global created_jobs - created_jobs.append(jobname) - - print("Created job with metadata='%s'" % str(job.metadata)) - if wait_for_job(jobname, namespace) == 'Failed': - return 'Failed' - - return 'Complete' - -def wait_for_job(jobname, namespace): - bv1 = client.BatchV1Api() - - finished = False - # Job polling loop - while not finished: - job = bv1.read_namespaced_job(jobname, namespace) - try: - #print("job.status.conditions[0].type: %s" % job.status.conditions[0].type) - if job.status.conditions[0].type == 'Complete' and job.status.conditions[0].status: - finished = True - elif job.status.conditions[0].type == 'Failed' and job.status.conditions[0].status: - finished = True - return 'Failed' # If an executor failed we break out of running the executors - else: - #print("hit else, failing") - return 'Failed' # something we don't know happened, fail - except TypeError: # The condition is not initialized, so it is not complete yet, wait for it - time.sleep(polling_interval) - - return 'Complete' # No failures, so return successful finish - -def create_pvc(data, namespace): - task_name = data['executors'][0]['metadata']['labels']['taskmaster-name'] - pvc_name = task_name + '-pvc' - size = data['resources']['disk_gb'] - - pvc = { 'apiVersion': 'v1', - 'kind': 'PersistentVolumeClaim', - 'metadata': { 'name': pvc_name }, - 'spec': { - 'accessModes': [ 'ReadWriteOnce'], - 'resources': { 'requests' : { 'storage': str(size)+'Gi' } }, - 'storageClassName': 'gold' - } - } - - cv1 = client.CoreV1Api() - cv1.create_namespaced_persistent_volume_claim(namespace, pvc) - return pvc_name - -def get_filer_template(filer_version, name): - filer = { - "kind": "Job", - "apiVersion": "batch/v1", - "metadata": { "name": name }, - "spec": { - "template": { - "metadata": { "name": "tesk-filer" }, - "spec": { - "containers": [ { - "name": "filer", - "image": "eu.gcr.io/tes-wes/filer:"+filer_version, - "args": [], - "env": [], - "volumeMounts": [], - "imagePullPolicy": "IfNotPresent" - } - ], - "volumes": [], - "restartPolicy": "Never" - } - } - } - } - - if os.environ.get('TESK_FTP_USERNAME') is not None: - env = filer['spec']['template']['spec']['containers'][0]['env'] - env.append({ "name": "TESK_FTP_USERNAME", "value": os.environ['TESK_FTP_USERNAME'] }) - env.append({ "name": "TESK_FTP_PASSWORD", "value": os.environ['TESK_FTP_PASSWORD'] }) +def run_executor(executor, namespace, pvc=None): + jobname = executor['metadata']['name'] + spec = executor['spec']['template']['spec'] - if args.debug: - filer['spec']['template']['spec']['containers'][0]['imagePullPolicy'] = 'Always' + if pvc is not None: + spec['containers'][0]['volumeMounts'] = pvc.volume_mounts + spec['volumes'] = [{ 
'name': task_volume_basename, 'persistentVolumeClaim': { 'readonly': False, 'claimName': pvc.name }}] - logging.debug(json.dumps(filer)) + logger.debug('Created job: '+jobname) + job = Job(executor, jobname, namespace) - return filer + global created_jobs + created_jobs.append(job) + + status = job.run_to_completion(poll_interval, check_cancelled) + if status == 'cancelled': + exit_cancelled() -def append_mount(volume_mounts, name, path): +def append_mount(volume_mounts, name, path, pvc): # Checks all mount paths in volume_mounts if the path given is already in there duplicate = next((mount for mount in volume_mounts if mount['mountPath'] == path), None) # If not, add mount path if duplicate is None: - logger.debug('appending '+name+' at path '+path) - volume_mounts.append({ 'name': name, 'mountPath': path }) + subpath = pvc.get_subpath() + logger.debug('appending '+name+' at path '+path+' with subPath: '+subpath) + volume_mounts.append({ 'name': name, 'mountPath': path , 'subPath': subpath}) -def populate_pvc(data, namespace, pvc_name, filer_version): +def generate_mounts(data, pvc): volume_mounts = [] # gather volumes that need to be mounted, without duplicates volume_name = task_volume_basename for idx, volume in enumerate(data['volumes']): - append_mount(volume_mounts, volume_name, volume) + append_mount(volume_mounts, volume_name, volume, pvc) # gather other paths that need to be mounted from inputs FILE and DIRECTORY entries for idx, aninput in enumerate(data['inputs']): @@ -182,62 +64,65 @@ def populate_pvc(data, namespace, pvc_name, filer_version): elif aninput['type'] == 'DIRECTORY': basepath = aninput['path'] - append_mount(volume_mounts, volume_name, basepath) + append_mount(volume_mounts, volume_name, basepath, pvc) - # get name from taskmaster and create pretask template - name = data['executors'][0]['metadata']['labels']['taskmaster-name'] - pretask = get_filer_template(filer_version, name+'-inputs-filer') + return volume_mounts - # insert JSON input into job template - container = pretask['spec']['template']['spec']['containers'][0] - container['env'].append({ "name": "JSON_INPUT", "value": json.dumps(data) }) - container['args'] += ["inputs", "$(JSON_INPUT)"] +def init_pvc(data, filer): + task_name = data['executors'][0]['metadata']['labels']['taskmaster-name'] + pvc_name = task_name+'-pvc' + pvc_size = data['resources']['disk_gb'] + pvc = PVC(pvc_name, pvc_size, args.namespace) - # insert volume mounts and pvc into job template - container['volumeMounts'] = volume_mounts - pretask['spec']['template']['spec']['volumes'] = [ { "name": volume_name, "persistentVolumeClaim": { "claimName": pvc_name} } ] + # to global var for cleanup purposes + global created_pvc + created_pvc = pvc - #print(json.dumps(pretask, indent=2), file=sys.stderr) + mounts = generate_mounts(data, pvc) + logging.debug(mounts) + logging.debug(type(mounts)) + pvc.set_volume_mounts(mounts) - # Run pretask filer job - bv1 = client.BatchV1Api() - job = bv1.create_namespaced_job(body=pretask, namespace=namespace) + filer.set_volume_mounts(pvc) + filerjob = Job(filer.get_spec('inputs'), task_name+'-inputs-filer', args.namespace) - global created_jobs - created_jobs.append(pretask['metadata']['name']) + global created_jobs + created_jobs.append(filerjob) + #filerjob.run_to_completion(poll_interval) + status = filerjob.run_to_completion(poll_interval, check_cancelled) + if status == 'cancelled': + exit_cancelled() - wait_for_job(pretask['metadata']['name'], namespace) + return pvc - return volume_mounts +def 
run_task(data, filer_version): + task_name = data['executors'][0]['metadata']['labels']['taskmaster-name'] -def cleanup_pvc(data, namespace, volume_mounts, pvc_name, filer_version): - # get name from taskmaster and create posttask template - name = data['executors'][0]['metadata']['labels']['taskmaster-name'] - posttask = get_filer_template(filer_version, name+'-outputs-filer') + if data['volumes'] or data['inputs'] or data['outputs']: - # insert JSON input into posttask job template - container = posttask['spec']['template']['spec']['containers'][0] - container['env'].append({ "name": "JSON_INPUT", "value": json.dumps(data) }) - container['args'] += ["outputs", "$(JSON_INPUT)"] - - # insert volume mounts and pvc into posttask job template - container['volumeMounts'] = volume_mounts - posttask['spec']['template']['spec']['volumes'] = [ { "name": task_volume_basename, "persistentVolumeClaim": { "claimName": pvc_name} } ] + filer = Filer(task_name+'-filer', data, filer_version, args.debug) + if os.environ.get('TESK_FTP_USERNAME') is not None: + filer.set_ftp(os.environ['TESK_FTP_USERNAME'], os.environ['TESK_FTP_PASSWORD']) - # run posttask filer job - bv1 = client.BatchV1Api() - job = bv1.create_namespaced_job(body=posttask, namespace=namespace) + pvc = init_pvc(data, filer) - global created_jobs - created_jobs.append(posttask['metadata']['name']) + for executor in data['executors']: + run_executor(executor, args.namespace, pvc) - wait_for_job(posttask['metadata']['name'], namespace) + # run executors + logging.debug("Finished running executors") - #print(json.dumps(posttask, indent=2), file=sys.stderr) + # upload files and delete pvc + if data['volumes'] or data['inputs'] or data['outputs']: + filerjob = Job(filer.get_spec('outputs'), task_name+'-outputs-filer', args.namespace) + + global created_jobs + created_jobs.append(filerjob) - # Delete pvc - cv1 = client.CoreV1Api() - cv1.delete_namespaced_persistent_volume_claim(pvc_name, namespace, client.V1DeleteOptions()) + #filerjob.run_to_completion(poll_interval) + status = filerjob.run_to_completion(poll_interval, check_cancelled) + if status == 'cancelled': + exit_cancelled() def main(argv): parser = argparse.ArgumentParser(description='TaskMaster main module') @@ -245,7 +130,7 @@ def main(argv): group.add_argument('json', help='string containing json TES request, required if -f is not given', nargs='?') group.add_argument('-f', '--file', help='TES request as a file or \'-\' for stdin, required if json is not given') - parser.add_argument('-p', '--polling-interval', help='Job polling interval', default=5) + parser.add_argument('-p', '--poll-interval', help='Job polling interval', default=5) parser.add_argument('-fv', '--filer-version', help='Filer image version', default='v0.1.9') parser.add_argument('-n', '--namespace', help='Kubernetes namespace to run in', default='default') parser.add_argument('-s', '--state-file', help='State file for state.py script', default='/tmp/.teskstate') @@ -254,7 +139,7 @@ def main(argv): global args args = parser.parse_args() - polling_interval = args.polling_interval + poll_interval = args.poll_interval loglevel = logging.WARNING if args.debug: @@ -274,46 +159,43 @@ def main(argv): else: data = json.load(open(args.file)) - #specs = generate_job_specs(tes) - # Load kubernetes config file if args.debug: config.load_kube_config() else: config.load_incluster_config() - # create and populate pvc only if volumes/inputs/outputs are given - if data['volumes'] or data['inputs'] or data['outputs']: - pvc_name = 
create_pvc(data, args.namespace) - - # to global var for cleanup purposes - global created_pvc - created_pvc = pvc_name + global created_pvc + created_pvc = None - volume_mounts = populate_pvc(data, args.namespace, pvc_name, args.filer_version) - state = run_executors(data['executors'], args.namespace, volume_mounts, pvc_name) - else: - state = run_executors(data['executors'], args.namespace) - - # run executors - print("Finished with state %s" % state) - - # upload files and delete pvc - if data['volumes'] or data['inputs'] or data['outputs']: - cleanup_pvc(data, args.namespace, volume_mounts, pvc_name, args.filer_version) + if check_cancelled(): + exit_cancelled() + run_task(data, args.filer_version) def clean_on_interrupt(): logger.debug('Caught interrupt signal, deleting jobs and pvc') - bv1 = client.BatchV1Api() for job in created_jobs: - logger.debug('Deleting job: '+job) - bv1.delete_namespaced_job(job, args.namespace, client.V1DeleteOptions()) + job.delete() + + if created_pvc: + created_pvc.delete() +def exit_cancelled(): if created_pvc: - cv1 = client.CoreV1Api() - logger.debug('Deleting pvc: '+created_pvc) - cv1.delete_namespaced_persistent_volume_claim(created_pvc, args.namespace, client.V1DeleteOptions()) + created_pvc.delete() + sys.exit(0) + +def check_cancelled(): + with open('/tmp/label') as fh: + label = fh.readline() + + logging.debug('Got label: '+label) + + if label == 'Cancelled': + return True + + return False if __name__ == "__main__": try: From 0b7b9d4f71bc8c02df134a3facab5756b7502fe3 Mon Sep 17 00:00:00 2001 From: Erik van den Bergh Date: Mon, 26 Feb 2018 11:23:06 +0000 Subject: [PATCH 06/20] fixes --- scripts/job.py | 2 +- scripts/taskmaster.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/scripts/job.py b/scripts/job.py index 46428b17..0d671040 100644 --- a/scripts/job.py +++ b/scripts/job.py @@ -9,7 +9,7 @@ def __init__(self, body, name='task-job', namespace='default'): self.status = 'Initialized' self.bv1 = client.BatchV1Api() self.body = body - self.body['metadata'] = {'name': self.name } + #self.body['metadata']['name'] = self.name def run_to_completion(self, poll_interval, check_cancelled): logging.debug(self.body) diff --git a/scripts/taskmaster.py b/scripts/taskmaster.py index 43253911..a89fd3e4 100755 --- a/scripts/taskmaster.py +++ b/scripts/taskmaster.py @@ -29,6 +29,7 @@ def run_executor(executor, namespace, pvc=None): logger.debug('Created job: '+jobname) job = Job(executor, jobname, namespace) + logger.debug('Job spec: '+str(job.body)) global created_jobs created_jobs.append(job) @@ -97,6 +98,7 @@ def init_pvc(data, filer): def run_task(data, filer_version): task_name = data['executors'][0]['metadata']['labels']['taskmaster-name'] + pvc = None if data['volumes'] or data['inputs'] or data['outputs']: From b7ef0db01f446c6e4f10217fcb83545459a18fd0 Mon Sep 17 00:00:00 2001 From: Erik van den Bergh Date: Tue, 27 Feb 2018 14:23:18 +0000 Subject: [PATCH 07/20] fixes --- scripts/filer_class.py | 1 + scripts/pvc.py | 2 +- scripts/taskmaster.py | 13 ++++++------- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/scripts/filer_class.py b/scripts/filer_class.py index 4e0903e2..f5a96c92 100644 --- a/scripts/filer_class.py +++ b/scripts/filer_class.py @@ -6,6 +6,7 @@ def __init__(self, name, data, filer_version='v0.5', debug=False): self.spec = { "kind": "Job", "apiVersion": "batch/v1", + "metadata": { "name": name }, "spec": { "template": { "metadata": { "name": "tesk-filer" }, diff --git a/scripts/pvc.py b/scripts/pvc.py index 
bff5a588..4f58e6b4 100644 --- a/scripts/pvc.py +++ b/scripts/pvc.py @@ -14,7 +14,7 @@ def __init__(self, name='task-pvc', size_gb=1, namespace='default'): } self.subpath_idx = 0 - self.namespace='default' + self.namespace = namespace self.cv1 = client.CoreV1Api() self.cv1.create_namespaced_persistent_volume_claim(self.namespace, self.spec) diff --git a/scripts/taskmaster.py b/scripts/taskmaster.py index a89fd3e4..88138aa0 100755 --- a/scripts/taskmaster.py +++ b/scripts/taskmaster.py @@ -189,13 +189,12 @@ def exit_cancelled(): sys.exit(0) def check_cancelled(): - with open('/tmp/label') as fh: - label = fh.readline() - - logging.debug('Got label: '+label) - - if label == 'Cancelled': - return True + with open('/podinfo/labels') as fh: + for line in fh.readlines(): + name, label = line.split('=') + logging.debug('Got label: '+label) + if label == '"Cancelled"': + return True return False From d3e355875b383da795ac5fe85602676471d39c9c Mon Sep 17 00:00:00 2001 From: Erik van den Bergh Date: Tue, 27 Feb 2018 14:26:01 +0000 Subject: [PATCH 08/20] add taskmaster examples --- examples/success/env_taskmaster.json | 100 +++++++++++++++++++++++ examples/success/hello_taskmaster.json | 46 +++++++++++ examples/success/stderr_taskmaster.json | 81 ++++++++++++++++++ examples/success/stdin_taskmaster.json | 45 ++++++++++ examples/success/stdout_taskmaster.json | 81 ++++++++++++++++++ examples/success/tags_taskmaster.json | 47 +++++++++++ examples/success/workdir_taskmaster.json | 44 ++++++++++ 7 files changed, 444 insertions(+) create mode 100644 examples/success/env_taskmaster.json create mode 100644 examples/success/hello_taskmaster.json create mode 100644 examples/success/stderr_taskmaster.json create mode 100644 examples/success/stdin_taskmaster.json create mode 100644 examples/success/stdout_taskmaster.json create mode 100644 examples/success/tags_taskmaster.json create mode 100644 examples/success/workdir_taskmaster.json diff --git a/examples/success/env_taskmaster.json b/examples/success/env_taskmaster.json new file mode 100644 index 00000000..877e5272 --- /dev/null +++ b/examples/success/env_taskmaster.json @@ -0,0 +1,100 @@ +{ + "outputs": [], + "inputs": [], + "volumes": [], + "executors": [ + { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "annotations": {}, + "labels": { + "job-type": "executor", + "taskmaster-name": "task-9d70f812", + "executor-no": "0" + }, + "name": "task-9d70f812-ex-00" + }, + "spec": { + "template": { + "metadata": { + "name": "task-9d70f812-ex-00" + }, + "spec": { + "containers": [ + { + "command": [ + "echo", + "$SECRET_PROJECT_NAME", + "$PROJECT_STATUS" + ], + "env": [ + { + "name": "SECRET_PROJECT_NAME", + "value": "TESK" + }, + { + "name": "PROJECT_STATUS", + "value": "rocks!" + } + ], + "image": "alpine", + "name": "task-9d70f812-ex-00", + "resources": {} + } + ], + "restartPolicy": "Never" + } + } + } + }, + { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "annotations": {}, + "labels": { + "job-type": "executor", + "taskmaster-name": "task-9d70f812", + "executor-no": "1" + }, + "name": "task-9d70f812-ex-01" + }, + "spec": { + "template": { + "metadata": { + "name": "task-9d70f812-ex-01" + }, + "spec": { + "containers": [ + { + "command": [ + "sh", + "-c", + "echo $SECRET_PROJECT_NAME $PROJECT_STATUS" + ], + "env": [ + { + "name": "SECRET_PROJECT_NAME", + "value": "TESK" + }, + { + "name": "PROJECT_STATUS", + "value": "rocks!" 
+ } + ], + "image": "alpine", + "name": "task-9d70f812-ex-01", + "resources": {} + } + ], + "restartPolicy": "Never" + } + } + } + } + ], + "resources": { + "disk_gb": 0.1 + } +} \ No newline at end of file diff --git a/examples/success/hello_taskmaster.json b/examples/success/hello_taskmaster.json new file mode 100644 index 00000000..4ee5b00d --- /dev/null +++ b/examples/success/hello_taskmaster.json @@ -0,0 +1,46 @@ +{ + "outputs": [], + "inputs": [], + "volumes": [], + "executors": [ + { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "annotations": { + "tes-task-name": "Hello World" + }, + "labels": { + "job-type": "executor", + "taskmaster-name": "task-7d5c53f4", + "executor-no": "0" + }, + "name": "task-7d5c53f4-ex-00" + }, + "spec": { + "template": { + "metadata": { + "name": "task-7d5c53f4-ex-00" + }, + "spec": { + "containers": [ + { + "command": [ + "echo", + "TESK says: Hello World" + ], + "image": "alpine", + "name": "task-7d5c53f4-ex-00", + "resources": {} + } + ], + "restartPolicy": "Never" + } + } + } + } + ], + "resources": { + "disk_gb": 0.1 + } +} \ No newline at end of file diff --git a/examples/success/stderr_taskmaster.json b/examples/success/stderr_taskmaster.json new file mode 100644 index 00000000..d46433fc --- /dev/null +++ b/examples/success/stderr_taskmaster.json @@ -0,0 +1,81 @@ +{ + "outputs": [], + "inputs": [], + "volumes": [ + "/outputs" + ], + "executors": [ + { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "annotations": {}, + "labels": { + "job-type": "executor", + "taskmaster-name": "task-f1150292", + "executor-no": "0" + }, + "name": "task-f1150292-ex-00" + }, + "spec": { + "template": { + "metadata": { + "name": "task-f1150292-ex-00" + }, + "spec": { + "containers": [ + { + "command": [ + "/bin/sh", + "-c", + "sh -c 'grep bash /etc/s* || exit 0' 2> /outputs/stderr" + ], + "image": "ubuntu", + "name": "task-f1150292-ex-00", + "resources": {} + } + ], + "restartPolicy": "Never" + } + } + } + }, + { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "annotations": {}, + "labels": { + "job-type": "executor", + "taskmaster-name": "task-f1150292", + "executor-no": "1" + }, + "name": "task-f1150292-ex-01" + }, + "spec": { + "template": { + "metadata": { + "name": "task-f1150292-ex-01" + }, + "spec": { + "containers": [ + { + "command": [ + "cat", + "/outputs/stderr" + ], + "image": "alpine", + "name": "task-f1150292-ex-01", + "resources": {} + } + ], + "restartPolicy": "Never" + } + } + } + } + ], + "resources": { + "disk_gb": 0.1 + } +} \ No newline at end of file diff --git a/examples/success/stdin_taskmaster.json b/examples/success/stdin_taskmaster.json new file mode 100644 index 00000000..d479faeb --- /dev/null +++ b/examples/success/stdin_taskmaster.json @@ -0,0 +1,45 @@ +{ + "outputs": [], + "inputs": [], + "volumes": [], + "executors": [ + { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "annotations": {}, + "labels": { + "job-type": "executor", + "taskmaster-name": "task-54d04d9d", + "executor-no": "0" + }, + "name": "task-54d04d9d-ex-00" + }, + "spec": { + "template": { + "metadata": { + "name": "task-54d04d9d-ex-00" + }, + "spec": { + "containers": [ + { + "command": [ + "/bin/sh", + "-c", + "cat < /etc/fstab" + ], + "image": "alpine", + "name": "task-54d04d9d-ex-00", + "resources": {} + } + ], + "restartPolicy": "Never" + } + } + } + } + ], + "resources": { + "disk_gb": 0.1 + } +} \ No newline at end of file diff --git a/examples/success/stdout_taskmaster.json 
b/examples/success/stdout_taskmaster.json new file mode 100644 index 00000000..a7568a7d --- /dev/null +++ b/examples/success/stdout_taskmaster.json @@ -0,0 +1,81 @@ +{ + "outputs": [], + "inputs": [], + "volumes": [ + "/outputs" + ], + "executors": [ + { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "annotations": {}, + "labels": { + "job-type": "executor", + "taskmaster-name": "task-04512e8f", + "executor-no": "0" + }, + "name": "task-04512e8f-ex-00" + }, + "spec": { + "template": { + "metadata": { + "name": "task-04512e8f-ex-00" + }, + "spec": { + "containers": [ + { + "command": [ + "/bin/sh", + "-c", + "echo 'This will appear in stdout, but of the 2. executor.' > /outputs/stdout" + ], + "image": "ubuntu", + "name": "task-04512e8f-ex-00", + "resources": {} + } + ], + "restartPolicy": "Never" + } + } + } + }, + { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "annotations": {}, + "labels": { + "job-type": "executor", + "taskmaster-name": "task-04512e8f", + "executor-no": "1" + }, + "name": "task-04512e8f-ex-01" + }, + "spec": { + "template": { + "metadata": { + "name": "task-04512e8f-ex-01" + }, + "spec": { + "containers": [ + { + "command": [ + "cat", + "/outputs/stdout" + ], + "image": "alpine", + "name": "task-04512e8f-ex-01", + "resources": {} + } + ], + "restartPolicy": "Never" + } + } + } + } + ], + "resources": { + "disk_gb": 0.1 + } +} \ No newline at end of file diff --git a/examples/success/tags_taskmaster.json b/examples/success/tags_taskmaster.json new file mode 100644 index 00000000..93df8e20 --- /dev/null +++ b/examples/success/tags_taskmaster.json @@ -0,0 +1,47 @@ +{ + "outputs": [], + "inputs": [], + "volumes": [], + "executors": [ + { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "annotations": { + "tes-task-name": "I do nothing" + }, + "labels": { + "job-type": "executor", + "taskmaster-name": "task-fc67859b", + "executor-no": "0" + }, + "name": "task-fc67859b-ex-00" + }, + "spec": { + "template": { + "metadata": { + "name": "task-fc67859b-ex-00" + }, + "spec": { + "containers": [ + { + "command": [ + "sh", + "-c", + "exit 0" + ], + "image": "alpine", + "name": "task-fc67859b-ex-00", + "resources": {} + } + ], + "restartPolicy": "Never" + } + } + } + } + ], + "resources": { + "disk_gb": 0.1 + } +} \ No newline at end of file diff --git a/examples/success/workdir_taskmaster.json b/examples/success/workdir_taskmaster.json new file mode 100644 index 00000000..346352c7 --- /dev/null +++ b/examples/success/workdir_taskmaster.json @@ -0,0 +1,44 @@ +{ + "outputs": [], + "inputs": [], + "volumes": [], + "executors": [ + { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "annotations": {}, + "labels": { + "job-type": "executor", + "taskmaster-name": "task-49299ae5", + "executor-no": "0" + }, + "name": "task-49299ae5-ex-00" + }, + "spec": { + "template": { + "metadata": { + "name": "task-49299ae5-ex-00" + }, + "spec": { + "containers": [ + { + "command": [ + "ls" + ], + "image": "alpine", + "name": "task-49299ae5-ex-00", + "resources": {}, + "workingDir": "/etc" + } + ], + "restartPolicy": "Never" + } + } + } + } + ], + "resources": { + "disk_gb": 0.1 + } +} \ No newline at end of file From 667baad03b21342e870c036c21def87a669204c8 Mon Sep 17 00:00:00 2001 From: Erik van den Bergh Date: Tue, 27 Feb 2018 14:26:39 +0000 Subject: [PATCH 09/20] add io testing file --- examples/success/inputoutput_taskmaster.json | 107 +++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 
examples/success/inputoutput_taskmaster.json diff --git a/examples/success/inputoutput_taskmaster.json b/examples/success/inputoutput_taskmaster.json new file mode 100644 index 00000000..2a33abcf --- /dev/null +++ b/examples/success/inputoutput_taskmaster.json @@ -0,0 +1,107 @@ +{ + "outputs": [ + { + "url": "ftp://ftp-private.ebi.ac.uk/upload/out.txt", + "path": "/tmp/vol1/in.txt" + "type": "FILE" + }, + { + "url": "ftp://ftp-private.ebi.ac.uk/upload/mouse_out", + "path": "/tmp/vol2/mouse" + "type": "DIRECTORY" + } + ], + "inputs": [, + { + "url": "ftp://ftp-private.ebi.ac.uk/upload/input.txt", + "path": "/tmp/vol1/in.txt" + "type": "FILE" + }, + { + "url": "ftp://ftp-private.ebi.ac.uk/upload/mouse", + "path": "/tmp/vol2" + "type": "DIRECTORY" + } + ], + "volumes": [ + "/tmp/vol1", + "/tmp/vol2", + ], + "executors": [ + { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "annotations": { + "tes-task-name": "Hello Input" + }, + "labels": { + "job-type": "executor", + "taskmaster-name": "task-7d5c53f4", + "executor-no": "0" + }, + "name": "task-7d5c53f4-ex-00" + }, + "spec": { + "template": { + "metadata": { + "name": "task-7d5c53f4-ex-00" + }, + "spec": { + "containers": [ + { + "command": [ + "cat", + "/tmp/vol1/in.txt" + ], + "image": "alpine", + "name": "task-7d5c53f4-ex-00", + "resources": {} + } + ], + "restartPolicy": "Never" + } + } + } + }, + { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "annotations": { + "tes-task-name": "Hello Input" + }, + "labels": { + "job-type": "executor", + "taskmaster-name": "task-7d5c53f4", + "executor-no": "1" + }, + "name": "task-7d5c53f4-ex-01" + }, + "spec": { + "template": { + "metadata": { + "name": "task-7d5c53f4-ex-01" + }, + "spec": { + "containers": [ + { + "command": [ + "find", + "/tmp/vol2" + ], + "image": "alpine", + "name": "task-7d5c53f4-ex-00", + "resources": {} + } + ], + "restartPolicy": "Never" + } + } + } + } + ], + "resources": { + "disk_gb": 0.1 + } +} From e23449d1288760434dda4c8ea207204acabe74cf Mon Sep 17 00:00:00 2001 From: Erik van den Bergh Date: Tue, 27 Feb 2018 14:58:57 +0000 Subject: [PATCH 10/20] works for real --- examples/success/inputoutput_taskmaster.json | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/examples/success/inputoutput_taskmaster.json b/examples/success/inputoutput_taskmaster.json index 2a33abcf..bb8c1f74 100644 --- a/examples/success/inputoutput_taskmaster.json +++ b/examples/success/inputoutput_taskmaster.json @@ -1,31 +1,31 @@ { "outputs": [ { - "url": "ftp://ftp-private.ebi.ac.uk/upload/out.txt", - "path": "/tmp/vol1/in.txt" + "url": "ftp://ftp-private.ebi.ac.uk/upload/mouselist.txt", + "path": "/tmp/vol1/mouselist.txt", "type": "FILE" }, { "url": "ftp://ftp-private.ebi.ac.uk/upload/mouse_out", - "path": "/tmp/vol2/mouse" + "path": "/tmp/vol2/mouse", "type": "DIRECTORY" } ], - "inputs": [, + "inputs": [ { "url": "ftp://ftp-private.ebi.ac.uk/upload/input.txt", - "path": "/tmp/vol1/in.txt" + "path": "/tmp/vol1/in.txt", "type": "FILE" }, { "url": "ftp://ftp-private.ebi.ac.uk/upload/mouse", - "path": "/tmp/vol2" + "path": "/tmp/vol2", "type": "DIRECTORY" } ], "volumes": [ "/tmp/vol1", - "/tmp/vol2", + "/tmp/vol2" ], "executors": [ { @@ -87,11 +87,11 @@ "containers": [ { "command": [ - "find", - "/tmp/vol2" + "sh", "-c", + "find /tmp/vol2 > /tmp/vol1/mouselist.txt" ], "image": "alpine", - "name": "task-7d5c53f4-ex-00", + "name": "task-7d5c53f4-ex-01", "resources": {} } ], From 1202df6f0acc5a711b56b7be261affb4c7d11967 Mon Sep 17 
00:00:00 2001
From: Erik van den Bergh
Date: Tue, 27 Feb 2018 15:13:18 +0000
Subject: [PATCH 11/20] small fixes to make things work correctly on ftp end

---
 examples/success/inputoutput_taskmaster.json | 2 +-
 scripts/job.py                               | 2 +-
 scripts/taskmaster.py                        | 3 ++-
 3 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/examples/success/inputoutput_taskmaster.json b/examples/success/inputoutput_taskmaster.json
index bb8c1f74..13d85412 100644
--- a/examples/success/inputoutput_taskmaster.json
+++ b/examples/success/inputoutput_taskmaster.json
@@ -19,7 +19,7 @@
     },
     {
       "url": "ftp://ftp-private.ebi.ac.uk/upload/mouse",
-      "path": "/tmp/vol2",
+      "path": "/tmp/vol2/mouse",
       "type": "DIRECTORY"
     }
   ],
diff --git a/scripts/job.py b/scripts/job.py
index 0d671040..3d1bd91d 100644
--- a/scripts/job.py
+++ b/scripts/job.py
@@ -9,7 +9,7 @@ def __init__(self, body, name='task-job', namespace='default'):
         self.status = 'Initialized'
         self.bv1 = client.BatchV1Api()
         self.body = body
-        #self.body['metadata']['name'] = self.name
+        self.body['metadata']['name'] = self.name
 
     def run_to_completion(self, poll_interval, check_cancelled):
         logging.debug(self.body)
diff --git a/scripts/taskmaster.py b/scripts/taskmaster.py
index 88138aa0..be13a0e4 100755
--- a/scripts/taskmaster.py
+++ b/scripts/taskmaster.py
@@ -159,7 +159,8 @@ def main(argv):
     elif args.file == '-':
         data = json.load(sys.stdin)
     else:
-        data = json.load(open(args.file))
+        with open(args.file) as fh:
+            data = json.load(fh)
 
     # Load kubernetes config file
     if args.debug:

From 343b5c31ab90437b1e3ed17dd55c8b798983b90e Mon Sep 17 00:00:00 2001
From: Erik van den Bergh
Date: Fri, 2 Mar 2018 11:10:30 +0000
Subject: [PATCH 12/20] cleanup mounting code and also consider outputs

---
 scripts/taskmaster.py | 30 +++++++++++++++++++-----------
 1 file changed, 19 insertions(+), 11 deletions(-)

diff --git a/scripts/taskmaster.py b/scripts/taskmaster.py
index be13a0e4..c3ebf97e 100755
--- a/scripts/taskmaster.py
+++ b/scripts/taskmaster.py
@@ -47,25 +47,33 @@ def append_mount(volume_mounts, name, path, pvc):
         logger.debug('appending '+name+' at path '+path+' with subPath: '+subpath)
         volume_mounts.append({ 'name': name, 'mountPath': path , 'subPath': subpath})
 
+def dirname(iodata):
+    if iodata['type'] == 'FILE':
+        # strip filename from path
+        r = '(.*)/'
+        dirname = re.match(r, iodata['path']).group(1)
+        logger.debug('dirname of '+iodata+'is: '+dirname)
+    elif aninput['type'] == 'DIRECTORY':
+        dirname = iodata['path']
+
+    return dirname
+
 def generate_mounts(data, pvc):
     volume_mounts = []
     # gather volumes that need to be mounted, without duplicates
     volume_name = task_volume_basename
-    for idx, volume in enumerate(data['volumes']):
+    for volume in data['volumes']:
         append_mount(volume_mounts, volume_name, volume, pvc)
 
     # gather other paths that need to be mounted from inputs FILE and DIRECTORY entries
-    for idx, aninput in enumerate(data['inputs']):
-        if aninput['type'] == 'FILE':
-            # strip filename from path
-            p = re.compile('(.*)/')
-            basepath = p.match(aninput['path']).group(1)
-            logger.debug('basepath is: '+basepath)
-        elif aninput['type'] == 'DIRECTORY':
-            basepath = aninput['path']
-
-        append_mount(volume_mounts, volume_name, basepath, pvc)
+    for aninput in data['inputs']:
+        dirname = dirname(aninput)
+        append_mount(volume_mounts, volume_name, dirname, pvc)
+
+    for anoutput in data['outputs']:
+        dirname = dirname(anoutput)
+        append_mount(volume_mounts, volume_name, dirname, pvc)
 
     return volume_mounts

From 1c03b641831d4f8067ddb12d3cafcc7fac55ee52 Mon Sep 17 00:00:00 2001
From: Erik van den Bergh
Date: Fri, 2 Mar 2018 11:13:18 +0000
Subject: [PATCH 13/20] comments

---
 scripts/taskmaster.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/scripts/taskmaster.py b/scripts/taskmaster.py
index c3ebf97e..c24b627a 100755
--- a/scripts/taskmaster.py
+++ b/scripts/taskmaster.py
@@ -38,6 +38,7 @@ def run_executor(executor, namespace, pvc=None):
     if status == 'cancelled':
         exit_cancelled()
 
+# TODO move this code to PVC class
 def append_mount(volume_mounts, name, path, pvc):
     # Checks all mount paths in volume_mounts if the path given is already in there
     duplicate = next((mount for mount in volume_mounts if mount['mountPath'] == path), None)
@@ -66,7 +67,7 @@ def generate_mounts(data, pvc):
     for volume in data['volumes']:
         append_mount(volume_mounts, volume_name, volume, pvc)
 
-    # gather other paths that need to be mounted from inputs FILE and DIRECTORY entries
+    # gather other paths that need to be mounted from inputs/outputs FILE and DIRECTORY entries
     for aninput in data['inputs']:
         dirname = dirname(aninput)
         append_mount(volume_mounts, volume_name, dirname, pvc)

From 7c6bbc102594169f6093f5a48ec07ed6d0ab3c00 Mon Sep 17 00:00:00 2001
From: Erik van den Bergh
Date: Fri, 2 Mar 2018 11:21:15 +0000
Subject: [PATCH 14/20] add cloudbuild for testing branch

---
 cloudbuild_testing.yaml | 6 ++++++
 1 file changed, 6 insertions(+)
 create mode 100644 cloudbuild_testing.yaml

diff --git a/cloudbuild_testing.yaml b/cloudbuild_testing.yaml
new file mode 100644
index 00000000..eb298198
--- /dev/null
+++ b/cloudbuild_testing.yaml
@@ -0,0 +1,6 @@
+steps:
+- name: 'gcr.io/cloud-builders/docker'
+  args: ['build', '-t', 'eu.gcr.io/tes-wes/taskmaster:testing', '-f', 'dockerfiles/taskmaster/Dockerfile', '.']
+- name: 'gcr.io/cloud-builders/docker'
+  args: ['build', '-t', 'eu.gcr.io/tes-wes/filer:testing', '-f', 'dockerfiles/filer/Dockerfile', '.']
+images: ['eu.gcr.io/tes-wes/taskmaster:testing', 'eu.gcr.io/tes-wes/filer:testing']

From 5a1ef3f6cf2be9e22784efa08f9e48af31719acf Mon Sep 17 00:00:00 2001
From: Erik van den Bergh
Date: Fri, 2 Mar 2018 11:25:34 +0000
Subject: [PATCH 15/20] add some debugging

---
 scripts/filer.py | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/scripts/filer.py b/scripts/filer.py
index ecb0c3d4..6ede4cc0 100755
--- a/scripts/filer.py
+++ b/scripts/filer.py
@@ -34,10 +34,12 @@ def process_upload_dir(source, target, ftp):
         logging.debug('Directory exists, overwriting')
 
     for f in os.listdir(source):
-        if os.path.isdir(source+'/'+f):
-            process_upload_dir(source+'/'+f, target+'/'+basename+'/', ftp)
-        elif os.path.isfile(source+'/'+f):
-            ftp.storbinary("STOR "+target+'/'+basename+'/'+f, open(source+'/'+f, 'r'))
+        path = source+'/'+f
+        if os.path.isdir(path):
+            process_upload_dir(path, target+'/'+basename+'/', ftp)
+        elif os.path.isfile(path):
+            logging.debug('Trying to upload file: '+path+' to dest: '+target+'/'+basename+'/'+f)
+            ftp.storbinary("STOR "+target+'/'+basename+'/'+f, open(path, 'r'))
     return 0
 
 def process_ftp_dir(source, target, ftp):

From 11ddfbf7d26ee63dfa29503ec1aa5019530dfa3d Mon Sep 17 00:00:00 2001
From: Erik van den Bergh
Date: Fri, 2 Mar 2018 11:59:56 +0000
Subject: [PATCH 16/20] add parent dir check

---
 scripts/filer.py | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/scripts/filer.py b/scripts/filer.py
index 6ede4cc0..a17a7996 100755
--- a/scripts/filer.py
+++ b/scripts/filer.py
@@ -27,6 +27,16 @@ def download_ftp_file(source, target, ftp):
 def process_upload_dir(source, target, ftp):
     basename = os.path.basename(source)
     logging.debug('processing upload dir, basename: '+basename)
+    wd = ftp.pwd()
+    # does the parent dir exist?
+    try:
+        ftp.cwd('/'+target)
+    except:
+        logging.error('Cannot stat parent dir: /'+target)
+        return 1
+
+    ftp.cwd(wd)
+
     try:
         logging.debug('trying to create dir: ' + '/'+target+'/'+basename)
         ftp.mkd('/'+target+'/'+basename)

From 9cae5ea9a77152379722bb4cc0ae7d814b69f671 Mon Sep 17 00:00:00 2001
From: Erik van den Bergh
Date: Fri, 2 Mar 2018 12:00:56 +0000
Subject: [PATCH 17/20] ignore python compiled

---
 .gitignore | 1 +
 1 file changed, 1 insertion(+)

diff --git a/.gitignore b/.gitignore
index 1377554e..b9489855 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,2 @@
 *.swp
+*.pyc

From 68fb147822eb72437468b194c0901eb490c32620 Mon Sep 17 00:00:00 2001
From: Erik van den Bergh
Date: Fri, 2 Mar 2018 12:01:47 +0000
Subject: [PATCH 18/20] fix syntax

---
 scripts/taskmaster.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/scripts/taskmaster.py b/scripts/taskmaster.py
index c24b627a..ede16e99 100755
--- a/scripts/taskmaster.py
+++ b/scripts/taskmaster.py
@@ -53,8 +53,8 @@ def dirname(iodata):
         # strip filename from path
         r = '(.*)/'
         dirname = re.match(r, iodata['path']).group(1)
-        logger.debug('dirname of '+iodata+'is: '+dirname)
-    elif aninput['type'] == 'DIRECTORY':
+        logger.debug('dirname of '+iodata['path']+'is: '+dirname)
+    elif iodata['type'] == 'DIRECTORY':
         dirname = iodata['path']
 
     return dirname
@@ -69,12 +69,12 @@ def generate_mounts(data, pvc):
 
     # gather other paths that need to be mounted from inputs/outputs FILE and DIRECTORY entries
     for aninput in data['inputs']:
-        dirname = dirname(aninput)
-        append_mount(volume_mounts, volume_name, dirname, pvc)
+        dirnm = dirname(aninput)
+        append_mount(volume_mounts, volume_name, dirnm, pvc)
 
     for anoutput in data['outputs']:
-        dirname = dirname(anoutput)
-        append_mount(volume_mounts, volume_name, dirname, pvc)
+        dirnm = dirname(anoutput)
+        append_mount(volume_mounts, volume_name, dirnm, pvc)
 
     return volume_mounts

From b37a3bba5a4936115597bdca060c756d9277b3a5 Mon Sep 17 00:00:00 2001
From: Erik van den Bergh
Date: Fri, 2 Mar 2018 16:25:55 +0000
Subject: [PATCH 19/20] error handling

---
 scripts/filer.py      |  5 +++--
 scripts/job.py        |  2 +-
 scripts/taskmaster.py | 19 +++++++++++--------
 3 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/scripts/filer.py b/scripts/filer.py
index a17a7996..8a735257 100755
--- a/scripts/filer.py
+++ b/scripts/filer.py
@@ -26,7 +26,8 @@ def download_ftp_file(source, target, ftp):
 
 def process_upload_dir(source, target, ftp):
     basename = os.path.basename(source)
-    logging.debug('processing upload dir, basename: '+basename)
+    logging.debug('processing upload dir src: '+source+' target: '+target)
+    logging.debug('dir basename: '+basename)
     wd = ftp.pwd()
     # does the parent dir exist?
     try:
@@ -183,4 +184,4 @@ def main(argv):
     return 0
 
 if __name__ == "__main__":
-    main(sys.argv)
+    sys.exit(main(sys.argv))
diff --git a/scripts/job.py b/scripts/job.py
index 3d1bd91d..59d9c028 100644
--- a/scripts/job.py
+++ b/scripts/job.py
@@ -18,7 +18,7 @@ def run_to_completion(self, poll_interval, check_cancelled):
         while status == 'Running':
             if check_cancelled():
                 self.delete()
-                return 'cancelled'
+                return 'Cancelled'
 
             time.sleep(poll_interval)
 
diff --git a/scripts/taskmaster.py b/scripts/taskmaster.py
index ede16e99..54a7d773 100755
--- a/scripts/taskmaster.py
+++ b/scripts/taskmaster.py
@@ -35,8 +35,8 @@ def run_executor(executor, namespace, pvc=None):
     created_jobs.append(job)
 
     status = job.run_to_completion(poll_interval, check_cancelled)
-    if status == 'cancelled':
-        exit_cancelled()
+    if status != 'Complete':
+        exit_cancelled('Got status '+status)
 
 # TODO move this code to PVC class
 def append_mount(volume_mounts, name, path, pvc):
@@ -100,8 +100,8 @@ def init_pvc(data, filer):
     created_jobs.append(filerjob)
 
     #filerjob.run_to_completion(poll_interval)
     status = filerjob.run_to_completion(poll_interval, check_cancelled)
-    if status == 'cancelled':
-        exit_cancelled()
+    if status != 'Complete':
+        exit_cancelled('Got status '+status)
 
     return pvc
@@ -132,8 +132,8 @@ def run_task(data, filer_version):
 
     #filerjob.run_to_completion(poll_interval)
     status = filerjob.run_to_completion(poll_interval, check_cancelled)
-    if status == 'cancelled':
-        exit_cancelled()
+    if status != 'Complete':
+        exit_cancelled('Got status '+status)
 
 def main(argv):
     parser = argparse.ArgumentParser(description='TaskMaster main module')
@@ -180,8 +180,10 @@ def main(argv):
     global created_pvc
     created_pvc = None
 
+    # Check if we're cancelled during init
     if check_cancelled():
-        exit_cancelled()
+        exit_cancelled('Cancelled during init')
+
     run_task(data, args.filer_version)
 
 def clean_on_interrupt():
@@ -193,9 +195,10 @@ def clean_on_interrupt():
     if created_pvc:
         created_pvc.delete()
 
-def exit_cancelled():
+def exit_cancelled(reason='Unknown reason'):
     if created_pvc:
         created_pvc.delete()
+    logger.error('Cancelling taskmaster: '+reason)
     sys.exit(0)
 
 def check_cancelled():

From 580037cb224d66269d835e9b3a796d062e21289e Mon Sep 17 00:00:00 2001
From: Erik van den Bergh
Date: Fri, 2 Mar 2018 16:55:27 +0000
Subject: [PATCH 20/20] http error handling

---
 scripts/filer.py | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/scripts/filer.py b/scripts/filer.py
index 8a735257..83c3e33c 100755
--- a/scripts/filer.py
+++ b/scripts/filer.py
@@ -107,15 +107,20 @@ def process_ftp_file(ftype, afile):
         elif afile['type'] == 'DIRECTORY':
             return process_upload_dir(afile['path'], ftp_path, ftp)
         else:
-            print('Unknown file type: '+afile['type'])
+            logging.error('Unknown file type: '+afile['type'])
            return 1
     else:
-        print('Unknown file action: ' + ftype)
+        logging.error('Unknown file action: ' + ftype)
         return 1
 
 def process_http_file(ftype, afile):
     if ftype == 'inputs':
         r = requests.get(afile['url'])
+
+        if r.status_code != 200:
+            logging.error('Got status code: '+str(r.status_code))
+            return 1
+
         fp = open(afile['path'], 'wb')
         fp.write(r.content)
         fp.close
@@ -123,6 +128,11 @@
     elif ftype == 'outputs':
         fp = open(afile['path'], 'r')
         r = requests.put(afile['url'], data=fp.read())
+
+        if r.status_code != 200 and r.status_code != 201:
+            logging.error('Got status code: '+str(r.status_code))
+            return 1
+
         fp.close
         return 0
     else: