Skip to content

Commit

Permalink
Change config names and code style.
Browse files Browse the repository at this point in the history
  • Loading branch information
shenh062326 committed Feb 6, 2015
1 parent ce9257e commit 07952f3
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 14 deletions.
21 changes: 11 additions & 10 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.util.ActorLogReceive
/**
* A heartbeat from executors to the driver. This is a shared message used by several internal
* components to convey liveness or execution information for in-progress tasks. It will also
* expiry the hosts that have no heartbeat for more than spark.executor.heartbeat.timeoutMs.
* expire the hosts that have not heartbeated for more than spark.driver.executorTimeoutMs.
*/
private[spark] case class Heartbeat(
executorId: String,
Expand All @@ -49,16 +49,18 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule

// Arrival time of the last heartbeat (ms since epoch, from System.currentTimeMillis)
// keyed by executor id; written on each Heartbeat and scanned by expireDeadHosts.
val executorLastSeen = new mutable.HashMap[String, Long]

val slaveTimeout = sc.conf.getLong("spark.executor.heartbeat.timeoutMs", 120 * 1000)
val executorTimeout = sc.conf.getLong("spark.driver.executorTimeoutMs",
sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120 * 1000))

val checkTimeoutInterval = sc.conf.getLong("spark.executor.heartbeat.timeoutIntervalMs", 60000)
val checkTimeoutInterval = sc.conf.getLong("spark.driver.executorTimeoutIntervalMs",
sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000))

// Handle for the periodic ExpireDeadHosts message scheduled in preStart; cancelled in postStop.
var timeoutCheckingTask: Cancellable = null

override def preStart() {
override def preStart(): Unit = {
import context.dispatcher
timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
super.preStart
}

Expand All @@ -70,29 +72,28 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule
sender ! response
case ExpireDeadHosts =>
expireDeadHosts()

}

/** Record the current wall-clock time as the latest heartbeat seen from this executor. */
private def heartbeatReceived(executorId: String) = {
  val now = System.currentTimeMillis()
  executorLastSeen.update(executorId, now)
}

private def expireDeadHosts() {
private def expireDeadHosts(): Unit = {
logTrace("Checking for hosts with no recent heart beats in HeartbeatReceiver.")
val now = System.currentTimeMillis()
val minSeenTime = now - slaveTimeout
val minSeenTime = now - executorTimeout
for ((executorId, lastSeenMs) <- executorLastSeen) {
if (lastSeenMs < minSeenTime) {
logWarning("Removing Executor " + executorId + " with no recent heartbeats: "
+ (now - lastSeenMs) + " ms exceeds " + slaveTimeout + "ms")
+ (now - lastSeenMs) + " ms exceeds " + executorTimeout + "ms")
scheduler.executorLost(executorId, SlaveLost())
sc.killExecutor(executorId)
executorLastSeen.remove(executorId)
}
}
}

override def postStop() {
override def postStop(): Unit = {
if (timeoutCheckingTask != null) {
timeoutCheckingTask.cancel()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,5 @@ private[spark] trait TaskScheduler {
/**
* Process a lost executor in taskScheduler
*/
def executorLost(executorId: String, reason: ExecutorLossReason)
def executorLost(executorId: String, reason: ExecutorLossReason): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ private[spark] class TaskSchedulerImpl(
}
}

override def executorLost(executorId: String, reason: ExecutorLossReason) {
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
var failedExecutor: Option[String] = None

synchronized {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
}
// Test stub: this scheduler deliberately ignores the DAGScheduler reference.
override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
// Fixed parallelism so the suite's task-splitting behavior is deterministic.
override def defaultParallelism() = 2
override def executorLost(executorId: String, reason: ExecutorLossReason) = {}
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
}

/** Length of time to wait while draining listener events. */
Expand Down Expand Up @@ -387,7 +387,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
// Fixed parallelism so the suite's task-splitting behavior is deterministic.
override def defaultParallelism() = 2
// Test stub: unconditionally accept every executor heartbeat.
override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
blockManagerId: BlockManagerId): Boolean = true
override def executorLost(executorId: String, reason: ExecutorLossReason) = {}
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
}
val noKillScheduler = new DAGScheduler(
sc,
Expand Down

0 comments on commit 07952f3

Please sign in to comment.