Skip to content

Commit e085312

Browse files
authored
Better errors (#234)
* Improve error reporting for expression evaluation. * Connect errors back to source file lines where possible. * Improve reporting on missing input files. * Preserve line numbers as much as possible when updating.
1 parent aeba354 commit e085312

15 files changed

+303
-203
lines changed

cwltool/builder.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from . import expression
44
import avro
55
import schema_salad.validate as validate
6+
from schema_salad.sourceline import SourceLine
67
from typing import Any, Callable, Text, Type, Union
78
from .errors import WorkflowException
89
from .stdfsaccess import StdFsAccess
@@ -36,6 +37,7 @@ def __init__(self): # type: () -> None
3637
self.stagedir = None # type: Text
3738
self.make_fs_access = None # type: Type[StdFsAccess]
3839
self.build_job_script = None # type: Callable[[List[str]], Text]
40+
self.debug = False # type: bool
3941

4042
def bind_input(self, schema, datum, lead_pos=[], tail_pos=[]):
4143
# type: (Dict[Text, Any], Any, Union[int, List[int]], List[int]) -> List[Dict[Text, Any]]
@@ -155,7 +157,8 @@ def tostr(self, value): # type: (Any) -> Text
155157
def generate_arg(self, binding): # type: (Dict[Text,Any]) -> List[Text]
156158
value = binding.get("datum")
157159
if "valueFrom" in binding:
158-
value = self.do_eval(binding["valueFrom"], context=value)
160+
with SourceLine(binding, "valueFrom", WorkflowException):
161+
value = self.do_eval(binding["valueFrom"], context=value)
159162

160163
prefix = binding.get("prefix")
161164
sep = binding.get("separate", True)
@@ -203,4 +206,5 @@ def do_eval(self, ex, context=None, pull_image=True, recursive=False):
203206
self.outdir, self.tmpdir,
204207
self.resources,
205208
context=context, pull_image=pull_image,
206-
timeout=self.timeout)
209+
timeout=self.timeout,
210+
debug=self.debug)

cwltool/cwlrdf.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import json
22
import urlparse
33
from .process import Process
4-
from schema_salad.ref_resolver import Loader
4+
from schema_salad.ref_resolver import Loader, ContextType
55
from schema_salad.jsonld_context import makerdf
66
from rdflib import Graph, plugin, URIRef
77
from rdflib.serializer import Serializer
88
from typing import Any, Dict, IO, Text, Union
99

10-
def gather(tool, ctx): # type: (Process, Loader.ContextType) -> Graph
10+
def gather(tool, ctx): # type: (Process, ContextType) -> Graph
1111
g = Graph()
1212

1313
def visitor(t):
@@ -17,7 +17,7 @@ def visitor(t):
1717
return g
1818

1919
def printrdf(wf, ctx, sr, stdout):
20-
# type: (Process, Loader.ContextType, Text, IO[Any]) -> None
20+
# type: (Process, ContextType, Text, IO[Any]) -> None
2121
stdout.write(gather(wf, ctx).serialize(format=sr))
2222

2323
def lastpart(uri): # type: (Any) -> Text
@@ -158,7 +158,7 @@ def dot_without_parameters(g, stdout): # type: (Graph, IO[Any]) -> None
158158

159159

160160
def printdot(wf, ctx, stdout, include_parameters=False):
161-
# type: (Process, Loader.ContextType, Any, bool) -> None
161+
# type: (Process, ContextType, Any, bool) -> None
162162
g = gather(wf, ctx)
163163

164164
stdout.write("digraph {")

cwltool/draft2tool.py

+64-58
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
from .job import CommandLineJob
2626
from .stdfsaccess import StdFsAccess
2727

28+
from schema_salad.sourceline import SourceLine, indent
29+
2830
ACCEPTLIST_EN_STRICT_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
2931
ACCEPTLIST_EN_RELAXED_RE = re.compile(r"^[ a-zA-Z0-9._+-]+$") # with spaces
3032
ACCEPTLIST_RE = ACCEPTLIST_EN_STRICT_RE
@@ -159,11 +161,7 @@ def makeJobRunner(self): # type: () -> CommandLineJob
159161
def makePathMapper(self, reffiles, stagedir, **kwargs):
160162
# type: (List[Any], Text, **Any) -> PathMapper
161163
dockerReq, _ = self.get_requirement("DockerRequirement")
162-
try:
163-
return PathMapper(reffiles, kwargs["basedir"], stagedir)
164-
except OSError as e:
165-
if e.errno == errno.ENOENT:
166-
raise WorkflowException(u"Missing input file %s" % e)
164+
return PathMapper(reffiles, kwargs["basedir"], stagedir)
167165

