Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

domain,infoschema: make infoschema activity block GC safepoint advancing #58062

Merged
merged 6 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/ddl/attributes_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ PARTITION BY RANGE (c) (
func TestFlashbackTable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true, dom.InfoCache())
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -310,7 +310,7 @@ PARTITION BY RANGE (c) (
func TestDropTable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true, dom.InfoCache())
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -363,7 +363,7 @@ PARTITION BY RANGE (c) (
func TestCreateWithSameName(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true, dom.InfoCache())
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -427,7 +427,7 @@ PARTITION BY RANGE (c) (
func TestPartition(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true, dom.InfoCache())
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestMain(m *testing.M) {
conf.Experimental.AllowsExpressionIndex = true
})

_, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true, nil)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "ddl: infosync.GlobalInfoSyncerInit: %v\n", err)
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/tests/serial/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestMain(m *testing.M) {
conf.Experimental.AllowsExpressionIndex = true
})

_, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true, nil)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "ddl: infosync.GlobalInfoSyncerInit: %v\n", err)
os.Exit(1)
Expand Down
4 changes: 2 additions & 2 deletions pkg/domain/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestNormalSessionPool(t *testing.T) {
domain, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domain.Close()
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true)
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true, domain.InfoCache())
require.NoError(t, err1)
conf := config.GetGlobalConfig()
conf.Socket = ""
Expand Down Expand Up @@ -113,7 +113,7 @@ func TestAbnormalSessionPool(t *testing.T) {
domain, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domain.Close()
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true)
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true, domain.InfoCache())
require.NoError(t, err1)
conf := config.GetGlobalConfig()
conf.Socket = ""
Expand Down
2 changes: 1 addition & 1 deletion pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1405,7 +1405,7 @@ func (do *Domain) Init(
skipRegisterToDashboard := config.GetGlobalConfig().SkipRegisterToDashboard
do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID,
do.etcdClient, do.unprefixedEtcdCli, pdCli, pdHTTPCli,
do.Store().GetCodec(), skipRegisterToDashboard)
do.Store().GetCodec(), skipRegisterToDashboard, do.infoCache)
if err != nil {
return err
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ type InfoSyncer struct {
scheduleManager ScheduleManager
tiflashReplicaManager TiFlashReplicaManager
resourceManagerClient pd.ResourceManagerClient
infoCache infoschemaMinTS
}

// ServerInfo is server static information.
Expand Down Expand Up @@ -202,6 +203,10 @@ func SetPDHttpCliForTest(cli pdhttp.Client) func() {
}
}

type infoschemaMinTS interface {
GetAndResetRecentInfoSchemaTS(now uint64) uint64
}

// GlobalInfoSyncerInit return a new InfoSyncer. It is exported for testing.
func GlobalInfoSyncerInit(
ctx context.Context,
Expand All @@ -211,6 +216,7 @@ func GlobalInfoSyncerInit(
pdCli pd.Client, pdHTTPCli pdhttp.Client,
codec tikv.Codec,
skipRegisterToDashBoard bool,
infoCache infoschemaMinTS,
) (*InfoSyncer, error) {
if pdHTTPCli != nil {
pdHTTPCli = pdHTTPCli.
Expand All @@ -224,6 +230,7 @@ func GlobalInfoSyncerInit(
info: getServerInfo(id, serverIDGetter),
serverInfoPath: fmt.Sprintf("%s/%s", ServerInformationPath, id),
minStartTSPath: fmt.Sprintf("%s/%s", ServerMinStartTSPath, id),
infoCache: infoCache,
}
err := is.init(ctx, skipRegisterToDashBoard)
if err != nil {
Expand Down Expand Up @@ -801,6 +808,14 @@ func (is *InfoSyncer) ReportMinStartTS(store kv.Storage) {
}
}

if is.infoCache != nil {
schemaTS := is.infoCache.GetAndResetRecentInfoSchemaTS(currentVer.Ver)
logutil.BgLogger().Debug("ReportMinStartTS", zap.Uint64("InfoSchema Recent StartTS", schemaTS))
if schemaTS > startTSLowerLimit && schemaTS < minStartTS {
minStartTS = schemaTS
}
}

is.minStartTS = kv.GetMinInnerTxnStartTS(now, startTSLowerLimit, minStartTS)

err = is.storeMinStartTS(context.Background())
Expand Down
6 changes: 3 additions & 3 deletions pkg/domain/infosync/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestTopology(t *testing.T) {
require.NoError(t, err)
}()

info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, client, nil, nil, keyspace.CodecV1, false)
info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, client, nil, nil, keyspace.CodecV1, false, nil)
require.NoError(t, err)

err = info.newTopologySessionAndStoreServerInfo(ctx, util2.NewSessionDefaultRetryCnt)
Expand Down Expand Up @@ -156,7 +156,7 @@ func (is *InfoSyncer) ttlKeyExists(ctx context.Context) (bool, error) {
}

func TestPutBundlesRetry(t *testing.T) {
_, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, false)
_, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, false, nil)
require.NoError(t, err)

