diff --git a/pkg/autoid_service/autoid.go b/pkg/autoid_service/autoid.go index 7dd4de7eb3688..27a3af6811149 100644 --- a/pkg/autoid_service/autoid.go +++ b/pkg/autoid_service/autoid.go @@ -322,17 +322,9 @@ func newWithCli(selfAddr string, cli *clientv3.Client, store kv.Storage) *Servic leaderShip: l, store: store, } - l.SetBeOwnerHook(func() { - // Reset the map to avoid a case that a node lose leadership and regain it, then - // improperly use the stale map to serve the autoid requests. - // See https://github.com/pingcap/tidb/issues/52600 - service.autoIDLock.Lock() - clear(service.autoIDMap) - service.autoIDLock.Unlock() - - logutil.BgLogger().Info("leader change of autoid service, this node become owner", - zap.String("addr", selfAddr), - zap.String("category", "autoid service")) + l.SetListener(&ownerListener{ + Service: service, + selfAddr: selfAddr, }) // 10 means that autoid service's etcd lease is 10s. err := l.CampaignOwner(10) @@ -580,6 +572,29 @@ func (s *Service) Rebase(ctx context.Context, req *autoid.RebaseRequest) (*autoi return &autoid.RebaseResponse{}, nil } +type ownerListener struct { + *Service + selfAddr string +} + +var _ owner.Listener = (*ownerListener)(nil) + +func (l *ownerListener) OnBecomeOwner() { + // Reset the map to avoid a case that a node lose leadership and regain it, then + // improperly use the stale map to serve the autoid requests. + // See https://github.com/pingcap/tidb/issues/52600 + l.autoIDLock.Lock() + clear(l.autoIDMap) + l.autoIDLock.Unlock() + + logutil.BgLogger().Info("leader change of autoid service, this node become owner", + zap.String("addr", l.selfAddr), + zap.String("category", "autoid service")) +} + +func (*ownerListener) OnRetireOwner() { +} + func init() { autoid1.MockForTest = MockForTest } diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index a7980045e19ad..749dc07acdbd1 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -679,10 +679,20 @@ func SetBackfillTaskChanSizeForTest(n int) { // // The above operations are completed in a transaction. // Finally, update the concurrent processing of the total number of rows, and store the completed handle value. 
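The autoid-service hunk above (and the matching DDL hunk later in this patch) replaces the one-shot `SetBeOwnerHook` callback with an `owner.Listener` that is notified on both sides of an ownership transition: `OnBecomeOwner` and `OnRetireOwner`. Below is a minimal sketch of that pattern. The interface shape and the "clear cached state on becoming owner" idea come from the diff (see issue #52600); the toy `manager` election driver and `autoIDAllocator` type are hypothetical stand-ins, not the real `owner.Manager` or autoid service.

```go
package main

import (
	"fmt"
	"sync"
)

// Listener mirrors the two-sided hook introduced in the diff: one callback
// when this node wins the election, one when it loses or resigns ownership.
type Listener interface {
	OnBecomeOwner()
	OnRetireOwner()
}

// autoIDAllocator is a toy stand-in for the autoid service: it caches state
// that must be discarded when leadership is regained, so a node that lost and
// re-won ownership never serves requests from a stale map.
type autoIDAllocator struct {
	mu    sync.Mutex
	cache map[int64]int64
}

func (a *autoIDAllocator) OnBecomeOwner() {
	a.mu.Lock()
	clear(a.cache) // reset cached ranges on every ownership gain
	a.mu.Unlock()
	fmt.Println("became owner, cache cleared")
}

func (a *autoIDAllocator) OnRetireOwner() {
	fmt.Println("retired from owner")
}

// manager is a hypothetical election loop that drives the listener; the real
// owner.Manager is backed by etcd elections.
type manager struct {
	listener Listener
}

func (m *manager) SetListener(l Listener) { m.listener = l }

func (m *manager) simulateElection(won bool) {
	if won {
		m.listener.OnBecomeOwner()
	} else {
		m.listener.OnRetireOwner()
	}
}

func main() {
	m := &manager{}
	m.SetListener(&autoIDAllocator{cache: map[int64]int64{}})
	m.simulateElection(true)
	m.simulateElection(false)
}
```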
+<<<<<<< HEAD func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sess.Pool, t table.PhysicalTable, bfWorkerType backfillerType, reorgInfo *reorgInfo) error { job := reorgInfo.Job totalAddedCount := job.GetRowCount() +======= +func (dc *ddlCtx) writePhysicalTableRecord( + ctx context.Context, + sessPool *sess.Pool, + t table.PhysicalTable, + bfWorkerType backfillerType, + reorgInfo *reorgInfo, +) error { +>>>>>>> 04c66ee9508 (ddl: decouple job scheduler from 'ddl' and make it run/exit as owner changes (#53548)) startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey if err := dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil { @@ -697,8 +707,15 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sess.Pool, t table.Physical }) jc := reorgInfo.NewJobContext() +<<<<<<< HEAD sessCtx := newContext(reorgInfo.d.store) scheduler, err := newBackfillScheduler(dc.ctx, reorgInfo, sessPool, bfWorkerType, t, sessCtx, jc) +======= + + eg, egCtx := util.NewErrorGroupWithRecoverWithCtx(ctx) + + scheduler, err := newBackfillScheduler(egCtx, reorgInfo, sessPool, bfWorkerType, t, jc) +>>>>>>> 04c66ee9508 (ddl: decouple job scheduler from 'ddl' and make it run/exit as owner changes (#53548)) if err != nil { return errors.Trace(err) } diff --git a/pkg/ddl/backfilling_dist_executor.go b/pkg/ddl/backfilling_dist_executor.go index c6588333e3b79..c245e9c018bf8 100644 --- a/pkg/ddl/backfilling_dist_executor.go +++ b/pkg/ddl/backfilling_dist_executor.go @@ -92,6 +92,8 @@ func (s *backfillDistExecutor) newBackfillSubtaskExecutor( jobMeta := &s.taskMeta.Job ddlObj := s.d + // TODO getTableByTxn is using DDL ctx which is never cancelled except when shutdown. + // we should move this operation out of GetStepExecutor, and put into Init. _, tblIface, err := ddlObj.getTableByTxn((*asAutoIDRequirement)(ddlObj.ddlCtx), jobMeta.SchemaID, jobMeta.TableID) if err != nil { return nil, err diff --git a/pkg/ddl/backfilling_dist_scheduler.go b/pkg/ddl/backfilling_dist_scheduler.go index 0f59055940b45..afaab2c30bf1d 100644 --- a/pkg/ddl/backfilling_dist_scheduler.go +++ b/pkg/ddl/backfilling_dist_scheduler.go @@ -41,7 +41,6 @@ import ( "github.com/pingcap/tidb/pkg/store/helper" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/util/backoff" - tidblogutil "github.com/pingcap/tidb/pkg/util/logutil" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" "go.uber.org/zap" @@ -89,7 +88,7 @@ func (sch *BackfillingSchedulerExt) OnNextSubtasksBatch( return nil, err } job := &backfillMeta.Job - tblInfo, err := getTblInfo(sch.d, job) + tblInfo, err := getTblInfo(ctx, sch.d, job) if err != nil { return nil, err } @@ -101,7 +100,7 @@ func (sch *BackfillingSchedulerExt) OnNextSubtasksBatch( if tblInfo.Partition != nil { return generatePartitionPlan(tblInfo) } - return generateNonPartitionPlan(sch.d, tblInfo, job, sch.GlobalSort, len(execIDs)) + return generateNonPartitionPlan(ctx, sch.d, tblInfo, job, sch.GlobalSort, len(execIDs)) case proto.BackfillStepMergeSort: return generateMergePlan(taskHandle, task, logger) case proto.BackfillStepWriteAndIngest: @@ -201,8 +200,8 @@ func (sch *LitBackfillScheduler) Close() { sch.BaseScheduler.Close() } -func getTblInfo(d *ddl, job *model.Job) (tblInfo *model.TableInfo, err error) { - err = kv.RunInNewTxn(d.ctx, d.store, true, func(_ context.Context, txn kv.Transaction) error { +func getTblInfo(ctx context.Context, d *ddl, job *model.Job) (tblInfo *model.TableInfo, err error) { + err = kv.RunInNewTxn(ctx, d.store, true, func(_ context.Context, 
txn kv.Transaction) error { tblInfo, err = meta.NewMeta(txn).GetTable(job.SchemaID, job.TableID) return err }) @@ -242,11 +241,13 @@ const ( ) func generateNonPartitionPlan( + ctx context.Context, d *ddl, tblInfo *model.TableInfo, job *model.Job, useCloud bool, - instanceCnt int) (metas [][]byte, err error) { + instanceCnt int, +) (metas [][]byte, err error) { tbl, err := getTable((*asAutoIDRequirement)(d.ddlCtx), job.SchemaID, tblInfo) if err != nil { return nil, err @@ -267,7 +268,7 @@ func generateNonPartitionPlan( subTaskMetas := make([][]byte, 0, 4) backoffer := backoff.NewExponential(scanRegionBackoffBase, 2, scanRegionBackoffMax) - err = handle.RunWithRetry(d.ctx, 8, backoffer, tidblogutil.Logger(d.ctx), func(_ context.Context) (bool, error) { + err = handle.RunWithRetry(ctx, 8, backoffer, logutil.DDLLogger(), func(_ context.Context) (bool, error) { regionCache := d.store.(helper.Storage).GetRegionCache() recordRegionMetas, err := regionCache.LoadRegionsInKeyRange(tikv.NewBackofferWithVars(context.Background(), 20000, nil), startKey, endKey) if err != nil { diff --git a/pkg/ddl/backfilling_dist_scheduler_test.go b/pkg/ddl/backfilling_dist_scheduler_test.go index b73bf3da310c8..07c749cef7881 100644 --- a/pkg/ddl/backfilling_dist_scheduler_test.go +++ b/pkg/ddl/backfilling_dist_scheduler_test.go @@ -60,7 +60,8 @@ func TestBackfillingSchedulerLocalMode(t *testing.T) { task.Step = sch.GetNextStep(&task.TaskBase) require.Equal(t, proto.BackfillStepReadIndex, task.Step) execIDs := []string{":4000"} - metas, err := sch.OnNextSubtasksBatch(context.Background(), nil, task, execIDs, task.Step) + ctx := util.WithInternalSourceType(context.Background(), "backfill") + metas, err := sch.OnNextSubtasksBatch(ctx, nil, task, execIDs, task.Step) require.NoError(t, err) require.Equal(t, len(tblInfo.Partition.Definitions), len(metas)) for i, par := range tblInfo.Partition.Definitions { @@ -73,7 +74,7 @@ func TestBackfillingSchedulerLocalMode(t *testing.T) { task.State = proto.TaskStateRunning task.Step = sch.GetNextStep(&task.TaskBase) require.Equal(t, proto.StepDone, task.Step) - metas, err = sch.OnNextSubtasksBatch(context.Background(), nil, task, execIDs, task.Step) + metas, err = sch.OnNextSubtasksBatch(ctx, nil, task, execIDs, task.Step) require.NoError(t, err) require.Len(t, metas, 0) @@ -85,7 +86,7 @@ func TestBackfillingSchedulerLocalMode(t *testing.T) { // 2.1 empty table tk.MustExec("create table t1(id int primary key, v int)") task = createAddIndexTask(t, dom, "test", "t1", proto.Backfill, false) - metas, err = sch.OnNextSubtasksBatch(context.Background(), nil, task, execIDs, task.Step) + metas, err = sch.OnNextSubtasksBatch(ctx, nil, task, execIDs, task.Step) require.NoError(t, err) require.Equal(t, 0, len(metas)) // 2.2 non empty table. 
@@ -97,7 +98,7 @@ func TestBackfillingSchedulerLocalMode(t *testing.T) { task = createAddIndexTask(t, dom, "test", "t2", proto.Backfill, false) // 2.2.1 stepInit task.Step = sch.GetNextStep(&task.TaskBase) - metas, err = sch.OnNextSubtasksBatch(context.Background(), nil, task, execIDs, task.Step) + metas, err = sch.OnNextSubtasksBatch(ctx, nil, task, execIDs, task.Step) require.NoError(t, err) require.Equal(t, 1, len(metas)) require.Equal(t, proto.BackfillStepReadIndex, task.Step) @@ -105,7 +106,7 @@ func TestBackfillingSchedulerLocalMode(t *testing.T) { task.State = proto.TaskStateRunning task.Step = sch.GetNextStep(&task.TaskBase) require.Equal(t, proto.StepDone, task.Step) - metas, err = sch.OnNextSubtasksBatch(context.Background(), nil, task, execIDs, task.Step) + metas, err = sch.OnNextSubtasksBatch(ctx, nil, task, execIDs, task.Step) require.NoError(t, err) require.Equal(t, 0, len(metas)) } diff --git a/pkg/ddl/cluster.go b/pkg/ddl/cluster.go index 5a6aabd5b65f0..863e8d3ebb526 100644 --- a/pkg/ddl/cluster.go +++ b/pkg/ddl/cluster.go @@ -220,8 +220,8 @@ func checkSystemSchemaID(t *meta.Meta, schemaID int64, flashbackTSString string) return nil } -func checkAndSetFlashbackClusterInfo(se sessionctx.Context, d *ddlCtx, t *meta.Meta, job *model.Job, flashbackTS uint64) (err error) { - if err = ValidateFlashbackTS(d.ctx, se, flashbackTS); err != nil { +func checkAndSetFlashbackClusterInfo(ctx context.Context, se sessionctx.Context, d *ddlCtx, t *meta.Meta, job *model.Job, flashbackTS uint64) (err error) { + if err = ValidateFlashbackTS(ctx, se, flashbackTS); err != nil { return err } @@ -231,13 +231,13 @@ func checkAndSetFlashbackClusterInfo(se sessionctx.Context, d *ddlCtx, t *meta.M if err = closePDSchedule(); err != nil { return err } - if err = setTiDBEnableAutoAnalyze(d.ctx, se, variable.Off); err != nil { + if err = setTiDBEnableAutoAnalyze(ctx, se, variable.Off); err != nil { return err } - if err = setTiDBSuperReadOnly(d.ctx, se, variable.On); err != nil { + if err = setTiDBSuperReadOnly(ctx, se, variable.On); err != nil { return err } - if err = setTiDBTTLJobEnable(d.ctx, se, variable.Off); err != nil { + if err = setTiDBTTLJobEnable(ctx, se, variable.Off); err != nil { return err } @@ -256,12 +256,12 @@ func checkAndSetFlashbackClusterInfo(se sessionctx.Context, d *ddlCtx, t *meta.M // Check if there is an upgrade during [flashbackTS, now) sql := fmt.Sprintf("select VARIABLE_VALUE from mysql.tidb as of timestamp '%s' where VARIABLE_NAME='tidb_server_version'", flashbackTSString) - rows, err := sess.NewSession(se).Execute(d.ctx, sql, "check_tidb_server_version") + rows, err := sess.NewSession(se).Execute(ctx, sql, "check_tidb_server_version") if err != nil || len(rows) == 0 { return errors.Errorf("Get history `tidb_server_version` failed, can't do flashback") } sql = fmt.Sprintf("select 1 from mysql.tidb where VARIABLE_NAME='tidb_server_version' and VARIABLE_VALUE=%s", rows[0].GetString(0)) - rows, err = sess.NewSession(se).Execute(d.ctx, sql, "check_tidb_server_version") + rows, err = sess.NewSession(se).Execute(ctx, sql, "check_tidb_server_version") if err != nil { return errors.Trace(err) } @@ -271,7 +271,7 @@ func checkAndSetFlashbackClusterInfo(se sessionctx.Context, d *ddlCtx, t *meta.M // Check is there a DDL task at flashbackTS. 
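The `cluster.go` hunks that follow apply one mechanical rule used throughout this patch: helpers stop reading the long-lived `d.ctx` stored on `ddlCtx` and instead accept a `context.Context` argument (the worker's `w.ctx` at the call sites), so flashback work is cancelled together with the component that started it. A hedged before/after sketch with hypothetical helper names; the real calls go through the TiKV client, not the placeholder below.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Before: the helper used a context owned by a long-lived struct, so it kept
// running even after the caller (e.g. a retiring DDL owner) went away.
type ddlCtx struct{ ctx context.Context }

func (d *ddlCtx) splitRegionsOld(keys []string) error {
	return splitOne(d.ctx, keys) // cancellation tied to process lifetime
}

// After: the context is passed in explicitly, so cancelling the worker's
// context also stops the helper.
func splitRegions(ctx context.Context, keys []string) error {
	return splitOne(ctx, keys)
}

// splitOne is a hypothetical stand-in for a store call such as SplitRegions.
func splitOne(ctx context.Context, keys []string) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(10 * time.Millisecond):
		fmt.Println("split", keys)
		return nil
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate the owner retiring mid-job
	if err := splitRegions(ctx, []string{"a", "b"}); errors.Is(err, context.Canceled) {
		fmt.Println("helper stopped promptly:", err)
	}
}
```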
sql = fmt.Sprintf("select count(*) from mysql.%s as of timestamp '%s'", JobTable, flashbackTSString) - rows, err = sess.NewSession(se).Execute(d.ctx, sql, "check_history_job") + rows, err = sess.NewSession(se).Execute(ctx, sql, "check_history_job") if err != nil || len(rows) == 0 { return errors.Errorf("Get history ddl jobs failed, can't do flashback") } @@ -609,12 +609,12 @@ func flashbackToVersion( ).RunOnRange(ctx, startKey, endKey) } -func splitRegionsByKeyRanges(d *ddlCtx, keyRanges []kv.KeyRange) { +func splitRegionsByKeyRanges(ctx context.Context, d *ddlCtx, keyRanges []kv.KeyRange) { if s, ok := d.store.(kv.SplittableStore); ok { for _, keys := range keyRanges { for { // tableID is useless when scatter == false - _, err := s.SplitRegions(d.ctx, [][]byte{keys.StartKey, keys.EndKey}, false, nil) + _, err := s.SplitRegions(ctx, [][]byte{keys.StartKey, keys.EndKey}, false, nil) if err == nil { break } @@ -696,12 +696,12 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve return ver, nil // Stage 2, check flashbackTS, close GC and PD schedule, get flashback key ranges. case model.StateDeleteOnly: - if err = checkAndSetFlashbackClusterInfo(sess, d, t, job, flashbackTS); err != nil { + if err = checkAndSetFlashbackClusterInfo(w.ctx, sess, d, t, job, flashbackTS); err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } // We should get startTS here to avoid lost startTS when TiDB crashed during send prepare flashback RPC. - startTS, err = d.store.GetOracle().GetTimestamp(d.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + startTS, err = d.store.GetOracle().GetTimestamp(w.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) @@ -722,10 +722,10 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve return updateSchemaVersion(d, t, job) } // Split region by keyRanges, make sure no unrelated key ranges be locked. - splitRegionsByKeyRanges(d, keyRanges) + splitRegionsByKeyRanges(w.ctx, d, keyRanges) totalRegions.Store(0) for _, r := range keyRanges { - if err = flashbackToVersion(d.ctx, d, + if err = flashbackToVersion(w.ctx, d, func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { stats, err := SendPrepareFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), flashbackTS, startTS, r) totalRegions.Add(uint64(stats.CompletedRegions)) @@ -738,7 +738,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve job.Args[totalLockedRegionsOffset] = totalRegions.Load() // We should get commitTS here to avoid lost commitTS when TiDB crashed during send flashback RPC. - commitTS, err = d.store.GetOracle().GetTimestamp(d.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + commitTS, err = d.store.GetOracle().GetTimestamp(w.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) if err != nil { return ver, errors.Trace(err) } @@ -756,7 +756,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve } for _, r := range keyRanges { - if err = flashbackToVersion(d.ctx, d, + if err = flashbackToVersion(w.ctx, d, func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { // Use same startTS as prepare phase to simulate 1PC txn. 
stats, err := SendFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), flashbackTS, startTS, commitTS, r) diff --git a/pkg/ddl/column.go b/pkg/ddl/column.go index a7022300c66c5..8b050de08e382 100644 --- a/pkg/ddl/column.go +++ b/pkg/ddl/column.go @@ -1093,7 +1093,7 @@ func (w *worker) updatePhysicalTableRow(t table.Table, reorgInfo *reorgInfo) err // https://github.com/pingcap/tidb/issues/38297 return dbterror.ErrCancelledDDLJob.GenWithStack("Modify Column on partitioned table / typeUpdateColumnWorker not yet supported.") } - err := w.writePhysicalTableRecord(w.sessPool, p, workType, reorgInfo) + err := w.writePhysicalTableRecord(w.ctx, w.sessPool, p, workType, reorgInfo) if err != nil { return err } @@ -1105,7 +1105,7 @@ func (w *worker) updatePhysicalTableRow(t table.Table, reorgInfo *reorgInfo) err return nil } if tbl, ok := t.(table.PhysicalTable); ok { - return w.writePhysicalTableRecord(w.sessPool, tbl, typeUpdateColumnWorker, reorgInfo) + return w.writePhysicalTableRecord(w.ctx, w.sessPool, tbl, typeUpdateColumnWorker, reorgInfo) } return dbterror.ErrCancelledDDLJob.GenWithStack("internal error for phys tbl id: %d tbl id: %d", reorgInfo.PhysicalTableID, t.Meta().ID) } diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index a34ef9aac5690..efa7c50b890d5 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -291,11 +291,9 @@ type ddl struct { delRangeMgr delRangeManager enableTiFlashPoll *atomicutil.Bool // used in the concurrency ddl. - reorgWorkerPool *workerPool - generalDDLWorkerPool *workerPool - localWorkerPool *workerPool - // get notification if any DDL coming. - ddlJobCh chan struct{} + localWorkerPool *workerPool + // get notification if any DDL job submitted or finished. + ddlJobNotifyCh chan struct{} // localJobCh is used to delivery job in local TiDB nodes. localJobCh chan *limitJobTask @@ -357,6 +355,12 @@ func (w *waitSchemaSyncedController) setAlreadyRunOnce(id int64) { w.onceMap[id] = struct{}{} } +func (w *waitSchemaSyncedController) clearOnceMap() { + w.mu.Lock() + defer w.mu.Unlock() + w.onceMap = make(map[int64]struct{}, jobOnceCapacity) +} + // ddlCtx is the context when we use worker to handle DDL jobs. type ddlCtx struct { ctx context.Context @@ -379,8 +383,6 @@ type ddlCtx struct { *waitSchemaSyncedController *schemaVersionManager - runningJobs *runningJobs - // reorgCtx is used for reorganization. reorgCtx reorgContexts // backfillCtx is used for backfill workers. @@ -402,6 +404,7 @@ type ddlCtx struct { interceptor Interceptor } + // TODO merge with *waitSchemaSyncedController into another new struct. 
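In the `ddl.go` hunk below, `ddlJobCh` is renamed to `ddlJobNotifyCh` and documented as a pure notification channel: submitters poke it, the owner-side scheduler drains it. The diff calls `asyncNotify` on this channel but does not show its body; the sketch below assumes the usual implementation of such a poke, a non-blocking send on a small buffered channel, so a busy scheduler never blocks DDL submission. The harness around it is illustrative only.

```go
package main

import (
	"fmt"
	"time"
)

// asyncNotify pokes the channel without blocking; if the buffer is full the
// scheduler is already going to wake up, so dropping the extra signal is fine.
// (Assumed behaviour; the actual helper is not shown in this diff.)
func asyncNotify(ch chan struct{}) {
	select {
	case ch <- struct{}{}:
	default:
	}
}

func main() {
	notifyCh := make(chan struct{}, 100)

	// Scheduler side: wake up on a notification or on a fallback ticker,
	// mirroring the select in the dispatch loop.
	go func() {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for i := 0; i < 3; i++ {
			select {
			case <-notifyCh:
				fmt.Println("woken by job submission")
			case <-ticker.C:
				fmt.Println("woken by ticker")
			}
		}
	}()

	// Submitter side: three jobs, three cheap pokes.
	for i := 0; i < 3; i++ {
		asyncNotify(notifyCh)
	}
	time.Sleep(100 * time.Millisecond)
}
```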
ddlSeqNumMu struct { sync.Mutex seqNum uint64 @@ -720,7 +723,6 @@ func newDDL(ctx context.Context, options ...Option) *ddl { etcdCli: opt.EtcdCli, autoidCli: opt.AutoIDClient, waitSchemaSyncedController: newWaitSchemaSyncedController(), - runningJobs: newRunningJobs(), } ddlCtx.reorgCtx.reorgCtxMap = make(map[int64]*reorgCtx) ddlCtx.jobCtx.jobCtxMap = make(map[int64]*JobContext) @@ -735,7 +737,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl { limitJobCh: make(chan *limitJobTask, batchAddingJobs), limitJobChV2: make(chan *limitJobTask, batchAddingJobs), enableTiFlashPoll: atomicutil.NewBool(true), - ddlJobCh: make(chan struct{}, 100), + ddlJobNotifyCh: make(chan struct{}, 100), localJobCh: make(chan *limitJobTask, 1), } @@ -782,7 +784,7 @@ func (d *ddl) newDeleteRangeManager(mock bool) delRangeManager { return delRangeMgr } -func (d *ddl) prepareWorkers4ConcurrencyDDL() { +func (d *ddl) prepareLocalModeWorkers() { workerFactory := func(tp workerType) func() (pools.Resource, error) { return func() (pools.Resource, error) { wk := newWorker(d.ctx, tp, d.sessPool, d.delRangeMgr, d.ddlCtx) @@ -796,19 +798,14 @@ func (d *ddl) prepareWorkers4ConcurrencyDDL() { return wk, nil } } - // reorg worker count at least 1 at most 10. - reorgCnt := min(max(runtime.GOMAXPROCS(0)/4, 1), reorgWorkerCnt) // local worker count at least 2 at most 10. localCnt := min(max(runtime.GOMAXPROCS(0)/4, 2), localWorkerCnt) - d.reorgWorkerPool = newDDLWorkerPool(pools.NewResourcePool(workerFactory(addIdxWorker), reorgCnt, reorgCnt, 0), jobTypeReorg) - d.generalDDLWorkerPool = newDDLWorkerPool(pools.NewResourcePool(workerFactory(generalWorker), generalWorkerCnt, generalWorkerCnt, 0), jobTypeGeneral) d.localWorkerPool = newDDLWorkerPool(pools.NewResourcePool(workerFactory(localWorker), localCnt, localCnt, 0), jobTypeLocal) failpoint.Inject("NoDDLDispatchLoop", func(val failpoint.Value) { if val.(bool) { failpoint.Return() } }) - d.wg.Run(d.startDispatchLoop) d.wg.Run(d.startLocalWorkerLoop) } @@ -823,16 +820,6 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error { d.limitDDLJobs(d.limitJobChV2, d.addBatchLocalDDLJobs) }) d.sessPool = sess.NewSessionPool(ctxPool, d.store) - d.ownerManager.SetBeOwnerHook(func() { - var err error - d.ddlSeqNumMu.Lock() - defer d.ddlSeqNumMu.Unlock() - d.ddlSeqNumMu.seqNum, err = d.GetNextDDLSeqNum() - if err != nil { - logutil.DDLLogger().Error("error when getting the ddl history count", zap.Error(err)) - } - d.runningJobs.clear() - }) d.delRangeMgr = d.newDeleteRangeManager(ctxPool == nil) @@ -840,8 +827,11 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error { logutil.DDLLogger().Warn("start DDL init state syncer failed", zap.Error(err)) return errors.Trace(err) } + d.ownerManager.SetListener(&ownerListener{ + ddl: d, + }) - d.prepareWorkers4ConcurrencyDDL() + d.prepareLocalModeWorkers() if config.TableLockEnabled() { d.wg.Add(1) @@ -905,10 +895,10 @@ func (d *ddl) DisableDDL() error { } // GetNextDDLSeqNum return the next DDL seq num. 
-func (d *ddl) GetNextDDLSeqNum() (uint64, error) { +func (s *jobScheduler) GetNextDDLSeqNum() (uint64, error) { var count uint64 - ctx := kv.WithInternalSourceType(d.ctx, kv.InternalTxnDDL) - err := kv.RunInNewTxn(ctx, d.store, true, func(_ context.Context, txn kv.Transaction) error { + ctx := kv.WithInternalSourceType(s.schCtx, kv.InternalTxnDDL) + err := kv.RunInNewTxn(ctx, s.store, true, func(_ context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) var err error count, err = t.GetHistoryDDLCount() @@ -927,12 +917,6 @@ func (d *ddl) close() { d.wg.Wait() d.ownerManager.Cancel() d.schemaSyncer.Close() - if d.reorgWorkerPool != nil { - d.reorgWorkerPool.close() - } - if d.generalDDLWorkerPool != nil { - d.generalDDLWorkerPool.close() - } if d.localWorkerPool != nil { d.localWorkerPool.close() } @@ -1062,7 +1046,7 @@ func getJobCheckInterval(job *model.Job, i int) (time.Duration, bool) { } } -func (dc *ddlCtx) asyncNotifyWorker(ch chan struct{}, etcdPath string, jobID int64, jobType string) { +func (dc *ddlCtx) notifyNewJobSubmitted(ch chan struct{}, etcdPath string, jobID int64, jobType string) { // If the workers don't run, we needn't notify workers. // TODO: It does not affect informing the backfill worker. if !config.GetGlobalConfig().Instance.TiDBEnableDDL.Load() { @@ -1071,7 +1055,7 @@ func (dc *ddlCtx) asyncNotifyWorker(ch chan struct{}, etcdPath string, jobID int if dc.isOwner() { asyncNotify(ch) } else { - dc.asyncNotifyByEtcd(etcdPath, jobID, jobType) + dc.notifyNewJobByEtcd(etcdPath, jobID, jobType) } } @@ -1177,7 +1161,7 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error { sessVars.StmtCtx.IsDDLJobInQueue = true // Notice worker that we push a new job and wait the job done. - d.asyncNotifyWorker(d.ddlJobCh, addingDDLJobConcurrent, job.ID, job.Type.String()) + d.notifyNewJobSubmitted(d.ddlJobNotifyCh, addingDDLJobNotifyKey, job.ID, job.Type.String()) logutil.DDLLogger().Info("start DDL job", zap.Stringer("job", job), zap.String("query", job.Query)) if !d.shouldCheckHistoryJob(job) { return nil diff --git a/pkg/ddl/ddl_worker.go b/pkg/ddl/ddl_worker.go index 916458f1d08e5..ddd8a9019f7bb 100644 --- a/pkg/ddl/ddl_worker.go +++ b/pkg/ddl/ddl_worker.go @@ -98,8 +98,9 @@ type worker struct { tp workerType addingDDLJobKey string ddlJobCh chan struct{} - ctx context.Context - wg sync.WaitGroup + // for local mode worker, it's ctx of 'ddl', else it's the ctx of 'job scheduler'. + ctx context.Context + wg sync.WaitGroup sessPool *sess.Pool // sessPool is used to new sessions to execute SQL in ddl package. sess *sess.Session // sess is used and only used in running DDL job. 
@@ -188,7 +189,7 @@ func (w *worker) Close() { tidblogutil.Logger(w.logCtx).Info("DDL worker closed", zap.Duration("take time", time.Since(startTime))) } -func (dc *ddlCtx) asyncNotifyByEtcd(etcdPath string, jobID int64, jobType string) { +func (dc *ddlCtx) notifyNewJobByEtcd(etcdPath string, jobID int64, jobType string) { if dc.etcdCli == nil { return } @@ -1336,7 +1337,7 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, case model.ActionAlterTablePlacement: ver, err = onAlterTablePlacement(d, t, job) case model.ActionCreateResourceGroup: - ver, err = onCreateResourceGroup(d, t, job) + ver, err = onCreateResourceGroup(w.ctx, d, t, job) case model.ActionAlterResourceGroup: ver, err = onAlterResourceGroup(d, t, job) case model.ActionDropResourceGroup: @@ -1399,7 +1400,7 @@ func toTError(err error) *terror.Error { // waitSchemaChanged waits for the completion of updating all servers' schema or MDL synced. In order to make sure that happens, // we wait at most 2 * lease time(sessionTTL, 90 seconds). -func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion int64, job *model.Job) error { +func waitSchemaChanged(ctx context.Context, d *ddlCtx, waitTime time.Duration, latestSchemaVersion int64, job *model.Job) error { if !job.IsRunning() && !job.IsRollingback() && !job.IsDone() && !job.IsRollbackDone() { return nil } @@ -1414,13 +1415,13 @@ func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion in }() if latestSchemaVersion == 0 { - tidblogutil.Logger(d.ctx).Info("schema version doesn't change", zap.String("category", "ddl"), zap.Int64("jobID", job.ID)) + logutil.DDLLogger().Info("schema version doesn't change", zap.Int64("jobID", job.ID)) return nil } - err = d.schemaSyncer.OwnerUpdateGlobalVersion(d.ctx, latestSchemaVersion) + err = d.schemaSyncer.OwnerUpdateGlobalVersion(ctx, latestSchemaVersion) if err != nil { - tidblogutil.Logger(d.ctx).Info("update latest schema version failed", zap.String("category", "ddl"), zap.Int64("ver", latestSchemaVersion), zap.Error(err)) + logutil.DDLLogger().Info("update latest schema version failed", zap.Int64("ver", latestSchemaVersion), zap.Error(err)) if variable.EnableMDL.Load() { return err } @@ -1431,7 +1432,7 @@ func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion in } } - return checkAllVersions(d, job, latestSchemaVersion, timeStart) + return checkAllVersions(ctx, d, job, latestSchemaVersion, timeStart) } func checkAllVersions(d *ddlCtx, job *model.Job, latestSchemaVersion int64, timeStart time.Time) error { @@ -1459,9 +1460,9 @@ func checkAllVersions(d *ddlCtx, job *model.Job, latestSchemaVersion int64, time } // waitSchemaSyncedForMDL likes waitSchemaSynced, but it waits for getting the metadata lock of the latest version of this DDL. 
-func waitSchemaSyncedForMDL(d *ddlCtx, job *model.Job, latestSchemaVersion int64) error { +func waitSchemaSyncedForMDL(ctx context.Context, d *ddlCtx, job *model.Job, latestSchemaVersion int64) error { timeStart := time.Now() - return checkAllVersions(d, job, latestSchemaVersion, timeStart) + return checkAllVersions(ctx, d, job, latestSchemaVersion, timeStart) } // waitSchemaSynced handles the following situation: diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index d6774a479aabd..b36a72959399d 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -1937,10 +1937,10 @@ var MockDMLExecutionStateBeforeMerge func() func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error { if reorgInfo.mergingTmpIdx { logutil.DDLLogger().Info("start to merge temp index", zap.Stringer("job", reorgInfo.Job), zap.Stringer("reorgInfo", reorgInfo)) - return w.writePhysicalTableRecord(w.sessPool, t, typeAddIndexMergeTmpWorker, reorgInfo) + return w.writePhysicalTableRecord(w.ctx, w.sessPool, t, typeAddIndexMergeTmpWorker, reorgInfo) } logutil.DDLLogger().Info("start to add table index", zap.Stringer("job", reorgInfo.Job), zap.Stringer("reorgInfo", reorgInfo)) - return w.writePhysicalTableRecord(w.sessPool, t, typeAddIndexWorker, reorgInfo) + return w.writePhysicalTableRecord(w.ctx, w.sessPool, t, typeAddIndexWorker, reorgInfo) } // addTableIndex handles the add index reorganization state for a table. @@ -1973,6 +1973,7 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error { w.ddlCtx.mu.RLock() w.ddlCtx.mu.hook.OnUpdateReorgInfo(reorgInfo.Job, reorgInfo.PhysicalTableID) w.ddlCtx.mu.RUnlock() + failpoint.InjectCall("beforeUpdateReorgInfo-addTableIndex", reorgInfo.Job) finish, err = updateReorgInfo(w.sessPool, tbl, reorgInfo) if err != nil { @@ -2037,7 +2038,7 @@ func (w *worker) executeDistTask(t table.Table, reorgInfo *reorgInfo) error { taskType := proto.Backfill taskKey := fmt.Sprintf("ddl/%s/%d", taskType, reorgInfo.Job.ID) - g, ctx := errgroup.WithContext(context.Background()) + g, ctx := errgroup.WithContext(w.ctx) ctx = kv.WithInternalSourceType(ctx, kv.InternalDistTask) done := make(chan struct{}) @@ -2489,7 +2490,7 @@ func (w *cleanUpIndexWorker) BackfillData(handleRange reorgBackfillTask) (taskCt // cleanupPhysicalTableIndex handles the drop partition reorganization state for a non-partitioned table or a partition. func (w *worker) cleanupPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error { logutil.DDLLogger().Info("start to clean up index", zap.Stringer("job", reorgInfo.Job), zap.Stringer("reorgInfo", reorgInfo)) - return w.writePhysicalTableRecord(w.sessPool, t, typeCleanUpIndexWorker, reorgInfo) + return w.writePhysicalTableRecord(w.ctx, w.sessPool, t, typeCleanUpIndexWorker, reorgInfo) } // cleanupGlobalIndex handles the drop partition reorganization state to clean up index entries of partitions. 
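The `index.go` change above switches `executeDistTask` from `errgroup.WithContext(context.Background())` to `errgroup.WithContext(w.ctx)`, so the goroutines that monitor a distributed backfill task are cancelled when the owner's worker context goes away. A short sketch of that `errgroup.WithContext` pattern; the two work functions are placeholders, not the real dist-task monitoring code.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	// Plays the role of w.ctx in the diff: cancelled when the owner retires.
	workerCtx, cancel := context.WithCancel(context.Background())

	g, ctx := errgroup.WithContext(workerCtx)

	// One goroutine pretends to wait for the distributed task to finish.
	g.Go(func() error {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(5 * time.Second):
			return nil // task done
		}
	})

	// Another pretends to forward row-count updates while the task runs.
	g.Go(func() error {
		ticker := time.NewTicker(50 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-ticker.C:
				fmt.Println("update row count")
			}
		}
	})

	// Simulate the owner retiring: both goroutines observe cancellation.
	time.AfterFunc(120*time.Millisecond, cancel)
	fmt.Println("group finished with:", g.Wait())
}
```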
diff --git a/pkg/ddl/ingest/tests/BUILD.bazel b/pkg/ddl/ingest/tests/BUILD.bazel index 8ee0e08783e45..642e29a57b7a5 100644 --- a/pkg/ddl/ingest/tests/BUILD.bazel +++ b/pkg/ddl/ingest/tests/BUILD.bazel @@ -10,8 +10,8 @@ go_test( "//pkg/config", "//pkg/ddl/ingest", "//pkg/ddl/ingest/testutil", - "//pkg/ddl/util/callback", "//pkg/parser/model", "//pkg/testkit", + "//pkg/testkit/testfailpoint", ], ) diff --git a/pkg/ddl/ingest/tests/partition_table_test.go b/pkg/ddl/ingest/tests/partition_table_test.go index 8cc593ebf0c79..1c07d724af7ac 100644 --- a/pkg/ddl/ingest/tests/partition_table_test.go +++ b/pkg/ddl/ingest/tests/partition_table_test.go @@ -20,12 +20,14 @@ import ( "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/ddl/ingest" ingesttestutil "github.com/pingcap/tidb/pkg/ddl/ingest/testutil" - "github.com/pingcap/tidb/pkg/ddl/util/callback" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/testkit/testfailpoint" ) func TestAddIndexIngestRecoverPartition(t *testing.T) { + // TODO we are unregistering LitBackCtxMgr when owner changes, but another owner + // might access it, so this case is unstable by nature. port := config.GetGlobalConfig().Port tc := testkit.NewDistExecutionContext(t, 3) defer tc.Close() @@ -37,39 +39,22 @@ func TestAddIndexIngestRecoverPartition(t *testing.T) { tk.MustExec("insert into t values (2, 3), (3, 3), (5, 5);") partCnt := 0 - changeOwner0To1 := func(job *model.Job, _ int64) { - partCnt++ - if partCnt == 3 { - tc.SetOwner(1) - // TODO(tangenta): mock multiple backends in a better way. - //nolint: forcetypeassert - // TODO(tangenta): When owner changes, wait last ddl owner's DDL scheduling loop exits. - ingest.LitBackCtxMgr.(*ingest.MockBackendCtxMgr).ResetSessCtx() - bc, _ := ingest.LitBackCtxMgr.Load(job.ID) - bc.GetCheckpointManager().Close() - bc.AttachCheckpointManager(nil) - config.GetGlobalConfig().Port = port + 1 - } - } - changeOwner1To2 := func(job *model.Job, _ int64) { - partCnt++ - if partCnt == 6 { - tc.SetOwner(2) - //nolint: forcetypeassert - ingest.LitBackCtxMgr.(*ingest.MockBackendCtxMgr).ResetSessCtx() - bc, _ := ingest.LitBackCtxMgr.Load(job.ID) - bc.GetCheckpointManager().Close() - bc.AttachCheckpointManager(nil) - config.GetGlobalConfig().Port = port + 2 - } - } - tc.SetOwner(0) - hook0 := &callback.TestDDLCallback{} - hook0.OnUpdateReorgInfoExported = changeOwner0To1 - hook1 := &callback.TestDDLCallback{} - hook1.OnUpdateReorgInfoExported = changeOwner1To2 - tc.GetDomain(0).DDL().SetHook(hook0) - tc.GetDomain(1).DDL().SetHook(hook1) + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeUpdateReorgInfo-addTableIndex", + func(job *model.Job) { + partCnt++ + if partCnt == 3 || partCnt == 6 { + tc.TriggerOwnerChange() + // TODO(tangenta): mock multiple backends in a better way. + //nolint: forcetypeassert + // TODO(tangenta): When owner changes, wait last ddl owner's DDL scheduling loop exits. 
+ ingest.LitBackCtxMgr.(*ingest.MockBackendCtxMgr).ResetSessCtx() + bc, _ := ingest.LitBackCtxMgr.Load(job.ID) + bc.GetCheckpointManager().Close() + bc.AttachCheckpointManager(nil) + config.GetGlobalConfig().Port = port + 1 + } + }, + ) tk.MustExec("alter table t add index idx(b);") tk.MustExec("admin check table t;") } diff --git a/pkg/ddl/job_table.go b/pkg/ddl/job_table.go index aa2ebc3d516d0..5ac7eedd220d3 100644 --- a/pkg/ddl/job_table.go +++ b/pkg/ddl/job_table.go @@ -20,11 +20,13 @@ import ( "encoding/hex" "encoding/json" "fmt" + "runtime" "slices" "strconv" "strings" "time" + "github.com/ngaut/pools" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" @@ -41,7 +43,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" - tidb_util "github.com/pingcap/tidb/pkg/util" + tidbutil "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/dbterror" "github.com/pingcap/tidb/pkg/util/intest" tidblogutil "github.com/pingcap/tidb/pkg/util/logutil" @@ -50,7 +52,9 @@ import ( ) var ( - addingDDLJobConcurrent = "/tidb/ddl/add_ddl_job_general" + // addingDDLJobNotifyKey is the key in etcd to notify DDL scheduler that there + // is a new DDL job. + addingDDLJobNotifyKey = "/tidb/ddl/add_ddl_job_general" dispatchLoopWaitingDuration = 1 * time.Second localWorkerWaitingDuration = 10 * time.Millisecond ) @@ -82,7 +86,99 @@ const ( jobTypeLocal ) -func (d *ddl) getJob(se *sess.Session, tp jobType, filter func(*model.Job) (bool, error)) (*model.Job, error) { +type ownerListener struct { + ddl *ddl + scheduler *jobScheduler +} + +var _ owner.Listener = (*ownerListener)(nil) + +func (l *ownerListener) OnBecomeOwner() { + ctx, cancelFunc := context.WithCancel(l.ddl.ddlCtx.ctx) + l.scheduler = &jobScheduler{ + schCtx: ctx, + cancel: cancelFunc, + runningJobs: newRunningJobs(), + + ddlCtx: l.ddl.ddlCtx, + ddlJobNotifyCh: l.ddl.ddlJobNotifyCh, + sessPool: l.ddl.sessPool, + delRangeMgr: l.ddl.delRangeMgr, + } + l.scheduler.start() +} + +func (l *ownerListener) OnRetireOwner() { + if l.scheduler == nil { + return + } + l.scheduler.close() +} + +// jobScheduler is used to schedule the DDL jobs, it's only run on the DDL owner. +type jobScheduler struct { + // *ddlCtx already have context named as "ctx", so we use "schCtx" here to avoid confusion. + schCtx context.Context + cancel context.CancelFunc + wg tidbutil.WaitGroupWrapper + runningJobs *runningJobs + + // those fields are created on start + reorgWorkerPool *workerPool + generalDDLWorkerPool *workerPool + + // those fields are shared with 'ddl' instance + // TODO ddlCtx is too large for here, we should remove dependency on it. 
+ *ddlCtx + ddlJobNotifyCh chan struct{} + sessPool *sess.Pool + delRangeMgr delRangeManager +} + +func (s *jobScheduler) start() { + var err error + s.ddlCtx.ddlSeqNumMu.Lock() + defer s.ddlCtx.ddlSeqNumMu.Unlock() + s.ddlCtx.ddlSeqNumMu.seqNum, err = s.GetNextDDLSeqNum() + if err != nil { + logutil.DDLLogger().Error("error when getting the ddl history count", zap.Error(err)) + } + + workerFactory := func(tp workerType) func() (pools.Resource, error) { + return func() (pools.Resource, error) { + wk := newWorker(s.schCtx, tp, s.sessPool, s.delRangeMgr, s.ddlCtx) + sessForJob, err := s.sessPool.Get() + if err != nil { + return nil, err + } + sessForJob.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull) + wk.sess = sess.NewSession(sessForJob) + metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s_%s", metrics.CreateDDL, wk.String())).Inc() + return wk, nil + } + } + // reorg worker count at least 1 at most 10. + reorgCnt := min(max(runtime.GOMAXPROCS(0)/4, 1), reorgWorkerCnt) + s.reorgWorkerPool = newDDLWorkerPool(pools.NewResourcePool(workerFactory(addIdxWorker), reorgCnt, reorgCnt, 0), jobTypeReorg) + s.generalDDLWorkerPool = newDDLWorkerPool(pools.NewResourcePool(workerFactory(generalWorker), generalWorkerCnt, generalWorkerCnt, 0), jobTypeGeneral) + s.wg.RunWithLog(s.startDispatchLoop) + s.wg.RunWithLog(func() { + s.schemaSyncer.SyncJobSchemaVerLoop(s.schCtx) + }) +} + +func (s *jobScheduler) close() { + s.cancel() + s.wg.Wait() + if s.reorgWorkerPool != nil { + s.reorgWorkerPool.close() + } + if s.generalDDLWorkerPool != nil { + s.generalDDLWorkerPool.close() + } +} + +func (s *jobScheduler) getJob(se *sess.Session, tp jobType, filter func(*model.Job) (bool, error)) (*model.Job, error) { not := "not" label := "get_job_general" if tp == jobTypeReorg { @@ -93,7 +189,7 @@ func (d *ddl) getJob(se *sess.Session, tp jobType, filter func(*model.Job) (bool (select min(job_id) from mysql.tidb_ddl_job group by schema_ids, table_ids, processing) and %s reorg %s order by processing desc, job_id` var excludedJobIDs string - if ids := d.runningJobs.allIDs(); len(ids) > 0 { + if ids := s.runningJobs.allIDs(); len(ids) > 0 { excludedJobIDs = fmt.Sprintf("and job_id not in (%s)", ids) } sql := fmt.Sprintf(getJobSQL, not, excludedJobIDs) @@ -111,7 +207,7 @@ func (d *ddl) getJob(se *sess.Session, tp jobType, filter func(*model.Job) (bool return nil, errors.Trace(err) } - isRunnable, err := d.processJobDuringUpgrade(se, &job) + isRunnable, err := s.processJobDuringUpgrade(se, &job) if err != nil { return nil, errors.Trace(err) } @@ -129,7 +225,7 @@ func (d *ddl) getJob(se *sess.Session, tp jobType, filter func(*model.Job) (bool return nil, errors.Trace(err) } if b { - if err = d.markJobProcessing(se, &job); err != nil { + if err = s.markJobProcessing(se, &job); err != nil { logutil.DDLLogger().Warn( "[ddl] handle ddl job failed: mark job is processing meet error", zap.Error(err), @@ -144,15 +240,15 @@ func (d *ddl) getJob(se *sess.Session, tp jobType, filter func(*model.Job) (bool func hasSysDB(job *model.Job) bool { for _, info := range job.GetInvolvingSchemaInfo() { - if tidb_util.IsSysDB(info.Database) { + if tidbutil.IsSysDB(info.Database) { return true } } return false } -func (d *ddl) processJobDuringUpgrade(sess *sess.Session, job *model.Job) (isRunnable bool, err error) { - if d.stateSyncer.IsUpgradingState() { +func (s *jobScheduler) processJobDuringUpgrade(sess *sess.Session, job *model.Job) (isRunnable bool, err error) { + if s.stateSyncer.IsUpgradingState() { if 
job.IsPaused() { return false, nil } @@ -204,29 +300,29 @@ func (d *ddl) processJobDuringUpgrade(sess *sess.Session, job *model.Job) (isRun return true, nil } -func (d *ddl) getGeneralJob(sess *sess.Session) (*model.Job, error) { - return d.getJob(sess, jobTypeGeneral, func(job *model.Job) (bool, error) { - if !d.runningJobs.checkRunnable(job) { +func (s *jobScheduler) getGeneralJob(sess *sess.Session) (*model.Job, error) { + return s.getJob(sess, jobTypeGeneral, func(job *model.Job) (bool, error) { + if !s.runningJobs.checkRunnable(job) { return false, nil } if job.Type == model.ActionDropSchema { // Check if there is any reorg job on this schema. sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job where CONCAT(',', schema_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and processing limit 1", strconv.Quote(strconv.FormatInt(job.SchemaID, 10))) - rows, err := sess.Execute(d.ctx, sql, "check conflict jobs") + rows, err := sess.Execute(s.schCtx, sql, "check conflict jobs") return len(rows) == 0, err } // Check if there is any running job works on the same table. sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job t1, (select table_ids from mysql.tidb_ddl_job where job_id = %d) t2 where "+ "(processing and CONCAT(',', t2.table_ids, ',') REGEXP CONCAT(',(', REPLACE(t1.table_ids, ',', '|'), '),') != 0)"+ "or (type = %d and processing)", job.ID, model.ActionFlashbackCluster) - rows, err := sess.Execute(d.ctx, sql, "check conflict jobs") + rows, err := sess.Execute(s.schCtx, sql, "check conflict jobs") return len(rows) == 0, err }) } -func (d *ddl) getReorgJob(sess *sess.Session) (*model.Job, error) { - return d.getJob(sess, jobTypeReorg, func(job *model.Job) (bool, error) { - if !d.runningJobs.checkRunnable(job) { +func (s *jobScheduler) getReorgJob(sess *sess.Session) (*model.Job, error) { + return s.getJob(sess, jobTypeReorg, func(job *model.Job) (bool, error) { + if !s.runningJobs.checkRunnable(job) { return false, nil } // Check if there is any block ddl running, like drop schema and flashback cluster. 
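The new `jobScheduler` shown above owns everything that must only run on the DDL owner: `start` seeds the DDL sequence number, builds the reorg/general worker pools, and launches the dispatch loop under `schCtx`, while `close` cancels that context, waits for the loops, and releases the pools. Below is a hedged, trimmed-down sketch of the same lifecycle (names and fields simplified; the real scheduler also carries session pools, worker pools, and the running-jobs set).

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// scheduler is a stripped-down stand-in for jobScheduler: a context cancelled
// on close, and a wait group covering its background loops.
type scheduler struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

func newScheduler(parent context.Context) *scheduler {
	ctx, cancel := context.WithCancel(parent)
	return &scheduler{ctx: ctx, cancel: cancel}
}

func (s *scheduler) start() {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		ticker := time.NewTicker(20 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-s.ctx.Done():
				fmt.Println("dispatch loop exiting")
				return
			case <-ticker.C:
				fmt.Println("look for runnable jobs")
			}
		}
	}()
}

// close mirrors jobScheduler.close: cancel first, then wait for the loops.
func (s *scheduler) close() {
	s.cancel()
	s.wg.Wait()
}

// main simulates the ownership transitions that the ownerListener in the diff
// would drive: start on OnBecomeOwner, close on OnRetireOwner.
func main() {
	sch := newScheduler(context.Background())
	sch.start()                       // OnBecomeOwner
	time.Sleep(70 * time.Millisecond) // this node acts as owner for a while
	sch.close()                       // OnRetireOwner
}
```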
@@ -235,7 +331,7 @@ func (d *ddl) getReorgJob(sess *sess.Session) (*model.Job, error) { "or (CONCAT(',', table_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and processing) "+ "or (type = %d and processing) limit 1", strconv.Quote(strconv.FormatInt(job.SchemaID, 10)), model.ActionDropSchema, strconv.Quote(strconv.FormatInt(job.TableID, 10)), model.ActionFlashbackCluster) - rows, err := sess.Execute(d.ctx, sql, "check conflict jobs") + rows, err := sess.Execute(s.schCtx, sql, "check conflict jobs") return len(rows) == 0, err }) } @@ -255,36 +351,31 @@ func (d *ddl) startLocalWorkerLoop() { } } -func (d *ddl) startDispatchLoop() { - sessCtx, err := d.sessPool.Get() +func (s *jobScheduler) startDispatchLoop() { + sessCtx, err := s.sessPool.Get() if err != nil { logutil.DDLLogger().Fatal("dispatch loop get session failed, it should not happen, please try restart TiDB", zap.Error(err)) } - defer d.sessPool.Put(sessCtx) + defer s.sessPool.Put(sessCtx) se := sess.NewSession(sessCtx) var notifyDDLJobByEtcdCh clientv3.WatchChan - if d.etcdCli != nil { - notifyDDLJobByEtcdCh = d.etcdCli.Watch(d.ctx, addingDDLJobConcurrent) + if s.etcdCli != nil { + notifyDDLJobByEtcdCh = s.etcdCli.Watch(s.schCtx, addingDDLJobNotifyKey) } - if err := d.checkAndUpdateClusterState(true); err != nil { + if err := s.checkAndUpdateClusterState(true); err != nil { logutil.DDLLogger().Fatal("dispatch loop get cluster state failed, it should not happen, please try restart TiDB", zap.Error(err)) } ticker := time.NewTicker(dispatchLoopWaitingDuration) defer ticker.Stop() - isOnce := false + // TODO move waitSchemaSyncedController out of ddlCtx. + s.clearOnceMap() for { - if d.ctx.Err() != nil { + if s.schCtx.Err() != nil { return } - if !d.isOwner() { - isOnce = true - d.onceMap = make(map[int64]struct{}, jobOnceCapacity) - time.Sleep(dispatchLoopWaitingDuration) - continue - } failpoint.Inject("ownerResignAfterDispatchLoopCheck", func() { if ingest.ResignOwnerForTest.Load() { - err2 := d.ownerManager.ResignOwner(context.Background()) + err2 := s.ownerManager.ResignOwner(context.Background()) if err2 != nil { logutil.DDLLogger().Info("resign meet error", zap.Error(err2)) } @@ -292,32 +383,34 @@ func (d *ddl) startDispatchLoop() { } }) select { - case <-d.ddlJobCh: + case <-s.ddlJobNotifyCh: case <-ticker.C: case _, ok := <-notifyDDLJobByEtcdCh: if !ok { - logutil.DDLLogger().Warn("start worker watch channel closed", zap.String("watch key", addingDDLJobConcurrent)) - notifyDDLJobByEtcdCh = d.etcdCli.Watch(d.ctx, addingDDLJobConcurrent) + logutil.DDLLogger().Warn("start worker watch channel closed", zap.String("watch key", addingDDLJobNotifyKey)) + notifyDDLJobByEtcdCh = s.etcdCli.Watch(s.schCtx, addingDDLJobNotifyKey) time.Sleep(time.Second) continue } - case <-d.ctx.Done(): + case <-s.schCtx.Done(): return } - if err := d.checkAndUpdateClusterState(isOnce); err != nil { + if err := s.checkAndUpdateClusterState(false); err != nil { continue } - isOnce = false - d.loadDDLJobAndRun(se, d.generalDDLWorkerPool, d.getGeneralJob) - d.loadDDLJobAndRun(se, d.reorgWorkerPool, d.getReorgJob) + s.loadDDLJobAndRun(se, s.generalDDLWorkerPool, s.getGeneralJob) + s.loadDDLJobAndRun(se, s.reorgWorkerPool, s.getReorgJob) } } -func (d *ddl) checkAndUpdateClusterState(needUpdate bool) error { +// TODO make it run in a separate routine. 
+func (s *jobScheduler) checkAndUpdateClusterState(needUpdate bool) error { select { - case _, ok := <-d.stateSyncer.WatchChan(): + case _, ok := <-s.stateSyncer.WatchChan(): if !ok { - d.stateSyncer.Rewatch(d.ctx) + // TODO stateSyncer should only be started when we are the owner, and use + // the context of scheduler, will refactor it later. + s.stateSyncer.Rewatch(s.ddlCtx.ctx) } default: if !needUpdate { @@ -325,23 +418,20 @@ func (d *ddl) checkAndUpdateClusterState(needUpdate bool) error { } } - oldState := d.stateSyncer.IsUpgradingState() - stateInfo, err := d.stateSyncer.GetGlobalState(d.ctx) + oldState := s.stateSyncer.IsUpgradingState() + stateInfo, err := s.stateSyncer.GetGlobalState(s.schCtx) if err != nil { logutil.DDLLogger().Warn("get global state failed", zap.Error(err)) return errors.Trace(err) } logutil.DDLLogger().Info("get global state and global state change", - zap.Bool("oldState", oldState), zap.Bool("currState", d.stateSyncer.IsUpgradingState())) - if !d.isOwner() { - return nil - } + zap.Bool("oldState", oldState), zap.Bool("currState", s.stateSyncer.IsUpgradingState())) ownerOp := owner.OpNone if stateInfo.State == syncer.StateUpgrading { ownerOp = owner.OpSyncUpgradingState } - err = d.ownerManager.SetOwnerOpValue(d.ctx, ownerOp) + err = s.ownerManager.SetOwnerOpValue(s.schCtx, ownerOp) if err != nil { logutil.DDLLogger().Warn("the owner sets global state to owner operator value failed", zap.Error(err)) return errors.Trace(err) @@ -350,16 +440,16 @@ func (d *ddl) checkAndUpdateClusterState(needUpdate bool) error { return nil } -func (d *ddl) loadDDLJobAndRun(se *sess.Session, pool *workerPool, getJob func(*sess.Session) (*model.Job, error)) { +func (s *jobScheduler) loadDDLJobAndRun(se *sess.Session, pool *workerPool, getJob func(*sess.Session) (*model.Job, error)) { wk, err := pool.get() if err != nil || wk == nil { logutil.DDLLogger().Debug(fmt.Sprintf("[ddl] no %v worker available now", pool.tp()), zap.Error(err)) return } - d.mu.RLock() - d.mu.hook.OnGetJobBefore(pool.tp().String()) - d.mu.RUnlock() + s.mu.RLock() + s.mu.hook.OnGetJobBefore(pool.tp().String()) + s.mu.RUnlock() startTime := time.Now() job, err := getJob(se) @@ -370,11 +460,11 @@ func (d *ddl) loadDDLJobAndRun(se *sess.Session, pool *workerPool, getJob func(* pool.put(wk) return } - d.mu.RLock() - d.mu.hook.OnGetJobAfter(pool.tp().String(), job) - d.mu.RUnlock() + s.mu.RLock() + s.mu.hook.OnGetJobAfter(pool.tp().String(), job) + s.mu.RUnlock() - d.delivery2Worker(wk, pool, job) + s.delivery2Worker(wk, pool, job) } // delivery2LocalWorker runs the DDL job of v2 in local. @@ -415,21 +505,28 @@ func (d *ddl) delivery2LocalWorker(pool *workerPool, task *limitJobTask) { } // delivery2Worker owns the worker, need to put it back to the pool in this function. -func (d *ddl) delivery2Worker(wk *worker, pool *workerPool, job *model.Job) { +func (s *jobScheduler) delivery2Worker(wk *worker, pool *workerPool, job *model.Job) { injectFailPointForGetJob(job) - d.runningJobs.add(job) - d.wg.Run(func() { + s.runningJobs.add(job) + s.wg.Run(func() { metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Inc() defer func() { - d.runningJobs.remove(job) - asyncNotify(d.ddlJobCh) + s.runningJobs.remove(job) + asyncNotify(s.ddlJobNotifyCh) metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec() + if wk.ctx.Err() != nil && ingest.LitBackCtxMgr != nil { + // if ctx cancelled, i.e. 
owner changed, we need to Unregister the backend + // as litBackendCtx is holding this very 'ctx', and it cannot reuse now. + // TODO make LitBackCtxMgr a local value of the job scheduler, it makes + // it much harder to test multiple owners in 1 unit test. + ingest.LitBackCtxMgr.Unregister(job.ID) + } }() - ownerID := d.ownerManager.ID() + ownerID := s.ownerManager.ID() // check if this ddl job is synced to all servers. - if !job.NotStarted() && (!d.isSynced(job) || !d.maybeAlreadyRunOnce(job.ID)) { + if !job.NotStarted() && (!s.isSynced(job) || !s.maybeAlreadyRunOnce(job.ID)) { if variable.EnableMDL.Load() { - exist, version, err := checkMDLInfo(job.ID, d.sessPool) + exist, version, err := checkMDLInfo(job.ID, s.sessPool) if err != nil { wk.jobLogger(job).Warn("check MDL info failed", zap.Error(err)) // Release the worker resource. @@ -438,28 +535,28 @@ func (d *ddl) delivery2Worker(wk *worker, pool *workerPool, job *model.Job) { } else if exist { // Release the worker resource. pool.put(wk) - err = waitSchemaSyncedForMDL(d.ddlCtx, job, version) + err = waitSchemaSyncedForMDL(wk.ctx, s.ddlCtx, job, version) if err != nil { return } - d.setAlreadyRunOnce(job.ID) - cleanMDLInfo(d.sessPool, job, d.etcdCli, ownerID, job.State == model.JobStateSynced) + s.setAlreadyRunOnce(job.ID) + cleanMDLInfo(s.sessPool, job, s.etcdCli, ownerID, job.State == model.JobStateSynced) // Don't have a worker now. return } } else { - err := waitSchemaSynced(d.ddlCtx, job, 2*d.lease) + err := waitSchemaSynced(wk.ctx, s.ddlCtx, job, 2*s.lease) if err != nil { time.Sleep(time.Second) // Release the worker resource. pool.put(wk) return } - d.setAlreadyRunOnce(job.ID) + s.setAlreadyRunOnce(job.ID) } } - schemaVer, err := wk.HandleDDLJobTable(d.ddlCtx, job) + schemaVer, err := wk.HandleDDLJobTable(s.ddlCtx, job) logCtx := wk.logCtx pool.put(wk) if err != nil { @@ -477,28 +574,28 @@ func (d *ddl) delivery2Worker(wk *worker, pool *workerPool, job *model.Job) { // Here means the job enters another state (delete only, write only, public, etc...) or is cancelled. // If the job is done or still running or rolling back, we will wait 2 * lease time or util MDL synced to guarantee other servers to update // the newest schema. - err := waitSchemaChanged(d.ddlCtx, d.lease*2, schemaVer, job) + err := waitSchemaChanged(wk.ctx, s.ddlCtx, s.lease*2, schemaVer, job) if err != nil { return } - cleanMDLInfo(d.sessPool, job, d.etcdCli, ownerID, job.State == model.JobStateSynced) - d.synced(job) + cleanMDLInfo(s.sessPool, job, s.etcdCli, ownerID, job.State == model.JobStateSynced) + s.synced(job) if RunInGoTest { - // d.mu.hook is initialed from domain / test callback, which will force the owner host update schema diff synchronously. - d.mu.RLock() - d.mu.hook.OnSchemaStateChanged(schemaVer) - d.mu.RUnlock() + // s.mu.hook is initialed from domain / test callback, which will force the owner host update schema diff synchronously. 
+ s.mu.RLock() + s.mu.hook.OnSchemaStateChanged(schemaVer) + s.mu.RUnlock() } - d.mu.RLock() - d.mu.hook.OnJobUpdated(job) - d.mu.RUnlock() + s.mu.RLock() + s.mu.hook.OnJobUpdated(job) + s.mu.RUnlock() } }) } -func (*ddl) markJobProcessing(se *sess.Session, job *model.Job) error { +func (*jobScheduler) markJobProcessing(se *sess.Session, job *model.Job) error { se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull) _, err := se.Execute(context.Background(), fmt.Sprintf( "update mysql.tidb_ddl_job set processing = 1 where job_id = %d", job.ID), diff --git a/pkg/ddl/partition.go b/pkg/ddl/partition.go index 15ea2271e73e6..842ef15985a76 100644 --- a/pkg/ddl/partition.go +++ b/pkg/ddl/partition.go @@ -1964,14 +1964,14 @@ func getTableInfoWithOriginalPartitions(t *model.TableInfo, oldIDs []int64, newI return nt } -func dropLabelRules(d *ddlCtx, schemaName, tableName string, partNames []string) error { +func dropLabelRules(ctx context.Context, schemaName, tableName string, partNames []string) error { deleteRules := make([]string, 0, len(partNames)) for _, partName := range partNames { deleteRules = append(deleteRules, fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, schemaName, tableName, partName)) } // delete batch rules patch := label.NewRulePatch([]*label.Rule{}, deleteRules) - return infosync.UpdateLabelRules(d.ctx, patch) + return infosync.UpdateLabelRules(ctx, patch) } // onDropTablePartition deletes old partition meta. @@ -1998,7 +1998,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) ( return ver, errors.Wrapf(err, "failed to notify PD the placement rules") } // TODO: Will this drop LabelRules for existing partitions, if the new partitions have the same name? - err = dropLabelRules(d, job.SchemaName, tblInfo.Name.L, pNames) + err = dropLabelRules(w.ctx, job.SchemaName, tblInfo.Name.L, pNames) if err != nil { job.State = model.JobStateCancelled return ver, errors.Wrapf(err, "failed to notify PD the label rules") @@ -2046,7 +2046,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) ( return ver, errors.Trace(err) } physicalTableIDs = updateDroppingPartitionInfo(tblInfo, partNames) - err = dropLabelRules(d, job.SchemaName, tblInfo.Name.L, partNames) + err = dropLabelRules(w.ctx, job.SchemaName, tblInfo.Name.L, partNames) if err != nil { job.State = model.JobStateCancelled return ver, errors.Wrapf(err, "failed to notify PD the label rules") diff --git a/pkg/ddl/reorg.go b/pkg/ddl/reorg.go index 5a82987ec7ab3..836c6eb703cfe 100644 --- a/pkg/ddl/reorg.go +++ b/pkg/ddl/reorg.go @@ -245,11 +245,6 @@ func (w *worker) runReorgJob(reorgInfo *reorgInfo, tblInfo *model.TableInfo, if err != nil { return errors.Trace(err) } - case <-w.ctx.Done(): - logutil.DDLLogger().Info("run reorg job quit") - d.removeReorgCtx(job.ID) - // We return dbterror.ErrWaitReorgTimeout here too, so that outer loop will break. 
- return dbterror.ErrWaitReorgTimeout case <-time.After(waitTimeout): rowCount := rc.getRowCount() job.SetRowCount(rowCount) diff --git a/pkg/ddl/resource_group.go b/pkg/ddl/resource_group.go index caca77ec438e4..39265a46dee6a 100644 --- a/pkg/ddl/resource_group.go +++ b/pkg/ddl/resource_group.go @@ -38,7 +38,7 @@ const ( alreadyExists = "already exists" ) -func onCreateResourceGroup(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) { +func onCreateResourceGroup(ctx context.Context, d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) { groupInfo := &model.ResourceGroupInfo{} if err := job.DecodeArgs(groupInfo); err != nil { job.State = model.JobStateCancelled @@ -63,7 +63,7 @@ func onCreateResourceGroup(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, return ver, errors.Trace(err) } - ctx, cancel := context.WithTimeout(d.ctx, defaultInfosyncTimeout) + ctx, cancel := context.WithTimeout(ctx, defaultInfosyncTimeout) defer cancel() err = infosync.AddResourceGroup(ctx, protoGroup) if err != nil { diff --git a/pkg/ddl/schema_version.go b/pkg/ddl/schema_version.go new file mode 100644 index 0000000000000..ec169386843b7 --- /dev/null +++ b/pkg/ddl/schema_version.go @@ -0,0 +1,417 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl + +import ( + "context" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/ddl/logutil" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/meta" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/util/mathutil" + "go.uber.org/zap" +) + +// SetSchemaDiffForCreateTables set SchemaDiff for ActionCreateTables. +func SetSchemaDiffForCreateTables(diff *model.SchemaDiff, job *model.Job) error { + var tableInfos []*model.TableInfo + err := job.DecodeArgs(&tableInfos) + if err != nil { + return errors.Trace(err) + } + diff.AffectedOpts = make([]*model.AffectedOption, len(tableInfos)) + for i := range tableInfos { + diff.AffectedOpts[i] = &model.AffectedOption{ + SchemaID: job.SchemaID, + OldSchemaID: job.SchemaID, + TableID: tableInfos[i].ID, + OldTableID: tableInfos[i].ID, + } + } + return nil +} + +// SetSchemaDiffForTruncateTable set SchemaDiff for ActionTruncateTable. +func SetSchemaDiffForTruncateTable(diff *model.SchemaDiff, job *model.Job) error { + // Truncate table has two table ID, should be handled differently. + err := job.DecodeArgs(&diff.TableID) + if err != nil { + return errors.Trace(err) + } + diff.OldTableID = job.TableID + + // affects are used to update placement rule cache + if len(job.CtxVars) > 0 { + oldIDs := job.CtxVars[0].([]int64) + newIDs := job.CtxVars[1].([]int64) + diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs) + } + return nil +} + +// SetSchemaDiffForCreateView set SchemaDiff for ActionCreateView. 
+func SetSchemaDiffForCreateView(diff *model.SchemaDiff, job *model.Job) error { + tbInfo := &model.TableInfo{} + var orReplace bool + var oldTbInfoID int64 + if err := job.DecodeArgs(tbInfo, &orReplace, &oldTbInfoID); err != nil { + return errors.Trace(err) + } + // When the statement is "create or replace view " and we need to drop the old view, + // it has two table IDs and should be handled differently. + if oldTbInfoID > 0 && orReplace { + diff.OldTableID = oldTbInfoID + } + diff.TableID = tbInfo.ID + return nil +} + +// SetSchemaDiffForRenameTable set SchemaDiff for ActionRenameTable. +func SetSchemaDiffForRenameTable(diff *model.SchemaDiff, job *model.Job) error { + err := job.DecodeArgs(&diff.OldSchemaID) + if err != nil { + return errors.Trace(err) + } + diff.TableID = job.TableID + return nil +} + +// SetSchemaDiffForRenameTables set SchemaDiff for ActionRenameTables. +func SetSchemaDiffForRenameTables(diff *model.SchemaDiff, job *model.Job) error { + var ( + oldSchemaIDs, newSchemaIDs, tableIDs []int64 + tableNames, oldSchemaNames []*model.CIStr + ) + err := job.DecodeArgs(&oldSchemaIDs, &newSchemaIDs, &tableNames, &tableIDs, &oldSchemaNames) + if err != nil { + return errors.Trace(err) + } + affects := make([]*model.AffectedOption, len(newSchemaIDs)-1) + for i, newSchemaID := range newSchemaIDs { + // Do not add the first table to AffectedOpts. Related issue tidb#47064. + if i == 0 { + continue + } + affects[i-1] = &model.AffectedOption{ + SchemaID: newSchemaID, + TableID: tableIDs[i], + OldTableID: tableIDs[i], + OldSchemaID: oldSchemaIDs[i], + } + } + diff.TableID = tableIDs[0] + diff.SchemaID = newSchemaIDs[0] + diff.OldSchemaID = oldSchemaIDs[0] + diff.AffectedOpts = affects + return nil +} + +// SetSchemaDiffForExchangeTablePartition set SchemaDiff for ActionExchangeTablePartition. +func SetSchemaDiffForExchangeTablePartition(diff *model.SchemaDiff, job *model.Job, multiInfos ...schemaIDAndTableInfo) error { + // From start of function: diff.SchemaID = job.SchemaID + // Old is original non partitioned table + diff.OldTableID = job.TableID + diff.OldSchemaID = job.SchemaID + // Update the partitioned table (it is only done in the last state) + var ( + ptSchemaID int64 + ptTableID int64 + ptDefID int64 + partName string // Not used + withValidation bool // Not used + ) + // See ddl.ExchangeTablePartition + err := job.DecodeArgs(&ptDefID, &ptSchemaID, &ptTableID, &partName, &withValidation) + if err != nil { + return errors.Trace(err) + } + // This is needed for not crashing TiFlash! + // TODO: Update TiFlash, to handle StateWriteOnly + diff.AffectedOpts = []*model.AffectedOption{{ + TableID: ptTableID, + }} + if job.SchemaState != model.StatePublic { + // No change, just to refresh the non-partitioned table + // with its new ExchangePartitionInfo. + diff.TableID = job.TableID + // Keep this as Schema ID of non-partitioned table + // to avoid trigger early rename in TiFlash + diff.AffectedOpts[0].SchemaID = job.SchemaID + // Need reload partition table, use diff.AffectedOpts[0].OldSchemaID to mark it. + if len(multiInfos) > 0 { + diff.AffectedOpts[0].OldSchemaID = ptSchemaID + } + } else { + // Swap + diff.TableID = ptDefID + // Also add correct SchemaID in case different schemas + diff.AffectedOpts[0].SchemaID = ptSchemaID + } + return nil +} + +// SetSchemaDiffForTruncateTablePartition set SchemaDiff for ActionTruncateTablePartition. 
+func SetSchemaDiffForTruncateTablePartition(diff *model.SchemaDiff, job *model.Job) { + diff.TableID = job.TableID + if len(job.CtxVars) > 0 { + oldIDs := job.CtxVars[0].([]int64) + newIDs := job.CtxVars[1].([]int64) + diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs) + } +} + +// SetSchemaDiffForDropTable set SchemaDiff for ActionDropTablePartition, ActionRecoverTable, ActionDropTable. +func SetSchemaDiffForDropTable(diff *model.SchemaDiff, job *model.Job) { + // affects are used to update placement rule cache + diff.TableID = job.TableID + if len(job.CtxVars) > 0 { + if oldIDs, ok := job.CtxVars[0].([]int64); ok { + diff.AffectedOpts = buildPlacementAffects(oldIDs, oldIDs) + } + } +} + +// SetSchemaDiffForReorganizePartition set SchemaDiff for ActionReorganizePartition. +func SetSchemaDiffForReorganizePartition(diff *model.SchemaDiff, job *model.Job) { + diff.TableID = job.TableID + // TODO: should this be for every state of Reorganize? + if len(job.CtxVars) > 0 { + if droppedIDs, ok := job.CtxVars[0].([]int64); ok { + if addedIDs, ok := job.CtxVars[1].([]int64); ok { + // to use AffectedOpts we need both new and old to have the same length + maxParts := mathutil.Max[int](len(droppedIDs), len(addedIDs)) + // Also initialize them to 0! + oldIDs := make([]int64, maxParts) + copy(oldIDs, droppedIDs) + newIDs := make([]int64, maxParts) + copy(newIDs, addedIDs) + diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs) + } + } + } +} + +// SetSchemaDiffForPartitionModify set SchemaDiff for ActionRemovePartitioning, ActionAlterTablePartitioning. +func SetSchemaDiffForPartitionModify(diff *model.SchemaDiff, job *model.Job) error { + diff.TableID = job.TableID + diff.OldTableID = job.TableID + if job.SchemaState == model.StateDeleteReorganization { + partInfo := &model.PartitionInfo{} + var partNames []string + err := job.DecodeArgs(&partNames, &partInfo) + if err != nil { + return errors.Trace(err) + } + // Final part, new table id is assigned + diff.TableID = partInfo.NewTableID + if len(job.CtxVars) > 0 { + if droppedIDs, ok := job.CtxVars[0].([]int64); ok { + if addedIDs, ok := job.CtxVars[1].([]int64); ok { + // to use AffectedOpts we need both new and old to have the same length + maxParts := mathutil.Max[int](len(droppedIDs), len(addedIDs)) + // Also initialize them to 0! + oldIDs := make([]int64, maxParts) + copy(oldIDs, droppedIDs) + newIDs := make([]int64, maxParts) + copy(newIDs, addedIDs) + diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs) + } + } + } + } + return nil +} + +// SetSchemaDiffForCreateTable set SchemaDiff for ActionCreateTable. +func SetSchemaDiffForCreateTable(diff *model.SchemaDiff, job *model.Job) { + diff.TableID = job.TableID + if len(job.Args) > 0 { + tbInfo, _ := job.Args[0].(*model.TableInfo) + // When create table with foreign key, there are two schema status change: + // 1. none -> write-only + // 2. write-only -> public + // In the second status change write-only -> public, infoschema loader should apply drop old table first, then + // apply create new table. So need to set diff.OldTableID here to make sure it. + if tbInfo != nil && tbInfo.State == model.StatePublic && len(tbInfo.ForeignKeys) > 0 { + diff.OldTableID = job.TableID + } + } +} + +// SetSchemaDiffForRecoverSchema set SchemaDiff for ActionRecoverSchema. 
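Several of the helpers here feed old and new physical IDs into buildPlacementAffects so the placement rule cache can be refreshed. Its body is not part of this diff; the following is only an assumed sketch of what such a pairing helper does (index i of oldIDs maps to index i of newIDs), which is also why SetSchemaDiffForReorganizePartition pads both slices to the same length before calling it:

// Assumed sketch only; the real buildPlacementAffects lives elsewhere in this package.
func buildPlacementAffectsSketch(oldIDs, newIDs []int64) []*model.AffectedOption {
	if len(oldIDs) == 0 {
		return nil
	}
	affects := make([]*model.AffectedOption, len(oldIDs))
	for i := range oldIDs {
		affects[i] = &model.AffectedOption{
			OldTableID: oldIDs[i],
			TableID:    newIDs[i],
		}
	}
	return affects
}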
+func SetSchemaDiffForRecoverSchema(diff *model.SchemaDiff, job *model.Job) error { + var ( + recoverSchemaInfo *RecoverSchemaInfo + recoverSchemaCheckFlag int64 + ) + err := job.DecodeArgs(&recoverSchemaInfo, &recoverSchemaCheckFlag) + if err != nil { + return errors.Trace(err) + } + // Reserved recoverSchemaCheckFlag value for gc work judgment. + job.Args[checkFlagIndexInJobArgs] = recoverSchemaCheckFlag + recoverTabsInfo := recoverSchemaInfo.RecoverTabsInfo + diff.AffectedOpts = make([]*model.AffectedOption, len(recoverTabsInfo)) + for i := range recoverTabsInfo { + diff.AffectedOpts[i] = &model.AffectedOption{ + SchemaID: job.SchemaID, + OldSchemaID: job.SchemaID, + TableID: recoverTabsInfo[i].TableInfo.ID, + OldTableID: recoverTabsInfo[i].TableInfo.ID, + } + } + return nil +} + +// SetSchemaDiffForFlashbackCluster set SchemaDiff for ActionFlashbackCluster. +func SetSchemaDiffForFlashbackCluster(diff *model.SchemaDiff, job *model.Job) { + diff.TableID = -1 + if job.SchemaState == model.StatePublic { + diff.RegenerateSchemaMap = true + } +} + +// SetSchemaDiffForMultiInfos set SchemaDiff for multiInfos. +func SetSchemaDiffForMultiInfos(diff *model.SchemaDiff, multiInfos ...schemaIDAndTableInfo) { + if len(multiInfos) > 0 { + existsMap := make(map[int64]struct{}) + existsMap[diff.TableID] = struct{}{} + for _, affect := range diff.AffectedOpts { + existsMap[affect.TableID] = struct{}{} + } + for _, info := range multiInfos { + _, exist := existsMap[info.tblInfo.ID] + if exist { + continue + } + existsMap[info.tblInfo.ID] = struct{}{} + diff.AffectedOpts = append(diff.AffectedOpts, &model.AffectedOption{ + SchemaID: info.schemaID, + OldSchemaID: info.schemaID, + TableID: info.tblInfo.ID, + OldTableID: info.tblInfo.ID, + }) + } + } +} + +// updateSchemaVersion increments the schema version by 1 and sets SchemaDiff. +func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ...schemaIDAndTableInfo) (int64, error) { + schemaVersion, err := d.setSchemaVersion(job, d.store) + if err != nil { + return 0, errors.Trace(err) + } + diff := &model.SchemaDiff{ + Version: schemaVersion, + Type: job.Type, + SchemaID: job.SchemaID, + } + switch job.Type { + case model.ActionCreateTables: + err = SetSchemaDiffForCreateTables(diff, job) + case model.ActionTruncateTable: + err = SetSchemaDiffForTruncateTable(diff, job) + case model.ActionCreateView: + err = SetSchemaDiffForCreateView(diff, job) + case model.ActionRenameTable: + err = SetSchemaDiffForRenameTable(diff, job) + case model.ActionRenameTables: + err = SetSchemaDiffForRenameTables(diff, job) + case model.ActionExchangeTablePartition: + err = SetSchemaDiffForExchangeTablePartition(diff, job, multiInfos...) + case model.ActionTruncateTablePartition: + SetSchemaDiffForTruncateTablePartition(diff, job) + case model.ActionDropTablePartition, model.ActionRecoverTable, model.ActionDropTable: + SetSchemaDiffForDropTable(diff, job) + case model.ActionReorganizePartition: + SetSchemaDiffForReorganizePartition(diff, job) + case model.ActionRemovePartitioning, model.ActionAlterTablePartitioning: + err = SetSchemaDiffForPartitionModify(diff, job) + case model.ActionCreateTable: + SetSchemaDiffForCreateTable(diff, job) + case model.ActionRecoverSchema: + err = SetSchemaDiffForRecoverSchema(diff, job) + case model.ActionFlashbackCluster: + SetSchemaDiffForFlashbackCluster(diff, job) + default: + diff.TableID = job.TableID + } + if err != nil { + return 0, err + } + SetSchemaDiffForMultiInfos(diff, multiInfos...) 
+ err = t.SetSchemaDiff(diff) + return schemaVersion, errors.Trace(err) +} + +func checkAllVersions(ctx context.Context, d *ddlCtx, job *model.Job, latestSchemaVersion int64, timeStart time.Time) error { + failpoint.Inject("checkDownBeforeUpdateGlobalVersion", func(val failpoint.Value) { + if val.(bool) { + if mockDDLErrOnce > 0 && mockDDLErrOnce != latestSchemaVersion { + panic("check down before update global version failed") + } + mockDDLErrOnce = -1 + } + }) + + // OwnerCheckAllVersions returns only when all TiDB schemas are synced(exclude the isolated TiDB). + err := d.schemaSyncer.OwnerCheckAllVersions(ctx, job.ID, latestSchemaVersion) + if err != nil { + logutil.DDLLogger().Info("wait latest schema version encounter error", zap.Int64("ver", latestSchemaVersion), + zap.Int64("jobID", job.ID), zap.Duration("take time", time.Since(timeStart)), zap.Error(err)) + return err + } + logutil.DDLLogger().Info("wait latest schema version changed(get the metadata lock if tidb_enable_metadata_lock is true)", + zap.Int64("ver", latestSchemaVersion), + zap.Duration("take time", time.Since(timeStart)), + zap.String("job", job.String())) + return nil +} + +// waitSchemaSynced handles the following situation: +// If the job enters a new state, and the worker crashs when it's in the process of waiting for 2 * lease time, +// Then the worker restarts quickly, we may run the job immediately again, +// but in this case we don't wait enough 2 * lease time to let other servers update the schema. +// So here we get the latest schema version to make sure all servers' schema version update to the latest schema version +// in a cluster, or to wait for 2 * lease time. +func waitSchemaSynced(ctx context.Context, d *ddlCtx, job *model.Job, waitTime time.Duration) error { + if !job.IsRunning() && !job.IsRollingback() && !job.IsDone() && !job.IsRollbackDone() { + return nil + } + + ver, _ := d.store.CurrentVersion(kv.GlobalTxnScope) + snapshot := d.store.GetSnapshot(ver) + m := meta.NewSnapshotMeta(snapshot) + latestSchemaVersion, err := m.GetSchemaVersionWithNonEmptyDiff() + if err != nil { + logutil.DDLLogger().Warn("get global version failed", zap.Int64("jobID", job.ID), zap.Error(err)) + return err + } + + failpoint.Inject("checkDownBeforeUpdateGlobalVersion", func(val failpoint.Value) { + if val.(bool) { + if mockDDLErrOnce > 0 && mockDDLErrOnce != latestSchemaVersion { + panic("check down before update global version failed") + } + mockDDLErrOnce = -1 + } + }) + + return waitSchemaChanged(ctx, d, waitTime, latestSchemaVersion, job) +} diff --git a/pkg/ddl/syncer/syncer.go b/pkg/ddl/syncer/syncer.go index 639df4a8b1a7c..37eab36c36d1a 100644 --- a/pkg/ddl/syncer/syncer.go +++ b/pkg/ddl/syncer/syncer.go @@ -378,7 +378,137 @@ func (s *schemaVersionSyncer) OwnerCheckAllVersions(ctx context.Context, jobID i } } +<<<<<<< HEAD func isUpdatedLatestVersion(key, val string, latestVer int64, notMatchVerCnt, intervalCnt int, isUpdated bool) bool { +======= +// SyncJobSchemaVerLoop implements SchemaSyncer.SyncJobSchemaVerLoop interface. 
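The syncer code that follows watches per-job schema version keys in etcd. For reference, a hedged sketch of the key/value layout it decodes; the concrete jobNodeVerPrefix value is not shown in this diff, so the prefix below is only a placeholder:

// Sketch only: keys have the form <jobNodeVerPrefix><jobID>/<tidbID>, and on PUT
// the value is the schema version that TiDB node has applied for that job.
prefix := "/tidb/ddl/all_schema_by_job_versions/" // placeholder prefix value
kv := &mvccpb.KeyValue{
	Key:   []byte(prefix + "123/tidb-node-1"), // <jobID>/<tidbID>
	Value: []byte("57"),                       // schema version applied by that node
}
jobID, tidbID, ver, ok := decodeJobVersionEvent(kv, mvccpb.PUT, prefix)
// jobID == 123, tidbID == "tidb-node-1", ver == 57, ok == true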
+func (s *schemaVersionSyncer) SyncJobSchemaVerLoop(ctx context.Context) { + for { + s.syncJobSchemaVer(ctx) + logutil.DDLLogger().Info("schema version sync loop interrupted, retrying...") + select { + case <-ctx.Done(): + return + case <-time.After(time.Second): + } + } +} + +func (s *schemaVersionSyncer) syncJobSchemaVer(ctx context.Context) { + resp, err := s.etcdCli.Get(ctx, s.jobNodeVerPrefix, clientv3.WithPrefix()) + if err != nil { + logutil.DDLLogger().Info("get all job versions failed", zap.Error(err)) + return + } + s.mu.Lock() + for jobID, item := range s.jobNodeVersions { + item.clearData() + // we might miss some DELETE events during retry, some items might be emptyAndNotUsed, remove them. + if item.emptyAndNotUsed() { + delete(s.jobNodeVersions, jobID) + } + } + s.mu.Unlock() + for _, oneKV := range resp.Kvs { + s.handleJobSchemaVerKV(oneKV, mvccpb.PUT) + } + + startRev := resp.Header.Revision + 1 + watchCtx, watchCtxCancel := context.WithCancel(ctx) + defer watchCtxCancel() + watchCtx = clientv3.WithRequireLeader(watchCtx) + watchCh := s.etcdCli.Watch(watchCtx, s.jobNodeVerPrefix, clientv3.WithPrefix(), clientv3.WithRev(startRev)) + for { + var ( + wresp clientv3.WatchResponse + ok bool + ) + select { + case <-watchCtx.Done(): + return + case wresp, ok = <-watchCh: + if !ok { + // ctx must be cancelled, else we should have received a response + // with err and caught by below err check. + return + } + } + failpoint.Inject("mockCompaction", func() { + wresp.CompactRevision = 123 + }) + if err := wresp.Err(); err != nil { + logutil.DDLLogger().Warn("watch job version failed", zap.Error(err)) + return + } + for _, ev := range wresp.Events { + s.handleJobSchemaVerKV(ev.Kv, ev.Type) + } + } +} + +func (s *schemaVersionSyncer) handleJobSchemaVerKV(kv *mvccpb.KeyValue, tp mvccpb.Event_EventType) { + jobID, tidbID, schemaVer, valid := decodeJobVersionEvent(kv, tp, s.jobNodeVerPrefix) + if !valid { + logutil.DDLLogger().Error("invalid job version kv", zap.Stringer("kv", kv), zap.Stringer("type", tp)) + return + } + if tp == mvccpb.PUT { + s.mu.Lock() + item, exists := s.jobNodeVersions[jobID] + if !exists { + item = newNodeVersions(1, nil) + s.jobNodeVersions[jobID] = item + } + s.mu.Unlock() + item.add(tidbID, schemaVer) + } else { // DELETE + s.mu.Lock() + if item, exists := s.jobNodeVersions[jobID]; exists { + item.del(tidbID) + if item.len() == 0 { + delete(s.jobNodeVersions, jobID) + } + } + s.mu.Unlock() + } +} + +func (s *schemaVersionSyncer) jobSchemaVerMatchOrSet(jobID int64, matchFn func(map[string]int64) bool) *nodeVersions { + s.mu.Lock() + defer s.mu.Unlock() + + item, exists := s.jobNodeVersions[jobID] + if exists { + item.matchOrSet(matchFn) + } else { + item = newNodeVersions(1, matchFn) + s.jobNodeVersions[jobID] = item + } + return item +} + +func decodeJobVersionEvent(kv *mvccpb.KeyValue, tp mvccpb.Event_EventType, prefix string) (jobID int64, tidbID string, schemaVer int64, valid bool) { + left := strings.TrimPrefix(string(kv.Key), prefix) + parts := strings.Split(left, "/") + if len(parts) != 2 { + return 0, "", 0, false + } + jobID, err := strconv.ParseInt(parts[0], 10, 64) + if err != nil { + return 0, "", 0, false + } + // there is no Value in DELETE event, so we need to check it. 
+ if tp == mvccpb.PUT { + schemaVer, err = strconv.ParseInt(string(kv.Value), 10, 64) + if err != nil { + return 0, "", 0, false + } + } + return jobID, parts[1], schemaVer, true +} + +func isUpdatedLatestVersion(key, val string, latestVer int64, notMatchVerCnt, intervalCnt int, nodeAlive bool) bool { +>>>>>>> 04c66ee9508 (ddl: decouple job scheduler from 'ddl' and make it run/exit as owner changes (#53548)) ver, err := strconv.Atoi(val) if err != nil { logutil.DDLLogger().Info("syncer check all versions, convert value to int failed, continue checking.", diff --git a/pkg/disttask/framework/storage/table_test.go b/pkg/disttask/framework/storage/table_test.go index db86bbf1ee828..6a634b66947c2 100644 --- a/pkg/disttask/framework/storage/table_test.go +++ b/pkg/disttask/framework/storage/table_test.go @@ -782,10 +782,6 @@ func TestSubtaskHistoryTable(t *testing.T) { const ( taskID = 1 taskID2 = 2 - subTask1 = 1 - subTask2 = 2 - subTask3 = 3 - subTask4 = 4 // taskID2 tidb1 = "tidb1" tidb2 = "tidb2" tidb3 = "tidb3" @@ -793,11 +789,11 @@ func TestSubtaskHistoryTable(t *testing.T) { finishedMeta = "finished" ) - testutil.CreateSubTask(t, sm, taskID, proto.StepInit, tidb1, []byte(meta), proto.TaskTypeExample, 11) + subTask1 := testutil.CreateSubTask(t, sm, taskID, proto.StepInit, tidb1, []byte(meta), proto.TaskTypeExample, 11) require.NoError(t, sm.FinishSubtask(ctx, tidb1, subTask1, []byte(finishedMeta))) - testutil.CreateSubTask(t, sm, taskID, proto.StepInit, tidb2, []byte(meta), proto.TaskTypeExample, 11) + subTask2 := testutil.CreateSubTask(t, sm, taskID, proto.StepInit, tidb2, []byte(meta), proto.TaskTypeExample, 11) require.NoError(t, sm.UpdateSubtaskStateAndError(ctx, tidb2, subTask2, proto.SubtaskStateCanceled, nil)) - testutil.CreateSubTask(t, sm, taskID, proto.StepInit, tidb3, []byte(meta), proto.TaskTypeExample, 11) + subTask3 := testutil.CreateSubTask(t, sm, taskID, proto.StepInit, tidb3, []byte(meta), proto.TaskTypeExample, 11) require.NoError(t, sm.UpdateSubtaskStateAndError(ctx, tidb3, subTask3, proto.SubtaskStateFailed, nil)) subTasks, err := testutil.GetSubtasksByTaskID(ctx, sm, taskID) @@ -827,7 +823,7 @@ func TestSubtaskHistoryTable(t *testing.T) { testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/storage/subtaskHistoryKeepSeconds", "return(1)") time.Sleep(2 * time.Second) - testutil.CreateSubTask(t, sm, taskID2, proto.StepInit, tidb1, []byte(meta), proto.TaskTypeExample, 11) + subTask4 := testutil.CreateSubTask(t, sm, taskID2, proto.StepInit, tidb1, []byte(meta), proto.TaskTypeExample, 11) require.NoError(t, sm.UpdateSubtaskStateAndError(ctx, tidb1, subTask4, proto.SubtaskStateFailed, nil)) require.NoError(t, testutil.TransferSubTasks2History(ctx, sm, taskID2)) diff --git a/pkg/disttask/framework/storage/task_state_test.go b/pkg/disttask/framework/storage/task_state_test.go index 7c5505cb0f1a6..d35381d8641aa 100644 --- a/pkg/disttask/framework/storage/task_state_test.go +++ b/pkg/disttask/framework/storage/task_state_test.go @@ -66,13 +66,18 @@ func TestTaskState(t *testing.T) { // 4. 
Reverted task id, err = gm.CreateTask(ctx, "key4", "test", 4, "", []byte("test")) require.NoError(t, err) +<<<<<<< HEAD require.Equal(t, int64(4), id) task, err = gm.GetTaskByID(ctx, 4) +======= + // require.Equal(t, int64(4), id) TODO: unstable for infoschema v2 + task, err = gm.GetTaskByID(ctx, id) +>>>>>>> 04c66ee9508 (ddl: decouple job scheduler from 'ddl' and make it run/exit as owner changes (#53548)) require.NoError(t, err) checkTaskStateStep(t, task, proto.TaskStatePending, proto.StepInit) err = gm.RevertTask(ctx, task.ID, proto.TaskStatePending, nil) require.NoError(t, err) - task, err = gm.GetTaskByID(ctx, 4) + task, err = gm.GetTaskByID(ctx, id) require.NoError(t, err) checkTaskStateStep(t, task, proto.TaskStateReverting, proto.StepInit) diff --git a/pkg/disttask/framework/testutil/context.go b/pkg/disttask/framework/testutil/context.go index a8ee04aee6147..d99645b94ffa9 100644 --- a/pkg/disttask/framework/testutil/context.go +++ b/pkg/disttask/framework/testutil/context.go @@ -390,6 +390,7 @@ type TestContext struct { CallTime int } +<<<<<<< HEAD // InitTestContext inits test context for disttask tests. func InitTestContext(t *testing.T, nodeNum int) (context.Context, *gomock.Controller, *TestContext, *testkit.DistExecutionContext) { ctrl := gomock.NewController(t) @@ -404,6 +405,8 @@ func InitTestContext(t *testing.T, nodeNum int) (context.Context, *gomock.Contro return ctx, ctrl, testCtx, executionContext } +======= +>>>>>>> 04c66ee9508 (ddl: decouple job scheduler from 'ddl' and make it run/exit as owner changes (#53548)) // CollectSubtask collects subtask info func (c *TestContext) CollectSubtask(subtask *proto.Subtask) { key := getTaskStepKey(subtask.TaskID, subtask.Step) diff --git a/pkg/disttask/framework/testutil/task_util.go b/pkg/disttask/framework/testutil/task_util.go index c80a6fc079b6b..e40786d37c07a 100644 --- a/pkg/disttask/framework/testutil/task_util.go +++ b/pkg/disttask/framework/testutil/task_util.go @@ -28,19 +28,29 @@ import ( // CreateSubTask adds a new task to subtask table. // used for testing. -func CreateSubTask(t *testing.T, gm *storage.TaskManager, taskID int64, step proto.Step, execID string, meta []byte, tp proto.TaskType, concurrency int) { - InsertSubtask(t, gm, taskID, step, execID, meta, proto.SubtaskStatePending, tp, concurrency) +func CreateSubTask(t *testing.T, gm *storage.TaskManager, taskID int64, step proto.Step, execID string, meta []byte, tp proto.TaskType, concurrency int) int64 { + return InsertSubtask(t, gm, taskID, step, execID, meta, proto.SubtaskStatePending, tp, concurrency) } // InsertSubtask adds a new subtask of any state to subtask table. 
-func InsertSubtask(t *testing.T, gm *storage.TaskManager, taskID int64, step proto.Step, execID string, meta []byte, state proto.SubtaskState, tp proto.TaskType, concurrency int) { +func InsertSubtask(t *testing.T, gm *storage.TaskManager, taskID int64, step proto.Step, execID string, meta []byte, state proto.SubtaskState, tp proto.TaskType, concurrency int) int64 { ctx := context.Background() ctx = util.WithInternalSourceType(ctx, "table_test") + var id int64 require.NoError(t, gm.WithNewSession(func(se sessionctx.Context) error { _, err := sqlexec.ExecSQL(ctx, se.GetSQLExecutor(), ` insert into mysql.tidb_background_subtask(`+storage.InsertSubtaskColumns+`) values`+ `(%?, %?, %?, %?, %?, %?, %?, NULL, CURRENT_TIMESTAMP(), '{}', '{}')`, step, taskID, execID, meta, state, proto.Type2Int(tp), concurrency) - return err + if err != nil { + return err + } + rs, err := sqlexec.ExecSQL(ctx, se.GetSQLExecutor(), "select @@last_insert_id") + if err != nil { + return err + } + id = rs[0].GetInt64(0) + return nil })) + return id } diff --git a/pkg/owner/manager.go b/pkg/owner/manager.go index 11d5636c12136..174fbc5245c05 100644 --- a/pkg/owner/manager.go +++ b/pkg/owner/manager.go @@ -23,7 +23,6 @@ import ( "sync" "sync/atomic" "time" - "unsafe" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -40,6 +39,12 @@ import ( "go.uber.org/zap" ) +// Listener is used to listen the ownerManager's owner state. +type Listener interface { + OnBecomeOwner() + OnRetireOwner() +} + // Manager is used to campaign the owner and manage the owner information. type Manager interface { // ID returns the ID of the manager. @@ -62,9 +67,8 @@ type Manager interface { RequireOwner(ctx context.Context) error // CampaignCancel cancels one etcd campaign CampaignCancel() - - // SetBeOwnerHook sets a hook. The hook is called before becoming an owner. - SetBeOwnerHook(hook func()) + // SetListener sets the listener, set before CampaignOwner. + SetListener(listener Listener) } const ( @@ -111,12 +115,12 @@ type ownerManager struct { logCtx context.Context etcdCli *clientv3.Client cancel context.CancelFunc - elec unsafe.Pointer + elec atomic.Pointer[concurrency.Election] sessionLease *atomicutil.Int64 wg sync.WaitGroup campaignCancel context.CancelFunc - beOwnerHook func() + listener Listener } // NewOwnerManager creates a new Manager. @@ -143,7 +147,7 @@ func (m *ownerManager) ID() string { // IsOwner implements Manager.IsOwner interface. func (m *ownerManager) IsOwner() bool { - return atomic.LoadPointer(&m.elec) != unsafe.Pointer(nil) + return m.elec.Load() != nil } // Cancel implements Manager.Cancel interface. @@ -157,8 +161,8 @@ func (*ownerManager) RequireOwner(_ context.Context) error { return nil } -func (m *ownerManager) SetBeOwnerHook(hook func()) { - m.beOwnerHook = hook +func (m *ownerManager) SetListener(listener Listener) { + m.listener = listener } // ManagerSessionTTL is the etcd session's TTL in seconds. It's exported for testing. @@ -198,7 +202,7 @@ func (m *ownerManager) CampaignOwner(withTTL ...int) error { // ResignOwner lets the owner start a new election. 
func (m *ownerManager) ResignOwner(ctx context.Context) error { - elec := (*concurrency.Election)(atomic.LoadPointer(&m.elec)) + elec := m.elec.Load() if elec == nil { return errors.Errorf("This node is not a ddl owner, can't be resigned") } @@ -215,15 +219,20 @@ func (m *ownerManager) ResignOwner(ctx context.Context) error { } func (m *ownerManager) toBeOwner(elec *concurrency.Election) { - if m.beOwnerHook != nil { - m.beOwnerHook() + m.elec.Store(elec) + logutil.Logger(m.logCtx).Info("become owner") + if m.listener != nil { + m.listener.OnBecomeOwner() } - atomic.StorePointer(&m.elec, unsafe.Pointer(elec)) } // RetireOwner make the manager to be a not owner. func (m *ownerManager) RetireOwner() { - atomic.StorePointer(&m.elec, nil) + m.elec.Store(nil) + logutil.Logger(m.logCtx).Info("retire owner") + if m.listener != nil { + m.listener.OnRetireOwner() + } } // CampaignCancel implements Manager.CampaignCancel interface. diff --git a/pkg/owner/manager_test.go b/pkg/owner/manager_test.go index 6c86467b31b96..052ff181ab59c 100644 --- a/pkg/owner/manager_test.go +++ b/pkg/owner/manager_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "runtime" + "sync/atomic" "testing" "time" @@ -78,6 +79,17 @@ func (ti *testInfo) Close(t *testing.T) { ti.cluster.Terminate(t) } +type listener struct { + val atomic.Bool +} + +func (l *listener) OnBecomeOwner() { + l.val.Store(true) +} +func (l *listener) OnRetireOwner() { + l.val.Store(false) +} + func TestSingle(t *testing.T) { if runtime.GOOS == "windows" { t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") @@ -87,9 +99,13 @@ func TestSingle(t *testing.T) { tInfo := newTestInfo(t) client, d := tInfo.client, tInfo.ddl defer tInfo.Close(t) - require.NoError(t, d.OwnerManager().CampaignOwner()) + ownerManager := d.OwnerManager() + lis := &listener{} + ownerManager.SetListener(lis) + require.NoError(t, ownerManager.CampaignOwner()) isOwner := checkOwner(d, true) require.True(t, isOwner) + require.True(t, lis.val.Load()) // test for newSession failed ctx := context.Background() @@ -105,9 +121,10 @@ func TestSingle(t *testing.T) { require.True(t, isOwner) // The test is used to exit campaign loop. - d.OwnerManager().Cancel() + ownerManager.Cancel() isOwner = checkOwner(d, false) require.False(t, isOwner) + require.False(t, lis.val.Load()) time.Sleep(200 * time.Millisecond) diff --git a/pkg/owner/mock.go b/pkg/owner/mock.go index babc054ed17b4..8d2900ccf1597 100644 --- a/pkg/owner/mock.go +++ b/pkg/owner/mock.go @@ -41,7 +41,7 @@ type mockManager struct { ctx context.Context wg sync.WaitGroup cancel context.CancelFunc - beOwnerHook func() + listener Listener retireHook func() campaignDone chan struct{} resignDone chan struct{} @@ -86,10 +86,10 @@ func (m *mockManager) IsOwner() bool { func (m *mockManager) toBeOwner() { ok := util.MockGlobalStateEntry.OwnerKey(m.storeID, m.key).SetOwner(m.id) if ok { - logutil.BgLogger().Debug("owner manager gets owner", zap.String("category", "ddl"), + logutil.BgLogger().Info("owner manager gets owner", zap.String("category", "ddl"), zap.String("ID", m.id), zap.String("ownerKey", m.key)) - if m.beOwnerHook != nil { - m.beOwnerHook() + if m.listener != nil { + m.listener.OnBecomeOwner() } } } @@ -97,6 +97,11 @@ func (m *mockManager) toBeOwner() { // RetireOwner implements Manager.RetireOwner interface. 
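The OnBecomeOwner/OnRetireOwner pair is the hook this PR uses to run owner-only work as leadership changes. A minimal usage sketch, relying only on the owner.Manager methods visible in this diff (SetListener, CampaignOwner); the type and function names are invented for illustration:

// Sketch only: wire a Listener before campaigning so owner-only loops
// start and stop with leadership.
type schedulerListener struct{}

var _ owner.Listener = (*schedulerListener)(nil)

func (*schedulerListener) OnBecomeOwner() { /* start owner-only loops, e.g. a job scheduler */ }
func (*schedulerListener) OnRetireOwner() { /* stop them when leadership is lost */ }

func campaignWithListener(m owner.Manager) error {
	m.SetListener(&schedulerListener{}) // must be set before CampaignOwner
	return m.CampaignOwner()
}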
func (m *mockManager) RetireOwner() { util.MockGlobalStateEntry.OwnerKey(m.storeID, m.key).UnsetOwner(m.id) + logutil.BgLogger().Info("owner manager retire owner", zap.String("category", "ddl"), + zap.String("ID", m.id), zap.String("ownerKey", m.key)) + if m.listener != nil { + m.listener.OnRetireOwner() + } } // Cancel implements Manager.Cancel interface. @@ -169,9 +174,9 @@ func (*mockManager) RequireOwner(context.Context) error { return nil } -// SetBeOwnerHook implements Manager.SetBeOwnerHook interface. -func (m *mockManager) SetBeOwnerHook(hook func()) { - m.beOwnerHook = hook +// SetListener implements Manager.SetListener interface. +func (m *mockManager) SetListener(listener Listener) { + m.listener = listener } // CampaignCancel implements Manager.CampaignCancel interface diff --git a/pkg/parser/model/ddl.go b/pkg/parser/model/ddl.go index cae4ed84d2d4d..27e082e882abd 100644 --- a/pkg/parser/model/ddl.go +++ b/pkg/parser/model/ddl.go @@ -535,7 +535,10 @@ type Job struct { // Priority is only used to set the operation priority of adding indices. Priority int `json:"priority"` - // SeqNum is the total order in all DDLs, it's used to identify the order of DDL. + // SeqNum is the total order in all DDLs, it's used to identify the order of + // moving the job into DDL history, not the order of the job execution. + // fast create table doesn't honor this field, there might duplicate seq_num in this case. + // TODO: deprecated it, as it forces 'moving jobs into DDL history' part to be serial. SeqNum uint64 `json:"seq_num"` // Charset is the charset when the DDL Job is created. diff --git a/pkg/testkit/BUILD.bazel b/pkg/testkit/BUILD.bazel index aa3dd26e1b3c4..4c95a1311dcc5 100644 --- a/pkg/testkit/BUILD.bazel +++ b/pkg/testkit/BUILD.bazel @@ -16,7 +16,6 @@ go_library( deps = [ "//pkg/ddl/schematracker", "//pkg/domain", - "//pkg/domain/infosync", "//pkg/expression", "//pkg/kv", "//pkg/parser/ast", diff --git a/pkg/testkit/mockstore.go b/pkg/testkit/mockstore.go index c8df6c6dad5e6..3db132a8d161d 100644 --- a/pkg/testkit/mockstore.go +++ b/pkg/testkit/mockstore.go @@ -24,10 +24,8 @@ import ( "testing" "time" - "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/ddl/schematracker" "github.com/pingcap/tidb/pkg/domain" - "github.com/pingcap/tidb/pkg/domain/infosync" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/resourcemanager" "github.com/pingcap/tidb/pkg/session" @@ -131,6 +129,7 @@ func tryMakeImageOnce() (retry bool, err error) { // DistExecutionContext is the context // that used in Distributed execution test for Dist task framework and DDL. +// TODO remove it after we can start multiple DDL job scheduler separately. type DistExecutionContext struct { Store kv.Storage domains []*domain.Domain @@ -139,30 +138,17 @@ type DistExecutionContext struct { mu sync.Mutex } -// InitOwner select the last domain as DDL owner. -func (d *DistExecutionContext) InitOwner() { +// TriggerOwnerChange set one mock domain to DDL Owner by idx. +func (d *DistExecutionContext) TriggerOwnerChange() { d.mu.Lock() defer d.mu.Unlock() for _, dom := range d.domains { - dom.DDL().OwnerManager().RetireOwner() - } - err := d.domains[len(d.domains)-1].DDL().OwnerManager().CampaignOwner() - require.NoError(d.t, err) -} - -// SetOwner set one mock domain to DDL Owner by idx. 
-func (d *DistExecutionContext) SetOwner(idx int) { - d.mu.Lock() - defer d.mu.Unlock() - if idx >= len(d.domains) || idx < 0 { - require.NoError(d.t, errors.New("server idx out of bound")) - return - } - for _, dom := range d.domains { - dom.DDL().OwnerManager().RetireOwner() + om := dom.DDL().OwnerManager() + if om.IsOwner() { + _ = om.ResignOwner(nil) + break + } } - err := d.domains[idx].DDL().OwnerManager().CampaignOwner() - require.NoError(d.t, err) } // AddDomain add 1 domain which is not ddl owner. @@ -175,31 +161,6 @@ func (d *DistExecutionContext) AddDomain() { d.domains = append(d.domains, dom) } -// DeleteDomain delete 1 domain by idx, set server0 as ddl owner if the deleted owner is ddl owner. -func (d *DistExecutionContext) DeleteDomain(idx int) { - d.mu.Lock() - defer d.mu.Unlock() - if idx >= len(d.domains) || idx < 0 { - require.NoError(d.t, errors.New("server idx out of bound")) - return - } - if len(d.domains) == 1 { - require.NoError(d.t, errors.New("can't delete server, since server num = 1")) - return - } - if d.domains[idx].DDL().OwnerManager().IsOwner() { - d.mu.Unlock() - d.SetOwner(0) - d.mu.Lock() - } - - d.deletedDomains = append(d.deletedDomains, d.domains[idx]) - d.domains = append(d.domains[:idx], d.domains[idx+1:]...) - - err := infosync.MockGlobalServerInfoManagerEntry.Delete(idx) - require.NoError(d.t, err) -} - // Close cleanup running goroutines, release resources used. func (d *DistExecutionContext) Close() { d.t.Cleanup(func() { @@ -254,7 +215,6 @@ func NewDistExecutionContext(t testing.TB, serverNum int) *DistExecutionContext res := DistExecutionContext{ schematracker.UnwrapStorage(store), domains, []*domain.Domain{}, t, sync.Mutex{}} - res.InitOwner() return &res } diff --git a/tests/realtikvtest/addindextest4/ingest_test.go b/tests/realtikvtest/addindextest4/ingest_test.go index 2e393ce4f8670..1cd192b84a485 100644 --- a/tests/realtikvtest/addindextest4/ingest_test.go +++ b/tests/realtikvtest/addindextest4/ingest_test.go @@ -546,6 +546,7 @@ func TestFirstLitSlowStart(t *testing.T) { tk.MustExec("create database addindexlit;") tk.MustExec("use addindexlit;") tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) + tk.MustExec(`set global tidb_enable_dist_task=off;`) tk.MustExec("create table t(a int, b int);") tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3);")
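To close, a hedged sketch of how a test might drive the TriggerOwnerChange helper introduced above, using only the DistExecutionContext methods visible in this diff; the test itself is invented for illustration and assumes the standard "testing" and "github.com/pingcap/tidb/pkg/testkit" imports:

// Sketch only: spin up several mock domains, then ask whichever domain
// currently owns DDL to resign so another domain's scheduler takes over.
func TestOwnerChangeSketch(t *testing.T) {
	ec := testkit.NewDistExecutionContext(t, 3)
	defer ec.Close()
	ec.TriggerOwnerChange()
}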