From e890aede71f4e7fae25959f0159329e351a45a7e Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 14 Sep 2023 23:18:09 +0800 Subject: [PATCH] handle: add global status handler (#46974) ref pingcap/tidb#46905 --- executor/BUILD.bazel | 1 + executor/historical_stats_test.go | 5 +- server/handler/optimizor/BUILD.bazel | 1 + .../optimizor/statistics_handler_test.go | 3 +- statistics/BUILD.bazel | 3 - statistics/cmsketch.go | 20 +- statistics/cmsketch_util.go | 15 +- statistics/handle/BUILD.bazel | 2 + statistics/handle/dump.go | 7 +- statistics/handle/dump_test.go | 5 +- statistics/handle/globalstats/BUILD.bazel | 45 +++ statistics/handle/globalstats/global_stats.go | 287 +++++++++++++++ .../{ => handle/globalstats}/merge_worker.go | 31 +- statistics/handle/globalstats/topn.go | 115 ++++++ .../globalstats/topn_bench_test.go} | 15 +- statistics/handle/handle.go | 327 +----------------- 16 files changed, 517 insertions(+), 365 deletions(-) create mode 100644 statistics/handle/globalstats/BUILD.bazel create mode 100644 statistics/handle/globalstats/global_stats.go rename statistics/{ => handle/globalstats}/merge_worker.go (83%) create mode 100644 statistics/handle/globalstats/topn.go rename statistics/{cmsketch_bench_test.go => handle/globalstats/topn_bench_test.go} (91%) diff --git a/executor/BUILD.bazel b/executor/BUILD.bazel index b2189d60742ee..cca4a87b7fc24 100644 --- a/executor/BUILD.bazel +++ b/executor/BUILD.bazel @@ -410,6 +410,7 @@ go_test( "//sessiontxn/staleread", "//statistics", "//statistics/handle", + "//statistics/handle/globalstats", "//store/copr", "//store/driver/error", "//store/helper", diff --git a/executor/historical_stats_test.go b/executor/historical_stats_test.go index 7288a8958b062..df449107c2d52 100644 --- a/executor/historical_stats_test.go +++ b/executor/historical_stats_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics/handle" + 
"github.com/pingcap/tidb/statistics/handle/globalstats" "github.com/pingcap/tidb/testkit" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" @@ -343,7 +344,7 @@ PARTITION p0 VALUES LESS THAN (6) require.NotNil(t, jsTable) // only has p0 stats require.NotNil(t, jsTable.Partitions["p0"]) - require.Nil(t, jsTable.Partitions[handle.TiDBGlobalStats]) + require.Nil(t, jsTable.Partitions[globalstats.TiDBGlobalStats]) // change static to dynamic then assert tk.MustExec("set @@tidb_partition_prune_mode='dynamic'") @@ -365,7 +366,7 @@ PARTITION p0 VALUES LESS THAN (6) require.NotNil(t, jsTable) // has both global and p0 stats require.NotNil(t, jsTable.Partitions["p0"]) - require.NotNil(t, jsTable.Partitions[handle.TiDBGlobalStats]) + require.NotNil(t, jsTable.Partitions[globalstats.TiDBGlobalStats]) } func TestDumpHistoricalStatsFallback(t *testing.T) { diff --git a/server/handler/optimizor/BUILD.bazel b/server/handler/optimizor/BUILD.bazel index bdcdbe69820c3..fc2f68e929edd 100644 --- a/server/handler/optimizor/BUILD.bazel +++ b/server/handler/optimizor/BUILD.bazel @@ -56,6 +56,7 @@ go_test( "//server/internal/util", "//session", "//statistics/handle", + "//statistics/handle/globalstats", "//store/mockstore/unistore", "//testkit", "//testkit/testsetup", diff --git a/server/handler/optimizor/statistics_handler_test.go b/server/handler/optimizor/statistics_handler_test.go index f9cdf89d2e0d5..e775114cd5788 100644 --- a/server/handler/optimizor/statistics_handler_test.go +++ b/server/handler/optimizor/statistics_handler_test.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/server/internal/util" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/statistics/handle" + "github.com/pingcap/tidb/statistics/handle/globalstats" "github.com/pingcap/tidb/testkit" "github.com/stretchr/testify/require" ) @@ -180,7 +181,7 @@ func testDumpPartitionTableStats(t *testing.T, client *testserverclient.TestServ jsonTable := &handle.JSONTable{} err = 
json.Unmarshal(b, jsonTable) require.NoError(t, err) - require.NotNil(t, jsonTable.Partitions[handle.TiDBGlobalStats]) + require.NotNil(t, jsonTable.Partitions[globalstats.TiDBGlobalStats]) require.Len(t, jsonTable.Partitions, expectedLen) } check(false) diff --git a/statistics/BUILD.bazel b/statistics/BUILD.bazel index aed693bb9b77a..81be7afe4658d 100644 --- a/statistics/BUILD.bazel +++ b/statistics/BUILD.bazel @@ -15,7 +15,6 @@ go_library( "histogram.go", "index.go", "interact_with_storage.go", - "merge_worker.go", "row_sampler.go", "sample.go", "scalar.go", @@ -63,7 +62,6 @@ go_test( name = "statistics_test", timeout = "short", srcs = [ - "cmsketch_bench_test.go", "cmsketch_test.go", "fmsketch_test.go", "histogram_bench_test.go", @@ -100,7 +98,6 @@ go_test( "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", - "@com_github_tiancaiamao_gp//:gp", "@org_uber_go_goleak//:goleak", ], ) diff --git a/statistics/cmsketch.go b/statistics/cmsketch.go index a9897839f79ca..7a51a5e466157 100644 --- a/statistics/cmsketch.go +++ b/statistics/cmsketch.go @@ -624,7 +624,7 @@ func (c *TopN) QueryTopN(sctx sessionctx.Context, d []byte) (result uint64, foun if c == nil { return 0, false } - idx := c.findTopN(d) + idx := c.FindTopN(d) if sctx != nil && sctx.GetSessionVars().StmtCtx.EnableOptimizerDebugTrace { debugtrace.RecordAnyValuesWithNames(sctx, "FindTopN idx", idx) } @@ -634,7 +634,8 @@ func (c *TopN) QueryTopN(sctx sessionctx.Context, d []byte) (result uint64, foun return c.TopN[idx].Count, true } -func (c *TopN) findTopN(d []byte) int { +// FindTopN finds the index of the given value in the TopN. 
+func (c *TopN) FindTopN(d []byte) int { if c == nil { return -1 } @@ -741,7 +742,7 @@ func (c *TopN) RemoveVal(val []byte) { if c == nil { return } - pos := c.findTopN(val) + pos := c.FindTopN(val) if pos == -1 { return } @@ -766,7 +767,7 @@ func (c *TopN) updateTopNWithDelta(d []byte, delta uint64, increase bool) bool { if c == nil || c.TopN == nil { return false } - idx := c.findTopN(d) + idx := c.FindTopN(d) if idx >= 0 { if increase { c.TopN[idx].Count += delta @@ -795,7 +796,7 @@ func NewTopN(n int) *TopN { // 3. `[]*Histogram` are the partition-level histograms which just delete some values when we merge the global-level topN. func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*TopN, n uint32, hists []*Histogram, isIndex bool, killed *uint32) (*TopN, []TopNMeta, []*Histogram, error) { - if checkEmptyTopNs(topNs) { + if CheckEmptyTopNs(topNs) { return nil, nil, hists, nil } partNum := len(topNs) @@ -803,7 +804,7 @@ func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*TopN, n counter := make(map[hack.MutableString]float64) // datumMap is used to store the mapping from the string type to datum type. // The datum is used to find the value in the histogram. - datumMap := newDatumMapCache() + datumMap := NewDatumMapCache() for i, topN := range topNs { if atomic.LoadUint32(killed) == 1 { return nil, nil, nil, errors.Trace(ErrQueryInterrupted) @@ -826,7 +827,7 @@ func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*TopN, n if atomic.LoadUint32(killed) == 1 { return nil, nil, nil, errors.Trace(ErrQueryInterrupted) } - if (j == i && version >= 2) || topNs[j].findTopN(val.Encoded) != -1 { + if (j == i && version >= 2) || topNs[j].FindTopN(val.Encoded) != -1 { continue } // Get the encodedVal from the hists[j] @@ -866,7 +867,7 @@ func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*TopN, n // The output parameters are the newly generated TopN structure and the remaining numbers. 
// Notice: The n can be 0. So n has no default value, we must explicitly specify this value. func MergeTopN(topNs []*TopN, n uint32) (*TopN, []TopNMeta) { - if checkEmptyTopNs(topNs) { + if CheckEmptyTopNs(topNs) { return nil, nil } // Different TopN structures may hold the same value, we have to merge them. @@ -891,7 +892,8 @@ func MergeTopN(topNs []*TopN, n uint32) (*TopN, []TopNMeta) { return GetMergedTopNFromSortedSlice(sorted, n) } -func checkEmptyTopNs(topNs []*TopN) bool { +// CheckEmptyTopNs checks whether all TopNs are empty. +func CheckEmptyTopNs(topNs []*TopN) bool { count := uint64(0) for _, topN := range topNs { count += topN.TotalCount() diff --git a/statistics/cmsketch_util.go b/statistics/cmsketch_util.go index 204952239545d..131141405a66e 100644 --- a/statistics/cmsketch_util.go +++ b/statistics/cmsketch_util.go @@ -22,22 +22,27 @@ import ( "github.com/pingcap/tidb/util/hack" ) -type datumMapCache struct { +// DatumMapCache is used to store the mapping from the string type to datum type. +// The datum is used to find the value in the histogram. +type DatumMapCache struct { datumMap map[hack.MutableString]types.Datum } -func newDatumMapCache() *datumMapCache { - return &datumMapCache{ +// NewDatumMapCache creates a new DatumMapCache. +func NewDatumMapCache() *DatumMapCache { + return &DatumMapCache{ datumMap: make(map[hack.MutableString]types.Datum), } } -func (d *datumMapCache) Get(key hack.MutableString) (val types.Datum, ok bool) { +// Get gets the datum from the cache. +func (d *DatumMapCache) Get(key hack.MutableString) (val types.Datum, ok bool) { val, ok = d.datumMap[key] return } -func (d *datumMapCache) Put(val TopNMeta, encodedVal hack.MutableString, +// Put puts the datum into the cache. 
+func (d *DatumMapCache) Put(val TopNMeta, encodedVal hack.MutableString, tp byte, isIndex bool, loc *time.Location) (dat types.Datum, err error) { dat, err = topNMetaToDatum(val, tp, isIndex, loc) if err != nil { diff --git a/statistics/handle/BUILD.bazel b/statistics/handle/BUILD.bazel index 2cb92e17d7d50..4aee63521cdff 100644 --- a/statistics/handle/BUILD.bazel +++ b/statistics/handle/BUILD.bazel @@ -31,6 +31,7 @@ go_library( "//sessiontxn", "//statistics", "//statistics/handle/cache", + "//statistics/handle/globalstats", "//statistics/handle/lockstats", "//statistics/handle/metrics", "//table", @@ -76,6 +77,7 @@ go_test( "//sessionctx/stmtctx", "//sessionctx/variable", "//statistics", + "//statistics/handle/globalstats", "//statistics/handle/internal", "//testkit", "//testkit/testsetup", diff --git a/statistics/handle/dump.go b/statistics/handle/dump.go index 56bd83c9cb626..ba240a606757f 100644 --- a/statistics/handle/dump.go +++ b/statistics/handle/dump.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" + "github.com/pingcap/tidb/statistics/handle/globalstats" handle_metrics "github.com/pingcap/tidb/statistics/handle/metrics" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/logutil" @@ -195,7 +196,7 @@ func (h *Handle) DumpHistoricalStatsBySnapshot( } // dump its global-stats if existed if tbl != nil { - jsonTbl.Partitions[TiDBGlobalStats] = tbl + jsonTbl.Partitions[globalstats.TiDBGlobalStats] = tbl } return jsonTbl, fallbackTbls, nil } @@ -233,7 +234,7 @@ func (h *Handle) DumpStatsToJSONBySnapshot(dbName string, tableInfo *model.Table return nil, errors.Trace(err) } if tbl != nil { - jsonTbl.Partitions[TiDBGlobalStats] = tbl + jsonTbl.Partitions[globalstats.TiDBGlobalStats] = tbl } return jsonTbl, nil } @@ -396,7 +397,7 @@ func (h *Handle) LoadStatsFromJSON(is infoschema.InfoSchema, jsonTbl *JSONTable) } } // load global-stats if existed - 
if globalStats, ok := jsonTbl.Partitions[TiDBGlobalStats]; ok { + if globalStats, ok := jsonTbl.Partitions[globalstats.TiDBGlobalStats]; ok { if err := h.loadStatsFromJSON(tableInfo, tableInfo.ID, globalStats); err != nil { return errors.Trace(err) } diff --git a/statistics/handle/dump_test.go b/statistics/handle/dump_test.go index 661640897b636..354334a0a7fd3 100644 --- a/statistics/handle/dump_test.go +++ b/statistics/handle/dump_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/statistics/handle" + "github.com/pingcap/tidb/statistics/handle/globalstats" "github.com/pingcap/tidb/statistics/handle/internal" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/util" @@ -136,7 +137,7 @@ func TestDumpGlobalStats(t *testing.T) { stats := getStatsJSON(t, dom, "test", "t") require.NotNil(t, stats.Partitions["p0"]) require.NotNil(t, stats.Partitions["p1"]) - require.Nil(t, stats.Partitions[handle.TiDBGlobalStats]) + require.Nil(t, stats.Partitions[globalstats.TiDBGlobalStats]) // global-stats is existed tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'") @@ -144,7 +145,7 @@ func TestDumpGlobalStats(t *testing.T) { stats = getStatsJSON(t, dom, "test", "t") require.NotNil(t, stats.Partitions["p0"]) require.NotNil(t, stats.Partitions["p1"]) - require.NotNil(t, stats.Partitions[handle.TiDBGlobalStats]) + require.NotNil(t, stats.Partitions[globalstats.TiDBGlobalStats]) } func TestLoadGlobalStats(t *testing.T) { diff --git a/statistics/handle/globalstats/BUILD.bazel b/statistics/handle/globalstats/BUILD.bazel new file mode 100644 index 0000000000000..4b984d346f9eb --- /dev/null +++ b/statistics/handle/globalstats/BUILD.bazel @@ -0,0 +1,45 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "globalstats", + srcs = [ + "global_stats.go", + "merge_worker.go", + "topn.go", + ], + importpath = 
"github.com/pingcap/tidb/statistics/handle/globalstats", + visibility = ["//visibility:public"], + deps = [ + "//infoschema", + "//parser/ast", + "//parser/model", + "//sessionctx", + "//sessionctx/variable", + "//statistics", + "//table", + "//types", + "//util/hack", + "//util/logutil", + "@com_github_pingcap_errors//:errors", + "@com_github_tiancaiamao_gp//:gp", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "globalstats_test", + timeout = "short", + srcs = ["topn_bench_test.go"], + embed = [":globalstats"], + flaky = True, + deps = [ + "//parser/mysql", + "//sessionctx/stmtctx", + "//statistics", + "//types", + "//util/chunk", + "//util/codec", + "@com_github_stretchr_testify//require", + "@com_github_tiancaiamao_gp//:gp", + ], +) diff --git a/statistics/handle/globalstats/global_stats.go b/statistics/handle/globalstats/global_stats.go new file mode 100644 index 0000000000000..a6fc7848d2f1a --- /dev/null +++ b/statistics/handle/globalstats/global_stats.go @@ -0,0 +1,287 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package globalstats
+
+import (
+	"fmt"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/infoschema"
+	"github.com/pingcap/tidb/parser/ast"
+	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/sessionctx"
+	"github.com/pingcap/tidb/sessionctx/variable"
+	"github.com/pingcap/tidb/statistics"
+	"github.com/pingcap/tidb/table"
+	"github.com/pingcap/tidb/types"
+	"github.com/pingcap/tidb/util/logutil"
+	"github.com/tiancaiamao/gp"
+	"go.uber.org/zap"
+)
+
+const (
+	// TiDBGlobalStats is the key that represents the global-stats of a partitioned table (e.g. in a dumped JSONTable's Partitions map).
+	TiDBGlobalStats = "global"
+	// MaxPartitionMergeBatchSize indicates the max batch size for a worker to merge partition stats.
+	MaxPartitionMergeBatchSize = 256
+)
+
+// GlobalStats is used to store the statistics contained in the global-level stats
+// which is generated by the merge of partition-level stats.
+// It stores either the column stats or the index stats, one slice entry per merged histogram ID.
+// In the column statistics, the field `Num` is equal to the number of columns in the partition table.
+// In the index statistics, the field `Num` is always equal to one.
+type GlobalStats struct {
+	Hg                    []*statistics.Histogram
+	Cms                   []*statistics.CMSketch
+	TopN                  []*statistics.TopN
+	Fms                   []*statistics.FMSketch
+	MissingPartitionStats []string
+	Num                   int
+	Count                 int64
+	ModifyCount           int64
+}
+
+type (
+	getTableByPhysicalIDFunc    func(is infoschema.InfoSchema, physicalID int64) (table.Table, bool)
+	loadTablePartitionStatsFunc func(tableInfo *model.TableInfo, partitionDef *model.PartitionDefinition) (*statistics.Table, error)
+	// GlobalStatusHandler is used to handle the global-level stats.
+	GlobalStatusHandler struct {
+		// gpool is the goroutine pool handed to mergeGlobalStatsTopN so that goroutines can be reused.
+		gpool *gp.Pool
+	}
+)
+
+// NewGlobalStatusHandler creates a new GlobalStatusHandler.
+func NewGlobalStatusHandler(gpool *gp.Pool) *GlobalStatusHandler {
+	return &GlobalStatusHandler{gpool: gpool}
+}
+
+// MergePartitionStats2GlobalStats merges the partition-level stats to global-level stats based on the tableInfo.
+func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(sc sessionctx.Context,
+	opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema, globalTableInfo *model.TableInfo,
+	isIndex int, histIDs []int64,
+	allPartitionStats map[int64]*statistics.Table, getTableByPhysicalIDFn getTableByPhysicalIDFunc, loadTablePartitionStatsFn loadTablePartitionStatsFunc) (globalStats *GlobalStats, err error) {
+	partitionNum := len(globalTableInfo.Partition.Definitions)
+
+	// Initialize the globalStats. An empty histIDs means "every column except virtual generated ones".
+	globalStats = new(GlobalStats)
+	if len(histIDs) == 0 {
+		for _, col := range globalTableInfo.Columns {
+			// The virtual generated column stats can not be merged to the global stats.
+			if col.IsGenerated() && !col.GeneratedStored {
+				continue
+			}
+			histIDs = append(histIDs, col.ID)
+		}
+	}
+	globalStats.Num = len(histIDs)
+	globalStats.Count = 0
+	globalStats.Hg = make([]*statistics.Histogram, globalStats.Num)
+	globalStats.Cms = make([]*statistics.CMSketch, globalStats.Num)
+	globalStats.TopN = make([]*statistics.TopN, globalStats.Num)
+	globalStats.Fms = make([]*statistics.FMSketch, globalStats.Num)
+
+	// The first dimension of each slice is the number of column or index stats in the globalStats.
+	// The second dimension of each slice is the number of partition tables.
+	// Because all topN and histograms need to be collected before they can be merged.
+	// So we should store all of the partition-level stats first, and merge them together.
+	allHg := make([][]*statistics.Histogram, globalStats.Num)
+	allCms := make([][]*statistics.CMSketch, globalStats.Num)
+	allTopN := make([][]*statistics.TopN, globalStats.Num)
+	allFms := make([][]*statistics.FMSketch, globalStats.Num)
+	for i := 0; i < globalStats.Num; i++ {
+		allHg[i] = make([]*statistics.Histogram, 0, partitionNum)
+		allCms[i] = make([]*statistics.CMSketch, 0, partitionNum)
+		allTopN[i] = make([]*statistics.TopN, 0, partitionNum)
+		allFms[i] = make([]*statistics.FMSketch, 0, partitionNum)
+	}
+
+	skipMissingPartitionStats := sc.GetSessionVars().SkipMissingPartitionStats
+	if sc.GetSessionVars().InRestrictedSQL {
+		// For AutoAnalyze and HandleDDLEvent(ActionDropTablePartition), we need to use @@global.tidb_skip_missing_partition_stats
+		val, err1 := sc.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBSkipMissingPartitionStats)
+		if err1 != nil {
+			logutil.BgLogger().Error("loading tidb_skip_missing_partition_stats failed", zap.Error(err1))
+			err = err1
+			return
+		}
+		skipMissingPartitionStats = variable.TiDBOptOn(val)
+	}
+	for _, def := range globalTableInfo.Partition.Definitions {
+		partitionID := def.ID
+		partitionTable, ok := getTableByPhysicalIDFn(is, partitionID)
+		if !ok {
+			err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", partitionID)
+			return
+		}
+		tableInfo := partitionTable.Meta()
+		var partitionStats *statistics.Table
+		if allPartitionStats != nil {
+			partitionStats, ok = allPartitionStats[partitionID]
+		}
+		// If pre-load partition stats isn't provided, then we load partition stats directly and set it into allPartitionStats
+		if allPartitionStats == nil || partitionStats == nil || !ok {
+			var err1 error
+			partitionStats, err1 = loadTablePartitionStatsFn(tableInfo, &def)
+			if err1 != nil {
+				if skipMissingPartitionStats && types.ErrPartitionStatsMissing.Equal(err1) { // inspect the load error; the named result err is still unset here
+					globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, fmt.Sprintf("partition `%s`", def.Name.L))
+					continue
+				}
+				err = err1
+				return
+			}
+			if allPartitionStats == nil {
+				allPartitionStats = make(map[int64]*statistics.Table)
+			}
+			allPartitionStats[partitionID] = partitionStats
+		}
+		for i := 0; i < globalStats.Num; i++ {
+			hg, cms, topN, fms, analyzed := partitionStats.GetStatsInfo(histIDs[i], isIndex == 1)
+			skipPartition := false
+			if !analyzed {
+				var missingPart string
+				if isIndex == 0 {
+					missingPart = fmt.Sprintf("partition `%s` column `%s`", def.Name.L, tableInfo.FindColumnNameByID(histIDs[i]))
+				} else {
+					missingPart = fmt.Sprintf("partition `%s` index `%s`", def.Name.L, tableInfo.FindIndexNameByID(histIDs[i]))
+				}
+				if !skipMissingPartitionStats {
+					err = types.ErrPartitionStatsMissing.GenWithStackByArgs(fmt.Sprintf("table `%s` %s", tableInfo.Name.L, missingPart))
+					return
+				}
+				globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, missingPart)
+				skipPartition = true
+			}
+			// partition stats is not empty but column stats(hist, topn) is missing
+			if partitionStats.RealtimeCount > 0 && (hg == nil || hg.TotalRowCount() <= 0) && (topN == nil || topN.TotalCount() <= 0) {
+				var missingPart string
+				if isIndex == 0 {
+					missingPart = fmt.Sprintf("partition `%s` column `%s`", def.Name.L, tableInfo.FindColumnNameByID(histIDs[i]))
+				} else {
+					missingPart = fmt.Sprintf("partition `%s` index `%s`", def.Name.L, tableInfo.FindIndexNameByID(histIDs[i]))
+				}
+				if !skipMissingPartitionStats {
+					err = types.ErrPartitionColumnStatsMissing.GenWithStackByArgs(fmt.Sprintf("table `%s` %s", tableInfo.Name.L, missingPart))
+					return
+				}
+				globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, missingPart+" hist and topn")
+				skipPartition = true
+			}
+			if i == 0 {
+				// In a partition, we will only update globalStats.Count once
+				globalStats.Count += partitionStats.RealtimeCount
+				globalStats.ModifyCount += partitionStats.ModifyCount
+			}
+			if !skipPartition {
+				allHg[i] = append(allHg[i], hg)
+				allCms[i] = append(allCms[i], cms)
+				allTopN[i] = append(allTopN[i], topN)
+				allFms[i] = append(allFms[i], fms)
+			}
+		}
+	}
+
+	// After collecting all of the statistics from the partition-level stats,
+	// we should merge them together.
+	for i := 0; i < globalStats.Num; i++ {
+		if len(allHg[i]) == 0 {
+			// If all partitions have no stats, we skip merging global stats because it may not handle the case `len(allHg[i]) == 0`
+			// correctly. It can avoid unexpected behaviors such as nil pointer panic.
+			continue
+		}
+		// Merge CMSketch
+		globalStats.Cms[i] = allCms[i][0].Copy()
+		for j := 1; j < len(allCms[i]); j++ {
+			err = globalStats.Cms[i].MergeCMSketch(allCms[i][j])
+			if err != nil {
+				return
+			}
+		}
+
+		// Merge topN. We need to merge TopN before merging the histogram.
+		// Because after merging TopN, some numbers will be left.
+		// These remaining topN numbers will be used as a separate bucket for later histogram merging.
+		var popedTopN []statistics.TopNMeta
+		wrapper := NewStatsWrapper(allHg[i], allTopN[i])
+		globalStats.TopN[i], popedTopN, allHg[i], err = mergeGlobalStatsTopN(g.gpool, sc, wrapper, sc.GetSessionVars().StmtCtx.TimeZone, sc.GetSessionVars().AnalyzeVersion, uint32(opts[ast.AnalyzeOptNumTopN]), isIndex == 1)
+		if err != nil {
+			return
+		}
+
+		// Merge histogram
+		globalStats.Hg[i], err = statistics.MergePartitionHist2GlobalHist(sc.GetSessionVars().StmtCtx, allHg[i], popedTopN, int64(opts[ast.AnalyzeOptNumBuckets]), isIndex == 1)
+		if err != nil {
+			return
+		}
+
+		// NOTICE: after merging bucket NDVs have the trend to be underestimated, so for safe we don't use them.
+		for j := range globalStats.Hg[i].Buckets {
+			globalStats.Hg[i].Buckets[j].NDV = 0
+		}
+
+		// Update NDV of global-level stats
+		globalStats.Fms[i] = allFms[i][0].Copy()
+		for j := 1; j < len(allFms[i]); j++ {
+			globalStats.Fms[i].MergeFMSketch(allFms[i][j])
+		}
+
+		// update the NDV, capped by the merged row count
+		globalStatsNDV := globalStats.Fms[i].NDV()
+		if globalStatsNDV > globalStats.Count {
+			globalStatsNDV = globalStats.Count
+		}
+		globalStats.Hg[i].NDV = globalStatsNDV
+	}
+	return
+}
+
+// MergePartitionStats2GlobalStatsByTableID merges the partition-level stats to global-level stats based on the tableID.
+func (g *GlobalStatusHandler) MergePartitionStats2GlobalStatsByTableID(sc sessionctx.Context,
+	opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema,
+	physicalID int64, isIndex int, histIDs []int64,
+	tablePartitionStats map[int64]*statistics.Table, getTableByPhysicalIDFn getTableByPhysicalIDFunc, loadTablePartitionStatsFn loadTablePartitionStatsFunc) (globalStats *GlobalStats, err error) {
+	// get the partition table IDs
+	globalTable, ok := getTableByPhysicalIDFn(is, physicalID)
+	if !ok {
+		err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", physicalID)
+		return
+	}
+	globalTableInfo := globalTable.Meta()
+	globalStats, err = g.MergePartitionStats2GlobalStats(sc, opts, is, globalTableInfo, isIndex, histIDs, tablePartitionStats, getTableByPhysicalIDFn, loadTablePartitionStatsFn)
+	if err != nil {
+		return
+	}
+	if len(globalStats.MissingPartitionStats) > 0 {
+		var item string
+		if isIndex == 0 {
+			item = "columns"
+		} else {
+			item = "index"
+			if len(histIDs) > 0 {
+				item += " " + globalTableInfo.FindIndexNameByID(histIDs[0])
+			}
+		}
+		logutil.BgLogger().Warn("missing partition stats when merging global stats", zap.String("table", globalTableInfo.Name.L),
+			zap.String("item", item), zap.Strings("missing", globalStats.MissingPartitionStats))
+	}
+	return
+}
+
+// Close closes the GlobalStatusHandler.
+func (g *GlobalStatusHandler) Close() { + g.gpool.Close() +} diff --git a/statistics/merge_worker.go b/statistics/handle/globalstats/merge_worker.go similarity index 83% rename from statistics/merge_worker.go rename to statistics/handle/globalstats/merge_worker.go index 370f1efa9f504..3edcb860a3465 100644 --- a/statistics/merge_worker.go +++ b/statistics/handle/globalstats/merge_worker.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package statistics +package globalstats import ( "sync" @@ -20,17 +20,18 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/util/hack" ) // StatsWrapper wrapper stats type StatsWrapper struct { - AllHg []*Histogram - AllTopN []*TopN + AllHg []*statistics.Histogram + AllTopN []*statistics.TopN } // NewStatsWrapper returns wrapper -func NewStatsWrapper(hg []*Histogram, topN []*TopN) *StatsWrapper { +func NewStatsWrapper(hg []*statistics.Histogram, topN []*statistics.TopN) *StatsWrapper { return &StatsWrapper{ AllHg: hg, AllTopN: topN, @@ -80,8 +81,8 @@ func NewTopnStatsMergeTask(start, end int) *TopnStatsMergeTask { // TopnStatsMergeResponse indicates topn merge worker response type TopnStatsMergeResponse struct { Err error - TopN *TopN - PopedTopn []TopNMeta + TopN *statistics.TopN + PopedTopn []statistics.TopNMeta } // Run runs topn merge like statistics.MergePartTopN2GlobalTopN @@ -95,7 +96,7 @@ func (worker *topnStatsMergeWorker) Run(timeZone *time.Location, isIndex bool, allTopNs := worker.statsWrapper.AllTopN allHists := worker.statsWrapper.AllHg resp := &TopnStatsMergeResponse{} - if checkEmptyTopNs(checkTopNs) { + if statistics.CheckEmptyTopNs(checkTopNs) { worker.respCh <- resp return } @@ -104,11 +105,11 @@ func (worker *topnStatsMergeWorker) Run(timeZone *time.Location, isIndex bool, counter := make(map[hack.MutableString]float64) // datumMap is used to store the mapping from the string 
type to datum type. // The datum is used to find the value in the histogram. - datumMap := newDatumMapCache() + datumMap := statistics.NewDatumMapCache() for i, topN := range checkTopNs { if atomic.LoadUint32(worker.killed) == 1 { - resp.Err = errors.Trace(ErrQueryInterrupted) + resp.Err = errors.Trace(statistics.ErrQueryInterrupted) worker.respCh <- resp return } @@ -128,11 +129,11 @@ func (worker *topnStatsMergeWorker) Run(timeZone *time.Location, isIndex bool, // 2. If the topN doesn't contain the value corresponding to encodedVal. We should check the histogram. for j := 0; j < partNum; j++ { if atomic.LoadUint32(worker.killed) == 1 { - resp.Err = errors.Trace(ErrQueryInterrupted) + resp.Err = errors.Trace(statistics.ErrQueryInterrupted) worker.respCh <- resp return } - if (j == i && version >= 2) || allTopNs[j].findTopN(val.Encoded) != -1 { + if (j == i && version >= 2) || allTopNs[j].FindTopN(val.Encoded) != -1 { continue } // Get the encodedVal from the hists[j] @@ -152,7 +153,7 @@ func (worker *topnStatsMergeWorker) Run(timeZone *time.Location, isIndex bool, counter[encodedVal] += count // Remove the value corresponding to encodedVal from the histogram. 
worker.shardMutex[j].Lock() - worker.statsWrapper.AllHg[j].BinarySearchRemoveVal(TopNMeta{Encoded: datum.GetBytes(), Count: uint64(count)}) + worker.statsWrapper.AllHg[j].BinarySearchRemoveVal(statistics.TopNMeta{Encoded: datum.GetBytes(), Count: uint64(count)}) worker.shardMutex[j].Unlock() } } @@ -163,12 +164,12 @@ func (worker *topnStatsMergeWorker) Run(timeZone *time.Location, isIndex bool, worker.respCh <- resp continue } - sorted := make([]TopNMeta, 0, numTop) + sorted := make([]statistics.TopNMeta, 0, numTop) for value, cnt := range counter { data := hack.Slice(string(value)) - sorted = append(sorted, TopNMeta{Encoded: data, Count: uint64(cnt)}) + sorted = append(sorted, statistics.TopNMeta{Encoded: data, Count: uint64(cnt)}) } - globalTopN, leftTopN := GetMergedTopNFromSortedSlice(sorted, n) + globalTopN, leftTopN := statistics.GetMergedTopNFromSortedSlice(sorted, n) resp.TopN = globalTopN resp.PopedTopn = leftTopN worker.respCh <- resp diff --git a/statistics/handle/globalstats/topn.go b/statistics/handle/globalstats/topn.go new file mode 100644 index 0000000000000..cab648e31be39 --- /dev/null +++ b/statistics/handle/globalstats/topn.go @@ -0,0 +1,115 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package globalstats + +import ( + "strings" + "sync" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/statistics" + "github.com/tiancaiamao/gp" +) + +func mergeGlobalStatsTopN(gp *gp.Pool, sc sessionctx.Context, wrapper *StatsWrapper, + timeZone *time.Location, version int, n uint32, isIndex bool) (*statistics.TopN, + []statistics.TopNMeta, []*statistics.Histogram, error) { + mergeConcurrency := sc.GetSessionVars().AnalyzePartitionMergeConcurrency + killed := &sc.GetSessionVars().Killed + // use the original method when the merge concurrency is less than 2 + if mergeConcurrency < 2 { + return statistics.MergePartTopN2GlobalTopN(timeZone, version, wrapper.AllTopN, n, wrapper.AllHg, isIndex, killed) + } + batchSize := len(wrapper.AllTopN) / mergeConcurrency + if batchSize < 1 { + batchSize = 1 + } else if batchSize > MaxPartitionMergeBatchSize { + batchSize = MaxPartitionMergeBatchSize + } + return MergeGlobalStatsTopNByConcurrency(gp, mergeConcurrency, batchSize, wrapper, timeZone, version, n, isIndex, killed) +} + +// MergeGlobalStatsTopNByConcurrency merges the partition TopNs concurrently. +// To merge the global stats TopN concurrently, the partition TopNs are split into batches and each batch is handled by a worker.
+// mergeConcurrency is used to control the total concurrency of the running workers, and mergeBatchSize is used to control +// the number of partitions that each worker handles per task. +func MergeGlobalStatsTopNByConcurrency(gp *gp.Pool, mergeConcurrency, mergeBatchSize int, wrapper *StatsWrapper, + timeZone *time.Location, version int, n uint32, isIndex bool, killed *uint32) (*statistics.TopN, + []statistics.TopNMeta, []*statistics.Histogram, error) { + if len(wrapper.AllTopN) < mergeConcurrency { + mergeConcurrency = len(wrapper.AllTopN) + } + tasks := make([]*TopnStatsMergeTask, 0) + for start := 0; start < len(wrapper.AllTopN); { + end := start + mergeBatchSize + if end > len(wrapper.AllTopN) { + end = len(wrapper.AllTopN) + } + task := NewTopnStatsMergeTask(start, end) + tasks = append(tasks, task) + start = end + } + var wg sync.WaitGroup + taskNum := len(tasks) + taskCh := make(chan *TopnStatsMergeTask, taskNum) + respCh := make(chan *TopnStatsMergeResponse, taskNum) + for i := 0; i < mergeConcurrency; i++ { + worker := NewTopnStatsMergeWorker(taskCh, respCh, wrapper, killed) + wg.Add(1) + gp.Go(func() { + defer wg.Done() + worker.Run(timeZone, isIndex, n, version) + }) + } + for _, task := range tasks { + taskCh <- task + } + close(taskCh) + wg.Wait() + close(respCh) + resps := make([]*TopnStatsMergeResponse, 0) + + // handle Error + hasErr := false + errMsg := make([]string, 0) + for resp := range respCh { + if resp.Err != nil { + hasErr = true + errMsg = append(errMsg, resp.Err.Error()) + } + resps = append(resps, resp) + } + if hasErr { + return nil, nil, nil, errors.New(strings.Join(errMsg, ",")) + } + + // fetch the response from each worker and merge them into global topn stats + sorted := make([]statistics.TopNMeta, 0, mergeConcurrency) + leftTopn := make([]statistics.TopNMeta, 0) + for _, resp := range resps { + if resp.TopN != nil { + sorted = append(sorted, resp.TopN.TopN...) + } + leftTopn = append(leftTopn, resp.PopedTopn...)
+ } + + globalTopN, popedTopn := statistics.GetMergedTopNFromSortedSlice(sorted, n) + + result := append(leftTopn, popedTopn...) + statistics.SortTopnMeta(result) + return globalTopN, result, wrapper.AllHg, nil +} diff --git a/statistics/cmsketch_bench_test.go b/statistics/handle/globalstats/topn_bench_test.go similarity index 91% rename from statistics/cmsketch_bench_test.go rename to statistics/handle/globalstats/topn_bench_test.go index 9c6f2fdda6e87..0269d260db680 100644 --- a/statistics/cmsketch_bench_test.go +++ b/statistics/handle/globalstats/topn_bench_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package statistics_test +package globalstats import ( "fmt" @@ -22,7 +22,6 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/statistics" - "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" @@ -30,7 +29,7 @@ import ( "github.com/tiancaiamao/gp" ) -// cmd: go test -run=^$ -bench=BenchmarkMergePartTopN2GlobalTopNWithHists -benchmem github.com/pingcap/tidb/statistics +// cmd: go test -run=^$ -bench=BenchmarkMergePartTopN2GlobalTopNWithHists -benchmem github.com/pingcap/tidb/statistics/handle/globalstats func benchmarkMergePartTopN2GlobalTopNWithHists(partitions int, b *testing.B) { loc := time.UTC sc := &stmtctx.StatementContext{TimeZone: loc} @@ -81,7 +80,7 @@ func benchmarkMergePartTopN2GlobalTopNWithHists(partitions int, b *testing.B) { } } -// cmd: go test -run=^$ -bench=BenchmarkMergeGlobalStatsTopNByConcurrencyWithHists -benchmem github.com/pingcap/tidb/statistics +// cmd: go test -run=^$ -bench=BenchmarkMergeGlobalStatsTopNByConcurrencyWithHists -benchmem github.com/pingcap/tidb/statistics/handle/globalstats func benchmarkMergeGlobalStatsTopNByConcurrencyWithHists(partitions int, b *testing.B) { loc := 
time.UTC sc := &stmtctx.StatementContext{TimeZone: loc} @@ -124,20 +123,20 @@ func benchmarkMergeGlobalStatsTopNByConcurrencyWithHists(partitions int, b *test h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 40}) hists = append(hists, h) } - wrapper := statistics.NewStatsWrapper(hists, topNs) + wrapper := NewStatsWrapper(hists, topNs) const mergeConcurrency = 4 batchSize := len(wrapper.AllTopN) / mergeConcurrency if batchSize < 1 { batchSize = 1 - } else if batchSize > handle.MaxPartitionMergeBatchSize { - batchSize = handle.MaxPartitionMergeBatchSize + } else if batchSize > MaxPartitionMergeBatchSize { + batchSize = MaxPartitionMergeBatchSize } gpool := gp.New(mergeConcurrency, 5*time.Minute) defer gpool.Close() b.ResetTimer() for i := 0; i < b.N; i++ { // Benchmark merge 10 topN. - _, _, _, _ = handle.MergeGlobalStatsTopNByConcurrency(gpool, mergeConcurrency, batchSize, wrapper, loc, version, 10, false, &isKilled) + _, _, _, _ = MergeGlobalStatsTopNByConcurrency(gpool, mergeConcurrency, batchSize, wrapper, loc, version, 10, false, &isKilled) } } diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index 2c8df37316797..93346210379c3 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -39,6 +39,7 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/statistics/handle/cache" + "github.com/pingcap/tidb/statistics/handle/globalstats" handle_metrics "github.com/pingcap/tidb/statistics/handle/metrics" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" @@ -53,18 +54,8 @@ import ( "go.uber.org/zap" ) -const ( - // TiDBGlobalStats represents the global-stats for a partitioned table. - TiDBGlobalStats = "global" - // MaxPartitionMergeBatchSize indicates the max batch size for a worker to merge partition stats - MaxPartitionMergeBatchSize = 256 -) - // Handle can update stats info periodically. 
type Handle struct { - // this gpool is used to reuse goroutine in the mergeGlobalStatsTopN. - gpool *gp.Pool - pool sessionPool // initStatsCtx is the ctx only used for initStats @@ -98,6 +89,8 @@ type Handle struct { // statsUsage contains all the column stats usage information from collectors when we dump them to KV. statsUsage *statsUsage + globalstatushandler *globalstats.GlobalStatusHandler + // StatsLoad is used to load stats concurrently StatsLoad StatsLoad @@ -192,8 +185,9 @@ type sessionPool interface { // NewHandle creates a Handle for update stats. func NewHandle(ctx, initStatsCtx sessionctx.Context, lease time.Duration, pool sessionPool, tracker sessionctx.SysProcTracker, autoAnalyzeProcIDGetter func() uint64) (*Handle, error) { cfg := config.GetGlobalConfig() + gpool := gp.New(math.MaxInt16, time.Minute) handle := &Handle{ - gpool: gp.New(math.MaxInt16, time.Minute), + globalstatushandler: globalstats.NewGlobalStatusHandler(gpool), ddlEventCh: make(chan *ddlUtil.Event, 1000), listHead: NewSessionStatsCollector(), idxUsageListHead: &SessionIndexUsageCollector{mapper: make(indexUsageMap)}, @@ -341,52 +335,12 @@ func (h *Handle) UpdateSessionVar() error { return err } -// GlobalStats is used to store the statistics contained in the global-level stats -// which is generated by the merge of partition-level stats. -// It will both store the column stats and index stats. -// In the column statistics, the variable `num` is equal to the number of columns in the partition table. -// In the index statistics, the variable `num` is always equal to one. -type GlobalStats struct { - Hg []*statistics.Histogram - Cms []*statistics.CMSketch - TopN []*statistics.TopN - Fms []*statistics.FMSketch - MissingPartitionStats []string - Num int - Count int64 - ModifyCount int64 -} - // MergePartitionStats2GlobalStatsByTableID merge the partition-level stats to global-level stats based on the tableID. 
func (h *Handle) MergePartitionStats2GlobalStatsByTableID(sc sessionctx.Context, opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema, physicalID int64, isIndex int, histIDs []int64, - tablePartitionStats map[int64]*statistics.Table) (globalStats *GlobalStats, err error) { - // get the partition table IDs - globalTable, ok := h.getTableByPhysicalID(is, physicalID) - if !ok { - err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", physicalID) - return - } - globalTableInfo := globalTable.Meta() - globalStats, err = h.mergePartitionStats2GlobalStats(sc, opts, is, globalTableInfo, isIndex, histIDs, tablePartitionStats) - if err != nil { - return - } - if len(globalStats.MissingPartitionStats) > 0 { - var item string - if isIndex == 0 { - item = "columns" - } else { - item = "index" - if len(histIDs) > 0 { - item += " " + globalTableInfo.FindIndexNameByID(histIDs[0]) - } - } - logutil.BgLogger().Warn("missing partition stats when merging global stats", zap.String("table", globalTableInfo.Name.L), - zap.String("item", item), zap.Strings("missing", globalStats.MissingPartitionStats)) - } - return + tablePartitionStats map[int64]*statistics.Table) (globalStats *globalstats.GlobalStats, err error) { + return h.globalstatushandler.MergePartitionStats2GlobalStatsByTableID(sc, opts, is, physicalID, isIndex, histIDs, tablePartitionStats, h.getTableByPhysicalID, h.loadTablePartitionStats) } func (h *Handle) loadTablePartitionStats(tableInfo *model.TableInfo, partitionDef *model.PartitionDefinition) (*statistics.Table, error) { @@ -408,269 +362,8 @@ func (h *Handle) loadTablePartitionStats(tableInfo *model.TableInfo, partitionDe func (h *Handle) mergePartitionStats2GlobalStats(sc sessionctx.Context, opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema, globalTableInfo *model.TableInfo, isIndex int, histIDs []int64, - allPartitionStats map[int64]*statistics.Table) (globalStats *GlobalStats, err error) { - partitionNum 
:= len(globalTableInfo.Partition.Definitions) - - // initialized the globalStats - globalStats = new(GlobalStats) - if len(histIDs) == 0 { - for _, col := range globalTableInfo.Columns { - // The virtual generated column stats can not be merged to the global stats. - if col.IsGenerated() && !col.GeneratedStored { - continue - } - histIDs = append(histIDs, col.ID) - } - } - globalStats.Num = len(histIDs) - globalStats.Count = 0 - globalStats.Hg = make([]*statistics.Histogram, globalStats.Num) - globalStats.Cms = make([]*statistics.CMSketch, globalStats.Num) - globalStats.TopN = make([]*statistics.TopN, globalStats.Num) - globalStats.Fms = make([]*statistics.FMSketch, globalStats.Num) - - // The first dimension of slice is means the number of column or index stats in the globalStats. - // The second dimension of slice is means the number of partition tables. - // Because all topN and histograms need to be collected before they can be merged. - // So we should store all of the partition-level stats first, and merge them together. 
- allHg := make([][]*statistics.Histogram, globalStats.Num) - allCms := make([][]*statistics.CMSketch, globalStats.Num) - allTopN := make([][]*statistics.TopN, globalStats.Num) - allFms := make([][]*statistics.FMSketch, globalStats.Num) - for i := 0; i < globalStats.Num; i++ { - allHg[i] = make([]*statistics.Histogram, 0, partitionNum) - allCms[i] = make([]*statistics.CMSketch, 0, partitionNum) - allTopN[i] = make([]*statistics.TopN, 0, partitionNum) - allFms[i] = make([]*statistics.FMSketch, 0, partitionNum) - } - - skipMissingPartitionStats := sc.GetSessionVars().SkipMissingPartitionStats - if sc.GetSessionVars().InRestrictedSQL { - // For AutoAnalyze and HandleDDLEvent(ActionDropTablePartition), we need to use @@global.tidb_skip_missing_partition_stats - val, err1 := sc.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBAnalyzeSkipColumnTypes) - if err1 != nil { - logutil.BgLogger().Error("loading tidb_skip_missing_partition_stats failed", zap.Error(err1)) - err = err1 - return - } - skipMissingPartitionStats = variable.TiDBOptOn(val) - } - for _, def := range globalTableInfo.Partition.Definitions { - partitionID := def.ID - partitionTable, ok := h.getTableByPhysicalID(is, partitionID) - if !ok { - err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", partitionID) - return - } - tableInfo := partitionTable.Meta() - var partitionStats *statistics.Table - if allPartitionStats != nil { - partitionStats, ok = allPartitionStats[partitionID] - } - // If pre-load partition stats isn't provided, then we load partition stats directly and set it into allPartitionStats - if allPartitionStats == nil || partitionStats == nil || !ok { - var err1 error - partitionStats, err1 = h.loadTablePartitionStats(tableInfo, &def) - if err1 != nil { - if skipMissingPartitionStats && types.ErrPartitionStatsMissing.Equal(err) { - globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, fmt.Sprintf("partition `%s`", 
def.Name.L)) - continue - } - err = err1 - return - } - if allPartitionStats == nil { - allPartitionStats = make(map[int64]*statistics.Table) - } - allPartitionStats[partitionID] = partitionStats - } - for i := 0; i < globalStats.Num; i++ { - hg, cms, topN, fms, analyzed := partitionStats.GetStatsInfo(histIDs[i], isIndex == 1) - skipPartition := false - if !analyzed { - var missingPart string - if isIndex == 0 { - missingPart = fmt.Sprintf("partition `%s` column `%s`", def.Name.L, tableInfo.FindColumnNameByID(histIDs[i])) - } else { - missingPart = fmt.Sprintf("partition `%s` index `%s`", def.Name.L, tableInfo.FindIndexNameByID(histIDs[i])) - } - if !skipMissingPartitionStats { - err = types.ErrPartitionStatsMissing.GenWithStackByArgs(fmt.Sprintf("table `%s` %s", tableInfo.Name.L, missingPart)) - return - } - globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, missingPart) - skipPartition = true - } - // partition stats is not empty but column stats(hist, topn) is missing - if partitionStats.RealtimeCount > 0 && (hg == nil || hg.TotalRowCount() <= 0) && (topN == nil || topN.TotalCount() <= 0) { - var missingPart string - if isIndex == 0 { - missingPart = fmt.Sprintf("partition `%s` column `%s`", def.Name.L, tableInfo.FindColumnNameByID(histIDs[i])) - } else { - missingPart = fmt.Sprintf("partition `%s` index `%s`", def.Name.L, tableInfo.FindIndexNameByID(histIDs[i])) - } - if !skipMissingPartitionStats { - err = types.ErrPartitionColumnStatsMissing.GenWithStackByArgs(fmt.Sprintf("table `%s` %s", tableInfo.Name.L, missingPart)) - return - } - globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, missingPart+" hist and topn") - skipPartition = true - } - if i == 0 { - // In a partition, we will only update globalStats.Count once - globalStats.Count += partitionStats.RealtimeCount - globalStats.ModifyCount += partitionStats.ModifyCount - } - if !skipPartition { - allHg[i] = append(allHg[i], hg) - allCms[i] = 
append(allCms[i], cms) - allTopN[i] = append(allTopN[i], topN) - allFms[i] = append(allFms[i], fms) - } - } - } - - // After collect all of the statistics from the partition-level stats, - // we should merge them together. - for i := 0; i < globalStats.Num; i++ { - if len(allHg[i]) == 0 { - // If all partitions have no stats, we skip merging global stats because it may not handle the case `len(allHg[i]) == 0` - // correctly. It can avoid unexpected behaviors such as nil pointer panic. - continue - } - // Merge CMSketch - globalStats.Cms[i] = allCms[i][0].Copy() - for j := 1; j < len(allCms[i]); j++ { - err = globalStats.Cms[i].MergeCMSketch(allCms[i][j]) - if err != nil { - return - } - } - - // Merge topN. We need to merge TopN before merging the histogram. - // Because after merging TopN, some numbers will be left. - // These remaining topN numbers will be used as a separate bucket for later histogram merging. - var popedTopN []statistics.TopNMeta - wrapper := statistics.NewStatsWrapper(allHg[i], allTopN[i]) - globalStats.TopN[i], popedTopN, allHg[i], err = mergeGlobalStatsTopN(h.gpool, sc, wrapper, sc.GetSessionVars().StmtCtx.TimeZone, sc.GetSessionVars().AnalyzeVersion, uint32(opts[ast.AnalyzeOptNumTopN]), isIndex == 1) - if err != nil { - return - } - - // Merge histogram - globalStats.Hg[i], err = statistics.MergePartitionHist2GlobalHist(sc.GetSessionVars().StmtCtx, allHg[i], popedTopN, int64(opts[ast.AnalyzeOptNumBuckets]), isIndex == 1) - if err != nil { - return - } - - // NOTICE: after merging bucket NDVs have the trend to be underestimated, so for safe we don't use them. 
- for j := range globalStats.Hg[i].Buckets { - globalStats.Hg[i].Buckets[j].NDV = 0 - } - - // Update NDV of global-level stats - globalStats.Fms[i] = allFms[i][0].Copy() - for j := 1; j < len(allFms[i]); j++ { - globalStats.Fms[i].MergeFMSketch(allFms[i][j]) - } - - // update the NDV - globalStatsNDV := globalStats.Fms[i].NDV() - if globalStatsNDV > globalStats.Count { - globalStatsNDV = globalStats.Count - } - globalStats.Hg[i].NDV = globalStatsNDV - } - return -} - -func mergeGlobalStatsTopN(gp *gp.Pool, sc sessionctx.Context, wrapper *statistics.StatsWrapper, - timeZone *time.Location, version int, n uint32, isIndex bool) (*statistics.TopN, - []statistics.TopNMeta, []*statistics.Histogram, error) { - mergeConcurrency := sc.GetSessionVars().AnalyzePartitionMergeConcurrency - killed := &sc.GetSessionVars().Killed - // use original method if concurrency equals 1 or for version1 - if mergeConcurrency < 2 { - return statistics.MergePartTopN2GlobalTopN(timeZone, version, wrapper.AllTopN, n, wrapper.AllHg, isIndex, killed) - } - batchSize := len(wrapper.AllTopN) / mergeConcurrency - if batchSize < 1 { - batchSize = 1 - } else if batchSize > MaxPartitionMergeBatchSize { - batchSize = MaxPartitionMergeBatchSize - } - return MergeGlobalStatsTopNByConcurrency(gp, mergeConcurrency, batchSize, wrapper, timeZone, version, n, isIndex, killed) -} - -// MergeGlobalStatsTopNByConcurrency merge partition topN by concurrency -// To merge global stats topn by concurrency, we will separate the partition topn in concurrency part and deal it with different worker. 
-// mergeConcurrency is used to control the total concurrency of the running worker, and mergeBatchSize is sued to control -// the partition size for each worker to solve it -func MergeGlobalStatsTopNByConcurrency(gp *gp.Pool, mergeConcurrency, mergeBatchSize int, wrapper *statistics.StatsWrapper, - timeZone *time.Location, version int, n uint32, isIndex bool, killed *uint32) (*statistics.TopN, - []statistics.TopNMeta, []*statistics.Histogram, error) { - if len(wrapper.AllTopN) < mergeConcurrency { - mergeConcurrency = len(wrapper.AllTopN) - } - tasks := make([]*statistics.TopnStatsMergeTask, 0) - for start := 0; start < len(wrapper.AllTopN); { - end := start + mergeBatchSize - if end > len(wrapper.AllTopN) { - end = len(wrapper.AllTopN) - } - task := statistics.NewTopnStatsMergeTask(start, end) - tasks = append(tasks, task) - start = end - } - var wg sync.WaitGroup - taskNum := len(tasks) - taskCh := make(chan *statistics.TopnStatsMergeTask, taskNum) - respCh := make(chan *statistics.TopnStatsMergeResponse, taskNum) - for i := 0; i < mergeConcurrency; i++ { - worker := statistics.NewTopnStatsMergeWorker(taskCh, respCh, wrapper, killed) - wg.Add(1) - gp.Go(func() { - defer wg.Done() - worker.Run(timeZone, isIndex, n, version) - }) - } - for _, task := range tasks { - taskCh <- task - } - close(taskCh) - wg.Wait() - close(respCh) - resps := make([]*statistics.TopnStatsMergeResponse, 0) - - // handle Error - hasErr := false - errMsg := make([]string, 0) - for resp := range respCh { - if resp.Err != nil { - hasErr = true - errMsg = append(errMsg, resp.Err.Error()) - } - resps = append(resps, resp) - } - if hasErr { - return nil, nil, nil, errors.New(strings.Join(errMsg, ",")) - } - - // fetch the response from each worker and merge them into global topn stats - sorted := make([]statistics.TopNMeta, 0, mergeConcurrency) - leftTopn := make([]statistics.TopNMeta, 0) - for _, resp := range resps { - if resp.TopN != nil { - sorted = append(sorted, resp.TopN.TopN...) 
- } - leftTopn = append(leftTopn, resp.PopedTopn...) - } - - globalTopN, popedTopn := statistics.GetMergedTopNFromSortedSlice(sorted, n) - - result := append(leftTopn, popedTopn...) - statistics.SortTopnMeta(result) - return globalTopN, result, wrapper.AllHg, nil + allPartitionStats map[int64]*statistics.Table) (globalStats *globalstats.GlobalStats, err error) { + return h.globalstatushandler.MergePartitionStats2GlobalStats(sc, opts, is, globalTableInfo, isIndex, histIDs, allPartitionStats, h.getTableByPhysicalID, h.loadTablePartitionStats) } func (h *Handle) getTableByPhysicalID(is infoschema.InfoSchema, physicalID int64) (table.Table, bool) { @@ -2033,6 +1726,6 @@ func (h *Handle) SetStatsCacheCapacity(c int64) { // Close stops the background func (h *Handle) Close() { - h.gpool.Close() + h.globalstatushandler.Close() h.statsCache.Load().Close() }