diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go
index 25225d3ddd93b..7a7a7d43eb842 100644
--- a/br/pkg/backup/client.go
+++ b/br/pkg/backup/client.go
@@ -74,9 +74,11 @@ type Checksum struct {
 // ProgressUnit represents the unit of progress.
 type ProgressUnit string
 
-// Maximum total sleep time(in ms) for kv/cop commands.
 const (
-	backupFineGrainedMaxBackoff = 80000
+	// backupFineGrainedMaxBackoff is the maximum total sleep time (in ms): 1 hour.
+	// Once the backup falls back to the fine-grained path, there are likely problems in the cluster,
+	// so we need to be more patient.
+	backupFineGrainedMaxBackoff = 3600000
 	backupRetryTimes            = 5
 	// RangeUnit represents the progress updated counter when a range finished.
 	RangeUnit ProgressUnit = "range"
@@ -920,20 +922,20 @@ func (bc *Client) findRegionLeader(ctx context.Context, key []byte, isRawKv bool
 		// better backoff.
 		region, err := bc.mgr.GetPDClient().GetRegion(ctx, key)
 		if err != nil || region == nil {
-			log.Error("find leader failed", zap.Error(err), zap.Reflect("region", region))
+			logutil.CL(ctx).Error("find leader failed", zap.Error(err), zap.Reflect("region", region))
 			time.Sleep(time.Millisecond * time.Duration(100*i))
 			continue
 		}
 		if region.Leader != nil {
-			log.Info("find leader",
+			logutil.CL(ctx).Info("find leader",
 				zap.Reflect("Leader", region.Leader), logutil.Key("key", key))
 			return region.Leader, nil
 		}
-		log.Warn("no region found", logutil.Key("key", key))
+		logutil.CL(ctx).Warn("no region found", logutil.Key("key", key))
 		time.Sleep(time.Millisecond * time.Duration(100*i))
 		continue
 	}
-	log.Error("can not find leader", logutil.Key("key", key))
+	logutil.CL(ctx).Error("can not find leader", logutil.Key("key", key))
 	return nil, errors.Annotatef(berrors.ErrBackupNoLeader, "can not find leader")
 }
 
@@ -964,7 +966,7 @@ func (bc *Client) fineGrainedBackup(
 		}
 	})
 
-	bo := tikv.NewBackoffer(ctx, backupFineGrainedMaxBackoff)
+	bo := utils.AdaptTiKVBackoffer(ctx, backupFineGrainedMaxBackoff, berrors.ErrUnknown)
 	for {
 		// Step1, check whether there is any incomplete range
 		incomplete := pr.Res.GetIncompleteRange(req.StartKey, req.EndKey)
@@ -977,14 +979,10 @@ func (bc *Client) fineGrainedBackup(
 		errCh := make(chan error, 4)
 		retry := make(chan rtree.Range, 4)
 
-		max := &struct {
-			ms int
-			mu sync.Mutex
-		}{}
 		wg := new(sync.WaitGroup)
 		for i := 0; i < 4; i++ {
 			wg.Add(1)
-			fork, _ := bo.Fork()
+			fork, _ := bo.Inner().Fork()
 			go func(boFork *tikv.Backoffer) {
 				defer wg.Done()
 				for rg := range retry {
@@ -996,11 +994,7 @@ func (bc *Client) fineGrainedBackup(
 						return
 					}
 					if backoffMs != 0 {
-						max.mu.Lock()
-						if max.ms < backoffMs {
-							max.ms = backoffMs
-						}
-						max.mu.Unlock()
+						bo.RequestBackOff(backoffMs)
 					}
 				}
 			}(fork)
@@ -1056,15 +1050,11 @@ func (bc *Client) fineGrainedBackup(
 		}
 
 		// Step3. Backoff if needed, then repeat.
-		max.mu.Lock()
-		ms := max.ms
-		max.mu.Unlock()
-		if ms != 0 {
+		if ms := bo.NextSleepInMS(); ms != 0 {
 			log.Info("handle fine grained", zap.Int("backoffMs", ms))
-			// TODO: fill a meaningful error.
-			err := bo.BackoffWithMaxSleepTxnLockFast(ms, berrors.ErrUnknown)
+			err := bo.BackOff()
 			if err != nil {
-				return errors.Trace(err)
+				return errors.Annotatef(err, "at fine-grained backup, remaining ranges = %d", pr.Res.Len())
 			}
 		}
 	}
diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel
index ebf579ba0fb9d..2484988c870dd 100644
--- a/br/pkg/utils/BUILD.bazel
+++ b/br/pkg/utils/BUILD.bazel
@@ -38,6 +38,7 @@ go_library(
         "//util",
         "//util/sqlexec",
         "@com_github_cheggaaa_pb_v3//:pb",
+        "@com_github_cznic_mathutil//:mathutil",
         "@com_github_google_uuid//:uuid",
         "@com_github_pingcap_errors//:errors",
         "@com_github_pingcap_failpoint//:failpoint",
@@ -45,6 +46,7 @@ go_library(
         "@com_github_pingcap_kvproto//pkg/metapb",
         "@com_github_pingcap_log//:log",
         "@com_github_tikv_client_go_v2//oracle",
+        "@com_github_tikv_client_go_v2//tikv",
         "@com_github_tikv_pd_client//:client",
         "@org_golang_google_grpc//:grpc",
         "@org_golang_google_grpc//backoff",
@@ -74,13 +76,14 @@ go_test(
         "math_test.go",
         "misc_test.go",
         "progress_test.go",
+        "retry_test.go",
         "safe_point_test.go",
         "schema_test.go",
         "sensitive_test.go",
     ],
     embed = [":utils"],
     flaky = True,
-    shard_count = 20,
+    shard_count = 29,
     deps = [
         "//br/pkg/errors",
         "//br/pkg/metautil",
@@ -101,6 +104,7 @@ go_test(
         "@com_github_pingcap_kvproto//pkg/brpb",
         "@com_github_pingcap_kvproto//pkg/encryptionpb",
         "@com_github_stretchr_testify//require",
+        "@com_github_tikv_client_go_v2//tikv",
         "@com_github_tikv_pd_client//:client",
         "@org_golang_google_grpc//codes",
         "@org_golang_google_grpc//status",
diff --git a/br/pkg/utils/retry.go b/br/pkg/utils/retry.go
index afd5b4ee1e6a7..0cdd33934a0d5 100644
--- a/br/pkg/utils/retry.go
+++ b/br/pkg/utils/retry.go
@@ -5,11 +5,14 @@ package utils
 import (
 	"context"
 	"strings"
+	"sync"
 	"time"
 
+	"github.com/cznic/mathutil"
 	"github.com/pingcap/errors"
 	tmysql "github.com/pingcap/tidb/errno"
 	"github.com/pingcap/tidb/parser/terror"
+	"github.com/tikv/client-go/v2/tikv"
 	"go.uber.org/multierr"
 )
 
@@ -84,3 +87,76 @@ func FallBack2CreateTable(err error) bool {
 	}
 	return false
 }
+
+// RetryWithBackoffer is a simple context for a "mixed" retry.
+// Some TiDB APIs, say, `ResolveLock`, require a `tikv.Backoffer` as an argument.
+// But the `tikv.Backoffer` isn't very customizable: it ships with a set of predefined configurations,
+// and we cannot create new ones. So we mix the flavour of `tikv.Backoffer` with our homemade
+// backoff strategy. That is what `RetryWithBackoffer` does.
+type RetryWithBackoffer struct {
+	bo *tikv.Backoffer
+
+	totalBackoff int
+	maxBackoff   int
+	baseErr      error
+
+	mu          sync.Mutex
+	nextBackoff int
+}
+
+// AdaptTiKVBackoffer creates an "ad-hoc" backoffer, which wraps a `tikv.Backoffer` and provides some new functions:
+// when backing off, we can manually provide a specified sleep duration instead of a retry.Config,
+// which is sealed in "client-go/internal".
+func AdaptTiKVBackoffer(ctx context.Context, maxSleepMs int, baseErr error) *RetryWithBackoffer {
+	return &RetryWithBackoffer{
+		bo:         tikv.NewBackoffer(ctx, maxSleepMs),
+		maxBackoff: maxSleepMs,
+		baseErr:    baseErr,
+	}
+}
+
+// NextSleepInMS returns the duration (in ms) the next call to `BackOff` will sleep.
+func (r *RetryWithBackoffer) NextSleepInMS() int {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	return r.nextBackoff
+}
+
+// TotalSleepInMS returns the total slept time in ms.
+func (r *RetryWithBackoffer) TotalSleepInMS() int {
+	return r.totalBackoff + r.bo.GetTotalSleep()
+}
+
+// MaxSleepInMS returns the max sleep time for the retry context in ms.
+func (r *RetryWithBackoffer) MaxSleepInMS() int {
+	return r.maxBackoff
+}
+
+// BackOff executes the backoff: it sleeps for the precalculated backoff time.
+// See `RequestBackOff` for more details.
+func (r *RetryWithBackoffer) BackOff() error {
+	r.mu.Lock()
+	nextBo := r.nextBackoff
+	r.nextBackoff = 0
+	r.mu.Unlock()
+
+	if r.TotalSleepInMS() > r.maxBackoff {
+		return errors.Annotatef(r.baseErr, "backoff exceeds the max backoff time %s", time.Duration(r.maxBackoff)*time.Millisecond)
+	}
+	time.Sleep(time.Duration(nextBo) * time.Millisecond)
+	r.totalBackoff += nextBo
+	return nil
+}
+
+// RequestBackOff registers the intent of backing off for at least `ms` milliseconds.
+// That intent will be fulfilled when calling `BackOff`.
+func (r *RetryWithBackoffer) RequestBackOff(ms int) {
+	r.mu.Lock()
+	r.nextBackoff = mathutil.Max(r.nextBackoff, ms)
+	r.mu.Unlock()
+}
+
+// Inner returns a reference to the inner `tikv.Backoffer`.
+func (r *RetryWithBackoffer) Inner() *tikv.Backoffer {
+	return r.bo
+}
diff --git a/br/pkg/utils/retry_test.go b/br/pkg/utils/retry_test.go
new file mode 100644
index 0000000000000..eeef8c61c0480
--- /dev/null
+++ b/br/pkg/utils/retry_test.go
@@ -0,0 +1,49 @@
+// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0.
+
+package utils_test
+
+import (
+	"context"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/br/pkg/utils"
+	"github.com/stretchr/testify/require"
+	"github.com/tikv/client-go/v2/tikv"
+)
+
+func TestRetryAdapter(t *testing.T) {
+	req := require.New(t)
+
+	begin := time.Now()
+	bo := utils.AdaptTiKVBackoffer(context.Background(), 200, errors.New("everything is alright"))
+	// This should sleep for about 100ms.
+	bo.Inner().Backoff(tikv.BoTiKVRPC(), errors.New("TiKV is in a deep dream"))
+	slept := bo.TotalSleepInMS()
+	req.GreaterOrEqual(slept, 50)
+	req.LessOrEqual(slept, 150)
+	requestedBackOff := [...]int{10, 20, 5, 0, 42, 48}
+	wg := new(sync.WaitGroup)
+	wg.Add(len(requestedBackOff))
+	for _, bms := range requestedBackOff {
+		bms := bms
+		go func() {
+			bo.RequestBackOff(bms)
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+	req.Equal(bo.NextSleepInMS(), 48)
+	req.NoError(bo.BackOff())
+	req.Equal(bo.TotalSleepInMS(), slept+48)
+
+	bo.RequestBackOff(150)
+	req.NoError(bo.BackOff())
+
+	bo.RequestBackOff(150)
+	req.ErrorContains(bo.BackOff(), "everything is alright", "total = %d / %d", bo.TotalSleepInMS(), bo.MaxSleepInMS())
+
+	req.Greater(time.Since(begin), 200*time.Millisecond)
+}
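
Reviewer note: below is a minimal, self-contained sketch (not part of the patch) of the coordinator/worker pattern that `fineGrainedBackup` adopts with this change. Worker goroutines get forks of the inner `tikv.Backoffer` and only register the server-suggested backoff via `RequestBackOff`; the coordinator sleeps once per round via `BackOff`, which replaces the old mutex-guarded `max` struct. Only `AdaptTiKVBackoffer`, `Inner`, `RequestBackOff`, `NextSleepInMS`, and `BackOff` come from this patch; `sendOneRequest`, the error text, and the loop bounds are hypothetical stand-ins.

package main

import (
	"context"
	"fmt"
	"sync"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/br/pkg/utils"
	"github.com/tikv/client-go/v2/tikv"
)

// sendOneRequest is a hypothetical stand-in for bc.handleFineGrained: it would
// send one RPC using the forked backoffer and return the backoff duration (in
// ms) suggested by the server, or 0 when no backoff is needed.
func sendOneRequest(boFork *tikv.Backoffer, task int) (backoffMs int, err error) {
	_ = boFork // a real worker would thread the fork into lock resolving etc.
	return task * 10, nil
}

func main() {
	// A one-hour budget, mirroring backupFineGrainedMaxBackoff in this patch.
	bo := utils.AdaptTiKVBackoffer(context.Background(), 3600000, errors.New("fine-grained retry limit exceeded"))
	for round := 0; round < 2; round++ {
		wg := new(sync.WaitGroup)
		for task := 0; task < 4; task++ {
			wg.Add(1)
			fork, _ := bo.Inner().Fork() // each worker gets a fork of the inner tikv.Backoffer
			go func(boFork *tikv.Backoffer, task int) {
				defer wg.Done()
				ms, err := sendOneRequest(boFork, task)
				if err == nil && ms != 0 {
					bo.RequestBackOff(ms) // register the intent only; workers never sleep here
				}
			}(fork, task)
		}
		wg.Wait()
		// The coordinator sleeps exactly once per round, for the max of all requests.
		if ms := bo.NextSleepInMS(); ms != 0 {
			fmt.Printf("round %d: backing off %dms\n", round, ms)
			if err := bo.BackOff(); err != nil {
				fmt.Println("giving up:", err) // baseErr annotated with the exceeded budget
				return
			}
		}
	}
}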
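
One behavioural detail worth flagging for review: `BackOff` compares `TotalSleepInMS()` (the adapter's own sleeps plus the inner backoffer's `GetTotalSleep()`) against the budget before sleeping, so a backoff that starts inside the budget may overshoot it, and the annotated base error only surfaces on the following call. `TestRetryAdapter` exercises exactly this with its 200ms budget: after roughly 148ms of accumulated sleep, the first 150ms backoff still succeeds, and only the second one fails.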