Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#53720
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
tangenta authored and ti-chi-bot committed Oct 28, 2024
1 parent cc04dd7 commit 69a6893
Show file tree
Hide file tree
Showing 8 changed files with 566 additions and 13 deletions.
40 changes: 35 additions & 5 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
<<<<<<< HEAD:ddl/ddl.go
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl/ingest"
sess "github.com/pingcap/tidb/ddl/internal/session"
Expand Down Expand Up @@ -65,6 +66,40 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/syncutil"
=======
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl/ingest"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/logutil"
"github.com/pingcap/tidb/pkg/ddl/syncer"
"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/owner"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/binloginfo"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics/handle"
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/table"
pumpcli "github.com/pingcap/tidb/pkg/tidb-binlog/pump_client"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/gcutil"
"github.com/pingcap/tidb/pkg/util/generic"
>>>>>>> 44c9096efbc (ddl: get latest old table ID before replace view (#53720)):pkg/ddl/ddl.go
"github.com/tikv/client-go/v2/tikvrpc"
clientv3 "go.etcd.io/etcd/client/v3"
atomicutil "go.uber.org/atomic"
Expand Down Expand Up @@ -367,11 +402,6 @@ type ddlCtx struct {

// reorgCtx is used for reorganization.
reorgCtx reorgContexts
// backfillCtx is used for backfill workers.
backfillCtx struct {
syncutil.RWMutex
jobCtxMap map[int64]*JobContext
}

jobCtx struct {
sync.RWMutex
Expand Down
44 changes: 44 additions & 0 deletions ddl/ddl_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ import (
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util/chunk"
"github.com/stretchr/testify/require"
<<<<<<< HEAD:ddl/ddl_api_test.go
"golang.org/x/exp/slices"
=======
"golang.org/x/sync/errgroup"
>>>>>>> 44c9096efbc (ddl: get latest old table ID before replace view (#53720)):pkg/ddl/ddl_api_test.go
)

func TestGetDDLJobs(t *testing.T) {
Expand Down Expand Up @@ -152,6 +156,46 @@ func enQueueDDLJobs(t *testing.T, sess session.Session, txn kv.Transaction, jobT
}
}

func TestCreateViewConcurrently(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

tk.MustExec("create table t (a int);")
tk.MustExec("create view v as select * from t;")
var (
counterErr error
counter int
)
failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/onDDLCreateView", func(job *model.Job) {
counter++
if counter > 1 {
counterErr = fmt.Errorf("create view job should not run concurrently")
return
}
})
failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/afterDelivery2Worker", func(job *model.Job) {
if job.Type == model.ActionCreateView {
counter--
}
})
var eg errgroup.Group
for i := 0; i < 5; i++ {
eg.Go(func() error {
newTk := testkit.NewTestKit(t, store)
_, err := newTk.Exec("use test")
if err != nil {
return err
}
_, err = newTk.Exec("create or replace view v as select * from t;")
return err
})
}
err := eg.Wait()
require.NoError(t, err)
require.NoError(t, counterErr)
}

func TestCreateDropCreateTable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
Expand Down
33 changes: 33 additions & 0 deletions ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,39 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
asyncNotify(d.ddlJobCh)
metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec()
}()
<<<<<<< HEAD:ddl/job_table.go
=======

err := wk.HandleLocalDDLJob(d.ddlCtx, job)
pool.put(wk)
if err != nil {
logutil.DDLLogger().Info("handle ddl job failed", zap.Error(err), zap.Stringer("job", job))
}
task.NotifyError(err)
})
}

