4
4
import copy
5
5
import datetime
6
6
import hashlib
7
- import io
8
7
import logging
9
8
import os
10
9
import os .path
15
14
import uuid
16
15
from collections import OrderedDict
17
16
from getpass import getuser
18
- from io import open
17
+ from io import BytesIO , FileIO , TextIOWrapper , open
19
18
from socket import getfqdn
20
- from typing import (IO , Any , Callable , Dict , List , MutableMapping , Optional ,
21
- Set , Tuple , Union , cast )
19
+ from typing import (IO , Any , Callable , Dict , List , Generator , MutableMapping ,
20
+ Optional , Set , Tuple , Union , cast )
22
21
23
22
import prov .model as provM
24
23
import six
@@ -73,7 +72,7 @@ class PermissionError(OSError): # pylint: disable=redefined-builtin
73
72
# 2. Bump minor number if adding resources or PROV statements
74
73
# 3. Bump patch number for non-breaking non-adding changes,
75
74
# e.g. fixing broken relative paths
76
- CWLPROV_VERSION = "https://w3id.org/cwl/prov/0.4 .0"
75
+ CWLPROV_VERSION = "https://w3id.org/cwl/prov/0.5 .0"
77
76
78
77
# Research Object folders
79
78
METADATA = "metadata"
@@ -149,7 +148,7 @@ def _whoami():
149
148
return (username , fullname )
150
149
151
150
152
- class WritableBagFile (io . FileIO ):
151
+ class WritableBagFile (FileIO ):
153
152
"""Writes files in research object."""
154
153
155
154
def __init__ (self , research_object , rel_path ):
@@ -209,7 +208,7 @@ def readable(self):
209
208
210
209
def truncate (self , size = None ):
211
210
# type: (Optional[int]) -> int
212
- # FIXME: This breaks contract io. IOBase,
211
+ # FIXME: This breaks contract IOBase,
213
212
# as it means we would have to recalculate the hash
214
213
if size is not None :
215
214
raise IOError ("WritableBagFile can't truncate" )
@@ -458,7 +457,7 @@ def evaluate(self,
458
457
self .prospective_prov (job )
459
458
customised_job = copy_job_order (job , job_order_object )
460
459
self .used_artefacts (customised_job , self .workflow_run_uri )
461
- research_obj .create_job (job , customised_job )
460
+ research_obj .create_job (customised_job , job )
462
461
# self.used_artefacts(inputs, self.workflow_run_uri)
463
462
name = ""
464
463
if hasattr (job , "name" ):
@@ -680,7 +679,7 @@ def declare_directory(self, value): # type: (MutableMapping) -> ProvEntity
680
679
def declare_string (self , value ):
681
680
# type: (Union[Text, str]) -> Tuple[ProvEntity,Text]
682
681
"""Save as string in UTF-8."""
683
- byte_s = io . BytesIO (str (value ).encode (ENCODING ))
682
+ byte_s = BytesIO (str (value ).encode (ENCODING ))
684
683
data_file = self .research_object .add_data_file (byte_s , content_type = TEXT_PLAIN )
685
684
checksum = posixpath .basename (data_file )
686
685
# FIXME: Don't naively assume add_data_file uses hash in filename!
@@ -716,7 +715,7 @@ def declare_artefact(self, value):
716
715
717
716
if isinstance (value , bytes ):
718
717
# If we got here then we must be in Python 3
719
- byte_s = io . BytesIO (value )
718
+ byte_s = BytesIO (value )
720
719
data_file = self .research_object .add_data_file (byte_s )
721
720
# FIXME: Don't naively assume add_data_file uses hash in filename!
722
721
data_id = "data:%s" % posixpath .split (data_file )[1 ]
@@ -1051,13 +1050,13 @@ def write_bag_file(self, path, encoding=ENCODING):
1051
1050
# type: (Text, Optional[str]) -> IO
1052
1051
"""Write the bag file into our research object."""
1053
1052
# For some reason below throws BlockingIOError
1054
- #fp = io. BufferedWriter(WritableBagFile(self, path))
1053
+ #fp = BufferedWriter(WritableBagFile(self, path))
1055
1054
bag_file = cast (IO , WritableBagFile (self , path ))
1056
1055
if encoding :
1057
1056
# encoding: match Tag-File-Character-Encoding: UTF-8
1058
1057
# newline: ensure LF also on Windows
1059
1058
return cast (IO ,
1060
- io . TextIOWrapper (bag_file , encoding = encoding , newline = "\n " ))
1059
+ TextIOWrapper (bag_file , encoding = encoding , newline = "\n " ))
1061
1060
return bag_file
1062
1061
1063
1062
def add_tagfile (self , path , when = None ):
@@ -1490,16 +1489,22 @@ def _add_to_bagit(self, rel_path, **checksums):
1490
1489
self .add_to_manifest (rel_path , checksums )
1491
1490
1492
1491
def create_job (self ,
1493
- wf_job ,
1494
- builder_job # type: Dict
1492
+ builder_job , # type: Dict[Text, Any]
1493
+ wf_job = None , # type: Callable[[Dict[Text, Text], Callable[[Any, Any], Any], RuntimeContext], Generator[Any, None, None]]
1494
+ is_output = False
1495
1495
): # type: (...) -> Dict
1496
1496
#TODO customise the file
1497
1497
"""Generate the new job object with RO specific relative paths."""
1498
1498
copied = copy .deepcopy (builder_job )
1499
- relativised_input_objecttemp = {} # type: Dict[Any , Any]
1499
+ relativised_input_objecttemp = {} # type: Dict[Text , Any]
1500
1500
self ._relativise_files (copied )
1501
- rel_path = posixpath .join (_posix_path (WORKFLOW ), "primary-job.json" )
1502
- j = json_dumps (copied , indent = 4 , ensure_ascii = False )
1501
+ def jdefault (o ):
1502
+ return dict (o )
1503
+ if is_output :
1504
+ rel_path = posixpath .join (_posix_path (WORKFLOW ), "primary-output.json" )
1505
+ else :
1506
+ rel_path = posixpath .join (_posix_path (WORKFLOW ), "primary-job.json" )
1507
+ j = json_dumps (copied , indent = 4 , ensure_ascii = False , default = jdefault )
1503
1508
with self .write_bag_file (rel_path ) as file_path :
1504
1509
file_path .write (j + u"\n " )
1505
1510
_logger .debug (u"[provenance] Generated customised job file: %s" ,
@@ -1520,7 +1525,7 @@ def create_job(self,
1520
1525
return self .relativised_input_object
1521
1526
1522
1527
def _relativise_files (self , structure ):
1523
- # type: (Any, Dict) -> None
1528
+ # type: (Any, Dict[Any, Any] ) -> None
1524
1529
"""Save any file objects into the RO and update the local paths."""
1525
1530
# Base case - we found a File we need to update
1526
1531
_logger .debug (u"[provenance] Relativising: %s" , structure )
0 commit comments