Skip to content

Commit

Permalink
fix the upload postprocessing when the destination file does not exist anymore
Browse files Browse the repository at this point in the history
  • Loading branch information
2403905 committed Jan 3, 2024
1 parent 284fdaa commit 3ac0a90
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 20 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/fix-upload-postprocessing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: Fix upload postprocessing.

We fixed the upload postprocessing when the destination file does not exist anymore.

https://github.com/cs3org/reva/pull/4434
34 changes: 24 additions & 10 deletions pkg/storage/fs/ocis/blobstore/blobstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package blobstore

import (
"bufio"
"fmt"
"io"
"os"
"path/filepath"
Expand Down Expand Up @@ -49,8 +50,10 @@ func New(root string) (*Blobstore, error) {

// Upload stores some data in the blobstore under the given key
func (bs *Blobstore) Upload(node *node.Node, source string) error {

dest := bs.path(node)
dest, err := bs.path(node)
if err != nil {
return err
}
// ensure parent path exists
if err := os.MkdirAll(filepath.Dir(dest), 0700); err != nil {
return errors.Wrap(err, "Decomposedfs: oCIS blobstore: error creating parent folders for blob")
Expand All @@ -69,40 +72,51 @@ func (bs *Blobstore) Upload(node *node.Node, source string) error {

f, err := os.OpenFile(dest, os.O_CREATE|os.O_WRONLY, 0700)
if err != nil {
return errors.Wrapf(err, "could not open blob '%s' for writing", bs.path(node))
return errors.Wrapf(err, "could not open blob '%s' for writing", dest)
}

w := bufio.NewWriter(f)
_, err = w.ReadFrom(file)
if err != nil {
return errors.Wrapf(err, "could not write blob '%s'", bs.path(node))
return errors.Wrapf(err, "could not write blob '%s'", dest)
}

return w.Flush()
}

// Download retrieves a blob from the blobstore for reading.
// It resolves the node's blob path first and returns an error if the
// node has no BlobID; the caller is responsible for closing the reader.
func (bs *Blobstore) Download(node *node.Node) (io.ReadCloser, error) {
	dest, err := bs.path(node)
	if err != nil {
		return nil, err
	}
	file, err := os.Open(dest)
	if err != nil {
		return nil, errors.Wrapf(err, "could not read blob '%s'", dest)
	}
	return file, nil
}

// Delete deletes a blob from the blobstore
func (bs *Blobstore) Delete(node *node.Node) error {
if err := utils.RemoveItem(bs.path(node)); err != nil {
return errors.Wrapf(err, "could not delete blob '%s'", bs.path(node))
dest, err := bs.path(node)
if err != nil {
return err
}
if err := utils.RemoveItem(dest); err != nil {
return errors.Wrapf(err, "could not delete blob '%s'", dest)
}
return nil
}

func (bs *Blobstore) path(node *node.Node) string {
func (bs *Blobstore) path(node *node.Node) (string, error) {
if node.BlobID == "" {
return "", fmt.Errorf("blobstore: BlobID is empty")
}
return filepath.Join(
bs.root,
filepath.Clean(filepath.Join(
"/", "spaces", lookup.Pathify(node.SpaceID, 1, 2), "blobs", lookup.Pathify(node.BlobID, 4, 2)),
),
)
), nil
}
15 changes: 10 additions & 5 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,16 +274,21 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
continue // NOTE: since we can't get the upload, we can't delete the blob
}

var (
failed bool
keepUpload bool
)

n, err := session.Node(ctx)
if err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read node")
continue
}
if !n.Exists {
log.Debug().Str("uploadID", ev.UploadID).Str("nodeID", session.NodeID()).Msg("node no longer exists")
fs.sessionStore.Cleanup(ctx, session, false, false)
continue
}

var (
failed bool
keepUpload bool
)

switch ev.Outcome {
default:
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/utils/decomposedfs/tree/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ func (t *Tree) ListFolder(ctx context.Context, n *node.Node) ([]*node.Node, erro
// Spawn workers that'll concurrently work the queue
for i := 0; i < numWorkers; i++ {
g.Go(func() error {
var err error
for name := range work {
path := filepath.Join(dir, name)
nodeID := getNodeIDFromCache(ctx, path, t.idCache)
Expand Down
6 changes: 1 addition & 5 deletions pkg/storage/utils/decomposedfs/upload/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,7 @@ func (s *OcisSession) HeaderIfUnmodifiedSince() string {

// Node returns the node for the session.
// The (node, err) pair from ReadNode is returned directly — wrapping it
// in an explicit error check added nothing.
func (s *OcisSession) Node(ctx context.Context) (*node.Node, error) {
	return node.ReadNode(ctx, s.store.lu, s.SpaceID(), s.info.Storage["NodeId"], false, nil, true)
}

// ID returns the upload session id
Expand Down

0 comments on commit 3ac0a90

Please sign in to comment.