-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-24795][CORE] Implement barrier execution mode #21758
Changes from all commits
d2f5b1a
93ccf4f
424bad2
377d1c0
ed89af8
51bbb0f
8c2306b
ef7dc5a
7b6def8
9229771
94e5237
9ae56d1
c16a47f
c7600c2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark | ||
|
||
import org.apache.spark.annotation.{Experimental, Since} | ||
|
||
/** A [[TaskContext]] with extra info and tooling for a barrier stage. */ | ||
trait BarrierTaskContext extends TaskContext { | ||
|
||
/** | ||
* :: Experimental :: | ||
* Sets a global barrier and waits until all tasks in this stage hit this barrier. Similar to | ||
* MPI_Barrier function in MPI, the barrier() function call blocks until all tasks in the same | ||
* stage have reached this routine. | ||
*/ | ||
@Experimental | ||
@Since("2.4.0") | ||
def barrier(): Unit | ||
|
||
/** | ||
* :: Experimental :: | ||
* Returns all the task infos in this barrier stage, ordered by partitionId.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is there a particular reason why they must be ordered by partitionId? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The major reason is that each tasks within the same barrier stage may need to communicate with each other, we order the task infos by partitionId so a task can find its peer tasks by index. |
||
*/ | ||
@Experimental | ||
@Since("2.4.0") | ||
def getTaskInfos(): Array[BarrierTaskInfo] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what other things do you expect to be included in the future in BarrierTaskInfo? It seems overkill to have a new class for a single field (address). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark | ||
|
||
import java.util.Properties | ||
|
||
import org.apache.spark.executor.TaskMetrics | ||
import org.apache.spark.memory.TaskMemoryManager | ||
import org.apache.spark.metrics.MetricsSystem | ||
|
||
/** A [[BarrierTaskContext]] implementation. */
private[spark] class BarrierTaskContextImpl(
    override val stageId: Int,
    override val stageAttemptNumber: Int,
    override val partitionId: Int,
    override val taskAttemptId: Long,
    override val attemptNumber: Int,
    override val taskMemoryManager: TaskMemoryManager,
    localProperties: Properties,
    @transient private val metricsSystem: MetricsSystem,
    // The default value is only used in tests.
    override val taskMetrics: TaskMetrics = TaskMetrics.empty)
  extends TaskContextImpl(stageId, stageAttemptNumber, partitionId, taskAttemptId, attemptNumber,
    taskMemoryManager, localProperties, metricsSystem, taskMetrics)
  with BarrierTaskContext {

  // TODO SPARK-24817 implement global barrier.
  override def barrier(): Unit = {}

  /**
   * Returns the task infos of all tasks in this barrier stage, ordered by partitionId.
   *
   * The addresses are read from the task-local property "addresses" as a comma-separated
   * "host:port" list. Blank entries are dropped: without the filter, an unset/empty property
   * would hit `"".split(",") == Array("")` and fabricate a single bogus [[BarrierTaskInfo]]
   * with an empty address.
   */
  override def getTaskInfos(): Array[BarrierTaskInfo] = {
    val addressesStr = localProperties.getProperty("addresses", "")
    addressesStr.split(",").map(_.trim()).filter(_.nonEmpty).map(new BarrierTaskInfo(_))
  }
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark | ||
|
||
import org.apache.spark.annotation.{Experimental, Since} | ||
|
||
|
||
/** | ||
* :: Experimental :: | ||
* Carries the task info (currently the executor address) of a single task in a barrier stage. | ||
* | ||
* @param address the IPv4 address (host:port) of the executor that a barrier task is running on | ||
*/ | ||
@Experimental | ||
@Since("2.4.0") | ||
class BarrierTaskInfo(val address: String)
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does this need to be a public API? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We make this a public API because the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we just bake address into TaskInfo? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. or is TaskIinfo not a public API? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we don't mind to make TaskInfo a public API then I think it shall be fine to just put address into TaskInfo. The major concern is TaskInfo have been stable for a long time and do we want to potentially make frequent changes to it? (e.g. may add more variables useful for barrier tasks, though I don't really have an example at hand) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,11 +23,21 @@ import org.apache.spark.{Partition, TaskContext} | |
|
||
/** | ||
* An RDD that applies the provided function to every partition of the parent RDD. | ||
* | ||
* @param prev the parent RDD. | ||
* @param f The function used to map a tuple of (TaskContext, partition index, input iterator) to | ||
* an output iterator. | ||
* @param preservesPartitioning Whether the input function preserves the partitioner, which should | ||
* be `false` unless `prev` is a pair RDD and the input function | ||
* doesn't modify the keys. | ||
* @param isFromBarrier Indicates whether this RDD is transformed from an RDDBarrier; a stage | ||
* containing at least one RDDBarrier shall be turned into a barrier stage. | ||
*/ | ||
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( | ||
var prev: RDD[T], | ||
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator) | ||
preservesPartitioning: Boolean = false) | ||
preservesPartitioning: Boolean = false, | ||
isFromBarrier: Boolean = false) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should explain what this flag does in classdoc |
||
extends RDD[U](prev) { | ||
|
||
override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None | ||
|
@@ -41,4 +51,7 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( | |
super.clearDependencies() | ||
prev = null | ||
} | ||
|
||
@transient protected lazy override val isBarrier_ : Boolean = | ||
isFromBarrier || dependencies.exists(_.rdd.isBarrier()) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,7 +33,7 @@ import org.apache.hadoop.mapred.TextOutputFormat | |
|
||
import org.apache.spark._ | ||
import org.apache.spark.Partitioner._ | ||
import org.apache.spark.annotation.{DeveloperApi, Since} | ||
import org.apache.spark.annotation.{DeveloperApi, Experimental, Since} | ||
import org.apache.spark.api.java.JavaRDD | ||
import org.apache.spark.internal.Logging | ||
import org.apache.spark.partial.BoundedDouble | ||
|
@@ -1647,6 +1647,14 @@ abstract class RDD[T: ClassTag]( | |
} | ||
} | ||
|
||
/** | ||
* :: Experimental :: | ||
* Indicates that Spark must launch the tasks together for the current stage. | ||
*/ | ||
@Experimental | ||
@Since("2.4.0") | ||
def barrier(): RDDBarrier[T] = withScope(new RDDBarrier[T](this)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. scheduling seems to have a very hard requirement that the number of partitions is less than the number of available task slots. It seems really hard for users to get this right. Eg., if I just do
the number of partitions is based on the hdfs input splits. I see lots of users getting confused by this -- it'll work sometimes, won't work other times, and they won't know why. Should there be some automatic repartitioning based on cluster resources? Or at least an api which lets users do this? Even There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Em, thanks for raising this question. IMO we indeed require users be aware of how many tasks they may launch for a barrier stage, and tasks may exchange internal data between each other in the middle, so users really care about the task numbers. I agree it shall be very useful to enable specify the number of tasks in a barrier stage, maybe we can have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. was this addressed at all? is there another jira for it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I opened https://issues.apache.org/jira/browse/SPARK-24941 for this. |
||
|
||
// ======================================================================= | ||
// Other internal methods and fields | ||
// ======================================================================= | ||
|
@@ -1839,6 +1847,23 @@ abstract class RDD[T: ClassTag]( | |
def toJavaRDD() : JavaRDD[T] = { | ||
new JavaRDD(this)(elementClassTag) | ||
} | ||
|
||
/** | ||
* Whether the RDD is in a barrier stage. Spark must launch all the tasks at the same time for a | ||
* barrier stage. | ||
* | ||
* An RDD is in a barrier stage if at least one of its parent RDD(s), or itself, is mapped from | ||
* an [[RDDBarrier]]. This function always returns false for a [[ShuffledRDD]], since a | ||
* [[ShuffledRDD]] indicates start of a new stage. | ||
* | ||
* A [[MapPartitionsRDD]] can be transformed from an [[RDDBarrier]], under that case the | ||
* [[MapPartitionsRDD]] shall be marked as barrier. | ||
*/ | ||
private[spark] def isBarrier(): Boolean = isBarrier_ | ||
|
||
// For performance concerns, cache the value to avoid repeatedly computing `isBarrier()` on a long | ||
// RDD chain. | ||
@transient protected lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you need to explain why mappartitionsrdd has a different isBarrier implementation. |
||
} | ||
|
||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.rdd | ||
|
||
import scala.reflect.ClassTag | ||
|
||
import org.apache.spark.BarrierTaskContext | ||
import org.apache.spark.TaskContext | ||
import org.apache.spark.annotation.{Experimental, Since} | ||
|
||
/** Represents an RDD barrier, which forces Spark to launch tasks of this stage together. */ | ||
class RDDBarrier[T: ClassTag](rdd: RDD[T]) { | ||
|
||
/** | ||
* :: Experimental :: | ||
* Maps partitions together with a provided BarrierTaskContext. | ||
* | ||
* `preservesPartitioning` indicates whether the input function preserves the partitioner, which | ||
* should be `false` unless `rdd` is a pair RDD and the input function doesn't modify the keys. | ||
*/ | ||
@Experimental | ||
@Since("2.4.0") | ||
def mapPartitions[S: ClassTag]( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. if the only thing you can do on this is mapPartitions, I can see users expecting the ability to call other functions after it. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
f: (Iterator[T], BarrierTaskContext) => Iterator[S], | ||
preservesPartitioning: Boolean = false): RDD[S] = rdd.withScope { | ||
val cleanedF = rdd.sparkContext.clean(f) | ||
new MapPartitionsRDD( | ||
rdd, | ||
(context: TaskContext, index: Int, iter: Iterator[T]) => | ||
cleanedF(iter, context.asInstanceOf[BarrierTaskContext]), | ||
preservesPartitioning, | ||
isFromBarrier = true | ||
) | ||
} | ||
|
||
/** TODO extra conf(e.g. timeout) */ | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please check the generated JavaDoc. I think it becomes a Java interface with only two methods defined here. We might want to define
class BarrierTaskContext
directly.