From 04996244df5320e1b11cc3e6577293d915249f8d Mon Sep 17 00:00:00 2001 From: Pavel Raiskup Date: Fri, 2 Feb 2024 12:05:22 +0100 Subject: [PATCH] backend: limit stdout/stderr of ssh.run_expensive() commands Fixes: #3118 --- backend/copr-backend.spec | 2 +- backend/copr_backend/sshcmd.py | 30 +++- common/copr_common/subprocess_live_output.py | 162 +++++++++++++++++++ common/python-copr-common.spec | 2 +- common/setup.py | 2 +- common/tests/test_popen_live_output.py | 89 ++++++++++ 6 files changed, 280 insertions(+), 7 deletions(-) create mode 100644 common/copr_common/subprocess_live_output.py create mode 100644 common/tests/test_popen_live_output.py diff --git a/backend/copr-backend.spec b/backend/copr-backend.spec index 856ed706b..ff5401f4a 100644 --- a/backend/copr-backend.spec +++ b/backend/copr-backend.spec @@ -6,7 +6,7 @@ %global tests_version 5 %global tests_tar test-data-copr-backend -%global copr_common_version 0.20.1.dev1 +%global copr_common_version 0.21.1.dev1 Name: copr-backend Version: 1.173 diff --git a/backend/copr_backend/sshcmd.py b/backend/copr_backend/sshcmd.py index ddc13445a..e5671f307 100644 --- a/backend/copr_backend/sshcmd.py +++ b/backend/copr_backend/sshcmd.py @@ -7,6 +7,8 @@ import netaddr +from copr_common.subprocess_live_output import PosixPipedProcess + DEFAULT_SUBPROCESS_TIMEOUT = 180 class SSHConnectionError(Exception): @@ -173,10 +175,30 @@ def run_expensive(self, user_command, max_retries=0, def _run_expensive(self, user_command, subprocess_timeout): real_command = self._ssh_base() + [user_command] - with self._popen_timeouted( - real_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, - encoding="utf-8") as proc: - stdout, stderr = proc.communicate(timeout=subprocess_timeout) + + proc = PosixPipedProcess(real_command, + stdout_limit=1024*10, + stderr_limit=1024*10, + timeout=subprocess_timeout) + for chunk, filename in proc.readchunks(): + if filename == 'stdout': + stdout += chunk + else: + stderr += chunk + + try: + stdout = stdout.decode("utf-8") + stderr = stderr.decode("utf-8") + except UnicodeDecodeError as err: + raise SSHConnectionError("Non-UTF8 characters in SSH output.") from err + + if proc.timeouted(): + raise SSHConnectionError("SSH timeouted: " + + _user_readable_command(real_command)) + + if proc.has_cut(): + raise SSHConnectionError("SSH output was too long: " + + _user_readable_command(real_command)) if proc.returncode == 255: # Value 255 means either that 255 was returned by remote command or diff --git a/common/copr_common/subprocess_live_output.py b/common/copr_common/subprocess_live_output.py new file mode 100644 index 000000000..3ed7568d0 --- /dev/null +++ b/common/copr_common/subprocess_live_output.py @@ -0,0 +1,162 @@ +""" +Streamed reading of Popen() stdout and stderr: + + proc = PosixPipedProcess(["command"]) + for chunk, output = proc.readchunks() + assert output in ["stdout", "stderr"] + process(chunk) +""" + +from subprocess import Popen, PIPE +from threading import Thread +from queue import Queue +import select +import time + +EOF = 0 +CUT = 1 +LOOP = 2 +TIMEOUT = 3 +ERR = -1 + +def reader(pipe, queue, identifier, maxlen=None, poll=5.0): + """ + Read the PIPE in a separate thread, and enqueue the chunks of output into + the QUEUE. The chunks are stored as: + [(pipename, chunk), ..., # normal chunks (of bytes) + (pipename, EOF|CUT|ERR), # termination sequence + None, # iter(sentinel) (stop iteration) + ] + """ + counter = 0 + fd = pipe.fileno() + try: + while True: + readable, _, exc = select.select([fd], [], [fd], poll) + if exc: + queue.put((identifier, ERR)) + break + + if not readable: + queue.put((identifier, LOOP)) + continue + + chunk = pipe.read1(1024) + if not chunk: + queue.put((identifier, EOF)) + break + + if maxlen: + remains = maxlen - counter + counter += len(chunk) + if counter > maxlen: + chunk = chunk[:remains] + queue.put((identifier, chunk)) + queue.put((identifier, CUT)) + break + + # send a "normal" continuous chunk to the reader + queue.put((identifier, chunk)) + + finally: + # Always notify the parent process that we quit! + queue.put(None) + + +class PosixPipedProcess: + """ + Start command using subprocess.Popen() and allow streamed reading of its + stdout and stderr outputs. + """ + def __init__(self, command, timeout=None, stdout_limit=None, + stderr_limit=None, poll=5.0, **kwargs): + self.command = command + self.kwargs = kwargs + self.stdout_limit = stdout_limit + self.stderr_limit = stderr_limit + self.returncode = None + self.started = None + self._stopreason = None + self.killed = None + self.timeout = timeout + self.poll = poll + + def timeouted(self): + """ Return True if process timeouted """ + if not self.timeout: + return False + return time.time() - self.started > self.timeout + + def has_cut(self): + """ Return true if the stdout_limit or stderr_limit is reached """ + return self.stopreason == CUT + + @property + def stopreason(self): + """ + Get the reason for stop. + """ + return self._stopreason + + + @stopreason.setter + def stopreason(self, reason): + """ + Set the stop reason to EOF, CUT, TIMEOUT or ERR + """ + if self._stopreason != EOF: + return + self._stopreason = reason + + def readchunks(self): + """ + (line, type) + (line, type) + (None: "error string") => Error + (None: None) => EOF + """ + self.returncode = None + self.started = time.time() + self.killed = False + self._stopreason = EOF + que = Queue() + with Popen(self.command, stdout=PIPE, stderr=PIPE, **self.kwargs) as process: + tout = Thread(target=reader, args=[process.stdout, que, "stdout", + self.stdout_limit, self.poll]) + terr = Thread(target=reader, args=[process.stderr, que, "stderr", + self.stderr_limit, self.poll]) + tout.start() + terr.start() + def _kill(): + if self.killed: + return + self.killed = True + process.kill() + + try: + # we need to find two sentinels for both stdout/stderr + for _ in [1, 2]: + for fd, chunk in iter(que.get, None): + process.stdout.flush() + process.stderr.flush() + if self.timeouted(): + self.stopreason = TIMEOUT + _kill() + + if not isinstance(chunk, int): + yield (chunk, fd) + continue + + if chunk == LOOP: + continue + + # one of the streams ended + if chunk in [CUT, ERR]: + self.stopreason = chunk + _kill() + break # next stream please... + finally: + process.wait() + terr.join() + tout.join() + self.returncode = process.returncode diff --git a/common/python-copr-common.spec b/common/python-copr-common.spec index 32adbbb43..c035260d3 100644 --- a/common/python-copr-common.spec +++ b/common/python-copr-common.spec @@ -16,7 +16,7 @@ %endif Name: python-copr-common -Version: 0.21.1.dev +Version: 0.21.1.dev1 Release: 1%{?dist} Summary: Python code used by Copr diff --git a/common/setup.py b/common/setup.py index 496053757..169f283d7 100644 --- a/common/setup.py +++ b/common/setup.py @@ -20,7 +20,7 @@ setup( name='copr-common', - version="0.21.1.dev", + version="0.21.1.dev1", description=__description__, long_description=long_description, author=__author__, diff --git a/common/tests/test_popen_live_output.py b/common/tests/test_popen_live_output.py new file mode 100644 index 000000000..a76723911 --- /dev/null +++ b/common/tests/test_popen_live_output.py @@ -0,0 +1,89 @@ +import pytest + +from copr_common.subprocess_live_output import PosixPipedProcess + +@pytest.mark.parametrize('output', ['stdout', 'stderr']) +def test_posix_live_output_one_stream(output): + redirect = "" if output == 'stdout' else '>&2' + opposite = "stderr" if output == 'stdout' else 'stdout' + proc = PosixPipedProcess(["sh", "-c", f"echo {redirect} {output} ; sleep 0.2; echo {redirect} -n {output}"]) + outs = {"stdout": "", "stderr": ""} + for chunk, out in proc.readchunks(): + assert out != opposite + outs[out] += chunk.decode("utf8") + + assert proc.returncode == 0 + assert proc.stopreason == 0 + assert outs[opposite] == "" + assert outs[output] == f"{output}\n{output}" + +def test_posix_live_output_both(): + proc = PosixPipedProcess([ + "sh", "-c", + "echo -n output ; " + "echo -n errout >&2 ; " + "echo output ; " + "echo errout >&2" + ]) + outs = {"stdout": "", "stderr": ""} + for chunk, out in proc.readchunks(): + outs[out] += chunk.decode("utf8") + + assert proc.returncode == 0 + assert proc.stopreason == 0 + assert not proc.has_cut() + assert outs['stdout'] == "outputoutput\n" + assert outs['stderr'] == "errouterrout\n" + + +@pytest.mark.parametrize("dataset", [ + (["/bin/false"], 1, "", ""), + (["/bin/true"], 0, "", ""), +]) +def test_posix_live_output_exit_status(dataset): + cmd, exp_rc, exp_out, exp_err = dataset + proc = PosixPipedProcess(cmd) + outs = {"stdout": "", "stderr": ""} + for chunk, out in proc.readchunks(): + outs[out] += chunk.decode("utf8") + assert proc.returncode == exp_rc + assert proc.stopreason == 0 + assert not proc.has_cut() + assert outs['stdout'] == exp_out + assert outs['stderr'] == exp_err + + +def test_posix_live_output_timeout(): + cmd = ["bash", "-c", "echo ahoj; sleep 100"] + proc = PosixPipedProcess(cmd, timeout=2, poll=0.1) + outs = {"stdout": "", "stderr": ""} + for chunk, out in proc.readchunks(): + outs[out] += chunk.decode("utf8") + assert proc.timeouted() + assert not proc.has_cut() + assert proc.returncode == -9 + assert outs['stdout'] == "ahoj\n" + assert outs['stderr'] == "" + + +def test_posix_live_output_cut(): + cmd = ["bash", "-c", "echo aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"] + proc = PosixPipedProcess(cmd, stdout_limit=10) + outs = {"stdout": "", "stderr": ""} + for chunk, out in proc.readchunks(): + outs[out] += chunk.decode("utf8") + assert proc.returncode == 0 + assert proc.has_cut() + assert outs['stdout'] == "aaaaaaaaaa" + assert outs['stderr'] == "" + +def test_posix_live_output_cut_long(): + cmd = ["bash", "-c", "while :; do echo -n aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa >&2; done"] + proc = PosixPipedProcess(cmd, stderr_limit=10) + outs = {"stdout": "", "stderr": ""} + for chunk, out in proc.readchunks(): + outs[out] += chunk.decode("utf8") + assert outs['stdout'] == "" + assert outs['stderr'] == "aaaaaaaaaa" + assert proc.returncode == -9 + assert proc.has_cut()