backup: fix retry of fine-grained backup (#43252) (#43305)
close #43236
ti-chi-bot authored May 23, 2023
1 parent 6f89439 commit 0457615
Showing 4 changed files with 144 additions and 25 deletions.
38 changes: 14 additions & 24 deletions br/pkg/backup/client.go
@@ -74,9 +74,11 @@ type Checksum struct {
// ProgressUnit represents the unit of progress.
type ProgressUnit string

// Maximum total sleep time (in ms) for kv/cop commands.
const (
backupFineGrainedMaxBackoff = 80000
// backupFineGrainedMaxBackoff is 1 hour.
// Given that the fine-grained backup has begun, there are probably some problems in the cluster,
// so we need to be more patient.
backupFineGrainedMaxBackoff = 3600000
backupRetryTimes = 5
// RangeUnit represents the progress updated counter when a range finished.
RangeUnit ProgressUnit = "range"
@@ -920,20 +922,20 @@ func (bc *Client) findRegionLeader(ctx context.Context, key []byte, isRawKv bool
// better backoff.
region, err := bc.mgr.GetPDClient().GetRegion(ctx, key)
if err != nil || region == nil {
log.Error("find leader failed", zap.Error(err), zap.Reflect("region", region))
logutil.CL(ctx).Error("find leader failed", zap.Error(err), zap.Reflect("region", region))
time.Sleep(time.Millisecond * time.Duration(100*i))
continue
}
if region.Leader != nil {
log.Info("find leader",
logutil.CL(ctx).Info("find leader",
zap.Reflect("Leader", region.Leader), logutil.Key("key", key))
return region.Leader, nil
}
log.Warn("no region found", logutil.Key("key", key))
logutil.CL(ctx).Warn("no region found", logutil.Key("key", key))
time.Sleep(time.Millisecond * time.Duration(100*i))
continue
}
log.Error("can not find leader", logutil.Key("key", key))
logutil.CL(ctx).Error("can not find leader", logutil.Key("key", key))
return nil, errors.Annotatef(berrors.ErrBackupNoLeader, "can not find leader")
}

@@ -964,7 +966,7 @@ func (bc *Client) fineGrainedBackup(
}
})

bo := tikv.NewBackoffer(ctx, backupFineGrainedMaxBackoff)
bo := utils.AdaptTiKVBackoffer(ctx, backupFineGrainedMaxBackoff, berrors.ErrUnknown)
for {
// Step1, check whether there is any incomplete range
incomplete := pr.Res.GetIncompleteRange(req.StartKey, req.EndKey)
@@ -977,14 +979,10 @@
errCh := make(chan error, 4)
retry := make(chan rtree.Range, 4)

max := &struct {
ms int
mu sync.Mutex
}{}
wg := new(sync.WaitGroup)
for i := 0; i < 4; i++ {
wg.Add(1)
fork, _ := bo.Fork()
fork, _ := bo.Inner().Fork()
go func(boFork *tikv.Backoffer) {
defer wg.Done()
for rg := range retry {
@@ -996,11 +994,7 @@
return
}
if backoffMs != 0 {
max.mu.Lock()
if max.ms < backoffMs {
max.ms = backoffMs
}
max.mu.Unlock()
bo.RequestBackOff(backoffMs)
}
}
}(fork)
@@ -1056,15 +1050,11 @@ }
}

// Step3. Backoff if needed, then repeat.
max.mu.Lock()
ms := max.ms
max.mu.Unlock()
if ms != 0 {
if ms := bo.NextSleepInMS(); ms != 0 {
log.Info("handle fine grained", zap.Int("backoffMs", ms))
// TODO: fill a meaningful error.
err := bo.BackoffWithMaxSleepTxnLockFast(ms, berrors.ErrUnknown)
err := bo.BackOff()
if err != nil {
return errors.Trace(err)
return errors.Annotatef(err, "at fine-grained backup, remained ranges = %d", pr.Res.Len())
}
}
}
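The removed `max` struct above is the heart of this change: workers used to publish their desired backoff under a mutex and the outer loop slept for the largest value, capped at 80 seconds. `RequestBackOff` and `NextSleepInMS` move that bookkeeping into the shared adapter, now capped at 1 hour. A minimal, self-contained sketch of the old pattern for comparison — hypothetical names, not the TiDB code itself:

package main

import (
	"fmt"
	"sync"
)

// maxTracker mirrors the mutex-protected struct that the old fineGrainedBackup
// kept by hand: each worker published its desired backoff, and the outer loop
// slept for the maximum of all requests.
type maxTracker struct {
	mu sync.Mutex
	ms int
}

func (m *maxTracker) request(ms int) {
	m.mu.Lock()
	if m.ms < ms {
		m.ms = ms
	}
	m.mu.Unlock()
}

func main() {
	t := new(maxTracker)
	wg := new(sync.WaitGroup)
	for _, ms := range []int{10, 20, 5, 0, 42, 48} {
		ms := ms
		wg.Add(1)
		go func() {
			defer wg.Done()
			t.request(ms) // the new code calls bo.RequestBackOff(ms) here instead
		}()
	}
	wg.Wait()
	fmt.Println(t.ms) // prints 48, which is what bo.NextSleepInMS() would report
}

In the new code the adapter owns this state, so fineGrainedBackup no longer needs the struct or the locking.
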
6 changes: 5 additions & 1 deletion br/pkg/utils/BUILD.bazel
@@ -38,13 +38,15 @@ go_library(
"//util",
"//util/sqlexec",
"@com_github_cheggaaa_pb_v3//:pb",
"@com_github_cznic_mathutil//:mathutil",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//backoff",
@@ -74,13 +76,14 @@ go_test(
"math_test.go",
"misc_test.go",
"progress_test.go",
"retry_test.go",
"safe_point_test.go",
"schema_test.go",
"sensitive_test.go",
],
embed = [":utils"],
flaky = True,
shard_count = 20,
shard_count = 29,
deps = [
"//br/pkg/errors",
"//br/pkg/metautil",
@@ -101,6 +104,7 @@ go_test(
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/encryptionpb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
76 changes: 76 additions & 0 deletions br/pkg/utils/retry.go
@@ -5,11 +5,14 @@ package utils
import (
"context"
"strings"
"sync"
"time"

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
tmysql "github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser/terror"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/multierr"
)

@@ -84,3 +87,76 @@ func FallBack2CreateTable(err error) bool {
}
return false
}

// RetryWithBackoffer is a simple context for a "mixed" retry.
// Some TiDB APIs, say `ResolveLock`, require a `tikv.Backoffer` as an argument.
// But the `tikv.Backoffer` isn't very customizable: it only offers a set of predefined configurations,
// and we cannot create new ones. So we mix the flavour of `tikv.Backoffer` with our homemade
// backoff strategy. That is what `RetryWithBackoffer` does.
type RetryWithBackoffer struct {
bo *tikv.Backoffer

totalBackoff int
maxBackoff int
baseErr error

mu sync.Mutex
nextBackoff int
}

// AdaptTiKVBackoffer creates an "ad-hoc" backoffer, which wraps a backoffer and provides some new functions:
// when backing off, we can manually provide a specified sleep duration instead of directly providing a retry.Config,
// which is sealed in "client-go/internal".
func AdaptTiKVBackoffer(ctx context.Context, maxSleepMs int, baseErr error) *RetryWithBackoffer {
return &RetryWithBackoffer{
bo: tikv.NewBackoffer(ctx, maxSleepMs),
maxBackoff: maxSleepMs,
baseErr: baseErr,
}
}

// NextSleepInMS returns the duration, in ms, that the next call to `BackOff` will sleep.
func (r *RetryWithBackoffer) NextSleepInMS() int {
r.mu.Lock()
defer r.mu.Unlock()
return r.nextBackoff
}

// TotalSleepInMS returns the total slept time in ms.
func (r *RetryWithBackoffer) TotalSleepInMS() int {
return r.totalBackoff + r.bo.GetTotalSleep()
}

// MaxSleepInMS returns the max sleep time for the retry context in ms.
func (r *RetryWithBackoffer) MaxSleepInMS() int {
return r.maxBackoff
}

// BackOff executes the back off: sleep for a precalculated backoff time.
// See `RequestBackOff` for more details.
func (r *RetryWithBackoffer) BackOff() error {
r.mu.Lock()
nextBo := r.nextBackoff
r.nextBackoff = 0
r.mu.Unlock()

if r.TotalSleepInMS() > r.maxBackoff {
return errors.Annotatef(r.baseErr, "backoff exceeds the max backoff time %s", time.Duration(r.maxBackoff)*time.Millisecond)
}
time.Sleep(time.Duration(nextBo) * time.Millisecond)
r.totalBackoff += nextBo
return nil
}

// RequestBackOff registers the intent of backing off for at least ms milliseconds.
// That intent will be fulfilled when calling `BackOff`.
func (r *RetryWithBackoffer) RequestBackOff(ms int) {
r.mu.Lock()
r.nextBackoff = mathutil.Max(r.nextBackoff, ms)
r.mu.Unlock()
}

// Inner returns the reference to the inner `backoffer`.
func (r *RetryWithBackoffer) Inner() *tikv.Backoffer {
return r.bo
}
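For context, here is a minimal usage sketch of the adapter outside of BR. The driver loop, the `work` helper, and the 500 ms budget are hypothetical; only the `utils` calls are the ones added in this file:

package main

import (
	"context"
	"fmt"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/br/pkg/utils"
)

// work stands in for a call like handleFineGrained: it reports whether the job
// is done and, if not, how many milliseconds the caller should back off.
func work(attempt int) (done bool, backoffMs int) {
	return attempt >= 3, 100
}

func main() {
	// Budget 500 ms of total sleep; once exceeded, BackOff returns the base error.
	bo := utils.AdaptTiKVBackoffer(context.Background(), 500, errors.New("out of retry budget"))
	for attempt := 0; ; attempt++ {
		done, backoffMs := work(attempt)
		if done {
			fmt.Printf("finished after %d attempts, slept %d ms in total\n",
				attempt, bo.TotalSleepInMS())
			return
		}
		// Workers may call RequestBackOff concurrently; BackOff then sleeps once
		// for the largest requested duration.
		bo.RequestBackOff(backoffMs)
		if err := bo.BackOff(); err != nil {
			fmt.Println("giving up:", err)
			return
		}
	}
}

The test below exercises the same surface, including the error returned once the sleep budget is exhausted.
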
49 changes: 49 additions & 0 deletions br/pkg/utils/retry_test.go
@@ -0,0 +1,49 @@
// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0.

package utils_test

import (
"context"
"sync"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikv"
)

func TestRetryAdapter(t *testing.T) {
req := require.New(t)

begin := time.Now()
bo := utils.AdaptTiKVBackoffer(context.Background(), 200, errors.New("everything is alright"))
// This should sleep for 100ms.
bo.Inner().Backoff(tikv.BoTiKVRPC(), errors.New("TiKV is in a deep dream"))
sleeped := bo.TotalSleepInMS()
req.GreaterOrEqual(sleeped, 50)
req.LessOrEqual(sleeped, 150)
requestedBackOff := [...]int{10, 20, 5, 0, 42, 48}
wg := new(sync.WaitGroup)
wg.Add(len(requestedBackOff))
for _, bms := range requestedBackOff {
bms := bms
go func() {
bo.RequestBackOff(bms)
wg.Done()
}()
}
wg.Wait()
req.Equal(bo.NextSleepInMS(), 48)
req.NoError(bo.BackOff())
req.Equal(bo.TotalSleepInMS(), sleeped+48)

bo.RequestBackOff(150)
req.NoError(bo.BackOff())

bo.RequestBackOff(150)
req.ErrorContains(bo.BackOff(), "everything is alright", "total = %d / %d", bo.TotalSleepInMS(), bo.MaxSleepInMS())

req.Greater(time.Since(begin), 200*time.Millisecond)
}
