Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#46094
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
YuJuncen authored and ti-chi-bot committed Aug 18, 2023
1 parent b1eebca commit a7bd673
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 8 deletions.
96 changes: 89 additions & 7 deletions br/pkg/restore/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,75 @@ import (
"google.golang.org/grpc/backoff"
)

type RecoveryStage int

const (
StageUnknown RecoveryStage = iota
StageCollectingMeta
StageMakingRecoveryPlan
StageResetPDAllocateID
StageRecovering
StageFlashback
)

func (s RecoveryStage) String() string {
switch s {
case StageCollectingMeta:
return "collecting meta"
case StageMakingRecoveryPlan:
return "making recovery plan"
case StageResetPDAllocateID:
return "resetting PD allocate ID"
case StageRecovering:
return "recovering"
case StageFlashback:
return "flashback"
default:
return "unknown"
}
}

type recoveryError struct {
error
atStage RecoveryStage
}

func FailedAt(err error) RecoveryStage {
if rerr, ok := err.(recoveryError); ok {
return rerr.atStage
}
return StageUnknown
}

type recoveryBackoffer struct {
state utils.RetryState
}

func newRecoveryBackoffer() *recoveryBackoffer {
return &recoveryBackoffer{
state: utils.InitialRetryState(16, 30*time.Second, 4*time.Minute),
}
}

func (bo *recoveryBackoffer) NextBackoff(err error) time.Duration {
s := FailedAt(err)
switch s {
case StageCollectingMeta, StageMakingRecoveryPlan, StageResetPDAllocateID, StageRecovering:
log.Info("Recovery data retrying.", zap.Error(err), zap.Stringer("stage", s))
return bo.state.ExponentialBackoff()
case StageFlashback:
log.Info("Giving up retry for flashback stage.", zap.Error(err), zap.Stringer("stage", s))
bo.state.GiveUp()
return 0
}
log.Warn("unknown stage of backing off.", zap.Int("val", int(s)))
return bo.state.ExponentialBackoff()
}

func (bo *recoveryBackoffer) Attempt() int {
return bo.state.Attempt()
}

// RecoverData recover the tikv cluster
// 1. read all meta data from tikvs
// 2. make recovery plan and then recovery max allocate ID firstly
Expand All @@ -35,39 +104,52 @@ import (
// 5. prepare the flashback
// 6. flashback to resolveTS
func RecoverData(ctx context.Context, resolveTS uint64, allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, restoreTS uint64, concurrency uint32) (int, error) {
// Roughly handle the case that some TiKVs are rebooted during making plan.
// Generally, retry the whole procedure will be fine for most cases. But perhaps we can do finer-grained retry,
// say, we may reuse the recovery plan, and probably no need to rebase PD allocation ID once we have done it.
return utils.WithRetryV2(ctx, newRecoveryBackoffer(), func(ctx context.Context) (int, error) {
return doRecoveryData(ctx, resolveTS, allStores, mgr, progress, restoreTS, concurrency)
})
}

func doRecoveryData(ctx context.Context, resolveTS uint64, allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, restoreTS uint64, concurrency uint32) (int, error) {
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel()

var recovery = NewRecovery(allStores, mgr, progress, concurrency)
if err := recovery.ReadRegionMeta(ctx); err != nil {
return 0, errors.Trace(err)
return 0, recoveryError{error: err, atStage: StageCollectingMeta}
}

totalRegions := recovery.GetTotalRegions()

if err := recovery.MakeRecoveryPlan(); err != nil {
return totalRegions, errors.Trace(err)
return totalRegions, recoveryError{error: err, atStage: StageMakingRecoveryPlan}
}

log.Info("recover the alloc id to pd", zap.Uint64("max alloc id", recovery.MaxAllocID))
if err := recovery.mgr.RecoverBaseAllocID(ctx, recovery.MaxAllocID); err != nil {
return totalRegions, errors.Trace(err)
return totalRegions, recoveryError{error: err, atStage: StageResetPDAllocateID}
}

// Once TiKV shuts down and reboot then, it may be left with no leader because of the recovery mode.
// This wathcher will retrigger `RecoveryRegions` for those stores.
recovery.SpawnTiKVShutDownWatchers(ctx)
if err := recovery.RecoverRegions(ctx); err != nil {
return totalRegions, errors.Trace(err)
return totalRegions, recoveryError{error: err, atStage: StageRecovering}
}

if err := recovery.WaitApply(ctx); err != nil {
return totalRegions, errors.Trace(err)
return totalRegions, recoveryError{error: err, atStage: StageRecovering}
}

if err := recovery.PrepareFlashbackToVersion(ctx, resolveTS, restoreTS-1); err != nil {
return totalRegions, errors.Trace(err)
return totalRegions, recoveryError{error: err, atStage: StageFlashback}
}

if err := recovery.FlashbackToVersion(ctx, resolveTS, restoreTS); err != nil {
return totalRegions, errors.Trace(err)
return totalRegions, recoveryError{error: err, atStage: StageFlashback}
}

return totalRegions, nil
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/utils/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ func (rs *RetryState) ExponentialBackoff() time.Duration {
return backoff
}

func (rs *RetryState) GiveUp() {
rs.retryTimes = rs.maxRetry
}

// InitialRetryState make the initial state for retrying.
func InitialRetryState(maxRetryTimes int, initialBackoff, maxBackoff time.Duration) RetryState {
return RetryState{
Expand Down
34 changes: 33 additions & 1 deletion br/pkg/utils/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ var retryableServerError = []string{
// RetryableFunc presents a retryable operation.
type RetryableFunc func() error

type RetryableFuncV2[T any] func(context.Context) (T, error)

// Backoffer implements a backoff policy for retrying operations.
type Backoffer interface {
// NextBackoff returns a duration to wait before retrying again
Expand All @@ -51,8 +53,26 @@ func WithRetry(
retryableFunc RetryableFunc,
backoffer Backoffer,
) error {
_, err := WithRetryV2[struct{}](ctx, backoffer, func(ctx context.Context) (struct{}, error) {
innerErr := retryableFunc()
return struct{}{}, innerErr
})
return err
}

// WithRetryV2 retries a given operation with a backoff policy.
//
// Returns the returned value if `retryableFunc` succeeded at least once. Otherwise, returns a
// multierr that containing all errors encountered.
// Comparing with `WithRetry`, this function reordered the argument order and supports catching the return value.
func WithRetryV2[T any](
ctx context.Context,
backoffer Backoffer,
fn RetryableFuncV2[T],
) (T, error) {
var allErrors error
for backoffer.Attempt() > 0 {
<<<<<<< HEAD
err := retryableFunc()
if err != nil {
allErrors = multierr.Append(allErrors, err)
Expand All @@ -64,8 +84,20 @@ func WithRetry(
} else {
return nil
}
=======
res, err := fn(ctx)
if err == nil {
return res, nil
}
allErrors = multierr.Append(allErrors, err)
select {
case <-ctx.Done():
return *new(T), allErrors
case <-time.After(backoffer.NextBackoff(err)):
}
>>>>>>> 2ac191a1379 (snap_restore: added retry for recovery (#46094))
}
return allErrors // nolint:wrapcheck
return *new(T), allErrors // nolint:wrapcheck
}

// MessageIsRetryableStorageError checks whether the message returning from TiKV is retryable ExternalStorageError.
Expand Down

0 comments on commit a7bd673

Please sign in to comment.