Skip to content

Commit

Permalink
ddl: re-structure job scheduler and ddl executor, part 1 (pingcap#54967)
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter authored and hawkingrei committed Aug 1, 2024
1 parent 960b0c8 commit f853157
Show file tree
Hide file tree
Showing 9 changed files with 1,035 additions and 1,044 deletions.
8 changes: 4 additions & 4 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ go_library(
"ddl_history.go",
"ddl_running_jobs.go",
"ddl_tiflash_api.go",
"ddl_worker.go",
"ddl_workerpool.go",
"delete_range.go",
"delete_range_util.go",
Expand All @@ -43,7 +42,8 @@ go_library(
"index.go",
"index_cop.go",
"index_merge_tmp.go",
"job_table.go",
"job_scheduler.go",
"job_worker.go",
"mock.go",
"multi_schema_change.go",
"options.go",
Expand Down Expand Up @@ -224,7 +224,6 @@ go_test(
"ddl_history_test.go",
"ddl_running_jobs_test.go",
"ddl_test.go",
"ddl_worker_test.go",
"ddl_workerpool_test.go",
"executor_test.go",
"export_test.go",
Expand All @@ -236,7 +235,8 @@ go_test(
"index_test.go",
"integration_test.go",
"job_scheduler_test.go",
"job_table_test.go",
"job_scheduler_testkit_test.go",
"job_worker_test.go",
"main_test.go",
"modify_column_test.go",
"multi_schema_change_test.go",
Expand Down
318 changes: 0 additions & 318 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ import (
pumpcli "github.com/pingcap/tidb/pkg/tidb-binlog/pump_client"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/gcutil"
"github.com/pingcap/tidb/pkg/util/generic"
"github.com/tikv/client-go/v2/tikvrpc"
Expand Down Expand Up @@ -575,14 +574,6 @@ func (dc *ddlCtx) initJobDoneCh(jobID int64) {
dc.ddlJobDoneChMap.Store(jobID, make(chan struct{}, 1))
}

func (e *executor) getJobDoneCh(jobID int64) (chan struct{}, bool) {
return e.ddlJobDoneChMap.Load(jobID)
}

func (e *executor) delJobDoneCh(jobID int64) {
e.ddlJobDoneChMap.Delete(jobID)
}

func (dc *ddlCtx) notifyJobDone(jobID int64) {
if ch, ok := dc.ddlJobDoneChMap.Load(jobID); ok {
select {
Expand Down Expand Up @@ -951,35 +942,6 @@ func (d *ddl) GetLease() time.Duration {
return lease
}

func (e *executor) genGlobalIDs(count int) ([]int64, error) {
var ret []int64
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
// lock to reduce conflict
e.globalIDLock.Lock()
defer e.globalIDLock.Unlock()
err := kv.RunInNewTxn(ctx, e.store, true, func(_ context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
var err error
ret, err = m.GenGlobalIDs(count)
return err
})

return ret, err
}

func (e *executor) genPlacementPolicyID() (int64, error) {
var ret int64
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
err := kv.RunInNewTxn(ctx, e.store, true, func(_ context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
var err error
ret, err = m.GenPlacementPolicyID()
return err
})

return ret, err
}

// SchemaSyncer implements DDL.SchemaSyncer interface.
func (d *ddl) SchemaSyncer() syncer.SchemaSyncer {
return d.schemaSyncer
Expand All @@ -1000,286 +962,6 @@ func (d *ddl) GetID() string {
return d.uuid
}

var (
fastDDLIntervalPolicy = []time.Duration{
500 * time.Millisecond,
}
normalDDLIntervalPolicy = []time.Duration{
500 * time.Millisecond,
500 * time.Millisecond,
1 * time.Second,
}
slowDDLIntervalPolicy = []time.Duration{
500 * time.Millisecond,
500 * time.Millisecond,
1 * time.Second,
1 * time.Second,
3 * time.Second,
}
)

func getIntervalFromPolicy(policy []time.Duration, i int) (time.Duration, bool) {
plen := len(policy)
if i < plen {
return policy[i], true
}
return policy[plen-1], false
}

func getJobCheckInterval(job *model.Job, i int) (time.Duration, bool) {
switch job.Type {
case model.ActionAddIndex, model.ActionAddPrimaryKey, model.ActionModifyColumn,
model.ActionReorganizePartition,
model.ActionRemovePartitioning,
model.ActionAlterTablePartitioning:
return getIntervalFromPolicy(slowDDLIntervalPolicy, i)
case model.ActionCreateTable, model.ActionCreateSchema:
return getIntervalFromPolicy(fastDDLIntervalPolicy, i)
default:
return getIntervalFromPolicy(normalDDLIntervalPolicy, i)
}
}

func (e *executor) notifyNewJobSubmitted(ch chan struct{}, etcdPath string, jobID int64, jobType string) {
// If the workers don't run, we needn't notify workers.
// TODO: It does not affect informing the backfill worker.
if !config.GetGlobalConfig().Instance.TiDBEnableDDL.Load() {
return
}
if e.ownerManager.IsOwner() {
asyncNotify(ch)
} else {
e.notifyNewJobByEtcd(etcdPath, jobID, jobType)
}
}

func updateTickerInterval(ticker *time.Ticker, lease time.Duration, job *model.Job, i int) *time.Ticker {
interval, changed := getJobCheckInterval(job, i)
if !changed {
return ticker
}
// For now we should stop old ticker and create a new ticker
ticker.Stop()
return time.NewTicker(chooseLeaseTime(lease, interval))
}

func recordLastDDLInfo(ctx sessionctx.Context, job *model.Job) {
if job == nil {
return
}
ctx.GetSessionVars().LastDDLInfo.Query = job.Query
ctx.GetSessionVars().LastDDLInfo.SeqNum = job.SeqNum
}

func setDDLJobQuery(ctx sessionctx.Context, job *model.Job) {
switch job.Type {
case model.ActionUpdateTiFlashReplicaStatus, model.ActionUnlockTable:
job.Query = ""
default:
job.Query, _ = ctx.Value(sessionctx.QueryString).(string)
}
}

func setDDLJobMode(job *model.Job) {
if !variable.EnableFastCreateTable.Load() {
job.LocalMode = false
return
}

switch job.Type {
// currently, v2 only support CreateTable without foreign keys.
case model.ActionCreateTable:
tbInfo, ok := job.Args[0].(*model.TableInfo)
if ok && len(tbInfo.ForeignKeys) == 0 {
job.LocalMode = true
return
}
case model.ActionCreateSchema:
job.LocalMode = true
return
default:
}
job.LocalMode = false
}

func (e *executor) deliverJobTask(task *JobWrapper) {
if task.LocalMode {
e.limitJobChV2 <- task
} else {
e.limitJobCh <- task
}
}

// DoDDLJob will return
// - nil: found in history DDL job and no job error
// - context.Cancel: job has been sent to worker, but not found in history DDL job before cancel
// - other: found in history DDL job and return that job error
func (e *executor) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
return e.DoDDLJobWrapper(ctx, NewJobWrapper(job, false))
}

func (e *executor) DoDDLJobWrapper(ctx sessionctx.Context, jobW *JobWrapper) error {
job := jobW.Job
job.TraceInfo = &model.TraceInfo{
ConnectionID: ctx.GetSessionVars().ConnectionID,
SessionAlias: ctx.GetSessionVars().SessionAlias,
}
if mci := ctx.GetSessionVars().StmtCtx.MultiSchemaInfo; mci != nil {
// In multiple schema change, we don't run the job.
// Instead, we merge all the jobs into one pending job.
return appendToSubJobs(mci, job)
}
// Get a global job ID and put the DDL job in the queue.
setDDLJobQuery(ctx, job)
setDDLJobMode(job)
e.deliverJobTask(jobW)

failpoint.Inject("mockParallelSameDDLJobTwice", func(val failpoint.Value) {
if val.(bool) {
<-jobW.ErrChs[0]
// The same job will be put to the DDL queue twice.
job = job.Clone()
newJobW := NewJobWrapper(job, jobW.IDAllocated)
e.deliverJobTask(newJobW)
// The second job result is used for test.
jobW = newJobW
}
})

// worker should restart to continue handling tasks in limitJobCh, and send back through jobW.err
err := <-jobW.ErrChs[0]
// job.ID must be allocated after previous channel receive returns nil.
defer e.delJobDoneCh(job.ID)
if err != nil {
// The transaction of enqueuing job is failed.
return errors.Trace(err)
}
failpoint.InjectCall("waitJobSubmitted")

sessVars := ctx.GetSessionVars()
sessVars.StmtCtx.IsDDLJobInQueue = true

// Notice worker that we push a new job and wait the job done.
e.notifyNewJobSubmitted(e.ddlJobNotifyCh, addingDDLJobNotifyKey, job.ID, job.Type.String())
logutil.DDLLogger().Info("start DDL job", zap.Stringer("job", job), zap.String("query", job.Query))

// for local mode job, we add the history job directly now, so no need to check it.
// fast-create doesn't wait schema version synced, we must reload info-schema
// here to make sure later statements can see the created table/database.
if job.LocalMode {
return e.schemaLoader.Reload()
}

var historyJob *model.Job
jobID := job.ID

// Attach the context of the jobId to the calling session so that
// KILL can cancel this DDL job.
ctx.GetSessionVars().StmtCtx.DDLJobID = jobID

// For a job from start to end, the state of it will be none -> delete only -> write only -> reorganization -> public
// For every state changes, we will wait as lease 2 * lease time, so here the ticker check is 10 * lease.
// But we use etcd to speed up, normally it takes less than 0.5s now, so we use 0.5s or 1s or 3s as the max value.
initInterval, _ := getJobCheckInterval(job, 0)
ticker := time.NewTicker(chooseLeaseTime(10*e.lease, initInterval))
startTime := time.Now()
metrics.JobsGauge.WithLabelValues(job.Type.String()).Inc()
defer func() {
ticker.Stop()
metrics.JobsGauge.WithLabelValues(job.Type.String()).Dec()
metrics.HandleJobHistogram.WithLabelValues(job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
recordLastDDLInfo(ctx, historyJob)
}()
i := 0
notifyCh, _ := e.getJobDoneCh(job.ID)
for {
failpoint.InjectCall("storeCloseInLoop")
select {
case <-notifyCh:
case <-ticker.C:
i++
ticker = updateTickerInterval(ticker, 10*e.lease, job, i)
case <-e.ctx.Done():
logutil.DDLLogger().Info("DoDDLJob will quit because context done")
return context.Canceled
}

// If the connection being killed, we need to CANCEL the DDL job.
if sessVars.SQLKiller.HandleSignal() == exeerrors.ErrQueryInterrupted {
if atomic.LoadInt32(&sessVars.ConnectionStatus) == variable.ConnStatusShutdown {
logutil.DDLLogger().Info("DoDDLJob will quit because context done")
return context.Canceled
}
if sessVars.StmtCtx.DDLJobID != 0 {
se, err := e.sessPool.Get()
if err != nil {
logutil.DDLLogger().Error("get session failed, check again", zap.Error(err))
continue
}
sessVars.StmtCtx.DDLJobID = 0 // Avoid repeat.
errs, err := CancelJobsBySystem(se, []int64{jobID})
e.sessPool.Put(se)
if len(errs) > 0 {
logutil.DDLLogger().Warn("error canceling DDL job", zap.Error(errs[0]))
}
if err != nil {
logutil.DDLLogger().Warn("Kill command could not cancel DDL job", zap.Error(err))
continue
}
}
}

se, err := e.sessPool.Get()
if err != nil {
logutil.DDLLogger().Error("get session failed, check again", zap.Error(err))
continue
}
historyJob, err = GetHistoryJobByID(se, jobID)
e.sessPool.Put(se)
if err != nil {
logutil.DDLLogger().Error("get history DDL job failed, check again", zap.Error(err))
continue
}
if historyJob == nil {
logutil.DDLLogger().Debug("DDL job is not in history, maybe not run", zap.Int64("jobID", jobID))
continue
}

e.checkHistoryJobInTest(ctx, historyJob)

// If a job is a history job, the state must be JobStateSynced or JobStateRollbackDone or JobStateCancelled.
if historyJob.IsSynced() {
// Judge whether there are some warnings when executing DDL under the certain SQL mode.
if historyJob.ReorgMeta != nil && len(historyJob.ReorgMeta.Warnings) != 0 {
if len(historyJob.ReorgMeta.Warnings) != len(historyJob.ReorgMeta.WarningsCount) {
logutil.DDLLogger().Info("DDL warnings doesn't match the warnings count", zap.Int64("jobID", jobID))
} else {
for key, warning := range historyJob.ReorgMeta.Warnings {
keyCount := historyJob.ReorgMeta.WarningsCount[key]
if keyCount == 1 {
ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
} else {
newMsg := fmt.Sprintf("%d warnings with this error code, first warning: "+warning.GetMsg(), keyCount)
newWarning := dbterror.ClassTypes.Synthesize(terror.ErrCode(warning.Code()), newMsg)
ctx.GetSessionVars().StmtCtx.AppendWarning(newWarning)
}
}
}
}
appendMultiChangeWarningsToOwnerCtx(ctx, historyJob)

logutil.DDLLogger().Info("DDL job is finished", zap.Int64("jobID", jobID))
return nil
}

if historyJob.Error != nil {
logutil.DDLLogger().Info("DDL job is failed", zap.Int64("jobID", jobID))
return errors.Trace(historyJob.Error)
}
panic("When the state is JobStateRollbackDone or JobStateCancelled, historyJob.Error should never be nil")
}
}

// SetBinlogClient implements DDL.SetBinlogClient interface.
func (d *ddl) SetBinlogClient(binlogCli *pumpcli.PumpsClient) {
d.binlogCli = binlogCli
Expand Down
Loading

0 comments on commit f853157

Please sign in to comment.