@@ -19,8 +19,9 @@ def run(*args, **kwargs):
19
19
"""Runs a pipeline and returns the execution.
20
20
21
21
:param str pipeline_path: the absolute path to the pipeline .nf file.
22
- :param str run_path: the location to run the pipeline in.
23
- :param str output_path: the location to store the output in.
22
+ :param str run_path: the location to run the pipeline in (if not current directory).
23
+ :param str output_path: the location to store the output in (if not run path).
24
+ :param str log_path: the location to store the log in (if not output path).
24
25
:param resume: whether to resume an existing execution.
25
26
:param function runner: a function to run the pipeline command.
26
27
:param str version: the nextflow version to use.
@@ -42,8 +43,9 @@ def run_and_poll(*args, **kwargs):
42
43
update.
43
44
44
45
:param str pipeline_path: the absolute path to the pipeline .nf file.
45
- :param str run_path: the location to run the pipeline in.
46
- :param str output_path: the location to store the output in.
46
+ :param str run_path: the location to run the pipeline in (if not current directory).
47
+ :param str output_path: the location to store the output in (if not run path).
48
+ :param str log_path: the location to store the log in (if not output path).
47
49
:param resume: whether to resume an existing execution.
48
50
:param function runner: a function to run the pipeline command.
49
51
:param str version: the nextflow version to use.
@@ -63,14 +65,17 @@ def run_and_poll(*args, **kwargs):
63
65
64
66
65
67
def _run (
66
- pipeline_path , resume = False , poll = False , run_path = None , output_path = None , runner = None ,
68
+ pipeline_path , resume = False , poll = False , run_path = None , output_path = None ,
69
+ log_path = None ,runner = None ,
67
70
version = None , configs = None , params = None , profiles = None , timezone = None ,
68
71
report = None , timeline = None , dag = None , trace = None , sleep = 1
69
72
):
70
73
if not run_path : run_path = os .path .abspath ("." )
74
+ if not output_path : output_path = run_path
75
+ if not log_path : log_path = output_path
71
76
nextflow_command = make_nextflow_command (
72
- run_path , output_path , pipeline_path , resume , version , configs , params ,
73
- profiles , timezone , report , timeline , dag , trace
77
+ run_path , output_path , log_path , pipeline_path , resume , version , configs ,
78
+ params , profiles , timezone , report , timeline , dag , trace
74
79
)
75
80
start = datetime .now ()
76
81
if runner :
@@ -81,11 +86,11 @@ def _run(
81
86
nextflow_command , universal_newlines = True , shell = True
82
87
)
83
88
execution , log_start = None , 0
84
- if resume : wait_for_log_creation (output_path or run_path , start )
89
+ if resume : wait_for_log_creation (log_path , start )
85
90
while True :
86
91
time .sleep (sleep )
87
92
execution , diff = get_execution (
88
- output_path or run_path , nextflow_command , execution , log_start
93
+ output_path , log_path , nextflow_command , execution , log_start
89
94
)
90
95
log_start += diff
91
96
if execution and poll : yield execution
@@ -95,11 +100,12 @@ def _run(
95
100
break
96
101
97
102
98
- def make_nextflow_command (run_path , output_path , pipeline_path , resume ,version , configs , params , profiles , timezone , report , timeline , dag , trace ):
103
+ def make_nextflow_command (run_path , output_path , log_path , pipeline_path , resume ,version , configs , params , profiles , timezone , report , timeline , dag , trace ):
99
104
"""Generates the `nextflow run` commmand.
100
105
101
106
:param str run_path: the location to run the pipeline in.
102
107
:param str output_path: the location to store the output in.
108
+ :param str log_path: the location to store the log in.
103
109
:param str pipeline_path: the absolute path to the pipeline .nf file.
104
110
:param bool resume: whether to resume an existing execution.
105
111
:param str version: the nextflow version to use.
@@ -113,10 +119,10 @@ def make_nextflow_command(run_path, output_path, pipeline_path, resume,version,
113
119
:param str trace: the filename to use for the trace report.
114
120
:rtype: ``str``"""
115
121
116
- env = make_nextflow_command_env_string (version , timezone , output_path )
122
+ env = make_nextflow_command_env_string (version , timezone , output_path , run_path )
117
123
if env : env += " "
118
124
nf = "nextflow -Duser.country=US"
119
- log = make_nextflow_command_log_string (output_path )
125
+ log = make_nextflow_command_log_string (log_path , run_path )
120
126
if log : log += " "
121
127
configs = make_nextflow_command_config_string (configs )
122
128
if configs : configs += " "
@@ -126,15 +132,15 @@ def make_nextflow_command(run_path, output_path, pipeline_path, resume,version,
126
132
profiles = make_nextflow_command_profiles_string (profiles )
127
133
reports = make_reports_string (output_path , report , timeline , dag , trace )
128
134
command = f"{ env } { nf } { log } { configs } run { pipeline_path } { resume } { params } { profiles } { reports } "
129
- if run_path : command = f"cd { run_path } ; { command } "
130
- prefix = (str (output_path ) + os .path .sep ) if output_path else ""
135
+ if run_path != os . path . abspath ( "." ) : command = f"cd { run_path } ; { command } "
136
+ prefix = (str (output_path ) + os .path .sep ) if output_path != run_path else ""
131
137
command = command .rstrip () + f" >{ prefix } "
132
138
command += f"stdout.txt 2>{ prefix } "
133
139
command += f"stderr.txt; echo $? >{ prefix } rc.txt"
134
140
return command
135
141
136
142
137
- def make_nextflow_command_env_string (version , timezone , output_path ):
143
+ def make_nextflow_command_env_string (version , timezone , output_path , run_path ):
138
144
"""Creates the environment variable setting portion of the nextflow run
139
145
command string.
140
146
@@ -146,18 +152,18 @@ def make_nextflow_command_env_string(version, timezone, output_path):
146
152
env = {"NXF_ANSI_LOG" : "false" }
147
153
if version : env ["NXF_VER" ] = version
148
154
if timezone : env ["TZ" ] = timezone
149
- if output_path : env ["NXF_WORK" ] = os .path .join (output_path , "work" )
155
+ if output_path != run_path : env ["NXF_WORK" ] = os .path .join (output_path , "work" )
150
156
return " " .join ([f"{ k } ={ v } " for k , v in env .items ()])
151
157
152
158
153
- def make_nextflow_command_log_string (output_path ):
159
+ def make_nextflow_command_log_string (log_path , run_path ):
154
160
"""Creates the log setting portion of the nextflow run command string.
155
161
156
- :param str output_path : the location to store the output in.
162
+ :param str log_path : the location to store the log file in.
157
163
:rtype: ``str``"""
158
164
159
- if not output_path : return ""
160
- return f"-log '{ os .path .join (output_path , '.nextflow.log' )} '"
165
+ if log_path == run_path : return ""
166
+ return f"-log '{ os .path .join (log_path , '.nextflow.log' )} '"
161
167
162
168
163
169
def make_nextflow_command_config_string (configs ):
@@ -245,17 +251,18 @@ def wait_for_log_creation(output_path, start):
245
251
time .sleep (0.1 )
246
252
247
253
248
- def get_execution (execution_path , nextflow_command , execution = None , log_start = 0 ):
254
+ def get_execution (execution_path , log_path , nextflow_command , execution = None , log_start = 0 ):
249
255
"""Creates an execution object from a location. If you are polling, you can
250
256
pass in the previous execution to update it with new information.
251
257
252
258
:param str execution_path: the location of the execution.
259
+ :param str log_path: the location of the log.
253
260
:param str nextflow_command: the command used to run the pipeline.
254
261
:param nextflow.models.Execution execution: the existing execution, if any.
255
262
:param int log_start: the number of lines already read from the log.
256
263
:rtype: ``nextflow.models.Execution``"""
257
264
258
- log = get_file_text (os .path .join (execution_path , ".nextflow.log" ))
265
+ log = get_file_text (os .path .join (log_path , ".nextflow.log" ))
259
266
if not log : return None , 0
260
267
log = log [log_start :]
261
268
execution = make_or_update_execution (log , execution_path , nextflow_command , execution )
0 commit comments