Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: set different priorities for write-leader and write-peer #3937

Merged
merged 9 commits into from
Aug 2, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 11 additions & 13 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,19 +481,13 @@ func (bs *balanceSolver) init() {
Count: maxCur.Count * bs.sche.conf.GetCountRankStepRatio(),
}

priorities := bs.sche.conf.ReadPriorities
if bs.rwTy == write {
priorities = bs.sche.conf.WritePriorities
}
bs.firstPriority = stringToDim(priorities[0])
bs.secondPriority = stringToDim(priorities[1])

if bs.rwTy == write {
bs.writeLeaderFirstPriority = statistics.KeyDim
if bs.firstPriority == statistics.QueryDim {
bs.writeLeaderFirstPriority = statistics.QueryDim
}
bs.writeLeaderSecondPriority = statistics.ByteDim
// For read, transfer-leader and move-peer have the same priority config
// For write, they are different
if bs.rwTy == read {
bs.firstPriority, bs.secondPriority = prioritiesTodim(bs.sche.conf.ReadPriorities)
} else {
bs.firstPriority, bs.secondPriority = prioritiesTodim(bs.sche.conf.WritePeerPriorities)
bs.writeLeaderFirstPriority, bs.writeLeaderSecondPriority = prioritiesTodim(bs.sche.conf.WriteLeaderPriorities)
}

bs.isSelectedDim = func(dim int) bool {
Expand Down Expand Up @@ -1297,3 +1291,7 @@ func dimToString(dim int) string {
return ""
}
}

// prioritiesTodim maps a two-element priority name slice (e.g.
// []string{BytePriority, KeyPriority}) to its first and second
// statistics dimension indices via stringToDim.
func prioritiesTodim(priorities []string) (firstPriority int, secondPriority int) {
	firstPriority = stringToDim(priorities[0])
	secondPriority = stringToDim(priorities[1])
	return firstPriority, secondPriority
}
6 changes: 4 additions & 2 deletions server/schedulers/hot_region_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ func initHotRegionScheduleConfig() *hotRegionSchedulerConfig {
SrcToleranceRatio: 1.05, // Tolerate 5% difference
DstToleranceRatio: 1.05, // Tolerate 5% difference
ReadPriorities: []string{QueryPriority, BytePriority},
WritePriorities: []string{BytePriority, KeyPriority},
WriteLeaderPriorities: []string{KeyPriority, BytePriority},
WritePeerPriorities: []string{BytePriority, KeyPriority},
StrictPickingStore: true,
}
}
Expand All @@ -82,7 +83,8 @@ type hotRegionSchedulerConfig struct {
SrcToleranceRatio float64 `json:"src-tolerance-ratio"`
DstToleranceRatio float64 `json:"dst-tolerance-ratio"`
ReadPriorities []string `json:"read-priorities"`
WritePriorities []string `json:"write-priorities"`
WriteLeaderPriorities []string `json:"write-leader-priorities"`
WritePeerPriorities []string `json:"write-peer-priorities"`
StrictPickingStore bool `json:"strict-picking-store,string"`
}

