Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

domain,infoschema: make infoschema activity block GC safepoint advancing #58062

Merged
merged 6 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/ddl/attributes_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ PARTITION BY RANGE (c) (
func TestFlashbackTable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true, dom.InfoCache())
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -310,7 +310,7 @@ PARTITION BY RANGE (c) (
func TestDropTable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true, dom.InfoCache())
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -363,7 +363,7 @@ PARTITION BY RANGE (c) (
func TestCreateWithSameName(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true, dom.InfoCache())
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -427,7 +427,7 @@ PARTITION BY RANGE (c) (
func TestPartition(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true, dom.InfoCache())
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestMain(m *testing.M) {
conf.Experimental.AllowsExpressionIndex = true
})

_, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true, nil)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "ddl: infosync.GlobalInfoSyncerInit: %v\n", err)
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/tests/serial/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestMain(m *testing.M) {
conf.Experimental.AllowsExpressionIndex = true
})

_, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true, nil)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "ddl: infosync.GlobalInfoSyncerInit: %v\n", err)
os.Exit(1)
Expand Down
4 changes: 2 additions & 2 deletions pkg/domain/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestNormalSessionPool(t *testing.T) {
domain, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domain.Close()
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true)
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true, domain.InfoCache())
require.NoError(t, err1)
conf := config.GetGlobalConfig()
conf.Socket = ""
Expand Down Expand Up @@ -113,7 +113,7 @@ func TestAbnormalSessionPool(t *testing.T) {
domain, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domain.Close()
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true)
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true, domain.InfoCache())
require.NoError(t, err1)
conf := config.GetGlobalConfig()
conf.Socket = ""
Expand Down
2 changes: 1 addition & 1 deletion pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1405,7 +1405,7 @@ func (do *Domain) Init(
skipRegisterToDashboard := config.GetGlobalConfig().SkipRegisterToDashboard
do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID,
do.etcdClient, do.unprefixedEtcdCli, pdCli, pdHTTPCli,
do.Store().GetCodec(), skipRegisterToDashboard)
do.Store().GetCodec(), skipRegisterToDashboard, do.infoCache)
if err != nil {
return err
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ type InfoSyncer struct {
scheduleManager ScheduleManager
tiflashReplicaManager TiFlashReplicaManager
resourceManagerClient pd.ResourceManagerClient
infoCache infoschemaMinTS
}

// ServerInfo is server static information.
Expand Down Expand Up @@ -202,6 +203,10 @@ func SetPDHttpCliForTest(cli pdhttp.Client) func() {
}
}

// infoschemaMinTS is the small view of the infoschema cache that InfoSyncer
// holds in its infoCache field. ReportMinStartTS skips the infoschema check
// when that field is nil.
type infoschemaMinTS interface {
// GetAndResetRecentInfoSchemaTS returns the recorded recent infoschema TS and
// replaces the record with now (this is what the InfoCache implementation does).
// ReportMinStartTS lowers the min start TS to the returned value when it lies
// between the start-TS lower limit and the current min start TS.
GetAndResetRecentInfoSchemaTS(now uint64) uint64
}

// GlobalInfoSyncerInit returns a new InfoSyncer. It is exported for testing.
func GlobalInfoSyncerInit(
ctx context.Context,
Expand All @@ -211,6 +216,7 @@ func GlobalInfoSyncerInit(
pdCli pd.Client, pdHTTPCli pdhttp.Client,
codec tikv.Codec,
skipRegisterToDashBoard bool,
infoCache infoschemaMinTS,
) (*InfoSyncer, error) {
if pdHTTPCli != nil {
pdHTTPCli = pdHTTPCli.
Expand All @@ -224,6 +230,7 @@ func GlobalInfoSyncerInit(
info: getServerInfo(id, serverIDGetter),
serverInfoPath: fmt.Sprintf("%s/%s", ServerInformationPath, id),
minStartTSPath: fmt.Sprintf("%s/%s", ServerMinStartTSPath, id),
infoCache: infoCache,
}
err := is.init(ctx, skipRegisterToDashBoard)
if err != nil {
Expand Down Expand Up @@ -801,6 +808,14 @@ func (is *InfoSyncer) ReportMinStartTS(store kv.Storage) {
}
}

if is.infoCache != nil {
schemaTS := is.infoCache.GetAndResetRecentInfoSchemaTS(currentVer.Ver)
logutil.BgLogger().Debug("ReportMinStartTS", zap.Uint64("InfoSchema Recent StartTS", schemaTS))
if schemaTS > startTSLowerLimit && schemaTS < minStartTS {
minStartTS = schemaTS
}
}

is.minStartTS = kv.GetMinInnerTxnStartTS(now, startTSLowerLimit, minStartTS)

err = is.storeMinStartTS(context.Background())
Expand Down
6 changes: 3 additions & 3 deletions pkg/domain/infosync/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestTopology(t *testing.T) {
require.NoError(t, err)
}()

info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, client, nil, nil, keyspace.CodecV1, false)
info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, client, nil, nil, keyspace.CodecV1, false, nil)
require.NoError(t, err)

err = info.newTopologySessionAndStoreServerInfo(ctx, util2.NewSessionDefaultRetryCnt)
Expand Down Expand Up @@ -156,7 +156,7 @@ func (is *InfoSyncer) ttlKeyExists(ctx context.Context) (bool, error) {
}

func TestPutBundlesRetry(t *testing.T) {
_, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, false)
_, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, false, nil)
require.NoError(t, err)

