Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add a API to clear tombstone store (#1472) #1705

Merged
merged 4 commits into from
Aug 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions server/api/api.raml
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,16 @@ types:
500:
description: PD server failed to proceed the request.

/remove-tombstone:
description: Remove all tombstone stores.
delete:
description: Remove all tombstone stores.
responses:
200:
description: All tombstone stores are removed.
500:
description: PD server failed to proceed the request.

/store/{storeId}:
description: A specific store.
uriParameters:
Expand Down
1 change: 1 addition & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
router.HandleFunc("/api/v1/store/{id}/label", storeHandler.SetLabels).Methods("POST")
router.HandleFunc("/api/v1/store/{id}/weight", storeHandler.SetWeight).Methods("POST")
router.Handle("/api/v1/stores", newStoresHandler(svr, rd)).Methods("GET")
router.HandleFunc("/api/v1/stores/remove-tombstone", newStoresHandler(svr, rd).RemoveTombStone).Methods("DELETE")

labelsHandler := newLabelsHandler(svr, rd)
router.HandleFunc("/api/v1/labels", labelsHandler.Get).Methods("GET")
Expand Down
16 changes: 16 additions & 0 deletions server/api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,22 @@ func newStoresHandler(svr *server.Server, rd *render.Render) *storesHandler {
}
}

func (h *storesHandler) RemoveTombStone(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
if cluster == nil {
errorResp(h.rd, w, errcode.NewInternalErr(server.ErrNotBootstrapped))
return
}

err := cluster.RemoveTombStoneRecords()
if err != nil {
errorResp(h.rd, w, err)
return
}

h.rd.JSON(w, http.StatusOK, nil)
}

func (h *storesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
if cluster == nil {
Expand Down
24 changes: 24 additions & 0 deletions server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,30 @@ func (c *RaftCluster) checkStores() {
}
}

// RemoveTombStoneRecords removes the tombStone Records.
func (c *RaftCluster) RemoveTombStoneRecords() error {
c.RLock()
defer c.RUnlock()

cluster := c.cachedCluster

for _, store := range cluster.GetStores() {
if store.IsTombstone() {
// the store has already been tombstone
err := cluster.deleteStore(store)
if err != nil {
log.Error("delete store failed",
zap.Stringer("store", store.Store),
zap.Error(err))
return err
}
log.Info("delete store successed",
zap.Stringer("store", store.Store))
}
}
return nil
}

func (c *RaftCluster) checkOperators() {
co := c.coordinator
for _, op := range co.getOperators() {
Expand Down
16 changes: 16 additions & 0 deletions server/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,22 @@ func (c *clusterInfo) putStoreLocked(store *core.StoreInfo) error {
return c.core.PutStore(store)
}

func (c *clusterInfo) deleteStore(store *core.StoreInfo) error {
c.Lock()
defer c.Unlock()
return c.deleteStoreLocked(store)
}

func (c *clusterInfo) deleteStoreLocked(store *core.StoreInfo) error {
if c.kv != nil {
if err := c.kv.DeleteStore(store.Store); err != nil {
return err
}
}
c.core.DeleteStore(store)
return nil
}

// BlockStore stops balancer from selecting the store.
func (c *clusterInfo) BlockStore(storeID uint64) error {
c.Lock()
Expand Down
5 changes: 5 additions & 0 deletions server/core/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ func (kv *KV) SaveStore(store *metapb.Store) error {
return kv.saveProto(kv.storePath(store.GetId()), store)
}

// DeleteStore deletes one store from KV.
func (kv *KV) DeleteStore(store *metapb.Store) error {
return kv.Delete(kv.storePath(store.GetId()))
}

// LoadRegion loads one regoin from KV.
func (kv *KV) LoadRegion(regionID uint64, region *metapb.Region) (bool, error) {
return kv.loadProto(kv.regionPath(regionID), region)
Expand Down
7 changes: 6 additions & 1 deletion server/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,12 @@ func (s *StoresInfo) GetMetaStores() []*metapb.Store {
return stores
}

// GetStoreCount return the total count of storeInfo
// DeleteStore deletes tombstone record form store
func (s *StoresInfo) DeleteStore(store *StoreInfo) {
delete(s.stores, store.GetId())
}

// GetStoreCount returns the total count of storeInfo
func (s *StoresInfo) GetStoreCount() int {
return len(s.stores)
}
Expand Down
5 changes: 5 additions & 0 deletions server/schedule/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,11 @@ func (bc *BasicCluster) PutStore(store *core.StoreInfo) error {
return nil
}

// DeleteStore deletes a store
func (bc *BasicCluster) DeleteStore(store *core.StoreInfo) {
bc.Stores.DeleteStore(store)
}

// PutRegion put a region
func (bc *BasicCluster) PutRegion(region *core.RegionInfo) error {
bc.Regions.SetRegion(region)
Expand Down
31 changes: 30 additions & 1 deletion tools/pd-ctl/pdctl/command/store_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var (
storePrefix = "pd/api/v1/store/%s"
)

// NewStoreCommand return a store subcommand of rootCmd
// NewStoreCommand return a stores subcommand of rootCmd
func NewStoreCommand() *cobra.Command {
s := &cobra.Command{
Use: `store [delete|label|weight] <store_id> [--jq="<query string>"]`,
Expand Down Expand Up @@ -70,6 +70,25 @@ func NewSetStoreWeightCommand() *cobra.Command {
}
}

// NewStoresCommand returns a store subcommand of rootCmd
func NewStoresCommand() *cobra.Command {
s := &cobra.Command{
Use: `stores [remove-tombstone]`,
Short: "show the store status",
}
s.AddCommand(NewRemoveTombStoneCommand())
return s
}

// NewRemoveTombStoneCommand returns a tombstone subcommand of storesCmd.
func NewRemoveTombStoneCommand() *cobra.Command {
return &cobra.Command{
Use: "remove-tombstone",
Short: "remove tombstone record if only safe",
Run: removeTombStoneCommandFunc,
}
}

func showStoreCommandFunc(cmd *cobra.Command, args []string) {
prefix := storesPrefix
if len(args) == 1 {
Expand Down Expand Up @@ -143,3 +162,13 @@ func setStoreWeightCommandFunc(cmd *cobra.Command, args []string) {
"region": region,
})
}

func removeTombStoneCommandFunc(cmd *cobra.Command, args []string) {
prefix := path.Join(storesPrefix, "remove-tombstone")
_, err := doRequest(cmd, prefix, http.MethodDelete)
if err != nil {
cmd.Printf("Failed to remove tombstone store %s \n", err)
return
}
cmd.Println("Success!")
}
1 change: 1 addition & 0 deletions tools/pd-ctl/pdctl/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func Start(args []string) {
command.NewConfigCommand(),
command.NewRegionCommand(),
command.NewStoreCommand(),
command.NewStoresCommand(),
command.NewMemberCommand(),
command.NewExitCommand(),
command.NewLabelCommand(),
Expand Down