Skip to content

Commit

Permalink
lightning: fix "context cancel" overwrites the real error (#44734) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jun 27, 2023
1 parent 54f275b commit 9fd5f4a
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 3 deletions.
12 changes: 9 additions & 3 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1145,14 +1145,16 @@ func (local *Backend) generateAndSendJob(
for _, jobRange := range jobRanges {
r := jobRange
eg.Go(func() error {
select {
case <-egCtx.Done():
if egCtx.Err() != nil {
return nil
default:
}

failpoint.Inject("beforeGenerateJob", nil)
jobs, err := local.generateJobForRange(egCtx, engine, r, regionSplitSize, regionSplitKeys)
if err != nil {
if common.IsContextCanceledError(err) {
return nil
}
return err
}
for _, job := range jobs {
Expand Down Expand Up @@ -1189,6 +1191,9 @@ func (local *Backend) generateJobForRange(
regionSplitSize, regionSplitKeys int64,
) ([]*regionJob, error) {
failpoint.Inject("fakeRegionJobs", func() {
if ctx.Err() != nil {
failpoint.Return(nil, ctx.Err())
}
key := [2]string{string(keyRange.start), string(keyRange.end)}
injected := fakeRegionJobs[key]
// overwrite the stage to regionScanned, because some time same keyRange
Expand Down Expand Up @@ -1565,6 +1570,7 @@ func (local *Backend) doImport(ctx context.Context, engine *Engine, regionRanges
jobWg.Wait()
workerCancel()
firstErr.Set(workGroup.Wait())
firstErr.Set(ctx.Err())
return firstErr.Get()
}

Expand Down
57 changes: 57 additions & 0 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2028,3 +2028,60 @@ func TestRegionJobResetRetryCounter(t *testing.T) {
}
}
}

func TestCtxCancelIsIgnored(t *testing.T) {
backup := maxRetryBackoffSecond
maxRetryBackoffSecond = 1
t.Cleanup(func() {
maxRetryBackoffSecond = backup
})

_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter", "return()")
_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/fakeRegionJobs", "return()")
_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/beforeGenerateJob", "sleep(1000)")
_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace", "return()")
t.Cleanup(func() {
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter")
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/fakeRegionJobs")
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/beforeGenerateJob")
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace")
})

initRanges := []Range{
{start: []byte{'c'}, end: []byte{'d'}},
{start: []byte{'d'}, end: []byte{'e'}},
}
fakeRegionJobs = map[[2]string]struct {
jobs []*regionJob
err error
}{
{"c", "d"}: {
jobs: []*regionJob{
{
keyRange: Range{start: []byte{'c'}, end: []byte{'d'}},
engine: &Engine{},
injected: getSuccessInjectedBehaviour(),
},
},
},
{"d", "e"}: {
jobs: []*regionJob{
{
keyRange: Range{start: []byte{'d'}, end: []byte{'e'}},
engine: &Engine{},
injected: getSuccessInjectedBehaviour(),
},
},
},
}

ctx := context.Background()
l := &Backend{
BackendConfig: BackendConfig{
WorkerConcurrency: 1,
},
}
e := &Engine{}
err := l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
require.ErrorContains(t, err, "the remaining storage capacity of TiKV")
}

0 comments on commit 9fd5f4a

Please sign in to comment.