Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ftr: zk/consul Metadata #633

Merged
merged 19 commits into from
Jul 4, 2020
4 changes: 1 addition & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@ classes

# go mod, go test
vendor/
coverage.txt

logs/
.vscode/
coverage.txt
zouyx marked this conversation as resolved.
Show resolved Hide resolved

# unit test
remoting/zookeeper/zookeeper-4unittest/
config_center/zookeeper/zookeeper-4unittest/
registry/zookeeper/zookeeper-4unittest/
metadata/report/zookeeper/zookeeper-4unittest/
registry/consul/agent*
config_center/apollo/mockDubbog.properties.json
5 changes: 4 additions & 1 deletion before_ut.bat
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,7 @@ md cluster\router\chain\zookeeper-4unittest\contrib\fatjar
xcopy /f "%zkJar%" "cluster/router/chain/zookeeper-4unittest/contrib/fatjar/"

md cluster\router\condition\zookeeper-4unittest\contrib\fatjar
xcopy /f "%zkJar%" "cluster/router/condition/zookeeper-4unittest/contrib/fatjar/"
xcopy /f "%zkJar%" "cluster/router/condition/zookeeper-4unittest/contrib/fatjar/"

md metadata\report\zookeeper\zookeeper-4unittest\contrib\fatjar
xcopy /f "%zkJar%" "metadata/report/zookeeper/zookeeper-4unittest/contrib/fatjar/"
9 changes: 6 additions & 3 deletions before_ut.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@ if [ ! -f "${zkJar}" ]; then
fi

mkdir -p config_center/zookeeper/zookeeper-4unittest/contrib/fatjar
cp ${zkJar} config_center/zookeeper/zookeeper-4unittest/contrib/fatjar/
cp ${zkJar} config_center/zookeeper/zookeeper-4unittest/contrib/fatjar

mkdir -p registry/zookeeper/zookeeper-4unittest/contrib/fatjar
cp ${zkJar} registry/zookeeper/zookeeper-4unittest/contrib/fatjar/
cp ${zkJar} registry/zookeeper/zookeeper-4unittest/contrib/fatjar

mkdir -p cluster/router/chain/zookeeper-4unittest/contrib/fatjar
cp ${zkJar} cluster/router/chain/zookeeper-4unittest/contrib/fatjar

mkdir -p cluster/router/condition/zookeeper-4unittest/contrib/fatjar
cp ${zkJar} cluster/router/condition/zookeeper-4unittest/contrib/fatjar
cp ${zkJar} cluster/router/condition/zookeeper-4unittest/contrib/fatjar

mkdir -p metadata/report/zookeeper/zookeeper-4unittest/contrib/fatjar
cp ${zkJar} metadata/report/zookeeper/zookeeper-4unittest/contrib/fatjar
136 changes: 136 additions & 0 deletions metadata/report/consul/report.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package consul

import (
consul "github.com/hashicorp/consul/api"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/metadata/identifier"
"github.com/apache/dubbo-go/metadata/report"
"github.com/apache/dubbo-go/metadata/report/factory"
)

var (
emptyStrSlice = make([]string, 0)
)

func init() {
mf := &consulMetadataReportFactory{}
extension.SetMetadataReportFactory("consul", func() factory.MetadataReportFactory {
return mf
})
flycash marked this conversation as resolved.
Show resolved Hide resolved
}

// consulMetadataReport is the implementation of
// MetadataReport based on consul.
type consulMetadataReport struct {
client *consul.Client
}

// StoreProviderMetadata stores the metadata.
func (m *consulMetadataReport) StoreProviderMetadata(providerIdentifier *identifier.MetadataIdentifier, serviceDefinitions string) error {
kv := &consul.KVPair{Key: providerIdentifier.GetIdentifierKey(), Value: []byte(serviceDefinitions)}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

go fmt not work ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is ok with go fmt.

_, err := m.client.KV().Put(kv, nil)
return err
}

// StoreConsumerMetadata stores the metadata.
func (m *consulMetadataReport) StoreConsumerMetadata(consumerMetadataIdentifier *identifier.MetadataIdentifier, serviceParameterString string) error {
kv := &consul.KVPair{Key: consumerMetadataIdentifier.GetIdentifierKey(), Value: []byte(serviceParameterString)}
_, err := m.client.KV().Put(kv, nil)
return err
}

