From 8fb2d263f140b8195dd31fa0d8e39253325aadd6 Mon Sep 17 00:00:00 2001
From: Pavel Raiskup
Date: Fri, 2 Feb 2024 12:05:22 +0100
Subject: [PATCH] backend: limit stdout/stderr of ssh.run_expensive() commands

Fixes: #3118
---
Changes against the original commit: the stdout/stderr buffers are
initialized in _run_expensive(), _kill() really guards against repeated
kills, and readchunks() always drains the sentinels of both reader threads.

 backend/copr-backend.spec                    |   2 +-
 backend/copr_backend/sshcmd.py               |  32 +++-
 common/copr_common/subprocess_live_output.py | 160 +++++++++++++++++++
 common/python-copr-common.spec               |   2 +-
 common/setup.py                              |   2 +-
 5 files changed, 191 insertions(+), 7 deletions(-)
 create mode 100644 common/copr_common/subprocess_live_output.py

diff --git a/backend/copr-backend.spec b/backend/copr-backend.spec
index 856ed706b..ff5401f4a 100644
--- a/backend/copr-backend.spec
+++ b/backend/copr-backend.spec
@@ -6,7 +6,7 @@
 %global tests_version 5
 %global tests_tar test-data-copr-backend
 
-%global copr_common_version 0.20.1.dev1
+%global copr_common_version 0.21.1.dev1
 
 Name: copr-backend
 Version: 1.173
diff --git a/backend/copr_backend/sshcmd.py b/backend/copr_backend/sshcmd.py
index ddc13445a..e5671f307 100644
--- a/backend/copr_backend/sshcmd.py
+++ b/backend/copr_backend/sshcmd.py
@@ -7,6 +7,8 @@
 
 import netaddr
 
+from copr_common.subprocess_live_output import PosixPipedProcess
+
 DEFAULT_SUBPROCESS_TIMEOUT = 180
 
 class SSHConnectionError(Exception):
