[SPARK-23376][SQL] creating UnsafeKVExternalSorter with BytesToBytesMap may fail #20561

Closed · wants to merge 4 commits
UnsafeKVExternalSorter.java

@@ -34,6 +34,7 @@
 import org.apache.spark.storage.BlockManager;
 import org.apache.spark.unsafe.KVIterator;
 import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.array.LongArray;
 import org.apache.spark.unsafe.map.BytesToBytesMap;
 import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.apache.spark.util.collection.unsafe.sort.*;
@@ -98,19 +99,33 @@ public UnsafeKVExternalSorter(
         numElementsForSpillThreshold,
         canUseRadixSort);
     } else {
-      // The array will be used to do in-place sort, which require half of the space to be empty.
-      // Note: each record in the map takes two entries in the array, one is record pointer,
-      // another is the key prefix.
-      assert(map.numKeys() * 2 <= map.getArray().size() / 2);
-      // During spilling, the array in map will not be used, so we can borrow that and use it
-      // as the underlying array for in-memory sorter (it's always large enough).
-      // Since we will not grow the array, it's fine to pass `null` as consumer.
+      // During spilling, the pointer array in `BytesToBytesMap` will not be used, so we can borrow
+      // that and use it as the pointer array for `UnsafeInMemorySorter`.
+      LongArray pointerArray = map.getArray();
+      // `BytesToBytesMap`'s pointer array is only guaranteed to hold all the distinct keys, but
+      // `UnsafeInMemorySorter`'s pointer array needs to hold all the entries. Since
+      // `BytesToBytesMap` can have duplicated keys, we need a check here to make sure the pointer
+      // array can hold all the entries in `BytesToBytesMap`.
+      // The pointer array will be used to do in-place sort, which requires half of the space to be
+      // empty. Note: each record in the map takes two entries in the pointer array, one is the
+      // record pointer, the other is the key prefix. So the required size of the pointer array is
+      // `numRecords * 4`.
+      // TODO: It's possible to change UnsafeInMemorySorter to allow multiple entries with the same
+      // key, so that we can always reuse the pointer array.
+      if (map.numValues() > pointerArray.size() / 4) {
+        // Here we ask the map to allocate the array, so that the memory manager won't ask the map
+        // to spill if there is not enough memory.
+        pointerArray = map.allocateArray(map.numValues() * 4L);
+      }
+
+      // Since the pointer array (either reused from the map or newly allocated) is guaranteed to
+      // be large enough, it's fine to pass `null` as consumer because we won't allocate more
+      // memory.
       final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter(
         null,
         taskMemoryManager,
         comparatorSupplier.get(),
         prefixComparator,
-        map.getArray(),
+        pointerArray,
         canUseRadixSort);

       // We cannot use the destructive iterator here because we are reusing the existing memory
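As an aside, here is a minimal sketch (illustrative Java, not part of the patch; the helper name `requiredPointerArrayEntries` is made up) of the capacity arithmetic behind the `map.numValues() > pointerArray.size() / 4` check: each record contributes two entries to the pointer array, and the in-place sort needs half of the array empty, so the array must hold at least `numRecords * 4` entries.

  // Illustrative only: the arithmetic behind the reuse check in the patch above.
  static long requiredPointerArrayEntries(long numRecords) {
    long entriesPerRecord = 2;   // one record pointer + one key prefix per record
    long inPlaceSortFactor = 2;  // in-place sort requires half of the array to be empty
    return numRecords * entriesPerRecord * inPlaceSortFactor;  // = numRecords * 4
  }

  // The borrowed array can be reused iff numRecords <= pointerArray.size() / 4,
  // i.e. exactly when the patch's `map.numValues() > pointerArray.size() / 4` check fails.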
UnsafeKVExternalSorterSuite.scala

@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.map.BytesToBytesMap

 /**
  * Test suite for [[UnsafeKVExternalSorter]], with randomly generated test data.
@@ -205,4 +206,42 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
       spill = true
     )
   }

test("SPARK-23376: Create UnsafeKVExternalSorter with BytesToByteMap having duplicated keys") {
val memoryManager = new TestMemoryManager(new SparkConf())
val taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
val map = new BytesToBytesMap(taskMemoryManager, 64, taskMemoryManager.pageSizeBytes())

// Key/value are a unsafe rows with a single int column
val schema = new StructType().add("i", IntegerType)
val key = new UnsafeRow(1)
key.pointTo(new Array[Byte](32), 32)
key.setInt(0, 1)
val value = new UnsafeRow(1)
value.pointTo(new Array[Byte](32), 32)
value.setInt(0, 2)

for (_ <- 1 to 65) {
val loc = map.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes)
loc.append(
key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
value.getBaseObject, value.getBaseOffset, value.getSizeInBytes)
}

// Make sure we can successfully create a UnsafeKVExternalSorter with a `BytesToBytesMap`
// which has duplicated keys and the number of entries exceeds its capacity.
Contributor:

For aggregation, there are no multiple entries for the same key; that only happens for hash join (I don't remember the details).

Contributor (Author):

Yes, we use BytesToBytesMap to build the broadcast join hash relation, which may have duplicated keys. I only create a new pointer array if the existing one is not big enough, so we won't have a performance regression for aggregation.

+    try {
+      TaskContext.setTaskContext(new TaskContextImpl(0, 0, 0, 0, 0, taskMemoryManager, null, null))
+      new UnsafeKVExternalSorter(
+        schema,
+        schema,
+        sparkContext.env.blockManager,
+        sparkContext.env.serializerManager,
+        taskMemoryManager.pageSizeBytes(),
+        Int.MaxValue,
+        map)
+    } finally {
+      TaskContext.unset()
+    }
+  }
 }
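For readers unfamiliar with `BytesToBytesMap`, here is a minimal Java sketch (assuming a `TaskMemoryManager` named `taskMemoryManager`, as in the test above) of the duplicate-key behavior this regression test exercises: appending through a `lookup` Location always adds a new entry rather than overwriting, so `numValues()` can grow past `numKeys()`, and a pointer array sized only for the distinct keys can be too small.

  import org.apache.spark.unsafe.Platform;
  import org.apache.spark.unsafe.map.BytesToBytesMap;

  BytesToBytesMap map =
      new BytesToBytesMap(taskMemoryManager, 64, taskMemoryManager.pageSizeBytes());
  byte[] key = new byte[8];    // a single 8-byte key, reused for every insert
  byte[] value = new byte[8];
  for (int i = 0; i < 65; i++) {
    BytesToBytesMap.Location loc =
        map.lookup(key, Platform.BYTE_ARRAY_OFFSET, key.length);
    // `append` stores another value under the same key; it never overwrites.
    loc.append(key, Platform.BYTE_ARRAY_OFFSET, key.length,
               value, Platform.BYTE_ARRAY_OFFSET, value.length);
  }
  // One distinct key but 65 entries: map.numKeys() == 1 and map.numValues() == 65,
  // which is why sizing the sorter's pointer array from the key count was not enough.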