[SPARK-5529][CORE]Add expireDeadHosts in HeartbeatReceiver #4363
Changes from 17 commits
@@ -17,33 +17,84 @@
  package org.apache.spark

- import akka.actor.Actor
+ import scala.concurrent.duration._
+ import scala.collection.mutable
+
+ import akka.actor.{Actor, Cancellable}

  import org.apache.spark.executor.TaskMetrics
  import org.apache.spark.storage.BlockManagerId
- import org.apache.spark.scheduler.TaskScheduler
+ import org.apache.spark.scheduler.{SlaveLost, TaskScheduler}
  import org.apache.spark.util.ActorLogReceive
  /**
   * A heartbeat from executors to the driver. This is a shared message used by several internal
-  * components to convey liveness or execution information for in-progress tasks.
+  * components to convey liveness or execution information for in-progress tasks. It will also
+  * expire the hosts that have not heartbeated for more than spark.driver.executorTimeoutMs.
   */
  private[spark] case class Heartbeat(
      executorId: String,
      taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
      blockManagerId: BlockManagerId)

+ private[spark] case object ExpireDeadHosts

  private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
  /**
   * Lives in the driver to receive heartbeats from executors.
   */

> Review comment (sryza): We should add a header comment that communicates that one of the functions of the HeartbeatReceiver is to expire executors.
> Reply (author): Hi sryza, thanks for your review.
- private[spark] class HeartbeatReceiver(scheduler: TaskScheduler)
+ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
    extends Actor with ActorLogReceive with Logging {

> Review comment: I would recommend limiting the number of things we pass to this receiver to a smaller set of values instead of the whole SparkContext.
> Reply (author): It will not be easy to understand; on the other hand, the SparkContext is used in a lot of places.
> Review comment: Yes, but it's a bad design pattern to pass in the whole SparkContext. Anyway, not a big deal since this is private. I will merge it as is.
+   val executorLastSeen = new mutable.HashMap[String, Long]

> Review comment: Please add a comment on what the keys and values are.
+   val executorTimeout = sc.conf.getLong("spark.driver.executorTimeoutMs",
+     sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120 * 1000))
+
+   val checkTimeoutInterval = sc.conf.getLong("spark.driver.executorTimeoutIntervalMs",
+     sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000))

> Review comment: This isn't really a driver property.
> Review comment: Yes, I'd still like to avoid introducing new configs unless we have to, since we can't remove those easily.
> Reply (author): Yes, now I agree with considering the spark.network.* and spark.storage.blockManager.* configs.
+   var timeoutCheckingTask: Cancellable = null

> Review comment: These could all be private.
+   override def preStart(): Unit = {
+     import context.dispatcher
+     timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
+       checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
+     super.preStart()
+   }
    override def receiveWithLogging = {
      case Heartbeat(executorId, taskMetrics, blockManagerId) =>
        val response = HeartbeatResponse(
          !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
+       executorLastSeen(executorId) = System.currentTimeMillis()
        sender ! response
+     case ExpireDeadHosts =>
+       expireDeadHosts()
    }

> Review comment: Not your code, but can you put this in a separate val?
+   private def expireDeadHosts(): Unit = {
+     logTrace("Checking for hosts with no recent heartbeats in HeartbeatReceiver.")
+     val now = System.currentTimeMillis()
+     val minSeenTime = now - executorTimeout
+     for ((executorId, lastSeenMs) <- executorLastSeen) {
+       if (lastSeenMs < minSeenTime) {
+         logWarning(s"Removing executor $executorId with no recent heartbeats: " +
+           s"${now - lastSeenMs} ms exceeds timeout $executorTimeout ms")
+         scheduler.executorLost(executorId, SlaveLost())
+         if (sc.supportKillExecutor) {
+           sc.killExecutor(executorId)
+         }
+         executorLastSeen.remove(executorId)
+       }
+     }
+   }

> Review comment: I think it's easier to read if it's written the following way instead.
> Review comment: Please add an error message in SlaveLost().
> Review comment: What happens if a heartbeat from the executor gets delivered after we kill / remove it?
> Reply (author): Because the Akka connection is still alive, we can kill the executor by sending a kill message to the ApplicationMaster.
> Review comment: I believe this code will correctly expire that executor as a dead host after a timeout.
+   override def postStop(): Unit = {
+     if (timeoutCheckingTask != null) {
+       timeoutCheckingTask.cancel()
+     }
+     super.postStop()
+   }
  }
@@ -332,7 +332,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    private[spark] var (schedulerBackend, taskScheduler) =
      SparkContext.createTaskScheduler(this, master)
    private val heartbeatReceiver = env.actorSystem.actorOf(
-     Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
+     Props(new HeartbeatReceiver(this, taskScheduler)), "HeartbeatReceiver")
    @volatile private[spark] var dagScheduler: DAGScheduler = _
    try {
      dagScheduler = new DAGScheduler(this)

@@ -379,7 +379,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

    private val dynamicAllocationTesting = conf.getBoolean("spark.dynamicAllocation.testing", false)
    private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
      if (dynamicAllocationEnabled) {
-       assert(master.contains("yarn") || dynamicAllocationTesting,
+       assert(supportKillExecutor,
          "Dynamic allocation of executors is currently only supported in YARN mode")
        Some(new ExecutorAllocationManager(this, listenerBus, conf))
      } else {
@@ -1035,6 +1035,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
      postEnvironmentUpdate()
    }

+   private[spark] def supportKillExecutor = master.contains("yarn") || dynamicAllocationTesting

> Review comment: This is also used in requestExecutors.

    /**
     * :: DeveloperApi ::
     * Register a listener to receive up-calls from events that happen during execution.
@@ -1051,8 +1053,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     */
    @DeveloperApi
    override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
-     assert(master.contains("yarn") || dynamicAllocationTesting,
-       "Requesting executors is currently only supported in YARN mode")
+     assert(supportKillExecutor, "Requesting executors is currently only supported in YARN mode")
      schedulerBackend match {
        case b: CoarseGrainedSchedulerBackend =>
          b.requestExecutors(numAdditionalExecutors)

@@ -1069,8 +1070,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     */
    @DeveloperApi
    override def killExecutors(executorIds: Seq[String]): Boolean = {
-     assert(master.contains("yarn") || dynamicAllocationTesting,
-       "Killing executors is currently only supported in YARN mode")
+     assert(supportKillExecutor, "Killing executors is currently only supported in YARN mode")
      schedulerBackend match {
        case b: CoarseGrainedSchedulerBackend =>
          b.killExecutors(executorIds)
@@ -73,5 +73,9 @@ private[spark] trait TaskScheduler {
     * @return An application ID
     */
    def applicationId(): String = appId

+   /**
+    * Process a lost executor in taskScheduler
+    */
+   def executorLost(executorId: String, reason: ExecutorLossReason): Unit
  }

> Review comment: I think just a shorter doc comment would read better here.
> Review comment: You'll need to update this comment if you rename the configs.