Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: move history-related functions into the stats handle #55163

Merged
merged 12 commits into from
Aug 7, 2024
157 changes: 8 additions & 149 deletions pkg/executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"net"
"strconv"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand All @@ -41,12 +40,10 @@ import (
"github.com/pingcap/tidb/pkg/statistics/handle"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
handleutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/sqlescape"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/pingcap/tipb/go-tipb"
"github.com/tiancaiamao/gp"
"go.uber.org/zap"
Expand Down Expand Up @@ -165,7 +162,7 @@ TASKLOOP:
})
// If we enabled dynamic prune mode, then we need to generate global stats here for partition tables.
if needGlobalStats {
err = e.handleGlobalStats(globalStatsMap)
err = e.handleGlobalStats(statsHandle, globalStatsMap)
if err != nil {
return err
}
Expand Down Expand Up @@ -432,15 +429,15 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(
resultsCh <-chan *statistics.AnalyzeResults,
) error {
partitionStatsConcurrency := len(subSctxs)

statsHandle := domain.GetDomain(e.Ctx()).StatsHandle()
wg := util.NewWaitGroupPool(e.gp)
saveResultsCh := make(chan *statistics.AnalyzeResults, partitionStatsConcurrency)
errCh := make(chan error, partitionStatsConcurrency)
for i := 0; i < partitionStatsConcurrency; i++ {
worker := newAnalyzeSaveStatsWorker(saveResultsCh, subSctxs[i], errCh, &e.Ctx().GetSessionVars().SQLKiller)
ctx1 := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
wg.Run(func() {
worker.run(ctx1, e.Ctx().GetSessionVars().EnableAnalyzeSnapshot)
worker.run(ctx1, statsHandle, e.Ctx().GetSessionVars().EnableAnalyzeSnapshot)
})
}
tableIDs := map[int64]struct{}{}
Expand All @@ -462,7 +459,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(
} else {
logutil.Logger(ctx).Error("analyze failed", zap.Error(err))
}
finishJobWithLog(e.Ctx(), results.Job, err)
finishJobWithLog(statsHandle, results.Job, err)
continue
}
handleGlobalStats(needGlobalStats, globalStatsMap, results)
Expand Down Expand Up @@ -490,6 +487,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(

func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<- *statistics.AnalyzeResults) {
var task *analyzeTask
statsHandle := domain.GetDomain(e.Ctx()).StatsHandle()
defer func() {
if r := recover(); r != nil {
logutil.BgLogger().Error("analyze worker panicked", zap.Any("recover", r), zap.Stack("stack"))
Expand All @@ -513,7 +511,7 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<-
break
}
failpoint.Inject("handleAnalyzeWorkerPanic", nil)
StartAnalyzeJob(e.Ctx(), task.job)
statsHandle.StartAnalyzeJob(task.job)
switch task.taskType {
case colTask:
select {
Expand Down Expand Up @@ -568,147 +566,8 @@ func AddNewAnalyzeJob(ctx sessionctx.Context, job *statistics.AnalyzeJob) {
}
}

// StartAnalyzeJob marks the state of the analyze job as running and sets the start time.
func StartAnalyzeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob) {
if job == nil || job.ID == nil {
return
}
job.StartTime = time.Now()
job.Progress.SetLastDumpTime(job.StartTime)
exec := sctx.GetRestrictedSQLExecutor()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
const sql = "UPDATE mysql.analyze_jobs SET start_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %? WHERE id = %?"
_, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseSessionPool}, sql, job.StartTime.UTC().Format(types.TimeFormat), statistics.AnalyzeRunning, *job.ID)
if err != nil {
logutil.BgLogger().Warn("failed to update analyze job", zap.String("update", fmt.Sprintf("%s->%s", statistics.AnalyzePending, statistics.AnalyzeRunning)), zap.Error(err))
}
failpoint.Inject("DebugAnalyzeJobOperations", func(val failpoint.Value) {
if val.(bool) {
logutil.BgLogger().Info("StartAnalyzeJob",
zap.Time("start_time", job.StartTime),
zap.Uint64("job id", *job.ID),
)
}
})
}

// UpdateAnalyzeJob updates count of the processed rows when increment reaches a threshold.
func UpdateAnalyzeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob, rowCount int64) {
if job == nil || job.ID == nil {
return
}
delta := job.Progress.Update(rowCount)
if delta == 0 {
return
}
exec := sctx.GetRestrictedSQLExecutor()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
const sql = "UPDATE mysql.analyze_jobs SET processed_rows = processed_rows + %? WHERE id = %?"
_, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseSessionPool}, sql, delta, *job.ID)
if err != nil {
logutil.BgLogger().Warn("failed to update analyze job", zap.String("update", fmt.Sprintf("process %v rows", delta)), zap.Error(err))
}
failpoint.Inject("DebugAnalyzeJobOperations", func(val failpoint.Value) {
if val.(bool) {
logutil.BgLogger().Info("UpdateAnalyzeJob",
zap.Int64("increase processed_rows", delta),
zap.Uint64("job id", *job.ID),
)
}
})
}

