Skip to content

Commit

Permalink
Handle tashbin file listing concurrently
Browse files Browse the repository at this point in the history
Co-authored-by: André Duffeck <[email protected]>

Signed-off-by: Christian Richter <[email protected]>
dragonchaser committed Nov 29, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent ea8d133 commit 6697bce
Showing 1 changed file with 88 additions and 47 deletions.
135 changes: 88 additions & 47 deletions pkg/storage/utils/decomposedfs/recycle.go
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@ package decomposedfs

import (
"context"
"golang.org/x/sync/errgroup"
iofs "io/fs"
"os"
"path/filepath"
@@ -214,66 +215,106 @@ func readTrashLink(path string) (string, string, string, error) {

func (fs *Decomposedfs) listTrashRoot(ctx context.Context, spaceID string) ([]*provider.RecycleItem, error) {
log := appctx.GetLogger(ctx)
items := make([]*provider.RecycleItem, 0)

trashRoot := fs.getRecycleRoot(spaceID)
matches, err := filepath.Glob(trashRoot + "/*/*/*/*/*")
if err != nil {
return nil, err
}

for _, itemPath := range matches {
nodePath, nodeID, timeSuffix, err := readTrashLink(itemPath)
if err != nil {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Msg("error reading trash link, skipping")
continue
}
numWorkers := fs.o.MaxConcurrency
if len(matches) < numWorkers {
numWorkers = len(matches)
}

md, err := os.Stat(nodePath)
if err != nil {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not stat trash item, skipping")
continue
}
work := make(chan string, len(matches))
results := make(chan *provider.RecycleItem, len(matches))

attrs, err := fs.lu.MetadataBackend().All(ctx, nodePath)
if err != nil {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not get extended attributes, skipping")
continue
}
g, ctx := errgroup.WithContext(ctx)

nodeType := fs.lu.TypeFromPath(ctx, nodePath)
if nodeType == provider.ResourceType_RESOURCE_TYPE_INVALID {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("invalid node type, skipping")
continue
}

item := &provider.RecycleItem{
Type: nodeType,
Size: uint64(md.Size()),
Key: nodeID,
}
if deletionTime, err := time.Parse(time.RFC3339Nano, timeSuffix); err == nil {
item.DeletionTime = &types.Timestamp{
Seconds: uint64(deletionTime.Unix()),
// TODO nanos
// Distribute work
g.Go(func() error {
defer close(work)
for _, itemPath := range matches {
select {
case work <- itemPath:
case <-ctx.Done():
return ctx.Err()
}
} else {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not parse time format, ignoring")
}
return nil
})

// Spawn workers that'll concurrently work the queue
for i := 0; i < numWorkers; i++ {
g.Go(func() error {
for itemPath := range work {
nodePath, nodeID, timeSuffix, err := readTrashLink(itemPath)
if err != nil {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Msg("error reading trash link, skipping")
continue
}

md, err := os.Stat(nodePath)
if err != nil {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not stat trash item, skipping")
continue
}

attrs, err := fs.lu.MetadataBackend().All(ctx, nodePath)
if err != nil {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not get extended attributes, skipping")
continue
}

nodeType := fs.lu.TypeFromPath(ctx, nodePath)
if nodeType == provider.ResourceType_RESOURCE_TYPE_INVALID {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("invalid node type, skipping")
continue
}

item := &provider.RecycleItem{
Type: nodeType,
Size: uint64(md.Size()),
Key: nodeID,
}
if deletionTime, err := time.Parse(time.RFC3339Nano, timeSuffix); err == nil {
item.DeletionTime = &types.Timestamp{
Seconds: uint64(deletionTime.Unix()),
// TODO nanos
}
} else {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not parse time format, ignoring")
}

// lookup origin path in extended attributes
if attr, ok := attrs[prefixes.TrashOriginAttr]; ok {
item.Ref = &provider.Reference{Path: string(attr)}
} else {
log.Error().Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not read origin path")
}
select {
case results <- item:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
})
}

// lookup origin path in extended attributes
if attr, ok := attrs[prefixes.TrashOriginAttr]; ok {
item.Ref = &provider.Reference{Path: string(attr)}
} else {
log.Error().Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not read origin path, skipping")
continue
}
// TODO filter results by permission ... on the original parent? or the trashed node?
// if it were on the original parent it would be possible to see files that were trashed before the current user got access
// so -> check the trash node itself
// hmm listing trash currently lists the current users trash or the 'root' trash. from ocs only the home storage is queried for trash items.
// for now we can only really check if the current user is the owner
items = append(items, item)
// Wait for things to settle down, then close results chan
go func() {
_ = g.Wait() // error is checked later
close(results)
}()

// Collect results
items := make([]*provider.RecycleItem, len(matches))
i := 0
for ri := range results {
items[i] = ri
i++
}
return items, nil
}

0 comments on commit 6697bce

Please sign in to comment.