Skip to content

Commit

Permalink
Change some code style.
Browse files Browse the repository at this point in the history
  • Loading branch information
shenh062326 committed Feb 27, 2015
1 parent 1a042ff commit 2c9a46a
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 9 deletions.
16 changes: 8 additions & 8 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.util.ActorLogReceive
/**
* A heartbeat from executors to the driver. This is a shared message used by several internal
* components to convey liveness or execution information for in-progress tasks. It will also
* expire the hosts that have not heartbeated for more than spark.network.timeoutMs.
* expire the hosts that have not heartbeated for more than spark.network.timeout.
*/
private[spark] case class Heartbeat(
executorId: String,
Expand All @@ -50,18 +50,18 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule
// executor ID -> timestamp of when the last heartbeat from this executor was received
private val executorLastSeen = new mutable.HashMap[String, Long]

private val executorTimeout = sc.conf.getLong("spark.network.timeoutMs",
sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120 * 1000))
private val executorTimeoutMs = sc.conf.getLong("spark.network.timeout",
sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120)) * 1000

private val checkTimeoutInterval = sc.conf.getLong("spark.network.timeoutIntervalMs",
sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000))
private val checkTimeoutIntervalMs = sc.conf.getLong("spark.network.timeoutInterval",
sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60)) * 1000

private var timeoutCheckingTask: Cancellable = null

override def preStart(): Unit = {
import context.dispatcher
timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
checkTimeoutIntervalMs.milliseconds, self, ExpireDeadHosts)
super.preStart()
}

Expand All @@ -80,9 +80,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule
logTrace("Checking for hosts with no recent heartbeats in HeartbeatReceiver.")
val now = System.currentTimeMillis()
for ((executorId, lastSeenMs) <- executorLastSeen) {
if (now - lastSeenMs > executorTimeout) {
if (now - lastSeenMs > executorTimeoutMs) {
logWarning(s"Removing executor $executorId with no recent heartbeats: " +
s"${now - lastSeenMs} ms exceeds timeout $executorTimeout ms")
s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
"timed out after ${now - lastSeenMs} ms"))
if (sc.supportDynamicAllocation) {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1058,7 +1058,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
@DeveloperApi
override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
assert(supportDynamicAllocation,
assert(supportDynamicAllocation,
"Requesting executors is currently only supported in YARN mode")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
Expand Down

0 comments on commit 2c9a46a

Please sign in to comment.