[SPARK-5529][CORE]Add expireDeadHosts in HeartbeatReceiver #4363
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala:

@@ -17,33 +17,86 @@
 package org.apache.spark

-import akka.actor.Actor
+import scala.concurrent.duration._
+import scala.collection.mutable
+
+import akka.actor.{Actor, Cancellable}

 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.scheduler.TaskScheduler
+import org.apache.spark.scheduler.{SlaveLost, TaskScheduler}
 import org.apache.spark.util.ActorLogReceive

 /**
  * A heartbeat from executors to the driver. This is a shared message used by several internal
- * components to convey liveness or execution information for in-progress tasks.
+ * components to convey liveness or execution information for in-progress tasks. It will also
+ * expire the hosts that have not sent a heartbeat for more than spark.network.timeoutMs.
  */
 private[spark] case class Heartbeat(
     executorId: String,
     taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
     blockManagerId: BlockManagerId)

+private[spark] case object ExpireDeadHosts
+
 private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)

 /**
  * Lives in the driver to receive heartbeats from executors.
  */

[Reviewer] We should add a header comment that communicates that one of the functions of the HeartbeatReceiver is to expire executors.
[Author] Hi Sryza, thanks for your review.
-private[spark] class HeartbeatReceiver(scheduler: TaskScheduler)
+private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
   extends Actor with ActorLogReceive with Logging {

[Reviewer] I would recommend limiting the number of things we pass to this receiver to the following set of smaller things instead of the whole SparkContext: …
[Author] It will not be easy to understand; on the other hand, the SparkContext is used in a lot of places.
[Reviewer] Yes, but it's a bad design pattern to pass in the whole SparkContext. Anyway, not a big deal since this is private. I will merge it as is.
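As a side note on the thread above, here is a minimal, self-contained sketch of the reviewer's suggestion (all names are hypothetical, not part of this patch): the actor would depend only on the narrow capabilities it actually uses, a config lookup and a kill callback, rather than the whole SparkContext.

// Hypothetical sketch, not Spark code: take narrow dependencies instead of the context.
final case class Conf(entries: Map[String, String]) {
  def getLong(key: String, default: Long): Long =
    entries.get(key).map(_.toLong).getOrElse(default)
}

class NarrowReceiver(conf: Conf, killExecutor: String => Unit) {
  private val timeoutMs = conf.getLong("spark.network.timeoutMs", 120 * 1000)
  // Kill an executor whose last heartbeat is older than the timeout.
  def expireIfDead(executorId: String, lastSeenMs: Long, nowMs: Long): Unit =
    if (nowMs - lastSeenMs > timeoutMs) killExecutor(executorId)
}

object NarrowReceiverDemo {
  def main(args: Array[String]): Unit = {
    // In the real code the callback could be supplied as `sc.killExecutor _`.
    val receiver = new NarrowReceiver(Conf(Map.empty), id => println(s"killing $id"))
    receiver.expireIfDead("exec-1", lastSeenMs = 0L, nowMs = 200000L) // prints: killing exec-1
  }
}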

+  // executor ID -> timestamp of when the last heartbeat from this executor was received
+  private val executorLastSeen = new mutable.HashMap[String, Long]
+
+  private val executorTimeout = sc.conf.getLong("spark.network.timeoutMs",
+    sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120 * 1000))

[Reviewer] Please use the existing config …
+
+  private val checkTimeoutInterval = sc.conf.getLong("spark.network.timeoutIntervalMs",
+    sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000))

[Reviewer] Same here. Since we already have …
+
+  private var timeoutCheckingTask: Cancellable = null
+
+  override def preStart(): Unit = {
+    import context.dispatcher
+    timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
+      checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
+    super.preStart()
+  }
+
   override def receiveWithLogging = {
     case Heartbeat(executorId, taskMetrics, blockManagerId) =>
-      val response = HeartbeatResponse(
-        !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
+      val unknownExecutor = !scheduler.executorHeartbeatReceived(
+        executorId, taskMetrics, blockManagerId)
+      val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
+      executorLastSeen(executorId) = System.currentTimeMillis()
       sender ! response
+    case ExpireDeadHosts =>
+      expireDeadHosts()
   }

+  private def expireDeadHosts(): Unit = {
+    logTrace("Checking for hosts with no recent heartbeats in HeartbeatReceiver.")
+    val now = System.currentTimeMillis()
+    for ((executorId, lastSeenMs) <- executorLastSeen) {
+      if (now - lastSeenMs > executorTimeout) {
+        logWarning(s"Removing executor $executorId with no recent heartbeats: " +
+          s"${now - lastSeenMs} ms exceeds timeout $executorTimeout ms")
+        scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
+          s"timed out after ${now - lastSeenMs} ms"))
+        if (sc.supportDynamicAllocation) {
+          sc.killExecutor(executorId)
+        }
+        executorLastSeen.remove(executorId)
+      }
+    }
+  }

[Reviewer] What happens if a heartbeat from the executor gets delivered after we kill / remove it?
[Author] Because the Akka connection is still alive, we can kill the executor by sending a kill message to the ApplicationMaster.
[Reviewer] I believe this code will correctly expire that executor as a dead host after a timeout.
+
+  override def postStop(): Unit = {
+    if (timeoutCheckingTask != null) {
+      timeoutCheckingTask.cancel()
+    }
+    super.postStop()
+  }
 }
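To make the review question above concrete, here is a self-contained sketch (not Spark code; names are illustrative) of the bookkeeping in expireDeadHosts. A heartbeat that arrives after an executor has been removed simply re-inserts it into the map, and a later check expires it again, which is the behavior the last comment refers to:

import scala.collection.mutable

object ExpirySketch {
  val executorTimeout = 1000L // ms, stands in for the resolved spark.network.timeoutMs
  val lastSeen = new mutable.HashMap[String, Long]

  def heartbeat(executorId: String, now: Long): Unit =
    lastSeen(executorId) = now

  // Returns the executors expired on this pass; in Spark this is where
  // scheduler.executorLost and sc.killExecutor are invoked.
  def expireDeadHosts(now: Long): List[String] = {
    val dead = lastSeen.filter { case (_, seen) => now - seen > executorTimeout }.keys.toList
    dead.foreach(lastSeen.remove)
    dead
  }

  def main(args: Array[String]): Unit = {
    heartbeat("exec-1", now = 0L)
    println(expireDeadHosts(now = 2000L)) // List(exec-1): expired once
    heartbeat("exec-1", now = 2500L)      // a late heartbeat re-registers it
    println(expireDeadHosts(now = 4000L)) // List(exec-1): expired again
  }
}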
core/src/main/scala/org/apache/spark/SparkContext.scala:

@@ -332,7 +332,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
   private[spark] var (schedulerBackend, taskScheduler) =
     SparkContext.createTaskScheduler(this, master)
   private val heartbeatReceiver = env.actorSystem.actorOf(
-    Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
+    Props(new HeartbeatReceiver(this, taskScheduler)), "HeartbeatReceiver")
   @volatile private[spark] var dagScheduler: DAGScheduler = _
   try {
     dagScheduler = new DAGScheduler(this)

@@ -379,7 +379,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
   private val dynamicAllocationTesting = conf.getBoolean("spark.dynamicAllocation.testing", false)
   private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
     if (dynamicAllocationEnabled) {
-      assert(master.contains("yarn") || dynamicAllocationTesting,
+      assert(supportDynamicAllocation,
         "Dynamic allocation of executors is currently only supported in YARN mode")
       Some(new ExecutorAllocationManager(this, listenerBus, conf))
     } else {

@@ -1035,6 +1035,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
     postEnvironmentUpdate()
   }

+  /**
+   * Return whether dynamically adjusting the amount of resources allocated to
+   * this application is supported. This is currently only available for YARN.
+   */
+  private[spark] def supportDynamicAllocation =
+    master.contains("yarn") || dynamicAllocationTesting
+
   /**
    * :: DeveloperApi ::
    * Register a listener to receive up-calls from events that happen during execution.

@@ -1051,7 +1058,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
    */
   @DeveloperApi
   override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
-    assert(master.contains("yarn") || dynamicAllocationTesting,
+    assert(supportDynamicAllocation,

[Reviewer] Can you remove the extra space at the end here?

       "Requesting executors is currently only supported in YARN mode")
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>

@@ -1069,7 +1076,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
    */
   @DeveloperApi
   override def killExecutors(executorIds: Seq[String]): Boolean = {
-    assert(master.contains("yarn") || dynamicAllocationTesting,
+    assert(supportDynamicAllocation,
       "Killing executors is currently only supported in YARN mode")
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
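The refactor above is the standard "name the repeated predicate" move: three inlined copies of the YARN-only check become one helper. A tiny self-contained sketch, with simplified stand-ins for the SparkContext fields:

object DynamicAllocationGuardSketch {
  val master = "yarn-cluster"
  val dynamicAllocationTesting = false

  // One place now owns the YARN-only rule instead of three inlined copies.
  def supportDynamicAllocation: Boolean =
    master.contains("yarn") || dynamicAllocationTesting

  def requestExecutors(numAdditionalExecutors: Int): Boolean = {
    assert(supportDynamicAllocation,
      "Requesting executors is currently only supported in YARN mode")
    println(s"requesting $numAdditionalExecutors executors")
    true
  }

  def main(args: Array[String]): Unit =
    requestExecutors(2) // passes the assert because master contains "yarn"
}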
core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala:

@@ -73,5 +73,9 @@ private[spark] trait TaskScheduler {
    * @return An application ID
    */
   def applicationId(): String = appId

+  /**
+   * Process a lost executor
+   */

[Reviewer] I think just …

+  def executorLost(executorId: String, reason: ExecutorLossReason): Unit
 }
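For context on the new trait method, here is a self-contained sketch of the shape an executorLost implementation takes (hypothetical class with local stand-in types; the real TaskSchedulerImpl does considerably more, such as failing the executor's running tasks and notifying the DAGScheduler):

import scala.collection.mutable

// Local stand-ins for Spark's loss-reason types, so the example runs on its own.
sealed trait ExecutorLossReason
case class SlaveLost(message: String) extends ExecutorLossReason

class SketchScheduler {
  private val activeExecutors = mutable.Set("exec-1", "exec-2")

  // Process a lost executor: drop its bookkeeping and surface the reason.
  def executorLost(executorId: String, reason: ExecutorLossReason): Unit =
    if (activeExecutors.remove(executorId)) {
      println(s"Executor $executorId lost: $reason")
    }
}

object SketchSchedulerDemo {
  def main(args: Array[String]): Unit =
    new SketchScheduler().executorLost("exec-1",
      SlaveLost("Executor heartbeat timed out after 130000 ms"))
}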
[Reviewer] You will need to update this comment too.