Skip to content

Commit

Permalink
simplify the implementation of CoarseGrainedSchedulerBackend
Browse files Browse the repository at this point in the history
  • Loading branch information
CodingCat committed Mar 3, 2014
1 parent 55a4f11 commit 0c0e409
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,6 @@ package org.apache.spark.scheduler
* Represents free resources available on an executor.
*/
private[spark]
class WorkerOffer(val executorId: String, val host: String, val cores: Int)
class WorkerOffer(val executorId: String, val host: String, var cores: Int) {
@transient val totalcores = cores
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A

class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
private val executorActor = new HashMap[String, ActorRef]
private val executorAddress = new HashMap[String, Address]
private val executorHost = new HashMap[String, String]
private val freeCores = new HashMap[String, Int]
private val workerOffers = new HashMap[String, WorkerOffer]
private val addressToExecutorId = new HashMap[Address, String]

override def preStart() {
Expand All @@ -75,9 +73,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
logInfo("Registered executor: " + sender + " with ID " + executorId)
sender ! RegisteredExecutor(sparkProperties)
executorActor(executorId) = sender
executorHost(executorId) = Utils.parseHostPort(hostPort)._1
freeCores(executorId) = cores
executorAddress(executorId) = sender.path.address
workerOffers += (executorId -> new WorkerOffer(executorId, Utils.parseHostPort(hostPort)._1, cores))
addressToExecutorId(sender.path.address) = executorId
totalCoreCount.addAndGet(cores)
makeOffers()
Expand All @@ -87,7 +83,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
scheduler.statusUpdate(taskId, state, data.value)
if (TaskState.isFinished(state)) {
if (executorActor.contains(executorId)) {
freeCores(executorId) += 1
workerOffers(executorId).cores += 1
makeOffers(executorId)
} else {
// Ignoring the update since we don't know about the executor.
Expand Down Expand Up @@ -125,20 +121,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A

// Make fake resource offers on all executors
def makeOffers() {
launchTasks(scheduler.resourceOffers(
executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
launchTasks(scheduler.resourceOffers(workerOffers.values.toSeq))
}

// Make fake resource offers on just one executor
def makeOffers(executorId: String) {
launchTasks(scheduler.resourceOffers(
Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
launchTasks(scheduler.resourceOffers(Seq(workerOffers(executorId))))
}

// Launch tasks returned by a set of resource offers
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
freeCores(task.executorId) -= 1
workerOffers(task.executorId).cores -= 1
executorActor(task.executorId) ! LaunchTask(task)
}
}
Expand All @@ -147,11 +141,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
def removeExecutor(executorId: String, reason: String) {
if (executorActor.contains(executorId)) {
logInfo("Executor " + executorId + " disconnected, so removing it")
val numCores = freeCores(executorId)
addressToExecutorId -= executorAddress(executorId)
val numCores = workerOffers(executorId).totalcores
executorActor -= executorId
executorHost -= executorId
freeCores -= executorId
workerOffers -= executorId
totalCoreCount.addAndGet(-numCores)
scheduler.executorLost(executorId, SlaveLost(reason))
}
Expand Down

0 comments on commit 0c0e409

Please sign in to comment.