From 3ac0a90fd9832dffee71922c89e68a3559871b09 Mon Sep 17 00:00:00 2001 From: Roman Perekhod Date: Fri, 22 Dec 2023 15:53:56 +0100 Subject: [PATCH] fix the upload postprocessing when the destination file does not exist anymore --- .../unreleased/fix-upload-postprocessing.md | 5 +++ pkg/storage/fs/ocis/blobstore/blobstore.go | 34 +++++++++++++------ .../utils/decomposedfs/decomposedfs.go | 15 +++++--- pkg/storage/utils/decomposedfs/tree/tree.go | 1 + .../utils/decomposedfs/upload/session.go | 6 +--- 5 files changed, 41 insertions(+), 20 deletions(-) create mode 100644 changelog/unreleased/fix-upload-postprocessing.md diff --git a/changelog/unreleased/fix-upload-postprocessing.md b/changelog/unreleased/fix-upload-postprocessing.md new file mode 100644 index 00000000000..dad4172dbc1 --- /dev/null +++ b/changelog/unreleased/fix-upload-postprocessing.md @@ -0,0 +1,5 @@ +Bugfix: Fix upload postprocessing. + +We fixed the upload postprocessing when the destination file does not exist anymore. + +https://github.com/cs3org/reva/pull/4434 diff --git a/pkg/storage/fs/ocis/blobstore/blobstore.go b/pkg/storage/fs/ocis/blobstore/blobstore.go index 3f12d4905be..7afd3eeb550 100644 --- a/pkg/storage/fs/ocis/blobstore/blobstore.go +++ b/pkg/storage/fs/ocis/blobstore/blobstore.go @@ -20,6 +20,7 @@ package blobstore import ( "bufio" + "fmt" "io" "os" "path/filepath" @@ -49,8 +50,10 @@ func New(root string) (*Blobstore, error) { // Upload stores some data in the blobstore under the given key func (bs *Blobstore) Upload(node *node.Node, source string) error { - - dest := bs.path(node) + dest, err := bs.path(node) + if err != nil { + return err + } // ensure parent path exists if err := os.MkdirAll(filepath.Dir(dest), 0700); err != nil { return errors.Wrap(err, "Decomposedfs: oCIS blobstore: error creating parent folders for blob") @@ -69,13 +72,13 @@ func (bs *Blobstore) Upload(node *node.Node, source string) error { f, err := os.OpenFile(dest, os.O_CREATE|os.O_WRONLY, 0700) if err != nil { - return errors.Wrapf(err, "could not open blob '%s' for writing", bs.path(node)) + return errors.Wrapf(err, "could not open blob '%s' for writing", dest) } w := bufio.NewWriter(f) _, err = w.ReadFrom(file) if err != nil { - return errors.Wrapf(err, "could not write blob '%s'", bs.path(node)) + return errors.Wrapf(err, "could not write blob '%s'", dest) } return w.Flush() @@ -83,26 +86,37 @@ func (bs *Blobstore) Upload(node *node.Node, source string) error { // Download retrieves a blob from the blobstore for reading func (bs *Blobstore) Download(node *node.Node) (io.ReadCloser, error) { - file, err := os.Open(bs.path(node)) + dest, err := bs.path(node) if err != nil { - return nil, errors.Wrapf(err, "could not read blob '%s'", bs.path(node)) + return nil, err + } + file, err := os.Open(dest) + if err != nil { + return nil, errors.Wrapf(err, "could not read blob '%s'", dest) } return file, nil } // Delete deletes a blob from the blobstore func (bs *Blobstore) Delete(node *node.Node) error { - if err := utils.RemoveItem(bs.path(node)); err != nil { - return errors.Wrapf(err, "could not delete blob '%s'", bs.path(node)) + dest, err := bs.path(node) + if err != nil { + return err + } + if err := utils.RemoveItem(dest); err != nil { + return errors.Wrapf(err, "could not delete blob '%s'", dest) } return nil } -func (bs *Blobstore) path(node *node.Node) string { +func (bs *Blobstore) path(node *node.Node) (string, error) { + if node.BlobID == "" { + return "", fmt.Errorf("blobstore: BlobID is empty") + } return filepath.Join( bs.root, filepath.Clean(filepath.Join( "/", "spaces", lookup.Pathify(node.SpaceID, 1, 2), "blobs", lookup.Pathify(node.BlobID, 4, 2)), ), - ) + ), nil } diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index bca49ef185c..7a4e0f173c8 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -274,16 +274,21 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { continue // NOTE: since we can't get the upload, we can't delete the blob } - var ( - failed bool - keepUpload bool - ) - n, err := session.Node(ctx) if err != nil { log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read node") continue } + if !n.Exists { + log.Debug().Str("uploadID", ev.UploadID).Str("nodeID", session.NodeID()).Msg("node no longer exists") + fs.sessionStore.Cleanup(ctx, session, false, false) + continue + } + + var ( + failed bool + keepUpload bool + ) switch ev.Outcome { default: diff --git a/pkg/storage/utils/decomposedfs/tree/tree.go b/pkg/storage/utils/decomposedfs/tree/tree.go index e0610ed5d50..136a8fb812c 100644 --- a/pkg/storage/utils/decomposedfs/tree/tree.go +++ b/pkg/storage/utils/decomposedfs/tree/tree.go @@ -375,6 +375,7 @@ func (t *Tree) ListFolder(ctx context.Context, n *node.Node) ([]*node.Node, erro // Spawn workers that'll concurrently work the queue for i := 0; i < numWorkers; i++ { g.Go(func() error { + var err error for name := range work { path := filepath.Join(dir, name) nodeID := getNodeIDFromCache(ctx, path, t.idCache) diff --git a/pkg/storage/utils/decomposedfs/upload/session.go b/pkg/storage/utils/decomposedfs/upload/session.go index bd9dc085f09..4107ea5e2dd 100644 --- a/pkg/storage/utils/decomposedfs/upload/session.go +++ b/pkg/storage/utils/decomposedfs/upload/session.go @@ -162,11 +162,7 @@ func (s *OcisSession) HeaderIfUnmodifiedSince() string { // Node returns the node for the session func (s *OcisSession) Node(ctx context.Context) (*node.Node, error) { - n, err := node.ReadNode(ctx, s.store.lu, s.SpaceID(), s.info.Storage["NodeId"], false, nil, true) - if err != nil { - return nil, err - } - return n, nil + return node.ReadNode(ctx, s.store.lu, s.SpaceID(), s.info.Storage["NodeId"], false, nil, true) } // ID returns the upload session id