Skip to content

Commit

Permalink
statistics: support adaptive sync load concurrency (pingcap#53142)
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored and RidRisR committed May 23, 2024
1 parent fa21bed commit 484258f
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 9 deletions.
8 changes: 4 additions & 4 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ const (
DefTableColumnCountLimit = 1017
// DefMaxOfTableColumnCountLimit is maximum limitation of the number of columns in a table
DefMaxOfTableColumnCountLimit = 4096
// DefStatsLoadConcurrencyLimit is limit of the concurrency of stats-load
DefStatsLoadConcurrencyLimit = 1
// DefStatsLoadConcurrencyLimit is limit of the concurrency of stats-load. When it is set to 0, it will be set by syncload.GetSyncLoadConcurrencyByCPU.
DefStatsLoadConcurrencyLimit = 0
// DefMaxOfStatsLoadConcurrencyLimit is maximum limitation of the concurrency of stats-load
DefMaxOfStatsLoadConcurrencyLimit = 128
// DefStatsLoadQueueSizeLimit is limit of the size of stats-load request queue
Expand Down Expand Up @@ -720,7 +720,7 @@ type Performance struct {
PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"`
GOGC int `toml:"gogc" json:"gogc"`
EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"`
StatsLoadConcurrency uint `toml:"stats-load-concurrency" json:"stats-load-concurrency"`
StatsLoadConcurrency int `toml:"stats-load-concurrency" json:"stats-load-concurrency"`
StatsLoadQueueSize uint `toml:"stats-load-queue-size" json:"stats-load-queue-size"`
AnalyzePartitionConcurrencyQuota uint `toml:"analyze-partition-concurrency-quota" json:"analyze-partition-concurrency-quota"`
PlanReplayerDumpWorkerConcurrency uint `toml:"plan-replayer-dump-worker-concurrency" json:"plan-replayer-dump-worker-concurrency"`
Expand Down Expand Up @@ -1010,7 +1010,7 @@ var defaultConf = Config{
GOGC: 100,
EnforceMPP: false,
PlanReplayerGCLease: "10m",
StatsLoadConcurrency: 5,
StatsLoadConcurrency: 0, // 0 is auto mode.
StatsLoadQueueSize: 1000,
AnalyzePartitionConcurrencyQuota: 16,
PlanReplayerDumpWorkerConcurrency: 1,
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1290,7 +1290,7 @@ func TestConfigExample(t *testing.T) {
func TestStatsLoadLimit(t *testing.T) {
conf := NewConfig()
checkConcurrencyValid := func(concurrency int, shouldBeValid bool) {
conf.Performance.StatsLoadConcurrency = uint(concurrency)
conf.Performance.StatsLoadConcurrency = concurrency
require.Equal(t, shouldBeValid, conf.Valid() == nil)
}
checkConcurrencyValid(DefStatsLoadConcurrencyLimit, true)
Expand Down
1 change: 1 addition & 0 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2274,6 +2274,7 @@ func (do *Domain) StartLoadStatsSubWorkers(ctxList []sessionctx.Context) {
do.wg.Add(1)
go statsHandle.SubLoadWorker(ctx, do.exit, do.wg)
}
logutil.BgLogger().Info("start load stats sub workers", zap.Int("worker count", len(ctxList)))
}

func (do *Domain) newOwnerManager(prompt, ownerKey string) owner.Manager {
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/casetest/planstats/plan_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func TestPlanStatsLoadTimeout(t *testing.T) {
p := parser.New()
originConfig := config.GetGlobalConfig()
newConfig := config.NewConfig()
newConfig.Performance.StatsLoadConcurrency = 0 // no worker to consume channel
newConfig.Performance.StatsLoadConcurrency = -1 // no worker to consume channel
newConfig.Performance.StatsLoadQueueSize = 1
config.StoreGlobalConfig(newConfig)
defer config.StoreGlobalConfig(originConfig)
Expand Down
1 change: 1 addition & 0 deletions pkg/session/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ go_library(
"//pkg/sessiontxn",
"//pkg/sessiontxn/isolation",
"//pkg/sessiontxn/staleread",
"//pkg/statistics/handle/syncload",
"//pkg/statistics/handle/usage",
"//pkg/statistics/handle/usage/indexusage",
"//pkg/store/driver/error",
Expand Down
11 changes: 10 additions & 1 deletion pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/sessiontxn"
"github.com/pingcap/tidb/pkg/statistics/handle/syncload"
"github.com/pingcap/tidb/pkg/statistics/handle/usage"
"github.com/pingcap/tidb/pkg/statistics/handle/usage/indexusage"
storeerr "github.com/pingcap/tidb/pkg/store/driver/error"
Expand Down Expand Up @@ -3368,7 +3369,15 @@ func bootstrapSessionImpl(store kv.Storage, createSessionsImpl func(store kv.Sto
)

analyzeConcurrencyQuota := int(config.GetGlobalConfig().Performance.AnalyzePartitionConcurrencyQuota)
concurrency := int(config.GetGlobalConfig().Performance.StatsLoadConcurrency)
concurrency := config.GetGlobalConfig().Performance.StatsLoadConcurrency
if concurrency == 0 {
// if concurrency is 0, we will set the concurrency of sync load by CPU.
concurrency = syncload.GetSyncLoadConcurrencyByCPU()
}
if concurrency < 0 { // it is only for test, in the production, negative value is illegal.
concurrency = 0
}

ses, err := createSessionsImpl(store, 10)
if err != nil {
return nil, err
Expand Down
14 changes: 14 additions & 0 deletions pkg/statistics/handle/syncload/stats_syncload.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package syncload
import (
"fmt"
"math/rand"
"runtime"
"time"

"github.com/pingcap/errors"
Expand All @@ -41,6 +42,19 @@ import (
// RetryCount is the max retry count for a sync load task.
const RetryCount = 3

// GetSyncLoadConcurrencyByCPU returns the concurrency of sync load by CPU.
func GetSyncLoadConcurrencyByCPU() int {
core := runtime.GOMAXPROCS(0)
if core <= 8 {
return 5
} else if core <= 16 {
return 6
} else if core <= 32 {
return 8
}
return 10
}

type statsSyncLoad struct {
statsHandle statstypes.StatsHandle
StatsLoad statstypes.StatsLoad
Expand Down
4 changes: 2 additions & 2 deletions pkg/statistics/handle/syncload/stats_syncload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestConcurrentLoadHistTimeout(t *testing.T) {
func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
originConfig := config.GetGlobalConfig()
newConfig := config.NewConfig()
newConfig.Performance.StatsLoadConcurrency = 0 // no worker to consume channel
newConfig.Performance.StatsLoadConcurrency = -1 // no worker to consume channel
config.StoreGlobalConfig(newConfig)
defer config.StoreGlobalConfig(originConfig)
store, dom := testkit.CreateMockStoreAndDomain(t)
Expand Down Expand Up @@ -258,7 +258,7 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
func TestRetry(t *testing.T) {
originConfig := config.GetGlobalConfig()
newConfig := config.NewConfig()
newConfig.Performance.StatsLoadConcurrency = 0 // no worker to consume channel
newConfig.Performance.StatsLoadConcurrency = -1 // no worker to consume channel
config.StoreGlobalConfig(newConfig)
defer config.StoreGlobalConfig(originConfig)
store, dom := testkit.CreateMockStoreAndDomain(t)
Expand Down

0 comments on commit 484258f

Please sign in to comment.