From 70f3348ebf0d512ad42d86a2d13dd93f7884fd7e Mon Sep 17 00:00:00 2001 From: Song Gao Date: Tue, 25 Oct 2022 15:57:55 +0800 Subject: [PATCH] executor: support save partition stats in concurrency (#38239) ref pingcap/tidb#35142 --- config/config.go | 37 ++++----- domain/domain.go | 45 +++++++++++ executor/analyze.go | 124 ++++++++++++++++++++++++------- executor/analyze_global_stats.go | 2 +- executor/analyze_test.go | 3 + executor/analyze_worker.go | 70 +++++++++++++++++ session/session.go | 9 ++- sessionctx/variable/session.go | 2 + sessionctx/variable/sysvar.go | 5 ++ sessionctx/variable/tidb_vars.go | 5 +- statistics/handle/handle.go | 55 +++++++++----- 11 files changed, 292 insertions(+), 65 deletions(-) create mode 100644 executor/analyze_worker.go diff --git a/config/config.go b/config/config.go index 7170c587922de..b598fcafa3808 100644 --- a/config/config.go +++ b/config/config.go @@ -643,14 +643,16 @@ type Performance struct { ProjectionPushDown bool `toml:"projection-push-down" json:"projection-push-down"` MaxTxnTTL uint64 `toml:"max-txn-ttl" json:"max-txn-ttl"` // Deprecated - MemProfileInterval string `toml:"-" json:"-"` - IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"` - PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"` - GOGC int `toml:"gogc" json:"gogc"` - EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"` - StatsLoadConcurrency uint `toml:"stats-load-concurrency" json:"stats-load-concurrency"` - StatsLoadQueueSize uint `toml:"stats-load-queue-size" json:"stats-load-queue-size"` - EnableStatsCacheMemQuota bool `toml:"enable-stats-cache-mem-quota" json:"enable-stats-cache-mem-quota"` + MemProfileInterval string `toml:"-" json:"-"` + + IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"` + PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"` + GOGC int `toml:"gogc" json:"gogc"` + EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"` + StatsLoadConcurrency uint `toml:"stats-load-concurrency" json:"stats-load-concurrency"` + StatsLoadQueueSize uint `toml:"stats-load-queue-size" json:"stats-load-queue-size"` + AnalyzePartitionConcurrencyQuota uint `toml:"analyze-partition-concurrency-quota" json:"analyze-partition-concurrency-quota"` + EnableStatsCacheMemQuota bool `toml:"enable-stats-cache-mem-quota" json:"enable-stats-cache-mem-quota"` // The following items are deprecated. We need to keep them here temporarily // to support the upgrade process. They can be removed in future. @@ -905,15 +907,16 @@ var defaultConf = Config{ CommitterConcurrency: defTiKVCfg.CommitterConcurrency, MaxTxnTTL: defTiKVCfg.MaxTxnTTL, // 1hour // TODO: set indexUsageSyncLease to 60s. - IndexUsageSyncLease: "0s", - GOGC: 100, - EnforceMPP: false, - PlanReplayerGCLease: "10m", - StatsLoadConcurrency: 5, - StatsLoadQueueSize: 1000, - EnableStatsCacheMemQuota: false, - RunAutoAnalyze: true, - EnableLoadFMSketch: false, + IndexUsageSyncLease: "0s", + GOGC: 100, + EnforceMPP: false, + PlanReplayerGCLease: "10m", + StatsLoadConcurrency: 5, + StatsLoadQueueSize: 1000, + AnalyzePartitionConcurrencyQuota: 16, + EnableStatsCacheMemQuota: false, + RunAutoAnalyze: true, + EnableLoadFMSketch: false, }, ProxyProtocol: ProxyProtocol{ Networks: "", diff --git a/domain/domain.go b/domain/domain.go index 1988f5fa838e3..60b8d316a1949 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -125,6 +125,11 @@ type Domain struct { sysProcesses SysProcesses mdlCheckTableInfo *mdlCheckTableInfo + + analyzeMu struct { + sync.Mutex + sctxs map[sessionctx.Context]bool + } } type mdlCheckTableInfo struct { @@ -1575,6 +1580,46 @@ func (do *Domain) SetStatsUpdating(val bool) { } } +// ReleaseAnalyzeExec returned extra exec for Analyze +func (do *Domain) ReleaseAnalyzeExec(sctxs []sessionctx.Context) { + do.analyzeMu.Lock() + defer do.analyzeMu.Unlock() + for _, ctx := range sctxs { + do.analyzeMu.sctxs[ctx] = false + } +} + +// FetchAnalyzeExec get needed exec for analyze +func (do *Domain) FetchAnalyzeExec(need int) []sessionctx.Context { + if need < 1 { + return nil + } + count := 0 + r := make([]sessionctx.Context, 0) + do.analyzeMu.Lock() + defer do.analyzeMu.Unlock() + for sctx, used := range do.analyzeMu.sctxs { + if used { + continue + } + r = append(r, sctx) + do.analyzeMu.sctxs[sctx] = true + count++ + if count >= need { + break + } + } + return r +} + +// SetupAnalyzeExec setups exec for Analyze Executor +func (do *Domain) SetupAnalyzeExec(ctxs []sessionctx.Context) { + do.analyzeMu.sctxs = make(map[sessionctx.Context]bool) + for _, ctx := range ctxs { + do.analyzeMu.sctxs[ctx] = false + } +} + // LoadAndUpdateStatsLoop loads and updates stats info. func (do *Domain) LoadAndUpdateStatsLoop(ctxs []sessionctx.Context) error { if err := do.UpdateTableStatsLoop(ctxs[0]); err != nil { diff --git a/executor/analyze.go b/executor/analyze.go index 6fccec8a9bf5b..66334ad05d647 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -188,8 +188,8 @@ func (e *AnalyzeExec) saveV2AnalyzeOpts() error { return nil } -func (e *AnalyzeExec) recordHistoricalStats(tableID int64) error { - statsHandle := domain.GetDomain(e.ctx).StatsHandle() +func recordHistoricalStats(sctx sessionctx.Context, tableID int64) error { + statsHandle := domain.GetDomain(sctx).StatsHandle() historicalStatsEnabled, err := statsHandle.CheckHistoricalStatsEnable() if err != nil { return errors.Errorf("check tidb_enable_historical_stats failed: %v", err) @@ -198,7 +198,7 @@ func (e *AnalyzeExec) recordHistoricalStats(tableID int64) error { return nil } - is := domain.GetDomain(e.ctx).InfoSchema() + is := domain.GetDomain(sctx).InfoSchema() tbl, existed := is.TableByID(tableID) if !existed { return errors.Errorf("cannot get table by id %d", tableID) @@ -217,6 +217,23 @@ func (e *AnalyzeExec) recordHistoricalStats(tableID int64) error { // handleResultsError will handle the error fetch from resultsCh and record it in log func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, needGlobalStats bool, globalStatsMap globalStatsMap, resultsCh <-chan *statistics.AnalyzeResults) error { + partitionStatsConcurrency := e.ctx.GetSessionVars().AnalyzePartitionConcurrency + // If 'partitionStatsConcurrency' > 1, we will try to demand extra session from Domain to save Analyze results in concurrency. + // If there is no extra session we can use, we will save analyze results in single-thread. + if partitionStatsConcurrency > 1 { + dom := domain.GetDomain(e.ctx) + subSctxs := dom.FetchAnalyzeExec(partitionStatsConcurrency) + if len(subSctxs) > 0 { + defer func() { + dom.ReleaseAnalyzeExec(subSctxs) + }() + internalCtx := kv.WithInternalSourceType(ctx, kv.InternalTxnStats) + err := e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh) + return err + } + } + + // save analyze results in single-thread. statsHandle := domain.GetDomain(e.ctx).StatsHandle() panicCnt := 0 var err error @@ -235,36 +252,16 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n finishJobWithLog(e.ctx, results.Job, err) continue } - if results.TableID.IsPartitionTable() && needGlobalStats { - for _, result := range results.Ars { - if result.IsIndex == 0 { - // If it does not belong to the statistics of index, we need to set it to -1 to distinguish. - globalStatsID := globalStatsKey{tableID: results.TableID.TableID, indexID: int64(-1)} - histIDs := make([]int64, 0, len(result.Hist)) - for _, hg := range result.Hist { - // It's normal virtual column, skip. - if hg == nil { - continue - } - histIDs = append(histIDs, hg.ID) - } - globalStatsMap[globalStatsID] = globalStatsInfo{isIndex: result.IsIndex, histIDs: histIDs, statsVersion: results.StatsVer} - } else { - for _, hg := range result.Hist { - globalStatsID := globalStatsKey{tableID: results.TableID.TableID, indexID: hg.ID} - globalStatsMap[globalStatsID] = globalStatsInfo{isIndex: result.IsIndex, histIDs: []int64{hg.ID}, statsVersion: results.StatsVer} - } - } - } - } - if err1 := statsHandle.SaveTableStatsToStorage(results, results.TableID.IsPartitionTable(), e.ctx.GetSessionVars().EnableAnalyzeSnapshot); err1 != nil { + handleGlobalStats(needGlobalStats, globalStatsMap, results) + + if err1 := statsHandle.SaveTableStatsToStorage(results, e.ctx.GetSessionVars().EnableAnalyzeSnapshot); err1 != nil { err = err1 logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err)) finishJobWithLog(e.ctx, results.Job, err) } else { finishJobWithLog(e.ctx, results.Job, nil) // Dump stats to historical storage. - if err := e.recordHistoricalStats(results.TableID.TableID); err != nil { + if err := recordHistoricalStats(e.ctx, results.TableID.TableID); err != nil { logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) } } @@ -273,6 +270,54 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n return err } +func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, statsConcurrency int, needGlobalStats bool, + subSctxs []sessionctx.Context, + globalStatsMap globalStatsMap, resultsCh <-chan *statistics.AnalyzeResults) error { + partitionStatsConcurrency := len(subSctxs) + + var wg util.WaitGroupWrapper + saveResultsCh := make(chan *statistics.AnalyzeResults, partitionStatsConcurrency) + errCh := make(chan error, partitionStatsConcurrency) + for i := 0; i < partitionStatsConcurrency; i++ { + worker := newAnalyzeSaveStatsWorker(saveResultsCh, subSctxs[i], errCh) + ctx1 := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + wg.Run(func() { + worker.run(ctx1, e.ctx.GetSessionVars().EnableAnalyzeSnapshot) + }) + } + panicCnt := 0 + var err error + for panicCnt < statsConcurrency { + results, ok := <-resultsCh + if !ok { + break + } + if results.Err != nil { + err = results.Err + if isAnalyzeWorkerPanic(err) { + panicCnt++ + } else { + logutil.Logger(ctx).Error("analyze failed", zap.Error(err)) + } + finishJobWithLog(e.ctx, results.Job, err) + continue + } + handleGlobalStats(needGlobalStats, globalStatsMap, results) + saveResultsCh <- results + } + close(saveResultsCh) + wg.Wait() + close(errCh) + if len(errCh) > 0 { + errMsg := make([]string, 0) + for err1 := range errCh { + errMsg = append(errMsg, err1.Error()) + } + err = errors.New(strings.Join(errMsg, ",")) + } + return err +} + func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<- *statistics.AnalyzeResults) { var task *analyzeTask defer func() { @@ -434,3 +479,28 @@ func finishJobWithLog(sctx sessionctx.Context, job *statistics.AnalyzeJob, analy zap.String("cost", job.EndTime.Sub(job.StartTime).String())) } } + +func handleGlobalStats(needGlobalStats bool, globalStatsMap globalStatsMap, results *statistics.AnalyzeResults) { + if results.TableID.IsPartitionTable() && needGlobalStats { + for _, result := range results.Ars { + if result.IsIndex == 0 { + // If it does not belong to the statistics of index, we need to set it to -1 to distinguish. + globalStatsID := globalStatsKey{tableID: results.TableID.TableID, indexID: int64(-1)} + histIDs := make([]int64, 0, len(result.Hist)) + for _, hg := range result.Hist { + // It's normal virtual column, skip. + if hg == nil { + continue + } + histIDs = append(histIDs, hg.ID) + } + globalStatsMap[globalStatsID] = globalStatsInfo{isIndex: result.IsIndex, histIDs: histIDs, statsVersion: results.StatsVer} + } else { + for _, hg := range result.Hist { + globalStatsID := globalStatsKey{tableID: results.TableID.TableID, indexID: hg.ID} + globalStatsMap[globalStatsID] = globalStatsInfo{isIndex: result.IsIndex, histIDs: []int64{hg.ID}, statsVersion: results.StatsVer} + } + } + } + } +} diff --git a/executor/analyze_global_stats.go b/executor/analyze_global_stats.go index c9ff6217a195c..440f0a104e207 100644 --- a/executor/analyze_global_stats.go +++ b/executor/analyze_global_stats.go @@ -83,7 +83,7 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo logutil.Logger(ctx).Error("save global-level stats to storage failed", zap.Error(err)) } // Dump stats to historical storage. - if err := e.recordHistoricalStats(globalStatsID.tableID); err != nil { + if err := recordHistoricalStats(e.ctx, globalStatsID.tableID); err != nil { logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) } } diff --git a/executor/analyze_test.go b/executor/analyze_test.go index 246016a4082f7..bbe3f5b8d1b1e 100644 --- a/executor/analyze_test.go +++ b/executor/analyze_test.go @@ -371,6 +371,9 @@ func TestAnalyzePartitionTableByConcurrencyInDynamic(t *testing.T) { fmt.Println("testcase ", concurrency) tk.MustExec(fmt.Sprintf("set @@tidb_merge_partition_stats_concurrency=%v", concurrency)) tk.MustQuery("select @@tidb_merge_partition_stats_concurrency").Check(testkit.Rows(concurrency)) + tk.MustExec(fmt.Sprintf("set @@tidb_analyze_partition_concurrency=%v", concurrency)) + tk.MustQuery("select @@tidb_analyze_partition_concurrency").Check(testkit.Rows(concurrency)) + tk.MustExec("analyze table t") tk.MustQuery("show stats_topn where partition_name = 'global' and table_name = 't'") } diff --git a/executor/analyze_worker.go b/executor/analyze_worker.go new file mode 100644 index 0000000000000..0297b142dbb14 --- /dev/null +++ b/executor/analyze_worker.go @@ -0,0 +1,70 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor + +import ( + "context" + + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/statistics" + "github.com/pingcap/tidb/statistics/handle" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" +) + +type analyzeSaveStatsWorker struct { + resultsCh <-chan *statistics.AnalyzeResults + sctx sessionctx.Context + errCh chan<- error +} + +func newAnalyzeSaveStatsWorker( + resultsCh <-chan *statistics.AnalyzeResults, + sctx sessionctx.Context, + errCh chan<- error) *analyzeSaveStatsWorker { + worker := &analyzeSaveStatsWorker{ + resultsCh: resultsCh, + sctx: sctx, + errCh: errCh, + } + return worker +} + +func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot bool) { + defer func() { + if r := recover(); r != nil { + logutil.BgLogger().Error("analyze save stats worker panicked", zap.Any("recover", r), zap.Stack("stack")) + worker.errCh <- getAnalyzePanicErr(r) + } + }() + for results := range worker.resultsCh { + err := handle.SaveTableStatsToStorage(worker.sctx, results, analyzeSnapshot) + if err != nil { + logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err)) + finishJobWithLog(worker.sctx, results.Job, err) + worker.errCh <- err + } else { + finishJobWithLog(worker.sctx, results.Job, nil) + // Dump stats to historical storage. + if err := recordHistoricalStats(worker.sctx, results.TableID.TableID); err != nil { + logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) + } + } + invalidInfoSchemaStatCache(results.TableID.GetStatisticsID()) + if err != nil { + return + } + } +} diff --git a/session/session.go b/session/session.go index 5f3c18546293f..324f39bdbf2aa 100644 --- a/session/session.go +++ b/session/session.go @@ -2849,8 +2849,9 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { runInBootstrapSession(store, upgrade) } + analyzeConcurrencyQuota := int(config.GetGlobalConfig().Performance.AnalyzePartitionConcurrencyQuota) concurrency := int(config.GetGlobalConfig().Performance.StatsLoadConcurrency) - ses, err := createSessions(store, 7+concurrency) + ses, err := createSessions(store, 7+concurrency+analyzeConcurrencyQuota) if err != nil { return nil, err } @@ -2933,7 +2934,11 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { if err = dom.LoadAndUpdateStatsLoop(subCtxs); err != nil { return nil, err } - + subCtxs2 := make([]sessionctx.Context, analyzeConcurrencyQuota) + for i := 0; i < analyzeConcurrencyQuota; i++ { + subCtxs2[i] = ses[7+concurrency+i] + } + dom.SetupAnalyzeExec(subCtxs2) dom.DumpFileGcCheckerLoop() dom.LoadSigningCertLoop(cfg.Security.SessionTokenSigningCert, cfg.Security.SessionTokenSigningKey) diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 892a512ac9530..f7f875e50fb93 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1271,6 +1271,8 @@ type SessionVars struct { // LastPlanReplayerToken indicates the last plan replayer token LastPlanReplayerToken string + // AnalyzePartitionConcurrency indicates concurrency for partitions in Analyze + AnalyzePartitionConcurrency int // AnalyzePartitionMergeConcurrency indicates concurrency for merging partition stats AnalyzePartitionMergeConcurrency int diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index ad1c41848cc84..6e156f3aed68a 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -1942,6 +1942,11 @@ var defaultSysVars = []*SysVar{ s.RangeMaxSize = TidbOptInt64(val, DefTiDBOptRangeMaxSize) return nil }}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBAnalyzePartitionConcurrency, Value: strconv.FormatInt(DefTiDBAnalyzePartitionConcurrency, 10), + MinValue: 1, MaxValue: uint64(config.GetGlobalConfig().Performance.AnalyzePartitionConcurrencyQuota), SetSession: func(s *SessionVars, val string) error { + s.AnalyzePartitionConcurrency = int(TidbOptInt64(val, DefTiDBAnalyzePartitionConcurrency)) + return nil + }}, { Scope: ScopeGlobal | ScopeSession, Name: TiDBMergePartitionStatsConcurrency, Value: strconv.FormatInt(DefTiDBMergePartitionStatsConcurrency, 10), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency, SetSession: func(s *SessionVars, val string) error { diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 13df336ca3511..76c205d68647a 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -751,7 +751,9 @@ const ( // limit for ranges. TiDBOptRangeMaxSize = "tidb_opt_range_max_size" - // TiDBMergePartitionStatsConcurrency indicates the concurrecny when merge partition stats into global stats + // TiDBAnalyzePartitionConcurrency indicates concurrency for save/read partitions stats in Analyze + TiDBAnalyzePartitionConcurrency = "tidb_analyze_partition_concurrency" + // TiDBMergePartitionStatsConcurrency indicates the concurrency when merge partition stats into global stats TiDBMergePartitionStatsConcurrency = "tidb_merge_partition_stats_concurrency" // TiDBOptPrefixIndexSingleScan indicates whether to do some optimizations to avoid double scan for prefix index. @@ -1060,6 +1062,7 @@ const ( DefTiDBRcWriteCheckTs = false DefTiDBConstraintCheckInPlacePessimistic = true DefTiDBForeignKeyChecks = false + DefTiDBAnalyzePartitionConcurrency = 1 DefTiDBOptRangeMaxSize = 64 * int64(size.MB) // 64 MB DefTiDBCostModelVer = 1 DefTiDBServerMemoryLimitSessMinSize = 128 << 20 diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index 6c08b5245fcaf..5e0d74e816379 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -1344,7 +1344,7 @@ func saveBucketsToStorage(ctx context.Context, exec sqlexec.SQLExecutor, sc *stm } // SaveTableStatsToStorage saves the stats of a table to storage. -func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, needDumpFMS, analyzeSnapshot bool) (err error) { +func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, analyzeSnapshot bool) (err error) { tableID := results.TableID.GetStatisticsID() statsVer := uint64(0) defer func() { @@ -1354,8 +1354,21 @@ func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, nee }() h.mu.Lock() defer h.mu.Unlock() + return SaveTableStatsToStorage(h.mu.ctx, results, analyzeSnapshot) +} + +// SaveTableStatsToStorage saves the stats of a table to storage. +func SaveTableStatsToStorage(sctx sessionctx.Context, results *statistics.AnalyzeResults, analyzeSnapshot bool) (err error) { + needDumpFMS := results.TableID.IsPartitionTable() + tableID := results.TableID.GetStatisticsID() + statsVer := uint64(0) + defer func() { + if err == nil && statsVer != 0 { + err = recordHistoricalStatsMeta(sctx, tableID, statsVer) + } + }() ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - exec := h.mu.ctx.(sqlexec.SQLExecutor) + exec := sctx.(sqlexec.SQLExecutor) _, err = exec.ExecuteInternal(ctx, "begin pessimistic") if err != nil { return err @@ -1363,7 +1376,7 @@ func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, nee defer func() { err = finishTransaction(ctx, exec, err) }() - txn, err := h.mu.ctx.Txn(true) + txn, err := sctx.Txn(true) if err != nil { return err } @@ -1376,7 +1389,7 @@ func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, nee return err } var rows []chunk.Row - rows, err = sqlexec.DrainRecordSet(ctx, rs, h.mu.ctx.GetSessionVars().MaxChunkSize) + rows, err = sqlexec.DrainRecordSet(ctx, rs, sctx.GetSessionVars().MaxChunkSize) if err != nil { return err } @@ -1473,7 +1486,7 @@ func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, nee if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %?", tableID, result.IsIndex, hg.ID); err != nil { return err } - sc := h.mu.ctx.GetSessionVars().StmtCtx + sc := sctx.GetSessionVars().StmtCtx var lastAnalyzePos []byte lastAnalyzePos, err = saveBucketsToStorage(ctx, exec, sc, tableID, result.IsIndex, hg) if err != nil { @@ -2338,33 +2351,36 @@ func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model. return version, nil } -// CheckHistoricalStatsEnable is used to check whether TiDBEnableHistoricalStats is enabled. -func (h *Handle) CheckHistoricalStatsEnable() (enable bool, err error) { - h.mu.Lock() - defer h.mu.Unlock() - val, err := h.mu.ctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableHistoricalStats) +func checkHistoricalStatsEnable(sctx sessionctx.Context) (enable bool, err error) { + val, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableHistoricalStats) if err != nil { return false, errors.Trace(err) } return variable.TiDBOptOn(val), nil } -func (h *Handle) recordHistoricalStatsMeta(tableID int64, version uint64) error { +// CheckHistoricalStatsEnable is used to check whether TiDBEnableHistoricalStats is enabled. +func (h *Handle) CheckHistoricalStatsEnable() (enable bool, err error) { + h.mu.Lock() + defer h.mu.Unlock() + return checkHistoricalStatsEnable(h.mu.ctx) +} + +func recordHistoricalStatsMeta(sctx sessionctx.Context, tableID int64, version uint64) error { if tableID == 0 || version == 0 { return errors.Errorf("tableID %d, version %d are invalid", tableID, version) } - historicalStatsEnabled, err := h.CheckHistoricalStatsEnable() + historicalStatsEnabled, err := checkHistoricalStatsEnable(sctx) if err != nil { return errors.Errorf("check tidb_enable_historical_stats failed: %v", err) } if !historicalStatsEnabled { return nil } - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - h.mu.Lock() - defer h.mu.Unlock() - rows, _, err := h.execRestrictedSQL(ctx, "select modify_count, count from mysql.stats_meta where table_id = %? and version = %?", tableID, version) + exec := sctx.(sqlexec.SQLExecutor) + rexec := sctx.(sqlexec.RestrictedSQLExecutor) + rows, _, err := rexec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, "select modify_count, count from mysql.stats_meta where table_id = %? and version = %?", tableID, version) if err != nil { return errors.Trace(err) } @@ -2373,7 +2389,6 @@ func (h *Handle) recordHistoricalStatsMeta(tableID int64, version uint64) error } modifyCount, count := rows[0].GetInt64(0), rows[0].GetInt64(1) - exec := h.mu.ctx.(sqlexec.SQLExecutor) _, err = exec.ExecuteInternal(ctx, "begin pessimistic") if err != nil { return errors.Trace(err) @@ -2389,6 +2404,12 @@ func (h *Handle) recordHistoricalStatsMeta(tableID int64, version uint64) error return nil } +func (h *Handle) recordHistoricalStatsMeta(tableID int64, version uint64) error { + h.mu.Lock() + defer h.mu.Unlock() + return recordHistoricalStatsMeta(h.mu.ctx, tableID, version) +} + // InsertAnalyzeJob inserts analyze job into mysql.analyze_jobs and gets job ID for further updating job. func (h *Handle) InsertAnalyzeJob(job *statistics.AnalyzeJob, instance string, procID uint64) error { h.mu.Lock()