Skip to content

Commit

Permalink
Merge pull request #12 from Synex-wh/fix_online_connect
Browse files Browse the repository at this point in the history
fix online notify connect error
  • Loading branch information
atellwu authored Aug 9, 2019
2 parents b12cb94 + 788de7f commit 003e487
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.alipay.sofa.registry.server.data.event.handler.AfterWorkingProcessHandler;
import com.alipay.sofa.registry.server.data.node.DataNodeStatus;
import com.alipay.sofa.registry.server.data.util.LocalServerStatusEnum;
import com.google.common.collect.Sets;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.Collection;
Expand Down Expand Up @@ -249,13 +250,15 @@ private void updateDataServerStatus() {
Map<String, LocalServerStatusEnum> map = nodeStatusMap.get(curVersion.get());
if (map != null) {
Set<String> ips = map.keySet();
if (!ips.containsAll(newDataServerChangeItem.getServerMap()
.get(dataServerConfig.getLocalDataCenter()).keySet())) {
LOGGER.info(
"nodeStatusMap not contains all push list,nodeStatusMap {} push {}",
nodeStatusMap,
newDataServerChangeItem.getServerMap()
.get(dataServerConfig.getLocalDataCenter()).keySet());
Set<String> itemIps = newDataServerChangeItem.getServerMap()
.get(dataServerConfig.getLocalDataCenter()).keySet();
if (!ips.containsAll(itemIps)) {

LOGGER
.info(
"nodeStatusMap not contains all push list,nodeStatusMap {},push {},diff1{},diff2{}",
nodeStatusMap, itemIps, Sets.difference(ips, itemIps),
Sets.difference(itemIps, ips));
return;
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ private void connectDataServer(String dataCenter, String ip) {
"[DataServerChangeEventHandler] connect dataserver %s in %s failed five times,dataServer will not work,please check connect!",
ip, dataCenter));
}
LOGGER.info(
"[DataServerChangeEventHandler] connect dataserver in {} success,remote={},local={}",
dataCenter, conn.getRemoteAddress(), conn.getLocalAddress());
//maybe get dataNode from metaServer,current has not start! register dataNode info to factory,wait for connect task next execute
DataServerNodeFactory.register(new DataServerNode(ip, dataCenter, conn), dataServerConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.alipay.sofa.registry.server.data.event.handler;

import com.alipay.remoting.Connection;
import com.alipay.sofa.registry.common.model.CommonResponse;
import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.common.model.dataserver.NotifyFetchDatumRequest;
Expand All @@ -25,6 +26,7 @@
import com.alipay.sofa.registry.consistency.hash.ConsistentHash;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.remoting.bolt.BoltChannel;
import com.alipay.sofa.registry.remoting.exchange.message.Request;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.data.cache.BackupTriad;
Expand Down Expand Up @@ -91,6 +93,8 @@ public class LocalDataServerChangeEventHandler extends

private AtomicBoolean isChanged = new AtomicBoolean(false);

private static final int TRY_COUNT = 5;

@Override
public Class interest() {
return LocalDataServerChangeEvent.class;
Expand Down Expand Up @@ -358,13 +362,33 @@ private void notifyOnline(long changeVersion) {
for (Entry<String, DataServerNode> serverEntry : dataServerNodeMap.entrySet()) {
while (true) {
String ip = serverEntry.getKey();
DataServerNode dataServerNode = serverEntry.getValue();
DataServerNode dataServerNode = DataServerNodeFactory.getDataServerNode(
dataServerConfig.getLocalDataCenter(), ip);

if (dataServerNode == null) {
LOGGER
.warn(
"notify Online dataserver {} has not existed in DataServerNodeFactory!version={}",
ip, changeVersion);
break;
}
try {
if (dataServerNode.getConnection() == null
|| !dataServerNode.getConnection().isFine()) {
final Connection connection = dataServerNode.getConnection();
if (connection == null || !connection.isFine()) {
if (connection == null) {
LOGGER
.warn(
"notify Online dataserver connect not existed,ip={},version={}",
ip, changeVersion);
} else {
LOGGER
.warn(
"notify Online dataserver connect not fine!remote={},local={},version={}",
connection.getRemoteAddress(),
connection.getLocalAddress(), changeVersion);
}
//connect now and registry connect
connectDataServer(dataServerConfig.getLocalDataCenter(), ip);
//maybe get dataNode from metaServer,current has not connected!wait for connect task execute
TimeUtil.randomDelay(1000);
continue;
Expand All @@ -380,8 +404,8 @@ public Object getRequestBody() {

@Override
public URL getRequestUrl() {
return new URL(dataServerNode.getConnection().getRemoteIP(),
dataServerNode.getConnection().getRemotePort());
return new URL(connection.getRemoteIP(), connection
.getRemotePort());
}
}).getResult();
if (response.isSuccess()) {
Expand All @@ -400,5 +424,44 @@ public URL getRequestUrl() {
}
}

/**
* connect specific dataserver
*
* @param dataCenter
* @param ip
*/
private void connectDataServer(String dataCenter, String ip) {
Connection conn = null;
for (int tryCount = 0; tryCount < TRY_COUNT; tryCount++) {
try {
conn = ((BoltChannel) dataNodeExchanger.connect(new URL(ip, dataServerConfig
.getSyncDataPort()))).getConnection();
break;
} catch (Exception e) {
LOGGER.error(
"[LocalDataServerChangeEventHandler] connect dataServer {} in {} error",
ip, dataCenter, e);
TimeUtil.randomDelay(3000);
}
}
if (conn == null || !conn.isFine()) {
LOGGER
.error(
"[LocalDataServerChangeEventHandler] connect dataserver {} in {} failed five times",
ip, dataCenter);
throw new RuntimeException(
String
.format(
"[LocalDataServerChangeEventHandler] connect dataserver %s in %s failed five times,dataServer will not work,please check connect!",
ip, dataCenter));
}
LOGGER
.info(
"[LocalDataServerChangeEventHandler] connect dataserver in {} success,remote={},local={}",
dataCenter, conn.getRemoteAddress(), conn.getLocalAddress());
//maybe get dataNode from metaServer,current has not start! register dataNode info to factory,wait for connect task next execute
DataServerNodeFactory.register(new DataServerNode(ip, dataCenter, conn),
dataServerConfig);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
*/
package com.alipay.sofa.registry.server.data.remoting.dataserver;

import com.alipay.remoting.Connection;
import com.alipay.sofa.registry.consistency.hash.ConsistentHash;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.data.node.DataServerNode;
import com.google.common.collect.Lists;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -24,12 +30,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

import com.alipay.remoting.Connection;
import com.alipay.sofa.registry.consistency.hash.ConsistentHash;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.data.node.DataServerNode;
import com.google.common.collect.Lists;

/**
* the factory to hold other dataservers and connection connected to them
*
Expand Down Expand Up @@ -61,10 +61,15 @@ public class DataServerNodeFactory {
*/
public static void register(DataServerNode dataServerNode, DataServerConfig dataServerConfig) {
String dataCenter = dataServerNode.getDataCenter();
if (!MAP.containsKey(dataCenter)) {
MAP.put(dataCenter, new ConcurrentHashMap<>());
Map<String, DataServerNode> dataMap = MAP.get(dataCenter);
if (dataMap == null) {
Map<String, DataServerNode> newMap = new ConcurrentHashMap<>();
dataMap = MAP.putIfAbsent(dataCenter, newMap);
if (dataMap == null) {
dataMap = newMap;
}
}
MAP.get(dataCenter).put(dataServerNode.getIp(), dataServerNode);
dataMap.put(dataServerNode.getIp(), dataServerNode);
refreshConsistent(dataCenter, dataServerConfig);
}

Expand Down

0 comments on commit 003e487

Please sign in to comment.