From c922cb067606a0d99070e15a68943d22accb6c3d Mon Sep 17 00:00:00 2001 From: Hong Shen Date: Wed, 4 Feb 2015 17:41:35 +0800 Subject: [PATCH 01/20] Add expireDeadHosts in HeartbeatReceiver --- .../org/apache/spark/HeartbeatReceiver.scala | 41 ++++++++++++++++++- .../scala/org/apache/spark/SparkContext.scala | 2 +- 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 83ae57b7f1516..2e52dd239c393 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -22,6 +22,7 @@ import org.apache.spark.executor.TaskMetrics import org.apache.spark.storage.BlockManagerId import org.apache.spark.scheduler.TaskScheduler import org.apache.spark.util.ActorLogReceive +import org.apache.spark.scheduler.ExecutorLossReason /** * A heartbeat from executors to the driver. This is a shared message used by several internal @@ -32,18 +33,56 @@ private[spark] case class Heartbeat( taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics blockManagerId: BlockManagerId) +private[spark] case object ExpireDeadHosts + private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean) /** * Lives in the driver to receive heartbeats from executors.. */ -private[spark] class HeartbeatReceiver(scheduler: TaskScheduler) +private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler) extends Actor with ActorLogReceive with Logging { + val executorLastSeen = new mutable.HashMap[String, Long] + + import context.dispatcher + var timeoutCheckingTask = context.system.scheduler.schedule(0.seconds, + 10.milliseconds, self, ExpireDeadHosts) + + val slaveTimeout = sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", + math.max(sc.conf.getInt("spark.executor.heartbeatInterval", 10000) * 3, 120000)) + override def receiveWithLogging = { case Heartbeat(executorId, taskMetrics, blockManagerId) => val response = HeartbeatResponse( !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId)) + heartbeatReceived(executorId) sender ! 
response + case ExpireDeadHosts => + expireDeadHosts() + + } + + private def heartbeatReceived(executorId: String) = { + executorLastSeen(executorId) = System.currentTimeMillis() + } + + private def expireDeadHosts() { + logTrace("Checking for hosts with no recent heart beats in HeartbeatReceiver.") + val now = System.currentTimeMillis() + val minSeenTime = now - slaveTimeout + for ((executorId, lastSeenMs) <- executorLastSeen) { + if (lastSeenMs < minSeenTime) { + val msg = "Removing Executor " + executorId + " with no recent heart beats: " + +(now - lastSeenMs) + "ms exceeds " + slaveTimeout + "ms" + logWarning(msg) + if (scheduler.isInstanceOf[org.apache.spark.scheduler.TaskSchedulerImpl]) { + scheduler.asInstanceOf[org.apache.spark.scheduler.TaskSchedulerImpl] + .executorLost(executorId, new ExecutorLossReason("")) + } + sc.killExecutor(executorId) + executorLastSeen.remove(executorId) + } + } } } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 7f5aef1c75df2..c3e1fced04cd8 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -332,7 +332,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli private[spark] var (schedulerBackend, taskScheduler) = SparkContext.createTaskScheduler(this, master) private val heartbeatReceiver = env.actorSystem.actorOf( - Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver") + Props(new HeartbeatReceiver(this, taskScheduler)), "HeartbeatReceiver") @volatile private[spark] var dagScheduler: DAGScheduler = _ try { dagScheduler = new DAGScheduler(this) From b5c04415766dd466abb80d9ab57f7b3714c67a60 Mon Sep 17 00:00:00 2001 From: Hong Shen Date: Thu, 5 Feb 2015 09:38:47 +0800 Subject: [PATCH 02/20] Remove expireDeadHosts in BlockManagerMasterActor --- .../org/apache/spark/HeartbeatReceiver.scala | 47 ++++++++++++------- .../spark/scheduler/TaskScheduler.scala | 6 ++- .../spark/scheduler/TaskSchedulerImpl.scala | 2 +- .../storage/BlockManagerMasterActor.scala | 36 +------------- .../spark/scheduler/DAGSchedulerSuite.scala | 2 + 5 files changed, 39 insertions(+), 54 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 2e52dd239c393..00295df1b040e 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -17,16 +17,20 @@ package org.apache.spark -import akka.actor.Actor +import scala.concurrent.duration._ +import scala.collection.mutable + +import akka.actor.{Actor, Cancellable} + import org.apache.spark.executor.TaskMetrics import org.apache.spark.storage.BlockManagerId -import org.apache.spark.scheduler.TaskScheduler +import org.apache.spark.scheduler.{SlaveLost, TaskScheduler} import org.apache.spark.util.ActorLogReceive -import org.apache.spark.scheduler.ExecutorLossReason /** * A heartbeat from executors to the driver. This is a shared message used by several internal - * components to convey liveness or execution information for in-progress tasks. + * components to convey liveness or execution information for in-progress tasks. It will also + * expiry the hosts that have no heartbeat for more than spark.executor.heartbeat.timeoutMs. 
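As an aside for readers new to this corner of the driver: the mechanism patch 01 introduces is a self-scheduled expiry loop, an actor that records a last-seen timestamp per executor and periodically sends itself a tick to sweep stale entries. The sketch below is a minimal standalone version of that pattern, assuming the classic Akka 2.x scheduler API used throughout this series; the actor and message names here are illustrative, not Spark's.

    import scala.collection.mutable
    import scala.concurrent.duration._
    import akka.actor.{Actor, ActorSystem, Cancellable, Props}

    case class Ping(id: String) // stand-in for Heartbeat
    case object Sweep           // stand-in for ExpireDeadHosts

    class LivenessActor(timeoutMs: Long, intervalMs: Long) extends Actor {
      private val lastSeen = new mutable.HashMap[String, Long]
      private var tick: Cancellable = null

      override def preStart(): Unit = {
        import context.dispatcher
        // Periodically deliver Sweep to ourselves, as HeartbeatReceiver does with ExpireDeadHosts.
        tick = context.system.scheduler.schedule(0.seconds, intervalMs.milliseconds, self, Sweep)
      }

      override def postStop(): Unit = if (tick != null) tick.cancel()

      def receive = {
        case Ping(id) =>
          lastSeen(id) = System.currentTimeMillis()
        case Sweep =>
          val now = System.currentTimeMillis()
          // Snapshot the dead ids first so the map is never mutated while iterating it.
          val dead = lastSeen.collect { case (id, seen) if now - seen > timeoutMs => id }
          dead.foreach { id =>
            println(s"expiring $id: silent for more than $timeoutMs ms")
            lastSeen.remove(id)
          }
      }
    }

    // Usage sketch:
    //   val system = ActorSystem("demo")
    //   val ref = system.actorOf(Props(new LivenessActor(120000, 60000)), "liveness")
    //   ref ! Ping("executor-1")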
*/ private[spark] case class Heartbeat( executorId: String, @@ -45,12 +49,18 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule val executorLastSeen = new mutable.HashMap[String, Long] - import context.dispatcher - var timeoutCheckingTask = context.system.scheduler.schedule(0.seconds, - 10.milliseconds, self, ExpireDeadHosts) - - val slaveTimeout = sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", - math.max(sc.conf.getInt("spark.executor.heartbeatInterval", 10000) * 3, 120000)) + val slaveTimeout = conf.getLong("spark.executor.heartbeat.timeoutMs", 120 * 1000) + + val checkTimeoutInterval = conf.getLong("spark.executor.heartbeat.timeoutIntervalMs", 60000) + + var timeoutCheckingTask: Cancellable = null + + override def preStart() { + import context.dispatcher + timeoutCheckingTask = context.system.scheduler.schedule(0.seconds, + checkTimeoutInterval.milliseconds, self, ExpireDeadHosts) + super.preStart + } override def receiveWithLogging = { case Heartbeat(executorId, taskMetrics, blockManagerId) => @@ -73,16 +83,19 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule val minSeenTime = now - slaveTimeout for ((executorId, lastSeenMs) <- executorLastSeen) { if (lastSeenMs < minSeenTime) { - val msg = "Removing Executor " + executorId + " with no recent heart beats: " - +(now - lastSeenMs) + "ms exceeds " + slaveTimeout + "ms" - logWarning(msg) - if (scheduler.isInstanceOf[org.apache.spark.scheduler.TaskSchedulerImpl]) { - scheduler.asInstanceOf[org.apache.spark.scheduler.TaskSchedulerImpl] - .executorLost(executorId, new ExecutorLossReason("")) - } + logWarning("Removing Executor " + executorId + " with no recent heartbeats: " + +(now - lastSeenMs) + "ms exceeds " + slaveTimeout + "ms") + scheduler.executorLost(executorId, SlaveLost) sc.killExecutor(executorId) executorLastSeen.remove(executorId) } } } + + override def postStop() { + if (timeoutCheckingTask != null) { + timeoutCheckingTask.cancel() + } + super.postStop + } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index f095915352b17..73929a55cd24e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -73,5 +73,9 @@ private[spark] trait TaskScheduler { * @return An application ID */ def applicationId(): String = appId - + + /** + * Process a lost executor in taskScheduler + */ + def executorLost(executorId: String, reason: ExecutorLossReason) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 79f84e70df9d5..e6865cb78aa2c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -429,7 +429,7 @@ private[spark] class TaskSchedulerImpl( } } - def executorLost(executorId: String, reason: ExecutorLossReason) { + override def executorLost(executorId: String, reason: ExecutorLossReason) { var failedExecutor: Option[String] = None synchronized { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 64133464d8daa..787b0f96bec32 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConversions._ import scala.concurrent.Future import scala.concurrent.duration._ -import akka.actor.{Actor, ActorRef, Cancellable} +import akka.actor.{Actor, ActorRef} import akka.pattern.ask import org.apache.spark.{Logging, SparkConf, SparkException} @@ -52,19 +52,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus private val akkaTimeout = AkkaUtils.askTimeout(conf) - val slaveTimeout = conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120 * 1000) - - val checkTimeoutInterval = conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000) - - var timeoutCheckingTask: Cancellable = null - - override def preStart() { - import context.dispatcher - timeoutCheckingTask = context.system.scheduler.schedule(0.seconds, - checkTimeoutInterval.milliseconds, self, ExpireDeadHosts) - super.preStart() - } - override def receiveWithLogging = { case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) => register(blockManagerId, maxMemSize, slaveActor) @@ -118,14 +105,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus case StopBlockManagerMaster => sender ! true - if (timeoutCheckingTask != null) { - timeoutCheckingTask.cancel() - } context.stop(self) - case ExpireDeadHosts => - expireDeadHosts() - case BlockManagerHeartbeat(blockManagerId) => sender ! heartbeatReceived(blockManagerId) @@ -207,21 +188,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus logInfo(s"Removing block manager $blockManagerId") } - private def expireDeadHosts() { - logTrace("Checking for hosts with no recent heart beats in BlockManagerMaster.") - val now = System.currentTimeMillis() - val minSeenTime = now - slaveTimeout - val toRemove = new mutable.HashSet[BlockManagerId] - for (info <- blockManagerInfo.values) { - if (info.lastSeenMs < minSeenTime && !info.blockManagerId.isDriver) { - logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: " - + (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms") - toRemove += info.blockManagerId - } - } - toRemove.foreach(removeBlockManager) - } - private def removeExecutor(execId: String) { logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.") blockManagerIdByExecutor.get(execId).foreach(removeBlockManager) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index eb116213f69fc..ef84bdde18cdf 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -96,6 +96,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar } override def setDAGScheduler(dagScheduler: DAGScheduler) = {} override def defaultParallelism() = 2 + override def executorLost(executorId: String, reason: ExecutorLossReason) = {} } /** Length of time to wait while draining listener events. 
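The DAGSchedulerSuite change above follows mechanically from patch 02 making executorLost an abstract member of the TaskScheduler trait: every stub scheduler in the tests must now provide it, even as a no-op. A tiny self-contained sketch of the idea, with a simplified stand-in trait (the String reason replaces ExecutorLossReason for brevity):

    // Simplified stand-in for the TaskScheduler trait after patch 02.
    trait MiniTaskScheduler {
      def defaultParallelism(): Int
      def executorLost(executorId: String, reason: String): Unit
    }

    // A test double only has to satisfy the contract; losing an executor is a no-op here.
    val stubScheduler = new MiniTaskScheduler {
      override def defaultParallelism(): Int = 2
      override def executorLost(executorId: String, reason: String): Unit = {}
    }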
*/ @@ -386,6 +387,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar override def defaultParallelism() = 2 override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)], blockManagerId: BlockManagerId): Boolean = true + override def executorLost(executorId: String, reason: ExecutorLossReason) = {} } val noKillScheduler = new DAGScheduler( sc, From fb5df97a2d23c9a49da0fcd097c50b93fd87b552 Mon Sep 17 00:00:00 2001 From: Hong Shen Date: Thu, 5 Feb 2015 09:41:47 +0800 Subject: [PATCH 03/20] Remove ExpireDeadHosts in BlockManagerMessages --- .../scala/org/apache/spark/storage/BlockManagerMessages.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index 3f32099d08cc9..48247453edef0 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -109,6 +109,4 @@ private[spark] object BlockManagerMessages { extends ToBlockManagerMaster case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster - - case object ExpireDeadHosts extends ToBlockManagerMaster } From e197e20ef8cfbf6968465cedd1a1f2ffc72c01eb Mon Sep 17 00:00:00 2001 From: Hong Shen Date: Thu, 5 Feb 2015 09:47:26 +0800 Subject: [PATCH 04/20] Fix failed test --- core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 00295df1b040e..0de77fc69c796 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -84,7 +84,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule for ((executorId, lastSeenMs) <- executorLastSeen) { if (lastSeenMs < minSeenTime) { logWarning("Removing Executor " + executorId + " with no recent heartbeats: " - +(now - lastSeenMs) + "ms exceeds " + slaveTimeout + "ms") + + (now - lastSeenMs) + "ms exceeds " + slaveTimeout + "ms") scheduler.executorLost(executorId, SlaveLost) sc.killExecutor(executorId) executorLastSeen.remove(executorId) From c1dfda10883885a53ad6550131efcb99cbf01496 Mon Sep 17 00:00:00 2001 From: Hong Shen Date: Thu, 5 Feb 2015 09:51:15 +0800 Subject: [PATCH 05/20] Fix failed test --- core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 0de77fc69c796..a3c4cffed9ee6 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -84,7 +84,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule for ((executorId, lastSeenMs) <- executorLastSeen) { if (lastSeenMs < minSeenTime) { logWarning("Removing Executor " + executorId + " with no recent heartbeats: " - + (now - lastSeenMs) + "ms exceeds " + slaveTimeout + "ms") + + (now - lastSeenMs) + " ms exceeds " + slaveTimeout + "ms") scheduler.executorLost(executorId, SlaveLost) sc.killExecutor(executorId) executorLastSeen.remove(executorId) From 8e7740894db01863d394aa73bad7e696463d33fb Mon Sep 17
00:00:00 2001 From: Hong Shen Date: Thu, 5 Feb 2015 09:54:33 +0800 Subject: [PATCH 06/20] Fix failed test --- .../src/main/scala/org/apache/spark/HeartbeatReceiver.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index a3c4cffed9ee6..301314a3bdc5a 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -49,16 +49,16 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule val executorLastSeen = new mutable.HashMap[String, Long] - val slaveTimeout = conf.getLong("spark.executor.heartbeat.timeoutMs", 120 * 1000) + val slaveTimeout = sc.conf.getLong("spark.executor.heartbeat.timeoutMs", 120 * 1000) - val checkTimeoutInterval = conf.getLong("spark.executor.heartbeat.timeoutIntervalMs", 60000) + val checkTimeoutInterval = sc.conf.getLong("spark.executor.heartbeat.timeoutIntervalMs", 60000) var timeoutCheckingTask: Cancellable = null override def preStart() { import context.dispatcher timeoutCheckingTask = context.system.scheduler.schedule(0.seconds, - checkTimeoutInterval.milliseconds, self, ExpireDeadHosts) + checkTimeoutInterval.milliseconds, self, ExpireDeadHosts) super.preStart } From bccd51510188e5df79d65706145186d71889a2b6 Mon Sep 17 00:00:00 2001 From: Hong Shen Date: Thu, 5 Feb 2015 09:56:26 +0800 Subject: [PATCH 07/20] Fix failed test --- core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 301314a3bdc5a..ebb7e1efda471 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -85,7 +85,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule if (lastSeenMs < minSeenTime) { logWarning("Removing Executor " + executorId + " with no recent heartbeats: " + (now - lastSeenMs) + " ms exceeds " + slaveTimeout + "ms") - scheduler.executorLost(executorId, SlaveLost) + scheduler.executorLost(executorId, SlaveLost()) sc.killExecutor(executorId) executorLastSeen.remove(executorId) From ce9257e2508ce954c948a429d14be27923ed6210 Mon Sep 17 00:00:00 2001 From: Hong Shen Date: Thu, 5 Feb 2015 14:38:46 +0800 Subject: [PATCH 08/20] Fix failed test --- core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index ebb7e1efda471..4694b12e7060c 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -58,7 +58,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule override def preStart() { import context.dispatcher timeoutCheckingTask = context.system.scheduler.schedule(0.seconds, - checkTimeoutInterval.milliseconds, self, ExpireDeadHosts) + checkTimeoutInterval.milliseconds, self, ExpireDeadHosts) super.preStart } @@ -84,7 +84,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule for ((executorId, lastSeenMs) <- executorLastSeen) { if (lastSeenMs < minSeenTime) { logWarning("Removing
Executor " + executorId + " with no recent heartbeats: " - + (now - lastSeenMs) + " ms exceeds " + slaveTimeout + "ms") + + (now - lastSeenMs) + " ms exceeds " + slaveTimeout + "ms") scheduler.executorLost(executorId, SlaveLost()) sc.killExecutor(executorId) executorLastSeen.remove(executorId) From 07952f31d70fbfdd3b76f5d3796d39bc718668a9 Mon Sep 17 00:00:00 2001 From: Hong Shen Date: Fri, 6 Feb 2015 09:21:54 +0800 Subject: [PATCH 09/20] Change configs name and code style. --- .../org/apache/spark/HeartbeatReceiver.scala | 21 ++++++++++--------- .../spark/scheduler/TaskScheduler.scala | 2 +- .../spark/scheduler/TaskSchedulerImpl.scala | 2 +- .../spark/scheduler/DAGSchedulerSuite.scala | 4 ++-- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 4694b12e7060c..af883881a0a8e 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -30,7 +30,7 @@ import org.apache.spark.util.ActorLogReceive /** * A heartbeat from executors to the driver. This is a shared message used by several internal * components to convey liveness or execution information for in-progress tasks. It will also - * expiry the hosts that have no heartbeat for more than spark.executor.heartbeat.timeoutMs. + * expire the hosts that have not heartbeated for more than spark.driver.executorTimeoutMs. */ private[spark] case class Heartbeat( executorId: String, @@ -49,16 +49,18 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule val executorLastSeen = new mutable.HashMap[String, Long] - val slaveTimeout = sc.conf.getLong("spark.executor.heartbeat.timeoutMs", 120 * 1000) + val executorTimeout = sc.conf.getLong("spark.driver.executorTimeoutMs", + sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120 * 1000)) - val checkTimeoutInterval = sc.conf.getLong("spark.executor.heartbeat.timeoutIntervalMs", 60000) + val checkTimeoutInterval = sc.conf.getLong("spark.driver.executorTimeoutIntervalMs", + sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000)) var timeoutCheckingTask: Cancellable = null - override def preStart() { + override def preStart(): Unit = { import context.dispatcher timeoutCheckingTask = context.system.scheduler.schedule(0.seconds, - checkTimeoutInterval.milliseconds, self, ExpireDeadHosts) + checkTimeoutInterval.milliseconds, self, ExpireDeadHosts) super.preStart } @@ -70,21 +72,20 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule sender ! 
response case ExpireDeadHosts => expireDeadHosts() - } private def heartbeatReceived(executorId: String) = { executorLastSeen(executorId) = System.currentTimeMillis() } - private def expireDeadHosts() { + private def expireDeadHosts(): Unit = { logTrace("Checking for hosts with no recent heart beats in HeartbeatReceiver.") val now = System.currentTimeMillis() - val minSeenTime = now - slaveTimeout + val minSeenTime = now - executorTimeout for ((executorId, lastSeenMs) <- executorLastSeen) { if (lastSeenMs < minSeenTime) { logWarning("Removing Executor " + executorId + " with no recent heartbeats: " - + (now - lastSeenMs) + " ms exceeds " + slaveTimeout + "ms") + + (now - lastSeenMs) + " ms exceeds " + executorTimeout + "ms") scheduler.executorLost(executorId, SlaveLost()) sc.killExecutor(executorId) executorLastSeen.remove(executorId) @@ -92,7 +93,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule } } - override def postStop() { + override def postStop(): Unit = { if (timeoutCheckingTask != null) { timeoutCheckingTask.cancel() } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 73929a55cd24e..aafd3a5f0ff3c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -77,5 +77,5 @@ private[spark] trait TaskScheduler { /** * Process a lost executor in taskScheduler */ - def executorLost(executorId: String, reason: ExecutorLossReason) + def executorLost(executorId: String, reason: ExecutorLossReason): Unit } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index e6865cb78aa2c..9bbed21f0cc0a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -429,7 +429,7 @@ private[spark] class TaskSchedulerImpl( } } - override def executorLost(executorId: String, reason: ExecutorLossReason) { + override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = { var failedExecutor: Option[String] = None synchronized { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index ef84bdde18cdf..811a89d7f548c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -96,7 +96,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar } override def setDAGScheduler(dagScheduler: DAGScheduler) = {} override def defaultParallelism() = 2 - override def executorLost(executorId: String, reason: ExecutorLossReason) = {} + override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} } /** Length of time to wait while draining listener events. 
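The nested getLong calls that patch 09 settles on form a fallback chain: the new driver-side key wins if set, then the legacy BlockManager key, then the hard default, so deployments that tuned the old key keep their behavior. A self-contained sketch of that lookup order (a toy settings map standing in for SparkConf):

    val settings = Map("spark.storage.blockManagerSlaveTimeoutMs" -> "300000")

    def getLong(key: String, default: Long): Long =
      settings.get(key).map(_.toLong).getOrElse(default)

    // New key first, legacy key second, hard default last.
    val executorTimeout = getLong("spark.driver.executorTimeoutMs",
      getLong("spark.storage.blockManagerSlaveTimeoutMs", 120 * 1000))
    assert(executorTimeout == 300000L) // only the legacy key is set in this example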
*/ @@ -387,7 +387,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar override def defaultParallelism() = 2 override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)], blockManagerId: BlockManagerId): Boolean = true - override def executorLost(executorId: String, reason: ExecutorLossReason) = {} + override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} } val noKillScheduler = new DAGScheduler( sc, From 6bab7aafb2824d935302be2a57e82559499491a9 Mon Sep 17 00:00:00 2001 From: Hong Shen Date: Fri, 6 Feb 2015 09:36:22 +0800 Subject: [PATCH 10/20] Change a code style. --- core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index af883881a0a8e..927503b951a05 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -74,7 +74,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule expireDeadHosts() } - private def heartbeatReceived(executorId: String) = { + private def heartbeatReceived(executorId: String): Unit = { executorLastSeen(executorId) = System.currentTimeMillis() } From 3e221d9a2be84a6dca7a6d3a958841c469501fde Mon Sep 17 00:00:00 2001 From: Hong Shen Date: Fri, 6 Feb 2015 19:00:23 +0800 Subject: [PATCH 11/20] A minor change in HeartbeatReceiver --- .../scala/org/apache/spark/HeartbeatReceiver.scala | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 927503b951a05..e613898955a57 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -68,18 +68,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule case Heartbeat(executorId, taskMetrics, blockManagerId) => val response = HeartbeatResponse( !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId)) - heartbeatReceived(executorId) + executorLastSeen(executorId) = System.currentTimeMillis() sender ! 
response case ExpireDeadHosts => expireDeadHosts() } - - private def heartbeatReceived(executorId: String): Unit = { - executorLastSeen(executorId) = System.currentTimeMillis() - } - + private def expireDeadHosts(): Unit = { - logTrace("Checking for hosts with no recent heart beats in HeartbeatReceiver.") + logTrace("Checking for hosts with no recent heartbeats in HeartbeatReceiver.") val now = System.currentTimeMillis() val minSeenTime = now - executorTimeout for ((executorId, lastSeenMs) <- executorLastSeen) { From a858fb59e9a13eaf5db29d7315a79003b1dd6827 Mon Sep 17 00:00:00 2001 From: Hong Shen Date: Fri, 6 Feb 2015 19:01:41 +0800 Subject: [PATCH 12/20] A minor change in HeartbeatReceiver --- core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index e613898955a57..b0c4659ad26ae 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -93,6 +93,6 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule if (timeoutCheckingTask != null) { timeoutCheckingTask.cancel() } - super.postStop + super.postStop() } } From 5bedcb873b68b5a2e80906941028ef9a85adbd8f Mon Sep 17 00:00:00 2001 From: Hong Shen Date: Tue, 10 Feb 2015 09:32:31 +0800 Subject: [PATCH 13/20] Remove assert in SparkContext.killExecutors --- .../org/apache/spark/HeartbeatReceiver.scala | 6 +++--- .../scala/org/apache/spark/SparkContext.scala | 19 +++++++++++-------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index b0c4659ad26ae..f2e22a0c6ec16 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -61,7 +61,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule import context.dispatcher timeoutCheckingTask = context.system.scheduler.schedule(0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts) - super.preStart + super.preStart() } override def receiveWithLogging = { @@ -80,8 +80,8 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule val minSeenTime = now - executorTimeout for ((executorId, lastSeenMs) <- executorLastSeen) { if (lastSeenMs < minSeenTime) { - logWarning("Removing Executor " + executorId + " with no recent heartbeats: " - + (now - lastSeenMs) + " ms exceeds " + executorTimeout + "ms") + logWarning(s"Removing executor $executorId with no recent heartbeats: " + + s"${now - lastSeenMs} ms exceeds timeout $executorTimeout ms") scheduler.executorLost(executorId, SlaveLost()) sc.killExecutor(executorId) executorLastSeen.remove(executorId) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index c3e1fced04cd8..99c80ba95db57 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1069,14 +1069,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ @DeveloperApi override def killExecutors(executorIds: Seq[String]): Boolean = { - assert(master.contains("yarn") || dynamicAllocationTesting, - "Killing executors is currently only 
supported in YARN mode") - schedulerBackend match { - case b: CoarseGrainedSchedulerBackend => - b.killExecutors(executorIds) - case _ => - logWarning("Killing executors is only supported in coarse-grained mode") - false + if (master.contains("yarn") || dynamicAllocationTesting) { + schedulerBackend match { + case b: CoarseGrainedSchedulerBackend => + b.killExecutors(executorIds) + case _ => + logWarning("Killing executors is only supported in coarse-grained mode") + false + } + } else { + logWarning("Killing executors is currently only supported in YARN mode") + false } } From 52725af2fda2b9a5d4dec71e6faaf9ef9f5b73dc Mon Sep 17 00:00:00 2001 From: Hong Shen Date: Tue, 10 Feb 2015 09:40:10 +0800 Subject: [PATCH 14/20] Remove assert in SparkContext.killExecutors --- .../org/apache/spark/HeartbeatReceiver.scala | 4 ++- .../scala/org/apache/spark/SparkContext.scala | 30 ++++++++++--------- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index f2e22a0c6ec16..87ca20a225ff5 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -83,7 +83,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule logWarning(s"Removing executor $executorId with no recent heartbeats: " + s"${now - lastSeenMs} ms exceeds timeout $executorTimeout ms") scheduler.executorLost(executorId, SlaveLost()) - sc.killExecutor(executorId) + if(sc.supportKillExecutor()) { + sc.killExecutor(executorId) + } executorLastSeen.remove(executorId) } } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 99c80ba95db57..f3a5d7c3164c4 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -379,7 +379,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli private val dynamicAllocationTesting = conf.getBoolean("spark.dynamicAllocation.testing", false) private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] = if (dynamicAllocationEnabled) { - assert(master.contains("yarn") || dynamicAllocationTesting, + assert(supportKillExecutor(), "Dynamic allocation of executors is currently only supported in YARN mode") Some(new ExecutorAllocationManager(this, listenerBus, conf)) } else { @@ -1034,6 +1034,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key)) postEnvironmentUpdate() } + + def supportKillExecutor(): Boolean = { + if(master.contains("yarn") || dynamicAllocationTesting) { + true + } + false + } /** * :: DeveloperApi :: @@ -1051,8 +1058,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ @DeveloperApi override def requestExecutors(numAdditionalExecutors: Int): Boolean = { - assert(master.contains("yarn") || dynamicAllocationTesting, - "Requesting executors is currently only supported in YARN mode") + assert(supportKillExecutor(), "Requesting executors is currently only supported in YARN mode") schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.requestExecutors(numAdditionalExecutors) @@ -1069,17 +1075,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ @DeveloperApi override 
def killExecutors(executorIds: Seq[String]): Boolean = { - if (master.contains("yarn") || dynamicAllocationTesting) { - schedulerBackend match { - case b: CoarseGrainedSchedulerBackend => - b.killExecutors(executorIds) - case _ => - logWarning("Killing executors is only supported in coarse-grained mode") - false - } - } else { - logWarning("Killing executors is currently only supported in YARN mode") - false + assert(supportKillExecutor(), "Killing executors is currently only supported in YARN mode") + schedulerBackend match { + case b: CoarseGrainedSchedulerBackend => + b.killExecutors(executorIds) + case _ => + logWarning("Killing executors is only supported in coarse-grained mode") + false } } From b904aed11e46875e55e3841bb82f26f16481a459 Mon Sep 17 00:00:00 2001 From: Hong Shen Date: Tue, 10 Feb 2015 11:42:46 +0800 Subject: [PATCH 15/20] Fix failed test --- core/src/main/scala/org/apache/spark/SparkContext.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f3a5d7c3164c4..7ce0487ec4e34 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1038,8 +1038,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli def supportKillExecutor(): Boolean = { if(master.contains("yarn") || dynamicAllocationTesting) { true + } else { + false } - false } /** From 7448ac6c989fe15c2c9a5a8790e9aaaf4b0374d2 Mon Sep 17 00:00:00 2001 From: Hong Shen Date: Thu, 12 Feb 2015 19:46:22 +0800 Subject: [PATCH 16/20] A minor change in sparkContext and heartbeatReceiver --- .../scala/org/apache/spark/HeartbeatReceiver.scala | 2 +- .../src/main/scala/org/apache/spark/SparkContext.scala | 10 ++-------- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 87ca20a225ff5..ccc1ddbc22a23 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -83,7 +83,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule logWarning(s"Removing executor $executorId with no recent heartbeats: " + s"${now - lastSeenMs} ms exceeds timeout $executorTimeout ms") scheduler.executorLost(executorId, SlaveLost()) - if(sc.supportKillExecutor()) { + if (sc.supportKillExecutor()) { sc.killExecutor(executorId) } executorLastSeen.remove(executorId) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 7ce0487ec4e34..2db07caafb640 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1034,14 +1034,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key)) postEnvironmentUpdate() } - - def supportKillExecutor(): Boolean = { - if(master.contains("yarn") || dynamicAllocationTesting) { - true - } else { - false - } - } + + private[spark] def supportKillExecutor = master.contains("yarn") || dynamicAllocationTesting /** * :: DeveloperApi :: From d22149325f394f46b4d199dbd41a28c569ad5a0e Mon Sep 17 00:00:00 2001 From: Hong Shen Date: Fri, 13 Feb 2015 11:53:27 +0800 Subject: [PATCH 17/20] Fix test 
failed --- .../src/main/scala/org/apache/spark/HeartbeatReceiver.scala | 2 +- core/src/main/scala/org/apache/spark/SparkContext.scala | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index ccc1ddbc22a23..06123daa9626f 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -83,7 +83,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule logWarning(s"Removing executor $executorId with no recent heartbeats: " + s"${now - lastSeenMs} ms exceeds timeout $executorTimeout ms") scheduler.executorLost(executorId, SlaveLost()) - if (sc.supportKillExecutor()) { + if (sc.supportKillExecutor) { sc.killExecutor(executorId) } executorLastSeen.remove(executorId) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 2db07caafb640..ae16eff31e642 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -379,7 +379,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli private val dynamicAllocationTesting = conf.getBoolean("spark.dynamicAllocation.testing", false) private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] = if (dynamicAllocationEnabled) { - assert(supportKillExecutor(), + assert(supportKillExecutor, "Dynamic allocation of executors is currently only supported in YARN mode") Some(new ExecutorAllocationManager(this, listenerBus, conf)) } else { @@ -1053,7 +1053,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ @DeveloperApi override def requestExecutors(numAdditionalExecutors: Int): Boolean = { - assert(supportKillExecutor(), "Requesting executors is currently only supported in YARN mode") + assert(supportKillExecutor, "Requesting executors is currently only supported in YARN mode") schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.requestExecutors(numAdditionalExecutors) @@ -1070,7 +1070,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ @DeveloperApi override def killExecutors(executorIds: Seq[String]): Boolean = { - assert(supportKillExecutor(), "Killing executors is currently only supported in YARN mode") + assert(supportKillExecutor, "Killing executors is currently only supported in YARN mode") schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.killExecutors(executorIds) From 2dc456e207f265a7c160a02ebf1fb3b9c5210eb2 Mon Sep 17 00:00:00 2001 From: Hong Shen Date: Thu, 26 Feb 2015 17:18:36 +0800 Subject: [PATCH 18/20] Change some code style. --- .../org/apache/spark/HeartbeatReceiver.scala | 24 ++++++++++--------- .../scala/org/apache/spark/SparkContext.scala | 7 +++++- .../spark/scheduler/TaskScheduler.scala | 2 +- 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 06123daa9626f..884b64171c8c0 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -30,7 +30,7 @@ import org.apache.spark.util.ActorLogReceive /** * A heartbeat from executors to the driver. 
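Patches 16 and 17 together convert supportKillExecutor from a zero-argument method into a parameterless one; in Scala a member declared without a parameter list must also be referenced without parentheses, which is why every call site had to drop the (). A minimal sketch of the distinction (illustrative names, not Spark's):

    class Ctx(master: String, testing: Boolean) {
      // No parameter list: callers write ctx.supportKill, never ctx.supportKill().
      def supportKill: Boolean = master.contains("yarn") || testing
    }

    val ctx = new Ctx("yarn-cluster", testing = false)
    assert(ctx.supportKill) // ctx.supportKill() would fail to compile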
This is a shared message used by several internal * components to convey liveness or execution information for in-progress tasks. It will also - expire the hosts that have not heartbeated for more than spark.driver.executorTimeoutMs. + expire the hosts that have not heartbeated for more than spark.network.timeoutMs. */ private[spark] case class Heartbeat( executorId: String, @@ -47,15 +47,16 @@ private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean) private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler) extends Actor with ActorLogReceive with Logging { - val executorLastSeen = new mutable.HashMap[String, Long] + // executor ID -> timestamp of when the last heartbeat from this executor was received + private val executorLastSeen = new mutable.HashMap[String, Long] - val executorTimeout = sc.conf.getLong("spark.driver.executorTimeoutMs", + private val executorTimeout = sc.conf.getLong("spark.network.timeoutMs", sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120 * 1000)) - val checkTimeoutInterval = sc.conf.getLong("spark.driver.executorTimeoutIntervalMs", + private val checkTimeoutInterval = sc.conf.getLong("spark.network.timeoutIntervalMs", sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000)) - var timeoutCheckingTask: Cancellable = null + private var timeoutCheckingTask: Cancellable = null override def preStart(): Unit = { import context.dispatcher @@ -66,8 +67,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule override def receiveWithLogging = { case Heartbeat(executorId, taskMetrics, blockManagerId) => - val response = HeartbeatResponse( - !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId)) + val unknownExecutor = !scheduler.executorHeartbeatReceived( + executorId, taskMetrics, blockManagerId) + val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor) executorLastSeen(executorId) = System.currentTimeMillis() sender ! response case ExpireDeadHosts => @@ -77,13 +79,13 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule private def expireDeadHosts(): Unit = { logTrace("Checking for hosts with no recent heartbeats in HeartbeatReceiver.") val now = System.currentTimeMillis() - val minSeenTime = now - executorTimeout for ((executorId, lastSeenMs) <- executorLastSeen) { - if (lastSeenMs < minSeenTime) { + if (now - lastSeenMs > executorTimeout) { logWarning(s"Removing executor $executorId with no recent heartbeats: " + s"${now - lastSeenMs} ms exceeds timeout $executorTimeout ms") - scheduler.executorLost(executorId, SlaveLost()) - if (sc.supportKillExecutor) { + scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " + + s"timed out after ${now - lastSeenMs} ms")) + if (sc.supportDynamicAllocation) { sc.killExecutor(executorId) } executorLastSeen.remove(executorId) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ae16eff31e642..aaf9283d69661 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1035,7 +1035,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli postEnvironmentUpdate() } - private[spark] def supportKillExecutor = master.contains("yarn") || dynamicAllocationTesting + /** + * Return whether dynamically adjusting the amount of resources allocated to + * this application is supported.
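One detail worth pausing on in the SlaveLost reason above: only string literals carrying the s interpolator substitute ${...} expressions, so when a message is split across concatenated literals each fragment needs its own prefix. A quick demonstration of the pitfall:

    val elapsed = 4200L
    val wrong = "timed out after ${elapsed} ms"  // placeholder stays literal text
    val right = s"timed out after ${elapsed} ms" // evaluates to "timed out after 4200 ms"
    assert(wrong != right)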
This is currently only available for YARN. + */ + private[spark] def supportDynamicAllocation = + master.contains("yarn") || dynamicAllocationTesting /** * :: DeveloperApi :: diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index aafd3a5f0ff3c..ed3418676e077 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -75,7 +75,7 @@ private[spark] trait TaskScheduler { def applicationId(): String = appId /** - * Process a lost executor in taskScheduler + * Process a lost executor */ def executorLost(executorId: String, reason: ExecutorLossReason): Unit } From 1a042ff3f97c1c8dd37c27f0c871ff306f61175d Mon Sep 17 00:00:00 2001 From: Hong Shen Date: Thu, 26 Feb 2015 17:59:45 +0800 Subject: [PATCH 19/20] Change some code style. --- core/src/main/scala/org/apache/spark/SparkContext.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index aaf9283d69661..27115cc901957 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -379,7 +379,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli private val dynamicAllocationTesting = conf.getBoolean("spark.dynamicAllocation.testing", false) private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] = if (dynamicAllocationEnabled) { - assert(supportKillExecutor, + assert(supportDynamicAllocation, "Dynamic allocation of executors is currently only supported in YARN mode") Some(new ExecutorAllocationManager(this, listenerBus, conf)) } else { @@ -1058,7 +1058,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ @DeveloperApi override def requestExecutors(numAdditionalExecutors: Int): Boolean = { - assert(supportKillExecutor, "Requesting executors is currently only supported in YARN mode") + assert(supportDynamicAllocation, + "Requesting executors is currently only supported in YARN mode") schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.requestExecutors(numAdditionalExecutors) @@ -1075,7 +1076,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ @DeveloperApi override def killExecutors(executorIds: Seq[String]): Boolean = { - assert(supportKillExecutor, "Killing executors is currently only supported in YARN mode") + assert(supportDynamicAllocation, + "Killing executors is currently only supported in YARN mode") schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.killExecutors(executorIds) From 2c9a46ab407bd16a5ae49a26d32f28a48bfe513b Mon Sep 17 00:00:00 2001 From: Hong Shen Date: Fri, 27 Feb 2015 09:05:25 +0800 Subject: [PATCH 20/20] Change some code style. 
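Patch 20 moves the public keys to second-denominated values (spark.network.timeout, spark.network.timeoutInterval) while the legacy BlockManager keys stay in milliseconds, so the legacy fallback has to be scaled back to seconds before the shared * 1000. A sketch of that arithmetic with a toy lookup (not SparkConf itself):

    def getLong(settings: Map[String, String], key: String, default: Long): Long =
      settings.get(key).map(_.toLong).getOrElse(default)

    // A deployment that tuned the legacy millisecond key to five minutes:
    val settings = Map("spark.storage.blockManagerSlaveTimeoutMs" -> "300000")

    // Legacy ms value / 1000 gives the seconds default; * 1000 restores ms at the end.
    val executorTimeoutMs = getLong(settings, "spark.network.timeout",
      getLong(settings, "spark.storage.blockManagerSlaveTimeoutMs", 120 * 1000) / 1000) * 1000
    assert(executorTimeoutMs == 300000L)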
--- .../org/apache/spark/HeartbeatReceiver.scala | 16 ++++++++-------- .../scala/org/apache/spark/SparkContext.scala | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 884b64171c8c0..69178da1a7773 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -30,7 +30,7 @@ import org.apache.spark.util.ActorLogReceive /** * A heartbeat from executors to the driver. This is a shared message used by several internal * components to convey liveness or execution information for in-progress tasks. It will also - expire the hosts that have not heartbeated for more than spark.network.timeoutMs. + expire the hosts that have not heartbeated for more than spark.network.timeout. */ private[spark] case class Heartbeat( executorId: String, @@ -50,18 +50,18 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule // executor ID -> timestamp of when the last heartbeat from this executor was received private val executorLastSeen = new mutable.HashMap[String, Long] - private val executorTimeout = sc.conf.getLong("spark.network.timeoutMs", - sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120 * 1000)) + private val executorTimeoutMs = sc.conf.getLong("spark.network.timeout", + sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120 * 1000) / 1000) * 1000 - private val checkTimeoutInterval = sc.conf.getLong("spark.network.timeoutIntervalMs", - sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000)) + private val checkTimeoutIntervalMs = sc.conf.getLong("spark.network.timeoutInterval", + sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000) / 1000) * 1000 private var timeoutCheckingTask: Cancellable = null override def preStart(): Unit = { import context.dispatcher timeoutCheckingTask = context.system.scheduler.schedule(0.seconds, - checkTimeoutInterval.milliseconds, self, ExpireDeadHosts) + checkTimeoutIntervalMs.milliseconds, self, ExpireDeadHosts) super.preStart() } @@ -80,9 +80,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule logTrace("Checking for hosts with no recent heartbeats in HeartbeatReceiver.") val now = System.currentTimeMillis() for ((executorId, lastSeenMs) <- executorLastSeen) { - if (now - lastSeenMs > executorTimeout) { + if (now - lastSeenMs > executorTimeoutMs) { logWarning(s"Removing executor $executorId with no recent heartbeats: " + - s"${now - lastSeenMs} ms exceeds timeout $executorTimeout ms") + s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms") scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " + s"timed out after ${now - lastSeenMs} ms")) if (sc.supportDynamicAllocation) { sc.killExecutor(executorId) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 27115cc901957..775878658923c 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1058,7 +1058,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ @DeveloperApi override def requestExecutors(numAdditionalExecutors: Int): Boolean = { - assert(supportDynamicAllocation, + assert(supportDynamicAllocation, "Requesting executors is currently only supported in YARN mode") schedulerBackend match { case b:
CoarseGrainedSchedulerBackend =>
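Across its twenty revisions the series converges on one driver-side rule: an executor is considered dead once now - lastSeen exceeds the configured timeout, after which the scheduler is notified and the executor is killed. As a recap, the expiry decision reduces to a pure, easily tested function (a sketch with illustrative names, not code from the PR):

    // Pure form of the selection logic inside HeartbeatReceiver.expireDeadHosts.
    def expiredExecutors(lastSeen: Map[String, Long], now: Long, timeoutMs: Long): Seq[String] =
      lastSeen.collect { case (id, seen) if now - seen > timeoutMs => id }.toSeq

    val seen = Map("exec-1" -> 1000L, "exec-2" -> 500000L)
    // With a 120 s timeout at t = 600 s, only exec-1 has been silent long enough.
    assert(expiredExecutors(seen, now = 600000L, timeoutMs = 120000L) == Seq("exec-1"))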