Commit: applying review comments
attilapiros committed Apr 18, 2018
1 parent 57086bb commit c92a090
Showing 6 changed files with 26 additions and 40 deletions.
File 1 of 6: BlacklistTracker
@@ -126,7 +126,7 @@ private[scheduler] class BlacklistTracker (
         nodeIdToBlacklistExpiryTime.remove(node)
         listenerBus.post(SparkListenerNodeUnblacklisted(now, node))
       }
-      updateNodeBlacklist()
+      _nodeBlacklist.set(collection.immutable.Map(nodeIdToBlacklistExpiryTime.toSeq: _*))
     }
     updateNextExpiryTime()
   }
@@ -196,7 +196,7 @@ private[scheduler] class BlacklistTracker (

       nodeIdToBlacklistExpiryTime.put(host, expiryTimeForNewBlacklists)
       listenerBus.post(SparkListenerNodeBlacklisted(now, host, 1))
-      updateNodeBlacklist()
+      _nodeBlacklist.set(collection.immutable.Map(nodeIdToBlacklistExpiryTime.toSeq: _*))
       killExecutorsOnBlacklistedNode(host)
       updateNextExpiryTime()
     }
@@ -216,10 +216,6 @@
     }
   }

-  private def updateNodeBlacklist(): Unit = {
-    _nodeBlacklist.set(collection.immutable.Map(nodeIdToBlacklistExpiryTime.toSeq: _*))
-  }
-
   def updateBlacklistForSuccessfulTaskSet(
       stageId: Int,
       stageAttemptId: Int,
@@ -262,7 +258,7 @@ private[scheduler] class BlacklistTracker (
             s"executors blacklisted: ${blacklistedExecsOnNode}")
           nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
           listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
-          updateNodeBlacklist()
+          _nodeBlacklist.set(collection.immutable.Map(nodeIdToBlacklistExpiryTime.toSeq: _*))
          killExecutorsOnBlacklistedNode(node)
        }
      }
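
The inlined call above snapshots the mutable expiry map into an immutable Map held in an AtomicReference, so readers on other threads always see a consistent view. A minimal standalone sketch of the idiom (the surrounding values are illustrative stand-ins, not the tracker's real fields):

    import java.util.concurrent.atomic.AtomicReference
    import scala.collection.mutable

    // Illustrative stand-ins for the tracker's fields.
    val nodeIdToBlacklistExpiryTime = mutable.HashMap("host1" -> 1000L)
    val _nodeBlacklist = new AtomicReference[Map[String, Long]](Map.empty)

    // `toSeq: _*` expands the (key, value) pairs as varargs, building a fresh
    // immutable Map; setting the AtomicReference publishes it atomically.
    _nodeBlacklist.set(collection.immutable.Map(nodeIdToBlacklistExpiryTime.toSeq: _*))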
File 2 of 6: FailureWithinTimeIntervalTracker
@@ -34,9 +34,8 @@ private[spark] class FailureWithinTimeIntervalTracker(sparkConf: SparkConf) exte

   private val failedExecutorsTimeStamps = new mutable.Queue[Long]()

-  private def getNumFailuresWithinValidityInterval(
-      failedExecutorsTimeStampsForHost: mutable.Queue[Long],
-      endTime: Long): Int = {
+  private def getRecentFailureCount(failedExecutorsTimeStampsForHost: mutable.Queue[Long]): Int = {
+    val endTime = clock.getTimeMillis()
     while (executorFailuresValidityInterval > 0
       && failedExecutorsTimeStampsForHost.nonEmpty
       && failedExecutorsTimeStampsForHost.head < endTime - executorFailuresValidityInterval) {
@@ -53,7 +52,7 @@ private[spark] class FailureWithinTimeIntervalTracker(sparkConf: SparkConf) exte
   }

   def getNumExecutorsFailed: Int = synchronized {
-    getNumFailuresWithinValidityInterval(failedExecutorsTimeStamps, clock.getTimeMillis())
+    getRecentFailureCount(failedExecutorsTimeStamps)
   }

   def registerFailureOnHost(hostname: String): Unit = synchronized {
@@ -74,7 +73,7 @@ private[spark] class FailureWithinTimeIntervalTracker(sparkConf: SparkConf) exte

   def getNumExecutorFailuresOnHost(hostname: String): Int =
     failedExecutorsTimeStampsPerHost.get(hostname).map { failedExecutorsOnHost =>
-      getNumFailuresWithinValidityInterval(failedExecutorsOnHost, clock.getTimeMillis())
+      getRecentFailureCount(failedExecutorsOnHost)
     }.getOrElse(0)

 }
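
The renamed getRecentFailureCount now computes endTime itself and keeps a sliding window: timestamps older than the validity interval are dequeued before the remaining count is returned. A self-contained sketch of that expiry loop, with the interval and clock stubbed out as plain values (illustrative, not the tracker's actual wiring):

    import scala.collection.mutable

    val executorFailuresValidityInterval = 100L  // would come from SparkConf

    def recentFailureCount(timestamps: mutable.Queue[Long]): Int = {
      val endTime = System.currentTimeMillis()  // the tracker uses its own Clock
      // Drop entries that fell out of the validity window; a non-positive
      // interval disables expiry, so every recorded failure keeps counting.
      while (executorFailuresValidityInterval > 0 &&
          timestamps.nonEmpty &&
          timestamps.head < endTime - executorFailuresValidityInterval) {
        timestamps.dequeue()
      }
      timestamps.size
    }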
File 3 of 6: YarnAllocatorBlacklistTracker
@@ -39,14 +39,13 @@ private[spark] class YarnAllocatorBlacklistTracker(
   private val BLACKLIST_TIMEOUT_MILLIS =
     sparkConf.get(BLACKLIST_TIMEOUT_CONF).getOrElse(Utils.timeStringAsMs(DEFAULT_TIMEOUT))

-  private val IS_YARN_ALLOCATION_BLACKLIST_ENABLED =
-    sparkConf.get(YARN_ALLOCATION_BLACKLIST_ENABLED).getOrElse(false)
+  private val IS_YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED =
+    sparkConf.get(YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED).getOrElse(false)

   private val BLACKLIST_MAX_FAILED_EXEC_PER_NODE = sparkConf.get(MAX_FAILED_EXEC_PER_NODE)

-  private val BLACKLIST_SIZE_LIMIT = sparkConf.get(YARN_BLACKLIST_SIZE_LIMIT)
-
-  private val BLACKLIST_SIZE_DEFAULT_WEIGHT = sparkConf.get(YARN_BLACKLIST_SIZE_DEFAULT_WEIGHT)
+  private val BLACKLIST_MAX_NODE_BLACKLIST_RATIO =
+    sparkConf.get(YARN_BLACKLIST_MAX_NODE_BLACKLIST_RATIO)

   private var clock: Clock = new SystemClock
@@ -56,7 +55,7 @@ private[spark] class YarnAllocatorBlacklistTracker(

   private var schedulerBlacklistedNodesWithExpiry = Map.empty[String, Long]

-  private var numClusterNodes = (Int.MaxValue / BLACKLIST_SIZE_DEFAULT_WEIGHT).toInt
+  private var numClusterNodes = (Int.MaxValue / BLACKLIST_MAX_NODE_BLACKLIST_RATIO).toInt

   def setNumClusterNodes(numClusterNodes: Int): Unit = {
     this.numClusterNodes = numClusterNodes
@@ -87,10 +86,10 @@ private[spark] class YarnAllocatorBlacklistTracker(
   }

   private def updateAllocationBlacklistedNodes(hostname: String): Unit = {
-    if (IS_YARN_ALLOCATION_BLACKLIST_ENABLED) {
+    if (IS_YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED) {
       val failuresOnHost = failureWithinTimeIntervalTracker.getNumExecutorFailuresOnHost(hostname)
       if (failuresOnHost > BLACKLIST_MAX_FAILED_EXEC_PER_NODE) {
-        logInfo("blacklisting host as YARN allocation failed: %s".format(hostname))
+        logInfo(s"blacklisting $hostname as YARN allocation failed $failuresOnHost times")
         allocationBlacklistedNodesWithExpiry.put(
           hostname,
           clock.getTimeMillis() + BLACKLIST_TIMEOUT_MILLIS)
@@ -106,8 +105,7 @@ private[spark] class YarnAllocatorBlacklistTracker(

   private def refreshBlacklistedNodes(): Unit = {
     removeExpiredYarnBlacklistedNodes()
-    val limit =
-      BLACKLIST_SIZE_LIMIT.getOrElse((numClusterNodes * BLACKLIST_SIZE_DEFAULT_WEIGHT).toInt)
+    val limit = (numClusterNodes * BLACKLIST_MAX_NODE_BLACKLIST_RATIO).toInt
     val nodesToBlacklist =
       if (schedulerBlacklistedNodesWithExpiry.size +
         allocationBlacklistedNodesWithExpiry.size > limit) {
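
With the explicit size-limit option gone, the cap on nodes reported to YARN is always numClusterNodes times the ratio. Under the default ratio of 0.75 defined in the config change below, a 100-node cluster caps the blacklist at 75 entries, and the Int.MaxValue / ratio seed keeps the cap effectively unlimited until setNumClusterNodes is called. A quick check of the arithmetic (the values are examples):

    val ratio = 0.75                         // createWithDefault(0.75)
    val limit = (100 * ratio).toInt          // 75 for a 100-node cluster
    // Double-to-Int conversion saturates, so for any ratio <= 1 the seed
    // below stays Int.MaxValue, i.e. no node is dropped before sizing.
    val seed = (Int.MaxValue / ratio).toInt  // Int.MaxValue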
File 4 of 6: package object config
@@ -329,24 +329,17 @@ package object config {
       CACHED_CONF_ARCHIVE)

   /* YARN allocator-level blacklisting related config entries. */
-  private[spark] val YARN_ALLOCATION_BLACKLIST_ENABLED =
-    ConfigBuilder("spark.yarn.allocation.blacklist.enabled")
+  private[spark] val YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED =
+    ConfigBuilder("spark.yarn.executor.launch.blacklist.enabled")
       .booleanConf
       .createOptional

-  private[spark] val YARN_BLACKLIST_SIZE_LIMIT =
-    ConfigBuilder("spark.yarn.blacklist.size.limit")
-      .doc("Limit for blacklisted nodes. This is the maximum for the number of blacklisted nodes " +
-        "(including task and stage blacklisted nodes) sent to YARN.")
-      .intConf
-      .createOptional
-
-  private[spark] val YARN_BLACKLIST_SIZE_DEFAULT_WEIGHT =
-    ConfigBuilder("spark.yarn.blacklist.size.default.weight")
-      .doc("If blacklist size limit is not specified then the default limit will be the number of " +
-        "cluster nodes multiplied by this weight.")
+  private[spark] val YARN_BLACKLIST_MAX_NODE_BLACKLIST_RATIO =
+    ConfigBuilder("spark.yarn.blacklist.maxNodeBlacklistRatio")
+      .doc("There is a limit on the number of blacklisted nodes sent to YARN, " +
+        "computed by multiplying the number of cluster nodes by this ratio.")
       .doubleConf
-      .checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].")
+      .checkValue(weight => weight >= 0 && weight <= 1, "The value of this ratio must be in [0, 1].")
       .createWithDefault(0.75)

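
These entries are standard Spark configs; a hypothetical way to enable the renamed flag and tighten the ratio when building a conf (the 0.5 value is only an example, not a default):

    import org.apache.spark.SparkConf

    // Example only: turn on allocator-level blacklisting and cap the
    // YARN-reported blacklist at half the cluster nodes.
    val conf = new SparkConf()
      .set("spark.yarn.executor.launch.blacklist.enabled", "true")
      .set("spark.yarn.blacklist.maxNodeBlacklistRatio", "0.5")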
File 5 of 6: FailureWithinTimeIntervalTrackerSuite
@@ -27,7 +27,7 @@ class FailureWithinTimeIntervalTrackerSuite extends SparkFunSuite with Matchers
     super.beforeAll()
   }

-  test("failures are expires if validity interval is set") {
+  test("failures expire if validity interval is set") {
     val sparkConf = new SparkConf()
     sparkConf.set(config.EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS, 100L)

@@ -66,7 +66,7 @@ class FailureWithinTimeIntervalTrackerSuite extends SparkFunSuite with Matchers
   }


-  test("failures are never expires if validity interval is not set (-1)") {
+  test("failures never expire if validity interval is not set (-1)") {
     val sparkConf = new SparkConf()

     val failureWithinTimeIntervalTracker = new FailureWithinTimeIntervalTracker(sparkConf)
File 6 of 6: YarnAllocatorBlacklistTrackerSuite
@@ -25,7 +25,7 @@ import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfterEach, Matchers}

 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.yarn.config.YARN_ALLOCATION_BLACKLIST_ENABLED
+import org.apache.spark.deploy.yarn.config.YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED
 import org.apache.spark.internal.config.{MAX_FAILED_EXEC_PER_NODE, _}
 import org.apache.spark.util.ManualClock

@@ -43,7 +43,7 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers
   override def beforeEach(): Unit = {
     val sparkConf = new SparkConf()
     sparkConf.set(BLACKLIST_TIMEOUT_CONF, BLACKLIST_TIMEOUT)
-    sparkConf.set(YARN_ALLOCATION_BLACKLIST_ENABLED, true)
+    sparkConf.set(YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED, true)
     sparkConf.set(MAX_FAILED_EXEC_PER_NODE, MAX_FAILED_EXEC_PER_NODE_VALUE)

     amClientMock = mock(classOf[AMRMClient[ContainerRequest]])
