diff --git a/client/client.go b/client/client.go index 021f2e8ea1b..405194b9444 100644 --- a/client/client.go +++ b/client/client.go @@ -124,8 +124,6 @@ type Client interface { StoreGlobalConfig(ctx context.Context, items []GlobalConfigItem) error // WatchGlobalConfig returns an stream with all global config and updates WatchGlobalConfig(ctx context.Context) (chan []GlobalConfigItem, error) - // ReportMinResolvedTS reports the min resolved ts to pd. - ReportMinResolvedTS(ctx context.Context, storeID, minResolvedTS uint64) error // UpdateOption updates the client option. UpdateOption(option DynamicOption, value interface{}) error // Close closes the client. @@ -1867,24 +1865,3 @@ func (c *client) WatchGlobalConfig(ctx context.Context) (chan []GlobalConfigItem }() return globalConfigWatcherCh, err } - -func (c *client) ReportMinResolvedTS(ctx context.Context, storeID, minResolvedTS uint64) error { - if span := opentracing.SpanFromContext(ctx); span != nil { - span = opentracing.StartSpan("pdclient.ReportMinResolvedTS", opentracing.ChildOf(span.Context())) - defer span.Finish() - } - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) - req := &pdpb.ReportMinResolvedTsRequest{ - Header: c.requestHeader(), - StoreId: storeID, - MinResolvedTs: minResolvedTS, - } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - _, err := c.getClient().ReportMinResolvedTS(ctx, req) - cancel() - - if err != nil { - return errors.WithStack(err) - } - return nil -} diff --git a/client/go.mod b/client/go.mod index fb1d5a0bae0..e3b9192bb30 100644 --- a/client/go.mod +++ b/client/go.mod @@ -7,7 +7,7 @@ require ( github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a + github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee github.com/prometheus/client_golang v1.11.0 go.uber.org/goleak v1.1.11 diff --git a/client/go.sum b/client/go.sum index 2f27cfbb563..c705a06c247 100644 --- a/client/go.sum +++ b/client/go.sum @@ -106,8 +106,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= -github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a h1:0ZnJ8JPtPVGG3qF1G9Kz0NYDEj8BraNEJeQlmwUF6BA= -github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b h1:/OL63rEIcCEivpgTLCkhxVbO3RMxSuHtsKWSgDwS6oY= +github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee h1:VO2t6IBpfvW34TdtD/G10VvnGqjLic1jzOuHjUb5VqM= github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/server/api/min_resolved_ts.go b/server/api/min_resolved_ts.go index 1e45506148c..a28e9c8df8a 100644 --- a/server/api/min_resolved_ts.go +++ b/server/api/min_resolved_ts.go @@ -18,7 +18,6 @@ import ( "net/http" "github.com/tikv/pd/server" - "github.com/tikv/pd/server/storage/endpoint" "github.com/unrolled/render" ) @@ -36,8 +35,7 @@ func newMinResolvedTSHandler(svr *server.Server, rd *render.Render) *minResolved // NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. type listMinResolvedTS struct { - MinResolvedTSList []*endpoint.MinResolvedTSPoint `json:"list"` - MinResolvedTSForCluster uint64 `json:"min_resolved_ts"` + MinResolvedTS uint64 `json:"min_resolved_ts"` } // @Tags minresolvedts @@ -48,19 +46,13 @@ type listMinResolvedTS struct { // @Router /min-resolved-ts [get] func (h *minResolvedTSHandler) List(w http.ResponseWriter, r *http.Request) { storage := h.svr.GetStorage() - minResolvedTS, err := storage.LoadClusterMinResolvedTS() - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - minResolvedTSList, err := storage.LoadAllMinResolvedTS() + minResolvedTS, err := storage.LoadMinResolvedTS() if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } list := listMinResolvedTS{ - MinResolvedTSList: minResolvedTSList, - MinResolvedTSForCluster: minResolvedTS, + MinResolvedTS: minResolvedTS, } h.rd.JSON(w, http.StatusOK, list) } diff --git a/server/api/min_resolved_ts_test.go b/server/api/min_resolved_ts_test.go index e523770792a..595ea42b1b9 100644 --- a/server/api/min_resolved_ts_test.go +++ b/server/api/min_resolved_ts_test.go @@ -21,7 +21,6 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/tikv/pd/pkg/apiutil" "github.com/tikv/pd/server" - "github.com/tikv/pd/server/storage/endpoint" ) var _ = Suite(&testMinResolvedTSSuite{}) @@ -50,23 +49,11 @@ func (s *testMinResolvedTSSuite) TearDownSuite(c *C) { func (s *testMinResolvedTSSuite) TestMinResolvedTS(c *C) { url := s.urlPrefix + "/min-resolved-ts" storage := s.svr.GetStorage() - testData := []uint64{233333, 23333, 2333, 233, 1} + min := uint64(233) + storage.SaveMinResolvedTS(min) result := &listMinResolvedTS{ - MinResolvedTSList: make([]*endpoint.MinResolvedTSPoint, 0), + MinResolvedTS: min, } - for i, minResolvedTS := range testData { - storeID := uint64(i + 1) - err := storage.SaveMinResolvedTS(storeID, minResolvedTS) - c.Assert(err, IsNil) - result.MinResolvedTSList = append(result.MinResolvedTSList, &endpoint.MinResolvedTSPoint{ - StoreID: storeID, - MinResolvedTS: minResolvedTS, - }) - } - ts, err := storage.LoadClusterMinResolvedTS() - c.Assert(err, IsNil) - result.MinResolvedTSForCluster = ts - res, err := testDialClient.Get(url) c.Assert(err, IsNil) defer res.Body.Close() diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index e2fc8c5fdc7..44fccfd76f0 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -17,6 +17,7 @@ package cluster import ( "context" "fmt" + "math" "net/http" "strconv" "sync" @@ -53,6 +54,7 @@ import ( ) var backgroundJobInterval = 10 * time.Second +var saveMinResolvedTSInterval = 1 * time.Second const ( clientTimeout = 3 * time.Second @@ -251,15 +253,17 @@ func (c *RaftCluster) Start(s Server) error { c.limiter = NewStoreLimiter(s.GetPersistOptions()) c.unsafeRecoveryController = newUnsafeRecoveryController(cluster) - c.wg.Add(5) + c.wg.Add(6) go c.runCoordinator() failpoint.Inject("highFrequencyClusterJobs", func() { backgroundJobInterval = 100 * time.Microsecond + saveMinResolvedTSInterval = 1 * time.Microsecond }) go c.runBackgroundJobs(backgroundJobInterval) go c.runStatsBackgroundJobs() go c.syncRegions() go c.runReplicationMode() + go c.runMinResolvedTSJob(saveMinResolvedTSInterval) c.running = true return nil @@ -1067,18 +1071,12 @@ func (c *RaftCluster) BuryStore(storeID uint64, forceBury bool) error { c.onStoreVersionChangeLocked() if err == nil { // clean up the residual information. - c.CleanTombstoneResidualInfo(storeID) + c.RemoveStoreLimit(storeID) c.hotStat.RemoveRollingStoreStats(storeID) } return err } -// CleanTombstoneResidualInfo clean up the residual information of tombstone store. -func (c *RaftCluster) CleanTombstoneResidualInfo(storeID uint64) { - c.RemoveStoreLimit(storeID) - c.RemoveMinResolvedTSStorage(storeID) -} - // PauseLeaderTransfer prevents the store from been selected as source or // target store of TransferLeader. func (c *RaftCluster) PauseLeaderTransfer(storeID uint64) error { @@ -1225,7 +1223,7 @@ func (c *RaftCluster) RemoveTombStoneRecords() error { errs.ZapError(err)) return err } - c.CleanTombstoneResidualInfo(store.GetID()) + c.RemoveStoreLimit(store.GetID()) log.Info("delete store succeeded", zap.Stringer("store", store.GetMeta())) } @@ -1656,14 +1654,62 @@ func (c *RaftCluster) RemoveStoreLimit(storeID uint64) { log.Error("persist store limit meet error", errs.ZapError(err)) } -// RemoveMinResolvedTSStorage remove min resolved ts storage for a given store ID. -func (c *RaftCluster) RemoveMinResolvedTSStorage(storeID uint64) error { - if c.storage != nil { - if err := c.storage.RemoveMinResolvedTS(storeID); err != nil { - return err +// GetMinResolvedTS returns the min resolved ts of all stores. +func (c *RaftCluster) GetMinResolvedTS() uint64 { + c.RLock() + defer c.RUnlock() + if !c.isInitialized() { + return math.MaxUint64 + } + min := uint64(math.MaxUint64) + for _, s := range c.GetStores() { + if !core.IsAvailableForMinResolvedTS(s) { + continue + } + if min > s.GetMinResolvedTS() { + min = s.GetMinResolvedTS() + } + } + return min +} + +// SetMinResolvedTS sets up a store with min resolved ts. +func (c *RaftCluster) SetMinResolvedTS(storeID, minResolvedTS uint64) error { + c.Lock() + defer c.Unlock() + + store := c.GetStore(storeID) + if store == nil { + return errs.ErrStoreNotFound.FastGenByArgs(storeID) + } + + newStore := store.Clone( + core.SetMinResolvedTS(minResolvedTS), + ) + + return c.putStoreLocked(newStore) +} + +func (c *RaftCluster) runMinResolvedTSJob(saveInterval time.Duration) { + defer logutil.LogPanic() + defer c.wg.Done() + + ticker := time.NewTicker(saveInterval) + defer ticker.Stop() + for { + select { + case <-c.ctx.Done(): + log.Info("min resolved ts background jobs has been stopped") + return + case <-ticker.C: + minResolvedTS := c.GetMinResolvedTS() + if minResolvedTS != math.MaxUint64 { + c.Lock() + defer c.Unlock() + c.storage.SaveMinResolvedTS(minResolvedTS) + } } } - return nil } // SetStoreLimit sets a store limit for a given type and rate. diff --git a/server/core/store.go b/server/core/store.go index c30283cad36..b9e28dc52fe 100644 --- a/server/core/store.go +++ b/server/core/store.go @@ -60,16 +60,18 @@ type StoreInfo struct { leaderWeight float64 regionWeight float64 limiter map[storelimit.Type]*storelimit.StoreLimit + minResolvedTS uint64 } // NewStoreInfo creates StoreInfo with meta data. func NewStoreInfo(store *metapb.Store, opts ...StoreCreateOption) *StoreInfo { storeInfo := &StoreInfo{ - meta: store, - storeStats: newStoreStats(), - leaderWeight: 1.0, - regionWeight: 1.0, - limiter: make(map[storelimit.Type]*storelimit.StoreLimit), + meta: store, + storeStats: newStoreStats(), + leaderWeight: 1.0, + regionWeight: 1.0, + limiter: make(map[storelimit.Type]*storelimit.StoreLimit), + minResolvedTS: math.MaxUint64, } for _, opt := range opts { opt(storeInfo) @@ -94,6 +96,7 @@ func (s *StoreInfo) Clone(opts ...StoreCreateOption) *StoreInfo { leaderWeight: s.leaderWeight, regionWeight: s.regionWeight, limiter: s.limiter, + minResolvedTS: s.minResolvedTS, } for _, opt := range opts { @@ -118,6 +121,7 @@ func (s *StoreInfo) ShallowClone(opts ...StoreCreateOption) *StoreInfo { leaderWeight: s.leaderWeight, regionWeight: s.regionWeight, limiter: s.limiter, + minResolvedTS: s.minResolvedTS, } for _, opt := range opts { @@ -472,6 +476,11 @@ func (s *StoreInfo) GetUptime() time.Duration { return 0 } +// GetMinResolvedTS returns min resolved ts. +func (s *StoreInfo) GetMinResolvedTS() uint64 { + return s.minResolvedTS +} + var ( // If a store's last heartbeat is storeDisconnectDuration ago, the store will // be marked as disconnected state. The value should be greater than tikv's @@ -714,7 +723,7 @@ func (s *StoresInfo) UpdateStoreStatus(storeID uint64, leaderCount int, regionCo } } -// IsStoreContainLabel return if the store contains the given label. +// IsStoreContainLabel returns if the store contains the given label. func IsStoreContainLabel(store *metapb.Store, key, value string) bool { for _, l := range store.GetLabels() { if l.GetKey() == key && l.GetValue() == value { @@ -723,3 +732,8 @@ func IsStoreContainLabel(store *metapb.Store, key, value string) bool { } return false } + +// IsAvailableForMinResolvedTS returns if the store is available for min resolved ts. +func IsAvailableForMinResolvedTS(s *StoreInfo) bool { + return !s.IsRemoved() && !IsStoreContainLabel(s.GetMeta(), EngineKey, EngineTiFlash) && s.GetLeaderCount() != 0 +} diff --git a/server/core/store_option.go b/server/core/store_option.go index 649dd2feb67..b445e9da735 100644 --- a/server/core/store_option.go +++ b/server/core/store_option.go @@ -217,6 +217,13 @@ func SetNewStoreStats(stats *pdpb.StoreStats) StoreCreateOption { } } +// SetMinResolvedTS sets min resolved ts for the store. +func SetMinResolvedTS(minResolvedTS uint64) StoreCreateOption { + return func(store *StoreInfo) { + store.minResolvedTS = minResolvedTS + } +} + // ResetStoreLimit resets the store limit for a store. func ResetStoreLimit(limitType storelimit.Type, ratePerSec ...float64) StoreCreateOption { return func(store *StoreInfo) { diff --git a/server/grpc_service.go b/server/grpc_service.go index 4a1ea8825e9..c2a09a68de3 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1841,17 +1841,14 @@ func (s *GrpcServer) ReportMinResolvedTS(ctx context.Context, request *pdpb.Repo return &pdpb.ReportMinResolvedTsResponse{Header: s.notBootstrappedHeader()}, nil } - var storage endpoint.MinResolvedTSStorage = s.storage storeID := request.StoreId minResolvedTS := request.MinResolvedTs - - if err := storage.SaveMinResolvedTS(storeID, minResolvedTS); err != nil { + if err := rc.SetMinResolvedTS(storeID, minResolvedTS); err != nil { return nil, err } log.Debug("updated min resolved-ts", zap.Uint64("store", storeID), zap.Uint64("min resolved-ts", minResolvedTS)) - return &pdpb.ReportMinResolvedTsResponse{ Header: s.header(), }, nil diff --git a/server/storage/endpoint/key_path.go b/server/storage/endpoint/key_path.go index 4cd5763e3a4..8dc10d6c220 100644 --- a/server/storage/endpoint/key_path.go +++ b/server/storage/endpoint/key_path.go @@ -99,12 +99,7 @@ func gcSafePointServicePath(serviceID string) string { return path.Join(gcSafePointPath(), "service", serviceID) } -// MinResolvedTSPath returns the min resolved ts with the given store ID. -func MinResolvedTSPath(storeID uint64) string { - return path.Join(minResolvedTS, fmt.Sprintf("%020d", storeID)) -} - -// MinResolvedTSPrefixPath returns the min resolved ts key path prefix. -func MinResolvedTSPrefixPath() string { - return minResolvedTS + "/" +// MinResolvedTSPath returns the min resolved ts path +func MinResolvedTSPath() string { + return path.Join(minResolvedTS) } diff --git a/server/storage/endpoint/min_resolved_ts.go b/server/storage/endpoint/min_resolved_ts.go index fea649dd256..12cf21da40a 100644 --- a/server/storage/endpoint/min_resolved_ts.go +++ b/server/storage/endpoint/min_resolved_ts.go @@ -15,11 +15,7 @@ package endpoint import ( - "math" - "path" "strconv" - - "go.etcd.io/etcd/clientv3" ) // MinResolvedTSPoint is the min resolved ts for a store @@ -31,18 +27,15 @@ type MinResolvedTSPoint struct { // MinResolvedTSStorage defines the storage operations on the min resolved ts. type MinResolvedTSStorage interface { - LoadMinResolvedTS(storeID uint64) (uint64, error) - SaveMinResolvedTS(storeID uint64, minResolvedTS uint64) error - LoadClusterMinResolvedTS() (uint64, error) - LoadAllMinResolvedTS() ([]*MinResolvedTSPoint, error) - RemoveMinResolvedTS(storeID uint64) error + LoadMinResolvedTS() (uint64, error) + SaveMinResolvedTS(minResolvedTS uint64) error } var _ MinResolvedTSStorage = (*StorageEndpoint)(nil) -// LoadMinResolvedTS loads the min resolved ts with the given store ID from storage. -func (se *StorageEndpoint) LoadMinResolvedTS(storeID uint64) (uint64, error) { - value, err := se.Load(MinResolvedTSPath(storeID)) +// LoadMinResolvedTS loads the min resolved ts from storage. +func (se *StorageEndpoint) LoadMinResolvedTS() (uint64, error) { + value, err := se.Load(MinResolvedTSPath()) if err != nil { return 0, err } @@ -56,71 +49,8 @@ func (se *StorageEndpoint) LoadMinResolvedTS(storeID uint64) (uint64, error) { return minResolvedTS, nil } -// SaveMinResolvedTS saves the min resolved ts with the given store ID to storage. -func (se *StorageEndpoint) SaveMinResolvedTS(storeID uint64, minResolvedTS uint64) error { +// SaveMinResolvedTS saves the min resolved ts. +func (se *StorageEndpoint) SaveMinResolvedTS(minResolvedTS uint64) error { value := strconv.FormatUint(minResolvedTS, 16) - return se.Save(MinResolvedTSPath(storeID), value) -} - -// LoadClusterMinResolvedTS returns the min resolved ts for the cluster -func (se *StorageEndpoint) LoadClusterMinResolvedTS() (uint64, error) { - prefix := MinResolvedTSPrefixPath() - prefixEnd := clientv3.GetPrefixRangeEnd(prefix) - keys, values, err := se.LoadRange(prefix, prefixEnd, 0) - if err != nil { - return math.MaxUint64, err - } - if len(keys) == 0 { - // There's no service safepoint. It may be a new cluster, or upgraded from an older version - return 0, nil - } - - min := uint64(math.MaxUint64) - for i := range keys { - var ts uint64 - if ts, err = strconv.ParseUint(values[i], 16, 64); err != nil { - return min, err - } - if ts < min { - min = ts - } - } - return min, nil -} - -// LoadAllMinResolvedTS returns min resolved ts of all stores. -func (se *StorageEndpoint) LoadAllMinResolvedTS() ([]*MinResolvedTSPoint, error) { - prefix := MinResolvedTSPrefixPath() - prefixEnd := clientv3.GetPrefixRangeEnd(prefix) - keys, values, err := se.LoadRange(prefix, prefixEnd, 0) - if err != nil { - return nil, err - } - if len(keys) == 0 { - return []*MinResolvedTSPoint{}, nil - } - - tss := make([]*MinResolvedTSPoint, 0, len(keys)) - for i, key := range keys { - var minResolvedTS, storeID uint64 - if minResolvedTS, err = strconv.ParseUint(values[i], 16, 64); err != nil { - return nil, err - } - if storeID, err = strconv.ParseUint(path.Base(key), 16, 64); err != nil { - return nil, err - } - ts := &MinResolvedTSPoint{ - StoreID: storeID, - MinResolvedTS: minResolvedTS, - } - tss = append(tss, ts) - } - - return tss, nil -} - -// RemoveMinResolvedTS removes min resolved ts for the store -func (se *StorageEndpoint) RemoveMinResolvedTS(storeID uint64) error { - key := MinResolvedTSPath(storeID) - return se.Remove(key) + return se.Save(MinResolvedTSPath(), value) } diff --git a/server/storage/storage_test.go b/server/storage/storage_test.go index feb08f0a3a6..51870a62133 100644 --- a/server/storage/storage_test.go +++ b/server/storage/storage_test.go @@ -279,31 +279,3 @@ func (s *testStorageSuite) TestLoadRegionsExceedRangeLimit(c *C) { } c.Assert(failpoint.Disable("github.com/tikv/pd/server/storage/kv/withRangeLimit"), IsNil) } - -func (s *testStorageSuite) TestMinResolvedTSStorage(c *C) { - storage := NewStorageWithMemoryBackend() - testData := []uint64{math.MaxUint64, 233333, 23333, 2333, 233, 1} - - r, e := storage.LoadClusterMinResolvedTS() - c.Assert(r, Equals, uint64(0)) - c.Assert(e, IsNil) - for i, minResolvedTS := range testData { - storeID := uint64(i + 1) - err := storage.SaveMinResolvedTS(storeID, minResolvedTS) - c.Assert(err, IsNil) - minResolvedTS1, err := storage.LoadMinResolvedTS(storeID) - c.Assert(err, IsNil) - c.Assert(minResolvedTS, Equals, minResolvedTS1) - min, err := storage.LoadClusterMinResolvedTS() - c.Assert(err, IsNil) - c.Assert(min, Equals, minResolvedTS) - } - for i := range testData { - storeID := uint64(i + 1) - err := storage.RemoveMinResolvedTS(storeID) - c.Assert(err, IsNil) - } - r, e = storage.LoadClusterMinResolvedTS() - c.Assert(r, Equals, uint64(0)) - c.Assert(e, IsNil) -} diff --git a/tests/client/client_test.go b/tests/client/client_test.go index 41d18b987d0..e7301372d47 100644 --- a/tests/client/client_test.go +++ b/tests/client/client_test.go @@ -1164,38 +1164,6 @@ func (s *testClientSuite) TestScatterRegion(c *C) { c.Succeed() } -func (s *testClientSuite) TestReportMinResolvedTS(c *C) { - storage := s.srv.GetStorage() - testData := []uint64{math.MaxUint64, 233333, 23333, 2333, 233, 1} - r, e := storage.LoadClusterMinResolvedTS() - c.Assert(r, Equals, uint64(0)) - c.Assert(e, IsNil) - for i, minResolvedTS := range testData { - storeID := uint64(i + 1) - req := &pdpb.ReportMinResolvedTsRequest{ - Header: newHeader(s.srv), - StoreId: storeID, - MinResolvedTs: minResolvedTS, - } - _, err := s.grpcSvr.ReportMinResolvedTS(context.Background(), req) - c.Assert(err, IsNil) - minResolvedTS1, err := storage.LoadMinResolvedTS(storeID) - c.Assert(err, IsNil) - c.Assert(minResolvedTS, Equals, minResolvedTS1) - min, err := storage.LoadClusterMinResolvedTS() - c.Assert(err, IsNil) - c.Assert(min, Equals, minResolvedTS) - } - for i := range testData { - storeID := uint64(i + 1) - err := storage.RemoveMinResolvedTS(storeID) - c.Assert(err, IsNil) - } - r, e = storage.LoadClusterMinResolvedTS() - c.Assert(r, Equals, uint64(0)) - c.Assert(e, IsNil) -} - type testConfigTTLSuite struct { ctx context.Context cancel context.CancelFunc diff --git a/tests/client/go.sum b/tests/client/go.sum index 57569d5ee6b..be32c4b8e0c 100644 --- a/tests/client/go.sum +++ b/tests/client/go.sum @@ -403,6 +403,7 @@ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a h1:0ZnJ8JPtPVGG3qF1G9Kz0NYDEj8BraNEJeQlmwUF6BA= github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index dcc27e65e9c..474154e86d8 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -17,6 +17,8 @@ package cluster_test import ( "context" "fmt" + "math" + "strconv" "sync" "testing" "time" @@ -245,7 +247,7 @@ func resetStoreState(c *C, rc *cluster.RaftCluster, storeID uint64, state metapb if state == metapb.StoreState_Offline { rc.SetStoreLimit(storeID, storelimit.RemovePeer, storelimit.Unlimited) } else if state == metapb.StoreState_Tombstone { - rc.CleanTombstoneResidualInfo(storeID) + rc.RemoveStoreLimit(storeID) } } @@ -1116,7 +1118,7 @@ func (s *clusterTestSuite) TestStaleTermHeartbeat(c *C) { c.Assert(err, IsNil) } -func (s *clusterTestSuite) TestMinResolvedTSWithTombstone(c *C) { +func (s *clusterTestSuite) TestMinResolvedTS(c *C) { tc, err := tests.NewTestCluster(s.ctx, 1) defer tc.Destroy() c.Assert(err, IsNil) @@ -1130,31 +1132,76 @@ func (s *clusterTestSuite) TestMinResolvedTSWithTombstone(c *C) { bootstrapCluster(c, clusterID, grpcPDClient) rc := leaderServer.GetRaftCluster() c.Assert(rc, NotNil) - - req := &pdpb.GetAllStoresRequest{ - Header: testutil.NewRequestHeader(clusterID), + c.Assert(failpoint.Enable("github.com/tikv/pd/server/highFrequencyClusterJobs", `return(true)`), IsNil) + addStoreWithMinResolvedTS := func(c *C, storeID uint64, isTiflash bool, minResolvedTS, expect uint64) { + store := &metapb.Store{ + Id: storeID, + Version: "v6.0.0", + Address: "127.0.0.1:" + strconv.Itoa(int(storeID)), + } + if isTiflash { + store.Labels = []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}} + } + _, err := putStore(grpcPDClient, clusterID, store) + c.Assert(err, IsNil) + req := &pdpb.ReportMinResolvedTsRequest{ + Header: testutil.NewRequestHeader(clusterID), + StoreId: storeID, + MinResolvedTs: minResolvedTS, + } + _, err = grpcPDClient.ReportMinResolvedTS(context.Background(), req) + c.Assert(err, IsNil) + time.Sleep(time.Millisecond * 10) + ts := rc.GetMinResolvedTS() + c.Assert(ts, Equals, expect) } - resp, err := grpcPDClient.GetAllStores(context.Background(), req) - c.Assert(err, IsNil) - c.Assert(resp.Stores, HasLen, 1) - storeID := resp.Stores[0].GetId() - - min := uint64(233) - req2 := &pdpb.ReportMinResolvedTsRequest{ - Header: testutil.NewRequestHeader(clusterID), - StoreId: storeID, - MinResolvedTs: min, + store1TS := uint64(233) + store3TS := store1TS - 10 + // case1: no init + // min resolved ts should be not available + store1 := uint64(1) + status, err := rc.LoadClusterStatus() + c.Assert(status.IsInitialized, IsFalse) + addStoreWithMinResolvedTS(c, store1, false, store1TS, math.MaxUint64) + // case2: add region + // min resolved ts should be available + for i := 0; i < 3; i++ { + region := &metapb.Region{ + Id: uint64(4 + i), + Peers: []*metapb.Peer{{Id: uint64(10 + i), StoreId: store1}}, + StartKey: []byte{byte(i)}, + EndKey: []byte{byte(i + 1)}, + } + rc.HandleRegionHeartbeat(core.NewRegionInfo(region, region.Peers[0])) } - _, err = grpcPDClient.ReportMinResolvedTS(context.Background(), req2) - c.Assert(err, IsNil) - ts0, err := rc.GetStorage().LoadMinResolvedTS(storeID) - c.Assert(err, IsNil) - c.Assert(ts0, Equals, min) - ts1, err := rc.GetStorage().LoadClusterMinResolvedTS() - c.Assert(err, IsNil) - c.Assert(ts1, Equals, min) - resetStoreState(c, rc, storeID, metapb.StoreState_Tombstone) - ts2, err := rc.GetStorage().LoadClusterMinResolvedTS() - c.Assert(err, IsNil) - c.Assert(ts2, Equals, uint64(0)) + c.Assert(rc.GetStore(store1).GetLeaderCount(), Equals, 3) + ts := rc.GetMinResolvedTS() + c.Assert(ts, Equals, store1TS) + // case2: add tiflash store + // min resolved ts should no change + store2 := uint64(2) + addStoreWithMinResolvedTS(c, store2, true, 0, store1TS) + // case4: add new store with less ts but without leader + // min resolved ts should no change + store3 := uint64(3) + addStoreWithMinResolvedTS(c, store3, false, store3TS, store1TS) + // case5: transfer region leader to store 3 + // min resolved ts should change to store 3 + region := &metapb.Region{ + Id: uint64(20), + Peers: []*metapb.Peer{{Id: uint64(21), StoreId: store3}}, + StartKey: []byte{byte(20)}, + EndKey: []byte{byte(21)}, + } + rc.HandleRegionHeartbeat(core.NewRegionInfo(region, region.Peers[0])) + c.Assert(rc.GetStore(store3).GetLeaderCount(), Equals, 1) + ts = rc.GetMinResolvedTS() + c.Assert(ts, Equals, store3TS) + // case6: set tombstone + // min resolved ts should change to store 1 + resetStoreState(c, rc, store3, metapb.StoreState_Tombstone) + time.Sleep(time.Millisecond * 10) + ts = rc.GetMinResolvedTS() + c.Assert(err, IsNil) + c.Assert(ts, Equals, store1TS) }