Skip to content

Commit

Permalink
Create Pipe wrapper around pipes (#7740)
Browse files Browse the repository at this point in the history
### Problem

Dealing with raw file descriptors is confusing at best and error-prone at worst.

### Solution

Define a `Pipe` abstraction that owns (read: "managest the lifetimes of") the write and read ends of a pipe, as well as provide convenience methods to query the state of the fds.

### Result

It's now a bit more obvious what pipes are open.

Part one of the fix for #7465
  • Loading branch information
blorente authored and illicitonion committed May 22, 2019
1 parent eab4321 commit a0e48dc
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 27 deletions.
4 changes: 2 additions & 2 deletions src/python/pants/bin/daemon_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ def maybe_handle_stdin(want):

# TODO https://github.com/pantsbuild/pants/issues/7653
with maybe_handle_stdin(handle_stdin) as stdin_fd,\
NailgunStreamWriter.open_multi(maybe_shutdown_socket.socket, types, ttys) as ((stdout_fd, stderr_fd), writer),\
stdio_as(stdout_fd=stdout_fd, stderr_fd=stderr_fd, stdin_fd=stdin_fd):
NailgunStreamWriter.open_multi(maybe_shutdown_socket.socket, types, ttys) as ((stdout_pipe, stderr_pipe), writer),\
stdio_as(stdout_fd=stdout_pipe.write_fd, stderr_fd=stderr_pipe.write_fd, stdin_fd=stdin_fd):
# N.B. This will be passed to and called by the `DaemonExiter` prior to sending an
# exit chunk, to avoid any socket shutdown vs write races.
stdout, stderr = sys.stdout, sys.stderr
Expand Down
96 changes: 71 additions & 25 deletions src/python/pants/java/nailgun_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,66 @@
from pants.java.nailgun_protocol import ChunkType, NailgunProtocol


@contextmanager
def _pipe(isatty):
r_fd, w_fd = os.openpty() if isatty else os.pipe()
yield (r_fd, w_fd)
class Pipe(object):
"""
Wrapper around OS pipes, that knows whether its write end is closed.
Note that this exposes raw file descriptors,
which means that we could plausibly close one of the ends and re-open it with a different file,
before this class notices. For this reason, it is advised to be very careful with these
file descriptors.
TODO Wrap the read and write operations, so that we don't have to expose raw fds anymore.
This is not possible yet, because stdio_as needs to replace the fds at the OS level.
"""

def __init__(self, read_fd, write_fd):
self.read_fd = read_fd
self.write_fd = write_fd
# TODO Declare as a datatype when #6374 is merged or we have moved to dataclasses.
self.writable = True

def is_writable(self):
"""
If the write end of a pipe closes, the read end might still be open, to allow
readers to finish reading before closing it.
However, there are cases where we still want to know if the write end is closed.
:return: True if the write end of the pipe is open.
"""
if not self.writable:
return False

@contextmanager
def _self_closing_pipe(isatty):
with _pipe(isatty) as (r_fd, w_fd):
try:
yield (r_fd, w_fd)
os.fstat(self.write_fd)
except OSError:
return False
return True

def stop_writing(self):
"""Mark that you wish to close the write end of this pipe."""
self.writable = False

def close(self):
"""Close the reading end of the pipe, which should close the writing end too."""
os.close(self.read_fd)
self.writable = False

@staticmethod
def create(isatty):
"""Open a pipe and create wrapper object."""
read_fd, write_fd = os.openpty() if isatty else os.pipe()
return Pipe(read_fd, write_fd)

@staticmethod
@contextmanager
def self_closing(isatty):
"""Create a pipe and close it when done."""
pipe = Pipe.create(isatty)
try:
yield pipe
finally:
os.close(r_fd)
os.close(w_fd)
pipe.close()


class _StoppableDaemonThread(threading.Thread):
Expand Down Expand Up @@ -94,16 +140,16 @@ def open(cls, maybe_shutdown_socket, isatty=False):
# The alternative is passing an os.dup(write_fd) to NSSR, but then we have the problem where
# _self_closing_pipe doens't close the write_fd until the pants run is done, and that generates
# issues around piping stdin to interactive processes such as REPLs.
with _pipe(isatty) as (read_fd, write_fd):
reader = NailgunStreamStdinReader(maybe_shutdown_socket, os.fdopen(write_fd, 'wb'))
with reader.running():
# Instruct the thin client to begin reading and sending stdin.
with maybe_shutdown_socket.lock:
NailgunProtocol.send_start_reading_input(maybe_shutdown_socket.socket)
try:
yield read_fd
finally:
os.close(read_fd)
pipe = Pipe.create(isatty)
reader = NailgunStreamStdinReader(maybe_shutdown_socket, os.fdopen(pipe.write_fd, 'wb'))
with reader.running():
# Instruct the thin client to begin reading and sending stdin.
with maybe_shutdown_socket.lock:
NailgunProtocol.send_start_reading_input(maybe_shutdown_socket.socket)
try:
yield pipe.read_fd
finally:
pipe.close()

def run(self):
try:
Expand Down Expand Up @@ -181,20 +227,20 @@ def open_multi(cls, sock, chunk_types, isattys, chunk_eof_type=None, buf_size=No

# N.B. This is purely to permit safe handling of a dynamic number of contextmanagers.
with ExitStack() as stack:
read_fds, write_fds = list(zip(
pipes = list(
# Allocate one pipe pair per chunk type provided.
*(stack.enter_context(_self_closing_pipe(isatty)) for isatty in isattys)
))
(stack.enter_context(Pipe.self_closing(isatty)) for isatty in isattys)
)
writer = NailgunStreamWriter(
read_fds,
tuple(pipe.read_fd for pipe in pipes),
sock,
chunk_types,
chunk_eof_type,
buf_size=buf_size,
select_timeout=select_timeout
)
with writer.running():
yield write_fds, writer
yield pipes, writer

def run(self):
while self._in_fds and not self.is_stopped:
Expand Down

0 comments on commit a0e48dc

Please sign in to comment.