Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: update cluster_id to a global variable #8615

Merged
merged 20 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (suite *keyspaceTestSuite) SetupTest() {
suite.ctx, suite.cancel = context.WithCancel(context.Background())
store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
allocator := mockid.NewIDAllocator()
kgm := NewKeyspaceGroupManager(suite.ctx, store, nil, 0)
kgm := NewKeyspaceGroupManager(suite.ctx, store, nil)
suite.manager = NewKeyspaceManager(suite.ctx, store, nil, allocator, &mockConfig{}, kgm)
re.NoError(kgm.Bootstrap(suite.ctx))
re.NoError(suite.manager.Bootstrap())
Expand Down
19 changes: 8 additions & 11 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,10 @@ const (

// GroupManager is the manager of keyspace group related data.
type GroupManager struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
client *clientv3.Client
clusterID uint64
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
client *clientv3.Client

syncutil.RWMutex
// groups is the cache of keyspace group related information.
Expand All @@ -86,7 +85,6 @@ func NewKeyspaceGroupManager(
ctx context.Context,
store endpoint.KeyspaceGroupStorage,
client *clientv3.Client,
clusterID uint64,
) *GroupManager {
ctx, cancel := context.WithCancel(ctx)
groups := make(map[endpoint.UserKind]*indexedHeap)
Expand All @@ -99,15 +97,14 @@ func NewKeyspaceGroupManager(
store: store,
groups: groups,
client: client,
clusterID: clusterID,
nodesBalancer: balancer.GenByPolicy[string](defaultBalancerPolicy),
serviceRegistryMap: make(map[string]string),
}

// If the etcd client is not nil, start the watch loop for the registered tso servers.
// The PD(TSO) Client relies on this info to discover tso servers.
if m.client != nil {
m.initTSONodesWatcher(m.client, m.clusterID)
m.initTSONodesWatcher(m.client)
m.tsoNodesWatcher.StartWatchLoop()
}
return m
Expand Down Expand Up @@ -218,8 +215,8 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
}
}

func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client, clusterID uint64) {
tsoServiceKey := discovery.TSOPath(clusterID)
func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client) {
tsoServiceKey := keypath.TSOPath()

putFn := func(kv *mvccpb.KeyValue) error {
s := &discovery.ServiceRegistryEntry{}
Expand Down Expand Up @@ -1154,7 +1151,7 @@ func (m *GroupManager) GetKeyspaceGroupPrimaryByID(id uint32) (string, error) {
return "", ErrKeyspaceGroupNotExists(id)
}

rootPath := keypath.TSOSvcRootPath(m.clusterID)
rootPath := keypath.TSOSvcRootPath()
primaryPath := keypath.KeyspaceGroupPrimaryPath(rootPath, id)
leader := &tsopb.Participant{}
ok, _, err := etcdutil.GetProtoMsgWithModRev(m.client, primaryPath, leader)
Expand Down
2 changes: 1 addition & 1 deletion pkg/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (suite *keyspaceGroupTestSuite) SetupTest() {
re := suite.Require()
suite.ctx, suite.cancel = context.WithCancel(context.Background())
store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
suite.kgm = NewKeyspaceGroupManager(suite.ctx, store, nil, 0)
suite.kgm = NewKeyspaceGroupManager(suite.ctx, store, nil)
idAllocator := mockid.NewIDAllocator()
cluster := mockcluster.NewCluster(suite.ctx, mockconfig.NewTestOptions())
suite.kg = NewKeyspaceManager(suite.ctx, store, cluster, idAllocator, &mockConfig{}, suite.kgm)
Expand Down
13 changes: 4 additions & 9 deletions pkg/mcs/discovery/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,20 @@
package discovery

import (
"strconv"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

// Discover is used to get all the service instances of the specified service name.
func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, error) {
key := ServicePath(clusterID, serviceName)
func Discover(cli *clientv3.Client, serviceName string) ([]string, error) {
key := keypath.ServicePath(serviceName)
endKey := clientv3.GetPrefixRangeEnd(key)

withRange := clientv3.WithRange(endKey)
Expand All @@ -48,11 +47,7 @@ func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, er
func GetMSMembers(serviceName string, client *clientv3.Client) ([]ServiceRegistryEntry, error) {
switch serviceName {
case constant.TSOServiceName, constant.SchedulingServiceName, constant.ResourceManagerServiceName:
clusterID, err := etcdutil.GetClusterID(client, constant.ClusterIDPath)
if err != nil {
return nil, err
}
servicePath := ServicePath(strconv.FormatUint(clusterID, 10), serviceName)
servicePath := keypath.ServicePath(serviceName)
resps, err := kv.NewSlowLogTxn(client).Then(clientv3.OpGet(servicePath, clientv3.WithPrefix())).Commit()
if err != nil {
return nil, errs.ErrEtcdKVGet.Wrap(err).GenWithStackByCause()
Expand Down
16 changes: 8 additions & 8 deletions pkg/mcs/discovery/discover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ func TestDiscover(t *testing.T) {
re := require.New(t)
_, client, clean := etcdutil.NewTestEtcdCluster(t, 1)
defer clean()
sr1 := NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:1", "127.0.0.1:1", 1)
sr1 := NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:1", "127.0.0.1:1", 1)
err := sr1.Register()
re.NoError(err)
sr2 := NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
sr2 := NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
err = sr2.Register()
re.NoError(err)

endpoints, err := Discover(client, "12345", "test_service")
endpoints, err := Discover(client, "test_service")
re.NoError(err)
re.Len(endpoints, 2)
re.Equal("127.0.0.1:1", endpoints[0])
Expand All @@ -43,7 +43,7 @@ func TestDiscover(t *testing.T) {
sr1.cancel()
sr2.cancel()
time.Sleep(3 * time.Second)
endpoints, err = Discover(client, "12345", "test_service")
endpoints, err = Discover(client, "test_service")
re.NoError(err)
re.Empty(endpoints)
}
Expand All @@ -55,17 +55,17 @@ func TestServiceRegistryEntry(t *testing.T) {
entry1 := &ServiceRegistryEntry{ServiceAddr: "127.0.0.1:1"}
s1, err := entry1.Serialize()
re.NoError(err)
sr1 := NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:1", s1, 1)
sr1 := NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:1", s1, 1)
err = sr1.Register()
re.NoError(err)
entry2 := &ServiceRegistryEntry{ServiceAddr: "127.0.0.1:2"}
s2, err := entry2.Serialize()
re.NoError(err)
sr2 := NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", s2, 1)
sr2 := NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:2", s2, 1)
err = sr2.Register()
re.NoError(err)

endpoints, err := Discover(client, "12345", "test_service")
endpoints, err := Discover(client, "test_service")
re.NoError(err)
re.Len(endpoints, 2)
returnedEntry1 := &ServiceRegistryEntry{}
Expand All @@ -78,7 +78,7 @@ func TestServiceRegistryEntry(t *testing.T) {
sr1.cancel()
sr2.cancel()
time.Sleep(3 * time.Second)
endpoints, err = Discover(client, "12345", "test_service")
endpoints, err = Discover(client, "test_service")
re.NoError(err)
re.Empty(endpoints)
}
41 changes: 0 additions & 41 deletions pkg/mcs/discovery/key_path.go
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to pkg/utils/keypath

This file was deleted.

5 changes: 3 additions & 2 deletions pkg/mcs/discovery/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
Expand All @@ -40,9 +41,9 @@ type ServiceRegister struct {
}

// NewServiceRegister creates a new ServiceRegister.
func NewServiceRegister(ctx context.Context, cli *clientv3.Client, clusterID, serviceName, serviceAddr, serializedValue string, ttl int64) *ServiceRegister {
func NewServiceRegister(ctx context.Context, cli *clientv3.Client, serviceName, serviceAddr, serializedValue string, ttl int64) *ServiceRegister {
cctx, cancel := context.WithCancel(ctx)
serviceKey := RegistryPath(clusterID, serviceName, serviceAddr)
serviceKey := keypath.RegistryPath(serviceName, serviceAddr)
return &ServiceRegister{
ctx: cctx,
cancel: cancel,
Expand Down
8 changes: 4 additions & 4 deletions pkg/mcs/discovery/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ func TestRegister(t *testing.T) {
etcd, cfg := servers[0], servers[0].Config()

// Test register with http prefix.
sr := NewServiceRegister(context.Background(), client, "12345", "test_service", "http://127.0.0.1:1", "http://127.0.0.1:1", 10)
sr := NewServiceRegister(context.Background(), client, "test_service", "http://127.0.0.1:1", "http://127.0.0.1:1", 10)
err := sr.Register()
re.NoError(err)
re.Equal("/ms/12345/test_service/registry/http://127.0.0.1:1", sr.key)
re.Equal("/ms/0/test_service/registry/http://127.0.0.1:1", sr.key)
resp, err := client.Get(context.Background(), sr.key)
re.NoError(err)
re.Equal("http://127.0.0.1:1", string(resp.Kvs[0].Value))
Expand All @@ -51,14 +51,14 @@ func TestRegister(t *testing.T) {
re.Empty(resp.Kvs)

// Test the case that ctx is canceled.
sr = NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
sr = NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
err = sr.Register()
re.NoError(err)
sr.cancel()
re.Empty(getKeyAfterLeaseExpired(re, client, sr.key))

// Test the case that keepalive is failed when the etcd is restarted.
sr = NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
sr = NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
err = sr.Register()
re.NoError(err)
fname := testutil.InitTempFileLogger("info")
Expand Down
31 changes: 16 additions & 15 deletions pkg/mcs/metastorage/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/keypath"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -58,10 +59,10 @@
}

// NewService creates a new meta storage service.
func NewService[T ClusterIDProvider](svr bs.Server) registry.RegistrableService {
func NewService(svr bs.Server) registry.RegistrableService {
return &Service{
ctx: svr.Context(),
manager: NewManager[T](svr),
manager: NewManager(svr),
}
}

Expand Down Expand Up @@ -115,11 +116,11 @@
if res.Err() != nil {
var resp meta_storagepb.WatchResponse
if startRevision < res.CompactRevision {
resp.Header = s.wrapErrorAndRevision(res.Header.GetRevision(), meta_storagepb.ErrorType_DATA_COMPACTED,
resp.Header = wrapErrorAndRevision(res.Header.GetRevision(), meta_storagepb.ErrorType_DATA_COMPACTED,

Check warning on line 119 in pkg/mcs/metastorage/server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/metastorage/server/grpc_service.go#L119

Added line #L119 was not covered by tests
fmt.Sprintf("required watch revision: %d is smaller than current compact/min revision %d.", startRevision, res.CompactRevision))
resp.CompactRevision = res.CompactRevision
} else {
resp.Header = s.wrapErrorAndRevision(res.Header.GetRevision(), meta_storagepb.ErrorType_UNKNOWN,
resp.Header = wrapErrorAndRevision(res.Header.GetRevision(), meta_storagepb.ErrorType_UNKNOWN,

Check warning on line 123 in pkg/mcs/metastorage/server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/metastorage/server/grpc_service.go#L123

Added line #L123 was not covered by tests
fmt.Sprintf("watch channel meet other error %s.", res.Err().Error()))
}
if err := server.Send(&resp); err != nil {
Expand All @@ -146,7 +147,7 @@
}
if len(events) > 0 {
if err := server.Send(&meta_storagepb.WatchResponse{
Header: &meta_storagepb.ResponseHeader{ClusterId: s.manager.ClusterID(), Revision: res.Header.GetRevision()},
Header: &meta_storagepb.ResponseHeader{ClusterId: keypath.ClusterID(), Revision: res.Header.GetRevision()},
Events: events, CompactRevision: res.CompactRevision}); err != nil {
return err
}
Expand Down Expand Up @@ -180,10 +181,10 @@
revision = res.Header.GetRevision()
}
if err != nil {
return &meta_storagepb.GetResponse{Header: s.wrapErrorAndRevision(revision, meta_storagepb.ErrorType_UNKNOWN, err.Error())}, nil
return &meta_storagepb.GetResponse{Header: wrapErrorAndRevision(revision, meta_storagepb.ErrorType_UNKNOWN, err.Error())}, nil

Check warning on line 184 in pkg/mcs/metastorage/server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/metastorage/server/grpc_service.go#L184

Added line #L184 was not covered by tests
}
resp := &meta_storagepb.GetResponse{
Header: &meta_storagepb.ResponseHeader{ClusterId: s.manager.ClusterID(), Revision: revision},
Header: &meta_storagepb.ResponseHeader{ClusterId: keypath.ClusterID(), Revision: revision},
Count: res.Count,
More: res.More,
}
Expand Down Expand Up @@ -219,11 +220,11 @@
revision = res.Header.GetRevision()
}
if err != nil {
return &meta_storagepb.PutResponse{Header: s.wrapErrorAndRevision(revision, meta_storagepb.ErrorType_UNKNOWN, err.Error())}, nil
return &meta_storagepb.PutResponse{Header: wrapErrorAndRevision(revision, meta_storagepb.ErrorType_UNKNOWN, err.Error())}, nil

Check warning on line 223 in pkg/mcs/metastorage/server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/metastorage/server/grpc_service.go#L223

Added line #L223 was not covered by tests
}

resp := &meta_storagepb.PutResponse{
Header: &meta_storagepb.ResponseHeader{ClusterId: s.manager.ClusterID(), Revision: revision},
Header: &meta_storagepb.ResponseHeader{ClusterId: keypath.ClusterID(), Revision: revision},
}
if res.PrevKv != nil {
resp.PrevKv = &meta_storagepb.KeyValue{Key: res.PrevKv.Key, Value: res.PrevKv.Value}
Expand Down Expand Up @@ -251,11 +252,11 @@
revision = res.Header.GetRevision()
}
if err != nil {
return &meta_storagepb.DeleteResponse{Header: s.wrapErrorAndRevision(revision, meta_storagepb.ErrorType_UNKNOWN, err.Error())}, nil
return &meta_storagepb.DeleteResponse{Header: wrapErrorAndRevision(revision, meta_storagepb.ErrorType_UNKNOWN, err.Error())}, nil

Check warning on line 255 in pkg/mcs/metastorage/server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/metastorage/server/grpc_service.go#L255

Added line #L255 was not covered by tests
}

resp := &meta_storagepb.DeleteResponse{
Header: &meta_storagepb.ResponseHeader{ClusterId: s.manager.ClusterID(), Revision: revision},
Header: &meta_storagepb.ResponseHeader{ClusterId: keypath.ClusterID(), Revision: revision},

Check warning on line 259 in pkg/mcs/metastorage/server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/metastorage/server/grpc_service.go#L259

Added line #L259 was not covered by tests
}
resp.PrevKvs = make([]*meta_storagepb.KeyValue, len(res.PrevKvs))
for i, kv := range res.PrevKvs {
Expand All @@ -264,16 +265,16 @@
return resp, nil
}

func (s *Service) wrapErrorAndRevision(revision int64, errorType meta_storagepb.ErrorType, message string) *meta_storagepb.ResponseHeader {
return s.errorHeader(revision, &meta_storagepb.Error{
func wrapErrorAndRevision(revision int64, errorType meta_storagepb.ErrorType, message string) *meta_storagepb.ResponseHeader {
return errorHeader(revision, &meta_storagepb.Error{

Check warning on line 269 in pkg/mcs/metastorage/server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/metastorage/server/grpc_service.go#L268-L269

Added lines #L268 - L269 were not covered by tests
Type: errorType,
Message: message,
})
}

func (s *Service) errorHeader(revision int64, err *meta_storagepb.Error) *meta_storagepb.ResponseHeader {
func errorHeader(revision int64, err *meta_storagepb.Error) *meta_storagepb.ResponseHeader {

Check warning on line 275 in pkg/mcs/metastorage/server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/metastorage/server/grpc_service.go#L275

Added line #L275 was not covered by tests
return &meta_storagepb.ResponseHeader{
ClusterId: s.manager.ClusterID(),
ClusterId: keypath.ClusterID(),

Check warning on line 277 in pkg/mcs/metastorage/server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/metastorage/server/grpc_service.go#L277

Added line #L277 was not covered by tests
Revision: revision,
Error: err,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/metastorage/server/install/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ func init() {

// Install registers the API group and grpc service.
func Install(register *registry.ServiceRegistry) {
register.RegisterService("MetaStorage", ms_server.NewService[ms_server.ClusterIDProvider])
register.RegisterService("MetaStorage", ms_server.NewService)
}
Loading