diff --git a/changelogs/unreleased/7452-Lyndon-Li b/changelogs/unreleased/7452-Lyndon-Li new file mode 100644 index 0000000000..af2563e507 --- /dev/null +++ b/changelogs/unreleased/7452-Lyndon-Li @@ -0,0 +1 @@ +Fix issue #7211. Enable advanced feature capability and add support to concatenate objects for unified repo. \ No newline at end of file diff --git a/pkg/repository/udmrepo/kopialib/lib_repo.go b/pkg/repository/udmrepo/kopialib/lib_repo.go index a0a99d2b45..93d3c05b1f 100644 --- a/pkg/repository/udmrepo/kopialib/lib_repo.go +++ b/pkg/repository/udmrepo/kopialib/lib_repo.go @@ -366,6 +366,39 @@ func (kr *kopiaRepository) Flush(ctx context.Context) error { return nil } +func (kr *kopiaRepository) GetAdvancedFeatures() udmrepo.AdvancedFeatureInfo { + return udmrepo.AdvancedFeatureInfo{ + MultiPartBackup: true, + } +} + +func (kr *kopiaRepository) ConcatenateObjects(ctx context.Context, objectIDs []udmrepo.ID) (udmrepo.ID, error) { + if kr.rawWriter == nil { + return "", errors.New("repo writer is closed or not open") + } + + if len(objectIDs) == 0 { + return udmrepo.ID(""), errors.New("object list is empty") + } + + rawIDs := []object.ID{} + for _, id := range objectIDs { + rawID, err := object.ParseID(string(id)) + if err != nil { + return udmrepo.ID(""), errors.Wrapf(err, "error to parse object ID from %v", id) + } + + rawIDs = append(rawIDs, rawID) + } + + result, err := kr.rawWriter.ConcatenateObjects(ctx, rawIDs) + if err != nil { + return udmrepo.ID(""), errors.Wrap(err, "error to concatenate objects") + } + + return udmrepo.ID(result.String()), nil +} + // updateProgress is called when the repository writes a piece of blob data to the storage during data write func (kr *kopiaRepository) updateProgress(uploaded int64) { total := atomic.AddInt64(&kr.uploaded, uploaded) diff --git a/pkg/repository/udmrepo/kopialib/lib_repo_test.go b/pkg/repository/udmrepo/kopialib/lib_repo_test.go index e444c20d6f..952d3fcdaa 100644 --- a/pkg/repository/udmrepo/kopialib/lib_repo_test.go +++ b/pkg/repository/udmrepo/kopialib/lib_repo_test.go @@ -716,6 +716,78 @@ func TestFlush(t *testing.T) { } } +func TestConcatenateObjects(t *testing.T) { + testCases := []struct { + name string + setWriter bool + rawWriter *repomocks.DirectRepositoryWriter + rawWriterRetErr error + objectIDs []udmrepo.ID + expectedErr string + }{ + { + name: "writer is nil", + expectedErr: "repo writer is closed or not open", + }, + { + name: "empty object list", + setWriter: true, + expectedErr: "object list is empty", + }, + { + name: "invalid object id", + objectIDs: []udmrepo.ID{ + "I123456", + "fake-id", + "I678901", + }, + setWriter: true, + expectedErr: "error to parse object ID from fake-id: malformed content ID: \"fake-id\": invalid content prefix", + }, + { + name: "concatenate error", + rawWriter: repomocks.NewDirectRepositoryWriter(t), + rawWriterRetErr: errors.New("fake-concatenate-error"), + objectIDs: []udmrepo.ID{ + "I123456", + }, + setWriter: true, + expectedErr: "error to concatenate objects: fake-concatenate-error", + }, + { + name: "succeed", + rawWriter: repomocks.NewDirectRepositoryWriter(t), + objectIDs: []udmrepo.ID{ + "I123456", + }, + setWriter: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + kr := &kopiaRepository{} + + if tc.rawWriter != nil { + require.NotNil(t, tc.rawWriter) + tc.rawWriter.On("ConcatenateObjects", mock.Anything, mock.Anything).Return(object.ID{}, tc.rawWriterRetErr) + } + + if tc.setWriter { + kr.rawWriter = tc.rawWriter + } + + _, err := kr.ConcatenateObjects(context.Background(), tc.objectIDs) + + if tc.expectedErr == "" { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, tc.expectedErr) + } + }) + } +} + func TestNewObjectWriter(t *testing.T) { rawObjWriter := repomocks.NewWriter(t) testCases := []struct { diff --git a/pkg/repository/udmrepo/mocks/BackupRepo.go b/pkg/repository/udmrepo/mocks/BackupRepo.go index ea8e2ba3c4..7d044356d5 100644 --- a/pkg/repository/udmrepo/mocks/BackupRepo.go +++ b/pkg/repository/udmrepo/mocks/BackupRepo.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.39.1. DO NOT EDIT. package mocks @@ -20,6 +20,10 @@ type BackupRepo struct { func (_m *BackupRepo) Close(ctx context.Context) error { ret := _m.Called(ctx) + if len(ret) == 0 { + panic("no return value specified for Close") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context) error); ok { r0 = rf(ctx) @@ -30,10 +34,42 @@ func (_m *BackupRepo) Close(ctx context.Context) error { return r0 } +// ConcatenateObjects provides a mock function with given fields: ctx, objectIDs +func (_m *BackupRepo) ConcatenateObjects(ctx context.Context, objectIDs []udmrepo.ID) (udmrepo.ID, error) { + ret := _m.Called(ctx, objectIDs) + + if len(ret) == 0 { + panic("no return value specified for ConcatenateObjects") + } + + var r0 udmrepo.ID + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, []udmrepo.ID) (udmrepo.ID, error)); ok { + return rf(ctx, objectIDs) + } + if rf, ok := ret.Get(0).(func(context.Context, []udmrepo.ID) udmrepo.ID); ok { + r0 = rf(ctx, objectIDs) + } else { + r0 = ret.Get(0).(udmrepo.ID) + } + + if rf, ok := ret.Get(1).(func(context.Context, []udmrepo.ID) error); ok { + r1 = rf(ctx, objectIDs) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // DeleteManifest provides a mock function with given fields: ctx, id func (_m *BackupRepo) DeleteManifest(ctx context.Context, id udmrepo.ID) error { ret := _m.Called(ctx, id) + if len(ret) == 0 { + panic("no return value specified for DeleteManifest") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, udmrepo.ID) error); ok { r0 = rf(ctx, id) @@ -48,7 +84,15 @@ func (_m *BackupRepo) DeleteManifest(ctx context.Context, id udmrepo.ID) error { func (_m *BackupRepo) FindManifests(ctx context.Context, filter udmrepo.ManifestFilter) ([]*udmrepo.ManifestEntryMetadata, error) { ret := _m.Called(ctx, filter) + if len(ret) == 0 { + panic("no return value specified for FindManifests") + } + var r0 []*udmrepo.ManifestEntryMetadata + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, udmrepo.ManifestFilter) ([]*udmrepo.ManifestEntryMetadata, error)); ok { + return rf(ctx, filter) + } if rf, ok := ret.Get(0).(func(context.Context, udmrepo.ManifestFilter) []*udmrepo.ManifestEntryMetadata); ok { r0 = rf(ctx, filter) } else { @@ -57,7 +101,6 @@ func (_m *BackupRepo) FindManifests(ctx context.Context, filter udmrepo.Manifest } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, udmrepo.ManifestFilter) error); ok { r1 = rf(ctx, filter) } else { @@ -71,6 +114,10 @@ func (_m *BackupRepo) FindManifests(ctx context.Context, filter udmrepo.Manifest func (_m *BackupRepo) Flush(ctx context.Context) error { ret := _m.Called(ctx) + if len(ret) == 0 { + panic("no return value specified for Flush") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context) error); ok { r0 = rf(ctx) @@ -81,10 +128,32 @@ func (_m *BackupRepo) Flush(ctx context.Context) error { return r0 } +// GetAdvancedFeatures provides a mock function with given fields: +func (_m *BackupRepo) GetAdvancedFeatures() udmrepo.AdvancedFeatureInfo { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetAdvancedFeatures") + } + + var r0 udmrepo.AdvancedFeatureInfo + if rf, ok := ret.Get(0).(func() udmrepo.AdvancedFeatureInfo); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(udmrepo.AdvancedFeatureInfo) + } + + return r0 +} + // GetManifest provides a mock function with given fields: ctx, id, mani func (_m *BackupRepo) GetManifest(ctx context.Context, id udmrepo.ID, mani *udmrepo.RepoManifest) error { ret := _m.Called(ctx, id, mani) + if len(ret) == 0 { + panic("no return value specified for GetManifest") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, udmrepo.ID, *udmrepo.RepoManifest) error); ok { r0 = rf(ctx, id, mani) @@ -99,6 +168,10 @@ func (_m *BackupRepo) GetManifest(ctx context.Context, id udmrepo.ID, mani *udmr func (_m *BackupRepo) NewObjectWriter(ctx context.Context, opt udmrepo.ObjectWriteOptions) udmrepo.ObjectWriter { ret := _m.Called(ctx, opt) + if len(ret) == 0 { + panic("no return value specified for NewObjectWriter") + } + var r0 udmrepo.ObjectWriter if rf, ok := ret.Get(0).(func(context.Context, udmrepo.ObjectWriteOptions) udmrepo.ObjectWriter); ok { r0 = rf(ctx, opt) @@ -115,7 +188,15 @@ func (_m *BackupRepo) NewObjectWriter(ctx context.Context, opt udmrepo.ObjectWri func (_m *BackupRepo) OpenObject(ctx context.Context, id udmrepo.ID) (udmrepo.ObjectReader, error) { ret := _m.Called(ctx, id) + if len(ret) == 0 { + panic("no return value specified for OpenObject") + } + var r0 udmrepo.ObjectReader + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, udmrepo.ID) (udmrepo.ObjectReader, error)); ok { + return rf(ctx, id) + } if rf, ok := ret.Get(0).(func(context.Context, udmrepo.ID) udmrepo.ObjectReader); ok { r0 = rf(ctx, id) } else { @@ -124,7 +205,6 @@ func (_m *BackupRepo) OpenObject(ctx context.Context, id udmrepo.ID) (udmrepo.Ob } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, udmrepo.ID) error); ok { r1 = rf(ctx, id) } else { @@ -138,14 +218,21 @@ func (_m *BackupRepo) OpenObject(ctx context.Context, id udmrepo.ID) (udmrepo.Ob func (_m *BackupRepo) PutManifest(ctx context.Context, mani udmrepo.RepoManifest) (udmrepo.ID, error) { ret := _m.Called(ctx, mani) + if len(ret) == 0 { + panic("no return value specified for PutManifest") + } + var r0 udmrepo.ID + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, udmrepo.RepoManifest) (udmrepo.ID, error)); ok { + return rf(ctx, mani) + } if rf, ok := ret.Get(0).(func(context.Context, udmrepo.RepoManifest) udmrepo.ID); ok { r0 = rf(ctx, mani) } else { r0 = ret.Get(0).(udmrepo.ID) } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, udmrepo.RepoManifest) error); ok { r1 = rf(ctx, mani) } else { @@ -159,6 +246,10 @@ func (_m *BackupRepo) PutManifest(ctx context.Context, mani udmrepo.RepoManifest func (_m *BackupRepo) Time() time.Time { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Time") + } + var r0 time.Time if rf, ok := ret.Get(0).(func() time.Time); ok { r0 = rf() @@ -169,13 +260,12 @@ func (_m *BackupRepo) Time() time.Time { return r0 } -type mockConstructorTestingTNewBackupRepo interface { +// NewBackupRepo creates a new instance of BackupRepo. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewBackupRepo(t interface { mock.TestingT Cleanup(func()) -} - -// NewBackupRepo creates a new instance of BackupRepo. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewBackupRepo(t mockConstructorTestingTNewBackupRepo) *BackupRepo { +}) *BackupRepo { mock := &BackupRepo{} mock.Mock.Test(t) diff --git a/pkg/repository/udmrepo/mocks/BackupRepoService.go b/pkg/repository/udmrepo/mocks/BackupRepoService.go index b61bf5ccc3..2acf94816f 100644 --- a/pkg/repository/udmrepo/mocks/BackupRepoService.go +++ b/pkg/repository/udmrepo/mocks/BackupRepoService.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.39.1. DO NOT EDIT. package mocks @@ -20,6 +20,10 @@ type BackupRepoService struct { func (_m *BackupRepoService) DefaultMaintenanceFrequency() time.Duration { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for DefaultMaintenanceFrequency") + } + var r0 time.Duration if rf, ok := ret.Get(0).(func() time.Duration); ok { r0 = rf() @@ -34,6 +38,10 @@ func (_m *BackupRepoService) DefaultMaintenanceFrequency() time.Duration { func (_m *BackupRepoService) Init(ctx context.Context, repoOption udmrepo.RepoOptions, createNew bool) error { ret := _m.Called(ctx, repoOption, createNew) + if len(ret) == 0 { + panic("no return value specified for Init") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, udmrepo.RepoOptions, bool) error); ok { r0 = rf(ctx, repoOption, createNew) @@ -48,6 +56,10 @@ func (_m *BackupRepoService) Init(ctx context.Context, repoOption udmrepo.RepoOp func (_m *BackupRepoService) Maintain(ctx context.Context, repoOption udmrepo.RepoOptions) error { ret := _m.Called(ctx, repoOption) + if len(ret) == 0 { + panic("no return value specified for Maintain") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, udmrepo.RepoOptions) error); ok { r0 = rf(ctx, repoOption) @@ -62,7 +74,15 @@ func (_m *BackupRepoService) Maintain(ctx context.Context, repoOption udmrepo.Re func (_m *BackupRepoService) Open(ctx context.Context, repoOption udmrepo.RepoOptions) (udmrepo.BackupRepo, error) { ret := _m.Called(ctx, repoOption) + if len(ret) == 0 { + panic("no return value specified for Open") + } + var r0 udmrepo.BackupRepo + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, udmrepo.RepoOptions) (udmrepo.BackupRepo, error)); ok { + return rf(ctx, repoOption) + } if rf, ok := ret.Get(0).(func(context.Context, udmrepo.RepoOptions) udmrepo.BackupRepo); ok { r0 = rf(ctx, repoOption) } else { @@ -71,7 +91,6 @@ func (_m *BackupRepoService) Open(ctx context.Context, repoOption udmrepo.RepoOp } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, udmrepo.RepoOptions) error); ok { r1 = rf(ctx, repoOption) } else { @@ -81,13 +100,12 @@ func (_m *BackupRepoService) Open(ctx context.Context, repoOption udmrepo.RepoOp return r0, r1 } -type mockConstructorTestingTNewBackupRepoService interface { +// NewBackupRepoService creates a new instance of BackupRepoService. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewBackupRepoService(t interface { mock.TestingT Cleanup(func()) -} - -// NewBackupRepoService creates a new instance of BackupRepoService. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewBackupRepoService(t mockConstructorTestingTNewBackupRepoService) *BackupRepoService { +}) *BackupRepoService { mock := &BackupRepoService{} mock.Mock.Test(t) diff --git a/pkg/repository/udmrepo/mocks/ObjectReader.go b/pkg/repository/udmrepo/mocks/ObjectReader.go index 2410acd6e6..8a0a350da3 100644 --- a/pkg/repository/udmrepo/mocks/ObjectReader.go +++ b/pkg/repository/udmrepo/mocks/ObjectReader.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.39.1. DO NOT EDIT. package mocks @@ -13,6 +13,10 @@ type ObjectReader struct { func (_m *ObjectReader) Close() error { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Close") + } + var r0 error if rf, ok := ret.Get(0).(func() error); ok { r0 = rf() @@ -27,6 +31,10 @@ func (_m *ObjectReader) Close() error { func (_m *ObjectReader) Length() int64 { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Length") + } + var r0 int64 if rf, ok := ret.Get(0).(func() int64); ok { r0 = rf() @@ -41,14 +49,21 @@ func (_m *ObjectReader) Length() int64 { func (_m *ObjectReader) Read(p []byte) (int, error) { ret := _m.Called(p) + if len(ret) == 0 { + panic("no return value specified for Read") + } + var r0 int + var r1 error + if rf, ok := ret.Get(0).(func([]byte) (int, error)); ok { + return rf(p) + } if rf, ok := ret.Get(0).(func([]byte) int); ok { r0 = rf(p) } else { r0 = ret.Get(0).(int) } - var r1 error if rf, ok := ret.Get(1).(func([]byte) error); ok { r1 = rf(p) } else { @@ -62,14 +77,21 @@ func (_m *ObjectReader) Read(p []byte) (int, error) { func (_m *ObjectReader) Seek(offset int64, whence int) (int64, error) { ret := _m.Called(offset, whence) + if len(ret) == 0 { + panic("no return value specified for Seek") + } + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(int64, int) (int64, error)); ok { + return rf(offset, whence) + } if rf, ok := ret.Get(0).(func(int64, int) int64); ok { r0 = rf(offset, whence) } else { r0 = ret.Get(0).(int64) } - var r1 error if rf, ok := ret.Get(1).(func(int64, int) error); ok { r1 = rf(offset, whence) } else { @@ -79,13 +101,12 @@ func (_m *ObjectReader) Seek(offset int64, whence int) (int64, error) { return r0, r1 } -type mockConstructorTestingTNewObjectReader interface { +// NewObjectReader creates a new instance of ObjectReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewObjectReader(t interface { mock.TestingT Cleanup(func()) -} - -// NewObjectReader creates a new instance of ObjectReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewObjectReader(t mockConstructorTestingTNewObjectReader) *ObjectReader { +}) *ObjectReader { mock := &ObjectReader{} mock.Mock.Test(t) diff --git a/pkg/repository/udmrepo/mocks/ObjectWriter.go b/pkg/repository/udmrepo/mocks/ObjectWriter.go index 277a0ed4a3..4bc21f8b79 100644 --- a/pkg/repository/udmrepo/mocks/ObjectWriter.go +++ b/pkg/repository/udmrepo/mocks/ObjectWriter.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.39.1. DO NOT EDIT. package mocks @@ -16,14 +16,21 @@ type ObjectWriter struct { func (_m *ObjectWriter) Checkpoint() (udmrepo.ID, error) { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Checkpoint") + } + var r0 udmrepo.ID + var r1 error + if rf, ok := ret.Get(0).(func() (udmrepo.ID, error)); ok { + return rf() + } if rf, ok := ret.Get(0).(func() udmrepo.ID); ok { r0 = rf() } else { r0 = ret.Get(0).(udmrepo.ID) } - var r1 error if rf, ok := ret.Get(1).(func() error); ok { r1 = rf() } else { @@ -37,6 +44,10 @@ func (_m *ObjectWriter) Checkpoint() (udmrepo.ID, error) { func (_m *ObjectWriter) Close() error { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Close") + } + var r0 error if rf, ok := ret.Get(0).(func() error); ok { r0 = rf() @@ -51,14 +62,21 @@ func (_m *ObjectWriter) Close() error { func (_m *ObjectWriter) Result() (udmrepo.ID, error) { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Result") + } + var r0 udmrepo.ID + var r1 error + if rf, ok := ret.Get(0).(func() (udmrepo.ID, error)); ok { + return rf() + } if rf, ok := ret.Get(0).(func() udmrepo.ID); ok { r0 = rf() } else { r0 = ret.Get(0).(udmrepo.ID) } - var r1 error if rf, ok := ret.Get(1).(func() error); ok { r1 = rf() } else { @@ -72,14 +90,21 @@ func (_m *ObjectWriter) Result() (udmrepo.ID, error) { func (_m *ObjectWriter) Seek(offset int64, whence int) (int64, error) { ret := _m.Called(offset, whence) + if len(ret) == 0 { + panic("no return value specified for Seek") + } + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(int64, int) (int64, error)); ok { + return rf(offset, whence) + } if rf, ok := ret.Get(0).(func(int64, int) int64); ok { r0 = rf(offset, whence) } else { r0 = ret.Get(0).(int64) } - var r1 error if rf, ok := ret.Get(1).(func(int64, int) error); ok { r1 = rf(offset, whence) } else { @@ -93,14 +118,21 @@ func (_m *ObjectWriter) Seek(offset int64, whence int) (int64, error) { func (_m *ObjectWriter) Write(p []byte) (int, error) { ret := _m.Called(p) + if len(ret) == 0 { + panic("no return value specified for Write") + } + var r0 int + var r1 error + if rf, ok := ret.Get(0).(func([]byte) (int, error)); ok { + return rf(p) + } if rf, ok := ret.Get(0).(func([]byte) int); ok { r0 = rf(p) } else { r0 = ret.Get(0).(int) } - var r1 error if rf, ok := ret.Get(1).(func([]byte) error); ok { r1 = rf(p) } else { @@ -110,13 +142,12 @@ func (_m *ObjectWriter) Write(p []byte) (int, error) { return r0, r1 } -type mockConstructorTestingTNewObjectWriter interface { +// NewObjectWriter creates a new instance of ObjectWriter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewObjectWriter(t interface { mock.TestingT Cleanup(func()) -} - -// NewObjectWriter creates a new instance of ObjectWriter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewObjectWriter(t mockConstructorTestingTNewObjectWriter) *ObjectWriter { +}) *ObjectWriter { mock := &ObjectWriter{} mock.Mock.Test(t) diff --git a/pkg/repository/udmrepo/repo.go b/pkg/repository/udmrepo/repo.go index c186bab4b0..db32d09535 100644 --- a/pkg/repository/udmrepo/repo.go +++ b/pkg/repository/udmrepo/repo.go @@ -71,6 +71,10 @@ type ObjectWriteOptions struct { AsyncWrites int // Num of async writes for the object, 0 means no async write } +type AdvancedFeatureInfo struct { + MultiPartBackup bool // if set to true, it means the repo supports multiple-part backup +} + // BackupRepoService is used to initialize, open or maintain a backup repository type BackupRepoService interface { // Init creates a backup repository or connect to an existing backup repository. @@ -116,6 +120,12 @@ type BackupRepo interface { // Flush flushes all the backup repository data Flush(ctx context.Context) error + // GetAdvancedFeatures returns the support for advanced features + GetAdvancedFeatures() AdvancedFeatureInfo + + // ConcatenateObjects is for multiple-part backup, it concatenates multiple objects into one object + ConcatenateObjects(ctx context.Context, objectIDs []ID) (ID, error) + // Time returns the local time of the backup repository. It may be different from the time of the caller Time() time.Time diff --git a/pkg/uploader/kopia/shim.go b/pkg/uploader/kopia/shim.go index d20b7fa011..b2e5e942c2 100644 --- a/pkg/uploader/kopia/shim.go +++ b/pkg/uploader/kopia/shim.go @@ -238,7 +238,21 @@ func (sr *shimRepository) Flush(ctx context.Context) error { } func (sr *shimRepository) ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error) { - return object.ID{}, errors.New("ConcatenateObjects is not supported") + if len(objectIDs) == 0 { + return object.EmptyID, errors.New("object list is empty") + } + + ids := []udmrepo.ID{} + for _, id := range objectIDs { + ids = append(ids, udmrepo.ID(id.String())) + } + + id, err := sr.udmRepo.ConcatenateObjects(ctx, ids) + if err != nil { + return object.EmptyID, err + } + + return object.ParseID(string(id)) } func (sr *shimRepository) OnSuccessfulFlush(callback repo.RepositoryWriterCallback) { diff --git a/pkg/uploader/kopia/shim_test.go b/pkg/uploader/kopia/shim_test.go index a888d63ecb..554bdeb4c3 100644 --- a/pkg/uploader/kopia/shim_test.go +++ b/pkg/uploader/kopia/shim_test.go @@ -65,10 +65,6 @@ func TestShimRepo(t *testing.T) { backupRepo.On("Flush", mock.Anything).Return(nil) NewShimRepo(backupRepo).Flush(ctx) - var objID object.ID - backupRepo.On("ConcatenateObjects", mock.Anything, mock.Anything).Return(objID) - NewShimRepo(backupRepo).ConcatenateObjects(ctx, []object.ID{}) - backupRepo.On("NewObjectWriter", mock.Anything, mock.Anything).Return(nil) NewShimRepo(backupRepo).NewObjectWriter(ctx, object.WriterOptions{}) } @@ -290,3 +286,64 @@ func TestReplaceManifests(t *testing.T) { }) } } + +func TestConcatenateObjects(t *testing.T) { + tests := []struct { + name string + backupRepo *mocks.BackupRepo + objectIDs []object.ID + expectedError string + }{ + { + name: "empty object list", + expectedError: "object list is empty", + }, + { + name: "concatenate error", + backupRepo: func() *mocks.BackupRepo { + backupRepo := &mocks.BackupRepo{} + backupRepo.On("ConcatenateObjects", mock.Anything, mock.Anything).Return(udmrepo.ID(""), errors.New("fake-concatenate-error")) + return backupRepo + }(), + objectIDs: []object.ID{ + {}, + }, + expectedError: "fake-concatenate-error", + }, + { + name: "parse error", + backupRepo: func() *mocks.BackupRepo { + backupRepo := &mocks.BackupRepo{} + backupRepo.On("ConcatenateObjects", mock.Anything, mock.Anything).Return(udmrepo.ID("fake-id"), nil) + return backupRepo + }(), + objectIDs: []object.ID{ + {}, + }, + expectedError: "malformed content ID: \"fake-id\": invalid content prefix", + }, + { + name: "success", + backupRepo: func() *mocks.BackupRepo { + backupRepo := &mocks.BackupRepo{} + backupRepo.On("ConcatenateObjects", mock.Anything, mock.Anything).Return(udmrepo.ID("I123456"), nil) + return backupRepo + }(), + objectIDs: []object.ID{ + {}, + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + _, err := NewShimRepo(tc.backupRepo).ConcatenateObjects(ctx, tc.objectIDs) + + if tc.expectedError != "" { + assert.EqualError(t, err, tc.expectedError) + } else { + assert.NoError(t, err) + } + }) + } +} diff --git a/pkg/uploader/kopia/snapshot.go b/pkg/uploader/kopia/snapshot.go index a578797884..1a09562566 100644 --- a/pkg/uploader/kopia/snapshot.go +++ b/pkg/uploader/kopia/snapshot.go @@ -54,6 +54,8 @@ var listSnapshotsFunc = snapshot.ListSnapshots var filesystemEntryFunc = snapshotfs.FilesystemEntryFromIDWithPath var restoreEntryFunc = restore.Entry +const UploaderConfigMultipartKey = "uploader-multipart" + // SnapshotUploader which mainly used for UT test that could overwrite Upload interface type SnapshotUploader interface { Upload( @@ -120,6 +122,10 @@ func setupPolicy(ctx context.Context, rep repo.RepositoryWriter, sourceInfo snap } } + if _, ok := uploaderCfg[UploaderConfigMultipartKey]; ok { + curPolicy.UploadPolicy.ParallelUploadAboveSize = newOptionalInt64(2 << 30) + } + err := setPolicyFunc(ctx, rep, sourceInfo, curPolicy) if err != nil { return nil, errors.Wrap(err, "error to set policy") diff --git a/pkg/uploader/provider/kopia.go b/pkg/uploader/provider/kopia.go index 6d1dbbf722..6048f0582b 100644 --- a/pkg/uploader/provider/kopia.go +++ b/pkg/uploader/provider/kopia.go @@ -159,6 +159,14 @@ func (kp *kopiaProvider) RunBackup( realSource = fmt.Sprintf("%s/%s/%s", kp.requestorType, uploader.KopiaType, realSource) } + if kp.bkRepo.GetAdvancedFeatures().MultiPartBackup { + if uploaderCfg == nil { + uploaderCfg = make(map[string]string) + } + + uploaderCfg[kopia.UploaderConfigMultipartKey] = "true" + } + snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, realSource, forceFull, parentSnapshot, volMode, uploaderCfg, tags, log) if err != nil { if kpUploader.IsCanceled() { diff --git a/pkg/uploader/provider/kopia_test.go b/pkg/uploader/provider/kopia_test.go index 507be4ae54..f1f16fb926 100644 --- a/pkg/uploader/provider/kopia_test.go +++ b/pkg/uploader/provider/kopia_test.go @@ -62,8 +62,12 @@ type FakeRestoreProgressUpdater struct { func (f *FakeRestoreProgressUpdater) UpdateProgress(p *uploader.Progress) {} func TestRunBackup(t *testing.T) { + mockBRepo := udmrepomocks.NewBackupRepo(t) + mockBRepo.On("GetAdvancedFeatures").Return(udmrepo.AdvancedFeatureInfo{}) + var kp kopiaProvider kp.log = logrus.New() + kp.bkRepo = mockBRepo updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(util.VeleroScheme).Build()} testCases := []struct {