[SPARK-48613][SQL] SPJ: Support auto-shuffle one side + less join keys than partition keys #47064
Conversation
Some explanations
@@ -434,8 +434,13 @@ object KeyGroupedPartitioning {
      val projectedOriginalPartitionValues =
        originalPartitionValues.map(project(expressions, projectionPositions, _))

      KeyGroupedPartitioning(projectedExpressions, projectedPartitionValues.length,
        projectedPartitionValues, projectedOriginalPartitionValues)
      val finalPartitionValues = projectedPartitionValues
This is de-duping the non-partitioned side (while creating the KeyGroupedPartitioning based on the partitioned side).
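To illustrate the idea, a minimal standalone Scala sketch (not the actual Spark code; the names just mirror the PR): the shuffle side's partitioning has to be built from the de-duplicated projected partition values, so its numPartitions matches the grouped scan side.

```scala
// Hypothetical standalone sketch of the de-dup step; projectedPartitionValues
// mirrors the PR's naming but this is not the real Spark code.
object DedupSketch {
  def main(args: Array[String]): Unit = {
    // Projected partition values after keeping only join-key columns;
    // duplicates appear because several partitions share a join-key value.
    val projectedPartitionValues: Seq[Seq[Any]] = Seq(Seq(1), Seq(2), Seq(2))

    // De-dup so the shuffle side ends up with one partition per distinct
    // join-key value, matching the grouped partitioned side.
    val finalPartitionValues = projectedPartitionValues.distinct

    println(finalPartitionValues) // List(List(1), List(2))
  }
}
```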
@@ -130,6 +130,15 @@ case class BatchScanExec(
      }
      k.copy(expressions = expressions, numPartitions = newPartValues.length,
        partitionValues = newPartValues)
    case k: KeyGroupedPartitioning if spjParams.joinKeyPositions.isDefined =>
This is de-duping the outputPartitioning of the partitioned side.
I wonder if it is better to combine this clause with the clause of
case k: KeyGroupedPartitioning if spjParams.commonPartitionValues.isDefined
above, since there is some common logic. It's also worthwhile to add some comments here, indicating that this case only applies when only one side of the join is KeyGroupedPartitioning, and thus commonPartitionValues is not defined?
Combined in latest pr
@@ -279,7 +288,8 @@ case class StoragePartitionJoinParams(
    case other: StoragePartitionJoinParams =>
      this.commonPartitionValues == other.commonPartitionValues &&
        this.replicatePartitions == other.replicatePartitions &&
        this.applyPartialClustering == other.applyPartialClustering
        this.applyPartialClustering == other.applyPartialClustering &&
        this.joinKeyPositions == other.joinKeyPositions
We've always wondered why these fields are not in equals. It turns out they have to be, in order for node.mapChildren() to run properly: otherwise fastEquals will decide the node is unchanged and skip it, even when these fields differ. Ref: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala#L1224
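As a standalone illustration (a sketch, not Spark's actual TreeNode): fastEquals falls back to ==, so when equals ignores a field, a copy-if-changed traversal like mapChildren keeps the stale node whenever only that field changed.

```scala
// Standalone sketch of why fields omitted from equals break a
// copy-if-changed traversal. Params is a made-up stand-in class.
final class Params(val common: Seq[Int], val joinKeyPositions: Option[Seq[Int]]) {
  // Equality deliberately ignores joinKeyPositions, like the old code did.
  override def equals(other: Any): Boolean = other match {
    case p: Params => this.common == p.common
    case _ => false
  }
  override def hashCode(): Int = common.hashCode()
}

object FastEqualsSketch {
  // Mimics TreeNode.fastEquals: reference equality, then equals.
  def fastEquals(a: AnyRef, b: AnyRef): Boolean = (a eq b) || a == b

  def main(args: Array[String]): Unit = {
    val before = new Params(Seq(1, 2), None)
    val after  = new Params(Seq(1, 2), Some(Seq(0)))

    // A mapChildren-style rule compares old and new nodes and keeps the old
    // one when they look "equal" -- here the update would be silently dropped.
    println(fastEquals(before, after)) // true, so the changed node is skipped
  }
}
```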
@@ -175,7 +175,11 @@ case class EnsureRequirements(
          child
      case ((child, dist), idx) =>
        if (bestSpecOpt.isDefined && bestSpecOpt.get.isCompatibleWith(specs(idx))) {
          child
          bestSpecOpt match {
This is pushing down the join key to the BatchScanExec of the partitioned side (to do de-duping).
Can we add some comments here, to explain that this is the case when isKeyGroupCompatible is false, but we still want to avoid shuffling on one side?
Added comment
@sunchao can you take a look?
@@ -130,6 +130,15 @@ case class BatchScanExec(
      }
      k.copy(expressions = expressions, numPartitions = newPartValues.length,
        partitionValues = newPartValues)
    case k: KeyGroupedPartitioning if spjParams.joinKeyPositions.isDefined =>
      val expressions = spjParams.joinKeyPositions.get.map(i => k.expressions(i))
      val newPartValues = k.partitionValues.map{r =>
Just curious why this is necessary. I see that in BatchScanExec we are also doing something similar, by checking the joinKeyPositions and calling uniquePartitionValues at the end to de-dup.
Yea, it is the same code; let me know if you think of a way to reuse it.
The current uniquePartitionValues is called only at the end (the deferred grouping, while calculating the BatchScanExec's inputRDD), so it's a bit too late. The same logic is needed in BatchScanExec.outputPartitioning (if it's not the partiallyClusteredDistribution case), to make the two sides match in this scenario after the first pass.
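Conceptually, the shared logic is just "project each partition value down to the join-key positions, then de-dup". A hedged sketch with plain Scala collections (the real code operates on InternalRow via InternalRowComparableWrapper, so this only shows the shape of the logic):

```scala
// Plain-collections sketch of the projection + grouping that both the
// outputPartitioning and inputRDD paths need. Illustrative only: Spark's
// version wraps InternalRow for value-based equality.
object ProjectAndGroup {
  def uniqueProjectedValues(
      partitionValues: Seq[Seq[Any]],
      joinKeyPositions: Seq[Int]): Seq[Seq[Any]] = {
    partitionValues
      .map(row => joinKeyPositions.map(i => row(i))) // keep join-key columns
      .distinct                                      // de-dup the projections
  }

  def main(args: Array[String]): Unit = {
    val values = Seq(Seq(1, "bob"), Seq(2, "alice"), Seq(2, "sam"))
    println(uniqueProjectedValues(values, Seq(0))) // List(List(1), List(2))
  }
}
```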
LGTM, thanks @szehon-ho !
partitioning.expressions.forall{e => |
nit: space after forall
rebase
Merged to master. Thanks!
Thanks @sunchao !
What changes were proposed in this pull request?
This is the final planned SPJ scenario: auto-shuffle one side + less join keys than partition keys. Background:
- Auto-shuffle works by creating a ShuffleExchange for the non-partitioned side, with a clone of the partitioned side's KeyGroupedPartitioning.
- "Less join keys than partition keys" works by 'projecting' all partition values by the join keys (ie, keeping only the partition columns that are join columns). It makes a target KeyGroupedShuffleSpec with 'projected' partition values, and then pushes this down to BatchScanExec. The BatchScanExec then 'groups' its projected partition values (except in the skew case, but that's a different story).
This combination is hard because the SPJ planning logic is spread across several places in this scenario. Given two sides, a non-partitioned side and a partitioned side, where the join keys are only a subset of the partition keys:
1. EnsureRequirements creates the target KeyGroupedShuffleSpec from the join's required distribution (ie, using only the join keys, not all partition keys).
2. EnsureRequirements copies this to the non-partitioned side's KeyGroupedPartitioning (for the auto-shuffle case).
3. BatchScanExec groups the partitions (for the partitioned side), including by join keys (if they differ from partition keys).
Take the example of partition columns (id, name) and partition values (1, "bob"), (2, "alice"), (2, "sam").
Projection leaves us with (1, 2, 2), and the final grouped partition values are (1, 2), as the sketch below checks.
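A quick standalone Scala check of this worked example (illustrative only; plain tuples stand in for partition values):

```scala
// Worked version of the example in the description.
object SpjExample {
  def main(args: Array[String]): Unit = {
    // Partition columns (id, name); the join is on id only (position 0).
    val partitionValues = Seq((1, "bob"), (2, "alice"), (2, "sam"))

    val projected = partitionValues.map(_._1) // project by join keys
    println(projected)                        // List(1, 2, 2)

    val grouped = projected.distinct          // group the partitions
    println(grouped)                          // List(1, 2): 2 partitions,
                                              // vs 3 on the ungrouped side
  }
}
```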
The problem is that the two sides of the join do not match at all times. After steps 1 and 2, the partitioned side has the 'projected' partition values (1, 2, 2), and the non-partitioned side creates a matching KeyGroupedPartitioning (1, 2, 2) for the ShuffleExchange. But in step 3, the BatchScanExec for the partitioned side groups the partitions to become (1, 2), while the non-partitioned side does not group and still retains (1, 2, 2) partitions. This leads to the following assertion error from the join:

```
requirement failed: PartitioningCollection requires all of its partitionings have the same numPartitions.
java.lang.IllegalArgumentException: requirement failed: PartitioningCollection requires all of its partitionings have the same numPartitions.
at scala.Predef$.require(Predef.scala:337)
at org.apache.spark.sql.catalyst.plans.physical.PartitioningCollection.<init>(partitioning.scala:550)
at org.apache.spark.sql.execution.joins.ShuffledJoin.outputPartitioning(ShuffledJoin.scala:49)
at org.apache.spark.sql.execution.joins.ShuffledJoin.outputPartitioning$(ShuffledJoin.scala:47)
at org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputPartitioning(SortMergeJoinExec.scala:39)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.$anonfun$ensureDistributionAndOrdering$1(EnsureRequirements.scala:66)
at scala.collection.immutable.Vector1.map(Vector.scala:2140)
at scala.collection.immutable.Vector1.map(Vector.scala:385)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:65)
at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:657)
at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:632)
```
The fix is to do the de-duplication in the first pass:
1. Pushing down the join keys to the BatchScanExec so it returns a de-duped outputPartitioning (partitioned side).
2. Creating the non-partitioned side's KeyGroupedPartitioning with de-duped partition keys (non-partitioned side).
Why are the changes needed?
This is the last planned scenario for SPJ not yet supported.
How was this patch tested?
Update existing unit test in KeyGroupedPartitionSuite
Was this patch authored or co-authored using generative AI tooling?
No.