From 220b4e6fd72ea45dda7fe64b790ad2964d2a5bc8 Mon Sep 17 00:00:00 2001 From: Jianjun Liao <36503113+Leavrth@users.noreply.github.com> Date: Wed, 24 Apr 2024 13:06:10 +0800 Subject: [PATCH 1/7] This is an automated cherry-pick of #52822 Signed-off-by: ti-chi-bot --- br/pkg/lightning/backend/local/duplicate.go | 7 +- br/pkg/restore/import.go | 3 +- br/pkg/restore/import_retry_test.go | 49 +- br/pkg/restore/split/mock_pd_client.go | 195 +++++ br/pkg/restore/split/split.go | 16 + br/pkg/restore/split/split_test.go | 608 +++++++++++++ pkg/lightning/backend/local/BUILD.bazel | 188 ++++ pkg/lightning/backend/local/region_job.go | 907 ++++++++++++++++++++ 8 files changed, 1968 insertions(+), 5 deletions(-) create mode 100644 br/pkg/restore/split/mock_pd_client.go create mode 100644 pkg/lightning/backend/local/BUILD.bazel create mode 100644 pkg/lightning/backend/local/region_job.go diff --git a/br/pkg/lightning/backend/local/duplicate.go b/br/pkg/lightning/backend/local/duplicate.go index 8877c16ae7740..d467be89355f7 100644 --- a/br/pkg/lightning/backend/local/duplicate.go +++ b/br/pkg/lightning/backend/local/duplicate.go @@ -28,10 +28,14 @@ import ( "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" +<<<<<<< HEAD:br/pkg/lightning/backend/local/duplicate.go "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/errormanager" "github.com/pingcap/tidb/br/pkg/lightning/log" +======= + berrors "github.com/pingcap/tidb/br/pkg/errors" +>>>>>>> 0805e850d41 (br: handle region leader miss (#52822)):pkg/lightning/backend/local/duplicate.go "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/br/pkg/utils" @@ -297,7 +301,8 @@ func getDupDetectClient( ) (import_sstpb.ImportSST_DuplicateDetectClient, error) { leader := region.Leader if leader == nil { - leader = region.Region.GetPeers()[0] + return nil, errors.Annotatef(berrors.ErrPDLeaderNotFound, + "region id %d has no leader", region.Region.Id) } importClient, err := importClientFactory.Create(ctx, leader.GetStoreId()) if err != nil { diff --git a/br/pkg/restore/import.go b/br/pkg/restore/import.go index 425f170334cd3..f5909d523feb5 100644 --- a/br/pkg/restore/import.go +++ b/br/pkg/restore/import.go @@ -825,7 +825,8 @@ func (importer *FileImporter) ingestSSTs( ) (*import_sstpb.IngestResponse, error) { leader := regionInfo.Leader if leader == nil { - leader = regionInfo.Region.GetPeers()[0] + return nil, errors.Annotatef(berrors.ErrPDLeaderNotFound, + "region id %d has no leader", regionInfo.Region.Id) } reqCtx := &kvrpcpb.Context{ RegionId: regionInfo.Region.GetId(), diff --git a/br/pkg/restore/import_retry_test.go b/br/pkg/restore/import_retry_test.go index 4e885657f998f..06072834ff5e2 100644 --- a/br/pkg/restore/import_retry_test.go +++ b/br/pkg/restore/import_retry_test.go @@ -51,6 +51,47 @@ func assertRegions(t *testing.T, regions []*split.RegionInfo, keys ...string) { } } +<<<<<<< HEAD +======= +// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) +func initTestClient(isRawKv bool) *TestClient { + peers := make([]*metapb.Peer, 1) + peers[0] = &metapb.Peer{ + Id: 1, + StoreId: 1, + } + keys := [6]string{"", "aay", "bba", "bbh", "cca", ""} + regions := make(map[uint64]*split.RegionInfo) + for i := uint64(1); i < 6; i++ { + startKey := []byte(keys[i-1]) + if len(startKey) != 0 { + 
startKey = codec.EncodeBytesExt([]byte{}, startKey, isRawKv) + } + endKey := []byte(keys[i]) + if len(endKey) != 0 { + endKey = codec.EncodeBytesExt([]byte{}, endKey, isRawKv) + } + regions[i] = &split.RegionInfo{ + Leader: &metapb.Peer{ + Id: i, + StoreId: 1, + }, + Region: &metapb.Region{ + Id: i, + Peers: peers, + StartKey: startKey, + EndKey: endKey, + }, + } + } + stores := make(map[uint64]*metapb.Store) + stores[1] = &metapb.Store{ + Id: 1, + } + return NewTestClient(stores, regions, 6) +} + +>>>>>>> 0805e850d41 (br: handle region leader miss (#52822)) func TestScanSuccess(t *testing.T) { // region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) cli := initTestClient(false) @@ -245,7 +286,7 @@ func TestEpochNotMatch(t *testing.T) { {Id: 43}, }, }, - Leader: &metapb.Peer{Id: 43}, + Leader: &metapb.Peer{Id: 43, StoreId: 1}, } newRegion := pdtypes.NewRegionInfo(info.Region, info.Leader) mergeRegion := func() { @@ -304,7 +345,8 @@ func TestRegionSplit(t *testing.T) { EndKey: codec.EncodeBytes(nil, []byte("aayy")), }, Leader: &metapb.Peer{ - Id: 43, + Id: 43, + StoreId: 1, }, }, { @@ -314,7 +356,8 @@ func TestRegionSplit(t *testing.T) { EndKey: target.Region.EndKey, }, Leader: &metapb.Peer{ - Id: 45, + Id: 45, + StoreId: 1, }, }, } diff --git a/br/pkg/restore/split/mock_pd_client.go b/br/pkg/restore/split/mock_pd_client.go new file mode 100644 index 0000000000000..4bd709260e90a --- /dev/null +++ b/br/pkg/restore/split/mock_pd_client.go @@ -0,0 +1,195 @@ +// Copyright 2024 PingCAP, Inc. Licensed under Apache-2.0. + +package split + +import ( + "context" + "sync" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/tidb/pkg/store/pdtypes" + "github.com/pingcap/tidb/pkg/util/codec" + pd "github.com/tikv/pd/client" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// MockPDClientForSplit is a mock PD client for testing split and scatter. +type MockPDClientForSplit struct { + pd.Client + + mu sync.Mutex + + Regions *pdtypes.RegionTree + lastRegionID uint64 + scanRegions struct { + errors []error + beforeHook func() + } + splitRegions struct { + count int + hijacked func() (bool, *kvrpcpb.SplitRegionResponse, error) + } + scatterRegion struct { + eachRegionFailBefore int + count map[uint64]int + } + scatterRegions struct { + notImplemented bool + regionCount int + } + getOperator struct { + responses map[uint64][]*pdpb.GetOperatorResponse + } +} + +// NewMockPDClientForSplit creates a new MockPDClientForSplit. 
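+// The returned client starts with an empty region tree; tests populate it via
+// SetRegions and tune the scanRegions/splitRegions/scatterRegion/getOperator
+// fields to inject errors or canned responses.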
+func NewMockPDClientForSplit() *MockPDClientForSplit { + ret := &MockPDClientForSplit{} + ret.Regions = &pdtypes.RegionTree{} + ret.scatterRegion.count = make(map[uint64]int) + return ret +} + +func newRegionNotFullyReplicatedErr(regionID uint64) error { + return status.Errorf(codes.Unknown, "region %d is not fully replicated", regionID) +} + +func (c *MockPDClientForSplit) SetRegions(boundaries [][]byte) []*metapb.Region { + c.mu.Lock() + defer c.mu.Unlock() + + return c.setRegions(boundaries) +} + +func (c *MockPDClientForSplit) setRegions(boundaries [][]byte) []*metapb.Region { + ret := make([]*metapb.Region, 0, len(boundaries)-1) + for i := 1; i < len(boundaries); i++ { + c.lastRegionID++ + r := &metapb.Region{ + Id: c.lastRegionID, + StartKey: boundaries[i-1], + EndKey: boundaries[i], + } + p := &metapb.Peer{ + Id: c.lastRegionID, + StoreId: 1, + } + c.Regions.SetRegion(&pdtypes.Region{ + Meta: r, + Leader: p, + }) + ret = append(ret, r) + } + return ret +} + +func (c *MockPDClientForSplit) ScanRegions( + _ context.Context, + key, endKey []byte, + limit int, + _ ...pd.GetRegionOption, +) ([]*pd.Region, error) { + c.mu.Lock() + defer c.mu.Unlock() + + if len(c.scanRegions.errors) > 0 { + err := c.scanRegions.errors[0] + c.scanRegions.errors = c.scanRegions.errors[1:] + return nil, err + } + + if c.scanRegions.beforeHook != nil { + c.scanRegions.beforeHook() + } + + regions := c.Regions.ScanRange(key, endKey, limit) + ret := make([]*pd.Region, 0, len(regions)) + for _, r := range regions { + ret = append(ret, &pd.Region{ + Meta: r.Meta, + Leader: r.Leader, + }) + } + return ret, nil +} + +func (c *MockPDClientForSplit) GetRegionByID(_ context.Context, regionID uint64, _ ...pd.GetRegionOption) (*pd.Region, error) { + c.mu.Lock() + defer c.mu.Unlock() + + for _, r := range c.Regions.Regions { + if r.Meta.Id == regionID { + return &pd.Region{ + Meta: r.Meta, + Leader: r.Leader, + }, nil + } + } + return nil, errors.New("region not found") +} + +func (c *MockPDClientForSplit) SplitRegion( + region *RegionInfo, + keys [][]byte, + isRawKV bool, +) (bool, *kvrpcpb.SplitRegionResponse, error) { + c.mu.Lock() + defer c.mu.Unlock() + + c.splitRegions.count++ + if c.splitRegions.hijacked != nil { + return c.splitRegions.hijacked() + } + + if !isRawKV { + for i := range keys { + keys[i] = codec.EncodeBytes(nil, keys[i]) + } + } + + newRegionBoundaries := make([][]byte, 0, len(keys)+2) + newRegionBoundaries = append(newRegionBoundaries, region.Region.StartKey) + newRegionBoundaries = append(newRegionBoundaries, keys...) 
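+	// the old region's start key, the split keys and the old region's end key
+	// together form the boundaries of the regions after the split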
+ newRegionBoundaries = append(newRegionBoundaries, region.Region.EndKey) + newRegions := c.setRegions(newRegionBoundaries) + newRegions[0].Id = region.Region.Id + return false, &kvrpcpb.SplitRegionResponse{Regions: newRegions}, nil +} + +func (c *MockPDClientForSplit) ScatterRegion(_ context.Context, regionID uint64) error { + c.mu.Lock() + defer c.mu.Unlock() + + c.scatterRegion.count[regionID]++ + if c.scatterRegion.count[regionID] > c.scatterRegion.eachRegionFailBefore { + return nil + } + return newRegionNotFullyReplicatedErr(regionID) +} + +func (c *MockPDClientForSplit) ScatterRegions(_ context.Context, regionIDs []uint64, _ ...pd.RegionsOption) (*pdpb.ScatterRegionResponse, error) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.scatterRegions.notImplemented { + return nil, status.Error(codes.Unimplemented, "Ah, yep") + } + c.scatterRegions.regionCount += len(regionIDs) + return &pdpb.ScatterRegionResponse{}, nil +} + +func (c *MockPDClientForSplit) GetOperator(_ context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.getOperator.responses == nil { + return &pdpb.GetOperatorResponse{Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_SUCCESS}, nil + } + ret := c.getOperator.responses[regionID][0] + c.getOperator.responses[regionID] = c.getOperator.responses[regionID][1:] + return ret, nil +} diff --git a/br/pkg/restore/split/split.go b/br/pkg/restore/split/split.go index 4ea5b508594cf..09ed98d82bde0 100644 --- a/br/pkg/restore/split/split.go +++ b/br/pkg/restore/split/split.go @@ -59,7 +59,23 @@ func CheckRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) erro } cur := regions[0] + if cur.Leader == nil { + return errors.Annotatef(berrors.ErrPDBatchScanRegion, + "region %d's leader is nil", cur.Region.Id) + } + if cur.Leader.StoreId == 0 { + return errors.Annotatef(berrors.ErrPDBatchScanRegion, + "region %d's leader's store id is 0", cur.Region.Id) + } for _, r := range regions[1:] { + if r.Leader == nil { + return errors.Annotatef(berrors.ErrPDBatchScanRegion, + "region %d's leader is nil", r.Region.Id) + } + if r.Leader.StoreId == 0 { + return errors.Annotatef(berrors.ErrPDBatchScanRegion, + "region %d's leader's store id is 0", r.Region.Id) + } if !bytes.Equal(cur.Region.EndKey, r.Region.StartKey) { return errors.Annotatef(berrors.ErrPDBatchScanRegion, "region endKey not equal to next region startKey, endKey: %s, startKey: %s", redact.Key(cur.Region.EndKey), redact.Key(r.Region.StartKey)) diff --git a/br/pkg/restore/split/split_test.go b/br/pkg/restore/split/split_test.go index 43e5afcff87b8..ee333a1935115 100644 --- a/br/pkg/restore/split/split_test.go +++ b/br/pkg/restore/split/split_test.go @@ -71,3 +71,611 @@ func TestScanRegionBackOfferWithStopRetry(t *testing.T) { require.Error(t, err) require.Equal(t, counter, 6) } +<<<<<<< HEAD +======= + +type recordCntBackoffer struct { + already int +} + +func (b *recordCntBackoffer) NextBackoff(error) time.Duration { + b.already++ + return 0 +} + +func (b *recordCntBackoffer) Attempt() int { + return 100 +} + +func TestScatterSequentiallyRetryCnt(t *testing.T) { + mockClient := NewMockPDClientForSplit() + mockClient.scatterRegion.eachRegionFailBefore = 7 + client := pdClient{ + needScatterVal: true, + client: mockClient, + } + client.needScatterInit.Do(func() {}) + + ctx := context.Background() + regions := []*RegionInfo{ + { + Region: &metapb.Region{ + Id: 1, + }, + }, + { + Region: &metapb.Region{ + Id: 2, + }, + }, + } + backoffer := 
&recordCntBackoffer{} + client.scatterRegionsSequentially( + ctx, + regions, + backoffer, + ) + require.Equal(t, 7, backoffer.already) +} + +func TestScatterBackwardCompatibility(t *testing.T) { + mockClient := NewMockPDClientForSplit() + mockClient.scatterRegions.notImplemented = true + client := pdClient{ + needScatterVal: true, + client: mockClient, + } + client.needScatterInit.Do(func() {}) + + ctx := context.Background() + regions := []*RegionInfo{ + { + Region: &metapb.Region{ + Id: 1, + }, + }, + { + Region: &metapb.Region{ + Id: 2, + }, + }, + } + err := client.scatterRegions(ctx, regions) + require.NoError(t, err) + require.Equal(t, map[uint64]int{1: 1, 2: 1}, client.client.(*MockPDClientForSplit).scatterRegion.count) +} + +func TestWaitForScatterRegions(t *testing.T) { + mockPDCli := NewMockPDClientForSplit() + mockPDCli.scatterRegions.notImplemented = true + client := pdClient{ + needScatterVal: true, + client: mockPDCli, + } + client.needScatterInit.Do(func() {}) + regionCnt := 6 + checkGetOperatorRespsDrained := func() { + for i := 1; i <= regionCnt; i++ { + require.Len(t, mockPDCli.getOperator.responses[uint64(i)], 0) + } + } + checkNoRetry := func() { + for i := 1; i <= regionCnt; i++ { + require.Equal(t, 0, mockPDCli.scatterRegion.count[uint64(i)]) + } + } + + ctx := context.Background() + regions := make([]*RegionInfo, 0, regionCnt) + for i := 1; i <= regionCnt; i++ { + regions = append(regions, &RegionInfo{ + Region: &metapb.Region{ + Id: uint64(i), + }, + }) + } + + mockPDCli.getOperator.responses = make(map[uint64][]*pdpb.GetOperatorResponse) + mockPDCli.getOperator.responses[1] = []*pdpb.GetOperatorResponse{ + {Header: &pdpb.ResponseHeader{Error: &pdpb.Error{Type: pdpb.ErrorType_REGION_NOT_FOUND}}}, + } + mockPDCli.getOperator.responses[2] = []*pdpb.GetOperatorResponse{ + {Desc: []byte("not-scatter-region")}, + } + mockPDCli.getOperator.responses[3] = []*pdpb.GetOperatorResponse{ + {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_SUCCESS}, + } + mockPDCli.getOperator.responses[4] = []*pdpb.GetOperatorResponse{ + {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_RUNNING}, + {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_TIMEOUT}, + {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_SUCCESS}, + } + mockPDCli.getOperator.responses[5] = []*pdpb.GetOperatorResponse{ + {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_CANCEL}, + {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_CANCEL}, + {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_CANCEL}, + {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_RUNNING}, + {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_RUNNING}, + {Desc: []byte("not-scatter-region")}, + } + // should trigger a retry + mockPDCli.getOperator.responses[6] = []*pdpb.GetOperatorResponse{ + {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_REPLACE}, + {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_SUCCESS}, + } + + left, err := client.WaitRegionsScattered(ctx, regions) + require.NoError(t, err) + require.Equal(t, 0, left) + for i := 1; i <= 3; i++ { + require.Equal(t, 0, mockPDCli.scatterRegion.count[uint64(i)]) + } + // OperatorStatus_TIMEOUT should trigger rescatter once + require.Equal(t, 1, mockPDCli.scatterRegion.count[uint64(4)]) + // 3 * OperatorStatus_CANCEL should trigger 3 * rescatter + require.Equal(t, 3, mockPDCli.scatterRegion.count[uint64(5)]) + // OperatorStatus_REPLACE should trigger rescatter once + require.Equal(t, 
1, mockPDCli.scatterRegion.count[uint64(6)]) + checkGetOperatorRespsDrained() + + // test non-retryable error + + mockPDCli.scatterRegion.count = make(map[uint64]int) + mockPDCli.getOperator.responses = make(map[uint64][]*pdpb.GetOperatorResponse) + mockPDCli.getOperator.responses[1] = []*pdpb.GetOperatorResponse{ + {Header: &pdpb.ResponseHeader{Error: &pdpb.Error{Type: pdpb.ErrorType_REGION_NOT_FOUND}}}, + } + mockPDCli.getOperator.responses[2] = []*pdpb.GetOperatorResponse{ + {Desc: []byte("not-scatter-region")}, + } + // mimic non-retryable error + mockPDCli.getOperator.responses[3] = []*pdpb.GetOperatorResponse{ + {Header: &pdpb.ResponseHeader{Error: &pdpb.Error{Type: pdpb.ErrorType_DATA_COMPACTED}}}, + } + left, err = client.WaitRegionsScattered(ctx, regions) + require.ErrorContains(t, err, "get operator error: DATA_COMPACTED") + require.Equal(t, 4, left) // region 3,4,5,6 is not scattered + checkGetOperatorRespsDrained() + checkNoRetry() + + // test backoff is timed-out + + backup := WaitRegionOnlineAttemptTimes + WaitRegionOnlineAttemptTimes = 2 + t.Cleanup(func() { + WaitRegionOnlineAttemptTimes = backup + }) + + mockPDCli.scatterRegion.count = make(map[uint64]int) + mockPDCli.getOperator.responses = make(map[uint64][]*pdpb.GetOperatorResponse) + mockPDCli.getOperator.responses[1] = []*pdpb.GetOperatorResponse{ + {Header: &pdpb.ResponseHeader{Error: &pdpb.Error{Type: pdpb.ErrorType_REGION_NOT_FOUND}}}, + } + mockPDCli.getOperator.responses[2] = []*pdpb.GetOperatorResponse{ + {Desc: []byte("not-scatter-region")}, + } + mockPDCli.getOperator.responses[3] = []*pdpb.GetOperatorResponse{ + {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_SUCCESS}, + } + mockPDCli.getOperator.responses[4] = []*pdpb.GetOperatorResponse{ + {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_RUNNING}, + {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_RUNNING}, // first retry + {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_RUNNING}, // second retry + } + mockPDCli.getOperator.responses[5] = []*pdpb.GetOperatorResponse{ + {Desc: []byte("not-scatter-region")}, + } + mockPDCli.getOperator.responses[6] = []*pdpb.GetOperatorResponse{ + {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_SUCCESS}, + } + left, err = client.WaitRegionsScattered(ctx, regions) + require.ErrorContains(t, err, "the first unfinished region: id:4") + require.Equal(t, 1, left) + checkGetOperatorRespsDrained() + checkNoRetry() +} + +func TestBackoffMayNotCountBackoffer(t *testing.T) { + b := NewBackoffMayNotCountBackoffer() + initVal := b.Attempt() + + b.NextBackoff(ErrBackoffAndDontCount) + require.Equal(t, initVal, b.Attempt()) + // test Annotate, which is the real usage in caller + b.NextBackoff(errors.Annotate(ErrBackoffAndDontCount, "caller message")) + require.Equal(t, initVal, b.Attempt()) + + b.NextBackoff(ErrBackoff) + require.Equal(t, initVal-1, b.Attempt()) + + b.NextBackoff(goerrors.New("test")) + require.Equal(t, 0, b.Attempt()) +} + +func TestSplitCtxCancel(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mockCli := NewMockPDClientForSplit() + mockCli.splitRegions.hijacked = func() (bool, *kvrpcpb.SplitRegionResponse, error) { + cancel() + resp := &kvrpcpb.SplitRegionResponse{ + Regions: []*metapb.Region{ + {Id: 1}, + {Id: 2}, + }, + } + return false, resp, nil + } + client := pdClient{ + client: mockCli, + } + + _, err := client.SplitWaitAndScatter(ctx, &RegionInfo{}, [][]byte{{1}}) + require.ErrorIs(t, err, context.Canceled) 
+} + +func TestGetSplitKeyPerRegion(t *testing.T) { + // test case moved from BR + sortedKeys := [][]byte{ + []byte("b"), + []byte("d"), + []byte("g"), + []byte("j"), + []byte("l"), + } + sortedRegions := []*RegionInfo{ + { + Region: &metapb.Region{ + Id: 1, + StartKey: []byte("a"), + EndKey: []byte("g"), + }, + }, + { + Region: &metapb.Region{ + Id: 2, + StartKey: []byte("g"), + EndKey: []byte("k"), + }, + }, + { + Region: &metapb.Region{ + Id: 3, + StartKey: []byte("k"), + EndKey: []byte("m"), + }, + }, + } + result := getSplitKeysOfRegions(sortedKeys, sortedRegions, false) + require.Equal(t, 3, len(result)) + require.Equal(t, [][]byte{[]byte("b"), []byte("d")}, result[sortedRegions[0]]) + require.Equal(t, [][]byte{[]byte("g"), []byte("j")}, result[sortedRegions[1]]) + require.Equal(t, [][]byte{[]byte("l")}, result[sortedRegions[2]]) + + // test case moved from lightning + tableID := int64(1) + keys := []int64{1, 10, 100, 1000, 10000, -1} + sortedRegions = make([]*RegionInfo, 0, len(keys)) + start := tablecodec.EncodeRowKeyWithHandle(tableID, kv.IntHandle(0)) + regionStart := codec.EncodeBytes([]byte{}, start) + for i, end := range keys { + var regionEndKey []byte + if end >= 0 { + endKey := tablecodec.EncodeRowKeyWithHandle(tableID, kv.IntHandle(end)) + regionEndKey = codec.EncodeBytes([]byte{}, endKey) + } + region := &RegionInfo{ + Region: &metapb.Region{ + Id: uint64(i), + StartKey: regionStart, + EndKey: regionEndKey, + }, + } + sortedRegions = append(sortedRegions, region) + regionStart = regionEndKey + } + + checkKeys := map[int64]int{ + 0: -1, + 5: 1, + 6: 1, + 7: 1, + 50: 2, + 60: 2, + 70: 2, + 100: -1, + 50000: 5, + } + expected := map[uint64][][]byte{} + sortedKeys = make([][]byte, 0, len(checkKeys)) + + for hdl, idx := range checkKeys { + key := tablecodec.EncodeRowKeyWithHandle(tableID, kv.IntHandle(hdl)) + sortedKeys = append(sortedKeys, key) + if idx < 0 { + continue + } + expected[uint64(idx)] = append(expected[uint64(idx)], key) + } + + slices.SortFunc(sortedKeys, bytes.Compare) + for i := range expected { + slices.SortFunc(expected[i], bytes.Compare) + } + + got := getSplitKeysOfRegions(sortedKeys, sortedRegions, false) + require.Equal(t, len(expected), len(got)) + for region, gotKeys := range got { + require.Equal(t, expected[region.Region.GetId()], gotKeys) + } +} + +func checkRegionsBoundaries(t *testing.T, regions []*RegionInfo, expected [][]byte) { + require.Len( + t, regions, len(expected)-1, + "first region start key: %v, last region end key: %v, first expected key: %v, last expected key: %v", + regions[0].Region.StartKey, regions[len(regions)-1].Region.EndKey, + expected[0], expected[len(expected)-1], + ) + for i := 1; i < len(expected); i++ { + require.Equal(t, expected[i-1], regions[i-1].Region.StartKey) + require.Equal(t, expected[i], regions[i-1].Region.EndKey) + } +} + +func TestPaginateScanRegion(t *testing.T) { + ctx := context.Background() + mockPDClient := NewMockPDClientForSplit() + mockClient := &pdClient{ + client: mockPDClient, + } + + backup := WaitRegionOnlineAttemptTimes + WaitRegionOnlineAttemptTimes = 3 + t.Cleanup(func() { + WaitRegionOnlineAttemptTimes = backup + }) + + // no region + _, err := PaginateScanRegion(ctx, mockClient, []byte{}, []byte{}, 3) + require.Error(t, err) + require.True(t, berrors.ErrPDBatchScanRegion.Equal(err)) + require.ErrorContains(t, err, "scan region return empty result") + + // retry on error + mockPDClient.scanRegions.errors = []error{ + status.Error(codes.Unavailable, "not leader"), + } + 
mockPDClient.SetRegions([][]byte{{}, {}}) + got, err := PaginateScanRegion(ctx, mockClient, []byte{}, []byte{}, 3) + require.NoError(t, err) + checkRegionsBoundaries(t, got, [][]byte{{}, {}}) + + // test paginate + boundaries := [][]byte{{}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {}} + mockPDClient.SetRegions(boundaries) + got, err = PaginateScanRegion(ctx, mockClient, []byte{}, []byte{}, 3) + require.NoError(t, err) + checkRegionsBoundaries(t, got, boundaries) + got, err = PaginateScanRegion(ctx, mockClient, []byte{1}, []byte{}, 3) + require.NoError(t, err) + checkRegionsBoundaries(t, got, boundaries[1:]) + got, err = PaginateScanRegion(ctx, mockClient, []byte{}, []byte{2}, 8) + require.NoError(t, err) + checkRegionsBoundaries(t, got, boundaries[:3]) // [, 1), [1, 2) + got, err = PaginateScanRegion(ctx, mockClient, []byte{4}, []byte{5}, 1) + require.NoError(t, err) + checkRegionsBoundaries(t, got, [][]byte{{4}, {5}}) + + // test start == end + _, err = PaginateScanRegion(ctx, mockClient, []byte{4}, []byte{4}, 1) + require.ErrorContains(t, err, "scan region return empty result") + + // test start > end + _, err = PaginateScanRegion(ctx, mockClient, []byte{5}, []byte{4}, 5) + require.True(t, berrors.ErrInvalidRange.Equal(err)) + require.ErrorContains(t, err, "startKey > endKey") + + // test retry exhausted + mockPDClient.scanRegions.errors = []error{ + status.Error(codes.Unavailable, "not leader"), + status.Error(codes.Unavailable, "not leader"), + status.Error(codes.Unavailable, "not leader"), + } + _, err = PaginateScanRegion(ctx, mockClient, []byte{4}, []byte{5}, 1) + require.ErrorContains(t, err, "not leader") + + // test region not continuous + mockPDClient.Regions = &pdtypes.RegionTree{} + mockPDClient.Regions.SetRegion(&pdtypes.Region{ + Meta: &metapb.Region{ + Id: 1, + StartKey: []byte{1}, + EndKey: []byte{2}, + }, + Leader: &metapb.Peer{ + Id: 1, + StoreId: 1, + }, + }) + mockPDClient.Regions.SetRegion(&pdtypes.Region{ + Meta: &metapb.Region{ + Id: 4, + StartKey: []byte{4}, + EndKey: []byte{5}, + }, + Leader: &metapb.Peer{ + Id: 4, + StoreId: 1, + }, + }) + + _, err = PaginateScanRegion(ctx, mockClient, []byte{1}, []byte{5}, 3) + require.True(t, berrors.ErrPDBatchScanRegion.Equal(err)) + require.ErrorContains(t, err, "region 1's endKey not equal to next region 4's startKey") + + // test region becomes continuous slowly + toAdd := []*pdtypes.Region{ + { + Meta: &metapb.Region{ + Id: 2, + StartKey: []byte{2}, + EndKey: []byte{3}, + }, + Leader: &metapb.Peer{ + Id: 2, + StoreId: 1, + }, + }, + { + Meta: &metapb.Region{ + Id: 3, + StartKey: []byte{3}, + EndKey: []byte{4}, + }, + Leader: &metapb.Peer{ + Id: 3, + StoreId: 1, + }, + }, + } + mockPDClient.scanRegions.beforeHook = func() { + mockPDClient.Regions.SetRegion(toAdd[0]) + toAdd = toAdd[1:] + } + got, err = PaginateScanRegion(ctx, mockClient, []byte{1}, []byte{5}, 100) + require.NoError(t, err) + checkRegionsBoundaries(t, got, [][]byte{{1}, {2}, {3}, {4}, {5}}) +} + +func TestRegionConsistency(t *testing.T) { + cases := []struct { + startKey []byte + endKey []byte + err string + regions []*RegionInfo + }{ + { + codec.EncodeBytes([]byte{}, []byte("a")), + codec.EncodeBytes([]byte{}, []byte("a")), + "scan region return empty result, startKey: (.*?), endKey: (.*?)", + []*RegionInfo{}, + }, + { + codec.EncodeBytes([]byte{}, []byte("a")), + codec.EncodeBytes([]byte{}, []byte("a")), + "first region 1's startKey(.*?) 
> startKey(.*?)", + []*RegionInfo{ + { + Region: &metapb.Region{ + Id: 1, + StartKey: codec.EncodeBytes([]byte{}, []byte("b")), + EndKey: codec.EncodeBytes([]byte{}, []byte("d")), + }, + }, + }, + }, + { + codec.EncodeBytes([]byte{}, []byte("b")), + codec.EncodeBytes([]byte{}, []byte("e")), + "last region 100's endKey(.*?) < endKey(.*?)", + []*RegionInfo{ + { + Region: &metapb.Region{ + Id: 100, + StartKey: codec.EncodeBytes([]byte{}, []byte("b")), + EndKey: codec.EncodeBytes([]byte{}, []byte("d")), + }, + }, + }, + }, + { + codec.EncodeBytes([]byte{}, []byte("c")), + codec.EncodeBytes([]byte{}, []byte("e")), + "region 6's endKey not equal to next region 8's startKey(.*?)", + []*RegionInfo{ + { + Leader: &metapb.Peer{ + Id: 6, + StoreId: 1, + }, + Region: &metapb.Region{ + Id: 6, + StartKey: codec.EncodeBytes([]byte{}, []byte("b")), + EndKey: codec.EncodeBytes([]byte{}, []byte("d")), + RegionEpoch: nil, + }, + }, + { + Leader: &metapb.Peer{ + Id: 8, + StoreId: 1, + }, + Region: &metapb.Region{ + Id: 8, + StartKey: codec.EncodeBytes([]byte{}, []byte("e")), + EndKey: codec.EncodeBytes([]byte{}, []byte("f")), + }, + }, + }, + }, + { + codec.EncodeBytes([]byte{}, []byte("c")), + codec.EncodeBytes([]byte{}, []byte("e")), + "region 6's leader is nil(.*?)", + []*RegionInfo{ + { + Region: &metapb.Region{ + Id: 6, + StartKey: codec.EncodeBytes([]byte{}, []byte("c")), + EndKey: codec.EncodeBytes([]byte{}, []byte("d")), + RegionEpoch: nil, + }, + }, + { + Region: &metapb.Region{ + Id: 8, + StartKey: codec.EncodeBytes([]byte{}, []byte("d")), + EndKey: codec.EncodeBytes([]byte{}, []byte("e")), + }, + }, + }, + }, + { + codec.EncodeBytes([]byte{}, []byte("c")), + codec.EncodeBytes([]byte{}, []byte("e")), + "region 6's leader's store id is 0(.*?)", + []*RegionInfo{ + { + Leader: &metapb.Peer{ + Id: 6, + StoreId: 0, + }, + Region: &metapb.Region{ + Id: 6, + StartKey: codec.EncodeBytes([]byte{}, []byte("c")), + EndKey: codec.EncodeBytes([]byte{}, []byte("d")), + RegionEpoch: nil, + }, + }, + { + Leader: &metapb.Peer{ + Id: 6, + StoreId: 0, + }, + Region: &metapb.Region{ + Id: 8, + StartKey: codec.EncodeBytes([]byte{}, []byte("d")), + EndKey: codec.EncodeBytes([]byte{}, []byte("e")), + }, + }, + }, + }, + } + for _, ca := range cases { + err := checkRegionConsistency(ca.startKey, ca.endKey, ca.regions) + require.Error(t, err) + require.Regexp(t, ca.err, err.Error()) + } +} +>>>>>>> 0805e850d41 (br: handle region leader miss (#52822)) diff --git a/pkg/lightning/backend/local/BUILD.bazel b/pkg/lightning/backend/local/BUILD.bazel new file mode 100644 index 0000000000000..c297e333d2d7d --- /dev/null +++ b/pkg/lightning/backend/local/BUILD.bazel @@ -0,0 +1,188 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "local", + srcs = [ + "checksum.go", + "compress.go", + "disk_quota.go", + "duplicate.go", + "engine.go", + "engine_mgr.go", + "iterator.go", + "local.go", + "local_freebsd.go", + "local_unix.go", + "local_unix_generic.go", + "local_windows.go", + "localhelper.go", + "region_job.go", + "tikv_mode.go", + ], + importpath = "github.com/pingcap/tidb/pkg/lightning/backend/local", + visibility = ["//visibility:public"], + deps = [ + "//br/pkg/checksum", + "//br/pkg/errors", + "//br/pkg/logutil", + "//br/pkg/membuf", + "//br/pkg/pdutil", + "//br/pkg/restore/split", + "//br/pkg/storage", + "//br/pkg/version", + "//pkg/distsql", + "//pkg/infoschema", + "//pkg/kv", + "//pkg/lightning/backend", + "//pkg/lightning/backend/encode", + "//pkg/lightning/backend/external", + 
"//pkg/lightning/backend/kv", + "//pkg/lightning/checkpoints", + "//pkg/lightning/common", + "//pkg/lightning/config", + "//pkg/lightning/errormanager", + "//pkg/lightning/log", + "//pkg/lightning/manual", + "//pkg/lightning/metric", + "//pkg/lightning/mydump", + "//pkg/lightning/tikv", + "//pkg/lightning/verification", + "//pkg/metrics", + "//pkg/parser/model", + "//pkg/parser/mysql", + "//pkg/parser/terror", + "//pkg/sessionctx/variable", + "//pkg/table", + "//pkg/table/tables", + "//pkg/tablecodec", + "//pkg/util", + "//pkg/util/codec", + "//pkg/util/compress", + "//pkg/util/engine", + "//pkg/util/hack", + "//pkg/util/logutil", + "//pkg/util/mathutil", + "//pkg/util/ranger", + "@com_github_cockroachdb_pebble//:pebble", + "@com_github_cockroachdb_pebble//objstorage/objstorageprovider", + "@com_github_cockroachdb_pebble//sstable", + "@com_github_cockroachdb_pebble//vfs", + "@com_github_coreos_go_semver//semver", + "@com_github_docker_go_units//:go-units", + "@com_github_google_btree//:btree", + "@com_github_google_uuid//:uuid", + "@com_github_klauspost_compress//gzip", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_kvproto//pkg/errorpb", + "@com_github_pingcap_kvproto//pkg/import_sstpb", + "@com_github_pingcap_kvproto//pkg/kvrpcpb", + "@com_github_pingcap_kvproto//pkg/metapb", + "@com_github_pingcap_tipb//go-tipb", + "@com_github_tikv_client_go_v2//kv", + "@com_github_tikv_client_go_v2//oracle", + "@com_github_tikv_client_go_v2//tikv", + "@com_github_tikv_client_go_v2//util", + "@com_github_tikv_pd_client//:client", + "@com_github_tikv_pd_client//http", + "@com_github_tikv_pd_client//retry", + "@org_golang_google_grpc//:grpc", + "@org_golang_google_grpc//backoff", + "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//credentials", + "@org_golang_google_grpc//credentials/insecure", + "@org_golang_google_grpc//keepalive", + "@org_golang_google_grpc//status", + "@org_golang_x_sync//errgroup", + "@org_golang_x_time//rate", + "@org_uber_go_atomic//:atomic", + "@org_uber_go_multierr//:multierr", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "local_test", + timeout = "short", + srcs = [ + "checksum_test.go", + "compress_test.go", + "disk_quota_test.go", + "duplicate_test.go", + "engine_mgr_test.go", + "engine_test.go", + "iterator_test.go", + "local_check_test.go", + "local_test.go", + "localhelper_test.go", + "main_test.go", + "region_job_test.go", + ], + embed = [":local"], + flaky = True, + race = "on", + shard_count = 50, + deps = [ + "//br/pkg/membuf", + "//br/pkg/mock/mocklocal", + "//br/pkg/restore/split", + "//br/pkg/storage", + "//br/pkg/utils", + "//pkg/ddl", + "//pkg/errno", + "//pkg/keyspace", + "//pkg/kv", + "//pkg/lightning/backend", + "//pkg/lightning/backend/encode", + "//pkg/lightning/backend/external", + "//pkg/lightning/backend/kv", + "//pkg/lightning/checkpoints", + "//pkg/lightning/common", + "//pkg/lightning/config", + "//pkg/lightning/log", + "//pkg/lightning/mydump", + "//pkg/parser", + "//pkg/parser/ast", + "//pkg/parser/model", + "//pkg/parser/mysql", + "//pkg/sessionctx/stmtctx", + "//pkg/store/pdtypes", + "//pkg/table", + "//pkg/table/tables", + "//pkg/tablecodec", + "//pkg/testkit/testsetup", + "//pkg/types", + "//pkg/util", + "//pkg/util/codec", + "//pkg/util/engine", + "//pkg/util/hack", + "//pkg/util/mock", + "@com_github_cockroachdb_pebble//:pebble", + "@com_github_cockroachdb_pebble//objstorage/objstorageprovider", + "@com_github_cockroachdb_pebble//sstable", + 
"@com_github_cockroachdb_pebble//vfs", + "@com_github_coreos_go_semver//semver", + "@com_github_data_dog_go_sqlmock//:go-sqlmock", + "@com_github_docker_go_units//:go-units", + "@com_github_go_sql_driver_mysql//:mysql", + "@com_github_google_uuid//:uuid", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_kvproto//pkg/errorpb", + "@com_github_pingcap_kvproto//pkg/import_sstpb", + "@com_github_pingcap_kvproto//pkg/metapb", + "@com_github_pingcap_tipb//go-tipb", + "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//oracle", + "@com_github_tikv_pd_client//:client", + "@com_github_tikv_pd_client//errs", + "@com_github_tikv_pd_client//http", + "@org_golang_google_grpc//:grpc", + "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//encoding", + "@org_golang_google_grpc//status", + "@org_uber_go_atomic//:atomic", + "@org_uber_go_goleak//:goleak", + "@org_uber_go_mock//gomock", + ], +) diff --git a/pkg/lightning/backend/local/region_job.go b/pkg/lightning/backend/local/region_job.go new file mode 100644 index 0000000000000..7bc812e4b9bb6 --- /dev/null +++ b/pkg/lightning/backend/local/region_job.go @@ -0,0 +1,907 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package local + +import ( + "container/heap" + "context" + "fmt" + "io" + "strings" + "sync" + "time" + + "github.com/google/uuid" + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/errorpb" + sst "github.com/pingcap/kvproto/pkg/import_sstpb" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" + berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/br/pkg/logutil" + "github.com/pingcap/tidb/br/pkg/restore/split" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/lightning/common" + "github.com/pingcap/tidb/pkg/lightning/config" + "github.com/pingcap/tidb/pkg/lightning/log" + "github.com/pingcap/tidb/pkg/lightning/metric" + "github.com/pingcap/tidb/pkg/util/codec" + "github.com/tikv/client-go/v2/util" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +type jobStageTp string + +/* + + + v + +------+------+ + +->+regionScanned+<------+ + | +------+------+ | + | | | + | | | + | v | + | +--+--+ +-----+----+ + | |wrote+---->+needRescan| + | +--+--+ +-----+----+ + | | ^ + | | | + | v | + | +---+----+ | + +-----+ingested+---------+ + +---+----+ + | + v + +above diagram shows the state transition of a region job, here are some special +cases: + - regionScanned can directly jump to ingested if the keyRange has no data + - regionScanned can only transit to wrote. 
TODO: check if it should be transited + to needRescan + - if a job only partially writes the data, after it becomes ingested, it will + update its keyRange and transits to regionScanned to continue the remaining + data + - needRescan may output multiple regionScanned jobs when the old region is split +*/ +const ( + regionScanned jobStageTp = "regionScanned" + wrote jobStageTp = "wrote" + ingested jobStageTp = "ingested" + needRescan jobStageTp = "needRescan" + + // suppose each KV is about 32 bytes, 16 * units.KiB / 32 = 512 + defaultKVBatchCount = 512 +) + +func (j jobStageTp) String() string { + return string(j) +} + +// regionJob is dedicated to import the data in [keyRange.start, keyRange.end) +// to a region. The keyRange may be changed when processing because of writing +// partial data to TiKV or region split. +type regionJob struct { + keyRange common.Range + // TODO: check the keyRange so that it's always included in region + region *split.RegionInfo + // stage should be updated only by convertStageTo + stage jobStageTp + // writeResult is available only in wrote and ingested stage + writeResult *tikvWriteResult + + ingestData common.IngestData + regionSplitSize int64 + regionSplitKeys int64 + metrics *metric.Common + + retryCount int + waitUntil time.Time + lastRetryableErr error + + // injected is used in test to set the behaviour + injected []injectedBehaviour +} + +type tikvWriteResult struct { + sstMeta []*sst.SSTMeta + count int64 + totalBytes int64 + remainingStartKey []byte +} + +type injectedBehaviour struct { + write injectedWriteBehaviour + ingest injectedIngestBehaviour +} + +type injectedWriteBehaviour struct { + result *tikvWriteResult + err error +} + +type injectedIngestBehaviour struct { + nextStage jobStageTp + err error +} + +func (j *regionJob) convertStageTo(stage jobStageTp) { + j.stage = stage + switch stage { + case regionScanned: + j.writeResult = nil + case ingested: + // when writing is skipped because key range is empty + if j.writeResult == nil { + return + } + + j.ingestData.Finish(j.writeResult.totalBytes, j.writeResult.count) + if j.metrics != nil { + j.metrics.BytesCounter.WithLabelValues(metric.StateImported). + Add(float64(j.writeResult.totalBytes)) + } + case needRescan: + j.region = nil + } +} + +// ref means that the ingestData of job will be accessed soon. +func (j *regionJob) ref(wg *sync.WaitGroup) { + if wg != nil { + wg.Add(1) + } + if j.ingestData != nil { + j.ingestData.IncRef() + } +} + +// done promises that the ingestData of job will not be accessed. Same amount of +// done should be called to release the ingestData. +func (j *regionJob) done(wg *sync.WaitGroup) { + if j.ingestData != nil { + j.ingestData.DecRef() + } + if wg != nil { + wg.Done() + } +} + +// writeToTiKV writes the data to TiKV and mark this job as wrote stage. +// if any write logic has error, writeToTiKV will set job to a proper stage and return nil. +// if any underlying logic has error, writeToTiKV will return an error. +// we don't need to do cleanup for the pairs written to tikv if encounters an error, +// tikv will take the responsibility to do so. +// TODO: let client-go provide a high-level write interface. 
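+// For retryable errors the job stays in regionScanned when the error contains
+// "RequestTooNew" (the same region will be written again), otherwise it is moved
+// to needRescan so the region information is refreshed before the next attempt.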
+func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error { + err := local.doWrite(ctx, j) + if err == nil { + return nil + } + if !common.IsRetryableError(err) { + return err + } + // currently only one case will restart write + if strings.Contains(err.Error(), "RequestTooNew") { + j.convertStageTo(regionScanned) + return err + } + j.convertStageTo(needRescan) + return err +} + +func (local *Backend) doWrite(ctx context.Context, j *regionJob) error { + if j.stage != regionScanned { + return nil + } + + failpoint.Inject("fakeRegionJobs", func() { + front := j.injected[0] + j.injected = j.injected[1:] + j.writeResult = front.write.result + err := front.write.err + if err == nil { + j.convertStageTo(wrote) + } + failpoint.Return(err) + }) + + var cancel context.CancelFunc + ctx, cancel = context.WithTimeoutCause(ctx, 15*time.Minute, common.ErrWriteTooSlow) + defer cancel() + + apiVersion := local.tikvCodec.GetAPIVersion() + clientFactory := local.importClientFactory + kvBatchSize := local.KVWriteBatchSize + bufferPool := local.engineMgr.getBufferPool() + writeLimiter := local.writeLimiter + + begin := time.Now() + region := j.region.Region + + firstKey, lastKey, err := j.ingestData.GetFirstAndLastKey(j.keyRange.Start, j.keyRange.End) + if err != nil { + return errors.Trace(err) + } + if firstKey == nil { + j.convertStageTo(ingested) + log.FromContext(ctx).Debug("keys within region is empty, skip doIngest", + logutil.Key("start", j.keyRange.Start), + logutil.Key("regionStart", region.StartKey), + logutil.Key("end", j.keyRange.End), + logutil.Key("regionEnd", region.EndKey)) + return nil + } + + firstKey = codec.EncodeBytes([]byte{}, firstKey) + lastKey = codec.EncodeBytes([]byte{}, lastKey) + + u := uuid.New() + meta := &sst.SSTMeta{ + Uuid: u[:], + RegionId: region.GetId(), + RegionEpoch: region.GetRegionEpoch(), + Range: &sst.Range{ + Start: firstKey, + End: lastKey, + }, + ApiVersion: apiVersion, + } + + failpoint.Inject("changeEpochVersion", func(val failpoint.Value) { + cloned := *meta.RegionEpoch + meta.RegionEpoch = &cloned + i := val.(int) + if i >= 0 { + meta.RegionEpoch.Version += uint64(i) + } else { + meta.RegionEpoch.ConfVer -= uint64(-i) + } + }) + + annotateErr := func(in error, peer *metapb.Peer, msg string) error { + // annotate the error with peer/store/region info to help debug. 
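+		// the annotated error reads like
+		// "peer 4, store 2, region 9, epoch <epoch>, when open write stream: <cause>"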
+ return errors.Annotatef( + in, + "peer %d, store %d, region %d, epoch %s, %s", + peer.Id, peer.StoreId, region.Id, region.RegionEpoch.String(), + msg, + ) + } + + leaderID := j.region.Leader.GetId() + clients := make([]sst.ImportSST_WriteClient, 0, len(region.GetPeers())) + allPeers := make([]*metapb.Peer, 0, len(region.GetPeers())) + req := &sst.WriteRequest{ + Chunk: &sst.WriteRequest_Meta{ + Meta: meta, + }, + Context: &kvrpcpb.Context{ + ResourceControlContext: &kvrpcpb.ResourceControlContext{ + ResourceGroupName: local.ResourceGroupName, + }, + RequestSource: util.BuildRequestSource(true, kv.InternalTxnLightning, local.TaskType), + }, + } + for _, peer := range region.GetPeers() { + cli, err := clientFactory.Create(ctx, peer.StoreId) + if err != nil { + return annotateErr(err, peer, "when create client") + } + + wstream, err := cli.Write(ctx) + if err != nil { + return annotateErr(err, peer, "when open write stream") + } + + failpoint.Inject("mockWritePeerErr", func() { + err = errors.Errorf("mock write peer error") + failpoint.Return(annotateErr(err, peer, "when open write stream")) + }) + + // Bind uuid for this write request + if err = wstream.Send(req); err != nil { + return annotateErr(err, peer, "when send meta") + } + clients = append(clients, wstream) + allPeers = append(allPeers, peer) + } + dataCommitTS := j.ingestData.GetTS() + req.Chunk = &sst.WriteRequest_Batch{ + Batch: &sst.WriteBatch{ + CommitTs: dataCommitTS, + }, + } + + pairs := make([]*sst.Pair, 0, defaultKVBatchCount) + count := 0 + size := int64(0) + totalSize := int64(0) + totalCount := int64(0) + // if region-split-size <= 96MiB, we bump the threshold a bit to avoid too many retry split + // because the range-properties is not 100% accurate + regionMaxSize := j.regionSplitSize + if j.regionSplitSize <= int64(config.SplitRegionSize) { + regionMaxSize = j.regionSplitSize * 4 / 3 + } + + flushKVs := func() error { + req.Chunk.(*sst.WriteRequest_Batch).Batch.Pairs = pairs[:count] + preparedMsg := &grpc.PreparedMsg{} + // by reading the source code, Encode need to find codec and compression from the stream + // because all stream has the same codec and compression, we can use any one of them + if err := preparedMsg.Encode(clients[0], req); err != nil { + return err + } + + for i := range clients { + if err := writeLimiter.WaitN(ctx, allPeers[i].StoreId, int(size)); err != nil { + return errors.Trace(err) + } + if err := clients[i].SendMsg(preparedMsg); err != nil { + if err == io.EOF { + // if it's EOF, need RecvMsg to get the error + dummy := &sst.WriteResponse{} + err = clients[i].RecvMsg(dummy) + } + return annotateErr(err, allPeers[i], "when send data") + } + } + failpoint.Inject("afterFlushKVs", func() { + log.FromContext(ctx).Info(fmt.Sprintf("afterFlushKVs count=%d,size=%d", count, size)) + }) + return nil + } + + iter := j.ingestData.NewIter(ctx, j.keyRange.Start, j.keyRange.End, bufferPool) + //nolint: errcheck + defer iter.Close() + + var remainingStartKey []byte + for iter.First(); iter.Valid(); iter.Next() { + k, v := iter.Key(), iter.Value() + kvSize := int64(len(k) + len(v)) + // here we reuse the `*sst.Pair`s to optimize object allocation + if count < len(pairs) { + pairs[count].Key = k + pairs[count].Value = v + } else { + pair := &sst.Pair{ + Key: k, + Value: v, + } + pairs = append(pairs, pair) + } + count++ + totalCount++ + size += kvSize + totalSize += kvSize + + if size >= kvBatchSize { + if err := flushKVs(); err != nil { + return errors.Trace(err) + } + count = 0 + size = 0 + 
iter.ReleaseBuf() + } + if totalSize >= regionMaxSize || totalCount >= j.regionSplitKeys { + // we will shrink the key range of this job to real written range + if iter.Next() { + remainingStartKey = append([]byte{}, iter.Key()...) + log.FromContext(ctx).Info("write to tikv partial finish", + zap.Int64("count", totalCount), + zap.Int64("size", totalSize), + logutil.Key("startKey", j.keyRange.Start), + logutil.Key("endKey", j.keyRange.End), + logutil.Key("remainStart", remainingStartKey), + logutil.Region(region), + logutil.Leader(j.region.Leader), + zap.Uint64("commitTS", dataCommitTS)) + } + break + } + } + + if iter.Error() != nil { + return errors.Trace(iter.Error()) + } + + if count > 0 { + if err := flushKVs(); err != nil { + return errors.Trace(err) + } + count = 0 + size = 0 + iter.ReleaseBuf() + } + + var leaderPeerMetas []*sst.SSTMeta + for i, wStream := range clients { + resp, closeErr := wStream.CloseAndRecv() + if closeErr != nil { + return annotateErr(closeErr, allPeers[i], "when close write stream") + } + if resp.Error != nil { + return annotateErr(errors.New("resp error: "+resp.Error.Message), allPeers[i], "when close write stream") + } + if leaderID == region.Peers[i].GetId() { + leaderPeerMetas = resp.Metas + log.FromContext(ctx).Debug("get metas after write kv stream to tikv", zap.Reflect("metas", leaderPeerMetas)) + } + } + + failpoint.Inject("NoLeader", func() { + log.FromContext(ctx).Warn("enter failpoint NoLeader") + leaderPeerMetas = nil + }) + + // if there is not leader currently, we don't forward the stage to wrote and let caller + // handle the retry. + if len(leaderPeerMetas) == 0 { + log.FromContext(ctx).Warn("write to tikv no leader", + logutil.Region(region), logutil.Leader(j.region.Leader), + zap.Uint64("leader_id", leaderID), logutil.SSTMeta(meta), + zap.Int64("kv_pairs", totalCount), zap.Int64("total_bytes", totalSize)) + return common.ErrNoLeader.GenWithStackByArgs(region.Id, leaderID) + } + + takeTime := time.Since(begin) + log.FromContext(ctx).Debug("write to kv", zap.Reflect("region", j.region), zap.Uint64("leader", leaderID), + zap.Reflect("meta", meta), zap.Reflect("return metas", leaderPeerMetas), + zap.Int64("kv_pairs", totalCount), zap.Int64("total_bytes", totalSize), + zap.Stringer("takeTime", takeTime)) + if m, ok := metric.FromContext(ctx); ok { + m.SSTSecondsHistogram.WithLabelValues(metric.SSTProcessWrite).Observe(takeTime.Seconds()) + } + + j.writeResult = &tikvWriteResult{ + sstMeta: leaderPeerMetas, + count: totalCount, + totalBytes: totalSize, + remainingStartKey: remainingStartKey, + } + j.convertStageTo(wrote) + return nil +} + +// ingest tries to finish the regionJob. +// if any ingest logic has error, ingest may retry sometimes to resolve it and finally +// set job to a proper stage with nil error returned. +// if any underlying logic has error, ingest will return an error to let caller +// handle it. 
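+// It retries up to maxRetryTimes on transient RPC errors; errors carried in the
+// ingest response are handed to convertStageOnIngestError, which decides whether
+// to retry immediately or to leave the job for the caller to reschedule.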
+func (local *Backend) ingest(ctx context.Context, j *regionJob) (err error) { + if j.stage != wrote { + return nil + } + + failpoint.Inject("fakeRegionJobs", func() { + front := j.injected[0] + j.injected = j.injected[1:] + j.convertStageTo(front.ingest.nextStage) + failpoint.Return(front.ingest.err) + }) + + if len(j.writeResult.sstMeta) == 0 { + j.convertStageTo(ingested) + return nil + } + + if m, ok := metric.FromContext(ctx); ok { + begin := time.Now() + defer func() { + if err == nil { + m.SSTSecondsHistogram.WithLabelValues(metric.SSTProcessIngest).Observe(time.Since(begin).Seconds()) + } + }() + } + + for retry := 0; retry < maxRetryTimes; retry++ { + resp, err := local.doIngest(ctx, j) + if err == nil && resp.GetError() == nil { + j.convertStageTo(ingested) + return nil + } + if err != nil { + if common.IsContextCanceledError(err) { + return err + } + log.FromContext(ctx).Warn("meet underlying error, will retry ingest", + log.ShortError(err), logutil.SSTMetas(j.writeResult.sstMeta), + logutil.Region(j.region.Region), logutil.Leader(j.region.Leader)) + continue + } + canContinue, err := j.convertStageOnIngestError(resp) + if common.IsContextCanceledError(err) { + return err + } + if !canContinue { + log.FromContext(ctx).Warn("meet error and handle the job later", + zap.Stringer("job stage", j.stage), + logutil.ShortError(j.lastRetryableErr), + j.region.ToZapFields(), + logutil.Key("start", j.keyRange.Start), + logutil.Key("end", j.keyRange.End)) + return nil + } + log.FromContext(ctx).Warn("meet error and will doIngest region again", + logutil.ShortError(j.lastRetryableErr), + j.region.ToZapFields(), + logutil.Key("start", j.keyRange.Start), + logutil.Key("end", j.keyRange.End)) + } + return nil +} + +func (local *Backend) checkWriteStall( + ctx context.Context, + region *split.RegionInfo, +) (bool, *sst.IngestResponse, error) { + clientFactory := local.importClientFactory + for _, peer := range region.Region.GetPeers() { + cli, err := clientFactory.Create(ctx, peer.StoreId) + if err != nil { + return false, nil, errors.Trace(err) + } + // currently we use empty MultiIngestRequest to check if TiKV is busy. + // If in future the rate limit feature contains more metrics we can switch to use it. + resp, err := cli.MultiIngest(ctx, &sst.MultiIngestRequest{}) + if err != nil { + return false, nil, errors.Trace(err) + } + if resp.Error != nil && resp.Error.ServerIsBusy != nil { + return true, resp, nil + } + } + return false, nil, nil +} + +// doIngest send ingest commands to TiKV based on regionJob.writeResult.sstMeta. +// When meet error, it will remove finished sstMetas before return. 
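+// SST metas are sent to the region leader in one MultiIngest request when the
+// cluster supports it, otherwise one Ingest request per SST; an optional
+// write-stall check runs first and returns early when TiKV reports ServerIsBusy.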
+func (local *Backend) doIngest(ctx context.Context, j *regionJob) (*sst.IngestResponse, error) { + clientFactory := local.importClientFactory + supportMultiIngest := local.supportMultiIngest + shouldCheckWriteStall := local.ShouldCheckWriteStall + if shouldCheckWriteStall { + writeStall, resp, err := local.checkWriteStall(ctx, j.region) + if err != nil { + return nil, errors.Trace(err) + } + if writeStall { + return resp, nil + } + } + + batch := 1 + if supportMultiIngest { + batch = len(j.writeResult.sstMeta) + } + + var resp *sst.IngestResponse + for start := 0; start < len(j.writeResult.sstMeta); start += batch { + end := min(start+batch, len(j.writeResult.sstMeta)) + ingestMetas := j.writeResult.sstMeta[start:end] + + log.FromContext(ctx).Debug("ingest meta", zap.Reflect("meta", ingestMetas)) + + failpoint.Inject("FailIngestMeta", func(val failpoint.Value) { + // only inject the error once + var resp *sst.IngestResponse + + switch val.(string) { + case "notleader": + resp = &sst.IngestResponse{ + Error: &errorpb.Error{ + NotLeader: &errorpb.NotLeader{ + RegionId: j.region.Region.Id, + Leader: j.region.Leader, + }, + }, + } + case "epochnotmatch": + resp = &sst.IngestResponse{ + Error: &errorpb.Error{ + EpochNotMatch: &errorpb.EpochNotMatch{ + CurrentRegions: []*metapb.Region{j.region.Region}, + }, + }, + } + } + failpoint.Return(resp, nil) + }) + + leader := j.region.Leader + if leader == nil { + return nil, errors.Annotatef(berrors.ErrPDLeaderNotFound, + "region id %d has no leader", j.region.Region.Id) + } + + cli, err := clientFactory.Create(ctx, leader.StoreId) + if err != nil { + return nil, errors.Trace(err) + } + reqCtx := &kvrpcpb.Context{ + RegionId: j.region.Region.GetId(), + RegionEpoch: j.region.Region.GetRegionEpoch(), + Peer: leader, + ResourceControlContext: &kvrpcpb.ResourceControlContext{ + ResourceGroupName: local.ResourceGroupName, + }, + RequestSource: util.BuildRequestSource(true, kv.InternalTxnLightning, local.TaskType), + } + + if supportMultiIngest { + req := &sst.MultiIngestRequest{ + Context: reqCtx, + Ssts: ingestMetas, + } + resp, err = cli.MultiIngest(ctx, req) + } else { + req := &sst.IngestRequest{ + Context: reqCtx, + Sst: ingestMetas[0], + } + resp, err = cli.Ingest(ctx, req) + } + if resp.GetError() != nil || err != nil { + // remove finished sstMetas + j.writeResult.sstMeta = j.writeResult.sstMeta[start:] + return resp, errors.Trace(err) + } + } + return resp, nil +} + +// convertStageOnIngestError will try to fix the error contained in ingest response. +// Return (_, error) when another error occurred. +// Return (true, nil) when the job can retry ingesting immediately. +// Return (false, nil) when the job should be put back to queue. +func (j *regionJob) convertStageOnIngestError( + resp *sst.IngestResponse, +) (bool, error) { + if resp.GetError() == nil { + return true, nil + } + + var newRegion *split.RegionInfo + switch errPb := resp.GetError(); { + case errPb.NotLeader != nil: + j.lastRetryableErr = common.ErrKVNotLeader.GenWithStack(errPb.GetMessage()) + + // meet a problem that the region leader+peer are all updated but the return + // error is only "NotLeader", we should update the whole region info. 
+ j.convertStageTo(needRescan) + return false, nil + case errPb.EpochNotMatch != nil: + j.lastRetryableErr = common.ErrKVEpochNotMatch.GenWithStack(errPb.GetMessage()) + + if currentRegions := errPb.GetEpochNotMatch().GetCurrentRegions(); currentRegions != nil { + var currentRegion *metapb.Region + for _, r := range currentRegions { + if insideRegion(r, j.writeResult.sstMeta) { + currentRegion = r + break + } + } + if currentRegion != nil { + var newLeader *metapb.Peer + for _, p := range currentRegion.Peers { + if p.GetStoreId() == j.region.Leader.GetStoreId() { + newLeader = p + break + } + } + if newLeader != nil { + newRegion = &split.RegionInfo{ + Leader: newLeader, + Region: currentRegion, + } + } + } + } + if newRegion != nil { + j.region = newRegion + j.convertStageTo(regionScanned) + return false, nil + } + j.convertStageTo(needRescan) + return false, nil + case strings.Contains(errPb.Message, "raft: proposal dropped"): + j.lastRetryableErr = common.ErrKVRaftProposalDropped.GenWithStack(errPb.GetMessage()) + + j.convertStageTo(needRescan) + return false, nil + case errPb.ServerIsBusy != nil: + j.lastRetryableErr = common.ErrKVServerIsBusy.GenWithStack(errPb.GetMessage()) + + return false, nil + case errPb.RegionNotFound != nil: + j.lastRetryableErr = common.ErrKVRegionNotFound.GenWithStack(errPb.GetMessage()) + + j.convertStageTo(needRescan) + return false, nil + case errPb.ReadIndexNotReady != nil: + j.lastRetryableErr = common.ErrKVReadIndexNotReady.GenWithStack(errPb.GetMessage()) + + // this error happens when this region is splitting, the error might be: + // read index not ready, reason can not read index due to split, region 64037 + // we have paused schedule, but it's temporary, + // if next request takes a long time, there's chance schedule is enabled again + // or on key range border, another engine sharing this region tries to split this + // region may cause this error too. + j.convertStageTo(needRescan) + return false, nil + case errPb.DiskFull != nil: + j.lastRetryableErr = common.ErrKVIngestFailed.GenWithStack(errPb.GetMessage()) + + return false, errors.Errorf("non-retryable error: %s", resp.GetError().GetMessage()) + } + // all others doIngest error, such as stale command, etc. we'll retry it again from writeAndIngestByRange + j.lastRetryableErr = common.ErrKVIngestFailed.GenWithStack(resp.GetError().GetMessage()) + j.convertStageTo(regionScanned) + return false, nil +} + +type regionJobRetryHeap []*regionJob + +var _ heap.Interface = (*regionJobRetryHeap)(nil) + +func (h *regionJobRetryHeap) Len() int { + return len(*h) +} + +func (h *regionJobRetryHeap) Less(i, j int) bool { + v := *h + return v[i].waitUntil.Before(v[j].waitUntil) +} + +func (h *regionJobRetryHeap) Swap(i, j int) { + v := *h + v[i], v[j] = v[j], v[i] +} + +func (h *regionJobRetryHeap) Push(x any) { + *h = append(*h, x.(*regionJob)) +} + +func (h *regionJobRetryHeap) Pop() any { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +// regionJobRetryer is a concurrent-safe queue holding jobs that need to put +// back later, and put back when the regionJob.waitUntil is reached. It maintains +// a heap of jobs internally based on the regionJob.waitUntil field. 
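+// Typical usage: create it with startRegionJobRetryer, feed delayed jobs to it
+// with push, and the background goroutine sends them to putBackCh once their
+// waitUntil has passed; cancelling the context releases any jobs still held.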
+type regionJobRetryer struct { + // lock acquiring order: protectedClosed > protectedQueue > protectedToPutBack + protectedClosed struct { + mu sync.Mutex + closed bool + } + protectedQueue struct { + mu sync.Mutex + q regionJobRetryHeap + } + protectedToPutBack struct { + mu sync.Mutex + toPutBack *regionJob + } + putBackCh chan<- *regionJob + reload chan struct{} + jobWg *sync.WaitGroup +} + +// startRegionJobRetryer starts a new regionJobRetryer and it will run in +// background to put the job back to `putBackCh` when job's waitUntil is reached. +// Cancel the `ctx` will stop retryer and `jobWg.Done` will be trigger for jobs +// that are not put back yet. +func startRegionJobRetryer( + ctx context.Context, + putBackCh chan<- *regionJob, + jobWg *sync.WaitGroup, +) *regionJobRetryer { + ret := ®ionJobRetryer{ + putBackCh: putBackCh, + reload: make(chan struct{}, 1), + jobWg: jobWg, + } + ret.protectedQueue.q = make(regionJobRetryHeap, 0, 16) + go ret.run(ctx) + return ret +} + +// run is only internally used, caller should not use it. +func (q *regionJobRetryer) run(ctx context.Context) { + defer q.close() + + for { + var front *regionJob + q.protectedQueue.mu.Lock() + if len(q.protectedQueue.q) > 0 { + front = q.protectedQueue.q[0] + } + q.protectedQueue.mu.Unlock() + + switch { + case front != nil: + select { + case <-ctx.Done(): + return + case <-q.reload: + case <-time.After(time.Until(front.waitUntil)): + q.protectedQueue.mu.Lock() + q.protectedToPutBack.mu.Lock() + q.protectedToPutBack.toPutBack = heap.Pop(&q.protectedQueue.q).(*regionJob) + // release the lock of queue to avoid blocking regionJobRetryer.push + q.protectedQueue.mu.Unlock() + + // hold the lock of toPutBack to make sending to putBackCh and + // resetting toPutBack atomic w.r.t. regionJobRetryer.close + select { + case <-ctx.Done(): + q.protectedToPutBack.mu.Unlock() + return + case q.putBackCh <- q.protectedToPutBack.toPutBack: + q.protectedToPutBack.toPutBack = nil + q.protectedToPutBack.mu.Unlock() + } + } + default: + // len(q.q) == 0 + select { + case <-ctx.Done(): + return + case <-q.reload: + } + } + } +} + +// close is only internally used, caller should not use it. +func (q *regionJobRetryer) close() { + q.protectedClosed.mu.Lock() + defer q.protectedClosed.mu.Unlock() + q.protectedClosed.closed = true + + if q.protectedToPutBack.toPutBack != nil { + q.protectedToPutBack.toPutBack.done(q.jobWg) + } + for _, job := range q.protectedQueue.q { + job.done(q.jobWg) + } +} + +// push should not be blocked for long time in any cases. 
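+// It returns false when the retryer has already been closed; in that case the
+// job is not queued and close will not call done for it, so the caller keeps
+// ownership and must release the job itself.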
+func (q *regionJobRetryer) push(job *regionJob) bool { + q.protectedClosed.mu.Lock() + defer q.protectedClosed.mu.Unlock() + if q.protectedClosed.closed { + return false + } + + q.protectedQueue.mu.Lock() + heap.Push(&q.protectedQueue.q, job) + q.protectedQueue.mu.Unlock() + + select { + case q.reload <- struct{}{}: + default: + } + return true +} From 0e66c9bfd1b2b523392b1f50670045b646b96bbc Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 3 Jun 2024 10:31:14 +0800 Subject: [PATCH 2/7] fix some git conflict Signed-off-by: lance6716 --- br/pkg/lightning/backend/local/duplicate.go | 5 +- br/pkg/restore/import_retry_test.go | 41 - br/pkg/restore/split/mock_pd_client.go | 195 ----- pkg/lightning/backend/local/BUILD.bazel | 188 ---- pkg/lightning/backend/local/region_job.go | 907 -------------------- 5 files changed, 1 insertion(+), 1335 deletions(-) delete mode 100644 br/pkg/restore/split/mock_pd_client.go delete mode 100644 pkg/lightning/backend/local/BUILD.bazel delete mode 100644 pkg/lightning/backend/local/region_job.go diff --git a/br/pkg/lightning/backend/local/duplicate.go b/br/pkg/lightning/backend/local/duplicate.go index d467be89355f7..33b1fa3a039eb 100644 --- a/br/pkg/lightning/backend/local/duplicate.go +++ b/br/pkg/lightning/backend/local/duplicate.go @@ -28,14 +28,11 @@ import ( "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" -<<<<<<< HEAD:br/pkg/lightning/backend/local/duplicate.go + berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/errormanager" "github.com/pingcap/tidb/br/pkg/lightning/log" -======= - berrors "github.com/pingcap/tidb/br/pkg/errors" ->>>>>>> 0805e850d41 (br: handle region leader miss (#52822)):pkg/lightning/backend/local/duplicate.go "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/br/pkg/utils" diff --git a/br/pkg/restore/import_retry_test.go b/br/pkg/restore/import_retry_test.go index 06072834ff5e2..cbe8275b0209c 100644 --- a/br/pkg/restore/import_retry_test.go +++ b/br/pkg/restore/import_retry_test.go @@ -51,47 +51,6 @@ func assertRegions(t *testing.T, regions []*split.RegionInfo, keys ...string) { } } -<<<<<<< HEAD -======= -// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) -func initTestClient(isRawKv bool) *TestClient { - peers := make([]*metapb.Peer, 1) - peers[0] = &metapb.Peer{ - Id: 1, - StoreId: 1, - } - keys := [6]string{"", "aay", "bba", "bbh", "cca", ""} - regions := make(map[uint64]*split.RegionInfo) - for i := uint64(1); i < 6; i++ { - startKey := []byte(keys[i-1]) - if len(startKey) != 0 { - startKey = codec.EncodeBytesExt([]byte{}, startKey, isRawKv) - } - endKey := []byte(keys[i]) - if len(endKey) != 0 { - endKey = codec.EncodeBytesExt([]byte{}, endKey, isRawKv) - } - regions[i] = &split.RegionInfo{ - Leader: &metapb.Peer{ - Id: i, - StoreId: 1, - }, - Region: &metapb.Region{ - Id: i, - Peers: peers, - StartKey: startKey, - EndKey: endKey, - }, - } - } - stores := make(map[uint64]*metapb.Store) - stores[1] = &metapb.Store{ - Id: 1, - } - return NewTestClient(stores, regions, 6) -} - ->>>>>>> 0805e850d41 (br: handle region leader miss (#52822)) func TestScanSuccess(t *testing.T) { // region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) cli := initTestClient(false) diff --git 
a/br/pkg/restore/split/mock_pd_client.go b/br/pkg/restore/split/mock_pd_client.go deleted file mode 100644 index 4bd709260e90a..0000000000000 --- a/br/pkg/restore/split/mock_pd_client.go +++ /dev/null @@ -1,195 +0,0 @@ -// Copyright 2024 PingCAP, Inc. Licensed under Apache-2.0. - -package split - -import ( - "context" - "sync" - - "github.com/pingcap/errors" - "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/kvproto/pkg/pdpb" - "github.com/pingcap/tidb/pkg/store/pdtypes" - "github.com/pingcap/tidb/pkg/util/codec" - pd "github.com/tikv/pd/client" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -// MockPDClientForSplit is a mock PD client for testing split and scatter. -type MockPDClientForSplit struct { - pd.Client - - mu sync.Mutex - - Regions *pdtypes.RegionTree - lastRegionID uint64 - scanRegions struct { - errors []error - beforeHook func() - } - splitRegions struct { - count int - hijacked func() (bool, *kvrpcpb.SplitRegionResponse, error) - } - scatterRegion struct { - eachRegionFailBefore int - count map[uint64]int - } - scatterRegions struct { - notImplemented bool - regionCount int - } - getOperator struct { - responses map[uint64][]*pdpb.GetOperatorResponse - } -} - -// NewMockPDClientForSplit creates a new MockPDClientForSplit. -func NewMockPDClientForSplit() *MockPDClientForSplit { - ret := &MockPDClientForSplit{} - ret.Regions = &pdtypes.RegionTree{} - ret.scatterRegion.count = make(map[uint64]int) - return ret -} - -func newRegionNotFullyReplicatedErr(regionID uint64) error { - return status.Errorf(codes.Unknown, "region %d is not fully replicated", regionID) -} - -func (c *MockPDClientForSplit) SetRegions(boundaries [][]byte) []*metapb.Region { - c.mu.Lock() - defer c.mu.Unlock() - - return c.setRegions(boundaries) -} - -func (c *MockPDClientForSplit) setRegions(boundaries [][]byte) []*metapb.Region { - ret := make([]*metapb.Region, 0, len(boundaries)-1) - for i := 1; i < len(boundaries); i++ { - c.lastRegionID++ - r := &metapb.Region{ - Id: c.lastRegionID, - StartKey: boundaries[i-1], - EndKey: boundaries[i], - } - p := &metapb.Peer{ - Id: c.lastRegionID, - StoreId: 1, - } - c.Regions.SetRegion(&pdtypes.Region{ - Meta: r, - Leader: p, - }) - ret = append(ret, r) - } - return ret -} - -func (c *MockPDClientForSplit) ScanRegions( - _ context.Context, - key, endKey []byte, - limit int, - _ ...pd.GetRegionOption, -) ([]*pd.Region, error) { - c.mu.Lock() - defer c.mu.Unlock() - - if len(c.scanRegions.errors) > 0 { - err := c.scanRegions.errors[0] - c.scanRegions.errors = c.scanRegions.errors[1:] - return nil, err - } - - if c.scanRegions.beforeHook != nil { - c.scanRegions.beforeHook() - } - - regions := c.Regions.ScanRange(key, endKey, limit) - ret := make([]*pd.Region, 0, len(regions)) - for _, r := range regions { - ret = append(ret, &pd.Region{ - Meta: r.Meta, - Leader: r.Leader, - }) - } - return ret, nil -} - -func (c *MockPDClientForSplit) GetRegionByID(_ context.Context, regionID uint64, _ ...pd.GetRegionOption) (*pd.Region, error) { - c.mu.Lock() - defer c.mu.Unlock() - - for _, r := range c.Regions.Regions { - if r.Meta.Id == regionID { - return &pd.Region{ - Meta: r.Meta, - Leader: r.Leader, - }, nil - } - } - return nil, errors.New("region not found") -} - -func (c *MockPDClientForSplit) SplitRegion( - region *RegionInfo, - keys [][]byte, - isRawKV bool, -) (bool, *kvrpcpb.SplitRegionResponse, error) { - c.mu.Lock() - defer c.mu.Unlock() - - c.splitRegions.count++ - if 
c.splitRegions.hijacked != nil { - return c.splitRegions.hijacked() - } - - if !isRawKV { - for i := range keys { - keys[i] = codec.EncodeBytes(nil, keys[i]) - } - } - - newRegionBoundaries := make([][]byte, 0, len(keys)+2) - newRegionBoundaries = append(newRegionBoundaries, region.Region.StartKey) - newRegionBoundaries = append(newRegionBoundaries, keys...) - newRegionBoundaries = append(newRegionBoundaries, region.Region.EndKey) - newRegions := c.setRegions(newRegionBoundaries) - newRegions[0].Id = region.Region.Id - return false, &kvrpcpb.SplitRegionResponse{Regions: newRegions}, nil -} - -func (c *MockPDClientForSplit) ScatterRegion(_ context.Context, regionID uint64) error { - c.mu.Lock() - defer c.mu.Unlock() - - c.scatterRegion.count[regionID]++ - if c.scatterRegion.count[regionID] > c.scatterRegion.eachRegionFailBefore { - return nil - } - return newRegionNotFullyReplicatedErr(regionID) -} - -func (c *MockPDClientForSplit) ScatterRegions(_ context.Context, regionIDs []uint64, _ ...pd.RegionsOption) (*pdpb.ScatterRegionResponse, error) { - c.mu.Lock() - defer c.mu.Unlock() - - if c.scatterRegions.notImplemented { - return nil, status.Error(codes.Unimplemented, "Ah, yep") - } - c.scatterRegions.regionCount += len(regionIDs) - return &pdpb.ScatterRegionResponse{}, nil -} - -func (c *MockPDClientForSplit) GetOperator(_ context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) { - c.mu.Lock() - defer c.mu.Unlock() - - if c.getOperator.responses == nil { - return &pdpb.GetOperatorResponse{Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_SUCCESS}, nil - } - ret := c.getOperator.responses[regionID][0] - c.getOperator.responses[regionID] = c.getOperator.responses[regionID][1:] - return ret, nil -} diff --git a/pkg/lightning/backend/local/BUILD.bazel b/pkg/lightning/backend/local/BUILD.bazel deleted file mode 100644 index c297e333d2d7d..0000000000000 --- a/pkg/lightning/backend/local/BUILD.bazel +++ /dev/null @@ -1,188 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") - -go_library( - name = "local", - srcs = [ - "checksum.go", - "compress.go", - "disk_quota.go", - "duplicate.go", - "engine.go", - "engine_mgr.go", - "iterator.go", - "local.go", - "local_freebsd.go", - "local_unix.go", - "local_unix_generic.go", - "local_windows.go", - "localhelper.go", - "region_job.go", - "tikv_mode.go", - ], - importpath = "github.com/pingcap/tidb/pkg/lightning/backend/local", - visibility = ["//visibility:public"], - deps = [ - "//br/pkg/checksum", - "//br/pkg/errors", - "//br/pkg/logutil", - "//br/pkg/membuf", - "//br/pkg/pdutil", - "//br/pkg/restore/split", - "//br/pkg/storage", - "//br/pkg/version", - "//pkg/distsql", - "//pkg/infoschema", - "//pkg/kv", - "//pkg/lightning/backend", - "//pkg/lightning/backend/encode", - "//pkg/lightning/backend/external", - "//pkg/lightning/backend/kv", - "//pkg/lightning/checkpoints", - "//pkg/lightning/common", - "//pkg/lightning/config", - "//pkg/lightning/errormanager", - "//pkg/lightning/log", - "//pkg/lightning/manual", - "//pkg/lightning/metric", - "//pkg/lightning/mydump", - "//pkg/lightning/tikv", - "//pkg/lightning/verification", - "//pkg/metrics", - "//pkg/parser/model", - "//pkg/parser/mysql", - "//pkg/parser/terror", - "//pkg/sessionctx/variable", - "//pkg/table", - "//pkg/table/tables", - "//pkg/tablecodec", - "//pkg/util", - "//pkg/util/codec", - "//pkg/util/compress", - "//pkg/util/engine", - "//pkg/util/hack", - "//pkg/util/logutil", - "//pkg/util/mathutil", - "//pkg/util/ranger", - 
"@com_github_cockroachdb_pebble//:pebble", - "@com_github_cockroachdb_pebble//objstorage/objstorageprovider", - "@com_github_cockroachdb_pebble//sstable", - "@com_github_cockroachdb_pebble//vfs", - "@com_github_coreos_go_semver//semver", - "@com_github_docker_go_units//:go-units", - "@com_github_google_btree//:btree", - "@com_github_google_uuid//:uuid", - "@com_github_klauspost_compress//gzip", - "@com_github_pingcap_errors//:errors", - "@com_github_pingcap_failpoint//:failpoint", - "@com_github_pingcap_kvproto//pkg/errorpb", - "@com_github_pingcap_kvproto//pkg/import_sstpb", - "@com_github_pingcap_kvproto//pkg/kvrpcpb", - "@com_github_pingcap_kvproto//pkg/metapb", - "@com_github_pingcap_tipb//go-tipb", - "@com_github_tikv_client_go_v2//kv", - "@com_github_tikv_client_go_v2//oracle", - "@com_github_tikv_client_go_v2//tikv", - "@com_github_tikv_client_go_v2//util", - "@com_github_tikv_pd_client//:client", - "@com_github_tikv_pd_client//http", - "@com_github_tikv_pd_client//retry", - "@org_golang_google_grpc//:grpc", - "@org_golang_google_grpc//backoff", - "@org_golang_google_grpc//codes", - "@org_golang_google_grpc//credentials", - "@org_golang_google_grpc//credentials/insecure", - "@org_golang_google_grpc//keepalive", - "@org_golang_google_grpc//status", - "@org_golang_x_sync//errgroup", - "@org_golang_x_time//rate", - "@org_uber_go_atomic//:atomic", - "@org_uber_go_multierr//:multierr", - "@org_uber_go_zap//:zap", - ], -) - -go_test( - name = "local_test", - timeout = "short", - srcs = [ - "checksum_test.go", - "compress_test.go", - "disk_quota_test.go", - "duplicate_test.go", - "engine_mgr_test.go", - "engine_test.go", - "iterator_test.go", - "local_check_test.go", - "local_test.go", - "localhelper_test.go", - "main_test.go", - "region_job_test.go", - ], - embed = [":local"], - flaky = True, - race = "on", - shard_count = 50, - deps = [ - "//br/pkg/membuf", - "//br/pkg/mock/mocklocal", - "//br/pkg/restore/split", - "//br/pkg/storage", - "//br/pkg/utils", - "//pkg/ddl", - "//pkg/errno", - "//pkg/keyspace", - "//pkg/kv", - "//pkg/lightning/backend", - "//pkg/lightning/backend/encode", - "//pkg/lightning/backend/external", - "//pkg/lightning/backend/kv", - "//pkg/lightning/checkpoints", - "//pkg/lightning/common", - "//pkg/lightning/config", - "//pkg/lightning/log", - "//pkg/lightning/mydump", - "//pkg/parser", - "//pkg/parser/ast", - "//pkg/parser/model", - "//pkg/parser/mysql", - "//pkg/sessionctx/stmtctx", - "//pkg/store/pdtypes", - "//pkg/table", - "//pkg/table/tables", - "//pkg/tablecodec", - "//pkg/testkit/testsetup", - "//pkg/types", - "//pkg/util", - "//pkg/util/codec", - "//pkg/util/engine", - "//pkg/util/hack", - "//pkg/util/mock", - "@com_github_cockroachdb_pebble//:pebble", - "@com_github_cockroachdb_pebble//objstorage/objstorageprovider", - "@com_github_cockroachdb_pebble//sstable", - "@com_github_cockroachdb_pebble//vfs", - "@com_github_coreos_go_semver//semver", - "@com_github_data_dog_go_sqlmock//:go-sqlmock", - "@com_github_docker_go_units//:go-units", - "@com_github_go_sql_driver_mysql//:mysql", - "@com_github_google_uuid//:uuid", - "@com_github_pingcap_errors//:errors", - "@com_github_pingcap_failpoint//:failpoint", - "@com_github_pingcap_kvproto//pkg/errorpb", - "@com_github_pingcap_kvproto//pkg/import_sstpb", - "@com_github_pingcap_kvproto//pkg/metapb", - "@com_github_pingcap_tipb//go-tipb", - "@com_github_stretchr_testify//require", - "@com_github_tikv_client_go_v2//oracle", - "@com_github_tikv_pd_client//:client", - "@com_github_tikv_pd_client//errs", - 
"@com_github_tikv_pd_client//http", - "@org_golang_google_grpc//:grpc", - "@org_golang_google_grpc//codes", - "@org_golang_google_grpc//encoding", - "@org_golang_google_grpc//status", - "@org_uber_go_atomic//:atomic", - "@org_uber_go_goleak//:goleak", - "@org_uber_go_mock//gomock", - ], -) diff --git a/pkg/lightning/backend/local/region_job.go b/pkg/lightning/backend/local/region_job.go deleted file mode 100644 index 7bc812e4b9bb6..0000000000000 --- a/pkg/lightning/backend/local/region_job.go +++ /dev/null @@ -1,907 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package local - -import ( - "container/heap" - "context" - "fmt" - "io" - "strings" - "sync" - "time" - - "github.com/google/uuid" - "github.com/pingcap/errors" - "github.com/pingcap/failpoint" - "github.com/pingcap/kvproto/pkg/errorpb" - sst "github.com/pingcap/kvproto/pkg/import_sstpb" - "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/kvproto/pkg/metapb" - berrors "github.com/pingcap/tidb/br/pkg/errors" - "github.com/pingcap/tidb/br/pkg/logutil" - "github.com/pingcap/tidb/br/pkg/restore/split" - "github.com/pingcap/tidb/pkg/kv" - "github.com/pingcap/tidb/pkg/lightning/common" - "github.com/pingcap/tidb/pkg/lightning/config" - "github.com/pingcap/tidb/pkg/lightning/log" - "github.com/pingcap/tidb/pkg/lightning/metric" - "github.com/pingcap/tidb/pkg/util/codec" - "github.com/tikv/client-go/v2/util" - "go.uber.org/zap" - "google.golang.org/grpc" -) - -type jobStageTp string - -/* - + - v - +------+------+ - +->+regionScanned+<------+ - | +------+------+ | - | | | - | | | - | v | - | +--+--+ +-----+----+ - | |wrote+---->+needRescan| - | +--+--+ +-----+----+ - | | ^ - | | | - | v | - | +---+----+ | - +-----+ingested+---------+ - +---+----+ - | - v - -above diagram shows the state transition of a region job, here are some special -cases: - - regionScanned can directly jump to ingested if the keyRange has no data - - regionScanned can only transit to wrote. TODO: check if it should be transited - to needRescan - - if a job only partially writes the data, after it becomes ingested, it will - update its keyRange and transits to regionScanned to continue the remaining - data - - needRescan may output multiple regionScanned jobs when the old region is split -*/ -const ( - regionScanned jobStageTp = "regionScanned" - wrote jobStageTp = "wrote" - ingested jobStageTp = "ingested" - needRescan jobStageTp = "needRescan" - - // suppose each KV is about 32 bytes, 16 * units.KiB / 32 = 512 - defaultKVBatchCount = 512 -) - -func (j jobStageTp) String() string { - return string(j) -} - -// regionJob is dedicated to import the data in [keyRange.start, keyRange.end) -// to a region. The keyRange may be changed when processing because of writing -// partial data to TiKV or region split. 
-type regionJob struct { - keyRange common.Range - // TODO: check the keyRange so that it's always included in region - region *split.RegionInfo - // stage should be updated only by convertStageTo - stage jobStageTp - // writeResult is available only in wrote and ingested stage - writeResult *tikvWriteResult - - ingestData common.IngestData - regionSplitSize int64 - regionSplitKeys int64 - metrics *metric.Common - - retryCount int - waitUntil time.Time - lastRetryableErr error - - // injected is used in test to set the behaviour - injected []injectedBehaviour -} - -type tikvWriteResult struct { - sstMeta []*sst.SSTMeta - count int64 - totalBytes int64 - remainingStartKey []byte -} - -type injectedBehaviour struct { - write injectedWriteBehaviour - ingest injectedIngestBehaviour -} - -type injectedWriteBehaviour struct { - result *tikvWriteResult - err error -} - -type injectedIngestBehaviour struct { - nextStage jobStageTp - err error -} - -func (j *regionJob) convertStageTo(stage jobStageTp) { - j.stage = stage - switch stage { - case regionScanned: - j.writeResult = nil - case ingested: - // when writing is skipped because key range is empty - if j.writeResult == nil { - return - } - - j.ingestData.Finish(j.writeResult.totalBytes, j.writeResult.count) - if j.metrics != nil { - j.metrics.BytesCounter.WithLabelValues(metric.StateImported). - Add(float64(j.writeResult.totalBytes)) - } - case needRescan: - j.region = nil - } -} - -// ref means that the ingestData of job will be accessed soon. -func (j *regionJob) ref(wg *sync.WaitGroup) { - if wg != nil { - wg.Add(1) - } - if j.ingestData != nil { - j.ingestData.IncRef() - } -} - -// done promises that the ingestData of job will not be accessed. Same amount of -// done should be called to release the ingestData. -func (j *regionJob) done(wg *sync.WaitGroup) { - if j.ingestData != nil { - j.ingestData.DecRef() - } - if wg != nil { - wg.Done() - } -} - -// writeToTiKV writes the data to TiKV and mark this job as wrote stage. -// if any write logic has error, writeToTiKV will set job to a proper stage and return nil. -// if any underlying logic has error, writeToTiKV will return an error. -// we don't need to do cleanup for the pairs written to tikv if encounters an error, -// tikv will take the responsibility to do so. -// TODO: let client-go provide a high-level write interface. 
-func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error { - err := local.doWrite(ctx, j) - if err == nil { - return nil - } - if !common.IsRetryableError(err) { - return err - } - // currently only one case will restart write - if strings.Contains(err.Error(), "RequestTooNew") { - j.convertStageTo(regionScanned) - return err - } - j.convertStageTo(needRescan) - return err -} - -func (local *Backend) doWrite(ctx context.Context, j *regionJob) error { - if j.stage != regionScanned { - return nil - } - - failpoint.Inject("fakeRegionJobs", func() { - front := j.injected[0] - j.injected = j.injected[1:] - j.writeResult = front.write.result - err := front.write.err - if err == nil { - j.convertStageTo(wrote) - } - failpoint.Return(err) - }) - - var cancel context.CancelFunc - ctx, cancel = context.WithTimeoutCause(ctx, 15*time.Minute, common.ErrWriteTooSlow) - defer cancel() - - apiVersion := local.tikvCodec.GetAPIVersion() - clientFactory := local.importClientFactory - kvBatchSize := local.KVWriteBatchSize - bufferPool := local.engineMgr.getBufferPool() - writeLimiter := local.writeLimiter - - begin := time.Now() - region := j.region.Region - - firstKey, lastKey, err := j.ingestData.GetFirstAndLastKey(j.keyRange.Start, j.keyRange.End) - if err != nil { - return errors.Trace(err) - } - if firstKey == nil { - j.convertStageTo(ingested) - log.FromContext(ctx).Debug("keys within region is empty, skip doIngest", - logutil.Key("start", j.keyRange.Start), - logutil.Key("regionStart", region.StartKey), - logutil.Key("end", j.keyRange.End), - logutil.Key("regionEnd", region.EndKey)) - return nil - } - - firstKey = codec.EncodeBytes([]byte{}, firstKey) - lastKey = codec.EncodeBytes([]byte{}, lastKey) - - u := uuid.New() - meta := &sst.SSTMeta{ - Uuid: u[:], - RegionId: region.GetId(), - RegionEpoch: region.GetRegionEpoch(), - Range: &sst.Range{ - Start: firstKey, - End: lastKey, - }, - ApiVersion: apiVersion, - } - - failpoint.Inject("changeEpochVersion", func(val failpoint.Value) { - cloned := *meta.RegionEpoch - meta.RegionEpoch = &cloned - i := val.(int) - if i >= 0 { - meta.RegionEpoch.Version += uint64(i) - } else { - meta.RegionEpoch.ConfVer -= uint64(-i) - } - }) - - annotateErr := func(in error, peer *metapb.Peer, msg string) error { - // annotate the error with peer/store/region info to help debug. 
- return errors.Annotatef( - in, - "peer %d, store %d, region %d, epoch %s, %s", - peer.Id, peer.StoreId, region.Id, region.RegionEpoch.String(), - msg, - ) - } - - leaderID := j.region.Leader.GetId() - clients := make([]sst.ImportSST_WriteClient, 0, len(region.GetPeers())) - allPeers := make([]*metapb.Peer, 0, len(region.GetPeers())) - req := &sst.WriteRequest{ - Chunk: &sst.WriteRequest_Meta{ - Meta: meta, - }, - Context: &kvrpcpb.Context{ - ResourceControlContext: &kvrpcpb.ResourceControlContext{ - ResourceGroupName: local.ResourceGroupName, - }, - RequestSource: util.BuildRequestSource(true, kv.InternalTxnLightning, local.TaskType), - }, - } - for _, peer := range region.GetPeers() { - cli, err := clientFactory.Create(ctx, peer.StoreId) - if err != nil { - return annotateErr(err, peer, "when create client") - } - - wstream, err := cli.Write(ctx) - if err != nil { - return annotateErr(err, peer, "when open write stream") - } - - failpoint.Inject("mockWritePeerErr", func() { - err = errors.Errorf("mock write peer error") - failpoint.Return(annotateErr(err, peer, "when open write stream")) - }) - - // Bind uuid for this write request - if err = wstream.Send(req); err != nil { - return annotateErr(err, peer, "when send meta") - } - clients = append(clients, wstream) - allPeers = append(allPeers, peer) - } - dataCommitTS := j.ingestData.GetTS() - req.Chunk = &sst.WriteRequest_Batch{ - Batch: &sst.WriteBatch{ - CommitTs: dataCommitTS, - }, - } - - pairs := make([]*sst.Pair, 0, defaultKVBatchCount) - count := 0 - size := int64(0) - totalSize := int64(0) - totalCount := int64(0) - // if region-split-size <= 96MiB, we bump the threshold a bit to avoid too many retry split - // because the range-properties is not 100% accurate - regionMaxSize := j.regionSplitSize - if j.regionSplitSize <= int64(config.SplitRegionSize) { - regionMaxSize = j.regionSplitSize * 4 / 3 - } - - flushKVs := func() error { - req.Chunk.(*sst.WriteRequest_Batch).Batch.Pairs = pairs[:count] - preparedMsg := &grpc.PreparedMsg{} - // by reading the source code, Encode need to find codec and compression from the stream - // because all stream has the same codec and compression, we can use any one of them - if err := preparedMsg.Encode(clients[0], req); err != nil { - return err - } - - for i := range clients { - if err := writeLimiter.WaitN(ctx, allPeers[i].StoreId, int(size)); err != nil { - return errors.Trace(err) - } - if err := clients[i].SendMsg(preparedMsg); err != nil { - if err == io.EOF { - // if it's EOF, need RecvMsg to get the error - dummy := &sst.WriteResponse{} - err = clients[i].RecvMsg(dummy) - } - return annotateErr(err, allPeers[i], "when send data") - } - } - failpoint.Inject("afterFlushKVs", func() { - log.FromContext(ctx).Info(fmt.Sprintf("afterFlushKVs count=%d,size=%d", count, size)) - }) - return nil - } - - iter := j.ingestData.NewIter(ctx, j.keyRange.Start, j.keyRange.End, bufferPool) - //nolint: errcheck - defer iter.Close() - - var remainingStartKey []byte - for iter.First(); iter.Valid(); iter.Next() { - k, v := iter.Key(), iter.Value() - kvSize := int64(len(k) + len(v)) - // here we reuse the `*sst.Pair`s to optimize object allocation - if count < len(pairs) { - pairs[count].Key = k - pairs[count].Value = v - } else { - pair := &sst.Pair{ - Key: k, - Value: v, - } - pairs = append(pairs, pair) - } - count++ - totalCount++ - size += kvSize - totalSize += kvSize - - if size >= kvBatchSize { - if err := flushKVs(); err != nil { - return errors.Trace(err) - } - count = 0 - size = 0 - 
iter.ReleaseBuf() - } - if totalSize >= regionMaxSize || totalCount >= j.regionSplitKeys { - // we will shrink the key range of this job to real written range - if iter.Next() { - remainingStartKey = append([]byte{}, iter.Key()...) - log.FromContext(ctx).Info("write to tikv partial finish", - zap.Int64("count", totalCount), - zap.Int64("size", totalSize), - logutil.Key("startKey", j.keyRange.Start), - logutil.Key("endKey", j.keyRange.End), - logutil.Key("remainStart", remainingStartKey), - logutil.Region(region), - logutil.Leader(j.region.Leader), - zap.Uint64("commitTS", dataCommitTS)) - } - break - } - } - - if iter.Error() != nil { - return errors.Trace(iter.Error()) - } - - if count > 0 { - if err := flushKVs(); err != nil { - return errors.Trace(err) - } - count = 0 - size = 0 - iter.ReleaseBuf() - } - - var leaderPeerMetas []*sst.SSTMeta - for i, wStream := range clients { - resp, closeErr := wStream.CloseAndRecv() - if closeErr != nil { - return annotateErr(closeErr, allPeers[i], "when close write stream") - } - if resp.Error != nil { - return annotateErr(errors.New("resp error: "+resp.Error.Message), allPeers[i], "when close write stream") - } - if leaderID == region.Peers[i].GetId() { - leaderPeerMetas = resp.Metas - log.FromContext(ctx).Debug("get metas after write kv stream to tikv", zap.Reflect("metas", leaderPeerMetas)) - } - } - - failpoint.Inject("NoLeader", func() { - log.FromContext(ctx).Warn("enter failpoint NoLeader") - leaderPeerMetas = nil - }) - - // if there is not leader currently, we don't forward the stage to wrote and let caller - // handle the retry. - if len(leaderPeerMetas) == 0 { - log.FromContext(ctx).Warn("write to tikv no leader", - logutil.Region(region), logutil.Leader(j.region.Leader), - zap.Uint64("leader_id", leaderID), logutil.SSTMeta(meta), - zap.Int64("kv_pairs", totalCount), zap.Int64("total_bytes", totalSize)) - return common.ErrNoLeader.GenWithStackByArgs(region.Id, leaderID) - } - - takeTime := time.Since(begin) - log.FromContext(ctx).Debug("write to kv", zap.Reflect("region", j.region), zap.Uint64("leader", leaderID), - zap.Reflect("meta", meta), zap.Reflect("return metas", leaderPeerMetas), - zap.Int64("kv_pairs", totalCount), zap.Int64("total_bytes", totalSize), - zap.Stringer("takeTime", takeTime)) - if m, ok := metric.FromContext(ctx); ok { - m.SSTSecondsHistogram.WithLabelValues(metric.SSTProcessWrite).Observe(takeTime.Seconds()) - } - - j.writeResult = &tikvWriteResult{ - sstMeta: leaderPeerMetas, - count: totalCount, - totalBytes: totalSize, - remainingStartKey: remainingStartKey, - } - j.convertStageTo(wrote) - return nil -} - -// ingest tries to finish the regionJob. -// if any ingest logic has error, ingest may retry sometimes to resolve it and finally -// set job to a proper stage with nil error returned. -// if any underlying logic has error, ingest will return an error to let caller -// handle it. 
-func (local *Backend) ingest(ctx context.Context, j *regionJob) (err error) { - if j.stage != wrote { - return nil - } - - failpoint.Inject("fakeRegionJobs", func() { - front := j.injected[0] - j.injected = j.injected[1:] - j.convertStageTo(front.ingest.nextStage) - failpoint.Return(front.ingest.err) - }) - - if len(j.writeResult.sstMeta) == 0 { - j.convertStageTo(ingested) - return nil - } - - if m, ok := metric.FromContext(ctx); ok { - begin := time.Now() - defer func() { - if err == nil { - m.SSTSecondsHistogram.WithLabelValues(metric.SSTProcessIngest).Observe(time.Since(begin).Seconds()) - } - }() - } - - for retry := 0; retry < maxRetryTimes; retry++ { - resp, err := local.doIngest(ctx, j) - if err == nil && resp.GetError() == nil { - j.convertStageTo(ingested) - return nil - } - if err != nil { - if common.IsContextCanceledError(err) { - return err - } - log.FromContext(ctx).Warn("meet underlying error, will retry ingest", - log.ShortError(err), logutil.SSTMetas(j.writeResult.sstMeta), - logutil.Region(j.region.Region), logutil.Leader(j.region.Leader)) - continue - } - canContinue, err := j.convertStageOnIngestError(resp) - if common.IsContextCanceledError(err) { - return err - } - if !canContinue { - log.FromContext(ctx).Warn("meet error and handle the job later", - zap.Stringer("job stage", j.stage), - logutil.ShortError(j.lastRetryableErr), - j.region.ToZapFields(), - logutil.Key("start", j.keyRange.Start), - logutil.Key("end", j.keyRange.End)) - return nil - } - log.FromContext(ctx).Warn("meet error and will doIngest region again", - logutil.ShortError(j.lastRetryableErr), - j.region.ToZapFields(), - logutil.Key("start", j.keyRange.Start), - logutil.Key("end", j.keyRange.End)) - } - return nil -} - -func (local *Backend) checkWriteStall( - ctx context.Context, - region *split.RegionInfo, -) (bool, *sst.IngestResponse, error) { - clientFactory := local.importClientFactory - for _, peer := range region.Region.GetPeers() { - cli, err := clientFactory.Create(ctx, peer.StoreId) - if err != nil { - return false, nil, errors.Trace(err) - } - // currently we use empty MultiIngestRequest to check if TiKV is busy. - // If in future the rate limit feature contains more metrics we can switch to use it. - resp, err := cli.MultiIngest(ctx, &sst.MultiIngestRequest{}) - if err != nil { - return false, nil, errors.Trace(err) - } - if resp.Error != nil && resp.Error.ServerIsBusy != nil { - return true, resp, nil - } - } - return false, nil, nil -} - -// doIngest send ingest commands to TiKV based on regionJob.writeResult.sstMeta. -// When meet error, it will remove finished sstMetas before return. 
-func (local *Backend) doIngest(ctx context.Context, j *regionJob) (*sst.IngestResponse, error) { - clientFactory := local.importClientFactory - supportMultiIngest := local.supportMultiIngest - shouldCheckWriteStall := local.ShouldCheckWriteStall - if shouldCheckWriteStall { - writeStall, resp, err := local.checkWriteStall(ctx, j.region) - if err != nil { - return nil, errors.Trace(err) - } - if writeStall { - return resp, nil - } - } - - batch := 1 - if supportMultiIngest { - batch = len(j.writeResult.sstMeta) - } - - var resp *sst.IngestResponse - for start := 0; start < len(j.writeResult.sstMeta); start += batch { - end := min(start+batch, len(j.writeResult.sstMeta)) - ingestMetas := j.writeResult.sstMeta[start:end] - - log.FromContext(ctx).Debug("ingest meta", zap.Reflect("meta", ingestMetas)) - - failpoint.Inject("FailIngestMeta", func(val failpoint.Value) { - // only inject the error once - var resp *sst.IngestResponse - - switch val.(string) { - case "notleader": - resp = &sst.IngestResponse{ - Error: &errorpb.Error{ - NotLeader: &errorpb.NotLeader{ - RegionId: j.region.Region.Id, - Leader: j.region.Leader, - }, - }, - } - case "epochnotmatch": - resp = &sst.IngestResponse{ - Error: &errorpb.Error{ - EpochNotMatch: &errorpb.EpochNotMatch{ - CurrentRegions: []*metapb.Region{j.region.Region}, - }, - }, - } - } - failpoint.Return(resp, nil) - }) - - leader := j.region.Leader - if leader == nil { - return nil, errors.Annotatef(berrors.ErrPDLeaderNotFound, - "region id %d has no leader", j.region.Region.Id) - } - - cli, err := clientFactory.Create(ctx, leader.StoreId) - if err != nil { - return nil, errors.Trace(err) - } - reqCtx := &kvrpcpb.Context{ - RegionId: j.region.Region.GetId(), - RegionEpoch: j.region.Region.GetRegionEpoch(), - Peer: leader, - ResourceControlContext: &kvrpcpb.ResourceControlContext{ - ResourceGroupName: local.ResourceGroupName, - }, - RequestSource: util.BuildRequestSource(true, kv.InternalTxnLightning, local.TaskType), - } - - if supportMultiIngest { - req := &sst.MultiIngestRequest{ - Context: reqCtx, - Ssts: ingestMetas, - } - resp, err = cli.MultiIngest(ctx, req) - } else { - req := &sst.IngestRequest{ - Context: reqCtx, - Sst: ingestMetas[0], - } - resp, err = cli.Ingest(ctx, req) - } - if resp.GetError() != nil || err != nil { - // remove finished sstMetas - j.writeResult.sstMeta = j.writeResult.sstMeta[start:] - return resp, errors.Trace(err) - } - } - return resp, nil -} - -// convertStageOnIngestError will try to fix the error contained in ingest response. -// Return (_, error) when another error occurred. -// Return (true, nil) when the job can retry ingesting immediately. -// Return (false, nil) when the job should be put back to queue. -func (j *regionJob) convertStageOnIngestError( - resp *sst.IngestResponse, -) (bool, error) { - if resp.GetError() == nil { - return true, nil - } - - var newRegion *split.RegionInfo - switch errPb := resp.GetError(); { - case errPb.NotLeader != nil: - j.lastRetryableErr = common.ErrKVNotLeader.GenWithStack(errPb.GetMessage()) - - // meet a problem that the region leader+peer are all updated but the return - // error is only "NotLeader", we should update the whole region info. 
- j.convertStageTo(needRescan) - return false, nil - case errPb.EpochNotMatch != nil: - j.lastRetryableErr = common.ErrKVEpochNotMatch.GenWithStack(errPb.GetMessage()) - - if currentRegions := errPb.GetEpochNotMatch().GetCurrentRegions(); currentRegions != nil { - var currentRegion *metapb.Region - for _, r := range currentRegions { - if insideRegion(r, j.writeResult.sstMeta) { - currentRegion = r - break - } - } - if currentRegion != nil { - var newLeader *metapb.Peer - for _, p := range currentRegion.Peers { - if p.GetStoreId() == j.region.Leader.GetStoreId() { - newLeader = p - break - } - } - if newLeader != nil { - newRegion = &split.RegionInfo{ - Leader: newLeader, - Region: currentRegion, - } - } - } - } - if newRegion != nil { - j.region = newRegion - j.convertStageTo(regionScanned) - return false, nil - } - j.convertStageTo(needRescan) - return false, nil - case strings.Contains(errPb.Message, "raft: proposal dropped"): - j.lastRetryableErr = common.ErrKVRaftProposalDropped.GenWithStack(errPb.GetMessage()) - - j.convertStageTo(needRescan) - return false, nil - case errPb.ServerIsBusy != nil: - j.lastRetryableErr = common.ErrKVServerIsBusy.GenWithStack(errPb.GetMessage()) - - return false, nil - case errPb.RegionNotFound != nil: - j.lastRetryableErr = common.ErrKVRegionNotFound.GenWithStack(errPb.GetMessage()) - - j.convertStageTo(needRescan) - return false, nil - case errPb.ReadIndexNotReady != nil: - j.lastRetryableErr = common.ErrKVReadIndexNotReady.GenWithStack(errPb.GetMessage()) - - // this error happens when this region is splitting, the error might be: - // read index not ready, reason can not read index due to split, region 64037 - // we have paused schedule, but it's temporary, - // if next request takes a long time, there's chance schedule is enabled again - // or on key range border, another engine sharing this region tries to split this - // region may cause this error too. - j.convertStageTo(needRescan) - return false, nil - case errPb.DiskFull != nil: - j.lastRetryableErr = common.ErrKVIngestFailed.GenWithStack(errPb.GetMessage()) - - return false, errors.Errorf("non-retryable error: %s", resp.GetError().GetMessage()) - } - // all others doIngest error, such as stale command, etc. we'll retry it again from writeAndIngestByRange - j.lastRetryableErr = common.ErrKVIngestFailed.GenWithStack(resp.GetError().GetMessage()) - j.convertStageTo(regionScanned) - return false, nil -} - -type regionJobRetryHeap []*regionJob - -var _ heap.Interface = (*regionJobRetryHeap)(nil) - -func (h *regionJobRetryHeap) Len() int { - return len(*h) -} - -func (h *regionJobRetryHeap) Less(i, j int) bool { - v := *h - return v[i].waitUntil.Before(v[j].waitUntil) -} - -func (h *regionJobRetryHeap) Swap(i, j int) { - v := *h - v[i], v[j] = v[j], v[i] -} - -func (h *regionJobRetryHeap) Push(x any) { - *h = append(*h, x.(*regionJob)) -} - -func (h *regionJobRetryHeap) Pop() any { - old := *h - n := len(old) - x := old[n-1] - *h = old[0 : n-1] - return x -} - -// regionJobRetryer is a concurrent-safe queue holding jobs that need to put -// back later, and put back when the regionJob.waitUntil is reached. It maintains -// a heap of jobs internally based on the regionJob.waitUntil field. 
-type regionJobRetryer struct { - // lock acquiring order: protectedClosed > protectedQueue > protectedToPutBack - protectedClosed struct { - mu sync.Mutex - closed bool - } - protectedQueue struct { - mu sync.Mutex - q regionJobRetryHeap - } - protectedToPutBack struct { - mu sync.Mutex - toPutBack *regionJob - } - putBackCh chan<- *regionJob - reload chan struct{} - jobWg *sync.WaitGroup -} - -// startRegionJobRetryer starts a new regionJobRetryer and it will run in -// background to put the job back to `putBackCh` when job's waitUntil is reached. -// Cancel the `ctx` will stop retryer and `jobWg.Done` will be trigger for jobs -// that are not put back yet. -func startRegionJobRetryer( - ctx context.Context, - putBackCh chan<- *regionJob, - jobWg *sync.WaitGroup, -) *regionJobRetryer { - ret := ®ionJobRetryer{ - putBackCh: putBackCh, - reload: make(chan struct{}, 1), - jobWg: jobWg, - } - ret.protectedQueue.q = make(regionJobRetryHeap, 0, 16) - go ret.run(ctx) - return ret -} - -// run is only internally used, caller should not use it. -func (q *regionJobRetryer) run(ctx context.Context) { - defer q.close() - - for { - var front *regionJob - q.protectedQueue.mu.Lock() - if len(q.protectedQueue.q) > 0 { - front = q.protectedQueue.q[0] - } - q.protectedQueue.mu.Unlock() - - switch { - case front != nil: - select { - case <-ctx.Done(): - return - case <-q.reload: - case <-time.After(time.Until(front.waitUntil)): - q.protectedQueue.mu.Lock() - q.protectedToPutBack.mu.Lock() - q.protectedToPutBack.toPutBack = heap.Pop(&q.protectedQueue.q).(*regionJob) - // release the lock of queue to avoid blocking regionJobRetryer.push - q.protectedQueue.mu.Unlock() - - // hold the lock of toPutBack to make sending to putBackCh and - // resetting toPutBack atomic w.r.t. regionJobRetryer.close - select { - case <-ctx.Done(): - q.protectedToPutBack.mu.Unlock() - return - case q.putBackCh <- q.protectedToPutBack.toPutBack: - q.protectedToPutBack.toPutBack = nil - q.protectedToPutBack.mu.Unlock() - } - } - default: - // len(q.q) == 0 - select { - case <-ctx.Done(): - return - case <-q.reload: - } - } - } -} - -// close is only internally used, caller should not use it. -func (q *regionJobRetryer) close() { - q.protectedClosed.mu.Lock() - defer q.protectedClosed.mu.Unlock() - q.protectedClosed.closed = true - - if q.protectedToPutBack.toPutBack != nil { - q.protectedToPutBack.toPutBack.done(q.jobWg) - } - for _, job := range q.protectedQueue.q { - job.done(q.jobWg) - } -} - -// push should not be blocked for long time in any cases. 
-func (q *regionJobRetryer) push(job *regionJob) bool { - q.protectedClosed.mu.Lock() - defer q.protectedClosed.mu.Unlock() - if q.protectedClosed.closed { - return false - } - - q.protectedQueue.mu.Lock() - heap.Push(&q.protectedQueue.q, job) - q.protectedQueue.mu.Unlock() - - select { - case q.reload <- struct{}{}: - default: - } - return true -} From 291ffc8de0d79678dedf469908528c4fb9c57601 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 3 Jun 2024 10:35:33 +0800 Subject: [PATCH 3/7] fix some conflict Signed-off-by: lance6716 --- br/pkg/lightning/backend/local/local.go | 4 +- br/pkg/restore/split/split_test.go | 608 ------------------------ 2 files changed, 3 insertions(+), 609 deletions(-) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 336e57ccdc90e..5cd9fcd85f887 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -37,6 +37,7 @@ import ( sst "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" + berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/common" @@ -1165,7 +1166,8 @@ func (local *local) WriteToTiKV( func (local *local) Ingest(ctx context.Context, metas []*sst.SSTMeta, region *split.RegionInfo) (*sst.IngestResponse, error) { leader := region.Leader if leader == nil { - leader = region.Region.GetPeers()[0] + return nil, errors.Annotatef(berrors.ErrPDLeaderNotFound, + "region %d has no leader", region.Region.Id) } cli, err := local.getImportClient(ctx, leader.StoreId) diff --git a/br/pkg/restore/split/split_test.go b/br/pkg/restore/split/split_test.go index ee333a1935115..43e5afcff87b8 100644 --- a/br/pkg/restore/split/split_test.go +++ b/br/pkg/restore/split/split_test.go @@ -71,611 +71,3 @@ func TestScanRegionBackOfferWithStopRetry(t *testing.T) { require.Error(t, err) require.Equal(t, counter, 6) } -<<<<<<< HEAD -======= - -type recordCntBackoffer struct { - already int -} - -func (b *recordCntBackoffer) NextBackoff(error) time.Duration { - b.already++ - return 0 -} - -func (b *recordCntBackoffer) Attempt() int { - return 100 -} - -func TestScatterSequentiallyRetryCnt(t *testing.T) { - mockClient := NewMockPDClientForSplit() - mockClient.scatterRegion.eachRegionFailBefore = 7 - client := pdClient{ - needScatterVal: true, - client: mockClient, - } - client.needScatterInit.Do(func() {}) - - ctx := context.Background() - regions := []*RegionInfo{ - { - Region: &metapb.Region{ - Id: 1, - }, - }, - { - Region: &metapb.Region{ - Id: 2, - }, - }, - } - backoffer := &recordCntBackoffer{} - client.scatterRegionsSequentially( - ctx, - regions, - backoffer, - ) - require.Equal(t, 7, backoffer.already) -} - -func TestScatterBackwardCompatibility(t *testing.T) { - mockClient := NewMockPDClientForSplit() - mockClient.scatterRegions.notImplemented = true - client := pdClient{ - needScatterVal: true, - client: mockClient, - } - client.needScatterInit.Do(func() {}) - - ctx := context.Background() - regions := []*RegionInfo{ - { - Region: &metapb.Region{ - Id: 1, - }, - }, - { - Region: &metapb.Region{ - Id: 2, - }, - }, - } - err := client.scatterRegions(ctx, regions) - require.NoError(t, err) - require.Equal(t, map[uint64]int{1: 1, 2: 1}, client.client.(*MockPDClientForSplit).scatterRegion.count) -} - -func TestWaitForScatterRegions(t *testing.T) { - 
mockPDCli := NewMockPDClientForSplit() - mockPDCli.scatterRegions.notImplemented = true - client := pdClient{ - needScatterVal: true, - client: mockPDCli, - } - client.needScatterInit.Do(func() {}) - regionCnt := 6 - checkGetOperatorRespsDrained := func() { - for i := 1; i <= regionCnt; i++ { - require.Len(t, mockPDCli.getOperator.responses[uint64(i)], 0) - } - } - checkNoRetry := func() { - for i := 1; i <= regionCnt; i++ { - require.Equal(t, 0, mockPDCli.scatterRegion.count[uint64(i)]) - } - } - - ctx := context.Background() - regions := make([]*RegionInfo, 0, regionCnt) - for i := 1; i <= regionCnt; i++ { - regions = append(regions, &RegionInfo{ - Region: &metapb.Region{ - Id: uint64(i), - }, - }) - } - - mockPDCli.getOperator.responses = make(map[uint64][]*pdpb.GetOperatorResponse) - mockPDCli.getOperator.responses[1] = []*pdpb.GetOperatorResponse{ - {Header: &pdpb.ResponseHeader{Error: &pdpb.Error{Type: pdpb.ErrorType_REGION_NOT_FOUND}}}, - } - mockPDCli.getOperator.responses[2] = []*pdpb.GetOperatorResponse{ - {Desc: []byte("not-scatter-region")}, - } - mockPDCli.getOperator.responses[3] = []*pdpb.GetOperatorResponse{ - {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_SUCCESS}, - } - mockPDCli.getOperator.responses[4] = []*pdpb.GetOperatorResponse{ - {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_RUNNING}, - {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_TIMEOUT}, - {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_SUCCESS}, - } - mockPDCli.getOperator.responses[5] = []*pdpb.GetOperatorResponse{ - {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_CANCEL}, - {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_CANCEL}, - {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_CANCEL}, - {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_RUNNING}, - {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_RUNNING}, - {Desc: []byte("not-scatter-region")}, - } - // should trigger a retry - mockPDCli.getOperator.responses[6] = []*pdpb.GetOperatorResponse{ - {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_REPLACE}, - {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_SUCCESS}, - } - - left, err := client.WaitRegionsScattered(ctx, regions) - require.NoError(t, err) - require.Equal(t, 0, left) - for i := 1; i <= 3; i++ { - require.Equal(t, 0, mockPDCli.scatterRegion.count[uint64(i)]) - } - // OperatorStatus_TIMEOUT should trigger rescatter once - require.Equal(t, 1, mockPDCli.scatterRegion.count[uint64(4)]) - // 3 * OperatorStatus_CANCEL should trigger 3 * rescatter - require.Equal(t, 3, mockPDCli.scatterRegion.count[uint64(5)]) - // OperatorStatus_REPLACE should trigger rescatter once - require.Equal(t, 1, mockPDCli.scatterRegion.count[uint64(6)]) - checkGetOperatorRespsDrained() - - // test non-retryable error - - mockPDCli.scatterRegion.count = make(map[uint64]int) - mockPDCli.getOperator.responses = make(map[uint64][]*pdpb.GetOperatorResponse) - mockPDCli.getOperator.responses[1] = []*pdpb.GetOperatorResponse{ - {Header: &pdpb.ResponseHeader{Error: &pdpb.Error{Type: pdpb.ErrorType_REGION_NOT_FOUND}}}, - } - mockPDCli.getOperator.responses[2] = []*pdpb.GetOperatorResponse{ - {Desc: []byte("not-scatter-region")}, - } - // mimic non-retryable error - mockPDCli.getOperator.responses[3] = []*pdpb.GetOperatorResponse{ - {Header: &pdpb.ResponseHeader{Error: &pdpb.Error{Type: pdpb.ErrorType_DATA_COMPACTED}}}, - } - left, err = client.WaitRegionsScattered(ctx, regions) - 
require.ErrorContains(t, err, "get operator error: DATA_COMPACTED") - require.Equal(t, 4, left) // region 3,4,5,6 is not scattered - checkGetOperatorRespsDrained() - checkNoRetry() - - // test backoff is timed-out - - backup := WaitRegionOnlineAttemptTimes - WaitRegionOnlineAttemptTimes = 2 - t.Cleanup(func() { - WaitRegionOnlineAttemptTimes = backup - }) - - mockPDCli.scatterRegion.count = make(map[uint64]int) - mockPDCli.getOperator.responses = make(map[uint64][]*pdpb.GetOperatorResponse) - mockPDCli.getOperator.responses[1] = []*pdpb.GetOperatorResponse{ - {Header: &pdpb.ResponseHeader{Error: &pdpb.Error{Type: pdpb.ErrorType_REGION_NOT_FOUND}}}, - } - mockPDCli.getOperator.responses[2] = []*pdpb.GetOperatorResponse{ - {Desc: []byte("not-scatter-region")}, - } - mockPDCli.getOperator.responses[3] = []*pdpb.GetOperatorResponse{ - {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_SUCCESS}, - } - mockPDCli.getOperator.responses[4] = []*pdpb.GetOperatorResponse{ - {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_RUNNING}, - {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_RUNNING}, // first retry - {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_RUNNING}, // second retry - } - mockPDCli.getOperator.responses[5] = []*pdpb.GetOperatorResponse{ - {Desc: []byte("not-scatter-region")}, - } - mockPDCli.getOperator.responses[6] = []*pdpb.GetOperatorResponse{ - {Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_SUCCESS}, - } - left, err = client.WaitRegionsScattered(ctx, regions) - require.ErrorContains(t, err, "the first unfinished region: id:4") - require.Equal(t, 1, left) - checkGetOperatorRespsDrained() - checkNoRetry() -} - -func TestBackoffMayNotCountBackoffer(t *testing.T) { - b := NewBackoffMayNotCountBackoffer() - initVal := b.Attempt() - - b.NextBackoff(ErrBackoffAndDontCount) - require.Equal(t, initVal, b.Attempt()) - // test Annotate, which is the real usage in caller - b.NextBackoff(errors.Annotate(ErrBackoffAndDontCount, "caller message")) - require.Equal(t, initVal, b.Attempt()) - - b.NextBackoff(ErrBackoff) - require.Equal(t, initVal-1, b.Attempt()) - - b.NextBackoff(goerrors.New("test")) - require.Equal(t, 0, b.Attempt()) -} - -func TestSplitCtxCancel(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - mockCli := NewMockPDClientForSplit() - mockCli.splitRegions.hijacked = func() (bool, *kvrpcpb.SplitRegionResponse, error) { - cancel() - resp := &kvrpcpb.SplitRegionResponse{ - Regions: []*metapb.Region{ - {Id: 1}, - {Id: 2}, - }, - } - return false, resp, nil - } - client := pdClient{ - client: mockCli, - } - - _, err := client.SplitWaitAndScatter(ctx, &RegionInfo{}, [][]byte{{1}}) - require.ErrorIs(t, err, context.Canceled) -} - -func TestGetSplitKeyPerRegion(t *testing.T) { - // test case moved from BR - sortedKeys := [][]byte{ - []byte("b"), - []byte("d"), - []byte("g"), - []byte("j"), - []byte("l"), - } - sortedRegions := []*RegionInfo{ - { - Region: &metapb.Region{ - Id: 1, - StartKey: []byte("a"), - EndKey: []byte("g"), - }, - }, - { - Region: &metapb.Region{ - Id: 2, - StartKey: []byte("g"), - EndKey: []byte("k"), - }, - }, - { - Region: &metapb.Region{ - Id: 3, - StartKey: []byte("k"), - EndKey: []byte("m"), - }, - }, - } - result := getSplitKeysOfRegions(sortedKeys, sortedRegions, false) - require.Equal(t, 3, len(result)) - require.Equal(t, [][]byte{[]byte("b"), []byte("d")}, result[sortedRegions[0]]) - require.Equal(t, [][]byte{[]byte("g"), []byte("j")}, 
-        result[sortedRegions[1]])
-    require.Equal(t, [][]byte{[]byte("l")}, result[sortedRegions[2]])
-
-    // test case moved from lightning
-    tableID := int64(1)
-    keys := []int64{1, 10, 100, 1000, 10000, -1}
-    sortedRegions = make([]*RegionInfo, 0, len(keys))
-    start := tablecodec.EncodeRowKeyWithHandle(tableID, kv.IntHandle(0))
-    regionStart := codec.EncodeBytes([]byte{}, start)
-    for i, end := range keys {
-        var regionEndKey []byte
-        if end >= 0 {
-            endKey := tablecodec.EncodeRowKeyWithHandle(tableID, kv.IntHandle(end))
-            regionEndKey = codec.EncodeBytes([]byte{}, endKey)
-        }
-        region := &RegionInfo{
-            Region: &metapb.Region{
-                Id:       uint64(i),
-                StartKey: regionStart,
-                EndKey:   regionEndKey,
-            },
-        }
-        sortedRegions = append(sortedRegions, region)
-        regionStart = regionEndKey
-    }
-
-    checkKeys := map[int64]int{
-        0:     -1,
-        5:     1,
-        6:     1,
-        7:     1,
-        50:    2,
-        60:    2,
-        70:    2,
-        100:   -1,
-        50000: 5,
-    }
-    expected := map[uint64][][]byte{}
-    sortedKeys = make([][]byte, 0, len(checkKeys))
-
-    for hdl, idx := range checkKeys {
-        key := tablecodec.EncodeRowKeyWithHandle(tableID, kv.IntHandle(hdl))
-        sortedKeys = append(sortedKeys, key)
-        if idx < 0 {
-            continue
-        }
-        expected[uint64(idx)] = append(expected[uint64(idx)], key)
-    }
-
-    slices.SortFunc(sortedKeys, bytes.Compare)
-    for i := range expected {
-        slices.SortFunc(expected[i], bytes.Compare)
-    }
-
-    got := getSplitKeysOfRegions(sortedKeys, sortedRegions, false)
-    require.Equal(t, len(expected), len(got))
-    for region, gotKeys := range got {
-        require.Equal(t, expected[region.Region.GetId()], gotKeys)
-    }
-}
-
-func checkRegionsBoundaries(t *testing.T, regions []*RegionInfo, expected [][]byte) {
-    require.Len(
-        t, regions, len(expected)-1,
-        "first region start key: %v, last region end key: %v, first expected key: %v, last expected key: %v",
-        regions[0].Region.StartKey, regions[len(regions)-1].Region.EndKey,
-        expected[0], expected[len(expected)-1],
-    )
-    for i := 1; i < len(expected); i++ {
-        require.Equal(t, expected[i-1], regions[i-1].Region.StartKey)
-        require.Equal(t, expected[i], regions[i-1].Region.EndKey)
-    }
-}
-
-func TestPaginateScanRegion(t *testing.T) {
-    ctx := context.Background()
-    mockPDClient := NewMockPDClientForSplit()
-    mockClient := &pdClient{
-        client: mockPDClient,
-    }
-
-    backup := WaitRegionOnlineAttemptTimes
-    WaitRegionOnlineAttemptTimes = 3
-    t.Cleanup(func() {
-        WaitRegionOnlineAttemptTimes = backup
-    })
-
-    // no region
-    _, err := PaginateScanRegion(ctx, mockClient, []byte{}, []byte{}, 3)
-    require.Error(t, err)
-    require.True(t, berrors.ErrPDBatchScanRegion.Equal(err))
-    require.ErrorContains(t, err, "scan region return empty result")
-
-    // retry on error
-    mockPDClient.scanRegions.errors = []error{
-        status.Error(codes.Unavailable, "not leader"),
-    }
-    mockPDClient.SetRegions([][]byte{{}, {}})
-    got, err := PaginateScanRegion(ctx, mockClient, []byte{}, []byte{}, 3)
-    require.NoError(t, err)
-    checkRegionsBoundaries(t, got, [][]byte{{}, {}})
-
-    // test paginate
-    boundaries := [][]byte{{}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {}}
-    mockPDClient.SetRegions(boundaries)
-    got, err = PaginateScanRegion(ctx, mockClient, []byte{}, []byte{}, 3)
-    require.NoError(t, err)
-    checkRegionsBoundaries(t, got, boundaries)
-    got, err = PaginateScanRegion(ctx, mockClient, []byte{1}, []byte{}, 3)
-    require.NoError(t, err)
-    checkRegionsBoundaries(t, got, boundaries[1:])
-    got, err = PaginateScanRegion(ctx, mockClient, []byte{}, []byte{2}, 8)
-    require.NoError(t, err)
-    checkRegionsBoundaries(t, got, boundaries[:3]) // [, 1), [1, 2)
-    got, err = PaginateScanRegion(ctx, mockClient, []byte{4}, []byte{5}, 1)
-    require.NoError(t, err)
-    checkRegionsBoundaries(t, got, [][]byte{{4}, {5}})
-
-    // test start == end
-    _, err = PaginateScanRegion(ctx, mockClient, []byte{4}, []byte{4}, 1)
-    require.ErrorContains(t, err, "scan region return empty result")
-
-    // test start > end
-    _, err = PaginateScanRegion(ctx, mockClient, []byte{5}, []byte{4}, 5)
-    require.True(t, berrors.ErrInvalidRange.Equal(err))
-    require.ErrorContains(t, err, "startKey > endKey")
-
-    // test retry exhausted
-    mockPDClient.scanRegions.errors = []error{
-        status.Error(codes.Unavailable, "not leader"),
-        status.Error(codes.Unavailable, "not leader"),
-        status.Error(codes.Unavailable, "not leader"),
-    }
-    _, err = PaginateScanRegion(ctx, mockClient, []byte{4}, []byte{5}, 1)
-    require.ErrorContains(t, err, "not leader")
-
-    // test region not continuous
-    mockPDClient.Regions = &pdtypes.RegionTree{}
-    mockPDClient.Regions.SetRegion(&pdtypes.Region{
-        Meta: &metapb.Region{
-            Id:       1,
-            StartKey: []byte{1},
-            EndKey:   []byte{2},
-        },
-        Leader: &metapb.Peer{
-            Id:      1,
-            StoreId: 1,
-        },
-    })
-    mockPDClient.Regions.SetRegion(&pdtypes.Region{
-        Meta: &metapb.Region{
-            Id:       4,
-            StartKey: []byte{4},
-            EndKey:   []byte{5},
-        },
-        Leader: &metapb.Peer{
-            Id:      4,
-            StoreId: 1,
-        },
-    })
-
-    _, err = PaginateScanRegion(ctx, mockClient, []byte{1}, []byte{5}, 3)
-    require.True(t, berrors.ErrPDBatchScanRegion.Equal(err))
-    require.ErrorContains(t, err, "region 1's endKey not equal to next region 4's startKey")
-
-    // test region becomes continuous slowly
-    toAdd := []*pdtypes.Region{
-        {
-            Meta: &metapb.Region{
-                Id:       2,
-                StartKey: []byte{2},
-                EndKey:   []byte{3},
-            },
-            Leader: &metapb.Peer{
-                Id:      2,
-                StoreId: 1,
-            },
-        },
-        {
-            Meta: &metapb.Region{
-                Id:       3,
-                StartKey: []byte{3},
-                EndKey:   []byte{4},
-            },
-            Leader: &metapb.Peer{
-                Id:      3,
-                StoreId: 1,
-            },
-        },
-    }
-    mockPDClient.scanRegions.beforeHook = func() {
-        mockPDClient.Regions.SetRegion(toAdd[0])
-        toAdd = toAdd[1:]
-    }
-    got, err = PaginateScanRegion(ctx, mockClient, []byte{1}, []byte{5}, 100)
-    require.NoError(t, err)
-    checkRegionsBoundaries(t, got, [][]byte{{1}, {2}, {3}, {4}, {5}})
-}
-
-func TestRegionConsistency(t *testing.T) {
-    cases := []struct {
-        startKey []byte
-        endKey   []byte
-        err      string
-        regions  []*RegionInfo
-    }{
-        {
-            codec.EncodeBytes([]byte{}, []byte("a")),
-            codec.EncodeBytes([]byte{}, []byte("a")),
-            "scan region return empty result, startKey: (.*?), endKey: (.*?)",
-            []*RegionInfo{},
-        },
-        {
-            codec.EncodeBytes([]byte{}, []byte("a")),
-            codec.EncodeBytes([]byte{}, []byte("a")),
-            "first region 1's startKey(.*?) > startKey(.*?)",
-            []*RegionInfo{
-                {
-                    Region: &metapb.Region{
-                        Id:       1,
-                        StartKey: codec.EncodeBytes([]byte{}, []byte("b")),
-                        EndKey:   codec.EncodeBytes([]byte{}, []byte("d")),
-                    },
-                },
-            },
-        },
-        {
-            codec.EncodeBytes([]byte{}, []byte("b")),
-            codec.EncodeBytes([]byte{}, []byte("e")),
-            "last region 100's endKey(.*?) < endKey(.*?)",
-            []*RegionInfo{
-                {
-                    Region: &metapb.Region{
-                        Id:       100,
-                        StartKey: codec.EncodeBytes([]byte{}, []byte("b")),
-                        EndKey:   codec.EncodeBytes([]byte{}, []byte("d")),
-                    },
-                },
-            },
-        },
-        {
-            codec.EncodeBytes([]byte{}, []byte("c")),
-            codec.EncodeBytes([]byte{}, []byte("e")),
-            "region 6's endKey not equal to next region 8's startKey(.*?)",
-            []*RegionInfo{
-                {
-                    Leader: &metapb.Peer{
-                        Id:      6,
-                        StoreId: 1,
-                    },
-                    Region: &metapb.Region{
-                        Id:          6,
-                        StartKey:    codec.EncodeBytes([]byte{}, []byte("b")),
-                        EndKey:      codec.EncodeBytes([]byte{}, []byte("d")),
-                        RegionEpoch: nil,
-                    },
-                },
-                {
-                    Leader: &metapb.Peer{
-                        Id:      8,
-                        StoreId: 1,
-                    },
-                    Region: &metapb.Region{
-                        Id:       8,
-                        StartKey: codec.EncodeBytes([]byte{}, []byte("e")),
-                        EndKey:   codec.EncodeBytes([]byte{}, []byte("f")),
-                    },
-                },
-            },
-        },
-        {
-            codec.EncodeBytes([]byte{}, []byte("c")),
-            codec.EncodeBytes([]byte{}, []byte("e")),
-            "region 6's leader is nil(.*?)",
-            []*RegionInfo{
-                {
-                    Region: &metapb.Region{
-                        Id:          6,
-                        StartKey:    codec.EncodeBytes([]byte{}, []byte("c")),
-                        EndKey:      codec.EncodeBytes([]byte{}, []byte("d")),
-                        RegionEpoch: nil,
-                    },
-                },
-                {
-                    Region: &metapb.Region{
-                        Id:       8,
-                        StartKey: codec.EncodeBytes([]byte{}, []byte("d")),
-                        EndKey:   codec.EncodeBytes([]byte{}, []byte("e")),
-                    },
-                },
-            },
-        },
-        {
-            codec.EncodeBytes([]byte{}, []byte("c")),
-            codec.EncodeBytes([]byte{}, []byte("e")),
-            "region 6's leader's store id is 0(.*?)",
-            []*RegionInfo{
-                {
-                    Leader: &metapb.Peer{
-                        Id:      6,
-                        StoreId: 0,
-                    },
-                    Region: &metapb.Region{
-                        Id:          6,
-                        StartKey:    codec.EncodeBytes([]byte{}, []byte("c")),
-                        EndKey:      codec.EncodeBytes([]byte{}, []byte("d")),
-                        RegionEpoch: nil,
-                    },
-                },
-                {
-                    Leader: &metapb.Peer{
-                        Id:      6,
-                        StoreId: 0,
-                    },
-                    Region: &metapb.Region{
-                        Id:       8,
-                        StartKey: codec.EncodeBytes([]byte{}, []byte("d")),
-                        EndKey:   codec.EncodeBytes([]byte{}, []byte("e")),
-                    },
-                },
-            },
-        },
-    }
-    for _, ca := range cases {
-        err := checkRegionConsistency(ca.startKey, ca.endKey, ca.regions)
-        require.Error(t, err)
-        require.Regexp(t, ca.err, err.Error())
-    }
-}
->>>>>>> 0805e850d41 (br: handle region leader miss (#52822))

From f46b384fa0a9902fe4c2171461e3b86237ce88ba Mon Sep 17 00:00:00 2001
From: lance6716
Date: Mon, 3 Jun 2024 14:59:07 +0800
Subject: [PATCH 4/7] fix cp

---
 br/pkg/lightning/backend/local/BUILD.bazel | 1 +
 br/pkg/restore/import_retry_test.go        | 2 +-
 br/pkg/restore/split_test.go               | 3 ++-
 3 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/br/pkg/lightning/backend/local/BUILD.bazel b/br/pkg/lightning/backend/local/BUILD.bazel
index 1865104c32863..efa55a5414614 100644
--- a/br/pkg/lightning/backend/local/BUILD.bazel
+++ b/br/pkg/lightning/backend/local/BUILD.bazel
@@ -17,6 +17,7 @@ go_library(
     importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/local",
     visibility = ["//visibility:public"],
     deps = [
+        "//br/pkg/errors",
         "//br/pkg/lightning/backend",
         "//br/pkg/lightning/backend/kv",
         "//br/pkg/lightning/checkpoints",
diff --git a/br/pkg/restore/import_retry_test.go b/br/pkg/restore/import_retry_test.go
index cbe8275b0209c..b0304fbdbf264 100644
--- a/br/pkg/restore/import_retry_test.go
+++ b/br/pkg/restore/import_retry_test.go
@@ -242,7 +242,7 @@ func TestEpochNotMatch(t *testing.T) {
             EndKey:   right.Region.EndKey,
             Id:       42,
             Peers: []*metapb.Peer{
-                {Id: 43},
+                {Id: 43, StoreId: 1},
             },
         },
         Leader: &metapb.Peer{Id: 43, StoreId: 1},
diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go
index 37849259caca0..ae5d7dbfcf658 100644
--- a/br/pkg/restore/split_test.go
+++ b/br/pkg/restore/split_test.go
@@ -526,7 +526,8 @@ func initTestClient(isRawKv bool) *TestClient {
         }
         regions[i] = &split.RegionInfo{
             Leader: &metapb.Peer{
-                Id:      i,
+                Id:      i,
+                StoreId: 1,
             },
             Region: &metapb.Region{
                 Id:       i,

From 95b9b6bd611b9329e71fbad33894f3abf16e34f5 Mon Sep 17 00:00:00 2001
From: lance6716
Date: Mon, 3 Jun 2024 15:37:00 +0800
Subject: [PATCH 5/7] fix CI

Signed-off-by: lance6716
---
 br/pkg/lightning/backend/local/localhelper_test.go | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/br/pkg/lightning/backend/local/localhelper_test.go b/br/pkg/lightning/backend/local/localhelper_test.go
index b233437685950..2801e2d890cbc 100644
--- a/br/pkg/lightning/backend/local/localhelper_test.go
+++ b/br/pkg/lightning/backend/local/localhelper_test.go
@@ -147,6 +147,10 @@ func (c *testSplitClient) SplitRegion(
                 ConfVer: target.Region.RegionEpoch.ConfVer + 1,
             },
         },
+        Leader: &metapb.Peer{
+            Id:      target.Leader.Id,
+            StoreId: target.Leader.StoreId,
+        },
     }
     c.regions[c.nextRegionID] = newRegion
     c.regionsInfo.SetRegion(pdtypes.NewRegionInfo(newRegion.Region, newRegion.Leader))
@@ -208,6 +212,10 @@ func (c *testSplitClient) BatchSplitRegionsWithOrigin(
                 StartKey: startKey,
                 EndKey:   key,
             },
+            Leader: &metapb.Peer{
+                Id:      target.Leader.Id,
+                StoreId: target.Leader.StoreId,
+            },
         }
         c.regions[c.nextRegionID] = newRegion
         c.regionsInfo.SetRegion(pdtypes.NewRegionInfo(newRegion.Region, newRegion.Leader))

From ae672916fbe2de35b77370c3f047fbdf110507ab Mon Sep 17 00:00:00 2001
From: Jianjun Liao
Date: Mon, 3 Jun 2024 17:51:17 +0800
Subject: [PATCH 6/7] fix CI

Signed-off-by: Jianjun Liao
---
 br/pkg/restore/split_test.go | 60 ++++++++++++++++++++++++++++++++++++
 br/pkg/restore/util_test.go  |  2 ++
 2 files changed, 62 insertions(+)

diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go
index ae5d7dbfcf658..280cce68629b3 100644
--- a/br/pkg/restore/split_test.go
+++ b/br/pkg/restore/split_test.go
@@ -692,12 +692,72 @@ func TestRegionConsistency(t *testing.T) {
                     StartKey: codec.EncodeBytes([]byte{}, []byte("b")),
                     EndKey:   codec.EncodeBytes([]byte{}, []byte("d")),
                 },
+                Leader: &metapb.Peer{
+                    Id:      6,
+                    StoreId: 0,
+                },
             },
             {
                 Region: &metapb.Region{
                     StartKey: codec.EncodeBytes([]byte{}, []byte("e")),
                     EndKey:   codec.EncodeBytes([]byte{}, []byte("f")),
                 },
+                Leader: &metapb.Peer{
+                    Id:      6,
+                    StoreId: 0,
+                },
+            },
+        },
+    },
+    {
+        codec.EncodeBytes([]byte{}, []byte("c")),
+        codec.EncodeBytes([]byte{}, []byte("e")),
+        "region 6's leader is nil(.*?)",
+        []*split.RegionInfo{
+            {
+                Region: &metapb.Region{
+                    Id:          6,
+                    StartKey:    codec.EncodeBytes([]byte{}, []byte("c")),
+                    EndKey:      codec.EncodeBytes([]byte{}, []byte("d")),
+                    RegionEpoch: nil,
+                },
+            },
+            {
+                Region: &metapb.Region{
+                    Id:       8,
+                    StartKey: codec.EncodeBytes([]byte{}, []byte("d")),
+                    EndKey:   codec.EncodeBytes([]byte{}, []byte("e")),
+                },
+            },
+        },
+    },
+    {
+        codec.EncodeBytes([]byte{}, []byte("c")),
+        codec.EncodeBytes([]byte{}, []byte("e")),
+        "region 6's leader's store id is 0(.*?)",
+        []*split.RegionInfo{
+            {
+                Leader: &metapb.Peer{
+                    Id:      6,
+                    StoreId: 0,
+                },
+                Region: &metapb.Region{
+                    Id:          6,
+                    StartKey:    codec.EncodeBytes([]byte{}, []byte("c")),
+                    EndKey:      codec.EncodeBytes([]byte{}, []byte("d")),
+                    RegionEpoch: nil,
+                },
+            },
+            {
+                Leader: &metapb.Peer{
+                    Id:      6,
+                    StoreId: 0,
+                },
+                Region: &metapb.Region{
+                    Id:       8,
+                    StartKey: codec.EncodeBytes([]byte{}, []byte("d")),
+                    EndKey:   codec.EncodeBytes([]byte{}, []byte("e")),
+                },
             },
         },
     },
diff --git a/br/pkg/restore/util_test.go b/br/pkg/restore/util_test.go
index dd2531a55ccf7..6fb49fa7f2e5b 100644
--- a/br/pkg/restore/util_test.go
+++ b/br/pkg/restore/util_test.go
@@ -191,6 +191,7 @@ func TestPaginateScanRegion(t *testing.T) {
                 Id:    i + 1,
                 Peers: peers,
             },
+            Leader: peers[0],
         }

         if i != 0 {
@@ -218,6 +219,7 @@ func TestPaginateScanRegion(t *testing.T) {
             StartKey: endKey,
             EndKey:   []byte{},
         },
+        Leader: peers[0],
     }
     regionsMap[num] = ri
     regions = append(regions, ri)

From 83201a8a266a865e2a15ab788af122d2d75aa45e Mon Sep 17 00:00:00 2001
From: Jianjun Liao
Date: Tue, 4 Jun 2024 10:48:31 +0800
Subject: [PATCH 7/7] fix CI

Signed-off-by: Jianjun Liao
---
 br/pkg/restore/split_test.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go
index 280cce68629b3..73b7af61b3cb8 100644
--- a/br/pkg/restore/split_test.go
+++ b/br/pkg/restore/split_test.go
@@ -694,7 +694,7 @@ func TestRegionConsistency(t *testing.T) {
                 },
                 Leader: &metapb.Peer{
                     Id:      6,
-                    StoreId: 0,
+                    StoreId: 1,
                 },
             },
             {
@@ -704,7 +704,7 @@ func TestRegionConsistency(t *testing.T) {
                 },
                 Leader: &metapb.Peer{
                     Id:      6,
-                    StoreId: 0,
+                    StoreId: 1,
                 },
             },
         },