Commit

Changes for executor actor recovery using ray's fault tolerance. (#391)
KiranP-d11 authored Dec 15, 2023
1 parent 69e38ab commit 386ea3c
Showing 3 changed files with 19 additions and 13 deletions.
@@ -60,8 +60,8 @@ public static ActorHandle<RayDPExecutor> createExecutorActor(
     if (placementGroup != null) {
       creator.setPlacementGroup(placementGroup, bundleIndex);
     }
-    creator.setMaxRestarts(3);
-    creator.setMaxTaskRetries(3);
+    creator.setMaxRestarts(-1);
+    creator.setMaxTaskRetries(-1);
     creator.setMaxConcurrency(2);
     return creator.remote();
   }
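The change above switches the executor actor from a limit of 3 restarts and 3 task retries to -1, i.e. unlimited: Ray keeps restarting the actor after unexpected failures (process crash, OOM, node loss) and retries in-flight actor tasks on the restarted instance. Below is a minimal sketch of the same settings against Ray's Java API; the Counter actor and the main method are illustrative stand-ins, not RayDP code, and assume a Ray version that exposes setMaxTaskRetries on the actor creator, as the patched code does.

import io.ray.api.ActorHandle;
import io.ray.api.Ray;

public class FaultTolerantActorSketch {

  // Toy actor standing in for RayDPExecutor.
  public static class Counter {
    private int value = 0;

    public int increment() {
      return ++value;
    }
  }

  public static void main(String[] args) {
    Ray.init();
    // Same settings as the patched createExecutorActor: unlimited restarts and
    // unlimited task retries, so a crashed actor is brought back and its pending
    // tasks are re-submitted instead of failing the whole job.
    ActorHandle<Counter> counter = Ray.actor(Counter::new)
        .setMaxRestarts(-1)
        .setMaxTaskRetries(-1)
        .setMaxConcurrency(2)
        .remote();
    int value = counter.task(Counter::increment).remote().get();
    System.out.println("counter = " + value);
    Ray.shutdown();
  }
}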
@@ -99,15 +99,15 @@ private[spark] class ApplicationInfo(
     addressToExecutorId(address) = executorId
   }

-  def kill(address: RpcAddress): Boolean = {
+  def kill(address: RpcAddress, shutdownActor: Boolean): Boolean = {
     if (addressToExecutorId.contains(address)) {
-      kill(addressToExecutorId(address))
+      kill(addressToExecutorId(address), shutdownActor)
     } else {
       false
     }
   }

-  def kill(executorId: String): Boolean = {
+  def kill(executorId: String, shutdownActor: Boolean): Boolean = {
     if (executors.contains(executorId)) {
       val exec = executors(executorId)
       if (exec.registered) {
@@ -116,12 +116,18 @@
         removedExecutors += executors(executorId)
         executors -= executorId
         coresGranted -= exec.cores
-        // Previously we use Ray.kill(true) here, which prevents executors from restarting.
-        // But we want executors died accidentally to restart, so we use Ray.exitActor now.
-        // Because if ray actor is already dead, it probably died from node failure,
-        // and this method won't be executed, so it can restart.
-        // Otherwise, it exits intentionally here and won't restart.
-        RayExecutorUtils.exitExecutor(executorIdToHandler(executorId))
+        if (shutdownActor) {
+          // Previously we called exitExecutor in all scenarios, but that causes the
+          // following problem when an executor goes down unexpectedly (e.g. due to OOM):
+          // - Executor E1 dies at time T0, say because of an OOM error.
+          // - We try to kill it by firing a stop call on the E1 actor.
+          // - Since the actor is unavailable, the stop task fails for E1.
+          // - Meanwhile, Ray restarts the lost executor E1.
+          // - The failed stop task is retried because task retries are configured.
+          // - The retried stop task lands on the newly recovered executor.
+          // - The recovered executor exits as if the user had intentionally stopped it.
+          RayExecutorUtils.exitExecutor(executorIdToHandler(executorId))
+        }
         executorIdToHandler -= executorId
         true
       } else {
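The new shutdownActor flag separates two situations: an intentional kill, where the app master still asks the actor to exit so Ray will not bring it back, and an accidental disconnect, where nothing is sent to the already-dead actor, leaving Ray's restart machinery free to recover it without a retried stop call killing the recovered executor. The sketch below shows the underlying Ray primitive with a toy actor rather than RayDP's RayExecutorUtils: Ray.exitActor() marks the death as intentional, which suppresses the restart that maxRestarts = -1 would otherwise trigger. The Worker class and main method are illustrative assumptions, not code from this repository.

import io.ray.api.ActorHandle;
import io.ray.api.Ray;

public class IntentionalExitSketch {

  // Toy actor standing in for a RayDP executor.
  public static class Worker {
    public String ping() {
      return "alive";
    }

    public String stop() {
      // Intentional exit: Ray treats this as a deliberate shutdown and does not
      // restart the actor, even though it was created with maxRestarts = -1.
      // Any other failure (OOM, crash, node loss) would still trigger a restart.
      Ray.exitActor();
      return "stopping"; // not normally reached; kept for a concrete return type
    }
  }

  public static void main(String[] args) {
    Ray.init();
    ActorHandle<Worker> worker = Ray.actor(Worker::new)
        .setMaxRestarts(-1)
        .remote();
    System.out.println(worker.task(Worker::ping).remote().get());
    // Analogue of kill(executorId, shutdownActor = true): ask the actor to exit
    // for good. On a plain disconnect (shutdownActor = false) this call is skipped
    // and Ray remains free to restart the actor.
    worker.task(Worker::stop).remote();
    Ray.shutdown();
  }
}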
@@ -183,7 +183,7 @@ class RayAppMaster(host: String,
       assert(appInfo != null && appInfo.id == appId)
       var success = true
       for (executorId <- executorIds) {
-        if (!appInfo.kill(executorId)) {
+        if (!appInfo.kill(executorId, shutdownActor = true)) {
           success = false
         }
       }
@@ -210,7 +210,7 @@
     }

     override def onDisconnected(remoteAddress: RpcAddress): Unit = {
-      appInfo.kill(remoteAddress)
+      appInfo.kill(remoteAddress, shutdownActor = false)
     }

     override def onStop(): Unit = {
