Skip to content

Commit

Permalink
Merge pull request #4377 from dragonchaser/backport-trashbin-fixes-to-2.16
Browse files Browse the repository at this point in the history

Backport trashbin fixes to 2.16
  • Loading branch information
dragonchaser authored Nov 30, 2023
2 parents a45f8da + 4320010 commit ce3f3e9
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 53 deletions.
8 changes: 8 additions & 0 deletions changelog/unreleased/concurrent-trashbin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Enhancement: Handle trashbin file listings concurrently

We now use a concurrent walker to list the files in the trashbin, which
improves the performance of trashbin listings.

https://github.com/cs3org/reva/pull/4377
https://github.com/cs3org/reva/pull/4374
https://github.com/owncloud/ocis/issues/7844
147 changes: 97 additions & 50 deletions pkg/storage/utils/decomposedfs/recycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"strings"
"time"

"github.com/pkg/errors"
"golang.org/x/sync/errgroup"

provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
Expand All @@ -35,7 +38,6 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/pkg/errors"
)

// Recycle items are stored inside the node folder and start with the uuid of the deleted node.
Expand Down Expand Up @@ -214,66 +216,111 @@ func readTrashLink(path string) (string, string, string, error) {

func (fs *Decomposedfs) listTrashRoot(ctx context.Context, spaceID string) ([]*provider.RecycleItem, error) {
log := appctx.GetLogger(ctx)
items := make([]*provider.RecycleItem, 0)

trashRoot := fs.getRecycleRoot(spaceID)
matches, err := filepath.Glob(trashRoot + "/*/*/*/*/*")

subTrees, err := filepath.Glob(trashRoot + "/*")
if err != nil {
return nil, err
}

for _, itemPath := range matches {
nodePath, nodeID, timeSuffix, err := readTrashLink(itemPath)
if err != nil {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Msg("error reading trash link, skipping")
continue
}
numWorkers := fs.o.MaxConcurrency
if len(subTrees) < numWorkers {
numWorkers = len(subTrees)
}

md, err := os.Stat(nodePath)
if err != nil {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not stat trash item, skipping")
continue
}
work := make(chan string, len(subTrees))
results := make(chan *provider.RecycleItem, len(subTrees))

attrs, err := fs.lu.MetadataBackend().All(ctx, nodePath)
if err != nil {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not get extended attributes, skipping")
continue
}
g, ctx := errgroup.WithContext(ctx)

nodeType := fs.lu.TypeFromPath(ctx, nodePath)
if nodeType == provider.ResourceType_RESOURCE_TYPE_INVALID {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("invalid node type, skipping")
continue
}

item := &provider.RecycleItem{
Type: nodeType,
Size: uint64(md.Size()),
Key: nodeID,
}
if deletionTime, err := time.Parse(time.RFC3339Nano, timeSuffix); err == nil {
item.DeletionTime = &types.Timestamp{
Seconds: uint64(deletionTime.Unix()),
// TODO nanos
// Distribute work
g.Go(func() error {
defer close(work)
for _, itemPath := range subTrees {
select {
case work <- itemPath:
case <-ctx.Done():
return ctx.Err()
}
} else {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not parse time format, ignoring")
}
return nil
})

// Spawn workers that'll concurrently work the queue
for i := 0; i < numWorkers; i++ {
g.Go(func() error {
for subTree := range work {
matches, err := filepath.Glob(subTree + "/*/*/*/*")
if err != nil {
return err
}

for _, itemPath := range matches {
nodePath, nodeID, timeSuffix, err := readTrashLink(itemPath)
if err != nil {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Msg("error reading trash link, skipping")
continue
}

md, err := os.Stat(nodePath)
if err != nil {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not stat trash item, skipping")
continue
}

attrs, err := fs.lu.MetadataBackend().All(ctx, nodePath)
if err != nil {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not get extended attributes, skipping")
continue
}

nodeType := fs.lu.TypeFromPath(ctx, nodePath)
if nodeType == provider.ResourceType_RESOURCE_TYPE_INVALID {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("invalid node type, skipping")
continue
}

item := &provider.RecycleItem{
Type: nodeType,
Size: uint64(md.Size()),
Key: nodeID,
}
if deletionTime, err := time.Parse(time.RFC3339Nano, timeSuffix); err == nil {
item.DeletionTime = &types.Timestamp{
Seconds: uint64(deletionTime.Unix()),
// TODO nanos
}
} else {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not parse time format, ignoring")
}

// lookup origin path in extended attributes
if attr, ok := attrs[prefixes.TrashOriginAttr]; ok {
item.Ref = &provider.Reference{Path: string(attr)}
} else {
log.Error().Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not read origin path")
}
select {
case results <- item:
case <-ctx.Done():
return ctx.Err()
}
}
}
return nil
})
}

// lookup origin path in extended attributes
if attr, ok := attrs[prefixes.TrashOriginAttr]; ok {
item.Ref = &provider.Reference{Path: string(attr)}
} else {
log.Error().Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not read origin path, skipping")
continue
}
// TODO filter results by permission ... on the original parent? or the trashed node?
// if it were on the original parent it would be possible to see files that were trashed before the current user got access
// so -> check the trash node itself
// hmm listing trash currently lists the current users trash or the 'root' trash. from ocs only the home storage is queried for trash items.
// for now we can only really check if the current user is the owner
items = append(items, item)
// Wait for things to settle down, then close results chan
go func() {
_ = g.Wait() // error is checked later
close(results)
}()

// Collect results
items := []*provider.RecycleItem{}
for ri := range results {
items = append(items, ri)
}
return items, nil
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/storage/utils/decomposedfs/recycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,10 @@ var _ = Describe("Recycle", func() {
itemsA, err := env.Fs.ListRecycle(env.Ctx, &provider.Reference{ResourceId: env.SpaceRootRes}, "", "/")
Expect(err).ToNot(HaveOccurred())
Expect(len(itemsA)).To(Equal(2))

itemsB, err := env.Fs.ListRecycle(env.Ctx, &provider.Reference{ResourceId: env.SpaceRootRes}, "", "/")
Expect(err).ToNot(HaveOccurred())
Expect(len(itemsB)).To(Equal(2))

Expect(itemsA).To(Equal(itemsB))
Expect(itemsA).To(ConsistOf(itemsB))
})

It("they can be permanently deleted by the other user", func() {
Expand Down

0 comments on commit ce3f3e9

Please sign in to comment.