diff --git a/config_center/nacos/client.go b/config_center/nacos/client.go new file mode 100644 index 0000000000..d87ee74dda --- /dev/null +++ b/config_center/nacos/client.go @@ -0,0 +1,217 @@ +package nacos + +import ( + "strconv" + "strings" + "sync" + "time" +) + +import ( + "github.com/nacos-group/nacos-sdk-go/clients" + "github.com/nacos-group/nacos-sdk-go/clients/config_client" + nacosconst "github.com/nacos-group/nacos-sdk-go/common/constant" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" +) + +const logDir = "logs/nacos/log" + +// NacosClient Nacos client +type NacosClient struct { + name string + NacosAddrs []string + sync.Mutex // for Client + client *config_client.IConfigClient + exit chan struct{} + Timeout time.Duration + once sync.Once + onceClose func() +} + +// Client Get Client +func (n *NacosClient) Client() *config_client.IConfigClient { + return n.client +} + +// SetClient Set client +func (n *NacosClient) SetClient(client *config_client.IConfigClient) { + n.Lock() + n.client = client + n.Unlock() +} + +type option func(*options) + +type options struct { + nacosName string + client *NacosClient +} + +// WithNacosName Set nacos name +func WithNacosName(name string) option { + return func(opt *options) { + opt.nacosName = name + } +} + +// ValidateNacosClient Validate nacos client , if null then create it +func ValidateNacosClient(container nacosClientFacade, opts ...option) error { + if container == nil { + return perrors.Errorf("container can not be null") + } + os := &options{} + for _, opt := range opts { + opt(os) + } + + url := container.GetUrl() + + if container.NacosClient() == nil { + //in dubbo ,every registry only connect one node ,so this is []string{r.Address} + timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) + if err != nil { + logger.Errorf("timeout config %v is invalid ,err is %v", + url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error()) + return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location) + } + nacosAddresses := strings.Split(url.Location, ",") + newClient, err := newNacosClient(os.nacosName, nacosAddresses, timeout) + if err != nil { + logger.Warnf("newNacosClient(name{%s}, nacos address{%v}, timeout{%d}) = error{%v}", + os.nacosName, url.Location, timeout.String(), err) + return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location) + } + container.SetNacosClient(newClient) + } + + if container.NacosClient().Client() == nil { + svrConfList := []nacosconst.ServerConfig{} + for _, nacosAddr := range container.NacosClient().NacosAddrs { + split := strings.Split(nacosAddr, ":") + port, err := strconv.ParseUint(split[1], 10, 64) + if err != nil { + logger.Warnf("nacos addr port parse error ,error message is %v", err) + continue + } + svrconf := nacosconst.ServerConfig{ + IpAddr: split[0], + Port: port, + } + svrConfList = append(svrConfList, svrconf) + } + + client, err := clients.CreateConfigClient(map[string]interface{}{ + "serverConfigs": svrConfList, + "clientConfig": nacosconst.ClientConfig{ + TimeoutMs: uint64(int32(container.NacosClient().Timeout / time.Millisecond)), + ListenInterval: 10000, + NotLoadCacheAtStart: true, + LogDir: logDir, + }, + }) + + container.NacosClient().SetClient(&client) + if err != nil { + logger.Errorf("nacos create config client error:%v", err) + } + } + + return perrors.WithMessagef(nil, "newNacosClient(address:%+v)", url.PrimitiveURL) +} + +func newNacosClient(name string, nacosAddrs []string, timeout time.Duration) (*NacosClient, error) { + var ( + err error + n *NacosClient + ) + + n = &NacosClient{ + name: name, + NacosAddrs: nacosAddrs, + Timeout: timeout, + exit: make(chan struct{}), + onceClose: func() { + close(n.exit) + }, + } + + svrConfList := []nacosconst.ServerConfig{} + for _, nacosAddr := range n.NacosAddrs { + split := strings.Split(nacosAddr, ":") + port, err := strconv.ParseUint(split[1], 10, 64) + if err != nil { + logger.Warnf("convert port , source:%s , error:%v ", split[1], err) + continue + } + svrconf := nacosconst.ServerConfig{ + IpAddr: split[0], + Port: port, + } + svrConfList = append(svrConfList, svrconf) + } + client, err := clients.CreateConfigClient(map[string]interface{}{ + "serverConfigs": svrConfList, + "clientConfig": nacosconst.ClientConfig{ + TimeoutMs: uint64(timeout / time.Millisecond), + ListenInterval: 20000, + NotLoadCacheAtStart: true, + LogDir: logDir, + }, + }) + n.SetClient(&client) + if err != nil { + return nil, perrors.WithMessagef(err, "nacos clients.CreateConfigClient(nacosAddrs:%+v)", nacosAddrs) + } + + return n, nil +} + +// Done Get nacos client exit signal +func (n *NacosClient) Done() <-chan struct{} { + return n.exit +} + +func (n *NacosClient) stop() bool { + select { + case <-n.exit: + return true + default: + n.once.Do(n.onceClose) + } + + return false +} + +// NacosClientValid Get nacos client valid status +func (n *NacosClient) NacosClientValid() bool { + select { + case <-n.exit: + return false + default: + } + + valid := true + n.Lock() + if n.Client() == nil { + valid = false + } + n.Unlock() + + return valid +} + +// Close Close nacos client , then set null +func (n *NacosClient) Close() { + if n == nil { + return + } + + n.stop() + n.SetClient(nil) + logger.Warnf("nacosClient{name:%s, nacos addr:%s} exit now.", n.name, n.NacosAddrs) +} diff --git a/config_center/nacos/client_test.go b/config_center/nacos/client_test.go new file mode 100644 index 0000000000..47a05dec66 --- /dev/null +++ b/config_center/nacos/client_test.go @@ -0,0 +1,31 @@ +package nacos + +import ( + "strings" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" +) + +func Test_newNacosClient(t *testing.T) { + server := mockCommonNacosServer() + nacosURL := strings.ReplaceAll(server.URL, "http", "registry") + registryUrl, _ := common.NewURL(nacosURL) + c := &nacosDynamicConfiguration{ + url: ®istryUrl, + done: make(chan struct{}), + } + err := ValidateNacosClient(c, WithNacosName(nacosClientName)) + assert.NoError(t, err) + c.wg.Add(1) + go HandleClientRestart(c) + c.client.Close() + <-c.client.Done() + c.Destroy() +} diff --git a/config_center/nacos/facade.go b/config_center/nacos/facade.go new file mode 100644 index 0000000000..fc83e14eac --- /dev/null +++ b/config_center/nacos/facade.go @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nacos + +import ( + "sync" + "time" +) +import ( + "github.com/dubbogo/getty" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/logger" +) + +const ( + connDelay = 3 + maxFailTimes = 15 +) + +type nacosClientFacade interface { + NacosClient() *NacosClient + SetNacosClient(*NacosClient) + // WaitGroup for wait group control, zk client listener & zk client container + WaitGroup() *sync.WaitGroup + // GetDone For nacos client control RestartCallBack() bool + GetDone() chan struct{} + common.Node +} + +func timeSecondDuration(sec int) time.Duration { + return time.Duration(sec) * time.Second +} + +// HandleClientRestart Restart client handler +func HandleClientRestart(r nacosClientFacade) { + var ( + err error + + failTimes int + ) + + defer r.WaitGroup().Done() +LOOP: + for { + select { + case <-r.GetDone(): + logger.Warnf("(NacosProviderRegistry)reconnectNacosRegistry goroutine exit now...") + break LOOP + // re-register all services + case <-r.NacosClient().Done(): + r.NacosClient().Close() + nacosName := r.NacosClient().name + nacosAddress := r.NacosClient().NacosAddrs + r.SetNacosClient(nil) + + // Connect nacos until success. + failTimes = 0 + for { + select { + case <-r.GetDone(): + logger.Warnf("(NacosProviderRegistry)reconnectZkRegistry goroutine exit now...") + break LOOP + case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * connDelay)): // Prevent crazy reconnection nacos. + } + err = ValidateNacosClient(r, WithNacosName(nacosName)) + logger.Infof("NacosProviderRegistry.validateNacosClient(nacosAddr{%s}) = error{%#v}", + nacosAddress, perrors.WithStack(err)) + if err == nil { + break + } + failTimes++ + if maxFailTimes <= failTimes { + failTimes = maxFailTimes + } + } + } + } +} diff --git a/config_center/nacos/factory.go b/config_center/nacos/factory.go new file mode 100644 index 0000000000..3de91ea013 --- /dev/null +++ b/config_center/nacos/factory.go @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nacos + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/config_center/parser" +) + +func init() { + extension.SetConfigCenterFactory("nacos", func() config_center.DynamicConfigurationFactory { return &nacosDynamicConfigurationFactory{} }) +} + +type nacosDynamicConfigurationFactory struct { +} + +// GetDynamicConfiguration Get Configuration with URL +func (f *nacosDynamicConfigurationFactory) GetDynamicConfiguration(url *common.URL) (config_center.DynamicConfiguration, error) { + dynamicConfiguration, err := newNacosDynamicConfiguration(url) + if err != nil { + return nil, err + } + dynamicConfiguration.SetParser(&parser.DefaultConfigurationParser{}) + return dynamicConfiguration, err + +} diff --git a/config_center/nacos/impl.go b/config_center/nacos/impl.go new file mode 100644 index 0000000000..60ab89b003 --- /dev/null +++ b/config_center/nacos/impl.go @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nacos + +import ( + "sync" +) + +import ( + "github.com/nacos-group/nacos-sdk-go/vo" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/config_center/parser" +) + +const nacosClientName = "nacos config_center" + +type nacosDynamicConfiguration struct { + url *common.URL + rootPath string + wg sync.WaitGroup + cltLock sync.Mutex + done chan struct{} + client *NacosClient + keyListeners sync.Map + parser parser.ConfigurationParser +} + +func newNacosDynamicConfiguration(url *common.URL) (*nacosDynamicConfiguration, error) { + c := &nacosDynamicConfiguration{ + rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config", + url: url, + done: make(chan struct{}), + } + err := ValidateNacosClient(c, WithNacosName(nacosClientName)) + if err != nil { + logger.Errorf("nacos client start error ,error message is %v", err) + return nil, err + } + c.wg.Add(1) + go HandleClientRestart(c) + return c, err + +} + +// AddListener Add listener +func (n *nacosDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) { + n.addListener(key, listener) +} + +// RemoveListener Remove listener +func (n *nacosDynamicConfiguration) RemoveListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) { + n.removeListener(key, listener) +} + +//nacos distinguishes configuration files based on group and dataId. defalut group = "dubbo" and dataId = key +func (n *nacosDynamicConfiguration) GetProperties(key string, opts ...config_center.Option) (string, error) { + return n.GetRule(key, opts...) +} + +// GetInternalProperty Get properties value by key +func (n *nacosDynamicConfiguration) GetInternalProperty(key string, opts ...config_center.Option) (string, error) { + return n.GetProperties(key, opts...) +} + +// GetRule Get router rule +func (n *nacosDynamicConfiguration) GetRule(key string, opts ...config_center.Option) (string, error) { + tmpOpts := &config_center.Options{} + for _, opt := range opts { + opt(tmpOpts) + } + content, err := (*n.client.Client()).GetConfig(vo.ConfigParam{ + DataId: key, + Group: tmpOpts.Group, + }) + if err != nil { + return "", perrors.WithStack(err) + } else { + return string(content), nil + } +} + +// Parser Get Parser +func (n *nacosDynamicConfiguration) Parser() parser.ConfigurationParser { + return n.parser +} + +// SetParser Set Parser +func (n *nacosDynamicConfiguration) SetParser(p parser.ConfigurationParser) { + n.parser = p +} + +// NacosClient Get Nacos Client +func (n *nacosDynamicConfiguration) NacosClient() *NacosClient { + return n.client +} + +// SetNacosClient Set Nacos Client +func (n *nacosDynamicConfiguration) SetNacosClient(client *NacosClient) { + n.cltLock.Lock() + n.client = client + n.cltLock.Unlock() +} + +// WaitGroup for wait group control, zk client listener & zk client container +func (n *nacosDynamicConfiguration) WaitGroup() *sync.WaitGroup { + return &n.wg +} + +// GetDone For nacos client control RestartCallBack() bool +func (n *nacosDynamicConfiguration) GetDone() chan struct{} { + return n.done +} + +// GetUrl Get Url +func (n *nacosDynamicConfiguration) GetUrl() common.URL { + return *n.url +} + +// Destroy Destroy configuration instance +func (n *nacosDynamicConfiguration) Destroy() { + close(n.done) + n.wg.Wait() + n.closeConfigs() +} + +// IsAvailable Get available status +func (n *nacosDynamicConfiguration) IsAvailable() bool { + select { + case <-n.done: + return false + default: + return true + } +} + +func (r *nacosDynamicConfiguration) closeConfigs() { + r.cltLock.Lock() + client := r.client + r.client = nil + r.cltLock.Unlock() + // Close the old client first to close the tmp node + client.Close() + logger.Infof("begin to close provider nacos client") +} diff --git a/config_center/nacos/impl_test.go b/config_center/nacos/impl_test.go new file mode 100644 index 0000000000..07bf8638d2 --- /dev/null +++ b/config_center/nacos/impl_test.go @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nacos + +import ( + "fmt" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/config_center/parser" +) + +// run mock config server +func runMockConfigServer(configHandler func(http.ResponseWriter, *http.Request), + configListenHandler func(http.ResponseWriter, *http.Request)) *httptest.Server { + uriHandlerMap := make(map[string]func(http.ResponseWriter, *http.Request), 0) + + uriHandlerMap["/nacos/v1/cs/configs"] = configHandler + uriHandlerMap["/nacos/v1/cs/configs/listener"] = configListenHandler + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + uri := r.RequestURI + for path, handler := range uriHandlerMap { + if uri == path { + handler(w, r) + break + } + } + })) + + return ts +} + +func mockCommonNacosServer() *httptest.Server { + return runMockConfigServer(func(writer http.ResponseWriter, request *http.Request) { + data := ` + dubbo.service.com.ikurento.user.UserProvider.cluster=failback + dubbo.service.com.ikurento.user.UserProvider.protocol=myDubbo1 + dubbo.protocols.myDubbo.port=20000 + dubbo.protocols.myDubbo.name=dubbo +` + fmt.Fprintf(writer, "%s", data) + }, func(writer http.ResponseWriter, request *http.Request) { + data := `dubbo.properties%02dubbo%02dubbo.service.com.ikurento.user.UserProvider.cluster=failback` + fmt.Fprintf(writer, "%s", data) + }) +} + +func initNacosData(t *testing.T) (*nacosDynamicConfiguration, error) { + server := mockCommonNacosServer() + nacosURL := strings.ReplaceAll(server.URL, "http", "registry") + regurl, _ := common.NewURL(nacosURL) + nacosConfiguration, err := newNacosDynamicConfiguration(®url) + assert.NoError(t, err) + + nacosConfiguration.SetParser(&parser.DefaultConfigurationParser{}) + + return nacosConfiguration, err +} + +func Test_GetConfig(t *testing.T) { + nacos, err := initNacosData(t) + assert.NoError(t, err) + configs, err := nacos.GetProperties("dubbo.properties", config_center.WithGroup("dubbo")) + _, err = nacos.Parser().Parse(configs) + assert.NoError(t, err) +} + +func Test_AddListener(t *testing.T) { + nacos, err := initNacosData(t) + assert.NoError(t, err) + listener := &mockDataListener{} + time.Sleep(time.Second * 2) + nacos.AddListener("dubbo.properties", listener) + listener.wg.Add(1) + listener.wg.Wait() +} + +func Test_RemoveListener(t *testing.T) { + //TODO not supported in current go_nacos_sdk version +} + +type mockDataListener struct { + wg sync.WaitGroup + event string +} + +func (l *mockDataListener) Process(configType *config_center.ConfigChangeEvent) { + l.wg.Done() + l.event = configType.Key +} diff --git a/config_center/nacos/listener.go b/config_center/nacos/listener.go new file mode 100644 index 0000000000..25c586586c --- /dev/null +++ b/config_center/nacos/listener.go @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nacos + +import ( + "context" +) + +import ( + "github.com/nacos-group/nacos-sdk-go/vo" +) + +import ( + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/remoting" +) + +func callback(listener config_center.ConfigurationListener, namespace, group, dataId, data string) { + listener.Process(&config_center.ConfigChangeEvent{Key: dataId, Value: data, ConfigType: remoting.EventTypeUpdate}) +} + +func (l *nacosDynamicConfiguration) addListener(key string, listener config_center.ConfigurationListener) { + _, loaded := l.keyListeners.Load(key) + if !loaded { + _, cancel := context.WithCancel(context.Background()) + err := (*l.client.Client()).ListenConfig(vo.ConfigParam{ + DataId: key, + Group: "dubbo", + OnChange: func(namespace, group, dataId, data string) { + go callback(listener, namespace, group, dataId, data) + }, + }) + logger.Errorf("nacos : listen config fail, error:%v ", err) + newListener := make(map[config_center.ConfigurationListener]context.CancelFunc) + newListener[listener] = cancel + l.keyListeners.Store(key, newListener) + } else { + // TODO check goroutine alive, but this version of go_nacos_sdk is not support. + logger.Infof("profile:%s. this profile is already listening", key) + } +} + +func (l *nacosDynamicConfiguration) removeListener(key string, listener config_center.ConfigurationListener) { + // TODO: not supported in current go_nacos_sdk version + logger.Warn("not supported in current go_nacos_sdk version") +}