Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#53134
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
tangenta authored and ti-chi-bot committed May 27, 2024
1 parent 0bbeb15 commit 382942f
Showing 1 changed file with 70 additions and 0 deletions.
70 changes: 70 additions & 0 deletions pkg/ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,12 @@ func (w *JobContext) setDDLLabelForDiagnosis(jobType model.ActionType) {
}

func (w *worker) HandleJobDone(d *ddlCtx, job *model.Job, t *meta.Meta) error {
<<<<<<< HEAD
=======
if err := w.checkBeforeCommit(); err != nil {
return err
}
>>>>>>> 601e21ca500 (ddl: add context cancel check before commit (#53134))
err := w.finishDDLJob(t, job)
if err != nil {
w.sess.Rollback()
Expand Down Expand Up @@ -819,6 +825,14 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
return 0, err
}

<<<<<<< HEAD
=======
if err = w.checkBeforeCommit(); err != nil {
d.unlockSchemaVersion(job.ID)
return 0, err
}

>>>>>>> 601e21ca500 (ddl: add context cancel check before commit (#53134))
if runJobErr != nil && !job.IsRollingback() && !job.IsRollbackDone() {
// If the running job meets an error
// and the job state is rolling back, it means that we have already handled this error.
Expand Down Expand Up @@ -866,6 +880,62 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
return schemaVer, nil
}

<<<<<<< HEAD
=======
func (w *worker) checkBeforeCommit() error {
if !w.ddlCtx.isOwner() && w.tp != localWorker {
// Since this TiDB instance is not a DDL owner anymore,
// it should not commit any transaction.
w.sess.Rollback()
return dbterror.ErrNotOwner
}

if err := w.ctx.Err(); err != nil {
// The worker context is canceled, it should not commit any transaction.
return err
}
return nil
}

// HandleLocalDDLJob handles local ddl job like fast create table.
// Compare with normal ddl job:
// 1. directly insert the job to history job table(incompatible with CDC).
// 2. no need to wait schema version(only support create table now).
// 3. no register mdl info(only support create table now).
func (w *worker) HandleLocalDDLJob(d *ddlCtx, job *model.Job) (err error) {
defer func() {
w.unlockSeqNum(err)
}()

txn, err := w.prepareTxn(job)
if err != nil {
return err
}

t := meta.NewMeta(txn, meta.WithUpdateTableName())
d.mu.RLock()
d.mu.hook.OnJobRunBefore(job)
d.mu.RUnlock()

_, err = w.runDDLJob(d, t, job)
defer d.unlockSchemaVersion(job.ID)
if err != nil {
return err
}

d.mu.RLock()
d.mu.hook.OnJobRunAfter(job)
d.mu.RUnlock()

writeBinlog(d.binlogCli, txn, job)
// reset the SQL digest to make topsql work right.
w.sess.GetSessionVars().StmtCtx.ResetSQLDigest(job.Query)

job.State = model.JobStateSynced
return w.HandleJobDone(d, job, t)
}

>>>>>>> 601e21ca500 (ddl: add context cancel check before commit (#53134))
func (w *JobContext) getResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagger {
if !topsqlstate.TopSQLEnabled() || w.cacheDigest == nil {
return nil
Expand Down

0 comments on commit 382942f

Please sign in to comment.