Skip to content

Commit

Permalink
Remove assert in SparkContext.killExecutors
Browse files Browse the repository at this point in the history
  • Loading branch information
shenh062326 committed Feb 10, 2015
1 parent 5bedcb8 commit 52725af
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 15 deletions.
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule
logWarning(s"Removing executor $executorId with no recent heartbeats: " +
s"${now - lastSeenMs} ms exceeds timeout $executorTimeout ms")
scheduler.executorLost(executorId, SlaveLost())
sc.killExecutor(executorId)
if(sc.supportKillExecutor()) {
sc.killExecutor(executorId)
}
executorLastSeen.remove(executorId)
}
}
Expand Down
30 changes: 16 additions & 14 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private val dynamicAllocationTesting = conf.getBoolean("spark.dynamicAllocation.testing", false)
private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
if (dynamicAllocationEnabled) {
assert(master.contains("yarn") || dynamicAllocationTesting,
assert(supportKillExecutor(),
"Dynamic allocation of executors is currently only supported in YARN mode")
Some(new ExecutorAllocationManager(this, listenerBus, conf))
} else {
Expand Down Expand Up @@ -1034,6 +1034,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
postEnvironmentUpdate()
}

/**
 * Whether this SparkContext supports requesting/killing executors.
 *
 * Currently only YARN mode supports executor kill/request; the
 * `spark.dynamicAllocation.testing` flag also enables it for tests.
 *
 * @return true iff the master is a YARN master or dynamic-allocation
 *         testing is enabled.
 */
def supportKillExecutor(): Boolean = {
  // BUG FIX: the original body was `if (cond) { true }; false`. The `if`
  // had no `else`, so its `true` result was a discarded statement value and
  // the method unconditionally fell through to the trailing `false` — i.e.
  // it ALWAYS returned false (breaking the YARN dynamic-allocation assert).
  // Returning the boolean condition directly is both correct and idiomatic.
  master.contains("yarn") || dynamicAllocationTesting
}

/**
* :: DeveloperApi ::
Expand All @@ -1051,8 +1058,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
@DeveloperApi
override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
assert(master.contains("yarn") || dynamicAllocationTesting,
"Requesting executors is currently only supported in YARN mode")
assert(supportKillExecutor(), "Requesting executors is currently only supported in YARN mode")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.requestExecutors(numAdditionalExecutors)
Expand All @@ -1069,17 +1075,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
@DeveloperApi
override def killExecutors(executorIds: Seq[String]): Boolean = {
if (master.contains("yarn") || dynamicAllocationTesting) {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.killExecutors(executorIds)
case _ =>
logWarning("Killing executors is only supported in coarse-grained mode")
false
}
} else {
logWarning("Killing executors is currently only supported in YARN mode")
false
assert(supportKillExecutor(), "Killing executors is currently only supported in YARN mode")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.killExecutors(executorIds)
case _ =>
logWarning("Killing executors is only supported in coarse-grained mode")
false
}
}

Expand Down

0 comments on commit 52725af

Please sign in to comment.