Skip to content

Commit

Permalink
backend: limit stdout/stderr of ssh.run_expensive() commands
Browse files Browse the repository at this point in the history
  • Loading branch information
praiskup committed Feb 2, 2024
1 parent dba3811 commit 8fb2d26
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 7 deletions.
2 changes: 1 addition & 1 deletion backend/copr-backend.spec
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
%global tests_version 5
%global tests_tar test-data-copr-backend

%global copr_common_version 0.20.1.dev1
%global copr_common_version 0.21.1.dev1

Name: copr-backend
Version: 1.173
Expand Down
30 changes: 26 additions & 4 deletions backend/copr_backend/sshcmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import netaddr

from copr_common.subprocess_live_output import PosixPipedProcess

DEFAULT_SUBPROCESS_TIMEOUT = 180

class SSHConnectionError(Exception):
Expand Down Expand Up @@ -173,10 +175,30 @@ def run_expensive(self, user_command, max_retries=0,

def _run_expensive(self, user_command, subprocess_timeout):
real_command = self._ssh_base() + [user_command]
with self._popen_timeouted(
real_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
encoding="utf-8") as proc:
stdout, stderr = proc.communicate(timeout=subprocess_timeout)

proc = PosixPipedProcess(real_command,
stdout_limit=1024*10,
stderr_limit=1024*10,
timeout=subprocess_timeout)
for chunk, filename in proc.readchunks():
if filename == 'stdout':
stdout += chunk
else:
stderr += chunk

try:
stdout = stdout.decode("utf-8")
stderr = stderr.decode("utf-8")
except UnicodeDecodeError as err:
raise SSHConnectionError("Non-UTF8 characters in SSH output.") from err

if proc.timeouted():
raise SSHConnectionError("SSH timeouted: " +
_user_readable_command(real_command))

if proc.has_cut():
raise SSHConnectionError("SSH output was too long: " +
_user_readable_command(real_command))

if proc.returncode == 255:
# Value 255 means either that 255 was returned by remote command or
Expand Down
157 changes: 157 additions & 0 deletions common/copr_common/subprocess_live_output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
"""
Streamed reading of Popen() stdout and stderr:
proc = PosixPipedProcess(["command"])
for chunk, output = proc.readchunks()
assert output in ["stdout", "stderr"]
process(chunk)
"""

from subprocess import Popen, PIPE
from threading import Thread
from queue import Queue
import select
import time

EOF = 0
CUT = 1
LOOP = 2
TIMEOUT = 3
ERR = -1

def reader(pipe, queue, identifier, maxlen=None):
"""
Read the PIPE in a separate thread, and enqueue the chunks of output into
the QUEUE. The chunks are stored as:
[(pipename, chunk), ..., # normal chunks (of bytes)
(pipename, EOF|CUT|ERR), # termination sequence
None, # iter(sentinel) (stop iteration)
]
"""
counter = 0
fd = pipe.fileno()
try:
while True:
readable, _, exc = select.select([fd], [], [fd], 5)
if exc:
queue.put((identifier, ERR))
break

if not readable:
queue.put((identifier, LOOP))
continue

chunk = pipe.read1(1024)
if not chunk:
queue.put((identifier, EOF))
break

if maxlen:
remains = maxlen - counter
counter += len(chunk)
if counter > maxlen:
chunk = chunk[:remains]
queue.put((identifier, chunk))
queue.put((identifier, CUT))
break

# send a "normal" continuous chunk to the reader
queue.put((identifier, chunk))

finally:
# Always notify the parent process that we quit!
queue.put(None)


class PosixPipedProcess:
"""
Start command using subprocess.Popen() and allow streamed reading of its
stdout and stderr outputs.
"""
def __init__(self, command, timeout=None, stdout_limit=None, stderr_limit=None, **kwargs):
self.command = command
self.kwargs = kwargs
self.stdout_limit = stdout_limit
self.stderr_limit = stderr_limit
self.returncode = None
self.started = None
self._stopreason = None
self.timeout = timeout

def timeouted(self):
""" Return True if process timeouted """
if not self.timeout:
return False
return time.time() - self.started > self.timeout

def has_cut(self):
""" Return true if the stdout_limit or stderr_limit is reached """
return self.stopreason == CUT

@property
def stopreason(self):
"""
Get the reason for stop.
"""
return self._stopreason


@stopreason.setter
def stopreason(self, reason):
"""
Set the stop reason to EOF, CUT, TIMEOUT or ERR
"""
if self._stopreason != EOF:
return
self._stopreason = reason

def readchunks(self):
"""
(line, type)
(line, type)
(None: "error string") => Error
(None: None) => EOF
"""
self.returncode = None
self.started = time.time()
self._stopreason = EOF

que = Queue()
with Popen(self.command, stdout=PIPE, stderr=PIPE, **self.kwargs) as process:
tout = Thread(target=reader, args=[process.stdout, que, "stdout",
self.stdout_limit])
terr = Thread(target=reader, args=[process.stderr, que, "stderr",
self.stderr_limit])
tout.start()
terr.start()
killed = False
def _kill():
if killed:
return
process.kill()

try:
# we need to find two sentinels for both stdout/stderr
for _ in [1, 2]:
for fd, chunk in iter(que.get, None):
if self.timeouted():
self.stopreason = TIMEOUT
_kill()

if not isinstance(chunk, int):
yield (chunk, fd)
continue

if chunk == LOOP:
continue

# one of the streams ended
if chunk in [CUT, ERR]:
self.stopreason = chunk
_kill()
break # next stream please...
finally:
terr.join()
tout.join()
process.wait()
self.returncode = process.returncode
2 changes: 1 addition & 1 deletion common/python-copr-common.spec
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
%endif

Name: python-copr-common
Version: 0.21.1.dev
Version: 0.21.1.dev1
Release: 1%{?dist}
Summary: Python code used by Copr

Expand Down
2 changes: 1 addition & 1 deletion common/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

setup(
name='copr-common',
version="0.21.1.dev",
version="0.21.1.dev1",
description=__description__,
long_description=long_description,
author=__author__,
Expand Down

0 comments on commit 8fb2d26

Please sign in to comment.