From cbd41115f32b8f49c105473cc65291cdc8d26da6 Mon Sep 17 00:00:00 2001 From: Jianjun Liao <36503113+Leavrth@users.noreply.github.com> Date: Fri, 15 Mar 2024 18:48:41 +0800 Subject: [PATCH] br: skip loading stats into memory if set `--load-stats` to false (#51535) ref pingcap/tidb#50568 --- br/cmd/br/debug.go | 4 +- br/pkg/checksum/validate.go | 2 +- br/pkg/metautil/load.go | 8 ++- br/pkg/metautil/load_test.go | 5 ++ br/pkg/metautil/metafile.go | 18 +++++-- br/pkg/restore/client.go | 51 ++++++++----------- br/pkg/task/restore.go | 8 ++- br/pkg/task/restore_raw.go | 2 +- br/pkg/task/restore_test.go | 1 + br/pkg/task/restore_txn.go | 2 +- br/pkg/task/show/cmd.go | 2 +- .../handle/storage/stats_read_writer.go | 6 +-- pkg/statistics/handle/types/interfaces.go | 3 ++ 13 files changed, 62 insertions(+), 50 deletions(-) diff --git a/br/cmd/br/debug.go b/br/cmd/br/debug.go index df9795fd32482..2ca774b287159 100644 --- a/br/cmd/br/debug.go +++ b/br/cmd/br/debug.go @@ -80,7 +80,7 @@ func newCheckSumCommand() *cobra.Command { } reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo) - dbs, err := metautil.LoadBackupTables(ctx, reader) + dbs, err := metautil.LoadBackupTables(ctx, reader, false) if err != nil { return errors.Trace(err) } @@ -173,7 +173,7 @@ func newBackupMetaValidateCommand() *cobra.Command { return errors.Trace(err) } reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo) - dbs, err := metautil.LoadBackupTables(ctx, reader) + dbs, err := metautil.LoadBackupTables(ctx, reader, false) if err != nil { log.Error("load tables failed", zap.Error(err)) return errors.Trace(err) diff --git a/br/pkg/checksum/validate.go b/br/pkg/checksum/validate.go index 427d30200c073..c23ff8884d5d0 100644 --- a/br/pkg/checksum/validate.go +++ b/br/pkg/checksum/validate.go @@ -33,7 +33,7 @@ func FastChecksum( errCh := make(chan error) go func() { reader := metautil.NewMetaReader(backupMeta, storage, cipher) - if err := reader.ReadSchemasFiles(ctx, ch); err != nil { + if err := reader.ReadSchemasFiles(ctx, ch, metautil.SkipStats); err != nil { errCh <- errors.Trace(err) } close(ch) diff --git a/br/pkg/metautil/load.go b/br/pkg/metautil/load.go index 6f62a534b337b..e0c2ad717f3b3 100644 --- a/br/pkg/metautil/load.go +++ b/br/pkg/metautil/load.go @@ -38,11 +38,15 @@ func (db *Database) GetTable(name string) *Table { } // LoadBackupTables loads schemas from BackupMeta. -func LoadBackupTables(ctx context.Context, reader *MetaReader) (map[string]*Database, error) { +func LoadBackupTables(ctx context.Context, reader *MetaReader, loadStats bool) (map[string]*Database, error) { ch := make(chan *Table) errCh := make(chan error) go func() { - if err := reader.ReadSchemasFiles(ctx, ch); err != nil { + var opts []ReadSchemaOption + if !loadStats { + opts = []ReadSchemaOption{SkipStats} + } + if err := reader.ReadSchemasFiles(ctx, ch, opts...); err != nil { errCh <- errors.Trace(err) } close(ch) diff --git a/br/pkg/metautil/load_test.go b/br/pkg/metautil/load_test.go index 92a8987ef7ee9..ef885f3bcbf26 100644 --- a/br/pkg/metautil/load_test.go +++ b/br/pkg/metautil/load_test.go @@ -105,6 +105,7 @@ func TestLoadBackupMeta(t *testing.T) { &backuppb.CipherInfo{ CipherType: encryptionpb.EncryptionMethod_PLAINTEXT, }), + true, ) tbl := dbs[dbName.String()].GetTable(tblName.String()) require.NoError(t, err) @@ -201,6 +202,7 @@ func TestLoadBackupMetaPartionTable(t *testing.T) { CipherType: encryptionpb.EncryptionMethod_PLAINTEXT, }, ), + true, ) tbl := dbs[dbName.String()].GetTable(tblName.String()) require.NoError(t, err) @@ -287,6 +289,7 @@ func BenchmarkLoadBackupMeta64(b *testing.B) { CipherType: encryptionpb.EncryptionMethod_PLAINTEXT, }, ), + true, ) require.NoError(b, err) require.Len(b, dbs, 1) @@ -319,6 +322,7 @@ func BenchmarkLoadBackupMeta1024(b *testing.B) { CipherType: encryptionpb.EncryptionMethod_PLAINTEXT, }, ), + true, ) require.NoError(b, err) require.Len(b, dbs, 1) @@ -351,6 +355,7 @@ func BenchmarkLoadBackupMeta10240(b *testing.B) { CipherType: encryptionpb.EncryptionMethod_PLAINTEXT, }, ), + true, ) require.NoError(b, err) require.Len(b, dbs, 1) diff --git a/br/pkg/metautil/metafile.go b/br/pkg/metautil/metafile.go index e8292d32972b3..a5a363c25ba8a 100644 --- a/br/pkg/metautil/metafile.go +++ b/br/pkg/metautil/metafile.go @@ -291,6 +291,7 @@ func (reader *MetaReader) ReadDDLs(ctx context.Context) ([]byte, error) { type readSchemaConfig struct { skipFiles bool + skipStats bool } // ReadSchemaOption describes some extra option of reading the config. @@ -302,6 +303,10 @@ func SkipFiles(conf *readSchemaConfig) { conf.skipFiles = true } +func SkipStats(conf *readSchemaConfig) { + conf.skipStats = true +} + // GetBasic returns a basic copy of the backup meta. func (reader *MetaReader) GetBasic() backuppb.BackupMeta { return *reader.backupMeta @@ -313,6 +318,10 @@ func (reader *MetaReader) ReadSchemasFiles(ctx context.Context, output chan<- *T cctx, cancel := context.WithCancel(ctx) defer cancel() + cfg := readSchemaConfig{} + for _, opt := range opts { + opt(&cfg) + } ch := make(chan any, MaxBatchSize) schemaCh := make(chan *backuppb.Schema, MaxBatchSize) // Make sure these 2 goroutine avoid to blocked by the errCh. @@ -322,6 +331,10 @@ func (reader *MetaReader) ReadSchemasFiles(ctx context.Context, output chan<- *T go func() { defer close(schemaCh) if err := reader.readSchemas(cctx, func(s *backuppb.Schema) { + if cfg.skipStats { + s.Stats = nil + s.StatsIndex = nil + } select { case <-cctx.Done(): case schemaCh <- s: @@ -362,11 +375,6 @@ func (reader *MetaReader) ReadSchemasFiles(ctx context.Context, output chan<- *T } }() - cfg := readSchemaConfig{} - for _, opt := range opts { - opt(&cfg) - } - // It's not easy to balance memory and time costs for current structure. // put all files in memory due to https://github.com/pingcap/br/issues/705 var fileMap map[int64][]*backuppb.File diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index bd3eb02f1560a..74db13d968e15 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -573,9 +573,10 @@ func (rc *Client) InitBackupMeta( c context.Context, backupMeta *backuppb.BackupMeta, backend *backuppb.StorageBackend, - reader *metautil.MetaReader) error { + reader *metautil.MetaReader, + loadStats bool) error { if rc.needLoadSchemas(backupMeta) { - databases, err := metautil.LoadBackupTables(c, reader) + databases, err := metautil.LoadBackupTables(c, reader, loadStats) if err != nil { return errors.Trace(err) } @@ -1866,62 +1867,54 @@ func (rc *Client) GoUpdateMetaAndLoadStats( inCh <-chan *CreatedTable, errCh chan<- error, statsConcurrency uint, + loadStats bool, ) chan *CreatedTable { log.Info("Start to update meta then load stats") outCh := DefaultOutputTableChan() workers := utils.NewWorkerPool(statsConcurrency, "UpdateStats") - // The rc.db is not thread safe - var updateMetaLock sync.Mutex go concurrentHandleTablesCh(ctx, inCh, outCh, errCh, workers, func(c context.Context, tbl *CreatedTable) error { oldTable := tbl.OldTable - // Not need to return err when failed because of update analysis-meta - restoreTS, err := rc.GetTSWithRetry(ctx) - if err != nil { - log.Error("getTS failed", zap.Error(err)) - } else { - updateMetaLock.Lock() - - log.Info("start update metas", - zap.Stringer("table", oldTable.Info.Name), - zap.Stringer("db", oldTable.DB.Name)) - err = rc.db.UpdateStatsMeta(ctx, tbl.Table.ID, restoreTS, oldTable.TotalKvs) - if err != nil { - log.Error("update stats meta failed", zap.Any("table", tbl.Table), zap.Error(err)) - } - - updateMetaLock.Unlock() - } - - if oldTable.Stats != nil { + var statsErr error = nil + if loadStats && oldTable.Stats != nil { log.Info("start loads analyze after validate checksum", zap.Int64("old id", oldTable.Info.ID), zap.Int64("new id", tbl.Table.ID), ) start := time.Now() // NOTICE: skip updating cache after load stats from json - if err := rc.statsHandler.LoadStatsFromJSONNoUpdate(ctx, rc.dom.InfoSchema(), oldTable.Stats, 0); err != nil { - log.Error("analyze table failed", zap.Any("table", oldTable.Stats), zap.Error(err)) + if statsErr = rc.statsHandler.LoadStatsFromJSONNoUpdate(ctx, rc.dom.InfoSchema(), oldTable.Stats, 0); statsErr != nil { + log.Error("analyze table failed", zap.Any("table", oldTable.Stats), zap.Error(statsErr)) } log.Info("restore stat done", zap.Stringer("table", oldTable.Info.Name), zap.Stringer("db", oldTable.DB.Name), zap.Duration("cost", time.Since(start))) - } else if oldTable.StatsFileIndexes != nil { + } else if loadStats && len(oldTable.StatsFileIndexes) > 0 { log.Info("start to load statistic data for each partition", zap.Int64("old id", oldTable.Info.ID), zap.Int64("new id", tbl.Table.ID), ) start := time.Now() rewriteIDMap := getTableIDMap(tbl.Table, tbl.OldTable.Info) - if err := metautil.RestoreStats(ctx, s, cipher, rc.statsHandler, tbl.Table, oldTable.StatsFileIndexes, rewriteIDMap); err != nil { - log.Error("analyze table failed", zap.Any("table", oldTable.StatsFileIndexes), zap.Error(err)) + if statsErr = metautil.RestoreStats(ctx, s, cipher, rc.statsHandler, tbl.Table, oldTable.StatsFileIndexes, rewriteIDMap); statsErr != nil { + log.Error("analyze table failed", zap.Any("table", oldTable.StatsFileIndexes), zap.Error(statsErr)) } log.Info("restore statistic data done", zap.Stringer("table", oldTable.Info.Name), zap.Stringer("db", oldTable.DB.Name), zap.Duration("cost", time.Since(start))) } + + if statsErr != nil || !loadStats || (oldTable.Stats == nil && len(oldTable.StatsFileIndexes) == 0) { + // Not need to return err when failed because of update analysis-meta + log.Info("start update metas", zap.Stringer("table", oldTable.Info.Name), zap.Stringer("db", oldTable.DB.Name)) + // the total kvs contains the index kvs, but the stats meta needs the count of rows + count := int64(oldTable.TotalKvs / uint64(len(oldTable.Info.Indices)+1)) + if statsErr = rc.statsHandler.SaveMetaToStorage(tbl.Table.ID, count, 0, "br restore"); statsErr != nil { + log.Error("update stats meta failed", zap.Any("table", tbl.Table), zap.Error(statsErr)) + } + } return nil }, func() { log.Info("all stats updated") @@ -2802,7 +2795,7 @@ func initFullBackupTables( // read full backup databases to get map[table]table.Info reader := metautil.NewMetaReader(backupMeta, s, nil) - databases, err := metautil.LoadBackupTables(ctx, reader) + databases, err := metautil.LoadBackupTables(ctx, reader, false) if err != nil { return nil, errors.Trace(err) } diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 3d13d258212e1..b103562e427c4 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -770,7 +770,7 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf } reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo) - if err = client.InitBackupMeta(c, backupMeta, u, reader); err != nil { + if err = client.InitBackupMeta(c, backupMeta, u, reader, cfg.LoadStats); err != nil { return errors.Trace(err) } @@ -1076,10 +1076,8 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf ctx, postHandleCh, mgr.GetStorage().GetClient(), errCh, updateCh, cfg.ChecksumConcurrency) } - // pipeline load stats - if cfg.LoadStats { - postHandleCh = client.GoUpdateMetaAndLoadStats(ctx, s, &cfg.CipherInfo, postHandleCh, errCh, cfg.StatsConcurrency) - } + // pipeline update meta and load stats + postHandleCh = client.GoUpdateMetaAndLoadStats(ctx, s, &cfg.CipherInfo, postHandleCh, errCh, cfg.StatsConcurrency, cfg.LoadStats) // pipeline wait Tiflash synced if cfg.WaitTiflashReady { diff --git a/br/pkg/task/restore_raw.go b/br/pkg/task/restore_raw.go index 3e79525e6a1fb..5b9b009853a02 100644 --- a/br/pkg/task/restore_raw.go +++ b/br/pkg/task/restore_raw.go @@ -116,7 +116,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR return errors.Trace(err) } reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo) - if err = client.InitBackupMeta(c, backupMeta, u, reader); err != nil { + if err = client.InitBackupMeta(c, backupMeta, u, reader, true); err != nil { return errors.Trace(err) } diff --git a/br/pkg/task/restore_test.go b/br/pkg/task/restore_test.go index 031355042346b..9301c1ea88f39 100644 --- a/br/pkg/task/restore_test.go +++ b/br/pkg/task/restore_test.go @@ -260,6 +260,7 @@ func mockReadSchemasFromBackupMeta(t *testing.T, db2Tables map[string][]string) &backuppb.CipherInfo{ CipherType: encryptionpb.EncryptionMethod_PLAINTEXT, }), + true, ) require.NoError(t, err) return dbs diff --git a/br/pkg/task/restore_txn.go b/br/pkg/task/restore_txn.go index 56c221a877105..596b1d29d714e 100644 --- a/br/pkg/task/restore_txn.go +++ b/br/pkg/task/restore_txn.go @@ -60,7 +60,7 @@ func RunRestoreTxn(c context.Context, g glue.Glue, cmdName string, cfg *Config) return errors.Trace(err) } reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo) - if err = client.InitBackupMeta(c, backupMeta, u, reader); err != nil { + if err = client.InitBackupMeta(c, backupMeta, u, reader, true); err != nil { return errors.Trace(err) } diff --git a/br/pkg/task/show/cmd.go b/br/pkg/task/show/cmd.go index 9358b64d8636b..88ff61843a4b3 100644 --- a/br/pkg/task/show/cmd.go +++ b/br/pkg/task/show/cmd.go @@ -93,7 +93,7 @@ func (exec *CmdExecutor) Read(ctx context.Context) (ShowResult, error) { out := make(chan *metautil.Table, 16) errc := make(chan error, 1) go func() { - errc <- exec.meta.ReadSchemasFiles(ctx, out, metautil.SkipFiles) + errc <- exec.meta.ReadSchemasFiles(ctx, out, metautil.SkipFiles, metautil.SkipStats) close(out) }() ts, err := collectResult(ctx, out, errc, convertTable) diff --git a/pkg/statistics/handle/storage/stats_read_writer.go b/pkg/statistics/handle/storage/stats_read_writer.go index c2fd6fdb0e2af..4a479a0a820ae 100644 --- a/pkg/statistics/handle/storage/stats_read_writer.go +++ b/pkg/statistics/handle/storage/stats_read_writer.go @@ -262,8 +262,8 @@ func (s *statsReadWriter) SaveStatsToStorage( return } -// saveMetaToStorage saves stats meta to the storage. -func (s *statsReadWriter) saveMetaToStorage(tableID, count, modifyCount int64, source string) (err error) { +// SaveMetaToStorage saves stats meta to the storage. +func (s *statsReadWriter) SaveMetaToStorage(tableID, count, modifyCount int64, source string) (err error) { var statsVer uint64 err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { statsVer, err = SaveMetaToStorage(sctx, tableID, count, modifyCount) @@ -708,5 +708,5 @@ func (s *statsReadWriter) loadStatsFromJSON(tableInfo *model.TableInfo, physical if err != nil { return errors.Trace(err) } - return s.saveMetaToStorage(tbl.PhysicalID, tbl.RealtimeCount, tbl.ModifyCount, util.StatsMetaHistorySourceLoadStats) + return s.SaveMetaToStorage(tbl.PhysicalID, tbl.RealtimeCount, tbl.ModifyCount, util.StatsMetaHistorySourceLoadStats) } diff --git a/pkg/statistics/handle/types/interfaces.go b/pkg/statistics/handle/types/interfaces.go index 13d8834870dbe..031760fca7ce9 100644 --- a/pkg/statistics/handle/types/interfaces.go +++ b/pkg/statistics/handle/types/interfaces.go @@ -271,6 +271,9 @@ type StatsReadWriter interface { // SaveTableStatsToStorage saves the stats of a table to storage. SaveTableStatsToStorage(results *statistics.AnalyzeResults, analyzeSnapshot bool, source string) (err error) + // SaveMetaToStorage saves the stats meta of a table to storage. + SaveMetaToStorage(tableID, count, modifyCount int64, source string) (err error) + // InsertColStats2KV inserts columns stats to kv. InsertColStats2KV(physicalID int64, colInfos []*model.ColumnInfo) (err error)