Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dr-ausosync: report min resolved ts #4716

Merged
merged 13 commits into from
Mar 17, 2022
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b
github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a
github.com/pingcap/log v0.0.0-20210906054005-afc726e70354
github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d
github.com/pingcap/tidb-dashboard v0.0.0-20220117082709-e8076b5c79ba
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,8 @@ github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMt
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b h1:/OL63rEIcCEivpgTLCkhxVbO3RMxSuHtsKWSgDwS6oY=
github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a h1:0ZnJ8JPtPVGG3qF1G9Kz0NYDEj8BraNEJeQlmwUF6BA=
github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
Expand Down
58 changes: 58 additions & 0 deletions server/api/min_resolved_ts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"net/http"

"github.com/tikv/pd/server"
"github.com/unrolled/render"
)

type minResolvedTSHandler struct {
svr *server.Server
rd *render.Render
}

func newMinResolvedTSHandler(svr *server.Server, rd *render.Render) *minResolvedTSHandler {
return &minResolvedTSHandler{
svr: svr,
rd: rd,
}
}

// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it.
type listMinResolvedTS struct {
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
MinResolvedTS uint64 `json:"min_resolved_ts"`
}

// @Tags minresolvedts
// @Summary Get min resolved ts.
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
// @Produce json
// @Success 200 {array} listMinResolvedTS
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /min-resolved-ts [get]
func (h *minResolvedTSHandler) List(w http.ResponseWriter, r *http.Request) {
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
storage := h.svr.GetStorage()
minResolvedTS, err := storage.LoadMinResolvedTS()
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
list := listMinResolvedTS{
MinResolvedTS: minResolvedTS,
}
h.rd.JSON(w, http.StatusOK, list)
}
64 changes: 64 additions & 0 deletions server/api/min_resolved_ts_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"fmt"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/apiutil"
"github.com/tikv/pd/server"
)

var _ = Suite(&testMinResolvedTSSuite{})

type testMinResolvedTSSuite struct {
svr *server.Server
cleanup cleanUpFunc
urlPrefix string
}

func (s *testMinResolvedTSSuite) SetUpSuite(c *C) {
s.svr, s.cleanup = mustNewServer(c)
mustWaitLeader(c, []*server.Server{s.svr})

addr := s.svr.GetAddr()
s.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)

mustBootstrapCluster(c, s.svr)
mustPutStore(c, s.svr, 1, metapb.StoreState_Up, metapb.NodeState_Serving, nil)
}

func (s *testMinResolvedTSSuite) TearDownSuite(c *C) {
s.cleanup()
}

func (s *testMinResolvedTSSuite) TestMinResolvedTS(c *C) {
url := s.urlPrefix + "/min-resolved-ts"
storage := s.svr.GetStorage()
min := uint64(233)
storage.SaveMinResolvedTS(min)
result := &listMinResolvedTS{
MinResolvedTS: min,
}
res, err := testDialClient.Get(url)
c.Assert(err, IsNil)
defer res.Body.Close()
listResp := &listMinResolvedTS{}
err = apiutil.ReadJSON(res.Body, listResp)
c.Assert(err, IsNil)
c.Assert(listResp, DeepEquals, result)
}
4 changes: 4 additions & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,10 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
registerFunc(apiRouter, "GetGCSafePoint", "/gc/safepoint", serviceGCSafepointHandler.List, setMethods("GET"), setAuditBackend(localLog))
registerFunc(apiRouter, "DeleteGCSafePoint", "/gc/safepoint/{service_id}", serviceGCSafepointHandler.Delete, setMethods("DELETE"), setAuditBackend(localLog))

// min resolved ts API
MinResolvedTSHandler := newMinResolvedTSHandler(svr, rd)
registerFunc(apiRouter, "GetMinResolvedTS", "/min-resolved-ts", MinResolvedTSHandler.List, setMethods("GET"), setAuditBackend(localLog))

