From 382942f3ea30c53cfde6e308c820c5bf16da869f Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 13 May 2024 11:44:40 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #53134 Signed-off-by: ti-chi-bot --- pkg/ddl/ddl_worker.go | 70 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/pkg/ddl/ddl_worker.go b/pkg/ddl/ddl_worker.go index ed06d117a07f3..09763c5b89181 100644 --- a/pkg/ddl/ddl_worker.go +++ b/pkg/ddl/ddl_worker.go @@ -723,6 +723,12 @@ func (w *JobContext) setDDLLabelForDiagnosis(jobType model.ActionType) { } func (w *worker) HandleJobDone(d *ddlCtx, job *model.Job, t *meta.Meta) error { +<<<<<<< HEAD +======= + if err := w.checkBeforeCommit(); err != nil { + return err + } +>>>>>>> 601e21ca500 (ddl: add context cancel check before commit (#53134)) err := w.finishDDLJob(t, job) if err != nil { w.sess.Rollback() @@ -819,6 +825,14 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) { return 0, err } +<<<<<<< HEAD +======= + if err = w.checkBeforeCommit(); err != nil { + d.unlockSchemaVersion(job.ID) + return 0, err + } + +>>>>>>> 601e21ca500 (ddl: add context cancel check before commit (#53134)) if runJobErr != nil && !job.IsRollingback() && !job.IsRollbackDone() { // If the running job meets an error // and the job state is rolling back, it means that we have already handled this error. @@ -866,6 +880,62 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) { return schemaVer, nil } +<<<<<<< HEAD +======= +func (w *worker) checkBeforeCommit() error { + if !w.ddlCtx.isOwner() && w.tp != localWorker { + // Since this TiDB instance is not a DDL owner anymore, + // it should not commit any transaction. + w.sess.Rollback() + return dbterror.ErrNotOwner + } + + if err := w.ctx.Err(); err != nil { + // The worker context is canceled, it should not commit any transaction. + return err + } + return nil +} + +// HandleLocalDDLJob handles local ddl job like fast create table. +// Compare with normal ddl job: +// 1. directly insert the job to history job table(incompatible with CDC). +// 2. no need to wait schema version(only support create table now). +// 3. no register mdl info(only support create table now). +func (w *worker) HandleLocalDDLJob(d *ddlCtx, job *model.Job) (err error) { + defer func() { + w.unlockSeqNum(err) + }() + + txn, err := w.prepareTxn(job) + if err != nil { + return err + } + + t := meta.NewMeta(txn, meta.WithUpdateTableName()) + d.mu.RLock() + d.mu.hook.OnJobRunBefore(job) + d.mu.RUnlock() + + _, err = w.runDDLJob(d, t, job) + defer d.unlockSchemaVersion(job.ID) + if err != nil { + return err + } + + d.mu.RLock() + d.mu.hook.OnJobRunAfter(job) + d.mu.RUnlock() + + writeBinlog(d.binlogCli, txn, job) + // reset the SQL digest to make topsql work right. + w.sess.GetSessionVars().StmtCtx.ResetSQLDigest(job.Query) + + job.State = model.JobStateSynced + return w.HandleJobDone(d, job, t) +} + +>>>>>>> 601e21ca500 (ddl: add context cancel check before commit (#53134)) func (w *JobContext) getResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagger { if !topsqlstate.TopSQLEnabled() || w.cacheDigest == nil { return nil From 32da3d9b1b53ec35ac096523ee66ea4c23f3cae4 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 28 May 2024 15:38:59 +0800 Subject: [PATCH 2/2] resolve conflict --- pkg/ddl/ddl_worker.go | 50 +------------------------------------------ 1 file changed, 1 insertion(+), 49 deletions(-) diff --git a/pkg/ddl/ddl_worker.go b/pkg/ddl/ddl_worker.go index 09763c5b89181..9f6d872942cc9 100644 --- a/pkg/ddl/ddl_worker.go +++ b/pkg/ddl/ddl_worker.go @@ -723,12 +723,9 @@ func (w *JobContext) setDDLLabelForDiagnosis(jobType model.ActionType) { } func (w *worker) HandleJobDone(d *ddlCtx, job *model.Job, t *meta.Meta) error { -<<<<<<< HEAD -======= if err := w.checkBeforeCommit(); err != nil { return err } ->>>>>>> 601e21ca500 (ddl: add context cancel check before commit (#53134)) err := w.finishDDLJob(t, job) if err != nil { w.sess.Rollback() @@ -825,14 +822,10 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) { return 0, err } -<<<<<<< HEAD -======= if err = w.checkBeforeCommit(); err != nil { d.unlockSchemaVersion(job.ID) return 0, err } - ->>>>>>> 601e21ca500 (ddl: add context cancel check before commit (#53134)) if runJobErr != nil && !job.IsRollingback() && !job.IsRollbackDone() { // If the running job meets an error // and the job state is rolling back, it means that we have already handled this error. @@ -880,10 +873,8 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) { return schemaVer, nil } -<<<<<<< HEAD -======= func (w *worker) checkBeforeCommit() error { - if !w.ddlCtx.isOwner() && w.tp != localWorker { + if !w.ddlCtx.isOwner() { // Since this TiDB instance is not a DDL owner anymore, // it should not commit any transaction. w.sess.Rollback() @@ -897,45 +888,6 @@ func (w *worker) checkBeforeCommit() error { return nil } -// HandleLocalDDLJob handles local ddl job like fast create table. -// Compare with normal ddl job: -// 1. directly insert the job to history job table(incompatible with CDC). -// 2. no need to wait schema version(only support create table now). -// 3. no register mdl info(only support create table now). -func (w *worker) HandleLocalDDLJob(d *ddlCtx, job *model.Job) (err error) { - defer func() { - w.unlockSeqNum(err) - }() - - txn, err := w.prepareTxn(job) - if err != nil { - return err - } - - t := meta.NewMeta(txn, meta.WithUpdateTableName()) - d.mu.RLock() - d.mu.hook.OnJobRunBefore(job) - d.mu.RUnlock() - - _, err = w.runDDLJob(d, t, job) - defer d.unlockSchemaVersion(job.ID) - if err != nil { - return err - } - - d.mu.RLock() - d.mu.hook.OnJobRunAfter(job) - d.mu.RUnlock() - - writeBinlog(d.binlogCli, txn, job) - // reset the SQL digest to make topsql work right. - w.sess.GetSessionVars().StmtCtx.ResetSQLDigest(job.Query) - - job.State = model.JobStateSynced - return w.HandleJobDone(d, job, t) -} - ->>>>>>> 601e21ca500 (ddl: add context cancel check before commit (#53134)) func (w *JobContext) getResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagger { if !topsqlstate.TopSQLEnabled() || w.cacheDigest == nil { return nil