Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keyspace: gc delete range #40639

Merged
merged 29 commits into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1711424
add keyspace drop table logic
ystaticy Jan 16, 2023
3752de0
add keyspace drop table logic
ystaticy Jan 16, 2023
9534706
fix condition of keyspace
ystaticy Jan 28, 2023
ede6068
Merge remote-tracking branch 'origin/master' into keyspace_gc
ystaticy Jan 28, 2023
7fac684
fix etcdcli in ServerInformationPath
ystaticy Jan 29, 2023
168a9b2
skip doGCPlacementRules when keyspace_name is set
ystaticy Jan 29, 2023
5c497c8
Merge branch 'master' into keyspace_gc
ystaticy Jan 30, 2023
216b196
remove empty line
ystaticy Jan 30, 2023
44ce5e2
Merge branch 'master' into keyspace_gc
ystaticy Jan 30, 2023
51169ba
Merge branch 'master' into keyspace_gc
ystaticy Jan 30, 2023
fa2e0d5
Merge branch 'master' into keyspace_gc
ystaticy Jan 31, 2023
e36f591
fix conditiosn
ystaticy Jan 31, 2023
ddd49a9
close etcdcli
ystaticy Jan 31, 2023
b254cfe
add comments
ystaticy Jan 31, 2023
bc2a66e
add comments
ystaticy Jan 31, 2023
a7742a3
Merge branch 'master' into keyspace_gc
ystaticy Jan 31, 2023
6de13aa
add comments
ystaticy Jan 31, 2023
5fb78da
Merge branch 'keyspace_gc' of github.com:ystaticy/tidb into keyspace_gc
ystaticy Jan 31, 2023
f54b9c9
Merge branch 'master' into keyspace_gc
ystaticy Feb 1, 2023
f230556
Merge branch 'master' into keyspace_gc
ystaticy Feb 1, 2023
9ffe54f
add func to print log when gc safepoint is too early
ystaticy Feb 1, 2023
8702daa
Merge branch 'keyspace_gc' of github.com:ystaticy/tidb into keyspace_gc
ystaticy Feb 1, 2023
5d32161
add uuid
ystaticy Feb 2, 2023
790651a
add comments
ystaticy Feb 2, 2023
29fd722
fix grammar
ystaticy Feb 2, 2023
1bf9285
fix grammar
ystaticy Feb 2, 2023
0c1cb16
add comnments
ystaticy Feb 2, 2023
af0df85
fix comnments
ystaticy Feb 2, 2023
7cbdbc6
Merge branch 'master' into keyspace_gc
ti-chi-bot Feb 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions ddl/attributes_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ PARTITION BY RANGE (c) (
func TestFlashbackTable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -327,7 +327,7 @@ PARTITION BY RANGE (c) (
func TestDropTable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -380,7 +380,7 @@ PARTITION BY RANGE (c) (
func TestCreateWithSameName(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -444,7 +444,7 @@ PARTITION BY RANGE (c) (
func TestPartition(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -504,7 +504,7 @@ PARTITION BY RANGE (c) (
func TestDropSchema(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand All @@ -530,7 +530,7 @@ PARTITION BY RANGE (c) (
func TestDefaultKeyword(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down
2 changes: 1 addition & 1 deletion ddl/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestMain(m *testing.M) {
conf.Experimental.AllowsExpressionIndex = true
})

_, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, true)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "ddl: infosync.GlobalInfoSyncerInit: %v\n", err)
os.Exit(1)
Expand Down
4 changes: 2 additions & 2 deletions domain/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestNormalSessionPool(t *testing.T) {
domain, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domain.Close()
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, true)
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, true)
require.NoError(t, err1)
conf := config.GetGlobalConfig()
conf.Socket = ""
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestAbnormalSessionPool(t *testing.T) {
domain, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domain.Close()
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, true)
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, true)
require.NoError(t, err1)
conf := config.GetGlobalConfig()
conf.Socket = ""
Expand Down
87 changes: 53 additions & 34 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,20 +98,26 @@ func NewMockDomain() *Domain {
// Domain represents a storage space. Different domains can use the same database name.
// Multiple domains can be used in parallel without synchronization.
type Domain struct {
store kv.Storage
infoCache *infoschema.InfoCache
privHandle *privileges.Handle
bindHandle atomic.Pointer[bindinfo.BindHandle]
statsHandle unsafe.Pointer
statsLease time.Duration
ddl ddl.DDL
info *infosync.InfoSyncer
globalCfgSyncer *globalconfigsync.GlobalConfigSyncer
m sync.Mutex
SchemaValidator SchemaValidator
sysSessionPool *sessionPool
exit chan struct{}
etcdClient *clientv3.Client
store kv.Storage
infoCache *infoschema.InfoCache
privHandle *privileges.Handle
bindHandle atomic.Pointer[bindinfo.BindHandle]
statsHandle unsafe.Pointer
statsLease time.Duration
ddl ddl.DDL
info *infosync.InfoSyncer
globalCfgSyncer *globalconfigsync.GlobalConfigSyncer
m sync.Mutex
SchemaValidator SchemaValidator
sysSessionPool *sessionPool
exit chan struct{}
// `etcdClient` must be used when keyspace is not set, or when the logic to each etcd path needs to be separated by keyspace.
etcdClient *clientv3.Client
// `unprefixedEtcdCli` will never set etcd namespace prefix by keyspace.
// It's only used in storeMinStartTS and RemoveMinStartTS now.
// It's must be used when the etcd path isn't need to separate by keyspaces.
// See: https://github.com/pingcap/tidb/pull/39685
unprefixedEtcdCli *clientv3.Client
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are some grammar mistakes here in the comments.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I fixed the grammar mistakes. PTAL~

sysVarCache sysVarCache // replaces GlobalVariableCache
slowQuery *topNSlowQueries
expensiveQueryHandle *expensivequery.Handle
Expand Down Expand Up @@ -881,6 +887,10 @@ func (do *Domain) Close() {
terror.Log(errors.Trace(do.etcdClient.Close()))
}

if do.unprefixedEtcdCli != nil {
terror.Log(errors.Trace(do.unprefixedEtcdCli.Close()))
}

do.slowQuery.Close()
if do.cancel != nil {
do.cancel()
Expand Down Expand Up @@ -927,6 +937,27 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio

const serverIDForStandalone = 1 // serverID for standalone deployment.

func newEtcdCli(addrs []string, ebd kv.EtcdBackend) (*clientv3.Client, error) {
cfg := config.GetGlobalConfig()
etcdLogCfg := zap.NewProductionConfig()
etcdLogCfg.Level = zap.NewAtomicLevelAt(zap.ErrorLevel)
cli, err := clientv3.New(clientv3.Config{
LogConfig: &etcdLogCfg,
Endpoints: addrs,
AutoSyncInterval: 30 * time.Second,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithBackoffMaxDelay(time.Second * 3),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Duration(cfg.TiKVClient.GrpcKeepAliveTime) * time.Second,
Timeout: time.Duration(cfg.TiKVClient.GrpcKeepAliveTimeout) * time.Second,
}),
},
TLS: ebd.TLSConfig(),
})
return cli, err
}

// Init initializes a domain.
func (do *Domain) Init(
ddlLease time.Duration,
Expand All @@ -942,32 +973,20 @@ func (do *Domain) Init(
return err
}
if addrs != nil {
cfg := config.GetGlobalConfig()
// silence etcd warn log, when domain closed, it won't randomly print warn log
// see details at the issue https://github.com/pingcap/tidb/issues/15479
etcdLogCfg := zap.NewProductionConfig()
etcdLogCfg.Level = zap.NewAtomicLevelAt(zap.ErrorLevel)
cli, err := clientv3.New(clientv3.Config{
LogConfig: &etcdLogCfg,
Endpoints: addrs,
AutoSyncInterval: 30 * time.Second,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithBackoffMaxDelay(time.Second * 3),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Duration(cfg.TiKVClient.GrpcKeepAliveTime) * time.Second,
Timeout: time.Duration(cfg.TiKVClient.GrpcKeepAliveTimeout) * time.Second,
}),
},
TLS: ebd.TLSConfig(),
})
cli, err := newEtcdCli(addrs, ebd)
if err != nil {
return errors.Trace(err)
}

etcd.SetEtcdCliByNamespace(cli, keyspace.MakeKeyspaceEtcdNamespace(do.store.GetCodec()))

do.etcdClient = cli

unprefixedEtcdCli, err := newEtcdCli(addrs, ebd)
if err != nil {
return errors.Trace(err)
}
do.unprefixedEtcdCli = unprefixedEtcdCli
}
}

Expand Down Expand Up @@ -1029,7 +1048,7 @@ func (do *Domain) Init(

// step 1: prepare the info/schema syncer which domain reload needed.
skipRegisterToDashboard := config.GetGlobalConfig().SkipRegisterToDashboard
do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, do.etcdClient, skipRegisterToDashboard)
do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, do.etcdClient, do.unprefixedEtcdCli, skipRegisterToDashboard)
if err != nil {
return err
}
Expand Down
32 changes: 17 additions & 15 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,13 @@ var ErrPrometheusAddrIsNotSet = dbterror.ClassDomain.NewStd(errno.ErrPrometheusA

// InfoSyncer stores server info to etcd when the tidb-server starts and delete when tidb-server shuts down.
type InfoSyncer struct {
etcdCli *clientv3.Client
info *ServerInfo
serverInfoPath string
minStartTS uint64
minStartTSPath string
managerMu struct {
etcdCli *clientv3.Client
unprefixedEtcdCli *clientv3.Client
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better also add comments for it like in Domain.

Copy link
Contributor Author

@ystaticy ystaticy Feb 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added new comments here.

info *ServerInfo
serverInfoPath string
minStartTS uint64
minStartTSPath string
managerMu struct {
mu sync.RWMutex
util2.SessionManager
}
Expand Down Expand Up @@ -180,12 +181,13 @@ func setGlobalInfoSyncer(is *InfoSyncer) {
}

// GlobalInfoSyncerInit return a new InfoSyncer. It is exported for testing.
func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func() uint64, etcdCli *clientv3.Client, skipRegisterToDashBoard bool) (*InfoSyncer, error) {
func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func() uint64, etcdCli *clientv3.Client, unprefixedEtcdCli *clientv3.Client, skipRegisterToDashBoard bool) (*InfoSyncer, error) {
is := &InfoSyncer{
etcdCli: etcdCli,
info: getServerInfo(id, serverIDGetter),
serverInfoPath: fmt.Sprintf("%s/%s", ServerInformationPath, id),
minStartTSPath: fmt.Sprintf("%s/%s", ServerMinStartTSPath, id),
etcdCli: etcdCli,
unprefixedEtcdCli: unprefixedEtcdCli,
info: getServerInfo(id, serverIDGetter),
serverInfoPath: fmt.Sprintf("%s/%s", ServerInformationPath, id),
minStartTSPath: fmt.Sprintf("%s/%s", ServerMinStartTSPath, id),
}
err := is.init(ctx, skipRegisterToDashBoard)
if err != nil {
Expand Down Expand Up @@ -721,20 +723,20 @@ func (is *InfoSyncer) GetMinStartTS() uint64 {

// storeMinStartTS stores self server min start timestamp to etcd.
func (is *InfoSyncer) storeMinStartTS(ctx context.Context) error {
if is.etcdCli == nil {
if is.unprefixedEtcdCli == nil {
return nil
}
return util.PutKVToEtcd(ctx, is.etcdCli, keyOpDefaultRetryCnt, is.minStartTSPath,
return util.PutKVToEtcd(ctx, is.unprefixedEtcdCli, keyOpDefaultRetryCnt, is.minStartTSPath,
strconv.FormatUint(is.minStartTS, 10),
clientv3.WithLease(is.session.Lease()))
}

// RemoveMinStartTS removes self server min start timestamp from etcd.
func (is *InfoSyncer) RemoveMinStartTS() {
if is.etcdCli == nil {
if is.unprefixedEtcdCli == nil {
return
}
err := util.DeleteKeyFromEtcd(is.minStartTSPath, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout)
err := util.DeleteKeyFromEtcd(is.minStartTSPath, is.unprefixedEtcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout)
if err != nil {
logutil.BgLogger().Error("remove minStartTS failed", zap.Error(err))
}
Expand Down
6 changes: 3 additions & 3 deletions domain/infosync/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestTopology(t *testing.T) {
require.NoError(t, err)
}()

info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, false)
info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, client, false)
require.NoError(t, err)

err = info.newTopologySessionAndStoreServerInfo(ctx, util2.NewSessionDefaultRetryCnt)
Expand Down Expand Up @@ -152,7 +152,7 @@ func (is *InfoSyncer) ttlKeyExists(ctx context.Context) (bool, error) {
}

func TestPutBundlesRetry(t *testing.T) {
_, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, false)
_, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, nil, false)
require.NoError(t, err)

bundle, err := placement.NewBundleFromOptions(&model.PlacementSettings{PrimaryRegion: "r1", Regions: "r1,r2"})
Expand Down Expand Up @@ -216,7 +216,7 @@ func TestPutBundlesRetry(t *testing.T) {

func TestTiFlashManager(t *testing.T) {
ctx := context.Background()
_, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, false)
_, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, nil, false)
tiflash := NewMockTiFlash()
SetMockTiFlash(tiflash)

Expand Down
2 changes: 1 addition & 1 deletion server/stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestUptime(t *testing.T) {
}()
require.NoError(t, err)

_, err = infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
_, err = infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
require.NoError(t, err)

tidbdrv := NewTiDBDriver(store)
Expand Down
4 changes: 0 additions & 4 deletions store/driver/tikv_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,6 @@ func (d TiKVDriver) OpenWithOptions(path string, options ...Option) (kv.Storage,
if err != nil {
return nil, errors.Trace(err)
}
// If there's setting keyspace-name, then skipped GC worker logic.
// It needs a group of special tidb nodes to execute GC worker logic.
// TODO: remove this restriction while merged keyspace GC worker logic.
disableGC = true
}

codec := pdClient.GetCodec()
Expand Down
Loading