Skip to content

Commit 1ca75ea

Browse files
committed
Custom IO options
1 parent f5f36bb commit 1ca75ea

File tree

7 files changed

+311
-119
lines changed

7 files changed

+311
-119
lines changed

nextflow/command.py

Lines changed: 42 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ def run_and_poll(*args, **kwargs):
4848
:param str log_path: the location to store the log in (if not output path).
4949
:param resume: whether to resume an existing execution.
5050
:param function runner: a function to run the pipeline command.
51+
:param io: an optional custom io object to handle file operations.
5152
:param str version: the nextflow version to use.
5253
:param list configs: any config files to be applied.
5354
:param dict params: the parameters to pass.
@@ -66,16 +67,17 @@ def run_and_poll(*args, **kwargs):
6667

6768
def _run(
6869
pipeline_path, resume=False, poll=False, run_path=None, output_path=None,
69-
log_path=None,runner=None,
70+
log_path=None, runner=None, io=None,
7071
version=None, configs=None, params=None, profiles=None, timezone=None,
7172
report=None, timeline=None, dag=None, trace=None, sleep=1
7273
):
73-
if not run_path: run_path = os.path.abspath(".")
74+
if not run_path and not io: run_path = os.path.abspath(".")
75+
if not run_path and io: run_path = io.abspath(".")
7476
if not output_path: output_path = run_path
7577
if not log_path: log_path = output_path
7678
nextflow_command = make_nextflow_command(
7779
run_path, output_path, log_path, pipeline_path, resume, version, configs,
78-
params, profiles, timezone, report, timeline, dag, trace
80+
params, profiles, timezone, report, timeline, dag, trace, io
7981
)
8082
start = datetime.now()
8183
if runner:
@@ -86,11 +88,11 @@ def _run(
8688
nextflow_command, universal_newlines=True, shell=True
8789
)
8890
execution, log_start = None, 0
89-
if resume: wait_for_log_creation(log_path, start)
91+
if resume: wait_for_log_creation(log_path, start, io)
9092
while True:
9193
time.sleep(sleep)
9294
execution, diff = get_execution(
93-
output_path, log_path, nextflow_command, execution, log_start, timezone
95+
output_path, log_path, nextflow_command, execution, log_start, timezone, io
9496
)
9597
log_start += diff
9698
if execution and poll: yield execution
@@ -100,7 +102,7 @@ def _run(
100102
break
101103

102104

103-
def make_nextflow_command(run_path, output_path, log_path, pipeline_path, resume,version, configs, params, profiles, timezone, report, timeline, dag, trace):
105+
def make_nextflow_command(run_path, output_path, log_path, pipeline_path, resume,version, configs, params, profiles, timezone, report, timeline, dag, trace, io):
104106
"""Generates the `nextflow run` command.
105107
106108
:param str run_path: the location to run the pipeline in.
@@ -117,6 +119,7 @@ def make_nextflow_command(run_path, output_path, log_path, pipeline_path, resume
117119
:param str timeline: the filename to use for the timeline report.
118120
:param str dag: the filename to use for the DAG report.
119121
:param str trace: the filename to use for the trace report.
122+
:param io: an optional custom io object to handle file operations.
120123
:rtype: ``str``"""
121124

122125
env = make_nextflow_command_env_string(version, timezone, output_path, run_path)
@@ -132,7 +135,8 @@ def make_nextflow_command(run_path, output_path, log_path, pipeline_path, resume
132135
profiles = make_nextflow_command_profiles_string(profiles)
133136
reports = make_reports_string(output_path, report, timeline, dag, trace)
134137
command = f"{env}{nf} {log}{configs}run {pipeline_path} {resume}{params} {profiles} {reports}"
135-
if run_path != os.path.abspath("."): command = f"cd {run_path}; {command}"
138+
abspath = io.abspath if io else os.path.abspath
139+
if run_path != abspath("."): command = f"cd {run_path}; {command}"
136140
prefix = (str(output_path) + os.path.sep) if output_path != run_path else ""
137141
command = command.rstrip() + f" >{prefix}"
138142
command += f"stdout.txt 2>{prefix}"
@@ -239,19 +243,20 @@ def make_reports_string(output_path, report, timeline, dag, trace):
239243
return " ".join(params)
240244

241245

242-
def wait_for_log_creation(output_path, start, io=None):
    """Waits for a log file for this execution to be created.

    Blocks, checking every 0.1 seconds, until ``.nextflow.log`` inside
    ``output_path`` reports a creation time later than ``start``.

    :param str output_path: the location to store the output in.
    :param datetime start: the start time.
    :param io: an optional custom io object to handle file operations."""

    log_file = os.path.join(output_path, ".nextflow.log")
    while True:
        # io has to be passed by keyword: the second positional parameter of
        # get_file_creation_time is the timezone, so passing io positionally
        # would bind it to timezone and the custom io would never be used.
        created = get_file_creation_time(log_file, io=io)
        if created and created > start: break
        time.sleep(0.1)
252257

253258

254-
def get_execution(execution_path, log_path, nextflow_command, execution=None, log_start=0, timezone=None):
259+
def get_execution(execution_path, log_path, nextflow_command, execution=None, log_start=0, timezone=None, io=None):
255260
"""Creates an execution object from a location. If you are polling, you can
256261
pass in the previous execution to update it with new information.
257262
@@ -260,33 +265,36 @@ def get_execution(execution_path, log_path, nextflow_command, execution=None, lo
260265
:param str nextflow_command: the command used to run the pipeline.
261266
:param nextflow.models.Execution execution: the existing execution, if any.
262267
:param int log_start: the number of lines already read from the log.
268+
:param str timezone: the timezone to use for the log.
269+
:param io: an optional custom io object to handle file operations.
263270
:rtype: ``nextflow.models.Execution``"""
264271

265-
log = get_file_text(os.path.join(log_path, ".nextflow.log"))
272+
log = get_file_text(os.path.join(log_path, ".nextflow.log"), io)
266273
if not log: return None, 0
267274
log = log[log_start:]
268-
execution = make_or_update_execution(log, execution_path, nextflow_command, execution)
269-
process_executions, changed = get_initial_process_executions(log, execution)
275+
execution = make_or_update_execution(log, execution_path, nextflow_command, execution, io)
276+
process_executions, changed = get_initial_process_executions(log, execution, io)
270277
no_path = [k for k, v in process_executions.items() if not v.path]
271-
process_ids_to_paths = get_process_ids_to_paths(no_path, execution_path)
278+
process_ids_to_paths = get_process_ids_to_paths(no_path, execution_path, io)
272279
for process_id, path in process_ids_to_paths.items():
273280
process_executions[process_id].path = path
274281
for process_execution in process_executions.values():
275282
if not process_execution.finished or not process_execution.started or \
276283
process_execution.identifier in changed:
277-
update_process_execution_from_path(process_execution, execution_path, timezone)
284+
update_process_execution_from_path(process_execution, execution_path, timezone, io)
278285
execution.process_executions = list(process_executions.values())
279286
return execution, len(log)
280287

281288

282-
def make_or_update_execution(log, execution_path, nextflow_command, execution):
289+
def make_or_update_execution(log, execution_path, nextflow_command, execution, io):
283290
"""Creates an Execution object from a log file, or updates an existing one
284291
from a previous poll.
285292
286293
:param str log: a section of the log file.
287294
:param str execution_path: the location of the execution.
288295
:param str nextflow_command: the command used to run the pipeline.
289296
:param nextflow.models.Execution execution: the existing execution.
297+
:param io: an optional custom io object to handle file operations.
290298
:rtype: ``nextflow.models.Execution``"""
291299

292300
if not execution:
@@ -303,13 +311,13 @@ def make_or_update_execution(log, execution_path, nextflow_command, execution):
303311
if not execution.finished: execution.finished = get_finished_from_log(log)
304312
if not execution.session_uuid: execution.session_uuid = get_session_uuid_from_log(log)
305313
execution.log += log
306-
execution.stdout = get_file_text(os.path.join(execution_path, "stdout.txt"))
307-
execution.stderr = get_file_text(os.path.join(execution_path, "stderr.txt"))
308-
execution.return_code = get_file_text(os.path.join(execution_path, "rc.txt")).rstrip()
314+
execution.stdout = get_file_text(os.path.join(execution_path, "stdout.txt"), io)
315+
execution.stderr = get_file_text(os.path.join(execution_path, "stderr.txt"), io)
316+
execution.return_code = get_file_text(os.path.join(execution_path, "rc.txt"), io).rstrip()
309317
return execution
310318

311319

312-
def get_initial_process_executions(log, execution):
320+
def get_initial_process_executions(log, execution, io):
313321
"""Parses a section of a log file and looks for new process executions not
314322
currently in the list, or uncompleted ones which can now be completed. Some
315323
attributes are not yet filled in.
@@ -318,6 +326,7 @@ def get_initial_process_executions(log, execution):
318326
319327
:param str log: a section of the log file.
320328
:param nextflow.models.Execution execution: the containing execution.
329+
:param io: an optional custom io object to handle file operations.
321330
:rtype: ``tuple``"""
322331

323332
lines = log.splitlines()
@@ -326,7 +335,7 @@ def get_initial_process_executions(log, execution):
326335
for line in lines:
327336
if "Submitted process" in line or "Cached process" in line:
328337
is_cached = "Cached process" in line
329-
proc_ex = create_process_execution_from_line(line, is_cached)
338+
proc_ex = create_process_execution_from_line(line, is_cached, io)
330339
if not proc_ex: continue
331340
proc_ex.execution = execution
332341
process_executions[proc_ex.identifier] = proc_ex
@@ -338,12 +347,13 @@ def get_initial_process_executions(log, execution):
338347
return process_executions, just_updated
339348

340349

341-
def create_process_execution_from_line(line, cached=False):
350+
def create_process_execution_from_line(line, cached=False, io=None):
342351
"""Creates a process execution from a line of the log file in which its
343352
submission (or previous caching) is reported.
344353
345354
:param str line: a line from the log file.
346355
:param bool cached: whether the process is cached.
356+
:param io: an optional custom io object to handle file operations.
347357
:rtype: ``nextflow.models.ProcessExecution``"""
348358

349359
if cached:
@@ -357,7 +367,7 @@ def create_process_execution_from_line(line, cached=False):
357367
path="", stdout="", stderr="", bash="", started=None, finished=None,
358368
return_code="0" if cached else "",
359369
status="COMPLETED" if cached else "-",
360-
cached=cached
370+
cached=cached, io=io
361371
)
362372

363373

@@ -380,20 +390,22 @@ def update_process_execution_from_line(process_executions, line):
380390
return identifier
381391

382392

383-
def update_process_execution_from_path(process_execution, execution_path, timezone=None, io=None):
    """Fills in the process execution attributes that are read from files in
    its work directory (``<execution_path>/work/<process path>``). Nothing is
    changed if the process execution has no path yet.

    :param nextflow.models.ProcessExecution process_execution: the process execution.
    :param str execution_path: the location of the containing execution.
    :param str timezone: the timezone passed on when reading the start time.
    :param io: an optional custom io object to handle file operations."""

    if process_execution.path:
        work_dir = os.path.join(execution_path, "work", process_execution.path)

        def read(name):
            return get_file_text(os.path.join(work_dir, name), io)

        # stdout and stderr are re-read on every call.
        process_execution.stdout = read(".command.out")
        process_execution.stderr = read(".command.err")

        # The start time is only looked up once, and never for cached processes.
        if not (process_execution.started or process_execution.cached):
            begin_file = os.path.join(work_dir, ".command.begin")
            process_execution.started = get_file_creation_time(begin_file, timezone, io)

        # The script is only read while it is still empty.
        if not process_execution.bash:
            process_execution.bash = read(".command.sh")

        # Once the containing execution has finished, a process without its
        # own return code takes the execution's return code.
        parent = process_execution.execution
        if parent.finished and not process_execution.return_code:
            process_execution.return_code = parent.return_code

nextflow/io.py

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,33 @@
33
import glob
44
from datetime import datetime
55

6-
def get_file_text(path, io=None):
    """Gets the contents of a text file, if it exists.

    If the file does not exist an empty string is returned, whether the file
    is read locally or through a custom io object.

    :param str path: the location of the file.
    :param io: an optional custom io object to handle reading.
    :rtype: ``str``"""

    try:
        # The custom read sits inside the try so that a missing file gives ""
        # here too, matching the local branch.
        # NOTE(review): assumes the io object signals a missing file by
        # raising FileNotFoundError - confirm against the io implementation.
        if io: return io.read(path)
        with open(path, "r") as f: return f.read()
    except FileNotFoundError:
        return ""
1618

1719

18-
def get_file_creation_time(path, timezone=None):
20+
def get_file_creation_time(path, timezone=None, io=None):
1921
"""Gets the creation time of a file.
2022
2123
:param str path: the location of the file.
24+
:param str timezone: an optional timezone to convert the creation time to.
25+
:param io: an optional custom io object to handle file times.
2226
:rtype: ``datetime.datetime``"""
2327

2428
try:
25-
dt = datetime.fromtimestamp(os.path.getctime(path))
29+
if io:
30+
dt = io.ctime(path)
31+
else:
32+
dt = datetime.fromtimestamp(os.path.getctime(path))
2633
if timezone:
2734
tz = pytz.timezone(timezone)
2835
dt = dt.astimezone(tz)
@@ -32,17 +39,21 @@ def get_file_creation_time(path, timezone=None):
3239
return None
3340

3441

35-
def get_process_ids_to_paths(process_ids, execution_path):
42+
def get_process_ids_to_paths(process_ids, execution_path, io=None):
3643
"""Takes a list of nine character process IDs and maps them to the full
3744
directories they represent.
3845
3946
:param list process_ids: a list of nine character process IDs.
4047
:param str execution_path: the path to the execution directory.
48+
:param io: an optional custom io object to handle globbing.
4149
:rtype: ``dict``"""
4250

4351
process_ids_to_paths = {}
4452
path = os.path.join(execution_path, "work")
45-
subdirectories = glob.glob(os.path.join(path, "*", "*"))
53+
if io:
54+
subdirectories = io.glob(os.path.join(path, "*", "*"))
55+
else:
56+
subdirectories = glob.glob(os.path.join(path, "*", "*"))
4657
for subdirectory in subdirectories:
4758
sub = os.path.sep.join(subdirectory.split(os.path.sep)[-2:])
4859
for process_id in process_ids:
@@ -56,19 +67,19 @@ def get_process_ids_to_paths(process_ids, execution_path):
5667

5768
class CustomIO:
5869

59-
def read(path, mode="r"):
70+
def abspath(path):
6071
pass
6172

6273

63-
def ctime(path):
74+
def listdir(path):
6475
pass
6576

6677

67-
def listdir(path):
78+
def read(path, mode="r"):
6879
pass
6980

7081

71-
def abspath(path):
82+
def ctime(path):
7283
pass
7384

7485

nextflow/models.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ class ProcessExecution:
6464
finished: datetime
6565
status: str
6666
cached: bool
67+
io: any
6768

6869
def __repr__(self):
6970
return f"<ProcessExecution: {self.identifier}>"
@@ -120,7 +121,8 @@ def all_output_data(self, include_path=True):
120121
outputs = []
121122
if not self.path: return []
122123
inputs = self.input_data(include_path=False)
123-
for f in os.listdir(self.full_path):
124+
listdir = self.io.listdir if self.io else os.listdir
125+
for f in listdir(self.full_path):
124126
full_path = Path(f"{self.full_path}/{f}")
125127
if not f.startswith(".command") and f != ".exitcode":
126128
if f not in inputs:

0 commit comments

Comments
 (0)