diff --git a/pkg/statistics/handle/bootstrap.go b/pkg/statistics/handle/bootstrap.go index 43111039ce1ce..eceea5c9ca792 100644 --- a/pkg/statistics/handle/bootstrap.go +++ b/pkg/statistics/handle/bootstrap.go @@ -351,9 +351,13 @@ func (h *Handle) initStatsHistogramsByPaging(is infoschema.InfoSchema, cache sta func (h *Handle) initStatsHistogramsConcurrency(is infoschema.InfoSchema, cache statstypes.StatsCache) error { var maxTid = maxTidRecord.tid.Load() tid := int64(0) - ls := initstats.NewRangeWorker(func(task initstats.Task) error { + taskCnt := uint64(1) + if maxTid > initStatsStep*2 { + taskCnt = uint64(maxTid / initStatsStep) + } + ls := initstats.NewRangeWorker("histogram", func(task initstats.Task) error { return h.initStatsHistogramsByPaging(is, cache, task) - }) + }, taskCnt) ls.LoadStats() for tid <= maxTid { ls.SendTask(initstats.Task{ @@ -461,9 +465,13 @@ func (h *Handle) initStatsTopNByPaging(cache statstypes.StatsCache, task initsta func (h *Handle) initStatsTopNConcurrency(cache statstypes.StatsCache) error { var maxTid = maxTidRecord.tid.Load() tid := int64(0) - ls := initstats.NewRangeWorker(func(task initstats.Task) error { + taskCnt := uint64(1) + if maxTid > initStatsStep*2 { + taskCnt = uint64(maxTid / initStatsStep) + } + ls := initstats.NewRangeWorker("TopN", func(task initstats.Task) error { return h.initStatsTopNByPaging(cache, task) - }) + }, taskCnt) ls.LoadStats() for tid <= maxTid { ls.SendTask(initstats.Task{ @@ -664,9 +672,13 @@ func (h *Handle) initStatsBucketsByPaging(cache statstypes.StatsCache, task init func (h *Handle) initStatsBucketsConcurrency(cache statstypes.StatsCache) error { var maxTid = maxTidRecord.tid.Load() tid := int64(0) - ls := initstats.NewRangeWorker(func(task initstats.Task) error { + taskCnt := uint64(1) + if maxTid > initStatsStep*2 { + taskCnt = uint64(maxTid / initStatsStep) + } + ls := initstats.NewRangeWorker("bucket", func(task initstats.Task) error { return h.initStatsBucketsByPaging(cache, task) - }) + }, taskCnt) ls.LoadStats() for tid <= maxTid { ls.SendTask(initstats.Task{ diff --git a/pkg/statistics/handle/initstats/load_stats_page.go b/pkg/statistics/handle/initstats/load_stats_page.go index 31694f293e75a..99edd9a429bc0 100644 --- a/pkg/statistics/handle/initstats/load_stats_page.go +++ b/pkg/statistics/handle/initstats/load_stats_page.go @@ -15,6 +15,10 @@ package initstats import ( + "fmt" + "sync/atomic" + + statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/logutil" "go.uber.org/zap" @@ -28,18 +32,31 @@ type Task struct { // RangeWorker is used to load stats concurrently by the range of table id. type RangeWorker struct { - dealFunc func(task Task) error - taskChan chan Task - - wg util.WaitGroupWrapper + dealFunc func(task Task) error + taskChan chan Task + logger *zap.Logger + wg util.WaitGroupWrapper + taskCnt uint64 + completeTaskCnt atomic.Uint64 + taskName string } // NewRangeWorker creates a new RangeWorker. -func NewRangeWorker(dealFunc func(task Task) error) *RangeWorker { - return &RangeWorker{ +func NewRangeWorker(taskName string, dealFunc func(task Task) error, taskCnt uint64) *RangeWorker { + worker := &RangeWorker{ + taskName: taskName, dealFunc: dealFunc, taskChan: make(chan Task, 1), + taskCnt: taskCnt, } + if taskCnt == 1 { + worker.logger = nil + } else if taskCnt < 100 { + worker.logger = statslogutil.StatsLogger() + } else { + worker.logger = statslogutil.SingletonStatsSamplerLogger() + } + return worker } // LoadStats loads stats concurrently when to init stats @@ -57,6 +74,10 @@ func (ls *RangeWorker) loadStats() { if err := ls.dealFunc(task); err != nil { logutil.BgLogger().Error("load stats failed", zap.Error(err)) } + if ls.logger != nil { + completeTaskCnt := ls.completeTaskCnt.Add(1) + ls.logger.Info(fmt.Sprintf("load %s [%d/%d]", ls.taskName, completeTaskCnt, ls.taskCnt)) + } } }