From 74bb4511afeef4761f547fde2b337f630675233a Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Wed, 15 May 2024 23:22:12 +0800 Subject: [PATCH 1/4] This is an automated cherry-pick of #53298 Signed-off-by: ti-chi-bot --- ddl/metadatalocktest/BUILD.bazel | 9 + pkg/statistics/handle/bootstrap.go | 761 ++++++++++++++++++ .../handle/handletest/statstest/stats_test.go | 395 +++++++++ 3 files changed, 1165 insertions(+) create mode 100644 pkg/statistics/handle/bootstrap.go create mode 100644 pkg/statistics/handle/handletest/statstest/stats_test.go diff --git a/ddl/metadatalocktest/BUILD.bazel b/ddl/metadatalocktest/BUILD.bazel index c1287f1eb14db..6f867a8570e54 100644 --- a/ddl/metadatalocktest/BUILD.bazel +++ b/ddl/metadatalocktest/BUILD.bazel @@ -10,12 +10,21 @@ go_test( flaky = True, shard_count = 34, deps = [ +<<<<<<< HEAD:ddl/metadatalocktest/BUILD.bazel "//config", "//ddl", "//errno", "//server", "//testkit", "//testkit/testsetup", +======= + "//pkg/config", + "//pkg/parser/model", + "//pkg/parser/mysql", + "//pkg/statistics/handle/internal", + "//pkg/testkit", + "//pkg/testkit/testsetup", +>>>>>>> 687f39c12f8 (statistics: fix wrong behavior for primary key' non-lite init stats (#53298)):pkg/statistics/handle/handletest/statstest/BUILD.bazel "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", "@org_uber_go_goleak//:goleak", diff --git a/pkg/statistics/handle/bootstrap.go b/pkg/statistics/handle/bootstrap.go new file mode 100644 index 0000000000000..d238c620fec8d --- /dev/null +++ b/pkg/statistics/handle/bootstrap.go @@ -0,0 +1,761 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package handle + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/infoschema" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/parser/terror" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/statistics" + "github.com/pingcap/tidb/pkg/statistics/handle/cache" + "github.com/pingcap/tidb/pkg/statistics/handle/initstats" + statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types" + "github.com/pingcap/tidb/pkg/statistics/handle/util" + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util/chunk" + "github.com/pingcap/tidb/pkg/util/logutil" + "go.uber.org/zap" +) + +// initStatsStep is the step to load stats by paging. +const initStatsStep = int64(500) + +var maxTidRecord MaxTidRecord + +// MaxTidRecord is to record the max tid. 
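+// The recorded max table ID is later used as the upper bound when the concurrent
+// loaders split init-stats work into [StartTid, EndTid) paging tasks of size initStatsStep.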
+type MaxTidRecord struct { + mu sync.Mutex + tid atomic.Int64 +} + +func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) { + var physicalID int64 + for row := iter.Begin(); row != iter.End(); row = iter.Next() { + physicalID = row.GetInt64(1) + // The table is read-only. Please do not modify it. + table, ok := h.TableInfoByID(is, physicalID) + if !ok { + logutil.BgLogger().Debug("unknown physical ID in stats meta table, maybe it has been dropped", zap.Int64("ID", physicalID)) + continue + } + tableInfo := table.Meta() + newHistColl := statistics.HistColl{ + PhysicalID: physicalID, + HavePhysicalID: true, + RealtimeCount: row.GetInt64(3), + ModifyCount: row.GetInt64(2), + Columns: make(map[int64]*statistics.Column, 4), + Indices: make(map[int64]*statistics.Index, 4), + } + tbl := &statistics.Table{ + HistColl: newHistColl, + Version: row.GetUint64(0), + ColAndIdxExistenceMap: statistics.NewColAndIndexExistenceMap(len(tableInfo.Columns), len(tableInfo.Indices)), + IsPkIsHandle: tableInfo.PKIsHandle, + } + cache.Put(physicalID, tbl) // put this table again since it is updated + } + maxTidRecord.mu.Lock() + defer maxTidRecord.mu.Unlock() + if maxTidRecord.tid.Load() < physicalID { + maxTidRecord.tid.Store(physicalID) + } +} + +func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (statstypes.StatsCache, error) { + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + sql := "select HIGH_PRIORITY version, table_id, modify_count, count from mysql.stats_meta" + rc, err := util.Exec(h.initStatsCtx, sql) + if err != nil { + return nil, errors.Trace(err) + } + defer terror.Call(rc.Close) + tables, err := cache.NewStatsCacheImpl(h) + if err != nil { + return nil, err + } + req := rc.NewChunk(nil) + iter := chunk.NewIterator4Chunk(req) + for { + err := rc.Next(ctx, req) + if err != nil { + return nil, errors.Trace(err) + } + if req.NumRows() == 0 { + break + } + h.initStatsMeta4Chunk(is, tables, iter) + } + return tables, nil +} + +func (h *Handle) initStatsHistograms4ChunkLite(is infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) { + var table *statistics.Table + for row := iter.Begin(); row != iter.End(); row = iter.Next() { + tblID := row.GetInt64(0) + if table == nil || table.PhysicalID != tblID { + if table != nil { + cache.Put(table.PhysicalID, table) // put this table in the cache because all statstics of the table have been read. + } + var ok bool + table, ok = cache.Get(tblID) + if !ok { + continue + } + table = table.Copy() + } + isIndex := row.GetInt64(1) + id := row.GetInt64(2) + ndv := row.GetInt64(3) + nullCount := row.GetInt64(5) + statsVer := row.GetInt64(7) + tbl, _ := h.TableInfoByID(is, table.PhysicalID) + // All the objects in the table share the same stats version. + if statsVer != statistics.Version0 { + table.StatsVer = int(statsVer) + } + if isIndex > 0 { + var idxInfo *model.IndexInfo + for _, idx := range tbl.Meta().Indices { + if idx.ID == id { + idxInfo = idx + break + } + } + if idxInfo == nil { + continue + } + table.ColAndIdxExistenceMap.InsertIndex(idxInfo.ID, idxInfo, statsVer != statistics.Version0) + if statsVer != statistics.Version0 { + // The LastAnalyzeVersion is added by ALTER table so its value might be 0. 
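+ // Only a real ANALYZE (stats version > 0) advances the table's last analyze version.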
+ table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, row.GetUint64(4)) + } + } else { + var colInfo *model.ColumnInfo + for _, col := range tbl.Meta().Columns { + if col.ID == id { + colInfo = col + break + } + } + if colInfo == nil { + continue + } + table.ColAndIdxExistenceMap.InsertCol(colInfo.ID, colInfo, statsVer != statistics.Version0 || ndv > 0 || nullCount > 0) + if statsVer != statistics.Version0 { + // The LastAnalyzeVersion is added by ALTER table so its value might be 0. + table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, row.GetUint64(4)) + } + } + } + if table != nil { + cache.Put(table.PhysicalID, table) // put this table in the cache because all statstics of the table have been read. + } +} + +func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) { + var table *statistics.Table + for row := iter.Begin(); row != iter.End(); row = iter.Next() { + tblID, statsVer := row.GetInt64(0), row.GetInt64(8) + if table == nil || table.PhysicalID != tblID { + if table != nil { + cache.Put(table.PhysicalID, table) // put this table in the cache because all statstics of the table have been read. + } + var ok bool + table, ok = cache.Get(tblID) + if !ok { + continue + } + table = table.Copy() + } + // All the objects in the table share the same stats version. + if statsVer != statistics.Version0 { + table.StatsVer = int(statsVer) + } + id, ndv, nullCount, version, totColSize := row.GetInt64(2), row.GetInt64(3), row.GetInt64(5), row.GetUint64(4), row.GetInt64(7) + lastAnalyzePos := row.GetDatum(11, types.NewFieldType(mysql.TypeBlob)) + tbl, _ := h.TableInfoByID(is, table.PhysicalID) + if row.GetInt64(1) > 0 { + var idxInfo *model.IndexInfo + for _, idx := range tbl.Meta().Indices { + if idx.ID == id { + idxInfo = idx + break + } + } + if idxInfo == nil { + continue + } + cms, topN, err := statistics.DecodeCMSketchAndTopN(row.GetBytes(6), nil) + if err != nil { + cms = nil + terror.Log(errors.Trace(err)) + } + hist := statistics.NewHistogram(id, ndv, nullCount, version, types.NewFieldType(mysql.TypeBlob), chunk.InitialCapacity, 0) + index := &statistics.Index{ + Histogram: *hist, + CMSketch: cms, + TopN: topN, + Info: idxInfo, + StatsVer: statsVer, + Flag: row.GetInt64(10), + PhysicalID: tblID, + } + if statsVer != statistics.Version0 { + index.StatsLoadedStatus = statistics.NewStatsFullLoadStatus() + // The LastAnalyzeVersion is added by ALTER table so its value might be 0. 
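+ // Same rule as the lite path: keep the newest analyze version seen for this table.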
+ table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, version) + } + lastAnalyzePos.Copy(&index.LastAnalyzePos) + table.Indices[hist.ID] = index + table.ColAndIdxExistenceMap.InsertIndex(idxInfo.ID, idxInfo, statsVer != statistics.Version0) + } else { + var colInfo *model.ColumnInfo + for _, col := range tbl.Meta().Columns { + if col.ID == id { + colInfo = col + break + } + } + if colInfo == nil { + continue + } + hist := statistics.NewHistogram(id, ndv, nullCount, version, &colInfo.FieldType, 0, totColSize) + hist.Correlation = row.GetFloat64(9) + col := &statistics.Column{ + Histogram: *hist, + PhysicalID: table.PhysicalID, + Info: colInfo, + IsHandle: tbl.Meta().PKIsHandle && mysql.HasPriKeyFlag(colInfo.GetFlag()), + Flag: row.GetInt64(10), + StatsVer: statsVer, + } + lastAnalyzePos.Copy(&col.LastAnalyzePos) + table.Columns[hist.ID] = col + table.ColAndIdxExistenceMap.InsertCol(colInfo.ID, colInfo, statsVer != statistics.Version0 || ndv > 0 || nullCount > 0) + if statsVer != statistics.Version0 { + // The LastAnalyzeVersion is added by ALTER table so its value might be 0. + table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, version) + } + } + } + if table != nil { + cache.Put(table.PhysicalID, table) // put this table in the cache because all statstics of the table have been read. + } +} + +func (h *Handle) initStatsHistogramsLite(is infoschema.InfoSchema, cache statstypes.StatsCache) error { + sql := "select /*+ ORDER_INDEX(mysql.stats_histograms,tbl)*/ HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms order by table_id" + rc, err := util.Exec(h.initStatsCtx, sql) + if err != nil { + return errors.Trace(err) + } + defer terror.Call(rc.Close) + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + req := rc.NewChunk(nil) + iter := chunk.NewIterator4Chunk(req) + for { + err := rc.Next(ctx, req) + if err != nil { + return errors.Trace(err) + } + if req.NumRows() == 0 { + break + } + h.initStatsHistograms4ChunkLite(is, cache, iter) + } + return nil +} + +func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache statstypes.StatsCache) error { + sql := "select /*+ ORDER_INDEX(mysql.stats_histograms,tbl)*/ HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms order by table_id" + rc, err := util.Exec(h.initStatsCtx, sql) + if err != nil { + return errors.Trace(err) + } + defer terror.Call(rc.Close) + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + req := rc.NewChunk(nil) + iter := chunk.NewIterator4Chunk(req) + for { + err := rc.Next(ctx, req) + if err != nil { + return errors.Trace(err) + } + if req.NumRows() == 0 { + break + } + h.initStatsHistograms4Chunk(is, cache, iter) + } + return nil +} + +func (h *Handle) initStatsHistogramsByPaging(is infoschema.InfoSchema, cache statstypes.StatsCache, task initstats.Task) error { + se, err := h.Pool.SPool().Get() + if err != nil { + return err + } + defer func() { + if err == nil { // only recycle when no error + h.Pool.SPool().Put(se) + } + }() + sctx := se.(sessionctx.Context) + sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms where table_id >= %? and table_id < %?" 
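+ // The two %? placeholders are bound to the task's half-open range [StartTid, EndTid),
+ // so each worker scans a disjoint slice of mysql.stats_histograms.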
+ rc, err := util.Exec(sctx, sql, task.StartTid, task.EndTid) + if err != nil { + return errors.Trace(err) + } + defer terror.Call(rc.Close) + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + req := rc.NewChunk(nil) + iter := chunk.NewIterator4Chunk(req) + for { + err := rc.Next(ctx, req) + if err != nil { + return errors.Trace(err) + } + if req.NumRows() == 0 { + break + } + h.initStatsHistograms4Chunk(is, cache, iter) + } + return nil +} + +func (h *Handle) initStatsHistogramsConcurrency(is infoschema.InfoSchema, cache statstypes.StatsCache) error { + var maxTid = maxTidRecord.tid.Load() + tid := int64(0) + ls := initstats.NewRangeWorker(func(task initstats.Task) error { + return h.initStatsHistogramsByPaging(is, cache, task) + }) + ls.LoadStats() + for tid <= maxTid { + ls.SendTask(initstats.Task{ + StartTid: tid, + EndTid: tid + initStatsStep, + }) + tid += initStatsStep + } + ls.Wait() + return nil +} + +func (*Handle) initStatsTopN4Chunk(cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) { + affectedIndexes := make(map[*statistics.Index]struct{}) + var table *statistics.Table + for row := iter.Begin(); row != iter.End(); row = iter.Next() { + tblID := row.GetInt64(0) + if table == nil || table.PhysicalID != tblID { + if table != nil { + cache.Put(table.PhysicalID, table) // put this table in the cache because all statstics of the table have been read. + } + var ok bool + table, ok = cache.Get(tblID) + if !ok { + continue + } + table = table.Copy() + } + idx, ok := table.Indices[row.GetInt64(1)] + if !ok || (idx.CMSketch == nil && idx.StatsVer <= statistics.Version1) { + continue + } + if idx.TopN == nil { + idx.TopN = statistics.NewTopN(32) + } + affectedIndexes[idx] = struct{}{} + data := make([]byte, len(row.GetBytes(2))) + copy(data, row.GetBytes(2)) + idx.TopN.AppendTopN(data, row.GetUint64(3)) + } + if table != nil { + cache.Put(table.PhysicalID, table) // put this table in the cache because all statstics of the table have been read. + } + for idx := range affectedIndexes { + idx.TopN.Sort() + } +} + +func (h *Handle) initStatsTopN(cache statstypes.StatsCache) error { + sql := "select /*+ ORDER_INDEX(mysql.stats_top_n,tbl)*/ HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1 order by table_id" + rc, err := util.Exec(h.initStatsCtx, sql) + if err != nil { + return errors.Trace(err) + } + defer terror.Call(rc.Close) + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + req := rc.NewChunk(nil) + iter := chunk.NewIterator4Chunk(req) + for { + err := rc.Next(ctx, req) + if err != nil { + return errors.Trace(err) + } + if req.NumRows() == 0 { + break + } + h.initStatsTopN4Chunk(cache, iter) + } + return nil +} + +func (h *Handle) initStatsTopNByPaging(cache statstypes.StatsCache, task initstats.Task) error { + se, err := h.Pool.SPool().Get() + if err != nil { + return err + } + defer func() { + if err == nil { // only recycle when no error + h.Pool.SPool().Put(se) + } + }() + sctx := se.(sessionctx.Context) + sql := "select HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1 and table_id >= %? and table_id < %? 
order by table_id" + rc, err := util.Exec(sctx, sql, task.StartTid, task.EndTid) + if err != nil { + return errors.Trace(err) + } + defer terror.Call(rc.Close) + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + req := rc.NewChunk(nil) + iter := chunk.NewIterator4Chunk(req) + for { + err := rc.Next(ctx, req) + if err != nil { + return errors.Trace(err) + } + if req.NumRows() == 0 { + break + } + h.initStatsTopN4Chunk(cache, iter) + } + return nil +} + +func (h *Handle) initStatsTopNConcurrency(cache statstypes.StatsCache) error { + var maxTid = maxTidRecord.tid.Load() + tid := int64(0) + ls := initstats.NewRangeWorker(func(task initstats.Task) error { + return h.initStatsTopNByPaging(cache, task) + }) + ls.LoadStats() + for tid <= maxTid { + ls.SendTask(initstats.Task{ + StartTid: tid, + EndTid: tid + initStatsStep, + }) + tid += initStatsStep + } + ls.Wait() + return nil +} + +func (*Handle) initStatsFMSketch4Chunk(cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) { + for row := iter.Begin(); row != iter.End(); row = iter.Next() { + table, ok := cache.Get(row.GetInt64(0)) + if !ok { + continue + } + fms, err := statistics.DecodeFMSketch(row.GetBytes(3)) + if err != nil { + fms = nil + terror.Log(errors.Trace(err)) + } + + isIndex := row.GetInt64(1) + id := row.GetInt64(2) + if isIndex == 1 { + if idxStats, ok := table.Indices[id]; ok { + idxStats.FMSketch = fms + } + } else { + if colStats, ok := table.Columns[id]; ok { + colStats.FMSketch = fms + } + } + cache.Put(table.PhysicalID, table) // put this table in the cache because all statstics of the table have been read. + } +} + +func (h *Handle) initStatsFMSketch(cache statstypes.StatsCache) error { + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + sql := "select HIGH_PRIORITY table_id, is_index, hist_id, value from mysql.stats_fm_sketch" + rc, err := util.Exec(h.initStatsCtx, sql) + if err != nil { + return errors.Trace(err) + } + defer terror.Call(rc.Close) + req := rc.NewChunk(nil) + iter := chunk.NewIterator4Chunk(req) + for { + err := rc.Next(ctx, req) + if err != nil { + return errors.Trace(err) + } + if req.NumRows() == 0 { + break + } + h.initStatsFMSketch4Chunk(cache, iter) + } + return nil +} + +func (*Handle) initStatsBuckets4Chunk(cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) { + var table *statistics.Table + for row := iter.Begin(); row != iter.End(); row = iter.Next() { + tableID, isIndex, histID := row.GetInt64(0), row.GetInt64(1), row.GetInt64(2) + if table == nil || table.PhysicalID != tableID { + if table != nil { + cache.Put(table.PhysicalID, table) // put this table in the cache because all statstics of the table have been read. 
+ } + var ok bool + table, ok = cache.Get(tableID) + if !ok { + continue + } + table = table.Copy() + } + var lower, upper types.Datum + var hist *statistics.Histogram + if isIndex > 0 { + index, ok := table.Indices[histID] + if !ok { + continue + } + hist = &index.Histogram + lower, upper = types.NewBytesDatum(row.GetBytes(5)), types.NewBytesDatum(row.GetBytes(6)) + } else { + column, ok := table.Columns[histID] + if !ok { + continue + } + if !mysql.HasPriKeyFlag(column.Info.GetFlag()) { + continue + } + hist = &column.Histogram + d := types.NewBytesDatum(row.GetBytes(5)) + var err error + lower, err = d.ConvertTo(statistics.UTCWithAllowInvalidDateCtx, &column.Info.FieldType) + if err != nil { + logutil.BgLogger().Debug("decode bucket lower bound failed", zap.Error(err)) + delete(table.Columns, histID) + continue + } + d = types.NewBytesDatum(row.GetBytes(6)) + upper, err = d.ConvertTo(statistics.UTCWithAllowInvalidDateCtx, &column.Info.FieldType) + if err != nil { + logutil.BgLogger().Debug("decode bucket upper bound failed", zap.Error(err)) + delete(table.Columns, histID) + continue + } + } + hist.AppendBucketWithNDV(&lower, &upper, row.GetInt64(3), row.GetInt64(4), row.GetInt64(7)) + } + if table != nil { + cache.Put(table.PhysicalID, table) // put this table in the cache because all statstics of the table have been read. + } +} + +func (h *Handle) initStatsBuckets(cache statstypes.StatsCache) error { + if config.GetGlobalConfig().Performance.ConcurrentlyInitStats { + err := h.initStatsBucketsConcurrency(cache) + if err != nil { + return errors.Trace(err) + } + } else { + sql := "select /*+ ORDER_INDEX(mysql.stats_buckets,tbl)*/ HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id" + rc, err := util.Exec(h.initStatsCtx, sql) + if err != nil { + return errors.Trace(err) + } + defer terror.Call(rc.Close) + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + req := rc.NewChunk(nil) + iter := chunk.NewIterator4Chunk(req) + for { + err := rc.Next(ctx, req) + if err != nil { + return errors.Trace(err) + } + if req.NumRows() == 0 { + break + } + h.initStatsBuckets4Chunk(cache, iter) + } + } + tables := cache.Values() + for _, table := range tables { + for _, idx := range table.Indices { + for i := 1; i < idx.Len(); i++ { + idx.Buckets[i].Count += idx.Buckets[i-1].Count + } + idx.PreCalculateScalar() + } + for _, col := range table.Columns { + for i := 1; i < col.Len(); i++ { + col.Buckets[i].Count += col.Buckets[i-1].Count + } + col.PreCalculateScalar() + } + cache.Put(table.PhysicalID, table) // put this table in the cache because all statstics of the table have been read. + } + return nil +} + +func (h *Handle) initStatsBucketsByPaging(cache statstypes.StatsCache, task initstats.Task) error { + se, err := h.Pool.SPool().Get() + if err != nil { + return err + } + defer func() { + if err == nil { // only recycle when no error + h.Pool.SPool().Put(se) + } + }() + sctx := se.(sessionctx.Context) + sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets where table_id >= %? and table_id < %? 
order by table_id, is_index, hist_id, bucket_id" + rc, err := util.Exec(sctx, sql, task.StartTid, task.EndTid) + if err != nil { + return errors.Trace(err) + } + defer terror.Call(rc.Close) + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + req := rc.NewChunk(nil) + iter := chunk.NewIterator4Chunk(req) + for { + err := rc.Next(ctx, req) + if err != nil { + return errors.Trace(err) + } + if req.NumRows() == 0 { + break + } + h.initStatsBuckets4Chunk(cache, iter) + } + return nil +} + +func (h *Handle) initStatsBucketsConcurrency(cache statstypes.StatsCache) error { + var maxTid = maxTidRecord.tid.Load() + tid := int64(0) + ls := initstats.NewRangeWorker(func(task initstats.Task) error { + return h.initStatsBucketsByPaging(cache, task) + }) + ls.LoadStats() + for tid <= maxTid { + ls.SendTask(initstats.Task{ + StartTid: tid, + EndTid: tid + initStatsStep, + }) + tid += initStatsStep + } + ls.Wait() + return nil +} + +// InitStatsLite initiates the stats cache. The function is liter and faster than InitStats. +// 1. Basic stats meta data is loaded.(count, modify count, etc.) +// 2. Column/index stats are loaded. (only histogram) +// 3. TopN, Bucket, FMSketch are not loaded. +func (h *Handle) InitStatsLite(is infoschema.InfoSchema) (err error) { + defer func() { + _, err1 := util.Exec(h.initStatsCtx, "commit") + if err == nil && err1 != nil { + err = err1 + } + }() + _, err = util.Exec(h.initStatsCtx, "begin") + if err != nil { + return err + } + cache, err := h.initStatsMeta(is) + if err != nil { + return errors.Trace(err) + } + err = h.initStatsHistogramsLite(is, cache) + if err != nil { + return errors.Trace(err) + } + h.Replace(cache) + return nil +} + +// InitStats initiates the stats cache. +// 1. Basic stats meta data is loaded.(count, modify count, etc.) +// 2. Column/index stats are loaded. (histogram, topn, buckets, FMSketch) +func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) { + loadFMSketch := config.GetGlobalConfig().Performance.EnableLoadFMSketch + defer func() { + _, err1 := util.Exec(h.initStatsCtx, "commit") + if err == nil && err1 != nil { + err = err1 + } + }() + _, err = util.Exec(h.initStatsCtx, "begin") + if err != nil { + return err + } + cache, err := h.initStatsMeta(is) + if err != nil { + return errors.Trace(err) + } + if config.GetGlobalConfig().Performance.ConcurrentlyInitStats { + err = h.initStatsHistogramsConcurrency(is, cache) + } else { + err = h.initStatsHistograms(is, cache) + } + if err != nil { + return errors.Trace(err) + } + if config.GetGlobalConfig().Performance.ConcurrentlyInitStats { + err = h.initStatsTopNConcurrency(cache) + } else { + err = h.initStatsTopN(cache) + } + if err != nil { + return err + } + if loadFMSketch { + err = h.initStatsFMSketch(cache) + if err != nil { + return err + } + } + err = h.initStatsBuckets(cache) + if err != nil { + return errors.Trace(err) + } + // Set columns' stats status. + for _, table := range cache.Values() { + for _, col := range table.Columns { + if col.StatsAvailable() { + // primary key column has no stats info, because primary key's is_index is false. 
so it cannot load the topn + col.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus() + } + } + } + h.Replace(cache) + return nil +} diff --git a/pkg/statistics/handle/handletest/statstest/stats_test.go b/pkg/statistics/handle/handletest/statstest/stats_test.go new file mode 100644 index 0000000000000..3d9a0c1066635 --- /dev/null +++ b/pkg/statistics/handle/handletest/statstest/stats_test.go @@ -0,0 +1,395 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package statstest + +import ( + "fmt" + "testing" + "time" + + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/statistics/handle/internal" + "github.com/pingcap/tidb/pkg/testkit" + "github.com/stretchr/testify/require" +) + +func TestStatsCache(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + testKit := testkit.NewTestKit(t, store) + testKit.MustExec("use test") + testKit.MustExec("create table t (c1 int, c2 int)") + testKit.MustExec("insert into t values(1, 2)") + do := dom + is := do.InfoSchema() + tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tableInfo := tbl.Meta() + statsTbl := do.StatsHandle().GetTableStats(tableInfo) + require.True(t, statsTbl.Pseudo) + testKit.MustExec("analyze table t") + statsTbl = do.StatsHandle().GetTableStats(tableInfo) + require.False(t, statsTbl.Pseudo) + testKit.MustExec("create index idx_t on t(c1)") + do.InfoSchema() + statsTbl = do.StatsHandle().GetTableStats(tableInfo) + // If index is build, but stats is not updated. statsTbl can also work. + require.False(t, statsTbl.Pseudo) + // But the added index will not work. + require.Nil(t, statsTbl.Indices[int64(1)]) + + testKit.MustExec("analyze table t") + statsTbl = do.StatsHandle().GetTableStats(tableInfo) + require.False(t, statsTbl.Pseudo) + // If the new schema drop a column, the table stats can still work. + testKit.MustExec("alter table t drop column c2") + is = do.InfoSchema() + do.StatsHandle().Clear() + err = do.StatsHandle().Update(is) + require.NoError(t, err) + statsTbl = do.StatsHandle().GetTableStats(tableInfo) + require.False(t, statsTbl.Pseudo) + + // If the new schema add a column, the table stats can still work. 
+ testKit.MustExec("alter table t add column c10 int") + is = do.InfoSchema() + + do.StatsHandle().Clear() + err = do.StatsHandle().Update(is) + require.NoError(t, err) + statsTbl = do.StatsHandle().GetTableStats(tableInfo) + require.False(t, statsTbl.Pseudo) +} + +func TestStatsCacheMemTracker(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + testKit := testkit.NewTestKit(t, store) + testKit.MustExec("use test") + testKit.MustExec("create table t (c1 int, c2 int,c3 int)") + testKit.MustExec("insert into t values(1, 2, 3)") + do := dom + is := do.InfoSchema() + tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tableInfo := tbl.Meta() + statsTbl := do.StatsHandle().GetTableStats(tableInfo) + require.True(t, statsTbl.MemoryUsage().TotalMemUsage == 0) + require.True(t, statsTbl.Pseudo) + + testKit.MustExec("analyze table t") + statsTbl = do.StatsHandle().GetTableStats(tableInfo) + + require.False(t, statsTbl.Pseudo) + testKit.MustExec("create index idx_t on t(c1)") + do.InfoSchema() + statsTbl = do.StatsHandle().GetTableStats(tableInfo) + + // If index is build, but stats is not updated. statsTbl can also work. + require.False(t, statsTbl.Pseudo) + // But the added index will not work. + require.Nil(t, statsTbl.Indices[int64(1)]) + + testKit.MustExec("analyze table t") + statsTbl = do.StatsHandle().GetTableStats(tableInfo) + + require.False(t, statsTbl.Pseudo) + + // If the new schema drop a column, the table stats can still work. + testKit.MustExec("alter table t drop column c2") + is = do.InfoSchema() + do.StatsHandle().Clear() + err = do.StatsHandle().Update(is) + require.NoError(t, err) + + statsTbl = do.StatsHandle().GetTableStats(tableInfo) + require.True(t, statsTbl.MemoryUsage().TotalMemUsage > 0) + require.False(t, statsTbl.Pseudo) + + // If the new schema add a column, the table stats can still work. 
+ testKit.MustExec("alter table t add column c10 int") + is = do.InfoSchema() + + do.StatsHandle().Clear() + err = do.StatsHandle().Update(is) + require.NoError(t, err) + statsTbl = do.StatsHandle().GetTableStats(tableInfo) + require.False(t, statsTbl.Pseudo) +} + +func TestStatsStoreAndLoad(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + testKit := testkit.NewTestKit(t, store) + testKit.MustExec("use test") + testKit.MustExec("create table t (c1 int, c2 int)") + recordCount := 1000 + for i := 0; i < recordCount; i++ { + testKit.MustExec("insert into t values (?, ?)", i, i+1) + } + testKit.MustExec("create index idx_t on t(c2)") + do := dom + is := do.InfoSchema() + tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tableInfo := tbl.Meta() + + testKit.MustExec("analyze table t") + statsTbl1 := do.StatsHandle().GetTableStats(tableInfo) + + do.StatsHandle().Clear() + err = do.StatsHandle().Update(is) + require.NoError(t, err) + statsTbl2 := do.StatsHandle().GetTableStats(tableInfo) + require.False(t, statsTbl2.Pseudo) + require.Equal(t, int64(recordCount), statsTbl2.RealtimeCount) + internal.AssertTableEqual(t, statsTbl1, statsTbl2) +} + +func testInitStatsMemTrace(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t1 (a int, b int, c int, primary key(a), key idx(b))") + tk.MustExec("insert into t1 values (1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),(6,7,8)") + tk.MustExec("analyze table t1") + for i := 2; i < 10; i++ { + tk.MustExec(fmt.Sprintf("create table t%v (a int, b int, c int, primary key(a), key idx(b))", i)) + tk.MustExec(fmt.Sprintf("insert into t%v select * from t1", i)) + tk.MustExec(fmt.Sprintf("analyze table t%v", i)) + } + h := dom.StatsHandle() + is := dom.InfoSchema() + h.Clear() + require.Equal(t, h.MemConsumed(), int64(0)) + require.NoError(t, h.InitStats(is)) + + var memCostTot int64 + for i := 1; i < 10; i++ { + tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr(fmt.Sprintf("t%v", i))) + require.NoError(t, err) + tStats := h.GetTableStats(tbl.Meta()) + memCostTot += tStats.MemoryUsage().TotalMemUsage + } + tables := h.StatsCache.Values() + for _, tt := range tables { + tbl, ok := h.StatsCache.Get(tt.PhysicalID) + require.True(t, ok) + require.Equal(t, tbl.PhysicalID, tt.PhysicalID) + } + + require.Equal(t, h.MemConsumed(), memCostTot) +} + +func TestInitStatsMemTraceWithLite(t *testing.T) { + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + conf.Performance.ConcurrentlyInitStats = false + }) + testInitStatsMemTraceFunc(t, true) +} + +func TestInitStatsMemTraceWithoutLite(t *testing.T) { + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + conf.Performance.ConcurrentlyInitStats = false + }) + testInitStatsMemTraceFunc(t, false) +} + +func TestInitStatsMemTraceWithConcurrrencyLite(t *testing.T) { + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + conf.Performance.ConcurrentlyInitStats = true + }) + testInitStatsMemTraceFunc(t, true) +} + +func TestInitStatsMemTraceWithoutConcurrrencyLite(t *testing.T) { + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + conf.Performance.ConcurrentlyInitStats = true + }) + testInitStatsMemTraceFunc(t, false) +} + +func testInitStatsMemTraceFunc(t 
*testing.T, liteInitStats bool) { + originValue := config.GetGlobalConfig().Performance.LiteInitStats + defer func() { + config.GetGlobalConfig().Performance.LiteInitStats = originValue + }() + config.GetGlobalConfig().Performance.LiteInitStats = liteInitStats + testInitStatsMemTrace(t) +} + +func TestInitStats(t *testing.T) { + originValue := config.GetGlobalConfig().Performance.LiteInitStats + defer func() { + config.GetGlobalConfig().Performance.LiteInitStats = originValue + }() + config.GetGlobalConfig().Performance.LiteInitStats = false + store, dom := testkit.CreateMockStoreAndDomain(t) + testKit := testkit.NewTestKit(t, store) + testKit.MustExec("use test") + testKit.MustExec("set @@session.tidb_analyze_version = 1") + testKit.MustExec("create table t(a int, b int, c int, primary key(a), key idx(b))") + testKit.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),(6,7,8)") + testKit.MustExec("analyze table t") + h := dom.StatsHandle() + is := dom.InfoSchema() + tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + // `Update` will not use load by need strategy when `Lease` is 0, and `InitStats` is only called when + // `Lease` is not 0, so here we just change it. + h.SetLease(time.Millisecond) + + h.Clear() + require.NoError(t, h.InitStats(is)) + table0 := h.GetTableStats(tbl.Meta()) + cols := table0.Columns + require.Equal(t, uint8(0x36), cols[1].LastAnalyzePos.GetBytes()[0]) + require.Equal(t, uint8(0x37), cols[2].LastAnalyzePos.GetBytes()[0]) + require.Equal(t, uint8(0x38), cols[3].LastAnalyzePos.GetBytes()[0]) + h.Clear() + require.NoError(t, h.Update(is)) + // Index and pk are loaded. + needed := fmt.Sprintf(`Table:%v RealtimeCount:6 +column:1 ndv:6 totColSize:0 +num: 1 lower_bound: 1 upper_bound: 1 repeats: 1 ndv: 0 +num: 1 lower_bound: 2 upper_bound: 2 repeats: 1 ndv: 0 +num: 1 lower_bound: 3 upper_bound: 3 repeats: 1 ndv: 0 +num: 1 lower_bound: 4 upper_bound: 4 repeats: 1 ndv: 0 +num: 1 lower_bound: 5 upper_bound: 5 repeats: 1 ndv: 0 +num: 1 lower_bound: 6 upper_bound: 6 repeats: 1 ndv: 0 +column:2 ndv:6 totColSize:6 +column:3 ndv:6 totColSize:6 +index:1 ndv:6 +num: 1 lower_bound: 1 upper_bound: 1 repeats: 1 ndv: 0 +num: 1 lower_bound: 2 upper_bound: 2 repeats: 1 ndv: 0 +num: 1 lower_bound: 3 upper_bound: 3 repeats: 1 ndv: 0 +num: 1 lower_bound: 4 upper_bound: 4 repeats: 1 ndv: 0 +num: 1 lower_bound: 5 upper_bound: 5 repeats: 1 ndv: 0 +num: 1 lower_bound: 7 upper_bound: 7 repeats: 1 ndv: 0`, tbl.Meta().ID) + require.Equal(t, needed, table0.String()) + h.SetLease(0) +} + +func TestInitStats51358(t *testing.T) { + originValue := config.GetGlobalConfig().Performance.LiteInitStats + defer func() { + config.GetGlobalConfig().Performance.LiteInitStats = originValue + }() + config.GetGlobalConfig().Performance.LiteInitStats = false + store, dom := testkit.CreateMockStoreAndDomain(t) + testKit := testkit.NewTestKit(t, store) + testKit.MustExec("use test") + testKit.MustExec("set @@session.tidb_analyze_version = 1") + testKit.MustExec("create table t(a int, b int, c int, primary key(a), key idx(b))") + testKit.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),(6,7,8)") + testKit.MustExec("analyze table t") + h := dom.StatsHandle() + is := dom.InfoSchema() + // `Update` will not use load by need strategy when `Lease` is 0, and `InitStats` is only called when + // `Lease` is not 0, so here we just change it. 
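+ // Regression coverage for #51358: the primary-key column's stats rows have is_index = 0,
+ // so non-lite init stats never loads a TopN for it and must not mark it as fully loaded.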
+ h.SetLease(time.Millisecond) + + h.Clear() + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/cache/StatsCacheGetNil", "return()")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/cache/StatsCacheGetNil")) + }() + require.NoError(t, h.InitStats(is)) + tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + stats := h.GetTableStats(tbl.Meta()) + for _, column := range stats.Columns { + if mysql.HasPriKeyFlag(column.Info.GetFlag()) { + // primary key column has no stats info, because primary key's is_index is false. so it cannot load the topn + require.Nil(t, column.TopN) + } + require.False(t, column.IsFullLoad()) + } +} + +func TestInitStatsVer2(t *testing.T) { + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + config.GetGlobalConfig().Performance.LiteInitStats = false + config.GetGlobalConfig().Performance.ConcurrentlyInitStats = false + }) + initStatsVer2(t) +} + +func TestInitStatsVer2Concurrency(t *testing.T) { + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + config.GetGlobalConfig().Performance.LiteInitStats = false + config.GetGlobalConfig().Performance.ConcurrentlyInitStats = true + }) + initStatsVer2(t) +} + +func initStatsVer2(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set @@session.tidb_analyze_version=2") + tk.MustExec("create table t(a int, b int, c int, index idx(a), index idxab(a, b))") + tk.MustExec("insert into t values(1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4), (4, 4, 4), (4, 4, 4)") + tk.MustExec("analyze table t with 2 topn, 3 buckets") + h := dom.StatsHandle() + is := dom.InfoSchema() + tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + // `Update` will not use load by need strategy when `Lease` is 0, and `InitStats` is only called when + // `Lease` is not 0, so here we just change it. + h.SetLease(time.Millisecond) + + h.Clear() + require.NoError(t, h.InitStats(is)) + table0 := h.GetTableStats(tbl.Meta()) + cols := table0.Columns + require.Equal(t, uint8(0x33), cols[1].LastAnalyzePos.GetBytes()[0]) + require.Equal(t, uint8(0x33), cols[2].LastAnalyzePos.GetBytes()[0]) + require.Equal(t, uint8(0x33), cols[3].LastAnalyzePos.GetBytes()[0]) + h.Clear() + require.NoError(t, h.InitStats(is)) + table1 := h.GetTableStats(tbl.Meta()) + internal.AssertTableEqual(t, table0, table1) + h.SetLease(0) +} + +func TestInitStatsIssue41938(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set @@global.tidb_analyze_version=1") + tk.MustExec("set @@session.tidb_analyze_version=1") + tk.MustExec("create table t1 (a timestamp primary key)") + tk.MustExec("insert into t1 values ('2023-03-07 14:24:30'), ('2023-03-07 14:24:31'), ('2023-03-07 14:24:32'), ('2023-03-07 14:24:33')") + tk.MustExec("analyze table t1 with 0 topn") + h := dom.StatsHandle() + // `InitStats` is only called when `Lease` is not 0, so here we just change it. 
+ h.SetLease(time.Millisecond) + h.Clear() + require.NoError(t, h.InitStats(dom.InfoSchema())) + h.SetLease(0) +} From d9d4779aba6c0ed2a5eb0c180c56d160c2dfa7cc Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 29 Aug 2024 10:51:51 +0800 Subject: [PATCH 2/4] update Signed-off-by: Weizhen Wang --- pkg/statistics/handle/bootstrap.go | 761 ------------------ .../handle/handletest/statstest/stats_test.go | 395 --------- statistics/handle/bootstrap.go | 7 +- 3 files changed, 2 insertions(+), 1161 deletions(-) delete mode 100644 pkg/statistics/handle/bootstrap.go delete mode 100644 pkg/statistics/handle/handletest/statstest/stats_test.go diff --git a/pkg/statistics/handle/bootstrap.go b/pkg/statistics/handle/bootstrap.go deleted file mode 100644 index d238c620fec8d..0000000000000 --- a/pkg/statistics/handle/bootstrap.go +++ /dev/null @@ -1,761 +0,0 @@ -// Copyright 2017 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package handle - -import ( - "context" - "sync" - "sync/atomic" - - "github.com/pingcap/errors" - "github.com/pingcap/tidb/pkg/config" - "github.com/pingcap/tidb/pkg/infoschema" - "github.com/pingcap/tidb/pkg/kv" - "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tidb/pkg/parser/mysql" - "github.com/pingcap/tidb/pkg/parser/terror" - "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/statistics" - "github.com/pingcap/tidb/pkg/statistics/handle/cache" - "github.com/pingcap/tidb/pkg/statistics/handle/initstats" - statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types" - "github.com/pingcap/tidb/pkg/statistics/handle/util" - "github.com/pingcap/tidb/pkg/types" - "github.com/pingcap/tidb/pkg/util/chunk" - "github.com/pingcap/tidb/pkg/util/logutil" - "go.uber.org/zap" -) - -// initStatsStep is the step to load stats by paging. -const initStatsStep = int64(500) - -var maxTidRecord MaxTidRecord - -// MaxTidRecord is to record the max tid. -type MaxTidRecord struct { - mu sync.Mutex - tid atomic.Int64 -} - -func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) { - var physicalID int64 - for row := iter.Begin(); row != iter.End(); row = iter.Next() { - physicalID = row.GetInt64(1) - // The table is read-only. Please do not modify it. 
- table, ok := h.TableInfoByID(is, physicalID) - if !ok { - logutil.BgLogger().Debug("unknown physical ID in stats meta table, maybe it has been dropped", zap.Int64("ID", physicalID)) - continue - } - tableInfo := table.Meta() - newHistColl := statistics.HistColl{ - PhysicalID: physicalID, - HavePhysicalID: true, - RealtimeCount: row.GetInt64(3), - ModifyCount: row.GetInt64(2), - Columns: make(map[int64]*statistics.Column, 4), - Indices: make(map[int64]*statistics.Index, 4), - } - tbl := &statistics.Table{ - HistColl: newHistColl, - Version: row.GetUint64(0), - ColAndIdxExistenceMap: statistics.NewColAndIndexExistenceMap(len(tableInfo.Columns), len(tableInfo.Indices)), - IsPkIsHandle: tableInfo.PKIsHandle, - } - cache.Put(physicalID, tbl) // put this table again since it is updated - } - maxTidRecord.mu.Lock() - defer maxTidRecord.mu.Unlock() - if maxTidRecord.tid.Load() < physicalID { - maxTidRecord.tid.Store(physicalID) - } -} - -func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (statstypes.StatsCache, error) { - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - sql := "select HIGH_PRIORITY version, table_id, modify_count, count from mysql.stats_meta" - rc, err := util.Exec(h.initStatsCtx, sql) - if err != nil { - return nil, errors.Trace(err) - } - defer terror.Call(rc.Close) - tables, err := cache.NewStatsCacheImpl(h) - if err != nil { - return nil, err - } - req := rc.NewChunk(nil) - iter := chunk.NewIterator4Chunk(req) - for { - err := rc.Next(ctx, req) - if err != nil { - return nil, errors.Trace(err) - } - if req.NumRows() == 0 { - break - } - h.initStatsMeta4Chunk(is, tables, iter) - } - return tables, nil -} - -func (h *Handle) initStatsHistograms4ChunkLite(is infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) { - var table *statistics.Table - for row := iter.Begin(); row != iter.End(); row = iter.Next() { - tblID := row.GetInt64(0) - if table == nil || table.PhysicalID != tblID { - if table != nil { - cache.Put(table.PhysicalID, table) // put this table in the cache because all statstics of the table have been read. - } - var ok bool - table, ok = cache.Get(tblID) - if !ok { - continue - } - table = table.Copy() - } - isIndex := row.GetInt64(1) - id := row.GetInt64(2) - ndv := row.GetInt64(3) - nullCount := row.GetInt64(5) - statsVer := row.GetInt64(7) - tbl, _ := h.TableInfoByID(is, table.PhysicalID) - // All the objects in the table share the same stats version. - if statsVer != statistics.Version0 { - table.StatsVer = int(statsVer) - } - if isIndex > 0 { - var idxInfo *model.IndexInfo - for _, idx := range tbl.Meta().Indices { - if idx.ID == id { - idxInfo = idx - break - } - } - if idxInfo == nil { - continue - } - table.ColAndIdxExistenceMap.InsertIndex(idxInfo.ID, idxInfo, statsVer != statistics.Version0) - if statsVer != statistics.Version0 { - // The LastAnalyzeVersion is added by ALTER table so its value might be 0. - table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, row.GetUint64(4)) - } - } else { - var colInfo *model.ColumnInfo - for _, col := range tbl.Meta().Columns { - if col.ID == id { - colInfo = col - break - } - } - if colInfo == nil { - continue - } - table.ColAndIdxExistenceMap.InsertCol(colInfo.ID, colInfo, statsVer != statistics.Version0 || ndv > 0 || nullCount > 0) - if statsVer != statistics.Version0 { - // The LastAnalyzeVersion is added by ALTER table so its value might be 0. 
- table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, row.GetUint64(4)) - } - } - } - if table != nil { - cache.Put(table.PhysicalID, table) // put this table in the cache because all statstics of the table have been read. - } -} - -func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) { - var table *statistics.Table - for row := iter.Begin(); row != iter.End(); row = iter.Next() { - tblID, statsVer := row.GetInt64(0), row.GetInt64(8) - if table == nil || table.PhysicalID != tblID { - if table != nil { - cache.Put(table.PhysicalID, table) // put this table in the cache because all statstics of the table have been read. - } - var ok bool - table, ok = cache.Get(tblID) - if !ok { - continue - } - table = table.Copy() - } - // All the objects in the table share the same stats version. - if statsVer != statistics.Version0 { - table.StatsVer = int(statsVer) - } - id, ndv, nullCount, version, totColSize := row.GetInt64(2), row.GetInt64(3), row.GetInt64(5), row.GetUint64(4), row.GetInt64(7) - lastAnalyzePos := row.GetDatum(11, types.NewFieldType(mysql.TypeBlob)) - tbl, _ := h.TableInfoByID(is, table.PhysicalID) - if row.GetInt64(1) > 0 { - var idxInfo *model.IndexInfo - for _, idx := range tbl.Meta().Indices { - if idx.ID == id { - idxInfo = idx - break - } - } - if idxInfo == nil { - continue - } - cms, topN, err := statistics.DecodeCMSketchAndTopN(row.GetBytes(6), nil) - if err != nil { - cms = nil - terror.Log(errors.Trace(err)) - } - hist := statistics.NewHistogram(id, ndv, nullCount, version, types.NewFieldType(mysql.TypeBlob), chunk.InitialCapacity, 0) - index := &statistics.Index{ - Histogram: *hist, - CMSketch: cms, - TopN: topN, - Info: idxInfo, - StatsVer: statsVer, - Flag: row.GetInt64(10), - PhysicalID: tblID, - } - if statsVer != statistics.Version0 { - index.StatsLoadedStatus = statistics.NewStatsFullLoadStatus() - // The LastAnalyzeVersion is added by ALTER table so its value might be 0. - table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, version) - } - lastAnalyzePos.Copy(&index.LastAnalyzePos) - table.Indices[hist.ID] = index - table.ColAndIdxExistenceMap.InsertIndex(idxInfo.ID, idxInfo, statsVer != statistics.Version0) - } else { - var colInfo *model.ColumnInfo - for _, col := range tbl.Meta().Columns { - if col.ID == id { - colInfo = col - break - } - } - if colInfo == nil { - continue - } - hist := statistics.NewHistogram(id, ndv, nullCount, version, &colInfo.FieldType, 0, totColSize) - hist.Correlation = row.GetFloat64(9) - col := &statistics.Column{ - Histogram: *hist, - PhysicalID: table.PhysicalID, - Info: colInfo, - IsHandle: tbl.Meta().PKIsHandle && mysql.HasPriKeyFlag(colInfo.GetFlag()), - Flag: row.GetInt64(10), - StatsVer: statsVer, - } - lastAnalyzePos.Copy(&col.LastAnalyzePos) - table.Columns[hist.ID] = col - table.ColAndIdxExistenceMap.InsertCol(colInfo.ID, colInfo, statsVer != statistics.Version0 || ndv > 0 || nullCount > 0) - if statsVer != statistics.Version0 { - // The LastAnalyzeVersion is added by ALTER table so its value might be 0. - table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, version) - } - } - } - if table != nil { - cache.Put(table.PhysicalID, table) // put this table in the cache because all statstics of the table have been read. 
- } -} - -func (h *Handle) initStatsHistogramsLite(is infoschema.InfoSchema, cache statstypes.StatsCache) error { - sql := "select /*+ ORDER_INDEX(mysql.stats_histograms,tbl)*/ HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms order by table_id" - rc, err := util.Exec(h.initStatsCtx, sql) - if err != nil { - return errors.Trace(err) - } - defer terror.Call(rc.Close) - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - req := rc.NewChunk(nil) - iter := chunk.NewIterator4Chunk(req) - for { - err := rc.Next(ctx, req) - if err != nil { - return errors.Trace(err) - } - if req.NumRows() == 0 { - break - } - h.initStatsHistograms4ChunkLite(is, cache, iter) - } - return nil -} - -func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache statstypes.StatsCache) error { - sql := "select /*+ ORDER_INDEX(mysql.stats_histograms,tbl)*/ HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms order by table_id" - rc, err := util.Exec(h.initStatsCtx, sql) - if err != nil { - return errors.Trace(err) - } - defer terror.Call(rc.Close) - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - req := rc.NewChunk(nil) - iter := chunk.NewIterator4Chunk(req) - for { - err := rc.Next(ctx, req) - if err != nil { - return errors.Trace(err) - } - if req.NumRows() == 0 { - break - } - h.initStatsHistograms4Chunk(is, cache, iter) - } - return nil -} - -func (h *Handle) initStatsHistogramsByPaging(is infoschema.InfoSchema, cache statstypes.StatsCache, task initstats.Task) error { - se, err := h.Pool.SPool().Get() - if err != nil { - return err - } - defer func() { - if err == nil { // only recycle when no error - h.Pool.SPool().Put(se) - } - }() - sctx := se.(sessionctx.Context) - sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms where table_id >= %? and table_id < %?" 
- rc, err := util.Exec(sctx, sql, task.StartTid, task.EndTid) - if err != nil { - return errors.Trace(err) - } - defer terror.Call(rc.Close) - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - req := rc.NewChunk(nil) - iter := chunk.NewIterator4Chunk(req) - for { - err := rc.Next(ctx, req) - if err != nil { - return errors.Trace(err) - } - if req.NumRows() == 0 { - break - } - h.initStatsHistograms4Chunk(is, cache, iter) - } - return nil -} - -func (h *Handle) initStatsHistogramsConcurrency(is infoschema.InfoSchema, cache statstypes.StatsCache) error { - var maxTid = maxTidRecord.tid.Load() - tid := int64(0) - ls := initstats.NewRangeWorker(func(task initstats.Task) error { - return h.initStatsHistogramsByPaging(is, cache, task) - }) - ls.LoadStats() - for tid <= maxTid { - ls.SendTask(initstats.Task{ - StartTid: tid, - EndTid: tid + initStatsStep, - }) - tid += initStatsStep - } - ls.Wait() - return nil -} - -func (*Handle) initStatsTopN4Chunk(cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) { - affectedIndexes := make(map[*statistics.Index]struct{}) - var table *statistics.Table - for row := iter.Begin(); row != iter.End(); row = iter.Next() { - tblID := row.GetInt64(0) - if table == nil || table.PhysicalID != tblID { - if table != nil { - cache.Put(table.PhysicalID, table) // put this table in the cache because all statstics of the table have been read. - } - var ok bool - table, ok = cache.Get(tblID) - if !ok { - continue - } - table = table.Copy() - } - idx, ok := table.Indices[row.GetInt64(1)] - if !ok || (idx.CMSketch == nil && idx.StatsVer <= statistics.Version1) { - continue - } - if idx.TopN == nil { - idx.TopN = statistics.NewTopN(32) - } - affectedIndexes[idx] = struct{}{} - data := make([]byte, len(row.GetBytes(2))) - copy(data, row.GetBytes(2)) - idx.TopN.AppendTopN(data, row.GetUint64(3)) - } - if table != nil { - cache.Put(table.PhysicalID, table) // put this table in the cache because all statstics of the table have been read. - } - for idx := range affectedIndexes { - idx.TopN.Sort() - } -} - -func (h *Handle) initStatsTopN(cache statstypes.StatsCache) error { - sql := "select /*+ ORDER_INDEX(mysql.stats_top_n,tbl)*/ HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1 order by table_id" - rc, err := util.Exec(h.initStatsCtx, sql) - if err != nil { - return errors.Trace(err) - } - defer terror.Call(rc.Close) - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - req := rc.NewChunk(nil) - iter := chunk.NewIterator4Chunk(req) - for { - err := rc.Next(ctx, req) - if err != nil { - return errors.Trace(err) - } - if req.NumRows() == 0 { - break - } - h.initStatsTopN4Chunk(cache, iter) - } - return nil -} - -func (h *Handle) initStatsTopNByPaging(cache statstypes.StatsCache, task initstats.Task) error { - se, err := h.Pool.SPool().Get() - if err != nil { - return err - } - defer func() { - if err == nil { // only recycle when no error - h.Pool.SPool().Put(se) - } - }() - sctx := se.(sessionctx.Context) - sql := "select HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1 and table_id >= %? and table_id < %? 
order by table_id" - rc, err := util.Exec(sctx, sql, task.StartTid, task.EndTid) - if err != nil { - return errors.Trace(err) - } - defer terror.Call(rc.Close) - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - req := rc.NewChunk(nil) - iter := chunk.NewIterator4Chunk(req) - for { - err := rc.Next(ctx, req) - if err != nil { - return errors.Trace(err) - } - if req.NumRows() == 0 { - break - } - h.initStatsTopN4Chunk(cache, iter) - } - return nil -} - -func (h *Handle) initStatsTopNConcurrency(cache statstypes.StatsCache) error { - var maxTid = maxTidRecord.tid.Load() - tid := int64(0) - ls := initstats.NewRangeWorker(func(task initstats.Task) error { - return h.initStatsTopNByPaging(cache, task) - }) - ls.LoadStats() - for tid <= maxTid { - ls.SendTask(initstats.Task{ - StartTid: tid, - EndTid: tid + initStatsStep, - }) - tid += initStatsStep - } - ls.Wait() - return nil -} - -func (*Handle) initStatsFMSketch4Chunk(cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) { - for row := iter.Begin(); row != iter.End(); row = iter.Next() { - table, ok := cache.Get(row.GetInt64(0)) - if !ok { - continue - } - fms, err := statistics.DecodeFMSketch(row.GetBytes(3)) - if err != nil { - fms = nil - terror.Log(errors.Trace(err)) - } - - isIndex := row.GetInt64(1) - id := row.GetInt64(2) - if isIndex == 1 { - if idxStats, ok := table.Indices[id]; ok { - idxStats.FMSketch = fms - } - } else { - if colStats, ok := table.Columns[id]; ok { - colStats.FMSketch = fms - } - } - cache.Put(table.PhysicalID, table) // put this table in the cache because all statstics of the table have been read. - } -} - -func (h *Handle) initStatsFMSketch(cache statstypes.StatsCache) error { - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - sql := "select HIGH_PRIORITY table_id, is_index, hist_id, value from mysql.stats_fm_sketch" - rc, err := util.Exec(h.initStatsCtx, sql) - if err != nil { - return errors.Trace(err) - } - defer terror.Call(rc.Close) - req := rc.NewChunk(nil) - iter := chunk.NewIterator4Chunk(req) - for { - err := rc.Next(ctx, req) - if err != nil { - return errors.Trace(err) - } - if req.NumRows() == 0 { - break - } - h.initStatsFMSketch4Chunk(cache, iter) - } - return nil -} - -func (*Handle) initStatsBuckets4Chunk(cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) { - var table *statistics.Table - for row := iter.Begin(); row != iter.End(); row = iter.Next() { - tableID, isIndex, histID := row.GetInt64(0), row.GetInt64(1), row.GetInt64(2) - if table == nil || table.PhysicalID != tableID { - if table != nil { - cache.Put(table.PhysicalID, table) // put this table in the cache because all statstics of the table have been read. 
-			}
-			var ok bool
-			table, ok = cache.Get(tableID)
-			if !ok {
-				continue
-			}
-			table = table.Copy()
-		}
-		var lower, upper types.Datum
-		var hist *statistics.Histogram
-		if isIndex > 0 {
-			index, ok := table.Indices[histID]
-			if !ok {
-				continue
-			}
-			hist = &index.Histogram
-			lower, upper = types.NewBytesDatum(row.GetBytes(5)), types.NewBytesDatum(row.GetBytes(6))
-		} else {
-			column, ok := table.Columns[histID]
-			if !ok {
-				continue
-			}
-			if !mysql.HasPriKeyFlag(column.Info.GetFlag()) {
-				continue
-			}
-			hist = &column.Histogram
-			d := types.NewBytesDatum(row.GetBytes(5))
-			var err error
-			lower, err = d.ConvertTo(statistics.UTCWithAllowInvalidDateCtx, &column.Info.FieldType)
-			if err != nil {
-				logutil.BgLogger().Debug("decode bucket lower bound failed", zap.Error(err))
-				delete(table.Columns, histID)
-				continue
-			}
-			d = types.NewBytesDatum(row.GetBytes(6))
-			upper, err = d.ConvertTo(statistics.UTCWithAllowInvalidDateCtx, &column.Info.FieldType)
-			if err != nil {
-				logutil.BgLogger().Debug("decode bucket upper bound failed", zap.Error(err))
-				delete(table.Columns, histID)
-				continue
-			}
-		}
-		hist.AppendBucketWithNDV(&lower, &upper, row.GetInt64(3), row.GetInt64(4), row.GetInt64(7))
-	}
-	if table != nil {
-		cache.Put(table.PhysicalID, table) // put this table in the cache because all statstics of the table have been read.
-	}
-}
-
-func (h *Handle) initStatsBuckets(cache statstypes.StatsCache) error {
-	if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
-		err := h.initStatsBucketsConcurrency(cache)
-		if err != nil {
-			return errors.Trace(err)
-		}
-	} else {
-		sql := "select /*+ ORDER_INDEX(mysql.stats_buckets,tbl)*/ HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id"
-		rc, err := util.Exec(h.initStatsCtx, sql)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		defer terror.Call(rc.Close)
-		ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
-		req := rc.NewChunk(nil)
-		iter := chunk.NewIterator4Chunk(req)
-		for {
-			err := rc.Next(ctx, req)
-			if err != nil {
-				return errors.Trace(err)
-			}
-			if req.NumRows() == 0 {
-				break
-			}
-			h.initStatsBuckets4Chunk(cache, iter)
-		}
-	}
-	tables := cache.Values()
-	for _, table := range tables {
-		for _, idx := range table.Indices {
-			for i := 1; i < idx.Len(); i++ {
-				idx.Buckets[i].Count += idx.Buckets[i-1].Count
-			}
-			idx.PreCalculateScalar()
-		}
-		for _, col := range table.Columns {
-			for i := 1; i < col.Len(); i++ {
-				col.Buckets[i].Count += col.Buckets[i-1].Count
-			}
-			col.PreCalculateScalar()
-		}
-		cache.Put(table.PhysicalID, table) // put this table in the cache because all statstics of the table have been read.
-	}
-	return nil
-}
-
-func (h *Handle) initStatsBucketsByPaging(cache statstypes.StatsCache, task initstats.Task) error {
-	se, err := h.Pool.SPool().Get()
-	if err != nil {
-		return err
-	}
-	defer func() {
-		if err == nil { // only recycle when no error
-			h.Pool.SPool().Put(se)
-		}
-	}()
-	sctx := se.(sessionctx.Context)
-	sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets where table_id >= %? and table_id < %? order by table_id, is_index, hist_id, bucket_id"
-	rc, err := util.Exec(sctx, sql, task.StartTid, task.EndTid)
-	if err != nil {
-		return errors.Trace(err)
-	}
-	defer terror.Call(rc.Close)
-	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
-	req := rc.NewChunk(nil)
-	iter := chunk.NewIterator4Chunk(req)
-	for {
-		err := rc.Next(ctx, req)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		if req.NumRows() == 0 {
-			break
-		}
-		h.initStatsBuckets4Chunk(cache, iter)
-	}
-	return nil
-}
-
-func (h *Handle) initStatsBucketsConcurrency(cache statstypes.StatsCache) error {
-	var maxTid = maxTidRecord.tid.Load()
-	tid := int64(0)
-	ls := initstats.NewRangeWorker(func(task initstats.Task) error {
-		return h.initStatsBucketsByPaging(cache, task)
-	})
-	ls.LoadStats()
-	for tid <= maxTid {
-		ls.SendTask(initstats.Task{
-			StartTid: tid,
-			EndTid:   tid + initStatsStep,
-		})
-		tid += initStatsStep
-	}
-	ls.Wait()
-	return nil
-}
-
-// InitStatsLite initiates the stats cache. The function is liter and faster than InitStats.
-// 1. Basic stats meta data is loaded.(count, modify count, etc.)
-// 2. Column/index stats are loaded. (only histogram)
-// 3. TopN, Bucket, FMSketch are not loaded.
-func (h *Handle) InitStatsLite(is infoschema.InfoSchema) (err error) {
-	defer func() {
-		_, err1 := util.Exec(h.initStatsCtx, "commit")
-		if err == nil && err1 != nil {
-			err = err1
-		}
-	}()
-	_, err = util.Exec(h.initStatsCtx, "begin")
-	if err != nil {
-		return err
-	}
-	cache, err := h.initStatsMeta(is)
-	if err != nil {
-		return errors.Trace(err)
-	}
-	err = h.initStatsHistogramsLite(is, cache)
-	if err != nil {
-		return errors.Trace(err)
-	}
-	h.Replace(cache)
-	return nil
-}
-
-// InitStats initiates the stats cache.
-// 1. Basic stats meta data is loaded.(count, modify count, etc.)
-// 2. Column/index stats are loaded. (histogram, topn, buckets, FMSketch)
-func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
-	loadFMSketch := config.GetGlobalConfig().Performance.EnableLoadFMSketch
-	defer func() {
-		_, err1 := util.Exec(h.initStatsCtx, "commit")
-		if err == nil && err1 != nil {
-			err = err1
-		}
-	}()
-	_, err = util.Exec(h.initStatsCtx, "begin")
-	if err != nil {
-		return err
-	}
-	cache, err := h.initStatsMeta(is)
-	if err != nil {
-		return errors.Trace(err)
-	}
-	if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
-		err = h.initStatsHistogramsConcurrency(is, cache)
-	} else {
-		err = h.initStatsHistograms(is, cache)
-	}
-	if err != nil {
-		return errors.Trace(err)
-	}
-	if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
-		err = h.initStatsTopNConcurrency(cache)
-	} else {
-		err = h.initStatsTopN(cache)
-	}
-	if err != nil {
-		return err
-	}
-	if loadFMSketch {
-		err = h.initStatsFMSketch(cache)
-		if err != nil {
-			return err
-		}
-	}
-	err = h.initStatsBuckets(cache)
-	if err != nil {
-		return errors.Trace(err)
-	}
-	// Set columns' stats status.
-	for _, table := range cache.Values() {
-		for _, col := range table.Columns {
-			if col.StatsAvailable() {
-				// primary key column has no stats info, because primary key's is_index is false. so it cannot load the topn
-				col.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
-			}
-		}
-	}
-	h.Replace(cache)
-	return nil
-}
diff --git a/pkg/statistics/handle/handletest/statstest/stats_test.go b/pkg/statistics/handle/handletest/statstest/stats_test.go
deleted file mode 100644
index 3d9a0c1066635..0000000000000
--- a/pkg/statistics/handle/handletest/statstest/stats_test.go
+++ /dev/null
@@ -1,395 +0,0 @@
-// Copyright 2017 PingCAP, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package statstest
-
-import (
-	"fmt"
-	"testing"
-	"time"
-
-	"github.com/pingcap/failpoint"
-	"github.com/pingcap/tidb/pkg/config"
-	"github.com/pingcap/tidb/pkg/parser/model"
-	"github.com/pingcap/tidb/pkg/parser/mysql"
-	"github.com/pingcap/tidb/pkg/statistics/handle/internal"
-	"github.com/pingcap/tidb/pkg/testkit"
-	"github.com/stretchr/testify/require"
-)
-
-func TestStatsCache(t *testing.T) {
-	store, dom := testkit.CreateMockStoreAndDomain(t)
-	testKit := testkit.NewTestKit(t, store)
-	testKit.MustExec("use test")
-	testKit.MustExec("create table t (c1 int, c2 int)")
-	testKit.MustExec("insert into t values(1, 2)")
-	do := dom
-	is := do.InfoSchema()
-	tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
-	require.NoError(t, err)
-	tableInfo := tbl.Meta()
-	statsTbl := do.StatsHandle().GetTableStats(tableInfo)
-	require.True(t, statsTbl.Pseudo)
-	testKit.MustExec("analyze table t")
-	statsTbl = do.StatsHandle().GetTableStats(tableInfo)
-	require.False(t, statsTbl.Pseudo)
-	testKit.MustExec("create index idx_t on t(c1)")
-	do.InfoSchema()
-	statsTbl = do.StatsHandle().GetTableStats(tableInfo)
-	// If index is build, but stats is not updated. statsTbl can also work.
-	require.False(t, statsTbl.Pseudo)
-	// But the added index will not work.
-	require.Nil(t, statsTbl.Indices[int64(1)])
-
-	testKit.MustExec("analyze table t")
-	statsTbl = do.StatsHandle().GetTableStats(tableInfo)
-	require.False(t, statsTbl.Pseudo)
-	// If the new schema drop a column, the table stats can still work.
-	testKit.MustExec("alter table t drop column c2")
-	is = do.InfoSchema()
-	do.StatsHandle().Clear()
-	err = do.StatsHandle().Update(is)
-	require.NoError(t, err)
-	statsTbl = do.StatsHandle().GetTableStats(tableInfo)
-	require.False(t, statsTbl.Pseudo)
-
-	// If the new schema add a column, the table stats can still work.
-	testKit.MustExec("alter table t add column c10 int")
-	is = do.InfoSchema()
-
-	do.StatsHandle().Clear()
-	err = do.StatsHandle().Update(is)
-	require.NoError(t, err)
-	statsTbl = do.StatsHandle().GetTableStats(tableInfo)
-	require.False(t, statsTbl.Pseudo)
-}
-
-func TestStatsCacheMemTracker(t *testing.T) {
-	store, dom := testkit.CreateMockStoreAndDomain(t)
-	testKit := testkit.NewTestKit(t, store)
-	testKit.MustExec("use test")
-	testKit.MustExec("create table t (c1 int, c2 int,c3 int)")
-	testKit.MustExec("insert into t values(1, 2, 3)")
-	do := dom
-	is := do.InfoSchema()
-	tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
-	require.NoError(t, err)
-	tableInfo := tbl.Meta()
-	statsTbl := do.StatsHandle().GetTableStats(tableInfo)
-	require.True(t, statsTbl.MemoryUsage().TotalMemUsage == 0)
-	require.True(t, statsTbl.Pseudo)
-
-	testKit.MustExec("analyze table t")
-	statsTbl = do.StatsHandle().GetTableStats(tableInfo)
-
-	require.False(t, statsTbl.Pseudo)
-	testKit.MustExec("create index idx_t on t(c1)")
-	do.InfoSchema()
-	statsTbl = do.StatsHandle().GetTableStats(tableInfo)
-
-	// If index is build, but stats is not updated. statsTbl can also work.
-	require.False(t, statsTbl.Pseudo)
-	// But the added index will not work.
-	require.Nil(t, statsTbl.Indices[int64(1)])
-
-	testKit.MustExec("analyze table t")
-	statsTbl = do.StatsHandle().GetTableStats(tableInfo)
-
-	require.False(t, statsTbl.Pseudo)
-
-	// If the new schema drop a column, the table stats can still work.
-	testKit.MustExec("alter table t drop column c2")
-	is = do.InfoSchema()
-	do.StatsHandle().Clear()
-	err = do.StatsHandle().Update(is)
-	require.NoError(t, err)
-
-	statsTbl = do.StatsHandle().GetTableStats(tableInfo)
-	require.True(t, statsTbl.MemoryUsage().TotalMemUsage > 0)
-	require.False(t, statsTbl.Pseudo)
-
-	// If the new schema add a column, the table stats can still work.
-	testKit.MustExec("alter table t add column c10 int")
-	is = do.InfoSchema()
-
-	do.StatsHandle().Clear()
-	err = do.StatsHandle().Update(is)
-	require.NoError(t, err)
-	statsTbl = do.StatsHandle().GetTableStats(tableInfo)
-	require.False(t, statsTbl.Pseudo)
-}
-
-func TestStatsStoreAndLoad(t *testing.T) {
-	store, dom := testkit.CreateMockStoreAndDomain(t)
-	testKit := testkit.NewTestKit(t, store)
-	testKit.MustExec("use test")
-	testKit.MustExec("create table t (c1 int, c2 int)")
-	recordCount := 1000
-	for i := 0; i < recordCount; i++ {
-		testKit.MustExec("insert into t values (?, ?)", i, i+1)
-	}
-	testKit.MustExec("create index idx_t on t(c2)")
-	do := dom
-	is := do.InfoSchema()
-	tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
-	require.NoError(t, err)
-	tableInfo := tbl.Meta()
-
-	testKit.MustExec("analyze table t")
-	statsTbl1 := do.StatsHandle().GetTableStats(tableInfo)
-
-	do.StatsHandle().Clear()
-	err = do.StatsHandle().Update(is)
-	require.NoError(t, err)
-	statsTbl2 := do.StatsHandle().GetTableStats(tableInfo)
-	require.False(t, statsTbl2.Pseudo)
-	require.Equal(t, int64(recordCount), statsTbl2.RealtimeCount)
-	internal.AssertTableEqual(t, statsTbl1, statsTbl2)
-}
-
-func testInitStatsMemTrace(t *testing.T) {
-	store, dom := testkit.CreateMockStoreAndDomain(t)
-	tk := testkit.NewTestKit(t, store)
-	tk.MustExec("use test")
-	tk.MustExec("create table t1 (a int, b int, c int, primary key(a), key idx(b))")
-	tk.MustExec("insert into t1 values (1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),(6,7,8)")
-	tk.MustExec("analyze table t1")
-	for i := 2; i < 10; i++ {
-		tk.MustExec(fmt.Sprintf("create table t%v (a int, b int, c int, primary key(a), key idx(b))", i))
-		tk.MustExec(fmt.Sprintf("insert into t%v select * from t1", i))
-		tk.MustExec(fmt.Sprintf("analyze table t%v", i))
-	}
-	h := dom.StatsHandle()
-	is := dom.InfoSchema()
-	h.Clear()
-	require.Equal(t, h.MemConsumed(), int64(0))
-	require.NoError(t, h.InitStats(is))
-
-	var memCostTot int64
-	for i := 1; i < 10; i++ {
-		tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr(fmt.Sprintf("t%v", i)))
-		require.NoError(t, err)
-		tStats := h.GetTableStats(tbl.Meta())
-		memCostTot += tStats.MemoryUsage().TotalMemUsage
-	}
-	tables := h.StatsCache.Values()
-	for _, tt := range tables {
-		tbl, ok := h.StatsCache.Get(tt.PhysicalID)
-		require.True(t, ok)
-		require.Equal(t, tbl.PhysicalID, tt.PhysicalID)
-	}
-
-	require.Equal(t, h.MemConsumed(), memCostTot)
-}
-
-func TestInitStatsMemTraceWithLite(t *testing.T) {
-	restore := config.RestoreFunc()
-	defer restore()
-	config.UpdateGlobal(func(conf *config.Config) {
-		conf.Performance.ConcurrentlyInitStats = false
-	})
-	testInitStatsMemTraceFunc(t, true)
-}
-
-func TestInitStatsMemTraceWithoutLite(t *testing.T) {
-	restore := config.RestoreFunc()
-	defer restore()
-	config.UpdateGlobal(func(conf *config.Config) {
-		conf.Performance.ConcurrentlyInitStats = false
-	})
-	testInitStatsMemTraceFunc(t, false)
-}
-
-func TestInitStatsMemTraceWithConcurrrencyLite(t *testing.T) {
-	restore := config.RestoreFunc()
-	defer restore()
-	config.UpdateGlobal(func(conf *config.Config) {
-		conf.Performance.ConcurrentlyInitStats = true
-	})
-	testInitStatsMemTraceFunc(t, true)
-}
-
-func TestInitStatsMemTraceWithoutConcurrrencyLite(t *testing.T) {
-	restore := config.RestoreFunc()
-	defer restore()
-	config.UpdateGlobal(func(conf *config.Config) {
-		conf.Performance.ConcurrentlyInitStats = true
-	})
-	testInitStatsMemTraceFunc(t, false)
-}
-
-func testInitStatsMemTraceFunc(t *testing.T, liteInitStats bool) {
-	originValue := config.GetGlobalConfig().Performance.LiteInitStats
-	defer func() {
-		config.GetGlobalConfig().Performance.LiteInitStats = originValue
-	}()
-	config.GetGlobalConfig().Performance.LiteInitStats = liteInitStats
-	testInitStatsMemTrace(t)
-}
-
-func TestInitStats(t *testing.T) {
-	originValue := config.GetGlobalConfig().Performance.LiteInitStats
-	defer func() {
-		config.GetGlobalConfig().Performance.LiteInitStats = originValue
-	}()
-	config.GetGlobalConfig().Performance.LiteInitStats = false
-	store, dom := testkit.CreateMockStoreAndDomain(t)
-	testKit := testkit.NewTestKit(t, store)
-	testKit.MustExec("use test")
-	testKit.MustExec("set @@session.tidb_analyze_version = 1")
-	testKit.MustExec("create table t(a int, b int, c int, primary key(a), key idx(b))")
-	testKit.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),(6,7,8)")
-	testKit.MustExec("analyze table t")
-	h := dom.StatsHandle()
-	is := dom.InfoSchema()
-	tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
-	require.NoError(t, err)
-	// `Update` will not use load by need strategy when `Lease` is 0, and `InitStats` is only called when
-	// `Lease` is not 0, so here we just change it.
-	h.SetLease(time.Millisecond)
-
-	h.Clear()
-	require.NoError(t, h.InitStats(is))
-	table0 := h.GetTableStats(tbl.Meta())
-	cols := table0.Columns
-	require.Equal(t, uint8(0x36), cols[1].LastAnalyzePos.GetBytes()[0])
-	require.Equal(t, uint8(0x37), cols[2].LastAnalyzePos.GetBytes()[0])
-	require.Equal(t, uint8(0x38), cols[3].LastAnalyzePos.GetBytes()[0])
-	h.Clear()
-	require.NoError(t, h.Update(is))
-	// Index and pk are loaded.
-	needed := fmt.Sprintf(`Table:%v RealtimeCount:6
-column:1 ndv:6 totColSize:0
-num: 1 lower_bound: 1 upper_bound: 1 repeats: 1 ndv: 0
-num: 1 lower_bound: 2 upper_bound: 2 repeats: 1 ndv: 0
-num: 1 lower_bound: 3 upper_bound: 3 repeats: 1 ndv: 0
-num: 1 lower_bound: 4 upper_bound: 4 repeats: 1 ndv: 0
-num: 1 lower_bound: 5 upper_bound: 5 repeats: 1 ndv: 0
-num: 1 lower_bound: 6 upper_bound: 6 repeats: 1 ndv: 0
-column:2 ndv:6 totColSize:6
-column:3 ndv:6 totColSize:6
-index:1 ndv:6
-num: 1 lower_bound: 1 upper_bound: 1 repeats: 1 ndv: 0
-num: 1 lower_bound: 2 upper_bound: 2 repeats: 1 ndv: 0
-num: 1 lower_bound: 3 upper_bound: 3 repeats: 1 ndv: 0
-num: 1 lower_bound: 4 upper_bound: 4 repeats: 1 ndv: 0
-num: 1 lower_bound: 5 upper_bound: 5 repeats: 1 ndv: 0
-num: 1 lower_bound: 7 upper_bound: 7 repeats: 1 ndv: 0`, tbl.Meta().ID)
-	require.Equal(t, needed, table0.String())
-	h.SetLease(0)
-}
-
-func TestInitStats51358(t *testing.T) {
-	originValue := config.GetGlobalConfig().Performance.LiteInitStats
-	defer func() {
-		config.GetGlobalConfig().Performance.LiteInitStats = originValue
-	}()
-	config.GetGlobalConfig().Performance.LiteInitStats = false
-	store, dom := testkit.CreateMockStoreAndDomain(t)
-	testKit := testkit.NewTestKit(t, store)
-	testKit.MustExec("use test")
-	testKit.MustExec("set @@session.tidb_analyze_version = 1")
-	testKit.MustExec("create table t(a int, b int, c int, primary key(a), key idx(b))")
-	testKit.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),(6,7,8)")
-	testKit.MustExec("analyze table t")
-	h := dom.StatsHandle()
-	is := dom.InfoSchema()
-	// `Update` will not use load by need strategy when `Lease` is 0, and `InitStats` is only called when
-	// `Lease` is not 0, so here we just change it.
-	h.SetLease(time.Millisecond)
-
-	h.Clear()
-	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/cache/StatsCacheGetNil", "return()"))
-	defer func() {
-		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/cache/StatsCacheGetNil"))
-	}()
-	require.NoError(t, h.InitStats(is))
-	tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
-	require.NoError(t, err)
-	stats := h.GetTableStats(tbl.Meta())
-	for _, column := range stats.Columns {
-		if mysql.HasPriKeyFlag(column.Info.GetFlag()) {
-			// primary key column has no stats info, because primary key's is_index is false. so it cannot load the topn
-			require.Nil(t, column.TopN)
-		}
-		require.False(t, column.IsFullLoad())
-	}
-}
-
-func TestInitStatsVer2(t *testing.T) {
-	restore := config.RestoreFunc()
-	defer restore()
-	config.UpdateGlobal(func(conf *config.Config) {
-		config.GetGlobalConfig().Performance.LiteInitStats = false
-		config.GetGlobalConfig().Performance.ConcurrentlyInitStats = false
-	})
-	initStatsVer2(t)
-}
-
-func TestInitStatsVer2Concurrency(t *testing.T) {
-	restore := config.RestoreFunc()
-	defer restore()
-	config.UpdateGlobal(func(conf *config.Config) {
-		config.GetGlobalConfig().Performance.LiteInitStats = false
-		config.GetGlobalConfig().Performance.ConcurrentlyInitStats = true
-	})
-	initStatsVer2(t)
-}
-
-func initStatsVer2(t *testing.T) {
-	store, dom := testkit.CreateMockStoreAndDomain(t)
-	tk := testkit.NewTestKit(t, store)
-	tk.MustExec("use test")
-	tk.MustExec("set @@session.tidb_analyze_version=2")
-	tk.MustExec("create table t(a int, b int, c int, index idx(a), index idxab(a, b))")
-	tk.MustExec("insert into t values(1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4), (4, 4, 4), (4, 4, 4)")
-	tk.MustExec("analyze table t with 2 topn, 3 buckets")
-	h := dom.StatsHandle()
-	is := dom.InfoSchema()
-	tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
-	require.NoError(t, err)
-	// `Update` will not use load by need strategy when `Lease` is 0, and `InitStats` is only called when
-	// `Lease` is not 0, so here we just change it.
-	h.SetLease(time.Millisecond)
-
-	h.Clear()
-	require.NoError(t, h.InitStats(is))
-	table0 := h.GetTableStats(tbl.Meta())
-	cols := table0.Columns
-	require.Equal(t, uint8(0x33), cols[1].LastAnalyzePos.GetBytes()[0])
-	require.Equal(t, uint8(0x33), cols[2].LastAnalyzePos.GetBytes()[0])
-	require.Equal(t, uint8(0x33), cols[3].LastAnalyzePos.GetBytes()[0])
-	h.Clear()
-	require.NoError(t, h.InitStats(is))
-	table1 := h.GetTableStats(tbl.Meta())
-	internal.AssertTableEqual(t, table0, table1)
-	h.SetLease(0)
-}
-
-func TestInitStatsIssue41938(t *testing.T) {
-	store, dom := testkit.CreateMockStoreAndDomain(t)
-	tk := testkit.NewTestKit(t, store)
-	tk.MustExec("use test")
-	tk.MustExec("set @@global.tidb_analyze_version=1")
-	tk.MustExec("set @@session.tidb_analyze_version=1")
-	tk.MustExec("create table t1 (a timestamp primary key)")
-	tk.MustExec("insert into t1 values ('2023-03-07 14:24:30'), ('2023-03-07 14:24:31'), ('2023-03-07 14:24:32'), ('2023-03-07 14:24:33')")
-	tk.MustExec("analyze table t1 with 0 topn")
-	h := dom.StatsHandle()
-	// `InitStats` is only called when `Lease` is not 0, so here we just change it.
-	h.SetLease(time.Millisecond)
-	h.Clear()
-	require.NoError(t, h.InitStats(dom.InfoSchema()))
-	h.SetLease(0)
-}
diff --git a/statistics/handle/bootstrap.go b/statistics/handle/bootstrap.go
index a4c69a0ba3f0e..db23990808c41 100644
--- a/statistics/handle/bootstrap.go
+++ b/statistics/handle/bootstrap.go
@@ -530,11 +530,8 @@ func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
 	for _, table := range cache.Values() {
 		for _, col := range table.Columns {
 			if col.StatsAvailable() {
-				if mysql.HasPriKeyFlag(col.Info.GetFlag()) {
-					col.StatsLoadedStatus = statistics.NewStatsFullLoadStatus()
-				} else {
-					col.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
-				}
+				// primary key column has no stats info, because primary key's is_index is false. so it cannot load the topn
+				col.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
 			}
 		}
 	}

From 8dcfcf6a00d216e5ac29e01dd4ffad79157b3632 Mon Sep 17 00:00:00 2001
From: Weizhen Wang
Date: Thu, 29 Aug 2024 11:31:16 +0800
Subject: [PATCH 3/4] update

Signed-off-by: Weizhen Wang
---
 ddl/metadatalocktest/BUILD.bazel | 9 ---------
 1 file changed, 9 deletions(-)

diff --git a/ddl/metadatalocktest/BUILD.bazel b/ddl/metadatalocktest/BUILD.bazel
index 6f867a8570e54..c1287f1eb14db 100644
--- a/ddl/metadatalocktest/BUILD.bazel
+++ b/ddl/metadatalocktest/BUILD.bazel
@@ -10,21 +10,12 @@ go_test(
     flaky = True,
     shard_count = 34,
     deps = [
-<<<<<<< HEAD:ddl/metadatalocktest/BUILD.bazel
         "//config",
         "//ddl",
         "//errno",
         "//server",
         "//testkit",
         "//testkit/testsetup",
-=======
-        "//pkg/config",
-        "//pkg/parser/model",
-        "//pkg/parser/mysql",
-        "//pkg/statistics/handle/internal",
-        "//pkg/testkit",
-        "//pkg/testkit/testsetup",
->>>>>>> 687f39c12f8 (statistics: fix wrong behavior for primary key' non-lite init stats (#53298)):pkg/statistics/handle/handletest/statstest/BUILD.bazel
         "@com_github_pingcap_failpoint//:failpoint",
         "@com_github_stretchr_testify//require",
         "@org_uber_go_goleak//:goleak",

From 356715bd9d2de202084541b5beafb419d0cf9442 Mon Sep 17 00:00:00 2001
From: Weizhen Wang
Date: Thu, 29 Aug 2024 11:34:20 +0800
Subject: [PATCH 4/4] update

Signed-off-by: Weizhen Wang
---
 statistics/handle/handletest/BUILD.bazel    |  1 +
 statistics/handle/handletest/handle_test.go | 34 +++++++++++++++++++++
 2 files changed, 35 insertions(+)

diff --git a/statistics/handle/handletest/BUILD.bazel b/statistics/handle/handletest/BUILD.bazel
index e8f00b0655a86..83b79c901e443 100644
--- a/statistics/handle/handletest/BUILD.bazel
+++ b/statistics/handle/handletest/BUILD.bazel
@@ -13,6 +13,7 @@ go_test(
         "//config",
         "//domain",
         "//parser/model",
+        "//parser/mysql",
         "//session",
         "//sessionctx/variable",
         "//statistics",
diff --git a/statistics/handle/handletest/handle_test.go b/statistics/handle/handletest/handle_test.go
index 0971f972c01a4..99eea6f4710ee 100644
--- a/statistics/handle/handletest/handle_test.go
+++ b/statistics/handle/handletest/handle_test.go
@@ -28,6 +28,7 @@ import (
 	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/domain"
 	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/parser/mysql"
 	"github.com/pingcap/tidb/session"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/statistics"
@@ -457,6 +458,39 @@ func TestInitStats(t *testing.T) {
 	h.SetLease(0)
 }
 
+func TestInitStats51358(t *testing.T) {
+	originValue := config.GetGlobalConfig().Performance.LiteInitStats
+	defer func() {
+		config.GetGlobalConfig().Performance.LiteInitStats = originValue
+	}()
+	config.GetGlobalConfig().Performance.LiteInitStats = false
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	testKit := testkit.NewTestKit(t, store)
+	testKit.MustExec("use test")
+	testKit.MustExec("set @@session.tidb_analyze_version = 1")
+	testKit.MustExec("create table t(a int, b int, c int, primary key(a), key idx(b))")
+	testKit.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),(6,7,8)")
+	testKit.MustExec("analyze table t")
+	h := dom.StatsHandle()
+	is := dom.InfoSchema()
+	// `Update` will not use load by need strategy when `Lease` is 0, and `InitStats` is only called when
+	// `Lease` is not 0, so here we just change it.
+	h.SetLease(time.Millisecond)
+
+	h.Clear()
+	require.NoError(t, h.InitStats(is))
+	tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
+	require.NoError(t, err)
+	stats := h.GetTableStats(tbl.Meta())
+	for _, column := range stats.Columns {
+		if mysql.HasPriKeyFlag(column.Info.GetFlag()) {
+			// primary key column has no stats info, because primary key's is_index is false. so it cannot load the topn
+			require.Nil(t, column.TopN)
+		}
+		require.False(t, column.IsFullLoad())
+	}
+}
+
 func TestInitStatsVer2(t *testing.T) {
 	store, dom := testkit.CreateMockStoreAndDomain(t)
 	tk := testkit.NewTestKit(t, store)