diff --git a/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceConsumerBootstrap.java b/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceConsumerBootstrap.java index e765643f111..275d01995c6 100644 --- a/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceConsumerBootstrap.java +++ b/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceConsumerBootstrap.java @@ -16,7 +16,6 @@ */ package org.apache.dubbo.bootstrap; -import org.apache.dubbo.bootstrap.rest.UserService; import org.apache.dubbo.config.ReferenceConfig; import org.apache.dubbo.config.context.ConfigManager; @@ -33,13 +32,14 @@ public static void main(String[] args) throws Exception { .application("dubbo-consumer-demo") .protocol(builder -> builder.port(20887).name("dubbo")) // Zookeeper - .registry("zookeeper", builder -> builder.address("zookeeper://127.0.0.1:2181?registry.type=service&subscribed.services=dubbo-provider-demo")) +// .registry("zookeeper", builder -> builder.address("zookeeper://127.0.0.1:2181?registry.type=service&subscribed.services=dubbo-provider-demo")) + .registry("file",builder-> builder.address("file://1111?registry.type=service&subscribed.services=dubbo-provider-demo")) // .metadataReport(new MetadataReportConfig("zookeeper://127.0.0.1:2181")) // Nacos // .registry("nacos", builder -> builder.address("nacos://127.0.0.1:8848?registry.type=service&subscribed.services=dubbo-provider-demo")) // .registry("consul", builder -> builder.address("consul://127.0.0.1:8500?registry.type=service&subscribed.services=dubbo-provider-demo").group("namespace1")) .reference("echo", builder -> builder.interfaceClass(EchoService.class).protocol("dubbo")) - .reference("user", builder -> builder.interfaceClass(UserService.class).protocol("rest")) +// .reference("user", builder -> builder.interfaceClass(UserService.class).protocol("rest")) .onlyRegisterProvider(true) .start() .await(); diff --git a/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceProviderMinimumBootstrap.java b/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceProviderMinimumBootstrap.java new file mode 100644 index 00000000000..c0764e8f609 --- /dev/null +++ b/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceProviderMinimumBootstrap.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.bootstrap; + +/** + * TODO + */ +public class DubboServiceProviderMinimumBootstrap { + + public static void main(String[] args) { + new DubboBootstrap() + .application("dubbo-provider-demo") +// .registry(builder -> builder.address("zookeeper://127.0.0.1:2181?registry.type=service")) + .registry(builder -> builder.address("file://?registry.type=service")) + .protocol(builder -> builder.port(-1).name("dubbo")) + .service(builder -> builder.interfaceClass(EchoService.class).ref(new EchoServiceImpl())) + .start() + .await(); + } +} diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/ListenableRouter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/ListenableRouter.java index 41044ec3fea..74dfce22c33 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/ListenableRouter.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/ListenableRouter.java @@ -17,8 +17,8 @@ package org.apache.dubbo.rpc.cluster.router.condition.config; import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.config.configcenter.ConfigChangeEvent; import org.apache.dubbo.common.config.configcenter.ConfigChangeType; +import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent; import org.apache.dubbo.common.config.configcenter.ConfigurationListener; import org.apache.dubbo.common.config.configcenter.DynamicConfiguration; import org.apache.dubbo.common.logger.Logger; @@ -56,10 +56,10 @@ public ListenableRouter(URL url, String ruleKey) { } @Override - public synchronized void process(ConfigChangeEvent event) { + public synchronized void process(ConfigChangedEvent event) { if (logger.isInfoEnabled()) { logger.info("Notification of condition rule, change type is: " + event.getChangeType() + - ", raw rule is:\n " + event.getValue()); + ", raw rule is:\n " + event.getContent()); } if (event.getChangeType().equals(ConfigChangeType.DELETED)) { @@ -67,11 +67,11 @@ public synchronized void process(ConfigChangeEvent event) { conditionRouters = Collections.emptyList(); } else { try { - routerRule = ConditionRuleParser.parse(event.getValue()); + routerRule = ConditionRuleParser.parse(event.getContent()); generateConditions(routerRule); } catch (Exception e) { logger.error("Failed to parse the raw condition rule and it will not take effect, please check " + - "if the condition rule matches with the template, the raw rule is:\n " + event.getValue(), e); + "if the condition rule matches with the template, the raw rule is:\n " + event.getContent(), e); } } } @@ -121,7 +121,7 @@ private synchronized void init(String ruleKey) { ruleRepository.addListener(routerKey, this); String rule = ruleRepository.getRule(routerKey, DynamicConfiguration.DEFAULT_GROUP); if (StringUtils.isNotEmpty(rule)) { - this.process(new ConfigChangeEvent(routerKey, rule)); + this.process(new ConfigChangedEvent(routerKey, DynamicConfiguration.DEFAULT_GROUP, rule)); } } } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java index 05b1a113a7a..46b8d9f2ba7 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java @@ -17,8 +17,8 @@ package org.apache.dubbo.rpc.cluster.router.tag; import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.config.configcenter.ConfigChangeEvent; import org.apache.dubbo.common.config.configcenter.ConfigChangeType; +import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent; import org.apache.dubbo.common.config.configcenter.ConfigurationListener; import org.apache.dubbo.common.config.configcenter.DynamicConfiguration; import org.apache.dubbo.common.constants.CommonConstants; @@ -61,17 +61,17 @@ public TagRouter(URL url) { } @Override - public synchronized void process(ConfigChangeEvent event) { + public synchronized void process(ConfigChangedEvent event) { if (logger.isDebugEnabled()) { logger.debug("Notification of tag rule, change type is: " + event.getChangeType() + ", raw rule is:\n " + - event.getValue()); + event.getContent()); } try { if (event.getChangeType().equals(ConfigChangeType.DELETED)) { this.tagRouterRule = null; } else { - this.tagRouterRule = TagRuleParser.parse(event.getValue()); + this.tagRouterRule = TagRuleParser.parse(event.getContent()); } } catch (Exception e) { logger.error("Failed to parse the raw tag router rule and it will not take effect, please check if the " + @@ -251,7 +251,7 @@ public void notify(List> invokers) { application = providerApplication; String rawRule = ruleRepository.getRule(key, DynamicConfiguration.DEFAULT_GROUP); if (StringUtils.isNotEmpty(rawRule)) { - this.process(new ConfigChangeEvent(key, rawRule)); + this.process(new ConfigChangedEvent(key, DynamicConfiguration.DEFAULT_GROUP, rawRule)); } } } diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/ConfigChangeEvent.java b/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/ConfigChangeEvent.java deleted file mode 100644 index 177551be929..00000000000 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/ConfigChangeEvent.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.common.config.configcenter; - -import java.util.EventObject; - -/** - * Config change event, immutable. - * - * @see ConfigChangeType - */ -public class ConfigChangeEvent extends EventObject { - private final String key; - - private final String value; - - private final ConfigChangeType changeType; - - public ConfigChangeEvent(String key, String value) { - this(key, value, ConfigChangeType.MODIFIED); - } - - public ConfigChangeEvent(String key, String value, ConfigChangeType changeType) { - super(key + "=" + value); - this.key = key; - this.value = value; - this.changeType = changeType; - } - - public String getKey() { - return key; - } - - public String getValue() { - return value; - } - - public ConfigChangeType getChangeType() { - return changeType; - } - - @Override - public String toString() { - return "ConfigChangeEvent{" + - "key='" + key + '\'' + - ", value='" + value + '\'' + - ", changeType=" + changeType + - '}'; - } -} diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/ConfigChangedEvent.java b/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/ConfigChangedEvent.java new file mode 100644 index 00000000000..8ea32e27302 --- /dev/null +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/ConfigChangedEvent.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.common.config.configcenter; + +import java.util.EventObject; +import java.util.Objects; + +/** + * An event raised when the config changed, immutable. + * + * @see ConfigChangeType + */ +public class ConfigChangedEvent extends EventObject { + + private final String key; + + private final String group; + + private final String content; + + private final ConfigChangeType changeType; + + public ConfigChangedEvent(String key, String group, String content) { + this(key, group, content, ConfigChangeType.MODIFIED); + } + + public ConfigChangedEvent(String key, String group, String content, ConfigChangeType changeType) { + super(key + "=" + content); + this.key = key; + this.group = group; + this.content = content; + this.changeType = changeType; + } + + public String getKey() { + return key; + } + + public String getGroup() { + return group; + } + + public String getContent() { + return content; + } + + public ConfigChangeType getChangeType() { + return changeType; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof ConfigChangedEvent)) return false; + ConfigChangedEvent that = (ConfigChangedEvent) o; + return Objects.equals(getKey(), that.getKey()) && + Objects.equals(group, that.group) && + Objects.equals(getContent(), that.getContent()) && + getChangeType() == that.getChangeType(); + } + + @Override + public int hashCode() { + return Objects.hash(getKey(), group, getContent(), getChangeType()); + } + + @Override + public String toString() { + return "ConfigChangedEvent{" + + "key='" + key + '\'' + + ", group='" + group + '\'' + + ", content='" + content + '\'' + + ", changeType=" + changeType + + "} " + super.toString(); + } +} diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/ConfigurationListener.java b/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/ConfigurationListener.java index 2a1779a383d..33e9f1bd156 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/ConfigurationListener.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/ConfigurationListener.java @@ -29,5 +29,5 @@ public interface ConfigurationListener extends EventListener { * * @param event config change event */ - void process(ConfigChangeEvent event); + void process(ConfigChangedEvent event); } diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/file/FileSystemDynamicConfiguration.java b/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/file/FileSystemDynamicConfiguration.java index 45c2b6bb95b..603356d66f0 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/file/FileSystemDynamicConfiguration.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/file/FileSystemDynamicConfiguration.java @@ -18,13 +18,14 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.config.configcenter.AbstractDynamicConfiguration; -import org.apache.dubbo.common.config.configcenter.ConfigChangeEvent; import org.apache.dubbo.common.config.configcenter.ConfigChangeType; +import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent; import org.apache.dubbo.common.config.configcenter.ConfigurationListener; import org.apache.dubbo.common.config.configcenter.DynamicConfiguration; import org.apache.dubbo.common.function.ThrowableConsumer; import org.apache.dubbo.common.function.ThrowableFunction; import org.apache.dubbo.common.utils.NamedThreadFactory; +import org.apache.dubbo.common.utils.StringUtils; import com.sun.nio.file.SensitivityWatchEventModifier; import org.apache.commons.io.FileUtils; @@ -84,7 +85,7 @@ public class FileSystemDynamicConfiguration extends AbstractDynamicConfiguration public static final String DEFAULT_CONFIG_CENTER_DIR_PATH = System.getProperty("user.home") + File.separator + ".dubbo" + File.separator + "config-center"; - public static final String DEFAULT_THREAD_POOL_SIZE = "1"; + public static final int DEFAULT_THREAD_POOL_SIZE = 1; public static final String DEFAULT_CONFIG_CENTER_ENCODING = "UTF-8"; @@ -158,7 +159,7 @@ public class FileSystemDynamicConfiguration extends AbstractDynamicConfiguration private final String encoding; /** - * The {@link Set} of {@link #configDirectory(String) directories} that may be processing, + * The {@link Set} of {@link #groupDirectory(String) directories} that may be processing, *

* if {@link #isBasedPoolingWatchService()} is false, this properties will be * {@link Collections#emptySet() empty} @@ -170,12 +171,24 @@ public class FileSystemDynamicConfiguration extends AbstractDynamicConfiguration private final Map> listenersRepository; public FileSystemDynamicConfiguration() { - this(URL.valueOf("file:///default")); + this(new File(DEFAULT_CONFIG_CENTER_DIR_PATH)); } - public FileSystemDynamicConfiguration(URL url) { - this(initDirectory(url), getEncoding(url), getThreadPoolPrefixName(url), getThreadPoolSize(url), - getThreadPoolKeepAliveTime(url)); + public FileSystemDynamicConfiguration(File rootDirectory) { + this(rootDirectory, DEFAULT_CONFIG_CENTER_ENCODING); + } + + public FileSystemDynamicConfiguration(File rootDirectory, String encoding) { + this(rootDirectory, encoding, DEFAULT_THREAD_POOL_PREFIX); + } + + public FileSystemDynamicConfiguration(File rootDirectory, String encoding, String threadPoolPrefixName) { + this(rootDirectory, encoding, threadPoolPrefixName, DEFAULT_THREAD_POOL_SIZE); + } + + public FileSystemDynamicConfiguration(File rootDirectory, String encoding, String threadPoolPrefixName, + int threadPoolSize) { + this(rootDirectory, encoding, threadPoolPrefixName, threadPoolSize, DEFAULT_THREAD_POOL_KEEP_ALIVE_TIME); } public FileSystemDynamicConfiguration(File rootDirectory, String encoding, @@ -189,6 +202,11 @@ public FileSystemDynamicConfiguration(File rootDirectory, String encoding, this.listenersRepository = new LinkedHashMap<>(); } + public FileSystemDynamicConfiguration(URL url) { + this(initDirectory(url), getEncoding(url), getThreadPoolPrefixName(url), getThreadPoolSize(url), + getThreadPoolKeepAliveTime(url)); + } + private Set initProcessingDirectories() { return isBasedPoolingWatchService() ? new LinkedHashSet<>() : emptySet(); } @@ -222,13 +240,13 @@ public void removeListener(String key, String group, ConfigurationListener liste }); } - protected File configDirectory(String group) { + public File groupDirectory(String group) { String actualGroup = isBlank(group) ? DEFAULT_GROUP : group; return new File(rootDirectory, actualGroup); } - protected File configFile(String key, String group) { - return new File(configDirectory(group), key); + public File configFile(String key, String group) { + return new File(groupDirectory(group), key); } private void doInListener(String key, String group, BiConsumer> consumer) { @@ -275,7 +293,7 @@ private void processWatchEvents(WatchService watchService) { Path configFilePath = configDirectoryPath.resolve(currentPath); File configDirectory = configDirectoryPath.toFile(); executeMutually(configDirectory, () -> { - fireConfigChangeEvent(configFilePath.toFile(), configChangeType); + fireConfigChangeEvent(configDirectory, configFilePath.toFile(), configChangeType); signalConfigDirectory(configDirectory); return null; }); @@ -318,13 +336,13 @@ private List getListeners(File configFile) { return listenersRepository.computeIfAbsent(configFile, p -> new LinkedList<>()); } - private void fireConfigChangeEvent(File configFile, ConfigChangeType configChangeType) { + private void fireConfigChangeEvent(File configDirectory, File configFile, ConfigChangeType configChangeType) { String key = configFile.getName(); String value = getConfig(configFile); // fire ConfigChangeEvent one by one getListeners(configFile).forEach(listener -> { try { - listener.process(new ConfigChangeEvent(key, value, configChangeType)); + listener.process(new ConfigChangedEvent(key, configDirectory.getName(), value, configChangeType)); } catch (Throwable e) { if (logger.isErrorEnabled()) { logger.error(e.getMessage(), e); @@ -412,9 +430,9 @@ private boolean hasListeners(File configFile) { } /** - * Is processing on {@link #configDirectory(String) config rootDirectory} + * Is processing on {@link #groupDirectory(String) config rootDirectory} * - * @param configDirectory {@link #configDirectory(String) config rootDirectory} + * @param configDirectory {@link #groupDirectory(String) config rootDirectory} * @return if processing , return true, or false */ private boolean isProcessing(File configDirectory) { @@ -427,7 +445,7 @@ private void addProcessing(File configDirectory) { @Override public SortedSet getConfigKeys(String group) { - return Stream.of(configDirectory(group).listFiles(File::isFile)) + return Stream.of(groupDirectory(group).listFiles(File::isFile)) .map(File::getName) .collect(TreeSet::new, Set::add, Set::addAll); } @@ -462,11 +480,11 @@ protected void doClose() throws Exception { } - protected File getRootDirectory() { + public File getRootDirectory() { return rootDirectory; } - protected String getEncoding() { + public String getEncoding() { return encoding; } @@ -559,11 +577,19 @@ private static Optional newWatchService() { } protected static File initDirectory(URL url) { - String directoryPath = getParameter(url, CONFIG_CENTER_DIR_PARAM_NAME, DEFAULT_CONFIG_CENTER_DIR_PATH); - File rootDirectory = new File(getParameter(url, CONFIG_CENTER_DIR_PARAM_NAME, DEFAULT_CONFIG_CENTER_DIR_PATH)); + String directoryPath = url.getParameter(CONFIG_CENTER_DIR_PARAM_NAME, url.getPath()); + File rootDirectory = null; + if (!StringUtils.isBlank(directoryPath)) { + rootDirectory = new File("/" + directoryPath); + } + + if (!rootDirectory.exists()) { // If the directory does not exist + rootDirectory = new File(DEFAULT_CONFIG_CENTER_DIR_PATH); + } + if (!rootDirectory.exists() && !rootDirectory.mkdirs()) { throw new IllegalStateException(format("Dubbo config center rootDirectory[%s] can't be created!", - directoryPath)); + rootDirectory.getAbsolutePath())); } return rootDirectory; } diff --git a/dubbo-configcenter/dubbo-configcenter-apollo/src/main/java/org/apache/dubbo/configcenter/support/apollo/ApolloDynamicConfiguration.java b/dubbo-configcenter/dubbo-configcenter-apollo/src/main/java/org/apache/dubbo/configcenter/support/apollo/ApolloDynamicConfiguration.java index 603dd73601e..1c5d638b361 100644 --- a/dubbo-configcenter/dubbo-configcenter-apollo/src/main/java/org/apache/dubbo/configcenter/support/apollo/ApolloDynamicConfiguration.java +++ b/dubbo-configcenter/dubbo-configcenter-apollo/src/main/java/org/apache/dubbo/configcenter/support/apollo/ApolloDynamicConfiguration.java @@ -17,8 +17,8 @@ package org.apache.dubbo.configcenter.support.apollo; import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.config.configcenter.ConfigChangeEvent; import org.apache.dubbo.common.config.configcenter.ConfigChangeType; +import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent; import org.apache.dubbo.common.config.configcenter.ConfigurationListener; import org.apache.dubbo.common.config.configcenter.DynamicConfiguration; import org.apache.dubbo.common.logger.Logger; @@ -200,7 +200,7 @@ public void onChange(com.ctrip.framework.apollo.model.ConfigChangeEvent changeEv return; } - ConfigChangeEvent event = new ConfigChangeEvent(key, change.getNewValue(), getChangeType(change)); + ConfigChangedEvent event = new ConfigChangedEvent(key, change.getNamespace(), change.getNewValue(), getChangeType(change)); listeners.forEach(listener -> listener.process(event)); } } diff --git a/dubbo-configcenter/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfiguration.java b/dubbo-configcenter/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfiguration.java index e6d5c745ab6..c981381ddcf 100644 --- a/dubbo-configcenter/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfiguration.java +++ b/dubbo-configcenter/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfiguration.java @@ -18,8 +18,8 @@ package org.apache.dubbo.configcenter.consul; import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.config.configcenter.ConfigChangeEvent; import org.apache.dubbo.common.config.configcenter.ConfigChangeType; +import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent; import org.apache.dubbo.common.config.configcenter.ConfigurationListener; import org.apache.dubbo.common.config.configcenter.DynamicConfiguration; import org.apache.dubbo.common.logger.Logger; @@ -75,7 +75,7 @@ public ConsulDynamicConfiguration(URL url) { public void addListener(String key, String group, ConfigurationListener listener) { logger.info("register listener " + listener.getClass() + " for config with key: " + key + ", group: " + group); String normalizedKey = convertKey(group, key); - ConsulKVWatcher watcher = watchers.putIfAbsent(normalizedKey, new ConsulKVWatcher(normalizedKey)); + ConsulKVWatcher watcher = watchers.putIfAbsent(normalizedKey, new ConsulKVWatcher(key, group)); if (watcher == null) { watcher = watchers.get(normalizedKey); watcherService.submit(watcher); @@ -133,21 +133,25 @@ private int buildWatchTimeout(URL url) { } private class ConsulKVWatcher implements Runnable { - private String key; + private final String key; + private final String group; + private final String normalizedKey; private Set listeners; private boolean running = true; private boolean existing = false; - public ConsulKVWatcher(String key) { + public ConsulKVWatcher(String key, String group) { this.key = key; + this.group = group; + this.normalizedKey = convertKey(group, key); this.listeners = new HashSet<>(); } @Override public void run() { while (running) { - Long lastIndex = consulIndexes.computeIfAbsent(key, k -> -1L); - Response response = getValue(key); + Long lastIndex = consulIndexes.computeIfAbsent(normalizedKey, k -> -1L); + Response response = getValue(normalizedKey); if (response == null) { try { Thread.sleep(watchTimeout); @@ -164,20 +168,20 @@ public void run() { } consulIndexes.put(key, currentIndex); - ConfigChangeEvent event = null; + ConfigChangedEvent event = null; if (getValue != null) { String value = getValue.getDecodedValue(); if (existing) { - logger.info("notify change for key: " + key + ", the changed value is: " + value); - event = new ConfigChangeEvent(key, value); + logger.info("notify change for key: " + normalizedKey + ", the changed value is: " + value); + event = new ConfigChangedEvent(key, group, value); } else { - logger.info("notify change for key: " + key + ", the added value is: " + value); - event = new ConfigChangeEvent(key, value, ADDED); + logger.info("notify change for key: " + normalizedKey + ", the added value is: " + value); + event = new ConfigChangedEvent(key, group, value, ADDED); } } else { if (existing) { - logger.info("notify change for key: " + key + ", the value is deleted"); - event = new ConfigChangeEvent(key, null, ConfigChangeType.DELETED); + logger.info("notify change for key: " + normalizedKey + ", the value is deleted"); + event = new ConfigChangedEvent(key, group, null, ConfigChangeType.DELETED); } } diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java index 356161a67fe..f686e860331 100644 --- a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java +++ b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java @@ -18,8 +18,8 @@ package org.apache.dubbo.configcenter.support.etcd; import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.config.configcenter.ConfigChangeEvent; import org.apache.dubbo.common.config.configcenter.ConfigChangeType; +import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent; import org.apache.dubbo.common.config.configcenter.ConfigurationListener; import org.apache.dubbo.common.config.configcenter.DynamicConfiguration; import org.apache.dubbo.common.utils.StringUtils; @@ -81,8 +81,7 @@ public class EtcdDynamicConfiguration implements DynamicConfiguration { @Override public void addListener(String key, String group, ConfigurationListener listener) { if (watchListenerMap.get(listener) == null) { - String normalizedKey = convertKey(group, key); - EtcdConfigWatcher watcher = new EtcdConfigWatcher(normalizedKey, listener); + EtcdConfigWatcher watcher = new EtcdConfigWatcher(key, group, listener); watchListenerMap.put(listener, watcher); watcher.watch(); } @@ -134,10 +133,17 @@ public class EtcdConfigWatcher implements StreamObserver { private StreamObserver observer; protected long watchId; private ManagedChannel channel; - private String key; - public EtcdConfigWatcher(String key, ConfigurationListener listener) { + private final String key; + + private final String group; + + private String normalizedKey; + + public EtcdConfigWatcher(String key, String group, ConfigurationListener listener) { this.key = key; + this.group = group; + this.normalizedKey = convertKey(group, key); this.listener = listener; this.channel = etcdClient.getChannel(); } @@ -150,8 +156,7 @@ public void onNext(WatchResponse watchResponse) { if (etcdEvent.getType() == Event.EventType.DELETE) { type = ConfigChangeType.DELETED; } - ConfigChangeEvent event = new ConfigChangeEvent( - etcdEvent.getKv().getKey().toString(UTF_8), + ConfigChangedEvent event = new ConfigChangedEvent(key, group, etcdEvent.getKv().getValue().toString(UTF_8), type); listener.process(event); } @@ -175,7 +180,7 @@ private void watch() { watchStub = WatchGrpc.newStub(channel); observer = watchStub.watch(this); WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder() - .setKey(ByteString.copyFromUtf8(key)) + .setKey(ByteString.copyFromUtf8(normalizedKey)) .setProgressNotify(true); WatchRequest req = WatchRequest.newBuilder().setCreateRequest(builder).build(); observer.onNext(req); diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/test/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationTest.java b/dubbo-configcenter/dubbo-configcenter-etcd/src/test/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationTest.java index c0ea8c39dac..86dd306bf81 100644 --- a/dubbo-configcenter/dubbo-configcenter-etcd/src/test/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationTest.java +++ b/dubbo-configcenter/dubbo-configcenter-etcd/src/test/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationTest.java @@ -18,7 +18,7 @@ package org.apache.dubbo.configcenter.support.etcd; import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.config.configcenter.ConfigChangeEvent; +import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent; import org.apache.dubbo.common.config.configcenter.ConfigurationListener; import org.apache.dubbo.common.config.configcenter.DynamicConfiguration; @@ -104,10 +104,10 @@ public TestListener(CountDownLatch latch) { } @Override - public void process(ConfigChangeEvent event) { + public void process(ConfigChangedEvent event) { Integer count = countMap.computeIfAbsent(event.getKey(), k -> 0); countMap.put(event.getKey(), ++count); - value = event.getValue(); + value = event.getContent(); latch.countDown(); } diff --git a/dubbo-configcenter/dubbo-configcenter-nacos/src/main/java/org/apache/dubbo/configcenter/support/nacos/NacosDynamicConfiguration.java b/dubbo-configcenter/dubbo-configcenter-nacos/src/main/java/org/apache/dubbo/configcenter/support/nacos/NacosDynamicConfiguration.java index 5b4052709df..b82c8e72dfb 100644 --- a/dubbo-configcenter/dubbo-configcenter-nacos/src/main/java/org/apache/dubbo/configcenter/support/nacos/NacosDynamicConfiguration.java +++ b/dubbo-configcenter/dubbo-configcenter-nacos/src/main/java/org/apache/dubbo/configcenter/support/nacos/NacosDynamicConfiguration.java @@ -18,8 +18,8 @@ package org.apache.dubbo.configcenter.support.nacos; import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.config.configcenter.ConfigChangeEvent; import org.apache.dubbo.common.config.configcenter.ConfigChangeType; +import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent; import org.apache.dubbo.common.config.configcenter.ConfigurationListener; import org.apache.dubbo.common.config.configcenter.DynamicConfiguration; import org.apache.dubbo.common.logger.Logger; @@ -231,7 +231,7 @@ public Executor getExecutor() { @Override public void innerReceive(String dataId, String group, String configInfo) { String oldValue = cacheData.get(dataId); - ConfigChangeEvent event = new ConfigChangeEvent(dataId, configInfo, getChangeType(configInfo, oldValue)); + ConfigChangedEvent event = new ConfigChangedEvent(dataId, group, configInfo, getChangeType(configInfo, oldValue)); if (configInfo == null) { cacheData.remove(dataId); } else { diff --git a/dubbo-configcenter/dubbo-configcenter-nacos/src/test/java/org/apache/dubbo/configcenter/support/nacos/NacosDynamicConfigurationTest.java b/dubbo-configcenter/dubbo-configcenter-nacos/src/test/java/org/apache/dubbo/configcenter/support/nacos/NacosDynamicConfigurationTest.java index a5ed3e7589e..350936c2c03 100644 --- a/dubbo-configcenter/dubbo-configcenter-nacos/src/test/java/org/apache/dubbo/configcenter/support/nacos/NacosDynamicConfigurationTest.java +++ b/dubbo-configcenter/dubbo-configcenter-nacos/src/test/java/org/apache/dubbo/configcenter/support/nacos/NacosDynamicConfigurationTest.java @@ -18,7 +18,7 @@ package org.apache.dubbo.configcenter.support.nacos; import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.config.configcenter.ConfigChangeEvent; +import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent; import org.apache.dubbo.common.config.configcenter.ConfigurationListener; import org.apache.dubbo.common.config.configcenter.DynamicConfiguration; @@ -142,11 +142,11 @@ public TestListener(CountDownLatch latch) { } @Override - public void process(ConfigChangeEvent event) { + public void process(ConfigChangedEvent event) { System.out.println(this + ": " + event); Integer count = countMap.computeIfAbsent(event.getKey(), k -> 0); countMap.put(event.getKey(), ++count); - value = event.getValue(); + value = event.getContent(); latch.countDown(); } diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java index be24cc2cba7..a0acd51ef4d 100644 --- a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java +++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java @@ -16,8 +16,8 @@ */ package org.apache.dubbo.configcenter.support.zookeeper; -import org.apache.dubbo.common.config.configcenter.ConfigChangeEvent; import org.apache.dubbo.common.config.configcenter.ConfigChangeType; +import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent; import org.apache.dubbo.common.config.configcenter.ConfigurationListener; import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.common.utils.StringUtils; @@ -76,6 +76,19 @@ private String pathToKey(String path) { return groupKey.substring(groupKey.indexOf(DOT_SEPARATOR) + 1); } + private String getGroup(String path) { + if (!StringUtils.isEmpty(path)) { + int beginIndex = path.indexOf(rootPath + PATH_SEPARATOR); + if (beginIndex > -1) { + int endIndex = path.indexOf(PATH_SEPARATOR, beginIndex); + if (endIndex > beginIndex) { + return path.substring(beginIndex, endIndex); + } + } + } + return path; + } + @Override public void dataChanged(String path, Object value, EventType eventType) { @@ -111,7 +124,7 @@ public void dataChanged(String path, Object value, EventType eventType) { return; } - ConfigChangeEvent configChangeEvent = new ConfigChangeEvent(key, (String) value, changeType); + ConfigChangedEvent configChangeEvent = new ConfigChangedEvent(key, getGroup(path), (String) value, changeType); Set listeners = keyListeners.get(path); if (CollectionUtils.isNotEmpty(listeners)) { listeners.forEach(listener -> listener.process(configChangeEvent)); diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/test/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationTest.java b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/test/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationTest.java index b2d03fc31f4..284c77a2c50 100644 --- a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/test/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationTest.java +++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/test/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationTest.java @@ -17,7 +17,7 @@ package org.apache.dubbo.configcenter.support.zookeeper; import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.config.configcenter.ConfigChangeEvent; +import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent; import org.apache.dubbo.common.config.configcenter.ConfigurationListener; import org.apache.dubbo.common.config.configcenter.DynamicConfiguration; import org.apache.dubbo.common.config.configcenter.DynamicConfigurationFactory; @@ -170,12 +170,12 @@ public TestListener(CountDownLatch latch) { } @Override - public void process(ConfigChangeEvent event) { + public void process(ConfigChangedEvent event) { System.out.println(this + ": " + event); Integer count = countMap.computeIfAbsent(event.getKey(), k -> new Integer(0)); countMap.put(event.getKey(), ++count); - value = event.getValue(); + value = event.getContent(); latch.countDown(); } diff --git a/dubbo-event/src/main/java/org/apache/dubbo/event/AbstractEventDispatcher.java b/dubbo-event/src/main/java/org/apache/dubbo/event/AbstractEventDispatcher.java index a2182ebe3c6..80ba3bde048 100644 --- a/dubbo-event/src/main/java/org/apache/dubbo/event/AbstractEventDispatcher.java +++ b/dubbo-event/src/main/java/org/apache/dubbo/event/AbstractEventDispatcher.java @@ -119,6 +119,13 @@ public void dispatch(Event event) { executor.execute(() -> { sortedListeners(entry -> entry.getKey().isAssignableFrom(event.getClass())) .forEach(listener -> { + if (listener instanceof ConditionalEventListener) { + ConditionalEventListener predicateEventListener = (ConditionalEventListener) listener; + if (!predicateEventListener.accept(event)) { // No accept + return; + } + } + // Handle the event listener.onEvent(event); }); }); diff --git a/dubbo-event/src/main/java/org/apache/dubbo/event/ConditionalEventListener.java b/dubbo-event/src/main/java/org/apache/dubbo/event/ConditionalEventListener.java new file mode 100644 index 00000000000..3c6fce92ab4 --- /dev/null +++ b/dubbo-event/src/main/java/org/apache/dubbo/event/ConditionalEventListener.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.event; + +/** + * An {@link EventListener} extending the the conditional feature that {@link #accept(Event) decides} some + * {@link Event event} is handled or not by current listener. + * + * @see EventListener + * @since 2.7.4 + */ +public interface ConditionalEventListener extends EventListener { + + /** + * Accept the event is handled or not by current listener + * + * @param event {@link Event event} + * @return if handled, return true, or false + */ + boolean accept(E event); +} diff --git a/dubbo-event/src/test/java/org/apache/dubbo/event/ConditionalEventListenerTest.java b/dubbo-event/src/test/java/org/apache/dubbo/event/ConditionalEventListenerTest.java new file mode 100644 index 00000000000..ad196f5a153 --- /dev/null +++ b/dubbo-event/src/test/java/org/apache/dubbo/event/ConditionalEventListenerTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.event; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +/** + * {@link ConditionalEventListener} test + * + * @since 2.7.4 + */ +public class ConditionalEventListenerTest { + + private final EventDispatcher eventDispatcher = EventDispatcher.getDefaultExtension(); + + @BeforeEach + public void init() { + eventDispatcher.removeAllEventListeners(); + } + + @Test + public void testOnEvent() { + + OnlyHelloWorldEventListener listener = new OnlyHelloWorldEventListener(); + + eventDispatcher.addEventListener(listener); + + eventDispatcher.dispatch(new EchoEvent("1")); + + assertNull(listener.getSource()); + + eventDispatcher.dispatch(new EchoEvent("Hello,World")); + + assertEquals("Hello,World", listener.getSource()); + } + + static class OnlyHelloWorldEventListener implements ConditionalEventListener { + + private String source; + + @Override + public boolean accept(EchoEvent event) { + return "Hello,World".equals(event.getSource()); + } + + @Override + public void onEvent(EchoEvent event) { + source = (String) event.getSource(); + } + + public String getSource() { + return source; + } + } +} diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java index 8e28adf5371..b9f5da7be98 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java @@ -27,11 +27,13 @@ */ public class DefaultServiceInstance implements ServiceInstance { - private final String id; + private static final long serialVersionUID = 1149677083747278100L; - private final String serviceName; + private String id; - private final String host; + private String serviceName; + + private String host; private Integer port; @@ -41,6 +43,9 @@ public class DefaultServiceInstance implements ServiceInstance { private Map metadata = new HashMap<>(); + public DefaultServiceInstance() { + } + public DefaultServiceInstance(String id, String serviceName, String host, Integer port) { if (port != null && port.intValue() < 1) { throw new IllegalArgumentException("The port must be greater than zero!"); @@ -57,6 +62,18 @@ public DefaultServiceInstance(String serviceName, String host, Integer port) { this(null, serviceName, host, port); } + public void setId(String id) { + this.id = id; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + + public void setHost(String host) { + this.host = host; + } + @Override public String getId() { return id; diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java index dc20a0ffa39..3ba497011f9 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java @@ -218,8 +218,8 @@ public String toString() { } @Override - public void addServiceInstancesChangedListener(String serviceName, ServiceInstancesChangedListener listener) throws NullPointerException, IllegalArgumentException { - serviceDiscovery.addServiceInstancesChangedListener(serviceName, listener); + public void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws NullPointerException, IllegalArgumentException { + serviceDiscovery.addServiceInstancesChangedListener(listener); } @Override diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java index e49d392436b..3d70039d23b 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java @@ -17,16 +17,35 @@ package org.apache.dubbo.registry.client; import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.config.configcenter.DynamicConfigurationFactory; +import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent; import org.apache.dubbo.common.config.configcenter.file.FileSystemDynamicConfiguration; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.event.EventListener; import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent; -import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener; +import com.alibaba.fastjson.JSON; +import org.apache.commons.io.FileUtils; + +import java.io.File; +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.file.LinkOption; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; import static com.alibaba.fastjson.JSON.toJSONString; -import static org.apache.dubbo.common.extension.ExtensionLoader.getExtensionLoader; +import static java.lang.String.format; +import static java.nio.channels.FileChannel.open; +import static org.apache.dubbo.common.config.configcenter.file.FileSystemDynamicConfiguration.CONFIG_CENTER_DIR_PARAM_NAME; +import static org.apache.dubbo.config.DubboShutdownHook.getDubboShutdownHook; /** * File System {@link ServiceDiscovery} implementation @@ -36,6 +55,10 @@ */ public class FileSystemServiceDiscovery implements ServiceDiscovery, EventListener { + private final Logger logger = LoggerFactory.getLogger(getClass()); + + private final Map fileLocksCache = new ConcurrentHashMap<>(); + private FileSystemDynamicConfiguration dynamicConfiguration; @Override @@ -46,27 +69,96 @@ public void onEvent(ServiceInstancesChangedEvent event) { @Override public void initialize(URL registryURL) throws Exception { dynamicConfiguration = createDynamicConfiguration(registryURL); + registerDubboShutdownHook(); + registerListener(); + } + + private void registerDubboShutdownHook() { + getDubboShutdownHook().addCallback(this::destroy); + getDubboShutdownHook().register(); + } + + private void registerListener() { + getServices().forEach(serviceName -> { + dynamicConfiguration.getConfigKeys(serviceName).forEach(serviceInstanceId -> { + dynamicConfiguration.addListener(serviceInstanceId, serviceName, this::onConfigChanged); + }); + }); + } + + public void onConfigChanged(ConfigChangedEvent event) { + } @Override public void destroy() throws Exception { dynamicConfiguration.close(); + releaseAndRemoveRegistrationFiles(); + } + + private void releaseAndRemoveRegistrationFiles() { + fileLocksCache.keySet().forEach(file -> { + releaseFileLock(file); + removeFile(file); + }); } - private String getConfigKey(ServiceInstance serviceInstance) { - return serviceInstance.getId(); + private void removeFile(File file) { + FileUtils.deleteQuietly(file); } - private String getConfigGroup(ServiceInstance serviceInstance) { + private String getServiceInstanceId(ServiceInstance serviceInstance) { + String id = serviceInstance.getId(); + if (StringUtils.isBlank(id)) { + return serviceInstance.getHost() + "." + serviceInstance.getPort(); + } + return id; + } + + private String getServiceName(ServiceInstance serviceInstance) { return serviceInstance.getServiceName(); } + @Override + public List getInstances(String serviceName) { + return dynamicConfiguration.getConfigKeys(serviceName) + .stream() + .map(serviceInstanceId -> dynamicConfiguration.getConfig(serviceInstanceId, serviceName)) + .map(content -> JSON.parseObject(content, DefaultServiceInstance.class)) + .collect(Collectors.toList()); + } + @Override public void register(ServiceInstance serviceInstance) throws RuntimeException { - String key = getConfigKey(serviceInstance); - String group = getConfigGroup(serviceInstance); + String serviceInstanceId = getServiceInstanceId(serviceInstance); + String serviceName = getServiceName(serviceInstance); String content = toJSONString(serviceInstance); - dynamicConfiguration.publishConfig(key, group, content); + if (dynamicConfiguration.publishConfig(serviceInstanceId, serviceName, content)) { + lockFile(serviceInstanceId, serviceName); + } + } + + private void lockFile(String serviceInstanceId, String serviceName) { + File serviceInstanceFile = serviceInstanceFile(serviceInstanceId, serviceName); + Path serviceInstanceFilePath = serviceInstanceFile.toPath(); + + fileLocksCache.computeIfAbsent(serviceInstanceFile, file -> { + FileLock fileLock = null; + try { + FileChannel fileChannel = open(serviceInstanceFilePath, StandardOpenOption.READ, StandardOpenOption.WRITE, LinkOption.NOFOLLOW_LINKS); + fileLock = fileChannel.tryLock(); + } catch (IOException e) { + if (logger.isErrorEnabled()) { + logger.error(e.getMessage(), e); + } + } + if (fileLock != null) { + if (logger.isInfoEnabled()) { + logger.info(format("%s has been locked", serviceInstanceFilePath.toAbsolutePath())); + } + } + return fileLock; + }); } @Override @@ -76,25 +168,48 @@ public void update(ServiceInstance serviceInstance) throws RuntimeException { @Override public void unregister(ServiceInstance serviceInstance) throws RuntimeException { - String key = getConfigKey(serviceInstance); - String group = getConfigGroup(serviceInstance); + String key = getServiceInstanceId(serviceInstance); + String group = getServiceName(serviceInstance); + releaseFileLock(key, group); dynamicConfiguration.removeConfig(key, group); } - @Override - public Set getServices() { - return null; + private void releaseFileLock(String serviceInstanceId, String serviceName) { + File serviceInstanceFile = serviceInstanceFile(serviceInstanceId, serviceName); + releaseFileLock(serviceInstanceFile); } - @Override - public void addServiceInstancesChangedListener(String serviceName, ServiceInstancesChangedListener listener) throws - NullPointerException, IllegalArgumentException { + private void releaseFileLock(File serviceInstanceFile) { + fileLocksCache.computeIfPresent(serviceInstanceFile, (f, fileLock) -> { + releaseFileLock(fileLock); + if (logger.isInfoEnabled()) { + logger.info(format("The file[%s] has been released", serviceInstanceFile.getAbsolutePath())); + } + return null; + }); + } + + private void releaseFileLock(FileLock fileLock) { + try (FileChannel fileChannel = fileLock.channel()) { + fileLock.release(); + } catch (IOException e) { + if (logger.isErrorEnabled()) { + logger.error(e.getMessage(), e); + } + } + } + private File serviceInstanceFile(String serviceInstanceId, String serviceName) { + return dynamicConfiguration.configFile(serviceInstanceId, serviceName); + } + + @Override + public Set getServices() { + return dynamicConfiguration.getConfigGroups(); } private static FileSystemDynamicConfiguration createDynamicConfiguration(URL connectionURL) { - String protocol = connectionURL.getProtocol(); - DynamicConfigurationFactory factory = getExtensionLoader(DynamicConfigurationFactory.class).getExtension(protocol); - return (FileSystemDynamicConfiguration) factory.getDynamicConfiguration(connectionURL); + String path = System.getProperty("user.home") + File.separator + ".dubbo" + File.separator + "registry"; + return new FileSystemDynamicConfiguration(connectionURL.addParameter(CONFIG_CENTER_DIR_PARAM_NAME, path)); } } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java index 3381d5a562a..bf33cba2613 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java @@ -20,6 +20,8 @@ import org.apache.dubbo.common.extension.SPI; import org.apache.dubbo.common.lang.Prioritized; import org.apache.dubbo.common.utils.Page; +import org.apache.dubbo.event.EventDispatcher; +import org.apache.dubbo.event.EventListener; import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener; import java.util.LinkedHashMap; @@ -30,6 +32,7 @@ import static java.util.Collections.unmodifiableList; import static java.util.Collections.unmodifiableMap; +import static org.apache.dubbo.event.EventDispatcher.getDefaultExtension; /** * The common operations of Service Discovery @@ -190,14 +193,18 @@ default Map> getInstances(Iterable service /** * Add an instance of {@link ServiceInstancesChangedListener} for specified service + *

+ * Default, the ServiceInstancesChangedListener will be {@link EventDispatcher#addEventListener(EventListener) added} + * into {@link EventDispatcher} * - * @param serviceName the service name - * @param listener an instance of {@link ServiceInstancesChangedListener} + * @param listener an instance of {@link ServiceInstancesChangedListener} * @throws NullPointerException * @throws IllegalArgumentException */ - void addServiceInstancesChangedListener(String serviceName, ServiceInstancesChangedListener listener) - throws NullPointerException, IllegalArgumentException; + default void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener) + throws NullPointerException, IllegalArgumentException { + getDefaultExtension().addEventListener(listener); + } // ==================================================================================== // diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java index fd86ab62375..0a39fb13496 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java @@ -27,6 +27,8 @@ import org.apache.dubbo.metadata.store.RemoteWritableMetadataService; import org.apache.dubbo.registry.NotifyListener; import org.apache.dubbo.registry.Registry; +import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent; +import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener; import org.apache.dubbo.registry.client.metadata.proxy.MetadataServiceProxyFactory; import org.apache.dubbo.registry.client.selector.ServiceInstanceSelector; import org.apache.dubbo.registry.support.FailbackRegistry; @@ -275,8 +277,12 @@ protected void subscribeURLs(URL url, NotifyListener listener, String serviceNam subscribeURLs(url, listener, serviceName, serviceInstances); // Add Listener - serviceDiscovery.addServiceInstancesChangedListener(serviceName, event -> { - subscribeURLs(url, listener, event.getServiceName(), new ArrayList<>(event.getServiceInstances())); + serviceDiscovery.addServiceInstancesChangedListener(new ServiceInstancesChangedListener(serviceName) { + + @Override + public void onEvent(ServiceInstancesChangedEvent event) { + subscribeURLs(url, listener, event.getServiceName(), new ArrayList<>(event.getServiceInstances())); + } }); } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java index 251372d033e..a5edd08f7ac 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java @@ -16,6 +16,7 @@ */ package org.apache.dubbo.registry.client; +import java.io.Serializable; import java.util.Map; /** @@ -24,7 +25,7 @@ * * @since 2.7.4 */ -public interface ServiceInstance { +public interface ServiceInstance extends Serializable { /** * The id of the registered service instance. diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java index 4db3688aec0..008c3f7cf1e 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java @@ -16,21 +16,47 @@ */ package org.apache.dubbo.registry.client.event.listener; +import org.apache.dubbo.event.ConditionalEventListener; import org.apache.dubbo.event.EventListener; import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent; +import java.util.Objects; + /** * The Service Discovery Changed {@link EventListener Event Listener} * * @see ServiceInstancesChangedEvent * @since 2.7.4 */ -public interface ServiceInstancesChangedListener extends EventListener { +public abstract class ServiceInstancesChangedListener implements ConditionalEventListener { + + private final String serviceName; + + protected ServiceInstancesChangedListener(String serviceName) { + this.serviceName = serviceName; + } /** * On {@link ServiceInstancesChangedEvent the service instances change event} * * @param event {@link ServiceInstancesChangedEvent} */ - void onEvent(ServiceInstancesChangedEvent event); + public abstract void onEvent(ServiceInstancesChangedEvent event); + + /** + * Get the correlative service name + * + * @return the correlative service name + */ + public final String getServiceName() { + return serviceName; + } + + /** + * @param event {@link ServiceInstancesChangedEvent event} + * @return If service name matches, return true, or false + */ + public final boolean accept(ServiceInstancesChangedEvent event) { + return Objects.equals(getServiceName(), event.getServiceName()); + } } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/AbstractConfiguratorListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/AbstractConfiguratorListener.java index cf774ab83f8..53fa81a0011 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/AbstractConfiguratorListener.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/AbstractConfiguratorListener.java @@ -16,8 +16,8 @@ */ package org.apache.dubbo.registry.integration; -import org.apache.dubbo.common.config.configcenter.ConfigChangeEvent; import org.apache.dubbo.common.config.configcenter.ConfigChangeType; +import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent; import org.apache.dubbo.common.config.configcenter.ConfigurationListener; import org.apache.dubbo.common.config.configcenter.DynamicConfiguration; import org.apache.dubbo.common.extension.ExtensionLoader; @@ -50,16 +50,16 @@ protected final void initWith(String key) { } @Override - public void process(ConfigChangeEvent event) { + public void process(ConfigChangedEvent event) { if (logger.isInfoEnabled()) { logger.info("Notification of overriding rule, change type is: " + event.getChangeType() + - ", raw config content is:\n " + event.getValue()); + ", raw config content is:\n " + event.getContent()); } if (event.getChangeType().equals(ConfigChangeType.DELETED)) { configurators.clear(); } else { - if (!genConfiguratorsFromRawRule(event.getValue())) { + if (!genConfiguratorsFromRawRule(event.getContent())) { return; } } diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/DefaultServiceInstanceTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/DefaultServiceInstanceTest.java index c48bfa97584..93b0be84b86 100644 --- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/DefaultServiceInstanceTest.java +++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/DefaultServiceInstanceTest.java @@ -19,6 +19,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import static java.lang.String.valueOf; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -33,7 +34,7 @@ public class DefaultServiceInstanceTest { public DefaultServiceInstance instance; public static DefaultServiceInstance createInstance() { - DefaultServiceInstance instance = new DefaultServiceInstance("A", "127.0.0.1", 8080); + DefaultServiceInstance instance = new DefaultServiceInstance(valueOf(System.nanoTime()), "A", "127.0.0.1", 8080); instance.getMetadata().put("dubbo.metadata-service.urls", "[ \"dubbo://192.168.0.102:20881/com.alibaba.cloud.dubbo.service.DubboMetadataService?anyhost=true&application=spring-cloud-alibaba-dubbo-provider&bind.ip=192.168.0.102&bind.port=20881&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=spring-cloud-alibaba-dubbo-provider&interface=com.alibaba.cloud.dubbo.service.DubboMetadataService&methods=getAllServiceKeys,getServiceRestMetadata,getExportedURLs,getAllExportedURLs&pid=17134&qos.enable=false®ister=true&release=2.7.3&revision=1.0.0&side=provider×tamp=1564826098503&version=1.0.0\" ]"); instance.getMetadata().put("dubbo.metadata-service.url-params", "{\"dubbo\":{\"application\":\"dubbo-provider-demo\",\"deprecated\":\"false\",\"group\":\"dubbo-provider-demo\",\"version\":\"1.0.0\",\"timestamp\":\"1564845042651\",\"dubbo\":\"2.0.2\",\"provider.host\":\"192.168.0.102\",\"provider.port\":\"20880\"}}"); return instance; diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/FileSystemServiceDiscoveryTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/FileSystemServiceDiscoveryTest.java new file mode 100644 index 00000000000..2cb4c226e53 --- /dev/null +++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/FileSystemServiceDiscoveryTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.registry.client; + +import org.apache.dubbo.common.URLBuilder; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.apache.dubbo.registry.client.DefaultServiceInstanceTest.createInstance; + +/** + * {@link FileSystemServiceDiscovery} Test + * + * @since 2.7.4 + */ +public class FileSystemServiceDiscoveryTest { + + private FileSystemServiceDiscovery serviceDiscovery; + + private ServiceInstance serviceInstance; + + @BeforeEach + public void init() throws Exception { + serviceDiscovery = new FileSystemServiceDiscovery(); + serviceDiscovery.initialize(new URLBuilder().build()); + serviceInstance = createInstance(); + } + + @AfterEach + public void destroy() throws Exception { + serviceDiscovery.destroy(); + serviceInstance = null; + } + + @Test + public void testRegisterAndUnregister() { + + serviceDiscovery.register(serviceInstance); + + serviceDiscovery.unregister(serviceInstance); + + serviceDiscovery.register(serviceInstance); + } +} diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/InMemoryServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/InMemoryServiceDiscovery.java index 6f7c3ea4330..83ca8e6c188 100644 --- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/InMemoryServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/InMemoryServiceDiscovery.java @@ -20,7 +20,6 @@ import org.apache.dubbo.common.utils.DefaultPage; import org.apache.dubbo.common.utils.Page; import org.apache.dubbo.event.EventDispatcher; -import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener; import java.util.ArrayList; import java.util.HashMap; @@ -103,9 +102,4 @@ public void initialize(URL registryURL) throws Exception { @Override public void destroy() { } - - @Override - public void addServiceInstancesChangedListener(String serviceName, ServiceInstancesChangedListener listener) throws NullPointerException, IllegalArgumentException { - dispatcher.addEventListener(listener); - } } diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java index 898e7324057..c4eedf1868d 100644 --- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java +++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java @@ -44,7 +44,12 @@ public void testOnEvent() { AtomicReference eventRef = new AtomicReference<>(); - eventDispatcher.addEventListener((ServiceInstancesChangedListener) eventRef::set); + eventDispatcher.addEventListener(new ServiceInstancesChangedListener("test") { + @Override + public void onEvent(ServiceInstancesChangedEvent event) { + eventRef.set(event); + } + }); // Dispatch a ServiceInstancesChangedEvent eventDispatcher.dispatch(event); diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/fastjson/DefaultServiceInstanceDeserializerTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/fastjson/DefaultServiceInstanceDeserializerTest.java new file mode 100644 index 00000000000..291c792954c --- /dev/null +++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/fastjson/DefaultServiceInstanceDeserializerTest.java @@ -0,0 +1,58 @@ +///* +// * Licensed to the Apache Software Foundation (ASF) under one or more +// * contributor license agreements. See the NOTICE file distributed with +// * this work for additional information regarding copyright ownership. +// * The ASF licenses this file to You under the Apache License, Version 2.0 +// * (the "License"); you may not use this file except in compliance with +// * the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +//package org.apache.dubbo.registry.client.fastjson; +// +//import org.apache.dubbo.registry.client.DefaultServiceInstance; +// +//import com.alibaba.fastjson.JSON; +//import com.alibaba.fastjson.parser.ParserConfig; +//import org.junit.jupiter.api.BeforeAll; +//import org.junit.jupiter.api.Test; +// +///** +// * {@link DefaultServiceInstanceDeserializer} Test +// * +// * @since 2.7.4 +// */ +//public class DefaultServiceInstanceDeserializerTest { +// +// private static final String JSON_CONTENT = "{\n" + +// " \"enabled\": true,\n" + +// " \"healthy\": true,\n" + +// " \"host\": \"fe80:0:0:0:1c49:6eff:fe54:2495%7\",\n" + +// " \"metadata\": {\n" + +// " \"dubbo.metadata-service.url-params\": \"{\\\"dubbo\\\":{\\\"application\\\":\\\"dubbo-provider-demo\\\",\\\"deprecated\\\":\\\"false\\\",\\\"group\\\":\\\"dubbo-provider-demo\\\",\\\"version\\\":\\\"1.0.0\\\",\\\"timestamp\\\":\\\"1566132738256\\\",\\\"dubbo\\\":\\\"2.0.2\\\",\\\"provider.host\\\":\\\"fe80:0:0:0:1c49:6eff:fe54:2495%7\\\",\\\"provider.port\\\":\\\"20880\\\"}}\",\n" + +// " \"dubbo.subscribed-services.revision\": \"1\",\n" + +// " \"dubbo.metadata.storage-type\": \"default\",\n" + +// " \"dubbo.exported-services.revision\": \"640372560\"\n" + +// " },\n" + +// " \"port\": 20880,\n" + +// " \"serviceName\": \"dubbo-provider-demo\"\n" + +// "}"; +// +// @BeforeAll +// public static void init() { +// ParserConfig.getGlobalInstance().putDeserializer(DefaultServiceInstance.class, new DefaultServiceInstanceDeserializer()); +// } +// +// @Test +// public void testDeserialze() { +// +// DefaultServiceInstance serviceInstance = JSON.parseObject(JSON_CONTENT, DefaultServiceInstance.class); +// +// } +//} diff --git a/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java b/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java index 22e44a149c8..830c23ba7cc 100644 --- a/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java @@ -25,7 +25,6 @@ import org.apache.dubbo.registry.client.ServiceDiscovery; import org.apache.dubbo.registry.client.ServiceInstance; import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent; -import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener; import org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils; import com.ecwid.consul.v1.ConsulClient; @@ -117,11 +116,6 @@ public Set getServices() { return null; } - @Override - public void addServiceInstancesChangedListener(String serviceName, ServiceInstancesChangedListener listener) throws NullPointerException, IllegalArgumentException { - - } - @Override public List getInstances(String serviceName) throws NullPointerException { Response> response = getHealthServices(serviceName, -1, buildWatchTimeout()); diff --git a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java index 45da96e8415..a5cff2b46ec 100644 --- a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java @@ -49,8 +49,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import static org.apache.dubbo.common.constants.RegistryConstants.DYNAMIC_KEY; - /** * 2019-07-08 */ @@ -161,8 +159,8 @@ public Set getServices() { } @Override - public void addServiceInstancesChangedListener(String serviceName, ServiceInstancesChangedListener listener) throws NullPointerException, IllegalArgumentException { - registerServiceWatcher(serviceName); + public void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws NullPointerException, IllegalArgumentException { + registerServiceWatcher(listener.getServiceName()); dispatcher.addEventListener(listener); } diff --git a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosServiceDiscovery.java b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosServiceDiscovery.java index 8863e8915c4..beb3cd19c85 100644 --- a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosServiceDiscovery.java @@ -107,10 +107,10 @@ public List getInstances(String serviceName) throws NullPointer } @Override - public void addServiceInstancesChangedListener(String serviceName, ServiceInstancesChangedListener listener) + public void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws NullPointerException, IllegalArgumentException { execute(namingService, service -> { - service.subscribe(serviceName, e -> { // Register Nacos EventListener + service.subscribe(listener.getServiceName(), e -> { // Register Nacos EventListener if (e instanceof NamingEvent) { NamingEvent event = (NamingEvent) e; handleEvent(event, listener); diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java index feac09358f1..772d5c9c37a 100644 --- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java @@ -145,9 +145,9 @@ public Page getInstances(String serviceName, int offset, int pa } @Override - public void addServiceInstancesChangedListener(String serviceName, ServiceInstancesChangedListener listener) + public void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws NullPointerException, IllegalArgumentException { - registerServiceWatcher(serviceName); + registerServiceWatcher(listener.getServiceName()); dispatcher.addEventListener(listener); } diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryTest.java b/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryTest.java index 12c1a1981c7..5dd975dbdfe 100644 --- a/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryTest.java +++ b/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryTest.java @@ -21,6 +21,8 @@ import org.apache.dubbo.event.EventDispatcher; import org.apache.dubbo.registry.client.DefaultServiceInstance; import org.apache.dubbo.registry.client.ServiceInstance; +import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent; +import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener; import org.apache.curator.test.TestingServer; import org.junit.jupiter.api.AfterEach; @@ -124,9 +126,12 @@ public void testGetInstances() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); // Add Listener - discovery.addServiceInstancesChangedListener(SERVICE_NAME, event -> { - serviceInstances.addAll(event.getServiceInstances()); - latch.countDown(); + discovery.addServiceInstancesChangedListener(new ServiceInstancesChangedListener(SERVICE_NAME) { + @Override + public void onEvent(ServiceInstancesChangedEvent event) { + serviceInstances.addAll(event.getServiceInstances()); + latch.countDown(); + } }); discovery.register(createServiceInstance(SERVICE_NAME, LOCALHOST, 8082));