Skip to content

Commit

Permalink
*: add variable to control async merge global stats (#47806)
Browse files Browse the repository at this point in the history
ref #47275
  • Loading branch information
hawkingrei authored Oct 20, 2023
1 parent 48d69d3 commit 1501bf1
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 12 deletions.
4 changes: 4 additions & 0 deletions pkg/session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2840,6 +2840,10 @@ func upgradeToVer177(s Session, ver int64) {
}
// ignore error when upgrading from v7.4 to higher version.
doReentrantDDL(s, CreateDistFrameworkMeta, infoschema.ErrTableExists)
err := s.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(context.Background(), variable.TiDBEnableAsyncMergeGlobalStats, variable.Off)
if err != nil {
logutil.BgLogger().Fatal("upgradeToVer177 error", zap.Error(err))
}
}

func writeOOMAction(s Session) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1434,6 +1434,9 @@ type SessionVars struct {
// AnalyzePartitionMergeConcurrency indicates concurrency for merging partition stats
AnalyzePartitionMergeConcurrency int

// EnableAsyncMergeGlobalStats indicates whether to enable async merge global stats
EnableAsyncMergeGlobalStats bool

// EnableExternalTSRead indicates whether to enable read through external ts
EnableExternalTSRead bool

Expand Down
8 changes: 7 additions & 1 deletion pkg/sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2370,7 +2370,13 @@ var defaultSysVars = []*SysVar{
return nil
},
},

