From 58f035d38dae1c3feac2dbdb194532779f9f1125 Mon Sep 17 00:00:00 2001
From: glorv
Date: Tue, 23 Nov 2021 18:55:16 +0800
Subject: [PATCH] rollback change

---
 br/pkg/lightning/backend/local/duplicate.go | 66 +++++++++------------
 1 file changed, 27 insertions(+), 39 deletions(-)

diff --git a/br/pkg/lightning/backend/local/duplicate.go b/br/pkg/lightning/backend/local/duplicate.go
index 7fa1e8b7cf475..fc88055c40c61 100644
--- a/br/pkg/lightning/backend/local/duplicate.go
+++ b/br/pkg/lightning/backend/local/duplicate.go
@@ -20,7 +20,6 @@ import (
 	"io"
 	"math"
 	"sort"
-	"sync"
 	"time"
 
 	"github.com/cockroachdb/pebble"
@@ -260,45 +259,34 @@ func (manager *DuplicateManager) sendRequestToTiKV(ctx context.Context,
 	}
 	tryTimes := 0
 	indexHandles := makePendingIndexHandlesWithCapacity(0)
-	eg, rpcctx := errgroup.WithContext(ctx)
-	unfinishedRegions := make([]*metapb.Region, 0)
-	var rgLock sync.Mutex
 	for len(regions) > 0 {
 		if tryTimes > maxRetryTimes {
 			return errors.Errorf("retry time exceed limit")
 		}
+		unfinishedRegions := make([]*restore.RegionInfo, 0)
 		waitingClients := make([]import_sstpb.ImportSST_DuplicateDetectClient, 0)
-		waitingRegions := make([]*restore.RegionInfo, 0)
+		watingRegions := make([]*restore.RegionInfo, 0)
 		for idx, region := range regions {
-			r := region
-			manager.remoteWorkerPool.ApplyOnErrorGroup(eg, func() error {
-				_, start, _ := codec.DecodeBytes(r.Region.StartKey, []byte{})
-				_, end, _ := codec.DecodeBytes(r.Region.EndKey, []byte{})
-				if bytes.Compare(startKey, r.Region.StartKey) > 0 {
-					start = req.start
-				}
-				if r.Region.EndKey == nil || len(r.Region.EndKey) == 0 || bytes.Compare(endKey, r.Region.EndKey) < 0 {
-					end = req.end
-				}
-
-				logger.Debug("[detect-dupe] get duplicate stream",
-					zap.Int("localStreamID", idx),
-					logutil.Region(region.Region),
-					logutil.Leader(region.Leader),
-					logutil.Key("regionStartKey", start),
-					logutil.Key("regionEndKey", end))
-				cli, err := manager.getDuplicateStream(ctx, region, start, end)
-				if err != nil {
-					r, err := manager.splitCli.GetRegionByID(ctx, region.Region.GetId())
-					if err != nil {
-						unfinishedRegions = append(unfinishedRegions, region)
-					} else {
-						unfinishedRegions = append(unfinishedRegions, r)
-					}
-					return nil
-				}
-			})
+			if len(waitingClients) > manager.regionConcurrency {
+				r := regions[idx:]
+				unfinishedRegions = append(unfinishedRegions, r...)
+				break
+			}
+			_, start, _ := codec.DecodeBytes(region.Region.StartKey, []byte{})
+			_, end, _ := codec.DecodeBytes(region.Region.EndKey, []byte{})
+			if bytes.Compare(startKey, region.Region.StartKey) > 0 {
+				start = req.start
+			}
+			if region.Region.EndKey == nil || len(region.Region.EndKey) == 0 || bytes.Compare(endKey, region.Region.EndKey) < 0 {
+				end = req.end
+			}
+			logger.Debug("[detect-dupe] get duplicate stream",
+				zap.Int("localStreamID", idx),
+				logutil.Region(region.Region),
+				logutil.Leader(region.Leader),
+				logutil.Key("regionStartKey", start),
+				logutil.Key("regionEndKey", end))
 			cli, err := manager.getDuplicateStream(ctx, region, start, end)
 			if err != nil {
 				r, err := manager.splitCli.GetRegionByID(ctx, region.Region.GetId())
@@ -309,7 +297,7 @@ func (manager *DuplicateManager) sendRequestToTiKV(ctx context.Context,
 				}
 			} else {
 				waitingClients = append(waitingClients, cli)
-				waitingRegions = append(waitingRegions, region)
+				watingRegions = append(watingRegions, region)
 			}
 		}
 
@@ -323,7 +311,7 @@ func (manager *DuplicateManager) sendRequestToTiKV(ctx context.Context,
 		}
 
 		for idx, cli := range waitingClients {
-			region := waitingRegions[idx]
+			region := watingRegions[idx]
 			cliLogger := logger.With(
 				zap.Int("localStreamID", idx),
 				logutil.Region(region.Region),
@@ -362,9 +350,9 @@ func (manager *DuplicateManager) sendRequestToTiKV(ctx context.Context,
 					cliLogger.Warn("[detect-dupe] meet key error in duplicate detect response from TiKV, retry again ",
 						zap.String("RegionError", resp.GetRegionError().GetMessage()))
 
-					r, err := restore.PaginateScanRegion(ctx, manager.splitCli, waitingRegions[idx].Region.GetStartKey(), waitingRegions[idx].Region.GetEndKey(), scanRegionLimit)
+					r, err := restore.PaginateScanRegion(ctx, manager.splitCli, watingRegions[idx].Region.GetStartKey(), watingRegions[idx].Region.GetEndKey(), scanRegionLimit)
 					if err != nil {
-						unfinishedRegions = append(unfinishedRegions, waitingRegions[idx])
+						unfinishedRegions = append(unfinishedRegions, watingRegions[idx])
 					} else {
 						unfinishedRegions = append(unfinishedRegions, r...)
 					}
@@ -723,7 +711,7 @@ func (manager *DuplicateManager) getDuplicateStream(ctx context.Context,
 		KeyOnly:  false,
 	}
 	stream, err := cli.DuplicateDetect(ctx, req)
-	return stream, errors.Trace(err)
+	return stream, err
 }
 
 func (manager *DuplicateManager) getImportClient(ctx context.Context, peer *metapb.Peer) (import_sstpb.ImportSSTClient, error) {
@@ -731,7 +719,7 @@ func (manager *DuplicateManager) getImportClient(ctx context.Context, peer *meta
 		return manager.makeConn(ctx, peer.GetStoreId())
 	})
 	if err != nil {
-		return nil, errors.Trace(err)
+		return nil, err
 	}
 	return import_sstpb.NewImportSSTClient(conn), nil
 }
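
The loop this rollback restores bounds the work per pass instead of fanning it out through a worker pool: once len(waitingClients) exceeds manager.regionConcurrency, the remaining regions are deferred into unfinishedRegions and picked up by the next iteration of the outer `for len(regions) > 0` loop, with tryTimes guarding against endless retries. Below is a minimal, self-contained sketch of that batch-and-defer pattern; regions are reduced to plain ints, and openStream/processStream are hypothetical stand-ins for getDuplicateStream and the response-draining loop, not functions from this package.

```go
// Sketch of the bounded-batch retry pattern restored by this rollback.
// Assumptions: a "region" is just an int ID; openStream and processStream
// are hypothetical placeholders for getDuplicateStream and the response loop.
package main

import (
	"errors"
	"fmt"
)

const (
	maxRetryTimes     = 3
	regionConcurrency = 2
)

// openStream pretends to open a duplicate-detect stream for one region.
func openStream(region int) (string, error) {
	return fmt.Sprintf("stream-%d", region), nil
}

// processStream pretends to drain one stream's responses.
func processStream(stream string) error {
	fmt.Println("processed", stream)
	return nil
}

func detectAll(regions []int) error {
	tryTimes := 0
	for len(regions) > 0 {
		if tryTimes > maxRetryTimes {
			return errors.New("retry time exceed limit")
		}
		unfinished := make([]int, 0)
		streams := make([]string, 0)
		for idx, region := range regions {
			// Cap the number of streams opened in one pass; defer the rest
			// to the next iteration of the outer loop (mirrors the patch's
			// len(waitingClients) > manager.regionConcurrency check).
			if len(streams) > regionConcurrency {
				unfinished = append(unfinished, regions[idx:]...)
				break
			}
			stream, err := openStream(region)
			if err != nil {
				// Could not open the stream: retry this region next pass.
				unfinished = append(unfinished, region)
				continue
			}
			streams = append(streams, stream)
		}
		for _, stream := range streams {
			if err := processStream(stream); err != nil {
				return err
			}
		}
		// Count this pass toward the retry budget and loop over the leftovers.
		regions = unfinished
		tryTimes++
	}
	return nil
}

func main() {
	if err := detectAll([]int{1, 2, 3, 4, 5, 6, 7}); err != nil {
		fmt.Println("error:", err)
	}
}
```

Compared with the removed remoteWorkerPool.ApplyOnErrorGroup variant, this keeps stream setup sequential in a single goroutine, so unfinishedRegions needs no mutex (the rolled-back code had introduced rgLock for exactly that reason).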