From 8b53e0d495714f581450c295296b2d81ce6c2a75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Fri, 5 Apr 2024 17:30:37 +0200 Subject: [PATCH] write blob based on session id MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/storage/fs/ocis/blobstore/blobstore.go | 24 +++--- .../fs/ocis/blobstore/blobstore_test.go | 18 ++--- pkg/storage/fs/posix/blobstore/blobstore.go | 24 +++--- .../fs/posix/blobstore/blobstore_test.go | 18 ++--- pkg/storage/fs/posix/tree/mocks/Blobstore.go | 79 ++++++++++--------- pkg/storage/fs/posix/tree/tree.go | 32 +++----- pkg/storage/fs/s3ng/blobstore/blobstore.go | 57 ++++++++----- .../utils/decomposedfs/decomposedfs.go | 2 +- pkg/storage/utils/decomposedfs/node/node.go | 6 +- .../utils/decomposedfs/recycle_test.go | 8 +- pkg/storage/utils/decomposedfs/revisions.go | 17 ++-- .../decomposedfs/tree/mocks/Blobstore.go | 79 ++++++++++--------- pkg/storage/utils/decomposedfs/tree/tree.go | 32 +++----- .../utils/decomposedfs/tree/tree_test.go | 8 +- .../utils/decomposedfs/upload/store.go | 10 ++- .../utils/decomposedfs/upload/upload.go | 12 +-- .../utils/decomposedfs/upload_async_test.go | 4 +- pkg/storage/utils/decomposedfs/upload_test.go | 10 +-- 18 files changed, 229 insertions(+), 211 deletions(-) diff --git a/pkg/storage/fs/ocis/blobstore/blobstore.go b/pkg/storage/fs/ocis/blobstore/blobstore.go index 7afd3eeb550..00e1d78a587 100644 --- a/pkg/storage/fs/ocis/blobstore/blobstore.go +++ b/pkg/storage/fs/ocis/blobstore/blobstore.go @@ -26,7 +26,6 @@ import ( "path/filepath" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" - "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/utils" "github.com/pkg/errors" ) @@ -49,8 +48,8 @@ func New(root string) (*Blobstore, error) { } // Upload stores some data in the blobstore under the given key -func (bs *Blobstore) Upload(node *node.Node, source string) error { - dest, err := bs.path(node) +func (bs *Blobstore) Upload(spaceID, blobID string, blobSize int64, source string) error { + dest, err := bs.path(spaceID, blobID) if err != nil { return err } @@ -85,8 +84,8 @@ func (bs *Blobstore) Upload(node *node.Node, source string) error { } // Download retrieves a blob from the blobstore for reading -func (bs *Blobstore) Download(node *node.Node) (io.ReadCloser, error) { - dest, err := bs.path(node) +func (bs *Blobstore) Download(spaceID, blobID string) (io.ReadCloser, error) { + dest, err := bs.path(spaceID, blobID) if err != nil { return nil, err } @@ -98,8 +97,8 @@ func (bs *Blobstore) Download(node *node.Node) (io.ReadCloser, error) { } // Delete deletes a blob from the blobstore -func (bs *Blobstore) Delete(node *node.Node) error { - dest, err := bs.path(node) +func (bs *Blobstore) Delete(spaceID, blobID string) error { + dest, err := bs.path(spaceID, blobID) if err != nil { return err } @@ -109,14 +108,17 @@ func (bs *Blobstore) Delete(node *node.Node) error { return nil } -func (bs *Blobstore) path(node *node.Node) (string, error) { - if node.BlobID == "" { - return "", fmt.Errorf("blobstore: BlobID is empty") +func (bs *Blobstore) path(spaceID, blobID string) (string, error) { + if spaceID == "" { + return "", fmt.Errorf("blobstore: spaceID is empty") + } + if blobID == "" { + return "", fmt.Errorf("blobstore: blobID is empty") } return filepath.Join( bs.root, filepath.Clean(filepath.Join( - "/", "spaces", lookup.Pathify(node.SpaceID, 1, 2), 
"blobs", lookup.Pathify(node.BlobID, 4, 2)), + "/", "spaces", lookup.Pathify(spaceID, 1, 2), "blobs", lookup.Pathify(blobID, 4, 2)), ), ), nil } diff --git a/pkg/storage/fs/ocis/blobstore/blobstore_test.go b/pkg/storage/fs/ocis/blobstore/blobstore_test.go index e895497abcd..a865f186ee6 100644 --- a/pkg/storage/fs/ocis/blobstore/blobstore_test.go +++ b/pkg/storage/fs/ocis/blobstore/blobstore_test.go @@ -24,7 +24,6 @@ import ( "path" "github.com/cs3org/reva/v2/pkg/storage/fs/ocis/blobstore" - "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/tests/helpers" . "github.com/onsi/ginkgo/v2" @@ -34,7 +33,8 @@ import ( var _ = Describe("Blobstore", func() { var ( tmpRoot string - blobNode *node.Node + spaceID string + blobID string blobPath string blobSrcFile string data []byte @@ -48,10 +48,8 @@ var _ = Describe("Blobstore", func() { Expect(err).ToNot(HaveOccurred()) data = []byte("1234567890") - blobNode = &node.Node{ - SpaceID: "wonderfullspace", - BlobID: "huuuuugeblob", - } + spaceID = "wonderfullspace" + blobID = "huuuuugeblob" blobPath = path.Join(tmpRoot, "spaces", "wo", "nderfullspace", "blobs", "hu", "uu", "uu", "ge", "blob") blobSrcFile = path.Join(tmpRoot, "blobsrc") @@ -77,7 +75,7 @@ var _ = Describe("Blobstore", func() { Expect(os.WriteFile(blobSrcFile, data, 0700)).To(Succeed()) }) It("writes the blob", func() { - err := bs.Upload(blobNode, blobSrcFile) + err := bs.Upload(spaceID, blobID, int64(len(data)), blobSrcFile) Expect(err).ToNot(HaveOccurred()) writtenBytes, err := os.ReadFile(blobPath) @@ -95,7 +93,7 @@ var _ = Describe("Blobstore", func() { Describe("Download", func() { It("cleans the key", func() { - reader, err := bs.Download(blobNode) + reader, err := bs.Download(spaceID, blobID) Expect(err).ToNot(HaveOccurred()) readData, err := io.ReadAll(reader) @@ -104,7 +102,7 @@ var _ = Describe("Blobstore", func() { }) It("returns a reader to the blob", func() { - reader, err := bs.Download(blobNode) + reader, err := bs.Download(spaceID, blobID) Expect(err).ToNot(HaveOccurred()) readData, err := io.ReadAll(reader) @@ -118,7 +116,7 @@ var _ = Describe("Blobstore", func() { _, err := os.Stat(blobPath) Expect(err).ToNot(HaveOccurred()) - err = bs.Delete(blobNode) + err = bs.Delete(spaceID, blobID) Expect(err).ToNot(HaveOccurred()) _, err = os.Stat(blobPath) diff --git a/pkg/storage/fs/posix/blobstore/blobstore.go b/pkg/storage/fs/posix/blobstore/blobstore.go index 7afd3eeb550..00e1d78a587 100644 --- a/pkg/storage/fs/posix/blobstore/blobstore.go +++ b/pkg/storage/fs/posix/blobstore/blobstore.go @@ -26,7 +26,6 @@ import ( "path/filepath" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" - "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/utils" "github.com/pkg/errors" ) @@ -49,8 +48,8 @@ func New(root string) (*Blobstore, error) { } // Upload stores some data in the blobstore under the given key -func (bs *Blobstore) Upload(node *node.Node, source string) error { - dest, err := bs.path(node) +func (bs *Blobstore) Upload(spaceID, blobID string, blobSize int64, source string) error { + dest, err := bs.path(spaceID, blobID) if err != nil { return err } @@ -85,8 +84,8 @@ func (bs *Blobstore) Upload(node *node.Node, source string) error { } // Download retrieves a blob from the blobstore for reading -func (bs *Blobstore) Download(node *node.Node) (io.ReadCloser, error) { - dest, err := bs.path(node) +func (bs *Blobstore) Download(spaceID, blobID string) 
(io.ReadCloser, error) { + dest, err := bs.path(spaceID, blobID) if err != nil { return nil, err } @@ -98,8 +97,8 @@ func (bs *Blobstore) Download(node *node.Node) (io.ReadCloser, error) { } // Delete deletes a blob from the blobstore -func (bs *Blobstore) Delete(node *node.Node) error { - dest, err := bs.path(node) +func (bs *Blobstore) Delete(spaceID, blobID string) error { + dest, err := bs.path(spaceID, blobID) if err != nil { return err } @@ -109,14 +108,17 @@ func (bs *Blobstore) Delete(node *node.Node) error { return nil } -func (bs *Blobstore) path(node *node.Node) (string, error) { - if node.BlobID == "" { - return "", fmt.Errorf("blobstore: BlobID is empty") +func (bs *Blobstore) path(spaceID, blobID string) (string, error) { + if spaceID == "" { + return "", fmt.Errorf("blobstore: spaceID is empty") + } + if blobID == "" { + return "", fmt.Errorf("blobstore: blobID is empty") } return filepath.Join( bs.root, filepath.Clean(filepath.Join( - "/", "spaces", lookup.Pathify(node.SpaceID, 1, 2), "blobs", lookup.Pathify(node.BlobID, 4, 2)), + "/", "spaces", lookup.Pathify(spaceID, 1, 2), "blobs", lookup.Pathify(blobID, 4, 2)), ), ), nil } diff --git a/pkg/storage/fs/posix/blobstore/blobstore_test.go b/pkg/storage/fs/posix/blobstore/blobstore_test.go index e895497abcd..a865f186ee6 100644 --- a/pkg/storage/fs/posix/blobstore/blobstore_test.go +++ b/pkg/storage/fs/posix/blobstore/blobstore_test.go @@ -24,7 +24,6 @@ import ( "path" "github.com/cs3org/reva/v2/pkg/storage/fs/ocis/blobstore" - "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/tests/helpers" . "github.com/onsi/ginkgo/v2" @@ -34,7 +33,8 @@ import ( var _ = Describe("Blobstore", func() { var ( tmpRoot string - blobNode *node.Node + spaceID string + blobID string blobPath string blobSrcFile string data []byte @@ -48,10 +48,8 @@ var _ = Describe("Blobstore", func() { Expect(err).ToNot(HaveOccurred()) data = []byte("1234567890") - blobNode = &node.Node{ - SpaceID: "wonderfullspace", - BlobID: "huuuuugeblob", - } + spaceID = "wonderfullspace" + blobID = "huuuuugeblob" blobPath = path.Join(tmpRoot, "spaces", "wo", "nderfullspace", "blobs", "hu", "uu", "uu", "ge", "blob") blobSrcFile = path.Join(tmpRoot, "blobsrc") @@ -77,7 +75,7 @@ var _ = Describe("Blobstore", func() { Expect(os.WriteFile(blobSrcFile, data, 0700)).To(Succeed()) }) It("writes the blob", func() { - err := bs.Upload(blobNode, blobSrcFile) + err := bs.Upload(spaceID, blobID, int64(len(data)), blobSrcFile) Expect(err).ToNot(HaveOccurred()) writtenBytes, err := os.ReadFile(blobPath) @@ -95,7 +93,7 @@ var _ = Describe("Blobstore", func() { Describe("Download", func() { It("cleans the key", func() { - reader, err := bs.Download(blobNode) + reader, err := bs.Download(spaceID, blobID) Expect(err).ToNot(HaveOccurred()) readData, err := io.ReadAll(reader) @@ -104,7 +102,7 @@ var _ = Describe("Blobstore", func() { }) It("returns a reader to the blob", func() { - reader, err := bs.Download(blobNode) + reader, err := bs.Download(spaceID, blobID) Expect(err).ToNot(HaveOccurred()) readData, err := io.ReadAll(reader) @@ -118,7 +116,7 @@ var _ = Describe("Blobstore", func() { _, err := os.Stat(blobPath) Expect(err).ToNot(HaveOccurred()) - err = bs.Delete(blobNode) + err = bs.Delete(spaceID, blobID) Expect(err).ToNot(HaveOccurred()) _, err = os.Stat(blobPath) diff --git a/pkg/storage/fs/posix/tree/mocks/Blobstore.go b/pkg/storage/fs/posix/tree/mocks/Blobstore.go index 3f23a832e15..17a196234d4 100644 --- 
a/pkg/storage/fs/posix/tree/mocks/Blobstore.go +++ b/pkg/storage/fs/posix/tree/mocks/Blobstore.go @@ -23,7 +23,6 @@ package mocks import ( io "io" - node "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" mock "github.com/stretchr/testify/mock" ) @@ -40,17 +39,17 @@ func (_m *Blobstore) EXPECT() *Blobstore_Expecter { return &Blobstore_Expecter{mock: &_m.Mock} } -// Delete provides a mock function with given fields: _a0 -func (_m *Blobstore) Delete(_a0 *node.Node) error { - ret := _m.Called(_a0) +// Delete provides a mock function with given fields: spaceID, blobID +func (_m *Blobstore) Delete(spaceID string, blobID string) error { + ret := _m.Called(spaceID, blobID) if len(ret) == 0 { panic("no return value specified for Delete") } var r0 error - if rf, ok := ret.Get(0).(func(*node.Node) error); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(string, string) error); ok { + r0 = rf(spaceID, blobID) } else { r0 = ret.Error(0) } @@ -64,14 +63,15 @@ type Blobstore_Delete_Call struct { } // Delete is a helper method to define mock.On call -// - _a0 *node.Node -func (_e *Blobstore_Expecter) Delete(_a0 interface{}) *Blobstore_Delete_Call { - return &Blobstore_Delete_Call{Call: _e.mock.On("Delete", _a0)} +// - spaceID string +// - blobID string +func (_e *Blobstore_Expecter) Delete(spaceID interface{}, blobID interface{}) *Blobstore_Delete_Call { + return &Blobstore_Delete_Call{Call: _e.mock.On("Delete", spaceID, blobID)} } -func (_c *Blobstore_Delete_Call) Run(run func(_a0 *node.Node)) *Blobstore_Delete_Call { +func (_c *Blobstore_Delete_Call) Run(run func(spaceID string, blobID string)) *Blobstore_Delete_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*node.Node)) + run(args[0].(string), args[1].(string)) }) return _c } @@ -81,14 +81,14 @@ func (_c *Blobstore_Delete_Call) Return(_a0 error) *Blobstore_Delete_Call { return _c } -func (_c *Blobstore_Delete_Call) RunAndReturn(run func(*node.Node) error) *Blobstore_Delete_Call { +func (_c *Blobstore_Delete_Call) RunAndReturn(run func(string, string) error) *Blobstore_Delete_Call { _c.Call.Return(run) return _c } -// Download provides a mock function with given fields: _a0 -func (_m *Blobstore) Download(_a0 *node.Node) (io.ReadCloser, error) { - ret := _m.Called(_a0) +// Download provides a mock function with given fields: spaceID, blobID +func (_m *Blobstore) Download(spaceID string, blobID string) (io.ReadCloser, error) { + ret := _m.Called(spaceID, blobID) if len(ret) == 0 { panic("no return value specified for Download") @@ -96,19 +96,19 @@ func (_m *Blobstore) Download(_a0 *node.Node) (io.ReadCloser, error) { var r0 io.ReadCloser var r1 error - if rf, ok := ret.Get(0).(func(*node.Node) (io.ReadCloser, error)); ok { - return rf(_a0) + if rf, ok := ret.Get(0).(func(string, string) (io.ReadCloser, error)); ok { + return rf(spaceID, blobID) } - if rf, ok := ret.Get(0).(func(*node.Node) io.ReadCloser); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(string, string) io.ReadCloser); ok { + r0 = rf(spaceID, blobID) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(io.ReadCloser) } } - if rf, ok := ret.Get(1).(func(*node.Node) error); ok { - r1 = rf(_a0) + if rf, ok := ret.Get(1).(func(string, string) error); ok { + r1 = rf(spaceID, blobID) } else { r1 = ret.Error(1) } @@ -122,14 +122,15 @@ type Blobstore_Download_Call struct { } // Download is a helper method to define mock.On call -// - _a0 *node.Node -func (_e *Blobstore_Expecter) Download(_a0 interface{}) *Blobstore_Download_Call { - return 
&Blobstore_Download_Call{Call: _e.mock.On("Download", _a0)} +// - spaceID string +// - blobID string +func (_e *Blobstore_Expecter) Download(spaceID interface{}, blobID interface{}) *Blobstore_Download_Call { + return &Blobstore_Download_Call{Call: _e.mock.On("Download", spaceID, blobID)} } -func (_c *Blobstore_Download_Call) Run(run func(_a0 *node.Node)) *Blobstore_Download_Call { +func (_c *Blobstore_Download_Call) Run(run func(spaceID string, blobID string)) *Blobstore_Download_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*node.Node)) + run(args[0].(string), args[1].(string)) }) return _c } @@ -139,22 +140,22 @@ func (_c *Blobstore_Download_Call) Return(_a0 io.ReadCloser, _a1 error) *Blobsto return _c } -func (_c *Blobstore_Download_Call) RunAndReturn(run func(*node.Node) (io.ReadCloser, error)) *Blobstore_Download_Call { +func (_c *Blobstore_Download_Call) RunAndReturn(run func(string, string) (io.ReadCloser, error)) *Blobstore_Download_Call { _c.Call.Return(run) return _c } -// Upload provides a mock function with given fields: _a0, source -func (_m *Blobstore) Upload(_a0 *node.Node, source string) error { - ret := _m.Called(_a0, source) +// Upload provides a mock function with given fields: spaceID, blobID, blobSize, source +func (_m *Blobstore) Upload(spaceID string, blobID string, blobSize int64, source string) error { + ret := _m.Called(spaceID, blobID, blobSize, source) if len(ret) == 0 { panic("no return value specified for Upload") } var r0 error - if rf, ok := ret.Get(0).(func(*node.Node, string) error); ok { - r0 = rf(_a0, source) + if rf, ok := ret.Get(0).(func(string, string, int64, string) error); ok { + r0 = rf(spaceID, blobID, blobSize, source) } else { r0 = ret.Error(0) } @@ -168,15 +169,17 @@ type Blobstore_Upload_Call struct { } // Upload is a helper method to define mock.On call -// - _a0 *node.Node +// - spaceID string +// - blobID string +// - blobSize int64 // - source string -func (_e *Blobstore_Expecter) Upload(_a0 interface{}, source interface{}) *Blobstore_Upload_Call { - return &Blobstore_Upload_Call{Call: _e.mock.On("Upload", _a0, source)} +func (_e *Blobstore_Expecter) Upload(spaceID interface{}, blobID interface{}, blobSize interface{}, source interface{}) *Blobstore_Upload_Call { + return &Blobstore_Upload_Call{Call: _e.mock.On("Upload", spaceID, blobID, blobSize, source)} } -func (_c *Blobstore_Upload_Call) Run(run func(_a0 *node.Node, source string)) *Blobstore_Upload_Call { +func (_c *Blobstore_Upload_Call) Run(run func(spaceID string, blobID string, blobSize int64, source string)) *Blobstore_Upload_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*node.Node), args[1].(string)) + run(args[0].(string), args[1].(string), args[2].(int64), args[3].(string)) }) return _c } @@ -186,7 +189,7 @@ func (_c *Blobstore_Upload_Call) Return(_a0 error) *Blobstore_Upload_Call { return _c } -func (_c *Blobstore_Upload_Call) RunAndReturn(run func(*node.Node, string) error) *Blobstore_Upload_Call { +func (_c *Blobstore_Upload_Call) RunAndReturn(run func(string, string, int64, string) error) *Blobstore_Upload_Call { _c.Call.Return(run) return _c } diff --git a/pkg/storage/fs/posix/tree/tree.go b/pkg/storage/fs/posix/tree/tree.go index b157133cac9..eb9562bb91d 100644 --- a/pkg/storage/fs/posix/tree/tree.go +++ b/pkg/storage/fs/posix/tree/tree.go @@ -21,7 +21,6 @@ package tree import ( "bytes" "context" - "fmt" "io" "io/fs" "os" @@ -56,9 +55,9 @@ func init() { // Blobstore defines an interface for storing blobs in a blobstore type Blobstore 
interface { - Upload(node *node.Node, source string) error - Download(node *node.Node) (io.ReadCloser, error) - Delete(node *node.Node) error + Upload(spaceID, blobID string, blobSize int64, source string) error + Download(spaceID, blobID string) (io.ReadCloser, error) + Delete(spaceID, blobID string) error } // Tree manages a hierarchical tree @@ -616,7 +615,7 @@ func (t *Tree) removeNode(ctx context.Context, path string, n *node.Node) error // delete blob from blobstore if n.BlobID != "" { - if err := t.DeleteBlob(n); err != nil { + if err := t.DeleteBlob(n.SpaceID, n.BlobID); err != nil { log.Error().Err(err).Str("blobID", n.BlobID).Msg("error purging nodes blob") return err } @@ -645,7 +644,7 @@ func (t *Tree) removeNode(ctx context.Context, path string, n *node.Node) error } if bID != "" { - if err := t.DeleteBlob(&node.Node{SpaceID: n.SpaceID, BlobID: bID}); err != nil { + if err := t.DeleteBlob(n.SpaceID, bID); err != nil { log.Error().Err(err).Str("revision", rev).Str("blobID", bID).Msg("error removing revision node blob") return err } @@ -662,29 +661,22 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err } // WriteBlob writes a blob to the blobstore -func (t *Tree) WriteBlob(node *node.Node, source string) error { - return t.blobstore.Upload(node, source) +func (t *Tree) WriteBlob(spaceID, blobID string, blobSize int64, source string) error { + return t.blobstore.Upload(spaceID, blobID, blobSize, source) } // ReadBlob reads a blob from the blobstore -func (t *Tree) ReadBlob(node *node.Node) (io.ReadCloser, error) { - if node.BlobID == "" { +func (t *Tree) ReadBlob(spaceID, blobID string) (io.ReadCloser, error) { + if blobID == "" { // there is no blob yet - we are dealing with a 0 byte file return io.NopCloser(bytes.NewReader([]byte{})), nil } - return t.blobstore.Download(node) + return t.blobstore.Download(spaceID, blobID) } // DeleteBlob deletes a blob from the blobstore -func (t *Tree) DeleteBlob(node *node.Node) error { - if node == nil { - return fmt.Errorf("could not delete blob, nil node was given") - } - if node.BlobID == "" { - return fmt.Errorf("could not delete blob, node with empty blob id was given") - } - - return t.blobstore.Delete(node) +func (t *Tree) DeleteBlob(spaceID, blobID string) error { + return t.blobstore.Delete(spaceID, blobID) } // TODO check if node exists? 
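Note on the layout used by the ocis and posix blobstores above: a blob is resolved to a sharded path below its space directory from the spaceID and blobID arguments alone. A minimal sketch of that resolution, assuming a pathify helper that mirrors the sharding applied by lookup.Pathify in the diff (both pathify and blobPath below are illustrative stand-ins, not the library code):

package example

import (
	"path/filepath"
	"strings"
)

// pathify splits an id into depth leading segments of width characters,
// mirroring the sharding that lookup.Pathify applies (illustrative reimplementation).
func pathify(id string, depth, width int) string {
	var b strings.Builder
	i := 0
	for ; i < depth; i++ {
		if len(id) <= i*width+width {
			break
		}
		b.WriteString(id[i*width:i*width+width] + "/")
	}
	b.WriteString(id[i*width:])
	return b.String()
}

// blobPath shows where Upload/Download/Delete now look for a blob, given
// spaceID and blobID instead of a *node.Node. For example
// blobPath(root, "wonderfullspace", "huuuuugeblob") yields
// root/spaces/wo/nderfullspace/blobs/hu/uu/uu/ge/blob, matching the
// expectation in blobstore_test.go.
func blobPath(root, spaceID, blobID string) string {
	return filepath.Join(root, "spaces", pathify(spaceID, 1, 2), "blobs", pathify(blobID, 4, 2))
}
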
diff --git a/pkg/storage/fs/s3ng/blobstore/blobstore.go b/pkg/storage/fs/s3ng/blobstore/blobstore.go index 48a40b764f0..b5bf4cb286e 100644 --- a/pkg/storage/fs/s3ng/blobstore/blobstore.go +++ b/pkg/storage/fs/s3ng/blobstore/blobstore.go @@ -27,7 +27,6 @@ import ( "path/filepath" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" - "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/pkg/errors" @@ -76,14 +75,19 @@ func New(endpoint, region, bucket, accessKey, secretKey string, defaultPutOption } // Upload stores some data in the blobstore under the given key -func (bs *Blobstore) Upload(node *node.Node, source string) error { +func (bs *Blobstore) Upload(spaceID, blobID string, blobSize int64, source string) error { + dest, err := bs.path(spaceID, blobID) + if err != nil { + return err + } + reader, err := os.Open(source) if err != nil { return errors.Wrap(err, "can not open source file to upload") } defer reader.Close() - _, err = bs.client.PutObject(context.Background(), bs.bucket, bs.path(node), reader, node.Blobsize, minio.PutObjectOptions{ + _, err = bs.client.PutObject(context.Background(), bs.bucket, dest, reader, blobSize, minio.PutObjectOptions{ ContentType: "application/octet-stream", SendContentMd5: bs.defaultPutOptions.SendContentMd5, ConcurrentStreamParts: bs.defaultPutOptions.ConcurrentStreamParts, @@ -94,40 +98,57 @@ func (bs *Blobstore) Upload(node *node.Node, source string) error { }) if err != nil { - return errors.Wrapf(err, "could not store object '%s' into bucket '%s'", bs.path(node), bs.bucket) + return errors.Wrapf(err, "could not store object '%s' into bucket '%s'", dest, bs.bucket) } return nil } // Download retrieves a blob from the blobstore for reading -func (bs *Blobstore) Download(node *node.Node) (io.ReadCloser, error) { - reader, err := bs.client.GetObject(context.Background(), bs.bucket, bs.path(node), minio.GetObjectOptions{}) +func (bs *Blobstore) Download(spaceID, blobID string) (io.ReadCloser, error) { + dest, err := bs.path(spaceID, blobID) if err != nil { - return nil, errors.Wrapf(err, "could not download object '%s' from bucket '%s'", bs.path(node), bs.bucket) + return nil, err } - - stat, err := reader.Stat() + reader, err := bs.client.GetObject(context.Background(), bs.bucket, dest, minio.GetObjectOptions{}) if err != nil { - return nil, errors.Wrapf(err, "blob path: %s", bs.path(node)) + return nil, errors.Wrapf(err, "could not download object '%s' from bucket '%s'", dest, bs.bucket) } - if node.Blobsize != stat.Size { - return nil, fmt.Errorf("blob has unexpected size. %d bytes expected, got %d bytes", node.Blobsize, stat.Size) - } + // FIXME move to caller + /* + stat, err := reader.Stat() + if err != nil { + return nil, errors.Wrapf(err, "blob path: %s", dest) + } + + if node.Blobsize != stat.Size { + return nil, fmt.Errorf("blob has unexpected size. 
%d bytes expected, got %d bytes", node.Blobsize, stat.Size) + } + */ return reader, nil } // Delete deletes a blob from the blobstore -func (bs *Blobstore) Delete(node *node.Node) error { - err := bs.client.RemoveObject(context.Background(), bs.bucket, bs.path(node), minio.RemoveObjectOptions{}) +func (bs *Blobstore) Delete(spaceID, blobID string) error { + dest, err := bs.path(spaceID, blobID) + if err != nil { + return err + } + err = bs.client.RemoveObject(context.Background(), bs.bucket, dest, minio.RemoveObjectOptions{}) if err != nil { - return errors.Wrapf(err, "could not delete object '%s' from bucket '%s'", bs.path(node), bs.bucket) + return errors.Wrapf(err, "could not delete object '%s' from bucket '%s'", dest, bs.bucket) } return nil } -func (bs *Blobstore) path(node *node.Node) string { +func (bs *Blobstore) path(spaceID, blobID string) (string, error) { + if spaceID == "" { + return "", fmt.Errorf("blobstore: spaceID is empty") + } + if blobID == "" { + return "", fmt.Errorf("blobstore: blobID is empty") + } // https://aws.amazon.com/de/premiumsupport/knowledge-center/s3-prefix-nested-folders-difference/ // Prefixes are used to partion a bucket. A prefix is everything except the filename. // For a file `BucketName/foo/bar/lorem.ipsum`, `BucketName/foo/bar/` is the prefix. @@ -136,5 +157,5 @@ func (bs *Blobstore) path(node *node.Node) string { // // Since the spaceID is always the same for a space, we don't need to pathify that, because it would // not yield any performance gains - return filepath.Clean(filepath.Join(node.SpaceID, lookup.Pathify(node.BlobID, 4, 2))) + return filepath.Clean(filepath.Join(spaceID, lookup.Pathify(blobID, 4, 2))), nil } diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index a9754c07439..b192cce9804 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -1045,7 +1045,7 @@ func (fs *Decomposedfs) Download(ctx context.Context, ref *provider.Reference) ( if currentEtag != expectedEtag { return nil, errtypes.Aborted(fmt.Sprintf("file changed from etag %s to %s", expectedEtag, currentEtag)) } - reader, err := fs.tp.ReadBlob(n) + reader, err := fs.tp.ReadBlob(n.SpaceID, n.BlobID) if err != nil { return nil, errors.Wrap(err, "Decomposedfs: error download blob '"+n.ID+"'") } diff --git a/pkg/storage/utils/decomposedfs/node/node.go b/pkg/storage/utils/decomposedfs/node/node.go index f1db7f5eb81..c0a36cd49c3 100644 --- a/pkg/storage/utils/decomposedfs/node/node.go +++ b/pkg/storage/utils/decomposedfs/node/node.go @@ -99,9 +99,9 @@ type Tree interface { RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPath string, target *Node) (*Node, *Node, func() error, error) PurgeRecycleItemFunc(ctx context.Context, spaceid, key, purgePath string) (*Node, func() error, error) - WriteBlob(node *Node, source string) error - ReadBlob(node *Node) (io.ReadCloser, error) - DeleteBlob(node *Node) error + WriteBlob(spaceID, blobId string, blobSize int64, source string) error + ReadBlob(spaceID, blobId string) (io.ReadCloser, error) + DeleteBlob(spaceID, blobId string) error Propagate(ctx context.Context, node *Node, sizeDiff int64) (err error) } diff --git a/pkg/storage/utils/decomposedfs/recycle_test.go b/pkg/storage/utils/decomposedfs/recycle_test.go index 1af163d0932..7dd0f365d9d 100644 --- a/pkg/storage/utils/decomposedfs/recycle_test.go +++ b/pkg/storage/utils/decomposedfs/recycle_test.go @@ -86,7 +86,7 @@ var _ = Describe("Recycle", func() 
{ It("they can be permanently deleted by this user", func() { // mock call to blobstore - env.Blobstore.On("Delete", mock.Anything).Return(nil).Times(2) + env.Blobstore.On("Delete", mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(nil).Times(2) items, err := env.Fs.ListRecycle(env.Ctx, &provider.Reference{ResourceId: env.SpaceRootRes}, "", "/") Expect(err).ToNot(HaveOccurred()) @@ -104,7 +104,7 @@ var _ = Describe("Recycle", func() { }) It("they can be restored", func() { - env.Blobstore.On("Delete", mock.Anything).Return(nil).Times(2) + env.Blobstore.On("Delete", mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(nil).Times(2) items, err := env.Fs.ListRecycle(env.Ctx, &provider.Reference{ResourceId: env.SpaceRootRes}, "", "/") Expect(err).ToNot(HaveOccurred()) @@ -177,7 +177,7 @@ var _ = Describe("Recycle", func() { }) It("they can be permanently deleted by the other user", func() { - env.Blobstore.On("Delete", mock.Anything).Return(nil).Times(2) + env.Blobstore.On("Delete", mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(nil).Times(2) items, err := env.Fs.ListRecycle(env.Ctx, &provider.Reference{ResourceId: env.SpaceRootRes}, "", "/") Expect(err).ToNot(HaveOccurred()) @@ -206,7 +206,7 @@ var _ = Describe("Recycle", func() { }) It("they can be restored by the other user", func() { - env.Blobstore.On("Delete", mock.Anything).Return(nil).Times(2) + env.Blobstore.On("Delete", mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(nil).Times(2) items, err := env.Fs.ListRecycle(env.Ctx, &provider.Reference{ResourceId: env.SpaceRootRes}, "", "/") Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/storage/utils/decomposedfs/revisions.go b/pkg/storage/utils/decomposedfs/revisions.go index c079c3bddb1..9bad3c5b5a2 100644 --- a/pkg/storage/utils/decomposedfs/revisions.go +++ b/pkg/storage/utils/decomposedfs/revisions.go @@ -152,14 +152,17 @@ func (fs *Decomposedfs) DownloadRevision(ctx context.Context, ref *provider.Refe if err != nil { return nil, errors.Wrapf(err, "Decomposedfs: could not read blob id of revision '%s' for node '%s'", n.ID, revisionKey) } - blobsize, err := fs.lu.ReadBlobSizeAttr(ctx, contentPath) - if err != nil { - return nil, errors.Wrapf(err, "Decomposedfs: could not read blob size of revision '%s' for node '%s'", n.ID, revisionKey) - } + // FIXME should ReadBlob check if the size matches? 
+ /* + blobsize, err := fs.lu.ReadBlobSizeAttr(ctx, contentPath) + if err != nil { + return nil, errors.Wrapf(err, "Decomposedfs: could not read blob size of revision '%s' for node '%s'", n.ID, revisionKey) + } - revisionNode := node.Node{SpaceID: spaceID, BlobID: blobid, Blobsize: blobsize} // blobsize is needed for the s3ng blobstore + revisionNode := node.Node{SpaceID: spaceID, BlobID: blobid, Blobsize: blobsize} // blobsize is needed for the s3ng blobstore + */ - reader, err := fs.tp.ReadBlob(&revisionNode) + reader, err := fs.tp.ReadBlob(spaceID, blobid) if err != nil { return nil, errors.Wrapf(err, "Decomposedfs: could not download blob of revision '%s' for node '%s'", n.ID, revisionKey) } @@ -320,7 +323,7 @@ func (fs *Decomposedfs) DeleteRevision(ctx context.Context, ref *provider.Refere return err } - return fs.tp.DeleteBlob(n) + return fs.tp.DeleteBlob(n.SpaceID, n.BlobID) } func (fs *Decomposedfs) getRevisionNode(ctx context.Context, ref *provider.Reference, revisionKey string, hasPermission func(*provider.ResourcePermissions) bool) (*node.Node, error) { diff --git a/pkg/storage/utils/decomposedfs/tree/mocks/Blobstore.go b/pkg/storage/utils/decomposedfs/tree/mocks/Blobstore.go index 3f23a832e15..17a196234d4 100644 --- a/pkg/storage/utils/decomposedfs/tree/mocks/Blobstore.go +++ b/pkg/storage/utils/decomposedfs/tree/mocks/Blobstore.go @@ -23,7 +23,6 @@ package mocks import ( io "io" - node "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" mock "github.com/stretchr/testify/mock" ) @@ -40,17 +39,17 @@ func (_m *Blobstore) EXPECT() *Blobstore_Expecter { return &Blobstore_Expecter{mock: &_m.Mock} } -// Delete provides a mock function with given fields: _a0 -func (_m *Blobstore) Delete(_a0 *node.Node) error { - ret := _m.Called(_a0) +// Delete provides a mock function with given fields: spaceID, blobID +func (_m *Blobstore) Delete(spaceID string, blobID string) error { + ret := _m.Called(spaceID, blobID) if len(ret) == 0 { panic("no return value specified for Delete") } var r0 error - if rf, ok := ret.Get(0).(func(*node.Node) error); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(string, string) error); ok { + r0 = rf(spaceID, blobID) } else { r0 = ret.Error(0) } @@ -64,14 +63,15 @@ type Blobstore_Delete_Call struct { } // Delete is a helper method to define mock.On call -// - _a0 *node.Node -func (_e *Blobstore_Expecter) Delete(_a0 interface{}) *Blobstore_Delete_Call { - return &Blobstore_Delete_Call{Call: _e.mock.On("Delete", _a0)} +// - spaceID string +// - blobID string +func (_e *Blobstore_Expecter) Delete(spaceID interface{}, blobID interface{}) *Blobstore_Delete_Call { + return &Blobstore_Delete_Call{Call: _e.mock.On("Delete", spaceID, blobID)} } -func (_c *Blobstore_Delete_Call) Run(run func(_a0 *node.Node)) *Blobstore_Delete_Call { +func (_c *Blobstore_Delete_Call) Run(run func(spaceID string, blobID string)) *Blobstore_Delete_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*node.Node)) + run(args[0].(string), args[1].(string)) }) return _c } @@ -81,14 +81,14 @@ func (_c *Blobstore_Delete_Call) Return(_a0 error) *Blobstore_Delete_Call { return _c } -func (_c *Blobstore_Delete_Call) RunAndReturn(run func(*node.Node) error) *Blobstore_Delete_Call { +func (_c *Blobstore_Delete_Call) RunAndReturn(run func(string, string) error) *Blobstore_Delete_Call { _c.Call.Return(run) return _c } -// Download provides a mock function with given fields: _a0 -func (_m *Blobstore) Download(_a0 *node.Node) (io.ReadCloser, error) { - ret := _m.Called(_a0) +// 
Download provides a mock function with given fields: spaceID, blobID +func (_m *Blobstore) Download(spaceID string, blobID string) (io.ReadCloser, error) { + ret := _m.Called(spaceID, blobID) if len(ret) == 0 { panic("no return value specified for Download") @@ -96,19 +96,19 @@ func (_m *Blobstore) Download(_a0 *node.Node) (io.ReadCloser, error) { var r0 io.ReadCloser var r1 error - if rf, ok := ret.Get(0).(func(*node.Node) (io.ReadCloser, error)); ok { - return rf(_a0) + if rf, ok := ret.Get(0).(func(string, string) (io.ReadCloser, error)); ok { + return rf(spaceID, blobID) } - if rf, ok := ret.Get(0).(func(*node.Node) io.ReadCloser); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(string, string) io.ReadCloser); ok { + r0 = rf(spaceID, blobID) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(io.ReadCloser) } } - if rf, ok := ret.Get(1).(func(*node.Node) error); ok { - r1 = rf(_a0) + if rf, ok := ret.Get(1).(func(string, string) error); ok { + r1 = rf(spaceID, blobID) } else { r1 = ret.Error(1) } @@ -122,14 +122,15 @@ type Blobstore_Download_Call struct { } // Download is a helper method to define mock.On call -// - _a0 *node.Node -func (_e *Blobstore_Expecter) Download(_a0 interface{}) *Blobstore_Download_Call { - return &Blobstore_Download_Call{Call: _e.mock.On("Download", _a0)} +// - spaceID string +// - blobID string +func (_e *Blobstore_Expecter) Download(spaceID interface{}, blobID interface{}) *Blobstore_Download_Call { + return &Blobstore_Download_Call{Call: _e.mock.On("Download", spaceID, blobID)} } -func (_c *Blobstore_Download_Call) Run(run func(_a0 *node.Node)) *Blobstore_Download_Call { +func (_c *Blobstore_Download_Call) Run(run func(spaceID string, blobID string)) *Blobstore_Download_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*node.Node)) + run(args[0].(string), args[1].(string)) }) return _c } @@ -139,22 +140,22 @@ func (_c *Blobstore_Download_Call) Return(_a0 io.ReadCloser, _a1 error) *Blobsto return _c } -func (_c *Blobstore_Download_Call) RunAndReturn(run func(*node.Node) (io.ReadCloser, error)) *Blobstore_Download_Call { +func (_c *Blobstore_Download_Call) RunAndReturn(run func(string, string) (io.ReadCloser, error)) *Blobstore_Download_Call { _c.Call.Return(run) return _c } -// Upload provides a mock function with given fields: _a0, source -func (_m *Blobstore) Upload(_a0 *node.Node, source string) error { - ret := _m.Called(_a0, source) +// Upload provides a mock function with given fields: spaceID, blobID, blobSize, source +func (_m *Blobstore) Upload(spaceID string, blobID string, blobSize int64, source string) error { + ret := _m.Called(spaceID, blobID, blobSize, source) if len(ret) == 0 { panic("no return value specified for Upload") } var r0 error - if rf, ok := ret.Get(0).(func(*node.Node, string) error); ok { - r0 = rf(_a0, source) + if rf, ok := ret.Get(0).(func(string, string, int64, string) error); ok { + r0 = rf(spaceID, blobID, blobSize, source) } else { r0 = ret.Error(0) } @@ -168,15 +169,17 @@ type Blobstore_Upload_Call struct { } // Upload is a helper method to define mock.On call -// - _a0 *node.Node +// - spaceID string +// - blobID string +// - blobSize int64 // - source string -func (_e *Blobstore_Expecter) Upload(_a0 interface{}, source interface{}) *Blobstore_Upload_Call { - return &Blobstore_Upload_Call{Call: _e.mock.On("Upload", _a0, source)} +func (_e *Blobstore_Expecter) Upload(spaceID interface{}, blobID interface{}, blobSize interface{}, source interface{}) *Blobstore_Upload_Call { + return 
&Blobstore_Upload_Call{Call: _e.mock.On("Upload", spaceID, blobID, blobSize, source)} } -func (_c *Blobstore_Upload_Call) Run(run func(_a0 *node.Node, source string)) *Blobstore_Upload_Call { +func (_c *Blobstore_Upload_Call) Run(run func(spaceID string, blobID string, blobSize int64, source string)) *Blobstore_Upload_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*node.Node), args[1].(string)) + run(args[0].(string), args[1].(string), args[2].(int64), args[3].(string)) }) return _c } @@ -186,7 +189,7 @@ func (_c *Blobstore_Upload_Call) Return(_a0 error) *Blobstore_Upload_Call { return _c } -func (_c *Blobstore_Upload_Call) RunAndReturn(run func(*node.Node, string) error) *Blobstore_Upload_Call { +func (_c *Blobstore_Upload_Call) RunAndReturn(run func(string, string, int64, string) error) *Blobstore_Upload_Call { _c.Call.Return(run) return _c } diff --git a/pkg/storage/utils/decomposedfs/tree/tree.go b/pkg/storage/utils/decomposedfs/tree/tree.go index 3d8a057d8b6..546e567a482 100644 --- a/pkg/storage/utils/decomposedfs/tree/tree.go +++ b/pkg/storage/utils/decomposedfs/tree/tree.go @@ -21,7 +21,6 @@ package tree import ( "bytes" "context" - "fmt" "io" "io/fs" "os" @@ -55,9 +54,9 @@ func init() { // Blobstore defines an interface for storing blobs in a blobstore type Blobstore interface { - Upload(node *node.Node, source string) error - Download(node *node.Node) (io.ReadCloser, error) - Delete(node *node.Node) error + Upload(spaceID, blobID string, blobSize int64, source string) error + Download(spaceID, blobID string) (io.ReadCloser, error) + Delete(spaceID, blobID string) error } // Tree manages a hierarchical tree @@ -695,7 +694,7 @@ func (t *Tree) removeNode(ctx context.Context, path, timeSuffix string, n *node. // delete blob from blobstore if n.BlobID != "" { - if err := t.DeleteBlob(n); err != nil { + if err := t.DeleteBlob(n.SpaceID, n.BlobID); err != nil { logger.Error().Err(err).Str("blobID", n.BlobID).Msg("error purging nodes blob") return err } @@ -724,7 +723,7 @@ func (t *Tree) removeNode(ctx context.Context, path, timeSuffix string, n *node. 
} if bID != "" { - if err := t.DeleteBlob(&node.Node{SpaceID: n.SpaceID, BlobID: bID}); err != nil { + if err := t.DeleteBlob(n.SpaceID, bID); err != nil { logger.Error().Err(err).Str("revision", rev).Str("blobID", bID).Msg("error removing revision node blob") return err } @@ -741,29 +740,22 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err } // WriteBlob writes a blob to the blobstore -func (t *Tree) WriteBlob(node *node.Node, source string) error { - return t.blobstore.Upload(node, source) +func (t *Tree) WriteBlob(spaceID, blobID string, blobSize int64, source string) error { + return t.blobstore.Upload(spaceID, blobID, blobSize, source) } // ReadBlob reads a blob from the blobstore -func (t *Tree) ReadBlob(node *node.Node) (io.ReadCloser, error) { - if node.BlobID == "" { +func (t *Tree) ReadBlob(spaceID, blobID string) (io.ReadCloser, error) { + if blobID == "" { // there is no blob yet - we are dealing with a 0 byte file return io.NopCloser(bytes.NewReader([]byte{})), nil } - return t.blobstore.Download(node) + return t.blobstore.Download(spaceID, blobID) } // DeleteBlob deletes a blob from the blobstore -func (t *Tree) DeleteBlob(node *node.Node) error { - if node == nil { - return fmt.Errorf("could not delete blob, nil node was given") - } - if node.BlobID == "" { - return fmt.Errorf("could not delete blob, node with empty blob id was given") - } - - return t.blobstore.Delete(node) +func (t *Tree) DeleteBlob(spaceID, blobID string) error { + return t.blobstore.Delete(spaceID, blobID) } // TODO check if node exists? diff --git a/pkg/storage/utils/decomposedfs/tree/tree_test.go b/pkg/storage/utils/decomposedfs/tree/tree_test.go index 74acdf74730..2f9bb397126 100644 --- a/pkg/storage/utils/decomposedfs/tree/tree_test.go +++ b/pkg/storage/utils/decomposedfs/tree/tree_test.go @@ -128,7 +128,7 @@ var _ = Describe("Tree", func() { }) It("does not delete the blob from the blobstore", func() { - env.Blobstore.AssertNotCalled(GinkgoT(), "Delete", mock.AnythingOfType("*node.Node")) + env.Blobstore.AssertNotCalled(GinkgoT(), "Delete", mock.AnythingOfType("string"), mock.AnythingOfType("string")) }) }) }) @@ -139,7 +139,7 @@ var _ = Describe("Tree", func() { ) JustBeforeEach(func() { - env.Blobstore.On("Delete", mock.AnythingOfType("*node.Node")).Return(nil) + env.Blobstore.On("Delete", mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(nil) trashPath = path.Join(env.Root, "spaces", lookup.Pathify(n.SpaceRoot.ID, 1, 2), "trash", lookup.Pathify(n.ID, 4, 2)) Expect(t.Delete(env.Ctx, n)).To(Succeed()) }) @@ -160,7 +160,7 @@ var _ = Describe("Tree", func() { }) It("deletes the blob from the blobstore", func() { - env.Blobstore.AssertCalled(GinkgoT(), "Delete", mock.AnythingOfType("*node.Node")) + env.Blobstore.AssertCalled(GinkgoT(), "Delete", mock.AnythingOfType("string"), mock.AnythingOfType("string")) }) }) @@ -264,7 +264,7 @@ var _ = Describe("Tree", func() { trashPath = path.Join(env.Root, "spaces", lookup.Pathify(n.SpaceRoot.ID, 1, 2), "trash", lookup.Pathify(n.ID, 4, 2)) Expect(t.Delete(env.Ctx, n)).To(Succeed()) - env.Blobstore.On("Delete", mock.Anything).Return(nil) + env.Blobstore.On("Delete", mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(nil) }) Describe("PurgeRecycleItemFunc", func() { diff --git a/pkg/storage/utils/decomposedfs/upload/store.go b/pkg/storage/utils/decomposedfs/upload/store.go index 57a7aea8f4b..aeab235de17 100644 --- a/pkg/storage/utils/decomposedfs/upload/store.go +++ 
b/pkg/storage/utils/decomposedfs/upload/store.go @@ -205,7 +205,15 @@ func (store OcisStore) CreateNodeForUpload(session *OcisSession, initAttrs node. } var f *lockedfile.File - if session.NodeExists() { + if session.NodeExists() { // TODO this is wrong. The node should be created when the upload starts, the revisions should be created independently of the node + // we do not need to propagate a change when a node is created, only when the upload is ready. + // that still creates problems for desktop clients because if another change causes propagation it will detects an empty file + // so the first upload has to point to the first revision with the expected size. The file cannot be downloaded, but it can be overwritten (which will create a new revision and make the node reflect the latest revision) + // any finished postprocessing will not affect the node metadata. + // *thinking* but then initializing an upload will lock the file until the upload has finished. That sucks. + // so we have to check if the node has been created meanwhile (well, only in case the upload does not know the nodeid ... or the NodeExists array that is checked by session.NodeExists()) + // FIXME look at the disk again to see if the file has been created in between, or just try initializing a new node and do the update existing node as a fallback. <- the latter! + f, err = store.updateExistingNode(ctx, session, n, session.SpaceID(), uint64(session.Size())) if f != nil { appctx.GetLogger(ctx).Info().Str("lockfile", f.Name()).Interface("err", err).Msg("got lock file from updateExistingNode") diff --git a/pkg/storage/utils/decomposedfs/upload/upload.go b/pkg/storage/utils/decomposedfs/upload/upload.go index 9244052a107..9a1a086ce46 100644 --- a/pkg/storage/utils/decomposedfs/upload/upload.go +++ b/pkg/storage/utils/decomposedfs/upload/upload.go @@ -67,9 +67,9 @@ type Tree interface { RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPath string, target *node.Node) (*node.Node, *node.Node, func() error, error) PurgeRecycleItemFunc(ctx context.Context, spaceid, key, purgePath string) (*node.Node, func() error, error) - WriteBlob(node *node.Node, binPath string) error - ReadBlob(node *node.Node) (io.ReadCloser, error) - DeleteBlob(node *node.Node) error + WriteBlob(spaceID, blobID string, blobSize int64, binPath string) error + ReadBlob(spaceID, blobID string) (io.ReadCloser, error) + DeleteBlob(spaceID, blobID string) error Propagate(ctx context.Context, node *node.Node, sizeDiff int64) (err error) } @@ -278,14 +278,10 @@ func (session *OcisSession) ConcatUploads(_ context.Context, uploads []tusd.Uplo func (session *OcisSession) Finalize() (err error) { ctx, span := tracer.Start(session.Context(context.Background()), "Finalize") defer span.End() - n, err := session.Node(ctx) - if err != nil { - return err - } // upload the data to the blobstore _, subspan := tracer.Start(ctx, "WriteBlob") - err = session.store.tp.WriteBlob(n, session.binPath()) + err = session.store.tp.WriteBlob(session.SpaceID(), session.ID(), session.Size(), session.binPath()) subspan.End() if err != nil { return errors.Wrap(err, "failed to upload file to blobstore") diff --git a/pkg/storage/utils/decomposedfs/upload_async_test.go b/pkg/storage/utils/decomposedfs/upload_async_test.go index d200bebe5b2..fa48cb2af75 100644 --- a/pkg/storage/utils/decomposedfs/upload_async_test.go +++ b/pkg/storage/utils/decomposedfs/upload_async_test.go @@ -138,10 +138,10 @@ var _ = Describe("Async file uploads", Ordered, func() { 
Expect(err).ToNot(HaveOccurred()) ref.ResourceId = &resID - bs.On("Upload", mock.AnythingOfType("*node.Node"), mock.AnythingOfType("string"), mock.Anything). + bs.On("Upload", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("int64"), mock.AnythingOfType("string")). Return(nil). Run(func(args mock.Arguments) { - data, err := os.ReadFile(args.Get(1).(string)) + data, err := os.ReadFile(args.Get(3).(string)) Expect(err).ToNot(HaveOccurred()) Expect(data).To(Equal(fileContent)) diff --git a/pkg/storage/utils/decomposedfs/upload_test.go b/pkg/storage/utils/decomposedfs/upload_test.go index 5899b59f964..90e37edb588 100644 --- a/pkg/storage/utils/decomposedfs/upload_test.go +++ b/pkg/storage/utils/decomposedfs/upload_test.go @@ -238,7 +238,7 @@ var _ = Describe("File uploads", func() { When("the user initiates a zero byte file upload", func() { It("succeeds", func() { - bs.On("Upload", mock.AnythingOfType("*node.Node"), mock.AnythingOfType("string"), mock.Anything). + bs.On("Upload", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("int64"), mock.AnythingOfType("string")). Return(nil) uploadIds, err := fs.InitiateUpload(ctx, ref, 0, map[string]string{}) @@ -253,7 +253,7 @@ var _ = Describe("File uploads", func() { }) It("fails when trying to upload empty data. 0-byte uploads are finished during initialization already", func() { - bs.On("Upload", mock.AnythingOfType("*node.Node"), mock.AnythingOfType("string"), mock.Anything). + bs.On("Upload", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("int64"), mock.AnythingOfType("string")). Return(nil) uploadIds, err := fs.InitiateUpload(ctx, ref, 0, map[string]string{}) @@ -288,10 +288,10 @@ var _ = Describe("File uploads", func() { uploadRef := &provider.Reference{Path: "/" + uploadIds["simple"]} - bs.On("Upload", mock.AnythingOfType("*node.Node"), mock.AnythingOfType("string"), mock.Anything). + bs.On("Upload", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("int64"), mock.AnythingOfType("string")). Return(nil). Run(func(args mock.Arguments) { - data, err := os.ReadFile(args.Get(1).(string)) + data, err := os.ReadFile(args.Get(3).(string)) Expect(err).ToNot(HaveOccurred()) Expect(data).To(Equal([]byte("0123456789"))) @@ -304,7 +304,7 @@ var _ = Describe("File uploads", func() { }, nil) Expect(err).ToNot(HaveOccurred()) - bs.AssertCalled(GinkgoT(), "Upload", mock.Anything, mock.Anything, mock.Anything) + bs.AssertCalled(GinkgoT(), "Upload", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("int64"), mock.AnythingOfType("string")) resources, err := fs.ListFolder(ctx, rootRef, []string{}, []string{})
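
Taken together, the change narrows the blobstore contract to plain identifiers: blob operations are keyed by spaceID and blobID (plus the blob size, which the s3ng backend passes to PutObject), and Finalize writes the staged upload under the session id instead of looking up a node first. A minimal sketch of that flow; the session struct and finalize function are illustrative stand-ins for OcisSession and OcisSession.Finalize, not the real types:

package example

import (
	"fmt"
	"io"
)

// Blobstore is the narrowed contract introduced by this patch.
type Blobstore interface {
	Upload(spaceID, blobID string, blobSize int64, source string) error
	Download(spaceID, blobID string) (io.ReadCloser, error)
	Delete(spaceID, blobID string) error
}

// session sketches only the parts of the upload session that finalize needs
// (the real OcisSession exposes SpaceID(), ID(), Size() and binPath()).
type session struct {
	spaceID string
	id      string
	size    int64
	binPath string
}

// finalize mirrors the new Finalize flow: the blob is stored under the
// upload session id, so no node lookup is required before writing it.
func finalize(bs Blobstore, s session) error {
	if err := bs.Upload(s.spaceID, s.id, s.size, s.binPath); err != nil {
		return fmt.Errorf("failed to upload file to blobstore: %w", err)
	}
	return nil
}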