Skip to content

Commit

Permalink
Push down variable totalExpectedResources to children classes
Browse files Browse the repository at this point in the history
  • Loading branch information
li-zhihui committed Aug 1, 2014
1 parent ca54bd9 commit abf4860
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
var totalExecutors = new AtomicInteger(0)
var totalExpectedResources = new AtomicInteger(0)
val conf = scheduler.sc.conf
private val timeout = AkkaUtils.askTimeout(conf)
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
Expand Down Expand Up @@ -268,8 +267,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A

override def isReady(): Boolean = {
if (sufficientResourcesRegistered) {
logInfo("SchedulerBackend is ready for scheduling beginning, total expected resources: " +
s"$totalExpectedResources, minRegisteredResourcesRatio: $minRegisteredRatio")
logInfo("SchedulerBackend is ready for scheduling beginning after " +
s"reached minRegisteredResourcesRatio: $minRegisteredRatio")
return true
}
if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private[spark] class SparkDeploySchedulerBackend(
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _

val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
totalExpectedResources.getAndSet(maxCores.getOrElse(0))
val totalExpectedCores = maxCores.getOrElse(0)

override def start() {
super.start()
Expand Down Expand Up @@ -113,6 +113,6 @@ private[spark] class SparkDeploySchedulerBackend(
}

override def sufficientResourcesRegistered(): Boolean = {
totalCoreCount.get() >= totalExpectedResources.get() * minRegisteredRatio
totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ private[spark] class YarnClientSchedulerBackend(
var appId: ApplicationId = null
var checkerThread: Thread = null
var stopping: Boolean = false
var totalExpectedExecutors = 0

private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
arrayBuf: ArrayBuffer[String]) {
Expand Down Expand Up @@ -83,7 +84,7 @@ private[spark] class YarnClientSchedulerBackend(

logDebug("ClientArguments called with: " + argsArrayBuf)
val args = new ClientArguments(argsArrayBuf.toArray, conf)
totalExpectedResources.set(args.numExecutors)
totalExpectedExecutors = args.numExecutors
client = new Client(args, conf)
appId = client.runApp()
waitForApp()
Expand Down Expand Up @@ -150,6 +151,6 @@ private[spark] class YarnClientSchedulerBackend(
}

override def sufficientResourcesRegistered(): Boolean = {
totalExecutors.get() >= totalExpectedResources.get() * minRegisteredRatio
totalExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ private[spark] class YarnClusterSchedulerBackend(
sc: SparkContext)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {

var totalExpectedExecutors = 0

if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
minRegisteredRatio = 0.8
}
Expand All @@ -39,10 +41,10 @@ private[spark] class YarnClusterSchedulerBackend(
}
// System property can override environment variable.
numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors)
totalExpectedResources.set(numExecutors)
totalExpectedExecutors = numExecutors
}

override def sufficientResourcesRegistered(): Boolean = {
totalExecutors.get() >= totalExpectedResources.get() * minRegisteredRatio
totalExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
}
}

0 comments on commit abf4860

Please sign in to comment.