Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ftr: etcdv3 for registry #148

Merged
merged 39 commits into from
Aug 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
7a34fb2
ETCD tmp complete
Jul 19, 2019
549f4ae
merge && add etcdv3 remote pkg
Jul 23, 2019
c06c8a2
etcdv3 basic complete
Jul 24, 2019
14b198d
ADD etcdv3 basic complete
Jul 24, 2019
07c1d31
reset default config
Jul 24, 2019
61e5ec5
Fix pkg name
Jul 24, 2019
27809f3
Basic function comleted, wait for refactor
Jul 28, 2019
7defc21
Refactor remote registry
Jul 30, 2019
13acf67
ADD etcdv3 registry for dubbo
Jul 31, 2019
898fce9
ADD RACE for etcd test
Jul 31, 2019
0cdbd1d
ADD star&&stop etcd for ut
Jul 31, 2019
b40d56d
ADD etcdctl for ut
Jul 31, 2019
8fa6f5b
Fix etcd work-dir
Jul 31, 2019
d071e25
Fix etcd start && stop
Jul 31, 2019
09a4d07
FMT registry/etcdv3 remoting/v3
Jul 31, 2019
cb529f6
Fix etcd start cmd
Jul 31, 2019
8178ee4
wait for etcd start
Jul 31, 2019
60a490b
wait for etcd start
Jul 31, 2019
13e218d
ADD TestMain for ut
Jul 31, 2019
0e48540
Fix UT framework
Aug 1, 2019
bb4c0d1
Finish etcd registry
Aug 1, 2019
cabd9fc
Fix etcd remoting etcd endpionts
Aug 1, 2019
4576e92
ADD remoting/etcdv3 && registry/etcdv3
Aug 1, 2019
f6e6268
fmt project
Aug 3, 2019
4d3233c
Update go.sum to Fix github.com/coreos/[email protected]+incompatible: che…
Aug 3, 2019
97d0977
Fix etcd dep
Aug 3, 2019
a836ecf
Fix etcd dependency
Aug 3, 2019
f69675e
Fix etcd dependency
Aug 3, 2019
ca5a700
Fix depend with go.etcd.io
Aug 3, 2019
8444dc4
Fix GOPROXY cause dependency mistake
Aug 3, 2019
e574713
Fix export GOPROXY= && retidy
invalid-email-address Aug 3, 2019
21f7c5f
DEL go1.11 support
Aug 3, 2019
dff5a0a
Fix fmt and remove use juju/errors
Aug 10, 2019
86a197a
Fix comflict in go.mod && fmt project
Aug 10, 2019
b0f3e19
Fix deadlock && len(string) ==0 && rename errors-> perrors
Aug 10, 2019
e9a168b
Fix etcd event log format
invalid-email-address Aug 12, 2019
069d5fe
Fix rename facede_test.go facade_test.go
invalid-email-address Aug 12, 2019
1e91d2a
Fix go.sum conflict with develop branch
Aug 12, 2019
48e3abf
Fix merge from local branch
Aug 12, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,44 @@ require (
github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e // indirect
github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 // indirect
github.com/coreos/bbolt v1.3.3 // indirect
github.com/coreos/etcd v3.3.13+incompatible
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/dubbogo/getty v1.2.2
github.com/dubbogo/gost v1.1.1
github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect
github.com/go-errors/errors v1.0.1 // indirect
github.com/gogo/protobuf v1.2.1 // indirect
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
github.com/golang/mock v1.3.1
github.com/google/btree v1.0.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.9.5 // indirect
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 // indirect
github.com/jonboulle/clockwork v0.1.0 // indirect
github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570 // indirect
github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f // indirect
github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 // indirect
github.com/magiconair/properties v1.8.1
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v1.1.0 // indirect
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/stretchr/testify v1.3.0
github.com/tebeka/strftime v0.1.3 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
go.etcd.io/bbolt v1.3.3 // indirect
go.etcd.io/etcd v3.3.13+incompatible
go.uber.org/atomic v1.4.0
go.uber.org/zap v1.10.0
google.golang.org/grpc v1.22.1
gopkg.in/yaml.v2 v2.2.2
)
139 changes: 133 additions & 6 deletions go.sum