Expand Down
47 changes: 43 additions & 4 deletions server/schedulers/hot_region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1478,12 +1478,12 @@ func (s *testHotSchedulerSuite) TestHotScheduleWithPriority(c *C) {
{1, []uint64{1, 2, 3}, 2 * MB, 1 * MB},
{6, []uint64{4, 2, 3}, 1 * MB, 2 * MB},
})
hb.(*hotScheduler).conf.WritePriorities = []string{BytePriority, KeyPriority}
hb.(*hotScheduler).conf.WritePeerPriorities = []string{BytePriority, KeyPriority}
ops := hb.Schedule(tc)
c.Assert(len(ops), Equals, 1)
testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 1, 5)
hb.(*hotScheduler).clearPendingInfluence()
hb.(*hotScheduler).conf.WritePriorities = []string{KeyPriority, BytePriority}
hb.(*hotScheduler).conf.WritePeerPriorities = []string{KeyPriority, BytePriority}
ops = hb.Schedule(tc)
c.Assert(len(ops), Equals, 1)
testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 4, 5)
Expand Down Expand Up @@ -1519,7 +1519,7 @@ func (s *testHotSchedulerSuite) TestHotScheduleWithPriority(c *C) {
tc.UpdateStorageWrittenStats(3, 6*MB*statistics.StoreHeartBeatReportInterval, 6*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(4, 6*MB*statistics.StoreHeartBeatReportInterval, 6*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(5, 1*MB*statistics.StoreHeartBeatReportInterval, 1*MB*statistics.StoreHeartBeatReportInterval)
hb.(*hotScheduler).conf.WritePriorities = []string{BytePriority, KeyPriority}
hb.(*hotScheduler).conf.WritePeerPriorities = []string{BytePriority, KeyPriority}
ops = hb.Schedule(tc)
c.Assert(len(ops), Equals, 1)
testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 1, 5)
Expand All @@ -1530,9 +1530,48 @@ func (s *testHotSchedulerSuite) TestHotScheduleWithPriority(c *C) {
tc.UpdateStorageWrittenStats(3, 6*MB*statistics.StoreHeartBeatReportInterval, 6*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(4, 1*MB*statistics.StoreHeartBeatReportInterval, 10*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(5, 1*MB*statistics.StoreHeartBeatReportInterval, 1*MB*statistics.StoreHeartBeatReportInterval)
hb.(*hotScheduler).conf.WritePriorities = []string{KeyPriority, BytePriority}
hb.(*hotScheduler).conf.WritePeerPriorities = []string{KeyPriority, BytePriority}
ops = hb.Schedule(tc)
c.Assert(len(ops), Equals, 1)
testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 4, 5)
hb.(*hotScheduler).clearPendingInfluence()
}
func (s *testHotSchedulerSuite) TestHotWriteLeaderScheduleWithPriority(c *C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
statistics.Denoising = false
opt := config.NewTestOptions()
hb, err := schedule.CreateScheduler(HotWriteRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil)
c.Assert(err, IsNil)
hb.(*hotScheduler).conf.SetDstToleranceRatio(1)
hb.(*hotScheduler).conf.SetSrcToleranceRatio(1)

tc := mockcluster.NewCluster(ctx, opt)
tc.DisableFeature(versioninfo.JointConsensus)
tc.SetHotRegionCacheHitsThreshold(0)
tc.AddRegionStore(1, 20)
tc.AddRegionStore(2, 20)
tc.AddRegionStore(3, 20)
tc.UpdateStorageWrittenStats(1, 31*MB*statistics.StoreHeartBeatReportInterval, 31*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(2, 10*MB*statistics.StoreHeartBeatReportInterval, 1*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(3, 1*MB*statistics.StoreHeartBeatReportInterval, 10*MB*statistics.StoreHeartBeatReportInterval)

addRegionInfo(tc, write, []testRegionInfo{
{1, []uint64{1, 2, 3}, 10 * MB, 10 * MB},
{2, []uint64{1, 2, 3}, 10 * MB, 10 * MB},
{3, []uint64{1, 2, 3}, 10 * MB, 10 * MB},
{4, []uint64{2, 1, 3}, 10 * MB, 0 * MB},
{5, []uint64{3, 2, 1}, 0 * MB, 10 * MB},
})
old := schedulePeerPr
schedulePeerPr = 0.0
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{KeyPriority, BytePriority}
ops := hb.Schedule(tc)
c.Assert(len(ops), Equals, 1)
testutil.CheckTransferLeader(c, ops[0], operator.OpHotRegion, 1, 2)
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{BytePriority, KeyPriority}
ops = hb.Schedule(tc)
c.Assert(len(ops), Equals, 1)
testutil.CheckTransferLeader(c, ops[0], operator.OpHotRegion, 1, 3)
schedulePeerPr = old
}
12 changes: 11 additions & 1 deletion tests/pdctl/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,8 @@ func (s *schedulerTestSuite) TestScheduler(c *C) {
"src-tolerance-ratio": 1.05,
"dst-tolerance-ratio": 1.05,
"read-priorities": []interface{}{"qps", "byte"},
"write-priorities": []interface{}{"byte", "key"},
"write-leader-priorities": []interface{}{"key", "byte"},
"write-peer-priorities": []interface{}{"byte", "key"},
"strict-picking-store": "true",
}
c.Assert(conf, DeepEquals, expected1)
Expand Down Expand Up @@ -309,6 +310,15 @@ func (s *schedulerTestSuite) TestScheduler(c *C) {
mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
c.Assert(conf1, DeepEquals, expected1)

// write-priorities is divided into write-leader-priorities and write-peer-priorities
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-priorities", "key,byte"}, nil)
mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
c.Assert(conf1, DeepEquals, expected1)
// cannot set qps as write-peer-priorities
mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-peer-priorities", "qps,byte"}, nil)
mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
c.Assert(conf1, DeepEquals, expected1)

// test show scheduler with paused and disabled status.
checkSchedulerWithStatusCommand := func(args []string, status string, expected []string) {
if args != nil {
Expand Down
6 changes: 5 additions & 1 deletion tools/pd-ctl/pdctl/command/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ func postSchedulerConfigCommandFunc(cmd *cobra.Command, schedulerName string, ar
if err != nil {
val = value
}
if schedulerName == "balance-hot-region-scheduler" && (key == "read-priorities" || key == "write-priorities") {
if schedulerName == "balance-hot-region-scheduler" && (key == "read-priorities" || key == "write-leader-priorities" || key == "write-peer-priorities") {
priorities := make([]string, 0)
prioritiesMap := make(map[string]struct{})
for _, priority := range strings.Split(value, ",") {
Expand All @@ -539,6 +539,10 @@ func postSchedulerConfigCommandFunc(cmd *cobra.Command, schedulerName string, ar
schedulers.KeyPriority))
return
}
if priority == schedulers.QueryPriority && key == "write-peer-priorities" {
cmd.Println("qps is not allowed to be set in priorities for write-peer-priorities")
return
}
priorities = append(priorities, priority)
prioritiesMap[priority] = struct{}{}
}
Expand Down