From b9ef08cc490e1b73569c8db1e3edb8e11447f5af Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 22 Apr 2024 19:21:09 +0800 Subject: [PATCH 1/4] This is an automated cherry-pick of #52658 Signed-off-by: ti-chi-bot --- pkg/statistics/handle/syncload/BUILD.bazel | 46 ++ .../handle/syncload/stats_syncload.go | 548 ++++++++++++++++++ pkg/statistics/handle/types/interfaces.go | 494 ++++++++++++++++ statistics/handle/handle_hist_test.go | 90 +++ 4 files changed, 1178 insertions(+) create mode 100644 pkg/statistics/handle/syncload/BUILD.bazel create mode 100644 pkg/statistics/handle/syncload/stats_syncload.go create mode 100644 pkg/statistics/handle/types/interfaces.go diff --git a/pkg/statistics/handle/syncload/BUILD.bazel b/pkg/statistics/handle/syncload/BUILD.bazel new file mode 100644 index 0000000000000..ed6e310786a2a --- /dev/null +++ b/pkg/statistics/handle/syncload/BUILD.bazel @@ -0,0 +1,46 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "syncload", + srcs = ["stats_syncload.go"], + importpath = "github.com/pingcap/tidb/pkg/statistics/handle/syncload", + visibility = ["//visibility:public"], + deps = [ + "//pkg/config", + "//pkg/metrics", + "//pkg/parser/model", + "//pkg/parser/mysql", + "//pkg/sessionctx", + "//pkg/sessionctx/stmtctx", + "//pkg/statistics", + "//pkg/statistics/handle/storage", + "//pkg/statistics/handle/types", + "//pkg/types", + "//pkg/util", + "//pkg/util/logutil", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "syncload_test", + timeout = "short", + srcs = ["stats_syncload_test.go"], + flaky = True, + race = "on", + shard_count = 5, + deps = [ + ":syncload", + "//pkg/config", + "//pkg/parser/model", + "//pkg/sessionctx", + "//pkg/sessionctx/stmtctx", + "//pkg/statistics/handle/types", + "//pkg/testkit", + "//pkg/util/mathutil", + "@com_github_pingcap_failpoint//:failpoint", + 
"@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go new file mode 100644 index 0000000000000..723a2d77b3084 --- /dev/null +++ b/pkg/statistics/handle/syncload/stats_syncload.go @@ -0,0 +1,548 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncload + +import ( + "fmt" + "math/rand" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/metrics" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" + "github.com/pingcap/tidb/pkg/statistics" + "github.com/pingcap/tidb/pkg/statistics/handle/storage" + statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types" + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util" + "github.com/pingcap/tidb/pkg/util/logutil" + "go.uber.org/zap" +) + +// RetryCount is the max retry count for a sync load task. +const RetryCount = 3 + +type statsSyncLoad struct { + statsHandle statstypes.StatsHandle + StatsLoad statstypes.StatsLoad +} + +// NewStatsSyncLoad creates a new StatsSyncLoad. 
+func NewStatsSyncLoad(statsHandle statstypes.StatsHandle) statstypes.StatsSyncLoad { + s := &statsSyncLoad{statsHandle: statsHandle} + cfg := config.GetGlobalConfig() + s.StatsLoad.NeededItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize) + s.StatsLoad.TimeoutItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize) + return s +} + +type statsWrapper struct { + colInfo *model.ColumnInfo + idxInfo *model.IndexInfo + col *statistics.Column + idx *statistics.Index +} + +// SendLoadRequests send neededColumns requests +func (s *statsSyncLoad) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems []model.StatsLoadItem, timeout time.Duration) error { + remainedItems := s.removeHistLoadedColumns(neededHistItems) + + failpoint.Inject("assertSyncLoadItems", func(val failpoint.Value) { + if sc.OptimizeTracer != nil { + count := val.(int) + if len(remainedItems) != count { + panic("remained items count wrong") + } + } + }) + + if len(remainedItems) <= 0 { + return nil + } + sc.StatsLoad.Timeout = timeout + sc.StatsLoad.NeededItems = remainedItems + sc.StatsLoad.ResultCh = make(chan stmtctx.StatsLoadResult, len(remainedItems)) + tasks := make([]*statstypes.NeededItemTask, 0) + for _, item := range remainedItems { + task := &statstypes.NeededItemTask{ + Item: item, + ToTimeout: time.Now().Local().Add(timeout), + ResultCh: sc.StatsLoad.ResultCh, + } + tasks = append(tasks, task) + } + timer := time.NewTimer(timeout) + defer timer.Stop() + for _, task := range tasks { + select { + case s.StatsLoad.NeededItemsCh <- task: + continue + case <-timer.C: + return errors.New("sync load stats channel is full and timeout sending task to channel") + } + } + sc.StatsLoad.LoadStartTime = time.Now() + return nil +} + +// SyncWaitStatsLoad sync waits loading of neededColumns and return false if timeout +func (*statsSyncLoad) SyncWaitStatsLoad(sc *stmtctx.StatementContext) error { + if len(sc.StatsLoad.NeededItems) <= 0 { + 
return nil + } + var errorMsgs []string + defer func() { + if len(errorMsgs) > 0 { + logutil.BgLogger().Warn("SyncWaitStatsLoad meets error", + zap.Strings("errors", errorMsgs)) + } + sc.StatsLoad.NeededItems = nil + }() + resultCheckMap := map[model.TableItemID]struct{}{} + for _, col := range sc.StatsLoad.NeededItems { + resultCheckMap[col.TableItemID] = struct{}{} + } + metrics.SyncLoadCounter.Inc() + timer := time.NewTimer(sc.StatsLoad.Timeout) + defer timer.Stop() + for { + select { + case result, ok := <-sc.StatsLoad.ResultCh: + if !ok { + return errors.New("sync load stats channel closed unexpectedly") + } + if result.HasError() { + errorMsgs = append(errorMsgs, result.ErrorMsg()) + } + delete(resultCheckMap, result.Item) + if len(resultCheckMap) == 0 { + metrics.SyncLoadHistogram.Observe(float64(time.Since(sc.StatsLoad.LoadStartTime).Milliseconds())) + return nil + } + case <-timer.C: + metrics.SyncLoadTimeoutCounter.Inc() + return errors.New("sync load stats timeout") + } + } +} + +// removeHistLoadedColumns removed having-hist columns based on neededColumns and statsCache. 
+func (s *statsSyncLoad) removeHistLoadedColumns(neededItems []model.StatsLoadItem) []model.StatsLoadItem { + remainedItems := make([]model.StatsLoadItem, 0, len(neededItems)) + for _, item := range neededItems { + tbl, ok := s.statsHandle.Get(item.TableID) + if !ok { + continue + } + if item.IsIndex { + _, loadNeeded := tbl.IndexIsLoadNeeded(item.ID) + if loadNeeded { + remainedItems = append(remainedItems, item) + } + continue + } + _, loadNeeded, _ := tbl.ColumnIsLoadNeeded(item.ID, item.FullLoad) + if loadNeeded { + remainedItems = append(remainedItems, item) + } + } + return remainedItems +} + +// AppendNeededItem appends needed columns/indices to ch, it is only used for test +func (s *statsSyncLoad) AppendNeededItem(task *statstypes.NeededItemTask, timeout time.Duration) error { + timer := time.NewTimer(timeout) + defer timer.Stop() + select { + case s.StatsLoad.NeededItemsCh <- task: + case <-timer.C: + return errors.New("Channel is full and timeout writing to channel") + } + return nil +} + +var errExit = errors.New("Stop loading since domain is closed") + +// SubLoadWorker loads hist data for each column +func (s *statsSyncLoad) SubLoadWorker(sctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupEnhancedWrapper) { + defer func() { + exitWg.Done() + logutil.BgLogger().Info("SubLoadWorker exited.") + }() + // if the last task is not successfully handled in last round for error or panic, pass it to this round to retry + var lastTask *statstypes.NeededItemTask + for { + task, err := s.HandleOneTask(sctx, lastTask, exit) + lastTask = task + if err != nil { + switch err { + case errExit: + return + default: + // To avoid the thundering herd effect + // thundering herd effect: Everyone tries to retry a large number of requests simultaneously when a problem occurs. 
+ r := rand.Intn(500) + time.Sleep(s.statsHandle.Lease()/10 + time.Duration(r)*time.Microsecond) + continue + } + } + } +} + +// HandleOneTask handles last task if not nil, else handle a new task from chan, and return current task if fail somewhere. +// - If the task is handled successfully, return nil, nil. +// - If the task is timeout, return the task and nil. The caller should retry the timeout task without sleep. +// - If the task is failed, return the task, error. The caller should retry the timeout task with sleep. +func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statstypes.NeededItemTask, exit chan struct{}) (task *statstypes.NeededItemTask, err error) { + defer func() { + // recover for each task, worker keeps working + if r := recover(); r != nil { + logutil.BgLogger().Error("stats loading panicked", zap.Any("error", r), zap.Stack("stack")) + err = errors.Errorf("stats loading panicked: %v", r) + } + }() + if lastTask == nil { + task, err = s.drainColTask(sctx, exit) + if err != nil { + if err != errExit { + logutil.BgLogger().Error("Fail to drain task for stats loading.", zap.Error(err)) + } + return task, err + } + } else { + task = lastTask + } + result := stmtctx.StatsLoadResult{Item: task.Item.TableItemID} + resultChan := s.StatsLoad.Singleflight.DoChan(task.Item.Key(), func() (any, error) { + err := s.handleOneItemTask(sctx, task) + return nil, err + }) + timeout := time.Until(task.ToTimeout) + select { + case sr := <-resultChan: + // sr.Val is always nil. 
+ if sr.Err == nil { + task.ResultCh <- result + return nil, nil + } + if !isVaildForRetry(task) { + result.Error = sr.Err + task.ResultCh <- result + return nil, nil + } + return task, sr.Err + case <-time.After(timeout): + if !isVaildForRetry(task) { + result.Error = errors.New("stats loading timeout") + task.ResultCh <- result + return nil, nil + } + task.ToTimeout = task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond) // Time.Add returns a new value; assign it back so the retry deadline is actually extended + return task, nil + } +} + +func isVaildForRetry(task *statstypes.NeededItemTask) bool { + task.Retry++ + return task.Retry <= RetryCount +} + +func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (err error) { + defer func() { + // recover for each task, worker keeps working + if r := recover(); r != nil { + logutil.BgLogger().Error("handleOneItemTask panicked", zap.Any("recover", r), zap.Stack("stack")) + err = errors.Errorf("stats loading panicked: %v", r) + } + }() + item := task.Item.TableItemID + tbl, ok := s.statsHandle.Get(item.TableID) + if !ok { + return nil + } + wrapper := &statsWrapper{} + if item.IsIndex { + index, loadNeeded := tbl.IndexIsLoadNeeded(item.ID) + if !loadNeeded { + return nil + } + if index != nil { + wrapper.idxInfo = index.Info + } else { + wrapper.idxInfo = tbl.ColAndIdxExistenceMap.GetIndex(item.ID) + } + } else { + col, loadNeeded, analyzed := tbl.ColumnIsLoadNeeded(item.ID, task.Item.FullLoad) + if !loadNeeded { + return nil + } + if col != nil { + wrapper.colInfo = col.Info + } else { + wrapper.colInfo = tbl.ColAndIdxExistenceMap.GetCol(item.ID) + } + // If this column is not analyzed yet and we don't have it in memory. + // We create a fake one for the pseudo estimation.
+ if loadNeeded && !analyzed { + wrapper.col = &statistics.Column{ + PhysicalID: item.TableID, + Info: wrapper.colInfo, + Histogram: *statistics.NewHistogram(item.ID, 0, 0, 0, &wrapper.colInfo.FieldType, 0, 0), + IsHandle: tbl.IsPkIsHandle && mysql.HasPriKeyFlag(wrapper.colInfo.GetFlag()), + } + s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad) + return nil + } + } + t := time.Now() + needUpdate := false + wrapper, err = s.readStatsForOneItem(sctx, item, wrapper, tbl.IsPkIsHandle, task.Item.FullLoad) + if err != nil { + return err + } + if item.IsIndex { + if wrapper.idxInfo != nil { + needUpdate = true + } + } else { + if wrapper.colInfo != nil { + needUpdate = true + } + } + metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds())) + if needUpdate { + s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad) + } + return nil +} + +// readStatsForOneItem reads hist for one column/index, TODO load data via kv-get asynchronously +func (*statsSyncLoad) readStatsForOneItem(sctx sessionctx.Context, item model.TableItemID, w *statsWrapper, isPkIsHandle bool, fullLoad bool) (*statsWrapper, error) { + failpoint.Inject("mockReadStatsForOnePanic", nil) + failpoint.Inject("mockReadStatsForOneFail", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(nil, errors.New("gofail ReadStatsForOne error")) + } + }) + loadFMSketch := config.GetGlobalConfig().Performance.EnableLoadFMSketch + var hg *statistics.Histogram + var err error + isIndexFlag := int64(0) + hg, lastAnalyzePos, statsVer, flag, err := storage.HistMetaFromStorage(sctx, &item, w.colInfo) + if err != nil { + return nil, err + } + if hg == nil { + logutil.BgLogger().Error("fail to get hist meta for this histogram, possibly a deleted one", zap.Int64("table_id", item.TableID), + zap.Int64("hist_id", item.ID), zap.Bool("is_index", item.IsIndex)) + return nil, errors.Trace(fmt.Errorf("fail to get hist meta for this histogram, table_id:%v, hist_id:%v, 
is_index:%v", item.TableID, item.ID, item.IsIndex)) + } + if item.IsIndex { + isIndexFlag = 1 + } + var cms *statistics.CMSketch + var topN *statistics.TopN + var fms *statistics.FMSketch + if fullLoad { + if item.IsIndex { + hg, err = storage.HistogramFromStorage(sctx, item.TableID, item.ID, types.NewFieldType(mysql.TypeBlob), hg.NDV, int(isIndexFlag), hg.LastUpdateVersion, hg.NullCount, hg.TotColSize, hg.Correlation) + if err != nil { + return nil, errors.Trace(err) + } + } else { + hg, err = storage.HistogramFromStorage(sctx, item.TableID, item.ID, &w.colInfo.FieldType, hg.NDV, int(isIndexFlag), hg.LastUpdateVersion, hg.NullCount, hg.TotColSize, hg.Correlation) + if err != nil { + return nil, errors.Trace(err) + } + } + cms, topN, err = storage.CMSketchAndTopNFromStorage(sctx, item.TableID, isIndexFlag, item.ID) + if err != nil { + return nil, errors.Trace(err) + } + if loadFMSketch { + fms, err = storage.FMSketchFromStorage(sctx, item.TableID, isIndexFlag, item.ID) + if err != nil { + return nil, errors.Trace(err) + } + } + } + if item.IsIndex { + idxHist := &statistics.Index{ + Histogram: *hg, + CMSketch: cms, + TopN: topN, + FMSketch: fms, + Info: w.idxInfo, + StatsVer: statsVer, + Flag: flag, + PhysicalID: item.TableID, + } + if statsVer != statistics.Version0 { + if fullLoad { + idxHist.StatsLoadedStatus = statistics.NewStatsFullLoadStatus() + } else { + idxHist.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus() + } + } + lastAnalyzePos.Copy(&idxHist.LastAnalyzePos) + w.idx = idxHist + } else { + colHist := &statistics.Column{ + PhysicalID: item.TableID, + Histogram: *hg, + Info: w.colInfo, + CMSketch: cms, + TopN: topN, + FMSketch: fms, + IsHandle: isPkIsHandle && mysql.HasPriKeyFlag(w.colInfo.GetFlag()), + StatsVer: statsVer, + } + if colHist.StatsAvailable() { + if fullLoad { + colHist.StatsLoadedStatus = statistics.NewStatsFullLoadStatus() + } else { + colHist.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus() + } + } + w.col = colHist + } 
+ return w, nil +} + +// drainColTask will hang until a column task can return, and either task or error will be returned. +func (s *statsSyncLoad) drainColTask(sctx sessionctx.Context, exit chan struct{}) (*statstypes.NeededItemTask, error) { + // select NeededColumnsCh firstly, if no task, then select TimeoutColumnsCh + for { + select { + case <-exit: + return nil, errExit + case task, ok := <-s.StatsLoad.NeededItemsCh: + if !ok { + return nil, errors.New("drainColTask: cannot read from NeededColumnsCh, maybe the chan is closed") + } + // if the task has already timeout, no sql is sync-waiting for it, + // so do not handle it just now, put it to another channel with lower priority + if time.Now().After(task.ToTimeout) { + task.ToTimeout = task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond) // Time.Add returns a new value; assign it back so the deadline is actually extended + s.writeToTimeoutChan(s.StatsLoad.TimeoutItemsCh, task) + continue + } + return task, nil + case task, ok := <-s.StatsLoad.TimeoutItemsCh: + select { + case <-exit: + return nil, errExit + case task0, ok0 := <-s.StatsLoad.NeededItemsCh: + if !ok0 { + return nil, errors.New("drainColTask: cannot read from NeededColumnsCh, maybe the chan is closed") + } + // send task back to TimeoutColumnsCh and return the task drained from NeededColumnsCh + s.writeToTimeoutChan(s.StatsLoad.TimeoutItemsCh, task) + return task0, nil + default: + if !ok { + return nil, errors.New("drainColTask: cannot read from TimeoutColumnsCh, maybe the chan is closed") + } + // NeededColumnsCh is empty now, handle task from TimeoutColumnsCh + return task, nil + } + } + } +} + +// writeToTimeoutChan writes in a nonblocking way, and if the channel queue is full, it's ok to drop the task. +func (*statsSyncLoad) writeToTimeoutChan(taskCh chan *statstypes.NeededItemTask, task *statstypes.NeededItemTask) { + select { + case taskCh <- task: + default: + } +} + +// writeToChanWithTimeout writes a task to a channel and blocks until timeout.
+func (*statsSyncLoad) writeToChanWithTimeout(taskCh chan *statstypes.NeededItemTask, task *statstypes.NeededItemTask, timeout time.Duration) error { + timer := time.NewTimer(timeout) + defer timer.Stop() + select { + case taskCh <- task: + case <-timer.C: + return errors.New("Channel is full and timeout writing to channel") + } + return nil +} + +// writeToResultChan safe-writes with panic-recover so one write-fail will not have big impact. +func (*statsSyncLoad) writeToResultChan(resultCh chan stmtctx.StatsLoadResult, rs stmtctx.StatsLoadResult) { + defer func() { + if r := recover(); r != nil { + logutil.BgLogger().Error("writeToResultChan panicked", zap.Any("error", r), zap.Stack("stack")) + } + }() + select { + case resultCh <- rs: + default: + } +} + +// updateCachedItem updates the column/index hist to global statsCache. +func (s *statsSyncLoad) updateCachedItem(item model.TableItemID, colHist *statistics.Column, idxHist *statistics.Index, fullLoaded bool) (updated bool) { + s.StatsLoad.Lock() + defer s.StatsLoad.Unlock() + // Reload the latest stats cache, otherwise the `updateStatsCache` may fail with high probability, because functions + // like `GetPartitionStats` called in `fmSketchFromStorage` would have modified the stats cache already. + tbl, ok := s.statsHandle.Get(item.TableID) + if !ok { + return false + } + if !item.IsIndex && colHist != nil { + c, ok := tbl.Columns[item.ID] + // - If the stats is fully loaded, + // - If the stats is meta-loaded and we also just need the meta. + if ok && (c.IsFullLoad() || !fullLoaded) { + return false + } + tbl = tbl.Copy() + tbl.Columns[item.ID] = colHist + // If the column is analyzed we refresh the map for the possible change. + if colHist.StatsAvailable() { + tbl.ColAndIdxExistenceMap.InsertCol(item.ID, colHist.Info, true) + } + // All the objects shares the same stats version. Update it here. 
+ if colHist.StatsVer != statistics.Version0 { + tbl.StatsVer = statistics.Version0 + } + } else if item.IsIndex && idxHist != nil { + index, ok := tbl.Indices[item.ID] + // - If the stats is fully loaded, + // - If the stats is meta-loaded and we also just need the meta. + if ok && (index.IsFullLoad() || !fullLoaded) { + return true + } + tbl = tbl.Copy() + tbl.Indices[item.ID] = idxHist + // If the index is analyzed we refresh the map for the possible change. + if idxHist.IsAnalyzed() { + tbl.ColAndIdxExistenceMap.InsertIndex(item.ID, idxHist.Info, true) + // All the objects shares the same stats version. Update it here. + tbl.StatsVer = statistics.Version0 + } + } + s.statsHandle.UpdateStatsCache([]*statistics.Table{tbl}, nil) + return true +} diff --git a/pkg/statistics/handle/types/interfaces.go b/pkg/statistics/handle/types/interfaces.go new file mode 100644 index 0000000000000..5c1b41d7fbd65 --- /dev/null +++ b/pkg/statistics/handle/types/interfaces.go @@ -0,0 +1,494 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package types + +import ( + "context" + "sync" + "time" + + "github.com/pingcap/tidb/pkg/infoschema" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" + "github.com/pingcap/tidb/pkg/statistics" + "github.com/pingcap/tidb/pkg/statistics/handle/usage/indexusage" + statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util" + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util" + "github.com/pingcap/tidb/pkg/util/sqlexec" + "golang.org/x/sync/singleflight" +) + +// StatsGC is used to GC unnecessary stats. +type StatsGC interface { + // GCStats will garbage collect the useless stats' info. + // For dropped tables, we will first update their version + // so that other tidb could know that table is deleted. + GCStats(is infoschema.InfoSchema, ddlLease time.Duration) (err error) + + // ClearOutdatedHistoryStats clear outdated historical stats. + // Only for test. + ClearOutdatedHistoryStats() error + + // DeleteTableStatsFromKV deletes table statistics from kv. + // A statsID refers to statistic of a table or a partition. + DeleteTableStatsFromKV(statsIDs []int64) (err error) +} + +// ColStatsTimeInfo records usage information of this column stats. +type ColStatsTimeInfo struct { + LastUsedAt *types.Time // last time the column is used + LastAnalyzedAt *types.Time // last time the column is analyzed +} + +// StatsUsage is used to track the usage of column / index statistics. +type StatsUsage interface { + // Below methods are for predicated columns. + + // LoadColumnStatsUsage returns all columns' usage information. + LoadColumnStatsUsage(loc *time.Location) (map[model.TableItemID]ColStatsTimeInfo, error) + + // GetPredicateColumns returns IDs of predicate columns, which are the columns whose stats are used(needed) when generating query plans. 
+ GetPredicateColumns(tableID int64) ([]int64, error) + + // CollectColumnsInExtendedStats returns IDs of the columns involved in extended stats. + CollectColumnsInExtendedStats(tableID int64) ([]int64, error) + + IndexUsage + + // TODO: extract these functions to a new interface only for delta/stats usage, like `IndexUsage`. + // Below methods are for table delta and stats usage. + + // NewSessionStatsItem allocates a stats collector for a session. + // TODO: use interface{} to avoid cycle import, remove this interface{}. + NewSessionStatsItem() any + + // ResetSessionStatsList resets the sessions stats list. + ResetSessionStatsList() + + // DumpStatsDeltaToKV sweeps the whole list and updates the global map, then dumps every table held in the map to KV. + DumpStatsDeltaToKV(dumpAll bool) error + + // DumpColStatsUsageToKV sweeps the whole list, updates the column stats usage map and dumps it to KV. + DumpColStatsUsageToKV() error +} + +// IndexUsage is an interface to define the function of collecting index usage stats. +type IndexUsage interface { + // NewSessionIndexUsageCollector creates a new Collector for a session. + NewSessionIndexUsageCollector() *indexusage.SessionIndexUsageCollector + + // GCIndexUsage removes unnecessary index usage data. + GCIndexUsage() error + + // StartWorker starts the collector worker. + StartWorker() + + // Close closes and waits for the index usage collector worker. + Close() + + // GetIndexUsage returns the index usage information. + GetIndexUsage(tableID int64, indexID int64) indexusage.Sample +} + +// StatsHistory is used to manage historical stats. +type StatsHistory interface { + // RecordHistoricalStatsMeta records stats meta of the specified version to stats_meta_history. + RecordHistoricalStatsMeta(tableID int64, version uint64, source string, enforce bool) + + // CheckHistoricalStatsEnable checks whether historical stats is enabled.
+ CheckHistoricalStatsEnable() (enable bool, err error) + + // RecordHistoricalStatsToStorage records the given table's stats data to mysql.stats_history + RecordHistoricalStatsToStorage(dbName string, tableInfo *model.TableInfo, physicalID int64, isPartition bool) (uint64, error) +} + +// StatsAnalyze is used to handle auto-analyze and manage analyze jobs. +type StatsAnalyze interface { + // InsertAnalyzeJob inserts analyze job into mysql.analyze_jobs and gets job ID for further updating job. + InsertAnalyzeJob(job *statistics.AnalyzeJob, instance string, procID uint64) error + + // DeleteAnalyzeJobs deletes the analyze jobs whose update time is earlier than updateTime. + DeleteAnalyzeJobs(updateTime time.Time) error + + // CleanupCorruptedAnalyzeJobsOnCurrentInstance cleans up the corrupted analyze job. + // A corrupted analyze job is one that is in a 'pending' or 'running' state, + // but is associated with a TiDB instance that is either not currently running or has been restarted. + // We use current running analyze jobs to check whether the analyze job is corrupted. + CleanupCorruptedAnalyzeJobsOnCurrentInstance(currentRunningProcessIDs map[uint64]struct{}) error + + // CleanupCorruptedAnalyzeJobsOnDeadInstances purges analyze jobs that are associated with non-existent instances. + // This function is specifically designed to handle jobs that have become corrupted due to + // their associated instances being removed from the current cluster. + CleanupCorruptedAnalyzeJobsOnDeadInstances() error + + // HandleAutoAnalyze analyzes the outdated tables. (The change percent of the table exceeds the threshold) + // It also analyzes newly created tables and newly added indexes. + HandleAutoAnalyze() (analyzed bool) + + // CheckAnalyzeVersion checks whether all the statistics versions of this table's columns and indexes are the same. 
+ CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalIDs []int64, version *int) bool +} + +// StatsCache is used to manage all table statistics in memory. +type StatsCache interface { + // Close closes this cache. + Close() + + // Clear clears this cache. + Clear() + + // Update reads stats meta from store and updates the stats map. + Update(is infoschema.InfoSchema) error + + // MemConsumed returns its memory usage. + MemConsumed() (size int64) + + // Get returns the specified table's stats. + Get(tableID int64) (*statistics.Table, bool) + + // Put puts this table stats into the cache. + Put(tableID int64, t *statistics.Table) + + // UpdateStatsCache updates the cache. + UpdateStatsCache(addedTables []*statistics.Table, deletedTableIDs []int64) + + // MaxTableStatsVersion returns the version of the current cache, which is defined as + // the max table stats version the cache has in its lifecycle. + MaxTableStatsVersion() uint64 + + // Values returns all values in this cache. + Values() []*statistics.Table + + // Len returns the length of this cache. + Len() int + + // SetStatsCacheCapacity sets the cache's capacity. + SetStatsCacheCapacity(capBytes int64) + + // Replace replaces this cache. + Replace(cache StatsCache) + + // UpdateStatsHealthyMetrics updates stats healthy distribution metrics according to stats cache. + UpdateStatsHealthyMetrics() +} + +// StatsLockTable is the table info of which will be locked. +type StatsLockTable struct { + PartitionInfo map[int64]string + // schema name + table name. + FullName string +} + +// StatsLock is used to manage locked stats. +type StatsLock interface { + // LockTables add locked tables id to store. + // - tables: tables that will be locked. + // Return the message of skipped tables and error. + LockTables(tables map[int64]*StatsLockTable) (skipped string, err error) + + // LockPartitions add locked partitions id to store. + // If the whole table is locked, then skip all partitions of the table. 
+ // - tid: table id of which will be locked. + // - tableName: table name of which will be locked. + // - pidNames: partition ids of which will be locked. + // Return the message of skipped tables and error. + // Note: If the whole table is locked, then skip all partitions of the table. + LockPartitions( + tid int64, + tableName string, + pidNames map[int64]string, + ) (skipped string, err error) + + // RemoveLockedTables remove tables from table locked records. + // - tables: tables of which will be unlocked. + // Return the message of skipped tables and error. + RemoveLockedTables(tables map[int64]*StatsLockTable) (skipped string, err error) + + // RemoveLockedPartitions remove partitions from table locked records. + // - tid: table id of which will be unlocked. + // - tableName: table name of which will be unlocked. + // - pidNames: partition ids of which will be unlocked. + // Note: If the whole table is locked, then skip all partitions of the table. + RemoveLockedPartitions( + tid int64, + tableName string, + pidNames map[int64]string, + ) (skipped string, err error) + + // GetLockedTables returns the locked status of the given tables. + // Note: This function query locked tables from store, so please try to batch the query. + GetLockedTables(tableIDs ...int64) (map[int64]struct{}, error) + + // GetTableLockedAndClearForTest for unit test only. + GetTableLockedAndClearForTest() (map[int64]struct{}, error) +} + +// PartitionStatisticLoadTask currently records a partition-level jsontable. +type PartitionStatisticLoadTask struct { + JSONTable *statsutil.JSONTable + PhysicalID int64 +} + +// PersistFunc is used to persist JSONTable in the partition level. +type PersistFunc func(ctx context.Context, jsonTable *statsutil.JSONTable, physicalID int64) error + +// StatsReadWriter is used to read and write stats to the storage. +// TODO: merge and remove some methods. +type StatsReadWriter interface { + // TableStatsFromStorage loads table stats info from storage. 
+ TableStatsFromStorage(tableInfo *model.TableInfo, physicalID int64, loadAll bool, snapshot uint64) (statsTbl *statistics.Table, err error) + + // LoadTablePartitionStats loads partition stats info from storage. + LoadTablePartitionStats(tableInfo *model.TableInfo, partitionDef *model.PartitionDefinition) (*statistics.Table, error) + + // StatsMetaCountAndModifyCount reads count and modify_count for the given table from mysql.stats_meta. + StatsMetaCountAndModifyCount(tableID int64) (count, modifyCount int64, err error) + + // LoadNeededHistograms will load histograms for those needed columns/indices and put them into the cache. + LoadNeededHistograms() (err error) + + // ReloadExtendedStatistics drops the cache for extended statistics and reload data from mysql.stats_extended. + ReloadExtendedStatistics() error + + // SaveStatsToStorage save the stats data to the storage. + SaveStatsToStorage(tableID int64, count, modifyCount int64, isIndex int, hg *statistics.Histogram, + cms *statistics.CMSketch, topN *statistics.TopN, statsVersion int, isAnalyzed int64, updateAnalyzeTime bool, source string) (err error) + + // SaveTableStatsToStorage saves the stats of a table to storage. + SaveTableStatsToStorage(results *statistics.AnalyzeResults, analyzeSnapshot bool, source string) (err error) + + // SaveMetaToStorage saves the stats meta of a table to storage. + SaveMetaToStorage(tableID, count, modifyCount int64, source string) (err error) + + // InsertColStats2KV inserts columns stats to kv. + InsertColStats2KV(physicalID int64, colInfos []*model.ColumnInfo) (err error) + + // InsertTableStats2KV inserts a record standing for a new table to stats_meta and inserts some records standing for the + // new columns and indices which belong to this table. + InsertTableStats2KV(info *model.TableInfo, physicalID int64) (err error) + + // UpdateStatsVersion will set statistics version to the newest TS, + // then tidb-server will reload automatic. 
+ UpdateStatsVersion() error + + // UpdateStatsMetaVersionForGC updates the version of mysql.stats_meta, + // ensuring it is greater than the last garbage collection (GC) time. + // The GC worker deletes old stats based on a safe time point, + // calculated as now() - 10 * max(stats lease, ddl lease). + // The range [last GC time, safe time point) is chosen to prevent + // the simultaneous deletion of numerous stats, minimizing potential + // performance issues. + // This function ensures the version is updated beyond the last GC time, + // allowing the GC worker to delete outdated stats. + // + // Explanation: + // - ddl lease: 10 + // - stats lease: 3 + // - safe time point: now() - 10 * 10 = now() - 100 + // - now: 200 + // - last GC time: 90 + // - [last GC time, safe time point) = [90, 100) + // - To trigger stats deletion, the version must be updated beyond 90. + // + // This safeguards scenarios where a table remains unchanged for an extended period. + // For instance, if a table was created at time 90, and it's now time 200, + // with the last GC time at 95 and the safe time point at 100, + // updating the version beyond 95 ensures eventual deletion of stats. + UpdateStatsMetaVersionForGC(physicalID int64) (err error) + + // ChangeGlobalStatsID changes the global stats ID. + ChangeGlobalStatsID(from, to int64) (err error) + + // TableStatsToJSON dumps table stats to JSON. + TableStatsToJSON(dbName string, tableInfo *model.TableInfo, physicalID int64, snapshot uint64) (*statsutil.JSONTable, error) + + // DumpStatsToJSON dumps statistic to json. + DumpStatsToJSON(dbName string, tableInfo *model.TableInfo, + historyStatsExec sqlexec.RestrictedSQLExecutor, dumpPartitionStats bool) (*statsutil.JSONTable, error) + + // DumpHistoricalStatsBySnapshot dumped json tables from mysql.stats_meta_history and mysql.stats_history. 
+ // As implemented in getTableHistoricalStatsToJSONWithFallback, if historical stats are nonexistent, it will fall back + // to the latest stats, and these table names (and partition names) will be returned in fallbackTbls. + DumpHistoricalStatsBySnapshot( + dbName string, + tableInfo *model.TableInfo, + snapshot uint64, + ) ( + jt *statsutil.JSONTable, + fallbackTbls []string, + err error, + ) + + // DumpStatsToJSONBySnapshot dumps statistic to json. + DumpStatsToJSONBySnapshot(dbName string, tableInfo *model.TableInfo, snapshot uint64, dumpPartitionStats bool) (*statsutil.JSONTable, error) + + // PersistStatsBySnapshot dumps statistic to json and call the function for each partition statistic to persist. + // Notice: + // 1. It might call the function `persist` with nil jsontable. + // 2. It is only used by BR, so partitions' statistic are always dumped. + PersistStatsBySnapshot(ctx context.Context, dbName string, tableInfo *model.TableInfo, snapshot uint64, persist PersistFunc) error + + // LoadStatsFromJSONConcurrently consumes concurrently the statistic task from `taskCh`. + LoadStatsFromJSONConcurrently(ctx context.Context, tableInfo *model.TableInfo, taskCh chan *PartitionStatisticLoadTask, concurrencyForPartition int) error + + // LoadStatsFromJSON will load statistic from JSONTable, and save it to the storage. + // In final, it will also udpate the stats cache. + LoadStatsFromJSON(ctx context.Context, is infoschema.InfoSchema, jsonTbl *statsutil.JSONTable, concurrencyForPartition int) error + + // LoadStatsFromJSONNoUpdate will load statistic from JSONTable, and save it to the storage. + LoadStatsFromJSONNoUpdate(ctx context.Context, is infoschema.InfoSchema, jsonTbl *statsutil.JSONTable, concurrencyForPartition int) error + + // Methods for extended stast. + + // InsertExtendedStats inserts a record into mysql.stats_extended and update version in mysql.stats_meta. 
+ InsertExtendedStats(statsName string, colIDs []int64, tp int, tableID int64, ifNotExists bool) (err error) + + // MarkExtendedStatsDeleted update the status of mysql.stats_extended to be `deleted` and the version of mysql.stats_meta. + MarkExtendedStatsDeleted(statsName string, tableID int64, ifExists bool) (err error) + + // SaveExtendedStatsToStorage writes extended stats of a table into mysql.stats_extended. + SaveExtendedStatsToStorage(tableID int64, extStats *statistics.ExtendedStatsColl, isLoad bool) (err error) +} + +// NeededItemTask represents one needed column/indices with expire time. +type NeededItemTask struct { + ToTimeout time.Time + ResultCh chan stmtctx.StatsLoadResult + Item model.StatsLoadItem + Retry int +} + +// StatsLoad is used to load stats concurrently +// TODO(hawkingrei): Our implementation of loading statistics is flawed. +// Currently, we enqueue tasks that require loading statistics into a channel, +// from which workers retrieve tasks to process. Then, using the singleflight mechanism, +// we filter out duplicate tasks. However, the issue with this approach is that it does +// not filter out all duplicate tasks, but only the duplicates within the number of workers. +// Such an implementation is not reasonable. +// +// We should first filter all tasks through singleflight as shown in the diagram, and then use workers to load stats. +// +// ┌─────────▼──────────▼─────────────▼──────────────▼────────────────▼────────────────────┐ +// │ │ +// │ singleflight │ +// │ │ +// └───────────────────────────────────────────────────────────────────────────────────────┘ +// +// │ │ +// ┌────────────▼──────┐ ┌───────▼───────────┐ +// │ │ │ │ +// │ syncload worker │ │ syncload worker │ +// │ │ │ │ +// └───────────────────┘ └───────────────────┘ +type StatsLoad struct { + NeededItemsCh chan *NeededItemTask + TimeoutItemsCh chan *NeededItemTask + Singleflight singleflight.Group + sync.Mutex +} + +// StatsSyncLoad implement the sync-load feature. 
+type StatsSyncLoad interface { + // SendLoadRequests sends load requests to the channel. + SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems []model.StatsLoadItem, timeout time.Duration) error + + // SyncWaitStatsLoad will wait for the load requests to finish. + SyncWaitStatsLoad(sc *stmtctx.StatementContext) error + + // AppendNeededItem appends a needed item to the channel. + AppendNeededItem(task *NeededItemTask, timeout time.Duration) error + + // SubLoadWorker will start a goroutine to handle the load requests. + SubLoadWorker(sctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupEnhancedWrapper) + + // HandleOneTask will handle one task. + HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask, exit chan struct{}) (task *NeededItemTask, err error) +} + +// StatsGlobal is used to manage partition table global stats. +type StatsGlobal interface { + // MergePartitionStats2GlobalStatsByTableID merges partition stats to global stats by table ID. + MergePartitionStats2GlobalStatsByTableID(sctx sessionctx.Context, + opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema, + physicalID int64, + isIndex bool, + histIDs []int64, + ) (globalStats any, err error) +} + +// DDL is used to handle ddl events. +type DDL interface { + // HandleDDLEvent handles ddl events. + HandleDDLEvent(event *statsutil.DDLEvent) error + // DDLEventCh returns ddl events channel in handle. + DDLEventCh() chan *statsutil.DDLEvent +} + +// StatsHandle is used to manage TiDB Statistics. +type StatsHandle interface { + // Pool is used to get a session or a goroutine to execute stats updating. + statsutil.Pool + + // AutoAnalyzeProcIDGenerator is used to generate auto analyze proc ID. + statsutil.AutoAnalyzeProcIDGenerator + + // LeaseGetter is used to get stats lease. + statsutil.LeaseGetter + + // TableInfoGetter is used to get table meta info. 
+ statsutil.TableInfoGetter + + // GetTableStats retrieves the statistics table from cache, and the cache will be updated by a goroutine. + GetTableStats(tblInfo *model.TableInfo) *statistics.Table + + // GetTableStatsForAutoAnalyze retrieves the statistics table from cache, but it will not return pseudo. + GetTableStatsForAutoAnalyze(tblInfo *model.TableInfo) *statistics.Table + + // GetPartitionStats retrieves the partition stats from cache. + GetPartitionStats(tblInfo *model.TableInfo, pid int64) *statistics.Table + + // GetPartitionStatsForAutoAnalyze retrieves the partition stats from cache, but it will not return pseudo. + GetPartitionStatsForAutoAnalyze(tblInfo *model.TableInfo, pid int64) *statistics.Table + + // StatsGC is used to do the GC job. + StatsGC + + // StatsUsage is used to handle table delta and stats usage. + StatsUsage + + // StatsHistory is used to manage historical stats. + StatsHistory + + // StatsAnalyze is used to handle auto-analyze and manage analyze jobs. + StatsAnalyze + + // StatsCache is used to manage all table statistics in memory. + StatsCache + + // StatsLock is used to manage locked stats. + StatsLock + + // StatsReadWriter is used to read and write stats to the storage. + StatsReadWriter + + // StatsGlobal is used to manage partition table global stats. + StatsGlobal + + // DDL is used to handle ddl events. 
+ DDL +} diff --git a/statistics/handle/handle_hist_test.go b/statistics/handle/handle_hist_test.go index c4f30e6ef0de7..aa622557b0c59 100644 --- a/statistics/handle/handle_hist_test.go +++ b/statistics/handle/handle_hist_test.go @@ -209,6 +209,19 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) { require.Error(t, err1) require.NotNil(t, task1) + select { + case <-stmtCtx1.StatsLoad.ResultCh: + t.Logf("stmtCtx1.ResultCh should not get anything") + t.FailNow() + case <-stmtCtx2.StatsLoad.ResultCh: + t.Logf("stmtCtx2.ResultCh should not get anything") + t.FailNow() + case <-task1.ResultCh: + t.Logf("task1.ResultCh should not get anything") + t.FailNow() + default: + } + require.NoError(t, failpoint.Disable(fp.failPath)) task3, err3 := h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh) require.NoError(t, err3) @@ -231,3 +244,80 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) { require.Greater(t, hg.Len()+topn.Num(), 0) } } + +func TestRetry(t *testing.T) { + originConfig := config.GetGlobalConfig() + newConfig := config.NewConfig() + newConfig.Performance.StatsLoadConcurrency = 0 // no worker to consume channel + config.StoreGlobalConfig(newConfig) + defer config.StoreGlobalConfig(originConfig) + store, dom := testkit.CreateMockStoreAndDomain(t) + + testKit := testkit.NewTestKit(t, store) + testKit.MustExec("use test") + testKit.MustExec("drop table if exists t") + testKit.MustExec("set @@session.tidb_analyze_version=2") + testKit.MustExec("create table t(a int, b int, c int, primary key(a), key idx(b))") + testKit.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3)") + + oriLease := dom.StatsHandle().Lease() + dom.StatsHandle().SetLease(1) + defer func() { + dom.StatsHandle().SetLease(oriLease) + }() + testKit.MustExec("analyze table t") + + is := dom.InfoSchema() + tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tableInfo := tbl.Meta() + + h := 
dom.StatsHandle() + + neededColumns := make([]model.StatsLoadItem, 1) + neededColumns[0] = model.StatsLoadItem{TableItemID: model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[2].ID, IsIndex: false}, FullLoad: true} + timeout := time.Nanosecond * mathutil.MaxInt + + // clear statsCache + h.Clear() + require.NoError(t, dom.StatsHandle().Update(is)) + + // no stats at beginning + stat := h.GetTableStats(tableInfo) + c, ok := stat.Columns[tableInfo.Columns[2].ID] + require.True(t, !ok || (c.Histogram.Len()+c.TopN.Num() == 0)) + + stmtCtx1 := stmtctx.NewStmtCtx() + h.SendLoadRequests(stmtCtx1, neededColumns, timeout) + stmtCtx2 := stmtctx.NewStmtCtx() + h.SendLoadRequests(stmtCtx2, neededColumns, timeout) + + exitCh := make(chan struct{}) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail", "return(true)")) + var ( + task1 *types.NeededItemTask + err1 error + ) + + for i := 0; i < syncload.RetryCount; i++ { + task1, err1 = h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh) + require.Error(t, err1) + require.NotNil(t, task1) + select { + case <-task1.ResultCh: + t.Logf("task1.ResultCh should not get nothing") + t.FailNow() + default: + } + } + result, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh) + require.NoError(t, err1) + require.Nil(t, result) + select { + case <-task1.ResultCh: + default: + t.Logf("task1.ResultCh should get nothing") + t.FailNow() + } + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail")) +} From 736f9ccf9e0ea3d4911b13c87c662d6dda4b09be Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Wed, 5 Jun 2024 23:04:44 +0800 Subject: [PATCH 2/4] * Signed-off-by: Weizhen Wang --- pkg/statistics/handle/syncload/BUILD.bazel | 46 -- .../handle/syncload/stats_syncload.go | 548 ------------------ pkg/statistics/handle/types/interfaces.go | 494 ---------------- 3 files 
changed, 1088 deletions(-) delete mode 100644 pkg/statistics/handle/syncload/BUILD.bazel delete mode 100644 pkg/statistics/handle/syncload/stats_syncload.go delete mode 100644 pkg/statistics/handle/types/interfaces.go diff --git a/pkg/statistics/handle/syncload/BUILD.bazel b/pkg/statistics/handle/syncload/BUILD.bazel deleted file mode 100644 index ed6e310786a2a..0000000000000 --- a/pkg/statistics/handle/syncload/BUILD.bazel +++ /dev/null @@ -1,46 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") - -go_library( - name = "syncload", - srcs = ["stats_syncload.go"], - importpath = "github.com/pingcap/tidb/pkg/statistics/handle/syncload", - visibility = ["//visibility:public"], - deps = [ - "//pkg/config", - "//pkg/metrics", - "//pkg/parser/model", - "//pkg/parser/mysql", - "//pkg/sessionctx", - "//pkg/sessionctx/stmtctx", - "//pkg/statistics", - "//pkg/statistics/handle/storage", - "//pkg/statistics/handle/types", - "//pkg/types", - "//pkg/util", - "//pkg/util/logutil", - "@com_github_pingcap_errors//:errors", - "@com_github_pingcap_failpoint//:failpoint", - "@org_uber_go_zap//:zap", - ], -) - -go_test( - name = "syncload_test", - timeout = "short", - srcs = ["stats_syncload_test.go"], - flaky = True, - race = "on", - shard_count = 5, - deps = [ - ":syncload", - "//pkg/config", - "//pkg/parser/model", - "//pkg/sessionctx", - "//pkg/sessionctx/stmtctx", - "//pkg/statistics/handle/types", - "//pkg/testkit", - "//pkg/util/mathutil", - "@com_github_pingcap_failpoint//:failpoint", - "@com_github_stretchr_testify//require", - ], -) diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go deleted file mode 100644 index 723a2d77b3084..0000000000000 --- a/pkg/statistics/handle/syncload/stats_syncload.go +++ /dev/null @@ -1,548 +0,0 @@ -// Copyright 2023 PingCAP, Inc. 
-// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package syncload - -import ( - "fmt" - "math/rand" - "time" - - "github.com/pingcap/errors" - "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/pkg/config" - "github.com/pingcap/tidb/pkg/metrics" - "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tidb/pkg/parser/mysql" - "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" - "github.com/pingcap/tidb/pkg/statistics" - "github.com/pingcap/tidb/pkg/statistics/handle/storage" - statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types" - "github.com/pingcap/tidb/pkg/types" - "github.com/pingcap/tidb/pkg/util" - "github.com/pingcap/tidb/pkg/util/logutil" - "go.uber.org/zap" -) - -// RetryCount is the max retry count for a sync load task. -const RetryCount = 3 - -type statsSyncLoad struct { - statsHandle statstypes.StatsHandle - StatsLoad statstypes.StatsLoad -} - -// NewStatsSyncLoad creates a new StatsSyncLoad. 
-func NewStatsSyncLoad(statsHandle statstypes.StatsHandle) statstypes.StatsSyncLoad { - s := &statsSyncLoad{statsHandle: statsHandle} - cfg := config.GetGlobalConfig() - s.StatsLoad.NeededItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize) - s.StatsLoad.TimeoutItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize) - return s -} - -type statsWrapper struct { - colInfo *model.ColumnInfo - idxInfo *model.IndexInfo - col *statistics.Column - idx *statistics.Index -} - -// SendLoadRequests send neededColumns requests -func (s *statsSyncLoad) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems []model.StatsLoadItem, timeout time.Duration) error { - remainedItems := s.removeHistLoadedColumns(neededHistItems) - - failpoint.Inject("assertSyncLoadItems", func(val failpoint.Value) { - if sc.OptimizeTracer != nil { - count := val.(int) - if len(remainedItems) != count { - panic("remained items count wrong") - } - } - }) - - if len(remainedItems) <= 0 { - return nil - } - sc.StatsLoad.Timeout = timeout - sc.StatsLoad.NeededItems = remainedItems - sc.StatsLoad.ResultCh = make(chan stmtctx.StatsLoadResult, len(remainedItems)) - tasks := make([]*statstypes.NeededItemTask, 0) - for _, item := range remainedItems { - task := &statstypes.NeededItemTask{ - Item: item, - ToTimeout: time.Now().Local().Add(timeout), - ResultCh: sc.StatsLoad.ResultCh, - } - tasks = append(tasks, task) - } - timer := time.NewTimer(timeout) - defer timer.Stop() - for _, task := range tasks { - select { - case s.StatsLoad.NeededItemsCh <- task: - continue - case <-timer.C: - return errors.New("sync load stats channel is full and timeout sending task to channel") - } - } - sc.StatsLoad.LoadStartTime = time.Now() - return nil -} - -// SyncWaitStatsLoad sync waits loading of neededColumns and return false if timeout -func (*statsSyncLoad) SyncWaitStatsLoad(sc *stmtctx.StatementContext) error { - if len(sc.StatsLoad.NeededItems) <= 0 { - 
return nil - } - var errorMsgs []string - defer func() { - if len(errorMsgs) > 0 { - logutil.BgLogger().Warn("SyncWaitStatsLoad meets error", - zap.Strings("errors", errorMsgs)) - } - sc.StatsLoad.NeededItems = nil - }() - resultCheckMap := map[model.TableItemID]struct{}{} - for _, col := range sc.StatsLoad.NeededItems { - resultCheckMap[col.TableItemID] = struct{}{} - } - metrics.SyncLoadCounter.Inc() - timer := time.NewTimer(sc.StatsLoad.Timeout) - defer timer.Stop() - for { - select { - case result, ok := <-sc.StatsLoad.ResultCh: - if !ok { - return errors.New("sync load stats channel closed unexpectedly") - } - if result.HasError() { - errorMsgs = append(errorMsgs, result.ErrorMsg()) - } - delete(resultCheckMap, result.Item) - if len(resultCheckMap) == 0 { - metrics.SyncLoadHistogram.Observe(float64(time.Since(sc.StatsLoad.LoadStartTime).Milliseconds())) - return nil - } - case <-timer.C: - metrics.SyncLoadTimeoutCounter.Inc() - return errors.New("sync load stats timeout") - } - } -} - -// removeHistLoadedColumns removed having-hist columns based on neededColumns and statsCache. 
-func (s *statsSyncLoad) removeHistLoadedColumns(neededItems []model.StatsLoadItem) []model.StatsLoadItem { - remainedItems := make([]model.StatsLoadItem, 0, len(neededItems)) - for _, item := range neededItems { - tbl, ok := s.statsHandle.Get(item.TableID) - if !ok { - continue - } - if item.IsIndex { - _, loadNeeded := tbl.IndexIsLoadNeeded(item.ID) - if loadNeeded { - remainedItems = append(remainedItems, item) - } - continue - } - _, loadNeeded, _ := tbl.ColumnIsLoadNeeded(item.ID, item.FullLoad) - if loadNeeded { - remainedItems = append(remainedItems, item) - } - } - return remainedItems -} - -// AppendNeededItem appends needed columns/indices to ch, it is only used for test -func (s *statsSyncLoad) AppendNeededItem(task *statstypes.NeededItemTask, timeout time.Duration) error { - timer := time.NewTimer(timeout) - defer timer.Stop() - select { - case s.StatsLoad.NeededItemsCh <- task: - case <-timer.C: - return errors.New("Channel is full and timeout writing to channel") - } - return nil -} - -var errExit = errors.New("Stop loading since domain is closed") - -// SubLoadWorker loads hist data for each column -func (s *statsSyncLoad) SubLoadWorker(sctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupEnhancedWrapper) { - defer func() { - exitWg.Done() - logutil.BgLogger().Info("SubLoadWorker exited.") - }() - // if the last task is not successfully handled in last round for error or panic, pass it to this round to retry - var lastTask *statstypes.NeededItemTask - for { - task, err := s.HandleOneTask(sctx, lastTask, exit) - lastTask = task - if err != nil { - switch err { - case errExit: - return - default: - // To avoid the thundering herd effect - // thundering herd effect: Everyone tries to retry a large number of requests simultaneously when a problem occurs. 
- r := rand.Intn(500) - time.Sleep(s.statsHandle.Lease()/10 + time.Duration(r)*time.Microsecond) - continue - } - } - } -} - -// HandleOneTask handles last task if not nil, else handle a new task from chan, and return current task if fail somewhere. -// - If the task is handled successfully, return nil, nil. -// - If the task is timeout, return the task and nil. The caller should retry the timeout task without sleep. -// - If the task is failed, return the task, error. The caller should retry the timeout task with sleep. -func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statstypes.NeededItemTask, exit chan struct{}) (task *statstypes.NeededItemTask, err error) { - defer func() { - // recover for each task, worker keeps working - if r := recover(); r != nil { - logutil.BgLogger().Error("stats loading panicked", zap.Any("error", r), zap.Stack("stack")) - err = errors.Errorf("stats loading panicked: %v", r) - } - }() - if lastTask == nil { - task, err = s.drainColTask(sctx, exit) - if err != nil { - if err != errExit { - logutil.BgLogger().Error("Fail to drain task for stats loading.", zap.Error(err)) - } - return task, err - } - } else { - task = lastTask - } - result := stmtctx.StatsLoadResult{Item: task.Item.TableItemID} - resultChan := s.StatsLoad.Singleflight.DoChan(task.Item.Key(), func() (any, error) { - err := s.handleOneItemTask(sctx, task) - return nil, err - }) - timeout := time.Until(task.ToTimeout) - select { - case sr := <-resultChan: - // sr.Val is always nil. 
- if sr.Err == nil { - task.ResultCh <- result - return nil, nil - } - if !isVaildForRetry(task) { - result.Error = sr.Err - task.ResultCh <- result - return nil, nil - } - return task, sr.Err - case <-time.After(timeout): - if !isVaildForRetry(task) { - result.Error = errors.New("stats loading timeout") - task.ResultCh <- result - return nil, nil - } - task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond) - return task, nil - } -} - -func isVaildForRetry(task *statstypes.NeededItemTask) bool { - task.Retry++ - return task.Retry <= RetryCount -} - -func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (err error) { - defer func() { - // recover for each task, worker keeps working - if r := recover(); r != nil { - logutil.BgLogger().Error("handleOneItemTask panicked", zap.Any("recover", r), zap.Stack("stack")) - err = errors.Errorf("stats loading panicked: %v", r) - } - }() - item := task.Item.TableItemID - tbl, ok := s.statsHandle.Get(item.TableID) - if !ok { - return nil - } - wrapper := &statsWrapper{} - if item.IsIndex { - index, loadNeeded := tbl.IndexIsLoadNeeded(item.ID) - if !loadNeeded { - return nil - } - if index != nil { - wrapper.idxInfo = index.Info - } else { - wrapper.idxInfo = tbl.ColAndIdxExistenceMap.GetIndex(item.ID) - } - } else { - col, loadNeeded, analyzed := tbl.ColumnIsLoadNeeded(item.ID, task.Item.FullLoad) - if !loadNeeded { - return nil - } - if col != nil { - wrapper.colInfo = col.Info - } else { - wrapper.colInfo = tbl.ColAndIdxExistenceMap.GetCol(item.ID) - } - // If this column is not analyzed yet and we don't have it in memory. - // We create a fake one for the pseudo estimation. 
- if loadNeeded && !analyzed { - wrapper.col = &statistics.Column{ - PhysicalID: item.TableID, - Info: wrapper.colInfo, - Histogram: *statistics.NewHistogram(item.ID, 0, 0, 0, &wrapper.colInfo.FieldType, 0, 0), - IsHandle: tbl.IsPkIsHandle && mysql.HasPriKeyFlag(wrapper.colInfo.GetFlag()), - } - s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad) - return nil - } - } - t := time.Now() - needUpdate := false - wrapper, err = s.readStatsForOneItem(sctx, item, wrapper, tbl.IsPkIsHandle, task.Item.FullLoad) - if err != nil { - return err - } - if item.IsIndex { - if wrapper.idxInfo != nil { - needUpdate = true - } - } else { - if wrapper.colInfo != nil { - needUpdate = true - } - } - metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds())) - if needUpdate { - s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad) - } - return nil -} - -// readStatsForOneItem reads hist for one column/index, TODO load data via kv-get asynchronously -func (*statsSyncLoad) readStatsForOneItem(sctx sessionctx.Context, item model.TableItemID, w *statsWrapper, isPkIsHandle bool, fullLoad bool) (*statsWrapper, error) { - failpoint.Inject("mockReadStatsForOnePanic", nil) - failpoint.Inject("mockReadStatsForOneFail", func(val failpoint.Value) { - if val.(bool) { - failpoint.Return(nil, errors.New("gofail ReadStatsForOne error")) - } - }) - loadFMSketch := config.GetGlobalConfig().Performance.EnableLoadFMSketch - var hg *statistics.Histogram - var err error - isIndexFlag := int64(0) - hg, lastAnalyzePos, statsVer, flag, err := storage.HistMetaFromStorage(sctx, &item, w.colInfo) - if err != nil { - return nil, err - } - if hg == nil { - logutil.BgLogger().Error("fail to get hist meta for this histogram, possibly a deleted one", zap.Int64("table_id", item.TableID), - zap.Int64("hist_id", item.ID), zap.Bool("is_index", item.IsIndex)) - return nil, errors.Trace(fmt.Errorf("fail to get hist meta for this histogram, table_id:%v, hist_id:%v, 
is_index:%v", item.TableID, item.ID, item.IsIndex)) - } - if item.IsIndex { - isIndexFlag = 1 - } - var cms *statistics.CMSketch - var topN *statistics.TopN - var fms *statistics.FMSketch - if fullLoad { - if item.IsIndex { - hg, err = storage.HistogramFromStorage(sctx, item.TableID, item.ID, types.NewFieldType(mysql.TypeBlob), hg.NDV, int(isIndexFlag), hg.LastUpdateVersion, hg.NullCount, hg.TotColSize, hg.Correlation) - if err != nil { - return nil, errors.Trace(err) - } - } else { - hg, err = storage.HistogramFromStorage(sctx, item.TableID, item.ID, &w.colInfo.FieldType, hg.NDV, int(isIndexFlag), hg.LastUpdateVersion, hg.NullCount, hg.TotColSize, hg.Correlation) - if err != nil { - return nil, errors.Trace(err) - } - } - cms, topN, err = storage.CMSketchAndTopNFromStorage(sctx, item.TableID, isIndexFlag, item.ID) - if err != nil { - return nil, errors.Trace(err) - } - if loadFMSketch { - fms, err = storage.FMSketchFromStorage(sctx, item.TableID, isIndexFlag, item.ID) - if err != nil { - return nil, errors.Trace(err) - } - } - } - if item.IsIndex { - idxHist := &statistics.Index{ - Histogram: *hg, - CMSketch: cms, - TopN: topN, - FMSketch: fms, - Info: w.idxInfo, - StatsVer: statsVer, - Flag: flag, - PhysicalID: item.TableID, - } - if statsVer != statistics.Version0 { - if fullLoad { - idxHist.StatsLoadedStatus = statistics.NewStatsFullLoadStatus() - } else { - idxHist.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus() - } - } - lastAnalyzePos.Copy(&idxHist.LastAnalyzePos) - w.idx = idxHist - } else { - colHist := &statistics.Column{ - PhysicalID: item.TableID, - Histogram: *hg, - Info: w.colInfo, - CMSketch: cms, - TopN: topN, - FMSketch: fms, - IsHandle: isPkIsHandle && mysql.HasPriKeyFlag(w.colInfo.GetFlag()), - StatsVer: statsVer, - } - if colHist.StatsAvailable() { - if fullLoad { - colHist.StatsLoadedStatus = statistics.NewStatsFullLoadStatus() - } else { - colHist.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus() - } - } - w.col = colHist - } 
- return w, nil -} - -// drainColTask will hang until a column task can return, and either task or error will be returned. -func (s *statsSyncLoad) drainColTask(sctx sessionctx.Context, exit chan struct{}) (*statstypes.NeededItemTask, error) { - // select NeededColumnsCh firstly, if no task, then select TimeoutColumnsCh - for { - select { - case <-exit: - return nil, errExit - case task, ok := <-s.StatsLoad.NeededItemsCh: - if !ok { - return nil, errors.New("drainColTask: cannot read from NeededColumnsCh, maybe the chan is closed") - } - // if the task has already timeout, no sql is sync-waiting for it, - // so do not handle it just now, put it to another channel with lower priority - if time.Now().After(task.ToTimeout) { - task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond) - s.writeToTimeoutChan(s.StatsLoad.TimeoutItemsCh, task) - continue - } - return task, nil - case task, ok := <-s.StatsLoad.TimeoutItemsCh: - select { - case <-exit: - return nil, errExit - case task0, ok0 := <-s.StatsLoad.NeededItemsCh: - if !ok0 { - return nil, errors.New("drainColTask: cannot read from NeededColumnsCh, maybe the chan is closed") - } - // send task back to TimeoutColumnsCh and return the task drained from NeededColumnsCh - s.writeToTimeoutChan(s.StatsLoad.TimeoutItemsCh, task) - return task0, nil - default: - if !ok { - return nil, errors.New("drainColTask: cannot read from TimeoutColumnsCh, maybe the chan is closed") - } - // NeededColumnsCh is empty now, handle task from TimeoutColumnsCh - return task, nil - } - } - } -} - -// writeToTimeoutChan writes in a nonblocking way, and if the channel queue is full, it's ok to drop the task. -func (*statsSyncLoad) writeToTimeoutChan(taskCh chan *statstypes.NeededItemTask, task *statstypes.NeededItemTask) { - select { - case taskCh <- task: - default: - } -} - -// writeToChanWithTimeout writes a task to a channel and blocks until timeout. 
-func (*statsSyncLoad) writeToChanWithTimeout(taskCh chan *statstypes.NeededItemTask, task *statstypes.NeededItemTask, timeout time.Duration) error { - timer := time.NewTimer(timeout) - defer timer.Stop() - select { - case taskCh <- task: - case <-timer.C: - return errors.New("Channel is full and timeout writing to channel") - } - return nil -} - -// writeToResultChan safe-writes with panic-recover so one write-fail will not have big impact. -func (*statsSyncLoad) writeToResultChan(resultCh chan stmtctx.StatsLoadResult, rs stmtctx.StatsLoadResult) { - defer func() { - if r := recover(); r != nil { - logutil.BgLogger().Error("writeToResultChan panicked", zap.Any("error", r), zap.Stack("stack")) - } - }() - select { - case resultCh <- rs: - default: - } -} - -// updateCachedItem updates the column/index hist to global statsCache. -func (s *statsSyncLoad) updateCachedItem(item model.TableItemID, colHist *statistics.Column, idxHist *statistics.Index, fullLoaded bool) (updated bool) { - s.StatsLoad.Lock() - defer s.StatsLoad.Unlock() - // Reload the latest stats cache, otherwise the `updateStatsCache` may fail with high probability, because functions - // like `GetPartitionStats` called in `fmSketchFromStorage` would have modified the stats cache already. - tbl, ok := s.statsHandle.Get(item.TableID) - if !ok { - return false - } - if !item.IsIndex && colHist != nil { - c, ok := tbl.Columns[item.ID] - // - If the stats is fully loaded, - // - If the stats is meta-loaded and we also just need the meta. - if ok && (c.IsFullLoad() || !fullLoaded) { - return false - } - tbl = tbl.Copy() - tbl.Columns[item.ID] = colHist - // If the column is analyzed we refresh the map for the possible change. - if colHist.StatsAvailable() { - tbl.ColAndIdxExistenceMap.InsertCol(item.ID, colHist.Info, true) - } - // All the objects shares the same stats version. Update it here. 
- if colHist.StatsVer != statistics.Version0 { - tbl.StatsVer = statistics.Version0 - } - } else if item.IsIndex && idxHist != nil { - index, ok := tbl.Indices[item.ID] - // - If the stats is fully loaded, - // - If the stats is meta-loaded and we also just need the meta. - if ok && (index.IsFullLoad() || !fullLoaded) { - return true - } - tbl = tbl.Copy() - tbl.Indices[item.ID] = idxHist - // If the index is analyzed we refresh the map for the possible change. - if idxHist.IsAnalyzed() { - tbl.ColAndIdxExistenceMap.InsertIndex(item.ID, idxHist.Info, true) - // All the objects shares the same stats version. Update it here. - tbl.StatsVer = statistics.Version0 - } - } - s.statsHandle.UpdateStatsCache([]*statistics.Table{tbl}, nil) - return true -} diff --git a/pkg/statistics/handle/types/interfaces.go b/pkg/statistics/handle/types/interfaces.go deleted file mode 100644 index 5c1b41d7fbd65..0000000000000 --- a/pkg/statistics/handle/types/interfaces.go +++ /dev/null @@ -1,494 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package types - -import ( - "context" - "sync" - "time" - - "github.com/pingcap/tidb/pkg/infoschema" - "github.com/pingcap/tidb/pkg/parser/ast" - "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" - "github.com/pingcap/tidb/pkg/statistics" - "github.com/pingcap/tidb/pkg/statistics/handle/usage/indexusage" - statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util" - "github.com/pingcap/tidb/pkg/types" - "github.com/pingcap/tidb/pkg/util" - "github.com/pingcap/tidb/pkg/util/sqlexec" - "golang.org/x/sync/singleflight" -) - -// StatsGC is used to GC unnecessary stats. -type StatsGC interface { - // GCStats will garbage collect the useless stats' info. - // For dropped tables, we will first update their version - // so that other tidb could know that table is deleted. - GCStats(is infoschema.InfoSchema, ddlLease time.Duration) (err error) - - // ClearOutdatedHistoryStats clear outdated historical stats. - // Only for test. - ClearOutdatedHistoryStats() error - - // DeleteTableStatsFromKV deletes table statistics from kv. - // A statsID refers to statistic of a table or a partition. - DeleteTableStatsFromKV(statsIDs []int64) (err error) -} - -// ColStatsTimeInfo records usage information of this column stats. -type ColStatsTimeInfo struct { - LastUsedAt *types.Time // last time the column is used - LastAnalyzedAt *types.Time // last time the column is analyzed -} - -// StatsUsage is used to track the usage of column / index statistics. -type StatsUsage interface { - // Below methods are for predicated columns. - - // LoadColumnStatsUsage returns all columns' usage information. - LoadColumnStatsUsage(loc *time.Location) (map[model.TableItemID]ColStatsTimeInfo, error) - - // GetPredicateColumns returns IDs of predicate columns, which are the columns whose stats are used(needed) when generating query plans. 
- GetPredicateColumns(tableID int64) ([]int64, error) - - // CollectColumnsInExtendedStats returns IDs of the columns involved in extended stats. - CollectColumnsInExtendedStats(tableID int64) ([]int64, error) - - IndexUsage - - // TODO: extract these function to a new interface only for delta/stats usage, like `IndexUsage`. - // Blow methods are for table delta and stats usage. - - // NewSessionStatsItem allocates a stats collector for a session. - // TODO: use interface{} to avoid cycle import, remove this interface{}. - NewSessionStatsItem() any - - // ResetSessionStatsList resets the sessions stats list. - ResetSessionStatsList() - - // DumpStatsDeltaToKV sweeps the whole list and updates the global map, then we dumps every table that held in map to KV. - DumpStatsDeltaToKV(dumpAll bool) error - - // DumpColStatsUsageToKV sweeps the whole list, updates the column stats usage map and dumps it to KV. - DumpColStatsUsageToKV() error -} - -// IndexUsage is an interface to define the function of collecting index usage stats. -type IndexUsage interface { - // NewSessionIndexUsageCollector creates a new Collector for a session. - NewSessionIndexUsageCollector() *indexusage.SessionIndexUsageCollector - - // GCIndexUsage removes unnecessary index usage data. - GCIndexUsage() error - - // StartWorker starts for the collector worker. - StartWorker() - - // Close closes and waits for the index usage collector worker. - Close() - - // GetIndexUsage returns the index usage information - GetIndexUsage(tableID int64, indexID int64) indexusage.Sample -} - -// StatsHistory is used to manage historical stats. -type StatsHistory interface { - // RecordHistoricalStatsMeta records stats meta of the specified version to stats_meta_history. - RecordHistoricalStatsMeta(tableID int64, version uint64, source string, enforce bool) - - // CheckHistoricalStatsEnable check whether historical stats is enabled. 
- CheckHistoricalStatsEnable() (enable bool, err error) - - // RecordHistoricalStatsToStorage records the given table's stats data to mysql.stats_history - RecordHistoricalStatsToStorage(dbName string, tableInfo *model.TableInfo, physicalID int64, isPartition bool) (uint64, error) -} - -// StatsAnalyze is used to handle auto-analyze and manage analyze jobs. -type StatsAnalyze interface { - // InsertAnalyzeJob inserts analyze job into mysql.analyze_jobs and gets job ID for further updating job. - InsertAnalyzeJob(job *statistics.AnalyzeJob, instance string, procID uint64) error - - // DeleteAnalyzeJobs deletes the analyze jobs whose update time is earlier than updateTime. - DeleteAnalyzeJobs(updateTime time.Time) error - - // CleanupCorruptedAnalyzeJobsOnCurrentInstance cleans up the corrupted analyze job. - // A corrupted analyze job is one that is in a 'pending' or 'running' state, - // but is associated with a TiDB instance that is either not currently running or has been restarted. - // We use current running analyze jobs to check whether the analyze job is corrupted. - CleanupCorruptedAnalyzeJobsOnCurrentInstance(currentRunningProcessIDs map[uint64]struct{}) error - - // CleanupCorruptedAnalyzeJobsOnDeadInstances purges analyze jobs that are associated with non-existent instances. - // This function is specifically designed to handle jobs that have become corrupted due to - // their associated instances being removed from the current cluster. - CleanupCorruptedAnalyzeJobsOnDeadInstances() error - - // HandleAutoAnalyze analyzes the outdated tables. (The change percent of the table exceeds the threshold) - // It also analyzes newly created tables and newly added indexes. - HandleAutoAnalyze() (analyzed bool) - - // CheckAnalyzeVersion checks whether all the statistics versions of this table's columns and indexes are the same. 
- CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalIDs []int64, version *int) bool -} - -// StatsCache is used to manage all table statistics in memory. -type StatsCache interface { - // Close closes this cache. - Close() - - // Clear clears this cache. - Clear() - - // Update reads stats meta from store and updates the stats map. - Update(is infoschema.InfoSchema) error - - // MemConsumed returns its memory usage. - MemConsumed() (size int64) - - // Get returns the specified table's stats. - Get(tableID int64) (*statistics.Table, bool) - - // Put puts this table stats into the cache. - Put(tableID int64, t *statistics.Table) - - // UpdateStatsCache updates the cache. - UpdateStatsCache(addedTables []*statistics.Table, deletedTableIDs []int64) - - // MaxTableStatsVersion returns the version of the current cache, which is defined as - // the max table stats version the cache has in its lifecycle. - MaxTableStatsVersion() uint64 - - // Values returns all values in this cache. - Values() []*statistics.Table - - // Len returns the length of this cache. - Len() int - - // SetStatsCacheCapacity sets the cache's capacity. - SetStatsCacheCapacity(capBytes int64) - - // Replace replaces this cache. - Replace(cache StatsCache) - - // UpdateStatsHealthyMetrics updates stats healthy distribution metrics according to stats cache. - UpdateStatsHealthyMetrics() -} - -// StatsLockTable is the table info of which will be locked. -type StatsLockTable struct { - PartitionInfo map[int64]string - // schema name + table name. - FullName string -} - -// StatsLock is used to manage locked stats. -type StatsLock interface { - // LockTables add locked tables id to store. - // - tables: tables that will be locked. - // Return the message of skipped tables and error. - LockTables(tables map[int64]*StatsLockTable) (skipped string, err error) - - // LockPartitions add locked partitions id to store. - // If the whole table is locked, then skip all partitions of the table. 
- // - tid: table id of which will be locked. - // - tableName: table name of which will be locked. - // - pidNames: partition ids of which will be locked. - // Return the message of skipped tables and error. - // Note: If the whole table is locked, then skip all partitions of the table. - LockPartitions( - tid int64, - tableName string, - pidNames map[int64]string, - ) (skipped string, err error) - - // RemoveLockedTables remove tables from table locked records. - // - tables: tables of which will be unlocked. - // Return the message of skipped tables and error. - RemoveLockedTables(tables map[int64]*StatsLockTable) (skipped string, err error) - - // RemoveLockedPartitions remove partitions from table locked records. - // - tid: table id of which will be unlocked. - // - tableName: table name of which will be unlocked. - // - pidNames: partition ids of which will be unlocked. - // Note: If the whole table is locked, then skip all partitions of the table. - RemoveLockedPartitions( - tid int64, - tableName string, - pidNames map[int64]string, - ) (skipped string, err error) - - // GetLockedTables returns the locked status of the given tables. - // Note: This function query locked tables from store, so please try to batch the query. - GetLockedTables(tableIDs ...int64) (map[int64]struct{}, error) - - // GetTableLockedAndClearForTest for unit test only. - GetTableLockedAndClearForTest() (map[int64]struct{}, error) -} - -// PartitionStatisticLoadTask currently records a partition-level jsontable. -type PartitionStatisticLoadTask struct { - JSONTable *statsutil.JSONTable - PhysicalID int64 -} - -// PersistFunc is used to persist JSONTable in the partition level. -type PersistFunc func(ctx context.Context, jsonTable *statsutil.JSONTable, physicalID int64) error - -// StatsReadWriter is used to read and write stats to the storage. -// TODO: merge and remove some methods. -type StatsReadWriter interface { - // TableStatsFromStorage loads table stats info from storage. 
- TableStatsFromStorage(tableInfo *model.TableInfo, physicalID int64, loadAll bool, snapshot uint64) (statsTbl *statistics.Table, err error) - - // LoadTablePartitionStats loads partition stats info from storage. - LoadTablePartitionStats(tableInfo *model.TableInfo, partitionDef *model.PartitionDefinition) (*statistics.Table, error) - - // StatsMetaCountAndModifyCount reads count and modify_count for the given table from mysql.stats_meta. - StatsMetaCountAndModifyCount(tableID int64) (count, modifyCount int64, err error) - - // LoadNeededHistograms will load histograms for those needed columns/indices and put them into the cache. - LoadNeededHistograms() (err error) - - // ReloadExtendedStatistics drops the cache for extended statistics and reload data from mysql.stats_extended. - ReloadExtendedStatistics() error - - // SaveStatsToStorage save the stats data to the storage. - SaveStatsToStorage(tableID int64, count, modifyCount int64, isIndex int, hg *statistics.Histogram, - cms *statistics.CMSketch, topN *statistics.TopN, statsVersion int, isAnalyzed int64, updateAnalyzeTime bool, source string) (err error) - - // SaveTableStatsToStorage saves the stats of a table to storage. - SaveTableStatsToStorage(results *statistics.AnalyzeResults, analyzeSnapshot bool, source string) (err error) - - // SaveMetaToStorage saves the stats meta of a table to storage. - SaveMetaToStorage(tableID, count, modifyCount int64, source string) (err error) - - // InsertColStats2KV inserts columns stats to kv. - InsertColStats2KV(physicalID int64, colInfos []*model.ColumnInfo) (err error) - - // InsertTableStats2KV inserts a record standing for a new table to stats_meta and inserts some records standing for the - // new columns and indices which belong to this table. - InsertTableStats2KV(info *model.TableInfo, physicalID int64) (err error) - - // UpdateStatsVersion will set statistics version to the newest TS, - // then tidb-server will reload automatic. 
- UpdateStatsVersion() error - - // UpdateStatsMetaVersionForGC updates the version of mysql.stats_meta, - // ensuring it is greater than the last garbage collection (GC) time. - // The GC worker deletes old stats based on a safe time point, - // calculated as now() - 10 * max(stats lease, ddl lease). - // The range [last GC time, safe time point) is chosen to prevent - // the simultaneous deletion of numerous stats, minimizing potential - // performance issues. - // This function ensures the version is updated beyond the last GC time, - // allowing the GC worker to delete outdated stats. - // - // Explanation: - // - ddl lease: 10 - // - stats lease: 3 - // - safe time point: now() - 10 * 10 = now() - 100 - // - now: 200 - // - last GC time: 90 - // - [last GC time, safe time point) = [90, 100) - // - To trigger stats deletion, the version must be updated beyond 90. - // - // This safeguards scenarios where a table remains unchanged for an extended period. - // For instance, if a table was created at time 90, and it's now time 200, - // with the last GC time at 95 and the safe time point at 100, - // updating the version beyond 95 ensures eventual deletion of stats. - UpdateStatsMetaVersionForGC(physicalID int64) (err error) - - // ChangeGlobalStatsID changes the global stats ID. - ChangeGlobalStatsID(from, to int64) (err error) - - // TableStatsToJSON dumps table stats to JSON. - TableStatsToJSON(dbName string, tableInfo *model.TableInfo, physicalID int64, snapshot uint64) (*statsutil.JSONTable, error) - - // DumpStatsToJSON dumps statistic to json. - DumpStatsToJSON(dbName string, tableInfo *model.TableInfo, - historyStatsExec sqlexec.RestrictedSQLExecutor, dumpPartitionStats bool) (*statsutil.JSONTable, error) - - // DumpHistoricalStatsBySnapshot dumped json tables from mysql.stats_meta_history and mysql.stats_history. 
- // As implemented in getTableHistoricalStatsToJSONWithFallback, if historical stats are nonexistent, it will fall back - // to the latest stats, and these table names (and partition names) will be returned in fallbackTbls. - DumpHistoricalStatsBySnapshot( - dbName string, - tableInfo *model.TableInfo, - snapshot uint64, - ) ( - jt *statsutil.JSONTable, - fallbackTbls []string, - err error, - ) - - // DumpStatsToJSONBySnapshot dumps statistic to json. - DumpStatsToJSONBySnapshot(dbName string, tableInfo *model.TableInfo, snapshot uint64, dumpPartitionStats bool) (*statsutil.JSONTable, error) - - // PersistStatsBySnapshot dumps statistic to json and call the function for each partition statistic to persist. - // Notice: - // 1. It might call the function `persist` with nil jsontable. - // 2. It is only used by BR, so partitions' statistic are always dumped. - PersistStatsBySnapshot(ctx context.Context, dbName string, tableInfo *model.TableInfo, snapshot uint64, persist PersistFunc) error - - // LoadStatsFromJSONConcurrently consumes concurrently the statistic task from `taskCh`. - LoadStatsFromJSONConcurrently(ctx context.Context, tableInfo *model.TableInfo, taskCh chan *PartitionStatisticLoadTask, concurrencyForPartition int) error - - // LoadStatsFromJSON will load statistic from JSONTable, and save it to the storage. - // In final, it will also udpate the stats cache. - LoadStatsFromJSON(ctx context.Context, is infoschema.InfoSchema, jsonTbl *statsutil.JSONTable, concurrencyForPartition int) error - - // LoadStatsFromJSONNoUpdate will load statistic from JSONTable, and save it to the storage. - LoadStatsFromJSONNoUpdate(ctx context.Context, is infoschema.InfoSchema, jsonTbl *statsutil.JSONTable, concurrencyForPartition int) error - - // Methods for extended stast. - - // InsertExtendedStats inserts a record into mysql.stats_extended and update version in mysql.stats_meta. 
- InsertExtendedStats(statsName string, colIDs []int64, tp int, tableID int64, ifNotExists bool) (err error) - - // MarkExtendedStatsDeleted update the status of mysql.stats_extended to be `deleted` and the version of mysql.stats_meta. - MarkExtendedStatsDeleted(statsName string, tableID int64, ifExists bool) (err error) - - // SaveExtendedStatsToStorage writes extended stats of a table into mysql.stats_extended. - SaveExtendedStatsToStorage(tableID int64, extStats *statistics.ExtendedStatsColl, isLoad bool) (err error) -} - -// NeededItemTask represents one needed column/indices with expire time. -type NeededItemTask struct { - ToTimeout time.Time - ResultCh chan stmtctx.StatsLoadResult - Item model.StatsLoadItem - Retry int -} - -// StatsLoad is used to load stats concurrently -// TODO(hawkingrei): Our implementation of loading statistics is flawed. -// Currently, we enqueue tasks that require loading statistics into a channel, -// from which workers retrieve tasks to process. Then, using the singleflight mechanism, -// we filter out duplicate tasks. However, the issue with this approach is that it does -// not filter out all duplicate tasks, but only the duplicates within the number of workers. -// Such an implementation is not reasonable. -// -// We should first filter all tasks through singleflight as shown in the diagram, and then use workers to load stats. -// -// ┌─────────▼──────────▼─────────────▼──────────────▼────────────────▼────────────────────┐ -// │ │ -// │ singleflight │ -// │ │ -// └───────────────────────────────────────────────────────────────────────────────────────┘ -// -// │ │ -// ┌────────────▼──────┐ ┌───────▼───────────┐ -// │ │ │ │ -// │ syncload worker │ │ syncload worker │ -// │ │ │ │ -// └───────────────────┘ └───────────────────┘ -type StatsLoad struct { - NeededItemsCh chan *NeededItemTask - TimeoutItemsCh chan *NeededItemTask - Singleflight singleflight.Group - sync.Mutex -} - -// StatsSyncLoad implement the sync-load feature. 
-type StatsSyncLoad interface { - // SendLoadRequests sends load requests to the channel. - SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems []model.StatsLoadItem, timeout time.Duration) error - - // SyncWaitStatsLoad will wait for the load requests to finish. - SyncWaitStatsLoad(sc *stmtctx.StatementContext) error - - // AppendNeededItem appends a needed item to the channel. - AppendNeededItem(task *NeededItemTask, timeout time.Duration) error - - // SubLoadWorker will start a goroutine to handle the load requests. - SubLoadWorker(sctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupEnhancedWrapper) - - // HandleOneTask will handle one task. - HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask, exit chan struct{}) (task *NeededItemTask, err error) -} - -// StatsGlobal is used to manage partition table global stats. -type StatsGlobal interface { - // MergePartitionStats2GlobalStatsByTableID merges partition stats to global stats by table ID. - MergePartitionStats2GlobalStatsByTableID(sctx sessionctx.Context, - opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema, - physicalID int64, - isIndex bool, - histIDs []int64, - ) (globalStats any, err error) -} - -// DDL is used to handle ddl events. -type DDL interface { - // HandleDDLEvent handles ddl events. - HandleDDLEvent(event *statsutil.DDLEvent) error - // DDLEventCh returns ddl events channel in handle. - DDLEventCh() chan *statsutil.DDLEvent -} - -// StatsHandle is used to manage TiDB Statistics. -type StatsHandle interface { - // Pool is used to get a session or a goroutine to execute stats updating. - statsutil.Pool - - // AutoAnalyzeProcIDGenerator is used to generate auto analyze proc ID. - statsutil.AutoAnalyzeProcIDGenerator - - // LeaseGetter is used to get stats lease. - statsutil.LeaseGetter - - // TableInfoGetter is used to get table meta info. 
- statsutil.TableInfoGetter - - // GetTableStats retrieves the statistics table from cache, and the cache will be updated by a goroutine. - GetTableStats(tblInfo *model.TableInfo) *statistics.Table - - // GetTableStatsForAutoAnalyze retrieves the statistics table from cache, but it will not return pseudo. - GetTableStatsForAutoAnalyze(tblInfo *model.TableInfo) *statistics.Table - - // GetPartitionStats retrieves the partition stats from cache. - GetPartitionStats(tblInfo *model.TableInfo, pid int64) *statistics.Table - - // GetPartitionStatsForAutoAnalyze retrieves the partition stats from cache, but it will not return pseudo. - GetPartitionStatsForAutoAnalyze(tblInfo *model.TableInfo, pid int64) *statistics.Table - - // StatsGC is used to do the GC job. - StatsGC - - // StatsUsage is used to handle table delta and stats usage. - StatsUsage - - // StatsHistory is used to manage historical stats. - StatsHistory - - // StatsAnalyze is used to handle auto-analyze and manage analyze jobs. - StatsAnalyze - - // StatsCache is used to manage all table statistics in memory. - StatsCache - - // StatsLock is used to manage locked stats. - StatsLock - - // StatsReadWriter is used to read and write stats to the storage. - StatsReadWriter - - // StatsGlobal is used to manage partition table global stats. - StatsGlobal - - // DDL is used to handle ddl events. 
- DDL -} From a323762eb8b0a6a727d64f2069c21b15138e4ff7 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Wed, 5 Jun 2024 23:15:04 +0800 Subject: [PATCH 3/4] * Signed-off-by: Weizhen Wang --- statistics/handle/handle_hist.go | 58 ++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 22 deletions(-) diff --git a/statistics/handle/handle_hist.go b/statistics/handle/handle_hist.go index 0c56812f0647c..038e59dce12a0 100644 --- a/statistics/handle/handle_hist.go +++ b/statistics/handle/handle_hist.go @@ -37,6 +37,9 @@ import ( "golang.org/x/sync/singleflight" ) +// RetryCount is the max retry count for a sync load task. +const RetryCount = 3 + type statsWrapper struct { col *statistics.Column idx *statistics.Index @@ -56,6 +59,7 @@ type NeededItemTask struct { TableItemID model.TableItemID ToTimeout time.Time ResultCh chan stmtctx.StatsLoadResult + Retry int } // SendLoadRequests send neededColumns requests @@ -217,6 +221,9 @@ func (h *Handle) SubLoadWorker(ctx sessionctx.Context, exit chan struct{}, exitW } // HandleOneTask handles last task if not nil, else handle a new task from chan, and return current task if fail somewhere. +// - If the task is handled successfully, return nil, nil. +// - If the task is timeout, return the task and nil. The caller should retry the timeout task without sleep. +// - If the task is failed, return the task, error. The caller should retry the timeout task with sleep. 
func (h *Handle) HandleOneTask(lastTask *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor, exit chan struct{}) (task *NeededItemTask, err error) { defer func() { // recover for each task, worker keeps working @@ -236,28 +243,37 @@ func (h *Handle) HandleOneTask(lastTask *NeededItemTask, readerCtx *StatsReaderC } else { task = lastTask } + result := stmtctx.StatsLoadResult{Item: task.TableItemID} resultChan := h.StatsLoad.Singleflight.DoChan(task.TableItemID.Key(), func() (any, error) { - return h.handleOneItemTask(task, readerCtx, ctx) + err := h.handleOneItemTask(task, readerCtx, ctx) + return nil, err }) timeout := time.Until(task.ToTimeout) select { - case result := <-resultChan: - if result.Err == nil { - slr := result.Val.(*stmtctx.StatsLoadResult) - if slr.Error != nil { - return task, slr.Error - } - task.ResultCh <- *slr + case sr := <-resultChan: + // sr.Val is always nil. + if sr.Err == nil { + task.ResultCh <- result return nil, nil } - return task, result.Err + if !isVaildForRetry(task) { + result.Error = sr.Err + task.ResultCh <- result + return nil, nil + } + return task, sr.Err case <-time.After(timeout): task.ToTimeout.Add(time.Duration(h.mu.ctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond) return task, nil } } -func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor) (result *stmtctx.StatsLoadResult, err error) { +func isVaildForRetry(task *NeededItemTask) bool { + task.Retry++ + return task.Retry <= RetryCount +} + +func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor) (err error) { defer func() { // recover for each task, worker keeps working if r := recover(); r != nil { @@ -265,24 +281,23 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC err = errors.Errorf("stats loading panicked: %v", r) } }() - result = 
&stmtctx.StatsLoadResult{Item: task.TableItemID} - item := result.Item + item := task.TableItemID oldCache := h.statsCache.Load().(statsCache) tbl, ok := oldCache.Get(item.TableID) if !ok { - return result, nil + return nil } wrapper := &statsWrapper{} if item.IsIndex { index, ok := tbl.Indices[item.ID] if !ok || index.IsFullLoad() { - return result, nil + return nil } wrapper.idx = index } else { col, ok := tbl.Columns[item.ID] if !ok || col.IsFullLoad() { - return result, nil + return nil } wrapper.col = col } @@ -292,8 +307,7 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC needUpdate := false wrapper, err = h.readStatsForOneItem(item, wrapper, readerCtx.reader) if err != nil { - result.Error = err - return result, err + return err } if item.IsIndex { if wrapper.idx != nil { @@ -305,10 +319,10 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC } } metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds())) - if needUpdate && h.updateCachedItem(item, wrapper.col, wrapper.idx) { - return result, nil + if needUpdate { + h.updateCachedItem(item, wrapper.col, wrapper.idx) } - return nil, nil + return nil } func (h *Handle) loadFreshStatsReader(readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor) { @@ -493,12 +507,12 @@ func (h *Handle) updateCachedItem(item model.TableItemID, colHist *statistics.Co oldCache := h.statsCache.Load().(statsCache) tbl, ok := oldCache.Get(item.TableID) if !ok { - return true + return false } if !item.IsIndex && colHist != nil { c, ok := tbl.Columns[item.ID] if !ok || c.IsFullLoad() { - return true + return false } tbl = tbl.Copy() tbl.Columns[c.ID] = colHist From df1aeae78bcd6e271dea7c8e64c3f897146582d8 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Wed, 5 Jun 2024 23:21:58 +0800 Subject: [PATCH 4/4] * Signed-off-by: Weizhen Wang --- statistics/handle/handle_hist_test.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 
deletions(-) diff --git a/statistics/handle/handle_hist_test.go b/statistics/handle/handle_hist_test.go index aa622557b0c59..d39f110852ff4 100644 --- a/statistics/handle/handle_hist_test.go +++ b/statistics/handle/handle_hist_test.go @@ -274,8 +274,8 @@ func TestRetry(t *testing.T) { h := dom.StatsHandle() - neededColumns := make([]model.StatsLoadItem, 1) - neededColumns[0] = model.StatsLoadItem{TableItemID: model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[2].ID, IsIndex: false}, FullLoad: true} + neededColumns := make([]model.TableItemID, 1) + neededColumns[0] = model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[2].ID, IsIndex: false} timeout := time.Nanosecond * mathutil.MaxInt // clear statsCache @@ -287,20 +287,20 @@ func TestRetry(t *testing.T) { c, ok := stat.Columns[tableInfo.Columns[2].ID] require.True(t, !ok || (c.Histogram.Len()+c.TopN.Num() == 0)) - stmtCtx1 := stmtctx.NewStmtCtx() + stmtCtx1 := &stmtctx.StatementContext{} h.SendLoadRequests(stmtCtx1, neededColumns, timeout) - stmtCtx2 := stmtctx.NewStmtCtx() + stmtCtx2 := &stmtctx.StatementContext{} h.SendLoadRequests(stmtCtx2, neededColumns, timeout) exitCh := make(chan struct{}) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail", "return(true)")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/statistics/handle/mockReadStatsForOneFail", "return(true)")) var ( - task1 *types.NeededItemTask + task1 *handle.NeededItemTask err1 error ) - - for i := 0; i < syncload.RetryCount; i++ { - task1, err1 = h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh) + readerCtx := &handle.StatsReaderContext{} + for i := 0; i < handle.RetryCount; i++ { + task1, err1 = h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh) require.Error(t, err1) require.NotNil(t, task1) select { @@ -310,7 +310,7 @@ func TestRetry(t *testing.T) { default: } } - result, err1 := 
h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh) + result, err1 := h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh) require.NoError(t, err1) require.Nil(t, result) select { @@ -319,5 +319,5 @@ func TestRetry(t *testing.T) { t.Logf("task1.ResultCh should get nothing") t.FailNow() } - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/statistics/handle/mockReadStatsForOneFail")) }