diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ff5d796ee2766..7ea0d58aa3621 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -338,9 +338,18 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     }
   }
 
-  // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
-  // constructor
-  taskScheduler.start()
+  private val appSlowStart = conf.getBoolean("spark.scheduler.app.slowstart", false)
+  if (appSlowStart) {
+    assert(master.contains("yarn"),
+      "Slow start of application is currently only supported in YARN mode")
+    logInfo("TaskScheduler start is deferred until the first job is submitted.")
+  } else {
+    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
+    // constructor
+    if (taskScheduler.started.compareAndSet(false, true)) {
+      taskScheduler.start()
+    }
+  }
 
   val applicationId: String = taskScheduler.applicationId()
   conf.set("spark.app.id", applicationId)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 61d09d73e17cb..11424b54ea47d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -760,6 +760,9 @@ class DAGScheduler(
       val stageIds = jobIdToStageIds(jobId).toArray
       val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
       listenerBus.post(SparkListenerJobStart(job.jobId, stageInfos, properties))
+      if (taskScheduler.started.compareAndSet(false, true)) {
+        taskScheduler.start()
+      }
       submitStage(finalStage)
     }
   }
@@ -1327,7 +1330,9 @@ class DAGScheduler(
   def stop() {
     logInfo("Stopping DAGScheduler")
     dagSchedulerActorSupervisor ! PoisonPill
-    taskScheduler.stop()
+    if (taskScheduler.started.compareAndSet(true, false)) {
+      taskScheduler.stop()
+    }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 992c477493d8e..cee8d8241c71c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -25,6 +25,7 @@ package org.apache.spark.scheduler
 private[spark] trait SchedulerBackend {
   private val appId = "spark-application-" + System.currentTimeMillis
 
+  def initialize(): Unit = {}
   def start(): Unit
   def stop(): Unit
   def reviveOffers(): Unit
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index f095915352b17..605de507fa513 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import java.util.concurrent.atomic.AtomicBoolean
+
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
@@ -73,5 +75,8 @@ private[spark] trait TaskScheduler {
    * @return An application ID
    */
   def applicationId(): String = appId
+
+  // Whether the scheduler has been started; guards start()/stop() so each runs at most once
+  val started = new AtomicBoolean(false)
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index a1dfb01062591..63ed69255efb9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -122,6 +122,7 @@ private[spark] class TaskSchedulerImpl(
 
   def initialize(backend: SchedulerBackend) {
     this.backend = backend
+    this.backend.initialize()
     // temporarily set rootPool name to empty
     rootPool = new Pool("", schedulingMode, 0, 0)
     schedulableBuilder = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index d30eb10bbe947..2f145e0d8e1cd 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -78,8 +78,8 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   val taskScheduler = new TaskScheduler() {
     override def rootPool: Pool = null
     override def schedulingMode: SchedulingMode = SchedulingMode.NONE
-    override def start() = {}
-    override def stop() = {}
+    override def start() = { started.set(true) }
+    override def stop() = { started.set(false) }
     override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
       blockManagerId: BlockManagerId): Boolean = true
     override def submitTasks(taskSet: TaskSet) = {
diff --git a/docs/configuration.md b/docs/configuration.md
index f292bfbb7dcd6..2117cb425aa75 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1053,6 +1053,15 @@ Apart from these, the following properties are also available, and may be useful
     shipping a whole partition of data to the driver.
   </td>
 </tr>
+<tr>
+  <td><code>spark.scheduler.app.slowstart</code></td>
+  <td>false</td>
+  <td>
+    Whether to enable application slow start, which postpones the TaskScheduler's resource
+    requests to the cluster master until the DAGScheduler submits a job. Note that this is
+    currently only available in YARN mode.
+  </td>
+</tr>
 </table>
 
 #### Dynamic allocation
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 032106371cd60..2577747639c6d 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -66,6 +66,7 @@ private[spark] class Client(
   private val executorMemoryOverhead = args.executorMemoryOverhead // MB
   private val distCacheMgr = new ClientDistributedCacheManager()
   private val isClusterMode = args.isClusterMode
+  private var appContext: ApplicationSubmissionContext = null
 
   def stop(): Unit = yarnClient.stop()
 
@@ -77,13 +78,15 @@ private[spark] class Client(
   * ------------------------------------------------------------------------------------- */
 
  /**
-   * Submit an application running our ApplicationMaster to the ResourceManager.
+   * Register a new application for our ApplicationMaster with the ResourceManager.
+   * This obtains an ApplicationId from the ResourceManager, but does not yet submit the
+   * application submission context, which contains the resource requests.
    *
    * The stable Yarn API provides a convenience method (YarnClient#createApplication) for
    * creating applications and setting up the application submission context. This was not
    * available in the alpha API.
    */
-  def submitApplication(): ApplicationId = {
+  def createApplication(): ApplicationId = {
     yarnClient.init(yarnConf)
     yarnClient.start()
 
@@ -100,12 +103,21 @@ private[spark] class Client(
     // Set up the appropriate contexts to launch our AM
     val containerContext = createContainerLaunchContext(newAppResponse)
-    val appContext = createApplicationSubmissionContext(newApp, containerContext)
+    appContext = createApplicationSubmissionContext(newApp, containerContext)
+    appId
+  }
 
+  /**
+   * Submit the application submission context, which contains the resource requests,
+   * to the ResourceManager. Once the ResourceManager receives this submission, it
+   * schedules and grants resources for the application; this is what actually triggers
+   * resource scheduling in the cluster.
+   */
+  def submitApplication(): Unit = {
     // Finally, submit and monitor the application
-    logInfo(s"Submitting application ${appId.getId} to ResourceManager")
+    logInfo(s"Submitting application context of ${appContext.getApplicationId().getId} " +
+      "to ResourceManager")
     yarnClient.submitApplication(appContext)
-    appId
   }
 
   /**
@@ -596,7 +608,9 @@
    * throw an appropriate SparkException.
   */
  def run(): Unit = {
-    val (yarnApplicationState, finalApplicationStatus) = monitorApplication(submitApplication())
+    val appId = createApplication()
+    submitApplication()
+    val (yarnApplicationState, finalApplicationStatus) = monitorApplication(appId)
     if (yarnApplicationState == YarnApplicationState.FAILED ||
       finalApplicationStatus == FinalApplicationStatus.FAILED) {
       throw new SparkException("Application finished with failed status")
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index f99291553b7b8..bfcff99568f24 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -34,13 +34,12 @@ private[spark] class YarnClientSchedulerBackend(
   private var client: Client = null
   private var appId: ApplicationId = null
   @volatile private var stopping: Boolean = false
-
+
   /**
-   * Create a Yarn client to submit an application to the ResourceManager.
-   * This waits until the application is running.
+   * Create a Yarn client and register a new application with the ResourceManager.
    */
-  override def start() {
-    super.start()
+  override def initialize() {
+    super.initialize()
     val driverHost = conf.get("spark.driver.host")
     val driverPort = conf.get("spark.driver.port")
     val hostport = driverHost + ":" + driverPort
@@ -54,7 +53,17 @@ private[spark] class YarnClientSchedulerBackend(
     val args = new ClientArguments(argsArrayBuf.toArray, conf)
     totalExpectedExecutors = args.numExecutors
     client = new Client(args, conf)
-    appId = client.submitApplication()
+    appId = client.createApplication()
+  }
+
+  /**
+   * Submit the application created by the Yarn client to the ResourceManager.
+   * This waits until the application is running.
+   */
+  override def start() {
+    super.start()
+
+    client.submitApplication()
     waitForApplication()
     asyncMonitorApplication()
   }
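
The heart of the change is the `started` flag guarding `TaskScheduler.start()`/`stop()`, so that each runs at most once regardless of whether `SparkContext` or `DAGScheduler` reaches it first. A minimal, self-contained sketch of this compare-and-set idempotence pattern (the `GuardedService` name and the println bodies are illustrative, not part of the patch):

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Illustrative stand-in for a service whose start()/stop() must each run at
// most once, even when invoked from several call sites (SparkContext and
// DAGScheduler in this patch).
class GuardedService {
  private val started = new AtomicBoolean(false)

  def start(): Unit = {
    // compareAndSet(false, true) atomically flips the flag for exactly one
    // caller; every later call observes true and becomes a no-op.
    if (started.compareAndSet(false, true)) {
      println("service started")
    }
  }

  def stop(): Unit = {
    // Symmetric guard: stop() runs only if start() actually ran, and only once.
    if (started.compareAndSet(true, false)) {
      println("service stopped")
    }
  }
}
```

An `AtomicBoolean` is preferable to a plain `var` behind `synchronized` here because the guard sits on the job-submission path, and a lock-free check keeps it cheap.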
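
To exercise the new path end to end, a driver along these lines should work, assuming a reachable YARN cluster; the object name, app name, and the job itself are illustrative, while `spark.scheduler.app.slowstart` is the flag added by this patch:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver exercising slow start in yarn-client mode.
object SlowStartExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("yarn-client")
      .setAppName("SlowStartExample")
      .set("spark.scheduler.app.slowstart", "true")

    // With slow start enabled, constructing the SparkContext only creates the
    // YARN application (obtaining an ApplicationId); the submission context is
    // not sent, so no cluster resources are requested yet.
    val sc = new SparkContext(conf)

    // The first job wins the compareAndSet in DAGScheduler and calls
    // taskScheduler.start(), which submits the application and triggers
    // resource scheduling on the ResourceManager.
    val n = sc.parallelize(1 to 100).count()
    println(s"count = $n")

    sc.stop()
  }
}
```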