Skip to content

Commit

Permalink
ddl: background reorganization change for admin pause/resume ... (#…
Browse files Browse the repository at this point in the history
…43297)

close #18015, ref #40041
  • Loading branch information
dhysum authored May 10, 2023
1 parent 1bdfc80 commit 5c31e16
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 43 deletions.
5 changes: 2 additions & 3 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,8 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
rc := d.getReorgCtx(jobID)

for {
// Give job chance to be canceled, if we not check it here,
// if there is panic in bf.BackfillData we will never cancel the job.
// Give job chance to be canceled or paused, if we not check it here,
// we will never cancel the job once there is panic in bf.BackfillData.
// Because reorgRecordTask may run a long time,
// we should check whether this ddl job is still runnable.
err := d.isReorgRunnable(jobID, false)
Expand Down Expand Up @@ -721,7 +721,6 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sess.Pool, t table.Physical
if len(kvRanges) == 0 {
break
}

logutil.BgLogger().Info("[ddl] start backfill workers to reorg record",
zap.Stringer("type", bfWorkerType),
zap.Int("workerCnt", scheduler.currentWorkerSize()),
Expand Down
6 changes: 5 additions & 1 deletion ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,10 @@ func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.J
return w.updateCurrentElement(tbl, reorgInfo)
})
if err != nil {
if dbterror.ErrPausedDDLJob.Equal(err) {
return false, ver, nil
}

if dbterror.ErrWaitReorgTimeout.Equal(err) {
// If timeout, we should return, check for the owner and re-wait job done.
return false, ver, nil
Expand Down Expand Up @@ -1100,7 +1104,7 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error
TestReorgGoroutineRunning <- a
for {
time.Sleep(30 * time.Millisecond)
if w.getReorgCtx(reorgInfo.Job.ID).isReorgCanceled() {
if w.isReorgCancelled(reorgInfo.Job.ID) {
// Job is cancelled. So it can't be done.
failpoint.Return(dbterror.ErrCancelledDDLJob)
}
Expand Down
4 changes: 2 additions & 2 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,12 +547,12 @@ func (dc *ddlCtx) removeReorgCtx(jobID int64) {
}
}

func (dc *ddlCtx) notifyReorgCancel(job *model.Job) {
func (dc *ddlCtx) notifyReorgWorkerJobStateChange(job *model.Job) {
rc := dc.getReorgCtx(job.ID)
if rc == nil {
return
}
rc.notifyReorgCancel()
rc.notifyJobState(job.State)
}

// EnableTiFlashPoll enables TiFlash poll loop aka PollTiFlashReplicaStatus.
Expand Down
18 changes: 13 additions & 5 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,8 +520,11 @@ func jobNeedGC(job *model.Job) bool {
return false
}
switch job.Type {
case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropPrimaryKey,
model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn, model.ActionModifyColumn,
case model.ActionDropSchema, model.ActionDropTable,
model.ActionTruncateTable, model.ActionDropIndex,
model.ActionDropPrimaryKey,
model.ActionDropTablePartition, model.ActionTruncateTablePartition,
model.ActionDropColumn, model.ActionModifyColumn,
model.ActionAddIndex, model.ActionAddPrimaryKey,
model.ActionReorganizePartition:
return true
Expand Down Expand Up @@ -739,7 +742,6 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
err = w.HandleJobDone(d, job, t)
return 0, err
}

d.mu.RLock()
d.mu.hook.OnJobRunBefore(job)
d.mu.RUnlock()
Expand Down Expand Up @@ -952,15 +954,21 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
}()
if job.IsFinished() {
logutil.Logger(w.logCtx).Debug("[ddl] finish DDL job", zap.String("job", job.String()))
return
return ver, err
}

if job.IsPaused() || job.IsPausing() {
logutil.Logger(w.logCtx).Info("[ddl] DDL job paused", zap.String("job", job.String()))
return ver, pauseReorgWorkers(w, d, job)
}

// The cause of this job state is that the job is cancelled by client.
if job.IsCancelling() {
logutil.Logger(w.logCtx).Debug("[ddl] cancel DDL job", zap.String("job", job.String()))
return convertJob2RollbackJob(w, d, t, job)
}

if !job.IsRollingback() && !job.IsCancelling() {
if !job.IsRollingback() {
job.State = model.JobStateRunning
}

Expand Down
3 changes: 3 additions & 0 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,9 @@ func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
return w.addTableIndex(tbl, reorgInfo)
})
if err != nil {
if dbterror.ErrPausedDDLJob.Equal(err) {
return false, ver, nil
}
if dbterror.ErrWaitReorgTimeout.Equal(err) {
// if timeout, we should return, check for the owner and re-wait job done.
return false, ver, nil
Expand Down
76 changes: 61 additions & 15 deletions ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,24 +114,60 @@ func (d *ddl) getJob(se *sess.Session, tp jobType, filter func(*model.Job) (bool
}
for _, row := range rows {
jobBinary := row.GetBytes(0)
runJob := model.Job{}
err := runJob.Decode(jobBinary)
isJobProcessing := row.GetInt64(1) == 1

job := model.Job{}
err = job.Decode(jobBinary)
if err != nil {
return nil, errors.Trace(err)
}
if row.GetInt64(1) == 1 {
return &runJob, nil
if job.IsPaused() {
// If the job has been Paused, we should not process it. And the
// processing should have been set with 0.
continue
}
b, err := filter(&runJob)

// Receive the `admin pause ...` command on this job, turn it to be
// not processing; And, keep continue to pause the job and the
// background reorganization workers.
if job.IsPausing() {
// We want the priority of the jobs keeping the same as the time
// (i.e., job_id) they were issued, and lower than those are still
// running.
if err = d.markJobNotProcessing(se, &job); err != nil {
logutil.BgLogger().Warn(
"[ddl] failed to mark the job as processing=0",
zap.Error(err), zap.String("job", job.String()))
return nil, errors.Trace(err)
}

// The job may have been run for a while, we need to notify the
// background reorganization worker to finish in worker.runDDLJob
// So that we should not `continue` or `return` here
}

// The job has already been picked up, just return to continue it.
if isJobProcessing {
return &job, nil
}

b, err := filter(&job)
if err != nil {
return nil, errors.Trace(err)
}
if b {
if err := d.markJobProcessing(se, &runJob); err != nil {
logutil.BgLogger().Warn("[ddl] handle ddl job failed: mark job is processing meet error", zap.Error(err), zap.String("job", runJob.String()))
return nil, errors.Trace(err)
if !job.IsPausing() {
// This should be the first time that the job is picked up.
// Then it should not be a pausing or paused job.
if err = d.markJobProcessing(se, &job); err != nil {
logutil.BgLogger().Warn(
"[ddl] handle ddl job failed: mark job is processing meet error",
zap.Error(err),
zap.String("job", job.String()))
return nil, errors.Trace(err)
}
}
return &runJob, nil
return &job, nil
}
}
return nil, nil
Expand All @@ -142,18 +178,18 @@ func (d *ddl) getGeneralJob(sess *sess.Session) (*model.Job, error) {
if job.Type == model.ActionDropSchema {
// Check if there is any reorg job on this schema.
sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job where CONCAT(',', schema_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and processing limit 1", strconv.Quote(strconv.FormatInt(job.SchemaID, 10)))
return d.checkJobIsRunnable(sess, sql)
return d.NoConflictJob(sess, sql)
}
// Check if there is any running job works on the same table.
sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job t1, (select table_ids from mysql.tidb_ddl_job where job_id = %d) t2 where "+
"(processing and CONCAT(',', t2.table_ids, ',') REGEXP CONCAT(',', REPLACE(t1.table_ids, ',', '|'), ',') != 0)"+
"or (type = %d and processing)", job.ID, model.ActionFlashbackCluster)
return d.checkJobIsRunnable(sess, sql)
return d.NoConflictJob(sess, sql)
})
}

func (d *ddl) checkJobIsRunnable(se *sess.Session, sql string) (bool, error) {
rows, err := se.Execute(context.Background(), sql, "check_runnable")
func (d *ddl) NoConflictJob(se *sess.Session, sql string) (bool, error) {
rows, err := se.Execute(context.Background(), sql, "check conflict jobs")
return len(rows) == 0, err
}

Expand All @@ -165,7 +201,7 @@ func (d *ddl) getReorgJob(sess *sess.Session) (*model.Job, error) {
"or (CONCAT(',', table_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and processing) "+
"or (type = %d and processing) limit 1",
strconv.Quote(strconv.FormatInt(job.SchemaID, 10)), model.ActionDropSchema, strconv.Quote(strconv.FormatInt(job.TableID, 10)), model.ActionFlashbackCluster)
return d.checkJobIsRunnable(sess, sql)
return d.NoConflictJob(sess, sql)
})
}

Expand Down Expand Up @@ -315,9 +351,19 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
})
}

