
[SPARK-24795][CORE] Implement barrier execution mode #21758

Closed
wants to merge 14 commits

Conversation

@jiangxb1987 (Contributor, Author) commented Jul 13, 2018

What changes were proposed in this pull request?

Propose new APIs and modify job/task scheduling to support barrier execution mode, which requires all tasks in the same barrier stage to start at the same time, and retries all tasks when some of them fail in the middle. The barrier execution mode is useful for some ML/DL workloads.

The proposed API changes include:

  • RDDBarrier that marks an RDD as a barrier RDD (Spark must launch all the tasks of the current stage together).
  • BarrierTaskContext that supports global sync of all tasks in a barrier stage and provides extra BarrierTaskInfos.

In DAGScheduler, we retry all tasks of a barrier stage in case some tasks fail in the middle. This is achieved by unregistering the map outputs for a shuffleId (for a ShuffleMapStage) or clearing the finished partitions in an active job (for a ResultStage).
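For illustration, a minimal usage sketch of the proposed API (the exact signatures are assumptions pieced together from the snippets reviewed below, not the final merged form):

```scala
// Hypothetical end-to-end example: mark a stage as barrier and sync all of its tasks.
val rdd = sc.parallelize(1 to 100, numSlices = 4)
val rdd2 = rdd.barrier().mapPartitions { (it, context) =>
  // All 4 tasks are launched together; each blocks here until every task reaches the barrier.
  context.barrier()
  it
}
rdd2.collect()
```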

How was this patch tested?

Add RDDBarrierSuite to ensure we convert RDDs correctly;
Add new test cases in DAGSchedulerSuite to ensure we do task scheduling correctly;
Add new test cases in SparkContextSuite to ensure the barrier execution mode actually works (both under local mode and local cluster mode).
Add new test cases in TaskSchedulerImplSuite to ensure we schedule tasks for barrier taskSet together.

@jiangxb1987 (Contributor, Author):

cc @mengxr @gatorsmile @cloud-fan

@SparkQA

SparkQA commented Jul 13, 2018

Test build #92962 has finished for PR 21758 at commit c25ec47.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait BarrierTaskContext extends TaskContext
  • class BarrierTaskContextImpl(
  • class RDDBarrier[T: ClassTag](rdd: RDD[T])
  • case class WorkerOffer(

@SparkQA

SparkQA commented Jul 13, 2018

Test build #92965 has finished for PR 21758 at commit c8d67e4.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

} catch {
case e: UnsupportedOperationException =>
// Cannot continue with barrier stage if failed to cancel zombie barrier tasks.
logInfo(s"Could not cancel tasks for stage $stageId", e)
Member:

logWarn?

// Skip the launch process.
logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " +
s"number of available slots is ${availableSlots}.")
Member:

this could get starved forever?

Contributor:

@jiangxb1987 Could you put the JIRA link as an inline TODO? This discussion would be hard to find once the PR is merged.

s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
s"because only ${taskDescs.size} out of a total number of ${taskSet.numTasks} " +
"tasks got resource offers. The resource offers may have been blacklisted or " +
"cannot fulfill task locality requirements.")
Member:

how many attempts - would it fail continuously if some hosts are blacklisted?

Contributor Author:

Won't handle this for now, but we shall add a timeout if a barrier stage doesn't get launched for a while, tracked by https://issues.apache.org/jira/browse/SPARK-24823

logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " +
"failed.")
val message = "Stage failed because a barrier task finished unsuccessfully. " +
s"${failure.toErrorString}"
Member:

add task id of the failed barrier task? it would make it easier to root cause/find the error

@SparkQA

SparkQA commented Jul 16, 2018

Test build #93099 has finished for PR 21758 at commit e2b05e2.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

/** Sets a global barrier and waits until all tasks in this stage hit this barrier. */
def barrier(): Unit

/** Returns the all task infos in this barrier stage. */
Contributor:

Is it always ordered by partitionId? If true, we should mention it in the doc.

import org.apache.spark.metrics.MetricsSystem

/** A [[BarrierTaskContext]] implementation. */
class BarrierTaskContextImpl(
Contributor:

package private?

/**
* Carries all task infos of a barrier task.
*/
private[spark] class BarrierTaskInfo(val host: String)
Contributor:

This class cannot be package private.

* a RDDBarrier. This function always returns false for a [[ShuffledRDD]], since a
* [[ShuffledRDD]] indicates start of a new stage.
*/
def isBarrier(): Boolean = dependencies.exists(_.rdd.isBarrier())
Contributor:

It doesn't cache the result. If we have a long lineage, does it have a performance penalty?

@@ -60,4 +60,10 @@ private[spark] class ActiveJob(
val finished = Array.fill[Boolean](numPartitions)(false)

var numFinished = 0

// Mark all the partitions of the stage to be not finished.
def clearResult(): Unit = {
Contributor:

It is not clear to me that "clearResult" accurately describes what it does. markAllPartitionsAsUnfinished? Better names are welcome!

context.barrier()
it
}
rdd2.collect
Contributor:

minor: collect()

/** A [[TaskContext]] with extra info and tooling for a barrier stage. */
trait BarrierTaskContext extends TaskContext {

/** Sets a global barrier and waits until all tasks in this stage hit this barrier. */
Contributor:

It would be nice to provide more documentation because it is easy to make mistakes here. We could address it in the context.barrier() PR.

with BarrierTaskContext {

// TODO implement global barrier.
override def barrier(): Unit = {}
Contributor:

Ditto. Please provide JIRA link with TODO.


override def getTaskInfos(): Array[BarrierTaskInfo] = {
val hostsStr = localProperties.getProperty("hosts", "")
hostsStr.trim().split(",").map(_.trim()).map(new BarrierTaskInfo(_))
Contributor:

is the first ".trim()" necessary?

shuffleStatus.removeOutputsByFilter(x => true)
incrementEpoch()
case None =>
throw new SparkException("unregisterAllMapOutput called for nonexistent shuffle ID")
Contributor:

should include the shuffleId in the error message
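A sketch of how the hunk above might read with that suggestion applied (an assumption, not the merged code):

```scala
case None =>
  throw new SparkException(
    s"unregisterAllMapOutput called for nonexistent shuffle ID $shuffleId.")
```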

@SparkQA

SparkQA commented Jul 17, 2018

Test build #93180 has finished for PR 21758 at commit 1709ece.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class BarrierTaskInfo(val address: String)

@mridulm (Contributor) commented Jul 18, 2018

I had left a few comments on SPARK-24375 @jiangxb1987 ... unfortunately the JIRAs have moved around a bit.
If this is the active PR for introducing the feature, it would be great to get clarity on them.

@jiangxb1987 (Contributor, Author):

@mridulm Sorry I missed that message, now I've updated the comment, we can continue the discussion on that thread.

/**
* Carries all task infos of a barrier task.
*/
class BarrierTaskInfo(val address: String)
Contributor:

we need param doc, to say that address is an IPv4 address.

* a RDDBarrier. This function always returns false for a [[ShuffledRDD]], since a
* [[ShuffledRDD]] indicates start of a new stage.
*/
def isBarrier(): Boolean = isBarrier_
Contributor:

does this need to be public?

*/
def isBarrier(): Boolean = isBarrier_

@transient private lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier())
Contributor:

why do we need a lazy val and a def? can we merge them?

Contributor Author:

This is a performance improvement: we cache the value to avoid repeatedly computing isBarrier() on a long RDD chain.


// Mark all the partitions of the stage to be not finished.
def markAllPartitionsAsUnfinished(): Unit = {
(0 until numPartitions).map(finished.update(_, false))
Contributor:

map -> foreach
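A minimal sketch of the suggested change, assuming the surrounding ActiveJob fields:

```scala
// foreach makes the intent (side effects only) explicit and avoids building a throwaway collection.
def markAllPartitionsAsUnfinished(): Unit = {
  (0 until numPartitions).foreach(finished.update(_, false))
}
```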

Contributor:

is reset a better name?

@@ -1311,17 +1312,6 @@ class DAGScheduler(
}
}

case Resubmitted =>
Contributor:

why move the handling of Resubmitted after FetchFailure?

Contributor Author:

A barrier task may fail with reason Resubmitted, so case Resubmitted must come after case failure: TaskFailedReason if task.isBarrier, and we must make sure that all task failure reasons (except for FetchFailed) are handled inside the barrier task failure handling logic.


if (failedStage.rdd.isBarrier()) {
failedStage match {
case mapStage: ShuffleMapStage =>
Contributor:

please pick a different name. mapStage is already used before..

}

// TODO: mark the executor as failed only if there were lots of fetch failures on it
if (bmAddress != null) {
Contributor:

why move this before the if (shouldAbortStage) { ...?

Contributor Author:

IIUC previously there was a race condition: you first post the ResubmitFailedStages message and then maybe unregister outputs; if ResubmitFailedStages is handled quickly enough, you may first try to submit missing tasks and only then unregister some shuffle blocks. This code change fixes the issue. Maybe this is worth handling separately?

Contributor:

I think so

sms.pendingPartitions += task.partitionId

case _ =>
assert(false, "TaskSetManagers should only send Resubmitted task statuses for " +
Contributor:

assert(false ... is weird, please throw an exception directly.
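For example, the case could be rewritten along these lines (a sketch; the exact message is an assumption since the original string is truncated above):

```scala
case _ =>
  // Fail loudly instead of relying on assertions, which may be disabled at runtime.
  throw new SparkException(
    "TaskSetManagers should only send Resubmitted task statuses for tasks in ShuffleMapStages.")
```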

taskScheduler.cancelTasks(stageId, interruptThread = false)
} catch {
case e: UnsupportedOperationException =>
// Cannot continue with barrier stage if failed to cancel zombie barrier tasks.
Contributor:

can we just leave the zombie tasks and ignore their completion events?

Contributor Author:

So far I don't see an easy way to mark a running task as not needed and prevent it from writing shuffle files/committing. Maybe we shall leave a TODO here?

Contributor:

SGTM

@@ -50,6 +50,7 @@ private[spark] class TaskDescription(
val executorId: String,
val name: String,
val index: Int, // Index within this task's TaskSet
val partitionId: Int,
Contributor:

I'm wondering if we need this. For barrier stage we always retry the entire stage, so index must be equal to partitionId.

Contributor Author:

Actually you are right on this, but that makes the logic a little more difficult to understand, so I prefer to use partitionId explicitly.

@@ -346,6 +354,7 @@ private[spark] class TaskSchedulerImpl(
// Build a list of tasks to assign to each worker.
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum
Contributor:

availableCpus.sum / CPUS_PER_TASK?

Contributor Author:

If o.cores is not divisible by CPUS_PER_TASK, summing all availableCpus first and then dividing by CPUS_PER_TASK would produce a larger slot count than is actually available.
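A worked example of the difference, assuming CPUS_PER_TASK = 4 and two offers with 6 cores each:

```scala
val coresPerOffer = Seq(6, 6)
val perOfferSlots = coresPerOffer.map(_ / 4).sum  // 1 + 1 = 2 slots, matches what can really run
val pooledSlots   = coresPerOffer.sum / 4         // 12 / 4 = 3 slots, overestimates by one
```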

tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
tasks: IndexedSeq[ArrayBuffer[TaskDescription]],
addresses: ArrayBuffer[String],
taskDescs: ArrayBuffer[TaskDescription]) : Boolean = {
Contributor:

instead of filling 2 arrays here, can we just fill one ArrayBuffer[(String, Int)]?

Contributor Author:

Will this help save some memory or something? I was wondering what shall be the name of the combined ArrayBuffer but couldn't figure out a perfect one.

Contributor:

it's just cleaner. how about addressesWithIndice

cores: Int,
// `address` is an optional hostPort string, it provide more useful information than `host`
// when multiple executors are launched on the same host.
address: Option[String] = None)
Contributor:

instead of having host and address, shall we just have host and port?

Contributor Author:

This follows what ExecutorData does, having both executorAddress and executorHost, IIUC the host of executorAddress doesn't necessarily have to be the same as executorHost.

@@ -627,6 +627,48 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
assert(exc.getCause() != null)
stream.close()
}

test("support barrier sync under local mode") {
Contributor:

we don't have barrier sync yet, do we need to test it now?

Contributor Author:

Updated the title, but I think the test body is still needed to make sure barrier execution mode works with SparkContext.

@@ -1055,6 +1055,64 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
assert(sparkListener.failedStages.size == 1)
}

test("Retry all the tasks on a resubmitted attempt of a barrier stage caused by FetchFailure") {
Contributor:

do we have a test to make sure all the tasks are launched together for barrier stage?

Contributor Author:

Added test cases in TaskSchedulerImplSuite.

@SparkQA

SparkQA commented Jul 19, 2018

Test build #93282 has finished for PR 21758 at commit 742200a.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 22, 2018

Test build #93411 has finished for PR 21758 at commit 9229771.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 22, 2018

Test build #93412 has finished for PR 21758 at commit 94e5237.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


/**
* :: Experimental ::
* Returns the all task infos in this barrier stage, the task infos are ordered by partitionId.
Contributor:

is there a particular reason why they must be ordered by partitionId?

Contributor Author:

The major reason is that each task within the same barrier stage may need to communicate with the others; we order the task infos by partitionId so a task can find its peer tasks by index.
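A hypothetical sketch of that peer lookup inside a barrier task, assuming the ordering guarantee holds:

```scala
// Inside the function passed to RDDBarrier.mapPartitions:
val infos     = context.getTaskInfos()            // ordered by partitionId
val myAddress = infos(context.partitionId()).address
val peer0     = infos(0).address                  // address of the task handling partition 0
```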

logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " +
"failed.")
val message = s"Stage failed because barrier task $task finished unsuccessfully. " +
s"${failure.toErrorString}"
Contributor:

nit: unneeded s

Contributor Author:

Why is it not needed? Could you expand more on this?

Contributor:

sure, it can be just failure.toErrorString, no need to wrap it into s"..."

@@ -110,4 +110,6 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
super.clearDependencies()
prev = null
}

@transient protected lazy override val isBarrier_ : Boolean = false
Contributor:

nit: can't we here override def isBarrier to return false in order to avoid the unneeded synchronization introduced by lazy?
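A minimal sketch of that suggestion (an assumption, not necessarily what was merged):

```scala
// ShuffledRDD always starts a new stage, so it can short-circuit without the lazy val.
override def isBarrier(): Boolean = false
```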

// Only update hosts for a barrier task.
if (taskSet.isBarrier) {
// The executor address is expected to be non empty.
addressesWithDescs += Tuple2(shuffledOffers(i).address.get, task)
Contributor:

nit: Tuple2(shuffledOffers(i).address.get, task) can be (shuffledOffers(i).address.get -> task)

@SparkQA

SparkQA commented Jul 23, 2018

Test build #93445 has finished for PR 21758 at commit 9ae56d1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 23, 2018

Test build #93446 has finished for PR 21758 at commit c16a47f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile (Member):

retest this please

*/
@Experimental
@Since("2.4.0")
class BarrierTaskInfo(val address: String)
Contributor:

does this need to be a public API?

Contributor Author (@jiangxb1987, Jul 24, 2018):

We make this a public API because BarrierTaskContext.getTaskInfos() will return a list of BarrierTaskInfos, so users have to be able to access the class.

Contributor:

Can we just bake address into TaskInfo?

Contributor:

or is TaskInfo not a public API?

Contributor Author:

If we don't mind making TaskInfo a public API, then I think it shall be fine to just put address into TaskInfo. The major concern is that TaskInfo has been stable for a long time; do we want to potentially make frequent changes to it? (e.g. we may add more variables useful for barrier tasks, though I don't really have an example at hand)

@SparkQA

SparkQA commented Jul 23, 2018

Test build #93452 has finished for PR 21758 at commit c16a47f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

*/
@Experimental
@Since("2.4.0")
def mapPartitions[S: ClassTag](
Contributor:

if the only thing you can do on this is mapPartitions, is there any particular reason it's divided into two calls, barrier().mapPartitions(), instead of just barrierMapPartitions() or something? Are there more things planned here?

I can see users expecting to be able to call other functions after .barrier(), e.g. barrier().reduceByKey() or something. The compiler will help with this, but just wondering if we can make it more obvious.

Contributor Author:

RDDBarrier is actually expected to be used like a builder; we shall provide more options for the barrier stage in the future, e.g. configuring a timeout for a barrier stage.

*/
@Experimental
@Since("2.4.0")
def barrier(): RDDBarrier[T] = withScope(new RDDBarrier[T](this))
Contributor (@squito, Jul 24, 2018):

scheduling seems to have a very hard requirement that the number of partitions is less than the number of available task slots. It seems really hard for users to get this right. Eg., if I just do

sc.textFile(...).barrier().mapPartitions()

the number of partitions is based on the HDFS input splits. I see lots of users getting confused by this -- it'll work sometimes, won't work other times, and they won't know why. Should there be some automatic repartitioning based on cluster resources? Or at least an API which lets users do this? Even repartition() isn't great here, because users don't want to think about cluster resources.

Contributor Author:

Em, thanks for raising this question. IMO we indeed require users to be aware of how many tasks they launch for a barrier stage, since tasks may exchange internal data with each other in the middle, so users really care about the task numbers. I agree it would be very useful to enable specifying the number of tasks in a barrier stage; maybe we can have RDDBarrier.coalesce(numPartitions: Int) to enforce the number of tasks to be launched together in a barrier stage.
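If that suggestion were adopted, usage might look like the following (purely hypothetical; RDDBarrier.coalesce is not part of this PR):

```scala
val numSlots = 8  // number of task slots the user knows the cluster can provide at once
sc.textFile("hdfs:///path/to/input")
  .barrier()
  .coalesce(numSlots)   // hypothetical RDDBarrier.coalesce from this discussion
  .mapPartitions { (it, ctx) => ctx.barrier(); it }
```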

Contributor:

was this addressed at all? is there another jira for it?

if (!launchedAnyTask) {
taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
// Skip the barrier taskSet if the available slots are less than the number of pending tasks.
if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
Contributor:

we should probably have a hard failure if DynamicAllocation is enabled until that is properly addressed.

Contributor Author:

As listed in the design doc, supporting barrier stages with dynamic resource allocation is a non-goal of this task. However, we do plan to better integrate this feature with dynamic resource allocation; hopefully we can get to work on this before Spark 3.0.

Contributor:

yeah I think it's fine to not support Dynamic Allocation in the initial version. I just think it would be better to have a failure right away if a user tries to use this with dynamic allocation, rather than some undefined behavior.

Contributor Author:

We plan to fail the job on submit if it requires more slots than are available. Are there other scenarios where we shall fail fast with dynamic allocation? IIUC the barrier tasks that have not been launched are still counted into the number of pending tasks, so dynamic resource allocation shall still be able to compute a correct expected number of executors.

Contributor:

You'll request the slots, but I think there are a lot more complications. The whole point of using dynamic allocation is on a multi-tenant cluster, so resources will come and go. If there aren't enough resources available on the cluster no matter what, then you'll see executors get acquired, have their idle timeout expire, get released, and then acquired again. This will be really confusing to the user, as it might look like there is some progress with the constant logging about executors getting acquired and released, though really it would just wait indefinitely.

Or you might get deadlock with two concurrent applications. Even if they could fit on the cluster by themselves, they might both acquire some resources, which would prevent either of them from getting enough. Again, they'd both go through the same loop, of acquiring some resources, then having them hit the idle timeout and releasing them, then acquiring resources, but they might just continually trade resources between each other. They'd only advance by luck.

You have the similar problems with concurrent jobs within one spark application, but its a bit easier to control since at least the spark scheduler knows about everything.

We plan to fail the job on submit if it requires more slots than available.

what exactly do you mean by "available"? It's not so well defined for dynamic allocation. The resources you have right when the job is submitted? Also can you point me to where that is being done? I didn't see it here -- is it another jira?

Contributor Author:

Yeah, you made a really good point here; I've opened https://issues.apache.org/jira/browse/SPARK-24942 to track the cluster resource management issue.

what exactly do you mean by "available"? Its not so well defined for dynamic allocation. The resources you have right when the job is submitted? Also can you point me to where that is being done? I didn't see it here -- is it another jira?

This is tracked by https://issues.apache.org/jira/browse/SPARK-24819; we shall check all the barrier stages on job submission, to see whether the barrier stages require more slots (to be able to launch all the barrier tasks in the same stage together) than currently exist in the cluster. If the job requires more slots than available (both busy and free slots), fail the job on submit.

Contributor:

it's totally fine to leave out properly supporting this for now, but given how confusing the current behavior will be with dynamic allocation, I strongly feel we need to fail fast if users try this with dynamic allocation for now. If you want to let users give it a shot anyway, you could add a conf to let users bypass the check.

Contributor Author:

Sure, I opened https://issues.apache.org/jira/browse/SPARK-24954 for this, will try to submit a PR soon.
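A rough sketch of such a fail-fast check (an assumption of how SPARK-24954 might be approached, not its actual implementation; rdd here stands for the barrier RDD of the job being submitted):

```scala
// Reject barrier jobs up front when dynamic allocation is on, instead of undefined behavior.
if (rdd.isBarrier() && sc.getConf.getBoolean("spark.dynamicAllocation.enabled", false)) {
  throw new SparkException("Barrier execution mode does not currently support dynamic " +
    "resource allocation; disable spark.dynamicAllocation.enabled to run this job.")
}
```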

Contributor:

@jiangxb1987 , just curious in understanding how it works:
If it's required for a barrier taskset to have all the slots available at once in a single resourceOffers call, is there any mechanism to prevent starvation if the cluster is busy, and e.g. 100 slots never become available at once because new tasks are starting all the time?

Also, it computes availableSlots as a val before the loop. But what if the barrier taskSet is not the first one in the for (taskSet <- sortedTaskSets) loop, and a taskSet earlier in the loop already took some slots so there are not enough left for the barrier taskSet?

@gatorsmile (Member) commented Jul 25, 2018

LGTM

@squito Has @jiangxb1987 addressed your comments?

We want to merge this PR ASAP, and there are a few to-be-submitted PRs that depend on it. This feature is targeting the Spark 2.4 release.

*/
@Experimental
@Since("2.4.0")
def getTaskInfos(): Array[BarrierTaskInfo]
Contributor:

what other things do you expect to be included in the future in BarrierTaskInfo? It seems overkill to have a new class for a single field (address).

Contributor:

+1

@@ -27,7 +27,8 @@ import org.apache.spark.{Partition, TaskContext}
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false)
preservesPartitioning: Boolean = false,
isFromBarrier: Boolean = false)
Contributor:

we should explain what this flag does in classdoc


// From performance concern, cache the value to avoid repeatedly compute `isBarrier()` on a long
// RDD chain.
@transient protected lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier())
Contributor:

you need to explain why mappartitionsrdd has a different isBarrier implementation.
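For context, the difference presumably boils down to also honoring the new isFromBarrier flag, roughly along these lines (an assumption based on the diff above, not a quote of the merged code):

```scala
// A MapPartitionsRDD is a barrier RDD either because it was created via RDDBarrier
// (isFromBarrier), or because one of its ancestors is a barrier RDD.
@transient protected lazy override val isBarrier_ : Boolean =
  isFromBarrier || dependencies.exists(_.rdd.isBarrier())
```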

@@ -60,4 +60,10 @@ private[spark] class ActiveJob(
val finished = Array.fill[Boolean](numPartitions)(false)

var numFinished = 0

// Mark all the partitions of the stage to be not finished.
Contributor:

use /** */ style. also the sentence is a bit awkward. perhaps

"Resets the status of all partitions in this stage so they are marked as not finished."

@rxin (Contributor) commented Jul 25, 2018

What's the failure mode if there are not enough slots for the barrier mode? We should throw an exception right?

@jiangxb1987 (Contributor, Author):

What's the failure mode if there are not enough slots for the barrier mode? We should throw an exception right?

Yes, as mentioned in https://github.com/apache/spark/pull/21758/files/c16a47f0d15998133b9d61d8df5310f1f66b11b0#diff-d4000438827afe3a185ae75b24987a61R372 , we shall fail the job on submit if there are not enough slots for the barrier stage. I'll submit another PR to add this check (tracked by SPARK-24819).

@SparkQA

SparkQA commented Jul 26, 2018

Test build #93580 has finished for PR 21758 at commit c7600c2.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987 (Contributor, Author):

retest this please

@SparkQA

SparkQA commented Jul 26, 2018

Test build #93590 has finished for PR 21758 at commit c7600c2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor):

LGTM

@gatorsmile (Member):

Thanks! Merged to master.

@jiangxb1987 Please submit the other PRs that are blocked by this ASAP.

@@ -1349,6 +1339,29 @@ class DAGScheduler(
s"longer running")
}

if (mapStage.rdd.isBarrier()) {
Contributor:

this won't handle the case when the barrier stage is two shuffle stages back, right? you might need to go back many stages when there is a fetch failure.

Contributor:

btw what I really mean here is that I think you actually need to do this every time you create a task set. Check if the task set is for a barrier stage, and if so, then launch all the tasks, regardless of existing output.

Contributor Author:

Previously I was thinking that by unregistering shuffle output we could avoid modifying the submit-missing-tasks logic. Now I realize you have to launch all the tasks for the taskSet of a barrier stage anyway, so maybe the approach you mentioned is cleaner; I'll try to submit a follow-up PR on that. Thanks!

Contributor:

I'd also like to see a test case for what happens if you do not have enough compute resources after a stage failure. After all, one of the key reasons for multiple stage attempts is hardware failure -- so you might have had just enough resources to run your barrier stage with 20 executors on the first attempt, but on the second attempt you're down to 19 and can't run it anymore.

Contributor:

hi @jiangxb1987 was this addressed? sorry if I missed it

import org.apache.spark.annotation.{Experimental, Since}

/** A [[TaskContext]] with extra info and tooling for a barrier stage. */
trait BarrierTaskContext extends TaskContext {
Contributor:

Please check the generated JavaDoc. I think it becomes a Java interface with only two methods defined here. We might want to define class BarrierTaskContext directly.
