snap_restore: added retry for recovery #46094
```diff
@@ -27,6 +27,75 @@ import (
 	"google.golang.org/grpc/backoff"
 )
 
+type RecoveryStage int
+
+const (
+	StageUnknown RecoveryStage = iota
+	StageCollectingMeta
+	StageMakingRecoveryPlan
+	StageResetPDAllocateID
+	StageRecovering
+	StageFlashback
+)
+
+func (s RecoveryStage) String() string {
+	switch s {
+	case StageCollectingMeta:
+		return "collecting meta"
+	case StageMakingRecoveryPlan:
+		return "making recovery plan"
+	case StageResetPDAllocateID:
+		return "resetting PD allocate ID"
+	case StageRecovering:
+		return "recovering"
+	case StageFlashback:
+		return "flashback"
+	default:
+		return "unknown"
+	}
+}
+
+type recoveryError struct {
+	error
+	atStage RecoveryStage
+}
+
+func FailedAt(err error) RecoveryStage {
+	if rerr, ok := err.(recoveryError); ok {
+		return rerr.atStage
+	}
+	return StageUnknown
+}
+
+type recoveryBackoffer struct {
+	state utils.RetryState
+}
+
+func newRecoveryBackoffer() *recoveryBackoffer {
+	return &recoveryBackoffer{
+		state: utils.InitialRetryState(16, 0, 0),
+	}
+}
+
+func (bo *recoveryBackoffer) NextBackoff(err error) time.Duration {
+	s := FailedAt(err)
+	switch s {
+	case StageCollectingMeta, StageMakingRecoveryPlan, StageResetPDAllocateID, StageRecovering:
+		log.Info("Recovery data retrying.", zap.Error(err), zap.Stringer("stage", s))
+		return bo.state.ExponentialBackoff()
+	case StageFlashback:
+		log.Info("Giving up retry for flashback stage.", zap.Error(err), zap.Stringer("stage", s))
+		bo.state.GiveUp()
+		return 0
+	}
+	log.Warn("unknown stage of backing off.", zap.Int("val", int(s)))
+	return bo.state.ExponentialBackoff()
+}
+
+func (bo *recoveryBackoffer) Attempt() int {
+	return bo.state.Attempt()
+}
+
 // RecoverData recover the tikv cluster
 // 1. read all meta data from tikvs
 // 2. make recovery plan and then recovery max allocate ID firstly
```
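The hunk above is the heart of the change: each failure inside the recovery flow is wrapped in a `recoveryError` that records the stage it failed at, and `recoveryBackoffer.NextBackoff` branches on that stage, retrying with exponential backoff for the earlier stages and giving up once flashback has begun. The following is a minimal, self-contained sketch of that pattern; every name in it (`Stage`, `stagedError`, `nextBackoff`) is a hypothetical stand-in for illustration, not the BR code:

```go
package main

import (
	"fmt"
	"time"
)

// Stage marks where in a multi-step job an error occurred.
type Stage int

const (
	StageUnknown Stage = iota
	StageMeta      // retryable, like StageCollectingMeta above
	StageFlashback // not safely retryable once started
)

func (s Stage) String() string {
	switch s {
	case StageMeta:
		return "meta"
	case StageFlashback:
		return "flashback"
	default:
		return "unknown"
	}
}

// stagedError tags an error with the stage it failed at, mirroring recoveryError.
type stagedError struct {
	error
	atStage Stage
}

// failedAt recovers the stage from an error, mirroring FailedAt.
func failedAt(err error) Stage {
	if serr, ok := err.(stagedError); ok {
		return serr.atStage
	}
	return StageUnknown
}

// nextBackoff mimics the shape of recoveryBackoffer.NextBackoff: retryable
// stages get an exponentially growing delay; the flashback stage gives up.
func nextBackoff(err error, attempt int) (delay time.Duration, giveUp bool) {
	if failedAt(err) == StageFlashback {
		return 0, true
	}
	return (50 * time.Millisecond) << attempt, false
}

func main() {
	errs := []error{
		stagedError{error: fmt.Errorf("region meta unreachable"), atStage: StageMeta},
		stagedError{error: fmt.Errorf("flashback interrupted"), atStage: StageFlashback},
	}
	for attempt, err := range errs {
		delay, giveUp := nextBackoff(err, attempt)
		fmt.Printf("stage=%v delay=%v giveUp=%v\n", failedAt(err), delay, giveUp)
	}
}
```

The useful property is that the retry policy lives in one place (the backoffer) while call sites only tag errors with a stage, so adding a stage later does not touch the retry loop.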
```diff
@@ -35,39 +104,50 @@ import (
 // 5. prepare the flashback
 // 6. flashback to resolveTS
 func RecoverData(ctx context.Context, resolveTS uint64, allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, restoreTS uint64, concurrency uint32) (int, error) {
+	return utils.WithRetryV2(ctx, newRecoveryBackoffer(), func(ctx context.Context) (int, error) {
+		return doRecoveryData(ctx, resolveTS, allStores, mgr, progress, restoreTS, concurrency)
+	})
+}
+
+func doRecoveryData(ctx context.Context, resolveTS uint64, allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, restoreTS uint64, concurrency uint32) (int, error) {
 	var cancel context.CancelFunc
 	ctx, cancel = context.WithCancel(ctx)
 	defer cancel()
 
 	var recovery = NewRecovery(allStores, mgr, progress, concurrency)
 	if err := recovery.ReadRegionMeta(ctx); err != nil {
-		return 0, errors.Trace(err)
+		return 0, recoveryError{error: err, atStage: StageCollectingMeta}
 	}
 
 	totalRegions := recovery.GetTotalRegions()
 
-	if err := recovery.MakeRecoveryPlan(); err != nil {
-		return totalRegions, errors.Trace(err)
+	err := recovery.MakeRecoveryPlan()
```
Review comment: keep the format as before, such as `if err := recovery.MakeRecoveryPlan(); err != nil {`?
```diff
+	if err != nil {
+		return totalRegions, recoveryError{error: err, atStage: StageMakingRecoveryPlan}
 	}
 
 	log.Info("recover the alloc id to pd", zap.Uint64("max alloc id", recovery.MaxAllocID))
 	if err := recovery.mgr.RecoverBaseAllocID(ctx, recovery.MaxAllocID); err != nil {
-		return totalRegions, errors.Trace(err)
+		return totalRegions, recoveryError{error: err, atStage: StageResetPDAllocateID}
 	}
 
 	// Once TiKV shuts down and reboots, it may be left with no leader because of the recovery mode.
 	// This watcher will retrigger `RecoverRegions` for those stores.
 	recovery.SpawnTiKVShutDownWatchers(ctx)
 	if err := recovery.RecoverRegions(ctx); err != nil {
-		return totalRegions, errors.Trace(err)
+		return totalRegions, recoveryError{error: err, atStage: StageRecovering}
 	}
 
 	if err := recovery.WaitApply(ctx); err != nil {
-		return totalRegions, errors.Trace(err)
+		return totalRegions, recoveryError{error: err, atStage: StageRecovering}
 	}
 
-	if err := recovery.PrepareFlashbackToVersion(ctx, resolveTS, restoreTS-1); err != nil {
-		return totalRegions, errors.Trace(err)
+	err = recovery.PrepareFlashbackToVersion(ctx, resolveTS, restoreTS-1)
```
Review comment: ditto (same formatting suggestion for the `PrepareFlashbackToVersion` call).
```diff
+	if err != nil {
+		return totalRegions, recoveryError{error: err, atStage: StageFlashback}
 	}
 
 	if err := recovery.FlashbackToVersion(ctx, resolveTS, restoreTS); err != nil {
-		return totalRegions, errors.Trace(err)
+		return totalRegions, recoveryError{error: err, atStage: StageFlashback}
 	}
 
 	return totalRegions, nil
```
```diff
@@ -146,6 +146,8 @@ func RunResolveKvData(c context.Context, g glue.Glue, cmdName string, cfg *Resto
 	// restore tikv data from a snapshot volume
 	var totalRegions int
 
+	// Roughly handle the case that some TiKVs are rebooted during making plan.
+	// Generally,
```
Review comment: not the entire annotation? (The added comment appears to be cut off after "Generally,".)
```diff
 	totalRegions, err = restore.RecoverData(ctx, resolveTS, allStores, mgr, progress, restoreTS, cfg.Concurrency)
 	if err != nil {
 		return errors.Trace(err)
```
Review comment: is the parameter 0 ok?
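This question most plausibly refers to the two zero arguments in `utils.InitialRetryState(16, 0, 0)` from the first hunk. Assuming the parameters mean (max retry times, initial backoff, max backoff), which is an assumption about `br/pkg/utils` rather than anything visible in this diff, a zero initial backoff never grows under doubling, so every retry would fire with no delay. A hypothetical sketch of that reading:

```go
// Hypothetical sketch of RetryState semantics under the reading above;
// not the actual br/pkg/utils implementation.
package main

import (
	"fmt"
	"time"
)

type retryState struct {
	remaining  int
	backoff    time.Duration
	maxBackoff time.Duration
}

func initialRetryState(maxRetry int, initial, max time.Duration) retryState {
	return retryState{remaining: maxRetry, backoff: initial, maxBackoff: max}
}

// ExponentialBackoff consumes one attempt and doubles the delay,
// capped at maxBackoff. With a zero initial value it returns 0 forever.
func (r *retryState) ExponentialBackoff() time.Duration {
	r.remaining--
	d := r.backoff
	r.backoff *= 2
	if r.maxBackoff > 0 && r.backoff > r.maxBackoff {
		r.backoff = r.maxBackoff
	}
	return d
}

func (r *retryState) Attempt() int { return r.remaining }

// GiveUp zeroes the remaining attempts, ending the retry loop.
func (r *retryState) GiveUp() { r.remaining = 0 }

func main() {
	st := initialRetryState(16, 0, 0)
	// Prints "0s 0s 14": both delays are zero, two attempts consumed.
	fmt.Println(st.ExponentialBackoff(), st.ExponentialBackoff(), st.Attempt())
}
```

If that reading is right, the zeros trade pacing for speed: up to sixteen immediate retries rather than spaced ones, which may be exactly what the reviewer is probing.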