Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

使redis支持password。 #135

Merged
merged 2 commits into from
May 27, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.dubbo.common.utils.StringUtils;
import org.apache.commons.pool.impl.GenericObjectPool;

import redis.clients.jedis.Jedis;
Expand All @@ -48,10 +49,11 @@
import com.alibaba.dubbo.registry.NotifyListener;
import com.alibaba.dubbo.registry.support.FailbackRegistry;
import com.alibaba.dubbo.rpc.RpcException;
import redis.clients.jedis.exceptions.JedisConnectionException;

/**
* RedisRegistry
*
*
* @author william.liangf
*/
public class RedisRegistry extends FailbackRegistry {
Expand All @@ -65,26 +67,26 @@ public class RedisRegistry extends FailbackRegistry {
private final ScheduledExecutorService expireExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryExpireTimer", true));

private final ScheduledFuture<?> expireFuture;

private final String root;

private final Map<String, JedisPool> jedisPools = new ConcurrentHashMap<String, JedisPool>();

private final ConcurrentMap<String, Notifier> notifiers = new ConcurrentHashMap<String, Notifier>();

private final int reconnectPeriod;

private final int expirePeriod;

private volatile boolean admin = false;

private boolean replicate;

public RedisRegistry(URL url) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
throw new IllegalStateException("registry address == null");
}
GenericObjectPool.Config config = new GenericObjectPool.Config();
config.testOnBorrow = url.getParameter("test.on.borrow", true);
config.testOnReturn = url.getParameter("test.on.return", false);
Expand All @@ -103,19 +105,22 @@ public RedisRegistry(URL url) {
config.timeBetweenEvictionRunsMillis = url.getParameter("time.between.eviction.runs.millis", 0);
if (url.getParameter("min.evictable.idle.time.millis", 0) > 0)
config.minEvictableIdleTimeMillis = url.getParameter("min.evictable.idle.time.millis", 0);

String cluster = url.getParameter("cluster", "failover");
if (! "failover".equals(cluster) && ! "replicate".equals(cluster)) {
throw new IllegalArgumentException("Unsupported redis cluster: " + cluster + ". The redis cluster only supported failover or replicate.");
throw new IllegalArgumentException("Unsupported redis cluster: " + cluster + ". The redis cluster only supported failover or replicate.");
}
replicate = "replicate".equals(cluster);

List<String> addresses = new ArrayList<String>();
addresses.add(url.getAddress());
String[] backups = url.getParameter(Constants.BACKUP_KEY, new String[0]);
if (backups != null && backups.length > 0) {
addresses.addAll(Arrays.asList(backups));
}

// 增加Redis密码支持
String password = url.getPassword();
for (String address : addresses) {
int i = address.indexOf(':');
String host;
Expand All @@ -127,10 +132,16 @@ public RedisRegistry(URL url) {
host = address;
port = DEFAULT_REDIS_PORT;
}
this.jedisPools.put(address, new JedisPool(config, host, port,
url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT)));
if (StringUtils.isEmpty(password)) {
this.jedisPools.put(address, new JedisPool(config, host, port,
url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT)));
} else {
// 使用密码连接。 此处要求备用redis与主要redis使用相同的密码
this.jedisPools.put(address, new JedisPool(config, host, port,
url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT), password));
}
}

this.reconnectPeriod = url.getParameter(Constants.REGISTRY_RECONNECT_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD);
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (! group.startsWith(Constants.PATH_SEPARATOR)) {
Expand All @@ -140,7 +151,7 @@ public RedisRegistry(URL url) {
group = group + Constants.PATH_SEPARATOR;
}
this.root = group;

this.expirePeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT);
this.expireFuture = expireExecutor.scheduleWithFixedDelay(new Runnable() {
public void run() {
Expand All @@ -152,10 +163,11 @@ public void run() {
}
}, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);
}

