Skip to content

Commit

Permalink
Don't try to read from a closed nailgun socket
Browse files Browse the repository at this point in the history
This synchronises background threads that read from the nailgun socket
with the DaemonPantsRunner which closes it
  • Loading branch information
Borja Lorente authored and ity committed Apr 30, 2019
1 parent a7f15b7 commit 49ae0dd
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 59 deletions.
66 changes: 39 additions & 27 deletions src/python/pants/bin/daemon_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from pants.init.logging import encapsulated_global_logger, setup_logging_from_options
from pants.init.util import clean_global_runtime_state
from pants.java.nailgun_io import NailgunStreamStdinReader, NailgunStreamWriter
from pants.java.nailgun_protocol import ChunkType, NailgunProtocol
from pants.java.nailgun_protocol import ChunkType, MaybeShutdownSocket, NailgunProtocol
from pants.pantsd.process_manager import ProcessManager
from pants.util.contextutil import hermetic_environment_as, stdio_as
from pants.util.socket import teardown_socket
Expand Down Expand Up @@ -49,7 +49,7 @@ def write_to_file(msg):
class NoopExiter(Exiter):
def exit(self, result, *args, **kwargs):
if result != 0:
write_to_file("LPR, Exiting with code {}!".format(result))
# TODO this exception was not designed for this, but it happens to do what we want.
raise _GracefulTerminationException(result)


