13
13
import urllib
14
14
from functools import cmp_to_key , partial
15
15
from typing import (
16
- IO ,
17
16
Any ,
18
17
Callable ,
19
18
Dict ,
24
23
MutableSequence ,
25
24
Optional ,
26
25
Set ,
26
+ TextIO ,
27
27
Union ,
28
28
cast ,
29
29
)
60
60
from .utils import (
61
61
CWLObjectType ,
62
62
CWLOutputType ,
63
+ DirectoryType ,
63
64
JobsGeneratorType ,
64
65
OutputCallbackType ,
65
66
adjustDirObjs ,
@@ -130,9 +131,20 @@ def run(
130
131
try :
131
132
normalizeFilesDirs (self .builder .job )
132
133
ev = self .builder .do_eval (self .script )
133
- normalizeFilesDirs (ev )
134
+ normalizeFilesDirs (
135
+ cast (
136
+ Optional [
137
+ Union [
138
+ MutableSequence [MutableMapping [str , Any ]],
139
+ MutableMapping [str , Any ],
140
+ DirectoryType ,
141
+ ]
142
+ ],
143
+ ev ,
144
+ )
145
+ )
134
146
if self .output_callback :
135
- self .output_callback (ev , "success" )
147
+ self .output_callback (cast ( Optional [ CWLObjectType ], ev ) , "success" )
136
148
except WorkflowException as err :
137
149
_logger .warning (
138
150
"Failed to evaluate expression:\n %s" ,
@@ -385,7 +397,7 @@ def make_job_runner(self, runtimeContext: RuntimeContext) -> Type[JobBase]:
385
397
386
398
def make_path_mapper (
387
399
self ,
388
- reffiles : List [Any ],
400
+ reffiles : List [CWLObjectType ],
389
401
stagedir : str ,
390
402
runtimeContext : RuntimeContext ,
391
403
separateDirs : bool ,
@@ -561,7 +573,7 @@ def calc_checksum(location: str) -> Optional[str]:
561
573
562
574
def update_status_output_callback (
563
575
output_callbacks : OutputCallbackType ,
564
- jobcachelock : IO [ Any ] ,
576
+ jobcachelock : TextIO ,
565
577
outputs : Optional [CWLObjectType ],
566
578
processStatus : str ,
567
579
) -> None :
@@ -620,9 +632,11 @@ def update_status_output_callback(
620
632
621
633
initialWorkdir , _ = self .get_requirement ("InitialWorkDirRequirement" )
622
634
if initialWorkdir is not None :
623
- ls = [] # type: List[Dict[str, Any] ]
635
+ ls = [] # type: List[CWLObjectType ]
624
636
if isinstance (initialWorkdir ["listing" ], str ):
625
- ls = builder .do_eval (initialWorkdir ["listing" ])
637
+ ls = cast (
638
+ List [CWLObjectType ], builder .do_eval (initialWorkdir ["listing" ])
639
+ )
626
640
else :
627
641
for t in cast (
628
642
MutableSequence [Union [str , CWLObjectType ]],
@@ -644,13 +658,13 @@ def update_status_output_callback(
644
658
if et ["entry" ] is not None :
645
659
ls .append (et )
646
660
else :
647
- initwd_item = builder .do_eval (cast ( str , t ) )
661
+ initwd_item = builder .do_eval (t )
648
662
if not initwd_item :
649
663
continue
650
664
if isinstance (initwd_item , MutableSequence ):
651
- ls .extend (initwd_item )
665
+ ls .extend (cast ( List [ CWLObjectType ], initwd_item ) )
652
666
else :
653
- ls .append (initwd_item )
667
+ ls .append (cast ( CWLObjectType , initwd_item ) )
654
668
for i , t2 in enumerate (ls ):
655
669
if "entry" in t2 :
656
670
if isinstance (t2 ["entry" ], str ):
@@ -663,10 +677,11 @@ def update_status_output_callback(
663
677
else :
664
678
if t2 .get ("entryname" ) or t2 .get ("writable" ):
665
679
t2 = copy .deepcopy (t2 )
680
+ t2entry = cast (CWLObjectType , t2 ["entry" ])
666
681
if t2 .get ("entryname" ):
667
- t2 [ "entry" ] ["basename" ] = t2 ["entryname" ]
668
- t2 [ "entry" ] ["writable" ] = t2 .get ("writable" )
669
- ls [i ] = t2 ["entry" ]
682
+ t2entry ["basename" ] = t2 ["entryname" ]
683
+ t2entry ["writable" ] = t2 .get ("writable" )
684
+ ls [i ] = cast ( CWLObjectType , t2 ["entry" ])
670
685
j .generatefiles ["listing" ] = ls
671
686
for entry in ls :
672
687
self .updatePathmap (builder .outdir , builder .pathmapper , entry )
@@ -689,13 +704,13 @@ def update_status_output_callback(
689
704
690
705
if self .tool .get ("stdin" ):
691
706
with SourceLine (self .tool , "stdin" , ValidationException , debug ):
692
- j .stdin = builder .do_eval (self .tool ["stdin" ])
707
+ j .stdin = cast ( str , builder .do_eval (self .tool ["stdin" ]) )
693
708
if j .stdin :
694
709
reffiles .append ({"class" : "File" , "path" : j .stdin })
695
710
696
711
if self .tool .get ("stderr" ):
697
712
with SourceLine (self .tool , "stderr" , ValidationException , debug ):
698
- j .stderr = builder .do_eval (self .tool ["stderr" ])
713
+ j .stderr = cast ( str , builder .do_eval (self .tool ["stderr" ]) )
699
714
if j .stderr :
700
715
if os .path .isabs (j .stderr ) or ".." in j .stderr :
701
716
raise ValidationException (
@@ -704,7 +719,7 @@ def update_status_output_callback(
704
719
705
720
if self .tool .get ("stdout" ):
706
721
with SourceLine (self .tool , "stdout" , ValidationException , debug ):
707
- j .stdout = builder .do_eval (self .tool ["stdout" ])
722
+ j .stdout = cast ( str , builder .do_eval (self .tool ["stdout" ]) )
708
723
if j .stdout :
709
724
if os .path .isabs (j .stdout ) or ".." in j .stdout or not j .stdout :
710
725
raise ValidationException (
@@ -738,21 +753,21 @@ def update_status_output_callback(
738
753
j .inplace_update = cast (bool , inplaceUpdateReq ["inplaceUpdate" ])
739
754
normalizeFilesDirs (j .generatefiles )
740
755
741
- readers = {} # type: Dict[str, Any ]
756
+ readers = {} # type: Dict[str, CWLObjectType ]
742
757
muts = set () # type: Set[str]
743
758
744
759
if builder .mutation_manager is not None :
745
760
746
- def register_mut (f ): # type: (Dict[str, Any] ) -> None
761
+ def register_mut (f : CWLObjectType ) -> None :
747
762
mm = cast (MutationManager , builder .mutation_manager )
748
- muts .add (f ["location" ])
763
+ muts .add (cast ( str , f ["location" ]) )
749
764
mm .register_mutation (j .name , f )
750
765
751
- def register_reader (f ): # type: (Dict[str, Any] ) -> None
766
+ def register_reader (f : CWLObjectType ) -> None :
752
767
mm = cast (MutationManager , builder .mutation_manager )
753
- if f ["location" ] not in muts :
768
+ if cast ( str , f ["location" ]) not in muts :
754
769
mm .register_reader (j .name , f )
755
- readers [f ["location" ]] = copy .deepcopy (f )
770
+ readers [cast ( str , f ["location" ]) ] = copy .deepcopy (f )
756
771
757
772
for li in j .generatefiles ["listing" ]:
758
773
if li .get ("writable" ) and j .inplace_update :
@@ -770,8 +785,9 @@ def register_reader(f): # type: (Dict[str, Any]) -> None
770
785
timelimit , _ = self .get_requirement ("ToolTimeLimit" )
771
786
if timelimit is not None :
772
787
with SourceLine (timelimit , "timelimit" , ValidationException , debug ):
773
- j .timelimit = builder .do_eval (
774
- cast (Union [int , str ], timelimit ["timelimit" ])
788
+ j .timelimit = cast (
789
+ Optional [int ],
790
+ builder .do_eval (cast (Union [int , str ], timelimit ["timelimit" ])),
775
791
)
776
792
if not isinstance (j .timelimit , int ) or j .timelimit < 0 :
777
793
raise WorkflowException (
@@ -781,8 +797,11 @@ def register_reader(f): # type: (Dict[str, Any]) -> None
781
797
networkaccess , _ = self .get_requirement ("NetworkAccess" )
782
798
if networkaccess is not None :
783
799
with SourceLine (networkaccess , "networkAccess" , ValidationException , debug ):
784
- j .networkaccess = builder .do_eval (
785
- cast (Union [bool , str ], networkaccess ["networkAccess" ])
800
+ j .networkaccess = cast (
801
+ bool ,
802
+ builder .do_eval (
803
+ cast (Union [bool , str ], networkaccess ["networkAccess" ])
804
+ ),
786
805
)
787
806
if not isinstance (j .networkaccess , bool ):
788
807
raise WorkflowException (
@@ -792,8 +811,10 @@ def register_reader(f): # type: (Dict[str, Any]) -> None
792
811
j .environment = {}
793
812
evr , _ = self .get_requirement ("EnvVarRequirement" )
794
813
if evr is not None :
795
- for t2 in cast (List [Dict [str , str ]], evr ["envDef" ]):
796
- j .environment [t2 ["envName" ]] = builder .do_eval (t2 ["envValue" ])
814
+ for t3 in cast (List [Dict [str , str ]], evr ["envDef" ]):
815
+ j .environment [t3 ["envName" ]] = cast (
816
+ str , builder .do_eval (t3 ["envValue" ])
817
+ )
797
818
798
819
shellcmd , _ = self .get_requirement ("ShellCommandRequirement" )
799
820
if shellcmd is not None :
@@ -822,13 +843,13 @@ def register_reader(f): # type: (Dict[str, Any]) -> None
822
843
823
844
def collect_output_ports (
824
845
self ,
825
- ports : Union [CommentedSeq , Set [Dict [ str , Any ] ]],
846
+ ports : Union [CommentedSeq , Set [CWLObjectType ]],
826
847
builder : Builder ,
827
848
outdir : str ,
828
849
rcode : int ,
829
850
compute_checksum : bool = True ,
830
851
jobname : str = "" ,
831
- readers : Optional [Dict [str , Any ]] = None ,
852
+ readers : Optional [MutableMapping [str , CWLObjectType ]] = None ,
832
853
) -> OutputPortsType :
833
854
ret = {} # type: OutputPortsType
834
855
debug = _logger .isEnabledFor (logging .DEBUG )
@@ -869,9 +890,7 @@ def collect_output_ports(
869
890
if ret :
870
891
revmap = partial (revmap_file , builder , outdir )
871
892
adjustDirObjs (ret , trim_listing )
872
- visit_class (
873
- ret , ("File" , "Directory" ), cast (Callable [[Any ], Any ], revmap )
874
- )
893
+ visit_class (ret , ("File" , "Directory" ), revmap )
875
894
visit_class (ret , ("File" , "Directory" ), remove_path )
876
895
normalizeFilesDirs (ret )
877
896
visit_class (
@@ -915,7 +934,10 @@ def collect_output(
915
934
empty_and_optional = False
916
935
debug = _logger .isEnabledFor (logging .DEBUG )
917
936
if "outputBinding" in schema :
918
- binding = cast (Dict [str , Any ], schema ["outputBinding" ])
937
+ binding = cast (
938
+ MutableMapping [str , Union [bool , str , List [str ]]],
939
+ schema ["outputBinding" ],
940
+ )
919
941
globpatterns = [] # type: List[str]
920
942
921
943
revmap = partial (revmap_file , builder , outdir )
@@ -1015,8 +1037,8 @@ def collect_output(
1015
1037
if "outputEval" in binding :
1016
1038
with SourceLine (binding , "outputEval" , WorkflowException , debug ):
1017
1039
result = builder .do_eval (
1018
- binding ["outputEval" ], context = r
1019
- ) # type: CWLOutputType
1040
+ cast ( CWLOutputType , binding ["outputEval" ]) , context = r
1041
+ )
1020
1042
else :
1021
1043
result = cast (CWLOutputType , r )
1022
1044
@@ -1088,7 +1110,7 @@ def collect_output(
1088
1110
if "format" in schema :
1089
1111
for primary in aslist (result ):
1090
1112
primary ["format" ] = builder .do_eval (
1091
- cast ( Union [ str , List [ str ]], schema ["format" ]) , context = primary
1113
+ schema ["format" ], context = primary
1092
1114
)
1093
1115
1094
1116
# Ensure files point to local references outside of the run environment
@@ -1108,8 +1130,8 @@ def collect_output(
1108
1130
and schema ["type" ]["type" ] == "record"
1109
1131
):
1110
1132
out = {}
1111
- for field in cast (List [Dict [ str , Any ] ], schema ["type" ]["fields" ]):
1112
- out [shortname (field ["name" ])] = self .collect_output (
1133
+ for field in cast (List [CWLObjectType ], schema ["type" ]["fields" ]):
1134
+ out [shortname (cast ( str , field ["name" ]) )] = self .collect_output (
1113
1135
field , builder , outdir , fs_access , compute_checksum = compute_checksum
1114
1136
)
1115
1137
return out
0 commit comments