@@ -173,10 +175,32 @@ def run_expensive(self, user_command, max_retries=0,
 
     def _run_expensive(self, user_command, subprocess_timeout):
         real_command = self._ssh_base() + [user_command]
-        with self._popen_timeouted(
-                real_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
-                encoding="utf-8") as proc:
-            stdout, stderr = proc.communicate(timeout=subprocess_timeout)
+
+        proc = PosixPipedProcess(real_command,
+                                 stdout_limit=1024*10,
+                                 stderr_limit=1024*10,
+                                 timeout=subprocess_timeout)
+        stdout = b""
+        stderr = b""
+        for chunk, filename in proc.readchunks():
+            if filename == 'stdout':
+                stdout += chunk
+            else:
+                stderr += chunk
+
+        try:
+            stdout = stdout.decode("utf-8")
+            stderr = stderr.decode("utf-8")
+        except UnicodeDecodeError as err:
+            raise SSHConnectionError("Non-UTF8 characters in SSH output.") from err
+
+        if proc.timeouted():
+            raise SSHConnectionError("SSH timeouted: "
+                                     + _user_readable_command(real_command))
+
+        if proc.has_cut():
+            raise SSHConnectionError("SSH output was too long: "
+                                     + _user_readable_command(real_command))
 
         if proc.returncode == 255:
             # Value 255 means either that 255 was returned by remote command or
diff --git a/common/copr_common/subprocess_live_output.py b/common/copr_common/subprocess_live_output.py
new file mode 100644
index 000000000..386616672
--- /dev/null
+++ b/common/copr_common/subprocess_live_output.py
@@ -0,0 +1,160 @@
+"""
+Streamed reading of Popen() stdout and stderr:
+
+    proc = PosixPipedProcess(["command"])
+    for chunk, output in proc.readchunks():
+        assert output in ["stdout", "stderr"]
+        process(chunk)
+"""
+
+from subprocess import Popen, PIPE
+from threading import Thread
+from queue import Queue
+import select
+import time
+
+EOF = 0
+CUT = 1
+LOOP = 2
+TIMEOUT = 3
+ERR = -1
+
+def reader(pipe, queue, identifier, maxlen=None):
+    """
+    Read the PIPE in a separate thread, and enqueue the chunks of output into
+    the QUEUE. The chunks are stored as:
+        [(pipename, chunk), ..., # normal chunks (of bytes)
+         (pipename, EOF|CUT|ERR), # termination sequence
+         None, # iter(sentinel) (stop iteration)
+        ]
+    """
+    counter = 0
+    fd = pipe.fileno()
+    try:
+        while True:
+            readable, _, exc = select.select([fd], [], [fd], 5)
+            if exc:
+                queue.put((identifier, ERR))
+                break
+
+            if not readable:
+                queue.put((identifier, LOOP))
+                continue
+
+            chunk = pipe.read1(1024)
+            if not chunk:
+                queue.put((identifier, EOF))
+                break
+
+            if maxlen:
+                remains = maxlen - counter
+                counter += len(chunk)
+                if counter > maxlen:
+                    chunk = chunk[:remains]
+                    queue.put((identifier, chunk))
+                    queue.put((identifier, CUT))
+                    break
+
+            # send a "normal" continuous chunk to the reader
+            queue.put((identifier, chunk))
+
+    finally:
+        # Always notify the parent process that we quit!
+        queue.put(None)
+
+
+class PosixPipedProcess:
+    """
+    Start command using subprocess.Popen() and allow streamed reading of its
+    stdout and stderr outputs.
+    """
+    def __init__(self, command, timeout=None, stdout_limit=None, stderr_limit=None, **kwargs):
+        self.command = command
+        self.kwargs = kwargs
+        self.stdout_limit = stdout_limit
+        self.stderr_limit = stderr_limit
+        self.returncode = None
+        self.started = None
+        self._stopreason = None
+        self.timeout = timeout
+
+    def timeouted(self):
+        """ Return True if process timeouted """
+        if not self.timeout:
+            return False
+        return time.time() - self.started > self.timeout
+
+    def has_cut(self):
+        """ Return true if the stdout_limit or stderr_limit is reached """
+        return self.stopreason == CUT
+
+    @property
+    def stopreason(self):
+        """
+        Get the reason for stop.
+        """
+        return self._stopreason
+
+
+    @stopreason.setter
+    def stopreason(self, reason):
+        """
+        Set the stop reason to EOF, CUT, TIMEOUT or ERR
+        """
+        if self._stopreason != EOF:
+            return
+        self._stopreason = reason
+
+    def readchunks(self):
+        """
+        Start the command and yield (chunk, pipename) tuples, where chunk is
+        bytes and pipename is "stdout" or "stderr".  The iteration ends when
+        both reader threads have finished (EOF, output limit reached, error
+        or timeout); self.returncode is set once the generator is exhausted.
+        """
+        self.returncode = None
+        self.started = time.time()
+        self._stopreason = EOF
+
+        que = Queue()
+        with Popen(self.command, stdout=PIPE, stderr=PIPE, **self.kwargs) as process:
+            tout = Thread(target=reader, args=[process.stdout, que, "stdout",
+                                               self.stdout_limit])
+            terr = Thread(target=reader, args=[process.stderr, que, "stderr",
+                                               self.stderr_limit])
+            tout.start()
+            terr.start()
+            killed = False
+            def _kill():
+                nonlocal killed
+                if killed:
+                    return
+                killed = True
+                process.kill()
+
+            try:
+                # Each reader thread enqueues exactly one None sentinel when
+                # it quits, so drain the queue until we have seen both.
+                for _ in [1, 2]:
+                    for fd, chunk in iter(que.get, None):
+                        if self.timeouted():
+                            self.stopreason = TIMEOUT
+                            _kill()
+
+                        if not isinstance(chunk, int):
+                            yield (chunk, fd)
+                            continue
+
+                        if chunk == LOOP:
+                            continue
+
+                        # One of the streams ended.  Don't break here; the
+                        # stream's None sentinel follows and ends this loop.
+                        if chunk in [CUT, ERR]:
+                            self.stopreason = chunk
+                            _kill()
+            finally:
+                terr.join()
+                tout.join()
+                process.wait()
+                self.returncode = process.returncode
diff --git a/common/python-copr-common.spec b/common/python-copr-common.spec
index 32adbbb43..c035260d3 100644
--- a/common/python-copr-common.spec
+++ b/common/python-copr-common.spec
@@ -16,7 +16,7 @@
 %endif
 
 Name: python-copr-common
-Version: 0.21.1.dev
+Version: 0.21.1.dev1
 Release: 1%{?dist}
 Summary: Python code used by Copr
 
diff --git a/common/setup.py b/common/setup.py
index 496053757..169f283d7 100644
--- a/common/setup.py
+++ b/common/setup.py
@@ -20,7 +20,7 @@
 
 setup(
     name='copr-common',
-    version="0.21.1.dev",
+    version="0.21.1.dev1",
     description=__description__,
     long_description=long_description,
     author=__author__,