[SPARK-16554][CORE] Automatically Kill Executors and Nodes when they are Blacklisted

## What changes were proposed in this pull request?

In SPARK-8425, we introduced a mechanism for blacklisting executors and nodes (hosts). After a certain number of failures, these resources would be "blacklisted" and no further work would be assigned to them for some period of time.

In some scenarios, it is better to fail fast and simply kill these unreliable resources. This change proposes to do so by having the BlacklistTracker kill unreliable resources when they would otherwise be "blacklisted".

To be thread safe, this code depends on the CoarseGrainedSchedulerBackend sending a message to the driver endpoint to do the actual killing. This also helps prevent a race that would permit work to begin on a resource (executor or node) between the time the resource is marked for killing and the time at which it is finally killed.
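
To make the serialization argument concrete, here is a small standalone sketch (toy types, not Spark code) of the pattern this relies on: the component that decides a host is bad only enqueues a message, and a single-threaded event loop that also owns executor registration performs the kill, so a registration on a host can never interleave with the kill of that host.

```scala
// Toy model of the "kill via the driver's event loop" design; none of these types are Spark's.
import java.util.concurrent.LinkedBlockingQueue

object KillViaEventLoopSketch {
  sealed trait Msg
  final case class RegisterExecutor(id: String, host: String) extends Msg
  final case class KillExecutorsOnHost(host: String) extends Msg
  case object Shutdown extends Msg

  def main(args: Array[String]): Unit = {
    val inbox = new LinkedBlockingQueue[Msg]()
    val loop = new Thread(new Runnable {
      override def run(): Unit = {
        var executors = Map.empty[String, String] // executor id -> host, touched only by this thread
        var running = true
        while (running) {
          inbox.take() match {
            case RegisterExecutor(id, host) =>
              executors += (id -> host)
              println(s"registered $id on $host")
            case KillExecutorsOnHost(host) =>
              val victims = executors.collect { case (id, h) if h == host => id }
              executors --= victims
              println(s"killed on $host: ${victims.mkString(", ")}")
            case Shutdown =>
              running = false
          }
        }
      }
    })
    loop.start()

    // Callers never touch executor state directly; they only enqueue messages.
    inbox.put(RegisterExecutor("exec-1", "host-a"))
    inbox.put(KillExecutorsOnHost("host-a")) // handled strictly after the registration above
    inbox.put(Shutdown)
    loop.join()
  }
}
```

The actual change achieves the same effect with Spark's existing RPC event loop, via `driverEndpoint.send(KillExecutorsOnHost(host))` in the CoarseGrainedSchedulerBackend diff below.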

## How was this patch tested?

./dev/run-tests
Ran https://github.com/jsoltren/jose-utils/blob/master/blacklist/test-blacklist.sh, and checked logs to see executors and nodes being killed.

Testing can likely be improved here; suggestions welcome.

Author: José Hiram Soltren <[email protected]>

Closes apache#16650 from jsoltren/SPARK-16554-submit.
José Hiram Soltren authored and Thomas Graves committed May 24, 2017
1 parent 426b9e3 commit 8a270a9
Showing 11 changed files with 248 additions and 26 deletions.
@@ -54,11 +54,30 @@ private[spark] trait ExecutorAllocationClient {

/**
* Request that the cluster manager kill the specified executors.
*
* When asking the executor to be replaced, the executor loss is considered a failure, and
* killed tasks that are running on the executor will count towards the failure limits. If no
* replacement is being requested, then the tasks will not count towards the limit.
*
* @param executorIds identifiers of executors to kill
* @param replace whether to replace the killed executors with new ones, default false
* @param force whether to force kill busy executors, default false
* @return the ids of the executors acknowledged by the cluster manager to be removed.
*/
def killExecutors(executorIds: Seq[String]): Seq[String]
def killExecutors(
executorIds: Seq[String],
replace: Boolean = false,
force: Boolean = false): Seq[String]

/**
* Request that the cluster manager kill every executor on the specified host.
* Results in a call to killExecutors for each executor on the host, with the replace
* and force arguments set to true.
* @return whether the request is acknowledged by the cluster manager.
*/
def killExecutorsOnHost(host: String): Boolean

/**
* Request that the cluster manager kill the specified executor.
* @return whether the request is acknowledged by the cluster manager.
*/
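
For orientation, a hypothetical caller of the extended trait above might look like the following sketch; the object name and identifiers are made up, and because ExecutorAllocationClient is private[spark] this only compiles from inside Spark's own packages (as the BlacklistTracker change below does).

```scala
// Hypothetical caller of the extended ExecutorAllocationClient API; names are illustrative only.
package org.apache.spark.scheduler

import org.apache.spark.ExecutorAllocationClient

private[spark] object KillApiExample {
  def evictUnreliableResources(client: ExecutorAllocationClient): Unit = {
    // Kill one executor, request a replacement, and kill it even if it is running tasks.
    // With replace = true, tasks killed this way count towards the task failure limits.
    val removed: Seq[String] = client.killExecutors(Seq("exec-42"), replace = true, force = true)
    println(s"cluster manager acknowledged removal of: ${removed.mkString(", ")}")

    // Kill every executor on a host; per the doc above, this is equivalent to calling
    // killExecutors with replace = true and force = true for each executor on that host.
    val acknowledged: Boolean = client.killExecutorsOnHost("bad-host.example.com")
    println(s"host-level kill acknowledged: $acknowledged")
  }
}
```
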
@@ -139,6 +139,11 @@ package object config {
.timeConf(TimeUnit.MILLISECONDS)
.createOptional

private[spark] val BLACKLIST_KILL_ENABLED =
ConfigBuilder("spark.blacklist.killBlacklistedExecutors")
.booleanConf
.createWithDefault(false)

private[spark] val BLACKLIST_LEGACY_TIMEOUT_CONF =
ConfigBuilder("spark.scheduler.executorTaskBlacklistTime")
.internal()
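
As a usage note, the new flag only takes effect when blacklisting itself is enabled (the tracker is only created in that case, per the TaskSchedulerImpl change below). A hypothetical driver program setting both keys might look like this; the application name is made up, and the same settings can be passed to spark-submit with --conf instead.

```scala
// Hypothetical driver program. spark.blacklist.enabled is the pre-existing switch for the
// blacklist feature; spark.blacklist.killBlacklistedExecutors is the flag added here (default false).
import org.apache.spark.{SparkConf, SparkContext}

object BlacklistKillDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("BlacklistKillDemo")
      .set("spark.blacklist.enabled", "true")
      .set("spark.blacklist.killBlacklistedExecutors", "true")
    val sc = new SparkContext(conf)
    try {
      // Run jobs as usual; executors and nodes that get blacklisted will now also be killed.
    } finally {
      sc.stop()
    }
  }
}
```
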
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -50,10 +50,11 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
private[scheduler] class BlacklistTracker (
private val listenerBus: LiveListenerBus,
conf: SparkConf,
allocationClient: Option[ExecutorAllocationClient],
clock: Clock = new SystemClock()) extends Logging {

def this(sc: SparkContext) = {
this(sc.listenerBus, sc.conf)
def this(sc: SparkContext, allocationClient: Option[ExecutorAllocationClient]) = {
this(sc.listenerBus, sc.conf, allocationClient)
}

BlacklistTracker.validateBlacklistConfs(conf)
@@ -173,6 +174,17 @@ private[scheduler] class BlacklistTracker (
listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
executorIdToFailureList.remove(exec)
updateNextExpiryTime()
if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
allocationClient match {
case Some(allocationClient) =>
logInfo(s"Killing blacklisted executor id $exec " +
s"since spark.blacklist.killBlacklistedExecutors is set.")
allocationClient.killExecutors(Seq(exec), true, true)
case None =>
logWarning(s"Not attempting to kill blacklisted executor id $exec " +
s"since allocation client is not defined.")
}
}

// In addition to blacklisting the executor, we also update the data for failures on the
// node, and potentially put the entire node into a blacklist as well.
@@ -187,6 +199,19 @@
nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
allocationClient match {
case Some(allocationClient) =>
logInfo(s"Killing all executors on blacklisted host $node " +
s"since spark.blacklist.killBlacklistedExecutors is set.")
if (!allocationClient.killExecutorsOnHost(node)) {
logError(s"Killing executors on node $node failed.")
}
case None =>
logWarning(s"Not attempting to kill executors on blacklisted host $node " +
s"since allocation client is not defined.")
}
}
}
}
}
@@ -719,7 +719,11 @@ private[spark] object TaskSchedulerImpl {

private def maybeCreateBlacklistTracker(sc: SparkContext): Option[BlacklistTracker] = {
if (BlacklistTracker.isBlacklistEnabled(sc.conf)) {
Some(new BlacklistTracker(sc))
val executorAllocClient: Option[ExecutorAllocationClient] = sc.schedulerBackend match {
case b: ExecutorAllocationClient => Some(b)
case _ => None
}
Some(new BlacklistTracker(sc, executorAllocClient))
} else {
None
}
@@ -43,6 +43,9 @@ private[spark] object CoarseGrainedClusterMessages {
case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
extends CoarseGrainedClusterMessage

case class KillExecutorsOnHost(host: String)
extends CoarseGrainedClusterMessage

sealed trait RegisterExecutorResponse

case object RegisteredExecutor extends CoarseGrainedClusterMessage with RegisterExecutorResponse
@@ -145,6 +145,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Ignoring the task kill since the executor is not registered.
logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
}

case KillExecutorsOnHost(host) =>
scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
killExecutors(exec.toSeq, replace = true, force = true)
}
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -153,6 +158,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
if (executorDataMap.contains(executorId)) {
executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
context.reply(true)
} else if (scheduler.nodeBlacklist != null &&
scheduler.nodeBlacklist.contains(hostname)) {
// If the cluster manager gives us an executor on a blacklisted node (because it
// already started allocating those resources before we informed it of our blacklist,
// or if it ignored our blacklist), then we reject that executor immediately.
logInfo(s"Rejecting $executorId as it has been blacklisted.")
executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))
context.reply(true)
} else {
// If the executor's rpc env is not listening for incoming connections, `hostPort`
// will be null, and the client connection should be used to contact the executor.
@@ -527,15 +540,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
protected def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] =
Future.successful(false)

/**
* Request that the cluster manager kill the specified executors.
* @return whether the kill request is acknowledged. If list to kill is empty, it will return
* false.
*/
final override def killExecutors(executorIds: Seq[String]): Seq[String] = {
killExecutors(executorIds, replace = false, force = false)
}

/**
* Request that the cluster manager kill the specified executors.
*
@@ -544,12 +548,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* replacement is being requested, then the tasks will not count towards the limit.
*
* @param executorIds identifiers of executors to kill
* @param replace whether to replace the killed executors with new ones
* @param force whether to force kill busy executors
* @return whether the kill request is acknowledged. If list to kill is empty, it will return
* false.
* @param replace whether to replace the killed executors with new ones, default false
* @param force whether to force kill busy executors, default false
* @return the ids of the executors acknowledged by the cluster manager to be removed.
*/
final def killExecutors(
final override def killExecutors(
executorIds: Seq[String],
replace: Boolean,
force: Boolean): Seq[String] = {
@@ -605,6 +608,22 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
*/
protected def doKillExecutors(executorIds: Seq[String]): Future[Boolean] =
Future.successful(false)

/**
* Request that the cluster manager kill all executors on a given host.
* @return whether the kill request is acknowledged.
*/
final override def killExecutorsOnHost(host: String): Boolean = {
logInfo(s"Requesting to kill any and all executors on host ${host}")
// A potential race exists if a new executor attempts to register on a host
// that is on the blacklist and is no longer valid. To avoid this race,
// all executor registration and killing happens in the event loop. This way, either
// an executor will fail to register, or will be killed when all executors on a host
// are killed.
// Kill all the executors on this host in an event loop to ensure serialization.
driverEndpoint.send(KillExecutorsOnHost(host))
true
}
}

private[spark] object CoarseGrainedSchedulerBackend {
@@ -1138,7 +1138,10 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend
override def requestExecutors(numAdditionalExecutors: Int): Boolean =
sc.requestExecutors(numAdditionalExecutors)

override def killExecutors(executorIds: Seq[String]): Seq[String] = {
override def killExecutors(
executorIds: Seq[String],
replace: Boolean,
force: Boolean): Seq[String] = {
val response = sc.killExecutors(executorIds)
if (response) {
executorIds
@@ -1154,4 +1157,8 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend
override def reviveOffers(): Unit = sb.reviveOffers()

override def defaultParallelism(): Int = sb.defaultParallelism()

override def killExecutorsOnHost(host: String): Boolean = {
false
}
}
@@ -20,7 +20,8 @@ package org.apache.spark.deploy
import scala.collection.mutable
import scala.concurrent.duration._

import org.mockito.Mockito.{mock, when}
import org.mockito.Matchers.any
import org.mockito.Mockito.{mock, verify, when}
import org.scalatest.{BeforeAndAfterAll, PrivateMethodTester}
import org.scalatest.concurrent.Eventually._

@@ -29,10 +30,11 @@ import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMaste
import org.apache.spark.deploy.master.ApplicationInfo
import org.apache.spark.deploy.master.Master
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.internal.config
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RegisterExecutorFailed}

/**
* End-to-end tests for dynamic allocation in standalone mode.
@@ -467,6 +469,52 @@ class StandaloneDynamicAllocationSuite
}
}

test("kill all executors on localhost") {
sc = new SparkContext(appConf)
val appId = sc.applicationId
eventually(timeout(10.seconds), interval(10.millis)) {
val apps = getApplications()
assert(apps.size === 1)
assert(apps.head.id === appId)
assert(apps.head.executors.size === 2)
assert(apps.head.getExecutorLimit === Int.MaxValue)
}
val beforeList = getApplications().head.executors.keys.toSet
assert(killExecutorsOnHost(sc, "localhost").equals(true))

syncExecutors(sc)
val afterList = getApplications().head.executors.keys.toSet

eventually(timeout(10.seconds), interval(100.millis)) {
assert(beforeList.intersect(afterList).size == 0)
}
}

test("executor registration on a blacklisted host must fail") {
sc = new SparkContext(appConf.set(config.BLACKLIST_ENABLED.key, "true"))
val endpointRef = mock(classOf[RpcEndpointRef])
val mockAddress = mock(classOf[RpcAddress])
when(endpointRef.address).thenReturn(mockAddress)
val message = RegisterExecutor("one", endpointRef, "blacklisted-host", 10, Map.empty)

// Get "localhost" on a blacklist.
val taskScheduler = mock(classOf[TaskSchedulerImpl])
when(taskScheduler.nodeBlacklist()).thenReturn(Set("blacklisted-host"))
when(taskScheduler.sc).thenReturn(sc)
sc.taskScheduler = taskScheduler

// Create a fresh scheduler backend that consults the mocked node blacklist.
sc.schedulerBackend.stop()
val backend =
new StandaloneSchedulerBackend(taskScheduler, sc, Array(masterRpcEnv.address.toSparkURL))
backend.start()

backend.driverEndpoint.ask[Boolean](message)
eventually(timeout(10.seconds), interval(100.millis)) {
verify(endpointRef).send(RegisterExecutorFailed(any()))
}
}

// ===============================
// | Utility methods for testing |
// ===============================
@@ -528,6 +576,16 @@ class StandaloneDynamicAllocationSuite
}
}

/** Kill the executors on a given host. */
private def killExecutorsOnHost(sc: SparkContext, host: String): Boolean = {
syncExecutors(sc)
sc.schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.killExecutorsOnHost(host)
case _ => fail("expected coarse grained scheduler")
}
}

/**
* Return a list of executor IDs belonging to this application.
*