From 7bd9f8f4302a311d7d29cd92be7a5100af1d1286 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BA=8C=E6=89=8B=E6=8E=89=E5=8C=85=E5=B7=A5=E7=A8=8B?= =?UTF-8?q?=E5=B8=88?= Date: Wed, 31 Jul 2024 16:52:21 +0800 Subject: [PATCH] statistics: do not copy and paste the code for saving statistics (#55046) ref pingcap/tidb#55043 --- pkg/executor/analyze.go | 78 ++++++++++++---------------------- pkg/executor/analyze_worker.go | 2 + 2 files changed, 28 insertions(+), 52 deletions(-) diff --git a/pkg/executor/analyze.go b/pkg/executor/analyze.go index 1db92c94a7c0ad..96f29d26c53d08 100644 --- a/pkg/executor/analyze.go +++ b/pkg/executor/analyze.go @@ -393,70 +393,44 @@ func (e *AnalyzeExec) handleResultsError( partitionStatsConcurrency = min(taskNum, partitionStatsConcurrency) // If partitionStatsConcurrency > 1, we will try to demand extra session from Domain to save Analyze results in concurrency. // If there is no extra session we can use, we will save analyze results in single-thread. + dom := domain.GetDomain(e.Ctx()) + internalCtx := kv.WithInternalSourceType(ctx, kv.InternalTxnStats) if partitionStatsConcurrency > 1 { - dom := domain.GetDomain(e.Ctx()) + // FIXME: Since we don't use it either to save analysis results or to store job history, it has no effect. Please remove this :( subSctxs := dom.FetchAnalyzeExec(partitionStatsConcurrency) + warningMessage := "Insufficient sessions to save analyze results. Consider increasing the 'analyze-partition-concurrency-quota' configuration to improve analyze performance. " + + "This value should typically be greater than or equal to the 'tidb_analyze_partition_concurrency' variable." + if len(subSctxs) < partitionStatsConcurrency { + e.Ctx().GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackError(warningMessage)) + logutil.BgLogger().Warn( + warningMessage, + zap.Int("sessionCount", len(subSctxs)), + zap.Int("needSessionCount", partitionStatsConcurrency), + ) + } if len(subSctxs) > 0 { + sessionCount := len(subSctxs) + logutil.BgLogger().Info("use multiple sessions to save analyze results", zap.Int("sessionCount", sessionCount)) defer func() { dom.ReleaseAnalyzeExec(subSctxs) }() - internalCtx := kv.WithInternalSourceType(ctx, kv.InternalTxnStats) - err := e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh) - return err + return e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh) } } + logutil.BgLogger().Info("use single session to save analyze results") failpoint.Inject("handleResultsErrorSingleThreadPanic", nil) - tableIDs := map[int64]struct{}{} - - // save analyze results in single-thread. - statsHandle := domain.GetDomain(e.Ctx()).StatsHandle() - panicCnt := 0 - for panicCnt < concurrency { - results, ok := <-resultsCh - if !ok { - break - } - if results.Err != nil { - err = results.Err - if isAnalyzeWorkerPanic(err) { - panicCnt++ - } else { - logutil.Logger(ctx).Error("analyze failed", zap.Error(err)) - } - finishJobWithLog(e.Ctx(), results.Job, err) - continue - } - handleGlobalStats(needGlobalStats, globalStatsMap, results) - tableIDs[results.TableID.GetStatisticsID()] = struct{}{} - - if err1 := statsHandle.SaveTableStatsToStorage(results, e.Ctx().GetSessionVars().EnableAnalyzeSnapshot, handleutil.StatsMetaHistorySourceAnalyze); err1 != nil { - tableID := results.TableID.TableID - err = err1 - logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err), zap.Int64("tableID", tableID)) - finishJobWithLog(e.Ctx(), results.Job, err) - } else { - finishJobWithLog(e.Ctx(), results.Job, nil) - } - if err := e.Ctx().GetSessionVars().SQLKiller.HandleSignal(); err != nil { - finishJobWithLog(e.Ctx(), results.Job, err) - results.DestroyAndPutToPool() - return err - } - results.DestroyAndPutToPool() - } - // Dump stats to historical storage. - for tableID := range tableIDs { - if err := recordHistoricalStats(e.Ctx(), tableID); err != nil { - logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) - } - } - - return err + subSctxs := []sessionctx.Context{e.Ctx()} + return e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh) } -func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, statsConcurrency int, needGlobalStats bool, +func (e *AnalyzeExec) handleResultsErrorWithConcurrency( + ctx context.Context, + statsConcurrency int, + needGlobalStats bool, subSctxs []sessionctx.Context, - globalStatsMap globalStatsMap, resultsCh <-chan *statistics.AnalyzeResults) error { + globalStatsMap globalStatsMap, + resultsCh <-chan *statistics.AnalyzeResults, +) error { partitionStatsConcurrency := len(subSctxs) wg := util.NewWaitGroupPool(e.gp) diff --git a/pkg/executor/analyze_worker.go b/pkg/executor/analyze_worker.go index d92df2a1da2668..b2430f6260f24a 100644 --- a/pkg/executor/analyze_worker.go +++ b/pkg/executor/analyze_worker.go @@ -56,6 +56,8 @@ func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot b }() for results := range worker.resultsCh { if err := worker.killer.HandleSignal(); err != nil { + finishJobWithLog(worker.sctx, results.Job, err) + results.DestroyAndPutToPool() worker.errCh <- err return }