Skip to content

Commit 9be7ad8

Browse files
authored
Merge pull request #516 from common-workflow-language/toil-fixes
Refactor secondaryFile handling to avoid basing on primary "location".
2 parents 1c6b75d + fb98141 commit 9be7ad8

File tree

3 files changed

+78
-33
lines changed

3 files changed

+78
-33
lines changed

cwltool/builder.py

+21-11
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import absolute_import
22
import copy
33
import os
4+
import logging
45
from typing import Any, Callable, Dict, List, Text, Type, Union
56

67
import six
@@ -18,6 +19,8 @@
1819
from .stdfsaccess import StdFsAccess
1920
from .utils import aslist, get_feature, docker_windows_path_adjust, onWindows
2021

22+
_logger = logging.getLogger("cwltool")
23+
2124
AvroSchemaFromJSONData = avro.schema.make_avsc_object
2225

2326
CONTENT_LIMIT = 64 * 1024
@@ -146,18 +149,25 @@ def bind_input(self, schema, datum, lead_pos=None, tail_pos=None):
146149
datum["secondaryFiles"] = []
147150
for sf in aslist(schema["secondaryFiles"]):
148151
if isinstance(sf, dict) or "$(" in sf or "${" in sf:
149-
secondary_eval = self.do_eval(sf, context=datum)
150-
if isinstance(secondary_eval, string_types):
151-
sfpath = {"location": secondary_eval,
152-
"class": "File"}
153-
else:
154-
sfpath = secondary_eval
155-
else:
156-
sfpath = {"location": substitute(datum["location"], sf), "class": "File"}
157-
if isinstance(sfpath, list):
158-
datum["secondaryFiles"].extend(sfpath)
152+
sfpath = self.do_eval(sf, context=datum)
159153
else:
160-
datum["secondaryFiles"].append(sfpath)
154+
sfpath = substitute(datum["basename"], sf)
155+
for sfname in aslist(sfpath):
156+
found = False
157+
for d in datum["secondaryFiles"]:
158+
if not d.get("basename"):
159+
d["basename"] = d["location"][d["location"].rindex("/")+1:]
160+
if d["basename"] == sfname:
161+
found = True
162+
if not found:
163+
if isinstance(sfname, dict):
164+
datum["secondaryFiles"].append(sfname)
165+
else:
166+
datum["secondaryFiles"].append({
167+
"location": datum["location"][0:datum["location"].rindex("/")+1]+sfname,
168+
"basename": sfname,
169+
"class": "File"})
170+
161171
normalizeFilesDirs(datum["secondaryFiles"])
162172

163173
def _capture_files(f):

cwltool/draft2tool.py

