forked from fedora-copr/copr
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
backend: limit stdout/stderr of ssh.run_expensive() commands
Fixes: fedora-copr#3118
- Loading branch information
Showing
6 changed files
with
294 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,169 @@ | ||
""" | ||
Streamed reading of Popen() stdout and stderr: | ||
proc = PosixPipedProcess(["command"]) | ||
for chunk, output = proc.readchunks() | ||
assert output in ["stdout", "stderr"] | ||
process(chunk) | ||
""" | ||
|
||
from subprocess import Popen, PIPE | ||
from threading import Thread | ||
from queue import Queue | ||
import select | ||
import time | ||
|
||
EOF = 0 | ||
CUT = 1 | ||
LOOP = 2 | ||
TIMEOUT = 3 | ||
ERR = -1 | ||
|
||
def _pipe_consumer_thread(pipe, queue, identifier, maxlen=None, poll=5.0): | ||
""" | ||
Read the PIPE in a separate thread, and enqueue the chunks of output into | ||
the QUEUE. The chunks are stored as: | ||
[(pipename, chunk), ..., # normal chunks (of bytes) | ||
(pipename, EOF|CUT|ERR), # pre-termination sequence | ||
None, # iter(sentinel) (stop iteration) | ||
] | ||
""" | ||
counter = 0 | ||
fd = pipe.fileno() | ||
try: | ||
while True: | ||
readable, _, exc = select.select([fd], [], [fd], poll) | ||
if exc: | ||
queue.put((identifier, ERR)) | ||
break | ||
|
||
if not readable: | ||
queue.put((identifier, LOOP)) | ||
continue | ||
|
||
chunk = pipe.read1(1024) | ||
if not chunk: | ||
queue.put((identifier, EOF)) | ||
break | ||
|
||
if maxlen: | ||
remains = maxlen - counter | ||
counter += len(chunk) | ||
if counter > maxlen: | ||
chunk = chunk[:remains] | ||
queue.put((identifier, chunk)) | ||
queue.put((identifier, CUT)) | ||
break | ||
|
||
# send a "normal" continuous chunk to the reader | ||
queue.put((identifier, chunk)) | ||
|
||
finally: | ||
# Always notify the parent process that we quit! | ||
queue.put(None) | ||
|
||
|
||
class PosixPipedProcess: | ||
""" | ||
Start command using subprocess.Popen() and allow streamed reading of its | ||
stdout and stderr outputs. | ||
""" | ||
def __init__(self, command, timeout=None, stdout_limit=None, | ||
stderr_limit=None, poll=5.0, **kwargs): | ||
self.command = command | ||
self.kwargs = kwargs | ||
self.stdout_limit = stdout_limit | ||
self.stderr_limit = stderr_limit | ||
self.returncode = None | ||
self.started = None | ||
self._stopreason = None | ||
self.killed = None | ||
self.timeout = timeout | ||
self.poll = poll | ||
|
||
def timeouted(self): | ||
""" Return True if process timeouted """ | ||
if not self.timeout: | ||
return False | ||
return time.time() - self.started > self.timeout | ||
|
||
def has_cut(self): | ||
""" Return true if the stdout_limit or stderr_limit is reached """ | ||
return self.stopreason == CUT | ||
|
||
@property | ||
def stopreason(self): | ||
""" | ||
Get the reason for stop. | ||
""" | ||
return self._stopreason | ||
|
||
|
||
@stopreason.setter | ||
def stopreason(self, reason): | ||
""" | ||
Set the stop reason to EOF, CUT, TIMEOUT or ERR | ||
""" | ||
if self._stopreason != EOF: | ||
return | ||
self._stopreason = reason | ||
|
||
def readchunks(self): | ||
""" | ||
(line, type) | ||
(line, type) | ||
(None: "error string") => Error | ||
(None: None) => EOF | ||
""" | ||
self.returncode = None | ||
self.started = time.time() | ||
self.killed = False | ||
self._stopreason = EOF | ||
que = Queue() | ||
|
||
# EL7: no __enter__ support in EL7 | ||
# pylint: disable=consider-using-with | ||
process = Popen(self.command, stdout=PIPE, stderr=PIPE, **self.kwargs) | ||
tout = Thread(target=_pipe_consumer_thread, | ||
args=[process.stdout, que, "stdout", | ||
self.stdout_limit, self.poll]) | ||
terr = Thread(target=_pipe_consumer_thread, | ||
args=[process.stderr, que, "stderr", | ||
self.stderr_limit, self.poll]) | ||
tout.start() | ||
terr.start() | ||
def _kill(): | ||
if self.killed: | ||
return | ||
self.killed = True | ||
process.kill() | ||
|
||
try: | ||
# we need to find two sentinels for both stdout/stderr | ||
for _ in [1, 2]: | ||
for fd, chunk in iter(que.get, None): | ||
if self.timeouted(): | ||
self.stopreason = TIMEOUT | ||
_kill() | ||
|
||
if not isinstance(chunk, int): | ||
assert chunk | ||
yield (chunk, fd) | ||
continue | ||
|
||
if chunk == LOOP: | ||
continue | ||
|
||
# one of the streams ended | ||
if chunk in [CUT, ERR]: | ||
self.stopreason = chunk | ||
_kill() | ||
continue | ||
|
||
assert chunk == EOF | ||
finally: | ||
# Subprocesses and threads need to finish. | ||
assert process.poll() is not None | ||
terr.join() | ||
tout.join() | ||
self.returncode = process.returncode |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
import sys | ||
import pytest | ||
|
||
if sys.version_info[0] >= 3: | ||
|
||
from copr_common.subprocess_live_output import PosixPipedProcess | ||
|
||
@pytest.mark.parametrize('output', ['stdout', 'stderr']) | ||
def test_posix_live_output_one_stream(output): | ||
redirect = "" if output == 'stdout' else '>&2' | ||
opposite = "stderr" if output == 'stdout' else 'stdout' | ||
proc = PosixPipedProcess([ | ||
"sh", "-c", | ||
"echo {redirect} {output} ; sleep 0.2; " | ||
"echo {redirect} -n {output}".format(output=output, redirect=redirect) | ||
]) | ||
outs = {"stdout": "", "stderr": ""} | ||
for chunk, out in proc.readchunks(): | ||
assert out != opposite | ||
outs[out] += chunk.decode("utf8") | ||
|
||
assert proc.returncode == 0 | ||
assert proc.stopreason == 0 | ||
assert outs[opposite] == "" | ||
assert outs[output] == "{output}\n{output}".format(output=output) | ||
|
||
def test_posix_live_output_both(): | ||
proc = PosixPipedProcess([ | ||
"sh", "-c", | ||
"echo -n output ; " | ||
"echo -n errout >&2 ; " | ||
"echo output ; " | ||
"echo errout >&2" | ||
]) | ||
outs = {"stdout": "", "stderr": ""} | ||
for chunk, out in proc.readchunks(): | ||
outs[out] += chunk.decode("utf8") | ||
|
||
assert proc.returncode == 0 | ||
assert proc.stopreason == 0 | ||
assert not proc.has_cut() | ||
assert outs['stdout'] == "outputoutput\n" | ||
assert outs['stderr'] == "errouterrout\n" | ||
|
||
|
||
@pytest.mark.parametrize("dataset", [ | ||
(["/bin/false"], 1, "", ""), | ||
(["/bin/true"], 0, "", ""), | ||
]) | ||
def test_posix_live_output_exit_status(dataset): | ||
cmd, exp_rc, exp_out, exp_err = dataset | ||
proc = PosixPipedProcess(cmd) | ||
outs = {"stdout": "", "stderr": ""} | ||
for chunk, out in proc.readchunks(): | ||
outs[out] += chunk.decode("utf8") | ||
assert proc.returncode == exp_rc | ||
assert proc.stopreason == 0 | ||
assert not proc.has_cut() | ||
assert outs['stdout'] == exp_out | ||
assert outs['stderr'] == exp_err | ||
|
||
|
||
def test_posix_live_output_timeout(): | ||
cmd = ["bash", "-c", "echo ahoj; sleep 100"] | ||
proc = PosixPipedProcess(cmd, timeout=2, poll=0.1) | ||
outs = {"stdout": "", "stderr": ""} | ||
for chunk, out in proc.readchunks(): | ||
outs[out] += chunk.decode("utf8") | ||
assert proc.timeouted() | ||
assert not proc.has_cut() | ||
assert proc.returncode == -9 | ||
assert outs['stdout'] == "ahoj\n" | ||
assert outs['stderr'] == "" | ||
|
||
|
||
def test_posix_live_output_cut(): | ||
cmd = ["bash", "-c", "echo aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"] | ||
proc = PosixPipedProcess(cmd, stdout_limit=10) | ||
outs = {"stdout": "", "stderr": ""} | ||
for chunk, out in proc.readchunks(): | ||
outs[out] += chunk.decode("utf8") | ||
assert proc.returncode == 0 | ||
assert proc.has_cut() | ||
assert outs['stdout'] == "aaaaaaaaaa" | ||
assert outs['stderr'] == "" | ||
|
||
def test_posix_live_output_cut_long(): | ||
cmd = ["bash", "-c", "while :; do echo -n aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa >&2; done"] | ||
proc = PosixPipedProcess(cmd, stderr_limit=10) | ||
outs = {"stdout": "", "stderr": ""} | ||
for chunk, out in proc.readchunks(): | ||
outs[out] += chunk.decode("utf8") | ||
assert outs['stdout'] == "" | ||
assert outs['stderr'] == "aaaaaaaaaa" | ||
assert proc.returncode == -9 | ||
assert proc.has_cut() |