Skip to content

Commit fe08fe9

Browse files
committed
POC: add submit execution function
1 parent 3572837 commit fe08fe9

File tree

2 files changed

+97
-48
lines changed

2 files changed

+97
-48
lines changed

nextflow/command.py

Lines changed: 77 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import subprocess
55
from datetime import datetime
66
from nextflow.io import get_file_text, get_process_ids_to_paths, get_file_creation_time
7-
from nextflow.models import Execution, ProcessExecution
7+
from nextflow.models import Execution, ProcessExecution, ExecutionSubmission
88
from nextflow.log import (
99
get_started_from_log,
1010
get_finished_from_log,
@@ -17,7 +17,7 @@
1717

1818
def run(*args, **kwargs):
1919
"""Runs a pipeline and returns the execution.
20-
20+
2121
:param str pipeline_path: the absolute path to the pipeline .nf file.
2222
:param str run_path: the location to run the pipeline in (if not current directory).
2323
:param str output_path: the location to store the output in (if not run path).
@@ -41,7 +41,7 @@ def run(*args, **kwargs):
4141
def run_and_poll(*args, **kwargs):
4242
"""Runs a pipeline and polls it for updates. Yields the execution after each
4343
update.
44-
44+
4545
:param str pipeline_path: the absolute path to the pipeline .nf file.
4646
:param str run_path: the location to run the pipeline in (if not current directory).
4747
:param str output_path: the location to store the output in (if not run path).
@@ -70,6 +70,59 @@ def _run(
7070
log_path=None, runner=None, io=None,
7171
version=None, configs=None, params=None, profiles=None, timezone=None,
7272
report=None, timeline=None, dag=None, trace=None, sleep=1
73+
):
74+
process, submission = submit_execution(
75+
configs,
76+
dag,
77+
io,
78+
log_path,
79+
output_path,
80+
params,
81+
pipeline_path,
82+
profiles,
83+
report,
84+
resume,
85+
run_path,
86+
runner,
87+
timeline,
88+
timezone,
89+
trace,
90+
version
91+
)
92+
start = datetime.now()
93+
94+
execution, log_start = None, 0
95+
if resume: wait_for_log_creation(submission.log_path, start, io)
96+
while True:
97+
time.sleep(sleep)
98+
execution, diff = get_execution(
99+
submission.output_path, submission.log_path, submission.nextflow_command, execution, log_start, timezone, io
100+
)
101+
log_start += diff
102+
if execution and poll: yield execution
103+
process_finished = not process or process.poll() is not None
104+
if execution and execution.return_code and process_finished:
105+
if not poll: yield execution
106+
break
107+
108+
109+
def submit_execution(
110+
configs,
111+
dag,
112+
io,
113+
log_path,
114+
output_path,
115+
params,
116+
pipeline_path,
117+
profiles,
118+
report,
119+
resume,
120+
run_path,
121+
runner,
122+
timeline,
123+
timezone,
124+
trace,
125+
version
73126
):
74127
if not run_path and not io: run_path = os.path.abspath(".")
75128
if not run_path and io: run_path = io.abspath(".")
@@ -79,32 +132,22 @@ def _run(
79132
run_path, output_path, log_path, pipeline_path, resume, version, configs,
80133
params, profiles, timezone, report, timeline, dag, trace, io
81134
)
82-
start = datetime.now()
83135
if runner:
84136
process = None
85137
runner(nextflow_command)
86138
else:
87139
process = subprocess.Popen(
88-
nextflow_command, universal_newlines=True, shell=True
89-
)
90-
execution, log_start = None, 0
91-
if resume: wait_for_log_creation(log_path, start, io)
92-
while True:
93-
time.sleep(sleep)
94-
execution, diff = get_execution(
95-
output_path, log_path, nextflow_command, execution, log_start, timezone, io
140+
nextflow_command, universal_newlines=True, shell=True
96141
)
97-
log_start += diff
98-
if execution and poll: yield execution
99-
process_finished = not process or process.poll() is not None
100-
if execution and execution.return_code and process_finished:
101-
if not poll: yield execution
102-
break
142+
submission = ExecutionSubmission(
143+
pipeline_path, run_path, output_path, log_path, nextflow_command, timezone
144+
)
145+
return process, submission
103146

104147

105148
def make_nextflow_command(run_path, output_path, log_path, pipeline_path, resume,version, configs, params, profiles, timezone, report, timeline, dag, trace, io):
106149
"""Generates the `nextflow run` command.
107-
150+
108151
:param str run_path: the location to run the pipeline in.
109152
:param str output_path: the location to store the output in.
110153
:param str log_path: the location to store the log in.
@@ -147,7 +190,7 @@ def make_nextflow_command(run_path, output_path, log_path, pipeline_path, resume
147190
def make_nextflow_command_env_string(version, timezone, output_path, run_path):
148191
"""Creates the environment variable setting portion of the nextflow run
149192
command string.
150-
193+
151194
:param str version: the nextflow version to use.
152195
:param str timezone: the timezone to use.
153196
:param str output_path: the location to store the output in.
@@ -162,7 +205,7 @@ def make_nextflow_command_env_string(version, timezone, output_path, run_path):
162205

163206
def make_nextflow_command_log_string(log_path, run_path):
164207
"""Creates the log setting portion of the nextflow run command string.
165-
208+
166209
:param str log_path: the location to store the log file in.
167210
:rtype: ``str``"""
168211

@@ -173,7 +216,7 @@ def make_nextflow_command_log_string(log_path, run_path):
173216
def make_nextflow_command_config_string(configs):
174217
"""Creates the config setting portion of the nextflow run command string.
175218
Absolute paths are recommended.
176-
219+
177220
:param configs: any config files to be applied.
178221
:rtype: ``str``"""
179222

@@ -183,7 +226,7 @@ def make_nextflow_command_config_string(configs):
183226

184227
def make_nextflow_command_resume_string(resume):
185228
"""Creates the resume setting portion of the nextflow run command string.
186-
229+
187230
:param resume: whether to resume an existing execution.
188231
:rtype: ``str``"""
189232

@@ -203,7 +246,7 @@ def make_nextflow_command_params_string(params):
203246
for key, value in params.items():
204247
if not value:
205248
param_list.append(f"--{key}=")
206-
elif value[0] in "'\"":
249+
elif value[0] in "'\"":
207250
param_list.append(f"--{key}={value}")
208251
else:
209252
param_list.append(f"--{key}='{value}'")
@@ -212,7 +255,7 @@ def make_nextflow_command_params_string(params):
212255

213256
def make_nextflow_command_profiles_string(profiles):
214257
"""Creates the profile setting portion of the nextflow run command string.
215-
258+
216259
:param list profiles: any profiles to be applied.
217260
:rtype: ``str``"""
218261

@@ -222,7 +265,7 @@ def make_nextflow_command_profiles_string(profiles):
222265

223266
def make_reports_string(output_path, report, timeline, dag, trace):
224267
"""Creates the report setting portion of the nextflow run command string.
225-
268+
226269
:param str output_path: the location to store the output in.
227270
:param str report: the filename to use for the execution report.
228271
:param str timeline: the filename to use for the timeline report.
@@ -245,11 +288,11 @@ def make_reports_string(output_path, report, timeline, dag, trace):
245288

246289
def wait_for_log_creation(output_path, start, io):
247290
"""Waits for a log file for this execution to be created.
248-
291+
249292
:param str output_path: the location to store the output in.
250293
:param datetime start: the start time.
251294
:param io: an optional custom io object to handle file operations."""
252-
295+
253296
while True:
254297
created = get_file_creation_time(os.path.join(output_path, ".nextflow.log"), io=io)
255298
if created and created > start: break
@@ -259,7 +302,7 @@ def wait_for_log_creation(output_path, start, io):
259302
def get_execution(execution_path, log_path, nextflow_command, execution=None, log_start=0, timezone=None, io=None):
260303
"""Creates an execution object from a location. If you are polling, you can
261304
pass in the previous execution to update it with new information.
262-
305+
263306
:param str execution_path: the location of the execution.
264307
:param str log_path: the location of the log.
265308
:param str nextflow_command: the command used to run the pipeline.
@@ -322,8 +365,8 @@ def get_initial_process_executions(log, execution, io):
322365
currently in the list, or uncompleted ones which can now be completed. Some
323366
attributes are not yet filled in.
324367
325-
The identifiers of the proccess executions seen are returned.
326-
368+
The identifiers of the process executions seen are returned.
369+
327370
:param str log: a section of the log file.
328371
:param nextflow.models.Execution execution: the containing execution.
329372
:param io: an optional custom io object to handle file operations.
@@ -350,7 +393,7 @@ def get_initial_process_executions(log, execution, io):
350393
def create_process_execution_from_line(line, cached=False, io=None):
351394
"""Creates a process execution from a line of the log file in which its
352395
submission (or previous caching) is reported.
353-
396+
354397
:param str line: a line from the log file.
355398
:param bool cached: whether the process is cached.
356399
:param io: an optional custom io object to handle file operations.
@@ -375,7 +418,7 @@ def update_process_execution_from_line(process_executions, line):
375418
"""Updates a process execution with information from a line of the log file
376419
in which its completion is reported. The identifier of the process execution
377420
is returned.
378-
421+
379422
:param dict process_executions: a dictionary of process executions.
380423
:param str line: a line from the log file.
381424
:rtype: ``str``"""
@@ -393,7 +436,7 @@ def update_process_execution_from_line(process_executions, line):
393436
def update_process_execution_from_path(process_execution, execution_path, timezone=None, io=None):
394437
"""Some attributes of a process execution need to be obtained from files on
395438
disk. This function updates the process execution with these values.
396-
439+
397440
:param nextflow.models.ProcessExecution process_execution: the process execution.
398441
:param str execution_path: the location of the containing execution.
399442
:param str timezone: the timezone to use for the log.

nextflow/models.py

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,21 @@
33
from pathlib import Path
44
from dataclasses import dataclass
55
from datetime import datetime
6+
from typing import Any
7+
68
from nextflow.io import get_file_text
79

10+
11+
@dataclass(frozen=True)
class ExecutionSubmission:
    """A class to represent the submission of a Nextflow pipeline execution.

    It holds the paths, command string and timezone that a submission was
    made with. The dataclass is frozen, so the fields cannot be reassigned
    after the object has been created."""

    # Path to the pipeline file that was submitted.
    pipeline_path: str
    # Location the pipeline is run in.
    run_path: str
    # Location the output is stored in.
    output_path: str
    # Location the log is stored in.
    log_path: str
    # The full command string used to run the pipeline.
    nextflow_command: str
    # Timezone used for the run.
    # NOTE(review): callers appear to pass None when no timezone is given,
    # so this may really be optional - confirm.
    timezone: str
19+
20+
821
@dataclass
922
class Execution:
1023
"""A class to represent the execution of a Nextflow pipeline."""
@@ -23,7 +36,6 @@ class Execution:
2336

2437
def __repr__(self):
2538
return f"<Execution: {self.identifier}>"
26-
2739

2840
@property
2941
def duration(self):
@@ -33,20 +45,18 @@ def duration(self):
3345

3446
if self.finished is None: return None
3547
return self.finished - self.started
36-
3748

3849
@property
def status(self):
    """A string representing the status of the execution, derived from its
    return code: ``"OK"`` when the return code is ``"0"``, ``"-"`` when it
    is an empty string, and ``"ERROR"`` for any other value.

    :rtype: ``str``"""

    if self.return_code == "0":
        return "OK"
    if self.return_code == "":
        return "-"
    return "ERROR"
4758

4859

49-
5060
@dataclass
5161
class ProcessExecution:
5262
"""A class to represent the execution of a single Nextflow process."""
@@ -64,39 +74,36 @@ class ProcessExecution:
6474
finished: datetime
6575
status: str
6676
cached: bool
67-
io: any
77+
io: Any
6878

6979
def __repr__(self):
7080
return f"<ProcessExecution: {self.identifier}>"
71-
7281

7382
@property
def duration(self):
    """The duration of the process execution, calculated as the finish time
    minus the start time. ``None`` is returned if either time is not set.

    :rtype: ``datetime.timedelta``"""

    # Without both endpoints there is nothing to subtract.
    if self.finished is None or self.started is None:
        return None
    return self.finished - self.started
8291

83-
8492
@property
def full_path(self):
    """The full path to the process execution, built by joining the
    containing execution's path, the ``work`` directory and this process
    execution's own path. ``None`` is returned if the process execution
    has no path.

    :rtype: ``pathlib.Path``"""

    if not self.path:
        return None
    return Path(self.execution.path, "work", self.path)
92-
93100

94101
def input_data(self, include_path=True):
95102
"""A list of files passed to the process execution as inputs.
96-
103+
97104
:param bool include_path: if ``False``, only filenames returned.
98105
:type: ``list``"""
99-
106+
100107
inputs = []
101108
if not self.path: return []
102109
run = get_file_text(self.full_path / ".command.run")
@@ -109,7 +116,6 @@ def input_data(self, include_path=True):
109116
return inputs
110117
else:
111118
return [os.path.basename(f) for f in inputs]
112-
113119

114120
def all_output_data(self, include_path=True):
115121
"""A list of all output data produced by the process execution,
@@ -127,4 +133,4 @@ def all_output_data(self, include_path=True):
127133
if not f.startswith(".command") and f != ".exitcode":
128134
if f not in inputs:
129135
outputs.append(str(full_path) if include_path else f)
130-
return outputs
136+
return outputs

0 commit comments

Comments
 (0)