From 94f291a4fa835f8ad2ad730cb583bb4d7dff1430 Mon Sep 17 00:00:00 2001 From: crivella Date: Tue, 1 Oct 2024 15:18:31 +0200 Subject: [PATCH 1/5] Added timeout to run_shell_cmd --- easybuild/tools/run.py | 80 +++++++++++++++++++++++++++++++++++++++--- test/framework/run.py | 43 +++++++++++++++++++++++ 2 files changed, 118 insertions(+), 5 deletions(-) diff --git a/easybuild/tools/run.py b/easybuild/tools/run.py index 21eac92a70..1d49ee8b68 100644 --- a/easybuild/tools/run.py +++ b/easybuild/tools/run.py @@ -49,6 +49,7 @@ import sys import tempfile import time +import threading from collections import namedtuple from datetime import datetime @@ -259,12 +260,59 @@ def _answer_question(stdout, proc, qa_patterns, qa_wait_patterns): return match_found +def _read_pipe(pipe, size, output): + """Helper function to read from a pipe and store output in a list. + :param pipe: pipe to read from + :param size: number of bytes to read + :param output: list to store output in + """ + data = pipe.read(size) + output.append(data) + +def read_pipe(pipe, size, timeout=None): + """Read from a pipe using a separate thread to avoid blocking and implement a timeout. + :param pipe: pipe to read from + :param size: number of bytes to read + :param timeout: timeout in seconds (default: None = no timeout) + + :return: data read from pipe + + :raises TimeoutError: when reading from pipe takes longer than specified timeout + """ + + output = [] + t = threading.Thread(target=_read_pipe, args=(pipe, size, output)) + t.start() + t.join(timeout) + if t.is_alive(): + raise TimeoutError() + return output[0] + +def terminate_process(proc, timeout=20): + """ + Terminate specified process (subprocess.Popen instance). + Attempt to terminate the process using proc.terminate(), and if that fails, use proc.kill(). + + :param proc: process to terminate + :param timeout: timeout in seconds to wait for process to terminate + """ + proc.terminate() + try: + proc.wait(timeout=timeout) + except subprocess.TimeoutExpired: + _log.warning(f"Process did not terminate after {timeout} seconds, sending SIGKILL") + proc.kill() + try: + proc.wait(timeout=timeout) + except subprocess.TimeoutExpired: + raise EasyBuildError(f"Process `{proc.args}` did not terminate after {timeout} seconds, giving up") + @run_shell_cmd_cache def run_shell_cmd(cmd, fail_on_error=True, split_stderr=False, stdin=None, env=None, hidden=False, in_dry_run=False, verbose_dry_run=False, work_dir=None, use_bash=True, output_file=True, stream_output=None, asynchronous=False, task_id=None, with_hooks=True, - qa_patterns=None, qa_wait_patterns=None, qa_timeout=100): + timeout=None, qa_patterns=None, qa_wait_patterns=None, qa_timeout=100): """ Run specified (interactive) shell command, and capture output + exit code. @@ -282,6 +330,7 @@ def run_shell_cmd(cmd, fail_on_error=True, split_stderr=False, stdin=None, env=N :param asynchronous: indicate that command is being run asynchronously :param task_id: task ID for specified shell command (included in return value) :param with_hooks: trigger pre/post run_shell_cmd hooks (if defined) + :param timeout: timeout in seconds for command execution :param qa_patterns: list of 2-tuples with patterns for questions + corresponding answers :param qa_wait_patterns: list of strings with patterns for non-questions :param qa_timeout: amount of seconds to wait until more output is produced when there is no matching question @@ -421,8 +470,13 @@ def to_cmd_str(cmd): time_no_match = 0 # collect output piece-wise, while checking for questions to answer (if qa_patterns is provided) + start = time.time() while exit_code is None: - + if timeout and time.time() - start > timeout: + error_msg = f"Timeout during `{cmd}` after {timeout} seconds!" + _log.warning(error_msg) + terminate_process(proc) + raise EasyBuildError(error_msg) # use small read size (128 bytes) when streaming output, to make it stream more fluently # -1 means reading until EOF read_size = 128 if exit_code is None else -1 @@ -432,7 +486,12 @@ def to_cmd_str(cmd): # since that will always wait until EOF more_stdout = True while more_stdout: - more_stdout = proc.stdout.read(read_size) or b'' + # more_stdout = proc.stdout.read(read_size) or b'' + try: + t = timeout - (time.time() - start) if timeout else None + more_stdout = read_pipe(proc.stdout, read_size, timeout=t) or b'' + except TimeoutError: + break _log.debug(f"Obtained more stdout: {more_stdout}") stdout += more_stdout @@ -440,7 +499,12 @@ def to_cmd_str(cmd): if split_stderr: more_stderr = True while more_stderr: - more_stderr = proc.stderr.read(read_size) or b'' + # more_stderr = proc.stderr.read(read_size) or b'' + try: + t = timeout - (time.time() - start) if timeout else None + more_stderr = read_pipe(proc.stderr, read_size, timeout=t) or b'' + except TimeoutError: + break stderr += more_stderr if qa_patterns: @@ -465,7 +529,13 @@ def to_cmd_str(cmd): if split_stderr: stderr += proc.stderr.read() else: - (stdout, stderr) = proc.communicate(input=stdin) + try: + (stdout, stderr) = proc.communicate(input=stdin, timeout=timeout) + except subprocess.TimeoutExpired as err: + error_msg = f"Timeout during `{cmd}` after {timeout} seconds" + _log.warning(error_msg) + terminate_process(proc) + raise EasyBuildError(error_msg) # return output as a regular string rather than a byte sequence (and non-UTF-8 characters get stripped out) # getpreferredencoding normally gives 'utf-8' but can be ASCII (ANSI_X3.4-1968) diff --git a/test/framework/run.py b/test/framework/run.py index f1ee6955ec..b3fbf9d9de 100644 --- a/test/framework/run.py +++ b/test/framework/run.py @@ -1367,6 +1367,49 @@ def test_run_shell_cmd_stream(self): for line in expected: self.assertIn(line, stdout) + def test_run_shell_cmd_timeout(self): + """Test use of run_shell_cmd with a timeout.""" + cmd = 'sleep 1; echo hello' + # Failure on process timeout + with self.mocked_stdout_stderr(): + self.assertErrorRegex( + EasyBuildError, "Timeout during `.*` after .* seconds", + run_shell_cmd, cmd, timeout=.5 + ) + + # Success + with self.mocked_stdout_stderr(): + res = run_shell_cmd(cmd, timeout=3) + self.assertEqual(res.exit_code, 0) + self.assertEqual(res.output, "hello\n") + + def test_run_shell_cmd_timeout_stream(self): + """Test use of run_shell_cmd with a timeout.""" + data = '0'*128 + # Failure on process timeout + cmd = f'while true; do echo {data} && sleep 0.1; done' + with self.mocked_stdout_stderr(): + self.assertErrorRegex( + EasyBuildError, "Timeout during `.*` after .* seconds", + run_shell_cmd, cmd, timeout=.5, stream_output=True + ) + + # Failure on stdout read timeout + cmd = 'cat -' + with self.mocked_stdout_stderr(): + self.assertErrorRegex( + EasyBuildError, "Timeout during `.*` after .* seconds", + run_shell_cmd, cmd, timeout=.5, stream_output=True + ) + + # Success + cmd = 'sleep .5 && echo hello' + with self.mocked_stdout_stderr(): + res = run_shell_cmd(cmd, timeout=1.5, stream_output=True) + + self.assertEqual(res.exit_code, 0) + self.assertEqual(res.output, "hello\n") + def test_run_cmd_async(self): """Test asynchronously running of a shell command via run_cmd + complete_cmd.""" From b710254e49d6af111d18e791cf8bb7d799d1893a Mon Sep 17 00:00:00 2001 From: crivella Date: Tue, 1 Oct 2024 15:32:20 +0200 Subject: [PATCH 2/5] Removed unused --- easybuild/tools/run.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/easybuild/tools/run.py b/easybuild/tools/run.py index 1d49ee8b68..13992af14c 100644 --- a/easybuild/tools/run.py +++ b/easybuild/tools/run.py @@ -486,7 +486,6 @@ def to_cmd_str(cmd): # since that will always wait until EOF more_stdout = True while more_stdout: - # more_stdout = proc.stdout.read(read_size) or b'' try: t = timeout - (time.time() - start) if timeout else None more_stdout = read_pipe(proc.stdout, read_size, timeout=t) or b'' @@ -499,7 +498,6 @@ def to_cmd_str(cmd): if split_stderr: more_stderr = True while more_stderr: - # more_stderr = proc.stderr.read(read_size) or b'' try: t = timeout - (time.time() - start) if timeout else None more_stderr = read_pipe(proc.stderr, read_size, timeout=t) or b'' From f97e290e78c97173d48f811a8dfee21731c72c00 Mon Sep 17 00:00:00 2001 From: crivella Date: Tue, 1 Oct 2024 15:35:36 +0200 Subject: [PATCH 3/5] linting --- easybuild/tools/run.py | 5 ++++- test/framework/run.py | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/easybuild/tools/run.py b/easybuild/tools/run.py index 13992af14c..ba1848a868 100644 --- a/easybuild/tools/run.py +++ b/easybuild/tools/run.py @@ -260,6 +260,7 @@ def _answer_question(stdout, proc, qa_patterns, qa_wait_patterns): return match_found + def _read_pipe(pipe, size, output): """Helper function to read from a pipe and store output in a list. :param pipe: pipe to read from @@ -269,6 +270,7 @@ def _read_pipe(pipe, size, output): data = pipe.read(size) output.append(data) + def read_pipe(pipe, size, timeout=None): """Read from a pipe using a separate thread to avoid blocking and implement a timeout. :param pipe: pipe to read from @@ -288,6 +290,7 @@ def read_pipe(pipe, size, timeout=None): raise TimeoutError() return output[0] + def terminate_process(proc, timeout=20): """ Terminate specified process (subprocess.Popen instance). @@ -529,7 +532,7 @@ def to_cmd_str(cmd): else: try: (stdout, stderr) = proc.communicate(input=stdin, timeout=timeout) - except subprocess.TimeoutExpired as err: + except subprocess.TimeoutExpired: error_msg = f"Timeout during `{cmd}` after {timeout} seconds" _log.warning(error_msg) terminate_process(proc) diff --git a/test/framework/run.py b/test/framework/run.py index b3fbf9d9de..023f7cd103 100644 --- a/test/framework/run.py +++ b/test/framework/run.py @@ -1401,7 +1401,7 @@ def test_run_shell_cmd_timeout_stream(self): EasyBuildError, "Timeout during `.*` after .* seconds", run_shell_cmd, cmd, timeout=.5, stream_output=True ) - + # Success cmd = 'sleep .5 && echo hello' with self.mocked_stdout_stderr(): From afb576fcffdf4f1c77f307bb9a1ed5c353dcc602 Mon Sep 17 00:00:00 2001 From: crivella Date: Wed, 2 Oct 2024 13:15:27 +0200 Subject: [PATCH 4/5] Should not run `kill` if `terminate` worked --- easybuild/tools/run.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/easybuild/tools/run.py b/easybuild/tools/run.py index ba1848a868..e185d30a3e 100644 --- a/easybuild/tools/run.py +++ b/easybuild/tools/run.py @@ -304,11 +304,12 @@ def terminate_process(proc, timeout=20): proc.wait(timeout=timeout) except subprocess.TimeoutExpired: _log.warning(f"Process did not terminate after {timeout} seconds, sending SIGKILL") - proc.kill() - try: - proc.wait(timeout=timeout) - except subprocess.TimeoutExpired: - raise EasyBuildError(f"Process `{proc.args}` did not terminate after {timeout} seconds, giving up") + + proc.kill() + try: + proc.wait(timeout=timeout) + except subprocess.TimeoutExpired: + raise EasyBuildError(f"Process `{proc.args}` did not terminate after {timeout} seconds, giving up") @run_shell_cmd_cache From a6a64304ac137b24849513754dcc63e6c396b676 Mon Sep 17 00:00:00 2001 From: crivella Date: Wed, 2 Oct 2024 13:28:38 +0200 Subject: [PATCH 5/5] Avoid blocking test if running without timeout implementation --- test/framework/run.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/framework/run.py b/test/framework/run.py index 023f7cd103..25bc822585 100644 --- a/test/framework/run.py +++ b/test/framework/run.py @@ -1387,7 +1387,7 @@ def test_run_shell_cmd_timeout_stream(self): """Test use of run_shell_cmd with a timeout.""" data = '0'*128 # Failure on process timeout - cmd = f'while true; do echo {data} && sleep 0.1; done' + cmd = f'for i in {{1..20}}; do echo {data} && sleep 0.1; done' with self.mocked_stdout_stderr(): self.assertErrorRegex( EasyBuildError, "Timeout during `.*` after .* seconds", @@ -1395,7 +1395,7 @@ def test_run_shell_cmd_timeout_stream(self): ) # Failure on stdout read timeout - cmd = 'cat -' + cmd = 'timeout 1 cat -' with self.mocked_stdout_stderr(): self.assertErrorRegex( EasyBuildError, "Timeout during `.*` after .* seconds",