
Commit

sryza committed Feb 10, 2015
1 parent d302c48 commit cd3b2ff
Showing 5 changed files with 180 additions and 50 deletions.
@@ -22,11 +22,17 @@ package org.apache.spark
*/
private[spark] trait ExecutorAllocationClient {

/**
* Express a preference to the cluster manager for a given total number of executors. This can
* result in canceling pending requests or filing additional requests.
*/
private[spark] def requestTotalExecutors(numExecutors: Int): Boolean

/**
* Request an additional number of executors from the cluster manager.
* Return whether the request is acknowledged by the cluster manager.
* This is currently only supported in YARN mode. Return whether the request is received.
*/
def requestExecutors(numAdditionalExecutors: Int): Boolean
def requestExecutors(numExecutors: Int): Boolean

/**
* Request that the cluster manager kill the specified executors.
150 changes: 105 additions & 45 deletions core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -201,18 +201,34 @@ private[spark] class ExecutorAllocationManager(
}

/**
* If the add time has expired, request new executors and refresh the add time.
* If the remove time for an existing executor has expired, kill the executor.
* The number of executors we would have if the cluster manager were to fulfill all our existing
* requests.
*/
private def targetNumExecutors(): Int =
numExecutorsPending + executorIds.size - executorsPendingToRemove.size

/**
* The maximum number of executors we would need under the current load to satisfy all running
* and pending tasks, rounded up.
*/
private def maxNumExecutorsNeeded(): Int = {
val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
(numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
}
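As an editorial aside (not part of this commit), the arithmetic in the two helpers above can be seen in isolation in the following minimal Scala sketch; all names and numbers here are hypothetical, whereas in the real class the inputs come from the listener and the manager's bookkeeping fields.

// Editor's sketch: the two quantities above, computed on plain Ints.
object AllocationMathSketch {
  // Outstanding requests plus registered executors, minus executors queued for removal.
  def targetNumExecutors(pending: Int, existing: Int, pendingToRemove: Int): Int =
    pending + existing - pendingToRemove

  // Ceiling division: (tasks + perExecutor - 1) / perExecutor rounds up without using floats.
  def maxNumExecutorsNeeded(pendingTasks: Int, runningTasks: Int, tasksPerExecutor: Int): Int =
    (pendingTasks + runningTasks + tasksPerExecutor - 1) / tasksPerExecutor

  def main(args: Array[String]): Unit = {
    // 9 outstanding tasks with 4 tasks per executor need ceil(9 / 4) = 3 executors.
    println(maxNumExecutorsNeeded(pendingTasks = 7, runningTasks = 2, tasksPerExecutor = 4)) // 3
    // 3 pending requests + 5 registered executors - 1 marked for removal = target of 7.
    println(targetNumExecutors(pending = 3, existing = 5, pendingToRemove = 1)) // 7
  }
}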

/**
* This is called at a fixed interval to regulate the number of pending executor requests
* and number of executors running.
*
* First, adjust our requested executors based on the add time and our current needs.
* Then, if the remove time for an existing executor has expired, kill the executor.
*
* This is factored out into its own method for testing.
*/
private def schedule(): Unit = synchronized {
val now = clock.getTimeMillis
if (addTime != NOT_SET && now >= addTime) {
addExecutors()
logDebug(s"Starting timer to add more executors (to " +
s"expire in $sustainedSchedulerBacklogTimeout seconds)")
addTime += sustainedSchedulerBacklogTimeout * 1000
}

addOrCancelExecutorRequests(now)

removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
@@ -223,59 +239,90 @@
}
}

/**
* Check to see whether our existing allocation and the requests we've made previously exceed our
* current needs. If so, let the cluster manager know so that it can cancel pending requests that
* are unneeded.
*
* If not, and the add time has expired, see if we can request new executors and refresh the add
* time.
*
* @return the delta in the target number of executors.
*/
private def addOrCancelExecutorRequests(now: Long): Int = synchronized {
val currentTarget = targetNumExecutors
val maxNeeded = maxNumExecutorsNeeded

if (maxNeeded < currentTarget) {
// The target number exceeds the number we actually need, so stop adding new
// executors and inform the cluster manager to cancel the extra pending requests.
val newTotalExecutors = math.max(maxNeeded, minNumExecutors)
client.requestTotalExecutors(newTotalExecutors)
numExecutorsToAdd = 1
updateNumExecutorsPending(newTotalExecutors)
} else if (addTime != NOT_SET && now >= addTime) {
val delta = addExecutors(maxNeeded)
logDebug(s"Starting timer to add more executors (to " +
s"expire in $sustainedSchedulerBacklogTimeout seconds)")
addTime += sustainedSchedulerBacklogTimeout * 1000
delta
} else {
0
}
}
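To make the three branches above easier to follow, here is a simplified decision function as an editor's sketch, not Spark code: the Action type and all parameter names are hypothetical, and the real method additionally updates state and returns the delta.

// Editor's sketch: the decision structure of addOrCancelExecutorRequests, reduced to pure inputs.
object RequestAdjustmentSketch {
  sealed trait Action
  case class CancelDownTo(newTotal: Int) extends Action
  case object AddMore extends Action
  case object DoNothing extends Action

  def decide(currentTarget: Int, maxNeeded: Int, minExecutors: Int, addTimerExpired: Boolean): Action =
    if (maxNeeded < currentTarget) {
      // We have asked for more than the current load could use: shrink the request,
      // but never below the configured minimum.
      CancelDownTo(math.max(maxNeeded, minExecutors))
    } else if (addTimerExpired) {
      // The backlog has persisted past the add time: grow the request.
      AddMore
    } else {
      DoNothing
    }

  // Example: target 8, only 3 executors' worth of work, minimum of 2 -> CancelDownTo(3).
}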

/**
* Request a number of executors from the cluster manager.
* If the cap on the number of executors is reached, give up and reset the
* number of executors to add next round instead of continuing to double it.
* Return the number actually requested.
*
* @param maxNumExecutorsNeeded the maximum number of executors all currently running or pending
* tasks could fill
* @return the number of additional executors actually requested.
*/
private def addExecutors(): Int = synchronized {
// Do not request more executors if we have already reached the upper bound
val numExistingExecutors = executorIds.size + numExecutorsPending
if (numExistingExecutors >= maxNumExecutors) {
private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
// Do not request more executors if it would put our target over the upper bound
val currentTarget = targetNumExecutors
if (currentTarget >= maxNumExecutors) {
logDebug(s"Not adding executors because there are already ${executorIds.size} " +
s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)")
numExecutorsToAdd = 1
return 0
}

// The number of executors needed to satisfy all pending tasks is the number of tasks pending
// divided by the number of tasks each executor can fit, rounded up.
val maxNumExecutorsPending =
(listener.totalPendingTasks() + tasksPerExecutor - 1) / tasksPerExecutor
if (numExecutorsPending >= maxNumExecutorsPending) {
logDebug(s"Not adding executors because there are already $numExecutorsPending " +
s"pending and pending tasks could only fill $maxNumExecutorsPending")
numExecutorsToAdd = 1
return 0
}

// It's never useful to request more executors than could satisfy all the pending tasks, so
// cap request at that amount.
// Also cap request with respect to the configured upper bound.
val maxNumExecutorsToAdd = math.min(
maxNumExecutorsPending - numExecutorsPending,
maxNumExecutors - numExistingExecutors)
assert(maxNumExecutorsToAdd > 0)

val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd)

val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
val addRequestAcknowledged = testing || client.requestExecutors(actualNumExecutorsToAdd)
val actualMaxNumExecutors = math.min(maxNumExecutors - executorsPendingToRemove.size,
maxNumExecutorsNeeded)
val newTotalExecutors = math.min(currentTarget + numExecutorsToAdd, actualMaxNumExecutors)
val addRequestAcknowledged = testing || client.requestTotalExecutors(newTotalExecutors)
if (addRequestAcknowledged) {
logInfo(s"Requesting $actualNumExecutorsToAdd new executor(s) because " +
s"tasks are backlogged (new desired total will be $newTotalExecutors)")
numExecutorsToAdd =
if (actualNumExecutorsToAdd == numExecutorsToAdd) numExecutorsToAdd * 2 else 1
numExecutorsPending += actualNumExecutorsToAdd
actualNumExecutorsToAdd
val delta = updateNumExecutorsPending(newTotalExecutors)
logInfo(s"Requesting $delta new executor(s) because tasks are backlogged" +
s" (new desired total will be $newTotalExecutors)")
numExecutorsToAdd = if (delta == numExecutorsToAdd) {
numExecutorsToAdd * 2
} else {
1
}
delta
} else {
logWarning(s"Unable to reach the cluster manager " +
s"to request $actualNumExecutorsToAdd executors!")
logWarning(
s"Unable to reach the cluster manager to request $newTotalExecutors total executors!")
0
}
}
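The "double or reset" behaviour described in the method comment above can be shown on its own; this is an editorial sketch with hypothetical names, not code from the commit.

// Editor's sketch: how numExecutorsToAdd evolves from round to round.
// It doubles only while a request is granted in full; any shortfall resets it to 1.
def nextNumExecutorsToAdd(requested: Int, granted: Int): Int =
  if (granted == requested) requested * 2 else 1

// Rounds where the full amount is granted: 1 -> 2 -> 4 -> 8 ...
// A round where a cap truncates the request (asked for 8, granted 3) resets the series to 1.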

/**
* Given the new target number of executors, update the number of pending executor requests,
* and return the delta from the old number of pending requests.
*/
private def updateNumExecutorsPending(newTotalExecutors: Int): Int = {
val newNumExecutorsPending =
newTotalExecutors - executorIds.size + executorsPendingToRemove.size
val delta = newNumExecutorsPending - numExecutorsPending
numExecutorsPending = newNumExecutorsPending
delta
}
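The pending-request bookkeeping above amounts to one subtraction; the following is an editor's sketch of that arithmetic as a pure function (names and the tuple return are hypothetical).

// Editor's sketch: given a new desired total, the new pending count is whatever is not already
// covered by registered executors (adding back the ones about to be removed); the delta is the change.
def pendingDelta(newTotal: Int, existing: Int, pendingToRemove: Int, oldPending: Int): (Int, Int) = {
  val newPending = newTotal - existing + pendingToRemove
  (newPending, newPending - oldPending)
}

// Example: a desired total of 7 with 5 registered executors, 1 of them pending removal,
// and 2 requests already outstanding -> newPending = 3, delta = +1.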

/**
* Request the cluster manager to remove the given executor.
* Return whether the request is received.
@@ -415,6 +462,8 @@ private[spark] class ExecutorAllocationManager(
private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
// Number of tasks currently running on the cluster. Should be 0 when no stages are active.
private var numRunningTasks: Int = _

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
val stageId = stageSubmitted.stageInfo.stageId
@@ -435,6 +484,10 @@
// This is needed in case the stage is aborted for any reason
if (stageIdToNumTasks.isEmpty) {
allocationManager.onSchedulerQueueEmpty()
if (numRunningTasks != 0) {
logWarning("No stages are running, but numRunningTasks != 0")
numRunningTasks = 0
}
}
}
}
@@ -446,6 +499,7 @@
val executorId = taskStart.taskInfo.executorId

allocationManager.synchronized {
numRunningTasks += 1
// This guards against the race condition in which the `SparkListenerTaskStart`
// event is posted before the `SparkListenerBlockManagerAdded` event, which is
// possible because these events are posted in different threads. (see SPARK-4951)
@@ -475,7 +529,8 @@
val executorId = taskEnd.taskInfo.executorId
val taskId = taskEnd.taskInfo.taskId
allocationManager.synchronized {
// If the executor is no longer running scheduled any tasks, mark it as idle
numRunningTasks -= 1
// If the executor is no longer running any scheduled tasks, mark it as idle
if (executorIdToTaskIds.contains(executorId)) {
executorIdToTaskIds(executorId) -= taskId
if (executorIdToTaskIds(executorId).isEmpty) {
Expand Down Expand Up @@ -514,6 +569,11 @@ private[spark] class ExecutorAllocationManager(
}.sum
}

/**
* The number of tasks currently running across all stages.
*/
def totalRunningTasks(): Int = numRunningTasks

/**
* Return true if an executor is not currently running a task, and false otherwise.
*
19 changes: 19 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1103,6 +1103,25 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
listenerBus.addListener(listener)
}

/**
* :: DeveloperApi ::
* Express a preference to the cluster manager for a given total number of executors. This can
* result in canceling pending requests or filing additional requests.
* This is currently only supported in Yarn mode. Return whether the request is received.
*/
@DeveloperApi
override private[spark] def requestTotalExecutors(numExecutors: Int): Boolean = {
assert(master.contains("yarn") || dynamicAllocationTesting,
"Requesting executors is currently only supported in YARN mode")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.requestTotalExecutors(numExecutors)
case _ =>
logWarning("Requesting executors is only supported in coarse-grained mode")
false
}
}

/**
* :: DeveloperApi ::
* Request an additional number of executors from the cluster manager.
@@ -311,7 +311,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSystem

/**
* Request an additional number of executors from the cluster manager.
* Return whether the request is acknowledged.
* @return whether the request is acknowledged.
*/
final override def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
if (numAdditionalExecutors < 0) {
@@ -327,6 +327,17 @@
doRequestTotalExecutors(newTotal)
}

/**
* Express a preference to the cluster manager for a given total number of executors. This can
* result in canceling pending requests or filing additional requests.
* @return whether the request is acknowledged.
*/
final override def requestTotalExecutors(numExecutors: Int): Boolean = synchronized {
numPendingExecutors =
math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0)
doRequestTotalExecutors(numExecutors)
}
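The clamp applied to numPendingExecutors above keeps the pending count from going negative when the requested total is already covered by live executors; here is a minimal editorial sketch of that arithmetic, with hypothetical names, rather than the backend's actual fields.

// Editor's sketch: pending requests after expressing a new desired total.
def newNumPendingExecutors(requestedTotal: Int, numExisting: Int, pendingToRemove: Int): Int =
  math.max(requestedTotal - numExisting + pendingToRemove, 0)

// Example: requesting a total of 4 while 6 executors are registered and none are being removed
// yields 0 pending requests, and the cluster manager is told the lower total so it can cancel
// the surplus it has not yet granted.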

/**
* Request executors from the cluster manager by specifying the total number desired,
* including existing pending and running executors.
@@ -337,7 +348,7 @@
* insufficient resources to satisfy the first request. We make the assumption here that the
* cluster manager will eventually fulfill all requests when resources free up.
*
* Return whether the request is acknowledged.
* @return whether the request is acknowledged.
*/
protected def doRequestTotalExecutors(requestedTotal: Int): Boolean = false

@@ -175,6 +175,33 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(numExecutorsPending(manager) === 9)
}

test("cancel pending executors when no longer needed") {
sc = createSparkContext(1, 10)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 5)))

assert(numExecutorsPending(manager) === 0)
assert(numExecutorsToAdd(manager) === 1)
assert(addExecutors(manager) === 1)
assert(numExecutorsPending(manager) === 1)
assert(numExecutorsToAdd(manager) === 2)
assert(addExecutors(manager) === 2)
assert(numExecutorsPending(manager) === 3)

val task1Info = createTaskInfo(0, 0, "executor-1")
sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task1Info))

assert(numExecutorsToAdd(manager) === 4)
assert(addExecutors(manager) === 2)

val task2Info = createTaskInfo(1, 0, "executor-1")
sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task2Info))
sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, null, task1Info, null))
sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, null, task2Info, null))

assert(adjustRequestedExecutors(manager) === -1)
}

test("remove executors") {
sc = createSparkContext(5, 10)
val manager = sc.executorAllocationManager.get
@@ -679,13 +706,15 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {

private val _numExecutorsToAdd = PrivateMethod[Int]('numExecutorsToAdd)
private val _numExecutorsPending = PrivateMethod[Int]('numExecutorsPending)
private val _maxNumExecutorsNeeded = PrivateMethod[Int]('maxNumExecutorsNeeded)
private val _executorsPendingToRemove =
PrivateMethod[collection.Set[String]]('executorsPendingToRemove)
private val _executorIds = PrivateMethod[collection.Set[String]]('executorIds)
private val _addTime = PrivateMethod[Long]('addTime)
private val _removeTimes = PrivateMethod[collection.Map[String, Long]]('removeTimes)
private val _schedule = PrivateMethod[Unit]('schedule)
private val _addExecutors = PrivateMethod[Int]('addExecutors)
private val _addOrCancelExecutorRequests = PrivateMethod[Int]('addOrCancelExecutorRequests)
private val _removeExecutor = PrivateMethod[Boolean]('removeExecutor)
private val _onExecutorAdded = PrivateMethod[Unit]('onExecutorAdded)
private val _onExecutorRemoved = PrivateMethod[Unit]('onExecutorRemoved)
@@ -724,7 +753,12 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
}

private def addExecutors(manager: ExecutorAllocationManager): Int = {
manager invokePrivate _addExecutors()
val maxNumExecutorsNeeded = manager invokePrivate _maxNumExecutorsNeeded()
manager invokePrivate _addExecutors(maxNumExecutorsNeeded)
}

private def adjustRequestedExecutors(manager: ExecutorAllocationManager): Int = {
manager invokePrivate _addOrCancelExecutorRequests(0L)
}

private def removeExecutor(manager: ExecutorAllocationManager, id: String): Boolean = {
