Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ftr : nacos config #357

Merged
merged 15 commits into from
Feb 13, 2020
218 changes: 218 additions & 0 deletions config_center/nacos/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
package nacos

import (
"strconv"
"strings"
"sync"
"time"
)

import (
"github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/clients/config_client"
nacosconst "github.com/nacos-group/nacos-sdk-go/common/constant"
perrors "github.com/pkg/errors"
)

import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
)

const logDir = "logs/nacos/log"

// NacosClient Nacos client
type NacosClient struct {
name string
NacosAddrs []string
sync.Mutex // for Client
client *config_client.IConfigClient
exit chan struct{}
Timeout time.Duration
once sync.Once
onceClose func()
}

// Client Get Client
func (n *NacosClient) Client() *config_client.IConfigClient {
return n.client
}

// SetClient Set client
func (n *NacosClient) SetClient(client *config_client.IConfigClient) {
n.Lock()
n.client = client
n.Unlock()
}

type option func(*options)

type options struct {
nacosName string
client *NacosClient
}

// WithNacosName Set nacos name
func WithNacosName(name string) option {
return func(opt *options) {
opt.nacosName = name
}
}

// ValidateNacosClient Validate nacos client , if null then create it
func ValidateNacosClient(container nacosClientFacade, opts ...option) error {
zouyx marked this conversation as resolved.
Show resolved Hide resolved
var (
err error
)
os := &options{}
for _, opt := range opts {
opt(os)
}

err = nil

url := container.GetUrl()

if container.NacosClient() == nil {
//in dubbo ,every registry only connect one node ,so this is []string{r.Address}
timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
if err != nil {
logger.Errorf("timeout config %v is invalid ,err is %v",
url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error())
return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location)
}
nacosAddresses := strings.Split(url.Location, ",")
newClient, err := newNacosClient(os.nacosName, nacosAddresses, timeout)
if err != nil {
logger.Warnf("newNacosClient(name{%s}, nacos address{%v}, timeout{%d}) = error{%v}",
os.nacosName, url.Location, timeout.String(), err)
return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location)
}
container.SetNacosClient(newClient)
}

if container.NacosClient().Client() == nil {
svrConfList := []nacosconst.ServerConfig{}
for _, nacosAddr := range container.NacosClient().NacosAddrs {
split := strings.Split(nacosAddr, ":")
port, err := strconv.ParseUint(split[1], 10, 64)
if err != nil {
logger.Warnf("nacos addr port parse error ,error message is %v", err)
continue
}
svrconf := nacosconst.ServerConfig{
IpAddr: split[0],
Port: port,
}
svrConfList = append(svrConfList, svrconf)
}

client, err := clients.CreateConfigClient(map[string]interface{}{
"serverConfigs": svrConfList,
"clientConfig": nacosconst.ClientConfig{
TimeoutMs: uint64(int32(container.NacosClient().Timeout / time.Millisecond)),
ListenInterval: 10000,
NotLoadCacheAtStart: true,
LogDir: logDir,
},
})

container.NacosClient().SetClient(&client)
if err != nil {
logger.Errorf("nacos create config client error:%v", err)
}
}

return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.PrimitiveURL)
}

func newNacosClient(name string, nacosAddrs []string, timeout time.Duration) (*NacosClient, error) {
var (
err error
n *NacosClient
)

n = &NacosClient{
name: name,
NacosAddrs: nacosAddrs,
Timeout: timeout,
exit: make(chan struct{}),
onceClose: func() {
close(n.exit)
},
}

svrConfList := []nacosconst.ServerConfig{}
for _, nacosAddr := range n.NacosAddrs {
split := strings.Split(nacosAddr, ":")
port, err := strconv.ParseUint(split[1], 10, 64)
if err != nil {
continue
zouyx marked this conversation as resolved.
Show resolved Hide resolved
}
svrconf := nacosconst.ServerConfig{
IpAddr: split[0],
Port: port,
}
svrConfList = append(svrConfList, svrconf)
}
client, err := clients.CreateConfigClient(map[string]interface{}{
"serverConfigs": svrConfList,
"clientConfig": nacosconst.ClientConfig{
TimeoutMs: uint64(timeout / time.Millisecond),
ListenInterval: 20000,
NotLoadCacheAtStart: true,
LogDir: logDir,
},
})
n.SetClient(&client)
if err != nil {
return nil, perrors.WithMessagef(err, "nacos clients.CreateConfigClient(nacosAddrs:%+v)", nacosAddrs)
}

return n, nil
}

