diff --git a/pkg/ddl/cancel_test.go b/pkg/ddl/cancel_test.go
index 566655cbdd163..22a3489b02dca 100644
--- a/pkg/ddl/cancel_test.go
+++ b/pkg/ddl/cancel_test.go
@@ -17,6 +17,7 @@ package ddl_test
 import (
 	"fmt"
 	"strings"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -28,17 +29,18 @@ import (
 	"github.com/pingcap/tidb/pkg/parser/model"
 	"github.com/pingcap/tidb/pkg/testkit"
 	"github.com/pingcap/tidb/pkg/testkit/external"
+	"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
 	"github.com/stretchr/testify/require"
 	atomicutil "go.uber.org/atomic"
 )
 
 type testCancelJob struct {
-	sql         string
-	ok          bool
-	cancelState any // model.SchemaState | []model.SchemaState
-	onJobBefore bool
-	onJobUpdate bool
-	prepareSQL  []string
+	sql             string
+	expectCancelled bool
+	cancelState     any // model.SchemaState | []model.SchemaState
+	onJobBefore     bool
+	onJobUpdate     bool
+	prepareSQL      []string
 }
 
 var allTestCase = []testCancelJob{
@@ -204,6 +206,14 @@ func cancelSuccess(rs *testkit.Result) bool {
 }
 
 func TestCancel(t *testing.T) {
+	var enterCnt, exitCnt atomic.Int32
+	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeDelivery2Worker", func(job *model.Job) { enterCnt.Add(1) })
+	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDelivery2Worker", func(job *model.Job) { exitCnt.Add(1) })
+	waitDDLWorkerExited := func() {
+		require.Eventually(t, func() bool {
+			return enterCnt.Load() == exitCnt.Load()
+		}, 10*time.Second, 10*time.Millisecond)
+	}
 	store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, 100*time.Millisecond)
 	tk := testkit.NewTestKit(t, store)
 	tkCancel := testkit.NewTestKit(t, store)
@@ -244,28 +254,28 @@ func TestCancel(t *testing.T) {
 	hook := &callback.TestDDLCallback{Do: dom}
 	i := atomicutil.NewInt64(0)
-	cancel := atomicutil.NewBool(false)
+	canceled := atomicutil.NewBool(false)
 	cancelResult := atomicutil.NewBool(false)
 	cancelWhenReorgNotStart := atomicutil.NewBool(false)
 
 	hookFunc := func(job *model.Job) {
-		if testutil.TestMatchCancelState(t, job, allTestCase[i.Load()].cancelState, allTestCase[i.Load()].sql) && !cancel.Load() {
+		if testutil.TestMatchCancelState(t, job, allTestCase[i.Load()].cancelState, allTestCase[i.Load()].sql) && !canceled.Load() {
 			if !cancelWhenReorgNotStart.Load() && job.SchemaState == model.StateWriteReorganization && job.MayNeedReorg() && job.RowCount == 0 {
 				return
 			}
 			rs := tkCancel.MustQuery(fmt.Sprintf("admin cancel ddl jobs %d", job.ID))
 			cancelResult.Store(cancelSuccess(rs))
-			cancel.Store(true)
+			canceled.Store(true)
 		}
 	}
 	dom.DDL().SetHook(hook.Clone())
 
-	restHook := func(h *callback.TestDDLCallback) {
+	resetHook := func(h *callback.TestDDLCallback) {
 		h.OnJobRunBeforeExported = nil
 		h.OnJobUpdatedExported.Store(nil)
 		dom.DDL().SetHook(h.Clone())
 	}
-	registHook := func(h *callback.TestDDLCallback, onJobRunBefore bool) {
+	registerHook := func(h *callback.TestDDLCallback, onJobRunBefore bool) {
 		if onJobRunBefore {
 			h.OnJobRunBeforeExported = hookFunc
 		} else {
@@ -274,41 +284,47 @@ func TestCancel(t *testing.T) {
 		dom.DDL().SetHook(h.Clone())
 	}
 
+	waitDDLWorkerExited()
 	for j, tc := range allTestCase {
+		t.Logf("running test case %d: %s", j, tc.sql)
 		i.Store(int64(j))
 		msg := fmt.Sprintf("sql: %s, state: %s", tc.sql, tc.cancelState)
 		if tc.onJobBefore {
-			restHook(hook)
+			resetHook(hook)
 			for _, prepareSQL := range tc.prepareSQL {
 				tk.MustExec(prepareSQL)
 			}
-			cancel.Store(false)
+			waitDDLWorkerExited()
+			canceled.Store(false)
 			cancelWhenReorgNotStart.Store(true)
-			registHook(hook, true)
-			if tc.ok {
+			registerHook(hook, true)
+			if tc.expectCancelled {
 				tk.MustGetErrCode(tc.sql, errno.ErrCancelledDDLJob)
 			} else {
 				tk.MustExec(tc.sql)
 			}
-			if cancel.Load() {
-				require.Equal(t, tc.ok, cancelResult.Load(), msg)
+			waitDDLWorkerExited()
+			if canceled.Load() {
+				require.Equal(t, tc.expectCancelled, cancelResult.Load(), msg)
 			}
 		}
 		if tc.onJobUpdate {
-			restHook(hook)
+			resetHook(hook)
 			for _, prepareSQL := range tc.prepareSQL {
 				tk.MustExec(prepareSQL)
 			}
-			cancel.Store(false)
+			waitDDLWorkerExited()
+			canceled.Store(false)
 			cancelWhenReorgNotStart.Store(false)
-			registHook(hook, false)
-			if tc.ok {
+			registerHook(hook, false)
+			if tc.expectCancelled {
 				tk.MustGetErrCode(tc.sql, errno.ErrCancelledDDLJob)
 			} else {
 				tk.MustExec(tc.sql)
 			}
-			if cancel.Load() {
-				require.Equal(t, tc.ok, cancelResult.Load(), msg)
+			waitDDLWorkerExited()
+			if canceled.Load() {
+				require.Equal(t, tc.expectCancelled, cancelResult.Load(), msg)
 			}
 		}
 	}
diff --git a/pkg/ddl/job_table.go b/pkg/ddl/job_table.go
index 7619147884189..fa8234e14916b 100644
--- a/pkg/ddl/job_table.go
+++ b/pkg/ddl/job_table.go
@@ -529,6 +529,7 @@ func (d *ddl) delivery2LocalWorker(pool *workerPool, task *limitJobTask) {
 
 // delivery2Worker owns the worker, need to put it back to the pool in this function.
 func (s *jobScheduler) delivery2Worker(wk *worker, pool *workerPool, job *model.Job) {
+	failpoint.InjectCall("beforeDelivery2Worker", job)
 	injectFailPointForGetJob(job)
 	jobID, involvedSchemaInfos := job.ID, job.GetInvolvingSchemaInfo()
 	s.runningJobs.add(jobID, involvedSchemaInfos)
@@ -561,7 +562,7 @@ func (s *jobScheduler) delivery2Worker(wk *worker, pool *workerPool, job *model.
 			// job is already moved to history.
 			failpoint.InjectCall("beforeRefreshJob", job)
 			for {
-				job, err = s.sysTblMgr.GetJobByID(s.schCtx, job.ID)
+				job, err = s.sysTblMgr.GetJobByID(s.schCtx, jobID)
 				failpoint.InjectCall("mockGetJobByIDFail", &err)
 				if err == nil {
 					break
@@ -569,10 +570,10 @@ func (s *jobScheduler) delivery2Worker(wk *worker, pool *workerPool, job *model.
 				if err == systable.ErrNotFound {
 					logutil.DDLLogger().Info("job not found, might already finished",
-						zap.Int64("job_id", job.ID), zap.Stringer("state", job.State))
+						zap.Int64("job_id", jobID))
 					return
 				}
-				logutil.DDLLogger().Error("get job failed", zap.Error(err))
+				logutil.DDLLogger().Error("get job failed", zap.Int64("job_id", jobID), zap.Error(err))
 				select {
 				case <-s.schCtx.Done():
 					return
diff --git a/pkg/ddl/tests/adminpause/BUILD.bazel b/pkg/ddl/tests/adminpause/BUILD.bazel
index 30c658808ec7f..34034000e12f4 100644
--- a/pkg/ddl/tests/adminpause/BUILD.bazel
+++ b/pkg/ddl/tests/adminpause/BUILD.bazel
@@ -39,6 +39,7 @@ go_test(
         "//pkg/errno",
         "//pkg/parser/model",
         "//pkg/testkit",
+        "//pkg/testkit/testfailpoint",
        "//pkg/testkit/testsetup",
         "//pkg/util/sqlexec",
         "@com_github_pingcap_failpoint//:failpoint",
diff --git a/pkg/ddl/tests/adminpause/pause_cancel_test.go b/pkg/ddl/tests/adminpause/pause_cancel_test.go
index c526eab013236..7f7f642c68913 100644
--- a/pkg/ddl/tests/adminpause/pause_cancel_test.go
+++ b/pkg/ddl/tests/adminpause/pause_cancel_test.go
@@ -27,6 +27,7 @@ import (
 	"github.com/pingcap/tidb/pkg/errno"
 	"github.com/pingcap/tidb/pkg/parser/model"
 	"github.com/pingcap/tidb/pkg/testkit"
+	"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
 	"github.com/pingcap/tidb/pkg/util/sqlexec"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/zap"
@@ -90,7 +91,7 @@ func pauseAndCancelStmt(t *testing.T, stmtKit *testkit.TestKit, adminCommandKit
 	var isCancelled = &atomic.Bool{}
 	var cancelResultChn = make(chan []sqlexec.RecordSet, 1)
 	var cancelErrChn = make(chan error, 1)
-	var cancelFunc = func(jobType string) {
+	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeRefreshJob", func(*model.Job) {
 		Logger.Debug("pauseAndCancelStmt: OnGetJobBeforeExported, ",
 			zap.String("Expected Schema State", stmtCase.schemaState.String()))
@@ -107,7 +108,7 @@ func pauseAndCancelStmt(t *testing.T, stmtKit *testkit.TestKit, adminCommandKit
 			isCancelled.CompareAndSwap(false, true) // In case that it runs into this scope again and again
 		}
-	}
+	})
 
 	var verifyCancelResult = func(t *testing.T, adminCommandKit *testkit.TestKit) {
 		require.True(t, isCancelled.Load())
@@ -130,7 +131,6 @@ func pauseAndCancelStmt(t *testing.T, stmtKit *testkit.TestKit, adminCommandKit
 	originalHook := dom.DDL().GetHook()
 	hook.OnJobRunBeforeExported = pauseFunc
-	hook.OnGetJobBeforeExported = cancelFunc
 	dom.DDL().SetHook(hook.Clone())
 
 	isPaused.Store(false)
@@ -152,6 +152,7 @@ func pauseAndCancelStmt(t *testing.T, stmtKit *testkit.TestKit, adminCommandKit
 
 	// Release the hook, so that we could run the `rollbackStmts` successfully.
 	dom.DDL().SetHook(originalHook)
+	testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/ddl/beforeRefreshJob")
 
 	for _, rollbackStmt := range stmtCase.rollbackStmts {
 		// no care about the result here, since the `statement` could have been cancelled OR finished successfully.
diff --git a/pkg/ddl/testutil/testutil.go b/pkg/ddl/testutil/testutil.go
index fb6e44df1c2fb..4b977654b7bd8 100644
--- a/pkg/ddl/testutil/testutil.go
+++ b/pkg/ddl/testutil/testutil.go
@@ -107,14 +107,14 @@ func TestMatchCancelState(t *testing.T, job *model.Job, cancelState any, sql str
 	switch v := cancelState.(type) {
 	case model.SchemaState:
 		if job.Type == model.ActionMultiSchemaChange {
-			msg := fmt.Sprintf("unexpected multi-schema change(sql: %s, cancel state: %s)", sql, v)
+			msg := fmt.Sprintf("unexpected multi-schema change(sql: %s, cancel state: %s, job: %s)", sql, v, job.String())
 			require.Failf(t, msg, "use []model.SchemaState as cancel states instead")
 			return false
 		}
 		return job.SchemaState == v
 	case SubStates: // For multi-schema change sub-jobs.
 		if job.MultiSchemaInfo == nil {
-			msg := fmt.Sprintf("not multi-schema change(sql: %s, cancel state: %v)", sql, v)
+			msg := fmt.Sprintf("not multi-schema change(sql: %s, cancel state: %v, job: %s)", sql, v, job.String())
 			require.Failf(t, msg, "use model.SchemaState as the cancel state instead")
 			return false
 		}
diff --git a/pkg/testkit/testfailpoint/failpoint.go b/pkg/testkit/testfailpoint/failpoint.go
index eec483115a077..381b53c129c23 100644
--- a/pkg/testkit/testfailpoint/failpoint.go
+++ b/pkg/testkit/testfailpoint/failpoint.go
@@ -36,3 +36,8 @@ func EnableCall(t testing.TB, name string, fn any) {
 		require.NoError(t, failpoint.Disable(name))
 	})
 }
+
+// Disable disables fail-point.
+func Disable(t testing.TB, name string) {
+	require.NoError(t, failpoint.Disable(name))
+}
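
Usage sketch (not part of the patch): the snippet below shows how the enter/exit counters wired to the beforeDelivery2Worker/afterDelivery2Worker fail-points can be used to wait for in-flight DDL jobs to drain, mirroring the pattern added to TestCancel above. The test name and the CREATE TABLE statement are hypothetical; only the fail-point paths and the testfailpoint/testkit helpers come from this diff. The new testfailpoint.Disable helper is for switching a fail-point off before the test finishes, as pause_cancel_test.go does before running its rollback statements.

package ddl_test

import (
	"sync/atomic"
	"testing"
	"time"

	"github.com/pingcap/tidb/pkg/parser/model"
	"github.com/pingcap/tidb/pkg/testkit"
	"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
	"github.com/stretchr/testify/require"
)

// TestWaitDDLWorkerExitedSketch is a hypothetical example, not part of this change.
func TestWaitDDLWorkerExitedSketch(t *testing.T) {
	var enterCnt, exitCnt atomic.Int32
	// Count jobs entering and leaving delivery2Worker via the injected call sites.
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeDelivery2Worker",
		func(*model.Job) { enterCnt.Add(1) })
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDelivery2Worker",
		func(*model.Job) { exitCnt.Add(1) })

	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("create table t_sketch (a int)") // any DDL that goes through delivery2Worker

	// Wait until every job handed to a worker has also left it, so the next
	// step of the test observes no in-flight DDL job.
	require.Eventually(t, func() bool {
		return enterCnt.Load() == exitCnt.Load()
	}, 10*time.Second, 10*time.Millisecond)
}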