Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#45904
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
3pointer authored and ti-chi-bot committed Sep 12, 2023
1 parent 2650a30 commit ca5cf77
Show file tree
Hide file tree
Showing 11 changed files with 627 additions and 219 deletions.
5 changes: 5 additions & 0 deletions br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,11 @@ func (mgr *Mgr) GetTLSConfig() *tls.Config {
return mgr.StoreManager.TLSConfig()
}

// GetStore gets the tikvStore.
func (mgr *Mgr) GetStore() tikv.Storage {
return mgr.tikvStore
}

// GetLockResolver gets the LockResolver.
func (mgr *Mgr) GetLockResolver() *txnlock.LockResolver {
return mgr.tikvStore.GetLockResolver()
Expand Down
11 changes: 10 additions & 1 deletion br/pkg/streamhelper/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ go_library(
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//txnkv/rangetask",
"@com_github_tikv_client_go_v2//txnkv/txnlock",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_google_grpc//:grpc",
Expand All @@ -65,7 +68,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 18,
shard_count = 19,
deps = [
":streamhelper",
"//br/pkg/errors",
Expand All @@ -82,18 +85,24 @@ go_test(
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_kvproto//pkg/logbackuppb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//txnkv/txnlock",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_server_v3//embed",
"@io_etcd_go_etcd_server_v3//mvcc",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//metadata",
"@org_golang_google_grpc//status",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
Expand Down
192 changes: 163 additions & 29 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@
package streamhelper

import (
"bytes"
"context"
"math"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
Expand All @@ -17,7 +21,10 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/rangetask"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -60,7 +67,9 @@ type CheckpointAdvancer struct {

// the cached last checkpoint.
// if no progress, this cache can help us don't to send useless requests.
lastCheckpoint uint64
lastCheckpoint *checkpoint
lastCheckpointMu sync.Mutex
inResolvingLock atomic.Bool

checkpoints *spans.ValueSortedFull
checkpointsMu sync.Mutex
Expand All @@ -69,6 +78,53 @@ type CheckpointAdvancer struct {
subscriberMu sync.Mutex
}

// checkpoint represents the TS with specific range.
// it's only used in advancer.go.
type checkpoint struct {
StartKey []byte
EndKey []byte
TS uint64

// It's better to use PD timestamp in future, for now
// use local time to decide the time to resolve lock is ok.
resolveLockTime time.Time
}

func newCheckpointWithTS(ts uint64) *checkpoint {
return &checkpoint{
TS: ts,
resolveLockTime: time.Now(),
}
}

func NewCheckpointWithSpan(s spans.Valued) *checkpoint {
return &checkpoint{
StartKey: s.Key.StartKey,
EndKey: s.Key.EndKey,
TS: s.Value,
resolveLockTime: time.Now(),
}
}

func (c *checkpoint) safeTS() uint64 {
return c.TS - 1
}

func (c *checkpoint) equal(o *checkpoint) bool {
return bytes.Equal(c.StartKey, o.StartKey) &&
bytes.Equal(c.EndKey, o.EndKey) && c.TS == o.TS
}

// if a checkpoint stay in a time too long(3 min)
// we should try to resolve lock for the range
// to keep the RPO in 5 min.
func (c *checkpoint) needResolveLocks() bool {
failpoint.Inject("NeedResolveLocks", func(val failpoint.Value) {
failpoint.Return(val.(bool))
})
return time.Since(c.resolveLockTime) > 3*time.Minute
}

// NewCheckpointAdvancer creates a checkpoint advancer with the env.
func NewCheckpointAdvancer(env Env) *CheckpointAdvancer {
return &CheckpointAdvancer{
Expand All @@ -92,6 +148,13 @@ func (c *CheckpointAdvancer) UpdateConfigWith(f func(*config.Config)) {
c.UpdateConfig(cfg)
}

// UpdateLastCheckpoint modify the checkpoint in ticking.
func (c *CheckpointAdvancer) UpdateLastCheckpoint(p *checkpoint) {
c.lastCheckpointMu.Lock()
c.lastCheckpoint = p
c.lastCheckpointMu.Unlock()
}

// Config returns the current config.
func (c *CheckpointAdvancer) Config() config.Config {
return c.cfg
Expand Down Expand Up @@ -172,15 +235,24 @@ func tsoBefore(n time.Duration) uint64 {
return oracle.ComposeTS(now.UnixMilli()-n.Milliseconds(), 0)
}

func tsoAfter(ts uint64, n time.Duration) uint64 {
return oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(n))
}

func (c *CheckpointAdvancer) WithCheckpoints(f func(*spans.ValueSortedFull)) {
c.checkpointsMu.Lock()
defer c.checkpointsMu.Unlock()

f(c.checkpoints)
}

// only used for test
func (c *CheckpointAdvancer) NewCheckpoints(cps *spans.ValueSortedFull) {
c.checkpoints = cps
}

func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context,
threshold time.Duration) (uint64, error) {
threshold time.Duration) (spans.Valued, error) {
var targets []spans.Valued
var minValue spans.Valued
c.WithCheckpoints(func(vsf *spans.ValueSortedFull) {
Expand All @@ -194,13 +266,13 @@ func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context,
zap.Stringer("min", minValue), zap.Int("for-polling", len(targets)),
zap.String("min-ts", oracle.GetTimeFromTS(minValue.Value).Format(time.RFC3339)))
if len(targets) == 0 {
return minValue.Value, nil
return minValue, nil
}
err := c.tryAdvance(ctx, len(targets), func(i int) kv.KeyRange { return targets[i].Key })
if err != nil {
return 0, err
return minValue, err
}
return minValue.Value, nil
return minValue, nil
}

func (c *CheckpointAdvancer) consumeAllTask(ctx context.Context, ch <-chan TaskEvent) error {
Expand Down Expand Up @@ -293,7 +365,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
c.task = e.Info
c.taskRange = spans.Collapse(len(e.Ranges), func(i int) kv.KeyRange { return e.Ranges[i] })
c.checkpoints = spans.Sorted(spans.NewFullWith(e.Ranges, 0))
c.lastCheckpoint = e.Info.StartTs
c.lastCheckpoint = newCheckpointWithTS(e.Info.StartTs)
p, err := c.env.BlockGCUntil(ctx, c.task.StartTs)
if err != nil {
log.Warn("failed to upload service GC safepoint, skipping.", logutil.ShortError(err))
Expand Down Expand Up @@ -323,33 +395,36 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
return nil
}

func (c *CheckpointAdvancer) setCheckpoint(cp uint64) bool {
if cp < c.lastCheckpoint {
func (c *CheckpointAdvancer) setCheckpoint(ctx context.Context, s spans.Valued) bool {
cp := NewCheckpointWithSpan(s)
if cp.TS < c.lastCheckpoint.TS {
log.Warn("failed to update global checkpoint: stale",
zap.Uint64("old", c.lastCheckpoint), zap.Uint64("new", cp))
zap.Uint64("old", c.lastCheckpoint.TS), zap.Uint64("new", cp.TS))
return false
}
if cp <= c.lastCheckpoint {
// Need resolve lock for different range and same TS
// so check the range and TS here.
if cp.equal(c.lastCheckpoint) {
return false
}
c.lastCheckpoint = cp
metrics.LastCheckpoint.WithLabelValues(c.task.GetName()).Set(float64(c.lastCheckpoint))
c.UpdateLastCheckpoint(cp)
metrics.LastCheckpoint.WithLabelValues(c.task.GetName()).Set(float64(c.lastCheckpoint.TS))
return true
}

// advanceCheckpointBy advances the checkpoint by a checkpoint getter function.
func (c *CheckpointAdvancer) advanceCheckpointBy(ctx context.Context,
getCheckpoint func(context.Context) (uint64, error)) error {
getCheckpoint func(context.Context) (spans.Valued, error)) error {
start := time.Now()
cp, err := getCheckpoint(ctx)
if err != nil {
return err
}

if c.setCheckpoint(cp) {
if c.setCheckpoint(ctx, cp) {
log.Info("uploading checkpoint for task",
zap.Stringer("checkpoint", oracle.GetTimeFromTS(cp)),
zap.Uint64("checkpoint", cp),
zap.Stringer("checkpoint", oracle.GetTimeFromTS(cp.Value)),
zap.Uint64("checkpoint", cp.Value),
zap.String("task", c.task.Name),
zap.Stringer("take", time.Since(start)))
}
Expand Down Expand Up @@ -402,43 +477,59 @@ func (c *CheckpointAdvancer) subscribeTick(ctx context.Context) error {

func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
c.checkpointsMu.Lock()
c.setCheckpoint(c.checkpoints.MinValue())
c.setCheckpoint(ctx, c.checkpoints.Min())
c.checkpointsMu.Unlock()
if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint); err != nil {
if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint.TS); err != nil {
return errors.Annotate(err, "failed to upload global checkpoint")
}
p, err := c.env.BlockGCUntil(ctx, c.lastCheckpoint-1)
p, err := c.env.BlockGCUntil(ctx, c.lastCheckpoint.safeTS())
if err != nil {
return errors.Annotatef(err,
"failed to update service GC safe point, current checkpoint is %d, target checkpoint is %d",
c.lastCheckpoint-1, p)
c.lastCheckpoint.safeTS(), p)
}
if p <= c.lastCheckpoint-1 {
if p <= c.lastCheckpoint.safeTS() {
log.Info("updated log backup GC safe point.",
zap.Uint64("checkpoint", p), zap.Uint64("target", c.lastCheckpoint-1))
zap.Uint64("checkpoint", p), zap.Uint64("target", c.lastCheckpoint.safeTS()))
}
if p > c.lastCheckpoint-1 {
if p > c.lastCheckpoint.safeTS() {
log.Warn("update log backup GC safe point failed: stale.",
zap.Uint64("checkpoint", p), zap.Uint64("target", c.lastCheckpoint-1))
zap.Uint64("checkpoint", p), zap.Uint64("target", c.lastCheckpoint.safeTS()))
}
return nil
}

func (c *CheckpointAdvancer) optionalTick(cx context.Context) error {
// lastCheckpoint is not increased too long enough.
// assume the cluster has expired locks for whatever reasons.
var targets []spans.Valued
if c.lastCheckpoint != nil && c.lastCheckpoint.needResolveLocks() && c.inResolvingLock.CompareAndSwap(false, true) {
c.WithCheckpoints(func(vsf *spans.ValueSortedFull) {
// when get locks here. assume these locks are not belong to same txn,
// but these locks' start ts are close to 1 minute. try resolve these locks at one time
vsf.TraverseValuesLessThan(tsoAfter(c.lastCheckpoint.TS, time.Minute), func(v spans.Valued) bool {
targets = append(targets, v)
return true
})
})
if len(targets) != 0 {
log.Info("Advancer starts to resolve locks", zap.Int("targets", len(targets)))
// use new context here to avoid timeout
ctx := context.Background()
c.asyncResolveLocksForRanges(ctx, targets)
}
c.inResolvingLock.Store(false)
}
threshold := c.Config().GetDefaultStartPollThreshold()
if err := c.subscribeTick(cx); err != nil {
log.Warn("[log backup advancer] Subscriber meet error, would polling the checkpoint.",
logutil.ShortError(err))
threshold = c.Config().GetSubscriberErrorStartPollThreshold()
}

err := c.advanceCheckpointBy(cx, func(cx context.Context) (uint64, error) {
return c.advanceCheckpointBy(cx, func(cx context.Context) (spans.Valued, error) {
return c.CalculateGlobalCheckpointLight(cx, threshold)
})
if err != nil {
return err
}
return nil
}

func (c *CheckpointAdvancer) tick(ctx context.Context) error {
Expand Down Expand Up @@ -467,3 +558,46 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error {

return errs
}

func (c *CheckpointAdvancer) asyncResolveLocksForRanges(ctx context.Context, targets []spans.Valued) {
// run in another goroutine
// do not block main tick here
go func() {
handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
// we will scan all locks and try to resolve them by check txn status.
return tikv.ResolveLocksForRange(
ctx, c.env, math.MaxUint64, r.StartKey, r.EndKey, tikv.NewGcResolveLockMaxBackoffer, tikv.GCScanLockLimit)
}
workerPool := utils.NewWorkerPool(uint(config.DefaultMaxConcurrencyAdvance), "advancer resolve locks")
var wg sync.WaitGroup
for _, r := range targets {
targetRange := r
wg.Add(1)
workerPool.Apply(func() {
defer wg.Done()
// Run resolve lock on the whole TiKV cluster.
// it will use startKey/endKey to scan region in PD.
// but regionCache already has a codecPDClient. so just use decode key here.
// and it almost only include one region here. so set concurrency to 1.
runner := rangetask.NewRangeTaskRunner("advancer-resolve-locks-runner",
c.env.GetStore(), 1, handler)
err := runner.RunOnRange(ctx, targetRange.Key.StartKey, targetRange.Key.EndKey)
if err != nil {
// wait for next tick
log.Warn("resolve locks failed, wait for next tick", zap.String("category", "advancer"),
zap.String("uuid", "log backup advancer"),
zap.Error(err))
}
})
}
wg.Wait()
log.Info("finish resolve locks for checkpoint", zap.String("category", "advancer"),
zap.String("uuid", "log backup advancer"),
logutil.Key("StartKey", c.lastCheckpoint.StartKey),
logutil.Key("EndKey", c.lastCheckpoint.EndKey),
zap.Int("targets", len(targets)))
c.lastCheckpointMu.Lock()
c.lastCheckpoint.resolveLockTime = time.Now()
c.lastCheckpointMu.Unlock()
}()
}
Loading

0 comments on commit ca5cf77

Please sign in to comment.