Expand All @@ -59,12 +59,12 @@ class DaemonExiter(Exiter):
TODO: This no longer really follows the Exiter API, per-se (or at least, it doesn't call super).
"""

def __init__(self, socket):
def __init__(self, maybe_shutdown_socket):
# N.B. Assuming a fork()'d child, cause os._exit to be called here to avoid the routine
# sys.exit behavior.
# TODO: The behavior we're avoiding with the use of os._exit should be described and tested.
super(DaemonExiter, self).__init__(exiter=os._exit)
self._socket = socket
self._maybe_shutdown_socket = maybe_shutdown_socket
self._finalizer = None

def set_finalizer(self, finalizer):
Expand All @@ -78,24 +78,28 @@ def exit(self, result=0, msg=None, *args, **kwargs):
self._finalizer()
except Exception as e:
try:
NailgunProtocol.send_stderr(
self._socket,
'\nUnexpected exception in finalizer: {!r}\n'.format(e)
)
with self._maybe_shutdown_socket.lock:
NailgunProtocol.send_stderr(
self._maybe_shutdown_socket.socket,
'\nUnexpected exception in finalizer: {!r}\n'.format(e)
)
except Exception:
pass

write_to_file("DPR, Exiting with code {} and msg {}".format(result, msg))

# Write a final message to stderr if present.
if msg:
NailgunProtocol.send_stderr(self._socket, msg)
with self._maybe_shutdown_socket.lock:
# Write a final message to stderr if present.
if msg:
NailgunProtocol.send_stderr(self._maybe_shutdown_socket.socket, msg)

# Send an Exit chunk with the result.
NailgunProtocol.send_exit_with_code(self._socket, result)
# Send an Exit chunk with the result.
# This will cause the client to disconnect from the socket.
NailgunProtocol.send_exit_with_code(self._maybe_shutdown_socket.socket, result)

# Shutdown the connected socket.
teardown_socket(self._socket)
# Shutdown the connected socket.
teardown_socket(self._maybe_shutdown_socket.socket)
self._maybe_shutdown_socket.is_shutdown = True


class _GracefulTerminationException(Exception):
Expand Down Expand Up @@ -131,11 +135,13 @@ class DaemonPantsRunner(ProcessManager):

@classmethod
def create(cls, sock, args, env, services, scheduler_service):
maybe_shutdown_socket = MaybeShutdownSocket(sock)

try:
# N.B. This will temporarily redirect stdio in the daemon's pre-fork context
# to the nailgun session. We'll later do this a second time post-fork, because
# threads.
with cls.nailgunned_stdio(sock, env, handle_stdin=False):
with cls.nailgunned_stdio(maybe_shutdown_socket, env, handle_stdin=False):
options, _, options_bootstrapper = LocalPantsRunner.parse_options(args, env)
subprocess_dir = options.for_global_scope().pants_subprocessdir
graph_helper, target_roots, exit_code = scheduler_service.prefork(options, options_bootstrapper)
Expand All @@ -151,7 +157,7 @@ def create(cls, sock, args, env, services, scheduler_service):
subprocess_dir = os.path.join(get_buildroot(), '.pids')

return cls(
sock,
maybe_shutdown_socket,
args,
env,
graph_helper,
Expand All @@ -162,7 +168,7 @@ def create(cls, sock, args, env, services, scheduler_service):
deferred_exc
)

def __init__(self, socket, args, env, graph_helper, target_roots, services,
def __init__(self, maybe_shutdown_socket, args, env, graph_helper, target_roots, services,
metadata_base_dir, options_bootstrapper, deferred_exc):
"""
:param socket socket: A connected socket capable of speaking the nailgun protocol.
Expand All @@ -182,7 +188,7 @@ def __init__(self, socket, args, env, graph_helper, target_roots, services,
name=self._make_identity(),
metadata_base_dir=metadata_base_dir
)
self._socket = socket
self._maybe_shutdown_socket = maybe_shutdown_socket
self._args = args
self._env = env
self._graph_helper = graph_helper
Expand All @@ -191,7 +197,7 @@ def __init__(self, socket, args, env, graph_helper, target_roots, services,
self._options_bootstrapper = options_bootstrapper
self._deferred_exception = deferred_exc

self._exiter = DaemonExiter(socket)
self._exiter = DaemonExiter(maybe_shutdown_socket)

def _make_identity(self):
"""Generate a ProcessManager identity for a given pants run.
Expand Down Expand Up @@ -224,7 +230,7 @@ def finalizer():

@classmethod
@contextmanager
def _pipe_stdio(cls, sock, stdin_isatty, stdout_isatty, stderr_isatty, handle_stdin):
def _pipe_stdio(cls, maybe_shutdown_socket, stdin_isatty, stdout_isatty, stderr_isatty, handle_stdin):
"""Handles stdio redirection in the case of pipes and/or mixed pipes and ttys."""
stdio_writers = (
(ChunkType.STDOUT, stdout_isatty),
Expand All @@ -238,26 +244,30 @@ def maybe_handle_stdin(want):
# TODO: Launching this thread pre-fork to handle @rule input currently results
# in an unhandled SIGILL in `src/python/pants/engine/scheduler.py, line 313 in pre_fork`.
# More work to be done here in https://github.com/pantsbuild/pants/issues/6005
with NailgunStreamStdinReader.open(sock, stdin_isatty) as fd:
with NailgunStreamStdinReader.open(maybe_shutdown_socket, stdin_isatty) as fd:
yield fd
else:
with open('/dev/null', 'rb') as fh:
yield fh.fileno()

# The NailgunStreamWriter probably shouldn't grab the socket without a lock...
with maybe_handle_stdin(handle_stdin) as stdin_fd,\
NailgunStreamWriter.open_multi(sock, types, ttys) as ((stdout_fd, stderr_fd), writer),\
stdio_as(stdout_fd=stdout_fd, stderr_fd=stderr_fd, stdin_fd=stdin_fd):
NailgunStreamWriter.open_multi(maybe_shutdown_socket.socket, types, ttys) as ((stdout_fd, stderr_fd), writer),\
stdio_as(stdout_fd=stdout_fd, stderr_fd=stderr_fd, stdin_fd=stdin_fd):
# N.B. This will be passed to and called by the `DaemonExiter` prior to sending an
# exit chunk, to avoid any socket shutdown vs write races.
stdout, stderr = sys.stdout, sys.stderr
def finalizer():
write_to_file("_pipe_stdio, enter finalizer")
try:
stdout.flush()
stderr.flush()
finally:
time.sleep(.001) # HACK: Sleep 1ms in the main thread to free the GIL.
writer.stop()
write_to_file("_pipe_stdio, before writer.join()")
writer.join()
write_to_file("_pipe_stdio, after writer.join()")
stdout.close()
stderr.close()
yield finalizer
Expand All @@ -282,6 +292,7 @@ def nailgunned_stdio(cls, sock, env, handle_stdin=True):
handle_stdin
) as finalizer:
yield finalizer
write_to_file("DPR, nailgunned_stdio after yielding the finalizer")

# TODO: there's no testing for this method, and this caused a user-visible failure -- see #7008!
def _raise_deferred_exc(self):
Expand Down Expand Up @@ -311,11 +322,12 @@ def run(self):

# Broadcast our process group ID (in PID form - i.e. negated) to the remote client so
# they can send signals (e.g. SIGINT) to all processes in the runners process group.
NailgunProtocol.send_pid(self._socket, os.getpid())
NailgunProtocol.send_pgrp(self._socket, os.getpgrp() * -1)
with self._maybe_shutdown_socket.lock:
NailgunProtocol.send_pid(self._maybe_shutdown_socket.socket, os.getpid())
NailgunProtocol.send_pgrp(self._maybe_shutdown_socket.socket, os.getpgrp() * -1)

# Invoke a Pants run with stdio redirected and a proxied environment.
with self.nailgunned_stdio(self._socket, self._env) as finalizer, \
with self.nailgunned_stdio(self._maybe_shutdown_socket, self._env) as finalizer, \
hermetic_environment_as(**self._env), \
encapsulated_global_logger():
try:
Expand Down
9 changes: 6 additions & 3 deletions src/python/pants/java/nailgun_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from future.utils import PY3

from pants.java.nailgun_io import NailgunStreamWriter
from pants.java.nailgun_protocol import ChunkType, NailgunProtocol
from pants.java.nailgun_protocol import ChunkType, MaybeShutdownSocket, NailgunProtocol
from pants.util.dirutil import safe_file_dump
from pants.util.osutil import safe_kill
from pants.util.socket import RecvBufferedSocket
Expand Down Expand Up @@ -115,8 +115,11 @@ def _process_session(self):
:raises: :class:`Exception` if the session completes before the timeout, the `reason` argument
to .set_exit_timeout() will be raised."""
try:
for chunk_type, payload in self.iter_chunks(self._sock, return_bytes=True,
timeout_object=self):
for chunk_type, payload in self.iter_chunks(
MaybeShutdownSocket(self._sock),
return_bytes=True,
timeout_object=self,
):
# TODO(#6579): assert that we have at this point received all the chunk types in
# ChunkType.REQUEST_TYPES, then require PID and PGRP (exactly once?), and then allow any of
# ChunkType.EXECUTION_TYPES.
Expand Down
19 changes: 13 additions & 6 deletions src/python/pants/java/nailgun_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,40 +84,44 @@ class NailgunStreamStdinReader(_StoppableDaemonThread):
Runs until the socket is closed.
"""

def __init__(self, sock, write_handle):
def __init__(self, maybe_shutdown_socket, write_handle):
"""
:param socket sock: the socket to read nailgun protocol chunks from.
:param file write_handle: A file-like (usually the write end of a pipe/pty) onto which
to write data decoded from the chunks.
"""
super(NailgunStreamStdinReader, self).__init__(name=self.__class__.__name__)
self._socket = sock
self._maybe_shutdown_socket = maybe_shutdown_socket
self._write_handle = write_handle

@classmethod
@contextmanager
def open(cls, sock, isatty=False):
def open(cls, maybe_shutdown_socket, isatty=False):
# We use a plain pipe here (as opposed to a self-closing pipe), because
# NailgunStreamStdinReader will close the file descriptor it's writing to when it's done.
# Therefore, when _self_closing_pipe tries to clean up, it will try to close an already closed fd.
# The alternative is passing an os.dup(write_fd) to NSSR, but then we have the problem where
# _self_closing_pipe doesn't close the write_fd until the pants run is done, and that generates
# issues around piping stdin to interactive processes such as REPLs.
with _pipe(isatty) as (read_fd, write_fd):
reader = NailgunStreamStdinReader(sock, os.fdopen(os.dup(write_fd), 'wb'))
reader = NailgunStreamStdinReader(maybe_shutdown_socket, os.fdopen(write_fd, 'wb'))
with reader.running():
# Instruct the thin client to begin reading and sending stdin.
NailgunProtocol.send_start_reading_input(sock)
with maybe_shutdown_socket.lock:
NailgunProtocol.send_start_reading_input(maybe_shutdown_socket.socket)
try:
yield read_fd
finally:
os.close(read_fd)
write_to_file("BL: after reader.running()")
write_to_file("BL: after _pipe()")

# TODO: The error is that when the client finishes (and presumably closes the socket),
# the server might still want to read stdin, since this is in another thread.

def run(self):
try:
for chunk_type, payload in NailgunProtocol.iter_chunks(self._socket, return_bytes=True):
for chunk_type, payload in NailgunProtocol.iter_chunks(self._maybe_shutdown_socket, return_bytes=True):
if self.is_stopped:
return

Expand Down Expand Up @@ -157,6 +161,7 @@ def __init__(self, in_fds, sock, chunk_types, chunk_eof_type, buf_size=None, sel
"""
super(NailgunStreamWriter, self).__init__(name=self.__class__.__name__)
# Validates that we've received file descriptor numbers.
write_to_file("NSW(in_fds = {}, socket = {})".format(in_fds, sock))
self._in_fds = [int(f) for f in in_fds]
self._socket = sock
self._chunk_eof_type = chunk_eof_type
Expand Down Expand Up @@ -214,10 +219,12 @@ def run(self):
if readable:
for fileno in readable:
data = os.read(fileno, self._buf_size)
# write_to_file("NSW, read data from stdin (fd={}) {}".format(fileno, data))

if not data:
# We've reached EOF.
try:
write_to_file("NSW, We have reached EOF, chunk type is {}".format(self._chunk_eof_type))
if self._chunk_eof_type is not None:
NailgunProtocol.write_chunk(self._socket, self._chunk_eof_type)
finally:
Expand Down
70 changes: 47 additions & 23 deletions src/python/pants/java/nailgun_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import os
import socket
import struct
import threading
import time
from abc import abstractmethod
from builtins import bytes, object, str, zip
Expand Down Expand Up @@ -251,7 +252,7 @@ def maybe_timeout_options(self):
"""

@classmethod
def iter_chunks(cls, sock, return_bytes=False, timeout_object=None):
def iter_chunks(cls, maybe_shutdown_socket, return_bytes=False, timeout_object=None):
"""Generates chunks from a connected socket until an Exit chunk is sent or a timeout occurs.
:param sock: the socket to read from.
Expand All @@ -261,32 +262,39 @@ def iter_chunks(cls, sock, return_bytes=False, timeout_object=None):
:raises: :class:`cls.ProcessStreamTimeout`
"""
assert(timeout_object is None or isinstance(timeout_object, cls.TimeoutProvider))
orig_timeout_time = None
timeout_interval = None

if timeout_object is None:
deadline = None
else:
options = timeout_object.maybe_timeout_options()
if options is None:
deadline = None
else:
deadline = options.start_time + options.interval

while 1:
if orig_timeout_time is not None:
remaining_time = time.time() - (orig_timeout_time + timeout_interval)
if remaining_time > 0:
original_timestamp = datetime.datetime.fromtimestamp(orig_timeout_time).isoformat()
if deadline is not None:
overtime_seconds = deadline - time.time()
if overtime_seconds > 0:
original_timestamp = datetime.datetime.fromtimestamp(deadline).isoformat()
raise cls.ProcessStreamTimeout(
"iterating over bytes from nailgun timed out with timeout interval {} starting at {}, "
"overtime seconds: {}"
.format(timeout_interval, original_timestamp, remaining_time))
elif timeout_object is not None:
opts = timeout_object.maybe_timeout_options()
if opts:
orig_timeout_time = opts.start_time
timeout_interval = opts.interval
continue
remaining_time = None
else:
remaining_time = None
"iterating over bytes from nailgun timed out at {}, overtime seconds: {}"
.format(original_timestamp, overtime_seconds))

with cls._set_socket_timeout(sock, timeout=remaining_time):
chunk_type, payload = cls.read_chunk(sock, return_bytes)
yield chunk_type, payload
if chunk_type == ChunkType.EXIT:
with maybe_shutdown_socket.lock:
if maybe_shutdown_socket.is_shutdown:
break
# We poll with low timeouts because we poll under a lock. This allows the DaemonPantsRunner
# to shut down the socket, and us to notice, pretty quickly.
with cls._set_socket_timeout(maybe_shutdown_socket.socket, timeout=0.01):
try:
chunk_type, payload = cls.read_chunk(maybe_shutdown_socket.socket, return_bytes)
except socket.timeout:
# Timeouts are handled by the surrounding loop
continue
yield chunk_type, payload
if chunk_type == ChunkType.EXIT:
break

@classmethod
def send_start_reading_input(cls, sock):
Expand Down Expand Up @@ -390,3 +398,19 @@ def ttynames_from_env(cls, env):
:returns: A tuple of boolean values indicating ttyname paths or None for (stdin, stdout, stderr).
"""
return tuple(env.get(cls.TTY_PATH_ENV.format(fd_id)) for fd_id in STDIO_DESCRIPTORS)


class MaybeShutdownSocket(object):
  """A wrapper around a socket which knows whether it has been shut down.

  Because we may shut down a nailgun socket from one thread, and read from it on another, we use
  this wrapper so that a shutting-down thread can signal to a reading thread that it should stop
  reading.

  `lock` guards access to `is_shutdown`, shutting down the socket, and any calls which need to
  guarantee they don't race a shutdown call. Nothing in this class acquires the lock itself:
  callers are expected to hold it (`with maybe_shutdown_socket.lock:`) around every read of
  `is_shutdown` and every use of `socket`.
  """

  def __init__(self, sock):
    """
    :param sock: the underlying socket object to wrap; it is stored as-is.
    """
    # The wrapped socket. Use it only while holding `lock`.
    self.socket = sock
    # A plain (non-reentrant) lock: a thread must not try to acquire it again while holding it.
    self.lock = threading.Lock()
    # Starts False; whoever shuts the socket down sets this to True while holding `lock`.
    self.is_shutdown = False

0 comments on commit 49ae0dd

Please sign in to comment.