Skip to content

Commit

Permalink
filter out the PRIMARY index and foreign keys in table_constraints
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Keao <[email protected]>
  • Loading branch information
YangKeao committed Aug 7, 2024
1 parent 3dfa15c commit fba6f4b
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 39 deletions.
86 changes: 53 additions & 33 deletions pkg/executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ import (
"go.uber.org/zap"
)

var lowerPrimaryKeyName = strings.ToLower(mysql.PrimaryKeyName)

type memtableRetriever struct {
dummyCloser
table *model.TableInfo
Expand Down Expand Up @@ -1170,9 +1172,6 @@ func (e *memtableRetriever) setDataFromPartitions(ctx context.Context, sctx sess
} else {
// needs to update needed partitions for partition table.
for _, pi := range table.GetPartitionInfo().Definitions {
if ex.Filter("partition_name", pi.Name.L) {
continue
}
err := cache.TableRowStatsCache.UpdateByID(sctx, pi.ID)
if err != nil {
return err
Expand All @@ -1187,6 +1186,10 @@ func (e *memtableRetriever) setDataFromPartitions(ctx context.Context, sctx sess
if rowCount != 0 {
avgRowLength = dataLength / rowCount
}
// If there are any condition on the `PARTITION_NAME` in the extractor, this record should be ignored
if len(ex.ColPredicates["partition_name"]) > 0 {
continue
}
record := types.MakeDatums(
infoschema.CatalogVal, // TABLE_CATALOG
schema.O, // TABLE_SCHEMA
Expand Down Expand Up @@ -1818,24 +1821,26 @@ func (e *memtableRetriever) setDataForMetricTables() {
func keyColumnUsageInTable(schema model.CIStr, table *model.TableInfo, extractor *plannercore.InfoSchemaBaseExtractor) [][]types.Datum {
var rows [][]types.Datum
if table.PKIsHandle {
for _, col := range table.Columns {
if mysql.HasPriKeyFlag(col.GetFlag()) {
record := types.MakeDatums(
infoschema.CatalogVal, // CONSTRAINT_CATALOG
schema.O, // CONSTRAINT_SCHEMA
infoschema.PrimaryConstraint, // CONSTRAINT_NAME
infoschema.CatalogVal, // TABLE_CATALOG
schema.O, // TABLE_SCHEMA
table.Name.O, // TABLE_NAME
col.Name.O, // COLUMN_NAME
1, // ORDINAL_POSITION
1, // POSITION_IN_UNIQUE_CONSTRAINT
nil, // REFERENCED_TABLE_SCHEMA
nil, // REFERENCED_TABLE_NAME
nil, // REFERENCED_COLUMN_NAME
)
rows = append(rows, record)
break
if extractor == nil || !extractor.Filter("constraint_name", lowerPrimaryKeyName) {
for _, col := range table.Columns {
if mysql.HasPriKeyFlag(col.GetFlag()) {
record := types.MakeDatums(
infoschema.CatalogVal, // CONSTRAINT_CATALOG
schema.O, // CONSTRAINT_SCHEMA
infoschema.PrimaryConstraint, // CONSTRAINT_NAME
infoschema.CatalogVal, // TABLE_CATALOG
schema.O, // TABLE_SCHEMA
table.Name.O, // TABLE_NAME
col.Name.O, // COLUMN_NAME
1, // ORDINAL_POSITION
1, // POSITION_IN_UNIQUE_CONSTRAINT
nil, // REFERENCED_TABLE_SCHEMA
nil, // REFERENCED_TABLE_NAME
nil, // REFERENCED_COLUMN_NAME
)
rows = append(rows, record)
break
}
}
}
}
Expand All @@ -1845,16 +1850,19 @@ func keyColumnUsageInTable(schema model.CIStr, table *model.TableInfo, extractor
}
for _, index := range table.Indices {
var idxName string
var filterIdxName string
if index.Primary {
idxName = infoschema.PrimaryConstraint
idxName = mysql.PrimaryKeyName
filterIdxName = lowerPrimaryKeyName
} else if index.Unique {
idxName = index.Name.O
filterIdxName = index.Name.L
} else {
// Only handle unique/primary key
continue
}

if extractor != nil && extractor.Filter("constraint_name", idxName) {
if extractor != nil && extractor.Filter("constraint_name", filterIdxName) {
continue
}

Expand All @@ -1881,6 +1889,10 @@ func keyColumnUsageInTable(schema model.CIStr, table *model.TableInfo, extractor
}
}
for _, fk := range table.ForeignKeys {
if extractor != nil && extractor.Filter("constraint_name", fk.Name.L) {
continue
}

for i, key := range fk.Cols {
fkRefCol := ""
if len(fk.RefCols) > i {
Expand Down Expand Up @@ -2146,30 +2158,35 @@ func (e *memtableRetriever) setDataFromTableConstraints(ctx context.Context, sct
}

if tbl.PKIsHandle {
record := types.MakeDatums(
infoschema.CatalogVal, // CONSTRAINT_CATALOG
schema.O, // CONSTRAINT_SCHEMA
mysql.PrimaryKeyName, // CONSTRAINT_NAME
schema.O, // TABLE_SCHEMA
tbl.Name.O, // TABLE_NAME
infoschema.PrimaryKeyType, // CONSTRAINT_TYPE
)
rows = append(rows, record)
if !ok || !extractor.Filter("constraint_name", lowerPrimaryKeyName) {
record := types.MakeDatums(
infoschema.CatalogVal, // CONSTRAINT_CATALOG
schema.O, // CONSTRAINT_SCHEMA
mysql.PrimaryKeyName, // CONSTRAINT_NAME
schema.O, // TABLE_SCHEMA
tbl.Name.O, // TABLE_NAME
infoschema.PrimaryKeyType, // CONSTRAINT_TYPE
)
rows = append(rows, record)
}
}

for _, idx := range tbl.Indices {
var cname, ctype string
var filterName string
if idx.Primary {
cname = mysql.PrimaryKeyName
filterName = lowerPrimaryKeyName
ctype = infoschema.PrimaryKeyType
} else if idx.Unique {
cname = idx.Name.O
filterName = idx.Name.L
ctype = infoschema.UniqueKeyType
} else {
// The index has no constriant.
continue
}
if ok && extractor.Filter("constraint_name", cname) {
if ok && extractor.Filter("constraint_name", filterName) {
continue
}
record := types.MakeDatums(
Expand All @@ -2184,6 +2201,9 @@ func (e *memtableRetriever) setDataFromTableConstraints(ctx context.Context, sct
}
// TiDB includes foreign key information for compatibility but foreign keys are not yet enforced.
for _, fk := range tbl.ForeignKeys {
if ok && extractor.Filter("constraint_name", fk.Name.L) {
continue
}
record := types.MakeDatums(
infoschema.CatalogVal, // CONSTRAINT_CATALOG
schema.O, // CONSTRAINT_SCHEMA
Expand Down
46 changes: 40 additions & 6 deletions pkg/executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,14 +643,14 @@ func TestInfoSchemaConditionWorks(t *testing.T) {
tk := testkit.NewTestKit(t, store)
for db := 0; db < 2; db++ {
for table := 0; table < 2; table++ {
tk.MustExec(fmt.Sprintf("create database if not exists db%d;", db))
tk.MustExec(fmt.Sprintf(`create table db%d.table%d (id int primary key, data0 varchar(255), data1 varchar(255))
tk.MustExec(fmt.Sprintf("create database if not exists Db%d;", db))
tk.MustExec(fmt.Sprintf(`create table Db%d.Table%d (id int primary key, data0 varchar(255), data1 varchar(255))
partition by range (id) (
partition p0 values less than (10),
partition p1 values less than (20)
);`, db, table))
for index := 0; index < 2; index++ {
tk.MustExec(fmt.Sprintf("create index idx%d on db%d.table%d (data%d);", index, db, table, index))
tk.MustExec(fmt.Sprintf("create unique index Idx%d on Db%d.Table%d (id, data%d);", index, db, table, index))
}
}
}
Expand Down Expand Up @@ -703,12 +703,46 @@ func TestInfoSchemaConditionWorks(t *testing.T) {
colName := cols[i].Column.Name.L
if valPrefix, ok := testColumns[colName]; ok {
for j := 0; j < 2; j++ {
rows := tk.MustQuery(fmt.Sprintf("select * from information_schema.%s where %s = '%s%d';",
table, colName, valPrefix, j)).Rows()
sql := fmt.Sprintf("select * from information_schema.%s where %s = '%s%d';",
table, colName, valPrefix, j)
rows := tk.MustQuery(sql).Rows()
rowCountWithCondition := len(rows)
require.Less(t, rowCountWithCondition, rowCount, "%s has no effect on %s", colName, table)
require.Less(t, rowCountWithCondition, rowCount, "%s has no effect on %s. SQL: %s", colName, table, sql)

// check the condition works as expected
for _, row := range rows {
require.Equal(t, fmt.Sprintf("%s%d", valPrefix, j), strings.ToLower(row[i].(string)),
"%s has no effect on %s. SQL: %s", colName, table, sql)
}
}
}
}
}

// Test the PRIMARY constraint filter
rows := tk.MustQuery("select constraint_name, table_schema from information_schema.table_constraints where constraint_name = 'PRIMARY' and table_schema = 'db0';").Rows()
require.Equal(t, 2, len(rows))
for _, row := range rows {
require.Equal(t, "PRIMARY", row[0].(string))
require.Equal(t, "Db0", row[1].(string))
}
rows = tk.MustQuery("select constraint_name, table_schema from information_schema.key_column_usage where constraint_name = 'PRIMARY' and table_schema = 'db1';").Rows()
require.Equal(t, 2, len(rows))
for _, row := range rows {
require.Equal(t, "PRIMARY", row[0].(string))
require.Equal(t, "Db1", row[1].(string))
}

// Test the `partition_name` filter
tk.MustExec("create database if not exists db_no_partition;")
tk.MustExec("create table db_no_partition.t_no_partition (id int primary key, data0 varchar(255), data1 varchar(255));")
tk.MustExec(`create table db_no_partition.t_partition (id int primary key, data0 varchar(255), data1 varchar(255))
partition by range (id) (
partition p0 values less than (10),
partition p1 values less than (20)
);`)
rows = tk.MustQuery("select * from information_schema.partitions where table_schema = 'db_no_partition' and partition_name is NULL;").Rows()
require.Equal(t, 1, len(rows))
rows = tk.MustQuery("select * from information_schema.partitions where table_schema = 'db_no_partition' and (partition_name is NULL or partition_name = 'p0');").Rows()
require.Equal(t, 2, len(rows))
}

0 comments on commit fba6f4b

Please sign in to comment.