Shuffle-biased task scheduling (#447)
rynorris authored and Will Manning committed Mar 13, 2019
1 parent 3e8b1f4 commit 921d72f
Showing 4 changed files with 96 additions and 19 deletions.
core/src/main/scala/org/apache/spark/MapOutputTracker.scala (2 additions & 2 deletions)
@@ -655,8 +655,8 @@ private[spark] class MapOutputTrackerMaster(
   def getExecutorShuffleStatus: scala.collection.Map[String, ExecutorShuffleStatus] = {
     shuffleStatuses.values
       .flatMap(status => status.executorsWithOutputs().map(_ -> status.isActive))
-      .groupBy(_._1)
-      .mapValues(_.exists(_._2))
+      .groupBy(_._1) // group by executor ID
+      .mapValues(_.exists(_._2)) // true if any are Active
       .mapValues(if (_) ExecutorShuffleStatus.Active else ExecutorShuffleStatus.Inactive)
   }

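Note: the chain above collapses one (executorId, isActive) pair per shuffle into a single status per executor. A self-contained sketch of the same fold, with statuses simplified to strings (illustrative only, not part of the commit):

// Each pair is (executorId, isActive) for one shuffle's output on that executor.
val perShuffle = Seq("exec1" -> true, "exec1" -> false, "exec2" -> false)

val perExecutor = perShuffle
  .groupBy(_._1)             // Map(exec1 -> its two entries, exec2 -> one entry)
  .mapValues(_.exists(_._2)) // an executor counts as active if ANY shuffle on it is
  .mapValues(b => if (b) "Active" else "Inactive")

// perExecutor == Map("exec1" -> "Active", "exec2" -> "Inactive"):
// holding even one active shuffle marks the whole executor Active.
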
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -87,6 +87,10 @@ private[spark] class TaskSchedulerImpl(
   private val speculationScheduler =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")
 
+  // whether to prefer assigning tasks to executors that contain shuffle files
+  val shuffleBiasedTaskSchedulingEnabled =
+    conf.getBoolean("spark.scheduler.shuffleBiasedTaskScheduling.enabled", false)
+
   // Threshold above which we warn user initial TaskSet may be starved
   val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")

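Note: the flag defaults to false. A minimal sketch of enabling it programmatically (illustrative; any mechanism for setting Spark confs works, and the dynamic-allocation settings mirror the Kubernetes integration test at the bottom of this diff):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.scheduler.shuffleBiasedTaskScheduling.enabled", "true")
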
@@ -414,11 +418,7 @@ private[spark] class TaskSchedulerImpl(
       }
     }.getOrElse(offers)
 
-    val shuffledOffers = shuffleOffers(filteredOffers)
-    // Build a list of tasks to assign to each worker.
-    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
-    val availableCpus = shuffledOffers.map(o => o.cores).toArray
-    val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum
+    var tasks: Seq[Seq[TaskDescription]] = Nil
     val sortedTaskSets = rootPool.getSortedTaskSetQueue
     for (taskSet <- sortedTaskSets) {
       logDebug("parentName: %s, name: %s, runningTasks: %s".format(
@@ -428,11 +428,36 @@
       }
     }
 
+    // If shuffle-biased task scheduling is enabled, then first assign as many tasks as possible to
+    // executors containing active shuffle files, followed by assigning to executors with inactive
+    // shuffle files, and then finally to those without shuffle files. This bin packing allows for
+    // more efficient dynamic allocation in the absence of an external shuffle service.
+    val partitionedAndShuffledOffers = partitionAndShuffleOffers(filteredOffers)
+    for (shuffledOffers <- partitionedAndShuffledOffers.map(_._2)) {
+      tasks ++= doResourceOffers(shuffledOffers, sortedTaskSets)
+    }
+
+    // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get
+    // launched within a configured time.
+    if (tasks.size > 0) {
+      hasLaunchedTask = true
+    }
+    return tasks
+  }
+
+  private def doResourceOffers(
+      shuffledOffers: IndexedSeq[WorkerOffer],
+      sortedTaskSets: IndexedSeq[TaskSetManager]): Seq[Seq[TaskDescription]] = {
+    // Build a list of tasks to assign to each worker.
+    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
+    val availableCpus = shuffledOffers.map(o => o.cores).toArray
+    val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum
+
     // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
     // of locality levels so that it gets a chance to launch local tasks on all of them.
     // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
     for (taskSet <- sortedTaskSets) {
-      // Skip the barrier taskSet if the available slots are less than the number of pending tasks.
+      // Skip the barrier taskSet if the available slots are less than the number of pending tasks
       if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
         // Skip the launch process.
         // TODO SPARK-24819 If the job requires more slots than available (both busy and free
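Note: to make the bin-packing order concrete, a self-contained sketch (hypothetical executor names, statuses as plain strings) of why low demand leaves shuffle-free executors idle:

val priority = Seq("Active", "Inactive", "Unknown")
val statusByExec = Map("exec1" -> "Inactive", "exec2" -> "Active", "exec3" -> "Unknown")

// Two pending single-core tasks against three single-core executors:
val placed = priority
  .flatMap(status => statusByExec.collect { case (exec, `status`) => exec })
  .take(2)

// placed == List("exec2", "exec1"): exec3 holds no shuffle files, stays idle,
// and can be reclaimed by dynamic allocation without losing shuffle outputs.
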
@@ -520,18 +545,33 @@ private[spark] class TaskSchedulerImpl(
             .mkString(",")
           addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr))
 
-          logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for barrier " +
-            s"stage ${taskSet.stageId}.")
+          logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for " +
+            s"barrier stage ${taskSet.stageId}.")
         }
       }
     }
+    tasks
+  }
 
-    // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get
-    // launched within a configured time.
-    if (tasks.size > 0) {
-      hasLaunchedTask = true
+  /**
+   * Shuffle offers around to avoid always placing tasks on the same workers.
+   * If shuffle-biased task scheduling is enabled, this function partitions the offers based on
+   * whether they have active/inactive/no shuffle files present.
+   */
+  def partitionAndShuffleOffers(offers: IndexedSeq[WorkerOffer])
+    : IndexedSeq[(ExecutorShuffleStatus.Value, IndexedSeq[WorkerOffer])] = {
+    if (shuffleBiasedTaskSchedulingEnabled && offers.length > 1) {
+      // bias towards executors that have active shuffle outputs
+      val execShuffles = mapOutputTracker.getExecutorShuffleStatus
+      offers
+        .groupBy(offer => execShuffles.getOrElse(offer.executorId, ExecutorShuffleStatus.Unknown))
+        .mapValues(doShuffleOffers)
+        .toStream
+        .sortBy(_._1) // order: Active, Inactive, Unknown
+        .toIndexedSeq
+    } else {
+      IndexedSeq((ExecutorShuffleStatus.Unknown, doShuffleOffers(offers)))
     }
-    return tasks
   }
 
   private def createUnschedulableTaskSetAbortTimer(
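Note: the sortBy(_._1) above leans on the natural ordering of scala.Enumeration values, which follows declaration order. A minimal sketch of the shape this assumes for ExecutorShuffleStatus (the actual definition is not part of this diff):

object ExecutorShuffleStatus extends Enumeration {
  // Declaration order defines the sort order used by partitionAndShuffleOffers:
  // Active < Inactive < Unknown, so executors with active shuffle files are offered first.
  val Active, Inactive, Unknown = Value
}
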
@@ -552,10 +592,10 @@
   }
 
   /**
-   * Shuffle offers around to avoid always placing tasks on the same workers. Exposed to allow
-   * overriding in tests, so it can be deterministic.
+   * Does the shuffling for [[partitionAndShuffleOffers()]]. Exposed to allow overriding in tests,
+   * so that it can be deterministic.
    */
-  protected def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
+  protected def doShuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
     Random.shuffle(offers)
   }

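Note: doShuffleOffers randomizes with scala.util.Random, and tests recover determinism by overriding the method (see the suite below) rather than by seeding. Purely for illustration, a seeded variant could look like:

import scala.util.Random

// Hypothetical alternative, not in the commit: a fixed seed makes the
// shuffle reproducible across runs.
val rng = new Random(42L)
def seededShuffleOffers[T](offers: IndexedSeq[T]): IndexedSeq[T] = rng.shuffle(offers)
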
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -29,8 +29,10 @@ import org.scalatest.concurrent.Eventually
 import org.scalatest.mockito.MockitoSugar
 
 import org.apache.spark._
+import org.apache.spark.ExecutorShuffleStatus._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
+import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.ManualClock
 
 class FakeSchedulerBackend extends SchedulerBackend {
@@ -1030,7 +1032,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     // We customize the task scheduler just to let us control the way offers are shuffled, so we
     // can be sure we try both permutations, and to control the clock on the tasksetmanager.
     val taskScheduler = new TaskSchedulerImpl(sc) {
-      override def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
+      override def doShuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
         // Don't shuffle the offers around for this test. Instead, we'll just pass in all
         // the permutations we care about directly.
         offers
@@ -1067,6 +1069,40 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     }
   }
 
+  test("Shuffle-biased task scheduling enabled should lead to non-random offer shuffling") {
+    setupScheduler("spark.scheduler.shuffleBiasedTaskScheduling.enabled" -> "true")
+
+    // Make offers in different executors, so they can be a mix of active, inactive, unknown
+    val offers = IndexedSeq(
+      WorkerOffer("exec1", "host1", 2), // inactive
+      WorkerOffer("exec2", "host2", 2), // active
+      WorkerOffer("exec3", "host3", 2) // unknown
+    )
+    val makeMapStatus = (offer: WorkerOffer) =>
+      MapStatus(BlockManagerId(offer.executorId, offer.host, 1), Array(10))
+    val mapOutputTracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+    mapOutputTracker.registerShuffle(0, 2)
+    mapOutputTracker.registerShuffle(1, 1)
+    mapOutputTracker.registerMapOutput(0, 0, makeMapStatus(offers(0)))
+    mapOutputTracker.registerMapOutput(0, 1, makeMapStatus(offers(1)))
+    mapOutputTracker.registerMapOutput(1, 0, makeMapStatus(offers(1)))
+    mapOutputTracker.markShuffleInactive(0)
+
+    val execStatus = mapOutputTracker.getExecutorShuffleStatus
+    assert(execStatus.equals(Map("exec1" -> Inactive, "exec2" -> Active)))
+
+    assert(taskScheduler.partitionAndShuffleOffers(offers).map(_._1)
+      .equals(IndexedSeq(Active, Inactive, Unknown)))
+    assert(taskScheduler.partitionAndShuffleOffers(offers).flatMap(_._2).map(offers.indexOf(_))
+      .equals(IndexedSeq(1, 0, 2)))
+
+    taskScheduler.submitTasks(FakeTask.createTaskSet(3, stageId = 1, stageAttemptId = 0))
+    // should go to active first, then inactive
+    val taskDescs = taskScheduler.resourceOffers(offers).flatten
+    assert(taskDescs.size === 3)
+    assert(taskDescs.map(_.executorId).equals(Seq("exec2", "exec2", "exec1")))
+  }
+
   test("With delay scheduling off, tasks can be run at any locality level immediately") {
     val conf = new SparkConf()
       .set(config.LOCALITY_WAIT.key, "0")
DynamicAllocationTestsSuite.scala (Kubernetes integration tests)
@@ -83,6 +83,7 @@ private[spark] trait DynamicAllocationTestsSuite { k8sSuite: KubernetesSuite =>
       .addToArgs("--conf", "spark.dynamicAllocation.enabled=true")
       .addToArgs("--conf", "spark.dynamicAllocation.minExecutors=0")
       .addToArgs("--conf", "spark.dynamicAllocation.maxExecutors=1")
+      .addToArgs("--conf", "spark.scheduler.shuffleBiasedTaskScheduling.enabled=true")
       .addToArgs("--conf",
         s"spark.driver.host=" +
           s"${driverService.getMetadata.getName}.${kubernetesTestComponents.namespace}.svc")
