From 382942f3ea30c53cfde6e308c820c5bf16da869f Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 13 May 2024 11:44:40 +0800 Subject: [PATCH] This is an automated cherry-pick of #53134 Signed-off-by: ti-chi-bot --- pkg/ddl/ddl_worker.go | 70 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/pkg/ddl/ddl_worker.go b/pkg/ddl/ddl_worker.go index ed06d117a07f3..09763c5b89181 100644 --- a/pkg/ddl/ddl_worker.go +++ b/pkg/ddl/ddl_worker.go @@ -723,6 +723,12 @@ func (w *JobContext) setDDLLabelForDiagnosis(jobType model.ActionType) { } func (w *worker) HandleJobDone(d *ddlCtx, job *model.Job, t *meta.Meta) error { +<<<<<<< HEAD +======= + if err := w.checkBeforeCommit(); err != nil { + return err + } +>>>>>>> 601e21ca500 (ddl: add context cancel check before commit (#53134)) err := w.finishDDLJob(t, job) if err != nil { w.sess.Rollback() @@ -819,6 +825,14 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) { return 0, err } +<<<<<<< HEAD +======= + if err = w.checkBeforeCommit(); err != nil { + d.unlockSchemaVersion(job.ID) + return 0, err + } + +>>>>>>> 601e21ca500 (ddl: add context cancel check before commit (#53134)) if runJobErr != nil && !job.IsRollingback() && !job.IsRollbackDone() { // If the running job meets an error // and the job state is rolling back, it means that we have already handled this error. @@ -866,6 +880,62 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) { return schemaVer, nil } +<<<<<<< HEAD +======= +func (w *worker) checkBeforeCommit() error { + if !w.ddlCtx.isOwner() && w.tp != localWorker { + // Since this TiDB instance is not a DDL owner anymore, + // it should not commit any transaction. + w.sess.Rollback() + return dbterror.ErrNotOwner + } + + if err := w.ctx.Err(); err != nil { + // The worker context is canceled, it should not commit any transaction. + return err + } + return nil +} + +// HandleLocalDDLJob handles local ddl job like fast create table. +// Compare with normal ddl job: +// 1. directly insert the job to history job table(incompatible with CDC). +// 2. no need to wait schema version(only support create table now). +// 3. no register mdl info(only support create table now). +func (w *worker) HandleLocalDDLJob(d *ddlCtx, job *model.Job) (err error) { + defer func() { + w.unlockSeqNum(err) + }() + + txn, err := w.prepareTxn(job) + if err != nil { + return err + } + + t := meta.NewMeta(txn, meta.WithUpdateTableName()) + d.mu.RLock() + d.mu.hook.OnJobRunBefore(job) + d.mu.RUnlock() + + _, err = w.runDDLJob(d, t, job) + defer d.unlockSchemaVersion(job.ID) + if err != nil { + return err + } + + d.mu.RLock() + d.mu.hook.OnJobRunAfter(job) + d.mu.RUnlock() + + writeBinlog(d.binlogCli, txn, job) + // reset the SQL digest to make topsql work right. + w.sess.GetSessionVars().StmtCtx.ResetSQLDigest(job.Query) + + job.State = model.JobStateSynced + return w.HandleJobDone(d, job, t) +} + +>>>>>>> 601e21ca500 (ddl: add context cancel check before commit (#53134)) func (w *JobContext) getResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagger { if !topsqlstate.TopSQLEnabled() || w.cacheDigest == nil { return nil