diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 3d4eef97ef3..8b14345fcf2 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -51,11 +51,6 @@ const ( defaultChangedRegionsLimit = 10000 ) -// ErrRegionIsStale is error info for region is stale. -var ErrRegionIsStale = func(region *metapb.Region, origin *metapb.Region) error { - return errors.Errorf("region is stale: region %v origin %v", region, origin) -} - // Server is the interface for cluster. type Server interface { GetAllocator() *id.AllocatorImpl @@ -242,7 +237,17 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) { start = time.Now() - if err := c.storage.LoadRegions(c.core.PutRegion); err != nil { + // used to load region from kv storage to cache storage. + putRegion := func(region *core.RegionInfo) []*core.RegionInfo { + origin, err := c.core.PreCheckPutRegion(region) + if err != nil { + log.Warn("region is stale", zap.Error(err), zap.Stringer("origin", origin.GetMeta())) + // return the state region to delete. + return []*core.RegionInfo{region} + } + return c.core.PutRegion(region) + } + if err := c.storage.LoadRegions(putRegion); err != nil { return nil, err } log.Info("load regions", @@ -394,14 +399,10 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error { // processRegionHeartbeat updates the region information. func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { c.RLock() - origin := c.GetRegion(region.GetID()) - if origin == nil { - for _, item := range c.core.GetOverlaps(region) { - if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() { - c.RUnlock() - return ErrRegionIsStale(region.GetMeta(), item.GetMeta()) - } - } + origin, err := c.core.PreCheckPutRegion(region) + if err != nil { + c.RUnlock() + return err } writeItems := c.CheckWriteStatus(region) readItems := c.CheckReadStatus(region) @@ -420,10 +421,6 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { } else { r := region.GetRegionEpoch() o := origin.GetRegionEpoch() - // Region meta is stale, return an error. - if r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() { - return ErrRegionIsStale(region.GetMeta(), origin.GetMeta()) - } if r.GetVersion() > o.GetVersion() { log.Info("region Version changed", zap.Uint64("region-id", region.GetID()), @@ -1454,6 +1451,8 @@ func (c *RaftCluster) CheckReadStatus(region *core.RegionInfo) []*statistics.Hot return c.hotSpotCache.CheckRead(region, c.storesStats) } +// TODO: remove me. +// only used in test. func (c *RaftCluster) putRegion(region *core.RegionInfo) error { c.Lock() defer c.Unlock() diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index b000937a6bd..ef33c14165b 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -758,7 +758,7 @@ func checkStaleRegion(origin *metapb.Region, region *metapb.Region) error { e := region.GetRegionEpoch() if e.GetVersion() < o.GetVersion() || e.GetConfVer() < o.GetConfVer() { - return ErrRegionIsStale(region, origin) + return core.ErrRegionIsStale(region, origin) } return nil diff --git a/server/core/basic_cluster.go b/server/core/basic_cluster.go index 3730ff5b85c..b53c403127c 100644 --- a/server/core/basic_cluster.go +++ b/server/core/basic_cluster.go @@ -282,6 +282,29 @@ func (bc *BasicCluster) TakeStore(storeID uint64) *StoreInfo { return bc.Stores.TakeStore(storeID) } +// PreCheckPutRegion checks if the region is valid to put. +func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, error) { + bc.RLock() + for _, item := range bc.Regions.GetOverlaps(region) { + if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() { + bc.RUnlock() + return nil, ErrRegionIsStale(region.GetMeta(), item.GetMeta()) + } + } + origin := bc.Regions.GetRegion(region.GetID()) + bc.RUnlock() + if origin == nil { + return nil, nil + } + r := region.GetRegionEpoch() + o := origin.GetRegionEpoch() + // Region meta is stale, return an error. + if r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() { + return origin, ErrRegionIsStale(region.GetMeta(), origin.GetMeta()) + } + return origin, nil +} + // PutRegion put a region. func (bc *BasicCluster) PutRegion(region *RegionInfo) []*RegionInfo { bc.Lock() diff --git a/server/core/errors.go b/server/core/errors.go index a58b9adcc53..5d0e0ea48b6 100644 --- a/server/core/errors.go +++ b/server/core/errors.go @@ -21,6 +21,8 @@ import ( "net/http" "github.com/pingcap/errcode" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pkg/errors" ) var ( @@ -61,3 +63,8 @@ func (e StoreBlockedErr) Error() string { // Code returns StoreBlockedCode func (e StoreBlockedErr) Code() errcode.Code { return StoreBlockedCode } + +// ErrRegionIsStale is error info for region is stale. +var ErrRegionIsStale = func(region *metapb.Region, origin *metapb.Region) error { + return errors.Errorf("region is stale: region %v origin %v", region, origin) +} diff --git a/server/core/region.go b/server/core/region.go index ad83611d43f..f30bf0a3b79 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -304,6 +304,9 @@ func (r *RegionInfo) GetID() uint64 { // GetMeta returns the meta information of the region. func (r *RegionInfo) GetMeta() *metapb.Region { + if r == nil { + return nil + } return r.meta } diff --git a/tests/server/region_syncer/region_syncer_test.go b/tests/server/region_syncer/region_syncer_test.go index 45f3044fc35..9588deb2ed5 100644 --- a/tests/server/region_syncer/region_syncer_test.go +++ b/tests/server/region_syncer/region_syncer_test.go @@ -94,6 +94,29 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) { err = rc.HandleRegionHeartbeat(region) c.Assert(err, IsNil) } + // merge case + // region2 -> region1 -> region0 + // merge A to B will increases version to max(versionA, versionB)+1, but does not increase conver + regions[0] = regions[0].Clone(core.WithEndKey(regions[2].GetEndKey()), core.WithIncVersion(), core.WithIncVersion()) + err = rc.HandleRegionHeartbeat(regions[2]) + c.Assert(err, IsNil) + + // merge case + // region3 -> region4 + // merge A to B will increases version to max(versionA, versionB)+1, but does not increase conver + regions[4] = regions[3].Clone(core.WithEndKey(regions[4].GetEndKey()), core.WithIncVersion()) + err = rc.HandleRegionHeartbeat(regions[4]) + c.Assert(err, IsNil) + + // merge case + // region0 -> region4 + // merge A to B will increases version to max(versionA, versionB)+1, but does not increase conver + regions[4] = regions[0].Clone(core.WithEndKey(regions[4].GetEndKey()), core.WithIncVersion(), core.WithIncVersion()) + err = rc.HandleRegionHeartbeat(regions[4]) + c.Assert(err, IsNil) + regions = regions[4:] + regionLen = len(regions) + // ensure flush to region storage, we use a duration larger than the // region storage flush rate limit (3s). time.Sleep(4 * time.Second) @@ -104,6 +127,9 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) { c.Assert(leaderServer, NotNil) loadRegions := leaderServer.GetServer().GetRaftCluster().GetRegions() c.Assert(len(loadRegions), Equals, regionLen) + for _, region := range regions { + c.Assert(leaderServer.GetRegionInfoByID(region.GetID()).GetMeta(), DeepEquals, region.GetMeta()) + } } func (s *serverTestSuite) TestFullSyncWithAddMember(c *C) {