Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Mar 11, 2022
1 parent bb23a5c commit 09b7079
Show file tree
Hide file tree
Showing 15 changed files with 184 additions and 251 deletions.
23 changes: 0 additions & 23 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,6 @@ type Client interface {
StoreGlobalConfig(ctx context.Context, items []GlobalConfigItem) error
// WatchGlobalConfig returns an stream with all global config and updates
WatchGlobalConfig(ctx context.Context) (chan []GlobalConfigItem, error)
// ReportMinResolvedTS reports the min resolved ts to pd.
ReportMinResolvedTS(ctx context.Context, storeID, minResolvedTS uint64) error
// UpdateOption updates the client option.
UpdateOption(option DynamicOption, value interface{}) error
// Close closes the client.
Expand Down Expand Up @@ -1867,24 +1865,3 @@ func (c *client) WatchGlobalConfig(ctx context.Context) (chan []GlobalConfigItem
}()
return globalConfigWatcherCh, err
}

func (c *client) ReportMinResolvedTS(ctx context.Context, storeID, minResolvedTS uint64) error {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.ReportMinResolvedTS", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &pdpb.ReportMinResolvedTsRequest{
Header: c.requestHeader(),
StoreId: storeID,
MinResolvedTs: minResolvedTS,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
_, err := c.getClient().ReportMinResolvedTS(ctx, req)
cancel()

if err != nil {
return errors.WithStack(err)
}
return nil
}
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a
github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee
github.com/prometheus/client_golang v1.11.0
go.uber.org/goleak v1.1.11
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a h1:0ZnJ8JPtPVGG3qF1G9Kz0NYDEj8BraNEJeQlmwUF6BA=
github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b h1:/OL63rEIcCEivpgTLCkhxVbO3RMxSuHtsKWSgDwS6oY=
github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee h1:VO2t6IBpfvW34TdtD/G10VvnGqjLic1jzOuHjUb5VqM=
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
14 changes: 3 additions & 11 deletions server/api/min_resolved_ts.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"net/http"

"github.com/tikv/pd/server"
"github.com/tikv/pd/server/storage/endpoint"
"github.com/unrolled/render"
)

Expand All @@ -36,8 +35,7 @@ func newMinResolvedTSHandler(svr *server.Server, rd *render.Render) *minResolved

// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it.
type listMinResolvedTS struct {
MinResolvedTSList []*endpoint.MinResolvedTSPoint `json:"list"`
MinResolvedTSForCluster uint64 `json:"min_resolved_ts"`
MinResolvedTS uint64 `json:"min_resolved_ts"`
}

// @Tags minresolvedts
Expand All @@ -48,19 +46,13 @@ type listMinResolvedTS struct {
// @Router /min-resolved-ts [get]
func (h *minResolvedTSHandler) List(w http.ResponseWriter, r *http.Request) {
storage := h.svr.GetStorage()
minResolvedTS, err := storage.LoadClusterMinResolvedTS()
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
minResolvedTSList, err := storage.LoadAllMinResolvedTS()
minResolvedTS, err := storage.LoadMinResolvedTS()
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
list := listMinResolvedTS{
MinResolvedTSList: minResolvedTSList,
MinResolvedTSForCluster: minResolvedTS,
MinResolvedTS: minResolvedTS,
}
h.rd.JSON(w, http.StatusOK, list)
}
19 changes: 3 additions & 16 deletions server/api/min_resolved_ts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/apiutil"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/storage/endpoint"
)

