Skip to content

Commit 18ca9cd

Browse files
authored
Merge pull request #61 from common-workflow-language/run_log
Rename workflow_log to run_log to conform to latest WES draft.
2 parents 56c4abf + ad7e04b commit 18ca9cd

File tree

6 files changed

+86
-38
lines changed

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
long_description = readmeFile.read()
1212

1313
setup(name='wes-service',
14-
version='2.7',
14+
version='2.8',
1515
description='GA4GH Workflow Execution Service reference implementation',
1616
long_description=long_description,
1717
author='GA4GH Containers and Workflows task team',

wes_client/wes_client_main.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def main(argv=sys.argv[1:]):
1515
parser = argparse.ArgumentParser(description="Workflow Execution Service")
1616
parser.add_argument("--host", type=str, default=os.environ.get("WES_API_HOST"),
1717
help="Example: '--host=localhost:8080'. Defaults to WES_API_HOST.")
18-
parser.add_argument("--auth", type=str, default=os.environ.get("WES_API_AUTH"), help="Defaults to WES_API_AUTH.")
18+
parser.add_argument("--auth", type=str, default=os.environ.get("WES_API_AUTH"), help="Format is 'Header: value' or just 'value'. If header name is not provided, value goes in the 'Authorization'. Defaults to WES_API_AUTH.")
1919
parser.add_argument("--proto", type=str, default=os.environ.get("WES_API_PROTO", "https"),
2020
help="Options: [http, https]. Defaults to WES_API_PROTO (https).")
2121
parser.add_argument("--quiet", action="store_true", default=False)
@@ -49,7 +49,15 @@ def main(argv=sys.argv[1:]):
4949
print(u"%s %s" % (sys.argv[0], pkg[0].version))
5050
exit(0)
5151

52-
client = WESClient({'auth': args.auth, 'proto': args.proto, 'host': args.host})
52+
auth = {}
53+
if args.auth:
54+
if ": " in args.auth:
55+
sp = args.auth.split(": ")
56+
auth[sp[0]] = sp[1]
57+
else:
58+
auth["Authorization"] = args.auth
59+
60+
client = WESClient({'auth': auth, 'proto': args.proto, 'host': args.host})
5361

5462
if args.list:
5563
response = client.list_runs() # how to include: page_token=args.page, page_size=args.page_size ?
@@ -79,15 +87,15 @@ def main(argv=sys.argv[1:]):
7987
logging.error("Missing json/yaml file.")
8088
return 1
8189

82-
modify_jsonyaml_paths(args.job_order)
90+
job_order = modify_jsonyaml_paths(args.job_order)
8391

8492
if args.quiet:
8593
logging.basicConfig(level=logging.WARNING)
8694
else:
8795
logging.basicConfig(level=logging.INFO)
8896

8997
args.attachments = "" if not args.attachments else args.attachments.split(',')
90-
r = client.run(args.workflow_url, args.job_order, args.attachments)
98+
r = client.run(args.workflow_url, job_order, args.attachments)
9199

92100
if args.wait:
93101
logging.info("Workflow run id is %s", r["run_id"])
@@ -106,13 +114,13 @@ def main(argv=sys.argv[1:]):
106114

107115
try:
108116
# TODO: Only works with Arvados atm
109-
logging.info(str(s["workflow_log"]["stderr"]))
110-
logs = requests.get(s["workflow_log"]["stderr"], headers={"Authorization": args.auth}).text
111-
logging.info("Workflow log:\n" + logs)
117+
logging.info(str(s["run_log"]["stderr"]))
118+
logs = requests.get(s["run_log"]["stderr"], headers=auth).text
119+
logging.info("Run log:\n" + logs)
112120
except InvalidSchema:
113-
logging.info("Workflow log:\n" + str(s["workflow_log"]["stderr"]))
121+
logging.info("Run log:\n" + str(s["run_log"]["stderr"]))
114122
except MissingSchema:
115-
logging.info("Workflow log:\n" + str(s["workflow_log"]["stderr"]))
123+
logging.info("Run log:\n" + str(s["run_log"]["stderr"]))
116124

117125
# print the output json
118126
if "fields" in s["outputs"] and s["outputs"]["fields"] is None:

wes_service/arvados_wes.py

Lines changed: 53 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@ class MissingAuthorization(Exception):
1919
pass
2020

2121

22-
def get_api():
23-
if not connexion.request.headers.get('Authorization'):
24-
raise MissingAuthorization()
25-
authtoken = connexion.request.headers['Authorization']
26-
if authtoken.startswith("Bearer ") or authtoken.startswith("OAuth2 "):
22+
def get_api(authtoken=None):
23+
if authtoken is None:
24+
if not connexion.request.headers.get('Authorization'):
25+
raise MissingAuthorization()
26+
authtoken = connexion.request.headers['Authorization']
27+
if not authtoken.startswith("Bearer ") or authtoken.startswith("OAuth2 "):
28+
raise ValueError("Authorization token must start with 'Bearer '")
2729
authtoken = authtoken[7:]
2830
return arvados.api_from_config(version="v1", apiconfig={
2931
"ARVADOS_API_HOST": os.environ["ARVADOS_API_HOST"],
@@ -55,6 +57,10 @@ def catch_exceptions_wrapper(self, *args, **kwargs):
5557
return {"msg": str(e), "status_code": 500}, 500
5658
except MissingAuthorization:
5759
return {"msg": "'Authorization' header is missing or empty, expecting Arvados API token", "status_code": 401}, 401
60+
except ValueError as e:
61+
return {"msg": str(e), "status_code": 400}, 400
62+
except Exception as e:
63+
return {"msg": str(e), "status_code": 500}, 500
5864

5965
return catch_exceptions_wrapper
6066

@@ -66,7 +72,7 @@ def GetServiceInfo(self):
6672
"workflow_type_versions": {
6773
"CWL": {"workflow_type_version": ["v1.0"]}
6874
},
69-
"supported_wes_versions": ["0.2.1"],
75+
"supported_wes_versions": ["0.3.0"],
7076
"supported_filesystem_protocols": ["http", "https", "keep"],
7177
"workflow_engine_versions": {
7278
"arvados-cwl-runner": stderr
@@ -108,6 +114,11 @@ def ListRuns(self, page_size=None, page_token=None, state_search=None):
108114
"next_page_token": workflow_list[-1]["run_id"] if workflow_list else ""
109115
}
110116

117+
def log_for_run(self, run_id, message, authtoken=None):
118+
get_api(authtoken).logs().create(body={"log": {"object_uuid": run_id,
119+
"event_type": "stderr",
120+
"properties": {"text": message+"\n"}}}).execute()
121+
111122
def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params,
112123
env, project_uuid,
113124
tempdir):
@@ -118,9 +129,18 @@ def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params,
118129
})
119130

120131
try:
121-
with tempfile.NamedTemporaryFile() as inputtemp:
132+
with tempfile.NamedTemporaryFile(dir=tempdir, suffix=".json") as inputtemp:
122133
json.dump(workflow_params, inputtemp)
123134
inputtemp.flush()
135+
136+
msg = ""
137+
for dirpath, dirs, files in os.walk(tempdir):
138+
for f in files:
139+
msg += " " + dirpath + "/" + f + "\n"
140+
141+
self.log_for_run(cr_uuid, "Contents of %s:\n%s" % (tempdir, msg),
142+
env['ARVADOS_API_TOKEN'])
143+
124144
# TODO: run submission process in a container to prevent
125145
# a-c-r submission processes from seeing each other.
126146

@@ -133,6 +153,8 @@ def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params,
133153
cmd.append(workflow_url)
134154
cmd.append(inputtemp.name)
135155

156+
self.log_for_run(cr_uuid, "Executing %s" % cmd, env['ARVADOS_API_TOKEN'])
157+
136158
proc = subprocess.Popen(cmd, env=env,
137159
cwd=tempdir,
138160
stdout=subprocess.PIPE,
@@ -141,9 +163,8 @@ def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params,
141163
if proc.returncode != 0:
142164
api.container_requests().update(uuid=cr_uuid, body={"priority": 0}).execute()
143165

144-
api.logs().create(body={"log": {"object_uuid": cr_uuid,
145-
"event_type": "stderr",
146-
"properties": {"text": stderrdata}}}).execute()
166+
self.log_for_run(cr_uuid, stderrdata, env['ARVADOS_API_TOKEN'])
167+
147168
if tempdir:
148169
shutil.rmtree(tempdir)
149170

@@ -153,8 +174,6 @@ def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params,
153174

154175
@catch_exceptions
155176
def RunWorkflow(self, **args):
156-
tempdir, body = self.collect_attachments()
157-
158177
if not connexion.request.headers.get('Authorization'):
159178
raise MissingAuthorization()
160179

@@ -178,17 +197,25 @@ def RunWorkflow(self, **args):
178197
"output_path": "n/a",
179198
"priority": 500}}).execute()
180199

181-
workflow_url = body.get("workflow_url")
200+
try:
201+
tempdir, body = self.collect_attachments(cr["uuid"])
202+
203+
workflow_url = body.get("workflow_url")
182204

183-
project_uuid = body.get("workflow_engine_parameters", {}).get("project_uuid")
205+
project_uuid = body.get("workflow_engine_parameters", {}).get("project_uuid")
184206

185-
threading.Thread(target=self.invoke_cwl_runner, args=(cr["uuid"],
186-
workflow_url,
187-
body["workflow_params"],
188-
env,
189-
project_uuid,
190-
tempdir)).start()
207+
threading.Thread(target=self.invoke_cwl_runner, args=(cr["uuid"],
208+
workflow_url,
209+
body["workflow_params"],
210+
env,
211+
project_uuid,
212+
tempdir)).start()
191213

214+
except Exception as e:
215+
self.log_for_run(cr["uuid"], str(e))
216+
cr = api.container_requests().update(uuid=cr["uuid"],
217+
body={"container_request":
218+
{"priority": 0}}).execute()
192219
return {"run_id": cr["uuid"]}
193220

194221
@catch_exceptions
@@ -203,7 +230,11 @@ def GetRunLog(self, run_id):
203230
containers_map = {c["uuid"]: c for c in tasks}
204231
containers_map[container["uuid"]] = container
205232
else:
206-
container = {"state": "Queued", "exit_code": None, "log": None}
233+
container = {
234+
"state": "Queued" if request["priority"] > 0 else "Cancelled",
235+
"exit_code": None,
236+
"log": None
237+
}
207238
tasks = []
208239
containers_map = {}
209240
task_reqs = []
@@ -256,7 +287,7 @@ def log_object(cr):
256287
"workflow_params": request["mounts"].get("/var/lib/cwl/cwl.input.json", {}).get("content", {})
257288
},
258289
"state": statemap[container["state"]],
259-
"workflow_log": log_object(request),
290+
"run_log": log_object(request),
260291
"task_logs": [log_object(t) for t in task_reqs],
261292
"outputs": outputobj
262293
}

wes_service/cwl_runner.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ def getlog(self):
142142
"run_id": self.run_id,
143143
"request": request,
144144
"state": state,
145-
"workflow_log": {
145+
"run_log": {
146146
"cmd": [""],
147147
"start_time": "",
148148
"end_time": "",

wes_service/toil_wes.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ def getlog(self):
158158
"run_id": self.run_id,
159159
"request": request,
160160
"state": state,
161-
"workflow_log": {
161+
"run_log": {
162162
"cmd": cmd,
163163
"start_time": starttime,
164164
"end_time": endtime,

wes_service/util.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import tempfile
22
import json
33
import os
4+
import logging
45

56
from six import itervalues, iterlists
67
import connexion
@@ -42,21 +43,29 @@ def getoptlist(self, p):
4243
optlist.append(v)
4344
return optlist
4445

45-
def collect_attachments(self):
46+
def log_for_run(self, run_id, message):
47+
logging.info("Workflow %s: %s", run_id, message)
48+
49+
def collect_attachments(self, run_id=None):
4650
tempdir = tempfile.mkdtemp()
4751
body = {}
4852
for k, ls in iterlists(connexion.request.files):
4953
for v in ls:
5054
if k == "workflow_attachment":
5155
filename = secure_filename(v.filename)
52-
v.save(os.path.join(tempdir, filename))
53-
body[k] = "file://%s" % tempdir # Reference to tem working dir.
56+
dest = os.path.join(tempdir, filename)
57+
self.log_for_run(run_id, "Staging attachment '%s' to '%s'" % (v.filename, dest))
58+
v.save(dest)
59+
body[k] = "file://%s" % tempdir # Reference to temp working dir.
5460
elif k in ("workflow_params", "tags", "workflow_engine_parameters"):
55-
body[k] = json.loads(v.read())
61+
content = v.read()
62+
body[k] = json.loads(content)
5663
else:
5764
body[k] = v.read()
5865

5966
if ":" not in body["workflow_url"]:
6067
body["workflow_url"] = "file://%s" % os.path.join(tempdir, secure_filename(body["workflow_url"]))
6168

69+
self.log_for_run(run_id, "Using workflow_url '%s'" % body.get("workflow_url"))
70+
6271
return tempdir, body

0 commit comments

Comments
 (0)