[SPARK-38124][SS][FOLLOWUP] Add test to harden assumption of SS partitioning requirement

### What changes were proposed in this pull request?

This is a followup of #35419 (comment). It adds a unit test to harden the assumptions behind the Structured Streaming (SS) partitioning and distribution requirements:
* Check that `HashPartitioning.partitionIdExpression` has exactly the expected format.
* Check every kind of `Partitioning` against `StatefulOpClusteredDistribution`.
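
For illustration only, the shape that the first check pins down, `Pmod(Murmur3Hash(keys, 42), numPartitions)`, can be sketched in plain Scala. This is a rough sketch, not Spark code: `PartitionIdSketch`, `pmod`, `hashKeys` and `partitionId` are hypothetical names, and `scala.util.hashing.MurmurHash3` is only a stand-in for Spark's `Murmur3Hash` expression, so the numbers it produces will not match Spark's.

```scala
import scala.util.hashing.MurmurHash3

object PartitionIdSketch {
  // Positive modulo: the result is always in [0, n), even when `a` is negative.
  def pmod(a: Int, n: Int): Int = {
    val r = a % n
    if (r < 0) r + n else r
  }

  // Stand-in hash (assumption): Scala's MurmurHash3 with seed 42.
  // This is NOT Spark's Murmur3Hash and will not yield the same values.
  def hashKeys(keys: Seq[Any], seed: Int = 42): Int =
    MurmurHash3.orderedHash(keys, seed)

  // Partition ID = pmod(hash(keys), numPartitions).
  def partitionId(keys: Seq[Any], numPartitions: Int): Int =
    pmod(hashKeys(keys), numPartitions)
}
```

Under this model, changing the seed, the hash function, or the modulo step would presumably send the same key to a different partition, which is the kind of silent change the test is meant to catch.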

Also add a minor comment to `StatefulOpClusteredDistribution` noting that `SinglePartition` can also satisfy the distribution when `_requiredNumPartitions` is 1.
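
As a reading aid, the rules the new test asserts can be summarized with a toy model. This is a simplified sketch, not Spark's implementation: every type and function below (`HashPart`, `RangePart`, `SinglePart`, `PartCollection`, `StatefulOpClustered`, `satisfies`) is hypothetical, and plain strings stand in for Catalyst expressions.

```scala
object StatefulDistributionSketch {
  // Toy stand-ins (assumptions) for a few Spark partitionings.
  sealed trait Part
  case class HashPart(keys: Seq[String], numPartitions: Int) extends Part
  case class RangePart(keys: Seq[String], numPartitions: Int) extends Part
  case object SinglePart extends Part
  case class PartCollection(parts: Seq[Part]) extends Part

  // Toy stand-in for StatefulOpClusteredDistribution.
  case class StatefulOpClustered(keys: Seq[String], requiredNumPartitions: Int)

  def satisfies(p: Part, d: StatefulOpClustered): Boolean = p match {
    // Hash partitioning must match the keys exactly, in order, and the partition count.
    case HashPart(keys, n) => keys == d.keys && n == d.requiredNumPartitions
    // A single partition qualifies only when exactly one partition is required.
    case SinglePart => d.requiredNumPartitions == 1
    // A collection qualifies if any member qualifies.
    case PartCollection(parts) => parts.exists(satisfies(_, d))
    // Everything else (range partitioning here) does not qualify.
    case _ => false
  }
}
```

For example, `satisfies(SinglePart, StatefulOpClustered(Seq("a", "b", "c"), 1))` is true in this model, while the same call with 10 required partitions is false, mirroring the `SinglePartition` cases in the test.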

### Why are the changes needed?

Document our assumptions about SS in code, as a unit test.
The next time we introduce an intrusive code change, the unit test can save us by failing loudly.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

The added unit test itself.

Closes #35529 from c21/partition-test.

Authored-by: Cheng Su <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
c21 authored and HeartSaVioR committed Feb 15, 2022
1 parent ece34f0 commit 39166ed
Showing 2 changed files with 80 additions and 0 deletions.
@@ -101,6 +101,8 @@ case class ClusteredDistribution(
* Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the
* stateful operator, only [[HashPartitioning]] (and HashPartitioning in
* [[PartitioningCollection]]) can satisfy this distribution.
* When `_requiredNumPartitions` is 1, [[SinglePartition]] is essentially same as
* [[HashPartitioning]], so it can satisfy this distribution as well.
*
* NOTE: This is applied only to stream-stream join as of now. For other stateful operators, we
* have been using ClusteredDistribution, which could construct the physical partitioning of the
@@ -20,7 +20,9 @@ package org.apache.spark.sql.catalyst
import org.apache.spark.SparkFunSuite
/* Implicit conversions */
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{Literal, Murmur3Hash, Pmod}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.types.IntegerType

class DistributionSuite extends SparkFunSuite {

@@ -265,4 +267,80 @@ class DistributionSuite extends SparkFunSuite {
      ClusteredDistribution(Seq($"a", $"b", $"c"), Some(5)),
      false)
  }

  test("Structured Streaming output partitioning and distribution") {
    // Validate HashPartitioning.partitionIdExpression to be exactly expected format, because
    // Structured Streaming state store requires it to be consistent across Spark versions.
    val expressions = Seq($"a", $"b", $"c")
    val hashPartitioning = HashPartitioning(expressions, 10)
    hashPartitioning.partitionIdExpression match {
      case Pmod(Murmur3Hash(es, 42), Literal(10, IntegerType), _) =>
        assert(es.length == expressions.length && es.zip(expressions).forall {
          case (l, r) => l.semanticEquals(r)
        })
      case x => fail(s"Unexpected partitionIdExpression $x for $hashPartitioning")
    }

    // Validate only HashPartitioning (and HashPartitioning in PartitioningCollection) can satisfy
    // StatefulOpClusteredDistribution. SinglePartition can also satisfy this distribution when
    // `_requiredNumPartitions` is 1.
    checkSatisfied(
      HashPartitioning(Seq($"a", $"b", $"c"), 10),
      StatefulOpClusteredDistribution(Seq($"a", $"b", $"c"), 10),
      true)

    checkSatisfied(
      PartitioningCollection(Seq(
        HashPartitioning(Seq($"a", $"b", $"c"), 10),
        RangePartitioning(Seq($"a".asc, $"b".asc, $"c".asc), 10))),
      StatefulOpClusteredDistribution(Seq($"a", $"b", $"c"), 10),
      true)

    checkSatisfied(
      SinglePartition,
      StatefulOpClusteredDistribution(Seq($"a", $"b", $"c"), 1),
      true)

    checkSatisfied(
      PartitioningCollection(Seq(
        HashPartitioning(Seq($"a", $"b"), 1),
        SinglePartition)),
      StatefulOpClusteredDistribution(Seq($"a", $"b", $"c"), 1),
      true)

    checkSatisfied(
      HashPartitioning(Seq($"a", $"b"), 10),
      StatefulOpClusteredDistribution(Seq($"a", $"b", $"c"), 10),
      false)

    checkSatisfied(
      HashPartitioning(Seq($"a", $"b", $"c"), 5),
      StatefulOpClusteredDistribution(Seq($"a", $"b", $"c"), 10),
      false)

    checkSatisfied(
      RangePartitioning(Seq($"a".asc, $"b".asc, $"c".asc), 10),
      StatefulOpClusteredDistribution(Seq($"a", $"b", $"c"), 10),
      false)

    checkSatisfied(
      SinglePartition,
      StatefulOpClusteredDistribution(Seq($"a", $"b", $"c"), 10),
      false)

    checkSatisfied(
      BroadcastPartitioning(IdentityBroadcastMode),
      StatefulOpClusteredDistribution(Seq($"a", $"b", $"c"), 10),
      false)

    checkSatisfied(
      RoundRobinPartitioning(10),
      StatefulOpClusteredDistribution(Seq($"a", $"b", $"c"), 10),
      false)

    checkSatisfied(
      UnknownPartitioning(10),
      StatefulOpClusteredDistribution(Seq($"a", $"b", $"c"), 10),
      false)
  }
}
