Skip to content

Commit

Permalink
copr: Fix bug that mpp node availability detect does not work in some…
Browse files Browse the repository at this point in the history
… corner cases (pingcap#28201)
  • Loading branch information
windtalker committed Oct 12, 2021
1 parent 797bddd commit 66aa22a
Showing 1 changed file with 52 additions and 52 deletions.
104 changes: 52 additions & 52 deletions store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ func (rs *batchCopResponse) RespTime() time.Duration {
// if there is only 1 available store, then put the region to the related store
// otherwise, use a greedy algorithm to put it into the store with highest weight
func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []*batchCopTask, isMPP bool) []*batchCopTask {
if len(originalTasks) <= 1 {
// for mpp, we still need to detect the store availability
if len(originalTasks) <= 1 && !isMPP {
return originalTasks
}
cache := kvStore.GetRegionCache()
Expand Down Expand Up @@ -212,16 +213,28 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []
}
}
}
if totalRemainingRegionNum == 0 {
return originalTasks
}

avgStorePerRegion := float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
findNextStore := func(candidateStores []uint64) uint64 {
store := uint64(math.MaxUint64)
weightedRegionNum := math.MaxFloat64
if candidateStores != nil {
for _, storeID := range candidateStores {
if totalRemainingRegionNum > 0 {
avgStorePerRegion := float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
findNextStore := func(candidateStores []uint64) uint64 {
store := uint64(math.MaxUint64)
weightedRegionNum := math.MaxFloat64
if candidateStores != nil {
for _, storeID := range candidateStores {
if _, validStore := storeCandidateRegionMap[storeID]; !validStore {
continue
}
num := float64(len(storeCandidateRegionMap[storeID]))/avgStorePerRegion + float64(len(storeTaskMap[storeID].regionInfos))
if num < weightedRegionNum {
store = storeID
weightedRegionNum = num
}
}
if store != uint64(math.MaxUint64) {
return store
}
}
for storeID := range storeTaskMap {
if _, validStore := storeCandidateRegionMap[storeID]; !validStore {
continue
}
Expand All @@ -231,57 +244,44 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []
weightedRegionNum = num
}
}
if store != uint64(math.MaxUint64) {
return store
}
return store
}
for storeID := range storeTaskMap {
if _, validStore := storeCandidateRegionMap[storeID]; !validStore {
continue

store := findNextStore(nil)
for totalRemainingRegionNum > 0 {
if store == uint64(math.MaxUint64) {
break
}
num := float64(len(storeCandidateRegionMap[storeID]))/avgStorePerRegion + float64(len(storeTaskMap[storeID].regionInfos))
if num < weightedRegionNum {
store = storeID
weightedRegionNum = num
var key string
var ri RegionInfo
for key, ri = range storeCandidateRegionMap[store] {
// get the first region
break
}
}
return store
}

store := findNextStore(nil)
for totalRemainingRegionNum > 0 {
if store == uint64(math.MaxUint64) {
break
}
var key string
var ri RegionInfo
for key, ri = range storeCandidateRegionMap[store] {
// get the first region
break
}
storeTaskMap[store].regionInfos = append(storeTaskMap[store].regionInfos, ri)
totalRemainingRegionNum--
for _, id := range ri.AllStores {
if _, ok := storeCandidateRegionMap[id]; ok {
delete(storeCandidateRegionMap[id], key)
totalRegionCandidateNum--
if len(storeCandidateRegionMap[id]) == 0 {
delete(storeCandidateRegionMap, id)
storeTaskMap[store].regionInfos = append(storeTaskMap[store].regionInfos, ri)
totalRemainingRegionNum--
for _, id := range ri.AllStores {
if _, ok := storeCandidateRegionMap[id]; ok {
delete(storeCandidateRegionMap[id], key)
totalRegionCandidateNum--
if len(storeCandidateRegionMap[id]) == 0 {
delete(storeCandidateRegionMap, id)
}
}
}
if totalRemainingRegionNum > 0 {
avgStorePerRegion = float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
// it is not optimal because we only check the stores that affected by this region, in fact in order
// to find out the store with the lowest weightedRegionNum, all stores should be checked, but I think
// check only the affected stores is more simple and will get a good enough result
store = findNextStore(ri.AllStores)
}
}
if totalRemainingRegionNum > 0 {
avgStorePerRegion = float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
// it is not optimal because we only check the stores that affected by this region, in fact in order
// to find out the store with the lowest weightedRegionNum, all stores should be checked, but I think
// check only the affected stores is more simple and will get a good enough result
store = findNextStore(ri.AllStores)
logutil.BgLogger().Warn("Some regions are not used when trying to balance batch cop task, give up balancing")
return originalTasks
}
}
if totalRemainingRegionNum > 0 {
logutil.BgLogger().Warn("Some regions are not used when trying to balance batch cop task, give up balancing")
return originalTasks
}

var ret []*batchCopTask
for _, task := range storeTaskMap {
Expand Down

0 comments on commit 66aa22a

Please sign in to comment.