Skip to content

Commit

Permalink
[SPARK-38542][SQL] UnsafeHashedRelation should serialize numKeys out
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
UnsafeHashedRelation should serialize numKeys out

### Why are the changes needed?
One case I found was this:
We turned on ReusedExchange(BroadcastExchange), but the returned UnsafeHashedRelation is missing numKeys.

The reason is that the current type of TorrentBroadcast._value is SoftReference, so the UnsafeHashedRelation obtained by deserialization loses numKeys, which will lead to incorrect calculation results.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added a line of assert to an existing unit test

Closes #35836 from mcdull-zhang/UnsafeHashed.

Authored-by: mcdull-zhang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 8476c8b)
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
mcdull-zhang authored and cloud-fan committed Mar 16, 2022
1 parent bc69e6c commit 302c017
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ private[execution] class ValueRowWithKeyIndex {
* A HashedRelation for UnsafeRow, which is backed BytesToBytesMap.
*
* It's serialized in the following format:
* [number of keys]
* [number of keys] [number of fields]
* [size of key] [size of value] [key bytes] [bytes for value]
*/
private[joins] class UnsafeHashedRelation(
Expand Down Expand Up @@ -352,6 +352,7 @@ private[joins] class UnsafeHashedRelation(
writeInt: (Int) => Unit,
writeLong: (Long) => Unit,
writeBuffer: (Array[Byte], Int, Int) => Unit) : Unit = {
writeInt(numKeys)
writeInt(numFields)
// TODO: move these into BytesToBytesMap
writeLong(binaryMap.numKeys())
Expand Down Expand Up @@ -385,6 +386,7 @@ private[joins] class UnsafeHashedRelation(
readInt: () => Int,
readLong: () => Long,
readBuffer: (Array[Byte], Int, Int) => Unit): Unit = {
numKeys = readInt()
numFields = readInt()
resultRow = new UnsafeRow(numFields)
val nKeys = readLong()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ class HashedRelationSuite extends SharedSparkSession {
assert(hashed2.get(toUnsafe(InternalRow(10))) === null)
assert(hashed2.get(unsafeData(2)).toArray === data2)

// SPARK-38542: UnsafeHashedRelation should serialize numKeys out
assert(hashed2.keys().map(_.copy()).forall(_.numFields == 1))

val os2 = new ByteArrayOutputStream()
val out2 = new ObjectOutputStream(os2)
hashed2.asInstanceOf[UnsafeHashedRelation].writeExternal(out2)
Expand Down

0 comments on commit 302c017

Please sign in to comment.