Merge pull request #4654 from butonic/backport-4615
stable2.19 - Bugfix: write blob based on session id
butonic authored Apr 25, 2024
2 parents 76da22e + a8c5f12 commit 3b6ef86
Showing 6 changed files with 114 additions and 23 deletions.
6 changes: 6 additions & 0 deletions changelog/unreleased/fix-blobstore.md
@@ -0,0 +1,6 @@
Bugfix: write blob based on session id

Decomposedfs now uses the session id and size when moving an upload to the blobstore. This fixes a corner case where an upload session could not be finished correctly because another upload session for the same file had been started and had already finished.

https://github.com/cs3org/reva/pull/4654
https://github.com/cs3org/reva/pull/4615
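
The gist of the fix, as a minimal standalone Go sketch (the type and helper names below are illustrative, not the actual reva API): the blob is written under the upload session's id and size rather than under the node's current blob id, so two overlapping uploads to the same file end up in two distinct blobs.

```go
package main

import "fmt"

// revisionNode carries the fields the blobstore needs to resolve a blob
// location; BlobID is the upload session id, not the node's current blob id.
type revisionNode struct {
	SpaceID  string
	BlobID   string
	Blobsize int64
}

// writeBlob is a stand-in for the blobstore upload; a real implementation
// would derive the target path from SpaceID and BlobID.
func writeBlob(n revisionNode, binPath string) error {
	fmt.Printf("uploading %s as blob %s/%s (%d bytes)\n", binPath, n.SpaceID, n.BlobID, n.Blobsize)
	return nil
}

// finalize keys the blob by the session id so a concurrent, already finished
// upload session for the same file is never overwritten.
func finalize(spaceID, sessionID string, size int64, binPath string) error {
	rn := revisionNode{SpaceID: spaceID, BlobID: sessionID, Blobsize: size}
	return writeBlob(rn, binPath)
}

func main() {
	_ = finalize("space-1", "session-abc", 20, "/tmp/uploads/session-abc")
}
```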
13 changes: 10 additions & 3 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
@@ -310,9 +310,16 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {

now := time.Now()
if failed {
// propagate sizeDiff after failed postprocessing
if err := fs.tp.Propagate(ctx, n, -session.SizeDiff()); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate tree size change")
// only revert the tree size if this is still the latest upload session for the node; another session may have started (processing id != session id) or already finished (processing id == "") in the meantime
latestSession, err := n.ProcessingID(ctx)
if err != nil {
log.Error().Err(err).Str("node", n.ID).Str("uploadID", ev.UploadID).Msg("reading node for session failed")
}
if latestSession == session.ID() {
// propagate reverted sizeDiff after failed postprocessing
if err := fs.tp.Propagate(ctx, n, -session.SizeDiff()); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate tree size change")
}
}
} else if p := getParent(); p != nil {
// update parent tmtime to propagate etag change after successful postprocessing
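A worked example of the guard above, using the numbers from the new test at the bottom of this diff: the first upload adds 10 bytes (size diff +10), the second overwrites the file with 20 bytes (size diff +10), so the propagated tree size is 20. If postprocessing of the first upload then fails, blindly reverting its +10 would shrink the tree size to 10 even though the finished 20-byte upload is what the node now holds. The sketch below is illustrative only, not reva code.

```go
package main

import "fmt"

func main() {
	treeSize := int64(20) // +10 from the first upload, +10 from the second overwrite

	latestSession := "session-2" // the node's processing id: the second upload owns the node
	failedSession := "session-1" // postprocessing of the first upload failed
	failedSizeDiff := int64(10)

	// only revert the size diff if the failed session is still the latest one
	if latestSession == failedSession {
		treeSize -= failedSizeDiff
	}

	fmt.Println(treeSize) // 20: still matches the finished 20-byte upload
}
```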
6 changes: 6 additions & 0 deletions pkg/storage/utils/decomposedfs/node/node.go
@@ -1276,6 +1276,12 @@ func (n *Node) IsProcessing(ctx context.Context) bool {
return err == nil && strings.HasPrefix(v, ProcessingStatus)
}

// ProcessingID returns the latest upload session id
func (n *Node) ProcessingID(ctx context.Context) (string, error) {
v, err := n.XattrString(ctx, prefixes.StatusPrefix)
return strings.TrimPrefix(v, ProcessingStatus), err
}

// IsSpaceRoot checks if the node is a space root
func (n *Node) IsSpaceRoot(ctx context.Context) bool {
_, err := n.Xattr(ctx, prefixes.SpaceNameAttr)
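ProcessingID pairs with the existing processing status attribute: while postprocessing is pending, the status xattr appears to hold the ProcessingStatus prefix followed by the owning upload session id (that is what the TrimPrefix above implies), and it is cleared once postprocessing finishes. A small sketch of how callers are expected to use it; the prefix value and helper below are assumptions, not the actual reva constants.

```go
package main

import (
	"fmt"
	"strings"
)

// assumed prefix; in reva this is the node ProcessingStatus constant
const processingStatus = "processing:"

// processingID mimics Node.ProcessingID: it returns the session id recorded in
// the status attribute, or "" when no postprocessing is pending.
func processingID(statusXattr string) string {
	return strings.TrimPrefix(statusXattr, processingStatus)
}

func main() {
	status := "processing:session-2" // the second upload session owns the node

	for _, sessionID := range []string{"session-1", "session-2"} {
		if processingID(status) == sessionID {
			fmt.Println(sessionID, "is the latest session: safe to revert metadata or remove the node")
		} else {
			fmt.Println(sessionID, "is stale: leave the node alone")
		}
	}
}
```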
10 changes: 9 additions & 1 deletion pkg/storage/utils/decomposedfs/upload/store.go
@@ -205,7 +205,15 @@ func (store OcisStore) CreateNodeForUpload(session *OcisSession, initAttrs node.
}

var f *lockedfile.File
if session.NodeExists() {
if session.NodeExists() { // TODO this is wrong. The node should be created when the upload starts, the revisions should be created independently of the node
// we do not need to propagate a change when a node is created, only when the upload is ready.
// that still creates problems for desktop clients because if another change causes propagation it will detect an empty file
// so the first upload has to point to the first revision with the expected size. The file cannot be downloaded, but it can be overwritten (which will create a new revision and make the node reflect the latest revision)
// any finished postprocessing will not affect the node metadata.
// *thinking* but then initializing an upload will lock the file until the upload has finished. That sucks.
// so we have to check if the node has been created in the meantime (well, only in case the upload does not know the nodeid ... or the NodeExists array that is checked by session.NodeExists())
// FIXME look at the disk again to see if the file has been created in between, or just try initializing a new node and do the update existing node as a fallback. <- the latter!

f, err = store.updateExistingNode(ctx, session, n, session.SpaceID(), uint64(session.Size()))
if f != nil {
appctx.GetLogger(ctx).Info().Str("lockfile", f.Name()).Interface("err", err).Msg("got lock file from updateExistingNode")
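The FIXME at the end of this comment proposes inverting the check: try to initialize a new node first and fall back to updating the existing one if a concurrent session created it. A rough sketch of that shape, with purely hypothetical helper names (this is not what the merged code does):

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

var errNodeExists = errors.New("node already exists")

// tryInitNewNode stands in for creating the node metadata file with O_EXCL,
// which fails if another upload session created the node in the meantime.
func tryInitNewNode(path string) error {
	f, err := os.OpenFile(path, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o600)
	if err != nil {
		if os.IsExist(err) {
			return errNodeExists
		}
		return err
	}
	return f.Close()
}

func updateExistingNode(path string) error {
	fmt.Println("updating existing node at", path)
	return nil
}

func createNodeForUpload(path string) error {
	// prefer the new-node path; fall back to updating when the node already exists
	switch err := tryInitNewNode(path); {
	case err == nil:
		return nil
	case errors.Is(err, errNodeExists):
		return updateExistingNode(path)
	default:
		return err
	}
}

func main() {
	_ = createNodeForUpload("/tmp/example-node.mpk")
}
```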
27 changes: 17 additions & 10 deletions pkg/storage/utils/decomposedfs/upload/upload.go
@@ -276,14 +276,12 @@ func (session *OcisSession) ConcatUploads(_ context.Context, uploads []tusd.Uplo
func (session *OcisSession) Finalize() (err error) {
ctx, span := tracer.Start(session.Context(context.Background()), "Finalize")
defer span.End()
n, err := session.Node(ctx)
if err != nil {
return err
}

revisionNode := &node.Node{SpaceID: session.SpaceID(), BlobID: session.ID(), Blobsize: session.Size()}

// upload the data to the blobstore
_, subspan := tracer.Start(ctx, "WriteBlob")
err = session.store.tp.WriteBlob(n, session.binPath())
err = session.store.tp.WriteBlob(revisionNode, session.binPath())
subspan.End()
if err != nil {
return errors.Wrap(err, "failed to upload file to blobstore")
@@ -316,12 +314,12 @@ func (session *OcisSession) Cleanup(revertNodeMetadata, cleanBin, cleanInfo bool
ctx := session.Context(context.Background())

if revertNodeMetadata {
n, err := session.Node(ctx)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("node", n.ID).Str("sessionid", session.ID()).Msg("reading node for session failed")
}
if session.NodeExists() {
p := session.info.MetaData["versionsPath"]
n, err := session.Node(ctx)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("sessionid", session.ID()).Msg("reading node for session failed")
}
if err := session.store.lu.CopyMetadata(ctx, p, n.InternalPath(), func(attributeName string, value []byte) (newValue []byte, copy bool) {
return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) ||
attributeName == prefixes.TypeAttr ||
@@ -337,7 +335,16 @@ func (session *OcisSession) Cleanup(revertNodeMetadata, cleanBin, cleanInfo bool
}

} else {
session.removeNode(ctx)
// only remove the node if this is still the latest upload session for it; another session may have started (processing id != session id) or already finished (processing id == "") in the meantime
latestSession, err := n.ProcessingID(ctx)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("node", n.ID).Str("sessionid", session.ID()).Msg("reading processingid for session failed")
}
if latestSession == session.ID() {
// actually delete the node
session.removeNode(ctx)
}
// FIXME else if the upload has become a revision, delete the revision, or if it is the last one, delete the node
}
}

75 changes: 66 additions & 9 deletions pkg/storage/utils/decomposedfs/upload_async_test.go
@@ -19,6 +19,7 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/aspects"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/permissions"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/permissions/mocks"
@@ -61,7 +62,8 @@ var _ = Describe("Async file uploads", Ordered, func() {
Username: "username",
}

fileContent = []byte("0123456789")
fileContent = []byte("0123456789")
file2Content = []byte("01234567890123456789")

ctx = ruser.ContextSetUser(context.Background(), user)

@@ -84,8 +86,10 @@ var _ = Describe("Async file uploads", Ordered, func() {
Expect(err).ToNot(HaveOccurred())

o, err = options.New(map[string]interface{}{
"root": tmpRoot,
"asyncfileuploads": true,
"root": tmpRoot,
"asyncfileuploads": true,
"treetime_accounting": true,
"treesize_accounting": true,
})
Expect(err).ToNot(HaveOccurred())

@@ -141,10 +145,10 @@ var _ = Describe("Async file uploads", Ordered, func() {
bs.On("Upload", mock.AnythingOfType("*node.Node"), mock.AnythingOfType("string"), mock.Anything).
Return(nil).
Run(func(args mock.Arguments) {
n := args.Get(0).(*node.Node)
data, err := os.ReadFile(args.Get(1).(string))

Expect(err).ToNot(HaveOccurred())
Expect(data).To(Equal(fileContent))
Expect(len(data)).To(Equal(int(n.Blobsize)))
})

// start upload of a file
@@ -412,7 +416,7 @@ var _ = Describe("Async file uploads", Ordered, func() {

JustBeforeEach(func() {
// upload again
uploadIds, err := fs.InitiateUpload(ctx, ref, 10, map[string]string{})
uploadIds, err := fs.InitiateUpload(ctx, ref, 20, map[string]string{})
Expect(err).ToNot(HaveOccurred())
Expect(len(uploadIds)).To(Equal(2))
Expect(uploadIds["simple"]).ToNot(BeEmpty())
@@ -422,8 +426,8 @@ var _ = Describe("Async file uploads", Ordered, func() {

_, err = fs.Upload(ctx, storage.UploadRequest{
Ref: uploadRef,
Body: io.NopCloser(bytes.NewReader(fileContent)),
Length: int64(len(fileContent)),
Body: io.NopCloser(bytes.NewReader(file2Content)),
Length: int64(len(file2Content)),
}, nil)
Expect(err).ToNot(HaveOccurred())

@@ -456,7 +460,7 @@ var _ = Describe("Async file uploads", Ordered, func() {
})

It("removes processing status when second upload is finished, even if first isn't", func() {
// finish postprocessing
// finish postprocessing of second upload
con <- events.PostprocessingFinished{
UploadID: secondUploadID,
Outcome: events.PPOutcomeContinue,
@@ -475,5 +479,58 @@
Expect(item.Path).To(Equal(ref.Path))
Expect(utils.ReadPlainFromOpaque(item.Opaque, "status")).To(Equal(""))
})

It("correctly calculates the size when the second upload is finishes, even if first is deleted", func() {
// finish postprocessing of second upload
con <- events.PostprocessingFinished{
UploadID: secondUploadID,
Outcome: events.PPOutcomeContinue,
}
// wait for upload to be ready
ev, ok := (<-pub).(events.UploadReady)
Expect(ok).To(BeTrue())
Expect(ev.Failed).To(BeFalse())

// check processing status
resources, err := fs.ListFolder(ctx, rootRef, []string{}, []string{})
Expect(err).ToNot(HaveOccurred())
Expect(len(resources)).To(Equal(1))

item := resources[0]
Expect(item.Path).To(Equal(ref.Path))
Expect(utils.ReadPlainFromOpaque(item.Opaque, "status")).To(Equal(""))

// size should match the second upload
Expect(item.Size).To(Equal(uint64(len(file2Content))))

// parent size should match second upload as well
parentInfo, err := fs.GetMD(ctx, rootRef, []string{}, []string{})
Expect(err).ToNot(HaveOccurred())
Expect(parentInfo.Size).To(Equal(uint64(len(file2Content))))

// finish postprocessing of first upload
con <- events.PostprocessingFinished{
UploadID: uploadID,
Outcome: events.PPOutcomeDelete,
// Outcome: events.PPOutcomeAbort, // This as well ... fck
}
// wait for upload to be ready
ev, ok = (<-pub).(events.UploadReady)
Expect(ok).To(BeTrue())
Expect(ev.Failed).To(BeTrue())

// check processing status
resources, err = fs.ListFolder(ctx, rootRef, []string{}, []string{})
Expect(err).ToNot(HaveOccurred())
Expect(len(resources)).To(Equal(1))

// size should still match the second upload
Expect(item.Size).To(Equal(uint64(len(file2Content))))

// parent size should still match second upload as well
parentInfo, err = fs.GetMD(ctx, rootRef, []string{}, []string{})
Expect(err).ToNot(HaveOccurred())
Expect(parentInfo.Size).To(Equal(uint64(len(file2Content))))
})
})
})
