Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: keep in-memory resident for model.TableInfo with special attributes #53301

Merged
merged 17 commits into from
Jun 4, 2024
7 changes: 3 additions & 4 deletions pkg/ddl/ddl_tiflash_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,10 +451,9 @@ func (d *ddl) refreshTiFlashTicker(ctx sessionctx.Context, pollTiFlashContext *T
var tableList = make([]TiFlashReplicaStatus, 0)

// Collect TiFlash Replica info, for every table.
for _, db := range schema.AllSchemaNames() {
tbls := schema.SchemaTables(db)
for _, tbl := range tbls {
tblInfo := tbl.Meta()
ch := schema.ListTablesWithSpecialAttribute(infoschema.TiFlashAttribute)
for _, v := range ch {
for _, tblInfo := range v.TableInfos {
LoadTiFlashReplicaInfo(tblInfo, &tableList)
}
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/infoschema/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,26 @@ func (is *infoSchema) SchemaTableInfos(schema model.CIStr) []*model.TableInfo {
return getTableInfoList(is.SchemaTables(schema))
}

type tableInfoResult struct {
DBName string
TableInfos []*model.TableInfo
}

func (is *infoSchema) ListTablesWithSpecialAttribute(filter specialAttributeFilter) []tableInfoResult {
ret := make([]tableInfoResult, 0, 10)
for _, dbName := range is.AllSchemaNames() {
res := tableInfoResult{DBName: dbName.O}
for _, tblInfo := range is.SchemaTableInfos(dbName) {
if !filter(tblInfo) {
continue
}
res.TableInfos = append(res.TableInfos, tblInfo)
}
ret = append(ret, res)
}
return ret
}

// AllSchemaNames returns all the schemas' names.
func AllSchemaNames(is InfoSchema) (names []string) {
schemas := is.AllSchemaNames()
Expand Down
131 changes: 123 additions & 8 deletions pkg/infoschema/infoschema_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,22 @@ type Data struct {
// pid2tid is used by FindTableInfoByPartitionID, it stores {partitionID, schemaVersion} => table ID
// Need full data in memory!
pid2tid *btree.BTreeG[partitionItem]

// tableInfoResident stores {dbName, tableID, schemaVersion} => model.TableInfo
// It is part of the model.TableInfo data kept in memory to accelerate the list tables API.
// We observe the pattern that list table API always come with filter.
// All model.TableInfo with special attributes are here, currently the special attributes including:
// TTLInfo, TiFlashReplica
// PlacementPolicyRef, Partition might be added later, and also ForeignKeys, TableLock etc
tableInfoResident *btree.BTreeG[tableInfoItem]
}

type tableInfoItem struct {
dbName string
tableID int64
schemaVersion int64
tableInfo *model.TableInfo
tomb bool
}

type partitionItem struct {
Expand Down Expand Up @@ -152,12 +168,13 @@ type tableCacheKey struct {
// NewData creates an infoschema V2 data struct.
func NewData() *Data {
ret := &Data{
byID: btree.NewBTreeG[tableItem](compareByID),
byName: btree.NewBTreeG[tableItem](compareByName),
schemaMap: btree.NewBTreeG[schemaItem](compareSchemaItem),
tableCache: newSieve[tableCacheKey, table.Table](1024 * 1024 * size.MB),
specials: make(map[string]*schemaTables),
pid2tid: btree.NewBTreeG[partitionItem](comparePartitionItem),
byID: btree.NewBTreeG[tableItem](compareByID),
byName: btree.NewBTreeG[tableItem](compareByName),
schemaMap: btree.NewBTreeG[schemaItem](compareSchemaItem),
tableCache: newSieve[tableCacheKey, table.Table](1024 * 1024 * size.MB),
specials: make(map[string]*schemaTables),
pid2tid: btree.NewBTreeG[partitionItem](comparePartitionItem),
tableInfoResident: btree.NewBTreeG[tableInfoItem](compareTableInfoItem),
}
return ret
}
Expand All @@ -171,11 +188,20 @@ func (isd *Data) add(item tableItem, tbl table.Table) {
isd.byID.Set(item)
isd.byName.Set(item)
isd.tableCache.Set(tableCacheKey{item.tableID, item.schemaVersion}, tbl)
if pi := tbl.Meta().GetPartitionInfo(); pi != nil {
ti := tbl.Meta()
if pi := ti.GetPartitionInfo(); pi != nil {
for _, def := range pi.Definitions {
isd.pid2tid.Set(partitionItem{def.ID, item.schemaVersion, tbl.Meta().ID, false})
}
}
if hasSpecialAttributes(ti) {
isd.tableInfoResident.Set(tableInfoItem{
dbName: item.dbName,
tableID: item.tableID,
schemaVersion: item.schemaVersion,
tableInfo: ti,
tomb: false})
}
}

func (isd *Data) addSpecialDB(di *model.DBInfo, tables *schemaTables) {
Expand All @@ -191,6 +217,12 @@ func (isd *Data) remove(item tableItem) {
item.tomb = true
isd.byID.Set(item)
isd.byName.Set(item)
isd.tableInfoResident.Set(tableInfoItem{
dbName: item.dbName,
tableID: item.tableID,
schemaVersion: item.schemaVersion,
tableInfo: nil,
tomb: true})
isd.tableCache.Remove(tableCacheKey{item.tableID, item.schemaVersion})
}

Expand Down Expand Up @@ -228,6 +260,23 @@ func compareByName(a, b tableItem) bool {
return a.schemaVersion < b.schemaVersion
}

func compareTableInfoItem(a, b tableInfoItem) bool {
if a.dbName < b.dbName {
return true
}
if a.dbName > b.dbName {
return false
}

if a.tableID < b.tableID {
return true
}
if a.tableID > b.tableID {
return false
}
return a.schemaVersion < b.schemaVersion
}

func comparePartitionItem(a, b partitionItem) bool {
if a.partitionID < b.partitionID {
return true
Expand Down Expand Up @@ -472,7 +521,10 @@ func (is *infoschemaV2) SchemaByName(schema model.CIStr) (val *model.DBInfo, ok

var dbInfo model.DBInfo
dbInfo.Name = schema
is.Data.schemaMap.Descend(schemaItem{dbInfo: &dbInfo, schemaVersion: math.MaxInt64}, func(item schemaItem) bool {
is.Data.schemaMap.Descend(schemaItem{
dbInfo: &dbInfo,
schemaVersion: math.MaxInt64,
}, func(item schemaItem) bool {
if item.Name() != schema.L {
ok = false
return false
Expand Down Expand Up @@ -648,6 +700,7 @@ retry:
// TODO: error could happen, so do not panic!
panic(err)
}

tables = make([]table.Table, 0, len(tblInfos))
for _, tblInfo := range tblInfos {
tbl, ok := is.tableByID(tblInfo.ID, true)
Expand Down Expand Up @@ -990,3 +1043,65 @@ func (b *bundleInfoBuilder) completeUpdateTablesV2(is *infoschemaV2) {
}
}
}

type specialAttributeFilter func(*model.TableInfo) bool

// TTLAttribute is the TTL attribute filter used by ListTablesWithSpecialAttribute.
var TTLAttribute specialAttributeFilter = func(t *model.TableInfo) bool {
return t.State == model.StatePublic && t.TTLInfo != nil
}

// TiFlashAttribute is the TiFlashReplica attribute filter used by ListTablesWithSpecialAttribute.
var TiFlashAttribute specialAttributeFilter = func(t *model.TableInfo) bool {
return t.TiFlashReplica != nil
}

func hasSpecialAttributes(t *model.TableInfo) bool {
return TTLAttribute(t) || TiFlashAttribute(t)
}

// AllSpecialAttribute marks a model.TableInfo with any special attributes.
var AllSpecialAttribute specialAttributeFilter = hasSpecialAttributes

func (is *infoschemaV2) ListTablesWithSpecialAttribute(filter specialAttributeFilter) []tableInfoResult {
ret := make([]tableInfoResult, 0, 10)
var currDB string
var lastTableID int64
var res tableInfoResult
is.Data.tableInfoResident.Reverse(func(item tableInfoItem) bool {
if item.schemaVersion > is.infoSchema.schemaMetaVersion {
// Skip the versions that we are not looking for.
return true
}
// Dedup the same record of different versions.
if lastTableID != 0 && lastTableID == item.tableID {
return true
}
lastTableID = item.tableID

if item.tomb {
return true
}

if !filter(item.tableInfo) {
return true
}

if currDB == "" {
currDB = item.dbName
res = tableInfoResult{DBName: item.dbName}
res.TableInfos = append(res.TableInfos, item.tableInfo)
} else if currDB == item.dbName {
res.TableInfos = append(res.TableInfos, item.tableInfo)
} else {
ret = append(ret, res)
res = tableInfoResult{DBName: item.dbName}
res.TableInfos = append(res.TableInfos, item.tableInfo)
}
return true
})
if len(res.TableInfos) > 0 {
ret = append(ret, res)
}
return ret
}
1 change: 1 addition & 0 deletions pkg/infoschema/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ type InfoSchema interface {
TableByID(id int64) (table.Table, bool)
SchemaTables(schema model.CIStr) []table.Table
FindTableByPartitionID(partitionID int64) (table.Table, *model.DBInfo, *model.PartitionDefinition)
ListTablesWithSpecialAttribute(filter specialAttributeFilter) []tableInfoResult
base() *infoSchema
}
4 changes: 3 additions & 1 deletion pkg/infoschema/test/infoschemav2test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@ go_test(
"v2_test.go",
],
flaky = True,
shard_count = 3,
shard_count = 4,
deps = [
"//pkg/domain",
"//pkg/domain/infosync",
"//pkg/infoschema",
"//pkg/parser/auth",
"//pkg/parser/model",
"//pkg/sessionctx/variable",
"//pkg/table",
"//pkg/testkit",
"//pkg/testkit/testsetup",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@org_uber_go_goleak//:goleak",
],
Expand Down
81 changes: 81 additions & 0 deletions pkg/infoschema/test/infoschemav2test/v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@
package infoschemav2test

import (
"slices"
"strconv"
"strings"
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/parser/auth"
"github.com/pingcap/tidb/pkg/parser/model"
Expand Down Expand Up @@ -150,6 +154,83 @@ PARTITION p5 VALUES LESS THAN (1980))`)
require.Equal(t, pid, ntbl.Meta().ID)
}

func TestListTablesWithSpecialAttribute(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/infoschema/mockTiFlashStoreCount", `return(true)`))
defer func() {
err := failpoint.Disable("github.com/pingcap/tidb/pkg/infoschema/mockTiFlashStoreCount")
require.NoError(t, err)
}()
tiflash := infosync.NewMockTiFlash()
infosync.SetMockTiFlash(tiflash)
defer func() {
tiflash.Lock()
tiflash.StatusServer.Close()
tiflash.Unlock()
}()

for _, v := range []int{1024000, 0} {
tk.MustExec("set @@global.tidb_schema_cache_size = ?", v)

tk.MustExec("create database test_db1")
tk.MustExec("use test_db1")
tk.MustExec("create table t_ttl (created_at datetime) ttl = created_at + INTERVAL 1 YEAR ttl_enable = 'ON'")
checkResult(t, tk, "test_db1 t_ttl")

tk.MustExec("alter table t_ttl remove ttl")
checkResult(t, tk)

tk.MustExec("drop table t_ttl")
checkResult(t, tk)

tk.MustExec("create table t_ttl (created_at1 datetime) ttl = created_at1 + INTERVAL 1 YEAR ttl_enable = 'ON'")
checkResult(t, tk, "test_db1 t_ttl")

tk.MustExec("create database test_db2")
tk.MustExec("use test_db2")
checkResult(t, tk, "test_db1 t_ttl")

tk.MustExec("create table t_ttl (created_at datetime) ttl = created_at + INTERVAL 1 YEAR ttl_enable = 'ON'")
checkResult(t, tk, "test_db1 t_ttl", "test_db2 t_ttl")

tk.MustExec("create table t_tiflash (id int)")
checkResult(t, tk, "test_db1 t_ttl", "test_db2 t_ttl")

tk.MustExec("alter table t_tiflash set tiflash replica 1")
checkResult(t, tk, "test_db1 t_ttl", "test_db2 t_tiflash", "test_db2 t_ttl")

tk.MustExec("alter table t_tiflash set tiflash replica 0")
checkResult(t, tk, "test_db1 t_ttl", "test_db2 t_ttl")

tk.MustExec("drop table t_tiflash")
checkResult(t, tk, "test_db1 t_ttl", "test_db2 t_ttl")

tk.MustExec("create table t_tiflash (id int)")
tk.MustExec("alter table t_tiflash set tiflash replica 1")
checkResult(t, tk, "test_db1 t_ttl", "test_db2 t_tiflash", "test_db2 t_ttl")

tk.MustExec("drop database test_db1")
checkResult(t, tk, "test_db2 t_tiflash", "test_db2 t_ttl")

tk.MustExec("drop database test_db2")
checkResult(t, tk)
}
}

func checkResult(t *testing.T, tk *testkit.TestKit, result ...string) {
is := domain.GetDomain(tk.Session()).InfoSchema()
ch := is.ListTablesWithSpecialAttribute(infoschema.AllSpecialAttribute)
var rows []string
for _, v := range ch {
for _, tblInfo := range v.TableInfos {
rows = append(rows, v.DBName+" "+tblInfo.Name.L)
}
}
slices.SortFunc(rows, strings.Compare)
require.Equal(t, rows, result)
}

func TestTiDBSchemaCacheSizeVariable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
Expand Down
8 changes: 5 additions & 3 deletions pkg/ttl/cache/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@ func (isc *InfoSchemaCache) Update(se session.Session) error {
}

newTables := make(map[int64]*PhysicalTable, len(isc.Tables))
for _, dbName := range is.AllSchemaNames() {
for _, tblInfo := range is.SchemaTableInfos(dbName) {

ch := is.ListTablesWithSpecialAttribute(infoschema.TTLAttribute)
for _, v := range ch {
for _, tblInfo := range v.TableInfos {
if tblInfo.TTLInfo == nil || !tblInfo.TTLInfo.Enable || tblInfo.State != model.StatePublic {
continue
}

dbName := model.NewCIStr(v.DBName)
logger := logutil.BgLogger().
With(zap.String("schema", dbName.L),
zap.Int64("tableID", tblInfo.ID), zap.String("tableName", tblInfo.Name.L))
Expand Down
11 changes: 4 additions & 7 deletions pkg/ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1063,17 +1063,14 @@ GROUP BY
}

noRecordTables := make([]string, 0)
for _, dbName := range is.AllSchemaNames() {
for _, tblInfo := range is.SchemaTableInfos(dbName) {
if tblInfo.TTLInfo == nil {
continue
}

ch := is.ListTablesWithSpecialAttribute(infoschema.TTLAttribute)
for _, v := range ch {
for _, tblInfo := range v.TableInfos {
interval, err := tblInfo.TTLInfo.GetJobInterval()
if err != nil {
logutil.Logger(ctx).Error("failed to get table's job interval",
zap.Error(err),
zap.String("db", dbName.String()),
zap.String("db", v.DBName),
zap.String("table", tblInfo.Name.String()),
)
interval = time.Hour
Expand Down
Loading