Skip to content

Proposed changest to add a timeout to run_shell_cmd #4665

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 83 additions & 8 deletions easybuild/tools/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import subprocess
import tempfile
import time
import threading
from collections import namedtuple
from datetime import datetime

Expand Down Expand Up @@ -356,11 +357,80 @@ def _answer_question(stdout, proc, qa_patterns, qa_wait_patterns):
return match_found


def _read_pipe(pipe, output):
"""Helper function to read from a pipe and store output in a list.
:param pipe: pipe to read from
:param output: list to store output in
"""
out = b''
for line in iter(pipe.readline, b''):
_log.debug(f"Captured: {line.decode(errors='ignore').rstrip()}")
out += line
output.append(out)


def read_process_pipe(proc, pipe_name, start=None, timeout=None):
"""Read from a pipe form a process using a separate thread to avoid blocking and implement a timeout.
:param proc: process to read from
:param pipe_name: name of the pipe to read from (stdout or stderr)
:param start: time when the process was started (used to calculate timeout)
:param timeout: timeout in seconds (default: None = no timeout)

:return: data read from pipe

:raises EasyBuildError: when reading from pipe takes longer than specified timeout
"""
pipe = getattr(proc, pipe_name, None)
if pipe is None:
raise EasyBuildError(f"Pipe '{pipe_name}' not found in process '{proc}'. This is probably a bug.")

error_msg = "Unexpected timeout error during read_process_pipe"
current_timeout = None
if start is not None and timeout is not None:
current_timeout = timeout - (time.time() - start)
error_msg = f"Timeout during `{proc.args}` after {timeout} seconds"
if current_timeout <= 0:
_log.warning(error_msg)
try:
terminate_process(proc)
except subprocess.TimeoutExpired as exc:
_log.warning(f"Failed to terminate process '{proc.args}': {exc}")
raise EasyBuildError(error_msg)

output = []
t = threading.Thread(target=_read_pipe, args=(pipe, output))
t.start()
t.join(current_timeout)
if t.is_alive():
raise EasyBuildError(error_msg)
return output[0]


def terminate_process(proc, timeout=20):
"""
Terminate specified process (subprocess.Popen instance).
Attempt to terminate the process using proc.terminate(), and if that fails, use proc.kill().

:param proc: process to terminate
:param timeout: timeout in seconds to wait for process to terminate

:raises subprocess.TimeoutExpired: if process does not terminate within specified timeout
"""
proc.terminate()
try:
proc.wait(timeout=timeout)
except subprocess.TimeoutExpired:
_log.warning(f"Process did not terminate after {timeout} seconds, sending SIGKILL")

proc.kill()
proc.wait(timeout=timeout)


@run_shell_cmd_cache
def run_shell_cmd(cmd, fail_on_error=True, split_stderr=False, stdin=None, env=None,
hidden=False, in_dry_run=False, verbose_dry_run=False, work_dir=None, use_bash=True,
output_file=True, stream_output=None, asynchronous=False, task_id=None, with_hooks=True,
qa_patterns=None, qa_wait_patterns=None, qa_timeout=100):
timeout=None, qa_patterns=None, qa_wait_patterns=None, qa_timeout=100):
"""
Run specified (interactive) shell command, and capture output + exit code.

Expand All @@ -378,6 +448,7 @@ def run_shell_cmd(cmd, fail_on_error=True, split_stderr=False, stdin=None, env=N
:param asynchronous: indicate that command is being run asynchronously
:param task_id: task ID for specified shell command (included in return value)
:param with_hooks: trigger pre/post run_shell_cmd hooks (if defined)
:param timeout: timeout in seconds for command execution
:param qa_patterns: list of 2-tuples with patterns for questions + corresponding answers
:param qa_wait_patterns: list of strings with patterns for non-questions
:param qa_timeout: amount of seconds to wait until more output is produced when there is no matching question
Expand Down Expand Up @@ -524,16 +595,14 @@ def to_cmd_str(cmd):
time_no_match = 0
prev_stdout = ''

# collect output piece-wise, while checking for questions to answer (if qa_patterns is provided)
start = time.time()
while exit_code is None:
# collect output line by line, while checking for questions to answer (if qa_patterns is provided)
for line in iter(proc.stdout.readline, b''):
_log.debug(f"Captured stdout: {line.decode(errors='ignore').rstrip()}")
stdout += line
stdout += read_process_pipe(proc, 'stdout', start=start, timeout=timeout)

# note: we assume that there won't be any questions in stderr output
if split_stderr:
for line in iter(proc.stderr.readline, b''):
stderr += line
stderr += read_process_pipe(proc, 'stderr', start=start, timeout=timeout)

if qa_patterns:
# only check for question patterns if additional output is available
Expand Down Expand Up @@ -565,7 +634,13 @@ def to_cmd_str(cmd):
if split_stderr:
stderr += proc.stderr.read() or b''
else:
(stdout, stderr) = proc.communicate(input=stdin)
try:
(stdout, stderr) = proc.communicate(input=stdin, timeout=timeout)
except subprocess.TimeoutExpired:
error_msg = f"Timeout during `{cmd}` after {timeout} seconds"
_log.warning(error_msg)
terminate_process(proc)
raise EasyBuildError(error_msg)

# return output as a regular string rather than a byte sequence (and non-UTF-8 characters get stripped out)
# getpreferredencoding normally gives 'utf-8' but can be ASCII (ANSI_X3.4-1968)
Expand Down
35 changes: 35 additions & 0 deletions test/framework/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -1726,6 +1726,41 @@ def test_run_shell_cmd_eof_stdin(self):
self.assertEqual(res.exit_code, 0, "Non-streaming output: Command timed out")
self.assertEqual(res.output, inp)

def test_run_shell_cmd_timeout(self):
"""Test use of run_shell_cmd with a timeout."""
cmd = 'sleep 1; echo hello'
# Failure on process timeout
with self.mocked_stdout_stderr():
self.assertErrorRegex(
EasyBuildError, "Timeout during `.*` after .* seconds",
run_shell_cmd, cmd, timeout=.5
)

# Success
with self.mocked_stdout_stderr():
res = run_shell_cmd(cmd, timeout=3)
self.assertEqual(res.exit_code, 0)
self.assertEqual(res.output, "hello\n")

def test_run_shell_cmd_timeout_stream(self):
"""Test use of run_shell_cmd with a timeout."""
data = '0'*128
# Failure on process timeout
cmd = f'for i in {{1..20}}; do echo {data} && sleep 0.1; done'
with self.mocked_stdout_stderr():
self.assertErrorRegex(
EasyBuildError, "Timeout during `.*` after .* seconds",
run_shell_cmd, cmd, timeout=.5, stream_output=True
)

# Success
cmd = 'sleep .5 && echo hello'
with self.mocked_stdout_stderr():
res = run_shell_cmd(cmd, timeout=1.5, stream_output=True)

self.assertEqual(res.exit_code, 0)
self.assertEqual(res.output, "hello\n")

def test_run_cmd_async(self):
"""Test asynchronously running of a shell command via run_cmd + complete_cmd."""

Expand Down