From 0c0e409d49afa954703462b338af04481b74f563 Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Sun, 2 Mar 2014 23:22:09 -0500
Subject: [PATCH] simplify the implementation of CoarseGrainedSchedulerBackend

Replace the per-executor executorAddress, executorHost and freeCores maps
in DriverActor with a single executorId -> WorkerOffer map.

WorkerOffer.cores becomes mutable and tracks the executor's free cores;
WorkerOffer.totalCores keeps the core count the offer was created with, so
removeExecutor subtracts that amount from totalCoreCount.

removeExecutor still drops the executor's entry from addressToExecutorId;
the address is now taken from the registered ActorRef instead of the
removed executorAddress map.
---
 .../apache/spark/scheduler/WorkerOffer.scala  |  5 +++-
 .../CoarseGrainedSchedulerBackend.scala       | 27 ++++++++-----------
 2 files changed, 15 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
index ba6bab3f91a65..51db82dc67e1a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
@@ -21,4 +21,7 @@ package org.apache.spark.scheduler
  * Represents free resources available on an executor.
  */
 private[spark]
-class WorkerOffer(val executorId: String, val host: String, val cores: Int)
+class WorkerOffer(val executorId: String, val host: String, var cores: Int) {
+  /** Core count this offer was created with; stays fixed while `cores` is updated. */
+  val totalCores = cores
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 379e02eb9a437..82dfe06a5f644 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -51,9 +51,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
 
   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
     private val executorActor = new HashMap[String, ActorRef]
-    private val executorAddress = new HashMap[String, Address]
-    private val executorHost = new HashMap[String, String]
-    private val freeCores = new HashMap[String, Int]
+    private val workerOffers = new HashMap[String, WorkerOffer]
     private val addressToExecutorId = new HashMap[Address, String]
 
     override def preStart() {
@@ -75,9 +73,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
           logInfo("Registered executor: " + sender + " with ID " + executorId)
           sender ! RegisteredExecutor(sparkProperties)
           executorActor(executorId) = sender
-          executorHost(executorId) = Utils.parseHostPort(hostPort)._1
-          freeCores(executorId) = cores
-          executorAddress(executorId) = sender.path.address
+          val host = Utils.parseHostPort(hostPort)._1
+          workerOffers(executorId) = new WorkerOffer(executorId, host, cores)
           addressToExecutorId(sender.path.address) = executorId
           totalCoreCount.addAndGet(cores)
           makeOffers()
@@ -87,7 +84,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
         scheduler.statusUpdate(taskId, state, data.value)
         if (TaskState.isFinished(state)) {
           if (executorActor.contains(executorId)) {
-            freeCores(executorId) += 1
+            workerOffers(executorId).cores += 1
             makeOffers(executorId)
           } else {
             // Ignoring the update since we don't know about the executor.
@@ -125,20 +122,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
 
     // Make fake resource offers on all executors
     def makeOffers() {
-      launchTasks(scheduler.resourceOffers(
-        executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
+      launchTasks(scheduler.resourceOffers(workerOffers.values.toSeq))
     }
 
     // Make fake resource offers on just one executor
     def makeOffers(executorId: String) {
-      launchTasks(scheduler.resourceOffers(
-        Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
+      launchTasks(scheduler.resourceOffers(Seq(workerOffers(executorId))))
     }
 
     // Launch tasks returned by a set of resource offers
     def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
       for (task <- tasks.flatten) {
-        freeCores(task.executorId) -= 1
+        workerOffers(task.executorId).cores -= 1
         executorActor(task.executorId) ! LaunchTask(task)
       }
     }
@@ -147,11 +142,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     def removeExecutor(executorId: String, reason: String) {
       if (executorActor.contains(executorId)) {
         logInfo("Executor " + executorId + " disconnected, so removing it")
-        val numCores = freeCores(executorId)
-        addressToExecutorId -= executorAddress(executorId)
+        val numCores = workerOffers(executorId).totalCores
+        // Resolve the address from the ActorRef before that entry is dropped below.
+        addressToExecutorId -= executorActor(executorId).path.address
         executorActor -= executorId
-        executorHost -= executorId
-        freeCores -= executorId
+        workerOffers -= executorId
         totalCoreCount.addAndGet(-numCores)
         scheduler.executorLost(executorId, SlaveLost(reason))
       }