@@ -793,6 +793,7 @@ def _get_node_info(workflow, node):
793
793
def _get_predecessors (workflow , node ):
794
794
# recursive method to get predecessors of a given node
795
795
pred = []
796
+
796
797
for pnode in workflow .graph .predecessors (node ):
797
798
pred = _get_predecessors (workflow , pnode )
798
799
cxns = {x [0 ]: x [2 ]
@@ -864,15 +865,17 @@ def _get_predecessors(workflow, node):
864
865
if wk_params ['sample' ]:
865
866
df = ST (self .study_id ).to_dataframe (samples = list (self ))
866
867
for k , v in wk_params ['sample' ].items ():
867
- if k not in df .columns or v not in df [k ].unique ():
868
+ if k not in df .columns or (v != '*' and v not in
869
+ df [k ].unique ()):
868
870
reqs_satisfied = False
869
871
else :
870
872
total_conditions_satisfied += 1
871
873
872
874
if wk_params ['prep' ]:
873
875
df = self .to_dataframe ()
874
876
for k , v in wk_params ['prep' ].items ():
875
- if k not in df .columns or v not in df [k ].unique ():
877
+ if k not in df .columns or (v != '*' and v not in
878
+ df [k ].unique ()):
876
879
reqs_satisfied = False
877
880
else :
878
881
total_conditions_satisfied += 1
@@ -890,117 +893,112 @@ def _get_predecessors(workflow, node):
890
893
891
894
# let's just keep one, let's give it preference to the one with the
892
895
# most total_conditions_satisfied
893
- workflows = sorted (workflows , key = lambda x : x [0 ], reverse = True )[: 1 ]
896
+ _ , wk = sorted (workflows , key = lambda x : x [0 ], reverse = True )[0 ]
894
897
missing_artifacts = dict ()
895
- for _ , wk in workflows :
896
- missing_artifacts [wk ] = dict ()
897
- for node , degree in wk .graph .out_degree ():
898
- if degree != 0 :
899
- continue
900
- mscheme = _get_node_info (wk , node )
901
- if mscheme not in merging_schemes :
902
- missing_artifacts [wk ][mscheme ] = node
903
- if not missing_artifacts [wk ]:
904
- del missing_artifacts [wk ]
898
+ for node , degree in wk .graph .out_degree ():
899
+ if degree != 0 :
900
+ continue
901
+ mscheme = _get_node_info (wk , node )
902
+ if mscheme not in merging_schemes :
903
+ missing_artifacts [mscheme ] = node
905
904
if not missing_artifacts :
906
905
# raises option b.
907
906
raise ValueError ('This preparation is complete' )
908
907
909
908
# 3.
910
- for wk , wk_data in missing_artifacts .items ():
911
- previous_jobs = dict ()
912
- for ma , node in wk_data .items ():
913
- predecessors = _get_predecessors (wk , node )
914
- predecessors .reverse ()
915
- cmds_to_create = []
916
- init_artifacts = None
917
- for i , (pnode , cnode , cxns ) in enumerate (predecessors ):
918
- cdp = cnode .default_parameter
919
- cdp_cmd = cdp .command
920
- params = cdp .values .copy ()
921
-
922
- icxns = {y : x for x , y in cxns .items ()}
923
- reqp = {x : icxns [y [1 ][0 ]]
924
- for x , y in cdp_cmd .required_parameters .items ()}
925
- cmds_to_create .append ([cdp_cmd , params , reqp ])
926
-
927
- info = _get_node_info (wk , pnode )
928
- if info in merging_schemes :
929
- if set (merging_schemes [info ]) >= set (cxns ):
930
- init_artifacts = merging_schemes [info ]
931
- break
932
- if init_artifacts is None :
933
- pdp = pnode .default_parameter
934
- pdp_cmd = pdp .command
935
- params = pdp .values .copy ()
936
- # verifying that the workflow.artifact_type is included
937
- # in the command input types or raise an error
938
- wkartifact_type = wk .artifact_type
939
- reqp = dict ()
940
- for x , y in pdp_cmd .required_parameters .items ():
941
- if wkartifact_type not in y [1 ]:
942
- raise ValueError (f'{ wkartifact_type } is not part '
943
- 'of this preparation and cannot '
944
- 'be applied' )
945
- reqp [x ] = wkartifact_type
946
-
947
- cmds_to_create .append ([pdp_cmd , params , reqp ])
948
-
949
- if starting_job is not None :
950
- init_artifacts = {
951
- wkartifact_type : f'{ starting_job .id } :' }
952
- else :
953
- init_artifacts = {wkartifact_type : self .artifact .id }
954
-
955
- cmds_to_create .reverse ()
956
- current_job = None
957
- loop_starting_job = starting_job
958
- for i , (cmd , params , rp ) in enumerate (cmds_to_create ):
959
- if loop_starting_job is not None :
960
- previous_job = loop_starting_job
961
- loop_starting_job = None
962
- else :
963
- previous_job = current_job
964
- if previous_job is None :
965
- req_params = dict ()
966
- for iname , dname in rp .items ():
967
- if dname not in init_artifacts :
968
- msg = (f'Missing Artifact type: "{ dname } " in '
969
- 'this preparation; this might be due '
970
- 'to missing steps or not having the '
971
- 'correct raw data.' )
972
- # raises option c.
973
- raise ValueError (msg )
974
- req_params [iname ] = init_artifacts [dname ]
975
- else :
976
- req_params = dict ()
977
- connections = dict ()
978
- for iname , dname in rp .items ():
979
- req_params [iname ] = f'{ previous_job .id } { dname } '
980
- connections [dname ] = iname
981
- params .update (req_params )
982
- job_params = qdb .software .Parameters .load (
983
- cmd , values_dict = params )
984
-
985
- if params in previous_jobs .values ():
986
- for x , y in previous_jobs .items ():
987
- if params == y :
988
- current_job = x
909
+ previous_jobs = dict ()
910
+ for ma , node in missing_artifacts .items ():
911
+ predecessors = _get_predecessors (wk , node )
912
+ predecessors .reverse ()
913
+ cmds_to_create = []
914
+ init_artifacts = None
915
+ for i , (pnode , cnode , cxns ) in enumerate (predecessors ):
916
+ cdp = cnode .default_parameter
917
+ cdp_cmd = cdp .command
918
+ params = cdp .values .copy ()
919
+
920
+ icxns = {y : x for x , y in cxns .items ()}
921
+ reqp = {x : icxns [y [1 ][0 ]]
922
+ for x , y in cdp_cmd .required_parameters .items ()}
923
+ cmds_to_create .append ([cdp_cmd , params , reqp ])
924
+
925
+ info = _get_node_info (wk , pnode )
926
+ if info in merging_schemes :
927
+ if set (merging_schemes [info ]) >= set (cxns ):
928
+ init_artifacts = merging_schemes [info ]
929
+ break
930
+ if init_artifacts is None :
931
+ pdp = pnode .default_parameter
932
+ pdp_cmd = pdp .command
933
+ params = pdp .values .copy ()
934
+ # verifying that the workflow.artifact_type is included
935
+ # in the command input types or raise an error
936
+ wkartifact_type = wk .artifact_type
937
+ reqp = dict ()
938
+ for x , y in pdp_cmd .required_parameters .items ():
939
+ if wkartifact_type not in y [1 ]:
940
+ raise ValueError (f'{ wkartifact_type } is not part '
941
+ 'of this preparation and cannot '
942
+ 'be applied' )
943
+ reqp [x ] = wkartifact_type
944
+
945
+ cmds_to_create .append ([pdp_cmd , params , reqp ])
946
+
947
+ if starting_job is not None :
948
+ init_artifacts = {
949
+ wkartifact_type : f'{ starting_job .id } :' }
950
+ else :
951
+ init_artifacts = {wkartifact_type : self .artifact .id }
952
+
953
+ cmds_to_create .reverse ()
954
+ current_job = None
955
+ loop_starting_job = starting_job
956
+ for i , (cmd , params , rp ) in enumerate (cmds_to_create ):
957
+ if loop_starting_job is not None :
958
+ previous_job = loop_starting_job
959
+ loop_starting_job = None
960
+ else :
961
+ previous_job = current_job
962
+ if previous_job is None :
963
+ req_params = dict ()
964
+ for iname , dname in rp .items ():
965
+ if dname not in init_artifacts :
966
+ msg = (f'Missing Artifact type: "{ dname } " in '
967
+ 'this preparation; this might be due '
968
+ 'to missing steps or not having the '
969
+ 'correct raw data.' )
970
+ # raises option c.
971
+ raise ValueError (msg )
972
+ req_params [iname ] = init_artifacts [dname ]
973
+ else :
974
+ req_params = dict ()
975
+ connections = dict ()
976
+ for iname , dname in rp .items ():
977
+ req_params [iname ] = f'{ previous_job .id } { dname } '
978
+ connections [dname ] = iname
979
+ params .update (req_params )
980
+ job_params = qdb .software .Parameters .load (
981
+ cmd , values_dict = params )
982
+
983
+ if params in previous_jobs .values ():
984
+ for x , y in previous_jobs .items ():
985
+ if params == y :
986
+ current_job = x
987
+ else :
988
+ if workflow is None :
989
+ PW = qdb .processing_job .ProcessingWorkflow
990
+ workflow = PW .from_scratch (user , job_params )
991
+ current_job = [
992
+ j for j in workflow .graph .nodes ()][0 ]
989
993
else :
990
- if workflow is None :
991
- PW = qdb .processing_job .ProcessingWorkflow
992
- workflow = PW .from_scratch (user , job_params )
993
- current_job = [
994
- j for j in workflow .graph .nodes ()][0 ]
994
+ if previous_job is None :
995
+ current_job = workflow .add (
996
+ job_params , req_params = req_params )
995
997
else :
996
- if previous_job is None :
997
- current_job = workflow .add (
998
- job_params , req_params = req_params )
999
- else :
1000
- current_job = workflow .add (
1001
- job_params , req_params = req_params ,
1002
- connections = {previous_job : connections })
1003
- previous_jobs [current_job ] = params
998
+ current_job = workflow .add (
999
+ job_params , req_params = req_params ,
1000
+ connections = {previous_job : connections })
1001
+ previous_jobs [current_job ] = params
1004
1002
1005
1003
return workflow
1006
1004
0 commit comments