From f842b7258200cef8412dca0eab701ad0f861ec93 Mon Sep 17 00:00:00 2001 From: dhysum <130652266+dhysum@users.noreply.github.com> Date: Tue, 9 May 2023 22:32:08 +0800 Subject: [PATCH] This is an automated cherry-pick of #43171 Signed-off-by: ti-chi-bot --- ddl/BUILD.bazel | 1 + ddl/db_test.go | 72 ++++++++- ddl/ddl.go | 259 ++++++++++++++++++++++++------- ddl/pause_test.go | 301 ++++++++++++++++++++++++++++++++++++ errno/errcode.go | 5 + errno/errname.go | 4 + errors.toml | 15 ++ executor/builder.go | 33 +++- executor/executor.go | 37 ++++- parser/model/ddl.go | 23 +++ util/dbterror/ddl_terror.go | 8 +- 11 files changed, 692 insertions(+), 66 deletions(-) create mode 100644 ddl/pause_test.go diff --git a/ddl/BUILD.bazel b/ddl/BUILD.bazel index 639e09e1c619f..90dd1be856986 100644 --- a/ddl/BUILD.bazel +++ b/ddl/BUILD.bazel @@ -197,6 +197,7 @@ go_test( "mv_index_test.go", "options_test.go", "partition_test.go", + "pause_test.go", "placement_policy_ddl_test.go", "placement_policy_test.go", "placement_sql_test.go", diff --git a/ddl/db_test.go b/ddl/db_test.go index d155a98d2a79c..8ff727a108837 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -1106,8 +1106,10 @@ func TestCancelJobWriteConflict(t *testing.T) { stmt := fmt.Sprintf("admin cancel ddl jobs %d", job.ID) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("no_retry")`)) defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn")) }() - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockCancelConcurencyDDL", `return(true)`)) - defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockCancelConcurencyDDL")) }() + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFailedCommandOnConcurencyDDL", `return(true)`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFailedCommandOnConcurencyDDL")) + }() rs, cancelErr = tk2.Session().Execute(context.Background(), stmt) } } @@ -1131,6 +1133,72 @@ func TestCancelJobWriteConflict(t *testing.T) { result.Check(testkit.Rows(fmt.Sprintf("%d successful", jobID))) } +func TestPauseJobWriteConflict(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease) + + tk1 := testkit.NewTestKit(t, store) + tk2 := testkit.NewTestKit(t, store) + + tk1.MustExec("use test") + + tk1.MustExec("create table t(id int)") + + var jobID int64 + var pauseErr error + var pauseRS []sqlexec.RecordSet + hook := &callback.TestDDLCallback{Do: dom} + d := dom.DDL() + originalHook := d.GetHook() + d.SetHook(hook) + defer d.SetHook(originalHook) + + // Test when pause cannot be retried and adding index succeeds. 
+ hook.OnJobRunBeforeExported = func(job *model.Job) { + if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization { + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("no_retry")`)) + defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn")) }() + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFailedCommandOnConcurencyDDL", `return(true)`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFailedCommandOnConcurencyDDL")) + }() + + jobID = job.ID + stmt := fmt.Sprintf("admin pause ddl jobs %d", jobID) + pauseRS, pauseErr = tk2.Session().Execute(context.Background(), stmt) + } + } + tk1.MustExec("alter table t add index (id)") + require.EqualError(t, pauseErr, "mock commit error") + + var cancelRS []sqlexec.RecordSet + var cancelErr error + hook.OnJobRunBeforeExported = func(job *model.Job) { + if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization { + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("no_retry")`)) + defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn")) }() + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFailedCommandOnConcurencyDDL", `return(false)`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFailedCommandOnConcurencyDDL")) + }() + + jobID = job.ID + stmt := fmt.Sprintf("admin pause ddl jobs %d", jobID) + pauseRS, pauseErr = tk2.Session().Execute(context.Background(), stmt) + + time.Sleep(5 * time.Second) + stmt = fmt.Sprintf("admin cancel ddl jobs %d", jobID) + cancelRS, cancelErr = tk2.Session().Execute(context.Background(), stmt) + } + } + tk1.MustGetErrCode("alter table t add index (id)", errno.ErrCancelledDDLJob) + require.NoError(t, pauseErr) + require.NoError(t, cancelErr) + result := tk2.ResultSetToResultWithCtx(context.Background(), pauseRS[0], "pause ddl job successfully") + result.Check(testkit.Rows(fmt.Sprintf("%d successful", jobID))) + result = tk2.ResultSetToResultWithCtx(context.Background(), cancelRS[0], "cancel ddl job successfully") + result.Check(testkit.Rows(fmt.Sprintf("%d successful", jobID))) +} + func TestTxnSavepointWithDDL(t *testing.T) { store, _ := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease) tk := testkit.NewTestKit(t, store) diff --git a/ddl/ddl.go b/ddl/ddl.go index 0287221558b55..216b4093bcda3 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -1091,7 +1091,7 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error { continue } sessVars.StmtCtx.DDLJobID = 0 // Avoid repeat. - errs, err := CancelJobs(se, []int64{jobID}) + errs, err := CancelJobsBySystem(se, []int64{jobID}) d.sessPool.Put(se) if len(errs) > 0 { logutil.BgLogger().Warn("error canceling DDL job", zap.Error(errs[0])) @@ -1425,85 +1425,238 @@ func get2JobsFromTable(sess *sess.Session) (*model.Job, *model.Job, error) { return generalJob, reorgJob, nil } -// CancelJobs cancels the DDL jobs. -func CancelJobs(se sessionctx.Context, ids []int64) (errs []error, err error) { - return cancelConcurrencyJobs(se, ids) +// cancelRunningJob cancel a DDL job that is in the concurrent state. 
+func cancelRunningJob(sess *sess.Session, job *model.Job, + byWho model.AdminCommandOperator) (err error) { + // These states can't be cancelled. + if job.IsDone() || job.IsSynced() { + return dbterror.ErrCancelFinishedDDLJob.GenWithStackByArgs(job.ID) + } + + // If the state is rolling back, it means the work is cleaning the data after cancelling the job. + if job.IsCancelled() || job.IsRollingback() || job.IsRollbackDone() { + return nil + } + + if !job.IsRollbackable() { + return dbterror.ErrCannotCancelDDLJob.GenWithStackByArgs(job.ID) + } + job.State = model.JobStateCancelling + job.AdminOperator = byWho + + // Make sure RawArgs isn't overwritten. + return json.Unmarshal(job.RawArgs, &job.Args) } -// cancelConcurrencyJobs cancels the DDL jobs that are in the concurrent state. -func cancelConcurrencyJobs(se sessionctx.Context, ids []int64) ([]error, error) { - failpoint.Inject("mockCancelConcurencyDDL", func(val failpoint.Value) { +// pauseRunningJob check and pause the running Job +func pauseRunningJob(sess *sess.Session, job *model.Job, + byWho model.AdminCommandOperator) (err error) { + // It would be much better doing this filter during `getJobsBySQL`, but not now. + if !job.IsPausable() { + err = dbterror.ErrCannotPauseDDLJob.GenWithStackByArgs(job.ID) + if err != nil { + return err + } + } + + job.State = model.JobStatePausing + job.AdminOperator = byWho + + return json.Unmarshal(job.RawArgs, &job.Args) +} + +// resumePausedJob check and resume the Paused Job +func resumePausedJob(se *sess.Session, job *model.Job, + byWho model.AdminCommandOperator) (err error) { + if !job.IsResumable() || + // The Paused job should only be resumed by who paused it + job.AdminOperator != byWho { + return dbterror.ErrCannotResumeDDLJob.GenWithStackByArgs(job.ID) + } + + job.State = model.JobStateQueueing + + return json.Unmarshal(job.RawArgs, &job.Args) +} + +// processJobs command on the Job according to the process +func processJobs(process func(*sess.Session, *model.Job, model.AdminCommandOperator) (err error), + sessCtx sessionctx.Context, + ids []int64, + byWho model.AdminCommandOperator) ([]error, error) { + failpoint.Inject("mockFailedCommandOnConcurencyDDL", func(val failpoint.Value) { if val.(bool) { failpoint.Return(nil, errors.New("mock commit error")) } }) + if len(ids) == 0 { return nil, nil } - var jobMap = make(map[int64]int) // jobID -> error index - sessCtx := sess.NewSession(se) - err := sessCtx.Begin() - if err != nil { - return nil, err - } + ns := sess.NewSession(sessCtx) + var errs []error - idsStr := make([]string, 0, len(ids)) - for idx, id := range ids { - jobMap[id] = idx - idsStr = append(idsStr, strconv.FormatInt(id, 10)) - } + // We should process (and try) all the jobs in one Transaction. 
+ for tryN := uint(0); tryN < 10; tryN += 1 { + errs = make([]error, len(ids)) + // Need to figure out which one could not be paused + jobMap := make(map[int64]int, len(ids)) + idsStr := make([]string, 0, len(ids)) + for idx, id := range ids { + jobMap[id] = idx + idsStr = append(idsStr, strconv.FormatInt(id, 10)) + } - jobs, err := getJobsBySQL(sessCtx, JobTable, fmt.Sprintf("job_id in (%s) order by job_id", strings.Join(idsStr, ", "))) - if err != nil { - sessCtx.Rollback() - return nil, err - } + err := ns.Begin() + if err != nil { + return nil, err + } + jobs, err := getJobsBySQL(ns, JobTable, fmt.Sprintf("job_id in (%s) order by job_id", strings.Join(idsStr, ", "))) + if err != nil { + ns.Rollback() + return nil, err + } - errs := make([]error, len(ids)) + for _, job := range jobs { + i, ok := jobMap[job.ID] + if !ok { + logutil.BgLogger().Debug("Job ID from meta is not consistent with requested job id,", + zap.Int64("fetched job ID", job.ID)) + errs[i] = dbterror.ErrInvalidDDLJob.GenWithStackByArgs(job.ID) + continue + } + delete(jobMap, job.ID) - for _, job := range jobs { - i, ok := jobMap[job.ID] - if !ok { - logutil.BgLogger().Debug("the job that needs to be canceled isn't equal to current job", - zap.Int64("need to canceled job ID", job.ID), - zap.Int64("current job ID", job.ID)) - continue + err = process(ns, job, byWho) + if err != nil { + errs[i] = err + break + } + + err = updateDDLJob2Table(ns, job, true) + if err != nil { + break + } } - delete(jobMap, job.ID) - // These states can't be cancelled. - if job.IsDone() || job.IsSynced() { - errs[i] = dbterror.ErrCancelFinishedDDLJob.GenWithStackByArgs(job.ID) + // We may meet some error on job update, try it again + if err != nil { + ns.Rollback() continue } - // If the state is rolling back, it means the work is cleaning the data after cancelling the job. - if job.IsCancelled() || job.IsRollingback() || job.IsRollbackDone() { + + // There may be some conflict during the update, try it again + if ns.Commit() != nil { continue } - if !job.IsRollbackable() { - errs[i] = dbterror.ErrCannotCancelDDLJob.GenWithStackByArgs(job.ID) - continue + + for id, idx := range jobMap { + errs[idx] = dbterror.ErrDDLJobNotFound.GenWithStackByArgs(id) } - job.State = model.JobStateCancelling - // Make sure RawArgs isn't overwritten. - err := json.Unmarshal(job.RawArgs, &job.Args) + + break + } + return errs, nil +} + +// CancelJobs cancels the DDL jobs according to user command. +func CancelJobs(se sessionctx.Context, ids []int64) (errs []error, err error) { + return processJobs(cancelRunningJob, se, ids, model.AdminCommandByEndUser) +} + +// PauseJobs pause all the DDL jobs according to user command. +func PauseJobs(se sessionctx.Context, ids []int64) ([]error, error) { + return processJobs(pauseRunningJob, se, ids, model.AdminCommandByEndUser) +} + +// ResumeJobs resume all the DDL jobs according to user command. +func ResumeJobs(se sessionctx.Context, ids []int64) ([]error, error) { + return processJobs(resumePausedJob, se, ids, model.AdminCommandByEndUser) +} + +// CancelJobsBySystem cancels Jobs because of internal reasons. +func CancelJobsBySystem(se sessionctx.Context, ids []int64) (errs []error, err error) { + return processJobs(cancelRunningJob, se, ids, model.AdminCommandBySystem) +} + +// PauseJobsBySystem pauses Jobs because of internal reasons. 
+func PauseJobsBySystem(se sessionctx.Context, ids []int64) (errs []error, err error) { + return processJobs(pauseRunningJob, se, ids, model.AdminCommandBySystem) +} + +// ResumeJobsBySystem resumes Jobs that are paused by TiDB itself. +func ResumeJobsBySystem(se sessionctx.Context, ids []int64) (errs []error, err error) { + return processJobs(resumePausedJob, se, ids, model.AdminCommandBySystem) +} + +// pprocessAllJobs processes all the jobs in the job table, 100 jobs at a time in case of high memory usage. +func processAllJobs(process func(*sess.Session, *model.Job, model.AdminCommandOperator) (err error), + se sessionctx.Context, byWho model.AdminCommandOperator) (map[int64]error, error) { + var err error + var jobErrs = make(map[int64]error) + + ns := sess.NewSession(se) + err = ns.Begin() + if err != nil { + return nil, err + } + + var jobID int64 = 0 + var jobIDMax int64 = 0 + var limit int = 100 + for { + var jobs []*model.Job + jobs, err = getJobsBySQL(ns, JobTable, + fmt.Sprintf("job_id >= %s order by job_id asc limit %s", + strconv.FormatInt(jobID, 10), + strconv.FormatInt(int64(limit), 10))) if err != nil { - errs[i] = errors.Trace(err) - continue + ns.Rollback() + return nil, err } - err = updateDDLJob2Table(sessCtx, job, true) - if err != nil { - errs[i] = errors.Trace(err) + + for _, job := range jobs { + err = process(ns, job, byWho) + if err != nil { + jobErrs[job.ID] = err + ns.Rollback() + return jobErrs, err + } + err = updateDDLJob2Table(ns, job, true) + if err != nil { + ns.Rollback() + return jobErrs, err + } + } + + // Just in case the job ID is not sequential + if jobs[len(jobs)-1].ID > jobIDMax { + jobIDMax = jobs[len(jobs)-1].ID } + + // If rows returned is smaller than $limit, then there is no more records + if len(jobs) < limit { + break + } + + jobID = jobIDMax + 1 } - err = sessCtx.Commit() + + err = ns.Commit() if err != nil { return nil, err } - for id, idx := range jobMap { - errs[idx] = dbterror.ErrDDLJobNotFound.GenWithStackByArgs(id) - } - return errs, nil + return jobErrs, nil +} + +// PauseAllJobsBySystem pauses all running Jobs because of internal reasons. +func PauseAllJobsBySystem(se sessionctx.Context) (map[int64]error, error) { + return processAllJobs(pauseRunningJob, se, model.AdminCommandBySystem) +} + +// ResumeAllJobsBySystem resumes all paused Jobs because of internal reasons. +func ResumeAllJobsBySystem(se sessionctx.Context) (map[int64]error, error) { + return processAllJobs(resumePausedJob, se, model.AdminCommandBySystem) } // GetAllDDLJobs get all DDL jobs and sorts jobs by job.ID. diff --git a/ddl/pause_test.go b/ddl/pause_test.go new file mode 100644 index 0000000000000..b7150a2c76ce0 --- /dev/null +++ b/ddl/pause_test.go @@ -0,0 +1,301 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package ddl_test + +import ( + "fmt" + "math/rand" + "strings" + "testing" + "time" + + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/ddl/internal/callback" + "github.com/pingcap/tidb/errno" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/util/logutil" + "github.com/stretchr/testify/require" + atomicutil "go.uber.org/atomic" +) + +type testPauseAndResumeJob struct { + sql string + ok bool + jobState interface{} // model.SchemaState | []model.SchemaState + onJobBefore bool + onJobUpdate bool + prepareSQL []string +} + +type TestTableUser struct { + id int64 + user string + name string + age int + province string + city string + phone string + createdTime time.Time + updatedTime time.Time +} + +func generateString(letterRunes []rune, length int) (string, error) { + b := make([]rune, length) + for i := range b { + b[i] = letterRunes[rand.Intn(len(letterRunes))] + } + return string(b), nil +} + +func generateName(length int) (string, error) { + var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ ") + return generateString(letterRunes, length) +} + +func generatePhone(length int) (string, error) { + var numberRunes = []rune("0123456789") + return generateString(numberRunes, length) +} + +func (tu *TestTableUser) generateAttributes() (err error) { + tu.user, err = generateName(rand.Intn(127)) + if err != nil { + return err + } + tu.name, err = generateName(rand.Intn(127)) + if err != nil { + return err + } + + tu.age = rand.Intn(100) + + tu.province, err = generateName(rand.Intn(32)) + if err != nil { + return err + } + tu.city, err = generateName(rand.Intn(32)) + if err != nil { + return err + } + tu.phone, err = generatePhone(14) + if err != nil { + return err + } + tu.createdTime = time.Now() + tu.updatedTime = time.Now() + + return nil +} + +func (tu *TestTableUser) insertStmt() string { + return fmt.Sprintf("INSERT INTO t_user(user, name, age, province, city, phone, created_time, updated_time) VALUES ('%s', '%s', %d, '%s', '%s', '%s', '%s', '%s')", + tu.user, tu.name, tu.age, tu.province, tu.city, tu.phone, tu.createdTime, tu.updatedTime) +} + +var allPauseJobTestCase = []testPauseAndResumeJob{ + // Add primary key + {"alter table t_user add primary key idx_id (id)", true, model.StateNone, true, false, nil}, + {"alter table t_user add primary key idx_id (id)", true, model.StateDeleteOnly, true, true, nil}, + {"alter table t_user add primary key idx_id (id)", true, model.StateWriteOnly, true, true, nil}, + {"alter table t_user add primary key idx_id (id)", true, model.StateWriteReorganization, true, true, nil}, + {"alter table t_user add primary key idx_id (id)", false, model.StatePublic, false, true, nil}, + + // Drop primary key + {"alter table t_user drop primary key", true, model.StatePublic, true, false, nil}, + {"alter table t_user drop primary key", false, model.StateWriteOnly, true, false, nil}, + {"alter table t_user drop primary key", false, model.StateWriteOnly, true, false, []string{"alter table t_user add primary key idx_id (id)"}}, + {"alter table t_user drop primary key", false, model.StateDeleteOnly, true, false, []string{"alter table t_user add primary key idx_id (id)"}}, + {"alter table t_user drop primary key", false, model.StateDeleteOnly, false, true, []string{"alter table t_user add primary key idx_id (id)"}}, + + // Add unique key + {"alter table t_user add unique index idx_name (id)", true, model.StateNone, true, 
false, nil}, + {"alter table t_user add unique index idx_name (id)", true, model.StateDeleteOnly, true, true, nil}, + {"alter table t_user add unique index idx_name (id)", true, model.StateWriteOnly, true, true, nil}, + {"alter table t_user add unique index idx_name (id)", true, model.StateWriteReorganization, true, true, nil}, + {"alter table t_user add unique index idx_name (id)", false, model.StatePublic, false, true, nil}, + + {"alter table t_user add index idx_phone (phone)", true, model.StateNone, true, false, nil}, + {"alter table t_user add index idx_phone (phone)", true, model.StateDeleteOnly, true, true, nil}, + {"alter table t_user add index idx_phone (phone)", true, model.StateWriteOnly, true, true, nil}, + {"alter table t_user add index idx_phone (phone)", false, model.StatePublic, false, true, nil}, + + // Add column. + {"alter table t_user add column c4 bigint", true, model.StateNone, true, false, nil}, + {"alter table t_user add column c4 bigint", true, model.StateDeleteOnly, true, true, nil}, + {"alter table t_user add column c4 bigint", true, model.StateWriteOnly, true, true, nil}, + {"alter table t_user add column c4 bigint", true, model.StateWriteReorganization, true, true, nil}, + {"alter table t_user add column c4 bigint", false, model.StatePublic, false, true, nil}, + + // Create table. + {"create table test_create_table(a int)", true, model.StateNone, true, false, nil}, + {"create table test_create_table(a int)", false, model.StatePublic, false, true, nil}, + + // Drop table. + {"drop table test_create_table", true, model.StatePublic, true, false, nil}, + {"drop table test_create_table", false, model.StateWriteOnly, true, true, []string{"create table if not exists test_create_table(a int)"}}, + {"drop table test_create_table", false, model.StateDeleteOnly, true, true, []string{"create table if not exists test_create_table(a int)"}}, + {"drop table test_create_table", false, model.StateNone, false, true, []string{"create table if not exists test_create_table(a int)"}}, + + // Create schema. + {"create database test_create_db", true, model.StateNone, true, false, nil}, + {"create database test_create_db", false, model.StatePublic, false, true, nil}, + + // Drop schema. + {"drop database test_create_db", true, model.StatePublic, true, false, nil}, + {"drop database test_create_db", false, model.StateWriteOnly, true, true, []string{"create database if not exists test_create_db"}}, + {"drop database test_create_db", false, model.StateDeleteOnly, true, true, []string{"create database if not exists test_create_db"}}, + {"drop database test_create_db", false, model.StateNone, false, true, []string{"create database if not exists test_create_db"}}, + + // Drop column. 
+ {"alter table t_user drop column c3", true, model.StatePublic, true, false, []string{"alter table t_user drop column if exists c3", "alter table t_user add column c3 bigint"}}, + {"alter table t_user drop column c3", false, model.StateDeleteOnly, true, false, []string{"alter table t_user drop column if exists c3", "alter table t_user add column c3 bigint"}}, + {"alter table t_user drop column c3", false, model.StateDeleteOnly, false, true, []string{"alter table t_user drop column if exists c3", "alter table t_user add column c3 bigint"}}, + {"alter table t_user drop column c3", false, model.StateWriteOnly, true, true, []string{"alter table t_user drop column if exists c3", "alter table t_user add column c3 bigint"}}, + {"alter table t_user drop column c3", false, model.StateDeleteReorganization, true, true, []string{"alter table t_user drop column if exists c3", "alter table t_user add column c3 bigint"}}, + {"alter table t_user drop column c3", false, model.StateNone, false, true, []string{"alter table t_user drop column if exists c3", "alter table t_user add column c3 bigint"}}, + + // Drop column with index. + {"alter table t_user drop column c3", true, model.StatePublic, true, false, []string{"alter table t_user drop column if exists c3", "alter table t_user add column c3 bigint", "alter table t_user add index idx_c3(c3)"}}, + {"alter table t_user drop column c3", false, model.StateDeleteOnly, true, false, []string{"alter table t_user drop column if exists c3", "alter table t_user add column c3 bigint", "alter table t_user add index idx_c3(c3)"}}, + {"alter table t_user drop column c3", false, model.StateDeleteOnly, false, true, []string{"alter table t_user drop column if exists c3", "alter table t_user add column c3 bigint", "alter table t_user add index idx_c3(c3)"}}, + {"alter table t_user drop column c3", false, model.StateWriteOnly, true, true, []string{"alter table t_user drop column if exists c3", "alter table t_user add column c3 bigint", "alter table t_user add index idx_c3(c3)"}}, + {"alter table t_user drop column c3", false, model.StateDeleteReorganization, true, true, []string{"alter table t_user drop column if exists c3", "alter table t_user add column c3 bigint", "alter table t_user add index idx_c3(c3)"}}, + {"alter table t_user drop column c3", false, model.StateNone, false, true, []string{"alter table t_user drop column if exists c3", "alter table t_user add column c3 bigint", "alter table t_user add index idx_c3(c3)"}}, + + // Modify column, no reorg. + {"alter table t_user modify column c3 mediumint", true, model.StateNone, true, false, []string{"alter table t_user drop column if exists c3", "alter table t_user add column c3 bigint"}}, + {"alter table t_user modify column c3 int", false, model.StatePublic, false, true, []string{"alter table t_user drop column if exists c3", "alter table t_user add column c3 bigint"}}, + + // Modify column, reorg. 
+ {"alter table t_user modify column c3 char(10)", true, model.StateNone, true, false, []string{"alter table t_user drop column if exists c3", "alter table t_user add column c3 bigint"}}, + {"alter table t_user modify column c3 char(10)", true, model.StateDeleteOnly, true, true, []string{"alter table t_user drop column if exists c3", "alter table t_user add column c3 bigint"}}, + {"alter table t_user modify column c3 char(10)", true, model.StateWriteOnly, true, true, []string{"alter table t_user drop column if exists c3", "alter table t_user add column c3 bigint"}}, + {"alter table t_user modify column c3 char(10)", true, model.StateWriteReorganization, true, true, []string{"alter table t_user drop column if exists c3", "alter table t_user add column c3 bigint"}}, + {"alter table t_user modify column c3 char(10)", false, model.StatePublic, false, true, []string{"alter table t_user drop column if exists c3", "alter table t_user add column c3 bigint"}}, +} + +func isCommandSuccess(rs *testkit.Result) bool { + return strings.Contains(rs.Rows()[0][1].(string), "success") +} + +func TestPauseAndResumeMain(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, 100*time.Millisecond) + tk := testkit.NewTestKit(t, store) + tkCommand := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec(`CREATE TABLE if not exists t_user ( + id int(11) NOT NULL AUTO_INCREMENT, + user varchar(128) NOT NULL, + name varchar(128) NOT NULL, + age int(11) NOT NULL, + province varchar(32) NOT NULL DEFAULT '', + city varchar(32) NOT NULL DEFAULT '', + phone varchar(16) NOT NULL DEFAULT '', + created_time datetime NOT NULL, + updated_time datetime NOT NULL + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;`) + + idx := 0 + rowCount := 100000 + tu := &TestTableUser{} + for idx < rowCount { + _ = tu.generateAttributes() + tk.MustExec(tu.insertStmt()) + + idx += 1 + } + + logger := logutil.BgLogger() + ddl.ReorgWaitTimeout = 10 * time.Millisecond + tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 2") + tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 1") + tk = testkit.NewTestKit(t, store) + tk.MustExec("use test") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockBackfillSlow", "return")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockBackfillSlow")) + }() + + hook := &callback.TestDDLCallback{Do: dom} + i := atomicutil.NewInt64(0) + + isPaused := atomicutil.NewBool(false) + pauseWhenReorgNotStart := atomicutil.NewBool(false) + isCancelled := atomicutil.NewBool(false) + cancelWhenReorgNotStart := atomicutil.NewBool(false) + commandHook := func(job *model.Job) { + logger.Info("allPauseJobTestCase commandHook: " + job.String()) + if testMatchCancelState(t, job, allPauseJobTestCase[i.Load()].jobState, allPauseJobTestCase[i.Load()].sql) && !isPaused.Load() { + logger.Info("allPauseJobTestCase commandHook: pass the check") + if !pauseWhenReorgNotStart.Load() && job.SchemaState == model.StateWriteReorganization && job.MayNeedReorg() && job.RowCount == 0 { + logger.Info("allPauseJobTestCase commandHook: reorg, return") + return + } + rs := tkCommand.MustQuery(fmt.Sprintf("admin pause ddl jobs %d", job.ID)) + logger.Info("allPauseJobTestCase commandHook: " + rs.Rows()[0][1].(string)) + isPaused.Store(isCommandSuccess(rs)) + time.Sleep(1 * time.Second) + + rs = tkCommand.MustQuery(fmt.Sprintf("admin cancel ddl jobs %d", job.ID)) + logger.Info("allPauseJobTestCase cancelHook: " + rs.Rows()[0][1].(string)) + 
isCancelled.Store(isCommandSuccess(rs)) + } + } + + dom.DDL().SetHook(hook.Clone()) + + restHook := func(h *callback.TestDDLCallback) { + h.OnJobRunBeforeExported = nil + h.OnJobRunAfterExported = nil + dom.DDL().SetHook(h.Clone()) + } + + registHook := func(h *callback.TestDDLCallback) { + h.OnJobRunBeforeExported = commandHook + dom.DDL().SetHook(h.Clone()) + } + + for idx, tc := range allPauseJobTestCase { + i.Store(int64(idx)) + msg := fmt.Sprintf("sql: %s, state: %s", tc.sql, tc.jobState) + + logger.Info("allPauseJobTestCase: " + msg) + + restHook(hook) + for _, prepareSQL := range tc.prepareSQL { + logger.Info("Prepare SQL:" + prepareSQL) + tk.MustExec(prepareSQL) + } + + isPaused.Store(false) + isCancelled.Store(false) + pauseWhenReorgNotStart.Store(false) + cancelWhenReorgNotStart.Store(false) + registHook(hook) + + if tc.ok { + tk.MustGetErrCode(tc.sql, errno.ErrCancelledDDLJob) + require.Equal(t, tc.ok, isPaused.Load(), msg) + require.Equal(t, tc.ok, isCancelled.Load(), msg) + } else { + tk.MustExec(tc.sql) + } + + // TODO: should add some check on Job during reorganization + } +} diff --git a/errno/errcode.go b/errno/errcode.go index 2bd9c1d760458..fe92490817c6c 100644 --- a/errno/errcode.go +++ b/errno/errcode.go @@ -1112,6 +1112,11 @@ const ( ErrColumnInChange = 8245 ErrDDLSetting = 8246 ErrIngestFailed = 8247 + + ErrCannotPauseDDLJob = 8260 + ErrCannotResumeDDLJob = 8261 + ErrPausedDDLJob = 8262 + // Resource group errors. ErrResourceGroupExists = 8248 ErrResourceGroupNotExists = 8249 diff --git a/errno/errname.go b/errno/errname.go index 2f0283693d544..df9ce39e2c0ba 100644 --- a/errno/errname.go +++ b/errno/errname.go @@ -1138,4 +1138,8 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{ ErrPrometheusAddrIsNotSet: mysql.Message("Prometheus address is not set in PD and etcd", nil), ErrTiKVStaleCommand: mysql.Message("TiKV server reports stale command", nil), ErrTiKVMaxTimestampNotSynced: mysql.Message("TiKV max timestamp is not synced", nil), + + ErrCannotPauseDDLJob: mysql.Message("Job [%v] can't be paused now", nil), + ErrCannotResumeDDLJob: mysql.Message("Job [%v] can't be resumed", nil), + ErrPausedDDLJob: mysql.Message("Job [%v] already paused", nil), } diff --git a/errors.toml b/errors.toml index ff97f7ad710d4..5d966da891305 100644 --- a/errors.toml +++ b/errors.toml @@ -1406,6 +1406,21 @@ error = ''' Ingest failed: %s ''' +["ddl:8260"] +error = ''' +Job [%v] can't be paused now +''' + +["ddl:8261"] +error = ''' +Job [%v] can't be resumed +''' + +["ddl:8262"] +error = ''' +Job [%v] already paused +''' + ["domain:8027"] error = ''' Information schema is out of date: schema failed to update in 1 lease, please make sure TiDB can connect to TiKV diff --git a/executor/builder.go b/executor/builder.go index 76566687d1543..a08df0623d2c4 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -206,6 +206,10 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor { return b.buildSelectLock(v) case *plannercore.CancelDDLJobs: return b.buildCancelDDLJobs(v) + case *plannercore.PauseDDLJobs: + return b.buildPauseDDLJobs(v) + case *plannercore.ResumeDDLJobs: + return b.buildResumeDDLJobs(v) case *plannercore.ShowNextRowID: return b.buildShowNextRowID(v) case *plannercore.ShowDDL: @@ -310,8 +314,33 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor { func (b *executorBuilder) buildCancelDDLJobs(v *plannercore.CancelDDLJobs) Executor { e := &CancelDDLJobsExec{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), - jobIDs: v.JobIDs, + 
CommandDDLJobsExec: &CommandDDLJobsExec{ + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), + jobIDs: v.JobIDs, + execute: ddl.CancelJobs, + }, + } + return e +} + +func (b *executorBuilder) buildPauseDDLJobs(v *plannercore.PauseDDLJobs) Executor { + e := &PauseDDLJobsExec{ + CommandDDLJobsExec: &CommandDDLJobsExec{ + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), + jobIDs: v.JobIDs, + execute: ddl.PauseJobs, + }, + } + return e +} + +func (b *executorBuilder) buildResumeDDLJobs(v *plannercore.ResumeDDLJobs) Executor { + e := &ResumeDDLJobsExec{ + CommandDDLJobsExec: &CommandDDLJobsExec{ + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), + jobIDs: v.JobIDs, + execute: ddl.ResumeJobs, + }, } return e } diff --git a/executor/executor.go b/executor/executor.go index 291e0b4e9a7bc..4bf8071e1cee2 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -335,29 +335,35 @@ func Next(ctx context.Context, e Executor, req *chunk.Chunk) error { return err } -// CancelDDLJobsExec represents a cancel DDL jobs executor. -type CancelDDLJobsExec struct { +// CommandDDLJobsExec is the general struct for Cancel/Pause/Resume commands on +// DDL jobs. These command currently by admin have the very similar struct and +// operations, it should be a better idea to have them in the same struct. +type CommandDDLJobsExec struct { baseExecutor cursor int jobIDs []int64 errs []error + + execute func(se sessionctx.Context, ids []int64) (errs []error, err error) } -// Open implements the Executor Open interface. -func (e *CancelDDLJobsExec) Open(ctx context.Context) error { +// Open implements the Executor for all Cancel/Pause/Resume command on DDL jobs +// just with different processes. And, it should not be called directly by the +// Executor. +func (e *CommandDDLJobsExec) Open(ctx context.Context) error { // We want to use a global transaction to execute the admin command, so we don't use e.ctx here. newSess, err := e.getSysSession() if err != nil { return err } - e.errs, err = ddl.CancelJobs(newSess, e.jobIDs) + e.errs, err = e.execute(newSess, e.jobIDs) e.releaseSysSession(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), newSess) return err } -// Next implements the Executor Next interface. -func (e *CancelDDLJobsExec) Next(ctx context.Context, req *chunk.Chunk) error { +// Next implements the Executor Next interface for Cancel/Pause/Resume +func (e *CommandDDLJobsExec) Next(ctx context.Context, req *chunk.Chunk) error { req.GrowAndReset(e.maxChunkSize) if e.cursor >= len(e.jobIDs) { return nil @@ -375,6 +381,21 @@ func (e *CancelDDLJobsExec) Next(ctx context.Context, req *chunk.Chunk) error { return nil } +// CancelDDLJobsExec represents a cancel DDL jobs executor. +type CancelDDLJobsExec struct { + *CommandDDLJobsExec +} + +// PauseDDLJobsExec indicates an Executor for Pause a DDL Job. +type PauseDDLJobsExec struct { + *CommandDDLJobsExec +} + +// ResumeDDLJobsExec indicates an Executor for Resume a DDL Job. +type ResumeDDLJobsExec struct { + *CommandDDLJobsExec +} + // ShowNextRowIDExec represents a show the next row ID executor. type ShowNextRowIDExec struct { baseExecutor @@ -640,7 +661,7 @@ func ts2Time(timestamp uint64, loc *time.Location) types.Time { // ShowDDLJobQueriesExec represents a show DDL job queries executor. // The jobs id that is given by 'admin show ddl job queries' statement, -// only be searched in the latest 10 history jobs +// only be searched in the latest 10 history jobs. 
 type ShowDDLJobQueriesExec struct {
 	baseExecutor
 
diff --git a/parser/model/ddl.go b/parser/model/ddl.go
index d6d8790962382..618c96e572752 100644
--- a/parser/model/ddl.go
+++ b/parser/model/ddl.go
@@ -737,6 +737,26 @@ func (job *Job) IsCancelling() bool {
 	return job.State == JobStateCancelling
 }
 
+// IsPaused returns whether the job is paused.
+func (job *Job) IsPaused() bool {
+	return job.State == JobStatePaused
+}
+
+// IsPausing indicates whether the job is pausing.
+func (job *Job) IsPausing() bool {
+	return job.State == JobStatePausing
+}
+
+// IsPausable checks whether we can pause the job.
+func (job *Job) IsPausable() bool {
+	return job.NotStarted() || (job.IsRunning() && job.IsRollbackable())
+}
+
+// IsResumable checks whether the job can be resumed.
+func (job *Job) IsResumable() bool {
+	return job.IsPaused()
+}
+
 // IsSynced returns whether the DDL modification is synced among all TiDB servers.
 func (job *Job) IsSynced() bool {
 	return job.State == JobStateSynced
diff --git a/util/dbterror/ddl_terror.go b/util/dbterror/ddl_terror.go
index ddacf77c025ef..079b9f533cf54 100644
--- a/util/dbterror/ddl_terror.go
+++ b/util/dbterror/ddl_terror.go
@@ -32,6 +32,8 @@ var (
 	ErrInvalidDDLJob = ClassDDL.NewStd(mysql.ErrInvalidDDLJob)
 	// ErrCancelledDDLJob means the DDL job is cancelled.
 	ErrCancelledDDLJob = ClassDDL.NewStd(mysql.ErrCancelledDDLJob)
+	// ErrPausedDDLJob returns when the DDL job is already paused.
+	ErrPausedDDLJob = ClassDDL.NewStd(mysql.ErrPausedDDLJob)
 	// ErrRunMultiSchemaChanges means we run multi schema changes.
 	ErrRunMultiSchemaChanges = ClassDDL.NewStdErr(mysql.ErrUnsupportedDDLOperation, parser_mysql.Message(fmt.Sprintf(mysql.MySQLErrName[mysql.ErrUnsupportedDDLOperation].Raw, "multi schema change for %s"), nil))
 	// ErrOperateSameColumn means we change the same columns multiple times in a DDL.
@@ -391,7 +393,11 @@ var (
 	ErrCancelFinishedDDLJob = ClassDDL.NewStd(mysql.ErrCancelFinishedDDLJob)
 	// ErrCannotCancelDDLJob returns when cancel a almost finished ddl job, because cancel in now may cause data inconsistency.
 	ErrCannotCancelDDLJob = ClassDDL.NewStd(mysql.ErrCannotCancelDDLJob)
-	// ErrDDLSetting returns when failing to enable/disable DDL
+	// ErrCannotPauseDDLJob returns when the State is not qualified to be paused.
+	ErrCannotPauseDDLJob = ClassDDL.NewStd(mysql.ErrCannotPauseDDLJob)
+	// ErrCannotResumeDDLJob returns when the State is not qualified to be resumed.
+	ErrCannotResumeDDLJob = ClassDDL.NewStd(mysql.ErrCannotResumeDDLJob)
+	// ErrDDLSetting returns when failing to enable/disable DDL.
 	ErrDDLSetting = ClassDDL.NewStd(mysql.ErrDDLSetting)
 	// ErrIngestFailed returns when the DDL ingest job is failed.
 	ErrIngestFailed = ClassDDL.NewStd(mysql.ErrIngestFailed)
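
For orientation, the new processJobs helper in ddl/ddl.go applies a single admin command (cancel, pause, or resume) to a batch of job IDs inside one transaction, and retries the whole batch when the commit conflicts with a concurrently running DDL worker. The standalone Go sketch below reproduces only that retry-the-batch shape; the store type, its commit method, and the command callback are illustrative stand-ins, not the TiDB session or job-table APIs.

package main

import (
	"errors"
	"fmt"
)

// store is a toy stand-in for the DDL job table plus its transaction.
type store struct {
	jobs          map[int64]string // job ID -> state
	conflictsLeft int              // simulates commit conflicts on the first attempts
}

func (s *store) commit() error {
	if s.conflictsLeft > 0 {
		s.conflictsLeft--
		return errors.New("write conflict")
	}
	return nil
}

// processBatch mirrors the retry-the-whole-batch shape of processJobs: build a
// per-ID error slice, apply the command to every job that was found, and start
// over with a fresh attempt if the commit fails.
func processBatch(s *store, ids []int64, command func(state string) (string, error)) []error {
	var errs []error
	for attempt := 0; attempt < 10; attempt++ {
		errs = make([]error, len(ids))
		missing := make(map[int64]struct{}, len(ids)) // IDs not present in the job table
		next := make(map[int64]string, len(ids))      // state updates to apply on commit
		for i, id := range ids {
			state, ok := s.jobs[id]
			if !ok {
				missing[id] = struct{}{}
				continue
			}
			newState, err := command(state)
			if err != nil {
				errs[i] = err
				continue
			}
			next[id] = newState
		}
		if s.commit() != nil {
			continue // write conflict: retry the whole batch
		}
		for id, state := range next {
			s.jobs[id] = state
		}
		for i, id := range ids {
			if _, notFound := missing[id]; notFound {
				errs[i] = fmt.Errorf("DDL job %d not found", id)
			}
		}
		break
	}
	return errs
}

func main() {
	s := &store{jobs: map[int64]string{1: "running", 2: "done"}, conflictsLeft: 1}
	pause := func(state string) (string, error) {
		if state != "running" {
			return "", fmt.Errorf("job in state %q cannot be paused", state)
		}
		return "pausing", nil
	}
	fmt.Println(processBatch(s, []int64{1, 2, 3}, pause))
	fmt.Println(s.jobs)
}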
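
processAllJobs pages through the whole job table 100 rows at a time, ordered by job_id, so that pausing or resuming every job (the BySystem variants meant for internal use) does not pull the entire table into memory at once. Below is a minimal sketch of the same paging idiom over an in-memory map; fetchPage stands in for the `job_id >= ? order by job_id asc limit ?` query and is not a real TiDB helper.

package main

import (
	"fmt"
	"sort"
)

// fetchPage is a stand-in for getJobsBySQL with "job_id >= ? order by job_id asc limit ?".
func fetchPage(all map[int64]string, fromID int64, limit int) []int64 {
	ids := make([]int64, 0, limit)
	for id := range all {
		if id >= fromID {
			ids = append(ids, id)
		}
	}
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	if len(ids) > limit {
		ids = ids[:limit]
	}
	return ids
}

func main() {
	jobs := map[int64]string{3: "running", 7: "queueing", 12: "running", 40: "queueing", 41: "running"}
	const limit = 2
	fromID := int64(0)
	for {
		page := fetchPage(jobs, fromID, limit)
		for _, id := range page {
			fmt.Printf("pausing job %d (state %s)\n", id, jobs[id])
		}
		if len(page) < limit {
			break // fewer rows than the limit means there are no more records
		}
		// Advance past the largest ID seen, mirroring jobID = jobIDMax + 1.
		fromID = page[len(page)-1] + 1
	}
}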
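
On the executor side, the three admin statements now share one CommandDDLJobsExec that is parameterized by an execute callback (ddl.CancelJobs, ddl.PauseJobs, or ddl.ResumeJobs), with thin wrapper types kept so each statement still builds its own named executor. The sketch below shows just that function-injection pattern under simplified, assumed signatures; none of these types are the real executor definitions.

package main

import "fmt"

// commandFunc is a simplified stand-in for the execute callback
// (ddl.CancelJobs / ddl.PauseJobs / ddl.ResumeJobs in the patch).
type commandFunc func(ids []int64) []error

// commandDDLJobsExec holds the plumbing shared by all three admin commands.
type commandDDLJobsExec struct {
	jobIDs  []int64
	execute commandFunc
	errs    []error
}

// open runs the injected command, the same way CommandDDLJobsExec.Open
// delegates to e.execute instead of calling ddl.CancelJobs directly.
func (e *commandDDLJobsExec) open() {
	e.errs = e.execute(e.jobIDs)
}

// Thin wrappers mirror CancelDDLJobsExec / PauseDDLJobsExec / ResumeDDLJobsExec.
type cancelDDLJobsExec struct{ *commandDDLJobsExec }
type pauseDDLJobsExec struct{ *commandDDLJobsExec }

func main() {
	pauseJobs := func(ids []int64) []error {
		errs := make([]error, len(ids))
		for _, id := range ids {
			fmt.Printf("pausing job %d\n", id)
		}
		return errs
	}
	e := pauseDDLJobsExec{&commandDDLJobsExec{jobIDs: []int64{101, 102}, execute: pauseJobs}}
	e.open()
	fmt.Println(e.errs)
}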
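
Finally, the pause/resume eligibility rules added to parser/model/ddl.go reduce to a small state check: a job may be paused while it has not started yet or while it is running in a still-rollbackable schema state, and only a paused job may be resumed. The self-contained sketch below models those two predicates; jobState and job are simplified stand-ins rather than the real model.Job.

package main

import "fmt"

// jobState is a simplified stand-in for model.JobState.
type jobState int

const (
	stateQueueing jobState = iota
	stateRunning
	statePausing
	statePaused
	stateDone
)

// job is an illustrative stand-in for model.Job.
type job struct {
	id           int64
	state        jobState
	rollbackable bool // whether the current schema state can still be rolled back
}

// isPausable mirrors IsPausable: not started yet, or running and still rollbackable.
func (j *job) isPausable() bool {
	return j.state == stateQueueing || (j.state == stateRunning && j.rollbackable)
}

// isResumable mirrors IsResumable: only a fully paused job can be resumed.
func (j *job) isResumable() bool {
	return j.state == statePaused
}

func main() {
	jobs := []job{
		{id: 1, state: stateQueueing, rollbackable: true},
		{id: 2, state: stateRunning, rollbackable: true},
		{id: 3, state: stateRunning, rollbackable: false},
		{id: 4, state: statePaused},
	}
	for _, j := range jobs {
		fmt.Printf("job %d: pausable=%v resumable=%v\n", j.id, j.isPausable(), j.isResumable())
	}
}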