Skip to content

Commit

Permalink
*: add a API to clear tombstone store (#1472) (#1705)
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx authored and sre-bot committed Aug 29, 2019
1 parent 1cfe67b commit 539f50e
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 2 deletions.
10 changes: 10 additions & 0 deletions server/api/api.raml
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,16 @@ types:
500:
description: PD server failed to proceed the request.

/remove-tombstone:
description: Remove all tombstone stores.
delete:
description: Remove all tombstone stores.
responses:
200:
description: All tombstone stores are removed.
500:
description: PD server failed to proceed the request.

/store/{storeId}:
description: A specific store.
uriParameters:
Expand Down
1 change: 1 addition & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
router.HandleFunc("/api/v1/store/{id}/label", storeHandler.SetLabels).Methods("POST")
router.HandleFunc("/api/v1/store/{id}/weight", storeHandler.SetWeight).Methods("POST")
router.Handle("/api/v1/stores", newStoresHandler(svr, rd)).Methods("GET")
router.HandleFunc("/api/v1/stores/remove-tombstone", newStoresHandler(svr, rd).RemoveTombStone).Methods("DELETE")

labelsHandler := newLabelsHandler(svr, rd)
router.HandleFunc("/api/v1/labels", labelsHandler.Get).Methods("GET")
Expand Down
16 changes: 16 additions & 0 deletions server/api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,22 @@ func newStoresHandler(svr *server.Server, rd *render.Render) *storesHandler {
}
}

func (h *storesHandler) RemoveTombStone(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
if cluster == nil {
errorResp(h.rd, w, errcode.NewInternalErr(server.ErrNotBootstrapped))
return
}

err := cluster.RemoveTombStoneRecords()
if err != nil {
errorResp(h.rd, w, err)
return
}

h.rd.JSON(w, http.StatusOK, nil)
}

func (h *storesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
if cluster == nil {
Expand Down
24 changes: 24 additions & 0 deletions server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,30 @@ func (c *RaftCluster) checkStores() {
}
}

// RemoveTombStoneRecords removes the tombStone Records.
func (c *RaftCluster) RemoveTombStoneRecords() error {
c.RLock()
defer c.RUnlock()

cluster := c.cachedCluster

for _, store := range cluster.GetStores() {
if store.IsTombstone() {
// the store has already been tombstone
err := cluster.deleteStore(store)
if err != nil {
log.Error("delete store failed",
zap.Stringer("store", store.Store),
zap.Error(err))
return err
}
log.Info("delete store successed",
zap.Stringer("store", store.Store))
}
}
return nil
}

func (c *RaftCluster) checkOperators() {
co := c.coordinator
for _, op := range co.getOperators() {
Expand Down
16 changes: 16 additions & 0 deletions server/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,22 @@ func (c *clusterInfo) putStoreLocked(store *core.StoreInfo) error {
return c.core.PutStore(store)
}

func (c *clusterInfo) deleteStore(store *core.StoreInfo) error {
c.Lock()
defer c.Unlock()
return c.deleteStoreLocked(store)
}

func (c *clusterInfo) deleteStoreLocked(store *core.StoreInfo) error {
if c.kv != nil {
if err := c.kv.DeleteStore(store.Store); err != nil {
return err
}
}
c.core.DeleteStore(store)
return nil
}

// BlockStore stops balancer from selecting the store.
func (c *clusterInfo) BlockStore(storeID uint64) error {
c.Lock()
Expand Down
5 changes: 5 additions & 0 deletions server/core/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ func (kv *KV) SaveStore(store *metapb.Store) error {
return kv.saveProto(kv.storePath(store.GetId()), store)
}

// DeleteStore deletes one store from KV.
func (kv *KV) DeleteStore(store *metapb.Store) error {
return kv.Delete(kv.storePath(store.GetId()))
}

// LoadRegion loads one regoin from KV.
func (kv *KV) LoadRegion(regionID uint64, region *metapb.Region) (bool, error) {
return kv.loadProto(kv.regionPath(regionID), region)
Expand Down
7 changes: 6 additions & 1 deletion server/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,12 @@ func (s *StoresInfo) GetMetaStores() []*metapb.Store {
return stores
}

// GetStoreCount return the total count of storeInfo
// DeleteStore deletes tombstone record form store
func (s *StoresInfo) DeleteStore(store *StoreInfo) {
delete(s.stores, store.GetId())
}

// GetStoreCount returns the total count of storeInfo
func (s *StoresInfo) GetStoreCount() int {
return len(s.stores)
}
Expand Down
5 changes: 5 additions & 0 deletions server/schedule/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,11 @@ func (bc *BasicCluster) PutStore(store *core.StoreInfo) error {
return nil
}

// DeleteStore deletes a store
func (bc *BasicCluster) DeleteStore(store *core.StoreInfo) {
bc.Stores.DeleteStore(store)
}

// PutRegion put a region
func (bc *BasicCluster) PutRegion(region *core.RegionInfo) error {
bc.Regions.SetRegion(region)
Expand Down
31 changes: 30 additions & 1 deletion tools/pd-ctl/pdctl/command/store_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var (
storePrefix = "pd/api/v1/store/%s"
)

// NewStoreCommand return a store subcommand of rootCmd
// NewStoreCommand return a stores subcommand of rootCmd
func NewStoreCommand() *cobra.Command {
s := &cobra.Command{
Use: `store [delete|label|weight] <store_id> [--jq="<query string>"]`,
Expand Down Expand Up @@ -70,6 +70,25 @@ func NewSetStoreWeightCommand() *cobra.Command {
}
}

// NewStoresCommand returns a store subcommand of rootCmd
func NewStoresCommand() *cobra.Command {
s := &cobra.Command{
Use: `stores [remove-tombstone]`,
Short: "show the store status",
}
s.AddCommand(NewRemoveTombStoneCommand())
return s
}

// NewRemoveTombStoneCommand returns a tombstone subcommand of storesCmd.
func NewRemoveTombStoneCommand() *cobra.Command {
return &cobra.Command{
Use: "remove-tombstone",
Short: "remove tombstone record if only safe",
Run: removeTombStoneCommandFunc,
}
}

func showStoreCommandFunc(cmd *cobra.Command, args []string) {
prefix := storesPrefix
if len(args) == 1 {
Expand Down Expand Up @@ -143,3 +162,13 @@ func setStoreWeightCommandFunc(cmd *cobra.Command, args []string) {
"region": region,
})
}

func removeTombStoneCommandFunc(cmd *cobra.Command, args []string) {
prefix := path.Join(storesPrefix, "remove-tombstone")
_, err := doRequest(cmd, prefix, http.MethodDelete)
if err != nil {
cmd.Printf("Failed to remove tombstone store %s \n", err)
return
}
cmd.Println("Success!")
}
1 change: 1 addition & 0 deletions tools/pd-ctl/pdctl/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func Start(args []string) {
command.NewConfigCommand(),
command.NewRegionCommand(),
command.NewStoreCommand(),
command.NewStoresCommand(),
command.NewMemberCommand(),
command.NewExitCommand(),
command.NewLabelCommand(),
Expand Down

0 comments on commit 539f50e

Please sign in to comment.