bundle, err := placement.NewBundleFromOptions(&model.PlacementSettings{PrimaryRegion: "r1", Regions: "r1,r2"})
Expand Down Expand Up @@ -220,7 +220,7 @@ func TestPutBundlesRetry(t *testing.T) {

func TestTiFlashManager(t *testing.T) {
ctx := context.Background()
_, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, false)
_, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, false, nil)
tiflash := NewMockTiFlash()
SetMockTiFlash(tiflash)

Expand Down
8 changes: 8 additions & 0 deletions pkg/infoschema/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package infoschema
import (
"sort"
"sync"
"sync/atomic"

infoschema_metrics "github.com/pingcap/tidb/pkg/infoschema/metrics"
"github.com/pingcap/tidb/pkg/meta/autoid"
Expand Down Expand Up @@ -59,6 +60,13 @@ func NewCache(r autoid.Requirement, capacity int) *InfoCache {
}
}

// GetAndResetRecentInfoSchemaTS provides the min start ts for infosync.InfoSyncer.
func (h *InfoCache) GetAndResetRecentInfoSchemaTS(now uint64) uint64 {
ret := atomic.LoadUint64(&h.Data.recentMinTS)
atomic.StoreUint64(&h.Data.recentMinTS, now)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use atomic.Uint64 instead?

Copy link
Contributor

@MyonKeminta MyonKeminta Dec 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that an infoschemaV2 is accessed at a lower frequency than ReportMinStartTS runs, so that a tick of ReportMinStartTS may miss this infoschemaV2 instance, which allows GC to proceed, and then the infoschemaV2 instance becomes unavailable again?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory, that is possible, so there is still potential risk. I have discussed that with @lcwangchao
But in practice I think this fix is good enough for the most common cases. @MyonKeminta

Copy link
Contributor

@MyonKeminta MyonKeminta Dec 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. But I suggest these details can be noted in the comments in the code.
Rest LGTM.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return ret
}

