From 686f8bc383b8828e8d4b8257acb7ccd7ac2ef2c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BA=8C=E6=89=8B=E6=8E=89=E5=8C=85=E5=B7=A5=E7=A8=8B?= =?UTF-8?q?=E5=B8=88?= Date: Wed, 7 Aug 2024 17:30:40 +0800 Subject: [PATCH] statistics: move history-related functions into the stats handle (#55163) ref pingcap/tidb#55043 --- pkg/executor/analyze.go | 157 +----------------- pkg/executor/analyze_col.go | 3 +- pkg/executor/analyze_col_v2.go | 11 +- pkg/executor/analyze_global_stats.go | 8 +- pkg/executor/analyze_idx.go | 3 +- pkg/executor/analyze_worker.go | 11 +- pkg/executor/test/analyzetest/analyze_test.go | 20 +-- pkg/statistics/analyze_jobs.go | 10 ++ .../handle/autoanalyze/autoanalyze.go | 138 +++++++++++++++ pkg/statistics/handle/types/interfaces.go | 15 ++ 10 files changed, 200 insertions(+), 176 deletions(-) diff --git a/pkg/executor/analyze.go b/pkg/executor/analyze.go index 96f29d26c53d0..9ab4963870dec 100644 --- a/pkg/executor/analyze.go +++ b/pkg/executor/analyze.go @@ -22,7 +22,6 @@ import ( "net" "strconv" "strings" - "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -41,12 +40,10 @@ import ( "github.com/pingcap/tidb/pkg/statistics/handle" statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types" handleutil "github.com/pingcap/tidb/pkg/statistics/handle/util" - "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/sqlescape" - "github.com/pingcap/tidb/pkg/util/sqlexec" "github.com/pingcap/tipb/go-tipb" "github.com/tiancaiamao/gp" "go.uber.org/zap" @@ -165,7 +162,7 @@ TASKLOOP: }) // If we enabled dynamic prune mode, then we need to generate global stats here for partition tables. if needGlobalStats { - err = e.handleGlobalStats(globalStatsMap) + err = e.handleGlobalStats(statsHandle, globalStatsMap) if err != nil { return err } @@ -432,7 +429,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency( resultsCh <-chan *statistics.AnalyzeResults, ) error { partitionStatsConcurrency := len(subSctxs) - + statsHandle := domain.GetDomain(e.Ctx()).StatsHandle() wg := util.NewWaitGroupPool(e.gp) saveResultsCh := make(chan *statistics.AnalyzeResults, partitionStatsConcurrency) errCh := make(chan error, partitionStatsConcurrency) @@ -440,7 +437,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency( worker := newAnalyzeSaveStatsWorker(saveResultsCh, subSctxs[i], errCh, &e.Ctx().GetSessionVars().SQLKiller) ctx1 := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) wg.Run(func() { - worker.run(ctx1, e.Ctx().GetSessionVars().EnableAnalyzeSnapshot) + worker.run(ctx1, statsHandle, e.Ctx().GetSessionVars().EnableAnalyzeSnapshot) }) } tableIDs := map[int64]struct{}{} @@ -462,7 +459,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency( } else { logutil.Logger(ctx).Error("analyze failed", zap.Error(err)) } - finishJobWithLog(e.Ctx(), results.Job, err) + finishJobWithLog(statsHandle, results.Job, err) continue } handleGlobalStats(needGlobalStats, globalStatsMap, results) @@ -490,6 +487,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency( func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<- *statistics.AnalyzeResults) { var task *analyzeTask + statsHandle := domain.GetDomain(e.Ctx()).StatsHandle() defer func() { if r := recover(); r != nil { logutil.BgLogger().Error("analyze worker panicked", zap.Any("recover", r), zap.Stack("stack")) @@ -513,7 +511,7 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<- break } failpoint.Inject("handleAnalyzeWorkerPanic", nil) - StartAnalyzeJob(e.Ctx(), task.job) + statsHandle.StartAnalyzeJob(task.job) switch task.taskType { case colTask: select { @@ -568,147 +566,8 @@ func AddNewAnalyzeJob(ctx sessionctx.Context, job *statistics.AnalyzeJob) { } } -// StartAnalyzeJob marks the state of the analyze job as running and sets the start time. -func StartAnalyzeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob) { - if job == nil || job.ID == nil { - return - } - job.StartTime = time.Now() - job.Progress.SetLastDumpTime(job.StartTime) - exec := sctx.GetRestrictedSQLExecutor() - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - const sql = "UPDATE mysql.analyze_jobs SET start_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %? WHERE id = %?" - _, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseSessionPool}, sql, job.StartTime.UTC().Format(types.TimeFormat), statistics.AnalyzeRunning, *job.ID) - if err != nil { - logutil.BgLogger().Warn("failed to update analyze job", zap.String("update", fmt.Sprintf("%s->%s", statistics.AnalyzePending, statistics.AnalyzeRunning)), zap.Error(err)) - } - failpoint.Inject("DebugAnalyzeJobOperations", func(val failpoint.Value) { - if val.(bool) { - logutil.BgLogger().Info("StartAnalyzeJob", - zap.Time("start_time", job.StartTime), - zap.Uint64("job id", *job.ID), - ) - } - }) -} - -// UpdateAnalyzeJob updates count of the processed rows when increment reaches a threshold. -func UpdateAnalyzeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob, rowCount int64) { - if job == nil || job.ID == nil { - return - } - delta := job.Progress.Update(rowCount) - if delta == 0 { - return - } - exec := sctx.GetRestrictedSQLExecutor() - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - const sql = "UPDATE mysql.analyze_jobs SET processed_rows = processed_rows + %? WHERE id = %?" - _, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseSessionPool}, sql, delta, *job.ID) - if err != nil { - logutil.BgLogger().Warn("failed to update analyze job", zap.String("update", fmt.Sprintf("process %v rows", delta)), zap.Error(err)) - } - failpoint.Inject("DebugAnalyzeJobOperations", func(val failpoint.Value) { - if val.(bool) { - logutil.BgLogger().Info("UpdateAnalyzeJob", - zap.Int64("increase processed_rows", delta), - zap.Uint64("job id", *job.ID), - ) - } - }) -} - -// FinishAnalyzeMergeJob finishes analyze merge job -func FinishAnalyzeMergeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob, analyzeErr error) { - if job == nil || job.ID == nil { - return - } - - job.EndTime = time.Now() - var sql string - var args []any - if analyzeErr != nil { - failReason := analyzeErr.Error() - const textMaxLength = 65535 - if len(failReason) > textMaxLength { - failReason = failReason[:textMaxLength] - } - sql = "UPDATE mysql.analyze_jobs SET end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, fail_reason = %?, process_id = NULL WHERE id = %?" - args = []any{job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFailed, failReason, *job.ID} - } else { - sql = "UPDATE mysql.analyze_jobs SET end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, process_id = NULL WHERE id = %?" - args = []any{job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFinished, *job.ID} - } - exec := sctx.GetRestrictedSQLExecutor() - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - _, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseSessionPool}, sql, args...) - if err != nil { - var state string - if analyzeErr != nil { - state = statistics.AnalyzeFailed - } else { - state = statistics.AnalyzeFinished - } - logutil.BgLogger().Warn("failed to update analyze job", zap.String("update", fmt.Sprintf("%s->%s", statistics.AnalyzeRunning, state)), zap.Error(err)) - } - failpoint.Inject("DebugAnalyzeJobOperations", func(val failpoint.Value) { - if val.(bool) { - logutil.BgLogger().Info("FinishAnalyzeMergeJob", - zap.Time("end_time", job.EndTime), - zap.Uint64("job id", *job.ID), - ) - } - }) -} - -// FinishAnalyzeJob updates the state of the analyze job to finished/failed according to `meetError` and sets the end time. -func FinishAnalyzeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob, analyzeErr error) { - if job == nil || job.ID == nil { - return - } - job.EndTime = time.Now() - var sql string - var args []any - // process_id is used to see which process is running the analyze job and kill the analyze job. After the analyze job - // is finished(or failed), process_id is useless and we set it to NULL to avoid `kill tidb process_id` wrongly. - if analyzeErr != nil { - failReason := analyzeErr.Error() - const textMaxLength = 65535 - if len(failReason) > textMaxLength { - failReason = failReason[:textMaxLength] - } - sql = "UPDATE mysql.analyze_jobs SET processed_rows = processed_rows + %?, end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, fail_reason = %?, process_id = NULL WHERE id = %?" - args = []any{job.Progress.GetDeltaCount(), job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFailed, failReason, *job.ID} - } else { - sql = "UPDATE mysql.analyze_jobs SET processed_rows = processed_rows + %?, end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, process_id = NULL WHERE id = %?" - args = []any{job.Progress.GetDeltaCount(), job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFinished, *job.ID} - } - exec := sctx.GetRestrictedSQLExecutor() - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - _, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseSessionPool}, sql, args...) - if err != nil { - var state string - if analyzeErr != nil { - state = statistics.AnalyzeFailed - } else { - state = statistics.AnalyzeFinished - } - logutil.BgLogger().Warn("failed to update analyze job", zap.String("update", fmt.Sprintf("%s->%s", statistics.AnalyzeRunning, state)), zap.Error(err)) - } - failpoint.Inject("DebugAnalyzeJobOperations", func(val failpoint.Value) { - if val.(bool) { - logutil.BgLogger().Info("FinishAnalyzeJob", - zap.Int64("increase processed_rows", job.Progress.GetDeltaCount()), - zap.Time("end_time", job.EndTime), - zap.Uint64("job id", *job.ID), - zap.Error(analyzeErr), - ) - } - }) -} - -func finishJobWithLog(sctx sessionctx.Context, job *statistics.AnalyzeJob, analyzeErr error) { - FinishAnalyzeJob(sctx, job, analyzeErr) +func finishJobWithLog(statsHandle *handle.Handle, job *statistics.AnalyzeJob, analyzeErr error) { + statsHandle.FinishAnalyzeJob(job, analyzeErr, statistics.TableAnalysisJob) if job != nil { var state string if analyzeErr != nil { diff --git a/pkg/executor/analyze_col.go b/pkg/executor/analyze_col.go index 3a84aca7a3c01..78a5190b8d21b 100644 --- a/pkg/executor/analyze_col.go +++ b/pkg/executor/analyze_col.go @@ -174,6 +174,7 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo CMSketch: statistics.NewCMSketch(int32(e.opts[ast.AnalyzeOptCMSketchDepth]), int32(e.opts[ast.AnalyzeOptCMSketchWidth])), } } + statsHandle := domain.GetDomain(e.ctx).StatsHandle() for { failpoint.Inject("mockKillRunningV1AnalyzeJob", func() { dom := domain.GetDomain(e.ctx) @@ -228,7 +229,7 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo rowCount = respSample.Count + respSample.NullCount collectors[i].MergeSampleCollector(sc, respSample) } - UpdateAnalyzeJob(e.ctx, e.job, rowCount) + statsHandle.UpdateAnalyzeJobProgress(e.job, rowCount) } timeZone := e.ctx.GetSessionVars().Location() if hasPkHist(e.handleCols) { diff --git a/pkg/executor/analyze_col_v2.go b/pkg/executor/analyze_col_v2.go index 1d272f1f9ddfb..6eefe269fc865 100644 --- a/pkg/executor/analyze_col_v2.go +++ b/pkg/executor/analyze_col_v2.go @@ -473,6 +473,7 @@ func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.In results: make(map[int64]*statistics.AnalyzeResults, len(indexInfos)), } var err error + statsHandle := domain.GetDomain(e.ctx).StatsHandle() for panicCnt < statsConcurrncy { results, ok := <-resultsCh if !ok { @@ -480,13 +481,13 @@ func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.In } if results.Err != nil { err = results.Err - FinishAnalyzeJob(e.ctx, results.Job, err) + statsHandle.FinishAnalyzeJob(results.Job, err, statistics.TableAnalysisJob) if isAnalyzeWorkerPanic(err) { panicCnt++ } continue } - FinishAnalyzeJob(e.ctx, results.Job, nil) + statsHandle.FinishAnalyzeJob(results.Job, nil, statistics.TableAnalysisJob) totalResult.results[results.Ars[0].Hist[0].ID] = results } if err != nil { @@ -498,6 +499,7 @@ func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.In // subIndexWorker receive the task for each index and return the result for them. func (e *AnalyzeColumnsExecV2) subIndexWorkerForNDV(taskCh chan *analyzeTask, resultsCh chan *statistics.AnalyzeResults) { var task *analyzeTask + statsHandle := domain.GetDomain(e.ctx).StatsHandle() defer func() { if r := recover(); r != nil { logutil.BgLogger().Error("analyze worker panicked", zap.Any("recover", r), zap.Stack("stack")) @@ -514,7 +516,7 @@ func (e *AnalyzeColumnsExecV2) subIndexWorkerForNDV(taskCh chan *analyzeTask, re if !ok { break } - StartAnalyzeJob(e.ctx, task.job) + statsHandle.StartAnalyzeJob(task.job) if task.taskType != idxTask { resultsCh <- &statistics.AnalyzeResults{ Err: errors.Errorf("incorrect analyze type"), @@ -628,6 +630,7 @@ func (e *AnalyzeColumnsExecV2) subMergeWorker(resultCh chan<- *samplingMergeResu for i := 0; i < l; i++ { retCollector.Base().FMSketches = append(retCollector.Base().FMSketches, statistics.NewFMSketch(statistics.MaxSketchSize)) } + statsHandle := domain.GetDomain(e.ctx).StatsHandle() for { data, ok := <-taskCh if !ok { @@ -649,7 +652,7 @@ func (e *AnalyzeColumnsExecV2) subMergeWorker(resultCh chan<- *samplingMergeResu // Update processed rows. subCollector := statistics.NewRowSampleCollector(int(e.analyzePB.ColReq.SampleSize), e.analyzePB.ColReq.GetSampleRate(), l) subCollector.Base().FromProto(colResp.RowCollector, e.memTracker) - UpdateAnalyzeJob(e.ctx, e.job, subCollector.Base().Count) + statsHandle.UpdateAnalyzeJobProgress(e.job, subCollector.Base().Count) // Print collect log. oldRetCollectorSize := retCollector.Base().MemSize diff --git a/pkg/executor/analyze_global_stats.go b/pkg/executor/analyze_global_stats.go index 29801ca35fda8..2607e611a9e4b 100644 --- a/pkg/executor/analyze_global_stats.go +++ b/pkg/executor/analyze_global_stats.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/statistics" + "github.com/pingcap/tidb/pkg/statistics/handle" statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types" "github.com/pingcap/tidb/pkg/util/logutil" "go.uber.org/zap" @@ -35,13 +36,12 @@ type globalStatsKey struct { // The meaning of value in map is some additional information needed to build global-level stats. type globalStatsMap map[globalStatsKey]statstypes.GlobalStatsInfo -func (e *AnalyzeExec) handleGlobalStats(globalStatsMap globalStatsMap) error { +func (e *AnalyzeExec) handleGlobalStats(statsHandle *handle.Handle, globalStatsMap globalStatsMap) error { globalStatsTableIDs := make(map[int64]struct{}, len(globalStatsMap)) for globalStatsID := range globalStatsMap { globalStatsTableIDs[globalStatsID.tableID] = struct{}{} } - statsHandle := domain.GetDomain(e.Ctx()).StatsHandle() tableIDs := make(map[int64]struct{}, len(globalStatsTableIDs)) for tableID := range globalStatsTableIDs { tableIDs[tableID] = struct{}{} @@ -55,7 +55,7 @@ func (e *AnalyzeExec) handleGlobalStats(globalStatsMap globalStatsMap) error { continue } AddNewAnalyzeJob(e.Ctx(), job) - StartAnalyzeJob(e.Ctx(), job) + statsHandle.StartAnalyzeJob(job) mergeStatsErr := func() error { globalOpts := e.opts @@ -76,7 +76,7 @@ func (e *AnalyzeExec) handleGlobalStats(globalStatsMap globalStatsMap) error { } return err }() - FinishAnalyzeMergeJob(e.Ctx(), job, mergeStatsErr) + statsHandle.FinishAnalyzeJob(job, mergeStatsErr, statistics.GlobalStatsMergeJob) } } diff --git a/pkg/executor/analyze_idx.go b/pkg/executor/analyze_idx.go index d5c5b82b8a0f0..1c26be7782d4f 100644 --- a/pkg/executor/analyze_idx.go +++ b/pkg/executor/analyze_idx.go @@ -319,7 +319,8 @@ func updateIndexResult( needCMS := cms != nil respHist := statistics.HistogramFromProto(resp.Hist) if job != nil { - UpdateAnalyzeJob(ctx, job, int64(respHist.TotalRowCount())) + statsHandle := domain.GetDomain(ctx).StatsHandle() + statsHandle.UpdateAnalyzeJobProgress(job, int64(respHist.TotalRowCount())) } hist, err = statistics.MergeHistograms(ctx.GetSessionVars().StmtCtx, hist, respHist, numBuckets, statsVer) if err != nil { diff --git a/pkg/executor/analyze_worker.go b/pkg/executor/analyze_worker.go index b2430f6260f24..7d5c15e4ba955 100644 --- a/pkg/executor/analyze_worker.go +++ b/pkg/executor/analyze_worker.go @@ -17,9 +17,9 @@ package executor import ( "context" - "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/statistics" + "github.com/pingcap/tidb/pkg/statistics/handle" "github.com/pingcap/tidb/pkg/statistics/handle/util" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/sqlkiller" @@ -47,7 +47,7 @@ func newAnalyzeSaveStatsWorker( return worker } -func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot bool) { +func (worker *analyzeSaveStatsWorker) run(ctx context.Context, statsHandle *handle.Handle, analyzeSnapshot bool) { defer func() { if r := recover(); r != nil { logutil.BgLogger().Error("analyze save stats worker panicked", zap.Any("recover", r), zap.Stack("stack")) @@ -56,19 +56,18 @@ func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot b }() for results := range worker.resultsCh { if err := worker.killer.HandleSignal(); err != nil { - finishJobWithLog(worker.sctx, results.Job, err) + finishJobWithLog(statsHandle, results.Job, err) results.DestroyAndPutToPool() worker.errCh <- err return } - statsHandle := domain.GetDomain(worker.sctx).StatsHandle() err := statsHandle.SaveTableStatsToStorage(results, analyzeSnapshot, util.StatsMetaHistorySourceAnalyze) if err != nil { logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err)) - finishJobWithLog(worker.sctx, results.Job, err) + finishJobWithLog(statsHandle, results.Job, err) worker.errCh <- err } else { - finishJobWithLog(worker.sctx, results.Job, nil) + finishJobWithLog(statsHandle, results.Job, nil) } results.DestroyAndPutToPool() if err != nil { diff --git a/pkg/executor/test/analyzetest/analyze_test.go b/pkg/executor/test/analyzetest/analyze_test.go index 2add844138bd3..9baac651d5879 100644 --- a/pkg/executor/test/analyzetest/analyze_test.go +++ b/pkg/executor/test/analyzetest/analyze_test.go @@ -2063,8 +2063,8 @@ func TestAnalyzeJob(t *testing.T) { require.Equal(t, addr, rows[0][9]) connID := strconv.FormatUint(tk.Session().GetSessionVars().ConnectionID, 10) require.Equal(t, connID, rows[0][10]) - - executor.StartAnalyzeJob(se, job) + statsHandle := domain.GetDomain(tk.Session()).StatsHandle() + statsHandle.StartAnalyzeJob(job) ctx := context.WithValue(context.Background(), executor.AnalyzeProgressTest, 100) rows = tk.MustQueryWithContext(ctx, "show analyze status").Rows() checkTime := func(val any) { @@ -2079,12 +2079,12 @@ func TestAnalyzeJob(t *testing.T) { require.Equal(t, "0.1", rows[0][12]) // PROGRESS require.Equal(t, "0", rows[0][13]) // ESTIMATED_TOTAL_ROWS - // UpdateAnalyzeJob requires the interval between two updates to mysql.analyze_jobs is more than 5 second. + // UpdateAnalyzeJobProgress requires the interval between two updates to mysql.analyze_jobs is more than 5 second. // Hence we fake last dump time as 10 second ago in order to make update to mysql.analyze_jobs happen. lastDumpTime := time.Now().Add(-10 * time.Second) job.Progress.SetLastDumpTime(lastDumpTime) const smallCount int64 = 100 - executor.UpdateAnalyzeJob(se, job, smallCount) + statsHandle.UpdateAnalyzeJobProgress(job, smallCount) // Delta count doesn't reach threshold so we don't dump it to mysql.analyze_jobs require.Equal(t, smallCount, job.Progress.GetDeltaCount()) require.Equal(t, lastDumpTime, job.Progress.GetLastDumpTime()) @@ -2092,7 +2092,7 @@ func TestAnalyzeJob(t *testing.T) { require.Equal(t, "0", rows[0][4]) const largeCount int64 = 15000000 - executor.UpdateAnalyzeJob(se, job, largeCount) + statsHandle.UpdateAnalyzeJobProgress(job, largeCount) // Delta count reaches threshold so we dump it to mysql.analyze_jobs and update last dump time. require.Equal(t, int64(0), job.Progress.GetDeltaCount()) require.True(t, job.Progress.GetLastDumpTime().After(lastDumpTime)) @@ -2100,7 +2100,7 @@ func TestAnalyzeJob(t *testing.T) { rows = tk.MustQuery("show analyze status").Rows() require.Equal(t, strconv.FormatInt(smallCount+largeCount, 10), rows[0][4]) - executor.UpdateAnalyzeJob(se, job, largeCount) + statsHandle.UpdateAnalyzeJobProgress(job, largeCount) // We have just updated mysql.analyze_jobs in the previous step so we don't update it until 5 second passes or the analyze job is over. require.Equal(t, largeCount, job.Progress.GetDeltaCount()) require.Equal(t, lastDumpTime, job.Progress.GetLastDumpTime()) @@ -2111,7 +2111,7 @@ func TestAnalyzeJob(t *testing.T) { if result == statistics.AnalyzeFailed { analyzeErr = errors.Errorf("analyze meets error") } - executor.FinishAnalyzeJob(se, job, analyzeErr) + statsHandle.FinishAnalyzeJob(job, analyzeErr, statistics.TableAnalysisJob) rows = tk.MustQuery("show analyze status").Rows() require.Equal(t, strconv.FormatInt(smallCount+2*largeCount, 10), rows[0][4]) checkTime(rows[0][6]) @@ -2806,11 +2806,9 @@ func TestAnalyzeColumnsSkipMVIndexJsonCol(t *testing.T) { // TestAnalyzeMVIndex tests analyzing the mv index use some real data in the table. // It checks the analyze jobs, async loading and the stats content in the memory. func TestAnalyzeMVIndex(t *testing.T) { - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/DebugAnalyzeJobOperations", "return(true)")) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/DebugAnalyzeJobOperations", "return(true)")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/DebugAnalyzeJobOperations", "return(true)")) defer func() { - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/DebugAnalyzeJobOperations")) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/DebugAnalyzeJobOperations")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/DebugAnalyzeJobOperations")) }() // 1. prepare the table and insert data store, dom := testkit.CreateMockStoreAndDomain(t) diff --git a/pkg/statistics/analyze_jobs.go b/pkg/statistics/analyze_jobs.go index 7207fb62830e4..5802e8dccf926 100644 --- a/pkg/statistics/analyze_jobs.go +++ b/pkg/statistics/analyze_jobs.go @@ -31,6 +31,16 @@ const ( AnalyzeFailed = "failed" ) +// JobType is the type of the analyze job. +type JobType int + +const ( + // TableAnalysisJob means the job is to analyze a table or partition. + TableAnalysisJob JobType = iota + 1 + // GlobalStatsMergeJob means the job is to merge the global-level stats. + GlobalStatsMergeJob +) + const ( // maxDelta is the threshold of delta count. If the delta count reaches this threshold, it will be dumped into // mysql.analyze_jobs. diff --git a/pkg/statistics/handle/autoanalyze/autoanalyze.go b/pkg/statistics/handle/autoanalyze/autoanalyze.go index 7b8e6603005bf..7ff39511b7865 100644 --- a/pkg/statistics/handle/autoanalyze/autoanalyze.go +++ b/pkg/statistics/handle/autoanalyze/autoanalyze.go @@ -70,6 +70,36 @@ func (sa *statsAnalyze) InsertAnalyzeJob(job *statistics.AnalyzeJob, instance st }) } +func (sa *statsAnalyze) StartAnalyzeJob(job *statistics.AnalyzeJob) { + err := statsutil.CallWithSCtx(sa.statsHandle.SPool(), func(sctx sessionctx.Context) error { + startAnalyzeJob(sctx, job) + return nil + }) + if err != nil { + statslogutil.StatsLogger().Warn("failed to start analyze job", zap.Error(err)) + } +} + +func (sa *statsAnalyze) UpdateAnalyzeJobProgress(job *statistics.AnalyzeJob, rowCount int64) { + err := statsutil.CallWithSCtx(sa.statsHandle.SPool(), func(sctx sessionctx.Context) error { + updateAnalyzeJobProgress(sctx, job, rowCount) + return nil + }) + if err != nil { + statslogutil.StatsLogger().Warn("failed to update analyze job progress", zap.Error(err)) + } +} + +func (sa *statsAnalyze) FinishAnalyzeJob(job *statistics.AnalyzeJob, failReason error, analyzeType statistics.JobType) { + err := statsutil.CallWithSCtx(sa.statsHandle.SPool(), func(sctx sessionctx.Context) error { + finishAnalyzeJob(sctx, job, failReason, analyzeType) + return nil + }) + if err != nil { + statslogutil.StatsLogger().Warn("failed to finish analyze job", zap.Error(err)) + } +} + // DeleteAnalyzeJobs deletes the analyze jobs whose update time is earlier than updateTime. func (sa *statsAnalyze) DeleteAnalyzeJobs(updateTime time.Time) error { return statsutil.CallWithSCtx(sa.statsHandle.SPool(), func(sctx sessionctx.Context) error { @@ -703,3 +733,111 @@ func insertAnalyzeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob, insta }) return nil } + +// startAnalyzeJob marks the state of the analyze job as running and sets the start time. +func startAnalyzeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob) { + if job == nil || job.ID == nil { + return + } + job.StartTime = time.Now() + job.Progress.SetLastDumpTime(job.StartTime) + const sql = "UPDATE mysql.analyze_jobs SET start_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %? WHERE id = %?" + _, _, err := statsutil.ExecRows(sctx, sql, job.StartTime.UTC().Format(types.TimeFormat), statistics.AnalyzeRunning, *job.ID) + if err != nil { + statslogutil.StatsLogger().Warn("failed to update analyze job", zap.String("update", fmt.Sprintf("%s->%s", statistics.AnalyzePending, statistics.AnalyzeRunning)), zap.Error(err)) + } + failpoint.Inject("DebugAnalyzeJobOperations", func(val failpoint.Value) { + if val.(bool) { + logutil.BgLogger().Info("StartAnalyzeJob", + zap.Time("start_time", job.StartTime), + zap.Uint64("job id", *job.ID), + ) + } + }) +} + +// updateAnalyzeJobProgress updates count of the processed rows when increment reaches a threshold. +func updateAnalyzeJobProgress(sctx sessionctx.Context, job *statistics.AnalyzeJob, rowCount int64) { + if job == nil || job.ID == nil { + return + } + delta := job.Progress.Update(rowCount) + if delta == 0 { + return + } + const sql = "UPDATE mysql.analyze_jobs SET processed_rows = processed_rows + %? WHERE id = %?" + _, _, err := statsutil.ExecRows(sctx, sql, delta, *job.ID) + if err != nil { + statslogutil.StatsLogger().Warn("failed to update analyze job", zap.String("update", fmt.Sprintf("process %v rows", delta)), zap.Error(err)) + } + failpoint.Inject("DebugAnalyzeJobOperations", func(val failpoint.Value) { + if val.(bool) { + logutil.BgLogger().Info("UpdateAnalyzeJobProgress", + zap.Int64("increase processed_rows", delta), + zap.Uint64("job id", *job.ID), + ) + } + }) +} + +// finishAnalyzeJob finishes an analyze or merge job +func finishAnalyzeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob, analyzeErr error, analyzeType statistics.JobType) { + if job == nil || job.ID == nil { + return + } + + job.EndTime = time.Now() + var sql string + var args []any + + // process_id is used to see which process is running the analyze job and kill the analyze job. After the analyze job + // is finished(or failed), process_id is useless and we set it to NULL to avoid `kill tidb process_id` wrongly. + if analyzeErr != nil { + failReason := analyzeErr.Error() + const textMaxLength = 65535 + if len(failReason) > textMaxLength { + failReason = failReason[:textMaxLength] + } + + if analyzeType == statistics.TableAnalysisJob { + sql = "UPDATE mysql.analyze_jobs SET processed_rows = processed_rows + %?, end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, fail_reason = %?, process_id = NULL WHERE id = %?" + args = []any{job.Progress.GetDeltaCount(), job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFailed, failReason, *job.ID} + } else { + sql = "UPDATE mysql.analyze_jobs SET end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, fail_reason = %?, process_id = NULL WHERE id = %?" + args = []any{job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFailed, failReason, *job.ID} + } + } else { + if analyzeType == statistics.TableAnalysisJob { + sql = "UPDATE mysql.analyze_jobs SET processed_rows = processed_rows + %?, end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, process_id = NULL WHERE id = %?" + args = []any{job.Progress.GetDeltaCount(), job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFinished, *job.ID} + } else { + sql = "UPDATE mysql.analyze_jobs SET end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, process_id = NULL WHERE id = %?" + args = []any{job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFinished, *job.ID} + } + } + + _, _, err := statsutil.ExecRows(sctx, sql, args...) + if err != nil { + state := statistics.AnalyzeFinished + if analyzeErr != nil { + state = statistics.AnalyzeFailed + } + logutil.BgLogger().Warn("failed to update analyze job", zap.String("update", fmt.Sprintf("%s->%s", statistics.AnalyzeRunning, state)), zap.Error(err)) + } + + failpoint.Inject("DebugAnalyzeJobOperations", func(val failpoint.Value) { + if val.(bool) { + logger := logutil.BgLogger().With( + zap.Time("end_time", job.EndTime), + zap.Uint64("job id", *job.ID), + ) + if analyzeType == statistics.TableAnalysisJob { + logger = logger.With(zap.Int64("increase processed_rows", job.Progress.GetDeltaCount())) + } + if analyzeErr != nil { + logger = logger.With(zap.Error(analyzeErr)) + } + logger.Info("FinishAnalyzeJob") + } + }) +} diff --git a/pkg/statistics/handle/types/interfaces.go b/pkg/statistics/handle/types/interfaces.go index 5fbb9b66bd5fc..20c724b25a578 100644 --- a/pkg/statistics/handle/types/interfaces.go +++ b/pkg/statistics/handle/types/interfaces.go @@ -121,6 +121,21 @@ type StatsAnalyze interface { // InsertAnalyzeJob inserts analyze job into mysql.analyze_jobs and gets job ID for further updating job. InsertAnalyzeJob(job *statistics.AnalyzeJob, instance string, procID uint64) error + // StartAnalyzeJob updates the job status to `running` and sets the start time. + // There is no guarantee that the job record will actually be updated. If the job fails to start, an error will be logged. + // It is OK because this won't affect the analysis job's success. + StartAnalyzeJob(job *statistics.AnalyzeJob) + + // UpdateAnalyzeJobProgress updates the current progress of the analyze job. + // There is no guarantee that the job record will actually be updated. If the job fails to update, an error will be logged. + // It is OK because this won't affect the analysis job's success. + UpdateAnalyzeJobProgress(job *statistics.AnalyzeJob, rowCount int64) + + // FinishAnalyzeJob updates the job status to `finished`, sets the end time, and updates the job info. + // There is no guarantee that the job record will actually be updated. If the job fails to finish, an error will be logged. + // It is OK because this won't affect the analysis job's success. + FinishAnalyzeJob(job *statistics.AnalyzeJob, failReason error, analyzeType statistics.JobType) + // DeleteAnalyzeJobs deletes the analyze jobs whose update time is earlier than updateTime. DeleteAnalyzeJobs(updateTime time.Time) error