Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dr-ausosync: report min resolved ts #4716

Merged
merged 13 commits into from
Mar 17, 2022
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b
github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a
github.com/pingcap/log v0.0.0-20210906054005-afc726e70354
github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d
github.com/pingcap/tidb-dashboard v0.0.0-20220117082709-e8076b5c79ba
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,8 @@ github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMt
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b h1:/OL63rEIcCEivpgTLCkhxVbO3RMxSuHtsKWSgDwS6oY=
github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a h1:0ZnJ8JPtPVGG3qF1G9Kz0NYDEj8BraNEJeQlmwUF6BA=
github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
Expand Down
1 change: 1 addition & 0 deletions server/api/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ func (s *testConfigSuite) TestConfigPDServer(c *C) {
c.Assert(sc.MetricStorage, Equals, "")
c.Assert(sc.DashboardAddress, Equals, "auto")
c.Assert(sc.FlowRoundByDigit, Equals, int(3))
c.Assert(sc.SaveMinResolvedTSInterval, Equals, typeutil.NewDuration(0))
c.Assert(sc.MaxResetTSGap.Duration, Equals, 24*time.Hour)
}

Expand Down
57 changes: 57 additions & 0 deletions server/api/min_resolved_ts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"net/http"

"github.com/tikv/pd/server"
"github.com/unrolled/render"
)

type minResolvedTSHandler struct {
svr *server.Server
rd *render.Render
}

func newMinResolvedTSHandler(svr *server.Server, rd *render.Render) *minResolvedTSHandler {
return &minResolvedTSHandler{
svr: svr,
rd: rd,
}
}

// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it.
type minResolvedTS struct {
MinResolvedTS uint64 `json:"min_resolved_ts"`
}

// @Tags minresolvedts
// @Summary Get min resolved ts.
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
// @Produce json
// @Success 200 {array} minResolvedTS
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /min-resolved-ts [get]
func (h *minResolvedTSHandler) Get(w http.ResponseWriter, r *http.Request) {
storage := h.svr.GetStorage()
value, err := storage.LoadMinResolvedTS()
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, minResolvedTS{
MinResolvedTS: value,
})
}
66 changes: 66 additions & 0 deletions server/api/min_resolved_ts_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"fmt"

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/apiutil"
"github.com/tikv/pd/server"
)

var _ = Suite(&testMinResolvedTSSuite{})

type testMinResolvedTSSuite struct {
svr *server.Server
cleanup cleanUpFunc
urlPrefix string
}

func (s *testMinResolvedTSSuite) SetUpSuite(c *C) {
c.Assert(failpoint.Enable("github.com/tikv/pd/server/highFrequencyClusterJobs", `return(true)`), IsNil)
s.svr, s.cleanup = mustNewServer(c)
mustWaitLeader(c, []*server.Server{s.svr})

addr := s.svr.GetAddr()
s.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)

mustBootstrapCluster(c, s.svr)
mustPutStore(c, s.svr, 1, metapb.StoreState_Up, metapb.NodeState_Serving, nil)
}

func (s *testMinResolvedTSSuite) TearDownSuite(c *C) {
s.cleanup()
}

func (s *testMinResolvedTSSuite) TestMinResolvedTS(c *C) {
url := s.urlPrefix + "/min-resolved-ts"
storage := s.svr.GetStorage()
min := uint64(233)
storage.SaveMinResolvedTS(min)
result := &minResolvedTS{
MinResolvedTS: min,
}
res, err := testDialClient.Get(url)
c.Assert(err, IsNil)
defer res.Body.Close()
listResp := &minResolvedTS{}
err = apiutil.ReadJSON(res.Body, listResp)
c.Assert(err, IsNil)
c.Assert(listResp, DeepEquals, result)
}
4 changes: 4 additions & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,10 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
registerFunc(apiRouter, "/gc/safepoint", serviceGCSafepointHandler.GetGCSafePoint, setMethods("GET"), setAuditBackend(localLog))
registerFunc(apiRouter, "/gc/safepoint/{service_id}", serviceGCSafepointHandler.DeleteGCSafePoint, setMethods("DELETE"), setAuditBackend(localLog))

// min resolved ts API
minResolvedTSHandler := newMinResolvedTSHandler(svr, rd)
registerFunc(apiRouter, "/min-resolved-ts", minResolvedTSHandler.Get, setMethods("GET"))

