Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create Pipe wrapper around pipes #7740

Merged
merged 2 commits into from
May 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/python/pants/bin/daemon_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ def maybe_handle_stdin(want):

# TODO https://github.com/pantsbuild/pants/issues/7653
with maybe_handle_stdin(handle_stdin) as stdin_fd,\
NailgunStreamWriter.open_multi(maybe_shutdown_socket.socket, types, ttys) as ((stdout_fd, stderr_fd), writer),\
stdio_as(stdout_fd=stdout_fd, stderr_fd=stderr_fd, stdin_fd=stdin_fd):
NailgunStreamWriter.open_multi(maybe_shutdown_socket.socket, types, ttys) as ((stdout_pipe, stderr_pipe), writer),\
stdio_as(stdout_fd=stdout_pipe.write_fd, stderr_fd=stderr_pipe.write_fd, stdin_fd=stdin_fd):
# N.B. This will be passed to and called by the `DaemonExiter` prior to sending an
# exit chunk, to avoid any socket shutdown vs write races.
stdout, stderr = sys.stdout, sys.stderr
Expand Down
96 changes: 71 additions & 25 deletions src/python/pants/java/nailgun_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,66 @@
from pants.java.nailgun_protocol import ChunkType, NailgunProtocol


@contextmanager
def _pipe(isatty):
r_fd, w_fd = os.openpty() if isatty else os.pipe()
yield (r_fd, w_fd)
class Pipe(object):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth declaring as a datatype?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not for now, as we need default values for self.writable. Have left a TODO with the the link to a PR that will enable that.

"""
Wrapper around OS pipes, that knows whether its write end is closed.

Note that this exposes raw file descriptors,
which means that we could plausibly close one of the ends and re-open it with a different file,
before this class notices. For this reason, it is advised to be very careful with these
file descriptors.

TODO Wrap the read and write operations, so that we don't have to expose raw fds anymore.
This is not possible yet, because stdio_as needs to replace the fds at the OS level.
"""

def __init__(self, read_fd, write_fd):
self.read_fd = read_fd
self.write_fd = write_fd
# TODO Declare as a datatype when #6374 is merged or we have moved to dataclasses.
self.writable = True

def is_writable(self):
"""
If the write end of a pipe closes, the read end might still be open, to allow
readers to finish reading before closing it.
However, there are cases where we still want to know if the write end is closed.

:return: True if the write end of the pipe is open.
"""
if not self.writable:
return False

@contextmanager
def _self_closing_pipe(isatty):
with _pipe(isatty) as (r_fd, w_fd):
try:
yield (r_fd, w_fd)
os.fstat(self.write_fd)
except OSError:
return False
return True

def stop_writing(self):
"""Mark that you wish to close the write end of this pipe."""
self.writable = False

def close(self):
"""Close the reading end of the pipe, which should close the writing end too."""
os.close(self.read_fd)
self.writable = False

@staticmethod
def create(isatty):
"""Open a pipe and create wrapper object."""
read_fd, write_fd = os.openpty() if isatty else os.pipe()
return Pipe(read_fd, write_fd)

@staticmethod
@contextmanager
def self_closing(isatty):
"""Create a pipe and close it when done."""
pipe = Pipe.create(isatty)
try:
yield pipe
finally:
os.close(r_fd)
os.close(w_fd)
pipe.close()


class _StoppableDaemonThread(threading.Thread):
Expand Down Expand Up @@ -94,16 +140,16 @@ def open(cls, maybe_shutdown_socket, isatty=False):
# The alternative is passing an os.dup(write_fd) to NSSR, but then we have the problem where
# _self_closing_pipe doens't close the write_fd until the pants run is done, and that generates
# issues around piping stdin to interactive processes such as REPLs.
with _pipe(isatty) as (read_fd, write_fd):
reader = NailgunStreamStdinReader(maybe_shutdown_socket, os.fdopen(write_fd, 'wb'))
with reader.running():
# Instruct the thin client to begin reading and sending stdin.
with maybe_shutdown_socket.lock:
NailgunProtocol.send_start_reading_input(maybe_shutdown_socket.socket)
try:
yield read_fd
finally:
os.close(read_fd)
pipe = Pipe.create(isatty)
reader = NailgunStreamStdinReader(maybe_shutdown_socket, os.fdopen(pipe.write_fd, 'wb'))
with reader.running():
# Instruct the thin client to begin reading and sending stdin.
with maybe_shutdown_socket.lock:
NailgunProtocol.send_start_reading_input(maybe_shutdown_socket.socket)
try:
yield pipe.read_fd
finally:
pipe.close()

def run(self):
try:
Expand Down Expand Up @@ -181,20 +227,20 @@ def open_multi(cls, sock, chunk_types, isattys, chunk_eof_type=None, buf_size=No

# N.B. This is purely to permit safe handling of a dynamic number of contextmanagers.
with ExitStack() as stack:
read_fds, write_fds = list(zip(
pipes = list(
# Allocate one pipe pair per chunk type provided.
*(stack.enter_context(_self_closing_pipe(isatty)) for isatty in isattys)
))
(stack.enter_context(Pipe.self_closing(isatty)) for isatty in isattys)
)
writer = NailgunStreamWriter(
read_fds,
tuple(pipe.read_fd for pipe in pipes),
sock,
chunk_types,
chunk_eof_type,
buf_size=buf_size,
select_timeout=select_timeout
)
with writer.running():
yield write_fds, writer
yield pipes, writer

def run(self):
while self._in_fds and not self.is_stopped:
Expand Down