// SaveServiceMetadata saves the metadata.
func (m *consulMetadataReport) SaveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier, url common.URL) error {
kv := &consul.KVPair{Key: metadataIdentifier.GetIdentifierKey(), Value: []byte(url.String())}
_, err := m.client.KV().Put(kv, nil)
return err
}

// RemoveServiceMetadata removes the metadata.
func (m *consulMetadataReport) RemoveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier) error {
k := metadataIdentifier.GetIdentifierKey()
_, err := m.client.KV().Delete(k, nil)
return err
}

// GetExportedURLs gets the urls.
func (m *consulMetadataReport) GetExportedURLs(metadataIdentifier *identifier.ServiceMetadataIdentifier) []string {
k := metadataIdentifier.GetIdentifierKey()
kv, _, err := m.client.KV().Get(k, nil)
if err != nil {
AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
panic(err)
}

if kv == nil {
return emptyStrSlice
}
return []string{string(kv.Value)}
}

// SaveSubscribedData saves the urls.
func (m *consulMetadataReport) SaveSubscribedData(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier, urls string) error {
kv := &consul.KVPair{Key: subscriberMetadataIdentifier.GetIdentifierKey(), Value: []byte(urls)}
_, err := m.client.KV().Put(kv, nil)
return err
}

// GetSubscribedURLs gets the urls.
func (m *consulMetadataReport) GetSubscribedURLs(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier) []string {
k := subscriberMetadataIdentifier.GetIdentifierKey()
kv, _, err := m.client.KV().Get(k, nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if err != nil || kv == nil {
return emptyStrSlice, err
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I follow your advice and keep code more simpler.

if err != nil {
panic(err)
}

if kv == nil {
return emptyStrSlice
}
return []string{string(kv.Value)}
}

// GetServiceDefinition gets the service definition.
func (m *consulMetadataReport) GetServiceDefinition(metadataIdentifier *identifier.MetadataIdentifier) string {
k := metadataIdentifier.GetIdentifierKey()
kv, _, err := m.client.KV().Get(k, nil)
if err != nil {
AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
panic(err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't panic

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remove this panic and return an error in this function.

}

if kv == nil {
return ""
}
return string(kv.Value)
}

type consulMetadataReportFactory struct {
}

func (mf *consulMetadataReportFactory) CreateMetadataReport(url *common.URL) report.MetadataReport {
AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
config := &consul.Config{Address: url.Location}
client, err := consul.NewClient(config)
if err != nil {
panic(err)
}
return &consulMetadataReport{client: client}
}
163 changes: 163 additions & 0 deletions metadata/report/consul/report_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package consul

import (
"encoding/json"
"net/url"
"strconv"
"testing"
)

import (
"github.com/stretchr/testify/assert"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/metadata/identifier"
"github.com/apache/dubbo-go/metadata/report"
"github.com/apache/dubbo-go/remoting/consul"
)

func newProviderRegistryUrl(host string, port int) *common.URL {
url1 := common.NewURLWithOptions(
common.WithIp(host),
common.WithPort(strconv.Itoa(port)),
common.WithParams(url.Values{}),
common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)),
)
return url1
zouyx marked this conversation as resolved.
Show resolved Hide resolved
}

func newBaseMetadataIdentifier(side string) *identifier.BaseMetadataIdentifier {
return &identifier.BaseMetadataIdentifier{
ServiceInterface: "org.apache.HelloWorld",
Version: "1.0.0",
Group: "group",
Side: side,
}
}

func newMetadataIdentifier(side string) *identifier.MetadataIdentifier {
return &identifier.MetadataIdentifier{
Application: "application",
BaseMetadataIdentifier: *newBaseMetadataIdentifier(side),
}
}

func newServiceMetadataIdentifier(side string) *identifier.ServiceMetadataIdentifier {
return &identifier.ServiceMetadataIdentifier{
Revision: "1.0",
Protocol: "dubbo",
BaseMetadataIdentifier: *newBaseMetadataIdentifier(side),
}
}

func newSubscribeMetadataIdentifier(side string) *identifier.SubscriberMetadataIdentifier {
return &identifier.SubscriberMetadataIdentifier{
Revision: "1.0",
MetadataIdentifier: *newMetadataIdentifier(side),
}
}

type consulMetadataReportTestSuite struct {
t *testing.T
m report.MetadataReport
}

func newConsulMetadataReportTestSuite(t *testing.T, m report.MetadataReport) *consulMetadataReportTestSuite {
return &consulMetadataReportTestSuite{t: t, m: m}
}

func (suite *consulMetadataReportTestSuite) testStoreProviderMetadata() {
providerMi := newMetadataIdentifier("provider")
providerMeta := "provider"
err := suite.m.StoreProviderMetadata(providerMi, providerMeta)
assert.NoError(suite.t, err)
}

func (suite *consulMetadataReportTestSuite) testStoreConsumerMetadata() {
consumerMi := newMetadataIdentifier("consumer")
consumerMeta := "consumer"
err := suite.m.StoreProviderMetadata(consumerMi, consumerMeta)
assert.NoError(suite.t, err)
}

func (suite *consulMetadataReportTestSuite) testSaveServiceMetadata(url common.URL) {
serviceMi := newServiceMetadataIdentifier("provider")
err := suite.m.SaveServiceMetadata(serviceMi, url)
assert.NoError(suite.t, err)
}

func (suite *consulMetadataReportTestSuite) testRemoveServiceMetadata() {
serviceMi := newServiceMetadataIdentifier("provider")
err := suite.m.RemoveServiceMetadata(serviceMi)
assert.NoError(suite.t, err)
}

func (suite *consulMetadataReportTestSuite) testGetExportedURLs() {
serviceMi := newServiceMetadataIdentifier("provider")
urls := suite.m.GetExportedURLs(serviceMi)
assert.Equal(suite.t, 1, len(urls))
}

func (suite *consulMetadataReportTestSuite) testSaveSubscribedData(url common.URL) {
subscribeMi := newSubscribeMetadataIdentifier("provider")
urls := []string{url.String()}
bytes, _ := json.Marshal(urls)
err := suite.m.SaveSubscribedData(subscribeMi, string(bytes))
assert.Nil(suite.t, err)
}

func (suite *consulMetadataReportTestSuite) testGetSubscribedURLs() {
subscribeMi := newSubscribeMetadataIdentifier("provider")
urls := suite.m.GetSubscribedURLs(subscribeMi)
assert.Equal(suite.t, 1, len(urls))
}

func (suite *consulMetadataReportTestSuite) testGetServiceDefinition() {
providerMi := newMetadataIdentifier("provider")
providerMeta := suite.m.GetServiceDefinition(providerMi)
assert.Equal(suite.t, "provider", providerMeta)
}

func test1(t *testing.T) {
consulAgent := consul.NewConsulAgent(t, 8500)
defer consulAgent.Close()

url := newProviderRegistryUrl("localhost", 8500)
mf := extension.GetMetadataReportFactory("consul")
m := mf.CreateMetadataReport(url)

suite := newConsulMetadataReportTestSuite(t, m)
suite.testStoreProviderMetadata()
suite.testStoreConsumerMetadata()
suite.testSaveServiceMetadata(*url)
suite.testGetExportedURLs()
suite.testRemoveServiceMetadata()
suite.testSaveSubscribedData(*url)
suite.testGetSubscribedURLs()
suite.testGetServiceDefinition()
}

func TestConsulMetadataReport(t *testing.T) {
t.Run("test1", test1)
}
14 changes: 12 additions & 2 deletions metadata/report/delegate/delegate_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

import (
"github.com/go-co-op/gocron"
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
)

Expand Down Expand Up @@ -241,11 +242,20 @@ func (mr *MetadataReport) GetExportedURLs(identifier *identifier.ServiceMetadata

// SaveSubscribedData will delegate to call remote metadata's sdk to save subscribed data
func (mr *MetadataReport) SaveSubscribedData(identifier *identifier.SubscriberMetadataIdentifier, urls []common.URL) error {
urlStrList := make([]string, 0, len(urls))
for _, url := range urls {
urlStrList = append(urlStrList, url.String())
}
bytes, err := json.Marshal(urlStrList)
if err != nil {
return perrors.WithMessage(err, "Could not convert the array to json")
}

report := instance.GetMetadataReportInstance()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This common logic may be confirmed by @flycash

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May create influence to nacos's implement.

Copy link
Author

@gaoxinge gaoxinge Jun 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a common code for different metadata reports, and one can find the similar code in java version.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep those codes, I will test it again

if mr.syncReport {
return report.SaveSubscribedData(identifier, urls)
return report.SaveSubscribedData(identifier, string(bytes))
}
go report.SaveSubscribedData(identifier, urls)
go report.SaveSubscribedData(identifier, string(bytes))
return nil
}

Expand Down
Loading