Skip to content

Commit

Permalink
analyze: merge task and send task concurrently (#47379)
Browse files Browse the repository at this point in the history
ref #47275
  • Loading branch information
hawkingrei authored Oct 10, 2023
1 parent 97c7741 commit ec2731b
Showing 1 changed file with 50 additions and 26 deletions.
76 changes: 50 additions & 26 deletions executor/analyze_col_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor

import (
"context"
stderrors "errors"
"math"
"sort"
"sync/atomic"
Expand Down Expand Up @@ -46,6 +47,7 @@ import (
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

// AnalyzeColumnsExecV2 is used to maintain v2 analyze process
Expand Down Expand Up @@ -270,42 +272,64 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
// Start workers to merge the result from collectors.
mergeResultCh := make(chan *samplingMergeResult, statsConcurrency)
mergeTaskCh := make(chan []byte, statsConcurrency)
var taskEg errgroup.Group
// Start read data from resultHandler and send them to mergeTaskCh.
taskEg.Go(func() (err error) {
defer func() {
if r := recover(); r != nil {
err = getAnalyzePanicErr(r)
}
}()
return readDataAndSendTask(e.ctx, e.resultHandler, mergeTaskCh, e.memTracker)
})
e.samplingMergeWg = &util.WaitGroupWrapper{}
e.samplingMergeWg.Add(statsConcurrency)
for i := 0; i < statsConcurrency; i++ {
go e.subMergeWorker(mergeResultCh, mergeTaskCh, l, i)
}

// Start read data from resultHandler and send them to mergeTaskCh.
if err = readDataAndSendTask(e.ctx, e.resultHandler, mergeTaskCh, e.memTracker); err != nil {
return 0, nil, nil, nil, nil, getAnalyzePanicErr(err)
}

// Merge the result from collectors.
mergeWorkerPanicCnt := 0
for mergeWorkerPanicCnt < statsConcurrency {
mergeResult, ok := <-mergeResultCh
if !ok {
break
}
if mergeResult.err != nil {
err = mergeResult.err
if isAnalyzeWorkerPanic(mergeResult.err) {
mergeWorkerPanicCnt++
mergeEg, mergeCtx := errgroup.WithContext(context.Background())
mergeEg.Go(func() (err error) {
defer func() {
if r := recover(); r != nil {
err = getAnalyzePanicErr(r)
}
continue
}()
for mergeWorkerPanicCnt < statsConcurrency {
mergeResult, ok := <-mergeResultCh
if !ok {
break
}
if mergeResult.err != nil {
err = mergeResult.err
if isAnalyzeWorkerPanic(mergeResult.err) {
mergeWorkerPanicCnt++
}
continue
}
oldRootCollectorSize := rootRowCollector.Base().MemSize
oldRootCollectorCount := rootRowCollector.Base().Count
// Merge the result from sub-collectors.
rootRowCollector.MergeCollector(mergeResult.collector)
newRootCollectorCount := rootRowCollector.Base().Count
printAnalyzeMergeCollectorLog(oldRootCollectorCount, newRootCollectorCount,
mergeResult.collector.Base().Count, e.tableID.TableID, e.tableID.PartitionID, e.tableID.IsPartitionTable(),
"merge subMergeWorker in AnalyzeColumnsExecV2", -1)
e.memTracker.Consume(rootRowCollector.Base().MemSize - oldRootCollectorSize - mergeResult.collector.Base().MemSize)
mergeResult.collector.DestroyAndPutToPool()
}
oldRootCollectorSize := rootRowCollector.Base().MemSize
oldRootCollectorCount := rootRowCollector.Base().Count
// Merge the result from sub-collectors.
rootRowCollector.MergeCollector(mergeResult.collector)
newRootCollectorCount := rootRowCollector.Base().Count
printAnalyzeMergeCollectorLog(oldRootCollectorCount, newRootCollectorCount,
mergeResult.collector.Base().Count, e.tableID.TableID, e.tableID.PartitionID, e.tableID.IsPartitionTable(),
"merge subMergeWorker in AnalyzeColumnsExecV2", -1)
e.memTracker.Consume(rootRowCollector.Base().MemSize - oldRootCollectorSize - mergeResult.collector.Base().MemSize)
mergeResult.collector.DestroyAndPutToPool()
return err
})
err = taskEg.Wait()
if err != nil {
mergeCtx.Done()
if err1 := mergeEg.Wait(); err1 != nil {
err = stderrors.Join(err, err1)
}
return 0, nil, nil, nil, nil, getAnalyzePanicErr(err)
}
err = mergeEg.Wait()
defer e.memTracker.Release(rootRowCollector.Base().MemSize)
if err != nil {
return 0, nil, nil, nil, nil, err
Expand Down

0 comments on commit ec2731b

Please sign in to comment.