
[SPARK-48613][SQL] SPJ: Support auto-shuffle one side + less join keys than partition keys #47064

Closed

Conversation

szehon-ho
Contributor

@szehon-ho szehon-ho commented Jun 22, 2024

What changes were proposed in this pull request?

This is the final planned SPJ scenario: auto-shuffle one side + fewer join keys than partition keys. Background:

  • Auto-shuffle works by creating a ShuffleExchange for the non-partitioned side, with a clone of the partitioned side's KeyGroupedPartitioning.
  • "Fewer join keys than partition keys" works by 'projecting' all partition values by the join keys (i.e., keeping only the partition columns that are join columns). It makes a target KeyGroupedShuffleSpec with the 'projected' partition values and pushes it down to BatchScanExec. The BatchScanExec then 'groups' its projected partition values (except in the skew case, but that's a different story).

This combination is hard because the SPJ planning logic is spread across several places in this scenario. Given two sides, a non-partitioned side and a partitioned side, where the join keys are only a subset of the partition keys:

  1. EnsureRequirements creates the target KeyGroupedShuffleSpec from the join's required distribution (i.e., using only the join keys, not all partition keys).
  2. EnsureRequirements copies this to the non-partitioned side's KeyGroupedPartitioning (for the auto-shuffle case).
  3. BatchScanExec groups the partitions (for the partitioned side), including by join keys (if they differ from the partition keys).

Take the example partition columns (id, name) and partition values (1, "bob"), (2, "alice"), (2, "sam").
Projecting by the join key id leaves us with (1, 2, 2), and the final grouped partition values are (1, 2).
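
To make the projection and grouping concrete, here is a minimal plain-Scala sketch (ordinary collections, not the Spark internals) using this (id, name) example:

```scala
// Plain-Scala sketch of 'projecting' partition values to the join-key positions and
// then 'grouping' (de-duplicating) them; not the actual Spark implementation.
object ProjectionSketch extends App {
  val partitionValues: Seq[Seq[Any]] = Seq(Seq(1, "bob"), Seq(2, "alice"), Seq(2, "sam"))
  val joinKeyPositions: Seq[Int] = Seq(0) // join only on `id`

  // Projection: keep only the partition columns that are join columns.
  val projected = partitionValues.map(row => joinKeyPositions.map(row))
  println(projected) // List(List(1), List(2), List(2))

  // Grouping: de-duplicate the projected partition values.
  val grouped = projected.distinct
  println(grouped)   // List(List(1), List(2))
}
```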

The problem is that the two sides of the join do not always match. After steps 1 and 2, the partitioned side has the 'projected' partition values (1, 2, 2), and the non-partitioned side creates a matching KeyGroupedPartitioning (1, 2, 2) for the ShuffleExchange. But in step 3, the BatchScanExec for the partitioned side groups its partitions into (1, 2), while the non-partitioned side does not group and still retains (1, 2, 2) partitions. This leads to the following assertion error from the join:

requirement failed: PartitioningCollection requires all of its partitionings have the same numPartitions.
java.lang.IllegalArgumentException: requirement failed: PartitioningCollection requires all of its partitionings have the same numPartitions.
	at scala.Predef$.require(Predef.scala:337)
	at org.apache.spark.sql.catalyst.plans.physical.PartitioningCollection.<init>(partitioning.scala:550)
	at org.apache.spark.sql.execution.joins.ShuffledJoin.outputPartitioning(ShuffledJoin.scala:49)
	at org.apache.spark.sql.execution.joins.ShuffledJoin.outputPartitioning$(ShuffledJoin.scala:47)
	at org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputPartitioning(SortMergeJoinExec.scala:39)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.$anonfun$ensureDistributionAndOrdering$1(EnsureRequirements.scala:66)
	at scala.collection.immutable.Vector1.map(Vector.scala:2140)
	at scala.collection.immutable.Vector1.map(Vector.scala:385)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:65)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:657)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:632)
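
For reference, the failing check is essentially the requirement sketched below (a simplified stand-in, not the actual PartitioningCollection code): before the fix the grouped side reports 2 partitions while the shuffled side still reports 3.

```scala
// Simplified sketch of the failing check: all partitionings in a PartitioningCollection
// must report the same numPartitions.
object NumPartitionsCheck extends App {
  def requireSameNumPartitions(numPartitions: Seq[Int]): Unit =
    require(numPartitions.distinct.length == 1,
      "PartitioningCollection requires all of its partitionings have the same numPartitions.")

  // Before the fix: partitioned side grouped to 2 partitions, shuffled side still has 3.
  try requireSameNumPartitions(Seq(2, 3))
  catch { case e: IllegalArgumentException => println(e.getMessage) }

  // After the fix: both sides are de-duped in the first pass, so the check passes.
  requireSameNumPartitions(Seq(2, 2))
}
```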

The fix is to do the de-duplication in the first pass:

  1. Push the join keys down to BatchScanExec so that it returns a de-duped outputPartitioning (partitioned side).
  2. Create the non-partitioned side's KeyGroupedPartitioning with de-duped partition values (non-partitioned side).

Why are the changes needed?

This is the last planned SPJ scenario that is not yet supported.

How was this patch tested?

Updated an existing unit test in KeyGroupedPartitionSuite.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the SQL label Jun 22, 2024
Contributor Author

@szehon-ho szehon-ho left a comment


Some explanations

@@ -434,8 +434,13 @@ object KeyGroupedPartitioning {
val projectedOriginalPartitionValues =
originalPartitionValues.map(project(expressions, projectionPositions, _))

KeyGroupedPartitioning(projectedExpressions, projectedPartitionValues.length,
projectedPartitionValues, projectedOriginalPartitionValues)
val finalPartitionValues = projectedPartitionValues
Contributor Author


This is de-duping the non-partitioned side (while creating its KeyGroupedPartitioning based on the partitioned side).
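
A hedged sketch of the kind of de-dup this (truncated) hunk performs. It assumes the InternalRowComparableWrapper helper already used elsewhere in the SPJ code, since InternalRow has no value-based equality; this illustrates the idea and is not necessarily the exact merged code.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.util.InternalRowComparableWrapper
import org.apache.spark.sql.types.IntegerType

object DedupSketch extends App {
  // Projected partition values (1), (2), (2) from the (id, name) example above.
  val idExpr: Seq[Expression] = Seq(AttributeReference("id", IntegerType)())
  val projected: Seq[InternalRow] = Seq(InternalRow(1), InternalRow(2), InternalRow(2))

  // Wrap the rows so that `distinct` compares them by value, then unwrap.
  val deduped = projected
    .map(InternalRowComparableWrapper(_, idExpr))
    .distinct
    .map(_.row)

  println(deduped.length) // 2 -- matches the grouped partitioned side
}
```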

@@ -130,6 +130,15 @@ case class BatchScanExec(
}
k.copy(expressions = expressions, numPartitions = newPartValues.length,
partitionValues = newPartValues)
case k: KeyGroupedPartitioning if spjParams.joinKeyPositions.isDefined =>
Contributor Author


This is de-duping the outputPartitioning of the partitioned side.

Member


I wonder if it is better to combine this clause with the clause of

case k: KeyGroupedPartitioning if spjParams.commonPartitionValues.isDefined

above, since there is some common logic. It's also worthwhile to add some comments here, indicating that this case only applies when only one side of the join is KeyGroupedPartitioning and thus commonPartitionValues is not defined?

Contributor Author


Combined in the latest PR.

@@ -279,7 +288,8 @@ case class StoragePartitionJoinParams(
case other: StoragePartitionJoinParams =>
this.commonPartitionValues == other.commonPartitionValues &&
this.replicatePartitions == other.replicatePartitions &&
this.applyPartialClustering == other.applyPartialClustering
this.applyPartialClustering == other.applyPartialClustering &&
this.joinKeyPositions == other.joinKeyPositions
Contributor Author


We've always wondered why these fields are not in equals. It turns out they have to be, in order for node.mapChildren() to work properly; otherwise fastEquals will decide the node is unchanged and skip it even when these fields differ. Ref: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala#L1224
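
A minimal, self-contained illustration of that failure mode (hypothetical Params/mapNode names, not the Spark classes): a mapChildren-style rewrite keeps the old node whenever the rewritten node compares equal, so a field excluded from equals can never be updated through such a rewrite.

```scala
// The `equals` below deliberately ignores `positions`, mimicking the old
// StoragePartitionJoinParams behavior; the update is then silently dropped.
final class Params(val positions: Option[Seq[Int]], val replicate: Boolean) {
  override def equals(other: Any): Boolean = other match {
    case p: Params => this.replicate == p.replicate // bug: ignores `positions`
    case _ => false
  }
  override def hashCode(): Int = replicate.hashCode()
}

object FastEqualsDemo extends App {
  // Mimics TreeNode.mapChildren: keep the old node if the rewritten one is "equal".
  def mapNode(old: Params)(f: Params => Params): Params = {
    val updated = f(old)
    if (old == updated) old else updated
  }

  val before = new Params(positions = None, replicate = false)
  val after  = mapNode(before)(_ => new Params(Some(Seq(0)), replicate = false))
  println(after.positions) // None -- the new positions never made it into the tree
}
```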

@szehon-ho szehon-ho force-pushed the spj_less_join_key_auto_shuffle branch from 8838f8d to 8940f6e on June 22, 2024 02:03
@@ -175,7 +175,11 @@ case class EnsureRequirements(
child
case ((child, dist), idx) =>
if (bestSpecOpt.isDefined && bestSpecOpt.get.isCompatibleWith(specs(idx))) {
child
bestSpecOpt match {
Contributor Author


This is pushing down the join key to the BatchScanExec of the partitioned side (to do de-duping).
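
A hedged sketch of what this push-down amounts to (pushDownJoinKeyPositions is a made-up helper name; spjParams and joinKeyPositions are the fields shown in the hunks above): EnsureRequirements rewrites the partitioned side so its BatchScanExec knows the join-key positions and can de-dup its outputPartitioning.

```scala
object JoinKeyPushDownSketch {
  import org.apache.spark.sql.execution.SparkPlan
  import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

  // Hypothetical helper illustrating the push-down; not the exact merged code.
  def pushDownJoinKeyPositions(plan: SparkPlan, positions: Option[Seq[Int]]): SparkPlan =
    plan transformUp {
      case scan: BatchScanExec =>
        scan.copy(spjParams = scan.spjParams.copy(joinKeyPositions = positions))
    }
}
```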

Member


Can we add some comments here to explain that this is the case when isKeyGroupCompatible is false, but we still want to avoid shuffling on one side?

Contributor Author


Added a comment.

@szehon-ho
Contributor Author

@sunchao can you take a look?


@@ -130,6 +130,15 @@ case class BatchScanExec(
}
k.copy(expressions = expressions, numPartitions = newPartValues.length,
partitionValues = newPartValues)
case k: KeyGroupedPartitioning if spjParams.joinKeyPositions.isDefined =>
val expressions = spjParams.joinKeyPositions.get.map(i => k.expressions(i))
val newPartValues = k.partitionValues.map{r =>
Member


Just curious why this is necessary. I see that in BatchScanExec we are also doing something similar, by checking joinKeyPositions and calling uniquePartitionValues at the end to de-dup.

Contributor Author

@szehon-ho szehon-ho Jul 9, 2024


Yes, it is the same code; let me know if you think of a way to reuse it.

The current uniquePartitionValues is called only at the end (the deferred grouping, while calculating the BatchScanExec's inputRDD), so it's a bit too late. The same logic is needed in BatchScanExec.outputPartitioning (when it's not the partiallyClusteredDistribution case), to make the two sides match in this scenario after the first pass.

Member

@sunchao sunchao left a comment


LGTM, thanks @szehon-ho !

}


partitioning.expressions.forall{e =>
Member


nit: space after forall


@szehon-ho szehon-ho force-pushed the spj_less_join_key_auto_shuffle branch from 190a76f to 26589ec on July 13, 2024 17:45
@szehon-ho
Contributor Author

rebase

@sunchao sunchao closed this in 206cc1a Jul 14, 2024
@sunchao
Member

sunchao commented Jul 14, 2024

Merged to master. Thanks!

@szehon-ho
Contributor Author

Thanks @sunchao !

jingz-db pushed a commit to jingz-db/spark that referenced this pull request Jul 22, 2024
szehon-ho added a commit to szehon-ho/spark that referenced this pull request Aug 7, 2024
szehon-ho added a commit to szehon-ho/spark that referenced this pull request Sep 24, 2024
attilapiros pushed a commit to attilapiros/spark that referenced this pull request Oct 4, 2024
himadripal pushed a commit to himadripal/spark that referenced this pull request Oct 19, 2024