From eed0557c77a1c54c6d2e7f529b979c3a57588ec2 Mon Sep 17 00:00:00 2001 From: liushao Date: Tue, 11 Jul 2023 22:14:04 +0800 Subject: [PATCH] feat: new feature for service discovery (#574) * feat: new feature for service discovery 1.Completed the code for organizing the workflow. 2.Implemented FileRegistryService based on local configuration files. * style: go imports * refactor: avoid the uncertainty of SQL field order in unit test. refactor select_for_update_executor_test.go to avoid the uncertainty of SQL field order causing test code to pass and fail sometimes. --- pkg/client/client.go | 10 +- pkg/client/config.go | 5 +- pkg/client/config_test.go | 14 ++ .../at/select_for_update_executor_test.go | 3 +- pkg/discovery/base.go | 39 ++++ pkg/discovery/config.go | 92 +++++++++ pkg/discovery/consul.go | 30 +++ pkg/discovery/etcd3.go | 30 +++ pkg/discovery/eureka.go | 30 +++ pkg/discovery/file.go | 88 +++++++++ pkg/discovery/file_test.go | 178 ++++++++++++++++++ pkg/discovery/init.go | 61 ++++++ pkg/discovery/init_test.go | 78 ++++++++ pkg/discovery/nacos.go | 30 +++ pkg/discovery/redis.go | 30 +++ pkg/discovery/sofa.go | 30 +++ pkg/discovery/zk.go | 30 +++ pkg/remoting/getty/rpc_client.go | 25 +-- pkg/tm/config.go | 16 -- testdata/conf/seatago.yml | 15 +- 20 files changed, 792 insertions(+), 42 deletions(-) create mode 100644 pkg/discovery/base.go create mode 100644 pkg/discovery/config.go create mode 100644 pkg/discovery/consul.go create mode 100644 pkg/discovery/etcd3.go create mode 100644 pkg/discovery/eureka.go create mode 100644 pkg/discovery/file.go create mode 100644 pkg/discovery/file_test.go create mode 100644 pkg/discovery/init.go create mode 100644 pkg/discovery/init_test.go create mode 100644 pkg/discovery/nacos.go create mode 100644 pkg/discovery/redis.go create mode 100644 pkg/discovery/sofa.go create mode 100644 pkg/discovery/zk.go diff --git a/pkg/client/client.go b/pkg/client/client.go index c2ee1e30e..54a56888b 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -23,6 +23,7 @@ import ( "github.com/seata/seata-go/pkg/datasource" at "github.com/seata/seata-go/pkg/datasource/sql" "github.com/seata/seata-go/pkg/datasource/sql/exec/config" + "github.com/seata/seata-go/pkg/discovery" "github.com/seata/seata-go/pkg/integration" remoteConfig "github.com/seata/seata-go/pkg/remoting/config" "github.com/seata/seata-go/pkg/remoting/getty" @@ -41,7 +42,7 @@ func Init() { // InitPath init client with config path func InitPath(configFilePath string) { cfg := LoadPath(configFilePath) - + initRegistry(cfg) initRmClient(cfg) initTmClient(cfg) initDatasource() @@ -51,6 +52,7 @@ var ( onceInitTmClient sync.Once onceInitRmClient sync.Once onceInitDatasource sync.Once + onceInitRegistry sync.Once ) // InitTmClient init client tm client @@ -95,3 +97,9 @@ func initDatasource() { datasource.Init() }) } + +func initRegistry(cfg *Config) { + onceInitRegistry.Do(func() { + discovery.InitRegistry(&cfg.ServiceConfig, &cfg.RegistryConfig) + }) +} diff --git a/pkg/client/config.go b/pkg/client/config.go index a4a9eb772..bd84852d7 100644 --- a/pkg/client/config.go +++ b/pkg/client/config.go @@ -31,6 +31,7 @@ import ( "github.com/knadh/koanf/parsers/toml" "github.com/knadh/koanf/parsers/yaml" "github.com/knadh/koanf/providers/rawbytes" + "github.com/seata/seata-go/pkg/discovery" "github.com/seata/seata-go/pkg/datasource/sql" "github.com/seata/seata-go/pkg/datasource/sql/undo" @@ -81,7 +82,8 @@ type Config struct { ClientConfig ClientConfig `yaml:"client" json:"client" koanf:"client"` GettyConfig remoteConfig.Config `yaml:"getty" json:"getty" koanf:"getty"` TransportConfig remoteConfig.TransportConfig `yaml:"transport" json:"transport" koanf:"transport"` - ServiceConfig tm.ServiceConfig `yaml:"service" json:"service" koanf:"service"` + ServiceConfig discovery.ServiceConfig `yaml:"service" json:"service" koanf:"service"` + RegistryConfig discovery.RegistryConfig `yaml:"registry" json:"registry" koanf:"registry"` } func (c *Config) RegisterFlags(f *flag.FlagSet) { @@ -98,6 +100,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.ClientConfig.RegisterFlagsWithPrefix("client", f) c.GettyConfig.RegisterFlagsWithPrefix("getty", f) c.TransportConfig.RegisterFlagsWithPrefix("transport", f) + c.RegistryConfig.RegisterFlagsWithPrefix("registry", f) c.ServiceConfig.RegisterFlagsWithPrefix("service", f) } diff --git a/pkg/client/config_test.go b/pkg/client/config_test.go index b54ae20fd..b60ae7889 100644 --- a/pkg/client/config_test.go +++ b/pkg/client/config_test.go @@ -112,6 +112,20 @@ func TestLoadPath(t *testing.T) { assert.Equal(t, "default", cfg.ServiceConfig.VgroupMapping["default_tx_group"]) assert.Equal(t, "127.0.0.1:8091", cfg.ServiceConfig.Grouplist["default"]) + assert.NotNil(t, cfg.RegistryConfig) + assert.Equal(t, "file", cfg.RegistryConfig.Type) + assert.Equal(t, "seatago.yml", cfg.RegistryConfig.File.Name) + assert.Equal(t, "seata-server", cfg.RegistryConfig.Nacos.Application) + assert.Equal(t, "127.0.0.1:8848", cfg.RegistryConfig.Nacos.ServerAddr) + assert.Equal(t, "SEATA_GROUP", cfg.RegistryConfig.Nacos.Group) + assert.Equal(t, "test-namespace", cfg.RegistryConfig.Nacos.Namespace) + assert.Equal(t, "test-username", cfg.RegistryConfig.Nacos.Username) + assert.Equal(t, "test-password", cfg.RegistryConfig.Nacos.Password) + assert.Equal(t, "test-access-key", cfg.RegistryConfig.Nacos.AccessKey) + assert.Equal(t, "test-secret-key", cfg.RegistryConfig.Nacos.SecretKey) + assert.Equal(t, "default", cfg.RegistryConfig.Etcd3.Cluster) + assert.Equal(t, "http://localhost:2379", cfg.RegistryConfig.Etcd3.ServerAddr) + // reset flag.CommandLine flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError) } diff --git a/pkg/datasource/sql/exec/at/select_for_update_executor_test.go b/pkg/datasource/sql/exec/at/select_for_update_executor_test.go index 9a6f78171..707839884 100644 --- a/pkg/datasource/sql/exec/at/select_for_update_executor_test.go +++ b/pkg/datasource/sql/exec/at/select_for_update_executor_test.go @@ -44,7 +44,8 @@ func TestBuildSelectPKSQL(t *testing.T) { ctx, err := parser.DoParser(sql) metaData := types.TableMeta{ - TableName: "t_user", + TableName: "t_user", + ColumnNames: []string{"id", "order_id", "age"}, Indexs: map[string]types.IndexMeta{ "id": { IType: types.IndexTypePrimaryKey, diff --git a/pkg/discovery/base.go b/pkg/discovery/base.go new file mode 100644 index 000000000..fcc0d016d --- /dev/null +++ b/pkg/discovery/base.go @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package discovery + +const ( + FILE string = "file" + NACOS string = "nacos" + ETCD string = "etcd" + EUREKA string = "eureka" + REDIS string = "redis" + ZK string = "zk" + CONSUL string = "consul" + SOFA string = "sofa" +) + +type ServiceInstance struct { + Addr string + Port int +} + +type RegistryService interface { + Lookup(key string) ([]*ServiceInstance, error) + Close() +} diff --git a/pkg/discovery/config.go b/pkg/discovery/config.go new file mode 100644 index 000000000..c03c3da1f --- /dev/null +++ b/pkg/discovery/config.go @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package discovery + +import ( + "flag" + + "github.com/seata/seata-go/pkg/util/flagext" +) + +type ServiceConfig struct { + VgroupMapping flagext.StringMap `yaml:"vgroup-mapping" json:"vgroup-mapping" koanf:"vgroup-mapping"` + Grouplist flagext.StringMap `yaml:"grouplist" json:"grouplist" koanf:"grouplist"` + EnableDegrade bool `yaml:"enable-degrade" json:"enable-degrade" koanf:"enable-degrade"` + DisableGlobalTransaction bool `yaml:"disable-global-transaction" json:"disable-global-transaction" koanf:"disable-global-transaction"` +} + +func (cfg *ServiceConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.BoolVar(&cfg.EnableDegrade, prefix+".enable-degrade", false, "degrade current not support.") + f.BoolVar(&cfg.DisableGlobalTransaction, prefix+".disable-global-transaction", false, "disable globalTransaction.") + f.Var(&cfg.VgroupMapping, prefix+".vgroup-mapping", "The vgroup mapping.") + f.Var(&cfg.Grouplist, prefix+".grouplist", "The group list.") +} + +type RegistryConfig struct { + Type string `yaml:"type" json:"type" koanf:"type"` + File FileConfig `yaml:"file" json:"file" koanf:"file"` + Nacos NacosConfig `yaml:"nacos" json:"nacos" koanf:"nacos"` + Etcd3 Etcd3Config `yaml:"etcd3" json:"etcd3" koanf:"etcd3"` +} + +func (cfg *RegistryConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.StringVar(&cfg.Type, prefix+".type", "file", "The registry type.") + cfg.File.RegisterFlagsWithPrefix(prefix+".file", f) + cfg.Nacos.RegisterFlagsWithPrefix(prefix+".nacos", f) + cfg.Etcd3.RegisterFlagsWithPrefix(prefix+".etcd3", f) +} + +type FileConfig struct { + Name string `yaml:"name" json:"name" koanf:"name"` +} + +func (cfg *FileConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.StringVar(&cfg.Name, prefix+".name", "registry.conf", "The file name of registry.") +} + +type NacosConfig struct { + Application string `yaml:"application" json:"application" koanf:"application"` + ServerAddr string `yaml:"server-addr" json:"server-addr" koanf:"server-addr"` + Group string `yaml:"group" json:"group" koanf:"group"` + Namespace string `yaml:"namespace" json:"namespace" koanf:"namespace"` + Username string `yaml:"username" json:"username" koanf:"username"` + Password string `yaml:"password" json:"password" koanf:"password"` + AccessKey string `yaml:"access-key" json:"access-key" koanf:"access-key"` + SecretKey string `yaml:"secret-key" json:"secret-key" koanf:"secret-key"` +} + +func (cfg *NacosConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.StringVar(&cfg.Application, prefix+".application", "seata", "The application name of registry.") + f.StringVar(&cfg.ServerAddr, prefix+".server-addr", "", "The server address of registry.") + f.StringVar(&cfg.Group, prefix+".group", "SEATA_GROUP", "The group of registry.") + f.StringVar(&cfg.Namespace, prefix+".namespace", "", "The namespace of registry.") + f.StringVar(&cfg.Username, prefix+".username", "", "The username of registry.") + f.StringVar(&cfg.Password, prefix+".password", "", "The password of registry.") + f.StringVar(&cfg.AccessKey, prefix+".access-key", "", "The access key of registry.") + f.StringVar(&cfg.SecretKey, prefix+".secret-key", "", "The secret key of registry.") +} + +type Etcd3Config struct { + Cluster string `yaml:"cluster" json:"cluster" koanf:"cluster"` + ServerAddr string `yaml:"server-addr" json:"server-addr" koanf:"server-addr"` +} + +func (cfg *Etcd3Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.StringVar(&cfg.Cluster, prefix+".cluster", "default", "The server address of registry.") + f.StringVar(&cfg.ServerAddr, prefix+".server-addr", "http://localhost:2379", "The server address of registry.") +} diff --git a/pkg/discovery/consul.go b/pkg/discovery/consul.go new file mode 100644 index 000000000..1762e7bfb --- /dev/null +++ b/pkg/discovery/consul.go @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package discovery + +type ConsulRegistryService struct{} + +func (s *ConsulRegistryService) Lookup(key string) ([]*ServiceInstance, error) { + //TODO implement me + panic("implement me") +} + +func (s *ConsulRegistryService) Close() { + //TODO implement me + panic("implement me") +} diff --git a/pkg/discovery/etcd3.go b/pkg/discovery/etcd3.go new file mode 100644 index 000000000..12fa18e72 --- /dev/null +++ b/pkg/discovery/etcd3.go @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package discovery + +type EtcdRegistryService struct{} + +func (s *EtcdRegistryService) Lookup(key string) ([]*ServiceInstance, error) { + //TODO implement me + panic("implement me") +} + +func (s *EtcdRegistryService) Close() { + //TODO implement me + panic("implement me") +} diff --git a/pkg/discovery/eureka.go b/pkg/discovery/eureka.go new file mode 100644 index 000000000..67753ab68 --- /dev/null +++ b/pkg/discovery/eureka.go @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package discovery + +type EurekaRegistryService struct{} + +func (s *EurekaRegistryService) Lookup(key string) ([]*ServiceInstance, error) { + //TODO implement me + panic("implement me") +} + +func (s *EurekaRegistryService) Close() { + //TODO implement me + panic("implement me") +} diff --git a/pkg/discovery/file.go b/pkg/discovery/file.go new file mode 100644 index 000000000..aa0b0bad8 --- /dev/null +++ b/pkg/discovery/file.go @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package discovery + +import ( + "fmt" + "strconv" + "strings" + + "github.com/seata/seata-go/pkg/util/log" +) + +const ( + endPointSplitChar = ";" + ipPortSplitChar = ":" +) + +type FileRegistryService struct { + serviceConfig *ServiceConfig +} + +func newFileRegistryService(config *ServiceConfig) RegistryService { + if config == nil { + log.Fatalf("service config is nil") + panic("service config is nil") + } + return &FileRegistryService{ + serviceConfig: config, + } +} + +func (s *FileRegistryService) Lookup(key string) ([]*ServiceInstance, error) { + var group string + if v, ok := s.serviceConfig.VgroupMapping[key]; ok { + group = v + } + if group == "" { + log.Errorf("vgroup is empty. key: %s", key) + return nil, fmt.Errorf("vgroup is empty. key: %s", key) + } + + var addrStr string + if v, ok := s.serviceConfig.Grouplist[group]; ok { + addrStr = v + } + if addrStr == "" { + log.Errorf("endpoint is empty. key: %s group: %s", group) + return nil, fmt.Errorf("endpoint is empty. key: %s group: %s", key, group) + } + + addrs := strings.Split(addrStr, endPointSplitChar) + instances := make([]*ServiceInstance, 0) + for _, addr := range addrs { + ipPort := strings.Split(addr, ipPortSplitChar) + if len(ipPort) != 2 { + return nil, fmt.Errorf("endpoint format should like ip:port. endpoint: %s", addr) + } + ip := ipPort[0] + port, err := strconv.Atoi(ipPort[1]) + if err != nil { + return nil, err + } + instances = append(instances, &ServiceInstance{ + Addr: ip, + Port: port, + }) + } + return instances, nil +} + +func (s *FileRegistryService) Close() { + +} diff --git a/pkg/discovery/file_test.go b/pkg/discovery/file_test.go new file mode 100644 index 000000000..7b3bc7b58 --- /dev/null +++ b/pkg/discovery/file_test.go @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package discovery + +import ( + "reflect" + "testing" +) + +func TestFileRegistryService_Lookup(t *testing.T) { + type fields struct { + serviceConfig *ServiceConfig + } + type args struct { + key string + } + tests := []struct { + name string + fields fields + args args + want []*ServiceInstance + wantErr bool + wantErrMsg string + }{ + { + name: "normal single endpoint.", + args: args{ + key: "default_tx_group", + }, + fields: fields{ + serviceConfig: &ServiceConfig{ + VgroupMapping: map[string]string{ + "default_tx_group": "default", + }, + Grouplist: map[string]string{ + "default": "127.0.0.1:8091", + }, + }, + }, + want: []*ServiceInstance{ + { + Addr: "127.0.0.1", + Port: 8091, + }, + }, + wantErr: false, + }, + { + name: "normal multi endpoints.", + args: args{ + key: "default_tx_group", + }, + fields: fields{ + serviceConfig: &ServiceConfig{ + VgroupMapping: map[string]string{ + "default_tx_group": "default", + }, + Grouplist: map[string]string{ + "default": "127.0.0.1:8091;192.168.0.1:8092", + }, + }, + }, + want: []*ServiceInstance{ + { + Addr: "127.0.0.1", + Port: 8091, + }, + { + Addr: "192.168.0.1", + Port: 8092, + }, + }, + wantErr: false, + }, + { + name: "vgroup is empty.", + args: args{ + key: "default_tx_group", + }, + fields: fields{ + serviceConfig: &ServiceConfig{ + VgroupMapping: map[string]string{ + "default_tx_group": "", + }, + }, + }, + want: nil, + wantErr: true, + wantErrMsg: "vgroup is empty. key: default_tx_group", + }, + { + name: "endpoint is empty.", + args: args{ + key: "default_tx_group", + }, + fields: fields{ + serviceConfig: &ServiceConfig{ + VgroupMapping: map[string]string{ + "default_tx_group": "default", + }, + }, + }, + want: nil, + wantErr: true, + wantErrMsg: "endpoint is empty. key: default_tx_group group: default", + }, + { + name: "format is not ip:port", + args: args{ + key: "default_tx_group", + }, + fields: fields{ + serviceConfig: &ServiceConfig{ + VgroupMapping: map[string]string{ + "default_tx_group": "default", + }, + Grouplist: map[string]string{ + "default": "127.0.0.18091", + }, + }, + }, + want: nil, + wantErr: true, + wantErrMsg: "endpoint format should like ip:port. endpoint: 127.0.0.18091", + }, + { + name: "port is not number", + args: args{ + key: "default_tx_group", + }, + fields: fields{ + serviceConfig: &ServiceConfig{ + VgroupMapping: map[string]string{ + "default_tx_group": "default", + }, + Grouplist: map[string]string{ + "default": "127.0.0.1:abc", + }, + }, + }, + want: nil, + wantErr: true, + wantErrMsg: "strconv.Atoi: parsing \"abc\": invalid syntax", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &FileRegistryService{ + serviceConfig: tt.fields.serviceConfig, + } + got, err := s.Lookup(tt.args.key) + if (err != nil) != tt.wantErr { + t.Errorf("Lookup() error = %v, wantErr = %v", err, tt.wantErr) + } + if tt.wantErr && err.Error() != tt.wantErrMsg { + t.Errorf("Lookup() errMsg = %v, wantErrMsg = %v", err.Error(), tt.wantErrMsg) + } + if !tt.wantErr && !reflect.DeepEqual(got, tt.want) { + t.Errorf("Lookup() got = %v, want = %v", got, tt.want) + } + }) + } +} diff --git a/pkg/discovery/init.go b/pkg/discovery/init.go new file mode 100644 index 000000000..c3afe27cc --- /dev/null +++ b/pkg/discovery/init.go @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package discovery + +import ( + "fmt" +) + +var ( + registryServiceInstance RegistryService +) + +func InitRegistry(serviceConfig *ServiceConfig, registryConfig *RegistryConfig) { + var registryService RegistryService + var err error + switch registryConfig.Type { + case FILE: + //init file registry + registryService = newFileRegistryService(serviceConfig) + case ETCD: + //TODO: init etcd registry + case NACOS: + //TODO: init nacos registry + case EUREKA: + //TODO: init eureka registry + case REDIS: + //TODO: init redis registry + case ZK: + //TODO: init zk registry + case CONSUL: + //TODO: init consul registry + case SOFA: + //TODO: init sofa registry + default: + err = fmt.Errorf("service registry not support registry type:%s", registryConfig.Type) + } + + if err != nil { + panic(fmt.Errorf("init service registry err:%v", err)) + } + registryServiceInstance = registryService +} + +func GetRegistry() RegistryService { + return registryServiceInstance +} diff --git a/pkg/discovery/init_test.go b/pkg/discovery/init_test.go new file mode 100644 index 000000000..0cfce0e4d --- /dev/null +++ b/pkg/discovery/init_test.go @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package discovery + +import ( + "reflect" + "testing" +) + +func TestInitRegistry(t *testing.T) { + type args struct { + serviceConfig *ServiceConfig + registryConfig *RegistryConfig + } + tests := []struct { + name string + args args + hasPanic bool + expectedType string + }{ + { + name: "normal", + args: args{ + registryConfig: &RegistryConfig{ + Type: FILE, + }, + serviceConfig: &ServiceConfig{}, + }, + expectedType: "FileRegistryService", + }, + { + name: "unknown type", + args: args{ + registryConfig: &RegistryConfig{ + Type: "unknown", + }, + serviceConfig: &ServiceConfig{}, + }, + hasPanic: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + defer func() { + if r := recover(); r != nil { + if !tt.hasPanic { + t.Errorf("panic is not expected!") + } + } else if tt.hasPanic { + t.Errorf("Expected a panic but did not receive one") + } + }() + InitRegistry(tt.args.serviceConfig, tt.args.registryConfig) + instance := GetRegistry() + if !tt.hasPanic { + actualType := reflect.TypeOf(instance).Elem().Name() + if actualType != tt.expectedType { + t.Errorf("type = %v, want %v", actualType, tt.expectedType) + } + } + }) + } +} diff --git a/pkg/discovery/nacos.go b/pkg/discovery/nacos.go new file mode 100644 index 000000000..5455ea287 --- /dev/null +++ b/pkg/discovery/nacos.go @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package discovery + +type NacosRegistryService struct{} + +func (s *NacosRegistryService) Lookup(key string) ([]*ServiceInstance, error) { + //TODO implement me + panic("implement me") +} + +func (NacosRegistryService) Close() { + //TODO implement me + panic("implement me") +} diff --git a/pkg/discovery/redis.go b/pkg/discovery/redis.go new file mode 100644 index 000000000..61adcc628 --- /dev/null +++ b/pkg/discovery/redis.go @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package discovery + +type RedisRegistryService struct{} + +func (s *RedisRegistryService) Lookup(key string) ([]*ServiceInstance, error) { + //TODO implement me + panic("implement me") +} + +func (RedisRegistryService) Close() { + //TODO implement me + panic("implement me") +} diff --git a/pkg/discovery/sofa.go b/pkg/discovery/sofa.go new file mode 100644 index 000000000..5be223edc --- /dev/null +++ b/pkg/discovery/sofa.go @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package discovery + +type SofaRegistryService struct{} + +func (s *SofaRegistryService) Lookup(key string) ([]*ServiceInstance, error) { + //TODO implement me + panic("implement me") +} + +func (s *SofaRegistryService) Close() { + //TODO implement me + panic("implement me") +} diff --git a/pkg/discovery/zk.go b/pkg/discovery/zk.go new file mode 100644 index 000000000..3f942bfd9 --- /dev/null +++ b/pkg/discovery/zk.go @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package discovery + +type ZkRegistryService struct{} + +func (s *ZkRegistryService) Lookup(key string) ([]*ServiceInstance, error) { + //TODO implement me + panic("implement me") +} + +func (s *ZkRegistryService) Close() { + //TODO implement me + panic("implement me") +} diff --git a/pkg/remoting/getty/rpc_client.go b/pkg/remoting/getty/rpc_client.go index 12f8a4403..601064ef4 100644 --- a/pkg/remoting/getty/rpc_client.go +++ b/pkg/remoting/getty/rpc_client.go @@ -21,12 +21,11 @@ import ( "crypto/tls" "fmt" "net" - "strings" "sync" getty "github.com/apache/dubbo-getty" gxsync "github.com/dubbogo/gost/sync" - + "github.com/seata/seata-go/pkg/discovery" "github.com/seata/seata-go/pkg/protocol/codec" "github.com/seata/seata-go/pkg/remoting/config" "github.com/seata/seata-go/pkg/util/log" @@ -57,7 +56,7 @@ func (c *RpcClient) init() { } for _, address := range addressList { gettyClient := getty.NewTCPClient( - getty.WithServerAddress(address), + getty.WithServerAddress(fmt.Sprintf("%s:%d", address.Addr, address.Port)), // todo if read c.gettyConf.ConnectionNum, will cause the connect to fail getty.WithConnectionNumber(1), getty.WithReconnectInterval(c.gettyConf.ReconnectInterval), @@ -68,21 +67,13 @@ func (c *RpcClient) init() { } } -func (c *RpcClient) getAvailServerList() []string { - defaultAddressList := []string{"127.0.0.1:8091"} - txServiceGroup := c.seataConf.TxServiceGroup - if txServiceGroup == "" { - return defaultAddressList - } - clusterName := c.seataConf.ServiceVgroupMapping[txServiceGroup] - if clusterName == "" { - return defaultAddressList - } - grouplist := c.seataConf.ServiceGrouplist[clusterName] - if grouplist == "" { - return defaultAddressList +func (c *RpcClient) getAvailServerList() []*discovery.ServiceInstance { + registryService := discovery.GetRegistry() + instances, err := registryService.Lookup(c.seataConf.TxServiceGroup) + if err != nil { + return nil } - return strings.Split(grouplist, ",") + return instances } func (c *RpcClient) newSession(session getty.Session) error { diff --git a/pkg/tm/config.go b/pkg/tm/config.go index 70fb62695..b940910c3 100644 --- a/pkg/tm/config.go +++ b/pkg/tm/config.go @@ -20,8 +20,6 @@ package tm import ( "flag" "time" - - "github.com/seata/seata-go/pkg/util/flagext" ) type TmConfig struct { @@ -43,17 +41,3 @@ func (cfg *TmConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.DurationVar(&cfg.DegradeCheckAllowTimes, prefix+".degrade-check-allow-times", 10*time.Second, "The duration allowed for degrade checking.") f.IntVar(&cfg.InterceptorOrder, prefix+".interceptor-order", -2147482648, "The order of interceptor.") } - -type ServiceConfig struct { - VgroupMapping flagext.StringMap `yaml:"vgroup-mapping" json:"vgroup-mapping" koanf:"vgroup-mapping"` - Grouplist flagext.StringMap `yaml:"grouplist" json:"grouplist" koanf:"grouplist"` - EnableDegrade bool `yaml:"enable-degrade" json:"enable-degrade" koanf:"enable-degrade"` - DisableGlobalTransaction bool `yaml:"disable-global-transaction" json:"disable-global-transaction" koanf:"disable-global-transaction"` -} - -func (cfg *ServiceConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - f.BoolVar(&cfg.EnableDegrade, prefix+".enable-degrade", false, "degrade current not support.") - f.BoolVar(&cfg.DisableGlobalTransaction, prefix+".disable-global-transaction", false, "disable globalTransaction.") - f.Var(&cfg.VgroupMapping, prefix+".vgroup-mapping", "The vgroup mapping.") - f.Var(&cfg.Grouplist, prefix+".grouplist", "The group list.") -} diff --git a/testdata/conf/seatago.yml b/testdata/conf/seatago.yml index 6b82536bb..de09102b5 100644 --- a/testdata/conf/seatago.yml +++ b/testdata/conf/seatago.yml @@ -122,17 +122,20 @@ seata: registry: type: file file: - name: registry.conf + name: seatago.yml nacos: application: seata-server server-addr: 127.0.0.1:8848 group: "SEATA_GROUP" - namespace: "" - username: "" - password: "" + namespace: "test-namespace" + username: "test-username" + password: "test-password" ##if use MSE Nacos with auth, mutex with username/password attribute # - #access-key: "" # - #secret-key: "" # + access-key: "test-access-key" # + secret-key: "test-secret-key" # + etcd3: + cluster: "default" + server-addr: "http://localhost:2379" log: exception-rate: 100 tcc: