forked from adamnovak/wdl-conformance-tests
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun.py
362 lines (288 loc) · 14.7 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
import os
import json
import sys
import hashlib
import argparse
import subprocess
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from shutil import which
from uuid import uuid4
class WDLRunner:
    """
    A class describing how to invoke a WDL runner to run a workflow.
    """
    def format_command(self, wdl_file, json_file, results_file, args, quiet):
        # Abstract hook: subclasses return the full shell command string that
        # runs wdl_file with json_file as inputs and writes the output
        # metadata JSON to results_file. `args` is a list of extra runner
        # arguments; `quiet` lets implementations tune runner verbosity.
        raise NotImplementedError
class CromwellStyleWDLRunner(WDLRunner):
    """
    A runner that takes Cromwell-style arguments: -i for the inputs JSON
    and -m for the output metadata file.
    """
    def __init__(self, runner):
        # Leading command string, e.g. 'cromwell' or a full runner invocation.
        self.runner = runner

    def format_command(self, wdl_file, json_file, results_file, args, quiet):
        """Assemble the shell command line for a Cromwell-style invocation."""
        extra_args = " ".join(args)
        return f'{self.runner} {wdl_file} -i {json_file} -m {results_file} {extra_args}'
class CromwellWDLRunner(CromwellStyleWDLRunner):
    """
    Cromwell runner that downloads a pinned Cromwell jar on first use if no
    'cromwell' binary is on the PATH.
    """
    # Class-level lock: only one thread may perform the download/jar swap.
    download_lock = threading.Lock()

    def __init__(self):
        super().__init__('cromwell')

    def format_command(self, wdl_file, json_file, results_file, args, quiet):
        if self.runner == 'cromwell' and not which('cromwell'):
            with CromwellWDLRunner.download_lock:
                # Double-checked locking: another thread may have already
                # rewritten self.runner while we waited for the lock.
                if self.runner == 'cromwell':
                    # if there is no cromwell binary seen on the path, download
                    # our pinned version and use that instead
                    log_level = '-DLOG_LEVEL=OFF' if quiet else ''
                    cromwell = os.path.abspath('build/cromwell.jar')
                    if not os.path.exists(cromwell):
                        print('Cromwell not seen in the path, now downloading cromwell to run tests... ')
                        # Delegates the actual download to the Makefile.
                        run_cmd(cmd='make cromwell', cwd=os.getcwd(), quiet=quiet)
                    # From now on invoke the jar directly via java.
                    self.runner = f'java {log_level} -jar {cromwell} run'
        return super().format_command(wdl_file, json_file, results_file, args, quiet)
class MiniWDLStyleWDLRunner(WDLRunner):
    """
    A runner that takes miniwdl-style arguments: -i for the inputs JSON
    and -o for the outputs file.
    """
    def __init__(self, runner):
        # Command prefix to invoke, e.g. 'miniwdl run'.
        self.runner = runner

    def format_command(self, wdl_file, json_file, results_file, args, quiet):
        """Assemble the shell command line for a miniwdl-style invocation."""
        extra_args = " ".join(args)
        return f'{self.runner} {wdl_file} -i {json_file} -o {results_file} {extra_args}'
# Registry mapping the --runner argument value to the strategy object that
# builds its command line. NOTE(review): '--outputDialect miniwdl' presumably
# makes toil-wdl-runner emit output JSON in the shape verify_outputs expects —
# confirm against the runner's documentation.
RUNNERS = {
    'cromwell': CromwellWDLRunner(),
    'toil-wdl-runner-old': CromwellStyleWDLRunner('toil-wdl-runner-old'),
    'toil-wdl-runner': CromwellStyleWDLRunner('toil-wdl-runner --outputDialect miniwdl'),
    'miniwdl': MiniWDLStyleWDLRunner('miniwdl run')
}
def run_cmd(cmd, cwd, quiet):
    """
    Run a shell command and report how it went.

    :param cmd: shell command string (run with shell=True).
    :param cwd: working directory for the command.
    :param quiet: if True, omit stdout/stderr from the result unless the
        command failed.
    :return: dict with 'status' ('SUCCEEDED' or 'FAILED'), a 'reason' on
        failure, and 'stdout'/'stderr' text when not suppressed.
    """
    # Use the named PIPE constant instead of its raw value -1.
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         shell=True, cwd=cwd)
    stdout, stderr = p.communicate()

    result = {}
    if p.returncode:
        result['status'] = 'FAILED'
        result['reason'] = f'Runner exited with code {p.returncode}'
    else:
        result['status'] = 'SUCCEEDED'

    # Always capture output on failure so it can be shown in the report.
    if not quiet or p.returncode:
        result['stdout'] = stdout.decode("utf-8", errors="ignore")
        result['stderr'] = stderr.decode("utf-8", errors="ignore")
    return result
def wdl_type_to_python_type(wdl_type):
    """
    Given a WDL type name without generics, like "Array", return a Python type like list.

    Returns None for WDL types the harness does not know about.
    """
    # 'File' maps to str because file outputs appear as path strings.
    known_types = {
        'File': str,
        'Int': int,
        'Boolean': bool,
        'String': str,
        'Array': list,
    }
    return known_types.get(wdl_type)
def wdl_outer_type(wdl_type):
    """
    Get the outermost type of a WDL type. So "Array[String]" gives "Array".
    """
    # Everything before the first '[' (or the whole string if there is none).
    outer, _, _ = wdl_type.partition('[')
    return outer
def wdl_inner_type(wdl_type):
    """
    Get the interior type of a WDL type. So "Array[String]" gives "String".
    """
    _, bracket, interior = wdl_type.partition('[')
    if not bracket:
        # No generics: the type is its own interior.
        return wdl_type
    # Drop the matching ']' at the very end.
    return interior[:-1]
def verify_outputs(expected_outputs, results_file, quiet):
    """
    Check a runner's results JSON against the test's expected outputs.

    :param expected_outputs: list of dicts, each with 'identifier', 'type'
        (a WDL type string), 'value', and — for File outputs — 'md5sum' and
        'size' (the byte count as a string).
    :param results_file: path to the output JSON; must contain an 'outputs'
        object mapping identifiers to values.
    :param quiet: accepted for signature consistency with the other
        steps; not used in the verification itself.
    :return: dict with 'status' ('SUCCEEDED' or 'FAILED') and 'reason'.
    :raises NotImplementedError: for WDL types the harness can't check yet.
    """
    try:
        with open(results_file, 'r') as f:
            test_results = json.load(f)
    except OSError:
        return {'status': 'FAILED', 'reason': f'Results file at {results_file} cannot be opened'}
    except json.JSONDecodeError:
        return {'status': 'FAILED', 'reason': f'Results file at {results_file} is not JSON'}

    for expected_output in expected_outputs:
        # Validate the overall 'outputs' section for each expectation.
        if 'outputs' not in test_results:
            return {'status': 'FAILED', 'reason': f"'outputs' section not found in workflow output JSON!"}
        if not isinstance(test_results['outputs'], dict):
            return {'status': 'FAILED', 'reason': f"'outputs' in workflow JSON is not an object!"}
        if expected_output['identifier'] not in test_results['outputs']:
            return {'status': 'FAILED', 'reason': f"'outputs' in workflow JSON does not contain expected key '{expected_output['identifier']}'!"}

        expected_outer_type = wdl_outer_type(expected_output['type'])
        expected_python_type = wdl_type_to_python_type(expected_outer_type)
        if expected_python_type is None:
            # We don't support this type yet.
            raise NotImplementedError(f"Type '{expected_outer_type}' is not supported by the test harness!")

        got_value = test_results['outputs'][expected_output['identifier']]
        if not isinstance(got_value, expected_python_type):
            # Make sure we got the right type.
            # NOTE(review): bool subclasses int, so a Boolean result would
            # pass an Int check here — confirm whether that matters.
            reason = f"For {expected_output['identifier']}, item {got_value} of type {type(got_value)} was returned, but {expected_outer_type} {expected_output['value']} was expected!"
            return {'status': 'FAILED', 'reason': reason}

        if expected_outer_type == 'File':
            # check file path exists
            if not os.path.exists(got_value):
                return {'status': 'FAILED', 'reason': f"{got_value} not found!"}
            # check md5sum
            with open(got_value, 'rb') as f:
                md5sum = hashlib.md5(f.read()).hexdigest()
            if md5sum != expected_output['md5sum']:
                reason = f"For {expected_output['identifier']}, md5sum does not match for {got_value}!\n" \
                         f"Expected: {expected_output['md5sum']}\n" \
                         f"Actual: {md5sum}"
                return {'status': 'FAILED', 'reason': reason}
            # check file size (expected 'size' is stored as a string)
            if str(os.path.getsize(got_value)) != expected_output['size']:
                reason = f"For {expected_output['identifier']}, {got_value} is {os.path.getsize(got_value)} bytes " \
                         f"(expected: {expected_output['size']} bytes)!"
                return {'status': 'FAILED', 'reason': reason}
        elif expected_outer_type == 'Array':
            # Check array length.
            if len(got_value) != len(expected_output['value']):
                # Bug fix: the returned array's length was previously reported
                # as len(expected_output['value']) too, so the message always
                # showed two identical lengths. Report len(got_value) instead.
                reason = f"For {expected_output['identifier']}, array {repr(got_value)} of length {len(got_value)} was returned, but {expected_output['type']} {repr(expected_output['value'])} of length {len(expected_output['value'])} was expected!"
                return {'status': 'FAILED', 'reason': reason}
            expected_inner_type = wdl_inner_type(expected_output['type'])
            expected_inner_python_type = wdl_type_to_python_type(expected_inner_type)
            if expected_inner_python_type is None or expected_inner_type == 'File':
                # We don't support this type as an array item yet.
                raise NotImplementedError(f"Array item type '{expected_inner_type}' is not supported by the test harness!")
            for i, item in enumerate(got_value):
                if not isinstance(item, expected_inner_python_type):
                    # Check the type of each item
                    reason = f"For {expected_output['identifier']}, at index {i}, item {item} of type {type(item)} was returned, but {expected_inner_type} {expected_output['value'][i]} was expected!"
                    return {'status': 'FAILED', 'reason': reason}
                if item != expected_output['value'][i]:
                    # And its value
                    reason = f"For {expected_output['identifier']}, at index {i}, {expected_inner_type} {repr(item)} was returned, but {repr(expected_output['value'][i])} was expected!"
                    return {'status': 'FAILED', 'reason': reason}
        else:
            # check the single value returned
            if got_value != expected_output['value']:
                reason = f"For {expected_output['identifier']}, {expected_outer_type} {repr(got_value)} was returned, but {repr(expected_output['value'])} was expected!"
                return {'status': 'FAILED', 'reason': reason}
    return {'status': 'SUCCEEDED', 'reason': None}
def announce_test(test_number, total_tests, test):
    """Print a banner naming the test that is about to run."""
    # Human-facing count is 1-based; the TEST id stays 0-based.
    banner = f'\n[{test_number + 1}/{total_tests}] TEST {test_number}: {test["description"]}'
    print(banner)
def check_test(test, versions_to_test):
    """
    Determine if a test should be skipped.

    Returns a response, with SUCCEEDED status if the test should be run, and
    SKIPPED otherwise.
    """
    # TODO: Tests have a versions_supported array for some reason but
    # actually can only really be one version.
    supported = test['versions_supported']
    if not any(version in versions_to_test for version in supported):
        return {'status': 'SKIPPED', 'reason': f'Test only applies to versions: {",".join(test["versions_supported"])}'}
    return {'status': 'SUCCEEDED', 'reason': None}
# Make sure output groups don't clobber each other.
# Held around every multi-line print group (announcement + response) so that
# concurrently running test threads don't interleave their output.
LOG_LOCK = threading.Lock()
def print_response(test_number, total_tests, response):
    """
    Log a test response that has a status and maybe a reason.

    Also dumps captured stdout/stderr when the response carries them
    (run_cmd includes them when not quiet, or on failure).
    """
    print(f'[{test_number + 1}/{total_tests}] TEST {test_number}: {response["status"]}!')
    if response["reason"]:
        print(f' REASON: {response["reason"]}')
    if 'stdout' in response:
        print(f'\nstdout: {response["stdout"]}\n')
    if 'stderr' in response:
        print(f'\nstderr: {response["stderr"]}\n\n')
def run_test(test_number, total_tests, test, runner, quiet):
    """
    Run a test and log success or failure.
    Return the response dict.
    """
    wdl_file = os.path.abspath(test['wdl'])
    json_file = os.path.abspath(test['json'])
    args = test.get('args', [])
    outputs = test['outputs']
    # Unique per-run results file so parallel tests never clobber each other.
    results_file = os.path.abspath(f'results-{uuid4()}.json')
    cmd = runner.format_command(wdl_file, json_file, results_file, args, quiet)

    # Announce under the log lock so the banner lines stay together.
    with LOG_LOCK:
        announce_test(test_number, total_tests, test)
        print(f' RUNNING: {cmd}')
    response = run_cmd(cmd=cmd, cwd=os.path.dirname(wdl_file), quiet=quiet)
    if response['status'] == 'SUCCEEDED':
        # Runner exited 0; now verify it produced the expected outputs.
        # verify_outputs overwrites 'status'/'reason' with its own verdict.
        response.update(verify_outputs(outputs, results_file, quiet=quiet))

    with LOG_LOCK:
        print_response(test_number, total_tests, response)
    return response
def handle_test(test_number, total_tests, test, runner, versions_to_test, quiet):
    """
    Decide if the test should be skipped. If not, run it.
    Returns a result that can have status SKIPPED, SUCCEEDED, or FAILED.
    """
    response = check_test(test, versions_to_test)
    if response['status'] != 'SUCCEEDED':
        # Skipped: log the announcement and verdict together under the lock.
        with LOG_LOCK:
            announce_test(test_number, total_tests, test)
            print_response(test_number, total_tests, response)
        return response
    # run_test does its own (locked) logging.
    return run_test(test_number, total_tests, test, runner, quiet)
def get_test_numbers(number_argument):
    """
    Parse a test-selection string like "1,3-6,10" into a sorted list of
    unique test numbers. Ranges are inclusive at both ends; empty tokens
    (e.g. from a trailing comma) are ignored.

    :param number_argument: comma-separated numbers and/or N-M ranges.
    :return: sorted list of distinct ints.
    """
    tests = set()
    for token in number_argument.split(','):
        if not token:
            # Ignore empty pieces such as a trailing ','.
            continue
        if '-' in token:
            start, end = token.split('-')
        else:
            start = end = token
        tests.update(range(int(start), int(end) + 1))
    # sorted() accepts the set directly; no intermediate list needed.
    return sorted(tests)
def main(argv=sys.argv[1:]):
    """
    Parse arguments, run the selected conformance tests (in parallel), print
    a summary, and exit nonzero if any selected, non-skipped test failed.
    """
    parser = argparse.ArgumentParser(description='Run WDL conformance tests.')
    parser.add_argument("--quiet", "-q", default=False, action='store_true',
                        help='Suppress printing run messages.')
    parser.add_argument("--versions", "-v", default="1.0",
                        help='Select the WDL versions you wish to test against.')
    parser.add_argument("--numbers", "-n", default=None,
                        help='Select the WDL test numbers you wish to run.')
    parser.add_argument("--runner", "-r", default='cromwell',
                        help='Select the WDL runner to use.')
    parser.add_argument("--threads", "-t", type=int, default=None,
                        help='Number of tests to run in parallel.')
    args = parser.parse_args(argv)

    # Get all the versions to test.
    # Unlike with CWL, WDL requires a WDL file to declare a specific version,
    # and prohibits mixing file versions in a workflow, although some runners
    # might allow it.
    # But the tests all need to be for single WDL versions.
    versions_to_test = set(args.versions.split(','))
    with open('conformance.json', 'r') as f:
        tests = json.load(f)
    total_tests = len(tests)

    if args.runner not in RUNNERS:
        print(f'Unsupported runner: {args.runner}')
        sys.exit(1)
    runner = RUNNERS[args.runner]

    print(f'Testing runner {args.runner} on WDL versions: {",".join(versions_to_test)}\n')
    successes = 0
    skips = 0

    # With no --numbers, run every test.
    test_numbers = get_test_numbers(args.numbers) if args.numbers else range(total_tests)
    selected_tests = len(test_numbers)

    with ThreadPoolExecutor(max_workers=args.threads) as executor:
        pending_futures = []
        for test_number in test_numbers:
            # NOTE(review): assumes conformance.json is an object keyed by
            # stringified test numbers — confirm against the data file.
            test = tests[str(test_number)]

            # Handle each test as a concurrent job
            result_future = executor.submit(handle_test,
                                            test_number,
                                            total_tests,
                                            test,
                                            runner,
                                            versions_to_test,
                                            args.quiet)
            pending_futures.append(result_future)
        for result_future in as_completed(pending_futures):
            # Go get each result or reraise the relevant exception
            result = result_future.result()
            if result['status'] == 'SUCCEEDED':
                successes += 1
            elif result['status'] == 'SKIPPED':
                skips += 1

    # Summary line; fixed the 'skiped' typo in the user-facing message.
    print(f'{selected_tests - skips} tests run, {successes} succeeded, {selected_tests - skips - successes} failed, {skips} skipped')
    if successes < selected_tests - skips:
        # Fail the program overall if tests failed.
        sys.exit(1)
# Script entry point: parse the real command line and run the suite.
if __name__ == '__main__':
    main()