diff --git a/br/pkg/restore/log_client/BUILD.bazel b/br/pkg/restore/log_client/BUILD.bazel
index d55c8066514f3..85da59628f8b7 100644
--- a/br/pkg/restore/log_client/BUILD.bazel
+++ b/br/pkg/restore/log_client/BUILD.bazel
@@ -8,6 +8,7 @@ go_library(
         "import_retry.go",
         "log_file_manager.go",
         "log_file_map.go",
+        "migration.go",
     ],
     importpath = "github.com/pingcap/tidb/br/pkg/restore/log_client",
     visibility = ["//visibility:public"],
@@ -81,10 +82,11 @@ go_test(
         "log_file_manager_test.go",
         "log_file_map_test.go",
         "main_test.go",
+        "migration_test.go",
     ],
     embed = [":log_client"],
     flaky = True,
-    shard_count = 41,
+    shard_count = 42,
     deps = [
         "//br/pkg/errors",
         "//br/pkg/glue",
diff --git a/br/pkg/restore/log_client/export_test.go b/br/pkg/restore/log_client/export_test.go
index 9a35b35e8eb57..70a15e1ad2393 100644
--- a/br/pkg/restore/log_client/export_test.go
+++ b/br/pkg/restore/log_client/export_test.go
@@ -29,6 +29,38 @@ import (
 
 var FilterFilesByRegion = filterFilesByRegion
 
+func (metaname *MetaName) Meta() Meta {
+    return metaname.meta
+}
+
+func NewMetaName(meta Meta, name string) *MetaName {
+    return &MetaName{meta: meta, name: name}
+}
+
+func NewMigrationBuilder(shiftStartTS, startTS, restoredTS uint64) *WithMigrationsBuilder {
+    return &WithMigrationsBuilder{
+        shiftStartTS: shiftStartTS,
+        startTS:      startTS,
+        restoredTS:   restoredTS,
+    }
+}
+
+func (m *MetaWithMigrations) StoreId() int64 {
+    return m.meta.StoreId
+}
+
+func (m *MetaWithMigrations) Meta() *backuppb.Metadata {
+    return m.meta
+}
+
+func (m *PhysicalWithMigrations) PhysicalLength() uint64 {
+    return m.physical.Item.Length
+}
+
+func (m *PhysicalWithMigrations) Physical() *backuppb.DataFileGroup {
+    return m.physical.Item
+}
+
 func (rc *LogClient) TEST_saveIDMap(
     ctx context.Context,
     sr *stream.SchemasReplace,
@@ -44,7 +76,7 @@ func (rc *LogClient) TEST_initSchemasMap(
 }
 
 // readStreamMetaByTS is used for streaming task. collect all meta file by TS, it is for test usage.
-func (rc *LogFileManager) ReadStreamMeta(ctx context.Context) ([]Meta, error) {
+func (rc *LogFileManager) ReadStreamMeta(ctx context.Context) ([]*MetaName, error) {
     metas, err := rc.streamingMeta(ctx)
     if err != nil {
         return nil, err
diff --git a/br/pkg/restore/log_client/log_file_manager.go b/br/pkg/restore/log_client/log_file_manager.go
index 81af10cf542b0..cbaa6a594dff4 100644
--- a/br/pkg/restore/log_client/log_file_manager.go
+++ b/br/pkg/restore/log_client/log_file_manager.go
@@ -28,6 +28,14 @@ import (
 // MetaIter is the type of iterator of metadata files' content.
 type MetaIter = iter.TryNextor[*backuppb.Metadata]
 
+type MetaName struct {
+    meta Meta
+    name string
+}
+
+// MetaNameIter is the type of iterator of metadata files' content with name.
+type MetaNameIter = iter.TryNextor[*MetaName]
+
 type LogDataFileInfo struct {
     *backuppb.DataFileInfo
     MetaDataGroupName   string
@@ -35,6 +43,18 @@ type LogDataFileInfo struct {
     OffsetInMergedGroup int
 }
 
+// GroupIndex is the type of physical data file with index from metadata.
+type GroupIndex = iter.Indexed[*backuppb.DataFileGroup]
+
+// GroupIndexIter is the type of iterator of physical data file with index from metadata.
+type GroupIndexIter = iter.TryNextor[GroupIndex]
+
+// FileIndex is the type of logical data file with index from physical data file.
+type FileIndex = iter.Indexed[*backuppb.DataFileInfo]
+
+// FileIndexIter is the type of iterator of logical data file with index from physical data file.
+type FileIndexIter = iter.TryNextor[FileIndex]
+
 // LogIter is the type of iterator of each log files' meta information.
 type LogIter = iter.TryNextor[*LogDataFileInfo]
 
@@ -78,6 +98,8 @@ type LogFileManager struct {
     storage storage.ExternalStorage
     helper  streamMetadataHelper
 
+    withmigrations WithMigrations
+
     metadataDownloadBatchSize uint
 }
 
@@ -87,6 +109,7 @@ type LogFileManagerInit struct {
     RestoreTS uint64
     Storage   storage.ExternalStorage
 
+    Migrations                WithMigrations
     MetadataDownloadBatchSize uint
     EncryptionManager         *encryption.Manager
 }
@@ -100,10 +123,11 @@ type DDLMetaGroup struct {
 // Generally the config cannot be changed during its lifetime.
 func CreateLogFileManager(ctx context.Context, init LogFileManagerInit) (*LogFileManager, error) {
     fm := &LogFileManager{
-        startTS:   init.StartTS,
-        restoreTS: init.RestoreTS,
-        storage:   init.Storage,
-        helper:    stream.NewMetadataHelper(stream.WithEncryptionManager(init.EncryptionManager)),
+        startTS:        init.StartTS,
+        restoreTS:      init.RestoreTS,
+        storage:        init.Storage,
+        helper:         stream.NewMetadataHelper(stream.WithEncryptionManager(init.EncryptionManager)),
+        withmigrations: init.Migrations,
 
         metadataDownloadBatchSize: init.MetadataDownloadBatchSize,
     }
@@ -153,22 +177,22 @@ func (rc *LogFileManager) loadShiftTS(ctx context.Context) error {
     return nil
 }
 
-func (rc *LogFileManager) streamingMeta(ctx context.Context) (MetaIter, error) {
+func (rc *LogFileManager) streamingMeta(ctx context.Context) (MetaNameIter, error) {
     return rc.streamingMetaByTS(ctx, rc.restoreTS)
 }
 
-func (rc *LogFileManager) streamingMetaByTS(ctx context.Context, restoreTS uint64) (MetaIter, error) {
+func (rc *LogFileManager) streamingMetaByTS(ctx context.Context, restoreTS uint64) (MetaNameIter, error) {
     it, err := rc.createMetaIterOver(ctx, rc.storage)
     if err != nil {
         return nil, err
     }
-    filtered := iter.FilterOut(it, func(metadata *backuppb.Metadata) bool {
-        return restoreTS < metadata.MinTs || metadata.MaxTs < rc.shiftStartTS
+    filtered := iter.FilterOut(it, func(metaname *MetaName) bool {
+        return restoreTS < metaname.meta.MinTs || metaname.meta.MaxTs < rc.shiftStartTS
     })
     return filtered, nil
 }
 
-func (rc *LogFileManager) createMetaIterOver(ctx context.Context, s storage.ExternalStorage) (MetaIter, error) {
+func (rc *LogFileManager) createMetaIterOver(ctx context.Context, s storage.ExternalStorage) (MetaNameIter, error) {
     opt := &storage.WalkOption{SubDir: stream.GetStreamBackupMetaPrefix()}
     names := []string{}
     err := s.WalkDir(ctx, opt, func(path string, size int64) error {
@@ -182,7 +206,7 @@ func (rc *LogFileManager) createMetaIterOver(ctx context.Context, s storage.Exte
         return nil, err
     }
     namesIter := iter.FromSlice(names)
-    readMeta := func(ctx context.Context, name string) (*backuppb.Metadata, error) {
+    readMeta := func(ctx context.Context, name string) (*MetaName, error) {
         f, err := s.ReadFile(ctx, name)
         if err != nil {
             return nil, errors.Annotatef(err, "failed during reading file %s", name)
@@ -191,7 +215,7 @@ func (rc *LogFileManager) createMetaIterOver(ctx context.Context, s storage.Exte
         if err != nil {
             return nil, errors.Annotatef(err, "failed to parse metadata of file %s", name)
         }
-        return meta, nil
+        return &MetaName{meta: meta, name: name}, nil
     }
     // TODO: maybe we need to be able to adjust the concurrency to download files,
     // which currently is the same as the chunk size
@@ -200,29 +224,32 @@ func (rc *LogFileManager) createMetaIterOver(ctx context.Context, s storage.Exte
     return reader, nil
 }
 
-func (rc *LogFileManager) FilterDataFiles(ms MetaIter) LogIter {
-    return iter.FlatMap(ms, func(m *backuppb.Metadata) LogIter {
-        return iter.FlatMap(iter.Enumerate(iter.FromSlice(m.FileGroups)), func(gi iter.Indexed[*backuppb.DataFileGroup]) LogIter {
-            return iter.Map(
-                iter.FilterOut(iter.Enumerate(iter.FromSlice(gi.Item.DataFilesInfo)), func(di iter.Indexed[*backuppb.DataFileInfo]) bool {
+func (rc *LogFileManager) FilterDataFiles(m MetaNameIter) LogIter {
+    ms := rc.withmigrations.Metas(m)
+    return iter.FlatMap(ms, func(m *MetaWithMigrations) LogIter {
+        gs := m.Physicals(iter.Enumerate(iter.FromSlice(m.meta.FileGroups)))
+        return iter.FlatMap(gs, func(gim *PhysicalWithMigrations) LogIter {
+            fs := iter.FilterOut(
+                gim.Logicals(iter.Enumerate(iter.FromSlice(gim.physical.Item.DataFilesInfo))),
+                func(di FileIndex) bool {
                     // Modify the data internally, a little hacky.
-                    if m.MetaVersion > backuppb.MetaVersion_V1 {
-                        di.Item.Path = gi.Item.Path
+                    if m.meta.MetaVersion > backuppb.MetaVersion_V1 {
+                        di.Item.Path = gim.physical.Item.Path
                     }
                     return di.Item.IsMeta || rc.ShouldFilterOut(di.Item)
-                }),
-                func(di iter.Indexed[*backuppb.DataFileInfo]) *LogDataFileInfo {
-                    return &LogDataFileInfo{
-                        DataFileInfo: di.Item,
-
-                        // Since there is a `datafileinfo`, the length of `m.FileGroups`
-                        // must be larger than 0. So we use the first group's name as
-                        // metadata's unique key.
-                        MetaDataGroupName:   m.FileGroups[0].Path,
-                        OffsetInMetaGroup:   gi.Index,
-                        OffsetInMergedGroup: di.Index,
-                    }
-                },
+                })
+            return iter.Map(fs, func(di FileIndex) *LogDataFileInfo {
+                return &LogDataFileInfo{
+                    DataFileInfo: di.Item,
+
+                    // Since there is a `datafileinfo`, the length of `m.FileGroups`
+                    // must be larger than 0. So we use the first group's name as
+                    // metadata's unique key.
+                    MetaDataGroupName:   m.meta.FileGroups[0].Path,
+                    OffsetInMetaGroup:   gim.physical.Index,
+                    OffsetInMergedGroup: di.Index,
+                }
+            },
             )
         })
     })
@@ -262,8 +289,8 @@ func (rc *LogFileManager) LoadDDLFilesAndCountDMLFiles(ctx context.Context, coun
         return nil, err
     }
     if counter != nil {
-        m = iter.Tap(m, func(m Meta) {
-            for _, fg := range m.FileGroups {
+        m = iter.Tap(m, func(m *MetaName) {
+            for _, fg := range m.meta.FileGroups {
                 for _, f := range fg.DataFilesInfo {
                     if !f.IsMeta && !rc.ShouldFilterOut(f) {
                         *counter += 1
@@ -285,16 +312,16 @@ func (rc *LogFileManager) LoadDMLFiles(ctx context.Context) (LogIter, error) {
         return nil, err
     }
 
-    mg := rc.FilterDataFiles(m)
-    return mg, nil
+    l := rc.FilterDataFiles(m)
+    return l, nil
 }
 
-func (rc *LogFileManager) FilterMetaFiles(ms MetaIter) MetaGroupIter {
-    return iter.FlatMap(ms, func(m Meta) MetaGroupIter {
-        return iter.Map(iter.FromSlice(m.FileGroups), func(g *backuppb.DataFileGroup) DDLMetaGroup {
+func (rc *LogFileManager) FilterMetaFiles(ms MetaNameIter) MetaGroupIter {
+    return iter.FlatMap(ms, func(m *MetaName) MetaGroupIter {
+        return iter.Map(iter.FromSlice(m.meta.FileGroups), func(g *backuppb.DataFileGroup) DDLMetaGroup {
             metas := iter.FilterOut(iter.FromSlice(g.DataFilesInfo), func(d Log) bool {
                 // Modify the data internally, a little hacky.
-                if m.MetaVersion > backuppb.MetaVersion_V1 {
+                if m.meta.MetaVersion > backuppb.MetaVersion_V1 {
                     d.Path = g.Path
                 }
                 return !d.IsMeta || rc.ShouldFilterOut(d)
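
Reviewer note: the pieces above only take effect once a caller threads a `WithMigrations` value into `LogFileManagerInit`. Below is a minimal, hypothetical wiring sketch — it leans on the test-only `NewMigrationBuilder` export from `export_test.go` (production code would obtain the builder elsewhere), and `openLogFilesWithMigrations`, `migs`, `extStorage`, and `encManager` are made-up placeholders, so read it as an illustration of the data flow rather than as part of this patch.

```go
package logclient_test

import (
    "context"

    backuppb "github.com/pingcap/kvproto/pkg/brpb"
    "github.com/pingcap/tidb/br/pkg/encryption"
    logclient "github.com/pingcap/tidb/br/pkg/restore/log_client"
    "github.com/pingcap/tidb/br/pkg/storage"
)

// openLogFilesWithMigrations is a hypothetical helper, not part of the patch.
func openLogFilesWithMigrations(
    ctx context.Context,
    migs []*backuppb.Migration,
    extStorage storage.ExternalStorage,
    encManager *encryption.Manager,
    shiftStartTS, startTS, restoredTS uint64,
) (logclient.LogIter, error) {
    // Fold the migrations into the skip wrapper, then hand it to the manager.
    withMigrations := logclient.NewMigrationBuilder(shiftStartTS, startTS, restoredTS).Build(migs)
    fm, err := logclient.CreateLogFileManager(ctx, logclient.LogFileManagerInit{
        StartTS:                   startTS,
        RestoreTS:                 restoredTS,
        Storage:                   extStorage,
        Migrations:                withMigrations, // files already deleted by migrations get filtered out
        MetadataDownloadBatchSize: 32,
        EncryptionManager:         encManager,
    })
    if err != nil {
        return nil, err
    }
    // LoadDMLFiles -> streamingMeta -> FilterDataFiles, which now consults the migrations.
    return fm.LoadDMLFiles(ctx)
}
```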
diff --git a/br/pkg/restore/log_client/log_file_manager_test.go b/br/pkg/restore/log_client/log_file_manager_test.go
index 82fcf628d0139..0ac289b65b8b0 100644
--- a/br/pkg/restore/log_client/log_file_manager_test.go
+++ b/br/pkg/restore/log_client/log_file_manager_test.go
@@ -244,7 +244,7 @@ func testReadMetaBetweenTSWithVersion(t *testing.T, m metaMaker) {
     req.NoError(err)
     actualStoreIDs := make([]int64, 0, len(metas))
     for _, meta := range metas {
-        actualStoreIDs = append(actualStoreIDs, meta.StoreId)
+        actualStoreIDs = append(actualStoreIDs, meta.Meta().StoreId)
     }
     expectedStoreIDs := make([]int64, 0, len(c.expected))
     for _, meta := range c.expected {
@@ -528,6 +528,7 @@ func TestFilterDataFiles(t *testing.T) {
         RestoreTS: 10,
         Storage:   loc,
 
+        Migrations:                emptyMigrations(),
         MetadataDownloadBatchSize: 32,
     })
     req.NoError(err)
@@ -536,7 +537,9 @@ func TestFilterDataFiles(t *testing.T) {
         m2(wr(1, 1, 1), wr(2, 2, 2), wr(3, 3, 3), wr(4, 4, 4), wr(5, 5, 5)),
         m2(wr(1, 1, 1), wr(2, 2, 2)),
     }
-    metaIter := iter.FromSlice(metas)
+    metaIter := iter.Map(iter.FromSlice(metas), func(meta logclient.Meta) *logclient.MetaName {
+        return logclient.NewMetaName(meta, "")
+    })
     files := iter.CollectAll(ctx, fm.FilterDataFiles(metaIter)).Item
     check := func(file *logclient.LogDataFileInfo, metaKey string, goff, foff int) {
         req.Equal(file.MetaDataGroupName, metaKey)
diff --git a/br/pkg/restore/log_client/log_file_map.go b/br/pkg/restore/log_client/log_file_map.go
index db50c8391418b..c7c05e1be4e7e 100644
--- a/br/pkg/restore/log_client/log_file_map.go
+++ b/br/pkg/restore/log_client/log_file_map.go
@@ -35,6 +35,18 @@ func (m bitMap) Hit(off int) bool {
     return (m[blockIndex] & bitOffset) > 0
 }
 
+type bitMapExt struct {
+    bitMap
+    skip bool
+}
+
+func newBitMapExt(skip bool) bitMapExt {
+    return bitMapExt{
+        bitMap: newBitMap(),
+        skip:   skip,
+    }
+}
+
 type fileMap struct {
     // group index -> bitmap of kv files
     pos map[int]bitMap
@@ -46,6 +58,19 @@ func newFileMap() fileMap {
     }
 }
 
+type fileMapExt struct {
+    // group index -> bitmap of kv files
+    pos  map[int]bitMapExt
+    skip bool
+}
+
+func newFileMapExt(skip bool) fileMapExt {
+    return fileMapExt{
+        pos:  make(map[int]bitMapExt),
+        skip: skip,
+    }
+}
+
 type LogFilesSkipMap struct {
     // metadata group key -> group map
     skipMap map[string]fileMap
@@ -82,3 +107,68 @@ func (m *LogFilesSkipMap) NeedSkip(metaKey string, groupOff, fileOff int) bool {
     }
     return gp.Hit(fileOff)
 }
+
+type LogFilesSkipMapExt struct {
+    // metadata group key -> group map
+    skipMap map[string]fileMapExt
+}
+
+func NewLogFilesSkipMapExt() *LogFilesSkipMapExt {
+    return &LogFilesSkipMapExt{
+        skipMap: make(map[string]fileMapExt),
+    }
+}
+
+func (m *LogFilesSkipMapExt) Insert(metaKey string, groupOff, fileOff int) {
+    mp, exists := m.skipMap[metaKey]
+    if !exists {
+        mp = newFileMapExt(false)
+        m.skipMap[metaKey] = mp
+    }
+    if mp.skip {
+        return
+    }
+    gp, exists := mp.pos[groupOff]
+    if !exists {
+        gp = newBitMapExt(false)
+        mp.pos[groupOff] = gp
+    }
+    if gp.skip {
+        return
+    }
+    gp.Set(fileOff)
+}
+
+func (m *LogFilesSkipMapExt) SkipMeta(metaKey string) {
+    m.skipMap[metaKey] = newFileMapExt(true)
+}
+
+func (m *LogFilesSkipMapExt) SkipGroup(metaKey string, groupOff int) {
+    mp, exists := m.skipMap[metaKey]
+    if !exists {
+        mp = newFileMapExt(false)
+        m.skipMap[metaKey] = mp
+    }
+    if mp.skip {
+        return
+    }
+    mp.pos[groupOff] = newBitMapExt(true)
+}
+
+func (m *LogFilesSkipMapExt) NeedSkip(metaKey string, groupOff, fileOff int) bool {
+    mp, exists := m.skipMap[metaKey]
+    if !exists {
+        return false
+    }
+    if mp.skip {
+        return true
+    }
+    gp, exists := mp.pos[groupOff]
+    if !exists {
+        return false
+    }
+    if gp.skip {
+        return true
+    }
+    return gp.Hit(fileOff)
+}
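
The `*Ext` variants above differ from `LogFilesSkipMap` by carrying a `skip` flag at the meta and group levels, so an entire metadata file or physical group can be marked as skipped without enumerating every file offset. A small sketch of the intended semantics, written as an example test with invented keys (not part of the patch):

```go
package logclient_test

import (
    "fmt"

    logclient "github.com/pingcap/tidb/br/pkg/restore/log_client"
)

// ExampleLogFilesSkipMapExt shows how the skip flags shadow the per-file bitmap.
func ExampleLogFilesSkipMapExt() {
    m := logclient.NewLogFilesSkipMapExt()
    m.Insert("meta-a", 0, 3) // skip only file 3 of group 0 in meta-a
    m.SkipGroup("meta-a", 1) // skip every file of group 1 in meta-a
    m.SkipMeta("meta-b")     // skip everything recorded under meta-b

    fmt.Println(m.NeedSkip("meta-a", 0, 3)) // hit in the bitmap
    fmt.Println(m.NeedSkip("meta-a", 0, 4)) // same group, different file
    fmt.Println(m.NeedSkip("meta-a", 1, 7)) // whole group skipped
    fmt.Println(m.NeedSkip("meta-b", 9, 9)) // whole meta skipped
    fmt.Println(m.NeedSkip("meta-c", 0, 0)) // never recorded
    // Output:
    // true
    // false
    // true
    // true
    // false
}
```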
diff --git a/br/pkg/restore/log_client/migration.go b/br/pkg/restore/log_client/migration.go
new file mode 100644
index 0000000000000..19d9d3daeb3cb
--- /dev/null
+++ b/br/pkg/restore/log_client/migration.go
@@ -0,0 +1,212 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logclient
+
+import (
+    backuppb "github.com/pingcap/kvproto/pkg/brpb"
+    "github.com/pingcap/tidb/br/pkg/utils/iter"
+)
+
+type logicalSkipMap map[uint64]struct{}
+type logicalFileSkipMap struct {
+    skipmap logicalSkipMap
+    skip    bool
+}
+type physicalSkipMap map[string]*logicalFileSkipMap
+type physicalFileSkipMap struct {
+    skipmap physicalSkipMap
+    skip    bool
+}
+type metaSkipMap map[string]*physicalFileSkipMap
+
+func (skipmap metaSkipMap) skipMeta(metaPath string) {
+    skipmap[metaPath] = &physicalFileSkipMap{
+        skip: true,
+    }
+}
+
+func (skipmap metaSkipMap) skipPhysical(metaPath, physicalPath string) {
+    metaMap, exists := skipmap[metaPath]
+    if !exists {
+        metaMap = &physicalFileSkipMap{
+            skipmap: make(map[string]*logicalFileSkipMap),
+        }
+        skipmap[metaPath] = metaMap
+    } else if metaMap.skip {
+        return
+    }
+    metaMap.skipmap[physicalPath] = &logicalFileSkipMap{
+        skip: true,
+    }
+}
+
+func (skipmap metaSkipMap) skipLogical(metaPath, physicalPath string, offset uint64) {
+    metaMap, exists := skipmap[metaPath]
+    if !exists {
+        metaMap = &physicalFileSkipMap{
+            skipmap: make(map[string]*logicalFileSkipMap),
+        }
+        skipmap[metaPath] = metaMap
+    } else if metaMap.skip {
+        return
+    }
+    fileMap, exists := metaMap.skipmap[physicalPath]
+    if !exists {
+        fileMap = &logicalFileSkipMap{
+            skipmap: make(map[uint64]struct{}),
+        }
+        metaMap.skipmap[physicalPath] = fileMap
+    } else if fileMap.skip {
+        return
+    }
+    fileMap.skipmap[offset] = struct{}{}
+}
+
+func (skipmap metaSkipMap) NeedSkip(metaPath, physicalPath string, offset uint64) bool {
+    metaMap, exists := skipmap[metaPath]
+    if !exists {
+        return false
+    }
+    if metaMap.skip {
+        return true
+    }
+    fileMap, exists := metaMap.skipmap[physicalPath]
+    if !exists {
+        return false
+    }
+    if fileMap.skip {
+        return true
+    }
+    _, exists = fileMap.skipmap[offset]
+    return exists
+}
+
+type WithMigrationsBuilder struct {
+    shiftStartTS uint64
+    startTS      uint64
+    restoredTS   uint64
+}
+
+func (builder *WithMigrationsBuilder) updateSkipMap(skipmap metaSkipMap, metas []*backuppb.MetaEdit) {
+    for _, meta := range metas {
+        if meta.DestructSelf {
+            skipmap.skipMeta(meta.Path)
+            continue
+        }
+        for _, path := range meta.DeletePhysicalFiles {
+            skipmap.skipPhysical(meta.Path, path)
+        }
+        for _, filesInPhysical := range meta.DeleteLogicalFiles {
+            for _, span := range filesInPhysical.Spans {
+                skipmap.skipLogical(meta.Path, filesInPhysical.Path, span.Offset)
+            }
+        }
+    }
+}
+
+func (builder *WithMigrationsBuilder) coarseGrainedFilter(mig *backuppb.Migration) bool {
+    // The SST files created by compaction may contain kvs whose ts is larger than the shift start ts,
+    // but currently log restore still restores those kvs.
+    // Besides, it indicates that the truncate task and the log restore task cannot be performed simultaneously.
+    //
+    // compaction until ts --+       +-- shift start ts
+    //                       v       v
+    //        log file [ .. .. .. .. ]
+    //
+    for _, compaction := range mig.Compactions {
+        if compaction.CompactionUntilTs < builder.shiftStartTS || compaction.CompactionFromTs > builder.restoredTS {
+            return true
+        }
+    }
+    return false
+}
+
+// Build creates the wrapper from migrations.
+func (builder *WithMigrationsBuilder) Build(migs []*backuppb.Migration) WithMigrations {
+    skipmap := make(metaSkipMap)
+    for _, mig := range migs {
+        // TODO: deal with TruncatedTo and DestructPrefix
+        if builder.coarseGrainedFilter(mig) {
+            continue
+        }
+        builder.updateSkipMap(skipmap, mig.EditMeta)
+    }
+    return WithMigrations(skipmap)
+}
+
+type PhysicalMigrationsIter = iter.TryNextor[*PhysicalWithMigrations]
+
+type PhysicalWithMigrations struct {
+    skipmap  logicalSkipMap
+    physical GroupIndex
+}
+
+func (pwm *PhysicalWithMigrations) Logicals(fileIndexIter FileIndexIter) FileIndexIter {
+    return iter.FilterOut(fileIndexIter, func(fileIndex FileIndex) bool {
+        if pwm.skipmap != nil {
+            if _, ok := pwm.skipmap[fileIndex.Item.RangeOffset]; ok {
+                return true
+            }
+        }
+        return false
+    })
+}
+
+type MetaMigrationsIter = iter.TryNextor[*MetaWithMigrations]
+
+type MetaWithMigrations struct {
+    skipmap physicalSkipMap
+    meta    Meta
+}
+
+func (mwm *MetaWithMigrations) Physicals(groupIndexIter GroupIndexIter) PhysicalMigrationsIter {
+    return iter.MapFilter(groupIndexIter, func(groupIndex GroupIndex) (*PhysicalWithMigrations, bool) {
+        var logiSkipmap logicalSkipMap = nil
+        if mwm.skipmap != nil {
+            skipmap := mwm.skipmap[groupIndex.Item.Path]
+            if skipmap != nil {
+                if skipmap.skip {
+                    return nil, true
+                }
+                logiSkipmap = skipmap.skipmap
+            }
+        }
+        return &PhysicalWithMigrations{
+            skipmap:  logiSkipmap,
+            physical: groupIndex,
+        }, false
+    })
+}
+
+type WithMigrations metaSkipMap
+
+func (wm WithMigrations) Metas(metaNameIter MetaNameIter) MetaMigrationsIter {
+    return iter.MapFilter(metaNameIter, func(mname *MetaName) (*MetaWithMigrations, bool) {
+        var phySkipmap physicalSkipMap = nil
+        if wm != nil {
+            skipmap := wm[mname.name]
+            if skipmap != nil {
+                if skipmap.skip {
+                    return nil, true
+                }
+                phySkipmap = skipmap.skipmap
+            }
+        }
+        return &MetaWithMigrations{
+            skipmap: phySkipmap,
+            meta:    mname.meta,
+        }, false
+    })
+}
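
Reading `coarseGrainedFilter` and `Build` together: a migration whose compactions fall outside the `[shiftStartTS, restoredTS]` window is ignored wholesale; otherwise its `MetaEdit`s are folded into the three-level skip map that `Metas`/`Physicals`/`Logicals` consult while iterating. A concrete illustration follows — the timestamps and paths are invented, and `NewMigrationBuilder` is the test-only export from `export_test.go`, so this is a sketch rather than production usage.

```go
// Illustration only, with shiftStartTS=10, startTS=100, restoredTS=200.
builder := logclient.NewMigrationBuilder(10, 100, 200)
wm := builder.Build([]*backuppb.Migration{
    {
        // Kept: [50, 150] overlaps [10, 200], so its edits take effect.
        Compactions: []*backuppb.LogFileCompaction{{CompactionFromTs: 50, CompactionUntilTs: 150}},
        EditMeta: []*backuppb.MetaEdit{
            {Path: "meta_0", DestructSelf: true},                            // whole meta skipped
            {Path: "meta_1", DeletePhysicalFiles: []string{"meta_1_phy_0"}}, // one physical group skipped
        },
    },
    {
        // Dropped by coarseGrainedFilter: CompactionUntilTs 5 < shiftStartTS 10.
        Compactions: []*backuppb.LogFileCompaction{{CompactionFromTs: 1, CompactionUntilTs: 5}},
        EditMeta:    []*backuppb.MetaEdit{{Path: "meta_2", DestructSelf: true}},
    },
})
// wm.Metas(...) now drops everything under meta_0, wm.Physicals(...) drops the
// group meta_1_phy_0 under meta_1, and meta_2 is left untouched.
_ = wm
```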
diff --git a/br/pkg/restore/log_client/migration_test.go b/br/pkg/restore/log_client/migration_test.go
new file mode 100644
index 0000000000000..0dd7b06197c7c
--- /dev/null
+++ b/br/pkg/restore/log_client/migration_test.go
@@ -0,0 +1,352 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logclient_test
+
+import (
+    "context"
+    "fmt"
+    "testing"
+
+    backuppb "github.com/pingcap/kvproto/pkg/brpb"
+    logclient "github.com/pingcap/tidb/br/pkg/restore/log_client"
+    "github.com/pingcap/tidb/br/pkg/utils/iter"
+    "github.com/stretchr/testify/require"
+)
+
+func emptyMigrations() logclient.WithMigrations {
+    return logclient.WithMigrations{}
+}
+
+func nameFromID(prefix string, id uint64) string {
+    return fmt.Sprintf("%s_%d", prefix, id)
+}
+
+func phyNameFromID(metaid, phyLen uint64) string {
+    return fmt.Sprintf("meta_%d_phy_%d", metaid, phyLen)
+}
+
+func generateSpans(metaid, physicalLength, spanLength uint64) []*backuppb.Span {
+    spans := make([]*backuppb.Span, 0, spanLength)
+    for i := uint64(0); i < spanLength; i += 1 {
+        spans = append(spans, &backuppb.Span{
+            Offset: lfl(metaid, physicalLength, i),
+            Length: 1,
+        })
+    }
+    return spans
+}
+
+func generateDeleteLogicalFiles(metaid, physicalLength, logicalLength uint64) []*backuppb.DeleteSpansOfFile {
+    spans := make([]*backuppb.DeleteSpansOfFile, 0, logicalLength)
+    spans = append(spans, &backuppb.DeleteSpansOfFile{
+        Path:  phyNameFromID(metaid, physicalLength),
+        Spans: generateSpans(metaid, physicalLength, logicalLength),
+    })
+    return spans
+}
+
+func generateDeletePhysicalFiles(metaid, physicalLength uint64) []string {
+    names := make([]string, 0, physicalLength)
+    for i := uint64(0); i < physicalLength; i += 1 {
+        names = append(names, phyNameFromID(metaid, i))
+    }
+    return names
+}
+
+func generateMigrationMeta(metaid uint64) *backuppb.MetaEdit {
+    return &backuppb.MetaEdit{
+        Path:         nameFromID("meta", metaid),
+        DestructSelf: true,
+    }
+}
+
+func generateMigrationFile(metaid, physicalLength, physicalOffset, logicalLength uint64) *backuppb.MetaEdit {
+    return &backuppb.MetaEdit{
+        Path:                nameFromID("meta", metaid),
+        DeletePhysicalFiles: generateDeletePhysicalFiles(metaid, physicalLength),
+        DeleteLogicalFiles:  generateDeleteLogicalFiles(metaid, physicalOffset, logicalLength),
+        DestructSelf:        false,
+    }
+}
+
+// mark the store id of the metadata as the test identity
+func generateMetaNameIter() logclient.MetaNameIter {
+    return iter.FromSlice([]*logclient.MetaName{
+        logclient.NewMetaName(&backuppb.Metadata{StoreId: 0, FileGroups: generateGroupFiles(0, 3)}, nameFromID("meta", 0)),
+        logclient.NewMetaName(&backuppb.Metadata{StoreId: 1, FileGroups: generateGroupFiles(1, 3)}, nameFromID("meta", 1)),
+        logclient.NewMetaName(&backuppb.Metadata{StoreId: 2, FileGroups: generateGroupFiles(2, 3)}, nameFromID("meta", 2)),
+    })
+}
+
+// group file length
+func gfl(storeId, length uint64) uint64 {
+    return storeId*100000 + length*100
+}
+
+func gfls(m [][]uint64) [][]uint64 {
+    glenss := make([][]uint64, 0, len(m))
+    for storeId, gs := range m {
+        if len(gs) == 0 {
+            continue
+        }
+        glens := make([]uint64, 0, len(gs))
+        for _, glen := range gs {
+            glens = append(glens, gfl(uint64(storeId), glen))
+        }
+        glenss = append(glenss, glens)
+    }
+    return glenss
+}
+
+// mark the length of the group file as the test identity
+func generateGroupFiles(metaId, length uint64) []*backuppb.DataFileGroup {
+    groupFiles := make([]*backuppb.DataFileGroup, 0, length)
+    for i := uint64(0); i < length; i += 1 {
+        groupFiles = append(groupFiles, &backuppb.DataFileGroup{
+            Path:          phyNameFromID(metaId, i),
+            Length:        gfl(metaId, i),
+            DataFilesInfo: generateDataFiles(metaId, i, 3),
+        })
+    }
+    return groupFiles
+}
+
+// logical file length
+func lfl(storeId, glen, plen uint64) uint64 {
+    return storeId*100000 + glen*100 + plen
+}
+
+func lfls(m [][][]uint64) [][][]uint64 {
+    flensss := make([][][]uint64, 0, len(m))
+    for storeId, glens := range m {
+        if len(glens) == 0 {
+            continue
+        }
+        flenss := make([][]uint64, 0, len(glens))
+        for glen, fs := range glens {
+            if len(fs) == 0 {
+                continue
+            }
+            flens := make([]uint64, 0, len(fs))
+            for _, flen := range fs {
+                flens = append(flens, lfl(uint64(storeId), uint64(glen), flen))
+            }
+            flenss = append(flenss, flens)
+        }
+        flensss = append(flensss, flenss)
+    }
+    return flensss
+}
+
+func generateDataFiles(metaId, glen, plen uint64) []*backuppb.DataFileInfo {
+    files := make([]*backuppb.DataFileInfo, 0, plen)
+    for i := uint64(0); i < plen; i += 1 {
+        files = append(files, &backuppb.DataFileInfo{
+            Path:        phyNameFromID(metaId, glen),
+            RangeOffset: lfl(metaId, glen, i),
+            Length:      lfl(metaId, glen, i),
+        })
+    }
+    return files
+}
+
+func checkMetaNameIter(t *testing.T, expectStoreIds []int64, actualIter logclient.MetaMigrationsIter) {
+    res := iter.CollectAll(context.TODO(), iter.Map(actualIter, func(m *logclient.MetaWithMigrations) int64 {
+        return m.StoreId()
+    }))
+    require.NoError(t, res.Err)
+    require.Equal(t, expectStoreIds, res.Item)
+}
+
+func checkPhysicalIter(t *testing.T, expectLengths []uint64, actualIter logclient.PhysicalMigrationsIter) {
+    res := iter.CollectAll(context.TODO(), iter.Map(actualIter, func(p *logclient.PhysicalWithMigrations) uint64 {
+        return p.PhysicalLength()
+    }))
+    require.NoError(t, res.Err)
+    require.Equal(t, expectLengths, res.Item)
+}
+
+func checkLogicalIter(t *testing.T, expectLengths []uint64, actualIter logclient.FileIndexIter) {
+    res := iter.CollectAll(context.TODO(), iter.Map(actualIter, func(l logclient.FileIndex) uint64 {
+        return l.Item.Length
+    }))
+    require.NoError(t, res.Err)
+    require.Equal(t, expectLengths, res.Item)
+}
+
+func generatePhysicalIter(meta *logclient.MetaWithMigrations) logclient.PhysicalMigrationsIter {
+    groupIter := iter.FromSlice(meta.Meta().FileGroups)
+    groupIndexIter := iter.Enumerate(groupIter)
+    return meta.Physicals(groupIndexIter)
+}
+
+func generateLogicalIter(phy *logclient.PhysicalWithMigrations) logclient.FileIndexIter {
+    fileIter := iter.FromSlice(phy.Physical().DataFilesInfo)
+    fileIndexIter := iter.Enumerate(fileIter)
+    return phy.Logicals(fileIndexIter)
+}
+
+func TestMigrations(t *testing.T) {
+    cases := []struct {
+        migrations []*backuppb.Migration
+        // test meta name iter
+        expectStoreIds   []int64
+        expectPhyLengths [][]uint64
+        expectLogLengths [][][]uint64
+    }{
+        {
+            migrations: []*backuppb.Migration{
+                {
+                    EditMeta: []*backuppb.MetaEdit{
+                        generateMigrationMeta(0),
+                        generateMigrationFile(2, 1, 2, 2),
+                    },
+                    Compactions: []*backuppb.LogFileCompaction{
+                        {
+                            CompactionFromTs:  0,
+                            CompactionUntilTs: 9,
+                        },
+                    },
+                },
+            },
+            expectStoreIds: []int64{0, 1, 2},
+            expectPhyLengths: gfls([][]uint64{
+                {0, 1, 2}, {0, 1, 2}, {0, 1, 2},
+            }),
+            expectLogLengths: lfls([][][]uint64{
+                {{0, 1, 2}, {0, 1, 2}, {0, 1, 2}},
+                {{0, 1, 2}, {0, 1, 2}, {0, 1, 2}},
+                {{0, 1, 2}, {0, 1, 2}, {0, 1, 2}},
+            }),
+        },
+        {
+            migrations: []*backuppb.Migration{
+                {
+                    EditMeta: []*backuppb.MetaEdit{
+                        generateMigrationMeta(0),
+                        generateMigrationFile(2, 1, 2, 2),
+                    },
+                    Compactions: []*backuppb.LogFileCompaction{
+                        {
+                            CompactionFromTs:  50,
+                            CompactionUntilTs: 52,
+                        },
+                    },
+                },
+            },
+            expectStoreIds: []int64{1, 2},
+            expectPhyLengths: gfls([][]uint64{
+                { /*0, 1, 2*/ }, {0, 1, 2}, { /*0 */ 1, 2},
+            }),
+            expectLogLengths: lfls([][][]uint64{
+                { /*{0, 1, 2}, {0, 1, 2}, {0, 1, 2}*/ },
+                {{0, 1, 2}, {0, 1, 2}, {0, 1, 2}},
+                {{ /*0, 1, 2*/ }, {0, 1, 2}, { /*0, 1 */ 2}},
+            }),
+        },
+        {
+            migrations: []*backuppb.Migration{
+                {
+                    EditMeta: []*backuppb.MetaEdit{
+                        generateMigrationMeta(0),
+                    },
+                    Compactions: []*backuppb.LogFileCompaction{
+                        {
+                            CompactionFromTs:  50,
+                            CompactionUntilTs: 52,
+                        },
+                    },
+                },
+                {
+                    EditMeta: []*backuppb.MetaEdit{
+                        generateMigrationFile(2, 1, 2, 2),
+                    },
+                    Compactions: []*backuppb.LogFileCompaction{
+                        {
+                            CompactionFromTs:  120,
+                            CompactionUntilTs: 140,
+                        },
+                    },
+                },
+            },
+            expectStoreIds: []int64{1, 2},
+            expectPhyLengths: gfls([][]uint64{
+                { /*0, 1, 2*/ }, {0, 1, 2}, { /*0 */ 1, 2},
+            }),
+            expectLogLengths: lfls([][][]uint64{
+                { /*{0, 1, 2}, {0, 1, 2}, {0, 1, 2}*/ },
+                {{0, 1, 2}, {0, 1, 2}, {0, 1, 2}},
+                {{ /*0, 1, 2*/ }, {0, 1, 2}, { /*0, 1 */ 2}},
+            }),
+        },
+        {
+            migrations: []*backuppb.Migration{
+                {
+                    EditMeta: []*backuppb.MetaEdit{
+                        generateMigrationMeta(0),
+                    },
+                    Compactions: []*backuppb.LogFileCompaction{
+                        {
+                            CompactionFromTs:  50,
+                            CompactionUntilTs: 52,
+                        },
+                    },
+                },
+                {
+                    EditMeta: []*backuppb.MetaEdit{
+                        generateMigrationFile(2, 1, 2, 2),
+                    },
+                    Compactions: []*backuppb.LogFileCompaction{
+                        {
+                            CompactionFromTs:  1200,
+                            CompactionUntilTs: 1400,
+                        },
+                    },
+                },
+            },
+            expectStoreIds: []int64{1, 2},
+            expectPhyLengths: gfls([][]uint64{
+                { /*0, 1, 2*/ }, {0, 1, 2}, {0, 1, 2},
+            }),
+            expectLogLengths: lfls([][][]uint64{
+                { /*{0, 1, 2}, {0, 1, 2}, {0, 1, 2}*/ },
+                {{0, 1, 2}, {0, 1, 2}, {0, 1, 2}},
+                {{0, 1, 2}, {0, 1, 2}, {0, 1, 2}},
+            }),
+        },
+    }
+
+    ctx := context.Background()
+    for _, cs := range cases {
+        builder := logclient.NewMigrationBuilder(10, 100, 200)
+        withMigrations := builder.Build(cs.migrations)
+        it := withMigrations.Metas(generateMetaNameIter())
+        checkMetaNameIter(t, cs.expectStoreIds, it)
+        it = withMigrations.Metas(generateMetaNameIter())
+        collect := iter.CollectAll(ctx, it)
+        require.NoError(t, collect.Err)
+        for j, meta := range collect.Item {
+            physicalIter := generatePhysicalIter(meta)
+            checkPhysicalIter(t, cs.expectPhyLengths[j], physicalIter)
+            physicalIter = generatePhysicalIter(meta)
+            collect := iter.CollectAll(ctx, physicalIter)
+            require.NoError(t, collect.Err)
+            for k, phy := range collect.Item {
+                logicalIter := generateLogicalIter(phy)
+                checkLogicalIter(t, cs.expectLogLengths[j][k], logicalIter)
+            }
+        }
+    }
+}
diff --git a/br/pkg/utils/iter/combinator_types.go b/br/pkg/utils/iter/combinator_types.go
index dca8a9fe37bd2..34288d104236c 100644
--- a/br/pkg/utils/iter/combinator_types.go
+++ b/br/pkg/utils/iter/combinator_types.go
@@ -88,12 +88,6 @@ func (t *take[T]) TryNext(ctx context.Context) IterResult[T] {
     return t.inner.TryNext(ctx)
 }
 
-type join[T any] struct {
-    inner TryNextor[TryNextor[T]]
-
-    current TryNextor[T]
-}
-
 type pureMap[T, R any] struct {
     inner TryNextor[T]
 
@@ -109,6 +103,33 @@ func (p pureMap[T, R]) TryNext(ctx context.Context) IterResult[R] {
     return Emit(p.mapper(r.Item))
 }
 
+type filterMap[T, R any] struct {
+    inner TryNextor[T]
+
+    mapper func(T) (R, bool)
+}
+
+func (f filterMap[T, R]) TryNext(ctx context.Context) IterResult[R] {
+    for {
+        r := f.inner.TryNext(ctx)
+
+        if r.FinishedOrError() {
+            return DoneBy[R](r)
+        }
+
+        res, skip := f.mapper(r.Item)
+        if !skip {
+            return Emit(res)
+        }
+    }
+}
+
+type join[T any] struct {
+    inner TryNextor[TryNextor[T]]
+
+    current TryNextor[T]
+}
+
 func (j *join[T]) TryNext(ctx context.Context) IterResult[T] {
     r := j.current.TryNext(ctx)
     if r.Err != nil {
diff --git a/br/pkg/utils/iter/combinators.go b/br/pkg/utils/iter/combinators.go
index 5fc8985eadcc3..6247237161912 100644
--- a/br/pkg/utils/iter/combinators.go
+++ b/br/pkg/utils/iter/combinators.go
@@ -84,6 +84,13 @@ func Map[T, R any](it TryNextor[T], mapper func(T) R) TryNextor[R] {
     }
 }
 
+func MapFilter[T, R any](it TryNextor[T], mapper func(T) (R, bool)) TryNextor[R] {
+    return filterMap[T, R]{
+        inner:  it,
+        mapper: mapper,
+    }
+}
+
 // ConcatAll concatenates all elements yields by the iterators.
 // In another word, it 'chains' all the input iterators.
 func ConcatAll[T any](items ...TryNextor[T]) TryNextor[T] {
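
The new `MapFilter` combinator (backed by `filterMap` above) is what lets the migration wrappers map and drop elements in a single pass: the mapper's second return value means "skip", so skipped elements are never emitted. A small self-contained sketch of its behaviour:

```go
package main

import (
    "context"
    "fmt"

    "github.com/pingcap/tidb/br/pkg/utils/iter"
)

func main() {
    // Keep only the even numbers, mapping them to their halves in the same pass.
    halves := iter.MapFilter(iter.FromSlice([]int{1, 2, 3, 4, 5, 6}), func(x int) (int, bool) {
        return x / 2, x%2 != 0 // second value is "skip this element"
    })
    res := iter.CollectAll(context.Background(), halves)
    fmt.Println(res.Item) // [1 2 3]
}
```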