Skip to content

Commit

Permalink
net, os: don't wait for Close in blocking mode
Browse files Browse the repository at this point in the history
Updates #7970
Updates #21856
Updates #23111

Change-Id: I0cd0151fcca740c40c3c976f941b04e98e67b0bf
Reviewed-on: https://go-review.googlesource.com/83715
Reviewed-by: Russ Cox <[email protected]>
  • Loading branch information
ianlancetaylor committed Dec 14, 2017
1 parent d0b2467 commit 0f3ab14
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 4 deletions.
25 changes: 23 additions & 2 deletions src/internal/poll/fd_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type FD struct {

// Whether this is a file rather than a network socket.
isFile bool

// Whether this file has been set to blocking mode.
isBlocking bool
}

// Init initializes the FD. The Sysfd field should already be set.
Expand Down Expand Up @@ -76,18 +79,26 @@ func (fd *FD) Close() error {
if !fd.fdmu.increfAndClose() {
return errClosing(fd.isFile)
}

// Unblock any I/O. Once it all unblocks and returns,
// so that it cannot be referring to fd.sysfd anymore,
// the final decref will close fd.sysfd. This should happen
// fairly quickly, since all the I/O is non-blocking, and any
// attempts to block in the pollDesc will return errClosing(fd.isFile).
fd.pd.evict()

// The call to decref will call destroy if there are no other
// references.
err := fd.decref()

// Wait until the descriptor is closed. If this was the only
// reference, it is already closed.
runtime_Semacquire(&fd.csema)
// reference, it is already closed. Only wait if the file has
// not been set to blocking mode, as otherwise any current I/O
// may be blocking, and that would block the Close.
if !fd.isBlocking {
runtime_Semacquire(&fd.csema)
}

return err
}

Expand All @@ -100,6 +111,16 @@ func (fd *FD) Shutdown(how int) error {
return syscall.Shutdown(fd.Sysfd, how)
}

// SetBlocking puts the file into blocking mode.
func (fd *FD) SetBlocking() error {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
fd.isBlocking = true
return syscall.SetNonblock(fd.Sysfd, false)
}

// Darwin and FreeBSD can't read or write 2GB+ files at a time,
// even on 64-bit systems.
// The same is true of socket implementations on many systems.
Expand Down
2 changes: 1 addition & 1 deletion src/net/fd_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (fd *netFD) dup() (f *os.File, err error) {
// This also puts the old fd into blocking mode, meaning that
// I/O will block the thread instead of letting us use the epoll server.
// Everything will still work, just with more threads.
if err = syscall.SetNonblock(ns, false); err != nil {
if err = fd.pfd.SetBlocking(); err != nil {
return nil, os.NewSyscallError("setnonblock", err)
}

Expand Down
2 changes: 1 addition & 1 deletion src/os/file_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (f *File) Fd() uintptr {
// opened in blocking mode. The File will continue to work,
// but any blocking operation will tie up a thread.
if f.nonblock {
syscall.SetNonblock(f.pfd.Sysfd, false)
f.pfd.SetBlocking()
}

return uintptr(f.pfd.Sysfd)
Expand Down
68 changes: 68 additions & 0 deletions src/os/pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"syscall"
"testing"
"time"
Expand Down Expand Up @@ -220,3 +221,70 @@ func TestReadNonblockingFd(t *testing.T) {
t.Errorf("child process failed: %v", err)
}
}

// Test that we don't let a blocking read prevent a close.
func TestCloseWithBlockingRead(t *testing.T) {
r, w, err := os.Pipe()
if err != nil {
t.Fatal(err)
}
defer r.Close()
defer w.Close()

c1, c2 := make(chan bool), make(chan bool)
var wg sync.WaitGroup

wg.Add(1)
go func(c chan bool) {
defer wg.Done()
// Give the other goroutine a chance to enter the Read
// or Write call. This is sloppy but the test will
// pass even if we close before the read/write.
time.Sleep(20 * time.Millisecond)

if err := r.Close(); err != nil {
t.Error(err)
}
close(c)
}(c1)

// Calling Fd will put the file into blocking mode.
_ = r.Fd()

wg.Add(1)
go func(c chan bool) {
defer wg.Done()
var b [1]byte
_, err = r.Read(b[:])
close(c)
if err == nil {
t.Error("I/O on closed pipe unexpectedly succeeded")
}
}(c2)

for c1 != nil || c2 != nil {
select {
case <-c1:
c1 = nil
// r.Close has completed, but the blocking Read
// is hanging. Close the writer to unblock it.
w.Close()
case <-c2:
c2 = nil
case <-time.After(1 * time.Second):
switch {
case c1 != nil && c2 != nil:
t.Error("timed out waiting for Read and Close")
w.Close()
case c1 != nil:
t.Error("timed out waiting for Close")
case c2 != nil:
t.Error("timed out waiting for Read")
default:
t.Error("impossible case")
}
}
}

wg.Wait()
}

0 comments on commit 0f3ab14

Please sign in to comment.