Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: fix the issue that some extracted conditions are not used in information_schema #55236

Merged
merged 1 commit into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions pkg/executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1752,7 +1752,7 @@ func (e *memtableRetriever) setDataFromKeyColumnUsage(ctx context.Context, sctx
if checker != nil && !checker.RequestVerification(sctx.GetSessionVars().ActiveRoles, schema.L, table.Name.L, "", mysql.AllPrivMask) {
continue
}
rs := keyColumnUsageInTable(schema, table)
rs := keyColumnUsageInTable(schema, table, extractor)
rows = append(rows, rs...)
}
}
Expand Down Expand Up @@ -1822,7 +1822,7 @@ func (e *memtableRetriever) setDataForMetricTables() {
e.rows = rows
}

func keyColumnUsageInTable(schema model.CIStr, table *model.TableInfo) [][]types.Datum {
func keyColumnUsageInTable(schema model.CIStr, table *model.TableInfo, extractor *plannercore.InfoSchemaBaseExtractor) [][]types.Datum {
var rows [][]types.Datum
if table.PKIsHandle {
for _, col := range table.Columns {
Expand Down Expand Up @@ -1860,6 +1860,11 @@ func keyColumnUsageInTable(schema model.CIStr, table *model.TableInfo) [][]types
// Only handle unique/primary key
continue
}

if extractor != nil && extractor.Filter("constraint_name", idxName) {
continue
}

for i, key := range index.Columns {
col := nameToCol[key.Name.L]
if col.Hidden {
Expand Down Expand Up @@ -2132,6 +2137,9 @@ func (e *memtableRetriever) setDataFromTableConstraints(ctx context.Context, sct
if ok && extractor.Filter("constraint_schema", schema.L) {
continue
}
if ok && extractor.Filter("table_schema", schema.L) {
continue
}
tables, err := e.is.SchemaTableInfos(ctx, schema)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -2168,6 +2176,9 @@ func (e *memtableRetriever) setDataFromTableConstraints(ctx context.Context, sct
// The index has no constriant.
continue
}
if ok && extractor.Filter("constraint_name", cname) {
continue
}
record := types.MakeDatums(
infoschema.CatalogVal, // CONSTRAINT_CATALOG
schema.O, // CONSTRAINT_SCHEMA
Expand Down
85 changes: 85 additions & 0 deletions pkg/executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,3 +627,88 @@ func TestReferencedTableSchemaWithForeignKey(t *testing.T) {
WHERE table_name = 't2' AND table_schema = 'test2';`).Check(testkit.Rows(
"id id t1 test2 test"))
}

func TestInfoSchemaConditionWorks(t *testing.T) {
// this test creates table in different schema with different index name, and check
// the condition in the following columns whether work as expected.
//
// - "table_schema"
// - "constraint_schema"
// - "table_name"
// - "constraint_name"
// - "partition_name"
// - "schema_name"
// - "index_name"
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
for db := 0; db < 2; db++ {
for table := 0; table < 2; table++ {
tk.MustExec(fmt.Sprintf("create database if not exists db%d;", db))
tk.MustExec(fmt.Sprintf(`create table db%d.table%d (id int primary key, data0 varchar(255), data1 varchar(255))
partition by range (id) (
partition p0 values less than (10),
partition p1 values less than (20)
);`, db, table))
for index := 0; index < 2; index++ {
tk.MustExec(fmt.Sprintf("create index idx%d on db%d.table%d (data%d);", index, db, table, index))
}
}
}

testColumns := map[string]string{
"table_schema": "db",
"constraint_schema": "db",
"table_name": "table",
"constraint_name": "idx",
"partition_name": "p",
"schema_name": "db",
"index_name": "idx",
}
testTables := []string{}
for _, row := range tk.MustQuery("show tables in information_schema").Rows() {
tableName := row[0].(string)
// exclude some tables which cannot run without TiKV.
if strings.HasPrefix(tableName, "CLUSTER_") ||
strings.HasPrefix(tableName, "INSPECTION_") ||
strings.HasPrefix(tableName, "METRICS_") ||
strings.HasPrefix(tableName, "TIFLASH_") ||
strings.HasPrefix(tableName, "TIKV_") ||
strings.HasPrefix(tableName, "USER_") ||
tableName == "TABLE_STORAGE_STATS" ||
strings.Contains(tableName, "REGION") {
continue
}
testTables = append(testTables, row[0].(string))
}
for _, table := range testTables {
rs, err := tk.Exec(fmt.Sprintf("select * from information_schema.%s", table))
require.NoError(t, err)
cols := rs.Fields()

chk := rs.NewChunk(nil)
rowCount := 0
for {
err := rs.Next(context.Background(), chk)
require.NoError(t, err)
if chk.NumRows() == 0 {
break
}
rowCount += chk.NumRows()
}
if rowCount == 0 {
// TODO: find a way to test the table without any rows by adding some rows to them.
continue
}
for i := 0; i < len(cols); i++ {
colName := cols[i].Column.Name.L
if valPrefix, ok := testColumns[colName]; ok {
for j := 0; j < 2; j++ {
rows := tk.MustQuery(fmt.Sprintf("select * from information_schema.%s where %s = '%s%d';",
table, colName, valPrefix, j)).Rows()
rowCountWithCondition := len(rows)
require.Less(t, rowCountWithCondition, rowCount, "%s has no effect on %s", colName, table)
}
}
}
}
}