Skip to content

Commit 76f3ac1

Browse files
authored
Propagate custom fetcher_constructor to workflow steps (#243)
* Propagate custom fetcher_constructor to workflow steps, and custom urljoin in scandeps.
1 parent d2b57a8 commit 76f3ac1

File tree

5 files changed: 30 additions (+30) and 26 deletions (-26).

5 files changed: 30 additions (+30) and 26 deletions (-26).

cwltool/load_tool.py

+12-9
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import re
88
import urlparse
99

10-
from schema_salad.ref_resolver import Loader, Fetcher, DefaultFetcher
10+
from schema_salad.ref_resolver import Loader, Fetcher
1111
import schema_salad.validate as validate
1212
from schema_salad.validate import ValidationException
1313
import schema_salad.schema as schema
@@ -26,7 +26,7 @@
2626

2727
def fetch_document(argsworkflow, # type: Union[Text, dict[Text, Any]]
2828
resolver=None, # type: Callable[[Loader, Union[Text, dict[Text, Any]]], Text]
29-
fetcher_constructor=DefaultFetcher # type: Callable[[Dict[unicode, unicode], requests.sessions.Session], Fetcher]
29+
fetcher_constructor=None # type: Callable[[Dict[unicode, unicode], requests.sessions.Session], Fetcher]
3030
):
3131
# type: (...) -> Tuple[Loader, Dict[Text, Any], Text]
3232
"""Retrieve a CWL document."""
@@ -111,23 +111,26 @@ def validate_document(document_loader, # type: Loader
111111
enable_dev=False, # type: bool
112112
strict=True, # type: bool
113113
preprocess_only=False, # type: bool
114-
fetcher_constructor=DefaultFetcher # type: Callable[[Dict[unicode, unicode], requests.sessions.Session], Fetcher]
114+
fetcher_constructor=None # type: Callable[[Dict[unicode, unicode], requests.sessions.Session], Fetcher]
115115
):
116116
# type: (...) -> Tuple[Loader, Names, Union[Dict[Text, Any], List[Dict[Text, Any]]], Dict[Text, Any], Text]
117117
"""Validate a CWL document."""
118118

119+
if isinstance(workflowobj, list):
120+
workflowobj = {
121+
"$graph": workflowobj
122+
}
123+
124+
if not isinstance(workflowobj, dict):
125+
raise ValueError("workflowjobj must be a dict")
126+
119127
jobobj = None
120128
if "cwl:tool" in workflowobj:
121129
jobobj, _ = document_loader.resolve_all(workflowobj, uri)
122130
uri = urlparse.urljoin(uri, workflowobj["https://w3id.org/cwl/cwl#tool"])
123131
del cast(dict, jobobj)["https://w3id.org/cwl/cwl#tool"]
124132
workflowobj = fetch_document(uri, fetcher_constructor=fetcher_constructor)[1]
125133

126-
if isinstance(workflowobj, list):
127-
workflowobj = {
128-
"$graph": workflowobj
129-
}
130-
131134
fileuri = urlparse.urldefrag(uri)[0]
132135

133136
if "cwlVersion" in workflowobj:
@@ -235,7 +238,7 @@ def load_tool(argsworkflow, # type: Union[Text, Dict[Text, Any]]
235238
enable_dev=False, # type: bool
236239
strict=True, # type: bool
237240
resolver=None, # type: Callable[[Loader, Union[Text, dict[Text, Any]]], Text]
238-
fetcher_constructor=DefaultFetcher # type: Callable[[Dict[unicode, unicode], requests.sessions.Session], Fetcher]
241+
fetcher_constructor=None # type: Callable[[Dict[unicode, unicode], requests.sessions.Session], Fetcher]
239242
):
240243
# type: (...) -> Process
241244

cwltool/main.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from typing import (Union, Any, AnyStr, cast, Callable, Dict, Sequence, Text,
1919
Tuple, Type, IO)
2020

21-
from schema_salad.ref_resolver import Loader, Fetcher, DefaultFetcher
21+
from schema_salad.ref_resolver import Loader, Fetcher
2222
import schema_salad.validate as validate
2323
import schema_salad.jsonld_context
2424
import schema_salad.makedoc
@@ -516,7 +516,7 @@ def printdeps(obj, document_loader, stdout, relative_deps, uri, basedir=None):
516516
"location": uri} # type: Dict[Text, Any]
517517

518518
def loadref(b, u):
519-
return document_loader.fetch(urlparse.urljoin(b, u))
519+
return document_loader.fetch(document_loader.fetcher.urljoin(b, u))
520520

