From 07952f31d70fbfdd3b76f5d3796d39bc718668a9 Mon Sep 17 00:00:00 2001 From: Hong Shen Date: Fri, 6 Feb 2015 09:21:54 +0800 Subject: [PATCH] Change configs name and code style. --- .../org/apache/spark/HeartbeatReceiver.scala | 21 ++++++++++--------- .../spark/scheduler/TaskScheduler.scala | 2 +- .../spark/scheduler/TaskSchedulerImpl.scala | 2 +- .../spark/scheduler/DAGSchedulerSuite.scala | 4 ++-- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 4694b12e7060c..af883881a0a8e 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -30,7 +30,7 @@ import org.apache.spark.util.ActorLogReceive /** * A heartbeat from executors to the driver. This is a shared message used by several internal * components to convey liveness or execution information for in-progress tasks. It will also - * expiry the hosts that have no heartbeat for more than spark.executor.heartbeat.timeoutMs. + * expire the hosts that have not heartbeated for more than spark.driver.executorTimeoutMs. */ private[spark] case class Heartbeat( executorId: String, @@ -49,16 +49,18 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule val executorLastSeen = new mutable.HashMap[String, Long] - val slaveTimeout = sc.conf.getLong("spark.executor.heartbeat.timeoutMs", 120 * 1000) + val executorTimeout = sc.conf.getLong("spark.driver.executorTimeoutMs", + sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120 * 1000)) - val checkTimeoutInterval = sc.conf.getLong("spark.executor.heartbeat.timeoutIntervalMs", 60000) + val checkTimeoutInterval = sc.conf.getLong("spark.driver.executorTimeoutIntervalMs", + sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000)) var timeoutCheckingTask: Cancellable = null - override def preStart() { + override def preStart(): Unit = { import context.dispatcher timeoutCheckingTask = context.system.scheduler.schedule(0.seconds, - checkTimeoutInterval.milliseconds, self, ExpireDeadHosts) + checkTimeoutInterval.milliseconds, self, ExpireDeadHosts) super.preStart } @@ -70,21 +72,20 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule sender ! response case ExpireDeadHosts => expireDeadHosts() - } private def heartbeatReceived(executorId: String) = { executorLastSeen(executorId) = System.currentTimeMillis() } - private def expireDeadHosts() { + private def expireDeadHosts(): Unit = { logTrace("Checking for hosts with no recent heart beats in HeartbeatReceiver.") val now = System.currentTimeMillis() - val minSeenTime = now - slaveTimeout + val minSeenTime = now - executorTimeout for ((executorId, lastSeenMs) <- executorLastSeen) { if (lastSeenMs < minSeenTime) { logWarning("Removing Executor " + executorId + " with no recent heartbeats: " - + (now - lastSeenMs) + " ms exceeds " + slaveTimeout + "ms") + + (now - lastSeenMs) + " ms exceeds " + executorTimeout + "ms") scheduler.executorLost(executorId, SlaveLost()) sc.killExecutor(executorId) executorLastSeen.remove(executorId) @@ -92,7 +93,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule } } - override def postStop() { + override def postStop(): Unit = { if (timeoutCheckingTask != null) { timeoutCheckingTask.cancel() } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 73929a55cd24e..aafd3a5f0ff3c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -77,5 +77,5 @@ private[spark] trait TaskScheduler { /** * Process a lost executor in taskScheduler */ - def executorLost(executorId: String, reason: ExecutorLossReason) + def executorLost(executorId: String, reason: ExecutorLossReason): Unit } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index e6865cb78aa2c..9bbed21f0cc0a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -429,7 +429,7 @@ private[spark] class TaskSchedulerImpl( } } - override def executorLost(executorId: String, reason: ExecutorLossReason) { + override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = { var failedExecutor: Option[String] = None synchronized { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index ef84bdde18cdf..811a89d7f548c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -96,7 +96,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar } override def setDAGScheduler(dagScheduler: DAGScheduler) = {} override def defaultParallelism() = 2 - override def executorLost(executorId: String, reason: ExecutorLossReason) = {} + override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} } /** Length of time to wait while draining listener events. */ @@ -387,7 +387,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar override def defaultParallelism() = 2 override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)], blockManagerId: BlockManagerId): Boolean = true - override def executorLost(executorId: String, reason: ExecutorLossReason) = {} + override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} } val noKillScheduler = new DAGScheduler( sc,