Skip to content

Commit

Permalink
rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
WangTaoTheTonic committed Jan 12, 2015
2 parents 67bcb46 + 2130de9 commit bc6e1ec
Show file tree
Hide file tree
Showing 87 changed files with 1,616 additions and 875 deletions.
5 changes: 4 additions & 1 deletion bin/spark-submit
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ while (($#)); do
shift
done

DEFAULT_PROPERTIES_FILE="$SPARK_HOME/conf/spark-defaults.conf"
if [ -z "$SPARK_CONF_DIR" ]; then
export SPARK_CONF_DIR="$SPARK_HOME/conf"
fi
DEFAULT_PROPERTIES_FILE="$SPARK_CONF_DIR/spark-defaults.conf"
if [ "$MASTER" == "yarn-cluster" ]; then
SPARK_SUBMIT_DEPLOY_MODE=cluster
fi
Expand Down
6 changes: 5 additions & 1 deletion bin/spark-submit2.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ set ORIG_ARGS=%*

rem Reset the values of all variables used
set SPARK_SUBMIT_DEPLOY_MODE=client
set SPARK_SUBMIT_PROPERTIES_FILE=%SPARK_HOME%\conf\spark-defaults.conf

if not defined %SPARK_CONF_DIR% (
set SPARK_CONF_DIR=%SPARK_HOME%\conf
)
set SPARK_SUBMIT_PROPERTIES_FILE=%SPARK_CONF_DIR%\spark-defaults.conf
set SPARK_SUBMIT_DRIVER_MEMORY=
set SPARK_SUBMIT_LIBRARY_PATH=
set SPARK_SUBMIT_CLASSPATH=
Expand Down
117 changes: 77 additions & 40 deletions core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ private[spark] class ExecutorAllocationManager(
listenerBus: LiveListenerBus,
conf: SparkConf)
extends Logging {

allocationManager =>

import ExecutorAllocationManager._

// Lower and upper bounds on the number of executors. These are required.
Expand Down Expand Up @@ -121,7 +124,7 @@ private[spark] class ExecutorAllocationManager(
private var clock: Clock = new RealClock

// Listener for Spark events that impact the allocation policy
private val listener = new ExecutorAllocationListener(this)
private val listener = new ExecutorAllocationListener

/**
* Verify that the settings specified through the config are valid.
Expand Down Expand Up @@ -209,11 +212,12 @@ private[spark] class ExecutorAllocationManager(
addTime += sustainedSchedulerBacklogTimeout * 1000
}

removeTimes.foreach { case (executorId, expireTime) =>
if (now >= expireTime) {
removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
if (expired) {
removeExecutor(executorId)
removeTimes.remove(executorId)
}
!expired
}
}

Expand Down Expand Up @@ -291,7 +295,7 @@ private[spark] class ExecutorAllocationManager(
// Do not kill the executor if we have already reached the lower bound
val numExistingExecutors = executorIds.size - executorsPendingToRemove.size
if (numExistingExecutors - 1 < minNumExecutors) {
logInfo(s"Not removing idle executor $executorId because there are only " +
logDebug(s"Not removing idle executor $executorId because there are only " +
s"$numExistingExecutors executor(s) left (limit $minNumExecutors)")
return false
}
Expand All @@ -315,7 +319,11 @@ private[spark] class ExecutorAllocationManager(
private def onExecutorAdded(executorId: String): Unit = synchronized {
if (!executorIds.contains(executorId)) {
executorIds.add(executorId)
executorIds.foreach(onExecutorIdle)
// If an executor (call this executor X) is not removed because the lower bound
// has been reached, it will no longer be marked as idle. When new executors join,
// however, we are no longer at the lower bound, and so we must mark executor X
// as idle again so as not to forget that it is a candidate for removal. (see SPARK-4951)
executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle)
logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})")
if (numExecutorsPending > 0) {
numExecutorsPending -= 1
Expand Down Expand Up @@ -373,10 +381,14 @@ private[spark] class ExecutorAllocationManager(
* the executor is not already marked as idle.
*/
private def onExecutorIdle(executorId: String): Unit = synchronized {
if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
if (executorIds.contains(executorId)) {
if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
}
} else {
logWarning(s"Attempted to mark unknown executor $executorId idle")
}
}

Expand All @@ -396,25 +408,24 @@ private[spark] class ExecutorAllocationManager(
* and consistency of events returned by the listener. For simplicity, it does not account
* for speculated tasks.
*/
private class ExecutorAllocationListener(allocationManager: ExecutorAllocationManager)
extends SparkListener {
private class ExecutorAllocationListener extends SparkListener {

private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
synchronized {
val stageId = stageSubmitted.stageInfo.stageId
val numTasks = stageSubmitted.stageInfo.numTasks
val stageId = stageSubmitted.stageInfo.stageId
val numTasks = stageSubmitted.stageInfo.numTasks
allocationManager.synchronized {
stageIdToNumTasks(stageId) = numTasks
allocationManager.onSchedulerBacklogged()
}
}

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
synchronized {
val stageId = stageCompleted.stageInfo.stageId
val stageId = stageCompleted.stageInfo.stageId
allocationManager.synchronized {
stageIdToNumTasks -= stageId
stageIdToTaskIndices -= stageId

Expand All @@ -426,47 +437,62 @@ private[spark] class ExecutorAllocationManager(
}
}

override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
val stageId = taskStart.stageId
val taskId = taskStart.taskInfo.taskId
val taskIndex = taskStart.taskInfo.index
val executorId = taskStart.taskInfo.executorId

// If this is the last pending task, mark the scheduler queue as empty
stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
val numTasksScheduled = stageIdToTaskIndices(stageId).size
val numTasksTotal = stageIdToNumTasks.getOrElse(stageId, -1)
if (numTasksScheduled == numTasksTotal) {
// No more pending tasks for this stage
stageIdToNumTasks -= stageId
if (stageIdToNumTasks.isEmpty) {
allocationManager.onSchedulerQueueEmpty()
allocationManager.synchronized {
// This guards against the race condition in which the `SparkListenerTaskStart`
// event is posted before the `SparkListenerBlockManagerAdded` event, which is
// possible because these events are posted in different threads. (see SPARK-4951)
if (!allocationManager.executorIds.contains(executorId)) {
allocationManager.onExecutorAdded(executorId)
}

// If this is the last pending task, mark the scheduler queue as empty
stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
val numTasksScheduled = stageIdToTaskIndices(stageId).size
val numTasksTotal = stageIdToNumTasks.getOrElse(stageId, -1)
if (numTasksScheduled == numTasksTotal) {
// No more pending tasks for this stage
stageIdToNumTasks -= stageId
if (stageIdToNumTasks.isEmpty) {
allocationManager.onSchedulerQueueEmpty()
}
}
}

// Mark the executor on which this task is scheduled as busy
executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId
allocationManager.onExecutorBusy(executorId)
// Mark the executor on which this task is scheduled as busy
executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId
allocationManager.onExecutorBusy(executorId)
}
}

override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
val executorId = taskEnd.taskInfo.executorId
val taskId = taskEnd.taskInfo.taskId

// If the executor is no longer running scheduled any tasks, mark it as idle
if (executorIdToTaskIds.contains(executorId)) {
executorIdToTaskIds(executorId) -= taskId
if (executorIdToTaskIds(executorId).isEmpty) {
executorIdToTaskIds -= executorId
allocationManager.onExecutorIdle(executorId)
allocationManager.synchronized {
// If the executor is no longer running scheduled any tasks, mark it as idle
if (executorIdToTaskIds.contains(executorId)) {
executorIdToTaskIds(executorId) -= taskId
if (executorIdToTaskIds(executorId).isEmpty) {
executorIdToTaskIds -= executorId
allocationManager.onExecutorIdle(executorId)
}
}
}
}

override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
val executorId = blockManagerAdded.blockManagerId.executorId
if (executorId != SparkContext.DRIVER_IDENTIFIER) {
allocationManager.onExecutorAdded(executorId)
// This guards against the race condition in which the `SparkListenerTaskStart`
// event is posted before the `SparkListenerBlockManagerAdded` event, which is
// possible because these events are posted in different threads. (see SPARK-4951)
if (!allocationManager.executorIds.contains(executorId)) {
allocationManager.onExecutorAdded(executorId)
}
}
}

Expand All @@ -478,12 +504,23 @@ private[spark] class ExecutorAllocationManager(
/**
* An estimate of the total number of pending tasks remaining for currently running stages. Does
* not account for tasks which may have failed and been resubmitted.
*
* Note: This is not thread-safe without the caller owning the `allocationManager` lock.
*/
def totalPendingTasks(): Int = {
stageIdToNumTasks.map { case (stageId, numTasks) =>
numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
}.sum
}

/**
* Return true if an executor is not currently running a task, and false otherwise.
*
* Note: This is not thread-safe without the caller owning the `allocationManager` lock.
*/
def isExecutorIdle(executorId: String): Boolean = {
!executorIdToTaskIds.contains(executorId)
}
}

}
Expand Down
1 change: 0 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
Option(localProperties.get).map(_.getProperty(key)).getOrElse(null)

/** Set a human readable description of the current job. */
@deprecated("use setJobGroup", "0.8.1")
def setJobDescription(value: String) {
setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import org.apache.spark.annotation.DeveloperApi

/**
* Exception thrown when a task cannot be serialized.
*/
private[spark] class TaskNotSerializableException(error: Throwable) extends Exception(error)
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
| --queue QUEUE_NAME The YARN queue to submit to (Default: "default").
| --num-executors NUM Number of executors to launch (Default: 2).
| --archives ARCHIVES Comma separated list of archives to be extracted into the
| working directory of each executor.""".stripMargin
| working directory of each executor.
""".stripMargin
)
SparkSubmit.exitFn()
}
Expand Down
20 changes: 0 additions & 20 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -866,26 +866,6 @@ class DAGScheduler(
}

if (tasks.size > 0) {
// Preemptively serialize a task to make sure it can be serialized. We are catching this
// exception here because it would be fairly hard to catch the non-serializable exception
// down the road, where we have several different implementations for local scheduler and
// cluster schedulers.
//
// We've already serialized RDDs and closures in taskBinary, but here we check for all other
// objects such as Partition.
try {
closureSerializer.serialize(tasks.head)
} catch {
case e: NotSerializableException =>
abortStage(stage, "Task not serializable: " + e.toString)
runningStages -= stage
return
case NonFatal(e) => // Other exceptions, such as IllegalArgumentException from Kryo.
abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")
runningStages -= stage
return
}

logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
stage.pendingTasks ++= tasks
logDebug("New pending tasks: " + stage.pendingTasks)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import scala.util.Random
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.TaskLocality.TaskLocality
import org.apache.spark.util.Utils
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
Expand Down Expand Up @@ -209,6 +210,40 @@ private[spark] class TaskSchedulerImpl(
.format(manager.taskSet.id, manager.parent.name))
}

private def resourceOfferSingleTaskSet(
taskSet: TaskSetManager,
maxLocality: TaskLocality,
shuffledOffers: Seq[WorkerOffer],
availableCpus: Array[Int],
tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
var launchedTask = false
for (i <- 0 until shuffledOffers.size) {
val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
if (availableCpus(i) >= CPUS_PER_TASK) {
try {
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
tasks(i) += task
val tid = task.taskId
taskIdToTaskSetId(tid) = taskSet.taskSet.id
taskIdToExecutorId(tid) = execId
executorsByHost(host) += execId
availableCpus(i) -= CPUS_PER_TASK
assert(availableCpus(i) >= 0)
launchedTask = true
}
} catch {
case e: TaskNotSerializableException =>
logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
// Do not offer resources for this task, but don't throw an error to allow other
// task sets to be submitted.
return launchedTask
}
}
}
return launchedTask
}

/**
* Called by cluster manager to offer resources on slaves. We respond by asking our active task
* sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
Expand Down Expand Up @@ -251,23 +286,8 @@ private[spark] class TaskSchedulerImpl(
var launchedTask = false
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
do {
launchedTask = false
for (i <- 0 until shuffledOffers.size) {
val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
if (availableCpus(i) >= CPUS_PER_TASK) {
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
tasks(i) += task
val tid = task.taskId
taskIdToTaskSetId(tid) = taskSet.taskSet.id
taskIdToExecutorId(tid) = execId
executorsByHost(host) += execId
availableCpus(i) -= CPUS_PER_TASK
assert(availableCpus(i) >= 0)
launchedTask = true
}
}
}
launchedTask = resourceOfferSingleTaskSet(
taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
} while (launchedTask)
}

Expand Down
Loading

0 comments on commit bc6e1ec

Please sign in to comment.