From 9fd5f4a8e4f273a60fbe7d3848f85a1be8f0600b Mon Sep 17 00:00:00 2001
From: Ti Chi Robot <ti-community-prow-bot@tidb.io>
Date: Tue, 27 Jun 2023 22:53:37 +0800
Subject: [PATCH] lightning: fix "context cancel" overwrites the real error
 (#44734) (#45000)

close pingcap/tidb#44733
---
 br/pkg/lightning/backend/local/local.go      | 12 +++--
 br/pkg/lightning/backend/local/local_test.go | 57 ++++++++++++++++++++
 2 files changed, 66 insertions(+), 3 deletions(-)

diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go
index 3cdd67128eceb..6277dd9299983 100644
--- a/br/pkg/lightning/backend/local/local.go
+++ b/br/pkg/lightning/backend/local/local.go
@@ -1145,14 +1145,16 @@ func (local *Backend) generateAndSendJob(
 	for _, jobRange := range jobRanges {
 		r := jobRange
 		eg.Go(func() error {
-			select {
-			case <-egCtx.Done():
+			if egCtx.Err() != nil {
 				return nil
-			default:
 			}
 
+			failpoint.Inject("beforeGenerateJob", nil)
 			jobs, err := local.generateJobForRange(egCtx, engine, r, regionSplitSize, regionSplitKeys)
 			if err != nil {
+				if common.IsContextCanceledError(err) {
+					return nil
+				}
 				return err
 			}
 			for _, job := range jobs {
@@ -1189,6 +1191,9 @@ func (local *Backend) generateJobForRange(
 	regionSplitSize, regionSplitKeys int64,
 ) ([]*regionJob, error) {
 	failpoint.Inject("fakeRegionJobs", func() {
+		if ctx.Err() != nil {
+			failpoint.Return(nil, ctx.Err())
+		}
 		key := [2]string{string(keyRange.start), string(keyRange.end)}
 		injected := fakeRegionJobs[key]
 		// overwrite the stage to regionScanned, because some time same keyRange
@@ -1565,6 +1570,7 @@ func (local *Backend) doImport(ctx context.Context, engine *Engine, regionRanges
 	jobWg.Wait()
 	workerCancel()
 	firstErr.Set(workGroup.Wait())
+	firstErr.Set(ctx.Err())
 	return firstErr.Get()
 }
 
diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go
index ed98115371f43..902dd906fa040 100644
--- a/br/pkg/lightning/backend/local/local_test.go
+++ b/br/pkg/lightning/backend/local/local_test.go
@@ -2028,3 +2028,60 @@ func TestRegionJobResetRetryCounter(t *testing.T) {
 		}
 	}
 }
+
+func TestCtxCancelIsIgnored(t *testing.T) { // doImport must report the underlying failure, not a context-cancel error
+	backup := maxRetryBackoffSecond
+	maxRetryBackoffSecond = 1 // package-level override for this test; restored by the Cleanup below
+	t.Cleanup(func() {
+		maxRetryBackoffSecond = backup
+	})
+
+	_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter", "return()")
+	_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/fakeRegionJobs", "return()")
+	_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/beforeGenerateJob", "sleep(1000)") // stalls each goroutine right before generateJobForRange; presumably 1000 ms - confirm unit
+	_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace", "return()")
+	t.Cleanup(func() { // disable every failpoint enabled above so later tests are unaffected
+		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter")
+		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/fakeRegionJobs")
+		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/beforeGenerateJob")
+		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace")
+	})
+
+	initRanges := []Range{
+		{start: []byte{'c'}, end: []byte{'d'}},
+		{start: []byte{'d'}, end: []byte{'e'}},
+	}
+	fakeRegionJobs = map[[2]string]struct { // keyed by {start, end}; read by generateJobForRange while the fakeRegionJobs failpoint is enabled
+		jobs []*regionJob
+		err  error
+	}{
+		{"c", "d"}: {
+			jobs: []*regionJob{
+				{
+					keyRange: Range{start: []byte{'c'}, end: []byte{'d'}},
+					engine:   &Engine{},
+					injected: getSuccessInjectedBehaviour(),
+				},
+			},
+		},
+		{"d", "e"}: {
+			jobs: []*regionJob{
+				{
+					keyRange: Range{start: []byte{'d'}, end: []byte{'e'}},
+					engine:   &Engine{},
+					injected: getSuccessInjectedBehaviour(),
+				},
+			},
+		},
+	}
+
+	ctx := context.Background() // the caller-supplied context is never canceled in this test
+	l := &Backend{
+		BackendConfig: BackendConfig{
+			WorkerConcurrency: 1,
+		},
+	}
+	e := &Engine{}
+	err := l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
+	require.ErrorContains(t, err, "the remaining storage capacity of TiKV") // presumably the error raised via WriteToTiKVNotEnoughDiskSpace; it must not be masked by a cancel error
+}