Skip to content

Commit

Permalink
br: skip loading stats into memory if set --load-stats to false (#5…
Browse files Browse the repository at this point in the history
  • Loading branch information
Leavrth authored Mar 15, 2024
1 parent e4dc2c9 commit cbd4111
Show file tree
Hide file tree
Showing 13 changed files with 62 additions and 50 deletions.
4 changes: 2 additions & 2 deletions br/cmd/br/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func newCheckSumCommand() *cobra.Command {
}

reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo)
dbs, err := metautil.LoadBackupTables(ctx, reader)
dbs, err := metautil.LoadBackupTables(ctx, reader, false)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -173,7 +173,7 @@ func newBackupMetaValidateCommand() *cobra.Command {
return errors.Trace(err)
}
reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo)
dbs, err := metautil.LoadBackupTables(ctx, reader)
dbs, err := metautil.LoadBackupTables(ctx, reader, false)
if err != nil {
log.Error("load tables failed", zap.Error(err))
return errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/checksum/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func FastChecksum(
errCh := make(chan error)
go func() {
reader := metautil.NewMetaReader(backupMeta, storage, cipher)
if err := reader.ReadSchemasFiles(ctx, ch); err != nil {
if err := reader.ReadSchemasFiles(ctx, ch, metautil.SkipStats); err != nil {
errCh <- errors.Trace(err)
}
close(ch)
Expand Down
8 changes: 6 additions & 2 deletions br/pkg/metautil/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,15 @@ func (db *Database) GetTable(name string) *Table {
}

// LoadBackupTables loads schemas from BackupMeta.
func LoadBackupTables(ctx context.Context, reader *MetaReader) (map[string]*Database, error) {
func LoadBackupTables(ctx context.Context, reader *MetaReader, loadStats bool) (map[string]*Database, error) {
ch := make(chan *Table)
errCh := make(chan error)
go func() {
if err := reader.ReadSchemasFiles(ctx, ch); err != nil {
var opts []ReadSchemaOption
if !loadStats {
opts = []ReadSchemaOption{SkipStats}
}
if err := reader.ReadSchemasFiles(ctx, ch, opts...); err != nil {
errCh <- errors.Trace(err)
}
close(ch)
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/metautil/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func TestLoadBackupMeta(t *testing.T) {
&backuppb.CipherInfo{
CipherType: encryptionpb.EncryptionMethod_PLAINTEXT,
}),
true,
)
tbl := dbs[dbName.String()].GetTable(tblName.String())
require.NoError(t, err)
Expand Down Expand Up @@ -201,6 +202,7 @@ func TestLoadBackupMetaPartionTable(t *testing.T) {
CipherType: encryptionpb.EncryptionMethod_PLAINTEXT,
},
),
true,
)
tbl := dbs[dbName.String()].GetTable(tblName.String())
require.NoError(t, err)
Expand Down Expand Up @@ -287,6 +289,7 @@ func BenchmarkLoadBackupMeta64(b *testing.B) {
CipherType: encryptionpb.EncryptionMethod_PLAINTEXT,
},
),
true,
)
require.NoError(b, err)
require.Len(b, dbs, 1)
Expand Down Expand Up @@ -319,6 +322,7 @@ func BenchmarkLoadBackupMeta1024(b *testing.B) {
CipherType: encryptionpb.EncryptionMethod_PLAINTEXT,
},
),
true,
)
require.NoError(b, err)
require.Len(b, dbs, 1)
Expand Down Expand Up @@ -351,6 +355,7 @@ func BenchmarkLoadBackupMeta10240(b *testing.B) {
CipherType: encryptionpb.EncryptionMethod_PLAINTEXT,
},
),
true,
)
require.NoError(b, err)
require.Len(b, dbs, 1)
Expand Down
18 changes: 13 additions & 5 deletions br/pkg/metautil/metafile.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ func (reader *MetaReader) ReadDDLs(ctx context.Context) ([]byte, error) {

// readSchemaConfig holds the settings that ReadSchemaOption functions toggle
// for a single ReadSchemasFiles call.
type readSchemaConfig struct {
// skipFiles is set by the SkipFiles option.
// NOTE(review): the code that consumes this flag is outside this view —
// presumably it makes the reader skip loading the backup file list; confirm.
skipFiles bool
// skipStats is set by the SkipStats option. When true, ReadSchemasFiles
// clears Stats and StatsIndex on every schema before passing it on.
skipStats bool
}

// ReadSchemaOption describes some extra option of reading the config.
Expand All @@ -302,6 +303,10 @@ func SkipFiles(conf *readSchemaConfig) {
conf.skipFiles = true
}

// SkipStats is a ReadSchemaOption that sets skipStats on the read config.
// With it, ReadSchemasFiles sets Stats and StatsIndex to nil on each schema
// it reads, so callers that do not need statistics do not receive them.
func SkipStats(conf *readSchemaConfig) {
conf.skipStats = true
}

// GetBasic returns a basic copy of the backup meta.
func (reader *MetaReader) GetBasic() backuppb.BackupMeta {
return *reader.backupMeta
Expand All @@ -313,6 +318,10 @@ func (reader *MetaReader) ReadSchemasFiles(ctx context.Context, output chan<- *T
cctx, cancel := context.WithCancel(ctx)
defer cancel()

cfg := readSchemaConfig{}
for _, opt := range opts {
opt(&cfg)
}
ch := make(chan any, MaxBatchSize)
schemaCh := make(chan *backuppb.Schema, MaxBatchSize)
// Make sure these 2 goroutines are not blocked by the errCh.
Expand All @@ -322,6 +331,10 @@ func (reader *MetaReader) ReadSchemasFiles(ctx context.Context, output chan<- *T
go func() {
defer close(schemaCh)
if err := reader.readSchemas(cctx, func(s *backuppb.Schema) {
if cfg.skipStats {
s.Stats = nil
s.StatsIndex = nil
}
select {
case <-cctx.Done():
case schemaCh <- s:
Expand Down Expand Up @@ -362,11 +375,6 @@ func (reader *MetaReader) ReadSchemasFiles(ctx context.Context, output chan<- *T
}
}()

cfg := readSchemaConfig{}
for _, opt := range opts {
opt(&cfg)
}

// It's not easy to balance memory and time costs for current structure.
// put all files in memory due to https://github.com/pingcap/br/issues/705
var fileMap map[int64][]*backuppb.File
Expand Down
51 changes: 22 additions & 29 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,9 +573,10 @@ func (rc *Client) InitBackupMeta(
c context.Context,
backupMeta *backuppb.BackupMeta,
backend *backuppb.StorageBackend,
reader *metautil.MetaReader) error {
reader *metautil.MetaReader,
loadStats bool) error {
if rc.needLoadSchemas(backupMeta) {
databases, err := metautil.LoadBackupTables(c, reader)
databases, err := metautil.LoadBackupTables(c, reader, loadStats)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1866,62 +1867,54 @@ func (rc *Client) GoUpdateMetaAndLoadStats(
inCh <-chan *CreatedTable,
errCh chan<- error,
statsConcurrency uint,
loadStats bool,
) chan *CreatedTable {
log.Info("Start to update meta then load stats")
outCh := DefaultOutputTableChan()
workers := utils.NewWorkerPool(statsConcurrency, "UpdateStats")
// The rc.db is not thread safe
var updateMetaLock sync.Mutex

go concurrentHandleTablesCh(ctx, inCh, outCh, errCh, workers, func(c context.Context, tbl *CreatedTable) error {
oldTable := tbl.OldTable
// Not need to return err when failed because of update analysis-meta
restoreTS, err := rc.GetTSWithRetry(ctx)
if err != nil {
log.Error("getTS failed", zap.Error(err))
} else {
updateMetaLock.Lock()

log.Info("start update metas",
zap.Stringer("table", oldTable.Info.Name),
zap.Stringer("db", oldTable.DB.Name))
err = rc.db.UpdateStatsMeta(ctx, tbl.Table.ID, restoreTS, oldTable.TotalKvs)
if err != nil {
log.Error("update stats meta failed", zap.Any("table", tbl.Table), zap.Error(err))
}

updateMetaLock.Unlock()
}

if oldTable.Stats != nil {
var statsErr error = nil
if loadStats && oldTable.Stats != nil {
log.Info("start loads analyze after validate checksum",
zap.Int64("old id", oldTable.Info.ID),
zap.Int64("new id", tbl.Table.ID),
)
start := time.Now()
// NOTICE: skip updating cache after load stats from json
if err := rc.statsHandler.LoadStatsFromJSONNoUpdate(ctx, rc.dom.InfoSchema(), oldTable.Stats, 0); err != nil {
log.Error("analyze table failed", zap.Any("table", oldTable.Stats), zap.Error(err))
if statsErr = rc.statsHandler.LoadStatsFromJSONNoUpdate(ctx, rc.dom.InfoSchema(), oldTable.Stats, 0); statsErr != nil {
log.Error("analyze table failed", zap.Any("table", oldTable.Stats), zap.Error(statsErr))
}
log.Info("restore stat done",
zap.Stringer("table", oldTable.Info.Name),
zap.Stringer("db", oldTable.DB.Name),
zap.Duration("cost", time.Since(start)))
} else if oldTable.StatsFileIndexes != nil {
} else if loadStats && len(oldTable.StatsFileIndexes) > 0 {
log.Info("start to load statistic data for each partition",
zap.Int64("old id", oldTable.Info.ID),
zap.Int64("new id", tbl.Table.ID),
)
start := time.Now()
rewriteIDMap := getTableIDMap(tbl.Table, tbl.OldTable.Info)
if err := metautil.RestoreStats(ctx, s, cipher, rc.statsHandler, tbl.Table, oldTable.StatsFileIndexes, rewriteIDMap); err != nil {
log.Error("analyze table failed", zap.Any("table", oldTable.StatsFileIndexes), zap.Error(err))
if statsErr = metautil.RestoreStats(ctx, s, cipher, rc.statsHandler, tbl.Table, oldTable.StatsFileIndexes, rewriteIDMap); statsErr != nil {
log.Error("analyze table failed", zap.Any("table", oldTable.StatsFileIndexes), zap.Error(statsErr))
}
log.Info("restore statistic data done",
zap.Stringer("table", oldTable.Info.Name),
zap.Stringer("db", oldTable.DB.Name),
zap.Duration("cost", time.Since(start)))
}

if statsErr != nil || !loadStats || (oldTable.Stats == nil && len(oldTable.StatsFileIndexes) == 0) {
// No need to return an error when updating the analysis meta fails.
log.Info("start update metas", zap.Stringer("table", oldTable.Info.Name), zap.Stringer("db", oldTable.DB.Name))
// the total kvs contains the index kvs, but the stats meta needs the count of rows
count := int64(oldTable.TotalKvs / uint64(len(oldTable.Info.Indices)+1))
if statsErr = rc.statsHandler.SaveMetaToStorage(tbl.Table.ID, count, 0, "br restore"); statsErr != nil {
log.Error("update stats meta failed", zap.Any("table", tbl.Table), zap.Error(statsErr))
}
}
return nil
}, func() {
log.Info("all stats updated")
Expand Down Expand Up @@ -2802,7 +2795,7 @@ func initFullBackupTables(
// read full backup databases to get map[table]table.Info
reader := metautil.NewMetaReader(backupMeta, s, nil)

databases, err := metautil.LoadBackupTables(ctx, reader)
databases, err := metautil.LoadBackupTables(ctx, reader, false)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
8 changes: 3 additions & 5 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
}

reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo)
if err = client.InitBackupMeta(c, backupMeta, u, reader); err != nil {
if err = client.InitBackupMeta(c, backupMeta, u, reader, cfg.LoadStats); err != nil {
return errors.Trace(err)
}

Expand Down Expand Up @@ -1076,10 +1076,8 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
ctx, postHandleCh, mgr.GetStorage().GetClient(), errCh, updateCh, cfg.ChecksumConcurrency)
}

// pipeline load stats
if cfg.LoadStats {
postHandleCh = client.GoUpdateMetaAndLoadStats(ctx, s, &cfg.CipherInfo, postHandleCh, errCh, cfg.StatsConcurrency)
}
// pipeline update meta and load stats
postHandleCh = client.GoUpdateMetaAndLoadStats(ctx, s, &cfg.CipherInfo, postHandleCh, errCh, cfg.StatsConcurrency, cfg.LoadStats)

// pipeline wait Tiflash synced
if cfg.WaitTiflashReady {
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/restore_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR
return errors.Trace(err)
}
reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo)
if err = client.InitBackupMeta(c, backupMeta, u, reader); err != nil {
if err = client.InitBackupMeta(c, backupMeta, u, reader, true); err != nil {
return errors.Trace(err)
}

Expand Down
1 change: 1 addition & 0 deletions br/pkg/task/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ func mockReadSchemasFromBackupMeta(t *testing.T, db2Tables map[string][]string)
&backuppb.CipherInfo{
CipherType: encryptionpb.EncryptionMethod_PLAINTEXT,
}),
true,
)
require.NoError(t, err)
return dbs
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/restore_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func RunRestoreTxn(c context.Context, g glue.Glue, cmdName string, cfg *Config)
return errors.Trace(err)
}
reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo)
if err = client.InitBackupMeta(c, backupMeta, u, reader); err != nil {
if err = client.InitBackupMeta(c, backupMeta, u, reader, true); err != nil {
return errors.Trace(err)
}

Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/show/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (exec *CmdExecutor) Read(ctx context.Context) (ShowResult, error) {
out := make(chan *metautil.Table, 16)
errc := make(chan error, 1)
go func() {
errc <- exec.meta.ReadSchemasFiles(ctx, out, metautil.SkipFiles)
errc <- exec.meta.ReadSchemasFiles(ctx, out, metautil.SkipFiles, metautil.SkipStats)
close(out)
}()
ts, err := collectResult(ctx, out, errc, convertTable)
Expand Down
6 changes: 3 additions & 3 deletions pkg/statistics/handle/storage/stats_read_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ func (s *statsReadWriter) SaveStatsToStorage(
return
}

// saveMetaToStorage saves stats meta to the storage.
func (s *statsReadWriter) saveMetaToStorage(tableID, count, modifyCount int64, source string) (err error) {
// SaveMetaToStorage saves stats meta to the storage.
func (s *statsReadWriter) SaveMetaToStorage(tableID, count, modifyCount int64, source string) (err error) {
var statsVer uint64
err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
statsVer, err = SaveMetaToStorage(sctx, tableID, count, modifyCount)
Expand Down Expand Up @@ -708,5 +708,5 @@ func (s *statsReadWriter) loadStatsFromJSON(tableInfo *model.TableInfo, physical
if err != nil {
return errors.Trace(err)
}
return s.saveMetaToStorage(tbl.PhysicalID, tbl.RealtimeCount, tbl.ModifyCount, util.StatsMetaHistorySourceLoadStats)
return s.SaveMetaToStorage(tbl.PhysicalID, tbl.RealtimeCount, tbl.ModifyCount, util.StatsMetaHistorySourceLoadStats)
}
3 changes: 3 additions & 0 deletions pkg/statistics/handle/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,9 @@ type StatsReadWriter interface {
// SaveTableStatsToStorage saves the stats of a table to storage.
SaveTableStatsToStorage(results *statistics.AnalyzeResults, analyzeSnapshot bool, source string) (err error)

// SaveMetaToStorage saves the stats meta of a table to storage.
SaveMetaToStorage(tableID, count, modifyCount int64, source string) (err error)

// InsertColStats2KV inserts columns stats to kv.
InsertColStats2KV(physicalID int64, colInfos []*model.ColumnInfo) (err error)

Expand Down

0 comments on commit cbd4111

Please sign in to comment.