var _ = Suite(&testMinResolvedTSSuite{})
Expand Down Expand Up @@ -50,23 +49,11 @@ func (s *testMinResolvedTSSuite) TearDownSuite(c *C) {
func (s *testMinResolvedTSSuite) TestMinResolvedTS(c *C) {
url := s.urlPrefix + "/min-resolved-ts"
storage := s.svr.GetStorage()
testData := []uint64{233333, 23333, 2333, 233, 1}
min := uint64(233)
storage.SaveMinResolvedTS(min)
result := &listMinResolvedTS{
MinResolvedTSList: make([]*endpoint.MinResolvedTSPoint, 0),
MinResolvedTS: min,
}
for i, minResolvedTS := range testData {
storeID := uint64(i + 1)
err := storage.SaveMinResolvedTS(storeID, minResolvedTS)
c.Assert(err, IsNil)
result.MinResolvedTSList = append(result.MinResolvedTSList, &endpoint.MinResolvedTSPoint{
StoreID: storeID,
MinResolvedTS: minResolvedTS,
})
}
ts, err := storage.LoadClusterMinResolvedTS()
c.Assert(err, IsNil)
result.MinResolvedTSForCluster = ts

res, err := testDialClient.Get(url)
c.Assert(err, IsNil)
defer res.Body.Close()
Expand Down
76 changes: 61 additions & 15 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package cluster
import (
"context"
"fmt"
"math"
"net/http"
"strconv"
"sync"
Expand Down Expand Up @@ -53,6 +54,7 @@ import (
)

var backgroundJobInterval = 10 * time.Second
var saveMinResolvedTSInterval = 1 * time.Second

const (
clientTimeout = 3 * time.Second
Expand Down Expand Up @@ -251,15 +253,17 @@ func (c *RaftCluster) Start(s Server) error {
c.limiter = NewStoreLimiter(s.GetPersistOptions())
c.unsafeRecoveryController = newUnsafeRecoveryController(cluster)

c.wg.Add(5)
c.wg.Add(6)
go c.runCoordinator()
failpoint.Inject("highFrequencyClusterJobs", func() {
backgroundJobInterval = 100 * time.Microsecond
saveMinResolvedTSInterval = 1 * time.Microsecond
})
go c.runBackgroundJobs(backgroundJobInterval)
go c.runStatsBackgroundJobs()
go c.syncRegions()
go c.runReplicationMode()
go c.runMinResolvedTSJob(saveMinResolvedTSInterval)
c.running = true

return nil
Expand Down Expand Up @@ -1067,18 +1071,12 @@ func (c *RaftCluster) BuryStore(storeID uint64, forceBury bool) error {
c.onStoreVersionChangeLocked()
if err == nil {
// clean up the residual information.
c.CleanTombstoneResidualInfo(storeID)
c.RemoveStoreLimit(storeID)
c.hotStat.RemoveRollingStoreStats(storeID)
}
return err
}

// CleanTombstoneResidualInfo clean up the residual information of tombstone store.
func (c *RaftCluster) CleanTombstoneResidualInfo(storeID uint64) {
c.RemoveStoreLimit(storeID)
c.RemoveMinResolvedTSStorage(storeID)
}

// PauseLeaderTransfer prevents the store from been selected as source or
// target store of TransferLeader.
func (c *RaftCluster) PauseLeaderTransfer(storeID uint64) error {
Expand Down Expand Up @@ -1225,7 +1223,7 @@ func (c *RaftCluster) RemoveTombStoneRecords() error {
errs.ZapError(err))
return err
}
c.CleanTombstoneResidualInfo(store.GetID())
c.RemoveStoreLimit(store.GetID())
log.Info("delete store succeeded",
zap.Stringer("store", store.GetMeta()))
}
Expand Down Expand Up @@ -1656,14 +1654,62 @@ func (c *RaftCluster) RemoveStoreLimit(storeID uint64) {
log.Error("persist store limit meet error", errs.ZapError(err))
}

// RemoveMinResolvedTSStorage remove min resolved ts storage for a given store ID.
func (c *RaftCluster) RemoveMinResolvedTSStorage(storeID uint64) error {
if c.storage != nil {
if err := c.storage.RemoveMinResolvedTS(storeID); err != nil {
return err
// GetMinResolvedTS returns the min resolved ts of all stores.
func (c *RaftCluster) GetMinResolvedTS() uint64 {
c.RLock()
defer c.RUnlock()
if !c.isInitialized() {
return math.MaxUint64
}
min := uint64(math.MaxUint64)
for _, s := range c.GetStores() {
if !core.IsAvailableForMinResolvedTS(s) {
continue
}
if min > s.GetMinResolvedTS() {
min = s.GetMinResolvedTS()
}
}
return min
}

// SetMinResolvedTS sets up a store with min resolved ts.
func (c *RaftCluster) SetMinResolvedTS(storeID, minResolvedTS uint64) error {
c.Lock()
defer c.Unlock()

store := c.GetStore(storeID)
if store == nil {
return errs.ErrStoreNotFound.FastGenByArgs(storeID)
}

newStore := store.Clone(
core.SetMinResolvedTS(minResolvedTS),
)

return c.putStoreLocked(newStore)
}

func (c *RaftCluster) runMinResolvedTSJob(saveInterval time.Duration) {
defer logutil.LogPanic()
defer c.wg.Done()

ticker := time.NewTicker(saveInterval)
defer ticker.Stop()
for {
select {
case <-c.ctx.Done():
log.Info("min resolved ts background jobs has been stopped")
return
case <-ticker.C:
minResolvedTS := c.GetMinResolvedTS()
if minResolvedTS != math.MaxUint64 {
c.Lock()
defer c.Unlock()
c.storage.SaveMinResolvedTS(minResolvedTS)
}
}
}
return nil
}

// SetStoreLimit sets a store limit for a given type and rate.
Expand Down
26 changes: 20 additions & 6 deletions server/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,18 @@ type StoreInfo struct {
leaderWeight float64
regionWeight float64
limiter map[storelimit.Type]*storelimit.StoreLimit
minResolvedTS uint64
}

// NewStoreInfo creates StoreInfo with meta data.
func NewStoreInfo(store *metapb.Store, opts ...StoreCreateOption) *StoreInfo {
storeInfo := &StoreInfo{
meta: store,
storeStats: newStoreStats(),
leaderWeight: 1.0,
regionWeight: 1.0,
limiter: make(map[storelimit.Type]*storelimit.StoreLimit),
meta: store,
storeStats: newStoreStats(),
leaderWeight: 1.0,
regionWeight: 1.0,
limiter: make(map[storelimit.Type]*storelimit.StoreLimit),
minResolvedTS: math.MaxUint64,
}
for _, opt := range opts {
opt(storeInfo)
Expand All @@ -94,6 +96,7 @@ func (s *StoreInfo) Clone(opts ...StoreCreateOption) *StoreInfo {
leaderWeight: s.leaderWeight,
regionWeight: s.regionWeight,
limiter: s.limiter,
minResolvedTS: s.minResolvedTS,
}

for _, opt := range opts {
Expand All @@ -118,6 +121,7 @@ func (s *StoreInfo) ShallowClone(opts ...StoreCreateOption) *StoreInfo {
leaderWeight: s.leaderWeight,
regionWeight: s.regionWeight,
limiter: s.limiter,
minResolvedTS: s.minResolvedTS,
}

for _, opt := range opts {
Expand Down Expand Up @@ -472,6 +476,11 @@ func (s *StoreInfo) GetUptime() time.Duration {
return 0
}

// GetMinResolvedTS returns min resolved ts.
func (s *StoreInfo) GetMinResolvedTS() uint64 {
return s.minResolvedTS
}

var (
// If a store's last heartbeat is storeDisconnectDuration ago, the store will
// be marked as disconnected state. The value should be greater than tikv's
Expand Down Expand Up @@ -714,7 +723,7 @@ func (s *StoresInfo) UpdateStoreStatus(storeID uint64, leaderCount int, regionCo
}
}

// IsStoreContainLabel return if the store contains the given label.
// IsStoreContainLabel returns if the store contains the given label.
func IsStoreContainLabel(store *metapb.Store, key, value string) bool {
for _, l := range store.GetLabels() {
if l.GetKey() == key && l.GetValue() == value {
Expand All @@ -723,3 +732,8 @@ func IsStoreContainLabel(store *metapb.Store, key, value string) bool {
}
return false
}

// IsAvailableForMinResolvedTS returns if the store is available for min resolved ts.
func IsAvailableForMinResolvedTS(s *StoreInfo) bool {
return !s.IsRemoved() && !IsStoreContainLabel(s.GetMeta(), EngineKey, EngineTiFlash) && s.GetLeaderCount() != 0
}
7 changes: 7 additions & 0 deletions server/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,13 @@ func SetNewStoreStats(stats *pdpb.StoreStats) StoreCreateOption {
}
}

// SetMinResolvedTS sets min resolved ts for the store.
func SetMinResolvedTS(minResolvedTS uint64) StoreCreateOption {
return func(store *StoreInfo) {
store.minResolvedTS = minResolvedTS
}
}

// ResetStoreLimit resets the store limit for a store.
func ResetStoreLimit(limitType storelimit.Type, ratePerSec ...float64) StoreCreateOption {
return func(store *StoreInfo) {
Expand Down
5 changes: 1 addition & 4 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1841,17 +1841,14 @@ func (s *GrpcServer) ReportMinResolvedTS(ctx context.Context, request *pdpb.Repo
return &pdpb.ReportMinResolvedTsResponse{Header: s.notBootstrappedHeader()}, nil
}

var storage endpoint.MinResolvedTSStorage = s.storage
storeID := request.StoreId
minResolvedTS := request.MinResolvedTs

if err := storage.SaveMinResolvedTS(storeID, minResolvedTS); err != nil {
if err := rc.SetMinResolvedTS(storeID, minResolvedTS); err != nil {
return nil, err
}
log.Debug("updated min resolved-ts",
zap.Uint64("store", storeID),
zap.Uint64("min resolved-ts", minResolvedTS))

return &pdpb.ReportMinResolvedTsResponse{
Header: s.header(),
}, nil
Expand Down
11 changes: 3 additions & 8 deletions server/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,7 @@ func gcSafePointServicePath(serviceID string) string {
return path.Join(gcSafePointPath(), "service", serviceID)
}

// MinResolvedTSPath returns the min resolved ts with the given store ID.
func MinResolvedTSPath(storeID uint64) string {
return path.Join(minResolvedTS, fmt.Sprintf("%020d", storeID))
}

// MinResolvedTSPrefixPath returns the min resolved ts key path prefix.
func MinResolvedTSPrefixPath() string {
return minResolvedTS + "/"
// MinResolvedTSPath returns the min resolved ts path
func MinResolvedTSPath() string {
return path.Join(minResolvedTS)
}
Loading

0 comments on commit 09b7079

Please sign in to comment.