[SPARK-24795][CORE] Implement barrier execution mode #21758
Conversation
Test build #92962 has finished for PR 21758 at commit
Test build #92965 has finished for PR 21758 at commit
} catch {
  case e: UnsupportedOperationException =>
    // Cannot continue with barrier stage if failed to cancel zombie barrier tasks.
    logInfo(s"Could not cancel tasks for stage $stageId", e)
logWarn?
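For reference, Spark's Logging trait names the method logWarning and has an overload that accepts a throwable, so the suggestion would look roughly like:
logWarning(s"Could not cancel tasks for stage $stageId", e)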
// Skip the launch process.
logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
  s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " +
  s"number of available slots is ${availableSlots}.")
this could get starved forever?
This will be addressed in https://issues.apache.org/jira/browse/SPARK-24819
@jiangxb1987 Could you put the JIRA link as an inline TODO? This discussion would be hard to find once the PR is merged.
s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + | ||
s"because only ${taskDescs.size} out of a total number of ${taskSet.numTasks} " + | ||
"tasks got resource offers. The resource offers may have been blacklisted or " + | ||
"cannot fulfill task locality requirements.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how many attempts - would it fail continuously if some hosts are blacklisted?
Won't handle this for now, but we shall add a timeout in case a barrier stage doesn't get launched for a while, tracked by https://issues.apache.org/jira/browse/SPARK-24823
logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " + | ||
"failed.") | ||
val message = "Stage failed because a barrier task finished unsuccessfully. " + | ||
s"${failure.toErrorString}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add task id of the failed barrier task? it would make it easier to root cause/find the error
Test build #93099 has finished for PR 21758 at commit
/** Sets a global barrier and waits until all tasks in this stage hit this barrier. */
def barrier(): Unit

/** Returns all the task infos in this barrier stage. */
Is it always ordered by partitionId? If true, we should mention it in the doc.
import org.apache.spark.metrics.MetricsSystem

/** A [[BarrierTaskContext]] implementation. */
class BarrierTaskContextImpl(
package private?
/**
 * Carries all task infos of a barrier task.
 */
private[spark] class BarrierTaskInfo(val host: String)
This class cannot be package private.
 * a RDDBarrier. This function always returns false for a [[ShuffledRDD]], since a
 * [[ShuffledRDD]] indicates start of a new stage.
 */
def isBarrier(): Boolean = dependencies.exists(_.rdd.isBarrier())
It doesn't cache the result. If we have a long lineage, does it have a performance penalty?
@@ -60,4 +60,10 @@ private[spark] class ActiveJob(
  val finished = Array.fill[Boolean](numPartitions)(false)

  var numFinished = 0

  // Mark all the partitions of the stage to be not finished.
  def clearResult(): Unit = {
It is not clear to me that "clearResult" accurately describes what it does. markAllPartitionsAsUnfinished? Better names are welcome!
  context.barrier()
  it
}
rdd2.collect
minor: collect()
/** A [[TaskContext]] with extra info and tooling for a barrier stage. */
trait BarrierTaskContext extends TaskContext {

  /** Sets a global barrier and waits until all tasks in this stage hit this barrier. */
It would be nice to provide more documentation because it is easy to make mistakes here. We could address it in the context.barrier() PR.
  with BarrierTaskContext {

  // TODO implement global barrier.
  override def barrier(): Unit = {}
Ditto. Please provide JIRA link with TODO.
override def getTaskInfos(): Array[BarrierTaskInfo] = {
  val hostsStr = localProperties.getProperty("hosts", "")
  hostsStr.trim().split(",").map(_.trim()).map(new BarrierTaskInfo(_))
is the first ".trim()" necessary?
    shuffleStatus.removeOutputsByFilter(x => true)
    incrementEpoch()
  case None =>
    throw new SparkException("unregisterAllMapOutput called for nonexistent shuffle ID")
should include the shuffleId in the error message
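A one-line sketch of that suggestion, assuming shuffleId is the identifier being looked up in this match:
case None =>
  throw new SparkException(s"unregisterAllMapOutput called for nonexistent shuffle ID $shuffleId")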
Test build #93180 has finished for PR 21758 at commit
I had left a few comments on SPARK-24375 @jiangxb1987 ... unfortunately the JIRAs have moved around a bit.
@mridulm Sorry I missed that message; now I've updated the comment, and we can continue the discussion on that thread.
/**
 * Carries all task infos of a barrier task.
 */
class BarrierTaskInfo(val address: String)
we need param doc to say that address is an IPv4 address.
 * a RDDBarrier. This function always returns false for a [[ShuffledRDD]], since a
 * [[ShuffledRDD]] indicates start of a new stage.
 */
def isBarrier(): Boolean = isBarrier_
does this need to be public?
 */
def isBarrier(): Boolean = isBarrier_

@transient private lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier())
why do we need a lazy val and a def? can we merge them?
This is a performance improvement: we cache the value to avoid repeatedly computing isBarrier() on a long RDD chain.
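A brief sketch of the pattern being discussed, with comments on why both the def and the cached backing value exist (the enclosing RDD class is assumed; the visibility of isBarrier_ follows the later quoted diffs):
// The public accessor stays a cheap def, so subclasses such as ShuffledRDD can override the
// backing value with a constant.
def isBarrier(): Boolean = isBarrier_

// The @transient lazy val caches the lineage traversal, so it runs at most once per RDD
// instance instead of walking the whole dependency chain on every call.
@transient protected lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier())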
// Mark all the partitions of the stage to be not finished.
def markAllPartitionsAsUnfinished(): Unit = {
  (0 until numPartitions).map(finished.update(_, false))
map -> foreach
is reset a better name?
@@ -1311,17 +1312,6 @@ class DAGScheduler(
    }
  }

case Resubmitted =>
why move the handling of Resubmitted after FetchFailure?
We may have a barrier task failed with reason Resubmitted, so case Resubmitted must be after case failure: TaskFailedReason if task.isBarrier, and we must make sure that the task failed reasons (except for FetchFailed) are handled inside the barrier task failure handling logic.
if (failedStage.rdd.isBarrier()) {
  failedStage match {
    case mapStage: ShuffleMapStage =>
please pick a different name. mapStage is already used before.
}

// TODO: mark the executor as failed only if there were lots of fetch failures on it
if (bmAddress != null) {
why move this before the if (shouldAbortStage) { ... ?
IIUC previously there was a race condition: you first post the ResubmitFailedStages message and then maybe unregister outputs; if the ResubmitFailedStages message is handled quickly enough, you may try to submit missing tasks before unregistering some shuffle blocks. This code change fixes the issue. Maybe this is worth being handled separately?
I think so
  sms.pendingPartitions += task.partitionId

case _ =>
  assert(false, "TaskSetManagers should only send Resubmitted task statuses for " +
assert(false, ...) is weird, please throw an exception directly.
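A sketch of that suggestion (the tail of the message is truncated in the quote above, so its completion here is assumed):
case _ =>
  // Surface the invariant violation as an exception instead of assert(false, ...).
  throw new SparkException("TaskSetManagers should only send Resubmitted task statuses for " +
    "tasks in ShuffleMapStages.")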
  taskScheduler.cancelTasks(stageId, interruptThread = false)
} catch {
  case e: UnsupportedOperationException =>
    // Cannot continue with barrier stage if failed to cancel zombie barrier tasks.
can we just leave the zombie tasks and ignore their completion events?
So far I don't see an easy way to mark a running task as not needed and prevent it from writing shuffle files/committing. Maybe we shall leave a TODO here?
SGTM
@@ -50,6 +50,7 @@ private[spark] class TaskDescription(
    val executorId: String,
    val name: String,
    val index: Int, // Index within this task's TaskSet
    val partitionId: Int,
I'm wondering if we need this. For a barrier stage we always retry the entire stage, so index must be equal to partitionId.
Actually you are right on this, but that makes the logic a little more difficult to understand, so I prefer to use partitionId explicitly.
@@ -346,6 +354,7 @@ private[spark] class TaskSchedulerImpl(
  // Build a list of tasks to assign to each worker.
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum
availableCpus.sum / CPUS_PER_TASK?
In case o.cores is not divisible by CPUS_PER_TASK, summing all availableCpus first and then dividing by CPUS_PER_TASK would produce a larger available slot count than reality.
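A small, self-contained illustration of the difference, with made-up offer sizes (cpusPerTask stands in for CPUS_PER_TASK):
// Hypothetical offers: three executors advertising 3 cores each, 2 CPUs needed per task.
val cores = Seq(3, 3, 3)
val cpusPerTask = 2
// Per-offer division (what the quoted code does): each offer really fits only one task.
val perOffer = cores.map(_ / cpusPerTask).sum   // 1 + 1 + 1 = 3 slots
// Summing first over-counts, because leftover cores on different executors cannot be combined.
val summedFirst = cores.sum / cpusPerTask       // 9 / 2 = 4 slots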
    tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
    tasks: IndexedSeq[ArrayBuffer[TaskDescription]],
    addresses: ArrayBuffer[String],
    taskDescs: ArrayBuffer[TaskDescription]) : Boolean = {
instead of filling 2 arrays here, can we just fill one ArrayBuffer[(String, Int)]?
Will this help save some memory or something? I was wondering what the name of the combined ArrayBuffer shall be but couldn't figure out a perfect one.
it's just cleaner. how about addressesWithIndice?
    cores: Int,
    // `address` is an optional hostPort string, it provides more useful information than `host`
    // when multiple executors are launched on the same host.
    address: Option[String] = None)
instead of having host and address, shall we just have host and port?
This follows what ExecutorData does, having both executorAddress and executorHost; IIUC the host of executorAddress doesn't necessarily have to be the same as executorHost.
@@ -627,6 +627,48 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
    assert(exc.getCause() != null)
    stream.close()
  }

  test("support barrier sync under local mode") {
we don't have barrier sync yet, do we need to test it now?
Updated the title, but I think the test body is still needed to make sure barrier execution mode works with SparkContext.
@@ -1055,6 +1055,64 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
    assert(sparkListener.failedStages.size == 1)
  }

  test("Retry all the tasks on a resubmitted attempt of a barrier stage caused by FetchFailure") {
do we have a test to make sure all the tasks are launched together for barrier stage?
Added test cases in TaskSchedulerImplSuite.
Test build #93282 has finished for PR 21758 at commit
Test build #93411 has finished for PR 21758 at commit
Test build #93412 has finished for PR 21758 at commit
/**
 * :: Experimental ::
 * Returns all the task infos in this barrier stage; the task infos are ordered by partitionId.
is there a particular reason why they must be ordered by partitionId?
The major reason is that tasks within the same barrier stage may need to communicate with each other; we order the task infos by partitionId so a task can find its peer tasks by index.
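A brief sketch of that usage from inside a barrier task, assuming the closure has a BarrierTaskContext named context in scope (as in the test snippet quoted earlier):
// getTaskInfos() is ordered by partitionId, so a task can address its peers by index.
val infos = context.getTaskInfos()
val myAddress = infos(context.partitionId()).address // this task's own entry
val peerAddresses = infos.map(_.address)             // addresses of all tasks in the stage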
logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " + | ||
"failed.") | ||
val message = s"Stage failed because barrier task $task finished unsuccessfully. " + | ||
s"${failure.toErrorString}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: unneeded s
Why is it not needed? Could you expand more on this?
sure, it can be just failure.toErrorString, no need to wrap it into s"..."
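A one-line illustration of the nit, based on the quoted diff:
// The second operand has no interpolated variables, so its s-prefix is redundant:
val message = s"Stage failed because barrier task $task finished unsuccessfully. " +
  failure.toErrorString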
@@ -110,4 +110,6 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    super.clearDependencies()
    prev = null
  }

  @transient protected lazy override val isBarrier_ : Boolean = false
nit: can't we here override def isBarrier to return false in order to avoid the unneeded synchronization introduced by lazy?
// Only update hosts for a barrier task.
if (taskSet.isBarrier) {
  // The executor address is expected to be non empty.
  addressesWithDescs += Tuple2(shuffledOffers(i).address.get, task)
nit: Tuple2(shuffledOffers(i).address.get, task) can be (shuffledOffers(i).address.get -> task)
Test build #93445 has finished for PR 21758 at commit
Test build #93446 has finished for PR 21758 at commit
retest this please
 */
@Experimental
@Since("2.4.0")
class BarrierTaskInfo(val address: String)
does this need to be a public API?
We make this a public API because BarrierTaskContext.getTaskInfos() will return a list of BarrierTaskInfos, so users have to access the class.
Can we just bake address into TaskInfo?
or is TaskInfo not a public API?
If we don't mind making TaskInfo a public API then I think it shall be fine to just put address into TaskInfo. The major concern is that TaskInfo has been stable for a long time, and do we want to potentially make frequent changes to it? (e.g. we may add more variables useful for barrier tasks, though I don't really have an example at hand)
Test build #93452 has finished for PR 21758 at commit
 */
@Experimental
@Since("2.4.0")
def mapPartitions[S: ClassTag](
if the only thing you can do on this is mapPartitions, is there any particular reason it's divided into two calls barrier().mapPartitions(), instead of just barrierMapPartitions() or something? Are there more things planned here?
I can see users expecting the ability to call other functions after .barrier(), eg. barrier().reduceByKey() or something. The compiler will help with this, but just wondering if we can make it more obvious.
RDDBarrier is actually expected to be used like a builder; we shall provide more options for the barrier stage in the future, e.g. configuring a timeout of a barrier stage.
 */
@Experimental
@Since("2.4.0")
def barrier(): RDDBarrier[T] = withScope(new RDDBarrier[T](this))
scheduling seems to have a very hard requirement that the number of partitions is less than the number of available task slots. It seems really hard for users to get this right. E.g., if I just do sc.textFile(...).barrier().mapPartitions(), the number of partitions is based on the hdfs input splits. I see lots of users getting confused by this -- it'll work sometimes, won't work other times, and they won't know why. Should there be some automatic repartitioning based on cluster resources? Or at least an api which lets users do this? Even repartition() isn't great here, because users don't want to think about cluster resources.
Em, thanks for raising this question. IMO we indeed require users to be aware of how many tasks they may launch for a barrier stage, and tasks may exchange internal data between each other in the middle, so users really care about the task numbers. I agree it shall be very useful to enable specifying the number of tasks in a barrier stage; maybe we can have RDDBarrier.coalesce(numPartitions: Int) to enforce the number of tasks to be launched together in a barrier stage.
was this addressed at all? is there another jira for it?
I opened https://issues.apache.org/jira/browse/SPARK-24941 for this.
if (!launchedAnyTask) {
  taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
// Skip the barrier taskSet if the available slots are less than the number of pending tasks.
if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
we should probably have a hard failure if DynamicAllocation is enabled until that is properly addressed.
As listed in the design doc, supporting barrier stages with dynamic resource allocation is a non-goal of this task. However, we do plan to better integrate this feature with dynamic resource allocation; hopefully we can get to work on this before Spark 3.0.
yeah I think it's fine to not support Dynamic Allocation in the initial version. I just think it would be better to have a failure right away if a user tries to use this with dynamic allocation, rather than some undefined behavior.
We plan to fail the job on submit if it requires more slots than available. Are there other scenarios we shall fail fast with dynamic allocation? IIUC the barrier tasks that have not been launched are still counted into the number of pending tasks, so dynamic resource allocation shall still be able to compute a correct expected number of executors.
You'll request the slots, but I think there are a lot more complications. The whole point of using dynamic allocation is on a multi-tenant cluster, so resources will come and go. If there aren't enough resources available on the cluster no matter what, then you'll see executors get acquired, have their idle timeout expire, get released, and then acquired again. This will be really confusing to the user, as it might look there is some progress with the constant logging about executors getting acquired and released, though really it would just wait indefinitely.
Or you might get deadlock with two concurrent applications. Even if they could fit on the cluster by themselves, they might both acquire some resources, which would prevent either of them from getting enough. Again, they'd both go through the same loop, of acquiring some resources, then having them hit the idle timeout and releasing them, then acquiring resources, but they might just continually trade resources between each other. They'd only advance by luck.
You have the similar problems with concurrent jobs within one spark application, but its a bit easier to control since at least the spark scheduler knows about everything.
We plan to fail the job on submit if it requires more slots than available.
what exactly do you mean by "available"? Its not so well defined for dynamic allocation. The resources you have right when the job is submitted? Also can you point me to where that is being done? I didn't see it here -- is it another jira?
Yea you made a really good point here; I've opened https://issues.apache.org/jira/browse/SPARK-24942 to track the cluster resource management issue.
what exactly do you mean by "available"? Its not so well defined for dynamic allocation. The resources you have right when the job is submitted? Also can you point me to where that is being done? I didn't see it here -- is it another jira?
This is tracked by https://issues.apache.org/jira/browse/SPARK-24819: we shall check all the barrier stages on job submit, to see whether the barrier stages require more slots (to be able to launch all the barrier tasks in the same stage together) than currently active slots in the cluster. If the job requires more slots than available (both busy and free slots), fail the job on submit.
it's totally fine to leave out properly supporting this for now, but given how confusing the current behavior will be with dynamic allocation I strongly feel like we need to fail fast if users try this with dynamic allocation for now. If you want to let users give it a shot anyway, you could add a conf to let users bypass the check.
Sure, I opened https://issues.apache.org/jira/browse/SPARK-24954 for this, will try to submit a PR soon.
@jiangxb1987, just curious in understanding how it works:
If it's required for a barrier taskset to have all the slots available at once in a single resourceOffers call, is there any mechanism to prevent starvation if the cluster is busy, and e.g. 100 available slots never become available at once, because new tasks are starting all the time?
Also, it computes availableSlots as a val before the loop. But what if the barrier taskSet is not the first one in the for (taskSet <- sortedTaskSets) loop, and a taskSet earlier in the loop already took some slots and there are not enough left for the barrier taskSet?
LGTM @squito Has @jiangxb1987 addressed your comments? We want to merge this PR ASAP and there are a few to-be-submitted PRs that depend on this PR. This feature is targeting the Spark 2.4 release.
 */
@Experimental
@Since("2.4.0")
def getTaskInfos(): Array[BarrierTaskInfo]
what other things do you expect to be included in the future in BarrierTaskInfo? It seems overkill to have a new class for a single field (address).
+1
@@ -27,7 +27,8 @@ import org.apache.spark.{Partition, TaskContext}
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
    preservesPartitioning: Boolean = false,
    isFromBarrier: Boolean = false)
we should explain what this flag does in classdoc
// For performance concerns, cache the value to avoid repeatedly computing `isBarrier()` on a
// long RDD chain.
@transient protected lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier())
you need to explain why MapPartitionsRDD has a different isBarrier implementation.
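A hedged sketch of what that explanation could look like, assuming MapPartitionsRDD combines its isFromBarrier flag with the parents' barrier status (the exact predicate in the PR may differ):
// MapPartitionsRDD is the RDD produced by RDDBarrier.mapPartitions, so it must report
// isBarrier() = true when it was created from an RDDBarrier (isFromBarrier), not only
// when one of its ancestors is a barrier RDD.
@transient protected lazy override val isBarrier_ : Boolean =
  isFromBarrier || dependencies.exists(_.rdd.isBarrier())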
@@ -60,4 +60,10 @@ private[spark] class ActiveJob(
  val finished = Array.fill[Boolean](numPartitions)(false)

  var numFinished = 0

  // Mark all the partitions of the stage to be not finished.
use /** */ style. also the sentence is a bit awkward. perhaps "Resets the status of all partitions in this stage so they are marked as not finished."
What's the failure mode if there are not enough slots for the barrier mode? We should throw an exception, right?
Yes, as mentioned in https://github.com/apache/spark/pull/21758/files/c16a47f0d15998133b9d61d8df5310f1f66b11b0#diff-d4000438827afe3a185ae75b24987a61R372 , we shall fail the job on submit if there are not enough slots for the barrier stage. I'll submit another PR to add this check (tracked by SPARK-24819).
Test build #93580 has finished for PR 21758 at commit
retest this please
Test build #93590 has finished for PR 21758 at commit
LGTM
Thanks! Merged to master. @jiangxb1987 Please submit the other PRs that are blocked by this ASAP.
@@ -1349,6 +1339,29 @@ class DAGScheduler(
      s"longer running")
  }

  if (mapStage.rdd.isBarrier()) {
this won't handle the case when the barrier stage is two shuffle stages back, right? you might need to go back many stages when there is a fetch failure.
btw what I really mean here is that I think you actually need to do this every time you create a task set: check if the task set is for a barrier stage, and if so, then launch all the tasks, regardless of existing output.
Previously I was thinking that by unregistering shuffle output, we may avoid modifying the submit-missing-tasks logic. Now I realized you have to launch all the tasks for a taskSet of a barrier stage anyway, so maybe the approach you mentioned is cleaner; I'll try to submit a follow up PR on that. Thanks!
I'd also like to see a test case for what happens if you do not have enough compute resources after a stage failure. After all, one of the key reasons for multiple stage attempts is hardware failure -- so you might have had just enough resources to run your barrier stage with 20 executors on the first attempt, but on the second attempt you're down to 19, and can't run it anymore.
hi @jiangxb1987 was this addressed? sorry if I missed it
import org.apache.spark.annotation.{Experimental, Since}

/** A [[TaskContext]] with extra info and tooling for a barrier stage. */
trait BarrierTaskContext extends TaskContext {
Please check the generated JavaDoc. I think it becomes a Java interface with only two methods defined here. We might want to define class BarrierTaskContext directly.
What changes were proposed in this pull request?
Propose new APIs and modify job/task scheduling to support barrier execution mode, which requires all tasks in the same barrier stage to start at the same time, and retries all tasks in case some tasks fail in the middle. The barrier execution mode is useful for some ML/DL workloads.
The proposed API changes include:
RDDBarrier that marks an RDD as barrier (Spark must launch all the tasks together for the current stage).
BarrierTaskContext that supports global sync of all tasks in a barrier stage, and provides extra BarrierTaskInfos.
In DAGScheduler, we retry all tasks of a barrier stage in case some tasks fail in the middle; this is achieved by unregistering map outputs for a shuffleId (for ShuffleMapStage) or clearing the finished partitions in an active job (for ResultStage).
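A minimal usage sketch of the proposed API (not part of the PR text; the closure shape that receives both the iterator and a BarrierTaskContext is assumed from the test snippet quoted above, and the app name and data are made up):
import org.apache.spark.{BarrierTaskContext, SparkConf, SparkContext}

object BarrierDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("barrier-demo").setMaster("local[4]"))
    val rdd = sc.parallelize(1 to 40, 4)
    // barrier() marks the stage as a barrier stage: all 4 tasks must be launched together.
    val result = rdd.barrier().mapPartitions { (it: Iterator[Int], context: BarrierTaskContext) =>
      context.barrier()                  // global sync point (still a no-op TODO in this PR)
      val peers = context.getTaskInfos() // ordered by partitionId
      it.map(_ + peers.length)
    }
    println(result.collect().mkString(","))
    sc.stop()
  }
}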
How was this patch tested?
Add RDDBarrierSuite to ensure we convert RDDs correctly;
Add new test cases in DAGSchedulerSuite to ensure we do task scheduling correctly;
Add new test cases in SparkContextSuite to ensure the barrier execution mode actually works (both under local mode and local cluster mode);
Add new test cases in TaskSchedulerImplSuite to ensure we schedule tasks for a barrier taskSet together.