[SPARK-5529][CORE] Add expireDeadHosts in HeartbeatReceiver #4363

Closed · wants to merge 20 commits
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala (65 changes: 59 additions & 6 deletions)

@@ -17,33 +17,86 @@

package org.apache.spark

-import akka.actor.Actor
+import scala.concurrent.duration._
+import scala.collection.mutable
+
+import akka.actor.{Actor, Cancellable}

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.scheduler.TaskScheduler
+import org.apache.spark.scheduler.{SlaveLost, TaskScheduler}
import org.apache.spark.util.ActorLogReceive

/**
 * A heartbeat from executors to the driver. This is a shared message used by several internal
- * components to convey liveness or execution information for in-progress tasks.
+ * components to convey liveness or execution information for in-progress tasks. It will also
+ * expire hosts that have not sent a heartbeat in more than spark.network.timeout.
 */
private[spark] case class Heartbeat(
    executorId: String,
    taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
    blockManagerId: BlockManagerId)

+private[spark] case object ExpireDeadHosts
+
private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)

/**
 * Lives in the driver to receive heartbeats from executors.
Contributor: We should add a header comment that communicates that one of the functions of the HeartbeatReceiver is to expire executors.

Contributor (author): Hi Sryza, thanks for your review.

 */
-private[spark] class HeartbeatReceiver(scheduler: TaskScheduler)
+private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
Contributor: I would recommend limiting the number of things we pass to this receiver to the following smaller set instead of the whole SparkContext:

private[spark] class HeartbeatReceiver(
    conf: SparkConf,
    scheduler: TaskScheduler,
    executorAllocationClient: Option[ExecutorAllocationClient]) {
  ...
}

The SparkContext is an instance of ExecutorAllocationClient, so if dynamic allocation is enabled you just pass in Some(this) in SparkContext, otherwise None to indicate that killing is not supported.

Contributor (author): It will not be easy to understand; on the other hand, the SparkContext is used in a lot of places.

Contributor: Yes, but it's a bad design pattern to pass in the whole SparkContext everywhere, since it has such a wide interface that we don't actually need. I would argue that it's actually easier to understand, because from the signature alone we know exactly what is needed by HeartbeatReceiver.

Anyway, not a big deal since this is private. I will merge it as is.
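
For illustration, a minimal sketch of how the construction site in SparkContext might look under the reviewer's proposal (hypothetical, not the code this PR merges; it assumes env, conf, taskScheduler, and dynamicAllocationEnabled are in scope, as they are in SparkContext at this point):

// Hypothetical sketch of the reviewer's alternative constructor wiring.
// SparkContext implements ExecutorAllocationClient, so it can pass itself
// only when dynamic allocation is enabled; None means killing is unsupported.
import akka.actor.Props

private val heartbeatReceiver = env.actorSystem.actorOf(
  Props(new HeartbeatReceiver(
    conf,
    taskScheduler,
    if (dynamicAllocationEnabled) Some(this) else None)),
  "HeartbeatReceiver")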

  extends Actor with ActorLogReceive with Logging {

+  // executor ID -> timestamp of when the last heartbeat from this executor was received
+  private val executorLastSeen = new mutable.HashMap[String, Long]
+
+  private val executorTimeoutMs = sc.conf.getLong("spark.network.timeout",
+    sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120)) * 1000
+
+  private val checkTimeoutIntervalMs = sc.conf.getLong("spark.network.timeoutInterval",
+    sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60)) * 1000
+
+  private var timeoutCheckingTask: Cancellable = null
+
+  override def preStart(): Unit = {
+    import context.dispatcher
+    timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
+      checkTimeoutIntervalMs.milliseconds, self, ExpireDeadHosts)
+    super.preStart()
+  }
+
  override def receiveWithLogging = {
    case Heartbeat(executorId, taskMetrics, blockManagerId) =>
-      val response = HeartbeatResponse(
-        !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
+      val unknownExecutor = !scheduler.executorHeartbeatReceived(
+        executorId, taskMetrics, blockManagerId)
+      val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
+      executorLastSeen(executorId) = System.currentTimeMillis()
      sender ! response
+    case ExpireDeadHosts =>
+      expireDeadHosts()
  }

+  private def expireDeadHosts(): Unit = {
+    logTrace("Checking for hosts with no recent heartbeats in HeartbeatReceiver.")
+    val now = System.currentTimeMillis()
+    for ((executorId, lastSeenMs) <- executorLastSeen) {
+      if (now - lastSeenMs > executorTimeoutMs) {
+        logWarning(s"Removing executor $executorId with no recent heartbeats: " +
+          s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
+        scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
+          s"timed out after ${now - lastSeenMs} ms"))
+        if (sc.supportDynamicAllocation) {
+          sc.killExecutor(executorId)
+        }
+        executorLastSeen.remove(executorId)
Contributor: What happens if a heartbeat from the executor gets delivered after we kill / remove it?

Contributor (author): Because the Akka connection is still alive, we can kill the executor by sending a kill message to the ApplicationMaster.

Contributor: I believe this code will correctly expire that executor as a dead host after a timeout.

+      }
+    }
+  }
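
To illustrate the exchange above: a heartbeat that arrives after expiry simply re-inserts the executor into executorLastSeen, and the next ExpireDeadHosts pass removes it again. A self-contained sketch of that bookkeeping (plain Scala with hypothetical names, not Spark code):

import scala.collection.mutable

object LateHeartbeatSketch extends App {
  val timeoutMs = 1000L
  val lastSeen = new mutable.HashMap[String, Long]

  // Mirrors expireDeadHosts(): drop and report entries older than the timeout.
  def expire(now: Long): List[String] = {
    val dead = lastSeen.collect { case (id, ts) if now - ts > timeoutMs => id }.toList
    dead.foreach(lastSeen.remove)
    dead
  }

  lastSeen("exec-1") = 0L
  println(expire(now = 2000L)) // List(exec-1): expired and removed
  lastSeen("exec-1") = 2500L   // a late heartbeat re-registers the executor
  println(expire(now = 2600L)) // List(): treated as alive again
  println(expire(now = 4000L)) // List(exec-1): expires a second time
}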

+  override def postStop(): Unit = {
+    if (timeoutCheckingTask != null) {
+      timeoutCheckingTask.cancel()
+    }
+    super.postStop()
+  }
}
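
A note on the two settings read above: in this diff both are parsed as plain longs in seconds and multiplied by 1000, despite the "Ms" suffix in the legacy fallback names. A hedged sketch of driver-side configuration exercising the expiry path (hypothetical app name and values):

import org.apache.spark.{SparkConf, SparkContext}

// Executors missing heartbeats for 60 seconds are marked lost; the check
// itself runs every 30 seconds. Values are in seconds, per the code above.
val conf = new SparkConf()
  .setAppName("heartbeat-expiry-demo")
  .setMaster("local[2]")
  .set("spark.network.timeout", "60")
  .set("spark.network.timeoutInterval", "30")
val sc = new SparkContext(conf)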
core/src/main/scala/org/apache/spark/SparkContext.scala (15 changes: 11 additions & 4 deletions)

@@ -332,7 +332,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
  private[spark] var (schedulerBackend, taskScheduler) =
    SparkContext.createTaskScheduler(this, master)
  private val heartbeatReceiver = env.actorSystem.actorOf(
-    Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
+    Props(new HeartbeatReceiver(this, taskScheduler)), "HeartbeatReceiver")
  @volatile private[spark] var dagScheduler: DAGScheduler = _
  try {
    dagScheduler = new DAGScheduler(this)
@@ -379,7 +379,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
  private val dynamicAllocationTesting = conf.getBoolean("spark.dynamicAllocation.testing", false)
  private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
    if (dynamicAllocationEnabled) {
-      assert(master.contains("yarn") || dynamicAllocationTesting,
+      assert(supportDynamicAllocation,
        "Dynamic allocation of executors is currently only supported in YARN mode")
      Some(new ExecutorAllocationManager(this, listenerBus, conf))
    } else {
@@ -1035,6 +1035,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    postEnvironmentUpdate()
  }

+  /**
+   * Return whether dynamically adjusting the amount of resources allocated to
+   * this application is supported. This is currently only available for YARN.
+   */
+  private[spark] def supportDynamicAllocation =
+    master.contains("yarn") || dynamicAllocationTesting

  /**
   * :: DeveloperApi ::
   * Register a listener to receive up-calls from events that happen during execution.

@@ -1051,7 +1058,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   */
  @DeveloperApi
  override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
-    assert(master.contains("yarn") || dynamicAllocationTesting,
+    assert(supportDynamicAllocation,
      "Requesting executors is currently only supported in YARN mode")
    schedulerBackend match {
      case b: CoarseGrainedSchedulerBackend =>
@@ -1069,7 +1076,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   */
  @DeveloperApi
  override def killExecutors(executorIds: Seq[String]): Boolean = {
-    assert(master.contains("yarn") || dynamicAllocationTesting,
+    assert(supportDynamicAllocation,
      "Killing executors is currently only supported in YARN mode")
    schedulerBackend match {
      case b: CoarseGrainedSchedulerBackend =>
core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala

@@ -73,5 +73,9 @@
   * @return An application ID
   */
  def applicationId(): String = appId

+  /**
+   * Process a lost executor
+   */
Contributor: I think just // Process a lost executor is fine here...

+  def executorLost(executorId: String, reason: ExecutorLossReason): Unit
}
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

@@ -429,7 +429,7 @@ private[spark] class TaskSchedulerImpl(
    }
  }

-  def executorLost(executorId: String, reason: ExecutorLossReason) {
+  override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
    var failedExecutor: Option[String] = None

    synchronized {
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala

@@ -24,7 +24,7 @@
import scala.collection.JavaConversions._
import scala.concurrent.Future
import scala.concurrent.duration._

-import akka.actor.{Actor, ActorRef, Cancellable}
+import akka.actor.{Actor, ActorRef}
import akka.pattern.ask

import org.apache.spark.{Logging, SparkConf, SparkException}
@@ -52,19 +52,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus

  private val akkaTimeout = AkkaUtils.askTimeout(conf)

-  val slaveTimeout = conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120 * 1000)
-
-  val checkTimeoutInterval = conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000)
-
-  var timeoutCheckingTask: Cancellable = null
-
-  override def preStart() {
-    import context.dispatcher
-    timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
-      checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
-    super.preStart()
-  }

  override def receiveWithLogging = {
    case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
      register(blockManagerId, maxMemSize, slaveActor)

@@ -118,14 +105,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus

    case StopBlockManagerMaster =>
      sender ! true
-      if (timeoutCheckingTask != null) {
-        timeoutCheckingTask.cancel()
-      }
      context.stop(self)

-    case ExpireDeadHosts =>
-      expireDeadHosts()
-
    case BlockManagerHeartbeat(blockManagerId) =>
      sender ! heartbeatReceived(blockManagerId)

@@ -207,21 +188,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
logInfo(s"Removing block manager $blockManagerId")
}

private def expireDeadHosts() {
logTrace("Checking for hosts with no recent heart beats in BlockManagerMaster.")
val now = System.currentTimeMillis()
val minSeenTime = now - slaveTimeout
val toRemove = new mutable.HashSet[BlockManagerId]
for (info <- blockManagerInfo.values) {
if (info.lastSeenMs < minSeenTime && !info.blockManagerId.isDriver) {
logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: "
+ (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
toRemove += info.blockManagerId
}
}
toRemove.foreach(removeBlockManager)
}

private def removeExecutor(execId: String) {
logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala

@@ -109,6 +109,4 @@
    extends ToBlockManagerMaster

  case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
-
-  case object ExpireDeadHosts extends ToBlockManagerMaster
}
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

@@ -96,6 +96,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
    }
    override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
    override def defaultParallelism() = 2
+    override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
  }

  /** Length of time to wait while draining listener events. */

@@ -386,6 +387,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
      override def defaultParallelism() = 2
      override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
        blockManagerId: BlockManagerId): Boolean = true
+      override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
    }
    val noKillScheduler = new DAGScheduler(
      sc,