{
Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableAsyncMergeGlobalStats, Value: BoolToOnOff(DefTiDBEnableAsyncMergeGlobalStats), Type: TypeBool,
SetSession: func(s *SessionVars, val string) error {
s.EnableAsyncMergeGlobalStats = TiDBOptOn(val)
return nil
},
},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptPrefixIndexSingleScan, Value: BoolToOnOff(DefTiDBOptPrefixIndexSingleScan), Type: TypeBool, SetSession: func(s *SessionVars, val string) error {
s.OptPrefixIndexSingleScan = TiDBOptOn(val)
return nil
Expand Down
4 changes: 3 additions & 1 deletion pkg/sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,8 @@ const (
TiDBAnalyzePartitionConcurrency = "tidb_analyze_partition_concurrency"
// TiDBMergePartitionStatsConcurrency indicates the concurrency when merge partition stats into global stats
TiDBMergePartitionStatsConcurrency = "tidb_merge_partition_stats_concurrency"

// TiDBEnableAsyncMergeGlobalStats indicates whether to enable async merge global stats
TiDBEnableAsyncMergeGlobalStats = "tidb_enable_async_merge_global_stats"
// TiDBOptPrefixIndexSingleScan indicates whether to do some optimizations to avoid double scan for prefix index.
// When set to true, `col is (not) null`(`col` is index prefix column) is regarded as index filter rather than table filter.
TiDBOptPrefixIndexSingleScan = "tidb_opt_prefix_index_single_scan"
Expand Down Expand Up @@ -1364,6 +1365,7 @@ const (
DefTiDBGOGCMaxValue = 500
DefTiDBGOGCMinValue = 100
DefTiDBOptPrefixIndexSingleScan = true
DefTiDBEnableAsyncMergeGlobalStats = true
DefTiDBExternalTS = 0
DefTiDBEnableExternalTSRead = false
DefTiDBEnableReusechunk = true
Expand Down
2 changes: 1 addition & 1 deletion pkg/statistics/handle/globalstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ go_test(
"topn_bench_test.go",
],
flaky = True,
shard_count = 18,
shard_count = 19,
deps = [
":globalstats",
"//pkg/config",
Expand Down
216 changes: 208 additions & 8 deletions pkg/statistics/handle/globalstats/global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package globalstats

import (
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/parser/ast"
Expand All @@ -23,7 +25,9 @@ import (
"github.com/pingcap/tidb/pkg/sessiontxn"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tiancaiamao/gp"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -99,15 +103,18 @@ func MergePartitionStats2GlobalStats(
isIndex bool,
histIDs []int64,
) (globalStats *GlobalStats, err error) {
worker, err := NewAsyncMergePartitionStats2GlobalStats(statsHandle, globalTableInfo, histIDs, is)
if err != nil {
return nil, errors.Trace(err)
}
err = worker.MergePartitionStats2GlobalStats(sc, opts, isIndex)
if err != nil {
return nil, errors.Trace(err)
if sc.GetSessionVars().EnableAsyncMergeGlobalStats {
worker, err := NewAsyncMergePartitionStats2GlobalStats(statsHandle, globalTableInfo, histIDs, is)
if err != nil {
return nil, errors.Trace(err)
}
err = worker.MergePartitionStats2GlobalStats(sc, opts, isIndex)
if err != nil {
return nil, errors.Trace(err)
}
return worker.Result(), nil
}
return worker.Result(), nil
return blockingMergePartitionStats2GlobalStats(sc, statsHandle.GPool(), opts, is, globalTableInfo, isIndex, histIDs, nil, statsHandle)
}

// MergePartitionStats2GlobalStatsByTableID merge the partition-level stats to global-level stats based on the tableID.
Expand Down Expand Up @@ -263,3 +270,196 @@ func UpdateGlobalStats(
}
return nil
}

// blockingMergePartitionStats2GlobalStats merge the partition-level stats to global-level stats based on the tableInfo.
// It is the old algorithm to merge partition-level stats to global-level stats. It will happen the OOM. because it will load all the partition-level stats into memory.
func blockingMergePartitionStats2GlobalStats(
sc sessionctx.Context,
gpool *gp.Pool,
opts map[ast.AnalyzeOptionType]uint64,
is infoschema.InfoSchema,
globalTableInfo *model.TableInfo,
isIndex bool,
histIDs []int64,
allPartitionStats map[int64]*statistics.Table,
statsHandle util.StatsHandle,
) (globalStats *GlobalStats, err error) {
externalCache := false
if allPartitionStats != nil {
externalCache = true
}

partitionNum := len(globalTableInfo.Partition.Definitions)
if len(histIDs) == 0 {
for _, col := range globalTableInfo.Columns {
// The virtual generated column stats can not be merged to the global stats.
if col.IsVirtualGenerated() {
continue
}
histIDs = append(histIDs, col.ID)
}
}

// Initialized the globalStats.
globalStats = newGlobalStats(len(histIDs))

// Slice Dimensions Explanation
// First dimension: Column or Index Stats
// Second dimension: Partition Tables
// Because all topN and histograms need to be collected before they can be merged.
// So we should store all the partition-level stats first, and merge them together.
allHg := make([][]*statistics.Histogram, globalStats.Num)
allCms := make([][]*statistics.CMSketch, globalStats.Num)
allTopN := make([][]*statistics.TopN, globalStats.Num)
allFms := make([][]*statistics.FMSketch, globalStats.Num)
for i := 0; i < globalStats.Num; i++ {
allHg[i] = make([]*statistics.Histogram, 0, partitionNum)
allCms[i] = make([]*statistics.CMSketch, 0, partitionNum)
allTopN[i] = make([]*statistics.TopN, 0, partitionNum)
allFms[i] = make([]*statistics.FMSketch, 0, partitionNum)
}

skipMissingPartitionStats := sc.GetSessionVars().SkipMissingPartitionStats
for _, def := range globalTableInfo.Partition.Definitions {
partitionID := def.ID
partitionTable, ok := statsHandle.TableInfoByID(is, partitionID)
if !ok {
err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", partitionID)
return
}
tableInfo := partitionTable.Meta()
var partitionStats *statistics.Table
var okLoad bool
if allPartitionStats != nil {
partitionStats, okLoad = allPartitionStats[partitionID]
} else {
okLoad = false
}
// If pre-load partition stats isn't provided, then we load partition stats directly and set it into allPartitionStats
if !okLoad {
var err1 error
partitionStats, err1 = statsHandle.LoadTablePartitionStats(tableInfo, &def)
if err1 != nil {
if skipMissingPartitionStats && types.ErrPartitionStatsMissing.Equal(err1) {
globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, fmt.Sprintf("partition `%s`", def.Name.L))
continue
}
err = err1
return
}
if externalCache {
allPartitionStats[partitionID] = partitionStats
}
}

for i := 0; i < globalStats.Num; i++ {
// GetStatsInfo will return the copy of the statsInfo, so we don't need to worry about the data race.
// partitionStats will be released after the for loop.
hg, cms, topN, fms, analyzed := partitionStats.GetStatsInfo(histIDs[i], isIndex, externalCache)
skipPartition := false
if !analyzed {
var missingPart string
if !isIndex {
missingPart = fmt.Sprintf("partition `%s` column `%s`", def.Name.L, tableInfo.FindColumnNameByID(histIDs[i]))
} else {
missingPart = fmt.Sprintf("partition `%s` index `%s`", def.Name.L, tableInfo.FindIndexNameByID(histIDs[i]))
}
if !skipMissingPartitionStats {
err = types.ErrPartitionStatsMissing.GenWithStackByArgs(fmt.Sprintf("table `%s` %s", tableInfo.Name.L, missingPart))
return
}
globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, missingPart)
skipPartition = true
}

// Partition stats is not empty but column stats(hist, topN) is missing.
if partitionStats.RealtimeCount > 0 && (hg == nil || hg.TotalRowCount() <= 0) && (topN == nil || topN.TotalCount() <= 0) {
var missingPart string
if !isIndex {
missingPart = fmt.Sprintf("partition `%s` column `%s`", def.Name.L, tableInfo.FindColumnNameByID(histIDs[i]))
} else {
missingPart = fmt.Sprintf("partition `%s` index `%s`", def.Name.L, tableInfo.FindIndexNameByID(histIDs[i]))
}
if !skipMissingPartitionStats {
err = types.ErrPartitionColumnStatsMissing.GenWithStackByArgs(fmt.Sprintf("table `%s` %s", tableInfo.Name.L, missingPart))
return
}
globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, missingPart+" hist and topN")
skipPartition = true
}

if i == 0 {
// In a partition, we will only update globalStats.Count once.
globalStats.Count += partitionStats.RealtimeCount
globalStats.ModifyCount += partitionStats.ModifyCount
}

if !skipPartition {
allHg[i] = append(allHg[i], hg)
allCms[i] = append(allCms[i], cms)
allTopN[i] = append(allTopN[i], topN)
allFms[i] = append(allFms[i], fms)
}
}
}

// After collect all the statistics from the partition-level stats,
// we should merge them together.
for i := 0; i < globalStats.Num; i++ {
if len(allHg[i]) == 0 {
// If all partitions have no stats, we skip merging global stats because it may not handle the case `len(allHg[i]) == 0`
// correctly. It can avoid unexpected behaviors such as nil pointer panic.
continue
}
// FMSketch use many memory, so we first deal with it and then destroy it.
// Merge FMSketch.
globalStats.Fms[i] = allFms[i][0]
for j := 1; j < len(allFms[i]); j++ {
globalStats.Fms[i].MergeFMSketch(allFms[i][j])
allFms[i][j].DestroyAndPutToPool()
}

// Update the global NDV.
globalStatsNDV := globalStats.Fms[i].NDV()
if globalStatsNDV > globalStats.Count {
globalStatsNDV = globalStats.Count
}
globalStats.Fms[i].DestroyAndPutToPool()

// Merge CMSketch.
globalStats.Cms[i] = allCms[i][0]
for j := 1; j < len(allCms[i]); j++ {
err = globalStats.Cms[i].MergeCMSketch(allCms[i][j])
if err != nil {
return
}
}

// Merge topN.
// Note: We need to merge TopN before merging the histogram.
// Because after merging TopN, some numbers will be left.
// These remaining topN numbers will be used as a separate bucket for later histogram merging.
var poppedTopN []statistics.TopNMeta
wrapper := NewStatsWrapper(allHg[i], allTopN[i])
globalStats.TopN[i], poppedTopN, allHg[i], err = mergeGlobalStatsTopN(gpool, sc, wrapper,
sc.GetSessionVars().StmtCtx.TimeZone(), sc.GetSessionVars().AnalyzeVersion, uint32(opts[ast.AnalyzeOptNumTopN]), isIndex)
if err != nil {
return
}

// Merge histogram.
globalStats.Hg[i], err = statistics.MergePartitionHist2GlobalHist(sc.GetSessionVars().StmtCtx, allHg[i], poppedTopN,
int64(opts[ast.AnalyzeOptNumBuckets]), isIndex)
if err != nil {
return
}

// NOTICE: after merging bucket NDVs have the trend to be underestimated, so for safe we don't use them.
for j := range globalStats.Hg[i].Buckets {
globalStats.Hg[i].Buckets[j].NDV = 0
}

globalStats.Hg[i].NDV = globalStatsNDV
}
return
}
15 changes: 14 additions & 1 deletion pkg/statistics/handle/globalstats/globalstats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,24 @@ import (
"github.com/stretchr/testify/require"
)

func TestShowGlobalStats(t *testing.T) {
func TestShowGlobalStatsWithAsyncMergeGlobal(t *testing.T) {
testShowGlobalStats(t, true)
}

func TestShowGlobalStatsWithoutAsyncMergeGlobal(t *testing.T) {
testShowGlobalStats(t, false)
}

func testShowGlobalStats(t *testing.T, isAsync bool) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set @@session.tidb_analyze_version = 0")
if isAsync {
tk.MustExec("set @@global.tidb_enable_async_merge_global_stats = 0")
} else {
tk.MustExec("set @@global.tidb_enable_async_merge_global_stats = 1")
}
tk.MustExec("drop table if exists t")
tk.MustExec("set @@tidb_partition_prune_mode = 'static'")
tk.MustExec("create table t (a int, key(a)) partition by hash(a) partitions 2")
Expand Down
15 changes: 15 additions & 0 deletions pkg/statistics/handle/storage/stats_read_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,21 @@ func (s *statsReadWriter) SaveExtendedStatsToStorage(tableID int64, extStats *st
return
}

func (s *statsReadWriter) LoadTablePartitionStats(tableInfo *model.TableInfo, partitionDef *model.PartitionDefinition) (*statistics.Table, error) {
var partitionStats *statistics.Table
partitionStats, err := s.TableStatsFromStorage(tableInfo, partitionDef.ID, true, 0)
if err != nil {
return nil, err
}
// if the err == nil && partitionStats == nil, it means we lack the partition-level stats which the physicalID is equal to partitionID.
if partitionStats == nil {
errMsg := fmt.Sprintf("table `%s` partition `%s`", tableInfo.Name.L, partitionDef.Name.L)
err = types.ErrPartitionStatsMissing.GenWithStackByArgs(errMsg)
return nil, err
}
return partitionStats, nil
}

// LoadNeededHistograms will load histograms for those needed columns/indices.
func (s *statsReadWriter) LoadNeededHistograms() (err error) {
err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
Expand Down
3 changes: 3 additions & 0 deletions pkg/statistics/handle/util/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ type StatsReadWriter interface {
// TableStatsFromStorage loads table stats info from storage.
TableStatsFromStorage(tableInfo *model.TableInfo, physicalID int64, loadAll bool, snapshot uint64) (statsTbl *statistics.Table, err error)

// LoadTablePartitionStats loads partition stats info from storage.
LoadTablePartitionStats(tableInfo *model.TableInfo, partitionDef *model.PartitionDefinition) (*statistics.Table, error)

// StatsMetaCountAndModifyCount reads count and modify_count for the given table from mysql.stats_meta.
StatsMetaCountAndModifyCount(tableID int64) (count, modifyCount int64, err error)

Expand Down

0 comments on commit 1501bf1

Please sign in to comment.