[SPARK-21865][SQL] simplify the distribution semantic of Spark SQL #19080
Conversation
case ShuffleExchange(childPartitioning, baseChild, _) :: Nil =>
  if (childPartitioning.guarantees(partitioning)) child else operator
case _ => operator
if (child.outputPartitioning.guarantees(partitioning)) {
this fixes a potential bug: the operator between the upper and lower shuffle may have its own partitioning, which means the upper shuffle is still necessary to change the partitioning. We don't have this kind of operator now but we may have it in the future.
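A minimal, self-contained sketch of the difference (toy types, not the actual `EnsureRequirements`/`ShuffleExchange` classes): the new check asks the immediate child for its output partitioning instead of reaching through to a lower shuffle, so an intermediate operator that changes the partitioning keeps the upper shuffle:

```scala
// Toy model only; names and structure are illustrative assumptions.
trait Partitioning { def guarantees(other: Partitioning): Boolean }
trait Plan { def outputPartitioning: Partitioning }
case class Shuffle(partitioning: Partitioning, child: Plan) extends Plan {
  def outputPartitioning: Partitioning = partitioning
}

// Keep the upper shuffle unless whatever sits directly below it already
// guarantees the required partitioning.
def removeRedundantShuffle(upper: Shuffle): Plan =
  if (upper.child.outputPartitioning.guarantees(upper.partitioning)) upper.child
  else upper
```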
children = children.zip(requiredChildDistributions).zipWithIndex.map {
  case ((child, distribution), index) if needCoPartitionChildIndex.contains(index) =>
    val defaultPartitioning = distribution.defaultPartitioning(numPartitions)
`defaultPartitioning` -> `targetPartitioning`?
 * requirement. Note that the `numPartitions` is just a hint, the returned [[Partitioning]] does
 * not have to have the same number of partitions.
 */
def defaultPartitioning(numPartitions: Int): Partitioning
Any better name? `satisfiedPartitioning`?
Given that `numPartitions` only makes sense for hash and range partitionings, is there a need to specify it as a hint in the default trait? Can we not just make it a parameter for the relevant partitioning scheme(s)?
`numPartitions` is a parameter for the relevant partitioning scheme(s); here we need a way to create a partitioning scheme for a given distribution, and we need this `numPartitions` as a hint.
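To make the hint semantics concrete, here is a minimal, self-contained sketch (toy classes, not Spark's actual `Distribution`/`Partitioning` hierarchy) of a distribution-side factory method that takes `numPartitions` only as a hint:

```scala
// Toy re-definitions for illustration only; these are not Spark's classes.
sealed trait Expression
case class Col(name: String) extends Expression

sealed trait Partitioning { def numPartitions: Int }
case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) extends Partitioning
case object SinglePartition extends Partitioning { val numPartitions = 1 }

sealed trait Distribution {
  // `numPartitionsHint` is only a hint: the returned partitioning need not use it.
  def defaultPartitioning(numPartitionsHint: Int): Partitioning
}
case class ClusteredDistribution(clustering: Seq[Expression]) extends Distribution {
  // Any partition count clusters equal keys together, so the hint is honored.
  def defaultPartitioning(numPartitionsHint: Int): Partitioning =
    HashPartitioning(clustering, numPartitionsHint)
}
case object AllTuples extends Distribution {
  // All tuples must be co-located, so the hint is ignored.
  def defaultPartitioning(numPartitionsHint: Int): Partitioning = SinglePartition
}
```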
- +1 for `satisfiedPartitioning`
- `numPartitions` => `numPartitionsHint`, so that the loose semantics gets highlighted
Test build #81221 has finished for PR 19080 at commit
@@ -153,6 +139,14 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering
var children: Seq[SparkPlan] = operator.children
assert(requiredChildDistributions.length == children.length)

val needCoPartitionChildIndex = requiredChildDistributions.zipWithIndex.filter {
  case (UnspecifiedDistribution, _) => false
Should we have a `needCoPartition()` method for distributions?
I don't think so. It's kind of a blacklist mechanism: only 2 special distributions don't need co-location.
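For reference, a self-contained sketch of the blacklist idea, assuming (as the class doc quoted later in this thread suggests) that the two exempt requirements are `UnspecifiedDistribution` and `BroadcastDistribution`; the types are simplified stand-ins, not Spark's:

```scala
// Simplified stand-ins, not Spark's classes.
sealed trait Distribution
case object UnspecifiedDistribution extends Distribution
case class BroadcastDistribution(mode: String) extends Distribution
case class ClusteredDistribution(keys: Seq[String]) extends Distribution

// Only Unspecified and Broadcast requirements are exempt from co-partitioning;
// every other distribution requirement needs it.
def needCoPartitionChildIndex(required: Seq[Distribution]): Seq[Int] =
  required.zipWithIndex.filter {
    case (UnspecifiedDistribution, _)  => false
    case (_: BroadcastDistribution, _) => false
    case _                             => true
  }.map(_._2)

// Example: children 0 and 2 must be co-partitioned, the broadcast child is not.
// needCoPartitionChildIndex(Seq(ClusteredDistribution(Seq("a")),
//   BroadcastDistribution("identity"), ClusteredDistribution(Seq("b")))) == Seq(0, 2)
```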
case object UnspecifiedDistribution extends Distribution
case object UnspecifiedDistribution extends Distribution {
  def defaultPartitioning(numPartitions: Int): Partitioning =
    throw new IllegalStateException("Do not know how to satisfy UnspecifiedDistribution")
At a conceptual level, this feels weird. One could question: why isn't any random partitioning able to satisfy `UnspecifiedDistribution`?
// First check if the existing partitions of the children all match. This means they are
// partitioned by the same partitioning into the same number of partitions. In that case,
// don't try to make them match `defaultPartitions`, just use the existing partitioning.
if (needCoPartitionChildIndex.length > 1) {
I liked your doc in `SparkPlan.scala`. Feels like it should be here, or there should be some reference to it over here. It will be easy for people to get lost trying to reason about what's going on here :)
.map(children.zip(requiredChildDistributions))
.forall { case (child, distribution) =>
  !child.outputPartitioning.guarantees(
    distribution.defaultPartitioning(maxChildrenNumPartitions))
Maybe not related to this change. A question: even if `maxChildrenNumPartitions` doesn't require us to shuffle all children, if it needs to shuffle most of the children, it's still a bad choice for the number of partitions, right?
Ideally we should pick the most frequent numPartitions, but for now the only operator needing co-partition is `SortMergeJoinExec`, which only has 2 children, so picking the max one is fine. We can revisit this later.
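A tiny sketch of the two strategies being contrasted, over a hypothetical list of child partition counts:

```scala
// Hypothetical partition counts of an operator's children.
val childNumPartitions: Seq[Int] = Seq(50, 50, 200)

// What the PR does: shuffle to the maximum count (forces two children to reshuffle here).
val maxChildrenNumPartitions = childNumPartitions.max                      // 200

// The possible refinement mentioned above: pick the most frequent count,
// which minimizes how many children need an extra shuffle (only one here).
val mostFrequentNumPartitions =
  childNumPartitions.groupBy(identity).maxBy(_._2.size)._1                 // 50
```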
 *
 * [[BroadcastDistribution]] is a special case, because a broadcasted child can always be
 * co-partitioned with others, so it's ok to mix [[BroadcastDistribution]] with other kind of
 * distribution requirements.
From `needCoPartitionChildIndex`, it looks like `UnspecifiedDistribution` is also not counted as one of the co-partition distributions?
I mentioned this at the beginning of this doc: "provides specific distribution requirements for more than one child". Here "specific distribution" excludes `UnspecifiedDistribution`; I'll make it more explicit.
also cc @marmbrus
Test build #81264 has finished for PR 19080 at commit
/**
 * Represents a distribution that only has a single partition and all tuples of the dataset
 * are co-located.
 */
case object AllTuples extends Distribution
case object AllTuples extends Distribution {
`SingleNodeDistribution`?
So I spoke with @sameeragarwal about this a little. The whole point here was to have a logical/physical separation (`Distribution` and `Partitioning` respectively). `AllTuples` could be `SingleNode` or it could be `Broadcast`. All the operation wants to know is that it's seeing all of the tuples; it shouldn't care about how that is being accomplished.

Now, since the first version, we have started to deviate from that. I'm not sure if this is still the right thing to do, but I wanted to give a little context.
I think the concern about logical/physical separation is only useful if we take CBO into consideration. E.g., for an `AllTuples` distribution requirement, the planner may produce two plans using `SinglePartition` and `BroadcastPartitioning` respectively and pick the cheaper one. In the scope of our current planner framework, this separation doesn't seem to be very useful, though.
Sure you can make that argument (I'm not sure I buy it), but then why does this PR still have two different concepts?
It seems we should be moving towards CBO, not away from it.
I'd like to keep `AllTuples`. `SingleNodeDistribution` is a special case of `AllTuples`, and it seems we do not really need the extra information introduced by `SingleNode`.
I'd like to keep the `Distribution` concept, otherwise it's very weird to say a `RangePartitioning` satisfies `HashPartitioning`, while it looks reasonable to say `RangePartitioning` satisfies `ClusteredDistribution`.
Yea, I did miss the point that `ClusteredDistribution` covers both `RangePartitioning` and `HashPartitioning`.
cc @yhuai
Have a question after reading the new approach. Let's say that we have a join like [...]. Also, regarding [...], can you give a concrete example?
This is not true now. After this PR, join has a stricter distribution requirement called [...]. I think this is reasonable; let's take join as an example. According to the definition of [...]: if [...]
So my whole point of view is: co-partition is a really tricky requirement, and it's really hard to implicitly guarantee it during shuffle planning. We should have a weaker guarantee (same number of partitions), and let the operator itself achieve the co-partition requirement through this guarantee plus a special distribution requirement. Also, in the future we may have operators that have distribution requirements for multiple children but don't need them to be co-partitioned.
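A self-contained illustration (plain Scala, not Spark's actual hash) of why the weaker guarantee is sufficient for a join: if both children place rows by the same deterministic function of the join keys and have the same number of partitions, matching rows land in partitions with the same index:

```scala
// Both sides must use the *same* deterministic function of the join keys.
def partitionId(joinKeys: Seq[Any], numPartitions: Int): Int = {
  val h = joinKeys.hashCode()          // stand-in for Spark's real hash function
  ((h % numPartitions) + numPartitions) % numPartitions
}

val numPartitions = 8
val leftKeys  = Seq[Any](1, 2)         // t1.a = 1, t1.b = 2
val rightKeys = Seq[Any](1, 2)         // t2.x = 1, t2.y = 2 (a matching row)

// Matching rows end up in the same partition index on both sides,
// so the join never has to look across partitions.
assert(partitionId(leftKeys, numPartitions) == partitionId(rightKeys, numPartitions))
```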
also cc @rxin: to support the "pre-shuffle" feature for data source v2, I need to create similar [...]
required match {
  case OrderedDistribution(requiredOrdering) =>
    val minSize = Seq(requiredOrdering.size, ordering.size).min
    requiredOrdering.take(minSize) == ordering.take(minSize)
While we are cleaning things up, this needs to be fixed: `RangePartitioning(a+, b+)` does not satisfy `OrderedDistribution(a+)`. It violates the requirement that all values of `a` need to be in the same partition.
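A made-up data layout illustrating the violation: range partitioning on `(a, b)` may split rows with the same `a` across partitions, which `OrderedDistribution(a+)` forbids:

```scala
// Range bounds chosen on (a, b): partition 0 holds rows up to (a = 1, b = 10).
val partition0 = Seq((1, 9), (1, 10))
// Partition 1 starts at (a = 1, b = 11): the value a = 1 is now split
// across two partitions, which OrderedDistribution(a) does not allow.
val partition1 = Seq((1, 11), (2, 3))
```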
I noticed this too, the current logic only guarantees ordering but not clustering. But this is orthogonal to this PR and we can fix it in another PR.
BTW this doesn't cause any problems, because `OrderedDistribution` is only used for the sort operator.
cc @rxin @JoshRosen @liancheng @sameeragarwal @gatorsmile @brkyvz any more comments?
Test build #82802 has finished for PR 19080 at commit
retest this please
Test build #84325 has finished for PR 19080 at commit
retest this please
Test build #85715 has finished for PR 19080 at commit
Just a few clarifying questions/comments to retain the existing `Distribution`/`Partitioning` semantics, but overall LGTM for doing away with the `compatibleWith` semantics.
 * This is a strictly stronger guarantee than [[ClusteredDistribution]]. Given a tuple and the
 * number of partitions, this distribution strictly requires which partition the tuple should be in.
 */
case class HashPartitionedDistribution(expressions: Seq[Expression]) extends Distribution {
Semantically, a `Partitioning` satisfies a `Distribution`, so it'd be better not to call this `HashPartitioned`. How about we call this `DeterministicClusteredDistribution` or `HashClusteredDistribution`? Also, perhaps this can just extend `ClusteredDistribution`?
Good idea, I'll rename it to `HashClusteredDistribution`. But I'd like to not extend `ClusteredDistribution`, since if a partitioning can satisfy `ClusteredDistribution`, it may not be able to satisfy `HashClusteredDistribution`. Thus we can't replace the parent with the child, which would violate the Liskov Substitution Principle.
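A toy `satisfies` model (simplified stand-ins, not Spark's implementation) that captures why the two requirements shouldn't be related by inheritance: `RangePartitioning` can satisfy `ClusteredDistribution` but not `HashClusteredDistribution`, so a plan valid for the former can't be substituted where the latter is required:

```scala
sealed trait Distribution
case class ClusteredDistribution(keys: Seq[String]) extends Distribution
case class HashClusteredDistribution(keys: Seq[String]) extends Distribution

sealed trait Partitioning { def satisfies(required: Distribution): Boolean }

case class HashPartitioning(keys: Seq[String], numPartitions: Int) extends Partitioning {
  def satisfies(required: Distribution): Boolean = required match {
    // Clustering only needs equal keys to stay together.
    case ClusteredDistribution(ks)     => keys.nonEmpty && keys.forall(ks.contains)
    // Pinning tuples to specific partitions needs the exact same hash keys.
    case HashClusteredDistribution(ks) => keys == ks
  }
}

case class RangePartitioning(ordering: Seq[String], numPartitions: Int) extends Partitioning {
  def satisfies(required: Distribution): Boolean = required match {
    // Equal keys fall into the same range, so clustering holds.
    case ClusteredDistribution(ks)    => ordering.nonEmpty && ordering.forall(ks.contains)
    // But which partition a key lands in depends on the data, not on a fixed
    // hash function, so the tuple's partition cannot be pinned.
    case _: HashClusteredDistribution => false
  }
}
```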
sounds good
}

/**
 * Represents data where tuples are broadcasted to every node. It is quite common that the
 * entire set of tuples is transformed into different data structure.
 */
case class BroadcastDistribution(mode: BroadcastMode) extends Distribution
case class BroadcastDistribution(mode: BroadcastMode) extends Distribution {
Similarly, how about `BroadcastPartitioning` just satisfying the `AllTuples` distribution?
yea good idea, but again, this is an existing problem, let's fix it in another PR.
@@ -73,46 +127,31 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
    "An AllTuples should be used to represent a distribution that only has " +
    "a single partition.")

  // TODO: This is not really valid...
  def clustering: Set[Expression] = ordering.map(_.child).toSet
  override def requiredNumPartitions: Option[Int] = None
Out of curiosity, should an `OrderedDistribution` make any guarantees around clustering? Do we care if "tuples that share the same value for the ordering expressions will never be split across partitions"?
According to the comment "This is a strictly stronger guarantee than [[ClusteredDistribution]]", we want to guarantee it. However, we actually don't respect it, see https://github.com/apache/spark/pull/19080/files#r136419947. Since it is an existing problem, I'd like to fix it in another PR.
LGTM
Test build #85786 has finished for PR 19080 at commit
retest this please
Test build #85791 has finished for PR 19080 at commit
thanks, merging to master/2.3!
## What changes were proposed in this pull request?

**The current shuffle planning logic**

1. Each operator specifies the distribution requirements for its children, via the `Distribution` interface.
2. Each operator specifies its output partitioning, via the `Partitioning` interface.
3. `Partitioning.satisfy` determines whether a `Partitioning` can satisfy a `Distribution`.
4. For each operator, check each child of it, and add a shuffle node above the child if the child's partitioning can not satisfy the required distribution.
5. For each operator, check if its children's output partitionings are compatible with each other, via `Partitioning.compatibleWith`.
6. If the check in 5 failed, add a shuffle above each child.
7. Try to eliminate the shuffles added in 6, via `Partitioning.guarantees`.

This design has a major problem with the definition of "compatible". `Partitioning.compatibleWith` is not well defined; ideally a `Partitioning` can't know whether it's compatible with another `Partitioning` without more information from the operator. For example, for `t1 join t2 on t1.a = t2.b`, `HashPartitioning(a, 10)` should be compatible with `HashPartitioning(b, 10)` in this case, but the partitioning itself doesn't know it.

As a result, currently `Partitioning.compatibleWith` always returns false except for literals, which makes it almost useless. This also means that if an operator has distribution requirements for multiple children, Spark always adds shuffle nodes to all the children (although some of them can be eliminated). However, there is no guarantee that the children's output partitionings are compatible with each other after adding these shuffles; we just assume that the operator will only specify `ClusteredDistribution` for multiple children.

I think it's very hard to guarantee children co-partition for all kinds of operators, and we can not even give a clear definition of co-partition between distributions like `ClusteredDistribution(a,b)` and `ClusteredDistribution(c)`.

I think we should drop the "compatible" concept in the distribution model, and let the operator achieve the co-partition requirement by special distribution requirements.

**Proposed shuffle planning logic after this PR** (the first 4 steps are the same as before)

1. Each operator specifies the distribution requirements for its children, via the `Distribution` interface.
2. Each operator specifies its output partitioning, via the `Partitioning` interface.
3. `Partitioning.satisfy` determines whether a `Partitioning` can satisfy a `Distribution`.
4. For each operator, check each child of it, and add a shuffle node above the child if the child's partitioning can not satisfy the required distribution.
5. For each operator, check if its children's output partitionings have the same number of partitions.
6. If the check in 5 failed, pick the max number of partitions from the children's output partitionings, and add a shuffle to each child whose number of partitions doesn't equal the max one.

The new distribution model is very simple: we only have one kind of relationship, which is `Partitioning.satisfy`. For multiple children, Spark only guarantees they have the same number of partitions, and it's the operator's responsibility to leverage this guarantee to achieve more complicated requirements. For example, non-broadcast joins can use the newly added `HashPartitionedDistribution` to achieve co-partition.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <[email protected]>

Closes #19080 from cloud-fan/exchange.
(cherry picked from commit eb45b52) Signed-off-by: Wenchen Fan <[email protected]>
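A condensed, self-contained sketch of the planning loop the description above outlines; the types and names are illustrative stand-ins, not the actual `EnsureRequirements` code:

```scala
// Toy types; real Spark uses SparkPlan, Distribution, Partitioning, etc.
sealed trait Distribution { def needsCoPartition: Boolean }
case class Clustered(keys: Seq[String]) extends Distribution { val needsCoPartition = true }
case object Unspecified extends Distribution { val needsCoPartition = false }

case class Partitioning(keys: Seq[String], numPartitions: Int) {
  def satisfies(d: Distribution): Boolean = d match {
    case Clustered(ks) => keys.nonEmpty && keys.forall(ks.contains)
    case Unspecified   => true
  }
}

case class Plan(outputPartitioning: Partitioning, children: Seq[Plan] = Nil)

// Adding a shuffle = re-partitioning the child by the required keys.
def shuffle(child: Plan, d: Distribution, numPartitions: Int): Plan = d match {
  case Clustered(ks) => Plan(Partitioning(ks, numPartitions), Seq(child))
  case Unspecified   => child
}

def ensureRequirements(required: Seq[Distribution], children: Seq[Plan]): Seq[Plan] = {
  // Step 4: shuffle any child whose partitioning doesn't satisfy its requirement.
  var newChildren = children.zip(required).map { case (c, d) =>
    if (c.outputPartitioning.satisfies(d)) c
    else shuffle(c, d, c.outputPartitioning.numPartitions)
  }
  // Steps 5-6: children that must be co-partitioned need the same partition count;
  // reshuffle those that differ from the maximum count among them.
  val coIdx = required.zipWithIndex.collect { case (d, i) if d.needsCoPartition => i }
  if (coIdx.size > 1) {
    val maxN = coIdx.map(i => newChildren(i).outputPartitioning.numPartitions).max
    newChildren = newChildren.zip(required).zipWithIndex.map {
      case ((c, d), i) if coIdx.contains(i) && c.outputPartitioning.numPartitions != maxN =>
        shuffle(c, d, maxN)
      case ((c, _), _) => c
    }
  }
  newChildren
}
```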
…ning from children

## What changes were proposed in this pull request?

In #19080 we simplified the distribution/partitioning framework, and made all the join-like operators require `HashClusteredDistribution` from children. Unfortunately the streaming join operator was missed. This can cause wrong results. Think about

```
val input1 = MemoryStream[Int]
val input2 = MemoryStream[Int]

val df1 = input1.toDF.select('value as 'a, 'value * 2 as 'b)
val df2 = input2.toDF.select('value as 'a, 'value * 2 as 'b).repartition('b)
val joined = df1.join(df2, Seq("a", "b")).select('a)
```

The physical plan is

```
*(3) Project [a#5]
+- StreamingSymmetricHashJoin [a#5, b#6], [a#10, b#11], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = <unknown>, runId = 54e31fce-f055-4686-b75d-fcd2b076f8d8, opId = 0, ver = 0, numPartitions = 5], 0, state cleanup [ left = null, right = null ]
   :- Exchange hashpartitioning(a#5, b#6, 5)
   :  +- *(1) Project [value#1 AS a#5, (value#1 * 2) AS b#6]
   :     +- StreamingRelation MemoryStream[value#1], [value#1]
   +- Exchange hashpartitioning(b#11, 5)
      +- *(2) Project [value#3 AS a#10, (value#3 * 2) AS b#11]
         +- StreamingRelation MemoryStream[value#3], [value#3]
```

The left table is hash partitioned by `a, b`, while the right table is hash partitioned by `b`. This means we may have a matching record that is in different partitions, which should be in the output but is not.

## How was this patch tested?

N/A

Author: Wenchen Fan <[email protected]>

Closes #21587 from cloud-fan/join.

(cherry picked from commit dc8a6be)
Signed-off-by: Xiao Li <[email protected]>