Skip to content

Commit

Permalink
statistics: print the process of init stats (#53560)
Browse files Browse the repository at this point in the history
close #53564
  • Loading branch information
hawkingrei authored May 30, 2024
1 parent 04c66ee commit 5e5326b
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 12 deletions.
20 changes: 14 additions & 6 deletions pkg/statistics/handle/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/cache"
"github.com/pingcap/tidb/pkg/statistics/handle/initstats"
statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/types"
Expand Down Expand Up @@ -352,9 +353,9 @@ func (h *Handle) initStatsHistogramsByPaging(is infoschema.InfoSchema, cache sta
func (h *Handle) initStatsHistogramsConcurrency(is infoschema.InfoSchema, cache statstypes.StatsCache) error {
var maxTid = maxTidRecord.tid.Load()
tid := int64(0)
ls := initstats.NewRangeWorker(func(task initstats.Task) error {
ls := initstats.NewRangeWorker("histogram", func(task initstats.Task) error {
return h.initStatsHistogramsByPaging(is, cache, task)
})
}, uint64(maxTid), uint64(initStatsStep))
ls.LoadStats()
for tid <= maxTid {
ls.SendTask(initstats.Task{
Expand Down Expand Up @@ -462,9 +463,9 @@ func (h *Handle) initStatsTopNByPaging(cache statstypes.StatsCache, task initsta
func (h *Handle) initStatsTopNConcurrency(cache statstypes.StatsCache) error {
var maxTid = maxTidRecord.tid.Load()
tid := int64(0)
ls := initstats.NewRangeWorker(func(task initstats.Task) error {
ls := initstats.NewRangeWorker("TopN", func(task initstats.Task) error {
return h.initStatsTopNByPaging(cache, task)
})
}, uint64(maxTid), uint64(initStatsStep))
ls.LoadStats()
for tid <= maxTid {
ls.SendTask(initstats.Task{
Expand Down Expand Up @@ -665,9 +666,9 @@ func (h *Handle) initStatsBucketsByPaging(cache statstypes.StatsCache, task init
func (h *Handle) initStatsBucketsConcurrency(cache statstypes.StatsCache) error {
var maxTid = maxTidRecord.tid.Load()
tid := int64(0)
ls := initstats.NewRangeWorker(func(task initstats.Task) error {
ls := initstats.NewRangeWorker("bucket", func(task initstats.Task) error {
return h.initStatsBucketsByPaging(cache, task)
})
}, uint64(maxTid), uint64(initStatsStep))
ls.LoadStats()
for tid <= maxTid {
ls.SendTask(initstats.Task{
Expand Down Expand Up @@ -700,10 +701,12 @@ func (h *Handle) InitStatsLite(is infoschema.InfoSchema) (err error) {
if err != nil {
return errors.Trace(err)
}
statslogutil.StatsLogger().Info("complete to load the meta in the lite mode")
err = h.initStatsHistogramsLite(is, cache)
if err != nil {
return errors.Trace(err)
}
statslogutil.StatsLogger().Info("complete to load the histogram in the lite mode")
h.Replace(cache)
return nil
}
Expand All @@ -728,11 +731,13 @@ func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
if err != nil {
return errors.Trace(err)
}
statslogutil.StatsLogger().Info("complete to load the meta")
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
err = h.initStatsHistogramsConcurrency(is, cache)
} else {
err = h.initStatsHistograms(is, cache)
}
statslogutil.StatsLogger().Info("complete to load the histogram")
if err != nil {
return errors.Trace(err)
}
Expand All @@ -741,6 +746,7 @@ func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
} else {
err = h.initStatsTopN(cache)
}
statslogutil.StatsLogger().Info("complete to load the topn")
if err != nil {
return err
}
Expand All @@ -749,8 +755,10 @@ func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
if err != nil {
return err
}
statslogutil.StatsLogger().Info("complete to load the FM Sketch")
}
err = h.initStatsBuckets(cache)
statslogutil.StatsLogger().Info("complete to load the bucket")
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/statistics/handle/initstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/config",
"//pkg/statistics/handle/logutil",
"//pkg/util",
"//pkg/util/logutil",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
)
60 changes: 54 additions & 6 deletions pkg/statistics/handle/initstats/load_stats_page.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,44 @@
package initstats

import (
"fmt"
"sync"
"sync/atomic"
"time"

statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

var (
initSamplerLoggerOnce sync.Once
samplerLogger *zap.Logger
)

// singletonStatsSamplerLogger returns the shared, sampled logger used for
// statistics-related messages, creating it on the first call.
//
// The logger is derived from statslogutil.StatsLogger() with its core wrapped
// in a zapcore sampler (tick = one minute, first = 1, thereafter = 0), which is
// intended to emit the first entry with a given level and message per minute
// and suppress the repeats.
//
// NOTE: the sampler keeps its per-level/per-message counters inside this one
// logger instance, so callers must reuse the value returned here; building a
// fresh logger for every log call would defeat the sampling. Do not use it
// for messages that are unrelated to statistics.
func singletonStatsSamplerLogger() *zap.Logger {
	initSamplerLoggerOnce.Do(func() {
		// Leave an already-populated logger untouched.
		if samplerLogger != nil {
			return
		}
		// Wrap the underlying core so that repeated entries are sampled.
		wrapWithSampler := func(inner zapcore.Core) zapcore.Core {
			return zapcore.NewSamplerWithOptions(inner, time.Minute, 1, 0)
		}
		samplingOpt := zap.WrapCore(wrapWithSampler)
		samplerLogger = statslogutil.StatsLogger().WithOptions(samplingOpt)
	})
	return samplerLogger
}

// Task represents the range of the table for loading stats.
type Task struct {
StartTid int64
Expand All @@ -28,18 +61,29 @@ type Task struct {

// RangeWorker is used to load stats concurrently by the range of table id.
type RangeWorker struct {
dealFunc func(task Task) error
taskChan chan Task

wg util.WaitGroupWrapper
dealFunc func(task Task) error
taskChan chan Task
logger *zap.Logger
taskName string
wg util.WaitGroupWrapper
taskCnt uint64
completeTaskCnt atomic.Uint64
}

// NewRangeWorker creates a new RangeWorker.
func NewRangeWorker(dealFunc func(task Task) error) *RangeWorker {
return &RangeWorker{
func NewRangeWorker(taskName string, dealFunc func(task Task) error, maxTid, initStatsStep uint64) *RangeWorker {
taskCnt := uint64(1)
if maxTid > initStatsStep*2 {
taskCnt = maxTid / initStatsStep
}
worker := &RangeWorker{
taskName: taskName,
dealFunc: dealFunc,
taskChan: make(chan Task, 1),
taskCnt: taskCnt,
}
worker.logger = singletonStatsSamplerLogger()
return worker
}

// LoadStats loads stats concurrently when initializing stats
Expand All @@ -57,6 +101,10 @@ func (ls *RangeWorker) loadStats() {
if err := ls.dealFunc(task); err != nil {
logutil.BgLogger().Error("load stats failed", zap.Error(err))
}
if ls.logger != nil {
completeTaskCnt := ls.completeTaskCnt.Add(1)
ls.logger.Info(fmt.Sprintf("load %s [%d/%d]", ls.taskName, completeTaskCnt, ls.taskCnt))
}
}
}

Expand Down

0 comments on commit 5e5326b

Please sign in to comment.