// Done Get nacos client exit signal
func (n *NacosClient) Done() <-chan struct{} {
return n.exit
}

func (n *NacosClient) stop() bool {
select {
case <-n.exit:
return true
default:
n.once.Do(n.onceClose)
}

return false
}

// NacosClientValid Get nacos client valid status
func (n *NacosClient) NacosClientValid() bool {
select {
case <-n.exit:
return false
default:
}

valid := true
n.Lock()
if n.Client() == nil {
valid = false
}
n.Unlock()

return valid
}

// Close Close nacos client , then set null
func (n *NacosClient) Close() {
if n == nil {
return
}

n.stop()
n.SetClient(nil)
logger.Warnf("nacosClient{name:%s, nacos addr:%s} exit now.", n.name, n.NacosAddrs)
}
36 changes: 36 additions & 0 deletions config_center/nacos/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package nacos

import (
"fmt"
"strings"
"testing"
)

import (
"github.com/stretchr/testify/assert"
)

import (
"github.com/apache/dubbo-go/common"
)

func Test_newNacosClient(t *testing.T) {
server := mockCommonNacosServer()
nacosURL := strings.ReplaceAll(server.URL, "http", "registry")
registryUrl, _ := common.NewURL(nacosURL)
c := &nacosDynamicConfiguration{
url: &registryUrl,
done: make(chan struct{}),
}
err := ValidateNacosClient(c, WithNacosName(nacosClientName))
if err != nil {
fmt.Println("nacos client start error ,error message is", err)
zouyx marked this conversation as resolved.
Show resolved Hide resolved
}
assert.NoError(t, err)
c.wg.Add(1)
go HandleClientRestart(c)
c.client.Close()
<-c.client.Done()
fmt.Println("nacos client close done")
zouyx marked this conversation as resolved.
Show resolved Hide resolved
c.Destroy()
}
97 changes: 97 additions & 0 deletions config_center/nacos/facade.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nacos

import (
"sync"
"time"
)
import (
"github.com/dubbogo/getty"
perrors "github.com/pkg/errors"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
)

const (
connDelay = 3
maxFailTimes = 15
)

type nacosClientFacade interface {
NacosClient() *NacosClient
SetNacosClient(*NacosClient)
// WaitGroup for wait group control, zk client listener & zk client container
WaitGroup() *sync.WaitGroup
// GetDone For nacos client control RestartCallBack() bool
GetDone() chan struct{}
common.Node
}

func timeSecondDuration(sec int) time.Duration {
return time.Duration(sec) * time.Second
}

// HandleClientRestart Restart client handler
func HandleClientRestart(r nacosClientFacade) {
var (
err error

failTimes int
)

defer r.WaitGroup().Done()
LOOP:
for {
select {
case <-r.GetDone():
logger.Warnf("(NacosProviderRegistry)reconnectNacosRegistry goroutine exit now...")
break LOOP
// re-register all services
case <-r.NacosClient().Done():
r.NacosClient().Close()
nacosName := r.NacosClient().name
nacosAddress := r.NacosClient().NacosAddrs
r.SetNacosClient(nil)

// Connect nacos until success.
failTimes = 0
for {
select {
case <-r.GetDone():
logger.Warnf("(NacosProviderRegistry)reconnectZkRegistry goroutine exit now...")
break LOOP
case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * connDelay)): // Prevent crazy reconnection nacos.
}
err = ValidateNacosClient(r, WithNacosName(nacosName))
logger.Infof("NacosProviderRegistry.validateNacosClient(nacosAddr{%s}) = error{%#v}",
nacosAddress, perrors.WithStack(err))
if err == nil {
break
}
failTimes++
if maxFailTimes <= failTimes {
failTimes = maxFailTimes
}
}
}
}
}
43 changes: 43 additions & 0 deletions config_center/nacos/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nacos

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/config_center/parser"
)

func init() {
extension.SetConfigCenterFactory("nacos", func() config_center.DynamicConfigurationFactory { return &nacosDynamicConfigurationFactory{} })
zouyx marked this conversation as resolved.
Show resolved Hide resolved
}

type nacosDynamicConfigurationFactory struct {
}

// GetDynamicConfiguration Get Configuration with URL
func (f *nacosDynamicConfigurationFactory) GetDynamicConfiguration(url *common.URL) (config_center.DynamicConfiguration, error) {
dynamicConfiguration, err := newNacosDynamicConfiguration(url)
if err != nil {
return nil, err
}
dynamicConfiguration.SetParser(&parser.DefaultConfigurationParser{})
return dynamicConfiguration, err

}
Loading