-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-23376][SQL] creating UnsafeKVExternalSorter with BytesToBytesMap may fail #20561
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,6 +34,7 @@ | |
import org.apache.spark.storage.BlockManager; | ||
import org.apache.spark.unsafe.KVIterator; | ||
import org.apache.spark.unsafe.Platform; | ||
import org.apache.spark.unsafe.array.LongArray; | ||
import org.apache.spark.unsafe.map.BytesToBytesMap; | ||
import org.apache.spark.unsafe.memory.MemoryBlock; | ||
import org.apache.spark.util.collection.unsafe.sort.*; | ||
|
@@ -98,10 +99,20 @@ public UnsafeKVExternalSorter( | |
numElementsForSpillThreshold, | ||
canUseRadixSort); | ||
} else { | ||
// The array will be used to do in-place sort, which require half of the space to be empty. | ||
// Note: each record in the map takes two entries in the array, one is record pointer, | ||
// another is the key prefix. | ||
assert(map.numKeys() * 2 <= map.getArray().size() / 2); | ||
// `BytesToBytesMap`'s point array is only guaranteed to hold all the distinct keys, but | ||
// `UnsafeInMemorySorter`'s point array need to hold all the entries. Since `BytesToBytesMap` | ||
// can have duplicated keys, here we need a check to make sure the point array can hold | ||
// all the entries in `BytesToBytesMap`. | ||
final LongArray pointArray; | ||
// The point array will be used to do in-place sort, which require half of the space to be | ||
// empty. Note: each record in the map takes two entries in the point array, one is record | ||
// pointer, another is the key prefix. | ||
if (map.numValues() > map.getArray().size() / 4) { | ||
pointArray = map.allocateArray(map.numValues() * 4); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The allocation may fail. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since overflow may occur (e.g. 0x70000000 * 4), should we use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
} else { | ||
pointArray = map.getArray(); | ||
} | ||
|
||
// During spilling, the array in map will not be used, so we can borrow that and use it | ||
// as the underlying array for in-memory sorter (it's always large enough). | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shall we update the comment here too? |
||
// Since we will not grow the array, it's fine to pass `null` as consumer. | ||
|
@@ -110,7 +121,7 @@ public UnsafeKVExternalSorter( | |
taskMemoryManager, | ||
comparatorSupplier.get(), | ||
prefixComparator, | ||
map.getArray(), | ||
pointArray, | ||
canUseRadixSort); | ||
|
||
// We cannot use the destructive iterator here because we are reusing the existing memory | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} | |
import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, UnsafeProjection, UnsafeRow} | ||
import org.apache.spark.sql.test.SharedSQLContext | ||
import org.apache.spark.sql.types._ | ||
import org.apache.spark.unsafe.map.BytesToBytesMap | ||
|
||
/** | ||
* Test suite for [[UnsafeKVExternalSorter]], with randomly generated test data. | ||
|
@@ -205,4 +206,42 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext { | |
spill = true | ||
) | ||
} | ||
|
||
test("SPARK-23376: Create UnsafeKVExternalSorter with BytesToByteMap having duplicated keys") { | ||
val memoryManager = new TestMemoryManager(new SparkConf()) | ||
val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) | ||
val map = new BytesToBytesMap(taskMemoryManager, 64, taskMemoryManager.pageSizeBytes()) | ||
|
||
// Key/value are a unsafe rows with a single int column | ||
val schema = new StructType().add("i", IntegerType) | ||
val key = new UnsafeRow(1) | ||
key.pointTo(new Array[Byte](32), 32) | ||
key.setInt(0, 1) | ||
val value = new UnsafeRow(1) | ||
value.pointTo(new Array[Byte](32), 32) | ||
value.setInt(0, 2) | ||
|
||
for (_ <- 1 to 65) { | ||
val loc = map.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes) | ||
loc.append( | ||
key.getBaseObject, key.getBaseOffset, key.getSizeInBytes, | ||
value.getBaseObject, value.getBaseOffset, value.getSizeInBytes) | ||
} | ||
|
||
// Make sure we can successfully create a UnsafeKVExternalSorter with a `BytesToBytesMap` | ||
// which has duplicated keys and the number of entries exceeds its capacity. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For aggregation, there are no multiple entries for the same key; that only happens for hash joins (I don't remember the details). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, we use |
||
try { | ||
TaskContext.setTaskContext(new TaskContextImpl(0, 0, 0, 0, 0, taskMemoryManager, null, null)) | ||
new UnsafeKVExternalSorter( | ||
schema, | ||
schema, | ||
sparkContext.env.blockManager, | ||
sparkContext.env.serializerManager, | ||
taskMemoryManager.pageSizeBytes(), | ||
Int.MaxValue, | ||
map) | ||
} finally { | ||
TaskContext.unset() | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's possible to change UnsafeInMemorySorter to have multiple entries with the same key.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but it's not trivial; I'd like to do it later. The required changes I can think of:
BytesToBytesMap
is actually a key -> list[value], and we need to provide a way to iterate key -> list[value] instead of key -> value.