This is an automated cherry-pick of #47637 (executor: reuse goroutine in the analyze)
Signed-off-by: ti-chi-bot <[email protected]>
hawkingrei authored and ti-chi-bot committed Nov 3, 2023
1 parent aa93091 commit cbbf773
Showing 12 changed files with 65 additions and 17 deletions.
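
The change threads a shared goroutine pool (`github.com/tiancaiamao/gp`) through the ANALYZE executor: `AnalyzeExec` now carries a `*gp.Pool` obtained from the stats handle, and the `util.WaitGroupWrapper` fan-outs become fan-outs over a new `util.WaitGroupPool` (added at the bottom of this diff) that borrows pooled goroutines instead of spawning a fresh one per task. The sketch below is not part of the commit; `(*gp.Pool).Go` appears in the diff itself, while `gp.New(size, idleTimeout)` and the chosen parameters are assumptions about the library's constructor.

```go
package main

import (
	"fmt"
	"time"

	"github.com/tiancaiamao/gp"
)

func main() {
	// One pool is created up front and shared; TiDB keeps it on the stats
	// handle so every ANALYZE run reuses the same recycled goroutines.
	pool := gp.New(8, time.Minute) // size and idle timeout are illustrative

	done := make(chan struct{})
	pool.Go(func() { // the closure runs on a goroutine borrowed from the pool
		fmt.Println("analyze task on a pooled goroutine")
		close(done)
	})
	<-done
}
```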
1 change: 1 addition & 0 deletions pkg/executor/BUILD.bazel
@@ -256,6 +256,7 @@ go_library(
"@com_github_prometheus_client_golang//api/prometheus/v1:prometheus",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_prometheus_common//model",
"@com_github_tiancaiamao_gp//:gp",
"@com_github_tikv_client_go_v2//error",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//oracle",
8 changes: 5 additions & 3 deletions pkg/executor/analyze.go
@@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/pingcap/tipb/go-tipb"
"github.com/tiancaiamao/gp"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
@@ -57,9 +58,10 @@ var _ exec.Executor = &AnalyzeExec{}
type AnalyzeExec struct {
exec.BaseExecutor
tasks []*analyzeTask
- wg util.WaitGroupWrapper
+ wg *util.WaitGroupPool
opts map[ast.AnalyzeOptionType]uint64
OptionsMap map[int64]core.V2AnalyzeOptions
+ gp *gp.Pool
}

var (
@@ -419,7 +421,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta
globalStatsMap globalStatsMap, resultsCh <-chan *statistics.AnalyzeResults) error {
partitionStatsConcurrency := len(subSctxs)

- var wg util.WaitGroupWrapper
+ wg := util.NewWaitGroupPool(e.gp)
saveResultsCh := make(chan *statistics.AnalyzeResults, partitionStatsConcurrency)
errCh := make(chan error, partitionStatsConcurrency)
for i := 0; i < partitionStatsConcurrency; i++ {
@@ -495,7 +497,7 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<-
StartAnalyzeJob(e.Ctx(), task.job)
switch task.taskType {
case colTask:
- resultsCh <- analyzeColumnsPushDownEntry(task.colExec)
+ resultsCh <- analyzeColumnsPushDownEntry(e.gp, task.colExec)
case idxTask:
resultsCh <- analyzeIndexPushdown(task.idxExec)
}
5 changes: 3 additions & 2 deletions pkg/executor/analyze_col.go
@@ -40,6 +40,7 @@ import (
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/ranger"
"github.com/pingcap/tipb/go-tipb"
"github.com/tiancaiamao/gp"
)

// AnalyzeColumnsExec represents Analyze columns push down executor.
@@ -64,9 +65,9 @@ type AnalyzeColumnsExec struct {
memTracker *memory.Tracker
}

- func analyzeColumnsPushDownEntry(e *AnalyzeColumnsExec) *statistics.AnalyzeResults {
+ func analyzeColumnsPushDownEntry(gp *gp.Pool, e *AnalyzeColumnsExec) *statistics.AnalyzeResults {
if e.AnalyzeInfo.StatsVersion >= statistics.Version2 {
- return e.toV2().analyzeColumnsPushDownWithRetryV2()
+ return e.toV2().analyzeColumnsPushDownWithRetryV2(gp)
}
return e.toV1().analyzeColumnsPushDownV1()
}
21 changes: 13 additions & 8 deletions pkg/executor/analyze_col_v2.go
@@ -46,6 +46,7 @@ import (
"github.com/pingcap/tidb/pkg/util/ranger"
"github.com/pingcap/tidb/pkg/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
"github.com/tiancaiamao/gp"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
@@ -55,8 +56,8 @@ type AnalyzeColumnsExecV2 struct {
*AnalyzeColumnsExec
}

- func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownWithRetryV2() *statistics.AnalyzeResults {
- analyzeResult := e.analyzeColumnsPushDownV2()
+ func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownWithRetryV2(gp *gp.Pool) *statistics.AnalyzeResults {
+ analyzeResult := e.analyzeColumnsPushDownV2(gp)
if e.notRetryable(analyzeResult) {
return analyzeResult
}
@@ -86,7 +87,7 @@ func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownWithRetryV2() *statistics.A
prepareV2AnalyzeJobInfo(e.AnalyzeColumnsExec, true)
AddNewAnalyzeJob(e.ctx, e.job)
StartAnalyzeJob(e.ctx, e.job)
- return e.analyzeColumnsPushDownV2()
+ return e.analyzeColumnsPushDownV2(gp)
}

// Do **not** retry if succeed / not oom error / not auto-analyze / samplerate not set.
@@ -96,7 +97,7 @@ func (e *AnalyzeColumnsExecV2) notRetryable(analyzeResult *statistics.AnalyzeRes
e.analyzePB.ColReq == nil || *e.analyzePB.ColReq.SampleRate <= 0
}

- func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownV2() *statistics.AnalyzeResults {
+ func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownV2(gp *gp.Pool) *statistics.AnalyzeResults {
var ranges []*ranger.Range
if hc := e.handleCols; hc != nil {
if hc.IsInt() {
@@ -141,13 +142,13 @@ func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownV2() *statistics.AnalyzeRes
// subIndexWorkerWg is better to be initialized in handleNDVForSpecialIndexes, however if we do so, golang would
// report unexpected/unreasonable data race error on subIndexWorkerWg when running TestAnalyzeVirtualCol test
// case with `-race` flag now.
- var wg util.WaitGroupWrapper
+ wg := util.NewWaitGroupPool(gp)
wg.Run(func() {
e.handleNDVForSpecialIndexes(specialIndexes, idxNDVPushDownCh, statsConcurrncy)
})
defer wg.Wait()

- count, hists, topNs, fmSketches, extStats, err := e.buildSamplingStats(ranges, collExtStats, specialIndexesOffsets, idxNDVPushDownCh, samplingStatsConcurrency)
+ count, hists, topNs, fmSketches, extStats, err := e.buildSamplingStats(gp, ranges, collExtStats, specialIndexesOffsets, idxNDVPushDownCh, samplingStatsConcurrency)
if err != nil {
e.memTracker.Release(e.memTracker.BytesConsumed())
return &statistics.AnalyzeResults{Err: err, Job: e.job}
@@ -245,6 +246,7 @@ func printAnalyzeMergeCollectorLog(oldRootCount, newRootCount, subCount, tableID
}

func (e *AnalyzeColumnsExecV2) buildSamplingStats(
+ gp *gp.Pool,
ranges []*ranger.Range,
needExtStats bool,
indexesWithVirtualColOffsets []int,
@@ -292,7 +294,10 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
e.samplingMergeWg = &util.WaitGroupWrapper{}
e.samplingMergeWg.Add(samplingStatsConcurrency)
for i := 0; i < samplingStatsConcurrency; i++ {
- go e.subMergeWorker(mergeResultCh, mergeTaskCh, l, i)
+ id := i
+ gp.Go(func() {
+ e.subMergeWorker(mergeResultCh, mergeTaskCh, l, id)
+ })
}
// Merge the result from collectors.
mergeWorkerPanicCnt := 0
@@ -388,7 +393,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
if totalLen < samplingStatsConcurrency {
samplingStatsConcurrency = totalLen
}
- e.samplingBuilderWg = newNotifyErrorWaitGroupWrapper(buildResultChan)
+ e.samplingBuilderWg = newNotifyErrorWaitGroupWrapper(gp, buildResultChan)
sampleCollectors := make([]*statistics.SampleCollector, len(e.colsInfo))
exitCh := make(chan struct{})
e.samplingBuilderWg.Add(samplingStatsConcurrency)
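
In the `subMergeWorker` loop above, the commit replaces `go e.subMergeWorker(...)` with a pooled submission and copies the loop variable first (`id := i`), since before Go 1.22 every closure in the loop would otherwise observe the final value of `i`. Below is a standalone sketch of that capture pattern, not taken from TiDB; `gp.New` and its parameters are assumptions, and the printed line stands in for `subMergeWorker`.

```go
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/tiancaiamao/gp"
)

func main() {
	pool := gp.New(4, time.Minute) // illustrative pool parameters
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		id := i // copy before capture so each closure sees its own worker id
		wg.Add(1)
		pool.Go(func() {
			defer wg.Done()
			fmt.Printf("merge worker %d started\n", id) // stand-in for subMergeWorker
		})
	}
	wg.Wait()
}
```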
14 changes: 10 additions & 4 deletions pkg/executor/analyze_utils.go
@@ -25,7 +25,12 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics"
<<<<<<< HEAD
"github.com/pingcap/tidb/pkg/util/memory"
=======
"github.com/pingcap/tidb/pkg/util"
"github.com/tiancaiamao/gp"
>>>>>>> 5a22d566a88 (executor: reuse goroutine in the analyze (#47637))
"go.uber.org/atomic"
)

@@ -107,16 +112,17 @@ func (w *analyzeResultsNotifyWaitGroupWrapper) Run(exec func()) {
// notifyErrorWaitGroupWrapper is a wrapper for sync.WaitGroup
// Please add all goroutine count when to `Add` to avoid exiting in advance.
type notifyErrorWaitGroupWrapper struct {
- sync.WaitGroup
+ *util.WaitGroupPool
notify chan error
cnt atomic.Uint64
}

// newNotifyErrorWaitGroupWrapper is to create notifyErrorWaitGroupWrapper
- func newNotifyErrorWaitGroupWrapper(notify chan error) *notifyErrorWaitGroupWrapper {
+ func newNotifyErrorWaitGroupWrapper(gp *gp.Pool, notify chan error) *notifyErrorWaitGroupWrapper {
return &notifyErrorWaitGroupWrapper{
- notify: notify,
- cnt: *atomic.NewUint64(0),
+ WaitGroupPool: util.NewWaitGroupPool(gp),
+ notify: notify,
+ cnt: *atomic.NewUint64(0),
}
}

3 changes: 3 additions & 0 deletions pkg/executor/builder.go
@@ -2890,11 +2890,14 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(
}

func (b *executorBuilder) buildAnalyze(v *plannercore.Analyze) exec.Executor {
+ gp := domain.GetDomain(b.ctx).StatsHandle().GPool()
e := &AnalyzeExec{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()),
tasks: make([]*analyzeTask, 0, len(v.ColTasks)+len(v.IdxTasks)),
opts: v.Opts,
OptionsMap: v.OptionsMap,
+ wg: util.NewWaitGroupPool(gp),
+ gp: gp,
}
autoAnalyze := ""
if b.ctx.GetSessionVars().InRestrictedSQL {
1 change: 1 addition & 0 deletions pkg/util/BUILD.bazel
@@ -43,6 +43,7 @@ go_library(
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_tiancaiamao_gp//:gp",
"@com_github_tikv_client_go_v2//oracle",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_client_v3//concurrency",
1 change: 1 addition & 0 deletions pkg/util/topsql/BUILD.bazel
@@ -28,6 +28,7 @@ go_test(
],
embed = [":topsql"],
flaky = True,
+ shard_count = 5,
deps = [
"//pkg/config",
"//pkg/parser",
1 change: 1 addition & 0 deletions pkg/util/topsql/reporter/BUILD.bazel
@@ -45,6 +45,7 @@ go_test(
],
embed = [":reporter"],
flaky = True,
+ shard_count = 36,
deps = [
"//pkg/config",
"//pkg/testkit/testsetup",
1 change: 1 addition & 0 deletions pkg/util/topsql/stmtstats/BUILD.bazel
@@ -28,6 +28,7 @@ go_test(
],
embed = [":stmtstats"],
flaky = True,
+ shard_count = 11,
deps = [
"//pkg/testkit/testsetup",
"//pkg/util/topsql/state",
25 changes: 25 additions & 0 deletions pkg/util/wait_group_wrapper.go
@@ -19,6 +19,7 @@ import (
"time"

"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tiancaiamao/gp"
"go.uber.org/zap"
)

@@ -172,3 +173,27 @@ func (w *WaitGroupWrapper) RunWithRecover(exec func(), recoverFn func(r interfac
exec()
}()
}

+ // WaitGroupPool is a wrapper for sync.WaitGroup and support goroutine pool
+ type WaitGroupPool struct {
+ sync.WaitGroup
+ gp *gp.Pool
+ }
+
+ // NewWaitGroupPool returns WaitGroupPool
+ func NewWaitGroupPool(gp *gp.Pool) *WaitGroupPool {
+ var wg WaitGroupPool
+ wg.gp = gp
+ return &wg
+ }
+
+ // Run runs a function in a goroutine, adds 1 to WaitGroup
+ // and calls done when function returns. Please DO NOT use panic
+ // in the cb function.
+ func (w *WaitGroupPool) Run(exec func()) {
+ w.Add(1)
+ w.gp.Go(func() {
+ defer w.Done()
+ exec()
+ })
+ }
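
A minimal usage sketch of the `WaitGroupPool` defined above, mirroring how `AnalyzeExec` drives its workers. It is not part of the commit: `gp.New` and its parameters are assumptions about the tiancaiamao/gp API, and TiDB itself reuses the pool owned by the stats handle (`StatsHandle().GPool()` in builder.go) rather than creating a new one.

```go
package main

import (
	"fmt"
	"time"

	"github.com/pingcap/tidb/pkg/util"
	"github.com/tiancaiamao/gp"
)

func main() {
	pool := gp.New(8, time.Minute) // illustrative size and idle timeout

	wg := util.NewWaitGroupPool(pool)
	for i := 0; i < 3; i++ {
		// Run adds 1 to the embedded WaitGroup and hands the closure to the
		// pool, so the goroutine is recycled instead of freshly spawned.
		wg.Run(func() {
			fmt.Println("pooled analyze worker")
		})
	}
	wg.Wait()
}
```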
1 change: 1 addition & 0 deletions tools/tazel/util.go
@@ -45,5 +45,6 @@ func skipShardCount(path string) bool {
(strings.HasPrefix(path, "pkg/util") &&
!strings.HasPrefix(path, "pkg/util/admin") &&
!strings.HasPrefix(path, "pkg/util/chunk") &&
!strings.HasPrefix(path, "pkg/util/topsql") &&
!strings.HasPrefix(path, "pkg/util/stmtsummary"))
}
