Skip to content

Support using container entrypoints for shell tasks #822

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
8 changes: 6 additions & 2 deletions docs/source/tutorial/2-advanced-execution.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -345,11 +345,15 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"Of course for this to work Docker needs to work and be configured for\n",
"[sudo-less execution](https://docs.docker.com/engine/install/linux-postinstall/).\n",
"Note that for this to work, Docker must be installed, running and configured for\n",
"[sudo-less execution](https://docs.docker.com/engine/install/linux-postinstall/). If\n",
"the command you want to execute is the entrypoint of the container, you can set the\n",
"`executable` field to `None`, e.g.\n",
"`MrGrid(executable=None, in_file=nifti_file, operation=\"regrid\", voxel=(0.5, 0.5, 0.5))(environment=docker.Environment(\"mrgrid-image-with-entrypoint\"))`.\n",
"See [Containers and Environments](../explanation/environments.rst) for more details on\n",
"how to utilise containers and add support for other software environments.\n",
"\n",
"\n",
"It is also possible to specify functions to run at hooks that are immediately before and after\n",
"the task is executed by passing a `pydra.engine.hooks.TaskHooks` object to the `hooks`\n",
"keyword arg. The callable should take the `pydra.engine.job.Job` object as its only\n",
Expand Down
30 changes: 23 additions & 7 deletions pydra/compose/shell/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@
from .task import Task, Outputs


def executable_validator(_, __, value):
    """Validate the ``executable`` attribute of a shell task.

    The function follows the attrs validator call signature
    ``(instance, attribute, value)``; the first two arguments are ignored.

    Parameters
    ----------
    value : str | Sequence[str] | None
        The executable to validate. ``None`` is accepted without further checks
        (it is used when the command to run is the entrypoint of a container).

    Raises
    ------
    TypeError
        If ``value`` is neither ``None``, a string, nor a sequence.
    ValueError
        If ``value`` is an empty string or an empty sequence.
    """
    if value is None:
        return
    # Accept any sequence (e.g. tuples), not just lists, so that the validator
    # agrees with the declared field type ``str | ty.Sequence[str] | None``.
    if not isinstance(value, (str, ty.Sequence)):
        raise TypeError(
            f"executable must be a string or a list of strings, not {value!r}"
        )
    if len(value) == 0:
        raise ValueError("executable must be a non-empty string or a list of strings")

Check warning on line 45 in pydra/compose/shell/builder.py

View check run for this annotation

Codecov / codecov/patch

pydra/compose/shell/builder.py#L45

Added line #L45 was not covered by tests


@dataclass_transform(
kw_only_default=True,
field_specifiers=(field.out, field.outarg),
Expand Down Expand Up @@ -130,13 +142,17 @@
f"Shell task class {wrapped} must have an `executable` "
"attribute that specifies the command to run"
) from None
if not isinstance(executable, str) and not (
isinstance(executable, ty.Sequence)
and all(isinstance(e, str) for e in executable)
if (
executable is not None
and not isinstance(executable, str)
and not (
isinstance(executable, ty.Sequence)
and all(isinstance(e, str) for e in executable)
)
):
raise ValueError(
"executable must be a string or a sequence of strings"
f", not {executable!r}"
"executable must be a string or a sequence of strings or None if "
f"the command run is the entrypoint of a container, not {executable!r}"
)
class_name = klass.__name__
check_explicit_fields_are_none(klass, inputs, outputs)
Expand Down Expand Up @@ -199,11 +215,11 @@
)
parsed_inputs["executable"] = field.arg(
name="executable",
type=str | ty.Sequence[str],
type=str | ty.Sequence[str] | None,
argstr="",
position=0,
default=executable,
validator=attrs.validators.min_len(1),
validator=executable_validator,
help=Task.EXECUTABLE_HELP,
)

Expand Down
20 changes: 13 additions & 7 deletions pydra/compose/shell/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,12 @@

