ddl: re-structure job scheduler and ddl executor, part 1 #54967

Merged: 1 commit, merged on Jul 29, 2024
8 changes: 4 additions & 4 deletions pkg/ddl/BUILD.bazel
@@ -31,7 +31,6 @@ go_library(
"ddl_history.go",
"ddl_running_jobs.go",
"ddl_tiflash_api.go",
"ddl_worker.go",
"ddl_workerpool.go",
"delete_range.go",
"delete_range_util.go",
@@ -43,7 +42,8 @@ go_library(
"index.go",
"index_cop.go",
"index_merge_tmp.go",
"job_table.go",
"job_scheduler.go",
"job_worker.go",
"mock.go",
"multi_schema_change.go",
"options.go",
@@ -224,7 +224,6 @@ go_test(
"ddl_history_test.go",
"ddl_running_jobs_test.go",
"ddl_test.go",
"ddl_worker_test.go",
"ddl_workerpool_test.go",
"executor_test.go",
"export_test.go",
@@ -236,7 +235,8 @@ go_test(
"index_test.go",
"integration_test.go",
"job_scheduler_test.go",
"job_table_test.go",
"job_scheduler_testkit_test.go",
"job_worker_test.go",
"main_test.go",
"modify_column_test.go",
"multi_schema_change_test.go",
318 changes: 0 additions & 318 deletions pkg/ddl/ddl.go
@@ -62,7 +62,6 @@ import (
pumpcli "github.com/pingcap/tidb/pkg/tidb-binlog/pump_client"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/gcutil"
"github.com/pingcap/tidb/pkg/util/generic"
"github.com/tikv/client-go/v2/tikvrpc"
@@ -575,14 +574,6 @@ func (dc *ddlCtx) initJobDoneCh(jobID int64) {
dc.ddlJobDoneChMap.Store(jobID, make(chan struct{}, 1))
}

func (e *executor) getJobDoneCh(jobID int64) (chan struct{}, bool) {
return e.ddlJobDoneChMap.Load(jobID)
}

func (e *executor) delJobDoneCh(jobID int64) {
e.ddlJobDoneChMap.Delete(jobID)
}

func (dc *ddlCtx) notifyJobDone(jobID int64) {
if ch, ok := dc.ddlJobDoneChMap.Load(jobID); ok {
select {
@@ -951,35 +942,6 @@ func (d *ddl) GetLease() time.Duration {
return lease
}

func (e *executor) genGlobalIDs(count int) ([]int64, error) {
var ret []int64
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
// lock to reduce conflict
e.globalIDLock.Lock()
defer e.globalIDLock.Unlock()
err := kv.RunInNewTxn(ctx, e.store, true, func(_ context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
var err error
ret, err = m.GenGlobalIDs(count)
return err
})

return ret, err
}

func (e *executor) genPlacementPolicyID() (int64, error) {
var ret int64
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
err := kv.RunInNewTxn(ctx, e.store, true, func(_ context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
var err error
ret, err = m.GenPlacementPolicyID()
return err
})

return ret, err
}

// SchemaSyncer implements DDL.SchemaSyncer interface.
func (d *ddl) SchemaSyncer() syncer.SchemaSyncer {
return d.schemaSyncer
@@ -1000,286 +962,6 @@ func (d *ddl) GetID() string {
return d.uuid
}

var (
fastDDLIntervalPolicy = []time.Duration{
500 * time.Millisecond,
}
normalDDLIntervalPolicy = []time.Duration{
500 * time.Millisecond,
500 * time.Millisecond,
1 * time.Second,
}
slowDDLIntervalPolicy = []time.Duration{
500 * time.Millisecond,
500 * time.Millisecond,
1 * time.Second,
1 * time.Second,
3 * time.Second,
}
)

func getIntervalFromPolicy(policy []time.Duration, i int) (time.Duration, bool) {
plen := len(policy)
if i < plen {
return policy[i], true
}
return policy[plen-1], false
}

func getJobCheckInterval(job *model.Job, i int) (time.Duration, bool) {
switch job.Type {
case model.ActionAddIndex, model.ActionAddPrimaryKey, model.ActionModifyColumn,
model.ActionReorganizePartition,
model.ActionRemovePartitioning,
model.ActionAlterTablePartitioning:
return getIntervalFromPolicy(slowDDLIntervalPolicy, i)
case model.ActionCreateTable, model.ActionCreateSchema:
return getIntervalFromPolicy(fastDDLIntervalPolicy, i)
default:
return getIntervalFromPolicy(normalDDLIntervalPolicy, i)
}
}
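
Editor's note: the interval policies above escalate the check interval over the first few retries and then stay at the last (largest) entry. The following standalone sketch (not part of this PR; the name pickInterval is illustrative only) shows the same escalation pattern in isolation:

```go
package main

import (
	"fmt"
	"time"
)

// pickInterval mirrors getIntervalFromPolicy: it returns policy[i] while i is
// in range, and otherwise sticks to the last (largest) interval.
func pickInterval(policy []time.Duration, i int) (time.Duration, bool) {
	if i < len(policy) {
		return policy[i], true
	}
	return policy[len(policy)-1], false
}

func main() {
	slowPolicy := []time.Duration{
		500 * time.Millisecond,
		500 * time.Millisecond,
		1 * time.Second,
		1 * time.Second,
		3 * time.Second,
	}
	// The first few checks happen quickly; later checks settle at 3s.
	for i := 0; i < 7; i++ {
		interval, changed := pickInterval(slowPolicy, i)
		fmt.Printf("check %d: interval=%v changed=%v\n", i, interval, changed)
	}
}
```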

func (e *executor) notifyNewJobSubmitted(ch chan struct{}, etcdPath string, jobID int64, jobType string) {
// If the DDL workers are not running, there is no need to notify them.
// TODO: this does not affect notifying the backfill worker.
if !config.GetGlobalConfig().Instance.TiDBEnableDDL.Load() {
return
}
if e.ownerManager.IsOwner() {
asyncNotify(ch)
} else {
e.notifyNewJobByEtcd(etcdPath, jobID, jobType)
}
}

func updateTickerInterval(ticker *time.Ticker, lease time.Duration, job *model.Job, i int) *time.Ticker {
interval, changed := getJobCheckInterval(job, i)
if !changed {
return ticker
}
// For now, we stop the old ticker and create a new one.
ticker.Stop()
return time.NewTicker(chooseLeaseTime(lease, interval))
}

func recordLastDDLInfo(ctx sessionctx.Context, job *model.Job) {
if job == nil {
return
}
ctx.GetSessionVars().LastDDLInfo.Query = job.Query
ctx.GetSessionVars().LastDDLInfo.SeqNum = job.SeqNum
}

func setDDLJobQuery(ctx sessionctx.Context, job *model.Job) {
switch job.Type {
case model.ActionUpdateTiFlashReplicaStatus, model.ActionUnlockTable:
job.Query = ""
default:
job.Query, _ = ctx.Value(sessionctx.QueryString).(string)
}
}

func setDDLJobMode(job *model.Job) {
if !variable.EnableFastCreateTable.Load() {
job.LocalMode = false
return
}

switch job.Type {
// currently, v2 only supports CreateTable without foreign keys.
case model.ActionCreateTable:
tbInfo, ok := job.Args[0].(*model.TableInfo)
if ok && len(tbInfo.ForeignKeys) == 0 {
job.LocalMode = true
return
}
case model.ActionCreateSchema:
job.LocalMode = true
return
default:
}
job.LocalMode = false
}

func (e *executor) deliverJobTask(task *JobWrapper) {
if task.LocalMode {
e.limitJobChV2 <- task
} else {
e.limitJobCh <- task
}
}

// DoDDLJob will return
// - nil: the job is found in the history DDL jobs and has no error
// - context.Canceled: the job has been sent to the worker, but was not found in the history DDL jobs before cancellation
// - other: the job is found in the history DDL jobs and that job's error is returned
func (e *executor) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
return e.DoDDLJobWrapper(ctx, NewJobWrapper(job, false))
}

func (e *executor) DoDDLJobWrapper(ctx sessionctx.Context, jobW *JobWrapper) error {
job := jobW.Job
job.TraceInfo = &model.TraceInfo{
ConnectionID: ctx.GetSessionVars().ConnectionID,
SessionAlias: ctx.GetSessionVars().SessionAlias,
}
if mci := ctx.GetSessionVars().StmtCtx.MultiSchemaInfo; mci != nil {
// In a multi-schema change, we don't run the job directly.
// Instead, we merge all the jobs into one pending job.
return appendToSubJobs(mci, job)
}
// Get a global job ID and put the DDL job in the queue.
setDDLJobQuery(ctx, job)
setDDLJobMode(job)
e.deliverJobTask(jobW)

failpoint.Inject("mockParallelSameDDLJobTwice", func(val failpoint.Value) {
if val.(bool) {
<-jobW.ErrChs[0]
// The same job will be put to the DDL queue twice.
job = job.Clone()
newJobW := NewJobWrapper(job, jobW.IDAllocated)
e.deliverJobTask(newJobW)
// The second job result is used for test.
jobW = newJobW
}
})

// the worker should restart to continue handling tasks in limitJobCh, and the result is sent back through jobW.ErrChs
err := <-jobW.ErrChs[0]
// job.ID must be allocated after previous channel receive returns nil.
defer e.delJobDoneCh(job.ID)
if err != nil {
// The transaction that enqueues the job has failed.
return errors.Trace(err)
}
failpoint.InjectCall("waitJobSubmitted")

sessVars := ctx.GetSessionVars()
sessVars.StmtCtx.IsDDLJobInQueue = true

// Notify the worker that we have pushed a new job, and wait for the job to be done.
e.notifyNewJobSubmitted(e.ddlJobNotifyCh, addingDDLJobNotifyKey, job.ID, job.Type.String())
logutil.DDLLogger().Info("start DDL job", zap.Stringer("job", job), zap.String("query", job.Query))

// For a local-mode job, we add the history job directly now, so there is no need to check it.
// Fast-create doesn't wait for the schema version to be synced, so we must reload the info-schema
// here to make sure later statements can see the created table/database.
if job.LocalMode {
return e.schemaLoader.Reload()
}

var historyJob *model.Job
jobID := job.ID

// Attach the context of the jobId to the calling session so that
// KILL can cancel this DDL job.
ctx.GetSessionVars().StmtCtx.DDLJobID = jobID

// From start to end, a job's state goes none -> delete only -> write only -> reorganization -> public.
// For every state change we wait about 2 * lease time, so here the ticker check interval is 10 * lease.
// But we use etcd to speed this up; normally it takes less than 0.5s now, so we use 0.5s, 1s, or 3s as the max value.
initInterval, _ := getJobCheckInterval(job, 0)
ticker := time.NewTicker(chooseLeaseTime(10*e.lease, initInterval))
startTime := time.Now()
metrics.JobsGauge.WithLabelValues(job.Type.String()).Inc()
defer func() {
ticker.Stop()
metrics.JobsGauge.WithLabelValues(job.Type.String()).Dec()
metrics.HandleJobHistogram.WithLabelValues(job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
recordLastDDLInfo(ctx, historyJob)
}()
i := 0
notifyCh, _ := e.getJobDoneCh(job.ID)
for {
failpoint.InjectCall("storeCloseInLoop")
select {
case <-notifyCh:
case <-ticker.C:
i++
ticker = updateTickerInterval(ticker, 10*e.lease, job, i)
case <-e.ctx.Done():
logutil.DDLLogger().Info("DoDDLJob will quit because context done")
return context.Canceled
}

// If the connection is being killed, we need to CANCEL the DDL job.
if sessVars.SQLKiller.HandleSignal() == exeerrors.ErrQueryInterrupted {
if atomic.LoadInt32(&sessVars.ConnectionStatus) == variable.ConnStatusShutdown {
logutil.DDLLogger().Info("DoDDLJob will quit because context done")
return context.Canceled
}
if sessVars.StmtCtx.DDLJobID != 0 {
se, err := e.sessPool.Get()
if err != nil {
logutil.DDLLogger().Error("get session failed, check again", zap.Error(err))
continue
}
sessVars.StmtCtx.DDLJobID = 0 // Avoid repeat.
errs, err := CancelJobsBySystem(se, []int64{jobID})
e.sessPool.Put(se)
if len(errs) > 0 {
logutil.DDLLogger().Warn("error canceling DDL job", zap.Error(errs[0]))
}
if err != nil {
logutil.DDLLogger().Warn("Kill command could not cancel DDL job", zap.Error(err))
continue
}
}
}

se, err := e.sessPool.Get()
if err != nil {
logutil.DDLLogger().Error("get session failed, check again", zap.Error(err))
continue
}
historyJob, err = GetHistoryJobByID(se, jobID)
e.sessPool.Put(se)
if err != nil {
logutil.DDLLogger().Error("get history DDL job failed, check again", zap.Error(err))
continue
}
if historyJob == nil {
logutil.DDLLogger().Debug("DDL job is not in history, maybe not run", zap.Int64("jobID", jobID))
continue
}

e.checkHistoryJobInTest(ctx, historyJob)

// If a job is a history job, the state must be JobStateSynced or JobStateRollbackDone or JobStateCancelled.
if historyJob.IsSynced() {
// Check whether there are warnings from executing the DDL under a certain SQL mode.
if historyJob.ReorgMeta != nil && len(historyJob.ReorgMeta.Warnings) != 0 {
if len(historyJob.ReorgMeta.Warnings) != len(historyJob.ReorgMeta.WarningsCount) {
logutil.DDLLogger().Info("DDL warnings doesn't match the warnings count", zap.Int64("jobID", jobID))
} else {
for key, warning := range historyJob.ReorgMeta.Warnings {
keyCount := historyJob.ReorgMeta.WarningsCount[key]
if keyCount == 1 {
ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
} else {
newMsg := fmt.Sprintf("%d warnings with this error code, first warning: "+warning.GetMsg(), keyCount)
newWarning := dbterror.ClassTypes.Synthesize(terror.ErrCode(warning.Code()), newMsg)
ctx.GetSessionVars().StmtCtx.AppendWarning(newWarning)
}
}
}
}
appendMultiChangeWarningsToOwnerCtx(ctx, historyJob)

logutil.DDLLogger().Info("DDL job is finished", zap.Int64("jobID", jobID))
return nil
}

if historyJob.Error != nil {
logutil.DDLLogger().Info("DDL job is failed", zap.Int64("jobID", jobID))
return errors.Trace(historyJob.Error)
}
panic("When the state is JobStateRollbackDone or JobStateCancelled, historyJob.Error should never be nil")
}
}
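
Editor's note: the loop above combines a per-job done channel, an escalating ticker, and a poll of the history table. The following compact, standalone sketch (illustrative only, not part of this PR; waitDone, lookupResult, and the channel wiring are assumptions) shows the same notify-or-poll waiting pattern:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitDone waits for a job to finish: it wakes up either when the scheduler
// signals the done channel or when the ticker fires, then polls for a result.
func waitDone(ctx context.Context, done <-chan struct{}, lookupResult func() (finished bool, err error)) error {
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-done: // fast path: the owner notified us directly
		case <-ticker.C: // slow path: periodic re-check
		case <-ctx.Done():
			return ctx.Err()
		}
		finished, err := lookupResult()
		if err != nil {
			continue // transient lookup failure: check again on the next wake-up
		}
		if finished {
			return nil
		}
	}
}

func main() {
	done := make(chan struct{}, 1)
	finishedAt := time.Now().Add(time.Second)
	// Simulate the scheduler finishing the job after 1s and notifying the waiter.
	go func() {
		time.Sleep(time.Second)
		done <- struct{}{}
	}()
	err := waitDone(context.Background(), done, func() (bool, error) {
		return time.Now().After(finishedAt), nil
	})
	fmt.Println("job done, err =", err)
}
```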

// SetBinlogClient implements DDL.SetBinlogClient interface.
func (d *ddl) SetBinlogClient(binlogCli *pumpcli.PumpsClient) {
d.binlogCli = binlogCli