Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: support drop sequence in TiDB #14442

Merged
merged 14 commits into from
Jan 20, 2020
1 change: 1 addition & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ type DDL interface {
UpdateTableReplicaInfo(ctx sessionctx.Context, tid int64, available bool) error
RepairTable(ctx sessionctx.Context, table *ast.TableName, createStmt *ast.CreateTableStmt) error
CreateSequence(ctx sessionctx.Context, stmt *ast.CreateSequenceStmt) error
DropSequence(ctx sessionctx.Context, tableIdent ast.Ident, ifExists bool) (err error)

// GetLease returns current schema lease time.
GetLease() time.Duration
Expand Down
31 changes: 31 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3334,6 +3334,9 @@ func (d *ddl) DropTable(ctx sessionctx.Context, ti ast.Ident) (err error) {
if tb.Meta().IsView() {
return infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name)
}
if tb.Meta().IsSequence() {
return infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name)
}

job := &model.Job{
SchemaID: schema.ID,
Expand Down Expand Up @@ -4364,3 +4367,31 @@ func (d *ddl) CreateSequence(ctx sessionctx.Context, stmt *ast.CreateSequenceStm
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

func (d *ddl) DropSequence(ctx sessionctx.Context, ti ast.Ident, ifExists bool) (err error) {
schema, tbl, err := d.getSchemaAndTableByIdent(ctx, ti)
if err != nil {
return errors.Trace(err)
}

if !tbl.Meta().IsSequence() {
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
err = ErrWrongObject.GenWithStackByArgs(ti.Schema, ti.Name, "SEQUENCE")
if ifExists {
ctx.GetSessionVars().StmtCtx.AppendNote(err)
return nil
}
return err
}

job := &model.Job{
SchemaID: schema.ID,
TableID: tbl.Meta().ID,
SchemaName: schema.Name.L,
Type: model.ActionDropSequence,
BinlogInfo: &model.HistoryInfo{},
}

err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}
2 changes: 1 addition & 1 deletion ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
ver, err = onRepairTable(d, t, job)
case model.ActionCreateView:
ver, err = onCreateView(d, t, job)
case model.ActionDropTable, model.ActionDropView:
case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence:
ver, err = onDropTableOrView(t, job)
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
case model.ActionDropTablePartition:
ver, err = onDropTablePartition(t, job)
Expand Down
2 changes: 1 addition & 1 deletion ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
ver, err = rollingbackDropColumn(t, job)
case model.ActionDropIndex, model.ActionDropPrimaryKey:
ver, err = rollingbackDropIndex(t, job)
case model.ActionDropTable, model.ActionDropView:
case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence:
err = rollingbackDropTableOrView(t, job)
case model.ActionDropTablePartition:
ver, err = rollingbackDropTablePartition(t, job)
Expand Down
69 changes: 69 additions & 0 deletions ddl/sequence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/util/testkit"
)
Expand Down Expand Up @@ -84,3 +86,70 @@ func (s *testSequenceSuite) TestCreateSequence(c *C) {
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[planner:1142]CREATE command denied to user 'localhost'@'myuser' for table 'my_seq'")
}

func (s *testSequenceSuite) TestDropSequence(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the test case for drop table SEQUCNE_NAME, 'drop view SEQUCNE_NAME`.

Also, we need to test the behavior for drop sequence TABLE_NAME, drop sequence VIEW_NAME, but not in this function. You can find a suitable place.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch.

s.tk.MustExec("use test")
s.tk.MustExec("drop sequence if exists seq")

// Test sequence is unknown.
s.tk.MustGetErrCode("drop sequence seq", mysql.ErrUnknownSequence)

// Test non-existed sequence can't drop successfully.
s.tk.MustExec("create sequence seq")
_, err := s.tk.Exec("drop sequence seq, seq2")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[schema:4139]Unknown SEQUENCE: 'test.seq2'")

// Test the specified object is not sequence.
s.tk.MustExec("create table seq3 (a int)")
_, err = s.tk.Exec("drop sequence seq3")
c.Assert(err, NotNil)
c.Assert(terror.ErrorEqual(err, ddl.ErrWrongObject), IsTrue)

// Test schema is not exist.
_, err = s.tk.Exec("drop sequence unknown.seq")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[schema:4139]Unknown SEQUENCE: 'unknown.seq'")

// Test drop sequence successfully.
s.tk.MustExec("create sequence seq")
_, err = s.tk.Exec("drop sequence seq")
c.Assert(err, IsNil)
_, err = s.tk.Exec("drop sequence seq")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[schema:4139]Unknown SEQUENCE: 'test.seq'")

// Test drop table when the object is a sequence.
s.tk.MustExec("create sequence seq")
_, err = s.tk.Exec("drop table seq")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[schema:1051]Unknown table 'test.seq'")
AilinKid marked this conversation as resolved.
Show resolved Hide resolved

// Test drop view when the object is a sequence.
_, err = s.tk.Exec("drop view seq")
c.Assert(err, NotNil)
c.Assert(terror.ErrorEqual(err, ddl.ErrWrongObject), IsTrue)
s.tk.MustExec("drop sequence seq")

// Test drop privilege.
s.tk.MustExec("drop user if exists myuser@localhost")
s.tk.MustExec("create user myuser@localhost")
s.tk.MustExec("flush privileges")

tk1 := testkit.NewTestKit(c, s.store)
se, err := session.CreateSession4Test(s.store)
c.Assert(err, IsNil)
c.Assert(se.Auth(&auth.UserIdentity{Username: "myuser", Hostname: "localhost"}, nil, nil), IsTrue)
tk1.Se = se

// grant the myuser the access to database test.
s.tk.MustExec("create sequence my_seq")
s.tk.MustExec("grant select on test.* to 'myuser'@'localhost'")
s.tk.MustExec("flush privileges")

tk1.MustExec("use test")
_, err = tk1.Exec("drop sequence my_seq")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[planner:1142]DROP command denied to user 'localhost'@'myuser' for table 'my_seq'")
}
10 changes: 8 additions & 2 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,14 @@ func onDropTableOrView(t *meta.Meta, job *model.Job) (ver int64, _ error) {
if err != nil {
return ver, errors.Trace(err)
}
if err = t.DropTableOrView(job.SchemaID, job.TableID, true); err != nil {
break
if tblInfo.IsSequence() {
if err = t.DropSequence(job.SchemaID, job.TableID, true); err != nil {
break
}
} else {
if err = t.DropTableOrView(job.SchemaID, job.TableID, true); err != nil {
break
}
}
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
Expand Down
50 changes: 41 additions & 9 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@ func (e *DDLExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
case *ast.DropDatabaseStmt:
err = e.executeDropDatabase(x)
case *ast.DropTableStmt:
err = e.executeDropTableOrView(x)
if x.IsView {
err = e.executeDropView(x)
} else {
err = e.executeDropTable(x)
}
case *ast.RecoverTableStmt:
err = e.executeRecoverTable(x)
case *ast.FlashBackTableStmt:
Expand All @@ -115,6 +119,8 @@ func (e *DDLExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
err = e.executeRepairTable(x)
case *ast.CreateSequenceStmt:
err = e.executeCreateSequence(x)
case *ast.DropSequenceStmt:
err = e.executeDropSequence(x)

}
if err != nil {
Expand Down Expand Up @@ -251,9 +257,30 @@ func isSystemTable(schema, table string) bool {
return false
}

func (e *DDLExec) executeDropTableOrView(s *ast.DropTableStmt) error {
type objectType int

const (
tableObject objectType = iota
viewObject
sequenceObject
)

func (e *DDLExec) executeDropTable(s *ast.DropTableStmt) error {
return e.dropTableObject(s.Tables, tableObject, s.IfExists)
}

func (e *DDLExec) executeDropView(s *ast.DropTableStmt) error {
return e.dropTableObject(s.Tables, viewObject, s.IfExists)
}

func (e *DDLExec) executeDropSequence(s *ast.DropSequenceStmt) error {
return e.dropTableObject(s.Sequences, sequenceObject, s.IfExists)
}

// dropTableObject actually applies to `tableObject`, `viewObject` and `sequenceObject`.
func (e *DDLExec) dropTableObject(objects []*ast.TableName, obt objectType, ifExists bool) error {
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
var notExistTables []string
for _, tn := range s.Tables {
for _, tn := range objects {
fullti := ast.Ident{Schema: tn.Schema, Name: tn.Name}
_, ok := e.is.SchemaByName(tn.Schema)
if !ok {
Expand All @@ -276,7 +303,7 @@ func (e *DDLExec) executeDropTableOrView(s *ast.DropTableStmt) error {
return errors.Errorf("Drop tidb system table '%s.%s' is forbidden", tn.Schema.L, tn.Name.L)
}

if config.CheckTableBeforeDrop {
if obt == tableObject && config.CheckTableBeforeDrop {
logutil.BgLogger().Warn("admin check table before drop",
zap.String("database", fullti.Schema.O),
zap.String("table", fullti.Name.O),
Expand All @@ -287,19 +314,24 @@ func (e *DDLExec) executeDropTableOrView(s *ast.DropTableStmt) error {
return err
}
}

if s.IsView {
err = domain.GetDomain(e.ctx).DDL().DropView(e.ctx, fullti)
} else {
switch obt {
case tableObject:
err = domain.GetDomain(e.ctx).DDL().DropTable(e.ctx, fullti)
case viewObject:
err = domain.GetDomain(e.ctx).DDL().DropView(e.ctx, fullti)
case sequenceObject:
err = domain.GetDomain(e.ctx).DDL().DropSequence(e.ctx, fullti, ifExists)
}
if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err) {
notExistTables = append(notExistTables, fullti.String())
} else if err != nil {
return err
}
}
if len(notExistTables) > 0 && !s.IfExists {
if len(notExistTables) > 0 && !ifExists {
if obt == sequenceObject {
return infoschema.ErrSequenceDropExists.GenWithStackByArgs(strings.Join(notExistTables, ","))
}
return infoschema.ErrTableDropExists.GenWithStackByArgs(strings.Join(notExistTables, ","))
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro
switch diff.Type {
case model.ActionCreateTable, model.ActionCreateSequence, model.ActionRecoverTable, model.ActionRepairTable:
newTableID = diff.TableID
case model.ActionDropTable, model.ActionDropView:
case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence:
oldTableID = diff.TableID
case model.ActionTruncateTable:
oldTableID = diff.OldTableID
Expand Down
3 changes: 3 additions & 0 deletions infoschema/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ var (
ErrTableExists = terror.ClassSchema.New(mysql.ErrTableExists, mysql.MySQLErrName[mysql.ErrTableExists])
// ErrTableDropExists returns for dropping a non-existent table.
ErrTableDropExists = terror.ClassSchema.New(mysql.ErrBadTable, mysql.MySQLErrName[mysql.ErrBadTable])
// ErrSequenceDropExists returns for dropping a non-exist sequence.
ErrSequenceDropExists = terror.ClassSchema.New(mysql.ErrUnknownSequence, mysql.MySQLErrName[mysql.ErrUnknownSequence])
// ErrColumnNotExists returns for column not exists.
ErrColumnNotExists = terror.ClassSchema.New(mysql.ErrBadField, mysql.MySQLErrName[mysql.ErrBadField])
// ErrColumnExists returns for column already exists.
Expand Down Expand Up @@ -342,6 +344,7 @@ func init() {
mysql.ErrBadUser: mysql.ErrBadUser,
mysql.ErrUserAlreadyExists: mysql.ErrUserAlreadyExists,
mysql.ErrTableLocked: mysql.ErrTableLocked,
mysql.ErrUnknownSequence: mysql.ErrUnknownSequence,
}
terror.ErrClassToMySQLCodes[terror.ClassSchema] = schemaMySQLErrCodes

Expand Down
11 changes: 11 additions & 0 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,17 @@ func (m *Meta) DropDatabase(dbID int64) error {
return nil
}

// DropSequence drops sequence in database.
// Sequence is made of table struct and kv value pair.
func (m *Meta) DropSequence(dbID int64, tblID int64, delAutoID bool) error {
err := m.DropTableOrView(dbID, tblID, delAutoID)
if err != nil {
return err
}
err = m.txn.HDel(m.dbKey(dbID), m.sequenceKey(tblID))
return errors.Trace(err)
}

// DropTableOrView drops table in database.
// If delAutoID is true, it will delete the auto_increment id key-value of the table.
// For rename table, we do not need to rename auto_increment id key-value.
Expand Down
9 changes: 9 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2569,6 +2569,15 @@ func (b *PlanBuilder) buildDDL(ctx context.Context, node ast.DDLNode) (Plan, err
b.visitInfo = appendVisitInfo(b.visitInfo, mysql.DropPriv, tableVal.Schema.L,
tableVal.Name.L, "", authErr)
}
case *ast.DropSequenceStmt:
for _, sequence := range v.Sequences {
if b.ctx.GetSessionVars().User != nil {
authErr = ErrTableaccessDenied.GenWithStackByArgs("DROP", b.ctx.GetSessionVars().User.Hostname,
b.ctx.GetSessionVars().User.Username, sequence.Name.L)
}
b.visitInfo = appendVisitInfo(b.visitInfo, mysql.DropPriv, sequence.Schema.L,
sequence.Name.L, "", authErr)
}
case *ast.TruncateTableStmt:
if b.ctx.GetSessionVars().User != nil {
authErr = ErrTableaccessDenied.GenWithStackByArgs("DROP", b.ctx.GetSessionVars().User.Hostname,
Expand Down
13 changes: 12 additions & 1 deletion planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ func (p *preprocessor) Enter(in ast.Node) (out ast.Node, skipChildren bool) {
case *ast.CreateSequenceStmt:
p.flag |= inCreateOrDropTable
p.resolveCreateSequenceStmt(node)
case *ast.DropSequenceStmt:
p.flag |= inCreateOrDropTable
p.checkDropSequenceGrammar(node)
default:
p.flag &= ^parentIsJoin
}
Expand Down Expand Up @@ -453,8 +456,16 @@ func (p *preprocessor) checkCreateViewGrammar(stmt *ast.CreateViewStmt) {
}
}

func (p *preprocessor) checkDropSequenceGrammar(stmt *ast.DropSequenceStmt) {
p.checkDropTableNames(stmt.Sequences)
}

func (p *preprocessor) checkDropTableGrammar(stmt *ast.DropTableStmt) {
for _, t := range stmt.Tables {
p.checkDropTableNames(stmt.Tables)
}

func (p *preprocessor) checkDropTableNames(tables []*ast.TableName) {
for _, t := range tables {
if isIncorrectName(t.Name.String()) {
p.err = ddl.ErrWrongTableName.GenWithStackByArgs(t.Name.String())
return
Expand Down
2 changes: 1 addition & 1 deletion util/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func IsJobRollbackable(job *model.Job) bool {
job.SchemaState == model.StateWriteOnly {
return false
}
case model.ActionDropSchema, model.ActionDropTable:
case model.ActionDropSchema, model.ActionDropTable, model.ActionDropSequence:
// To simplify the rollback logic, cannot be canceled in the following states.
if job.SchemaState == model.StateWriteOnly ||
job.SchemaState == model.StateDeleteOnly {
Expand Down