diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ff5d796ee2766..7ea0d58aa3621 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -338,9 +338,18 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     }
   }
 
-  // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
-  // constructor
-  taskScheduler.start()
+  private val appSlowStart = conf.getBoolean("spark.scheduler.app.slowstart", false)
+  if (appSlowStart) {
+    assert(master.contains("yarn"),
+      "Slow start of application is currently only supported in YARN mode")
+    logInfo("TaskScheduler will start later.")
+  } else {
+    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
+    // constructor
+    if (taskScheduler.started.compareAndSet(false, true)) {
+      taskScheduler.start()
+    }
+  }
 
   val applicationId: String = taskScheduler.applicationId()
   conf.set("spark.app.id", applicationId)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 61d09d73e17cb..11424b54ea47d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -760,6 +760,9 @@ class DAGScheduler(
       val stageIds = jobIdToStageIds(jobId).toArray
       val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
       listenerBus.post(SparkListenerJobStart(job.jobId, stageInfos, properties))
+      if (taskScheduler.started.compareAndSet(false, true)) {
+        taskScheduler.start()
+      }
       submitStage(finalStage)
     }
   }
@@ -1327,7 +1330,9 @@ class DAGScheduler(
   def stop() {
     logInfo("Stopping DAGScheduler")
     dagSchedulerActorSupervisor ! PoisonPill
-    taskScheduler.stop()
+    if (taskScheduler.started.compareAndSet(true, false)) {
+      taskScheduler.stop()
+    }
   }
 
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 992c477493d8e..cee8d8241c71c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -25,6 +25,7 @@ package org.apache.spark.scheduler
 private[spark] trait SchedulerBackend {
   private val appId = "spark-application-" + System.currentTimeMillis
 
+  def initialize(): Unit = {}
   def start(): Unit
   def stop(): Unit
   def reviveOffers(): Unit
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index f095915352b17..605de507fa513 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import java.util.concurrent.atomic.AtomicBoolean
+
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
@@ -73,5 +75,8 @@ private[spark] trait TaskScheduler {
    * @return An application ID
    */
   def applicationId(): String = appId
+
+  // Whether this scheduler has been started
+  val started = new AtomicBoolean(false)
 
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index a1dfb01062591..63ed69255efb9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -122,6 +122,7 @@ private[spark] class TaskSchedulerImpl(
 
   def initialize(backend: SchedulerBackend) {
     this.backend = backend
+    this.backend.initialize()
     // temporarily set rootPool name to empty
     rootPool = new Pool("", schedulingMode, 0, 0)
     schedulableBuilder = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index d30eb10bbe947..2f145e0d8e1cd 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -78,8 +78,8 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   val taskScheduler = new TaskScheduler() {
     override def rootPool: Pool = null
     override def schedulingMode: SchedulingMode = SchedulingMode.NONE
-    override def start() = {}
-    override def stop() = {}
+    override def start() = { started.set(true) }
+    override def stop() = { started.set(false) }
     override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
       blockManagerId: BlockManagerId): Boolean = true
     override def submitTasks(taskSet: TaskSet) = {
diff --git a/docs/configuration.md b/docs/configuration.md
index f292bfbb7dcd6..2117cb425aa75 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1053,6 +1053,15 @@ Apart from these, the following properties are also available, and may be useful
     shipping a whole partition of data to the driver.
   </td>
 </tr>
+<tr>
+  <td><code>spark.scheduler.app.slowstart</code></td>
+  <td>false</td>
+  <td>
+    Whether to delay starting the TaskScheduler until the first job is submitted,
+    instead of starting it while the SparkContext is being constructed.
+    Currently only supported in YARN mode.
+  </td>
+</tr>
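
Note (not part of the patch): every taskScheduler.start()/stop() call above is guarded by AtomicBoolean.compareAndSet, so whichever of SparkContext or DAGScheduler reaches the call first wins and the other call becomes a no-op. Below is a minimal standalone sketch of that idempotent start/stop pattern; the OnceStartable trait and the doStart/doStop methods are hypothetical names used only for illustration.

import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical trait illustrating the guard used by the patch:
// compareAndSet(expect, update) atomically flips the flag only if it
// currently equals `expect`, so exactly one concurrent caller succeeds.
trait OnceStartable {
  val started = new AtomicBoolean(false)

  protected def doStart(): Unit
  protected def doStop(): Unit

  def start(): Unit = {
    if (started.compareAndSet(false, true)) {
      doStart() // runs at most once per false -> true transition
    }
  }

  def stop(): Unit = {
    if (started.compareAndSet(true, false)) {
      doStop() // runs only if a matching start() actually happened
    }
  }
}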
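
Likewise not part of the patch: a sketch of how a YARN application would opt into the deferred scheduler start, assuming the configuration entry above. The app name is hypothetical, and the property must be set before the SparkContext is created, since it is read once during construction.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("slow-start-demo")                // hypothetical app name
  .setMaster("yarn-client")                     // slow start asserts a YARN master
  .set("spark.scheduler.app.slowstart", "true") // defaults to false

val sc = new SparkContext(conf) // TaskScheduler is NOT started here
// Per the DAGScheduler diff above, the first submitted job flips the
// started flag and calls taskScheduler.start() just before stage submission.
sc.parallelize(1 to 10).count()
sc.stop()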