diff --git a/pkg/tsoutil/tso.go b/pkg/tsoutil/tso.go index bfa104708db..b1c3b638769 100644 --- a/pkg/tsoutil/tso.go +++ b/pkg/tsoutil/tso.go @@ -13,7 +13,11 @@ package tsoutil -import "time" +import ( + "time" + + "github.com/pingcap/kvproto/pkg/pdpb" +) const ( physicalShiftBits = 18 @@ -27,3 +31,11 @@ func ParseTS(ts uint64) (time.Time, uint64) { physicalTime := time.Unix(int64(physical/1000), int64(physical)%1000*time.Millisecond.Nanoseconds()) return physicalTime, logical } + +// ParseTimestamp parses pdpb.Timestamp to time.Time +func ParseTimestamp(ts pdpb.Timestamp) (time.Time, uint64) { + logical := uint64(ts.Logical) + physical := ts.Physical + physicalTime := time.Unix(int64(physical/1000), int64(physical)%1000*time.Millisecond.Nanoseconds()) + return physicalTime, logical +} diff --git a/server/api/router.go b/server/api/router.go index 9e3309812e0..f81077d74ed 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -206,6 +206,11 @@ func createRouter(ctx context.Context, prefix string, svr *server.Server) *mux.R apiRouter.Handle("/debug/pprof/block", pprof.Handler("block")) apiRouter.Handle("/debug/pprof/goroutine", pprof.Handler("goroutine")) + // service GC safepoint API + serviceGCSafepointHandler := newServiceGCSafepointHandler(svr, rd) + apiRouter.HandleFunc("/gc/safepoint", serviceGCSafepointHandler.List).Methods("GET") + apiRouter.HandleFunc("/gc/safepoint/{service_id}", serviceGCSafepointHandler.Delete).Methods("DELETE") + // Deprecated rootRouter.Handle("/health", newHealthHandler(svr, rd)).Methods("GET") // Deprecated diff --git a/server/api/service_gc_safepoint.go b/server/api/service_gc_safepoint.go new file mode 100644 index 00000000000..6d6539ead45 --- /dev/null +++ b/server/api/service_gc_safepoint.go @@ -0,0 +1,84 @@ +// Copyright 2020 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "net/http" + + "github.com/gorilla/mux" + "github.com/tikv/pd/server" + "github.com/tikv/pd/server/core" + "github.com/unrolled/render" +) + +type serviceGCSafepointHandler struct { + svr *server.Server + rd *render.Render +} + +func newServiceGCSafepointHandler(svr *server.Server, rd *render.Render) *serviceGCSafepointHandler { + return &serviceGCSafepointHandler{ + svr: svr, + rd: rd, + } +} + +type listServiceGCSafepoint struct { + ServiceGCSafepoints []*core.ServiceSafePoint `json:"service_gc_safe_points"` + GCSafePoint uint64 `json:"gc_safe_point"` +} + +// @Tags servicegcsafepoint +// @Summary Get all service GC safepoint. +// @Produce json +// @Success 200 {array} listServiceGCSafepoint +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /gc/safepoint [get] +func (h *serviceGCSafepointHandler) List(w http.ResponseWriter, r *http.Request) { + storage := h.svr.GetStorage() + gcSafepoint, err := storage.LoadGCSafePoint() + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + ssps, err := storage.GetAllServiceGCSafePoints() + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + list := listServiceGCSafepoint{ + GCSafePoint: gcSafepoint, + ServiceGCSafepoints: ssps, + } + h.rd.JSON(w, http.StatusOK, list) +} + +// @Tags servicegcsafepoint +// @Summary Delete a service GC safepoint. +// @Param service_id path string true "Service ID" +// @Produce json +// @Success 200 {string} string "Delete service GC safepoint successfully." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /gc/safepoint/{service_id} [delete] +// @Tags rule +func (h *serviceGCSafepointHandler) Delete(w http.ResponseWriter, r *http.Request) { + storage := h.svr.GetStorage() + serviceID := mux.Vars(r)["service_id"] + err := storage.RemoveServiceGCSafePoint(serviceID) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + h.rd.JSON(w, http.StatusOK, "Delete service GC safepoint successfully.") +} diff --git a/server/api/service_gc_safepoint_test.go b/server/api/service_gc_safepoint_test.go new file mode 100644 index 00000000000..66a2eb0e5b3 --- /dev/null +++ b/server/api/service_gc_safepoint_test.go @@ -0,0 +1,95 @@ +// Copyright 2020 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "fmt" + "net/http" + "time" + + . "github.com/pingcap/check" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/tikv/pd/pkg/apiutil" + "github.com/tikv/pd/server" + "github.com/tikv/pd/server/core" +) + +var _ = Suite(&testServiceGCSafepointSuite{}) + +type testServiceGCSafepointSuite struct { + svr *server.Server + cleanup cleanUpFunc + urlPrefix string +} + +func (s *testServiceGCSafepointSuite) SetUpSuite(c *C) { + s.svr, s.cleanup = mustNewServer(c) + mustWaitLeader(c, []*server.Server{s.svr}) + + addr := s.svr.GetAddr() + s.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix) + + mustBootstrapCluster(c, s.svr) + mustPutStore(c, s.svr, 1, metapb.StoreState_Up, nil) +} + +func (s *testServiceGCSafepointSuite) TearDownSuite(c *C) { + s.cleanup() +} + +func (s *testServiceGCSafepointSuite) TestRegionStats(c *C) { + sspURL := s.urlPrefix + "/gc/safepoint" + + storage := s.svr.GetStorage() + list := &listServiceGCSafepoint{ + ServiceGCSafepoints: []*core.ServiceSafePoint{ + { + ServiceID: "a", + ExpiredAt: time.Now().Unix() + 10, + SafePoint: 1, + }, + { + ServiceID: "b", + ExpiredAt: time.Now().Unix() + 10, + SafePoint: 2, + }, + { + ServiceID: "c", + ExpiredAt: time.Now().Unix() + 10, + SafePoint: 3, + }, + }, + GCSafePoint: 1, + } + for _, ssp := range list.ServiceGCSafepoints { + err := storage.SaveServiceGCSafePoint(ssp) + c.Assert(err, IsNil) + } + storage.SaveGCSafePoint(1) + + res, err := testDialClient.Get(sspURL) + c.Assert(err, IsNil) + listResp := &listServiceGCSafepoint{} + err = apiutil.ReadJSON(res.Body, listResp) + c.Assert(err, IsNil) + c.Assert(listResp, DeepEquals, list) + + res, err = doDelete(testDialClient, sspURL+"/a") + c.Assert(err, IsNil) + c.Assert(res.StatusCode, Equals, http.StatusOK) + + left, err := storage.GetAllServiceGCSafePoints() + c.Assert(err, IsNil) + c.Assert(left, DeepEquals, list.ServiceGCSafepoints[1:]) +} diff --git a/server/core/storage.go b/server/core/storage.go index 3e3e41a5cfa..b1f006816dd 100644 --- a/server/core/storage.go +++ b/server/core/storage.go @@ -410,9 +410,9 @@ func (s *Storage) LoadGCSafePoint() (uint64, error) { // ServiceSafePoint is the safepoint for a specific service type ServiceSafePoint struct { - ServiceID string - ExpiredAt int64 - SafePoint uint64 + ServiceID string `json:"service_id"` + ExpiredAt int64 `json:"expired_at"` + SafePoint uint64 `json:"safe_point"` } // SaveServiceGCSafePoint saves a GC safepoint for the service @@ -433,7 +433,7 @@ func (s *Storage) RemoveServiceGCSafePoint(serviceID string) error { } // LoadMinServiceGCSafePoint returns the minimum safepoint across all services -func (s *Storage) LoadMinServiceGCSafePoint() (*ServiceSafePoint, error) { +func (s *Storage) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSafePoint, error) { prefix := path.Join(gcPath, "safe_point", "service") // the next of 'e' is 'f' prefixEnd := path.Join(gcPath, "safe_point", "servicf") @@ -446,13 +446,12 @@ func (s *Storage) LoadMinServiceGCSafePoint() (*ServiceSafePoint, error) { } min := &ServiceSafePoint{SafePoint: math.MaxUint64} - now := time.Now().Unix() for i, key := range keys { ssp := &ServiceSafePoint{} if err := json.Unmarshal([]byte(values[i]), ssp); err != nil { return nil, err } - if ssp.ExpiredAt < now { + if ssp.ExpiredAt < now.Unix() { s.Remove(key) continue } @@ -464,6 +463,30 @@ func (s *Storage) LoadMinServiceGCSafePoint() (*ServiceSafePoint, error) { return min, nil } +// GetAllServiceGCSafePoints returns all services GC safepoints +func (s *Storage) GetAllServiceGCSafePoints() ([]*ServiceSafePoint, error) { + prefix := path.Join(gcPath, "safe_point", "service") + "/" + prefixEnd := clientv3.GetPrefixRangeEnd(prefix) + keys, values, err := s.LoadRange(prefix, prefixEnd, 0) + if err != nil { + return nil, err + } + if len(keys) == 0 { + return []*ServiceSafePoint{}, nil + } + + ssps := make([]*ServiceSafePoint, 0, len(keys)) + for i := range keys { + ssp := &ServiceSafePoint{} + if err := json.Unmarshal([]byte(values[i]), ssp); err != nil { + return nil, err + } + ssps = append(ssps, ssp) + } + + return ssps, nil +} + // LoadAllScheduleConfig loads all schedulers' config. func (s *Storage) LoadAllScheduleConfig() ([]string, []string, error) { keys, values, err := s.LoadRange(customScheduleConfigPath, clientv3.GetPrefixRangeEnd(customScheduleConfigPath), 1000) diff --git a/server/core/storage_test.go b/server/core/storage_test.go index bf3d5ff581a..7fce5da869e 100644 --- a/server/core/storage_test.go +++ b/server/core/storage_test.go @@ -244,7 +244,7 @@ func (s *testKVSuite) TestLoadMinServiceGCSafePoint(c *C) { c.Assert(storage.SaveServiceGCSafePoint(ssp), IsNil) } - ssp, err := storage.LoadMinServiceGCSafePoint() + ssp, err := storage.LoadMinServiceGCSafePoint(time.Now()) c.Assert(err, IsNil) c.Assert(ssp.ServiceID, Equals, "2") c.Assert(ssp.ExpiredAt, Equals, expireAt) diff --git a/server/grpc_service.go b/server/grpc_service.go index 253379fad20..036f92a5116 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -766,7 +766,11 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd } } - min, err := s.storage.LoadMinServiceGCSafePoint() + now, err := s.tso.Now() + if err != nil { + return nil, err + } + min, err := s.storage.LoadMinServiceGCSafePoint(now) if err != nil { return nil, err } @@ -774,7 +778,7 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd if request.TTL > 0 && request.SafePoint >= min.SafePoint { ssp := &core.ServiceSafePoint{ ServiceID: string(request.ServiceId), - ExpiredAt: time.Now().Unix() + request.TTL, + ExpiredAt: now.Unix() + request.TTL, SafePoint: request.SafePoint, } if err := s.storage.SaveServiceGCSafePoint(ssp); err != nil { @@ -786,7 +790,7 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd zap.Uint64("safepoint", ssp.SafePoint)) // If the min safepoint is updated, load the next one if string(request.ServiceId) == min.ServiceID { - min, err = s.storage.LoadMinServiceGCSafePoint() + min, err = s.storage.LoadMinServiceGCSafePoint(now) if err != nil { return nil, err } @@ -800,7 +804,7 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd return &pdpb.UpdateServiceGCSafePointResponse{ Header: s.header(), ServiceId: []byte(min.ServiceID), - TTL: min.ExpiredAt - time.Now().Unix(), + TTL: min.ExpiredAt - now.Unix(), MinSafePoint: min.SafePoint, }, nil } diff --git a/server/tso/tso.go b/server/tso/tso.go index 67b84b15cda..bc3a5f82fbf 100644 --- a/server/tso/tso.go +++ b/server/tso/tso.go @@ -321,3 +321,13 @@ func (t *TimestampOracle) GetRespTS(count uint32) (pdpb.Timestamp, error) { } return resp, errors.New("can not get timestamp") } + +// Now returns the current tso time. +func (t *TimestampOracle) Now() (time.Time, error) { + resp, err := t.GetRespTS(1) + if err != nil { + return time.Time{}, err + } + tm, _ := tsoutil.ParseTimestamp(resp) + return tm, nil +} diff --git a/tests/client/client_test.go b/tests/client/client_test.go index 405c45d0e77..a8bec952629 100644 --- a/tests/client/client_test.go +++ b/tests/client/client_test.go @@ -661,7 +661,7 @@ func (s *testClientSuite) TestUpdateServiceGCSafePoint(c *C) { c.Assert(min, Equals, uint64(3)) // Update only the TTL of the minimum safepoint - oldMinSsp, err := s.srv.GetStorage().LoadMinServiceGCSafePoint() + oldMinSsp, err := s.srv.GetStorage().LoadMinServiceGCSafePoint(time.Now()) c.Assert(err, IsNil) c.Assert(oldMinSsp.ServiceID, Equals, "c") c.Assert(oldMinSsp.SafePoint, Equals, uint64(3)) @@ -669,7 +669,7 @@ func (s *testClientSuite) TestUpdateServiceGCSafePoint(c *C) { "c", 2000, 3) c.Assert(err, IsNil) c.Assert(min, Equals, uint64(3)) - minSsp, err := s.srv.GetStorage().LoadMinServiceGCSafePoint() + minSsp, err := s.srv.GetStorage().LoadMinServiceGCSafePoint(time.Now()) c.Assert(err, IsNil) c.Assert(minSsp.ServiceID, Equals, "c") c.Assert(oldMinSsp.SafePoint, Equals, uint64(3)) @@ -680,7 +680,7 @@ func (s *testClientSuite) TestUpdateServiceGCSafePoint(c *C) { "c", 1, 3) c.Assert(err, IsNil) c.Assert(min, Equals, uint64(3)) - minSsp, err = s.srv.GetStorage().LoadMinServiceGCSafePoint() + minSsp, err = s.srv.GetStorage().LoadMinServiceGCSafePoint(time.Now()) c.Assert(err, IsNil) c.Assert(minSsp.ServiceID, Equals, "c") c.Assert(oldMinSsp.SafePoint, Equals, uint64(3)) diff --git a/tools/pd-ctl/pdctl/command/gc_safepoint_command.go b/tools/pd-ctl/pdctl/command/gc_safepoint_command.go new file mode 100644 index 00000000000..8600432baff --- /dev/null +++ b/tools/pd-ctl/pdctl/command/gc_safepoint_command.go @@ -0,0 +1,70 @@ +// Copyright 2020 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "net/http" + + "github.com/spf13/cobra" +) + +var ( + serviceGCSafepointPrefix = "pd/api/v1/gc/safepoint" +) + +// NewServiceGCSafepointCommand return a service gc safepoint subcommand of rootCmd +func NewServiceGCSafepointCommand() *cobra.Command { + l := &cobra.Command{ + Use: "service-gc-safepoint", + Short: "show all service gc safepoint", + Run: showSSPs, + } + l.AddCommand(NewDeleteServiceGCSafepointCommand()) + return l +} + +// NewDeleteServiceGCSafepointCommand return a subcommand to delete service gc safepoint +func NewDeleteServiceGCSafepointCommand() *cobra.Command { + l := &cobra.Command{ + Use: "delete ", + Short: "delete a service gc safepoint", + Run: deleteSSP, + Hidden: true, + } + return l +} + +func showSSPs(cmd *cobra.Command, args []string) { + r, err := doRequest(cmd, serviceGCSafepointPrefix, http.MethodGet) + if err != nil { + cmd.Printf("Failed to get service GC safepoint: %s\n", err) + return + } + cmd.Println(r) +} + +func deleteSSP(cmd *cobra.Command, args []string) { + if len(args) != 1 { + cmd.Usage() + return + } + serviceID := args[0] + deleteURL := serviceGCSafepointPrefix + "/" + serviceID + r, err := doRequest(cmd, deleteURL, http.MethodDelete) + if err != nil { + cmd.Printf("Failed to delete service GC safepoint: %s\n", err) + return + } + cmd.Println(r) +} diff --git a/tools/pd-ctl/pdctl/ctl.go b/tools/pd-ctl/pdctl/ctl.go index 52278d527bd..fba2f820eb0 100644 --- a/tools/pd-ctl/pdctl/ctl.go +++ b/tools/pd-ctl/pdctl/ctl.go @@ -85,6 +85,7 @@ func getBasicCmd() *cobra.Command { command.NewHealthCommand(), command.NewLogCommand(), command.NewPluginCommand(), + command.NewServiceGCSafepointCommand(), command.NewCompletionCommand(), )