From df971d27b34ddbfc25aa9cf04abe7f85462ade35 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 7 May 2024 16:33:08 +0800 Subject: [PATCH 01/22] fix commit --- br/pkg/backup/client.go | 201 ++++++++++++++++++++++++++++++++++++---- br/pkg/backup/push.go | 32 +++++++ 2 files changed, 213 insertions(+), 20 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 7bf187ffce6fb..290e4613ede30 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -50,7 +50,6 @@ import ( "github.com/tikv/client-go/v2/txnkv/txnlock" pd "github.com/tikv/pd/client" "go.uber.org/zap" - "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -86,6 +85,11 @@ const ( RegionUnit ProgressUnit = "region" ) +type StoreBackupPolicy struct { + One uint64 + All bool +} + // Client is a client instructs TiKV how to do a backup. type Client struct { mgr ClientMgr @@ -810,32 +814,189 @@ func (bc *Client) BackupRanges( ctx = opentracing.ContextWithSpan(ctx, span1) } - // we collect all files in a single goroutine to avoid thread safety issues. - workerPool := util.NewWorkerPool(concurrency, "Ranges") - eg, ectx := errgroup.WithContext(ctx) - for id, r := range ranges { - id := id - req := request - req.StartKey, req.EndKey = r.StartKey, r.EndKey - pr, err := bc.GetProgressRange(r) - if err != nil { - return errors.Trace(err) + stateChan := make(chan StoreBackupPolicy) + // go func() { + // // TODO watch changes on cluste state + // cb := storewatch.MakeCallback(storewatch.WithOnReboot(func(s *metapb.Store) { + // stateChan <- StoreBackups{All: true} + // }), storewatch.WithOnDisconnect(func(s *metapb.Store) { + // stateChan <- StoreBackups{All: true} + // }), storewatch.WithOnNewStoreRegistered(func(s *metapb.Store) { + // // only backup for this store + // stateChan <- StoreBackups{One: s.Id} + // })) + // watcher := storewatch.New(bc.mgr.GetPDClient(), cb) + // tick := time.NewTicker(30 * time.Second) + // for { + // select { + // case <-ctx.Done(): + // return + // case <-tick.C: + // err := watcher.Step(ctx) + // if err != nil { + // // ignore it + // } + // } + // } + // }() + + // pr, err := bc.GetProgressRanges(r) + // if err != nil { + // return errors.Trace(err) + // } + bc.executePushdownBackup(ctx, ranges, replicaReadLabel, request, stateChan) + + // // we collect all files in a single goroutine to avoid thread safety issues. 
+ // workerPool := util.NewWorkerPool(concurrency, "Ranges") + // eg, ectx := errgroup.WithContext(ctx) + // for id, r := range ranges { + // id := id + // req := request + // req.StartKey, req.EndKey = r.StartKey, r.EndKey + // pr, err := bc.GetProgressRange(r) + // if err != nil { + // return errors.Trace(err) + // } + // workerPool.ApplyOnErrorGroup(eg, func() error { + // elctx := logutil.ContextWithField(ectx, logutil.RedactAny("range-sn", id)) + // err := bc.BackupRange(elctx, req, replicaReadLabel, pr, metaWriter, progressCallBack) + // if err != nil { + // // The error due to context cancel, stack trace is meaningless, the stack shall be suspended (also clear) + // if errors.Cause(err) == context.Canceled { + // return errors.SuspendStack(err) + // } + // return errors.Trace(err) + // } + // return nil + // }) + // } + + // return eg.Wait() +} + +func (bc *Client) getBackupStores(ctx context.Context, replicaReadLabel map[string]string) ([]*metapb.Store, error) { + allStores, err := conn.GetAllTiKVStoresWithRetry(ctx, bc.mgr.GetPDClient(), connutil.SkipTiFlash) + if err != nil { + return nil, errors.Trace(err) + } + var targetStores []*metapb.Store + targetStoreIds := make(map[uint64]struct{}) + if len(replicaReadLabel) == 0 { + targetStores = allStores // send backup push down request to all stores + } else { + for _, store := range allStores { + for _, label := range store.Labels { + if val, ok := replicaReadLabel[label.Key]; ok && val == label.Value { + targetStores = append(targetStores, store) // send backup push down request to stores that match replica read label + targetStoreIds[store.GetId()] = struct{}{} // record store id for fine grained backup + } + } } - workerPool.ApplyOnErrorGroup(eg, func() error { - elctx := logutil.ContextWithField(ectx, logutil.RedactAny("range-sn", id)) - err := bc.BackupRange(elctx, req, replicaReadLabel, pr, metaWriter, progressCallBack) + } + if len(replicaReadLabel) > 0 && len(targetStores) == 0 { + return nil, errors.Errorf("no store matches replica read label: %v", replicaReadLabel) + } + return targetStores, nil + +} + +// infinite loop to backup ranges on all tikv stores +// if one client grpc disconnected. retry send backup request to this store. +// if new tikv store joined. send backup request to this store. +// if one tikv store rebooted. consider leader changes, retry send backup request to all stores. +// if one tikv store disconnected. consider leader changes, retry send backup request to all stores. 
+func (bc *Client) executePushdownBackup(ctx context.Context, ranges []rtree.Range, replicaReadLabel map[string]string, request backuppb.BackupRequest, stateChan chan StoreBackupPolicy) { + startStoreBackupFn := func( + ctx context.Context, + round uint64, + storeID uint64, + cli backuppb.BackupClient, + BackupResponseCh chan *backuppb.BackupResponse, + ) { + // req.Ranges := GetLeftRangesFromGlobalBtree() + go func() { + err := startStoreBackup(ctx, storeID, request, cli, BackupResponseCh) if err != nil { - // The error due to context cancel, stack trace is meaningless, the stack shall be suspended (also clear) if errors.Cause(err) == context.Canceled { - return errors.SuspendStack(err) + // backup cancelled, just log it + logutil.CL(ctx).Info("store backup cancelled", + zap.Uint64("round", round), + zap.Uint64("storeID", storeID), zap.Error(err)) + } else { + // retry this store + logutil.CL(ctx).Error("store backup failed", + zap.Uint64("round", round), + zap.Uint64("storeID", storeID), zap.Error(err)) + stateChan <- StoreBackupPolicy{One: storeID} } - return errors.Trace(err) } - return nil - }) + }() } - return eg.Wait() + round := uint64(0) +mainLoop: + for { + round += 1 + childCtx, cancel := context.WithCancel(ctx) + BackupResponseCh := make(chan *backuppb.BackupResponse) + // Compute the left ranges + // TODO compute left ranges + // request.SubRanges = GetLeftRangesFromGlobalBtree() + allStores, err := bc.getBackupStores(childCtx, replicaReadLabel) + if err != nil { + logutil.CL(ctx).Error("failed to get backup stores", zap.Uint64("round", round), zap.Error(err)) + time.Sleep(3 * time.Second) + // infinite loop to retry + continue mainLoop + } + for _, store := range allStores { + storeID := store.GetId() + // reset backup client every round, for a clean grpc connection next time. + cli, err := bc.mgr.ResetBackupClient(ctx, storeID) + if err != nil { + logutil.CL(ctx).Error("failed to get backup client", zap.Uint64("round", round), zap.Uint64("storeID", storeID), zap.Error(err)) + time.Sleep(3 * time.Second) + continue mainLoop + } + startStoreBackupFn(childCtx, round, storeID, cli, BackupResponseCh) + } + + select { + case <-ctx.Done(): + return + case storeBackupInfo := <-stateChan: + if storeBackupInfo.All { + logutil.CL(ctx).Info("cluster state changed. restart store backups") + // stop current connections + cancel() + time.Sleep(time.Second) + // start next round backups + continue mainLoop + } + if storeBackupInfo.One != 0 { + storeID := storeBackupInfo.One + cli, err := bc.mgr.GetBackupClient(ctx, storeID) + if err != nil { + logutil.CL(ctx).Error("failed to get backup client", zap.Uint64("storeID", storeID), zap.Error(err)) + time.Sleep(3 * time.Second) + continue mainLoop + } + startStoreBackupFn(childCtx, round, storeID, cli, BackupResponseCh) + } + case resp, ok := <-BackupResponseCh: + if !ok { + // this round backup finished. break and check ranges outside + break + } + // updateGlobalBtree(resp) + } + // leftRanges := GetLeftRangesFromGlobalBtree() + leftRanges := []rtree.Range + if len(leftRanges) == 0 { + // all range backuped + break + } + } } // BackupRange make a backup of the given key range. 
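Note on the loop above: executePushdownBackup drives every store's backup stream from one coordinator loop. Each round launches one goroutine per store under a cancellable child context, and an event on stateChan either re-dials a single store or cancels the whole round so it can restart after the remaining ranges are recomputed. The following is a minimal, standalone sketch of that cancel-and-restart shape, not the BR implementation; the policy type, result channel, and timings are illustrative stand-ins.

package main

import (
	"context"
	"fmt"
	"time"
)

// policy mirrors the role of StoreBackupPolicy: restart one store, or restart the round.
type policy struct {
	one uint64
	all bool
}

func runRounds(ctx context.Context, stores []uint64, events <-chan policy) {
rounds:
	for round := 1; ; round++ {
		childCtx, cancel := context.WithCancel(ctx)
		results := make(chan uint64)
		start := func(id uint64) {
			go func() {
				// Stand-in for the real per-store backup stream.
				select {
				case <-childCtx.Done():
				case <-time.After(100 * time.Millisecond):
					select {
					case results <- id:
					case <-childCtx.Done():
					}
				}
			}()
		}
		for _, id := range stores {
			start(id)
		}
		for done := 0; done < len(stores); {
			select {
			case <-ctx.Done():
				cancel()
				return
			case ev := <-events:
				if ev.all {
					// Cluster state changed: drop this round and start over.
					cancel()
					continue rounds
				}
				// Re-dial a single store within the same round.
				start(ev.one)
			case id := <-results:
				fmt.Printf("round %d: store %d finished\n", round, id)
				done++
			}
		}
		cancel()
		return // every store finished this round
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	runRounds(ctx, []uint64{1, 2, 3}, make(chan policy))
}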
diff --git a/br/pkg/backup/push.go b/br/pkg/backup/push.go index 47fcfc13e49ac..7c06570049441 100644 --- a/br/pkg/backup/push.go +++ b/br/pkg/backup/push.go @@ -210,3 +210,35 @@ func (push *pushDown) pushBackup( } } } + +func startStoreBackup( + ctx context.Context, + storeID uint64, + backupReq backuppb.BackupRequest, + backupCli backuppb.BackupClient, + respCh chan *backuppb.BackupResponse, +) error { + // this goroutine handle the response from a single store + select { + case <-ctx.Done(): + return ctx.Err() + default: + retry := -1 + return utils.WithRetry(ctx, func() error { + retry += 1 + logutil.CL(ctx).Info("try backup", zap.Int("retry time", retry)) + // Send backup request to the store. + // handle the backup response or internal error here. + // handle the store error(reboot or network partition) outside. + return doSendBackup(ctx, backupCli, backupReq, func(resp *backuppb.BackupResponse) error { + // Forward all responses (including error). + select { + case <-ctx.Done(): + return ctx.Err() + case respCh <- resp: + } + return nil + }) + }, utils.NewBackupSSTBackoffer()) + } +} From 70b2655570c13da91717757195ef8fd5d6ee0661 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 7 May 2024 18:15:50 +0800 Subject: [PATCH 02/22] update --- br/pkg/backup/client.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index b294443377bf3..bb0b8abd21962 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -983,7 +983,7 @@ mainLoop: inCompleteRanges := iter.GetIncompleteRanges() if len(inCompleteRanges) == 0 { // all range backuped - break + return nil } round += 1 @@ -1081,7 +1081,6 @@ mainLoop: } } } - return nil } func collectRangeFiles(progressRangeTree *rtree.ProgressRangeTree, metaWriter *metautil.MetaWriter) error { From 70bbd10c37aea3ce5403378486a5101224319436 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 7 May 2024 18:56:33 +0800 Subject: [PATCH 03/22] remove useless code --- br/pkg/backup/BUILD.bazel | 3 - br/pkg/backup/client.go | 7 ++- br/pkg/backup/client_test.go | 103 ----------------------------------- 3 files changed, 5 insertions(+), 108 deletions(-) diff --git a/br/pkg/backup/BUILD.bazel b/br/pkg/backup/BUILD.bazel index f80a1cef6a4ef..f05ecf49d5529 100644 --- a/br/pkg/backup/BUILD.bazel +++ b/br/pkg/backup/BUILD.bazel @@ -33,8 +33,6 @@ go_library( "//pkg/statistics/handle", "//pkg/statistics/handle/util", "//pkg/util", - "//pkg/util/codec", - "//pkg/util/redact", "//pkg/util/table-filter", "@com_github_google_btree//:btree", "@com_github_opentracing_opentracing_go//:opentracing-go", @@ -47,7 +45,6 @@ go_library( "@com_github_pingcap_log//:log", "@com_github_prometheus_client_golang//prometheus", "@com_github_tikv_client_go_v2//oracle", - "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_client_go_v2//txnkv/txnlock", "@com_github_tikv_client_go_v2//util", "@com_github_tikv_pd_client//:client", diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index bb0b8abd21962..2ffe7b31446d7 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -931,7 +931,6 @@ func (bc *Client) getBackupStores(ctx context.Context, replicaReadLabel map[stri return nil, errors.Errorf("no store matches replica read label: %v", replicaReadLabel) } return targetStores, nil - } // infinite loop to backup ranges on all tikv stores @@ -978,16 +977,17 @@ func (bc *Client) executePushdownBackup( mainLoop: for { + childCtx, cancel := context.WithCancel(ctx) // Compute the left ranges iter := 
globalProgressTree.Iter() inCompleteRanges := iter.GetIncompleteRanges() if len(inCompleteRanges) == 0 { // all range backuped + cancel() return nil } round += 1 - childCtx, cancel := context.WithCancel(ctx) BackupResponseCh := make(chan *responseAndStore) request.SubRanges = getBackupRanges(inCompleteRanges) @@ -1013,6 +1013,7 @@ mainLoop: select { case <-ctx.Done(): + cancel() return ctx.Err() case storeBackupInfo := <-stateChan: if storeBackupInfo.All { @@ -1045,6 +1046,7 @@ mainLoop: if err != nil { logutil.CL(ctx).Error("failed to update the backup response", zap.Reflect("error", err)) + cancel() return err } if bc.checkpointRunner != nil { @@ -1073,6 +1075,7 @@ mainLoop: errMsg = errPb.Msg } // TODO output a precise store address. @3pointer + cancel() return errors.Annotatef(berrors.ErrKVStorage, "error happen in store %v: %s", storeID, errMsg, diff --git a/br/pkg/backup/client_test.go b/br/pkg/backup/client_test.go index eeaea5cec762f..cab5de0df295f 100644 --- a/br/pkg/backup/client_test.go +++ b/br/pkg/backup/client_test.go @@ -9,10 +9,8 @@ import ( "time" "github.com/golang/protobuf/proto" - "github.com/pingcap/failpoint" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/encryptionpb" - "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/tidb/br/pkg/backup" "github.com/pingcap/tidb/br/pkg/conn" "github.com/pingcap/tidb/br/pkg/gluetidb" @@ -20,14 +18,11 @@ import ( "github.com/pingcap/tidb/br/pkg/mock" "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/br/pkg/storage" - "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/testkit" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/testutils" - "github.com/tikv/client-go/v2/tikv" - "github.com/tikv/client-go/v2/txnkv/txnlock" pd "github.com/tikv/pd/client" "go.opencensus.io/stats/view" ) @@ -128,44 +123,6 @@ func TestGetTS(t *testing.T) { require.Equal(t, backupts, ts) } -func TestOnBackupRegionErrorResponse(t *testing.T) { - type Case struct { - storeID uint64 - bo *tikv.Backoffer - backupTS uint64 - lockResolver *txnlock.LockResolver - resp *backuppb.BackupResponse - exceptedBackoffMs int - exceptedErr bool - } - newBackupRegionErrorResp := func(regionError *errorpb.Error) *backuppb.BackupResponse { - return &backuppb.BackupResponse{Error: &backuppb.Error{Detail: &backuppb.Error_RegionError{RegionError: regionError}}} - } - - cases := []Case{ - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{NotLeader: &errorpb.NotLeader{}}), exceptedBackoffMs: 3000, exceptedErr: false}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{RegionNotFound: &errorpb.RegionNotFound{}}), exceptedBackoffMs: 3000, exceptedErr: false}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{KeyNotInRegion: &errorpb.KeyNotInRegion{}}), exceptedBackoffMs: 0, exceptedErr: true}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}), exceptedBackoffMs: 3000, exceptedErr: false}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}}), exceptedBackoffMs: 3000, exceptedErr: false}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StaleCommand: 
&errorpb.StaleCommand{}}), exceptedBackoffMs: 3000, exceptedErr: false}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StoreNotMatch: &errorpb.StoreNotMatch{}}), exceptedBackoffMs: 3000, exceptedErr: false}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{RaftEntryTooLarge: &errorpb.RaftEntryTooLarge{}}), exceptedBackoffMs: 0, exceptedErr: true}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ReadIndexNotReady: &errorpb.ReadIndexNotReady{}}), exceptedBackoffMs: 3000, exceptedErr: false}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ProposalInMergingMode: &errorpb.ProposalInMergingMode{}}), exceptedBackoffMs: 3000, exceptedErr: false}, - } - for _, cs := range cases { - t.Log(cs) - _, backoffMs, err := backup.OnBackupResponse(cs.storeID, cs.bo, cs.backupTS, cs.lockResolver, cs.resp, utils.NewErrorContext("test", 1)) - require.Equal(t, cs.exceptedBackoffMs, backoffMs) - if cs.exceptedErr { - require.Error(t, err) - } else { - require.NoError(t, err) - } - } -} - func TestSkipUnsupportedDDLJob(t *testing.T) { s := createBackupSuite(t) @@ -246,63 +203,3 @@ func TestCheckBackupIsLocked(t *testing.T) { require.Error(t, err) require.Regexp(t, "backup lock file and sst file exist in(.+)", err.Error()) } - -func TestFindTargetPeer(t *testing.T) { - s := createBackupSuite(t) - - ctx := context.Background() - testutils.BootstrapWithMultiRegions(s.mockCluster, []byte("g"), []byte("n"), []byte("t")) - - leader1, err := s.backupClient.FindTargetPeer(ctx, []byte("a"), false, nil) - require.NoError(t, err) - - leader2, err := s.backupClient.FindTargetPeer(ctx, []byte("b"), false, nil) - require.NoError(t, err) - - // check passed keys on same region - require.Equal(t, leader1.GetId(), leader2.GetId()) - - failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/retry-state-on-find-target-peer", "return(2)") - failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/return-region-on-find-target-peer", "1*return(\"nil\")->1*return(\"hasLeader\")") - - leader, err := s.backupClient.FindTargetPeer(ctx, []byte("m"), false, nil) - require.NoError(t, err) - // check passed keys on find leader after retry - require.Equal(t, 42, int(leader.GetId())) - - failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/retry-state-on-find-target-peer") - failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/return-region-on-find-target-peer") - - failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/retry-state-on-find-target-peer", "return(2)") - failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/return-region-on-find-target-peer", "return(\"noLeader\")") - - leader, err = s.backupClient.FindTargetPeer(ctx, []byte("m"), false, nil) - // check passed keys with error on find leader after retry - require.ErrorContains(t, err, "cannot find leader") - - failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/retry-state-on-find-target-peer") - failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/return-region-on-find-target-peer") - - failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/retry-state-on-find-target-peer", "return(2)") - failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/return-region-on-find-target-peer", "1*return(\"nil\")->1*return(\"hasPeer\")") - - storeIDMap := make(map[uint64]struct{}) - storeIDMap[42] = struct{}{} - leader, err = s.backupClient.FindTargetPeer(ctx, []byte("m"), false, 
storeIDMap) - require.NoError(t, err) - // check passed keys with target peer - require.Equal(t, 43, int(leader.GetId())) - - failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/retry-state-on-find-target-peer") - failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/return-region-on-find-target-peer") - - failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/retry-state-on-find-target-peer", "return(2)") - failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/return-region-on-find-target-peer", "1*return(\"nil\")->1*return(\"noPeer\")") - - leader, err = s.backupClient.FindTargetPeer(ctx, []byte("m"), false, storeIDMap) - // check passed keys with error and cannot find target peer - require.ErrorContains(t, err, "cannot find leader") - - failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/retry-state-on-find-target-peer") - failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/return-region-on-find-target-peer") -} From 1b730f6621d25cd4a386fa562a53066f7f2ae8a7 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 8 May 2024 14:09:32 +0800 Subject: [PATCH 04/22] use context to control backup flow --- br/pkg/backup/client.go | 340 +++++++++++++++++----------------------- br/pkg/backup/push.go | 100 +++++++++++- 2 files changed, 237 insertions(+), 203 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 2ffe7b31446d7..327c8f3fc9a77 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -7,15 +7,13 @@ import ( "context" "encoding/base64" "encoding/json" - "io" - "os" + "reflect" "strings" "time" "github.com/google/btree" "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" @@ -43,8 +41,6 @@ import ( "github.com/tikv/client-go/v2/txnkv/txnlock" pd "github.com/tikv/pd/client" "go.uber.org/zap" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) // ClientMgr manages connections needed by backup. @@ -815,35 +811,6 @@ func buildProgressRangeTree(pranges []*rtree.ProgressRange) (rtree.ProgressRange return progressRangeTree, subRanges, nil } -func (bc *Client) getBackupTargetStores( - ctx context.Context, - replicaReadLabel map[string]string, -) ([]*metapb.Store, map[uint64]struct{}, error) { - allStores, err := conn.GetAllTiKVStoresWithRetry(ctx, bc.mgr.GetPDClient(), connutil.SkipTiFlash) - if err != nil { - return nil, nil, errors.Trace(err) - } - var targetStores []*metapb.Store - targetStoreIds := make(map[uint64]struct{}) - if len(replicaReadLabel) == 0 { - targetStores = allStores - } else { - for _, store := range allStores { - for _, label := range store.Labels { - if val, ok := replicaReadLabel[label.Key]; ok && val == label.Value { - targetStores = append(targetStores, store) // send backup push down request to stores that match replica read label - targetStoreIds[store.GetId()] = struct{}{} // record store id for fine grained backup - } - } - } - if len(targetStores) == 0 { - return nil, nil, errors.Errorf("no store matches replica read label: %v", replicaReadLabel) - } - } - - return targetStores, targetStoreIds, nil -} - // BackupRanges make a backup of the given key ranges. 
func (bc *Client) BackupRanges( ctx context.Context, @@ -901,7 +868,7 @@ func (bc *Client) BackupRanges( return errors.Trace(err) } - err = bc.executePushdownBackup(ctx, &globalProgressTree, replicaReadLabel, request, stateChan, progressCallBack) + err = bc.mainBackupLoop(ctx, &globalProgressTree, replicaReadLabel, request, stateChan, progressCallBack) if err != nil { return errors.Trace(err) } @@ -938,7 +905,7 @@ func (bc *Client) getBackupStores(ctx context.Context, replicaReadLabel map[stri // if new tikv store joined. send backup request to this store. // if one tikv store rebooted. consider leader changes, retry send backup request to all stores. // if one tikv store disconnected. consider leader changes, retry send backup request to all stores. -func (bc *Client) executePushdownBackup( +func (bc *Client) mainBackupLoop( ctx context.Context, globalProgressTree *rtree.ProgressRangeTree, replicaReadLabel map[string]string, @@ -955,48 +922,94 @@ func (bc *Client) executePushdownBackup( BackupResponseCh chan *responseAndStore, ) { go func() { + defer close(BackupResponseCh) err := startStoreBackup(ctx, storeID, request, cli, BackupResponseCh) if err != nil { if errors.Cause(err) == context.Canceled { // backup cancelled, just log it logutil.CL(ctx).Info("store backup cancelled", zap.Uint64("round", round), - zap.Uint64("storeID", storeID), zap.Error(err)) + zap.Uint64("storeID", storeID)) } else { - // retry this store + // otherwise retry backup this store logutil.CL(ctx).Error("store backup failed", zap.Uint64("round", round), zap.Uint64("storeID", storeID), zap.Error(err)) stateChan <- StoreBackupPolicy{One: storeID} } } + logutil.CL(ctx).Info("store backup finished", + zap.Uint64("round", round), + zap.Uint64("storeID", storeID)) }() } + + collectStoreBackupAsyncFn := func( + ctx context.Context, + storeBackupsCh []chan *responseAndStore, + globalCh chan *responseAndStore, + + ) { + go func() { + defer func() { + logutil.CL(ctx).Info("exit collect store backups") + close(globalCh) + }() + cases := make([]reflect.SelectCase, 0) + for _, ch := range storeBackupsCh { + cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}) + } + + remainingProducers := len(cases) + logutil.CL(ctx).Info("start wait store backups", zap.Int("remainingProducers", remainingProducers)) + for remainingProducers > 0 { + chosen, value, ok := reflect.Select(cases) + if !ok { + // The chosen channel has been closed, so zero out the channel to disable the case + cases[chosen].Chan = reflect.ValueOf(nil) + remainingProducers -= 1 + continue + } + + select { + case <-ctx.Done(): + return + case globalCh <- value.Interface().(*responseAndStore): + } + } + }() + } + // mark flag to indicate the backup round round := uint64(0) errContext := utils.NewErrorContext("executePushdownBackup", 10) mainLoop: for { - childCtx, cancel := context.WithCancel(ctx) + round += 1 + globalBackupResultCh := make(chan *responseAndStore) + storeBackupResultChs := make([]chan *responseAndStore, 0) + + mainCtx, mainCancel := context.WithCancel(ctx) + handleCtx, handleCancel := context.WithCancel(ctx) // Compute the left ranges iter := globalProgressTree.Iter() inCompleteRanges := iter.GetIncompleteRanges() if len(inCompleteRanges) == 0 { // all range backuped - cancel() + mainCancel() + handleCancel() return nil } - round += 1 - BackupResponseCh := make(chan *responseAndStore) - + logutil.CL(ctx).Info("backup round", zap.Uint64("round", round)) request.SubRanges = getBackupRanges(inCompleteRanges) - 
allStores, err := bc.getBackupStores(childCtx, replicaReadLabel) + allStores, err := bc.getBackupStores(mainCtx, replicaReadLabel) if err != nil { + // this error must be retryable, because the we have connectted to pd before. + // so make infinite retry here logutil.CL(ctx).Error("failed to get backup stores", zap.Uint64("round", round), zap.Error(err)) - time.Sleep(3 * time.Second) - // infinite loop to retry + time.Sleep(time.Second) continue mainLoop } for _, store := range allStores { @@ -1004,82 +1017,101 @@ mainLoop: // reset backup client every round, for a clean grpc connection next time. cli, err := bc.mgr.ResetBackupClient(ctx, storeID) if err != nil { + // this error must be retryable, because the we get store info from pd. + // so make infinite retry here logutil.CL(ctx).Error("failed to get backup client", zap.Uint64("round", round), zap.Uint64("storeID", storeID), zap.Error(err)) - time.Sleep(3 * time.Second) - continue mainLoop - } - startStoreBackupAsyncFn(childCtx, round, storeID, request, cli, BackupResponseCh) - } - - select { - case <-ctx.Done(): - cancel() - return ctx.Err() - case storeBackupInfo := <-stateChan: - if storeBackupInfo.All { - logutil.CL(ctx).Info("cluster state changed. restart store backups") - // stop current connections - cancel() time.Sleep(time.Second) - // start next round backups continue mainLoop } - if storeBackupInfo.One != 0 { - storeID := storeBackupInfo.One - cli, err := bc.mgr.GetBackupClient(ctx, storeID) - if err != nil { - logutil.CL(ctx).Error("failed to get backup client", zap.Uint64("storeID", storeID), zap.Error(err)) - time.Sleep(3 * time.Second) + ch := make(chan *responseAndStore) + storeBackupResultChs = append(storeBackupResultChs, ch) + startStoreBackupAsyncFn(mainCtx, round, storeID, request, cli, ch) + } + // infinite loop to handle region backup response + collectStoreBackupAsyncFn(handleCtx, storeBackupResultChs, globalBackupResultCh) + handleLoop: + for { + select { + case <-ctx.Done(): + handleCancel() + mainCancel() + return ctx.Err() + case storeBackupInfo := <-stateChan: + if storeBackupInfo.All { + logutil.CL(ctx).Info("cluster state changed. restart store backups") + // stop current connections + handleCancel() + mainCancel() + time.Sleep(time.Second) + // start next round backups continue mainLoop } - startStoreBackupAsyncFn(childCtx, round, storeID, request, cli, BackupResponseCh) - } - case respAndStore, ok := <-BackupResponseCh: - if !ok { - // this round backup finished. break and check ranges outside - break - } - resp := respAndStore.GetResponse() - storeID := respAndStore.GetStoreID() - if resp.GetError() == nil { - pr, err := globalProgressTree.FindContained(resp.StartKey, resp.EndKey) - if err != nil { - logutil.CL(ctx).Error("failed to update the backup response", - zap.Reflect("error", err)) - cancel() - return err - } - if bc.checkpointRunner != nil { - if err := checkpoint.AppendForBackup( - ctx, - bc.checkpointRunner, - pr.GroupKey, - resp.StartKey, - resp.EndKey, - resp.Files, - ); err != nil { - // flush checkpoint failed, - logutil.CL(ctx).Warn("failed to flush checkpoint, ignore it", zap.Error(err)) - // this error doesn't not influnce main procedure. so ignore it. 
+ if storeBackupInfo.One != 0 { + storeID := storeBackupInfo.One + cli, err := bc.mgr.GetBackupClient(ctx, storeID) + if err != nil { + logutil.CL(ctx).Error("failed to get backup client", zap.Uint64("storeID", storeID), zap.Error(err)) + time.Sleep(3 * time.Second) + continue mainLoop } + handleCancel() + ch := make(chan *responseAndStore) + storeBackupResultChs = append(storeBackupResultChs, ch) + startStoreBackupAsyncFn(mainCtx, round, storeID, request, cli, ch) + handleCtx, handleCancel = context.WithCancel(mainCtx) + // collect all store backup producer channel result to one channel + globalBackupResultCh = make(chan *responseAndStore) + collectStoreBackupAsyncFn(handleCtx, storeBackupResultChs, globalBackupResultCh) + } + case respAndStore, ok := <-globalBackupResultCh: + if !ok { + // this round backup finished. break and check ranges outside + break handleLoop } - pr.Res.Put(resp.StartKey, resp.EndKey, resp.Files) - progressCallBack() - } else { - errPb := resp.GetError() - res := errContext.HandleIgnorableError(errPb, storeID) - switch res.Strategy { - case utils.GiveUpStrategy: - errMsg := res.Reason - if len(errMsg) <= 0 { - errMsg = errPb.Msg + resp := respAndStore.GetResponse() + storeID := respAndStore.GetStoreID() + if resp.GetError() == nil { + pr, err := globalProgressTree.FindContained(resp.StartKey, resp.EndKey) + if err != nil { + logutil.CL(ctx).Error("failed to update the backup response", + zap.Reflect("error", err)) + handleCancel() + mainCancel() + return err + } + if bc.checkpointRunner != nil { + if err := checkpoint.AppendForBackup( + ctx, + bc.checkpointRunner, + pr.GroupKey, + resp.StartKey, + resp.EndKey, + resp.Files, + ); err != nil { + // flush checkpoint failed, + logutil.CL(ctx).Warn("failed to flush checkpoint, ignore it", zap.Error(err)) + // this error doesn't not influnce main procedure. so ignore it. + } + } + pr.Res.Put(resp.StartKey, resp.EndKey, resp.Files) + progressCallBack() + } else { + errPb := resp.GetError() + res := errContext.HandleIgnorableError(errPb, storeID) + switch res.Strategy { + case utils.GiveUpStrategy: + errMsg := res.Reason + if len(errMsg) <= 0 { + errMsg = errPb.Msg + } + // TODO output a precise store address. @3pointer + handleCancel() + mainCancel() + return errors.Annotatef(berrors.ErrKVStorage, "error happen in store %v: %s", + storeID, + errMsg, + ) } - // TODO output a precise store address. 
@3pointer - cancel() - return errors.Annotatef(berrors.ErrKVStorage, "error happen in store %v: %s", - storeID, - errMsg, - ) } } } @@ -1116,85 +1148,3 @@ func collectRangeFiles(progressRangeTree *rtree.ProgressRangeTree, metaWriter *m return errors.Trace(progressRangeAscendErr) } - -func doSendBackup( - ctx context.Context, - client backuppb.BackupClient, - req backuppb.BackupRequest, - respFn func(*backuppb.BackupResponse) error, -) error { - failpoint.Inject("hint-backup-start", func(v failpoint.Value) { - logutil.CL(ctx).Info("failpoint hint-backup-start injected, " + - "process will notify the shell.") - if sigFile, ok := v.(string); ok { - file, err := os.Create(sigFile) - if err != nil { - log.Warn("failed to create file for notifying, skipping notify", zap.Error(err)) - } - if file != nil { - file.Close() - } - } - time.Sleep(3 * time.Second) - }) - bCli, err := client.Backup(ctx, &req) - failpoint.Inject("reset-retryable-error", func(val failpoint.Value) { - switch val.(string) { - case "Unavaiable": - { - logutil.CL(ctx).Debug("failpoint reset-retryable-error unavailable injected.") - err = status.Error(codes.Unavailable, "Unavailable error") - } - case "Internal": - { - logutil.CL(ctx).Debug("failpoint reset-retryable-error internal injected.") - err = status.Error(codes.Internal, "Internal error") - } - } - }) - failpoint.Inject("reset-not-retryable-error", func(val failpoint.Value) { - if val.(bool) { - logutil.CL(ctx).Debug("failpoint reset-not-retryable-error injected.") - err = status.Error(codes.Unknown, "Your server was haunted hence doesn't work, meow :3") - } - }) - if err != nil { - return err - } - defer func() { - _ = bCli.CloseSend() - }() - - for { - resp, err := bCli.Recv() - if err != nil { - if errors.Cause(err) == io.EOF { // nolint:errorlint - logutil.CL(ctx).Debug("backup streaming finish", - logutil.Key("backup-start-key", req.GetStartKey()), - logutil.Key("backup-end-key", req.GetEndKey())) - return nil - } - return err - } - // TODO: handle errors in the resp. - logutil.CL(ctx).Debug("range backed up", - logutil.Key("small-range-start-key", resp.GetStartKey()), - logutil.Key("small-range-end-key", resp.GetEndKey()), - zap.Int("api-version", int(resp.ApiVersion))) - err = respFn(resp) - if err != nil { - return errors.Trace(err) - } - } -} - -func getBackupRanges(ranges []rtree.Range) []*kvrpcpb.KeyRange { - requestRanges := make([]*kvrpcpb.KeyRange, 0, len(ranges)) - for _, r := range ranges { - requestRanges = append(requestRanges, &kvrpcpb.KeyRange{ - StartKey: r.StartKey, - EndKey: r.EndKey, - }) - } - return requestRanges -} diff --git a/br/pkg/backup/push.go b/br/pkg/backup/push.go index 84cf50ced3720..307552d5c5095 100644 --- a/br/pkg/backup/push.go +++ b/br/pkg/backup/push.go @@ -4,22 +4,24 @@ package backup import ( "context" + "io" + "os" + "time" + "github.com/pingcap/errors" "github.com/pingcap/failpoint" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/errorpb" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/logutil" + "github.com/pingcap/tidb/br/pkg/rtree" "github.com/pingcap/tidb/br/pkg/utils" "go.uber.org/zap" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) -// pushDown wraps a backup task. 
-type pushDown struct { - mgr ClientMgr - respCh chan responseAndStore - errCh chan error -} - type responseAndStore struct { Resp *backuppb.BackupResponse StoreID uint64 @@ -33,6 +35,77 @@ func (r responseAndStore) GetStoreID() uint64 { return r.StoreID } +func doSendBackup( + ctx context.Context, + client backuppb.BackupClient, + req backuppb.BackupRequest, + respFn func(*backuppb.BackupResponse) error, +) error { + failpoint.Inject("hint-backup-start", func(v failpoint.Value) { + logutil.CL(ctx).Info("failpoint hint-backup-start injected, " + + "process will notify the shell.") + if sigFile, ok := v.(string); ok { + file, err := os.Create(sigFile) + if err != nil { + log.Warn("failed to create file for notifying, skipping notify", zap.Error(err)) + } + if file != nil { + file.Close() + } + } + time.Sleep(3 * time.Second) + }) + bCli, err := client.Backup(ctx, &req) + failpoint.Inject("reset-retryable-error", func(val failpoint.Value) { + switch val.(string) { + case "Unavaiable": + { + logutil.CL(ctx).Debug("failpoint reset-retryable-error unavailable injected.") + err = status.Error(codes.Unavailable, "Unavailable error") + } + case "Internal": + { + logutil.CL(ctx).Debug("failpoint reset-retryable-error internal injected.") + err = status.Error(codes.Internal, "Internal error") + } + } + }) + failpoint.Inject("reset-not-retryable-error", func(val failpoint.Value) { + if val.(bool) { + logutil.CL(ctx).Debug("failpoint reset-not-retryable-error injected.") + err = status.Error(codes.Unknown, "Your server was haunted hence doesn't work, meow :3") + } + }) + if err != nil { + return err + } + defer func() { + _ = bCli.CloseSend() + }() + + for { + resp, err := bCli.Recv() + if err != nil { + if errors.Cause(err) == io.EOF { // nolint:errorlint + logutil.CL(ctx).Debug("backup streaming finish", + logutil.Key("backup-start-key", req.GetStartKey()), + logutil.Key("backup-end-key", req.GetEndKey())) + return nil + } + return err + } + // TODO: handle errors in the resp. + logutil.CL(ctx).Debug("range backed up", + logutil.Key("small-range-start-key", resp.GetStartKey()), + logutil.Key("small-range-end-key", resp.GetEndKey()), + zap.Int("api-version", int(resp.ApiVersion))) + err = respFn(resp) + if err != nil { + return errors.Trace(err) + } + } +} + func startStoreBackup( ctx context.Context, storeID uint64, @@ -48,7 +121,7 @@ func startStoreBackup( retry := -1 return utils.WithRetry(ctx, func() error { retry += 1 - logutil.CL(ctx).Info("try backup", zap.Int("retry time", retry)) + logutil.CL(ctx).Info("try backup", zap.Uint64("storeID", storeID), zap.Int("retry time", retry)) // Send backup request to the store. // handle the backup response or internal error here. // handle the store error(reboot or network partition) outside. 
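Note on startStoreBackup above: it wraps doSendBackup in utils.WithRetry and forwards every streamed response into respCh while watching ctx.Done(), so a broken stream can be retried without losing the consumer. A small standalone sketch of that forward-with-retry shape follows; the fetch type, string payloads, and fixed sleep are illustrative stand-ins for the gRPC stream and the real backoffer, not the BR code.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// fetch stands in for one attempt at a server-side stream: it emits items via
// emit and returns an error if the stream breaks partway through.
type fetch func(ctx context.Context, emit func(string) error) error

// forwardWithRetry re-runs fetch up to maxRetries times, pushing every emitted
// item into out while honoring cancellation.
func forwardWithRetry(ctx context.Context, f fetch, out chan<- string, maxRetries int) error {
	var lastErr error
	for attempt := 0; attempt < maxRetries; attempt++ {
		lastErr = f(ctx, func(item string) error {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case out <- item:
				return nil
			}
		})
		if lastErr == nil || errors.Is(lastErr, context.Canceled) {
			return lastErr
		}
		time.Sleep(50 * time.Millisecond) // fixed pause instead of a real backoffer
	}
	return lastErr
}

func main() {
	out := make(chan string, 4)
	calls := 0
	flaky := func(ctx context.Context, emit func(string) error) error {
		calls++
		if calls == 1 {
			return errors.New("stream reset") // first attempt fails, second succeeds
		}
		return emit(fmt.Sprintf("response after %d attempts", calls))
	}
	if err := forwardWithRetry(context.Background(), flaky, out, 3); err != nil {
		fmt.Println("backup stream failed:", err)
		return
	}
	close(out)
	for item := range out {
		fmt.Println(item)
	}
}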
@@ -100,3 +173,14 @@ func startStoreBackup( }, utils.NewBackupSSTBackoffer()) } } + +func getBackupRanges(ranges []rtree.Range) []*kvrpcpb.KeyRange { + requestRanges := make([]*kvrpcpb.KeyRange, 0, len(ranges)) + for _, r := range ranges { + requestRanges = append(requestRanges, &kvrpcpb.KeyRange{ + StartKey: r.StartKey, + EndKey: r.EndKey, + }) + } + return requestRanges +} From f8ab8416576cae6f55fc4e44b827d391fcf10cd8 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 8 May 2024 16:23:57 +0800 Subject: [PATCH 05/22] udpate some comments --- br/pkg/backup/client.go | 215 +++++++++++++++++++++++----------------- 1 file changed, 125 insertions(+), 90 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 327c8f3fc9a77..bd387a1773df7 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -62,14 +62,6 @@ type Checksum struct { // ProgressUnit represents the unit of progress. type ProgressUnit string -const ( - // backupFineGrainedMaxBackoff is 1 hour. - // given it begins the fine-grained backup, there must be some problems in the cluster. - // We need to be more patient. - backupFineGrainedMaxBackoff = 3600000 - backupRetryTimes = 5 -) - type StoreBackupPolicy struct { One uint64 All bool @@ -868,7 +860,7 @@ func (bc *Client) BackupRanges( return errors.Trace(err) } - err = bc.mainBackupLoop(ctx, &globalProgressTree, replicaReadLabel, request, stateChan, progressCallBack) + err = bc.startMainBackupLoop(ctx, &globalProgressTree, replicaReadLabel, request, stateChan, progressCallBack) if err != nil { return errors.Trace(err) } @@ -900,12 +892,61 @@ func (bc *Client) getBackupStores(ctx context.Context, replicaReadLabel map[stri return targetStores, nil } +func (bc *Client) OnBackupResponse( + ctx context.Context, + r *responseAndStore, + errContext *utils.ErrorContext, + globalProgressTree *rtree.ProgressRangeTree, +) error { + resp := r.GetResponse() + storeID := r.GetStoreID() + if resp.GetError() == nil { + pr, err := globalProgressTree.FindContained(resp.StartKey, resp.EndKey) + if err != nil { + logutil.CL(ctx).Error("failed to update the backup response", + zap.Reflect("error", err)) + return err + } + if bc.checkpointRunner != nil { + if err := checkpoint.AppendForBackup( + ctx, + bc.checkpointRunner, + pr.GroupKey, + resp.StartKey, + resp.EndKey, + resp.Files, + ); err != nil { + // flush checkpoint failed, + logutil.CL(ctx).Warn("failed to flush checkpoint, ignore it", zap.Error(err)) + // this error doesn't not influnce main procedure. so ignore it. + } + } + pr.Res.Put(resp.StartKey, resp.EndKey, resp.Files) + } else { + errPb := resp.GetError() + res := errContext.HandleIgnorableError(errPb, storeID) + switch res.Strategy { + case utils.GiveUpStrategy: + errMsg := res.Reason + if len(errMsg) <= 0 { + errMsg = errPb.Msg + } + // TODO output a precise store address. @3pointer + return errors.Annotatef(berrors.ErrKVStorage, "error happen in store %v: %s", + storeID, + errMsg, + ) + } + } + return nil +} + // infinite loop to backup ranges on all tikv stores -// if one client grpc disconnected. retry send backup request to this store. -// if new tikv store joined. send backup request to this store. -// if one tikv store rebooted. consider leader changes, retry send backup request to all stores. -// if one tikv store disconnected. consider leader changes, retry send backup request to all stores. -func (bc *Client) mainBackupLoop( +// if one client grpc disconnected. resend backup request to this store. +// if new tikv store joined. 
send backup request to new store. +// if one tikv store rebooted. consider leader changes, resend backup request to all stores. +// if one tikv store disconnected. consider leader changes, resend backup request to all stores. +func (bc *Client) startMainBackupLoop( ctx context.Context, globalProgressTree *rtree.ProgressRangeTree, replicaReadLabel map[string]string, @@ -919,14 +960,19 @@ func (bc *Client) mainBackupLoop( storeID uint64, request backuppb.BackupRequest, cli backuppb.BackupClient, - BackupResponseCh chan *responseAndStore, + respCh chan *responseAndStore, ) { go func() { - defer close(BackupResponseCh) - err := startStoreBackup(ctx, storeID, request, cli, BackupResponseCh) + defer func() { + logutil.CL(ctx).Info("exit store backup goroutine", zap.Uint64("store", storeID)) + close(respCh) + }() + err := startStoreBackup(ctx, storeID, request, cli, respCh) if err != nil { + // only 2 kinds of errors will occurr here. + // 1. grpc connection error(already retry inside) + // 2. context cancelled outside. if errors.Cause(err) == context.Canceled { - // backup cancelled, just log it logutil.CL(ctx).Info("store backup cancelled", zap.Uint64("round", round), zap.Uint64("storeID", storeID)) @@ -938,25 +984,23 @@ func (bc *Client) mainBackupLoop( stateChan <- StoreBackupPolicy{One: storeID} } } - logutil.CL(ctx).Info("store backup finished", - zap.Uint64("round", round), - zap.Uint64("storeID", storeID)) }() } - collectStoreBackupAsyncFn := func( + collectStoreBackupsAsyncFn := func( ctx context.Context, - storeBackupsCh []chan *responseAndStore, + round uint64, + storeBackupChs map[uint64]chan *responseAndStore, globalCh chan *responseAndStore, ) { go func() { defer func() { - logutil.CL(ctx).Info("exit collect store backups") + logutil.CL(ctx).Info("exit collect backups goroutine", zap.Uint64("round", round)) close(globalCh) }() cases := make([]reflect.SelectCase, 0) - for _, ch := range storeBackupsCh { + for _, ch := range storeBackupChs { cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}) } @@ -979,19 +1023,33 @@ func (bc *Client) mainBackupLoop( } }() } - // mark flag to indicate the backup round - round := uint64(0) - errContext := utils.NewErrorContext("executePushdownBackup", 10) + // a flag to indicate the backup round + // one backup round try to backup all ranges on all tikv stores. + // ideally, backup should be finished in one round + // unless the cluster state changed or some kv errors occurred. + round := uint64(0) mainLoop: for { round += 1 + // initialize the error context every round + errContext := utils.NewErrorContext("MainBackupLoop", 10) + + // a channel to collect all store backup results globalBackupResultCh := make(chan *responseAndStore) - storeBackupResultChs := make([]chan *responseAndStore, 0) + // channel slices to receive backup region result from different tikv stores + storeBackupResultChMap := make(map[uint64]chan *responseAndStore) + // mainCtx used to control mainLoop + // every round need a new context to control the main backup process mainCtx, mainCancel := context.WithCancel(ctx) + + // handleCtx used to control handleLoop + // every round has another infinite loop to handle all tikv backup responses + // until backup finished, store state changed or error occurred. 
handleCtx, handleCancel := context.WithCancel(ctx) - // Compute the left ranges + + // Compute the left ranges that not backuped yet iter := globalProgressTree.Iter() inCompleteRanges := iter.GetIncompleteRanges() if len(inCompleteRanges) == 0 { @@ -1001,34 +1059,35 @@ mainLoop: return nil } - logutil.CL(ctx).Info("backup round", zap.Uint64("round", round)) + logutil.CL(ctx).Info("backup round start...", zap.Uint64("round", round)) + request.SubRanges = getBackupRanges(inCompleteRanges) allStores, err := bc.getBackupStores(mainCtx, replicaReadLabel) if err != nil { - // this error must be retryable, because the we have connectted to pd before. - // so make infinite retry here + // because we have connectted to pd before. + // so this error must be retryable, just make infinite retry here logutil.CL(ctx).Error("failed to get backup stores", zap.Uint64("round", round), zap.Error(err)) time.Sleep(time.Second) continue mainLoop } for _, store := range allStores { storeID := store.GetId() - // reset backup client every round, for a clean grpc connection next time. + // reset backup client every round, to get a clean grpc connection. cli, err := bc.mgr.ResetBackupClient(ctx, storeID) if err != nil { - // this error must be retryable, because the we get store info from pd. - // so make infinite retry here - logutil.CL(ctx).Error("failed to get backup client", zap.Uint64("round", round), zap.Uint64("storeID", storeID), zap.Error(err)) + // because the we get store info from pd. + // there is no customer setting here, so make infinite retry. + logutil.CL(ctx).Error("failed to reset backup client", zap.Uint64("round", round), zap.Uint64("storeID", storeID), zap.Error(err)) time.Sleep(time.Second) continue mainLoop } ch := make(chan *responseAndStore) - storeBackupResultChs = append(storeBackupResultChs, ch) + storeBackupResultChMap[storeID] = ch startStoreBackupAsyncFn(mainCtx, round, storeID, request, cli, ch) } - // infinite loop to handle region backup response - collectStoreBackupAsyncFn(handleCtx, storeBackupResultChs, globalBackupResultCh) + // infinite loop to collect region backup response to global channel + collectStoreBackupsAsyncFn(handleCtx, round, storeBackupResultChMap, globalBackupResultCh) handleLoop: for { select { @@ -1042,77 +1101,53 @@ mainLoop: // stop current connections handleCancel() mainCancel() - time.Sleep(time.Second) // start next round backups continue mainLoop } if storeBackupInfo.One != 0 { + // new tikv store come up storeID := storeBackupInfo.One cli, err := bc.mgr.GetBackupClient(ctx, storeID) if err != nil { logutil.CL(ctx).Error("failed to get backup client", zap.Uint64("storeID", storeID), zap.Error(err)) - time.Sleep(3 * time.Second) + handleCancel() + mainCancel() + // receive new store info but failed to get backup client. + // start next round backups to get all tikv stores and reset all client connections. continue mainLoop } + + // cancel the former collect goroutine handleCancel() ch := make(chan *responseAndStore) - storeBackupResultChs = append(storeBackupResultChs, ch) + + storeBackupResultChMap[storeID] = ch + // start backup for this store startStoreBackupAsyncFn(mainCtx, round, storeID, request, cli, ch) + // re-create context for new handler loop handleCtx, handleCancel = context.WithCancel(mainCtx) - // collect all store backup producer channel result to one channel + // handleCancel makes the former collect goroutine exits + // so we need to re-create a new channel and restart a new collect goroutine. 
globalBackupResultCh = make(chan *responseAndStore) - collectStoreBackupAsyncFn(handleCtx, storeBackupResultChs, globalBackupResultCh) + // collect all store backup producer channel result to one channel + collectStoreBackupsAsyncFn(handleCtx, round, storeBackupResultChMap, globalBackupResultCh) } case respAndStore, ok := <-globalBackupResultCh: if !ok { - // this round backup finished. break and check ranges outside + // this round backup finished. break and check incomplete ranges in mainLoop. break handleLoop } - resp := respAndStore.GetResponse() - storeID := respAndStore.GetStoreID() - if resp.GetError() == nil { - pr, err := globalProgressTree.FindContained(resp.StartKey, resp.EndKey) - if err != nil { - logutil.CL(ctx).Error("failed to update the backup response", - zap.Reflect("error", err)) - handleCancel() - mainCancel() - return err - } - if bc.checkpointRunner != nil { - if err := checkpoint.AppendForBackup( - ctx, - bc.checkpointRunner, - pr.GroupKey, - resp.StartKey, - resp.EndKey, - resp.Files, - ); err != nil { - // flush checkpoint failed, - logutil.CL(ctx).Warn("failed to flush checkpoint, ignore it", zap.Error(err)) - // this error doesn't not influnce main procedure. so ignore it. - } - } - pr.Res.Put(resp.StartKey, resp.EndKey, resp.Files) - progressCallBack() - } else { - errPb := resp.GetError() - res := errContext.HandleIgnorableError(errPb, storeID) - switch res.Strategy { - case utils.GiveUpStrategy: - errMsg := res.Reason - if len(errMsg) <= 0 { - errMsg = errPb.Msg - } - // TODO output a precise store address. @3pointer - handleCancel() - mainCancel() - return errors.Annotatef(berrors.ErrKVStorage, "error happen in store %v: %s", - storeID, - errMsg, - ) - } + err = bc.OnBackupResponse(ctx, respAndStore, errContext, globalProgressTree) + if err != nil { + // if error occurred here, stop the backup process + // because only 2 kinds of errors will be returned here: + // 1. permission denied on tikv store. + // 2. parse backup response error.(shouldn't happen in any case) + handleCancel() + mainCancel() + return err } + progressCallBack() } } } From 0ca511a5c71dfd8fcbd35f27d70126848ffba2b0 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 8 May 2024 17:44:06 +0800 Subject: [PATCH 06/22] fix typo --- br/pkg/backup/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index bd387a1773df7..42437fdfb453b 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -969,7 +969,7 @@ func (bc *Client) startMainBackupLoop( }() err := startStoreBackup(ctx, storeID, request, cli, respCh) if err != nil { - // only 2 kinds of errors will occurr here. + // only 2 kinds of errors will occur here. // 1. grpc connection error(already retry inside) // 2. context cancelled outside. 
if errors.Cause(err) == context.Canceled { From 17cb445c310a37e5d9858bf4577237e0d9baca64 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 9 May 2024 15:37:28 +0800 Subject: [PATCH 07/22] add some unit cases --- br/pkg/backup/client.go | 53 ++++++++----------- br/pkg/backup/client_test.go | 100 +++++++++++++++++++++++++++++++++++ br/pkg/backup/push.go | 10 ++-- 3 files changed, 126 insertions(+), 37 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 42437fdfb453b..900ff306145cf 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -768,7 +768,7 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S return nil } -func (bc *Client) getProgressRanges(ranges []rtree.Range) []*rtree.ProgressRange { +func (bc *Client) GetProgressRanges(ranges []rtree.Range) []*rtree.ProgressRange { prs := make([]*rtree.ProgressRange, 0, len(ranges)) for _, r := range ranges { prs = append(prs, bc.getProgressRange(r)) @@ -776,31 +776,16 @@ func (bc *Client) getProgressRanges(ranges []rtree.Range) []*rtree.ProgressRange return prs } -func buildProgressRangeTree(pranges []*rtree.ProgressRange) (rtree.ProgressRangeTree, []*kvrpcpb.KeyRange, error) { +func BuildProgressRangeTree(pranges []*rtree.ProgressRange) (rtree.ProgressRangeTree, error) { // the response from TiKV only contains the region's key, so use the // progress range tree to quickly seek the region's corresponding progress range. progressRangeTree := rtree.NewProgressRangeTree() - subRangesCount := 0 for _, pr := range pranges { if err := progressRangeTree.Insert(pr); err != nil { - return progressRangeTree, nil, errors.Trace(err) + return progressRangeTree, errors.Trace(err) } - subRangesCount += len(pr.Incomplete) } - // either the `incomplete` is origin range itself, - // or the `incomplete` is sub-ranges split by checkpoint of origin range. - subRanges := make([]*kvrpcpb.KeyRange, 0, subRangesCount) - progressRangeTree.Ascend(func(pr *rtree.ProgressRange) bool { - for _, r := range pr.Incomplete { - subRanges = append(subRanges, &kvrpcpb.KeyRange{ - StartKey: r.StartKey, - EndKey: r.EndKey, - }) - } - return true - }) - - return progressRangeTree, subRanges, nil + return progressRangeTree, nil } // BackupRanges make a backup of the given key ranges. @@ -854,8 +839,8 @@ func (bc *Client) BackupRanges( // } // }() - pranges := bc.getProgressRanges(ranges) - globalProgressTree, _, err := buildProgressRangeTree(pranges) + pranges := bc.GetProgressRanges(ranges) + globalProgressTree, err := BuildProgressRangeTree(pranges) if err != nil { return errors.Trace(err) } @@ -894,10 +879,14 @@ func (bc *Client) getBackupStores(ctx context.Context, replicaReadLabel map[stri func (bc *Client) OnBackupResponse( ctx context.Context, - r *responseAndStore, + r *ResponseAndStore, errContext *utils.ErrorContext, globalProgressTree *rtree.ProgressRangeTree, ) error { + if r == nil || r.GetResponse() == nil { + return nil + } + resp := r.GetResponse() storeID := r.GetStoreID() if resp.GetError() == nil { @@ -917,8 +906,8 @@ func (bc *Client) OnBackupResponse( resp.Files, ); err != nil { // flush checkpoint failed, - logutil.CL(ctx).Warn("failed to flush checkpoint, ignore it", zap.Error(err)) // this error doesn't not influnce main procedure. so ignore it. 
+ logutil.CL(ctx).Warn("failed to flush checkpoint, ignore it", zap.Error(err)) } } pr.Res.Put(resp.StartKey, resp.EndKey, resp.Files) @@ -960,7 +949,7 @@ func (bc *Client) startMainBackupLoop( storeID uint64, request backuppb.BackupRequest, cli backuppb.BackupClient, - respCh chan *responseAndStore, + respCh chan *ResponseAndStore, ) { go func() { defer func() { @@ -990,8 +979,8 @@ func (bc *Client) startMainBackupLoop( collectStoreBackupsAsyncFn := func( ctx context.Context, round uint64, - storeBackupChs map[uint64]chan *responseAndStore, - globalCh chan *responseAndStore, + storeBackupChs map[uint64]chan *ResponseAndStore, + globalCh chan *ResponseAndStore, ) { go func() { @@ -1018,7 +1007,7 @@ func (bc *Client) startMainBackupLoop( select { case <-ctx.Done(): return - case globalCh <- value.Interface().(*responseAndStore): + case globalCh <- value.Interface().(*ResponseAndStore): } } }() @@ -1036,9 +1025,9 @@ mainLoop: errContext := utils.NewErrorContext("MainBackupLoop", 10) // a channel to collect all store backup results - globalBackupResultCh := make(chan *responseAndStore) + globalBackupResultCh := make(chan *ResponseAndStore) // channel slices to receive backup region result from different tikv stores - storeBackupResultChMap := make(map[uint64]chan *responseAndStore) + storeBackupResultChMap := make(map[uint64]chan *ResponseAndStore) // mainCtx used to control mainLoop // every round need a new context to control the main backup process @@ -1082,7 +1071,7 @@ mainLoop: time.Sleep(time.Second) continue mainLoop } - ch := make(chan *responseAndStore) + ch := make(chan *ResponseAndStore) storeBackupResultChMap[storeID] = ch startStoreBackupAsyncFn(mainCtx, round, storeID, request, cli, ch) } @@ -1119,7 +1108,7 @@ mainLoop: // cancel the former collect goroutine handleCancel() - ch := make(chan *responseAndStore) + ch := make(chan *ResponseAndStore) storeBackupResultChMap[storeID] = ch // start backup for this store @@ -1128,7 +1117,7 @@ mainLoop: handleCtx, handleCancel = context.WithCancel(mainCtx) // handleCancel makes the former collect goroutine exits // so we need to re-create a new channel and restart a new collect goroutine. 
- globalBackupResultCh = make(chan *responseAndStore) + globalBackupResultCh = make(chan *ResponseAndStore) // collect all store backup producer channel result to one channel collectStoreBackupsAsyncFn(handleCtx, round, storeBackupResultChMap, globalBackupResultCh) } diff --git a/br/pkg/backup/client_test.go b/br/pkg/backup/client_test.go index cab5de0df295f..dcb48ad8f25be 100644 --- a/br/pkg/backup/client_test.go +++ b/br/pkg/backup/client_test.go @@ -17,7 +17,9 @@ import ( "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/mock" "github.com/pingcap/tidb/br/pkg/pdutil" + "github.com/pingcap/tidb/br/pkg/rtree" "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/testkit" "github.com/stretchr/testify/require" @@ -203,3 +205,101 @@ func TestCheckBackupIsLocked(t *testing.T) { require.Error(t, err) require.Regexp(t, "backup lock file and sst file exist in(.+)", err.Error()) } + +func TestOnBackupResponse(t *testing.T) { + s := createBackupSuite(t) + + ctx := context.Background() + + buildProgressRangeFn := func(startKey []byte, endKey []byte) *rtree.ProgressRange { + return &rtree.ProgressRange{ + Res: rtree.NewRangeTree(), + Origin: rtree.Range{ + StartKey: []byte(startKey), + EndKey: []byte(endKey), + }, + } + } + + errContext := utils.NewErrorContext("test", 1) + require.Nil(t, s.backupClient.OnBackupResponse(ctx, nil, errContext, nil)) + + tree := rtree.NewProgressRangeTree() + r := &backup.ResponseAndStore{ + StoreID: 0, + Resp: &backuppb.BackupResponse{ + Error: &backuppb.Error{ + Msg: "test", + }, + }, + } + // case #1: error resposne + // first error can be ignored due to errContext. + require.NoError(t, s.backupClient.OnBackupResponse(ctx, r, errContext, &tree)) + // second error cannot be ignored. + require.Error(t, s.backupClient.OnBackupResponse(ctx, r, errContext, &tree)) + + // case #2: normal resposne + r = &backup.ResponseAndStore{ + StoreID: 0, + Resp: &backuppb.BackupResponse{ + StartKey: []byte("a"), + EndKey: []byte("b"), + }, + } + + require.NoError(t, tree.Insert(buildProgressRangeFn([]byte("aa"), []byte("c")))) + // error due to the tree range does not match response range. 
+ require.Error(t, s.backupClient.OnBackupResponse(ctx, r, errContext, &tree)) + + // case #3: partial range success case, find incomplete range + r.Resp.StartKey = []byte("aa") + require.NoError(t, s.backupClient.OnBackupResponse(ctx, r, errContext, &tree)) + + incomplete := tree.Iter().GetIncompleteRanges() + require.Len(t, incomplete, 1) + require.Equal(t, []byte("b"), incomplete[0].StartKey) + require.Equal(t, []byte("c"), incomplete[0].EndKey) + + // case #4: success case, make up incomplete range + r.Resp.StartKey = []byte("b") + r.Resp.EndKey = []byte("c") + require.NoError(t, s.backupClient.OnBackupResponse(ctx, r, errContext, &tree)) + incomplete = tree.Iter().GetIncompleteRanges() + require.Len(t, incomplete, 0) +} + +func TestBuildProgressRangeTree(t *testing.T) { + s := createBackupSuite(t) + ranges := []rtree.Range{ + { + StartKey: []byte("aa"), + EndKey: []byte("b"), + }, + { + StartKey: []byte("c"), + EndKey: []byte("d"), + }, + } + pranges := s.backupClient.GetProgressRanges(ranges) + tree, err := backup.BuildProgressRangeTree(pranges) + require.NoError(t, err) + + contained, err := tree.FindContained([]byte("a"), []byte("aa")) + require.Nil(t, contained) + require.Error(t, err) + + contained, err = tree.FindContained([]byte("b"), []byte("ba")) + require.Nil(t, contained) + require.Error(t, err) + + contained, err = tree.FindContained([]byte("e"), []byte("ea")) + require.Nil(t, contained) + require.Error(t, err) + + contained, err = tree.FindContained([]byte("aa"), []byte("b")) + require.NotNil(t, contained) + require.Equal(t, []byte("aa"), contained.Origin.StartKey) + require.Equal(t, []byte("b"), contained.Origin.EndKey) + require.NoError(t, err) +} diff --git a/br/pkg/backup/push.go b/br/pkg/backup/push.go index 307552d5c5095..8c0aace92a79c 100644 --- a/br/pkg/backup/push.go +++ b/br/pkg/backup/push.go @@ -22,16 +22,16 @@ import ( "google.golang.org/grpc/status" ) -type responseAndStore struct { +type ResponseAndStore struct { Resp *backuppb.BackupResponse StoreID uint64 } -func (r responseAndStore) GetResponse() *backuppb.BackupResponse { +func (r ResponseAndStore) GetResponse() *backuppb.BackupResponse { return r.Resp } -func (r responseAndStore) GetStoreID() uint64 { +func (r ResponseAndStore) GetStoreID() uint64 { return r.StoreID } @@ -111,7 +111,7 @@ func startStoreBackup( storeID uint64, backupReq backuppb.BackupRequest, backupCli backuppb.BackupClient, - respCh chan *responseAndStore, + respCh chan *ResponseAndStore, ) error { // this goroutine handle the response from a single store select { @@ -163,7 +163,7 @@ func startStoreBackup( select { case <-ctx.Done(): return ctx.Err() - case respCh <- &responseAndStore{ + case respCh <- &ResponseAndStore{ Resp: resp, StoreID: storeID, }: From 51c2de11080cbb3bf26382e19f9ce4825abbe2df Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 9 May 2024 16:24:23 +0800 Subject: [PATCH 08/22] fix build --- br/pkg/backup/client_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/br/pkg/backup/client_test.go b/br/pkg/backup/client_test.go index dcb48ad8f25be..747f41f6bf194 100644 --- a/br/pkg/backup/client_test.go +++ b/br/pkg/backup/client_test.go @@ -215,8 +215,8 @@ func TestOnBackupResponse(t *testing.T) { return &rtree.ProgressRange{ Res: rtree.NewRangeTree(), Origin: rtree.Range{ - StartKey: []byte(startKey), - EndKey: []byte(endKey), + StartKey: startKey, + EndKey: endKey, }, } } From ad3be1c3c42bf3e669127fbc9efe1ea2c2b99eda Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 9 May 2024 17:08:50 +0800 
Subject: [PATCH 09/22] fix build --- br/pkg/backup/BUILD.bazel | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/br/pkg/backup/BUILD.bazel b/br/pkg/backup/BUILD.bazel index f05ecf49d5529..7d7602193fb1e 100644 --- a/br/pkg/backup/BUILD.bazel +++ b/br/pkg/backup/BUILD.bazel @@ -73,6 +73,7 @@ go_test( "//br/pkg/metautil", "//br/pkg/mock", "//br/pkg/pdutil", + "//br/pkg/rtree", "//br/pkg/storage", "//br/pkg/utils", "//pkg/parser/model", @@ -81,15 +82,11 @@ go_test( "//pkg/testkit/testsetup", "//pkg/util/table-filter", "@com_github_golang_protobuf//proto", - "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_pingcap_kvproto//pkg/encryptionpb", - "@com_github_pingcap_kvproto//pkg/errorpb", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_client_go_v2//testutils", - "@com_github_tikv_client_go_v2//tikv", - "@com_github_tikv_client_go_v2//txnkv/txnlock", "@com_github_tikv_pd_client//:client", "@io_opencensus_go//stats/view", "@org_uber_go_goleak//:goleak", From 87dc6353a3c6afe90d28e474121fb4b8125fec79 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Fri, 10 May 2024 11:09:16 +0800 Subject: [PATCH 10/22] update integration tests --- br/tests/br_tikv_outage/run.sh | 4 +--- br/tests/br_tikv_outage2/run.sh | 12 +++--------- br/tests/br_tikv_outage3/run.sh | 4 +--- 3 files changed, 5 insertions(+), 15 deletions(-) diff --git a/br/tests/br_tikv_outage/run.sh b/br/tests/br_tikv_outage/run.sh index fc9eef2c5be08..877b44884f897 100644 --- a/br/tests/br_tikv_outage/run.sh +++ b/br/tests/br_tikv_outage/run.sh @@ -8,16 +8,14 @@ set -eux load -hint_finegrained=$TEST_DIR/hint_finegrained hint_backup_start=$TEST_DIR/hint_backup_start hint_get_backup_client=$TEST_DIR/hint_get_backup_client cases=${cases:-'shutdown'} for failure in $cases; do - rm -f "$hint_finegrained" "$hint_backup_start" "$hint_get_backup_client" + rm -f "$hint_backup_start" "$hint_get_backup_client" export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/hint-backup-start=1*return(\"$hint_backup_start\");\ -github.com/pingcap/tidb/br/pkg/backup/hint-fine-grained-backup=1*return(\"$hint_finegrained\");\ github.com/pingcap/tidb/br/pkg/utils/hint-get-backup-client=1*return(\"$hint_get_backup_client\")" backup_dir=${TEST_DIR:?}/"backup{test:${TEST_NAME}|with:${failure}}" diff --git a/br/tests/br_tikv_outage2/run.sh b/br/tests/br_tikv_outage2/run.sh index 521b7942203ec..360558a37f3f9 100644 --- a/br/tests/br_tikv_outage2/run.sh +++ b/br/tests/br_tikv_outage2/run.sh @@ -8,20 +8,15 @@ set -eux load -hint_finegrained=$TEST_DIR/hint_finegrained hint_backup_start=$TEST_DIR/hint_backup_start hint_get_backup_client=$TEST_DIR/hint_get_backup_client -cases=${cases:-'outage-at-finegrained outage outage-after-request'} +cases=${cases:-'outage outage-after-request'} for failure in $cases; do - rm -f "$hint_finegrained" "$hint_backup_start" "$hint_get_backup_client" + rm -f "$hint_backup_start" "$hint_get_backup_client" export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/hint-backup-start=1*return(\"$hint_backup_start\");\ -github.com/pingcap/tidb/br/pkg/backup/hint-fine-grained-backup=1*return(\"$hint_finegrained\");\ github.com/pingcap/tidb/br/pkg/utils/hint-get-backup-client=1*return(\"$hint_get_backup_client\")" - if [ "$failure" = outage-at-finegrained ]; then - export GO_FAILPOINTS="$GO_FAILPOINTS;github.com/pingcap/tidb/br/pkg/backup/noop-backup=return(true)" - fi 
backup_dir=${TEST_DIR:?}/"backup{test:${TEST_NAME}|with:${failure}}" rm -rf "${backup_dir:?}" @@ -30,11 +25,10 @@ github.com/pingcap/tidb/br/pkg/utils/hint-get-backup-client=1*return(\"$hint_get single_point_fault $failure wait $backup_pid case $failure in - scale-out | shutdown | outage-at-finegrained ) stop_services + scale-out | shutdown ) stop_services start_services ;; *) ;; esac - check done diff --git a/br/tests/br_tikv_outage3/run.sh b/br/tests/br_tikv_outage3/run.sh index c630029885e25..aeb56dedea166 100644 --- a/br/tests/br_tikv_outage3/run.sh +++ b/br/tests/br_tikv_outage3/run.sh @@ -8,16 +8,14 @@ set -eux load -hint_finegrained=$TEST_DIR/hint_finegrained hint_backup_start=$TEST_DIR/hint_backup_start hint_get_backup_client=$TEST_DIR/hint_get_backup_client cases=${cases:-'scale-out'} for failure in $cases; do - rm -f "$hint_finegrained" "$hint_backup_start" "$hint_get_backup_client" + rm -f "$hint_backup_start" "$hint_get_backup_client" export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/hint-backup-start=1*return(\"$hint_backup_start\");\ -github.com/pingcap/tidb/br/pkg/backup/hint-fine-grained-backup=1*return(\"$hint_finegrained\");\ github.com/pingcap/tidb/br/pkg/utils/hint-get-backup-client=1*return(\"$hint_get_backup_client\")" backup_dir=${TEST_DIR:?}/"backup{test:${TEST_NAME}|with:${failure}}" From cedc6ac0103222eceeeed996a59e051e253a2620 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Fri, 10 May 2024 13:46:36 +0800 Subject: [PATCH 11/22] fix tests --- br/pkg/backup/client.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 900ff306145cf..f5a87bec4b36c 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -1063,7 +1063,7 @@ mainLoop: for _, store := range allStores { storeID := store.GetId() // reset backup client every round, to get a clean grpc connection. - cli, err := bc.mgr.ResetBackupClient(ctx, storeID) + cli, err := bc.mgr.ResetBackupClient(mainCtx, storeID) if err != nil { // because the we get store info from pd. // there is no customer setting here, so make infinite retry. @@ -1096,7 +1096,7 @@ mainLoop: if storeBackupInfo.One != 0 { // new tikv store come up storeID := storeBackupInfo.One - cli, err := bc.mgr.GetBackupClient(ctx, storeID) + cli, err := bc.mgr.GetBackupClient(mainCtx, storeID) if err != nil { logutil.CL(ctx).Error("failed to get backup client", zap.Uint64("storeID", storeID), zap.Error(err)) handleCancel() @@ -1126,7 +1126,7 @@ mainLoop: // this round backup finished. break and check incomplete ranges in mainLoop. 
 					break handleLoop
 				}
-				err = bc.OnBackupResponse(ctx, respAndStore, errContext, globalProgressTree)
+				err = bc.OnBackupResponse(handleCtx, respAndStore, errContext, globalProgressTree)
 				if err != nil {
 					// if error occurred here, stop the backup process
 					// because only 2 kinds of errors will be returned here:

From 7310e650f9711590a48b07dc57f6738cec2bf4cb Mon Sep 17 00:00:00 2001
From: 3pointer
Date: Sat, 11 May 2024 11:21:52 +0800
Subject: [PATCH 12/22] fix context

---
 br/pkg/backup/client.go | 2 +-
 br/pkg/conn/conn.go | 4 ----
 2 files changed, 1 insertion(+), 5 deletions(-)

diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go
index f5a87bec4b36c..86b99ab8338a3 100644
--- a/br/pkg/backup/client.go
+++ b/br/pkg/backup/client.go
@@ -1043,8 +1043,8 @@ mainLoop:
 		inCompleteRanges := iter.GetIncompleteRanges()
 		if len(inCompleteRanges) == 0 {
 			// all range backuped
-			mainCancel()
 			handleCancel()
+			mainCancel()
 			return nil
 		}

diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go
index 87accda625a22..8708297cfe9da 100644
--- a/br/pkg/conn/conn.go
+++ b/br/pkg/conn/conn.go
@@ -91,8 +91,6 @@ func GetAllTiKVStoresWithRetry(ctx context.Context,
 		logutil.CL(ctx).Debug("failpoint hint-GetAllTiKVStores-error injected.")
 		if val.(bool) {
 			err = status.Error(codes.Unknown, "Retryable error")
-		} else {
-			err = context.Canceled
 		}
 	})

@@ -100,8 +98,6 @@ func GetAllTiKVStoresWithRetry(ctx context.Context,
 		logutil.CL(ctx).Debug("failpoint hint-GetAllTiKVStores-cancel injected.")
 		if val.(bool) {
 			err = status.Error(codes.Canceled, "Cancel Retry")
-		} else {
-			err = context.Canceled
 		}
 	})

From f1bd396837d18de2b6d2eea6b5d712c74cf8196b Mon Sep 17 00:00:00 2001
From: 3pointer
Date: Sat, 11 May 2024 14:12:29 +0800
Subject: [PATCH 13/22] update tests

---
 br/pkg/backup/client.go | 18 ++++++++++++++----
 br/tests/br_tikv_outage2/run.sh | 5 -----
 br/tests/br_tikv_outage3/run.sh | 1 -
 3 files changed, 14 insertions(+), 10 deletions(-)

diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go
index 86b99ab8338a3..b4cefd06c0f86 100644
--- a/br/pkg/backup/client.go
+++ b/br/pkg/backup/client.go
@@ -970,6 +970,7 @@ func (bc *Client) startMainBackupLoop(
 				logutil.CL(ctx).Error("store backup failed",
 					zap.Uint64("round", round),
 					zap.Uint64("storeID", storeID), zap.Error(err))
+				time.Sleep(15 * time.Second)
 				stateChan <- StoreBackupPolicy{One: storeID}
 			}
 		}
@@ -1086,7 +1087,7 @@ mainLoop:
 			return ctx.Err()
 		case storeBackupInfo := <-stateChan:
 			if storeBackupInfo.All {
-				logutil.CL(ctx).Info("cluster state changed. restart store backups")
+				logutil.CL(mainCtx).Info("cluster state changed. restart store backups")
 				// stop current connections
 				handleCancel()
 				mainCancel()
@@ -1094,11 +1095,20 @@ mainLoop:
 				continue mainLoop
 			}
 			if storeBackupInfo.One != 0 {
-				// new tikv store come up
 				storeID := storeBackupInfo.One
-				cli, err := bc.mgr.GetBackupClient(mainCtx, storeID)
+				_, err := bc.mgr.GetPDClient().GetStore(mainCtx, storeID)
 				if err != nil {
-					logutil.CL(ctx).Error("failed to get backup client", zap.Uint64("storeID", storeID), zap.Error(err))
+					// cannot get the store, maybe it has been scaled in.
+					logutil.CL(mainCtx).Info("cannot get store from pd", zap.Error(err))
+					// try next round
+					handleCancel()
+					mainCancel()
+					continue mainLoop
+				}
+				// reset the backup client: the store address could change, but the store id remains.
+ cli, err := bc.mgr.ResetBackupClient(mainCtx, storeID) + if err != nil { + logutil.CL(mainCtx).Error("failed to reset backup client", zap.Uint64("storeID", storeID), zap.Error(err)) handleCancel() mainCancel() // receive new store info but failed to get backup client. diff --git a/br/tests/br_tikv_outage2/run.sh b/br/tests/br_tikv_outage2/run.sh index 360558a37f3f9..913b9d65e81c5 100644 --- a/br/tests/br_tikv_outage2/run.sh +++ b/br/tests/br_tikv_outage2/run.sh @@ -24,11 +24,6 @@ github.com/pingcap/tidb/br/pkg/utils/hint-get-backup-client=1*return(\"$hint_get backup_pid=$! single_point_fault $failure wait $backup_pid - case $failure in - scale-out | shutdown ) stop_services - start_services ;; - *) ;; - esac check done diff --git a/br/tests/br_tikv_outage3/run.sh b/br/tests/br_tikv_outage3/run.sh index aeb56dedea166..6eeb44a04b991 100644 --- a/br/tests/br_tikv_outage3/run.sh +++ b/br/tests/br_tikv_outage3/run.sh @@ -30,6 +30,5 @@ github.com/pingcap/tidb/br/pkg/utils/hint-get-backup-client=1*return(\"$hint_get stop_services start_services - check done From 8528b7d13a75dc250c0c62a838d267e9b3a52de6 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Sat, 11 May 2024 16:19:38 +0800 Subject: [PATCH 14/22] check store liveness --- br/pkg/backup/client.go | 23 +++++++++++++++-------- br/pkg/utils/misc.go | 3 +-- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index b4cefd06c0f86..ad5513c995089 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -970,7 +970,6 @@ func (bc *Client) startMainBackupLoop( logutil.CL(ctx).Error("store backup failed", zap.Uint64("round", round), zap.Uint64("storeID", storeID), zap.Error(err)) - time.Sleep(15 * time.Second) stateChan <- StoreBackupPolicy{One: storeID} } } @@ -1057,11 +1056,15 @@ mainLoop: if err != nil { // because we have connectted to pd before. // so this error must be retryable, just make infinite retry here - logutil.CL(ctx).Error("failed to get backup stores", zap.Uint64("round", round), zap.Error(err)) - time.Sleep(time.Second) + logutil.CL(mainCtx).Error("failed to get backup stores", zap.Uint64("round", round), zap.Error(err)) continue mainLoop } for _, store := range allStores { + if err = utils.CheckStoreLiveness(store); err != nil { + // skip this store in this round. + logutil.CL(mainCtx).Warn("store not alive, skip backup it in this round", zap.Uint64("round", round), zap.Error(err)) + continue + } storeID := store.GetId() // reset backup client every round, to get a clean grpc connection. cli, err := bc.mgr.ResetBackupClient(mainCtx, storeID) @@ -1069,7 +1072,6 @@ mainLoop: // because the we get store info from pd. // there is no customer setting here, so make infinite retry. logutil.CL(ctx).Error("failed to reset backup client", zap.Uint64("round", round), zap.Uint64("storeID", storeID), zap.Error(err)) - time.Sleep(time.Second) continue mainLoop } ch := make(chan *ResponseAndStore) @@ -1087,7 +1089,7 @@ mainLoop: return ctx.Err() case storeBackupInfo := <-stateChan: if storeBackupInfo.All { - logutil.CL(mainCtx).Info("cluster state changed. restart store backups") + logutil.CL(mainCtx).Info("cluster state changed. 
restart store backups", zap.Uint64("round", round)) // stop current connections handleCancel() mainCancel() @@ -1096,19 +1098,24 @@ mainLoop: } if storeBackupInfo.One != 0 { storeID := storeBackupInfo.One - _, err := bc.mgr.GetPDClient().GetStore(mainCtx, storeID) + store, err := bc.mgr.GetPDClient().GetStore(mainCtx, storeID) if err != nil { // cannot get store, maybe store has scaled-in. - logutil.CL(mainCtx).Info("cannot get store from pd", zap.Error(err)) + logutil.CL(mainCtx).Info("cannot get store from pd", zap.Uint64("round", round), zap.Error(err)) // try next round handleCancel() mainCancel() continue mainLoop } + if err = utils.CheckStoreLiveness(store); err != nil { + // skip this store in this round. + logutil.CL(mainCtx).Warn("store not alive, skip backup it in this round", zap.Uint64("round", round), zap.Error(err)) + continue + } // reset backup client. store address could change but store id remained. cli, err := bc.mgr.ResetBackupClient(mainCtx, storeID) if err != nil { - logutil.CL(mainCtx).Error("failed to reset backup client", zap.Uint64("storeID", storeID), zap.Error(err)) + logutil.CL(mainCtx).Error("failed to reset backup client", zap.Uint64("round", round), zap.Uint64("storeID", storeID), zap.Error(err)) handleCancel() mainCancel() // receive new store info but failed to get backup client. diff --git a/br/pkg/utils/misc.go b/br/pkg/utils/misc.go index c351f62011a76..80882489905db 100644 --- a/br/pkg/utils/misc.go +++ b/br/pkg/utils/misc.go @@ -46,8 +46,7 @@ const ( // Also note that the offline threshold in PD is 20s, see // https://github.com/tikv/pd/blob/c40e319f50822678cda71ae62ee2fd70a9cac010/pkg/core/store.go#L523 - // After talk to PD members 100s is not a safe number. set it to 600s - storeDisconnectionDuration = 600 * time.Second + storeDisconnectionDuration = 100 * time.Second ) // IsTypeCompatible checks whether type target is compatible with type src From 2e66607a1b9991ca68a75834f4dcb9632bca78c0 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Mon, 13 May 2024 11:28:23 +0800 Subject: [PATCH 15/22] remove unproper failpoint injects --- br/tests/br_rawkv/run.sh | 2 -- 1 file changed, 2 deletions(-) diff --git a/br/tests/br_rawkv/run.sh b/br/tests/br_rawkv/run.sh index ed7dc7804f88c..c13d057d1e0dd 100755 --- a/br/tests/br_rawkv/run.sh +++ b/br/tests/br_rawkv/run.sh @@ -186,5 +186,3 @@ run_test "" # ingest "region error" to trigger fineGrainedBackup, only one region error. run_test "github.com/pingcap/tidb/br/pkg/backup/tikv-region-error=1*return(\"region error\")" -# all regions failed. 
-run_test "github.com/pingcap/tidb/br/pkg/backup/tikv-region-error=return(\"region error\")" From 99f6634ec41e4f3adffb5ed93b9afef00317c72f Mon Sep 17 00:00:00 2001 From: 3pointer Date: Mon, 13 May 2024 14:22:48 +0800 Subject: [PATCH 16/22] fix ut --- br/pkg/conn/conn.go | 14 ++++++++++++-- br/pkg/conn/conn_test.go | 18 +++++++++++++----- tests/_utils/br_tikv_outage_util | 3 --- 3 files changed, 25 insertions(+), 10 deletions(-) diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go index 8708297cfe9da..18a43f62d636f 100644 --- a/br/pkg/conn/conn.go +++ b/br/pkg/conn/conn.go @@ -91,13 +91,23 @@ func GetAllTiKVStoresWithRetry(ctx context.Context, logutil.CL(ctx).Debug("failpoint hint-GetAllTiKVStores-error injected.") if val.(bool) { err = status.Error(codes.Unknown, "Retryable error") + return err } }) - failpoint.Inject("hint-GetAllTiKVStores-cancel", func(val failpoint.Value) { - logutil.CL(ctx).Debug("failpoint hint-GetAllTiKVStores-cancel injected.") + failpoint.Inject("hint-GetAllTiKVStores-grpc-cancel", func(val failpoint.Value) { + logutil.CL(ctx).Debug("failpoint hint-GetAllTiKVStores-grpc-cancel injected.") if val.(bool) { err = status.Error(codes.Canceled, "Cancel Retry") + return err + } + }) + + failpoint.Inject("hint-GetAllTiKVStores-ctx-cancel", func(val failpoint.Value) { + logutil.CL(ctx).Debug("failpoint hint-GetAllTiKVStores-ctx-cancel injected.") + if val.(bool) { + err = context.Canceled + return err } }) diff --git a/br/pkg/conn/conn_test.go b/br/pkg/conn/conn_test.go index 00fe21d60f1e0..3d594a3c1216a 100644 --- a/br/pkg/conn/conn_test.go +++ b/br/pkg/conn/conn_test.go @@ -25,10 +25,14 @@ import ( ) func TestGetAllTiKVStoresWithRetryCancel(t *testing.T) { - err := failpoint.Enable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-cancel", "1*return(true)->1*return(false)") + err := failpoint.Enable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-grpc-cancel", "1*return(true)") + require.NoError(t, err) + err = failpoint.Enable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-ctx-cancel", "1*return(true)") require.NoError(t, err) defer func() { - err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-cancel") + err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-grpc-cancel") + require.NoError(t, err) + err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-ctx-cancel") require.NoError(t, err) }() ctx, cancel := context.WithCancel(context.Background()) @@ -64,16 +68,20 @@ func TestGetAllTiKVStoresWithRetryCancel(t *testing.T) { _, err = GetAllTiKVStoresWithRetry(ctx, fpdc, util.SkipTiFlash) require.Error(t, err) errs := multierr.Errors(err) - require.Equal(t, 2, len(errs)) + require.Equal(t, 1, len(errs)) require.Equal(t, codes.Canceled, status.Code(errors.Cause(errs[0]))) } func TestGetAllTiKVStoresWithUnknown(t *testing.T) { - err := failpoint.Enable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-error", "1*return(true)->1*return(false)") + err := failpoint.Enable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-error", "1*return(true)") + require.NoError(t, err) + err = failpoint.Enable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-ctx-cancel", "1*return(true)") require.NoError(t, err) defer func() { err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-error") require.NoError(t, err) + err = 
failpoint.Disable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-ctx-cancel") + require.NoError(t, err) }() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -108,7 +116,7 @@ func TestGetAllTiKVStoresWithUnknown(t *testing.T) { _, err = GetAllTiKVStoresWithRetry(ctx, fpdc, util.SkipTiFlash) require.Error(t, err) errs := multierr.Errors(err) - require.Equal(t, 2, len(errs)) + require.Equal(t, 1, len(errs)) require.Equal(t, codes.Unknown, status.Code(errors.Cause(errs[0]))) } diff --git a/tests/_utils/br_tikv_outage_util b/tests/_utils/br_tikv_outage_util index 192a2ebb195b4..db3a7cc171936 100644 --- a/tests/_utils/br_tikv_outage_util +++ b/tests/_utils/br_tikv_outage_util @@ -37,9 +37,6 @@ single_point_fault() { outage-after-request) wait_file_exist "$hint_get_backup_client" kv_outage -d 30 -i $victim;; - outage-at-finegrained) - wait_file_exist "$hint_finegrained" - kv_outage --kill -i $victim;; shutdown) wait_file_exist "$hint_backup_start" kv_outage --kill -i $victim;; From a2ee4e3c62dd30b97c7ca300fcbf25d9b88c4075 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Mon, 13 May 2024 14:32:22 +0800 Subject: [PATCH 17/22] fix build --- br/pkg/conn/conn.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go index 18a43f62d636f..9c6c322b7debd 100644 --- a/br/pkg/conn/conn.go +++ b/br/pkg/conn/conn.go @@ -91,7 +91,7 @@ func GetAllTiKVStoresWithRetry(ctx context.Context, logutil.CL(ctx).Debug("failpoint hint-GetAllTiKVStores-error injected.") if val.(bool) { err = status.Error(codes.Unknown, "Retryable error") - return err + return } }) @@ -99,7 +99,7 @@ func GetAllTiKVStoresWithRetry(ctx context.Context, logutil.CL(ctx).Debug("failpoint hint-GetAllTiKVStores-grpc-cancel injected.") if val.(bool) { err = status.Error(codes.Canceled, "Cancel Retry") - return err + return } }) @@ -107,7 +107,7 @@ func GetAllTiKVStoresWithRetry(ctx context.Context, logutil.CL(ctx).Debug("failpoint hint-GetAllTiKVStores-ctx-cancel injected.") if val.(bool) { err = context.Canceled - return err + return } }) From 3a21d6d69ad82da50d7570e5186d1e008e7329c2 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Mon, 13 May 2024 15:05:09 +0800 Subject: [PATCH 18/22] fix check --- br/pkg/conn/conn.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go index 9c6c322b7debd..cdb81a011c8a5 100644 --- a/br/pkg/conn/conn.go +++ b/br/pkg/conn/conn.go @@ -91,7 +91,7 @@ func GetAllTiKVStoresWithRetry(ctx context.Context, logutil.CL(ctx).Debug("failpoint hint-GetAllTiKVStores-error injected.") if val.(bool) { err = status.Error(codes.Unknown, "Retryable error") - return + failpoint.Return(err) } }) @@ -99,7 +99,7 @@ func GetAllTiKVStoresWithRetry(ctx context.Context, logutil.CL(ctx).Debug("failpoint hint-GetAllTiKVStores-grpc-cancel injected.") if val.(bool) { err = status.Error(codes.Canceled, "Cancel Retry") - return + failpoint.Return(err) } }) @@ -107,7 +107,7 @@ func GetAllTiKVStoresWithRetry(ctx context.Context, logutil.CL(ctx).Debug("failpoint hint-GetAllTiKVStores-ctx-cancel injected.") if val.(bool) { err = context.Canceled - return + failpoint.Return(err) } }) From 7271c3ec8d970882a62c9951f92ac0c529cff3dc Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 14 May 2024 09:55:07 +0800 Subject: [PATCH 19/22] address comments --- br/pkg/backup/client.go | 15 +++++++++++---- br/pkg/task/backup.go | 14 ++++++-------- br/pkg/task/backup_raw.go | 2 +- br/pkg/task/backup_txn.go | 3 +-- 
4 files changed, 19 insertions(+), 15 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index ad5513c995089..aaa874a61c8af 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -9,6 +9,7 @@ import ( "encoding/json" "reflect" "strings" + "sync" "time" "github.com/google/btree" @@ -43,6 +44,8 @@ import ( "go.uber.org/zap" ) +var apiVersionSetter sync.Once + // ClientMgr manages connections needed by backup. type ClientMgr interface { GetBackupClient(ctx context.Context, storeID uint64) (backuppb.BackupClient, error) @@ -792,7 +795,6 @@ func BuildProgressRangeTree(pranges []*rtree.ProgressRange) (rtree.ProgressRange func (bc *Client) BackupRanges( ctx context.Context, ranges []rtree.Range, - regionCounts []int, request backuppb.BackupRequest, concurrency uint, replicaReadLabel map[string]string, @@ -906,11 +908,15 @@ func (bc *Client) OnBackupResponse( resp.Files, ); err != nil { // flush checkpoint failed, - // this error doesn't not influnce main procedure. so ignore it. - logutil.CL(ctx).Warn("failed to flush checkpoint, ignore it", zap.Error(err)) + logutil.CL(ctx).Error("failed to flush checkpoint", zap.Error(err)) + return err } } pr.Res.Put(resp.StartKey, resp.EndKey, resp.Files) + apiVersionSetter.Do(func() { + apiVersion := resp.ApiVersion + bc.SetApiVersion(apiVersion) + }) } else { errPb := resp.GetError() res := errContext.HandleIgnorableError(errPb, storeID) @@ -1146,9 +1152,10 @@ mainLoop: err = bc.OnBackupResponse(handleCtx, respAndStore, errContext, globalProgressTree) if err != nil { // if error occurred here, stop the backup process - // because only 2 kinds of errors will be returned here: + // because only 3 kinds of errors will be returned here: // 1. permission denied on tikv store. // 2. parse backup response error.(shouldn't happen in any case) + // 3. checkpoint update failed. TODO: should we retry here? handleCancel() mainCancel() return err diff --git a/br/pkg/task/backup.go b/br/pkg/task/backup.go index 3376b67fb6b90..a04f14f8b519a 100644 --- a/br/pkg/task/backup.go +++ b/br/pkg/task/backup.go @@ -619,7 +619,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig summary.CollectInt("backup total ranges", len(ranges)) - approximateRegions, regionCounts, err := getRegionCountOfRanges(ctx, mgr, ranges) + approximateRegions, err := getRegionCountOfRanges(ctx, mgr, ranges) if err != nil { return errors.Trace(err) } @@ -684,7 +684,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig }) metawriter.StartWriteMetasAsync(ctx, metautil.AppendDataFile) - err = client.BackupRanges(ctx, ranges, regionCounts, req, uint(cfg.Concurrency), cfg.ReplicaReadLabel, metawriter, progressCallBack) + err = client.BackupRanges(ctx, ranges, req, uint(cfg.Concurrency), cfg.ReplicaReadLabel, metawriter, progressCallBack) if err != nil { return errors.Trace(err) } @@ -748,19 +748,17 @@ func getRegionCountOfRanges( ctx context.Context, mgr *conn.Mgr, ranges []rtree.Range, -) (int, []int, error) { +) (int, error) { // The number of regions need to backup approximateRegions := 0 - // The number array of regions of ranges, thecounts[i] is the number of regions of the range[i]. 
- counts := make([]int, 0, len(ranges)) for _, r := range ranges { regionCount, err := mgr.GetRegionCount(ctx, r.StartKey, r.EndKey) if err != nil { - return 0, nil, errors.Trace(err) + return 0, errors.Trace(err) } - counts = append(counts, regionCount) + approximateRegions += regionCount } - return approximateRegions, counts, nil + return approximateRegions, nil } // ParseTSString port from tidb setSnapshotTS. diff --git a/br/pkg/task/backup_raw.go b/br/pkg/task/backup_raw.go index 34d44073b5153..1bb273a55581e 100644 --- a/br/pkg/task/backup_raw.go +++ b/br/pkg/task/backup_raw.go @@ -216,7 +216,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf } metaWriter := metautil.NewMetaWriter(client.GetStorage(), metautil.MetaFileSize, false, metautil.MetaFile, &cfg.CipherInfo) metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDataFile) - err = client.BackupRanges(ctx, []rtree.Range{rg}, []int{approximateRegions}, req, 1, nil, metaWriter, progressCallBack) + err = client.BackupRanges(ctx, []rtree.Range{rg}, req, 1, nil, metaWriter, progressCallBack) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/task/backup_txn.go b/br/pkg/task/backup_txn.go index 796330a0792a7..3ec3321b0ff68 100644 --- a/br/pkg/task/backup_txn.go +++ b/br/pkg/task/backup_txn.go @@ -169,7 +169,6 @@ func RunBackupTxn(c context.Context, g glue.Glue, cmdName string, cfg *TxnKvConf if err != nil { return errors.Trace(err) } - regionCounts := []int{approximateRegions} summary.CollectInt("backup total regions", approximateRegions) @@ -202,7 +201,7 @@ func RunBackupTxn(c context.Context, g glue.Glue, cmdName string, cfg *TxnKvConf metaWriter := metautil.NewMetaWriter(client.GetStorage(), metautil.MetaFileSize, false, metautil.MetaFile, &cfg.CipherInfo) metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDataFile) - err = client.BackupRanges(ctx, backupRanges, regionCounts, req, 1, nil, metaWriter, progressCallBack) + err = client.BackupRanges(ctx, backupRanges, req, 1, nil, metaWriter, progressCallBack) if err != nil { return errors.Trace(err) } From 574360c015680f4b0fba35599dec73f69b31c8c1 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 14 May 2024 10:13:38 +0800 Subject: [PATCH 20/22] remove global variable --- br/pkg/backup/client.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index aaa874a61c8af..daf8acebc78e9 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -9,7 +9,6 @@ import ( "encoding/json" "reflect" "strings" - "sync" "time" "github.com/google/btree" @@ -44,8 +43,6 @@ import ( "go.uber.org/zap" ) -var apiVersionSetter sync.Once - // ClientMgr manages connections needed by backup. 
type ClientMgr interface { GetBackupClient(ctx context.Context, storeID uint64) (backuppb.BackupClient, error) @@ -913,10 +910,8 @@ func (bc *Client) OnBackupResponse( } } pr.Res.Put(resp.StartKey, resp.EndKey, resp.Files) - apiVersionSetter.Do(func() { - apiVersion := resp.ApiVersion - bc.SetApiVersion(apiVersion) - }) + apiVersion := resp.ApiVersion + bc.SetApiVersion(apiVersion) } else { errPb := resp.GetError() res := errContext.HandleIgnorableError(errPb, storeID) From 8cec73fb534c4cbf2151e05411ec8c1bb6fe2bb6 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 14 May 2024 14:10:11 +0800 Subject: [PATCH 21/22] address comment --- br/pkg/backup/client.go | 19 ++++++------------- br/pkg/backup/client_test.go | 3 +-- 2 files changed, 7 insertions(+), 15 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index daf8acebc78e9..d7b1d40a41d8c 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -768,20 +768,12 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S return nil } -func (bc *Client) GetProgressRanges(ranges []rtree.Range) []*rtree.ProgressRange { - prs := make([]*rtree.ProgressRange, 0, len(ranges)) - for _, r := range ranges { - prs = append(prs, bc.getProgressRange(r)) - } - return prs -} - -func BuildProgressRangeTree(pranges []*rtree.ProgressRange) (rtree.ProgressRangeTree, error) { +func (bc *Client) BuildProgressRangeTree(ranges []rtree.Range) (rtree.ProgressRangeTree, error) { // the response from TiKV only contains the region's key, so use the // progress range tree to quickly seek the region's corresponding progress range. progressRangeTree := rtree.NewProgressRangeTree() - for _, pr := range pranges { - if err := progressRangeTree.Insert(pr); err != nil { + for _, r := range ranges { + if err := progressRangeTree.Insert(bc.getProgressRange(r)); err != nil { return progressRangeTree, errors.Trace(err) } } @@ -838,8 +830,7 @@ func (bc *Client) BackupRanges( // } // }() - pranges := bc.GetProgressRanges(ranges) - globalProgressTree, err := BuildProgressRangeTree(pranges) + globalProgressTree, err := bc.BuildProgressRangeTree(ranges) if err != nil { return errors.Trace(err) } @@ -1058,6 +1049,7 @@ mainLoop: // because we have connectted to pd before. // so this error must be retryable, just make infinite retry here logutil.CL(mainCtx).Error("failed to get backup stores", zap.Uint64("round", round), zap.Error(err)) + mainCancel() continue mainLoop } for _, store := range allStores { @@ -1073,6 +1065,7 @@ mainLoop: // because the we get store info from pd. // there is no customer setting here, so make infinite retry. 
logutil.CL(ctx).Error("failed to reset backup client", zap.Uint64("round", round), zap.Uint64("storeID", storeID), zap.Error(err)) + mainCancel() continue mainLoop } ch := make(chan *ResponseAndStore) diff --git a/br/pkg/backup/client_test.go b/br/pkg/backup/client_test.go index 747f41f6bf194..5814ae6dc718a 100644 --- a/br/pkg/backup/client_test.go +++ b/br/pkg/backup/client_test.go @@ -281,8 +281,7 @@ func TestBuildProgressRangeTree(t *testing.T) { EndKey: []byte("d"), }, } - pranges := s.backupClient.GetProgressRanges(ranges) - tree, err := backup.BuildProgressRangeTree(pranges) + tree, err := s.backupClient.BuildProgressRangeTree(ranges) require.NoError(t, err) contained, err := tree.FindContained([]byte("a"), []byte("aa")) From 5870354ef70c24387ae2fd94a99641c543ec7e66 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 15 May 2024 15:19:33 +0800 Subject: [PATCH 22/22] fix typo --- br/pkg/backup/push.go | 2 +- br/pkg/restore/client.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/br/pkg/backup/push.go b/br/pkg/backup/push.go index 8c0aace92a79c..fc8147ae31f62 100644 --- a/br/pkg/backup/push.go +++ b/br/pkg/backup/push.go @@ -58,7 +58,7 @@ func doSendBackup( bCli, err := client.Backup(ctx, &req) failpoint.Inject("reset-retryable-error", func(val failpoint.Value) { switch val.(string) { - case "Unavaiable": + case "Unavailable": { logutil.CL(ctx).Debug("failpoint reset-retryable-error unavailable injected.") err = status.Error(codes.Unavailable, "Unavailable error") diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 770fe28eba6c1..45548e22009a0 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -2382,7 +2382,7 @@ func (rc *Client) PreCheckTableTiFlashReplica( // set TiFlashReplica to unavailable in tableInfo, to avoid TiDB cannot sense TiFlash and make plan to TiFlash // see details at https://github.com/pingcap/br/issues/931 // TODO maybe set table.Info.TiFlashReplica.Count to tiFlashStoreCount, but we need more tests about it. - log.Warn("table does not satisfy tiflash replica requirements, set tiflash replcia to unavaiable", + log.Warn("table does not satisfy tiflash replica requirements, set tiflash replcia to unavailable", zap.Stringer("db", table.DB.Name), zap.Stringer("table", table.Info.Name), zap.Uint64("expect tiflash replica", table.Info.TiFlashReplica.Count),
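
A note on the overall control flow this series converges on (not part of any patch above): backup becomes a round-based, per-store push-down loop. Each store gets its own response channel, the per-store channels are merged into one global channel, every response is handed to OnBackupResponse to record progress in the ProgressRangeTree, and ranges still incomplete after a round are retried in the next round. The sketch below is only an illustration of that shape, with hypothetical stand-in types; the real implementation lives in br/pkg/backup/client.go and push.go and uses backuppb.BackupResponse, rtree.ProgressRangeTree, and StoreBackupPolicy.

package main

import (
	"context"
	"fmt"
	"sync"
)

// response stands in for backuppb.BackupResponse plus the store it came from.
type response struct {
	storeID  uint64
	startKey string
	endKey   string
}

// backupStore stands in for one per-store backup stream (startStoreBackup in push.go).
func backupStore(ctx context.Context, storeID uint64, out chan<- response) {
	select {
	case <-ctx.Done():
	case out <- response{storeID: storeID, startKey: "a", endKey: "z"}:
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	stores := []uint64{1, 2, 3}
	// pretend a single range ["a", "z") still needs to be backed up
	incomplete := map[string]bool{"a": true}

	for round := uint64(1); len(incomplete) > 0; round++ {
		// one producer per store, merged into a single global channel,
		// mirroring collectStoreBackupsAsyncFn
		global := make(chan response)
		var wg sync.WaitGroup
		for _, id := range stores {
			wg.Add(1)
			go func(id uint64) {
				defer wg.Done()
				backupStore(ctx, id, global)
			}(id)
		}
		go func() { wg.Wait(); close(global) }()

		for resp := range global {
			// the real loop calls OnBackupResponse here, which puts the
			// backed-up range into the progress range tree
			fmt.Printf("round %d: store %d finished [%s, %s)\n",
				round, resp.storeID, resp.startKey, resp.endKey)
			delete(incomplete, resp.startKey)
		}
		// ranges still incomplete after this round are retried in the next one
	}
}

The real loop additionally watches stateChan for StoreBackupPolicy events (restart one store versus restart all stores), re-resolves the store list from PD, and skips stores that fail utils.CheckStoreLiveness before starting a new round.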