diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml
index 0329e73ba60..1f5fe6e2d5f 100644
--- a/dubbo-dependencies-bom/pom.xml
+++ b/dubbo-dependencies-bom/pom.xml
@@ -103,6 +103,7 @@
2.12.0
2.9.0
1.4.2
+ 2.0.0
1.3.6
3.1.15
0.8.0
@@ -231,6 +232,11 @@
consul-api
${consul_version}
+
+ com.pszymczyk.consul
+ embedded-consul
+ ${consul_process_version}
+
com.googlecode.xmemcached
xmemcached
diff --git a/dubbo-registry/dubbo-registry-consul/pom.xml b/dubbo-registry/dubbo-registry-consul/pom.xml
index ec32dbe4340..0b40fbf0b06 100644
--- a/dubbo-registry/dubbo-registry-consul/pom.xml
+++ b/dubbo-registry/dubbo-registry-consul/pom.xml
@@ -36,6 +36,11 @@
com.ecwid.consul
consul-api
+
+ com.pszymczyk.consul
+ embedded-consul
+ test
+
diff --git a/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistry.java b/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistry.java
index 72f7ff43b86..f498a67c6e9 100644
--- a/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistry.java
+++ b/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistry.java
@@ -32,10 +32,12 @@
import com.ecwid.consul.v1.catalog.CatalogServicesRequest;
import com.ecwid.consul.v1.health.HealthServicesRequest;
import com.ecwid.consul.v1.health.model.HealthService;
+import org.apache.dubbo.rpc.RpcException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -154,6 +156,24 @@ public void doUnsubscribe(URL url, NotifyListener listener) {
notifier.stop();
}
+ @Override
+ public List lookup(URL url) {
+ if (url == null) {
+ throw new IllegalArgumentException("lookup url == null");
+ }
+ try {
+ String service = url.getServiceKey();
+ Response> result = client.getHealthServices(service, HealthServicesRequest.newBuilder().setTag(SERVICE_TAG).build());
+ if (result == null || result.getValue() == null || result.getValue().isEmpty()) {
+ return new ArrayList<>();
+ } else {
+ return convert(result.getValue());
+ }
+ } catch (Throwable e) {
+ throw new RpcException("Failed to lookup " + url + " from consul " + getUrl() + ", cause: " + e.getMessage(), e);
+ }
+ }
+
@Override
public boolean isAvailable() {
return client.getAgentSelf() != null;
diff --git a/dubbo-registry/dubbo-registry-consul/src/test/java/org/apache/dubbo/registry/consul/ConsulRegistryTest.java b/dubbo-registry/dubbo-registry-consul/src/test/java/org/apache/dubbo/registry/consul/ConsulRegistryTest.java
new file mode 100644
index 00000000000..08203f37b5d
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-consul/src/test/java/org/apache/dubbo/registry/consul/ConsulRegistryTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.consul;
+
+import com.pszymczyk.consul.ConsulProcess;
+import com.pszymczyk.consul.ConsulStarterBuilder;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.status.Status;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.Registry;
+import org.apache.dubbo.registry.status.RegistryStatusChecker;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+
+public class ConsulRegistryTest {
+
+ private static ConsulProcess consul;
+ private ConsulRegistry consulRegistry;
+ private String service = "org.apache.dubbo.test.injvmServie";
+ private URL serviceUrl = URL.valueOf("consul://127.0.0.1:8012/" + service + "?notify=false&methods=test1,test2");
+ private URL registryUrl;
+ private ConsulRegistryFactory consulRegistryFactory;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ this.consul = ConsulStarterBuilder.consulStarter()
+ .build()
+ .start();
+ this.registryUrl = URL.valueOf("consul://localhost:" + consul.getHttpPort());
+
+ consulRegistryFactory = new ConsulRegistryFactory();
+ this.consulRegistry = (ConsulRegistry) consulRegistryFactory.createRegistry(registryUrl);
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ consul.close();
+ this.consulRegistry.destroy();
+ }
+
+ @Test
+ public void testRegister() {
+ Set registered;
+
+ for (int i = 0; i < 2; i++) {
+ consulRegistry.register(serviceUrl);
+ registered = consulRegistry.getRegistered();
+ assertThat(registered.contains(serviceUrl), is(true));
+ }
+
+ registered = consulRegistry.getRegistered();
+
+ assertThat(registered.size(), is(1));
+ }
+
+ @Test
+ public void testSubscribe() {
+ NotifyListener listener = mock(NotifyListener.class);
+ consulRegistry.subscribe(serviceUrl, listener);
+
+ Map> subscribed = consulRegistry.getSubscribed();
+ assertThat(subscribed.size(), is(1));
+ assertThat(subscribed.get(serviceUrl).size(), is(1));
+
+ consulRegistry.unsubscribe(serviceUrl, listener);
+ subscribed = consulRegistry.getSubscribed();
+ assertThat(subscribed.size(), is(1));
+ assertThat(subscribed.get(serviceUrl).size(), is(0));
+ }
+
+ @Test
+ public void testAvailable() {
+ consulRegistry.register(serviceUrl);
+ assertThat(consulRegistry.isAvailable(), is(true));
+
+// consulRegistry.destroy();
+// assertThat(consulRegistry.isAvailable(), is(false));
+ }
+
+ @Test
+ public void testLookup() throws InterruptedException {
+ List lookup = consulRegistry.lookup(serviceUrl);
+ assertThat(lookup.size(), is(0));
+
+ consulRegistry.register(serviceUrl);
+ Thread.sleep(5000);
+ lookup = consulRegistry.lookup(serviceUrl);
+ assertThat(lookup.size(), is(1));
+ }
+
+ @Test
+ public void testStatusChecker() {
+ RegistryStatusChecker registryStatusChecker = new RegistryStatusChecker();
+ Status status = registryStatusChecker.check();
+ assertThat(status.getLevel(), is(Status.Level.UNKNOWN));
+
+ Registry registry = consulRegistryFactory.getRegistry(registryUrl);
+ assertThat(registry, not(nullValue()));
+
+ status = registryStatusChecker.check();
+ assertThat(status.getLevel(), is(Status.Level.OK));
+
+ registry.register(serviceUrl);
+ status = registryStatusChecker.check();
+ assertThat(status.getLevel(), is(Status.Level.OK));
+ }
+
+}