Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] MFS improvements #4515

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 82 additions & 58 deletions mfs/fd.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
package mfs

import (
"context"
"fmt"
"io"

mod "github.com/ipfs/go-ipfs/unixfs/mod"

context "context"
node "gx/ipfs/QmNwUEK7QbwSqyKBu3mMtToo8SUc6wQJ7gdZq4gGGJqfnf/go-ipld-format"
)

type state uint8

const (
stateFlushed state = iota
stateSynced
stateDirty
)

type FileDescriptor interface {
Expand All @@ -26,13 +35,12 @@ type FileDescriptor interface {
}

type fileDescriptor struct {
inode *File
mod *mod.DagModifier
perms int
sync bool
hasChanges bool
inode *File
mod *mod.DagModifier
mode mode
sync bool

closed bool
state state
}

// Size returns the size of the file referred to by this descriptor
Expand All @@ -42,110 +50,126 @@ func (fi *fileDescriptor) Size() (int64, error) {

// Truncate truncates the file to size
func (fi *fileDescriptor) Truncate(size int64) error {
if fi.perms == OpenReadOnly {
return fmt.Errorf("cannot call truncate on readonly file descriptor")
if !fi.mode.CanWrite() {
return fmt.Errorf("truncate failed: file is %s", fi.mode)
}
fi.hasChanges = true
fi.state = stateDirty
return fi.mod.Truncate(size)
}

// Write writes the given data to the file at its current offset
func (fi *fileDescriptor) Write(b []byte) (int, error) {
if fi.perms == OpenReadOnly {
return 0, fmt.Errorf("cannot write on not writeable descriptor")
if !fi.mode.CanWrite() {
return 0, fmt.Errorf("write failed: file is %s", fi.mode)
}
fi.hasChanges = true
fi.state = stateDirty
return fi.mod.Write(b)
}

// Read reads into the given buffer from the current offset
func (fi *fileDescriptor) Read(b []byte) (int, error) {
if fi.perms == OpenWriteOnly {
return 0, fmt.Errorf("cannot read on write-only descriptor")
if !fi.mode.CanRead() {
return 0, fmt.Errorf("read failed: file is %s", fi.mode)
}
return fi.mod.Read(b)
}

// Read reads into the given buffer from the current offset
func (fi *fileDescriptor) CtxReadFull(ctx context.Context, b []byte) (int, error) {
if fi.perms == OpenWriteOnly {
return 0, fmt.Errorf("cannot read on write-only descriptor")
if !fi.mode.CanRead() {
return 0, fmt.Errorf("read failed: file is %s", fi.mode)
}
return fi.mod.CtxReadFull(ctx, b)
}

// Close flushes, then propogates the modified dag node up the directory structure
// and signals a republish to occur
func (fi *fileDescriptor) Close() error {
defer func() {
switch fi.perms {
case OpenReadOnly:
fi.inode.desclock.RUnlock()
case OpenWriteOnly, OpenReadWrite:
fi.inode.desclock.Unlock()
}
}()

if fi.closed {
panic("attempted to close file descriptor twice!")
if fi.mode == Closed {
return ErrClosed
}

if fi.hasChanges {
err := fi.mod.Sync()
if err != nil {
return err
}

fi.hasChanges = false

// explicitly stay locked for flushUp call,
// it will manage the lock for us
return fi.flushUp(fi.sync)
if fi.mode.CanWrite() {
defer fi.inode.desclock.Unlock()
} else if fi.mode.CanRead() {
defer fi.inode.desclock.RUnlock()
}
fi.mode = Closed

return nil
return fi.flushUp(fi.sync)
}

func (fi *fileDescriptor) Sync() error {
if fi.mode == Closed {
return ErrClosed
}
return fi.flushUp(false)
}

func (fi *fileDescriptor) Flush() error {
if fi.mode == Closed {
return ErrClosed
}
return fi.flushUp(true)
}

// flushUp syncs the file and adds it to the dagservice
// it *must* be called with the File's lock taken
func (fi *fileDescriptor) flushUp(fullsync bool) error {
nd, err := fi.mod.GetNode()
if err != nil {
return err
}
var nd node.Node
switch fi.state {
case stateDirty:
// calls mod.Sync internally.
var err error
nd, err = fi.mod.GetNode()
if err != nil {
return err
}

_, err = fi.inode.dserv.Add(nd)
if err != nil {
return err
}
_, err = fi.inode.dserv.Add(nd)
if err != nil {
return err
}

fi.inode.nodelk.Lock()
fi.inode.node = nd
name := fi.inode.name
parent := fi.inode.parent
fi.inode.nodelk.Unlock()
fi.inode.nodelk.Lock()
fi.inode.node = nd
fi.inode.nodelk.Unlock()
fi.state = stateSynced
fallthrough
case stateSynced:
if !fullsync {
return nil
}
if nd == nil {
fi.inode.nodelk.RLock()
nd = fi.inode.node
fi.inode.nodelk.RUnlock()
}

return parent.closeChild(name, nd, fullsync)
if err := fi.inode.parent.closeChild(fi.inode.name, nd, fullsync); err != nil {
return err
}
fi.state = stateFlushed
return nil
case stateFlushed:
return nil
default:
panic("invalid state")
}
}

// Seek implements io.Seeker
func (fi *fileDescriptor) Seek(offset int64, whence int) (int64, error) {
if fi.mode == Closed {
return 0, ErrClosed
}
return fi.mod.Seek(offset, whence)
}

// Write At writes the given bytes at the offset 'at'
func (fi *fileDescriptor) WriteAt(b []byte, at int64) (int, error) {
if fi.perms == OpenReadOnly {
return 0, fmt.Errorf("cannot write on not writeable descriptor")
if !fi.mode.CanWrite() {
return 0, fmt.Errorf("write-at failed: file is %s", fi.mode)
}
fi.hasChanges = true
fi.state = stateDirty
return fi.mod.WriteAt(b, at)
}
85 changes: 62 additions & 23 deletions mfs/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type File struct {

dserv dag.DAGService
node node.Node
nodelk sync.Mutex
nodelk sync.RWMutex

RawLeaves bool
}
Expand All @@ -42,16 +42,65 @@ func NewFile(name string, node node.Node, parent childCloser, dserv dag.DAGServi
return fi, nil
}

type mode uint8

const (
OpenReadOnly = iota
OpenWriteOnly
OpenReadWrite
Closed mode = 0x0 // No access. Needs to be 0.
ModeRead mode = 1 << 0
ModeWrite mode = 1 << 1
ModeReadWrite mode = ModeWrite | ModeRead
)

func (fi *File) Open(flags int, sync bool) (FileDescriptor, error) {
fi.nodelk.Lock()
func (m mode) CanRead() bool {
return m&ModeRead != 0
}
func (m mode) CanWrite() bool {
return m&ModeWrite != 0
}

func (m mode) String() string {
switch m {
case ModeWrite:
return "write-only"
case ModeRead:
return "read-only"
case ModeReadWrite:
return "read-write"
case Closed:
return "closed"
default:
return "invalid"
}
}

func (fi *File) Open(mode mode, sync bool) (_ FileDescriptor, _retErr error) {
if mode > 0x3 {
// TODO: support other modes
return nil, fmt.Errorf("mode not supported")
}

if mode.CanWrite() {
fi.desclock.Lock()
defer func() {
if _retErr != nil {
fi.desclock.Unlock()
}
}()
} else if mode.CanRead() {
fi.desclock.RLock()
defer func() {
if _retErr != nil {
fi.desclock.RUnlock()
}
}()
} else {
// For now, need to open with either read or write perm.
return nil, fmt.Errorf("mode not supported")
}

fi.nodelk.RLock()
node := fi.node
fi.nodelk.Unlock()
fi.nodelk.RUnlock()

switch node := node.(type) {
case *dag.ProtoNode:
Expand All @@ -72,16 +121,6 @@ func (fi *File) Open(flags int, sync bool) (FileDescriptor, error) {
// Ok as well.
}

switch flags {
case OpenReadOnly:
fi.desclock.RLock()
case OpenWriteOnly, OpenReadWrite:
fi.desclock.Lock()
default:
// TODO: support other modes
return nil, fmt.Errorf("mode not supported")
}

dmod, err := mod.NewDagModifier(context.TODO(), node, fi.dserv, chunk.DefaultSplitter)
if err != nil {
return nil, err
Expand All @@ -90,16 +129,16 @@ func (fi *File) Open(flags int, sync bool) (FileDescriptor, error) {

return &fileDescriptor{
inode: fi,
perms: flags,
mode: mode,
sync: sync,
mod: dmod,
}, nil
}

// Size returns the size of this file
func (fi *File) Size() (int64, error) {
fi.nodelk.Lock()
defer fi.nodelk.Unlock()
fi.nodelk.RLock()
defer fi.nodelk.RUnlock()
switch nd := fi.node.(type) {
case *dag.ProtoNode:
pbd, err := ft.FromBytes(nd.Data())
Expand All @@ -116,14 +155,14 @@ func (fi *File) Size() (int64, error) {

// GetNode returns the dag node associated with this file
func (fi *File) GetNode() (node.Node, error) {
fi.nodelk.Lock()
defer fi.nodelk.Unlock()
fi.nodelk.RLock()
defer fi.nodelk.RUnlock()
return fi.node, nil
}

func (fi *File) Flush() error {
// open the file in fullsync mode
fd, err := fi.Open(OpenWriteOnly, true)
fd, err := fi.Open(ModeWrite, true)
if err != nil {
return err
}
Expand Down
Loading