Skip to content

Commit

Permalink
Add expireDeadHosts in HeartbeatReceiver
Browse files Browse the repository at this point in the history
  • Loading branch information
shenh062326 committed Feb 4, 2015
1 parent 5aa0f21 commit c922cb0
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 2 deletions.
41 changes: 40 additions & 1 deletion core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.scheduler.TaskScheduler
import org.apache.spark.util.ActorLogReceive
import org.apache.spark.scheduler.ExecutorLossReason

/**
* A heartbeat from executors to the driver. This is a shared message used by several internal
Expand All @@ -32,18 +33,56 @@ private[spark] case class Heartbeat(
taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
blockManagerId: BlockManagerId)

private[spark] case object ExpireDeadHosts

private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)

/**
* Lives in the driver to receive heartbeats from executors.
*/
private[spark] class HeartbeatReceiver(scheduler: TaskScheduler)
/**
 * Actor that lives in the driver to receive heartbeats from executors.
 *
 * On each [[Heartbeat]] it forwards the task metrics to the [[TaskScheduler]], records the
 * time the executor was last seen, and replies with a [[HeartbeatResponse]]. A periodic
 * [[ExpireDeadHosts]] self-message removes (and kills) executors whose last heartbeat is
 * older than `slaveTimeout`.
 */
private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
  extends Actor with ActorLogReceive with Logging {

  // executorId -> timestamp (ms, System.currentTimeMillis) of the last heartbeat received.
  val executorLastSeen = new mutable.HashMap[String, Long]

  import context.dispatcher

  // Periodic self-message driving the dead-host scan.
  // NOTE(review): a 10 ms period is very aggressive for a timeout measured in minutes;
  // consider making the check interval configurable and on the order of seconds.
  var timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
    10.milliseconds, self, ExpireDeadHosts)

  // An executor is declared dead if no heartbeat arrives within this many milliseconds:
  // either the explicit slave-timeout config, or 3x the heartbeat interval (>= 120 s).
  val slaveTimeout = sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs",
    math.max(sc.conf.getInt("spark.executor.heartbeatInterval", 10000) * 3, 120000))

  override def receiveWithLogging = {
    case Heartbeat(executorId, taskMetrics, blockManagerId) =>
      // Ask the scheduler whether it still knows this executor; if not, the executor
      // must re-register its BlockManager (hence the negation).
      val response = HeartbeatResponse(
        !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
      heartbeatReceived(executorId)
      sender ! response
    case ExpireDeadHosts =>
      expireDeadHosts()
  }

  /** Record the time at which a heartbeat from `executorId` was received. */
  private def heartbeatReceived(executorId: String): Unit = {
    executorLastSeen(executorId) = System.currentTimeMillis()
  }

  /** Remove and kill every executor whose last heartbeat is older than `slaveTimeout` ms. */
  private def expireDeadHosts(): Unit = {
    logTrace("Checking for hosts with no recent heart beats in HeartbeatReceiver.")
    val now = System.currentTimeMillis()
    val minSeenTime = now - slaveTimeout
    for ((executorId, lastSeenMs) <- executorLastSeen) {
      if (lastSeenMs < minSeenTime) {
        // BUG FIX: the original built this message across two lines with `+` at the start
        // of the continuation line; Scala parses that as a separate unary-plus expression
        // statement, so the "<elapsed>ms exceeds <timeout>ms" part was silently dropped
        // from the logged warning. Interpolation keeps it a single expression.
        logWarning(s"Removing Executor $executorId with no recent heart beats: " +
          s"${now - lastSeenMs}ms exceeds ${slaveTimeout}ms")
        // Pattern match instead of isInstanceOf/asInstanceOf.
        scheduler match {
          case impl: org.apache.spark.scheduler.TaskSchedulerImpl =>
            impl.executorLost(executorId, new ExecutorLossReason(""))
          case _ => // other TaskScheduler implementations: nothing extra to notify
        }
        sc.killExecutor(executorId)
        executorLastSeen.remove(executorId)
      }
    }
  }
}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private[spark] var (schedulerBackend, taskScheduler) =
SparkContext.createTaskScheduler(this, master)
private val heartbeatReceiver = env.actorSystem.actorOf(
Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
Props(new HeartbeatReceiver(this, taskScheduler)), "HeartbeatReceiver")
@volatile private[spark] var dagScheduler: DAGScheduler = _
try {
dagScheduler = new DAGScheduler(this)
Expand Down

0 comments on commit c922cb0

Please sign in to comment.