Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Commit

Permalink
Fix #1923, Fix chaos in collecting SSM node info
Browse files Browse the repository at this point in the history
  • Loading branch information
littlezhou committed Sep 12, 2018
1 parent 6ebd540 commit ea6f083
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public static void main(String[] args) throws IOException {
}
String agentAddress = AgentUtils.getAgentAddress(conf);
LOG.info("Agent address: " + agentAddress);
RegisterNewAgent.getInstance(agentAddress + "-" + getDateString());
RegisterNewAgent.getInstance("SSMAgent@" + agentAddress.replaceAll(":.*$", ""));

HadoopUtil.setSmartConfByHadoop(conf);
agent.authentication(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,16 @@
public class ActiveServerInfo extends NodeInfo {
private static ActiveServerInfo inst;

private ActiveServerInfo(String id, String location, ExecutorType executorType) {
super(id, location, executorType);
private ActiveServerInfo(String id, String location) {
super(id, location, ExecutorType.LOCAL);
}

public static ActiveServerInfo getInstance() {
assert inst != null;
return inst;
}

public static void setInstance(String id, String location,
ExecutorType executorType) {
inst = new ActiveServerInfo(id, location, executorType);
public static void setInstance(String id, String location) {
inst = new ActiveServerInfo(id, location);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -357,19 +357,15 @@ private boolean dispatch(LaunchCmdlet cmdlet) {
}

NodeCmdletMetrics metrics = regNodeInfos.get(nodeId);
String host = "";
if (metrics != null) {
host = metrics.getNodeInfo().getHost();
metrics.incCmdletsInExecution();
}
updateCmdActionStatus(cmdlet, host);
updateCmdActionStatus(cmdlet, nodeId);
dispatchedToSrvs.put(cmdlet.getCmdletId(), selected.getExecutorType());

if (logDispResult) {
LOG.info(
String.format(
"Dispatching cmdlet->[%s] to executor service %s : %s",
cmdlet.getCmdletId(), selected.getExecutorType(), host));
LOG.info(String.format("Dispatching cmdlet->[%s] to executor: %s",
cmdlet.getCmdletId(), nodeId));
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,15 @@ public HazelcastExecutorService(CmdletManager cmdletManager) {
private void initChannels() {
for (Member worker : HazelcastUtil.getWorkerMembers(instance)) {
ITopic<Serializable> topic = instance.getTopic(WORKER_TOPIC_PREFIX + worker.getUuid());
this.masterToWorkers.put(worker.getUuid(), topic);
this.masterToWorkers.put(getMemberNodeId(worker), topic);
}
}

public List<StandbyServerInfo> getStandbyServers() {
List<StandbyServerInfo> infos = new ArrayList<>();
for (Member worker : HazelcastUtil.getWorkerMembers(instance)) {
infos.add(new StandbyServerInfo(worker.getUuid(), worker.getAddress().toString()));
infos.add(new StandbyServerInfo(getMemberNodeId(worker),
worker.getAddress().getHost() + ":" + worker.getAddress().getPort()));
}
return infos;
}
Expand All @@ -102,7 +103,12 @@ public List<NodeInfo> getNodesInfo() {
}

private NodeInfo memberToNodeInfo(Member member) {
return new StandbyServerInfo(member.getUuid(), member.getAddress().toString());
return new StandbyServerInfo(getMemberNodeId(member),
member.getAddress().getHost() + ":" + member.getAddress().getPort());
}

private String getMemberNodeId(Member member) {
return "StandbySSMServer@" + member.getAddress().getHost();
}

@Override
Expand Down Expand Up @@ -155,20 +161,22 @@ public ClusterMembershipListener(HazelcastInstance instance) {
@Override
public void memberAdded(MembershipEvent membershipEvent) {
Member worker = membershipEvent.getMember();
if (!masterToWorkers.containsKey(worker.getUuid())) {
String id = getMemberNodeId(worker);
if (!masterToWorkers.containsKey(id)) {
ITopic<Serializable> topic = instance.getTopic(WORKER_TOPIC_PREFIX + worker.getUuid());
masterToWorkers.put(worker.getUuid(), topic);
members.put(worker.getUuid(), worker);
masterToWorkers.put(id, topic);
members.put(id, worker);
EngineEventBus.post(new AddNodeMessage(memberToNodeInfo(worker)));
}
}

@Override
public void memberRemoved(MembershipEvent membershipEvent) {
Member member = membershipEvent.getMember();
if (masterToWorkers.containsKey(member.getUuid())) {
masterToWorkers.get(member.getUuid()).destroy();
members.remove(member.getUuid());
String id = getMemberNodeId(member);
if (masterToWorkers.containsKey(id)) {
masterToWorkers.get(id).destroy();
members.remove(id);
EngineEventBus.post(new RemoveNodeMessage(memberToNodeInfo(member)));
}
//Todo: recover
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@

public class LocalCmdletExecutorService extends CmdletExecutorService implements StatusReporter {
private static final Logger LOG = LoggerFactory.getLogger(LocalCmdletExecutorService.class);
private static final String ACTIVE_SERVER_ID = "ActiveSSMServer";
private static final String ACTIVE_SERVER_ID = "ActiveSSMServer@";
private SmartConf conf;
private CmdletFactory cmdletFactory;
private CmdletExecutor cmdletExecutor;
Expand All @@ -61,7 +61,8 @@ public LocalCmdletExecutorService(SmartConf smartConf, CmdletManager cmdletManag
this.executorService.scheduleAtFixedRate(
statusReportTask, 1000, reportPeriod, TimeUnit.MILLISECONDS);

ActiveServerInfo.setInstance(ACTIVE_SERVER_ID, getActiveServerAddress(), ExecutorType.LOCAL);
ActiveServerInfo.setInstance(ACTIVE_SERVER_ID + getActiveServerAddress(),
getActiveServerAddress());
EngineEventBus.post(new AddNodeMessage(ActiveServerInfo.getInstance()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public static String getFullPath(ActorSystem system, ActorPath path) {
}

public static String getHostPort(ActorRef ref) {
return ref.path().address().hostPort();
return ref.path().address().hostPort().replaceFirst("^.*@", "");
}

public static Cancellable repeatActionUntil(ActorSystem system,
Expand Down

0 comments on commit ea6f083

Please sign in to comment.