From 49ae0ddc56c4a93f5d4c3cd05891dc1b9c78679b Mon Sep 17 00:00:00 2001 From: Borja Lorente Date: Fri, 26 Apr 2019 15:19:20 +0100 Subject: [PATCH] Don't try to read from a closed nailgun socket This synchronises background threads that read from the nailgun socket with the DaemonPantsRunner which closes it --- src/python/pants/bin/daemon_pants_runner.py | 66 +++++++++++-------- src/python/pants/java/nailgun_client.py | 9 ++- src/python/pants/java/nailgun_io.py | 19 ++++-- src/python/pants/java/nailgun_protocol.py | 70 ++++++++++++++------- 4 files changed, 105 insertions(+), 59 deletions(-) diff --git a/src/python/pants/bin/daemon_pants_runner.py b/src/python/pants/bin/daemon_pants_runner.py index 02d55473de55..8cef8663a0e4 100644 --- a/src/python/pants/bin/daemon_pants_runner.py +++ b/src/python/pants/bin/daemon_pants_runner.py @@ -21,7 +21,7 @@ from pants.init.logging import encapsulated_global_logger, setup_logging_from_options from pants.init.util import clean_global_runtime_state from pants.java.nailgun_io import NailgunStreamStdinReader, NailgunStreamWriter -from pants.java.nailgun_protocol import ChunkType, NailgunProtocol +from pants.java.nailgun_protocol import ChunkType, MaybeShutdownSocket, NailgunProtocol from pants.pantsd.process_manager import ProcessManager from pants.util.contextutil import hermetic_environment_as, stdio_as from pants.util.socket import teardown_socket @@ -49,7 +49,7 @@ def write_to_file(msg): class NoopExiter(Exiter): def exit(self, result, *args, **kwargs): if result != 0: - write_to_file("LPR, Exiting with code {}!".format(result)) + # TODO this exception was not designed for this, but it happens to do what we want. raise _GracefulTerminationException(result) @@ -59,12 +59,12 @@ class DaemonExiter(Exiter): TODO: This no longer really follows the Exiter API, per-se (or at least, it doesn't call super). """ - def __init__(self, socket): + def __init__(self, maybe_shutdown_socket): # N.B. 
Assuming a fork()'d child, cause os._exit to be called here to avoid the routine # sys.exit behavior. # TODO: The behavior we're avoiding with the use of os._exit should be described and tested. super(DaemonExiter, self).__init__(exiter=os._exit) - self._socket = socket + self._maybe_shutdown_socket = maybe_shutdown_socket self._finalizer = None def set_finalizer(self, finalizer): @@ -78,24 +78,28 @@ def exit(self, result=0, msg=None, *args, **kwargs): self._finalizer() except Exception as e: try: - NailgunProtocol.send_stderr( - self._socket, - '\nUnexpected exception in finalizer: {!r}\n'.format(e) - ) + with self._maybe_shutdown_socket.lock: + NailgunProtocol.send_stderr( + self._maybe_shutdown_socket.socket, + '\nUnexpected exception in finalizer: {!r}\n'.format(e) + ) except Exception: pass write_to_file("DPR, Exiting with code {} and msg {}".format(result, msg)) - # Write a final message to stderr if present. - if msg: - NailgunProtocol.send_stderr(self._socket, msg) + with self._maybe_shutdown_socket.lock: + # Write a final message to stderr if present. + if msg: + NailgunProtocol.send_stderr(self._maybe_shutdown_socket.socket, msg) - # Send an Exit chunk with the result. - NailgunProtocol.send_exit_with_code(self._socket, result) + # Send an Exit chunk with the result. + # This will cause the client to disconnect from the socket. + NailgunProtocol.send_exit_with_code(self._maybe_shutdown_socket.socket, result) - # Shutdown the connected socket. - teardown_socket(self._socket) + # Shutdown the connected socket. + teardown_socket(self._maybe_shutdown_socket.socket) + self._maybe_shutdown_socket.is_shutdown = True class _GracefulTerminationException(Exception): @@ -131,11 +135,13 @@ class DaemonPantsRunner(ProcessManager): @classmethod def create(cls, sock, args, env, services, scheduler_service): + maybe_shutdown_socket = MaybeShutdownSocket(sock) + try: # N.B. This will temporarily redirect stdio in the daemon's pre-fork context # to the nailgun session. 
We'll later do this a second time post-fork, because # threads. - with cls.nailgunned_stdio(sock, env, handle_stdin=False): + with cls.nailgunned_stdio(maybe_shutdown_socket, env, handle_stdin=False): options, _, options_bootstrapper = LocalPantsRunner.parse_options(args, env) subprocess_dir = options.for_global_scope().pants_subprocessdir graph_helper, target_roots, exit_code = scheduler_service.prefork(options, options_bootstrapper) @@ -151,7 +157,7 @@ def create(cls, sock, args, env, services, scheduler_service): subprocess_dir = os.path.join(get_buildroot(), '.pids') return cls( - sock, + maybe_shutdown_socket, args, env, graph_helper, @@ -162,7 +168,7 @@ def create(cls, sock, args, env, services, scheduler_service): deferred_exc ) - def __init__(self, socket, args, env, graph_helper, target_roots, services, + def __init__(self, maybe_shutdown_socket, args, env, graph_helper, target_roots, services, metadata_base_dir, options_bootstrapper, deferred_exc): """ :param socket socket: A connected socket capable of speaking the nailgun protocol. @@ -182,7 +188,7 @@ def __init__(self, socket, args, env, graph_helper, target_roots, services, name=self._make_identity(), metadata_base_dir=metadata_base_dir ) - self._socket = socket + self._maybe_shutdown_socket = maybe_shutdown_socket self._args = args self._env = env self._graph_helper = graph_helper @@ -191,7 +197,7 @@ def __init__(self, socket, args, env, graph_helper, target_roots, services, self._options_bootstrapper = options_bootstrapper self._deferred_exception = deferred_exc - self._exiter = DaemonExiter(socket) + self._exiter = DaemonExiter(maybe_shutdown_socket) def _make_identity(self): """Generate a ProcessManager identity for a given pants run. 
@@ -224,7 +230,7 @@ def finalizer(): @classmethod @contextmanager - def _pipe_stdio(cls, sock, stdin_isatty, stdout_isatty, stderr_isatty, handle_stdin): + def _pipe_stdio(cls, maybe_shutdown_socket, stdin_isatty, stdout_isatty, stderr_isatty, handle_stdin): """Handles stdio redirection in the case of pipes and/or mixed pipes and ttys.""" stdio_writers = ( (ChunkType.STDOUT, stdout_isatty), @@ -238,26 +244,30 @@ def maybe_handle_stdin(want): # TODO: Launching this thread pre-fork to handle @rule input currently results # in an unhandled SIGILL in `src/python/pants/engine/scheduler.py, line 313 in pre_fork`. # More work to be done here in https://github.com/pantsbuild/pants/issues/6005 - with NailgunStreamStdinReader.open(sock, stdin_isatty) as fd: + with NailgunStreamStdinReader.open(maybe_shutdown_socket, stdin_isatty) as fd: yield fd else: with open('/dev/null', 'rb') as fh: yield fh.fileno() + # The NailgunStreamWriter probably shouldn't grab the socket without a lock... with maybe_handle_stdin(handle_stdin) as stdin_fd,\ - NailgunStreamWriter.open_multi(sock, types, ttys) as ((stdout_fd, stderr_fd), writer),\ - stdio_as(stdout_fd=stdout_fd, stderr_fd=stderr_fd, stdin_fd=stdin_fd): + NailgunStreamWriter.open_multi(maybe_shutdown_socket.socket, types, ttys) as ((stdout_fd, stderr_fd), writer),\ + stdio_as(stdout_fd=stdout_fd, stderr_fd=stderr_fd, stdin_fd=stdin_fd): # N.B. This will be passed to and called by the `DaemonExiter` prior to sending an # exit chunk, to avoid any socket shutdown vs write races. stdout, stderr = sys.stdout, sys.stderr def finalizer(): + write_to_file("_pipe_stdio, enter finalizer") try: stdout.flush() stderr.flush() finally: time.sleep(.001) # HACK: Sleep 1ms in the main thread to free the GIL. 
writer.stop() + write_to_file("_pipe_stdio, before writer.join()") writer.join() + write_to_file("_pipe_stdio, after writer.join()") stdout.close() stderr.close() yield finalizer @@ -282,6 +292,7 @@ def nailgunned_stdio(cls, sock, env, handle_stdin=True): handle_stdin ) as finalizer: yield finalizer + write_to_file("DPR, nailgunned_stdio after yielding the finalizer") # TODO: there's no testing for this method, and this caused a user-visible failure -- see #7008! def _raise_deferred_exc(self): @@ -311,11 +322,12 @@ def run(self): # Broadcast our process group ID (in PID form - i.e. negated) to the remote client so # they can send signals (e.g. SIGINT) to all processes in the runners process group. - NailgunProtocol.send_pid(self._socket, os.getpid()) - NailgunProtocol.send_pgrp(self._socket, os.getpgrp() * -1) + with self._maybe_shutdown_socket.lock: + NailgunProtocol.send_pid(self._maybe_shutdown_socket.socket, os.getpid()) + NailgunProtocol.send_pgrp(self._maybe_shutdown_socket.socket, os.getpgrp() * -1) # Invoke a Pants run with stdio redirected and a proxied environment. 
- with self.nailgunned_stdio(self._socket, self._env) as finalizer, \ + with self.nailgunned_stdio(self._maybe_shutdown_socket, self._env) as finalizer, \ hermetic_environment_as(**self._env), \ encapsulated_global_logger(): try: diff --git a/src/python/pants/java/nailgun_client.py b/src/python/pants/java/nailgun_client.py index 38273ffc14b5..b3ac38a6da9c 100644 --- a/src/python/pants/java/nailgun_client.py +++ b/src/python/pants/java/nailgun_client.py @@ -17,7 +17,7 @@ from future.utils import PY3 from pants.java.nailgun_io import NailgunStreamWriter -from pants.java.nailgun_protocol import ChunkType, NailgunProtocol +from pants.java.nailgun_protocol import ChunkType, MaybeShutdownSocket, NailgunProtocol from pants.util.dirutil import safe_file_dump from pants.util.osutil import safe_kill from pants.util.socket import RecvBufferedSocket @@ -115,8 +115,11 @@ def _process_session(self): :raises: :class:`Exception` if the session completes before the timeout, the `reason` argument to .set_exit_timeout() will be raised.""" try: - for chunk_type, payload in self.iter_chunks(self._sock, return_bytes=True, - timeout_object=self): + for chunk_type, payload in self.iter_chunks( + MaybeShutdownSocket(self._sock), + return_bytes=True, + timeout_object=self, + ): # TODO(#6579): assert that we have at this point received all the chunk types in # ChunkType.REQUEST_TYPES, then require PID and PGRP (exactly once?), and then allow any of # ChunkType.EXECUTION_TYPES. diff --git a/src/python/pants/java/nailgun_io.py b/src/python/pants/java/nailgun_io.py index 7b05e6961c56..14a89b90e394 100644 --- a/src/python/pants/java/nailgun_io.py +++ b/src/python/pants/java/nailgun_io.py @@ -84,19 +84,19 @@ class NailgunStreamStdinReader(_StoppableDaemonThread): Runs until the socket is closed. """ - def __init__(self, sock, write_handle): + def __init__(self, maybe_shutdown_socket, write_handle): """ :param socket sock: the socket to read nailgun protocol chunks from. 
:param file write_handle: A file-like (usually the write end of a pipe/pty) onto which to write data decoded from the chunks. """ super(NailgunStreamStdinReader, self).__init__(name=self.__class__.__name__) - self._socket = sock + self._maybe_shutdown_socket = maybe_shutdown_socket self._write_handle = write_handle @classmethod @contextmanager - def open(cls, sock, isatty=False): + def open(cls, maybe_shutdown_socket, isatty=False): # We use a plain pipe here (as opposed to a self-closing pipe), because # NailgunStreamStdinReader will close the file descriptor it's writing to when it's done. # Therefore, when _self_closing_pipe tries to clean up, it will try to close an already closed fd. @@ -104,10 +104,11 @@ def open(cls, sock, isatty=False): # _self_closing_pipe doens't close the write_fd until the pants run is done, and that generates # issues around piping stdin to interactive processes such as REPLs. with _pipe(isatty) as (read_fd, write_fd): - reader = NailgunStreamStdinReader(sock, os.fdopen(os.dup(write_fd), 'wb')) + reader = NailgunStreamStdinReader(maybe_shutdown_socket, os.fdopen(write_fd, 'wb')) with reader.running(): # Instruct the thin client to begin reading and sending stdin. - NailgunProtocol.send_start_reading_input(sock) + with maybe_shutdown_socket.lock: + NailgunProtocol.send_start_reading_input(maybe_shutdown_socket.socket) try: yield read_fd finally: @@ -115,9 +116,12 @@ def open(cls, sock, isatty=False): write_to_file("BL: after reader.running()") write_to_file("BL: after _pipe()") + # TODO: The error is that when the client finishes (and presumably closes the socket), + # the server might still want to read stdin, since this is in another thread. 
+ def run(self): try: - for chunk_type, payload in NailgunProtocol.iter_chunks(self._socket, return_bytes=True): + for chunk_type, payload in NailgunProtocol.iter_chunks(self._maybe_shutdown_socket, return_bytes=True): if self.is_stopped: return @@ -157,6 +161,7 @@ def __init__(self, in_fds, sock, chunk_types, chunk_eof_type, buf_size=None, sel """ super(NailgunStreamWriter, self).__init__(name=self.__class__.__name__) # Validates that we've received file descriptor numbers. + write_to_file("NSW(in_fds = {}, socket = {})".format(in_fds, sock)) self._in_fds = [int(f) for f in in_fds] self._socket = sock self._chunk_eof_type = chunk_eof_type @@ -214,10 +219,12 @@ def run(self): if readable: for fileno in readable: data = os.read(fileno, self._buf_size) + # write_to_file("NSW, read data from stdin (fd={}) {}".format(fileno, data)) if not data: # We've reached EOF. try: + write_to_file("NSW, We have reached EOF, chunk type is {}".format(self._chunk_eof_type)) if self._chunk_eof_type is not None: NailgunProtocol.write_chunk(self._socket, self._chunk_eof_type) finally: diff --git a/src/python/pants/java/nailgun_protocol.py b/src/python/pants/java/nailgun_protocol.py index ba4ab0309605..8228de3bb860 100644 --- a/src/python/pants/java/nailgun_protocol.py +++ b/src/python/pants/java/nailgun_protocol.py @@ -8,6 +8,7 @@ import os import socket import struct +import threading import time from abc import abstractmethod from builtins import bytes, object, str, zip @@ -251,7 +252,7 @@ def maybe_timeout_options(self): """ @classmethod - def iter_chunks(cls, sock, return_bytes=False, timeout_object=None): + def iter_chunks(cls, maybe_shutdown_socket, return_bytes=False, timeout_object=None): """Generates chunks from a connected socket until an Exit chunk is sent or a timeout occurs. :param sock: the socket to read from. 
@@ -261,32 +262,39 @@ def iter_chunks(cls, sock, return_bytes=False, timeout_object=None): :raises: :class:`cls.ProcessStreamTimeout` """ assert(timeout_object is None or isinstance(timeout_object, cls.TimeoutProvider)) - orig_timeout_time = None - timeout_interval = None + + if timeout_object is None: + deadline = None + else: + options = timeout_object.maybe_timeout_options() + if options is None: + deadline = None + else: + deadline = options.start_time + options.interval + while 1: - if orig_timeout_time is not None: - remaining_time = time.time() - (orig_timeout_time + timeout_interval) - if remaining_time > 0: - original_timestamp = datetime.datetime.fromtimestamp(orig_timeout_time).isoformat() + if deadline is not None: + overtime_seconds = time.time() - deadline + if overtime_seconds > 0: + original_timestamp = datetime.datetime.fromtimestamp(deadline).isoformat() raise cls.ProcessStreamTimeout( - "iterating over bytes from nailgun timed out with timeout interval {} starting at {}, " - "overtime seconds: {}" - .format(timeout_interval, original_timestamp, remaining_time)) - elif timeout_object is not None: - opts = timeout_object.maybe_timeout_options() - if opts: - orig_timeout_time = opts.start_time - timeout_interval = opts.interval - continue - remaining_time = None - else: - remaining_time = None + "iterating over bytes from nailgun timed out at {}, overtime seconds: {}" + .format(original_timestamp, overtime_seconds)) - with cls._set_socket_timeout(sock, timeout=remaining_time): - chunk_type, payload = cls.read_chunk(sock, return_bytes) - yield chunk_type, payload - if chunk_type == ChunkType.EXIT: + with maybe_shutdown_socket.lock: + if maybe_shutdown_socket.is_shutdown: break + # We poll with low timeouts because we poll under a lock. This allows the DaemonPantsRunner + # to shut down the socket, and us to notice, pretty quickly.
+ with cls._set_socket_timeout(maybe_shutdown_socket.socket, timeout=0.01): + try: + chunk_type, payload = cls.read_chunk(maybe_shutdown_socket.socket, return_bytes) + except socket.timeout: + # Timeouts are handled by the surrounding loop + continue + yield chunk_type, payload + if chunk_type == ChunkType.EXIT: + break @classmethod def send_start_reading_input(cls, sock): @@ -390,3 +398,19 @@ def ttynames_from_env(cls, env): :returns: A tuple of boolean values indicating ttyname paths or None for (stdin, stdout, stderr). """ return tuple(env.get(cls.TTY_PATH_ENV.format(fd_id)) for fd_id in STDIO_DESCRIPTORS) + + +class MaybeShutdownSocket(object): + """A wrapper around a socket which knows whether it has been shut down. + + Because we may shut down a nailgun socket from one thread, and read from it on another, we use + this wrapper so that a shutting-down thread can signal to a reading thread that it should stop + reading. + + lock guards access to is_shutdown, shutting down the socket, and any calls which need to guarantee + they don't race a shutdown call. + """ + def __init__(self, sock): + self.socket = sock + self.lock = threading.Lock() + self.is_shutdown = False