+47-17
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*") # Accept anything
3636
ACCEPTLIST_RE = ACCEPTLIST_EN_STRICT_RE
3737
DEFAULT_CONTAINER_MSG="""We are on Microsoft Windows and not all components of this CWL description have a
38-
container specified. This means that these steps will be executed in the default container,
38+
container specified. This means that these steps will be executed in the default container,
3939
which is %s.
4040
4141
Note, this could affect portability if this CWL description relies on non-POSIX features
@@ -116,17 +116,26 @@ def revmap_file(builder, outdir, f):
116116
if not split.scheme:
117117
outdir = file_uri(str(outdir))
118118

119+
# builder.outdir is the inner (container/compute node) output directory
120+
# outdir is the outer (host/storage system) output directory
121+
119122
if "location" in f:
120123
if f["location"].startswith("file://"):
121124
path = convert_pathsep_to_unix(uri_file_path(f["location"]))
122125
revmap_f = builder.pathmapper.reversemap(path)
126+
123127
if revmap_f and not builder.pathmapper.mapper(revmap_f[0]).type.startswith("Writable"):
124128
f["basename"] = os.path.basename(path)
125-
f["location"] = revmap_f[0]
129+
f["location"] = revmap_f[1]
126130
elif path == builder.outdir:
127131
f["location"] = outdir
128132
elif path.startswith(builder.outdir):
129133
f["location"] = builder.fs_access.join(outdir, path[len(builder.outdir) + 1:])
134+
elif f["location"].startswith(outdir):
135+
revmap_f = builder.pathmapper.reversemap(builder.fs_access.join(builder.outdir, f["location"][len(outdir) + 1:]))
136+
if revmap_f and not builder.pathmapper.mapper(revmap_f[0]).type.startswith("Writable"):
137+
f["basename"] = os.path.basename(path)
138+
f["location"] = revmap_f[1]
130139
return f
131140

132141
if "path" in f:
@@ -190,7 +199,7 @@ def __init__(self, toolpath_object, **kwargs):
190199
super(CommandLineTool, self).__init__(toolpath_object, **kwargs)
191200
self.find_default_container = kwargs.get("find_default_container", None)
192201

193-
def makeJobRunner(self, use_container=True): # type: (Optional[bool]) -> JobBase
202+
def makeJobRunner(self, use_container=True, **kwargs): # type: (Optional[bool], **Any) -> JobBase
194203
dockerReq, _ = self.get_requirement("DockerRequirement")
195204
if not dockerReq and use_container:
196205
if self.find_default_container:
@@ -216,7 +225,8 @@ def makeJobRunner(self, use_container=True): # type: (Optional[bool]) -> JobBas
216225

217226
def makePathMapper(self, reffiles, stagedir, **kwargs):
218227
# type: (List[Any], Text, **Any) -> PathMapper
219-
return PathMapper(reffiles, kwargs["basedir"], stagedir)
228+
return PathMapper(reffiles, kwargs["basedir"], stagedir,
229+
separateDirs=kwargs.get("separateDirs", True))
220230

221231
def updatePathmap(self, outdir, pathmap, fn):
222232
# type: (Text, PathMapper, Dict) -> None
@@ -325,9 +335,10 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
325335

326336
reffiles = copy.deepcopy(builder.files)
327337

328-
j = self.makeJobRunner(kwargs.get("use_container"))
338+
j = self.makeJobRunner(**kwargs)
329339
j.builder = builder
330340
j.joborder = builder.job
341+
j.make_pathmapper = self.makePathMapper
331342
j.stdin = None
332343
j.stderr = None
333344
j.stdout = None
@@ -350,6 +361,7 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
350361
if "stagedir" in make_path_mapper_kwargs:
351362
make_path_mapper_kwargs = make_path_mapper_kwargs.copy()
352363
del make_path_mapper_kwargs["stagedir"]
364+
353365
builder.pathmapper = self.makePathMapper(reffiles, builder.stagedir, **make_path_mapper_kwargs)
354366
builder.requirements = j.requirements
355367

@@ -566,7 +578,12 @@ def collect_output(self, schema, builder, outdir, fs_access, compute_checksum=Tr
566578
elif gb.startswith("/"):
567579
raise WorkflowException("glob patterns must not start with '/'")
568580
try:
581+
prefix = fs_access.glob(outdir)
569582
r.extend([{"location": g,
583+
"path": fs_access.join(builder.outdir, g[len(prefix[0])+1:]),
584+
"basename": os.path.basename(g),
585+
"nameroot": os.path.splitext(os.path.basename(g))[0],
586+
"nameext": os.path.splitext(os.path.basename(g))[1],
570587
"class": "File" if fs_access.isfile(g) else "Directory"}
571588
for g in fs_access.glob(fs_access.join(outdir, gb))])
572589
except (OSError, IOError) as e:
@@ -576,12 +593,14 @@ def collect_output(self, schema, builder, outdir, fs_access, compute_checksum=Tr
576593
raise
577594

578595
for files in r:
596+
rfile = files.copy()
597+
revmap(rfile)
579598
if files["class"] == "Directory":
580599
ll = builder.loadListing or (binding and binding.get("loadListing"))
581600
if ll and ll != "no_listing":
582601
get_listing(fs_access, files, (ll == "deep_listing"))
583602
else:
584-
with fs_access.open(files["location"], "rb") as f:
603+
with fs_access.open(rfile["location"], "rb") as f:
585604
contents = b""
586605
if binding.get("loadContents") or compute_checksum:
587606
contents = f.read(CONTENT_LIMIT)
@@ -625,28 +644,39 @@ def collect_output(self, schema, builder, outdir, fs_access, compute_checksum=Tr
625644
else:
626645
r = r[0]
627646

628-
# Ensure files point to local references outside of the run environment
629-
adjustFileObjs(r, cast( # known bug in mypy
630-
# https://github.com/python/mypy/issues/797
631-
Callable[[Any], Any], revmap))
632-
633647
if "secondaryFiles" in schema:
634648
with SourceLine(schema, "secondaryFiles", WorkflowException):
635649
for primary in aslist(r):
636650
if isinstance(primary, dict):
637-
primary["secondaryFiles"] = []
651+
primary.setdefault("secondaryFiles", [])
652+
pathprefix = primary["path"][0:primary["path"].rindex("/")+1]
638653
for sf in aslist(schema["secondaryFiles"]):
639654
if isinstance(sf, dict) or "$(" in sf or "${" in sf:
640655
sfpath = builder.do_eval(sf, context=primary)
641-
if isinstance(sfpath, string_types):
642-
sfpath = revmap({"location": sfpath, "class": "File"})
656+
subst = False
643657
else:
644-
sfpath = {"location": substitute(primary["location"], sf), "class": "File"}
645-
658+
sfpath = sf
659+
subst = True
646660
for sfitem in aslist(sfpath):
647-
if fs_access.exists(sfitem["location"]):
661+
if isinstance(sfitem, string_types):
662+
if subst:
663+
sfitem = {"path": substitute(primary["path"], sfitem)}
664+
else:
665+
sfitem = {"path": pathprefix+sfitem}
666+
if "path" in sfitem and "location" not in sfitem:
667+
revmap(sfitem)
668+
if fs_access.isfile(sfitem["location"]):
669+
sfitem["class"] = "File"
670+
primary["secondaryFiles"].append(sfitem)
671+
elif fs_access.isdir(sfitem["location"]):
672+
sfitem["class"] = "Directory"
648673
primary["secondaryFiles"].append(sfitem)
649674

675+
# Ensure files point to local references outside of the run environment
676+
adjustFileObjs(r, cast( # known bug in mypy
677+
# https://github.com/python/mypy/issues/797
678+
Callable[[Any], Any], revmap))
679+
650680
if not r and optional:
651681
r = None
652682

cwltool/job.py

+10-5
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ def __init__(self): # type: () -> None
132132
self.name = None # type: Text
133133
self.command_line = None # type: List[Text]
134134
self.pathmapper = None # type: PathMapper
135+
self.make_pathmapper = None # type: Callable[..., PathMapper]
135136
self.generatemapper = None # type: PathMapper
136137
self.collect_outputs = None # type: Union[Callable[[Any], Any], functools.partial[Any]]
137138
self.output_callback = None # type: Callable[[Any, Any], Any]
@@ -142,7 +143,7 @@ def __init__(self): # type: () -> None
142143
self.stagedir = None # type: Text
143144
self.inplace_update = None # type: bool
144145

145-
def _setup(self): # type: () -> None
146+
def _setup(self, kwargs): # type: (Dict) -> None
146147
if not os.path.exists(self.outdir):
147148
os.makedirs(self.outdir)
148149

@@ -154,8 +155,12 @@ def _setup(self): # type: () -> None
154155
"file." % (knownfile, self.pathmapper.mapper(knownfile)[0]))
155156

156157
if self.generatefiles["listing"]:
157-
self.generatemapper = PathMapper(cast(List[Any], self.generatefiles["listing"]),
158-
self.outdir, self.outdir, separateDirs=False)
158+
make_path_mapper_kwargs = kwargs
159+
if "basedir" in make_path_mapper_kwargs:
160+
make_path_mapper_kwargs = make_path_mapper_kwargs.copy()
161+
del make_path_mapper_kwargs["basedir"]
162+
self.generatemapper = self.make_pathmapper(cast(List[Any], self.generatefiles["listing"]),
163+
self.outdir, basedir=self.outdir, separateDirs=False, **make_path_mapper_kwargs)
159164
_logger.debug(u"[job %s] initial work dir %s", self.name,
160165
json.dumps({p: self.generatemapper.mapper(p) for p in self.generatemapper.files()}, indent=4))
161166

@@ -275,7 +280,7 @@ def run(self, pull_image=True, rm_container=True,
275280
rm_tmpdir=True, move_outputs="move", **kwargs):
276281
# type: (bool, bool, bool, Text, **Any) -> None
277282

278-
self._setup()
283+
self._setup(kwargs)
279284

280285
env = self.environment
281286
if not os.path.exists(self.tmpdir):
@@ -369,7 +374,7 @@ def run(self, pull_image=True, rm_container=True,
369374
"Docker is not available for this tool, try --no-container"
370375
" to disable Docker: %s" % e)
371376

372-
self._setup()
377+
self._setup(kwargs)
373378

374379
runtime = [u"docker", u"run", u"-i"]
375380

0 commit comments

Comments
 (0)