diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index 57c1ee57f2d77..4429e35cd102a 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -17,6 +17,7 @@ import ( "bytes" "context" "fmt" + "math/rand" "sync" "sync/atomic" "time" @@ -478,10 +479,7 @@ func (c *RegionCache) GetTiKVRPCContext(bo *Backoffer, id RegionVerID, replicaRe if atomic.LoadInt32(&store.needForwarding) == 0 { regionStore.unsetProxyStoreIfNeeded(cachedRegion) } else { - proxyStore, proxyAccessIdx, proxyStoreIdx, err = c.getProxyStore(cachedRegion, store, regionStore, accessIdx) - if err != nil { - return nil, err - } + proxyStore, proxyAccessIdx, proxyStoreIdx = c.getProxyStore(cachedRegion, store, regionStore, accessIdx) if proxyStore != nil { proxyAddr, err = c.getStoreAddr(bo, cachedRegion, proxyStore, proxyStoreIdx) if err != nil { @@ -1231,28 +1229,46 @@ func (c *RegionCache) getStoreAddr(bo *Backoffer, region *Region, store *Store, } } -func (c *RegionCache) getProxyStore(region *Region, store *Store, rs *RegionStore, workStoreIdx AccessIndex) (proxyStore *Store, proxyAccessIdx AccessIndex, proxyStoreIdx int, err error) { +func (c *RegionCache) getProxyStore(region *Region, store *Store, rs *RegionStore, workStoreIdx AccessIndex) (proxyStore *Store, proxyAccessIdx AccessIndex, proxyStoreIdx int) { if !c.enableForwarding || store.storeType != TiKV || atomic.LoadInt32(&store.needForwarding) == 0 { return } if rs.proxyTiKVIdx >= 0 { storeIdx, proxyStore := rs.accessStore(TiKVOnly, rs.proxyTiKVIdx) - return proxyStore, rs.proxyTiKVIdx, storeIdx, err + return proxyStore, rs.proxyTiKVIdx, storeIdx } tikvNum := rs.accessStoreNum(TiKVOnly) - for index := 0; index < tikvNum; index++ { + if tikvNum <= 1 { + return + } + + // Randomly select a non-leader peer + first := rand.Intn(tikvNum - 1) + if first >= int(workStoreIdx) { + first = (first + 1) % tikvNum + } + + // If the currently selected peer is not reachable, switch to the next one, until a reachable peer 
is found or all + // peers are checked. + for i := 0; i < tikvNum; i++ { + index := (i + first) % tikvNum // Skip work store which is the actual store to be accessed if index == int(workStoreIdx) { continue } storeIdx, store := rs.accessStore(TiKVOnly, AccessIndex(index)) + // Skip unreachable stores. + if atomic.LoadInt32(&store.needForwarding) != 0 { + continue + } + rs.setProxyStoreIdx(region, AccessIndex(index)) - return store, AccessIndex(index), storeIdx, nil + return store, AccessIndex(index), storeIdx } - return nil, 0, 0, errors.New("the region leader is disconnected and no store is available ") + return nil, 0, 0 } func (c *RegionCache) changeToActiveStore(region *Region, store *Store, storeIdx int) (addr string) { @@ -1535,17 +1551,32 @@ func (r *RegionStore) switchNextTiKVPeer(rr *Region, currentPeerIdx AccessIndex) } // switchNextProxyStore switches the index of the peer that will forward requests to the leader to the next peer. -// If proxy is currently not used on this region, the value of `currentProxyIdx` should be -1, and it will be moved to -// the first peer that can be the proxy. +// If proxy is currently not used on this region, the value of `currentProxyIdx` should be -1, and a random peer will +// be selected in this case. func (r *RegionStore) switchNextProxyStore(rr *Region, currentProxyIdx AccessIndex, incEpochStoreIdx int) { if r.proxyTiKVIdx != currentProxyIdx { return } - nextIdx := (currentProxyIdx + 1) % AccessIndex(r.accessStoreNum(TiKVOnly)) - // skips the current workTiKVIdx - if nextIdx == r.workTiKVIdx { - nextIdx = (nextIdx + 1) % AccessIndex(r.accessStoreNum(TiKVOnly)) + + tikvNum := r.accessStoreNum(TiKVOnly) + var nextIdx AccessIndex + + // If the region was not using a proxy before, randomly select a non-leader peer for the first try. + if currentProxyIdx == -1 { + // Randomly select a non-leader peer + // TODO: Skip unreachable peers here. NOTE(review): rand.Intn panics when its argument is <= 0, i.e. when tikvNum <= 1; getProxyStore returns early in that case but no such guard is visible in this hunk - confirm callers guarantee at least two TiKV peers. 
+ nextIdx = AccessIndex(rand.Intn(tikvNum - 1)) + if nextIdx >= r.workTiKVIdx { + nextIdx++ + } + } else { + nextIdx = (currentProxyIdx + 1) % AccessIndex(tikvNum) + // skips the current workTiKVIdx + if nextIdx == r.workTiKVIdx { + nextIdx = (nextIdx + 1) % AccessIndex(tikvNum) + } } + newRegionStore := r.clone() newRegionStore.proxyTiKVIdx = nextIdx if incEpochStoreIdx >= 0 {