Skip to content

Commit

Permalink
Fix race condition at SchedulerBackend.isReady in standalone mode
Browse files Browse the repository at this point in the history
  • Loading branch information
li-zhihui committed Aug 1, 2014
1 parent 9632719 commit 41cf47e
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
{
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
var totalExpectedExecutors = new AtomicInteger(0)
var totalExecutors = new AtomicInteger(0)
var totalExpectedResources = new AtomicInteger(0)
val conf = scheduler.sc.conf
private val timeout = AkkaUtils.askTimeout(conf)
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
// Submit tasks only after (registered executors / total expected executors)
// Submit tasks only after (registered resources / total expected resources)
// is at least this value, which is a double between 0 and 1.
var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0)
if (minRegisteredRatio > 1) minRegisteredRatio = 1
// Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds).
// Regardless of whether minRegisteredRatio has been reached, submit tasks after this time (milliseconds).
val maxRegisteredWaitingTime =
conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
val createTime = System.currentTimeMillis()
var ready = if (minRegisteredRatio <= 0) true else false

class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
private val executorActor = new HashMap[String, ActorRef]
Expand Down Expand Up @@ -94,12 +94,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
executorAddress(executorId) = sender.path.address
addressToExecutorId(sender.path.address) = executorId
totalCoreCount.addAndGet(cores)
if (executorActor.size >= totalExpectedExecutors.get() * minRegisteredRatio && !ready) {
ready = true
logInfo("SchedulerBackend is ready for scheduling beginning, registered executors: " +
executorActor.size + ", total expected executors: " + totalExpectedExecutors.get() +
", minRegisteredExecutorsRatio: " + minRegisteredRatio)
}
totalExecutors.addAndGet(1)
makeOffers()
}

Expand Down Expand Up @@ -268,14 +263,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
}
}

// Whether enough resources have registered for scheduling to begin.
// Defaults to true (no gating); cluster-manager backends override this to
// compare registered executors/cores against totalExpectedResources.
def checkRegisteredResources(): Boolean = true

override def isReady(): Boolean = {
if (ready) {
if (checkRegisteredResources) {
logInfo("SchedulerBackend is ready for scheduling beginning" +
", total expected resources: " + totalExpectedResources.get() +
", minRegisteredResourcesRatio: " + minRegisteredRatio)
return true
}
if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
ready = true
logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
"maxRegisteredExecutorsWaitingTime: " + maxRegisteredWaitingTime)
"maxRegisteredResourcesWaitingTime(ms): " + maxRegisteredWaitingTime)
return true
}
false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ private[spark] class SparkDeploySchedulerBackend(
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _

val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
totalExpectedResources.getAndSet(maxCores.getOrElse(0))

override def start() {
super.start()
Expand Down Expand Up @@ -98,7 +99,6 @@ private[spark] class SparkDeploySchedulerBackend(

override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
memory: Int) {
totalExpectedExecutors.addAndGet(1)
logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
fullId, hostPort, cores, Utils.megabytesToString(memory)))
}
Expand All @@ -111,4 +111,8 @@ private[spark] class SparkDeploySchedulerBackend(
logInfo("Executor %s removed: %s".format(fullId, message))
removeExecutor(fullId.split("/")(1), reason.toString)
}

/**
 * Standalone mode readiness check: scheduling may begin once the number of
 * registered CPU cores reaches the required fraction of the expected total.
 */
override def checkRegisteredResources(): Boolean = {
  val requiredCores = totalExpectedResources.get() * minRegisteredRatio
  totalCoreCount.get() >= requiredCores
}
}
13 changes: 7 additions & 6 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -746,21 +746,22 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
<td><code>spark.scheduler.minRegisteredExecutorsRatio</code></td>
<td><code>spark.scheduler.minRegisteredResourcesRatio</code></td>
<td>0</td>
<td>
The minimum ratio of registered executors (registered executors / total expected executors)
The minimum ratio of registered resources (registered resources / total expected resources)
(resources are executors in yarn mode, CPU cores in standalone and mesos mode)
to wait for before scheduling begins. Specified as a double between 0 and 1.
Regardless of whether the minimum ratio of executors has been reached,
Regardless of whether the minimum ratio of resources has been reached,
the maximum amount of time it will wait before scheduling begins is controlled by config
<code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code>
<code>spark.scheduler.maxRegisteredResourcesWaitingTime</code>
</td>
</tr>
<tr>
<td><code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code></td>
<td><code>spark.scheduler.maxRegisteredResourcesWaitingTime</code></td>
<td>30000</td>
<td>
Maximum amount of time to wait for executors to register before scheduling begins
Maximum amount of time to wait for resources to register before scheduling begins
(in milliseconds).
</td>
</tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ private[spark] class YarnClientSchedulerBackend(
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
with Logging {

if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
minRegisteredRatio = 0.8
ready = false
}

var client: Client = null
Expand Down Expand Up @@ -84,7 +83,7 @@ private[spark] class YarnClientSchedulerBackend(

logDebug("ClientArguments called with: " + argsArrayBuf)
val args = new ClientArguments(argsArrayBuf.toArray, conf)
totalExpectedExecutors.set(args.numExecutors)
totalExpectedResources.set(args.numExecutors)
client = new Client(args, conf)
appId = client.runApp()
waitForApp()
Expand Down Expand Up @@ -150,4 +149,7 @@ private[spark] class YarnClientSchedulerBackend(
logInfo("Stopped")
}

/**
 * YARN client mode readiness check: ready once enough executors have
 * registered to satisfy the minimum ratio of the expected executor count.
 */
override def checkRegisteredResources(): Boolean = {
  val requiredExecutors = totalExpectedResources.get() * minRegisteredRatio
  totalExecutors.get() >= requiredExecutors
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ private[spark] class YarnClusterSchedulerBackend(
sc: SparkContext)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {

if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
minRegisteredRatio = 0.8
ready = false
}

override def start() {
Expand All @@ -40,6 +39,10 @@ private[spark] class YarnClusterSchedulerBackend(
}
// System property can override environment variable.
numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors)
totalExpectedExecutors.set(numExecutors)
totalExpectedResources.set(numExecutors)
}

/**
 * YARN cluster mode readiness check: ready once enough executors have
 * registered to satisfy the minimum ratio of the expected executor count.
 */
override def checkRegisteredResources(): Boolean = {
  val requiredExecutors = totalExpectedResources.get() * minRegisteredRatio
  totalExecutors.get() >= requiredExecutors
}
}

0 comments on commit 41cf47e

Please sign in to comment.