// unsafe admin operation API
unsafeOperationHandler := newUnsafeOperationHandler(svr, rd)
registerFunc(clusterRouter, "/admin/unsafe/remove-failed-stores",
Expand Down
2 changes: 1 addition & 1 deletion server/api/service_gc_safepoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (s *testServiceGCSafepointSuite) TearDownSuite(c *C) {
s.cleanup()
}

func (s *testServiceGCSafepointSuite) TestRegionStats(c *C) {
func (s *testServiceGCSafepointSuite) TestServiceGCSafepoint(c *C) {
sspURL := s.urlPrefix + "/gc/safepoint"

storage := s.svr.GetStorage()
Expand Down
66 changes: 65 additions & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package cluster
import (
"context"
"fmt"
"math"
"net/http"
"strconv"
"sync"
Expand Down Expand Up @@ -250,16 +251,19 @@ func (c *RaftCluster) Start(s Server) error {
c.regionStats = statistics.NewRegionStatistics(c.opt, c.ruleManager)
c.limiter = NewStoreLimiter(s.GetPersistOptions())
c.unsafeRecoveryController = newUnsafeRecoveryController(cluster)
saveMinResolvedTSInterval := c.opt.GetSaveMinResolvedTSInterval()

c.wg.Add(5)
c.wg.Add(6)
go c.runCoordinator()
failpoint.Inject("highFrequencyClusterJobs", func() {
backgroundJobInterval = 100 * time.Microsecond
saveMinResolvedTSInterval = 1 * time.Microsecond
})
go c.runBackgroundJobs(backgroundJobInterval)
go c.runStatsBackgroundJobs()
go c.syncRegions()
go c.runReplicationMode()
go c.runMinResolvedTSJob(saveMinResolvedTSInterval)
c.running = true

return nil
Expand Down Expand Up @@ -1650,6 +1654,66 @@ func (c *RaftCluster) RemoveStoreLimit(storeID uint64) {
log.Error("persist store limit meet error", errs.ZapError(err))
}

// GetMinResolvedTS returns the min resolved ts of all stores.
func (c *RaftCluster) GetMinResolvedTS() uint64 {
c.RLock()
defer c.RUnlock()
if !c.isInitialized() {
return math.MaxUint64
}
min := uint64(math.MaxUint64)
for _, s := range c.GetStores() {
if !core.IsAvailableForMinResolvedTS(s) {
disksing marked this conversation as resolved.
Show resolved Hide resolved
continue
}
if min > s.GetMinResolvedTS() {
min = s.GetMinResolvedTS()
}
}
return min
}

// SetMinResolvedTS sets up a store with min resolved ts.
func (c *RaftCluster) SetMinResolvedTS(storeID, minResolvedTS uint64) error {
c.Lock()
defer c.Unlock()

store := c.GetStore(storeID)
if store == nil {
return errs.ErrStoreNotFound.FastGenByArgs(storeID)
}

newStore := store.Clone(
core.SetMinResolvedTS(minResolvedTS),
)

return c.putStoreLocked(newStore)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this has unnecessary writes to etcd.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be ok to write to etcd. If not write to etcd, it will be lost when PD is shut down or PD leader is changed.

Copy link
Contributor

@nolouch nolouch Mar 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it only write meta information to etcd, actually there is no any update :

func (c *RaftCluster) putStoreLocked(store *core.StoreInfo) error {
	if c.storage != nil {
		if err := c.storage.SaveStore(store.GetMeta()); err != nil {
			return err
		}
	}
	c.core.PutStore(store)
	c.hotStat.GetOrCreateRollingStoreStats(store.GetID())
	return nil
}

}

func (c *RaftCluster) runMinResolvedTSJob(saveInterval time.Duration) {
defer c.wg.Done()
if saveInterval == 0 {
return
}
defer logutil.LogPanic()
ticker := time.NewTicker(saveInterval)
defer ticker.Stop()
for {
select {
case <-c.ctx.Done():
log.Info("min resolved ts background jobs has been stopped")
return
case <-ticker.C:
minResolvedTS := c.GetMinResolvedTS()
if minResolvedTS != math.MaxUint64 {
c.Lock()
c.storage.SaveMinResolvedTS(minResolvedTS)
c.Unlock()
}
}
}
}

// SetStoreLimit sets a store limit for a given type and rate.
func (c *RaftCluster) SetStoreLimit(storeID uint64, typ storelimit.Type, ratePerMin float64) error {
old := c.opt.GetScheduleConfig().Clone()
Expand Down
18 changes: 12 additions & 6 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,13 @@ const (

defaultLeaderPriorityCheckInterval = time.Minute

defaultUseRegionStorage = true
defaultTraceRegionFlow = true
defaultFlowRoundByDigit = 3 // KB
maxTraceFlowRoundByDigit = 5 // 0.1 MB
defaultMaxResetTSGap = 24 * time.Hour
defaultKeyType = "table"
defaultUseRegionStorage = true
defaultTraceRegionFlow = true
defaultFlowRoundByDigit = 3 // KB
maxTraceFlowRoundByDigit = 5 // 0.1 MB
defaultMaxResetTSGap = 24 * time.Hour
defaultSaveMinResolvedTSInterval = 0
defaultKeyType = "table"

defaultStrictlyMatchLabel = false
defaultEnablePlacementRules = true
Expand Down Expand Up @@ -1102,6 +1103,8 @@ type PDServerConfig struct {
TraceRegionFlow bool `toml:"trace-region-flow" json:"trace-region-flow,string,omitempty"`
// FlowRoundByDigit used to discretization processing flow information.
FlowRoundByDigit int `toml:"flow-round-by-digit" json:"flow-round-by-digit"`
// SaveMinResolvedTSInterval is the interval to save the min resolved ts.
SaveMinResolvedTSInterval typeutil.Duration `toml:"save-min-resolved-ts-interval" json:"save-min-resolved-ts-interval"`
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
}

func (c *PDServerConfig) adjust(meta *configMetaData) error {
Expand All @@ -1124,6 +1127,9 @@ func (c *PDServerConfig) adjust(meta *configMetaData) error {
if !meta.IsDefined("flow-round-by-digit") {
adjustInt(&c.FlowRoundByDigit, defaultFlowRoundByDigit)
}
if !meta.IsDefined("save-min-resolved-ts-interval") {
adjustDuration(&c.SaveMinResolvedTSInterval, defaultSaveMinResolvedTSInterval)
}
c.migrateConfigurationFromFile(meta)
return c.Validate()
}
Expand Down
5 changes: 5 additions & 0 deletions server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,11 @@ func (o *PersistOptions) CheckLabelProperty(typ string, labels []*metapb.StoreLa
return false
}

// GetSaveMinResolvedTSInterval gets the interval for PD to save min resolved ts.
func (o *PersistOptions) GetSaveMinResolvedTSInterval() time.Duration {
return o.GetPDServerConfig().SaveMinResolvedTSInterval.Duration
}

const ttlConfigPrefix = "/config/ttl"

// SetTTLData set temporary configuration
Expand Down
26 changes: 20 additions & 6 deletions server/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,18 @@ type StoreInfo struct {
leaderWeight float64
regionWeight float64
limiter map[storelimit.Type]*storelimit.StoreLimit
minResolvedTS uint64
}

// NewStoreInfo creates StoreInfo with meta data.
func NewStoreInfo(store *metapb.Store, opts ...StoreCreateOption) *StoreInfo {
storeInfo := &StoreInfo{
meta: store,
storeStats: newStoreStats(),
leaderWeight: 1.0,
regionWeight: 1.0,
limiter: make(map[storelimit.Type]*storelimit.StoreLimit),
meta: store,
storeStats: newStoreStats(),
leaderWeight: 1.0,
regionWeight: 1.0,
limiter: make(map[storelimit.Type]*storelimit.StoreLimit),
minResolvedTS: math.MaxUint64,
}
for _, opt := range opts {
opt(storeInfo)
Expand All @@ -94,6 +96,7 @@ func (s *StoreInfo) Clone(opts ...StoreCreateOption) *StoreInfo {
leaderWeight: s.leaderWeight,
regionWeight: s.regionWeight,
limiter: s.limiter,
minResolvedTS: s.minResolvedTS,
}

for _, opt := range opts {
Expand All @@ -118,6 +121,7 @@ func (s *StoreInfo) ShallowClone(opts ...StoreCreateOption) *StoreInfo {
leaderWeight: s.leaderWeight,
regionWeight: s.regionWeight,
limiter: s.limiter,
minResolvedTS: s.minResolvedTS,
}

for _, opt := range opts {
Expand Down Expand Up @@ -472,6 +476,11 @@ func (s *StoreInfo) GetUptime() time.Duration {
return 0
}

// GetMinResolvedTS returns min resolved ts.
func (s *StoreInfo) GetMinResolvedTS() uint64 {
return s.minResolvedTS
}

var (
// If a store's last heartbeat is storeDisconnectDuration ago, the store will
// be marked as disconnected state. The value should be greater than tikv's
Expand Down Expand Up @@ -714,7 +723,7 @@ func (s *StoresInfo) UpdateStoreStatus(storeID uint64, leaderCount int, regionCo
}
}

// IsStoreContainLabel return if the store contains the given label.
// IsStoreContainLabel returns if the store contains the given label.
func IsStoreContainLabel(store *metapb.Store, key, value string) bool {
for _, l := range store.GetLabels() {
if l.GetKey() == key && l.GetValue() == value {
Expand All @@ -723,3 +732,8 @@ func IsStoreContainLabel(store *metapb.Store, key, value string) bool {
}
return false
}

// IsAvailableForMinResolvedTS returns if the store is available for min resolved ts.
func IsAvailableForMinResolvedTS(s *StoreInfo) bool {
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
return !s.IsRemoved() && !IsStoreContainLabel(s.GetMeta(), EngineKey, EngineTiFlash) && s.GetLeaderCount() != 0
}
Loading