Large diffs are not rendered by default.

88 changes: 88 additions & 0 deletions registry/etcdv3/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package etcdv3

import (
"context"
"strings"
)

import (
perrors "github.com/pkg/errors"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting"
)

type dataListener struct {
interestedURL []*common.URL
listener remoting.ConfigurationListener
}

func NewRegistryDataListener(listener remoting.ConfigurationListener) *dataListener {
return &dataListener{listener: listener, interestedURL: []*common.URL{}}
}

func (l *dataListener) AddInterestedURL(url *common.URL) {
l.interestedURL = append(l.interestedURL, url)
}

func (l *dataListener) DataChange(eventType remoting.Event) bool {

url := eventType.Path[strings.Index(eventType.Path, "/providers/")+len("/providers/"):]
serviceURL, err := common.NewURL(context.Background(), url)
if err != nil {
logger.Warnf("Listen NewURL(r{%s}) = error{%v}", eventType.Path, err)
return false
}

for _, v := range l.interestedURL {
if serviceURL.URLEqual(*v) {
l.listener.Process(&remoting.ConfigChangeEvent{Key: eventType.Path, Value: serviceURL, ConfigType: eventType.Action})
return true
}
}

return false
}

type configurationListener struct {
registry *etcdV3Registry
events chan *remoting.ConfigChangeEvent
}

func NewConfigurationListener(reg *etcdV3Registry) *configurationListener {
// add a new waiter
reg.wg.Add(1)
return &configurationListener{registry: reg, events: make(chan *remoting.ConfigChangeEvent, 32)}
}
func (l *configurationListener) Process(configType *remoting.ConfigChangeEvent) {
l.events <- configType
}

func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
for {
select {
case <-l.registry.done:
logger.Warnf("listener's etcd client connection is broken, so etcd event listener exit now.")
return nil, perrors.New("listener stopped")

case e := <-l.events:
logger.Infof("got etcd event %#v", e)
if e.ConfigType == remoting.EventTypeDel {
select {
case <-l.registry.done:
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
default:
}
continue
}
return &registry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(common.URL)}, nil
}
}
}
func (l *configurationListener) Close() {
l.registry.wg.Done()
}
71 changes: 71 additions & 0 deletions registry/etcdv3/listener_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package etcdv3

import (
"context"
"testing"
"time"
)

import (
"github.com/dubbogo/getty"
"github.com/stretchr/testify/suite"
"go.etcd.io/etcd/embed"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/remoting"
)

type RegistryTestSuite struct {
suite.Suite
etcd *embed.Etcd
}

// start etcd server
func (suite *RegistryTestSuite) SetupSuite() {

t := suite.T()

cfg := embed.NewConfig()
cfg.Dir = "/tmp/default.etcd"
e, err := embed.StartEtcd(cfg)
if err != nil {
t.Fatal(err)
}
select {
case <-e.Server.ReadyNotify():
t.Log("Server is ready!")
case <-getty.GetTimeWheel().After(60 * time.Second):
e.Server.Stop() // trigger a shutdown
t.Logf("Server took too long to start!")
}

suite.etcd = e
return
}

// stop etcd server
func (suite *RegistryTestSuite) TearDownSuite() {
suite.etcd.Close()
}

func (suite *RegistryTestSuite) TestDataChange() {

t := suite.T()

listener := NewRegistryDataListener(&MockDataListener{})
url, _ := common.NewURL(context.Background(), "jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100")
listener.AddInterestedURL(&url)
if !listener.DataChange(remoting.Event{Path: "/dubbo/com.ikurento.user.UserProvider/providers/jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100"}) {
t.Fatal("data change not ok")
}
}

func TestRegistrySuite(t *testing.T) {
suite.Run(t, &RegistryTestSuite{})
}

type MockDataListener struct{}

sxllwx marked this conversation as resolved.
Show resolved Hide resolved
func (*MockDataListener) Process(configType *remoting.ConfigChangeEvent) {}
Loading