Skip to content

Commit

Permalink
backend: limit stdout/stderr of ssh.run_expensive() commands
Browse files Browse the repository at this point in the history
  • Loading branch information
praiskup committed Feb 2, 2024
1 parent dba3811 commit 0499624
Show file tree
Hide file tree
Showing 6 changed files with 280 additions and 7 deletions.
2 changes: 1 addition & 1 deletion backend/copr-backend.spec
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
%global tests_version 5
%global tests_tar test-data-copr-backend

%global copr_common_version 0.20.1.dev1
%global copr_common_version 0.21.1.dev1

Name: copr-backend
Version: 1.173
Expand Down
30 changes: 26 additions & 4 deletions backend/copr_backend/sshcmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import netaddr

from copr_common.subprocess_live_output import PosixPipedProcess

DEFAULT_SUBPROCESS_TIMEOUT = 180

class SSHConnectionError(Exception):
Expand Down Expand Up @@ -173,10 +175,30 @@ def run_expensive(self, user_command, max_retries=0,

def _run_expensive(self, user_command, subprocess_timeout):
real_command = self._ssh_base() + [user_command]
with self._popen_timeouted(
real_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
encoding="utf-8") as proc:
stdout, stderr = proc.communicate(timeout=subprocess_timeout)

proc = PosixPipedProcess(real_command,
stdout_limit=1024*10,
stderr_limit=1024*10,
timeout=subprocess_timeout)
for chunk, filename in proc.readchunks():
if filename == 'stdout':
stdout += chunk
else:
stderr += chunk

try:
stdout = stdout.decode("utf-8")
stderr = stderr.decode("utf-8")
except UnicodeDecodeError as err:
raise SSHConnectionError("Non-UTF8 characters in SSH output.") from err

if proc.timeouted():
raise SSHConnectionError("SSH timeouted: " +
_user_readable_command(real_command))

if proc.has_cut():
raise SSHConnectionError("SSH output was too long: " +
_user_readable_command(real_command))

if proc.returncode == 255:
# Value 255 means either that 255 was returned by remote command or
Expand Down
162 changes: 162 additions & 0 deletions common/copr_common/subprocess_live_output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
"""
Streamed reading of Popen() stdout and stderr:
proc = PosixPipedProcess(["command"])
for chunk, output = proc.readchunks()
assert output in ["stdout", "stderr"]
process(chunk)
"""

from subprocess import Popen, PIPE
from threading import Thread
from queue import Queue
import select
import time

EOF = 0
CUT = 1
LOOP = 2
TIMEOUT = 3
ERR = -1

def reader(pipe, queue, identifier, maxlen=None, poll=5.0):
"""
Read the PIPE in a separate thread, and enqueue the chunks of output into
the QUEUE. The chunks are stored as:
[(pipename, chunk), ..., # normal chunks (of bytes)
(pipename, EOF|CUT|ERR), # termination sequence
None, # iter(sentinel) (stop iteration)
]
"""
counter = 0
fd = pipe.fileno()
try:
while True:
readable, _, exc = select.select([fd], [], [fd], poll)
if exc:
queue.put((identifier, ERR))
break

if not readable:
queue.put((identifier, LOOP))
continue

chunk = pipe.read1(1024)
if not chunk:
queue.put((identifier, EOF))
break

if maxlen:
remains = maxlen - counter
counter += len(chunk)
if counter > maxlen:
chunk = chunk[:remains]
queue.put((identifier, chunk))
queue.put((identifier, CUT))
break

# send a "normal" continuous chunk to the reader
queue.put((identifier, chunk))

finally:
# Always notify the parent process that we quit!
queue.put(None)


class PosixPipedProcess:
"""
Start command using subprocess.Popen() and allow streamed reading of its
stdout and stderr outputs.
"""
def __init__(self, command, timeout=None, stdout_limit=None,
stderr_limit=None, poll=5.0, **kwargs):
self.command = command
self.kwargs = kwargs
self.stdout_limit = stdout_limit
self.stderr_limit = stderr_limit
self.returncode = None
self.started = None
self._stopreason = None
self.killed = None
self.timeout = timeout
self.poll = poll

def timeouted(self):
""" Return True if process timeouted """
if not self.timeout:
return False
return time.time() - self.started > self.timeout

def has_cut(self):
""" Return true if the stdout_limit or stderr_limit is reached """
return self.stopreason == CUT

@property
def stopreason(self):
"""
Get the reason for stop.
"""
return self._stopreason


@stopreason.setter
def stopreason(self, reason):
"""
Set the stop reason to EOF, CUT, TIMEOUT or ERR
"""
if self._stopreason != EOF:
return
self._stopreason = reason

def readchunks(self):
"""
(line, type)
(line, type)
(None: "error string") => Error
(None: None) => EOF
"""
self.returncode = None
self.started = time.time()
self.killed = False
self._stopreason = EOF
que = Queue()
with Popen(self.command, stdout=PIPE, stderr=PIPE, **self.kwargs) as process:
tout = Thread(target=reader, args=[process.stdout, que, "stdout",
self.stdout_limit, self.poll])
terr = Thread(target=reader, args=[process.stderr, que, "stderr",
self.stderr_limit, self.poll])
tout.start()
terr.start()
def _kill():
if self.killed:
return
self.killed = True
process.kill()

try:
# we need to find two sentinels for both stdout/stderr
for _ in [1, 2]:
for fd, chunk in iter(que.get, None):
process.stdout.flush()
process.stderr.flush()
if self.timeouted():
self.stopreason = TIMEOUT
_kill()

if not isinstance(chunk, int):
yield (chunk, fd)
continue

if chunk == LOOP:
continue

# one of the streams ended
if chunk in [CUT, ERR]:
self.stopreason = chunk
_kill()
break # next stream please...
finally:
process.wait()
terr.join()
tout.join()
self.returncode = process.returncode
2 changes: 1 addition & 1 deletion common/python-copr-common.spec
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
%endif

Name: python-copr-common
Version: 0.21.1.dev
Version: 0.21.1.dev1
Release: 1%{?dist}
Summary: Python code used by Copr

Expand Down
2 changes: 1 addition & 1 deletion common/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

setup(
name='copr-common',
version="0.21.1.dev",
version="0.21.1.dev1",
description=__description__,
long_description=long_description,
author=__author__,
Expand Down
89 changes: 89 additions & 0 deletions common/tests/test_popen_live_output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import pytest

from copr_common.subprocess_live_output import PosixPipedProcess

@pytest.mark.parametrize('output', ['stdout', 'stderr'])
def test_posix_live_output_one_stream(output):
redirect = "" if output == 'stdout' else '>&2'
opposite = "stderr" if output == 'stdout' else 'stdout'
proc = PosixPipedProcess(["sh", "-c", f"echo {redirect} {output} ; sleep 0.2; echo {redirect} -n {output}"])
outs = {"stdout": "", "stderr": ""}
for chunk, out in proc.readchunks():
assert out != opposite
outs[out] += chunk.decode("utf8")

assert proc.returncode == 0
assert proc.stopreason == 0
assert outs[opposite] == ""
assert outs[output] == f"{output}\n{output}"

def test_posix_live_output_both():
proc = PosixPipedProcess([
"sh", "-c",
"echo -n output ; "
"echo -n errout >&2 ; "
"echo output ; "
"echo errout >&2"
])
outs = {"stdout": "", "stderr": ""}
for chunk, out in proc.readchunks():
outs[out] += chunk.decode("utf8")

assert proc.returncode == 0
assert proc.stopreason == 0
assert not proc.has_cut()
assert outs['stdout'] == "outputoutput\n"
assert outs['stderr'] == "errouterrout\n"


@pytest.mark.parametrize("dataset", [
(["/bin/false"], 1, "", ""),
(["/bin/true"], 0, "", ""),
])
def test_posix_live_output_exit_status(dataset):
cmd, exp_rc, exp_out, exp_err = dataset
proc = PosixPipedProcess(cmd)
outs = {"stdout": "", "stderr": ""}
for chunk, out in proc.readchunks():
outs[out] += chunk.decode("utf8")
assert proc.returncode == exp_rc
assert proc.stopreason == 0
assert not proc.has_cut()
assert outs['stdout'] == exp_out
assert outs['stderr'] == exp_err


def test_posix_live_output_timeout():
cmd = ["bash", "-c", "echo ahoj; sleep 100"]
proc = PosixPipedProcess(cmd, timeout=2, poll=0.1)
outs = {"stdout": "", "stderr": ""}
for chunk, out in proc.readchunks():
outs[out] += chunk.decode("utf8")
assert proc.timeouted()
assert not proc.has_cut()
assert proc.returncode == -9
assert outs['stdout'] == "ahoj\n"
assert outs['stderr'] == ""


def test_posix_live_output_cut():
cmd = ["bash", "-c", "echo aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"]
proc = PosixPipedProcess(cmd, stdout_limit=10)
outs = {"stdout": "", "stderr": ""}
for chunk, out in proc.readchunks():
outs[out] += chunk.decode("utf8")
assert proc.returncode == 0
assert proc.has_cut()
assert outs['stdout'] == "aaaaaaaaaa"
assert outs['stderr'] == ""

def test_posix_live_output_cut_long():
cmd = ["bash", "-c", "while :; do echo -n aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa >&2; done"]
proc = PosixPipedProcess(cmd, stderr_limit=10)
outs = {"stdout": "", "stderr": ""}
for chunk, out in proc.readchunks():
outs[out] += chunk.decode("utf8")
assert outs['stdout'] == ""
assert outs['stderr'] == "aaaaaaaaaa"
assert proc.returncode == -9
assert proc.has_cut()

0 comments on commit 0499624

Please sign in to comment.