Skip to content

Commit

Permalink
Rename variable totalExecutors and clean codes
Browse files Browse the repository at this point in the history
  • Loading branch information
li-zhihui committed Aug 4, 2014
1 parent abf4860 commit e9a630b
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
{
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
var totalExecutors = new AtomicInteger(0)
var totalRegisteredExecutors = new AtomicInteger(0)
val conf = scheduler.sc.conf
private val timeout = AkkaUtils.askTimeout(conf)
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
Expand Down Expand Up @@ -94,7 +94,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
executorAddress(executorId) = sender.path.address
addressToExecutorId(sender.path.address) = executorId
totalCoreCount.addAndGet(cores)
totalExecutors.addAndGet(1)
totalRegisteredExecutors.addAndGet(1)
makeOffers()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,6 @@ private[spark] class YarnClientSchedulerBackend(
}

override def sufficientResourcesRegistered(): Boolean = {
totalExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,16 @@ private[spark] class YarnClusterSchedulerBackend(

override def start() {
super.start()
var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
totalExpectedExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
numExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).getOrElse(numExecutors)
totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES"))
.getOrElse(totalExpectedExecutors)
}
// System property can override environment variable.
numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors)
totalExpectedExecutors = numExecutors
totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors)
}

override def sufficientResourcesRegistered(): Boolean = {
totalExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
}
}

0 comments on commit e9a630b

Please sign in to comment.