Skip to content

Commit

Permalink
[SPARK-24588][SS] streaming join should require HashClusteredPartitio…
Browse files Browse the repository at this point in the history
…ning from children

## What changes were proposed in this pull request?

In #19080 we simplified the distribution/partitioning framework, and make all the join-like operators require `HashClusteredDistribution` from children. Unfortunately streaming join operator was missed.

This can cause wrong result. Think about
```
val input1 = MemoryStream[Int]
val input2 = MemoryStream[Int]

val df1 = input1.toDF.select('value as 'a, 'value * 2 as 'b)
val df2 = input2.toDF.select('value as 'a, 'value * 2 as 'b).repartition('b)
val joined = df1.join(df2, Seq("a", "b")).select('a)
```

The physical plan is
```
*(3) Project [a#5]
+- StreamingSymmetricHashJoin [a#5, b#6], [a#10, b#11], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = <unknown>, runId = 54e31fce-f055-4686-b75d-fcd2b076f8d8, opId = 0, ver = 0, numPartitions = 5], 0, state cleanup [ left = null, right = null ]
   :- Exchange hashpartitioning(a#5, b#6, 5)
   :  +- *(1) Project [value#1 AS a#5, (value#1 * 2) AS b#6]
   :     +- StreamingRelation MemoryStream[value#1], [value#1]
   +- Exchange hashpartitioning(b#11, 5)
      +- *(2) Project [value#3 AS a#10, (value#3 * 2) AS b#11]
         +- StreamingRelation MemoryStream[value#3], [value#3]
```

The left table is hash partitioned by `a, b`, while the right table is hash partitioned by `b`. This means, we may have a matching record that is in different partitions, which should be in the output but not.

## How was this patch tested?

N/A

Author: Wenchen Fan <[email protected]>

Closes #21587 from cloud-fan/join.

(cherry picked from commit dc8a6be)
Signed-off-by: Xiao Li <[email protected]>
  • Loading branch information
cloud-fan authored and gatorsmile committed Jun 21, 2018
1 parent 3a4b6f3 commit a1e9640
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,19 @@ case class ClusteredDistribution(
* This is a strictly stronger guarantee than [[ClusteredDistribution]]. Given a tuple and the
* number of partitions, this distribution strictly requires which partition the tuple should be in.
*/
case class HashClusteredDistribution(expressions: Seq[Expression]) extends Distribution {
case class HashClusteredDistribution(
expressions: Seq[Expression],
requiredNumPartitions: Option[Int] = None) extends Distribution {
require(
expressions != Nil,
"The expressions for hash of a HashPartitionedDistribution should not be Nil. " +
"The expressions for hash of a HashClusteredDistribution should not be Nil. " +
"An AllTuples should be used to represent a distribution that only has " +
"a single partition.")

override def requiredNumPartitions: Option[Int] = None

override def createPartitioning(numPartitions: Int): Partitioning = {
assert(requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions,
s"This HashClusteredDistribution requires ${requiredNumPartitions.get} partitions, but " +
s"the actual number of partitions is $numPartitions.")
HashPartitioning(expressions, numPartitions)
}
}
Expand Down Expand Up @@ -163,11 +166,22 @@ trait Partitioning {
* i.e. the current dataset does not need to be re-partitioned for the `required`
* Distribution (it is possible that tuples within a partition need to be reorganized).
*
* A [[Partitioning]] can never satisfy a [[Distribution]] if its `numPartitions` does't match
* [[Distribution.requiredNumPartitions]].
*/
final def satisfies(required: Distribution): Boolean = {
required.requiredNumPartitions.forall(_ == numPartitions) && satisfies0(required)
}

/**
* The actual method that defines whether this [[Partitioning]] can satisfy the given
* [[Distribution]], after the `numPartitions` check.
*
* By default a [[Partitioning]] can satisfy [[UnspecifiedDistribution]], and [[AllTuples]] if
* the [[Partitioning]] only have one partition. Implementations can overwrite this method with
* special logic.
* the [[Partitioning]] only have one partition. Implementations can also overwrite this method
* with special logic.
*/
def satisfies(required: Distribution): Boolean = required match {
protected def satisfies0(required: Distribution): Boolean = required match {
case UnspecifiedDistribution => true
case AllTuples => numPartitions == 1
case _ => false
Expand All @@ -186,9 +200,8 @@ case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning
case object SinglePartition extends Partitioning {
val numPartitions = 1

override def satisfies(required: Distribution): Boolean = required match {
override def satisfies0(required: Distribution): Boolean = required match {
case _: BroadcastDistribution => false
case ClusteredDistribution(_, Some(requiredNumPartitions)) => requiredNumPartitions == 1
case _ => true
}
}
Expand All @@ -205,16 +218,15 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
override def nullable: Boolean = false
override def dataType: DataType = IntegerType

override def satisfies(required: Distribution): Boolean = {
super.satisfies(required) || {
override def satisfies0(required: Distribution): Boolean = {
super.satisfies0(required) || {
required match {
case h: HashClusteredDistribution =>
expressions.length == h.expressions.length && expressions.zip(h.expressions).forall {
case (l, r) => l.semanticEquals(r)
}
case ClusteredDistribution(requiredClustering, requiredNumPartitions) =>
expressions.forall(x => requiredClustering.exists(_.semanticEquals(x))) &&
(requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions)
case ClusteredDistribution(requiredClustering, _) =>
expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
case _ => false
}
}
Expand Down Expand Up @@ -246,15 +258,14 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
override def nullable: Boolean = false
override def dataType: DataType = IntegerType

override def satisfies(required: Distribution): Boolean = {
super.satisfies(required) || {
override def satisfies0(required: Distribution): Boolean = {
super.satisfies0(required) || {
required match {
case OrderedDistribution(requiredOrdering) =>
val minSize = Seq(requiredOrdering.size, ordering.size).min
requiredOrdering.take(minSize) == ordering.take(minSize)
case ClusteredDistribution(requiredClustering, requiredNumPartitions) =>
ordering.map(_.child).forall(x => requiredClustering.exists(_.semanticEquals(x))) &&
(requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions)
case ClusteredDistribution(requiredClustering, _) =>
ordering.map(_.child).forall(x => requiredClustering.exists(_.semanticEquals(x)))
case _ => false
}
}
Expand Down Expand Up @@ -295,7 +306,7 @@ case class PartitioningCollection(partitionings: Seq[Partitioning])
* Returns true if any `partitioning` of this collection satisfies the given
* [[Distribution]].
*/
override def satisfies(required: Distribution): Boolean =
override def satisfies0(required: Distribution): Boolean =
partitionings.exists(_.satisfies(required))

override def toString: String = {
Expand All @@ -310,7 +321,7 @@ case class PartitioningCollection(partitionings: Seq[Partitioning])
case class BroadcastPartitioning(mode: BroadcastMode) extends Partitioning {
override val numPartitions: Int = 1

override def satisfies(required: Distribution): Boolean = required match {
override def satisfies0(required: Distribution): Boolean = required match {
case BroadcastDistribution(m) if m == mode => true
case _ => false
}
Expand Down
Loading

0 comments on commit a1e9640

Please sign in to comment.