Skip to content

Commit

Permalink
address the comment
Browse files Browse the repository at this point in the history
Signed-off-by: yisaer <[email protected]>
  • Loading branch information
Yisaer committed Oct 25, 2022
1 parent c65cc87 commit 9c6e08d
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 6 deletions.
8 changes: 4 additions & 4 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1580,17 +1580,17 @@ func (do *Domain) SetStatsUpdating(val bool) {
}
}

// AvailableAnalyzeExec returned extra exec for Analyze
func (do *Domain) AvailableAnalyzeExec(sctxs []sessionctx.Context) {
// ReleaseAnalyzeExec returned extra exec for Analyze
func (do *Domain) ReleaseAnalyzeExec(sctxs []sessionctx.Context) {
do.analyzeMu.Lock()
defer do.analyzeMu.Unlock()
for _, ctx := range sctxs {
do.analyzeMu.sctxs[ctx] = false
}
}

// DemandAnalyzeExec get needed exec for analyze
func (do *Domain) DemandAnalyzeExec(need int) []sessionctx.Context {
// FetchAnalyzeExec get needed exec for analyze
func (do *Domain) FetchAnalyzeExec(need int) []sessionctx.Context {
if need < 1 {
return nil
}
Expand Down
6 changes: 4 additions & 2 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,13 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n
// If there is no extra session we can use, we will save analyze results in single-thread.
if partitionStatsConcurrency > 1 {
dom := domain.GetDomain(e.ctx)
subSctxs := dom.DemandAnalyzeExec(partitionStatsConcurrency)
subSctxs := dom.FetchAnalyzeExec(partitionStatsConcurrency)
if len(subSctxs) > 0 {
defer func() {
dom.ReleaseAnalyzeExec(subSctxs)
}()
internalCtx := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
err := e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh)
dom.AvailableAnalyzeExec(subSctxs)
return err
}
}
Expand Down
6 changes: 6 additions & 0 deletions executor/analyze_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ func newAnalyzeSaveStatsWorker(
}

func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot bool) {
defer func() {
if r := recover(); r != nil {
logutil.BgLogger().Error("analyze save stats worker panicked", zap.Any("recover", r), zap.Stack("stack"))
worker.errCh <- getAnalyzePanicErr(r)
}
}()
for results := range worker.resultsCh {
err := handle.SaveTableStatsToStorage(worker.sctx, results, analyzeSnapshot)
if err != nil {
Expand Down

0 comments on commit 9c6e08d

Please sign in to comment.