[SPARK-6650] [core] Stop ExecutorAllocationManager when context stops.
This fixes the thread leak. I also changed the unit test to keep track
of allocated contexts and make sure they're closed after tests run; this
is needed since some tests use this pattern:

    val sc = createContext()
    doSomethingThatMayThrow()
    sc.stop()
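
If doSomethingThatMayThrow() throws, sc.stop() is never reached, so the
context and its allocation-manager thread leak. A rough sketch of the
tracking approach the updated suite takes (simplified; the field and helper
names mirror the test diff below):

    private val contexts = new mutable.ListBuffer[SparkContext]()

    private def createSparkContext(conf: SparkConf): SparkContext = {
      val sc = new SparkContext(conf)
      contexts += sc               // remember every context handed to a test
      sc
    }

    after {
      contexts.foreach(_.stop())   // runs even when the test body threw
    }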
Marcelo Vanzin committed Apr 1, 2015
1 parent 305abe1 commit 9886f69
Showing 3 changed files with 46 additions and 34 deletions.
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -17,10 +17,12 @@

package org.apache.spark

import java.util.concurrent.{TimeUnit, Executors}

import scala.collection.mutable

import org.apache.spark.scheduler._
import org.apache.spark.util.{SystemClock, Clock}
import org.apache.spark.util.{Clock, SystemClock, Utils}

/**
* An agent that dynamically allocates and removes executors based on the workload.
@@ -129,6 +131,10 @@ private[spark] class ExecutorAllocationManager(
// Listener for Spark events that impact the allocation policy
private val listener = new ExecutorAllocationListener

// Executor that handles the scheduling task.
private val executor = Executors.newSingleThreadScheduledExecutor(
Utils.namedThreadFactory("spark-dynamic-executor-allocation"))

/**
* Verify that the settings specified through the config are valid.
* If not, throw an appropriate exception.
@@ -173,32 +179,24 @@
}

/**
* Register for scheduler callbacks to decide when to add and remove executors.
* Register for scheduler callbacks to decide when to add and remove executors, and start
* the scheduling task.
*/
def start(): Unit = {
listenerBus.addListener(listener)
startPolling()

val scheduleTask = new Runnable() {
override def run(): Unit = Utils.logUncaughtExceptions(schedule())
}
executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
}

/**
* Start the main polling thread that keeps track of when to add and remove executors.
* Stop the allocation manager.
*/
private def startPolling(): Unit = {
val t = new Thread {
override def run(): Unit = {
while (true) {
try {
schedule()
} catch {
case e: Exception => logError("Exception in dynamic executor allocation thread!", e)
}
Thread.sleep(intervalMillis)
}
}
}
t.setName("spark-dynamic-executor-allocation")
t.setDaemon(true)
t.start()
def stop(): Unit = {
executor.shutdown()
executor.awaitTermination(10, TimeUnit.SECONDS)
}

/**
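
The hand-rolled polling thread above is replaced by a single-thread
ScheduledExecutorService built from a named daemon-thread factory, which is
what makes a clean stop() possible. A self-contained sketch of the same
pattern outside Spark (assumed names; namedDaemonFactory stands in for
Utils.namedThreadFactory):

    import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
    import java.util.concurrent.atomic.AtomicInteger

    object PollingSketch {
      // Stand-in for Utils.namedThreadFactory: daemon threads with a stable name prefix.
      private def namedDaemonFactory(prefix: String): ThreadFactory = new ThreadFactory {
        private val count = new AtomicInteger(0)
        override def newThread(r: Runnable): Thread = {
          val t = new Thread(r, s"$prefix-${count.incrementAndGet()}")
          t.setDaemon(true)
          t
        }
      }

      private val executor = Executors.newSingleThreadScheduledExecutor(
        namedDaemonFactory("sketch-dynamic-allocation"))

      def start(intervalMillis: Long)(schedule: () => Unit): Unit = {
        val task = new Runnable {
          // Catch everything here: scheduleAtFixedRate suppresses further runs
          // if a task throws. The actual patch wraps schedule() in
          // Utils.logUncaughtExceptions so such failures are at least logged.
          override def run(): Unit =
            try schedule() catch { case t: Throwable => t.printStackTrace() }
        }
        executor.scheduleAtFixedRate(task, 0L, intervalMillis, TimeUnit.MILLISECONDS)
      }

      def stop(): Unit = {
        executor.shutdown()
        executor.awaitTermination(10, TimeUnit.SECONDS)   // bounded wait, as in the patch
      }
    }

Compared to the old while(true)/Thread.sleep loop, the executor gives the
manager an explicit handle it can shut down, instead of a thread that only
dies with the JVM.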
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1138,7 +1138,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* Return whether dynamically adjusting the amount of resources allocated to
* this application is supported. This is currently only available for YARN.
*/
private[spark] def supportDynamicAllocation =
private[spark] def supportDynamicAllocation =
master.contains("yarn") || dynamicAllocationTesting

/**
@@ -1406,6 +1406,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
dagScheduler = null
listenerBus.stop()
eventLogger.foreach(_.stop())
executorAllocationManager.foreach(_.stop())
env.actorSystem.stop(heartbeatReceiver)
progressBar.foreach(_.stop())
taskScheduler = null
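
The one-line addition above only takes effect when dynamic allocation is
enabled, since executorAllocationManager is held as an Option; a tiny
illustration of the idiom (the Reporter type is invented for the example):

    // Hypothetical optional component with a stop() method.
    class Reporter { def stop(): Unit = println("reporter stopped") }

    val reporter: Option[Reporter] = Some(new Reporter)   // None when the feature is off
    reporter.foreach(_.stop())                            // no-op for None, stops it otherwise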
core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark

import scala.collection.mutable

import org.scalatest.{FunSuite, PrivateMethodTester}
import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -28,10 +28,20 @@ import org.apache.spark.util.ManualClock
/**
* Test add and remove behavior of ExecutorAllocationManager.
*/
class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAfter {
import ExecutorAllocationManager._
import ExecutorAllocationManagerSuite._

private val contexts = new mutable.ListBuffer[SparkContext]()

before {
contexts.clear()
}

after {
contexts.foreach(_.stop())
}

test("verify min/max executors") {
val conf = new SparkConf()
.setMaster("local")
@@ -665,16 +675,6 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(removeTimes(manager).contains("executor-2"))
assert(!removeTimes(manager).contains("executor-1"))
}
}

/**
* Helper methods for testing ExecutorAllocationManager.
* This includes methods to access private methods and fields in ExecutorAllocationManager.
*/
private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
private val schedulerBacklogTimeout = 1L
private val sustainedSchedulerBacklogTimeout = 2L
private val executorIdleTimeout = 3L

private def createSparkContext(minExecutors: Int = 1, maxExecutors: Int = 5): SparkContext = {
val conf = new SparkConf()
@@ -688,9 +688,22 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
sustainedSchedulerBacklogTimeout.toString)
.set("spark.dynamicAllocation.executorIdleTimeout", executorIdleTimeout.toString)
.set("spark.dynamicAllocation.testing", "true")
new SparkContext(conf)
val sc = new SparkContext(conf)
contexts += sc
sc
}

}

/**
* Helper methods for testing ExecutorAllocationManager.
* This includes methods to access private methods and fields in ExecutorAllocationManager.
*/
private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
private val schedulerBacklogTimeout = 1L
private val sustainedSchedulerBacklogTimeout = 2L
private val executorIdleTimeout = 3L

private def createStageInfo(stageId: Int, numTasks: Int): StageInfo = {
new StageInfo(stageId, 0, "name", numTasks, Seq.empty, "no details")
}
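
The helper object still extends PrivateMethodTester so the tests can reach
into ExecutorAllocationManager's private members; a standalone sketch of that
ScalaTest facility (the Counter class and its bump method are invented for
the example):

    import org.scalatest.PrivateMethodTester

    object PrivateAccessSketch extends PrivateMethodTester {
      class Counter {
        private def bump(n: Int): Int = n + 1
      }

      // Handle to the private method; the type parameter is its result type.
      private val bump = PrivateMethod[Int]('bump)

      def main(args: Array[String]): Unit = {
        val c = new Counter
        // invokePrivate calls bump() reflectively without widening its visibility.
        println(c invokePrivate bump(41))   // prints 42
      }
    }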
