backupccl: enable splitting full ranges during restore
Previously, sstBatcher had a boolean controlling whether it would split and
then scatter a range it was filling when it could determine that it was about
to overfill it, i.e. when that range had previously reported its remaining
capacity to be less than what the batcher was about to add. RESTORE set this
boolean to disable this splitting and scattering, as it pre-split and
pre-scattered all of its ranges and thus did not expect the batcher to need to
do any as-it-fills splitting and scattering.

However, blanket disabling this makes little sense: if the batcher knows that
it is about to overfill a range, it always makes sense to split that range
before adding to it rather than overfill it, regardless of whether or not the
range was supposed to have been pre-split to avoid this.

Thus this change makes as-we-fill splits unconditional and narrows the
boolean's scope to just disabling the scatters, as RESTORE still pre-scatters
its pre-chunked restore spans. This paves the way for RESTORE to make and
pre-split larger chunks without worrying that doing so will overfill ranges.

Release note: none.

Release justification: bug fix.
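
To make the new behavior concrete, here is a minimal, hypothetical Go sketch of the logic described above. It is not the actual sst_batcher.go code: lastRangeInfo, batcher, adminSplit, adminScatter, and maybeSplitBeforeAdding are made-up stand-ins. It shows the split before adding to an about-to-overfill range always happening, while the follow-up scatter is gated on the narrower disableScatters boolean.

```go
// Hypothetical sketch of the as-we-fill behavior described above; the real
// implementation is in pkg/kv/bulk/sst_batcher.go and these names are
// stand-ins, not the actual CockroachDB types or API.
package bulk

import (
	"bytes"
	"context"
)

// lastRangeInfo records what the batcher learned about the range it last
// added to: its key span and the capacity it reported as remaining.
type lastRangeInfo struct {
	startKey, endKey []byte
	remaining        int64
}

func (r lastRangeInfo) containsKey(k []byte) bool {
	return bytes.Compare(r.startKey, k) <= 0 && bytes.Compare(k, r.endKey) < 0
}

type batcher struct {
	lastRange       lastRangeInfo
	disableScatters bool // set by RESTORE, which still pre-scatters its chunks

	adminSplit   func(ctx context.Context, key []byte) error
	adminScatter func(ctx context.Context, key []byte) error
}

// maybeSplitBeforeAdding splits off the suffix of the last-written range at
// start if adding size more bytes would overfill it. After this change the
// split always happens; only the scatter of the new right-hand side is gated
// on disableScatters.
func (b *batcher) maybeSplitBeforeAdding(ctx context.Context, start []byte, size int64) error {
	if !b.lastRange.containsKey(start) || size < b.lastRange.remaining {
		return nil // not about to overfill the range we last filled
	}
	if err := b.adminSplit(ctx, start); err != nil {
		return err
	}
	if b.disableScatters {
		return nil
	}
	// Scatter the (empty) right-hand side before ingesting into it.
	return b.adminScatter(ctx, start)
}
```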
dt committed Sep 1, 2022
1 parent 1c96b93 commit 0ee547a
Showing 2 changed files with 25 additions and 21 deletions.
pkg/ccl/backupccl/restore_data_processor.go — 2 changes: 1 addition & 1 deletion
@@ -433,7 +433,7 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
evalCtx.Settings,
disallowShadowingBelow,
writeAtBatchTS,
false, /* splitFilledRanges */
false, /* scatterSplitRanges */
rd.flowCtx.Cfg.BackupMonitor.MakeBoundAccount(),
rd.flowCtx.Cfg.BulkSenderLimiter,
)
pkg/kv/bulk/sst_batcher.go — 44 changes: 24 additions & 20 deletions
@@ -133,11 +133,13 @@ type SSTBatcher struct {

initialSplitDone bool

// disableScatters controls scatters of the as-we-fill split ranges.
disableScatters bool

// The rest of the fields accumulated state as opposed to configuration. Some,
// like totalRows, are accumulated _across_ batches and are not reset between
// batches when Reset() is called.
stats ingestionPerformanceStats
disableSplits bool
stats ingestionPerformanceStats

// The rest of the fields are per-batch and are reset via Reset() before each
// batch is started.
@@ -180,7 +182,7 @@ func MakeSSTBatcher(
settings *cluster.Settings,
disallowShadowingBelow hlc.Timestamp,
writeAtBatchTs bool,
splitFilledRanges bool,
scatterSplitRanges bool,
mem mon.BoundAccount,
sendLimiter limit.ConcurrentRequestLimiter,
) (*SSTBatcher, error) {
@@ -190,7 +192,7 @@
settings: settings,
disallowShadowingBelow: disallowShadowingBelow,
writeAtBatchTS: writeAtBatchTs,
disableSplits: !splitFilledRanges,
disableScatters: !scatterSplitRanges,
mem: mem,
limiter: sendLimiter,
}
@@ -475,7 +477,7 @@ func (b *SSTBatcher) doFlush(ctx context.Context, reason int) error {
// than the size that range had when we last added to it, then we should split
// off the suffix of that range where this file starts and add it to that new
// range after scattering it.
if !b.disableSplits && b.lastRange.span.ContainsKey(start) && size >= b.lastRange.remaining {
if b.lastRange.span.ContainsKey(start) && size >= b.lastRange.remaining {
log.VEventf(ctx, 2, "%s batcher splitting full range before adding file starting at %s",
b.name, start)

@@ -518,21 +520,23 @@ func (b *SSTBatcher) doFlush(ctx context.Context, reason int) error {
} else {
b.stats.splits++

// Now scatter the RHS before we proceed to ingest into it. We know it
// should be empty since we split above if there was a nextExistingKey.
beforeScatter := timeutil.Now()
resp, err := b.db.AdminScatter(ctx, splitAt, maxScatterSize)
b.stats.scatterWait += timeutil.Since(beforeScatter)
if err != nil {
// err could be a max size violation, but this is unexpected since we
// split before, so a warning is probably ok.
log.Warningf(ctx, "%s failed to scatter : %v", b.name, err)
} else {
b.stats.scatters++
moved := sz(resp.ReplicasScatteredBytes)
b.stats.scatterMoved += moved
if moved > 0 {
log.VEventf(ctx, 1, "%s split scattered %s in non-empty range %s", b.name, moved, resp.RangeInfos[0].Desc.KeySpan().AsRawSpanWithNoLocals())
if !b.disableScatters {
// Now scatter the RHS before we proceed to ingest into it. We know it
// should be empty since we split above if there was a nextExistingKey.
beforeScatter := timeutil.Now()
resp, err := b.db.AdminScatter(ctx, splitAt, maxScatterSize)
b.stats.scatterWait += timeutil.Since(beforeScatter)
if err != nil {
// err could be a max size violation, but this is unexpected since we
// split before, so a warning is probably ok.
log.Warningf(ctx, "%s failed to scatter : %v", b.name, err)
} else {
b.stats.scatters++
moved := sz(resp.ReplicasScatteredBytes)
b.stats.scatterMoved += moved
if moved > 0 {
log.VEventf(ctx, 1, "%s split scattered %s in non-empty range %s", b.name, moved, resp.RangeInfos[0].Desc.KeySpan().AsRawSpanWithNoLocals())
}
}
}
}