// FinishAnalyzeMergeJob finishes analyze merge job
func FinishAnalyzeMergeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob, analyzeErr error) {
if job == nil || job.ID == nil {
return
}

job.EndTime = time.Now()
var sql string
var args []any
if analyzeErr != nil {
failReason := analyzeErr.Error()
const textMaxLength = 65535
if len(failReason) > textMaxLength {
failReason = failReason[:textMaxLength]
}
sql = "UPDATE mysql.analyze_jobs SET end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, fail_reason = %?, process_id = NULL WHERE id = %?"
args = []any{job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFailed, failReason, *job.ID}
} else {
sql = "UPDATE mysql.analyze_jobs SET end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, process_id = NULL WHERE id = %?"
args = []any{job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFinished, *job.ID}
}
exec := sctx.GetRestrictedSQLExecutor()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
_, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseSessionPool}, sql, args...)
if err != nil {
var state string
if analyzeErr != nil {
state = statistics.AnalyzeFailed
} else {
state = statistics.AnalyzeFinished
}
logutil.BgLogger().Warn("failed to update analyze job", zap.String("update", fmt.Sprintf("%s->%s", statistics.AnalyzeRunning, state)), zap.Error(err))
}
failpoint.Inject("DebugAnalyzeJobOperations", func(val failpoint.Value) {
if val.(bool) {
logutil.BgLogger().Info("FinishAnalyzeMergeJob",
zap.Time("end_time", job.EndTime),
zap.Uint64("job id", *job.ID),
)
}
})
}

// FinishAnalyzeJob updates the state of the analyze job to finished/failed according to `meetError` and sets the end time.
func FinishAnalyzeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob, analyzeErr error) {
if job == nil || job.ID == nil {
return
}
job.EndTime = time.Now()
var sql string
var args []any
// process_id is used to see which process is running the analyze job and kill the analyze job. After the analyze job
// is finished(or failed), process_id is useless and we set it to NULL to avoid `kill tidb process_id` wrongly.
if analyzeErr != nil {
failReason := analyzeErr.Error()
const textMaxLength = 65535
if len(failReason) > textMaxLength {
failReason = failReason[:textMaxLength]
}
sql = "UPDATE mysql.analyze_jobs SET processed_rows = processed_rows + %?, end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, fail_reason = %?, process_id = NULL WHERE id = %?"
args = []any{job.Progress.GetDeltaCount(), job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFailed, failReason, *job.ID}
} else {
sql = "UPDATE mysql.analyze_jobs SET processed_rows = processed_rows + %?, end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, process_id = NULL WHERE id = %?"
args = []any{job.Progress.GetDeltaCount(), job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFinished, *job.ID}
}
exec := sctx.GetRestrictedSQLExecutor()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
_, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseSessionPool}, sql, args...)
if err != nil {
var state string
if analyzeErr != nil {
state = statistics.AnalyzeFailed
} else {
state = statistics.AnalyzeFinished
}
logutil.BgLogger().Warn("failed to update analyze job", zap.String("update", fmt.Sprintf("%s->%s", statistics.AnalyzeRunning, state)), zap.Error(err))
}
failpoint.Inject("DebugAnalyzeJobOperations", func(val failpoint.Value) {
if val.(bool) {
logutil.BgLogger().Info("FinishAnalyzeJob",
zap.Int64("increase processed_rows", job.Progress.GetDeltaCount()),
zap.Time("end_time", job.EndTime),
zap.Uint64("job id", *job.ID),
zap.Error(analyzeErr),
)
}
})
}

