Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate input annotations to primary.cwlprov files #1678

Draft
wants to merge 19 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ include tests/loop/*
include tests/tmp1/tmp2/tmp3/.gitkeep
include tests/tmp4/alpha/*
include tests/wf/*
include tests/wf/adv_prov/*
include tests/wf/adv_prov/data/*
include tests/wf/adv_prov/tools/*
include tests/wf/operation/*
include tests/override/*
include tests/reloc/*.cwl
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ diff_pydocstyle_report: pydocstyle_report.txt

## codespell-check : check for common misspellings
codespell-check:
@codespell $(shell git ls-files | grep -v cwltool/schemas | grep -v cwltool/jshint/ | grep -v mypy-stubs) \
@codespell $(shell git ls-files | grep -v cwltool/schemas | grep -v cwltool/jshint/ | grep -v mypy-stubs | grep -v setup.cfg) \
|| (echo Probable typo foun. Run \"make codespell-fix\" to accept suggested fixes, or add the word to the ignore list in setup.cfg ; exit 1)

## codespell-fix : fix common misspellings
Expand Down
2 changes: 1 addition & 1 deletion build-cwltool-docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ ${engine} run -t -v /var/run/docker.sock:/var/run/docker.sock \
-v /tmp:/tmp \
-v "$PWD":/tmp/cwltool \
quay.io/commonwl/cwltool_module /bin/sh -c \
"apk add gcc bash git && pip install -r/tmp/cwltool/test-requirements.txt ; pytest -k 'not (test_bioconda or test_double_overwrite or test_env_filtering or test_biocontainers or test_disable_file_overwrite_without_ext or test_disable_file_creation_in_outdir_with_ext or test_write_write_conflict or test_directory_literal_with_real_inputs_inside or test_revsort_workflow or test_stdin_with_id_preset or test_no_compute_chcksum or test_packed_workflow_execution[tests/wf/count-lines1-wf.cwl-tests/wf/wc-job.json-False] or test_sequential_workflow or test_single_process_subwf_subwf_inline_step)' --ignore-glob '*test_udocker.py' -n 0 -v -rs --pyargs cwltool"
"apk add gcc bash git && pip install -r/tmp/cwltool/test-requirements.txt ; pytest -k 'not (test_bioconda or test_double_overwrite or test_env_filtering or test_biocontainers or test_disable_file_overwrite_without_ext or test_disable_file_creation_in_outdir_with_ext or test_write_write_conflict or test_directory_literal_with_real_inputs_inside or test_revsort_workflow or test_revsort_label_annotations or test_stdin_with_id_preset or test_no_compute_chcksum or test_packed_workflow_execution[tests/wf/count-lines1-wf.cwl-tests/wf/wc-job.json-False] or test_sequential_workflow or test_single_process_subwf_subwf_inline_step)' --ignore-glob '*test_udocker.py' -n 0 -v -rs --pyargs cwltool"
131 changes: 106 additions & 25 deletions cwltool/cwlprov/provenance_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,14 @@
)
from .writablebagfile import create_job, write_bag_file # change this later

# from schema_salad.utils import convert_to_dict


if TYPE_CHECKING:
from .ro import ResearchObject

ProvType = Dict[Union[str, Identifier], Any]


def copy_job_order(job: Union[Process, JobsType], job_order_object: CWLObjectType) -> CWLObjectType:
"""Create copy of job object for provenance."""
Expand Down Expand Up @@ -177,14 +182,14 @@ def host_provenance(document: ProvDocument) -> None:
# by a user account, as cwltool is a command line tool
account = self.document.agent(ACCOUNT_UUID)
if self.orcid or self.full_name:
person: Dict[Union[str, Identifier], Any] = {
person: ProvType = {
PROV_TYPE: PROV["Person"],
"prov:type": SCHEMA["Person"],
}
if self.full_name:
person["prov:label"] = self.full_name
person["foaf:name"] = self.full_name
person["schema:name"] = self.full_name
person[SCHEMA["name"]] = self.full_name
else:
# TODO: Look up name from ORCID API?
pass
Expand Down Expand Up @@ -235,15 +240,25 @@ def evaluate(
"""Evaluate the nature of job."""
if not hasattr(process, "steps"):
# record provenance of independent commandline tool executions
self.prospective_prov(job)
self.prospective_prov(job, process)
customised_job = copy_job_order(job, job_order_object)
self.used_artefacts(customised_job, self.workflow_run_uri)
create_job(research_obj, customised_job)
elif hasattr(job, "workflow"):
# record provenance of workflow executions
self.prospective_prov(job)
self.prospective_prov(job, process)
customised_job = copy_job_order(job, job_order_object)
self.used_artefacts(customised_job, self.workflow_run_uri)
# if CWLPROV['prov'].uri in job_order_object: # maybe move this to another place
# metadata = job_order_object[CWLPROV['prov'].uri] # change uri to CWLPROV['prov'].uri
# for item in metadata:
# # make a new entity with id
# # give it type additionalType value
# # add nested annotations
# # how much of this can we reuse from _add_nested_annotations?
# # how do we identify the correct file to write to? self.workflow_run_uri?
# #
# pass

def record_process_start(
self, process: Process, job: JobsType, process_run_id: Optional[str] = None
Expand Down Expand Up @@ -291,6 +306,30 @@ def record_process_end(
self.generate_output_prov(outputs, process_run_id, process_name)
self.document.wasEndedBy(process_run_id, None, self.workflow_run_uri, when)

def _add_nested_annotations(
self, annotation_key: str, annotation_value: Any, e: ProvEntity
) -> ProvEntity:
"""Propagate input data annotations to provenance."""
# Change https:// into http:// first
if (schema2_uri := "https://schema.org/") in annotation_key:
annotation_key = SCHEMA[annotation_key.replace(schema2_uri, "")].uri

if not isinstance(annotation_value, (MutableSequence, MutableMapping)):
e.add_attributes({annotation_key: str(annotation_value)})
elif isinstance(annotation_value, MutableSequence):
for item_value in annotation_value:
e = self._add_nested_annotations(annotation_key, item_value, e)
else:
nested_id = uuid.uuid4().urn
nested_entity = self.document.entity(nested_id)
e.add_attributes({annotation_key: nested_entity.identifier})
for nested_key in annotation_value:
nested_value = annotation_value[nested_key]
nested_entity = self._add_nested_annotations(
nested_key, nested_value, nested_entity
)
return e

def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, str]:
if value["class"] != "File":
raise ValueError("Must have class:File: %s" % value)
Expand Down Expand Up @@ -341,6 +380,29 @@ def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, st
file_entity.add_attributes({CWLPROV["nameext"]: cast(str, value["nameext"])})
self.document.specializationOf(file_entity, entity)

# Identify all schema annotations
schema_annotations = {
v: value[v] for v in value.keys() if v.startswith("https://schema.org")

Check failure

Code scanning / CodeQL

Incomplete URL substring sanitization

The string [https://schema.org](1) may be at an arbitrary position in the sanitized URL.
}

# Transfer SCHEMA annotations to provenance
for s in schema_annotations:
if "additionalType" in s:
atype = schema_annotations[s]
if isinstance(atype, str):
additional_type = atype.split(sep="/")[-1] # find better method?
file_entity.add_attributes({PROV_TYPE: SCHEMA[additional_type]})
else:
for a_entry in cast(List[str], atype):
additional_type = a_entry.split(sep="/")[-1] # find better method?
file_entity.add_attributes({PROV_TYPE: SCHEMA[additional_type]})
else:
file_entity = self._add_nested_annotations(s, schema_annotations[s], file_entity)

# Transfer format annotations to provenance:
if "format" in value:
file_entity.add_attributes({SCHEMA["encodingFormat"]: value["format"]})

# Check for secondaries
for sec in cast(MutableSequence[CWLObjectType], value.get("secondaryFiles", [])):
# TODO: Record these in a specializationOf entity with UUID?
Expand Down Expand Up @@ -450,6 +512,21 @@ def declare_directory(self, value: CWLObjectType) -> ProvEntity:
coll.add_attributes(coll_attribs)
coll_b.add_attributes(coll_b_attribs)

# Identify all schema annotations
schema_annotations = {
v: value[v] for v in value.keys() if v.startswith("https://schema.org")

Check failure

Code scanning / CodeQL

Incomplete URL substring sanitization

The string [https://schema.org](1) may be at an arbitrary position in the sanitized URL.
}

# Transfer SCHEMA annotations to provenance
for s in schema_annotations:
if "additionalType" in s:
additional_type = cast(str, schema_annotations[s]).split(sep="/")[
-1
] # find better method?
coll.add_attributes({PROV_TYPE: SCHEMA[additional_type]})
elif "hasPart" not in s:
coll = self._add_nested_annotations(s, schema_annotations[s], coll)

# Also Save ORE Folder as annotation metadata
ore_doc = ProvDocument()
ore_doc.add_namespace(ORE)
Expand Down Expand Up @@ -498,7 +575,7 @@ def declare_artefact(self, value: Any) -> ProvEntity:
self.research_object.add_uri(entity.identifier.uri)
return entity

if isinstance(value, (str, str)):
if isinstance(value, str):
(entity, _) = self.declare_string(value)
return entity

Expand Down Expand Up @@ -661,35 +738,39 @@ def generate_output_prov(
entity, process_run_id, timestamp, None, {"prov:role": role}
)

def prospective_prov(self, job: JobsType) -> None:
def prospective_prov(self, job: JobsType, process: Process) -> None:
"""Create prospective prov recording as wfdesc prov:Plan."""
prov_items: ProvType = {
PROV_TYPE: WFDESC["Workflow"] if isinstance(job, WorkflowJob) else WFDESC["Process"],
"prov:type": PROV["Plan"],
"prov:label": "Prospective provenance",
}
if "doc" in process.tool:
prov_items[SCHEMA["description"]] = process.tool["doc"]
if "label" in process.tool:
prov_items[SCHEMA["name"]] = process.tool["label"]
# # TypeError: unhashable type: 'list'
# if "intent" in process.tool:
# prov_items[SCHEMA["featureList"]] = convert_to_dict(process.tool["intent"])
self.document.entity("wf:main", prov_items)
if not isinstance(job, WorkflowJob):
# direct command line tool execution
self.document.entity(
"wf:main",
{
PROV_TYPE: WFDESC["Process"],
"prov:type": PROV["Plan"],
"prov:label": "Prospective provenance",
},
)
return

self.document.entity(
"wf:main",
{
PROV_TYPE: WFDESC["Workflow"],
"prov:type": PROV["Plan"],
"prov:label": "Prospective provenance",
},
)

for step in job.steps:
stepnametemp = "wf:main/" + str(step.name)[5:]
stepname = urllib.parse.quote(stepnametemp, safe=":/,#")
provstep_items: ProvType = {
PROV_TYPE: WFDESC["Process"],
"prov:type": PROV["Plan"],
}
# WorkflowStep level annotations
if "doc" in step.tool:
provstep_items[SCHEMA["description"]] = step.tool["doc"]
if "label" in step.tool:
provstep_items[SCHEMA["name"]] = step.tool["label"]
provstep = self.document.entity(
stepname,
{PROV_TYPE: WFDESC["Process"], "prov:type": PROV["Plan"]},
provstep_items,
)
self.document.entity(
"wf:main",
Expand Down
2 changes: 1 addition & 1 deletion cwltool/singularity.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ def add_writable_file_volume(
if self.inplace_update:
try:
os.link(os.path.realpath(volume.resolved), host_outdir_tgt)
except os.error:
except OSError:
shutil.copy(volume.resolved, host_outdir_tgt)
else:
shutil.copy(volume.resolved, host_outdir_tgt)
Expand Down
3 changes: 2 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ use_parentheses = True
line_length = 88

[codespell]
ignore-words-list=ORE,ore,RO,ro,recuse
builtin = clear
ignore-words-list = ORE,ore,RO,ro,recuse
47 changes: 46 additions & 1 deletion tests/test_provenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@


def cwltool(tmp_path: Path, *args: Any) -> Path:
out_folder = tmp_path / "out"
out_folder.mkdir()
prov_folder = tmp_path / "provenance"
prov_folder.mkdir()
new_args = ["--provenance", str(prov_folder)]
new_args = ["--provenance", str(prov_folder), "--outdir", str(out_folder)]
new_args.extend(args)
# Run within a temporary directory to not pollute git checkout
tmp_dir = tmp_path / "cwltool-run"
Expand Down Expand Up @@ -83,6 +85,49 @@ def test_revsort_workflow(tmp_path: Path) -> None:
check_provenance(folder)


@needs_docker
def test_revsort_label_annotations(tmp_path: Path) -> None:
"""Affirm that EDAM file formats in the input object make it into CWLProv."""
base_path = cwltool(
tmp_path,
get_data("tests/wf/revsort.cwl"),
get_data("tests/wf/revsort-job.json"),
)
prov_file = base_path / "metadata" / "provenance" / "primary.cwlprov.nt"
arcp_root = find_arcp(base_path)
g = Graph()
with open(prov_file, "rb") as f:
g.parse(file=f, format="nt", publicID=arcp_root)
mime_having_objects = list(g.subjects(SCHEMA.encodingFormat))
assert len(mime_having_objects) == 2
for obj in mime_having_objects:
assert (
cast(Literal, list(g.objects(obj, SCHEMA.encodingFormat))[0]).value
== "https://www.iana.org/assignments/media-types/text/plain"
)


def test_advanced_prov_annotations(tmp_path: Path) -> None:
"""Pass through of advanced input annotations."""
base_path = cwltool(
tmp_path,
get_data("tests/wf/adv_prov/niaa_wf.cwl"),
get_data("tests/wf/adv_prov/niaa_wf_job.yml"),
)
prov_file = base_path / "metadata" / "provenance" / "primary.cwlprov.nt"
arcp_root = find_arcp(base_path)
g = Graph()
with open(prov_file, "rb") as f:
g.parse(file=f, format="nt", publicID=arcp_root)
mime_having_objects = list(g.subjects(SCHEMA.encodingFormat))
assert len(mime_having_objects) == 8
# for obj in mime_having_objects:
# assert (
# cast(Literal, list(g.objects(obj, SCHEMA.encodingFormat))[0]).value
# == "https://www.iana.org/assignments/media-types/text/plain"
# )


@needs_docker
def test_revsort_workflow_shortcut(tmp_path: Path) -> None:
"""Confirm that using 'cwl:tool' shortcut still snapshots the CWL files."""
Expand Down
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Loading