diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index dc386af7431..896e3606d7c 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/pd/pkg/mock/mockid" "github.com/pingcap/pd/pkg/mock/mockoption" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/schedule/placement" "github.com/pingcap/pd/server/statistics" "go.uber.org/zap" ) @@ -33,6 +34,7 @@ type Cluster struct { *core.BasicCluster *mockid.IDAllocator *mockoption.ScheduleOptions + *placement.RuleManager *statistics.HotCache *statistics.StoresStats ID uint64 @@ -113,6 +115,11 @@ func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) { return peer, nil } +// FitRegion fits a region to the rules it matches. +func (mc *Cluster) FitRegion(region *core.RegionInfo) *placement.RegionFit { + return mc.RuleManager.FitRegion(mc.BasicCluster, region) +} + // SetStoreUp sets store state to be up. func (mc *Cluster) SetStoreUp(storeID uint64) { store := mc.GetStore(storeID) diff --git a/server/cluster.go b/server/cluster.go index 1055fa28e6f..0a52ee99f28 100644 --- a/server/cluster.go +++ b/server/cluster.go @@ -35,6 +35,7 @@ import ( syncer "github.com/pingcap/pd/server/region_syncer" "github.com/pingcap/pd/server/schedule" "github.com/pingcap/pd/server/schedule/checker" + "github.com/pingcap/pd/server/schedule/placement" "github.com/pingcap/pd/server/statistics" "github.com/pkg/errors" "go.uber.org/zap" @@ -83,6 +84,8 @@ type RaftCluster struct { wg sync.WaitGroup quit chan struct{} regionSyncer *syncer.RegionSyncer + + ruleManager *placement.RuleManager } // ClusterStatus saves some state information @@ -1434,6 +1437,18 @@ func (c *RaftCluster) putRegion(region *core.RegionInfo) error { return nil } +// GetRuleManager returns the rule manager reference. 
+func (c *RaftCluster) GetRuleManager() *placement.RuleManager { + c.RLock() + defer c.RUnlock() + return c.ruleManager +} + +// FitRegion tries to fit the region with placement rules. +func (c *RaftCluster) FitRegion(region *core.RegionInfo) *placement.RegionFit { + return c.GetRuleManager().FitRegion(c, region) +} + type prepareChecker struct { reactiveRegions map[uint64]int start time.Time diff --git a/server/schedule/opt/opts.go b/server/schedule/opt/opts.go index e8513acb9a6..b27777acb78 100644 --- a/server/schedule/opt/opts.go +++ b/server/schedule/opt/opts.go @@ -18,6 +18,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/schedule/placement" "github.com/pingcap/pd/server/statistics" ) @@ -82,4 +83,6 @@ type Cluster interface { // TODO: it should be removed. Schedulers don't need to know anything // about peers. AllocPeer(storeID uint64) (*metapb.Peer, error) + + FitRegion(*core.RegionInfo) *placement.RegionFit } diff --git a/server/schedule/placement/fit.go b/server/schedule/placement/fit.go new file mode 100644 index 00000000000..ed6f9677dbe --- /dev/null +++ b/server/schedule/placement/fit.go @@ -0,0 +1,114 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package placement + +import ( + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/pd/server/core" +) + +// RegionFit is the result of fitting a region's peers to rule list. 
+// All peers are divided into corresponding rules according to the matching +// rules, and the remaining Peers are placed in the OrphanPeers list. +type RegionFit struct { + RuleFits []*RuleFit + OrphanPeers []*metapb.Peer +} + +// IsSatisfied returns if the rules are properly satisfied. +// It means all Rules are fulfilled and there is no orphan peers. +func (f *RegionFit) IsSatisfied() bool { + if len(f.RuleFits) == 0 { + return false + } + for _, r := range f.RuleFits { + if !r.IsSatisfied() { + return false + } + } + return len(f.OrphanPeers) == 0 +} + +// GetRuleFit returns the RuleFit that contains the peer. +func (f *RegionFit) GetRuleFit(peerID uint64) *RuleFit { + for _, rf := range f.RuleFits { + for _, p := range rf.Peers { + if p.GetId() == peerID { + return rf + } + } + } + return nil +} + +// CompareRegionFit determines the superiority of 2 fits. +// It returns 1 when the first fit result is better. +func CompareRegionFit(a, b *RegionFit) int { + for i := range a.RuleFits { + if cmp := compareRuleFit(a.RuleFits[i], b.RuleFits[i]); cmp != 0 { + return cmp + } + } + switch { + case len(a.OrphanPeers) < len(b.OrphanPeers): + return 1 + case len(a.OrphanPeers) > len(b.OrphanPeers): + return -1 + default: + return 0 + } +} + +// RuleFit is the result of fitting status of a Rule. +type RuleFit struct { + Rule *Rule + // Peers of the Region that are divided to this Rule. + Peers []*metapb.Peer + // PeersWithDifferentRole is subset of `Peers`. It contains all Peers that have + // different Role from configuration (the Role can be migrated to target role + // by scheduling). + PeersWithDifferentRole []*metapb.Peer + // IsolationLevel indicates at which level of labeling these Peers are + // isolated. A larger value indicates a higher isolation level. + IsolationLevel int +} + +// IsSatisfied returns if the rule is properly satisfied. 
+func (f *RuleFit) IsSatisfied() bool { + return len(f.Peers) == f.Rule.Count && len(f.PeersWithDifferentRole) == 0 +} + +func compareRuleFit(a, b *RuleFit) int { + switch { + case len(a.Peers) < len(b.Peers): + return -1 + case len(a.Peers) > len(b.Peers): + return 1 + case len(a.PeersWithDifferentRole) > len(b.PeersWithDifferentRole): + return -1 + case len(a.PeersWithDifferentRole) < len(b.PeersWithDifferentRole): + return 1 + case a.IsolationLevel < b.IsolationLevel: + return -1 + case a.IsolationLevel > b.IsolationLevel: + return 1 + default: + return 0 + } +} + +// FitRegion tries to fit peers of a region to the rules. +func FitRegion(stores core.StoreSetInformer, region *core.RegionInfo, rules []*Rule) *RegionFit { + return nil +} diff --git a/server/schedule/placement/rule_manager.go b/server/schedule/placement/rule_manager.go new file mode 100644 index 00000000000..b89bd397667 --- /dev/null +++ b/server/schedule/placement/rule_manager.go @@ -0,0 +1,280 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package placement + +import ( + "bytes" + "encoding/hex" + "sort" + "sync" + + "github.com/pingcap/pd/server/core" + "github.com/pkg/errors" +) + +// RuleManager is responsible for the lifecycle of all placement Rules. +// It is threadsafe. +type RuleManager struct { + store *core.Storage + sync.RWMutex + rules map[[2]string]*Rule + splitKeys [][]byte +} + +// NewRuleManager creates a RuleManager instance. 
+func NewRuleManager(store *core.Storage, maxReplica int, locationLabels []string) (*RuleManager, error) { + m := &RuleManager{ + store: store, + rules: make(map[[2]string]*Rule), + } + if err := m.initialize(maxReplica, locationLabels); err != nil { + return nil, err + } + return m, nil +} + +func (m *RuleManager) initialize(maxReplica int, locationLabels []string) error { + var rules []*Rule + ok, err := m.store.LoadRules(&rules) + if err != nil { + return err + } + if !ok { + rules = append(rules, &Rule{ + GroupID: "pd", + ID: "default", + Role: Voter, + Count: maxReplica, + LocationLabels: locationLabels, + }) + } + for _, r := range rules { + if err = m.adjustRule(r); err != nil { + return err + } + m.rules[r.Key()] = r + } + m.updateSplitKeys() + return nil +} + +// check and adjust rule from client or storage. +func (m *RuleManager) adjustRule(r *Rule) error { + var err error + r.StartKey, err = hex.DecodeString(r.StartKeyHex) + if err != nil { + return errors.Wrap(err, "start key is not hex format") + } + r.EndKey, err = hex.DecodeString(r.EndKeyHex) + if err != nil { + return errors.Wrap(err, "end key is not hex format") + } + if len(r.EndKey) > 0 && bytes.Compare(r.EndKey, r.StartKey) <= 0 { + return errors.New("endKey should be greater than startKey") + } + if r.GroupID == "" { + return errors.New("group ID should not be empty") + } + if r.ID == "" { + return errors.New("ID should not be empty") + } + if !validateRole(r.Role) { + return errors.Errorf("invalid role %s", r.Role) + } + if r.Count <= 0 { + return errors.Errorf("invalid count %v", r.Count) + } + for _, c := range r.LabelConstraints { + if !validateOp(c.Op) { + return errors.Errorf("invalid op %s", c.Op) + } + } + return nil +} + +func (m *RuleManager) saveRules() error { + rules := make([]*Rule, 0, len(m.rules)) + for _, r := range m.rules { + rules = append(rules, r) + } + return m.store.SaveRules(rules) +} + +// GetRule returns the Rule with the same (group, id). 
+func (m *RuleManager) GetRule(group, id string) *Rule { + m.RLock() + defer m.RUnlock() + return m.rules[[2]string{group, id}] +} + +// SetRule inserts or updates a Rule. +func (m *RuleManager) SetRule(rule *Rule) error { + err := m.adjustRule(rule) + if err != nil { + return err + } + m.Lock() + defer m.Unlock() + old := m.rules[rule.Key()] + m.rules[rule.Key()] = rule + + if err = m.saveRules(); err != nil { + // restore + if old == nil { + delete(m.rules, rule.Key()) + } else { + m.rules[old.Key()] = old + } + return err + } + m.updateSplitKeys() + return nil +} + +// DeleteRule removes a Rule. +func (m *RuleManager) DeleteRule(group, id string) error { + m.Lock() + defer m.Unlock() + key := [2]string{group, id} + old, ok := m.rules[key] + if !ok { + return nil + } + delete(m.rules, [2]string{group, id}) + if err := m.saveRules(); err != nil { + // restore + m.rules[key] = old + return err + } + m.updateSplitKeys() + return nil +} + +func (m *RuleManager) updateSplitKeys() { + keys := m.splitKeys[:0] + m.splitKeys = m.splitKeys[:0] + for _, r := range m.rules { + if len(r.StartKey) > 0 { + keys = append(keys, r.StartKey) + } + if len(r.EndKey) > 0 { + keys = append(keys, r.EndKey) + } + } + sort.Slice(keys, func(i, j int) bool { return bytes.Compare(keys[i], keys[j]) < 0 }) + for _, k := range keys { + if len(m.splitKeys) == 0 || !bytes.Equal(m.splitKeys[len(m.splitKeys)-1], k) { + m.splitKeys = append(m.splitKeys, k) + } + } +} + +// GetSplitKeys returns all split keys in the range (start, end). +func (m *RuleManager) GetSplitKeys(start, end []byte) [][]byte { + m.RLock() + defer m.RUnlock() + var keys [][]byte + i := sort.Search(len(m.splitKeys), func(i int) bool { + return bytes.Compare(m.splitKeys[i], start) > 0 + }) + for ; i < len(m.splitKeys) && (len(end) == 0 || bytes.Compare(m.splitKeys[i], end) < 0); i++ { + keys = append(keys, m.splitKeys[i]) + } + return keys +} + +// GetAllRules returns sorted all rules. 
+func (m *RuleManager) GetAllRules() []*Rule { + m.RLock() + defer m.RUnlock() + var rules []*Rule + for _, r := range m.rules { + rules = append(rules, r) + } + sortRules(rules) + return rules +} + +// GetRulesByGroup returns sorted rules of a group. +func (m *RuleManager) GetRulesByGroup(group string) []*Rule { + m.RLock() + defer m.RUnlock() + var rules []*Rule + for _, r := range m.rules { + if r.GroupID == group { + rules = append(rules, r) + } + } + sortRules(rules) + return rules +} + +// GetRulesByKey returns sorted rules that affect a key. +func (m *RuleManager) GetRulesByKey(key []byte) []*Rule { + m.RLock() + defer m.RUnlock() + var rules []*Rule + for _, r := range m.rules { + // rule.start <= key < rule.end + if bytes.Compare(r.StartKey, key) <= 0 && + (len(r.EndKey) == 0 || bytes.Compare(key, r.EndKey) < 0) { + rules = append(rules, r) + } + } + sortRules(rules) + return rules +} + +// GetRulesForApplyRegion returns the rules list that should be applied to a region. +func (m *RuleManager) GetRulesForApplyRegion(region *core.RegionInfo) []*Rule { + m.RLock() + defer m.RUnlock() + var rules []*Rule + for _, r := range m.rules { + // no intersection + // |<-- rule -->| + // |<-- region -->| + // or + // |<-- rule -->| + // |<-- region -->| + if (len(region.GetEndKey()) > 0 && bytes.Compare(region.GetEndKey(), r.StartKey) <= 0) || + len(r.EndKey) > 0 && bytes.Compare(region.GetStartKey(), r.EndKey) >= 0 { + continue + } + // in range + // |<----- rule ----->| + // |<-- region -->| + if bytes.Compare(region.GetStartKey(), r.StartKey) >= 0 && (len(r.EndKey) == 0 || (len(region.GetEndKey()) > 0 && bytes.Compare(region.GetEndKey(), r.EndKey) <= 0)) { + rules = append(rules, r) + continue + } + // region spans multiple rule ranges. + // |<-- rule -->| + // |<-- region -->| + // or + // |<-- rule -->| + // |<-- region -->| + // or + // |<-- rule -->| + // |<--- region --->| + return nil // It will be considered abnormal
Then Rule checker will try to split the region. + } + return prepareRulesForApply(rules) +} + +// FitRegion fits a region to the rules it matches. +func (m *RuleManager) FitRegion(stores core.StoreSetInformer, region *core.RegionInfo) *RegionFit { + rules := m.GetRulesForApplyRegion(region) + return FitRegion(stores, region, rules) +} diff --git a/server/schedule/placement/rule_manager_test.go b/server/schedule/placement/rule_manager_test.go new file mode 100644 index 00000000000..2c3dfc627ea --- /dev/null +++ b/server/schedule/placement/rule_manager_test.go @@ -0,0 +1,166 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package placement + +import ( + "encoding/hex" + + . 
"github.com/pingcap/check" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/kv" +) + +var _ = Suite(&testManagerSuite{}) + +type testManagerSuite struct { + store *core.Storage + manager *RuleManager +} + +func (s *testManagerSuite) SetUpTest(c *C) { + s.store = core.NewStorage(kv.NewMemoryKV()) + var err error + s.manager, err = NewRuleManager(s.store, 3, []string{"zone", "rack", "host"}) + c.Assert(err, IsNil) +} + +func (s *testManagerSuite) TestDefault(c *C) { + rules := s.manager.GetAllRules() + c.Assert(rules, HasLen, 1) + c.Assert(rules[0].GroupID, Equals, "pd") + c.Assert(rules[0].ID, Equals, "default") + c.Assert(rules[0].Index, Equals, 0) + c.Assert(rules[0].StartKey, HasLen, 0) + c.Assert(rules[0].EndKey, HasLen, 0) + c.Assert(rules[0].Role, Equals, Voter) + c.Assert(rules[0].LocationLabels, DeepEquals, []string{"zone", "rack", "host"}) +} + +func (s *testManagerSuite) TestAdjustRule(c *C) { + rules := []Rule{ + {GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: "voter", Count: 3}, + {GroupID: "", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: "voter", Count: 3}, + {GroupID: "group", ID: "", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: "voter", Count: 3}, + {GroupID: "group", ID: "id", StartKeyHex: "123ab", EndKeyHex: "123abf", Role: "voter", Count: 3}, + {GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "1123abf", Role: "voter", Count: 3}, + {GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123aaa", Role: "voter", Count: 3}, + {GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: "master", Count: 3}, + {GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: "voter", Count: 0}, + {GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: "voter", Count: -1}, + {GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: 
"voter", Count: 3, LabelConstraints: []LabelConstraint{{Op: "foo"}}}, + } + c.Assert(s.manager.adjustRule(&rules[0]), IsNil) + c.Assert(rules[0].StartKey, DeepEquals, []byte{0x12, 0x3a, 0xbc}) + c.Assert(rules[0].EndKey, DeepEquals, []byte{0x12, 0x3a, 0xbf}) + for i := 1; i < len(rules); i++ { + c.Assert(s.manager.adjustRule(&rules[i]), NotNil) + } +} + +func (s *testManagerSuite) TestSaveLoad(c *C) { + rules := []*Rule{ + {GroupID: "pd", ID: "default", Role: "voter", Count: 5}, + {GroupID: "foo", ID: "bar", StartKeyHex: "", EndKeyHex: "abcd", Role: "learner", Count: 1}, + } + for _, r := range rules { + s.manager.SetRule(r) + } + m2, err := NewRuleManager(s.store, 3, []string{"no", "labels"}) + c.Assert(err, IsNil) + c.Assert(m2.GetAllRules(), HasLen, 2) + c.Assert(m2.GetRule("pd", "default"), DeepEquals, rules[0]) + c.Assert(m2.GetRule("foo", "bar"), DeepEquals, rules[1]) +} + +func (s *testManagerSuite) TestKeys(c *C) { + s.manager.DeleteRule("pd", "default") + rules := []*Rule{ + {GroupID: "1", ID: "1", Role: "voter", Count: 1, StartKeyHex: "", EndKeyHex: ""}, + {GroupID: "2", ID: "2", Role: "voter", Count: 1, StartKeyHex: "11", EndKeyHex: "ff"}, + {GroupID: "2", ID: "3", Role: "voter", Count: 1, StartKeyHex: "22", EndKeyHex: "dd"}, + {GroupID: "3", ID: "4", Role: "voter", Count: 1, StartKeyHex: "44", EndKeyHex: "ee"}, + {GroupID: "3", ID: "5", Role: "voter", Count: 1, StartKeyHex: "44", EndKeyHex: "dd"}, + } + for _, r := range rules { + s.manager.SetRule(r) + } + + splitKeys := [][]string{ + {"", "", "11", "22", "44", "dd", "ee", "ff"}, + {"44", "", "dd", "ee", "ff"}, + {"44", "dd"}, + {"22", "ef", "44", "dd", "ee"}, + } + for _, keys := range splitKeys { + splits := s.manager.GetSplitKeys(s.dhex(keys[0]), s.dhex(keys[1])) + c.Assert(splits, HasLen, len(keys)-2) + for i := range splits { + c.Assert(splits[i], DeepEquals, s.dhex(keys[i+2])) + } + } + + regionKeys := [][][2]string{ + {{"", ""}}, + {{"aa", "bb"}, {"", ""}, {"11", "ff"}, {"22", "dd"}, {"44", 
"ee"}, {"44", "dd"}}, + {{"11", "22"}, {"", ""}, {"11", "ff"}}, + {{"11", "33"}}, + } + for _, keys := range regionKeys { + region := core.NewRegionInfo(&metapb.Region{StartKey: s.dhex(keys[0][0]), EndKey: s.dhex(keys[0][1])}, nil) + rules := s.manager.GetRulesForApplyRegion(region) + c.Assert(rules, HasLen, len(keys)-1) + for i := range rules { + c.Assert(rules[i].StartKeyHex, Equals, keys[i+1][0]) + c.Assert(rules[i].EndKeyHex, Equals, keys[i+1][1]) + } + } + + ruleByKeys := [][]string{ // first is query key, rests are rule keys. + {"", "", ""}, + {"11", "", "", "11", "ff"}, + {"33", "", "", "11", "ff", "22", "dd"}, + } + for _, keys := range ruleByKeys { + rules := s.manager.GetRulesByKey(s.dhex(keys[0])) + c.Assert(rules, HasLen, (len(keys)-1)/2) + for i := range rules { + c.Assert(rules[i].StartKeyHex, Equals, keys[i*2+1]) + c.Assert(rules[i].EndKeyHex, Equals, keys[i*2+2]) + } + } + + rulesByGroup := [][]string{ // first is group, rests are rule keys. + {"1", "", ""}, + {"2", "11", "ff", "22", "dd"}, + {"3", "44", "ee", "44", "dd"}, + {"4"}, + } + for _, keys := range rulesByGroup { + rules := s.manager.GetRulesByGroup(keys[0]) + c.Assert(rules, HasLen, (len(keys)-1)/2) + for i := range rules { + c.Assert(rules[i].StartKeyHex, Equals, keys[i*2+1]) + c.Assert(rules[i].EndKeyHex, Equals, keys[i*2+2]) + } + } +} + +func (s *testManagerSuite) dhex(hk string) []byte { + k, err := hex.DecodeString(hk) + if err != nil { + panic("decode fail") + } + return k +}