168166
def job(self, joborder, output_callback, **kwargs):
169167
# type: (Dict[Text, Text], Callable[..., Any], **Any) -> Generator[Union[CommandLineJob, CallbackJob], None, None]
@@ -286,18 +284,21 @@ def rm_pending_output_callback(output_callback, jobcachepending,
286284
adjustDirObjs(builder.bindings, _check_adjust)
287285

288286
if self.tool.get("stdin"):
289-
j.stdin = builder.do_eval(self.tool["stdin"])
290-
reffiles.append({"class": "File", "path": j.stdin})
287+
with SourceLine(self.tool, "stdin", validate.ValidationException):
288+
j.stdin = builder.do_eval(self.tool["stdin"])
289+
reffiles.append({"class": "File", "path": j.stdin})
291290

292291
if self.tool.get("stderr"):
293-
j.stderr = builder.do_eval(self.tool["stderr"])
294-
if os.path.isabs(j.stderr) or ".." in j.stderr:
295-
raise validate.ValidationException("stderr must be a relative path")
292+
with SourceLine(self.tool, "stderr", validate.ValidationException):
293+
j.stderr = builder.do_eval(self.tool["stderr"])
294+
if os.path.isabs(j.stderr) or ".." in j.stderr:
295+
raise validate.ValidationException("stderr must be a relative path, got '%s'" % j.stderr)
296296

297297
if self.tool.get("stdout"):
298-
j.stdout = builder.do_eval(self.tool["stdout"])
299-
if os.path.isabs(j.stdout) or ".." in j.stdout or not j.stdout:
300-
raise validate.ValidationException("stdout must be a relative path")
298+
with SourceLine(self.tool, "stdout", validate.ValidationException):
299+
j.stdout = builder.do_eval(self.tool["stdout"])
300+
if os.path.isabs(j.stdout) or ".." in j.stdout or not j.stdout:
301+
raise validate.ValidationException("stdout must be a relative path, got '%s'" % j.stdout)
301302

302303
if _logger.isEnabledFor(logging.DEBUG):
303304
_logger.debug(u"[job %s] command line bindings is %s", j.name, json.dumps(builder.bindings, indent=4))
@@ -389,17 +390,18 @@ def collect_output_ports(self, ports, builder, outdir, compute_checksum=True):
389390
if _logger.isEnabledFor(logging.DEBUG):
390391
_logger.debug(u"Raw output from %s: %s", custom_output, json.dumps(ret, indent=4))
391392
else:
392-
for port in ports:
393-
fragment = shortname(port["id"])
394-
try:
395-
ret[fragment] = self.collect_output(port, builder, outdir, fs_access, compute_checksum=compute_checksum)
396-
except Exception as e:
397-
_logger.debug(
398-
u"Error collecting output for parameter '%s'"
399-
% shortname(port["id"]), exc_info=True)
400-
raise WorkflowException(
401-
u"Error collecting output for parameter '%s': %s"
402-
% (shortname(port["id"]), e))
393+
for i, port in enumerate(ports):
394+
with SourceLine(ports, i, WorkflowException):
395+
fragment = shortname(port["id"])
396+
try:
397+
ret[fragment] = self.collect_output(port, builder, outdir, fs_access, compute_checksum=compute_checksum)
398+
except Exception as e:
399+
_logger.debug(
400+
u"Error collecting output for parameter '%s'"
401+
% shortname(port["id"]), exc_info=True)
402+
raise WorkflowException(
403+
u"Error collecting output for parameter '%s':\n%s"
404+
% (shortname(port["id"]), indent(unicode(e))))
403405

404406
if ret:
405407
adjustFileObjs(ret,
@@ -427,24 +429,25 @@ def collect_output(self, schema, builder, outdir, fs_access, compute_checksum=Tr
427429
revmap = partial(revmap_file, builder, outdir)
428430

429431
if "glob" in binding:
430-
for gb in aslist(binding["glob"]):
431-
gb = builder.do_eval(gb)
432-
if gb:
433-
globpatterns.extend(aslist(gb))
434-
435-
for gb in globpatterns:
436-
if gb.startswith(outdir):
437-
gb = gb[len(outdir)+1:]
438-
elif gb == ".":
439-
gb = outdir
440-
elif gb.startswith("/"):
441-
raise WorkflowException("glob patterns must not start with '/'")
442-
try:
443-
r.extend([{"location": g,
444-
"class": "File" if fs_access.isfile(g) else "Directory"}
445-
for g in fs_access.glob(fs_access.join(outdir, gb))])
446-
except (OSError, IOError) as e:
447-
_logger.warn(Text(e))
432+
with SourceLine(binding, "glob", WorkflowException):
433+
for gb in aslist(binding["glob"]):
434+
gb = builder.do_eval(gb)
435+
if gb:
436+
globpatterns.extend(aslist(gb))
437+
438+
for gb in globpatterns:
439+
if gb.startswith(outdir):
440+
gb = gb[len(outdir)+1:]
441+
elif gb == ".":
442+
gb = outdir
443+
elif gb.startswith("/"):
444+
raise WorkflowException("glob patterns must not start with '/'")
445+
try:
446+
r.extend([{"location": g,
447+
"class": "File" if fs_access.isfile(g) else "Directory"}
448+
for g in fs_access.glob(fs_access.join(outdir, gb))])
449+
except (OSError, IOError) as e:
450+
_logger.warn(Text(e))
448451

449452
for files in r:
450453
if files["class"] == "Directory" and "listing" not in files:
@@ -479,11 +482,13 @@ def collect_output(self, schema, builder, outdir, fs_access, compute_checksum=Tr
479482
single = True
480483

481484
if "outputEval" in binding:
482-
r = builder.do_eval(binding["outputEval"], context=r)
485+
with SourceLine(binding, "outputEval", WorkflowException):
486+
r = builder.do_eval(binding["outputEval"], context=r)
483487

484488
if single:
485489
if not r and not optional:
486-
raise WorkflowException("Did not find output file with glob pattern: '{}'".format(globpatterns))
490+
with SourceLine(binding, "glob", WorkflowException):
491+
raise WorkflowException("Did not find output file with glob pattern: '{}'".format(globpatterns))
487492
elif not r and optional:
488493
pass
489494
elif isinstance(r, list):
@@ -498,20 +503,21 @@ def collect_output(self, schema, builder, outdir, fs_access, compute_checksum=Tr
498503
Callable[[Any], Any], revmap))
499504

500505
if "secondaryFiles" in schema:
501-
for primary in aslist(r):
502-
if isinstance(primary, dict):
503-
primary["secondaryFiles"] = []
504-
for sf in aslist(schema["secondaryFiles"]):
505-
if isinstance(sf, dict) or "$(" in sf or "${" in sf:
506-
sfpath = builder.do_eval(sf, context=primary)
507-
if isinstance(sfpath, basestring):
508-
sfpath = revmap({"location": sfpath, "class": "File"})
509-
else:
510-
sfpath = {"location": substitute(primary["location"], sf), "class": "File"}
511-
512-
for sfitem in aslist(sfpath):
513-
if fs_access.exists(sfitem["location"]):
514-
primary["secondaryFiles"].append(sfitem)
506+
with SourceLine(schema, "secondaryFiles", WorkflowException):
507+
for primary in aslist(r):
508+
if isinstance(primary, dict):
509+
primary["secondaryFiles"] = []
510+
for sf in aslist(schema["secondaryFiles"]):
511+
if isinstance(sf, dict) or "$(" in sf or "${" in sf:
512+
sfpath = builder.do_eval(sf, context=primary)
513+
if isinstance(sfpath, basestring):
514+
sfpath = revmap({"location": sfpath, "class": "File"})
515+
else:
516+
sfpath = {"location": substitute(primary["location"], sf), "class": "File"}
517+
518+
for sfitem in aslist(sfpath):
519+
if fs_access.exists(sfitem["location"]):
520+
primary["secondaryFiles"].append(sfitem)
515521

516522
if not r and optional:
517523
r = None

cwltool/expression.py

+46-20
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import logging
44
import os
55
import re
6+
import copy
67

78
from typing import Any, AnyStr, Union, Text, Dict, List
89
import schema_salad.validate as validate
@@ -106,33 +107,54 @@ def scanner(scan): # type: (Text) -> List[int]
106107
else:
107108
return None
108109

109-
def next_seg(remain, obj): # type: (Text, Any)->Text
110+
def next_seg(remain, obj): # type: (Text, Any) -> Any
110111
if remain:
111112
m = segment_re.match(remain)
113+
key = None # type: Union[str, int]
112114
if m.group(0)[0] == '.':
113-
return next_seg(remain[m.end(0):], obj[m.group(0)[1:]])
115+
key = m.group(0)[1:]
114116
elif m.group(0)[1] in ("'", '"'):
115117
key = m.group(0)[2:-2].replace("\\'", "'").replace('\\"', '"')
116-
return next_seg(remain[m.end(0):], obj[key])
118+
119+
if key:
120+
if isinstance(obj, list) and key == "length" and not remain[m.end(0):]:
121+
return len(obj)
122+
if not isinstance(obj, dict):
123+
raise WorkflowException(" is a %s, cannot index on string '%s'" % (type(obj).__name__, key))
124+
if key not in obj:
125+
raise WorkflowException(" does not contain key '%s'" % key)
117126
else:
118-
key = m.group(0)[1:-1]
119-
return next_seg(remain[m.end(0):], obj[int(key)])
127+
try:
128+
key = int(m.group(0)[1:-1])
129+
except ValueError as v:
130+
raise WorkflowException(unicode(v))
131+
if not isinstance(obj, list):
132+
raise WorkflowException(" is a %s, cannot index on int '%s'" % (type(obj).__name__, key))
133+
if key >= len(obj):
134+
raise WorkflowException(" list index %i out of range" % key)
135+
try:
136+
return next_seg(remain[m.end(0):], obj[key])
137+
except WorkflowException as w:
138+
raise WorkflowException("%s%s" % (m.group(0), w))
120139
else:
121140
return obj
122141

123-
def evaluator(ex, jslib, obj, fullJS=False, timeout=None):
124-
# type: (Text, Text, Dict[Text, Any], bool, int) -> JSON
142+
def evaluator(ex, jslib, obj, fullJS=False, timeout=None, debug=False):
143+
# type: (Text, Text, Dict[Text, Any], bool, int, bool) -> JSON
125144
m = param_re.match(ex)
126145
if m:
127-
return next_seg(m.group(0)[m.end(1) - m.start(0):-1], obj[m.group(1)])
146+
try:
147+
return next_seg(m.group(0)[m.end(1) - m.start(0):-1], obj[m.group(1)])
148+
except Exception as w:
149+
raise WorkflowException("%s%s" % (m.group(1), w))
128150
elif fullJS:
129-
return sandboxjs.execjs(ex, jslib, timeout=timeout)
151+
return sandboxjs.execjs(ex, jslib, timeout=timeout, debug=debug)
130152
else:
131153
raise sandboxjs.JavascriptException("Syntax error in parameter reference '%s' or used Javascript code without specifying InlineJavascriptRequirement.", ex)
132154

133155
def interpolate(scan, rootvars,
134-
timeout=None, fullJS=None, jslib=""):
135-
# type: (Text, Dict[Text, Any], int, bool, Union[str, Text]) -> JSON
156+
timeout=None, fullJS=None, jslib="", debug=False):
157+
# type: (Text, Dict[Text, Any], int, bool, Union[str, Text], bool) -> JSON
136158
scan = scan.strip()
137159
parts = []
138160
w = scanner(scan)
@@ -141,7 +163,7 @@ def interpolate(scan, rootvars,
141163

142164
if scan[w[0]] == '$':
143165
e = evaluator(scan[w[0]+1:w[1]], jslib, rootvars, fullJS=fullJS,
144-
timeout=timeout)
166+
timeout=timeout, debug=debug)
145167
if w[0] == 0 and w[1] == len(scan):
146168
return e
147169
leaf = json.dumps(e, sort_keys=True)
@@ -158,10 +180,10 @@ def interpolate(scan, rootvars,
158180
return ''.join(parts)
159181

160182
def do_eval(ex, jobinput, requirements, outdir, tmpdir, resources,
161-
context=None, pull_image=True, timeout=None):
162-
# type: (Union[dict, AnyStr], Dict[Text, Union[Dict, List, Text]], List[Dict[Text, Any]], Text, Text, Dict[Text, Union[int, Text]], Any, bool, int) -> Any
183+
context=None, pull_image=True, timeout=None, debug=False):
184+
# type: (Union[dict, AnyStr], Dict[Text, Union[Dict, List, Text]], List[Dict[Text, Any]], Text, Text, Dict[Text, Union[int, Text]], Any, bool, int, bool) -> Any
163185

164-
runtime = resources.copy()
186+
runtime = copy.copy(resources)
165187
runtime["tmpdir"] = tmpdir
166188
runtime["outdir"] = outdir
167189

@@ -179,10 +201,14 @@ def do_eval(ex, jobinput, requirements, outdir, tmpdir, resources,
179201
jslib = jshead(r.get("expressionLib", []), rootvars)
180202
break
181203

182-
return interpolate(ex,
183-
rootvars,
184-
timeout=timeout,
185-
fullJS=fullJS,
186-
jslib=jslib)
204+
try:
205+
return interpolate(ex,
206+
rootvars,
207+
timeout=timeout,
208+
fullJS=fullJS,
209+
jslib=jslib,
210+
debug=debug)
211+
except Exception as e:
212+
raise WorkflowException("Expression evaluation error:\n%s" % e)
187213
else:
188214
return ex

cwltool/job.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ def linkoutdir(src, tgt):
305305
_logger.exception("Exception while running job")
306306
processStatus = "permanentFail"
307307
except WorkflowException as e:
308-
_logger.error(u"Error while running job: %s" % e)
308+
_logger.error(u"[job %s] Job error:\n%s" % (self.name, e))
309309
processStatus = "permanentFail"
310310
except Exception as e:
311311
_logger.exception("Exception while running job")

0 commit comments

Comments
 (0)