diff --git a/go.mod b/go.mod index 40c9fed06de..23d7230629f 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( github.com/pingcap/errcode v0.3.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce - github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b + github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a github.com/pingcap/log v0.0.0-20210906054005-afc726e70354 github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d github.com/pingcap/tidb-dashboard v0.0.0-20220316134154-e88e27120168 diff --git a/go.sum b/go.sum index 422306bc076..ed1409a6e74 100644 --- a/go.sum +++ b/go.sum @@ -399,8 +399,8 @@ github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMt github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b h1:/OL63rEIcCEivpgTLCkhxVbO3RMxSuHtsKWSgDwS6oY= -github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a h1:0ZnJ8JPtPVGG3qF1G9Kz0NYDEj8BraNEJeQlmwUF6BA= +github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= diff --git a/server/api/config_test.go b/server/api/config_test.go index 31bdb8d4853..efa31ba05c2 100644 --- a/server/api/config_test.go +++ b/server/api/config_test.go @@ -299,6 +299,7 @@ func (s *testConfigSuite) TestConfigPDServer(c *C) { c.Assert(sc.MetricStorage, Equals, "") c.Assert(sc.DashboardAddress, Equals, "auto") c.Assert(sc.FlowRoundByDigit, Equals, int(3)) + c.Assert(sc.MinResolvedTSPersistenceInterval, Equals, typeutil.NewDuration(0)) c.Assert(sc.MaxResetTSGap.Duration, Equals, 24*time.Hour) } diff --git a/server/api/min_resolved_ts.go b/server/api/min_resolved_ts.go new file mode 100644 index 00000000000..33f4c68c4c9 --- /dev/null +++ b/server/api/min_resolved_ts.go @@ -0,0 +1,57 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "net/http" + + "github.com/tikv/pd/server" + "github.com/unrolled/render" +) + +type minResolvedTSHandler struct { + svr *server.Server + rd *render.Render +} + +func newMinResolvedTSHandler(svr *server.Server, rd *render.Render) *minResolvedTSHandler { + return &minResolvedTSHandler{ + svr: svr, + rd: rd, + } +} + +// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. +type minResolvedTS struct { + MinResolvedTS uint64 `json:"min_resolved_ts"` +} + +// @Tags minresolvedts +// @Summary Get cluster-level min resolved ts. +// @Produce json +// @Success 200 {array} minResolvedTS +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /min-resolved-ts [get] +func (h *minResolvedTSHandler) Get(w http.ResponseWriter, r *http.Request) { + storage := h.svr.GetStorage() + value, err := storage.LoadMinResolvedTS() + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + h.rd.JSON(w, http.StatusOK, minResolvedTS{ + MinResolvedTS: value, + }) +} diff --git a/server/api/min_resolved_ts_test.go b/server/api/min_resolved_ts_test.go new file mode 100644 index 00000000000..a5624fe41e4 --- /dev/null +++ b/server/api/min_resolved_ts_test.go @@ -0,0 +1,66 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "fmt" + + . "github.com/pingcap/check" + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/tikv/pd/pkg/apiutil" + "github.com/tikv/pd/server" +) + +var _ = Suite(&testMinResolvedTSSuite{}) + +type testMinResolvedTSSuite struct { + svr *server.Server + cleanup cleanUpFunc + urlPrefix string +} + +func (s *testMinResolvedTSSuite) SetUpSuite(c *C) { + c.Assert(failpoint.Enable("github.com/tikv/pd/server/highFrequencyClusterJobs", `return(true)`), IsNil) + s.svr, s.cleanup = mustNewServer(c) + mustWaitLeader(c, []*server.Server{s.svr}) + + addr := s.svr.GetAddr() + s.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix) + + mustBootstrapCluster(c, s.svr) + mustPutStore(c, s.svr, 1, metapb.StoreState_Up, metapb.NodeState_Serving, nil) +} + +func (s *testMinResolvedTSSuite) TearDownSuite(c *C) { + s.cleanup() +} + +func (s *testMinResolvedTSSuite) TestMinResolvedTS(c *C) { + url := s.urlPrefix + "/min-resolved-ts" + storage := s.svr.GetStorage() + min := uint64(233) + storage.SaveMinResolvedTS(min) + result := &minResolvedTS{ + MinResolvedTS: min, + } + res, err := testDialClient.Get(url) + c.Assert(err, IsNil) + defer res.Body.Close() + listResp := &minResolvedTS{} + err = apiutil.ReadJSON(res.Body, listResp) + c.Assert(err, IsNil) + c.Assert(listResp, DeepEquals, result) +} diff --git a/server/api/router.go b/server/api/router.go index c35ff565f27..6e3d3249ec3 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -350,6 +350,10 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { registerFunc(apiRouter, "/gc/safepoint", serviceGCSafepointHandler.GetGCSafePoint, setMethods("GET"), setAuditBackend(localLog)) registerFunc(apiRouter, "/gc/safepoint/{service_id}", serviceGCSafepointHandler.DeleteGCSafePoint, setMethods("DELETE"), setAuditBackend(localLog)) + // min resolved ts API + minResolvedTSHandler := newMinResolvedTSHandler(svr, rd) + registerFunc(apiRouter, "/min-resolved-ts", minResolvedTSHandler.Get, setMethods("GET")) + // unsafe admin operation API unsafeOperationHandler := newUnsafeOperationHandler(svr, rd) registerFunc(clusterRouter, "/admin/unsafe/remove-failed-stores", diff --git a/server/api/service_gc_safepoint_test.go b/server/api/service_gc_safepoint_test.go index 32237499bce..c3bc931c843 100644 --- a/server/api/service_gc_safepoint_test.go +++ b/server/api/service_gc_safepoint_test.go @@ -49,7 +49,7 @@ func (s *testServiceGCSafepointSuite) TearDownSuite(c *C) { s.cleanup() } -func (s *testServiceGCSafepointSuite) TestRegionStats(c *C) { +func (s *testServiceGCSafepointSuite) TestServiceGCSafepoint(c *C) { sspURL := s.urlPrefix + "/gc/safepoint" storage := s.svr.GetStorage() diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 83852a300c9..7ea33714831 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -17,6 +17,7 @@ package cluster import ( "context" "fmt" + "math" "net/http" "strconv" "sync" @@ -250,16 +251,19 @@ func (c *RaftCluster) Start(s Server) error { c.regionStats = statistics.NewRegionStatistics(c.opt, c.ruleManager) c.limiter = NewStoreLimiter(s.GetPersistOptions()) c.unsafeRecoveryController = newUnsafeRecoveryController(cluster) + minResolvedTSPersistenceInterval := c.opt.GetMinResolvedTSPersistenceInterval() - c.wg.Add(5) + c.wg.Add(6) go c.runCoordinator() failpoint.Inject("highFrequencyClusterJobs", func() { backgroundJobInterval = 100 * time.Microsecond + minResolvedTSPersistenceInterval = 1 * time.Microsecond }) go c.runBackgroundJobs(backgroundJobInterval) go c.runStatsBackgroundJobs() go c.syncRegions() go c.runReplicationMode() + go c.runMinResolvedTSJob(minResolvedTSPersistenceInterval) c.running = true return nil @@ -1650,6 +1654,66 @@ func (c *RaftCluster) RemoveStoreLimit(storeID uint64) { log.Error("persist store limit meet error", errs.ZapError(err)) } +// GetMinResolvedTS returns the min resolved ts of all stores. +func (c *RaftCluster) GetMinResolvedTS() uint64 { + c.RLock() + defer c.RUnlock() + if !c.isInitialized() { + return math.MaxUint64 + } + min := uint64(math.MaxUint64) + for _, s := range c.GetStores() { + if !core.IsAvailableForMinResolvedTS(s) { + continue + } + if min > s.GetMinResolvedTS() { + min = s.GetMinResolvedTS() + } + } + return min +} + +// SetMinResolvedTS sets up a store with min resolved ts. +func (c *RaftCluster) SetMinResolvedTS(storeID, minResolvedTS uint64) error { + c.Lock() + defer c.Unlock() + + store := c.GetStore(storeID) + if store == nil { + return errs.ErrStoreNotFound.FastGenByArgs(storeID) + } + + newStore := store.Clone( + core.SetMinResolvedTS(minResolvedTS), + ) + + return c.putStoreLocked(newStore) +} + +func (c *RaftCluster) runMinResolvedTSJob(saveInterval time.Duration) { + defer c.wg.Done() + if saveInterval == 0 { + return + } + defer logutil.LogPanic() + ticker := time.NewTicker(saveInterval) + defer ticker.Stop() + for { + select { + case <-c.ctx.Done(): + log.Info("min resolved ts background jobs has been stopped") + return + case <-ticker.C: + minResolvedTS := c.GetMinResolvedTS() + if minResolvedTS != math.MaxUint64 { + c.Lock() + c.storage.SaveMinResolvedTS(minResolvedTS) + c.Unlock() + } + } + } +} + // SetStoreLimit sets a store limit for a given type and rate. func (c *RaftCluster) SetStoreLimit(storeID uint64, typ storelimit.Type, ratePerMin float64) error { old := c.opt.GetScheduleConfig().Clone() diff --git a/server/config/config.go b/server/config/config.go index a6f4a4288fb..54313f0970f 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -228,12 +228,13 @@ const ( defaultLeaderPriorityCheckInterval = time.Minute - defaultUseRegionStorage = true - defaultTraceRegionFlow = true - defaultFlowRoundByDigit = 3 // KB - maxTraceFlowRoundByDigit = 5 // 0.1 MB - defaultMaxResetTSGap = 24 * time.Hour - defaultKeyType = "table" + defaultUseRegionStorage = true + defaultTraceRegionFlow = true + defaultFlowRoundByDigit = 3 // KB + maxTraceFlowRoundByDigit = 5 // 0.1 MB + defaultMaxResetTSGap = 24 * time.Hour + defaultMinResolvedTSPersistenceInterval = 0 + defaultKeyType = "table" defaultStrictlyMatchLabel = false defaultEnablePlacementRules = true @@ -1102,6 +1103,8 @@ type PDServerConfig struct { TraceRegionFlow bool `toml:"trace-region-flow" json:"trace-region-flow,string,omitempty"` // FlowRoundByDigit used to discretization processing flow information. FlowRoundByDigit int `toml:"flow-round-by-digit" json:"flow-round-by-digit"` + // MinResolvedTSPersistenceInterval is the interval to save the min resolved ts. + MinResolvedTSPersistenceInterval typeutil.Duration `toml:"min-resolved-ts-persistence-interval" json:"min-resolved-ts-persistence-interval"` } func (c *PDServerConfig) adjust(meta *configMetaData) error { @@ -1124,6 +1127,9 @@ func (c *PDServerConfig) adjust(meta *configMetaData) error { if !meta.IsDefined("flow-round-by-digit") { adjustInt(&c.FlowRoundByDigit, defaultFlowRoundByDigit) } + if !meta.IsDefined("min-resolved-ts-persistence-interval") { + adjustDuration(&c.MinResolvedTSPersistenceInterval, defaultMinResolvedTSPersistenceInterval) + } c.migrateConfigurationFromFile(meta) return c.Validate() } diff --git a/server/config/persist_options.go b/server/config/persist_options.go index a8672b6712e..d23ed7bf147 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -653,6 +653,11 @@ func (o *PersistOptions) CheckLabelProperty(typ string, labels []*metapb.StoreLa return false } +// GetMinResolvedTSPersistenceInterval gets the interval for PD to save min resolved ts. +func (o *PersistOptions) GetMinResolvedTSPersistenceInterval() time.Duration { + return o.GetPDServerConfig().MinResolvedTSPersistenceInterval.Duration +} + const ttlConfigPrefix = "/config/ttl" // SetTTLData set temporary configuration diff --git a/server/core/store.go b/server/core/store.go index c30283cad36..b46d7ce0b54 100644 --- a/server/core/store.go +++ b/server/core/store.go @@ -60,16 +60,18 @@ type StoreInfo struct { leaderWeight float64 regionWeight float64 limiter map[storelimit.Type]*storelimit.StoreLimit + minResolvedTS uint64 } // NewStoreInfo creates StoreInfo with meta data. func NewStoreInfo(store *metapb.Store, opts ...StoreCreateOption) *StoreInfo { storeInfo := &StoreInfo{ - meta: store, - storeStats: newStoreStats(), - leaderWeight: 1.0, - regionWeight: 1.0, - limiter: make(map[storelimit.Type]*storelimit.StoreLimit), + meta: store, + storeStats: newStoreStats(), + leaderWeight: 1.0, + regionWeight: 1.0, + limiter: make(map[storelimit.Type]*storelimit.StoreLimit), + minResolvedTS: 0, } for _, opt := range opts { opt(storeInfo) @@ -94,6 +96,7 @@ func (s *StoreInfo) Clone(opts ...StoreCreateOption) *StoreInfo { leaderWeight: s.leaderWeight, regionWeight: s.regionWeight, limiter: s.limiter, + minResolvedTS: s.minResolvedTS, } for _, opt := range opts { @@ -118,6 +121,7 @@ func (s *StoreInfo) ShallowClone(opts ...StoreCreateOption) *StoreInfo { leaderWeight: s.leaderWeight, regionWeight: s.regionWeight, limiter: s.limiter, + minResolvedTS: s.minResolvedTS, } for _, opt := range opts { @@ -472,6 +476,11 @@ func (s *StoreInfo) GetUptime() time.Duration { return 0 } +// GetMinResolvedTS returns min resolved ts. +func (s *StoreInfo) GetMinResolvedTS() uint64 { + return s.minResolvedTS +} + var ( // If a store's last heartbeat is storeDisconnectDuration ago, the store will // be marked as disconnected state. The value should be greater than tikv's @@ -714,7 +723,7 @@ func (s *StoresInfo) UpdateStoreStatus(storeID uint64, leaderCount int, regionCo } } -// IsStoreContainLabel return if the store contains the given label. +// IsStoreContainLabel returns if the store contains the given label. func IsStoreContainLabel(store *metapb.Store, key, value string) bool { for _, l := range store.GetLabels() { if l.GetKey() == key && l.GetValue() == value { @@ -723,3 +732,10 @@ func IsStoreContainLabel(store *metapb.Store, key, value string) bool { } return false } + +// IsAvailableForMinResolvedTS returns if the store is available for min resolved ts. +func IsAvailableForMinResolvedTS(s *StoreInfo) bool { + // If a store is tombstone or no leader, it is not meaningful for min resolved ts. + // And we will skip tiflash, because it does not report min resolved ts. + return !s.IsRemoved() && !IsStoreContainLabel(s.GetMeta(), EngineKey, EngineTiFlash) && s.GetLeaderCount() != 0 +} diff --git a/server/core/store_option.go b/server/core/store_option.go index 649dd2feb67..b445e9da735 100644 --- a/server/core/store_option.go +++ b/server/core/store_option.go @@ -217,6 +217,13 @@ func SetNewStoreStats(stats *pdpb.StoreStats) StoreCreateOption { } } +// SetMinResolvedTS sets min resolved ts for the store. +func SetMinResolvedTS(minResolvedTS uint64) StoreCreateOption { + return func(store *StoreInfo) { + store.minResolvedTS = minResolvedTS + } +} + // ResetStoreLimit resets the store limit for a store. func ResetStoreLimit(limitType storelimit.Type, ratePerSec ...float64) StoreCreateOption { return func(store *StoreInfo) { diff --git a/server/grpc_service.go b/server/grpc_service.go index 7f846e22da1..d2e92126881 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1819,3 +1819,37 @@ func (s *GrpcServer) handleDamagedStore(stats *pdpb.StoreStats) error { // TODO: reimplement add scheduler logic to avoid repeating the introduction HTTP requests inside `server/api`. return s.GetHandler().AddEvictOrGrant(float64(stats.GetStoreId()), schedulers.EvictLeaderName) } + +// ReportMinResolvedTS implements gRPC PDServer. +func (s *GrpcServer) ReportMinResolvedTS(ctx context.Context, request *pdpb.ReportMinResolvedTsRequest) (*pdpb.ReportMinResolvedTsResponse, error) { + forwardedHost := getForwardedHost(ctx) + if !s.isLocalRequest(forwardedHost) { + client, err := s.getDelegateClient(ctx, forwardedHost) + if err != nil { + return nil, err + } + ctx = grpcutil.ResetForwardContext(ctx) + return pdpb.NewPDClient(client).ReportMinResolvedTS(ctx, request) + } + + if err := s.validateRequest(request.GetHeader()); err != nil { + return nil, err + } + + rc := s.GetRaftCluster() + if rc == nil { + return &pdpb.ReportMinResolvedTsResponse{Header: s.notBootstrappedHeader()}, nil + } + + storeID := request.StoreId + minResolvedTS := request.MinResolvedTs + if err := rc.SetMinResolvedTS(storeID, minResolvedTS); err != nil { + return nil, err + } + log.Debug("updated min resolved-ts", + zap.Uint64("store", storeID), + zap.Uint64("min resolved-ts", minResolvedTS)) + return &pdpb.ReportMinResolvedTsResponse{ + Header: s.header(), + }, nil +} diff --git a/server/storage/endpoint/key_path.go b/server/storage/endpoint/key_path.go index 195c4dea2aa..1f5e05601cf 100644 --- a/server/storage/endpoint/key_path.go +++ b/server/storage/endpoint/key_path.go @@ -30,6 +30,7 @@ const ( replicationPath = "replication_mode" customScheduleConfigPath = "scheduler_config" gcWorkerServiceSafePointID = "gc_worker" + minResolvedTS = "min_resolved_ts" ) // AppendToRootPath appends the given key to the rootPath. @@ -97,3 +98,8 @@ func GCSafePointServicePrefixPath() string { func gcSafePointServicePath(serviceID string) string { return path.Join(gcSafePointPath(), "service", serviceID) } + +// MinResolvedTSPath returns the min resolved ts path +func MinResolvedTSPath() string { + return path.Join(clusterPath, minResolvedTS) +} diff --git a/server/storage/endpoint/min_resolved_ts.go b/server/storage/endpoint/min_resolved_ts.go new file mode 100644 index 00000000000..a8dd5c48538 --- /dev/null +++ b/server/storage/endpoint/min_resolved_ts.go @@ -0,0 +1,54 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package endpoint + +import ( + "strconv" + + "github.com/tikv/pd/pkg/errs" +) + +// MinResolvedTSPoint is the min resolved ts for a store +// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. +type MinResolvedTSPoint struct { + MinResolvedTS uint64 `json:"min_resolved_ts"` +} + +// MinResolvedTSStorage defines the storage operations on the min resolved ts. +type MinResolvedTSStorage interface { + LoadMinResolvedTS() (uint64, error) + SaveMinResolvedTS(minResolvedTS uint64) error +} + +var _ MinResolvedTSStorage = (*StorageEndpoint)(nil) + +// LoadMinResolvedTS loads the min resolved ts from storage. +func (se *StorageEndpoint) LoadMinResolvedTS() (uint64, error) { + value, err := se.Load(MinResolvedTSPath()) + if err != nil || value == "" { + return 0, err + } + minResolvedTS, err := strconv.ParseUint(value, 16, 64) + if err != nil { + return 0, errs.ErrStrconvParseUint.Wrap(err).GenWithStackByArgs() + } + return minResolvedTS, nil +} + +// SaveMinResolvedTS saves the min resolved ts. +func (se *StorageEndpoint) SaveMinResolvedTS(minResolvedTS uint64) error { + value := strconv.FormatUint(minResolvedTS, 16) + return se.Save(MinResolvedTSPath(), value) +} diff --git a/server/storage/storage.go b/server/storage/storage.go index cd63156f8c8..b92a9248672 100644 --- a/server/storage/storage.go +++ b/server/storage/storage.go @@ -37,6 +37,7 @@ type Storage interface { endpoint.RuleStorage endpoint.ReplicationStatusStorage endpoint.GCSafePointStorage + endpoint.MinResolvedTSStorage } // NewStorageWithMemoryBackend creates a new storage with memory backend. diff --git a/tests/client/go.mod b/tests/client/go.mod index 2d7f51778a7..b258d794a76 100644 --- a/tests/client/go.mod +++ b/tests/client/go.mod @@ -7,7 +7,7 @@ require ( github.com/golang/protobuf v1.5.2 // indirect github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b + github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a github.com/tikv/pd v0.0.0-00010101000000-000000000000 github.com/tikv/pd/client v0.0.0-00010101000000-000000000000 go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738 diff --git a/tests/client/go.sum b/tests/client/go.sum index b3874b0bf8a..39ec14f5165 100644 --- a/tests/client/go.sum +++ b/tests/client/go.sum @@ -404,8 +404,9 @@ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b h1:/OL63rEIcCEivpgTLCkhxVbO3RMxSuHtsKWSgDwS6oY= github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a h1:0ZnJ8JPtPVGG3qF1G9Kz0NYDEj8BraNEJeQlmwUF6BA= +github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= diff --git a/tests/pdctl/config/config_test.go b/tests/pdctl/config/config_test.go index 799a9e47151..b39b95c254f 100644 --- a/tests/pdctl/config/config_test.go +++ b/tests/pdctl/config/config_test.go @@ -186,6 +186,12 @@ func (s *configTestSuite) TestConfig(c *C) { c.Assert(json.Unmarshal(output, &labelPropertyCfg), IsNil) c.Assert(labelPropertyCfg, DeepEquals, svr.GetLabelProperty()) + // config set min-resolved-ts-persistence-interval + args = []string{"-u", pdAddr, "config", "set", "min-resolved-ts-persistence-interval", "1s"} + _, err = pdctl.ExecuteCommand(cmd, args...) + c.Assert(err, IsNil) + c.Assert(svr.GetPDServerConfig().MinResolvedTSPersistenceInterval, Equals, typeutil.NewDuration(time.Second)) + // test config read and write testItems := []testItem{ {"leader-schedule-limit", uint64(64), func(scheduleConfig *config.ScheduleConfig) interface{} { diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 7f4affc47f3..3f426ffb50f 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -17,6 +17,8 @@ package cluster_test import ( "context" "fmt" + "math" + "strconv" "sync" "testing" "time" @@ -35,6 +37,7 @@ import ( "github.com/tikv/pd/server/config" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/core/storelimit" + "github.com/tikv/pd/server/id" syncer "github.com/tikv/pd/server/region_syncer" "github.com/tikv/pd/server/schedule/operator" "github.com/tikv/pd/server/schedulers" @@ -1115,3 +1118,106 @@ func (s *clusterTestSuite) TestStaleTermHeartbeat(c *C) { err = rc.HandleRegionHeartbeat(region) c.Assert(err, IsNil) } + +func (s *clusterTestSuite) putRegionWithLeader(c *C, rc *cluster.RaftCluster, id id.Allocator, storeID uint64) { + for i := 0; i < 3; i++ { + regionID, err := id.Alloc() + c.Assert(err, IsNil) + peerID, err := id.Alloc() + c.Assert(err, IsNil) + region := &metapb.Region{ + Id: regionID, + Peers: []*metapb.Peer{{Id: peerID, StoreId: storeID}}, + StartKey: []byte{byte(i)}, + EndKey: []byte{byte(i + 1)}, + } + rc.HandleRegionHeartbeat(core.NewRegionInfo(region, region.Peers[0])) + } + c.Assert(rc.GetStore(storeID).GetLeaderCount(), Equals, 3) +} + +func (s *clusterTestSuite) TestMinResolvedTS(c *C) { + tc, err := tests.NewTestCluster(s.ctx, 1) + defer tc.Destroy() + c.Assert(err, IsNil) + + err = tc.RunInitialServers() + c.Assert(err, IsNil) + tc.WaitLeader() + leaderServer := tc.GetServer(tc.GetLeader()) + id := leaderServer.GetAllocator() + grpcPDClient := testutil.MustNewGrpcClient(c, leaderServer.GetAddr()) + clusterID := leaderServer.GetClusterID() + bootstrapCluster(c, clusterID, grpcPDClient) + rc := leaderServer.GetRaftCluster() + c.Assert(rc, NotNil) + c.Assert(failpoint.Enable("github.com/tikv/pd/server/highFrequencyClusterJobs", `return(true)`), IsNil) + addStoreWithMinResolvedTS := func(c *C, storeID uint64, isTiflash bool, minResolvedTS, expect uint64) { + store := &metapb.Store{ + Id: storeID, + Version: "v6.0.0", + Address: "127.0.0.1:" + strconv.Itoa(int(storeID)), + } + if isTiflash { + store.Labels = []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}} + } + _, err := putStore(grpcPDClient, clusterID, store) + c.Assert(err, IsNil) + req := &pdpb.ReportMinResolvedTsRequest{ + Header: testutil.NewRequestHeader(clusterID), + StoreId: storeID, + MinResolvedTs: minResolvedTS, + } + _, err = grpcPDClient.ReportMinResolvedTS(context.Background(), req) + c.Assert(err, IsNil) + time.Sleep(time.Millisecond * 10) + ts := rc.GetMinResolvedTS() + c.Assert(ts, Equals, expect) + } + store1TS := uint64(233) + store3TS := store1TS - 10 + // case1: no init + // min resolved ts should be not available + store1 := uint64(1) + status, err := rc.LoadClusterStatus() + c.Assert(status.IsInitialized, IsFalse) + addStoreWithMinResolvedTS(c, store1, false, store1TS, math.MaxUint64) + // case2: add leader to store1 + // min resolved ts should be available + s.putRegionWithLeader(c, rc, id, store1) + ts := rc.GetMinResolvedTS() + c.Assert(ts, Equals, store1TS) + // case2: add tiflash store + // min resolved ts should no change + store2 := uint64(2) + addStoreWithMinResolvedTS(c, store2, true, 0, store1TS) + // case4: add new store with less ts but without leader + // min resolved ts should no change + store3 := uint64(3) + addStoreWithMinResolvedTS(c, store3, false, store3TS, store1TS) + // case5: add leader to store 3, store 3 has less ts than store 1. + // min resolved ts should change to store 3 + s.putRegionWithLeader(c, rc, id, store3) + ts = rc.GetMinResolvedTS() + c.Assert(ts, Equals, store3TS) + // case6: set tombstone + // min resolved ts should change to store 1 + resetStoreState(c, rc, store3, metapb.StoreState_Tombstone) + time.Sleep(time.Millisecond * 10) + ts = rc.GetMinResolvedTS() + c.Assert(err, IsNil) + c.Assert(ts, Equals, store1TS) + // case7: add a store with leader but no report min resolved ts + // min resolved ts should be zero + store4 := uint64(4) + _, err = putStore(grpcPDClient, clusterID, &metapb.Store{ + Id: store4, + Version: "v6.0.0", + Address: "127.0.0.1:" + strconv.Itoa(int(store4)), + }) + c.Assert(err, IsNil) + s.putRegionWithLeader(c, rc, id, store4) + ts = rc.GetMinResolvedTS() + c.Assert(err, IsNil) + c.Assert(ts, Equals, uint64(0)) +} diff --git a/tools/pd-ctl/pdctl/command/min_resolved_ts.go b/tools/pd-ctl/pdctl/command/min_resolved_ts.go new file mode 100644 index 00000000000..dbf0c47b2de --- /dev/null +++ b/tools/pd-ctl/pdctl/command/min_resolved_ts.go @@ -0,0 +1,45 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "net/http" + + "github.com/spf13/cobra" +) + +var ( + minResolvedTSPrefix = "pd/api/v1/min-resolved-ts" +) + +// NewMinResolvedTSCommand return min resolved ts subcommand of rootCmd +func NewMinResolvedTSCommand() *cobra.Command { + l := &cobra.Command{ + Use: "min-resolved-ts", + Short: "show min resolved ts", + Run: ShowMinResolvedTS, + } + return l +} + +// ShowMinResolvedTS show min resolved ts +func ShowMinResolvedTS(cmd *cobra.Command, args []string) { + r, err := doRequest(cmd, minResolvedTSPrefix, http.MethodGet, http.Header{}) + if err != nil { + cmd.Printf("Failed to get min resolved ts: %s\n", err) + return + } + cmd.Println(r) +} diff --git a/tools/pd-ctl/pdctl/ctl.go b/tools/pd-ctl/pdctl/ctl.go index ef3c1467208..2399d26fed0 100644 --- a/tools/pd-ctl/pdctl/ctl.go +++ b/tools/pd-ctl/pdctl/ctl.go @@ -62,6 +62,7 @@ func GetRootCmd() *cobra.Command { command.NewLogCommand(), command.NewPluginCommand(), command.NewServiceGCSafepointCommand(), + command.NewMinResolvedTSCommand(), command.NewCompletionCommand(), command.NewUnsafeCommand(), ) diff --git a/tools/pd-tso-bench/go.sum b/tools/pd-tso-bench/go.sum index 789c13355e2..0cf5621abd8 100644 --- a/tools/pd-tso-bench/go.sum +++ b/tools/pd-tso-bench/go.sum @@ -110,6 +110,8 @@ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b h1:/OL63rEIcCEivpgTLCkhxVbO3RMxSuHtsKWSgDwS6oY= github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a h1:0ZnJ8JPtPVGG3qF1G9Kz0NYDEj8BraNEJeQlmwUF6BA= +github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee h1:VO2t6IBpfvW34TdtD/G10VvnGqjLic1jzOuHjUb5VqM= github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=