From ec2731b8f53993987b756ecda000789a364c5064 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Tue, 10 Oct 2023 19:52:55 +0800 Subject: [PATCH] analyze: merge task and send task concurrently (#47379) ref pingcap/tidb#47275 --- executor/analyze_col_v2.go | 76 +++++++++++++++++++++++++------------- 1 file changed, 50 insertions(+), 26 deletions(-) diff --git a/executor/analyze_col_v2.go b/executor/analyze_col_v2.go index 6a9773640ed6d..ed08d5f718e6e 100644 --- a/executor/analyze_col_v2.go +++ b/executor/analyze_col_v2.go @@ -16,6 +16,7 @@ package executor import ( "context" + stderrors "errors" "math" "sort" "sync/atomic" @@ -46,6 +47,7 @@ import ( "github.com/pingcap/tidb/util/timeutil" "github.com/pingcap/tipb/go-tipb" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) // AnalyzeColumnsExecV2 is used to maintain v2 analyze process @@ -270,42 +272,64 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( // Start workers to merge the result from collectors. mergeResultCh := make(chan *samplingMergeResult, statsConcurrency) mergeTaskCh := make(chan []byte, statsConcurrency) + var taskEg errgroup.Group + // Start read data from resultHandler and send them to mergeTaskCh. + taskEg.Go(func() (err error) { + defer func() { + if r := recover(); r != nil { + err = getAnalyzePanicErr(r) + } + }() + return readDataAndSendTask(e.ctx, e.resultHandler, mergeTaskCh, e.memTracker) + }) e.samplingMergeWg = &util.WaitGroupWrapper{} e.samplingMergeWg.Add(statsConcurrency) for i := 0; i < statsConcurrency; i++ { go e.subMergeWorker(mergeResultCh, mergeTaskCh, l, i) } - - // Start read data from resultHandler and send them to mergeTaskCh. - if err = readDataAndSendTask(e.ctx, e.resultHandler, mergeTaskCh, e.memTracker); err != nil { - return 0, nil, nil, nil, nil, getAnalyzePanicErr(err) - } - // Merge the result from collectors. mergeWorkerPanicCnt := 0 - for mergeWorkerPanicCnt < statsConcurrency { - mergeResult, ok := <-mergeResultCh - if !ok { - break - } - if mergeResult.err != nil { - err = mergeResult.err - if isAnalyzeWorkerPanic(mergeResult.err) { - mergeWorkerPanicCnt++ + mergeEg, mergeCtx := errgroup.WithContext(context.Background()) + mergeEg.Go(func() (err error) { + defer func() { + if r := recover(); r != nil { + err = getAnalyzePanicErr(r) } - continue + }() + for mergeWorkerPanicCnt < statsConcurrency { + mergeResult, ok := <-mergeResultCh + if !ok { + break + } + if mergeResult.err != nil { + err = mergeResult.err + if isAnalyzeWorkerPanic(mergeResult.err) { + mergeWorkerPanicCnt++ + } + continue + } + oldRootCollectorSize := rootRowCollector.Base().MemSize + oldRootCollectorCount := rootRowCollector.Base().Count + // Merge the result from sub-collectors. + rootRowCollector.MergeCollector(mergeResult.collector) + newRootCollectorCount := rootRowCollector.Base().Count + printAnalyzeMergeCollectorLog(oldRootCollectorCount, newRootCollectorCount, + mergeResult.collector.Base().Count, e.tableID.TableID, e.tableID.PartitionID, e.tableID.IsPartitionTable(), + "merge subMergeWorker in AnalyzeColumnsExecV2", -1) + e.memTracker.Consume(rootRowCollector.Base().MemSize - oldRootCollectorSize - mergeResult.collector.Base().MemSize) + mergeResult.collector.DestroyAndPutToPool() } - oldRootCollectorSize := rootRowCollector.Base().MemSize - oldRootCollectorCount := rootRowCollector.Base().Count - // Merge the result from sub-collectors. - rootRowCollector.MergeCollector(mergeResult.collector) - newRootCollectorCount := rootRowCollector.Base().Count - printAnalyzeMergeCollectorLog(oldRootCollectorCount, newRootCollectorCount, - mergeResult.collector.Base().Count, e.tableID.TableID, e.tableID.PartitionID, e.tableID.IsPartitionTable(), - "merge subMergeWorker in AnalyzeColumnsExecV2", -1) - e.memTracker.Consume(rootRowCollector.Base().MemSize - oldRootCollectorSize - mergeResult.collector.Base().MemSize) - mergeResult.collector.DestroyAndPutToPool() + return err + }) + err = taskEg.Wait() + if err != nil { + mergeCtx.Done() + if err1 := mergeEg.Wait(); err1 != nil { + err = stderrors.Join(err, err1) + } + return 0, nil, nil, nil, nil, getAnalyzePanicErr(err) } + err = mergeEg.Wait() defer e.memTracker.Release(rootRowCollector.Base().MemSize) if err != nil { return 0, nil, nil, nil, nil, err