Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl, planner, executor: implement CREATE TABLE ... SELECT #7787

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 65 additions & 1 deletion ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,18 +131,22 @@ func (s *testDBSuite) TestAddIndexWithPK(c *C) {
s.tk.MustExec("create table test_add_index_with_pk(a int not null, b int not null default '0', primary key(a))")
s.tk.MustExec("insert into test_add_index_with_pk values(1, 2)")
s.tk.MustExec("alter table test_add_index_with_pk add index idx (a)")
c.Assert(s.tk.Se.AffectedRows(), Equals, uint64(0))
s.tk.MustQuery("select a from test_add_index_with_pk").Check(testkit.Rows("1"))
s.tk.MustExec("insert into test_add_index_with_pk values(2, 2)")
s.tk.MustExec("alter table test_add_index_with_pk add index idx1 (a, b)")
c.Assert(s.tk.Se.AffectedRows(), Equals, uint64(0))
s.tk.MustQuery("select * from test_add_index_with_pk").Check(testkit.Rows("1 2", "2 2"))
s.tk.MustExec("create table test_add_index_with_pk1(a int not null, b int not null default '0', c int, d int, primary key(c))")
s.tk.MustExec("insert into test_add_index_with_pk1 values(1, 1, 1, 1)")
s.tk.MustExec("alter table test_add_index_with_pk1 add index idx (c)")
c.Assert(s.tk.Se.AffectedRows(), Equals, uint64(0))
s.tk.MustExec("insert into test_add_index_with_pk1 values(2, 2, 2, 2)")
s.tk.MustQuery("select * from test_add_index_with_pk1").Check(testkit.Rows("1 1 1 1", "2 2 2 2"))
s.tk.MustExec("create table test_add_index_with_pk2(a int not null, b int not null default '0', c int unsigned, d int, primary key(c))")
s.tk.MustExec("insert into test_add_index_with_pk2 values(1, 1, 1, 1)")
s.tk.MustExec("alter table test_add_index_with_pk2 add index idx (c)")
c.Assert(s.tk.Se.AffectedRows(), Equals, uint64(0))
s.tk.MustExec("insert into test_add_index_with_pk2 values(2, 2, 2, 2)")
s.tk.MustQuery("select * from test_add_index_with_pk2").Check(testkit.Rows("1 1 1 1", "2 2 2 2"))
}
Expand Down Expand Up @@ -531,7 +535,7 @@ func (s *testDBSuite) TestCancelRenameIndex(c *C) {
s.mustExec(c, "alter table t rename index idx_c2 to idx_c3")
}

