Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: implement reading migration #57237

Merged
merged 9 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion br/pkg/restore/log_client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
"import_retry.go",
"log_file_manager.go",
"log_file_map.go",
"migration.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/restore/log_client",
visibility = ["//visibility:public"],
Expand Down Expand Up @@ -81,10 +82,11 @@ go_test(
"log_file_manager_test.go",
"log_file_map_test.go",
"main_test.go",
"migration_test.go",
],
embed = [":log_client"],
flaky = True,
shard_count = 41,
shard_count = 42,
deps = [
"//br/pkg/errors",
"//br/pkg/glue",
Expand Down
34 changes: 33 additions & 1 deletion br/pkg/restore/log_client/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,38 @@ import (

var FilterFilesByRegion = filterFilesByRegion

func (metaname *MetaName) Meta() Meta {
return metaname.meta
}

func NewMetaName(meta Meta, name string) *MetaName {
return &MetaName{meta: meta, name: name}
}

func NewMigrationBuilder(shiftStartTS, startTS, restoredTS uint64) *WithMigrationsBuilder {
return &WithMigrationsBuilder{
shiftStartTS: shiftStartTS,
startTS: startTS,
restoredTS: restoredTS,
}
}

func (m *MetaWithMigrations) StoreId() int64 {
return m.meta.StoreId
}

func (m *MetaWithMigrations) Meta() *backuppb.Metadata {
return m.meta
}

func (m *PhysicalWithMigrations) PhysicalLength() uint64 {
return m.physical.Item.Length
}

func (m *PhysicalWithMigrations) Physical() *backuppb.DataFileGroup {
return m.physical.Item
}

func (rc *LogClient) TEST_saveIDMap(
ctx context.Context,
sr *stream.SchemasReplace,
Expand All @@ -44,7 +76,7 @@ func (rc *LogClient) TEST_initSchemasMap(
}

// readStreamMetaByTS is used for streaming task. collect all meta file by TS, it is for test usage.
func (rc *LogFileManager) ReadStreamMeta(ctx context.Context) ([]Meta, error) {
func (rc *LogFileManager) ReadStreamMeta(ctx context.Context) ([]*MetaName, error) {
metas, err := rc.streamingMeta(ctx)
if err != nil {
return nil, err
Expand Down
105 changes: 66 additions & 39 deletions br/pkg/restore/log_client/log_file_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,33 @@ import (
// MetaIter is the type of iterator of metadata files' content.
type MetaIter = iter.TryNextor[*backuppb.Metadata]

type MetaName struct {
meta Meta
name string
}

// MetaNameIter is the type of iterator of metadata files' content with name.
type MetaNameIter = iter.TryNextor[*MetaName]

type LogDataFileInfo struct {
*backuppb.DataFileInfo
MetaDataGroupName string
OffsetInMetaGroup int
OffsetInMergedGroup int
}

// GroupIndex is the type of physical data file with index from metadata.
type GroupIndex = iter.Indexed[*backuppb.DataFileGroup]

// GroupIndexIter is the type of iterator of physical data file with index from metadata.
type GroupIndexIter = iter.TryNextor[GroupIndex]

// FileIndex is the type of logical data file with index from physical data file.
type FileIndex = iter.Indexed[*backuppb.DataFileInfo]

// FileIndexIter is the type of iterator of logical data file with index from physical data file.
type FileIndexIter = iter.TryNextor[FileIndex]

// LogIter is the type of iterator of each log files' meta information.
type LogIter = iter.TryNextor[*LogDataFileInfo]

Expand Down Expand Up @@ -78,6 +98,8 @@ type LogFileManager struct {
storage storage.ExternalStorage
helper streamMetadataHelper

withmigrations WithMigrations

metadataDownloadBatchSize uint
}

Expand All @@ -87,6 +109,7 @@ type LogFileManagerInit struct {
RestoreTS uint64
Storage storage.ExternalStorage

Migrations WithMigrations
MetadataDownloadBatchSize uint
EncryptionManager *encryption.Manager
}
Expand All @@ -100,10 +123,11 @@ type DDLMetaGroup struct {
// Generally the config cannot be changed during its lifetime.
func CreateLogFileManager(ctx context.Context, init LogFileManagerInit) (*LogFileManager, error) {
fm := &LogFileManager{
startTS: init.StartTS,
restoreTS: init.RestoreTS,
storage: init.Storage,
helper: stream.NewMetadataHelper(stream.WithEncryptionManager(init.EncryptionManager)),
startTS: init.StartTS,
restoreTS: init.RestoreTS,
storage: init.Storage,
helper: stream.NewMetadataHelper(stream.WithEncryptionManager(init.EncryptionManager)),
withmigrations: init.Migrations,

metadataDownloadBatchSize: init.MetadataDownloadBatchSize,
}
Expand Down Expand Up @@ -153,22 +177,22 @@ func (rc *LogFileManager) loadShiftTS(ctx context.Context) error {
return nil
}

func (rc *LogFileManager) streamingMeta(ctx context.Context) (MetaIter, error) {
func (rc *LogFileManager) streamingMeta(ctx context.Context) (MetaNameIter, error) {
return rc.streamingMetaByTS(ctx, rc.restoreTS)
}

func (rc *LogFileManager) streamingMetaByTS(ctx context.Context, restoreTS uint64) (MetaIter, error) {
func (rc *LogFileManager) streamingMetaByTS(ctx context.Context, restoreTS uint64) (MetaNameIter, error) {
it, err := rc.createMetaIterOver(ctx, rc.storage)
if err != nil {
return nil, err
}
filtered := iter.FilterOut(it, func(metadata *backuppb.Metadata) bool {
return restoreTS < metadata.MinTs || metadata.MaxTs < rc.shiftStartTS
filtered := iter.FilterOut(it, func(metaname *MetaName) bool {
return restoreTS < metaname.meta.MinTs || metaname.meta.MaxTs < rc.shiftStartTS
})
return filtered, nil
}

func (rc *LogFileManager) createMetaIterOver(ctx context.Context, s storage.ExternalStorage) (MetaIter, error) {
func (rc *LogFileManager) createMetaIterOver(ctx context.Context, s storage.ExternalStorage) (MetaNameIter, error) {
opt := &storage.WalkOption{SubDir: stream.GetStreamBackupMetaPrefix()}
names := []string{}
err := s.WalkDir(ctx, opt, func(path string, size int64) error {
Expand All @@ -182,7 +206,7 @@ func (rc *LogFileManager) createMetaIterOver(ctx context.Context, s storage.Exte
return nil, err
}
namesIter := iter.FromSlice(names)
readMeta := func(ctx context.Context, name string) (*backuppb.Metadata, error) {
readMeta := func(ctx context.Context, name string) (*MetaName, error) {
f, err := s.ReadFile(ctx, name)
if err != nil {
return nil, errors.Annotatef(err, "failed during reading file %s", name)
Expand All @@ -191,7 +215,7 @@ func (rc *LogFileManager) createMetaIterOver(ctx context.Context, s storage.Exte
if err != nil {
return nil, errors.Annotatef(err, "failed to parse metadata of file %s", name)
}
return meta, nil
return &MetaName{meta: meta, name: name}, nil
}
// TODO: maybe we need to be able to adjust the concurrency to download files,
// which currently is the same as the chunk size
Expand All @@ -200,29 +224,32 @@ func (rc *LogFileManager) createMetaIterOver(ctx context.Context, s storage.Exte
return reader, nil
}

func (rc *LogFileManager) FilterDataFiles(ms MetaIter) LogIter {
return iter.FlatMap(ms, func(m *backuppb.Metadata) LogIter {
return iter.FlatMap(iter.Enumerate(iter.FromSlice(m.FileGroups)), func(gi iter.Indexed[*backuppb.DataFileGroup]) LogIter {
return iter.Map(
iter.FilterOut(iter.Enumerate(iter.FromSlice(gi.Item.DataFilesInfo)), func(di iter.Indexed[*backuppb.DataFileInfo]) bool {
func (rc *LogFileManager) FilterDataFiles(m MetaNameIter) LogIter {
ms := rc.withmigrations.Metas(m)
return iter.FlatMap(ms, func(m *MetaWithMigrations) LogIter {
gs := m.Physicals(iter.Enumerate(iter.FromSlice(m.meta.FileGroups)))
return iter.FlatMap(gs, func(gim *PhysicalWithMigrations) LogIter {
fs := iter.FilterOut(
gim.Logicals(iter.Enumerate(iter.FromSlice(gim.physical.Item.DataFilesInfo))),
func(di FileIndex) bool {
// Modify the data internally, a little hacky.
if m.MetaVersion > backuppb.MetaVersion_V1 {
di.Item.Path = gi.Item.Path
if m.meta.MetaVersion > backuppb.MetaVersion_V1 {
di.Item.Path = gim.physical.Item.Path
}
return di.Item.IsMeta || rc.ShouldFilterOut(di.Item)
}),
func(di iter.Indexed[*backuppb.DataFileInfo]) *LogDataFileInfo {
return &LogDataFileInfo{
DataFileInfo: di.Item,

// Since there is a `datafileinfo`, the length of `m.FileGroups`
// must be larger than 0. So we use the first group's name as
// metadata's unique key.
MetaDataGroupName: m.FileGroups[0].Path,
OffsetInMetaGroup: gi.Index,
OffsetInMergedGroup: di.Index,
}
},
})
return iter.Map(fs, func(di FileIndex) *LogDataFileInfo {
return &LogDataFileInfo{
DataFileInfo: di.Item,

// Since there is a `datafileinfo`, the length of `m.FileGroups`
// must be larger than 0. So we use the first group's name as
// metadata's unique key.
MetaDataGroupName: m.meta.FileGroups[0].Path,
OffsetInMetaGroup: gim.physical.Index,
OffsetInMergedGroup: di.Index,
}
},
)
})
})
Expand Down Expand Up @@ -262,8 +289,8 @@ func (rc *LogFileManager) LoadDDLFilesAndCountDMLFiles(ctx context.Context, coun
return nil, err
}
if counter != nil {
m = iter.Tap(m, func(m Meta) {
for _, fg := range m.FileGroups {
m = iter.Tap(m, func(m *MetaName) {
for _, fg := range m.meta.FileGroups {
for _, f := range fg.DataFilesInfo {
if !f.IsMeta && !rc.ShouldFilterOut(f) {
*counter += 1
Expand All @@ -285,16 +312,16 @@ func (rc *LogFileManager) LoadDMLFiles(ctx context.Context) (LogIter, error) {
return nil, err
}

mg := rc.FilterDataFiles(m)
return mg, nil
l := rc.FilterDataFiles(m)
return l, nil
}

func (rc *LogFileManager) FilterMetaFiles(ms MetaIter) MetaGroupIter {
return iter.FlatMap(ms, func(m Meta) MetaGroupIter {
return iter.Map(iter.FromSlice(m.FileGroups), func(g *backuppb.DataFileGroup) DDLMetaGroup {
func (rc *LogFileManager) FilterMetaFiles(ms MetaNameIter) MetaGroupIter {
return iter.FlatMap(ms, func(m *MetaName) MetaGroupIter {
return iter.Map(iter.FromSlice(m.meta.FileGroups), func(g *backuppb.DataFileGroup) DDLMetaGroup {
metas := iter.FilterOut(iter.FromSlice(g.DataFilesInfo), func(d Log) bool {
// Modify the data internally, a little hacky.
if m.MetaVersion > backuppb.MetaVersion_V1 {
if m.meta.MetaVersion > backuppb.MetaVersion_V1 {
d.Path = g.Path
}
return !d.IsMeta || rc.ShouldFilterOut(d)
Expand Down
7 changes: 5 additions & 2 deletions br/pkg/restore/log_client/log_file_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func testReadMetaBetweenTSWithVersion(t *testing.T, m metaMaker) {
req.NoError(err)
actualStoreIDs := make([]int64, 0, len(metas))
for _, meta := range metas {
actualStoreIDs = append(actualStoreIDs, meta.StoreId)
actualStoreIDs = append(actualStoreIDs, meta.Meta().StoreId)
}
expectedStoreIDs := make([]int64, 0, len(c.expected))
for _, meta := range c.expected {
Expand Down Expand Up @@ -528,6 +528,7 @@ func TestFilterDataFiles(t *testing.T) {
RestoreTS: 10,
Storage: loc,

Migrations: emptyMigrations(),
MetadataDownloadBatchSize: 32,
})
req.NoError(err)
Expand All @@ -536,7 +537,9 @@ func TestFilterDataFiles(t *testing.T) {
m2(wr(1, 1, 1), wr(2, 2, 2), wr(3, 3, 3), wr(4, 4, 4), wr(5, 5, 5)),
m2(wr(1, 1, 1), wr(2, 2, 2)),
}
metaIter := iter.FromSlice(metas)
metaIter := iter.Map(iter.FromSlice(metas), func(meta logclient.Meta) *logclient.MetaName {
return logclient.NewMetaName(meta, "")
})
files := iter.CollectAll(ctx, fm.FilterDataFiles(metaIter)).Item
check := func(file *logclient.LogDataFileInfo, metaKey string, goff, foff int) {
req.Equal(file.MetaDataGroupName, metaKey)
Expand Down
Loading
Loading