Skip to content

Commit

Permalink
br: implement reading migration (#57237)
Browse files Browse the repository at this point in the history
close #57210
  • Loading branch information
Leavrth authored Nov 11, 2024
1 parent 6528da5 commit 169210a
Show file tree
Hide file tree
Showing 9 changed files with 795 additions and 49 deletions.
4 changes: 3 additions & 1 deletion br/pkg/restore/log_client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
"import_retry.go",
"log_file_manager.go",
"log_file_map.go",
"migration.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/restore/log_client",
visibility = ["//visibility:public"],
Expand Down Expand Up @@ -81,10 +82,11 @@ go_test(
"log_file_manager_test.go",
"log_file_map_test.go",
"main_test.go",
"migration_test.go",
],
embed = [":log_client"],
flaky = True,
shard_count = 41,
shard_count = 42,
deps = [
"//br/pkg/errors",
"//br/pkg/glue",
Expand Down
34 changes: 33 additions & 1 deletion br/pkg/restore/log_client/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,38 @@ import (

var FilterFilesByRegion = filterFilesByRegion

func (metaname *MetaName) Meta() Meta {
return metaname.meta
}

func NewMetaName(meta Meta, name string) *MetaName {
return &MetaName{meta: meta, name: name}
}

func NewMigrationBuilder(shiftStartTS, startTS, restoredTS uint64) *WithMigrationsBuilder {
return &WithMigrationsBuilder{
shiftStartTS: shiftStartTS,
startTS: startTS,
restoredTS: restoredTS,
}
}

func (m *MetaWithMigrations) StoreId() int64 {
return m.meta.StoreId
}

func (m *MetaWithMigrations) Meta() *backuppb.Metadata {
return m.meta
}

func (m *PhysicalWithMigrations) PhysicalLength() uint64 {
return m.physical.Item.Length
}

func (m *PhysicalWithMigrations) Physical() *backuppb.DataFileGroup {
return m.physical.Item
}

func (rc *LogClient) TEST_saveIDMap(
ctx context.Context,
sr *stream.SchemasReplace,
Expand All @@ -44,7 +76,7 @@ func (rc *LogClient) TEST_initSchemasMap(
}

// readStreamMetaByTS is used for streaming task. collect all meta file by TS, it is for test usage.
func (rc *LogFileManager) ReadStreamMeta(ctx context.Context) ([]Meta, error) {
func (rc *LogFileManager) ReadStreamMeta(ctx context.Context) ([]*MetaName, error) {
metas, err := rc.streamingMeta(ctx)
if err != nil {
return nil, err
Expand Down
105 changes: 66 additions & 39 deletions br/pkg/restore/log_client/log_file_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,33 @@ import (
// MetaIter is the type of iterator of metadata files' content.
type MetaIter = iter.TryNextor[*backuppb.Metadata]

type MetaName struct {
meta Meta
name string
}

// MetaNameIter is the type of iterator of metadata files' content with name.
type MetaNameIter = iter.TryNextor[*MetaName]

type LogDataFileInfo struct {
*backuppb.DataFileInfo
MetaDataGroupName string
OffsetInMetaGroup int
OffsetInMergedGroup int
}

// GroupIndex is the type of physical data file with index from metadata.
type GroupIndex = iter.Indexed[*backuppb.DataFileGroup]

// GroupIndexIter is the type of iterator of physical data file with index from metadata.
type GroupIndexIter = iter.TryNextor[GroupIndex]

// FileIndex is the type of logical data file with index from physical data file.
type FileIndex = iter.Indexed[*backuppb.DataFileInfo]

// FileIndexIter is the type of iterator of logical data file with index from physical data file.
type FileIndexIter = iter.TryNextor[FileIndex]

// LogIter is the type of iterator of each log files' meta information.
type LogIter = iter.TryNextor[*LogDataFileInfo]

Expand Down Expand Up @@ -78,6 +98,8 @@ type LogFileManager struct {
storage storage.ExternalStorage
helper streamMetadataHelper

withmigrations WithMigrations

metadataDownloadBatchSize uint
}

Expand All @@ -87,6 +109,7 @@ type LogFileManagerInit struct {
RestoreTS uint64
Storage storage.ExternalStorage

Migrations WithMigrations
MetadataDownloadBatchSize uint
EncryptionManager *encryption.Manager
}
Expand All @@ -100,10 +123,11 @@ type DDLMetaGroup struct {
// Generally the config cannot be changed during its lifetime.
func CreateLogFileManager(ctx context.Context, init LogFileManagerInit) (*LogFileManager, error) {
fm := &LogFileManager{
startTS: init.StartTS,
restoreTS: init.RestoreTS,
storage: init.Storage,
helper: stream.NewMetadataHelper(stream.WithEncryptionManager(init.EncryptionManager)),
startTS: init.StartTS,
restoreTS: init.RestoreTS,
storage: init.Storage,
helper: stream.NewMetadataHelper(stream.WithEncryptionManager(init.EncryptionManager)),
withmigrations: init.Migrations,

metadataDownloadBatchSize: init.MetadataDownloadBatchSize,
}
Expand Down Expand Up @@ -153,22 +177,22 @@ func (rc *LogFileManager) loadShiftTS(ctx context.Context) error {
return nil
}

func (rc *LogFileManager) streamingMeta(ctx context.Context) (MetaIter, error) {
func (rc *LogFileManager) streamingMeta(ctx context.Context) (MetaNameIter, error) {
return rc.streamingMetaByTS(ctx, rc.restoreTS)
}

func (rc *LogFileManager) streamingMetaByTS(ctx context.Context, restoreTS uint64) (MetaIter, error) {
func (rc *LogFileManager) streamingMetaByTS(ctx context.Context, restoreTS uint64) (MetaNameIter, error) {
it, err := rc.createMetaIterOver(ctx, rc.storage)
if err != nil {
return nil, err
}
filtered := iter.FilterOut(it, func(metadata *backuppb.Metadata) bool {
return restoreTS < metadata.MinTs || metadata.MaxTs < rc.shiftStartTS
filtered := iter.FilterOut(it, func(metaname *MetaName) bool {
return restoreTS < metaname.meta.MinTs || metaname.meta.MaxTs < rc.shiftStartTS
})
return filtered, nil
}

func (rc *LogFileManager) createMetaIterOver(ctx context.Context, s storage.ExternalStorage) (MetaIter, error) {
func (rc *LogFileManager) createMetaIterOver(ctx context.Context, s storage.ExternalStorage) (MetaNameIter, error) {
opt := &storage.WalkOption{SubDir: stream.GetStreamBackupMetaPrefix()}
names := []string{}
err := s.WalkDir(ctx, opt, func(path string, size int64) error {
Expand All @@ -182,7 +206,7 @@ func (rc *LogFileManager) createMetaIterOver(ctx context.Context, s storage.Exte
return nil, err
}
namesIter := iter.FromSlice(names)
readMeta := func(ctx context.Context, name string) (*backuppb.Metadata, error) {
readMeta := func(ctx context.Context, name string) (*MetaName, error) {
f, err := s.ReadFile(ctx, name)
if err != nil {
return nil, errors.Annotatef(err, "failed during reading file %s", name)
Expand All @@ -191,7 +215,7 @@ func (rc *LogFileManager) createMetaIterOver(ctx context.Context, s storage.Exte
if err != nil {
return nil, errors.Annotatef(err, "failed to parse metadata of file %s", name)
}
return meta, nil
return &MetaName{meta: meta, name: name}, nil
}
// TODO: maybe we need to be able to adjust the concurrency to download files,
// which currently is the same as the chunk size
Expand All @@ -200,29 +224,32 @@ func (rc *LogFileManager) createMetaIterOver(ctx context.Context, s storage.Exte
return reader, nil
}

func (rc *LogFileManager) FilterDataFiles(ms MetaIter) LogIter {
return iter.FlatMap(ms, func(m *backuppb.Metadata) LogIter {
return iter.FlatMap(iter.Enumerate(iter.FromSlice(m.FileGroups)), func(gi iter.Indexed[*backuppb.DataFileGroup]) LogIter {
return iter.Map(
iter.FilterOut(iter.Enumerate(iter.FromSlice(gi.Item.DataFilesInfo)), func(di iter.Indexed[*backuppb.DataFileInfo]) bool {
func (rc *LogFileManager) FilterDataFiles(m MetaNameIter) LogIter {
ms := rc.withmigrations.Metas(m)
return iter.FlatMap(ms, func(m *MetaWithMigrations) LogIter {
gs := m.Physicals(iter.Enumerate(iter.FromSlice(m.meta.FileGroups)))
return iter.FlatMap(gs, func(gim *PhysicalWithMigrations) LogIter {
fs := iter.FilterOut(
gim.Logicals(iter.Enumerate(iter.FromSlice(gim.physical.Item.DataFilesInfo))),
func(di FileIndex) bool {
// Modify the data internally, a little hacky.
if m.MetaVersion > backuppb.MetaVersion_V1 {
di.Item.Path = gi.Item.Path
if m.meta.MetaVersion > backuppb.MetaVersion_V1 {
di.Item.Path = gim.physical.Item.Path
}
return di.Item.IsMeta || rc.ShouldFilterOut(di.Item)
}),
func(di iter.Indexed[*backuppb.DataFileInfo]) *LogDataFileInfo {
return &LogDataFileInfo{
DataFileInfo: di.Item,

// Since there is a `datafileinfo`, the length of `m.FileGroups`
// must be larger than 0. So we use the first group's name as
// metadata's unique key.
MetaDataGroupName: m.FileGroups[0].Path,
OffsetInMetaGroup: gi.Index,
OffsetInMergedGroup: di.Index,
}
},
})
return iter.Map(fs, func(di FileIndex) *LogDataFileInfo {
return &LogDataFileInfo{
DataFileInfo: di.Item,

// Since there is a `datafileinfo`, the length of `m.FileGroups`
// must be larger than 0. So we use the first group's name as
// metadata's unique key.
MetaDataGroupName: m.meta.FileGroups[0].Path,
OffsetInMetaGroup: gim.physical.Index,
OffsetInMergedGroup: di.Index,
}
},
)
})
})
Expand Down Expand Up @@ -262,8 +289,8 @@ func (rc *LogFileManager) LoadDDLFilesAndCountDMLFiles(ctx context.Context, coun
return nil, err
}
if counter != nil {
m = iter.Tap(m, func(m Meta) {
for _, fg := range m.FileGroups {
m = iter.Tap(m, func(m *MetaName) {
for _, fg := range m.meta.FileGroups {
for _, f := range fg.DataFilesInfo {
if !f.IsMeta && !rc.ShouldFilterOut(f) {
*counter += 1
Expand All @@ -285,16 +312,16 @@ func (rc *LogFileManager) LoadDMLFiles(ctx context.Context) (LogIter, error) {
return nil, err
}

mg := rc.FilterDataFiles(m)
return mg, nil
l := rc.FilterDataFiles(m)
return l, nil
}

func (rc *LogFileManager) FilterMetaFiles(ms MetaIter) MetaGroupIter {
return iter.FlatMap(ms, func(m Meta) MetaGroupIter {
return iter.Map(iter.FromSlice(m.FileGroups), func(g *backuppb.DataFileGroup) DDLMetaGroup {
func (rc *LogFileManager) FilterMetaFiles(ms MetaNameIter) MetaGroupIter {
return iter.FlatMap(ms, func(m *MetaName) MetaGroupIter {
return iter.Map(iter.FromSlice(m.meta.FileGroups), func(g *backuppb.DataFileGroup) DDLMetaGroup {
metas := iter.FilterOut(iter.FromSlice(g.DataFilesInfo), func(d Log) bool {
// Modify the data internally, a little hacky.
if m.MetaVersion > backuppb.MetaVersion_V1 {
if m.meta.MetaVersion > backuppb.MetaVersion_V1 {
d.Path = g.Path
}
return !d.IsMeta || rc.ShouldFilterOut(d)
Expand Down
7 changes: 5 additions & 2 deletions br/pkg/restore/log_client/log_file_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func testReadMetaBetweenTSWithVersion(t *testing.T, m metaMaker) {
req.NoError(err)
actualStoreIDs := make([]int64, 0, len(metas))
for _, meta := range metas {
actualStoreIDs = append(actualStoreIDs, meta.StoreId)
actualStoreIDs = append(actualStoreIDs, meta.Meta().StoreId)
}
expectedStoreIDs := make([]int64, 0, len(c.expected))
for _, meta := range c.expected {
Expand Down Expand Up @@ -528,6 +528,7 @@ func TestFilterDataFiles(t *testing.T) {
RestoreTS: 10,
Storage: loc,

Migrations: emptyMigrations(),
MetadataDownloadBatchSize: 32,
})
req.NoError(err)
Expand All @@ -536,7 +537,9 @@ func TestFilterDataFiles(t *testing.T) {
m2(wr(1, 1, 1), wr(2, 2, 2), wr(3, 3, 3), wr(4, 4, 4), wr(5, 5, 5)),
m2(wr(1, 1, 1), wr(2, 2, 2)),
}
metaIter := iter.FromSlice(metas)
metaIter := iter.Map(iter.FromSlice(metas), func(meta logclient.Meta) *logclient.MetaName {
return logclient.NewMetaName(meta, "")
})
files := iter.CollectAll(ctx, fm.FilterDataFiles(metaIter)).Item
check := func(file *logclient.LogDataFileInfo, metaKey string, goff, foff int) {
req.Equal(file.MetaDataGroupName, metaKey)
Expand Down
Loading

0 comments on commit 169210a

Please sign in to comment.