func (d *ddl) markJobNotProcessing(se *sess.Session, job *model.Job) error {
se.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
_, err := se.Execute(context.Background(), fmt.Sprintf(
"update mysql.tidb_ddl_job set processing = 0 where job_id = %d", job.ID),
"mark_job_not_processing")
return errors.Trace(err)
}

func (d *ddl) markJobProcessing(se *sess.Session, job *model.Job) error {
se.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
_, err := se.Execute(context.Background(), fmt.Sprintf("update mysql.tidb_ddl_job set processing = 1 where job_id = %d", job.ID), "mark_job_processing")
_, err := se.Execute(context.Background(), fmt.Sprintf(
"update mysql.tidb_ddl_job set processing = 1 where job_id = %d", job.ID),
"mark_job_processing")
return errors.Trace(err)
}

Expand Down
8 changes: 8 additions & 0 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1873,6 +1873,10 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
// if timeout, we should return, check for the owner and re-wait job done.
return ver, nil
}
if dbterror.ErrPausedDDLJob.Equal(err) {
// if ErrPausedDDLJob, we should return, check for the owner and re-wait job done.
return ver, nil
}
return ver, errors.Trace(err)
}
}
Expand Down Expand Up @@ -2579,6 +2583,10 @@ func doPartitionReorgWork(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tb
return w.reorgPartitionDataAndIndex(tbl, reorgInfo)
})
if err != nil {
if dbterror.ErrPausedDDLJob.Equal(err) {
return false, ver, nil
}

if dbterror.ErrWaitReorgTimeout.Equal(err) {
// If timeout, we should return, check for the owner and re-wait job done.
return false, ver, nil
Expand Down
52 changes: 38 additions & 14 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,7 @@ type reorgCtx struct {
doneCh chan error
// rowCount is used to simulate a job's row count.
rowCount int64
// notifyCancelReorgJob is used to notify the backfilling goroutine if the DDL job is cancelled.
// 0: job is not canceled.
// 1: job is canceled.
notifyCancelReorgJob int32
jobState model.JobState

mu struct {
sync.Mutex
Expand Down Expand Up @@ -94,12 +91,18 @@ const defaultWaitReorgTimeout = 10 * time.Second
// ReorgWaitTimeout is the timeout that wait ddl in write reorganization stage.
var ReorgWaitTimeout = 5 * time.Second

func (rc *reorgCtx) notifyReorgCancel() {
atomic.StoreInt32(&rc.notifyCancelReorgJob, 1)
func (rc *reorgCtx) notifyJobState(state model.JobState) {
atomic.StoreInt32((*int32)(&rc.jobState), int32(state))
}

func (rc *reorgCtx) isReorgCanceled() bool {
return atomic.LoadInt32(&rc.notifyCancelReorgJob) == 1
return int32(model.JobStateCancelled) == atomic.LoadInt32((*int32)(&rc.jobState)) ||
int32(model.JobStateCancelling) == atomic.LoadInt32((*int32)(&rc.jobState))
}

func (rc *reorgCtx) isReorgPaused() bool {
return int32(model.JobStatePaused) == atomic.LoadInt32((*int32)(&rc.jobState)) ||
int32(model.JobStatePausing) == atomic.LoadInt32((*int32)(&rc.jobState))
}

func (rc *reorgCtx) setRowCount(count int64) {
Expand Down Expand Up @@ -166,7 +169,8 @@ func (rc *reorgCtx) getRowCount() int64 {
// the additional ddl round.
//
// After that, we can make sure that the worker goroutine is correctly shut down.
func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *model.TableInfo, lease time.Duration, f func() error) error {
func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *model.TableInfo,
lease time.Duration, f func() error) error {
job := reorgInfo.Job
d := reorgInfo.d
// This is for tests compatible, because most of the early tests try to build the reorg job manually
Expand All @@ -183,13 +187,18 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo
rc := w.getReorgCtx(job.ID)
if rc == nil {
// This job is cancelling, we should return ErrCancelledDDLJob directly.
//
// Q: Is there any possibility that the job is cancelling and has no reorgCtx?
// A: Yes, consider the case that we cancel the job when backfilling the last batch of data, the cancel txn is commit first,
// and then the backfill workers send signal to the `doneCh` of the reorgCtx, and then the DDL worker will remove the reorgCtx and
// update the DDL job to `done`, but at the commit time, the DDL txn will raise a "write conflict" error and retry, and it happens.
// A: Yes, consider the case that :
// - we cancel the job when backfilling the last batch of data, the cancel txn is commit first,
// - and then the backfill workers send signal to the `doneCh` of the reorgCtx,
// - and then the DDL worker will remove the reorgCtx
// - and update the DDL job to `done`
// - but at the commit time, the DDL txn will raise a "write conflict" error and retry, and it happens.
if job.IsCancelling() {
return dbterror.ErrCancelledDDLJob
}

rc = w.newReorgCtx(reorgInfo.Job.ID, reorgInfo.Job.GetRowCount())
w.wg.Add(1)
go func() {
Expand All @@ -216,6 +225,7 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo
d.removeReorgCtx(job.ID)
return dbterror.ErrCancelledDDLJob
}

rowCount := rc.getRowCount()
if err != nil {
logutil.BgLogger().Warn("[ddl] run reorg job done", zap.Int64("handled rows", rowCount), zap.Error(err))
Expand All @@ -228,14 +238,16 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo
// Update a job's warnings.
w.mergeWarningsIntoJob(job)

// TODO: should we do this if dbterror.ErrPausedDDLJob ???
d.removeReorgCtx(job.ID)

updateBackfillProgress(w, reorgInfo, tblInfo, rowCount)

// For other errors, even err is not nil here, we still wait the partial counts to be collected.
// since in the next round, the startKey is brand new which is stored by last time.
if err != nil {
return errors.Trace(err)
}

updateBackfillProgress(w, reorgInfo, tblInfo, 0)
case <-w.ctx.Done():
logutil.BgLogger().Info("[ddl] run reorg job quit")
d.removeReorgCtx(job.ID)
Expand Down Expand Up @@ -344,17 +356,29 @@ func getTableTotalCount(w *worker, tblInfo *model.TableInfo) int64 {
return rows[0].GetInt64(0)
}

func (dc *ddlCtx) isReorgCancelled(jobID int64) bool {
return dc.getReorgCtx(jobID).isReorgCanceled()
}
func (dc *ddlCtx) isReorgPaused(jobID int64) bool {
return dc.getReorgCtx(jobID).isReorgPaused()
}

func (dc *ddlCtx) isReorgRunnable(jobID int64, isDistReorg bool) error {
if isChanClosed(dc.ctx.Done()) {
// Worker is closed. So it can't do the reorganization.
return dbterror.ErrInvalidWorker.GenWithStack("worker is closed")
}

if dc.getReorgCtx(jobID).isReorgCanceled() {
if dc.isReorgCancelled(jobID) {
// Job is cancelled. So it can't be done.
return dbterror.ErrCancelledDDLJob
}

if dc.isReorgPaused(jobID) {
logutil.BgLogger().Warn("[ddl] job paused by user", zap.String("ID", dc.uuid))
return dbterror.ErrPausedDDLJob
}

// If isDistReorg is true, we needn't check if it is owner.
if isDistReorg {
return nil
Expand Down
Loading

0 comments on commit 5c31e16

Please sign in to comment.