521521
sf = scandeps(
522522
basedir if basedir else uri, obj, set(("$import", "run")),
@@ -565,7 +565,7 @@ def main(argsl=None, # type: List[str]
565565
versionfunc=versionstring, # type: Callable[[], Text]
566566
job_order_object=None, # type: Union[Tuple[Dict[Text, Any], Text], int]
567567
make_fs_access=StdFsAccess, # type: Callable[[Text], StdFsAccess]
568-
fetcher_constructor=DefaultFetcher, # type: Callable[[Dict[unicode, unicode], requests.sessions.Session], Fetcher]
568+
fetcher_constructor=None, # type: Callable[[Dict[unicode, unicode], requests.sessions.Session], Fetcher]
569569
resolver=tool_resolver
570570
):
571571
# type: (...) -> int

cwltool/process.py

+12-12
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import tempfile
88
import glob
99
import urlparse
10-
import pprint
1110
from collections import Iterable
1211
import errno
1312
import shutil
@@ -640,8 +639,8 @@ def mergedirs(listing):
640639
r.extend(ents.itervalues())
641640
return r
642641

643-
def scandeps(base, doc, reffields, urlfields, loadref):
644-
# type: (Text, Any, Set[Text], Set[Text], Callable[[Text, Text], Any]) -> List[Dict[Text, Text]]
642+
def scandeps(base, doc, reffields, urlfields, loadref, urljoin=urlparse.urljoin):
643+
# type: (Text, Any, Set[Text], Set[Text], Callable[[Text, Text], Any], Callable[[Text, Text], Text]) -> List[Dict[Text, Text]]
645644
r = [] # type: List[Dict[Text, Text]]
646645
deps = None # type: Dict[Text, Any]
647646
if isinstance(doc, dict):
@@ -660,7 +659,7 @@ def scandeps(base, doc, reffields, urlfields, loadref):
660659
if u and not u.startswith("_:"):
661660
deps = {
662661
"class": doc["class"],
663-
"location": urlparse.urljoin(base, u)
662+
"location": urljoin(base, u)
664663
}
665664
if doc["class"] == "Directory" and "listing" in doc:
666665
deps["listing"] = doc["listing"]
@@ -670,23 +669,23 @@ def scandeps(base, doc, reffields, urlfields, loadref):
670669
r.append(deps)
671670
else:
672671
if doc["class"] == "Directory" and "listing" in doc:
673-
r.extend(scandeps(base, doc["listing"], reffields, urlfields, loadref))
672+
r.extend(scandeps(base, doc["listing"], reffields, urlfields, loadref, urljoin=urljoin))
674673
elif doc["class"] == "File" and "secondaryFiles" in doc:
675-
r.extend(scandeps(base, doc["secondaryFiles"], reffields, urlfields, loadref))
674+
r.extend(scandeps(base, doc["secondaryFiles"], reffields, urlfields, loadref, urljoin=urljoin))
676675

677676
for k, v in doc.iteritems():
678677
if k in reffields:
679678
for u in aslist(v):
680679
if isinstance(u, dict):
681-
r.extend(scandeps(base, u, reffields, urlfields, loadref))
680+
r.extend(scandeps(base, u, reffields, urlfields, loadref, urljoin=urljoin))
682681
else:
683682
sub = loadref(base, u)
684-
subid = urlparse.urljoin(base, u)
683+
subid = urljoin(base, u)
685684
deps = {
686685
"class": "File",
687686
"location": subid
688687
}
689-
sf = scandeps(subid, sub, reffields, urlfields, loadref)
688+
sf = scandeps(subid, sub, reffields, urlfields, loadref, urljoin=urljoin)
690689
if sf:
691690
deps["secondaryFiles"] = sf
692691
deps = nestdir(base, deps)
@@ -695,19 +694,20 @@ def scandeps(base, doc, reffields, urlfields, loadref):
695694
for u in aslist(v):
696695
deps = {
697696
"class": "File",
698-
"location": urlparse.urljoin(base, u)
697+
"location": urljoin(base, u)
699698
}
700699
deps = nestdir(base, deps)
701700
r.append(deps)
702701
elif k not in ("listing", "secondaryFiles"):
703-
r.extend(scandeps(base, v, reffields, urlfields, loadref))
702+
r.extend(scandeps(base, v, reffields, urlfields, loadref, urljoin=urljoin))
704703
elif isinstance(doc, list):
705704
for d in doc:
706-
r.extend(scandeps(base, d, reffields, urlfields, loadref))
705+
r.extend(scandeps(base, d, reffields, urlfields, loadref, urljoin=urljoin))
707706

708707
if r:
709708
normalizeFilesDirs(r)
710709
r = mergedirs(r)
710+
711711
return r
712712

713713
def compute_checksums(fs_access, fileobj):

cwltool/workflow.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,8 @@ def __init__(self, toolpath_object, pos, **kwargs):
445445
self.embedded_tool = load_tool(
446446
toolpath_object["run"], kwargs.get("makeTool"), kwargs,
447447
enable_dev=kwargs.get("enable_dev"),
448-
strict=kwargs.get("strict"))
448+
strict=kwargs.get("strict"),
449+
fetcher_constructor=kwargs.get("fetcher_constructor"))
449450
except validate.ValidationException as v:
450451
raise WorkflowException(
451452
u"Tool definition %s failed validation:\n%s" %

setup.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
'rdflib-jsonld == 0.3.0',
5050
'html5lib >=0.90, <= 0.9999999',
5151
'shellescape',
52-
'schema-salad >= 1.20.20161122192122, < 2',
52+
'schema-salad >= 1.21.20161206181442, < 2',
5353
'typing >= 3.5.2',
5454
'cwltest >= 1.0.20160907111242'],
5555
test_suite='tests',

0 commit comments

Comments
 (0)