Skip to content

Commit

Permalink
store/tikv: Randomize selection of proxy store on network partition (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
MyonKeminta authored Mar 25, 2021
1 parent 919e50d commit f9adb7f
Showing 1 changed file with 46 additions and 15 deletions.
61 changes: 46 additions & 15 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bytes"
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -478,10 +479,7 @@ func (c *RegionCache) GetTiKVRPCContext(bo *Backoffer, id RegionVerID, replicaRe
if atomic.LoadInt32(&store.needForwarding) == 0 {
regionStore.unsetProxyStoreIfNeeded(cachedRegion)
} else {
proxyStore, proxyAccessIdx, proxyStoreIdx, err = c.getProxyStore(cachedRegion, store, regionStore, accessIdx)
if err != nil {
return nil, err
}
proxyStore, proxyAccessIdx, proxyStoreIdx = c.getProxyStore(cachedRegion, store, regionStore, accessIdx)
if proxyStore != nil {
proxyAddr, err = c.getStoreAddr(bo, cachedRegion, proxyStore, proxyStoreIdx)
if err != nil {
Expand Down Expand Up @@ -1231,28 +1229,46 @@ func (c *RegionCache) getStoreAddr(bo *Backoffer, region *Region, store *Store,
}
}

// getProxyStore chooses the TiKV store that should act as a proxy for requests aimed at `store`
// (the store at access index workStoreIdx) and returns that store together with its access index
// and the store index reported by rs.accessStore.
//
// All results are zero values (proxyStore == nil) when no proxy is picked: forwarding is disabled on
// the cache, `store` is not a TiKV store, `store` is not flagged with needForwarding, there is at most
// one TiKV store, or every other store is itself flagged with needForwarding.
//
// NOTE(review): this listing is a rendered diff without +/- markers, so lines that look duplicated
// (the two signatures, the two loop headers, the paired return statements) appear to be the pre- and
// post-change variants of the same statement — confirm against the real file before editing.
func (c *RegionCache) getProxyStore(region *Region, store *Store, rs *RegionStore, workStoreIdx AccessIndex) (proxyStore *Store, proxyAccessIdx AccessIndex, proxyStoreIdx int, err error) {
func (c *RegionCache) getProxyStore(region *Region, store *Store, rs *RegionStore, workStoreIdx AccessIndex) (proxyStore *Store, proxyAccessIdx AccessIndex, proxyStoreIdx int) {
// Nothing to do unless forwarding is enabled and the target TiKV store is flagged as needing it.
// The naked return yields the zero values of the named results.
if !c.enableForwarding || store.storeType != TiKV || atomic.LoadInt32(&store.needForwarding) == 0 {
return
}

// A proxy index is already recorded on the region store: reuse it instead of picking a new one.
// Note that `proxyStore` declared here shadows the named result of the same name.
if rs.proxyTiKVIdx >= 0 {
storeIdx, proxyStore := rs.accessStore(TiKVOnly, rs.proxyTiKVIdx)
return proxyStore, rs.proxyTiKVIdx, storeIdx, err
return proxyStore, rs.proxyTiKVIdx, storeIdx
}

tikvNum := rs.accessStoreNum(TiKVOnly)
for index := 0; index < tikvNum; index++ {
// With at most one TiKV store there is no other store to forward through; this guard also keeps
// the argument of rand.Intn below strictly positive.
if tikvNum <= 1 {
return
}

// Randomly pick a starting peer other than the work store: draw from [0, tikvNum-2] and shift
// every value at or above workStoreIdx up by one, so the result never equals workStoreIdx.
// After the shift the value is at most tikvNum-1, so the modulo does not change it.
first := rand.Intn(tikvNum - 1)
if first >= int(workStoreIdx) {
first = (first + 1) % tikvNum
}

// If the current selected peer is not reachable, switch to the next one, until a reachable peer is found or all
// peers are checked.
for i := 0; i < tikvNum; i++ {
// Walk the access indexes circularly, starting from the random position.
index := (i + first) % tikvNum
// Skip work store which is the actual store to be accessed
if index == int(workStoreIdx) {
continue
}
// `store` declared here shadows the `store` parameter for the rest of the loop body.
storeIdx, store := rs.accessStore(TiKVOnly, AccessIndex(index))
// Skip unreachable stores.
if atomic.LoadInt32(&store.needForwarding) != 0 {
continue
}

// Record the choice via setProxyStoreIdx before returning; the rs.proxyTiKVIdx >= 0 branch
// above presumably picks it up on later calls — verify against setProxyStoreIdx.
rs.setProxyStoreIdx(region, AccessIndex(index))
return store, AccessIndex(index), storeIdx, nil
return store, AccessIndex(index), storeIdx
}

// Every candidate was skipped: report that no proxy store is available.
return nil, 0, 0, errors.New("the region leader is disconnected and no store is available ")
return nil, 0, 0
}

func (c *RegionCache) changeToActiveStore(region *Region, store *Store, storeIdx int) (addr string) {
Expand Down Expand Up @@ -1535,17 +1551,32 @@ func (r *RegionStore) switchNextTiKVPeer(rr *Region, currentPeerIdx AccessIndex)
}

// switchNextProxyStore switches the index of the peer that will forward requests to the leader to the next peer.
// If proxy is currently not used on this region, the value of `currentProxyIdx` should be -1, and it will be moved to
// the first peer that can be the proxy.
// If proxy is currently not used on this region, the value of `currentProxyIdx` should be -1, and a random peer will
// be selected in this case.
func (r *RegionStore) switchNextProxyStore(rr *Region, currentProxyIdx AccessIndex, incEpochStoreIdx int) {
if r.proxyTiKVIdx != currentProxyIdx {
return
}
nextIdx := (currentProxyIdx + 1) % AccessIndex(r.accessStoreNum(TiKVOnly))
// skips the current workTiKVIdx
if nextIdx == r.workTiKVIdx {
nextIdx = (nextIdx + 1) % AccessIndex(r.accessStoreNum(TiKVOnly))

tikvNum := r.accessStoreNum(TiKVOnly)
var nextIdx AccessIndex

// If the region is not using proxy before, randomly select a non-leader peer for the first try.
if currentProxyIdx == -1 {
// Randomly select an non-leader peer
// TODO: Skip unreachable peers here.
nextIdx = AccessIndex(rand.Intn(tikvNum - 1))
if nextIdx >= r.workTiKVIdx {
nextIdx++
}
} else {
nextIdx = (currentProxyIdx + 1) % AccessIndex(tikvNum)
// skips the current workTiKVIdx
if nextIdx == r.workTiKVIdx {
nextIdx = (nextIdx + 1) % AccessIndex(tikvNum)
}
}

newRegionStore := r.clone()
newRegionStore.proxyTiKVIdx = nextIdx
if incEpochStoreIdx >= 0 {
Expand Down

0 comments on commit f9adb7f

Please sign in to comment.