From 05469de9f0482bce54a60161b9cb386a64173826 Mon Sep 17 00:00:00 2001 From: yantangzhai Date: Fri, 26 Dec 2014 15:11:30 +0800 Subject: [PATCH 01/13] [SPARK-4962] [CORE] Put TaskScheduler.start back in SparkContext to shorten cluster resources occupation period --- .../scala/org/apache/spark/SparkContext.scala | 12 +++++-- .../apache/spark/scheduler/DAGScheduler.scala | 7 +++- .../spark/scheduler/SchedulerBackend.scala | 1 + .../spark/scheduler/TaskScheduler.scala | 5 +++ .../spark/scheduler/TaskSchedulerImpl.scala | 1 + .../spark/scheduler/DAGSchedulerSuite.scala | 4 +-- .../org/apache/spark/deploy/yarn/Client.scala | 34 +++++++++++++------ .../apache/spark/deploy/yarn/ClientBase.scala | 9 +++-- .../cluster/YarnClientSchedulerBackend.scala | 21 ++++++++---- .../spark/deploy/yarn/ClientBaseSuite.scala | 3 +- 10 files changed, 71 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 57bc3d4e4ae36..29a6c779a0fe1 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -333,9 +333,15 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage)) } - // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's - // constructor - taskScheduler.start() + if (conf.getBoolean("spark.scheduler.app.slowstart", false) && master == "yarn-client") { + logInfo("TaskScheduler will start later.") + } else { + // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's + // constructor + if (taskScheduler.started.compareAndSet(false, true)) { + taskScheduler.start() + } + } val applicationId: String = taskScheduler.applicationId() conf.set("spark.app.id", applicationId) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index cb8ccfbdbdcbb..faeeb597996fd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -759,6 +759,9 @@ class DAGScheduler( val stageIds = jobIdToStageIds(jobId).toArray val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) listenerBus.post(SparkListenerJobStart(job.jobId, stageInfos, properties)) + if (taskScheduler.started.compareAndSet(false, true)) { + taskScheduler.start() + } submitStage(finalStage) } } @@ -1346,7 +1349,9 @@ class DAGScheduler( def stop() { logInfo("Stopping DAGScheduler") dagSchedulerActorSupervisor ! 
PoisonPill - taskScheduler.stop() + if (taskScheduler.started.compareAndSet(true, false)) { + taskScheduler.stop() + } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index 992c477493d8e..cee8d8241c71c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -25,6 +25,7 @@ package org.apache.spark.scheduler private[spark] trait SchedulerBackend { private val appId = "spark-application-" + System.currentTimeMillis + def initialize() = {} def start(): Unit def stop(): Unit def reviveOffers(): Unit diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index f095915352b17..605de507fa513 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -17,6 +17,8 @@ package org.apache.spark.scheduler +import java.util.concurrent.atomic.AtomicBoolean + import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.executor.TaskMetrics import org.apache.spark.storage.BlockManagerId @@ -73,5 +75,8 @@ private[spark] trait TaskScheduler { * @return An application ID */ def applicationId(): String = appId + + // Whether started + val started = new AtomicBoolean(false) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index cd3c015321e85..2b68c5bab955a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -121,6 +121,7 @@ private[spark] class TaskSchedulerImpl( def initialize(backend: SchedulerBackend) { this.backend = backend + this.backend.initialize() // temporarily set rootPool name to empty rootPool = new Pool("", schedulingMode, 0, 0) schedulableBuilder = { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 436eea4f1fdcf..1d8506bafcec1 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -78,8 +78,8 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F val taskScheduler = new TaskScheduler() { override def rootPool: Pool = null override def schedulingMode: SchedulingMode = SchedulingMode.NONE - override def start() = {} - override def stop() = {} + override def start() = { started.compareAndSet(false, true) } + override def stop() = { started.compareAndSet(true, false) } override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)], blockManagerId: BlockManagerId): Boolean = true override def submitTasks(taskSet: TaskSet) = { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index addaddb711d3c..0f85b9495caf2 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.DataOutputBuffer import 
org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication} +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.Records @@ -45,6 +46,7 @@ private[spark] class Client( val yarnClient = YarnClient.createYarnClient val yarnConf = new YarnConfiguration(hadoopConf) + var appContext: ApplicationSubmissionContext = null def stop(): Unit = yarnClient.stop() @@ -55,13 +57,9 @@ private[spark] class Client( * ------------------------------------------------------------------------------------- */ /** - * Submit an application running our ApplicationMaster to the ResourceManager. - * - * The stable Yarn API provides a convenience method (YarnClient#createApplication) for - * creating applications and setting up the application submission context. This was not - * available in the alpha API. + * Create an application running our ApplicationMaster to the ResourceManager. */ - override def submitApplication(): ApplicationId = { + override def createApplication(): ApplicationId = { yarnClient.init(yarnConf) yarnClient.start() @@ -75,15 +73,29 @@ private[spark] class Client( // Verify whether the cluster has enough resources for our AM verifyClusterResources(newAppResponse) - + // Set up the appropriate contexts to launch our AM val containerContext = createContainerLaunchContext(newAppResponse) - val appContext = createApplicationSubmissionContext(newApp, containerContext) - + appContext = createApplicationSubmissionContext(newApp, containerContext) + appId + } + + /** + * Submit an application running our ApplicationMaster to the ResourceManager. + * + * The stable Yarn API provides a convenience method (YarnClient#createApplication) for + * creating applications and setting up the application submission context. This was not + * available in the alpha API. + */ + override def submitApplication() = { + if (appContext == null) { + throw new IllegalArgumentException("AppContext is null. " + + "The method createApplication should be called in advance.") + } + // Finally, submit and monitor the application - logInfo(s"Submitting application ${appId.getId} to ResourceManager") + logInfo(s"Submitting application ${appContext.getApplicationId().getId} to ResourceManager") yarnClient.submitApplication(appContext) - appId } /** diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 5f0c67f05c9dd..9cc7769b0fb92 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -517,7 +517,9 @@ private[spark] trait ClientBase extends Logging { * throw an appropriate SparkException. 
*/ def run(): Unit = { - val (yarnApplicationState, finalApplicationStatus) = monitorApplication(submitApplication()) + val appId = createApplication() + submitApplication() + val (yarnApplicationState, finalApplicationStatus) = monitorApplication(appId) if (yarnApplicationState == YarnApplicationState.FAILED || finalApplicationStatus == FinalApplicationStatus.FAILED) { throw new SparkException("Application finished with failed status") @@ -535,8 +537,11 @@ private[spark] trait ClientBase extends Logging { | Methods that cannot be implemented here due to API differences across hadoop versions | * --------------------------------------------------------------------------------------- */ + /** Create an application running our ApplicationMaster to the ResourceManager. */ + def createApplication(): ApplicationId + /** Submit an application running our ApplicationMaster to the ResourceManager. */ - def submitApplication(): ApplicationId + def submitApplication(): Unit /** Set up security tokens for launching our ApplicationMaster container. */ protected def setupSecurityToken(containerContext: ContainerLaunchContext): Unit diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 09597bd0e6ab9..fede285461af8 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -34,13 +34,12 @@ private[spark] class YarnClientSchedulerBackend( private var client: Client = null private var appId: ApplicationId = null @volatile private var stopping: Boolean = false - + /** - * Create a Yarn client to submit an application to the ResourceManager. - * This waits until the application is running. + * Create a Yarn client to create an application to the ResourceManager. */ - override def start() { - super.start() + override def initialize() { + super.initialize() val driverHost = conf.get("spark.driver.host") val driverPort = conf.get("spark.driver.port") val hostport = driverHost + ":" + driverPort @@ -54,7 +53,17 @@ private[spark] class YarnClientSchedulerBackend( val args = new ClientArguments(argsArrayBuf.toArray, conf) totalExpectedExecutors = args.numExecutors client = new Client(args, conf) - appId = client.submitApplication() + appId = client.createApplication() + } + + /** + * Submit the application created by Yarn client to the ResourceManager. + * This waits until the application is running. + */ + override def start() { + super.start() + + client.submitApplication() waitForApplication() asyncMonitorApplication() } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala index 17b79ae1d82c4..9f4ace31dda19 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala @@ -248,7 +248,8 @@ class ClientBaseSuite extends FunSuite with Matchers { val sparkConf: SparkConf, val yarnConf: YarnConfiguration) extends ClientBase { override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = ??? - override def submitApplication(): ApplicationId = ??? + override def createApplication(): ApplicationId = ??? + override def submitApplication(): Unit = ??? override def getApplicationReport(appId: ApplicationId): ApplicationReport = ??? 
override def getClientToken(report: ApplicationReport): String = ??? } From 722bd65c425b63c64c2b5f6e736842e66258f27d Mon Sep 17 00:00:00 2001 From: yantangzhai Date: Wed, 31 Dec 2014 10:56:18 +0800 Subject: [PATCH 02/13] [SPARK-4692] [SQL] Support ! boolean logic operator like NOT --- .../src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 1 + .../apache/spark/sql/hive/execution/HiveQuerySuite.scala | 8 ++++++++ 2 files changed, 9 insertions(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 3f3d9e7cd4fbe..9ac6915768fd1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -1077,6 +1077,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C case Token(AND(), left :: right:: Nil) => And(nodeToExpr(left), nodeToExpr(right)) case Token(OR(), left :: right:: Nil) => Or(nodeToExpr(left), nodeToExpr(right)) case Token(NOT(), child :: Nil) => Not(nodeToExpr(child)) + case Token("!", child :: Nil) => Not(nodeToExpr(child)) /* Case statements */ case Token("TOK_FUNCTION", Token(WHEN(), Nil) :: branches) => diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 4d81acc753a27..47f903a04cb4c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -56,6 +56,14 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { Locale.setDefault(originalLocale) } + createQueryTest("! 
operator", + """ + |SELECT a FROM ( + | SELECT 1 AS a FROM src LIMIT 1 UNION ALL + | SELECT 2 AS a FROM src LIMIT 1) table + |WHERE !(a>1) + """.stripMargin) + createQueryTest("constant object inspector for generic udf", """SELECT named_struct( lower("AA"), "10", From 18d4f336703c6d611a8bd30f589de9f424d1f71b Mon Sep 17 00:00:00 2001 From: yantangzhai Date: Wed, 31 Dec 2014 11:02:08 +0800 Subject: [PATCH 03/13] revert --- .../src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 1 - .../apache/spark/sql/hive/execution/HiveQuerySuite.scala | 8 -------- 2 files changed, 9 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 9ac6915768fd1..3f3d9e7cd4fbe 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -1077,7 +1077,6 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C case Token(AND(), left :: right:: Nil) => And(nodeToExpr(left), nodeToExpr(right)) case Token(OR(), left :: right:: Nil) => Or(nodeToExpr(left), nodeToExpr(right)) case Token(NOT(), child :: Nil) => Not(nodeToExpr(child)) - case Token("!", child :: Nil) => Not(nodeToExpr(child)) /* Case statements */ case Token("TOK_FUNCTION", Token(WHEN(), Nil) :: branches) => diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 47f903a04cb4c..4d81acc753a27 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -56,14 +56,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { Locale.setDefault(originalLocale) } - createQueryTest("! 
operator", - """ - |SELECT a FROM ( - | SELECT 1 AS a FROM src LIMIT 1 UNION ALL - | SELECT 2 AS a FROM src LIMIT 1) table - |WHERE !(a>1) - """.stripMargin) - createQueryTest("constant object inspector for generic udf", """SELECT named_struct( lower("AA"), "10", From bf18f5f67937348ffc55370b0bf0bc6a62adb063 Mon Sep 17 00:00:00 2001 From: yantangzhai Date: Mon, 12 Jan 2015 20:16:03 +0800 Subject: [PATCH 04/13] [SPARK-4962] [CORE] Put TaskScheduler.start back in SparkContext to shorten cluster resources occupation period --- .../scala/org/apache/spark/SparkContext.scala | 5 ++++- .../spark/scheduler/DAGSchedulerSuite.scala | 4 ++-- docs/configuration.md | 9 +++++++++ .../org/apache/spark/deploy/yarn/Client.scala | 18 ++++++++++++------ .../apache/spark/deploy/yarn/ClientBase.scala | 13 +++++++++++-- 5 files changed, 38 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 29a6c779a0fe1..c84ec916e64ed 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -333,7 +333,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage)) } - if (conf.getBoolean("spark.scheduler.app.slowstart", false) && master == "yarn-client") { + private val appSlowStart = conf.getBoolean("spark.scheduler.app.slowstart", false) + if (appSlowStart) { + assert(master.contains("yarn"), + "Slow start of application is currently only supported in YARN mode") logInfo("TaskScheduler will start later.") } else { // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 1d8506bafcec1..277076bef07ab 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -78,8 +78,8 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F val taskScheduler = new TaskScheduler() { override def rootPool: Pool = null override def schedulingMode: SchedulingMode = SchedulingMode.NONE - override def start() = { started.compareAndSet(false, true) } - override def stop() = { started.compareAndSet(true, false) } + override def start() = { started.set(true) } + override def stop() = { started.set(false) } override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)], blockManagerId: BlockManagerId): Boolean = true override def submitTasks(taskSet: TaskSet) = { diff --git a/docs/configuration.md b/docs/configuration.md index 2cc013c47fdbb..b13a14e6be3c7 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1041,6 +1041,15 @@ Apart from these, the following properties are also available, and may be useful shipping a whole partition of data to the driver. + + spark.scheduler.app.slowstart + false + + Whether to make application slow start, which postpones TaskScheduler's requesting resources + to cluster master until DAGScheduler submits job. Note that this is currently only available + on YARN mode. 
+ + #### Dynamic allocation diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 0f85b9495caf2..1bfa096707b47 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -58,6 +58,12 @@ private[spark] class Client( /** * Create an application running our ApplicationMaster to the ResourceManager. + * This gets ApplicationId from the ResourceManager. However it doesn't submit the application + * submission context containing resources requests to the ResourceManager. + * + * The stable Yarn API provides a convenience method (YarnClient#createApplication) for + * creating applications and setting up the application submission context. This was not + * available in the alpha API. */ override def createApplication(): ApplicationId = { yarnClient.init(yarnConf) @@ -81,11 +87,10 @@ private[spark] class Client( } /** - * Submit an application running our ApplicationMaster to the ResourceManager. - * - * The stable Yarn API provides a convenience method (YarnClient#createApplication) for - * creating applications and setting up the application submission context. This was not - * available in the alpha API. + * Submit the application submission context containing resources requests + * to the ResourceManager. When the ResourceManager gets this submission message, + * it will schedule and grant resources for this application. + * This will actually trigger resources scheduling in the cluster. */ override def submitApplication() = { if (appContext == null) { @@ -94,7 +99,8 @@ private[spark] class Client( } // Finally, submit and monitor the application - logInfo(s"Submitting application ${appContext.getApplicationId().getId} to ResourceManager") + logInfo(s"Submitting application context of ${appContext.getApplicationId().getId} " + + "to ResourceManager") yarnClient.submitApplication(appContext) } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 9cc7769b0fb92..527972f2acb52 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -537,10 +537,19 @@ private[spark] trait ClientBase extends Logging { | Methods that cannot be implemented here due to API differences across hadoop versions | * --------------------------------------------------------------------------------------- */ - /** Create an application running our ApplicationMaster to the ResourceManager. */ + /** + * Create an application running our ApplicationMaster to the ResourceManager. + * This gets ApplicationId from the ResourceManager. However it doesn't submit the application + * submission context containing resources requests to the ResourceManager. + */ def createApplication(): ApplicationId - /** Submit an application running our ApplicationMaster to the ResourceManager. */ + /** + * Submit the application submission context containing resources requests + * to the ResourceManager. When the ResourceManager gets this submission message, + * it will schedule and grant resources for this application. + * This will actually trigger resources scheduling in the cluster. + */ def submitApplication(): Unit /** Set up security tokens for launching our ApplicationMaster container. 
*/ From 759fae1d256c42f051326c7a7b443a512d6c4837 Mon Sep 17 00:00:00 2001 From: yantangzhai Date: Mon, 12 Jan 2015 21:13:21 +0800 Subject: [PATCH 05/13] resolve conflict --- .../scala/org/apache/spark/SparkContext.scala | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index c84ec916e64ed..57bc3d4e4ae36 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -333,18 +333,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage)) } - private val appSlowStart = conf.getBoolean("spark.scheduler.app.slowstart", false) - if (appSlowStart) { - assert(master.contains("yarn"), - "Slow start of application is currently only supported in YARN mode") - logInfo("TaskScheduler will start later.") - } else { - // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's - // constructor - if (taskScheduler.started.compareAndSet(false, true)) { - taskScheduler.start() - } - } + // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's + // constructor + taskScheduler.start() val applicationId: String = taskScheduler.applicationId() conf.set("spark.app.id", applicationId) From 30aab15257e60be74c734b289f5d7c5b1da1e3fa Mon Sep 17 00:00:00 2001 From: YanTangZhai Date: Mon, 12 Jan 2015 21:27:21 +0800 Subject: [PATCH 06/13] resolve conflict --- .../scala/org/apache/spark/SparkContext.scala | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index c84ec916e64ed..57bc3d4e4ae36 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -333,18 +333,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage)) } - private val appSlowStart = conf.getBoolean("spark.scheduler.app.slowstart", false) - if (appSlowStart) { - assert(master.contains("yarn"), - "Slow start of application is currently only supported in YARN mode") - logInfo("TaskScheduler will start later.") - } else { - // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's - // constructor - if (taskScheduler.started.compareAndSet(false, true)) { - taskScheduler.start() - } - } + // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's + // constructor + taskScheduler.start() val applicationId: String = taskScheduler.applicationId() conf.set("spark.app.id", applicationId) From 577614f40702d6c2aea0d57bf8b52b744e69def3 Mon Sep 17 00:00:00 2001 From: YanTangZhai Date: Mon, 12 Jan 2015 21:30:29 +0800 Subject: [PATCH 07/13] resolve conflict --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index faeeb597996fd..6f49ec4d8b77c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1349,9 +1349,7 @@ class DAGScheduler( def stop() { logInfo("Stopping DAGScheduler") dagSchedulerActorSupervisor ! PoisonPill - if (taskScheduler.started.compareAndSet(true, false)) { - taskScheduler.stop() - } + taskScheduler.stop() } } From 5550b1b919dd4744be5144e077231608b3725b16 Mon Sep 17 00:00:00 2001 From: YanTangZhai Date: Tue, 13 Jan 2015 10:29:25 +0800 Subject: [PATCH 08/13] resolve conflict --- .../org/apache/spark/deploy/yarn/Client.scala | 32 ++++--------------- 1 file changed, 7 insertions(+), 25 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 1bfa096707b47..addaddb711d3c 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -23,7 +23,6 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication} -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.Records @@ -46,7 +45,6 @@ private[spark] class Client( val yarnClient = YarnClient.createYarnClient val yarnConf = new YarnConfiguration(hadoopConf) - var appContext: ApplicationSubmissionContext = null def stop(): Unit = yarnClient.stop() @@ -57,15 +55,13 @@ private[spark] class Client( * ------------------------------------------------------------------------------------- */ /** - * Create an application running our ApplicationMaster to the ResourceManager. - * This gets ApplicationId from the ResourceManager. However it doesn't submit the application - * submission context containing resources requests to the ResourceManager. + * Submit an application running our ApplicationMaster to the ResourceManager. * * The stable Yarn API provides a convenience method (YarnClient#createApplication) for * creating applications and setting up the application submission context. This was not * available in the alpha API. */ - override def createApplication(): ApplicationId = { + override def submitApplication(): ApplicationId = { yarnClient.init(yarnConf) yarnClient.start() @@ -79,29 +75,15 @@ private[spark] class Client( // Verify whether the cluster has enough resources for our AM verifyClusterResources(newAppResponse) - + // Set up the appropriate contexts to launch our AM val containerContext = createContainerLaunchContext(newAppResponse) - appContext = createApplicationSubmissionContext(newApp, containerContext) - appId - } - - /** - * Submit the application submission context containing resources requests - * to the ResourceManager. When the ResourceManager gets this submission message, - * it will schedule and grant resources for this application. - * This will actually trigger resources scheduling in the cluster. - */ - override def submitApplication() = { - if (appContext == null) { - throw new IllegalArgumentException("AppContext is null. 
" + - "The method createApplication should be called in advance.") - } - + val appContext = createApplicationSubmissionContext(newApp, containerContext) + // Finally, submit and monitor the application - logInfo(s"Submitting application context of ${appContext.getApplicationId().getId} " + - "to ResourceManager") + logInfo(s"Submitting application ${appId.getId} to ResourceManager") yarnClient.submitApplication(appContext) + appId } /** From a3954f26302061ae074a4679fb044d37b1689ad0 Mon Sep 17 00:00:00 2001 From: YanTangZhai Date: Tue, 13 Jan 2015 10:31:29 +0800 Subject: [PATCH 09/13] resolve conflict --- .../apache/spark/deploy/yarn/ClientBase.scala | 20 +++---------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 527972f2acb52..5f0c67f05c9dd 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -517,9 +517,7 @@ private[spark] trait ClientBase extends Logging { * throw an appropriate SparkException. */ def run(): Unit = { - val appId = createApplication() - submitApplication() - val (yarnApplicationState, finalApplicationStatus) = monitorApplication(appId) + val (yarnApplicationState, finalApplicationStatus) = monitorApplication(submitApplication()) if (yarnApplicationState == YarnApplicationState.FAILED || finalApplicationStatus == FinalApplicationStatus.FAILED) { throw new SparkException("Application finished with failed status") @@ -537,20 +535,8 @@ private[spark] trait ClientBase extends Logging { | Methods that cannot be implemented here due to API differences across hadoop versions | * --------------------------------------------------------------------------------------- */ - /** - * Create an application running our ApplicationMaster to the ResourceManager. - * This gets ApplicationId from the ResourceManager. However it doesn't submit the application - * submission context containing resources requests to the ResourceManager. - */ - def createApplication(): ApplicationId - - /** - * Submit the application submission context containing resources requests - * to the ResourceManager. When the ResourceManager gets this submission message, - * it will schedule and grant resources for this application. - * This will actually trigger resources scheduling in the cluster. - */ - def submitApplication(): Unit + /** Submit an application running our ApplicationMaster to the ResourceManager. */ + def submitApplication(): ApplicationId /** Set up security tokens for launching our ApplicationMaster container. 
*/ protected def setupSecurityToken(containerContext: ContainerLaunchContext): Unit From 3875b1b0a3120af156c7acf4f103d3edd94b2cb8 Mon Sep 17 00:00:00 2001 From: YanTangZhai Date: Tue, 13 Jan 2015 10:33:36 +0800 Subject: [PATCH 10/13] resolve conflict --- .../scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala index 9f4ace31dda19..17b79ae1d82c4 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala @@ -248,8 +248,7 @@ class ClientBaseSuite extends FunSuite with Matchers { val sparkConf: SparkConf, val yarnConf: YarnConfiguration) extends ClientBase { override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = ??? - override def createApplication(): ApplicationId = ??? - override def submitApplication(): Unit = ??? + override def submitApplication(): ApplicationId = ??? override def getApplicationReport(appId: ApplicationId): ApplicationReport = ??? override def getClientToken(report: ApplicationReport): String = ??? } From 42e02a7786ab31f4fbd945e481144c89abe53527 Mon Sep 17 00:00:00 2001 From: yantangzhai Date: Tue, 13 Jan 2015 12:55:00 +0800 Subject: [PATCH 11/13] [SPARK-4962] [CORE] Put TaskScheduler.start back in SparkContext to shorten cluster resources occupation period --- .../scala/org/apache/spark/SparkContext.scala | 15 ++++++++--- .../apache/spark/scheduler/DAGScheduler.scala | 4 ++- .../org/apache/spark/deploy/yarn/Client.scala | 27 +++++++++++++++---- 3 files changed, 37 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ff5d796ee2766..7ea0d58aa3621 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -338,9 +338,18 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } } - // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's - // constructor - taskScheduler.start() + private val appSlowStart = conf.getBoolean("spark.scheduler.app.slowstart", false) + if (appSlowStart) { + assert(master.contains("yarn"), + "Slow start of application is currently only supported in YARN mode") + logInfo("TaskScheduler will start later.") + } else { + // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's + // constructor + if (taskScheduler.started.compareAndSet(false, true)) { + taskScheduler.start() + } + } val applicationId: String = taskScheduler.applicationId() conf.set("spark.app.id", applicationId) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index ec36bc49524bd..11424b54ea47d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1330,7 +1330,9 @@ class DAGScheduler( def stop() { logInfo("Stopping DAGScheduler") dagSchedulerActorSupervisor ! 
PoisonPill - taskScheduler.stop() + if (taskScheduler.started.compareAndSet(true, false)) { + taskScheduler.stop() + } } } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 032106371cd60..8deb66c5a966b 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -66,6 +66,7 @@ private[spark] class Client( private val executorMemoryOverhead = args.executorMemoryOverhead // MB private val distCacheMgr = new ClientDistributedCacheManager() private val isClusterMode = args.isClusterMode + private var appContext: ApplicationSubmissionContext = null def stop(): Unit = yarnClient.stop() @@ -77,13 +78,15 @@ private[spark] class Client( * ------------------------------------------------------------------------------------- */ /** - * Submit an application running our ApplicationMaster to the ResourceManager. + * Create an application running our ApplicationMaster to the ResourceManager. + * This gets ApplicationId from the ResourceManager. However it doesn't submit the application + * submission context containing resources requests to the ResourceManager. * * The stable Yarn API provides a convenience method (YarnClient#createApplication) for * creating applications and setting up the application submission context. This was not * available in the alpha API. */ - def submitApplication(): ApplicationId = { + def createApplication(): ApplicationId = { yarnClient.init(yarnConf) yarnClient.start() @@ -100,12 +103,26 @@ private[spark] class Client( // Set up the appropriate contexts to launch our AM val containerContext = createContainerLaunchContext(newAppResponse) - val appContext = createApplicationSubmissionContext(newApp, containerContext) + appContext = createApplicationSubmissionContext(newApp, containerContext) + appId + } + /** + * Submit an application running our ApplicationMaster to the ResourceManager. + * + * The stable Yarn API provides a convenience method (YarnClient#createApplication) for + * creating applications and setting up the application submission context. This was not + * available in the alpha API. + * Submit the application submission context containing resources requests + * to the ResourceManager. When the ResourceManager gets this submission message, + * it will schedule and grant resources for this application. + * This will actually trigger resources scheduling in the cluster. 
+ */ + def submitApplication() = { // Finally, submit and monitor the application - logInfo(s"Submitting application ${appId.getId} to ResourceManager") + logInfo(s"Submitting application context of ${appContext.getApplicationId().getId} " + + "to ResourceManager") yarnClient.submitApplication(appContext) - appId } /** From 74094afa1bafbc55540f334c3d37fcb3a2570937 Mon Sep 17 00:00:00 2001 From: yantangzhai Date: Tue, 13 Jan 2015 12:59:08 +0800 Subject: [PATCH 12/13] [SPARK-4962] [CORE] Put TaskScheduler.start back in SparkContext to shorten cluster resources occupation period --- .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 5 ----- 1 file changed, 5 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 8deb66c5a966b..e84fdb78d3a7a 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -108,11 +108,6 @@ private[spark] class Client( } /** - * Submit an application running our ApplicationMaster to the ResourceManager. - * - * The stable Yarn API provides a convenience method (YarnClient#createApplication) for - * creating applications and setting up the application submission context. This was not - * available in the alpha API. * Submit the application submission context containing resources requests * to the ResourceManager. When the ResourceManager gets this submission message, * it will schedule and grant resources for this application. From 575d57df46b8814355eacefd7e407f48ffb20dd6 Mon Sep 17 00:00:00 2001 From: yantangzhai Date: Tue, 13 Jan 2015 14:07:09 +0800 Subject: [PATCH 13/13] fix a bug --- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index e84fdb78d3a7a..2577747639c6d 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -608,7 +608,9 @@ private[spark] class Client( * throw an appropriate SparkException. */ def run(): Unit = { - val (yarnApplicationState, finalApplicationStatus) = monitorApplication(submitApplication()) + val appId = createApplication() + submitApplication() + val (yarnApplicationState, finalApplicationStatus) = monitorApplication(appId) if (yarnApplicationState == YarnApplicationState.FAILED || finalApplicationStatus == FinalApplicationStatus.FAILED) { throw new SparkException("Application finished with failed status")
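
Editor's note — an illustrative usage sketch, not part of the patch series above. It shows how an application might opt into the slow-start behavior these patches propose, assuming the `spark.scheduler.app.slowstart` flag and yarn-client-only restriction land as documented in patch 04 (the flag name, default of `false`, and deferred `taskScheduler.start()` are taken from the diffs; everything else here is hypothetical demo code).

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch: with the proposed flag enabled, SparkContext skips
    // taskScheduler.start() at construction time; the DAGScheduler starts it
    // (and, in yarn-client mode, submits the already-created YARN application)
    // only when the first job is submitted, so cluster resources are held for
    // a shorter period.
    object SlowStartExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("yarn-client")
          .setAppName("slow-start-demo")
          .set("spark.scheduler.app.slowstart", "true") // proposed flag; defaults to false
        val sc = new SparkContext(conf)
        // Resource requests are sent to the ResourceManager only now,
        // when the first action triggers job submission.
        val sum = sc.parallelize(1 to 100).reduce(_ + _)
        println(s"sum = $sum")
        sc.stop()
      }
    }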