Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

restore: Make all download error as retryable #298

Merged
merged 3 commits into from
May 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/restore/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ var (
ErrRangeIsEmpty = errors.NewNoStackError("range is empty")
// ErrGRPC indicates any gRPC communication error. This error can be retried.
ErrGRPC = errors.NewNoStackError("gRPC error")
// ErrDownloadFailed indicates a generic, non-retryable download error.
// ErrDownloadFailed indicates a generic download error, expected to be
// retryable.
ErrDownloadFailed = errors.NewNoStackError("download sst failed")
// ErrIngestFailed indicates a generic, retryable ingest error.
ErrIngestFailed = errors.NewNoStackError("ingest sst failed")
Expand Down Expand Up @@ -72,7 +73,7 @@ func newDownloadSSTBackoffer() utils.Backoffer {

func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
switch errors.Cause(err) {
case ErrGRPC, ErrEpochNotMatch, ErrIngestFailed:
case ErrGRPC, ErrEpochNotMatch, ErrDownloadFailed, ErrIngestFailed:
bo.delayTime = 2 * bo.delayTime
bo.attempt--
case ErrRangeIsEmpty, ErrRewriteRuleNotFound:
Expand Down
44 changes: 41 additions & 3 deletions pkg/restore/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/tidb/util/testleak"
"go.uber.org/multierr"

"github.com/pingcap/br/pkg/mock"
"github.com/pingcap/br/pkg/restore"
Expand All @@ -30,6 +31,25 @@ func (s *testBackofferSuite) TearDownSuite(c *C) {
testleak.AfterTest(c)()
}

func (s *testBackofferSuite) TestBackoffWithSuccess(c *C) {
var counter int
backoffer := restore.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
switch counter {
case 0:
return restore.ErrGRPC
case 1:
return restore.ErrEpochNotMatch
case 2:
return nil
}
return nil
}, backoffer)
c.Assert(counter, Equals, 3)
c.Assert(err, IsNil)
}

func (s *testBackofferSuite) TestBackoffWithFatalError(c *C) {
var counter int
backoffer := restore.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
Expand All @@ -41,12 +61,19 @@ func (s *testBackofferSuite) TestBackoffWithFatalError(c *C) {
case 1:
return restore.ErrEpochNotMatch
case 2:
return restore.ErrDownloadFailed
case 3:
return restore.ErrRangeIsEmpty
}
return nil
}, backoffer)
c.Assert(counter, Equals, 3)
c.Assert(err, Equals, restore.ErrRangeIsEmpty)
c.Assert(counter, Equals, 4)
c.Assert(multierr.Errors(err), DeepEquals, []error{
restore.ErrGRPC,
restore.ErrEpochNotMatch,
restore.ErrDownloadFailed,
restore.ErrRangeIsEmpty,
})
}

func (s *testBackofferSuite) TestBackoffWithRetryableError(c *C) {
Expand All @@ -57,5 +84,16 @@ func (s *testBackofferSuite) TestBackoffWithRetryableError(c *C) {
return restore.ErrEpochNotMatch
}, backoffer)
c.Assert(counter, Equals, 10)
c.Assert(err, Equals, restore.ErrEpochNotMatch)
c.Assert(multierr.Errors(err), DeepEquals, []error{
restore.ErrEpochNotMatch,
restore.ErrEpochNotMatch,
restore.ErrEpochNotMatch,
restore.ErrEpochNotMatch,
restore.ErrEpochNotMatch,
restore.ErrEpochNotMatch,
restore.ErrEpochNotMatch,
restore.ErrEpochNotMatch,
restore.ErrEpochNotMatch,
restore.ErrEpochNotMatch,
})
}
11 changes: 8 additions & 3 deletions pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
"github.com/pingcap/pd/v4/pkg/codec"
"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -228,6 +229,7 @@ func (importer *FileImporter) Import(

log.Debug("scan regions", zap.Stringer("file", file), zap.Int("count", len(regionInfos)))
// Try to download and ingest the file in every region
regionLoop:
for _, regionInfo := range regionInfos {
info := regionInfo
// Try to download file.
Expand All @@ -242,9 +244,12 @@ func (importer *FileImporter) Import(
return e
}, newDownloadSSTBackoffer())
if errDownload != nil {
if errDownload == ErrRewriteRuleNotFound || errDownload == ErrRangeIsEmpty {
// Skip this region
continue
for _, e := range multierr.Errors(errDownload) {
switch e {
case ErrRewriteRuleNotFound, ErrRangeIsEmpty:
// Skip this region
continue regionLoop
}
}
log.Error("download file failed",
zap.Stringer("file", file),
Expand Down
15 changes: 10 additions & 5 deletions pkg/utils/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package utils
import (
"context"
"time"

"go.uber.org/multierr"
)

// RetryableFunc presents a retryable operation.
Expand All @@ -18,25 +20,28 @@ type Backoffer interface {
Attempt() int
}

// WithRetry retrys a given operation with a backoff policy.
// WithRetry retries a given operation with a backoff policy.
//
// Returns nil if `retryableFunc` succeeded at least once. Otherwise, returns a
// multierr containing all errors encountered.
func WithRetry(
ctx context.Context,
retryableFunc RetryableFunc,
backoffer Backoffer,
) error {
var lastErr error
var allErrors error
for backoffer.Attempt() > 0 {
err := retryableFunc()
if err != nil {
lastErr = err
allErrors = multierr.Append(allErrors, err)
select {
case <-ctx.Done():
return lastErr
return allErrors
case <-time.After(backoffer.NextBackoff(err)):
}
} else {
return nil
}
}
return lastErr
return allErrors
}