-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-31869][SQL] BroadcastHashJoinExec can utilize the build side for its output partitioning #28676
Closed
Closed
[SPARK-31869][SQL] BroadcastHashJoinExec can utilize the build side for its output partitioning #28676
Changes from 7 commits
Commits
Show all changes
19 commits
Select commit
Hold shift + click to select a range
93947ab
initial checkin
imback82 225e250
Merge branch 'master' into broadcast_join_output
imback82 985834b
update comment
imback82 683a705
fix BroadcastJoinSuite tests, address comment on join type.
imback82 6fa753f
Merge branch 'master' into broadcast_join_output
imback82 dedce0c
Merge branch 'master' into broadcast_join_output
imback82 488e051
Add more checks
imback82 cac3829
Revert back to previous impl.
imback82 febc402
remove import
imback82 1ea931b
Address PR comments
imback82 c5f4803
formatting
imback82 63fdb0f
Merge branch 'master' into broadcast_join_output
imback82 126ee53
address PR comments
imback82 794890f
Merge branch 'master' into broadcast_join_output
imback82 afa5aca
Address PR comments
imback82 51187dc
Address PR comments
imback82 80df4dc
Address PR comment
imback82 ba19acb
clean up header + change it to support inner-like joins.
imback82 9caeecd
Address PR comments
imback82 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,10 +24,11 @@ import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession} | |
import org.apache.spark.sql.catalyst.expressions.{BitwiseAnd, BitwiseOr, Cast, Literal, ShiftLeft} | ||
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide} | ||
import org.apache.spark.sql.catalyst.plans.logical.BROADCAST | ||
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, PartitioningCollection} | ||
import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegenExec} | ||
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite} | ||
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec | ||
import org.apache.spark.sql.execution.exchange.EnsureRequirements | ||
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec} | ||
import org.apache.spark.sql.functions._ | ||
import org.apache.spark.sql.internal.SQLConf | ||
import org.apache.spark.sql.test.SQLTestUtils | ||
|
@@ -415,6 +416,95 @@ abstract class BroadcastJoinSuiteBase extends QueryTest with SQLTestUtils | |
assert(e.getMessage.contains(s"Could not execute broadcast in $timeout secs.")) | ||
} | ||
} | ||
|
||
test("broadcast join where streamed side's output partitioning is PartitioningCollection") { | ||
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "500") { | ||
val t1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1") | ||
val t2 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i2", "j2") | ||
val t3 = (0 until 20).map(i => (i % 7, i % 11)).toDF("i3", "j3") | ||
val t4 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i4", "j4") | ||
|
||
// join1 is a sort merge join (shuffle on both sides). | ||
val join1 = t1.join(t2, t1("i1") === t2("i2")) | ||
val plan1 = join1.queryExecution.executedPlan | ||
assert(collect(plan1) { case s: SortMergeJoinExec => s }.size == 1) | ||
assert(collect(plan1) { case e: ShuffleExchangeExec => e }.size == 2) | ||
|
||
// join2 is a broadcast join where t3 is broadcasted. Note that output partitioning on the | ||
// streamed side (join1) is PartitioningCollection (sort merge join) | ||
val join2 = join1.join(t3, join1("i1") === t3("i3")) | ||
val plan2 = join2.queryExecution.executedPlan | ||
assert(collect(plan2) { case s: SortMergeJoinExec => s }.size == 1) | ||
assert(collect(plan2) { case e: ShuffleExchangeExec => e }.size == 2) | ||
val broadcastJoins = collect(plan2) { case b: BroadcastHashJoinExec => b } | ||
assert(broadcastJoins.size == 1) | ||
broadcastJoins(0).outputPartitioning match { | ||
case p: PartitioningCollection | ||
if p.partitionings.forall(_.isInstanceOf[HashPartitioning]) => | ||
// two partitionings from sort merge join and one from build side. | ||
assert(p.partitionings.size == 3) | ||
case _ => fail() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: For better test error messages,
Or, could you add error messages in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed as suggested. |
||
} | ||
|
||
// Join on the column from the broadcasted side (i3) and make sure output partitioning | ||
// is maintained by checking no shuffle exchange is introduced. Note that one extra | ||
// ShuffleExchangeExec is from t4, not from join2. | ||
val join3 = join2.join(t4, join2("i3") === t4("i4")) | ||
val plan3 = join3.queryExecution.executedPlan | ||
assert(collect(plan3) { case s: SortMergeJoinExec => s }.size == 2) | ||
assert(collect(plan3) { case b: BroadcastHashJoinExec => b }.size == 1) | ||
assert(collect(plan3) { case e: ShuffleExchangeExec => e }.size == 3) | ||
|
||
// Validate the data with broadcast join off. | ||
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { | ||
val df = join2.join(t4, join2("i3") === t4("i4")) | ||
QueryTest.sameRows(join3.collect().toSeq, df.collect().toSeq) | ||
imback82 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
} | ||
|
||
test("broadcast join where streamed side's output partitioning is HashPartitioning") { | ||
withTable("t1", "t3") { | ||
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "500") { | ||
val df1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1") | ||
val df2 = (0 until 20).map(i => (i % 7, i % 11)).toDF("i2", "j2") | ||
val df3 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i3", "j3") | ||
df1.write.format("parquet").bucketBy(8, "i1", "j1").saveAsTable("t1") | ||
df3.write.format("parquet").bucketBy(8, "i3", "j3").saveAsTable("t3") | ||
val t1 = spark.table("t1") | ||
val t3 = spark.table("t3") | ||
|
||
// join1 is a broadcast join where df2 is broadcasted. Note that output partitioning on the | ||
// streamed side (t1) is HashPartitioning (bucketed files). | ||
val join1 = t1.join(df2, t1("i1") === df2("i2") && t1("j1") === df2("j2")) | ||
val plan1 = join1.queryExecution.executedPlan | ||
assert(collect(plan1) { case e: ShuffleExchangeExec => e }.isEmpty) | ||
val broadcastJoins = collect(plan1) { case b: BroadcastHashJoinExec => b } | ||
assert(broadcastJoins.size == 1) | ||
broadcastJoins(0).outputPartitioning match { | ||
case p: PartitioningCollection | ||
if p.partitionings.forall(_.isInstanceOf[HashPartitioning]) => | ||
// one partitioning from streamed side and one from build side. | ||
assert(p.partitionings.size == 2) | ||
case _ => fail() | ||
} | ||
|
||
// Join on the column from the broadcasted side (i2, j2) and make sure output partitioning | ||
// is maintained by checking no shuffle exchange is introduced. | ||
val join2 = join1.join(t3, join1("i2") === t3("i3") && join1("j2") === t3("j3")) | ||
val plan2 = join2.queryExecution.executedPlan | ||
assert(collect(plan2) { case s: SortMergeJoinExec => s }.size == 1) | ||
assert(collect(plan2) { case b: BroadcastHashJoinExec => b }.size == 1) | ||
assert(collect(plan2) { case e: ShuffleExchangeExec => e }.isEmpty) | ||
|
||
// Validate the data with broadcast join off. | ||
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { | ||
val df = join1.join(t3, join1("i2") === t3("i3") && join1("j2") === t3("j3")) | ||
QueryTest.sameRows(join2.collect().toSeq, df.collect().toSeq) | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
class BroadcastJoinSuite extends BroadcastJoinSuiteBase with DisableAdaptiveExecutionSuite | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val or lazy val?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed it to
lazy val