Skip to content

Commit 0e89055

Browse files
mhpopescumr-c
andauthored
Allow files to be named pipes (#1469)
* allow files to be named pipes Co-authored-by: Michael R. Crusoe <[email protected]>
1 parent cef7385 commit 0e89055

File tree

3 files changed

+125
-4
lines changed

3 files changed

+125
-4
lines changed

cwltool/context.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
107107
self.pull_image = True # type: bool
108108
self.rm_container = True # type: bool
109109
self.move_outputs = "move" # type: str
110+
self.streaming_allowed: bool = False
110111

111112
self.singularity = False # type: bool
112113
self.debug = False # type: bool

cwltool/job.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import itertools
44
import logging
55
import os
6+
import stat
67
import re
78
import shutil
89
import subprocess # nosec
@@ -175,13 +176,24 @@ def _setup(self, runtimeContext: RuntimeContext) -> None:
175176
if not os.path.exists(self.outdir):
176177
os.makedirs(self.outdir)
177178

179+
def is_streamable(file: str) -> bool:
180+
if not runtimeContext.streaming_allowed:
181+
return False
182+
for inp in self.joborder.values():
183+
if isinstance(inp, dict) and inp.get("location", None) == file:
184+
return inp.get("streamable", False)
185+
return False
186+
178187
for knownfile in self.pathmapper.files():
179188
p = self.pathmapper.mapper(knownfile)
180189
if p.type == "File" and not os.path.isfile(p[0]) and p.staged:
181-
raise WorkflowException(
182-
"Input file %s (at %s) not found or is not a regular "
183-
"file." % (knownfile, self.pathmapper.mapper(knownfile)[0])
184-
)
190+
if not (
191+
is_streamable(knownfile) and stat.S_ISFIFO(os.stat(p[0]).st_mode)
192+
):
193+
raise WorkflowException(
194+
"Input file %s (at %s) not found or is not a regular "
195+
"file." % (knownfile, self.pathmapper.mapper(knownfile)[0])
196+
)
185197

186198
if "listing" in self.generatefiles:
187199
runtimeContext = runtimeContext.copy()

tests/test_streaming.py

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
"""Test that files marked as 'streamable' when 'streaming_allowed' can be named pipes."""
2+
import os
3+
4+
import pytest
5+
from pathlib import Path
6+
from typing import cast
7+
8+
from ruamel.yaml.comments import CommentedMap
9+
from schema_salad.sourceline import cmap
10+
11+
from cwltool.command_line_tool import CommandLineTool
12+
from cwltool.context import LoadingContext, RuntimeContext
13+
from cwltool.errors import WorkflowException
14+
from cwltool.job import JobBase
15+
from cwltool.update import INTERNAL_VERSION
16+
from cwltool.utils import CWLObjectType
17+
from .util import get_data
18+
19+
toolpath_object = cast(
20+
CommentedMap,
21+
cmap(
22+
{
23+
"cwlVersion": INTERNAL_VERSION,
24+
"class": "CommandLineTool",
25+
"inputs": [
26+
{
27+
"type": "File",
28+
"id": "inp",
29+
"streamable": True,
30+
}
31+
],
32+
"outputs": [],
33+
"requirements": [],
34+
}
35+
),
36+
)
37+
38+
loading_context = LoadingContext(
39+
{
40+
"metadata": {
41+
"cwlVersion": INTERNAL_VERSION,
42+
"http://commonwl.org/cwltool#original_cwlVersion": INTERNAL_VERSION,
43+
}
44+
}
45+
)
46+
47+
48+
def test_regular_file() -> None:
49+
"""Test that regular files do not raise any exception when they are checked in job._setup."""
50+
clt = CommandLineTool(
51+
toolpath_object,
52+
loading_context,
53+
)
54+
runtime_context = RuntimeContext()
55+
56+
joborder: CWLObjectType = {
57+
"inp": {
58+
"class": "File",
59+
"location": get_data("tests/wf/whale.txt"),
60+
}
61+
}
62+
63+
job = next(clt.job(joborder, None, runtime_context))
64+
assert isinstance(job, JobBase)
65+
66+
job._setup(runtime_context)
67+
68+
69+
streaming = [
70+
(True, True, False),
71+
(True, False, True),
72+
(False, True, True),
73+
(False, False, True),
74+
]
75+
76+
77+
@pytest.mark.parametrize("streamable,streaming_allowed,raise_exception", streaming)
78+
def test_input_can_be_named_pipe(
79+
tmp_path: Path, streamable: bool, streaming_allowed: bool, raise_exception: bool
80+
) -> None:
81+
"""Test that input can be a named pipe."""
82+
clt = CommandLineTool(
83+
toolpath_object,
84+
loading_context,
85+
)
86+
87+
runtime_context = RuntimeContext()
88+
runtime_context.streaming_allowed = streaming_allowed
89+
90+
path = tmp_path / "tmp"
91+
os.mkfifo(path)
92+
93+
joborder: CWLObjectType = {
94+
"inp": {
95+
"class": "File",
96+
"location": str(path),
97+
"streamable": streamable,
98+
}
99+
}
100+
101+
job = next(clt.job(joborder, None, runtime_context))
102+
assert isinstance(job, JobBase)
103+
104+
if raise_exception:
105+
with pytest.raises(WorkflowException):
106+
job._setup(runtime_context)
107+
else:
108+
job._setup(runtime_context)

0 commit comments

Comments
 (0)