Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

infoschema: keep the timestamp of infoschema v2 updating #52983

Merged
merged 3 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,20 +240,27 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
}
// fetch the commit timestamp of the schema diff
schemaTs, err := do.getTimestampForSchemaVersionWithNonEmptyDiff(m, neededSchemaVersion, startTS)
schemaTsOrStartTs := schemaTs
if err != nil {
logutil.BgLogger().Warn("failed to get schema version", zap.Error(err), zap.Int64("version", neededSchemaVersion))
schemaTs = 0
schemaTsOrStartTs = startTS
}

if is := do.infoCache.GetByVersion(neededSchemaVersion); is != nil {
isV2, raw := infoschema.IsV2(is)
if isV2 {
// Copy the infoschema V2 instance and update its ts.
// For example, the DDL run 30 minutes ago, GC happened 10 minutes ago. If we use
// that infoschema it would get error "GC life time is shorter than transaction
// duration" when visiting TiKV.
// So we keep updating the ts of the infoschema v2.
is = raw.CloneAndUpdateTS(startTS)
}

// try to insert here as well to correct the schemaTs if previous is wrong
// the insert method check if schemaTs is zero
do.infoCache.Insert(is, schemaTs)

enableV2 := variable.SchemaCacheSize.Load() > 0
isV2 := infoschema.IsV2(is)
if enableV2 == isV2 {
return is, true, 0, nil, nil
}
Expand All @@ -273,7 +280,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
// 4. No regenrated schema diff.
startTime := time.Now()
if currentSchemaVersion != 0 && neededSchemaVersion > currentSchemaVersion && neededSchemaVersion-currentSchemaVersion < LoadSchemaDiffVersionGapThreshold {
is, relatedChanges, diffTypes, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion, schemaTsOrStartTs)
is, relatedChanges, diffTypes, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion, startTS)
if err == nil {
infoschema_metrics.LoadSchemaDurationLoadDiff.Observe(time.Since(startTime).Seconds())
do.infoCache.Insert(is, schemaTs)
Expand Down Expand Up @@ -317,7 +324,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
zap.Int64("neededSchemaVersion", neededSchemaVersion),
zap.Duration("start time", time.Since(startTime)))

is := newISBuilder.Build(schemaTs)
is := newISBuilder.Build(startTS)
do.infoCache.Insert(is, schemaTs)
return is, false, currentSchemaVersion, nil, nil
}
Expand Down Expand Up @@ -441,7 +448,7 @@ func (*Domain) fetchSchemasWithTables(schemas []*model.DBInfo, m *meta.Meta, don
// Return true if the schema is loaded successfully.
// Return false if the schema can not be loaded by schema diff, then we need to do full load.
// The second returned value is the delta updated table and partition IDs.
func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64, schemaTS uint64) (infoschema.InfoSchema, *transaction.RelatedSchemaChange, []string, error) {
func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64, startTS uint64) (infoschema.InfoSchema, *transaction.RelatedSchemaChange, []string, error) {
var diffs []*model.SchemaDiff
for usedVersion < newVersion {
usedVersion++
Expand Down Expand Up @@ -482,7 +489,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64
}
}

is := builder.Build(schemaTS)
is := builder.Build(startTS)
relatedChange := transaction.RelatedSchemaChange{}
relatedChange.PhyTblIDS = phyTblIDs
relatedChange.ActionTypes = actions
Expand Down
3 changes: 2 additions & 1 deletion pkg/infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,8 @@ func (b *Builder) Build(schemaTS uint64) InfoSchema {
func (b *Builder) InitWithOldInfoSchema(oldSchema InfoSchema) (*Builder, error) {
// Do not mix infoschema v1 and infoschema v2 building, this can simplify the logic.
// If we want to build infoschema v2, but the old infoschema is v1, just return error to trigger a full load.
if b.enableV2 != IsV2(oldSchema) {
isV2, _ := IsV2(oldSchema)
if b.enableV2 != isV2 {
return nil, errors.Errorf("builder's (v2=%v) infoschema mismatch, return error to trigger full reload", b.enableV2)
}

Expand Down
21 changes: 14 additions & 7 deletions pkg/infoschema/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ func (h *InfoCache) GetLatest() InfoSchema {
infoschema_metrics.GetLatestCounter.Inc()
if len(h.cache) > 0 {
infoschema_metrics.HitLatestCounter.Inc()
return h.cache[0].infoschema
ret := h.cache[0].infoschema
return ret
}
return nil
}
Expand Down Expand Up @@ -203,13 +204,19 @@ func (h *InfoCache) Insert(is InfoSchema, schemaTS uint64) bool {
})

// cached entry
if i < len(h.cache) && h.cache[i].infoschema.SchemaMetaVersion() == version &&
IsV2(h.cache[i].infoschema) == IsV2(is) {
// update timestamp if it is not 0 and cached one is 0
if schemaTS > 0 && h.cache[i].timestamp == 0 {
h.cache[i].timestamp = int64(schemaTS)
if i < len(h.cache) && h.cache[i].infoschema.SchemaMetaVersion() == version {
xisV2, _ := IsV2(h.cache[i].infoschema)
yisV2, _ := IsV2(is)
if xisV2 == yisV2 {
// update timestamp if it is not 0 and cached one is 0
if schemaTS > 0 && h.cache[i].timestamp == 0 {
h.cache[i].timestamp = int64(schemaTS)
} else if xisV2 {
// update infoschema if it's infoschema v2
h.cache[i].infoschema = is
}
return true
}
return true
}

if len(h.cache) < cap(h.cache) {
Expand Down
6 changes: 4 additions & 2 deletions pkg/infoschema/infoschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -966,7 +966,8 @@ func TestEnableInfoSchemaV2(t *testing.T) {

// Check the InfoSchema used is V2.
is := domain.GetDomain(tk.Session()).InfoSchema()
require.True(t, infoschema.IsV2(is))
isV2, _ := infoschema.IsV2(is)
require.True(t, isV2)

// Execute some basic operations under infoschema v2.
tk.MustQuery("show tables").Check(testkit.Rows("v2"))
Expand All @@ -985,7 +986,8 @@ func TestEnableInfoSchemaV2(t *testing.T) {

tk.MustExec("drop table v1")
is = domain.GetDomain(tk.Session()).InfoSchema()
require.False(t, infoschema.IsV2(is))
isV2, _ = infoschema.IsV2(is)
require.False(t, isV2)
}

type infoschemaTestContext struct {
Expand Down
12 changes: 9 additions & 3 deletions pkg/infoschema/infoschema_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,12 @@ func (is *infoschemaV2) base() *infoSchema {
return is.infoSchema
}

func (is *infoschemaV2) CloneAndUpdateTS(startTS uint64) *infoschemaV2 {
tmp := *is
tmp.ts = startTS
return &tmp
}

func (is *infoschemaV2) TableByID(id int64) (val table.Table, ok bool) {
if !tableIDIsValid(id) {
return
Expand Down Expand Up @@ -662,9 +668,9 @@ func isTableVirtual(id int64) bool {
}

// IsV2 tells whether an InfoSchema is v2 or not.
func IsV2(is InfoSchema) bool {
_, ok := is.(*infoschemaV2)
return ok
func IsV2(is InfoSchema) (bool, *infoschemaV2) {
ret, ok := is.(*infoschemaV2)
return ok, ret
}

func applyTableUpdate(b *Builder, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/infoschema/test/infoschemav2test/v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ func TestSpecialSchemas(t *testing.T) {
tk.MustQuery("select @@global.tidb_schema_cache_size;").Check(testkit.Rows("1024"))
tk.MustExec("create table t (id int);")
is := domain.GetDomain(tk.Session()).InfoSchema()
require.True(t, infoschema.IsV2(is))
isV2, _ := infoschema.IsV2(is)
require.True(t, isV2)

tk.MustQuery("show databases;").Check(testkit.Rows(
"INFORMATION_SCHEMA", "METRICS_SCHEMA", "PERFORMANCE_SCHEMA", "mysql", "sys", "test"))
Expand Down