// TestCancelDropTable tests cancel ddl job which type is drop table.
// TestCancelDropTableAndSchema tests cancel ddl job which type is drop table.
func (s *testDBSuite) TestCancelDropTableAndSchema(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
testCases := []struct {
Expand Down Expand Up @@ -672,6 +676,66 @@ func (s *testDBSuite) TestAddAnonymousIndex(c *C) {
c.Assert(t.Indices()[1].Meta().Name.String(), Equals, "primary_3")
}

func (s *testDBSuite) TestCancelCreateTable(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
sql := "create database if not exists test_db; use test_db;"
s.tk.MustExec(sql)
sql = "drop table if exists test_source; create table test_source(a int); insert into test_source values (1), (2);"
s.tk.MustExec(sql)

testCases := []struct {
jobState model.JobState
JobSchemaState model.SchemaState
}{
// Check create table.
{model.JobStateNone, model.StateNone},
{model.JobStateRunning, model.StateWriteReorganization},
}
var checkErr error
hook := &ddl.TestDDLCallback{}
testCase := &testCases[0]
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionCreateTable && job.State == testCase.jobState && job.SchemaState == testCase.JobSchemaState {
jobIDs := []int64{job.ID}
hookCtx := mock.NewContext()
hookCtx.Store = s.store
err := hookCtx.NewTxn(context.Background())
if err != nil {
checkErr = errors.Trace(err)
return
}
txn, err := hookCtx.Txn(true)
if err != nil {
checkErr = errors.Trace(err)
}
errs, err := admin.CancelJobs(txn, jobIDs)
if err != nil {
checkErr = errors.Trace(err)
return
}
if errs[0] != nil {
checkErr = errors.Trace(errs[0])
return
}
checkErr = txn.Commit(context.Background())
}
}
originHook := s.dom.DDL().GetHook()
defer s.dom.DDL().(ddl.DDLForTest).SetHook(originHook)
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
var err error
for i := range testCases {
testCase = &testCases[i]
sql = "create table test_cancel_create select * from test_source"
_, err = s.tk.Exec(sql)
c.Assert(checkErr, IsNil)
c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job")
}
// make sure the table can be created normally
s.dom.DDL().(ddl.DDLForTest).SetHook(originHook)
s.tk.Exec("create table test_cancel_create select * from test_source")
}

func (s *testDBSuite) testAlterLock(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use " + s.schemaName)
Expand Down
8 changes: 7 additions & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,10 @@ var (
type DDL interface {
CreateSchema(ctx sessionctx.Context, name model.CIStr, charsetInfo *ast.CharsetOpt) error
DropSchema(ctx sessionctx.Context, schema model.CIStr) error
CreateTable(ctx sessionctx.Context, stmt *ast.CreateTableStmt) error
// CreateTable handles 'create table' ddl.
// 'snapshotTS' is used to specify the 'select' query timestamp of 'create table ... select',
// if there is no 'select' part, 'snapshotTS' is meaningless
CreateTable(ctx sessionctx.Context, stmt *ast.CreateTableStmt, snapshotTS uint64) (err error)
CreateView(ctx sessionctx.Context, stmt *ast.CreateViewStmt) error
CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.Ident, ifNotExists bool) error
DropTable(ctx sessionctx.Context, tableIdent ast.Ident) (err error)
Expand Down Expand Up @@ -571,6 +574,9 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
// If a job is a history job, the state must be JobStateSynced or JobStateRollbackDone or JobStateCancelled.
if historyJob.IsSynced() {
log.Infof("[ddl] DDL job ID:%d is finished", jobID)
if historyJob.Type == model.ActionCreateTable {
ctx.GetSessionVars().StmtCtx.AddAffectedRows(uint64(historyJob.GetRowCount()))
}
return nil
}

Expand Down
25 changes: 17 additions & 8 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ func setCharsetCollationFlenDecimal(tp *types.FieldType) error {
return nil
}

// buildColumnAndConstraint builds table.Column from ast.ColumnDef and ast.Constraint
// outPriKeyConstraint is the primary key constraint out of column definition. such as: create table t1 (id int , age int, primary key(id));
func buildColumnAndConstraint(ctx sessionctx.Context, offset int,
colDef *ast.ColumnDef, outPriKeyConstraint *ast.Constraint) (*table.Column, []*ast.Constraint, error) {
Expand Down Expand Up @@ -929,6 +930,7 @@ func (d *ddl) CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.
tblInfo.AutoIncID = 0
tblInfo.ForeignKeys = nil
tblInfo.ID, err = d.genGlobalID()
tblInfo.State = model.StateNone
if err != nil {
return errors.Trace(err)
}
Expand All @@ -937,7 +939,7 @@ func (d *ddl) CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.
TableID: tblInfo.ID,
Type: model.ActionCreateTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{tblInfo},
Args: []interface{}{tblInfo, false /*withSelect*/, 0 /*snapshotTS*/},
}

err = d.doDDLJob(ctx, job)
Expand Down Expand Up @@ -1018,7 +1020,7 @@ func buildTableInfoWithCheck(ctx sessionctx.Context, d *ddl, s *ast.CreateTableS
return tbInfo, nil
}

func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err error) {
func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt, snapshotTS uint64) (err error) {
ident := ast.Ident{Schema: s.Table.Schema, Name: s.Table.Name}
if s.ReferTable != nil {
referIdent := ast.Ident{Schema: s.ReferTable.Schema, Name: s.ReferTable.Name}
Expand Down Expand Up @@ -1047,7 +1049,7 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
TableID: tbInfo.ID,
Type: model.ActionCreateTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{tbInfo},
Args: []interface{}{tbInfo, s.Select != nil, snapshotTS},
}

err = handleTableOptions(s.Options, tbInfo)
Expand All @@ -1064,9 +1066,10 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e

err = d.doDDLJob(ctx, job)
if err == nil {
if tbInfo.AutoIncID > 1 {
if tbInfo.AutoIncID > 1 && s.Select == nil {
// Default tableAutoIncID base is 0.
// If the first ID is expected to greater than 1, we need to do rebase.
// But if s.Select != nil, we've rebased auto increase ID before inserting data.
err = d.handleAutoIncID(tbInfo, schema.ID)
}
}
Expand Down Expand Up @@ -2208,7 +2211,7 @@ func (d *ddl) AlterTableComment(ctx sessionctx.Context, ident ast.Ident, spec *a
return errors.Trace(err)
}

// AlterTableCharset changes the table charset and collate.
// AlterTableCharsetAndCollate changes the table charset and collate.
func (d *ddl) AlterTableCharsetAndCollate(ctx sessionctx.Context, ident ast.Ident, toCharset, toCollate string) error {
// use the last one.
if toCharset == "" && toCollate == "" {
Expand Down Expand Up @@ -2306,9 +2309,15 @@ func (d *ddl) RenameIndex(ctx sessionctx.Context, ident ast.Ident, spec *ast.Alt

// DropTable will proceed even if some table in the list does not exists.
func (d *ddl) DropTable(ctx sessionctx.Context, ti ast.Ident) (err error) {
schema, tb, err := d.getSchemaAndTableByIdent(ctx, ti)
if err != nil {
return errors.Trace(err)
is := d.GetInfoSchemaWithInterceptor(ctx)
schema, ok := is.SchemaByName(ti.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ti.Schema)
}

tb, err := is.TableByName(ti.Schema, ti.Name)
if err != nil || tb.Meta().IsView() {
return infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name)
}

job := &model.Job{
Expand Down
22 changes: 22 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/tablecodec"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -274,10 +275,31 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {

// After rolling back an AddIndex operation, we need to use delete-range to delete the half-done index data.
err = w.deleteRange(job)
case model.ActionCreateTable:
if job.State != model.JobStateRollbackDone {
break
}

// After rolling back an CreateTable operation, we need to use delete-range to delete the half-done data.
tbInfo := &model.TableInfo{}
if err = job.DecodeArgs(tbInfo); err != nil {
return errors.Trace(err)
}
// Set args for deleteRange job
startKey := tablecodec.EncodeTablePrefix(tbInfo.ID)
physicalTableIDs := getPartitionIDs(tbInfo)
job.Args = []interface{}{startKey, physicalTableIDs}
if _, err = job.Encode(true); err == nil {
err = w.deleteRange(job)
}
case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropTablePartition, model.ActionTruncateTablePartition:
err = w.deleteRange(job)
}
}
if err != nil {
return errors.Trace(err)
}

switch job.Type {
case model.ActionRestoreTable:
err = finishRestoreTable(w, t, job)
Expand Down
7 changes: 4 additions & 3 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,12 @@ func (s *testDDLSuite) TestTableError(c *C) {
// Args is wrong, so creating table is failed.
doDDLJobErr(c, 1, 1, model.ActionCreateTable, []interface{}{1}, ctx, d)
// Schema ID is wrong, so creating table is failed.
doDDLJobErr(c, -1, tblInfo.ID, model.ActionCreateTable, []interface{}{tblInfo}, ctx, d)
doDDLJobErr(c, -1, tblInfo.ID, model.ActionCreateTable, []interface{}{tblInfo, false}, ctx, d)
doDDLJobErr(c, -1, tblInfo.ID, model.ActionCreateTable, []interface{}{tblInfo, true}, ctx, d)
// Table exists, so creating table is failed.
tblInfo.ID = tblInfo.ID + 1
doDDLJobErr(c, dbInfo.ID, tblInfo.ID, model.ActionCreateTable, []interface{}{tblInfo}, ctx, d)

doDDLJobErr(c, dbInfo.ID, tblInfo.ID, model.ActionCreateTable, []interface{}{tblInfo, false}, ctx, d)
doDDLJobErr(c, dbInfo.ID, tblInfo.ID, model.ActionCreateTable, []interface{}{tblInfo, true}, ctx, d)
}

func (s *testDDLSuite) TestViewError(c *C) {
Expand Down
4 changes: 2 additions & 2 deletions ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func insertJobIntoDeleteRangeTable(ctx sessionctx.Context, job *model.Job) error
return errors.Trace(err)
}
}
case model.ActionDropTable, model.ActionTruncateTable:
case model.ActionDropTable, model.ActionTruncateTable, model.ActionCreateTable:
tableID := job.TableID
// The startKey here is for compatibility with previous versions, old version did not endKey so don't have to deal with.
var startKey kv.Key
Expand Down Expand Up @@ -341,7 +341,7 @@ func doInsert(s sqlexec.SQLExecutor, jobID int64, elementID int64, startKey, end
return errors.Trace(err)
}

// getNowTS gets the current timestamp, in TSO.
// getNowTSO gets the current timestamp, in TSO.
func getNowTSO(ctx sessionctx.Context) (uint64, error) {
currVer, err := ctx.GetStore().CurrentVersion()
if err != nil {
Expand Down
20 changes: 20 additions & 0 deletions ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,24 @@ func rollingbackRenameIndex(t *meta.Meta, job *model.Job) (ver int64, err error)
return ver, errors.Trace(err)
}

func rollingbackCreateTable(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
// If the value of SnapshotVer isn't zero, it means the work is inserting data.
if job.SchemaState == model.StateWriteReorganization {
job.State = model.JobStateRollingback
} else {
job.State = model.JobStateCancelled
if job.Error != nil {
// for insert error, `job.Error` is set already.
err = job.Error
} else {
err = errCancelledDDLJob
}
return
}
log.Infof("[ddl-%s] run the cancelling create table job: %s", w, job)
return
}

func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
switch job.Type {
case model.ActionAddColumn:
Expand All @@ -221,6 +239,8 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
err = rollingbackDropSchema(t, job)
case model.ActionRenameIndex:
ver, err = rollingbackRenameIndex(t, job)
case model.ActionCreateTable:
ver, err = rollingbackCreateTable(w, d, t, job)
default:
job.State = model.JobStateCancelled
err = errCancelledDDLJob
Expand Down
Loading