func finishJobWithLog(sctx sessionctx.Context, job *statistics.AnalyzeJob, analyzeErr error) {
FinishAnalyzeJob(sctx, job, analyzeErr)
func finishJobWithLog(statsHandle *handle.Handle, job *statistics.AnalyzeJob, analyzeErr error) {
statsHandle.FinishAnalyzeJob(job, analyzeErr, statistics.TableAnalysisJob)
if job != nil {
var state string
if analyzeErr != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/executor/analyze_col.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
CMSketch: statistics.NewCMSketch(int32(e.opts[ast.AnalyzeOptCMSketchDepth]), int32(e.opts[ast.AnalyzeOptCMSketchWidth])),
}
}
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
for {
failpoint.Inject("mockKillRunningV1AnalyzeJob", func() {
dom := domain.GetDomain(e.ctx)
Expand Down Expand Up @@ -228,7 +229,7 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
rowCount = respSample.Count + respSample.NullCount
collectors[i].MergeSampleCollector(sc, respSample)
}
UpdateAnalyzeJob(e.ctx, e.job, rowCount)
statsHandle.UpdateAnalyzeJobProgress(e.job, rowCount)
}
timeZone := e.ctx.GetSessionVars().Location()
if hasPkHist(e.handleCols) {
Expand Down
11 changes: 7 additions & 4 deletions pkg/executor/analyze_col_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,20 +473,21 @@ func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.In
results: make(map[int64]*statistics.AnalyzeResults, len(indexInfos)),
}
var err error
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
for panicCnt < statsConcurrncy {
results, ok := <-resultsCh
if !ok {
break
}
if results.Err != nil {
err = results.Err
FinishAnalyzeJob(e.ctx, results.Job, err)
statsHandle.FinishAnalyzeJob(results.Job, err, statistics.TableAnalysisJob)
if isAnalyzeWorkerPanic(err) {
panicCnt++
}
continue
}
FinishAnalyzeJob(e.ctx, results.Job, nil)
statsHandle.FinishAnalyzeJob(results.Job, nil, statistics.TableAnalysisJob)
totalResult.results[results.Ars[0].Hist[0].ID] = results
}
if err != nil {
Expand All @@ -498,6 +499,7 @@ func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.In
// subIndexWorker receive the task for each index and return the result for them.
func (e *AnalyzeColumnsExecV2) subIndexWorkerForNDV(taskCh chan *analyzeTask, resultsCh chan *statistics.AnalyzeResults) {
var task *analyzeTask
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
defer func() {
if r := recover(); r != nil {
logutil.BgLogger().Error("analyze worker panicked", zap.Any("recover", r), zap.Stack("stack"))
Expand All @@ -514,7 +516,7 @@ func (e *AnalyzeColumnsExecV2) subIndexWorkerForNDV(taskCh chan *analyzeTask, re
if !ok {
break
}
StartAnalyzeJob(e.ctx, task.job)
statsHandle.StartAnalyzeJob(task.job)
if task.taskType != idxTask {
resultsCh <- &statistics.AnalyzeResults{
Err: errors.Errorf("incorrect analyze type"),
Expand Down Expand Up @@ -628,6 +630,7 @@ func (e *AnalyzeColumnsExecV2) subMergeWorker(resultCh chan<- *samplingMergeResu
for i := 0; i < l; i++ {
retCollector.Base().FMSketches = append(retCollector.Base().FMSketches, statistics.NewFMSketch(statistics.MaxSketchSize))
}
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
for {
data, ok := <-taskCh
if !ok {
Expand All @@ -649,7 +652,7 @@ func (e *AnalyzeColumnsExecV2) subMergeWorker(resultCh chan<- *samplingMergeResu
// Update processed rows.
subCollector := statistics.NewRowSampleCollector(int(e.analyzePB.ColReq.SampleSize), e.analyzePB.ColReq.GetSampleRate(), l)
subCollector.Base().FromProto(colResp.RowCollector, e.memTracker)
UpdateAnalyzeJob(e.ctx, e.job, subCollector.Base().Count)
statsHandle.UpdateAnalyzeJobProgress(e.job, subCollector.Base().Count)

// Print collect log.
oldRetCollectorSize := retCollector.Base().MemSize
Expand Down
8 changes: 4 additions & 4 deletions pkg/executor/analyze_global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
Expand All @@ -35,13 +36,12 @@ type globalStatsKey struct {
// The meaning of value in map is some additional information needed to build global-level stats.
type globalStatsMap map[globalStatsKey]statstypes.GlobalStatsInfo

func (e *AnalyzeExec) handleGlobalStats(globalStatsMap globalStatsMap) error {
func (e *AnalyzeExec) handleGlobalStats(statsHandle *handle.Handle, globalStatsMap globalStatsMap) error {
globalStatsTableIDs := make(map[int64]struct{}, len(globalStatsMap))
for globalStatsID := range globalStatsMap {
globalStatsTableIDs[globalStatsID.tableID] = struct{}{}
}

statsHandle := domain.GetDomain(e.Ctx()).StatsHandle()
tableIDs := make(map[int64]struct{}, len(globalStatsTableIDs))
for tableID := range globalStatsTableIDs {
tableIDs[tableID] = struct{}{}
Expand All @@ -55,7 +55,7 @@ func (e *AnalyzeExec) handleGlobalStats(globalStatsMap globalStatsMap) error {
continue
}
AddNewAnalyzeJob(e.Ctx(), job)
StartAnalyzeJob(e.Ctx(), job)
statsHandle.StartAnalyzeJob(job)

mergeStatsErr := func() error {
globalOpts := e.opts
Expand All @@ -76,7 +76,7 @@ func (e *AnalyzeExec) handleGlobalStats(globalStatsMap globalStatsMap) error {
}
return err
}()
FinishAnalyzeMergeJob(e.Ctx(), job, mergeStatsErr)
statsHandle.FinishAnalyzeJob(job, mergeStatsErr, statistics.GlobalStatsMergeJob)
}
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/executor/analyze_idx.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,8 @@ func updateIndexResult(
needCMS := cms != nil
respHist := statistics.HistogramFromProto(resp.Hist)
if job != nil {
UpdateAnalyzeJob(ctx, job, int64(respHist.TotalRowCount()))
statsHandle := domain.GetDomain(ctx).StatsHandle()
statsHandle.UpdateAnalyzeJobProgress(job, int64(respHist.TotalRowCount()))
}
hist, err = statistics.MergeHistograms(ctx.GetSessionVars().StmtCtx, hist, respHist, numBuckets, statsVer)
if err != nil {
Expand Down
11 changes: 5 additions & 6 deletions pkg/executor/analyze_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ package executor
import (
"context"

"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/sqlkiller"
Expand Down Expand Up @@ -47,7 +47,7 @@ func newAnalyzeSaveStatsWorker(
return worker
}

func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot bool) {
func (worker *analyzeSaveStatsWorker) run(ctx context.Context, statsHandle *handle.Handle, analyzeSnapshot bool) {
defer func() {
if r := recover(); r != nil {
logutil.BgLogger().Error("analyze save stats worker panicked", zap.Any("recover", r), zap.Stack("stack"))
Expand All @@ -56,19 +56,18 @@ func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot b
}()
for results := range worker.resultsCh {
if err := worker.killer.HandleSignal(); err != nil {
finishJobWithLog(worker.sctx, results.Job, err)
finishJobWithLog(statsHandle, results.Job, err)
results.DestroyAndPutToPool()
worker.errCh <- err
return
}
statsHandle := domain.GetDomain(worker.sctx).StatsHandle()
err := statsHandle.SaveTableStatsToStorage(results, analyzeSnapshot, util.StatsMetaHistorySourceAnalyze)
if err != nil {
logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err))
finishJobWithLog(worker.sctx, results.Job, err)
finishJobWithLog(statsHandle, results.Job, err)
worker.errCh <- err
} else {
finishJobWithLog(worker.sctx, results.Job, nil)
finishJobWithLog(statsHandle, results.Job, nil)
}
results.DestroyAndPutToPool()
if err != nil {
Expand Down
Loading