|
51 | 51 | )
|
52 | 52 | from .writablebagfile import create_job, write_bag_file # change this later
|
53 | 53 |
|
| 54 | +# from schema_salad.utils import convert_to_dict |
| 55 | + |
| 56 | + |
54 | 57 | if TYPE_CHECKING:
|
55 | 58 | from .ro import ResearchObject
|
56 | 59 |
|
| 60 | +_attributes_type = Dict[Union[str, Identifier], Any] |
| 61 | + |
57 | 62 |
|
58 | 63 | def copy_job_order(job: Union[Process, JobsType], job_order_object: CWLObjectType) -> CWLObjectType:
|
59 | 64 | """Create copy of job object for provenance."""
|
@@ -235,13 +240,13 @@ def evaluate(
|
235 | 240 | """Evaluate the nature of job."""
|
236 | 241 | if not hasattr(process, "steps"):
|
237 | 242 | # record provenance of independent commandline tool executions
|
238 |
| - self.prospective_prov(job) |
| 243 | + self.prospective_prov(job, process) |
239 | 244 | customised_job = copy_job_order(job, job_order_object)
|
240 | 245 | self.used_artefacts(customised_job, self.workflow_run_uri)
|
241 | 246 | create_job(research_obj, customised_job)
|
242 | 247 | elif hasattr(job, "workflow"):
|
243 | 248 | # record provenance of workflow executions
|
244 |
| - self.prospective_prov(job) |
| 249 | + self.prospective_prov(job, process) |
245 | 250 | customised_job = copy_job_order(job, job_order_object)
|
246 | 251 | self.used_artefacts(customised_job, self.workflow_run_uri)
|
247 | 252 | # if CWLPROV['prov'].uri in job_order_object: # maybe move this to another place
|
@@ -306,8 +311,7 @@ def _add_nested_annotations(
|
306 | 311 | ) -> ProvEntity:
|
307 | 312 | """Propagate input data annotations to provenance."""
|
308 | 313 | # Change https:// into http:// first
|
309 |
| - schema2_uri = "https://schema.org/" |
310 |
| - if schema2_uri in annotation_key: |
| 314 | + if (schema2_uri := "https://schema.org/") in annotation_key: |
311 | 315 | annotation_key = SCHEMA[annotation_key.replace(schema2_uri, "")].uri
|
312 | 316 |
|
313 | 317 | if not isinstance(annotation_value, (MutableSequence, MutableMapping)):
|
@@ -377,9 +381,9 @@ def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, st
|
377 | 381 | self.document.specializationOf(file_entity, entity)
|
378 | 382 |
|
379 | 383 | # Identify all schema annotations
|
380 |
| - schema_annotations = dict( |
381 |
| - [(v, value[v]) for v in value.keys() if v.startswith("https://schema.org")] |
382 |
| - ) |
| 384 | + schema_annotations = { |
| 385 | + v: value[v] for v in value.keys() if v.startswith("https://schema.org") |
| 386 | + } |
383 | 387 |
|
384 | 388 | # Transfer SCHEMA annotations to provenance
|
385 | 389 | for s in schema_annotations:
|
@@ -509,9 +513,9 @@ def declare_directory(self, value: CWLObjectType) -> ProvEntity:
|
509 | 513 | coll_b.add_attributes(coll_b_attribs)
|
510 | 514 |
|
511 | 515 | # Identify all schema annotations
|
512 |
| - schema_annotations = dict( |
513 |
| - [(v, value[v]) for v in value.keys() if v.startswith("https://schema.org")] |
514 |
| - ) |
| 516 | + schema_annotations = { |
| 517 | + v: value[v] for v in value.keys() if v.startswith("https://schema.org") |
| 518 | + } |
515 | 519 |
|
516 | 520 | # Transfer SCHEMA annotations to provenance
|
517 | 521 | for s in schema_annotations:
|
@@ -571,7 +575,7 @@ def declare_artefact(self, value: Any) -> ProvEntity:
|
571 | 575 | self.research_object.add_uri(entity.identifier.uri)
|
572 | 576 | return entity
|
573 | 577 |
|
574 |
| - if isinstance(value, (str, str)): |
| 578 | + if isinstance(value, str): |
575 | 579 | (entity, _) = self.declare_string(value)
|
576 | 580 | return entity
|
577 | 581 |
|
@@ -734,35 +738,38 @@ def generate_output_prov(
|
734 | 738 | entity, process_run_id, timestamp, None, {"prov:role": role}
|
735 | 739 | )
|
736 | 740 |
|
737 |
| - def prospective_prov(self, job: JobsType) -> None: |
| 741 | + def prospective_prov(self, job: JobsType, process: Process) -> None: |
738 | 742 | """Create prospective prov recording as wfdesc prov:Plan."""
|
| 743 | + prov_items: _attributes_type = { |
| 744 | + PROV_TYPE: WFDESC["Workflow"] if isinstance(job, WorkflowJob) else WFDESC["Process"], |
| 745 | + "prov:type": PROV["Plan"], |
| 746 | + "prov:label": "Prospective provenance", |
| 747 | + } |
| 748 | + if "doc" in process.tool: |
| 749 | + prov_items["schema:description"] = process.tool["doc"] |
| 750 | + if "label" in process.tool: |
| 751 | + prov_items["schema:name"] = process.tool["label"] |
| 752 | + # # TypeError: unhashable type: 'list' |
| 753 | + # if "intent" in process.tool: |
| 754 | + # prov_items["schema:featureList"] = convert_to_dict(process.tool["intent"]) |
| 755 | + self.document.entity("wf:main", prov_items) |
739 | 756 | if not isinstance(job, WorkflowJob):
|
740 |
| - # direct command line tool execution |
741 |
| - self.document.entity( |
742 |
| - "wf:main", |
743 |
| - { |
744 |
| - PROV_TYPE: WFDESC["Process"], |
745 |
| - "prov:type": PROV["Plan"], |
746 |
| - "prov:label": "Prospective provenance", |
747 |
| - }, |
748 |
| - ) |
749 | 757 | return
|
750 | 758 |
|
751 |
| - self.document.entity( |
752 |
| - "wf:main", |
753 |
| - { |
754 |
| - PROV_TYPE: WFDESC["Workflow"], |
755 |
| - "prov:type": PROV["Plan"], |
756 |
| - "prov:label": "Prospective provenance", |
757 |
| - }, |
758 |
| - ) |
759 |
| - |
760 | 759 | for step in job.steps:
|
761 | 760 | stepnametemp = "wf:main/" + str(step.name)[5:]
|
762 | 761 | stepname = urllib.parse.quote(stepnametemp, safe=":/,#")
|
| 762 | + provstep_items: _attributes_type = { |
| 763 | + PROV_TYPE: WFDESC["Process"], |
| 764 | + "prov:type": PROV["Plan"], |
| 765 | + } |
| 766 | + if "doc" in step.tool: |
| 767 | + provstep_items["schema:description"] = step.tool["doc"] |
| 768 | + if "label" in step.tool: |
| 769 | + provstep_items["schema:name"] = step.tool["label"] |
763 | 770 | provstep = self.document.entity(
|
764 | 771 | stepname,
|
765 |
| - {PROV_TYPE: WFDESC["Process"], "prov:type": PROV["Plan"]}, |
| 772 | + provstep_items, |
766 | 773 | )
|
767 | 774 | self.document.entity(
|
768 | 775 | "wf:main",
|
|
0 commit comments