From c53532b8334a15c6a9fcadab695e10bc095dc9bb Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 20 Dec 2017 22:24:15 -0800 Subject: [PATCH 1/4] mfs: cleanup logic around modes * Ensure we don't flush unnecessarily. * Make it easier to work with modes. * Better handle closed files. License: MIT Signed-off-by: Steven Allen --- mfs/fd.go | 134 +++++++++++++++++++++++++++--------------------- mfs/file.go | 53 ++++++++++++++----- mfs/mfs_test.go | 22 ++++---- mfs/system.go | 1 + 4 files changed, 129 insertions(+), 81 deletions(-) diff --git a/mfs/fd.go b/mfs/fd.go index 9eb36931644..c9d344878dc 100644 --- a/mfs/fd.go +++ b/mfs/fd.go @@ -1,12 +1,21 @@ package mfs import ( + "context" "fmt" "io" mod "github.com/ipfs/go-ipfs/unixfs/mod" - context "context" + node "gx/ipfs/QmNwUEK7QbwSqyKBu3mMtToo8SUc6wQJ7gdZq4gGGJqfnf/go-ipld-format" +) + +type state uint8 + +const ( + stateFlushed state = iota + stateSynced + stateDirty ) type FileDescriptor interface { @@ -26,13 +35,12 @@ type FileDescriptor interface { } type fileDescriptor struct { - inode *File - mod *mod.DagModifier - perms int - sync bool - hasChanges bool + inode *File + mod *mod.DagModifier + mode mode + sync bool - closed bool + state state } // Size returns the size of the file referred to by this descriptor @@ -42,34 +50,34 @@ func (fi *fileDescriptor) Size() (int64, error) { // Truncate truncates the file to size func (fi *fileDescriptor) Truncate(size int64) error { - if fi.perms == OpenReadOnly { - return fmt.Errorf("cannot call truncate on readonly file descriptor") + if !fi.mode.CanWrite() { + return fmt.Errorf("truncate failed: file is %s", fi.mode) } - fi.hasChanges = true + fi.state = stateDirty return fi.mod.Truncate(size) } // Write writes the given data to the file at its current offset func (fi *fileDescriptor) Write(b []byte) (int, error) { - if fi.perms == OpenReadOnly { - return 0, fmt.Errorf("cannot write on not writeable descriptor") + if !fi.mode.CanWrite() { + return 0, fmt.Errorf("write failed: file is %s", fi.mode) } - fi.hasChanges = true + fi.state = stateDirty return fi.mod.Write(b) } // Read reads into the given buffer from the current offset func (fi *fileDescriptor) Read(b []byte) (int, error) { - if fi.perms == OpenWriteOnly { - return 0, fmt.Errorf("cannot read on write-only descriptor") + if !fi.mode.CanRead() { + return 0, fmt.Errorf("read failed: file is %s", fi.mode) } return fi.mod.Read(b) } // Read reads into the given buffer from the current offset func (fi *fileDescriptor) CtxReadFull(ctx context.Context, b []byte) (int, error) { - if fi.perms == OpenWriteOnly { - return 0, fmt.Errorf("cannot read on write-only descriptor") + if !fi.mode.CanRead() { + return 0, fmt.Errorf("read failed: file is %s", fi.mode) } return fi.mod.CtxReadFull(ctx, b) } @@ -77,33 +85,17 @@ func (fi *fileDescriptor) CtxReadFull(ctx context.Context, b []byte) (int, error // Close flushes, then propogates the modified dag node up the directory structure // and signals a republish to occur func (fi *fileDescriptor) Close() error { - defer func() { - switch fi.perms { - case OpenReadOnly: - fi.inode.desclock.RUnlock() - case OpenWriteOnly, OpenReadWrite: - fi.inode.desclock.Unlock() - } - }() - - if fi.closed { - panic("attempted to close file descriptor twice!") + if fi.mode == Closed { + return ErrClosed } - - if fi.hasChanges { - err := fi.mod.Sync() - if err != nil { - return err - } - - fi.hasChanges = false - - // explicitly stay locked for flushUp call, - // it will manage the lock for us - return fi.flushUp(fi.sync) + if fi.mode.CanWrite() { + defer fi.inode.desclock.Unlock() + } else if fi.mode.CanRead() { + defer fi.inode.desclock.RUnlock() } + fi.mode = Closed - return nil + return fi.flushUp(fi.sync) } func (fi *fileDescriptor) Sync() error { @@ -117,35 +109,61 @@ func (fi *fileDescriptor) Flush() error { // flushUp syncs the file and adds it to the dagservice // it *must* be called with the File's lock taken func (fi *fileDescriptor) flushUp(fullsync bool) error { - nd, err := fi.mod.GetNode() - if err != nil { - return err - } + var nd node.Node + switch fi.state { + case stateDirty: + // calls mod.Sync internally. + var err error + nd, err = fi.mod.GetNode() + if err != nil { + return err + } - _, err = fi.inode.dserv.Add(nd) - if err != nil { - return err - } + _, err = fi.inode.dserv.Add(nd) + if err != nil { + return err + } - fi.inode.nodelk.Lock() - fi.inode.node = nd - name := fi.inode.name - parent := fi.inode.parent - fi.inode.nodelk.Unlock() + fi.inode.nodelk.Lock() + fi.inode.node = nd + fi.inode.nodelk.Unlock() + fi.state = stateSynced + fallthrough + case stateSynced: + if !fullsync { + return nil + } + if nd == nil { + fi.inode.nodelk.Lock() + nd = fi.inode.node + fi.inode.nodelk.Unlock() + } - return parent.closeChild(name, nd, fullsync) + if err := fi.inode.parent.closeChild(fi.inode.name, nd, fullsync); err != nil { + return err + } + fi.state = stateFlushed + return nil + case stateFlushed: + return nil + default: + panic("invalid state") + } } // Seek implements io.Seeker func (fi *fileDescriptor) Seek(offset int64, whence int) (int64, error) { + if fi.mode == Closed { + return 0, ErrClosed + } return fi.mod.Seek(offset, whence) } // Write At writes the given bytes at the offset 'at' func (fi *fileDescriptor) WriteAt(b []byte, at int64) (int, error) { - if fi.perms == OpenReadOnly { - return 0, fmt.Errorf("cannot write on not writeable descriptor") + if !fi.mode.CanWrite() { + return 0, fmt.Errorf("write-at failed: file is %s", fi.mode) } - fi.hasChanges = true + fi.state = stateDirty return fi.mod.WriteAt(b, at) } diff --git a/mfs/file.go b/mfs/file.go index 36302688689..81542071910 100644 --- a/mfs/file.go +++ b/mfs/file.go @@ -42,13 +42,38 @@ func NewFile(name string, node node.Node, parent childCloser, dserv dag.DAGServi return fi, nil } +type mode uint8 + const ( - OpenReadOnly = iota - OpenWriteOnly - OpenReadWrite + Closed mode = 0x0 // No access. Needs to be 0. + ModeRead mode = 1 << 0 + ModeWrite mode = 1 << 1 + ModeReadWrite mode = ModeWrite | ModeRead ) -func (fi *File) Open(flags int, sync bool) (FileDescriptor, error) { +func (m mode) CanRead() bool { + return m&ModeRead != 0 +} +func (m mode) CanWrite() bool { + return m&ModeWrite != 0 +} + +func (m mode) String() string { + switch m { + case ModeWrite: + return "write-only" + case ModeRead: + return "read-only" + case ModeReadWrite: + return "read-write" + case Closed: + return "closed" + default: + return "invalid" + } +} + +func (fi *File) Open(mode mode, sync bool) (FileDescriptor, error) { fi.nodelk.Lock() node := fi.node fi.nodelk.Unlock() @@ -72,16 +97,20 @@ func (fi *File) Open(flags int, sync bool) (FileDescriptor, error) { // Ok as well. } - switch flags { - case OpenReadOnly: - fi.desclock.RLock() - case OpenWriteOnly, OpenReadWrite: - fi.desclock.Lock() - default: + if mode > 0x3 { // TODO: support other modes return nil, fmt.Errorf("mode not supported") } + if mode.CanWrite() { + fi.desclock.Lock() + } else if mode.CanRead() { + fi.desclock.RLock() + } else { + // For now, need to open with either read or write perm. + return nil, fmt.Errorf("mode not supported") + } + dmod, err := mod.NewDagModifier(context.TODO(), node, fi.dserv, chunk.DefaultSplitter) if err != nil { return nil, err @@ -90,7 +119,7 @@ func (fi *File) Open(flags int, sync bool) (FileDescriptor, error) { return &fileDescriptor{ inode: fi, - perms: flags, + mode: mode, sync: sync, mod: dmod, }, nil @@ -123,7 +152,7 @@ func (fi *File) GetNode() (node.Node, error) { func (fi *File) Flush() error { // open the file in fullsync mode - fd, err := fi.Open(OpenWriteOnly, true) + fd, err := fi.Open(ModeWrite, true) if err != nil { return err } diff --git a/mfs/mfs_test.go b/mfs/mfs_test.go index 6498a76158a..0e1e3876701 100644 --- a/mfs/mfs_test.go +++ b/mfs/mfs_test.go @@ -161,7 +161,7 @@ func assertFileAtPath(ds dag.DAGService, root *Directory, expn node.Node, pth st return fmt.Errorf("%s was not a file!", pth) } - rfd, err := file.Open(OpenReadOnly, false) + rfd, err := file.Open(ModeRead, false) if err != nil { return err } @@ -389,7 +389,7 @@ func TestMfsFile(t *testing.T) { t.Fatal("some is seriously wrong here") } - wfd, err := fi.Open(OpenReadWrite, true) + wfd, err := fi.Open(ModeReadWrite, true) if err != nil { t.Fatal(err) } @@ -555,7 +555,7 @@ func actorMakeFile(d *Directory) error { return err } - wfd, err := f.Open(OpenWriteOnly, true) + wfd, err := f.Open(ModeWrite, true) if err != nil { return err } @@ -635,7 +635,7 @@ func actorWriteFile(d *Directory) error { return err } - wfd, err := fi.Open(OpenWriteOnly, true) + wfd, err := fi.Open(ModeWrite, true) if err != nil { return err } @@ -667,7 +667,7 @@ func actorReadFile(d *Directory) error { return err } - rfd, err := fi.Open(OpenReadOnly, false) + rfd, err := fi.Open(ModeRead, false) if err != nil { return err } @@ -869,7 +869,7 @@ func readFile(rt *Root, path string, offset int64, buf []byte) error { return fmt.Errorf("%s was not a file", path) } - fd, err := fi.Open(OpenReadOnly, false) + fd, err := fi.Open(ModeRead, false) if err != nil { return err } @@ -947,7 +947,7 @@ func writeFile(rt *Root, path string, data []byte) error { return fmt.Errorf("expected to receive a file, but didnt get one") } - fd, err := fi.Open(OpenWriteOnly, true) + fd, err := fi.Open(ModeWrite, true) if err != nil { return err } @@ -1015,7 +1015,7 @@ func TestFileDescriptors(t *testing.T) { } // test read only - rfd1, err := fi.Open(OpenReadOnly, false) + rfd1, err := fi.Open(ModeRead, false) if err != nil { t.Fatal(err) } @@ -1039,7 +1039,7 @@ func TestFileDescriptors(t *testing.T) { go func() { defer close(done) // can open second readonly file descriptor - rfd2, err := fi.Open(OpenReadOnly, false) + rfd2, err := fi.Open(ModeRead, false) if err != nil { t.Error(err) return @@ -1062,7 +1062,7 @@ func TestFileDescriptors(t *testing.T) { done = make(chan struct{}) go func() { defer close(done) - wfd1, err := fi.Open(OpenWriteOnly, true) + wfd1, err := fi.Open(ModeWrite, true) if err != nil { t.Error(err) } @@ -1091,7 +1091,7 @@ func TestFileDescriptors(t *testing.T) { case <-done: } - wfd, err := fi.Open(OpenWriteOnly, true) + wfd, err := fi.Open(ModeWrite, true) if err != nil { t.Fatal(err) } diff --git a/mfs/system.go b/mfs/system.go index f0dd0995839..49df4a16a4f 100644 --- a/mfs/system.go +++ b/mfs/system.go @@ -25,6 +25,7 @@ import ( ) var ErrNotExist = errors.New("no such rootfs") +var ErrClosed = errors.New("file closed") var log = logging.Logger("mfs") From b051842c7abab8732f4f54da021c80425bbdc416 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 20 Dec 2017 22:28:29 -0800 Subject: [PATCH 2/4] mfs: take the read/write lock *before* extracting the node. Otherwise, it could be exchanged out from under us! fixes #4514 License: MIT Signed-off-by: Steven Allen --- mfs/file.go | 40 +++++++++++++++++++++++++--------------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/mfs/file.go b/mfs/file.go index 81542071910..778c495d8bf 100644 --- a/mfs/file.go +++ b/mfs/file.go @@ -73,7 +73,31 @@ func (m mode) String() string { } } -func (fi *File) Open(mode mode, sync bool) (FileDescriptor, error) { +func (fi *File) Open(mode mode, sync bool) (_ FileDescriptor, _retErr error) { + if mode > 0x3 { + // TODO: support other modes + return nil, fmt.Errorf("mode not supported") + } + + if mode.CanWrite() { + fi.desclock.Lock() + defer func() { + if _retErr != nil { + fi.desclock.Unlock() + } + }() + } else if mode.CanRead() { + fi.desclock.RLock() + defer func() { + if _retErr != nil { + fi.desclock.RUnlock() + } + }() + } else { + // For now, need to open with either read or write perm. + return nil, fmt.Errorf("mode not supported") + } + fi.nodelk.Lock() node := fi.node fi.nodelk.Unlock() @@ -97,20 +121,6 @@ func (fi *File) Open(mode mode, sync bool) (FileDescriptor, error) { // Ok as well. } - if mode > 0x3 { - // TODO: support other modes - return nil, fmt.Errorf("mode not supported") - } - - if mode.CanWrite() { - fi.desclock.Lock() - } else if mode.CanRead() { - fi.desclock.RLock() - } else { - // For now, need to open with either read or write perm. - return nil, fmt.Errorf("mode not supported") - } - dmod, err := mod.NewDagModifier(context.TODO(), node, fi.dserv, chunk.DefaultSplitter) if err != nil { return nil, err From fc39bb80c2ed0bb6150cae863d38b19f5126665f Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 20 Dec 2017 22:32:02 -0800 Subject: [PATCH 3/4] mfs: use rwlock for the nodelk This allows us to, e.g., get the size, etc. in parallel. License: MIT Signed-off-by: Steven Allen --- mfs/fd.go | 4 ++-- mfs/file.go | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/mfs/fd.go b/mfs/fd.go index c9d344878dc..45d60bea694 100644 --- a/mfs/fd.go +++ b/mfs/fd.go @@ -134,9 +134,9 @@ func (fi *fileDescriptor) flushUp(fullsync bool) error { return nil } if nd == nil { - fi.inode.nodelk.Lock() + fi.inode.nodelk.RLock() nd = fi.inode.node - fi.inode.nodelk.Unlock() + fi.inode.nodelk.RUnlock() } if err := fi.inode.parent.closeChild(fi.inode.name, nd, fullsync); err != nil { diff --git a/mfs/file.go b/mfs/file.go index 778c495d8bf..53b9bec5b0a 100644 --- a/mfs/file.go +++ b/mfs/file.go @@ -22,7 +22,7 @@ type File struct { dserv dag.DAGService node node.Node - nodelk sync.Mutex + nodelk sync.RWMutex RawLeaves bool } @@ -98,9 +98,9 @@ func (fi *File) Open(mode mode, sync bool) (_ FileDescriptor, _retErr error) { return nil, fmt.Errorf("mode not supported") } - fi.nodelk.Lock() + fi.nodelk.RLock() node := fi.node - fi.nodelk.Unlock() + fi.nodelk.RUnlock() switch node := node.(type) { case *dag.ProtoNode: @@ -137,8 +137,8 @@ func (fi *File) Open(mode mode, sync bool) (_ FileDescriptor, _retErr error) { // Size returns the size of this file func (fi *File) Size() (int64, error) { - fi.nodelk.Lock() - defer fi.nodelk.Unlock() + fi.nodelk.RLock() + defer fi.nodelk.RUnlock() switch nd := fi.node.(type) { case *dag.ProtoNode: pbd, err := ft.FromBytes(nd.Data()) @@ -155,8 +155,8 @@ func (fi *File) Size() (int64, error) { // GetNode returns the dag node associated with this file func (fi *File) GetNode() (node.Node, error) { - fi.nodelk.Lock() - defer fi.nodelk.Unlock() + fi.nodelk.RLock() + defer fi.nodelk.RUnlock() return fi.node, nil } From 4ae2586594ef59284cffa91eaefdb7d25c640459 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 21 Dec 2017 08:47:48 -0800 Subject: [PATCH 4/4] mfs: avoid flushing/syncing closed files. Technically, the sync could fail. If it does, we don't want to allow syncing *again* without the write lock held. License: MIT Signed-off-by: Steven Allen --- mfs/fd.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/mfs/fd.go b/mfs/fd.go index 45d60bea694..3570696036e 100644 --- a/mfs/fd.go +++ b/mfs/fd.go @@ -99,10 +99,16 @@ func (fi *fileDescriptor) Close() error { } func (fi *fileDescriptor) Sync() error { + if fi.mode == Closed { + return ErrClosed + } return fi.flushUp(false) } func (fi *fileDescriptor) Flush() error { + if fi.mode == Closed { + return ErrClosed + } return fi.flushUp(true) }