[SPARK-23637][YARN] YARN might allocate more resources if the same executor is killed multiple times.

## What changes were proposed in this pull request?
`YarnAllocator` uses `numExecutorsRunning` to track the number of running executors. This count is used to decide whether executors are missing and more containers need to be requested.

In the current code, `numExecutorsRunning` can become negative when the driver asks to kill the same idle executor multiple times, as the sketch below illustrates.
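To make the failure mode concrete, here is a minimal, self-contained sketch (illustrative only, not code from the patch; the executor ID `exec-1` is made up) contrasting the old counter with the new set:

```scala
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

object CounterVsSet extends App {
  // Old approach: a plain counter. Two kill requests for the same executor
  // decrement it twice, so the allocator believes an executor is "missing".
  val numExecutorsRunning = new AtomicInteger(1)
  numExecutorsRunning.decrementAndGet() // first kill request
  numExecutorsRunning.decrementAndGet() // duplicate kill request
  println(numExecutorsRunning.get())    // -1

  // New approach: a set keyed by executor ID. remove() is idempotent, so a
  // duplicate kill request is a no-op.
  val runningExecutors = Collections.newSetFromMap[String](
    new ConcurrentHashMap[String, java.lang.Boolean]())
  runningExecutors.add("exec-1")
  runningExecutors.remove("exec-1") // first kill request
  runningExecutors.remove("exec-1") // duplicate is a no-op
  println(runningExecutors.size())  // 0
}
```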

## How was this patch tested?
Unit tests were added.

Author: jinxing <[email protected]>

Closes #20781 from jinxing64/SPARK-23637.
jinxing authored and Mykhailo Shtelma committed Apr 5, 2018
1 parent 80cab07 commit 984faf5
Showing 2 changed files with 66 additions and 18 deletions.
Changes to `YarnAllocator`:
@@ -81,7 +81,8 @@ private[yarn] class YarnAllocator(
   private val releasedContainers = Collections.newSetFromMap[ContainerId](
     new ConcurrentHashMap[ContainerId, java.lang.Boolean])
 
-  private val numExecutorsRunning = new AtomicInteger(0)
+  private val runningExecutors = Collections.newSetFromMap[String](
+    new ConcurrentHashMap[String, java.lang.Boolean]())
 
   private val numExecutorsStarting = new AtomicInteger(0)
@@ -166,7 +167,7 @@ private[yarn] class YarnAllocator(
     clock = newClock
   }
 
-  def getNumExecutorsRunning: Int = numExecutorsRunning.get()
+  def getNumExecutorsRunning: Int = runningExecutors.size()
 
   def getNumExecutorsFailed: Int = synchronized {
     val endTime = clock.getTimeMillis()
@@ -242,12 +243,11 @@ private[yarn] class YarnAllocator(
    * Request that the ResourceManager release the container running the specified executor.
    */
   def killExecutor(executorId: String): Unit = synchronized {
-    if (executorIdToContainer.contains(executorId)) {
-      val container = executorIdToContainer.get(executorId).get
-      internalReleaseContainer(container)
-      numExecutorsRunning.decrementAndGet()
-    } else {
-      logWarning(s"Attempted to kill unknown executor $executorId!")
+    executorIdToContainer.get(executorId) match {
+      case Some(container) if !releasedContainers.contains(container.getId) =>
+        internalReleaseContainer(container)
+        runningExecutors.remove(executorId)
+      case _ => logWarning(s"Attempted to kill unknown executor $executorId!")
     }
   }
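The `releasedContainers` guard is what makes a repeated kill request harmless: the second request falls through to the warning branch instead of touching the running set again. A simplified, hypothetical model of this logic, with plain strings standing in for YARN `Container` objects and a `println` standing in for `logWarning`:

```scala
import scala.collection.mutable

object KillTwiceDemo extends App {
  val executorIdToContainer = mutable.HashMap[String, String]()
  val releasedContainers = mutable.HashSet[String]()
  val runningExecutors = mutable.HashSet[String]()

  // Stand-in for internalReleaseContainer, which (among other things)
  // records the container as released.
  def internalReleaseContainer(containerId: String): Unit =
    releasedContainers += containerId

  def killExecutor(executorId: String): Unit =
    executorIdToContainer.get(executorId) match {
      case Some(containerId) if !releasedContainers.contains(containerId) =>
        internalReleaseContainer(containerId)
        runningExecutors -= executorId
      case _ =>
        println(s"Attempted to kill unknown executor $executorId!")
    }

  executorIdToContainer("1") = "container_1"
  runningExecutors += "1"
  killExecutor("1")              // releases container_1
  killExecutor("1")              // guard rejects the duplicate, only warns
  println(runningExecutors.size) // 0, never negative
}
```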

@@ -274,7 +274,7 @@ private[yarn] class YarnAllocator(
         "Launching executor count: %d. Cluster resources: %s.")
         .format(
           allocatedContainers.size,
-          numExecutorsRunning.get,
+          runningExecutors.size,
           numExecutorsStarting.get,
           allocateResponse.getAvailableResources))

@@ -286,7 +286,7 @@ private[yarn] class YarnAllocator(
       logDebug("Completed %d containers".format(completedContainers.size))
       processCompletedContainers(completedContainers.asScala)
       logDebug("Finished processing %d completed containers. Current running executor count: %d."
-        .format(completedContainers.size, numExecutorsRunning.get))
+        .format(completedContainers.size, runningExecutors.size))
     }
   }

@@ -300,9 +300,9 @@ private[yarn] class YarnAllocator(
     val pendingAllocate = getPendingAllocate
     val numPendingAllocate = pendingAllocate.size
     val missing = targetNumExecutors - numPendingAllocate -
-      numExecutorsStarting.get - numExecutorsRunning.get
+      numExecutorsStarting.get - runningExecutors.size
     logDebug(s"Updating resource requests, target: $targetNumExecutors, " +
-      s"pending: $numPendingAllocate, running: ${numExecutorsRunning.get}, " +
+      s"pending: $numPendingAllocate, running: ${runningExecutors.size}, " +
       s"executorsStarting: ${numExecutorsStarting.get}")
 
     if (missing > 0) {
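This `missing` computation is where a negative running count turned into over-allocation, which is what the JIRA title describes. A worked example with made-up numbers:

```scala
object MissingExample extends App {
  val targetNumExecutors = 2
  val numPendingAllocate = 0
  val numExecutorsStarting = 0

  val runningBuggy = -1 // AtomicInteger after a duplicate kill request
  val runningFixed = 1  // set size: the duplicate kill was a no-op

  // Buggy: missing = 2 - 0 - 0 - (-1) = 3, so 3 containers are requested
  // even though the target only ever needed 2.
  println(targetNumExecutors - numPendingAllocate - numExecutorsStarting - runningBuggy)
  // Fixed: missing = 2 - 0 - 0 - 1 = 1, exactly the executor that was killed.
  println(targetNumExecutors - numPendingAllocate - numExecutorsStarting - runningFixed)
}
```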
@@ -502,7 +502,7 @@ private[yarn] class YarnAllocator(
       s"for executor with ID $executorId")
 
     def updateInternalState(): Unit = synchronized {
-      numExecutorsRunning.incrementAndGet()
+      runningExecutors.add(executorId)
       numExecutorsStarting.decrementAndGet()
       executorIdToContainer(executorId) = container
       containerIdToExecutorId(container.getId) = executorId
@@ -513,7 +513,7 @@ private[yarn] class YarnAllocator(
         allocatedContainerToHostMap.put(containerId, executorHostname)
       }
 
-      if (numExecutorsRunning.get < targetNumExecutors) {
+      if (runningExecutors.size() < targetNumExecutors) {
         numExecutorsStarting.incrementAndGet()
         if (launchContainers) {
           launcherPool.execute(new Runnable {
@@ -554,7 +554,7 @@ private[yarn] class YarnAllocator(
       } else {
         logInfo(("Skip launching executorRunnable as running executors count: %d " +
           "reached target executors count: %d.").format(
-          numExecutorsRunning.get, targetNumExecutors))
+          runningExecutors.size, targetNumExecutors))
       }
     }
   }
@@ -569,7 +569,11 @@ private[yarn] class YarnAllocator(
       val exitReason = if (!alreadyReleased) {
         // Decrement the number of executors running. The next iteration of
         // the ApplicationMaster's reporting thread will take care of allocating.
-        numExecutorsRunning.decrementAndGet()
+        containerIdToExecutorId.get(containerId) match {
+          case Some(executorId) => runningExecutors.remove(executorId)
+          case None => logWarning(s"Cannot find executorId for container: ${containerId.toString}")
+        }
+
         logInfo("Completed container %s%s (state: %s, exit status: %s)".format(
           containerId,
           onHostStr,
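The completion path needs the same idempotency, since YARN can report the same completed container more than once; the new `process same completed container multiple times` test below exercises exactly this. A stripped-down sketch with hypothetical IDs (the real code maps a YARN `ContainerId` rather than a string):

```scala
import scala.collection.mutable

object DuplicateCompletionDemo extends App {
  val containerIdToExecutorId = mutable.HashMap("container_1" -> "1")
  val runningExecutors = mutable.HashSet("1")

  def processCompletedContainer(containerId: String): Unit =
    containerIdToExecutorId.get(containerId) match {
      case Some(executorId) => runningExecutors.remove(executorId) // no-op on repeat
      case None => println(s"Cannot find executorId for container: $containerId")
    }

  processCompletedContainer("container_1")
  processCompletedContainer("container_1") // duplicate report from YARN
  println(runningExecutors.size)           // 0, not -1
}
```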
Changes to `YarnAllocatorSuite`:
@@ -251,11 +251,55 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Finished", 0)
     }
     handler.updateResourceRequests()
-    handler.processCompletedContainers(statuses.toSeq)
+    handler.processCompletedContainers(statuses)
     handler.getNumExecutorsRunning should be (0)
     handler.getPendingAllocate.size should be (1)
   }
 
+  test("kill same executor multiple times") {
+    val handler = createAllocator(2)
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getPendingAllocate.size should be (2)
+
+    val container1 = createContainer("host1")
+    val container2 = createContainer("host2")
+    handler.handleAllocatedContainers(Array(container1, container2))
+    handler.getNumExecutorsRunning should be (2)
+    handler.getPendingAllocate.size should be (0)
+
+    val executorToKill = handler.executorIdToContainer.keys.head
+    handler.killExecutor(executorToKill)
+    handler.getNumExecutorsRunning should be (1)
+    handler.killExecutor(executorToKill)
+    handler.killExecutor(executorToKill)
+    handler.killExecutor(executorToKill)
+    handler.getNumExecutorsRunning should be (1)
+    handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty, Set.empty)
+    handler.updateResourceRequests()
+    handler.getPendingAllocate.size should be (1)
+  }
+
+  test("process same completed container multiple times") {
+    val handler = createAllocator(2)
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getPendingAllocate.size should be (2)
+
+    val container1 = createContainer("host1")
+    val container2 = createContainer("host2")
+    handler.handleAllocatedContainers(Array(container1, container2))
+    handler.getNumExecutorsRunning should be (2)
+    handler.getPendingAllocate.size should be (0)
+
+    val statuses = Seq(container1, container1, container2).map { c =>
+      ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Finished", 0)
+    }
+    handler.processCompletedContainers(statuses)
+    handler.getNumExecutorsRunning should be (0)
+  }
+
   test("lost executor removed from backend") {
     val handler = createAllocator(4)
     handler.updateResourceRequests()
@@ -272,7 +316,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Failed", -1)
     }
     handler.updateResourceRequests()
-    handler.processCompletedContainers(statuses.toSeq)
+    handler.processCompletedContainers(statuses)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
     handler.getPendingAllocate.size should be (2)
