Skip to content

Commit

Permalink
misc: fix the deploy path not being updated (#2596) (#2600)
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
ti-srebot authored Jul 2, 2020
1 parent a62699e commit db0aa7e
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 12 deletions.
1 change: 1 addition & 0 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,7 @@ func (c *RaftCluster) PutStore(store *metapb.Store, force bool) error {
core.SetStoreVersion(store.GitHash, store.Version),
core.SetStoreLabels(labels),
core.SetStoreStartTime(store.StartTimestamp),
core.SetStoreDeployPath(store.DeployPath),
)
}
if err = c.checkStoreLabels(s); err != nil {
Expand Down
9 changes: 9 additions & 0 deletions server/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ func SetStoreVersion(githash, version string) StoreCreateOption {
}
}

// SetStoreDeployPath sets the deploy path for the store.
func SetStoreDeployPath(deployPath string) StoreCreateOption {
return func(store *StoreInfo) {
meta := proto.Clone(store.meta).(*metapb.Store)
meta.DeployPath = deployPath
store.meta = meta
}
}

// SetStoreState sets the state for the store.
func SetStoreState(state metapb.StoreState) StoreCreateOption {
return func(store *StoreInfo) {
Expand Down
31 changes: 19 additions & 12 deletions tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,33 +183,40 @@ func testPutStore(c *C, clusterID uint64, rc *cluster.RaftCluster, grpcPDClient
id, err := rc.AllocID()
c.Assert(err, IsNil)
// Put new store with a duplicated address when old store is up will fail.
_, err = putStore(c, grpcPDClient, clusterID, newMetaStore(id, store.GetAddress(), "2.1.0", metapb.StoreState_Up))
_, err = putStore(c, grpcPDClient, clusterID, newMetaStore(id, store.GetAddress(), "2.1.0", metapb.StoreState_Up, fmt.Sprintf("test/store%d", id)))
c.Assert(err, NotNil)

id, err = rc.AllocID()
c.Assert(err, IsNil)
// Put new store with a duplicated address when old store is offline will fail.
resetStoreState(c, rc, store.GetId(), metapb.StoreState_Offline)
_, err = putStore(c, grpcPDClient, clusterID, newMetaStore(id, store.GetAddress(), "2.1.0", metapb.StoreState_Up))
_, err = putStore(c, grpcPDClient, clusterID, newMetaStore(id, store.GetAddress(), "2.1.0", metapb.StoreState_Up, fmt.Sprintf("test/store%d", id)))
c.Assert(err, NotNil)

id, err = rc.AllocID()
c.Assert(err, IsNil)
// Put new store with a duplicated address when old store is tombstone is OK.
resetStoreState(c, rc, store.GetId(), metapb.StoreState_Tombstone)
rc.GetStore(store.GetId())
_, err = putStore(c, grpcPDClient, clusterID, newMetaStore(id, store.GetAddress(), "2.1.0", metapb.StoreState_Up))
_, err = putStore(c, grpcPDClient, clusterID, newMetaStore(id, store.GetAddress(), "2.1.0", metapb.StoreState_Up, fmt.Sprintf("test/store%d", id)))
c.Assert(err, IsNil)

id, err = rc.AllocID()
c.Assert(err, IsNil)
// Put a new store.
_, err = putStore(c, grpcPDClient, clusterID, newMetaStore(id, "127.0.0.1:12345", "2.1.0", metapb.StoreState_Up))
_, err = putStore(c, grpcPDClient, clusterID, newMetaStore(id, "127.0.0.1:12345", "2.1.0", metapb.StoreState_Up, fmt.Sprintf("test/store%d", id)))
c.Assert(err, IsNil)
s := rc.GetStore(id).GetMeta()
c.Assert(s.DeployPath, Equals, fmt.Sprintf("test/store%d", id))

_, err = putStore(c, grpcPDClient, clusterID, newMetaStore(id, "127.0.0.1:12345", "2.1.0", metapb.StoreState_Up, fmt.Sprintf("move/test/store%d", id)))
c.Assert(err, IsNil)
s = rc.GetStore(id).GetMeta()
c.Assert(s.DeployPath, Equals, fmt.Sprintf("move/test/store%d", id))

// Put an existed store with duplicated address with other old stores.
resetStoreState(c, rc, store.GetId(), metapb.StoreState_Up)
_, err = putStore(c, grpcPDClient, clusterID, newMetaStore(store.GetId(), "127.0.0.1:12345", "2.1.0", metapb.StoreState_Up))
_, err = putStore(c, grpcPDClient, clusterID, newMetaStore(store.GetId(), "127.0.0.1:12345", "2.1.0", metapb.StoreState_Up, fmt.Sprintf("test/store%d", store.GetId())))
c.Assert(err, NotNil)
}

Expand Down Expand Up @@ -357,7 +364,7 @@ func (s *clusterTestSuite) TestRaftClusterMultipleRestart(c *C) {
// add an offline store
storeID, err := leaderServer.GetAllocator().Alloc()
c.Assert(err, IsNil)
store := newMetaStore(storeID, "127.0.0.1:4", "2.1.0", metapb.StoreState_Offline)
store := newMetaStore(storeID, "127.0.0.1:4", "2.1.0", metapb.StoreState_Offline, fmt.Sprintf("test/store%d", storeID))
rc := leaderServer.GetRaftCluster()
c.Assert(rc, NotNil)
err = rc.PutStore(store, false)
Expand All @@ -376,8 +383,8 @@ func (s *clusterTestSuite) TestRaftClusterMultipleRestart(c *C) {
}
}

func newMetaStore(storeID uint64, addr, version string, state metapb.StoreState) *metapb.Store {
return &metapb.Store{Id: storeID, Address: addr, Version: version, State: state}
func newMetaStore(storeID uint64, addr, version string, state metapb.StoreState, deployPath string) *metapb.Store {
return &metapb.Store{Id: storeID, Address: addr, Version: version, State: state, DeployPath: deployPath}
}

func (s *clusterTestSuite) TestGetPDMembers(c *C) {
Expand Down Expand Up @@ -419,7 +426,7 @@ func (s *clusterTestSuite) TestStoreVersionChange(c *C) {
svr.SetClusterVersion("2.0.0")
storeID, err := leaderServer.GetAllocator().Alloc()
c.Assert(err, IsNil)
store := newMetaStore(storeID, "127.0.0.1:4", "2.1.0", metapb.StoreState_Up)
store := newMetaStore(storeID, "127.0.0.1:4", "2.1.0", metapb.StoreState_Up, fmt.Sprintf("test/store%d", storeID))
var wg sync.WaitGroup
c.Assert(failpoint.Enable("github.com/pingcap/pd/v4/server/versionChangeConcurrency", `return(true)`), IsNil)
wg.Add(1)
Expand Down Expand Up @@ -459,7 +466,7 @@ func (s *clusterTestSuite) TestConcurrentHandleRegion(c *C) {
for _, addr := range storeAddrs {
storeID, err := id.Alloc()
c.Assert(err, IsNil)
store := newMetaStore(storeID, addr, "2.1.0", metapb.StoreState_Up)
store := newMetaStore(storeID, addr, "2.1.0", metapb.StoreState_Up, fmt.Sprintf("test/store%d", storeID))
stores = append(stores, store)
_, err = putStore(c, grpcPDClient, clusterID, store)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -892,7 +899,7 @@ func (s *clusterTestSuite) TestOfflineStoreLimit(c *C) {
for _, addr := range storeAddrs {
storeID, err := id.Alloc()
c.Assert(err, IsNil)
store := newMetaStore(storeID, addr, "4.0.0", metapb.StoreState_Up)
store := newMetaStore(storeID, addr, "4.0.0", metapb.StoreState_Up, fmt.Sprintf("test/store%d", storeID))
_, err = putStore(c, grpcPDClient, clusterID, store)
c.Assert(err, IsNil)
}
Expand Down Expand Up @@ -975,7 +982,7 @@ func (s *clusterTestSuite) TestUpgradeStoreLimit(c *C) {
rc := leaderServer.GetRaftCluster()
c.Assert(rc, NotNil)
rc.SetStorage(core.NewStorage(kv.NewMemoryKV()))
store := newMetaStore(1, "127.0.1.1:0", "4.0.0", metapb.StoreState_Up)
store := newMetaStore(1, "127.0.1.1:0", "4.0.0", metapb.StoreState_Up, "test/store1")
_, err = putStore(c, grpcPDClient, clusterID, store)
c.Assert(err, IsNil)
r := &metapb.Region{
Expand Down

0 comments on commit db0aa7e

Please sign in to comment.