Skip to content

Commit

Permalink
Merge branch 'master' into authplugin_bugfix
Browse files Browse the repository at this point in the history
  • Loading branch information
zgcbj authored Nov 17, 2021
2 parents 2cf1b59 + b3bae05 commit 2c8adac
Show file tree
Hide file tree
Showing 37 changed files with 735 additions and 430 deletions.
8 changes: 6 additions & 2 deletions br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,9 @@ func (l *Lightning) RunOnce(taskCtx context.Context, taskCfg *config.Config, glu
}

func (l *Lightning) RunServer() error {
l.serverLock.Lock()
l.taskCfgs = config.NewConfigList()
l.serverLock.Unlock()
log.L().Info(
"Lightning server is running, post to /tasks to start an import task",
zap.Stringer("address", l.serverAddr),
Expand Down Expand Up @@ -416,12 +418,13 @@ func (l *Lightning) handleGetTask(w http.ResponseWriter) {
Current *int64 `json:"current"`
QueuedIDs []int64 `json:"queue"`
}

l.serverLock.Lock()
if l.taskCfgs != nil {
response.QueuedIDs = l.taskCfgs.AllIDs()
} else {
response.QueuedIDs = []int64{}
}
l.serverLock.Unlock()

l.cancelLock.Lock()
if l.cancel != nil && l.curTask != nil {
Expand Down Expand Up @@ -463,7 +466,8 @@ func (l *Lightning) handleGetOneTask(w http.ResponseWriter, req *http.Request, t

func (l *Lightning) handlePostTask(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Cache-Control", "no-store")

l.serverLock.Lock()
defer l.serverLock.Unlock()
if l.taskCfgs == nil {
// l.taskCfgs is non-nil only if Lightning is started with RunServer().
// Without the server mode this pointer is default to be nil.
Expand Down
7 changes: 7 additions & 0 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,13 @@ func (s *testIntegrationSuite5) TestErrnoErrorCode(c *C) {
tk.MustExec("create table test_error_code_null(c1 char(100) not null);")
sql = "insert into test_error_code_null (c1) values(null);"
tk.MustGetErrCode(sql, errno.ErrBadNull)
// disable tidb_enable_change_multi_schema
tk.MustExec("set global tidb_enable_change_multi_schema = false")
sql = "alter table test_error_code_null add column (x1 int, x2 int)"
tk.MustGetErrCode(sql, errno.ErrUnsupportedDDLOperation)
sql = "alter table test_error_code_null add column (x1 int, x2 int)"
tk.MustGetErrCode(sql, errno.ErrUnsupportedDDLOperation)
tk.MustExec("set global tidb_enable_change_multi_schema = true")
}

func (s *testIntegrationSuite3) TestTableDDLWithFloatType(c *C) {
Expand Down
51 changes: 32 additions & 19 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2634,6 +2634,22 @@ func isSameTypeMultiSpecs(specs []*ast.AlterTableSpec) bool {
return true
}

func checkMultiSpecs(sctx sessionctx.Context, specs []*ast.AlterTableSpec) error {
if !sctx.GetSessionVars().EnableChangeMultiSchema {
if len(specs) > 1 {
return errRunMultiSchemaChanges
}
if len(specs) == 1 && len(specs[0].NewColumns) > 1 && specs[0].Tp == ast.AlterTableAddColumns {
return errRunMultiSchemaChanges
}
} else {
if len(specs) > 1 && !isSameTypeMultiSpecs(specs) {
return errRunMultiSchemaChanges
}
}
return nil
}

func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast.Ident, specs []*ast.AlterTableSpec) (err error) {
validSpecs, err := resolveAlterTableSpec(sctx, specs)
if err != nil {
Expand All @@ -2645,29 +2661,26 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast
return ErrWrongObject.GenWithStackByArgs(ident.Schema, ident.Name, "BASE TABLE")
}

err = checkMultiSpecs(sctx, validSpecs)
if err != nil {
return err
}

if len(validSpecs) > 1 {
if !sctx.GetSessionVars().EnableChangeMultiSchema {
switch validSpecs[0].Tp {
case ast.AlterTableAddColumns:
err = d.AddColumns(sctx, ident, validSpecs)
case ast.AlterTableDropColumn:
err = d.DropColumns(sctx, ident, validSpecs)
case ast.AlterTableDropPrimaryKey, ast.AlterTableDropIndex:
err = d.DropIndexes(sctx, ident, validSpecs)
default:
return errRunMultiSchemaChanges
}

if isSameTypeMultiSpecs(validSpecs) {
switch validSpecs[0].Tp {
case ast.AlterTableAddColumns:
err = d.AddColumns(sctx, ident, validSpecs)
case ast.AlterTableDropColumn:
err = d.DropColumns(sctx, ident, validSpecs)
case ast.AlterTableDropPrimaryKey, ast.AlterTableDropIndex:
err = d.DropIndexes(sctx, ident, validSpecs)
default:
return errRunMultiSchemaChanges
}
if err != nil {
return errors.Trace(err)
}
return nil
if err != nil {
return errors.Trace(err)
}

return errRunMultiSchemaChanges
return nil
}

for _, spec := range validSpecs {
Expand Down
7 changes: 7 additions & 0 deletions ddl/placement_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ func (s *testDBSuite6) TestCreateSchemaWithPlacement(c *C) {
" `b` varchar(255) DEFAULT NULL,\n" +
" PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PRIMARY_REGION=\"nl\" REGIONS=\"se,nz,nl\" FOLLOWERS=3 */"))
tk.MustQuery("SELECT CATALOG_NAME, SCHEMA_NAME, DEFAULT_CHARACTER_SET_NAME, DEFAULT_COLLATION_NAME, TIDB_PLACEMENT_POLICY_NAME, TIDB_DIRECT_PLACEMENT FROM information_schema.schemata WHERE SCHEMA_NAME='SchemaDirectPlacementTest'").Check(testkit.Rows(`def SchemaDirectPlacementTest utf8mb4 utf8mb4_bin <nil> PRIMARY_REGION="nl" REGIONS="se,nz,nl" FOLLOWERS=3`))
tk.MustQuery("SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, TIDB_PLACEMENT_POLICY_NAME, TIDB_DIRECT_PLACEMENT FROM information_schema.Tables WHERE TABLE_SCHEMA='SchemaDirectPlacementTest' AND TABLE_NAME = 'UseSchemaDefault'").Check(testkit.Rows(`def SchemaDirectPlacementTest UseSchemaDefault <nil> PRIMARY_REGION="nl" REGIONS="se,nz,nl" FOLLOWERS=3`))

tk.MustExec(`CREATE TABLE SchemaDirectPlacementTest.UseDirectPlacement (a int unsigned primary key, b varchar(255)) PRIMARY_REGION="se" REGIONS="se"`)
Expand All @@ -302,6 +303,7 @@ func (s *testDBSuite6) TestCreateSchemaWithPlacement(c *C) {
" `b` varchar(255) DEFAULT NULL,\n" +
" PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`PolicySchemaTest` */"))
tk.MustQuery("SELECT CATALOG_NAME, SCHEMA_NAME, DEFAULT_CHARACTER_SET_NAME, DEFAULT_COLLATION_NAME, TIDB_PLACEMENT_POLICY_NAME, TIDB_DIRECT_PLACEMENT FROM information_schema.schemata WHERE SCHEMA_NAME='SchemaPolicyPlacementTest'").Check(testkit.Rows(`def SchemaPolicyPlacementTest utf8mb4 utf8mb4_bin PolicySchemaTest <nil>`))
tk.MustQuery("SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, TIDB_PLACEMENT_POLICY_NAME, TIDB_DIRECT_PLACEMENT FROM information_schema.Tables WHERE TABLE_SCHEMA='SchemaPolicyPlacementTest' AND TABLE_NAME = 'UseSchemaDefault'").Check(testkit.Rows(`def SchemaPolicyPlacementTest UseSchemaDefault PolicySchemaTest <nil>`))

tk.MustExec(`CREATE TABLE SchemaPolicyPlacementTest.UsePolicy (a int unsigned primary key, b varchar(255)) PLACEMENT POLICY = "PolicyTableTest"`)
Expand Down Expand Up @@ -347,8 +349,11 @@ func (s *testDBSuite6) TestAlterDBPlacement(c *C) {
// Policy Test
// Test for Non-Exist policy
tk.MustGetErrCode("ALTER DATABASE TestAlterDB PLACEMENT POLICY=`alter_z`;", mysql.ErrPlacementPolicyNotExists)
tk.MustQuery("SELECT CATALOG_NAME, SCHEMA_NAME, DEFAULT_CHARACTER_SET_NAME, DEFAULT_COLLATION_NAME, TIDB_PLACEMENT_POLICY_NAME, TIDB_DIRECT_PLACEMENT FROM information_schema.schemata WHERE SCHEMA_NAME='TestAlterDB'").Check(testkit.Rows(`def TestAlterDB utf8mb4 utf8mb4_bin <nil> <nil>`))

tk.MustExec("ALTER DATABASE TestAlterDB PLACEMENT POLICY=`alter_x`;")
// Test for information_schema.schemata
tk.MustQuery("SELECT CATALOG_NAME, SCHEMA_NAME, DEFAULT_CHARACTER_SET_NAME, DEFAULT_COLLATION_NAME, TIDB_PLACEMENT_POLICY_NAME, TIDB_DIRECT_PLACEMENT FROM information_schema.schemata WHERE SCHEMA_NAME='TestAlterDB'").Check(testkit.Rows(`def TestAlterDB utf8mb4 utf8mb4_bin alter_x <nil>`))
// Test for Show Create Database
tk.MustQuery(`show create database TestAlterDB`).Check(testutil.RowsWithSep("|",
"TestAlterDB CREATE DATABASE `TestAlterDB` /*!40100 DEFAULT CHARACTER SET utf8mb4 */ "+
Expand Down Expand Up @@ -387,6 +392,8 @@ func (s *testDBSuite6) TestAlterDBPlacement(c *C) {

// DirectOption Test
tk.MustExec("ALTER DATABASE TestAlterDB PRIMARY_REGION=\"se\" FOLLOWERS=2 REGIONS=\"se\";")
// Test for information_schema.schemata
tk.MustQuery("SELECT CATALOG_NAME, SCHEMA_NAME, DEFAULT_CHARACTER_SET_NAME, DEFAULT_COLLATION_NAME, TIDB_PLACEMENT_POLICY_NAME, TIDB_DIRECT_PLACEMENT FROM information_schema.schemata WHERE SCHEMA_NAME='TestAlterDB'").Check(testkit.Rows(`def TestAlterDB utf8mb4 utf8mb4_bin <nil> PRIMARY_REGION="se" REGIONS="se" FOLLOWERS=2`))
// Test for Show Create Database
tk.MustQuery(`show create database TestAlterDB`).Check(testutil.RowsWithSep("|",
"TestAlterDB CREATE DATABASE `TestAlterDB` /*!40100 DEFAULT CHARACTER SET utf8mb4 */ "+
Expand Down
6 changes: 5 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ type executorBuilder struct {
// isStaleness means whether this statement use stale read.
isStaleness bool
readReplicaScope string
inUpdateStmt bool
inDeleteStmt bool
}

// CTEStorages stores resTbl and iterInTbl for CTEExec.
Expand Down Expand Up @@ -1963,6 +1965,7 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor {
if b.err != nil {
return nil
}
b.inUpdateStmt = true
updateExec := &UpdateExec{
baseExecutor: base,
OrderedList: v.OrderedList,
Expand Down Expand Up @@ -2007,6 +2010,7 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
if b.err != nil {
return nil
}
b.inDeleteStmt = true
base := newBaseExecutor(b.ctx, v.Schema(), v.ID(), selExec)
base.initCap = chunk.ZeroCapacity
deleteExec := &DeleteExec{
Expand Down Expand Up @@ -4684,7 +4688,7 @@ func (b *executorBuilder) getCacheTable(tblInfo *model.TableInfo, startTS uint64
zap.Stack("stack trace"))
}
}()
if !b.ctx.GetSessionVars().StmtCtx.InExplainStmt {
if !b.ctx.GetSessionVars().StmtCtx.InExplainStmt && !b.inDeleteStmt && !b.inUpdateStmt {
err := tbl.(table.CachedTable).UpdateLockForRead(b.ctx.GetStore(), startTS)
if err != nil {
log.Warn("Update Lock Info Error")
Expand Down
4 changes: 2 additions & 2 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2324,8 +2324,8 @@ func (s *testSuiteP2) TestTableScan(c *C) {
c.Assert(len(result.Rows()), GreaterEqual, 4)
tk.MustExec("use test")
tk.MustExec("create database mytest")
rowStr1 := fmt.Sprintf("%s %s %s %s %v", "def", "mysql", "utf8mb4", "utf8mb4_bin", nil)
rowStr2 := fmt.Sprintf("%s %s %s %s %v", "def", "mytest", "utf8mb4", "utf8mb4_bin", nil)
rowStr1 := fmt.Sprintf("%s %s %s %s %v %v %v", "def", "mysql", "utf8mb4", "utf8mb4_bin", nil, nil, nil)
rowStr2 := fmt.Sprintf("%s %s %s %s %v %v %v", "def", "mytest", "utf8mb4", "utf8mb4_bin", nil, nil, nil)
tk.MustExec("use information_schema")
result = tk.MustQuery("select * from schemata where schema_name = 'mysql'")
result.Check(testkit.Rows(rowStr1))
Expand Down
11 changes: 10 additions & 1 deletion executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,13 @@ func (e *memtableRetriever) setDataFromSchemata(ctx sessionctx.Context, schemas
if len(schema.Collate) > 0 {
collation = schema.Collate // Overwrite default
}
var policyName, directPlacement interface{}
if schema.PlacementPolicyRef != nil {
policyName = schema.PlacementPolicyRef.Name.O
}
if schema.DirectPlacementOpts != nil {
directPlacement = schema.DirectPlacementOpts.String()
}

if checker != nil && !checker.RequestVerification(ctx.GetSessionVars().ActiveRoles, schema.Name.L, "", "", mysql.AllPrivMask) {
continue
Expand All @@ -363,7 +370,9 @@ func (e *memtableRetriever) setDataFromSchemata(ctx sessionctx.Context, schemas
schema.Name.O, // SCHEMA_NAME
charset, // DEFAULT_CHARACTER_SET_NAME
collation, // DEFAULT_COLLATION_NAME
nil,
nil, // SQL_PATH
policyName, // TIDB_PLACEMENT_POLICY_NAME
directPlacement, // TIDB_DIRECT_PLACEMENT
)
rows = append(rows, record)
}
Expand Down
6 changes: 3 additions & 3 deletions executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (s *testInfoschemaTableSuite) TestSchemataTables(c *C) {
tk := testkit.NewTestKit(c, s.store)

tk.MustQuery("select * from information_schema.SCHEMATA where schema_name='mysql';").Check(
testkit.Rows("def mysql utf8mb4 utf8mb4_bin <nil>"))
testkit.Rows("def mysql utf8mb4 utf8mb4_bin <nil> <nil> <nil>"))

// Test the privilege of new user for information_schema.schemata.
tk.MustExec("create user schemata_tester")
Expand All @@ -175,7 +175,7 @@ func (s *testInfoschemaTableSuite) TestSchemataTables(c *C) {
schemataTester.MustQuery("select * from information_schema.SCHEMATA where schema_name='mysql';").Check(
[][]interface{}{})
schemataTester.MustQuery("select * from information_schema.SCHEMATA where schema_name='INFORMATION_SCHEMA';").Check(
testkit.Rows("def INFORMATION_SCHEMA utf8mb4 utf8mb4_bin <nil>"))
testkit.Rows("def INFORMATION_SCHEMA utf8mb4 utf8mb4_bin <nil> <nil> <nil>"))

// Test the privilege of user with privilege of mysql for information_schema.schemata.
tk.MustExec("CREATE ROLE r_mysql_priv;")
Expand All @@ -184,7 +184,7 @@ func (s *testInfoschemaTableSuite) TestSchemataTables(c *C) {
schemataTester.MustExec("set role r_mysql_priv")
schemataTester.MustQuery("select count(*) from information_schema.SCHEMATA;").Check(testkit.Rows("2"))
schemataTester.MustQuery("select * from information_schema.SCHEMATA;").Check(
testkit.Rows("def INFORMATION_SCHEMA utf8mb4 utf8mb4_bin <nil>", "def mysql utf8mb4 utf8mb4_bin <nil>"))
testkit.Rows("def INFORMATION_SCHEMA utf8mb4 utf8mb4_bin <nil> <nil> <nil>", "def mysql utf8mb4 utf8mb4_bin <nil> <nil> <nil>"))
}

func (s *testInfoschemaTableSuite) TestTableIDAndIndexID(c *C) {
Expand Down
10 changes: 10 additions & 0 deletions expression/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func newBaseBuiltinFuncWithTp(ctx sessionctx.Context, funcName string, args []Ex
args[i] = WrapWithCastAsDecimal(ctx, args[i])
case types.ETString:
args[i] = WrapWithCastAsString(ctx, args[i])
args[i] = WrapWithToBinary(ctx, args[i], funcName)
case types.ETDatetime:
args[i] = WrapWithCastAsTime(ctx, args[i], types.NewFieldType(mysql.TypeDatetime))
case types.ETTimestamp:
Expand Down Expand Up @@ -879,6 +880,9 @@ var funcs = map[string]functionClass{
ast.NextVal: &nextValFunctionClass{baseFunctionClass{ast.NextVal, 1, 1}},
ast.LastVal: &lastValFunctionClass{baseFunctionClass{ast.LastVal, 1, 1}},
ast.SetVal: &setValFunctionClass{baseFunctionClass{ast.SetVal, 2, 2}},

// TiDB implicit internal functions.
InternalFuncToBinary: &tidbConvertCharsetFunctionClass{baseFunctionClass{InternalFuncToBinary, 1, 1}},
}

// IsFunctionSupported check if given function name is a builtin sql function.
Expand All @@ -902,6 +906,7 @@ func GetDisplayName(name string) string {
func GetBuiltinList() []string {
res := make([]string, 0, len(funcs))
notImplementedFunctions := []string{ast.RowFunc, ast.IsTruthWithNull}
implicitFunctions := []string{InternalFuncToBinary}
for funcName := range funcs {
skipFunc := false
// Skip not implemented functions
Expand All @@ -910,6 +915,11 @@ func GetBuiltinList() []string {
skipFunc = true
}
}
for _, implicitFunc := range implicitFunctions {
if funcName == implicitFunc {
skipFunc = true
}
}
// Skip literal functions
// (their names are not readable: 'tidb`.(dateliteral, for example)
// See: https://github.com/pingcap/parser/pull/591
Expand Down
Loading

0 comments on commit 2c8adac

Please sign in to comment.