diff --git a/pkg/session/bootstrap.go b/pkg/session/bootstrap.go
index 3888ebe096165..01b19a458eb86 100644
--- a/pkg/session/bootstrap.go
+++ b/pkg/session/bootstrap.go
@@ -2840,6 +2840,10 @@ func upgradeToVer177(s Session, ver int64) {
 	}
 	// ignore error when upgrading from v7.4 to higher version.
 	doReentrantDDL(s, CreateDistFrameworkMeta, infoschema.ErrTableExists)
+	err := s.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(context.Background(), variable.TiDBEnableAsyncMergeGlobalStats, variable.Off)
+	if err != nil {
+		logutil.BgLogger().Fatal("upgradeToVer177 error", zap.Error(err))
+	}
 }
 
 func writeOOMAction(s Session) {
diff --git a/pkg/sessionctx/variable/session.go b/pkg/sessionctx/variable/session.go
index 4f4ff2d2b5afa..31b887c18af88 100644
--- a/pkg/sessionctx/variable/session.go
+++ b/pkg/sessionctx/variable/session.go
@@ -1434,6 +1434,9 @@ type SessionVars struct {
 	// AnalyzePartitionMergeConcurrency indicates concurrency for merging partition stats
 	AnalyzePartitionMergeConcurrency int
 
+	// EnableAsyncMergeGlobalStats indicates whether to use the async algorithm when merging partition-level stats into global stats
+	EnableAsyncMergeGlobalStats bool
+
 	// EnableExternalTSRead indicates whether to enable read through external ts
 	EnableExternalTSRead bool
 
diff --git a/pkg/sessionctx/variable/sysvar.go b/pkg/sessionctx/variable/sysvar.go
index dc65f26a12da6..a13dc44bf8a81 100644
--- a/pkg/sessionctx/variable/sysvar.go
+++ b/pkg/sessionctx/variable/sysvar.go
@@ -2370,7 +2370,13 @@ var defaultSysVars = []*SysVar{
 			return nil
 		},
 	},
-
+	{
+		Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableAsyncMergeGlobalStats, Value: BoolToOnOff(DefTiDBEnableAsyncMergeGlobalStats), Type: TypeBool,
+		SetSession: func(s *SessionVars, val string) error {
+			s.EnableAsyncMergeGlobalStats = TiDBOptOn(val)
+			return nil
+		},
+	},
 	{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptPrefixIndexSingleScan, Value: BoolToOnOff(DefTiDBOptPrefixIndexSingleScan), Type: TypeBool, SetSession: func(s *SessionVars, val string) error {
 		s.OptPrefixIndexSingleScan = TiDBOptOn(val)
 		return nil
diff --git a/pkg/sessionctx/variable/tidb_vars.go b/pkg/sessionctx/variable/tidb_vars.go
index c7a50824b9045..e10aa9c5bbb9f 100644
--- a/pkg/sessionctx/variable/tidb_vars.go
+++ b/pkg/sessionctx/variable/tidb_vars.go
@@ -840,7 +840,8 @@ const (
 	TiDBAnalyzePartitionConcurrency = "tidb_analyze_partition_concurrency"
 	// TiDBMergePartitionStatsConcurrency indicates the concurrency when merge partition stats into global stats
 	TiDBMergePartitionStatsConcurrency = "tidb_merge_partition_stats_concurrency"
-
+	// TiDBEnableAsyncMergeGlobalStats indicates whether to use the async algorithm when merging partition-level stats into global stats
+	TiDBEnableAsyncMergeGlobalStats = "tidb_enable_async_merge_global_stats"
 	// TiDBOptPrefixIndexSingleScan indicates whether to do some optimizations to avoid double scan for prefix index.
 	// When set to true, `col is (not) null`(`col` is index prefix column) is regarded as index filter rather than table filter.
 	TiDBOptPrefixIndexSingleScan = "tidb_opt_prefix_index_single_scan"
@@ -1364,6 +1365,7 @@ const (
 	DefTiDBGOGCMaxValue = 500
 	DefTiDBGOGCMinValue = 100
 	DefTiDBOptPrefixIndexSingleScan = true
+	DefTiDBEnableAsyncMergeGlobalStats = true
 	DefTiDBExternalTS = 0
 	DefTiDBEnableExternalTSRead = false
 	DefTiDBEnableReusechunk = true
diff --git a/pkg/statistics/handle/globalstats/BUILD.bazel b/pkg/statistics/handle/globalstats/BUILD.bazel
index faea29a259d7c..f6cd5381056e7 100644
--- a/pkg/statistics/handle/globalstats/BUILD.bazel
+++ b/pkg/statistics/handle/globalstats/BUILD.bazel
@@ -42,7 +42,7 @@ go_test(
     ],
     embed = [":globalstats"],
     flaky = True,
-    shard_count = 20,
+    shard_count = 21,
     deps = [
         "//pkg/config",
         "//pkg/parser/model",
diff --git a/pkg/statistics/handle/globalstats/global_stats.go b/pkg/statistics/handle/globalstats/global_stats.go
index 6fc7e94ee8487..d5abb16a03ee1 100644
--- a/pkg/statistics/handle/globalstats/global_stats.go
+++ b/pkg/statistics/handle/globalstats/global_stats.go
@@ -15,6 +15,8 @@
 package globalstats
 
 import (
+	"fmt"
+
 	"github.com/pingcap/errors"
 	"github.com/pingcap/tidb/pkg/infoschema"
 	"github.com/pingcap/tidb/pkg/parser/ast"
@@ -23,7 +25,9 @@ import (
 	"github.com/pingcap/tidb/pkg/sessiontxn"
 	"github.com/pingcap/tidb/pkg/statistics"
 	"github.com/pingcap/tidb/pkg/statistics/handle/util"
+	"github.com/pingcap/tidb/pkg/types"
 	"github.com/pingcap/tidb/pkg/util/logutil"
+	"github.com/tiancaiamao/gp"
 	"go.uber.org/zap"
 )
 
@@ -99,15 +103,18 @@ func MergePartitionStats2GlobalStats(
 	isIndex bool,
 	histIDs []int64,
 ) (globalStats *GlobalStats, err error) {
-	worker, err := NewAsyncMergePartitionStats2GlobalStats(statsHandle, globalTableInfo, histIDs, is)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-	err = worker.MergePartitionStats2GlobalStats(sc, opts, isIndex)
-	if err != nil {
-		return nil, errors.Trace(err)
+	if sc.GetSessionVars().EnableAsyncMergeGlobalStats {
+		worker, err := NewAsyncMergePartitionStats2GlobalStats(statsHandle, globalTableInfo, histIDs, is)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		err = worker.MergePartitionStats2GlobalStats(sc, opts, isIndex)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		return worker.Result(), nil
 	}
-	return worker.Result(), nil
+	return blockingMergePartitionStats2GlobalStats(sc, statsHandle.GPool(), opts, is, globalTableInfo, isIndex, histIDs, nil, statsHandle)
 }
 
 // MergePartitionStats2GlobalStatsByTableID merge the partition-level stats to global-level stats based on the tableID.
@@ -263,3 +270,196 @@ func UpdateGlobalStats(
 	}
 	return nil
 }
+
+// blockingMergePartitionStats2GlobalStats merges the partition-level stats into global-level stats based on the tableInfo.
+// It is the old (blocking) algorithm: it loads all the partition-level stats into memory before merging, so it may cause OOM.
+func blockingMergePartitionStats2GlobalStats(
+	sc sessionctx.Context,
+	gpool *gp.Pool,
+	opts map[ast.AnalyzeOptionType]uint64,
+	is infoschema.InfoSchema,
+	globalTableInfo *model.TableInfo,
+	isIndex bool,
+	histIDs []int64,
+	allPartitionStats map[int64]*statistics.Table,
+	statsHandle util.StatsHandle,
+) (globalStats *GlobalStats, err error) {
+	externalCache := false
+	if allPartitionStats != nil {
+		externalCache = true
+	}
+
+	partitionNum := len(globalTableInfo.Partition.Definitions)
+	if len(histIDs) == 0 {
+		for _, col := range globalTableInfo.Columns {
+			// The stats of virtual generated columns cannot be merged into the global stats.
+			if col.IsVirtualGenerated() {
+				continue
+			}
+			histIDs = append(histIDs, col.ID)
+		}
+	}
+
+	// Initialize the globalStats.
+	globalStats = newGlobalStats(len(histIDs))
+
+	// Slice Dimensions Explanation
+	// First dimension: Column or Index Stats
+	// Second dimension: Partition Tables
+	// All topN and histograms need to be collected before they can be merged,
+	// so we store all the partition-level stats first and then merge them together.
+	allHg := make([][]*statistics.Histogram, globalStats.Num)
+	allCms := make([][]*statistics.CMSketch, globalStats.Num)
+	allTopN := make([][]*statistics.TopN, globalStats.Num)
+	allFms := make([][]*statistics.FMSketch, globalStats.Num)
+	for i := 0; i < globalStats.Num; i++ {
+		allHg[i] = make([]*statistics.Histogram, 0, partitionNum)
+		allCms[i] = make([]*statistics.CMSketch, 0, partitionNum)
+		allTopN[i] = make([]*statistics.TopN, 0, partitionNum)
+		allFms[i] = make([]*statistics.FMSketch, 0, partitionNum)
+	}
+
+	skipMissingPartitionStats := sc.GetSessionVars().SkipMissingPartitionStats
+	for _, def := range globalTableInfo.Partition.Definitions {
+		partitionID := def.ID
+		partitionTable, ok := statsHandle.TableInfoByID(is, partitionID)
+		if !ok {
+			err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", partitionID)
+			return
+		}
+		tableInfo := partitionTable.Meta()
+		var partitionStats *statistics.Table
+		var okLoad bool
+		if allPartitionStats != nil {
+			partitionStats, okLoad = allPartitionStats[partitionID]
+		} else {
+			okLoad = false
+		}
+		// If the pre-loaded partition stats are not available, load them directly and, when an external cache was given, store them in allPartitionStats.
+		if !okLoad {
+			var err1 error
+			partitionStats, err1 = statsHandle.LoadTablePartitionStats(tableInfo, &def)
+			if err1 != nil {
+				if skipMissingPartitionStats && types.ErrPartitionStatsMissing.Equal(err1) {
+					globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, fmt.Sprintf("partition `%s`", def.Name.L))
+					continue
+				}
+				err = err1
+				return
+			}
+			if externalCache {
+				allPartitionStats[partitionID] = partitionStats
+			}
+		}
+
+		for i := 0; i < globalStats.Num; i++ {
+			// GetStatsInfo will return the copy of the statsInfo, so we don't need to worry about the data race.
+			// partitionStats will be released after the for loop.
+			hg, cms, topN, fms, analyzed := partitionStats.GetStatsInfo(histIDs[i], isIndex, externalCache)
+			skipPartition := false
+			if !analyzed {
+				var missingPart string
+				if !isIndex {
+					missingPart = fmt.Sprintf("partition `%s` column `%s`", def.Name.L, tableInfo.FindColumnNameByID(histIDs[i]))
+				} else {
+					missingPart = fmt.Sprintf("partition `%s` index `%s`", def.Name.L, tableInfo.FindIndexNameByID(histIDs[i]))
+				}
+				if !skipMissingPartitionStats {
+					err = types.ErrPartitionStatsMissing.GenWithStackByArgs(fmt.Sprintf("table `%s` %s", tableInfo.Name.L, missingPart))
+					return
+				}
+				globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, missingPart)
+				skipPartition = true
+			}
+
+			// The partition stats are not empty but the column stats (hist, topN) are missing.
+			if partitionStats.RealtimeCount > 0 && (hg == nil || hg.TotalRowCount() <= 0) && (topN == nil || topN.TotalCount() <= 0) {
+				var missingPart string
+				if !isIndex {
+					missingPart = fmt.Sprintf("partition `%s` column `%s`", def.Name.L, tableInfo.FindColumnNameByID(histIDs[i]))
+				} else {
+					missingPart = fmt.Sprintf("partition `%s` index `%s`", def.Name.L, tableInfo.FindIndexNameByID(histIDs[i]))
+				}
+				if !skipMissingPartitionStats {
+					err = types.ErrPartitionColumnStatsMissing.GenWithStackByArgs(fmt.Sprintf("table `%s` %s", tableInfo.Name.L, missingPart))
+					return
+				}
+				globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, missingPart+" hist and topN")
+				skipPartition = true
+			}
+
+			if i == 0 {
+				// For each partition, update globalStats.Count and ModifyCount only once.
+				globalStats.Count += partitionStats.RealtimeCount
+				globalStats.ModifyCount += partitionStats.ModifyCount
+			}
+
+			if !skipPartition {
+				allHg[i] = append(allHg[i], hg)
+				allCms[i] = append(allCms[i], cms)
+				allTopN[i] = append(allTopN[i], topN)
+				allFms[i] = append(allFms[i], fms)
+			}
+		}
+	}
+
+	// After collecting all the statistics from the partition-level stats,
+	// we should merge them together.
+	for i := 0; i < globalStats.Num; i++ {
+		if len(allHg[i]) == 0 {
+			// If no partition has stats, skip merging global stats because the merge may not handle the case `len(allHg[i]) == 0`
+			// correctly. This avoids unexpected behaviors such as a nil pointer panic.
+			continue
+		}
+		// FMSketch uses a lot of memory, so we deal with it first and then destroy it.
+		// Merge FMSketch.
+		globalStats.Fms[i] = allFms[i][0]
+		for j := 1; j < len(allFms[i]); j++ {
+			globalStats.Fms[i].MergeFMSketch(allFms[i][j])
+			allFms[i][j].DestroyAndPutToPool()
+		}
+
+		// Update the global NDV.
+		globalStatsNDV := globalStats.Fms[i].NDV()
+		if globalStatsNDV > globalStats.Count {
+			globalStatsNDV = globalStats.Count
+		}
+		globalStats.Fms[i].DestroyAndPutToPool()
+
+		// Merge CMSketch.
+		globalStats.Cms[i] = allCms[i][0]
+		for j := 1; j < len(allCms[i]); j++ {
+			err = globalStats.Cms[i].MergeCMSketch(allCms[i][j])
+			if err != nil {
+				return
+			}
+		}
+
+		// Merge topN.
+		// Note: We need to merge TopN before merging the histogram,
+		// because after merging TopN, some values will be left over.
+		// These remaining topN values will be used as separate buckets for the later histogram merging.
+		var poppedTopN []statistics.TopNMeta
+		wrapper := NewStatsWrapper(allHg[i], allTopN[i])
+		globalStats.TopN[i], poppedTopN, allHg[i], err = mergeGlobalStatsTopN(gpool, sc, wrapper,
+			sc.GetSessionVars().StmtCtx.TimeZone(), sc.GetSessionVars().AnalyzeVersion, uint32(opts[ast.AnalyzeOptNumTopN]), isIndex)
+		if err != nil {
+			return
+		}
+
+		// Merge histogram.
+		globalStats.Hg[i], err = statistics.MergePartitionHist2GlobalHist(sc.GetSessionVars().StmtCtx, allHg[i], poppedTopN,
+			int64(opts[ast.AnalyzeOptNumBuckets]), isIndex)
+		if err != nil {
+			return
+		}
+
+		// NOTICE: after merging, bucket NDVs tend to be underestimated, so to be safe we don't use them.
+		for j := range globalStats.Hg[i].Buckets {
+			globalStats.Hg[i].Buckets[j].NDV = 0
+		}
+
+		globalStats.Hg[i].NDV = globalStatsNDV
+	}
+	return
+}
diff --git a/pkg/statistics/handle/globalstats/globalstats_test.go b/pkg/statistics/handle/globalstats/globalstats_test.go
index 84a534517dd0a..69b5c3d81445b 100644
--- a/pkg/statistics/handle/globalstats/globalstats_test.go
+++ b/pkg/statistics/handle/globalstats/globalstats_test.go
@@ -26,11 +26,24 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-func TestShowGlobalStats(t *testing.T) {
+func TestShowGlobalStatsWithAsyncMergeGlobal(t *testing.T) {
+	testShowGlobalStats(t, true)
+}
+
+func TestShowGlobalStatsWithoutAsyncMergeGlobal(t *testing.T) {
+	testShowGlobalStats(t, false)
+}
+
+func testShowGlobalStats(t *testing.T, isAsync bool) {
 	store := testkit.CreateMockStore(t)
 	tk := testkit.NewTestKit(t, store)
 	tk.MustExec("use test")
 	tk.MustExec("set @@session.tidb_analyze_version = 0")
+	if isAsync {
+		tk.MustExec("set @@session.tidb_enable_async_merge_global_stats = 1")
+	} else {
+		tk.MustExec("set @@session.tidb_enable_async_merge_global_stats = 0")
+	}
 	tk.MustExec("drop table if exists t")
 	tk.MustExec("set @@tidb_partition_prune_mode = 'static'")
 	tk.MustExec("create table t (a int, key(a)) partition by hash(a) partitions 2")
diff --git a/pkg/statistics/handle/storage/stats_read_writer.go b/pkg/statistics/handle/storage/stats_read_writer.go
index 998dcac551cff..7feb85f7794e1 100644
--- a/pkg/statistics/handle/storage/stats_read_writer.go
+++ b/pkg/statistics/handle/storage/stats_read_writer.go
@@ -297,6 +297,21 @@ func (s *statsReadWriter) SaveExtendedStatsToStorage(tableID int64, extStats *st
 	return
 }
 
+func (s *statsReadWriter) LoadTablePartitionStats(tableInfo *model.TableInfo, partitionDef *model.PartitionDefinition) (*statistics.Table, error) {
+	var partitionStats *statistics.Table
+	partitionStats, err := s.TableStatsFromStorage(tableInfo, partitionDef.ID, true, 0)
+	if err != nil {
+		return nil, err
+	}
+	// If err == nil && partitionStats == nil, the partition-level stats whose physicalID equals partitionID are missing.
+	if partitionStats == nil {
+		errMsg := fmt.Sprintf("table `%s` partition `%s`", tableInfo.Name.L, partitionDef.Name.L)
+		err = types.ErrPartitionStatsMissing.GenWithStackByArgs(errMsg)
+		return nil, err
+	}
+	return partitionStats, nil
+}
+
 // LoadNeededHistograms will load histograms for those needed columns/indices.
 func (s *statsReadWriter) LoadNeededHistograms() (err error) {
 	err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
diff --git a/pkg/statistics/handle/util/interfaces.go b/pkg/statistics/handle/util/interfaces.go
index eb894d22118e4..14c85f4ed092d 100644
--- a/pkg/statistics/handle/util/interfaces.go
+++ b/pkg/statistics/handle/util/interfaces.go
@@ -219,6 +219,9 @@ type StatsReadWriter interface {
 	// TableStatsFromStorage loads table stats info from storage.
 	TableStatsFromStorage(tableInfo *model.TableInfo, physicalID int64, loadAll bool, snapshot uint64) (statsTbl *statistics.Table, err error)
 
+	// LoadTablePartitionStats loads partition stats info from storage.
+	LoadTablePartitionStats(tableInfo *model.TableInfo, partitionDef *model.PartitionDefinition) (*statistics.Table, error)
+
 	// StatsMetaCountAndModifyCount reads count and modify_count for the given table from mysql.stats_meta.
 	StatsMetaCountAndModifyCount(tableID int64) (count, modifyCount int64, err error)
 