bundle, err := placement.NewBundleFromOptions(&model.PlacementSettings{PrimaryRegion: "r1", Regions: "r1,r2"})
Expand Down Expand Up @@ -220,7 +220,7 @@ func TestPutBundlesRetry(t *testing.T) {

func TestTiFlashManager(t *testing.T) {
ctx := context.Background()
_, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, false)
_, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, false, nil)
tiflash := NewMockTiFlash()
SetMockTiFlash(tiflash)

Expand Down
19 changes: 19 additions & 0 deletions pkg/infoschema/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,25 @@ func NewCache(r autoid.Requirement, capacity int) *InfoCache {
}
}

// GetAndResetRecentInfoSchemaTS provides the min start ts for infosync.InfoSyncer.
// It works like this:
//
// There is a background infosync worker calling ReportMinStartTS() function periodically.
// At the beginning of each round, the Data.recentMinTS here is reset to current TS.
// If InfoSchemaV2 APIs are called, there is an internal keepAlive() function will also be called.
// The keepAlive() function will compare the InfoSchemaV2's ts with Data.recentMinTS, and
// update the Data.recentMinTS to smaller one.
//
// In a nutshell, every round of ReportMinStartTS(), the minimal known TS used be InfoSchemaV2 APIs will be reported.
// Some corner cases might happen: the caller take an InfoSchemaV2 instance and not use it immediately.
// Seveval rounds later, that InfoSchema is used and its TS is reported to block GC safepoint advancing.
// But that's too late, the GC has been done, "GC life time is shorter than transaction duration" error still happen.
func (h *InfoCache) GetAndResetRecentInfoSchemaTS(now uint64) uint64 {
ret := h.Data.recentMinTS.Load()
h.Data.recentMinTS.Store(now)
return ret
}

