Spark 4962 #25

Open: wants to merge 25 commits into master

Commits (25)
- cdef539  Merge pull request #1 from apache/master (YanTangZhai, Aug 6, 2014)
- cbcba66  Merge pull request #3 from apache/master (YanTangZhai, Aug 20, 2014)
- 8a00106  Merge pull request #6 from apache/master (YanTangZhai, Sep 12, 2014)
- 03b62b0  Merge pull request #7 from apache/master (YanTangZhai, Sep 16, 2014)
- 76d4027  Merge pull request #8 from apache/master (YanTangZhai, Oct 20, 2014)
- d26d982  Merge pull request #9 from apache/master (YanTangZhai, Nov 4, 2014)
- e249846  Merge pull request #10 from apache/master (YanTangZhai, Nov 11, 2014)
- 6e643f8  Merge pull request #11 from apache/master (YanTangZhai, Dec 1, 2014)
- 718afeb  Merge pull request #12 from apache/master (YanTangZhai, Dec 5, 2014)
- e4c2c0a  Merge pull request #15 from apache/master (YanTangZhai, Dec 24, 2014)
- 05469de  [SPARK-4962] [CORE] Put TaskScheduler.start back in SparkContext to s… (YanTangZhai, Dec 26, 2014)
- 722bd65  [SPARK-4692] [SQL] Support ! boolean logic operator like NOT (YanTangZhai, Dec 31, 2014)
- 18d4f33  revert (YanTangZhai, Dec 31, 2014)
- bf18f5f  [SPARK-4962] [CORE] Put TaskScheduler.start back in SparkContext to s… (YanTangZhai, Jan 12, 2015)
- 759fae1  resolve conflict (YanTangZhai, Jan 12, 2015)
- 30aab15  resolve conflict (YanTangZhai, Jan 12, 2015)
- 577614f  resolve conflict (YanTangZhai, Jan 12, 2015)
- 5550b1b  resolve conflict (YanTangZhai, Jan 13, 2015)
- a3954f2  resolve conflict (YanTangZhai, Jan 13, 2015)
- 3875b1b  resolve conflict (YanTangZhai, Jan 13, 2015)
- a0153e4  Merge pull request #26 from apache/master (YanTangZhai, Jan 13, 2015)
- a5d6176  update (YanTangZhai, Jan 13, 2015)
- 42e02a7  [SPARK-4962] [CORE] Put TaskScheduler.start back in SparkContext to s… (YanTangZhai, Jan 13, 2015)
- 74094af  [SPARK-4962] [CORE] Put TaskScheduler.start back in SparkContext to s… (YanTangZhai, Jan 13, 2015)
- 575d57d  fix a bug (YanTangZhai, Jan 13, 2015)

15 changes: 12 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -338,9 +338,18 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
   }
 }

-  // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
-  // constructor
-  taskScheduler.start()
+  private val appSlowStart = conf.getBoolean("spark.scheduler.app.slowstart", false)
+  if (appSlowStart) {
+    assert(master.contains("yarn"),
+      "Slow start of application is currently only supported in YARN mode")
+    logInfo("TaskScheduler will start later.")
+  } else {
+    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
+    // constructor
+    if (taskScheduler.started.compareAndSet(false, true)) {
+      taskScheduler.start()
+    }
+  }

val applicationId: String = taskScheduler.applicationId()
conf.set("spark.app.id", applicationId)
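
The `compareAndSet` guard above is the core mechanism of this PR: `started` is an `AtomicBoolean` added to the `TaskScheduler` trait (see the TaskScheduler.scala diff below), so `start()` runs at most once even though both `SparkContext` and `DAGScheduler` may now race to call it. A minimal, self-contained sketch of the pattern (illustrative class and method names, not the actual Spark code):

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Start-once/stop-once guard, as used in this PR. compareAndSet flips the
// flag atomically, so only one of any racing callers performs the action.
class GuardedScheduler {
  val started = new AtomicBoolean(false)

  def startOnce(): Unit =
    if (started.compareAndSet(false, true)) {
      println("scheduler started")   // the real code calls taskScheduler.start()
    }

  def stopIfStarted(): Unit =
    if (started.compareAndSet(true, false)) {
      println("scheduler stopped")   // the real code calls taskScheduler.stop()
    }
}

object GuardDemo extends App {
  val s = new GuardedScheduler
  s.startOnce()      // starts
  s.startOnce()      // no-op: already started
  s.stopIfStarted()  // stops
  s.stopIfStarted()  // no-op: never double-stops
}
```

The stop side of the guard also means that a scheduler which was never started, because slow start was enabled and no job was ever submitted, is never stopped.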
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

@@ -760,6 +760,9 @@ class DAGScheduler(
     val stageIds = jobIdToStageIds(jobId).toArray
     val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
     listenerBus.post(SparkListenerJobStart(job.jobId, stageInfos, properties))
+    if (taskScheduler.started.compareAndSet(false, true)) {
+      taskScheduler.start()
+    }
     submitStage(finalStage)
   }
 }
@@ -1327,7 +1330,9 @@
   def stop() {
     logInfo("Stopping DAGScheduler")
     dagSchedulerActorSupervisor ! PoisonPill
-    taskScheduler.stop()
+    if (taskScheduler.started.compareAndSet(true, false)) {
+      taskScheduler.stop()
+    }
   }
 }

core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -25,6 +25,7 @@ package org.apache.spark.scheduler
 private[spark] trait SchedulerBackend {
   private val appId = "spark-application-" + System.currentTimeMillis

+  def initialize() = {}
   def start(): Unit
   def stop(): Unit
   def reviveOffers(): Unit
core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -17,6 +17,8 @@

 package org.apache.spark.scheduler

+import java.util.concurrent.atomic.AtomicBoolean
+
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId

@@ -73,5 +75,8 @@ private[spark] trait TaskScheduler {
    * @return An application ID
    */
   def applicationId(): String = appId

+  // Whether this TaskScheduler has been started
+  val started = new AtomicBoolean(false)

 }
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -122,6 +122,7 @@ private[spark] class TaskSchedulerImpl(

   def initialize(backend: SchedulerBackend) {
     this.backend = backend
+    this.backend.initialize()
     // temporarily set rootPool name to empty
     rootPool = new Pool("", schedulingMode, 0, 0)
     schedulableBuilder = {
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -78,8 +78,8 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   val taskScheduler = new TaskScheduler() {
     override def rootPool: Pool = null
     override def schedulingMode: SchedulingMode = SchedulingMode.NONE
-    override def start() = {}
-    override def stop() = {}
+    override def start() = { started.set(true) }
+    override def stop() = { started.set(false) }
     override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
         blockManagerId: BlockManagerId): Boolean = true
     override def submitTasks(taskSet: TaskSet) = {
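
With the mock scheduler now tracking `started`, the suite could assert the deferred lifecycle directly. A hypothetical test sketch, not part of this diff, assuming the suite's existing `makeRdd` and `submit` helpers:

```scala
// Hypothetical addition to DAGSchedulerSuite (not in this PR's diff):
// the mock TaskScheduler flips `started`, so a test can check that the
// scheduler only starts once a job is actually submitted.
test("taskScheduler starts on first job submission under slow start") {
  assert(!taskScheduler.started.get())   // nothing started yet
  submit(makeRdd(1, Nil), Array(0))      // first job reaches handleJobSubmitted
  assert(taskScheduler.started.get())    // guard flipped, start() invoked once
}
```
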
9 changes: 9 additions & 0 deletions docs/configuration.md
@@ -1053,6 +1053,15 @@ Apart from these, the following properties are also available, and may be useful
     shipping a whole partition of data to the driver.
   </td>
 </tr>
+<tr>
+  <td><code>spark.scheduler.app.slowstart</code></td>
+  <td>false</td>
+  <td>
+    Whether to enable slow application start, which postpones the TaskScheduler's resource
+    requests to the cluster master until the DAGScheduler submits a job. Note that this is
+    currently only available in YARN mode.
+  </td>
+</tr>
 </table>

 #### Dynamic allocation
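
For illustration, enabling the proposed flag from application code might look like the sketch below; this assumes the patch is applied and uses the Spark 1.x `yarn-client` master string:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical usage of the proposed flag (requires this patch).
// With slow start on, constructing the SparkContext does NOT request
// resources from YARN; the first job submission does.
object SlowStartDemo extends App {
  val conf = new SparkConf()
    .setMaster("yarn-client")
    .setAppName("SlowStartDemo")
    .set("spark.scheduler.app.slowstart", "true")

  val sc = new SparkContext(conf)              // no YARN submission yet
  val sum = sc.parallelize(1 to 100)
    .reduce(_ + _)                             // first job: TaskScheduler starts,
                                               // application is submitted to the RM
  println(sum)
  sc.stop()
}
```
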
26 changes: 20 additions & 6 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -66,6 +66,7 @@ private[spark] class Client(
   private val executorMemoryOverhead = args.executorMemoryOverhead // MB
   private val distCacheMgr = new ClientDistributedCacheManager()
   private val isClusterMode = args.isClusterMode
+  private var appContext: ApplicationSubmissionContext = null

   def stop(): Unit = yarnClient.stop()
@@ -77,13 +78,15 @@
    * ------------------------------------------------------------------------------------- */

   /**
-   * Submit an application running our ApplicationMaster to the ResourceManager.
+   * Create an application running our ApplicationMaster in the ResourceManager.
+   * This obtains an ApplicationId from the ResourceManager, but does not yet submit the
+   * application submission context containing the resource requests.
    *
    * The stable Yarn API provides a convenience method (YarnClient#createApplication) for
    * creating applications and setting up the application submission context. This was not
    * available in the alpha API.
    */
-  def submitApplication(): ApplicationId = {
+  def createApplication(): ApplicationId = {
     yarnClient.init(yarnConf)
     yarnClient.start()

@@ -100,12 +103,21 @@

     // Set up the appropriate contexts to launch our AM
     val containerContext = createContainerLaunchContext(newAppResponse)
-    val appContext = createApplicationSubmissionContext(newApp, containerContext)
+    appContext = createApplicationSubmissionContext(newApp, containerContext)
+    appId
+  }

+  /**
+   * Submit the application submission context containing the resource requests to the
+   * ResourceManager. When the ResourceManager receives this submission, it schedules and
+   * grants resources for the application; this is what actually triggers resource
+   * scheduling in the cluster.
+   */
+  def submitApplication() = {
     // Finally, submit and monitor the application
-    logInfo(s"Submitting application ${appId.getId} to ResourceManager")
+    logInfo(s"Submitting application context of ${appContext.getApplicationId().getId} " +
+      "to ResourceManager")
     yarnClient.submitApplication(appContext)
-    appId
   }

@@ -596,7 +608,9 @@
    * throw an appropriate SparkException.
    */
   def run(): Unit = {
-    val (yarnApplicationState, finalApplicationStatus) = monitorApplication(submitApplication())
+    val appId = createApplication()
+    submitApplication()
+    val (yarnApplicationState, finalApplicationStatus) = monitorApplication(appId)
     if (yarnApplicationState == YarnApplicationState.FAILED ||
         finalApplicationStatus == FinalApplicationStatus.FAILED) {
       throw new SparkException("Application finished with failed status")
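
The net effect of these `Client` changes is a two-phase protocol: `createApplication()` obtains an ApplicationId and builds the submission context, while `submitApplication()` is the point at which the ResourceManager actually begins granting resources. A self-contained toy model of the split (stand-in class, not the real YARN client):

```scala
// Toy model of the createApplication()/submitApplication() split.
// Phase 1 is cheap and holds no cluster resources; phase 2 is the
// actual request to the (simulated) ResourceManager.
class TwoPhaseClient {
  private var appContext: Option[String] = None

  def createApplication(): Long = {
    val appId = System.currentTimeMillis         // stand-in for a real ApplicationId
    appContext = Some(s"submission-context-$appId")
    appId                                        // nothing sent to the RM yet
  }

  def submitApplication(): Unit = {
    val ctx = appContext.getOrElse(sys.error("createApplication() must run first"))
    println(s"RM now schedules resources for $ctx")
  }
}

object TwoPhaseDemo extends App {
  val client = new TwoPhaseClient
  val appId  = client.createApplication()  // during backend initialization
  // ... SparkContext can finish constructing here without holding resources ...
  client.submitApplication()               // deferred until start(), i.e. the first job
}
```
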
yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala

@@ -34,13 +34,12 @@ private[spark] class YarnClientSchedulerBackend(
   private var client: Client = null
   private var appId: ApplicationId = null
   @volatile private var stopping: Boolean = false

   /**
-   * Create a Yarn client to submit an application to the ResourceManager.
-   * This waits until the application is running.
+   * Create a Yarn client and create an application in the ResourceManager.
    */
-  override def start() {
-    super.start()
+  override def initialize() {
+    super.initialize()
     val driverHost = conf.get("spark.driver.host")
     val driverPort = conf.get("spark.driver.port")
     val hostport = driverHost + ":" + driverPort
@@ -54,7 +53,17 @@
     val args = new ClientArguments(argsArrayBuf.toArray, conf)
     totalExpectedExecutors = args.numExecutors
     client = new Client(args, conf)
-    appId = client.submitApplication()
+    appId = client.createApplication()
+  }

+  /**
+   * Submit the application created by the Yarn client to the ResourceManager.
+   * This waits until the application is running.
+   */
+  override def start() {
+    super.start()

+    client.submitApplication()
     waitForApplication()
     asyncMonitorApplication()
   }
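
On the backend side, the new `SchedulerBackend.initialize()` hook (a default no-op, invoked from `TaskSchedulerImpl.initialize`) carries the eager setup, while `start()` keeps only the work that must wait for the first job. A sketch of the resulting call order under slow start (illustrative trait and class names, not the Spark API):

```scala
// Illustrative model of the initialize()/start() split on SchedulerBackend.
// initialize() runs while the SparkContext is being constructed; start()
// runs only when the TaskScheduler starts, which under slow start is
// deferred to the first job submission.
trait Backend {
  def initialize(): Unit = {}   // default no-op, matching the diff above
  def start(): Unit
}

class YarnLikeBackend extends Backend {
  override def initialize(): Unit =
    println("phase 1: create YARN application, build submission context")
  override def start(): Unit =
    println("phase 2: submit context; ResourceManager grants resources")
}

object BackendDemo extends App {
  val backend = new YarnLikeBackend
  backend.initialize()  // during SparkContext construction
  // ... first job arrives ...
  backend.start()       // resources requested only now
}
```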