Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve posixfs #4739

Merged
merged 19 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/unreleased/improve-posixfs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Improve posixfs storage driver

Improve the posixfs storage driver by fixing several issues and adding missing features.

https://github.com/cs3org/reva/pull/4739
5 changes: 4 additions & 1 deletion internal/grpc/services/storageprovider/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -1276,7 +1276,10 @@ func getFS(c *config) (storage.FS, error) {
}

if f, ok := registry.NewFuncs[c.Driver]; ok {
return f(c.Drivers[c.Driver], evstream)
driverConf := c.Drivers[c.Driver]
driverConf["mount_id"] = c.MountID // pass the mount id to the driver

return f(driverConf, evstream)
}

return nil, errtypes.NotFound("driver not found: " + c.Driver)
Expand Down
20 changes: 18 additions & 2 deletions pkg/storage/fs/posix/lookup/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,14 @@ func init() {
// IDCache is a cache for node ids
type IDCache interface {
Get(ctx context.Context, spaceID, nodeID string) (string, bool)
GetByPath(ctx context.Context, path string) (string, string, bool)

Set(ctx context.Context, spaceID, nodeID, val string) error

Delete(ctx context.Context, spaceID, nodeID string) error
DeleteByPath(ctx context.Context, path string) error

DeletePath(ctx context.Context, path string) error
}

// Lookup implements transformations from filepath to node and back
Expand All @@ -79,16 +86,25 @@ func New(b metadata.Backend, um usermapper.Mapper, o *options.Options) *Lookup {
return lu
}

// CacheID caches the id for the given space and node id
// CacheID caches the path for the given space and node id
func (lu *Lookup) CacheID(ctx context.Context, spaceID, nodeID, val string) error {
return lu.IDCache.Set(ctx, spaceID, nodeID, val)
}

// GetCachedID returns the cached id for the given space and node id
// GetCachedID returns the cached path for the given space and node id
func (lu *Lookup) GetCachedID(ctx context.Context, spaceID, nodeID string) (string, bool) {
return lu.IDCache.Get(ctx, spaceID, nodeID)
}

// IDsForPath returns the space and opaque id for the given path
func (lu *Lookup) IDsForPath(ctx context.Context, path string) (string, string, error) {
spaceID, nodeID, ok := lu.IDCache.GetByPath(ctx, path)
if !ok {
return "", "", fmt.Errorf("path %s not found in cache", path)
}
return spaceID, nodeID, nil
}

// NodeFromPath returns the node for the given path
func (lu *Lookup) NodeIDFromParentAndName(ctx context.Context, parent *node.Node, name string) (string, error) {
id, err := lu.metadataBackend.Get(ctx, filepath.Join(parent.InternalPath(), name), prefixes.IDAttr)
Expand Down
61 changes: 60 additions & 1 deletion pkg/storage/fs/posix/lookup/store_idcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package lookup

import (
"context"
"strings"

microstore "go-micro.dev/v4/store"

Expand All @@ -46,12 +47,53 @@ func NewStoreIDCache(o *options.Options) *StoreIDCache {
}
}

// Delete removes an entry from the cache
func (c *StoreIDCache) Delete(_ context.Context, spaceID, nodeID string) error {
v, err := c.cache.Read(cacheKey(spaceID, nodeID))
if err == nil {
err := c.cache.Delete(reverseCacheKey(string(v[0].Value)))
if err != nil {
return err
}
}

return c.cache.Delete(cacheKey(spaceID, nodeID))
}

// DeleteByPath removes an entry from the cache
func (c *StoreIDCache) DeleteByPath(ctx context.Context, path string) error {
spaceID, nodeID, ok := c.GetByPath(ctx, path)
if !ok {
return nil
}

err := c.cache.Delete(reverseCacheKey(path))
if err != nil {
return err
}

return c.cache.Delete(cacheKey(spaceID, nodeID))
}

// DeletePath removes only the path entry from the cache
func (c *StoreIDCache) DeletePath(ctx context.Context, path string) error {
return c.cache.Delete(reverseCacheKey(path))
}

// Add adds a new entry to the cache
func (c *StoreIDCache) Set(_ context.Context, spaceID, nodeID, val string) error {
return c.cache.Write(&microstore.Record{
err := c.cache.Write(&microstore.Record{
Key: cacheKey(spaceID, nodeID),
Value: []byte(val),
})
if err != nil {
return err
}

return c.cache.Write(&microstore.Record{
Key: reverseCacheKey(val),
Value: []byte(cacheKey(spaceID, nodeID)),
})
}

// Get returns the value for a given key
Expand All @@ -63,6 +105,23 @@ func (c *StoreIDCache) Get(_ context.Context, spaceID, nodeID string) (string, b
return string(records[0].Value), true
}

// GetByPath returns the key for a given value
func (c *StoreIDCache) GetByPath(_ context.Context, val string) (string, string, bool) {
records, err := c.cache.Read(reverseCacheKey(val))
if err != nil || len(records) == 0 {
return "", "", false
}
parts := strings.SplitN(string(records[0].Value), "!", 2)
if len(parts) != 2 {
return "", "", false
}
return parts[0], parts[1], true
}

func cacheKey(spaceid, nodeID string) string {
return spaceid + "!" + nodeID
}

func reverseCacheKey(val string) string {
return val
}
1 change: 1 addition & 0 deletions pkg/storage/fs/posix/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Options struct {

UseSpaceGroups bool `mapstructure:"use_space_groups"`

WatchFS bool `mapstructure:"watch_fs"`
WatchType string `mapstructure:"watch_type"`
WatchPath string `mapstructure:"watch_path"`
WatchFolderKafkaBrokers string `mapstructure:"watch_folder_kafka_brokers"`
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/fs/posix/posix.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ func New(m map[string]interface{}, stream events.Stream) (storage.FS, error) {
var lu *lookup.Lookup
switch o.MetadataBackend {
case "xattrs":
lu = lookup.New(metadata.XattrsBackend{}, um, o)
lu = lookup.New(metadata.NewXattrsBackend(o.Root, o.FileMetadataCache), um, o)
case "messagepack":
lu = lookup.New(metadata.NewMessagePackBackend(o.Root, o.FileMetadataCache), um, o)
default:
return nil, fmt.Errorf("unknown metadata backend %s, only 'messagepack' or 'xattrs' (default) supported", o.MetadataBackend)
}

tp, err := tree.New(lu, bs, um, o, store.Create(
tp, err := tree.New(lu, bs, um, o, stream, store.Create(
store.Store(o.IDCache.Store),
store.TTL(o.IDCache.TTL),
store.Size(o.IDCache.Size),
Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/fs/posix/testhelpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func NewTestEnv(config map[string]interface{}) (*TestEnv, error) {
"treesize_accounting": true,
"personalspacepath_template": "users/{{.User.Username}}",
"generalspacepath_template": "projects/{{.SpaceId}}",
"watch_fs": true,
}
// make it possible to override single config values
for k, v := range config {
Expand Down Expand Up @@ -154,7 +155,7 @@ func NewTestEnv(config map[string]interface{}) (*TestEnv, error) {
var lu *lookup.Lookup
switch o.MetadataBackend {
case "xattrs":
lu = lookup.New(metadata.XattrsBackend{}, um, o)
lu = lookup.New(metadata.NewXattrsBackend(o.Root, o.FileMetadataCache), um, o)
case "messagepack":
lu = lookup.New(metadata.NewMessagePackBackend(o.Root, o.FileMetadataCache), um, o)
default:
Expand All @@ -174,7 +175,7 @@ func NewTestEnv(config map[string]interface{}) (*TestEnv, error) {
)

bs := &treemocks.Blobstore{}
tree, err := tree.New(lu, bs, um, o, store.Create())
tree, err := tree.New(lu, bs, um, o, nil, store.Create())
if err != nil {
return nil, err
}
Expand Down
Loading
Loading