diff --git a/Makefile b/Makefile index 5d6c1c284b622..511c605092d43 100644 --- a/Makefile +++ b/Makefile @@ -496,6 +496,7 @@ gen_mock: mockgen tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/util/sqlexec RestrictedSQLExecutor > pkg/util/sqlexec/mock/restricted_sql_executor_mock.go tools/bin/mockgen -package mockstorage github.com/pingcap/tidb/br/pkg/storage ExternalStorage > br/pkg/mock/storage/storage.go tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/ddl SchemaLoader > pkg/ddl/mock/schema_loader_mock.go + tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/ddl/systable Manager > pkg/ddl/mock/systable_manager_mock.go # There is no FreeBSD environment for GitHub actions. So cross-compile on Linux # but that doesn't work with CGO_ENABLED=1, so disable cgo. The reason to have diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel index cfef206007e50..1ad515a5e332f 100644 --- a/pkg/ddl/BUILD.bazel +++ b/pkg/ddl/BUILD.bazel @@ -71,11 +71,11 @@ go_library( "//pkg/config", "//pkg/ddl/copr", "//pkg/ddl/ingest", - "//pkg/ddl/internal/session", "//pkg/ddl/label", "//pkg/ddl/logutil", "//pkg/ddl/placement", "//pkg/ddl/resourcegroup", + "//pkg/ddl/session", "//pkg/ddl/syncer", "//pkg/ddl/systable", "//pkg/ddl/util", @@ -269,11 +269,11 @@ go_test( "//pkg/config", "//pkg/ddl/copr", "//pkg/ddl/ingest", - "//pkg/ddl/internal/session", "//pkg/ddl/logutil", "//pkg/ddl/mock", "//pkg/ddl/placement", "//pkg/ddl/schematracker", + "//pkg/ddl/session", "//pkg/ddl/syncer", "//pkg/ddl/testutil", "//pkg/ddl/util", diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index ba1afe446ace6..5ed43cf78c15d 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -27,8 +27,8 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/ddl/ingest" - sess "github.com/pingcap/tidb/pkg/ddl/internal/session" "github.com/pingcap/tidb/pkg/ddl/logutil" + sess "github.com/pingcap/tidb/pkg/ddl/session" ddlutil "github.com/pingcap/tidb/pkg/ddl/util" "github.com/pingcap/tidb/pkg/expression" exprctx "github.com/pingcap/tidb/pkg/expression/context" diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index 33583abde493d..f4adee5f2f06e 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -30,7 +30,7 @@ import ( "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/pkg/ddl/copr" "github.com/pingcap/tidb/pkg/ddl/ingest" - "github.com/pingcap/tidb/pkg/ddl/internal/session" + "github.com/pingcap/tidb/pkg/ddl/session" "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/disttask/operator" "github.com/pingcap/tidb/pkg/kv" diff --git a/pkg/ddl/backfilling_scheduler.go b/pkg/ddl/backfilling_scheduler.go index 9000b2bbed30c..dc0201729787c 100644 --- a/pkg/ddl/backfilling_scheduler.go +++ b/pkg/ddl/backfilling_scheduler.go @@ -22,8 +22,8 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/ddl/copr" - sess "github.com/pingcap/tidb/pkg/ddl/internal/session" ddllogutil "github.com/pingcap/tidb/pkg/ddl/logutil" + sess "github.com/pingcap/tidb/pkg/ddl/session" distsqlctx "github.com/pingcap/tidb/pkg/distsql/context" "github.com/pingcap/tidb/pkg/errctx" "github.com/pingcap/tidb/pkg/kv" diff --git a/pkg/ddl/cluster.go b/pkg/ddl/cluster.go index d4cc2864f578d..e3b449317f7c7 100644 --- a/pkg/ddl/cluster.go +++ b/pkg/ddl/cluster.go @@ -28,8 +28,8 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" - sess "github.com/pingcap/tidb/pkg/ddl/internal/session" "github.com/pingcap/tidb/pkg/ddl/logutil" + sess "github.com/pingcap/tidb/pkg/ddl/session" "github.com/pingcap/tidb/pkg/domain/infosync" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/kv" diff --git a/pkg/ddl/column.go b/pkg/ddl/column.go index 657b0520f2d85..229e35f7321be 100644 --- a/pkg/ddl/column.go +++ b/pkg/ddl/column.go @@ -27,8 +27,8 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/config" - sess "github.com/pingcap/tidb/pkg/ddl/internal/session" "github.com/pingcap/tidb/pkg/ddl/logutil" + sess "github.com/pingcap/tidb/pkg/ddl/session" "github.com/pingcap/tidb/pkg/expression" exprctx "github.com/pingcap/tidb/pkg/expression/context" "github.com/pingcap/tidb/pkg/infoschema" diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index e4e6061d1df1c..94256cdf0074d 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -35,9 +35,10 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/ddl/ingest" - sess "github.com/pingcap/tidb/pkg/ddl/internal/session" "github.com/pingcap/tidb/pkg/ddl/logutil" + sess "github.com/pingcap/tidb/pkg/ddl/session" "github.com/pingcap/tidb/pkg/ddl/syncer" + "github.com/pingcap/tidb/pkg/ddl/systable" "github.com/pingcap/tidb/pkg/ddl/util" "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/disttask/framework/scheduler" @@ -262,6 +263,8 @@ type DDL interface { GetInfoSchemaWithInterceptor(ctx sessionctx.Context) infoschema.InfoSchema // DoDDLJob does the DDL job, it's exported for test. DoDDLJob(ctx sessionctx.Context, job *model.Job) error + // GetMinJobIDRefresher gets the MinJobIDRefresher, this api only works after Start. + GetMinJobIDRefresher() *systable.MinJobIDRefresher // DoDDLJobWrapper similar to DoDDLJob, but with JobWrapper as input. // exported for testing. // TODO remove it after decouple components of DDL. @@ -314,7 +317,9 @@ type ddl struct { // used in the concurrency ddl. localWorkerPool *workerPool // get notification if any DDL job submitted or finished. - ddlJobNotifyCh chan struct{} + ddlJobNotifyCh chan struct{} + sysTblMgr systable.Manager + minJobIDRefresher *systable.MinJobIDRefresher // localJobCh is used to delivery job in local TiDB nodes. localJobCh chan *JobWrapper @@ -855,13 +860,18 @@ func (d *ddl) prepareLocalModeWorkers() { func (d *ddl) Start(ctxPool *pools.ResourcePool) error { logutil.DDLLogger().Info("start DDL", zap.String("ID", d.uuid), zap.Bool("runWorker", config.GetGlobalConfig().Instance.TiDBEnableDDL.Load())) + d.sessPool = sess.NewSessionPool(ctxPool) + d.sysTblMgr = systable.NewManager(d.sessPool) + d.minJobIDRefresher = systable.NewMinJobIDRefresher(d.sysTblMgr) d.wg.Run(func() { d.limitDDLJobs(d.limitJobCh, d.addBatchDDLJobsV1) }) d.wg.Run(func() { d.limitDDLJobs(d.limitJobChV2, d.addBatchLocalDDLJobs) }) - d.sessPool = sess.NewSessionPool(ctxPool) + d.wg.Run(func() { + d.minJobIDRefresher.Start(d.ctx) + }) d.delRangeMgr = d.newDeleteRangeManager(ctxPool == nil) @@ -1380,6 +1390,10 @@ func (d *ddl) SetHook(h Callback) { d.mu.hook = h } +func (d *ddl) GetMinJobIDRefresher() *systable.MinJobIDRefresher { + return d.minJobIDRefresher +} + func (d *ddl) startCleanDeadTableLock() { defer func() { d.wg.Done() diff --git a/pkg/ddl/ddl_history.go b/pkg/ddl/ddl_history.go index c983d3d79b554..ed3f494bc2270 100644 --- a/pkg/ddl/ddl_history.go +++ b/pkg/ddl/ddl_history.go @@ -22,8 +22,8 @@ import ( "strconv" "github.com/pingcap/errors" - sess "github.com/pingcap/tidb/pkg/ddl/internal/session" "github.com/pingcap/tidb/pkg/ddl/logutil" + sess "github.com/pingcap/tidb/pkg/ddl/session" "github.com/pingcap/tidb/pkg/ddl/util" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta" diff --git a/pkg/ddl/ddl_history_test.go b/pkg/ddl/ddl_history_test.go index 6014ccc374e74..8455960e841f6 100644 --- a/pkg/ddl/ddl_history_test.go +++ b/pkg/ddl/ddl_history_test.go @@ -24,7 +24,7 @@ import ( "github.com/ngaut/pools" "github.com/pingcap/tidb/pkg/ddl" - "github.com/pingcap/tidb/pkg/ddl/internal/session" + "github.com/pingcap/tidb/pkg/ddl/session" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta" "github.com/pingcap/tidb/pkg/parser/model" diff --git a/pkg/ddl/ddl_worker.go b/pkg/ddl/ddl_worker.go index 111aeca1946d3..3c9aea4265930 100644 --- a/pkg/ddl/ddl_worker.go +++ b/pkg/ddl/ddl_worker.go @@ -29,8 +29,8 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" - sess "github.com/pingcap/tidb/pkg/ddl/internal/session" "github.com/pingcap/tidb/pkg/ddl/logutil" + sess "github.com/pingcap/tidb/pkg/ddl/session" "github.com/pingcap/tidb/pkg/ddl/util" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta" @@ -411,16 +411,17 @@ func (d *ddl) addBatchDDLJobs(jobWs []*JobWrapper) error { return nil } + ctx := kv.WithInternalSourceType(d.ctx, kv.InternalTxnDDL) se, err := d.sessPool.Get() if err != nil { return errors.Trace(err) } defer d.sessPool.Put(se) - flashClusterJobs, err := getJobsBySQL(sess.NewSession(se), JobTable, fmt.Sprintf("type = %d", model.ActionFlashbackCluster)) + found, err := d.sysTblMgr.HasFlashbackClusterJob(ctx, d.minJobIDRefresher.GetCurrMinJobID()) if err != nil { return errors.Trace(err) } - if len(flashClusterJobs) != 0 { + if found { return errors.Errorf("Can't add ddl job, have flashback cluster job") } @@ -429,7 +430,6 @@ func (d *ddl) addBatchDDLJobs(jobWs []*JobWrapper) error { bdrRole = string(ast.BDRRoleNone) ) - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL) err = kv.RunInNewTxn(ctx, d.store, true, func(_ context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) diff --git a/pkg/ddl/delete_range.go b/pkg/ddl/delete_range.go index d34f4dfb06622..68ddfe092982f 100644 --- a/pkg/ddl/delete_range.go +++ b/pkg/ddl/delete_range.go @@ -24,8 +24,8 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" - sess "github.com/pingcap/tidb/pkg/ddl/internal/session" "github.com/pingcap/tidb/pkg/ddl/logutil" + sess "github.com/pingcap/tidb/pkg/ddl/session" "github.com/pingcap/tidb/pkg/ddl/util" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/model" diff --git a/pkg/ddl/executor_test.go b/pkg/ddl/executor_test.go index 1b862339b2eaf..be1037adaf8a2 100644 --- a/pkg/ddl/executor_test.go +++ b/pkg/ddl/executor_test.go @@ -24,7 +24,7 @@ import ( "time" "github.com/pingcap/tidb/pkg/ddl" - sess "github.com/pingcap/tidb/pkg/ddl/internal/session" + sess "github.com/pingcap/tidb/pkg/ddl/session" "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta" diff --git a/pkg/ddl/export_test.go b/pkg/ddl/export_test.go index 4d1655e453e6c..d814000eff867 100644 --- a/pkg/ddl/export_test.go +++ b/pkg/ddl/export_test.go @@ -20,7 +20,7 @@ import ( "github.com/ngaut/pools" "github.com/pingcap/tidb/pkg/ddl/copr" - "github.com/pingcap/tidb/pkg/ddl/internal/session" + "github.com/pingcap/tidb/pkg/ddl/session" "github.com/pingcap/tidb/pkg/errctx" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/kv" diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index 2d608fbc57f85..62e60436c348c 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -33,8 +33,8 @@ import ( "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/ddl/copr" "github.com/pingcap/tidb/pkg/ddl/ingest" - sess "github.com/pingcap/tidb/pkg/ddl/internal/session" "github.com/pingcap/tidb/pkg/ddl/logutil" + sess "github.com/pingcap/tidb/pkg/ddl/session" ddlutil "github.com/pingcap/tidb/pkg/ddl/util" "github.com/pingcap/tidb/pkg/disttask/framework/handle" "github.com/pingcap/tidb/pkg/disttask/framework/proto" diff --git a/pkg/ddl/index_cop.go b/pkg/ddl/index_cop.go index 71811a11b01cb..30d6f70c8b9e0 100644 --- a/pkg/ddl/index_cop.go +++ b/pkg/ddl/index_cop.go @@ -24,7 +24,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/ddl/copr" "github.com/pingcap/tidb/pkg/ddl/ingest" - sess "github.com/pingcap/tidb/pkg/ddl/internal/session" + sess "github.com/pingcap/tidb/pkg/ddl/session" "github.com/pingcap/tidb/pkg/distsql" distsqlctx "github.com/pingcap/tidb/pkg/distsql/context" "github.com/pingcap/tidb/pkg/errctx" diff --git a/pkg/ddl/ingest/BUILD.bazel b/pkg/ddl/ingest/BUILD.bazel index 6271084c36047..7c3f2768049d5 100644 --- a/pkg/ddl/ingest/BUILD.bazel +++ b/pkg/ddl/ingest/BUILD.bazel @@ -20,8 +20,8 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/config", - "//pkg/ddl/internal/session", "//pkg/ddl/logutil", + "//pkg/ddl/session", "//pkg/ddl/util", "//pkg/kv", "//pkg/lightning/backend", @@ -78,7 +78,7 @@ go_test( "//pkg/config", "//pkg/ddl", "//pkg/ddl/ingest/testutil", - "//pkg/ddl/internal/session", + "//pkg/ddl/session", "//pkg/ddl/testutil", "//pkg/ddl/util/callback", "//pkg/domain", diff --git a/pkg/ddl/ingest/checkpoint.go b/pkg/ddl/ingest/checkpoint.go index 8aeaad7247db4..35f2462254afd 100644 --- a/pkg/ddl/ingest/checkpoint.go +++ b/pkg/ddl/ingest/checkpoint.go @@ -27,8 +27,8 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/config" - sess "github.com/pingcap/tidb/pkg/ddl/internal/session" "github.com/pingcap/tidb/pkg/ddl/logutil" + sess "github.com/pingcap/tidb/pkg/ddl/session" "github.com/pingcap/tidb/pkg/ddl/util" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta" diff --git a/pkg/ddl/ingest/checkpoint_test.go b/pkg/ddl/ingest/checkpoint_test.go index d3a897ebb05b4..530d30d3a5f09 100644 --- a/pkg/ddl/ingest/checkpoint_test.go +++ b/pkg/ddl/ingest/checkpoint_test.go @@ -23,7 +23,7 @@ import ( "github.com/ngaut/pools" "github.com/pingcap/tidb/pkg/ddl/ingest" - "github.com/pingcap/tidb/pkg/ddl/internal/session" + "github.com/pingcap/tidb/pkg/ddl/session" "github.com/pingcap/tidb/pkg/testkit" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" diff --git a/pkg/ddl/ingest/env.go b/pkg/ddl/ingest/env.go index 6fedd184874ec..fda7e76720319 100644 --- a/pkg/ddl/ingest/env.go +++ b/pkg/ddl/ingest/env.go @@ -26,8 +26,8 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/config" - sess "github.com/pingcap/tidb/pkg/ddl/internal/session" "github.com/pingcap/tidb/pkg/ddl/logutil" + sess "github.com/pingcap/tidb/pkg/ddl/session" "github.com/pingcap/tidb/pkg/lightning/log" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/util" diff --git a/pkg/ddl/job_table.go b/pkg/ddl/job_table.go index 5276657a25704..e218161c78015 100644 --- a/pkg/ddl/job_table.go +++ b/pkg/ddl/job_table.go @@ -31,8 +31,8 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/pkg/ddl/ingest" - sess "github.com/pingcap/tidb/pkg/ddl/internal/session" "github.com/pingcap/tidb/pkg/ddl/logutil" + sess "github.com/pingcap/tidb/pkg/ddl/session" "github.com/pingcap/tidb/pkg/ddl/syncer" "github.com/pingcap/tidb/pkg/ddl/systable" "github.com/pingcap/tidb/pkg/ddl/util" @@ -97,12 +97,14 @@ var _ owner.Listener = (*ownerListener)(nil) func (l *ownerListener) OnBecomeOwner() { ctx, cancelFunc := context.WithCancel(l.ddl.ddlCtx.ctx) + sysTblMgr := systable.NewManager(l.ddl.sessPool) l.scheduler = &jobScheduler{ - schCtx: ctx, - cancel: cancelFunc, - runningJobs: newRunningJobs(), - sysTblMgr: systable.NewManager(l.ddl.sessPool), - schemaLoader: l.ddl.schemaLoader, + schCtx: ctx, + cancel: cancelFunc, + runningJobs: newRunningJobs(), + sysTblMgr: sysTblMgr, + schemaLoader: l.ddl.schemaLoader, + minJobIDRefresher: l.ddl.minJobIDRefresher, ddlCtx: l.ddl.ddlCtx, ddlJobNotifyCh: l.ddl.ddlJobNotifyCh, @@ -122,12 +124,13 @@ func (l *ownerListener) OnRetireOwner() { // jobScheduler is used to schedule the DDL jobs, it's only run on the DDL owner. type jobScheduler struct { // *ddlCtx already have context named as "ctx", so we use "schCtx" here to avoid confusion. - schCtx context.Context - cancel context.CancelFunc - wg tidbutil.WaitGroupWrapper - runningJobs *runningJobs - sysTblMgr systable.Manager - schemaLoader SchemaLoader + schCtx context.Context + cancel context.CancelFunc + wg tidbutil.WaitGroupWrapper + runningJobs *runningJobs + sysTblMgr systable.Manager + schemaLoader SchemaLoader + minJobIDRefresher *systable.MinJobIDRefresher // those fields are created on start reorgWorkerPool *workerPool @@ -381,12 +384,12 @@ func (s *jobScheduler) loadAndDeliverJobs(se *sess.Session) error { defer s.runningJobs.resetAllPending() - const getJobSQL = `select reorg, job_meta from mysql.tidb_ddl_job %s order by job_id` + const getJobSQL = `select reorg, job_meta from mysql.tidb_ddl_job where job_id >= %d %s order by job_id` var whereClause string if ids := s.runningJobs.allIDs(); len(ids) > 0 { - whereClause = fmt.Sprintf("where job_id not in (%s)", ids) + whereClause = fmt.Sprintf("and job_id not in (%s)", ids) } - sql := fmt.Sprintf(getJobSQL, whereClause) + sql := fmt.Sprintf(getJobSQL, s.minJobIDRefresher.GetCurrMinJobID(), whereClause) rows, err := se.Execute(context.Background(), sql, "load_ddl_jobs") if err != nil { return errors.Trace(err) diff --git a/pkg/ddl/mock/BUILD.bazel b/pkg/ddl/mock/BUILD.bazel index e8f97e89691e2..a58b2710ee4e7 100644 --- a/pkg/ddl/mock/BUILD.bazel +++ b/pkg/ddl/mock/BUILD.bazel @@ -2,8 +2,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "mock", - srcs = ["schema_loader_mock.go"], + srcs = [ + "schema_loader_mock.go", + "systable_manager_mock.go", + ], importpath = "github.com/pingcap/tidb/pkg/ddl/mock", visibility = ["//visibility:public"], - deps = ["@org_uber_go_mock//gomock"], + deps = [ + "//pkg/parser/model", + "@org_uber_go_mock//gomock", + ], ) diff --git a/pkg/ddl/mock/systable_manager_mock.go b/pkg/ddl/mock/systable_manager_mock.go new file mode 100644 index 0000000000000..2b959163b0016 --- /dev/null +++ b/pkg/ddl/mock/systable_manager_mock.go @@ -0,0 +1,106 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/pingcap/tidb/pkg/ddl/systable (interfaces: Manager) +// +// Generated by this command: +// +// mockgen -package mock github.com/pingcap/tidb/pkg/ddl/systable Manager +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + model "github.com/pingcap/tidb/pkg/parser/model" + gomock "go.uber.org/mock/gomock" +) + +// MockManager is a mock of Manager interface. +type MockManager struct { + ctrl *gomock.Controller + recorder *MockManagerMockRecorder +} + +// MockManagerMockRecorder is the mock recorder for MockManager. +type MockManagerMockRecorder struct { + mock *MockManager +} + +// NewMockManager creates a new mock instance. +func NewMockManager(ctrl *gomock.Controller) *MockManager { + mock := &MockManager{ctrl: ctrl} + mock.recorder = &MockManagerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockManager) EXPECT() *MockManagerMockRecorder { + return m.recorder +} + +// ISGOMOCK indicates that this struct is a gomock mock. +func (m *MockManager) ISGOMOCK() struct{} { + return struct{}{} +} + +// GetJobByID mocks base method. +func (m *MockManager) GetJobByID(arg0 context.Context, arg1 int64) (*model.Job, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetJobByID", arg0, arg1) + ret0, _ := ret[0].(*model.Job) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetJobByID indicates an expected call of GetJobByID. +func (mr *MockManagerMockRecorder) GetJobByID(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetJobByID", reflect.TypeOf((*MockManager)(nil).GetJobByID), arg0, arg1) +} + +// GetMDLVer mocks base method. +func (m *MockManager) GetMDLVer(arg0 context.Context, arg1 int64) (int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMDLVer", arg0, arg1) + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetMDLVer indicates an expected call of GetMDLVer. +func (mr *MockManagerMockRecorder) GetMDLVer(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMDLVer", reflect.TypeOf((*MockManager)(nil).GetMDLVer), arg0, arg1) +} + +// GetMinJobID mocks base method. +func (m *MockManager) GetMinJobID(arg0 context.Context, arg1 int64) (int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMinJobID", arg0, arg1) + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetMinJobID indicates an expected call of GetMinJobID. +func (mr *MockManagerMockRecorder) GetMinJobID(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMinJobID", reflect.TypeOf((*MockManager)(nil).GetMinJobID), arg0, arg1) +} + +// HasFlashbackClusterJob mocks base method. +func (m *MockManager) HasFlashbackClusterJob(arg0 context.Context, arg1 int64) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "HasFlashbackClusterJob", arg0, arg1) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// HasFlashbackClusterJob indicates an expected call of HasFlashbackClusterJob. +func (mr *MockManagerMockRecorder) HasFlashbackClusterJob(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasFlashbackClusterJob", reflect.TypeOf((*MockManager)(nil).HasFlashbackClusterJob), arg0, arg1) +} diff --git a/pkg/ddl/partition.go b/pkg/ddl/partition.go index 55ef0d59b624c..783d7b8561145 100644 --- a/pkg/ddl/partition.go +++ b/pkg/ddl/partition.go @@ -27,10 +27,10 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" - sess "github.com/pingcap/tidb/pkg/ddl/internal/session" "github.com/pingcap/tidb/pkg/ddl/label" "github.com/pingcap/tidb/pkg/ddl/logutil" "github.com/pingcap/tidb/pkg/ddl/placement" + sess "github.com/pingcap/tidb/pkg/ddl/session" "github.com/pingcap/tidb/pkg/domain/infosync" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/infoschema" diff --git a/pkg/ddl/reorg.go b/pkg/ddl/reorg.go index d3ce9c17c97c7..65c1bebc60f4f 100644 --- a/pkg/ddl/reorg.go +++ b/pkg/ddl/reorg.go @@ -27,8 +27,8 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/ddl/ingest" - sess "github.com/pingcap/tidb/pkg/ddl/internal/session" "github.com/pingcap/tidb/pkg/ddl/logutil" + sess "github.com/pingcap/tidb/pkg/ddl/session" "github.com/pingcap/tidb/pkg/distsql" distsqlctx "github.com/pingcap/tidb/pkg/distsql/context" "github.com/pingcap/tidb/pkg/errctx" diff --git a/pkg/ddl/sanity_check.go b/pkg/ddl/sanity_check.go index 8dec33dd42f8f..5fa8025a74eca 100644 --- a/pkg/ddl/sanity_check.go +++ b/pkg/ddl/sanity_check.go @@ -20,8 +20,8 @@ import ( "strings" "github.com/pingcap/errors" - sess "github.com/pingcap/tidb/pkg/ddl/internal/session" "github.com/pingcap/tidb/pkg/ddl/logutil" + sess "github.com/pingcap/tidb/pkg/ddl/session" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/ast" diff --git a/pkg/ddl/schematracker/BUILD.bazel b/pkg/ddl/schematracker/BUILD.bazel index 18bd4de695363..8a721b4a0c524 100644 --- a/pkg/ddl/schematracker/BUILD.bazel +++ b/pkg/ddl/schematracker/BUILD.bazel @@ -12,6 +12,7 @@ go_library( deps = [ "//pkg/ddl", "//pkg/ddl/syncer", + "//pkg/ddl/systable", "//pkg/infoschema", "//pkg/kv", "//pkg/meta/autoid", diff --git a/pkg/ddl/schematracker/checker.go b/pkg/ddl/schematracker/checker.go index 04135e4a191bc..d82a0591eeb26 100644 --- a/pkg/ddl/schematracker/checker.go +++ b/pkg/ddl/schematracker/checker.go @@ -25,6 +25,7 @@ import ( "github.com/ngaut/pools" "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/ddl/syncer" + "github.com/pingcap/tidb/pkg/ddl/systable" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta/autoid" @@ -560,6 +561,11 @@ func (d *Checker) DoDDLJob(ctx sessionctx.Context, job *model.Job) error { return d.realDDL.DoDDLJob(ctx, job) } +// GetMinJobIDRefresher implements the DDL interface. +func (d *Checker) GetMinJobIDRefresher() *systable.MinJobIDRefresher { + return d.realDDL.GetMinJobIDRefresher() +} + // DoDDLJobWrapper implements the DDL interface. func (d *Checker) DoDDLJobWrapper(ctx sessionctx.Context, jobW *ddl.JobWrapper) error { return d.realDDL.DoDDLJobWrapper(ctx, jobW) diff --git a/pkg/ddl/schematracker/dm_tracker.go b/pkg/ddl/schematracker/dm_tracker.go index 19c5836a99fd7..7a79e5df2e754 100644 --- a/pkg/ddl/schematracker/dm_tracker.go +++ b/pkg/ddl/schematracker/dm_tracker.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/ddl/syncer" + "github.com/pingcap/tidb/pkg/ddl/systable" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/owner" @@ -1276,6 +1277,11 @@ func (SchemaTracker) DoDDLJob(_ sessionctx.Context, _ *model.Job) error { return nil } +// GetMinJobIDRefresher implements the DDL interface, it's no-op in DM's case. +func (SchemaTracker) GetMinJobIDRefresher() *systable.MinJobIDRefresher { + panic("not implemented") +} + // DoDDLJobWrapper implements the DDL interface, it's no-op in DM's case. func (SchemaTracker) DoDDLJobWrapper(_ sessionctx.Context, _ *ddl.JobWrapper) error { return nil diff --git a/pkg/ddl/internal/session/BUILD.bazel b/pkg/ddl/session/BUILD.bazel similarity index 88% rename from pkg/ddl/internal/session/BUILD.bazel rename to pkg/ddl/session/BUILD.bazel index 0221a13ca544d..e982b11627cf2 100644 --- a/pkg/ddl/internal/session/BUILD.bazel +++ b/pkg/ddl/session/BUILD.bazel @@ -6,8 +6,8 @@ go_library( "session.go", "session_pool.go", ], - importpath = "github.com/pingcap/tidb/pkg/ddl/internal/session", - visibility = ["//pkg/ddl:__subpackages__"], + importpath = "github.com/pingcap/tidb/pkg/ddl/session", + visibility = ["//visibility:public"], deps = [ "//pkg/ddl/logutil", "//pkg/domain/infosync", diff --git a/pkg/ddl/internal/session/session.go b/pkg/ddl/session/session.go similarity index 100% rename from pkg/ddl/internal/session/session.go rename to pkg/ddl/session/session.go diff --git a/pkg/ddl/internal/session/session_pool.go b/pkg/ddl/session/session_pool.go similarity index 100% rename from pkg/ddl/internal/session/session_pool.go rename to pkg/ddl/session/session_pool.go diff --git a/pkg/ddl/internal/session/session_pool_test.go b/pkg/ddl/session/session_pool_test.go similarity index 97% rename from pkg/ddl/internal/session/session_pool_test.go rename to pkg/ddl/session/session_pool_test.go index 570d0689f1cb9..d3b8080c1b658 100644 --- a/pkg/ddl/internal/session/session_pool_test.go +++ b/pkg/ddl/session/session_pool_test.go @@ -19,7 +19,7 @@ import ( "testing" "github.com/ngaut/pools" - "github.com/pingcap/tidb/pkg/ddl/internal/session" + "github.com/pingcap/tidb/pkg/ddl/session" "github.com/pingcap/tidb/pkg/testkit" "github.com/stretchr/testify/require" ) diff --git a/pkg/ddl/systable/BUILD.bazel b/pkg/ddl/systable/BUILD.bazel index 1ced5af199ca4..c0ddb9bfc63d0 100644 --- a/pkg/ddl/systable/BUILD.bazel +++ b/pkg/ddl/systable/BUILD.bazel @@ -2,29 +2,40 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "systable", - srcs = ["manager.go"], + srcs = [ + "manager.go", + "min_job_id.go", + ], importpath = "github.com/pingcap/tidb/pkg/ddl/systable", visibility = ["//visibility:public"], deps = [ - "//pkg/ddl/internal/session", + "//pkg/ddl/logutil", + "//pkg/ddl/session", "//pkg/parser/model", "@com_github_pingcap_errors//:errors", + "@org_uber_go_zap//:zap", ], ) go_test( name = "systable_test", timeout = "short", - srcs = ["manager_test.go"], + srcs = [ + "manager_test.go", + "min_job_id_test.go", + ], + embed = [":systable"], flaky = True, deps = [ - ":systable", - "//pkg/ddl/internal/session", + "//pkg/ddl/mock", + "//pkg/ddl/session", "//pkg/domain", + "//pkg/parser/model", "//pkg/store/mockstore", "//pkg/testkit", "@com_github_ngaut_pools//:pools", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//util", + "@org_uber_go_mock//gomock", ], ) diff --git a/pkg/ddl/systable/manager.go b/pkg/ddl/systable/manager.go index 735ca590c7538..2dc24d813c0ae 100644 --- a/pkg/ddl/systable/manager.go +++ b/pkg/ddl/systable/manager.go @@ -21,7 +21,7 @@ import ( "fmt" "github.com/pingcap/errors" - "github.com/pingcap/tidb/pkg/ddl/internal/session" + "github.com/pingcap/tidb/pkg/ddl/session" "github.com/pingcap/tidb/pkg/parser/model" ) @@ -38,6 +38,13 @@ type Manager interface { GetJobByID(ctx context.Context, jobID int64) (*model.Job, error) // GetMDLVer gets the MDL version by job ID, returns ErrNotFound if the MDL info does not exist. GetMDLVer(ctx context.Context, jobID int64) (int64, error) + // GetMinJobID gets current minimum job ID in the job table for job_id >= prevMinJobID, + // if no jobs, returns 0. prevMinJobID is used to avoid full table scan, see + // https://github.com/pingcap/tidb/issues/52905 + GetMinJobID(ctx context.Context, prevMinJobID int64) (int64, error) + // HasFlashbackClusterJob checks if there is any flashback cluster job. + // minJobID has the same meaning as in GetMinJobID. + HasFlashbackClusterJob(ctx context.Context, minJobID int64) (bool, error) } type manager struct { @@ -105,3 +112,42 @@ func (mgr *manager) GetMDLVer(ctx context.Context, jobID int64) (int64, error) { } return ver, nil } + +func (mgr *manager) GetMinJobID(ctx context.Context, prevMinJobID int64) (int64, error) { + var minID int64 + if err := mgr.withNewSession(func(se *session.Session) error { + sql := fmt.Sprintf(`select min(job_id) from mysql.tidb_ddl_job where job_id >= %d`, prevMinJobID) + rows, err := se.Execute(ctx, sql, "get-min-job-id") + if err != nil { + return errors.Trace(err) + } + if len(rows) == 0 { + return nil + } + minID = rows[0].GetInt64(0) + return nil + }); err != nil { + return 0, err + } + return minID, nil +} + +func (mgr *manager) HasFlashbackClusterJob(ctx context.Context, minJobID int64) (bool, error) { + var hasFlashbackClusterJob bool + if err := mgr.withNewSession(func(se *session.Session) error { + sql := fmt.Sprintf(`select count(1) from mysql.tidb_ddl_job where job_id >= %d and type = %d`, + minJobID, model.ActionFlashbackCluster) + rows, err := se.Execute(ctx, sql, "has-flashback-cluster-job") + if err != nil { + return errors.Trace(err) + } + if len(rows) == 0 { + return nil + } + hasFlashbackClusterJob = rows[0].GetInt64(0) > 0 + return nil + }); err != nil { + return false, err + } + return hasFlashbackClusterJob, nil +} diff --git a/pkg/ddl/systable/manager_test.go b/pkg/ddl/systable/manager_test.go index 77add72b4663d..2145694ad0fc9 100644 --- a/pkg/ddl/systable/manager_test.go +++ b/pkg/ddl/systable/manager_test.go @@ -16,13 +16,15 @@ package systable_test import ( "context" + "fmt" "testing" "time" "github.com/ngaut/pools" - "github.com/pingcap/tidb/pkg/ddl/internal/session" + "github.com/pingcap/tidb/pkg/ddl/session" "github.com/pingcap/tidb/pkg/ddl/systable" "github.com/pingcap/tidb/pkg/domain" + "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/store/mockstore" "github.com/pingcap/tidb/pkg/testkit" "github.com/stretchr/testify/require" @@ -64,4 +66,40 @@ func TestManager(t *testing.T) { require.NoError(t, err) require.EqualValues(t, 123, ver) }) + + t.Run("GetMinJobID", func(t *testing.T) { + tk.MustExec("delete from mysql.tidb_ddl_job") + id, err := mgr.GetMinJobID(ctx, 0) + require.NoError(t, err) + require.EqualValues(t, 0, id) + tk.MustExec(`insert into mysql.tidb_ddl_job(job_id, reorg, schema_ids, table_ids, job_meta, type, processing) + values(123456, 0, '1', '1', '{"id":9998}', 1, 0)`) + id, err = mgr.GetMinJobID(ctx, 0) + require.NoError(t, err) + require.EqualValues(t, 123456, id) + id, err = mgr.GetMinJobID(ctx, 123456) + require.NoError(t, err) + require.EqualValues(t, 123456, id) + id, err = mgr.GetMinJobID(ctx, 123457) + require.NoError(t, err) + require.EqualValues(t, 0, id) + }) + + t.Run("HasFlashbackClusterJob", func(t *testing.T) { + tk.MustExec("delete from mysql.tidb_ddl_job") + found, err := mgr.HasFlashbackClusterJob(ctx, 0) + require.NoError(t, err) + require.False(t, found) + tk.MustExec(fmt.Sprintf(`insert into mysql.tidb_ddl_job(job_id, reorg, schema_ids, table_ids, job_meta, type, processing) + values(123, 0, '1', '1', '{"id":9998}', %d, 0)`, model.ActionFlashbackCluster)) + found, err = mgr.HasFlashbackClusterJob(ctx, 0) + require.NoError(t, err) + require.True(t, found) + found, err = mgr.HasFlashbackClusterJob(ctx, 123) + require.NoError(t, err) + require.True(t, found) + found, err = mgr.HasFlashbackClusterJob(ctx, 124) + require.NoError(t, err) + require.False(t, found) + }) } diff --git a/pkg/ddl/systable/min_job_id.go b/pkg/ddl/systable/min_job_id.go new file mode 100644 index 0000000000000..7c26e96510afa --- /dev/null +++ b/pkg/ddl/systable/min_job_id.go @@ -0,0 +1,73 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package systable + +import ( + "context" + "sync/atomic" + "time" + + "github.com/pingcap/tidb/pkg/ddl/logutil" + "go.uber.org/zap" +) + +var ( + refreshInterval = 10 * time.Second +) + +// MinJobIDRefresher is used to maintain the minimal job ID in tidb_ddl_job table. +// we use it to mitigate this issue https://github.com/pingcap/tidb/issues/52905 +// by querying since min job ID, TiKV can seek to the position where rows exists +// to avoid scanning and skipping all the deleted rows. +type MinJobIDRefresher struct { + sysTblMgr Manager + currMinJobID atomic.Int64 +} + +// NewMinJobIDRefresher creates a new MinJobIDRefresher. +func NewMinJobIDRefresher(sysTblMgr Manager) *MinJobIDRefresher { + return &MinJobIDRefresher{ + sysTblMgr: sysTblMgr, + } +} + +// GetCurrMinJobID gets the minimal job ID in tidb_ddl_job table. +func (r *MinJobIDRefresher) GetCurrMinJobID() int64 { + return r.currMinJobID.Load() +} + +// Start refreshes the minimal job ID in tidb_ddl_job table. +func (r *MinJobIDRefresher) Start(ctx context.Context) { + for { + r.refresh(ctx) + + select { + case <-ctx.Done(): + return + case <-time.After(refreshInterval): + } + } +} + +func (r *MinJobIDRefresher) refresh(ctx context.Context) { + currMinID := r.currMinJobID.Load() + nextMinID, err := r.sysTblMgr.GetMinJobID(ctx, currMinID) + if err != nil { + logutil.DDLLogger().Info("get min job ID failed", zap.Error(err)) + return + } + // use max, in case all job are finished to avoid the currMinJobID go back. + r.currMinJobID.Store(max(currMinID, nextMinID)) +} diff --git a/pkg/ddl/systable/min_job_id_test.go b/pkg/ddl/systable/min_job_id_test.go new file mode 100644 index 0000000000000..f4d00a7c3ec10 --- /dev/null +++ b/pkg/ddl/systable/min_job_id_test.go @@ -0,0 +1,54 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package systable + +import ( + "context" + "testing" + "time" + + "github.com/pingcap/tidb/pkg/ddl/mock" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" +) + +func TestRefreshMinJobID(t *testing.T) { + loopRetryIntBak := refreshInterval + refreshInterval = 10 * time.Millisecond + t.Cleanup(func() { + refreshInterval = loopRetryIntBak + }) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mgr := mock.NewMockManager(ctrl) + ctx := context.Background() + + refresher := NewMinJobIDRefresher(mgr) + // success + mgr.EXPECT().GetMinJobID(gomock.Any(), int64(0)).Return(int64(1), nil) + refresher.refresh(ctx) + require.EqualValues(t, 1, refresher.GetCurrMinJobID()) + require.True(t, ctrl.Satisfied()) + // success again + mgr.EXPECT().GetMinJobID(gomock.Any(), int64(1)).Return(int64(100), nil) + refresher.refresh(ctx) + require.EqualValues(t, 100, refresher.GetCurrMinJobID()) + require.True(t, ctrl.Satisfied()) + // don't go back when all jobs are done + mgr.EXPECT().GetMinJobID(gomock.Any(), int64(100)).Return(int64(0), nil) + refresher.refresh(ctx) + require.EqualValues(t, 100, refresher.GetCurrMinJobID()) + require.True(t, ctrl.Satisfied()) +} diff --git a/pkg/ddl/table.go b/pkg/ddl/table.go index 95869dee3806f..996887bd2e11e 100644 --- a/pkg/ddl/table.go +++ b/pkg/ddl/table.go @@ -24,10 +24,10 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" - sess "github.com/pingcap/tidb/pkg/ddl/internal/session" "github.com/pingcap/tidb/pkg/ddl/label" "github.com/pingcap/tidb/pkg/ddl/logutil" "github.com/pingcap/tidb/pkg/ddl/placement" + sess "github.com/pingcap/tidb/pkg/ddl/session" "github.com/pingcap/tidb/pkg/domain/infosync" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/kv" diff --git a/pkg/domain/BUILD.bazel b/pkg/domain/BUILD.bazel index e77aa49d9bcf8..57c3ee876f8ff 100644 --- a/pkg/domain/BUILD.bazel +++ b/pkg/domain/BUILD.bazel @@ -29,6 +29,7 @@ go_library( "//pkg/ddl", "//pkg/ddl/placement", "//pkg/ddl/schematracker", + "//pkg/ddl/systable", "//pkg/ddl/util", "//pkg/disttask/framework/scheduler", "//pkg/disttask/framework/storage", diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index bb55529788de7..a9c5decfa2000 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -39,6 +39,7 @@ import ( "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/ddl/placement" "github.com/pingcap/tidb/pkg/ddl/schematracker" + "github.com/pingcap/tidb/pkg/ddl/systable" ddlutil "github.com/pingcap/tidb/pkg/ddl/util" "github.com/pingcap/tidb/pkg/disttask/framework/scheduler" "github.com/pingcap/tidb/pkg/disttask/framework/storage" @@ -202,8 +203,9 @@ type Domain struct { sctxs map[sessionctx.Context]bool } - mdlCheckCh chan struct{} - stopAutoAnalyze atomicutil.Bool + mdlCheckCh chan struct{} + stopAutoAnalyze atomicutil.Bool + minJobIDRefresher *systable.MinJobIDRefresher instancePlanCache sessionctx.InstancePlanCache // the instance level plan cache @@ -881,15 +883,18 @@ func (do *Domain) refreshMDLCheckTableInfo() { } // Make sure the session is new. sctx := se.(sessionctx.Context) - if _, err := sctx.GetSQLExecutor().ExecuteInternal(kv.WithInternalSourceType(context.Background(), kv.InternalTxnMeta), "rollback"); err != nil { + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnMeta) + if _, err := sctx.GetSQLExecutor().ExecuteInternal(ctx, "rollback"); err != nil { se.Close() return } defer do.sysSessionPool.Put(se) exec := sctx.GetRestrictedSQLExecutor() domainSchemaVer := do.InfoSchema().SchemaMetaVersion() - rows, _, err := exec.ExecRestrictedSQL(kv.WithInternalSourceType(context.Background(), kv.InternalTxnMeta), nil, - fmt.Sprintf("select job_id, version, table_ids from mysql.tidb_mdl_info where version <= %d", domainSchemaVer)) + // the job must stay inside tidb_ddl_job if we need to wait schema version for it. + sql := fmt.Sprintf(`select job_id, version, table_ids from mysql.tidb_mdl_info + where job_id >= %d and version <= %d`, do.minJobIDRefresher.GetCurrMinJobID(), domainSchemaVer) + rows, _, err := exec.ExecRestrictedSQL(ctx, nil, sql) if err != nil { logutil.BgLogger().Warn("get mdl info from tidb_mdl_info failed", zap.Error(err)) return @@ -1274,6 +1279,7 @@ func (do *Domain) Init( return sysExecutorFactory(do) } sysCtxPool := pools.NewResourcePool(sysFac, 512, 512, resourceIdleTimeout) + ctx, cancelFunc := context.WithCancel(context.Background()) do.cancelFns.mu.Lock() do.cancelFns.fns = append(do.cancelFns.fns, cancelFunc) @@ -1366,6 +1372,7 @@ func (do *Domain) Init( if err != nil { return err } + do.minJobIDRefresher = do.ddl.GetMinJobIDRefresher() // TODO there are many place set ddlLease to 0, remove them completely, we want // UT and even local uni-store to run similar code path as normal.