diff --git a/br/pkg/lightning/backend/local/BUILD.bazel b/br/pkg/lightning/backend/local/BUILD.bazel index 1865104c32863..efa55a5414614 100644 --- a/br/pkg/lightning/backend/local/BUILD.bazel +++ b/br/pkg/lightning/backend/local/BUILD.bazel @@ -17,6 +17,7 @@ go_library( importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/local", visibility = ["//visibility:public"], deps = [ + "//br/pkg/errors", "//br/pkg/lightning/backend", "//br/pkg/lightning/backend/kv", "//br/pkg/lightning/checkpoints", diff --git a/br/pkg/lightning/backend/local/duplicate.go b/br/pkg/lightning/backend/local/duplicate.go index 8877c16ae7740..33b1fa3a039eb 100644 --- a/br/pkg/lightning/backend/local/duplicate.go +++ b/br/pkg/lightning/backend/local/duplicate.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" + berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/errormanager" @@ -297,7 +298,8 @@ func getDupDetectClient( ) (import_sstpb.ImportSST_DuplicateDetectClient, error) { leader := region.Leader if leader == nil { - leader = region.Region.GetPeers()[0] + return nil, errors.Annotatef(berrors.ErrPDLeaderNotFound, + "region id %d has no leader", region.Region.Id) } importClient, err := importClientFactory.Create(ctx, leader.GetStoreId()) if err != nil { diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 336e57ccdc90e..5cd9fcd85f887 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -37,6 +37,7 @@ import ( sst "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" + berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/common" @@ -1165,7 +1166,8 @@ func (local *local) WriteToTiKV( func (local *local) Ingest(ctx context.Context, metas []*sst.SSTMeta, region *split.RegionInfo) (*sst.IngestResponse, error) { leader := region.Leader if leader == nil { - leader = region.Region.GetPeers()[0] + return nil, errors.Annotatef(berrors.ErrPDLeaderNotFound, + "region %d has no leader", region.Region.Id) } cli, err := local.getImportClient(ctx, leader.StoreId) diff --git a/br/pkg/lightning/backend/local/localhelper_test.go b/br/pkg/lightning/backend/local/localhelper_test.go index b233437685950..2801e2d890cbc 100644 --- a/br/pkg/lightning/backend/local/localhelper_test.go +++ b/br/pkg/lightning/backend/local/localhelper_test.go @@ -147,6 +147,10 @@ func (c *testSplitClient) SplitRegion( ConfVer: target.Region.RegionEpoch.ConfVer + 1, }, }, + Leader: &metapb.Peer{ + Id: target.Leader.Id, + StoreId: target.Leader.StoreId, + }, } c.regions[c.nextRegionID] = newRegion c.regionsInfo.SetRegion(pdtypes.NewRegionInfo(newRegion.Region, newRegion.Leader)) @@ -208,6 +212,10 @@ func (c *testSplitClient) BatchSplitRegionsWithOrigin( StartKey: startKey, EndKey: key, }, + Leader: &metapb.Peer{ + Id: target.Leader.Id, + StoreId: target.Leader.StoreId, + }, } c.regions[c.nextRegionID] = newRegion c.regionsInfo.SetRegion(pdtypes.NewRegionInfo(newRegion.Region, newRegion.Leader)) diff --git a/br/pkg/restore/import.go b/br/pkg/restore/import.go index 425f170334cd3..f5909d523feb5 100644 --- a/br/pkg/restore/import.go +++ b/br/pkg/restore/import.go @@ -825,7 +825,8 @@ func (importer *FileImporter) ingestSSTs( ) (*import_sstpb.IngestResponse, error) { leader := regionInfo.Leader if leader == nil { - leader = regionInfo.Region.GetPeers()[0] + return nil, errors.Annotatef(berrors.ErrPDLeaderNotFound, + "region id %d has no leader", regionInfo.Region.Id) } reqCtx := &kvrpcpb.Context{ RegionId: regionInfo.Region.GetId(), diff --git a/br/pkg/restore/import_retry_test.go b/br/pkg/restore/import_retry_test.go index 4e885657f998f..b0304fbdbf264 100644 --- a/br/pkg/restore/import_retry_test.go +++ b/br/pkg/restore/import_retry_test.go @@ -242,10 +242,10 @@ func TestEpochNotMatch(t *testing.T) { EndKey: right.Region.EndKey, Id: 42, Peers: []*metapb.Peer{ - {Id: 43}, + {Id: 43, StoreId: 1}, }, }, - Leader: &metapb.Peer{Id: 43}, + Leader: &metapb.Peer{Id: 43, StoreId: 1}, } newRegion := pdtypes.NewRegionInfo(info.Region, info.Leader) mergeRegion := func() { @@ -304,7 +304,8 @@ func TestRegionSplit(t *testing.T) { EndKey: codec.EncodeBytes(nil, []byte("aayy")), }, Leader: &metapb.Peer{ - Id: 43, + Id: 43, + StoreId: 1, }, }, { @@ -314,7 +315,8 @@ func TestRegionSplit(t *testing.T) { EndKey: target.Region.EndKey, }, Leader: &metapb.Peer{ - Id: 45, + Id: 45, + StoreId: 1, }, }, } diff --git a/br/pkg/restore/split/split.go b/br/pkg/restore/split/split.go index 4ea5b508594cf..09ed98d82bde0 100644 --- a/br/pkg/restore/split/split.go +++ b/br/pkg/restore/split/split.go @@ -59,7 +59,23 @@ func CheckRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) erro } cur := regions[0] + if cur.Leader == nil { + return errors.Annotatef(berrors.ErrPDBatchScanRegion, + "region %d's leader is nil", cur.Region.Id) + } + if cur.Leader.StoreId == 0 { + return errors.Annotatef(berrors.ErrPDBatchScanRegion, + "region %d's leader's store id is 0", cur.Region.Id) + } for _, r := range regions[1:] { + if r.Leader == nil { + return errors.Annotatef(berrors.ErrPDBatchScanRegion, + "region %d's leader is nil", r.Region.Id) + } + if r.Leader.StoreId == 0 { + return errors.Annotatef(berrors.ErrPDBatchScanRegion, + "region %d's leader's store id is 0", r.Region.Id) + } if !bytes.Equal(cur.Region.EndKey, r.Region.StartKey) { return errors.Annotatef(berrors.ErrPDBatchScanRegion, "region endKey not equal to next region startKey, endKey: %s, startKey: %s", redact.Key(cur.Region.EndKey), redact.Key(r.Region.StartKey)) diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index 37849259caca0..73b7af61b3cb8 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -526,7 +526,8 @@ func initTestClient(isRawKv bool) *TestClient { } regions[i] = &split.RegionInfo{ Leader: &metapb.Peer{ - Id: i, + Id: i, + StoreId: 1, }, Region: &metapb.Region{ Id: i, @@ -691,12 +692,72 @@ func TestRegionConsistency(t *testing.T) { StartKey: codec.EncodeBytes([]byte{}, []byte("b")), EndKey: codec.EncodeBytes([]byte{}, []byte("d")), }, + Leader: &metapb.Peer{ + Id: 6, + StoreId: 1, + }, }, { Region: &metapb.Region{ StartKey: codec.EncodeBytes([]byte{}, []byte("e")), EndKey: codec.EncodeBytes([]byte{}, []byte("f")), }, + Leader: &metapb.Peer{ + Id: 6, + StoreId: 1, + }, + }, + }, + }, + { + codec.EncodeBytes([]byte{}, []byte("c")), + codec.EncodeBytes([]byte{}, []byte("e")), + "region 6's leader is nil(.*?)", + []*split.RegionInfo{ + { + Region: &metapb.Region{ + Id: 6, + StartKey: codec.EncodeBytes([]byte{}, []byte("c")), + EndKey: codec.EncodeBytes([]byte{}, []byte("d")), + RegionEpoch: nil, + }, + }, + { + Region: &metapb.Region{ + Id: 8, + StartKey: codec.EncodeBytes([]byte{}, []byte("d")), + EndKey: codec.EncodeBytes([]byte{}, []byte("e")), + }, + }, + }, + }, + { + codec.EncodeBytes([]byte{}, []byte("c")), + codec.EncodeBytes([]byte{}, []byte("e")), + "region 6's leader's store id is 0(.*?)", + []*split.RegionInfo{ + { + Leader: &metapb.Peer{ + Id: 6, + StoreId: 0, + }, + Region: &metapb.Region{ + Id: 6, + StartKey: codec.EncodeBytes([]byte{}, []byte("c")), + EndKey: codec.EncodeBytes([]byte{}, []byte("d")), + RegionEpoch: nil, + }, + }, + { + Leader: &metapb.Peer{ + Id: 6, + StoreId: 0, + }, + Region: &metapb.Region{ + Id: 8, + StartKey: codec.EncodeBytes([]byte{}, []byte("d")), + EndKey: codec.EncodeBytes([]byte{}, []byte("e")), + }, }, }, }, diff --git a/br/pkg/restore/util_test.go b/br/pkg/restore/util_test.go index dd2531a55ccf7..6fb49fa7f2e5b 100644 --- a/br/pkg/restore/util_test.go +++ b/br/pkg/restore/util_test.go @@ -191,6 +191,7 @@ func TestPaginateScanRegion(t *testing.T) { Id: i + 1, Peers: peers, }, + Leader: peers[0], } if i != 0 { @@ -218,6 +219,7 @@ func TestPaginateScanRegion(t *testing.T) { StartKey: endKey, EndKey: []byte{}, }, + Leader: peers[0], } regionsMap[num] = ri regions = append(regions, ri)