From 9c6e08dad026f44b607f54ba111f8f57f1ef785e Mon Sep 17 00:00:00 2001 From: yisaer Date: Tue, 25 Oct 2022 11:10:24 +0800 Subject: [PATCH] address the comment Signed-off-by: yisaer --- domain/domain.go | 8 ++++---- executor/analyze.go | 6 ++++-- executor/analyze_worker.go | 6 ++++++ 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index c8ecf2793b075..60b8d316a1949 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1580,8 +1580,8 @@ func (do *Domain) SetStatsUpdating(val bool) { } } -// AvailableAnalyzeExec returned extra exec for Analyze -func (do *Domain) AvailableAnalyzeExec(sctxs []sessionctx.Context) { +// ReleaseAnalyzeExec returned extra exec for Analyze +func (do *Domain) ReleaseAnalyzeExec(sctxs []sessionctx.Context) { do.analyzeMu.Lock() defer do.analyzeMu.Unlock() for _, ctx := range sctxs { @@ -1589,8 +1589,8 @@ func (do *Domain) AvailableAnalyzeExec(sctxs []sessionctx.Context) { } } -// DemandAnalyzeExec get needed exec for analyze -func (do *Domain) DemandAnalyzeExec(need int) []sessionctx.Context { +// FetchAnalyzeExec get needed exec for analyze +func (do *Domain) FetchAnalyzeExec(need int) []sessionctx.Context { if need < 1 { return nil } diff --git a/executor/analyze.go b/executor/analyze.go index ff14ba78fc220..66334ad05d647 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -222,11 +222,13 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n // If there is no extra session we can use, we will save analyze results in single-thread. if partitionStatsConcurrency > 1 { dom := domain.GetDomain(e.ctx) - subSctxs := dom.DemandAnalyzeExec(partitionStatsConcurrency) + subSctxs := dom.FetchAnalyzeExec(partitionStatsConcurrency) if len(subSctxs) > 0 { + defer func() { + dom.ReleaseAnalyzeExec(subSctxs) + }() internalCtx := kv.WithInternalSourceType(ctx, kv.InternalTxnStats) err := e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh) - dom.AvailableAnalyzeExec(subSctxs) return err } } diff --git a/executor/analyze_worker.go b/executor/analyze_worker.go index bb9a8e573b6b5..ae30b773e0d1e 100644 --- a/executor/analyze_worker.go +++ b/executor/analyze_worker.go @@ -44,6 +44,12 @@ func newAnalyzeSaveStatsWorker( } func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot bool) { + defer func() { + if r := recover(); r != nil { + logutil.BgLogger().Error("analyze save stats worker panicked", zap.Any("recover", r), zap.Stack("stack")) + worker.errCh <- getAnalyzePanicErr(r) + } + }() for results := range worker.resultsCh { err := handle.SaveTableStatsToStorage(worker.sctx, results, analyzeSnapshot) if err != nil {