Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mod: modidfy Refer params and add licence #317

Merged
merged 2 commits into from
Jan 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions config/reference_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ func (refconfig *ReferenceConfig) UnmarshalYAML(unmarshal func(interface{}) erro
}

func (refconfig *ReferenceConfig) Refer(impl interface{}) {
url := common.NewURLWithOptions(common.WithPath(refconfig.id), common.WithProtocol(refconfig.Protocol), common.WithParams(refconfig.getUrlMap()))
url := common.NewURLWithOptions(common.WithPath(refconfig.id),
common.WithProtocol(refconfig.Protocol),
common.WithParams(refconfig.getUrlMap()),
common.WithParamsValue(constant.BEAN_NAME_KEY, refconfig.id),
)

//1. user specified URL, could be peer-to-peer address, or register center's address.
if refconfig.Url != "" {
Expand Down Expand Up @@ -123,12 +127,12 @@ func (refconfig *ReferenceConfig) Refer(impl interface{}) {
}
}
if len(refconfig.urls) == 1 {
refconfig.invoker = extension.GetProtocol(refconfig.urls[0].Protocol).Refer(*refconfig.urls[0], impl)
refconfig.invoker = extension.GetProtocol(refconfig.urls[0].Protocol).Refer(*refconfig.urls[0])
} else {
invokers := []protocol.Invoker{}
var regUrl *common.URL
for _, u := range refconfig.urls {
invokers = append(invokers, extension.GetProtocol(u.Protocol).Refer(*u, impl))
invokers = append(invokers, extension.GetProtocol(u.Protocol).Refer(*u))
if u.Protocol == constant.REGISTRY_PROTOCOL {
regUrl = u
}
Expand Down
2 changes: 1 addition & 1 deletion config/reference_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func newRegistryProtocol() protocol.Protocol {

type mockRegistryProtocol struct{}

func (*mockRegistryProtocol) Refer(url common.URL, impl interface{}) protocol.Invoker {
func (*mockRegistryProtocol) Refer(url common.URL) protocol.Invoker {
return protocol.NewBaseInvoker(url)
}

Expand Down
2 changes: 1 addition & 1 deletion protocol/dubbo/dubbo_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
return exporter
}

func (dp *DubboProtocol) Refer(url common.URL, impl interface{}) protocol.Invoker {
func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker {
//default requestTimeout
var requestTimeout = config.GetConsumerConfig().RequestTimeout

Expand Down
2 changes: 1 addition & 1 deletion protocol/dubbo/dubbo_protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestDubboProtocol_Refer(t *testing.T) {
"side=provider&timeout=3000&timestamp=1556509797245")
assert.NoError(t, err)
clientConf = &ClientConfig{}
invoker := proto.Refer(url, nil)
invoker := proto.Refer(url)

// make sure url
eq := invoker.GetUrl().URLEqual(url)
Expand Down
6 changes: 5 additions & 1 deletion protocol/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,23 @@ import (

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/config"
)

type Client struct {
*grpc.ClientConn
invoker reflect.Value
}

func NewClient(impl interface{}, url common.URL) *Client {
func NewClient(url common.URL) *Client {
conn, err := grpc.Dial(url.Location, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
panic(err)
}

key := url.GetParam(constant.BEAN_NAME_KEY, "")
impl := config.GetConsumerService(key)
invoker := getInvoker(impl, conn)

return &Client{
Expand Down
3 changes: 1 addition & 2 deletions protocol/grpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@ func TestNewClient(t *testing.T) {
go internal.InitGrpcServer()
defer internal.ShutdownGrpcServer()

var impl *internal.GrpcGreeterImpl
url, err := common.NewURL(context.Background(), "grpc://127.0.0.1:30000/GrpcGreeterImpl?accesslog=&anyhost=true&app.version=0.0.1&application=BDTService&async=false&bean.name=GrpcGreeterImpl&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&execute.limit=&execute.limit.rejected.handler=&generic=false&group=&interface=io.grpc.examples.helloworld.GreeterGrpc%24IGreeter&ip=192.168.1.106&loadbalance=random&methods.SayHello.loadbalance=random&methods.SayHello.retries=1&methods.SayHello.tps.limit.interval=&methods.SayHello.tps.limit.rate=&methods.SayHello.tps.limit.strategy=&methods.SayHello.weight=0&module=dubbogo+say-hello+client&name=BDTService&organization=ikurento.com&owner=ZX&pid=49427&reference.filter=cshutdown&registry.role=3&remote.timestamp=1576923717&retries=&service.filter=echo%2Ctoken%2Caccesslog%2Ctps%2Cexecute%2Cpshutdown&side=provider&timestamp=1576923740&tps.limit.interval=&tps.limit.rate=&tps.limit.rejected.handler=&tps.limit.strategy=&tps.limiter=&version=&warmup=100!")
assert.Nil(t, err)
cli := NewClient(impl, url)
cli := NewClient(url)
assert.NotNil(t, cli)
}
3 changes: 1 addition & 2 deletions protocol/grpc/grpc_invoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,10 @@ func TestInvoke(t *testing.T) {
go internal.InitGrpcServer()
defer internal.ShutdownGrpcServer()

var impl *internal.GrpcGreeterImpl
url, err := common.NewURL(context.Background(), "grpc://127.0.0.1:30000/GrpcGreeterImpl?accesslog=&anyhost=true&app.version=0.0.1&application=BDTService&async=false&bean.name=GrpcGreeterImpl&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&execute.limit=&execute.limit.rejected.handler=&generic=false&group=&interface=io.grpc.examples.helloworld.GreeterGrpc%24IGreeter&ip=192.168.1.106&loadbalance=random&methods.SayHello.loadbalance=random&methods.SayHello.retries=1&methods.SayHello.tps.limit.interval=&methods.SayHello.tps.limit.rate=&methods.SayHello.tps.limit.strategy=&methods.SayHello.weight=0&module=dubbogo+say-hello+client&name=BDTService&organization=ikurento.com&owner=ZX&pid=49427&reference.filter=cshutdown&registry.role=3&remote.timestamp=1576923717&retries=&service.filter=echo%2Ctoken%2Caccesslog%2Ctps%2Cexecute%2Cpshutdown&side=provider&timestamp=1576923740&tps.limit.interval=&tps.limit.rate=&tps.limit.rejected.handler=&tps.limit.strategy=&tps.limiter=&version=&warmup=100!")
assert.Nil(t, err)

cli := NewClient(impl, url)
cli := NewClient(url)

invoker := NewGrpcInvoker(url, cli)

Expand Down
4 changes: 2 additions & 2 deletions protocol/grpc/grpc_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ func (gp *GrpcProtocol) openServer(url common.URL) {
}
}

func (gp *GrpcProtocol) Refer(url common.URL, impl interface{}) protocol.Invoker {
invoker := NewGrpcInvoker(url, NewClient(impl, url))
func (gp *GrpcProtocol) Refer(url common.URL) protocol.Invoker {
invoker := NewGrpcInvoker(url, NewClient(url))
gp.SetInvokers(invoker)
logger.Infof("Refer service: %s", url.String())
return invoker
Expand Down
3 changes: 1 addition & 2 deletions protocol/grpc/grpc_protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ func TestGrpcProtocol_Refer(t *testing.T) {
proto := GetProtocol()
url, err := common.NewURL(context.Background(), "grpc://127.0.0.1:30000/GrpcGreeterImpl?accesslog=&anyhost=true&app.version=0.0.1&application=BDTService&async=false&bean.name=GrpcGreeterImpl&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&execute.limit=&execute.limit.rejected.handler=&generic=false&group=&interface=io.grpc.examples.helloworld.GreeterGrpc%24IGreeter&ip=192.168.1.106&loadbalance=random&methods.SayHello.loadbalance=random&methods.SayHello.retries=1&methods.SayHello.tps.limit.interval=&methods.SayHello.tps.limit.rate=&methods.SayHello.tps.limit.strategy=&methods.SayHello.weight=0&module=dubbogo+say-hello+client&name=BDTService&organization=ikurento.com&owner=ZX&pid=49427&reference.filter=cshutdown&registry.role=3&remote.timestamp=1576923717&retries=&service.filter=echo%2Ctoken%2Caccesslog%2Ctps%2Cexecute%2Cpshutdown&side=provider&timestamp=1576923740&tps.limit.interval=&tps.limit.rate=&tps.limit.rejected.handler=&tps.limit.strategy=&tps.limiter=&version=&warmup=100!")
assert.NoError(t, err)
var impl *internal.GrpcGreeterImpl
invoker := proto.Refer(url, impl)
invoker := proto.Refer(url)

// make sure url
eq := invoker.GetUrl().URLEqual(url)
Expand Down
8 changes: 8 additions & 0 deletions protocol/grpc/internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ import (
"google.golang.org/grpc"
)

import (
"github.com/apache/dubbo-go/config"
)

func init() {
config.SetConsumerService(&GrpcGreeterImpl{})
}

// used for dubbo-grpc biz client
type GrpcGreeterImpl struct {
SayHello func(ctx context.Context, in *HelloRequest, out *HelloReply) error
Expand Down
17 changes: 17 additions & 0 deletions protocol/grpc/internal/helloworld.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions protocol/grpc/protoc-gen-dubbo/examples/Makefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

grpc-gen:
protoc -I ./ helloworld.proto --go_out=plugins=grpc:.
dubbo-gen:
Expand Down
17 changes: 17 additions & 0 deletions protocol/grpc/protoc-gen-dubbo/examples/helloworld.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion protocol/jsonrpc/jsonrpc_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (jp *JsonrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
return exporter
}

func (jp *JsonrpcProtocol) Refer(url common.URL, impl interface{}) protocol.Invoker {
func (jp *JsonrpcProtocol) Refer(url common.URL) protocol.Invoker {
//default requestTimeout
var requestTimeout = config.GetConsumerConfig().RequestTimeout

Expand Down
2 changes: 1 addition & 1 deletion protocol/jsonrpc/jsonrpc_protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestJsonrpcProtocol_Refer(t *testing.T) {
RequestTimeout: 5 * time.Second,
}
config.SetConsumerConfig(con)
invoker := proto.Refer(url, nil)
invoker := proto.Refer(url)

// make sure url
eq := invoker.GetUrl().URLEqual(url)
Expand Down
4 changes: 2 additions & 2 deletions protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
// Extension - protocol
type Protocol interface {
Export(invoker Invoker) Exporter
Refer(url common.URL, impl interface{}) Invoker
Refer(url common.URL) Invoker
Destroy()
}

Expand Down Expand Up @@ -74,7 +74,7 @@ func (bp *BaseProtocol) Export(invoker Invoker) Exporter {
return NewBaseExporter("base", invoker, bp.exporterMap)
}

func (bp *BaseProtocol) Refer(url common.URL, impl interface{}) Invoker {
func (bp *BaseProtocol) Refer(url common.URL) Invoker {
return NewBaseInvoker(url)
}

Expand Down
2 changes: 1 addition & 1 deletion protocol/protocolwrapper/mock_protocol_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (pfw *mockProtocolFilter) Export(invoker protocol.Invoker) protocol.Exporte
return protocol.NewBaseExporter("key", invoker, &sync.Map{})
}

func (pfw *mockProtocolFilter) Refer(url common.URL, impl interface{}) protocol.Invoker {
func (pfw *mockProtocolFilter) Refer(url common.URL) protocol.Invoker {
return protocol.NewBaseInvoker(url)
}

Expand Down
4 changes: 2 additions & 2 deletions protocol/protocolwrapper/protocol_filter_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ func (pfw *ProtocolFilterWrapper) Export(invoker protocol.Invoker) protocol.Expo
return pfw.protocol.Export(invoker)
}

func (pfw *ProtocolFilterWrapper) Refer(url common.URL, impl interface{}) protocol.Invoker {
func (pfw *ProtocolFilterWrapper) Refer(url common.URL) protocol.Invoker {
if pfw.protocol == nil {
pfw.protocol = extension.GetProtocol(url.Protocol)
}
return buildInvokerChain(pfw.protocol.Refer(url, impl), constant.REFERENCE_FILTER_KEY)
return buildInvokerChain(pfw.protocol.Refer(url), constant.REFERENCE_FILTER_KEY)
}

func (pfw *ProtocolFilterWrapper) Destroy() {
Expand Down
2 changes: 1 addition & 1 deletion protocol/protocolwrapper/protocol_filter_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestProtocolFilterWrapper_Refer(t *testing.T) {
u := common.NewURLWithOptions(
common.WithParams(url.Values{}),
common.WithParamsValue(constant.REFERENCE_FILTER_KEY, "echo"))
invoker := filtProto.Refer(*u, nil)
invoker := filtProto.Refer(*u)
_, ok := invoker.(*FilterInvoker)
assert.True(t, ok)
}
Expand Down
8 changes: 3 additions & 5 deletions registry/directory/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,10 @@ type registryDirectory struct {
configurators []config_center.Configurator
consumerConfigurationListener *consumerConfigurationListener
referenceConfigurationListener *referenceConfigurationListener
impl interface{}
Options
}

func NewRegistryDirectory(url *common.URL, impl interface{}, registry registry.Registry, opts ...Option) (*registryDirectory, error) {
func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...Option) (*registryDirectory, error) {
options := Options{
//default 300s
serviceTTL: time.Duration(300e9),
Expand All @@ -80,7 +79,6 @@ func NewRegistryDirectory(url *common.URL, impl interface{}, registry registry.R
serviceType: url.SubURL.Service(),
registry: registry,
Options: options,
impl: impl,
}
dir.consumerConfigurationListener = newConsumerConfigurationListener(dir)
return dir, nil
Expand Down Expand Up @@ -200,13 +198,13 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) {
dir.overrideUrl(newUrl)
if cacheInvoker, ok := dir.cacheInvokersMap.Load(newUrl.Key()); !ok {
logger.Infof("service will be added in cache invokers: invokers url is %s!", newUrl)
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl, dir.impl)
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
}
} else {
logger.Infof("service will be updated in cache invokers: new invoker url is %s, old invoker url is %s", newUrl, cacheInvoker.(protocol.Invoker).GetUrl())
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl, dir.impl)
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
cacheInvoker.(protocol.Invoker).Destroy()
Expand Down
6 changes: 3 additions & 3 deletions registry/directory/directory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestSubscribe(t *testing.T) {
func TestSubscribe_InvalidUrl(t *testing.T) {
url, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:1111")
mockRegistry, _ := registry.NewMockRegistry(&common.URL{})
_, err := NewRegistryDirectory(&url, nil, mockRegistry)
_, err := NewRegistryDirectory(&url, mockRegistry)
assert.Error(t, err)
}

Expand All @@ -77,7 +77,7 @@ func TestSubscribe_Group(t *testing.T) {
suburl.SetParam(constant.CLUSTER_KEY, "mock")
regurl.SubURL = &suburl
mockRegistry, _ := registry.NewMockRegistry(&common.URL{})
registryDirectory, _ := NewRegistryDirectory(&regurl, nil, mockRegistry)
registryDirectory, _ := NewRegistryDirectory(&regurl, mockRegistry)

go registryDirectory.Subscribe(common.NewURLWithOptions(common.WithPath("testservice")))

Expand Down Expand Up @@ -183,7 +183,7 @@ func normalRegistryDir(noMockEvent ...bool) (*registryDirectory, *registry.MockR
)
url.SubURL = &suburl
mockRegistry, _ := registry.NewMockRegistry(&common.URL{})
registryDirectory, _ := NewRegistryDirectory(&url, nil, mockRegistry)
registryDirectory, _ := NewRegistryDirectory(&url, mockRegistry)

go registryDirectory.Subscribe(&suburl)
if len(noMockEvent) == 0 {
Expand Down
4 changes: 2 additions & 2 deletions registry/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (proto *registryProtocol) initConfigurationListeners() {
proto.serviceConfigurationListeners = &sync.Map{}
proto.providerConfigurationListener = newProviderConfigurationListener(proto.overrideListeners)
}
func (proto *registryProtocol) Refer(url common.URL, impl interface{}) protocol.Invoker {
func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {

var registryUrl = url
var serviceUrl = registryUrl.SubURL
Expand All @@ -108,7 +108,7 @@ func (proto *registryProtocol) Refer(url common.URL, impl interface{}) protocol.
}

//new registry directory for store service url from registry
directory, err := directory2.NewRegistryDirectory(&registryUrl, impl, reg)
directory, err := directory2.NewRegistryDirectory(&registryUrl, reg)
if err != nil {
logger.Errorf("consumer service %v create registry directory error, error message is %s, and will return nil invoker!",
serviceUrl.String(), err.Error())
Expand Down
6 changes: 3 additions & 3 deletions registry/protocol/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func referNormal(t *testing.T, regProtocol *registryProtocol) {

url.SubURL = &suburl

invoker := regProtocol.Refer(url, nil)
invoker := regProtocol.Refer(url)
assert.IsType(t, &protocol.BaseInvoker{}, invoker)
assert.Equal(t, invoker.GetUrl().String(), url.String())
}
Expand All @@ -85,7 +85,7 @@ func TestMultiRegRefer(t *testing.T) {

url2.SubURL = &suburl2

regProtocol.Refer(url2, nil)
regProtocol.Refer(url2)
var count int
regProtocol.registries.Range(func(key, value interface{}) bool {
count++
Expand All @@ -107,7 +107,7 @@ func TestOneRegRefer(t *testing.T) {

url2.SubURL = &suburl2

regProtocol.Refer(url2, nil)
regProtocol.Refer(url2)
var count int
regProtocol.registries.Range(func(key, value interface{}) bool {
count++
Expand Down
2 changes: 0 additions & 2 deletions remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package zookeeper

import (
"fmt"
"path"
"strings"
"sync"
Expand Down Expand Up @@ -241,7 +240,6 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi

//listen sub path recursive
go func(zkPath string, listener remoting.DataListener) {
fmt.Printf("zkpath: %v \n", zkPath)
l.listenDirEvent(zkPath, listener)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(dubboPath, listener)
Expand Down