// ReSize re-sizes the cache.
func (h *InfoCache) ReSize(capacity int) {
h.mu.Lock()
Expand Down
24 changes: 23 additions & 1 deletion pkg/infoschema/infoschema_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"math"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ngaut/pools"
Expand Down Expand Up @@ -122,6 +123,9 @@ type Data struct {
// TTLInfo, TiFlashReplica
// PlacementPolicyRef, Partition might be added later, and also ForeignKeys, TableLock etc
tableInfoResident *btree.BTreeG[tableInfoItem]

// the minimum ts of the recently used infoschema
recentMinTS uint64
}

type tableInfoItem struct {
Expand Down Expand Up @@ -608,6 +612,7 @@ func (is *infoschemaV2) TableByID(ctx context.Context, id int64) (val table.Tabl
return
}

is.keepAlive()
itm, ok := is.searchTableItemByID(id)
if !ok {
return nil, false
Expand Down Expand Up @@ -769,8 +774,8 @@ func (is *infoschemaV2) TableByName(ctx context.Context, schema, tbl pmodel.CISt
return nil, ErrTableNotExists.FastGenByArgs(schema, tbl)
}

is.keepAlive()
start := time.Now()

var h tableByNameHelper
h.end = tableItem{dbName: schema, tableName: tbl, schemaVersion: math.MaxInt64}
h.schemaVersion = is.infoSchema.schemaMetaVersion
Expand Down Expand Up @@ -819,6 +824,22 @@ func (is *infoschemaV2) TableInfoByID(id int64) (*model.TableInfo, bool) {
return getTableInfo(tbl), ok
}

// keepAlive prevents the "GC life time is shorter than transaction duration" error on infoschema v2.
// Each infoschema v2 API call that invokes it folds this instance's ts into
// Data.recentMinTS, so the field tracks the minimum ts seen since the last reset.
// That minimum is later handed to infosync.InfoSyncer through
// InfoCache.GetAndResetRecentInfoSchemaTS.
func (is *infoschemaV2) keepAlive() {
	slot := &is.Data.recentMinTS
	// Lower the shared value to is.ts with a CAS loop: stop as soon as the stored
	// value is already <= is.ts, or once our swap wins; retry when another
	// goroutine changed the value between the load and the swap.
	for cur := atomic.LoadUint64(slot); cur > is.ts; cur = atomic.LoadUint64(slot) {
		if atomic.CompareAndSwapUint64(slot, cur, is.ts) {
			return
		}
	}
}

// SchemaTableInfos implements MetaOnlyInfoSchema.
func (is *infoschemaV2) SchemaTableInfos(ctx context.Context, schema pmodel.CIStr) ([]*model.TableInfo, error) {
if IsSpecialDB(schema.L) {
Expand All @@ -834,6 +855,7 @@ func (is *infoschemaV2) SchemaTableInfos(ctx context.Context, schema pmodel.CISt
return nil, nil // something wrong?
}

is.keepAlive()
retry:
dbInfo, ok := is.SchemaByName(schema)
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion pkg/infoschema/test/infoschemav2test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"v2_test.go",
],
flaky = True,
shard_count = 12,
shard_count = 13,
deps = [
"//pkg/domain",
"//pkg/domain/infosync",
Expand Down
52 changes: 52 additions & 0 deletions pkg/infoschema/test/infoschemav2test/v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package infoschemav2test
import (
"context"
"fmt"
"math"
"slices"
"sort"
"strconv"
Expand Down Expand Up @@ -558,3 +559,54 @@ func TestInfoSchemaCachedAutoIncrement(t *testing.T) {
tk.MustExec("drop table t1;") // trigger infoschema cache reload
tk.MustQuery(autoIncQuery).Check(testkit.Rows("0"))
}

// TestGetAndResetRecentInfoSchemaTS checks the values returned by
// InfoCache.GetAndResetRecentInfoSchemaTS across DDL statements, domain
// reloads, a regular query and a snapshot read. Every call passes
// math.MaxUint64 as the reset value, so each returned value reflects only
// what was recorded since the previous call.
func TestGetAndResetRecentInfoSchemaTS(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)

// For mocktikv, safe point is not initialized, we manually insert it for snapshot to use.
timeSafe := time.Now().Add(-48 * 60 * 60 * time.Second).Format("20060102-15:04:05 -0700 MST")
safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '')
ON DUPLICATE KEY
UPDATE variable_value = '%[1]s'`
tk.MustExec(fmt.Sprintf(safePointSQL, timeSafe))

tk.MustExec("use test")
infoCache := dom.InfoCache()
// Baseline read; it also resets the recorded value to math.MaxUint64.
schemaTS1 := infoCache.GetAndResetRecentInfoSchemaTS(math.MaxUint64)

// After some DDL changes
tk.MustExec("create table dummytbl (id int)")
schemaTS2 := infoCache.GetAndResetRecentInfoSchemaTS(math.MaxUint64)
require.LessOrEqual(t, schemaTS1, schemaTS2)

// Capture a timestamp after the table exists but before the ALTER statements;
// it is used as the tidb_snapshot value at the end of the test.
ts, err := store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
require.NoError(t, err)

// The value reported after each DDL must not go backwards.
tk.MustExec("alter table dummytbl add column (c int)")
schemaTS3 := infoCache.GetAndResetRecentInfoSchemaTS(math.MaxUint64)
require.LessOrEqual(t, schemaTS2, schemaTS3)

tk.MustExec("alter table dummytbl add index idx(c)")
schemaTS4 := infoCache.GetAndResetRecentInfoSchemaTS(math.MaxUint64)
require.LessOrEqual(t, schemaTS3, schemaTS4)

// Reload several times
// Reload alone is expected to record nothing, so the call returns the
// math.MaxUint64 stored by the previous reset.
require.NoError(t, dom.Reload())
schemaTS5 := infoCache.GetAndResetRecentInfoSchemaTS(math.MaxUint64)
require.Equal(t, uint64(math.MaxUint64), schemaTS5)

require.NoError(t, dom.Reload())
schemaTS6 := infoCache.GetAndResetRecentInfoSchemaTS(math.MaxUint64)
require.Equal(t, uint64(math.MaxUint64), schemaTS6)

// After a regular query, the reported value must be greater than the one
// seen after the last DDL.
tk.MustQuery("select * from dummytbl").Check(testkit.Rows())
schemaTS7 := infoCache.GetAndResetRecentInfoSchemaTS(math.MaxUint64)
require.Less(t, schemaTS4, schemaTS7)

// Now snapshot read using old infoschema
// The reported value must be older than the one from the current-schema read
// above, but newer than the value seen right after the table was created.
tk.MustExec(fmt.Sprintf("set @@tidb_snapshot = %d", ts))
tk.MustQuery("select * from dummytbl").Check(testkit.Rows())
schemaTS8 := infoCache.GetAndResetRecentInfoSchemaTS(math.MaxUint64)
require.True(t, schemaTS8 < schemaTS7 && schemaTS8 > schemaTS2)
}
2 changes: 1 addition & 1 deletion pkg/server/stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestUptime(t *testing.T) {
}()
require.NoError(t, err)

_, err = infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true)
_, err = infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true, dom.InfoCache())
require.NoError(t, err)

tidbdrv := NewTiDBDriver(store)
Expand Down