[SPARK-3861][SQL] Avoid rebuilding hash tables for broadcast joins on each partition #2727
Conversation
BroadcastHashJoin builds a new hash table for each partition. We can build it once per node and reuse the hash table.
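The gist of the change, as a minimal RDD-level sketch. This is not the PR's actual code: it uses a plain Scala Map where Spark SQL uses its internal HashedRelation, but it shows the pattern of building the table once on the driver and broadcasting it, instead of rebuilding it inside every partition:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastJoinSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))

    // Build side: small enough to ship to every node.
    val buildSide = Seq(1 -> "a", 2 -> "b")

    // Build the hash table ONCE on the driver and broadcast it, instead of
    // rebuilding it from the broadcast rows inside every partition.
    val broadcastTable = sc.broadcast(buildSide.toMap)

    val streamed = sc.parallelize(Seq(1, 2, 2, 3), numSlices = 4)
    val joined = streamed.mapPartitions { iter =>
      val table = broadcastTable.value // fetched once per node, reused by all its partitions
      iter.flatMap(k => table.get(k).map(v => (k, v)))
    }

    joined.collect().foreach(println) // (1,a), (2,b), (2,b)
    sc.stop()
  }
}
```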
…sh-1

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
Test FAILed.
QA tests have started for PR 2727 at commit
QA tests have finished for PR 2727 at commit
QA tests have started for PR 2727 at commit
```scala
override def get(key: Row) = {
  val v = hashTable.get(key)
  if (v eq null) null else CompactBuffer(v)
}
```
Will that cause too many CompactBuffer objects to be created if there are many duplicated records on the stream side, each with a single match on the build side? Or does GeneralHashedRelation perform well enough?
We will have a new operator that specializes in unique-key joins.
Sorry, I mean that for each row on the stream side, a CompactBuffer instance will be created whenever it finds a matching row on the build side; that is probably too heavy.
Yea. What I meant was that we will add a new operator that specializes in unique-key joins, and that operator would just call getValue, bypassing the creation of a CompactBuffer.
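A hypothetical sketch of what such a specialized relation could look like, using a plain java.util.HashMap in place of Spark's internal types; the class and method shapes here are illustrative, not the actual API:

```scala
import java.util.{HashMap => JHashMap}

// Illustrative only: a relation specialized for unique keys. A unique-key
// join operator would call getValue on the fast path and never allocate a
// per-probe CompactBuffer.
class UniqueKeyRelationSketch[K, V <: AnyRef](table: JHashMap[K, V]) {
  // Fast path: at most one match per key, returned directly (null if absent).
  def getValue(key: K): V = table.get(key)

  // General interface: wraps the single match in a buffer, which is exactly
  // the per-row allocation being discussed above.
  def get(key: K): Seq[V] = {
    val v = table.get(key)
    if (v == null) null else Seq(v)
  }
}
```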
Even so, can't we reuse the same CompactBuffer? Also, should the semantics be to return null or an empty buffer?
Should be null, since that's what a normal hashmap would return, no?
This isn't really a normal hashmap, it's for joins, and an empty CompactBuffer seems like a pretty clear way to indicate that no matches were found. Then you don't have to special-case null on the other side; you just join with whatever rows are returned.
Though I guess that doesn't work great with your getValue idea below...
QA tests have finished for PR 2727 at commit
Test PASSed.
OK, I updated the code to reuse the CompactBuffer.
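Roughly, the reuse variant looks like this sketch (illustrative names, with ArrayBuffer standing in for CompactBuffer): one buffer owned by the relation is cleared and refilled on each probe, so probing allocates nothing, but the result is only valid until the next call:

```scala
import java.util.{HashMap => JHashMap}
import scala.collection.mutable.ArrayBuffer

// Sketch of buffer reuse: a single buffer is cleared and refilled per probe.
class ReusingRelationSketch[K, V <: AnyRef](table: JHashMap[K, V]) {
  private val reused = new ArrayBuffer[V](1)

  def get(key: K): Seq[V] = {
    val v = table.get(key)
    if (v == null) null
    else {
      reused.clear()
      reused += v
      reused // WARNING: contents are overwritten by the next probe
    }
  }
}
```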
QA tests have started for PR 2727 at commit
QA tests have finished for PR 2727 at commit
Test FAILed.
This reverts commit 97626a1.
Test FAILed.
QA tests have started for PR 2727 at commit
QA tests have finished for PR 2727 at commit
I reverted the CompactBuffer reuse because it is not safe to do that with JoinedRow.
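A miniature demonstration of why, with illustrative types rather than Spark's: a JoinedRow-style result holds a reference to the matched row rather than copying it, and join output is consumed lazily, so refilling one shared buffer on the next probe silently changes results that were already emitted:

```scala
import scala.collection.mutable.ArrayBuffer

object UnsafeReuseDemo {
  // Stands in for JoinedRow: keeps a reference to the match buffer, no copy.
  final class JoinedPair(val key: Int, val matched: ArrayBuffer[String]) {
    override def toString = s"($key, ${matched.mkString(",")})"
  }

  private val shared = new ArrayBuffer[String](1)

  // Every probe clears and refills the same shared buffer.
  def probe(key: Int): ArrayBuffer[String] = {
    shared.clear()
    shared += s"row-for-$key"
    shared
  }

  def main(args: Array[String]): Unit = {
    val out = List(1, 2).map(k => new JoinedPair(k, probe(k)))
    // Both pairs now print "row-for-2": probing for key 2 clobbered the
    // buffer that the first JoinedPair still references.
    out.foreach(println)
  }
}
```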
@marmbrus ready to merge?