
[SPARK-21865][SQL] simplify the distribution semantic of Spark SQL #19080

Closed
wants to merge 2 commits

Conversation

@cloud-fan (Contributor) commented Aug 29, 2017

What changes were proposed in this pull request?

The current shuffle planning logic

  1. Each operator specifies the distribution requirements for its children, via the Distribution interface.
  2. Each operator specifies its output partitioning, via the Partitioning interface.
  3. Partitioning.satisfy determines whether a Partitioning can satisfy a Distribution.
  4. For each operator, check each of its children and add a shuffle node above any child whose output partitioning cannot satisfy the required distribution.
  5. For each operator, check whether its children's output partitionings are compatible with each other, via Partitioning.compatibleWith.
  6. If the check in step 5 fails, add a shuffle above each child.
  7. Try to eliminate the shuffles added in step 6, via Partitioning.guarantees.

This design has a major problem with the definition of "compatible".

Partitioning.compatibleWith is not well defined: a Partitioning cannot know whether it is compatible with another Partitioning without more information from the operator. For example, in t1 JOIN t2 ON t1.a = t2.b, HashPartitioning(a, 10) should be compatible with HashPartitioning(b, 10), but the partitioning itself has no way to know that.

As a result, Partitioning.compatibleWith currently always returns false except for literals, which makes it almost useless. This also means that if an operator has distribution requirements for multiple children, Spark always adds shuffle nodes to all of them (although some of these shuffles can later be eliminated). However, there is no guarantee that the children's output partitionings are compatible with each other after adding these shuffles; we simply assume that the operator will only specify ClusteredDistribution for multiple children.

I think it is very hard to guarantee co-partitioning of children for all kinds of operators, and we cannot even give a clear definition of co-partitioning between distributions like ClusteredDistribution(a,b) and ClusteredDistribution(c).

I think we should drop the "compatible" concept from the distribution model and let each operator achieve co-partitioning through special distribution requirements.

Proposed shuffle planning logic after this PR
(The first 4 steps are the same as before)

  1. Each operator specifies the distribution requirements for its children, via the Distribution interface.
  2. Each operator specifies its output partitioning, via the Partitioning interface.
  3. Partitioning.satisfy determines whether a Partitioning can satisfy a Distribution.
  4. For each operator, check each of its children and add a shuffle node above any child whose output partitioning cannot satisfy the required distribution.
  5. For each operator, check whether its children's output partitionings have the same number of partitions.
  6. If the check in step 5 fails, pick the maximum number of partitions among the children's output partitionings, and add a shuffle above each child whose number of partitions does not equal that maximum.

The new distribution model is very simple: we only have one kind of relationship, Partitioning.satisfy. For multiple children, Spark only guarantees that they have the same number of partitions, and it is the operator's responsibility to leverage this guarantee to achieve more complicated requirements. For example, non-broadcast joins can use the newly added HashPartitionedDistribution to achieve co-partitioning.
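
To make the proposed flow concrete, here is a minimal, self-contained sketch of steps 4-6. Everything in it is a hypothetical stand-in: `PlanNode`, the `shuffle` callback, and `ensureRequirements` are illustrative names only, not the actual `EnsureRequirements` rule.

```scala
// Hypothetical, simplified model of the proposed planning rule. The real Spark
// classes (SparkPlan, Distribution, Partitioning, EnsureRequirements) carry much
// more information; this only illustrates the control flow of steps 4-6.
trait Distribution
trait Partitioning {
  def numPartitions: Int
  def satisfies(required: Distribution): Boolean
}
trait PlanNode {
  def children: Seq[PlanNode]
  def requiredChildDistributions: Seq[Distribution]
  def outputPartitioning: Partitioning
  def withNewChildren(newChildren: Seq[PlanNode]): PlanNode
}

object ShufflePlanningSketch {
  def ensureRequirements(
      op: PlanNode,
      defaultNumPartitions: Int,
      // hypothetical helper: wrap `child` in an exchange whose output satisfies
      // `dist` with exactly `numPartitions` partitions
      shuffle: (PlanNode, Distribution, Int) => PlanNode): PlanNode = {

    // Step 4: shuffle any child whose partitioning can't satisfy its requirement.
    var kids = op.children.zip(op.requiredChildDistributions).map {
      case (child, dist) if child.outputPartitioning.satisfies(dist) => child
      case (child, dist) => shuffle(child, dist, defaultNumPartitions)
    }

    // Steps 5-6: children only need to agree on the number of partitions; if
    // they don't, re-shuffle the ones that differ from the maximum.
    val counts = kids.map(_.outputPartitioning.numPartitions)
    if (counts.distinct.size > 1) {
      val target = counts.max
      kids = kids.zip(op.requiredChildDistributions).map {
        case (child, _) if child.outputPartitioning.numPartitions == target => child
        case (child, dist) => shuffle(child, dist, target)
      }
    }
    op.withNewChildren(kids)
  }
}
```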

How was this patch tested?

Existing tests.

```scala
case ShuffleExchange(childPartitioning, baseChild, _) :: Nil =>
  if (childPartitioning.guarantees(partitioning)) child else operator
case _ => operator
if (child.outputPartitioning.guarantees(partitioning)) {
```
@cloud-fan (author):

This fixes a potential bug: the operator between the upper and lower shuffles may have its own output partitioning, in which case the upper shuffle is still necessary to change the partitioning. We don't have such an operator today, but we may in the future.

```scala
children = children.zip(requiredChildDistributions).zipWithIndex.map {
  case ((child, distribution), index) if needCoPartitionChildIndex.contains(index) =>
    val defaultPartitioning = distribution.defaultPartitioning(numPartitions)
```
Member:

defaultPartitioning -> targetPartitioning?

```scala
 * requirement. Note that the `numPartitions` is just a hint, the returned [[Partitioning]] does
 * not have to have the same number of partitions.
 */
def defaultPartitioning(numPartitions: Int): Partitioning
```
Member:

Any better name? satisfiedPartitioning?

Member:

Given that numPartitions only makes sense for hash and ranged partitions, is there a need to specify it as a hint in the default trait? Can we not just make it a parameter for the relevant partitioning scheme(s)?

@cloud-fan (author):

numPartitions is a parameter of the relevant partitioning scheme(s); here we need a way to create a partitioning scheme for a given distribution, and we need numPartitions as a hint for that.
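
For illustration, a hedged sketch of what such `defaultPartitioning` implementations could look like, reusing the existing catalyst `HashPartitioning` and `SinglePartition`; the `Distribution` trait and the two cases below are simplified stand-ins rather than the real classes:

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, SinglePartition}

// Stand-in trait for illustration only; not the real catalyst Distribution.
sealed trait Distribution {
  def defaultPartitioning(numPartitions: Int): Partitioning
}

case class ClusteredDistribution(clustering: Seq[Expression]) extends Distribution {
  // A hash partitioning on the clustering keys satisfies this distribution,
  // so it is free to honor the numPartitions hint.
  def defaultPartitioning(numPartitions: Int): Partitioning =
    HashPartitioning(clustering, numPartitions)
}

case object AllTuples extends Distribution {
  // Only a single partition can hold all tuples, so the hint is ignored: the
  // returned partitioning does not have the requested number of partitions.
  def defaultPartitioning(numPartitions: Int): Partitioning = SinglePartition
}
```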

Contributor:

  • +1 for satisfiedPartitioning
  • numPartitions => numPartitionsHint so that the loose semantics gets highlighted

@SparkQA commented Aug 29, 2017

Test build #81221 has finished for PR 19080 at commit 7b2314a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • sealed trait Distribution
  • case class BroadcastDistribution(mode: BroadcastMode) extends Distribution

```
@@ -153,6 +139,14 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
  val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering
  var children: Seq[SparkPlan] = operator.children
  assert(requiredChildDistributions.length == children.length)

  val needCoPartitionChildIndex = requiredChildDistributions.zipWithIndex.filter {
    case (UnspecifiedDistribution, _) => false
```
Contributor:

Should we have a needCoPartition() method for distributions?

@cloud-fan (author):

I don't think so. It's kind of a blacklist mechanism; only two special distributions don't need co-location.

```scala
case object UnspecifiedDistribution extends Distribution
case object UnspecifiedDistribution extends Distribution {
  def defaultPartitioning(numPartitions: Int): Partitioning =
    throw new IllegalStateException("Do not know how to satisfy UnspecifiedDistribution")
```
Contributor:

At a conceptual level, this feels weird. One could ask: why isn't an arbitrary partitioning able to satisfy UnspecifiedDistribution?

```scala
// First check if the existing partitions of the children all match. This means they are
// partitioned by the same partitioning into the same number of partitions. In that case,
// don't try to make them match `defaultPartitions`, just use the existing partitioning.
if (needCoPartitionChildIndex.length > 1) {
```
Contributor:

I liked your doc in SparkPlan.scala. It feels like it should be here, OR there should be some reference to it over here. It will be easy for people to get lost trying to reason about what's going on here :)

```scala
  .map(children.zip(requiredChildDistributions))
  .forall { case (child, distribution) =>
    !child.outputPartitioning.guarantees(
      distribution.defaultPartitioning(maxChildrenNumPartitions))
```
Member:

Maybe not related to this change, but a question: even if maxChildrenNumPartitions doesn't require us to shuffle all children, if it requires shuffling most of them, it's still a bad choice for the number of partitions, right?

@cloud-fan (author):

Ideally we should pick the most frequent numPartitions, but for now the only operator needing co-partitioning is SortMergeJoinExec, which has only 2 children, so picking the max is fine. We can revisit this later.
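
For reference, a hypothetical sketch of that "most frequent numPartitions" heuristic; it is not part of this PR, and the object and method names below are made up:

```scala
object NumPartitionsHeuristic {
  // Pick the partition count shared by the most children, so the fewest children
  // need to be re-shuffled; break ties in favor of the larger count.
  def mostFrequentNumPartitions(childNumPartitions: Seq[Int]): Int =
    childNumPartitions
      .groupBy(identity)
      .maxBy { case (numPartitions, occurrences) => (occurrences.size, numPartitions) }
      ._1
}

// e.g. mostFrequentNumPartitions(Seq(10, 10, 200)) == 10, whereas picking the
// max (the current behaviour) would re-shuffle two children instead of one.
```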

```scala
 *
 * [[BroadcastDistribution]] is a special case, because a broadcasted child can always be
 * co-partitioned with others, so it's ok to mix [[BroadcastDistribution]] with other kind of
 * distribution requirements.
```
Member:

From needCoPartitionChildIndex, it looks like UnspecifiedDistribution is also not counted as one of the co-partition distributions?

@cloud-fan (author):

I mentioned this at the beginning of this doc: "provides specific distribution requirements for more than one child", where "specific distribution" excludes UnspecifiedDistribution. I'll make it more explicit.

@cloud-fan changed the title from "[SPARK-21865][SQL] remove Partitioning.compatibleWith" to "[SPARK-21865][SQL] simplify the distribution semantic of Spark SQL" on Aug 30, 2017
@cloud-fan (author):

also cc @marmbrus

@SparkQA commented Aug 30, 2017

Test build #81264 has finished for PR 19080 at commit 9c870eb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • sealed trait Distribution
  • case class HashPartitionedDistribution(expressions: Seq[Expression]) extends Distribution
  • case class BroadcastDistribution(mode: BroadcastMode) extends Distribution
  • case class UnknownPartitioning(numPartitions: Int) extends Partitioning
  • case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning


```scala
/**
 * Represents a distribution that only has a single partition and all tuples of the dataset
 * are co-located.
 */
case object AllTuples extends Distribution
case object AllTuples extends Distribution {
```
Member:

SingleNodeDistribution?

@marmbrus (Contributor), Aug 30, 2017:

So I spoke with @sameeragarwal about this a little. The whole point here was to have a logical/physical separation (Distribution and Partitioning, respectively). AllTuples could be SingleNode or it could be Broadcast. All the operator wants to know is that it's seeing all of the tuples, and it shouldn't care about how that is accomplished.

Now, since the first version, we have started to deviate from that. I'm not sure if this is still the right thing to do, but I wanted to give a little context.

Contributor:

I think the concern about logical/physical separation is only useful if we take CBO into consideration. E.g., for an AllTuples distribution requirement, the planner may produce two plans using SinglePartition and BroadcastPartitioning respectively and pick the cheaper one. Within the scope of our current planner framework, though, this separation doesn't seem very useful.

Contributor:

Sure you can make that argument (I'm not sure I buy it), but then why does this PR still have two different concepts?

Contributor:

It seems we should be moving towards CBO, not away from it.

Contributor:

I'd like to keep AllTuples. SingleNodeDistribution is a special case of AllTuples, and it seems we do not really need the extra information introduced by SingleNode.

@cloud-fan (author):

I'd like to keep the Distribution concept; otherwise it's very weird to say that a RangePartitioning satisfies a HashPartitioning, while it looks reasonable to say that a RangePartitioning satisfies a ClusteredDistribution.

Contributor:

Yea, I did miss the point that ClusteredDistribution covers both RangePartitioning and HashPartitioning.

@gatorsmile (Member):

cc @yhuai

@yhuai (Contributor) commented Aug 30, 2017

Have a question after reading the new approach. Let's say we have a join like T1 JOIN T2 ON T1.a = T2.a. Also, T1 is hash partitioned by the value of T1.a and has 10 partitions, while T2 is range partitioned by the value of T2.a and has 10 partitions. Both sides will satisfy the required distribution of the join. However, we need to add an exchange on at least one side in order to produce the correct result. How will we handle this case with this change?

Also, regarding:

> For multiple children, Spark only guarantees they have the same number of partitions, and it's the operator's responsibility to leverage this guarantee to achieve more complicated requirements.

Can you give a concrete example?

@cloud-fan (author):

> Both sides will satisfy the required distribution of the join

This is not true anymore. After this PR, the join has a stricter distribution requirement called HashPartitionedDistribution, so range partitioning doesn't satisfy it and Spark will add a shuffle.

I think this is reasonable; ClusteredDistribution cannot represent the co-partitioning requirement of a join.

> Can you give a concrete example?

Let's take join as an example: t1 JOIN t2 ON t1.a = t2.x AND t1.b = t2.y. The join requires HashPartitionedDistribution(a, b) for t1 and HashPartitionedDistribution(x, y) for t2.

According to the definition of HashPartitionedDistribution, if t1 has a tuple (a=1, b=2), it will be in one specific partition, let's say the second partition of t1.

If t2 has a tuple (x=1, y=2), it will also be in the second partition of t2, because Spark guarantees that t1 and t2 have the same number of partitions, and HashPartitionedDistribution determines the partition given a tuple and numPartitions. So we can safely zip the partitions of t1 and t2 and do the join.
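
A hedged sketch of that argument, using a plain `hashCode`-modulo as a stand-in for Spark's actual hash partitioning function (which is Murmur3-based); the object and helper names below are made up for illustration:

```scala
object CoPartitionSketch extends App {
  // Both sides compute the partition index of a tuple from its join keys with
  // the same function and the same numPartitions, so matching tuples land at
  // the same index, and the i-th partition of t1 only needs to be joined with
  // the i-th partition of t2.
  def partitionIndex(joinKeys: Seq[Any], numPartitions: Int): Int = {
    val h = joinKeys.hashCode()
    ((h % numPartitions) + numPartitions) % numPartitions // non-negative modulo
  }

  val numPartitions = 10
  val t1Index = partitionIndex(Seq(1, 2), numPartitions) // t1 tuple with a=1, b=2
  val t2Index = partitionIndex(Seq(1, 2), numPartitions) // t2 tuple with x=1, y=2
  assert(t1Index == t2Index)
}
```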

@cloud-fan (author):

So my whole point of view is: co-partitioning is a really tricky requirement, and it's really hard to guarantee it implicitly during shuffle planning. We should provide a weaker guarantee (same number of partitions) and let the operator itself achieve the co-partitioning requirement via this guarantee and a special distribution requirement (HashPartitionedDistribution).

Also, in the future we may have operators that have distribution requirements for multiple children but don't need them to be co-partitioned.

@cloud-fan (author):

Also cc @rxin: to support the "pre-shuffle" feature for data source v2, I need to create similar Distribution and Partitioning interfaces in the data source package. However, the current model is too complex; I have no idea how to ask users to implement Partitioning.compatibleWith and Partitioning.guarantees properly.

```scala
required match {
  case OrderedDistribution(requiredOrdering) =>
    val minSize = Seq(requiredOrdering.size, ordering.size).min
    requiredOrdering.take(minSize) == ordering.take(minSize)
```
Contributor:

While we are cleaning things up, this needs to be fixed. RangePartitioning(a+, b+) does not satisfy OrderedDistribution(a+): it violates the requirement that tuples sharing the same value of a must all be in the same partition.

@cloud-fan (author):

I noticed this too; the current logic only guarantees ordering but not clustering. But this is orthogonal to this PR, and we can fix it in another one.

@cloud-fan (author):

BTW this doesn't cause any problems today, because OrderedDistribution is only used by the sort operator.

@cloud-fan (author):

cc @rxin @JoshRosen @liancheng @sameeragarwal @gatorsmile @brkyvz any more comments?

@SparkQA commented Oct 16, 2017

Test build #82802 has finished for PR 19080 at commit 639e9b6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • sealed trait Distribution
  • case class HashPartitionedDistribution(expressions: Seq[Expression]) extends Distribution
  • case class BroadcastDistribution(mode: BroadcastMode) extends Distribution
  • case class UnknownPartitioning(numPartitions: Int) extends Partitioning
  • case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning

@cloud-fan (author):

retest this please

@SparkQA commented Nov 30, 2017

Test build #84325 has finished for PR 19080 at commit 639e9b6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • sealed trait Distribution
  • case class HashPartitionedDistribution(expressions: Seq[Expression]) extends Distribution
  • case class BroadcastDistribution(mode: BroadcastMode) extends Distribution
  • case class UnknownPartitioning(numPartitions: Int) extends Partitioning
  • case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning

@cloud-fan (author):

retest this please

@SparkQA commented Jan 5, 2018

Test build #85715 has finished for PR 19080 at commit 639e9b6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • sealed trait Distribution
  • case class HashPartitionedDistribution(expressions: Seq[Expression]) extends Distribution
  • case class BroadcastDistribution(mode: BroadcastMode) extends Distribution
  • case class UnknownPartitioning(numPartitions: Int) extends Partitioning
  • case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning

@sameeragarwal (Member) left a comment:

Just a few clarifying questions/comments about retaining the existing Distribution/Partitioning semantics, but overall LGTM for doing away with the compatibleWith semantics.

```scala
 * This is a strictly stronger guarantee than [[ClusteredDistribution]]. Given a tuple and the
 * number of partitions, this distribution strictly requires which partition the tuple should be in.
 */
case class HashPartitionedDistribution(expressions: Seq[Expression]) extends Distribution {
```
Member:

Semantically, a Partitioning satisfies a Distribution, so it'd be better not to call this HashPartitioned. How about we call this DeterministicClusteredDistribution or HashClusteredDistribution? Also, perhaps this can just extend ClusteredDistribution?

@cloud-fan (author):

Good idea, I'll rename it to HashClusteredDistribution. But I'd like it not to extend ClusteredDistribution: a partitioning that can satisfy ClusteredDistribution may not be able to satisfy HashClusteredDistribution, so substituting the child type where the parent is expected would not be safe, which violates the Liskov Substitution Principle.
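
A minimal sketch of this substitution argument, with simplified stand-in types (String keys, a toy satisfies check) rather than the real catalyst classes:

```scala
// Simplified stand-ins for the types discussed above.
sealed trait Distribution
case class ClusteredDistribution(keys: Seq[String]) extends Distribution
case class HashClusteredDistribution(keys: Seq[String]) extends Distribution

case class RangePartitioning(keys: Seq[String], numPartitions: Int) {
  def satisfies(required: Distribution): Boolean = required match {
    // Range partitioning on a subset of the required clustering keys co-locates
    // equal clustering values, which is enough for ClusteredDistribution...
    case ClusteredDistribution(ks) => keys.forall(ks.contains)
    // ...but it does not pin a given key value to a specific partition index,
    // so it can never satisfy the hash-based requirement.
    case HashClusteredDistribution(_) => false
  }
}

// If HashClusteredDistribution extended ClusteredDistribution, code written
// against ClusteredDistribution (e.g. "a range partitioning on my keys is good
// enough") could be handed a HashClusteredDistribution and silently lose the
// stronger guarantee, which is the Liskov substitution concern above.
```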

Member:

sounds good

```scala
}

/**
 * Represents data where tuples are broadcasted to every node. It is quite common that the
 * entire set of tuples is transformed into different data structure.
 */
case class BroadcastDistribution(mode: BroadcastMode) extends Distribution
case class BroadcastDistribution(mode: BroadcastMode) extends Distribution {
```
Member:

Similarly, how about BroadcastPartitioning just satisfying the AllTuples distribution?

@cloud-fan (author):

Yea, good idea, but again this is an existing problem; let's fix it in another PR.

```
@@ -73,46 +127,31 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
    "An AllTuples should be used to represent a distribution that only has " +
    "a single partition.")

  // TODO: This is not really valid...
  def clustering: Set[Expression] = ordering.map(_.child).toSet
  override def requiredNumPartitions: Option[Int] = None
```
Member:

Out of curiosity, should an OrderedDistribution make any guarantees around clustering? Do we care if "tuples that share the same value for the ordering expressions will never be split across partitions"?

@cloud-fan (author):

According to the comment "This is a strictly stronger guarantee than [[ClusteredDistribution]]", we want to guarantee it. However, we don't actually respect it; see https://github.com/apache/spark/pull/19080/files#r136419947

Since it is an existing problem, I'd like to fix it in another PR.

@sameeragarwal (Member):

LGTM

@SparkQA commented Jan 8, 2018

Test build #85786 has finished for PR 19080 at commit a2f1bc1.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class HashClusteredDistribution(expressions: Seq[Expression]) extends Distribution

@cloud-fan (author):

retest this please

@SparkQA commented Jan 8, 2018

Test build #85791 has finished for PR 19080 at commit a2f1bc1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class HashClusteredDistribution(expressions: Seq[Expression]) extends Distribution

@cloud-fan (author):

thanks, merging to master/2.3!

asfgit pushed a commit that referenced this pull request Jan 8, 2018

Author: Wenchen Fan <[email protected]>

Closes #19080 from cloud-fan/exchange.

(cherry picked from commit eb45b52)
Signed-off-by: Wenchen Fan <[email protected]>
@asfgit closed this in eb45b52 on Jan 8, 2018
asfgit pushed a commit that referenced this pull request Jun 21, 2018
…ning from children

## What changes were proposed in this pull request?

In #19080 we simplified the distribution/partitioning framework and made all the join-like operators require `HashClusteredDistribution` from their children. Unfortunately, the streaming join operator was missed.

This can cause wrong results. Consider
```
val input1 = MemoryStream[Int]
val input2 = MemoryStream[Int]

val df1 = input1.toDF.select('value as 'a, 'value * 2 as 'b)
val df2 = input2.toDF.select('value as 'a, 'value * 2 as 'b).repartition('b)
val joined = df1.join(df2, Seq("a", "b")).select('a)
```

The physical plan is
```
*(3) Project [a#5]
+- StreamingSymmetricHashJoin [a#5, b#6], [a#10, b#11], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = <unknown>, runId = 54e31fce-f055-4686-b75d-fcd2b076f8d8, opId = 0, ver = 0, numPartitions = 5], 0, state cleanup [ left = null, right = null ]
   :- Exchange hashpartitioning(a#5, b#6, 5)
   :  +- *(1) Project [value#1 AS a#5, (value#1 * 2) AS b#6]
   :     +- StreamingRelation MemoryStream[value#1], [value#1]
   +- Exchange hashpartitioning(b#11, 5)
      +- *(2) Project [value#3 AS a#10, (value#3 * 2) AS b#11]
         +- StreamingRelation MemoryStream[value#3], [value#3]
```

The left table is hash partitioned by `a, b`, while the right table is hash partitioned by `b`. This means matching records may end up in different partitions; such records should appear in the output but do not.

## How was this patch tested?

N/A

Author: Wenchen Fan <[email protected]>

Closes #21587 from cloud-fan/join.

(cherry picked from commit dc8a6be)
Signed-off-by: Xiao Li <[email protected]>