Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

snap_restore: added retry for recovery #46094

Merged
merged 3 commits into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 90 additions & 10 deletions br/pkg/restore/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,75 @@ import (
"google.golang.org/grpc/backoff"
)

type RecoveryStage int

const (
StageUnknown RecoveryStage = iota
StageCollectingMeta
StageMakingRecoveryPlan
StageResetPDAllocateID
StageRecovering
StageFlashback
)

func (s RecoveryStage) String() string {
switch s {
case StageCollectingMeta:
return "collecting meta"
case StageMakingRecoveryPlan:
return "making recovery plan"
case StageResetPDAllocateID:
return "resetting PD allocate ID"
case StageRecovering:
return "recovering"
case StageFlashback:
return "flashback"
default:
return "unknown"
}
}

type recoveryError struct {
error
atStage RecoveryStage
}

func FailedAt(err error) RecoveryStage {
if rerr, ok := err.(recoveryError); ok {
return rerr.atStage
}
return StageUnknown
}

type recoveryBackoffer struct {
state utils.RetryState
}

func newRecoveryBackoffer() *recoveryBackoffer {
return &recoveryBackoffer{
state: utils.InitialRetryState(16, 0, 0),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the parameter 0 ok?

}
}

func (bo *recoveryBackoffer) NextBackoff(err error) time.Duration {
s := FailedAt(err)
switch s {
case StageCollectingMeta, StageMakingRecoveryPlan, StageResetPDAllocateID, StageRecovering:
log.Info("Recovery data retrying.", zap.Error(err), zap.Stringer("stage", s))
return bo.state.ExponentialBackoff()
case StageFlashback:
log.Info("Giving up retry for flashback stage.", zap.Error(err), zap.Stringer("stage", s))
bo.state.GiveUp()
return 0
}
log.Warn("unknown stage of backing off.", zap.Int("val", int(s)))
return bo.state.ExponentialBackoff()
}

func (bo *recoveryBackoffer) Attempt() int {
return bo.state.Attempt()
}

// RecoverData recover the tikv cluster
// 1. read all meta data from tikvs
// 2. make recovery plan and then recovery max allocate ID firstly
Expand All @@ -35,39 +104,50 @@ import (
// 5. prepare the flashback
// 6. flashback to resolveTS
func RecoverData(ctx context.Context, resolveTS uint64, allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, restoreTS uint64, concurrency uint32) (int, error) {
return utils.WithRetryV2(ctx, newRecoveryBackoffer(), func(ctx context.Context) (int, error) {
return doRecoveryData(ctx, resolveTS, allStores, mgr, progress, restoreTS, concurrency)
})
}

func doRecoveryData(ctx context.Context, resolveTS uint64, allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, restoreTS uint64, concurrency uint32) (int, error) {
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel()

var recovery = NewRecovery(allStores, mgr, progress, concurrency)
if err := recovery.ReadRegionMeta(ctx); err != nil {
return 0, errors.Trace(err)
return 0, recoveryError{error: err, atStage: StageCollectingMeta}
}

totalRegions := recovery.GetTotalRegions()

if err := recovery.MakeRecoveryPlan(); err != nil {
return totalRegions, errors.Trace(err)
err := recovery.MakeRecoveryPlan()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep the format as before?, such as

if err := xxx; err != nil {

if err != nil {
return totalRegions, recoveryError{error: err, atStage: StageMakingRecoveryPlan}
}

log.Info("recover the alloc id to pd", zap.Uint64("max alloc id", recovery.MaxAllocID))
if err := recovery.mgr.RecoverBaseAllocID(ctx, recovery.MaxAllocID); err != nil {
return totalRegions, errors.Trace(err)
return totalRegions, recoveryError{error: err, atStage: StageResetPDAllocateID}
}

// Once TiKV shuts down and reboot then, it may be left with no leader because of the recovery mode.
// This wathcher will retrigger `RecoveryRegions` for those stores.
recovery.SpawnTiKVShutDownWatchers(ctx)
if err := recovery.RecoverRegions(ctx); err != nil {
return totalRegions, errors.Trace(err)
return totalRegions, recoveryError{error: err, atStage: StageRecovering}
}

if err := recovery.WaitApply(ctx); err != nil {
return totalRegions, errors.Trace(err)
return totalRegions, recoveryError{error: err, atStage: StageRecovering}
}

if err := recovery.PrepareFlashbackToVersion(ctx, resolveTS, restoreTS-1); err != nil {
return totalRegions, errors.Trace(err)
err = recovery.PrepareFlashbackToVersion(ctx, resolveTS, restoreTS-1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

if err != nil {
return totalRegions, recoveryError{error: err, atStage: StageFlashback}
}

if err := recovery.FlashbackToVersion(ctx, resolveTS, restoreTS); err != nil {
return totalRegions, errors.Trace(err)
return totalRegions, recoveryError{error: err, atStage: StageFlashback}
}

return totalRegions, nil
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/task/restore_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ func RunResolveKvData(c context.Context, g glue.Glue, cmdName string, cfg *Resto
// restore tikv data from a snapshot volume
var totalRegions int

// Roughly handle the case that some TiKVs are rebooted during making plan.
// Generally,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not entire annotation?

totalRegions, err = restore.RecoverData(ctx, resolveTS, allStores, mgr, progress, restoreTS, cfg.Concurrency)
if err != nil {
return errors.Trace(err)
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/utils/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func (rs *RetryState) ExponentialBackoff() time.Duration {
return backoff
}

func (rs *RetryState) GiveUp() {
rs.retryTimes = rs.maxRetry
}

// InitialRetryState make the initial state for retrying.
func InitialRetryState(maxRetryTimes int, initialBackoff, maxBackoff time.Duration) RetryState {
return RetryState{
Expand Down
27 changes: 23 additions & 4 deletions br/pkg/utils/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ var retryableServerError = []string{
// RetryableFunc presents a retryable operation.
type RetryableFunc func() error

type RetryableFuncV2[T any] func(context.Context) (T, error)

// Backoffer implements a backoff policy for retrying operations.
type Backoffer interface {
// NextBackoff returns a duration to wait before retrying again
Expand All @@ -51,20 +53,37 @@ func WithRetry(
retryableFunc RetryableFunc,
backoffer Backoffer,
) error {
_, err := WithRetryV2[struct{}](ctx, backoffer, func(ctx context.Context) (struct{}, error) {
innerErr := retryableFunc()
return struct{}{}, innerErr
})
return err
}

// WithRetryV2 retries a given operation with a backoff policy.
//
// Returns the returned value if `retryableFunc` succeeded at least once. Otherwise, returns a
// multierr that containing all errors encountered.
// Comparing with `WithRetry`, this function reordered the argument order and supports catching the return value.
func WithRetryV2[T any](
ctx context.Context,
backoffer Backoffer,
fn RetryableFuncV2[T],
) (T, error) {
var allErrors error
for backoffer.Attempt() > 0 {
err := retryableFunc()
res, err := fn(ctx)
if err == nil {
return nil
return res, nil
}
allErrors = multierr.Append(allErrors, err)
select {
case <-ctx.Done():
return allErrors // nolint:wrapcheck
return *new(T), allErrors
case <-time.After(backoffer.NextBackoff(err)):
}
}
return allErrors // nolint:wrapcheck
return *new(T), allErrors // nolint:wrapcheck
}

// MessageIsRetryableStorageError checks whether the message returning from TiKV is retryable ExternalStorageError.
Expand Down