Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keyspace: add keyspace etcd namespace UT #58081

Merged
merged 4 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,9 @@ func (d *ddl) close() {
// when run with real-tikv, the lifecycle of ownerManager is managed by globalOwnerManager,
// when run with uni-store BreakCampaignLoop is same as Close.
// hope we can unify it after refactor to let some components only start once.
d.ownerManager.BreakCampaignLoop()
if d.ownerManager != nil {
d.ownerManager.BreakCampaignLoop()
}
d.schemaVerSyncer.Close()

// d.delRangeMgr using sessions from d.sessPool.
Expand Down
11 changes: 11 additions & 0 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,11 @@ func (do *Domain) EtcdClient() *clientv3.Client {
return do.etcdClient
}

// UnprefixedEtcdCli export for test.
func (do *Domain) UnprefixedEtcdCli() *clientv3.Client {
return do.unprefixedEtcdCli
}

// loadInfoSchema loads infoschema at startTS.
// It returns:
// 1. the needed infoschema
Expand Down Expand Up @@ -1280,6 +1285,11 @@ const resourceIdleTimeout = 3 * time.Minute // resources in the ResourcePool wil

// NewDomain creates a new domain. Should not create multiple domains for the same store.
func NewDomain(store kv.Storage, schemaLease time.Duration, statsLease time.Duration, dumpFileGcLease time.Duration, factory pools.Factory) *Domain {
return NewDomainWithEtcdClient(store, schemaLease, statsLease, dumpFileGcLease, factory, nil)
}

// NewDomainWithEtcdClient creates a new domain with etcd client. Should not create multiple domains for the same store.
func NewDomainWithEtcdClient(store kv.Storage, schemaLease time.Duration, statsLease time.Duration, dumpFileGcLease time.Duration, factory pools.Factory, etcdClient *clientv3.Client) *Domain {
intest.Assert(schemaLease > 0, "schema lease should be a positive duration")
capacity := 200 // capacity of the sysSessionPool size
do := &Domain{
Expand Down Expand Up @@ -1324,6 +1334,7 @@ func NewDomain(store kv.Storage, schemaLease time.Duration, statsLease time.Dura
do.sysProcesses = SysProcesses{mu: &sync.RWMutex{}, procMap: make(map[uint64]sysproctrack.TrackProc)}
do.initDomainSysVars()
do.expiredTimeStamp4PC.expiredTimeStamp = types.NewTime(types.ZeroCoreTime, mysql.TypeTimestamp, types.DefaultFsp)
do.etcdClient = etcdClient
return do
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/session/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ go_library(
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//util",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_client_v3//concurrency",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
Expand Down Expand Up @@ -161,6 +162,7 @@ go_test(
"//pkg/executor",
"//pkg/expression",
"//pkg/expression/sessionexpr",
"//pkg/keyspace",
"//pkg/kv",
"//pkg/meta",
"//pkg/parser",
Expand All @@ -181,9 +183,11 @@ go_test(
"//pkg/util/logutil",
"//pkg/util/sqlexec",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/keyspacepb",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
"@io_etcd_go_etcd_tests_v3//integration",
"@org_uber_go_atomic//:atomic", #keep
"@org_uber_go_goleak//:goleak",
"@org_uber_go_zap//:zap",
Expand Down
90 changes: 90 additions & 0 deletions pkg/session/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@ package session

import (
"context"
"crypto/tls"
"fmt"
"sort"
"strings"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/bindinfo"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/expression/sessionexpr"
"github.com/pingcap/tidb/pkg/keyspace"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/auth"
Expand All @@ -38,6 +42,7 @@ import (
"github.com/pingcap/tidb/pkg/store/mockstore"
"github.com/pingcap/tidb/pkg/table/tblsession"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/tests/v3/integration"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
Expand Down Expand Up @@ -2524,3 +2529,88 @@ func TestIndexJoinMultiPatternByUpgrade650To840(t *testing.T) {
require.Equal(t, 1, row.Len())
require.Equal(t, int64(0), row.GetInt64(0))
}

func TestKeyspaceEtcdNamespace(t *testing.T) {
keyspaceMeta := keyspacepb.KeyspaceMeta{}
keyspaceMeta.Id = 2
keyspaceMeta.Name = "test_ks_name2"
makeStore(t, &keyspaceMeta, true)
}

func TestNullKeyspaceEtcdNamespace(t *testing.T) {
makeStore(t, nil, false)
}

func makeStore(t *testing.T, keyspaceMeta *keyspacepb.KeyspaceMeta, isHasPrefix bool) {
integration.BeforeTestExternal(t)
var store kv.Storage
var err error
if keyspaceMeta != nil {
store, err = mockstore.NewMockStore(
mockstore.WithKeyspaceMeta(keyspaceMeta),
mockstore.WithStoreType(mockstore.EmbedUnistore),
)
} else {
store, err = mockstore.NewMockStore(mockstore.WithStoreType(mockstore.EmbedUnistore))
}
require.NoError(t, err)
defer func() {
require.NoError(t, store.Close())
}()

cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)

// Build a mockEtcdBackend.
mockStore := &mockEtcdBackend{
Storage: store,
pdAddrs: []string{cluster.Members[0].GRPCURL()}}
etcdClient := cluster.RandClient()

require.NoError(t, err)
dom, err := domap.getWithEtcdClient(mockStore, etcdClient)
require.NoError(t, err)
defer dom.Close()

checkETCDNameSpace(t, dom, isHasPrefix)
}

func checkETCDNameSpace(t *testing.T, dom *domain.Domain, isHasPrefix bool) {
namespacePrefix := keyspace.MakeKeyspaceEtcdNamespace(dom.Store().GetCodec())
testKeyWithoutPrefix := "/testkey"
testVal := "test"
var expectTestKey string
if isHasPrefix {
expectTestKey = namespacePrefix + testKeyWithoutPrefix
} else {
expectTestKey = testKeyWithoutPrefix
}

// Put key value into etcd.
_, err := dom.EtcdClient().Put(context.Background(), testKeyWithoutPrefix, testVal)
require.NoError(t, err)

// Use expectTestKey to get the key from etcd.
getResp, err := dom.UnprefixedEtcdCli().Get(context.Background(), expectTestKey)
require.NoError(t, err)
require.Equal(t, len(getResp.Kvs), 1)

if isHasPrefix {
getResp, err = dom.UnprefixedEtcdCli().Get(context.Background(), testKeyWithoutPrefix)
require.NoError(t, err)
require.Equal(t, 0, len(getResp.Kvs))
}
}

type mockEtcdBackend struct {
kv.Storage
pdAddrs []string
}

func (mebd *mockEtcdBackend) EtcdAddrs() ([]string, error) {
return mebd.pdAddrs, nil
}

func (mebd *mockEtcdBackend) TLSConfig() *tls.Config { return nil }

func (mebd *mockEtcdBackend) StartGCWorker() error { return nil }
7 changes: 6 additions & 1 deletion pkg/session/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/pingcap/tidb/pkg/util/syncutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

Expand All @@ -62,6 +63,10 @@ type domainMap struct {
// TODO decouple domain create from it, it's more clear to create domain explicitly
// before any usage of it.
func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) {
return dm.getWithEtcdClient(store, nil)
}

func (dm *domainMap) getWithEtcdClient(store kv.Storage, etcdClient *clientv3.Client) (d *domain.Domain, err error) {
dm.mu.Lock()
defer dm.mu.Unlock()

Expand Down Expand Up @@ -90,7 +95,7 @@ func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) {
zap.Stringer("stats lease", statisticLease))
factory := createSessionFunc(store)
sysFactory := createSessionWithDomainFunc(store)
d = domain.NewDomain(store, ddlLease, statisticLease, planReplayerGCLease, factory)
d = domain.NewDomainWithEtcdClient(store, ddlLease, statisticLease, planReplayerGCLease, factory, etcdClient)

var ddlInjector func(ddl.DDL, ddl.Executor, *infoschema.InfoCache) *schematracker.Checker
if injector, ok := store.(schematracker.StorageDDLInjector); ok {
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/mockstore/mockcopr/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestResolvedLargeTxnLocks(t *testing.T) {
tikvStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
require.NoError(t, err)

store, err := mockstorage.NewMockStorage(tikvStore)
store, err := mockstorage.NewMockStorage(tikvStore, nil)
require.NoError(t, err)
defer func() {
require.NoError(t, store.Close())
Expand Down
9 changes: 5 additions & 4 deletions pkg/store/mockstore/mockstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,17 @@ type mockStorage struct {
}

// NewMockStorage wraps tikv.KVStore as kv.Storage.
func NewMockStorage(tikvStore *tikv.KVStore) (kv.Storage, error) {
func NewMockStorage(tikvStore *tikv.KVStore, keyspaceMeta *keyspacepb.KeyspaceMeta) (kv.Storage, error) {
coprConfig := config.DefaultConfig().TiKVClient.CoprCache
coprStore, err := copr.NewStore(tikvStore, &coprConfig)
if err != nil {
return nil, err
}
return &mockStorage{
KVStore: tikvStore,
Store: coprStore,
memCache: kv.NewCacheDB(),
KVStore: tikvStore,
Store: coprStore,
memCache: kv.NewCacheDB(),
keyspaceMeta: keyspaceMeta,
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/store/mockstore/tikv.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ func newMockTikvStore(opt *mockOptions) (kv.Storage, error) {
if err != nil {
return nil, err
}
return mockstorage.NewMockStorage(kvstore)
return mockstorage.NewMockStorage(kvstore, opt.keyspaceMeta)
}
2 changes: 1 addition & 1 deletion pkg/store/mockstore/unistore.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,5 @@ func newUnistore(opts *mockOptions) (kv.Storage, error) {
}
}

return mockstorage.NewMockStorage(kvstore)
return mockstorage.NewMockStorage(kvstore, opts.keyspaceMeta)
}