lightning: improve region actions for huge region number (pingcap#43807)
lance6716 authored and ti-chi-bot committed May 17, 2023
1 parent 53aa3fa commit 48d3b78
Showing 16 changed files with 501 additions and 144 deletions.
1 change: 1 addition & 0 deletions br/pkg/lightning/BUILD.bazel
@@ -22,6 +22,7 @@ go_library(
"//br/pkg/lightning/tikv",
"//br/pkg/lightning/web",
"//br/pkg/redact",
"//br/pkg/restore/split",
"//br/pkg/storage",
"//br/pkg/utils",
"//br/pkg/version/build",
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
@@ -22,6 +22,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//br/pkg/checksum",
"//br/pkg/errors",
"//br/pkg/lightning/backend",
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/backend/kv",
11 changes: 11 additions & 0 deletions br/pkg/lightning/backend/local/duplicate.go
@@ -853,6 +853,7 @@ func (m *DupeDetector) processRemoteDupTask(
importClientFactory ImportClientFactory,
regionPool *utils.WorkerPool,
) error {
regionErrRetryAttempts := split.WaitRegionOnlineAttemptTimes
remainAttempts := maxDupCollectAttemptTimes
remainKeyRanges := newPendingKeyRanges(task.KeyRange)
for {
@@ -869,6 +870,16 @@
return errors.Trace(err)
}
if !madeProgress {
_, isRegionErr := errors.Cause(err).(regionError)
if isRegionErr && regionErrRetryAttempts > 0 {
regionErrRetryAttempts--
if regionErrRetryAttempts%10 == 0 {
logger.Warn("[detect-dupe] process remote dupTask encounters region error, retrying",
log.ShortError(err), zap.Int("remainRegionErrAttempts", regionErrRetryAttempts))
}
continue
}

remainAttempts--
if remainAttempts <= 0 {
logger.Error("[detect-dupe] all attempts to process the remote dupTask have failed", log.ShortError(err))
13 changes: 9 additions & 4 deletions br/pkg/lightning/backend/local/local.go
@@ -394,9 +394,11 @@ type BackendConfig struct {
// compress type when write or ingest into tikv
ConnCompressType config.CompressionType
// concurrency of generateJobForRange and import(write & ingest) workers
WorkerConcurrency int
KVWriteBatchSize int
CheckpointEnabled bool
WorkerConcurrency int
KVWriteBatchSize int
RegionSplitBatchSize int
RegionSplitConcurrency int
CheckpointEnabled bool
// memory table size of pebble. since pebble can have multiple mem tables, the max memory used is
// MemTableSize * MemTableStopWritesThreshold, see pebble.Options for more details.
MemTableSize int
@@ -429,6 +431,8 @@ func NewBackendConfig(cfg *config.Config, maxOpenFiles int, keyspaceName string)
ConnCompressType: cfg.TikvImporter.CompressKVPairs,
WorkerConcurrency: cfg.TikvImporter.RangeConcurrency * 2,
KVWriteBatchSize: cfg.TikvImporter.SendKVPairs,
RegionSplitBatchSize: cfg.TikvImporter.RegionSplitBatchSize,
RegionSplitConcurrency: cfg.TikvImporter.RegionSplitConcurrency,
CheckpointEnabled: cfg.Checkpoint.Enable,
MemTableSize: int(cfg.TikvImporter.EngineMemCacheSize),
LocalWriterMemCacheSize: int64(cfg.TikvImporter.LocalWriterMemCacheSize),
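
A hedged sketch of how the two new knobs travel from the Lightning config into the local backend. The field names and the NewBackendConfig signature come from this diff; the literal values, the 1024 max-open-files argument, and the empty keyspace name are placeholder assumptions.

package main

import (
    "fmt"

    "github.com/pingcap/tidb/br/pkg/lightning/backend/local"
    "github.com/pingcap/tidb/br/pkg/lightning/config"
)

func main() {
    cfg := config.NewConfig()
    // How many split keys go into one BatchSplitRegions call, and how many
    // split/scatter workers run in parallel (illustrative values only).
    cfg.TikvImporter.RegionSplitBatchSize = 4096
    cfg.TikvImporter.RegionSplitConcurrency = 8

    bc := local.NewBackendConfig(cfg, 1024, "")
    fmt.Println(bc.RegionSplitBatchSize, bc.RegionSplitConcurrency)
}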
@@ -1076,6 +1080,7 @@ func (local *Backend) prepareAndSendJob(
failpoint.Inject("failToSplit", func(_ failpoint.Value) {
needSplit = true
})
logger := log.FromContext(ctx).With(zap.Stringer("uuid", engine.UUID)).Begin(zap.InfoLevel, "split and scatter ranges")
for i := 0; i < maxRetryTimes; i++ {
failpoint.Inject("skipSplitAndScatter", func() {
failpoint.Break()
@@ -1089,8 +1094,8 @@
log.FromContext(ctx).Warn("split and scatter failed in retry", zap.Stringer("uuid", engine.UUID),
log.ShortError(err), zap.Int("retry", i))
}
logger.End(zap.ErrorLevel, err)
if err != nil {
log.FromContext(ctx).Error("split & scatter ranges failed", zap.Stringer("uuid", engine.UUID), log.ShortError(err))
return err
}
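
For context, a small hypothetical sketch of the Begin/End task-logging idiom this hunk switches to: Begin returns a task logger (recording the start of the task), and a single End call reports the outcome, which is why the separate "split & scatter ranges failed" log is removed. The wrapper function and its do callback are assumptions for illustration.

import (
    "context"
    "fmt"

    "github.com/pingcap/tidb/br/pkg/lightning/log"
    "go.uber.org/zap"
)

// splitAndScatterWithTaskLog wraps an operation with task-style logging.
func splitAndScatterWithTaskLog(ctx context.Context, uuid fmt.Stringer, do func() error) error {
    logger := log.FromContext(ctx).With(zap.Stringer("uuid", uuid)).
        Begin(zap.InfoLevel, "split and scatter ranges")
    err := do()
    // End logs the outcome once (failures at the error level), so the caller
    // no longer needs its own final error log line.
    logger.End(zap.ErrorLevel, err)
    return err
}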

116 changes: 63 additions & 53 deletions br/pkg/lightning/backend/local/localhelper.go
@@ -19,7 +19,6 @@ import (
"context"
"database/sql"
"math"
"runtime"
"sort"
"strings"
"sync"
@@ -31,6 +30,7 @@
sst "github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/log"
@@ -53,8 +53,6 @@ const (
)

var (
// the max keys count in a batch to split one region
maxBatchSplitKeys = 4096
// the max total key size in a split region batch.
// our threshold should be smaller than TiKV's raft max entry size(default is 8MB).
maxBatchSplitSize = 6 * units.MiB
@@ -262,8 +260,7 @@ func (local *Backend) SplitAndScatterRegionByRanges(
}

var syncLock sync.Mutex
// TODO, make this size configurable
size := mathutil.Min(len(splitKeyMap), runtime.GOMAXPROCS(0))
size := mathutil.Min(len(splitKeyMap), local.RegionSplitConcurrency)
ch := make(chan *splitInfo, size)
eg, splitCtx := errgroup.WithContext(ctx)

@@ -282,7 +279,9 @@
endIdx := 0
batchKeySize := 0
for endIdx <= len(keys) {
if endIdx == len(keys) || batchKeySize+len(keys[endIdx]) > maxBatchSplitSize || endIdx-startIdx >= maxBatchSplitKeys {
if endIdx == len(keys) ||
batchKeySize+len(keys[endIdx]) > maxBatchSplitSize ||
endIdx-startIdx >= local.RegionSplitBatchSize {
splitRegionStart := codec.EncodeBytes([]byte{}, keys[startIdx])
splitRegionEnd := codec.EncodeBytes([]byte{}, keys[endIdx-1])
if bytes.Compare(splitRegionStart, splitRegion.Region.StartKey) < 0 || !beforeEnd(splitRegionEnd, splitRegion.Region.EndKey) {
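
As a standalone illustration of the cut condition in this loop, the hypothetical helper below groups sorted split keys into batches bounded by both the byte budget (maxBatchSplitSize) and the now-configurable key-count budget (RegionSplitBatchSize); the function name and packaging are assumptions, not Lightning code.

// batchSplitKeys mirrors the batching rule above: cut a batch when adding the
// next key would exceed the byte budget or the key-count budget is reached.
func batchSplitKeys(keys [][]byte, maxBatchBytes, maxBatchKeys int) [][][]byte {
    var batches [][][]byte
    start, size := 0, 0
    for end := 0; end <= len(keys); end++ {
        if end == len(keys) || size+len(keys[end]) > maxBatchBytes || end-start >= maxBatchKeys {
            if end > start {
                batches = append(batches, keys[start:end])
            }
            if end == len(keys) {
                break
            }
            start, size = end, 0
        }
        size += len(keys[end])
    }
    return batches
}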
@@ -400,7 +399,9 @@ func (local *Backend) SplitAndScatterRegionByRanges(
return nil
}

// BatchSplitRegions splits the region into multiple regions by given split keys.
// BatchSplitRegions will split regions by the given split keys and try to
// scatter the new regions. If split/scatter fails because a new region is not
// ready, this function will not return an error.
func (local *Backend) BatchSplitRegions(
ctx context.Context,
region *split.RegionInfo,
@@ -414,36 +415,48 @@
return nil, nil, errors.Annotatef(err, "batch split regions failed")
}
var failedErr error
retryRegions := make([]*split.RegionInfo, 0)
scatterRegions := newRegions
waitTime := splitRegionBaseBackOffTime
for i := 0; i < maxRetryTimes; i++ {
backoffer := split.NewWaitRegionOnlineBackoffer().(*split.WaitRegionOnlineBackoffer)
_ = utils.WithRetry(ctx, func() error {
retryRegions := make([]*split.RegionInfo, 0)
for _, region := range scatterRegions {
// Wait for a while until the regions successfully splits.
local.waitForSplit(ctx, region.Region.Id)
ok, err2 := local.hasRegion(ctx, region.Region.Id)
if !ok || err2 != nil {
failedErr = err2
if failedErr == nil {
failedErr = errors.Errorf("region %d not found", region.Region.Id)
}
retryRegions = append(retryRegions, region)
continue
}
if err = local.splitCli.ScatterRegion(ctx, region); err != nil {
failedErr = err
retryRegions = append(retryRegions, region)
}
}
if len(retryRegions) == 0 {
break
return nil
}
// if the number of retry regions becomes smaller, we can infer that the TiKV
// side really made some progress, so don't increase the retry count.
if len(retryRegions) < len(scatterRegions) {
backoffer.Stat.ReduceRetry()
}
// the scatter operation likely fails because region replication has not finished yet;
// pack them into one log to avoid printing a lot of warning logs.
log.FromContext(ctx).Warn("scatter region failed", zap.Int("regionCount", len(newRegions)),
zap.Int("failedCount", len(retryRegions)), zap.Error(failedErr), zap.Int("retry", i))
zap.Int("failedCount", len(retryRegions)), zap.Error(failedErr))
scatterRegions = retryRegions
retryRegions = make([]*split.RegionInfo, 0)
select {
case <-time.After(waitTime):
case <-ctx.Done():
return nil, nil, ctx.Err()
}
waitTime *= 2
}

return region, newRegions, nil
// although it's not PDBatchScanRegion, WaitRegionOnlineBackoffer only checks
// this error class, so we simply reuse it. WaitRegionOnlineBackoffer will be
// refined later.
failedErr = errors.Annotatef(berrors.ErrPDBatchScanRegion, "scatter region failed")
return failedErr
}, backoffer)

// TODO: there's still a chance that we may skip scatter if the retry times out.
return region, newRegions, ctx.Err()
}

func (local *Backend) hasRegion(ctx context.Context, regionID uint64) (bool, error) {
@@ -454,53 +467,49 @@ func (local *Backend) hasRegion(ctx context.Context, regionID uint64) (bool, err
return regionInfo != nil, nil
}

func (local *Backend) waitForSplit(ctx context.Context, regionID uint64) {
for i := 0; i < split.SplitCheckMaxRetryTimes; i++ {
ok, err := local.hasRegion(ctx, regionID)
if err != nil {
log.FromContext(ctx).Info("wait for split failed", log.ShortError(err))
return
}
if ok {
break
}
select {
case <-time.After(time.Second):
case <-ctx.Done():
return
}
}
}

func (local *Backend) waitForScatterRegions(ctx context.Context, regions []*split.RegionInfo) (scatterCount int, _ error) {
subCtx, cancel := context.WithTimeout(ctx, split.ScatterWaitUpperInterval)
defer cancel()

for len(regions) > 0 {
var (
retErr error
backoffer = split.NewWaitRegionOnlineBackoffer().(*split.WaitRegionOnlineBackoffer)
)
// WithRetry returns a multierr, which is hard to use, so we use `retErr`
// to save the error that needs to be returned.
_ = utils.WithRetry(ctx, func() error {
var retryRegions []*split.RegionInfo
for _, region := range regions {
scattered, err := local.checkRegionScatteredOrReScatter(subCtx, region)
scattered, err := local.checkRegionScatteredOrReScatter(ctx, region)
if scattered {
scatterCount++
continue
}
if err != nil {
if !common.IsRetryableError(err) {
log.FromContext(ctx).Warn("wait for scatter region encountered non-retryable error", logutil.Region(region.Region), zap.Error(err))
return scatterCount, err
retErr = err
// return nil to stop retry, the error is saved in `retErr`
return nil
}
log.FromContext(ctx).Warn("wait for scatter region encountered error, will retry again", logutil.Region(region.Region), zap.Error(err))
}
retryRegions = append(retryRegions, region)
}
regions = retryRegions
select {
case <-time.After(time.Second):
case <-subCtx.Done():
return
if len(retryRegions) == 0 {
regions = retryRegions
return nil
}
if len(retryRegions) < len(regions) {
backoffer.Stat.ReduceRetry()
}

regions = retryRegions
return errors.Annotatef(berrors.ErrPDBatchScanRegion, "wait for scatter region failed")
}, backoffer)

if len(regions) > 0 && retErr == nil {
retErr = errors.Errorf("wait for scatter region timeout, print the first unfinished region %v",
regions[0].Region.String())
}
return scatterCount, nil
return scatterCount, retErr
}

func (local *Backend) checkRegionScatteredOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) {
@@ -510,6 +519,7 @@ func (local *Backend) checkRegionScatteredOrReScatter(ctx context.Context, regio
}
// Heartbeat may not be sent to PD
if respErr := resp.GetHeader().GetError(); respErr != nil {
// TODO: why is this OK?
if respErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND {
return true, nil
}
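
The rewritten BatchSplitRegions and waitForScatterRegions above share one pattern: drive a retry loop with utils.WithRetry and a split.WaitRegionOnlineBackoffer, return an error annotated with berrors.ErrPDBatchScanRegion to ask for another backoff round, return nil to stop, and call Stat.ReduceRetry() when partial progress was made. The standalone sketch below shows hypothetical wiring of that pattern (the scatterOnce callback is an assumption), not the Lightning code itself.

import (
    "context"

    "github.com/pingcap/errors"
    berrors "github.com/pingcap/tidb/br/pkg/errors"
    "github.com/pingcap/tidb/br/pkg/restore/split"
    "github.com/pingcap/tidb/br/pkg/utils"
)

// scatterWithBackoff retries scatterOnce over the remaining regions until all
// succeed or the backoffer gives up.
func scatterWithBackoff(
    ctx context.Context,
    regions []*split.RegionInfo,
    scatterOnce func(context.Context, *split.RegionInfo) error,
) error {
    var retErr error
    backoffer := split.NewWaitRegionOnlineBackoffer().(*split.WaitRegionOnlineBackoffer)
    _ = utils.WithRetry(ctx, func() error {
        var retry []*split.RegionInfo
        for _, r := range regions {
            if err := scatterOnce(ctx, r); err != nil {
                retErr = err
                retry = append(retry, r)
            }
        }
        if len(retry) == 0 {
            return nil // everything scattered; stop retrying
        }
        if len(retry) < len(regions) {
            backoffer.Stat.ReduceRetry() // partial progress: don't spend an attempt
        }
        regions = retry
        // WaitRegionOnlineBackoffer only retries this error class, so reuse it.
        return errors.Annotatef(berrors.ErrPDBatchScanRegion, "scatter not finished")
    }, backoffer)
    if len(regions) > 0 && retErr == nil {
        retErr = errors.Errorf("%d regions still not scattered", len(regions))
    }
    return retErr
}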