diff --git a/common/extension/metadata_service.go b/common/extension/metadata_service.go index e35677d148..08ddbc333e 100644 --- a/common/extension/metadata_service.go +++ b/common/extension/metadata_service.go @@ -26,12 +26,15 @@ import ( ) import ( + "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/metadata/service" ) var ( // there will be two types: local or remote metadataServiceInsMap = make(map[string]func() (service.MetadataService, error), 2) + // remoteMetadataService + remoteMetadataService service.MetadataService ) // SetMetadataService will store the msType => creator pair @@ -48,3 +51,17 @@ func GetMetadataService(msType string) (service.MetadataService, error) { "local - github.com/apache/dubbo-go/metadata/service/inmemory, \n"+ "remote - github.com/apache/dubbo-go/metadata/service/remote", msType)) } + +// GetRemoteMetadataService will get a RemoteMetadataService instance +func GetRemoteMetadataService() (service.MetadataService, error) { + if remoteMetadataService != nil { + return remoteMetadataService, nil + } + if creator, ok := metadataServiceInsMap["remote"]; ok { + var err error + remoteMetadataService, err = creator() + return remoteMetadataService, err + } + logger.Warn("could not find the metadata service creator for metadataType: remote") + return nil, perrors.New(fmt.Sprintf("could not find the metadata service creator for metadataType: remote")) +} diff --git a/config/reference_config.go b/config/reference_config.go index 431ec0e2eb..895ab9df26 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -167,7 +167,8 @@ func (c *ReferenceConfig) Refer(_ interface{}) { // FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker c.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) } - + // publish consumer metadata + publishConsumerDefinition(cfgURL) // create proxy if c.Async { callback := GetCallback(c.id) @@ -257,6 +258,12 @@ func (c *ReferenceConfig) GenericLoad(id string) { c.Implement(genericService) } +func publishConsumerDefinition(url *common.URL) { + if remoteMetadataService, err := extension.GetRemoteMetadataService(); err == nil && remoteMetadataService != nil { + remoteMetadataService.PublishServiceDefinition(url) + } +} + // postProcessConfig asks registered ConfigPostProcessor to post-process the current ReferenceConfig. func (c *ReferenceConfig) postProcessConfig(url *common.URL) { for _, p := range extension.GetConfigPostProcessors() { diff --git a/config/service_config.go b/config/service_config.go index fd49390aa2..e8523bdea7 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -234,6 +234,7 @@ func (c *ServiceConfig) Export() error { } c.exporters = append(c.exporters, exporter) } + publishServiceDefinition(ivkURL) } c.exported.Store(true) return nil @@ -347,6 +348,12 @@ func (c *ServiceConfig) GetExportedUrls() []*common.URL { return nil } +func publishServiceDefinition(url *common.URL) { + if remoteMetadataService, err := extension.GetRemoteMetadataService(); err == nil && remoteMetadataService != nil { + remoteMetadataService.PublishServiceDefinition(url) + } +} + // postProcessConfig asks registered ConfigPostProcessor to post-process the current ServiceConfig. func (c *ServiceConfig) postProcessConfig(url *common.URL) { for _, p := range extension.GetConfigPostProcessors() { diff --git a/metadata/report/delegate/delegate_report.go b/metadata/report/delegate/delegate_report.go index f364087fdc..56a22de8f8 100644 --- a/metadata/report/delegate/delegate_report.go +++ b/metadata/report/delegate/delegate_report.go @@ -109,6 +109,10 @@ type MetadataReport struct { // NewMetadataReport will create a MetadataReport with initiation func NewMetadataReport() (*MetadataReport, error) { url := instance.GetMetadataReportUrl() + if url == nil { + logger.Warn("the metadataReport URL is not configured, you should configure it.") + return nil, perrors.New("the metadataReport URL is not configured, you should configure it.") + } bmr := &MetadataReport{ reportUrl: url, syncReport: url.GetParamBool(constant.SYNC_REPORT_KEY, false), diff --git a/metadata/service/remote/service.go b/metadata/service/remote/service.go index d21cc88a3d..bb520d3df7 100644 --- a/metadata/service/remote/service.go +++ b/metadata/service/remote/service.go @@ -116,22 +116,40 @@ func (mts *MetadataService) UnsubscribeURL(url *common.URL) error { func (mts *MetadataService) PublishServiceDefinition(url *common.URL) error { interfaceName := url.GetParam(constant.INTERFACE_KEY, "") isGeneric := url.GetParamBool(constant.GENERIC_KEY, false) - if len(interfaceName) > 0 && !isGeneric { - sv := common.ServiceMap.GetServiceByServiceKey(url.Protocol, url.ServiceKey()) - sd := definition.BuildServiceDefinition(*sv, url) + if common.RoleType(common.PROVIDER).Role() == url.GetParam(constant.SIDE_KEY, "") { + if len(interfaceName) > 0 && !isGeneric { + sv := common.ServiceMap.GetServiceByServiceKey(url.Protocol, url.ServiceKey()) + sd := definition.BuildServiceDefinition(*sv, url) + id := &identifier.MetadataIdentifier{ + BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{ + ServiceInterface: interfaceName, + Version: url.GetParam(constant.VERSION_KEY, ""), + Group: url.GetParam(constant.GROUP_KEY, constant.DUBBO), + Side: url.GetParam(constant.SIDE_KEY, constant.PROVIDER_PROTOCOL), + }, + } + mts.delegateReport.StoreProviderMetadata(id, sd) + return nil + } + logger.Errorf("publishProvider interfaceName is empty . providerUrl:%v ", url) + } else { + params := make(map[string]string, len(url.GetParams())) + url.RangeParams(func(key, value string) bool { + params[key] = value + return true + }) id := &identifier.MetadataIdentifier{ BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{ ServiceInterface: interfaceName, Version: url.GetParam(constant.VERSION_KEY, ""), - // Group: url.GetParam(constant.GROUP_KEY, constant.SERVICE_DISCOVERY_DEFAULT_GROUP), - Group: url.GetParam(constant.GROUP_KEY, constant.DUBBO), - Side: url.GetParam(constant.SIDE_KEY, "provider"), + Group: url.GetParam(constant.GROUP_KEY, constant.DUBBO), + Side: url.GetParam(constant.SIDE_KEY, "consumer"), }, } - mts.delegateReport.StoreProviderMetadata(id, sd) + mts.delegateReport.StoreConsumerMetadata(id, params) return nil } - logger.Errorf("publishProvider interfaceName is empty . providerUrl:%v ", url) + return nil } diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index 2ca4b8015b..c97a7f7b51 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -174,10 +174,6 @@ func (s *serviceDiscoveryRegistry) Register(url *common.URL) error { logger.Warnf("The URL[%s] has been registry!", url.String()) } - err = s.metaDataService.PublishServiceDefinition(url) - if err != nil { - return perrors.WithMessage(err, "publish the service definition failed. ") - } return s.serviceNameMapping.Map(url.GetParam(constant.INTERFACE_KEY, ""), url.GetParam(constant.GROUP_KEY, ""), url.GetParam(constant.Version, ""),