Skip to content

Commit

Permalink
Merge pull request #4730 from aduffeck/improve-posixfs
Browse files Browse the repository at this point in the history
Improve posixfs
  • Loading branch information
aduffeck authored Jun 18, 2024
2 parents ed0273c + 385cd9d commit 0a46a16
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 77 deletions.
1 change: 1 addition & 0 deletions changelog/unreleased/improve-posixfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ Enhancement: Improve posixfs storage driver

Improve the posixfs storage driver by fixing several issues and adding missing features.

https://github.com/cs3org/reva/pull/4730
https://github.com/cs3org/reva/pull/4719
https://github.com/cs3org/reva/pull/4708
https://github.com/cs3org/reva/pull/4562
65 changes: 0 additions & 65 deletions pkg/storage/fs/posix/lookup/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"os"
"path/filepath"
"strings"
"syscall"

user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
Expand Down Expand Up @@ -77,10 +76,6 @@ func New(b metadata.Backend, um usermapper.Mapper, o *options.Options) *Lookup {
userMapper: um,
}

go func() {
_ = lu.WarmupIDCache(o.Root)
}()

return lu
}

Expand All @@ -94,66 +89,6 @@ func (lu *Lookup) GetCachedID(ctx context.Context, spaceID, nodeID string) (stri
return lu.IDCache.Get(ctx, spaceID, nodeID)
}

// WarmupIDCache warms up the id cache
func (lu *Lookup) WarmupIDCache(root string) error {
spaceID := []byte("")

scopeSpace := func(spaceCandidate string) error {
if !lu.Options.UseSpaceGroups {
return nil
}

// set the uid and gid for the space
fi, err := os.Stat(spaceCandidate)
if err != nil {
return err
}
sys := fi.Sys().(*syscall.Stat_t)
gid := int(sys.Gid)
_, err = lu.userMapper.ScopeUserByIds(-1, gid)
return err
}

return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}

attribs, err := lu.metadataBackend.All(context.Background(), path)
if err == nil {
nodeSpaceID := attribs[prefixes.SpaceIDAttr]
if len(nodeSpaceID) > 0 {
spaceID = nodeSpaceID

err = scopeSpace(path)
if err != nil {
return err
}
} else {
// try to find space
spaceCandidate := path
for strings.HasPrefix(spaceCandidate, lu.Options.Root) {
spaceID, err = lu.MetadataBackend().Get(context.Background(), spaceCandidate, prefixes.SpaceIDAttr)
if err == nil {
err = scopeSpace(path)
if err != nil {
return err
}
break
}
spaceCandidate = filepath.Dir(spaceCandidate)
}
}

id, ok := attribs[prefixes.IDAttr]
if ok && len(spaceID) > 0 {
_ = lu.IDCache.Set(context.Background(), string(spaceID), string(id), path)
}
}
return nil
})
}

// NodeFromPath returns the node for the given path
func (lu *Lookup) NodeIDFromParentAndName(ctx context.Context, parent *node.Node, name string) (string, error) {
id, err := lu.metadataBackend.Get(ctx, filepath.Join(parent.InternalPath(), name), prefixes.IDAttr)
Expand Down
1 change: 0 additions & 1 deletion pkg/storage/fs/posix/lookup/store_idcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ func NewStoreIDCache(o *options.Options) *StoreIDCache {
return &StoreIDCache{
cache: store.Create(
store.Store(o.IDCache.Store),
store.TTL(o.IDCache.TTL),
store.Size(o.IDCache.Size),
microstore.Nodes(o.IDCache.Nodes...),
microstore.Database(o.IDCache.Database),
Expand Down
65 changes: 65 additions & 0 deletions pkg/storage/fs/posix/tree/assimilation.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,3 +279,68 @@ assimilate:

return fi, nil
}

// WarmupIDCache warms up the id cache
func (t *Tree) WarmupIDCache(root string, assimilate bool) error {
spaceID := []byte("")

scopeSpace := func(spaceCandidate string) error {
if !t.options.UseSpaceGroups {
return nil
}

// set the uid and gid for the space
fi, err := os.Stat(spaceCandidate)
if err != nil {
return err
}
sys := fi.Sys().(*syscall.Stat_t)
gid := int(sys.Gid)
_, err = t.userMapper.ScopeUserByIds(-1, gid)
return err
}

return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}

attribs, err := t.lookup.MetadataBackend().All(context.Background(), path)
if err == nil {
nodeSpaceID := attribs[prefixes.SpaceIDAttr]
if len(nodeSpaceID) > 0 {
spaceID = nodeSpaceID

err = scopeSpace(path)
if err != nil {
return err
}
} else {
// try to find space
spaceCandidate := path
for strings.HasPrefix(spaceCandidate, t.options.Root) {
spaceID, err = t.lookup.MetadataBackend().Get(context.Background(), spaceCandidate, prefixes.SpaceIDAttr)
if err == nil {
err = scopeSpace(path)
if err != nil {
return err
}
break
}
spaceCandidate = filepath.Dir(spaceCandidate)
}
}
if len(spaceID) == 0 {
return nil // no space found
}

id, ok := attribs[prefixes.IDAttr]
if ok {
_ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), string(spaceID), string(id), path)
} else if assimilate {
_ = t.Scan(path, false)
}
}
return nil
})
}
4 changes: 1 addition & 3 deletions pkg/storage/fs/posix/tree/gpfsfilauditloggingwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"os"
"strconv"
"time"

