diff --git a/pkg/config/config.go b/pkg/config/config.go index 87d8ca379c1511..a376a4a9994b07 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -75,8 +75,8 @@ const ( DefTableColumnCountLimit = 1017 // DefMaxOfTableColumnCountLimit is maximum limitation of the number of columns in a table DefMaxOfTableColumnCountLimit = 4096 - // DefStatsLoadConcurrencyLimit is limit of the concurrency of stats-load - DefStatsLoadConcurrencyLimit = 1 + // DefStatsLoadConcurrencyLimit is limit of the concurrency of stats-load. When it is set to 0, it will be set by syncload.GetSyncLoadConcurrencyByCPU. + DefStatsLoadConcurrencyLimit = 0 // DefMaxOfStatsLoadConcurrencyLimit is maximum limitation of the concurrency of stats-load DefMaxOfStatsLoadConcurrencyLimit = 128 // DefStatsLoadQueueSizeLimit is limit of the size of stats-load request queue @@ -720,7 +720,7 @@ type Performance struct { PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"` GOGC int `toml:"gogc" json:"gogc"` EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"` - StatsLoadConcurrency uint `toml:"stats-load-concurrency" json:"stats-load-concurrency"` + StatsLoadConcurrency int `toml:"stats-load-concurrency" json:"stats-load-concurrency"` StatsLoadQueueSize uint `toml:"stats-load-queue-size" json:"stats-load-queue-size"` AnalyzePartitionConcurrencyQuota uint `toml:"analyze-partition-concurrency-quota" json:"analyze-partition-concurrency-quota"` PlanReplayerDumpWorkerConcurrency uint `toml:"plan-replayer-dump-worker-concurrency" json:"plan-replayer-dump-worker-concurrency"` @@ -1010,7 +1010,7 @@ var defaultConf = Config{ GOGC: 100, EnforceMPP: false, PlanReplayerGCLease: "10m", - StatsLoadConcurrency: 5, + StatsLoadConcurrency: 0, // 0 is auto mode. StatsLoadQueueSize: 1000, AnalyzePartitionConcurrencyQuota: 16, PlanReplayerDumpWorkerConcurrency: 1, diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index f222a419ea5327..3821c12915b16e 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -1290,7 +1290,7 @@ func TestConfigExample(t *testing.T) { func TestStatsLoadLimit(t *testing.T) { conf := NewConfig() checkConcurrencyValid := func(concurrency int, shouldBeValid bool) { - conf.Performance.StatsLoadConcurrency = uint(concurrency) + conf.Performance.StatsLoadConcurrency = concurrency require.Equal(t, shouldBeValid, conf.Valid() == nil) } checkConcurrencyValid(DefStatsLoadConcurrencyLimit, true) diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index a5f51301f02d82..54fb27c0de2e27 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -2274,6 +2274,7 @@ func (do *Domain) StartLoadStatsSubWorkers(ctxList []sessionctx.Context) { do.wg.Add(1) go statsHandle.SubLoadWorker(ctx, do.exit, do.wg) } + logutil.BgLogger().Info("start load stats sub workers", zap.Int("worker count", len(ctxList))) } func (do *Domain) newOwnerManager(prompt, ownerKey string) owner.Manager { diff --git a/pkg/planner/core/casetest/planstats/plan_stats_test.go b/pkg/planner/core/casetest/planstats/plan_stats_test.go index 5f7c4f05d4dcb5..6adfe93de15c9a 100644 --- a/pkg/planner/core/casetest/planstats/plan_stats_test.go +++ b/pkg/planner/core/casetest/planstats/plan_stats_test.go @@ -234,7 +234,7 @@ func TestPlanStatsLoadTimeout(t *testing.T) { p := parser.New() originConfig := config.GetGlobalConfig() newConfig := config.NewConfig() - newConfig.Performance.StatsLoadConcurrency = 0 // no worker to consume channel + newConfig.Performance.StatsLoadConcurrency = -1 // no worker to consume channel newConfig.Performance.StatsLoadQueueSize = 1 config.StoreGlobalConfig(newConfig) defer config.StoreGlobalConfig(originConfig) diff --git a/pkg/session/BUILD.bazel b/pkg/session/BUILD.bazel index 9fc7d96cdea2c6..0742bc53dce39a 100644 --- a/pkg/session/BUILD.bazel +++ b/pkg/session/BUILD.bazel @@ -75,6 +75,7 @@ go_library( "//pkg/sessiontxn", "//pkg/sessiontxn/isolation", "//pkg/sessiontxn/staleread", + "//pkg/statistics/handle/syncload", "//pkg/statistics/handle/usage", "//pkg/statistics/handle/usage/indexusage", "//pkg/store/driver/error", diff --git a/pkg/session/session.go b/pkg/session/session.go index 6f9c66bd948286..e83d2139318d7d 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -88,6 +88,7 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/sessiontxn" + "github.com/pingcap/tidb/pkg/statistics/handle/syncload" "github.com/pingcap/tidb/pkg/statistics/handle/usage" "github.com/pingcap/tidb/pkg/statistics/handle/usage/indexusage" storeerr "github.com/pingcap/tidb/pkg/store/driver/error" @@ -3368,7 +3369,15 @@ func bootstrapSessionImpl(store kv.Storage, createSessionsImpl func(store kv.Sto ) analyzeConcurrencyQuota := int(config.GetGlobalConfig().Performance.AnalyzePartitionConcurrencyQuota) - concurrency := int(config.GetGlobalConfig().Performance.StatsLoadConcurrency) + concurrency := config.GetGlobalConfig().Performance.StatsLoadConcurrency + if concurrency == 0 { + // if concurrency is 0, we will set the concurrency of sync load by CPU. + concurrency = syncload.GetSyncLoadConcurrencyByCPU() + } + if concurrency < 0 { // it is only for test, in the production, negative value is illegal. + concurrency = 0 + } + ses, err := createSessionsImpl(store, 10) if err != nil { return nil, err diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go index b0bd43166f3ce9..31aacb878f3ac1 100644 --- a/pkg/statistics/handle/syncload/stats_syncload.go +++ b/pkg/statistics/handle/syncload/stats_syncload.go @@ -17,6 +17,7 @@ package syncload import ( "fmt" "math/rand" + "runtime" "time" "github.com/pingcap/errors" @@ -41,6 +42,19 @@ import ( // RetryCount is the max retry count for a sync load task. const RetryCount = 3 +// GetSyncLoadConcurrencyByCPU returns the concurrency of sync load by CPU. +func GetSyncLoadConcurrencyByCPU() int { + core := runtime.GOMAXPROCS(0) + if core <= 8 { + return 5 + } else if core <= 16 { + return 6 + } else if core <= 32 { + return 8 + } + return 10 +} + type statsSyncLoad struct { statsHandle statstypes.StatsHandle StatsLoad statstypes.StatsLoad diff --git a/pkg/statistics/handle/syncload/stats_syncload_test.go b/pkg/statistics/handle/syncload/stats_syncload_test.go index 8a8929d9d93e59..e9cccf3f90f674 100644 --- a/pkg/statistics/handle/syncload/stats_syncload_test.go +++ b/pkg/statistics/handle/syncload/stats_syncload_test.go @@ -144,7 +144,7 @@ func TestConcurrentLoadHistTimeout(t *testing.T) { func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) { originConfig := config.GetGlobalConfig() newConfig := config.NewConfig() - newConfig.Performance.StatsLoadConcurrency = 0 // no worker to consume channel + newConfig.Performance.StatsLoadConcurrency = -1 // no worker to consume channel config.StoreGlobalConfig(newConfig) defer config.StoreGlobalConfig(originConfig) store, dom := testkit.CreateMockStoreAndDomain(t) @@ -258,7 +258,7 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) { func TestRetry(t *testing.T) { originConfig := config.GetGlobalConfig() newConfig := config.NewConfig() - newConfig.Performance.StatsLoadConcurrency = 0 // no worker to consume channel + newConfig.Performance.StatsLoadConcurrency = -1 // no worker to consume channel config.StoreGlobalConfig(newConfig) defer config.StoreGlobalConfig(originConfig) store, dom := testkit.CreateMockStoreAndDomain(t)