Skip to content

Commit

Permalink
Polish apache#3984 : Add Service registration and discovery implement…
Browse files Browse the repository at this point in the history
…ation for Zookeeper
  • Loading branch information
mercyblitz committed May 8, 2019
1 parent 6f136c8 commit 80140a7
Show file tree
Hide file tree
Showing 29 changed files with 1,367 additions and 39 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.common.function;

import java.util.function.Consumer;
import java.util.function.Function;

/**
* {@link Consumer} with {@link Throwable}
*
* @param <T> the source type
* @see Function
* @see Throwable
* @since 2.7.2
*/
@FunctionalInterface
public interface ThrowableConsumer<T> {

/**
* Applies this function to the given argument.
*
* @param t the function argument
* @throws Throwable if met with any error
*/
void accept(T t) throws Throwable;

/**
* Executes {@link ThrowableConsumer}
*
* @param t the function argument
* @throws RuntimeException wrappers {@link Throwable}
*/
default void execute(T t) throws RuntimeException {
try {
accept(t);
} catch (Throwable e) {
throw new RuntimeException(e.getCause());
}
}

/**
* Executes {@link ThrowableConsumer}
*
* @param t the function argument
* @param consumer {@link ThrowableConsumer}
* @param <T> the source type
* @return the result after execution
*/
static <T> void execute(T t, ThrowableConsumer<T> consumer) {
consumer.execute(t);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.common.function;

import java.util.function.Function;

/**
* {@link Function} with {@link Throwable}
*
* @param <T> the source type
* @param <R> the return type
* @see Function
* @see Throwable
* @since 2.7.2
*/
@FunctionalInterface
public interface ThrowableFunction<T, R> {

/**
* Applies this function to the given argument.
*
* @param t the function argument
* @return the function result
* @throws Throwable if met with any error
*/
R apply(T t) throws Throwable;

/**
* Executes {@link ThrowableFunction}
*
* @param t the function argument
* @return the function result
* @throws RuntimeException wrappers {@link Throwable}
*/
default R execute(T t) throws RuntimeException {
R result = null;
try {
result = apply(t);
} catch (Throwable e) {
throw new RuntimeException(e.getCause());
}
return result;
}

/**
* Executes {@link ThrowableFunction}
*
* @param t the function argument
* @param function {@link ThrowableFunction}
* @param <T> the source type
* @param <R> the return type
* @return the result after execution
*/
static <T, R> R execute(T t, ThrowableFunction<T, R> function) {
return function.execute(t);
}
}
13 changes: 13 additions & 0 deletions dubbo-dependencies-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,19 @@
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
<version>${curator_version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.registry.client.discovery;
package org.apache.dubbo.registry.client;

import org.apache.dubbo.registry.client.DefaultPage;
import org.apache.dubbo.registry.client.Page;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.client.event.ServiceDiscoveryChangeListener;

import java.util.Collection;
import java.util.LinkedList;
Expand Down Expand Up @@ -100,6 +98,13 @@ public Page<ServiceInstance> getInstances(String serviceName, int offset, int re
return page;
}

@Override
public void registerListener(String serviceName, ServiceDiscoveryChangeListener listener) {
for (ServiceDiscovery serviceDiscovery : serviceDiscoveries) {
serviceDiscovery.registerListener(serviceName, listener);
}
}

@Override
public String toString() {
return format("%s [composite : %s]", this.getClass().getSimpleName(), valueOf(serviceDiscoveries));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
*/
public class DefaultServiceInstance implements ServiceInstance {

private final String id;

private final String serviceName;

private final String host;
Expand All @@ -39,12 +41,22 @@ public class DefaultServiceInstance implements ServiceInstance {

private Map<String, String> metadata = new HashMap<>();

public DefaultServiceInstance(String serviceName, String host, int port) {
public DefaultServiceInstance(String id, String serviceName, String host, int port) {
this.id = id;
this.serviceName = serviceName;
this.host = host;
this.port = port;
}

public DefaultServiceInstance(String serviceName, String host, int port) {
this(null, serviceName, host, port);
}

@Override
public String getId() {
return id;
}

@Override
public String getServiceName() {
return serviceName;
Expand Down Expand Up @@ -93,20 +105,22 @@ public boolean equals(Object o) {
if (!(o instanceof DefaultServiceInstance)) return false;
DefaultServiceInstance that = (DefaultServiceInstance) o;
return getPort() == that.getPort() &&
Objects.equals(getId(), that.getId()) &&
Objects.equals(getServiceName(), that.getServiceName()) &&
Objects.equals(getHost(), that.getHost()) &&
Objects.equals(getMetadata(), that.getMetadata());
}

@Override
public int hashCode() {
return Objects.hash(getServiceName(), getHost(), getPort(), getMetadata());
return Objects.hash(getId(), getServiceName(), getHost(), getPort(), getMetadata());
}

@Override
public String toString() {
return "DefaultServiceInstance{" +
"serviceName='" + serviceName + '\'' +
"id='" + id + '\'' +
", serviceName='" + serviceName + '\'' +
", host='" + host + '\'' +
", port=" + port +
", enabled=" + enabled +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.registry.client.discovery;
package org.apache.dubbo.registry.client;

import org.apache.dubbo.registry.client.DefaultPage;
import org.apache.dubbo.registry.client.Page;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.client.event.ServiceDiscoveryChangeListener;

import java.util.ArrayList;
import java.util.Iterator;
Expand Down Expand Up @@ -161,6 +159,14 @@ default Map<String, Page<ServiceInstance>> getInstances(Iterable<String> service
return unmodifiableMap(instances);
}

/**
* Register {@link ServiceDiscoveryChangeListener the service chagne event listener}
*
* @param serviceName the name of service that is required to be listened
* @param listener {@link ServiceDiscoveryChangeListener the service change event listener}
*/
void registerListener(String serviceName, ServiceDiscoveryChangeListener listener);

/**
* The priority of current {@link ServiceDiscovery}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,14 @@
public interface ServiceInstance {

/**
* The name of service that current instance belongs to
* The id of the registered service instance.
*
* @return nullable
*/
String getId();

/**
* The name of service that current instance belongs to.
*
* @return non-null
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,50 @@
package org.apache.dubbo.registry.client;

/**
* The common interface to register and deregister for a service registry
* The common interface to register and unregister for a service registry
*
* @since 2.7.2
*/
public interface ServiceRegistry {

/**
* A human-readable description of the implementation
*
* @return The description.
*/
String toString();

/**
* Registers an instance of {@link ServiceInstance}.
*
* @param serviceInstance an instance of {@link ServiceInstance} to be registered
* @return If success, return <code>true</code>, or <code>false</code>
* @throws RuntimeException if failed
*/
boolean register(ServiceInstance serviceInstance);
void register(ServiceInstance serviceInstance) throws RuntimeException;

/**
* Deregisters an instance of {@link ServiceInstance}.
* Updates the registered {@link ServiceInstance}.
*
* @param serviceInstance the registered {@link ServiceInstance}
* @throws RuntimeException if failed
*/
void update(ServiceInstance serviceInstance) throws RuntimeException;

/**
* Unregisters an instance of {@link ServiceInstance}.
*
* @param serviceInstance an instance of {@link ServiceInstance} to be deregistered
* @return If success, return <code>true</code>, or <code>false</code>
* @throws RuntimeException if failed
*/
void unregister(ServiceInstance serviceInstance) throws RuntimeException;

/**
* Starts the ServiceRegistry. This is a lifecycle method.
*/
void deregister(ServiceInstance serviceInstance);
void start();

/**
* Closes the ServiceRegistry. This is a lifecycle method.
* Stops the ServiceRegistry. This is a lifecycle method.
*/
void close();
void stop();
}
Loading

0 comments on commit 80140a7

Please sign in to comment.