// unsafe admin operation API
unsafeOperationHandler := newUnsafeOperationHandler(svr, rd)
registerFunc(clusterRouter, "RemoveFailedStoresUnsafely", "/admin/unsafe/remove-failed-stores",
Expand Down
2 changes: 1 addition & 1 deletion server/api/service_gc_safepoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (s *testServiceGCSafepointSuite) TearDownSuite(c *C) {
s.cleanup()
}

func (s *testServiceGCSafepointSuite) TestRegionStats(c *C) {
func (s *testServiceGCSafepointSuite) TestServiceGCSafepoint(c *C) {
sspURL := s.urlPrefix + "/gc/safepoint"

storage := s.svr.GetStorage()
Expand Down
64 changes: 63 additions & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package cluster
import (
"context"
"fmt"
"math"
"net/http"
"strconv"
"sync"
Expand Down Expand Up @@ -53,6 +54,7 @@ import (
)

var backgroundJobInterval = 10 * time.Second
var saveMinResolvedTSInterval = 1 * time.Second
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved

const (
clientTimeout = 3 * time.Second
Expand Down Expand Up @@ -251,15 +253,17 @@ func (c *RaftCluster) Start(s Server) error {
c.limiter = NewStoreLimiter(s.GetPersistOptions())
c.unsafeRecoveryController = newUnsafeRecoveryController(cluster)

c.wg.Add(5)
c.wg.Add(6)
go c.runCoordinator()
failpoint.Inject("highFrequencyClusterJobs", func() {
backgroundJobInterval = 100 * time.Microsecond
saveMinResolvedTSInterval = 1 * time.Microsecond
})
go c.runBackgroundJobs(backgroundJobInterval)
go c.runStatsBackgroundJobs()
go c.syncRegions()
go c.runReplicationMode()
go c.runMinResolvedTSJob(saveMinResolvedTSInterval)
c.running = true

return nil
Expand Down Expand Up @@ -1650,6 +1654,64 @@ func (c *RaftCluster) RemoveStoreLimit(storeID uint64) {
log.Error("persist store limit meet error", errs.ZapError(err))
}

// GetMinResolvedTS returns the min resolved ts of all stores.
func (c *RaftCluster) GetMinResolvedTS() uint64 {
c.RLock()
defer c.RUnlock()
if !c.isInitialized() {
return math.MaxUint64
}
min := uint64(math.MaxUint64)
for _, s := range c.GetStores() {
if !core.IsAvailableForMinResolvedTS(s) {
disksing marked this conversation as resolved.
Show resolved Hide resolved
continue
}
if min > s.GetMinResolvedTS() {
min = s.GetMinResolvedTS()
}
}
return min
}

// SetMinResolvedTS sets up a store with min resolved ts.
func (c *RaftCluster) SetMinResolvedTS(storeID, minResolvedTS uint64) error {
c.Lock()
defer c.Unlock()

store := c.GetStore(storeID)
if store == nil {
return errs.ErrStoreNotFound.FastGenByArgs(storeID)
}

newStore := store.Clone(
core.SetMinResolvedTS(minResolvedTS),
)

return c.putStoreLocked(newStore)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this has unnecessary writes to etcd.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be ok to write to etcd. If not write to etcd, it will be lost when PD is shut down or PD leader is changed.

Copy link
Contributor

@nolouch nolouch Mar 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it only write meta information to etcd, actually there is no any update :

func (c *RaftCluster) putStoreLocked(store *core.StoreInfo) error {
	if c.storage != nil {
		if err := c.storage.SaveStore(store.GetMeta()); err != nil {
			return err
		}
	}
	c.core.PutStore(store)
	c.hotStat.GetOrCreateRollingStoreStats(store.GetID())
	return nil
}

}

func (c *RaftCluster) runMinResolvedTSJob(saveInterval time.Duration) {
defer logutil.LogPanic()
defer c.wg.Done()

ticker := time.NewTicker(saveInterval)
defer ticker.Stop()
for {
select {
case <-c.ctx.Done():
log.Info("min resolved ts background jobs has been stopped")
return
case <-ticker.C:
minResolvedTS := c.GetMinResolvedTS()
if minResolvedTS != math.MaxUint64 {
c.Lock()
defer c.Unlock()
c.storage.SaveMinResolvedTS(minResolvedTS)
}
}
}
}

// SetStoreLimit sets a store limit for a given type and rate.
func (c *RaftCluster) SetStoreLimit(storeID uint64, typ storelimit.Type, ratePerMin float64) error {
old := c.opt.GetScheduleConfig().Clone()
Expand Down
26 changes: 20 additions & 6 deletions server/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,18 @@ type StoreInfo struct {
leaderWeight float64
regionWeight float64
limiter map[storelimit.Type]*storelimit.StoreLimit
minResolvedTS uint64
}

// NewStoreInfo creates StoreInfo with meta data.
func NewStoreInfo(store *metapb.Store, opts ...StoreCreateOption) *StoreInfo {
storeInfo := &StoreInfo{
meta: store,
storeStats: newStoreStats(),
leaderWeight: 1.0,
regionWeight: 1.0,
limiter: make(map[storelimit.Type]*storelimit.StoreLimit),
meta: store,
storeStats: newStoreStats(),
leaderWeight: 1.0,
regionWeight: 1.0,
limiter: make(map[storelimit.Type]*storelimit.StoreLimit),
minResolvedTS: math.MaxUint64,
}
for _, opt := range opts {
opt(storeInfo)
Expand All @@ -94,6 +96,7 @@ func (s *StoreInfo) Clone(opts ...StoreCreateOption) *StoreInfo {
leaderWeight: s.leaderWeight,
regionWeight: s.regionWeight,
limiter: s.limiter,
minResolvedTS: s.minResolvedTS,
}

for _, opt := range opts {
Expand All @@ -118,6 +121,7 @@ func (s *StoreInfo) ShallowClone(opts ...StoreCreateOption) *StoreInfo {
leaderWeight: s.leaderWeight,
regionWeight: s.regionWeight,
limiter: s.limiter,
minResolvedTS: s.minResolvedTS,
}

for _, opt := range opts {
Expand Down Expand Up @@ -472,6 +476,11 @@ func (s *StoreInfo) GetUptime() time.Duration {
return 0
}

// GetMinResolvedTS returns min resolved ts.
func (s *StoreInfo) GetMinResolvedTS() uint64 {
return s.minResolvedTS
}

var (
// If a store's last heartbeat is storeDisconnectDuration ago, the store will
// be marked as disconnected state. The value should be greater than tikv's
Expand Down Expand Up @@ -714,7 +723,7 @@ func (s *StoresInfo) UpdateStoreStatus(storeID uint64, leaderCount int, regionCo
}
}

// IsStoreContainLabel return if the store contains the given label.
// IsStoreContainLabel returns if the store contains the given label.
func IsStoreContainLabel(store *metapb.Store, key, value string) bool {
for _, l := range store.GetLabels() {
if l.GetKey() == key && l.GetValue() == value {
Expand All @@ -723,3 +732,8 @@ func IsStoreContainLabel(store *metapb.Store, key, value string) bool {
}
return false
}

// IsAvailableForMinResolvedTS returns if the store is available for min resolved ts.
func IsAvailableForMinResolvedTS(s *StoreInfo) bool {
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
return !s.IsRemoved() && !IsStoreContainLabel(s.GetMeta(), EngineKey, EngineTiFlash) && s.GetLeaderCount() != 0
}
7 changes: 7 additions & 0 deletions server/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,13 @@ func SetNewStoreStats(stats *pdpb.StoreStats) StoreCreateOption {
}
}

// SetMinResolvedTS sets min resolved ts for the store.
func SetMinResolvedTS(minResolvedTS uint64) StoreCreateOption {
return func(store *StoreInfo) {
store.minResolvedTS = minResolvedTS
}
}

// ResetStoreLimit resets the store limit for a store.
func ResetStoreLimit(limitType storelimit.Type, ratePerSec ...float64) StoreCreateOption {
return func(store *StoreInfo) {
Expand Down
34 changes: 34 additions & 0 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1819,3 +1819,37 @@ func (s *GrpcServer) handleDamagedStore(stats *pdpb.StoreStats) error {
// TODO: reimplement add scheduler logic to avoid repeating the introduction HTTP requests inside `server/api`.
return s.GetHandler().AddEvictOrGrant(float64(stats.GetStoreId()), schedulers.EvictLeaderName)
}

// ReportMinResolvedTS implements gRPC PDServer.
func (s *GrpcServer) ReportMinResolvedTS(ctx context.Context, request *pdpb.ReportMinResolvedTsRequest) (*pdpb.ReportMinResolvedTsResponse, error) {
forwardedHost := getForwardedHost(ctx)
if !s.isLocalRequest(forwardedHost) {
client, err := s.getDelegateClient(ctx, forwardedHost)
if err != nil {
return nil, err
}
ctx = grpcutil.ResetForwardContext(ctx)
return pdpb.NewPDClient(client).ReportMinResolvedTS(ctx, request)
}

if err := s.validateRequest(request.GetHeader()); err != nil {
return nil, err
}

rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.ReportMinResolvedTsResponse{Header: s.notBootstrappedHeader()}, nil
}

storeID := request.StoreId
minResolvedTS := request.MinResolvedTs
if err := rc.SetMinResolvedTS(storeID, minResolvedTS); err != nil {
return nil, err
}
log.Debug("updated min resolved-ts",
zap.Uint64("store", storeID),
zap.Uint64("min resolved-ts", minResolvedTS))
return &pdpb.ReportMinResolvedTsResponse{
Header: s.header(),
}, nil
}
Loading