// ReSize re-size the cache.
func (h *InfoCache) ReSize(capacity int) {
h.mu.Lock()
Expand Down
24 changes: 23 additions & 1 deletion pkg/infoschema/infoschema_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"math"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ngaut/pools"
Expand Down Expand Up @@ -122,6 +123,9 @@ type Data struct {
// TTLInfo, TiFlashReplica
// PlacementPolicyRef, Partition might be added later, and also ForeignKeys, TableLock etc
tableInfoResident *btree.BTreeG[tableInfoItem]

// the minimum ts of the recent used infoschema
recentMinTS atomic.Uint64
}

type tableInfoItem struct {
Expand Down Expand Up @@ -608,6 +612,7 @@ func (is *infoschemaV2) TableByID(ctx context.Context, id int64) (val table.Tabl
return
}

is.keepAlive()
itm, ok := is.searchTableItemByID(id)
if !ok {
return nil, false
Expand Down Expand Up @@ -769,8 +774,8 @@ func (is *infoschemaV2) TableByName(ctx context.Context, schema, tbl pmodel.CISt
return nil, ErrTableNotExists.FastGenByArgs(schema, tbl)
}

is.keepAlive()
start := time.Now()

var h tableByNameHelper
h.end = tableItem{dbName: schema, tableName: tbl, schemaVersion: math.MaxInt64}
h.schemaVersion = is.infoSchema.schemaMetaVersion
Expand Down Expand Up @@ -819,6 +824,22 @@ func (is *infoschemaV2) TableInfoByID(id int64) (*model.TableInfo, bool) {
return getTableInfo(tbl), ok
}

// keepAlive prevents the "GC life time is shorter than transaction duration" error on infoschema v2.
// It works by collecting the min TS of the during infoschem v2 API calls, and
// reports the min TS to info.InfoSyncer.
func (is *infoschemaV2) keepAlive() {
for {
v := is.Data.recentMinTS.Load()
if v <= is.ts {
break
}
succ := is.Data.recentMinTS.CompareAndSwap(v, is.ts)
if succ {
break
}
}
}

// SchemaTableInfos implements MetaOnlyInfoSchema.
func (is *infoschemaV2) SchemaTableInfos(ctx context.Context, schema pmodel.CIStr) ([]*model.TableInfo, error) {
if IsSpecialDB(schema.L) {
Expand All @@ -834,6 +855,7 @@ func (is *infoschemaV2) SchemaTableInfos(ctx context.Context, schema pmodel.CISt
return nil, nil // something wrong?
}

is.keepAlive()
retry:
dbInfo, ok := is.SchemaByName(schema)
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion pkg/infoschema/test/infoschemav2test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"v2_test.go",
],
flaky = True,
shard_count = 12,
shard_count = 13,
deps = [
"//pkg/domain",
"//pkg/domain/infosync",
Expand Down
52 changes: 52 additions & 0 deletions pkg/infoschema/test/infoschemav2test/v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package infoschemav2test
import (
"context"
"fmt"
"math"
"slices"
"sort"
"strconv"
Expand Down Expand Up @@ -558,3 +559,54 @@ func TestInfoSchemaCachedAutoIncrement(t *testing.T) {
tk.MustExec("drop table t1;") // trigger infoschema cache reload
tk.MustQuery(autoIncQuery).Check(testkit.Rows("0"))
}

func TestGetAndResetRecentInfoSchemaTS(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)

// For mocktikv, safe point is not initialized, we manually insert it for snapshot to use.
timeSafe := time.Now().Add(-48 * 60 * 60 * time.Second).Format("20060102-15:04:05 -0700 MST")
safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '')
ON DUPLICATE KEY
UPDATE variable_value = '%[1]s'`
tk.MustExec(fmt.Sprintf(safePointSQL, timeSafe))

tk.MustExec("use test")
infoCache := dom.InfoCache()
schemaTS1 := infoCache.GetAndResetRecentInfoSchemaTS(math.MaxUint64)

// After some DDL changes
tk.MustExec("create table dummytbl (id int)")
schemaTS2 := infoCache.GetAndResetRecentInfoSchemaTS(math.MaxUint64)
require.LessOrEqual(t, schemaTS1, schemaTS2)

ts, err := store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
require.NoError(t, err)

tk.MustExec("alter table dummytbl add column (c int)")
schemaTS3 := infoCache.GetAndResetRecentInfoSchemaTS(math.MaxUint64)
require.LessOrEqual(t, schemaTS2, schemaTS3)

tk.MustExec("alter table dummytbl add index idx(c)")
schemaTS4 := infoCache.GetAndResetRecentInfoSchemaTS(math.MaxUint64)
require.LessOrEqual(t, schemaTS3, schemaTS4)

// Reload several times
require.NoError(t, dom.Reload())
schemaTS5 := infoCache.GetAndResetRecentInfoSchemaTS(math.MaxUint64)
require.Equal(t, uint64(math.MaxUint64), schemaTS5)

require.NoError(t, dom.Reload())
schemaTS6 := infoCache.GetAndResetRecentInfoSchemaTS(math.MaxUint64)
require.Equal(t, uint64(math.MaxUint64), schemaTS6)

tk.MustQuery("select * from dummytbl").Check(testkit.Rows())
schemaTS7 := infoCache.GetAndResetRecentInfoSchemaTS(math.MaxUint64)
require.Less(t, schemaTS4, schemaTS7)

// Now snapshot read using old infoschema
tk.MustExec(fmt.Sprintf("set @@tidb_snapshot = %d", ts))
tk.MustQuery("select * from dummytbl").Check(testkit.Rows())
schemaTS8 := infoCache.GetAndResetRecentInfoSchemaTS(math.MaxUint64)
require.True(t, schemaTS8 < schemaTS7 && schemaTS8 > schemaTS2)
}
2 changes: 1 addition & 1 deletion pkg/server/stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestUptime(t *testing.T) {
}()
require.NoError(t, err)

_, err = infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true)
_, err = infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true, dom.InfoCache())
require.NoError(t, err)

tidbdrv := NewTiDBDriver(store)
Expand Down