"github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup"
)

type GpfsFileAuditLoggingWatcher struct {
Expand Down Expand Up @@ -72,7 +70,7 @@ start:
case "RENAME":
go func() {
_ = w.tree.Scan(ev.Path, true)
_ = w.tree.lookup.(*lookup.Lookup).WarmupIDCache(ev.Path)
_ = w.tree.WarmupIDCache(ev.Path, false)
}()
}
case io.EOF:
Expand Down
3 changes: 1 addition & 2 deletions pkg/storage/fs/posix/tree/gpfswatchfolderwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strconv"
"strings"

"github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup"
kafka "github.com/segmentio/kafka-go"
)

Expand Down Expand Up @@ -57,7 +56,7 @@ func (w *GpfsWatchFolderWatcher) Watch(topic string) {
case strings.Contains(lwev.Event, "IN_MOVED_TO"):
go func() {
_ = w.tree.Scan(lwev.Path, true)
_ = w.tree.lookup.(*lookup.Lookup).WarmupIDCache(lwev.Path)
_ = w.tree.WarmupIDCache(lwev.Path, false)
}()
}
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/storage/fs/posix/tree/inotifywatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"strings"

"github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup"
"github.com/pablodz/inotifywaitgo/inotifywaitgo"
)

Expand Down Expand Up @@ -52,7 +51,7 @@ func (iw *InotifyWatcher) Watch(path string) {
case inotifywaitgo.MOVED_TO:
go func() {
_ = iw.tree.Scan(event.Filename, true)
_ = iw.tree.lookup.(*lookup.Lookup).WarmupIDCache(event.Filename)
_ = iw.tree.WarmupIDCache(event.Filename, false)
}()
case inotifywaitgo.CLOSE_WRITE:
go func() { _ = iw.tree.Scan(event.Filename, true) }()
Expand Down
24 changes: 20 additions & 4 deletions pkg/storage/fs/posix/tree/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"strings"
"time"

"github.com/gofrs/flock"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -133,10 +134,25 @@ func New(lu node.PathLookup, bs Blobstore, um usermapper.Mapper, o *options.Opti
}

// Start watching for fs events and put them into the queue
go t.watcher.Watch(watchPath)
go func() {
fileLock := flock.New(filepath.Join(o.Root, ".primary.lock"))
locked, err := fileLock.TryLock()
if err != nil {
log.Err(err).Msg("could not acquire primary lock")
return
}
if !locked {
log.Err(err).Msg("watcher is already locked")
return
}
log.Debug().Msg("acquired primary lock")

// Handle queued fs events
go t.workScanQueue()
go t.watcher.Watch(watchPath)
go t.workScanQueue()
go func() {
_ = t.WarmupIDCache(o.Root, true)
}()
}()

return t, nil
}
Expand Down Expand Up @@ -308,7 +324,7 @@ func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node)
_ = t.lookup.(*lookup.Lookup).CacheID(ctx, newNode.SpaceID, newNode.ID, filepath.Join(newNode.ParentPath(), newNode.Name))
// update id cache for the moved subtree
if oldNode.IsDir(ctx) {
err = t.lookup.(*lookup.Lookup).WarmupIDCache(filepath.Join(newNode.ParentPath(), newNode.Name))
err = t.WarmupIDCache(filepath.Join(newNode.ParentPath(), newNode.Name), false)
if err != nil {
return err
}
Expand Down

0 comments on commit 0a46a16

Please sign in to comment.