// delivery2Worker owns the worker, need to put it back to the pool in this function.
func (s *jobScheduler) delivery2Worker(wk *worker, pool *workerPool, job *model.Job) {
injectFailPointForGetJob(job)
s.runningJobs.add(job)
s.wg.Run(func() {
metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Inc()
defer func() {
failpoint.InjectCall("afterDelivery2Worker", job)
s.runningJobs.remove(job)
asyncNotify(s.ddlJobNotifyCh)
metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec()
if wk.ctx.Err() != nil && ingest.LitBackCtxMgr != nil {
// if ctx cancelled, i.e. owner changed, we need to Unregister the backend
// as litBackendCtx is holding this very 'ctx', and it cannot reuse now.
// TODO make LitBackCtxMgr a local value of the job scheduler, it makes
// it much harder to test multiple owners in 1 unit test.
ingest.LitBackCtxMgr.Unregister(job.ID)
}
}()
ownerID := s.ownerManager.ID()
>>>>>>> 44c9096efbc (ddl: get latest old table ID before replace view (#53720)):pkg/ddl/job_table.go
// check if this ddl job is synced to all servers.
if !job.NotStarted() && (!d.isSynced(job) || !d.maybeAlreadyRunOnce(job.ID)) {
if variable.EnableMDL.Load() {
Expand Down
9 changes: 8 additions & 1 deletion ddl/placement_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func getPlacementPolicyByName(d *ddlCtx, t *meta.Meta, policyName model.CIStr) (
}

is := d.infoCache.GetLatest()
if is.SchemaMetaVersion() == currVer {
if is != nil && is.SchemaMetaVersion() == currVer {
// Use cached policy.
policy, ok := is.PolicyByName(policyName)
if ok {
Expand Down Expand Up @@ -319,8 +319,15 @@ func checkPlacementPolicyNotInUse(d *ddlCtx, t *meta.Meta, policy *model.PolicyI
return err
}
is := d.infoCache.GetLatest()
<<<<<<< HEAD:ddl/placement_policy.go
if is.SchemaMetaVersion() == currVer {
return CheckPlacementPolicyNotInUseFromInfoSchema(is, policy)
=======
if is != nil && is.SchemaMetaVersion() == currVer {
err = CheckPlacementPolicyNotInUseFromInfoSchema(is, policy)
} else {
err = CheckPlacementPolicyNotInUseFromMeta(t, policy)
>>>>>>> 44c9096efbc (ddl: get latest old table ID before replace view (#53720)):pkg/ddl/placement_policy.go
}

return CheckPlacementPolicyNotInUseFromMeta(t, policy)
Expand Down
2 changes: 1 addition & 1 deletion ddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func checkSchemaNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, dbInfo *model
return err
}
is := d.infoCache.GetLatest()
if is.SchemaMetaVersion() == currVer {
if is != nil && is.SchemaMetaVersion() == currVer {
return checkSchemaNotExistsFromInfoSchema(is, schemaID, dbInfo)
}
return checkSchemaNotExistsFromStore(t, schemaID, dbInfo)
Expand Down
100 changes: 95 additions & 5 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,14 +274,19 @@ func onCreateView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
schemaID := job.SchemaID
tbInfo := &model.TableInfo{}
var orReplace bool
var oldTbInfoID int64
if err := job.DecodeArgs(tbInfo, &orReplace, &oldTbInfoID); err != nil {
var _placeholder int64 // oldTblInfoID
if err := job.DecodeArgs(tbInfo, &orReplace, &_placeholder); err != nil {
// Invalid arguments, cancel this job.
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
tbInfo.State = model.StateNone
err := checkTableNotExists(d, t, schemaID, tbInfo.Name.L)

oldTableID, err := findTableIDByName(d, t, schemaID, tbInfo.Name.L)
if infoschema.ErrTableNotExists.Equal(err) {
err = nil
}
failpoint.InjectCall("onDDLCreateView", job)
if err != nil {
if infoschema.ErrDatabaseNotExists.Equal(err) {
job.State = model.JobStateCancelled
Expand All @@ -304,13 +309,18 @@ func onCreateView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
// none -> public
tbInfo.State = model.StatePublic
tbInfo.UpdateTS = t.StartTS
<<<<<<< HEAD:ddl/table.go
if oldTbInfoID > 0 && orReplace {
err = t.DropTableOrView(schemaID, oldTbInfoID)
=======
if oldTableID > 0 && orReplace {
err = t.DropTableOrView(schemaID, job.SchemaName, oldTableID, tbInfo.Name.L)
>>>>>>> 44c9096efbc (ddl: get latest old table ID before replace view (#53720)):pkg/ddl/table.go
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
err = t.GetAutoIDAccessors(schemaID, oldTbInfoID).Del()
err = t.GetAutoIDAccessors(schemaID, oldTableID).Del()
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand Down Expand Up @@ -1463,13 +1473,51 @@ func checkTableNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, tableName stri
return err
}
is := d.infoCache.GetLatest()
if is.SchemaMetaVersion() == currVer {
if is != nil && is.SchemaMetaVersion() == currVer {
return checkTableNotExistsFromInfoSchema(is, schemaID, tableName)
}

return checkTableNotExistsFromStore(t, schemaID, tableName)
}

<<<<<<< HEAD:ddl/table.go
=======
func checkTableNotExistsByName(d *ddlCtx, t *meta.Meta, schemaID int64, schemaName, tableName string) error {
// Try to use memory schema info to check first.
currVer, err := t.GetSchemaVersion()
if err != nil {
return err
}
is := d.infoCache.GetLatest()
if is != nil && is.SchemaMetaVersion() == currVer {
return checkTableNotExistsFromInfoSchema(is, schemaID, tableName)
}
return t.CheckTableNameNotExists(t.TableNameKey(schemaName, tableName))
}

func checkConstraintNamesNotExists(t *meta.Meta, schemaID int64, constraints []*model.ConstraintInfo) error {
if len(constraints) == 0 {
return nil
}
tbInfos, err := t.ListTables(schemaID)
if err != nil {
return err
}

for _, tb := range tbInfos {
for _, constraint := range constraints {
if constraint.State != model.StateWriteOnly {
if constraintInfo := tb.FindConstraintInfoByName(constraint.Name.L); constraintInfo != nil {
return infoschema.ErrCheckConstraintDupName.GenWithStackByArgs(constraint.Name.L)
}
}
}
}

return nil
}

>>>>>>> 44c9096efbc (ddl: get latest old table ID before replace view (#53720)):pkg/ddl/table.go
func checkTableIDNotExists(t *meta.Meta, schemaID, tableID int64) error {
tbl, err := t.GetTable(schemaID, tableID)
if err != nil {
Expand Down Expand Up @@ -1516,6 +1564,48 @@ func checkTableNotExistsFromStore(t *meta.Meta, schemaID int64, tableName string
return nil
}

func findTableIDByName(d *ddlCtx, t *meta.Meta, schemaID int64, tableName string) (int64, error) {
// Try to use memory schema info to check first.
currVer, err := t.GetSchemaVersion()
if err != nil {
return 0, err
}
is := d.infoCache.GetLatest()
if is != nil && is.SchemaMetaVersion() == currVer {
return findTableIDFromInfoSchema(is, schemaID, tableName)
}

return findTableIDFromStore(t, schemaID, tableName)
}

func findTableIDFromInfoSchema(is infoschema.InfoSchema, schemaID int64, tableName string) (int64, error) {
schema, ok := is.SchemaByID(schemaID)
if !ok {
return 0, infoschema.ErrDatabaseNotExists.GenWithStackByArgs("")
}
tbl, err := is.TableByName(schema.Name, model.NewCIStr(tableName))
if err != nil {
return 0, err
}
return tbl.Meta().ID, nil
}

func findTableIDFromStore(t *meta.Meta, schemaID int64, tableName string) (int64, error) {
tbls, err := t.ListSimpleTables(schemaID)
if err != nil {
if meta.ErrDBNotExists.Equal(err) {
return 0, infoschema.ErrDatabaseNotExists.GenWithStackByArgs("")
}
return 0, errors.Trace(err)
}
for _, tbl := range tbls {
if tbl.Name.L == tableName {
return tbl.ID, nil
}
}
return 0, infoschema.ErrTableNotExists.FastGenByArgs(tableName)
}

// updateVersionAndTableInfoWithCheck checks table info validate and updates the schema version and the table information
func updateVersionAndTableInfoWithCheck(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, shouldUpdateVer bool, multiInfos ...schemaIDAndTableInfo) (
ver int64, err error) {
Expand Down
3 changes: 2 additions & 1 deletion ddl/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,8 @@ func TestCreateView(t *testing.T) {
}
ctx.SetValue(sessionctx.QueryString, "skip")
err = d.DoDDLJob(ctx, job)
require.Error(t, err)
// The non-existing table id in job args will not be considered anymore.
require.NoError(t, err)
}

func checkTableCacheTest(t *testing.T, store kv.Storage, dbInfo *model.DBInfo, tblInfo *model.TableInfo) {
Expand Down
Loading

0 comments on commit 69a6893

Please sign in to comment.