Skip to content

Commit

Permalink
Merge branch 'master' into add-ut
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored Dec 27, 2022
2 parents 72c34af + 83d275c commit 6d2d685
Show file tree
Hide file tree
Showing 44 changed files with 813 additions and 107 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -4432,8 +4432,8 @@ def go_deps():
name = "org_golang_x_oauth2",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/oauth2",
sum = "h1:GtQkldQ9m7yvzCL1V+LrYow3Khe0eJH0w7RbX/VbaIU=",
version = "v0.2.0",
sum = "h1:6l90koy8/LaBLmLu8jpHeHexzMwEita0zFfYlggy2F8=",
version = "v0.3.0",
)
go_repository(
name = "org_golang_x_sync",
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func (db *DB) tableIDAllocFilter() ddl.AllocTableIDIf {
if db.preallocedIDs == nil {
return true
}
prealloced := db.preallocedIDs.Prealloced(ti.ID)
prealloced := db.preallocedIDs.PreallocedFor(ti)
if prealloced {
log.Info("reusing table ID", zap.Stringer("table", ti.Name))
}
Expand Down
5 changes: 4 additions & 1 deletion br/pkg/restore/prealloc_table_id/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ go_library(
srcs = ["alloc.go"],
importpath = "github.com/pingcap/tidb/br/pkg/restore/prealloc_table_id",
visibility = ["//visibility:public"],
deps = ["//br/pkg/metautil"],
deps = [
"//br/pkg/metautil",
"//parser/model",
],
)

go_test(
Expand Down
23 changes: 23 additions & 0 deletions br/pkg/restore/prealloc_table_id/alloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math"

"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/parser/model"
)

const (
Expand Down Expand Up @@ -48,6 +49,14 @@ func New(tables []*metautil.Table) *PreallocIDs {
if t.Info.ID > max && t.Info.ID < insaneTableIDThreshold {
max = t.Info.ID
}

if t.Info.Partition != nil && t.Info.Partition.Definitions != nil {
for _, part := range t.Info.Partition.Definitions {
if part.ID > max && part.ID < insaneTableIDThreshold {
max = part.ID
}
}
}
}
return &PreallocIDs{
end: max + 1,
Expand Down Expand Up @@ -86,3 +95,17 @@ func (p *PreallocIDs) Alloc(m Allocator) error {
func (p *PreallocIDs) Prealloced(tid int64) bool {
return p.allocedFrom <= tid && tid < p.end
}

func (p *PreallocIDs) PreallocedFor(ti *model.TableInfo) bool {
if !p.Prealloced(ti.ID) {
return false
}
if ti.Partition != nil && ti.Partition.Definitions != nil {
for _, part := range ti.Partition.Definitions {
if !p.Prealloced(part.ID) {
return false
}
}
}
return true
}
38 changes: 32 additions & 6 deletions br/pkg/restore/prealloc_table_id/alloc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func (t *testAllocator) AdvanceGlobalIDs(n int) (int64, error) {
func TestAllocator(t *testing.T) {
type Case struct {
tableIDs []int64
partitions map[int64][]int64
hasAllocatedTo int64
successfullyAllocated []int64
shouldAllocatedTo int64
Expand Down Expand Up @@ -57,26 +58,51 @@ func TestAllocator(t *testing.T) {
successfullyAllocated: []int64{5, 6},
shouldAllocatedTo: 7,
},
{
tableIDs: []int64{1, 2, 5, 6, 7},
hasAllocatedTo: 6,
successfullyAllocated: []int64{6, 7},
shouldAllocatedTo: 13,
partitions: map[int64][]int64{
7: {8, 9, 10, 11, 12},
},
},
{
tableIDs: []int64{1, 2, 5, 6, 7, 13},
hasAllocatedTo: 9,
successfullyAllocated: []int64{13},
shouldAllocatedTo: 14,
partitions: map[int64][]int64{
7: {8, 9, 10, 11, 12},
},
},
}

run := func(t *testing.T, c Case) {
tables := make([]*metautil.Table, 0, len(c.tableIDs))
for _, id := range c.tableIDs {
tables = append(tables, &metautil.Table{
table := metautil.Table{
Info: &model.TableInfo{
ID: id,
ID: id,
Partition: &model.PartitionInfo{},
},
})
}
if c.partitions != nil {
for _, part := range c.partitions[id] {
table.Info.Partition.Definitions = append(table.Info.Partition.Definitions, model.PartitionDefinition{ID: part})
}
}
tables = append(tables, &table)
}

ids := prealloctableid.New(tables)
allocator := testAllocator(c.hasAllocatedTo)
require.NoError(t, ids.Alloc(&allocator))

allocated := make([]int64, 0, len(c.successfullyAllocated))
for _, t := range c.tableIDs {
if ids.Prealloced(t) {
allocated = append(allocated, t)
for _, t := range tables {
if ids.PreallocedFor(t.Info) {
allocated = append(allocated, t.Info.ID)
}
}
require.ElementsMatch(t, allocated, c.successfullyAllocated)
Expand Down
2 changes: 2 additions & 0 deletions cmd/explaintest/r/explain_complex.result
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ created_on datetime DEFAULT NULL,
updated_on datetime DEFAULT NULL,
UNIQUE KEY org_employee_position_pk (hotel_id,user_id,position_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
set tidb_cost_model_version=2;
explain format = 'brief' SELECT d.id, d.ctx, d.name, d.left_value, d.right_value, d.depth, d.leader_id, d.status, d.created_on, d.updated_on FROM org_department AS d LEFT JOIN org_position AS p ON p.department_id = d.id AND p.status = 1000 LEFT JOIN org_employee_position AS ep ON ep.position_id = p.id AND ep.status = 1000 WHERE (d.ctx = 1 AND (ep.user_id = 62 OR d.id = 20 OR d.id = 20) AND d.status = 1000) GROUP BY d.id ORDER BY d.left_value;
id estRows task access object operator info
Sort 1.00 root test.org_department.left_value
Expand All @@ -262,6 +263,7 @@ Sort 1.00 root test.org_department.left_value
└─TableReader(Probe) 9.99 root data:Selection
└─Selection 9.99 cop[tikv] eq(test.org_employee_position.status, 1000), not(isnull(test.org_employee_position.position_id))
└─TableFullScan 10000.00 cop[tikv] table:ep keep order:false, stats:pseudo
set tidb_cost_model_version=1;
create table test.Tab_A (id int primary key,bid int,cid int,name varchar(20),type varchar(20),num int,amt decimal(11,2));
create table test.Tab_B (id int primary key,name varchar(20));
create table test.Tab_C (id int primary key,name varchar(20),amt decimal(11,2));
Expand Down
2 changes: 2 additions & 0 deletions cmd/explaintest/t/explain_complex.test
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ CREATE TABLE org_position (
UNIQUE KEY org_employee_position_pk (hotel_id,user_id,position_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

set tidb_cost_model_version=2;
explain format = 'brief' SELECT d.id, d.ctx, d.name, d.left_value, d.right_value, d.depth, d.leader_id, d.status, d.created_on, d.updated_on FROM org_department AS d LEFT JOIN org_position AS p ON p.department_id = d.id AND p.status = 1000 LEFT JOIN org_employee_position AS ep ON ep.position_id = p.id AND ep.status = 1000 WHERE (d.ctx = 1 AND (ep.user_id = 62 OR d.id = 20 OR d.id = 20) AND d.status = 1000) GROUP BY d.id ORDER BY d.left_value;
set tidb_cost_model_version=1;

create table test.Tab_A (id int primary key,bid int,cid int,name varchar(20),type varchar(20),num int,amt decimal(11,2));
create table test.Tab_B (id int primary key,name varchar(20));
Expand Down
3 changes: 1 addition & 2 deletions ddl/column_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,7 @@ func TestDropColumn(t *testing.T) {
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1 (a int,b int) partition by hash(a) partitions 4;")
err := tk.ExecToErr("alter table t1 drop column a")
// TODO: refine the error message to compatible with MySQL
require.EqualError(t, err, "[planner:1054]Unknown column 'a' in 'expression'")
require.EqualError(t, err, "[ddl:3885]Column 'a' has a partitioning function dependency and cannot be dropped or renamed")
}

func TestChangeColumn(t *testing.T) {
Expand Down
32 changes: 32 additions & 0 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4549,3 +4549,35 @@ func TestAlterModifyColumnOnPartitionedTableRename(t *testing.T) {
tk.MustExec(`create table t (a int, b char) partition by hash (a) partitions 3`)
tk.MustContainErrMsg(`alter table t change a c int`, "[ddl:8200]Unsupported modify column: Column 'a' has a partitioning function dependency and cannot be renamed")
}

func TestDropPartitionKeyColumn(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("create database DropPartitionKeyColumn")
defer tk.MustExec("drop database DropPartitionKeyColumn")
tk.MustExec("use DropPartitionKeyColumn")

tk.MustExec("create table t1 (a tinyint, b char) partition by range (a) ( partition p0 values less than (10) )")
err := tk.ExecToErr("alter table t1 drop column a")
require.Error(t, err)
require.Equal(t, "[ddl:3885]Column 'a' has a partitioning function dependency and cannot be dropped or renamed", err.Error())
tk.MustExec("alter table t1 drop column b")

tk.MustExec("create table t2 (a tinyint, b char) partition by range (a-1) ( partition p0 values less than (10) )")
err = tk.ExecToErr("alter table t2 drop column a")
require.Error(t, err)
require.Equal(t, "[ddl:3885]Column 'a' has a partitioning function dependency and cannot be dropped or renamed", err.Error())
tk.MustExec("alter table t2 drop column b")

tk.MustExec("create table t3 (a tinyint, b char) partition by hash(a) partitions 4;")
err = tk.ExecToErr("alter table t3 drop column a")
require.Error(t, err)
require.Equal(t, "[ddl:3885]Column 'a' has a partitioning function dependency and cannot be dropped or renamed", err.Error())
tk.MustExec("alter table t3 drop column b")

tk.MustExec("create table t4 (a char, b char) partition by list columns (a) ( partition p0 values in ('0'), partition p1 values in ('a'), partition p2 values in ('b'));")
err = tk.ExecToErr("alter table t4 drop column a")
require.Error(t, err)
require.Equal(t, "[ddl:3885]Column 'a' has a partitioning function dependency and cannot be dropped or renamed", err.Error())
tk.MustExec("alter table t4 drop column b")
}
3 changes: 1 addition & 2 deletions ddl/db_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,8 +871,7 @@ func TestDDLWithInvalidTableInfo(t *testing.T) {

tk.MustExec("create table t (a bigint, b int, c int generated always as (b+1)) partition by hash(a) partitions 4;")
// Test drop partition column.
// TODO: refine the error message to compatible with MySQL
tk.MustGetErrMsg("alter table t drop column a;", "[planner:1054]Unknown column 'a' in 'expression'")
tk.MustGetErrMsg("alter table t drop column a;", "[ddl:3885]Column 'a' has a partitioning function dependency and cannot be dropped or renamed")
// Test modify column with invalid expression.
tk.MustGetErrMsg("alter table t modify column c int GENERATED ALWAYS AS ((case when (a = 0) then 0when (a > 0) then (b / a) end));", "[parser:1064]You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 97 near \"then (b / a) end));\" ")
// Test add column with invalid expression.
Expand Down
21 changes: 21 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4306,6 +4306,9 @@ func checkIsDroppableColumn(ctx sessionctx.Context, is infoschema.InfoSchema, sc
if err = isDroppableColumn(tblInfo, colName); err != nil {
return false, errors.Trace(err)
}
if err = checkDropColumnWithPartitionConstraint(t, colName); err != nil {
return false, errors.Trace(err)
}
// Check the column with foreign key.
err = checkDropColumnWithForeignKeyConstraint(is, schema.Name.L, tblInfo, colName.L)
if err != nil {
Expand All @@ -4326,6 +4329,24 @@ func checkIsDroppableColumn(ctx sessionctx.Context, is infoschema.InfoSchema, sc
return true, nil
}

// checkDropColumnWithPartitionConstraint is used to check the partition constraint of the drop column.
func checkDropColumnWithPartitionConstraint(t table.Table, colName model.CIStr) error {
if t.Meta().Partition == nil {
return nil
}
pt, ok := t.(table.PartitionedTable)
if !ok {
// Should never happen!
return errors.Trace(dbterror.ErrDependentByPartitionFunctional.GenWithStackByArgs(colName.L))
}
for _, name := range pt.GetPartitionColumnNames() {
if strings.EqualFold(name.L, colName.L) {
return errors.Trace(dbterror.ErrDependentByPartitionFunctional.GenWithStackByArgs(colName.L))
}
}
return nil
}

func checkVisibleColumnCnt(t table.Table, addCnt, dropCnt int) error {
tblInfo := t.Meta()
visibleColumCnt := 0
Expand Down
54 changes: 40 additions & 14 deletions domain/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,19 +174,21 @@ type planReplayerHandle struct {
}

// SendTask send dumpTask in background task handler
func (h *planReplayerHandle) SendTask(task *PlanReplayerDumpTask) {
func (h *planReplayerHandle) SendTask(task *PlanReplayerDumpTask) bool {
select {
case h.planReplayerTaskDumpHandle.taskCH <- task:
// we directly remove the task key if we put task in channel successfully, if the task was failed to dump,
// the task handle will re-add the task in next loop
if !task.IsContinuesCapture {
h.planReplayerTaskCollectorHandle.removeTask(task.PlanReplayerTaskKey)
}
return true
default:
// TODO: add metrics here
// directly discard the task if the task channel is full in order not to block the query process
logutil.BgLogger().Info("discard one plan replayer dump task",
zap.String("sql digest", task.SQLDigest), zap.String("plan digest", task.PlanDigest))
logutil.BgLogger().Warn("discard one plan replayer dump task",
zap.String("sql-digest", task.SQLDigest), zap.String("plan-digest", task.PlanDigest))
return false
}
}

Expand All @@ -209,9 +211,13 @@ func (h *planReplayerTaskCollectorHandle) CollectPlanReplayerTask() error {
for _, key := range allKeys {
unhandled, err := checkUnHandledReplayerTask(h.ctx, h.sctx, key)
if err != nil {
logutil.BgLogger().Warn("[plan-replayer-task] collect plan replayer task failed", zap.Error(err))
return err
}
if unhandled {
logutil.BgLogger().Debug("[plan-replayer-task] collect plan replayer task success",
zap.String("sql-digest", key.SQLDigest),
zap.String("plan-digest", key.PlanDigest))
tasks = append(tasks, key)
}
}
Expand Down Expand Up @@ -351,16 +357,36 @@ type planReplayerTaskDumpWorker struct {

func (w *planReplayerTaskDumpWorker) run() {
for task := range w.taskCH {
w.handleTask(task)
}
}

func (w *planReplayerTaskDumpWorker) handleTask(task *PlanReplayerDumpTask) {
sqlDigest := task.SQLDigest
planDigest := task.PlanDigest
check := true
occupy := true
handleTask := true
defer func() {
logutil.BgLogger().Debug("[plan-replayer-capture] handle task",
zap.String("sql-digest", sqlDigest),
zap.String("plan-digest", planDigest),
zap.Bool("check", check),
zap.Bool("occupy", occupy),
zap.Bool("handle", handleTask))
}()
if task.IsContinuesCapture {
if w.status.checkTaskKeyFinishedBefore(task) {
continue
check = false
return
}
successOccupy := w.status.occupyRunningTaskKey(task)
if !successOccupy {
continue
}
w.HandleTask(task)
w.status.releaseRunningTaskKey(task)
}
occupy = w.status.occupyRunningTaskKey(task)
if !occupy {
return
}
handleTask = w.HandleTask(task)
w.status.releaseRunningTaskKey(task)
}

// HandleTask handled task
Expand All @@ -373,7 +399,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc
taskKey := task.PlanReplayerTaskKey
unhandled, err := checkUnHandledReplayerTask(w.ctx, w.sctx, taskKey)
if err != nil {
logutil.BgLogger().Warn("check plan replayer capture task failed",
logutil.BgLogger().Warn("[plan-replayer-capture] check task failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
Expand All @@ -386,7 +412,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc

file, fileName, err := replayer.GeneratePlanReplayerFile(task.IsContinuesCapture)
if err != nil {
logutil.BgLogger().Warn("generate plan replayer capture task file failed",
logutil.BgLogger().Warn("[plan-replayer-capture] generate task file failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
Expand All @@ -409,7 +435,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc
}
r, err := handle.GenJSONTableFromStats(schema.Name.String(), tbl.Meta(), stat.(*statistics.Table))
if err != nil {
logutil.BgLogger().Warn("generate plan replayer capture task json stats failed",
logutil.BgLogger().Warn("[plan-replayer-capture] generate task json stats failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
Expand All @@ -421,7 +447,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc
}
err = DumpPlanReplayerInfo(w.ctx, w.sctx, task)
if err != nil {
logutil.BgLogger().Warn("dump plan replayer capture task result failed",
logutil.BgLogger().Warn("[plan-replayer-capture] dump task result failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
Expand Down
Loading

0 comments on commit 6d2d685

Please sign in to comment.