From 33f1c6e516e811452c5f77db0a75567e68a2a330 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 16 May 2024 19:13:13 +0800 Subject: [PATCH] backup: implement store based backup to solve the long tail issue (#53081) ref pingcap/tidb#52534 --- br/pkg/backup/BUILD.bazel | 8 +- br/pkg/backup/client.go | 926 +++++++++++-------------------- br/pkg/backup/client_test.go | 170 +++--- br/pkg/backup/push.go | 318 +++++------ br/pkg/conn/conn.go | 16 +- br/pkg/conn/conn_test.go | 18 +- br/pkg/restore/client.go | 2 +- br/pkg/task/backup.go | 14 +- br/pkg/task/backup_raw.go | 2 +- br/pkg/task/backup_txn.go | 3 +- br/pkg/utils/misc.go | 3 +- br/tests/br_rawkv/run.sh | 2 - br/tests/br_tikv_outage/run.sh | 4 +- br/tests/br_tikv_outage2/run.sh | 15 +- br/tests/br_tikv_outage3/run.sh | 5 +- tests/_utils/br_tikv_outage_util | 3 - 16 files changed, 602 insertions(+), 907 deletions(-) diff --git a/br/pkg/backup/BUILD.bazel b/br/pkg/backup/BUILD.bazel index f80a1cef6a4ef..7d7602193fb1e 100644 --- a/br/pkg/backup/BUILD.bazel +++ b/br/pkg/backup/BUILD.bazel @@ -33,8 +33,6 @@ go_library( "//pkg/statistics/handle", "//pkg/statistics/handle/util", "//pkg/util", - "//pkg/util/codec", - "//pkg/util/redact", "//pkg/util/table-filter", "@com_github_google_btree//:btree", "@com_github_opentracing_opentracing_go//:opentracing-go", @@ -47,7 +45,6 @@ go_library( "@com_github_pingcap_log//:log", "@com_github_prometheus_client_golang//prometheus", "@com_github_tikv_client_go_v2//oracle", - "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_client_go_v2//txnkv/txnlock", "@com_github_tikv_client_go_v2//util", "@com_github_tikv_pd_client//:client", @@ -76,6 +73,7 @@ go_test( "//br/pkg/metautil", "//br/pkg/mock", "//br/pkg/pdutil", + "//br/pkg/rtree", "//br/pkg/storage", "//br/pkg/utils", "//pkg/parser/model", @@ -84,15 +82,11 @@ go_test( "//pkg/testkit/testsetup", "//pkg/util/table-filter", "@com_github_golang_protobuf//proto", - "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_pingcap_kvproto//pkg/encryptionpb", - "@com_github_pingcap_kvproto//pkg/errorpb", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_client_go_v2//testutils", - "@com_github_tikv_client_go_v2//tikv", - "@com_github_tikv_client_go_v2//txnkv/txnlock", "@com_github_tikv_pd_client//:client", "@io_opencensus_go//stats/view", "@org_uber_go_goleak//:goleak", diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index d49987c9a7c62..d7b1d40a41d8c 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -6,20 +6,14 @@ import ( "bytes" "context" "encoding/base64" - "encoding/hex" "encoding/json" - "fmt" - "io" - "math/rand" - "os" + "reflect" "strings" - "sync" "time" "github.com/google/btree" "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" @@ -42,17 +36,11 @@ import ( "github.com/pingcap/tidb/pkg/meta" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/util" - "github.com/pingcap/tidb/pkg/util/codec" - "github.com/pingcap/tidb/pkg/util/redact" filter "github.com/pingcap/tidb/pkg/util/table-filter" "github.com/tikv/client-go/v2/oracle" - "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/txnkv/txnlock" pd "github.com/tikv/pd/client" "go.uber.org/zap" - "golang.org/x/sync/errgroup" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) // ClientMgr manages connections needed by backup. @@ -74,13 +62,10 @@ type Checksum struct { // ProgressUnit represents the unit of progress. type ProgressUnit string -const ( - // backupFineGrainedMaxBackoff is 1 hour. - // given it begins the fine-grained backup, there must be some problems in the cluster. - // We need to be more patient. - backupFineGrainedMaxBackoff = 3600000 - backupRetryTimes = 5 -) +type StoreBackupPolicy struct { + One uint64 + All bool +} // Client is a client instructs TiKV how to do a backup. type Client struct { @@ -783,75 +768,22 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S return nil } -func (bc *Client) getProgressRanges(ranges []rtree.Range) []*rtree.ProgressRange { - prs := make([]*rtree.ProgressRange, 0, len(ranges)) - for _, r := range ranges { - prs = append(prs, bc.getProgressRange(r)) - } - return prs -} - -func buildProgressRangeTree(pranges []*rtree.ProgressRange) (rtree.ProgressRangeTree, []*kvrpcpb.KeyRange, error) { +func (bc *Client) BuildProgressRangeTree(ranges []rtree.Range) (rtree.ProgressRangeTree, error) { // the response from TiKV only contains the region's key, so use the // progress range tree to quickly seek the region's corresponding progress range. progressRangeTree := rtree.NewProgressRangeTree() - subRangesCount := 0 - for _, pr := range pranges { - if err := progressRangeTree.Insert(pr); err != nil { - return progressRangeTree, nil, errors.Trace(err) - } - subRangesCount += len(pr.Incomplete) - } - // either the `incomplete` is origin range itself, - // or the `incomplete` is sub-ranges split by checkpoint of origin range. - subRanges := make([]*kvrpcpb.KeyRange, 0, subRangesCount) - progressRangeTree.Ascend(func(pr *rtree.ProgressRange) bool { - for _, r := range pr.Incomplete { - subRanges = append(subRanges, &kvrpcpb.KeyRange{ - StartKey: r.StartKey, - EndKey: r.EndKey, - }) - } - return true - }) - - return progressRangeTree, subRanges, nil -} - -func (bc *Client) getBackupTargetStores( - ctx context.Context, - replicaReadLabel map[string]string, -) ([]*metapb.Store, map[uint64]struct{}, error) { - allStores, err := conn.GetAllTiKVStoresWithRetry(ctx, bc.mgr.GetPDClient(), connutil.SkipTiFlash) - if err != nil { - return nil, nil, errors.Trace(err) - } - var targetStores []*metapb.Store - targetStoreIds := make(map[uint64]struct{}) - if len(replicaReadLabel) == 0 { - targetStores = allStores - } else { - for _, store := range allStores { - for _, label := range store.Labels { - if val, ok := replicaReadLabel[label.Key]; ok && val == label.Value { - targetStores = append(targetStores, store) // send backup push down request to stores that match replica read label - targetStoreIds[store.GetId()] = struct{}{} // record store id for fine grained backup - } - } - } - if len(targetStores) == 0 { - return nil, nil, errors.Errorf("no store matches replica read label: %v", replicaReadLabel) + for _, r := range ranges { + if err := progressRangeTree.Insert(bc.getProgressRange(r)); err != nil { + return progressRangeTree, errors.Trace(err) } } - - return targetStores, targetStoreIds, nil + return progressRangeTree, nil } // BackupRanges make a backup of the given key ranges. func (bc *Client) BackupRanges( ctx context.Context, ranges []rtree.Range, - regionCounts []int, request backuppb.BackupRequest, concurrency uint, replicaReadLabel map[string]string, @@ -871,570 +803,384 @@ func (bc *Client) BackupRanges( ctx = opentracing.ContextWithSpan(ctx, span1) } - targetStores, targetStoreIds, err := bc.getBackupTargetStores(ctx, replicaReadLabel) + stateChan := make(chan StoreBackupPolicy) + // TODO implement state change watch goroutine @3pointer + // go func() { + // // TODO watch changes on cluste state + // cb := storewatch.MakeCallback(storewatch.WithOnReboot(func(s *metapb.Store) { + // stateChan <- StoreBackups{All: true} + // }), storewatch.WithOnDisconnect(func(s *metapb.Store) { + // stateChan <- StoreBackups{All: true} + // }), storewatch.WithOnNewStoreRegistered(func(s *metapb.Store) { + // // only backup for this store + // stateChan <- StoreBackups{One: s.Id} + // })) + // watcher := storewatch.New(bc.mgr.GetPDClient(), cb) + // tick := time.NewTicker(30 * time.Second) + // for { + // select { + // case <-ctx.Done(): + // return + // case <-tick.C: + // err := watcher.Step(ctx) + // if err != nil { + // // ignore it + // } + // } + // } + // }() + + globalProgressTree, err := bc.BuildProgressRangeTree(ranges) if err != nil { return errors.Trace(err) } - // merge the small ranges, such as index ranges and small partition ranges, into a batch. - targetRangesBatchSize := len(targetStores) * 64 - - // we collect all files in a single goroutine to avoid thread safety issues. - workerPool := util.NewWorkerPool(concurrency, "Ranges") - eg, ectx := errgroup.WithContext(ctx) - rangeInBatchStartIndex := 0 - regionCountInBatch := 0 - // ASSERT: len(ranges) > 0 - for idx := 0; idx <= len(ranges); idx += 1 { - if idx != len(ranges) { - if regionCountInBatch <= targetRangesBatchSize { - regionCountInBatch += regionCounts[idx] - continue - } - regionCountInBatch = regionCounts[idx] - } - // merge the ranges[rangeInBatchStartIndex, id) into a batch - pranges := bc.getProgressRanges(ranges[rangeInBatchStartIndex:idx]) - idxStart := rangeInBatchStartIndex - idxEnd := idx - req := request - workerPool.ApplyOnErrorGroup(eg, func() (retErr error) { - elctx := logutil.ContextWithField(ectx, - logutil.RedactAny("range-sn-start", idxStart), logutil.RedactAny("range-sn-end", idxEnd)) - - prTree, subRanges, err := buildProgressRangeTree(pranges) - if err != nil { - return errors.Trace(err) - } - start := time.Now() - defer func() { - minPr, _ := prTree.Min() - maxPr, _ := prTree.Max() - key := "range start: " + - hex.EncodeToString(minPr.Origin.StartKey) + - " end: " + - hex.EncodeToString(maxPr.Origin.EndKey) - logutil.CL(elctx).Info("backup range completed", zap.String("key range", key), zap.Duration("take", time.Since(start))) - if retErr != nil { - summary.CollectFailureUnit(key, err) - } - }() - logutil.CL(elctx).Info("backup range started", zap.Uint64("rateLimit", req.RateLimit)) - if err := bc.pushBackupInBatch(elctx, req, prTree, subRanges, targetStores, progressCallBack); err != nil { - return errors.Trace(err) - } - if err := bc.fineGrainedBackup(elctx, req, targetStoreIds, prTree, progressCallBack); err != nil { - return errors.Trace(err) - } - if err := collectRangeFiles(prTree, metaWriter); err != nil { - return errors.Trace(err) - } - return nil - }) - - rangeInBatchStartIndex = idx - } - return eg.Wait() -} - -func (bc *Client) pushBackupInBatch( - ctx context.Context, - request backuppb.BackupRequest, - prTree rtree.ProgressRangeTree, - subRanges []*kvrpcpb.KeyRange, - targetStores []*metapb.Store, - progressCallBack func(), -) error { - logutil.CL(ctx).Info("backup push down started") - request.SubRanges = subRanges - push := newPushDown(bc.mgr, len(targetStores)) - err := push.pushBackup(ctx, request, prTree, targetStores, bc.checkpointRunner, progressCallBack) + err = bc.startMainBackupLoop(ctx, &globalProgressTree, replicaReadLabel, request, stateChan, progressCallBack) if err != nil { return errors.Trace(err) } - logutil.CL(ctx).Info("backup push down completed", zap.Int("range-count", len(subRanges))) - return nil + return collectRangeFiles(&globalProgressTree, metaWriter) } -func collectRangeFiles(progressRangeTree rtree.ProgressRangeTree, metaWriter *metautil.MetaWriter) error { - var progressRangeAscendErr error - progressRangeTree.Ascend(func(progressRange *rtree.ProgressRange) bool { - var rangeAscendErr error - progressRange.Res.Ascend(func(i btree.Item) bool { - r := i.(*rtree.Range) - for _, f := range r.Files { - summary.CollectSuccessUnit(summary.TotalKV, 1, f.TotalKvs) - summary.CollectSuccessUnit(summary.TotalBytes, 1, f.TotalBytes) - } - // we need keep the files in order after we support multi_ingest sst. - // default_sst and write_sst need to be together. - if err := metaWriter.Send(r.Files, metautil.AppendDataFile); err != nil { - rangeAscendErr = err - return false +func (bc *Client) getBackupStores(ctx context.Context, replicaReadLabel map[string]string) ([]*metapb.Store, error) { + allStores, err := conn.GetAllTiKVStoresWithRetry(ctx, bc.mgr.GetPDClient(), connutil.SkipTiFlash) + if err != nil { + return nil, errors.Trace(err) + } + var targetStores []*metapb.Store + targetStoreIds := make(map[uint64]struct{}) + if len(replicaReadLabel) == 0 { + targetStores = allStores // send backup push down request to all stores + } else { + for _, store := range allStores { + for _, label := range store.Labels { + if val, ok := replicaReadLabel[label.Key]; ok && val == label.Value { + targetStores = append(targetStores, store) // send backup push down request to stores that match replica read label + targetStoreIds[store.GetId()] = struct{}{} // record store id for fine grained backup + } } - return true - }) - if rangeAscendErr != nil { - progressRangeAscendErr = rangeAscendErr - return false } - - // Check if there are duplicated files - checkDupFiles(&progressRange.Res) - return true - }) - - return errors.Trace(progressRangeAscendErr) + } + if len(replicaReadLabel) > 0 && len(targetStores) == 0 { + return nil, errors.Errorf("no store matches replica read label: %v", replicaReadLabel) + } + return targetStores, nil } -func (bc *Client) FindTargetPeer(ctx context.Context, key []byte, isRawKv bool, targetStoreIds map[uint64]struct{}) (*metapb.Peer, error) { - // Keys are saved in encoded format in TiKV, so the key must be encoded - // in order to find the correct region. - var leader *metapb.Peer - key = codec.EncodeBytesExt([]byte{}, key, isRawKv) - state := utils.InitialRetryState(60, 100*time.Millisecond, 2*time.Second) - failpoint.Inject("retry-state-on-find-target-peer", func(v failpoint.Value) { - logutil.CL(ctx).Info("reset state for FindTargetPeer") - state = utils.InitialRetryState(v.(int), 100*time.Millisecond, 100*time.Millisecond) - }) - err := utils.WithRetry(ctx, func() error { - region, err := bc.mgr.GetPDClient().GetRegion(ctx, key) - failpoint.Inject("return-region-on-find-target-peer", func(v failpoint.Value) { - switch v.(string) { - case "nil": - { - region = nil - } - case "hasLeader": - { - region = &pd.Region{ - Leader: &metapb.Peer{ - Id: 42, - }, - } - } - case "hasPeer": - { - region = &pd.Region{ - Meta: &metapb.Region{ - Peers: []*metapb.Peer{ - { - Id: 43, - StoreId: 42, - }, - }, - }, - } - } +func (bc *Client) OnBackupResponse( + ctx context.Context, + r *ResponseAndStore, + errContext *utils.ErrorContext, + globalProgressTree *rtree.ProgressRangeTree, +) error { + if r == nil || r.GetResponse() == nil { + return nil + } - case "noLeader": - { - region = &pd.Region{ - Leader: nil, - } - } - case "noPeer": - { - { - region = &pd.Region{ - Meta: &metapb.Region{ - Peers: nil, - }, - } - } - } - } - }) - if err != nil || region == nil { - logutil.CL(ctx).Error("find region failed", zap.Error(err), zap.Reflect("region", region)) - return errors.Annotate(berrors.ErrPDLeaderNotFound, "cannot find region from pd client") + resp := r.GetResponse() + storeID := r.GetStoreID() + if resp.GetError() == nil { + pr, err := globalProgressTree.FindContained(resp.StartKey, resp.EndKey) + if err != nil { + logutil.CL(ctx).Error("failed to update the backup response", + zap.Reflect("error", err)) + return err } - if len(targetStoreIds) == 0 { - if region.Leader != nil { - logutil.CL(ctx).Info("find leader", - zap.Reflect("Leader", region.Leader), logutil.Key("key", key)) - leader = region.Leader - return nil - } - } else { - candidates := make([]*metapb.Peer, 0, len(region.Meta.Peers)) - for _, peer := range region.Meta.Peers { - if _, ok := targetStoreIds[peer.StoreId]; ok { - candidates = append(candidates, peer) - } + if bc.checkpointRunner != nil { + if err := checkpoint.AppendForBackup( + ctx, + bc.checkpointRunner, + pr.GroupKey, + resp.StartKey, + resp.EndKey, + resp.Files, + ); err != nil { + // flush checkpoint failed, + logutil.CL(ctx).Error("failed to flush checkpoint", zap.Error(err)) + return err } - if len(candidates) > 0 { - peer := candidates[rand.Intn(len(candidates))] - logutil.CL(ctx).Info("find target peer for backup", - zap.Reflect("Peer", peer), logutil.Key("key", key)) - leader = peer - return nil + } + pr.Res.Put(resp.StartKey, resp.EndKey, resp.Files) + apiVersion := resp.ApiVersion + bc.SetApiVersion(apiVersion) + } else { + errPb := resp.GetError() + res := errContext.HandleIgnorableError(errPb, storeID) + switch res.Strategy { + case utils.GiveUpStrategy: + errMsg := res.Reason + if len(errMsg) <= 0 { + errMsg = errPb.Msg } + // TODO output a precise store address. @3pointer + return errors.Annotatef(berrors.ErrKVStorage, "error happen in store %v: %s", + storeID, + errMsg, + ) } - return errors.Annotate(berrors.ErrPDLeaderNotFound, "cannot find leader or candidate from pd client") - }, &state) - if err != nil { - logutil.CL(ctx).Error("can not find a valid target peer after retry", logutil.Key("key", key)) - return nil, err } - // leader cannot be nil if err is nil - return leader, nil + return nil } -func (bc *Client) fineGrainedBackup( +// infinite loop to backup ranges on all tikv stores +// if one client grpc disconnected. resend backup request to this store. +// if new tikv store joined. send backup request to new store. +// if one tikv store rebooted. consider leader changes, resend backup request to all stores. +// if one tikv store disconnected. consider leader changes, resend backup request to all stores. +func (bc *Client) startMainBackupLoop( ctx context.Context, - req backuppb.BackupRequest, - targetStoreIds map[uint64]struct{}, - prTree rtree.ProgressRangeTree, + globalProgressTree *rtree.ProgressRangeTree, + replicaReadLabel map[string]string, + request backuppb.BackupRequest, + stateChan chan StoreBackupPolicy, progressCallBack func(), ) error { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("Client.fineGrainedBackup", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } - - failpoint.Inject("hint-fine-grained-backup", func(v failpoint.Value) { - log.Info("failpoint hint-fine-grained-backup injected, "+ - "process will sleep for 3s and notify the shell.", zap.String("file", v.(string))) - if sigFile, ok := v.(string); ok { - file, err := os.Create(sigFile) + startStoreBackupAsyncFn := func( + ctx context.Context, + round uint64, + storeID uint64, + request backuppb.BackupRequest, + cli backuppb.BackupClient, + respCh chan *ResponseAndStore, + ) { + go func() { + defer func() { + logutil.CL(ctx).Info("exit store backup goroutine", zap.Uint64("store", storeID)) + close(respCh) + }() + err := startStoreBackup(ctx, storeID, request, cli, respCh) if err != nil { - log.Warn("failed to create file for notifying, skipping notify", zap.Error(err)) - } - if file != nil { - file.Close() + // only 2 kinds of errors will occur here. + // 1. grpc connection error(already retry inside) + // 2. context cancelled outside. + if errors.Cause(err) == context.Canceled { + logutil.CL(ctx).Info("store backup cancelled", + zap.Uint64("round", round), + zap.Uint64("storeID", storeID)) + } else { + // otherwise retry backup this store + logutil.CL(ctx).Error("store backup failed", + zap.Uint64("round", round), + zap.Uint64("storeID", storeID), zap.Error(err)) + stateChan <- StoreBackupPolicy{One: storeID} + } } - time.Sleep(3 * time.Second) - } - }) + }() + } - bo := utils.AdaptTiKVBackoffer(ctx, backupFineGrainedMaxBackoff, berrors.ErrUnknown) - prIter := prTree.Iter() - for { - // Step1, check whether there is any incomplete range - incomplete := prIter.GetIncompleteRanges() - if len(incomplete) == 0 { - return nil - } - logutil.CL(ctx).Info("start fine grained backup", zap.Int("incomplete", len(incomplete))) - // Step2, retry backup on incomplete range - respCh := make(chan *backuppb.BackupResponse, 4) - errCh := make(chan error, 4) - retry := make(chan rtree.Range, 4) - - wg := new(sync.WaitGroup) - for i := 0; i < 4; i++ { - wg.Add(1) - fork, _ := bo.Inner().Fork() - go func(boFork *tikv.Backoffer) { - defer wg.Done() - for rg := range retry { - subReq := req - subReq.StartKey, subReq.EndKey = rg.StartKey, rg.EndKey - backoffMs, err := bc.handleFineGrained(ctx, boFork, subReq, targetStoreIds, respCh) - if err != nil { - errCh <- err - return - } - if backoffMs != 0 { - bo.RequestBackOff(backoffMs) - } - } - }(fork) - } + collectStoreBackupsAsyncFn := func( + ctx context.Context, + round uint64, + storeBackupChs map[uint64]chan *ResponseAndStore, + globalCh chan *ResponseAndStore, - // Dispatch rangs and wait + ) { go func() { - for _, rg := range incomplete { - retry <- rg + defer func() { + logutil.CL(ctx).Info("exit collect backups goroutine", zap.Uint64("round", round)) + close(globalCh) + }() + cases := make([]reflect.SelectCase, 0) + for _, ch := range storeBackupChs { + cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}) } - close(retry) - wg.Wait() - close(respCh) - }() - selectLoop: - for { - select { - case err := <-errCh: - // TODO: should we handle err here? - return errors.Trace(err) - case resp, ok := <-respCh: + remainingProducers := len(cases) + logutil.CL(ctx).Info("start wait store backups", zap.Int("remainingProducers", remainingProducers)) + for remainingProducers > 0 { + chosen, value, ok := reflect.Select(cases) if !ok { - // Finished. - break selectLoop - } - if resp.Error != nil { - logutil.CL(ctx).Panic("unexpected backup error", - zap.Reflect("error", resp.Error)) + // The chosen channel has been closed, so zero out the channel to disable the case + cases[chosen].Chan = reflect.ValueOf(nil) + remainingProducers -= 1 + continue } - logutil.CL(ctx).Info("put fine grained range", - logutil.Key("fine-grained-range-start", resp.StartKey), - logutil.Key("fine-grained-range-end", resp.EndKey), - ) - pr, err := prTree.FindContained(resp.StartKey, resp.EndKey) - if err != nil { - logutil.CL(ctx).Panic("failed to update the backup response", - zap.Reflect("error", err)) - } - if bc.checkpointRunner != nil { - if err := checkpoint.AppendForBackup( - ctx, - bc.checkpointRunner, - pr.GroupKey, - resp.StartKey, - resp.EndKey, - resp.Files, - ); err != nil { - return errors.Annotate(err, "failed to flush checkpoint when fineGrainedBackup") - } - } - pr.Res.Put(resp.StartKey, resp.EndKey, resp.Files) - apiVersion := resp.ApiVersion - bc.SetApiVersion(apiVersion) - - // Update progress - progressCallBack() - } - } - - // Step3. Backoff if needed, then repeat. - if ms := bo.NextSleepInMS(); ms != 0 { - log.Info("handle fine grained", zap.Int("backoffMs", ms)) - err := bo.BackOff() - if err != nil { - return errors.Annotatef(err, "at fine-grained backup, remained ranges = %d", prIter.Len()) - } - } - } -} -// OnBackupResponse checks the backup resp, decides whether to retry and generate the error. -func OnBackupResponse( - storeID uint64, - bo *tikv.Backoffer, - backupTS uint64, - lockResolver *txnlock.LockResolver, - resp *backuppb.BackupResponse, - errContext *utils.ErrorContext, -) (*backuppb.BackupResponse, int, error) { - log.Debug("OnBackupResponse", zap.Reflect("resp", resp)) - if resp.Error == nil { - return resp, 0, nil - } - backoffMs := 0 - - err := resp.Error - switch v := err.Detail.(type) { - case *backuppb.Error_KvError: - if lockErr := v.KvError.Locked; lockErr != nil { - msBeforeExpired, err1 := lockResolver.ResolveLocks( - bo, backupTS, []*txnlock.Lock{txnlock.NewLock(lockErr)}) - if err1 != nil { - return nil, 0, errors.Trace(err1) - } - if msBeforeExpired > 0 { - backoffMs = int(msBeforeExpired) + select { + case <-ctx.Done(): + return + case globalCh <- value.Interface().(*ResponseAndStore): + } } - return nil, backoffMs, nil - } - default: - res := errContext.HandleError(resp.Error, storeID) - switch res.Strategy { - case utils.GiveUpStrategy: - return nil, 0, errors.Annotatef(berrors.ErrKVUnknown, "storeID: %d OnBackupResponse error %s", storeID, res.Reason) - case utils.RetryStrategy: - return nil, 3000, nil - } + }() } - return nil, 3000, errors.Annotatef(berrors.ErrKVUnknown, "unreachable") -} -func (bc *Client) handleFineGrained( - ctx context.Context, - bo *tikv.Backoffer, - req backuppb.BackupRequest, - targetStoreIds map[uint64]struct{}, - respCh chan<- *backuppb.BackupResponse, -) (int, error) { - targetPeer, pderr := bc.FindTargetPeer(ctx, req.StartKey, req.IsRawKv, targetStoreIds) - if pderr != nil { - return 0, errors.Trace(pderr) - } - storeID := targetPeer.GetStoreId() - lockResolver := bc.mgr.GetLockResolver() - client, err := bc.mgr.GetBackupClient(ctx, storeID) - if err != nil { - if berrors.Is(err, berrors.ErrFailedToConnect) { - // When the leader store is died, - // 20s for the default max duration before the raft election timer fires. - logutil.CL(ctx).Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID)) - return 20000, nil - } - - logutil.CL(ctx).Error("fail to connect store", zap.Uint64("StoreID", storeID)) - return 0, errors.Annotatef(err, "failed to connect to store %d", storeID) - } - hasProgress := false - backoffMill := 0 - errContext := utils.NewErrorContext("handleFineGrainedBackup", 10) - err = SendBackup( - ctx, storeID, client, req, - // Handle responses with the same backoffer. - func(resp *backuppb.BackupResponse) error { - response, shouldBackoff, err1 := - OnBackupResponse(storeID, bo, req.EndVersion, lockResolver, resp, errContext) - if err1 != nil { - return err1 - } - if backoffMill < shouldBackoff { - backoffMill = shouldBackoff - } - if response != nil { - respCh <- response - } - // When meet an error, we need to set hasProgress too, in case of - // overriding the backoffTime of original error. - // hasProgress would be false iff there is a early io.EOF from the stream. - hasProgress = true + // a flag to indicate the backup round + // one backup round try to backup all ranges on all tikv stores. + // ideally, backup should be finished in one round + // unless the cluster state changed or some kv errors occurred. + round := uint64(0) +mainLoop: + for { + round += 1 + // initialize the error context every round + errContext := utils.NewErrorContext("MainBackupLoop", 10) + + // a channel to collect all store backup results + globalBackupResultCh := make(chan *ResponseAndStore) + // channel slices to receive backup region result from different tikv stores + storeBackupResultChMap := make(map[uint64]chan *ResponseAndStore) + + // mainCtx used to control mainLoop + // every round need a new context to control the main backup process + mainCtx, mainCancel := context.WithCancel(ctx) + + // handleCtx used to control handleLoop + // every round has another infinite loop to handle all tikv backup responses + // until backup finished, store state changed or error occurred. + handleCtx, handleCancel := context.WithCancel(ctx) + + // Compute the left ranges that not backuped yet + iter := globalProgressTree.Iter() + inCompleteRanges := iter.GetIncompleteRanges() + if len(inCompleteRanges) == 0 { + // all range backuped + handleCancel() + mainCancel() return nil - }, - func() (backuppb.BackupClient, error) { - logutil.CL(ctx).Warn("reset the connection in handleFineGrained", zap.Uint64("storeID", storeID)) - return bc.mgr.ResetBackupClient(ctx, storeID) - }) - if err != nil { - if berrors.Is(err, berrors.ErrFailedToConnect) { - // When the leader store is died, - // 20s for the default max duration before the raft election timer fires. - logutil.CL(ctx).Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID)) - return 20000, nil } - logutil.CL(ctx).Error("failed to send fine-grained backup", zap.Uint64("storeID", storeID), logutil.ShortError(err)) - return 0, errors.Annotatef(err, "failed to send fine-grained backup [%s, %s)", - redact.Key(req.StartKey), redact.Key(req.EndKey)) - } - // If no progress, backoff 10s for debouncing. - // 10s is the default interval of stores sending a heartbeat to the PD. - // And is the average new leader election timeout, which would be a reasonable back off time. - if !hasProgress { - backoffMill = 10000 - } - return backoffMill, nil -} + logutil.CL(ctx).Info("backup round start...", zap.Uint64("round", round)) -func doSendBackup( - ctx context.Context, - client backuppb.BackupClient, - req backuppb.BackupRequest, - respFn func(*backuppb.BackupResponse) error, -) error { - failpoint.Inject("hint-backup-start", func(v failpoint.Value) { - logutil.CL(ctx).Info("failpoint hint-backup-start injected, " + - "process will notify the shell.") - if sigFile, ok := v.(string); ok { - file, err := os.Create(sigFile) - if err != nil { - log.Warn("failed to create file for notifying, skipping notify", zap.Error(err)) - } - if file != nil { - file.Close() - } + request.SubRanges = getBackupRanges(inCompleteRanges) + + allStores, err := bc.getBackupStores(mainCtx, replicaReadLabel) + if err != nil { + // because we have connectted to pd before. + // so this error must be retryable, just make infinite retry here + logutil.CL(mainCtx).Error("failed to get backup stores", zap.Uint64("round", round), zap.Error(err)) + mainCancel() + continue mainLoop } - time.Sleep(3 * time.Second) - }) - bCli, err := client.Backup(ctx, &req) - failpoint.Inject("reset-retryable-error", func(val failpoint.Value) { - switch val.(string) { - case "Unavaiable": - { - logutil.CL(ctx).Debug("failpoint reset-retryable-error unavailable injected.") - err = status.Error(codes.Unavailable, "Unavailable error") + for _, store := range allStores { + if err = utils.CheckStoreLiveness(store); err != nil { + // skip this store in this round. + logutil.CL(mainCtx).Warn("store not alive, skip backup it in this round", zap.Uint64("round", round), zap.Error(err)) + continue } - case "Internal": - { - logutil.CL(ctx).Debug("failpoint reset-retryable-error internal injected.") - err = status.Error(codes.Internal, "Internal error") + storeID := store.GetId() + // reset backup client every round, to get a clean grpc connection. + cli, err := bc.mgr.ResetBackupClient(mainCtx, storeID) + if err != nil { + // because the we get store info from pd. + // there is no customer setting here, so make infinite retry. + logutil.CL(ctx).Error("failed to reset backup client", zap.Uint64("round", round), zap.Uint64("storeID", storeID), zap.Error(err)) + mainCancel() + continue mainLoop } + ch := make(chan *ResponseAndStore) + storeBackupResultChMap[storeID] = ch + startStoreBackupAsyncFn(mainCtx, round, storeID, request, cli, ch) } - }) - failpoint.Inject("reset-not-retryable-error", func(val failpoint.Value) { - if val.(bool) { - logutil.CL(ctx).Debug("failpoint reset-not-retryable-error injected.") - err = status.Error(codes.Unknown, "Your server was haunted hence doesn't work, meow :3") - } - }) - if err != nil { - return err - } - defer func() { - _ = bCli.CloseSend() - }() + // infinite loop to collect region backup response to global channel + collectStoreBackupsAsyncFn(handleCtx, round, storeBackupResultChMap, globalBackupResultCh) + handleLoop: + for { + select { + case <-ctx.Done(): + handleCancel() + mainCancel() + return ctx.Err() + case storeBackupInfo := <-stateChan: + if storeBackupInfo.All { + logutil.CL(mainCtx).Info("cluster state changed. restart store backups", zap.Uint64("round", round)) + // stop current connections + handleCancel() + mainCancel() + // start next round backups + continue mainLoop + } + if storeBackupInfo.One != 0 { + storeID := storeBackupInfo.One + store, err := bc.mgr.GetPDClient().GetStore(mainCtx, storeID) + if err != nil { + // cannot get store, maybe store has scaled-in. + logutil.CL(mainCtx).Info("cannot get store from pd", zap.Uint64("round", round), zap.Error(err)) + // try next round + handleCancel() + mainCancel() + continue mainLoop + } + if err = utils.CheckStoreLiveness(store); err != nil { + // skip this store in this round. + logutil.CL(mainCtx).Warn("store not alive, skip backup it in this round", zap.Uint64("round", round), zap.Error(err)) + continue + } + // reset backup client. store address could change but store id remained. + cli, err := bc.mgr.ResetBackupClient(mainCtx, storeID) + if err != nil { + logutil.CL(mainCtx).Error("failed to reset backup client", zap.Uint64("round", round), zap.Uint64("storeID", storeID), zap.Error(err)) + handleCancel() + mainCancel() + // receive new store info but failed to get backup client. + // start next round backups to get all tikv stores and reset all client connections. + continue mainLoop + } - for { - resp, err := bCli.Recv() - if err != nil { - if errors.Cause(err) == io.EOF { // nolint:errorlint - logutil.CL(ctx).Debug("backup streaming finish", - logutil.Key("backup-start-key", req.GetStartKey()), - logutil.Key("backup-end-key", req.GetEndKey())) - return nil + // cancel the former collect goroutine + handleCancel() + ch := make(chan *ResponseAndStore) + + storeBackupResultChMap[storeID] = ch + // start backup for this store + startStoreBackupAsyncFn(mainCtx, round, storeID, request, cli, ch) + // re-create context for new handler loop + handleCtx, handleCancel = context.WithCancel(mainCtx) + // handleCancel makes the former collect goroutine exits + // so we need to re-create a new channel and restart a new collect goroutine. + globalBackupResultCh = make(chan *ResponseAndStore) + // collect all store backup producer channel result to one channel + collectStoreBackupsAsyncFn(handleCtx, round, storeBackupResultChMap, globalBackupResultCh) + } + case respAndStore, ok := <-globalBackupResultCh: + if !ok { + // this round backup finished. break and check incomplete ranges in mainLoop. + break handleLoop + } + err = bc.OnBackupResponse(handleCtx, respAndStore, errContext, globalProgressTree) + if err != nil { + // if error occurred here, stop the backup process + // because only 3 kinds of errors will be returned here: + // 1. permission denied on tikv store. + // 2. parse backup response error.(shouldn't happen in any case) + // 3. checkpoint update failed. TODO: should we retry here? + handleCancel() + mainCancel() + return err + } + progressCallBack() } - return err - } - // TODO: handle errors in the resp. - logutil.CL(ctx).Debug("range backed up", - logutil.Key("small-range-start-key", resp.GetStartKey()), - logutil.Key("small-range-end-key", resp.GetEndKey()), - zap.Int("api-version", int(resp.ApiVersion))) - err = respFn(resp) - if err != nil { - return errors.Trace(err) } } } -// SendBackup send backup request to the given store. -// Stop receiving response if respFn returns error. -func SendBackup( - ctx context.Context, - // the `storeID` seems only used for logging now, maybe we can remove it then? - storeID uint64, - client backuppb.BackupClient, - req backuppb.BackupRequest, - respFn func(*backuppb.BackupResponse) error, - resetFn func() (backuppb.BackupClient, error), -) error { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan( - fmt.Sprintf("Client.SendBackup, storeID = %d, StartKey = %s, EndKey = %s", - storeID, redact.Key(req.StartKey), redact.Key(req.EndKey)), - opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } - - var errReset error - var errBackup error - - retry := -1 - return utils.WithRetry(ctx, func() error { - retry += 1 - if retry != 0 { - client, errReset = resetFn() - if errReset != nil { - return errors.Annotatef(errReset, "failed to reset backup connection on store:%d "+ - "please check the tikv status", storeID) +func collectRangeFiles(progressRangeTree *rtree.ProgressRangeTree, metaWriter *metautil.MetaWriter) error { + var progressRangeAscendErr error + progressRangeTree.Ascend(func(progressRange *rtree.ProgressRange) bool { + var rangeAscendErr error + progressRange.Res.Ascend(func(i btree.Item) bool { + r := i.(*rtree.Range) + for _, f := range r.Files { + summary.CollectSuccessUnit(summary.TotalKV, 1, f.TotalKvs) + summary.CollectSuccessUnit(summary.TotalBytes, 1, f.TotalBytes) } - } - logutil.CL(ctx).Info("try backup", zap.Int("retry time", retry)) - errBackup = doSendBackup(ctx, client, req, respFn) - if errBackup != nil { - return berrors.ErrFailedToConnect.Wrap(errBackup).GenWithStack("failed to create backup stream to store %d", storeID) + // we need keep the files in order after we support multi_ingest sst. + // default_sst and write_sst need to be together. + if err := metaWriter.Send(r.Files, metautil.AppendDataFile); err != nil { + rangeAscendErr = err + return false + } + return true + }) + if rangeAscendErr != nil { + progressRangeAscendErr = rangeAscendErr + return false } - return nil - }, utils.NewBackupSSTBackoffer()) + // Check if there are duplicated files + checkDupFiles(&progressRange.Res) + return true + }) + + return errors.Trace(progressRangeAscendErr) } diff --git a/br/pkg/backup/client_test.go b/br/pkg/backup/client_test.go index eeaea5cec762f..5814ae6dc718a 100644 --- a/br/pkg/backup/client_test.go +++ b/br/pkg/backup/client_test.go @@ -9,16 +9,15 @@ import ( "time" "github.com/golang/protobuf/proto" - "github.com/pingcap/failpoint" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/encryptionpb" - "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/tidb/br/pkg/backup" "github.com/pingcap/tidb/br/pkg/conn" "github.com/pingcap/tidb/br/pkg/gluetidb" "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/mock" "github.com/pingcap/tidb/br/pkg/pdutil" + "github.com/pingcap/tidb/br/pkg/rtree" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/pkg/parser/model" @@ -26,8 +25,6 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/testutils" - "github.com/tikv/client-go/v2/tikv" - "github.com/tikv/client-go/v2/txnkv/txnlock" pd "github.com/tikv/pd/client" "go.opencensus.io/stats/view" ) @@ -128,44 +125,6 @@ func TestGetTS(t *testing.T) { require.Equal(t, backupts, ts) } -func TestOnBackupRegionErrorResponse(t *testing.T) { - type Case struct { - storeID uint64 - bo *tikv.Backoffer - backupTS uint64 - lockResolver *txnlock.LockResolver - resp *backuppb.BackupResponse - exceptedBackoffMs int - exceptedErr bool - } - newBackupRegionErrorResp := func(regionError *errorpb.Error) *backuppb.BackupResponse { - return &backuppb.BackupResponse{Error: &backuppb.Error{Detail: &backuppb.Error_RegionError{RegionError: regionError}}} - } - - cases := []Case{ - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{NotLeader: &errorpb.NotLeader{}}), exceptedBackoffMs: 3000, exceptedErr: false}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{RegionNotFound: &errorpb.RegionNotFound{}}), exceptedBackoffMs: 3000, exceptedErr: false}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{KeyNotInRegion: &errorpb.KeyNotInRegion{}}), exceptedBackoffMs: 0, exceptedErr: true}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}), exceptedBackoffMs: 3000, exceptedErr: false}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}}), exceptedBackoffMs: 3000, exceptedErr: false}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StaleCommand: &errorpb.StaleCommand{}}), exceptedBackoffMs: 3000, exceptedErr: false}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StoreNotMatch: &errorpb.StoreNotMatch{}}), exceptedBackoffMs: 3000, exceptedErr: false}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{RaftEntryTooLarge: &errorpb.RaftEntryTooLarge{}}), exceptedBackoffMs: 0, exceptedErr: true}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ReadIndexNotReady: &errorpb.ReadIndexNotReady{}}), exceptedBackoffMs: 3000, exceptedErr: false}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ProposalInMergingMode: &errorpb.ProposalInMergingMode{}}), exceptedBackoffMs: 3000, exceptedErr: false}, - } - for _, cs := range cases { - t.Log(cs) - _, backoffMs, err := backup.OnBackupResponse(cs.storeID, cs.bo, cs.backupTS, cs.lockResolver, cs.resp, utils.NewErrorContext("test", 1)) - require.Equal(t, cs.exceptedBackoffMs, backoffMs) - if cs.exceptedErr { - require.Error(t, err) - } else { - require.NoError(t, err) - } - } -} - func TestSkipUnsupportedDDLJob(t *testing.T) { s := createBackupSuite(t) @@ -247,62 +206,99 @@ func TestCheckBackupIsLocked(t *testing.T) { require.Regexp(t, "backup lock file and sst file exist in(.+)", err.Error()) } -func TestFindTargetPeer(t *testing.T) { +func TestOnBackupResponse(t *testing.T) { s := createBackupSuite(t) ctx := context.Background() - testutils.BootstrapWithMultiRegions(s.mockCluster, []byte("g"), []byte("n"), []byte("t")) - - leader1, err := s.backupClient.FindTargetPeer(ctx, []byte("a"), false, nil) - require.NoError(t, err) - leader2, err := s.backupClient.FindTargetPeer(ctx, []byte("b"), false, nil) - require.NoError(t, err) + buildProgressRangeFn := func(startKey []byte, endKey []byte) *rtree.ProgressRange { + return &rtree.ProgressRange{ + Res: rtree.NewRangeTree(), + Origin: rtree.Range{ + StartKey: startKey, + EndKey: endKey, + }, + } + } - // check passed keys on same region - require.Equal(t, leader1.GetId(), leader2.GetId()) + errContext := utils.NewErrorContext("test", 1) + require.Nil(t, s.backupClient.OnBackupResponse(ctx, nil, errContext, nil)) + + tree := rtree.NewProgressRangeTree() + r := &backup.ResponseAndStore{ + StoreID: 0, + Resp: &backuppb.BackupResponse{ + Error: &backuppb.Error{ + Msg: "test", + }, + }, + } + // case #1: error resposne + // first error can be ignored due to errContext. + require.NoError(t, s.backupClient.OnBackupResponse(ctx, r, errContext, &tree)) + // second error cannot be ignored. + require.Error(t, s.backupClient.OnBackupResponse(ctx, r, errContext, &tree)) + + // case #2: normal resposne + r = &backup.ResponseAndStore{ + StoreID: 0, + Resp: &backuppb.BackupResponse{ + StartKey: []byte("a"), + EndKey: []byte("b"), + }, + } - failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/retry-state-on-find-target-peer", "return(2)") - failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/return-region-on-find-target-peer", "1*return(\"nil\")->1*return(\"hasLeader\")") + require.NoError(t, tree.Insert(buildProgressRangeFn([]byte("aa"), []byte("c")))) + // error due to the tree range does not match response range. + require.Error(t, s.backupClient.OnBackupResponse(ctx, r, errContext, &tree)) + + // case #3: partial range success case, find incomplete range + r.Resp.StartKey = []byte("aa") + require.NoError(t, s.backupClient.OnBackupResponse(ctx, r, errContext, &tree)) + + incomplete := tree.Iter().GetIncompleteRanges() + require.Len(t, incomplete, 1) + require.Equal(t, []byte("b"), incomplete[0].StartKey) + require.Equal(t, []byte("c"), incomplete[0].EndKey) + + // case #4: success case, make up incomplete range + r.Resp.StartKey = []byte("b") + r.Resp.EndKey = []byte("c") + require.NoError(t, s.backupClient.OnBackupResponse(ctx, r, errContext, &tree)) + incomplete = tree.Iter().GetIncompleteRanges() + require.Len(t, incomplete, 0) +} - leader, err := s.backupClient.FindTargetPeer(ctx, []byte("m"), false, nil) +func TestBuildProgressRangeTree(t *testing.T) { + s := createBackupSuite(t) + ranges := []rtree.Range{ + { + StartKey: []byte("aa"), + EndKey: []byte("b"), + }, + { + StartKey: []byte("c"), + EndKey: []byte("d"), + }, + } + tree, err := s.backupClient.BuildProgressRangeTree(ranges) require.NoError(t, err) - // check passed keys on find leader after retry - require.Equal(t, 42, int(leader.GetId())) - failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/retry-state-on-find-target-peer") - failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/return-region-on-find-target-peer") - - failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/retry-state-on-find-target-peer", "return(2)") - failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/return-region-on-find-target-peer", "return(\"noLeader\")") - - leader, err = s.backupClient.FindTargetPeer(ctx, []byte("m"), false, nil) - // check passed keys with error on find leader after retry - require.ErrorContains(t, err, "cannot find leader") + contained, err := tree.FindContained([]byte("a"), []byte("aa")) + require.Nil(t, contained) + require.Error(t, err) - failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/retry-state-on-find-target-peer") - failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/return-region-on-find-target-peer") + contained, err = tree.FindContained([]byte("b"), []byte("ba")) + require.Nil(t, contained) + require.Error(t, err) - failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/retry-state-on-find-target-peer", "return(2)") - failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/return-region-on-find-target-peer", "1*return(\"nil\")->1*return(\"hasPeer\")") + contained, err = tree.FindContained([]byte("e"), []byte("ea")) + require.Nil(t, contained) + require.Error(t, err) - storeIDMap := make(map[uint64]struct{}) - storeIDMap[42] = struct{}{} - leader, err = s.backupClient.FindTargetPeer(ctx, []byte("m"), false, storeIDMap) + contained, err = tree.FindContained([]byte("aa"), []byte("b")) + require.NotNil(t, contained) + require.Equal(t, []byte("aa"), contained.Origin.StartKey) + require.Equal(t, []byte("b"), contained.Origin.EndKey) require.NoError(t, err) - // check passed keys with target peer - require.Equal(t, 43, int(leader.GetId())) - - failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/retry-state-on-find-target-peer") - failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/return-region-on-find-target-peer") - - failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/retry-state-on-find-target-peer", "return(2)") - failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/return-region-on-find-target-peer", "1*return(\"nil\")->1*return(\"noPeer\")") - - leader, err = s.backupClient.FindTargetPeer(ctx, []byte("m"), false, storeIDMap) - // check passed keys with error and cannot find target peer - require.ErrorContains(t, err, "cannot find leader") - - failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/retry-state-on-find-target-peer") - failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/return-region-on-find-target-peer") } diff --git a/br/pkg/backup/push.go b/br/pkg/backup/push.go index ea14283a370e1..fc8147ae31f62 100644 --- a/br/pkg/backup/push.go +++ b/br/pkg/backup/push.go @@ -4,213 +4,183 @@ package backup import ( "context" - "sync" + "io" + "os" + "time" - "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/errorpb" - "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/tidb/br/pkg/checkpoint" - berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/rtree" "github.com/pingcap/tidb/br/pkg/utils" - "github.com/pingcap/tidb/pkg/util/redact" "go.uber.org/zap" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) -// pushDown wraps a backup task. -type pushDown struct { - mgr ClientMgr - respCh chan responseAndStore - errCh chan error +type ResponseAndStore struct { + Resp *backuppb.BackupResponse + StoreID uint64 } -type responseAndStore struct { - Resp *backuppb.BackupResponse - Store *metapb.Store -} - -func (r responseAndStore) GetResponse() *backuppb.BackupResponse { +func (r ResponseAndStore) GetResponse() *backuppb.BackupResponse { return r.Resp } -func (r responseAndStore) GetStore() *metapb.Store { - return r.Store +func (r ResponseAndStore) GetStoreID() uint64 { + return r.StoreID } -// newPushDown creates a push down backup. -func newPushDown(mgr ClientMgr, capacity int) *pushDown { - return &pushDown{ - mgr: mgr, - respCh: make(chan responseAndStore, capacity), - errCh: make(chan error, capacity), - } -} - -// FullBackup make a full backup of a tikv cluster. -func (push *pushDown) pushBackup( +func doSendBackup( ctx context.Context, + client backuppb.BackupClient, req backuppb.BackupRequest, - prTree rtree.ProgressRangeTree, - stores []*metapb.Store, - checkpointRunner *checkpoint.CheckpointRunner[checkpoint.BackupKeyType, checkpoint.BackupValueType], - progressCallBack func(), + respFn func(*backuppb.BackupResponse) error, ) error { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("pushDown.pushBackup", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } - - // Push down backup tasks to all tikv instances. - failpoint.Inject("noop-backup", func(_ failpoint.Value) { - logutil.CL(ctx).Warn("skipping normal backup, jump to fine-grained backup, meow :3", logutil.Key("start-key", req.StartKey), logutil.Key("end-key", req.EndKey)) - failpoint.Return(nil) + failpoint.Inject("hint-backup-start", func(v failpoint.Value) { + logutil.CL(ctx).Info("failpoint hint-backup-start injected, " + + "process will notify the shell.") + if sigFile, ok := v.(string); ok { + file, err := os.Create(sigFile) + if err != nil { + log.Warn("failed to create file for notifying, skipping notify", zap.Error(err)) + } + if file != nil { + file.Close() + } + } + time.Sleep(3 * time.Second) }) - - wg := new(sync.WaitGroup) - errContext := utils.NewErrorContext("pushBackup", 10) - for _, s := range stores { - store := s - storeID := s.GetId() - lctx := logutil.ContextWithField(ctx, zap.Uint64("store-id", storeID)) - if err := utils.CheckStoreLiveness(s); err != nil { - logutil.CL(lctx).Warn("skip store", logutil.ShortError(err)) - continue + bCli, err := client.Backup(ctx, &req) + failpoint.Inject("reset-retryable-error", func(val failpoint.Value) { + switch val.(string) { + case "Unavailable": + { + logutil.CL(ctx).Debug("failpoint reset-retryable-error unavailable injected.") + err = status.Error(codes.Unavailable, "Unavailable error") + } + case "Internal": + { + logutil.CL(ctx).Debug("failpoint reset-retryable-error internal injected.") + err = status.Error(codes.Internal, "Internal error") + } } - client, err := push.mgr.GetBackupClient(lctx, storeID) - if err != nil { - // BR should be able to backup even some of stores disconnected. - // The regions managed by this store can be retried at fine-grained backup then. - logutil.CL(lctx).Warn("fail to connect store, skipping", zap.Error(err)) - continue + }) + failpoint.Inject("reset-not-retryable-error", func(val failpoint.Value) { + if val.(bool) { + logutil.CL(ctx).Debug("failpoint reset-not-retryable-error injected.") + err = status.Error(codes.Unknown, "Your server was haunted hence doesn't work, meow :3") } - wg.Add(1) - go func() { - defer wg.Done() - err := SendBackup( - lctx, storeID, client, req, - func(resp *backuppb.BackupResponse) error { - // Forward all responses (including error). - push.respCh <- responseAndStore{ - Resp: resp, - Store: store, - } - return nil - }, - func() (backuppb.BackupClient, error) { - logutil.CL(lctx).Warn("reset the connection in push") - return push.mgr.ResetBackupClient(lctx, storeID) - }) - // Disconnected stores can be ignored. - if err != nil { - push.errCh <- err - return - } - }() + }) + if err != nil { + return err } - - go func() { - wg.Wait() - // TODO: test concurrent receive response and close channel. - close(push.respCh) + defer func() { + _ = bCli.CloseSend() }() for { - select { - case respAndStore, ok := <-push.respCh: - resp := respAndStore.GetResponse() - store := respAndStore.GetStore() - if !ok { - // Finished. + resp, err := bCli.Recv() + if err != nil { + if errors.Cause(err) == io.EOF { // nolint:errorlint + logutil.CL(ctx).Debug("backup streaming finish", + logutil.Key("backup-start-key", req.GetStartKey()), + logutil.Key("backup-end-key", req.GetEndKey())) return nil } - failpoint.Inject("backup-timeout-error", func(val failpoint.Value) { - msg := val.(string) - logutil.CL(ctx).Info("failpoint backup-timeout-error injected.", zap.String("msg", msg)) - resp.Error = &backuppb.Error{ - Msg: msg, - } - }) - failpoint.Inject("backup-storage-error", func(val failpoint.Value) { - msg := val.(string) - logutil.CL(ctx).Debug("failpoint backup-storage-error injected.", zap.String("msg", msg)) - resp.Error = &backuppb.Error{ - Msg: msg, - } - }) - failpoint.Inject("tikv-rw-error", func(val failpoint.Value) { - msg := val.(string) - logutil.CL(ctx).Debug("failpoint tikv-rw-error injected.", zap.String("msg", msg)) - resp.Error = &backuppb.Error{ - Msg: msg, - } - }) - failpoint.Inject("tikv-region-error", func(val failpoint.Value) { - msg := val.(string) - logutil.CL(ctx).Debug("failpoint tikv-region-error injected.", zap.String("msg", msg)) - resp.Error = &backuppb.Error{ - // Msg: msg, - Detail: &backuppb.Error_RegionError{ - RegionError: &errorpb.Error{ - Message: msg, + return err + } + // TODO: handle errors in the resp. + logutil.CL(ctx).Debug("range backed up", + logutil.Key("small-range-start-key", resp.GetStartKey()), + logutil.Key("small-range-end-key", resp.GetEndKey()), + zap.Int("api-version", int(resp.ApiVersion))) + err = respFn(resp) + if err != nil { + return errors.Trace(err) + } + } +} + +func startStoreBackup( + ctx context.Context, + storeID uint64, + backupReq backuppb.BackupRequest, + backupCli backuppb.BackupClient, + respCh chan *ResponseAndStore, +) error { + // this goroutine handle the response from a single store + select { + case <-ctx.Done(): + return ctx.Err() + default: + retry := -1 + return utils.WithRetry(ctx, func() error { + retry += 1 + logutil.CL(ctx).Info("try backup", zap.Uint64("storeID", storeID), zap.Int("retry time", retry)) + // Send backup request to the store. + // handle the backup response or internal error here. + // handle the store error(reboot or network partition) outside. + return doSendBackup(ctx, backupCli, backupReq, func(resp *backuppb.BackupResponse) error { + // Forward all responses (including error). + failpoint.Inject("backup-timeout-error", func(val failpoint.Value) { + msg := val.(string) + logutil.CL(ctx).Info("failpoint backup-timeout-error injected.", zap.String("msg", msg)) + resp.Error = &backuppb.Error{ + Msg: msg, + } + }) + failpoint.Inject("backup-storage-error", func(val failpoint.Value) { + msg := val.(string) + logutil.CL(ctx).Debug("failpoint backup-storage-error injected.", zap.String("msg", msg)) + resp.Error = &backuppb.Error{ + Msg: msg, + } + }) + failpoint.Inject("tikv-rw-error", func(val failpoint.Value) { + msg := val.(string) + logutil.CL(ctx).Debug("failpoint tikv-rw-error injected.", zap.String("msg", msg)) + resp.Error = &backuppb.Error{ + Msg: msg, + } + }) + failpoint.Inject("tikv-region-error", func(val failpoint.Value) { + msg := val.(string) + logutil.CL(ctx).Debug("failpoint tikv-region-error injected.", zap.String("msg", msg)) + resp.Error = &backuppb.Error{ + // Msg: msg, + Detail: &backuppb.Error_RegionError{ + RegionError: &errorpb.Error{ + Message: msg, + }, }, - }, - } - }) - if resp.GetError() == nil { - pr, err := prTree.FindContained(resp.StartKey, resp.EndKey) - if err != nil { - return errors.Annotate(err, "failed to update the backup response") - } - // None error means range has been backuped successfully. - if checkpointRunner != nil { - if err := checkpoint.AppendForBackup( - ctx, - checkpointRunner, - pr.GroupKey, - resp.StartKey, - resp.EndKey, - resp.Files, - ); err != nil { - // the error is only from flush operator - return errors.Annotate(err, "failed to flush checkpoint") } + }) + select { + case <-ctx.Done(): + return ctx.Err() + case respCh <- &ResponseAndStore{ + Resp: resp, + StoreID: storeID, + }: } - pr.Res.Put( - resp.GetStartKey(), resp.GetEndKey(), resp.GetFiles()) + return nil + }) + }, utils.NewBackupSSTBackoffer()) + } +} - // Update progress - progressCallBack() - } else { - errPb := resp.GetError() - res := errContext.HandleIgnorableError(errPb, store.GetId()) - switch res.Strategy { - case utils.GiveUpStrategy: - errMsg := res.Reason - if len(errMsg) <= 0 { - errMsg = errPb.Msg - } - return errors.Annotatef(berrors.ErrKVStorage, "error happen in store %v at %s: %s", - store.GetId(), - redact.Value(store.GetAddress()), - errMsg, - ) - default: - // other type just continue for next response - // and finally handle the range in fineGrainedBackup - continue - } - } - case err := <-push.errCh: - if !berrors.Is(err, berrors.ErrFailedToConnect) { - return errors.Annotatef(err, "failed to backup range [%s, %s)", redact.Key(req.StartKey), redact.Key(req.EndKey)) - } - logutil.CL(ctx).Warn("skipping disconnected stores", logutil.ShortError(err)) - return nil - } +func getBackupRanges(ranges []rtree.Range) []*kvrpcpb.KeyRange { + requestRanges := make([]*kvrpcpb.KeyRange, 0, len(ranges)) + for _, r := range ranges { + requestRanges = append(requestRanges, &kvrpcpb.KeyRange{ + StartKey: r.StartKey, + EndKey: r.EndKey, + }) } + return requestRanges } diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go index 87accda625a22..cdb81a011c8a5 100644 --- a/br/pkg/conn/conn.go +++ b/br/pkg/conn/conn.go @@ -91,17 +91,23 @@ func GetAllTiKVStoresWithRetry(ctx context.Context, logutil.CL(ctx).Debug("failpoint hint-GetAllTiKVStores-error injected.") if val.(bool) { err = status.Error(codes.Unknown, "Retryable error") - } else { - err = context.Canceled + failpoint.Return(err) } }) - failpoint.Inject("hint-GetAllTiKVStores-cancel", func(val failpoint.Value) { - logutil.CL(ctx).Debug("failpoint hint-GetAllTiKVStores-cancel injected.") + failpoint.Inject("hint-GetAllTiKVStores-grpc-cancel", func(val failpoint.Value) { + logutil.CL(ctx).Debug("failpoint hint-GetAllTiKVStores-grpc-cancel injected.") if val.(bool) { err = status.Error(codes.Canceled, "Cancel Retry") - } else { + failpoint.Return(err) + } + }) + + failpoint.Inject("hint-GetAllTiKVStores-ctx-cancel", func(val failpoint.Value) { + logutil.CL(ctx).Debug("failpoint hint-GetAllTiKVStores-ctx-cancel injected.") + if val.(bool) { err = context.Canceled + failpoint.Return(err) } }) diff --git a/br/pkg/conn/conn_test.go b/br/pkg/conn/conn_test.go index 00fe21d60f1e0..3d594a3c1216a 100644 --- a/br/pkg/conn/conn_test.go +++ b/br/pkg/conn/conn_test.go @@ -25,10 +25,14 @@ import ( ) func TestGetAllTiKVStoresWithRetryCancel(t *testing.T) { - err := failpoint.Enable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-cancel", "1*return(true)->1*return(false)") + err := failpoint.Enable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-grpc-cancel", "1*return(true)") + require.NoError(t, err) + err = failpoint.Enable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-ctx-cancel", "1*return(true)") require.NoError(t, err) defer func() { - err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-cancel") + err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-grpc-cancel") + require.NoError(t, err) + err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-ctx-cancel") require.NoError(t, err) }() ctx, cancel := context.WithCancel(context.Background()) @@ -64,16 +68,20 @@ func TestGetAllTiKVStoresWithRetryCancel(t *testing.T) { _, err = GetAllTiKVStoresWithRetry(ctx, fpdc, util.SkipTiFlash) require.Error(t, err) errs := multierr.Errors(err) - require.Equal(t, 2, len(errs)) + require.Equal(t, 1, len(errs)) require.Equal(t, codes.Canceled, status.Code(errors.Cause(errs[0]))) } func TestGetAllTiKVStoresWithUnknown(t *testing.T) { - err := failpoint.Enable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-error", "1*return(true)->1*return(false)") + err := failpoint.Enable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-error", "1*return(true)") + require.NoError(t, err) + err = failpoint.Enable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-ctx-cancel", "1*return(true)") require.NoError(t, err) defer func() { err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-error") require.NoError(t, err) + err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-ctx-cancel") + require.NoError(t, err) }() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -108,7 +116,7 @@ func TestGetAllTiKVStoresWithUnknown(t *testing.T) { _, err = GetAllTiKVStoresWithRetry(ctx, fpdc, util.SkipTiFlash) require.Error(t, err) errs := multierr.Errors(err) - require.Equal(t, 2, len(errs)) + require.Equal(t, 1, len(errs)) require.Equal(t, codes.Unknown, status.Code(errors.Cause(errs[0]))) } diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 770fe28eba6c1..45548e22009a0 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -2382,7 +2382,7 @@ func (rc *Client) PreCheckTableTiFlashReplica( // set TiFlashReplica to unavailable in tableInfo, to avoid TiDB cannot sense TiFlash and make plan to TiFlash // see details at https://github.com/pingcap/br/issues/931 // TODO maybe set table.Info.TiFlashReplica.Count to tiFlashStoreCount, but we need more tests about it. - log.Warn("table does not satisfy tiflash replica requirements, set tiflash replcia to unavaiable", + log.Warn("table does not satisfy tiflash replica requirements, set tiflash replcia to unavailable", zap.Stringer("db", table.DB.Name), zap.Stringer("table", table.Info.Name), zap.Uint64("expect tiflash replica", table.Info.TiFlashReplica.Count), diff --git a/br/pkg/task/backup.go b/br/pkg/task/backup.go index 3376b67fb6b90..a04f14f8b519a 100644 --- a/br/pkg/task/backup.go +++ b/br/pkg/task/backup.go @@ -619,7 +619,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig summary.CollectInt("backup total ranges", len(ranges)) - approximateRegions, regionCounts, err := getRegionCountOfRanges(ctx, mgr, ranges) + approximateRegions, err := getRegionCountOfRanges(ctx, mgr, ranges) if err != nil { return errors.Trace(err) } @@ -684,7 +684,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig }) metawriter.StartWriteMetasAsync(ctx, metautil.AppendDataFile) - err = client.BackupRanges(ctx, ranges, regionCounts, req, uint(cfg.Concurrency), cfg.ReplicaReadLabel, metawriter, progressCallBack) + err = client.BackupRanges(ctx, ranges, req, uint(cfg.Concurrency), cfg.ReplicaReadLabel, metawriter, progressCallBack) if err != nil { return errors.Trace(err) } @@ -748,19 +748,17 @@ func getRegionCountOfRanges( ctx context.Context, mgr *conn.Mgr, ranges []rtree.Range, -) (int, []int, error) { +) (int, error) { // The number of regions need to backup approximateRegions := 0 - // The number array of regions of ranges, thecounts[i] is the number of regions of the range[i]. - counts := make([]int, 0, len(ranges)) for _, r := range ranges { regionCount, err := mgr.GetRegionCount(ctx, r.StartKey, r.EndKey) if err != nil { - return 0, nil, errors.Trace(err) + return 0, errors.Trace(err) } - counts = append(counts, regionCount) + approximateRegions += regionCount } - return approximateRegions, counts, nil + return approximateRegions, nil } // ParseTSString port from tidb setSnapshotTS. diff --git a/br/pkg/task/backup_raw.go b/br/pkg/task/backup_raw.go index 34d44073b5153..1bb273a55581e 100644 --- a/br/pkg/task/backup_raw.go +++ b/br/pkg/task/backup_raw.go @@ -216,7 +216,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf } metaWriter := metautil.NewMetaWriter(client.GetStorage(), metautil.MetaFileSize, false, metautil.MetaFile, &cfg.CipherInfo) metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDataFile) - err = client.BackupRanges(ctx, []rtree.Range{rg}, []int{approximateRegions}, req, 1, nil, metaWriter, progressCallBack) + err = client.BackupRanges(ctx, []rtree.Range{rg}, req, 1, nil, metaWriter, progressCallBack) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/task/backup_txn.go b/br/pkg/task/backup_txn.go index 796330a0792a7..3ec3321b0ff68 100644 --- a/br/pkg/task/backup_txn.go +++ b/br/pkg/task/backup_txn.go @@ -169,7 +169,6 @@ func RunBackupTxn(c context.Context, g glue.Glue, cmdName string, cfg *TxnKvConf if err != nil { return errors.Trace(err) } - regionCounts := []int{approximateRegions} summary.CollectInt("backup total regions", approximateRegions) @@ -202,7 +201,7 @@ func RunBackupTxn(c context.Context, g glue.Glue, cmdName string, cfg *TxnKvConf metaWriter := metautil.NewMetaWriter(client.GetStorage(), metautil.MetaFileSize, false, metautil.MetaFile, &cfg.CipherInfo) metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDataFile) - err = client.BackupRanges(ctx, backupRanges, regionCounts, req, 1, nil, metaWriter, progressCallBack) + err = client.BackupRanges(ctx, backupRanges, req, 1, nil, metaWriter, progressCallBack) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/utils/misc.go b/br/pkg/utils/misc.go index c351f62011a76..80882489905db 100644 --- a/br/pkg/utils/misc.go +++ b/br/pkg/utils/misc.go @@ -46,8 +46,7 @@ const ( // Also note that the offline threshold in PD is 20s, see // https://github.com/tikv/pd/blob/c40e319f50822678cda71ae62ee2fd70a9cac010/pkg/core/store.go#L523 - // After talk to PD members 100s is not a safe number. set it to 600s - storeDisconnectionDuration = 600 * time.Second + storeDisconnectionDuration = 100 * time.Second ) // IsTypeCompatible checks whether type target is compatible with type src diff --git a/br/tests/br_rawkv/run.sh b/br/tests/br_rawkv/run.sh index ed7dc7804f88c..c13d057d1e0dd 100755 --- a/br/tests/br_rawkv/run.sh +++ b/br/tests/br_rawkv/run.sh @@ -186,5 +186,3 @@ run_test "" # ingest "region error" to trigger fineGrainedBackup, only one region error. run_test "github.com/pingcap/tidb/br/pkg/backup/tikv-region-error=1*return(\"region error\")" -# all regions failed. -run_test "github.com/pingcap/tidb/br/pkg/backup/tikv-region-error=return(\"region error\")" diff --git a/br/tests/br_tikv_outage/run.sh b/br/tests/br_tikv_outage/run.sh index fc9eef2c5be08..877b44884f897 100644 --- a/br/tests/br_tikv_outage/run.sh +++ b/br/tests/br_tikv_outage/run.sh @@ -8,16 +8,14 @@ set -eux load -hint_finegrained=$TEST_DIR/hint_finegrained hint_backup_start=$TEST_DIR/hint_backup_start hint_get_backup_client=$TEST_DIR/hint_get_backup_client cases=${cases:-'shutdown'} for failure in $cases; do - rm -f "$hint_finegrained" "$hint_backup_start" "$hint_get_backup_client" + rm -f "$hint_backup_start" "$hint_get_backup_client" export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/hint-backup-start=1*return(\"$hint_backup_start\");\ -github.com/pingcap/tidb/br/pkg/backup/hint-fine-grained-backup=1*return(\"$hint_finegrained\");\ github.com/pingcap/tidb/br/pkg/utils/hint-get-backup-client=1*return(\"$hint_get_backup_client\")" backup_dir=${TEST_DIR:?}/"backup{test:${TEST_NAME}|with:${failure}}" diff --git a/br/tests/br_tikv_outage2/run.sh b/br/tests/br_tikv_outage2/run.sh index 521b7942203ec..913b9d65e81c5 100644 --- a/br/tests/br_tikv_outage2/run.sh +++ b/br/tests/br_tikv_outage2/run.sh @@ -8,20 +8,15 @@ set -eux load -hint_finegrained=$TEST_DIR/hint_finegrained hint_backup_start=$TEST_DIR/hint_backup_start hint_get_backup_client=$TEST_DIR/hint_get_backup_client -cases=${cases:-'outage-at-finegrained outage outage-after-request'} +cases=${cases:-'outage outage-after-request'} for failure in $cases; do - rm -f "$hint_finegrained" "$hint_backup_start" "$hint_get_backup_client" + rm -f "$hint_backup_start" "$hint_get_backup_client" export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/hint-backup-start=1*return(\"$hint_backup_start\");\ -github.com/pingcap/tidb/br/pkg/backup/hint-fine-grained-backup=1*return(\"$hint_finegrained\");\ github.com/pingcap/tidb/br/pkg/utils/hint-get-backup-client=1*return(\"$hint_get_backup_client\")" - if [ "$failure" = outage-at-finegrained ]; then - export GO_FAILPOINTS="$GO_FAILPOINTS;github.com/pingcap/tidb/br/pkg/backup/noop-backup=return(true)" - fi backup_dir=${TEST_DIR:?}/"backup{test:${TEST_NAME}|with:${failure}}" rm -rf "${backup_dir:?}" @@ -29,12 +24,6 @@ github.com/pingcap/tidb/br/pkg/utils/hint-get-backup-client=1*return(\"$hint_get backup_pid=$! single_point_fault $failure wait $backup_pid - case $failure in - scale-out | shutdown | outage-at-finegrained ) stop_services - start_services ;; - *) ;; - esac - check done diff --git a/br/tests/br_tikv_outage3/run.sh b/br/tests/br_tikv_outage3/run.sh index c630029885e25..6eeb44a04b991 100644 --- a/br/tests/br_tikv_outage3/run.sh +++ b/br/tests/br_tikv_outage3/run.sh @@ -8,16 +8,14 @@ set -eux load -hint_finegrained=$TEST_DIR/hint_finegrained hint_backup_start=$TEST_DIR/hint_backup_start hint_get_backup_client=$TEST_DIR/hint_get_backup_client cases=${cases:-'scale-out'} for failure in $cases; do - rm -f "$hint_finegrained" "$hint_backup_start" "$hint_get_backup_client" + rm -f "$hint_backup_start" "$hint_get_backup_client" export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/hint-backup-start=1*return(\"$hint_backup_start\");\ -github.com/pingcap/tidb/br/pkg/backup/hint-fine-grained-backup=1*return(\"$hint_finegrained\");\ github.com/pingcap/tidb/br/pkg/utils/hint-get-backup-client=1*return(\"$hint_get_backup_client\")" backup_dir=${TEST_DIR:?}/"backup{test:${TEST_NAME}|with:${failure}}" @@ -32,6 +30,5 @@ github.com/pingcap/tidb/br/pkg/utils/hint-get-backup-client=1*return(\"$hint_get stop_services start_services - check done diff --git a/tests/_utils/br_tikv_outage_util b/tests/_utils/br_tikv_outage_util index 192a2ebb195b4..db3a7cc171936 100644 --- a/tests/_utils/br_tikv_outage_util +++ b/tests/_utils/br_tikv_outage_util @@ -37,9 +37,6 @@ single_point_fault() { outage-after-request) wait_file_exist "$hint_get_backup_client" kv_outage -d 30 -i $victim;; - outage-at-finegrained) - wait_file_exist "$hint_finegrained" - kv_outage --kill -i $victim;; shutdown) wait_file_exist "$hint_backup_start" kv_outage --kill -i $victim;;