def _run(self, job: "Job[ShellTask]", rerun: bool = True) -> None:
"""Run the shell command."""

if self.executable is None and not job.environment.has_entrypoint:
raise ValueError(

Check warning on line 259 in pydra/compose/shell/task.py

View check run for this annotation

Codecov / codecov/patch

pydra/compose/shell/task.py#L259

Added line #L259 was not covered by tests
"executable is not set, and the environment is not a container "
f"({job.environment}) with an entrypoint"
)
job.return_values = job.environment.execute(job)

@property
Expand Down Expand Up @@ -288,12 +294,12 @@
if is_fileset_or_union(fld.type) and type(fld_value) is bool:
del values[fld.name]
# Drop special fields that are added separately
del values["executable"]
del values["append_args"]
# Add executable
pos_args = [
self._command_shelltask_executable(fld, self.executable),
] # list for (position, command arg)
pos_args = []

Check warning on line 299 in pydra/compose/shell/task.py

View check run for this annotation

Codecov / codecov/patch

pydra/compose/shell/task.py#L299

Added line #L299 was not covered by tests
if self.executable is not None:
pos_args.append(self._executable_pos_arg(fld, self.executable))
del values["executable"]

Check warning on line 302 in pydra/compose/shell/task.py

View check run for this annotation

Codecov / codecov/patch

pydra/compose/shell/task.py#L301-L302

Added lines #L301 - L302 were not covered by tests
positions_provided = [0]
fields = {f.name: f for f in get_fields(self)}
for field_name in values:
Expand All @@ -312,9 +318,9 @@
command_args += self.append_args
return command_args

