diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index 2b57cde0828c2..912eaa271e4a5 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -1010,7 +1010,9 @@ func (d *ddl) close() { // when run with real-tikv, the lifecycle of ownerManager is managed by globalOwnerManager, // when run with uni-store BreakCampaignLoop is same as Close. // hope we can unify it after refactor to let some components only start once. - d.ownerManager.BreakCampaignLoop() + if d.ownerManager != nil { + d.ownerManager.BreakCampaignLoop() + } d.schemaVerSyncer.Close() // d.delRangeMgr using sessions from d.sessPool. diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index 4b14d1ae8b17a..66a65f49b6a4b 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -275,6 +275,11 @@ func (do *Domain) EtcdClient() *clientv3.Client { return do.etcdClient } +// UnprefixedEtcdCli export for test. +func (do *Domain) UnprefixedEtcdCli() *clientv3.Client { + return do.unprefixedEtcdCli +} + // loadInfoSchema loads infoschema at startTS. // It returns: // 1. the needed infoschema @@ -1280,6 +1285,11 @@ const resourceIdleTimeout = 3 * time.Minute // resources in the ResourcePool wil // NewDomain creates a new domain. Should not create multiple domains for the same store. func NewDomain(store kv.Storage, schemaLease time.Duration, statsLease time.Duration, dumpFileGcLease time.Duration, factory pools.Factory) *Domain { + return NewDomainWithEtcdClient(store, schemaLease, statsLease, dumpFileGcLease, factory, nil) +} + +// NewDomainWithEtcdClient creates a new domain with etcd client. Should not create multiple domains for the same store. +func NewDomainWithEtcdClient(store kv.Storage, schemaLease time.Duration, statsLease time.Duration, dumpFileGcLease time.Duration, factory pools.Factory, etcdClient *clientv3.Client) *Domain { intest.Assert(schemaLease > 0, "schema lease should be a positive duration") capacity := 200 // capacity of the sysSessionPool size do := &Domain{ @@ -1324,6 +1334,7 @@ func NewDomain(store kv.Storage, schemaLease time.Duration, statsLease time.Dura do.sysProcesses = SysProcesses{mu: &sync.RWMutex{}, procMap: make(map[uint64]sysproctrack.TrackProc)} do.initDomainSysVars() do.expiredTimeStamp4PC.expiredTimeStamp = types.NewTime(types.ZeroCoreTime, mysql.TypeTimestamp, types.DefaultFsp) + do.etcdClient = etcdClient return do } diff --git a/pkg/session/BUILD.bazel b/pkg/session/BUILD.bazel index cb05016d148c1..f2da24851b1d8 100644 --- a/pkg/session/BUILD.bazel +++ b/pkg/session/BUILD.bazel @@ -131,6 +131,7 @@ go_library( "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_client_go_v2//util", + "@io_etcd_go_etcd_client_v3//:client", "@io_etcd_go_etcd_client_v3//concurrency", "@org_uber_go_atomic//:atomic", "@org_uber_go_zap//:zap", @@ -161,6 +162,7 @@ go_test( "//pkg/executor", "//pkg/expression", "//pkg/expression/sessionexpr", + "//pkg/keyspace", "//pkg/kv", "//pkg/meta", "//pkg/parser", @@ -181,9 +183,11 @@ go_test( "//pkg/util/logutil", "//pkg/util/sqlexec", "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_kvproto//pkg/keyspacepb", "@com_github_pingcap_log//:log", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//tikv", + "@io_etcd_go_etcd_tests_v3//integration", "@org_uber_go_atomic//:atomic", #keep "@org_uber_go_goleak//:goleak", "@org_uber_go_zap//:zap", diff --git a/pkg/session/bootstrap_test.go b/pkg/session/bootstrap_test.go index d4d4f75d4f882..9ff0d449be6ab 100644 --- a/pkg/session/bootstrap_test.go +++ b/pkg/session/bootstrap_test.go @@ -16,6 +16,7 @@ package session import ( "context" + "crypto/tls" "fmt" "sort" "strings" @@ -23,11 +24,14 @@ import ( "time" "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/pingcap/log" "github.com/pingcap/tidb/pkg/bindinfo" "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/expression/sessionexpr" + "github.com/pingcap/tidb/pkg/keyspace" + "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/auth" @@ -38,6 +42,7 @@ import ( "github.com/pingcap/tidb/pkg/store/mockstore" "github.com/pingcap/tidb/pkg/table/tblsession" "github.com/stretchr/testify/require" + "go.etcd.io/etcd/tests/v3/integration" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -2524,3 +2529,88 @@ func TestIndexJoinMultiPatternByUpgrade650To840(t *testing.T) { require.Equal(t, 1, row.Len()) require.Equal(t, int64(0), row.GetInt64(0)) } + +func TestKeyspaceEtcdNamespace(t *testing.T) { + keyspaceMeta := keyspacepb.KeyspaceMeta{} + keyspaceMeta.Id = 2 + keyspaceMeta.Name = "test_ks_name2" + makeStore(t, &keyspaceMeta, true) +} + +func TestNullKeyspaceEtcdNamespace(t *testing.T) { + makeStore(t, nil, false) +} + +func makeStore(t *testing.T, keyspaceMeta *keyspacepb.KeyspaceMeta, isHasPrefix bool) { + integration.BeforeTestExternal(t) + var store kv.Storage + var err error + if keyspaceMeta != nil { + store, err = mockstore.NewMockStore( + mockstore.WithKeyspaceMeta(keyspaceMeta), + mockstore.WithStoreType(mockstore.EmbedUnistore), + ) + } else { + store, err = mockstore.NewMockStore(mockstore.WithStoreType(mockstore.EmbedUnistore)) + } + require.NoError(t, err) + defer func() { + require.NoError(t, store.Close()) + }() + + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + + // Build a mockEtcdBackend. + mockStore := &mockEtcdBackend{ + Storage: store, + pdAddrs: []string{cluster.Members[0].GRPCURL()}} + etcdClient := cluster.RandClient() + + require.NoError(t, err) + dom, err := domap.getWithEtcdClient(mockStore, etcdClient) + require.NoError(t, err) + defer dom.Close() + + checkETCDNameSpace(t, dom, isHasPrefix) +} + +func checkETCDNameSpace(t *testing.T, dom *domain.Domain, isHasPrefix bool) { + namespacePrefix := keyspace.MakeKeyspaceEtcdNamespace(dom.Store().GetCodec()) + testKeyWithoutPrefix := "/testkey" + testVal := "test" + var expectTestKey string + if isHasPrefix { + expectTestKey = namespacePrefix + testKeyWithoutPrefix + } else { + expectTestKey = testKeyWithoutPrefix + } + + // Put key value into etcd. + _, err := dom.EtcdClient().Put(context.Background(), testKeyWithoutPrefix, testVal) + require.NoError(t, err) + + // Use expectTestKey to get the key from etcd. + getResp, err := dom.UnprefixedEtcdCli().Get(context.Background(), expectTestKey) + require.NoError(t, err) + require.Equal(t, len(getResp.Kvs), 1) + + if isHasPrefix { + getResp, err = dom.UnprefixedEtcdCli().Get(context.Background(), testKeyWithoutPrefix) + require.NoError(t, err) + require.Equal(t, 0, len(getResp.Kvs)) + } +} + +type mockEtcdBackend struct { + kv.Storage + pdAddrs []string +} + +func (mebd *mockEtcdBackend) EtcdAddrs() ([]string, error) { + return mebd.pdAddrs, nil +} + +func (mebd *mockEtcdBackend) TLSConfig() *tls.Config { return nil } + +func (mebd *mockEtcdBackend) StartGCWorker() error { return nil } diff --git a/pkg/session/tidb.go b/pkg/session/tidb.go index 5b095e3a76587..eb0c98a9fa638 100644 --- a/pkg/session/tidb.go +++ b/pkg/session/tidb.go @@ -47,6 +47,7 @@ import ( "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/sqlexec" "github.com/pingcap/tidb/pkg/util/syncutil" + clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -62,6 +63,10 @@ type domainMap struct { // TODO decouple domain create from it, it's more clear to create domain explicitly // before any usage of it. func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) { + return dm.getWithEtcdClient(store, nil) +} + +func (dm *domainMap) getWithEtcdClient(store kv.Storage, etcdClient *clientv3.Client) (d *domain.Domain, err error) { dm.mu.Lock() defer dm.mu.Unlock() @@ -90,7 +95,7 @@ func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) { zap.Stringer("stats lease", statisticLease)) factory := createSessionFunc(store) sysFactory := createSessionWithDomainFunc(store) - d = domain.NewDomain(store, ddlLease, statisticLease, planReplayerGCLease, factory) + d = domain.NewDomainWithEtcdClient(store, ddlLease, statisticLease, planReplayerGCLease, factory, etcdClient) var ddlInjector func(ddl.DDL, ddl.Executor, *infoschema.InfoCache) *schematracker.Checker if injector, ok := store.(schematracker.StorageDDLInjector); ok { diff --git a/pkg/store/mockstore/mockcopr/executor_test.go b/pkg/store/mockstore/mockcopr/executor_test.go index 75a9f4a5abff9..211e67d0fd024 100644 --- a/pkg/store/mockstore/mockcopr/executor_test.go +++ b/pkg/store/mockstore/mockcopr/executor_test.go @@ -52,7 +52,7 @@ func TestResolvedLargeTxnLocks(t *testing.T) { tikvStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) require.NoError(t, err) - store, err := mockstorage.NewMockStorage(tikvStore) + store, err := mockstorage.NewMockStorage(tikvStore, nil) require.NoError(t, err) defer func() { require.NoError(t, store.Close()) diff --git a/pkg/store/mockstore/mockstorage/storage.go b/pkg/store/mockstore/mockstorage/storage.go index dc20704493b80..ca70712d8e69c 100644 --- a/pkg/store/mockstore/mockstorage/storage.go +++ b/pkg/store/mockstore/mockstorage/storage.go @@ -43,16 +43,17 @@ type mockStorage struct { } // NewMockStorage wraps tikv.KVStore as kv.Storage. -func NewMockStorage(tikvStore *tikv.KVStore) (kv.Storage, error) { +func NewMockStorage(tikvStore *tikv.KVStore, keyspaceMeta *keyspacepb.KeyspaceMeta) (kv.Storage, error) { coprConfig := config.DefaultConfig().TiKVClient.CoprCache coprStore, err := copr.NewStore(tikvStore, &coprConfig) if err != nil { return nil, err } return &mockStorage{ - KVStore: tikvStore, - Store: coprStore, - memCache: kv.NewCacheDB(), + KVStore: tikvStore, + Store: coprStore, + memCache: kv.NewCacheDB(), + keyspaceMeta: keyspaceMeta, }, nil } diff --git a/pkg/store/mockstore/tikv.go b/pkg/store/mockstore/tikv.go index eb56eae38f0a4..ff8e5a12dd7fb 100644 --- a/pkg/store/mockstore/tikv.go +++ b/pkg/store/mockstore/tikv.go @@ -39,5 +39,5 @@ func newMockTikvStore(opt *mockOptions) (kv.Storage, error) { if err != nil { return nil, err } - return mockstorage.NewMockStorage(kvstore) + return mockstorage.NewMockStorage(kvstore, opt.keyspaceMeta) } diff --git a/pkg/store/mockstore/unistore.go b/pkg/store/mockstore/unistore.go index 2b4948fb2cdfb..bf68f4417b865 100644 --- a/pkg/store/mockstore/unistore.go +++ b/pkg/store/mockstore/unistore.go @@ -50,5 +50,5 @@ func newUnistore(opts *mockOptions) (kv.Storage, error) { } } - return mockstorage.NewMockStorage(kvstore) + return mockstorage.NewMockStorage(kvstore, opts.keyspaceMeta) }