private void deferExpired() {
for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
JedisPool jedisPool = entry.getValue();
boolean isBroken = false;
try {
Jedis jedis = jedisPool.getResource();
try {
Expand All @@ -170,18 +182,24 @@ private void deferExpired() {
if (admin) {
clean(jedis);
}
if (! replicate) {
break;//  如果服务器端已同步数据,只需写入单台机器
if (!replicate) {
break;// 如果服务器端已同步数据,只需写入单台机器
}
} catch (JedisConnectionException e){
isBroken = true;
} finally {
jedisPool.returnResource(jedis);
if(isBroken){
jedisPool.returnBrokenResource(jedis);
} else {
jedisPool.returnResource(jedis);
}
}
} catch (Throwable t) {
logger.warn("Failed to write provider heartbeat to redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
}
}
}

// 监控中心负责删除过期脏数据
private void clean(Jedis jedis) {
Set<String> keys = jedis.keys(root + Constants.ANY_VALUE);
Expand All @@ -202,7 +220,7 @@ private void clean(Jedis jedis) {
logger.warn("Delete expired key: " + key + " -> value: " + entry.getKey() + ", expire: " + new Date(expire) + ", now: " + new Date(now));
}
}
}
}
}
if (delete) {
jedis.publish(key, Constants.UNREGISTER);
Expand All @@ -214,16 +232,20 @@ private void clean(Jedis jedis) {

public boolean isAvailable() {
for (JedisPool jedisPool : jedisPools.values()) {
Jedis jedis = jedisPool.getResource();
boolean isBroken = false;
try {
Jedis jedis = jedisPool.getResource();
try {
if (jedis.isConnected()) {
return true; // 至少需单台机器可用
}
} finally {
if (jedis.isConnected()) {
return true; // 至少需单台机器可用
}
} catch (JedisConnectionException e) {
isBroken = true;
} finally {
if (isBroken) {
jedisPool.returnBrokenResource(jedis);
} else {
jedisPool.returnResource(jedis);
}
} catch (Throwable t) {
}
}
return false;
Expand Down Expand Up @@ -265,15 +287,22 @@ public void doRegister(URL url) {
JedisPool jedisPool = entry.getValue();
try {
Jedis jedis = jedisPool.getResource();
boolean isBroken = false;
try {
jedis.hset(key, value, expire);
jedis.publish(key, Constants.REGISTER);
success = true;
if (! replicate) {
break; //  如果服务器端已同步数据,只需写入单台机器
break; // 如果服务器端已同步数据,只需写入单台机器
}
} catch (JedisConnectionException e){
isBroken = true;
} finally {
jedisPool.returnResource(jedis);
if(isBroken){
jedisPool.returnBrokenResource(jedis);
} else {
jedisPool.returnResource(jedis);
}
}
} catch (Throwable t) {
exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
Expand All @@ -298,15 +327,22 @@ public void doUnregister(URL url) {
JedisPool jedisPool = entry.getValue();
try {
Jedis jedis = jedisPool.getResource();
boolean isBroken = false;
try {
jedis.hdel(key, value);
jedis.publish(key, Constants.UNREGISTER);
success = true;
if (! replicate) {
break; //  如果服务器端已同步数据,只需写入单台机器
break; // 如果服务器端已同步数据,只需写入单台机器
}
} catch (JedisConnectionException e){
isBroken = true;
} finally {
jedisPool.returnResource(jedis);
if(isBroken){
jedisPool.returnBrokenResource(jedis);
} else {
jedisPool.returnResource(jedis);
}
}
} catch (Throwable t) {
exception = new RpcException("Failed to unregister service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
Expand All @@ -320,7 +356,7 @@ public void doUnregister(URL url) {
}
}
}

@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
String service = toServicePath(url);
Expand All @@ -339,6 +375,7 @@ public void doSubscribe(final URL url, final NotifyListener listener) {
JedisPool jedisPool = entry.getValue();
try {
Jedis jedis = jedisPool.getResource();
boolean isBroken = false;
try {
if (service.endsWith(Constants.ANY_VALUE)) {
admin = true;
Expand All @@ -363,8 +400,14 @@ public void doSubscribe(final URL url, final NotifyListener listener) {
}
success = true;
break; // 只需读一个服务器的数据
} catch (JedisConnectionException e){
isBroken = true;
} finally {
jedisPool.returnResource(jedis);
if(isBroken){
jedisPool.returnBrokenResource(jedis);
} else {
jedisPool.returnResource(jedis);
}
}
} catch(Throwable t) { // 尝试下一个服务器
exception = new RpcException("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
Expand Down Expand Up @@ -470,7 +513,7 @@ private String toCategoryPath(URL url) {
}

private class NotifySub extends JedisPubSub {

private final JedisPool jedisPool;

public NotifySub(JedisPool jedisPool) {
Expand All @@ -482,14 +525,21 @@ public void onMessage(String key, String msg) {
if (logger.isInfoEnabled()) {
logger.info("redis event: " + key + " = " + msg);
}
if (msg.equals(Constants.REGISTER)
if (msg.equals(Constants.REGISTER)
|| msg.equals(Constants.UNREGISTER)) {
try {
Jedis jedis = jedisPool.getResource();
boolean isBroken = false;
try {
doNotify(jedis, key);
} catch (JedisConnectionException e){
isBroken = true;
} finally {
jedisPool.returnResource(jedis);
if(isBroken){
jedisPool.returnBrokenResource(jedis);
} else {
jedisPool.returnResource(jedis);
}
}
} catch (Throwable t) { // TODO 通知失败没有恢复机制保障
logger.error(t.getMessage(), t);
Expand Down Expand Up @@ -527,23 +577,23 @@ private class Notifier extends Thread {
private volatile Jedis jedis;

private volatile boolean first = true;

private volatile boolean running = true;

private final AtomicInteger connectSkip = new AtomicInteger();

private final AtomicInteger connectSkiped = new AtomicInteger();

private final Random random = new Random();

private volatile int connectRandom;

private void resetSkip() {
connectSkip.set(0);
connectSkiped.set(0);
connectRandom = 0;
}

private boolean isSkip() {
int skip = connectSkip.get(); // 跳过次数增长
if (skip >= 10) { // 如果跳过次数增长超过10,取随机数
Expand All @@ -560,13 +610,13 @@ private boolean isSkip() {
connectRandom = 0;
return false;
}

public Notifier(String service) {
super.setDaemon(true);
super.setName("DubboRedisSubscribe");
this.service = service;
}

@Override
public void run() {
while (running) {
Expand Down Expand Up @@ -618,7 +668,7 @@ public void run() {
}
}
}

public void shutdown() {
try {
running = false;
Expand All @@ -627,7 +677,7 @@ public void shutdown() {
logger.warn(t.getMessage(), t);
}
}

}

}
}
Loading