def _command_shelltask_executable(
self, fld: field.arg, value: ty.Any
) -> tuple[int, ty.Any]:
def _executable_pos_arg(
self, fld: field.arg, value: str | list[str] | None
) -> tuple[int, str | list[str] | None]:
"""Returning position and value for executable Task input"""
pos = 0 # executable should be the first el. of the command
assert value
Expand Down
36 changes: 18 additions & 18 deletions pydra/compose/shell/tests/test_shell_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import cloudpickle as cp
from pydra.compose import shell
from pydra.utils.general import get_fields, task_help, wrap_text
from pydra.compose.shell.builder import _InputPassThrough
from pydra.compose.shell.builder import _InputPassThrough, executable_validator
from fileformats.generic import File, Directory, FsObject
from fileformats import text, image
from pydra.utils.typing import MultiInputObj
Expand All @@ -26,9 +26,9 @@ def test_interface_template():
assert sorted_fields(Cp) == [
shell.arg(
name="executable",
validator=attrs.validators.min_len(1),
validator=executable_validator,
default="cp",
type=str | ty.Sequence[str],
type=str | ty.Sequence[str] | None,
position=0,
help=shell.Task.EXECUTABLE_HELP,
),
Expand Down Expand Up @@ -81,9 +81,9 @@ def test_interface_template_w_types_and_path_template_ext():
assert sorted_fields(TrimPng) == [
shell.arg(
name="executable",
validator=attrs.validators.min_len(1),
validator=executable_validator,
default="trim-png",
type=str | ty.Sequence[str],
type=str | ty.Sequence[str] | None,
position=0,
help=shell.Task.EXECUTABLE_HELP,
),
Expand Down Expand Up @@ -122,9 +122,9 @@ def test_interface_template_w_modify():
assert sorted_fields(TrimPng) == [
shell.arg(
name="executable",
validator=attrs.validators.min_len(1),
validator=executable_validator,
default="trim-png",
type=str | ty.Sequence[str],
type=str | ty.Sequence[str] | None,
position=0,
help=shell.Task.EXECUTABLE_HELP,
),
Expand Down Expand Up @@ -181,9 +181,9 @@ def test_interface_template_more_complex():
assert sorted_fields(Cp) == [
shell.arg(
name="executable",
validator=attrs.validators.min_len(1),
validator=executable_validator,
default="cp",
type=str | ty.Sequence[str],
type=str | ty.Sequence[str] | None,
position=0,
help=shell.Task.EXECUTABLE_HELP,
),
Expand Down Expand Up @@ -281,9 +281,9 @@ def test_interface_template_with_overrides_and_optionals():
assert sorted_fields(Cp) == [
shell.arg(
name="executable",
validator=attrs.validators.min_len(1),
validator=executable_validator,
default="cp",
type=str | ty.Sequence[str],
type=str | ty.Sequence[str] | None,
position=0,
help=shell.Task.EXECUTABLE_HELP,
),
Expand Down Expand Up @@ -353,9 +353,9 @@ def test_interface_template_with_defaults():
assert sorted_fields(Cp) == [
shell.arg(
name="executable",
validator=attrs.validators.min_len(1),
validator=executable_validator,
default="cp",
type=str | ty.Sequence[str],
type=str | ty.Sequence[str] | None,
position=0,
help=shell.Task.EXECUTABLE_HELP,
),
Expand Down Expand Up @@ -421,9 +421,9 @@ def test_interface_template_with_type_overrides():
assert sorted_fields(Cp) == [
shell.arg(
name="executable",
validator=attrs.validators.min_len(1),
validator=executable_validator,
default="cp",
type=str | ty.Sequence[str],
type=str | ty.Sequence[str] | None,
position=0,
help=shell.Task.EXECUTABLE_HELP,
),
Expand Down Expand Up @@ -738,9 +738,9 @@ class Outputs(shell.Outputs):
assert sorted_fields(A) == [
shell.arg(
name="executable",
validator=attrs.validators.min_len(1),
validator=executable_validator,
default="cp",
type=str | ty.Sequence[str],
type=str | ty.Sequence[str] | None,
argstr="",
position=0,
help=shell.Task.EXECUTABLE_HELP,
Expand Down Expand Up @@ -1004,7 +1004,7 @@ def test_shell_help1():
"----------------------------",
"",
"Inputs:",
"- executable: str | Sequence[str]; default = 'shelly'",
"- executable: str | Sequence[str] | None; default = 'shelly'",
" the first part of the command, can be a string, e.g. 'ls', or a list, e.g.",
" ['ls', '-l', 'dirname']",
"- in_file: generic/file",
Expand Down
74 changes: 36 additions & 38 deletions pydra/environments/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
TODO: add setup and teardown methods
"""

has_entrypoint = False

def setup(self):
pass

Expand Down Expand Up @@ -107,6 +109,8 @@
Extra arguments to be passed to the container
"""

has_entrypoint = True

image: str
tag: str = "latest"
root: str = "/mnt/pydra"
Expand Down Expand Up @@ -200,52 +204,46 @@
return bindings, values


def execute(cmd, strip=False):
"""
Run the event loop with coroutine.

Uses :func:`read_and_display_async` unless a loop is
already running, in which case :func:`read_and_display`
is used.
def read_and_display(
*cmd: str, strip: bool = False, hide_display: bool = False
) -> dict[str, int | str]:
"""Capture a process' standard output.

Parameters
----------
cmd : :obj:`list` or :obj:`tuple`
The command line to be executed.
strip : :obj:`bool`
TODO

"""
rc, stdout, stderr = read_and_display(*cmd, strip=strip)
cmd : str
The command to execute, as a list of strings.
strip : bool, optional
If True, the output will be stripped of leading and trailing whitespace.
hide_display : bool, optional
If True, the output will not be displayed.

Returns
-------
dict[str, Any]
A dictionary containing the return code, standard output, and standard error.

Raises
------
RuntimeError
If the return code is not 0, a RuntimeError is raised with a formatted
error message.
"""
loop = get_open_loop()
if loop.is_running():
rc, stdout, stderr = read_and_display(*cmd, strip=strip)
else:
rc, stdout, stderr = loop.run_until_complete(
read_and_display_async(*cmd, strip=strip)
)
"""
return rc, stdout, stderr


def read_and_display(*cmd, strip=False, hide_display=False):
"""Capture a process' standard output."""
try:
process = sp.run(cmd, stdout=sp.PIPE, stderr=sp.PIPE)
except Exception:
# TODO editing some tracing?
raise

stdout = process.stdout.decode("utf-8")

Check warning on line 238 in pydra/environments/base.py

View check run for this annotation

Codecov / codecov/patch

pydra/environments/base.py#L238

Added line #L238 was not covered by tests
if strip:
return (
process.returncode,
process.stdout.decode("utf-8").strip(),
process.stderr.decode("utf-8"),
)
else:
return (
process.returncode,
process.stdout.decode("utf-8"),
process.stderr.decode("utf-8"),
)
stdout = stdout.strip()
stderr = process.stderr.decode("utf-8")

Check warning on line 241 in pydra/environments/base.py

View check run for this annotation

Codecov / codecov/patch

pydra/environments/base.py#L240-L241

Added lines #L240 - L241 were not covered by tests
if process.returncode:
msg = f"Error executing command {' '.join(cmd)!r} (code: {process.returncode}):"

Check warning on line 243 in pydra/environments/base.py

View check run for this annotation

Codecov / codecov/patch

pydra/environments/base.py#L243

Added line #L243 was not covered by tests
if stdout:
msg += "\n\nstderr:\n" + stderr

Check warning on line 245 in pydra/environments/base.py

View check run for this annotation

Codecov / codecov/patch

pydra/environments/base.py#L245

Added line #L245 was not covered by tests
if stdout:
msg += "\n\nstdout:\n" + stdout
raise RuntimeError(msg)
return {"return_code": process.returncode, "stdout": stdout, "stderr": stderr}

Check warning on line 249 in pydra/environments/base.py

View check run for this annotation

Codecov / codecov/patch

pydra/environments/base.py#L247-L249

Added lines #L247 - L249 were not covered by tests
12 changes: 2 additions & 10 deletions pydra/environments/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,11 @@
).split()
)
docker_args.extend(["-w", f"{self.root}{job.cache_dir}"])
keys = ["return_code", "stdout", "stderr"]

job.cache_dir.mkdir(exist_ok=True)
values = base.execute(
docker_args + [docker_img] + job.task._command_args(values=values),
return base.read_and_display(

Check warning on line 33 in pydra/environments/docker.py

View check run for this annotation

Codecov / codecov/patch

pydra/environments/docker.py#L33

Added line #L33 was not covered by tests
*(docker_args + [docker_img] + job.task._command_args(values=values)),
)
output = dict(zip(keys, values))
if output["return_code"]:
if output["stderr"]:
raise RuntimeError(output["stderr"])
else:
raise RuntimeError(output["stdout"])
return output


# Alias so it can be referred to as docker.Environment
Expand Down
12 changes: 1 addition & 11 deletions pydra/environments/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,8 @@
"""

def execute(self, job: "Job[shell.Task]") -> dict[str, ty.Any]:
keys = ["return_code", "stdout", "stderr"]
cmd_args = job.task._command_args(values=job.inputs)
values = base.execute(cmd_args)
output = dict(zip(keys, values))
if output["return_code"]:
msg = f"Error running '{job.name}' job with {cmd_args}:"
if output["stderr"]:
msg += "\n\nstderr:\n" + output["stderr"]
if output["stdout"]:
msg += "\n\nstdout:\n" + output["stdout"]
raise RuntimeError(msg)
return output
return base.read_and_display(*cmd_args)

Check warning on line 19 in pydra/environments/native.py

View check run for this annotation

Codecov / codecov/patch

pydra/environments/native.py#L19

Added line #L19 was not covered by tests


# Alias so it can be referred to as native.Environment
Expand Down
Loading
Loading