Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: some tiny optimizations to reduce infoschema v2 memory #53242

Merged
merged 4 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 41 additions & 6 deletions pkg/infoschema/infoschema_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func (isd *Data) addSpecialDB(di *model.DBInfo, tables *schemaTables) {
}

func (isd *Data) addDB(schemaVersion int64, dbInfo *model.DBInfo) {
dbInfo.Tables = nil
isd.schemaMap.Set(schemaItem{schemaVersion: schemaVersion, dbInfo: dbInfo})
}

Expand Down Expand Up @@ -311,6 +312,10 @@ func (is *infoschemaV2) CloneAndUpdateTS(startTS uint64) *infoschemaV2 {
}

func (is *infoschemaV2) TableByID(id int64) (val table.Table, ok bool) {
return is.tableByID(id, false)
}

func (is *infoschemaV2) tableByID(id int64, noRefill bool) (val table.Table, ok bool) {
if !tableIDIsValid(id) {
return
}
Expand Down Expand Up @@ -340,7 +345,9 @@ func (is *infoschemaV2) TableByID(id int64) (val table.Table, ok bool) {
oldKey := tableCacheKey{itm.tableID, itm.schemaVersion}
tbl, found = is.tableCache.Get(oldKey)
if found && tbl != nil {
is.tableCache.Set(key, tbl)
if !noRefill {
is.tableCache.Set(key, tbl)
}
return tbl, true
}

Expand All @@ -350,7 +357,9 @@ func (is *infoschemaV2) TableByID(id int64) (val table.Table, ok bool) {
return nil, false
}

is.tableCache.Set(key, ret)
if !noRefill {
is.tableCache.Set(key, ret)
}
return ret, true
}

Expand Down Expand Up @@ -415,7 +424,33 @@ func (is *infoschemaV2) TableInfoByID(id int64) (*model.TableInfo, bool) {

// SchemaTableInfos implements InfoSchema.FindTableInfoByPartitionID
func (is *infoschemaV2) SchemaTableInfos(schema model.CIStr) []*model.TableInfo {
return getTableInfoList(is.SchemaTables(schema))
if isSpecialDB(schema.L) {
schTbls := is.Data.specials[schema.L]
tables := make([]table.Table, 0, len(schTbls.tables))
for _, tbl := range schTbls.tables {
tables = append(tables, tbl)
}
return getTableInfoList(tables)
}

dbInfo, ok := is.SchemaByName(schema)
if !ok {
return nil
}
snapshot := is.r.Store().GetSnapshot(kv.NewVersion(is.ts))
// Using the KV timeout read feature to address the issue of potential DDL lease expiration when
// the meta region leader is slow.
snapshot.SetOption(kv.TiKVClientReadTimeout, uint64(3000)) // 3000ms.
m := meta.NewSnapshotMeta(snapshot)
tblInfos, err := m.ListTables(dbInfo.ID)
if err != nil {
if meta.ErrDBNotExists.Equal(err) {
return nil
}
// TODO: error could happen, so do not panic!
panic(err)
}
return tblInfos
}

// FindTableInfoByPartitionID implements InfoSchema.FindTableInfoByPartitionID
Expand Down Expand Up @@ -611,7 +646,7 @@ retry:
}
tables = make([]table.Table, 0, len(tblInfos))
for _, tblInfo := range tblInfos {
tbl, ok := is.TableByID(tblInfo.ID)
tbl, ok := is.tableByID(tblInfo.ID, true)
if !ok {
// what happen?
continue
Expand Down Expand Up @@ -912,8 +947,8 @@ func (b *bundleInfoBuilder) updateInfoSchemaBundlesV2(is *infoschemaV2) {
// TODO: This is quite inefficient! we need some better way or avoid this API.
is.ruleBundleMap = make(map[int64]*placement.Bundle)
for _, dbInfo := range is.AllSchemas() {
for _, tbl := range is.SchemaTables(dbInfo.Name) {
b.updateTableBundles(is, tbl.Meta().ID)
for _, tbl := range is.SchemaTableInfos(dbInfo.Name) {
b.updateTableBundles(is, tbl.ID)
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/infoschema/infoschema_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func TestV2Basic(t *testing.T) {
is.Data.addDB(1, dbInfo)
internal.AddDB(t, r.Store(), dbInfo)
tblInfo := internal.MockTableInfo(t, r.Store(), tableName.O)
tblInfo.DBID = dbInfo.ID
is.Data.add(tableItem{schemaName.L, dbInfo.ID, tableName.L, tblInfo.ID, 2, false}, internal.MockTable(t, r.Store(), tblInfo))
internal.AddTable(t, r.Store(), dbInfo, tblInfo)
is.base().schemaMetaVersion = 1
Expand Down Expand Up @@ -102,7 +103,7 @@ func TestV2Basic(t *testing.T) {

tblInfos := is.SchemaTableInfos(schemaName)
require.Equal(t, 1, len(tblInfos))
require.Same(t, tables[0].Meta(), tblInfos[0])
require.Equal(t, tables[0].Meta(), tblInfos[0])

tables = is.SchemaTables(model.NewCIStr("notexist"))
require.Equal(t, 0, len(tables))
Expand Down
18 changes: 9 additions & 9 deletions pkg/server/handler/tests/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1130,7 +1130,7 @@ func TestWriteDBTablesData(t *testing.T) {
// No table in a schema.
info := infoschema.MockInfoSchema([]*model.TableInfo{})
rc := httptest.NewRecorder()
tbs := info.SchemaTables(model.NewCIStr("test"))
tbs := info.SchemaTableInfos(model.NewCIStr("test"))
require.Equal(t, 0, len(tbs))
tikvhandler.WriteDBTablesData(rc, tbs)
var ti []*model.TableInfo
Expand All @@ -1142,30 +1142,30 @@ func TestWriteDBTablesData(t *testing.T) {
// One table in a schema.
info = infoschema.MockInfoSchema([]*model.TableInfo{core.MockSignedTable()})
rc = httptest.NewRecorder()
tbs = info.SchemaTables(model.NewCIStr("test"))
tbs = info.SchemaTableInfos(model.NewCIStr("test"))
require.Equal(t, 1, len(tbs))
tikvhandler.WriteDBTablesData(rc, tbs)
decoder = json.NewDecoder(rc.Body)
err = decoder.Decode(&ti)
require.NoError(t, err)
require.Equal(t, 1, len(ti))
require.Equal(t, ti[0].ID, tbs[0].Meta().ID)
require.Equal(t, ti[0].Name.String(), tbs[0].Meta().Name.String())
require.Equal(t, ti[0].ID, tbs[0].ID)
require.Equal(t, ti[0].Name.String(), tbs[0].Name.String())

// Two tables in a schema.
info = infoschema.MockInfoSchema([]*model.TableInfo{core.MockSignedTable(), core.MockUnsignedTable()})
rc = httptest.NewRecorder()
tbs = info.SchemaTables(model.NewCIStr("test"))
tbs = info.SchemaTableInfos(model.NewCIStr("test"))
require.Equal(t, 2, len(tbs))
tikvhandler.WriteDBTablesData(rc, tbs)
decoder = json.NewDecoder(rc.Body)
err = decoder.Decode(&ti)
require.NoError(t, err)
require.Equal(t, 2, len(ti))
require.Equal(t, ti[0].ID, tbs[0].Meta().ID)
require.Equal(t, ti[1].ID, tbs[1].Meta().ID)
require.Equal(t, ti[0].Name.String(), tbs[0].Meta().Name.String())
require.Equal(t, ti[1].Name.String(), tbs[1].Meta().Name.String())
require.Equal(t, ti[0].ID, tbs[0].ID)
require.Equal(t, ti[1].ID, tbs[1].ID)
require.Equal(t, ti[0].Name.String(), tbs[0].Name.String())
require.Equal(t, ti[1].Name.String(), tbs[1].Name.String())
}

func TestSetLabels(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/server/handler/tikvhandler/tikv_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,7 @@ func (h SchemaStorageHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
// For every table in the input, we marshal them. The result such as {tb1} {tb2} {tb3}.
// Then we add some bytes to make it become [{tb1}, {tb2}, {tb3}], so we can unmarshal it to []*model.TableInfo.
// Note: It would return StatusOK even if errors occur. But if errors occur, there must be some bugs.
func WriteDBTablesData(w http.ResponseWriter, tbs []table.Table) {
func WriteDBTablesData(w http.ResponseWriter, tbs []*model.TableInfo) {
if len(tbs) == 0 {
handler.WriteData(w, []*model.TableInfo{})
return
Expand All @@ -946,7 +946,7 @@ func WriteDBTablesData(w http.ResponseWriter, tbs []table.Table) {
} else {
init = true
}
js, err := json.MarshalIndent(tb.Meta(), "", " ")
js, err := json.MarshalIndent(tb, "", " ")
if err != nil {
terror.Log(errors.Trace(err))
return
Expand Down Expand Up @@ -987,7 +987,7 @@ func (h SchemaHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
// all table schemas in a specified database
if schema.SchemaExists(cDBName) {
tbs := schema.SchemaTables(cDBName)
tbs := schema.SchemaTableInfos(cDBName)
WriteDBTablesData(w, tbs)
return
}
Expand Down Expand Up @@ -1016,7 +1016,7 @@ func (h SchemaHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
handler.WriteError(w, infoschema.ErrTableNotExists.GenWithStack("Table which ID = %s does not exist.", tableID))
return
}
handler.WriteData(w, tbl.Meta())
handler.WriteData(w, tbl)
return
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/statistics/handle/util/table_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,8 @@ func (c *tableInfoGetterImpl) TableInfoByID(is infoschema.InfoSchema, physicalID
func buildPartitionID2TableID(is infoschema.InfoSchema) map[int64]int64 {
mapper := make(map[int64]int64)
for _, dbName := range is.AllSchemaNames() {
tbls := is.SchemaTables(dbName)
tbls := is.SchemaTableInfos(dbName)
for _, tbl := range tbls {
tbl := tbl.Meta()
pi := tbl.GetPartitionInfo()
if pi == nil {
continue
Expand Down
3 changes: 1 addition & 2 deletions pkg/ttl/cache/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ func (isc *InfoSchemaCache) Update(se session.Session) error {

newTables := make(map[int64]*PhysicalTable, len(isc.Tables))
for _, dbName := range is.AllSchemaNames() {
for _, tbl := range is.SchemaTables(dbName) {
tblInfo := tbl.Meta()
for _, tblInfo := range is.SchemaTableInfos(dbName) {
if tblInfo.TTLInfo == nil || !tblInfo.TTLInfo.Enable || tblInfo.State != model.StatePublic {
continue
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1064,8 +1064,7 @@ GROUP BY

noRecordTables := make([]string, 0)
for _, dbName := range is.AllSchemaNames() {
for _, tbl := range is.SchemaTables(dbName) {
tblInfo := tbl.Meta()
for _, tblInfo := range is.SchemaTableInfos(dbName) {
if tblInfo.TTLInfo == nil {
continue
}
Expand Down