[SPARK-5529][CORE]Add expireDeadHosts in HeartbeatReceiver #4363

Closed. Wants to merge 20 commits (changes shown from 17 commits).
59 changes: 55 additions & 4 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -17,33 +17,84 @@

package org.apache.spark

import akka.actor.Actor
import scala.concurrent.duration._
import scala.collection.mutable

import akka.actor.{Actor, Cancellable}

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.scheduler.TaskScheduler
import org.apache.spark.scheduler.{SlaveLost, TaskScheduler}
import org.apache.spark.util.ActorLogReceive

/**
* A heartbeat from executors to the driver. This is a shared message used by several internal
* components to convey liveness or execution information for in-progress tasks.
* components to convey liveness or execution information for in-progress tasks. It will also
* expire the hosts that have not heartbeated for more than spark.driver.executorTimeoutMs.
Contributor: you'll need to update this comment if you rename the configs

*/
private[spark] case class Heartbeat(
executorId: String,
taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
blockManagerId: BlockManagerId)

private[spark] case object ExpireDeadHosts

private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)

/**
* Lives in the driver to receive heartbeats from executors..
Contributor: We should add a header comment that communicates that one of the functions of the HeartbeatReceiver is to expire executors.
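One possible wording for such a header, offered purely as an illustrative suggestion (not text from the patch):

/**
 * Lives in the driver to receive heartbeats from executors. It also tracks the
 * time of the last heartbeat received from each executor and expires executors
 * whose heartbeats have not been seen within the configured timeout.
 */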

Contributor Author: Hi sryza, thanks for your review.

*/
private[spark] class HeartbeatReceiver(scheduler: TaskScheduler)
private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
Contributor: I would recommend limiting the number of things we pass to this receiver to the following smaller set instead of the whole SparkContext:

private[spark] class HeartbeatReceiver(
    conf: SparkConf,
    scheduler: TaskScheduler,
    executorAllocationClient: Option[ExecutorAllocationClient]) {
  ...
}

The SparkContext is an instance of ExecutorAllocationClient, so if dynamic allocation is enabled you just pass in Some(this) in SparkContext, otherwise None to indicate that killing is not supported.

Contributor Author: It will not be easy to understand; on the other hand, the SparkContext is used in a lot of places.

Contributor: Yes, but it's a bad design pattern to pass in the whole SparkContext everywhere, since it has such a wide interface that we don't actually need. I would argue that it's actually easier to understand, because from the signature alone we know exactly what is needed by HeartbeatReceiver.

Anyway, not a big deal since this is private. I will merge it as is.
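A minimal sketch of how SparkContext might wire this up under that suggestion; the construction site and the dynamicAllocationEnabled check are assumptions for illustration, not code from the patch:

// Pass only what the receiver needs; supply the allocation client only when
// executor killing is actually supported (dynamic allocation on YARN).
private val heartbeatReceiver = env.actorSystem.actorOf(
  Props(new HeartbeatReceiver(
    conf,
    taskScheduler,
    if (dynamicAllocationEnabled) Some(this) else None)),
  "HeartbeatReceiver")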

extends Actor with ActorLogReceive with Logging {

val executorLastSeen = new mutable.HashMap[String, Long]
Contributor: Please add a comment on what the keys and values are:

// executor ID -> timestamp of when the last heartbeat from this executor was received


val executorTimeout = sc.conf.getLong("spark.driver.executorTimeoutMs",
Contributor: executorTimeoutMs and checkTimeoutIntervalMs

Contributor: actually, can this just use spark.network.timeout?

sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120 * 1000))

val checkTimeoutInterval = sc.conf.getLong("spark.driver.executorTimeoutIntervalMs",
Contributor: this isn't really a driver property. I would just use spark.network.timeoutInterval

Contributor: spark.network.timeout(Interval) implies that this is the maximum amount of time we're willing to wait on the network, when really this is the amount of time the driver is willing to wait on an executor, whether it's garbage collection, a node failure, or the network that's causing the problem. We also don't necessarily want this linked to other network timeouts - I don't think it would be weird for a user to desire different timeouts for driver-executor heartbeats and shuffle communication.

Contributor: @sryza if you read the documentation of spark.network.timeout, it says it is the "default timeout for all network interactions", and one of the things it replaces is spark.storage.blockManagerSlaveTimeoutMs, which is essentially the driver-executor timeout (this was not implemented in the code for some reason, however). Everywhere else spark.network.timeout is used there might already be garbage collection (e.g. between executors themselves when fetching shuffle files) and other issues that would delay the response, so I don't see how the driver-executor case is special.

Contributor: As for spark.network.timeoutInterval, I'm OK with renaming it to something more specific if you have any better suggestions.

Contributor: > Everywhere else spark.network.timeout is used there might already be garbage collection

You are right. However, in all the other places that spark.network.timeout is used, there's another more specific property that can override it, e.g. spark.shuffle.io.connectionTimeout, spark.core.connection.ack.wait.timeout, spark.akka.timeout. I think it would be useful to have the same thing here. (If that's what you were saying in the first place, sorry for the noise.)

Contributor: Well, we do already have the spark.storage.blockManager* configs that we carried over from before. Even though this isn't the block manager, the effect is the same from the user's perspective, so I think it's OK to continue using those as the more specific ones.

Contributor: The spark.storage.blockManager.* names might make things easier for older users, but for new users who haven't dealt with these properties, I think those names could be fairly confusing. The advantage of keeping the old configs but deprecating them in favor of executorTimeout is that we don't break things for old users, but going forward we have something that more accurately describes what the config is used for.

Contributor: Yes, I'd still like to avoid introducing new configs unless we have to, since we can't remove those easily. I prefer to just make this consider only the spark.network.* configs and the spark.storage.blockManager.* configs for now (with the former taking precedence), instead of introducing another layer of deprecation. We can always add one separately later if we want.

Contributor: Yes, now I agree with considering the spark.network.* and spark.storage.blockManager.* configs.
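A minimal sketch of the lookup order the reviewers converge on (spark.network.* first, then the legacy spark.storage.blockManager* configs, then hard-coded defaults). The val names and the assumption that spark.network.timeout / spark.network.timeoutInterval are given in seconds are illustrative, not taken from the final patch:

// spark.network.* takes precedence; fall back to the older block manager
// configs, which are specified in milliseconds.
private val executorTimeoutMs = sc.conf.getOption("spark.network.timeout")
  .map(_.toLong * 1000)  // assumed to be given in seconds
  .getOrElse(sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120 * 1000))

private val checkTimeoutIntervalMs = sc.conf.getOption("spark.network.timeoutInterval")
  .map(_.toLong * 1000)  // assumed to be given in seconds
  .getOrElse(sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60 * 1000))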

sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000))

var timeoutCheckingTask: Cancellable = null
Contributor: these could all be private


override def preStart(): Unit = {
import context.dispatcher
timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
super.preStart()
}

override def receiveWithLogging = {
case Heartbeat(executorId, taskMetrics, blockManagerId) =>
val response = HeartbeatResponse(
!scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
Contributor: not your code, but can you put this in a separate val:

val unknownExecutor = !scheduler.executorHeartbeatReceived(...)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
...

executorLastSeen(executorId) = System.currentTimeMillis()
sender ! response
case ExpireDeadHosts =>
expireDeadHosts()
}

private def expireDeadHosts(): Unit = {
logTrace("Checking for hosts with no recent heartbeats in HeartbeatReceiver.")
val now = System.currentTimeMillis()
val minSeenTime = now - executorTimeout
for ((executorId, lastSeenMs) <- executorLastSeen) {
if (lastSeenMs < minSeenTime) {
Contributor: I think it's easier to read if it's the following instead:

if (now - lastSeenMs > executorTimeout) {
  ... // remove it
}

logWarning(s"Removing executor $executorId with no recent heartbeats: " +
s"${now - lastSeenMs} ms exceeds timeout $executorTimeout ms")
scheduler.executorLost(executorId, SlaveLost())
Contributor: Please add an error message in SlaveLost("Executor heartbeat timed out after X ms")
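A sketch of what the requested call could look like (illustrative only, using the values already in scope in expireDeadHosts):

// Include the timeout details in the loss reason so they show up in the
// scheduler's logs and UI.
scheduler.executorLost(executorId,
  SlaveLost(s"Executor heartbeat timed out after ${now - lastSeenMs} ms"))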

if (sc.supportKillExecutor) {
sc.killExecutor(executorId)
}
executorLastSeen.remove(executorId)
Contributor: What happens if a heartbeat from the executor gets delivered after we kill / remove it?

Contributor Author: Because the Akka connection is still alive, we can kill the executor by sending a kill message to the ApplicationMaster.

Contributor: I believe this code will correctly expire that executor as a dead host after a timeout.

}
}
}

override def postStop(): Unit = {
if (timeoutCheckingTask != null) {
timeoutCheckingTask.cancel()
}
super.postStop()
}
}
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -332,7 +332,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private[spark] var (schedulerBackend, taskScheduler) =
SparkContext.createTaskScheduler(this, master)
private val heartbeatReceiver = env.actorSystem.actorOf(
Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
Props(new HeartbeatReceiver(this, taskScheduler)), "HeartbeatReceiver")
@volatile private[spark] var dagScheduler: DAGScheduler = _
try {
dagScheduler = new DAGScheduler(this)
@@ -379,7 +379,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private val dynamicAllocationTesting = conf.getBoolean("spark.dynamicAllocation.testing", false)
private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
if (dynamicAllocationEnabled) {
assert(master.contains("yarn") || dynamicAllocationTesting,
assert(supportKillExecutor,
"Dynamic allocation of executors is currently only supported in YARN mode")
Some(new ExecutorAllocationManager(this, listenerBus, conf))
} else {
@@ -1035,6 +1035,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
postEnvironmentUpdate()
}

private[spark] def supportKillExecutor = master.contains("yarn") || dynamicAllocationTesting
Contributor: this is also used in requestExecutors, so we need to generalize the name. Also, please add a javadoc:

/**
 * Return whether dynamically adjusting the amount of resources allocated to
 * this application is supported. This is currently only available for YARN.
 */
private[spark] def supportDynamicAllocation = master.contains("yarn") || dynamicAllocationTesting


/**
* :: DeveloperApi ::
* Register a listener to receive up-calls from events that happen during execution.
@@ -1051,8 +1053,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
@DeveloperApi
override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
assert(master.contains("yarn") || dynamicAllocationTesting,
"Requesting executors is currently only supported in YARN mode")
assert(supportKillExecutor, "Requesting executors is currently only supported in YARN mode")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.requestExecutors(numAdditionalExecutors)
@@ -1069,8 +1070,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
@DeveloperApi
override def killExecutors(executorIds: Seq[String]): Boolean = {
assert(master.contains("yarn") || dynamicAllocationTesting,
"Killing executors is currently only supported in YARN mode")
assert(supportKillExecutor, "Killing executors is currently only supported in YARN mode")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.killExecutors(executorIds)
core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -73,5 +73,9 @@ private[spark] trait TaskScheduler {
* @return An application ID
*/
def applicationId(): String = appId


/**
* Process a lost executor in taskScheduler
*/
Contributor: I think just // Process a lost executor is fine here...

def executorLost(executorId: String, reason: ExecutorLossReason): Unit
}
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -429,7 +429,7 @@ private[spark] class TaskSchedulerImpl(
}
}

def executorLost(executorId: String, reason: ExecutorLossReason) {
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
var failedExecutor: Option[String] = None

synchronized {
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConversions._
import scala.concurrent.Future
import scala.concurrent.duration._

import akka.actor.{Actor, ActorRef, Cancellable}
import akka.actor.{Actor, ActorRef}
import akka.pattern.ask

import org.apache.spark.{Logging, SparkConf, SparkException}
@@ -52,19 +52,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus

private val akkaTimeout = AkkaUtils.askTimeout(conf)

val slaveTimeout = conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120 * 1000)

val checkTimeoutInterval = conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000)

var timeoutCheckingTask: Cancellable = null

override def preStart() {
import context.dispatcher
timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
super.preStart()
}

override def receiveWithLogging = {
case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
register(blockManagerId, maxMemSize, slaveActor)
@@ -118,14 +105,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus

case StopBlockManagerMaster =>
sender ! true
if (timeoutCheckingTask != null) {
timeoutCheckingTask.cancel()
}
context.stop(self)

case ExpireDeadHosts =>
expireDeadHosts()

case BlockManagerHeartbeat(blockManagerId) =>
sender ! heartbeatReceived(blockManagerId)

@@ -207,21 +188,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
logInfo(s"Removing block manager $blockManagerId")
}

private def expireDeadHosts() {
logTrace("Checking for hosts with no recent heart beats in BlockManagerMaster.")
val now = System.currentTimeMillis()
val minSeenTime = now - slaveTimeout
val toRemove = new mutable.HashSet[BlockManagerId]
for (info <- blockManagerInfo.values) {
if (info.lastSeenMs < minSeenTime && !info.blockManagerId.isDriver) {
logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: "
+ (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
toRemove += info.blockManagerId
}
}
toRemove.foreach(removeBlockManager)
}

private def removeExecutor(execId: String) {
logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -109,6 +109,4 @@ private[spark] object BlockManagerMessages {
extends ToBlockManagerMaster

case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

case object ExpireDeadHosts extends ToBlockManagerMaster
}
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -96,6 +96,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
}
override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
override def defaultParallelism() = 2
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
}

/** Length of time to wait while draining listener events. */
@@ -386,6 +387,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
override def defaultParallelism() = 2
override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
blockManagerId: BlockManagerId): Boolean = true
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
}
val noKillScheduler = new DAGScheduler(
sc,