Skip to content

Commit

Permalink
[CELEBORN-567] Timeout workers/app need consider long leader election…
Browse files Browse the repository at this point in the history
… period (apache#1474)
  • Loading branch information
RexXiong authored May 6, 2023
1 parent c0a9578 commit 78a32fe
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ public class HAHelper {

public static boolean checkShouldProcess(
RpcCallContext context, AbstractMetaManager masterStatusSystem) {
if ((masterStatusSystem instanceof HAMasterMetaManager)) {
HARaftServer ratisServer = ((HAMasterMetaManager) masterStatusSystem).getRatisServer();
HARaftServer ratisServer = getRatisServer(masterStatusSystem);
if (ratisServer != null) {
if (ratisServer.isLeader()) {
return true;
}
Expand All @@ -56,6 +56,33 @@ public static boolean checkShouldProcess(
return true;
}

public static long getWorkerTimeoutDeadline(AbstractMetaManager masterStatusSystem) {
HARaftServer ratisServer = getRatisServer(masterStatusSystem);
if (ratisServer != null) {
return ratisServer.getWorkerTimeoutDeadline();
} else {
return -1;
}
}

public static long getAppTimeoutDeadline(AbstractMetaManager masterStatusSystem) {
HARaftServer ratisServer = getRatisServer(masterStatusSystem);
if (ratisServer != null) {
return ratisServer.getAppTimeoutDeadline();
} else {
return -1;
}
}

public static HARaftServer getRatisServer(AbstractMetaManager masterStatusSystem) {
if ((masterStatusSystem instanceof HAMasterMetaManager)) {
HARaftServer ratisServer = ((HAMasterMetaManager) masterStatusSystem).getRatisServer();
return ratisServer;
}

return null;
}

public static ByteString convertRequestToByteString(ResourceProtos.ResourceRequest request) {
byte[] requestBytes = request.toByteArray();
return ByteString.copyFrom(requestBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ static long nextCallId() {
private final ReentrantReadWriteLock roleCheckLock = new ReentrantReadWriteLock();
private Optional<RaftProtos.RaftPeerRole> cachedPeerRole = Optional.empty();
private Optional<String> cachedLeaderPeerRpcEndpoint = Optional.empty();
private final CelebornConf conf;
private long workerTimeoutDeadline;
private long appTimeoutDeadline;

/**
* Returns an Master Ratis server.
Expand All @@ -116,7 +119,9 @@ private HARaftServer(
this.raftPeerId = localRaftPeerId;
this.raftGroup = RaftGroup.valueOf(RAFT_GROUP_ID, raftPeers);
this.masterStateMachine = getStateMachine();
this.conf = conf;
RaftProperties serverProperties = newRaftProperties(conf);
setDeadlineTime(Integer.MAX_VALUE, Integer.MAX_VALUE); // for default
this.server =
RaftServer.newBuilder()
.setServerId(this.raftPeerId)
Expand Down Expand Up @@ -481,6 +486,22 @@ public void updateServerRole() {
private void setServerRole(RaftProtos.RaftPeerRole currentRole, String leaderPeerRpcEndpoint) {
this.roleCheckLock.writeLock().lock();
try {
boolean leaderChanged = false;
if (RaftProtos.RaftPeerRole.LEADER == currentRole && !checkCachedPeerRoleIsLeader()) {
leaderChanged = true;
setDeadlineTime(conf.workerHeartbeatTimeout(), conf.appHeartbeatTimeoutMs());
} else if (RaftProtos.RaftPeerRole.LEADER != currentRole && checkCachedPeerRoleIsLeader()) {
leaderChanged = true;
setDeadlineTime(Integer.MAX_VALUE, Integer.MAX_VALUE); // for revoke
}

if (leaderChanged) {
LOG.warn(
"Raft Role changed, CurrentNode Role: {}, Leader: {}",
currentRole,
leaderPeerRpcEndpoint);
}

this.cachedPeerRole = Optional.ofNullable(currentRole);
this.cachedLeaderPeerRpcEndpoint = Optional.ofNullable(leaderPeerRpcEndpoint);
} finally {
Expand Down Expand Up @@ -522,4 +543,17 @@ void stepDown() {
LOG.warn("Step down leader failed!", e);
}
}

public void setDeadlineTime(long increaseWorkerTime, long increaseAppTime) {
this.workerTimeoutDeadline = System.currentTimeMillis() + increaseWorkerTime;
this.appTimeoutDeadline = System.currentTimeMillis() + increaseAppTime;
}

public long getWorkerTimeoutDeadline() {
return workerTimeoutDeadline;
}

public long getAppTimeoutDeadline() {
return appTimeoutDeadline;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,11 @@ private[celeborn] class Master(

private def timeoutDeadWorkers() {
val currentTime = System.currentTimeMillis()
// Need increase timeout deadline to avoid long time leader election period
if (HAHelper.getWorkerTimeoutDeadline(statusSystem) > currentTime) {
return
}

var ind = 0
workersSnapShot.asScala.foreach { worker =>
if (worker.lastHeartbeat < currentTime - workerHeartbeatTimeoutMs
Expand All @@ -350,6 +355,10 @@ private[celeborn] class Master(

private def timeoutDeadApplications(): Unit = {
val currentTime = System.currentTimeMillis()
// Need increase timeout deadline to avoid long time leader election period
if (HAHelper.getAppTimeoutDeadline(statusSystem) > currentTime) {
return
}
statusSystem.appHeartbeatTime.keySet().asScala.foreach { key =>
if (statusSystem.appHeartbeatTime.get(key) < currentTime - appHeartbeatTimeoutMs) {
logWarning(s"Application $key timeout, trigger applicationLost event.")
Expand Down

0 comments on commit 78a32fe

Please sign in to comment.