Skip to content

Commit

Permalink
done
Browse files Browse the repository at this point in the history
Signed-off-by: wjhuang2016 <[email protected]>
  • Loading branch information
wjhuang2016 authored and ti-chi-bot committed Dec 20, 2022
1 parent b85be5f commit d0426c0
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 11 deletions.
27 changes: 23 additions & 4 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,16 +535,20 @@ func cleanMDLInfo(pool *sessionPool, jobID int64, ec *clientv3.Client) {
}

// checkMDLInfo checks if metadata lock info exists. It means the schema is locked by some TiDBs if exists.
func checkMDLInfo(jobID int64, pool *sessionPool) (bool, error) {
sql := fmt.Sprintf("select * from mysql.tidb_mdl_info where job_id = %d", jobID)
func checkMDLInfo(jobID int64, pool *sessionPool) (bool, int64, error) {
sql := fmt.Sprintf("select version from mysql.tidb_mdl_info where job_id = %d", jobID)
sctx, _ := pool.get()
defer pool.put(sctx)
sess := newSession(sctx)
rows, err := sess.execute(context.Background(), sql, "check-mdl-info")
if err != nil {
return false, err
return false, 0, err
}
return len(rows) > 0, nil
if len(rows) == 0 {
return false, 0, nil
}
ver := rows[0].GetInt64(0)
return true, ver, nil
}

func needUpdateRawArgs(job *model.Job, meetErr bool) bool {
Expand Down Expand Up @@ -1376,6 +1380,21 @@ func waitSchemaChanged(ctx context.Context, d *ddlCtx, waitTime time.Duration, l
zap.String("job", job.String()))
}

// waitSchemaSyncedForMDL likes waitSchemaSynced, but it waits for getting the metadata lock of the latest version of this DDL.
func waitSchemaSyncedForMDL(d *ddlCtx, job *model.Job, waitTime time.Duration, latestSchemaVersion int64) {
failpoint.Inject("checkDownBeforeUpdateGlobalVersion", func(val failpoint.Value) {
if val.(bool) {
if mockDDLErrOnce > 0 && mockDDLErrOnce != latestSchemaVersion {
panic("check down before update global version failed")
} else {
mockDDLErrOnce = -1
}
}
})

waitSchemaChanged(context.Background(), d, waitTime, latestSchemaVersion, job)
}

// waitSchemaSynced handles the following situation:
// If the job enters a new state, and the worker crashs when it's in the process of waiting for 2 * lease time,
// Then the worker restarts quickly, we may run the job immediately again,
Expand Down
9 changes: 2 additions & 7 deletions ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
// check if this ddl job is synced to all servers.
if !d.isSynced(job) || d.once.Load() {
if variable.EnableMDL.Load() {
exist, err := checkMDLInfo(job.ID, d.sessPool)
exist, version, err := checkMDLInfo(job.ID, d.sessPool)
if err != nil {
logutil.BgLogger().Warn("[ddl] check MDL info failed", zap.Error(err), zap.String("job", job.String()))
// Release the worker resource.
Expand All @@ -245,12 +245,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
} else if exist {
// Release the worker resource.
pool.put(wk)
err = waitSchemaSynced(d.ddlCtx, job, 2*d.lease)
if err != nil {
logutil.BgLogger().Warn("[ddl] wait ddl job sync failed", zap.Error(err), zap.String("job", job.String()))
time.Sleep(time.Second)
return
}
waitSchemaSyncedForMDL(d.ddlCtx, job, 2*d.lease, version)
d.once.Store(false)
cleanMDLInfo(d.sessPool, job.ID, d.etcdCli)
// Don't have a worker now.
Expand Down

0 comments on commit d0426c0

Please sign in to comment.